1 use crate::apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType};
3 base::{Extends, ExtendsExt},
6 use anyhow::{anyhow, Context, Error};
10 memory_storage::Storage,
17 use lemmy_utils::{location_info, settings::Settings, LemmyError};
19 use serde::{Deserialize, Serialize};
20 use std::{future::Future, pin::Pin};
23 pub fn send_activity<T, Kind>(
24 activity_sender: &QueueHandle,
26 actor: &dyn ActorType,
28 ) -> Result<(), LemmyError>
33 <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
35 if !Settings::get().federation.enabled {
39 let activity = activity.into_any_base()?;
40 let serialised_activity = serde_json::to_string(&activity)?;
43 check_is_apub_id_valid(&to_url)?;
46 // TODO: it would make sense to create a separate task for each destination server
47 let message = SendActivityTask {
48 activity: serialised_activity,
50 actor_id: actor.actor_id()?,
51 private_key: actor.private_key().context(location_info!())?,
53 activity_sender.queue::<SendActivityTask>(message)?;
58 #[derive(Clone, Debug, Deserialize, Serialize)]
59 struct SendActivityTask {
66 impl ActixJob for SendActivityTask {
68 type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
69 const NAME: &'static str = "SendActivityTask";
71 const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
72 const BACKOFF: Backoff = Backoff::Exponential(2);
74 fn run(self, state: Self::State) -> Self::Future {
76 for to_url in &self.to {
79 .post(to_url.as_str())
80 .header("Content-Type", "application/json");
84 self.activity.clone(),
86 self.private_key.to_owned(),
89 let signed = match signed {
93 // dont return an error because retrying would probably not fix the signing
97 if let Err(e) = signed.send().await {
100 "Failed to send activity {} to {}",
112 pub fn create_activity_queue() -> QueueHandle {
113 // Start the application server. This guards access to to the jobs store
114 let queue_handle = create_server(Storage::new());
116 // Configure and start our workers
117 WorkerConfig::new(|| MyState {
118 client: Client::default(),
120 .register::<SendActivityTask>()
121 .start(queue_handle.clone());