]> Untitled Git - lemmy.git/blob - crates/apub_lib/src/activity_queue.rs
Add console-subscriber (#2003)
[lemmy.git] / crates / apub_lib / src / activity_queue.rs
1 use crate::{signatures::sign_and_send, traits::ActorType};
2 use anyhow::{anyhow, Context, Error};
3 use background_jobs::{
4   memory_storage::Storage,
5   ActixJob,
6   Backoff,
7   Manager,
8   MaxRetries,
9   QueueHandle,
10   WorkerConfig,
11 };
12 use lemmy_utils::{location_info, LemmyError};
13 use reqwest_middleware::ClientWithMiddleware;
14 use serde::{Deserialize, Serialize};
15 use std::{env, fmt::Debug, future::Future, pin::Pin};
16 use tracing::{info, warn};
17 use url::Url;
18
19 pub async fn send_activity(
20   activity_id: &Url,
21   actor: &dyn ActorType,
22   inboxes: Vec<&Url>,
23   activity: String,
24   client: &ClientWithMiddleware,
25   activity_queue: &QueueHandle,
26 ) -> Result<(), LemmyError> {
27   for i in inboxes {
28     let message = SendActivityTask {
29       activity_id: activity_id.clone(),
30       inbox: i.to_owned(),
31       actor_id: actor.actor_id(),
32       activity: activity.clone(),
33       private_key: actor.private_key().context(location_info!())?,
34     };
35     if env::var("APUB_TESTING_SEND_SYNC").is_ok() {
36       do_send(message, client).await?;
37     } else {
38       activity_queue.queue::<SendActivityTask>(message).await?;
39     }
40   }
41
42   Ok(())
43 }
44
45 #[derive(Clone, Debug, Deserialize, Serialize)]
46 struct SendActivityTask {
47   activity_id: Url,
48   inbox: Url,
49   actor_id: Url,
50   activity: String,
51   private_key: String,
52 }
53
54 /// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
55 /// if the delivery failed.
56 impl ActixJob for SendActivityTask {
57   type State = MyState;
58   type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
59   const NAME: &'static str = "SendActivityTask";
60
61   const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
62   const BACKOFF: Backoff = Backoff::Exponential(2);
63
64   fn run(self, state: Self::State) -> Self::Future {
65     Box::pin(async move { do_send(self, &state.client).await })
66   }
67 }
68
69 async fn do_send(task: SendActivityTask, client: &ClientWithMiddleware) -> Result<(), Error> {
70   info!("Sending {} to {}", task.activity_id, task.inbox);
71   let result = sign_and_send(
72     client,
73     &task.inbox,
74     task.activity.clone(),
75     &task.actor_id,
76     task.private_key.to_owned(),
77   )
78   .await;
79
80   match result {
81     Ok(o) => {
82       if !o.status().is_success() {
83         let status = o.status();
84         let text = o.text().await?;
85
86         warn!(
87           "Send {} to {} failed with status {}: {}",
88           task.activity_id, task.inbox, status, text,
89         );
90       }
91     }
92     Err(e) => {
93       return Err(anyhow!(
94         "Failed to send activity {} to {}: {}",
95         &task.activity_id,
96         task.inbox,
97         e
98       ));
99     }
100   }
101   Ok(())
102 }
103
104 pub fn create_activity_queue(client: ClientWithMiddleware) -> Manager {
105   // Configure and start our workers
106   WorkerConfig::new_managed(Storage::new(), move |_| MyState {
107     client: client.clone(),
108   })
109   .register::<SendActivityTask>()
110   .start()
111 }
112
113 #[derive(Clone)]
114 struct MyState {
115   pub client: ClientWithMiddleware,
116 }