3 comment::{receive_create_comment, receive_update_comment},
5 receive_delete_community,
6 receive_remove_community,
7 receive_undo_delete_community,
8 receive_undo_remove_community,
11 receive_create_private_message,
12 receive_delete_private_message,
13 receive_undo_delete_private_message,
14 receive_update_private_message,
16 receive_unhandled_activity,
17 verify_activity_domains_valid,
19 check_is_apub_id_valid,
20 fetcher::community::get_or_fetch_and_upsert_community,
22 assert_activity_not_local,
24 get_activity_to_and_cc,
25 inbox_verify_http_signature,
26 is_activity_already_known,
27 is_addressed_to_community_followers,
28 is_addressed_to_local_user,
29 is_addressed_to_public,
30 receive_for_community::{
31 receive_create_for_community,
32 receive_delete_for_community,
33 receive_dislike_for_community,
34 receive_like_for_community,
35 receive_remove_for_community,
36 receive_undo_for_community,
37 receive_update_for_community,
43 use activitystreams::{
44 activity::{Accept, ActorAndObject, Announce, Create, Delete, Follow, Undo, Update},
48 use actix_web::{web, HttpRequest, HttpResponse};
49 use anyhow::{anyhow, Context};
51 use lemmy_db_queries::{source::user::User, ApubObject, Followable};
52 use lemmy_db_schema::source::{
53 community::{Community, CommunityFollower},
54 private_message::PrivateMessage,
57 use lemmy_structs::blocking;
58 use lemmy_utils::{location_info, LemmyError};
59 use lemmy_websocket::LemmyContext;
61 use serde::{Deserialize, Serialize};
65 /// Allowed activities for user inbox.
66 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
67 #[serde(rename_all = "PascalCase")]
// The serde attribute above selects PascalCase names for the variants during (de)serialization.
// Each variant is one activity kind that the user inbox handler dispatches on.
68 pub enum UserValidTypes {
69 Accept, // community accepted our follow request
70 Create, // create private message
71 Update, // edit private message
72 Delete, // private message or community deleted by creator
73 Undo, // private message or community restored
74 Remove, // community removed by admin
75 Announce, // post, comment or vote in community
/// Activity type accepted by the user inbox: an `ActorAndObject` whose kind is a `UserValidTypes`.
78 pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
80 /// Handler for all incoming activities to user inboxes.
///
/// Steps visible in this function: verify the HTTP signature, answer `200 OK` without further
/// work when the activity id is already known, reject the request when the `to`/`cc` list does
/// not contain the actor id of the user named in the URL path, run `assert_activity_not_local`,
/// pass the activity to `insert_activity`, and finally call `user_receive_message`.
///
/// Errors from any of the helpers are propagated to the caller as `LemmyError`.
81 pub async fn user_inbox(
83 input: web::Json<UserAcceptedActivities>,
84 path: web::Path<String>,
85 context: web::Data<LemmyContext>,
86 ) -> Result<HttpResponse, LemmyError> {
87 let activity = input.into_inner();
88 // First of all check the http signature
// `request_counter` starts at 0 and is handed to the helpers by mutable reference.
89 let request_counter = &mut 0;
90 let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
92 // Do nothing if we received the same activity before
93 let activity_id = get_activity_id(&activity, &actor.actor_id())?;
94 if is_activity_already_known(context.pool(), &activity_id).await? {
95 return Ok(HttpResponse::Ok().finish());
98 // Check if the activity is actually meant for us
// The path segment is used as the user name for the lookup below.
99 let username = path.into_inner();
100 let user = blocking(&context.pool(), move |conn| {
101 User_::read_from_name(&conn, &username)
104 let to_and_cc = get_activity_to_and_cc(&activity);
105 // TODO: we should also accept activities that are sent to community followers
106 if !to_and_cc.contains(&&user.actor_id()) {
107 return Err(anyhow!("Activity delivered to wrong user").into());
110 assert_activity_not_local(&activity)?;
// NOTE(review): the meaning of the `false, true` flags is not visible here — confirm against
// `insert_activity`.
111 insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
114 "User {} received activity {:?} from {}",
116 &activity.id_unchecked(),
// Hand the activity over to the shared dispatch function.
120 user_receive_message(
130 /// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
///
/// Checks the addressing with `is_for_user_inbox`, then dispatches on the activity kind
/// (`UserValidTypes`) to the matching `receive_*` handler. Any handler error is propagated;
/// on success an empty `200 OK` response is returned.
131 pub(crate) async fn user_receive_message(
132 activity: UserAcceptedActivities,
133 to_user: Option<User_>,
134 actor: &dyn ActorType,
135 context: &LemmyContext,
136 request_counter: &mut i32,
137 ) -> Result<HttpResponse, LemmyError> {
// Fail early unless the addressing check accepts this activity.
138 is_for_user_inbox(context, &activity).await?;
// The activity is cloned because `kind()` is still read from the original afterwards.
140 let any_base = activity.clone().into_any_base()?;
141 let kind = activity.kind().context(location_info!())?;
142 let actor_url = actor.actor_id();
144 UserValidTypes::Accept => {
// NOTE(review): `to_user.unwrap()` panics when `to_user` is `None`. Callers that pass `None`
// must never deliver an `Accept` here — consider returning an error instead of unwrapping.
145 receive_accept(&context, any_base, actor, to_user.unwrap(), request_counter).await?;
147 UserValidTypes::Announce => {
148 receive_announce(&context, any_base, actor, request_counter).await?
150 UserValidTypes::Create => {
151 receive_create(&context, any_base, actor_url, request_counter).await?
153 UserValidTypes::Update => {
154 receive_update(&context, any_base, actor_url, request_counter).await?
156 UserValidTypes::Delete => {
157 receive_delete(context, any_base, &actor_url, request_counter).await?
159 UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
160 UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
163 // TODO: would be logical to move websocket notification code here
// Every successfully handled activity is answered with an empty `200 OK`.
165 Ok(HttpResponse::Ok().finish())
168 /// Returns true if the activity is addressed directly to one or more local users, or if it is
169 /// addressed to the followers collection of a remote community, and at least one local user follows
// NOTE(review): the signature returns `Result<(), LemmyError>`, not a bool, so "returns true"
// above presumably means `Ok(())`, with rejection reported as an `Err` — confirm.
171 async fn is_for_user_inbox(
172 context: &LemmyContext,
173 activity: &UserAcceptedActivities,
174 ) -> Result<(), LemmyError> {
// Combined `to` and `cc` recipients of the activity, as produced by the helper.
175 let to_and_cc = get_activity_to_and_cc(activity);
176 // Check if it is addressed directly to any local user
177 if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
181 // Check if it is addressed to any followers collection of a remote community, and that the
182 // community has local followers.
183 let community = is_addressed_to_community_followers(&to_and_cc, context.pool()).await?;
184 if let Some(c) = community {
// Copy the id out so the closure below can take ownership of it (`move`).
185 let community_id = c.id;
186 let has_local_followers = blocking(&context.pool(), move |conn| {
187 CommunityFollower::has_local_followers(conn, community_id)
192 anyhow!("Remote activity cant be addressed to followers of local community").into(),
195 if has_local_followers {
// Final fallthrough: report that no local user is addressed.
200 Err(anyhow!("Not addressed for any local user").into())
203 /// Handle accepted follows.
///
/// Parses the activity as an `Accept` and its single object as a `Follow`, validates the
/// `Accept` against the actor's id and the `Follow` against the user's id, resolves the
/// community with `get_or_fetch_and_upsert_community`, and then calls
/// `CommunityFollower::follow_accepted` for that community and user.
204 async fn receive_accept(
205 context: &LemmyContext,
207 actor: &dyn ActorType,
209 request_counter: &mut i32,
210 ) -> Result<(), LemmyError> {
211 let accept = Accept::from_any_base(activity)?.context(location_info!())?;
212 verify_activity_domains_valid(&accept, &actor.actor_id(), false)?;
// The `Accept` must wrap exactly one object, which has to parse as a `Follow`.
214 let object = accept.object().to_owned().one().context(location_info!())?;
215 let follow = Follow::from_any_base(object)?.context(location_info!())?;
216 verify_activity_domains_valid(&follow, &user.actor_id(), false)?;
218 let community_uri = accept
221 .single_xsd_any_uri()
222 .context(location_info!())?;
225 get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
// Copy the ids out so the closure below can take ownership of them (`move`).
227 let community_id = community.id;
228 let user_id = user.id;
229 // This will throw an error if no follow was requested
230 blocking(&context.pool(), move |conn| {
231 CommunityFollower::follow_accepted(conn, community_id, user_id)
238 /// Takes an announce and passes the inner activity to the appropriate handler.
///
/// The `Announce` is validated against the actor's id and must pass `is_addressed_to_public`.
/// The inner activity's id is checked with `check_is_apub_id_valid` and looked up with
/// `is_activity_already_known` before the kind string of the object selects one of the
/// `receive_*_for_community` handlers.
239 pub async fn receive_announce(
240 context: &LemmyContext,
242 actor: &dyn ActorType,
243 request_counter: &mut i32,
244 ) -> Result<(), LemmyError> {
245 let announce = Announce::from_any_base(activity)?.context(location_info!())?;
246 verify_activity_domains_valid(&announce, &actor.actor_id(), false)?;
247 is_addressed_to_public(&announce)?;
// Kind string of the announced object; used further down to pick the handler.
249 let kind = announce.object().as_single_kind_str();
250 let inner_activity = announce
254 .context(location_info!())?;
256 let inner_id = inner_activity.id().context(location_info!())?.to_owned();
257 check_is_apub_id_valid(&inner_id)?;
258 if is_activity_already_known(context.pool(), &inner_id).await? {
264 receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
267 receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
270 receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
273 receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
275 Some("Delete") => receive_delete_for_community(context, inner_activity, &inner_id).await,
276 Some("Remove") => receive_remove_for_community(context, inner_activity, &inner_id).await,
278 receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
// Any other kind is passed to `receive_unhandled_activity`.
280 _ => receive_unhandled_activity(inner_activity),
/// Handles a `Create` activity received in a user inbox.
///
/// After validating the activity against `expected_domain`, a `Create` that passes
/// `is_addressed_to_public` is handled as a comment; otherwise it is handled as a private
/// message.
284 async fn receive_create(
285 context: &LemmyContext,
287 expected_domain: Url,
288 request_counter: &mut i32,
289 ) -> Result<(), LemmyError> {
290 let create = Create::from_any_base(activity)?.context(location_info!())?;
291 verify_activity_domains_valid(&create, &expected_domain, true)?;
// `is_addressed_to_public` is used as a predicate here: its error is not propagated.
292 if is_addressed_to_public(&create).is_ok() {
293 receive_create_comment(create, context, request_counter).await
295 receive_create_private_message(&context, create, expected_domain, request_counter).await
/// Handles an `Update` activity received in a user inbox.
///
/// After validating the activity against `expected_domain`, an `Update` that passes
/// `is_addressed_to_public` is handled as a comment update; otherwise it is handled as a
/// private message update.
299 async fn receive_update(
300 context: &LemmyContext,
302 expected_domain: Url,
303 request_counter: &mut i32,
304 ) -> Result<(), LemmyError> {
305 let update = Update::from_any_base(activity)?.context(location_info!())?;
306 verify_activity_domains_valid(&update, &expected_domain, true)?;
// `is_addressed_to_public` is used as a predicate here: its error is not propagated.
307 if is_addressed_to_public(&update).is_ok() {
308 receive_update_comment(update, context, request_counter).await
310 receive_update_private_message(&context, update, expected_domain, request_counter).await
/// Handles a `Delete` activity received in a user inbox.
///
/// Parses and validates the `Delete`, extracts the single object URI and looks it up with
/// `find_community_or_private_message_by_id`; the result decides whether the community or the
/// private message delete handler runs.
314 async fn receive_delete(
315 context: &LemmyContext,
317 expected_domain: &Url,
318 request_counter: &mut i32,
319 ) -> Result<(), LemmyError> {
// Bring the enum variants into scope for the `match` below.
320 use CommunityOrPrivateMessage::*;
// NOTE(review): `any_base` is cloned here but no later use of it is visible in this view;
// the clone may be unnecessary — confirm.
322 let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
323 verify_activity_domains_valid(&delete, expected_domain, true)?;
324 let object_uri = delete
327 .single_xsd_any_uri()
328 .context(location_info!())?;
330 match find_community_or_private_message_by_id(context, object_uri).await? {
331 Community(c) => receive_delete_community(context, c).await,
332 PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
/// Handles an `Undo` activity received in a user inbox.
///
/// Parses and validates the `Undo`, takes its single inner activity and dispatches on that
/// activity's kind string: an inner `Delete` is resolved to a community or private message and
/// passed to the matching undo-delete handler, `"Remove"` goes to
/// `receive_undo_remove_community`, and anything else to `receive_unhandled_activity`.
336 async fn receive_undo(
337 context: &LemmyContext,
339 expected_domain: &Url,
340 request_counter: &mut i32,
341 ) -> Result<(), LemmyError> {
// Bring the enum variants into scope for the inner `match` below.
342 use CommunityOrPrivateMessage::*;
343 let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
344 verify_activity_domains_valid(&undo, expected_domain, true)?;
346 let inner_activity = undo.object().to_owned().one().context(location_info!())?;
347 let kind = inner_activity.kind_str();
// The inner `Delete` is validated against the same expected domain as the outer `Undo`.
350 let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
351 verify_activity_domains_valid(&delete, expected_domain, true)?;
352 let object_uri = delete
355 .single_xsd_any_uri()
356 .context(location_info!())?;
357 match find_community_or_private_message_by_id(context, object_uri).await? {
358 Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
359 PrivateMessage(p) => {
360 receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
365 Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
366 _ => receive_unhandled_activity(undo),
/// Either a `Community` or a `PrivateMessage`; this is the success type returned by
/// `find_community_or_private_message_by_id`.
369 enum CommunityOrPrivateMessage {
370 Community(Community),
371 PrivateMessage(PrivateMessage),
374 async fn find_community_or_private_message_by_id(
375 context: &LemmyContext,
377 ) -> Result<CommunityOrPrivateMessage, LemmyError> {
378 let ap_id = apub_id.to_owned();
379 let community = blocking(context.pool(), move |conn| {
380 Community::read_from_apub_id(conn, &ap_id.into())
383 if let Ok(c) = community {
384 return Ok(CommunityOrPrivateMessage::Community(c));
387 let ap_id = apub_id.to_owned();
388 let private_message = blocking(context.pool(), move |conn| {
389 PrivateMessage::read_from_apub_id(conn, &ap_id.into())
392 if let Ok(p) = private_message {
393 return Ok(CommunityOrPrivateMessage::PrivateMessage(p));