]> Untitled Git - lemmy.git/blob - crates/apub/src/inbox/user_inbox.rs
6e04767403be67cc04e23e0a13524bb1586e4135
[lemmy.git] / crates / apub / src / inbox / user_inbox.rs
1 use crate::{
2   activities::receive::{
3     comment::{receive_create_comment, receive_update_comment},
4     community::{
5       receive_delete_community,
6       receive_remove_community,
7       receive_undo_delete_community,
8       receive_undo_remove_community,
9     },
10     private_message::{
11       receive_create_private_message,
12       receive_delete_private_message,
13       receive_undo_delete_private_message,
14       receive_update_private_message,
15     },
16     receive_unhandled_activity,
17     verify_activity_domains_valid,
18   },
19   check_is_apub_id_valid,
20   fetcher::community::get_or_fetch_and_upsert_community,
21   inbox::{
22     assert_activity_not_local,
23     get_activity_id,
24     get_activity_to_and_cc,
25     inbox_verify_http_signature,
26     is_activity_already_known,
27     is_addressed_to_community_followers,
28     is_addressed_to_local_user,
29     is_addressed_to_public,
30     receive_for_community::{
31       receive_create_for_community,
32       receive_delete_for_community,
33       receive_dislike_for_community,
34       receive_like_for_community,
35       receive_remove_for_community,
36       receive_undo_for_community,
37       receive_update_for_community,
38     },
39   },
40   insert_activity,
41   ActorType,
42 };
43 use activitystreams::{
44   activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Undo, Update},
45   base::AnyBase,
46   prelude::*,
47 };
48 use actix_web::{web, HttpRequest, HttpResponse};
49 use anyhow::{anyhow, Context};
50 use diesel::NotFound;
51 use lemmy_api_structs::blocking;
52 use lemmy_db_queries::{source::user::User, ApubObject, Followable};
53 use lemmy_db_schema::source::{
54   community::{Community, CommunityFollower},
55   private_message::PrivateMessage,
56   user::User_,
57 };
58 use lemmy_utils::{location_info, LemmyError};
59 use lemmy_websocket::LemmyContext;
60 use log::debug;
61 use serde::{Deserialize, Serialize};
62 use std::fmt::Debug;
63 use strum_macros::EnumString;
64 use url::Url;
65
66 /// Allowed activities for user inbox.
67 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
68 #[serde(rename_all = "PascalCase")]
69 pub enum UserValidTypes {
70   Accept,   // community accepted our follow request
71   Create,   // create private message
72   Update,   // edit private message
73   Delete,   // private message or community deleted by creator
74   Undo,     // private message or community restored
75   Remove,   // community removed by admin
76   Announce, // post, comment or vote in community
77 }
78
79 pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
80
81 /// Handler for all incoming activities to user inboxes.
82 pub async fn user_inbox(
83   request: HttpRequest,
84   input: web::Json<UserAcceptedActivities>,
85   path: web::Path<String>,
86   context: web::Data<LemmyContext>,
87 ) -> Result<HttpResponse, LemmyError> {
88   let activity = input.into_inner();
89   // First of all check the http signature
90   let request_counter = &mut 0;
91   let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
92
93   // Do nothing if we received the same activity before
94   let activity_id = get_activity_id(&activity, &actor.actor_id())?;
95   if is_activity_already_known(context.pool(), &activity_id).await? {
96     return Ok(HttpResponse::Ok().finish());
97   }
98
99   // Check if the activity is actually meant for us
100   let username = path.into_inner();
101   let user = blocking(&context.pool(), move |conn| {
102     User_::read_from_name(&conn, &username)
103   })
104   .await??;
105   let to_and_cc = get_activity_to_and_cc(&activity);
106   // TODO: we should also accept activities that are sent to community followers
107   if !to_and_cc.contains(&&user.actor_id()) {
108     return Err(anyhow!("Activity delivered to wrong user").into());
109   }
110
111   assert_activity_not_local(&activity)?;
112   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
113
114   debug!(
115     "User {} received activity {:?} from {}",
116     user.name,
117     &activity.id_unchecked(),
118     &actor.actor_id()
119   );
120
121   user_receive_message(
122     activity.clone(),
123     Some(user.clone()),
124     actor.as_ref(),
125     &context,
126     request_counter,
127   )
128   .await
129 }
130
131 /// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
132 pub(crate) async fn user_receive_message(
133   activity: UserAcceptedActivities,
134   to_user: Option<User_>,
135   actor: &dyn ActorType,
136   context: &LemmyContext,
137   request_counter: &mut i32,
138 ) -> Result<HttpResponse, LemmyError> {
139   is_for_user_inbox(context, &activity).await?;
140
141   let any_base = activity.clone().into_any_base()?;
142   let kind = activity.kind().context(location_info!())?;
143   let actor_url = actor.actor_id();
144   match kind {
145     UserValidTypes::Accept => {
146       receive_accept(
147         &context,
148         any_base,
149         actor,
150         to_user.expect("user provided"),
151         request_counter,
152       )
153       .await?;
154     }
155     UserValidTypes::Announce => {
156       receive_announce(&context, any_base, actor, request_counter).await?
157     }
158     UserValidTypes::Create => {
159       receive_create(&context, any_base, actor_url, request_counter).await?
160     }
161     UserValidTypes::Update => {
162       receive_update(&context, any_base, actor_url, request_counter).await?
163     }
164     UserValidTypes::Delete => {
165       receive_delete(context, any_base, &actor_url, request_counter).await?
166     }
167     UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
168     UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
169   };
170
171   // TODO: would be logical to move websocket notification code here
172
173   Ok(HttpResponse::Ok().finish())
174 }
175
176 /// Returns true if the activity is addressed directly to one or more local users, or if it is
177 /// addressed to the followers collection of a remote community, and at least one local user follows
178 /// it.
179 async fn is_for_user_inbox(
180   context: &LemmyContext,
181   activity: &UserAcceptedActivities,
182 ) -> Result<(), LemmyError> {
183   let to_and_cc = get_activity_to_and_cc(activity);
184   // Check if it is addressed directly to any local user
185   if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
186     return Ok(());
187   }
188
189   // Check if it is addressed to any followers collection of a remote community, and that the
190   // community has local followers.
191   let community = is_addressed_to_community_followers(&to_and_cc, context.pool()).await?;
192   if let Some(c) = community {
193     let community_id = c.id;
194     let has_local_followers = blocking(&context.pool(), move |conn| {
195       CommunityFollower::has_local_followers(conn, community_id)
196     })
197     .await??;
198     if c.local {
199       return Err(
200         anyhow!("Remote activity cant be addressed to followers of local community").into(),
201       );
202     }
203     if has_local_followers {
204       return Ok(());
205     }
206   }
207
208   Err(anyhow!("Not addressed for any local user").into())
209 }
210
211 /// Handle accepted follows.
212 async fn receive_accept(
213   context: &LemmyContext,
214   activity: AnyBase,
215   actor: &dyn ActorType,
216   user: User_,
217   request_counter: &mut i32,
218 ) -> Result<(), LemmyError> {
219   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
220   verify_activity_domains_valid(&accept, &actor.actor_id(), false)?;
221
222   let object = accept.object().to_owned().one().context(location_info!())?;
223   let follow = Follow::from_any_base(object)?.context(location_info!())?;
224   verify_activity_domains_valid(&follow, &user.actor_id(), false)?;
225
226   let community_uri = accept
227     .actor()?
228     .to_owned()
229     .single_xsd_any_uri()
230     .context(location_info!())?;
231
232   let community =
233     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
234
235   let community_id = community.id;
236   let user_id = user.id;
237   // This will throw an error if no follow was requested
238   blocking(&context.pool(), move |conn| {
239     CommunityFollower::follow_accepted(conn, community_id, user_id)
240   })
241   .await??;
242
243   Ok(())
244 }
245
246 #[derive(EnumString)]
247 enum AnnouncableActivities {
248   Create,
249   Update,
250   Like,
251   Dislike,
252   Delete,
253   Remove,
254   Undo,
255 }
256
257 /// Takes an announce and passes the inner activity to the appropriate handler.
258 pub async fn receive_announce(
259   context: &LemmyContext,
260   activity: AnyBase,
261   actor: &dyn ActorType,
262   request_counter: &mut i32,
263 ) -> Result<(), LemmyError> {
264   let announce = Announce::from_any_base(activity)?.context(location_info!())?;
265   verify_activity_domains_valid(&announce, &actor.actor_id(), false)?;
266   is_addressed_to_public(&announce)?;
267
268   let kind = announce
269     .object()
270     .as_single_kind_str()
271     .and_then(|s| s.parse().ok());
272   let inner_activity = announce
273     .object()
274     .to_owned()
275     .one()
276     .context(location_info!())?;
277
278   let inner_id = inner_activity.id().context(location_info!())?.to_owned();
279   check_is_apub_id_valid(&inner_id)?;
280   if is_activity_already_known(context.pool(), &inner_id).await? {
281     return Ok(());
282   }
283
284   use AnnouncableActivities::*;
285   match kind {
286     Some(Create) => {
287       receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
288     }
289     Some(Update) => {
290       receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
291     }
292     Some(Like) => {
293       receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
294     }
295     Some(Dislike) => {
296       receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
297     }
298     Some(Delete) => receive_delete_for_community(context, inner_activity, &inner_id).await,
299     Some(Remove) => {
300       receive_remove_for_community(context, inner_activity, &inner_id, request_counter).await
301     }
302     Some(Undo) => {
303       receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
304     }
305     _ => receive_unhandled_activity(inner_activity),
306   }
307 }
308
309 async fn receive_create(
310   context: &LemmyContext,
311   activity: AnyBase,
312   expected_domain: Url,
313   request_counter: &mut i32,
314 ) -> Result<(), LemmyError> {
315   let create = Create::from_any_base(activity)?.context(location_info!())?;
316   verify_activity_domains_valid(&create, &expected_domain, true)?;
317   if is_addressed_to_public(&create).is_ok() {
318     receive_create_comment(create, context, request_counter).await
319   } else {
320     receive_create_private_message(&context, create, expected_domain, request_counter).await
321   }
322 }
323
324 async fn receive_update(
325   context: &LemmyContext,
326   activity: AnyBase,
327   expected_domain: Url,
328   request_counter: &mut i32,
329 ) -> Result<(), LemmyError> {
330   let update = Update::from_any_base(activity)?.context(location_info!())?;
331   verify_activity_domains_valid(&update, &expected_domain, true)?;
332   if is_addressed_to_public(&update).is_ok() {
333     receive_update_comment(update, context, request_counter).await
334   } else {
335     receive_update_private_message(&context, update, expected_domain, request_counter).await
336   }
337 }
338
339 async fn receive_delete(
340   context: &LemmyContext,
341   any_base: AnyBase,
342   expected_domain: &Url,
343   request_counter: &mut i32,
344 ) -> Result<(), LemmyError> {
345   use CommunityOrPrivateMessage::*;
346
347   let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
348   verify_activity_domains_valid(&delete, expected_domain, true)?;
349   let object_uri = delete
350     .object()
351     .to_owned()
352     .single_xsd_any_uri()
353     .context(location_info!())?;
354
355   match find_community_or_private_message_by_id(context, object_uri).await? {
356     Community(c) => receive_delete_community(context, c).await,
357     PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
358   }
359 }
360
361 async fn receive_undo(
362   context: &LemmyContext,
363   any_base: AnyBase,
364   expected_domain: &Url,
365   request_counter: &mut i32,
366 ) -> Result<(), LemmyError> {
367   use CommunityOrPrivateMessage::*;
368   let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
369   verify_activity_domains_valid(&undo, expected_domain, true)?;
370
371   let inner_activity = undo.object().to_owned().one().context(location_info!())?;
372   let kind = inner_activity.kind_str();
373   match kind {
374     Some("Delete") => {
375       let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
376       verify_activity_domains_valid(&delete, expected_domain, true)?;
377       let object_uri = delete
378         .object()
379         .to_owned()
380         .single_xsd_any_uri()
381         .context(location_info!())?;
382       match find_community_or_private_message_by_id(context, object_uri).await? {
383         Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
384         PrivateMessage(p) => {
385           receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
386             .await
387         }
388       }
389     }
390     Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
391     _ => receive_unhandled_activity(undo),
392   }
393 }
394 enum CommunityOrPrivateMessage {
395   Community(Community),
396   PrivateMessage(PrivateMessage),
397 }
398
399 async fn find_community_or_private_message_by_id(
400   context: &LemmyContext,
401   apub_id: Url,
402 ) -> Result<CommunityOrPrivateMessage, LemmyError> {
403   let ap_id = apub_id.to_owned();
404   let community = blocking(context.pool(), move |conn| {
405     Community::read_from_apub_id(conn, &ap_id.into())
406   })
407   .await?;
408   if let Ok(c) = community {
409     return Ok(CommunityOrPrivateMessage::Community(c));
410   }
411
412   let ap_id = apub_id.to_owned();
413   let private_message = blocking(context.pool(), move |conn| {
414     PrivateMessage::read_from_apub_id(conn, &ap_id.into())
415   })
416   .await?;
417   if let Ok(p) = private_message {
418     return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
419   }
420
421   Err(NotFound.into())
422 }