]> Untitled Git - lemmy.git/blob - lemmy_apub/src/activity_queue.rs
Merge remote-tracking branch 'origin/split-db-workspace' into move_views_to_diesel_split
[lemmy.git] / lemmy_apub / src / activity_queue.rs
1 use crate::{
2   check_is_apub_id_valid,
3   extensions::signatures::sign_and_send,
4   insert_activity,
5   ActorType,
6 };
7 use activitystreams::{
8   base::{BaseExt, Extends, ExtendsExt},
9   object::AsObject,
10 };
11 use anyhow::{anyhow, Context, Error};
12 use background_jobs::{
13   create_server,
14   memory_storage::Storage,
15   ActixJob,
16   Backoff,
17   MaxRetries,
18   QueueHandle,
19   WorkerConfig,
20 };
21 use itertools::Itertools;
22 use lemmy_db::DbPool;
23 use lemmy_db_schema::source::{community::Community, user::User_};
24 use lemmy_utils::{location_info, settings::Settings, LemmyError};
25 use lemmy_websocket::LemmyContext;
26 use log::{debug, warn};
27 use reqwest::Client;
28 use serde::{export::fmt::Debug, Deserialize, Serialize};
29 use std::{collections::BTreeMap, env, future::Future, pin::Pin};
30 use url::Url;
31
32 /// Sends a local activity to a single, remote actor.
33 ///
34 /// * `activity` the apub activity to be sent
35 /// * `creator` the local actor which created the activity
36 /// * `inbox` the inbox url where the activity should be delivered to
37 pub(crate) async fn send_activity_single_dest<T, Kind>(
38   activity: T,
39   creator: &dyn ActorType,
40   inbox: Url,
41   context: &LemmyContext,
42 ) -> Result<(), LemmyError>
43 where
44   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
45   Kind: Serialize,
46   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
47 {
48   if check_is_apub_id_valid(&inbox).is_ok() {
49     debug!(
50       "Sending activity {:?} to {}",
51       &activity.id_unchecked(),
52       &inbox
53     );
54     send_activity_internal(
55       context.activity_queue(),
56       activity,
57       creator,
58       vec![inbox],
59       context.pool(),
60       true,
61       true,
62     )
63     .await?;
64   }
65
66   Ok(())
67 }
68
69 /// From a local community, send activity to all remote followers.
70 ///
71 /// * `activity` the apub activity to send
72 /// * `community` the sending community
73 /// * `sender_shared_inbox` in case of an announce, this should be the shared inbox of the inner
74 ///                         activities creator, as receiving a known activity will cause an error
75 pub(crate) async fn send_to_community_followers<T, Kind>(
76   activity: T,
77   community: &Community,
78   context: &LemmyContext,
79 ) -> Result<(), LemmyError>
80 where
81   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
82   Kind: Serialize,
83   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
84 {
85   let follower_inboxes: Vec<Url> = community
86     .get_follower_inboxes(context.pool())
87     .await?
88     .iter()
89     .unique()
90     .filter(|inbox| inbox.host_str() != Some(&Settings::get().hostname))
91     .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
92     .map(|inbox| inbox.to_owned())
93     .collect();
94   debug!(
95     "Sending activity {:?} to followers of {}",
96     &activity.id_unchecked(),
97     &community.actor_id
98   );
99
100   send_activity_internal(
101     context.activity_queue(),
102     activity,
103     community,
104     follower_inboxes,
105     context.pool(),
106     true,
107     false,
108   )
109   .await?;
110
111   Ok(())
112 }
113
114 /// Sends an activity from a local user to a remote community.
115 ///
116 /// * `activity` the activity to send
117 /// * `creator` the creator of the activity
118 /// * `community` the destination community
119 ///
120 pub(crate) async fn send_to_community<T, Kind>(
121   activity: T,
122   creator: &User_,
123   community: &Community,
124   context: &LemmyContext,
125 ) -> Result<(), LemmyError>
126 where
127   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
128   Kind: Serialize,
129   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
130 {
131   // if this is a local community, we need to do an announce from the community instead
132   if community.local {
133     community
134       .send_announce(activity.into_any_base()?, context)
135       .await?;
136   } else {
137     let inbox = community.get_shared_inbox_url()?;
138     check_is_apub_id_valid(&inbox)?;
139     debug!(
140       "Sending activity {:?} to community {}",
141       &activity.id_unchecked(),
142       &community.actor_id
143     );
144     send_activity_internal(
145       context.activity_queue(),
146       activity,
147       creator,
148       vec![inbox],
149       context.pool(),
150       true,
151       false,
152     )
153     .await?;
154   }
155
156   Ok(())
157 }
158
159 /// Sends notification to any users mentioned in a comment
160 ///
161 /// * `creator` user who created the comment
162 /// * `mentions` list of inboxes of users which are mentioned in the comment
163 /// * `activity` either a `Create/Note` or `Update/Note`
164 pub(crate) async fn send_comment_mentions<T, Kind>(
165   creator: &User_,
166   mentions: Vec<Url>,
167   activity: T,
168   context: &LemmyContext,
169 ) -> Result<(), LemmyError>
170 where
171   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
172   Kind: Serialize,
173   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
174 {
175   debug!(
176     "Sending mentions activity {:?} to {:?}",
177     &activity.id_unchecked(),
178     &mentions
179   );
180   let mentions = mentions
181     .iter()
182     .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
183     .map(|i| i.to_owned())
184     .collect();
185   send_activity_internal(
186     context.activity_queue(),
187     activity,
188     creator,
189     mentions,
190     context.pool(),
191     false, // Don't create a new DB row
192     false,
193   )
194   .await?;
195   Ok(())
196 }
197
198 /// Create new `SendActivityTasks`, which will deliver the given activity to inboxes, as well as
199 /// handling signing and retrying failed deliveres.
200 ///
201 /// The caller of this function needs to remove any blocked domains from `to`,
202 /// using `check_is_apub_id_valid()`.
203 async fn send_activity_internal<T, Kind>(
204   activity_sender: &QueueHandle,
205   activity: T,
206   actor: &dyn ActorType,
207   inboxes: Vec<Url>,
208   pool: &DbPool,
209   insert_into_db: bool,
210   sensitive: bool,
211 ) -> Result<(), LemmyError>
212 where
213   T: AsObject<Kind> + Extends<Kind> + Debug,
214   Kind: Serialize,
215   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
216 {
217   if !Settings::get().federation.enabled || inboxes.is_empty() {
218     return Ok(());
219   }
220
221   let activity = activity.into_any_base()?;
222   let serialised_activity = serde_json::to_string(&activity)?;
223
224   // This is necessary because send_comment and send_comment_mentions
225   // might send the same ap_id
226   if insert_into_db {
227     let id = activity.id().context(location_info!())?;
228     insert_activity(id, activity.clone(), true, sensitive, pool).await?;
229   }
230
231   for i in inboxes {
232     let message = SendActivityTask {
233       activity: serialised_activity.to_owned(),
234       inbox: i,
235       actor_id: actor.actor_id()?,
236       private_key: actor.private_key().context(location_info!())?,
237     };
238     if env::var("LEMMY_TEST_SEND_SYNC").is_ok() {
239       do_send(message, &Client::default()).await?;
240     } else {
241       activity_sender.queue::<SendActivityTask>(message)?;
242     }
243   }
244
245   Ok(())
246 }
247
248 #[derive(Clone, Debug, Deserialize, Serialize)]
249 struct SendActivityTask {
250   activity: String,
251   inbox: Url,
252   actor_id: Url,
253   private_key: String,
254 }
255
256 /// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
257 /// if the delivery failed.
258 impl ActixJob for SendActivityTask {
259   type State = MyState;
260   type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
261   const NAME: &'static str = "SendActivityTask";
262
263   const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
264   const BACKOFF: Backoff = Backoff::Exponential(2);
265
266   fn run(self, state: Self::State) -> Self::Future {
267     Box::pin(async move { do_send(self, &state.client).await })
268   }
269 }
270
271 async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
272   let mut headers = BTreeMap::<String, String>::new();
273   headers.insert("Content-Type".into(), "application/json".into());
274   let result = sign_and_send(
275     client,
276     headers,
277     &task.inbox,
278     task.activity.clone(),
279     &task.actor_id,
280     task.private_key.to_owned(),
281   )
282   .await;
283
284   if let Err(e) = result {
285     warn!("{}", e);
286     return Err(anyhow!(
287       "Failed to send activity {} to {}",
288       &task.activity,
289       task.inbox
290     ));
291   }
292   Ok(())
293 }
294
295 pub fn create_activity_queue() -> QueueHandle {
296   // Start the application server. This guards access to to the jobs store
297   let queue_handle = create_server(Storage::new());
298
299   // Configure and start our workers
300   WorkerConfig::new(|| MyState {
301     client: Client::default(),
302   })
303   .register::<SendActivityTask>()
304   .start(queue_handle.clone());
305
306   queue_handle
307 }
308
309 #[derive(Clone)]
310 struct MyState {
311   pub client: Client,
312 }