Split activity table into sent and received parts (fixes #3103) (#3583)
author Nutomic <me@nutomic.com>
Fri, 14 Jul 2023 15:17:06 +0000 (17:17 +0200)
committer GitHub <noreply@github.com>
Fri, 14 Jul 2023 15:17:06 +0000 (11:17 -0400)
* Split activity table into sent and received parts (fixes #3103)

The received activities are only stored in order to avoid processing
the same incoming activity multiple times. For this purpose it is
completely unnecessary to store the data. So we can split the
table into sent_activity and received_activity parts, where
only the sent_activity table needs to store activity data. This should
reduce storage use significantly.

Also reduces activity storage duration to three months; we can reduce
this further if necessary.

Additionally, the id columns of the activity tables are removed because
they are completely unused and risk overflowing (fixes #3560).

* address review

* move insert_received_activity() methods to verify handlers

* remove unnecessary conflict line

* clippy

* use on conflict, add tests

28 files changed:
crates/apub/src/activities/block/block_user.rs
crates/apub/src/activities/block/undo_block_user.rs
crates/apub/src/activities/community/announce.rs
crates/apub/src/activities/community/collection_add.rs
crates/apub/src/activities/community/collection_remove.rs
crates/apub/src/activities/community/lock_page.rs
crates/apub/src/activities/community/report.rs
crates/apub/src/activities/community/update.rs
crates/apub/src/activities/create_or_update/comment.rs
crates/apub/src/activities/create_or_update/post.rs
crates/apub/src/activities/create_or_update/private_message.rs
crates/apub/src/activities/deletion/delete.rs
crates/apub/src/activities/deletion/delete_user.rs
crates/apub/src/activities/deletion/undo_delete.rs
crates/apub/src/activities/following/accept.rs
crates/apub/src/activities/following/follow.rs
crates/apub/src/activities/following/undo_follow.rs
crates/apub/src/activities/mod.rs
crates/apub/src/activities/voting/undo_vote.rs
crates/apub/src/activities/voting/vote.rs
crates/apub/src/http/mod.rs
crates/apub/src/lib.rs
crates/db_schema/src/impls/activity.rs
crates/db_schema/src/schema.rs
crates/db_schema/src/source/activity.rs
migrations/2023-07-11-084714_receive_activity_table/down.sql [new file with mode: 0644]
migrations/2023-07-11-084714_receive_activity_table/up.sql [new file with mode: 0644]
src/scheduled_tasks.rs

index f8a1e4b8d79bc18faf34cfe857ac8c945f0ed1f7..55642f86238e717202f6bdab3788be64923780ce 100644 (file)
@@ -9,7 +9,7 @@ use crate::{
     verify_person_in_community,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   objects::{instance::remote_instance_inboxes, person::ApubPerson},
   protocol::activities::block::block_user::BlockUser,
 };
@@ -124,6 +124,7 @@ impl ActivityHandler for BlockUser {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     match self.target.dereference(context).await? {
       SiteOrCommunity::Site(site) => {
@@ -147,7 +148,6 @@ impl ActivityHandler for BlockUser {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let expires = self.expires.map(|u| u.naive_local());
     let mod_person = self.actor.dereference(context).await?;
     let blocked_person = self.object.dereference(context).await?;
index b31f8b4b26f7245e8aa7fcbc37c10b8f6ccb4ddb..f683497945e52974d2061dbbb494db41a6bcd679 100644 (file)
@@ -7,7 +7,7 @@ use crate::{
     verify_is_public,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   objects::{instance::remote_instance_inboxes, person::ApubPerson},
   protocol::activities::block::{block_user::BlockUser, undo_block_user::UndoBlockUser},
 };
@@ -88,6 +88,7 @@ impl ActivityHandler for UndoBlockUser {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     verify_domains_match(self.actor.inner(), self.object.actor.inner())?;
     self.object.verify(context).await?;
@@ -96,7 +97,6 @@ impl ActivityHandler for UndoBlockUser {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let expires = self.object.expires.map(|u| u.naive_local());
     let mod_person = self.actor.dereference(context).await?;
     let blocked_person = self.object.object.dereference(context).await?;
index e33e9fbf482ed1a02d978101574f04669187ccc4..ed489158ef1b157287a896a3e38f1fac935f5870 100644 (file)
@@ -6,7 +6,7 @@ use crate::{
     verify_person_in_community,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   objects::community::ApubCommunity,
   protocol::{
     activities::community::announce::{AnnounceActivity, RawAnnouncableActivities},
@@ -133,14 +133,14 @@ impl ActivityHandler for AnnounceActivity {
   }
 
   #[tracing::instrument(skip_all)]
-  async fn verify(&self, _context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+  async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     Ok(())
   }
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let object: AnnouncableActivities = self.object.object(context).await?.try_into()?;
     // This is only for sending, not receiving so we reject it.
     if let AnnouncableActivities::Page(_) = object {
index d08b0cb486d8c67e069a57423fba92bce0340a70..c36a8f0dafc0a3f75dec9596af783bead92cd743 100644 (file)
@@ -7,7 +7,7 @@ use crate::{
     verify_person_in_community,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost},
   protocol::{
     activities::community::{collection_add::CollectionAdd, collection_remove::CollectionRemove},
@@ -108,6 +108,7 @@ impl ActivityHandler for CollectionAdd {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     let community = self.community(context).await?;
     verify_person_in_community(&self.actor, &community, context).await?;
@@ -117,7 +118,6 @@ impl ActivityHandler for CollectionAdd {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let (community, collection_type) =
       Community::get_by_collection_url(&mut context.pool(), &self.target.into()).await?;
     match collection_type {
index a1c443ea8c57ca7a848d03ae15da34933499ef6c..28214284b1b93d27a6f8539a0dd4d8a429b3e927 100644 (file)
@@ -7,7 +7,7 @@ use crate::{
     verify_person_in_community,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost},
   protocol::{activities::community::collection_remove::CollectionRemove, InCommunity},
 };
@@ -101,6 +101,7 @@ impl ActivityHandler for CollectionRemove {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     let community = self.community(context).await?;
     verify_person_in_community(&self.actor, &community, context).await?;
@@ -110,7 +111,6 @@ impl ActivityHandler for CollectionRemove {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let (community, collection_type) =
       Community::get_by_collection_url(&mut context.pool(), &self.target.into()).await?;
     match collection_type {
index 0416b972aee88d46cb60bf28acd27c3eb077eee4..94135ede900cf3579af8641b573cd20ad38d853d 100644 (file)
@@ -8,7 +8,7 @@ use crate::{
     verify_person_in_community,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   protocol::{
     activities::community::lock_page::{LockPage, LockType, UndoLockPage},
     InCommunity,
@@ -79,6 +79,7 @@ impl ActivityHandler for UndoLockPage {
   }
 
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), Self::Error> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     let community = self.community(context).await?;
     verify_person_in_community(&self.actor, &community, context).await?;
@@ -94,7 +95,6 @@ impl ActivityHandler for UndoLockPage {
   }
 
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), Self::Error> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let form = PostUpdateForm::builder().locked(Some(false)).build();
     let post = self.object.object.dereference(context).await?;
     Post::update(&mut context.pool(), post.id, &form).await?;
index 1dffacc39faa666d1847e5d987c73de509371fdc..67b84644e6b473078d7833823cdcb4780828178c 100644 (file)
@@ -1,6 +1,6 @@
 use crate::{
   activities::{generate_activity_id, send_lemmy_activity, verify_person_in_community},
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson},
   protocol::{activities::community::report::Report, InCommunity},
   PostOrComment,
@@ -115,6 +115,7 @@ impl ActivityHandler for Report {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     let community = self.community(context).await?;
     verify_person_in_community(&self.actor, &community, context).await?;
     Ok(())
@@ -122,7 +123,6 @@ impl ActivityHandler for Report {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, true, context).await?;
     let actor = self.actor.dereference(context).await?;
     match self.object.dereference(context).await? {
       PostOrComment::Post(post) => {
index 3e697fddcca328b28c8ace2c63e6de173b5a86c8..fe2477d6efdf6a63102edee0e3396521a4ff07cb 100644 (file)
@@ -7,7 +7,7 @@ use crate::{
     verify_person_in_community,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson},
   protocol::{activities::community::update::UpdateCommunity, InCommunity},
   SendActivity,
@@ -82,6 +82,7 @@ impl ActivityHandler for UpdateCommunity {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     let community = self.community(context).await?;
     verify_person_in_community(&self.actor, &community, context).await?;
@@ -92,7 +93,6 @@ impl ActivityHandler for UpdateCommunity {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let community = self.community(context).await?;
 
     let community_update_form = self.object.into_update_form();
index 804f1827bb23a1cc7ade6e0f5a9f5b17961b2ca4..51b87ed27fc2271230f434973f008c327937d721 100644 (file)
@@ -7,7 +7,7 @@ use crate::{
     verify_person_in_community,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   mentions::MentionOrValue,
   objects::{comment::ApubComment, community::ApubCommunity, person::ApubPerson},
   protocol::{
@@ -154,6 +154,7 @@ impl ActivityHandler for CreateOrUpdateNote {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     let post = self.object.get_parents(context).await?.0;
     let community = self.community(context).await?;
@@ -169,7 +170,6 @@ impl ActivityHandler for CreateOrUpdateNote {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     // Need to do this check here instead of Note::from_json because we need the person who
     // send the activity, not the comment author.
     let existing_comment = self.object.id.dereference_local(context).await.ok();
index e0ce0fec4cc48a85e8bfdada8cb1a0c77c17d2e7..77199056d53f044c236f9d74aac1b569daeb1568 100644 (file)
@@ -8,7 +8,7 @@ use crate::{
     verify_person_in_community,
   },
   activity_lists::AnnouncableActivities,
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost},
   protocol::{
     activities::{create_or_update::page::CreateOrUpdatePage, CreateOrUpdateType},
@@ -146,6 +146,7 @@ impl ActivityHandler for CreateOrUpdatePage {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &self.cc)?;
     let community = self.community(context).await?;
     verify_person_in_community(&self.actor, &community, context).await?;
@@ -180,7 +181,6 @@ impl ActivityHandler for CreateOrUpdatePage {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let post = ApubPost::from_json(self.object, context).await?;
 
     // author likes their own post by default
index 36c9785da04291cb6e7214d98f080a2954f69f47..3eaad2f713ad5e1eedcc5295f9a8b8685b58db6c 100644 (file)
@@ -1,6 +1,6 @@
 use crate::{
   activities::{generate_activity_id, send_lemmy_activity, verify_person},
-  insert_activity,
+  insert_received_activity,
   objects::{person::ApubPerson, private_message::ApubPrivateMessage},
   protocol::activities::{
     create_or_update::chat_message::CreateOrUpdateChatMessage,
@@ -109,6 +109,7 @@ impl ActivityHandler for CreateOrUpdateChatMessage {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_person(&self.actor, context).await?;
     verify_domains_match(self.actor.inner(), self.object.id.inner())?;
     verify_domains_match(self.to[0].inner(), self.object.to[0].inner())?;
@@ -118,7 +119,6 @@ impl ActivityHandler for CreateOrUpdateChatMessage {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, true, context).await?;
     ApubPrivateMessage::from_json(self.object, context).await?;
     Ok(())
   }
index 8ad104173d6e4bc32e33fba735c1ce5c3d40733e..fcdede8d76745460653ea97874f3317633146b61 100644 (file)
@@ -3,7 +3,7 @@ use crate::{
     deletion::{receive_delete_action, verify_delete_activity, DeletableObjects},
     generate_activity_id,
   },
-  insert_activity,
+  insert_received_activity,
   objects::person::ApubPerson,
   protocol::{activities::deletion::delete::Delete, IdOrNestedObject},
 };
@@ -43,13 +43,13 @@ impl ActivityHandler for Delete {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_delete_activity(self, self.summary.is_some(), context).await?;
     Ok(())
   }
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     if let Some(reason) = self.summary {
       // We set reason to empty string if it doesn't exist, to distinguish between delete and
       // remove. Here we change it back to option, so we don't write it to db.
index d74a3c8aa3f16a0c8945e6bd6d364258cc74b7aa..b388ed9e1ec597642a672756f65a33495af069a3 100644 (file)
@@ -1,6 +1,6 @@
 use crate::{
   activities::{generate_activity_id, send_lemmy_activity, verify_is_public, verify_person},
-  insert_activity,
+  insert_received_activity,
   objects::{instance::remote_instance_inboxes, person::ApubPerson},
   protocol::activities::deletion::delete_user::DeleteUser,
   SendActivity,
@@ -73,6 +73,7 @@ impl ActivityHandler for DeleteUser {
   }
 
   async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_is_public(&self.to, &[])?;
     verify_person(&self.actor, context).await?;
     verify_urls_match(self.actor.inner(), self.object.inner())?;
@@ -80,7 +81,6 @@ impl ActivityHandler for DeleteUser {
   }
 
   async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     let actor = self.actor.dereference(context).await?;
     delete_user_account(
       actor.id,
index e10bd066037d2ecf33deb3046f0ea57a8fea7a6c..541a7455fe376082931f957524c859bcdd357236 100644 (file)
@@ -3,7 +3,7 @@ use crate::{
     deletion::{receive_delete_action, verify_delete_activity, DeletableObjects},
     generate_activity_id,
   },
-  insert_activity,
+  insert_received_activity,
   objects::person::ApubPerson,
   protocol::activities::deletion::{delete::Delete, undo_delete::UndoDelete},
 };
@@ -42,6 +42,7 @@ impl ActivityHandler for UndoDelete {
   }
 
   async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
+    insert_received_activity(&self.id, data).await?;
     self.object.verify(data).await?;
     verify_delete_activity(&self.object, self.object.summary.is_some(), data).await?;
     Ok(())
@@ -49,7 +50,6 @@ impl ActivityHandler for UndoDelete {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, false, context).await?;
     if self.object.summary.is_some() {
       UndoDelete::receive_undo_remove_action(
         &self.actor.dereference(context).await?,
index af7d637255356941afbfa93469546da9a1909a7c..adaad51d14b263d1d41fa2cf07ce4d5814a1a6e1 100644 (file)
@@ -1,6 +1,6 @@
 use crate::{
   activities::{generate_activity_id, send_lemmy_activity},
-  insert_activity,
+  insert_received_activity,
   protocol::activities::following::{accept::AcceptFollow, follow::Follow},
 };
 use activitypub_federation::{
@@ -50,6 +50,7 @@ impl ActivityHandler for AcceptFollow {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_urls_match(self.actor.inner(), self.object.object.inner())?;
     self.object.verify(context).await?;
     if let Some(to) = &self.to {
@@ -60,7 +61,6 @@ impl ActivityHandler for AcceptFollow {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, true, context).await?;
     let community = self.actor.dereference(context).await?;
     let person = self.object.actor.dereference(context).await?;
     // This will throw an error if no follow was requested
index 073784da12d32def1068771e1ca725de613dbdf3..2f0f5037aed69d6d3abed8572bb0a5afe2b68f1c 100644 (file)
@@ -6,7 +6,7 @@ use crate::{
     verify_person_in_community,
   },
   fetcher::user_or_community::UserOrCommunity,
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson},
   protocol::activities::following::{
     accept::AcceptFollow,
@@ -90,6 +90,7 @@ impl ActivityHandler for Follow {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_person(&self.actor, context).await?;
     let object = self.object.dereference(context).await?;
     if let UserOrCommunity::Community(c) = object {
@@ -103,7 +104,6 @@ impl ActivityHandler for Follow {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, true, context).await?;
     let actor = self.actor.dereference(context).await?;
     let object = self.object.dereference(context).await?;
     match object {
index 9f18ccfbc92558f0d11ea7f2d268c19ff4e92673..c36b36df8cb4c118464387c6690d687114d7ccdc 100644 (file)
@@ -1,7 +1,7 @@
 use crate::{
   activities::{generate_activity_id, send_lemmy_activity, verify_person},
   fetcher::user_or_community::UserOrCommunity,
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson},
   protocol::activities::following::{follow::Follow, undo_follow::UndoFollow},
 };
@@ -60,6 +60,7 @@ impl ActivityHandler for UndoFollow {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     verify_urls_match(self.actor.inner(), self.object.actor.inner())?;
     verify_person(&self.actor, context).await?;
     self.object.verify(context).await?;
@@ -71,7 +72,6 @@ impl ActivityHandler for UndoFollow {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, true, context).await?;
     let person = self.actor.dereference(context).await?;
     let object = self.object.object.dereference(context).await?;
 
index e0b46e0e7cae9d9e4e99f4692e446df2ba473718..4fd8da536f20f7e01a39cd545c5e6165e7ea1a4a 100644 (file)
@@ -1,5 +1,4 @@
 use crate::{
-  insert_activity,
   objects::{community::ApubCommunity, person::ApubPerson},
   CONTEXT,
 };
@@ -15,7 +14,11 @@ use anyhow::anyhow;
 use lemmy_api_common::context::LemmyContext;
 use lemmy_db_schema::{
   newtypes::CommunityId,
-  source::{community::Community, instance::Instance},
+  source::{
+    activity::{SentActivity, SentActivityForm},
+    community::Community,
+    instance::Instance,
+  },
 };
 use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
 use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType};
@@ -184,7 +187,12 @@ where
   info!("Sending activity {}", activity.id().to_string());
   let activity = WithContext::new(activity, CONTEXT.deref().clone());
 
-  insert_activity(activity.id(), &activity, true, sensitive, data).await?;
+  let form = SentActivityForm {
+    ap_id: activity.id().clone().into(),
+    data: serde_json::to_value(activity.clone())?,
+    sensitive,
+  };
+  SentActivity::create(&mut data.pool(), form).await?;
   send_activity(activity, actor, inbox, data).await?;
 
   Ok(())
index bcb8ee4068241c22083291feb6d9b2958de0cc37..9616c651f46d59a6188c046cae35aa5baf6a6b13 100644 (file)
@@ -4,7 +4,7 @@ use crate::{
     verify_person_in_community,
     voting::{undo_vote_comment, undo_vote_post},
   },
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson},
   protocol::{
     activities::voting::{undo_vote::UndoVote, vote::Vote},
@@ -57,6 +57,7 @@ impl ActivityHandler for UndoVote {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     let community = self.community(context).await?;
     verify_person_in_community(&self.actor, &community, context).await?;
     verify_urls_match(self.actor.inner(), self.object.actor.inner())?;
@@ -66,7 +67,6 @@ impl ActivityHandler for UndoVote {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, true, context).await?;
     let actor = self.actor.dereference(context).await?;
     let object = self.object.object.dereference(context).await?;
     match object {
index 4de9a8c176a13fd98da5cfeedf65d047f32b75b1..ef4572986f0c3d52d15424d7f7b5356bbe72aac5 100644 (file)
@@ -4,7 +4,7 @@ use crate::{
     verify_person_in_community,
     voting::{vote_comment, vote_post},
   },
-  insert_activity,
+  insert_received_activity,
   objects::{community::ApubCommunity, person::ApubPerson},
   protocol::{
     activities::voting::vote::{Vote, VoteType},
@@ -56,6 +56,7 @@ impl ActivityHandler for Vote {
 
   #[tracing::instrument(skip_all)]
   async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+    insert_received_activity(&self.id, context).await?;
     let community = self.community(context).await?;
     verify_person_in_community(&self.actor, &community, context).await?;
     let enable_downvotes = LocalSite::read(&mut context.pool())
@@ -70,7 +71,6 @@ impl ActivityHandler for Vote {
 
   #[tracing::instrument(skip_all)]
   async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
-    insert_activity(&self.id, &self, false, true, context).await?;
     let actor = self.actor.dereference(context).await?;
     let object = self.object.dereference(context).await?;
     match object {
index 52a0144348f0c8197e98a4c94da6625e2ae5aa91..c261d9e4929c363385e10756d491967a25f29148 100644 (file)
@@ -13,7 +13,7 @@ use activitypub_federation::{
 use actix_web::{web, web::Bytes, HttpRequest, HttpResponse};
 use http::StatusCode;
 use lemmy_api_common::context::LemmyContext;
-use lemmy_db_schema::source::activity::Activity;
+use lemmy_db_schema::source::activity::SentActivity;
 use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult};
 use serde::{Deserialize, Serialize};
 use std::ops::Deref;
@@ -88,12 +88,10 @@ pub(crate) async fn get_activity(
     info.id
   ))?
   .into();
-  let activity = Activity::read_from_apub_id(&mut context.pool(), &activity_id).await?;
+  let activity = SentActivity::read_from_apub_id(&mut context.pool(), &activity_id).await?;
 
   let sensitive = activity.sensitive;
-  if !activity.local {
-    Err(err_object_not_local())
-  } else if sensitive {
+  if sensitive {
     Ok(HttpResponse::Forbidden().finish())
   } else {
     create_apub_response(&activity.data)
index 8d818602262eb031971edd65649f1ac1f9dc7804..9a45284f22fbf609dfbbdf0196847e685b8cdbf0 100644 (file)
@@ -3,18 +3,12 @@ use activitypub_federation::config::{Data, UrlVerifier};
 use async_trait::async_trait;
 use lemmy_api_common::context::LemmyContext;
 use lemmy_db_schema::{
-  source::{
-    activity::{Activity, ActivityInsertForm},
-    instance::Instance,
-    local_site::LocalSite,
-  },
-  traits::Crud,
+  source::{activity::ReceivedActivity, instance::Instance, local_site::LocalSite},
   utils::{ActualDbPool, DbPool},
 };
 use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult};
 use moka::future::Cache;
 use once_cell::sync::Lazy;
-use serde::Serialize;
 use std::{sync::Arc, time::Duration};
 use url::Url;
 
@@ -178,30 +172,16 @@ pub(crate) async fn check_apub_id_valid_with_strictness(
   Ok(())
 }
 
-/// Store a sent or received activity in the database.
+/// Store received activities in the database.
 ///
-/// Stored activities are served over the HTTP endpoint `GET /activities/{type_}/{id}`. This also
-/// ensures that the same activity cannot be received more than once.
-#[tracing::instrument(skip(data, activity))]
-async fn insert_activity<T>(
+/// This ensures that the same activity doesn't get received and processed more than once, which
+/// would be a waste of resources.
+#[tracing::instrument(skip(data))]
+async fn insert_received_activity(
   ap_id: &Url,
-  activity: &T,
-  local: bool,
-  sensitive: bool,
   data: &Data<LemmyContext>,
-) -> Result<(), LemmyError>
-where
-  T: Serialize,
-{
-  let ap_id = ap_id.clone().into();
-  let form = ActivityInsertForm {
-    ap_id,
-    data: serde_json::to_value(activity)?,
-    local: Some(local),
-    sensitive: Some(sensitive),
-    updated: None,
-  };
-  Activity::create(&mut data.pool(), &form).await?;
+) -> Result<(), LemmyError> {
+  ReceivedActivity::create(&mut data.pool(), &ap_id.clone().into()).await?;
   Ok(())
 }
 
index 4e581f95cfb1b3aeec5399750b77db526f0b303e..adda4fc763fc771a53bba0e28112fc5a1a14733a 100644 (file)
 use crate::{
+  diesel::OptionalExtension,
   newtypes::DbUrl,
-  schema::activity::dsl::{activity, ap_id},
-  source::activity::{Activity, ActivityInsertForm, ActivityUpdateForm},
-  traits::Crud,
+  source::activity::{ReceivedActivity, SentActivity, SentActivityForm},
   utils::{get_conn, DbPool},
 };
-use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl};
+use diesel::{
+  dsl::insert_into,
+  result::{DatabaseErrorKind, Error, Error::DatabaseError},
+  ExpressionMethods,
+  QueryDsl,
+};
 use diesel_async::RunQueryDsl;
 
-#[async_trait]
-impl Crud for Activity {
-  type InsertForm = ActivityInsertForm;
-  type UpdateForm = ActivityUpdateForm;
-  type IdType = i32;
-  async fn read(pool: &mut DbPool<'_>, activity_id: i32) -> Result<Self, Error> {
-    let conn = &mut get_conn(pool).await?;
-    activity.find(activity_id).first::<Self>(conn).await
-  }
-
-  async fn create(pool: &mut DbPool<'_>, new_activity: &Self::InsertForm) -> Result<Self, Error> {
+impl SentActivity {
+  pub async fn create(pool: &mut DbPool<'_>, form: SentActivityForm) -> Result<Self, Error> {
+    use crate::schema::sent_activity::dsl::sent_activity;
     let conn = &mut get_conn(pool).await?;
-    insert_into(activity)
-      .values(new_activity)
+    insert_into(sent_activity)
+      .values(form)
       .get_result::<Self>(conn)
       .await
   }
 
-  async fn update(
-    pool: &mut DbPool<'_>,
-    activity_id: i32,
-    new_activity: &Self::UpdateForm,
-  ) -> Result<Self, Error> {
+  pub async fn read_from_apub_id(pool: &mut DbPool<'_>, object_id: &DbUrl) -> Result<Self, Error> {
+    use crate::schema::sent_activity::dsl::{ap_id, sent_activity};
     let conn = &mut get_conn(pool).await?;
-    diesel::update(activity.find(activity_id))
-      .set(new_activity)
-      .get_result::<Self>(conn)
-      .await
-  }
-  async fn delete(pool: &mut DbPool<'_>, activity_id: i32) -> Result<usize, Error> {
-    let conn = &mut get_conn(pool).await?;
-    diesel::delete(activity.find(activity_id))
-      .execute(conn)
+    sent_activity
+      .filter(ap_id.eq(object_id))
+      .first::<Self>(conn)
       .await
   }
 }
 
-impl Activity {
-  pub async fn read_from_apub_id(
-    pool: &mut DbPool<'_>,
-    object_id: &DbUrl,
-  ) -> Result<Activity, Error> {
+impl ReceivedActivity {
+  pub async fn create(pool: &mut DbPool<'_>, ap_id_: &DbUrl) -> Result<(), Error> {
+    use crate::schema::received_activity::dsl::{ap_id, id, received_activity};
     let conn = &mut get_conn(pool).await?;
-    activity
-      .filter(ap_id.eq(object_id))
-      .first::<Self>(conn)
+    let res = insert_into(received_activity)
+      .values(ap_id.eq(ap_id_))
+      .on_conflict_do_nothing()
+      .returning(id)
+      .get_result::<i64>(conn)
       .await
+      .optional()?;
+    if res.is_some() {
+      // new activity inserted successfully
+      Ok(())
+    } else {
+      // duplicate activity
+      Err(DatabaseError(
+        DatabaseErrorKind::UniqueViolation,
+        Box::<String>::default(),
+      ))
+    }
   }
 }
 
 #[cfg(test)]
 mod tests {
   use super::*;
-  use crate::{
-    newtypes::DbUrl,
-    source::{
-      activity::{Activity, ActivityInsertForm},
-      instance::Instance,
-      person::{Person, PersonInsertForm},
-    },
-    utils::build_db_pool_for_tests,
-  };
-  use serde_json::Value;
+  use crate::utils::build_db_pool_for_tests;
+  use serde_json::json;
   use serial_test::serial;
   use url::Url;
 
   #[tokio::test]
   #[serial]
-  async fn test_crud() {
+  async fn receive_activity_duplicate() {
     let pool = &build_db_pool_for_tests().await;
     let pool = &mut pool.into();
+    let ap_id: DbUrl = Url::parse("http://example.com/activity/531")
+      .unwrap()
+      .into();
 
-    let inserted_instance = Instance::read_or_create(pool, "my_domain.tld".to_string())
-      .await
-      .unwrap();
-
-    let creator_form = PersonInsertForm::builder()
-      .name("activity_creator_ pm".into())
-      .public_key("pubkey".to_string())
-      .instance_id(inserted_instance.id)
-      .build();
+    // inserting activity for first time
+    let res = ReceivedActivity::create(pool, &ap_id).await;
+    assert!(res.is_ok());
 
-    let inserted_creator = Person::create(pool, &creator_form).await.unwrap();
-
-    let ap_id_: DbUrl = Url::parse(
-      "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
-    )
-    .unwrap()
-    .into();
-    let test_json: Value = serde_json::from_str(
-      r#"{
-    "@context": "https://www.w3.org/ns/activitystreams",
-    "id": "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
-    "type": "Delete",
-    "actor": "https://enterprise.lemmy.ml/u/riker",
-    "to": "https://www.w3.org/ns/activitystreams#Public",
-    "cc": [
-        "https://enterprise.lemmy.ml/c/main/"
-    ],
-    "object": "https://enterprise.lemmy.ml/post/32"
-    }"#,
-    )
-    .unwrap();
-    let activity_form = ActivityInsertForm {
-      ap_id: ap_id_.clone(),
-      data: test_json.clone(),
-      local: Some(true),
-      sensitive: Some(false),
-      updated: None,
-    };
+    let res = ReceivedActivity::create(pool, &ap_id).await;
+    assert!(res.is_err());
+  }
 
-    let inserted_activity = Activity::create(pool, &activity_form).await.unwrap();
+  #[tokio::test]
+  #[serial]
+  async fn sent_activity_write_read() {
+    let pool = &build_db_pool_for_tests().await;
+    let pool = &mut pool.into();
+    let ap_id: DbUrl = Url::parse("http://example.com/activity/412")
+      .unwrap()
+      .into();
+    let data = json!({
+        "key1": "0xF9BA143B95FF6D82",
+        "key2": "42",
+    });
+    let sensitive = false;
 
-    let expected_activity = Activity {
-      ap_id: ap_id_.clone(),
-      id: inserted_activity.id,
-      data: test_json,
-      local: true,
-      sensitive: false,
-      published: inserted_activity.published,
-      updated: None,
+    let form = SentActivityForm {
+      ap_id: ap_id.clone(),
+      data: data.clone(),
+      sensitive,
     };
 
-    let read_activity = Activity::read(pool, inserted_activity.id).await.unwrap();
-    let read_activity_by_apub_id = Activity::read_from_apub_id(pool, &ap_id_).await.unwrap();
-    Person::delete(pool, inserted_creator.id).await.unwrap();
-    Activity::delete(pool, inserted_activity.id).await.unwrap();
+    SentActivity::create(pool, form).await.unwrap();
 
-    assert_eq!(expected_activity, read_activity);
-    assert_eq!(expected_activity, read_activity_by_apub_id);
-    assert_eq!(expected_activity, inserted_activity);
+    let res = SentActivity::read_from_apub_id(pool, &ap_id).await.unwrap();
+    assert_eq!(res.ap_id, ap_id);
+    assert_eq!(res.data, data);
+    assert_eq!(res.sensitive, sensitive);
   }
 }
index e503a827467df50692f92113e6427873452a8ede..ae75c31d8c371c5d1eed9348189fce7046110b10 100644 (file)
@@ -14,18 +14,6 @@ pub mod sql_types {
     pub struct SortTypeEnum;
 }
 
-diesel::table! {
-    activity (id) {
-        id -> Int4,
-        data -> Jsonb,
-        local -> Bool,
-        published -> Timestamp,
-        updated -> Nullable<Timestamp>,
-        ap_id -> Text,
-        sensitive -> Bool,
-    }
-}
-
 diesel::table! {
     admin_purge_comment (id) {
         id -> Int4,
@@ -762,6 +750,14 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    received_activity (id) {
+        id -> Int8,
+        ap_id -> Text,
+        published -> Timestamp,
+    }
+}
+
 diesel::table! {
     registration_application (id) {
         id -> Int4,
@@ -780,6 +776,16 @@ diesel::table! {
     }
 }
 
+diesel::table! {
+    sent_activity (id) {
+        id -> Int8,
+        ap_id -> Text,
+        data -> Json,
+        sensitive -> Bool,
+        published -> Timestamp,
+    }
+}
+
 diesel::table! {
     site (id) {
         id -> Int4,
@@ -920,7 +926,6 @@ diesel::joinable!(site_language -> site (site_id));
 diesel::joinable!(tagline -> local_site (local_site_id));
 
 diesel::allow_tables_to_appear_in_same_query!(
-    activity,
     admin_purge_comment,
     admin_purge_community,
     admin_purge_person,
@@ -977,8 +982,10 @@ diesel::allow_tables_to_appear_in_same_query!(
     post_saved,
     private_message,
     private_message_report,
+    received_activity,
     registration_application,
     secret,
+    sent_activity,
     site,
     site_aggregates,
     site_language,
index c5c8dd359ecc29f9512667a2df94e4683d66d4b1..85b193f51f525fcbab2df076eca8be2c87f225cc 100644 (file)
@@ -1,34 +1,28 @@
-use crate::{newtypes::DbUrl, schema::activity};
+use crate::{newtypes::DbUrl, schema::sent_activity};
 use serde_json::Value;
 use std::fmt::Debug;
 
-#[derive(PartialEq, Eq, Debug, Queryable, Identifiable)]
-#[diesel(table_name = activity)]
-pub struct Activity {
-  pub id: i32,
-  pub data: Value,
-  pub local: bool,
-  pub published: chrono::NaiveDateTime,
-  pub updated: Option<chrono::NaiveDateTime>,
+#[derive(PartialEq, Eq, Debug, Queryable)]
+#[diesel(table_name = sent_activity)]
+pub struct SentActivity {
+  pub id: i64,
   pub ap_id: DbUrl,
+  pub data: Value,
   pub sensitive: bool,
+  pub published: chrono::NaiveDateTime,
 }
-
 #[derive(Insertable)]
-#[diesel(table_name = activity)]
-pub struct ActivityInsertForm {
-  pub data: Value,
-  pub local: Option<bool>,
-  pub updated: Option<chrono::NaiveDateTime>,
+#[diesel(table_name = sent_activity)]
+pub struct SentActivityForm {
   pub ap_id: DbUrl,
-  pub sensitive: Option<bool>,
+  pub data: Value,
+  pub sensitive: bool,
 }
 
-#[derive(AsChangeset)]
-#[diesel(table_name = activity)]
-pub struct ActivityUpdateForm {
-  pub data: Option<Value>,
-  pub local: Option<bool>,
-  pub updated: Option<Option<chrono::NaiveDateTime>>,
-  pub sensitive: Option<bool>,
+#[derive(PartialEq, Eq, Debug, Queryable)]
+#[diesel(table_name = received_activity)]
+pub struct ReceivedActivity {
+  pub id: i64,
+  pub ap_id: DbUrl,
+  pub published: chrono::NaiveDateTime,
 }
diff --git a/migrations/2023-07-11-084714_receive_activity_table/down.sql b/migrations/2023-07-11-084714_receive_activity_table/down.sql
new file mode 100644 (file)
index 0000000..ea4f4d4
--- /dev/null
@@ -0,0 +1,21 @@
+create table activity (
+    id serial primary key,
+    data jsonb not null,
+    local boolean not null default true,
+    published timestamp not null default now(),
+    updated timestamp,
+    ap_id text not null,
+    sensitive boolean not null default true
+);
+
+insert into activity(ap_id, data, sensitive, published)
+    select ap_id, data, sensitive, published
+    from sent_activity
+    order by id desc
+    limit 100000;
+
+-- We can't copy received_activity entries back into the activity table because we don't have
+-- their data, which is a mandatory column.
+
+drop table sent_activity;
+drop table received_activity;
\ No newline at end of file
diff --git a/migrations/2023-07-11-084714_receive_activity_table/up.sql b/migrations/2023-07-11-084714_receive_activity_table/up.sql
new file mode 100644 (file)
index 0000000..c6b30b7
--- /dev/null
@@ -0,0 +1,35 @@
+-- outgoing activities, need to be stored to be later served over http
+-- we change data column from jsonb to json for decreased size
+-- https://stackoverflow.com/a/22910602
+create table sent_activity (
+    id bigserial primary key,
+    ap_id text unique not null,
+    data json not null,
+    sensitive boolean not null,
+    published timestamp not null default now()
+);
+
+-- incoming activities, we only need the id to avoid processing the same activity multiple times
+create table received_activity (
+    id bigserial primary key,
+    ap_id text unique not null,
+    published timestamp not null default now()
+);
+
+-- copy sent activities to new table. only copy last 100k for faster migration
+insert into sent_activity(ap_id, data, sensitive, published)
+    select ap_id, data, sensitive, published
+    from activity
+    where local = true
+    order by id desc
+    limit 100000;
+
+-- copy received activities to new table. only last 1m for faster migration
+insert into received_activity(ap_id, published)
+    select ap_id, published
+    from activity
+    where local = false
+    order by id desc
+    limit 1000000;
+
+drop table activity;
index f20e61e12821d155ed0bdb6e02ffbc1a893ed1dd..ad97d1934a14cf313eaf6eddbf97f13188bc089d 100644 (file)
@@ -13,7 +13,16 @@ use diesel::{
 use diesel::{sql_query, PgConnection, RunQueryDsl};
 use lemmy_api_common::context::LemmyContext;
 use lemmy_db_schema::{
-  schema::{activity, captcha_answer, comment, community_person_ban, instance, person, post},
+  schema::{
+    captcha_answer,
+    comment,
+    community_person_ban,
+    instance,
+    person,
+    post,
+    received_activity,
+    sent_activity,
+  },
   source::instance::{Instance, InstanceForm},
   utils::{naive_now, DELETED_REPLACEMENT_TEXT},
 };
@@ -211,16 +220,17 @@ fn delete_expired_captcha_answers(conn: &mut PgConnection) {
 /// Clear old activities (this table gets very large)
 fn clear_old_activities(conn: &mut PgConnection) {
   info!("Clearing old activities...");
-  match diesel::delete(activity::table.filter(activity::published.lt(now - 6.months())))
+  diesel::delete(sent_activity::table.filter(sent_activity::published.lt(now - 3.months())))
     .execute(conn)
-  {
-    Ok(_) => {
-      info!("Done.");
-    }
-    Err(e) => {
-      error!("Failed to clear old activities: {}", e)
-    }
-  }
+    .map_err(|e| error!("Failed to clear old sent activities: {}", e))
+    .ok();
+
+  diesel::delete(
+    received_activity::table.filter(received_activity::published.lt(now - 3.months())),
+  )
+  .execute(conn)
+  .map_err(|e| error!("Failed to clear old received activities: {}", e))
+  .ok();
 }
 
 /// overwrite posts and comments 30d after deletion