]> Untitled Git - lemmy.git/blob - crates/apub/src/inbox/user_inbox.rs
Move most code into crates/ subfolder
[lemmy.git] / crates / apub / src / inbox / user_inbox.rs
1 use crate::{
2   activities::receive::{
3     comment::{receive_create_comment, receive_update_comment},
4     community::{
5       receive_delete_community,
6       receive_remove_community,
7       receive_undo_delete_community,
8       receive_undo_remove_community,
9     },
10     private_message::{
11       receive_create_private_message,
12       receive_delete_private_message,
13       receive_undo_delete_private_message,
14       receive_update_private_message,
15     },
16     receive_unhandled_activity,
17     verify_activity_domains_valid,
18   },
19   check_is_apub_id_valid,
20   fetcher::community::get_or_fetch_and_upsert_community,
21   inbox::{
22     assert_activity_not_local,
23     get_activity_id,
24     get_activity_to_and_cc,
25     inbox_verify_http_signature,
26     is_activity_already_known,
27     is_addressed_to_community_followers,
28     is_addressed_to_local_user,
29     is_addressed_to_public,
30     receive_for_community::{
31       receive_create_for_community,
32       receive_delete_for_community,
33       receive_dislike_for_community,
34       receive_like_for_community,
35       receive_remove_for_community,
36       receive_undo_for_community,
37       receive_update_for_community,
38     },
39   },
40   insert_activity,
41   ActorType,
42 };
43 use activitystreams::{
44   activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Undo, Update},
45   base::AnyBase,
46   prelude::*,
47 };
48 use actix_web::{web, HttpRequest, HttpResponse};
49 use anyhow::{anyhow, Context};
50 use diesel::NotFound;
51 use lemmy_db_queries::{source::user::User, ApubObject, Followable};
52 use lemmy_db_schema::source::{
53   community::{Community, CommunityFollower},
54   private_message::PrivateMessage,
55   user::User_,
56 };
57 use lemmy_structs::blocking;
58 use lemmy_utils::{location_info, LemmyError};
59 use lemmy_websocket::LemmyContext;
60 use log::debug;
61 use serde::{Deserialize, Serialize};
62 use std::fmt::Debug;
63 use url::Url;
64
65 /// Allowed activities for user inbox.
66 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
67 #[serde(rename_all = "PascalCase")]
68 pub enum UserValidTypes {
69   Accept,   // community accepted our follow request
70   Create,   // create private message
71   Update,   // edit private message
72   Delete,   // private message or community deleted by creator
73   Undo,     // private message or community restored
74   Remove,   // community removed by admin
75   Announce, // post, comment or vote in community
76 }
77
78 pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
79
80 /// Handler for all incoming activities to user inboxes.
81 pub async fn user_inbox(
82   request: HttpRequest,
83   input: web::Json<UserAcceptedActivities>,
84   path: web::Path<String>,
85   context: web::Data<LemmyContext>,
86 ) -> Result<HttpResponse, LemmyError> {
87   let activity = input.into_inner();
88   // First of all check the http signature
89   let request_counter = &mut 0;
90   let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
91
92   // Do nothing if we received the same activity before
93   let activity_id = get_activity_id(&activity, &actor.actor_id()?)?;
94   if is_activity_already_known(context.pool(), &activity_id).await? {
95     return Ok(HttpResponse::Ok().finish());
96   }
97
98   // Check if the activity is actually meant for us
99   let username = path.into_inner();
100   let user = blocking(&context.pool(), move |conn| {
101     User_::read_from_name(&conn, &username)
102   })
103   .await??;
104   let to_and_cc = get_activity_to_and_cc(&activity);
105   // TODO: we should also accept activities that are sent to community followers
106   if !to_and_cc.contains(&&user.actor_id()?) {
107     return Err(anyhow!("Activity delivered to wrong user").into());
108   }
109
110   assert_activity_not_local(&activity)?;
111   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
112
113   debug!(
114     "User {} received activity {:?} from {}",
115     user.name,
116     &activity.id_unchecked(),
117     &actor.actor_id_str()
118   );
119
120   user_receive_message(
121     activity.clone(),
122     Some(user.clone()),
123     actor.as_ref(),
124     &context,
125     request_counter,
126   )
127   .await
128 }
129
130 /// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
131 pub(crate) async fn user_receive_message(
132   activity: UserAcceptedActivities,
133   to_user: Option<User_>,
134   actor: &dyn ActorType,
135   context: &LemmyContext,
136   request_counter: &mut i32,
137 ) -> Result<HttpResponse, LemmyError> {
138   is_for_user_inbox(context, &activity).await?;
139
140   let any_base = activity.clone().into_any_base()?;
141   let kind = activity.kind().context(location_info!())?;
142   let actor_url = actor.actor_id()?;
143   match kind {
144     UserValidTypes::Accept => {
145       receive_accept(&context, any_base, actor, to_user.unwrap(), request_counter).await?;
146     }
147     UserValidTypes::Announce => {
148       receive_announce(&context, any_base, actor, request_counter).await?
149     }
150     UserValidTypes::Create => {
151       receive_create(&context, any_base, actor_url, request_counter).await?
152     }
153     UserValidTypes::Update => {
154       receive_update(&context, any_base, actor_url, request_counter).await?
155     }
156     UserValidTypes::Delete => {
157       receive_delete(context, any_base, &actor_url, request_counter).await?
158     }
159     UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
160     UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
161   };
162
163   // TODO: would be logical to move websocket notification code here
164
165   Ok(HttpResponse::Ok().finish())
166 }
167
168 /// Returns true if the activity is addressed directly to one or more local users, or if it is
169 /// addressed to the followers collection of a remote community, and at least one local user follows
170 /// it.
171 async fn is_for_user_inbox(
172   context: &LemmyContext,
173   activity: &UserAcceptedActivities,
174 ) -> Result<(), LemmyError> {
175   let to_and_cc = get_activity_to_and_cc(activity);
176   // Check if it is addressed directly to any local user
177   if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
178     return Ok(());
179   }
180
181   // Check if it is addressed to any followers collection of a remote community, and that the
182   // community has local followers.
183   let community = is_addressed_to_community_followers(&to_and_cc, context.pool()).await?;
184   if let Some(c) = community {
185     let community_id = c.id;
186     let has_local_followers = blocking(&context.pool(), move |conn| {
187       CommunityFollower::has_local_followers(conn, community_id)
188     })
189     .await??;
190     if c.local {
191       return Err(
192         anyhow!("Remote activity cant be addressed to followers of local community").into(),
193       );
194     }
195     if has_local_followers {
196       return Ok(());
197     }
198   }
199
200   Err(anyhow!("Not addressed for any local user").into())
201 }
202
203 /// Handle accepted follows.
204 async fn receive_accept(
205   context: &LemmyContext,
206   activity: AnyBase,
207   actor: &dyn ActorType,
208   user: User_,
209   request_counter: &mut i32,
210 ) -> Result<(), LemmyError> {
211   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
212   verify_activity_domains_valid(&accept, &actor.actor_id()?, false)?;
213
214   let object = accept.object().to_owned().one().context(location_info!())?;
215   let follow = Follow::from_any_base(object)?.context(location_info!())?;
216   verify_activity_domains_valid(&follow, &user.actor_id()?, false)?;
217
218   let community_uri = accept
219     .actor()?
220     .to_owned()
221     .single_xsd_any_uri()
222     .context(location_info!())?;
223
224   let community =
225     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
226
227   let community_id = community.id;
228   let user_id = user.id;
229   // This will throw an error if no follow was requested
230   blocking(&context.pool(), move |conn| {
231     CommunityFollower::follow_accepted(conn, community_id, user_id)
232   })
233   .await??;
234
235   Ok(())
236 }
237
238 /// Takes an announce and passes the inner activity to the appropriate handler.
239 async fn receive_announce(
240   context: &LemmyContext,
241   activity: AnyBase,
242   actor: &dyn ActorType,
243   request_counter: &mut i32,
244 ) -> Result<(), LemmyError> {
245   let announce = Announce::from_any_base(activity)?.context(location_info!())?;
246   verify_activity_domains_valid(&announce, &actor.actor_id()?, false)?;
247   is_addressed_to_public(&announce)?;
248
249   let kind = announce.object().as_single_kind_str();
250   let inner_activity = announce
251     .object()
252     .to_owned()
253     .one()
254     .context(location_info!())?;
255
256   let inner_id = inner_activity.id().context(location_info!())?.to_owned();
257   check_is_apub_id_valid(&inner_id)?;
258   if is_activity_already_known(context.pool(), &inner_id).await? {
259     return Ok(());
260   }
261
262   match kind {
263     Some("Create") => {
264       receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
265     }
266     Some("Update") => {
267       receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
268     }
269     Some("Like") => {
270       receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
271     }
272     Some("Dislike") => {
273       receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
274     }
275     Some("Delete") => receive_delete_for_community(context, inner_activity, &inner_id).await,
276     Some("Remove") => receive_remove_for_community(context, inner_activity, &inner_id).await,
277     Some("Undo") => {
278       receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
279     }
280     _ => receive_unhandled_activity(inner_activity),
281   }
282 }
283
284 async fn receive_create(
285   context: &LemmyContext,
286   activity: AnyBase,
287   expected_domain: Url,
288   request_counter: &mut i32,
289 ) -> Result<(), LemmyError> {
290   let create = Create::from_any_base(activity)?.context(location_info!())?;
291   verify_activity_domains_valid(&create, &expected_domain, true)?;
292   if is_addressed_to_public(&create).is_ok() {
293     receive_create_comment(create, context, request_counter).await
294   } else {
295     receive_create_private_message(&context, create, expected_domain, request_counter).await
296   }
297 }
298
299 async fn receive_update(
300   context: &LemmyContext,
301   activity: AnyBase,
302   expected_domain: Url,
303   request_counter: &mut i32,
304 ) -> Result<(), LemmyError> {
305   let update = Update::from_any_base(activity)?.context(location_info!())?;
306   verify_activity_domains_valid(&update, &expected_domain, true)?;
307   if is_addressed_to_public(&update).is_ok() {
308     receive_update_comment(update, context, request_counter).await
309   } else {
310     receive_update_private_message(&context, update, expected_domain, request_counter).await
311   }
312 }
313
314 async fn receive_delete(
315   context: &LemmyContext,
316   any_base: AnyBase,
317   expected_domain: &Url,
318   request_counter: &mut i32,
319 ) -> Result<(), LemmyError> {
320   use CommunityOrPrivateMessage::*;
321
322   let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
323   verify_activity_domains_valid(&delete, expected_domain, true)?;
324   let object_uri = delete
325     .object()
326     .to_owned()
327     .single_xsd_any_uri()
328     .context(location_info!())?;
329
330   match find_community_or_private_message_by_id(context, object_uri).await? {
331     Community(c) => receive_delete_community(context, c).await,
332     PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
333   }
334 }
335
336 async fn receive_undo(
337   context: &LemmyContext,
338   any_base: AnyBase,
339   expected_domain: &Url,
340   request_counter: &mut i32,
341 ) -> Result<(), LemmyError> {
342   use CommunityOrPrivateMessage::*;
343   let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
344   verify_activity_domains_valid(&undo, expected_domain, true)?;
345
346   let inner_activity = undo.object().to_owned().one().context(location_info!())?;
347   let kind = inner_activity.kind_str();
348   match kind {
349     Some("Delete") => {
350       let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
351       verify_activity_domains_valid(&delete, expected_domain, true)?;
352       let object_uri = delete
353         .object()
354         .to_owned()
355         .single_xsd_any_uri()
356         .context(location_info!())?;
357       match find_community_or_private_message_by_id(context, object_uri).await? {
358         Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
359         PrivateMessage(p) => {
360           receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
361             .await
362         }
363       }
364     }
365     Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
366     _ => receive_unhandled_activity(undo),
367   }
368 }
369 enum CommunityOrPrivateMessage {
370   Community(Community),
371   PrivateMessage(PrivateMessage),
372 }
373
374 async fn find_community_or_private_message_by_id(
375   context: &LemmyContext,
376   apub_id: Url,
377 ) -> Result<CommunityOrPrivateMessage, LemmyError> {
378   let ap_id = apub_id.to_string();
379   let community = blocking(context.pool(), move |conn| {
380     Community::read_from_apub_id(conn, &ap_id)
381   })
382   .await?;
383   if let Ok(c) = community {
384     return Ok(CommunityOrPrivateMessage::Community(c));
385   }
386
387   let ap_id = apub_id.to_string();
388   let private_message = blocking(context.pool(), move |conn| {
389     PrivateMessage::read_from_apub_id(conn, &ap_id)
390   })
391   .await?;
392   if let Ok(p) = private_message {
393     return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
394   }
395
396   Err(NotFound.into())
397 }