From: Nutomic Date: Sat, 25 Sep 2021 15:44:52 +0000 (+0000) Subject: Rewrite fetcher (#1792) X-Git-Url: http://these/git/readmes/%7B%60%24%7BarchiveTodayUrl%7D/static/%7BpictrsAvatarThumbnail%28user.avatar%29%7D?a=commitdiff_plain;h=527eefbe92a4796b957fb232831f7e7c80511ed9;p=lemmy.git Rewrite fetcher (#1792) * Use new fetcher implementation for post/comment * rewrite person fetch to use new fetcher * rewrite community to use new fetcher * rename new_fetcher to dereference_object_id * make ObjectId a newtype * handle deletion in new fetcher * rewrite apub object search to be generic * move upsert() method out of ApubObject trait * simplify ObjectId::new (and fix clippy) --- diff --git a/api_tests/src/post.spec.ts b/api_tests/src/post.spec.ts index 8836a2c7..50726204 100644 --- a/api_tests/src/post.spec.ts +++ b/api_tests/src/post.spec.ts @@ -135,7 +135,8 @@ test('Update a post', async () => { test('Sticky a post', async () => { let postRes = await createPost(alpha, betaCommunity.community.id); - let stickiedPostRes = await stickyPost(alpha, true, postRes.post_view.post); + let betaPost1 = (await resolvePost(beta, postRes.post_view.post)).post; + let stickiedPostRes = await stickyPost(beta, true, betaPost1.post); expect(stickiedPostRes.post_view.post.stickied).toBe(true); // Make sure that post is stickied on beta @@ -145,7 +146,7 @@ test('Sticky a post', async () => { expect(betaPost.post.stickied).toBe(true); // Unsticky a post - let unstickiedPost = await stickyPost(alpha, false, postRes.post_view.post); + let unstickiedPost = await stickyPost(beta, false, betaPost1.post); expect(unstickiedPost.post_view.post.stickied).toBe(false); // Make sure that post is unstickied on beta diff --git a/crates/api/src/comment.rs b/crates/api/src/comment.rs index 00e136ad..0af3324f 100644 --- a/crates/api/src/comment.rs +++ b/crates/api/src/comment.rs @@ -13,7 +13,7 @@ use lemmy_apub::{ undo_vote::UndoVote, vote::{Vote, VoteType}, }, - PostOrComment, + fetcher::post_or_comment::PostOrComment, }; use lemmy_db_queries::{source::comment::Comment_, Likeable, Saveable}; use lemmy_db_schema::{source::comment::*, LocalUserId}; diff --git a/crates/api/src/post.rs b/crates/api/src/post.rs index f0f025e2..71b20014 100644 --- a/crates/api/src/post.rs +++ b/crates/api/src/post.rs @@ -19,7 +19,7 @@ use lemmy_apub::{ }, CreateOrUpdateType, }, - PostOrComment, + fetcher::post_or_comment::PostOrComment, }; use lemmy_db_queries::{source::post::Post_, Crud, Likeable, Saveable}; use lemmy_db_schema::source::{moderator::*, post::*}; diff --git a/crates/api/src/site.rs b/crates/api/src/site.rs index b80467e3..82c94cbb 100644 --- a/crates/api/src/site.rs +++ b/crates/api/src/site.rs @@ -1,6 +1,7 @@ use crate::Perform; use actix_web::web::Data; use anyhow::Context; +use diesel::NotFound; use lemmy_api_common::{ blocking, build_federated_instances, @@ -9,24 +10,32 @@ use lemmy_api_common::{ is_admin, site::*, }; -use lemmy_apub::{build_actor_id_from_shortname, fetcher::search::search_by_apub_id, EndpointType}; +use lemmy_apub::{ + build_actor_id_from_shortname, + fetcher::search::{search_by_apub_id, SearchableObjects}, + EndpointType, +}; use lemmy_db_queries::{ from_opt_str_to_opt_enum, source::site::Site_, Crud, + DbPool, DeleteableOrRemoveable, ListingType, SearchType, SortType, }; -use lemmy_db_schema::source::{moderator::*, site::Site}; +use lemmy_db_schema::{ + source::{moderator::*, site::Site}, + PersonId, +}; use lemmy_db_views::{ - comment_view::CommentQueryBuilder, - post_view::PostQueryBuilder, + comment_view::{CommentQueryBuilder, CommentView}, + post_view::{PostQueryBuilder, PostView}, site_view::SiteView, }; use lemmy_db_views_actor::{ - community_view::CommunityQueryBuilder, + community_view::{CommunityQueryBuilder, CommunityView}, person_view::{PersonQueryBuilder, PersonViewSafe}, }; use lemmy_db_views_moderator::{ @@ -376,11 +385,52 @@ impl Perform for ResolveObject { _websocket_id: Option, ) -> Result { let local_user_view = get_local_user_view_from_jwt_opt(&self.auth, context.pool()).await?; - let res = search_by_apub_id(&self.q, local_user_view, context) + let res = search_by_apub_id(&self.q, context) .await .map_err(|_| ApiError::err("couldnt_find_object"))?; - Ok(res) + convert_response(res, local_user_view.map(|l| l.person.id), context.pool()) + .await + .map_err(|_| ApiError::err("couldnt_find_object").into()) + } +} + +async fn convert_response( + object: SearchableObjects, + user_id: Option, + pool: &DbPool, +) -> Result { + let removed_or_deleted; + let mut res = ResolveObjectResponse { + comment: None, + post: None, + community: None, + person: None, + }; + use SearchableObjects::*; + match object { + Person(p) => { + removed_or_deleted = p.deleted; + res.person = Some(blocking(pool, move |conn| PersonViewSafe::read(conn, p.id)).await??) + } + Community(c) => { + removed_or_deleted = c.deleted || c.removed; + res.community = + Some(blocking(pool, move |conn| CommunityView::read(conn, c.id, user_id)).await??) + } + Post(p) => { + removed_or_deleted = p.deleted || p.removed; + res.post = Some(blocking(pool, move |conn| PostView::read(conn, p.id, user_id)).await??) + } + Comment(c) => { + removed_or_deleted = c.deleted || c.removed; + res.comment = Some(blocking(pool, move |conn| CommentView::read(conn, c.id, user_id)).await??) + } + }; + // if the object was deleted from database, dont return it + if removed_or_deleted { + return Err(NotFound {}.into()); } + Ok(res) } #[async_trait::async_trait(?Send)] diff --git a/crates/api_crud/src/comment/create.rs b/crates/api_crud/src/comment/create.rs index 94c30692..76698700 100644 --- a/crates/api_crud/src/comment/create.rs +++ b/crates/api_crud/src/comment/create.rs @@ -15,9 +15,9 @@ use lemmy_apub::{ voting::vote::{Vote, VoteType}, CreateOrUpdateType, }, + fetcher::post_or_comment::PostOrComment, generate_apub_endpoint, EndpointType, - PostOrComment, }; use lemmy_db_queries::{source::comment::Comment_, Crud, Likeable}; use lemmy_db_schema::source::comment::*; diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 547c657c..e21c6f33 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -13,9 +13,9 @@ use lemmy_apub::{ voting::vote::{Vote, VoteType}, CreateOrUpdateType, }, + fetcher::post_or_comment::PostOrComment, generate_apub_endpoint, EndpointType, - PostOrComment, }; use lemmy_db_queries::{source::post::Post_, Crud, Likeable}; use lemmy_db_schema::source::post::*; diff --git a/crates/apub/src/activities/comment/create_or_update.rs b/crates/apub/src/activities/comment/create_or_update.rs index e6876e77..9ce17200 100644 --- a/crates/apub/src/activities/comment/create_or_update.rs +++ b/crates/apub/src/activities/comment/create_or_update.rs @@ -10,6 +10,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, + fetcher::object_id::ObjectId, objects::{comment::Note, FromApub, ToApub}, ActorType, }; @@ -26,7 +27,7 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct CreateOrUpdateComment { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], object: Note, cc: Vec, @@ -60,7 +61,7 @@ impl CreateOrUpdateComment { let maa = collect_non_local_mentions(comment, &community, context).await?; let create_or_update = CreateOrUpdateComment { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], object: comment.to_apub(context.pool()).await?, cc: maa.ccs, @@ -84,11 +85,11 @@ impl ActivityHandler for CreateOrUpdateComment { request_counter: &mut i32, ) -> Result<(), LemmyError> { let community = extract_community(&self.cc, context, request_counter).await?; + let community_id = ObjectId::new(community.actor_id()); verify_activity(self)?; - verify_person_in_community(&self.actor, &community.actor_id(), context, request_counter) - .await?; - verify_domains_match(&self.actor, self.object.id_unchecked())?; + verify_person_in_community(&self.actor, &community_id, context, request_counter).await?; + verify_domains_match(self.actor.inner(), self.object.id_unchecked())?; // TODO: should add a check that the correct community is in cc (probably needs changes to // comment deserialization) self.object.verify(context, request_counter).await?; @@ -100,7 +101,8 @@ impl ActivityHandler for CreateOrUpdateComment { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let comment = Comment::from_apub(&self.object, context, &self.actor, request_counter).await?; + let comment = + Comment::from_apub(&self.object, context, self.actor.inner(), request_counter).await?; let recipients = get_notif_recipients(&self.actor, &comment, context, request_counter).await?; let notif_type = match self.kind { CreateOrUpdateType::Create => UserOperationCrud::CreateComment, diff --git a/crates/apub/src/activities/comment/mod.rs b/crates/apub/src/activities/comment/mod.rs index 44b8d245..8f27e9ba 100644 --- a/crates/apub/src/activities/comment/mod.rs +++ b/crates/apub/src/activities/comment/mod.rs @@ -1,4 +1,4 @@ -use crate::{fetcher::person::get_or_fetch_and_upsert_person, ActorType}; +use crate::{fetcher::object_id::ObjectId, ActorType}; use activitystreams::{ base::BaseExt, link::{LinkExt, Mention}, @@ -26,14 +26,14 @@ use url::Url; pub mod create_or_update; async fn get_notif_recipients( - actor: &Url, + actor: &ObjectId, comment: &Comment, context: &LemmyContext, request_counter: &mut i32, ) -> Result, LemmyError> { let post_id = comment.post_id; let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??; - let actor = get_or_fetch_and_upsert_person(actor, context, request_counter).await?; + let actor = actor.dereference(context, request_counter).await?; // Note: // Although mentions could be gotten from the post tags (they are included there), or the ccs, @@ -76,14 +76,17 @@ pub async fn collect_non_local_mentions( for mention in &mentions { // TODO should it be fetching it every time? if let Ok(actor_id) = fetch_webfinger_url(mention, context.client()).await { + let actor_id: ObjectId = ObjectId::new(actor_id); debug!("mention actor_id: {}", actor_id); addressed_ccs.push(actor_id.to_owned().to_string().parse()?); - let mention_person = get_or_fetch_and_upsert_person(&actor_id, context, &mut 0).await?; + let mention_person = actor_id.dereference(context, &mut 0).await?; inboxes.push(mention_person.get_shared_inbox_or_inbox_url()); let mut mention_tag = Mention::new(); - mention_tag.set_href(actor_id).set_name(mention.full_name()); + mention_tag + .set_href(actor_id.into()) + .set_name(mention.full_name()); tags.push(mention_tag); } } diff --git a/crates/apub/src/activities/community/add_mod.rs b/crates/apub/src/activities/community/add_mod.rs index 56013d4a..a066211f 100644 --- a/crates/apub/src/activities/community/add_mod.rs +++ b/crates/apub/src/activities/community/add_mod.rs @@ -9,7 +9,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, - fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, + fetcher::object_id::ObjectId, generate_moderators_url, ActorType, }; @@ -34,11 +34,11 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct AddMod { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], - object: Url, + object: ObjectId, target: Url, - cc: [Url; 1], + cc: [ObjectId; 1], #[serde(rename = "type")] kind: AddType, id: Url, @@ -57,11 +57,11 @@ impl AddMod { ) -> Result<(), LemmyError> { let id = generate_activity_id(AddType::Add)?; let add = AddMod { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], - object: added_mod.actor_id(), + object: ObjectId::new(added_mod.actor_id()), target: generate_moderators_url(&community.actor_id)?.into(), - cc: [community.actor_id()], + cc: [ObjectId::new(community.actor_id())], kind: AddType::Add, id: id.clone(), context: lemmy_context(), @@ -84,7 +84,7 @@ impl ActivityHandler for AddMod { verify_activity(self)?; verify_person_in_community(&self.actor, &self.cc[0], context, request_counter).await?; verify_mod_action(&self.actor, self.cc[0].clone(), context).await?; - verify_add_remove_moderator_target(&self.target, self.cc[0].clone())?; + verify_add_remove_moderator_target(&self.target, &self.cc[0])?; Ok(()) } @@ -93,9 +93,8 @@ impl ActivityHandler for AddMod { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let community = - get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?; - let new_mod = get_or_fetch_and_upsert_person(&self.object, context, request_counter).await?; + let community = self.cc[0].dereference(context, request_counter).await?; + let new_mod = self.object.dereference(context, request_counter).await?; // If we had to refetch the community while parsing the activity, then the new mod has already // been added. Skip it here as it would result in a duplicate key error. diff --git a/crates/apub/src/activities/community/announce.rs b/crates/apub/src/activities/community/announce.rs index d16345cc..96797c8f 100644 --- a/crates/apub/src/activities/community/announce.rs +++ b/crates/apub/src/activities/community/announce.rs @@ -19,6 +19,7 @@ use crate::{ }, activity_queue::send_activity_new, extensions::context::lemmy_context, + fetcher::object_id::ObjectId, http::is_activity_already_known, insert_activity, ActorType, @@ -57,7 +58,7 @@ pub enum AnnouncableActivities { #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct AnnounceActivity { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], object: AnnouncableActivities, cc: Vec, @@ -78,7 +79,7 @@ impl AnnounceActivity { context: &LemmyContext, ) -> Result<(), LemmyError> { let announce = AnnounceActivity { - actor: community.actor_id(), + actor: ObjectId::new(community.actor_id()), to: [PublicUrl::Public], object, cc: vec![community.followers_url()], diff --git a/crates/apub/src/activities/community/block_user.rs b/crates/apub/src/activities/community/block_user.rs index 777faf9a..f7e81f97 100644 --- a/crates/apub/src/activities/community/block_user.rs +++ b/crates/apub/src/activities/community/block_user.rs @@ -8,7 +8,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, - fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -38,10 +38,10 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct BlockUserFromCommunity { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], - pub(in crate::activities::community) object: Url, - cc: [Url; 1], + pub(in crate::activities::community) object: ObjectId, + cc: [ObjectId; 1], #[serde(rename = "type")] kind: BlockType, id: Url, @@ -58,10 +58,10 @@ impl BlockUserFromCommunity { actor: &Person, ) -> Result { Ok(BlockUserFromCommunity { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], - object: target.actor_id(), - cc: [community.actor_id()], + object: ObjectId::new(target.actor_id()), + cc: [ObjectId::new(community.actor_id())], kind: BlockType::Block, id: generate_activity_id(BlockType::Block)?, context: lemmy_context(), @@ -102,10 +102,8 @@ impl ActivityHandler for BlockUserFromCommunity { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let community = - get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?; - let blocked_user = - get_or_fetch_and_upsert_person(&self.object, context, request_counter).await?; + let community = self.cc[0].dereference(context, request_counter).await?; + let blocked_user = self.object.dereference(context, request_counter).await?; let community_user_ban_form = CommunityPersonBanForm { community_id: community.id, diff --git a/crates/apub/src/activities/community/remove_mod.rs b/crates/apub/src/activities/community/remove_mod.rs index c7175567..4960076c 100644 --- a/crates/apub/src/activities/community/remove_mod.rs +++ b/crates/apub/src/activities/community/remove_mod.rs @@ -10,7 +10,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, - fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, + fetcher::object_id::ObjectId, generate_moderators_url, ActorType, }; @@ -35,10 +35,10 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct RemoveMod { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], - pub(in crate::activities) object: Url, - cc: [Url; 1], + pub(in crate::activities) object: ObjectId, + cc: [ObjectId; 1], #[serde(rename = "type")] kind: RemoveType, // if target is set, this is means remove mod from community @@ -59,13 +59,13 @@ impl RemoveMod { ) -> Result<(), LemmyError> { let id = generate_activity_id(RemoveType::Remove)?; let remove = RemoveMod { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], - object: removed_mod.actor_id(), + object: ObjectId::new(removed_mod.actor_id()), target: Some(generate_moderators_url(&community.actor_id)?.into()), id: id.clone(), context: lemmy_context(), - cc: [community.actor_id()], + cc: [ObjectId::new(community.actor_id())], kind: RemoveType::Remove, unparsed: Default::default(), }; @@ -87,10 +87,10 @@ impl ActivityHandler for RemoveMod { if let Some(target) = &self.target { verify_person_in_community(&self.actor, &self.cc[0], context, request_counter).await?; verify_mod_action(&self.actor, self.cc[0].clone(), context).await?; - verify_add_remove_moderator_target(target, self.cc[0].clone())?; + verify_add_remove_moderator_target(target, &self.cc[0])?; } else { verify_delete_activity( - &self.object, + self.object.inner(), self, &self.cc[0], true, @@ -108,10 +108,8 @@ impl ActivityHandler for RemoveMod { request_counter: &mut i32, ) -> Result<(), LemmyError> { if self.target.is_some() { - let community = - get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?; - let remove_mod = - get_or_fetch_and_upsert_person(&self.object, context, request_counter).await?; + let community = self.cc[0].dereference(context, request_counter).await?; + let remove_mod = self.object.dereference(context, request_counter).await?; let form = CommunityModeratorForm { community_id: community.id, @@ -124,7 +122,14 @@ impl ActivityHandler for RemoveMod { // TODO: send websocket notification about removed mod Ok(()) } else { - receive_remove_action(&self.actor, &self.object, None, context, request_counter).await + receive_remove_action( + &self.actor, + self.object.inner(), + None, + context, + request_counter, + ) + .await } } } diff --git a/crates/apub/src/activities/community/undo_block_user.rs b/crates/apub/src/activities/community/undo_block_user.rs index 9c12dd3a..eec90682 100644 --- a/crates/apub/src/activities/community/undo_block_user.rs +++ b/crates/apub/src/activities/community/undo_block_user.rs @@ -8,7 +8,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, - fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -32,10 +32,10 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct UndoBlockUserFromCommunity { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], object: BlockUserFromCommunity, - cc: [Url; 1], + cc: [ObjectId; 1], #[serde(rename = "type")] kind: UndoType, id: Url, @@ -56,10 +56,10 @@ impl UndoBlockUserFromCommunity { let id = generate_activity_id(UndoType::Undo)?; let undo = UndoBlockUserFromCommunity { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], object: block, - cc: [community.actor_id()], + cc: [ObjectId::new(community.actor_id())], kind: UndoType::Undo, id: id.clone(), context: lemmy_context(), @@ -91,10 +91,12 @@ impl ActivityHandler for UndoBlockUserFromCommunity { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let community = - get_or_fetch_and_upsert_community(&self.cc[0], context, request_counter).await?; - let blocked_user = - get_or_fetch_and_upsert_person(&self.object.object, context, request_counter).await?; + let community = self.cc[0].dereference(context, request_counter).await?; + let blocked_user = self + .object + .object + .dereference(context, request_counter) + .await?; let community_user_ban_form = CommunityPersonBanForm { community_id: community.id, diff --git a/crates/apub/src/activities/community/update.rs b/crates/apub/src/activities/community/update.rs index 88b9a62a..a6da1c9f 100644 --- a/crates/apub/src/activities/community/update.rs +++ b/crates/apub/src/activities/community/update.rs @@ -8,6 +8,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, + fetcher::object_id::ObjectId, objects::{community::Group, ToApub}, ActorType, }; @@ -34,11 +35,11 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct UpdateCommunity { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], // TODO: would be nice to use a separate struct here, which only contains the fields updated here object: Group, - cc: [Url; 1], + cc: [ObjectId; 1], #[serde(rename = "type")] kind: UpdateType, id: Url, @@ -56,10 +57,10 @@ impl UpdateCommunity { ) -> Result<(), LemmyError> { let id = generate_activity_id(UpdateType::Update)?; let update = UpdateCommunity { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], object: community.to_apub(context.pool()).await?, - cc: [community.actor_id()], + cc: [ObjectId::new(community.actor_id())], kind: UpdateType::Update, id: id.clone(), context: lemmy_context(), diff --git a/crates/apub/src/activities/deletion/delete.rs b/crates/apub/src/activities/deletion/delete.rs index cb1fdbd9..8e8bd942 100644 --- a/crates/apub/src/activities/deletion/delete.rs +++ b/crates/apub/src/activities/deletion/delete.rs @@ -12,7 +12,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, - fetcher::person::get_or_fetch_and_upsert_person, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -49,6 +49,7 @@ use lemmy_websocket::{ UserOperationCrud, }; use serde::{Deserialize, Serialize}; +use serde_with::skip_serializing_none; use url::Url; /// This is very confusing, because there are four distinct cases to handle: @@ -59,13 +60,14 @@ use url::Url; /// /// TODO: we should probably change how community deletions work to simplify this. Probably by /// wrapping it in an announce just like other activities, instead of having the community send it. +#[skip_serializing_none] #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct Delete { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], pub(in crate::activities::deletion) object: Url, - pub(in crate::activities::deletion) cc: [Url; 1], + pub(in crate::activities::deletion) cc: [ObjectId; 1], #[serde(rename = "type")] kind: DeleteType, /// If summary is present, this is a mod action (Remove in Lemmy terms). Otherwise, its a user @@ -138,10 +140,10 @@ impl Delete { summary: Option, ) -> Result { Ok(Delete { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], object: object_id, - cc: [community.actor_id()], + cc: [ObjectId::new(community.actor_id())], kind: DeleteType::Delete, summary, id: generate_activity_id(DeleteType::Delete)?, @@ -165,13 +167,13 @@ impl Delete { } pub(in crate::activities) async fn receive_remove_action( - actor: &Url, + actor: &ObjectId, object: &Url, reason: Option, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let actor = get_or_fetch_and_upsert_person(actor, context, request_counter).await?; + let actor = actor.dereference(context, request_counter).await?; use UserOperationCrud::*; match DeletableObjects::read_from_db(object, context).await? { DeletableObjects::Community(community) => { diff --git a/crates/apub/src/activities/deletion/mod.rs b/crates/apub/src/activities/deletion/mod.rs index 350773f4..5843e501 100644 --- a/crates/apub/src/activities/deletion/mod.rs +++ b/crates/apub/src/activities/deletion/mod.rs @@ -4,7 +4,7 @@ use crate::{ verify_mod_action, verify_person_in_community, }, - fetcher::person::get_or_fetch_and_upsert_person, + fetcher::object_id::ObjectId, ActorType, }; use lemmy_api_common::blocking; @@ -99,22 +99,22 @@ impl DeletableObjects { pub(in crate::activities) async fn verify_delete_activity( object: &Url, activity: &dyn ActivityFields, - community_id: &Url, + community_id: &ObjectId, is_mod_action: bool, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { let object = DeletableObjects::read_from_db(object, context).await?; + let actor = ObjectId::new(activity.actor().clone()); match object { DeletableObjects::Community(c) => { if c.local { // can only do this check for local community, in remote case it would try to fetch the // deleted community (which fails) - verify_person_in_community(activity.actor(), community_id, context, request_counter) - .await?; + verify_person_in_community(&actor, community_id, context, request_counter).await?; } // community deletion is always a mod (or admin) action - verify_mod_action(activity.actor(), c.actor_id(), context).await?; + verify_mod_action(&actor, ObjectId::new(c.actor_id()), context).await?; } DeletableObjects::Post(p) => { verify_delete_activity_post_or_comment( @@ -145,14 +145,15 @@ pub(in crate::activities) async fn verify_delete_activity( async fn verify_delete_activity_post_or_comment( activity: &dyn ActivityFields, object_id: &Url, - community_id: &Url, + community_id: &ObjectId, is_mod_action: bool, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - verify_person_in_community(activity.actor(), community_id, context, request_counter).await?; + let actor = ObjectId::new(activity.actor().clone()); + verify_person_in_community(&actor, community_id, context, request_counter).await?; if is_mod_action { - verify_mod_action(activity.actor(), community_id.clone(), context).await?; + verify_mod_action(&actor, community_id.clone(), context).await?; } else { // domain of post ap_id and post.creator ap_id are identical, so we just check the former verify_domains_match(activity.actor(), object_id)?; @@ -171,7 +172,7 @@ struct WebsocketMessages { /// because of the mod log async fn receive_delete_action( object: &Url, - actor: &Url, + actor: &ObjectId, ws_messages: WebsocketMessages, deleted: bool, context: &LemmyContext, @@ -180,7 +181,7 @@ async fn receive_delete_action( match DeletableObjects::read_from_db(object, context).await? { DeletableObjects::Community(community) => { if community.local { - let mod_ = get_or_fetch_and_upsert_person(actor, context, request_counter).await?; + let mod_ = actor.dereference(context, request_counter).await?; let object = community.actor_id(); send_apub_delete(&mod_, &community.clone(), object, true, context).await?; } diff --git a/crates/apub/src/activities/deletion/undo_delete.rs b/crates/apub/src/activities/deletion/undo_delete.rs index bec7d76c..2dbbf9c4 100644 --- a/crates/apub/src/activities/deletion/undo_delete.rs +++ b/crates/apub/src/activities/deletion/undo_delete.rs @@ -13,6 +13,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -38,10 +39,10 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct UndoDelete { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], object: Delete, - cc: [Url; 1], + cc: [ObjectId; 1], #[serde(rename = "type")] kind: UndoType, id: Url, @@ -109,10 +110,10 @@ impl UndoDelete { let id = generate_activity_id(UndoType::Undo)?; let undo = UndoDelete { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], object, - cc: [community.actor_id()], + cc: [ObjectId::new(community.actor_id())], kind: UndoType::Undo, id: id.clone(), context: lemmy_context(), diff --git a/crates/apub/src/activities/following/accept.rs b/crates/apub/src/activities/following/accept.rs index c76263cc..a7472bc1 100644 --- a/crates/apub/src/activities/following/accept.rs +++ b/crates/apub/src/activities/following/accept.rs @@ -7,7 +7,7 @@ use crate::{ }, activity_queue::send_activity_new, extensions::context::lemmy_context, - fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -31,8 +31,8 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct AcceptFollowCommunity { - actor: Url, - to: Url, + actor: ObjectId, + to: ObjectId, object: FollowCommunity, #[serde(rename = "type")] kind: AcceptType, @@ -57,8 +57,8 @@ impl AcceptFollowCommunity { .await??; let accept = AcceptFollowCommunity { - actor: community.actor_id(), - to: person.actor_id(), + actor: ObjectId::new(community.actor_id()), + to: ObjectId::new(person.actor_id()), object: follow, kind: AcceptType::Accept, id: generate_activity_id(AcceptType::Accept)?, @@ -78,8 +78,8 @@ impl ActivityHandler for AcceptFollowCommunity { request_counter: &mut i32, ) -> Result<(), LemmyError> { verify_activity(self)?; - verify_urls_match(&self.to, self.object.actor())?; - verify_urls_match(&self.actor, &self.object.to)?; + verify_urls_match(self.to.inner(), self.object.actor())?; + verify_urls_match(self.actor(), self.object.to.inner())?; verify_community(&self.actor, context, request_counter).await?; self.object.verify(context, request_counter).await?; Ok(()) @@ -90,8 +90,8 @@ impl ActivityHandler for AcceptFollowCommunity { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let actor = get_or_fetch_and_upsert_community(&self.actor, context, request_counter).await?; - let to = get_or_fetch_and_upsert_person(&self.to, context, request_counter).await?; + let actor = self.actor.dereference(context, request_counter).await?; + let to = self.to.dereference(context, request_counter).await?; // This will throw an error if no follow was requested blocking(context.pool(), move |conn| { CommunityFollower::follow_accepted(conn, actor.id, to.id) diff --git a/crates/apub/src/activities/following/follow.rs b/crates/apub/src/activities/following/follow.rs index e6ca747a..8af3d388 100644 --- a/crates/apub/src/activities/following/follow.rs +++ b/crates/apub/src/activities/following/follow.rs @@ -7,7 +7,7 @@ use crate::{ }, activity_queue::send_activity_new, extensions::context::lemmy_context, - fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -31,9 +31,10 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct FollowCommunity { - actor: Url, - pub(in crate::activities::following) to: Url, - pub(in crate::activities::following) object: Url, + actor: ObjectId, + // TODO: is there any reason to put the same community id twice, in to and object? + pub(in crate::activities::following) to: ObjectId, + pub(in crate::activities::following) object: ObjectId, #[serde(rename = "type")] kind: FollowType, id: Url, @@ -49,9 +50,9 @@ impl FollowCommunity { community: &Community, ) -> Result { Ok(FollowCommunity { - actor: actor.actor_id(), - to: community.actor_id(), - object: community.actor_id(), + actor: ObjectId::new(actor.actor_id()), + to: ObjectId::new(community.actor_id()), + object: ObjectId::new(community.actor_id()), kind: FollowType::Follow, id: generate_activity_id(FollowType::Follow)?, context: lemmy_context(), @@ -87,7 +88,7 @@ impl ActivityHandler for FollowCommunity { request_counter: &mut i32, ) -> Result<(), LemmyError> { verify_activity(self)?; - verify_urls_match(&self.to, &self.object)?; + verify_urls_match(self.to.inner(), self.object.inner())?; verify_person(&self.actor, context, request_counter).await?; Ok(()) } @@ -97,9 +98,8 @@ impl ActivityHandler for FollowCommunity { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?; - let community = - get_or_fetch_and_upsert_community(&self.object, context, request_counter).await?; + let actor = self.actor.dereference(context, request_counter).await?; + let community = self.object.dereference(context, request_counter).await?; let community_follower_form = CommunityFollowerForm { community_id: community.id, person_id: actor.id, diff --git a/crates/apub/src/activities/following/undo.rs b/crates/apub/src/activities/following/undo.rs index 092036bb..f35b3095 100644 --- a/crates/apub/src/activities/following/undo.rs +++ b/crates/apub/src/activities/following/undo.rs @@ -7,7 +7,7 @@ use crate::{ }, activity_queue::send_activity_new, extensions::context::lemmy_context, - fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -31,8 +31,8 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct UndoFollowCommunity { - actor: Url, - to: Url, + actor: ObjectId, + to: ObjectId, object: FollowCommunity, #[serde(rename = "type")] kind: UndoType, @@ -51,8 +51,8 @@ impl UndoFollowCommunity { ) -> Result<(), LemmyError> { let object = FollowCommunity::new(actor, community)?; let undo = UndoFollowCommunity { - actor: actor.actor_id(), - to: community.actor_id(), + actor: ObjectId::new(actor.actor_id()), + to: ObjectId::new(community.actor_id()), object, kind: UndoType::Undo, id: generate_activity_id(UndoType::Undo)?, @@ -72,8 +72,8 @@ impl ActivityHandler for UndoFollowCommunity { request_counter: &mut i32, ) -> Result<(), LemmyError> { verify_activity(self)?; - verify_urls_match(&self.to, &self.object.object)?; - verify_urls_match(&self.actor, self.object.actor())?; + verify_urls_match(self.to.inner(), self.object.object.inner())?; + verify_urls_match(self.actor(), self.object.actor())?; verify_person(&self.actor, context, request_counter).await?; self.object.verify(context, request_counter).await?; Ok(()) @@ -84,8 +84,8 @@ impl ActivityHandler for UndoFollowCommunity { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?; - let community = get_or_fetch_and_upsert_community(&self.to, context, request_counter).await?; + let actor = self.actor.dereference(context, request_counter).await?; + let community = self.to.dereference(context, request_counter).await?; let community_follower_form = CommunityFollowerForm { community_id: community.id, diff --git a/crates/apub/src/activities/mod.rs b/crates/apub/src/activities/mod.rs index a846a0e7..3cfaca85 100644 --- a/crates/apub/src/activities/mod.rs +++ b/crates/apub/src/activities/mod.rs @@ -1,7 +1,7 @@ use crate::{ check_community_or_site_ban, check_is_apub_id_valid, - fetcher::{community::get_or_fetch_and_upsert_community, person::get_or_fetch_and_upsert_person}, + fetcher::object_id::ObjectId, generate_moderators_url, }; use anyhow::anyhow; @@ -39,11 +39,11 @@ pub enum CreateOrUpdateType { /// Checks that the specified Url actually identifies a Person (by fetching it), and that the person /// doesn't have a site ban. async fn verify_person( - person_id: &Url, + person_id: &ObjectId, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let person = get_or_fetch_and_upsert_person(person_id, context, request_counter).await?; + let person = person_id.dereference(context, request_counter).await?; if person.banned { return Err(anyhow!("Person {} is banned", person_id).into()); } @@ -58,7 +58,8 @@ pub(crate) async fn extract_community( let mut cc_iter = cc.iter(); loop { if let Some(cid) = cc_iter.next() { - if let Ok(c) = get_or_fetch_and_upsert_community(cid, context, request_counter).await { + let cid = ObjectId::new(cid.clone()); + if let Ok(c) = cid.dereference(context, request_counter).await { break Ok(c); } } else { @@ -70,23 +71,23 @@ pub(crate) async fn extract_community( /// Fetches the person and community to verify their type, then checks if person is banned from site /// or community. pub(crate) async fn verify_person_in_community( - person_id: &Url, - community_id: &Url, + person_id: &ObjectId, + community_id: &ObjectId, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let community = get_or_fetch_and_upsert_community(community_id, context, request_counter).await?; - let person = get_or_fetch_and_upsert_person(person_id, context, request_counter).await?; + let community = community_id.dereference(context, request_counter).await?; + let person = person_id.dereference(context, request_counter).await?; check_community_or_site_ban(&person, community.id, context.pool()).await } /// Simply check that the url actually refers to a valid group. async fn verify_community( - community_id: &Url, + community_id: &ObjectId, context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - get_or_fetch_and_upsert_community(community_id, context, request_counter).await?; + community_id.dereference(context, request_counter).await?; Ok(()) } @@ -100,12 +101,12 @@ fn verify_activity(activity: &dyn ActivityFields) -> Result<(), LemmyError> { /// because in case of remote communities, admins can also perform mod actions. As admin status /// is not federated, we cant verify their actions remotely. pub(crate) async fn verify_mod_action( - actor_id: &Url, - community: Url, + actor_id: &ObjectId, + community_id: ObjectId, context: &LemmyContext, ) -> Result<(), LemmyError> { let community = blocking(context.pool(), move |conn| { - Community::read_from_apub_id(conn, &community.into()) + Community::read_from_apub_id(conn, &community_id.into()) }) .await??; @@ -133,8 +134,11 @@ pub(crate) async fn verify_mod_action( /// For Add/Remove community moderator activities, check that the target field actually contains /// /c/community/moderators. Any different values are unsupported. -fn verify_add_remove_moderator_target(target: &Url, community: Url) -> Result<(), LemmyError> { - if target != &generate_moderators_url(&community.into())?.into_inner() { +fn verify_add_remove_moderator_target( + target: &Url, + community: &ObjectId, +) -> Result<(), LemmyError> { + if target != &generate_moderators_url(&community.clone().into())?.into_inner() { return Err(anyhow!("Unkown target url").into()); } Ok(()) diff --git a/crates/apub/src/activities/post/create_or_update.rs b/crates/apub/src/activities/post/create_or_update.rs index eff56ce2..8a48da5f 100644 --- a/crates/apub/src/activities/post/create_or_update.rs +++ b/crates/apub/src/activities/post/create_or_update.rs @@ -1,7 +1,6 @@ use crate::{ activities::{ community::announce::AnnouncableActivities, - extract_community, generate_activity_id, verify_activity, verify_mod_action, @@ -10,7 +9,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, - fetcher::person::get_or_fetch_and_upsert_person, + fetcher::object_id::ObjectId, objects::{post::Page, FromApub, ToApub}, ActorType, }; @@ -34,10 +33,10 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct CreateOrUpdatePost { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], object: Page, - cc: [Url; 1], + cc: [ObjectId; 1], #[serde(rename = "type")] kind: CreateOrUpdateType, id: Url, @@ -62,10 +61,10 @@ impl CreateOrUpdatePost { let id = generate_activity_id(kind.clone())?; let create_or_update = CreateOrUpdatePost { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], object: post.to_apub(context.pool()).await?, - cc: [community.actor_id()], + cc: [ObjectId::new(community.actor_id())], kind, id: id.clone(), context: lemmy_context(), @@ -85,13 +84,12 @@ impl ActivityHandler for CreateOrUpdatePost { request_counter: &mut i32, ) -> Result<(), LemmyError> { verify_activity(self)?; - let community = extract_community(&self.cc, context, request_counter).await?; - let community_id = community.actor_id(); - verify_person_in_community(&self.actor, &community_id, context, request_counter).await?; + let community = self.cc[0].dereference(context, request_counter).await?; + verify_person_in_community(&self.actor, &self.cc[0], context, request_counter).await?; match self.kind { CreateOrUpdateType::Create => { - verify_domains_match(&self.actor, self.object.id_unchecked())?; - verify_urls_match(&self.actor, &self.object.attributed_to)?; + verify_domains_match(self.actor.inner(), self.object.id_unchecked())?; + verify_urls_match(self.actor(), self.object.attributed_to.inner())?; // Check that the post isnt locked or stickied, as that isnt possible for newly created posts. // However, when fetching a remote post we generate a new create activity with the current // locked/stickied value, so this check may fail. So only check if its a local community, @@ -105,10 +103,10 @@ impl ActivityHandler for CreateOrUpdatePost { CreateOrUpdateType::Update => { let is_mod_action = self.object.is_mod_action(context.pool()).await?; if is_mod_action { - verify_mod_action(&self.actor, community_id, context).await?; + verify_mod_action(&self.actor, self.cc[0].clone(), context).await?; } else { - verify_domains_match(&self.actor, self.object.id_unchecked())?; - verify_urls_match(&self.actor, &self.object.attributed_to)?; + verify_domains_match(self.actor.inner(), self.object.id_unchecked())?; + verify_urls_match(self.actor(), self.object.attributed_to.inner())?; } } } @@ -121,7 +119,7 @@ impl ActivityHandler for CreateOrUpdatePost { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?; + let actor = self.actor.dereference(context, request_counter).await?; let post = Post::from_apub(&self.object, context, &actor.actor_id(), request_counter).await?; let notif_type = match self.kind { diff --git a/crates/apub/src/activities/private_message/create_or_update.rs b/crates/apub/src/activities/private_message/create_or_update.rs index 98a26d80..83f4824b 100644 --- a/crates/apub/src/activities/private_message/create_or_update.rs +++ b/crates/apub/src/activities/private_message/create_or_update.rs @@ -2,6 +2,7 @@ use crate::{ activities::{generate_activity_id, verify_activity, verify_person, CreateOrUpdateType}, activity_queue::send_activity_new, extensions::context::lemmy_context, + fetcher::object_id::ObjectId, objects::{private_message::Note, FromApub, ToApub}, ActorType, }; @@ -21,9 +22,8 @@ pub struct CreateOrUpdatePrivateMessage { #[serde(rename = "@context")] pub context: OneOrMany, id: Url, - actor: Url, - to: Url, - cc: [Url; 0], + actor: ObjectId, + to: ObjectId, object: Note, #[serde(rename = "type")] kind: CreateOrUpdateType, @@ -46,9 +46,8 @@ impl CreateOrUpdatePrivateMessage { let create_or_update = CreateOrUpdatePrivateMessage { context: lemmy_context(), id: id.clone(), - actor: actor.actor_id(), - to: recipient.actor_id(), - cc: [], + actor: ObjectId::new(actor.actor_id()), + to: ObjectId::new(recipient.actor_id()), object: private_message.to_apub(context.pool()).await?, kind, unparsed: Default::default(), @@ -66,7 +65,7 @@ impl ActivityHandler for CreateOrUpdatePrivateMessage { ) -> Result<(), LemmyError> { verify_activity(self)?; verify_person(&self.actor, context, request_counter).await?; - verify_domains_match(&self.actor, self.object.id_unchecked())?; + verify_domains_match(self.actor.inner(), self.object.id_unchecked())?; self.object.verify(context, request_counter).await?; Ok(()) } @@ -77,7 +76,7 @@ impl ActivityHandler for CreateOrUpdatePrivateMessage { request_counter: &mut i32, ) -> Result<(), LemmyError> { let private_message = - PrivateMessage::from_apub(&self.object, context, &self.actor, request_counter).await?; + PrivateMessage::from_apub(&self.object, context, self.actor.inner(), request_counter).await?; let notif_type = match self.kind { CreateOrUpdateType::Create => UserOperationCrud::CreatePrivateMessage, diff --git a/crates/apub/src/activities/private_message/delete.rs b/crates/apub/src/activities/private_message/delete.rs index 47e1a71a..82aad317 100644 --- a/crates/apub/src/activities/private_message/delete.rs +++ b/crates/apub/src/activities/private_message/delete.rs @@ -2,6 +2,7 @@ use crate::{ activities::{generate_activity_id, verify_activity, verify_person}, activity_queue::send_activity_new, extensions::context::lemmy_context, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -22,8 +23,8 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct DeletePrivateMessage { - actor: Url, - to: Url, + actor: ObjectId, + to: ObjectId, pub(in crate::activities::private_message) object: Url, #[serde(rename = "type")] kind: DeleteType, @@ -40,8 +41,8 @@ impl DeletePrivateMessage { pm: &PrivateMessage, ) -> Result { Ok(DeletePrivateMessage { - actor: actor.actor_id(), - to: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), + to: ObjectId::new(actor.actor_id()), object: pm.ap_id.clone().into(), kind: DeleteType::Delete, id: generate_activity_id(DeleteType::Delete)?, @@ -74,7 +75,7 @@ impl ActivityHandler for DeletePrivateMessage { ) -> Result<(), LemmyError> { verify_activity(self)?; verify_person(&self.actor, context, request_counter).await?; - verify_domains_match(&self.actor, &self.object)?; + verify_domains_match(self.actor.inner(), &self.object)?; Ok(()) } diff --git a/crates/apub/src/activities/private_message/undo_delete.rs b/crates/apub/src/activities/private_message/undo_delete.rs index 911a17c7..2dc9d724 100644 --- a/crates/apub/src/activities/private_message/undo_delete.rs +++ b/crates/apub/src/activities/private_message/undo_delete.rs @@ -7,6 +7,7 @@ use crate::{ }, activity_queue::send_activity_new, extensions::context::lemmy_context, + fetcher::object_id::ObjectId, ActorType, }; use activitystreams::{ @@ -27,8 +28,8 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct UndoDeletePrivateMessage { - actor: Url, - to: Url, + actor: ObjectId, + to: ObjectId, object: DeletePrivateMessage, #[serde(rename = "type")] kind: UndoType, @@ -52,8 +53,8 @@ impl UndoDeletePrivateMessage { let object = DeletePrivateMessage::new(actor, pm)?; let id = generate_activity_id(UndoType::Undo)?; let undo = UndoDeletePrivateMessage { - actor: actor.actor_id(), - to: recipient.actor_id(), + actor: ObjectId::new(actor.actor_id()), + to: ObjectId::new(recipient.actor_id()), object, kind: UndoType::Undo, id: id.clone(), @@ -74,8 +75,8 @@ impl ActivityHandler for UndoDeletePrivateMessage { ) -> Result<(), LemmyError> { verify_activity(self)?; verify_person(&self.actor, context, request_counter).await?; - verify_urls_match(&self.actor, self.object.actor())?; - verify_domains_match(&self.actor, &self.object.object)?; + verify_urls_match(self.actor(), self.object.actor())?; + verify_domains_match(self.actor(), &self.object.object)?; self.object.verify(context, request_counter).await?; Ok(()) } diff --git a/crates/apub/src/activities/undo_remove.rs b/crates/apub/src/activities/undo_remove.rs index 9720c06f..eaf3684d 100644 --- a/crates/apub/src/activities/undo_remove.rs +++ b/crates/apub/src/activities/undo_remove.rs @@ -1,7 +1,10 @@ -use crate::activities::{ - community::remove_mod::RemoveMod, - deletion::{undo_delete::UndoDelete, verify_delete_activity}, - verify_activity, +use crate::{ + activities::{ + community::remove_mod::RemoveMod, + deletion::{undo_delete::UndoDelete, verify_delete_activity}, + verify_activity, + }, + fetcher::object_id::ObjectId, }; use activitystreams::{ activity::kind::UndoType, @@ -10,6 +13,7 @@ use activitystreams::{ unparsed::Unparsed, }; use lemmy_apub_lib::{values::PublicUrl, ActivityFields, ActivityHandler}; +use lemmy_db_schema::source::{community::Community, person::Person}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; use serde::{Deserialize, Serialize}; @@ -18,11 +22,11 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct UndoRemovePostCommentOrCommunity { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], // Note, there is no such thing as Undo/Remove/Mod, so we ignore that object: RemoveMod, - cc: [Url; 1], + cc: [ObjectId; 1], #[serde(rename = "type")] kind: UndoType, id: Url, @@ -43,7 +47,7 @@ impl ActivityHandler for UndoRemovePostCommentOrCommunity { self.object.verify(context, request_counter).await?; verify_delete_activity( - &self.object.object, + self.object.object.inner(), self, &self.cc[0], true, @@ -59,6 +63,6 @@ impl ActivityHandler for UndoRemovePostCommentOrCommunity { context: &LemmyContext, _request_counter: &mut i32, ) -> Result<(), LemmyError> { - UndoDelete::receive_undo_remove_action(&self.object.object, context).await + UndoDelete::receive_undo_remove_action(self.object.object.inner(), context).await } } diff --git a/crates/apub/src/activities/voting/undo_vote.rs b/crates/apub/src/activities/voting/undo_vote.rs index 0f0eb1ff..e11d2960 100644 --- a/crates/apub/src/activities/voting/undo_vote.rs +++ b/crates/apub/src/activities/voting/undo_vote.rs @@ -12,10 +12,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, - fetcher::{ - objects::get_or_fetch_and_insert_post_or_comment, - person::get_or_fetch_and_upsert_person, - }, + fetcher::object_id::ObjectId, ActorType, PostOrComment, }; @@ -41,10 +38,10 @@ use url::Url; #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct UndoVote { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], object: Vote, - cc: [Url; 1], + cc: [ObjectId; 1], #[serde(rename = "type")] kind: UndoType, id: Url, @@ -70,10 +67,10 @@ impl UndoVote { let object = Vote::new(object, actor, &community, kind.clone())?; let id = generate_activity_id(UndoType::Undo)?; let undo_vote = UndoVote { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], object, - cc: [community.actor_id()], + cc: [ObjectId::new(community.actor_id())], kind: UndoType::Undo, id: id.clone(), context: lemmy_context(), @@ -93,7 +90,7 @@ impl ActivityHandler for UndoVote { ) -> Result<(), LemmyError> { verify_activity(self)?; verify_person_in_community(&self.actor, &self.cc[0], context, request_counter).await?; - verify_urls_match(&self.actor, self.object.actor())?; + verify_urls_match(self.actor(), self.object.actor())?; self.object.verify(context, request_counter).await?; Ok(()) } @@ -103,10 +100,12 @@ impl ActivityHandler for UndoVote { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?; - let object = - get_or_fetch_and_insert_post_or_comment(&self.object.object, context, request_counter) - .await?; + let actor = self.actor.dereference(context, request_counter).await?; + let object = self + .object + .object + .dereference(context, request_counter) + .await?; match object { PostOrComment::Post(p) => undo_vote_post(actor, p.deref(), context).await, PostOrComment::Comment(c) => undo_vote_comment(actor, c.deref(), context).await, diff --git a/crates/apub/src/activities/voting/vote.rs b/crates/apub/src/activities/voting/vote.rs index 95d09961..69a9c3ba 100644 --- a/crates/apub/src/activities/voting/vote.rs +++ b/crates/apub/src/activities/voting/vote.rs @@ -8,10 +8,7 @@ use crate::{ }, activity_queue::send_to_community_new, extensions::context::lemmy_context, - fetcher::{ - objects::get_or_fetch_and_insert_post_or_comment, - person::get_or_fetch_and_upsert_person, - }, + fetcher::object_id::ObjectId, ActorType, PostOrComment, }; @@ -61,10 +58,10 @@ impl From<&VoteType> for i16 { #[derive(Clone, Debug, Deserialize, Serialize, ActivityFields)] #[serde(rename_all = "camelCase")] pub struct Vote { - actor: Url, + actor: ObjectId, to: [PublicUrl; 1], - pub(in crate::activities::voting) object: Url, - cc: [Url; 1], + pub(in crate::activities::voting) object: ObjectId, + cc: [ObjectId; 1], #[serde(rename = "type")] pub(in crate::activities::voting) kind: VoteType, id: Url, @@ -82,10 +79,10 @@ impl Vote { kind: VoteType, ) -> Result { Ok(Vote { - actor: actor.actor_id(), + actor: ObjectId::new(actor.actor_id()), to: [PublicUrl::Public], - object: object.ap_id(), - cc: [community.actor_id()], + object: ObjectId::new(object.ap_id()), + cc: [ObjectId::new(community.actor_id())], kind: kind.clone(), id: generate_activity_id(kind)?, context: lemmy_context(), @@ -129,9 +126,8 @@ impl ActivityHandler for Vote { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - let actor = get_or_fetch_and_upsert_person(&self.actor, context, request_counter).await?; - let object = - get_or_fetch_and_insert_post_or_comment(&self.object, context, request_counter).await?; + let actor = self.actor.dereference(context, request_counter).await?; + let object = self.object.dereference(context, request_counter).await?; match object { PostOrComment::Post(p) => vote_post(&self.kind, actor, p.deref(), context).await, PostOrComment::Comment(c) => vote_comment(&self.kind, actor, c.deref(), context).await, diff --git a/crates/apub/src/fetcher/community.rs b/crates/apub/src/fetcher/community.rs index 83aa7b51..99dd3f85 100644 --- a/crates/apub/src/fetcher/community.rs +++ b/crates/apub/src/fetcher/community.rs @@ -1,92 +1,23 @@ use crate::{ activities::community::announce::AnnounceActivity, - fetcher::{ - fetch::fetch_remote_object, - is_deleted, - person::get_or_fetch_and_upsert_person, - should_refetch_actor, - }, - objects::{community::Group, FromApub}, + fetcher::{fetch::fetch_remote_object, object_id::ObjectId}, + objects::community::Group, }; use activitystreams::collection::{CollectionExt, OrderedCollection}; use anyhow::Context; -use diesel::result::Error::NotFound; use lemmy_api_common::blocking; use lemmy_apub_lib::ActivityHandler; -use lemmy_db_queries::{source::community::Community_, ApubObject, Joinable}; -use lemmy_db_schema::source::community::{Community, CommunityModerator, CommunityModeratorForm}; +use lemmy_db_queries::Joinable; +use lemmy_db_schema::source::{ + community::{Community, CommunityModerator, CommunityModeratorForm}, + person::Person, +}; use lemmy_db_views_actor::community_moderator_view::CommunityModeratorView; use lemmy_utils::{location_info, LemmyError}; use lemmy_websocket::LemmyContext; -use log::debug; use url::Url; -/// Get a community from its apub ID. -/// -/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. -/// Otherwise it is fetched from the remote instance, stored and returned. -pub(crate) async fn get_or_fetch_and_upsert_community( - apub_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - let apub_id_owned = apub_id.to_owned(); - let community = blocking(context.pool(), move |conn| { - Community::read_from_apub_id(conn, &apub_id_owned.into()) - }) - .await?; - - match community { - Ok(c) if !c.local && should_refetch_actor(c.last_refreshed_at) => { - debug!("Fetching and updating from remote community: {}", apub_id); - fetch_remote_community(apub_id, context, Some(c), recursion_counter).await - } - Ok(c) => Ok(c), - Err(NotFound {}) => { - debug!("Fetching and creating remote community: {}", apub_id); - fetch_remote_community(apub_id, context, None, recursion_counter).await - } - Err(e) => Err(e.into()), - } -} - -/// Request a community by apub ID from a remote instance, including moderators. If `old_community`, -/// is set, this is an update for a community which is already known locally. If not, we don't know -/// the community yet and also pull the outbox, to get some initial posts. -async fn fetch_remote_community( - apub_id: &Url, - context: &LemmyContext, - old_community: Option, - request_counter: &mut i32, -) -> Result { - let group = fetch_remote_object::(context.client(), apub_id, request_counter).await; - - if let Some(c) = old_community.to_owned() { - if is_deleted(&group) { - blocking(context.pool(), move |conn| { - Community::update_deleted(conn, c.id, true) - }) - .await??; - } else if group.is_err() { - // If fetching failed, return the existing data. - return Ok(c); - } - } - - let group = group?; - let community = Community::from_apub(&group, context, apub_id, request_counter).await?; - - update_community_mods(&group, &community, context, request_counter).await?; - - // only fetch outbox for new communities, otherwise this can create an infinite loop - if old_community.is_none() { - fetch_community_outbox(context, &group.outbox, request_counter).await? - } - - Ok(community) -} - -async fn update_community_mods( +pub(crate) async fn update_community_mods( group: &Group, community: &Community, context: &LemmyContext, @@ -113,8 +44,9 @@ async fn update_community_mods( } // Add new mods to database which have been added to moderators collection - for mod_uri in new_moderators { - let mod_user = get_or_fetch_and_upsert_person(&mod_uri, context, request_counter).await?; + for mod_id in new_moderators { + let mod_id = ObjectId::new(mod_id); + let mod_user: Person = mod_id.dereference(context, request_counter).await?; if !current_moderators .clone() @@ -136,7 +68,7 @@ async fn update_community_mods( Ok(()) } -async fn fetch_community_outbox( +pub(crate) async fn fetch_community_outbox( context: &LemmyContext, outbox: &Url, recursion_counter: &mut i32, @@ -160,7 +92,7 @@ async fn fetch_community_outbox( Ok(()) } -pub(crate) async fn fetch_community_mods( +async fn fetch_community_mods( context: &LemmyContext, group: &Group, recursion_counter: &mut i32, diff --git a/crates/apub/src/fetcher/deletable_apub_object.rs b/crates/apub/src/fetcher/deletable_apub_object.rs new file mode 100644 index 00000000..5df90dd1 --- /dev/null +++ b/crates/apub/src/fetcher/deletable_apub_object.rs @@ -0,0 +1,85 @@ +use crate::fetcher::post_or_comment::PostOrComment; +use lemmy_api_common::blocking; +use lemmy_db_queries::source::{ + comment::Comment_, + community::Community_, + person::Person_, + post::Post_, +}; +use lemmy_db_schema::source::{comment::Comment, community::Community, person::Person, post::Post}; +use lemmy_utils::LemmyError; +use lemmy_websocket::LemmyContext; + +// TODO: merge this trait with ApubObject (means that db_schema needs to depend on apub_lib) +#[async_trait::async_trait(?Send)] +pub trait DeletableApubObject { + // TODO: pass in tombstone with summary field, to decide between remove/delete + async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError>; +} + +#[async_trait::async_trait(?Send)] +impl DeletableApubObject for Community { + async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> { + let id = self.id; + blocking(context.pool(), move |conn| { + Community::update_deleted(conn, id, true) + }) + .await??; + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl DeletableApubObject for Person { + async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> { + let id = self.id; + blocking(context.pool(), move |conn| Person::delete_account(conn, id)).await??; + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl DeletableApubObject for Post { + async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> { + let id = self.id; + blocking(context.pool(), move |conn| { + Post::update_deleted(conn, id, true) + }) + .await??; + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl DeletableApubObject for Comment { + async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> { + let id = self.id; + blocking(context.pool(), move |conn| { + Comment::update_deleted(conn, id, true) + }) + .await??; + Ok(()) + } +} + +#[async_trait::async_trait(?Send)] +impl DeletableApubObject for PostOrComment { + async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> { + match self { + PostOrComment::Comment(c) => { + blocking(context.pool(), move |conn| { + Comment::update_deleted(conn, c.id, true) + }) + .await??; + } + PostOrComment::Post(p) => { + blocking(context.pool(), move |conn| { + Post::update_deleted(conn, p.id, true) + }) + .await??; + } + } + + Ok(()) + } +} diff --git a/crates/apub/src/fetcher/fetch.rs b/crates/apub/src/fetcher/fetch.rs index 128ccf1f..1e6f8de8 100644 --- a/crates/apub/src/fetcher/fetch.rs +++ b/crates/apub/src/fetcher/fetch.rs @@ -2,10 +2,9 @@ use crate::{check_is_apub_id_valid, APUB_JSON_CONTENT_TYPE}; use anyhow::anyhow; use lemmy_utils::{request::retry, LemmyError}; use log::info; -use reqwest::{Client, StatusCode}; +use reqwest::Client; use serde::Deserialize; use std::time::Duration; -use thiserror::Error; use url::Url; /// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object @@ -15,50 +14,19 @@ use url::Url; /// So we are looking at a maximum of 22 requests (rounded up just to be safe). static MAX_REQUEST_NUMBER: i32 = 25; -#[derive(Debug, Error)] -pub(in crate::fetcher) struct FetchError { - pub inner: anyhow::Error, - pub status_code: Option, -} - -impl From for FetchError { - fn from(t: LemmyError) -> Self { - FetchError { - inner: t.inner, - status_code: None, - } - } -} - -impl From for FetchError { - fn from(t: reqwest::Error) -> Self { - let status = t.status(); - FetchError { - inner: t.into(), - status_code: status, - } - } -} - -impl std::fmt::Display for FetchError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - std::fmt::Display::fmt(&self, f) - } -} - /// Fetch any type of ActivityPub object, handling things like HTTP headers, deserialisation, /// timeouts etc. pub(in crate::fetcher) async fn fetch_remote_object( client: &Client, url: &Url, recursion_counter: &mut i32, -) -> Result +) -> Result where Response: for<'de> Deserialize<'de> + std::fmt::Debug, { *recursion_counter += 1; if *recursion_counter > MAX_REQUEST_NUMBER { - return Err(LemmyError::from(anyhow!("Maximum recursion depth reached")).into()); + return Err(anyhow!("Maximum recursion depth reached").into()); } check_is_apub_id_valid(url, false)?; @@ -73,14 +41,6 @@ where }) .await?; - if res.status() == StatusCode::GONE { - info!("Fetched remote object {} which was deleted", url); - return Err(FetchError { - inner: anyhow!("Fetched remote object {} which was deleted", url), - status_code: Some(res.status()), - }); - } - let object = res.json().await?; info!("Fetched remote object {}", url); Ok(object) diff --git a/crates/apub/src/fetcher/mod.rs b/crates/apub/src/fetcher/mod.rs index 9d9e2b8e..0ef72b25 100644 --- a/crates/apub/src/fetcher/mod.rs +++ b/crates/apub/src/fetcher/mod.rs @@ -1,56 +1,42 @@ pub mod community; +pub mod deletable_apub_object; mod fetch; -pub mod objects; -pub mod person; +pub mod object_id; +pub mod post_or_comment; pub mod search; -use crate::{ - fetcher::{ - community::get_or_fetch_and_upsert_community, - fetch::FetchError, - person::get_or_fetch_and_upsert_person, - }, - ActorType, -}; +use crate::{fetcher::object_id::ObjectId, ActorType}; use chrono::NaiveDateTime; -use http::StatusCode; -use lemmy_db_schema::naive_now; +use lemmy_db_schema::{ + naive_now, + source::{community::Community, person::Person}, +}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; -use serde::Deserialize; use url::Url; static ACTOR_REFETCH_INTERVAL_SECONDS: i64 = 24 * 60 * 60; static ACTOR_REFETCH_INTERVAL_SECONDS_DEBUG: i64 = 10; -fn is_deleted(fetch_response: &Result) -> bool -where - Response: for<'de> Deserialize<'de>, -{ - if let Err(e) = fetch_response { - if let Some(status) = e.status_code { - if status == StatusCode::GONE { - return true; - } - } - } - false -} - /// Get a remote actor from its apub ID (either a person or a community). Thin wrapper around /// `get_or_fetch_and_upsert_person()` and `get_or_fetch_and_upsert_community()`. /// /// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. /// Otherwise it is fetched from the remote instance, stored and returned. pub(crate) async fn get_or_fetch_and_upsert_actor( - apub_id: &Url, + apub_id: Url, context: &LemmyContext, recursion_counter: &mut i32, ) -> Result, LemmyError> { - let community = get_or_fetch_and_upsert_community(apub_id, context, recursion_counter).await; + let community_id = ObjectId::::new(apub_id.clone()); + let community = community_id.dereference(context, recursion_counter).await; let actor: Box = match community { Ok(c) => Box::new(c), - Err(_) => Box::new(get_or_fetch_and_upsert_person(apub_id, context, recursion_counter).await?), + Err(_) => { + let person_id = ObjectId::new(apub_id); + let person: Person = person_id.dereference(context, recursion_counter).await?; + Box::new(person) + } }; Ok(actor) } diff --git a/crates/apub/src/fetcher/object_id.rs b/crates/apub/src/fetcher/object_id.rs new file mode 100644 index 00000000..7b3c535f --- /dev/null +++ b/crates/apub/src/fetcher/object_id.rs @@ -0,0 +1,157 @@ +use crate::{ + fetcher::{deletable_apub_object::DeletableApubObject, should_refetch_actor}, + objects::FromApub, + APUB_JSON_CONTENT_TYPE, +}; +use anyhow::anyhow; +use diesel::NotFound; +use lemmy_api_common::blocking; +use lemmy_db_queries::{ApubObject, DbPool}; +use lemmy_db_schema::DbUrl; +use lemmy_utils::{request::retry, settings::structs::Settings, LemmyError}; +use lemmy_websocket::LemmyContext; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; +use std::{ + fmt::{Debug, Display, Formatter}, + marker::PhantomData, + time::Duration, +}; +use url::Url; + +/// Maximum number of HTTP requests allowed to handle a single incoming activity (or a single object +/// fetch through the search). This should be configurable. +static REQUEST_LIMIT: i32 = 25; + +#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] +pub struct ObjectId(Url, #[serde(skip)] PhantomData) +where + Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, + for<'de2> ::ApubType: serde::Deserialize<'de2>; + +impl ObjectId +where + Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, + for<'de> ::ApubType: serde::Deserialize<'de>, +{ + pub fn new(url: T) -> Self + where + T: Into, + { + ObjectId(url.into(), PhantomData::) + } + + pub fn inner(&self) -> &Url { + &self.0 + } + + /// Fetches an activitypub object, either from local database (if possible), or over http. + pub(crate) async fn dereference( + &self, + context: &LemmyContext, + request_counter: &mut i32, + ) -> Result { + let db_object = self.dereference_locally(context.pool()).await?; + + // if its a local object, only fetch it from the database and not over http + if self.0.domain() == Some(&Settings::get().get_hostname_without_port()?) { + return match db_object { + None => Err(NotFound {}.into()), + Some(o) => Ok(o), + }; + } + + if let Some(object) = db_object { + if let Some(last_refreshed_at) = object.last_refreshed_at() { + // TODO: rename to should_refetch_object() + if should_refetch_actor(last_refreshed_at) { + return self + .dereference_remotely(context, request_counter, Some(object)) + .await; + } + } + Ok(object) + } else { + self + .dereference_remotely(context, request_counter, None) + .await + } + } + + /// returning none means the object was not found in local db + async fn dereference_locally(&self, pool: &DbPool) -> Result, LemmyError> { + let id: DbUrl = self.0.clone().into(); + let object = blocking(pool, move |conn| ApubObject::read_from_apub_id(conn, &id)).await?; + match object { + Ok(o) => Ok(Some(o)), + Err(NotFound {}) => Ok(None), + Err(e) => Err(e.into()), + } + } + + async fn dereference_remotely( + &self, + context: &LemmyContext, + request_counter: &mut i32, + db_object: Option, + ) -> Result { + // dont fetch local objects this way + debug_assert!(self.0.domain() != Some(&Settings::get().hostname)); + + *request_counter += 1; + if *request_counter > REQUEST_LIMIT { + return Err(LemmyError::from(anyhow!("Request limit reached"))); + } + + let res = retry(|| { + context + .client() + .get(self.0.as_str()) + .header("Accept", APUB_JSON_CONTENT_TYPE) + .timeout(Duration::from_secs(60)) + .send() + }) + .await?; + + if res.status() == StatusCode::GONE { + if let Some(db_object) = db_object { + db_object.delete(context).await?; + } + return Err(anyhow!("Fetched remote object {} which was deleted", self).into()); + } + + let res2: Kind::ApubType = res.json().await?; + + Ok(Kind::from_apub(&res2, context, self.inner(), request_counter).await?) + } +} + +impl Display for ObjectId +where + Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, + for<'de> ::ApubType: serde::Deserialize<'de>, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.to_string()) + } +} + +impl From> for Url +where + Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, + for<'de> ::ApubType: serde::Deserialize<'de>, +{ + fn from(id: ObjectId) -> Self { + id.0 + } +} + +impl From> for DbUrl +where + Kind: FromApub + ApubObject + DeletableApubObject + Send + 'static, + for<'de> ::ApubType: serde::Deserialize<'de>, +{ + fn from(id: ObjectId) -> Self { + id.0.into() + } +} diff --git a/crates/apub/src/fetcher/objects.rs b/crates/apub/src/fetcher/objects.rs deleted file mode 100644 index e538982d..00000000 --- a/crates/apub/src/fetcher/objects.rs +++ /dev/null @@ -1,97 +0,0 @@ -use crate::{ - fetcher::fetch::fetch_remote_object, - objects::{comment::Note, post::Page, FromApub}, - PostOrComment, -}; -use anyhow::anyhow; -use diesel::result::Error::NotFound; -use lemmy_api_common::blocking; -use lemmy_db_queries::{ApubObject, Crud}; -use lemmy_db_schema::source::{comment::Comment, post::Post}; -use lemmy_utils::LemmyError; -use lemmy_websocket::LemmyContext; -use log::debug; -use url::Url; - -/// Gets a post by its apub ID. If it exists locally, it is returned directly. Otherwise it is -/// pulled from its apub ID, inserted and returned. -/// -/// The parent community is also pulled if necessary. Comments are not pulled. -pub(crate) async fn get_or_fetch_and_insert_post( - post_ap_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - let post_ap_id_owned = post_ap_id.to_owned(); - let post = blocking(context.pool(), move |conn| { - Post::read_from_apub_id(conn, &post_ap_id_owned.into()) - }) - .await?; - - match post { - Ok(p) => Ok(p), - Err(NotFound {}) => { - debug!("Fetching and creating remote post: {}", post_ap_id); - let page = - fetch_remote_object::(context.client(), post_ap_id, recursion_counter).await?; - let post = Post::from_apub(&page, context, post_ap_id, recursion_counter).await?; - - Ok(post) - } - Err(e) => Err(e.into()), - } -} - -/// Gets a comment by its apub ID. If it exists locally, it is returned directly. Otherwise it is -/// pulled from its apub ID, inserted and returned. -/// -/// The parent community, post and comment are also pulled if necessary. -pub(crate) async fn get_or_fetch_and_insert_comment( - comment_ap_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - let comment_ap_id_owned = comment_ap_id.to_owned(); - let comment = blocking(context.pool(), move |conn| { - Comment::read_from_apub_id(conn, &comment_ap_id_owned.into()) - }) - .await?; - - match comment { - Ok(p) => Ok(p), - Err(NotFound {}) => { - debug!( - "Fetching and creating remote comment and its parents: {}", - comment_ap_id - ); - let comment = - fetch_remote_object::(context.client(), comment_ap_id, recursion_counter).await?; - let comment = Comment::from_apub(&comment, context, comment_ap_id, recursion_counter).await?; - - let post_id = comment.post_id; - let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??; - if post.locked { - return Err(anyhow!("Post is locked").into()); - } - - Ok(comment) - } - Err(e) => Err(e.into()), - } -} - -pub(crate) async fn get_or_fetch_and_insert_post_or_comment( - ap_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - Ok( - match get_or_fetch_and_insert_post(ap_id, context, recursion_counter).await { - Ok(p) => PostOrComment::Post(Box::new(p)), - Err(_) => { - let c = get_or_fetch_and_insert_comment(ap_id, context, recursion_counter).await?; - PostOrComment::Comment(Box::new(c)) - } - }, - ) -} diff --git a/crates/apub/src/fetcher/person.rs b/crates/apub/src/fetcher/person.rs deleted file mode 100644 index ed3ca057..00000000 --- a/crates/apub/src/fetcher/person.rs +++ /dev/null @@ -1,70 +0,0 @@ -use crate::{ - fetcher::{fetch::fetch_remote_object, is_deleted, should_refetch_actor}, - objects::{person::Person as ApubPerson, FromApub}, -}; -use anyhow::anyhow; -use diesel::result::Error::NotFound; -use lemmy_api_common::blocking; -use lemmy_db_queries::{source::person::Person_, ApubObject}; -use lemmy_db_schema::source::person::Person; -use lemmy_utils::LemmyError; -use lemmy_websocket::LemmyContext; -use log::debug; -use url::Url; - -/// Get a person from its apub ID. -/// -/// If it exists locally and `!should_refetch_actor()`, it is returned directly from the database. -/// Otherwise it is fetched from the remote instance, stored and returned. -pub(crate) async fn get_or_fetch_and_upsert_person( - apub_id: &Url, - context: &LemmyContext, - recursion_counter: &mut i32, -) -> Result { - let apub_id_owned = apub_id.to_owned(); - let person = blocking(context.pool(), move |conn| { - Person::read_from_apub_id(conn, &apub_id_owned.into()) - }) - .await?; - - match person { - // If its older than a day, re-fetch it - Ok(u) if !u.local && should_refetch_actor(u.last_refreshed_at) => { - debug!("Fetching and updating from remote person: {}", apub_id); - let person = - fetch_remote_object::(context.client(), apub_id, recursion_counter).await; - - if is_deleted(&person) { - // TODO: use Person::update_deleted() once implemented - blocking(context.pool(), move |conn| { - Person::delete_account(conn, u.id) - }) - .await??; - return Err(anyhow!("Person was deleted by remote instance").into()); - } else if person.is_err() { - return Ok(u); - } - - let person = Person::from_apub(&person?, context, apub_id, recursion_counter).await?; - - let person_id = person.id; - blocking(context.pool(), move |conn| { - Person::mark_as_updated(conn, person_id) - }) - .await??; - - Ok(person) - } - Ok(u) => Ok(u), - Err(NotFound {}) => { - debug!("Fetching and creating remote person: {}", apub_id); - let person = - fetch_remote_object::(context.client(), apub_id, recursion_counter).await?; - - let person = Person::from_apub(&person, context, apub_id, recursion_counter).await?; - - Ok(person) - } - Err(e) => Err(e.into()), - } -} diff --git a/crates/apub/src/fetcher/post_or_comment.rs b/crates/apub/src/fetcher/post_or_comment.rs new file mode 100644 index 00000000..53107499 --- /dev/null +++ b/crates/apub/src/fetcher/post_or_comment.rs @@ -0,0 +1,85 @@ +use crate::objects::{comment::Note, post::Page, FromApub}; +use activitystreams::chrono::NaiveDateTime; +use diesel::{result::Error, PgConnection}; +use lemmy_db_queries::ApubObject; +use lemmy_db_schema::{ + source::{ + comment::{Comment, CommentForm}, + post::{Post, PostForm}, + }, + DbUrl, +}; +use lemmy_utils::LemmyError; +use lemmy_websocket::LemmyContext; +use serde::Deserialize; +use url::Url; + +#[derive(Clone, Debug)] +pub enum PostOrComment { + Comment(Box), + Post(Box), +} + +pub enum PostOrCommentForm { + PostForm(PostForm), + CommentForm(CommentForm), +} + +#[derive(Deserialize)] +pub enum PageOrNote { + Page(Page), + Note(Note), +} + +#[async_trait::async_trait(?Send)] +impl ApubObject for PostOrComment { + fn last_refreshed_at(&self) -> Option { + None + } + + // TODO: this can probably be implemented using a single sql query + fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result + where + Self: Sized, + { + let post = Post::read_from_apub_id(conn, object_id); + Ok(match post { + Ok(o) => PostOrComment::Post(Box::new(o)), + Err(_) => PostOrComment::Comment(Box::new(Comment::read_from_apub_id(conn, object_id)?)), + }) + } +} + +#[async_trait::async_trait(?Send)] +impl FromApub for PostOrComment { + type ApubType = PageOrNote; + + async fn from_apub( + apub: &PageOrNote, + context: &LemmyContext, + expected_domain: &Url, + request_counter: &mut i32, + ) -> Result + where + Self: Sized, + { + Ok(match apub { + PageOrNote::Page(p) => PostOrComment::Post(Box::new( + Post::from_apub(p, context, expected_domain, request_counter).await?, + )), + PageOrNote::Note(n) => PostOrComment::Comment(Box::new( + Comment::from_apub(n, context, expected_domain, request_counter).await?, + )), + }) + } +} + +impl PostOrComment { + pub(crate) fn ap_id(&self) -> Url { + match self { + PostOrComment::Post(p) => p.ap_id.clone(), + PostOrComment::Comment(c) => c.ap_id.clone(), + } + .into() + } +} diff --git a/crates/apub/src/fetcher/search.rs b/crates/apub/src/fetcher/search.rs index 70e7c40c..6a3cc14f 100644 --- a/crates/apub/src/fetcher/search.rs +++ b/crates/apub/src/fetcher/search.rs @@ -1,52 +1,27 @@ use crate::{ - fetcher::{ - community::get_or_fetch_and_upsert_community, - fetch::fetch_remote_object, - is_deleted, - person::get_or_fetch_and_upsert_person, - }, - find_object_by_id, + fetcher::{deletable_apub_object::DeletableApubObject, object_id::ObjectId}, objects::{comment::Note, community::Group, person::Person as ApubPerson, post::Page, FromApub}, - Object, }; +use activitystreams::chrono::NaiveDateTime; use anyhow::anyhow; +use diesel::{result::Error, PgConnection}; use itertools::Itertools; -use lemmy_api_common::{blocking, site::ResolveObjectResponse}; +use lemmy_api_common::blocking; use lemmy_apub_lib::webfinger::{webfinger_resolve_actor, WebfingerType}; -use lemmy_db_queries::source::{ - comment::Comment_, - community::Community_, - person::Person_, - post::Post_, - private_message::PrivateMessage_, +use lemmy_db_queries::{ + source::{community::Community_, person::Person_}, + ApubObject, + DbPool, }; -use lemmy_db_schema::source::{ - comment::Comment, - community::Community, - person::Person, - post::Post, - private_message::PrivateMessage, +use lemmy_db_schema::{ + source::{comment::Comment, community::Community, person::Person, post::Post}, + DbUrl, }; -use lemmy_db_views::{ - comment_view::CommentView, - local_user_view::LocalUserView, - post_view::PostView, -}; -use lemmy_db_views_actor::{community_view::CommunityView, person_view::PersonViewSafe}; use lemmy_utils::LemmyError; use lemmy_websocket::LemmyContext; +use serde::Deserialize; use url::Url; -/// The types of ActivityPub objects that can be fetched directly by searching for their ID. -#[derive(serde::Deserialize, Debug)] -#[serde(untagged)] -enum SearchAcceptedObjects { - Person(Box), - Group(Box), - Page(Box), - Comment(Box), -} - /// Attempt to parse the query as URL, and fetch an ActivityPub object from it. /// /// Some working examples for use with the `docker/federation/` setup: @@ -56,9 +31,8 @@ enum SearchAcceptedObjects { /// http://lemmy_delta:8571/comment/2 pub async fn search_by_apub_id( query: &str, - local_user_view: Option, context: &LemmyContext, -) -> Result { +) -> Result { let query_url = match Url::parse(query) { Ok(u) => u, Err(_) => { @@ -75,142 +49,113 @@ pub async fn search_by_apub_id( } // local actor, read from database and return else { - let name: String = name.into(); - return match kind { - WebfingerType::Group => { - let res = blocking(context.pool(), move |conn| { - let community = Community::read_from_name(conn, &name)?; - CommunityView::read(conn, community.id, local_user_view.map(|l| l.person.id)) - }) - .await??; - Ok(ResolveObjectResponse { - community: Some(res), - ..ResolveObjectResponse::default() - }) - } - WebfingerType::Person => { - let res = blocking(context.pool(), move |conn| { - let person = Person::find_by_name(conn, &name)?; - PersonViewSafe::read(conn, person.id) - }) - .await??; - Ok(ResolveObjectResponse { - person: Some(res), - ..ResolveObjectResponse::default() - }) - } - }; + return find_local_actor_by_name(name, kind, context.pool()).await; } } }; let request_counter = &mut 0; - // this does a fetch (even for local objects), just to determine its type and fetch it again - // below. we need to fix this when rewriting the fetcher. - let fetch_response = - fetch_remote_object::(context.client(), &query_url, request_counter) - .await; - if is_deleted(&fetch_response) { - delete_object_locally(&query_url, context).await?; - return Err(anyhow!("Object was deleted").into()); - } + ObjectId::new(query_url) + .dereference(context, request_counter) + .await +} - // Necessary because we get a stack overflow using FetchError - let fet_res = fetch_response.map_err(|e| LemmyError::from(e.inner))?; - build_response(fet_res, query_url, request_counter, context).await +async fn find_local_actor_by_name( + name: &str, + kind: WebfingerType, + pool: &DbPool, +) -> Result { + let name: String = name.into(); + Ok(match kind { + WebfingerType::Group => SearchableObjects::Community( + blocking(pool, move |conn| Community::read_from_name(conn, &name)).await??, + ), + WebfingerType::Person => SearchableObjects::Person( + blocking(pool, move |conn| Person::find_by_name(conn, &name)).await??, + ), + }) } -async fn build_response( - fetch_response: SearchAcceptedObjects, - query_url: Url, - recursion_counter: &mut i32, - context: &LemmyContext, -) -> Result { - use ResolveObjectResponse as ROR; - Ok(match fetch_response { - SearchAcceptedObjects::Person(p) => { - let person_uri = p.id(&query_url)?; +/// The types of ActivityPub objects that can be fetched directly by searching for their ID. +#[derive(Debug)] +pub enum SearchableObjects { + Person(Person), + Community(Community), + Post(Post), + Comment(Comment), +} - let person = get_or_fetch_and_upsert_person(person_uri, context, recursion_counter).await?; - ROR { - person: blocking(context.pool(), move |conn| { - PersonViewSafe::read(conn, person.id) - }) - .await? - .ok(), - ..ROR::default() - } - } - SearchAcceptedObjects::Group(g) => { - let community_uri = g.id(&query_url)?; - let community = - get_or_fetch_and_upsert_community(community_uri, context, recursion_counter).await?; - ROR { - community: blocking(context.pool(), move |conn| { - CommunityView::read(conn, community.id, None) - }) - .await? - .ok(), - ..ROR::default() - } - } - SearchAcceptedObjects::Page(p) => { - let p = Post::from_apub(&p, context, &query_url, recursion_counter).await?; - ROR { - post: blocking(context.pool(), move |conn| PostView::read(conn, p.id, None)) - .await? - .ok(), - ..ROR::default() - } - } - SearchAcceptedObjects::Comment(c) => { - let c = Comment::from_apub(&c, context, &query_url, recursion_counter).await?; - ROR { - comment: blocking(context.pool(), move |conn| { - CommentView::read(conn, c.id, None) - }) - .await? - .ok(), - ..ROR::default() - } - } - }) +#[derive(Deserialize)] +#[serde(untagged)] +pub enum SearchableApubTypes { + Group(Group), + Person(ApubPerson), + Page(Page), + Note(Note), } -async fn delete_object_locally(query_url: &Url, context: &LemmyContext) -> Result<(), LemmyError> { - let res = find_object_by_id(context, query_url.to_owned()).await?; - match res { - Object::Comment(c) => { - blocking(context.pool(), move |conn| { - Comment::update_deleted(conn, c.id, true) - }) - .await??; +impl ApubObject for SearchableObjects { + fn last_refreshed_at(&self) -> Option { + match self { + SearchableObjects::Person(p) => p.last_refreshed_at(), + SearchableObjects::Community(c) => c.last_refreshed_at(), + SearchableObjects::Post(p) => p.last_refreshed_at(), + SearchableObjects::Comment(c) => c.last_refreshed_at(), } - Object::Post(p) => { - blocking(context.pool(), move |conn| { - Post::update_deleted(conn, p.id, true) - }) - .await??; + } + + // TODO: this is inefficient, because if the object is not in local db, it will run 4 db queries + // before finally returning an error. it would be nice if we could check all 4 tables in + // a single query. + // we could skip this and always return an error, but then it would not be able to mark + // objects as deleted that were deleted by remote server. + fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result { + let c = Community::read_from_apub_id(conn, object_id); + if let Ok(c) = c { + return Ok(SearchableObjects::Community(c)); } - Object::Person(u) => { - // TODO: implement update_deleted() for user, move it to ApubObject trait - blocking(context.pool(), move |conn| { - Person::delete_account(conn, u.id) - }) - .await??; + let p = Person::read_from_apub_id(conn, object_id); + if let Ok(p) = p { + return Ok(SearchableObjects::Person(p)); } - Object::Community(c) => { - blocking(context.pool(), move |conn| { - Community::update_deleted(conn, c.id, true) - }) - .await??; + let p = Post::read_from_apub_id(conn, object_id); + if let Ok(p) = p { + return Ok(SearchableObjects::Post(p)); } - Object::PrivateMessage(pm) => { - blocking(context.pool(), move |conn| { - PrivateMessage::update_deleted(conn, pm.id, true) - }) - .await??; + let c = Comment::read_from_apub_id(conn, object_id); + Ok(SearchableObjects::Comment(c?)) + } +} + +#[async_trait::async_trait(?Send)] +impl FromApub for SearchableObjects { + type ApubType = SearchableApubTypes; + + async fn from_apub( + apub: &Self::ApubType, + context: &LemmyContext, + ed: &Url, + rc: &mut i32, + ) -> Result { + use SearchableApubTypes as SAT; + use SearchableObjects as SO; + Ok(match apub { + SAT::Group(g) => SO::Community(Community::from_apub(g, context, ed, rc).await?), + SAT::Person(p) => SO::Person(Person::from_apub(p, context, ed, rc).await?), + SAT::Page(p) => SO::Post(Post::from_apub(p, context, ed, rc).await?), + SAT::Note(n) => SO::Comment(Comment::from_apub(n, context, ed, rc).await?), + }) + } +} + +#[async_trait::async_trait(?Send)] +impl DeletableApubObject for SearchableObjects { + async fn delete(self, context: &LemmyContext) -> Result<(), LemmyError> { + match self { + SearchableObjects::Person(p) => p.delete(context).await, + SearchableObjects::Community(c) => c.delete(context).await, + SearchableObjects::Post(p) => p.delete(context).await, + SearchableObjects::Comment(c) => c.delete(context).await, } } - Ok(()) } diff --git a/crates/apub/src/http/mod.rs b/crates/apub/src/http/mod.rs index 6fe8d182..47182259 100644 --- a/crates/apub/src/http/mod.rs +++ b/crates/apub/src/http/mod.rs @@ -90,7 +90,8 @@ where + 'static, { let request_counter = &mut 0; - let actor = get_or_fetch_and_upsert_actor(activity.actor(), context, request_counter).await?; + let actor = + get_or_fetch_and_upsert_actor(activity.actor().clone(), context, request_counter).await?; verify_signature(&request, &actor.public_key().context(location_info!())?)?; // Do nothing if we received the same activity before diff --git a/crates/apub/src/lib.rs b/crates/apub/src/lib.rs index 839e7d14..e7065752 100644 --- a/crates/apub/src/lib.rs +++ b/crates/apub/src/lib.rs @@ -9,26 +9,17 @@ pub mod http; pub mod migrations; pub mod objects; -use crate::extensions::signatures::PublicKey; +use crate::{extensions::signatures::PublicKey, fetcher::post_or_comment::PostOrComment}; use anyhow::{anyhow, Context}; -use diesel::NotFound; use lemmy_api_common::blocking; -use lemmy_db_queries::{source::activity::Activity_, ApubObject, DbPool}; +use lemmy_db_queries::{source::activity::Activity_, DbPool}; use lemmy_db_schema::{ - source::{ - activity::Activity, - comment::Comment, - community::Community, - person::{Person as DbPerson, Person}, - post::Post, - private_message::PrivateMessage, - }, + source::{activity::Activity, person::Person}, CommunityId, DbUrl, }; use lemmy_db_views_actor::community_person_ban_view::CommunityPersonBanView; use lemmy_utils::{location_info, settings::structs::Settings, LemmyError}; -use lemmy_websocket::LemmyContext; use serde::Serialize; use std::net::IpAddr; use url::{ParseError, Url}; @@ -244,96 +235,6 @@ where Ok(()) } -pub enum PostOrComment { - Comment(Box), - Post(Box), -} - -impl PostOrComment { - pub(crate) fn ap_id(&self) -> Url { - match self { - PostOrComment::Post(p) => p.ap_id.clone(), - PostOrComment::Comment(c) => c.ap_id.clone(), - } - .into() - } -} - -/// Tries to find a post or comment in the local database, without any network requests. -/// This is used to handle deletions and removals, because in case we dont have the object, we can -/// simply ignore the activity. -pub(crate) async fn find_post_or_comment_by_id( - context: &LemmyContext, - apub_id: Url, -) -> Result { - let ap_id = apub_id.clone(); - let post = blocking(context.pool(), move |conn| { - Post::read_from_apub_id(conn, &ap_id.into()) - }) - .await?; - if let Ok(p) = post { - return Ok(PostOrComment::Post(Box::new(p))); - } - - let ap_id = apub_id.clone(); - let comment = blocking(context.pool(), move |conn| { - Comment::read_from_apub_id(conn, &ap_id.into()) - }) - .await?; - if let Ok(c) = comment { - return Ok(PostOrComment::Comment(Box::new(c))); - } - - Err(NotFound.into()) -} - -#[derive(Debug)] -enum Object { - Comment(Box), - Post(Box), - Community(Box), - Person(Box), - PrivateMessage(Box), -} - -async fn find_object_by_id(context: &LemmyContext, apub_id: Url) -> Result { - let ap_id = apub_id.clone(); - if let Ok(pc) = find_post_or_comment_by_id(context, ap_id.to_owned()).await { - return Ok(match pc { - PostOrComment::Post(p) => Object::Post(Box::new(*p)), - PostOrComment::Comment(c) => Object::Comment(Box::new(*c)), - }); - } - - let ap_id = apub_id.clone(); - let person = blocking(context.pool(), move |conn| { - DbPerson::read_from_apub_id(conn, &ap_id.into()) - }) - .await?; - if let Ok(u) = person { - return Ok(Object::Person(Box::new(u))); - } - - let ap_id = apub_id.clone(); - let community = blocking(context.pool(), move |conn| { - Community::read_from_apub_id(conn, &ap_id.into()) - }) - .await?; - if let Ok(c) = community { - return Ok(Object::Community(Box::new(c))); - } - - let private_message = blocking(context.pool(), move |conn| { - PrivateMessage::read_from_apub_id(conn, &apub_id.into()) - }) - .await?; - if let Ok(pm) = private_message { - return Ok(Object::PrivateMessage(Box::new(pm))); - } - - Err(NotFound.into()) -} - async fn check_community_or_site_ban( person: &Person, community_id: CommunityId, diff --git a/crates/apub/src/migrations.rs b/crates/apub/src/migrations.rs index 59eeacce..493518fe 100644 --- a/crates/apub/src/migrations.rs +++ b/crates/apub/src/migrations.rs @@ -1,3 +1,4 @@ +use crate::fetcher::{object_id::ObjectId, post_or_comment::PostOrComment}; use serde::{Deserialize, Serialize}; use url::Url; @@ -13,7 +14,7 @@ use url::Url; #[serde(untagged)] pub enum CommentInReplyToMigration { Old(Vec), - New(Url), + New(ObjectId), } // Another migration we are doing is to handle all deletions and removals using Delete activity. diff --git a/crates/apub/src/objects/comment.rs b/crates/apub/src/objects/comment.rs index 1bebb39e..f3344871 100644 --- a/crates/apub/src/objects/comment.rs +++ b/crates/apub/src/objects/comment.rs @@ -1,13 +1,9 @@ use crate::{ activities::verify_person_in_community, extensions::context::lemmy_context, - fetcher::objects::{ - get_or_fetch_and_insert_comment, - get_or_fetch_and_insert_post, - get_or_fetch_and_insert_post_or_comment, - }, + fetcher::object_id::ObjectId, migrations::CommentInReplyToMigration, - objects::{create_tombstone, get_or_fetch_and_upsert_person, FromApub, Source, ToApub}, + objects::{create_tombstone, FromApub, Source, ToApub}, ActorType, PostOrComment, }; @@ -24,7 +20,7 @@ use lemmy_apub_lib::{ values::{MediaTypeHtml, MediaTypeMarkdown, PublicUrl}, verify_domains_match, }; -use lemmy_db_queries::{ApubObject, Crud, DbPool}; +use lemmy_db_queries::{source::comment::Comment_, Crud, DbPool}; use lemmy_db_schema::{ source::{ comment::{Comment, CommentForm}, @@ -53,7 +49,7 @@ pub struct Note { context: OneOrMany, r#type: NoteType, id: Url, - pub(crate) attributed_to: Url, + pub(crate) attributed_to: ObjectId, /// Indicates that the object is publicly readable. Unlike [`Post.to`], this one doesn't contain /// the community ID, as it would be incompatible with Pleroma (and we can get the community from /// the post in [`in_reply_to`]). @@ -86,23 +82,15 @@ impl Note { CommentInReplyToMigration::Old(in_reply_to) => { // This post, or the parent comment might not yet exist on this server yet, fetch them. let post_id = in_reply_to.get(0).context(location_info!())?; - let post = Box::pin(get_or_fetch_and_insert_post( - post_id, - context, - request_counter, - )) - .await?; + let post_id = ObjectId::new(post_id.clone()); + let post = Box::pin(post_id.dereference(context, request_counter)).await?; // The 2nd item, if it exists, is the parent comment apub_id // Nested comments will automatically get fetched recursively let parent_id: Option = match in_reply_to.get(1) { - Some(parent_comment_uri) => { - let parent_comment = Box::pin(get_or_fetch_and_insert_comment( - parent_comment_uri, - context, - request_counter, - )) - .await?; + Some(comment_id) => { + let comment_id = ObjectId::::new(comment_id.clone()); + let parent_comment = Box::pin(comment_id.dereference(context, request_counter)).await?; Some(parent_comment.id) } @@ -112,9 +100,7 @@ impl Note { Ok((post, parent_id)) } CommentInReplyToMigration::New(in_reply_to) => { - let parent = Box::pin( - get_or_fetch_and_insert_post_or_comment(in_reply_to, context, request_counter).await?, - ); + let parent = Box::pin(in_reply_to.dereference(context, request_counter).await?); match parent.deref() { PostOrComment::Post(p) => { // Workaround because I cant figure ut how to get the post out of the box (and we dont @@ -148,10 +134,10 @@ impl Note { if post.locked { return Err(anyhow!("Post is locked").into()); } - verify_domains_match(&self.attributed_to, &self.id)?; + verify_domains_match(self.attributed_to.inner(), &self.id)?; verify_person_in_community( &self.attributed_to, - &community.actor_id(), + &ObjectId::new(community.actor_id()), context, request_counter, ) @@ -185,7 +171,7 @@ impl ToApub for Comment { context: lemmy_context(), r#type: NoteType::Note, id: self.ap_id.to_owned().into_inner(), - attributed_to: creator.actor_id.into_inner(), + attributed_to: ObjectId::new(creator.actor_id), to: PublicUrl::Public, content: self.content.clone(), media_type: MediaTypeHtml::Html, @@ -226,9 +212,14 @@ impl FromApub for Comment { request_counter: &mut i32, ) -> Result { let ap_id = Some(note.id(expected_domain)?.clone().into()); - let creator = - get_or_fetch_and_upsert_person(¬e.attributed_to, context, request_counter).await?; + let creator = note + .attributed_to + .dereference(context, request_counter) + .await?; let (post, parent_comment_id) = note.get_parents(context, request_counter).await?; + if post.locked { + return Err(anyhow!("Post is locked").into()); + } let content = ¬e.source.content; let content_slurs_removed = remove_slurs(content); diff --git a/crates/apub/src/objects/community.rs b/crates/apub/src/objects/community.rs index 5269d766..dcd00e6d 100644 --- a/crates/apub/src/objects/community.rs +++ b/crates/apub/src/objects/community.rs @@ -1,6 +1,6 @@ use crate::{ extensions::{context::lemmy_context, signatures::PublicKey}, - fetcher::community::fetch_community_mods, + fetcher::community::{fetch_community_outbox, update_community_mods}, generate_moderators_url, objects::{create_tombstone, FromApub, ImageObject, Source, ToApub}, ActorType, @@ -18,7 +18,7 @@ use lemmy_apub_lib::{ values::{MediaTypeHtml, MediaTypeMarkdown}, verify_domains_match, }; -use lemmy_db_queries::{ApubObject, DbPool}; +use lemmy_db_queries::{source::community::Community_, DbPool}; use lemmy_db_schema::{ naive_now, source::community::{Community, CommunityForm}, @@ -175,10 +175,14 @@ impl FromApub for Community { expected_domain: &Url, request_counter: &mut i32, ) -> Result { - fetch_community_mods(context, group, request_counter).await?; let form = Group::from_apub_to_form(group, expected_domain).await?; let community = blocking(context.pool(), move |conn| Community::upsert(conn, &form)).await??; + update_community_mods(group, &community, context, request_counter).await?; + + // TODO: doing this unconditionally might cause infinite loop for some reason + fetch_community_outbox(context, &group.outbox, request_counter).await?; + Ok(community) } } diff --git a/crates/apub/src/objects/mod.rs b/crates/apub/src/objects/mod.rs index 33a93915..b05a944c 100644 --- a/crates/apub/src/objects/mod.rs +++ b/crates/apub/src/objects/mod.rs @@ -1,4 +1,3 @@ -use crate::fetcher::person::get_or_fetch_and_upsert_person; use activitystreams::{ base::BaseExt, object::{kind::ImageType, Tombstone, TombstoneExt}, @@ -26,7 +25,7 @@ pub(crate) trait ToApub { } #[async_trait::async_trait(?Send)] -pub(crate) trait FromApub { +pub trait FromApub { type ApubType; /// Converts an object from ActivityPub type to Lemmy internal type. /// diff --git a/crates/apub/src/objects/person.rs b/crates/apub/src/objects/person.rs index ec14a943..04af848f 100644 --- a/crates/apub/src/objects/person.rs +++ b/crates/apub/src/objects/person.rs @@ -17,7 +17,7 @@ use lemmy_apub_lib::{ values::{MediaTypeHtml, MediaTypeMarkdown}, verify_domains_match, }; -use lemmy_db_queries::{ApubObject, DbPool}; +use lemmy_db_queries::{source::person::Person_, DbPool}; use lemmy_db_schema::{ naive_now, source::person::{Person as DbPerson, PersonForm}, diff --git a/crates/apub/src/objects/post.rs b/crates/apub/src/objects/post.rs index 4773f8c5..341b428b 100644 --- a/crates/apub/src/objects/post.rs +++ b/crates/apub/src/objects/post.rs @@ -1,7 +1,7 @@ use crate::{ activities::{extract_community, verify_person_in_community}, extensions::context::lemmy_context, - fetcher::person::get_or_fetch_and_upsert_person, + fetcher::object_id::ObjectId, objects::{create_tombstone, FromApub, ImageObject, Source, ToApub}, ActorType, }; @@ -21,7 +21,7 @@ use lemmy_apub_lib::{ values::{MediaTypeHtml, MediaTypeMarkdown}, verify_domains_match, }; -use lemmy_db_queries::{ApubObject, Crud, DbPool}; +use lemmy_db_queries::{source::post::Post_, ApubObject, Crud, DbPool}; use lemmy_db_schema::{ self, source::{ @@ -48,7 +48,7 @@ pub struct Page { context: OneOrMany, r#type: PageType, id: Url, - pub(crate) attributed_to: Url, + pub(crate) attributed_to: ObjectId, to: [Url; 2], name: String, content: Option, @@ -101,10 +101,10 @@ impl Page { let community = extract_community(&self.to, context, request_counter).await?; check_slurs(&self.name)?; - verify_domains_match(&self.attributed_to, &self.id)?; + verify_domains_match(self.attributed_to.inner(), &self.id)?; verify_person_in_community( &self.attributed_to, - &community.actor_id(), + &ObjectId::new(community.actor_id()), context, request_counter, ) @@ -137,7 +137,7 @@ impl ToApub for Post { context: lemmy_context(), r#type: PageType::Page, id: self.ap_id.clone().into(), - attributed_to: creator.actor_id.into(), + attributed_to: ObjectId::new(creator.actor_id), to: [community.actor_id.into(), public()], name: self.name.clone(), content: self.body.as_ref().map(|b| markdown_to_html(b)), @@ -183,8 +183,10 @@ impl FromApub for Post { page.id(expected_domain)? }; let ap_id = Some(ap_id.clone().into()); - let creator = - get_or_fetch_and_upsert_person(&page.attributed_to, context, request_counter).await?; + let creator = page + .attributed_to + .dereference(context, request_counter) + .await?; let community = extract_community(&page.to, context, request_counter).await?; let thumbnail_url: Option = page.image.clone().map(|i| i.url); diff --git a/crates/apub/src/objects/private_message.rs b/crates/apub/src/objects/private_message.rs index 02cf12eb..9bc917a9 100644 --- a/crates/apub/src/objects/private_message.rs +++ b/crates/apub/src/objects/private_message.rs @@ -1,6 +1,6 @@ use crate::{ extensions::context::lemmy_context, - fetcher::person::get_or_fetch_and_upsert_person, + fetcher::object_id::ObjectId, objects::{create_tombstone, FromApub, Source, ToApub}, }; use activitystreams::{ @@ -16,7 +16,7 @@ use lemmy_apub_lib::{ values::{MediaTypeHtml, MediaTypeMarkdown}, verify_domains_match, }; -use lemmy_db_queries::{ApubObject, Crud, DbPool}; +use lemmy_db_queries::{source::private_message::PrivateMessage_, Crud, DbPool}; use lemmy_db_schema::source::{ person::Person, private_message::{PrivateMessage, PrivateMessageForm}, @@ -35,8 +35,8 @@ pub struct Note { context: OneOrMany, r#type: NoteType, id: Url, - pub(crate) attributed_to: Url, - to: Url, + pub(crate) attributed_to: ObjectId, + to: ObjectId, content: String, media_type: MediaTypeHtml, source: Source, @@ -60,9 +60,11 @@ impl Note { context: &LemmyContext, request_counter: &mut i32, ) -> Result<(), LemmyError> { - verify_domains_match(&self.attributed_to, &self.id)?; - let person = - get_or_fetch_and_upsert_person(&self.attributed_to, context, request_counter).await?; + verify_domains_match(self.attributed_to.inner(), &self.id)?; + let person = self + .attributed_to + .dereference(context, request_counter) + .await?; if person.banned { return Err(anyhow!("Person is banned from site").into()); } @@ -85,8 +87,8 @@ impl ToApub for PrivateMessage { context: lemmy_context(), r#type: NoteType::Note, id: self.ap_id.clone().into(), - attributed_to: creator.actor_id.into_inner(), - to: recipient.actor_id.into(), + attributed_to: ObjectId::new(creator.actor_id), + to: ObjectId::new(recipient.actor_id), content: self.content.clone(), media_type: MediaTypeHtml::Html, source: Source { @@ -121,9 +123,11 @@ impl FromApub for PrivateMessage { request_counter: &mut i32, ) -> Result { let ap_id = Some(note.id(expected_domain)?.clone().into()); - let creator = - get_or_fetch_and_upsert_person(¬e.attributed_to, context, request_counter).await?; - let recipient = get_or_fetch_and_upsert_person(¬e.to, context, request_counter).await?; + let creator = note + .attributed_to + .dereference(context, request_counter) + .await?; + let recipient = note.to.dereference(context, request_counter).await?; let form = PrivateMessageForm { creator_id: creator.id, diff --git a/crates/apub_lib_derive/src/lib.rs b/crates/apub_lib_derive/src/lib.rs index e7a1912c..c04f9747 100644 --- a/crates/apub_lib_derive/src/lib.rs +++ b/crates/apub_lib_derive/src/lib.rs @@ -145,14 +145,14 @@ pub fn derive_activity_fields(input: proc_macro::TokenStream) -> proc_macro::Tok unimplemented!() }; let cc_impl = if has_cc { - quote! {self.cc.clone().into()} + quote! {self.cc.iter().map(|i| i.clone().into()).collect()} } else { quote! {vec![]} }; quote! { impl #impl_generics lemmy_apub_lib::ActivityFields for #name #ty_generics #where_clause { fn id_unchecked(&self) -> &url::Url { &self.id } - fn actor(&self) -> &url::Url { &self.actor } + fn actor(&self) -> &url::Url { &self.actor.inner() } fn cc(&self) -> Vec { #cc_impl } } } diff --git a/crates/db_queries/src/lib.rs b/crates/db_queries/src/lib.rs index e62124b1..dbd470e4 100644 --- a/crates/db_queries/src/lib.rs +++ b/crates/db_queries/src/lib.rs @@ -12,6 +12,7 @@ extern crate diesel_migrations; #[cfg(test)] extern crate serial_test; +use chrono::NaiveDateTime; use diesel::{result::Error, *}; use lemmy_db_schema::{CommunityId, DbUrl, PersonId}; use lemmy_utils::ApiError; @@ -145,14 +146,14 @@ pub trait DeleteableOrRemoveable { fn blank_out_deleted_or_removed_info(self) -> Self; } +// TODO: move this to apub lib pub trait ApubObject { - type Form; + /// If this object should be refetched after a certain interval, it should return the last refresh + /// time here. This is mainly used to update remote actors. + fn last_refreshed_at(&self) -> Option; fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result where Self: Sized; - fn upsert(conn: &PgConnection, user_form: &Self::Form) -> Result - where - Self: Sized; } pub trait MaybeOptional { diff --git a/crates/db_queries/src/source/activity.rs b/crates/db_queries/src/source/activity.rs index 1f158370..3008c38f 100644 --- a/crates/db_queries/src/source/activity.rs +++ b/crates/db_queries/src/source/activity.rs @@ -1,7 +1,6 @@ use crate::Crud; use diesel::{dsl::*, result::Error, sql_types::Text, *}; use lemmy_db_schema::{source::activity::*, DbUrl}; -use log::debug; use serde::Serialize; use serde_json::Value; use std::{ @@ -72,7 +71,6 @@ impl Activity_ for Activity { where T: Serialize + Debug, { - debug!("{}", serde_json::to_string_pretty(&data)?); let activity_form = ActivityForm { ap_id, data: serde_json::to_value(&data)?, diff --git a/crates/db_queries/src/source/comment.rs b/crates/db_queries/src/source/comment.rs index 1e972271..ec8466db 100644 --- a/crates/db_queries/src/source/comment.rs +++ b/crates/db_queries/src/source/comment.rs @@ -1,4 +1,5 @@ use crate::{ApubObject, Crud, DeleteableOrRemoveable, Likeable, Saveable}; +use chrono::NaiveDateTime; use diesel::{dsl::*, result::Error, *}; use lemmy_db_schema::{ naive_now, @@ -50,6 +51,7 @@ pub trait Comment_ { comment_id: CommentId, new_content: &str, ) -> Result; + fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result; } impl Comment_ for Comment { @@ -133,6 +135,16 @@ impl Comment_ for Comment { .set((content.eq(new_content), updated.eq(naive_now()))) .get_result::(conn) } + + fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result { + use lemmy_db_schema::schema::comment::dsl::*; + insert_into(comment) + .values(comment_form) + .on_conflict(ap_id) + .do_update() + .set(comment_form) + .get_result::(conn) + } } impl Crud for Comment { @@ -168,20 +180,13 @@ impl Crud for Comment { } impl ApubObject for Comment { - type Form = CommentForm; - fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result { - use lemmy_db_schema::schema::comment::dsl::*; - comment.filter(ap_id.eq(object_id)).first::(conn) + fn last_refreshed_at(&self) -> Option { + None } - fn upsert(conn: &PgConnection, comment_form: &CommentForm) -> Result { + fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result { use lemmy_db_schema::schema::comment::dsl::*; - insert_into(comment) - .values(comment_form) - .on_conflict(ap_id) - .do_update() - .set(comment_form) - .get_result::(conn) + comment.filter(ap_id.eq(object_id)).first::(conn) } } diff --git a/crates/db_queries/src/source/community.rs b/crates/db_queries/src/source/community.rs index eaaa2ba2..dbce1969 100644 --- a/crates/db_queries/src/source/community.rs +++ b/crates/db_queries/src/source/community.rs @@ -1,4 +1,5 @@ use crate::{ApubObject, Bannable, Crud, DeleteableOrRemoveable, Followable, Joinable}; +use chrono::NaiveDateTime; use diesel::{dsl::*, result::Error, *}; use lemmy_db_schema::{ naive_now, @@ -93,23 +94,16 @@ impl Crud for Community { } impl ApubObject for Community { - type Form = CommunityForm; + fn last_refreshed_at(&self) -> Option { + Some(self.last_refreshed_at) + } + fn read_from_apub_id(conn: &PgConnection, for_actor_id: &DbUrl) -> Result { use lemmy_db_schema::schema::community::dsl::*; community .filter(actor_id.eq(for_actor_id)) .first::(conn) } - - fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result { - use lemmy_db_schema::schema::community::dsl::*; - insert_into(community) - .values(community_form) - .on_conflict(actor_id) - .do_update() - .set(community_form) - .get_result::(conn) - } } pub trait Community_ { @@ -129,6 +123,7 @@ pub trait Community_ { conn: &PgConnection, followers_url: &DbUrl, ) -> Result; + fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result; } impl Community_ for Community { @@ -176,6 +171,16 @@ impl Community_ for Community { .filter(followers_url.eq(followers_url_)) .first::(conn) } + + fn upsert(conn: &PgConnection, community_form: &CommunityForm) -> Result { + use lemmy_db_schema::schema::community::dsl::*; + insert_into(community) + .values(community_form) + .on_conflict(actor_id) + .do_update() + .set(community_form) + .get_result::(conn) + } } impl Joinable for CommunityModerator { diff --git a/crates/db_queries/src/source/person.rs b/crates/db_queries/src/source/person.rs index 6172b4e1..2b2e7d0e 100644 --- a/crates/db_queries/src/source/person.rs +++ b/crates/db_queries/src/source/person.rs @@ -1,4 +1,5 @@ use crate::{ApubObject, Crud}; +use chrono::NaiveDateTime; use diesel::{dsl::*, result::Error, *}; use lemmy_db_schema::{ naive_now, @@ -181,7 +182,10 @@ impl Crud for Person { } impl ApubObject for Person { - type Form = PersonForm; + fn last_refreshed_at(&self) -> Option { + Some(self.last_refreshed_at) + } + fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result { use lemmy_db_schema::schema::person::dsl::*; person @@ -189,15 +193,6 @@ impl ApubObject for Person { .filter(actor_id.eq(object_id)) .first::(conn) } - - fn upsert(conn: &PgConnection, person_form: &PersonForm) -> Result { - insert_into(person) - .values(person_form) - .on_conflict(actor_id) - .do_update() - .set(person_form) - .get_result::(conn) - } } pub trait Person_ { @@ -206,6 +201,7 @@ pub trait Person_ { fn find_by_name(conn: &PgConnection, name: &str) -> Result; fn mark_as_updated(conn: &PgConnection, person_id: PersonId) -> Result; fn delete_account(conn: &PgConnection, person_id: PersonId) -> Result; + fn upsert(conn: &PgConnection, person_form: &PersonForm) -> Result; } impl Person_ for Person { @@ -256,6 +252,15 @@ impl Person_ for Person { )) .get_result::(conn) } + + fn upsert(conn: &PgConnection, person_form: &PersonForm) -> Result { + insert_into(person) + .values(person_form) + .on_conflict(actor_id) + .do_update() + .set(person_form) + .get_result::(conn) + } } #[cfg(test)] diff --git a/crates/db_queries/src/source/post.rs b/crates/db_queries/src/source/post.rs index 02ae4d6e..7f185f42 100644 --- a/crates/db_queries/src/source/post.rs +++ b/crates/db_queries/src/source/post.rs @@ -1,4 +1,5 @@ use crate::{ApubObject, Crud, DeleteableOrRemoveable, Likeable, Readable, Saveable}; +use chrono::NaiveDateTime; use diesel::{dsl::*, result::Error, *}; use lemmy_db_schema::{ naive_now, @@ -72,6 +73,7 @@ pub trait Post_ { new_stickied: bool, ) -> Result; fn is_post_creator(person_id: PersonId, post_creator_id: PersonId) -> bool; + fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result; } impl Post_ for Post { @@ -179,14 +181,6 @@ impl Post_ for Post { fn is_post_creator(person_id: PersonId, post_creator_id: PersonId) -> bool { person_id == post_creator_id } -} - -impl ApubObject for Post { - type Form = PostForm; - fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result { - use lemmy_db_schema::schema::post::dsl::*; - post.filter(ap_id.eq(object_id)).first::(conn) - } fn upsert(conn: &PgConnection, post_form: &PostForm) -> Result { use lemmy_db_schema::schema::post::dsl::*; @@ -199,6 +193,17 @@ impl ApubObject for Post { } } +impl ApubObject for Post { + fn last_refreshed_at(&self) -> Option { + None + } + + fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result { + use lemmy_db_schema::schema::post::dsl::*; + post.filter(ap_id.eq(object_id)).first::(conn) + } +} + impl Likeable for PostLike { type Form = PostLikeForm; type IdType = PostId; diff --git a/crates/db_queries/src/source/private_message.rs b/crates/db_queries/src/source/private_message.rs index c1138b97..71dca04c 100644 --- a/crates/db_queries/src/source/private_message.rs +++ b/crates/db_queries/src/source/private_message.rs @@ -1,4 +1,5 @@ use crate::{ApubObject, Crud, DeleteableOrRemoveable}; +use chrono::NaiveDateTime; use diesel::{dsl::*, result::Error, *}; use lemmy_db_schema::{naive_now, source::private_message::*, DbUrl, PersonId, PrivateMessageId}; @@ -30,7 +31,10 @@ impl Crud for PrivateMessage { } impl ApubObject for PrivateMessage { - type Form = PrivateMessageForm; + fn last_refreshed_at(&self) -> Option { + None + } + fn read_from_apub_id(conn: &PgConnection, object_id: &DbUrl) -> Result where Self: Sized, @@ -40,16 +44,6 @@ impl ApubObject for PrivateMessage { .filter(ap_id.eq(object_id)) .first::(conn) } - - fn upsert(conn: &PgConnection, private_message_form: &PrivateMessageForm) -> Result { - use lemmy_db_schema::schema::private_message::dsl::*; - insert_into(private_message) - .values(private_message_form) - .on_conflict(ap_id) - .do_update() - .set(private_message_form) - .get_result::(conn) - } } pub trait PrivateMessage_ { @@ -77,6 +71,10 @@ pub trait PrivateMessage_ { conn: &PgConnection, for_recipient_id: PersonId, ) -> Result, Error>; + fn upsert( + conn: &PgConnection, + private_message_form: &PrivateMessageForm, + ) -> Result; } impl PrivateMessage_ for PrivateMessage { @@ -138,6 +136,19 @@ impl PrivateMessage_ for PrivateMessage { .set(read.eq(true)) .get_results::(conn) } + + fn upsert( + conn: &PgConnection, + private_message_form: &PrivateMessageForm, + ) -> Result { + use lemmy_db_schema::schema::private_message::dsl::*; + insert_into(private_message) + .values(private_message_form) + .on_conflict(ap_id) + .do_update() + .set(private_message_form) + .get_result::(conn) + } } impl DeleteableOrRemoveable for PrivateMessage {