]> Untitled Git - lemmy.git/blob - crates/apub/src/inbox/user_inbox.rs
Allow adding remote users as community mods (ref #1061)
[lemmy.git] / crates / apub / src / inbox / user_inbox.rs
1 use crate::{
2   activities::receive::{
3     comment::{receive_create_comment, receive_update_comment},
4     community::{
5       receive_delete_community,
6       receive_remove_community,
7       receive_undo_delete_community,
8       receive_undo_remove_community,
9     },
10     private_message::{
11       receive_create_private_message,
12       receive_delete_private_message,
13       receive_undo_delete_private_message,
14       receive_update_private_message,
15     },
16     receive_unhandled_activity,
17     verify_activity_domains_valid,
18   },
19   check_is_apub_id_valid,
20   fetcher::community::get_or_fetch_and_upsert_community,
21   inbox::{
22     assert_activity_not_local,
23     get_activity_id,
24     get_activity_to_and_cc,
25     inbox_verify_http_signature,
26     is_activity_already_known,
27     is_addressed_to_community_followers,
28     is_addressed_to_local_user,
29     is_addressed_to_public,
30     receive_for_community::{
31       receive_add_for_community,
32       receive_create_for_community,
33       receive_delete_for_community,
34       receive_dislike_for_community,
35       receive_like_for_community,
36       receive_remove_for_community,
37       receive_undo_for_community,
38       receive_update_for_community,
39     },
40   },
41   insert_activity,
42   ActorType,
43 };
44 use activitystreams::{
45   activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Undo, Update},
46   base::AnyBase,
47   prelude::*,
48 };
49 use actix_web::{web, HttpRequest, HttpResponse};
50 use anyhow::{anyhow, Context};
51 use diesel::NotFound;
52 use lemmy_api_structs::blocking;
53 use lemmy_db_queries::{source::user::User, ApubObject, Followable};
54 use lemmy_db_schema::source::{
55   community::{Community, CommunityFollower},
56   private_message::PrivateMessage,
57   user::User_,
58 };
59 use lemmy_utils::{location_info, LemmyError};
60 use lemmy_websocket::LemmyContext;
61 use log::debug;
62 use serde::{Deserialize, Serialize};
63 use std::fmt::Debug;
64 use strum_macros::EnumString;
65 use url::Url;
66
67 /// Allowed activities for user inbox.
68 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
69 #[serde(rename_all = "PascalCase")]
70 pub enum UserValidTypes {
71   Accept,   // community accepted our follow request
72   Create,   // create private message
73   Update,   // edit private message
74   Delete,   // private message or community deleted by creator
75   Undo,     // private message or community restored
76   Remove,   // community removed by admin
77   Announce, // post, comment or vote in community
78 }
79
80 pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
81
82 /// Handler for all incoming activities to user inboxes.
83 pub async fn user_inbox(
84   request: HttpRequest,
85   input: web::Json<UserAcceptedActivities>,
86   path: web::Path<String>,
87   context: web::Data<LemmyContext>,
88 ) -> Result<HttpResponse, LemmyError> {
89   let activity = input.into_inner();
90   // First of all check the http signature
91   let request_counter = &mut 0;
92   let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
93
94   // Do nothing if we received the same activity before
95   let activity_id = get_activity_id(&activity, &actor.actor_id())?;
96   if is_activity_already_known(context.pool(), &activity_id).await? {
97     return Ok(HttpResponse::Ok().finish());
98   }
99
100   // Check if the activity is actually meant for us
101   let username = path.into_inner();
102   let user = blocking(&context.pool(), move |conn| {
103     User_::read_from_name(&conn, &username)
104   })
105   .await??;
106   let to_and_cc = get_activity_to_and_cc(&activity);
107   // TODO: we should also accept activities that are sent to community followers
108   if !to_and_cc.contains(&&user.actor_id()) {
109     return Err(anyhow!("Activity delivered to wrong user").into());
110   }
111
112   assert_activity_not_local(&activity)?;
113   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
114
115   debug!(
116     "User {} received activity {:?} from {}",
117     user.name,
118     &activity.id_unchecked(),
119     &actor.actor_id()
120   );
121
122   user_receive_message(
123     activity.clone(),
124     Some(user.clone()),
125     actor.as_ref(),
126     &context,
127     request_counter,
128   )
129   .await
130 }
131
132 /// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
133 pub(crate) async fn user_receive_message(
134   activity: UserAcceptedActivities,
135   to_user: Option<User_>,
136   actor: &dyn ActorType,
137   context: &LemmyContext,
138   request_counter: &mut i32,
139 ) -> Result<HttpResponse, LemmyError> {
140   is_for_user_inbox(context, &activity).await?;
141
142   let any_base = activity.clone().into_any_base()?;
143   let kind = activity.kind().context(location_info!())?;
144   let actor_url = actor.actor_id();
145   match kind {
146     UserValidTypes::Accept => {
147       receive_accept(
148         &context,
149         any_base,
150         actor,
151         to_user.expect("user provided"),
152         request_counter,
153       )
154       .await?;
155     }
156     UserValidTypes::Announce => {
157       receive_announce(&context, any_base, actor, request_counter).await?
158     }
159     UserValidTypes::Create => {
160       receive_create(&context, any_base, actor_url, request_counter).await?
161     }
162     UserValidTypes::Update => {
163       receive_update(&context, any_base, actor_url, request_counter).await?
164     }
165     UserValidTypes::Delete => {
166       receive_delete(context, any_base, &actor_url, request_counter).await?
167     }
168     UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
169     UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
170   };
171
172   // TODO: would be logical to move websocket notification code here
173
174   Ok(HttpResponse::Ok().finish())
175 }
176
177 /// Returns true if the activity is addressed directly to one or more local users, or if it is
178 /// addressed to the followers collection of a remote community, and at least one local user follows
179 /// it.
180 async fn is_for_user_inbox(
181   context: &LemmyContext,
182   activity: &UserAcceptedActivities,
183 ) -> Result<(), LemmyError> {
184   let to_and_cc = get_activity_to_and_cc(activity);
185   // Check if it is addressed directly to any local user
186   if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
187     return Ok(());
188   }
189
190   // Check if it is addressed to any followers collection of a remote community, and that the
191   // community has local followers.
192   let community = is_addressed_to_community_followers(&to_and_cc, context.pool()).await?;
193   if let Some(c) = community {
194     let community_id = c.id;
195     let has_local_followers = blocking(&context.pool(), move |conn| {
196       CommunityFollower::has_local_followers(conn, community_id)
197     })
198     .await??;
199     if c.local {
200       return Err(
201         anyhow!("Remote activity cant be addressed to followers of local community").into(),
202       );
203     }
204     if has_local_followers {
205       return Ok(());
206     }
207   }
208
209   Err(anyhow!("Not addressed for any local user").into())
210 }
211
212 /// Handle accepted follows.
213 async fn receive_accept(
214   context: &LemmyContext,
215   activity: AnyBase,
216   actor: &dyn ActorType,
217   user: User_,
218   request_counter: &mut i32,
219 ) -> Result<(), LemmyError> {
220   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
221   verify_activity_domains_valid(&accept, &actor.actor_id(), false)?;
222
223   let object = accept.object().to_owned().one().context(location_info!())?;
224   let follow = Follow::from_any_base(object)?.context(location_info!())?;
225   verify_activity_domains_valid(&follow, &user.actor_id(), false)?;
226
227   let community_uri = accept
228     .actor()?
229     .to_owned()
230     .single_xsd_any_uri()
231     .context(location_info!())?;
232
233   let community =
234     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
235
236   let community_id = community.id;
237   let user_id = user.id;
238   // This will throw an error if no follow was requested
239   blocking(&context.pool(), move |conn| {
240     CommunityFollower::follow_accepted(conn, community_id, user_id)
241   })
242   .await??;
243
244   Ok(())
245 }
246
247 #[derive(EnumString)]
248 enum AnnouncableActivities {
249   Create,
250   Update,
251   Like,
252   Dislike,
253   Delete,
254   Remove,
255   Undo,
256   Add,
257 }
258
259 /// Takes an announce and passes the inner activity to the appropriate handler.
260 pub async fn receive_announce(
261   context: &LemmyContext,
262   activity: AnyBase,
263   actor: &dyn ActorType,
264   request_counter: &mut i32,
265 ) -> Result<(), LemmyError> {
266   let announce = Announce::from_any_base(activity)?.context(location_info!())?;
267   verify_activity_domains_valid(&announce, &actor.actor_id(), false)?;
268   is_addressed_to_public(&announce)?;
269
270   let kind = announce
271     .object()
272     .as_single_kind_str()
273     .and_then(|s| s.parse().ok());
274   let inner_activity = announce
275     .object()
276     .to_owned()
277     .one()
278     .context(location_info!())?;
279
280   let inner_id = inner_activity.id().context(location_info!())?.to_owned();
281   check_is_apub_id_valid(&inner_id)?;
282   if is_activity_already_known(context.pool(), &inner_id).await? {
283     return Ok(());
284   }
285
286   use AnnouncableActivities::*;
287   match kind {
288     Some(Create) => {
289       receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
290     }
291     Some(Update) => {
292       receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
293     }
294     Some(Like) => {
295       receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
296     }
297     Some(Dislike) => {
298       receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
299     }
300     Some(Delete) => receive_delete_for_community(context, inner_activity, &inner_id).await,
301     Some(Remove) => {
302       receive_remove_for_community(context, inner_activity, &inner_id, request_counter).await
303     }
304     Some(Undo) => {
305       receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
306     }
307     Some(Add) => {
308       receive_add_for_community(context, inner_activity, &inner_id, request_counter).await
309     }
310     _ => receive_unhandled_activity(inner_activity),
311   }
312 }
313
314 async fn receive_create(
315   context: &LemmyContext,
316   activity: AnyBase,
317   expected_domain: Url,
318   request_counter: &mut i32,
319 ) -> Result<(), LemmyError> {
320   let create = Create::from_any_base(activity)?.context(location_info!())?;
321   verify_activity_domains_valid(&create, &expected_domain, true)?;
322   if is_addressed_to_public(&create).is_ok() {
323     receive_create_comment(create, context, request_counter).await
324   } else {
325     receive_create_private_message(&context, create, expected_domain, request_counter).await
326   }
327 }
328
329 async fn receive_update(
330   context: &LemmyContext,
331   activity: AnyBase,
332   expected_domain: Url,
333   request_counter: &mut i32,
334 ) -> Result<(), LemmyError> {
335   let update = Update::from_any_base(activity)?.context(location_info!())?;
336   verify_activity_domains_valid(&update, &expected_domain, true)?;
337   if is_addressed_to_public(&update).is_ok() {
338     receive_update_comment(update, context, request_counter).await
339   } else {
340     receive_update_private_message(&context, update, expected_domain, request_counter).await
341   }
342 }
343
344 async fn receive_delete(
345   context: &LemmyContext,
346   any_base: AnyBase,
347   expected_domain: &Url,
348   request_counter: &mut i32,
349 ) -> Result<(), LemmyError> {
350   use CommunityOrPrivateMessage::*;
351
352   let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
353   verify_activity_domains_valid(&delete, expected_domain, true)?;
354   let object_uri = delete
355     .object()
356     .to_owned()
357     .single_xsd_any_uri()
358     .context(location_info!())?;
359
360   match find_community_or_private_message_by_id(context, object_uri).await? {
361     Community(c) => receive_delete_community(context, c).await,
362     PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
363   }
364 }
365
366 async fn receive_undo(
367   context: &LemmyContext,
368   any_base: AnyBase,
369   expected_domain: &Url,
370   request_counter: &mut i32,
371 ) -> Result<(), LemmyError> {
372   use CommunityOrPrivateMessage::*;
373   let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
374   verify_activity_domains_valid(&undo, expected_domain, true)?;
375
376   let inner_activity = undo.object().to_owned().one().context(location_info!())?;
377   let kind = inner_activity.kind_str();
378   match kind {
379     Some("Delete") => {
380       let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
381       verify_activity_domains_valid(&delete, expected_domain, true)?;
382       let object_uri = delete
383         .object()
384         .to_owned()
385         .single_xsd_any_uri()
386         .context(location_info!())?;
387       match find_community_or_private_message_by_id(context, object_uri).await? {
388         Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
389         PrivateMessage(p) => {
390           receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
391             .await
392         }
393       }
394     }
395     Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
396     _ => receive_unhandled_activity(undo),
397   }
398 }
399 enum CommunityOrPrivateMessage {
400   Community(Community),
401   PrivateMessage(PrivateMessage),
402 }
403
404 async fn find_community_or_private_message_by_id(
405   context: &LemmyContext,
406   apub_id: Url,
407 ) -> Result<CommunityOrPrivateMessage, LemmyError> {
408   let ap_id = apub_id.to_owned();
409   let community = blocking(context.pool(), move |conn| {
410     Community::read_from_apub_id(conn, &ap_id.into())
411   })
412   .await?;
413   if let Ok(c) = community {
414     return Ok(CommunityOrPrivateMessage::Community(c));
415   }
416
417   let ap_id = apub_id.to_owned();
418   let private_message = blocking(context.pool(), move |conn| {
419     PrivateMessage::read_from_apub_id(conn, &ap_id.into())
420   })
421   .await?;
422   if let Ok(p) = private_message {
423     return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
424   }
425
426   Err(NotFound.into())
427 }