]> Untitled Git - lemmy.git/blob - crates/apub/src/inbox/person_inbox.rs
Split api crate into api_structs and api
[lemmy.git] / crates / apub / src / inbox / person_inbox.rs
1 use crate::{
2   activities::receive::{
3     comment::{receive_create_comment, receive_update_comment},
4     community::{
5       receive_delete_community,
6       receive_remove_community,
7       receive_undo_delete_community,
8       receive_undo_remove_community,
9     },
10     private_message::{
11       receive_create_private_message,
12       receive_delete_private_message,
13       receive_undo_delete_private_message,
14       receive_update_private_message,
15     },
16     receive_unhandled_activity,
17     verify_activity_domains_valid,
18   },
19   check_is_apub_id_valid,
20   fetcher::community::get_or_fetch_and_upsert_community,
21   inbox::{
22     assert_activity_not_local,
23     get_activity_id,
24     get_activity_to_and_cc,
25     inbox_verify_http_signature,
26     is_activity_already_known,
27     is_addressed_to_community_followers,
28     is_addressed_to_local_person,
29     receive_for_community::{
30       receive_add_for_community,
31       receive_create_for_community,
32       receive_delete_for_community,
33       receive_dislike_for_community,
34       receive_like_for_community,
35       receive_remove_for_community,
36       receive_undo_for_community,
37       receive_update_for_community,
38     },
39     verify_is_addressed_to_public,
40   },
41   insert_activity,
42   ActorType,
43 };
44 use activitystreams::{
45   activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Remove, Undo, Update},
46   base::AnyBase,
47   prelude::*,
48 };
49 use actix_web::{web, HttpRequest, HttpResponse};
50 use anyhow::{anyhow, Context};
51 use diesel::NotFound;
52 use lemmy_api_common::blocking;
53 use lemmy_db_queries::{source::person::Person_, ApubObject, Followable};
54 use lemmy_db_schema::source::{
55   community::{Community, CommunityFollower},
56   person::Person,
57   private_message::PrivateMessage,
58 };
59 use lemmy_utils::{location_info, LemmyError};
60 use lemmy_websocket::LemmyContext;
61 use log::debug;
62 use serde::{Deserialize, Serialize};
63 use std::fmt::Debug;
64 use strum_macros::EnumString;
65 use url::Url;
66
/// Allowed activities for person inbox.
///
/// The variant names are (de)serialized in `PascalCase`, i.e. exactly as written here. Each
/// variant is dispatched to its own handler in `person_receive_message`.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub enum PersonValidTypes {
  Accept,   // a community accepted our follow request
  Create,   // a private message was created (or a public comment, see `receive_create`)
  Update,   // a private message was edited (or a public comment, see `receive_update`)
  Delete,   // a private message or community was deleted by its creator
  Undo,     // a private message or community was restored
  Remove,   // a community was removed by an admin
  Announce, // a post, comment or vote in a community
}
79
/// Activity shape accepted by the person inbox: an `ActorAndObject` whose kind is one of
/// [`PersonValidTypes`].
pub type PersonAcceptedActivities = ActorAndObject<PersonValidTypes>;
81
82 /// Handler for all incoming activities to person inboxes.
83 pub async fn person_inbox(
84   request: HttpRequest,
85   input: web::Json<PersonAcceptedActivities>,
86   path: web::Path<String>,
87   context: web::Data<LemmyContext>,
88 ) -> Result<HttpResponse, LemmyError> {
89   let activity = input.into_inner();
90   // First of all check the http signature
91   let request_counter = &mut 0;
92   let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
93
94   // Do nothing if we received the same activity before
95   let activity_id = get_activity_id(&activity, &actor.actor_id())?;
96   if is_activity_already_known(context.pool(), &activity_id).await? {
97     return Ok(HttpResponse::Ok().finish());
98   }
99
100   // Check if the activity is actually meant for us
101   let username = path.into_inner();
102   let person = blocking(&context.pool(), move |conn| {
103     Person::find_by_name(&conn, &username)
104   })
105   .await??;
106   let to_and_cc = get_activity_to_and_cc(&activity);
107   // TODO: we should also accept activities that are sent to community followers
108   if !to_and_cc.contains(&&person.actor_id()) {
109     return Err(anyhow!("Activity delivered to wrong person").into());
110   }
111
112   assert_activity_not_local(&activity)?;
113   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
114
115   debug!(
116     "Person {} received activity {:?} from {}",
117     person.name,
118     &activity.id_unchecked(),
119     &actor.actor_id()
120   );
121
122   person_receive_message(
123     activity.clone(),
124     Some(person.clone()),
125     actor.as_ref(),
126     &context,
127     request_counter,
128   )
129   .await
130 }
131
132 /// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
133 pub(crate) async fn person_receive_message(
134   activity: PersonAcceptedActivities,
135   to_person: Option<Person>,
136   actor: &dyn ActorType,
137   context: &LemmyContext,
138   request_counter: &mut i32,
139 ) -> Result<HttpResponse, LemmyError> {
140   is_for_person_inbox(context, &activity).await?;
141
142   let any_base = activity.clone().into_any_base()?;
143   let kind = activity.kind().context(location_info!())?;
144   let actor_url = actor.actor_id();
145   match kind {
146     PersonValidTypes::Accept => {
147       receive_accept(
148         &context,
149         any_base,
150         actor,
151         to_person.expect("person provided"),
152         request_counter,
153       )
154       .await?;
155     }
156     PersonValidTypes::Announce => {
157       Box::pin(receive_announce(&context, any_base, actor, request_counter)).await?
158     }
159     PersonValidTypes::Create => {
160       Box::pin(receive_create(
161         &context,
162         any_base,
163         actor_url,
164         request_counter,
165       ))
166       .await?
167     }
168     PersonValidTypes::Update => {
169       Box::pin(receive_update(
170         &context,
171         any_base,
172         actor_url,
173         request_counter,
174       ))
175       .await?
176     }
177     PersonValidTypes::Delete => {
178       Box::pin(receive_delete(
179         context,
180         any_base,
181         &actor_url,
182         request_counter,
183       ))
184       .await?
185     }
186     PersonValidTypes::Undo => {
187       Box::pin(receive_undo(context, any_base, &actor_url, request_counter)).await?
188     }
189     PersonValidTypes::Remove => Box::pin(receive_remove(context, any_base, &actor_url)).await?,
190   };
191
192   // TODO: would be logical to move websocket notification code here
193
194   Ok(HttpResponse::Ok().finish())
195 }
196
197 /// Returns true if the activity is addressed directly to one or more local persons, or if it is
198 /// addressed to the followers collection of a remote community, and at least one local person follows
199 /// it.
200 async fn is_for_person_inbox(
201   context: &LemmyContext,
202   activity: &PersonAcceptedActivities,
203 ) -> Result<(), LemmyError> {
204   let to_and_cc = get_activity_to_and_cc(activity);
205   // Check if it is addressed directly to any local person
206   if is_addressed_to_local_person(&to_and_cc, context.pool()).await? {
207     return Ok(());
208   }
209
210   // Check if it is addressed to any followers collection of a remote community, and that the
211   // community has local followers.
212   let community = is_addressed_to_community_followers(&to_and_cc, context.pool()).await?;
213   if let Some(c) = community {
214     let community_id = c.id;
215     let has_local_followers = blocking(&context.pool(), move |conn| {
216       CommunityFollower::has_local_followers(conn, community_id)
217     })
218     .await??;
219     if c.local {
220       return Err(
221         anyhow!("Remote activity cant be addressed to followers of local community").into(),
222       );
223     }
224     if has_local_followers {
225       return Ok(());
226     }
227   }
228
229   Err(anyhow!("Not addressed for any local person").into())
230 }
231
232 /// Handle accepted follows.
233 async fn receive_accept(
234   context: &LemmyContext,
235   activity: AnyBase,
236   actor: &dyn ActorType,
237   person: Person,
238   request_counter: &mut i32,
239 ) -> Result<(), LemmyError> {
240   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
241   verify_activity_domains_valid(&accept, &actor.actor_id(), false)?;
242
243   let object = accept.object().to_owned().one().context(location_info!())?;
244   let follow = Follow::from_any_base(object)?.context(location_info!())?;
245   verify_activity_domains_valid(&follow, &person.actor_id(), false)?;
246
247   let community_uri = accept
248     .actor()?
249     .to_owned()
250     .single_xsd_any_uri()
251     .context(location_info!())?;
252
253   let community =
254     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
255
256   let community_id = community.id;
257   let person_id = person.id;
258   // This will throw an error if no follow was requested
259   blocking(&context.pool(), move |conn| {
260     CommunityFollower::follow_accepted(conn, community_id, person_id)
261   })
262   .await??;
263
264   Ok(())
265 }
266
/// Kinds of inner activities that `receive_announce` knows how to handle.
///
/// `EnumString` is derived so that the kind string of the announced object can be turned into a
/// variant with `str::parse`; kinds that do not parse are treated as unhandled.
#[derive(EnumString)]
enum AnnouncableActivities {
  Create,
  Update,
  Like,
  Dislike,
  Delete,
  Remove,
  Undo,
  Add,
}
278
279 /// Takes an announce and passes the inner activity to the appropriate handler.
280 pub async fn receive_announce(
281   context: &LemmyContext,
282   activity: AnyBase,
283   actor: &dyn ActorType,
284   request_counter: &mut i32,
285 ) -> Result<(), LemmyError> {
286   let announce = Announce::from_any_base(activity)?.context(location_info!())?;
287   verify_activity_domains_valid(&announce, &actor.actor_id(), false)?;
288   verify_is_addressed_to_public(&announce)?;
289
290   let kind = announce
291     .object()
292     .as_single_kind_str()
293     .and_then(|s| s.parse().ok());
294   let inner_activity = announce
295     .object()
296     .to_owned()
297     .one()
298     .context(location_info!())?;
299
300   let inner_id = inner_activity.id().context(location_info!())?.to_owned();
301   check_is_apub_id_valid(&inner_id)?;
302   if is_activity_already_known(context.pool(), &inner_id).await? {
303     return Ok(());
304   }
305
306   use AnnouncableActivities::*;
307   match kind {
308     Some(Create) => {
309       receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
310     }
311     Some(Update) => {
312       receive_update_for_community(
313         context,
314         inner_activity,
315         Some(announce),
316         &inner_id,
317         request_counter,
318       )
319       .await
320     }
321     Some(Like) => {
322       receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
323     }
324     Some(Dislike) => {
325       receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
326     }
327     Some(Delete) => {
328       receive_delete_for_community(
329         context,
330         inner_activity,
331         Some(announce),
332         &inner_id,
333         request_counter,
334       )
335       .await
336     }
337     Some(Remove) => {
338       receive_remove_for_community(context, inner_activity, Some(announce), request_counter).await
339     }
340     Some(Undo) => {
341       receive_undo_for_community(
342         context,
343         inner_activity,
344         Some(announce),
345         &inner_id,
346         request_counter,
347       )
348       .await
349     }
350     Some(Add) => {
351       receive_add_for_community(context, inner_activity, Some(announce), request_counter).await
352     }
353     _ => receive_unhandled_activity(inner_activity),
354   }
355 }
356
357 /// Receive either a new private message, or a new comment mention. We distinguish them by checking
358 /// whether the activity is public.
359 async fn receive_create(
360   context: &LemmyContext,
361   activity: AnyBase,
362   expected_domain: Url,
363   request_counter: &mut i32,
364 ) -> Result<(), LemmyError> {
365   let create = Create::from_any_base(activity)?.context(location_info!())?;
366   verify_activity_domains_valid(&create, &expected_domain, true)?;
367   if verify_is_addressed_to_public(&create).is_ok() {
368     receive_create_comment(create, context, request_counter).await
369   } else {
370     receive_create_private_message(&context, create, expected_domain, request_counter).await
371   }
372 }
373
374 /// Receive either an updated private message, or an updated comment mention. We distinguish
375 /// them by checking whether the activity is public.
376 async fn receive_update(
377   context: &LemmyContext,
378   activity: AnyBase,
379   expected_domain: Url,
380   request_counter: &mut i32,
381 ) -> Result<(), LemmyError> {
382   let update = Update::from_any_base(activity)?.context(location_info!())?;
383   verify_activity_domains_valid(&update, &expected_domain, true)?;
384   if verify_is_addressed_to_public(&update).is_ok() {
385     receive_update_comment(update, context, request_counter).await
386   } else {
387     receive_update_private_message(&context, update, expected_domain, request_counter).await
388   }
389 }
390
391 async fn receive_delete(
392   context: &LemmyContext,
393   any_base: AnyBase,
394   expected_domain: &Url,
395   request_counter: &mut i32,
396 ) -> Result<(), LemmyError> {
397   use CommunityOrPrivateMessage::*;
398
399   let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
400   verify_activity_domains_valid(&delete, expected_domain, true)?;
401   let object_uri = delete
402     .object()
403     .to_owned()
404     .single_xsd_any_uri()
405     .context(location_info!())?;
406
407   match find_community_or_private_message_by_id(context, object_uri).await? {
408     Community(c) => receive_delete_community(context, c).await,
409     PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
410   }
411 }
412
413 async fn receive_remove(
414   context: &LemmyContext,
415   any_base: AnyBase,
416   expected_domain: &Url,
417 ) -> Result<(), LemmyError> {
418   let remove = Remove::from_any_base(any_base.clone())?.context(location_info!())?;
419   verify_activity_domains_valid(&remove, expected_domain, true)?;
420   let object_uri = remove
421     .object()
422     .to_owned()
423     .single_xsd_any_uri()
424     .context(location_info!())?;
425   let community = blocking(context.pool(), move |conn| {
426     Community::read_from_apub_id(conn, &object_uri.into())
427   })
428   .await??;
429   receive_remove_community(&context, community).await
430 }
431
432 async fn receive_undo(
433   context: &LemmyContext,
434   any_base: AnyBase,
435   expected_domain: &Url,
436   request_counter: &mut i32,
437 ) -> Result<(), LemmyError> {
438   let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
439   verify_activity_domains_valid(&undo, expected_domain, true)?;
440
441   let inner_activity = undo.object().to_owned().one().context(location_info!())?;
442   let kind = inner_activity.kind_str();
443   match kind {
444     Some("Delete") => {
445       let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
446       verify_activity_domains_valid(&delete, expected_domain, true)?;
447       let object_uri = delete
448         .object()
449         .to_owned()
450         .single_xsd_any_uri()
451         .context(location_info!())?;
452       use CommunityOrPrivateMessage::*;
453       match find_community_or_private_message_by_id(context, object_uri).await? {
454         Community(c) => receive_undo_delete_community(context, c).await,
455         PrivateMessage(p) => {
456           receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
457             .await
458         }
459       }
460     }
461     Some("Remove") => {
462       let remove = Remove::from_any_base(inner_activity)?.context(location_info!())?;
463       let object_uri = remove
464         .object()
465         .to_owned()
466         .single_xsd_any_uri()
467         .context(location_info!())?;
468       let community = blocking(context.pool(), move |conn| {
469         Community::read_from_apub_id(conn, &object_uri.into())
470       })
471       .await??;
472       receive_undo_remove_community(context, community).await
473     }
474     _ => receive_unhandled_activity(undo),
475   }
476 }
/// Outcome of looking up an apub id that may belong to either a community or a private message
/// (see `find_community_or_private_message_by_id`).
enum CommunityOrPrivateMessage {
  Community(Community),
  PrivateMessage(PrivateMessage),
}
481
482 async fn find_community_or_private_message_by_id(
483   context: &LemmyContext,
484   apub_id: Url,
485 ) -> Result<CommunityOrPrivateMessage, LemmyError> {
486   let ap_id = apub_id.to_owned();
487   let community = blocking(context.pool(), move |conn| {
488     Community::read_from_apub_id(conn, &ap_id.into())
489   })
490   .await?;
491   if let Ok(c) = community {
492     return Ok(CommunityOrPrivateMessage::Community(c));
493   }
494
495   let ap_id = apub_id.to_owned();
496   let private_message = blocking(context.pool(), move |conn| {
497     PrivateMessage::read_from_apub_id(conn, &ap_id.into())
498   })
499   .await?;
500   if let Ok(p) = private_message {
501     return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
502   }
503
504   Err(NotFound.into())
505 }