From: Nutomic Date: Thu, 11 Nov 2021 19:49:15 +0000 (+0000) Subject: More federation compat (#1894) X-Git-Url: http://these/git/%22https:/join-lemmy.org/%7B%60%24%7BarchiveTodayUrl%7D/static/git-favicon.png?a=commitdiff_plain;h=1b9414f292f3c48d95bb0cd3bbaf8220607f1f85;p=lemmy.git More federation compat (#1894) * Make HTTP signatures compatible with Pleroma * Send Announce/Page, Announce/Note for Pleroma compatibility * remove unused code --- diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index 6830bd13..11938184 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -26,13 +26,12 @@ pub(crate) trait GetCommunity { } impl AnnounceActivity { - pub async fn send( + fn new( object: AnnouncableActivities, community: &ApubCommunity, - additional_inboxes: Vec, context: &LemmyContext, - ) -> Result<(), LemmyError> { - let announce = AnnounceActivity { + ) -> Result { + Ok(AnnounceActivity { actor: ObjectId::new(community.actor_id()), to: vec![public()], object, @@ -43,11 +42,49 @@ impl AnnounceActivity { &context.settings().get_protocol_and_hostname(), )?, unparsed: Default::default(), - }; + }) + } + + pub async fn send( + object: AnnouncableActivities, + community: &ApubCommunity, + additional_inboxes: Vec, + context: &LemmyContext, + ) -> Result<(), LemmyError> { + let announce = AnnounceActivity::new(object.clone(), community, context)?; let inboxes = community - .get_follower_inboxes(additional_inboxes, context) + .get_follower_inboxes(additional_inboxes.clone(), context) .await?; - send_lemmy_activity(context, &announce, &announce.id, community, inboxes, false).await + send_lemmy_activity( + context, + &announce, + &announce.id, + community, + inboxes.clone(), + false, + ) + .await?; + + // Pleroma (and likely Mastodon) can't handle activities like Announce/Create/Page, so for + // compatibility, we also send Announce/Page and Announce/Note (for new and updated + // posts/comments). + use AnnouncableActivities::*; + let object = match object { + CreateOrUpdatePost(c) => Page(c.object), + CreateOrUpdateComment(c) => Note(c.object), + _ => return Ok(()), + }; + let announce_compat = AnnounceActivity::new(object, community, context)?; + send_lemmy_activity( + context, + &announce_compat, + &announce_compat.id, + community, + inboxes, + false, + ) + .await?; + Ok(()) } } @@ -77,14 +114,7 @@ impl ActivityHandler for AnnounceActivity { if is_activity_already_known(context.pool(), &object_data.id).await? { return Ok(()); } - insert_activity( - &object_data.id, - self.object.clone(), - false, - true, - context.pool(), - ) - .await?; + insert_activity(&object_data.id, &self.object, false, true, context.pool()).await?; self.object.receive(context, request_counter).await } } diff --git a/crates/apub/src/activities/community/mod.rs b/crates/apub/src/activities/community/mod.rs index b63c8b65..1b3e305d 100644 --- a/crates/apub/src/activities/community/mod.rs +++ b/crates/apub/src/activities/community/mod.rs @@ -28,7 +28,7 @@ pub(crate) async fn send_to_community( ) -> Result<(), LemmyError> { // if this is a local community, we need to do an announce from the community instead if community.local { - insert_activity(activity_id, activity.clone(), true, false, context.pool()).await?; + insert_activity(activity_id, &activity, true, false, context.pool()).await?; AnnounceActivity::send(activity, community, additional_inboxes, context).await?; } else { let mut inboxes = additional_inboxes; diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index a6c75376..e115769f 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -173,7 +173,7 @@ async fn send_lemmy_activity( insert_activity( activity_id, - serialised_activity.clone(), + &serialised_activity, true, sensitive, context.pool(), diff --git a/crates/apub/src/activities/post/create_or_update.rs b/crates/apub/src/activities/post/create_or_update.rs index 05a7f38d..3fb6a138 100644 --- a/crates/apub/src/activities/post/create_or_update.rs +++ b/crates/apub/src/activities/post/create_or_update.rs @@ -59,6 +59,7 @@ impl CreateOrUpdatePost { }) .await?? .into(); + let create_or_update = CreateOrUpdatePost::new(post, actor, &community, kind, context).await?; let id = create_or_update.id.clone(); let activity = AnnouncableActivities::CreateOrUpdatePost(create_or_update); diff --git a/crates/apub/src/activity_lists.rs b/crates/apub/src/activity_lists.rs index 98736ae2..00d2d439 100644 --- a/crates/apub/src/activity_lists.rs +++ b/crates/apub/src/activity_lists.rs @@ -1,29 +1,32 @@ use crate::{ activities::community::announce::GetCommunity, objects::community::ApubCommunity, - protocol::activities::{ - community::{ - add_mod::AddMod, - announce::AnnounceActivity, - block_user::BlockUserFromCommunity, - remove_mod::RemoveMod, - report::Report, - undo_block_user::UndoBlockUserFromCommunity, - update::UpdateCommunity, + protocol::{ + activities::{ + community::{ + add_mod::AddMod, + announce::AnnounceActivity, + block_user::BlockUserFromCommunity, + remove_mod::RemoveMod, + report::Report, + undo_block_user::UndoBlockUserFromCommunity, + update::UpdateCommunity, + }, + create_or_update::{comment::CreateOrUpdateComment, post::CreateOrUpdatePost}, + deletion::{delete::Delete, undo_delete::UndoDelete}, + following::{ + accept::AcceptFollowCommunity, + follow::FollowCommunity, + undo_follow::UndoFollowCommunity, + }, + private_message::{ + create_or_update::CreateOrUpdatePrivateMessage, + delete::DeletePrivateMessage, + undo_delete::UndoDeletePrivateMessage, + }, + voting::{undo_vote::UndoVote, vote::Vote}, }, - create_or_update::{comment::CreateOrUpdateComment, post::CreateOrUpdatePost}, - deletion::{delete::Delete, undo_delete::UndoDelete}, - following::{ - accept::AcceptFollowCommunity, - follow::FollowCommunity, - undo_follow::UndoFollowCommunity, - }, - private_message::{ - create_or_update::CreateOrUpdatePrivateMessage, - delete::DeletePrivateMessage, - undo_delete::UndoDeletePrivateMessage, - }, - voting::{undo_vote::UndoVote, vote::Vote}, + objects::{note::Note, page::Page}, }, }; use lemmy_apub_lib::traits::ActivityHandler; @@ -79,6 +82,10 @@ pub enum AnnouncableActivities { UndoBlockUserFromCommunity(UndoBlockUserFromCommunity), AddMod(AddMod), RemoveMod(RemoveMod), + // For compatibility with Pleroma/Mastodon (send only) + Page(Page), + // For compatibility with Pleroma/Mastodon (send only) + Note(Note), } #[async_trait::async_trait(?Send)] @@ -101,6 +108,8 @@ impl GetCommunity for AnnouncableActivities { UndoBlockUserFromCommunity(a) => a.get_community(context, request_counter).await?, AddMod(a) => a.get_community(context, request_counter).await?, RemoveMod(a) => a.get_community(context, request_counter).await?, + Page(_) => unimplemented!(), + Note(_) => unimplemented!(), }; Ok(community) } diff --git a/crates/apub/src/http/community.rs b/crates/apub/src/http/community.rs index 1854da98..330de0b8 100644 --- a/crates/apub/src/http/community.rs +++ b/crates/apub/src/http/community.rs @@ -79,7 +79,7 @@ pub(in crate::http) async fn receive_group_inbox( context: &LemmyContext, ) -> Result { let actor_id = ObjectId::new(activity_data.actor.clone()); - let res = receive_activity(request, activity.clone(), activity_data, context).await; + let res = receive_activity(request, activity.clone(), activity_data, context).await?; if let GroupInboxActivities::AnnouncableActivities(announcable) = activity { let community = announcable.get_community(context, &mut 0).await?; @@ -89,7 +89,7 @@ pub(in crate::http) async fn receive_group_inbox( } } - res + Ok(res) } /// Returns an empty followers collection, only populating the size (for privacy). diff --git a/crates/apub/src/http/mod.rs b/crates/apub/src/http/mod.rs index 0fd8de8b..d35815ab 100644 --- a/crates/apub/src/http/mod.rs +++ b/crates/apub/src/http/mod.rs @@ -109,14 +109,7 @@ where // Log the activity, so we avoid receiving and parsing it twice. Note that this could still happen // if we receive the same activity twice in very quick succession. - insert_activity( - &activity_data.id, - activity.clone(), - false, - true, - context.pool(), - ) - .await?; + insert_activity(&activity_data.id, &activity, false, true, context.pool()).await?; info!("Receiving activity {}", activity_data.id.to_string()); activity diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index 75d7a62f..74dfd952 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -179,7 +179,7 @@ pub async fn get_actor_id_from_name( /// persistent. async fn insert_activity( ap_id: &Url, - activity: T, + activity: &T, local: bool, sensitive: bool, pool: &DbPool, @@ -187,9 +187,10 @@ async fn insert_activity( where T: Serialize + std::fmt::Debug + Send + 'static, { + let data = serde_json::to_value(activity)?; let ap_id = ap_id.to_owned().into(); blocking(pool, move |conn| { - Activity::insert(conn, ap_id, &activity, local, sensitive) + Activity::insert(conn, ap_id, data, local, sensitive) }) .await??; Ok(()) diff --git a/crates/apub/src/protocol/objects/note.rs b/crates/apub/src/protocol/objects/note.rs index 6d582b34..784e6f9f 100644 --- a/crates/apub/src/protocol/objects/note.rs +++ b/crates/apub/src/protocol/objects/note.rs @@ -4,9 +4,15 @@ use crate::{ protocol::Source, }; use activitystreams::{object::kind::NoteType, unparsed::Unparsed}; +use anyhow::anyhow; use chrono::{DateTime, FixedOffset}; use lemmy_api_common::blocking; -use lemmy_apub_lib::{object_id::ObjectId, values::MediaTypeHtml}; +use lemmy_apub_lib::{ + data::Data, + object_id::ObjectId, + traits::ActivityHandler, + values::MediaTypeHtml, +}; use lemmy_db_schema::{newtypes::CommentId, source::post::Post, traits::Crud}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; @@ -81,3 +87,15 @@ impl Note { } } } + +// For Pleroma/Mastodon compat. Unimplemented because its only used for sending. +#[async_trait::async_trait(?Send)] +impl ActivityHandler for Note { + type DataType = LemmyContext; + async fn verify(&self, _: &Data, _: &mut i32) -> Result<(), LemmyError> { + Err(anyhow!("Announce/Page can only be sent, not received").into()) + } + async fn receive(self, _: &Data, _: &mut i32) -> Result<(), LemmyError> { + unimplemented!() + } +} diff --git a/crates/apub/src/protocol/objects/page.rs b/crates/apub/src/protocol/objects/page.rs index 89fb1141..dbf52eee 100644 --- a/crates/apub/src/protocol/objects/page.rs +++ b/crates/apub/src/protocol/objects/page.rs @@ -5,7 +5,12 @@ use crate::{ use activitystreams::{object::kind::PageType, unparsed::Unparsed}; use anyhow::anyhow; use chrono::{DateTime, FixedOffset}; -use lemmy_apub_lib::{object_id::ObjectId, values::MediaTypeHtml}; +use lemmy_apub_lib::{ + data::Data, + object_id::ObjectId, + traits::ActivityHandler, + values::MediaTypeHtml, +}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; use serde::{Deserialize, Serialize}; @@ -73,3 +78,15 @@ impl Page { } } } + +// For Pleroma/Mastodon compat. Unimplemented because its only used for sending. +#[async_trait::async_trait(?Send)] +impl ActivityHandler for Page { + type DataType = LemmyContext; + async fn verify(&self, _: &Data, _: &mut i32) -> Result<(), LemmyError> { + Err(anyhow!("Announce/Page can only be sent, not received").into()) + } + async fn receive(self, _: &Data, _: &mut i32) -> Result<(), LemmyError> { + unimplemented!() + } +} diff --git a/crates/apub_lib/src/activity_queue.rs b/crates/apub_lib/src/activity_queue.rs index 7efa54ef..fe28d870 100644 --- a/crates/apub_lib/src/activity_queue.rs +++ b/crates/apub_lib/src/activity_queue.rs @@ -1,4 +1,4 @@ -use crate::{signatures::sign_and_send, traits::ActorType, APUB_JSON_CONTENT_TYPE}; +use crate::{signatures::sign_and_send, traits::ActorType}; use anyhow::{anyhow, Context, Error}; use background_jobs::{ create_server, @@ -13,7 +13,7 @@ use lemmy_utils::{location_info, LemmyError}; use log::warn; use reqwest::Client; use serde::{Deserialize, Serialize}; -use std::{collections::BTreeMap, env, fmt::Debug, future::Future, pin::Pin}; +use std::{env, fmt::Debug, future::Future, pin::Pin}; use url::Url; pub async fn send_activity( @@ -64,11 +64,8 @@ impl ActixJob for SendActivityTask { } async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> { - let mut headers = BTreeMap::::new(); - headers.insert("Content-Type".into(), APUB_JSON_CONTENT_TYPE.to_string()); let result = sign_and_send( client, - headers, &task.inbox, task.activity.clone(), &task.actor_id, diff --git a/crates/apub_lib/src/signatures.rs b/crates/apub_lib/src/signatures.rs index ccc72081..3329b681 100644 --- a/crates/apub_lib/src/signatures.rs +++ b/crates/apub_lib/src/signatures.rs @@ -1,3 +1,5 @@ +use crate::APUB_JSON_CONTENT_TYPE; +use activitystreams::chrono::Utc; use actix_web::HttpRequest; use anyhow::anyhow; use http::{header::HeaderName, HeaderMap, HeaderValue}; @@ -13,19 +15,18 @@ use openssl::{ use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; -use std::{collections::BTreeMap, str::FromStr}; +use std::str::FromStr; use url::Url; lazy_static! { static ref CONFIG2: ConfigActix = ConfigActix::new(); - static ref HTTP_SIG_CONFIG: Config = Config::new(); + static ref HTTP_SIG_CONFIG: Config = Config::new().mastodon_compat(); } /// Creates an HTTP post request to `inbox_url`, with the given `client` and `headers`, and /// `activity` as request body. The request is signed with `private_key` and then sent. pub async fn sign_and_send( client: &Client, - headers: BTreeMap, inbox_url: &Url, activity: String, actor_id: &Url, @@ -33,16 +34,24 @@ pub async fn sign_and_send( ) -> Result { let signing_key_id = format!("{}#main-key", actor_id); - let mut header_map = HeaderMap::new(); - for h in headers { - header_map.insert( - HeaderName::from_str(h.0.as_str())?, - HeaderValue::from_str(h.1.as_str())?, - ); + let mut headers = HeaderMap::new(); + let mut host = inbox_url.domain().expect("read inbox domain").to_string(); + if let Some(port) = inbox_url.port() { + host = format!("{}:{}", host, port); } + headers.insert( + HeaderName::from_str("Content-Type")?, + HeaderValue::from_str(APUB_JSON_CONTENT_TYPE)?, + ); + headers.insert(HeaderName::from_str("Host")?, HeaderValue::from_str(&host)?); + headers.insert( + HeaderName::from_str("Date")?, + HeaderValue::from_str(&Utc::now().to_rfc2822())?, + ); + let response = client .post(&inbox_url.to_string()) - .headers(header_map) + .headers(headers) .signature_with_digest( HTTP_SIG_CONFIG.clone(), signing_key_id, diff --git a/crates/db_schema/src/impls/activity.rs b/crates/db_schema/src/impls/activity.rs index 5efb3b23..5ec5d49d 100644 --- a/crates/db_schema/src/impls/activity.rs +++ b/crates/db_schema/src/impls/activity.rs @@ -1,10 +1,7 @@ use crate::{newtypes::DbUrl, source::activity::*, traits::Crud}; use diesel::{dsl::*, result::Error, *}; -use serde::Serialize; -use std::{ - fmt::Debug, - io::{Error as IoError, ErrorKind}, -}; +use serde_json::Value; +use std::io::{Error as IoError, ErrorKind}; impl Crud for Activity { type Form = ActivityForm; @@ -38,19 +35,16 @@ impl Crud for Activity { } impl Activity { - pub fn insert( + pub fn insert( conn: &PgConnection, ap_id: DbUrl, - data: &T, + data: Value, local: bool, sensitive: bool, - ) -> Result - where - T: Serialize + Debug, - { + ) -> Result { let activity_form = ActivityForm { ap_id, - data: serde_json::to_value(&data)?, + data, local: Some(local), sensitive, updated: None,