From: Felix Ableitner Date: Wed, 28 Oct 2020 16:14:18 +0000 (+0100) Subject: Separate logic for user and community inbox X-Git-Url: http://these/git/%7B%60%24%7BwebArchiveUrl%7D/%22%7B%7D/%22https:/nerdica.net/static/%7Blink%7D?a=commitdiff_plain;h=b469b6d8d3d2bbeb81708ed80e575b2d62ea8743;p=lemmy.git Separate logic for user and community inbox more refactoring with tons of changes: - inbox functions return LemmyError instead of HttpResponse - announce is done directly in community inbox - reorganized functions for handling inbox activities - additional checks for private messages - moved inbox handler functions for post, comment, vote into separete file - ensure that posts, comments etc are addressed to public (ref #1220) - probably more --- diff --git a/lemmy_apub/src/activities/receive/comment.rs b/lemmy_apub/src/activities/receive/comment.rs index 3fe9c0b2..b60d4c95 100644 --- a/lemmy_apub/src/activities/receive/comment.rs +++ b/lemmy_apub/src/activities/receive/comment.rs @@ -1,15 +1,14 @@ use crate::{ - activities::receive::{announce_if_community_is_local, get_actor_as_user}, + activities::receive::get_actor_as_user, fetcher::get_or_fetch_and_insert_comment, ActorType, FromApub, }; use activitystreams::{ - activity::{ActorAndObjectRefExt, Create, Delete, Dislike, Like, Remove, Update}, + activity::{ActorAndObjectRefExt, Create, Dislike, Like, Remove, Update}, base::ExtendsExt, object::Note, }; -use actix_web::HttpResponse; use anyhow::Context; use lemmy_db::{ comment::{Comment, CommentForm, CommentLike, CommentLikeForm}, @@ -26,7 +25,7 @@ pub(crate) async fn receive_create_comment( create: Create, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(&create, context, request_counter).await?; let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -73,15 +72,14 @@ pub(crate) async fn receive_create_comment( websocket_id: None, }); - announce_if_community_is_local(create, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_update_comment( update: Update, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; let user = get_actor_as_user(&update, context, request_counter).await?; @@ -131,15 +129,14 @@ pub(crate) async fn receive_update_comment( websocket_id: None, }); - announce_if_community_is_local(update, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_like_comment( like: Like, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; let user = get_actor_as_user(&like, context, request_counter).await?; @@ -183,15 +180,14 @@ pub(crate) async fn receive_like_comment( websocket_id: None, }); - announce_if_community_is_local(like, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_dislike_comment( dislike: Dislike, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let note = Note::from_any_base( dislike .object() @@ -241,16 +237,13 @@ pub(crate) async fn receive_dislike_comment( websocket_id: None, }); - announce_if_community_is_local(dislike, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_delete_comment( context: &LemmyContext, - delete: Delete, comment: Comment, - request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let deleted_comment = blocking(context.pool(), move |conn| { Comment::update_deleted(conn, comment.id, true) }) @@ -276,15 +269,14 @@ pub(crate) async fn receive_delete_comment( websocket_id: None, }); - announce_if_community_is_local(delete, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_remove_comment( context: &LemmyContext, _remove: Remove, comment: Comment, -) -> Result { +) -> Result<(), LemmyError> { let removed_comment = blocking(context.pool(), move |conn| { Comment::update_removed(conn, comment.id, true) }) @@ -310,5 +302,5 @@ pub(crate) async fn receive_remove_comment( websocket_id: None, }); - Ok(HttpResponse::Ok().finish()) + Ok(()) } diff --git a/lemmy_apub/src/activities/receive/comment_undo.rs b/lemmy_apub/src/activities/receive/comment_undo.rs index ec61b111..709e8481 100644 --- a/lemmy_apub/src/activities/receive/comment_undo.rs +++ b/lemmy_apub/src/activities/receive/comment_undo.rs @@ -1,10 +1,9 @@ use crate::{ - activities::receive::{announce_if_community_is_local, get_actor_as_user}, + activities::receive::get_actor_as_user, fetcher::get_or_fetch_and_insert_comment, FromApub, }; use activitystreams::{activity::*, object::Note, prelude::*}; -use actix_web::HttpResponse; use anyhow::Context; use lemmy_db::{ comment::{Comment, CommentForm, CommentLike}, @@ -16,11 +15,10 @@ use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation}; pub(crate) async fn receive_undo_like_comment( - undo: Undo, like: &Like, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(like, context, request_counter).await?; let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -57,16 +55,14 @@ pub(crate) async fn receive_undo_like_comment( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_undo_dislike_comment( - undo: Undo, dislike: &Dislike, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(dislike, context, request_counter).await?; let note = Note::from_any_base( dislike @@ -109,16 +105,13 @@ pub(crate) async fn receive_undo_dislike_comment( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_undo_delete_comment( context: &LemmyContext, - undo: Undo, comment: Comment, - request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let deleted_comment = blocking(context.pool(), move |conn| { Comment::update_deleted(conn, comment.id, false) }) @@ -145,16 +138,13 @@ pub(crate) async fn receive_undo_delete_comment( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_undo_remove_comment( context: &LemmyContext, - undo: Undo, comment: Comment, - request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let removed_comment = blocking(context.pool(), move |conn| { Comment::update_removed(conn, comment.id, false) }) @@ -181,6 +171,5 @@ pub(crate) async fn receive_undo_remove_comment( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } diff --git a/lemmy_apub/src/activities/receive/community.rs b/lemmy_apub/src/activities/receive/community.rs index 80aad5e7..ed43b33e 100644 --- a/lemmy_apub/src/activities/receive/community.rs +++ b/lemmy_apub/src/activities/receive/community.rs @@ -1,17 +1,19 @@ -use crate::activities::receive::announce_if_community_is_local; -use activitystreams::activity::{Delete, Remove, Undo}; -use actix_web::HttpResponse; +use crate::{activities::receive::verify_activity_domains_valid, inbox::is_addressed_to_public}; +use activitystreams::{ + activity::{ActorAndObjectRefExt, Delete, Remove, Undo}, + base::{AnyBase, ExtendsExt}, +}; +use anyhow::Context; use lemmy_db::{community::Community, community_view::CommunityView}; use lemmy_structs::{blocking, community::CommunityResponse}; -use lemmy_utils::LemmyError; +use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::{messages::SendCommunityRoomMessage, LemmyContext, UserOperation}; +use url::Url; pub(crate) async fn receive_delete_community( context: &LemmyContext, - delete: Delete, community: Community, - request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let deleted_community = blocking(context.pool(), move |conn| { Community::update_deleted(conn, community.id, true) }) @@ -33,15 +35,28 @@ pub(crate) async fn receive_delete_community( websocket_id: None, }); - announce_if_community_is_local(delete, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_remove_community( context: &LemmyContext, - _remove: Remove, - community: Community, -) -> Result { + activity: AnyBase, + expected_domain: &Url, +) -> Result<(), LemmyError> { + let remove = Remove::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&remove, expected_domain, true)?; + is_addressed_to_public(&remove)?; + + let community_uri = remove + .object() + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + let community = blocking(context.pool(), move |conn| { + Community::read_from_actor_id(conn, community_uri.as_str()) + }) + .await??; + let removed_community = blocking(context.pool(), move |conn| { Community::update_removed(conn, community.id, true) }) @@ -63,16 +78,21 @@ pub(crate) async fn receive_remove_community( websocket_id: None, }); - // TODO: this should probably also call announce_if_community_is_local() - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_undo_delete_community( context: &LemmyContext, undo: Undo, community: Community, - request_counter: &mut i32, -) -> Result { + expected_domain: &Url, +) -> Result<(), LemmyError> { + is_addressed_to_public(&undo)?; + let inner = undo.object().to_owned().one().context(location_info!())?; + let delete = Delete::from_any_base(inner)?.context(location_info!())?; + verify_activity_domains_valid(&delete, expected_domain, true)?; + is_addressed_to_public(&delete)?; + let deleted_community = blocking(context.pool(), move |conn| { Community::update_deleted(conn, community.id, false) }) @@ -94,16 +114,31 @@ pub(crate) async fn receive_undo_delete_community( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_undo_remove_community( context: &LemmyContext, undo: Undo, - community: Community, - request_counter: &mut i32, -) -> Result { + expected_domain: &Url, +) -> Result<(), LemmyError> { + is_addressed_to_public(&undo)?; + + let inner = undo.object().to_owned().one().context(location_info!())?; + let remove = Remove::from_any_base(inner)?.context(location_info!())?; + verify_activity_domains_valid(&remove, &expected_domain, true)?; + is_addressed_to_public(&remove)?; + + let community_uri = remove + .object() + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + let community = blocking(context.pool(), move |conn| { + Community::read_from_actor_id(conn, community_uri.as_str()) + }) + .await??; + let removed_community = blocking(context.pool(), move |conn| { Community::update_removed(conn, community.id, false) }) @@ -126,6 +161,5 @@ pub(crate) async fn receive_undo_remove_community( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } diff --git a/lemmy_apub/src/activities/receive/mod.rs b/lemmy_apub/src/activities/receive/mod.rs index 2756fd14..1f17fe9f 100644 --- a/lemmy_apub/src/activities/receive/mod.rs +++ b/lemmy_apub/src/activities/receive/mod.rs @@ -1,22 +1,14 @@ -use crate::{ - fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user}, - ActorType, -}; +use crate::fetcher::get_or_fetch_and_upsert_user; use activitystreams::{ activity::{ActorAndObjectRef, ActorAndObjectRefExt}, - base::{AsBase, BaseExt, Extends, ExtendsExt}, + base::{AsBase, BaseExt}, error::DomainError, - object::{AsObject, ObjectExt}, }; -use actix_web::HttpResponse; -use anyhow::Context; -use diesel::result::Error::NotFound; -use lemmy_db::{comment::Comment, community::Community, post::Post, user::User_}; -use lemmy_structs::blocking; +use anyhow::{anyhow, Context}; +use lemmy_db::user::User_; use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::LemmyContext; use log::debug; -use serde::Serialize; use std::fmt::Debug; use url::Url; @@ -25,46 +17,15 @@ pub(crate) mod comment_undo; pub(crate) mod community; pub(crate) mod post; pub(crate) mod post_undo; +pub(crate) mod private_message; /// Return HTTP 501 for unsupported activities in inbox. -pub(crate) fn receive_unhandled_activity(activity: A) -> Result +pub(crate) fn receive_unhandled_activity(activity: A) -> Result<(), LemmyError> where A: Debug, { debug!("received unhandled activity type: {:?}", activity); - Ok(HttpResponse::NotImplemented().finish()) -} - -/// Reads the destination community from the activity's `cc` field. If this refers to a local -/// community, the activity is announced to all community followers. -async fn announce_if_community_is_local( - activity: T, - context: &LemmyContext, - request_counter: &mut i32, -) -> Result<(), LemmyError> -where - T: AsObject, - T: Extends, - Kind: Serialize, - >::Error: From + Send + Sync + 'static, -{ - let cc = activity.cc().context(location_info!())?; - let cc = cc.as_many().context(location_info!())?; - let community_uri = cc - .first() - .context(location_info!())? - .as_xsd_any_uri() - .context(location_info!())?; - // TODO: we could just read from the local db here (and ignore if the community is not found) - let community = - get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?; - - if community.local { - community - .send_announce(activity.into_any_base()?, context) - .await?; - } - Ok(()) + Err(anyhow!("Activity not supported").into()) } /// Reads the actor field of an activity and returns the corresponding `User_`. @@ -81,49 +42,6 @@ where get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await } -pub(crate) enum FindResults { - Comment(Comment), - Community(Community), - Post(Post), -} - -/// Tries to find a community, post or comment in the local database, without any network requests. -/// This is used to handle deletions and removals, because in case we dont have the object, we can -/// simply ignore the activity. -pub(crate) async fn find_by_id( - context: &LemmyContext, - apub_id: Url, -) -> Result { - let ap_id = apub_id.to_string(); - let community = blocking(context.pool(), move |conn| { - Community::read_from_actor_id(conn, &ap_id) - }) - .await?; - if let Ok(c) = community { - return Ok(FindResults::Community(c)); - } - - let ap_id = apub_id.to_string(); - let post = blocking(context.pool(), move |conn| { - Post::read_from_apub_id(conn, &ap_id) - }) - .await?; - if let Ok(p) = post { - return Ok(FindResults::Post(p)); - } - - let ap_id = apub_id.to_string(); - let comment = blocking(context.pool(), move |conn| { - Comment::read_from_apub_id(conn, &ap_id) - }) - .await?; - if let Ok(c) = comment { - return Ok(FindResults::Comment(c)); - } - - return Err(NotFound.into()); -} - /// Ensure that the ID of an incoming activity comes from the same domain as the actor. Optionally /// also checks the ID of the inner object. /// @@ -131,7 +49,7 @@ pub(crate) async fn find_by_id( /// HTTP signature. pub(crate) fn verify_activity_domains_valid( activity: &T, - actor_id: Url, + actor_id: &Url, object_domain_must_match: bool, ) -> Result<(), LemmyError> where diff --git a/lemmy_apub/src/activities/receive/post.rs b/lemmy_apub/src/activities/receive/post.rs index 0753b838..80044237 100644 --- a/lemmy_apub/src/activities/receive/post.rs +++ b/lemmy_apub/src/activities/receive/post.rs @@ -1,15 +1,14 @@ use crate::{ - activities::receive::{announce_if_community_is_local, get_actor_as_user}, + activities::receive::get_actor_as_user, fetcher::get_or_fetch_and_insert_post, ActorType, FromApub, PageExt, }; use activitystreams::{ - activity::{Create, Delete, Dislike, Like, Remove, Update}, + activity::{Create, Dislike, Like, Remove, Update}, prelude::*, }; -use actix_web::HttpResponse; use anyhow::Context; use lemmy_db::{ post::{Post, PostForm, PostLike, PostLikeForm}, @@ -25,7 +24,7 @@ pub(crate) async fn receive_create_post( create: Create, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(&create, context, request_counter).await?; let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -51,15 +50,14 @@ pub(crate) async fn receive_create_post( websocket_id: None, }); - announce_if_community_is_local(create, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_update_post( update: Update, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(&update, context, request_counter).await?; let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -89,15 +87,14 @@ pub(crate) async fn receive_update_post( websocket_id: None, }); - announce_if_community_is_local(update, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_like_post( like: Like, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(&like, context, request_counter).await?; let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -134,15 +131,14 @@ pub(crate) async fn receive_like_post( websocket_id: None, }); - announce_if_community_is_local(like, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_dislike_post( dislike: Dislike, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(&dislike, context, request_counter).await?; let page = PageExt::from_any_base( dislike @@ -185,16 +181,13 @@ pub(crate) async fn receive_dislike_post( websocket_id: None, }); - announce_if_community_is_local(dislike, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_delete_post( context: &LemmyContext, - delete: Delete, post: Post, - request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let deleted_post = blocking(context.pool(), move |conn| { Post::update_deleted(conn, post.id, true) }) @@ -214,15 +207,14 @@ pub(crate) async fn receive_delete_post( websocket_id: None, }); - announce_if_community_is_local(delete, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_remove_post( context: &LemmyContext, _remove: Remove, post: Post, -) -> Result { +) -> Result<(), LemmyError> { let removed_post = blocking(context.pool(), move |conn| { Post::update_removed(conn, post.id, true) }) @@ -242,5 +234,5 @@ pub(crate) async fn receive_remove_post( websocket_id: None, }); - Ok(HttpResponse::Ok().finish()) + Ok(()) } diff --git a/lemmy_apub/src/activities/receive/post_undo.rs b/lemmy_apub/src/activities/receive/post_undo.rs index bee56d86..99d0ed1d 100644 --- a/lemmy_apub/src/activities/receive/post_undo.rs +++ b/lemmy_apub/src/activities/receive/post_undo.rs @@ -1,11 +1,10 @@ use crate::{ - activities::receive::{announce_if_community_is_local, get_actor_as_user}, + activities::receive::get_actor_as_user, fetcher::get_or_fetch_and_insert_post, FromApub, PageExt, }; use activitystreams::{activity::*, prelude::*}; -use actix_web::HttpResponse; use anyhow::Context; use lemmy_db::{ post::{Post, PostForm, PostLike}, @@ -17,11 +16,10 @@ use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation}; pub(crate) async fn receive_undo_like_post( - undo: Undo, like: &Like, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(like, context, request_counter).await?; let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)? .context(location_info!())?; @@ -52,16 +50,14 @@ pub(crate) async fn receive_undo_like_post( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_undo_dislike_post( - undo: Undo, dislike: &Dislike, context: &LemmyContext, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let user = get_actor_as_user(dislike, context, request_counter).await?; let page = PageExt::from_any_base( dislike @@ -98,16 +94,13 @@ pub(crate) async fn receive_undo_dislike_post( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_undo_delete_post( context: &LemmyContext, - undo: Undo, post: Post, - request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let deleted_post = blocking(context.pool(), move |conn| { Post::update_deleted(conn, post.id, false) }) @@ -127,16 +120,13 @@ pub(crate) async fn receive_undo_delete_post( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } pub(crate) async fn receive_undo_remove_post( context: &LemmyContext, - undo: Undo, post: Post, - request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let removed_post = blocking(context.pool(), move |conn| { Post::update_removed(conn, post.id, false) }) @@ -157,6 +147,5 @@ pub(crate) async fn receive_undo_remove_post( websocket_id: None, }); - announce_if_community_is_local(undo, context, request_counter).await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } diff --git a/lemmy_apub/src/activities/receive/private_message.rs b/lemmy_apub/src/activities/receive/private_message.rs new file mode 100644 index 00000000..80fb0b70 --- /dev/null +++ b/lemmy_apub/src/activities/receive/private_message.rs @@ -0,0 +1,217 @@ +use crate::{ + activities::receive::verify_activity_domains_valid, + check_is_apub_id_valid, + fetcher::get_or_fetch_and_upsert_user, + inbox::get_activity_to_and_cc, + FromApub, +}; +use activitystreams::{ + activity::{ActorAndObjectRefExt, Create, Delete, Undo, Update}, + base::{AnyBase, AsBase, ExtendsExt}, + object::{AsObject, Note}, + public, +}; +use anyhow::{anyhow, Context}; +use lemmy_db::{ + private_message::{PrivateMessage, PrivateMessageForm}, + private_message_view::PrivateMessageView, + Crud, +}; +use lemmy_structs::{blocking, user::PrivateMessageResponse}; +use lemmy_utils::{location_info, LemmyError}; +use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation}; +use url::Url; + +pub(crate) async fn receive_create_private_message( + context: &LemmyContext, + activity: AnyBase, + expected_domain: Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let create = Create::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&create, &expected_domain, true)?; + check_private_message_activity_valid(&create, context, request_counter).await?; + + let note = Note::from_any_base( + create + .object() + .as_one() + .context(location_info!())? + .to_owned(), + )? + .context(location_info!())?; + + let private_message = + PrivateMessageForm::from_apub(¬e, context, Some(expected_domain), request_counter).await?; + + let inserted_private_message = blocking(&context.pool(), move |conn| { + PrivateMessage::create(conn, &private_message) + }) + .await??; + + let message = blocking(&context.pool(), move |conn| { + PrivateMessageView::read(conn, inserted_private_message.id) + }) + .await??; + + let res = PrivateMessageResponse { message }; + + let recipient_id = res.message.recipient_id; + + context.chat_server().do_send(SendUserRoomMessage { + op: UserOperation::CreatePrivateMessage, + response: res, + recipient_id, + websocket_id: None, + }); + + Ok(()) +} + +pub(crate) async fn receive_update_private_message( + context: &LemmyContext, + activity: AnyBase, + expected_domain: Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let update = Update::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&update, &expected_domain, true)?; + check_private_message_activity_valid(&update, context, request_counter).await?; + + let object = update + .object() + .as_one() + .context(location_info!())? + .to_owned(); + let note = Note::from_any_base(object)?.context(location_info!())?; + + let private_message_form = + PrivateMessageForm::from_apub(¬e, context, Some(expected_domain), request_counter).await?; + + let private_message_ap_id = private_message_form + .ap_id + .as_ref() + .context(location_info!())? + .clone(); + let private_message = blocking(&context.pool(), move |conn| { + PrivateMessage::read_from_apub_id(conn, &private_message_ap_id) + }) + .await??; + + let private_message_id = private_message.id; + blocking(&context.pool(), move |conn| { + PrivateMessage::update(conn, private_message_id, &private_message_form) + }) + .await??; + + let private_message_id = private_message.id; + let message = blocking(&context.pool(), move |conn| { + PrivateMessageView::read(conn, private_message_id) + }) + .await??; + + let res = PrivateMessageResponse { message }; + + let recipient_id = res.message.recipient_id; + + context.chat_server().do_send(SendUserRoomMessage { + op: UserOperation::EditPrivateMessage, + response: res, + recipient_id, + websocket_id: None, + }); + + Ok(()) +} + +pub(crate) async fn receive_delete_private_message( + context: &LemmyContext, + delete: Delete, + private_message: PrivateMessage, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + check_private_message_activity_valid(&delete, context, request_counter).await?; + + let deleted_private_message = blocking(context.pool(), move |conn| { + PrivateMessage::update_deleted(conn, private_message.id, true) + }) + .await??; + + let message = blocking(&context.pool(), move |conn| { + PrivateMessageView::read(&conn, deleted_private_message.id) + }) + .await??; + + let res = PrivateMessageResponse { message }; + let recipient_id = res.message.recipient_id; + context.chat_server().do_send(SendUserRoomMessage { + op: UserOperation::EditPrivateMessage, + response: res, + recipient_id, + websocket_id: None, + }); + + Ok(()) +} + +pub(crate) async fn receive_undo_delete_private_message( + context: &LemmyContext, + undo: Undo, + expected_domain: &Url, + private_message: PrivateMessage, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + check_private_message_activity_valid(&undo, context, request_counter).await?; + let object = undo.object().to_owned().one().context(location_info!())?; + let delete = Delete::from_any_base(object)?.context(location_info!())?; + verify_activity_domains_valid(&delete, expected_domain, true)?; + check_private_message_activity_valid(&delete, context, request_counter).await?; + + let deleted_private_message = blocking(context.pool(), move |conn| { + PrivateMessage::update_deleted(conn, private_message.id, false) + }) + .await??; + + let message = blocking(&context.pool(), move |conn| { + PrivateMessageView::read(&conn, deleted_private_message.id) + }) + .await??; + + let res = PrivateMessageResponse { message }; + let recipient_id = res.message.recipient_id; + context.chat_server().do_send(SendUserRoomMessage { + op: UserOperation::EditPrivateMessage, + response: res, + recipient_id, + websocket_id: None, + }); + + Ok(()) +} + +async fn check_private_message_activity_valid( + activity: &T, + context: &LemmyContext, + request_counter: &mut i32, +) -> Result<(), LemmyError> +where + T: AsBase + AsObject + ActorAndObjectRefExt, +{ + let to_and_cc = get_activity_to_and_cc(activity)?; + if to_and_cc.len() != 1 { + return Err(anyhow!("Private message can only be addressed to one user").into()); + } + if to_and_cc.contains(&public()) { + return Err(anyhow!("Private message cant be public").into()); + } + let user_id = activity + .actor()? + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + check_is_apub_id_valid(&user_id)?; + // check that the sender is a user, not a community + get_or_fetch_and_upsert_user(&user_id, &context, request_counter).await?; + + Ok(()) +} diff --git a/lemmy_apub/src/inbox/community_inbox.rs b/lemmy_apub/src/inbox/community_inbox.rs index b80d739a..630a2db9 100644 --- a/lemmy_apub/src/inbox/community_inbox.rs +++ b/lemmy_apub/src/inbox/community_inbox.rs @@ -1,14 +1,25 @@ use crate::{ activities::receive::verify_activity_domains_valid, - check_is_apub_id_valid, - extensions::signatures::verify_signature, - fetcher::get_or_fetch_and_upsert_user, - inbox::{get_activity_id, is_activity_already_known}, + inbox::{ + get_activity_id, + get_activity_to_and_cc, + inbox_verify_http_signature, + is_activity_already_known, + is_addressed_to_public, + receive_for_community::{ + receive_create_for_community, + receive_delete_for_community, + receive_dislike_for_community, + receive_like_for_community, + receive_undo_for_community, + receive_update_for_community, + }, + }, insert_activity, ActorType, }; use activitystreams::{ - activity::{ActorAndObject, Follow, Undo}, + activity::{kind::FollowType, ActorAndObject, Follow, Undo}, base::AnyBase, prelude::*, }; @@ -25,89 +36,153 @@ use lemmy_websocket::LemmyContext; use log::info; use serde::{Deserialize, Serialize}; use std::fmt::Debug; +use url::Url; /// Allowed activities for community inbox. #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)] #[serde(rename_all = "PascalCase")] -pub enum ValidTypes { - Follow, - Undo, +pub enum CommunityValidTypes { + Follow, // follow request from a user + Undo, // unfollow from a user + Create, // create post or comment + Update, // update post or comment + Like, // upvote post or comment + Dislike, // downvote post or comment + Delete, // post or comment deleted by creator + Remove, // post or comment removed by mod or admin } -pub type AcceptedActivities = ActorAndObject; +pub type CommunityAcceptedActivities = ActorAndObject; /// Handler for all incoming receive to community inboxes. pub async fn community_inbox( request: HttpRequest, - input: web::Json, + input: web::Json, path: web::Path, context: web::Data, ) -> Result { let activity = input.into_inner(); + // First of all check the http signature + let request_counter = &mut 0; + let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?; + + // Do nothing if we received the same activity before + let activity_id = get_activity_id(&activity, &actor.actor_id()?)?; + if is_activity_already_known(context.pool(), &activity_id).await? { + return Ok(HttpResponse::Ok().finish()); + } + // Check if the activity is actually meant for us let path = path.into_inner(); let community = blocking(&context.pool(), move |conn| { Community::read_from_name(&conn, &path) }) .await??; - - let to = activity - .to() - .context(location_info!())? - .to_owned() - .single_xsd_any_uri(); - if Some(community.actor_id()?) != to { + let to_and_cc = get_activity_to_and_cc(&activity)?; + if !to_and_cc.contains(&&community.actor_id()?) { return Err(anyhow!("Activity delivered to wrong community").into()); } + insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?; + info!( - "Community {} received activity {:?}", - &community.name, &activity - ); - let user_uri = activity - .actor()? - .as_single_xsd_any_uri() - .context(location_info!())?; - info!( - "Community {} inbox received activity {:?} from {}", + "Community {} received activity {:?} from {}", community.name, &activity.id_unchecked(), - &user_uri + &actor.actor_id_str() ); - check_is_apub_id_valid(user_uri)?; - - let request_counter = &mut 0; - let user = get_or_fetch_and_upsert_user(&user_uri, &context, request_counter).await?; - verify_signature(&request, &user)?; + community_receive_message( + activity.clone(), + community.clone(), + actor.as_ref(), + &context, + request_counter, + ) + .await +} - let activity_id = get_activity_id(&activity, user_uri)?; - if is_activity_already_known(context.pool(), &activity_id).await? { - return Ok(HttpResponse::Ok().finish()); - } +/// Receives Follow, Undo/Follow, post actions, comment actions (including votes) +pub(crate) async fn community_receive_message( + activity: CommunityAcceptedActivities, + to_community: Community, + actor: &dyn ActorType, + context: &LemmyContext, + request_counter: &mut i32, +) -> Result { + // TODO: check if the sending user is banned by the community let any_base = activity.clone().into_any_base()?; - let kind = activity.kind().context(location_info!())?; - let res = match kind { - ValidTypes::Follow => handle_follow(any_base, user, community, &context).await, - ValidTypes::Undo => handle_undo_follow(any_base, user, community, &context).await, + let actor_url = actor.actor_id()?; + let activity_kind = activity.kind().context(location_info!())?; + let do_announce = match activity_kind { + CommunityValidTypes::Follow => { + handle_follow(any_base.clone(), actor_url, &to_community, &context).await?; + false + } + CommunityValidTypes::Undo => { + handle_undo( + context, + activity.clone(), + actor_url, + &to_community, + request_counter, + ) + .await? + } + CommunityValidTypes::Create => { + receive_create_for_community(context, any_base.clone(), &actor_url, request_counter).await?; + true + } + CommunityValidTypes::Update => { + receive_update_for_community(context, any_base.clone(), &actor_url, request_counter).await?; + true + } + CommunityValidTypes::Like => { + receive_like_for_community(context, any_base.clone(), &actor_url, request_counter).await?; + true + } + CommunityValidTypes::Dislike => { + receive_dislike_for_community(context, any_base.clone(), &actor_url, request_counter).await?; + true + } + CommunityValidTypes::Delete => { + receive_delete_for_community(context, any_base.clone(), &actor_url).await?; + true + } + CommunityValidTypes::Remove => { + // TODO: we dont support remote mods, so this is ignored for now + //receive_remove_for_community(context, any_base.clone(), &user_url).await? + false + } }; - insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?; - res + if do_announce { + // Check again that the activity is public, just to be sure + is_addressed_to_public(&activity)?; + to_community + .send_announce(activity.into_any_base()?, context) + .await?; + } + + return Ok(HttpResponse::Ok().finish()); } /// Handle a follow request from a remote user, adding the user as follower and returning an /// Accept activity. async fn handle_follow( activity: AnyBase, - user: User_, - community: Community, + user_url: Url, + community: &Community, context: &LemmyContext, ) -> Result { let follow = Follow::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&follow, user.actor_id()?, false)?; + verify_activity_domains_valid(&follow, &user_url, false)?; + let user = blocking(&context.pool(), move |conn| { + User_::read_from_actor_id(&conn, user_url.as_str()) + }) + .await??; let community_follower_form = CommunityFollowerForm { community_id: community.id, user_id: user.id, @@ -124,20 +199,44 @@ async fn handle_follow( Ok(HttpResponse::Ok().finish()) } +async fn handle_undo( + context: &LemmyContext, + activity: CommunityAcceptedActivities, + actor_url: Url, + to_community: &Community, + request_counter: &mut i32, +) -> Result { + let inner_kind = activity + .object() + .is_single_kind(&FollowType::Follow.to_string()); + let any_base = activity.into_any_base()?; + if inner_kind { + handle_undo_follow(any_base, actor_url, to_community, &context).await?; + Ok(false) + } else { + receive_undo_for_community(context, any_base, &actor_url, request_counter).await?; + Ok(true) + } +} + /// Handle `Undo/Follow` from a user, removing the user from followers list. async fn handle_undo_follow( activity: AnyBase, - user: User_, - community: Community, + user_url: Url, + community: &Community, context: &LemmyContext, -) -> Result { +) -> Result<(), LemmyError> { let undo = Undo::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&undo, user.actor_id()?, true)?; + verify_activity_domains_valid(&undo, &user_url, true)?; let object = undo.object().to_owned().one().context(location_info!())?; let follow = Follow::from_any_base(object)?.context(location_info!())?; - verify_activity_domains_valid(&follow, user.actor_id()?, false)?; + verify_activity_domains_valid(&follow, &user_url, false)?; + let user = blocking(&context.pool(), move |conn| { + User_::read_from_actor_id(&conn, user_url.as_str()) + }) + .await??; let community_follower_form = CommunityFollowerForm { community_id: community.id, user_id: user.id, @@ -149,5 +248,5 @@ async fn handle_undo_follow( }) .await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } diff --git a/lemmy_apub/src/inbox/mod.rs b/lemmy_apub/src/inbox/mod.rs index e2305b44..cb5bd9a7 100644 --- a/lemmy_apub/src/inbox/mod.rs +++ b/lemmy_apub/src/inbox/mod.rs @@ -1,12 +1,26 @@ -use activitystreams::base::{BaseExt, Extends}; -use anyhow::Context; +use crate::{ + check_is_apub_id_valid, + extensions::signatures::verify_signature, + fetcher::get_or_fetch_and_upsert_actor, + ActorType, +}; +use activitystreams::{ + activity::ActorAndObjectRefExt, + base::{AsBase, BaseExt, Extends}, + object::{AsObject, ObjectExt}, + public, +}; +use actix_web::HttpRequest; +use anyhow::{anyhow, Context}; use lemmy_db::{activity::Activity, DbPool}; use lemmy_structs::blocking; use lemmy_utils::{location_info, LemmyError}; +use lemmy_websocket::LemmyContext; use serde::{export::fmt::Debug, Serialize}; use url::Url; pub mod community_inbox; +mod receive_for_community; pub mod shared_inbox; pub mod user_inbox; @@ -35,3 +49,65 @@ pub(crate) async fn is_activity_already_known( Err(_) => Ok(false), } } + +pub(crate) fn get_activity_to_and_cc(activity: &T) -> Result, LemmyError> +where + T: AsBase + AsObject + ActorAndObjectRefExt, +{ + let mut to_and_cc = vec![]; + if let Some(to) = activity.to() { + let to = to.to_owned().unwrap_to_vec(); + let mut to = to + .iter() + .map(|t| t.as_xsd_any_uri()) + .flatten() + .map(|t| t.to_owned()) + .collect(); + to_and_cc.append(&mut to); + } + if let Some(cc) = activity.cc() { + let cc = cc.to_owned().unwrap_to_vec(); + let mut cc = cc + .iter() + .map(|c| c.as_xsd_any_uri()) + .flatten() + .map(|c| c.to_owned()) + .collect(); + to_and_cc.append(&mut cc); + } + Ok(to_and_cc) +} + +pub(crate) fn is_addressed_to_public(activity: &T) -> Result<(), LemmyError> +where + T: AsBase + AsObject + ActorAndObjectRefExt, +{ + let to_and_cc = get_activity_to_and_cc(activity)?; + if to_and_cc.contains(&public()) { + Ok(()) + } else { + Err(anyhow!("Activity is not addressed to public").into()) + } +} + +pub(crate) async fn inbox_verify_http_signature( + activity: &T, + context: &LemmyContext, + request: HttpRequest, + request_counter: &mut i32, +) -> Result, LemmyError> +where + T: AsObject + ActorAndObjectRefExt + Extends + AsBase, + Kind: Serialize, + >::Error: From + Send + Sync + 'static, +{ + let actor_id = activity + .actor()? + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + check_is_apub_id_valid(&actor_id)?; + let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?; + verify_signature(&request, actor.as_ref())?; + Ok(actor) +} diff --git a/lemmy_apub/src/inbox/receive_for_community.rs b/lemmy_apub/src/inbox/receive_for_community.rs new file mode 100644 index 00000000..d4cd43d7 --- /dev/null +++ b/lemmy_apub/src/inbox/receive_for_community.rs @@ -0,0 +1,345 @@ +use crate::{ + activities::receive::{ + comment::{ + receive_create_comment, + receive_delete_comment, + receive_dislike_comment, + receive_like_comment, + receive_remove_comment, + receive_update_comment, + }, + comment_undo::{ + receive_undo_delete_comment, + receive_undo_dislike_comment, + receive_undo_like_comment, + receive_undo_remove_comment, + }, + post::{ + receive_create_post, + receive_delete_post, + receive_dislike_post, + receive_like_post, + receive_remove_post, + receive_update_post, + }, + post_undo::{ + receive_undo_delete_post, + receive_undo_dislike_post, + receive_undo_like_post, + receive_undo_remove_post, + }, + receive_unhandled_activity, + verify_activity_domains_valid, + }, + inbox::is_addressed_to_public, +}; +use activitystreams::{ + activity::{Create, Delete, Dislike, Like, Remove, Undo, Update}, + base::AnyBase, + prelude::*, +}; +use anyhow::Context; +use diesel::result::Error::NotFound; +use lemmy_db::{comment::Comment, post::Post, site::Site, Crud}; +use lemmy_structs::blocking; +use lemmy_utils::{location_info, LemmyError}; +use lemmy_websocket::LemmyContext; +use url::Url; + +/// This file is for post/comment activities received by the community, and for post/comment +/// activities announced by the community and received by the user. + +/// A post or comment being created +pub(in crate::inbox) async fn receive_create_for_community( + context: &LemmyContext, + activity: AnyBase, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let create = Create::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&create, &expected_domain, true)?; + is_addressed_to_public(&create)?; + + match create.object().as_single_kind_str() { + Some("Page") => receive_create_post(create, context, request_counter).await, + Some("Note") => receive_create_comment(create, context, request_counter).await, + _ => receive_unhandled_activity(create), + } +} + +/// A post or comment being edited +pub(in crate::inbox) async fn receive_update_for_community( + context: &LemmyContext, + activity: AnyBase, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let update = Update::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&update, &expected_domain, true)?; + is_addressed_to_public(&update)?; + + match update.object().as_single_kind_str() { + Some("Page") => receive_update_post(update, context, request_counter).await, + Some("Note") => receive_update_comment(update, context, request_counter).await, + _ => receive_unhandled_activity(update), + } +} + +/// A post or comment being upvoted +pub(in crate::inbox) async fn receive_like_for_community( + context: &LemmyContext, + activity: AnyBase, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let like = Like::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&like, &expected_domain, false)?; + is_addressed_to_public(&like)?; + + match like.object().as_single_kind_str() { + Some("Page") => receive_like_post(like, context, request_counter).await, + Some("Note") => receive_like_comment(like, context, request_counter).await, + _ => receive_unhandled_activity(like), + } +} + +/// A post or comment being downvoted +pub(in crate::inbox) async fn receive_dislike_for_community( + context: &LemmyContext, + activity: AnyBase, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let enable_downvotes = blocking(context.pool(), move |conn| { + Site::read(conn, 1).map(|s| s.enable_downvotes) + }) + .await??; + if !enable_downvotes { + return Ok(()); + } + + let dislike = Dislike::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&dislike, &expected_domain, false)?; + is_addressed_to_public(&dislike)?; + + match dislike.object().as_single_kind_str() { + Some("Page") => receive_dislike_post(dislike, context, request_counter).await, + Some("Note") => receive_dislike_comment(dislike, context, request_counter).await, + _ => receive_unhandled_activity(dislike), + } +} + +/// A post or comment being deleted by its creator +pub(in crate::inbox) async fn receive_delete_for_community( + context: &LemmyContext, + activity: AnyBase, + expected_domain: &Url, +) -> Result<(), LemmyError> { + dbg!("receive_delete_for_community"); + let delete = Delete::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&delete, &expected_domain, true)?; + is_addressed_to_public(&delete)?; + + let object = delete + .object() + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + + match find_post_or_comment_by_id(context, object).await { + Ok(PostOrComment::Post(p)) => receive_delete_post(context, p).await, + Ok(PostOrComment::Comment(c)) => receive_delete_comment(context, c).await, + // if we dont have the object, no need to do anything + Err(_) => Ok(()), + } +} + +/// A post or comment being removed by a mod/admin +pub(in crate::inbox) async fn receive_remove_for_community( + context: &LemmyContext, + activity: AnyBase, + expected_domain: &Url, +) -> Result<(), LemmyError> { + dbg!("receive_remove_for_community"); + let remove = Remove::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&remove, &expected_domain, false)?; + is_addressed_to_public(&remove)?; + + let cc = remove + .cc() + .map(|c| c.as_many()) + .flatten() + .context(location_info!())?; + let community_id = cc + .first() + .map(|c| c.as_xsd_any_uri()) + .flatten() + .context(location_info!())?; + + let object = remove + .object() + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + + // Ensure that remove activity comes from the same domain as the community + remove.id(community_id.domain().context(location_info!())?)?; + + match find_post_or_comment_by_id(context, object).await { + Ok(PostOrComment::Post(p)) => receive_remove_post(context, remove, p).await, + Ok(PostOrComment::Comment(c)) => receive_remove_comment(context, remove, c).await, + // if we dont have the object, no need to do anything + Err(_) => Ok(()), + } +} + +/// A post/comment action being reverted (either a delete, remove, upvote or downvote) +pub(in crate::inbox) async fn receive_undo_for_community( + context: &LemmyContext, + activity: AnyBase, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let undo = Undo::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&undo, &expected_domain.to_owned(), true)?; + is_addressed_to_public(&undo)?; + + match undo.object().as_single_kind_str() { + Some("Delete") => receive_undo_delete_for_community(context, undo, expected_domain).await, + Some("Remove") => receive_undo_remove_for_community(context, undo, expected_domain).await, + Some("Like") => { + receive_undo_like_for_community(context, undo, expected_domain, request_counter).await + } + Some("Dislike") => { + receive_undo_dislike_for_community(context, undo, expected_domain, request_counter).await + } + _ => receive_unhandled_activity(undo), + } +} + +/// A post or comment deletion being reverted +pub(in crate::inbox) async fn receive_undo_delete_for_community( + context: &LemmyContext, + undo: Undo, + expected_domain: &Url, +) -> Result<(), LemmyError> { + let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)? + .context(location_info!())?; + verify_activity_domains_valid(&delete, &expected_domain, true)?; + is_addressed_to_public(&delete)?; + + let object = delete + .object() + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + match find_post_or_comment_by_id(context, object).await { + Ok(PostOrComment::Post(p)) => receive_undo_delete_post(context, p).await, + Ok(PostOrComment::Comment(c)) => receive_undo_delete_comment(context, c).await, + // if we dont have the object, no need to do anything + Err(_) => Ok(()), + } +} + +/// A post or comment removal being reverted +pub(in crate::inbox) async fn receive_undo_remove_for_community( + context: &LemmyContext, + undo: Undo, + expected_domain: &Url, +) -> Result<(), LemmyError> { + let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)? + .context(location_info!())?; + verify_activity_domains_valid(&remove, &expected_domain, false)?; + is_addressed_to_public(&remove)?; + + let object = remove + .object() + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + match find_post_or_comment_by_id(context, object).await { + Ok(PostOrComment::Post(p)) => receive_undo_remove_post(context, p).await, + Ok(PostOrComment::Comment(c)) => receive_undo_remove_comment(context, c).await, + // if we dont have the object, no need to do anything + Err(_) => Ok(()), + } +} + +/// A post or comment upvote being reverted +pub(in crate::inbox) async fn receive_undo_like_for_community( + context: &LemmyContext, + undo: Undo, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)? + .context(location_info!())?; + verify_activity_domains_valid(&like, &expected_domain, false)?; + is_addressed_to_public(&like)?; + + let type_ = like + .object() + .as_single_kind_str() + .context(location_info!())?; + match type_ { + "Note" => receive_undo_like_comment(&like, context, request_counter).await, + "Page" => receive_undo_like_post(&like, context, request_counter).await, + _ => receive_unhandled_activity(like), + } +} + +/// A post or comment downvote being reverted +pub(in crate::inbox) async fn receive_undo_dislike_for_community( + context: &LemmyContext, + undo: Undo, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)? + .context(location_info!())?; + verify_activity_domains_valid(&dislike, &expected_domain, false)?; + is_addressed_to_public(&dislike)?; + + let type_ = dislike + .object() + .as_single_kind_str() + .context(location_info!())?; + match type_ { + "Note" => receive_undo_dislike_comment(&dislike, context, request_counter).await, + "Page" => receive_undo_dislike_post(&dislike, context, request_counter).await, + _ => receive_unhandled_activity(dislike), + } +} + +enum PostOrComment { + Comment(Comment), + Post(Post), +} + +/// Tries to find a post or comment in the local database, without any network requests. +/// This is used to handle deletions and removals, because in case we dont have the object, we can +/// simply ignore the activity. +async fn find_post_or_comment_by_id( + context: &LemmyContext, + apub_id: Url, +) -> Result { + let ap_id = apub_id.to_string(); + let post = blocking(context.pool(), move |conn| { + Post::read_from_apub_id(conn, &ap_id) + }) + .await?; + if let Ok(p) = post { + return Ok(PostOrComment::Post(p)); + } + + let ap_id = apub_id.to_string(); + let comment = blocking(context.pool(), move |conn| { + Comment::read_from_apub_id(conn, &ap_id) + }) + .await?; + if let Ok(c) = comment { + return Ok(PostOrComment::Comment(c)); + } + + return Err(NotFound.into()); +} diff --git a/lemmy_apub/src/inbox/shared_inbox.rs b/lemmy_apub/src/inbox/shared_inbox.rs index 3b07400d..ab0a5f18 100644 --- a/lemmy_apub/src/inbox/shared_inbox.rs +++ b/lemmy_apub/src/inbox/shared_inbox.rs @@ -1,63 +1,21 @@ use crate::{ - activities::receive::{ - comment::{ - receive_create_comment, - receive_delete_comment, - receive_dislike_comment, - receive_like_comment, - receive_remove_comment, - receive_update_comment, - }, - comment_undo::{ - receive_undo_delete_comment, - receive_undo_dislike_comment, - receive_undo_like_comment, - receive_undo_remove_comment, - }, - community::{ - receive_delete_community, - receive_remove_community, - receive_undo_delete_community, - receive_undo_remove_community, - }, - find_by_id, - post::{ - receive_create_post, - receive_delete_post, - receive_dislike_post, - receive_like_post, - receive_remove_post, - receive_update_post, - }, - post_undo::{ - receive_undo_delete_post, - receive_undo_dislike_post, - receive_undo_like_post, - receive_undo_remove_post, - }, - receive_unhandled_activity, - verify_activity_domains_valid, - FindResults, + inbox::{ + community_inbox::{community_receive_message, CommunityAcceptedActivities}, + get_activity_id, + get_activity_to_and_cc, + inbox_verify_http_signature, + is_activity_already_known, + user_inbox::{user_receive_message, UserAcceptedActivities}, }, - check_is_apub_id_valid, - extensions::signatures::verify_signature, - fetcher::get_or_fetch_and_upsert_actor, - inbox::{get_activity_id, is_activity_already_known}, insert_activity, - ActorType, -}; -use activitystreams::{ - activity::{ActorAndObject, Announce, Create, Delete, Dislike, Like, Remove, Undo, Update}, - base::AnyBase, - prelude::*, }; +use activitystreams::{activity::ActorAndObject, prelude::*}; use actix_web::{web, HttpRequest, HttpResponse}; -use anyhow::{anyhow, Context}; -use lemmy_db::{site::Site, Crud}; +use anyhow::Context; +use lemmy_db::{community::Community, user::User_, DbPool}; use lemmy_structs::blocking; use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::LemmyContext; -use log::debug; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use url::Url; @@ -77,7 +35,7 @@ pub enum ValidTypes { } // TODO: this isnt entirely correct, cause some of these receive are not ActorAndObject, -// but it might still work due to the anybase conversion +// but it still works due to the anybase conversion pub type AcceptedActivities = ActorAndObject; /// Handler for all incoming requests to shared inbox. @@ -87,332 +45,141 @@ pub async fn shared_inbox( context: web::Data, ) -> Result { let activity = input.into_inner(); - - let actor_id = activity - .actor()? - .to_owned() - .single_xsd_any_uri() - .context(location_info!())?; - debug!( - "Shared inbox received activity {:?} from {}", - &activity.id_unchecked(), - &actor_id - ); - - check_is_apub_id_valid(&actor_id)?; - + // First of all check the http signature let request_counter = &mut 0; - let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?; - verify_signature(&request, actor.as_ref())?; + let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?; + // Do nothing if we received the same activity before + let actor_id = actor.actor_id()?; let activity_id = get_activity_id(&activity, &actor_id)?; if is_activity_already_known(context.pool(), &activity_id).await? { return Ok(HttpResponse::Ok().finish()); } - let any_base = activity.clone().into_any_base()?; - let kind = activity.kind().context(location_info!())?; - let res = match kind { - ValidTypes::Announce => { - receive_announce(&context, any_base, actor.as_ref(), request_counter).await - } - ValidTypes::Create => receive_create(&context, any_base, actor_id, request_counter).await, - ValidTypes::Update => receive_update(&context, any_base, actor_id, request_counter).await, - ValidTypes::Like => receive_like(&context, any_base, actor_id, request_counter).await, - ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id, request_counter).await, - ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await, - ValidTypes::Delete => receive_delete(&context, any_base, actor_id, request_counter).await, - ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await, - }; - + // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen + // if we receive the same activity twice in very quick succession. insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?; - res -} - -/// Takes an announce and passes the inner activity to the appropriate handler. -async fn receive_announce( - context: &LemmyContext, - activity: AnyBase, - actor: &dyn ActorType, - request_counter: &mut i32, -) -> Result { - let announce = Announce::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&announce, actor.actor_id()?, false)?; - - let kind = announce.object().as_single_kind_str(); - let object = announce - .object() - .to_owned() - .one() - .context(location_info!())?; - - let inner_id = object.id().context(location_info!())?.to_owned(); - check_is_apub_id_valid(&inner_id)?; - if is_activity_already_known(context.pool(), &inner_id).await? { - return Ok(HttpResponse::Ok().finish()); - } - - match kind { - Some("Create") => receive_create(context, object, inner_id, request_counter).await, - Some("Update") => receive_update(context, object, inner_id, request_counter).await, - Some("Like") => receive_like(context, object, inner_id, request_counter).await, - Some("Dislike") => receive_dislike(context, object, inner_id, request_counter).await, - Some("Delete") => receive_delete(context, object, inner_id, request_counter).await, - Some("Remove") => receive_remove(context, object, inner_id).await, - Some("Undo") => receive_undo(context, object, inner_id, request_counter).await, - _ => receive_unhandled_activity(announce), - } -} -async fn receive_create( - context: &LemmyContext, - activity: AnyBase, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let create = Create::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&create, expected_domain, true)?; - - match create.object().as_single_kind_str() { - Some("Page") => receive_create_post(create, context, request_counter).await, - Some("Note") => receive_create_comment(create, context, request_counter).await, - _ => receive_unhandled_activity(create), - } -} - -async fn receive_update( - context: &LemmyContext, - activity: AnyBase, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let update = Update::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&update, expected_domain, true)?; - - match update.object().as_single_kind_str() { - Some("Page") => receive_update_post(update, context, request_counter).await, - Some("Note") => receive_update_comment(update, context, request_counter).await, - _ => receive_unhandled_activity(update), + let activity_any_base = activity.clone().into_any_base()?; + let mut res: Option = None; + let to_and_cc = get_activity_to_and_cc(&activity)?; + // If to_and_cc contains a local community, pass to receive_community_message() + // Handle community first, so in case the sender is banned by the community, it will error out. + // If we handled the user receive first, the activity would be inserted to the database before the + // community could check for bans. + let community = extract_local_community_from_destinations(&to_and_cc, context.pool()).await?; + if let Some(community) = community { + let community_activity = CommunityAcceptedActivities::from_any_base(activity_any_base.clone())? + .context(location_info!())?; + res = Some( + community_receive_message( + community_activity, + community, + actor.as_ref(), + &context, + request_counter, + ) + .await?, + ); } -} -async fn receive_like( - context: &LemmyContext, - activity: AnyBase, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let like = Like::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&like, expected_domain, false)?; - - match like.object().as_single_kind_str() { - Some("Page") => receive_like_post(like, context, request_counter).await, - Some("Note") => receive_like_comment(like, context, request_counter).await, - _ => receive_unhandled_activity(like), + // If to_and_cc contains a local user, pass to receive_user_message() + if is_addressed_to_local_user(&to_and_cc, context.pool()).await? { + let user_activity = UserAcceptedActivities::from_any_base(activity_any_base.clone())? + .context(location_info!())?; + // `to_user` is only used for follow activities (which we dont receive here), so no need to pass + // it in + user_receive_message( + user_activity, + None, + actor.as_ref(), + &context, + request_counter, + ) + .await?; } -} -async fn receive_dislike( - context: &LemmyContext, - activity: AnyBase, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let enable_downvotes = blocking(context.pool(), move |conn| { - Site::read(conn, 1).map(|s| s.enable_downvotes) - }) - .await??; - if !enable_downvotes { - return Ok(HttpResponse::Ok().finish()); + // If to_and_cc contains followers collection of a community, pass to receive_user_message() + if is_addressed_to_community_followers(&to_and_cc, context.pool()).await? { + let user_activity = UserAcceptedActivities::from_any_base(activity_any_base.clone())? + .context(location_info!())?; + res = Some( + user_receive_message( + user_activity, + None, + actor.as_ref(), + &context, + request_counter, + ) + .await?, + ); } - let dislike = Dislike::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&dislike, expected_domain, false)?; - - match dislike.object().as_single_kind_str() { - Some("Page") => receive_dislike_post(dislike, context, request_counter).await, - Some("Note") => receive_dislike_comment(dislike, context, request_counter).await, - _ => receive_unhandled_activity(dislike), + // If none of those, throw an error + if let Some(r) = res { + Ok(r) + } else { + Ok(HttpResponse::NotImplemented().finish()) } } -pub async fn receive_delete( - context: &LemmyContext, - activity: AnyBase, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let delete = Delete::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&delete, expected_domain, true)?; - - let object = delete - .object() - .to_owned() - .single_xsd_any_uri() - .context(location_info!())?; - - match find_by_id(context, object).await { - Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p, request_counter).await, - Ok(FindResults::Comment(c)) => { - receive_delete_comment(context, delete, c, request_counter).await - } - Ok(FindResults::Community(c)) => { - receive_delete_community(context, delete, c, request_counter).await +/// If `to_and_cc` contains the ID of a local community, return that community, otherwise return +/// None. +/// +/// This doesnt handle the case where an activity is addressed to multiple communities (because +/// Lemmy doesnt generate such activities). +async fn extract_local_community_from_destinations( + to_and_cc: &[Url], + pool: &DbPool, +) -> Result, LemmyError> { + for url in to_and_cc { + let url = url.to_string(); + let community = blocking(&pool, move |conn| { + Community::read_from_actor_id(&conn, &url) + }) + .await?; + if let Ok(c) = community { + if c.local { + return Ok(Some(c)); + } } - // if we dont have the object, no need to do anything - Err(_) => Ok(HttpResponse::Ok().finish()), - } -} - -async fn receive_remove( - context: &LemmyContext, - activity: AnyBase, - expected_domain: Url, -) -> Result { - let remove = Remove::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&remove, expected_domain, false)?; - - let cc = remove - .cc() - .map(|c| c.as_many()) - .flatten() - .context(location_info!())?; - let community_id = cc - .first() - .map(|c| c.as_xsd_any_uri()) - .flatten() - .context(location_info!())?; - - let object = remove - .object() - .to_owned() - .single_xsd_any_uri() - .context(location_info!())?; - - // Ensure that remove activity comes from the same domain as the community - remove.id(community_id.domain().context(location_info!())?)?; - - match find_by_id(context, object).await { - Ok(FindResults::Post(p)) => receive_remove_post(context, remove, p).await, - Ok(FindResults::Comment(c)) => receive_remove_comment(context, remove, c).await, - Ok(FindResults::Community(c)) => receive_remove_community(context, remove, c).await, - // if we dont have the object, no need to do anything - Err(_) => Ok(HttpResponse::Ok().finish()), - } -} - -async fn receive_undo( - context: &LemmyContext, - activity: AnyBase, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let undo = Undo::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?; - - match undo.object().as_single_kind_str() { - Some("Delete") => receive_undo_delete(context, undo, expected_domain, request_counter).await, - Some("Remove") => receive_undo_remove(context, undo, expected_domain, request_counter).await, - Some("Like") => receive_undo_like(context, undo, expected_domain, request_counter).await, - Some("Dislike") => receive_undo_dislike(context, undo, expected_domain, request_counter).await, - _ => receive_unhandled_activity(undo), } + Ok(None) } -async fn receive_undo_delete( - context: &LemmyContext, - undo: Undo, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)? - .context(location_info!())?; - verify_activity_domains_valid(&delete, expected_domain, true)?; - - let object = delete - .object() - .to_owned() - .single_xsd_any_uri() - .context(location_info!())?; - match find_by_id(context, object).await { - Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p, request_counter).await, - Ok(FindResults::Comment(c)) => { - receive_undo_delete_comment(context, undo, c, request_counter).await - } - Ok(FindResults::Community(c)) => { - receive_undo_delete_community(context, undo, c, request_counter).await +/// Returns true if `to_and_cc` contains at least one local user. +async fn is_addressed_to_local_user(to_and_cc: &[Url], pool: &DbPool) -> Result { + for url in to_and_cc { + let url = url.to_string(); + let user = blocking(&pool, move |conn| User_::read_from_actor_id(&conn, &url)).await?; + if let Ok(u) = user { + if u.local { + return Ok(true); + } } - // if we dont have the object, no need to do anything - Err(_) => Ok(HttpResponse::Ok().finish()), } + Ok(false) } -async fn receive_undo_remove( - context: &LemmyContext, - undo: Undo, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)? - .context(location_info!())?; - verify_activity_domains_valid(&remove, expected_domain, false)?; - - let object = remove - .object() - .to_owned() - .single_xsd_any_uri() - .context(location_info!())?; - match find_by_id(context, object).await { - Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p, request_counter).await, - Ok(FindResults::Comment(c)) => { - receive_undo_remove_comment(context, undo, c, request_counter).await +/// Returns true if `to_and_cc` contains at least one followers collection of a remote community +/// (like `https://example.com/c/main/followers`) +async fn is_addressed_to_community_followers( + to_and_cc: &[Url], + pool: &DbPool, +) -> Result { + for url in to_and_cc { + let url = url.to_string(); + // TODO: extremely hacky, we should just store the followers url for each community in the db + if url.ends_with("/followers") { + let community_url = url.replace("/followers", ""); + let community = blocking(&pool, move |conn| { + Community::read_from_actor_id(&conn, &community_url) + }) + .await??; + if !community.local { + return Ok(true); + } } - Ok(FindResults::Community(c)) => { - receive_undo_remove_community(context, undo, c, request_counter).await - } - // if we dont have the object, no need to do anything - Err(_) => Ok(HttpResponse::Ok().finish()), - } -} - -async fn receive_undo_like( - context: &LemmyContext, - undo: Undo, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)? - .context(location_info!())?; - verify_activity_domains_valid(&like, expected_domain, false)?; - - let type_ = like - .object() - .as_single_kind_str() - .context(location_info!())?; - match type_ { - "Note" => receive_undo_like_comment(undo, &like, context, request_counter).await, - "Page" => receive_undo_like_post(undo, &like, context, request_counter).await, - d => Err(anyhow!("Undo Delete type {} not supported", d).into()), - } -} - -async fn receive_undo_dislike( - context: &LemmyContext, - undo: Undo, - expected_domain: Url, - request_counter: &mut i32, -) -> Result { - let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)? - .context(location_info!())?; - verify_activity_domains_valid(&dislike, expected_domain, false)?; - - let type_ = dislike - .object() - .as_single_kind_str() - .context(location_info!())?; - match type_ { - "Note" => receive_undo_dislike_comment(undo, &dislike, context, request_counter).await, - "Page" => receive_undo_dislike_post(undo, &dislike, context, request_counter).await, - d => Err(anyhow!("Undo Delete type {} not supported", d).into()), } + Ok(false) } diff --git a/lemmy_apub/src/inbox/user_inbox.rs b/lemmy_apub/src/inbox/user_inbox.rs index 45fa5ee7..9173fec4 100644 --- a/lemmy_apub/src/inbox/user_inbox.rs +++ b/lemmy_apub/src/inbox/user_inbox.rs @@ -1,114 +1,164 @@ use crate::{ - activities::receive::verify_activity_domains_valid, + activities::receive::{ + community::{ + receive_delete_community, + receive_remove_community, + receive_undo_delete_community, + receive_undo_remove_community, + }, + private_message::{ + receive_create_private_message, + receive_delete_private_message, + receive_undo_delete_private_message, + receive_update_private_message, + }, + receive_unhandled_activity, + verify_activity_domains_valid, + }, check_is_apub_id_valid, - extensions::signatures::verify_signature, - fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community}, - inbox::{get_activity_id, is_activity_already_known}, + fetcher::get_or_fetch_and_upsert_community, + inbox::{ + get_activity_id, + get_activity_to_and_cc, + inbox_verify_http_signature, + is_activity_already_known, + is_addressed_to_public, + receive_for_community::{ + receive_create_for_community, + receive_delete_for_community, + receive_dislike_for_community, + receive_like_for_community, + receive_remove_for_community, + receive_undo_for_community, + receive_update_for_community, + }, + }, insert_activity, ActorType, - FromApub, }; use activitystreams::{ - activity::{Accept, ActorAndObject, Create, Delete, Follow, Undo, Update}, + activity::{Accept, ActorAndObject, Announce, Delete, Follow, Undo}, base::AnyBase, - object::Note, prelude::*, }; use actix_web::{web, HttpRequest, HttpResponse}; use anyhow::{anyhow, Context}; +use diesel::NotFound; use lemmy_db::{ - community::{CommunityFollower, CommunityFollowerForm}, - private_message::{PrivateMessage, PrivateMessageForm}, - private_message_view::PrivateMessageView, + community::{Community, CommunityFollower, CommunityFollowerForm}, + private_message::PrivateMessage, user::User_, - Crud, Followable, }; -use lemmy_structs::{blocking, user::PrivateMessageResponse}; +use lemmy_structs::blocking; use lemmy_utils::{location_info, LemmyError}; -use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation}; +use lemmy_websocket::LemmyContext; use log::debug; use serde::{Deserialize, Serialize}; use std::fmt::Debug; +use url::Url; /// Allowed activities for user inbox. #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)] #[serde(rename_all = "PascalCase")] -pub enum ValidTypes { - Accept, - Create, - Update, - Delete, - Undo, +pub enum UserValidTypes { + Accept, // community accepted our follow request + Create, // create private message + Update, // edit private message + Delete, // private message or community deleted by creator + Undo, // private message or community restored + Remove, // community removed by admin + Announce, // post, comment or vote in community } -pub type AcceptedActivities = ActorAndObject; +pub type UserAcceptedActivities = ActorAndObject; /// Handler for all incoming activities to user inboxes. pub async fn user_inbox( request: HttpRequest, - input: web::Json, + input: web::Json, path: web::Path, context: web::Data, ) -> Result { let activity = input.into_inner(); + // First of all check the http signature + let request_counter = &mut 0; + let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?; + + // Do nothing if we received the same activity before + let activity_id = get_activity_id(&activity, &actor.actor_id()?)?; + if is_activity_already_known(context.pool(), &activity_id).await? { + return Ok(HttpResponse::Ok().finish()); + } + + // Check if the activity is actually meant for us let username = path.into_inner(); let user = blocking(&context.pool(), move |conn| { User_::read_from_name(&conn, &username) }) .await??; - - let to = activity - .to() - .context(location_info!())? - .to_owned() - .single_xsd_any_uri(); - if Some(user.actor_id()?) != to { + let to_and_cc = get_activity_to_and_cc(&activity)?; + if !to_and_cc.contains(&&user.actor_id()?) { return Err(anyhow!("Activity delivered to wrong user").into()); } - let actor_uri = activity - .actor()? - .as_single_xsd_any_uri() - .context(location_info!())?; + insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?; + debug!( - "User {} inbox received activity {:?} from {}", + "User {} received activity {:?} from {}", user.name, &activity.id_unchecked(), - &actor_uri + &actor.actor_id_str() ); - check_is_apub_id_valid(actor_uri)?; + user_receive_message( + activity.clone(), + Some(user.clone()), + actor.as_ref(), + &context, + request_counter, + ) + .await +} - let request_counter = &mut 0; - let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?; - verify_signature(&request, actor.as_ref())?; +/// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete +pub(crate) async fn user_receive_message( + activity: UserAcceptedActivities, + to_user: Option, + actor: &dyn ActorType, + context: &LemmyContext, + request_counter: &mut i32, +) -> Result { + // TODO: must be addressed to one or more local users, or to followers of a remote community - let activity_id = get_activity_id(&activity, actor_uri)?; - if is_activity_already_known(context.pool(), &activity_id).await? { - return Ok(HttpResponse::Ok().finish()); - } + // TODO: if it is addressed to community followers, check that at least one local user is following it let any_base = activity.clone().into_any_base()?; let kind = activity.kind().context(location_info!())?; - let res = match kind { - ValidTypes::Accept => { - receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await + let actor_url = actor.actor_id()?; + match kind { + UserValidTypes::Accept => { + receive_accept(&context, any_base, actor, to_user.unwrap(), request_counter).await?; } - ValidTypes::Create => { - receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await + UserValidTypes::Announce => { + receive_announce(&context, any_base, actor, request_counter).await? } - ValidTypes::Update => { - receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await + UserValidTypes::Create => { + receive_create_private_message(&context, any_base, actor_url, request_counter).await? } - ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await, - ValidTypes::Undo => { - receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await + UserValidTypes::Update => { + receive_update_private_message(&context, any_base, actor_url, request_counter).await? } + UserValidTypes::Delete => { + receive_delete(context, any_base, &actor_url, request_counter).await? + } + UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?, + UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?, }; - insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?; - res + // TODO: would be logical to move websocket notification code here + + Ok(HttpResponse::Ok().finish()) } /// Handle accepted follows. @@ -118,15 +168,15 @@ async fn receive_accept( actor: &dyn ActorType, user: User_, request_counter: &mut i32, -) -> Result { +) -> Result<(), LemmyError> { let accept = Accept::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&accept, actor.actor_id()?, false)?; + verify_activity_domains_valid(&accept, &actor.actor_id()?, false)?; // TODO: we should check that we actually sent this activity, because the remote instance // could just put a fake Follow let object = accept.object().to_owned().one().context(location_info!())?; let follow = Follow::from_any_base(object)?.context(location_info!())?; - verify_activity_domains_valid(&follow, user.actor_id()?, false)?; + verify_activity_domains_valid(&follow, &user.actor_id()?, false)?; let community_uri = accept .actor()? @@ -149,186 +199,137 @@ async fn receive_accept( }) .await?; - Ok(HttpResponse::Ok().finish()) + Ok(()) } -async fn receive_create_private_message( +/// Takes an announce and passes the inner activity to the appropriate handler. +async fn receive_announce( context: &LemmyContext, activity: AnyBase, actor: &dyn ActorType, request_counter: &mut i32, -) -> Result { - let create = Create::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&create, actor.actor_id()?, true)?; - - let note = Note::from_any_base( - create - .object() - .as_one() - .context(location_info!())? - .to_owned(), - )? - .context(location_info!())?; - - let private_message = - PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?; - - let inserted_private_message = blocking(&context.pool(), move |conn| { - PrivateMessage::create(conn, &private_message) - }) - .await??; - - let message = blocking(&context.pool(), move |conn| { - PrivateMessageView::read(conn, inserted_private_message.id) - }) - .await??; - - let res = PrivateMessageResponse { message }; - - let recipient_id = res.message.recipient_id; - - context.chat_server().do_send(SendUserRoomMessage { - op: UserOperation::CreatePrivateMessage, - response: res, - recipient_id, - websocket_id: None, - }); - - Ok(HttpResponse::Ok().finish()) -} - -async fn receive_update_private_message( - context: &LemmyContext, - activity: AnyBase, - actor: &dyn ActorType, - request_counter: &mut i32, -) -> Result { - let update = Update::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&update, actor.actor_id()?, true)?; +) -> Result<(), LemmyError> { + let announce = Announce::from_any_base(activity)?.context(location_info!())?; + verify_activity_domains_valid(&announce, &actor.actor_id()?, false)?; + is_addressed_to_public(&announce)?; - let object = update + let kind = announce.object().as_single_kind_str(); + let inner_activity = announce .object() - .as_one() - .context(location_info!())? - .to_owned(); - let note = Note::from_any_base(object)?.context(location_info!())?; - - let private_message_form = - PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?; - - let private_message_ap_id = private_message_form - .ap_id - .as_ref() - .context(location_info!())? - .clone(); - let private_message = blocking(&context.pool(), move |conn| { - PrivateMessage::read_from_apub_id(conn, &private_message_ap_id) - }) - .await??; - - let private_message_id = private_message.id; - blocking(&context.pool(), move |conn| { - PrivateMessage::update(conn, private_message_id, &private_message_form) - }) - .await??; - - let private_message_id = private_message.id; - let message = blocking(&context.pool(), move |conn| { - PrivateMessageView::read(conn, private_message_id) - }) - .await??; - - let res = PrivateMessageResponse { message }; - - let recipient_id = res.message.recipient_id; + .to_owned() + .one() + .context(location_info!())?; - context.chat_server().do_send(SendUserRoomMessage { - op: UserOperation::EditPrivateMessage, - response: res, - recipient_id, - websocket_id: None, - }); + let inner_id = inner_activity.id().context(location_info!())?.to_owned(); + check_is_apub_id_valid(&inner_id)?; + if is_activity_already_known(context.pool(), &inner_id).await? { + return Ok(()); + } - Ok(HttpResponse::Ok().finish()) + dbg!(&kind); + match kind { + Some("Create") => { + receive_create_for_community(context, inner_activity, &inner_id, request_counter).await + } + Some("Update") => { + receive_update_for_community(context, inner_activity, &inner_id, request_counter).await + } + Some("Like") => { + receive_like_for_community(context, inner_activity, &inner_id, request_counter).await + } + Some("Dislike") => { + receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await + } + Some("Delete") => receive_delete_for_community(context, inner_activity, &inner_id).await, + Some("Remove") => receive_remove_for_community(context, inner_activity, &inner_id).await, + Some("Undo") => { + receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await + } + _ => receive_unhandled_activity(inner_activity), + } } -async fn receive_delete_private_message( +async fn receive_delete( context: &LemmyContext, - activity: AnyBase, - actor: &dyn ActorType, -) -> Result { - let delete = Delete::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&delete, actor.actor_id()?, true)?; + any_base: AnyBase, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + use CommunityOrPrivateMessage::*; - let private_message_id = delete + let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?; + verify_activity_domains_valid(&delete, expected_domain, true)?; + let object_uri = delete .object() .to_owned() .single_xsd_any_uri() .context(location_info!())?; - let private_message = blocking(context.pool(), move |conn| { - PrivateMessage::read_from_apub_id(conn, private_message_id.as_str()) - }) - .await??; - let deleted_private_message = blocking(context.pool(), move |conn| { - PrivateMessage::update_deleted(conn, private_message.id, true) - }) - .await??; - - let message = blocking(&context.pool(), move |conn| { - PrivateMessageView::read(&conn, deleted_private_message.id) - }) - .await??; - - let res = PrivateMessageResponse { message }; - let recipient_id = res.message.recipient_id; - context.chat_server().do_send(SendUserRoomMessage { - op: UserOperation::EditPrivateMessage, - response: res, - recipient_id, - websocket_id: None, - }); - Ok(HttpResponse::Ok().finish()) + match find_community_or_private_message_by_id(context, object_uri).await? { + Community(c) => receive_delete_community(context, c).await, + PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await, + } } -async fn receive_undo_delete_private_message( +async fn receive_undo( context: &LemmyContext, - activity: AnyBase, - actor: &dyn ActorType, -) -> Result { - let undo = Undo::from_any_base(activity)?.context(location_info!())?; - verify_activity_domains_valid(&undo, actor.actor_id()?, true)?; - let object = undo.object().to_owned().one().context(location_info!())?; - let delete = Delete::from_any_base(object)?.context(location_info!())?; - verify_activity_domains_valid(&delete, actor.actor_id()?, true)?; + any_base: AnyBase, + expected_domain: &Url, + request_counter: &mut i32, +) -> Result<(), LemmyError> { + use CommunityOrPrivateMessage::*; + let undo = Undo::from_any_base(any_base)?.context(location_info!())?; + verify_activity_domains_valid(&undo, expected_domain, true)?; + + let inner_activity = undo.object().to_owned().one().context(location_info!())?; + let kind = inner_activity.kind_str(); + match kind { + Some("Delete") => { + let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?; + verify_activity_domains_valid(&delete, expected_domain, true)?; + let object_uri = delete + .object() + .to_owned() + .single_xsd_any_uri() + .context(location_info!())?; + match find_community_or_private_message_by_id(context, object_uri).await? { + Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await, + PrivateMessage(p) => { + receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter) + .await + } + } + } + Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await, + _ => receive_unhandled_activity(undo), + } +} +enum CommunityOrPrivateMessage { + Community(Community), + PrivateMessage(PrivateMessage), +} - let private_message_id = delete - .object() - .to_owned() - .single_xsd_any_uri() - .context(location_info!())?; - let private_message = blocking(context.pool(), move |conn| { - PrivateMessage::read_from_apub_id(conn, private_message_id.as_str()) - }) - .await??; - let deleted_private_message = blocking(context.pool(), move |conn| { - PrivateMessage::update_deleted(conn, private_message.id, false) +async fn find_community_or_private_message_by_id( + context: &LemmyContext, + apub_id: Url, +) -> Result { + let ap_id = apub_id.to_string(); + let community = blocking(context.pool(), move |conn| { + Community::read_from_actor_id(conn, &ap_id) }) - .await??; + .await?; + if let Ok(c) = community { + return Ok(CommunityOrPrivateMessage::Community(c)); + } - let message = blocking(&context.pool(), move |conn| { - PrivateMessageView::read(&conn, deleted_private_message.id) + let ap_id = apub_id.to_string(); + let private_message = blocking(context.pool(), move |conn| { + PrivateMessage::read_from_apub_id(conn, &ap_id) }) - .await??; - - let res = PrivateMessageResponse { message }; - let recipient_id = res.message.recipient_id; - context.chat_server().do_send(SendUserRoomMessage { - op: UserOperation::EditPrivateMessage, - response: res, - recipient_id, - websocket_id: None, - }); + .await?; + if let Ok(p) = private_message { + return Ok(CommunityOrPrivateMessage::PrivateMessage(p)); + } - Ok(HttpResponse::Ok().finish()) + return Err(NotFound.into()); }