]> Untitled Git - lemmy.git/blob - lemmy_apub/src/inbox/user_inbox.rs
653a447c5489dec347f57524ecd8489a18ed1b29
[lemmy.git] / lemmy_apub / src / inbox / user_inbox.rs
1 use crate::{
2   activities::receive::verify_activity_domains_valid,
3   check_is_apub_id_valid,
4   extensions::signatures::verify_signature,
5   fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
6   inbox::{get_activity_id, is_activity_already_known},
7   insert_activity,
8   ActorType,
9   FromApub,
10 };
11 use activitystreams::{
12   activity::{Accept, ActorAndObject, Create, Delete, Follow, Undo, Update},
13   base::AnyBase,
14   object::Note,
15   prelude::*,
16 };
17 use actix_web::{web, HttpRequest, HttpResponse};
18 use anyhow::{anyhow, Context};
19 use lemmy_db::{
20   community::{CommunityFollower, CommunityFollowerForm},
21   private_message::{PrivateMessage, PrivateMessageForm},
22   private_message_view::PrivateMessageView,
23   user::User_,
24   Crud,
25   Followable,
26 };
27 use lemmy_structs::{blocking, user::PrivateMessageResponse};
28 use lemmy_utils::{location_info, LemmyError};
29 use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
30 use log::debug;
31 use serde::{Deserialize, Serialize};
32 use std::fmt::Debug;
33
34 /// Allowed activities for user inbox.
35 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
36 #[serde(rename_all = "PascalCase")]
37 pub enum ValidTypes {
38   Accept,
39   Create,
40   Update,
41   Delete,
42   Undo,
43 }
44
45 pub type AcceptedActivities = ActorAndObject<ValidTypes>;
46
47 /// Handler for all incoming activities to user inboxes.
48 pub async fn user_inbox(
49   request: HttpRequest,
50   input: web::Json<AcceptedActivities>,
51   path: web::Path<String>,
52   context: web::Data<LemmyContext>,
53 ) -> Result<HttpResponse, LemmyError> {
54   let activity = input.into_inner();
55   let username = path.into_inner();
56   let user = blocking(&context.pool(), move |conn| {
57     User_::read_from_name(&conn, &username)
58   })
59   .await??;
60
61   let to = activity
62     .to()
63     .context(location_info!())?
64     .to_owned()
65     .single_xsd_any_uri();
66   if Some(user.actor_id()?) != to {
67     return Err(anyhow!("Activity delivered to wrong user").into());
68   }
69
70   let actor_uri = activity
71     .actor()?
72     .as_single_xsd_any_uri()
73     .context(location_info!())?;
74   debug!(
75     "User {} inbox received activity {:?} from {}",
76     user.name,
77     &activity.id_unchecked(),
78     &actor_uri
79   );
80
81   check_is_apub_id_valid(actor_uri)?;
82
83   let request_counter = &mut 0;
84   let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
85   verify_signature(&request, actor.as_ref())?;
86
87   let activity_id = get_activity_id(&activity, actor_uri)?;
88   if is_activity_already_known(context.pool(), &activity_id).await? {
89     return Ok(HttpResponse::Ok().finish());
90   }
91
92   let any_base = activity.clone().into_any_base()?;
93   let kind = activity.kind().context(location_info!())?;
94   let res = match kind {
95     ValidTypes::Accept => {
96       receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await
97     }
98     ValidTypes::Create => {
99       receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await
100     }
101     ValidTypes::Update => {
102       receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await
103     }
104     ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
105     ValidTypes::Undo => {
106       receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
107     }
108   };
109
110   insert_activity(
111     &activity_id,
112     actor.user_id(),
113     activity.clone(),
114     false,
115     context.pool(),
116   )
117   .await?;
118   res
119 }
120
121 /// Handle accepted follows.
122 async fn receive_accept(
123   context: &LemmyContext,
124   activity: AnyBase,
125   actor: &dyn ActorType,
126   user: User_,
127   request_counter: &mut i32,
128 ) -> Result<HttpResponse, LemmyError> {
129   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
130   verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
131
132   // TODO: we should check that we actually sent this activity, because the remote instance
133   //       could just put a fake Follow
134   let object = accept.object().to_owned().one().context(location_info!())?;
135   let follow = Follow::from_any_base(object)?.context(location_info!())?;
136   verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
137
138   let community_uri = accept
139     .actor()?
140     .to_owned()
141     .single_xsd_any_uri()
142     .context(location_info!())?;
143
144   let community =
145     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
146
147   // Now you need to add this to the community follower
148   let community_follower_form = CommunityFollowerForm {
149     community_id: community.id,
150     user_id: user.id,
151   };
152
153   // This will fail if they're already a follower
154   blocking(&context.pool(), move |conn| {
155     CommunityFollower::follow(conn, &community_follower_form).ok()
156   })
157   .await?;
158
159   Ok(HttpResponse::Ok().finish())
160 }
161
162 async fn receive_create_private_message(
163   context: &LemmyContext,
164   activity: AnyBase,
165   actor: &dyn ActorType,
166   request_counter: &mut i32,
167 ) -> Result<HttpResponse, LemmyError> {
168   let create = Create::from_any_base(activity)?.context(location_info!())?;
169   verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
170
171   let note = Note::from_any_base(
172     create
173       .object()
174       .as_one()
175       .context(location_info!())?
176       .to_owned(),
177   )?
178   .context(location_info!())?;
179
180   let private_message =
181     PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
182
183   let inserted_private_message = blocking(&context.pool(), move |conn| {
184     PrivateMessage::create(conn, &private_message)
185   })
186   .await??;
187
188   let message = blocking(&context.pool(), move |conn| {
189     PrivateMessageView::read(conn, inserted_private_message.id)
190   })
191   .await??;
192
193   let res = PrivateMessageResponse { message };
194
195   let recipient_id = res.message.recipient_id;
196
197   context.chat_server().do_send(SendUserRoomMessage {
198     op: UserOperation::CreatePrivateMessage,
199     response: res,
200     recipient_id,
201     websocket_id: None,
202   });
203
204   Ok(HttpResponse::Ok().finish())
205 }
206
207 async fn receive_update_private_message(
208   context: &LemmyContext,
209   activity: AnyBase,
210   actor: &dyn ActorType,
211   request_counter: &mut i32,
212 ) -> Result<HttpResponse, LemmyError> {
213   let update = Update::from_any_base(activity)?.context(location_info!())?;
214   verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
215
216   let object = update
217     .object()
218     .as_one()
219     .context(location_info!())?
220     .to_owned();
221   let note = Note::from_any_base(object)?.context(location_info!())?;
222
223   let private_message_form =
224     PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
225
226   let private_message_ap_id = private_message_form
227     .ap_id
228     .as_ref()
229     .context(location_info!())?
230     .clone();
231   let private_message = blocking(&context.pool(), move |conn| {
232     PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
233   })
234   .await??;
235
236   let private_message_id = private_message.id;
237   blocking(&context.pool(), move |conn| {
238     PrivateMessage::update(conn, private_message_id, &private_message_form)
239   })
240   .await??;
241
242   let private_message_id = private_message.id;
243   let message = blocking(&context.pool(), move |conn| {
244     PrivateMessageView::read(conn, private_message_id)
245   })
246   .await??;
247
248   let res = PrivateMessageResponse { message };
249
250   let recipient_id = res.message.recipient_id;
251
252   context.chat_server().do_send(SendUserRoomMessage {
253     op: UserOperation::EditPrivateMessage,
254     response: res,
255     recipient_id,
256     websocket_id: None,
257   });
258
259   Ok(HttpResponse::Ok().finish())
260 }
261
262 async fn receive_delete_private_message(
263   context: &LemmyContext,
264   activity: AnyBase,
265   actor: &dyn ActorType,
266 ) -> Result<HttpResponse, LemmyError> {
267   let delete = Delete::from_any_base(activity)?.context(location_info!())?;
268   verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
269
270   let private_message_id = delete
271     .object()
272     .to_owned()
273     .single_xsd_any_uri()
274     .context(location_info!())?;
275   let private_message = blocking(context.pool(), move |conn| {
276     PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
277   })
278   .await??;
279   let deleted_private_message = blocking(context.pool(), move |conn| {
280     PrivateMessage::update_deleted(conn, private_message.id, true)
281   })
282   .await??;
283
284   let message = blocking(&context.pool(), move |conn| {
285     PrivateMessageView::read(&conn, deleted_private_message.id)
286   })
287   .await??;
288
289   let res = PrivateMessageResponse { message };
290   let recipient_id = res.message.recipient_id;
291   context.chat_server().do_send(SendUserRoomMessage {
292     op: UserOperation::EditPrivateMessage,
293     response: res,
294     recipient_id,
295     websocket_id: None,
296   });
297
298   Ok(HttpResponse::Ok().finish())
299 }
300
301 async fn receive_undo_delete_private_message(
302   context: &LemmyContext,
303   activity: AnyBase,
304   actor: &dyn ActorType,
305 ) -> Result<HttpResponse, LemmyError> {
306   let undo = Undo::from_any_base(activity)?.context(location_info!())?;
307   verify_activity_domains_valid(&undo, actor.actor_id()?, true)?;
308   let object = undo.object().to_owned().one().context(location_info!())?;
309   let delete = Delete::from_any_base(object)?.context(location_info!())?;
310   verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
311
312   let private_message_id = delete
313     .object()
314     .to_owned()
315     .single_xsd_any_uri()
316     .context(location_info!())?;
317   let private_message = blocking(context.pool(), move |conn| {
318     PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
319   })
320   .await??;
321   let deleted_private_message = blocking(context.pool(), move |conn| {
322     PrivateMessage::update_deleted(conn, private_message.id, false)
323   })
324   .await??;
325
326   let message = blocking(&context.pool(), move |conn| {
327     PrivateMessageView::read(&conn, deleted_private_message.id)
328   })
329   .await??;
330
331   let res = PrivateMessageResponse { message };
332   let recipient_id = res.message.recipient_id;
333   context.chat_server().do_send(SendUserRoomMessage {
334     op: UserOperation::EditPrivateMessage,
335     response: res,
336     recipient_id,
337     websocket_id: None,
338   });
339
340   Ok(HttpResponse::Ok().finish())
341 }