]> Untitled Git - lemmy.git/blob - lemmy_apub/src/inbox/user_inbox.rs
Merge branch 'main' into move_views_to_diesel
[lemmy.git] / lemmy_apub / src / inbox / user_inbox.rs
1 use crate::{
2   activities::receive::{
3     comment::{receive_create_comment, receive_update_comment},
4     community::{
5       receive_delete_community,
6       receive_remove_community,
7       receive_undo_delete_community,
8       receive_undo_remove_community,
9     },
10     private_message::{
11       receive_create_private_message,
12       receive_delete_private_message,
13       receive_undo_delete_private_message,
14       receive_update_private_message,
15     },
16     receive_unhandled_activity,
17     verify_activity_domains_valid,
18   },
19   check_is_apub_id_valid,
20   fetcher::get_or_fetch_and_upsert_community,
21   inbox::{
22     assert_activity_not_local,
23     get_activity_id,
24     get_activity_to_and_cc,
25     inbox_verify_http_signature,
26     is_activity_already_known,
27     is_addressed_to_community_followers,
28     is_addressed_to_local_user,
29     is_addressed_to_public,
30     receive_for_community::{
31       receive_create_for_community,
32       receive_delete_for_community,
33       receive_dislike_for_community,
34       receive_like_for_community,
35       receive_remove_for_community,
36       receive_undo_for_community,
37       receive_update_for_community,
38     },
39   },
40   insert_activity,
41   ActorType,
42 };
43 use activitystreams::{
44   activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Undo, Update},
45   base::AnyBase,
46   prelude::*,
47 };
48 use actix_web::{web, HttpRequest, HttpResponse};
49 use anyhow::{anyhow, Context};
50 use diesel::NotFound;
51 use lemmy_db::{
52   community::{Community, CommunityFollower},
53   private_message::PrivateMessage,
54   user::User_,
55   ApubObject,
56   Followable,
57 };
58 use lemmy_structs::blocking;
59 use lemmy_utils::{location_info, LemmyError};
60 use lemmy_websocket::LemmyContext;
61 use log::debug;
62 use serde::{Deserialize, Serialize};
63 use std::fmt::Debug;
64 use url::Url;
65
66 /// Allowed activities for user inbox.
67 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
68 #[serde(rename_all = "PascalCase")]
69 pub enum UserValidTypes {
70   Accept,   // community accepted our follow request
71   Create,   // create private message
72   Update,   // edit private message
73   Delete,   // private message or community deleted by creator
74   Undo,     // private message or community restored
75   Remove,   // community removed by admin
76   Announce, // post, comment or vote in community
77 }
78
79 pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
80
81 /// Handler for all incoming activities to user inboxes.
82 pub async fn user_inbox(
83   request: HttpRequest,
84   input: web::Json<UserAcceptedActivities>,
85   path: web::Path<String>,
86   context: web::Data<LemmyContext>,
87 ) -> Result<HttpResponse, LemmyError> {
88   let activity = input.into_inner();
89   // First of all check the http signature
90   let request_counter = &mut 0;
91   let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
92
93   // Do nothing if we received the same activity before
94   let activity_id = get_activity_id(&activity, &actor.actor_id()?)?;
95   if is_activity_already_known(context.pool(), &activity_id).await? {
96     return Ok(HttpResponse::Ok().finish());
97   }
98
99   // Check if the activity is actually meant for us
100   let username = path.into_inner();
101   let user = blocking(&context.pool(), move |conn| {
102     User_::read_from_name(&conn, &username)
103   })
104   .await??;
105   let to_and_cc = get_activity_to_and_cc(&activity);
106   // TODO: we should also accept activities that are sent to community followers
107   if !to_and_cc.contains(&&user.actor_id()?) {
108     return Err(anyhow!("Activity delivered to wrong user").into());
109   }
110
111   assert_activity_not_local(&activity)?;
112   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
113
114   debug!(
115     "User {} received activity {:?} from {}",
116     user.name,
117     &activity.id_unchecked(),
118     &actor.actor_id_str()
119   );
120
121   user_receive_message(
122     activity.clone(),
123     Some(user.clone()),
124     actor.as_ref(),
125     &context,
126     request_counter,
127   )
128   .await
129 }
130
131 /// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
132 pub(crate) async fn user_receive_message(
133   activity: UserAcceptedActivities,
134   to_user: Option<User_>,
135   actor: &dyn ActorType,
136   context: &LemmyContext,
137   request_counter: &mut i32,
138 ) -> Result<HttpResponse, LemmyError> {
139   is_for_user_inbox(context, &activity).await?;
140
141   let any_base = activity.clone().into_any_base()?;
142   let kind = activity.kind().context(location_info!())?;
143   let actor_url = actor.actor_id()?;
144   match kind {
145     UserValidTypes::Accept => {
146       receive_accept(&context, any_base, actor, to_user.unwrap(), request_counter).await?;
147     }
148     UserValidTypes::Announce => {
149       receive_announce(&context, any_base, actor, request_counter).await?
150     }
151     UserValidTypes::Create => {
152       receive_create(&context, any_base, actor_url, request_counter).await?
153     }
154     UserValidTypes::Update => {
155       receive_update(&context, any_base, actor_url, request_counter).await?
156     }
157     UserValidTypes::Delete => {
158       receive_delete(context, any_base, &actor_url, request_counter).await?
159     }
160     UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
161     UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
162   };
163
164   // TODO: would be logical to move websocket notification code here
165
166   Ok(HttpResponse::Ok().finish())
167 }
168
169 /// Returns true if the activity is addressed directly to one or more local users, or if it is
170 /// addressed to the followers collection of a remote community, and at least one local user follows
171 /// it.
172 async fn is_for_user_inbox(
173   context: &LemmyContext,
174   activity: &UserAcceptedActivities,
175 ) -> Result<(), LemmyError> {
176   let to_and_cc = get_activity_to_and_cc(activity);
177   // Check if it is addressed directly to any local user
178   if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
179     return Ok(());
180   }
181
182   // Check if it is addressed to any followers collection of a remote community, and that the
183   // community has local followers.
184   let community = is_addressed_to_community_followers(&to_and_cc, context.pool()).await?;
185   if let Some(c) = community {
186     let community_id = c.id;
187     let has_local_followers = blocking(&context.pool(), move |conn| {
188       CommunityFollower::has_local_followers(conn, community_id)
189     })
190     .await??;
191     if c.local {
192       return Err(
193         anyhow!("Remote activity cant be addressed to followers of local community").into(),
194       );
195     }
196     if has_local_followers {
197       return Ok(());
198     }
199   }
200
201   Err(anyhow!("Not addressed for any local user").into())
202 }
203
204 /// Handle accepted follows.
205 async fn receive_accept(
206   context: &LemmyContext,
207   activity: AnyBase,
208   actor: &dyn ActorType,
209   user: User_,
210   request_counter: &mut i32,
211 ) -> Result<(), LemmyError> {
212   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
213   verify_activity_domains_valid(&accept, &actor.actor_id()?, false)?;
214
215   let object = accept.object().to_owned().one().context(location_info!())?;
216   let follow = Follow::from_any_base(object)?.context(location_info!())?;
217   verify_activity_domains_valid(&follow, &user.actor_id()?, false)?;
218
219   let community_uri = accept
220     .actor()?
221     .to_owned()
222     .single_xsd_any_uri()
223     .context(location_info!())?;
224
225   let community =
226     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
227
228   let community_id = community.id;
229   let user_id = user.id;
230   // This will throw an error if no follow was requested
231   blocking(&context.pool(), move |conn| {
232     CommunityFollower::follow_accepted(conn, community_id, user_id)
233   })
234   .await??;
235
236   Ok(())
237 }
238
239 /// Takes an announce and passes the inner activity to the appropriate handler.
240 async fn receive_announce(
241   context: &LemmyContext,
242   activity: AnyBase,
243   actor: &dyn ActorType,
244   request_counter: &mut i32,
245 ) -> Result<(), LemmyError> {
246   let announce = Announce::from_any_base(activity)?.context(location_info!())?;
247   verify_activity_domains_valid(&announce, &actor.actor_id()?, false)?;
248   is_addressed_to_public(&announce)?;
249
250   let kind = announce.object().as_single_kind_str();
251   let inner_activity = announce
252     .object()
253     .to_owned()
254     .one()
255     .context(location_info!())?;
256
257   let inner_id = inner_activity.id().context(location_info!())?.to_owned();
258   check_is_apub_id_valid(&inner_id)?;
259   if is_activity_already_known(context.pool(), &inner_id).await? {
260     return Ok(());
261   }
262
263   match kind {
264     Some("Create") => {
265       receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
266     }
267     Some("Update") => {
268       receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
269     }
270     Some("Like") => {
271       receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
272     }
273     Some("Dislike") => {
274       receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
275     }
276     Some("Delete") => receive_delete_for_community(context, inner_activity, &inner_id).await,
277     Some("Remove") => receive_remove_for_community(context, inner_activity, &inner_id).await,
278     Some("Undo") => {
279       receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
280     }
281     _ => receive_unhandled_activity(inner_activity),
282   }
283 }
284
285 async fn receive_create(
286   context: &LemmyContext,
287   activity: AnyBase,
288   expected_domain: Url,
289   request_counter: &mut i32,
290 ) -> Result<(), LemmyError> {
291   let create = Create::from_any_base(activity)?.context(location_info!())?;
292   verify_activity_domains_valid(&create, &expected_domain, true)?;
293   if is_addressed_to_public(&create).is_ok() {
294     receive_create_comment(create, context, request_counter).await
295   } else {
296     receive_create_private_message(&context, create, expected_domain, request_counter).await
297   }
298 }
299
300 async fn receive_update(
301   context: &LemmyContext,
302   activity: AnyBase,
303   expected_domain: Url,
304   request_counter: &mut i32,
305 ) -> Result<(), LemmyError> {
306   let update = Update::from_any_base(activity)?.context(location_info!())?;
307   verify_activity_domains_valid(&update, &expected_domain, true)?;
308   if is_addressed_to_public(&update).is_ok() {
309     receive_update_comment(update, context, request_counter).await
310   } else {
311     receive_update_private_message(&context, update, expected_domain, request_counter).await
312   }
313 }
314
315 async fn receive_delete(
316   context: &LemmyContext,
317   any_base: AnyBase,
318   expected_domain: &Url,
319   request_counter: &mut i32,
320 ) -> Result<(), LemmyError> {
321   use CommunityOrPrivateMessage::*;
322
323   let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
324   verify_activity_domains_valid(&delete, expected_domain, true)?;
325   let object_uri = delete
326     .object()
327     .to_owned()
328     .single_xsd_any_uri()
329     .context(location_info!())?;
330
331   match find_community_or_private_message_by_id(context, object_uri).await? {
332     Community(c) => receive_delete_community(context, c).await,
333     PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
334   }
335 }
336
337 async fn receive_undo(
338   context: &LemmyContext,
339   any_base: AnyBase,
340   expected_domain: &Url,
341   request_counter: &mut i32,
342 ) -> Result<(), LemmyError> {
343   use CommunityOrPrivateMessage::*;
344   let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
345   verify_activity_domains_valid(&undo, expected_domain, true)?;
346
347   let inner_activity = undo.object().to_owned().one().context(location_info!())?;
348   let kind = inner_activity.kind_str();
349   match kind {
350     Some("Delete") => {
351       let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
352       verify_activity_domains_valid(&delete, expected_domain, true)?;
353       let object_uri = delete
354         .object()
355         .to_owned()
356         .single_xsd_any_uri()
357         .context(location_info!())?;
358       match find_community_or_private_message_by_id(context, object_uri).await? {
359         Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
360         PrivateMessage(p) => {
361           receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
362             .await
363         }
364       }
365     }
366     Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
367     _ => receive_unhandled_activity(undo),
368   }
369 }
370 enum CommunityOrPrivateMessage {
371   Community(Community),
372   PrivateMessage(PrivateMessage),
373 }
374
375 async fn find_community_or_private_message_by_id(
376   context: &LemmyContext,
377   apub_id: Url,
378 ) -> Result<CommunityOrPrivateMessage, LemmyError> {
379   let ap_id = apub_id.to_string();
380   let community = blocking(context.pool(), move |conn| {
381     Community::read_from_apub_id(conn, &ap_id)
382   })
383   .await?;
384   if let Ok(c) = community {
385     return Ok(CommunityOrPrivateMessage::Community(c));
386   }
387
388   let ap_id = apub_id.to_string();
389   let private_message = blocking(context.pool(), move |conn| {
390     PrivateMessage::read_from_apub_id(conn, &ap_id)
391   })
392   .await?;
393   if let Ok(p) = private_message {
394     return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
395   }
396
397   return Err(NotFound.into());
398 }