1 use crate::apub::{check_is_apub_id_valid, extensions::signatures::sign, ActorType};
3 base::{Extends, ExtendsExt},
6 use anyhow::{anyhow, Context, Error};
10 memory_storage::Storage,
17 use lemmy_utils::{location_info, settings::Settings, LemmyError};
19 use serde::{Deserialize, Serialize};
20 use std::{future::Future, pin::Pin};
23 pub fn send_activity<T, Kind>(
24 activity_sender: &QueueHandle,
26 actor: &dyn ActorType,
28 ) -> Result<(), LemmyError>
33 <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
35 if !Settings::get().federation.enabled {
39 let activity = activity.into_any_base()?;
40 let serialised_activity = serde_json::to_string(&activity)?;
43 check_is_apub_id_valid(&to_url)?;
46 // TODO: it would make sense to create a separate task for each destination server
47 let message = SendActivityTask {
48 activity: serialised_activity,
50 actor_id: actor.actor_id()?,
51 private_key: actor.private_key().context(location_info!())?,
53 activity_sender.queue::<SendActivityTask>(message)?;
58 #[derive(Clone, Debug, Deserialize, Serialize)]
59 struct SendActivityTask {
66 impl ActixJob for SendActivityTask {
68 type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
69 const NAME: &'static str = "SendActivityTask";
71 const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
72 const BACKOFF: Backoff = Backoff::Exponential(2);
74 fn run(self, state: Self::State) -> Self::Future {
76 for to_url in &self.to {
79 .post(to_url.as_str())
80 .header("Content-Type", "application/json");
82 // TODO: I believe we have to do the signing in here because the signature is only valid for a few seconds
85 self.activity.clone(),
87 self.private_key.to_owned(),
90 let signed = match signed {
94 // Don't return an error because retrying would probably not fix the signing
98 if let Err(e) = signed.send().await {
101 "Failed to send activity {} to {}",
113 pub fn create_activity_queue() -> QueueHandle {
114 // Start the application server. This guards access to the jobs store
115 let queue_handle = create_server(Storage::new());
117 // Configure and start our workers
118 WorkerConfig::new(|| MyState {
119 client: Client::default(),
121 .register::<SendActivityTask>()
122 .start(queue_handle.clone());