use crate::{
- activities::receive::{announce_if_community_is_local, get_actor_as_user},
+ activities::receive::get_actor_as_user,
fetcher::get_or_fetch_and_insert_comment,
ActorType,
FromApub,
};
use activitystreams::{
- activity::{ActorAndObjectRefExt, Create, Delete, Dislike, Like, Remove, Update},
+ activity::{ActorAndObjectRefExt, Create, Dislike, Like, Remove, Update},
base::ExtendsExt,
object::Note,
};
-use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
comment::{Comment, CommentForm, CommentLike, CommentLikeForm},
create: Create,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(&create, context, request_counter).await?;
let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
websocket_id: None,
});
- announce_if_community_is_local(create, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_update_comment(
update: Update,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let user = get_actor_as_user(&update, context, request_counter).await?;
websocket_id: None,
});
- announce_if_community_is_local(update, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_like_comment(
like: Like,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let user = get_actor_as_user(&like, context, request_counter).await?;
websocket_id: None,
});
- announce_if_community_is_local(like, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_dislike_comment(
dislike: Dislike,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let note = Note::from_any_base(
dislike
.object()
websocket_id: None,
});
- announce_if_community_is_local(dislike, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_delete_comment(
context: &LemmyContext,
- delete: Delete,
comment: Comment,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let deleted_comment = blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, comment.id, true)
})
websocket_id: None,
});
- announce_if_community_is_local(delete, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_remove_comment(
context: &LemmyContext,
_remove: Remove,
comment: Comment,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let removed_comment = blocking(context.pool(), move |conn| {
Comment::update_removed(conn, comment.id, true)
})
websocket_id: None,
});
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
use crate::{
- activities::receive::{announce_if_community_is_local, get_actor_as_user},
+ activities::receive::get_actor_as_user,
fetcher::get_or_fetch_and_insert_comment,
FromApub,
};
use activitystreams::{activity::*, object::Note, prelude::*};
-use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
comment::{Comment, CommentForm, CommentLike},
use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation};
pub(crate) async fn receive_undo_like_comment(
- undo: Undo,
like: &Like,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(like, context, request_counter).await?;
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_undo_dislike_comment(
- undo: Undo,
dislike: &Dislike,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(dislike, context, request_counter).await?;
let note = Note::from_any_base(
dislike
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_undo_delete_comment(
context: &LemmyContext,
- undo: Undo,
comment: Comment,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let deleted_comment = blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, comment.id, false)
})
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_undo_remove_comment(
context: &LemmyContext,
- undo: Undo,
comment: Comment,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let removed_comment = blocking(context.pool(), move |conn| {
Comment::update_removed(conn, comment.id, false)
})
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
-use crate::activities::receive::announce_if_community_is_local;
-use activitystreams::activity::{Delete, Remove, Undo};
-use actix_web::HttpResponse;
+use crate::{activities::receive::verify_activity_domains_valid, inbox::is_addressed_to_public};
+use activitystreams::{
+ activity::{ActorAndObjectRefExt, Delete, Remove, Undo},
+ base::{AnyBase, ExtendsExt},
+};
+use anyhow::Context;
use lemmy_db::{community::Community, community_view::CommunityView};
use lemmy_structs::{blocking, community::CommunityResponse};
-use lemmy_utils::LemmyError;
+use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::{messages::SendCommunityRoomMessage, LemmyContext, UserOperation};
+use url::Url;
pub(crate) async fn receive_delete_community(
context: &LemmyContext,
- delete: Delete,
community: Community,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let deleted_community = blocking(context.pool(), move |conn| {
Community::update_deleted(conn, community.id, true)
})
websocket_id: None,
});
- announce_if_community_is_local(delete, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_remove_community(
context: &LemmyContext,
- _remove: Remove,
- community: Community,
-) -> Result<HttpResponse, LemmyError> {
+ activity: AnyBase,
+ expected_domain: &Url,
+) -> Result<(), LemmyError> {
+ let remove = Remove::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&remove, expected_domain, true)?;
+ is_addressed_to_public(&remove)?;
+
+ let community_uri = remove
+ .object()
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+ let community = blocking(context.pool(), move |conn| {
+ Community::read_from_actor_id(conn, community_uri.as_str())
+ })
+ .await??;
+
let removed_community = blocking(context.pool(), move |conn| {
Community::update_removed(conn, community.id, true)
})
websocket_id: None,
});
- // TODO: this should probably also call announce_if_community_is_local()
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_undo_delete_community(
context: &LemmyContext,
undo: Undo,
community: Community,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+ expected_domain: &Url,
+) -> Result<(), LemmyError> {
+ is_addressed_to_public(&undo)?;
+ let inner = undo.object().to_owned().one().context(location_info!())?;
+ let delete = Delete::from_any_base(inner)?.context(location_info!())?;
+ verify_activity_domains_valid(&delete, expected_domain, true)?;
+ is_addressed_to_public(&delete)?;
+
let deleted_community = blocking(context.pool(), move |conn| {
Community::update_deleted(conn, community.id, false)
})
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_undo_remove_community(
context: &LemmyContext,
undo: Undo,
- community: Community,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+ expected_domain: &Url,
+) -> Result<(), LemmyError> {
+ is_addressed_to_public(&undo)?;
+
+ let inner = undo.object().to_owned().one().context(location_info!())?;
+ let remove = Remove::from_any_base(inner)?.context(location_info!())?;
+ verify_activity_domains_valid(&remove, &expected_domain, true)?;
+ is_addressed_to_public(&remove)?;
+
+ let community_uri = remove
+ .object()
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+ let community = blocking(context.pool(), move |conn| {
+ Community::read_from_actor_id(conn, community_uri.as_str())
+ })
+ .await??;
+
let removed_community = blocking(context.pool(), move |conn| {
Community::update_removed(conn, community.id, false)
})
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
-use crate::{
- fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
- ActorType,
-};
+use crate::fetcher::get_or_fetch_and_upsert_user;
use activitystreams::{
activity::{ActorAndObjectRef, ActorAndObjectRefExt},
- base::{AsBase, BaseExt, Extends, ExtendsExt},
+ base::{AsBase, BaseExt},
error::DomainError,
- object::{AsObject, ObjectExt},
};
-use actix_web::HttpResponse;
-use anyhow::Context;
-use diesel::result::Error::NotFound;
-use lemmy_db::{comment::Comment, community::Community, post::Post, user::User_};
-use lemmy_structs::blocking;
+use anyhow::{anyhow, Context};
+use lemmy_db::user::User_;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
use log::debug;
-use serde::Serialize;
use std::fmt::Debug;
use url::Url;
pub(crate) mod community;
pub(crate) mod post;
pub(crate) mod post_undo;
+pub(crate) mod private_message;
/// Return HTTP 501 for unsupported activities in inbox.
-pub(crate) fn receive_unhandled_activity<A>(activity: A) -> Result<HttpResponse, LemmyError>
+pub(crate) fn receive_unhandled_activity<A>(activity: A) -> Result<(), LemmyError>
where
A: Debug,
{
debug!("received unhandled activity type: {:?}", activity);
- Ok(HttpResponse::NotImplemented().finish())
-}
-
-/// Reads the destination community from the activity's `cc` field. If this refers to a local
-/// community, the activity is announced to all community followers.
-async fn announce_if_community_is_local<T, Kind>(
- activity: T,
- context: &LemmyContext,
- request_counter: &mut i32,
-) -> Result<(), LemmyError>
-where
- T: AsObject<Kind>,
- T: Extends<Kind>,
- Kind: Serialize,
- <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
-{
- let cc = activity.cc().context(location_info!())?;
- let cc = cc.as_many().context(location_info!())?;
- let community_uri = cc
- .first()
- .context(location_info!())?
- .as_xsd_any_uri()
- .context(location_info!())?;
- // TODO: we could just read from the local db here (and ignore if the community is not found)
- let community =
- get_or_fetch_and_upsert_community(&community_uri, context, request_counter).await?;
-
- if community.local {
- community
- .send_announce(activity.into_any_base()?, context)
- .await?;
- }
- Ok(())
+ Err(anyhow!("Activity not supported").into())
}
/// Reads the actor field of an activity and returns the corresponding `User_`.
get_or_fetch_and_upsert_user(&user_uri, context, request_counter).await
}
-pub(crate) enum FindResults {
- Comment(Comment),
- Community(Community),
- Post(Post),
-}
-
-/// Tries to find a community, post or comment in the local database, without any network requests.
-/// This is used to handle deletions and removals, because in case we dont have the object, we can
-/// simply ignore the activity.
-pub(crate) async fn find_by_id(
- context: &LemmyContext,
- apub_id: Url,
-) -> Result<FindResults, LemmyError> {
- let ap_id = apub_id.to_string();
- let community = blocking(context.pool(), move |conn| {
- Community::read_from_actor_id(conn, &ap_id)
- })
- .await?;
- if let Ok(c) = community {
- return Ok(FindResults::Community(c));
- }
-
- let ap_id = apub_id.to_string();
- let post = blocking(context.pool(), move |conn| {
- Post::read_from_apub_id(conn, &ap_id)
- })
- .await?;
- if let Ok(p) = post {
- return Ok(FindResults::Post(p));
- }
-
- let ap_id = apub_id.to_string();
- let comment = blocking(context.pool(), move |conn| {
- Comment::read_from_apub_id(conn, &ap_id)
- })
- .await?;
- if let Ok(c) = comment {
- return Ok(FindResults::Comment(c));
- }
-
- return Err(NotFound.into());
-}
-
/// Ensure that the ID of an incoming activity comes from the same domain as the actor. Optionally
/// also checks the ID of the inner object.
///
/// HTTP signature.
pub(crate) fn verify_activity_domains_valid<T, Kind>(
activity: &T,
- actor_id: Url,
+ actor_id: &Url,
object_domain_must_match: bool,
) -> Result<(), LemmyError>
where
use crate::{
- activities::receive::{announce_if_community_is_local, get_actor_as_user},
+ activities::receive::get_actor_as_user,
fetcher::get_or_fetch_and_insert_post,
ActorType,
FromApub,
PageExt,
};
use activitystreams::{
- activity::{Create, Delete, Dislike, Like, Remove, Update},
+ activity::{Create, Dislike, Like, Remove, Update},
prelude::*,
};
-use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
post::{Post, PostForm, PostLike, PostLikeForm},
create: Create,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(&create, context, request_counter).await?;
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
websocket_id: None,
});
- announce_if_community_is_local(create, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_update_post(
update: Update,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(&update, context, request_counter).await?;
let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
websocket_id: None,
});
- announce_if_community_is_local(update, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_like_post(
like: Like,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(&like, context, request_counter).await?;
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
websocket_id: None,
});
- announce_if_community_is_local(like, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_dislike_post(
dislike: Dislike,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(&dislike, context, request_counter).await?;
let page = PageExt::from_any_base(
dislike
websocket_id: None,
});
- announce_if_community_is_local(dislike, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_delete_post(
context: &LemmyContext,
- delete: Delete,
post: Post,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let deleted_post = blocking(context.pool(), move |conn| {
Post::update_deleted(conn, post.id, true)
})
websocket_id: None,
});
- announce_if_community_is_local(delete, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_remove_post(
context: &LemmyContext,
_remove: Remove,
post: Post,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let removed_post = blocking(context.pool(), move |conn| {
Post::update_removed(conn, post.id, true)
})
websocket_id: None,
});
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
use crate::{
- activities::receive::{announce_if_community_is_local, get_actor_as_user},
+ activities::receive::get_actor_as_user,
fetcher::get_or_fetch_and_insert_post,
FromApub,
PageExt,
};
use activitystreams::{activity::*, prelude::*};
-use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
post::{Post, PostForm, PostLike},
use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
pub(crate) async fn receive_undo_like_post(
- undo: Undo,
like: &Like,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(like, context, request_counter).await?;
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_undo_dislike_post(
- undo: Undo,
dislike: &Dislike,
context: &LemmyContext,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let user = get_actor_as_user(dislike, context, request_counter).await?;
let page = PageExt::from_any_base(
dislike
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_undo_delete_post(
context: &LemmyContext,
- undo: Undo,
post: Post,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let deleted_post = blocking(context.pool(), move |conn| {
Post::update_deleted(conn, post.id, false)
})
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
pub(crate) async fn receive_undo_remove_post(
context: &LemmyContext,
- undo: Undo,
post: Post,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let removed_post = blocking(context.pool(), move |conn| {
Post::update_removed(conn, post.id, false)
})
websocket_id: None,
});
- announce_if_community_is_local(undo, context, request_counter).await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
--- /dev/null
+use crate::{
+ activities::receive::verify_activity_domains_valid,
+ check_is_apub_id_valid,
+ fetcher::get_or_fetch_and_upsert_user,
+ inbox::get_activity_to_and_cc,
+ FromApub,
+};
+use activitystreams::{
+ activity::{ActorAndObjectRefExt, Create, Delete, Undo, Update},
+ base::{AnyBase, AsBase, ExtendsExt},
+ object::{AsObject, Note},
+ public,
+};
+use anyhow::{anyhow, Context};
+use lemmy_db::{
+ private_message::{PrivateMessage, PrivateMessageForm},
+ private_message_view::PrivateMessageView,
+ Crud,
+};
+use lemmy_structs::{blocking, user::PrivateMessageResponse};
+use lemmy_utils::{location_info, LemmyError};
+use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
+use url::Url;
+
+pub(crate) async fn receive_create_private_message(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let create = Create::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&create, &expected_domain, true)?;
+ check_private_message_activity_valid(&create, context, request_counter).await?;
+
+ let note = Note::from_any_base(
+ create
+ .object()
+ .as_one()
+ .context(location_info!())?
+ .to_owned(),
+ )?
+ .context(location_info!())?;
+
+ let private_message =
+ PrivateMessageForm::from_apub(¬e, context, Some(expected_domain), request_counter).await?;
+
+ let inserted_private_message = blocking(&context.pool(), move |conn| {
+ PrivateMessage::create(conn, &private_message)
+ })
+ .await??;
+
+ let message = blocking(&context.pool(), move |conn| {
+ PrivateMessageView::read(conn, inserted_private_message.id)
+ })
+ .await??;
+
+ let res = PrivateMessageResponse { message };
+
+ let recipient_id = res.message.recipient_id;
+
+ context.chat_server().do_send(SendUserRoomMessage {
+ op: UserOperation::CreatePrivateMessage,
+ response: res,
+ recipient_id,
+ websocket_id: None,
+ });
+
+ Ok(())
+}
+
+pub(crate) async fn receive_update_private_message(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let update = Update::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&update, &expected_domain, true)?;
+ check_private_message_activity_valid(&update, context, request_counter).await?;
+
+ let object = update
+ .object()
+ .as_one()
+ .context(location_info!())?
+ .to_owned();
+ let note = Note::from_any_base(object)?.context(location_info!())?;
+
+ let private_message_form =
+ PrivateMessageForm::from_apub(¬e, context, Some(expected_domain), request_counter).await?;
+
+ let private_message_ap_id = private_message_form
+ .ap_id
+ .as_ref()
+ .context(location_info!())?
+ .clone();
+ let private_message = blocking(&context.pool(), move |conn| {
+ PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
+ })
+ .await??;
+
+ let private_message_id = private_message.id;
+ blocking(&context.pool(), move |conn| {
+ PrivateMessage::update(conn, private_message_id, &private_message_form)
+ })
+ .await??;
+
+ let private_message_id = private_message.id;
+ let message = blocking(&context.pool(), move |conn| {
+ PrivateMessageView::read(conn, private_message_id)
+ })
+ .await??;
+
+ let res = PrivateMessageResponse { message };
+
+ let recipient_id = res.message.recipient_id;
+
+ context.chat_server().do_send(SendUserRoomMessage {
+ op: UserOperation::EditPrivateMessage,
+ response: res,
+ recipient_id,
+ websocket_id: None,
+ });
+
+ Ok(())
+}
+
+pub(crate) async fn receive_delete_private_message(
+ context: &LemmyContext,
+ delete: Delete,
+ private_message: PrivateMessage,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ check_private_message_activity_valid(&delete, context, request_counter).await?;
+
+ let deleted_private_message = blocking(context.pool(), move |conn| {
+ PrivateMessage::update_deleted(conn, private_message.id, true)
+ })
+ .await??;
+
+ let message = blocking(&context.pool(), move |conn| {
+ PrivateMessageView::read(&conn, deleted_private_message.id)
+ })
+ .await??;
+
+ let res = PrivateMessageResponse { message };
+ let recipient_id = res.message.recipient_id;
+ context.chat_server().do_send(SendUserRoomMessage {
+ op: UserOperation::EditPrivateMessage,
+ response: res,
+ recipient_id,
+ websocket_id: None,
+ });
+
+ Ok(())
+}
+
+pub(crate) async fn receive_undo_delete_private_message(
+ context: &LemmyContext,
+ undo: Undo,
+ expected_domain: &Url,
+ private_message: PrivateMessage,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ check_private_message_activity_valid(&undo, context, request_counter).await?;
+ let object = undo.object().to_owned().one().context(location_info!())?;
+ let delete = Delete::from_any_base(object)?.context(location_info!())?;
+ verify_activity_domains_valid(&delete, expected_domain, true)?;
+ check_private_message_activity_valid(&delete, context, request_counter).await?;
+
+ let deleted_private_message = blocking(context.pool(), move |conn| {
+ PrivateMessage::update_deleted(conn, private_message.id, false)
+ })
+ .await??;
+
+ let message = blocking(&context.pool(), move |conn| {
+ PrivateMessageView::read(&conn, deleted_private_message.id)
+ })
+ .await??;
+
+ let res = PrivateMessageResponse { message };
+ let recipient_id = res.message.recipient_id;
+ context.chat_server().do_send(SendUserRoomMessage {
+ op: UserOperation::EditPrivateMessage,
+ response: res,
+ recipient_id,
+ websocket_id: None,
+ });
+
+ Ok(())
+}
+
+async fn check_private_message_activity_valid<T, Kind>(
+ activity: &T,
+ context: &LemmyContext,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError>
+where
+ T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
+{
+ let to_and_cc = get_activity_to_and_cc(activity)?;
+ if to_and_cc.len() != 1 {
+ return Err(anyhow!("Private message can only be addressed to one user").into());
+ }
+ if to_and_cc.contains(&public()) {
+ return Err(anyhow!("Private message cant be public").into());
+ }
+ let user_id = activity
+ .actor()?
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+ check_is_apub_id_valid(&user_id)?;
+ // check that the sender is a user, not a community
+ get_or_fetch_and_upsert_user(&user_id, &context, request_counter).await?;
+
+ Ok(())
+}
use crate::{
activities::receive::verify_activity_domains_valid,
- check_is_apub_id_valid,
- extensions::signatures::verify_signature,
- fetcher::get_or_fetch_and_upsert_user,
- inbox::{get_activity_id, is_activity_already_known},
+ inbox::{
+ get_activity_id,
+ get_activity_to_and_cc,
+ inbox_verify_http_signature,
+ is_activity_already_known,
+ is_addressed_to_public,
+ receive_for_community::{
+ receive_create_for_community,
+ receive_delete_for_community,
+ receive_dislike_for_community,
+ receive_like_for_community,
+ receive_undo_for_community,
+ receive_update_for_community,
+ },
+ },
insert_activity,
ActorType,
};
use activitystreams::{
- activity::{ActorAndObject, Follow, Undo},
+ activity::{kind::FollowType, ActorAndObject, Follow, Undo},
base::AnyBase,
prelude::*,
};
use log::info;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
+use url::Url;
/// Allowed activities for community inbox.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
-pub enum ValidTypes {
- Follow,
- Undo,
+pub enum CommunityValidTypes {
+ Follow, // follow request from a user
+ Undo, // unfollow from a user
+ Create, // create post or comment
+ Update, // update post or comment
+ Like, // upvote post or comment
+ Dislike, // downvote post or comment
+ Delete, // post or comment deleted by creator
+ Remove, // post or comment removed by mod or admin
}
-pub type AcceptedActivities = ActorAndObject<ValidTypes>;
+pub type CommunityAcceptedActivities = ActorAndObject<CommunityValidTypes>;
/// Handler for all incoming receive to community inboxes.
pub async fn community_inbox(
request: HttpRequest,
- input: web::Json<AcceptedActivities>,
+ input: web::Json<CommunityAcceptedActivities>,
path: web::Path<String>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, LemmyError> {
let activity = input.into_inner();
+ // First of all check the http signature
+ let request_counter = &mut 0;
+ let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
+
+ // Do nothing if we received the same activity before
+ let activity_id = get_activity_id(&activity, &actor.actor_id()?)?;
+ if is_activity_already_known(context.pool(), &activity_id).await? {
+ return Ok(HttpResponse::Ok().finish());
+ }
+ // Check if the activity is actually meant for us
let path = path.into_inner();
let community = blocking(&context.pool(), move |conn| {
Community::read_from_name(&conn, &path)
})
.await??;
-
- let to = activity
- .to()
- .context(location_info!())?
- .to_owned()
- .single_xsd_any_uri();
- if Some(community.actor_id()?) != to {
+ let to_and_cc = get_activity_to_and_cc(&activity)?;
+ if !to_and_cc.contains(&&community.actor_id()?) {
return Err(anyhow!("Activity delivered to wrong community").into());
}
+ insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
+
info!(
- "Community {} received activity {:?}",
- &community.name, &activity
- );
- let user_uri = activity
- .actor()?
- .as_single_xsd_any_uri()
- .context(location_info!())?;
- info!(
- "Community {} inbox received activity {:?} from {}",
+ "Community {} received activity {:?} from {}",
community.name,
&activity.id_unchecked(),
- &user_uri
+ &actor.actor_id_str()
);
- check_is_apub_id_valid(user_uri)?;
-
- let request_counter = &mut 0;
- let user = get_or_fetch_and_upsert_user(&user_uri, &context, request_counter).await?;
- verify_signature(&request, &user)?;
+ community_receive_message(
+ activity.clone(),
+ community.clone(),
+ actor.as_ref(),
+ &context,
+ request_counter,
+ )
+ .await
+}
- let activity_id = get_activity_id(&activity, user_uri)?;
- if is_activity_already_known(context.pool(), &activity_id).await? {
- return Ok(HttpResponse::Ok().finish());
- }
+/// Receives Follow, Undo/Follow, post actions, comment actions (including votes)
+pub(crate) async fn community_receive_message(
+ activity: CommunityAcceptedActivities,
+ to_community: Community,
+ actor: &dyn ActorType,
+ context: &LemmyContext,
+ request_counter: &mut i32,
+) -> Result<HttpResponse, LemmyError> {
+ // TODO: check if the sending user is banned by the community
let any_base = activity.clone().into_any_base()?;
- let kind = activity.kind().context(location_info!())?;
- let res = match kind {
- ValidTypes::Follow => handle_follow(any_base, user, community, &context).await,
- ValidTypes::Undo => handle_undo_follow(any_base, user, community, &context).await,
+ let actor_url = actor.actor_id()?;
+ let activity_kind = activity.kind().context(location_info!())?;
+ let do_announce = match activity_kind {
+ CommunityValidTypes::Follow => {
+ handle_follow(any_base.clone(), actor_url, &to_community, &context).await?;
+ false
+ }
+ CommunityValidTypes::Undo => {
+ handle_undo(
+ context,
+ activity.clone(),
+ actor_url,
+ &to_community,
+ request_counter,
+ )
+ .await?
+ }
+ CommunityValidTypes::Create => {
+ receive_create_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
+ true
+ }
+ CommunityValidTypes::Update => {
+ receive_update_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
+ true
+ }
+ CommunityValidTypes::Like => {
+ receive_like_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
+ true
+ }
+ CommunityValidTypes::Dislike => {
+ receive_dislike_for_community(context, any_base.clone(), &actor_url, request_counter).await?;
+ true
+ }
+ CommunityValidTypes::Delete => {
+ receive_delete_for_community(context, any_base.clone(), &actor_url).await?;
+ true
+ }
+ CommunityValidTypes::Remove => {
+ // TODO: we dont support remote mods, so this is ignored for now
+ //receive_remove_for_community(context, any_base.clone(), &user_url).await?
+ false
+ }
};
- insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
- res
+ if do_announce {
+ // Check again that the activity is public, just to be sure
+ is_addressed_to_public(&activity)?;
+ to_community
+ .send_announce(activity.into_any_base()?, context)
+ .await?;
+ }
+
+ return Ok(HttpResponse::Ok().finish());
}
/// Handle a follow request from a remote user, adding the user as follower and returning an
/// Accept activity.
async fn handle_follow(
activity: AnyBase,
- user: User_,
- community: Community,
+ user_url: Url,
+ community: &Community,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let follow = Follow::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
+ verify_activity_domains_valid(&follow, &user_url, false)?;
+ let user = blocking(&context.pool(), move |conn| {
+ User_::read_from_actor_id(&conn, user_url.as_str())
+ })
+ .await??;
let community_follower_form = CommunityFollowerForm {
community_id: community.id,
user_id: user.id,
Ok(HttpResponse::Ok().finish())
}
+async fn handle_undo(
+ context: &LemmyContext,
+ activity: CommunityAcceptedActivities,
+ actor_url: Url,
+ to_community: &Community,
+ request_counter: &mut i32,
+) -> Result<bool, LemmyError> {
+ let inner_kind = activity
+ .object()
+ .is_single_kind(&FollowType::Follow.to_string());
+ let any_base = activity.into_any_base()?;
+ if inner_kind {
+ handle_undo_follow(any_base, actor_url, to_community, &context).await?;
+ Ok(false)
+ } else {
+ receive_undo_for_community(context, any_base, &actor_url, request_counter).await?;
+ Ok(true)
+ }
+}
+
/// Handle `Undo/Follow` from a user, removing the user from followers list.
async fn handle_undo_follow(
activity: AnyBase,
- user: User_,
- community: Community,
+ user_url: Url,
+ community: &Community,
context: &LemmyContext,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let undo = Undo::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&undo, user.actor_id()?, true)?;
+ verify_activity_domains_valid(&undo, &user_url, true)?;
let object = undo.object().to_owned().one().context(location_info!())?;
let follow = Follow::from_any_base(object)?.context(location_info!())?;
- verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
+ verify_activity_domains_valid(&follow, &user_url, false)?;
+ let user = blocking(&context.pool(), move |conn| {
+ User_::read_from_actor_id(&conn, user_url.as_str())
+ })
+ .await??;
let community_follower_form = CommunityFollowerForm {
community_id: community.id,
user_id: user.id,
})
.await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
-use activitystreams::base::{BaseExt, Extends};
-use anyhow::Context;
+use crate::{
+ check_is_apub_id_valid,
+ extensions::signatures::verify_signature,
+ fetcher::get_or_fetch_and_upsert_actor,
+ ActorType,
+};
+use activitystreams::{
+ activity::ActorAndObjectRefExt,
+ base::{AsBase, BaseExt, Extends},
+ object::{AsObject, ObjectExt},
+ public,
+};
+use actix_web::HttpRequest;
+use anyhow::{anyhow, Context};
use lemmy_db::{activity::Activity, DbPool};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
+use lemmy_websocket::LemmyContext;
use serde::{export::fmt::Debug, Serialize};
use url::Url;
pub mod community_inbox;
+mod receive_for_community;
pub mod shared_inbox;
pub mod user_inbox;
Err(_) => Ok(false),
}
}
+
+pub(crate) fn get_activity_to_and_cc<T, Kind>(activity: &T) -> Result<Vec<Url>, LemmyError>
+where
+ T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
+{
+ let mut to_and_cc = vec![];
+ if let Some(to) = activity.to() {
+ let to = to.to_owned().unwrap_to_vec();
+ let mut to = to
+ .iter()
+ .map(|t| t.as_xsd_any_uri())
+ .flatten()
+ .map(|t| t.to_owned())
+ .collect();
+ to_and_cc.append(&mut to);
+ }
+ if let Some(cc) = activity.cc() {
+ let cc = cc.to_owned().unwrap_to_vec();
+ let mut cc = cc
+ .iter()
+ .map(|c| c.as_xsd_any_uri())
+ .flatten()
+ .map(|c| c.to_owned())
+ .collect();
+ to_and_cc.append(&mut cc);
+ }
+ Ok(to_and_cc)
+}
+
+pub(crate) fn is_addressed_to_public<T, Kind>(activity: &T) -> Result<(), LemmyError>
+where
+ T: AsBase<Kind> + AsObject<Kind> + ActorAndObjectRefExt,
+{
+ let to_and_cc = get_activity_to_and_cc(activity)?;
+ if to_and_cc.contains(&public()) {
+ Ok(())
+ } else {
+ Err(anyhow!("Activity is not addressed to public").into())
+ }
+}
+
+pub(crate) async fn inbox_verify_http_signature<T, Kind>(
+ activity: &T,
+ context: &LemmyContext,
+ request: HttpRequest,
+ request_counter: &mut i32,
+) -> Result<Box<dyn ActorType>, LemmyError>
+where
+ T: AsObject<Kind> + ActorAndObjectRefExt + Extends<Kind> + AsBase<Kind>,
+ Kind: Serialize,
+ <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+ let actor_id = activity
+ .actor()?
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+ check_is_apub_id_valid(&actor_id)?;
+ let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
+ verify_signature(&request, actor.as_ref())?;
+ Ok(actor)
+}
--- /dev/null
+use crate::{
+ activities::receive::{
+ comment::{
+ receive_create_comment,
+ receive_delete_comment,
+ receive_dislike_comment,
+ receive_like_comment,
+ receive_remove_comment,
+ receive_update_comment,
+ },
+ comment_undo::{
+ receive_undo_delete_comment,
+ receive_undo_dislike_comment,
+ receive_undo_like_comment,
+ receive_undo_remove_comment,
+ },
+ post::{
+ receive_create_post,
+ receive_delete_post,
+ receive_dislike_post,
+ receive_like_post,
+ receive_remove_post,
+ receive_update_post,
+ },
+ post_undo::{
+ receive_undo_delete_post,
+ receive_undo_dislike_post,
+ receive_undo_like_post,
+ receive_undo_remove_post,
+ },
+ receive_unhandled_activity,
+ verify_activity_domains_valid,
+ },
+ inbox::is_addressed_to_public,
+};
+use activitystreams::{
+ activity::{Create, Delete, Dislike, Like, Remove, Undo, Update},
+ base::AnyBase,
+ prelude::*,
+};
+use anyhow::Context;
+use diesel::result::Error::NotFound;
+use lemmy_db::{comment::Comment, post::Post, site::Site, Crud};
+use lemmy_structs::blocking;
+use lemmy_utils::{location_info, LemmyError};
+use lemmy_websocket::LemmyContext;
+use url::Url;
+
+/// This file is for post/comment activities received by the community, and for post/comment
+/// activities announced by the community and received by the user.
+
+/// A post or comment being created
+pub(in crate::inbox) async fn receive_create_for_community(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let create = Create::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&create, &expected_domain, true)?;
+ is_addressed_to_public(&create)?;
+
+ match create.object().as_single_kind_str() {
+ Some("Page") => receive_create_post(create, context, request_counter).await,
+ Some("Note") => receive_create_comment(create, context, request_counter).await,
+ _ => receive_unhandled_activity(create),
+ }
+}
+
+/// A post or comment being edited
+pub(in crate::inbox) async fn receive_update_for_community(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let update = Update::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&update, &expected_domain, true)?;
+ is_addressed_to_public(&update)?;
+
+ match update.object().as_single_kind_str() {
+ Some("Page") => receive_update_post(update, context, request_counter).await,
+ Some("Note") => receive_update_comment(update, context, request_counter).await,
+ _ => receive_unhandled_activity(update),
+ }
+}
+
+/// A post or comment being upvoted
+pub(in crate::inbox) async fn receive_like_for_community(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let like = Like::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&like, &expected_domain, false)?;
+ is_addressed_to_public(&like)?;
+
+ match like.object().as_single_kind_str() {
+ Some("Page") => receive_like_post(like, context, request_counter).await,
+ Some("Note") => receive_like_comment(like, context, request_counter).await,
+ _ => receive_unhandled_activity(like),
+ }
+}
+
+/// A post or comment being downvoted
+pub(in crate::inbox) async fn receive_dislike_for_community(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let enable_downvotes = blocking(context.pool(), move |conn| {
+ Site::read(conn, 1).map(|s| s.enable_downvotes)
+ })
+ .await??;
+ if !enable_downvotes {
+ return Ok(());
+ }
+
+ let dislike = Dislike::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&dislike, &expected_domain, false)?;
+ is_addressed_to_public(&dislike)?;
+
+ match dislike.object().as_single_kind_str() {
+ Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
+ Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
+ _ => receive_unhandled_activity(dislike),
+ }
+}
+
+/// A post or comment being deleted by its creator
+pub(in crate::inbox) async fn receive_delete_for_community(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: &Url,
+) -> Result<(), LemmyError> {
+ dbg!("receive_delete_for_community");
+ let delete = Delete::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&delete, &expected_domain, true)?;
+ is_addressed_to_public(&delete)?;
+
+ let object = delete
+ .object()
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+
+ match find_post_or_comment_by_id(context, object).await {
+ Ok(PostOrComment::Post(p)) => receive_delete_post(context, p).await,
+ Ok(PostOrComment::Comment(c)) => receive_delete_comment(context, c).await,
+ // if we dont have the object, no need to do anything
+ Err(_) => Ok(()),
+ }
+}
+
+/// A post or comment being removed by a mod/admin
+pub(in crate::inbox) async fn receive_remove_for_community(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: &Url,
+) -> Result<(), LemmyError> {
+ dbg!("receive_remove_for_community");
+ let remove = Remove::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&remove, &expected_domain, false)?;
+ is_addressed_to_public(&remove)?;
+
+ let cc = remove
+ .cc()
+ .map(|c| c.as_many())
+ .flatten()
+ .context(location_info!())?;
+ let community_id = cc
+ .first()
+ .map(|c| c.as_xsd_any_uri())
+ .flatten()
+ .context(location_info!())?;
+
+ let object = remove
+ .object()
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+
+ // Ensure that remove activity comes from the same domain as the community
+ remove.id(community_id.domain().context(location_info!())?)?;
+
+ match find_post_or_comment_by_id(context, object).await {
+ Ok(PostOrComment::Post(p)) => receive_remove_post(context, remove, p).await,
+ Ok(PostOrComment::Comment(c)) => receive_remove_comment(context, remove, c).await,
+ // if we dont have the object, no need to do anything
+ Err(_) => Ok(()),
+ }
+}
+
+/// A post/comment action being reverted (either a delete, remove, upvote or downvote)
+pub(in crate::inbox) async fn receive_undo_for_community(
+ context: &LemmyContext,
+ activity: AnyBase,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let undo = Undo::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&undo, &expected_domain.to_owned(), true)?;
+ is_addressed_to_public(&undo)?;
+
+ match undo.object().as_single_kind_str() {
+ Some("Delete") => receive_undo_delete_for_community(context, undo, expected_domain).await,
+ Some("Remove") => receive_undo_remove_for_community(context, undo, expected_domain).await,
+ Some("Like") => {
+ receive_undo_like_for_community(context, undo, expected_domain, request_counter).await
+ }
+ Some("Dislike") => {
+ receive_undo_dislike_for_community(context, undo, expected_domain, request_counter).await
+ }
+ _ => receive_unhandled_activity(undo),
+ }
+}
+
+/// A post or comment deletion being reverted
+pub(in crate::inbox) async fn receive_undo_delete_for_community(
+ context: &LemmyContext,
+ undo: Undo,
+ expected_domain: &Url,
+) -> Result<(), LemmyError> {
+ let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
+ .context(location_info!())?;
+ verify_activity_domains_valid(&delete, &expected_domain, true)?;
+ is_addressed_to_public(&delete)?;
+
+ let object = delete
+ .object()
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+ match find_post_or_comment_by_id(context, object).await {
+ Ok(PostOrComment::Post(p)) => receive_undo_delete_post(context, p).await,
+ Ok(PostOrComment::Comment(c)) => receive_undo_delete_comment(context, c).await,
+ // if we dont have the object, no need to do anything
+ Err(_) => Ok(()),
+ }
+}
+
+/// A post or comment removal being reverted
+pub(in crate::inbox) async fn receive_undo_remove_for_community(
+ context: &LemmyContext,
+ undo: Undo,
+ expected_domain: &Url,
+) -> Result<(), LemmyError> {
+ let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
+ .context(location_info!())?;
+ verify_activity_domains_valid(&remove, &expected_domain, false)?;
+ is_addressed_to_public(&remove)?;
+
+ let object = remove
+ .object()
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+ match find_post_or_comment_by_id(context, object).await {
+ Ok(PostOrComment::Post(p)) => receive_undo_remove_post(context, p).await,
+ Ok(PostOrComment::Comment(c)) => receive_undo_remove_comment(context, c).await,
+ // if we dont have the object, no need to do anything
+ Err(_) => Ok(()),
+ }
+}
+
+/// A post or comment upvote being reverted
+pub(in crate::inbox) async fn receive_undo_like_for_community(
+ context: &LemmyContext,
+ undo: Undo,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
+ .context(location_info!())?;
+ verify_activity_domains_valid(&like, &expected_domain, false)?;
+ is_addressed_to_public(&like)?;
+
+ let type_ = like
+ .object()
+ .as_single_kind_str()
+ .context(location_info!())?;
+ match type_ {
+ "Note" => receive_undo_like_comment(&like, context, request_counter).await,
+ "Page" => receive_undo_like_post(&like, context, request_counter).await,
+ _ => receive_unhandled_activity(like),
+ }
+}
+
+/// A post or comment downvote being reverted
+pub(in crate::inbox) async fn receive_undo_dislike_for_community(
+ context: &LemmyContext,
+ undo: Undo,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
+ .context(location_info!())?;
+ verify_activity_domains_valid(&dislike, &expected_domain, false)?;
+ is_addressed_to_public(&dislike)?;
+
+ let type_ = dislike
+ .object()
+ .as_single_kind_str()
+ .context(location_info!())?;
+ match type_ {
+ "Note" => receive_undo_dislike_comment(&dislike, context, request_counter).await,
+ "Page" => receive_undo_dislike_post(&dislike, context, request_counter).await,
+ _ => receive_unhandled_activity(dislike),
+ }
+}
+
+enum PostOrComment {
+ Comment(Comment),
+ Post(Post),
+}
+
+/// Tries to find a post or comment in the local database, without any network requests.
+/// This is used to handle deletions and removals, because in case we dont have the object, we can
+/// simply ignore the activity.
+async fn find_post_or_comment_by_id(
+ context: &LemmyContext,
+ apub_id: Url,
+) -> Result<PostOrComment, LemmyError> {
+ let ap_id = apub_id.to_string();
+ let post = blocking(context.pool(), move |conn| {
+ Post::read_from_apub_id(conn, &ap_id)
+ })
+ .await?;
+ if let Ok(p) = post {
+ return Ok(PostOrComment::Post(p));
+ }
+
+ let ap_id = apub_id.to_string();
+ let comment = blocking(context.pool(), move |conn| {
+ Comment::read_from_apub_id(conn, &ap_id)
+ })
+ .await?;
+ if let Ok(c) = comment {
+ return Ok(PostOrComment::Comment(c));
+ }
+
+ return Err(NotFound.into());
+}
use crate::{
- activities::receive::{
- comment::{
- receive_create_comment,
- receive_delete_comment,
- receive_dislike_comment,
- receive_like_comment,
- receive_remove_comment,
- receive_update_comment,
- },
- comment_undo::{
- receive_undo_delete_comment,
- receive_undo_dislike_comment,
- receive_undo_like_comment,
- receive_undo_remove_comment,
- },
- community::{
- receive_delete_community,
- receive_remove_community,
- receive_undo_delete_community,
- receive_undo_remove_community,
- },
- find_by_id,
- post::{
- receive_create_post,
- receive_delete_post,
- receive_dislike_post,
- receive_like_post,
- receive_remove_post,
- receive_update_post,
- },
- post_undo::{
- receive_undo_delete_post,
- receive_undo_dislike_post,
- receive_undo_like_post,
- receive_undo_remove_post,
- },
- receive_unhandled_activity,
- verify_activity_domains_valid,
- FindResults,
+ inbox::{
+ community_inbox::{community_receive_message, CommunityAcceptedActivities},
+ get_activity_id,
+ get_activity_to_and_cc,
+ inbox_verify_http_signature,
+ is_activity_already_known,
+ user_inbox::{user_receive_message, UserAcceptedActivities},
},
- check_is_apub_id_valid,
- extensions::signatures::verify_signature,
- fetcher::get_or_fetch_and_upsert_actor,
- inbox::{get_activity_id, is_activity_already_known},
insert_activity,
- ActorType,
-};
-use activitystreams::{
- activity::{ActorAndObject, Announce, Create, Delete, Dislike, Like, Remove, Undo, Update},
- base::AnyBase,
- prelude::*,
};
+use activitystreams::{activity::ActorAndObject, prelude::*};
use actix_web::{web, HttpRequest, HttpResponse};
-use anyhow::{anyhow, Context};
-use lemmy_db::{site::Site, Crud};
+use anyhow::Context;
+use lemmy_db::{community::Community, user::User_, DbPool};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
-use log::debug;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use url::Url;
}
// TODO: this isnt entirely correct, cause some of these receive are not ActorAndObject,
-// but it might still work due to the anybase conversion
+// but it still works due to the anybase conversion
pub type AcceptedActivities = ActorAndObject<ValidTypes>;
/// Handler for all incoming requests to shared inbox.
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, LemmyError> {
let activity = input.into_inner();
-
- let actor_id = activity
- .actor()?
- .to_owned()
- .single_xsd_any_uri()
- .context(location_info!())?;
- debug!(
- "Shared inbox received activity {:?} from {}",
- &activity.id_unchecked(),
- &actor_id
- );
-
- check_is_apub_id_valid(&actor_id)?;
-
+ // First of all check the http signature
let request_counter = &mut 0;
- let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
- verify_signature(&request, actor.as_ref())?;
+ let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
+ // Do nothing if we received the same activity before
+ let actor_id = actor.actor_id()?;
let activity_id = get_activity_id(&activity, &actor_id)?;
if is_activity_already_known(context.pool(), &activity_id).await? {
return Ok(HttpResponse::Ok().finish());
}
- let any_base = activity.clone().into_any_base()?;
- let kind = activity.kind().context(location_info!())?;
- let res = match kind {
- ValidTypes::Announce => {
- receive_announce(&context, any_base, actor.as_ref(), request_counter).await
- }
- ValidTypes::Create => receive_create(&context, any_base, actor_id, request_counter).await,
- ValidTypes::Update => receive_update(&context, any_base, actor_id, request_counter).await,
- ValidTypes::Like => receive_like(&context, any_base, actor_id, request_counter).await,
- ValidTypes::Dislike => receive_dislike(&context, any_base, actor_id, request_counter).await,
- ValidTypes::Remove => receive_remove(&context, any_base, actor_id).await,
- ValidTypes::Delete => receive_delete(&context, any_base, actor_id, request_counter).await,
- ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
- };
-
+ // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen
+ // if we receive the same activity twice in very quick succession.
insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
- res
-}
-
-/// Takes an announce and passes the inner activity to the appropriate handler.
-async fn receive_announce(
- context: &LemmyContext,
- activity: AnyBase,
- actor: &dyn ActorType,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let announce = Announce::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&announce, actor.actor_id()?, false)?;
-
- let kind = announce.object().as_single_kind_str();
- let object = announce
- .object()
- .to_owned()
- .one()
- .context(location_info!())?;
-
- let inner_id = object.id().context(location_info!())?.to_owned();
- check_is_apub_id_valid(&inner_id)?;
- if is_activity_already_known(context.pool(), &inner_id).await? {
- return Ok(HttpResponse::Ok().finish());
- }
-
- match kind {
- Some("Create") => receive_create(context, object, inner_id, request_counter).await,
- Some("Update") => receive_update(context, object, inner_id, request_counter).await,
- Some("Like") => receive_like(context, object, inner_id, request_counter).await,
- Some("Dislike") => receive_dislike(context, object, inner_id, request_counter).await,
- Some("Delete") => receive_delete(context, object, inner_id, request_counter).await,
- Some("Remove") => receive_remove(context, object, inner_id).await,
- Some("Undo") => receive_undo(context, object, inner_id, request_counter).await,
- _ => receive_unhandled_activity(announce),
- }
-}
-async fn receive_create(
- context: &LemmyContext,
- activity: AnyBase,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let create = Create::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&create, expected_domain, true)?;
-
- match create.object().as_single_kind_str() {
- Some("Page") => receive_create_post(create, context, request_counter).await,
- Some("Note") => receive_create_comment(create, context, request_counter).await,
- _ => receive_unhandled_activity(create),
- }
-}
-
-async fn receive_update(
- context: &LemmyContext,
- activity: AnyBase,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let update = Update::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&update, expected_domain, true)?;
-
- match update.object().as_single_kind_str() {
- Some("Page") => receive_update_post(update, context, request_counter).await,
- Some("Note") => receive_update_comment(update, context, request_counter).await,
- _ => receive_unhandled_activity(update),
+ let activity_any_base = activity.clone().into_any_base()?;
+ let mut res: Option<HttpResponse> = None;
+ let to_and_cc = get_activity_to_and_cc(&activity)?;
+ // If to_and_cc contains a local community, pass to receive_community_message()
+ // Handle community first, so in case the sender is banned by the community, it will error out.
+ // If we handled the user receive first, the activity would be inserted to the database before the
+ // community could check for bans.
+ let community = extract_local_community_from_destinations(&to_and_cc, context.pool()).await?;
+ if let Some(community) = community {
+ let community_activity = CommunityAcceptedActivities::from_any_base(activity_any_base.clone())?
+ .context(location_info!())?;
+ res = Some(
+ community_receive_message(
+ community_activity,
+ community,
+ actor.as_ref(),
+ &context,
+ request_counter,
+ )
+ .await?,
+ );
}
-}
-async fn receive_like(
- context: &LemmyContext,
- activity: AnyBase,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let like = Like::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&like, expected_domain, false)?;
-
- match like.object().as_single_kind_str() {
- Some("Page") => receive_like_post(like, context, request_counter).await,
- Some("Note") => receive_like_comment(like, context, request_counter).await,
- _ => receive_unhandled_activity(like),
+ // If to_and_cc contains a local user, pass to receive_user_message()
+ if is_addressed_to_local_user(&to_and_cc, context.pool()).await? {
+ let user_activity = UserAcceptedActivities::from_any_base(activity_any_base.clone())?
+ .context(location_info!())?;
+ // `to_user` is only used for follow activities (which we dont receive here), so no need to pass
+ // it in
+ user_receive_message(
+ user_activity,
+ None,
+ actor.as_ref(),
+ &context,
+ request_counter,
+ )
+ .await?;
}
-}
-async fn receive_dislike(
- context: &LemmyContext,
- activity: AnyBase,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let enable_downvotes = blocking(context.pool(), move |conn| {
- Site::read(conn, 1).map(|s| s.enable_downvotes)
- })
- .await??;
- if !enable_downvotes {
- return Ok(HttpResponse::Ok().finish());
+ // If to_and_cc contains followers collection of a community, pass to receive_user_message()
+ if is_addressed_to_community_followers(&to_and_cc, context.pool()).await? {
+ let user_activity = UserAcceptedActivities::from_any_base(activity_any_base.clone())?
+ .context(location_info!())?;
+ res = Some(
+ user_receive_message(
+ user_activity,
+ None,
+ actor.as_ref(),
+ &context,
+ request_counter,
+ )
+ .await?,
+ );
}
- let dislike = Dislike::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&dislike, expected_domain, false)?;
-
- match dislike.object().as_single_kind_str() {
- Some("Page") => receive_dislike_post(dislike, context, request_counter).await,
- Some("Note") => receive_dislike_comment(dislike, context, request_counter).await,
- _ => receive_unhandled_activity(dislike),
+ // If none of those, throw an error
+ if let Some(r) = res {
+ Ok(r)
+ } else {
+ Ok(HttpResponse::NotImplemented().finish())
}
}
-pub async fn receive_delete(
- context: &LemmyContext,
- activity: AnyBase,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let delete = Delete::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&delete, expected_domain, true)?;
-
- let object = delete
- .object()
- .to_owned()
- .single_xsd_any_uri()
- .context(location_info!())?;
-
- match find_by_id(context, object).await {
- Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p, request_counter).await,
- Ok(FindResults::Comment(c)) => {
- receive_delete_comment(context, delete, c, request_counter).await
- }
- Ok(FindResults::Community(c)) => {
- receive_delete_community(context, delete, c, request_counter).await
+/// If `to_and_cc` contains the ID of a local community, return that community, otherwise return
+/// None.
+///
+/// This doesnt handle the case where an activity is addressed to multiple communities (because
+/// Lemmy doesnt generate such activities).
+async fn extract_local_community_from_destinations(
+ to_and_cc: &[Url],
+ pool: &DbPool,
+) -> Result<Option<Community>, LemmyError> {
+ for url in to_and_cc {
+ let url = url.to_string();
+ let community = blocking(&pool, move |conn| {
+ Community::read_from_actor_id(&conn, &url)
+ })
+ .await?;
+ if let Ok(c) = community {
+ if c.local {
+ return Ok(Some(c));
+ }
}
- // if we dont have the object, no need to do anything
- Err(_) => Ok(HttpResponse::Ok().finish()),
- }
-}
-
-async fn receive_remove(
- context: &LemmyContext,
- activity: AnyBase,
- expected_domain: Url,
-) -> Result<HttpResponse, LemmyError> {
- let remove = Remove::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&remove, expected_domain, false)?;
-
- let cc = remove
- .cc()
- .map(|c| c.as_many())
- .flatten()
- .context(location_info!())?;
- let community_id = cc
- .first()
- .map(|c| c.as_xsd_any_uri())
- .flatten()
- .context(location_info!())?;
-
- let object = remove
- .object()
- .to_owned()
- .single_xsd_any_uri()
- .context(location_info!())?;
-
- // Ensure that remove activity comes from the same domain as the community
- remove.id(community_id.domain().context(location_info!())?)?;
-
- match find_by_id(context, object).await {
- Ok(FindResults::Post(p)) => receive_remove_post(context, remove, p).await,
- Ok(FindResults::Comment(c)) => receive_remove_comment(context, remove, c).await,
- Ok(FindResults::Community(c)) => receive_remove_community(context, remove, c).await,
- // if we dont have the object, no need to do anything
- Err(_) => Ok(HttpResponse::Ok().finish()),
- }
-}
-
-async fn receive_undo(
- context: &LemmyContext,
- activity: AnyBase,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let undo = Undo::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&undo, expected_domain.to_owned(), true)?;
-
- match undo.object().as_single_kind_str() {
- Some("Delete") => receive_undo_delete(context, undo, expected_domain, request_counter).await,
- Some("Remove") => receive_undo_remove(context, undo, expected_domain, request_counter).await,
- Some("Like") => receive_undo_like(context, undo, expected_domain, request_counter).await,
- Some("Dislike") => receive_undo_dislike(context, undo, expected_domain, request_counter).await,
- _ => receive_unhandled_activity(undo),
}
+ Ok(None)
}
-async fn receive_undo_delete(
- context: &LemmyContext,
- undo: Undo,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let delete = Delete::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
- .context(location_info!())?;
- verify_activity_domains_valid(&delete, expected_domain, true)?;
-
- let object = delete
- .object()
- .to_owned()
- .single_xsd_any_uri()
- .context(location_info!())?;
- match find_by_id(context, object).await {
- Ok(FindResults::Post(p)) => receive_undo_delete_post(context, undo, p, request_counter).await,
- Ok(FindResults::Comment(c)) => {
- receive_undo_delete_comment(context, undo, c, request_counter).await
- }
- Ok(FindResults::Community(c)) => {
- receive_undo_delete_community(context, undo, c, request_counter).await
+/// Returns true if `to_and_cc` contains at least one local user.
+async fn is_addressed_to_local_user(to_and_cc: &[Url], pool: &DbPool) -> Result<bool, LemmyError> {
+ for url in to_and_cc {
+ let url = url.to_string();
+ let user = blocking(&pool, move |conn| User_::read_from_actor_id(&conn, &url)).await?;
+ if let Ok(u) = user {
+ if u.local {
+ return Ok(true);
+ }
}
- // if we dont have the object, no need to do anything
- Err(_) => Ok(HttpResponse::Ok().finish()),
}
+ Ok(false)
}
-async fn receive_undo_remove(
- context: &LemmyContext,
- undo: Undo,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let remove = Remove::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
- .context(location_info!())?;
- verify_activity_domains_valid(&remove, expected_domain, false)?;
-
- let object = remove
- .object()
- .to_owned()
- .single_xsd_any_uri()
- .context(location_info!())?;
- match find_by_id(context, object).await {
- Ok(FindResults::Post(p)) => receive_undo_remove_post(context, undo, p, request_counter).await,
- Ok(FindResults::Comment(c)) => {
- receive_undo_remove_comment(context, undo, c, request_counter).await
+/// Returns true if `to_and_cc` contains at least one followers collection of a remote community
+/// (like `https://example.com/c/main/followers`)
+async fn is_addressed_to_community_followers(
+ to_and_cc: &[Url],
+ pool: &DbPool,
+) -> Result<bool, LemmyError> {
+ for url in to_and_cc {
+ let url = url.to_string();
+ // TODO: extremely hacky, we should just store the followers url for each community in the db
+ if url.ends_with("/followers") {
+ let community_url = url.replace("/followers", "");
+ let community = blocking(&pool, move |conn| {
+ Community::read_from_actor_id(&conn, &community_url)
+ })
+ .await??;
+ if !community.local {
+ return Ok(true);
+ }
}
- Ok(FindResults::Community(c)) => {
- receive_undo_remove_community(context, undo, c, request_counter).await
- }
- // if we dont have the object, no need to do anything
- Err(_) => Ok(HttpResponse::Ok().finish()),
- }
-}
-
-async fn receive_undo_like(
- context: &LemmyContext,
- undo: Undo,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let like = Like::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
- .context(location_info!())?;
- verify_activity_domains_valid(&like, expected_domain, false)?;
-
- let type_ = like
- .object()
- .as_single_kind_str()
- .context(location_info!())?;
- match type_ {
- "Note" => receive_undo_like_comment(undo, &like, context, request_counter).await,
- "Page" => receive_undo_like_post(undo, &like, context, request_counter).await,
- d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
- }
-}
-
-async fn receive_undo_dislike(
- context: &LemmyContext,
- undo: Undo,
- expected_domain: Url,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let dislike = Dislike::from_any_base(undo.object().to_owned().one().context(location_info!())?)?
- .context(location_info!())?;
- verify_activity_domains_valid(&dislike, expected_domain, false)?;
-
- let type_ = dislike
- .object()
- .as_single_kind_str()
- .context(location_info!())?;
- match type_ {
- "Note" => receive_undo_dislike_comment(undo, &dislike, context, request_counter).await,
- "Page" => receive_undo_dislike_post(undo, &dislike, context, request_counter).await,
- d => Err(anyhow!("Undo Delete type {} not supported", d).into()),
}
+ Ok(false)
}
use crate::{
- activities::receive::verify_activity_domains_valid,
+ activities::receive::{
+ community::{
+ receive_delete_community,
+ receive_remove_community,
+ receive_undo_delete_community,
+ receive_undo_remove_community,
+ },
+ private_message::{
+ receive_create_private_message,
+ receive_delete_private_message,
+ receive_undo_delete_private_message,
+ receive_update_private_message,
+ },
+ receive_unhandled_activity,
+ verify_activity_domains_valid,
+ },
check_is_apub_id_valid,
- extensions::signatures::verify_signature,
- fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
- inbox::{get_activity_id, is_activity_already_known},
+ fetcher::get_or_fetch_and_upsert_community,
+ inbox::{
+ get_activity_id,
+ get_activity_to_and_cc,
+ inbox_verify_http_signature,
+ is_activity_already_known,
+ is_addressed_to_public,
+ receive_for_community::{
+ receive_create_for_community,
+ receive_delete_for_community,
+ receive_dislike_for_community,
+ receive_like_for_community,
+ receive_remove_for_community,
+ receive_undo_for_community,
+ receive_update_for_community,
+ },
+ },
insert_activity,
ActorType,
- FromApub,
};
use activitystreams::{
- activity::{Accept, ActorAndObject, Create, Delete, Follow, Undo, Update},
+ activity::{Accept, ActorAndObject, Announce, Delete, Follow, Undo},
base::AnyBase,
- object::Note,
prelude::*,
};
use actix_web::{web, HttpRequest, HttpResponse};
use anyhow::{anyhow, Context};
+use diesel::NotFound;
use lemmy_db::{
- community::{CommunityFollower, CommunityFollowerForm},
- private_message::{PrivateMessage, PrivateMessageForm},
- private_message_view::PrivateMessageView,
+ community::{Community, CommunityFollower, CommunityFollowerForm},
+ private_message::PrivateMessage,
user::User_,
- Crud,
Followable,
};
-use lemmy_structs::{blocking, user::PrivateMessageResponse};
+use lemmy_structs::blocking;
use lemmy_utils::{location_info, LemmyError};
-use lemmy_websocket::{messages::SendUserRoomMessage, LemmyContext, UserOperation};
+use lemmy_websocket::LemmyContext;
use log::debug;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
+use url::Url;
/// Allowed activities for user inbox.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
-pub enum ValidTypes {
- Accept,
- Create,
- Update,
- Delete,
- Undo,
+pub enum UserValidTypes {
+ Accept, // community accepted our follow request
+ Create, // create private message
+ Update, // edit private message
+ Delete, // private message or community deleted by creator
+ Undo, // private message or community restored
+ Remove, // community removed by admin
+ Announce, // post, comment or vote in community
}
-pub type AcceptedActivities = ActorAndObject<ValidTypes>;
+pub type UserAcceptedActivities = ActorAndObject<UserValidTypes>;
/// Handler for all incoming activities to user inboxes.
pub async fn user_inbox(
request: HttpRequest,
- input: web::Json<AcceptedActivities>,
+ input: web::Json<UserAcceptedActivities>,
path: web::Path<String>,
context: web::Data<LemmyContext>,
) -> Result<HttpResponse, LemmyError> {
let activity = input.into_inner();
+ // First of all check the http signature
+ let request_counter = &mut 0;
+ let actor = inbox_verify_http_signature(&activity, &context, request, request_counter).await?;
+
+ // Do nothing if we received the same activity before
+ let activity_id = get_activity_id(&activity, &actor.actor_id()?)?;
+ if is_activity_already_known(context.pool(), &activity_id).await? {
+ return Ok(HttpResponse::Ok().finish());
+ }
+
+ // Check if the activity is actually meant for us
let username = path.into_inner();
let user = blocking(&context.pool(), move |conn| {
User_::read_from_name(&conn, &username)
})
.await??;
-
- let to = activity
- .to()
- .context(location_info!())?
- .to_owned()
- .single_xsd_any_uri();
- if Some(user.actor_id()?) != to {
+ let to_and_cc = get_activity_to_and_cc(&activity)?;
+ if !to_and_cc.contains(&&user.actor_id()?) {
return Err(anyhow!("Activity delivered to wrong user").into());
}
- let actor_uri = activity
- .actor()?
- .as_single_xsd_any_uri()
- .context(location_info!())?;
+ insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
+
debug!(
- "User {} inbox received activity {:?} from {}",
+ "User {} received activity {:?} from {}",
user.name,
&activity.id_unchecked(),
- &actor_uri
+ &actor.actor_id_str()
);
- check_is_apub_id_valid(actor_uri)?;
+ user_receive_message(
+ activity.clone(),
+ Some(user.clone()),
+ actor.as_ref(),
+ &context,
+ request_counter,
+ )
+ .await
+}
- let request_counter = &mut 0;
- let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
- verify_signature(&request, actor.as_ref())?;
+/// Receives Accept/Follow, Announce, private messages and community (undo) remove, (undo) delete
+pub(crate) async fn user_receive_message(
+ activity: UserAcceptedActivities,
+ to_user: Option<User_>,
+ actor: &dyn ActorType,
+ context: &LemmyContext,
+ request_counter: &mut i32,
+) -> Result<HttpResponse, LemmyError> {
+ // TODO: must be addressed to one or more local users, or to followers of a remote community
- let activity_id = get_activity_id(&activity, actor_uri)?;
- if is_activity_already_known(context.pool(), &activity_id).await? {
- return Ok(HttpResponse::Ok().finish());
- }
+ // TODO: if it is addressed to community followers, check that at least one local user is following it
let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?;
- let res = match kind {
- ValidTypes::Accept => {
- receive_accept(&context, any_base, actor.as_ref(), user, request_counter).await
+ let actor_url = actor.actor_id()?;
+ match kind {
+ UserValidTypes::Accept => {
+ receive_accept(&context, any_base, actor, to_user.unwrap(), request_counter).await?;
}
- ValidTypes::Create => {
- receive_create_private_message(&context, any_base, actor.as_ref(), request_counter).await
+ UserValidTypes::Announce => {
+ receive_announce(&context, any_base, actor, request_counter).await?
}
- ValidTypes::Update => {
- receive_update_private_message(&context, any_base, actor.as_ref(), request_counter).await
+ UserValidTypes::Create => {
+ receive_create_private_message(&context, any_base, actor_url, request_counter).await?
}
- ValidTypes::Delete => receive_delete_private_message(&context, any_base, actor.as_ref()).await,
- ValidTypes::Undo => {
- receive_undo_delete_private_message(&context, any_base, actor.as_ref()).await
+ UserValidTypes::Update => {
+ receive_update_private_message(&context, any_base, actor_url, request_counter).await?
}
+ UserValidTypes::Delete => {
+ receive_delete(context, any_base, &actor_url, request_counter).await?
+ }
+ UserValidTypes::Undo => receive_undo(context, any_base, &actor_url, request_counter).await?,
+ UserValidTypes::Remove => receive_remove_community(&context, any_base, &actor_url).await?,
};
- insert_activity(&activity_id, activity.clone(), false, true, context.pool()).await?;
- res
+ // TODO: would be logical to move websocket notification code here
+
+ Ok(HttpResponse::Ok().finish())
}
/// Handle accepted follows.
actor: &dyn ActorType,
user: User_,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
+) -> Result<(), LemmyError> {
let accept = Accept::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&accept, actor.actor_id()?, false)?;
+ verify_activity_domains_valid(&accept, &actor.actor_id()?, false)?;
// TODO: we should check that we actually sent this activity, because the remote instance
// could just put a fake Follow
let object = accept.object().to_owned().one().context(location_info!())?;
let follow = Follow::from_any_base(object)?.context(location_info!())?;
- verify_activity_domains_valid(&follow, user.actor_id()?, false)?;
+ verify_activity_domains_valid(&follow, &user.actor_id()?, false)?;
let community_uri = accept
.actor()?
})
.await?;
- Ok(HttpResponse::Ok().finish())
+ Ok(())
}
-async fn receive_create_private_message(
+/// Takes an announce and passes the inner activity to the appropriate handler.
+async fn receive_announce(
context: &LemmyContext,
activity: AnyBase,
actor: &dyn ActorType,
request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let create = Create::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&create, actor.actor_id()?, true)?;
-
- let note = Note::from_any_base(
- create
- .object()
- .as_one()
- .context(location_info!())?
- .to_owned(),
- )?
- .context(location_info!())?;
-
- let private_message =
- PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?;
-
- let inserted_private_message = blocking(&context.pool(), move |conn| {
- PrivateMessage::create(conn, &private_message)
- })
- .await??;
-
- let message = blocking(&context.pool(), move |conn| {
- PrivateMessageView::read(conn, inserted_private_message.id)
- })
- .await??;
-
- let res = PrivateMessageResponse { message };
-
- let recipient_id = res.message.recipient_id;
-
- context.chat_server().do_send(SendUserRoomMessage {
- op: UserOperation::CreatePrivateMessage,
- response: res,
- recipient_id,
- websocket_id: None,
- });
-
- Ok(HttpResponse::Ok().finish())
-}
-
-async fn receive_update_private_message(
- context: &LemmyContext,
- activity: AnyBase,
- actor: &dyn ActorType,
- request_counter: &mut i32,
-) -> Result<HttpResponse, LemmyError> {
- let update = Update::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&update, actor.actor_id()?, true)?;
+) -> Result<(), LemmyError> {
+ let announce = Announce::from_any_base(activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&announce, &actor.actor_id()?, false)?;
+ is_addressed_to_public(&announce)?;
- let object = update
+ let kind = announce.object().as_single_kind_str();
+ let inner_activity = announce
.object()
- .as_one()
- .context(location_info!())?
- .to_owned();
- let note = Note::from_any_base(object)?.context(location_info!())?;
-
- let private_message_form =
- PrivateMessageForm::from_apub(¬e, context, Some(actor.actor_id()?), request_counter).await?;
-
- let private_message_ap_id = private_message_form
- .ap_id
- .as_ref()
- .context(location_info!())?
- .clone();
- let private_message = blocking(&context.pool(), move |conn| {
- PrivateMessage::read_from_apub_id(conn, &private_message_ap_id)
- })
- .await??;
-
- let private_message_id = private_message.id;
- blocking(&context.pool(), move |conn| {
- PrivateMessage::update(conn, private_message_id, &private_message_form)
- })
- .await??;
-
- let private_message_id = private_message.id;
- let message = blocking(&context.pool(), move |conn| {
- PrivateMessageView::read(conn, private_message_id)
- })
- .await??;
-
- let res = PrivateMessageResponse { message };
-
- let recipient_id = res.message.recipient_id;
+ .to_owned()
+ .one()
+ .context(location_info!())?;
- context.chat_server().do_send(SendUserRoomMessage {
- op: UserOperation::EditPrivateMessage,
- response: res,
- recipient_id,
- websocket_id: None,
- });
+ let inner_id = inner_activity.id().context(location_info!())?.to_owned();
+ check_is_apub_id_valid(&inner_id)?;
+ if is_activity_already_known(context.pool(), &inner_id).await? {
+ return Ok(());
+ }
- Ok(HttpResponse::Ok().finish())
+ dbg!(&kind);
+ match kind {
+ Some("Create") => {
+ receive_create_for_community(context, inner_activity, &inner_id, request_counter).await
+ }
+ Some("Update") => {
+ receive_update_for_community(context, inner_activity, &inner_id, request_counter).await
+ }
+ Some("Like") => {
+ receive_like_for_community(context, inner_activity, &inner_id, request_counter).await
+ }
+ Some("Dislike") => {
+ receive_dislike_for_community(context, inner_activity, &inner_id, request_counter).await
+ }
+ Some("Delete") => receive_delete_for_community(context, inner_activity, &inner_id).await,
+ Some("Remove") => receive_remove_for_community(context, inner_activity, &inner_id).await,
+ Some("Undo") => {
+ receive_undo_for_community(context, inner_activity, &inner_id, request_counter).await
+ }
+ _ => receive_unhandled_activity(inner_activity),
+ }
}
-async fn receive_delete_private_message(
+async fn receive_delete(
context: &LemmyContext,
- activity: AnyBase,
- actor: &dyn ActorType,
-) -> Result<HttpResponse, LemmyError> {
- let delete = Delete::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
+ any_base: AnyBase,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ use CommunityOrPrivateMessage::*;
- let private_message_id = delete
+ let delete = Delete::from_any_base(any_base.clone())?.context(location_info!())?;
+ verify_activity_domains_valid(&delete, expected_domain, true)?;
+ let object_uri = delete
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
- let private_message = blocking(context.pool(), move |conn| {
- PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
- })
- .await??;
- let deleted_private_message = blocking(context.pool(), move |conn| {
- PrivateMessage::update_deleted(conn, private_message.id, true)
- })
- .await??;
-
- let message = blocking(&context.pool(), move |conn| {
- PrivateMessageView::read(&conn, deleted_private_message.id)
- })
- .await??;
-
- let res = PrivateMessageResponse { message };
- let recipient_id = res.message.recipient_id;
- context.chat_server().do_send(SendUserRoomMessage {
- op: UserOperation::EditPrivateMessage,
- response: res,
- recipient_id,
- websocket_id: None,
- });
- Ok(HttpResponse::Ok().finish())
+ match find_community_or_private_message_by_id(context, object_uri).await? {
+ Community(c) => receive_delete_community(context, c).await,
+ PrivateMessage(p) => receive_delete_private_message(context, delete, p, request_counter).await,
+ }
}
-async fn receive_undo_delete_private_message(
+async fn receive_undo(
context: &LemmyContext,
- activity: AnyBase,
- actor: &dyn ActorType,
-) -> Result<HttpResponse, LemmyError> {
- let undo = Undo::from_any_base(activity)?.context(location_info!())?;
- verify_activity_domains_valid(&undo, actor.actor_id()?, true)?;
- let object = undo.object().to_owned().one().context(location_info!())?;
- let delete = Delete::from_any_base(object)?.context(location_info!())?;
- verify_activity_domains_valid(&delete, actor.actor_id()?, true)?;
+ any_base: AnyBase,
+ expected_domain: &Url,
+ request_counter: &mut i32,
+) -> Result<(), LemmyError> {
+ use CommunityOrPrivateMessage::*;
+ let undo = Undo::from_any_base(any_base)?.context(location_info!())?;
+ verify_activity_domains_valid(&undo, expected_domain, true)?;
+
+ let inner_activity = undo.object().to_owned().one().context(location_info!())?;
+ let kind = inner_activity.kind_str();
+ match kind {
+ Some("Delete") => {
+ let delete = Delete::from_any_base(inner_activity)?.context(location_info!())?;
+ verify_activity_domains_valid(&delete, expected_domain, true)?;
+ let object_uri = delete
+ .object()
+ .to_owned()
+ .single_xsd_any_uri()
+ .context(location_info!())?;
+ match find_community_or_private_message_by_id(context, object_uri).await? {
+ Community(c) => receive_undo_delete_community(context, undo, c, expected_domain).await,
+ PrivateMessage(p) => {
+ receive_undo_delete_private_message(context, undo, expected_domain, p, request_counter)
+ .await
+ }
+ }
+ }
+ Some("Remove") => receive_undo_remove_community(context, undo, expected_domain).await,
+ _ => receive_unhandled_activity(undo),
+ }
+}
+enum CommunityOrPrivateMessage {
+ Community(Community),
+ PrivateMessage(PrivateMessage),
+}
- let private_message_id = delete
- .object()
- .to_owned()
- .single_xsd_any_uri()
- .context(location_info!())?;
- let private_message = blocking(context.pool(), move |conn| {
- PrivateMessage::read_from_apub_id(conn, private_message_id.as_str())
- })
- .await??;
- let deleted_private_message = blocking(context.pool(), move |conn| {
- PrivateMessage::update_deleted(conn, private_message.id, false)
+async fn find_community_or_private_message_by_id(
+ context: &LemmyContext,
+ apub_id: Url,
+) -> Result<CommunityOrPrivateMessage, LemmyError> {
+ let ap_id = apub_id.to_string();
+ let community = blocking(context.pool(), move |conn| {
+ Community::read_from_actor_id(conn, &ap_id)
})
- .await??;
+ .await?;
+ if let Ok(c) = community {
+ return Ok(CommunityOrPrivateMessage::Community(c));
+ }
- let message = blocking(&context.pool(), move |conn| {
- PrivateMessageView::read(&conn, deleted_private_message.id)
+ let ap_id = apub_id.to_string();
+ let private_message = blocking(context.pool(), move |conn| {
+ PrivateMessage::read_from_apub_id(conn, &ap_id)
})
- .await??;
-
- let res = PrivateMessageResponse { message };
- let recipient_id = res.message.recipient_id;
- context.chat_server().do_send(SendUserRoomMessage {
- op: UserOperation::EditPrivateMessage,
- response: res,
- recipient_id,
- websocket_id: None,
- });
+ .await?;
+ if let Ok(p) = private_message {
+ return Ok(CommunityOrPrivateMessage::PrivateMessage(p));
+ }
- Ok(HttpResponse::Ok().finish())
+ return Err(NotFound.into());
}