verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
objects::{instance::remote_instance_inboxes, person::ApubPerson},
protocol::activities::block::block_user::BlockUser,
};
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
match self.target.dereference(context).await? {
SiteOrCommunity::Site(site) => {
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
let expires = self.expires.map(|u| u.naive_local());
let mod_person = self.actor.dereference(context).await?;
let blocked_person = self.object.dereference(context).await?;
verify_is_public,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
objects::{instance::remote_instance_inboxes, person::ApubPerson},
protocol::activities::block::{block_user::BlockUser, undo_block_user::UndoBlockUser},
};
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
verify_domains_match(self.actor.inner(), self.object.actor.inner())?;
self.object.verify(context).await?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
let expires = self.object.expires.map(|u| u.naive_local());
let mod_person = self.actor.dereference(context).await?;
let blocked_person = self.object.object.dereference(context).await?;
verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
objects::community::ApubCommunity,
protocol::{
activities::community::announce::{AnnounceActivity, RawAnnouncableActivities},
}
#[tracing::instrument(skip_all)]
- async fn verify(&self, _context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
Ok(())
}
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
let object: AnnouncableActivities = self.object.object(context).await?.try_into()?;
// This is only for sending, not receiving so we reject it.
if let AnnouncableActivities::Page(_) = object {
verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost},
protocol::{
activities::community::{collection_add::CollectionAdd, collection_remove::CollectionRemove},
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
let community = self.community(context).await?;
verify_person_in_community(&self.actor, &community, context).await?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
let (community, collection_type) =
Community::get_by_collection_url(&mut context.pool(), &self.target.into()).await?;
match collection_type {
verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost},
protocol::{activities::community::collection_remove::CollectionRemove, InCommunity},
};
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
let community = self.community(context).await?;
verify_person_in_community(&self.actor, &community, context).await?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
let (community, collection_type) =
Community::get_by_collection_url(&mut context.pool(), &self.target.into()).await?;
match collection_type {
verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
protocol::{
activities::community::lock_page::{LockPage, LockType, UndoLockPage},
InCommunity,
}
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), Self::Error> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
let community = self.community(context).await?;
verify_person_in_community(&self.actor, &community, context).await?;
}
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), Self::Error> {
- insert_activity(&self.id, &self, false, false, context).await?;
let form = PostUpdateForm::builder().locked(Some(false)).build();
let post = self.object.object.dereference(context).await?;
Post::update(&mut context.pool(), post.id, &form).await?;
use crate::{
activities::{generate_activity_id, send_lemmy_activity, verify_person_in_community},
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson},
protocol::{activities::community::report::Report, InCommunity},
PostOrComment,
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
let community = self.community(context).await?;
verify_person_in_community(&self.actor, &community, context).await?;
Ok(())
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, true, context).await?;
let actor = self.actor.dereference(context).await?;
match self.object.dereference(context).await? {
PostOrComment::Post(post) => {
verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson},
protocol::{activities::community::update::UpdateCommunity, InCommunity},
SendActivity,
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
let community = self.community(context).await?;
verify_person_in_community(&self.actor, &community, context).await?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
let community = self.community(context).await?;
let community_update_form = self.object.into_update_form();
verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
mentions::MentionOrValue,
objects::{comment::ApubComment, community::ApubCommunity, person::ApubPerson},
protocol::{
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
let post = self.object.get_parents(context).await?.0;
let community = self.community(context).await?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
// Need to do this check here instead of Note::from_json because we need the person who
// send the activity, not the comment author.
let existing_comment = self.object.id.dereference_local(context).await.ok();
verify_person_in_community,
},
activity_lists::AnnouncableActivities,
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson, post::ApubPost},
protocol::{
activities::{create_or_update::page::CreateOrUpdatePage, CreateOrUpdateType},
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &self.cc)?;
let community = self.community(context).await?;
verify_person_in_community(&self.actor, &community, context).await?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
let post = ApubPost::from_json(self.object, context).await?;
// author likes their own post by default
use crate::{
activities::{generate_activity_id, send_lemmy_activity, verify_person},
- insert_activity,
+ insert_received_activity,
objects::{person::ApubPerson, private_message::ApubPrivateMessage},
protocol::activities::{
create_or_update::chat_message::CreateOrUpdateChatMessage,
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_person(&self.actor, context).await?;
verify_domains_match(self.actor.inner(), self.object.id.inner())?;
verify_domains_match(self.to[0].inner(), self.object.to[0].inner())?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, true, context).await?;
ApubPrivateMessage::from_json(self.object, context).await?;
Ok(())
}
deletion::{receive_delete_action, verify_delete_activity, DeletableObjects},
generate_activity_id,
},
- insert_activity,
+ insert_received_activity,
objects::person::ApubPerson,
protocol::{activities::deletion::delete::Delete, IdOrNestedObject},
};
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_delete_activity(self, self.summary.is_some(), context).await?;
Ok(())
}
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
if let Some(reason) = self.summary {
// We set reason to empty string if it doesn't exist, to distinguish between delete and
// remove. Here we change it back to option, so we don't write it to db.
use crate::{
activities::{generate_activity_id, send_lemmy_activity, verify_is_public, verify_person},
- insert_activity,
+ insert_received_activity,
objects::{instance::remote_instance_inboxes, person::ApubPerson},
protocol::activities::deletion::delete_user::DeleteUser,
SendActivity,
}
async fn verify(&self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_is_public(&self.to, &[])?;
verify_person(&self.actor, context).await?;
verify_urls_match(self.actor.inner(), self.object.inner())?;
}
async fn receive(self, context: &Data<Self::DataType>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
let actor = self.actor.dereference(context).await?;
delete_user_account(
actor.id,
deletion::{receive_delete_action, verify_delete_activity, DeletableObjects},
generate_activity_id,
},
- insert_activity,
+ insert_received_activity,
objects::person::ApubPerson,
protocol::activities::deletion::{delete::Delete, undo_delete::UndoDelete},
};
}
async fn verify(&self, data: &Data<Self::DataType>) -> Result<(), Self::Error> {
+ insert_received_activity(&self.id, data).await?;
self.object.verify(data).await?;
verify_delete_activity(&self.object, self.object.summary.is_some(), data).await?;
Ok(())
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, false, context).await?;
if self.object.summary.is_some() {
UndoDelete::receive_undo_remove_action(
&self.actor.dereference(context).await?,
use crate::{
activities::{generate_activity_id, send_lemmy_activity},
- insert_activity,
+ insert_received_activity,
protocol::activities::following::{accept::AcceptFollow, follow::Follow},
};
use activitypub_federation::{
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_urls_match(self.actor.inner(), self.object.object.inner())?;
self.object.verify(context).await?;
if let Some(to) = &self.to {
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, true, context).await?;
let community = self.actor.dereference(context).await?;
let person = self.object.actor.dereference(context).await?;
// This will throw an error if no follow was requested
verify_person_in_community,
},
fetcher::user_or_community::UserOrCommunity,
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson},
protocol::activities::following::{
accept::AcceptFollow,
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_person(&self.actor, context).await?;
let object = self.object.dereference(context).await?;
if let UserOrCommunity::Community(c) = object {
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, true, context).await?;
let actor = self.actor.dereference(context).await?;
let object = self.object.dereference(context).await?;
match object {
use crate::{
activities::{generate_activity_id, send_lemmy_activity, verify_person},
fetcher::user_or_community::UserOrCommunity,
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson},
protocol::activities::following::{follow::Follow, undo_follow::UndoFollow},
};
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
verify_urls_match(self.actor.inner(), self.object.actor.inner())?;
verify_person(&self.actor, context).await?;
self.object.verify(context).await?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, true, context).await?;
let person = self.actor.dereference(context).await?;
let object = self.object.object.dereference(context).await?;
use crate::{
- insert_activity,
objects::{community::ApubCommunity, person::ApubPerson},
CONTEXT,
};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
newtypes::CommunityId,
- source::{community::Community, instance::Instance},
+ source::{
+ activity::{SentActivity, SentActivityForm},
+ community::Community,
+ instance::Instance,
+ },
};
use lemmy_db_views_actor::structs::{CommunityPersonBanView, CommunityView};
use lemmy_utils::error::{LemmyError, LemmyErrorExt, LemmyErrorType};
info!("Sending activity {}", activity.id().to_string());
let activity = WithContext::new(activity, CONTEXT.deref().clone());
- insert_activity(activity.id(), &activity, true, sensitive, data).await?;
+ let form = SentActivityForm {
+ ap_id: activity.id().clone().into(),
+ data: serde_json::to_value(activity.clone())?,
+ sensitive,
+ };
+ SentActivity::create(&mut data.pool(), form).await?;
send_activity(activity, actor, inbox, data).await?;
Ok(())
verify_person_in_community,
voting::{undo_vote_comment, undo_vote_post},
},
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson},
protocol::{
activities::voting::{undo_vote::UndoVote, vote::Vote},
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
let community = self.community(context).await?;
verify_person_in_community(&self.actor, &community, context).await?;
verify_urls_match(self.actor.inner(), self.object.actor.inner())?;
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, true, context).await?;
let actor = self.actor.dereference(context).await?;
let object = self.object.object.dereference(context).await?;
match object {
verify_person_in_community,
voting::{vote_comment, vote_post},
},
- insert_activity,
+ insert_received_activity,
objects::{community::ApubCommunity, person::ApubPerson},
protocol::{
activities::voting::vote::{Vote, VoteType},
#[tracing::instrument(skip_all)]
async fn verify(&self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
+ insert_received_activity(&self.id, context).await?;
let community = self.community(context).await?;
verify_person_in_community(&self.actor, &community, context).await?;
let enable_downvotes = LocalSite::read(&mut context.pool())
#[tracing::instrument(skip_all)]
async fn receive(self, context: &Data<LemmyContext>) -> Result<(), LemmyError> {
- insert_activity(&self.id, &self, false, true, context).await?;
let actor = self.actor.dereference(context).await?;
let object = self.object.dereference(context).await?;
match object {
use actix_web::{web, web::Bytes, HttpRequest, HttpResponse};
use http::StatusCode;
use lemmy_api_common::context::LemmyContext;
-use lemmy_db_schema::source::activity::Activity;
+use lemmy_db_schema::source::activity::SentActivity;
use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult};
use serde::{Deserialize, Serialize};
use std::ops::Deref;
info.id
))?
.into();
- let activity = Activity::read_from_apub_id(&mut context.pool(), &activity_id).await?;
+ let activity = SentActivity::read_from_apub_id(&mut context.pool(), &activity_id).await?;
let sensitive = activity.sensitive;
- if !activity.local {
- Err(err_object_not_local())
- } else if sensitive {
+ if sensitive {
Ok(HttpResponse::Forbidden().finish())
} else {
create_apub_response(&activity.data)
use async_trait::async_trait;
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
- source::{
- activity::{Activity, ActivityInsertForm},
- instance::Instance,
- local_site::LocalSite,
- },
- traits::Crud,
+ source::{activity::ReceivedActivity, instance::Instance, local_site::LocalSite},
utils::{ActualDbPool, DbPool},
};
use lemmy_utils::error::{LemmyError, LemmyErrorType, LemmyResult};
use moka::future::Cache;
use once_cell::sync::Lazy;
-use serde::Serialize;
use std::{sync::Arc, time::Duration};
use url::Url;
Ok(())
}
-/// Store a sent or received activity in the database.
+/// Store received activities in the database.
///
-/// Stored activities are served over the HTTP endpoint `GET /activities/{type_}/{id}`. This also
-/// ensures that the same activity cannot be received more than once.
-#[tracing::instrument(skip(data, activity))]
-async fn insert_activity<T>(
+/// This ensures that the same activity doesnt get received and processed more than once, which
+/// would be a waste of resources.
+#[tracing::instrument(skip(data))]
+async fn insert_received_activity(
ap_id: &Url,
- activity: &T,
- local: bool,
- sensitive: bool,
data: &Data<LemmyContext>,
-) -> Result<(), LemmyError>
-where
- T: Serialize,
-{
- let ap_id = ap_id.clone().into();
- let form = ActivityInsertForm {
- ap_id,
- data: serde_json::to_value(activity)?,
- local: Some(local),
- sensitive: Some(sensitive),
- updated: None,
- };
- Activity::create(&mut data.pool(), &form).await?;
+) -> Result<(), LemmyError> {
+ ReceivedActivity::create(&mut data.pool(), &ap_id.clone().into()).await?;
Ok(())
}
use crate::{
+ diesel::OptionalExtension,
newtypes::DbUrl,
- schema::activity::dsl::{activity, ap_id},
- source::activity::{Activity, ActivityInsertForm, ActivityUpdateForm},
- traits::Crud,
+ source::activity::{ReceivedActivity, SentActivity, SentActivityForm},
utils::{get_conn, DbPool},
};
-use diesel::{dsl::insert_into, result::Error, ExpressionMethods, QueryDsl};
+use diesel::{
+ dsl::insert_into,
+ result::{DatabaseErrorKind, Error, Error::DatabaseError},
+ ExpressionMethods,
+ QueryDsl,
+};
use diesel_async::RunQueryDsl;
-#[async_trait]
-impl Crud for Activity {
- type InsertForm = ActivityInsertForm;
- type UpdateForm = ActivityUpdateForm;
- type IdType = i32;
- async fn read(pool: &mut DbPool<'_>, activity_id: i32) -> Result<Self, Error> {
- let conn = &mut get_conn(pool).await?;
- activity.find(activity_id).first::<Self>(conn).await
- }
-
- async fn create(pool: &mut DbPool<'_>, new_activity: &Self::InsertForm) -> Result<Self, Error> {
+impl SentActivity {
+ pub async fn create(pool: &mut DbPool<'_>, form: SentActivityForm) -> Result<Self, Error> {
+ use crate::schema::sent_activity::dsl::sent_activity;
let conn = &mut get_conn(pool).await?;
- insert_into(activity)
- .values(new_activity)
+ insert_into(sent_activity)
+ .values(form)
.get_result::<Self>(conn)
.await
}
- async fn update(
- pool: &mut DbPool<'_>,
- activity_id: i32,
- new_activity: &Self::UpdateForm,
- ) -> Result<Self, Error> {
+ pub async fn read_from_apub_id(pool: &mut DbPool<'_>, object_id: &DbUrl) -> Result<Self, Error> {
+ use crate::schema::sent_activity::dsl::{ap_id, sent_activity};
let conn = &mut get_conn(pool).await?;
- diesel::update(activity.find(activity_id))
- .set(new_activity)
- .get_result::<Self>(conn)
- .await
- }
- async fn delete(pool: &mut DbPool<'_>, activity_id: i32) -> Result<usize, Error> {
- let conn = &mut get_conn(pool).await?;
- diesel::delete(activity.find(activity_id))
- .execute(conn)
+ sent_activity
+ .filter(ap_id.eq(object_id))
+ .first::<Self>(conn)
.await
}
}
-impl Activity {
- pub async fn read_from_apub_id(
- pool: &mut DbPool<'_>,
- object_id: &DbUrl,
- ) -> Result<Activity, Error> {
+impl ReceivedActivity {
+ pub async fn create(pool: &mut DbPool<'_>, ap_id_: &DbUrl) -> Result<(), Error> {
+ use crate::schema::received_activity::dsl::{ap_id, id, received_activity};
let conn = &mut get_conn(pool).await?;
- activity
- .filter(ap_id.eq(object_id))
- .first::<Self>(conn)
+ let res = insert_into(received_activity)
+ .values(ap_id.eq(ap_id_))
+ .on_conflict_do_nothing()
+ .returning(id)
+ .get_result::<i64>(conn)
.await
+ .optional()?;
+ if res.is_some() {
+ // new activity inserted successfully
+ Ok(())
+ } else {
+ // duplicate activity
+ Err(DatabaseError(
+ DatabaseErrorKind::UniqueViolation,
+ Box::<String>::default(),
+ ))
+ }
}
}
#[cfg(test)]
mod tests {
use super::*;
- use crate::{
- newtypes::DbUrl,
- source::{
- activity::{Activity, ActivityInsertForm},
- instance::Instance,
- person::{Person, PersonInsertForm},
- },
- utils::build_db_pool_for_tests,
- };
- use serde_json::Value;
+ use crate::utils::build_db_pool_for_tests;
+ use serde_json::json;
use serial_test::serial;
use url::Url;
#[tokio::test]
#[serial]
- async fn test_crud() {
+ async fn receive_activity_duplicate() {
let pool = &build_db_pool_for_tests().await;
let pool = &mut pool.into();
+ let ap_id: DbUrl = Url::parse("http://example.com/activity/531")
+ .unwrap()
+ .into();
- let inserted_instance = Instance::read_or_create(pool, "my_domain.tld".to_string())
- .await
- .unwrap();
-
- let creator_form = PersonInsertForm::builder()
- .name("activity_creator_ pm".into())
- .public_key("pubkey".to_string())
- .instance_id(inserted_instance.id)
- .build();
+ // inserting activity for first time
+ let res = ReceivedActivity::create(pool, &ap_id).await;
+ assert!(res.is_ok());
- let inserted_creator = Person::create(pool, &creator_form).await.unwrap();
-
- let ap_id_: DbUrl = Url::parse(
- "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
- )
- .unwrap()
- .into();
- let test_json: Value = serde_json::from_str(
- r#"{
- "@context": "https://www.w3.org/ns/activitystreams",
- "id": "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
- "type": "Delete",
- "actor": "https://enterprise.lemmy.ml/u/riker",
- "to": "https://www.w3.org/ns/activitystreams#Public",
- "cc": [
- "https://enterprise.lemmy.ml/c/main/"
- ],
- "object": "https://enterprise.lemmy.ml/post/32"
- }"#,
- )
- .unwrap();
- let activity_form = ActivityInsertForm {
- ap_id: ap_id_.clone(),
- data: test_json.clone(),
- local: Some(true),
- sensitive: Some(false),
- updated: None,
- };
+ let res = ReceivedActivity::create(pool, &ap_id).await;
+ assert!(res.is_err());
+ }
- let inserted_activity = Activity::create(pool, &activity_form).await.unwrap();
+ #[tokio::test]
+ #[serial]
+ async fn sent_activity_write_read() {
+ let pool = &build_db_pool_for_tests().await;
+ let pool = &mut pool.into();
+ let ap_id: DbUrl = Url::parse("http://example.com/activity/412")
+ .unwrap()
+ .into();
+ let data = json!({
+ "key1": "0xF9BA143B95FF6D82",
+ "key2": "42",
+ });
+ let sensitive = false;
- let expected_activity = Activity {
- ap_id: ap_id_.clone(),
- id: inserted_activity.id,
- data: test_json,
- local: true,
- sensitive: false,
- published: inserted_activity.published,
- updated: None,
+ let form = SentActivityForm {
+ ap_id: ap_id.clone(),
+ data: data.clone(),
+ sensitive,
};
- let read_activity = Activity::read(pool, inserted_activity.id).await.unwrap();
- let read_activity_by_apub_id = Activity::read_from_apub_id(pool, &ap_id_).await.unwrap();
- Person::delete(pool, inserted_creator.id).await.unwrap();
- Activity::delete(pool, inserted_activity.id).await.unwrap();
+ SentActivity::create(pool, form).await.unwrap();
- assert_eq!(expected_activity, read_activity);
- assert_eq!(expected_activity, read_activity_by_apub_id);
- assert_eq!(expected_activity, inserted_activity);
+ let res = SentActivity::read_from_apub_id(pool, &ap_id).await.unwrap();
+ assert_eq!(res.ap_id, ap_id);
+ assert_eq!(res.data, data);
+ assert_eq!(res.sensitive, sensitive);
}
}
pub struct SortTypeEnum;
}
-diesel::table! {
- activity (id) {
- id -> Int4,
- data -> Jsonb,
- local -> Bool,
- published -> Timestamp,
- updated -> Nullable<Timestamp>,
- ap_id -> Text,
- sensitive -> Bool,
- }
-}
-
diesel::table! {
admin_purge_comment (id) {
id -> Int4,
}
}
+diesel::table! {
+ received_activity (id) {
+ id -> Int8,
+ ap_id -> Text,
+ published -> Timestamp,
+ }
+}
+
diesel::table! {
registration_application (id) {
id -> Int4,
}
}
+diesel::table! {
+ sent_activity (id) {
+ id -> Int8,
+ ap_id -> Text,
+ data -> Json,
+ sensitive -> Bool,
+ published -> Timestamp,
+ }
+}
+
diesel::table! {
site (id) {
id -> Int4,
diesel::joinable!(tagline -> local_site (local_site_id));
diesel::allow_tables_to_appear_in_same_query!(
- activity,
admin_purge_comment,
admin_purge_community,
admin_purge_person,
post_saved,
private_message,
private_message_report,
+ received_activity,
registration_application,
secret,
+ sent_activity,
site,
site_aggregates,
site_language,
-use crate::{newtypes::DbUrl, schema::activity};
+use crate::{newtypes::DbUrl, schema::sent_activity};
use serde_json::Value;
use std::fmt::Debug;
-#[derive(PartialEq, Eq, Debug, Queryable, Identifiable)]
-#[diesel(table_name = activity)]
-pub struct Activity {
- pub id: i32,
- pub data: Value,
- pub local: bool,
- pub published: chrono::NaiveDateTime,
- pub updated: Option<chrono::NaiveDateTime>,
+#[derive(PartialEq, Eq, Debug, Queryable)]
+#[diesel(table_name = sent_activity)]
+pub struct SentActivity {
+ pub id: i64,
pub ap_id: DbUrl,
+ pub data: Value,
pub sensitive: bool,
+ pub published: chrono::NaiveDateTime,
}
-
#[derive(Insertable)]
-#[diesel(table_name = activity)]
-pub struct ActivityInsertForm {
- pub data: Value,
- pub local: Option<bool>,
- pub updated: Option<chrono::NaiveDateTime>,
+#[diesel(table_name = sent_activity)]
+pub struct SentActivityForm {
pub ap_id: DbUrl,
- pub sensitive: Option<bool>,
+ pub data: Value,
+ pub sensitive: bool,
}
-#[derive(AsChangeset)]
-#[diesel(table_name = activity)]
-pub struct ActivityUpdateForm {
- pub data: Option<Value>,
- pub local: Option<bool>,
- pub updated: Option<Option<chrono::NaiveDateTime>>,
- pub sensitive: Option<bool>,
+#[derive(PartialEq, Eq, Debug, Queryable)]
+#[diesel(table_name = received_activity)]
+pub struct ReceivedActivity {
+ pub id: i64,
+ pub ap_id: DbUrl,
+ pub published: chrono::NaiveDateTime,
}
--- /dev/null
+create table activity (
+ id serial primary key,
+ data jsonb not null,
+ local boolean not null default true,
+ published timestamp not null default now(),
+ updated timestamp,
+ ap_id text not null,
+ sensitive boolean not null default true
+);
+
+insert into activity(ap_id, data, sensitive, published)
+ select ap_id, data, sensitive, published
+ from sent_activity
+ order by id desc
+ limit 100000;
+
+-- We cant copy received_activity entries back into activities table because we dont have data
+-- which is mandatory.
+
+drop table sent_activity;
+drop table received_activity;
\ No newline at end of file
--- /dev/null
+-- outgoing activities, need to be stored to be later server over http
+-- we change data column from jsonb to json for decreased size
+-- https://stackoverflow.com/a/22910602
+create table sent_activity (
+ id bigserial primary key,
+ ap_id text unique not null,
+ data json not null,
+ sensitive boolean not null,
+ published timestamp not null default now()
+);
+
+-- incoming activities, we only need the id to avoid processing the same activity multiple times
+create table received_activity (
+ id bigserial primary key,
+ ap_id text unique not null,
+ published timestamp not null default now()
+);
+
+-- copy sent activities to new table. only copy last 100k for faster migration
+insert into sent_activity(ap_id, data, sensitive, published)
+ select ap_id, data, sensitive, published
+ from activity
+ where local = true
+ order by id desc
+ limit 100000;
+
+-- copy received activities to new table. only last 1m for faster migration
+insert into received_activity(ap_id, published)
+ select ap_id, published
+ from activity
+ where local = false
+ order by id desc
+ limit 1000000;
+
+drop table activity;
use diesel::{sql_query, PgConnection, RunQueryDsl};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
- schema::{activity, captcha_answer, comment, community_person_ban, instance, person, post},
+ schema::{
+ captcha_answer,
+ comment,
+ community_person_ban,
+ instance,
+ person,
+ post,
+ received_activity,
+ sent_activity,
+ },
source::instance::{Instance, InstanceForm},
utils::{naive_now, DELETED_REPLACEMENT_TEXT},
};
/// Clear old activities (this table gets very large)
fn clear_old_activities(conn: &mut PgConnection) {
info!("Clearing old activities...");
- match diesel::delete(activity::table.filter(activity::published.lt(now - 6.months())))
+ diesel::delete(sent_activity::table.filter(sent_activity::published.lt(now - 3.months())))
.execute(conn)
- {
- Ok(_) => {
- info!("Done.");
- }
- Err(e) => {
- error!("Failed to clear old activities: {}", e)
- }
- }
+ .map_err(|e| error!("Failed to clear old sent activities: {}", e))
+ .ok();
+
+ diesel::delete(
+ received_activity::table.filter(received_activity::published.lt(now - 3.months())),
+ )
+ .execute(conn)
+ .map_err(|e| error!("Failed to clear old received activities: {}", e))
+ .ok();
}
/// overwrite posts and comments 30d after deletion