]> Untitled Git - lemmy.git/blob - crates/apub_receive/src/inbox/person_inbox.rs
02d1d8be9ebb15482f3f82b92621e11e086e05dc
[lemmy.git] / crates / apub_receive / src / inbox / person_inbox.rs
1 use crate::{
2   activities::receive::{
3     comment::{receive_create_comment, receive_update_comment},
4     community::{
5       receive_delete_community,
6       receive_remove_community,
7       receive_undo_delete_community,
8       receive_undo_remove_community,
9     },
10     private_message::{
11       receive_create_private_message,
12       receive_delete_private_message,
13       receive_undo_delete_private_message,
14       receive_update_private_message,
15     },
16     receive_unhandled_activity,
17     verify_activity_domains_valid,
18   },
19   inbox::{
20     assert_activity_not_local,
21     get_activity_id,
22     inbox_verify_http_signature,
23     is_activity_already_known,
24     is_addressed_to_community_followers,
25     is_addressed_to_local_person,
26     receive_for_community::{
27       receive_add_for_community,
28       receive_block_user_for_community,
29       receive_create_for_community,
30       receive_delete_for_community,
31       receive_dislike_for_community,
32       receive_like_for_community,
33       receive_remove_for_community,
34       receive_undo_for_community,
35       receive_update_for_community,
36     },
37     verify_is_addressed_to_public,
38   },
39 };
40 use activitystreams::{
41   activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Remove, Undo, Update},
42   base::AnyBase,
43   prelude::*,
44 };
45 use actix_web::{web, HttpRequest, HttpResponse};
46 use anyhow::{anyhow, Context};
47 use diesel::NotFound;
48 use lemmy_api_common::blocking;
49 use lemmy_apub::{
50   check_is_apub_id_valid,
51   fetcher::community::get_or_fetch_and_upsert_community,
52   get_activity_to_and_cc,
53   insert_activity,
54   ActorType,
55 };
56 use lemmy_db_queries::{source::person::Person_, ApubObject, Followable};
57 use lemmy_db_schema::source::{
58   community::{Community, CommunityFollower},
59   person::Person,
60   private_message::PrivateMessage,
61 };
62 use lemmy_utils::{location_info, LemmyError};
63 use lemmy_websocket::LemmyContext;
64 use log::info;
65 use serde::{Deserialize, Serialize};
66 use std::fmt::Debug;
67 use strum_macros::EnumString;
68 use url::Url;
69
70 /// Allowed activities for person inbox.
71 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
72 #[serde(rename_all = "PascalCase")]
73 pub enum PersonValidTypes {
74   Accept,   // community accepted our follow request
75   Create,   // create private message
76   Update,   // edit private message
77   Delete,   // private message or community deleted by creator
78   Undo,     // private message or community restored
79   Remove,   // community removed by admin
80   Announce, // post, comment or vote in community
81 }
82
83 pub type PersonAcceptedActivities = ActorAndObject<PersonValidTypes>;
84
85 /// Handler for all incoming activities to person inboxes.
86 pub async fn person_inbox(
87   request: HttpRequest,
88   input: web::Json<PersonAcceptedActivities>,
89   path: web::Path<String>,
90   context: web::Data<LemmyContext>,
91 ) -> Result<HttpResponse, LemmyError> {
92   let activity = input.into_inner();
93   // First of all check the http signature
94   let request_counter = &mut 0;
95   let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
96
97   // Do nothing if we received the same activity before
98   let activity_id = get_activity_id(&activity, &actor.actor_id())?;
99   if is_activity_already_known(context.pool(), &activity_id).await? {
100     return Ok(HttpResponse::Ok().finish());
101   }
102
103   // Check if the activity is actually meant for us
104   let username = path.into_inner();
105   let person = blocking(&context.pool(), move |conn| {
106     Person::find_by_name(&conn, &username)
107   })
108   .await??;
109   let to_and_cc = get_activity_to_and_cc(&activity);
110   // TODO: we should also accept activities that are sent to community followers
111   if !to_and_cc.contains(&&person.actor_id()) {
112     return Err(anyhow!("Activity delivered to wrong person").into());
113   }
114
115   assert_activity_not_local(&activity)?;
116   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
117
118   person_receive_message(
119     activity.clone(),
120     Some(person.clone()),
121     actor.as_ref(),
122     &context,
123     request_counter,
124   )
125   .await
126 }
127
128 /// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
129 pub(crate) async fn person_receive_message(
130   activity: PersonAcceptedActivities,
131   to_person: Option<Person>,
132   actor: &dyn ActorType,
133   context: &LemmyContext,
134   request_counter: &mut i32,
135 ) -> Result<HttpResponse, LemmyError> {
136   is_for_person_inbox(context, &activity).await?;
137
138   info!(
139     "User received activity {:?} from {}",
140     &activity
141       .id_unchecked()
142       .context(location_info!())?
143       .to_string(),
144     &actor.actor_id().to_string()
145   );
146
147   let any_base = activity.clone().into_any_base()?;
148   let kind = activity.kind().context(location_info!())?;
149   let actor_url = actor.actor_id();
150   match kind {
151     PersonValidTypes::Accept => {
152       receive_accept(
153         &context,
154         any_base,
155         actor,
156         to_person.expect("person provided"),
157         request_counter,
158       )
159       .await?;
160     }
161     PersonValidTypes::Announce => {
162       Box::pin(receive_announce(&context, any_base, actor, request_counter)).await?
163     }
164     PersonValidTypes::Create => {
165       Box::pin(receive_create(
166         &context,
167         any_base,
168         actor_url,
169         request_counter,
170       ))
171       .await?
172     }
173     PersonValidTypes::Update => {
174       Box::pin(receive_update(
175         &context,
176         any_base,
177         actor_url,
178         request_counter,
179       ))
180       .await?
181     }
182     PersonValidTypes::Delete => {
183       Box::pin(receive_delete(
184         context,
185         any_base,
186         &actor_url,
187         request_counter,
188       ))
189       .await?
190     }
191     PersonValidTypes::Undo => {
192       Box::pin(receive_undo(context, any_base, &actor_url, request_counter)).await?
193     }
194     PersonValidTypes::Remove => Box::pin(receive_remove(context, any_base, &actor_url)).await?,
195   };
196
197   // TODO: would be logical to move websocket notification code here
198
199   Ok(HttpResponse::Ok().finish())
200 }
201
202 /// Returns true if the activity is addressed directly to one or more local persons, or if it is
203 /// addressed to the followers collection of a remote community, and at least one local person follows
204 /// it.
205 async fn is_for_person_inbox(
206   context: &LemmyContext,
207   activity: &PersonAcceptedActivities,
208 ) -> Result<(), LemmyError> {
209   let to_and_cc = get_activity_to_and_cc(activity);
210   // Check if it is addressed directly to any local person
211   if is_addressed_to_local_person(&to_and_cc, context.pool()).await? {
212     return Ok(());
213   }
214
215   // Check if it is addressed to any followers collection of a remote community, and that the
216   // community has local followers.
217   let community = is_addressed_to_community_followers(&to_and_cc, context.pool()).await?;
218   if let Some(c) = community {
219     let community_id = c.id;
220     let has_local_followers = blocking(&context.pool(), move |conn| {
221       CommunityFollower::has_local_followers(conn, community_id)
222     })
223     .await??;
224     if c.local {
225       return Err(
226         anyhow!("Remote activity cant be addressed to followers of local community").into(),
227       );
228     }
229     if has_local_followers {
230       return Ok(());
231     }
232   }
233
234   Err(anyhow!("Not addressed for any local person").into())
235 }
236
237 /// Handle accepted follows.
238 async fn receive_accept(
239   context: &LemmyContext,
240   activity: AnyBase,
241   actor: &dyn ActorType,
242   person: Person,
243   request_counter: &mut i32,
244 ) -> Result<(), LemmyError> {
245   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
246   verify_activity_domains_valid(&accept, &actor.actor_id(), false)?;
247
248   let object = accept.object().to_owned().one().context(location_info!())?;
249   let follow = Follow::from_any_base(object)?.context(location_info!())?;
250   verify_activity_domains_valid(&follow, &person.actor_id(), false)?;
251
252   let community_uri = accept
253     .actor()?
254     .to_owned()
255     .single_xsd_any_uri()
256     .context(location_info!())?;
257
258   let community =
259     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
260
261   let community_id = community.id;
262   let person_id = person.id;
263   // This will throw an error if no follow was requested
264   blocking(&context.pool(), move |conn| {
265     CommunityFollower::follow_accepted(conn, community_id, person_id)
266   })
267   .await??;
268
269   Ok(())
270 }
271
272 #[derive(EnumString)]
273 enum AnnouncableActivities {
274   Create,
275   Update,
276   Like,
277   Dislike,
278   Delete,
279   Remove,
280   Undo,
281   Add,
282   Block,
283 }
284
285 /// Takes an announce and passes the inner activity to the appropriate handler.
286 pub async fn receive_announce(
287   context: &LemmyContext,
288   activity: AnyBase,
289   actor: &dyn ActorType,
290   request_counter: &mut i32,
291 ) -> Result<(), LemmyError> {
292   let announce = Announce::from_any_base(activity)?.context(location_info!())?;
293   verify_activity_domains_valid(&announce, &actor.actor_id(), false)?;
294   verify_is_addressed_to_public(&announce)?;
295
296   let kind = announce
297     .object()
298     .as_single_kind_str()
299     .and_then(|s| s.parse().ok());
300   let inner_activity = announce
301     .object()
302     .to_owned()
303     .one()
304     .context(location_info!())?;
305
306   let inner_id = inner_activity.id().context(location_info!())?.to_owned();
307   check_is_apub_id_valid(&inner_id, false)?;
308   if is_activity_already_known(context.pool(), &inner_id).await? {
309     return Ok(());
310   }
311
312   use AnnouncableActivities::*;
313   match kind {
314     Some(Create) => {
315       receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
316     }
317     Some(Update) => {
318       receive_update_for_community(
319         context,
320         inner_activity,
321         Some(announce),
322         &inner_id,
323         request_counter,
324       )
325       .await
326     }
327     Some(Like) => {
328       receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
329     }
330     Some(Dislike) => {
331       receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
332     }
333     Some(Delete) => {
334       receive_delete_for_community(
335         context,
336         inner_activity,
337         Some(announce),
338         &inner_id,
339         request_counter,
340       )
341       .await
342     }
343     Some(Remove) => {
344       receive_remove_for_community(context, inner_activity, Some(announce), request_counter).await
345     }
346     Some(Undo) => {
347       receive_undo_for_community(
348         context,
349         inner_activity,
350         Some(announce),
351         &inner_id,
352         request_counter,
353       )
354       .await
355     }
356     Some(Add) => {
357       receive_add_for_community(context, inner_activity, Some(announce), request_counter).await
358     }
359     Some(Block) => {
360       receive_block_user_for_community(context, inner_activity, Some(announce), request_counter)
361         .await
362     }
363     _ => receive_unhandled_activity(inner_activity),
364   }
365 }
366
367 /// Receive either a new private message, or a new comment mention. We distinguish them by checking
368 /// whether the activity is public.
369 async fn receive_create(
370   context: &LemmyContext,
371   activity: AnyBase,
372   expected_domain: Url,
373   request_counter: &mut i32,
374 ) -> Result<(), LemmyError> {
375   let create = Create::from_any_base(activity)?.context(location_info!())?;
376   verify_activity_domains_valid(&create, &expected_domain, true)?;
377   if verify_is_addressed_to_public(&create).is_ok() {
378     receive_create_comment(create, context, request_counter).await
379   } else {
380     receive_create_private_message(&context, create, expected_domain, request_counter).await
381   }
382 }
383
384 /// Receive either an updated private message, or an updated comment mention. We distinguish
385 /// them by checking whether the activity is public.
386 async fn receive_update(
387   context: &LemmyContext,
388   activity: AnyBase,
389   expected_domain: Url,
390   request_counter: &mut i32,
391 ) -> Result<(), LemmyError> {
392   let update = Update::from_any_base(activity)?.context(location_info!())?;
393   verify_activity_domains_valid(&update, &expected_domain, true)?;
394   if verify_is_addressed_to_public(&update).is_ok() {
395     receive_update_comment(update, context, request_counter).await
396   } else {
397     receive_update_private_message(&context, update, expected_domain, request_counter).await
398   }
399 }
400
401 async fn receive_delete(
402   context: &LemmyContext,
403   any_base: AnyBase,
404   expected_domain: &Url,
405   request_counter: &mut i32,
406 ) -> Result<(), LemmyError> {
407   use CommunityOrPrivateMessage::*;
408
409   let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
410   verify_activity_domains_valid(&delete, expected_domain, true)?;
411   let object_uri = delete
412     .object()
413     .to_owned()
414     .single_xsd_any_uri()
415     .context(location_info!())?;
416
417   match find_community_or_private_message_by_id(context, object_uri).await? {
418     Community(c) => receive_delete_community(context, c).await,
419     PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
420   }
421 }
422
423 async fn receive_remove(
424   context: &LemmyContext,
425   any_base: AnyBase,
426   expected_domain: &Url,
427 ) -> Result<(), LemmyError> {
428   let remove = Remove::from_any_base(any_base.clone())?.context(location_info!())?;
429   verify_activity_domains_valid(&remove, expected_domain, true)?;
430   let object_uri = remove
431     .object()
432     .to_owned()
433     .single_xsd_any_uri()
434     .context(location_info!())?;
435   let community = blocking(context.pool(), move |conn| {
436     Community::read_from_apub_id(conn, &object_uri.into())
437   })
438   .await??;
439   receive_remove_community(&context, community).await
440 }
441
442 async fn receive_undo(
443   context: &LemmyContext,
444   any_base: AnyBase,
445   expected_domain: &Url,
446   request_counter: &mut i32,
447 ) -> Result<(), LemmyError> {
448   let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
449   verify_activity_domains_valid(&undo, expected_domain, true)?;
450
451   let inner_activity = undo.object().to_owned().one().context(location_info!())?;
452   let kind = inner_activity.kind_str();
453   match kind {
454     Some("Delete") => {
455       let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
456       verify_activity_domains_valid(&delete, expected_domain, true)?;
457       let object_uri = delete
458         .object()
459         .to_owned()
460         .single_xsd_any_uri()
461         .context(location_info!())?;
462       use CommunityOrPrivateMessage::*;
463       match find_community_or_private_message_by_id(context, object_uri).await? {
464         Community(c) => receive_undo_delete_community(context, c).await,
465         PrivateMessage(p) => {
466           receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
467             .await
468         }
469       }
470     }
471     Some("Remove") => {
472       let remove = Remove::from_any_base(inner_activity)?.context(location_info!())?;
473       let object_uri = remove
474         .object()
475         .to_owned()
476         .single_xsd_any_uri()
477         .context(location_info!())?;
478       let community = blocking(context.pool(), move |conn| {
479         Community::read_from_apub_id(conn, &object_uri.into())
480       })
481       .await??;
482       receive_undo_remove_community(context, community).await
483     }
484     _ => receive_unhandled_activity(undo),
485   }
486 }
487 enum CommunityOrPrivateMessage {
488   Community(Community),
489   PrivateMessage(PrivateMessage),
490 }
491
492 async fn find_community_or_private_message_by_id(
493   context: &LemmyContext,
494   apub_id: Url,
495 ) -> Result<CommunityOrPrivateMessage, LemmyError> {
496   let ap_id = apub_id.to_owned();
497   let community = blocking(context.pool(), move |conn| {
498     Community::read_from_apub_id(conn, &ap_id.into())
499   })
500   .await?;
501   if let Ok(c) = community {
502     return Ok(CommunityOrPrivateMessage::Community(c));
503   }
504
505   let ap_id = apub_id.to_owned();
506   let private_message = blocking(context.pool(), move |conn| {
507     PrivateMessage::read_from_apub_id(conn, &ap_id.into())
508   })
509   .await?;
510   if let Ok(p) = private_message {
511     return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
512   }
513
514   Err(NotFound.into())
515 }