Untitled Git - lemmy.git/blob - crates/apub/src/inbox/user_inbox.rs
Rename `lemmy_structs` to `lemmy_api_structs`
[lemmy.git] / crates / apub / src / inbox / user_inbox.rs
1 use crate::{
2   activities::receive::{
3     comment::{receive_create_comment, receive_update_comment},
4     community::{
5       receive_delete_community,
6       receive_remove_community,
7       receive_undo_delete_community,
8       receive_undo_remove_community,
9     },
10     private_message::{
11       receive_create_private_message,
12       receive_delete_private_message,
13       receive_undo_delete_private_message,
14       receive_update_private_message,
15     },
16     receive_unhandled_activity,
17     verify_activity_domains_valid,
18   },
19   check_is_apub_id_valid,
20   fetcher::community::get_or_fetch_and_upsert_community,
21   inbox::{
22     assert_activity_not_local,
23     get_activity_id,
24     get_activity_to_and_cc,
25     inbox_verify_http_signature,
26     is_activity_already_known,
27     is_addressed_to_community_followers,
28     is_addressed_to_local_user,
29     is_addressed_to_public,
30     receive_for_community::{
31       receive_create_for_community,
32       receive_delete_for_community,
33       receive_dislike_for_community,
34       receive_like_for_community,
35       receive_remove_for_community,
36       receive_undo_for_community,
37       receive_update_for_community,
38     },
39   },
40   insert_activity,
41   ActorType,
42 };
43 use activitystreams::{
44   activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Undo, Update},
45   base::AnyBase,
46   prelude::*,
47 };
48 use actix_web::{web, HttpRequest, HttpResponse};
49 use anyhow::{anyhow, Context};
50 use diesel::NotFound;
51 use lemmy_api_structs::blocking;
52 use lemmy_db_queries::{source::user::User, ApubObject, Followable};
53 use lemmy_db_schema::source::{
54   community::{Community, CommunityFollower},
55   private_message::PrivateMessage,
56   user::User_,
57 };
58 use lemmy_utils::{location_info, LemmyError};
59 use lemmy_websocket::LemmyContext;
60 use log::debug;
61 use serde::{Deserialize, Serialize};
62 use std::fmt::Debug;
63 use strum_macros::EnumString;
64 use url::Url;
65
66 /// Allowed activities for user inbox.
67 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
68 #[serde(rename_all = "PascalCase")]
69 pub enum UserValidTypes {
70   Accept,   // community accepted our follow request
71   Create,   // create private message
72   Update,   // edit private message
73   Delete,   // private message or community deleted by creator
74   Undo,     // private message or community restored
75   Remove,   // community removed by admin
76   Announce, // post, comment or vote in community
77 }
78
79 pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
80
81 /// Handler for all incoming activities to user inboxes.
82 pub async fn user_inbox(
83   request: HttpRequest,
84   input: web::Json<UserAcceptedActivities>,
85   path: web::Path<String>,
86   context: web::Data<LemmyContext>,
87 ) -> Result<HttpResponse, LemmyError> {
88   let activity = input.into_inner();
89   // First of all check the http signature
90   let request_counter = &mut 0;
91   let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
92
93   // Do nothing if we received the same activity before
94   let activity_id = get_activity_id(&activity, &actor.actor_id())?;
95   if is_activity_already_known(context.pool(), &activity_id).await? {
96     return Ok(HttpResponse::Ok().finish());
97   }
98
99   // Check if the activity is actually meant for us
100   let username = path.into_inner();
101   let user = blocking(&context.pool(), move |conn| {
102     User_::read_from_name(&conn, &username)
103   })
104   .await??;
105   let to_and_cc = get_activity_to_and_cc(&activity);
106   // TODO: we should also accept activities that are sent to community followers
107   if !to_and_cc.contains(&&user.actor_id()) {
108     return Err(anyhow!("Activity delivered to wrong user").into());
109   }
110
111   assert_activity_not_local(&activity)?;
112   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
113
114   debug!(
115     "User {} received activity {:?} from {}",
116     user.name,
117     &activity.id_unchecked(),
118     &actor.actor_id()
119   );
120
121   user_receive_message(
122     activity.clone(),
123     Some(user.clone()),
124     actor.as_ref(),
125     &context,
126     request_counter,
127   )
128   .await
129 }
130
131 /// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
132 pub(crate) async fn user_receive_message(
133   activity: UserAcceptedActivities,
134   to_user: Option<User_>,
135   actor: &dyn ActorType,
136   context: &LemmyContext,
137   request_counter: &mut i32,
138 ) -> Result<HttpResponse, LemmyError> {
139   is_for_user_inbox(context, &activity).await?;
140
141   let any_base = activity.clone().into_any_base()?;
142   let kind = activity.kind().context(location_info!())?;
143   let actor_url = actor.actor_id();
144   match kind {
145     UserValidTypes::Accept => {
146       receive_accept(&context, any_base, actor, to_user.unwrap(), request_counter).await?;
147     }
148     UserValidTypes::Announce => {
149       receive_announce(&context, any_base, actor, request_counter).await?
150     }
151     UserValidTypes::Create => {
152       receive_create(&context, any_base, actor_url, request_counter).await?
153     }
154     UserValidTypes::Update => {
155       receive_update(&context, any_base, actor_url, request_counter).await?
156     }
157     UserValidTypes::Delete => {
158       receive_delete(context, any_base, &actor_url, request_counter).await?
159     }
160     UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
161     UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
162   };
163
164   // TODO: would be logical to move websocket notification code here
165
166   Ok(HttpResponse::Ok().finish())
167 }
168
169 /// Returns true if the activity is addressed directly to one or more local users, or if it is
170 /// addressed to the followers collection of a remote community, and at least one local user follows
171 /// it.
172 async fn is_for_user_inbox(
173   context: &LemmyContext,
174   activity: &UserAcceptedActivities,
175 ) -> Result<(), LemmyError> {
176   let to_and_cc = get_activity_to_and_cc(activity);
177   // Check if it is addressed directly to any local user
178   if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
179     return Ok(());
180   }
181
182   // Check if it is addressed to any followers collection of a remote community, and that the
183   // community has local followers.
184   let community = is_addressed_to_community_followers(&to_and_cc, context.pool()).await?;
185   if let Some(c) = community {
186     let community_id = c.id;
187     let has_local_followers = blocking(&context.pool(), move |conn| {
188       CommunityFollower::has_local_followers(conn, community_id)
189     })
190     .await??;
191     if c.local {
192       return Err(
193         anyhow!("Remote activity cant be addressed to followers of local community").into(),
194       );
195     }
196     if has_local_followers {
197       return Ok(());
198     }
199   }
200
201   Err(anyhow!("Not addressed for any local user").into())
202 }
203
204 /// Handle accepted follows.
205 async fn receive_accept(
206   context: &LemmyContext,
207   activity: AnyBase,
208   actor: &dyn ActorType,
209   user: User_,
210   request_counter: &mut i32,
211 ) -> Result<(), LemmyError> {
212   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
213   verify_activity_domains_valid(&accept, &actor.actor_id(), false)?;
214
215   let object = accept.object().to_owned().one().context(location_info!())?;
216   let follow = Follow::from_any_base(object)?.context(location_info!())?;
217   verify_activity_domains_valid(&follow, &user.actor_id(), false)?;
218
219   let community_uri = accept
220     .actor()?
221     .to_owned()
222     .single_xsd_any_uri()
223     .context(location_info!())?;
224
225   let community =
226     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
227
228   let community_id = community.id;
229   let user_id = user.id;
230   // This will throw an error if no follow was requested
231   blocking(&context.pool(), move |conn| {
232     CommunityFollower::follow_accepted(conn, community_id, user_id)
233   })
234   .await??;
235
236   Ok(())
237 }
238
239 #[derive(EnumString)]
240 enum AnnouncableActivities {
241   Create,
242   Update,
243   Like,
244   Dislike,
245   Delete,
246   Remove,
247   Undo,
248 }
249
250 /// Takes an announce and passes the inner activity to the appropriate handler.
251 pub async fn receive_announce(
252   context: &LemmyContext,
253   activity: AnyBase,
254   actor: &dyn ActorType,
255   request_counter: &mut i32,
256 ) -> Result<(), LemmyError> {
257   let announce = Announce::from_any_base(activity)?.context(location_info!())?;
258   verify_activity_domains_valid(&announce, &actor.actor_id(), false)?;
259   is_addressed_to_public(&announce)?;
260
261   let kind = announce
262     .object()
263     .as_single_kind_str()
264     .and_then(|s| s.parse().ok());
265   let inner_activity = announce
266     .object()
267     .to_owned()
268     .one()
269     .context(location_info!())?;
270
271   let inner_id = inner_activity.id().context(location_info!())?.to_owned();
272   check_is_apub_id_valid(&inner_id)?;
273   if is_activity_already_known(context.pool(), &inner_id).await? {
274     return Ok(());
275   }
276
277   use AnnouncableActivities::*;
278   match kind {
279     Some(Create) => {
280       receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
281     }
282     Some(Update) => {
283       receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
284     }
285     Some(Like) => {
286       receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
287     }
288     Some(Dislike) => {
289       receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
290     }
291     Some(Delete) => receive_delete_for_community(context, inner_activity, &inner_id).await,
292     Some(Remove) => receive_remove_for_community(context, inner_activity, &inner_id).await,
293     Some(Undo) => {
294       receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
295     }
296     _ => receive_unhandled_activity(inner_activity),
297   }
298 }
299
300 async fn receive_create(
301   context: &LemmyContext,
302   activity: AnyBase,
303   expected_domain: Url,
304   request_counter: &mut i32,
305 ) -> Result<(), LemmyError> {
306   let create = Create::from_any_base(activity)?.context(location_info!())?;
307   verify_activity_domains_valid(&create, &expected_domain, true)?;
308   if is_addressed_to_public(&create).is_ok() {
309     receive_create_comment(create, context, request_counter).await
310   } else {
311     receive_create_private_message(&context, create, expected_domain, request_counter).await
312   }
313 }
314
315 async fn receive_update(
316   context: &LemmyContext,
317   activity: AnyBase,
318   expected_domain: Url,
319   request_counter: &mut i32,
320 ) -> Result<(), LemmyError> {
321   let update = Update::from_any_base(activity)?.context(location_info!())?;
322   verify_activity_domains_valid(&update, &expected_domain, true)?;
323   if is_addressed_to_public(&update).is_ok() {
324     receive_update_comment(update, context, request_counter).await
325   } else {
326     receive_update_private_message(&context, update, expected_domain, request_counter).await
327   }
328 }
329
330 async fn receive_delete(
331   context: &LemmyContext,
332   any_base: AnyBase,
333   expected_domain: &Url,
334   request_counter: &mut i32,
335 ) -> Result<(), LemmyError> {
336   use CommunityOrPrivateMessage::*;
337
338   let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
339   verify_activity_domains_valid(&delete, expected_domain, true)?;
340   let object_uri = delete
341     .object()
342     .to_owned()
343     .single_xsd_any_uri()
344     .context(location_info!())?;
345
346   match find_community_or_private_message_by_id(context, object_uri).await? {
347     Community(c) => receive_delete_community(context, c).await,
348     PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
349   }
350 }
351
352 async fn receive_undo(
353   context: &LemmyContext,
354   any_base: AnyBase,
355   expected_domain: &Url,
356   request_counter: &mut i32,
357 ) -> Result<(), LemmyError> {
358   use CommunityOrPrivateMessage::*;
359   let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
360   verify_activity_domains_valid(&undo, expected_domain, true)?;
361
362   let inner_activity = undo.object().to_owned().one().context(location_info!())?;
363   let kind = inner_activity.kind_str();
364   match kind {
365     Some("Delete") => {
366       let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
367       verify_activity_domains_valid(&delete, expected_domain, true)?;
368       let object_uri = delete
369         .object()
370         .to_owned()
371         .single_xsd_any_uri()
372         .context(location_info!())?;
373       match find_community_or_private_message_by_id(context, object_uri).await? {
374         Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
375         PrivateMessage(p) => {
376           receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
377             .await
378         }
379       }
380     }
381     Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
382     _ => receive_unhandled_activity(undo),
383   }
384 }
385 enum CommunityOrPrivateMessage {
386   Community(Community),
387   PrivateMessage(PrivateMessage),
388 }
389
390 async fn find_community_or_private_message_by_id(
391   context: &LemmyContext,
392   apub_id: Url,
393 ) -> Result<CommunityOrPrivateMessage, LemmyError> {
394   let ap_id = apub_id.to_owned();
395   let community = blocking(context.pool(), move |conn| {
396     Community::read_from_apub_id(conn, &ap_id.into())
397   })
398   .await?;
399   if let Ok(c) = community {
400     return Ok(CommunityOrPrivateMessage::Community(c));
401   }
402
403   let ap_id = apub_id.to_owned();
404   let private_message = blocking(context.pool(), move |conn| {
405     PrivateMessage::read_from_apub_id(conn, &ap_id.into())
406   })
407   .await?;
408   if let Ok(p) = private_message {
409     return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
410   }
411
412   Err(NotFound.into())
413 }