]> Untitled Git - lemmy.git/blob - lemmy_apub/src/inbox/user_inbox.rs
Move websocket code into workspace (#107)
[lemmy.git] / lemmy_apub / src / inbox / user_inbox.rs
1 use crate::{
2   check_is_apub_id_valid,
3   extensions::signatures::verify,
4   fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
5   insert_activity,
6   FromApub,
7 };
8 use activitystreams::{
9   activity::{Accept, ActorAndObject, Create, Delete, Undo, Update},
10   base::AnyBase,
11   object::Note,
12   prelude::*,
13 };
14 use actix_web::{web, HttpRequest, HttpResponse};
15 use anyhow::Context;
16 use lemmy_db::{
17   community::{CommunityFollower, CommunityFollowerForm},
18   naive_now,
19   private_message::{PrivateMessage, PrivateMessageForm},
20   private_message_view::PrivateMessageView,
21   user::User_,
22   Crud,
23   Followable,
24 };
25 use lemmy_structs::{blocking, user::PrivateMessageResponse};
26 use lemmy_utils::{location_info, LemmyError};
27 use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
28 use log::debug;
29 use serde::{Deserialize, Serialize};
30 use std::fmt::Debug;
31
32 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
33 #[serde(rename_all = "PascalCase")]
34 pub enum ValidTypes {
35   Accept,
36   Create,
37   Update,
38   Delete,
39   Undo,
40 }
41
42 pub type AcceptedActivities = ActorAndObject<ValidTypes>;
43
44 /// Handler for all incoming activities to user inboxes.
45 pub async fn user_inbox(
46   request: HttpRequest,
47   input: web::Json<AcceptedActivities>,
48   path: web::Path<String>,
49   context: web::Data<LemmyContext>,
50 ) -> Result<HttpResponse, LemmyError> {
51   let activity = input.into_inner();
52   let username = path.into_inner();
53   debug!("User {} received activity: {:?}", &username, &activity);
54
55   let actor_uri = activity
56     .actor()?
57     .as_single_xsd_any_uri()
58     .context(location_info!())?;
59
60   check_is_apub_id_valid(actor_uri)?;
61
62   let actor = get_or_fetch_and_upsert_actor(actor_uri, &context).await?;
63   verify(&request, actor.as_ref())?;
64
65   let any_base = activity.clone().into_any_base()?;
66   let kind = activity.kind().context(location_info!())?;
67   let res = match kind {
68     ValidTypes::Accept => receive_accept(any_base, username, &context).await,
69     ValidTypes::Create => receive_create_private_message(any_base, &context).await,
70     ValidTypes::Update => receive_update_private_message(any_base, &context).await,
71     ValidTypes::Delete => receive_delete_private_message(any_base, &context).await,
72     ValidTypes::Undo => receive_undo_delete_private_message(any_base, &context).await,
73   };
74
75   insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
76   res
77 }
78
79 /// Handle accepted follows.
80 async fn receive_accept(
81   activity: AnyBase,
82   username: String,
83   context: &LemmyContext,
84 ) -> Result<HttpResponse, LemmyError> {
85   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
86   let community_uri = accept
87     .actor()?
88     .to_owned()
89     .single_xsd_any_uri()
90     .context(location_info!())?;
91
92   let community = get_or_fetch_and_upsert_community(&community_uri, context).await?;
93
94   let user = blocking(&context.pool(), move |conn| {
95     User_::read_from_name(conn, &username)
96   })
97   .await??;
98
99   // Now you need to add this to the community follower
100   let community_follower_form = CommunityFollowerForm {
101     community_id: community.id,
102     user_id: user.id,
103   };
104
105   // This will fail if they're already a follower
106   blocking(&context.pool(), move |conn| {
107     CommunityFollower::follow(conn, &community_follower_form).ok()
108   })
109   .await?;
110
111   // TODO: make sure that we actually requested a follow
112   Ok(HttpResponse::Ok().finish())
113 }
114
115 async fn receive_create_private_message(
116   activity: AnyBase,
117   context: &LemmyContext,
118 ) -> Result<HttpResponse, LemmyError> {
119   let create = Create::from_any_base(activity)?.context(location_info!())?;
120   let note = Note::from_any_base(
121     create
122       .object()
123       .as_one()
124       .context(location_info!())?
125       .to_owned(),
126   )?
127   .context(location_info!())?;
128
129   let domain = Some(create.id_unchecked().context(location_info!())?.to_owned());
130   let private_message = PrivateMessageForm::from_apub(&note, context, domain).await?;
131
132   let inserted_private_message = blocking(&context.pool(), move |conn| {
133     PrivateMessage::create(conn, &private_message)
134   })
135   .await??;
136
137   let message = blocking(&context.pool(), move |conn| {
138     PrivateMessageView::read(conn, inserted_private_message.id)
139   })
140   .await??;
141
142   let res = PrivateMessageResponse { message };
143
144   let recipient_id = res.message.recipient_id;
145
146   context.chat_server().do_send(SendUserRoomMessage {
147     op: UserOperation::CreatePrivateMessage,
148     response: res,
149     recipient_id,
150     websocket_id: None,
151   });
152
153   Ok(HttpResponse::Ok().finish())
154 }
155
156 async fn receive_update_private_message(
157   activity: AnyBase,
158   context: &LemmyContext,
159 ) -> Result<HttpResponse, LemmyError> {
160   let update = Update::from_any_base(activity)?.context(location_info!())?;
161   let note = Note::from_any_base(
162     update
163       .object()
164       .as_one()
165       .context(location_info!())?
166       .to_owned(),
167   )?
168   .context(location_info!())?;
169
170   let domain = Some(update.id_unchecked().context(location_info!())?.to_owned());
171   let private_message_form = PrivateMessageForm::from_apub(&note, context, domain).await?;
172
173   let private_message_ap_id = private_message_form
174     .ap_id
175     .as_ref()
176     .context(location_info!())?
177     .clone();
178   let private_message = blocking(&context.pool(), move |conn| {
179     PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
180   })
181   .await??;
182
183   let private_message_id = private_message.id;
184   blocking(&context.pool(), move |conn| {
185     PrivateMessage::update(conn, private_message_id, &private_message_form)
186   })
187   .await??;
188
189   let private_message_id = private_message.id;
190   let message = blocking(&context.pool(), move |conn| {
191     PrivateMessageView::read(conn, private_message_id)
192   })
193   .await??;
194
195   let res = PrivateMessageResponse { message };
196
197   let recipient_id = res.message.recipient_id;
198
199   context.chat_server().do_send(SendUserRoomMessage {
200     op: UserOperation::EditPrivateMessage,
201     response: res,
202     recipient_id,
203     websocket_id: None,
204   });
205
206   Ok(HttpResponse::Ok().finish())
207 }
208
209 async fn receive_delete_private_message(
210   activity: AnyBase,
211   context: &LemmyContext,
212 ) -> Result<HttpResponse, LemmyError> {
213   let delete = Delete::from_any_base(activity)?.context(location_info!())?;
214   let note = Note::from_any_base(
215     delete
216       .object()
217       .as_one()
218       .context(location_info!())?
219       .to_owned(),
220   )?
221   .context(location_info!())?;
222
223   let domain = Some(delete.id_unchecked().context(location_info!())?.to_owned());
224   let private_message_form = PrivateMessageForm::from_apub(&note, context, domain).await?;
225
226   let private_message_ap_id = private_message_form.ap_id.context(location_info!())?;
227   let private_message = blocking(&context.pool(), move |conn| {
228     PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
229   })
230   .await??;
231
232   let private_message_form = PrivateMessageForm {
233     content: private_message_form.content,
234     recipient_id: private_message.recipient_id,
235     creator_id: private_message.creator_id,
236     deleted: Some(true),
237     read: None,
238     ap_id: Some(private_message.ap_id),
239     local: private_message.local,
240     published: None,
241     updated: Some(naive_now()),
242   };
243
244   let private_message_id = private_message.id;
245   blocking(&context.pool(), move |conn| {
246     PrivateMessage::update(conn, private_message_id, &private_message_form)
247   })
248   .await??;
249
250   let private_message_id = private_message.id;
251   let message = blocking(&context.pool(), move |conn| {
252     PrivateMessageView::read(&conn, private_message_id)
253   })
254   .await??;
255
256   let res = PrivateMessageResponse { message };
257
258   let recipient_id = res.message.recipient_id;
259
260   context.chat_server().do_send(SendUserRoomMessage {
261     op: UserOperation::EditPrivateMessage,
262     response: res,
263     recipient_id,
264     websocket_id: None,
265   });
266
267   Ok(HttpResponse::Ok().finish())
268 }
269
270 async fn receive_undo_delete_private_message(
271   activity: AnyBase,
272   context: &LemmyContext,
273 ) -> Result<HttpResponse, LemmyError> {
274   let undo = Undo::from_any_base(activity)?.context(location_info!())?;
275   let delete = Delete::from_any_base(undo.object().as_one().context(location_info!())?.to_owned())?
276     .context(location_info!())?;
277   let note = Note::from_any_base(
278     delete
279       .object()
280       .as_one()
281       .context(location_info!())?
282       .to_owned(),
283   )?
284   .context(location_info!())?;
285
286   let domain = Some(undo.id_unchecked().context(location_info!())?.to_owned());
287   let private_message = PrivateMessageForm::from_apub(&note, context, domain).await?;
288
289   let private_message_ap_id = private_message
290     .ap_id
291     .as_ref()
292     .context(location_info!())?
293     .clone();
294   let private_message_id = blocking(&context.pool(), move |conn| {
295     PrivateMessage::read_from_apub_id(conn, &private_message_ap_id).map(|pm| pm.id)
296   })
297   .await??;
298
299   let private_message_form = PrivateMessageForm {
300     content: private_message.content,
301     recipient_id: private_message.recipient_id,
302     creator_id: private_message.creator_id,
303     deleted: Some(false),
304     read: None,
305     ap_id: private_message.ap_id,
306     local: private_message.local,
307     published: None,
308     updated: Some(naive_now()),
309   };
310
311   blocking(&context.pool(), move |conn| {
312     PrivateMessage::update(conn, private_message_id, &private_message_form)
313   })
314   .await??;
315
316   let message = blocking(&context.pool(), move |conn| {
317     PrivateMessageView::read(&conn, private_message_id)
318   })
319   .await??;
320
321   let res = PrivateMessageResponse { message };
322
323   let recipient_id = res.message.recipient_id;
324
325   context.chat_server().do_send(SendUserRoomMessage {
326     op: UserOperation::EditPrivateMessage,
327     response: res,
328     recipient_id,
329     websocket_id: None,
330   });
331
332   Ok(HttpResponse::Ok().finish())
333 }