From: Nutomic Date: Wed, 27 Oct 2021 16:03:07 +0000 (+0000) Subject: Rewrite collections to use new fetcher (#1861) X-Git-Url: http://these/git/%7BprofileRss%7D?a=commitdiff_plain;h=61189efe7206add1cdf3bcf8649736b0bbb5a757;p=lemmy.git Rewrite collections to use new fetcher (#1861) * Merge traits ToApub and FromApub into ApubObject * Rewrite community outbox to use new fetcher * Rewrite community moderators collection * Rewrite tombstone --- diff --git a/.cargo-husky/hooks/pre-commit b/.cargo-husky/hooks/pre-commit index 1c2858d4..4b50f2a5 100755 --- a/.cargo-husky/hooks/pre-commit +++ b/.cargo-husky/hooks/pre-commit @@ -1,7 +1,7 @@ #!/bin/bash set -e -cargo +nightly fmt -- --check +cargo +nightly fmt cargo +nightly clippy --workspace --tests --all-targets --all-features -- \ -D warnings -D deprecated -D clippy::perf -D clippy::complexity -D clippy::dbg_macro diff --git a/Cargo.lock b/Cargo.lock index bcb4585d..9931aef1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1833,6 +1833,7 @@ dependencies = [ "http", "http-signature-normalization-actix", "itertools", + "lazy_static", "lemmy_api_common", "lemmy_apub_lib", "lemmy_db_schema", diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index a3a59792..4b54c87d 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -50,6 +50,7 @@ thiserror = "1.0.29" background-jobs = "0.9.0" reqwest = { version = "0.11.4", features = ["json"] } html2md = "0.2.13" +lazy_static = "1.4.0" [dev-dependencies] serial_test = "0.5.1" diff --git a/crates/apub/src/activities/comment/create_or_update.rs b/crates/apub/src/activities/comment/create_or_update.rs index c4591c7c..0dcf3f7d 100644 --- a/crates/apub/src/activities/comment/create_or_update.rs +++ b/crates/apub/src/activities/comment/create_or_update.rs @@ -21,7 +21,7 @@ use activitystreams::{base::AnyBase, link::Mention, primitives::OneOrMany, unpar use lemmy_api_common::{blocking, check_post_deleted_or_removed}; use lemmy_apub_lib::{ data::Data, - traits::{ActivityFields, ActivityHandler, ActorType, FromApub, ToApub}, + traits::{ActivityFields, ActivityHandler, ActorType, ApubObject}, values::PublicUrl, verify::verify_domains_match, }; @@ -77,7 +77,7 @@ impl CreateOrUpdateComment { let create_or_update = CreateOrUpdateComment { actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], - object: comment.to_apub(context.pool()).await?, + object: comment.to_apub(context).await?, cc: maa.ccs, tag: maa.tags, kind, diff --git a/crates/apub/src/activities/community/update.rs b/crates/apub/src/activities/community/update.rs index 1120bd8b..f6d693dd 100644 --- a/crates/apub/src/activities/community/update.rs +++ b/crates/apub/src/activities/community/update.rs @@ -22,7 +22,7 @@ use activitystreams::{ use lemmy_api_common::blocking; use lemmy_apub_lib::{ data::Data, - traits::{ActivityFields, ActivityHandler, ActorType, ToApub}, + traits::{ActivityFields, ActivityHandler, ActorType, ApubObject}, values::PublicUrl, }; use lemmy_db_schema::{ @@ -66,7 +66,7 @@ impl UpdateCommunity { let update = UpdateCommunity { actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], - object: community.to_apub(context.pool()).await?, + object: community.to_apub(context).await?, cc: [ObjectId::new(community.actor_id())], kind: UpdateType::Update, id: id.clone(), diff --git a/crates/apub/src/activities/post/create_or_update.rs b/crates/apub/src/activities/post/create_or_update.rs index cc7ef5c9..4a048a96 100644 --- a/crates/apub/src/activities/post/create_or_update.rs +++ b/crates/apub/src/activities/post/create_or_update.rs @@ -21,7 +21,7 @@ use anyhow::anyhow; use lemmy_api_common::blocking; use lemmy_apub_lib::{ data::Data, - traits::{ActivityFields, ActivityHandler, ActorType, FromApub, ToApub}, + traits::{ActivityFields, ActivityHandler, ActorType, ApubObject}, values::PublicUrl, verify::{verify_domains_match, verify_urls_match}, }; @@ -48,34 +48,42 @@ pub struct CreateOrUpdatePost { } impl CreateOrUpdatePost { - pub async fn send( + pub(crate) async fn new( post: &ApubPost, actor: &ApubPerson, + community: &ApubCommunity, kind: CreateOrUpdateType, context: &LemmyContext, - ) -> Result<(), LemmyError> { - let community_id = post.community_id; - let community: ApubCommunity = blocking(context.pool(), move |conn| { - Community::read(conn, community_id) - }) - .await?? - .into(); - + ) -> Result { let id = generate_activity_id( kind.clone(), &context.settings().get_protocol_and_hostname(), )?; - let create_or_update = CreateOrUpdatePost { + Ok(CreateOrUpdatePost { actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], - object: post.to_apub(context.pool()).await?, + object: post.to_apub(context).await?, cc: [ObjectId::new(community.actor_id())], kind, id: id.clone(), context: lemmy_context(), unparsed: Default::default(), - }; - + }) + } + pub async fn send( + post: &ApubPost, + actor: &ApubPerson, + kind: CreateOrUpdateType, + context: &LemmyContext, + ) -> Result<(), LemmyError> { + let community_id = post.community_id; + let community: ApubCommunity = blocking(context.pool(), move |conn| { + Community::read(conn, community_id) + }) + .await?? + .into(); + let create_or_update = CreateOrUpdatePost::new(post, actor, &community, kind, context).await?; + let id = create_or_update.id.clone(); let activity = AnnouncableActivities::CreateOrUpdatePost(Box::new(create_or_update)); send_to_community(activity, &id, actor, &community, vec![], context).await } diff --git a/crates/apub/src/activities/private_message/create_or_update.rs b/crates/apub/src/activities/private_message/create_or_update.rs index a1ee5598..9ef22f0e 100644 --- a/crates/apub/src/activities/private_message/create_or_update.rs +++ b/crates/apub/src/activities/private_message/create_or_update.rs @@ -12,7 +12,7 @@ use activitystreams::{base::AnyBase, primitives::OneOrMany, unparsed::Unparsed}; use lemmy_api_common::blocking; use lemmy_apub_lib::{ data::Data, - traits::{ActivityFields, ActivityHandler, ActorType, FromApub, ToApub}, + traits::{ActivityFields, ActivityHandler, ActorType, ApubObject}, verify::verify_domains_match, }; use lemmy_db_schema::{source::person::Person, traits::Crud}; @@ -58,7 +58,7 @@ impl CreateOrUpdatePrivateMessage { id: id.clone(), actor: ObjectId::new(actor.actor_id()), to: ObjectId::new(recipient.actor_id()), - object: private_message.to_apub(context.pool()).await?, + object: private_message.to_apub(context).await?, kind, unparsed: Default::default(), }; diff --git a/crates/apub/src/collections/community_moderators.rs b/crates/apub/src/collections/community_moderators.rs new file mode 100644 index 00000000..dc1d5798 --- /dev/null +++ b/crates/apub/src/collections/community_moderators.rs @@ -0,0 +1,141 @@ +use crate::{ + collections::CommunityContext, + context::lemmy_context, + fetcher::object_id::ObjectId, + generate_moderators_url, + objects::person::ApubPerson, +}; +use activitystreams::{ + base::AnyBase, + chrono::NaiveDateTime, + collection::kind::OrderedCollectionType, + primitives::OneOrMany, + url::Url, +}; +use lemmy_api_common::blocking; +use lemmy_apub_lib::{traits::ApubObject, verify::verify_domains_match}; +use lemmy_db_schema::{ + source::community::{CommunityModerator, CommunityModeratorForm}, + traits::Joinable, +}; +use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView; +use lemmy_utils::LemmyError; +use serde::{Deserialize, Serialize}; +use serde_with::skip_serializing_none; + +#[skip_serializing_none] +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GroupModerators { + #[serde(rename = "@context")] + context: OneOrMany, + r#type: OrderedCollectionType, + id: Url, + ordered_items: Vec>, +} + +#[derive(Clone, Debug)] +pub(crate) struct ApubCommunityModerators(pub(crate) Vec); + +#[async_trait::async_trait(?Send)] +impl ApubObject for ApubCommunityModerators { + type DataType = CommunityContext; + type TombstoneType = (); + type ApubType = GroupModerators; + + fn last_refreshed_at(&self) -> Option { + None + } + + async fn read_from_apub_id( + _object_id: Url, + data: &Self::DataType, + ) -> Result, LemmyError> { + // Only read from database if its a local community, otherwise fetch over http + if data.0.local { + let cid = data.0.id; + let moderators = blocking(data.1.pool(), move |conn| { + CommunityModeratorView::for_community(conn, cid) + }) + .await??; + Ok(Some(ApubCommunityModerators { 0: moderators })) + } else { + Ok(None) + } + } + + async fn delete(self, _data: &Self::DataType) -> Result<(), LemmyError> { + unimplemented!() + } + + async fn to_apub(&self, data: &Self::DataType) -> Result { + let ordered_items = self + .0 + .iter() + .map(|m| ObjectId::::new(m.moderator.actor_id.clone().into_inner())) + .collect(); + Ok(GroupModerators { + context: lemmy_context(), + r#type: OrderedCollectionType::OrderedCollection, + id: generate_moderators_url(&data.0.actor_id)?.into(), + ordered_items, + }) + } + + fn to_tombstone(&self) -> Result { + unimplemented!() + } + + async fn from_apub( + apub: &Self::ApubType, + data: &Self::DataType, + expected_domain: &Url, + request_counter: &mut i32, + ) -> Result { + verify_domains_match(expected_domain, &apub.id)?; + let community_id = data.0.id; + let current_moderators = blocking(data.1.pool(), move |conn| { + CommunityModeratorView::for_community(conn, community_id) + }) + .await??; + // Remove old mods from database which arent in the moderators collection anymore + for mod_user in ¤t_moderators { + let mod_id = ObjectId::new(mod_user.moderator.actor_id.clone().into_inner()); + if !apub.ordered_items.contains(&mod_id) { + let community_moderator_form = CommunityModeratorForm { + community_id: mod_user.community.id, + person_id: mod_user.moderator.id, + }; + blocking(data.1.pool(), move |conn| { + CommunityModerator::leave(conn, &community_moderator_form) + }) + .await??; + } + } + + // Add new mods to database which have been added to moderators collection + for mod_id in &apub.ordered_items { + let mod_id = ObjectId::new(mod_id.clone()); + let mod_user: ApubPerson = mod_id.dereference(&data.1, request_counter).await?; + + if !current_moderators + .clone() + .iter() + .map(|c| c.moderator.actor_id.clone()) + .any(|x| x == mod_user.actor_id) + { + let community_moderator_form = CommunityModeratorForm { + community_id: data.0.id, + person_id: mod_user.id, + }; + blocking(data.1.pool(), move |conn| { + CommunityModerator::join(conn, &community_moderator_form) + }) + .await??; + } + } + + // This return value is unused, so just set an empty vec + Ok(ApubCommunityModerators { 0: vec![] }) + } +} diff --git a/crates/apub/src/collections/community_outbox.rs b/crates/apub/src/collections/community_outbox.rs new file mode 100644 index 00000000..24465c95 --- /dev/null +++ b/crates/apub/src/collections/community_outbox.rs @@ -0,0 +1,128 @@ +use crate::{ + activities::{post::create_or_update::CreateOrUpdatePost, CreateOrUpdateType}, + collections::CommunityContext, + context::lemmy_context, + generate_outbox_url, + objects::{person::ApubPerson, post::ApubPost}, +}; +use activitystreams::{ + base::AnyBase, + chrono::NaiveDateTime, + collection::kind::OrderedCollectionType, + primitives::OneOrMany, + url::Url, +}; +use lemmy_api_common::blocking; +use lemmy_apub_lib::{ + data::Data, + traits::{ActivityHandler, ApubObject}, + verify::verify_domains_match, +}; +use lemmy_db_schema::{ + source::{person::Person, post::Post}, + traits::Crud, +}; +use lemmy_utils::LemmyError; +use serde::{Deserialize, Serialize}; +use serde_with::skip_serializing_none; + +#[skip_serializing_none] +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GroupOutbox { + #[serde(rename = "@context")] + context: OneOrMany, + r#type: OrderedCollectionType, + id: Url, + ordered_items: Vec, +} + +#[derive(Clone, Debug)] +pub(crate) struct ApubCommunityOutbox(Vec); + +#[async_trait::async_trait(?Send)] +impl ApubObject for ApubCommunityOutbox { + type DataType = CommunityContext; + type TombstoneType = (); + type ApubType = GroupOutbox; + + fn last_refreshed_at(&self) -> Option { + None + } + + async fn read_from_apub_id( + _object_id: Url, + data: &Self::DataType, + ) -> Result, LemmyError> { + // Only read from database if its a local community, otherwise fetch over http + if data.0.local { + let community_id = data.0.id; + let post_list: Vec = blocking(data.1.pool(), move |conn| { + Post::list_for_community(conn, community_id) + }) + .await?? + .into_iter() + .map(Into::into) + .collect(); + Ok(Some(ApubCommunityOutbox(post_list))) + } else { + Ok(None) + } + } + + async fn delete(self, _data: &Self::DataType) -> Result<(), LemmyError> { + // do nothing (it gets deleted automatically with the community) + Ok(()) + } + + async fn to_apub(&self, data: &Self::DataType) -> Result { + let mut ordered_items = vec![]; + for post in &self.0 { + let actor = post.creator_id; + let actor: ApubPerson = blocking(data.1.pool(), move |conn| Person::read(conn, actor)) + .await?? + .into(); + let a = + CreateOrUpdatePost::new(post, &actor, &data.0, CreateOrUpdateType::Create, &data.1).await?; + ordered_items.push(a); + } + + Ok(GroupOutbox { + context: lemmy_context(), + r#type: OrderedCollectionType::OrderedCollection, + id: generate_outbox_url(&data.0.actor_id)?.into(), + ordered_items, + }) + } + + fn to_tombstone(&self) -> Result { + // no tombstone for this, there is only a tombstone for the community + unimplemented!() + } + + async fn from_apub( + apub: &Self::ApubType, + data: &Self::DataType, + expected_domain: &Url, + request_counter: &mut i32, + ) -> Result { + verify_domains_match(expected_domain, &apub.id)?; + let mut outbox_activities = apub.ordered_items.clone(); + if outbox_activities.len() > 20 { + outbox_activities = outbox_activities[0..20].to_vec(); + } + + // We intentionally ignore errors here. This is because the outbox might contain posts from old + // Lemmy versions, or from other software which we cant parse. In that case, we simply skip the + // item and only parse the ones that work. + for activity in outbox_activities { + activity + .receive(&Data::new(data.1.clone()), request_counter) + .await + .ok(); + } + + // This return value is unused, so just set an empty vec + Ok(ApubCommunityOutbox { 0: vec![] }) + } +} diff --git a/crates/apub/src/collections/mod.rs b/crates/apub/src/collections/mod.rs new file mode 100644 index 00000000..948824e2 --- /dev/null +++ b/crates/apub/src/collections/mod.rs @@ -0,0 +1,7 @@ +use crate::objects::community::ApubCommunity; +use lemmy_websocket::LemmyContext; +pub(crate) mod community_moderators; +pub(crate) mod community_outbox; + +/// Put community in the data, so we dont have to read it again from the database. +pub(crate) struct CommunityContext(pub ApubCommunity, pub LemmyContext); diff --git a/crates/apub/src/fetcher/community.rs b/crates/apub/src/fetcher/community.rs deleted file mode 100644 index 6044a55b..00000000 --- a/crates/apub/src/fetcher/community.rs +++ /dev/null @@ -1,144 +0,0 @@ -use crate::{ - activities::community::announce::AnnounceActivity, - fetcher::{fetch::fetch_remote_object, object_id::ObjectId}, - objects::{community::Group, person::ApubPerson}, -}; -use activitystreams::{ - base::AnyBase, - collection::{CollectionExt, OrderedCollection}, -}; -use anyhow::Context; -use lemmy_api_common::blocking; -use lemmy_apub_lib::{data::Data, traits::ActivityHandler}; -use lemmy_db_schema::{ - source::community::{Community, CommunityModerator, CommunityModeratorForm}, - traits::Joinable, -}; -use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView; -use lemmy_utils::{location_info, LemmyError}; -use lemmy_websocket::LemmyContext; -use url::Url; - -pub(crate) async fn update_community_mods( - group: &Group, - community: &Community, - context: &LemmyContext, - request_counter: &mut i32, -) -> Result<(), LemmyError> { - let new_moderators = fetch_community_mods(context, group, request_counter).await?; - let community_id = community.id; - let current_moderators = blocking(context.pool(), move |conn| { - CommunityModeratorView::for_community(conn, community_id) - }) - .await??; - // Remove old mods from database which arent in the moderators collection anymore - for mod_user in ¤t_moderators { - if !new_moderators.contains(&mod_user.moderator.actor_id.clone().into()) { - let community_moderator_form = CommunityModeratorForm { - community_id: mod_user.community.id, - person_id: mod_user.moderator.id, - }; - blocking(context.pool(), move |conn| { - CommunityModerator::leave(conn, &community_moderator_form) - }) - .await??; - } - } - - // Add new mods to database which have been added to moderators collection - for mod_id in new_moderators { - let mod_id = ObjectId::new(mod_id); - let mod_user: ApubPerson = mod_id.dereference(context, request_counter).await?; - - if !current_moderators - .clone() - .iter() - .map(|c| c.moderator.actor_id.clone()) - .any(|x| x == mod_user.actor_id) - { - let community_moderator_form = CommunityModeratorForm { - community_id: community.id, - person_id: mod_user.id, - }; - blocking(context.pool(), move |conn| { - CommunityModerator::join(conn, &community_moderator_form) - }) - .await??; - } - } - - Ok(()) -} - -pub(crate) async fn fetch_community_outbox( - context: &LemmyContext, - outbox: &Url, - recursion_counter: &mut i32, -) -> Result<(), LemmyError> { - let outbox = fetch_remote_object::( - context.client(), - &context.settings(), - outbox, - recursion_counter, - ) - .await?; - let outbox_activities = outbox.items().context(location_info!())?.clone(); - let mut outbox_activities = outbox_activities.many().context(location_info!())?; - if outbox_activities.len() > 20 { - outbox_activities = outbox_activities[0..20].to_vec(); - } - - // We intentionally ignore errors here. This is because the outbox might contain posts from old - // Lemmy versions, or from other software which we cant parse. In that case, we simply skip the - // item and only parse the ones that work. - for activity in outbox_activities { - parse_outbox_item(activity, context, recursion_counter) - .await - .ok(); - } - - Ok(()) -} - -async fn parse_outbox_item( - announce: AnyBase, - context: &LemmyContext, - request_counter: &mut i32, -) -> Result<(), LemmyError> { - // TODO: instead of converting like this, we should create a struct CommunityOutbox with - // AnnounceActivity as inner type, but that gives me stackoverflow - let ser = serde_json::to_string(&announce)?; - let announce: AnnounceActivity = serde_json::from_str(&ser)?; - announce - .receive(&Data::new(context.clone()), request_counter) - .await?; - Ok(()) -} - -async fn fetch_community_mods( - context: &LemmyContext, - group: &Group, - recursion_counter: &mut i32, -) -> Result, LemmyError> { - if let Some(mods_url) = &group.moderators { - let mods = fetch_remote_object::( - context.client(), - &context.settings(), - mods_url, - recursion_counter, - ) - .await?; - let mods = mods - .items() - .map(|i| i.as_many()) - .flatten() - .context(location_info!())? - .iter() - .filter_map(|i| i.as_xsd_any_uri()) - .map(|u| u.to_owned()) - .collect(); - Ok(mods) - } else { - Ok(vec![]) - } -} diff --git a/crates/apub/src/fetcher/fetch.rs b/crates/apub/src/fetcher/fetch.rs deleted file mode 100644 index 95b2f55f..00000000 --- a/crates/apub/src/fetcher/fetch.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::check_is_apub_id_valid; -use anyhow::anyhow; -use lemmy_apub_lib::APUB_JSON_CONTENT_TYPE; -use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError}; -use log::info; -use reqwest::Client; -use serde::Deserialize; -use std::time::Duration; -use url::Url; - -/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object -/// fetch through the search). -/// -/// A community fetch will load the outbox with up to 20 items, and fetch the creator for each item. -/// So we are looking at a maximum of 22 requests (rounded up just to be safe). -static MAX_REQUEST_NUMBER: i32 = 25; - -/// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation, -/// timeouts etc. -pub(in crate::fetcher) async fn fetch_remote_object( - client: &Client, - settings: &Settings, - url: &Url, - recursion_counter: &mut i32, -) -> Result -where - Response: for<'de> Deserialize<'de> + std::fmt::Debug, -{ - *recursion_counter += 1; - if *recursion_counter > MAX_REQUEST_NUMBER { - return Err(anyhow!("Maximum recursion depth reached").into()); - } - check_is_apub_id_valid(url, false, settings)?; - - let timeout = Duration::from_secs(60); - - let res = retry(|| { - client - .get(url.as_str()) - .header("Accept", APUB_JSON_CONTENT_TYPE) - .timeout(timeout) - .send() - }) - .await?; - - let object = res.json().await?; - info!("Fetched remote object {}", url); - Ok(object) -} diff --git a/crates/apub/src/fetcher/mod.rs b/crates/apub/src/fetcher/mod.rs index fd6b982c..db39d228 100644 --- a/crates/apub/src/fetcher/mod.rs +++ b/crates/apub/src/fetcher/mod.rs @@ -1,5 +1,3 @@ -pub mod community; -mod fetch; pub mod object_id; pub mod post_or_comment; pub mod search; @@ -47,7 +45,7 @@ pub(crate) async fn get_or_fetch_and_upsert_actor( /// /// TODO it won't pick up new avatars, summaries etc until a day after. /// Actors need an "update" activity pushed to other servers to fix this. -fn should_refetch_actor(last_refreshed: NaiveDateTime) -> bool { +fn should_refetch_object(last_refreshed: NaiveDateTime) -> bool { let update_interval = if cfg!(debug_assertions) { // avoid infinite loop when fetching community outbox chrono::Duration::seconds(ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG) diff --git a/crates/apub/src/fetcher/object_id.rs b/crates/apub/src/fetcher/object_id.rs index 38eb14cc..66466362 100644 --- a/crates/apub/src/fetcher/object_id.rs +++ b/crates/apub/src/fetcher/object_id.rs @@ -1,14 +1,15 @@ -use crate::fetcher::should_refetch_actor; +use crate::fetcher::should_refetch_object; use anyhow::anyhow; use diesel::NotFound; -use lemmy_apub_lib::{ - traits::{ApubObject, FromApub}, - APUB_JSON_CONTENT_TYPE, -}; +use lemmy_apub_lib::{traits::ApubObject, APUB_JSON_CONTENT_TYPE}; use lemmy_db_schema::newtypes::DbUrl; -use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError}; -use lemmy_websocket::LemmyContext; -use reqwest::StatusCode; +use lemmy_utils::{ + request::{build_user_agent, retry}, + settings::structs::Settings, + LemmyError, +}; +use log::info; +use reqwest::{Client, StatusCode}; use serde::{Deserialize, Serialize}; use std::{ fmt::{Debug, Display, Formatter}, @@ -21,17 +22,25 @@ use url::Url; /// fetch through the search). This should be configurable. static REQUEST_LIMIT: i32 = 25; +// TODO: after moving this file to library, remove lazy_static dependency from apub crate +lazy_static! { + static ref CLIENT: Client = Client::builder() + .user_agent(build_user_agent(&Settings::get())) + .build() + .unwrap(); +} + #[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] #[serde(transparent)] pub struct ObjectId(Url, #[serde(skip)] PhantomData) where - Kind: FromApub + ApubObject + Send + 'static, - for<'de2> ::ApubType: serde::Deserialize<'de2>; + Kind: ApubObject + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>; impl ObjectId where - Kind: FromApub + ApubObject + Send + 'static, - for<'de> ::ApubType: serde::Deserialize<'de>, + Kind: ApubObject + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>, { pub fn new(url: T) -> Self where @@ -47,10 +56,10 @@ where /// Fetches an activitypub object, either from local database (if possible), or over http. pub async fn dereference( &self, - context: &LemmyContext, + data: &::DataType, request_counter: &mut i32, ) -> Result { - let db_object = self.dereference_from_db(context).await?; + let db_object = self.dereference_from_db(data).await?; // if its a local object, only fetch it from the database and not over http if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) { @@ -60,44 +69,54 @@ where }; } + // object found in database if let Some(object) = db_object { + // object is old and should be refetched if let Some(last_refreshed_at) = object.last_refreshed_at() { - // TODO: rename to should_refetch_object() - if should_refetch_actor(last_refreshed_at) { + if should_refetch_object(last_refreshed_at) { return self - .dereference_from_http(context, request_counter, Some(object)) + .dereference_from_http(data, request_counter, Some(object)) .await; } } Ok(object) - } else { + } + // object not found, need to fetch over http + else { self - .dereference_from_http(context, request_counter, None) + .dereference_from_http(data, request_counter, None) .await } } /// Fetch an object from the local db. Instead of falling back to http, this throws an error if /// the object is not found in the database. - pub async fn dereference_local(&self, context: &LemmyContext) -> Result { - let object = self.dereference_from_db(context).await?; + pub async fn dereference_local( + &self, + data: &::DataType, + ) -> Result { + let object = self.dereference_from_db(data).await?; object.ok_or_else(|| anyhow!("object not found in database {}", self).into()) } /// returning none means the object was not found in local db - async fn dereference_from_db(&self, context: &LemmyContext) -> Result, LemmyError> { + async fn dereference_from_db( + &self, + data: &::DataType, + ) -> Result, LemmyError> { let id = self.0.clone(); - ApubObject::read_from_apub_id(id, context).await + ApubObject::read_from_apub_id(id, data).await } async fn dereference_from_http( &self, - context: &LemmyContext, + data: &::DataType, request_counter: &mut i32, db_object: Option, ) -> Result { // dont fetch local objects this way debug_assert!(self.0.domain() != Some(&Settings::get().hostname)); + info!("Fetching remote object {}", self.to_string()); *request_counter += 1; if *request_counter > REQUEST_LIMIT { @@ -105,8 +124,7 @@ where } let res = retry(|| { - context - .client() + CLIENT .get(self.0.as_str()) .header("Accept", APUB_JSON_CONTENT_TYPE) .timeout(Duration::from_secs(60)) @@ -116,21 +134,21 @@ where if res.status() == StatusCode::GONE { if let Some(db_object) = db_object { - db_object.delete(context).await?; + db_object.delete(data).await?; } return Err(anyhow!("Fetched remote object {} which was deleted", self).into()); } let res2: Kind::ApubType = res.json().await?; - Ok(Kind::from_apub(&res2, context, self.inner(), request_counter).await?) + Ok(Kind::from_apub(&res2, data, self.inner(), request_counter).await?) } } impl Display for ObjectId where - Kind: FromApub + ApubObject + Send + 'static, - for<'de> ::ApubType: serde::Deserialize<'de>, + Kind: ApubObject + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0.to_string()) @@ -139,8 +157,8 @@ where impl From> for Url where - Kind: FromApub + ApubObject + Send + 'static, - for<'de> ::ApubType: serde::Deserialize<'de>, + Kind: ApubObject + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn from(id: ObjectId) -> Self { id.0 @@ -149,8 +167,8 @@ where impl From> for DbUrl where - Kind: FromApub + ApubObject + Send + 'static, - for<'de> ::ApubType: serde::Deserialize<'de>, + Kind: ApubObject + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>, { fn from(id: ObjectId) -> Self { id.0.into() diff --git a/crates/apub/src/fetcher/post_or_comment.rs b/crates/apub/src/fetcher/post_or_comment.rs index fd78ad79..ff0562a6 100644 --- a/crates/apub/src/fetcher/post_or_comment.rs +++ b/crates/apub/src/fetcher/post_or_comment.rs @@ -3,7 +3,7 @@ use crate::objects::{ post::{ApubPost, Page}, }; use activitystreams::chrono::NaiveDateTime; -use lemmy_apub_lib::traits::{ApubObject, FromApub}; +use lemmy_apub_lib::traits::ApubObject; use lemmy_db_schema::source::{comment::CommentForm, post::PostForm}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; @@ -31,6 +31,8 @@ pub enum PageOrNote { #[async_trait::async_trait(?Send)] impl ApubObject for PostOrComment { type DataType = LemmyContext; + type ApubType = PageOrNote; + type TombstoneType = (); fn last_refreshed_at(&self) -> Option { None @@ -59,12 +61,14 @@ impl ApubObject for PostOrComment { PostOrComment::Comment(c) => c.delete(data).await, } } -} -#[async_trait::async_trait(?Send)] -impl FromApub for PostOrComment { - type ApubType = PageOrNote; - type DataType = LemmyContext; + async fn to_apub(&self, _data: &Self::DataType) -> Result { + unimplemented!() + } + + fn to_tombstone(&self) -> Result { + unimplemented!() + } async fn from_apub( apub: &PageOrNote, diff --git a/crates/apub/src/fetcher/search.rs b/crates/apub/src/fetcher/search.rs index c67eaa0f..86cdcbe0 100644 --- a/crates/apub/src/fetcher/search.rs +++ b/crates/apub/src/fetcher/search.rs @@ -12,7 +12,7 @@ use anyhow::anyhow; use itertools::Itertools; use lemmy_api_common::blocking; use lemmy_apub_lib::{ - traits::{ApubObject, FromApub}, + traits::ApubObject, webfinger::{webfinger_resolve_actor, WebfingerType}, }; use lemmy_db_schema::{ @@ -110,6 +110,8 @@ pub enum SearchableApubTypes { #[async_trait::async_trait(?Send)] impl ApubObject for SearchableObjects { type DataType = LemmyContext; + type ApubType = SearchableApubTypes; + type TombstoneType = (); fn last_refreshed_at(&self) -> Option { match self { @@ -156,12 +158,14 @@ impl ApubObject for SearchableObjects { SearchableObjects::Comment(c) => c.delete(data).await, } } -} -#[async_trait::async_trait(?Send)] -impl FromApub for SearchableObjects { - type ApubType = SearchableApubTypes; - type DataType = LemmyContext; + async fn to_apub(&self, _data: &Self::DataType) -> Result { + unimplemented!() + } + + fn to_tombstone(&self) -> Result { + unimplemented!() + } async fn from_apub( apub: &Self::ApubType, diff --git a/crates/apub/src/http/comment.rs b/crates/apub/src/http/comment.rs index 58543a5b..3086f2fd 100644 --- a/crates/apub/src/http/comment.rs +++ b/crates/apub/src/http/comment.rs @@ -5,7 +5,7 @@ use crate::{ use actix_web::{body::Body, web, web::Path, HttpResponse}; use diesel::result::Error::NotFound; use lemmy_api_common::blocking; -use lemmy_apub_lib::traits::ToApub; +use lemmy_apub_lib::traits::ApubObject; use lemmy_db_schema::{newtypes::CommentId, source::comment::Comment, traits::Crud}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; @@ -30,9 +30,7 @@ pub(crate) async fn get_apub_comment( } if !comment.deleted { - Ok(create_apub_response( - &comment.to_apub(context.pool()).await?, - )) + Ok(create_apub_response(&comment.to_apub(&**context).await?)) } else { Ok(create_apub_tombstone_response(&comment.to_tombstone()?)) } diff --git a/crates/apub/src/http/community.rs b/crates/apub/src/http/community.rs index 7c66ca9d..1e10e384 100644 --- a/crates/apub/src/http/community.rs +++ b/crates/apub/src/http/community.rs @@ -5,8 +5,13 @@ use crate::{ following::{follow::FollowCommunity, undo::UndoFollowCommunity}, report::Report, }, + collections::{ + community_moderators::ApubCommunityModerators, + community_outbox::ApubCommunityOutbox, + CommunityContext, + }, context::lemmy_context, - generate_moderators_url, + fetcher::object_id::ObjectId, generate_outbox_url, http::{ create_apub_response, @@ -17,18 +22,14 @@ use crate::{ objects::community::ApubCommunity, }; use activitystreams::{ - base::{AnyBase, BaseExt}, - collection::{CollectionExt, OrderedCollection, UnorderedCollection}, - url::Url, + base::BaseExt, + collection::{CollectionExt, UnorderedCollection}, }; use actix_web::{body::Body, web, web::Payload, HttpRequest, HttpResponse}; use lemmy_api_common::blocking; -use lemmy_apub_lib::traits::{ActivityFields, ActivityHandler, ToApub}; -use lemmy_db_schema::source::{activity::Activity, community::Community}; -use lemmy_db_views_actor::{ - community_follower_view::CommunityFollowerView, - community_moderator_view::CommunityModeratorView, -}; +use lemmy_apub_lib::traits::{ActivityFields, ActivityHandler, ApubObject}; +use lemmy_db_schema::source::community::Community; +use lemmy_db_views_actor::community_follower_view::CommunityFollowerView; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; use log::trace; @@ -51,7 +52,7 @@ pub(crate) async fn get_apub_community_http( .into(); if !community.deleted { - let apub = community.to_apub(context.pool()).await?; + let apub = community.to_apub(&**context).await?; Ok(create_apub_response(&apub)) } else { @@ -133,41 +134,10 @@ pub(crate) async fn get_apub_community_outbox( Community::read_from_name(conn, &info.community_name) }) .await??; - - let community_actor_id = community.actor_id.to_owned(); - let activities = blocking(context.pool(), move |conn| { - Activity::read_community_outbox(conn, &community_actor_id) - }) - .await??; - - let activities = activities - .iter() - .map(AnyBase::from_arbitrary_json) - .collect::, serde_json::Error>>()?; - let len = activities.len(); - let mut collection = OrderedCollection::new(); - collection - .set_many_items(activities) - .set_many_contexts(lemmy_context()) - .set_id(generate_outbox_url(&community.actor_id)?.into()) - .set_total_items(len as u64); - Ok(create_apub_response(&collection)) -} - -pub(crate) async fn get_apub_community_inbox( - info: web::Path, - context: web::Data, -) -> Result, LemmyError> { - let community = blocking(context.pool(), move |conn| { - Community::read_from_name(conn, &info.community_name) - }) - .await??; - - let mut collection = OrderedCollection::new(); - collection - .set_id(community.inbox_url.into()) - .set_many_contexts(lemmy_context()); - Ok(create_apub_response(&collection)) + let id = ObjectId::new(generate_outbox_url(&community.actor_id)?.into_inner()); + let outbox_data = CommunityContext(community.into(), context.get_ref().clone()); + let outbox: ApubCommunityOutbox = id.dereference(&outbox_data, &mut 0).await?; + Ok(create_apub_response(&outbox.to_apub(&outbox_data).await?)) } pub(crate) async fn get_apub_community_moderators( @@ -179,26 +149,10 @@ pub(crate) async fn get_apub_community_moderators( }) .await?? .into(); - - // The attributed to, is an ordered vector with the creator actor_ids first, - // then the rest of the moderators - // TODO Technically the instance admins can mod the community, but lets - // ignore that for now - let cid = community.id; - let moderators = blocking(context.pool(), move |conn| { - CommunityModeratorView::for_community(conn, cid) - }) - .await??; - - let moderators: Vec = moderators - .into_iter() - .map(|m| m.moderator.actor_id.into()) - .collect(); - let mut collection = OrderedCollection::new(); - collection - .set_id(generate_moderators_url(&community.actor_id)?.into()) - .set_total_items(moderators.len() as u64) - .set_many_items(moderators) - .set_many_contexts(lemmy_context()); - Ok(create_apub_response(&collection)) + let id = ObjectId::new(generate_outbox_url(&community.actor_id)?.into_inner()); + let outbox_data = CommunityContext(community, context.get_ref().clone()); + let moderators: ApubCommunityModerators = id.dereference(&outbox_data, &mut 0).await?; + Ok(create_apub_response( + &moderators.to_apub(&outbox_data).await?, + )) } diff --git a/crates/apub/src/http/person.rs b/crates/apub/src/http/person.rs index 16f1bc5c..a7e94020 100644 --- a/crates/apub/src/http/person.rs +++ b/crates/apub/src/http/person.rs @@ -24,7 +24,7 @@ use activitystreams::{ }; use actix_web::{body::Body, web, web::Payload, HttpRequest, HttpResponse}; use lemmy_api_common::blocking; -use lemmy_apub_lib::traits::{ActivityFields, ActivityHandler, ToApub}; +use lemmy_apub_lib::traits::{ActivityFields, ActivityHandler, ApubObject}; use lemmy_db_schema::source::person::Person; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; @@ -51,7 +51,7 @@ pub(crate) async fn get_apub_person_http( .into(); if !person.deleted { - let apub = person.to_apub(context.pool()).await?; + let apub = person.to_apub(&context).await?; Ok(create_apub_response(&apub)) } else { @@ -109,19 +109,3 @@ pub(crate) async fn get_apub_person_outbox( .set_total_items(0_u64); Ok(create_apub_response(&collection)) } - -pub(crate) async fn get_apub_person_inbox( - info: web::Path, - context: web::Data, -) -> Result, LemmyError> { - let person = blocking(context.pool(), move |conn| { - Person::find_by_name(conn, &info.user_name) - }) - .await??; - - let mut collection = OrderedCollection::new(); - collection - .set_id(person.inbox_url.into()) - .set_many_contexts(lemmy_context()); - Ok(create_apub_response(&collection)) -} diff --git a/crates/apub/src/http/post.rs b/crates/apub/src/http/post.rs index 2ce712bc..0459942a 100644 --- a/crates/apub/src/http/post.rs +++ b/crates/apub/src/http/post.rs @@ -5,7 +5,7 @@ use crate::{ use actix_web::{body::Body, web, HttpResponse}; use diesel::result::Error::NotFound; use lemmy_api_common::blocking; -use lemmy_apub_lib::traits::ToApub; +use lemmy_apub_lib::traits::ApubObject; use lemmy_db_schema::{newtypes::PostId, source::post::Post, traits::Crud}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; @@ -30,7 +30,7 @@ pub(crate) async fn get_apub_post( } if !post.deleted { - Ok(create_apub_response(&post.to_apub(context.pool()).await?)) + Ok(create_apub_response(&post.to_apub(&context).await?)) } else { Ok(create_apub_tombstone_response(&post.to_tombstone()?)) } diff --git a/crates/apub/src/http/routes.rs b/crates/apub/src/http/routes.rs index 7adb0ca9..5dfbc238 100644 --- a/crates/apub/src/http/routes.rs +++ b/crates/apub/src/http/routes.rs @@ -4,12 +4,11 @@ use crate::http::{ community_inbox, get_apub_community_followers, get_apub_community_http, - get_apub_community_inbox, get_apub_community_moderators, get_apub_community_outbox, }, get_activity, - person::{get_apub_person_http, get_apub_person_inbox, get_apub_person_outbox, person_inbox}, + person::{get_apub_person_http, get_apub_person_outbox, person_inbox}, post::get_apub_post, shared_inbox, }; @@ -49,10 +48,6 @@ pub fn config(cfg: &mut web::ServiceConfig, settings: &Settings) { "/c/{community_name}/outbox", web::get().to(get_apub_community_outbox), ) - .route( - "/c/{community_name}/inbox", - web::get().to(get_apub_community_inbox), - ) .route( "/c/{community_name}/moderators", web::get().to(get_apub_community_moderators), @@ -62,7 +57,6 @@ pub fn config(cfg: &mut web::ServiceConfig, settings: &Settings) { "/u/{user_name}/outbox", web::get().to(get_apub_person_outbox), ) - .route("/u/{user_name}/inbox", web::get().to(get_apub_person_inbox)) .route("/post/{post_id}", web::get().to(get_apub_post)) .route("/comment/{comment_id}", web::get().to(get_apub_comment)) .route("/activities/{type_}/{id}", web::get().to(get_activity)), diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index cc9d5f48..675798b6 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -1,10 +1,14 @@ pub mod activities; +pub(crate) mod collections; mod context; pub mod fetcher; pub mod http; pub mod migrations; pub mod objects; +#[macro_use] +extern crate lazy_static; + use crate::fetcher::post_or_comment::PostOrComment; use anyhow::{anyhow, Context}; use lemmy_api_common::blocking; diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index 5fe4fa69..b79f53e0 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -2,13 +2,13 @@ use crate::{ activities::{verify_is_public, verify_person_in_community}, context::lemmy_context, fetcher::object_id::ObjectId, - objects::{create_tombstone, person::ApubPerson, post::ApubPost, Source}, + objects::{person::ApubPerson, post::ApubPost, tombstone::Tombstone, Source}, PostOrComment, }; use activitystreams::{ base::AnyBase, chrono::NaiveDateTime, - object::{kind::NoteType, Tombstone}, + object::kind::NoteType, primitives::OneOrMany, public, unparsed::Unparsed, @@ -18,7 +18,7 @@ use chrono::{DateTime, FixedOffset}; use html2md::parse_html; use lemmy_api_common::blocking; use lemmy_apub_lib::{ - traits::{ApubObject, FromApub, ToApub}, + traits::ApubObject, values::{MediaTypeHtml, MediaTypeMarkdown}, verify::verify_domains_match, }; @@ -31,7 +31,6 @@ use lemmy_db_schema::{ post::Post, }, traits::Crud, - DbPool, }; use lemmy_utils::{ utils::{convert_datetime, remove_slurs}, @@ -159,6 +158,8 @@ impl From for ApubComment { #[async_trait::async_trait(?Send)] impl ApubObject for ApubComment { type DataType = LemmyContext; + type ApubType = Note; + type TombstoneType = Tombstone; fn last_refreshed_at(&self) -> Option { None @@ -184,23 +185,17 @@ impl ApubObject for ApubComment { .await??; Ok(()) } -} - -#[async_trait::async_trait(?Send)] -impl ToApub for ApubComment { - type ApubType = Note; - type TombstoneType = Tombstone; - type DataType = DbPool; - async fn to_apub(&self, pool: &DbPool) -> Result { + async fn to_apub(&self, context: &LemmyContext) -> Result { let creator_id = self.creator_id; - let creator = blocking(pool, move |conn| Person::read(conn, creator_id)).await??; + let creator = blocking(context.pool(), move |conn| Person::read(conn, creator_id)).await??; let post_id = self.post_id; - let post = blocking(pool, move |conn| Post::read(conn, post_id)).await??; + let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??; let in_reply_to = if let Some(comment_id) = self.parent_id { - let parent_comment = blocking(pool, move |conn| Comment::read(conn, comment_id)).await??; + let parent_comment = + blocking(context.pool(), move |conn| Comment::read(conn, comment_id)).await??; ObjectId::::new(parent_comment.ap_id.into_inner()) } else { ObjectId::::new(post.ap_id.into_inner()) @@ -228,19 +223,11 @@ impl ToApub for ApubComment { } fn to_tombstone(&self) -> Result { - create_tombstone( - self.deleted, - self.ap_id.to_owned().into(), - self.updated, + Ok(Tombstone::new( NoteType::Note, - ) + self.updated.unwrap_or(self.published), + )) } -} - -#[async_trait::async_trait(?Send)] -impl FromApub for ApubComment { - type ApubType = Note; - type DataType = LemmyContext; /// Converts a `Note` to `Comment`. /// @@ -324,6 +311,8 @@ mod tests { #[actix_rt::test] #[serial] async fn test_parse_lemmy_comment() { + // TODO: changed ObjectId::dereference() so that it always fetches if + // last_refreshed_at() == None. But post doesnt store that and expects to never be refetched let context = init_context(); let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap(); let data = prepare_comment_test(&url, &context).await; @@ -339,7 +328,7 @@ mod tests { assert!(!comment.local); assert_eq!(request_counter, 0); - let to_apub = comment.to_apub(context.pool()).await.unwrap(); + let to_apub = comment.to_apub(&context).await.unwrap(); assert_json_include!(actual: json, expected: to_apub); Comment::delete(&*context.pool().get().unwrap(), comment.id).unwrap(); diff --git a/crates/apub/src/objects/community.rs b/crates/apub/src/objects/community.rs index 74d417c3..e71addba 100644 --- a/crates/apub/src/objects/community.rs +++ b/crates/apub/src/objects/community.rs @@ -1,17 +1,22 @@ use crate::{ check_is_apub_id_valid, + collections::{ + community_moderators::ApubCommunityModerators, + community_outbox::ApubCommunityOutbox, + CommunityContext, + }, context::lemmy_context, - fetcher::community::{fetch_community_outbox, update_community_mods}, + fetcher::object_id::ObjectId, generate_moderators_url, generate_outbox_url, - objects::{create_tombstone, get_summary_from_string_or_source, ImageObject, Source}, + objects::{get_summary_from_string_or_source, tombstone::Tombstone, ImageObject, Source}, CommunityType, }; use activitystreams::{ actor::{kind::GroupType, Endpoints}, base::AnyBase, chrono::NaiveDateTime, - object::{kind::ImageType, Tombstone}, + object::kind::ImageType, primitives::OneOrMany, unparsed::Unparsed, }; @@ -20,7 +25,7 @@ use itertools::Itertools; use lemmy_api_common::blocking; use lemmy_apub_lib::{ signatures::PublicKey, - traits::{ActorType, ApubObject, FromApub, ToApub}, + traits::{ActorType, ApubObject}, values::MediaTypeMarkdown, verify::verify_domains_match, }; @@ -63,9 +68,9 @@ pub struct Group { // lemmy extension sensitive: Option, // lemmy extension - pub(crate) moderators: Option, + pub(crate) moderators: Option>, inbox: Url, - pub(crate) outbox: Url, + pub(crate) outbox: ObjectId, followers: Url, endpoints: Endpoints, public_key: PublicKey, @@ -138,6 +143,8 @@ impl From for ApubCommunity { #[async_trait::async_trait(?Send)] impl ApubObject for ApubCommunity { type DataType = LemmyContext; + type ApubType = Group; + type TombstoneType = Tombstone; fn last_refreshed_at(&self) -> Option { Some(self.last_refreshed_at) @@ -163,41 +170,8 @@ impl ApubObject for ApubCommunity { .await??; Ok(()) } -} - -impl ActorType for ApubCommunity { - fn is_local(&self) -> bool { - self.local - } - fn actor_id(&self) -> Url { - self.actor_id.to_owned().into() - } - fn name(&self) -> String { - self.name.clone() - } - fn public_key(&self) -> Option { - self.public_key.to_owned() - } - fn private_key(&self) -> Option { - self.private_key.to_owned() - } - fn inbox_url(&self) -> Url { - self.inbox_url.clone().into() - } - - fn shared_inbox_url(&self) -> Option { - self.shared_inbox_url.clone().map(|s| s.into_inner()) - } -} - -#[async_trait::async_trait(?Send)] -impl ToApub for ApubCommunity { - type ApubType = Group; - type TombstoneType = Tombstone; - type DataType = DbPool; - - async fn to_apub(&self, _pool: &DbPool) -> Result { + async fn to_apub(&self, _context: &LemmyContext) -> Result { let source = self.description.clone().map(|bio| Source { content: bio, media_type: MediaTypeMarkdown::Markdown, @@ -222,9 +196,11 @@ impl ToApub for ApubCommunity { icon, image, sensitive: Some(self.nsfw), - moderators: Some(generate_moderators_url(&self.actor_id)?.into()), + moderators: Some(ObjectId::::new( + generate_moderators_url(&self.actor_id)?.into_inner(), + )), inbox: self.inbox_url.clone().into(), - outbox: generate_outbox_url(&self.actor_id)?.into(), + outbox: ObjectId::new(generate_outbox_url(&self.actor_id)?), followers: self.followers_url.clone().into(), endpoints: Endpoints { shared_inbox: self.shared_inbox_url.clone().map(|s| s.into()), @@ -239,19 +215,11 @@ impl ToApub for ApubCommunity { } fn to_tombstone(&self) -> Result { - create_tombstone( - self.deleted, - self.actor_id.to_owned().into(), - self.updated, + Ok(Tombstone::new( GroupType::Group, - ) + self.updated.unwrap_or(self.published), + )) } -} - -#[async_trait::async_trait(?Send)] -impl FromApub for ApubCommunity { - type ApubType = Group; - type DataType = LemmyContext; /// Converts a `Group` to `Community`, inserts it into the database and updates moderators. async fn from_apub( @@ -264,19 +232,54 @@ impl FromApub for ApubCommunity { // Fetching mods and outbox is not necessary for Lemmy to work, so ignore errors. Besides, // we need to ignore these errors so that tests can work entirely offline. - let community = blocking(context.pool(), move |conn| Community::upsert(conn, &form)).await??; - update_community_mods(group, &community, context, request_counter) + let community: ApubCommunity = + blocking(context.pool(), move |conn| Community::upsert(conn, &form)) + .await?? + .into(); + let outbox_data = CommunityContext(community.clone(), context.clone()); + + group + .outbox + .dereference(&outbox_data, request_counter) .await .map_err(|e| debug!("{}", e)) .ok(); - // TODO: doing this unconditionally might cause infinite loop for some reason - fetch_community_outbox(context, &group.outbox, request_counter) - .await - .map_err(|e| debug!("{}", e)) - .ok(); + if let Some(moderators) = &group.moderators { + moderators + .dereference(&outbox_data, request_counter) + .await + .map_err(|e| debug!("{}", e)) + .ok(); + } + + Ok(community) + } +} + +impl ActorType for ApubCommunity { + fn is_local(&self) -> bool { + self.local + } + fn actor_id(&self) -> Url { + self.actor_id.to_owned().into() + } + fn name(&self) -> String { + self.name.clone() + } + fn public_key(&self) -> Option { + self.public_key.to_owned() + } + fn private_key(&self) -> Option { + self.private_key.to_owned() + } + + fn inbox_url(&self) -> Url { + self.inbox_url.clone().into() + } - Ok(community.into()) + fn shared_inbox_url(&self) -> Option { + self.shared_inbox_url.clone().map(|s| s.into_inner()) } } @@ -327,9 +330,11 @@ mod tests { let mut json: Group = file_to_json_object("assets/lemmy-community.json"); let json_orig = json.clone(); // change these links so they dont fetch over the network - json.moderators = - Some(Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_moderators").unwrap()); - json.outbox = Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_outbox").unwrap(); + json.moderators = Some(ObjectId::new( + Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_moderators").unwrap(), + )); + json.outbox = + ObjectId::new(Url::parse("https://enterprise.lemmy.ml/c/tenforward/not_outbox").unwrap()); let url = Url::parse("https://enterprise.lemmy.ml/c/tenforward").unwrap(); let mut request_counter = 0; @@ -345,7 +350,7 @@ mod tests { // this makes two requests to the (intentionally) broken outbox/moderators collections assert_eq!(request_counter, 2); - let to_apub = community.to_apub(context.pool()).await.unwrap(); + let to_apub = community.to_apub(&context).await.unwrap(); assert_json_include!(actual: json_orig, expected: to_apub); Community::delete(&*context.pool().get().unwrap(), community.id).unwrap(); diff --git a/crates/apub/src/objects/mod.rs b/crates/apub/src/objects/mod.rs index 76c258ce..96700cb7 100644 --- a/crates/apub/src/objects/mod.rs +++ b/crates/apub/src/objects/mod.rs @@ -1,12 +1,6 @@ -use activitystreams::{ - base::BaseExt, - object::{kind::ImageType, Tombstone, TombstoneExt}, -}; -use anyhow::anyhow; -use chrono::NaiveDateTime; +use activitystreams::object::kind::ImageType; use html2md::parse_html; use lemmy_apub_lib::values::MediaTypeMarkdown; -use lemmy_utils::{utils::convert_datetime, LemmyError}; use serde::{Deserialize, Serialize}; use url::Url; @@ -15,6 +9,7 @@ pub mod community; pub mod person; pub mod post; pub mod private_message; +pub mod tombstone; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] @@ -31,31 +26,6 @@ pub struct ImageObject { url: Url, } -/// Updated is actually the deletion time -fn create_tombstone( - deleted: bool, - object_id: Url, - updated: Option, - former_type: T, -) -> Result -where - T: ToString, -{ - if deleted { - if let Some(updated) = updated { - let mut tombstone = Tombstone::new(); - tombstone.set_id(object_id); - tombstone.set_former_type(former_type.to_string()); - tombstone.set_deleted(convert_datetime(updated)); - Ok(tombstone) - } else { - Err(anyhow!("Cant convert to tombstone because updated time was None.").into()) - } - } else { - Err(anyhow!("Cant convert object to tombstone if it wasnt deleted").into()) - } -} - fn get_summary_from_string_or_source( raw: &Option, source: &Option, @@ -69,7 +39,6 @@ fn get_summary_from_string_or_source( #[cfg(test)] mod tests { - use super::*; use actix::Actor; use diesel::{ r2d2::{ConnectionManager, Pool}, @@ -85,6 +54,7 @@ mod tests { rate_limit::{rate_limiter::RateLimiter, RateLimit}, request::build_user_agent, settings::structs::Settings, + LemmyError, }; use lemmy_websocket::{chat_server::ChatServer, LemmyContext}; use reqwest::Client; diff --git a/crates/apub/src/objects/person.rs b/crates/apub/src/objects/person.rs index 99847562..f4ba225b 100644 --- a/crates/apub/src/objects/person.rs +++ b/crates/apub/src/objects/person.rs @@ -16,14 +16,13 @@ use chrono::{DateTime, FixedOffset}; use lemmy_api_common::blocking; use lemmy_apub_lib::{ signatures::PublicKey, - traits::{ActorType, ApubObject, FromApub, ToApub}, + traits::{ActorType, ApubObject}, values::MediaTypeMarkdown, verify::verify_domains_match, }; use lemmy_db_schema::{ naive_now, source::person::{Person as DbPerson, PersonForm}, - DbPool, }; use lemmy_utils::{ utils::{check_slurs, check_slurs_opt, convert_datetime, markdown_to_html}, @@ -80,7 +79,7 @@ impl Person { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct ApubPerson(DbPerson); impl Deref for ApubPerson { @@ -99,6 +98,8 @@ impl From for ApubPerson { #[async_trait::async_trait(?Send)] impl ApubObject for ApubPerson { type DataType = LemmyContext; + type ApubType = Person; + type TombstoneType = Tombstone; fn last_refreshed_at(&self) -> Option { Some(self.last_refreshed_at) @@ -124,43 +125,8 @@ impl ApubObject for ApubPerson { .await??; Ok(()) } -} - -impl ActorType for ApubPerson { - fn is_local(&self) -> bool { - self.local - } - fn actor_id(&self) -> Url { - self.actor_id.to_owned().into_inner() - } - fn name(&self) -> String { - self.name.clone() - } - - fn public_key(&self) -> Option { - self.public_key.to_owned() - } - - fn private_key(&self) -> Option { - self.private_key.to_owned() - } - - fn inbox_url(&self) -> Url { - self.inbox_url.clone().into() - } - - fn shared_inbox_url(&self) -> Option { - self.shared_inbox_url.clone().map(|s| s.into_inner()) - } -} - -#[async_trait::async_trait(?Send)] -impl ToApub for ApubPerson { - type ApubType = Person; - type TombstoneType = Tombstone; - type DataType = DbPool; - async fn to_apub(&self, _pool: &DbPool) -> Result { + async fn to_apub(&self, _pool: &LemmyContext) -> Result { let kind = if self.bot_account { UserTypes::Service } else { @@ -203,15 +169,10 @@ impl ToApub for ApubPerson { }; Ok(person) } + fn to_tombstone(&self) -> Result { unimplemented!() } -} - -#[async_trait::async_trait(?Send)] -impl FromApub for ApubPerson { - type ApubType = Person; - type DataType = LemmyContext; async fn from_apub( person: &Person, @@ -265,6 +226,34 @@ impl FromApub for ApubPerson { } } +impl ActorType for ApubPerson { + fn is_local(&self) -> bool { + self.local + } + fn actor_id(&self) -> Url { + self.actor_id.to_owned().into_inner() + } + fn name(&self) -> String { + self.name.clone() + } + + fn public_key(&self) -> Option { + self.public_key.to_owned() + } + + fn private_key(&self) -> Option { + self.private_key.to_owned() + } + + fn inbox_url(&self) -> Url { + self.inbox_url.clone().into() + } + + fn shared_inbox_url(&self) -> Option { + self.shared_inbox_url.clone().map(|s| s.into_inner()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -291,7 +280,7 @@ mod tests { assert_eq!(person.bio.as_ref().unwrap().len(), 39); assert_eq!(request_counter, 0); - let to_apub = person.to_apub(context.pool()).await.unwrap(); + let to_apub = person.to_apub(&context).await.unwrap(); assert_json_include!(actual: json, expected: to_apub); DbPerson::delete(&*context.pool().get().unwrap(), person.id).unwrap(); diff --git a/crates/apub/src/objects/post.rs b/crates/apub/src/objects/post.rs index a56bbd59..8b016188 100644 --- a/crates/apub/src/objects/post.rs +++ b/crates/apub/src/objects/post.rs @@ -2,14 +2,11 @@ use crate::{ activities::{extract_community, verify_is_public, verify_person_in_community}, context::lemmy_context, fetcher::object_id::ObjectId, - objects::{create_tombstone, person::ApubPerson, ImageObject, Source}, + objects::{person::ApubPerson, tombstone::Tombstone, ImageObject, Source}, }; use activitystreams::{ base::AnyBase, - object::{ - kind::{ImageType, PageType}, - Tombstone, - }, + object::kind::{ImageType, PageType}, primitives::OneOrMany, public, unparsed::Unparsed, @@ -17,7 +14,7 @@ use activitystreams::{ use chrono::{DateTime, FixedOffset, NaiveDateTime}; use lemmy_api_common::blocking; use lemmy_apub_lib::{ - traits::{ActorType, ApubObject, FromApub, ToApub}, + traits::{ActorType, ApubObject}, values::{MediaTypeHtml, MediaTypeMarkdown}, verify::verify_domains_match, }; @@ -29,7 +26,6 @@ use lemmy_db_schema::{ post::{Post, PostForm}, }, traits::Crud, - DbPool, }; use lemmy_utils::{ request::fetch_site_data, @@ -133,6 +129,8 @@ impl From for ApubPost { #[async_trait::async_trait(?Send)] impl ApubObject for ApubPost { type DataType = LemmyContext; + type ApubType = Page; + type TombstoneType = Tombstone; fn last_refreshed_at(&self) -> Option { None @@ -158,20 +156,16 @@ impl ApubObject for ApubPost { .await??; Ok(()) } -} - -#[async_trait::async_trait(?Send)] -impl ToApub for ApubPost { - type ApubType = Page; - type TombstoneType = Tombstone; - type DataType = DbPool; // Turn a Lemmy post into an ActivityPub page that can be sent out over the network. - async fn to_apub(&self, pool: &DbPool) -> Result { + async fn to_apub(&self, context: &LemmyContext) -> Result { let creator_id = self.creator_id; - let creator = blocking(pool, move |conn| Person::read(conn, creator_id)).await??; + let creator = blocking(context.pool(), move |conn| Person::read(conn, creator_id)).await??; let community_id = self.community_id; - let community = blocking(pool, move |conn| Community::read(conn, community_id)).await??; + let community = blocking(context.pool(), move |conn| { + Community::read(conn, community_id) + }) + .await??; let source = self.body.clone().map(|body| Source { content: body, @@ -205,19 +199,11 @@ impl ToApub for ApubPost { } fn to_tombstone(&self) -> Result { - create_tombstone( - self.deleted, - self.ap_id.to_owned().into(), - self.updated, + Ok(Tombstone::new( PageType::Page, - ) + self.updated.unwrap_or(self.published), + )) } -} - -#[async_trait::async_trait(?Send)] -impl FromApub for ApubPost { - type ApubType = Page; - type DataType = LemmyContext; async fn from_apub( page: &Page, @@ -315,7 +301,7 @@ mod tests { assert!(post.stickied); assert_eq!(request_counter, 0); - let to_apub = post.to_apub(context.pool()).await.unwrap(); + let to_apub = post.to_apub(&context).await.unwrap(); assert_json_include!(actual: json, expected: to_apub); Post::delete(&*context.pool().get().unwrap(), post.id).unwrap(); diff --git a/crates/apub/src/objects/private_message.rs b/crates/apub/src/objects/private_message.rs index bd9cc1da..407ae810 100644 --- a/crates/apub/src/objects/private_message.rs +++ b/crates/apub/src/objects/private_message.rs @@ -1,12 +1,12 @@ use crate::{ context::lemmy_context, fetcher::object_id::ObjectId, - objects::{create_tombstone, person::ApubPerson, Source}, + objects::{person::ApubPerson, Source}, }; use activitystreams::{ base::AnyBase, chrono::NaiveDateTime, - object::{kind::NoteType, Tombstone}, + object::Tombstone, primitives::OneOrMany, unparsed::Unparsed, }; @@ -15,7 +15,7 @@ use chrono::{DateTime, FixedOffset}; use html2md::parse_html; use lemmy_api_common::blocking; use lemmy_apub_lib::{ - traits::{ApubObject, FromApub, ToApub}, + traits::ApubObject, values::{MediaTypeHtml, MediaTypeMarkdown}, verify::verify_domains_match, }; @@ -25,7 +25,6 @@ use lemmy_db_schema::{ private_message::{PrivateMessage, PrivateMessageForm}, }, traits::Crud, - DbPool, }; use lemmy_utils::{utils::convert_datetime, LemmyError}; use lemmy_websocket::LemmyContext; @@ -104,6 +103,8 @@ impl From for ApubPrivateMessage { #[async_trait::async_trait(?Send)] impl ApubObject for ApubPrivateMessage { type DataType = LemmyContext; + type ApubType = Note; + type TombstoneType = Tombstone; fn last_refreshed_at(&self) -> Option { None @@ -126,20 +127,14 @@ impl ApubObject for ApubPrivateMessage { // do nothing, because pm can't be fetched over http unimplemented!() } -} - -#[async_trait::async_trait(?Send)] -impl ToApub for ApubPrivateMessage { - type ApubType = Note; - type TombstoneType = Tombstone; - type DataType = DbPool; - async fn to_apub(&self, pool: &DbPool) -> Result { + async fn to_apub(&self, context: &LemmyContext) -> Result { let creator_id = self.creator_id; - let creator = blocking(pool, move |conn| Person::read(conn, creator_id)).await??; + let creator = blocking(context.pool(), move |conn| Person::read(conn, creator_id)).await??; let recipient_id = self.recipient_id; - let recipient = blocking(pool, move |conn| Person::read(conn, recipient_id)).await??; + let recipient = + blocking(context.pool(), move |conn| Person::read(conn, recipient_id)).await??; let note = Note { context: lemmy_context(), @@ -161,19 +156,8 @@ impl ToApub for ApubPrivateMessage { } fn to_tombstone(&self) -> Result { - create_tombstone( - self.deleted, - self.ap_id.to_owned().into(), - self.updated, - NoteType::Note, - ) + unimplemented!() } -} - -#[async_trait::async_trait(?Send)] -impl FromApub for ApubPrivateMessage { - type ApubType = Note; - type DataType = LemmyContext; async fn from_apub( note: &Note, @@ -253,7 +237,7 @@ mod tests { assert_eq!(pm.content.len(), 20); assert_eq!(request_counter, 0); - let to_apub = pm.to_apub(context.pool()).await.unwrap(); + let to_apub = pm.to_apub(&context).await.unwrap(); assert_json_include!(actual: json, expected: to_apub); PrivateMessage::delete(&*context.pool().get().unwrap(), pm.id).unwrap(); diff --git a/crates/apub/src/objects/tombstone.rs b/crates/apub/src/objects/tombstone.rs new file mode 100644 index 00000000..fc78c968 --- /dev/null +++ b/crates/apub/src/objects/tombstone.rs @@ -0,0 +1,33 @@ +use crate::context::lemmy_context; +use activitystreams::{ + base::AnyBase, + chrono::{DateTime, FixedOffset, NaiveDateTime}, + object::kind::TombstoneType, + primitives::OneOrMany, +}; +use lemmy_utils::utils::convert_datetime; +use serde::{Deserialize, Serialize}; +use serde_with::skip_serializing_none; + +#[skip_serializing_none] +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Tombstone { + #[serde(rename = "@context")] + context: OneOrMany, + #[serde(rename = "type")] + kind: TombstoneType, + former_type: String, + deleted: DateTime, +} + +impl Tombstone { + pub fn new(former_type: T, updated_time: NaiveDateTime) -> Tombstone { + Tombstone { + context: lemmy_context(), + kind: TombstoneType::Tombstone, + former_type: former_type.to_string(), + deleted: convert_datetime(updated_time), + } + } +} diff --git a/crates/apub_lib/src/traits.rs b/crates/apub_lib/src/traits.rs index c2c1d02a..2240946d 100644 --- a/crates/apub_lib/src/traits.rs +++ b/crates/apub_lib/src/traits.rs @@ -30,6 +30,9 @@ pub trait ActivityHandler { #[async_trait::async_trait(?Send)] pub trait ApubObject { type DataType; + type ApubType; + type TombstoneType; + /// If this object should be refetched after a certain interval, it should return the last refresh /// time here. This is mainly used to update remote actors. fn last_refreshed_at(&self) -> Option; @@ -42,6 +45,25 @@ pub trait ApubObject { Self: Sized; /// Marks the object as deleted in local db. Called when a tombstone is received. async fn delete(self, data: &Self::DataType) -> Result<(), LemmyError>; + + /// Trait for converting an object or actor into the respective ActivityPub type. + async fn to_apub(&self, data: &Self::DataType) -> Result; + fn to_tombstone(&self) -> Result; + + /// Converts an object from ActivityPub type to Lemmy internal type. + /// + /// * `apub` The object to read from + /// * `context` LemmyContext which holds DB pool, HTTP client etc + /// * `expected_domain` Domain where the object was received from. None in case of mod action. + /// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case + async fn from_apub( + apub: &Self::ApubType, + data: &Self::DataType, + expected_domain: &Url, + request_counter: &mut i32, + ) -> Result + where + Self: Sized; } /// Common methods provided by ActivityPub actors (community and person). Not all methods are @@ -71,35 +93,3 @@ pub trait ActorType { }) } } - -/// Trait for converting an object or actor into the respective ActivityPub type. -#[async_trait::async_trait(?Send)] -pub trait ToApub { - type ApubType; - type TombstoneType; - type DataType; - - async fn to_apub(&self, data: &Self::DataType) -> Result; - fn to_tombstone(&self) -> Result; -} - -#[async_trait::async_trait(?Send)] -pub trait FromApub { - type ApubType; - type DataType; - - /// Converts an object from ActivityPub type to Lemmy internal type. - /// - /// * `apub` The object to read from - /// * `context` LemmyContext which holds DB pool, HTTP client etc - /// * `expected_domain` Domain where the object was received from. None in case of mod action. - /// * `mod_action_allowed` True if the object can be a mod activity, ignore `expected_domain` in this case - async fn from_apub( - apub: &Self::ApubType, - data: &Self::DataType, - expected_domain: &Url, - request_counter: &mut i32, - ) -> Result - where - Self: Sized; -} diff --git a/crates/db_schema/src/impls/community.rs b/crates/db_schema/src/impls/community.rs index b2ebb3a0..1fac4a27 100644 --- a/crates/db_schema/src/impls/community.rs +++ b/crates/db_schema/src/impls/community.rs @@ -126,16 +126,6 @@ impl Community { community.select(actor_id).distinct().load::(conn) } - pub fn read_from_followers_url( - conn: &PgConnection, - followers_url_: &DbUrl, - ) -> Result { - use crate::schema::community::dsl::*; - community - .filter(followers_url.eq(followers_url_)) - .first::(conn) - } - pub fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result { use crate::schema::community::dsl::*; insert_into(community)