]> Untitled Git - lemmy.git/blob - lemmy_apub/src/activity_queue.rs
c8c8af37e7cdf9bcdd2804795f547f7c29964792
[lemmy.git] / lemmy_apub / src / activity_queue.rs
1 use crate::{
2   check_is_apub_id_valid,
3   extensions::signatures::sign_and_send,
4   insert_activity,
5   ActorType,
6 };
7 use activitystreams::{
8   base::{BaseExt, Extends, ExtendsExt},
9   object::AsObject,
10 };
11 use anyhow::{anyhow, Context, Error};
12 use background_jobs::{
13   create_server,
14   memory_storage::Storage,
15   ActixJob,
16   Backoff,
17   MaxRetries,
18   QueueHandle,
19   WorkerConfig,
20 };
21 use itertools::Itertools;
22 use lemmy_db::{community::Community, user::User_, DbPool};
23 use lemmy_utils::{location_info, settings::Settings, LemmyError};
24 use lemmy_websocket::LemmyContext;
25 use log::{debug, warn};
26 use reqwest::Client;
27 use serde::{export::fmt::Debug, Deserialize, Serialize};
28 use std::{collections::BTreeMap, future::Future, pin::Pin};
29 use url::Url;
30
31 /// Sends a local activity to a single, remote actor.
32 ///
33 /// * `activity` the apub activity to be sent
34 /// * `creator` the local actor which created the activity
35 /// * `inbox` the inbox url where the activity should be delivered to
36 pub async fn send_activity_single_dest<T, Kind>(
37   activity: T,
38   creator: &dyn ActorType,
39   inbox: Url,
40   context: &LemmyContext,
41 ) -> Result<(), LemmyError>
42 where
43   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
44   Kind: Serialize,
45   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
46 {
47   if check_is_apub_id_valid(&inbox).is_ok() {
48     debug!(
49       "Sending activity {:?} to {}",
50       &activity.id_unchecked(),
51       &inbox
52     );
53     send_activity_internal(
54       context.activity_queue(),
55       activity,
56       creator,
57       vec![inbox],
58       context.pool(),
59       true,
60     )
61     .await?;
62   }
63
64   Ok(())
65 }
66
67 /// From a local community, send activity to all remote followers.
68 ///
69 /// * `activity` the apub activity to send
70 /// * `community` the sending community
71 /// * `sender_shared_inbox` in case of an announce, this should be the shared inbox of the inner
72 ///                         activities creator, as receiving a known activity will cause an error
73 pub async fn send_to_community_followers<T, Kind>(
74   activity: T,
75   community: &Community,
76   context: &LemmyContext,
77 ) -> Result<(), LemmyError>
78 where
79   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
80   Kind: Serialize,
81   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
82 {
83   let follower_inboxes: Vec<Url> = community
84     .get_follower_inboxes(context.pool())
85     .await?
86     .iter()
87     .unique()
88     .filter(|inbox| inbox.host_str() != Some(&Settings::get().hostname))
89     .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
90     .map(|inbox| inbox.to_owned())
91     .collect();
92   debug!(
93     "Sending activity {:?} to followers of {}",
94     &activity.id_unchecked(),
95     &community.actor_id
96   );
97
98   send_activity_internal(
99     context.activity_queue(),
100     activity,
101     community,
102     follower_inboxes,
103     context.pool(),
104     true,
105   )
106   .await?;
107
108   Ok(())
109 }
110
111 /// Sends an activity from a local user to a remote community.
112 ///
113 /// * `activity` the activity to send
114 /// * `creator` the creator of the activity
115 /// * `community` the destination community
116 ///
117 pub async fn send_to_community<T, Kind>(
118   activity: T,
119   creator: &User_,
120   community: &Community,
121   context: &LemmyContext,
122 ) -> Result<(), LemmyError>
123 where
124   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
125   Kind: Serialize,
126   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
127 {
128   // if this is a local community, we need to do an announce from the community instead
129   if community.local {
130     community
131       .send_announce(activity.into_any_base()?, context)
132       .await?;
133   } else {
134     let inbox = community.get_shared_inbox_url()?;
135     check_is_apub_id_valid(&inbox)?;
136     debug!(
137       "Sending activity {:?} to community {}",
138       &activity.id_unchecked(),
139       &community.actor_id
140     );
141     send_activity_internal(
142       context.activity_queue(),
143       activity,
144       creator,
145       vec![inbox],
146       context.pool(),
147       true,
148     )
149     .await?;
150   }
151
152   Ok(())
153 }
154
155 /// Sends notification to any users mentioned in a comment
156 ///
157 /// * `creator` user who created the comment
158 /// * `mentions` list of inboxes of users which are mentioned in the comment
159 /// * `activity` either a `Create/Note` or `Update/Note`
160 pub async fn send_comment_mentions<T, Kind>(
161   creator: &User_,
162   mentions: Vec<Url>,
163   activity: T,
164   context: &LemmyContext,
165 ) -> Result<(), LemmyError>
166 where
167   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
168   Kind: Serialize,
169   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
170 {
171   debug!(
172     "Sending mentions activity {:?} to {:?}",
173     &activity.id_unchecked(),
174     &mentions
175   );
176   let mentions = mentions
177     .iter()
178     .filter(|inbox| check_is_apub_id_valid(inbox).is_ok())
179     .map(|i| i.to_owned())
180     .collect();
181   send_activity_internal(
182     context.activity_queue(),
183     activity,
184     creator,
185     mentions,
186     context.pool(),
187     false, // Don't create a new DB row
188   )
189   .await?;
190   Ok(())
191 }
192
193 /// Create new `SendActivityTasks`, which will deliver the given activity to inboxes, as well as
194 /// handling signing and retrying failed deliveres.
195 ///
196 /// The caller of this function needs to remove any blocked domains from `to`,
197 /// using `check_is_apub_id_valid()`.
198 async fn send_activity_internal<T, Kind>(
199   activity_sender: &QueueHandle,
200   activity: T,
201   actor: &dyn ActorType,
202   inboxes: Vec<Url>,
203   pool: &DbPool,
204   insert_into_db: bool,
205 ) -> Result<(), LemmyError>
206 where
207   T: AsObject<Kind> + Extends<Kind> + Debug,
208   Kind: Serialize,
209   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
210 {
211   if !Settings::get().federation.enabled || inboxes.is_empty() {
212     return Ok(());
213   }
214
215   let activity = activity.into_any_base()?;
216   let serialised_activity = serde_json::to_string(&activity)?;
217
218   // This is necessary because send_comment and send_comment_mentions
219   // might send the same ap_id
220   if insert_into_db {
221     let id = activity.id().context(location_info!())?;
222     insert_activity(id, actor.user_id(), activity.clone(), true, pool).await?;
223   }
224
225   for i in inboxes {
226     let message = SendActivityTask {
227       activity: serialised_activity.to_owned(),
228       inbox: i,
229       actor_id: actor.actor_id()?,
230       private_key: actor.private_key().context(location_info!())?,
231     };
232     activity_sender.queue::<SendActivityTask>(message)?;
233   }
234
235   Ok(())
236 }
237
238 #[derive(Clone, Debug, Deserialize, Serialize)]
239 struct SendActivityTask {
240   activity: String,
241   inbox: Url,
242   actor_id: Url,
243   private_key: String,
244 }
245
246 /// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
247 /// if the delivery failed.
248 impl ActixJob for SendActivityTask {
249   type State = MyState;
250   type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
251   const NAME: &'static str = "SendActivityTask";
252
253   const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
254   const BACKOFF: Backoff = Backoff::Exponential(2);
255
256   fn run(self, state: Self::State) -> Self::Future {
257     Box::pin(async move {
258       let mut headers = BTreeMap::<String, String>::new();
259       headers.insert("Content-Type".into(), "application/json".into());
260       let result = sign_and_send(
261         &state.client,
262         headers,
263         &self.inbox,
264         self.activity.clone(),
265         &self.actor_id,
266         self.private_key.to_owned(),
267       )
268       .await;
269
270       if let Err(e) = result {
271         warn!("{}", e);
272         return Err(anyhow!(
273           "Failed to send activity {} to {}",
274           &self.activity,
275           self.inbox
276         ));
277       }
278       Ok(())
279     })
280   }
281 }
282
283 pub fn create_activity_queue() -> QueueHandle {
284   // Start the application server. This guards access to to the jobs store
285   let queue_handle = create_server(Storage::new());
286
287   // Configure and start our workers
288   WorkerConfig::new(|| MyState {
289     client: Client::default(),
290   })
291   .register::<SendActivityTask>()
292   .start(queue_handle.clone());
293
294   queue_handle
295 }
296
297 #[derive(Clone)]
298 struct MyState {
299   pub client: Client,
300 }