]> Untitled Git - lemmy.git/commitdiff
Separate logic for user and community inbox
authorFelix Ableitner <me@nutomic.com>
Wed, 28 Oct 2020 16:14:18 +0000 (17:14 +0100)
committerFelix Ableitner <me@nutomic.com>
Mon, 9 Nov 2020 12:42:08 +0000 (13:42 +0100)
more refactoring with tons of changes:

- inbox functions return LemmyError instead of HttpResponse
- announce is done directly in community inbox
- reorganized functions for handling inbox activities
- additional checks for private messages
- moved inbox handler functions for post, comment, vote into separete file
- ensure that posts, comments etc are addressed to public (ref #1220)
- probably more

12 files changed:
lemmy_apub/src/activities/receive/comment.rs
lemmy_apub/src/activities/receive/comment_undo.rs
lemmy_apub/src/activities/receive/community.rs
lemmy_apub/src/activities/receive/mod.rs
lemmy_apub/src/activities/receive/post.rs
lemmy_apub/src/activities/receive/post_undo.rs
lemmy_apub/src/activities/receive/private_message.rs [new file with mode: 0644]
lemmy_apub/src/inbox/community_inbox.rs
lemmy_apub/src/inbox/mod.rs
lemmy_apub/src/inbox/receive_for_community.rs [new file with mode: 0644]
lemmy_apub/src/inbox/shared_inbox.rs
lemmy_apub/src/inbox/user_inbox.rs

index 3fe9c0b25c890fca10c971d55fee14e618ed916a..b60d4c950af188aa62c511993a46956dce245e00 100644 (file)
@@ -1,15 +1,14 @@
 use crate::{
-  activities::receive::{announce_if_community_is_local, get_actor_as_user},
+  activities::receive::get_actor_as_user,
   fetcher::get_or_fetch_and_insert_comment,
   ActorType,
   FromApub,
 };
 use activitystreams::{
-  activity::{ActorAndObjectRefExt, Create, Delete, Dislike, Like, Remove, Update},
+  activity::{ActorAndObjectRefExt, Create, Dislike, Like, Remove, Update},
   base::ExtendsExt,
   object::Note,
 };
-use actix_web::HttpResponse;
 use anyhow::Context;
 use lemmy_db::{
   comment::{Comment, CommentForm, CommentLike, CommentLikeForm},
@@ -26,7 +25,7 @@ pub(crate) async fn receive_create_comment(
   create: Create,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(&create, context, request_counter).await?;
   let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -73,15 +72,14 @@ pub(crate) async fn receive_create_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(create, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_update_comment(
   update: Update,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
   let user = get_actor_as_user(&update, context, request_counter).await?;
@@ -131,15 +129,14 @@ pub(crate) async fn receive_update_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(update, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_like_comment(
   like: Like,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
   let user = get_actor_as_user(&like, context, request_counter).await?;
@@ -183,15 +180,14 @@ pub(crate) async fn receive_like_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(like, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_dislike_comment(
   dislike: Dislike,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let note = Note::from_any_base(
     dislike
       .object()
@@ -241,16 +237,13 @@ pub(crate) async fn receive_dislike_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(dislike, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_delete_comment(
   context: &LemmyContext,
-  delete: Delete,
   comment: Comment,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let deleted_comment = blocking(context.pool(), move |conn| {
     Comment::update_deleted(conn, comment.id, true)
   })
@@ -276,15 +269,14 @@ pub(crate) async fn receive_delete_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(delete, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_remove_comment(
   context: &LemmyContext,
   _remove: Remove,
   comment: Comment,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let removed_comment = blocking(context.pool(), move |conn| {
     Comment::update_removed(conn, comment.id, true)
   })
@@ -310,5 +302,5 @@ pub(crate) async fn receive_remove_comment(
     websocket_id: None,
   });
 
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
index ec61b11164b1f5dde0acd2d4ee16ecffd9e9d64c..709e8481c3795fe1b5da61eb9712f298141bed39 100644 (file)
@@ -1,10 +1,9 @@
 use crate::{
-  activities::receive::{announce_if_community_is_local, get_actor_as_user},
+  activities::receive::get_actor_as_user,
   fetcher::get_or_fetch_and_insert_comment,
   FromApub,
 };
 use activitystreams::{activity::*, object::Note, prelude::*};
-use actix_web::HttpResponse;
 use anyhow::Context;
 use lemmy_db::{
   comment::{Comment, CommentForm, CommentLike},
@@ -16,11 +15,10 @@ use lemmy_utils::{location_info, LemmyError};
 use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation};
 
 pub(crate) async fn receive_undo_like_comment(
-  undo: Undo,
   like: &Like,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(like, context, request_counter).await?;
   let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -57,16 +55,14 @@ pub(crate) async fn receive_undo_like_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_undo_dislike_comment(
-  undo: Undo,
   dislike: &Dislike,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(dislike, context, request_counter).await?;
   let note = Note::from_any_base(
     dislike
@@ -109,16 +105,13 @@ pub(crate) async fn receive_undo_dislike_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_undo_delete_comment(
   context: &LemmyContext,
-  undo: Undo,
   comment: Comment,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let deleted_comment = blocking(context.pool(), move |conn| {
     Comment::update_deleted(conn, comment.id, false)
   })
@@ -145,16 +138,13 @@ pub(crate) async fn receive_undo_delete_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_undo_remove_comment(
   context: &LemmyContext,
-  undo: Undo,
   comment: Comment,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let removed_comment = blocking(context.pool(), move |conn| {
     Comment::update_removed(conn, comment.id, false)
   })
@@ -181,6 +171,5 @@ pub(crate) async fn receive_undo_remove_comment(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
index 80aad5e799a119032c274b34a91766c026ee6e98..ed43b33e301c627817206ce440c6f8d99a392a61 100644 (file)
@@ -1,17 +1,19 @@
-use crate::activities::receive::announce_if_community_is_local;
-use activitystreams::activity::{Delete, Remove, Undo};
-use actix_web::HttpResponse;
+use crate::{activities::receive::verify_activity_domains_valid, inbox::is_addressed_to_public};
+use activitystreams::{
+  activity::{ActorAndObjectRefExt, Delete, Remove, Undo},
+  base::{AnyBase, ExtendsExt},
+};
+use anyhow::Context;
 use lemmy_db::{community::Community, community_view::CommunityView};
 use lemmy_structs::{blocking, community::CommunityResponse};
-use lemmy_utils::LemmyError;
+use lemmy_utils::{location_info, LemmyError};
 use lemmy_websocket::{messages::SendCommunityRoomMessage, LemmyContext, UserOperation};
+use url::Url;
 
 pub(crate) async fn receive_delete_community(
   context: &LemmyContext,
-  delete: Delete,
   community: Community,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let deleted_community = blocking(context.pool(), move |conn| {
     Community::update_deleted(conn, community.id, true)
   })
@@ -33,15 +35,28 @@ pub(crate) async fn receive_delete_community(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(delete, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_remove_community(
   context: &LemmyContext,
-  _remove: Remove,
-  community: Community,
-) -> Result<HttpResponse, LemmyError> {
+  activity: AnyBase,
+  expected_domain: &Url,
+) -> Result<(), LemmyError> {
+  let remove = Remove::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&remove, expected_domain, true)?;
+  is_addressed_to_public(&remove)?;
+
+  let community_uri = remove
+    .object()
+    .to_owned()
+    .single_xsd_any_uri()
+    .context(location_info!())?;
+  let community = blocking(context.pool(), move |conn| {
+    Community::read_from_actor_id(conn, community_uri.as_str())
+  })
+  .await??;
+
   let removed_community = blocking(context.pool(), move |conn| {
     Community::update_removed(conn, community.id, true)
   })
@@ -63,16 +78,21 @@ pub(crate) async fn receive_remove_community(
     websocket_id: None,
   });
 
-  // TODO: this should probably also call announce_if_community_is_local()
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_undo_delete_community(
   context: &LemmyContext,
   undo: Undo,
   community: Community,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+  expected_domain: &Url,
+) -> Result<(), LemmyError> {
+  is_addressed_to_public(&undo)?;
+  let inner = undo.object().to_owned().one().context(location_info!())?;
+  let delete = Delete::from_any_base(inner)?.context(location_info!())?;
+  verify_activity_domains_valid(&delete, expected_domain, true)?;
+  is_addressed_to_public(&delete)?;
+
   let deleted_community = blocking(context.pool(), move |conn| {
     Community::update_deleted(conn, community.id, false)
   })
@@ -94,16 +114,31 @@ pub(crate) async fn receive_undo_delete_community(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_undo_remove_community(
   context: &LemmyContext,
   undo: Undo,
-  community: Community,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+  expected_domain: &Url,
+) -> Result<(), LemmyError> {
+  is_addressed_to_public(&undo)?;
+
+  let inner = undo.object().to_owned().one().context(location_info!())?;
+  let remove = Remove::from_any_base(inner)?.context(location_info!())?;
+  verify_activity_domains_valid(&remove, &expected_domain, true)?;
+  is_addressed_to_public(&remove)?;
+
+  let community_uri = remove
+    .object()
+    .to_owned()
+    .single_xsd_any_uri()
+    .context(location_info!())?;
+  let community = blocking(context.pool(), move |conn| {
+    Community::read_from_actor_id(conn, community_uri.as_str())
+  })
+  .await??;
+
   let removed_community = blocking(context.pool(), move |conn| {
     Community::update_removed(conn, community.id, false)
   })
@@ -126,6 +161,5 @@ pub(crate) async fn receive_undo_remove_community(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
index 2756fd147cc0c4c6c16e578e2c64cedd349e71ea..1f17fe9f3bc958b1cd772ad1db7f712b0c43e977 100644 (file)
@@ -1,22 +1,14 @@
-use crate::{
-  fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
-  ActorType,
-};
+use crate::fetcher::get_or_fetch_and_upsert_user;
 use activitystreams::{
   activity::{ActorAndObjectRef, ActorAndObjectRefExt},
-  base::{AsBase, BaseExt, Extends, ExtendsExt},
+  base::{AsBase, BaseExt},
   error::DomainError,
-  object::{AsObject, ObjectExt},
 };
-use actix_web::HttpResponse;
-use anyhow::Context;
-use diesel::result::Error::NotFound;
-use lemmy_db::{comment::Comment, community::Community, post::Post, user::User_};
-use lemmy_structs::blocking;
+use anyhow::{anyhow, Context};
+use lemmy_db::user::User_;
 use lemmy_utils::{location_info, LemmyError};
 use lemmy_websocket::LemmyContext;
 use log::debug;
-use serde::Serialize;
 use std::fmt::Debug;
 use url::Url;
 
@@ -25,46 +17,15 @@ pub(crate) mod comment_undo;
 pub(crate) mod community;
 pub(crate) mod post;
 pub(crate) mod post_undo;
+pub(crate) mod private_message;
 
 /// Return HTTP 501 for unsupported activities in inbox.
-pub(crate) fn receive_unhandled_activity<A>(activity: A) -> Result<HttpResponse, LemmyError>
+pub(crate) fn receive_unhandled_activity<A>(activity: A) -> Result<(), LemmyError>
 where
   A: Debug,
 {
   debug!("received unhandled activity type: {:?}", activity);
-  Ok(HttpResponse::NotImplemented().finish())
-}
-
-/// Reads the destination community from the activity's `cc` field. If this refers to a local
-/// community, the activity is announced to all community followers.
-async fn announce_if_community_is_local<T, Kind>(
-  activity: T,
-  context: &LemmyContext,
-  request_counter: &mut i32,
-) -> Result<(), LemmyError>
-where
-  T: AsObject<Kind>,
-  T: Extends<Kind>,
-  Kind: Serialize,
-  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
-{
-  let cc = activity.cc().context(location_info!())?;
-  let cc = cc.as_many().context(location_info!())?;
-  let community_uri = cc
-    .first()
-    .context(location_info!())?
-    .as_xsd_any_uri()
-    .context(location_info!())?;
-  // TODO: we could just read from the local db here (and ignore if the community is not found)
-  let community =
-    get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
-
-  if community.local {
-    community
-      .send_announce(activity.into_any_base()?, context)
-      .await?;
-  }
-  Ok(())
+  Err(anyhow!("Activity not supported").into())
 }
 
 /// Reads the actor field of an activity and returns the corresponding `User_`.
@@ -81,49 +42,6 @@ where
   get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await
 }
 
-pub(crate) enum FindResults {
-  Comment(Comment),
-  Community(Community),
-  Post(Post),
-}
-
-/// Tries to find a community, post or comment in the local database, without any network requests.
-/// This is used to handle deletions and removals, because in case we dont have the object, we can
-/// simply ignore the activity.
-pub(crate) async fn find_by_id(
-  context: &LemmyContext,
-  apub_id: Url,
-) -> Result<FindResults, LemmyError> {
-  let ap_id = apub_id.to_string();
-  let community = blocking(context.pool(), move |conn| {
-    Community::read_from_actor_id(conn, &ap_id)
-  })
-  .await?;
-  if let Ok(c) = community {
-    return Ok(FindResults::Community(c));
-  }
-
-  let ap_id = apub_id.to_string();
-  let post = blocking(context.pool(), move |conn| {
-    Post::read_from_apub_id(conn, &ap_id)
-  })
-  .await?;
-  if let Ok(p) = post {
-    return Ok(FindResults::Post(p));
-  }
-
-  let ap_id = apub_id.to_string();
-  let comment = blocking(context.pool(), move |conn| {
-    Comment::read_from_apub_id(conn, &ap_id)
-  })
-  .await?;
-  if let Ok(c) = comment {
-    return Ok(FindResults::Comment(c));
-  }
-
-  return Err(NotFound.into());
-}
-
 /// Ensure that the ID of an incoming activity comes from the same domain as the actor. Optionally
 /// also checks the ID of the inner object.
 ///
@@ -131,7 +49,7 @@ pub(crate) async fn find_by_id(
 /// HTTP signature.
 pub(crate) fn verify_activity_domains_valid<T, Kind>(
   activity: &T,
-  actor_id: Url,
+  actor_id: &Url,
   object_domain_must_match: bool,
 ) -> Result<(), LemmyError>
 where
index 0753b838f1d9ed420fdb926efa61fa3135406be9..80044237f65448a56f497cc08a79e5b51c33fccd 100644 (file)
@@ -1,15 +1,14 @@
 use crate::{
-  activities::receive::{announce_if_community_is_local, get_actor_as_user},
+  activities::receive::get_actor_as_user,
   fetcher::get_or_fetch_and_insert_post,
   ActorType,
   FromApub,
   PageExt,
 };
 use activitystreams::{
-  activity::{Create, Delete, Dislike, Like, Remove, Update},
+  activity::{Create, Dislike, Like, Remove, Update},
   prelude::*,
 };
-use actix_web::HttpResponse;
 use anyhow::Context;
 use lemmy_db::{
   post::{Post, PostForm, PostLike, PostLikeForm},
@@ -25,7 +24,7 @@ pub(crate) async fn receive_create_post(
   create: Create,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(&create, context, request_counter).await?;
   let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -51,15 +50,14 @@ pub(crate) async fn receive_create_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(create, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_update_post(
   update: Update,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(&update, context, request_counter).await?;
   let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -89,15 +87,14 @@ pub(crate) async fn receive_update_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(update, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_like_post(
   like: Like,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(&like, context, request_counter).await?;
   let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -134,15 +131,14 @@ pub(crate) async fn receive_like_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(like, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_dislike_post(
   dislike: Dislike,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(&dislike, context, request_counter).await?;
   let page = PageExt::from_any_base(
     dislike
@@ -185,16 +181,13 @@ pub(crate) async fn receive_dislike_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(dislike, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_delete_post(
   context: &LemmyContext,
-  delete: Delete,
   post: Post,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let deleted_post = blocking(context.pool(), move |conn| {
     Post::update_deleted(conn, post.id, true)
   })
@@ -214,15 +207,14 @@ pub(crate) async fn receive_delete_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(delete, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_remove_post(
   context: &LemmyContext,
   _remove: Remove,
   post: Post,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let removed_post = blocking(context.pool(), move |conn| {
     Post::update_removed(conn, post.id, true)
   })
@@ -242,5 +234,5 @@ pub(crate) async fn receive_remove_post(
     websocket_id: None,
   });
 
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
index bee56d863f98998c434e06286752d0aebc9f070e..99d0ed1d98b718c8dabbc5f567d47bfd10a76fbb 100644 (file)
@@ -1,11 +1,10 @@
 use crate::{
-  activities::receive::{announce_if_community_is_local, get_actor_as_user},
+  activities::receive::get_actor_as_user,
   fetcher::get_or_fetch_and_insert_post,
   FromApub,
   PageExt,
 };
 use activitystreams::{activity::*, prelude::*};
-use actix_web::HttpResponse;
 use anyhow::Context;
 use lemmy_db::{
   post::{Post, PostForm, PostLike},
@@ -17,11 +16,10 @@ use lemmy_utils::{location_info, LemmyError};
 use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
 
 pub(crate) async fn receive_undo_like_post(
-  undo: Undo,
   like: &Like,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(like, context, request_counter).await?;
   let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
     .context(location_info!())?;
@@ -52,16 +50,14 @@ pub(crate) async fn receive_undo_like_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_undo_dislike_post(
-  undo: Undo,
   dislike: &Dislike,
   context: &LemmyContext,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let user = get_actor_as_user(dislike, context, request_counter).await?;
   let page = PageExt::from_any_base(
     dislike
@@ -98,16 +94,13 @@ pub(crate) async fn receive_undo_dislike_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_undo_delete_post(
   context: &LemmyContext,
-  undo: Undo,
   post: Post,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let deleted_post = blocking(context.pool(), move |conn| {
     Post::update_deleted(conn, post.id, false)
   })
@@ -127,16 +120,13 @@ pub(crate) async fn receive_undo_delete_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
 pub(crate) async fn receive_undo_remove_post(
   context: &LemmyContext,
-  undo: Undo,
   post: Post,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let removed_post = blocking(context.pool(), move |conn| {
     Post::update_removed(conn, post.id, false)
   })
@@ -157,6 +147,5 @@ pub(crate) async fn receive_undo_remove_post(
     websocket_id: None,
   });
 
-  announce_if_community_is_local(undo, context, request_counter).await?;
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
diff --git a/lemmy_apub/src/activities/receive/private_message.rs b/lemmy_apub/src/activities/receive/private_message.rs
new file mode 100644 (file)
index 0000000..80fb0b7
--- /dev/null
@@ -0,0 +1,217 @@
+use crate::{
+  activities::receive::verify_activity_domains_valid,
+  check_is_apub_id_valid,
+  fetcher::get_or_fetch_and_upsert_user,
+  inbox::get_activity_to_and_cc,
+  FromApub,
+};
+use activitystreams::{
+  activity::{ActorAndObjectRefExt, Create, Delete, Undo, Update},
+  base::{AnyBase, AsBase, ExtendsExt},
+  object::{AsObject, Note},
+  public,
+};
+use anyhow::{anyhow, Context};
+use lemmy_db::{
+  private_message::{PrivateMessage, PrivateMessageForm},
+  private_message_view::PrivateMessageView,
+  Crud,
+};
+use lemmy_structs::{blocking, user::PrivateMessageResponse};
+use lemmy_utils::{location_info, LemmyError};
+use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
+use url::Url;
+
+pub(crate) async fn receive_create_private_message(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let create = Create::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&create, &expected_domain, true)?;
+  check_private_message_activity_valid(&create, context, request_counter).await?;
+
+  let note = Note::from_any_base(
+    create
+      .object()
+      .as_one()
+      .context(location_info!())?
+      .to_owned(),
+  )?
+  .context(location_info!())?;
+
+  let private_message =
+    PrivateMessageForm::from_apub(&note, context, Some(expected_domain), request_counter).await?;
+
+  let inserted_private_message = blocking(&context.pool(), move |conn| {
+    PrivateMessage::create(conn, &private_message)
+  })
+  .await??;
+
+  let message = blocking(&context.pool(), move |conn| {
+    PrivateMessageView::read(conn, inserted_private_message.id)
+  })
+  .await??;
+
+  let res = PrivateMessageResponse { message };
+
+  let recipient_id = res.message.recipient_id;
+
+  context.chat_server().do_send(SendUserRoomMessage {
+    op: UserOperation::CreatePrivateMessage,
+    response: res,
+    recipient_id,
+    websocket_id: None,
+  });
+
+  Ok(())
+}
+
+pub(crate) async fn receive_update_private_message(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let update = Update::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&update, &expected_domain, true)?;
+  check_private_message_activity_valid(&update, context, request_counter).await?;
+
+  let object = update
+    .object()
+    .as_one()
+    .context(location_info!())?
+    .to_owned();
+  let note = Note::from_any_base(object)?.context(location_info!())?;
+
+  let private_message_form =
+    PrivateMessageForm::from_apub(&note, context, Some(expected_domain), request_counter).await?;
+
+  let private_message_ap_id = private_message_form
+    .ap_id
+    .as_ref()
+    .context(location_info!())?
+    .clone();
+  let private_message = blocking(&context.pool(), move |conn| {
+    PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
+  })
+  .await??;
+
+  let private_message_id = private_message.id;
+  blocking(&context.pool(), move |conn| {
+    PrivateMessage::update(conn, private_message_id, &private_message_form)
+  })
+  .await??;
+
+  let private_message_id = private_message.id;
+  let message = blocking(&context.pool(), move |conn| {
+    PrivateMessageView::read(conn, private_message_id)
+  })
+  .await??;
+
+  let res = PrivateMessageResponse { message };
+
+  let recipient_id = res.message.recipient_id;
+
+  context.chat_server().do_send(SendUserRoomMessage {
+    op: UserOperation::EditPrivateMessage,
+    response: res,
+    recipient_id,
+    websocket_id: None,
+  });
+
+  Ok(())
+}
+
+pub(crate) async fn receive_delete_private_message(
+  context: &LemmyContext,
+  delete: Delete,
+  private_message: PrivateMessage,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  check_private_message_activity_valid(&delete, context, request_counter).await?;
+
+  let deleted_private_message = blocking(context.pool(), move |conn| {
+    PrivateMessage::update_deleted(conn, private_message.id, true)
+  })
+  .await??;
+
+  let message = blocking(&context.pool(), move |conn| {
+    PrivateMessageView::read(&conn, deleted_private_message.id)
+  })
+  .await??;
+
+  let res = PrivateMessageResponse { message };
+  let recipient_id = res.message.recipient_id;
+  context.chat_server().do_send(SendUserRoomMessage {
+    op: UserOperation::EditPrivateMessage,
+    response: res,
+    recipient_id,
+    websocket_id: None,
+  });
+
+  Ok(())
+}
+
+pub(crate) async fn receive_undo_delete_private_message(
+  context: &LemmyContext,
+  undo: Undo,
+  expected_domain: &Url,
+  private_message: PrivateMessage,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  check_private_message_activity_valid(&undo, context, request_counter).await?;
+  let object = undo.object().to_owned().one().context(location_info!())?;
+  let delete = Delete::from_any_base(object)?.context(location_info!())?;
+  verify_activity_domains_valid(&delete, expected_domain, true)?;
+  check_private_message_activity_valid(&delete, context, request_counter).await?;
+
+  let deleted_private_message = blocking(context.pool(), move |conn| {
+    PrivateMessage::update_deleted(conn, private_message.id, false)
+  })
+  .await??;
+
+  let message = blocking(&context.pool(), move |conn| {
+    PrivateMessageView::read(&conn, deleted_private_message.id)
+  })
+  .await??;
+
+  let res = PrivateMessageResponse { message };
+  let recipient_id = res.message.recipient_id;
+  context.chat_server().do_send(SendUserRoomMessage {
+    op: UserOperation::EditPrivateMessage,
+    response: res,
+    recipient_id,
+    websocket_id: None,
+  });
+
+  Ok(())
+}
+
+async fn check_private_message_activity_valid<T, Kind>(
+  activity: &T,
+  context: &LemmyContext,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError>
+where
+  T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
+{
+  let to_and_cc = get_activity_to_and_cc(activity)?;
+  if to_and_cc.len() != 1 {
+    return Err(anyhow!("Private message can only be addressed to one user").into());
+  }
+  if to_and_cc.contains(&public()) {
+    return Err(anyhow!("Private message cant be public").into());
+  }
+  let user_id = activity
+    .actor()?
+    .to_owned()
+    .single_xsd_any_uri()
+    .context(location_info!())?;
+  check_is_apub_id_valid(&user_id)?;
+  // check that the sender is a user, not a community
+  get_or_fetch_and_upsert_user(&user_id, &context, request_counter).await?;
+
+  Ok(())
+}
index b80d739a7fbf386749623402c97b5fcd634641dc..630a2db924091b0d9630b71075f934e57d5044c6 100644 (file)
@@ -1,14 +1,25 @@
 use crate::{
   activities::receive::verify_activity_domains_valid,
-  check_is_apub_id_valid,
-  extensions::signatures::verify_signature,
-  fetcher::get_or_fetch_and_upsert_user,
-  inbox::{get_activity_id, is_activity_already_known},
+  inbox::{
+    get_activity_id,
+    get_activity_to_and_cc,
+    inbox_verify_http_signature,
+    is_activity_already_known,
+    is_addressed_to_public,
+    receive_for_community::{
+      receive_create_for_community,
+      receive_delete_for_community,
+      receive_dislike_for_community,
+      receive_like_for_community,
+      receive_undo_for_community,
+      receive_update_for_community,
+    },
+  },
   insert_activity,
   ActorType,
 };
 use activitystreams::{
-  activity::{ActorAndObject, Follow, Undo},
+  activity::{kind::FollowType, ActorAndObject, Follow, Undo},
   base::AnyBase,
   prelude::*,
 };
@@ -25,89 +36,153 @@ use lemmy_websocket::LemmyContext;
 use log::info;
 use serde::{Deserialize, Serialize};
 use std::fmt::Debug;
+use url::Url;
 
 /// Allowed activities for community inbox.
 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
 #[serde(rename_all = "PascalCase")]
-pub enum ValidTypes {
-  Follow,
-  Undo,
+pub enum CommunityValidTypes {
+  Follow,  // follow request from a user
+  Undo,    // unfollow from a user
+  Create,  // create post or comment
+  Update,  // update post or comment
+  Like,    // upvote post or comment
+  Dislike, // downvote post or comment
+  Delete,  // post or comment deleted by creator
+  Remove,  // post or comment removed by mod or admin
 }
 
-pub type AcceptedActivities = ActorAndObject<ValidTypes>;
+pub type CommunityAcceptedActivities = ActorAndObject<CommunityValidTypes>;
 
 /// Handler for all incoming receive to community inboxes.
 pub async fn community_inbox(
   request: HttpRequest,
-  input: web::Json<AcceptedActivities>,
+  input: web::Json<CommunityAcceptedActivities>,
   path: web::Path<String>,
   context: web::Data<LemmyContext>,
 ) -> Result<HttpResponse, LemmyError> {
   let activity = input.into_inner();
+  // First of all check the http signature
+  let request_counter = &mut 0;
+  let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
+
+  // Do nothing if we received the same activity before
+  let activity_id = get_activity_id(&activity, &actor.actor_id()?)?;
+  if is_activity_already_known(context.pool(), &activity_id).await? {
+    return Ok(HttpResponse::Ok().finish());
+  }
 
+  // Check if the activity is actually meant for us
   let path = path.into_inner();
   let community = blocking(&context.pool(), move |conn| {
     Community::read_from_name(&conn, &path)
   })
   .await??;
-
-  let to = activity
-    .to()
-    .context(location_info!())?
-    .to_owned()
-    .single_xsd_any_uri();
-  if Some(community.actor_id()?) != to {
+  let to_and_cc = get_activity_to_and_cc(&activity)?;
+  if !to_and_cc.contains(&&community.actor_id()?) {
     return Err(anyhow!("Activity delivered to wrong community").into());
   }
 
+  insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
+
   info!(
-    "Community {} received activity {:?}",
-    &community.name, &activity
-  );
-  let user_uri = activity
-    .actor()?
-    .as_single_xsd_any_uri()
-    .context(location_info!())?;
-  info!(
-    "Community {} inbox received activity {:?} from {}",
+    "Community {} received activity {:?} from {}",
     community.name,
     &activity.id_unchecked(),
-    &user_uri
+    &actor.actor_id_str()
   );
-  check_is_apub_id_valid(user_uri)?;
-
-  let request_counter = &mut 0;
-  let user = get_or_fetch_and_upsert_user(&user_uri, &context, request_counter).await?;
 
-  verify_signature(&request, &user)?;
+  community_receive_message(
+    activity.clone(),
+    community.clone(),
+    actor.as_ref(),
+    &context,
+    request_counter,
+  )
+  .await
+}
 
-  let activity_id = get_activity_id(&activity, user_uri)?;
-  if is_activity_already_known(context.pool(), &activity_id).await? {
-    return Ok(HttpResponse::Ok().finish());
-  }
+/// Receives Follow, Undo/Follow, post actions, comment actions (including votes)
+pub(crate) async fn community_receive_message(
+  activity: CommunityAcceptedActivities,
+  to_community: Community,
+  actor: &dyn ActorType,
+  context: &LemmyContext,
+  request_counter: &mut i32,
+) -> Result<HttpResponse, LemmyError> {
+  // TODO: check if the sending user is banned by the community
 
   let any_base = activity.clone().into_any_base()?;
-  let kind = activity.kind().context(location_info!())?;
-  let res = match kind {
-    ValidTypes::Follow => handle_follow(any_base, user, community, &context).await,
-    ValidTypes::Undo => handle_undo_follow(any_base, user, community, &context).await,
+  let actor_url = actor.actor_id()?;
+  let activity_kind = activity.kind().context(location_info!())?;
+  let do_announce = match activity_kind {
+    CommunityValidTypes::Follow => {
+      handle_follow(any_base.clone(), actor_url, &to_community, &context).await?;
+      false
+    }
+    CommunityValidTypes::Undo => {
+      handle_undo(
+        context,
+        activity.clone(),
+        actor_url,
+        &to_community,
+        request_counter,
+      )
+      .await?
+    }
+    CommunityValidTypes::Create => {
+      receive_create_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
+      true
+    }
+    CommunityValidTypes::Update => {
+      receive_update_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
+      true
+    }
+    CommunityValidTypes::Like => {
+      receive_like_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
+      true
+    }
+    CommunityValidTypes::Dislike => {
+      receive_dislike_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
+      true
+    }
+    CommunityValidTypes::Delete => {
+      receive_delete_for_community(context, any_base.clone(), &actor_url).await?;
+      true
+    }
+    CommunityValidTypes::Remove => {
+      // TODO: we dont support remote mods, so this is ignored for now
+      //receive_remove_for_community(context, any_base.clone(), &user_url).await?
+      false
+    }
   };
 
-  insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
-  res
+  if do_announce {
+    // Check again that the activity is public, just to be sure
+    is_addressed_to_public(&activity)?;
+    to_community
+      .send_announce(activity.into_any_base()?, context)
+      .await?;
+  }
+
+  return Ok(HttpResponse::Ok().finish());
 }
 
 /// Handle a follow request from a remote user, adding the user as follower and returning an
 /// Accept activity.
 async fn handle_follow(
   activity: AnyBase,
-  user: User_,
-  community: Community,
+  user_url: Url,
+  community: &Community,
   context: &LemmyContext,
 ) -> Result<HttpResponse, LemmyError> {
   let follow = Follow::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
+  verify_activity_domains_valid(&follow, &user_url, false)?;
 
+  let user = blocking(&context.pool(), move |conn| {
+    User_::read_from_actor_id(&conn, user_url.as_str())
+  })
+  .await??;
   let community_follower_form = CommunityFollowerForm {
     community_id: community.id,
     user_id: user.id,
@@ -124,20 +199,44 @@ async fn handle_follow(
   Ok(HttpResponse::Ok().finish())
 }
 
+async fn handle_undo(
+  context: &LemmyContext,
+  activity: CommunityAcceptedActivities,
+  actor_url: Url,
+  to_community: &Community,
+  request_counter: &mut i32,
+) -> Result<bool, LemmyError> {
+  let inner_kind = activity
+    .object()
+    .is_single_kind(&FollowType::Follow.to_string());
+  let any_base = activity.into_any_base()?;
+  if inner_kind {
+    handle_undo_follow(any_base, actor_url, to_community, &context).await?;
+    Ok(false)
+  } else {
+    receive_undo_for_community(context, any_base, &actor_url, request_counter).await?;
+    Ok(true)
+  }
+}
+
 /// Handle `Undo/Follow` from a user, removing the user from followers list.
 async fn handle_undo_follow(
   activity: AnyBase,
-  user: User_,
-  community: Community,
+  user_url: Url,
+  community: &Community,
   context: &LemmyContext,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let undo = Undo::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&undo, user.actor_id()?, true)?;
+  verify_activity_domains_valid(&undo, &user_url, true)?;
 
   let object = undo.object().to_owned().one().context(location_info!())?;
   let follow = Follow::from_any_base(object)?.context(location_info!())?;
-  verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
+  verify_activity_domains_valid(&follow, &user_url, false)?;
 
+  let user = blocking(&context.pool(), move |conn| {
+    User_::read_from_actor_id(&conn, user_url.as_str())
+  })
+  .await??;
   let community_follower_form = CommunityFollowerForm {
     community_id: community.id,
     user_id: user.id,
@@ -149,5 +248,5 @@ async fn handle_undo_follow(
   })
   .await?;
 
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
index e2305b448e5d11fc7838bbb4ee9729c2aae8c301..cb5bd9a7914c1f27c778ae81c3d86c16af42ad18 100644 (file)
@@ -1,12 +1,26 @@
-use activitystreams::base::{BaseExt, Extends};
-use anyhow::Context;
+use crate::{
+  check_is_apub_id_valid,
+  extensions::signatures::verify_signature,
+  fetcher::get_or_fetch_and_upsert_actor,
+  ActorType,
+};
+use activitystreams::{
+  activity::ActorAndObjectRefExt,
+  base::{AsBase, BaseExt, Extends},
+  object::{AsObject, ObjectExt},
+  public,
+};
+use actix_web::HttpRequest;
+use anyhow::{anyhow, Context};
 use lemmy_db::{activity::Activity, DbPool};
 use lemmy_structs::blocking;
 use lemmy_utils::{location_info, LemmyError};
+use lemmy_websocket::LemmyContext;
 use serde::{export::fmt::Debug, Serialize};
 use url::Url;
 
 pub mod community_inbox;
+mod receive_for_community;
 pub mod shared_inbox;
 pub mod user_inbox;
 
@@ -35,3 +49,65 @@ pub(crate) async fn is_activity_already_known(
     Err(_) => Ok(false),
   }
 }
+
+pub(crate) fn get_activity_to_and_cc<T, Kind>(activity: &T) -> Result<Vec<Url>, LemmyError>
+where
+  T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
+{
+  let mut to_and_cc = vec![];
+  if let Some(to) = activity.to() {
+    let to = to.to_owned().unwrap_to_vec();
+    let mut to = to
+      .iter()
+      .map(|t| t.as_xsd_any_uri())
+      .flatten()
+      .map(|t| t.to_owned())
+      .collect();
+    to_and_cc.append(&mut to);
+  }
+  if let Some(cc) = activity.cc() {
+    let cc = cc.to_owned().unwrap_to_vec();
+    let mut cc = cc
+      .iter()
+      .map(|c| c.as_xsd_any_uri())
+      .flatten()
+      .map(|c| c.to_owned())
+      .collect();
+    to_and_cc.append(&mut cc);
+  }
+  Ok(to_and_cc)
+}
+
+pub(crate) fn is_addressed_to_public<T, Kind>(activity: &T) -> Result<(), LemmyError>
+where
+  T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
+{
+  let to_and_cc = get_activity_to_and_cc(activity)?;
+  if to_and_cc.contains(&public()) {
+    Ok(())
+  } else {
+    Err(anyhow!("Activity is not addressed to public").into())
+  }
+}
+
+pub(crate) async fn inbox_verify_http_signature<T, Kind>(
+  activity: &T,
+  context: &LemmyContext,
+  request: HttpRequest,
+  request_counter: &mut i32,
+) -> Result<Box<dyn ActorType>, LemmyError>
+where
+  T: AsObject<Kind> + ActorAndObjectRefExt + Extends<Kind> + AsBase<Kind>,
+  Kind: Serialize,
+  <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+  let actor_id = activity
+    .actor()?
+    .to_owned()
+    .single_xsd_any_uri()
+    .context(location_info!())?;
+  check_is_apub_id_valid(&actor_id)?;
+  let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
+  verify_signature(&request, actor.as_ref())?;
+  Ok(actor)
+}
diff --git a/lemmy_apub/src/inbox/receive_for_community.rs b/lemmy_apub/src/inbox/receive_for_community.rs
new file mode 100644 (file)
index 0000000..d4cd43d
--- /dev/null
@@ -0,0 +1,345 @@
+use crate::{
+  activities::receive::{
+    comment::{
+      receive_create_comment,
+      receive_delete_comment,
+      receive_dislike_comment,
+      receive_like_comment,
+      receive_remove_comment,
+      receive_update_comment,
+    },
+    comment_undo::{
+      receive_undo_delete_comment,
+      receive_undo_dislike_comment,
+      receive_undo_like_comment,
+      receive_undo_remove_comment,
+    },
+    post::{
+      receive_create_post,
+      receive_delete_post,
+      receive_dislike_post,
+      receive_like_post,
+      receive_remove_post,
+      receive_update_post,
+    },
+    post_undo::{
+      receive_undo_delete_post,
+      receive_undo_dislike_post,
+      receive_undo_like_post,
+      receive_undo_remove_post,
+    },
+    receive_unhandled_activity,
+    verify_activity_domains_valid,
+  },
+  inbox::is_addressed_to_public,
+};
+use activitystreams::{
+  activity::{Create, Delete, Dislike, Like, Remove, Undo, Update},
+  base::AnyBase,
+  prelude::*,
+};
+use anyhow::Context;
+use diesel::result::Error::NotFound;
+use lemmy_db::{comment::Comment, post::Post, site::Site, Crud};
+use lemmy_structs::blocking;
+use lemmy_utils::{location_info, LemmyError};
+use lemmy_websocket::LemmyContext;
+use url::Url;
+
+/// This file is for post/comment activities received by the community, and for post/comment
+///       activities announced by the community and received by the user.
+
+/// A post or comment being created
+pub(in crate::inbox) async fn receive_create_for_community(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let create = Create::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&create, &expected_domain, true)?;
+  is_addressed_to_public(&create)?;
+
+  match create.object().as_single_kind_str() {
+    Some("Page") => receive_create_post(create, context, request_counter).await,
+    Some("Note") => receive_create_comment(create, context, request_counter).await,
+    _ => receive_unhandled_activity(create),
+  }
+}
+
+/// A post or comment being edited
+pub(in crate::inbox) async fn receive_update_for_community(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let update = Update::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&update, &expected_domain, true)?;
+  is_addressed_to_public(&update)?;
+
+  match update.object().as_single_kind_str() {
+    Some("Page") => receive_update_post(update, context, request_counter).await,
+    Some("Note") => receive_update_comment(update, context, request_counter).await,
+    _ => receive_unhandled_activity(update),
+  }
+}
+
+/// A post or comment being upvoted
+pub(in crate::inbox) async fn receive_like_for_community(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let like = Like::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&like, &expected_domain, false)?;
+  is_addressed_to_public(&like)?;
+
+  match like.object().as_single_kind_str() {
+    Some("Page") => receive_like_post(like, context, request_counter).await,
+    Some("Note") => receive_like_comment(like, context, request_counter).await,
+    _ => receive_unhandled_activity(like),
+  }
+}
+
+/// A post or comment being downvoted
+pub(in crate::inbox) async fn receive_dislike_for_community(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let enable_downvotes = blocking(context.pool(), move |conn| {
+    Site::read(conn, 1).map(|s| s.enable_downvotes)
+  })
+  .await??;
+  if !enable_downvotes {
+    return Ok(());
+  }
+
+  let dislike = Dislike::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&dislike, &expected_domain, false)?;
+  is_addressed_to_public(&dislike)?;
+
+  match dislike.object().as_single_kind_str() {
+    Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
+    Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
+    _ => receive_unhandled_activity(dislike),
+  }
+}
+
+/// A post or comment being deleted by its creator
+pub(in crate::inbox) async fn receive_delete_for_community(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: &Url,
+) -> Result<(), LemmyError> {
+  dbg!("receive_delete_for_community");
+  let delete = Delete::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&delete, &expected_domain, true)?;
+  is_addressed_to_public(&delete)?;
+
+  let object = delete
+    .object()
+    .to_owned()
+    .single_xsd_any_uri()
+    .context(location_info!())?;
+
+  match find_post_or_comment_by_id(context, object).await {
+    Ok(PostOrComment::Post(p)) => receive_delete_post(context, p).await,
+    Ok(PostOrComment::Comment(c)) => receive_delete_comment(context, c).await,
+    // if we dont have the object, no need to do anything
+    Err(_) => Ok(()),
+  }
+}
+
+/// A post or comment being removed by a mod/admin
+pub(in crate::inbox) async fn receive_remove_for_community(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: &Url,
+) -> Result<(), LemmyError> {
+  dbg!("receive_remove_for_community");
+  let remove = Remove::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&remove, &expected_domain, false)?;
+  is_addressed_to_public(&remove)?;
+
+  let cc = remove
+    .cc()
+    .map(|c| c.as_many())
+    .flatten()
+    .context(location_info!())?;
+  let community_id = cc
+    .first()
+    .map(|c| c.as_xsd_any_uri())
+    .flatten()
+    .context(location_info!())?;
+
+  let object = remove
+    .object()
+    .to_owned()
+    .single_xsd_any_uri()
+    .context(location_info!())?;
+
+  // Ensure that remove activity comes from the same domain as the community
+  remove.id(community_id.domain().context(location_info!())?)?;
+
+  match find_post_or_comment_by_id(context, object).await {
+    Ok(PostOrComment::Post(p)) => receive_remove_post(context, remove, p).await,
+    Ok(PostOrComment::Comment(c)) => receive_remove_comment(context, remove, c).await,
+    // if we dont have the object, no need to do anything
+    Err(_) => Ok(()),
+  }
+}
+
+/// A post/comment action being reverted (either a delete, remove, upvote or downvote)
+pub(in crate::inbox) async fn receive_undo_for_community(
+  context: &LemmyContext,
+  activity: AnyBase,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let undo = Undo::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&undo, &expected_domain.to_owned(), true)?;
+  is_addressed_to_public(&undo)?;
+
+  match undo.object().as_single_kind_str() {
+    Some("Delete") => receive_undo_delete_for_community(context, undo, expected_domain).await,
+    Some("Remove") => receive_undo_remove_for_community(context, undo, expected_domain).await,
+    Some("Like") => {
+      receive_undo_like_for_community(context, undo, expected_domain, request_counter).await
+    }
+    Some("Dislike") => {
+      receive_undo_dislike_for_community(context, undo, expected_domain, request_counter).await
+    }
+    _ => receive_unhandled_activity(undo),
+  }
+}
+
+/// A post or comment deletion being reverted
+pub(in crate::inbox) async fn receive_undo_delete_for_community(
+  context: &LemmyContext,
+  undo: Undo,
+  expected_domain: &Url,
+) -> Result<(), LemmyError> {
+  let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
+    .context(location_info!())?;
+  verify_activity_domains_valid(&delete, &expected_domain, true)?;
+  is_addressed_to_public(&delete)?;
+
+  let object = delete
+    .object()
+    .to_owned()
+    .single_xsd_any_uri()
+    .context(location_info!())?;
+  match find_post_or_comment_by_id(context, object).await {
+    Ok(PostOrComment::Post(p)) => receive_undo_delete_post(context, p).await,
+    Ok(PostOrComment::Comment(c)) => receive_undo_delete_comment(context, c).await,
+    // if we dont have the object, no need to do anything
+    Err(_) => Ok(()),
+  }
+}
+
+/// A post or comment removal being reverted
+pub(in crate::inbox) async fn receive_undo_remove_for_community(
+  context: &LemmyContext,
+  undo: Undo,
+  expected_domain: &Url,
+) -> Result<(), LemmyError> {
+  let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
+    .context(location_info!())?;
+  verify_activity_domains_valid(&remove, &expected_domain, false)?;
+  is_addressed_to_public(&remove)?;
+
+  let object = remove
+    .object()
+    .to_owned()
+    .single_xsd_any_uri()
+    .context(location_info!())?;
+  match find_post_or_comment_by_id(context, object).await {
+    Ok(PostOrComment::Post(p)) => receive_undo_remove_post(context, p).await,
+    Ok(PostOrComment::Comment(c)) => receive_undo_remove_comment(context, c).await,
+    // if we dont have the object, no need to do anything
+    Err(_) => Ok(()),
+  }
+}
+
+/// A post or comment upvote being reverted
+pub(in crate::inbox) async fn receive_undo_like_for_community(
+  context: &LemmyContext,
+  undo: Undo,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
+    .context(location_info!())?;
+  verify_activity_domains_valid(&like, &expected_domain, false)?;
+  is_addressed_to_public(&like)?;
+
+  let type_ = like
+    .object()
+    .as_single_kind_str()
+    .context(location_info!())?;
+  match type_ {
+    "Note" => receive_undo_like_comment(&like, context, request_counter).await,
+    "Page" => receive_undo_like_post(&like, context, request_counter).await,
+    _ => receive_unhandled_activity(like),
+  }
+}
+
+/// A post or comment downvote being reverted
+pub(in crate::inbox) async fn receive_undo_dislike_for_community(
+  context: &LemmyContext,
+  undo: Undo,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
+    .context(location_info!())?;
+  verify_activity_domains_valid(&dislike, &expected_domain, false)?;
+  is_addressed_to_public(&dislike)?;
+
+  let type_ = dislike
+    .object()
+    .as_single_kind_str()
+    .context(location_info!())?;
+  match type_ {
+    "Note" => receive_undo_dislike_comment(&dislike, context, request_counter).await,
+    "Page" => receive_undo_dislike_post(&dislike, context, request_counter).await,
+    _ => receive_unhandled_activity(dislike),
+  }
+}
+
+enum PostOrComment {
+  Comment(Comment),
+  Post(Post),
+}
+
+/// Tries to find a post or comment in the local database, without any network requests.
+/// This is used to handle deletions and removals, because in case we dont have the object, we can
+/// simply ignore the activity.
+async fn find_post_or_comment_by_id(
+  context: &LemmyContext,
+  apub_id: Url,
+) -> Result<PostOrComment, LemmyError> {
+  let ap_id = apub_id.to_string();
+  let post = blocking(context.pool(), move |conn| {
+    Post::read_from_apub_id(conn, &ap_id)
+  })
+  .await?;
+  if let Ok(p) = post {
+    return Ok(PostOrComment::Post(p));
+  }
+
+  let ap_id = apub_id.to_string();
+  let comment = blocking(context.pool(), move |conn| {
+    Comment::read_from_apub_id(conn, &ap_id)
+  })
+  .await?;
+  if let Ok(c) = comment {
+    return Ok(PostOrComment::Comment(c));
+  }
+
+  return Err(NotFound.into());
+}
index 3b07400d6119f6e276150318c5c29e2663db4ebd..ab0a5f1891cebff3d1e414c2087d2910a95e53f7 100644 (file)
@@ -1,63 +1,21 @@
 use crate::{
-  activities::receive::{
-    comment::{
-      receive_create_comment,
-      receive_delete_comment,
-      receive_dislike_comment,
-      receive_like_comment,
-      receive_remove_comment,
-      receive_update_comment,
-    },
-    comment_undo::{
-      receive_undo_delete_comment,
-      receive_undo_dislike_comment,
-      receive_undo_like_comment,
-      receive_undo_remove_comment,
-    },
-    community::{
-      receive_delete_community,
-      receive_remove_community,
-      receive_undo_delete_community,
-      receive_undo_remove_community,
-    },
-    find_by_id,
-    post::{
-      receive_create_post,
-      receive_delete_post,
-      receive_dislike_post,
-      receive_like_post,
-      receive_remove_post,
-      receive_update_post,
-    },
-    post_undo::{
-      receive_undo_delete_post,
-      receive_undo_dislike_post,
-      receive_undo_like_post,
-      receive_undo_remove_post,
-    },
-    receive_unhandled_activity,
-    verify_activity_domains_valid,
-    FindResults,
+  inbox::{
+    community_inbox::{community_receive_message, CommunityAcceptedActivities},
+    get_activity_id,
+    get_activity_to_and_cc,
+    inbox_verify_http_signature,
+    is_activity_already_known,
+    user_inbox::{user_receive_message, UserAcceptedActivities},
   },
-  check_is_apub_id_valid,
-  extensions::signatures::verify_signature,
-  fetcher::get_or_fetch_and_upsert_actor,
-  inbox::{get_activity_id, is_activity_already_known},
   insert_activity,
-  ActorType,
-};
-use activitystreams::{
-  activity::{ActorAndObject, Announce, Create, Delete, Dislike, Like, Remove, Undo, Update},
-  base::AnyBase,
-  prelude::*,
 };
+use activitystreams::{activity::ActorAndObject, prelude::*};
 use actix_web::{web, HttpRequest, HttpResponse};
-use anyhow::{anyhow, Context};
-use lemmy_db::{site::Site, Crud};
+use anyhow::Context;
+use lemmy_db::{community::Community, user::User_, DbPool};
 use lemmy_structs::blocking;
 use lemmy_utils::{location_info, LemmyError};
 use lemmy_websocket::LemmyContext;
-use log::debug;
 use serde::{Deserialize, Serialize};
 use std::fmt::Debug;
 use url::Url;
@@ -77,7 +35,7 @@ pub enum ValidTypes {
 }
 
 // TODO: this isnt entirely correct, cause some of these receive are not ActorAndObject,
-//       but it might still work due to the anybase conversion
+//       but it still works due to the anybase conversion
 pub type AcceptedActivities = ActorAndObject<ValidTypes>;
 
 /// Handler for all incoming requests to shared inbox.
@@ -87,332 +45,141 @@ pub async fn shared_inbox(
   context: web::Data<LemmyContext>,
 ) -> Result<HttpResponse, LemmyError> {
   let activity = input.into_inner();
-
-  let actor_id = activity
-    .actor()?
-    .to_owned()
-    .single_xsd_any_uri()
-    .context(location_info!())?;
-  debug!(
-    "Shared inbox received activity {:?} from {}",
-    &activity.id_unchecked(),
-    &actor_id
-  );
-
-  check_is_apub_id_valid(&actor_id)?;
-
+  // First of all check the http signature
   let request_counter = &mut 0;
-  let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
-  verify_signature(&request, actor.as_ref())?;
+  let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
 
+  // Do nothing if we received the same activity before
+  let actor_id = actor.actor_id()?;
   let activity_id = get_activity_id(&activity, &actor_id)?;
   if is_activity_already_known(context.pool(), &activity_id).await? {
     return Ok(HttpResponse::Ok().finish());
   }
 
-  let any_base = activity.clone().into_any_base()?;
-  let kind = activity.kind().context(location_info!())?;
-  let res = match kind {
-    ValidTypes::Announce => {
-      receive_announce(&context, any_base, actor.as_ref(), request_counter).await
-    }
-    ValidTypes::Create => receive_create(&context, any_base, actor_id, request_counter).await,
-    ValidTypes::Update => receive_update(&context, any_base, actor_id, request_counter).await,
-    ValidTypes::Like => receive_like(&context, any_base, actor_id, request_counter).await,
-    ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id, request_counter).await,
-    ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await,
-    ValidTypes::Delete => receive_delete(&context, any_base, actor_id, request_counter).await,
-    ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
-  };
-
+  // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
+  // if we receive the same activity twice in very quick succession.
   insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
-  res
-}
-
-/// Takes an announce and passes the inner activity to the appropriate handler.
-async fn receive_announce(
-  context: &LemmyContext,
-  activity: AnyBase,
-  actor: &dyn ActorType,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let announce = Announce::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&announce, actor.actor_id()?, false)?;
-
-  let kind = announce.object().as_single_kind_str();
-  let object = announce
-    .object()
-    .to_owned()
-    .one()
-    .context(location_info!())?;
-
-  let inner_id = object.id().context(location_info!())?.to_owned();
-  check_is_apub_id_valid(&inner_id)?;
-  if is_activity_already_known(context.pool(), &inner_id).await? {
-    return Ok(HttpResponse::Ok().finish());
-  }
-
-  match kind {
-    Some("Create") => receive_create(context, object, inner_id, request_counter).await,
-    Some("Update") => receive_update(context, object, inner_id, request_counter).await,
-    Some("Like") => receive_like(context, object, inner_id, request_counter).await,
-    Some("Dislike") => receive_dislike(context, object, inner_id, request_counter).await,
-    Some("Delete") => receive_delete(context, object, inner_id, request_counter).await,
-    Some("Remove") => receive_remove(context, object, inner_id).await,
-    Some("Undo") => receive_undo(context, object, inner_id, request_counter).await,
-    _ => receive_unhandled_activity(announce),
-  }
-}
 
-async fn receive_create(
-  context: &LemmyContext,
-  activity: AnyBase,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let create = Create::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&create, expected_domain, true)?;
-
-  match create.object().as_single_kind_str() {
-    Some("Page") => receive_create_post(create, context, request_counter).await,
-    Some("Note") => receive_create_comment(create, context, request_counter).await,
-    _ => receive_unhandled_activity(create),
-  }
-}
-
-async fn receive_update(
-  context: &LemmyContext,
-  activity: AnyBase,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let update = Update::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&update, expected_domain, true)?;
-
-  match update.object().as_single_kind_str() {
-    Some("Page") => receive_update_post(update, context, request_counter).await,
-    Some("Note") => receive_update_comment(update, context, request_counter).await,
-    _ => receive_unhandled_activity(update),
+  let activity_any_base = activity.clone().into_any_base()?;
+  let mut res: Option<HttpResponse> = None;
+  let to_and_cc = get_activity_to_and_cc(&activity)?;
+  // If to_and_cc contains a local community, pass to receive_community_message()
+  // Handle community first, so in case the sender is banned by the community, it will error out.
+  // If we handled the user receive first, the activity would be inserted to the database before the
+  // community could check for bans.
+  let community = extract_local_community_from_destinations(&to_and_cc, context.pool()).await?;
+  if let Some(community) = community {
+    let community_activity = CommunityAcceptedActivities::from_any_base(activity_any_base.clone())?
+      .context(location_info!())?;
+    res = Some(
+      community_receive_message(
+        community_activity,
+        community,
+        actor.as_ref(),
+        &context,
+        request_counter,
+      )
+      .await?,
+    );
   }
-}
 
-async fn receive_like(
-  context: &LemmyContext,
-  activity: AnyBase,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let like = Like::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&like, expected_domain, false)?;
-
-  match like.object().as_single_kind_str() {
-    Some("Page") => receive_like_post(like, context, request_counter).await,
-    Some("Note") => receive_like_comment(like, context, request_counter).await,
-    _ => receive_unhandled_activity(like),
+  // If to_and_cc contains a local user, pass to receive_user_message()
+  if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
+    let user_activity = UserAcceptedActivities::from_any_base(activity_any_base.clone())?
+      .context(location_info!())?;
+    // `to_user` is only used for follow activities (which we dont receive here), so no need to pass
+    // it in
+    user_receive_message(
+      user_activity,
+      None,
+      actor.as_ref(),
+      &context,
+      request_counter,
+    )
+    .await?;
   }
-}
 
-async fn receive_dislike(
-  context: &LemmyContext,
-  activity: AnyBase,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let enable_downvotes = blocking(context.pool(), move |conn| {
-    Site::read(conn, 1).map(|s| s.enable_downvotes)
-  })
-  .await??;
-  if !enable_downvotes {
-    return Ok(HttpResponse::Ok().finish());
+  // If to_and_cc contains followers collection of a community, pass to receive_user_message()
+  if is_addressed_to_community_followers(&to_and_cc, context.pool()).await? {
+    let user_activity = UserAcceptedActivities::from_any_base(activity_any_base.clone())?
+      .context(location_info!())?;
+    res = Some(
+      user_receive_message(
+        user_activity,
+        None,
+        actor.as_ref(),
+        &context,
+        request_counter,
+      )
+      .await?,
+    );
   }
 
-  let dislike = Dislike::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&dislike, expected_domain, false)?;
-
-  match dislike.object().as_single_kind_str() {
-    Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
-    Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
-    _ => receive_unhandled_activity(dislike),
+  // If none of those, throw an error
+  if let Some(r) = res {
+    Ok(r)
+  } else {
+    Ok(HttpResponse::NotImplemented().finish())
   }
 }
 
-pub async fn receive_delete(
-  context: &LemmyContext,
-  activity: AnyBase,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let delete = Delete::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&delete, expected_domain, true)?;
-
-  let object = delete
-    .object()
-    .to_owned()
-    .single_xsd_any_uri()
-    .context(location_info!())?;
-
-  match find_by_id(context, object).await {
-    Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p, request_counter).await,
-    Ok(FindResults::Comment(c)) => {
-      receive_delete_comment(context, delete, c, request_counter).await
-    }
-    Ok(FindResults::Community(c)) => {
-      receive_delete_community(context, delete, c, request_counter).await
+/// If `to_and_cc` contains the ID of a local community, return that community, otherwise return
+/// None.
+///
+/// This doesnt handle the case where an activity is addressed to multiple communities (because
+/// Lemmy doesnt generate such activities).
+async fn extract_local_community_from_destinations(
+  to_and_cc: &[Url],
+  pool: &DbPool,
+) -> Result<Option<Community>, LemmyError> {
+  for url in to_and_cc {
+    let url = url.to_string();
+    let community = blocking(&pool, move |conn| {
+      Community::read_from_actor_id(&conn, &url)
+    })
+    .await?;
+    if let Ok(c) = community {
+      if c.local {
+        return Ok(Some(c));
+      }
     }
-    // if we dont have the object, no need to do anything
-    Err(_) => Ok(HttpResponse::Ok().finish()),
-  }
-}
-
-async fn receive_remove(
-  context: &LemmyContext,
-  activity: AnyBase,
-  expected_domain: Url,
-) -> Result<HttpResponse, LemmyError> {
-  let remove = Remove::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&remove, expected_domain, false)?;
-
-  let cc = remove
-    .cc()
-    .map(|c| c.as_many())
-    .flatten()
-    .context(location_info!())?;
-  let community_id = cc
-    .first()
-    .map(|c| c.as_xsd_any_uri())
-    .flatten()
-    .context(location_info!())?;
-
-  let object = remove
-    .object()
-    .to_owned()
-    .single_xsd_any_uri()
-    .context(location_info!())?;
-
-  // Ensure that remove activity comes from the same domain as the community
-  remove.id(community_id.domain().context(location_info!())?)?;
-
-  match find_by_id(context, object).await {
-    Ok(FindResults::Post(p)) => receive_remove_post(context, remove, p).await,
-    Ok(FindResults::Comment(c)) => receive_remove_comment(context, remove, c).await,
-    Ok(FindResults::Community(c)) => receive_remove_community(context, remove, c).await,
-    // if we dont have the object, no need to do anything
-    Err(_) => Ok(HttpResponse::Ok().finish()),
-  }
-}
-
-async fn receive_undo(
-  context: &LemmyContext,
-  activity: AnyBase,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let undo = Undo::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?;
-
-  match undo.object().as_single_kind_str() {
-    Some("Delete") => receive_undo_delete(context, undo, expected_domain, request_counter).await,
-    Some("Remove") => receive_undo_remove(context, undo, expected_domain, request_counter).await,
-    Some("Like") => receive_undo_like(context, undo, expected_domain, request_counter).await,
-    Some("Dislike") => receive_undo_dislike(context, undo, expected_domain, request_counter).await,
-    _ => receive_unhandled_activity(undo),
   }
+  Ok(None)
 }
 
-async fn receive_undo_delete(
-  context: &LemmyContext,
-  undo: Undo,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
-    .context(location_info!())?;
-  verify_activity_domains_valid(&delete, expected_domain, true)?;
-
-  let object = delete
-    .object()
-    .to_owned()
-    .single_xsd_any_uri()
-    .context(location_info!())?;
-  match find_by_id(context, object).await {
-    Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p, request_counter).await,
-    Ok(FindResults::Comment(c)) => {
-      receive_undo_delete_comment(context, undo, c, request_counter).await
-    }
-    Ok(FindResults::Community(c)) => {
-      receive_undo_delete_community(context, undo, c, request_counter).await
+/// Returns true if `to_and_cc` contains at least one local user.
+async fn is_addressed_to_local_user(to_and_cc: &[Url], pool: &DbPool) -> Result<bool, LemmyError> {
+  for url in to_and_cc {
+    let url = url.to_string();
+    let user = blocking(&pool, move |conn| User_::read_from_actor_id(&conn, &url)).await?;
+    if let Ok(u) = user {
+      if u.local {
+        return Ok(true);
+      }
     }
-    // if we dont have the object, no need to do anything
-    Err(_) => Ok(HttpResponse::Ok().finish()),
   }
+  Ok(false)
 }
 
-async fn receive_undo_remove(
-  context: &LemmyContext,
-  undo: Undo,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
-    .context(location_info!())?;
-  verify_activity_domains_valid(&remove, expected_domain, false)?;
-
-  let object = remove
-    .object()
-    .to_owned()
-    .single_xsd_any_uri()
-    .context(location_info!())?;
-  match find_by_id(context, object).await {
-    Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p, request_counter).await,
-    Ok(FindResults::Comment(c)) => {
-      receive_undo_remove_comment(context, undo, c, request_counter).await
+/// Returns true if `to_and_cc` contains at least one followers collection of a remote community
+/// (like `https://example.com/c/main/followers`)
+async fn is_addressed_to_community_followers(
+  to_and_cc: &[Url],
+  pool: &DbPool,
+) -> Result<bool, LemmyError> {
+  for url in to_and_cc {
+    let url = url.to_string();
+    // TODO: extremely hacky, we should just store the followers url for each community in the db
+    if url.ends_with("/followers") {
+      let community_url = url.replace("/followers", "");
+      let community = blocking(&pool, move |conn| {
+        Community::read_from_actor_id(&conn, &community_url)
+      })
+      .await??;
+      if !community.local {
+        return Ok(true);
+      }
     }
-    Ok(FindResults::Community(c)) => {
-      receive_undo_remove_community(context, undo, c, request_counter).await
-    }
-    // if we dont have the object, no need to do anything
-    Err(_) => Ok(HttpResponse::Ok().finish()),
-  }
-}
-
-async fn receive_undo_like(
-  context: &LemmyContext,
-  undo: Undo,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
-    .context(location_info!())?;
-  verify_activity_domains_valid(&like, expected_domain, false)?;
-
-  let type_ = like
-    .object()
-    .as_single_kind_str()
-    .context(location_info!())?;
-  match type_ {
-    "Note" => receive_undo_like_comment(undo, &like, context, request_counter).await,
-    "Page" => receive_undo_like_post(undo, &like, context, request_counter).await,
-    d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
-  }
-}
-
-async fn receive_undo_dislike(
-  context: &LemmyContext,
-  undo: Undo,
-  expected_domain: Url,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
-    .context(location_info!())?;
-  verify_activity_domains_valid(&dislike, expected_domain, false)?;
-
-  let type_ = dislike
-    .object()
-    .as_single_kind_str()
-    .context(location_info!())?;
-  match type_ {
-    "Note" => receive_undo_dislike_comment(undo, &dislike, context, request_counter).await,
-    "Page" => receive_undo_dislike_post(undo, &dislike, context, request_counter).await,
-    d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
   }
+  Ok(false)
 }
index 45fa5ee7338eb0ddc854e4aae8a5ac14177475ec..9173fec42167bfe3f690ff966a85db15c6b1c68f 100644 (file)
 use crate::{
-  activities::receive::verify_activity_domains_valid,
+  activities::receive::{
+    community::{
+      receive_delete_community,
+      receive_remove_community,
+      receive_undo_delete_community,
+      receive_undo_remove_community,
+    },
+    private_message::{
+      receive_create_private_message,
+      receive_delete_private_message,
+      receive_undo_delete_private_message,
+      receive_update_private_message,
+    },
+    receive_unhandled_activity,
+    verify_activity_domains_valid,
+  },
   check_is_apub_id_valid,
-  extensions::signatures::verify_signature,
-  fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
-  inbox::{get_activity_id, is_activity_already_known},
+  fetcher::get_or_fetch_and_upsert_community,
+  inbox::{
+    get_activity_id,
+    get_activity_to_and_cc,
+    inbox_verify_http_signature,
+    is_activity_already_known,
+    is_addressed_to_public,
+    receive_for_community::{
+      receive_create_for_community,
+      receive_delete_for_community,
+      receive_dislike_for_community,
+      receive_like_for_community,
+      receive_remove_for_community,
+      receive_undo_for_community,
+      receive_update_for_community,
+    },
+  },
   insert_activity,
   ActorType,
-  FromApub,
 };
 use activitystreams::{
-  activity::{Accept, ActorAndObject, Create, Delete, Follow, Undo, Update},
+  activity::{Accept, ActorAndObject, Announce, Delete, Follow, Undo},
   base::AnyBase,
-  object::Note,
   prelude::*,
 };
 use actix_web::{web, HttpRequest, HttpResponse};
 use anyhow::{anyhow, Context};
+use diesel::NotFound;
 use lemmy_db::{
-  community::{CommunityFollower, CommunityFollowerForm},
-  private_message::{PrivateMessage, PrivateMessageForm},
-  private_message_view::PrivateMessageView,
+  community::{Community, CommunityFollower, CommunityFollowerForm},
+  private_message::PrivateMessage,
   user::User_,
-  Crud,
   Followable,
 };
-use lemmy_structs::{blocking, user::PrivateMessageResponse};
+use lemmy_structs::blocking;
 use lemmy_utils::{location_info, LemmyError};
-use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
+use lemmy_websocket::LemmyContext;
 use log::debug;
 use serde::{Deserialize, Serialize};
 use std::fmt::Debug;
+use url::Url;
 
 /// Allowed activities for user inbox.
 #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
 #[serde(rename_all = "PascalCase")]
-pub enum ValidTypes {
-  Accept,
-  Create,
-  Update,
-  Delete,
-  Undo,
+pub enum UserValidTypes {
+  Accept,   // community accepted our follow request
+  Create,   // create private message
+  Update,   // edit private message
+  Delete,   // private message or community deleted by creator
+  Undo,     // private message or community restored
+  Remove,   // community removed by admin
+  Announce, // post, comment or vote in community
 }
 
-pub type AcceptedActivities = ActorAndObject<ValidTypes>;
+pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
 
 /// Handler for all incoming activities to user inboxes.
 pub async fn user_inbox(
   request: HttpRequest,
-  input: web::Json<AcceptedActivities>,
+  input: web::Json<UserAcceptedActivities>,
   path: web::Path<String>,
   context: web::Data<LemmyContext>,
 ) -> Result<HttpResponse, LemmyError> {
   let activity = input.into_inner();
+  // First of all check the http signature
+  let request_counter = &mut 0;
+  let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
+
+  // Do nothing if we received the same activity before
+  let activity_id = get_activity_id(&activity, &actor.actor_id()?)?;
+  if is_activity_already_known(context.pool(), &activity_id).await? {
+    return Ok(HttpResponse::Ok().finish());
+  }
+
+  // Check if the activity is actually meant for us
   let username = path.into_inner();
   let user = blocking(&context.pool(), move |conn| {
     User_::read_from_name(&conn, &username)
   })
   .await??;
-
-  let to = activity
-    .to()
-    .context(location_info!())?
-    .to_owned()
-    .single_xsd_any_uri();
-  if Some(user.actor_id()?) != to {
+  let to_and_cc = get_activity_to_and_cc(&activity)?;
+  if !to_and_cc.contains(&&user.actor_id()?) {
     return Err(anyhow!("Activity delivered to wrong user").into());
   }
 
-  let actor_uri = activity
-    .actor()?
-    .as_single_xsd_any_uri()
-    .context(location_info!())?;
+  insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
+
   debug!(
-    "User {} inbox received activity {:?} from {}",
+    "User {} received activity {:?} from {}",
     user.name,
     &activity.id_unchecked(),
-    &actor_uri
+    &actor.actor_id_str()
   );
 
-  check_is_apub_id_valid(actor_uri)?;
+  user_receive_message(
+    activity.clone(),
+    Some(user.clone()),
+    actor.as_ref(),
+    &context,
+    request_counter,
+  )
+  .await
+}
 
-  let request_counter = &mut 0;
-  let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
-  verify_signature(&request, actor.as_ref())?;
+/// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
+pub(crate) async fn user_receive_message(
+  activity: UserAcceptedActivities,
+  to_user: Option<User_>,
+  actor: &dyn ActorType,
+  context: &LemmyContext,
+  request_counter: &mut i32,
+) -> Result<HttpResponse, LemmyError> {
+  // TODO: must be addressed to one or more local users, or to followers of a remote community
 
-  let activity_id = get_activity_id(&activity, actor_uri)?;
-  if is_activity_already_known(context.pool(), &activity_id).await? {
-    return Ok(HttpResponse::Ok().finish());
-  }
+  // TODO: if it is addressed to community followers, check that at least one local user is following it
 
   let any_base = activity.clone().into_any_base()?;
   let kind = activity.kind().context(location_info!())?;
-  let res = match kind {
-    ValidTypes::Accept => {
-      receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await
+  let actor_url = actor.actor_id()?;
+  match kind {
+    UserValidTypes::Accept => {
+      receive_accept(&context, any_base, actor, to_user.unwrap(), request_counter).await?;
     }
-    ValidTypes::Create => {
-      receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await
+    UserValidTypes::Announce => {
+      receive_announce(&context, any_base, actor, request_counter).await?
     }
-    ValidTypes::Update => {
-      receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await
+    UserValidTypes::Create => {
+      receive_create_private_message(&context, any_base, actor_url, request_counter).await?
     }
-    ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
-    ValidTypes::Undo => {
-      receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
+    UserValidTypes::Update => {
+      receive_update_private_message(&context, any_base, actor_url, request_counter).await?
     }
+    UserValidTypes::Delete => {
+      receive_delete(context, any_base, &actor_url, request_counter).await?
+    }
+    UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
+    UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
   };
 
-  insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
-  res
+  // TODO: would be logical to move websocket notification code here
+
+  Ok(HttpResponse::Ok().finish())
 }
 
 /// Handle accepted follows.
@@ -118,15 +168,15 @@ async fn receive_accept(
   actor: &dyn ActorType,
   user: User_,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
   let accept = Accept::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
+  verify_activity_domains_valid(&accept, &actor.actor_id()?, false)?;
 
   // TODO: we should check that we actually sent this activity, because the remote instance
   //       could just put a fake Follow
   let object = accept.object().to_owned().one().context(location_info!())?;
   let follow = Follow::from_any_base(object)?.context(location_info!())?;
-  verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
+  verify_activity_domains_valid(&follow, &user.actor_id()?, false)?;
 
   let community_uri = accept
     .actor()?
@@ -149,186 +199,137 @@ async fn receive_accept(
   })
   .await?;
 
-  Ok(HttpResponse::Ok().finish())
+  Ok(())
 }
 
-async fn receive_create_private_message(
+/// Takes an announce and passes the inner activity to the appropriate handler.
+async fn receive_announce(
   context: &LemmyContext,
   activity: AnyBase,
   actor: &dyn ActorType,
   request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let create = Create::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
-
-  let note = Note::from_any_base(
-    create
-      .object()
-      .as_one()
-      .context(location_info!())?
-      .to_owned(),
-  )?
-  .context(location_info!())?;
-
-  let private_message =
-    PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
-
-  let inserted_private_message = blocking(&context.pool(), move |conn| {
-    PrivateMessage::create(conn, &private_message)
-  })
-  .await??;
-
-  let message = blocking(&context.pool(), move |conn| {
-    PrivateMessageView::read(conn, inserted_private_message.id)
-  })
-  .await??;
-
-  let res = PrivateMessageResponse { message };
-
-  let recipient_id = res.message.recipient_id;
-
-  context.chat_server().do_send(SendUserRoomMessage {
-    op: UserOperation::CreatePrivateMessage,
-    response: res,
-    recipient_id,
-    websocket_id: None,
-  });
-
-  Ok(HttpResponse::Ok().finish())
-}
-
-async fn receive_update_private_message(
-  context: &LemmyContext,
-  activity: AnyBase,
-  actor: &dyn ActorType,
-  request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
-  let update = Update::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
+) -> Result<(), LemmyError> {
+  let announce = Announce::from_any_base(activity)?.context(location_info!())?;
+  verify_activity_domains_valid(&announce, &actor.actor_id()?, false)?;
+  is_addressed_to_public(&announce)?;
 
-  let object = update
+  let kind = announce.object().as_single_kind_str();
+  let inner_activity = announce
     .object()
-    .as_one()
-    .context(location_info!())?
-    .to_owned();
-  let note = Note::from_any_base(object)?.context(location_info!())?;
-
-  let private_message_form =
-    PrivateMessageForm::from_apub(&note, context, Some(actor.actor_id()?), request_counter).await?;
-
-  let private_message_ap_id = private_message_form
-    .ap_id
-    .as_ref()
-    .context(location_info!())?
-    .clone();
-  let private_message = blocking(&context.pool(), move |conn| {
-    PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
-  })
-  .await??;
-
-  let private_message_id = private_message.id;
-  blocking(&context.pool(), move |conn| {
-    PrivateMessage::update(conn, private_message_id, &private_message_form)
-  })
-  .await??;
-
-  let private_message_id = private_message.id;
-  let message = blocking(&context.pool(), move |conn| {
-    PrivateMessageView::read(conn, private_message_id)
-  })
-  .await??;
-
-  let res = PrivateMessageResponse { message };
-
-  let recipient_id = res.message.recipient_id;
+    .to_owned()
+    .one()
+    .context(location_info!())?;
 
-  context.chat_server().do_send(SendUserRoomMessage {
-    op: UserOperation::EditPrivateMessage,
-    response: res,
-    recipient_id,
-    websocket_id: None,
-  });
+  let inner_id = inner_activity.id().context(location_info!())?.to_owned();
+  check_is_apub_id_valid(&inner_id)?;
+  if is_activity_already_known(context.pool(), &inner_id).await? {
+    return Ok(());
+  }
 
-  Ok(HttpResponse::Ok().finish())
+  dbg!(&kind);
+  match kind {
+    Some("Create") => {
+      receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
+    }
+    Some("Update") => {
+      receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
+    }
+    Some("Like") => {
+      receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
+    }
+    Some("Dislike") => {
+      receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
+    }
+    Some("Delete") => receive_delete_for_community(context, inner_activity, &inner_id).await,
+    Some("Remove") => receive_remove_for_community(context, inner_activity, &inner_id).await,
+    Some("Undo") => {
+      receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
+    }
+    _ => receive_unhandled_activity(inner_activity),
+  }
 }
 
-async fn receive_delete_private_message(
+async fn receive_delete(
   context: &LemmyContext,
-  activity: AnyBase,
-  actor: &dyn ActorType,
-) -> Result<HttpResponse, LemmyError> {
-  let delete = Delete::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
+  any_base: AnyBase,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  use CommunityOrPrivateMessage::*;
 
-  let private_message_id = delete
+  let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
+  verify_activity_domains_valid(&delete, expected_domain, true)?;
+  let object_uri = delete
     .object()
     .to_owned()
     .single_xsd_any_uri()
     .context(location_info!())?;
-  let private_message = blocking(context.pool(), move |conn| {
-    PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
-  })
-  .await??;
-  let deleted_private_message = blocking(context.pool(), move |conn| {
-    PrivateMessage::update_deleted(conn, private_message.id, true)
-  })
-  .await??;
-
-  let message = blocking(&context.pool(), move |conn| {
-    PrivateMessageView::read(&conn, deleted_private_message.id)
-  })
-  .await??;
-
-  let res = PrivateMessageResponse { message };
-  let recipient_id = res.message.recipient_id;
-  context.chat_server().do_send(SendUserRoomMessage {
-    op: UserOperation::EditPrivateMessage,
-    response: res,
-    recipient_id,
-    websocket_id: None,
-  });
 
-  Ok(HttpResponse::Ok().finish())
+  match find_community_or_private_message_by_id(context, object_uri).await? {
+    Community(c) => receive_delete_community(context, c).await,
+    PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
+  }
 }
 
-async fn receive_undo_delete_private_message(
+async fn receive_undo(
   context: &LemmyContext,
-  activity: AnyBase,
-  actor: &dyn ActorType,
-) -> Result<HttpResponse, LemmyError> {
-  let undo = Undo::from_any_base(activity)?.context(location_info!())?;
-  verify_activity_domains_valid(&undo, actor.actor_id()?, true)?;
-  let object = undo.object().to_owned().one().context(location_info!())?;
-  let delete = Delete::from_any_base(object)?.context(location_info!())?;
-  verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
+  any_base: AnyBase,
+  expected_domain: &Url,
+  request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+  use CommunityOrPrivateMessage::*;
+  let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
+  verify_activity_domains_valid(&undo, expected_domain, true)?;
+
+  let inner_activity = undo.object().to_owned().one().context(location_info!())?;
+  let kind = inner_activity.kind_str();
+  match kind {
+    Some("Delete") => {
+      let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
+      verify_activity_domains_valid(&delete, expected_domain, true)?;
+      let object_uri = delete
+        .object()
+        .to_owned()
+        .single_xsd_any_uri()
+        .context(location_info!())?;
+      match find_community_or_private_message_by_id(context, object_uri).await? {
+        Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
+        PrivateMessage(p) => {
+          receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
+            .await
+        }
+      }
+    }
+    Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
+    _ => receive_unhandled_activity(undo),
+  }
+}
+enum CommunityOrPrivateMessage {
+  Community(Community),
+  PrivateMessage(PrivateMessage),
+}
 
-  let private_message_id = delete
-    .object()
-    .to_owned()
-    .single_xsd_any_uri()
-    .context(location_info!())?;
-  let private_message = blocking(context.pool(), move |conn| {
-    PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
-  })
-  .await??;
-  let deleted_private_message = blocking(context.pool(), move |conn| {
-    PrivateMessage::update_deleted(conn, private_message.id, false)
+async fn find_community_or_private_message_by_id(
+  context: &LemmyContext,
+  apub_id: Url,
+) -> Result<CommunityOrPrivateMessage, LemmyError> {
+  let ap_id = apub_id.to_string();
+  let community = blocking(context.pool(), move |conn| {
+    Community::read_from_actor_id(conn, &ap_id)
   })
-  .await??;
+  .await?;
+  if let Ok(c) = community {
+    return Ok(CommunityOrPrivateMessage::Community(c));
+  }
 
-  let message = blocking(&context.pool(), move |conn| {
-    PrivateMessageView::read(&conn, deleted_private_message.id)
+  let ap_id = apub_id.to_string();
+  let private_message = blocking(context.pool(), move |conn| {
+    PrivateMessage::read_from_apub_id(conn, &ap_id)
   })
-  .await??;
-
-  let res = PrivateMessageResponse { message };
-  let recipient_id = res.message.recipient_id;
-  context.chat_server().do_send(SendUserRoomMessage {
-    op: UserOperation::EditPrivateMessage,
-    response: res,
-    recipient_id,
-    websocket_id: None,
-  });
+  .await?;
+  if let Ok(p) = private_message {
+    return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
+  }
 
-  Ok(HttpResponse::Ok().finish())
+  return Err(NotFound.into());
 }