websocket_id: None,
});
- announce_if_community_is_local(create, &user, context, request_counter).await?;
+ announce_if_community_is_local(create, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(update, &user, context, request_counter).await?;
+ announce_if_community_is_local(update, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(like, &user, context, request_counter).await?;
+ announce_if_community_is_local(like, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(dislike, &user, context, request_counter).await?;
+ announce_if_community_is_local(dislike, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- let user = get_actor_as_user(&delete, context, request_counter).await?;
- announce_if_community_is_local(delete, &user, context, request_counter).await?;
+ announce_if_community_is_local(delete, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(undo, &user, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(undo, &user, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- let user = get_actor_as_user(&undo, context, request_counter).await?;
- announce_if_community_is_local(undo, &user, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
- announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
-use crate::activities::receive::{announce_if_community_is_local, get_actor_as_user};
+use crate::activities::receive::announce_if_community_is_local;
use activitystreams::activity::{Delete, Remove, Undo};
use actix_web::HttpResponse;
use lemmy_db::{community::Community, community_view::CommunityView};
websocket_id: None,
});
- let user = get_actor_as_user(&delete, context, request_counter).await?;
- announce_if_community_is_local(delete, &user, context, request_counter).await?;
+ announce_if_community_is_local(delete, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- let user = get_actor_as_user(&undo, context, request_counter).await?;
- announce_if_community_is_local(undo, &user, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
- announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
/// community, the activity is announced to all community followers.
async fn announce_if_community_is_local<T, Kind>(
activity: T,
- user: &User_,
context: &LemmyContext,
request_counter: &mut i32,
) -> Result<(), LemmyError>
if community.local {
community
- .send_announce(activity.into_any_base()?, &user, context)
+ .send_announce(activity.into_any_base()?, context)
.await?;
}
Ok(())
websocket_id: None,
});
- announce_if_community_is_local(create, &user, context, request_counter).await?;
+ announce_if_community_is_local(create, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(update, &user, context, request_counter).await?;
+ announce_if_community_is_local(update, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(like, &user, context, request_counter).await?;
+ announce_if_community_is_local(like, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(dislike, &user, context, request_counter).await?;
+ announce_if_community_is_local(dislike, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- let user = get_actor_as_user(&delete, context, request_counter).await?;
- announce_if_community_is_local(delete, &user, context, request_counter).await?;
+ announce_if_community_is_local(delete, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(undo, &user, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- announce_if_community_is_local(undo, &user, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- let user = get_actor_as_user(&undo, context, request_counter).await?;
- announce_if_community_is_local(undo, &user, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
websocket_id: None,
});
- let mod_ = get_actor_as_user(&undo, context, request_counter).await?;
- announce_if_community_is_local(undo, &mod_, context, request_counter).await?;
+ announce_if_community_is_local(undo, context, request_counter).await?;
Ok(HttpResponse::Ok().finish())
}
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- send_to_community_followers(delete, self, context, None).await?;
+ send_to_community_followers(delete, self, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- send_to_community_followers(undo, self, context, None).await?;
+ send_to_community_followers(undo, self, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- send_to_community_followers(remove, self, context, None).await?;
+ send_to_community_followers(remove, self, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- send_to_community_followers(undo, self, context, None).await?;
+ send_to_community_followers(undo, self, context).await?;
Ok(())
}
async fn send_announce(
&self,
activity: AnyBase,
- sender: &User_,
context: &LemmyContext,
) -> Result<(), LemmyError> {
let mut announce = Announce::new(self.actor_id.to_owned(), activity);
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- send_to_community_followers(
- announce,
- self,
- context,
- Some(sender.get_shared_inbox_url()?),
- )
- .await?;
+ send_to_community_followers(announce, self, context).await?;
Ok(())
}
async fn send_announce(
&self,
_activity: AnyBase,
- _sender: &User_,
_context: &LemmyContext,
) -> Result<(), LemmyError> {
unimplemented!()
activity: T,
community: &Community,
context: &LemmyContext,
- sender_shared_inbox: Option<Url>,
) -> Result<(), LemmyError>
where
T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
- // dont send to the local instance, nor to the instance where the activity originally came from,
- // because that would result in a database error (same data inserted twice)
- let community_shared_inbox = community.get_shared_inbox_url()?;
let follower_inboxes: Vec<Url> = community
.get_follower_inboxes(context.pool())
.await?
.iter()
- .filter(|inbox| Some(inbox) != sender_shared_inbox.as_ref().as_ref())
- .filter(|inbox| inbox != &&community_shared_inbox)
- .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
.unique()
+ .filter(|inbox| inbox.host_str() != Some(&Settings::get().hostname))
+ .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
.map(|inbox| inbox.to_owned())
.collect();
debug!(
// if this is a local community, we need to do an announce from the community instead
if community.local {
community
- .send_announce(activity.into_any_base()?, creator, context)
+ .send_announce(activity.into_any_base()?, context)
.await?;
} else {
let inbox = community.get_shared_inbox_url()?;
// This is necessary because send_comment and send_comment_mentions
// might send the same ap_id
if insert_into_db {
- insert_activity(actor.user_id(), activity.clone(), true, pool).await?;
+ let id = activity.id().context(location_info!())?;
+ insert_activity(id, actor.user_id(), activity.clone(), true, pool).await?;
}
for i in inboxes {
check_is_apub_id_valid,
extensions::signatures::verify_signature,
fetcher::get_or_fetch_and_upsert_user,
+ inbox::{get_activity_id, is_activity_already_known},
insert_activity,
ActorType,
};
verify_signature(&request, &user)?;
+ let activity_id = get_activity_id(&activity, user_uri)?;
+ if is_activity_already_known(context.pool(), &activity_id).await? {
+ return Ok(HttpResponse::Ok().finish());
+ }
+
let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?;
let user_id = user.id;
ValidTypes::Undo => handle_undo_follow(any_base, user, community, &context).await,
};
- insert_activity(user_id, activity.clone(), false, context.pool()).await?;
+ insert_activity(
+ &activity_id,
+ user_id,
+ activity.clone(),
+ false,
+ context.pool(),
+ )
+ .await?;
res
}
+use activitystreams::base::{BaseExt, Extends};
+use anyhow::Context;
+use lemmy_db::{activity::Activity, DbPool};
+use lemmy_structs::blocking;
+use lemmy_utils::{location_info, LemmyError};
+use serde::{export::fmt::Debug, Serialize};
+use url::Url;
+
pub mod community_inbox;
pub mod shared_inbox;
pub mod user_inbox;
+
+pub(crate) fn get_activity_id<T, Kind>(activity: &T, creator_uri: &Url) -> Result<Url, LemmyError>
+where
+ T: BaseExt<Kind> + Extends<Kind> + Debug,
+ Kind: Serialize,
+ <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+ let creator_domain = creator_uri.host_str().context(location_info!())?;
+ let activity_id = activity.id(creator_domain)?;
+ Ok(activity_id.context(location_info!())?.to_owned())
+}
+
+pub(crate) async fn is_activity_already_known(
+ pool: &DbPool,
+ activity_id: &Url,
+) -> Result<bool, LemmyError> {
+ let activity_id = activity_id.to_string();
+ let existing = blocking(pool, move |conn| {
+ Activity::read_from_apub_id(&conn, &activity_id)
+ })
+ .await?;
+ match existing {
+ Ok(_) => Ok(true),
+ Err(_) => Ok(false),
+ }
+}
check_is_apub_id_valid,
extensions::signatures::verify_signature,
fetcher::get_or_fetch_and_upsert_actor,
+ inbox::{get_activity_id, is_activity_already_known},
insert_activity,
ActorType,
};
let actor = get_or_fetch_and_upsert_actor(&actor_id, &context, request_counter).await?;
verify_signature(&request, actor.as_ref())?;
+ let activity_id = get_activity_id(&activity, &actor_id)?;
+ if is_activity_already_known(context.pool(), &activity_id).await? {
+ return Ok(HttpResponse::Ok().finish());
+ }
+
let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?;
let res = match kind {
ValidTypes::Undo => receive_undo(&context, any_base, actor_id, request_counter).await,
};
- insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
+ insert_activity(
+ &activity_id,
+ actor.user_id(),
+ activity.clone(),
+ false,
+ context.pool(),
+ )
+ .await?;
res
}
let inner_id = object.id().context(location_info!())?.to_owned();
check_is_apub_id_valid(&inner_id)?;
+ if is_activity_already_known(context.pool(), &inner_id).await? {
+ return Ok(HttpResponse::Ok().finish());
+ }
match kind {
Some("Create") => receive_create(context, object, inner_id, request_counter).await,
check_is_apub_id_valid,
extensions::signatures::verify_signature,
fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_community},
+ inbox::{get_activity_id, is_activity_already_known},
insert_activity,
ActorType,
FromApub,
let actor = get_or_fetch_and_upsert_actor(actor_uri, &context, request_counter).await?;
verify_signature(&request, actor.as_ref())?;
+ let activity_id = get_activity_id(&activity, actor_uri)?;
+ if is_activity_already_known(context.pool(), &activity_id).await? {
+ return Ok(HttpResponse::Ok().finish());
+ }
+
let any_base = activity.clone().into_any_base()?;
let kind = activity.kind().context(location_info!())?;
let res = match kind {
}
};
- insert_activity(actor.user_id(), activity.clone(), false, context.pool()).await?;
+ insert_activity(
+ &activity_id,
+ actor.user_id(),
+ activity.clone(),
+ false,
+ context.pool(),
+ )
+ .await?;
res
}
};
use activitystreams_ext::{Ext1, Ext2};
use anyhow::{anyhow, Context};
-use lemmy_db::{activity::do_insert_activity, user::User_, DbPool};
+use lemmy_db::{activity::Activity, user::User_, DbPool};
use lemmy_structs::blocking;
use lemmy_utils::{location_info, settings::Settings, LemmyError};
use lemmy_websocket::LemmyContext;
async fn send_announce(
&self,
activity: AnyBase,
- sender: &User_,
context: &LemmyContext,
) -> Result<(), LemmyError>;
/// Store a sent or received activity in the database, for logging purposes. These records are not
/// persistent.
pub async fn insert_activity<T>(
+ ap_id: &Url,
user_id: i32,
- data: T,
+ activity: T,
local: bool,
pool: &DbPool,
) -> Result<(), LemmyError>
where
T: Serialize + std::fmt::Debug + Send + 'static,
{
+ let ap_id = ap_id.to_string();
blocking(pool, move |conn| {
- do_insert_activity(conn, user_id, &data, local)
+ Activity::insert(conn, ap_id, user_id, &activity, local)
})
.await??;
Ok(())
#[table_name = "activity"]
pub struct Activity {
pub id: i32,
+ pub ap_id: String,
pub user_id: i32,
pub data: Value,
pub local: bool,
#[derive(Insertable, AsChangeset)]
#[table_name = "activity"]
pub struct ActivityForm {
+ pub ap_id: String,
pub user_id: i32,
pub data: Value,
pub local: bool,
}
}
-pub fn do_insert_activity<T>(
- conn: &PgConnection,
- user_id: i32,
- data: &T,
- local: bool,
-) -> Result<Activity, IoError>
-where
- T: Serialize + Debug,
-{
- debug!("inserting activity for user {}: ", user_id);
- debug!("{}", serde_json::to_string_pretty(&data)?);
- let activity_form = ActivityForm {
- user_id,
- data: serde_json::to_value(&data)?,
- local,
- updated: None,
- };
- let result = Activity::create(&conn, &activity_form);
- match result {
- Ok(s) => Ok(s),
- Err(e) => Err(IoError::new(
- ErrorKind::Other,
- format!("Failed to insert activity into database: {}", e),
- )),
+impl Activity {
+ pub fn insert<T>(
+ conn: &PgConnection,
+ ap_id: String,
+ user_id: i32,
+ data: &T,
+ local: bool,
+ ) -> Result<Self, IoError>
+ where
+ T: Serialize + Debug,
+ {
+ debug!("inserting activity for user {}: ", user_id);
+ debug!("{}", serde_json::to_string_pretty(&data)?);
+ let activity_form = ActivityForm {
+ ap_id,
+ user_id,
+ data: serde_json::to_value(&data)?,
+ local,
+ updated: None,
+ };
+ let result = Activity::create(&conn, &activity_form);
+ match result {
+ Ok(s) => Ok(s),
+ Err(e) => Err(IoError::new(
+ ErrorKind::Other,
+ format!("Failed to insert activity into database: {}", e),
+ )),
+ }
+ }
+
+ pub fn read_from_apub_id(conn: &PgConnection, object_id: &str) -> Result<Self, Error> {
+ use crate::schema::activity::dsl::*;
+ activity.filter(ap_id.eq(object_id)).first::<Self>(conn)
}
}
let inserted_creator = User_::create(&conn, &creator_form).unwrap();
+ let ap_id =
+ "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c";
let test_json: Value = serde_json::from_str(
r#"{
- "street": "Article Circle Expressway 1",
- "city": "North Pole",
- "postcode": "99705",
- "state": "Alaska"
-}"#,
+ "@context": "https://www.w3.org/ns/activitystreams",
+ "id": "https://enterprise.lemmy.ml/activities/delete/f1b5d57c-80f8-4e03-a615-688d552e946c",
+ "type": "Delete",
+ "actor": "https://enterprise.lemmy.ml/u/riker",
+ "to": "https://www.w3.org/ns/activitystreams#Public",
+ "cc": [
+ "https://enterprise.lemmy.ml/c/main/"
+ ],
+ "object": "https://enterprise.lemmy.ml/post/32"
+ }"#,
)
.unwrap();
let activity_form = ActivityForm {
+ ap_id: ap_id.to_string(),
user_id: inserted_creator.id,
data: test_json.to_owned(),
local: true,
let inserted_activity = Activity::create(&conn, &activity_form).unwrap();
let expected_activity = Activity {
+ ap_id: ap_id.to_string(),
id: inserted_activity.id,
user_id: inserted_creator.id,
data: test_json,
};
let read_activity = Activity::read(&conn, inserted_activity.id).unwrap();
+ let read_activity_by_apub_id = Activity::read_from_apub_id(&conn, ap_id).unwrap();
User_::delete(&conn, inserted_creator.id).unwrap();
assert_eq!(expected_activity, read_activity);
+ assert_eq!(expected_activity, read_activity_by_apub_id);
assert_eq!(expected_activity, inserted_activity);
}
}
table! {
activity (id) {
id -> Int4,
+ ap_id -> Text,
user_id -> Int4,
data -> Jsonb,
local -> Bool,