+++ /dev/null
-use crate::{activity_queue::send_activity, community::do_announce, insert_activity};
-use activitystreams::{
- base::{Extends, ExtendsExt},
- object::AsObject,
-};
-use lemmy_db::{community::Community, user::User_};
-use lemmy_utils::{settings::Settings, LemmyError};
-use lemmy_websocket::LemmyContext;
-use serde::{export::fmt::Debug, Serialize};
-use url::{ParseError, Url};
-use uuid::Uuid;
-
-pub async fn send_activity_to_community<T, Kind>(
- creator: &User_,
- community: &Community,
- to: Vec<Url>,
- activity: T,
- context: &LemmyContext,
-) -> Result<(), LemmyError>
-where
- T: AsObject<Kind> + Extends<Kind> + Serialize + Debug + Send + Clone + 'static,
- Kind: Serialize,
- <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
-{
- // TODO: looks like call this sometimes with activity, and sometimes with any_base
- insert_activity(creator.id, activity.clone(), true, context.pool()).await?;
-
- // if this is a local community, we need to do an announce from the community instead
- if community.local {
- do_announce(activity.into_any_base()?, &community, creator, context).await?;
- } else {
- send_activity(context.activity_queue(), activity, creator, to)?;
- }
-
- Ok(())
-}
-
-pub(in crate) fn generate_activity_id<T>(kind: T) -> Result<Url, ParseError>
-where
- T: ToString,
-{
- let id = format!(
- "{}/activities/{}/{}",
- Settings::get().get_protocol_and_hostname(),
- kind.to_string().to_lowercase(),
- Uuid::new_v4()
- );
- Url::parse(&id)
-}
-use crate::{check_is_apub_id_valid, extensions::signatures::sign, ActorType};
+use crate::{
+ check_is_apub_id_valid,
+ community::do_announce,
+ extensions::signatures::sign,
+ insert_activity,
+ ActorType,
+};
use activitystreams::{
base::{Extends, ExtendsExt},
object::AsObject,
QueueHandle,
WorkerConfig,
};
+use itertools::Itertools;
+use lemmy_db::{community::Community, user::User_, DbPool};
use lemmy_utils::{location_info, settings::Settings, LemmyError};
+use lemmy_websocket::LemmyContext;
use log::warn;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, future::Future, pin::Pin};
use url::Url;
-pub fn send_activity<T, Kind>(
+pub async fn send_activity_single_dest<T, Kind>(
+ activity: T,
+ creator: &dyn ActorType,
+ to: Url,
+ context: &LemmyContext,
+) -> Result<(), LemmyError>
+where
+ T: AsObject<Kind> + Extends<Kind>,
+ Kind: Serialize,
+ <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+ if check_is_apub_id_valid(&to).is_ok() {
+ send_activity_internal(
+ context.activity_queue(),
+ activity,
+ creator,
+ vec![to],
+ context.pool(),
+ )
+ .await?;
+ }
+
+ Ok(())
+}
+
+pub async fn send_to_community_followers<T, Kind>(
+ activity: T,
+ community: &Community,
+ context: &LemmyContext,
+ sender_shared_inbox: Option<Url>,
+) -> Result<(), LemmyError>
+where
+ T: AsObject<Kind> + Extends<Kind>,
+ Kind: Serialize,
+ <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+ // dont send to the local instance, nor to the instance where the activity originally came from,
+ // because that would result in a database error (same data inserted twice)
+ let community_shared_inbox = community.get_shared_inbox_url()?;
+ let to: Vec<Url> = community
+ .get_follower_inboxes(context.pool())
+ .await?
+ .iter()
+ .filter(|inbox| Some(inbox) != sender_shared_inbox.as_ref().as_ref())
+ .filter(|inbox| inbox != &&community_shared_inbox)
+ .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
+ .unique()
+ .map(|inbox| inbox.to_owned())
+ .collect();
+
+ send_activity_internal(
+ context.activity_queue(),
+ activity,
+ community,
+ to,
+ context.pool(),
+ )
+ .await?;
+
+ Ok(())
+}
+
+pub async fn send_to_community<T, Kind>(
+ creator: &User_,
+ community: &Community,
+ activity: T,
+ context: &LemmyContext,
+) -> Result<(), LemmyError>
+where
+ T: AsObject<Kind> + Extends<Kind>,
+ Kind: Serialize,
+ <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+ // if this is a local community, we need to do an announce from the community instead
+ if community.local {
+ do_announce(activity.into_any_base()?, &community, creator, context).await?;
+ } else {
+ let inbox = community.get_shared_inbox_url()?;
+ check_is_apub_id_valid(&inbox)?;
+ send_activity_internal(
+ context.activity_queue(),
+ activity,
+ creator,
+ vec![inbox],
+ context.pool(),
+ )
+ .await?;
+ }
+
+ Ok(())
+}
+
+pub async fn send_comment_mentions<T, Kind>(
+ creator: &User_,
+ mentions: Vec<Url>,
+ activity: T,
+ context: &LemmyContext,
+) -> Result<(), LemmyError>
+where
+ T: AsObject<Kind> + Extends<Kind>,
+ Kind: Serialize,
+ <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
+{
+ let mentions = mentions
+ .iter()
+ .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
+ .map(|i| i.to_owned())
+ .collect();
+ send_activity_internal(
+ context.activity_queue(),
+ activity,
+ creator,
+ mentions,
+ context.pool(),
+ )
+ .await?;
+ Ok(())
+}
+
+/// Asynchronously sends the given `activity` from `actor` to every inbox URL in `to`.
+///
+/// The caller of this function needs to remove any blocked domains from `to`,
+/// using `check_is_apub_id_valid()`.
+async fn send_activity_internal<T, Kind>(
activity_sender: &QueueHandle,
activity: T,
actor: &dyn ActorType,
to: Vec<Url>,
+ pool: &DbPool,
) -> Result<(), LemmyError>
where
- T: AsObject<Kind>,
- T: Extends<Kind>,
+ T: AsObject<Kind> + Extends<Kind>,
Kind: Serialize,
<T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
{
return Ok(());
}
- let activity = activity.into_any_base()?;
- let serialised_activity = serde_json::to_string(&activity)?;
-
for to_url in &to {
- check_is_apub_id_valid(&to_url)?;
+ assert!(check_is_apub_id_valid(&to_url).is_ok());
}
+ let activity = activity.into_any_base()?;
+ let serialised_activity = serde_json::to_string(&activity)?;
+ insert_activity(actor.user_id(), serialised_activity.clone(), true, pool).await?;
+
// TODO: it would make sense to create a separate task for each destination server
let message = SendActivityTask {
activity: serialised_activity,
use crate::{
- activities::{generate_activity_id, send_activity_to_community},
+ activity_queue::{send_comment_mentions, send_to_community},
check_actor_domain,
create_apub_response,
create_apub_tombstone_response,
get_or_fetch_and_insert_post,
get_or_fetch_and_upsert_user,
},
+ generate_activity_id,
ActorType,
ApubLikeableType,
ApubObjectType,
// Set the mention tags
.set_many_tags(maa.get_tags()?);
- send_activity_to_community(&creator, &community, maa.inboxes, create, context).await?;
+ send_to_community(&creator, &community, create.clone(), context).await?;
+ send_comment_mentions(&creator, maa.inboxes, create, context).await?;
Ok(())
}
// Set the mention tags
.set_many_tags(maa.get_tags()?);
- send_activity_to_community(&creator, &community, maa.inboxes, update, context).await?;
+ send_to_community(&creator, &community, update.clone(), context).await?;
+ send_comment_mentions(&creator, maa.inboxes, update, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- delete,
- context,
- )
- .await?;
+ send_to_community(&creator, &community, delete, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- undo,
- context,
- )
- .await?;
+ send_to_community(&creator, &community, undo, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &mod_,
- &community,
- vec![community.get_shared_inbox_url()?],
- remove,
- context,
- )
- .await?;
+ send_to_community(&mod_, &community, remove, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &mod_,
- &community,
- vec![community.get_shared_inbox_url()?],
- undo,
- context,
- )
- .await?;
+ send_to_community(&mod_, &community, undo, context).await?;
Ok(())
}
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- like,
- context,
- )
- .await?;
+ send_to_community(&creator, &community, like, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- dislike,
- context,
- )
- .await?;
+ send_to_community(&creator, &community, dislike, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- undo,
- context,
- )
- .await?;
+ send_to_community(&creator, &community, undo, context).await?;
Ok(())
}
}
use crate::{
- activities::generate_activity_id,
- activity_queue::send_activity,
+ activity_queue::{send_activity_single_dest, send_to_community_followers},
check_actor_domain,
+ check_is_apub_id_valid,
create_apub_response,
create_apub_tombstone_response,
create_tombstone,
extensions::group_extensions::GroupExtension,
fetcher::{get_or_fetch_and_upsert_actor, get_or_fetch_and_upsert_user},
- insert_activity,
+ generate_activity_id,
ActorType,
FromApub,
GroupExt,
.set_id(generate_activity_id(AcceptType::Accept)?)
.set_to(to.clone());
- insert_activity(self.creator_id, accept.clone(), true, context.pool()).await?;
-
- send_activity(context.activity_queue(), accept, self, vec![to])?;
+ send_activity_single_dest(accept, self, to, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- insert_activity(self.creator_id, delete.clone(), true, context.pool()).await?;
-
- let inboxes = self.get_follower_inboxes(context.pool()).await?;
-
- // Note: For an accept, since it was automatic, no one pushed a button,
- // the community was the actor.
- // But for delete, the creator is the actor, and does the signing
- send_activity(context.activity_queue(), delete, creator, inboxes)?;
+ send_to_community_followers(delete, self, context, None).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- insert_activity(self.creator_id, undo.clone(), true, context.pool()).await?;
-
- let inboxes = self.get_follower_inboxes(context.pool()).await?;
-
- // Note: For an accept, since it was automatic, no one pushed a button,
- // the community was the actor.
- // But for delete, the creator is the actor, and does the signing
- send_activity(context.activity_queue(), undo, creator, inboxes)?;
+ send_to_community_followers(undo, self, context, None).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- insert_activity(mod_.id, remove.clone(), true, context.pool()).await?;
-
- let inboxes = self.get_follower_inboxes(context.pool()).await?;
-
- // Note: For an accept, since it was automatic, no one pushed a button,
- // the community was the actor.
- // But for delete, the creator is the actor, and does the signing
- send_activity(context.activity_queue(), remove, mod_, inboxes)?;
+ send_to_community_followers(remove, self, context, None).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![self.get_followers_url()?]);
- insert_activity(mod_.id, undo.clone(), true, context.pool()).await?;
-
- let inboxes = self.get_follower_inboxes(context.pool()).await?;
-
- // Note: For an accept, since it was automatic, no one pushed a button,
- // the community was the actor.
- // But for remove , the creator is the actor, and does the signing
- send_activity(context.activity_queue(), undo, mod_, inboxes)?;
+ send_to_community_followers(undo, self, context, None).await?;
Ok(())
}
))?)
})
.filter_map(Result::ok)
+ // Don't send to blocked instances
+ .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
.unique()
.collect();
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- insert_activity(community.creator_id, announce.clone(), true, context.pool()).await?;
-
- let mut to: Vec<Url> = community.get_follower_inboxes(context.pool()).await?;
-
- // dont send to the local instance, nor to the instance where the activity originally came from,
- // because that would result in a database error (same data inserted twice)
- // this seems to be the "easiest" stable alternative for remove_item()
- let sender_shared_inbox = sender.get_shared_inbox_url()?;
- to.retain(|x| x != &sender_shared_inbox);
- let community_shared_inbox = community.get_shared_inbox_url()?;
- to.retain(|x| x != &community_shared_inbox);
-
- send_activity(context.activity_queue(), announce, community, to)?;
+ send_to_community_followers(
+ announce,
+ community,
+ context,
+ Some(sender.get_shared_inbox_url()?),
+ )
+ .await?;
Ok(())
}
#[macro_use]
extern crate lazy_static;
-pub mod activities;
pub mod activity_queue;
pub mod comment;
pub mod community;
use reqwest::Client;
use serde::Serialize;
use url::{ParseError, Url};
+use uuid::Uuid;
type GroupExt = Ext2<ApActor<Group>, GroupExtension, PublicKeyExtension>;
type PersonExt = Ext1<ApActor<Person>, PublicKeyExtension>;
.await??;
Ok(())
}
+
+pub(in crate) fn generate_activity_id<T>(kind: T) -> Result<Url, ParseError>
+where
+ T: ToString,
+{
+ let id = format!(
+ "{}/activities/{}/{}",
+ Settings::get().get_protocol_and_hostname(),
+ kind.to_string().to_lowercase(),
+ Uuid::new_v4()
+ );
+ Url::parse(&id)
+}
use crate::{
- activities::{generate_activity_id, send_activity_to_community},
+ activity_queue::send_to_community,
check_actor_domain,
create_apub_response,
create_apub_tombstone_response,
create_tombstone,
extensions::page_extension::PageExtension,
fetcher::{get_or_fetch_and_upsert_community, get_or_fetch_and_upsert_user},
+ generate_activity_id,
ActorType,
ApubLikeableType,
ApubObjectType,
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- create,
- context,
- )
- .await?;
+ send_to_community(creator, &community, create, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- update,
- context,
- )
- .await?;
+ send_to_community(creator, &community, update, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- delete,
- context,
- )
- .await?;
+ send_to_community(creator, &community, delete, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- undo,
- context,
- )
- .await?;
+ send_to_community(creator, &community, undo, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- mod_,
- &community,
- vec![community.get_shared_inbox_url()?],
- remove,
- context,
- )
- .await?;
+ send_to_community(mod_, &community, remove, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- mod_,
- &community,
- vec![community.get_shared_inbox_url()?],
- undo,
- context,
- )
- .await?;
+ send_to_community(mod_, &community, undo, context).await?;
Ok(())
}
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- like,
- context,
- )
- .await?;
+ send_to_community(&creator, &community, like, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- dislike,
- context,
- )
- .await?;
+ send_to_community(&creator, &community, dislike, context).await?;
Ok(())
}
.set_to(public())
.set_many_ccs(vec![community.get_followers_url()?]);
- send_activity_to_community(
- &creator,
- &community,
- vec![community.get_shared_inbox_url()?],
- undo,
- context,
- )
- .await?;
+ send_to_community(&creator, &community, undo, context).await?;
Ok(())
}
}
use crate::{
- activities::generate_activity_id,
- activity_queue::send_activity,
+ activity_queue::send_activity_single_dest,
check_actor_domain,
check_is_apub_id_valid,
create_tombstone,
fetcher::get_or_fetch_and_upsert_user,
- insert_activity,
+ generate_activity_id,
ActorType,
ApubObjectType,
FromApub,
.set_id(generate_activity_id(CreateType::Create)?)
.set_to(to.clone());
- insert_activity(creator.id, create.clone(), true, context.pool()).await?;
-
- send_activity(context.activity_queue(), create, creator, vec![to])?;
+ send_activity_single_dest(create, creator, to, context).await?;
Ok(())
}
.set_id(generate_activity_id(UpdateType::Update)?)
.set_to(to.clone());
- insert_activity(creator.id, update.clone(), true, context.pool()).await?;
-
- send_activity(context.activity_queue(), update, creator, vec![to])?;
+ send_activity_single_dest(update, creator, to, context).await?;
Ok(())
}
.set_id(generate_activity_id(DeleteType::Delete)?)
.set_to(to.clone());
- insert_activity(creator.id, delete.clone(), true, context.pool()).await?;
-
- send_activity(context.activity_queue(), delete, creator, vec![to])?;
+ send_activity_single_dest(delete, creator, to, context).await?;
Ok(())
}
.set_id(generate_activity_id(UndoType::Undo)?)
.set_to(to.clone());
- insert_activity(creator.id, undo.clone(), true, context.pool()).await?;
-
- send_activity(context.activity_queue(), undo, creator, vec![to])?;
+ send_activity_single_dest(undo, creator, to, context).await?;
Ok(())
}
use crate::{
- activities::generate_activity_id,
- activity_queue::send_activity,
+ activity_queue::send_activity_single_dest,
check_actor_domain,
create_apub_response,
fetcher::get_or_fetch_and_upsert_actor,
- insert_activity,
+ generate_activity_id,
ActorType,
FromApub,
PersonExt,
let follow_actor = get_or_fetch_and_upsert_actor(follow_actor_id, context).await?;
let to = follow_actor.get_inbox_url()?;
- insert_activity(self.id, follow.clone(), true, context.pool()).await?;
-
- send_activity(context.activity_queue(), follow, self, vec![to])?;
+ send_activity_single_dest(follow, self, to, context).await?;
Ok(())
}
.set_context(activitystreams::context())
.set_id(generate_activity_id(UndoType::Undo)?);
- insert_activity(self.id, undo.clone(), true, context.pool()).await?;
-
- send_activity(context.activity_queue(), undo, self, vec![to])?;
+ send_activity_single_dest(undo, self, to, context).await?;
Ok(())
}