]> Untitled Git - lemmy.git/blob - crates/apub/src/activity_queue.rs
Rewrite fetcher (#1792)
[lemmy.git] / crates / apub / src / activity_queue.rs
1 use crate::{
2   activities::community::announce::{AnnouncableActivities, AnnounceActivity},
3   extensions::signatures::sign_and_send,
4   insert_activity,
5   ActorType,
6   APUB_JSON_CONTENT_TYPE,
7 };
8 use anyhow::{anyhow, Context, Error};
9 use background_jobs::{
10   create_server,
11   memory_storage::Storage,
12   ActixJob,
13   Backoff,
14   MaxRetries,
15   QueueHandle,
16   WorkerConfig,
17 };
18 use lemmy_db_schema::source::community::Community;
19 use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
20 use lemmy_websocket::LemmyContext;
21 use log::{info, warn};
22 use reqwest::Client;
23 use serde::{Deserialize, Serialize};
24 use std::{collections::BTreeMap, env, fmt::Debug, future::Future, pin::Pin};
25 use url::Url;
26
27 pub(crate) async fn send_to_community_new(
28   activity: AnnouncableActivities,
29   activity_id: &Url,
30   actor: &dyn ActorType,
31   community: &Community,
32   additional_inboxes: Vec<Url>,
33   context: &LemmyContext,
34 ) -> Result<(), LemmyError> {
35   // if this is a local community, we need to do an announce from the community instead
36   if community.local {
37     insert_activity(activity_id, activity.clone(), true, false, context.pool()).await?;
38     AnnounceActivity::send(activity, community, additional_inboxes, context).await?;
39   } else {
40     let mut inboxes = additional_inboxes;
41     inboxes.push(community.get_shared_inbox_or_inbox_url());
42     send_activity_new(context, &activity, activity_id, actor, inboxes, false).await?;
43   }
44
45   Ok(())
46 }
47
48 pub(crate) async fn send_activity_new<T>(
49   context: &LemmyContext,
50   activity: &T,
51   activity_id: &Url,
52   actor: &dyn ActorType,
53   inboxes: Vec<Url>,
54   sensitive: bool,
55 ) -> Result<(), LemmyError>
56 where
57   T: Serialize,
58 {
59   if !Settings::get().federation.enabled || inboxes.is_empty() {
60     return Ok(());
61   }
62
63   info!("Sending activity {}", activity_id.to_string());
64
65   // Don't send anything to ourselves
66   // TODO: this should be a debug assert
67   let hostname = Settings::get().get_hostname_without_port()?;
68   let inboxes: Vec<&Url> = inboxes
69     .iter()
70     .filter(|i| i.domain().expect("valid inbox url") != hostname)
71     .collect();
72
73   let serialised_activity = serde_json::to_string(&activity)?;
74
75   insert_activity(
76     activity_id,
77     serialised_activity.clone(),
78     true,
79     sensitive,
80     context.pool(),
81   )
82   .await?;
83
84   for i in inboxes {
85     let message = SendActivityTask {
86       activity: serialised_activity.to_owned(),
87       inbox: i.to_owned(),
88       actor_id: actor.actor_id(),
89       private_key: actor.private_key().context(location_info!())?,
90     };
91     if env::var("LEMMY_TEST_SEND_SYNC").is_ok() {
92       do_send(message, &Client::default()).await?;
93     } else {
94       context.activity_queue.queue::<SendActivityTask>(message)?;
95     }
96   }
97
98   Ok(())
99 }
100
101 #[derive(Clone, Debug, Deserialize, Serialize)]
102 struct SendActivityTask {
103   activity: String,
104   inbox: Url,
105   actor_id: Url,
106   private_key: String,
107 }
108
109 /// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
110 /// if the delivery failed.
111 impl ActixJob for SendActivityTask {
112   type State = MyState;
113   type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
114   const NAME: &'static str = "SendActivityTask";
115
116   const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
117   const BACKOFF: Backoff = Backoff::Exponential(2);
118
119   fn run(self, state: Self::State) -> Self::Future {
120     Box::pin(async move { do_send(self, &state.client).await })
121   }
122 }
123
124 async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
125   let mut headers = BTreeMap::<String, String>::new();
126   headers.insert("Content-Type".into(), APUB_JSON_CONTENT_TYPE.to_string());
127   let result = sign_and_send(
128     client,
129     headers,
130     &task.inbox,
131     task.activity.clone(),
132     &task.actor_id,
133     task.private_key.to_owned(),
134   )
135   .await;
136
137   if let Err(e) = result {
138     warn!("{}", e);
139     return Err(anyhow!(
140       "Failed to send activity {} to {}",
141       &task.activity,
142       task.inbox
143     ));
144   }
145   Ok(())
146 }
147
148 pub fn create_activity_queue() -> QueueHandle {
149   // Start the application server. This guards access to to the jobs store
150   let queue_handle = create_server(Storage::new());
151
152   // Configure and start our workers
153   WorkerConfig::new(|| MyState {
154     client: Client::default(),
155   })
156   .register::<SendActivityTask>()
157   .start(queue_handle.clone());
158
159   queue_handle
160 }
161
162 #[derive(Clone)]
163 struct MyState {
164   pub client: Client,
165 }