]> Untitled Git - lemmy.git/blob - lemmy_apub/src/inbox/user_inbox.rs
In activity table, remove `user_id` and add `sensitive` (#127)
[lemmy.git] / lemmy_apub / src / inbox / user_inbox.rs
1 use crate::{
2   activities::receive::verify_activity_domains_valid,
3   check_is_apub_id_valid,
4   extensions::signatures::verify_signature,
5   fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
6   inbox::{get_activity_id, is_activity_already_known},
7   insert_activity,
8   ActorType,
9   FromApub,
10 };
11 use activitystreams::{
12   activity::{Accept, ActorAndObject, Create, Delete, Follow, Undo, Update},
13   base::AnyBase,
14   object::Note,
15   prelude::*,
16 };
17 use actix_web::{web, HttpRequest, HttpResponse};
18 use anyhow::{anyhow, Context};
19 use lemmy_db::{
20   community::{CommunityFollower, CommunityFollowerForm},
21   private_message::{PrivateMessage, PrivateMessageForm},
22   private_message_view::PrivateMessageView,
23   user::User_,
24   Crud,
25   Followable,
26 };
27 use lemmy_structs::{blocking, user::PrivateMessageResponse};
28 use lemmy_utils::{location_info, LemmyError};
29 use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
30 use log::debug;
31 use serde::{Deserialize, Serialize};
32 use std::fmt::Debug;
33
34 /// Allowed activities for user inbox.
35 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
36 #[serde(rename_all = "PascalCase")]
37 pub enum ValidTypes {
38   Accept,
39   Create,
40   Update,
41   Delete,
42   Undo,
43 }
44
45 pub type AcceptedActivities = ActorAndObject<ValidTypes>;
46
47 /// Handler for all incoming activities to user inboxes.
48 pub async fn user_inbox(
49   request: HttpRequest,
50   input: web::Json<AcceptedActivities>,
51   path: web::Path<String>,
52   context: web::Data<LemmyContext>,
53 ) -> Result<HttpResponse, LemmyError> {
54   let activity = input.into_inner();
55   let username = path.into_inner();
56   let user = blocking(&context.pool(), move |conn| {
57     User_::read_from_name(&conn, &username)
58   })
59   .await??;
60
61   let to = activity
62     .to()
63     .context(location_info!())?
64     .to_owned()
65     .single_xsd_any_uri();
66   if Some(user.actor_id()?) != to {
67     return Err(anyhow!("Activity delivered to wrong user").into());
68   }
69
70   let actor_uri = activity
71     .actor()?
72     .as_single_xsd_any_uri()
73     .context(location_info!())?;
74   debug!(
75     "User {} inbox received activity {:?} from {}",
76     user.name,
77     &activity.id_unchecked(),
78     &actor_uri
79   );
80
81   check_is_apub_id_valid(actor_uri)?;
82
83   let request_counter = &mut 0;
84   let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
85   verify_signature(&request, actor.as_ref())?;
86
87   let activity_id = get_activity_id(&activity, actor_uri)?;
88   if is_activity_already_known(context.pool(), &activity_id).await? {
89     return Ok(HttpResponse::Ok().finish());
90   }
91
92   let any_base = activity.clone().into_any_base()?;
93   let kind = activity.kind().context(location_info!())?;
94   let res = match kind {
95     ValidTypes::Accept => {
96       receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await
97     }
98     ValidTypes::Create => {
99       receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await
100     }
101     ValidTypes::Update => {
102       receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await
103     }
104     ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
105     ValidTypes::Undo => {
106       receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
107     }
108   };
109
110   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
111   res
112 }
113
114 /// Handle accepted follows.
115 async fn receive_accept(
116   context: &LemmyContext,
117   activity: AnyBase,
118   actor: &dyn ActorType,
119   user: User_,
120   request_counter: &mut i32,
121 ) -> Result<HttpResponse, LemmyError> {
122   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
123   verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
124
125   // TODO: we should check that we actually sent this activity, because the remote instance
126   //       could just put a fake Follow
127   let object = accept.object().to_owned().one().context(location_info!())?;
128   let follow = Follow::from_any_base(object)?.context(location_info!())?;
129   verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
130
131   let community_uri = accept
132     .actor()?
133     .to_owned()
134     .single_xsd_any_uri()
135     .context(location_info!())?;
136
137   let community =
138     get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
139
140   // Now you need to add this to the community follower
141   let community_follower_form = CommunityFollowerForm {
142     community_id: community.id,
143     user_id: user.id,
144   };
145
146   // This will fail if they're already a follower
147   blocking(&context.pool(), move |conn| {
148     CommunityFollower::follow(conn, &community_follower_form).ok()
149   })
150   .await?;
151
152   Ok(HttpResponse::Ok().finish())
153 }
154
155 async fn receive_create_private_message(
156   context: &LemmyContext,
157   activity: AnyBase,
158   actor: &dyn ActorType,
159   request_counter: &mut i32,
160 ) -> Result<HttpResponse, LemmyError> {
161   let create = Create::from_any_base(activity)?.context(location_info!())?;
162   verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
163
164   let note = Note::from_any_base(
165     create
166       .object()
167       .as_one()
168       .context(location_info!())?
169       .to_owned(),
170   )?
171   .context(location_info!())?;
172
173   let private_message =
174     PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
175
176   let inserted_private_message = blocking(&context.pool(), move |conn| {
177     PrivateMessage::create(conn, &private_message)
178   })
179   .await??;
180
181   let message = blocking(&context.pool(), move |conn| {
182     PrivateMessageView::read(conn, inserted_private_message.id)
183   })
184   .await??;
185
186   let res = PrivateMessageResponse { message };
187
188   let recipient_id = res.message.recipient_id;
189
190   context.chat_server().do_send(SendUserRoomMessage {
191     op: UserOperation::CreatePrivateMessage,
192     response: res,
193     recipient_id,
194     websocket_id: None,
195   });
196
197   Ok(HttpResponse::Ok().finish())
198 }
199
200 async fn receive_update_private_message(
201   context: &LemmyContext,
202   activity: AnyBase,
203   actor: &dyn ActorType,
204   request_counter: &mut i32,
205 ) -> Result<HttpResponse, LemmyError> {
206   let update = Update::from_any_base(activity)?.context(location_info!())?;
207   verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
208
209   let object = update
210     .object()
211     .as_one()
212     .context(location_info!())?
213     .to_owned();
214   let note = Note::from_any_base(object)?.context(location_info!())?;
215
216   let private_message_form =
217     PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
218
219   let private_message_ap_id = private_message_form
220     .ap_id
221     .as_ref()
222     .context(location_info!())?
223     .clone();
224   let private_message = blocking(&context.pool(), move |conn| {
225     PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
226   })
227   .await??;
228
229   let private_message_id = private_message.id;
230   blocking(&context.pool(), move |conn| {
231     PrivateMessage::update(conn, private_message_id, &private_message_form)
232   })
233   .await??;
234
235   let private_message_id = private_message.id;
236   let message = blocking(&context.pool(), move |conn| {
237     PrivateMessageView::read(conn, private_message_id)
238   })
239   .await??;
240
241   let res = PrivateMessageResponse { message };
242
243   let recipient_id = res.message.recipient_id;
244
245   context.chat_server().do_send(SendUserRoomMessage {
246     op: UserOperation::EditPrivateMessage,
247     response: res,
248     recipient_id,
249     websocket_id: None,
250   });
251
252   Ok(HttpResponse::Ok().finish())
253 }
254
255 async fn receive_delete_private_message(
256   context: &LemmyContext,
257   activity: AnyBase,
258   actor: &dyn ActorType,
259 ) -> Result<HttpResponse, LemmyError> {
260   let delete = Delete::from_any_base(activity)?.context(location_info!())?;
261   verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
262
263   let private_message_id = delete
264     .object()
265     .to_owned()
266     .single_xsd_any_uri()
267     .context(location_info!())?;
268   let private_message = blocking(context.pool(), move |conn| {
269     PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
270   })
271   .await??;
272   let deleted_private_message = blocking(context.pool(), move |conn| {
273     PrivateMessage::update_deleted(conn, private_message.id, true)
274   })
275   .await??;
276
277   let message = blocking(&context.pool(), move |conn| {
278     PrivateMessageView::read(&conn, deleted_private_message.id)
279   })
280   .await??;
281
282   let res = PrivateMessageResponse { message };
283   let recipient_id = res.message.recipient_id;
284   context.chat_server().do_send(SendUserRoomMessage {
285     op: UserOperation::EditPrivateMessage,
286     response: res,
287     recipient_id,
288     websocket_id: None,
289   });
290
291   Ok(HttpResponse::Ok().finish())
292 }
293
294 async fn receive_undo_delete_private_message(
295   context: &LemmyContext,
296   activity: AnyBase,
297   actor: &dyn ActorType,
298 ) -> Result<HttpResponse, LemmyError> {
299   let undo = Undo::from_any_base(activity)?.context(location_info!())?;
300   verify_activity_domains_valid(&undo, actor.actor_id()?, true)?;
301   let object = undo.object().to_owned().one().context(location_info!())?;
302   let delete = Delete::from_any_base(object)?.context(location_info!())?;
303   verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
304
305   let private_message_id = delete
306     .object()
307     .to_owned()
308     .single_xsd_any_uri()
309     .context(location_info!())?;
310   let private_message = blocking(context.pool(), move |conn| {
311     PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
312   })
313   .await??;
314   let deleted_private_message = blocking(context.pool(), move |conn| {
315     PrivateMessage::update_deleted(conn, private_message.id, false)
316   })
317   .await??;
318
319   let message = blocking(&context.pool(), move |conn| {
320     PrivateMessageView::read(&conn, deleted_private_message.id)
321   })
322   .await??;
323
324   let res = PrivateMessageResponse { message };
325   let recipient_id = res.message.recipient_id;
326   context.chat_server().do_send(SendUserRoomMessage {
327     op: UserOperation::EditPrivateMessage,
328     response: res,
329     recipient_id,
330     websocket_id: None,
331   });
332
333   Ok(HttpResponse::Ok().finish())
334 }