]> Untitled Git - lemmy.git/blob - lemmy_apub/src/activity_queue.rs
Merge branch 'main' into no-send-blocked-dess
[lemmy.git] / lemmy_apub / src / activity_queue.rs
1 use crate::{
2   check_is_apub_id_valid,
3   community::do_announce,
4   extensions::signatures::sign_and_send,
5   insert_activity,
6   ActorType,
7 };
8 use activitystreams::{
9   base::{Extends, ExtendsExt},
10   object::AsObject,
11 };
12 use anyhow::{anyhow, Context, Error};
13 use background_jobs::{
14   create_server,
15   memory_storage::Storage,
16   ActixJob,
17   Backoff,
18   MaxRetries,
19   QueueHandle,
20   WorkerConfig,
21 };
22 use itertools::Itertools;
23 use lemmy_db::{community::Community, user::User_, DbPool};
24 use lemmy_utils::{location_info, settings::Settings, LemmyError};
25 use lemmy_websocket::LemmyContext;
26 use log::warn;
27 use reqwest::Client;
28 use serde::{Deserialize, Serialize};
29 use std::{collections::BTreeMap, future::Future, pin::Pin};
30 use url::Url;
31
32 pub async fn send_activity_single_dest<T, Kind>(
33   activity: T,
34   creator: &dyn ActorType,
35   to: Url,
36   context: &LemmyContext,
37 ) -> Result<(), LemmyError>
38 where
39   T: AsObject<Kind> + Extends<Kind>,
40   Kind: Serialize,
41   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
42 {
43   if check_is_apub_id_valid(&to).is_ok() {
44     send_activity_internal(
45       context.activity_queue(),
46       activity,
47       creator,
48       vec![to],
49       context.pool(),
50     )
51     .await?;
52   }
53
54   Ok(())
55 }
56
57 pub async fn send_to_community_followers<T, Kind>(
58   activity: T,
59   community: &Community,
60   context: &LemmyContext,
61   sender_shared_inbox: Option<Url>,
62 ) -> Result<(), LemmyError>
63 where
64   T: AsObject<Kind> + Extends<Kind>,
65   Kind: Serialize,
66   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
67 {
68   // dont send to the local instance, nor to the instance where the activity originally came from,
69   // because that would result in a database error (same data inserted twice)
70   let community_shared_inbox = community.get_shared_inbox_url()?;
71   let to: Vec<Url> = community
72     .get_follower_inboxes(context.pool())
73     .await?
74     .iter()
75     .filter(|inbox| Some(inbox) != sender_shared_inbox.as_ref().as_ref())
76     .filter(|inbox| inbox != &&community_shared_inbox)
77     .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
78     .unique()
79     .map(|inbox| inbox.to_owned())
80     .collect();
81
82   send_activity_internal(
83     context.activity_queue(),
84     activity,
85     community,
86     to,
87     context.pool(),
88   )
89   .await?;
90
91   Ok(())
92 }
93
94 pub async fn send_to_community<T, Kind>(
95   creator: &User_,
96   community: &Community,
97   activity: T,
98   context: &LemmyContext,
99 ) -> Result<(), LemmyError>
100 where
101   T: AsObject<Kind> + Extends<Kind>,
102   Kind: Serialize,
103   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
104 {
105   // if this is a local community, we need to do an announce from the community instead
106   if community.local {
107     do_announce(activity.into_any_base()?, &community, creator, context).await?;
108   } else {
109     let inbox = community.get_shared_inbox_url()?;
110     check_is_apub_id_valid(&inbox)?;
111     send_activity_internal(
112       context.activity_queue(),
113       activity,
114       creator,
115       vec![inbox],
116       context.pool(),
117     )
118     .await?;
119   }
120
121   Ok(())
122 }
123
124 pub async fn send_comment_mentions<T, Kind>(
125   creator: &User_,
126   mentions: Vec<Url>,
127   activity: T,
128   context: &LemmyContext,
129 ) -> Result<(), LemmyError>
130 where
131   T: AsObject<Kind> + Extends<Kind>,
132   Kind: Serialize,
133   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
134 {
135   let mentions = mentions
136     .iter()
137     .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
138     .map(|i| i.to_owned())
139     .collect();
140   send_activity_internal(
141     context.activity_queue(),
142     activity,
143     creator,
144     mentions,
145     context.pool(),
146   )
147   .await?;
148   Ok(())
149 }
150
151 /// Asynchronously sends the given `activity` from `actor` to every inbox URL in `to`.
152 ///
153 /// The caller of this function needs to remove any blocked domains from `to`,
154 /// using `check_is_apub_id_valid()`.
155 async fn send_activity_internal<T, Kind>(
156   activity_sender: &QueueHandle,
157   activity: T,
158   actor: &dyn ActorType,
159   to: Vec<Url>,
160   pool: &DbPool,
161 ) -> Result<(), LemmyError>
162 where
163   T: AsObject<Kind> + Extends<Kind>,
164   Kind: Serialize,
165   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
166 {
167   if !Settings::get().federation.enabled {
168     return Ok(());
169   }
170
171   for to_url in &to {
172     assert!(check_is_apub_id_valid(&to_url).is_ok());
173   }
174
175   let activity = activity.into_any_base()?;
176   let serialised_activity = serde_json::to_string(&activity)?;
177   insert_activity(actor.user_id(), serialised_activity.clone(), true, pool).await?;
178
179   // TODO: it would make sense to create a separate task for each destination server
180   let message = SendActivityTask {
181     activity: serialised_activity,
182     to,
183     actor_id: actor.actor_id()?,
184     private_key: actor.private_key().context(location_info!())?,
185   };
186
187   activity_sender.queue::<SendActivityTask>(message)?;
188
189   Ok(())
190 }
191
192 #[derive(Clone, Debug, Deserialize, Serialize)]
193 struct SendActivityTask {
194   activity: String,
195   to: Vec<Url>,
196   actor_id: Url,
197   private_key: String,
198 }
199
200 impl ActixJob for SendActivityTask {
201   type State = MyState;
202   type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
203   const NAME: &'static str = "SendActivityTask";
204
205   const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
206   const BACKOFF: Backoff = Backoff::Exponential(2);
207
208   fn run(self, state: Self::State) -> Self::Future {
209     Box::pin(async move {
210       for to_url in &self.to {
211         let mut headers = BTreeMap::<String, String>::new();
212         headers.insert("Content-Type".into(), "application/json".into());
213         let result = sign_and_send(
214           &state.client,
215           headers,
216           to_url,
217           self.activity.clone(),
218           &self.actor_id,
219           self.private_key.to_owned(),
220         )
221         .await;
222
223         if let Err(e) = result {
224           warn!("{}", e);
225           return Err(anyhow!(
226             "Failed to send activity {} to {}",
227             &self.activity,
228             to_url
229           ));
230         }
231       }
232       Ok(())
233     })
234   }
235 }
236
237 pub fn create_activity_queue() -> QueueHandle {
238   // Start the application server. This guards access to to the jobs store
239   let queue_handle = create_server(Storage::new());
240
241   // Configure and start our workers
242   WorkerConfig::new(|| MyState {
243     client: Client::default(),
244   })
245   .register::<SendActivityTask>()
246   .start(queue_handle.clone());
247
248   queue_handle
249 }
250
251 #[derive(Clone)]
252 struct MyState {
253   pub client: Client,
254 }