2 activities::community::announce::{AnnouncableActivities, AnnounceActivity},
3 extensions::signatures::sign_and_send,
6 APUB_JSON_CONTENT_TYPE,
8 use anyhow::{anyhow, Context, Error};
11 memory_storage::Storage,
18 use lemmy_db_schema::source::community::Community;
19 use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
20 use lemmy_websocket::LemmyContext;
21 use log::{info, warn};
23 use serde::{Deserialize, Serialize};
24 use std::{collections::BTreeMap, env, fmt::Debug, future::Future, pin::Pin};
27 pub(crate) async fn send_to_community_new(
28 activity: AnnouncableActivities,
30 actor: &dyn ActorType,
31 community: &Community,
32 additional_inboxes: Vec<Url>,
33 context: &LemmyContext,
34 ) -> Result<(), LemmyError> {
35 // if this is a local community, we need to do an announce from the community instead
37 insert_activity(activity_id, activity.clone(), true, false, context.pool()).await?;
38 AnnounceActivity::send(activity, community, additional_inboxes, context).await?;
40 let mut inboxes = additional_inboxes;
41 inboxes.push(community.get_shared_inbox_or_inbox_url());
42 send_activity_new(context, &activity, activity_id, actor, inboxes, false).await?;
48 pub(crate) async fn send_activity_new<T>(
49 context: &LemmyContext,
52 actor: &dyn ActorType,
55 ) -> Result<(), LemmyError>
59 if !Settings::get().federation.enabled || inboxes.is_empty() {
63 info!("Sending activity {}", activity_id.to_string());
65 // Don't send anything to ourselves
66 // TODO: this should be a debug assert
67 let hostname = Settings::get().get_hostname_without_port()?;
68 let inboxes: Vec<&Url> = inboxes
70 .filter(|i| i.domain().expect("valid inbox url") != hostname)
73 let serialised_activity = serde_json::to_string(&activity)?;
77 serialised_activity.clone(),
85 let message = SendActivityTask {
86 activity: serialised_activity.to_owned(),
88 actor_id: actor.actor_id(),
89 private_key: actor.private_key().context(location_info!())?,
91 if env::var("LEMMY_TEST_SEND_SYNC").is_ok() {
92 do_send(message, &Client::default()).await?;
94 context.activity_queue.queue::<SendActivityTask>(message)?;
101 #[derive(Clone, Debug, Deserialize, Serialize)]
102 struct SendActivityTask {
109 /// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
110 /// if the delivery failed.
111 impl ActixJob for SendActivityTask {
112 type State = MyState;
113 type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
114 const NAME: &'static str = "SendActivityTask";
116 const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
117 const BACKOFF: Backoff = Backoff::Exponential(2);
119 fn run(self, state: Self::State) -> Self::Future {
120 Box::pin(async move { do_send(self, &state.client).await })
124 async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
125 let mut headers = BTreeMap::<String, String>::new();
126 headers.insert("Content-Type".into(), APUB_JSON_CONTENT_TYPE.to_string());
127 let result = sign_and_send(
131 task.activity.clone(),
133 task.private_key.to_owned(),
137 if let Err(e) = result {
140 "Failed to send activity {} to {}",
148 pub fn create_activity_queue() -> QueueHandle {
149 // Start the application server. This guards access to to the jobs store
150 let queue_handle = create_server(Storage::new());
152 // Configure and start our workers
153 WorkerConfig::new(|| MyState {
154 client: Client::default(),
156 .register::<SendActivityTask>()
157 .start(queue_handle.clone());