]> Untitled Git - lemmy.git/blob - crates/apub/src/activity_queue.rs
Fix changelog links
[lemmy.git] / crates / apub / src / activity_queue.rs
1 use crate::{
2   check_is_apub_id_valid,
3   extensions::signatures::sign_and_send,
4   insert_activity,
5   ActorType,
6   CommunityType,
7   APUB_JSON_CONTENT_TYPE,
8 };
9 use activitystreams::{
10   base::{BaseExt, Extends, ExtendsExt},
11   object::AsObject,
12 };
13 use anyhow::{anyhow, Context, Error};
14 use background_jobs::{
15   create_server,
16   memory_storage::Storage,
17   ActixJob,
18   Backoff,
19   MaxRetries,
20   QueueHandle,
21   WorkerConfig,
22 };
23 use itertools::Itertools;
24 use lemmy_db_schema::source::{community::Community, person::Person};
25 use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
26 use lemmy_websocket::LemmyContext;
27 use log::{debug, warn};
28 use reqwest::Client;
29 use serde::{Deserialize, Serialize};
30 use std::{collections::BTreeMap, env, fmt::Debug, future::Future, pin::Pin};
31 use url::Url;
32
33 /// Sends a local activity to a single, remote actor.
34 ///
35 /// * `activity` the apub activity to be sent
36 /// * `creator` the local actor which created the activity
37 /// * `inbox` the inbox url where the activity should be delivered to
38 pub(crate) async fn send_activity_single_dest<T, Kind>(
39   activity: T,
40   creator: &dyn ActorType,
41   inbox: Url,
42   context: &LemmyContext,
43 ) -> Result<(), LemmyError>
44 where
45   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
46   Kind: Serialize,
47   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
48 {
49   if check_is_apub_id_valid(&inbox, false).is_ok() {
50     debug!(
51       "Sending activity {:?} to {}",
52       &activity.id_unchecked().map(ToString::to_string),
53       &inbox
54     );
55     send_activity_internal(context, activity, creator, vec![inbox], true, true).await?;
56   }
57
58   Ok(())
59 }
60
61 /// From a local community, send activity to all remote followers.
62 ///
63 /// * `activity` the apub activity to send
64 /// * `community` the sending community
65 /// * `extra_inbox` actor inbox which should receive the activity, in addition to followers
66 pub(crate) async fn send_to_community_followers<T, Kind>(
67   activity: T,
68   community: &Community,
69   extra_inbox: Option<Url>,
70   context: &LemmyContext,
71 ) -> Result<(), LemmyError>
72 where
73   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
74   Kind: Serialize,
75   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
76 {
77   let extra_inbox: Vec<Url> = extra_inbox.into_iter().collect();
78   let follower_inboxes: Vec<Url> = vec![
79     community.get_follower_inboxes(context.pool()).await?,
80     extra_inbox,
81   ]
82   .iter()
83   .flatten()
84   .unique()
85   .filter(|inbox| inbox.host_str() != Some(&Settings::get().hostname()))
86   .filter(|inbox| check_is_apub_id_valid(inbox, false).is_ok())
87   .map(|inbox| inbox.to_owned())
88   .collect();
89   debug!(
90     "Sending activity {:?} to followers of {}",
91     &activity.id_unchecked().map(ToString::to_string),
92     &community.actor_id
93   );
94
95   send_activity_internal(context, activity, community, follower_inboxes, true, false).await?;
96
97   Ok(())
98 }
99
100 /// Sends an activity from a local person to a remote community.
101 ///
102 /// * `activity` the activity to send
103 /// * `creator` the creator of the activity
104 /// * `community` the destination community
105 /// * `object_actor` if the object of the activity is an actor, it should be passed here so it can
106 ///                  be sent directly to the actor
107 ///
108 pub(crate) async fn send_to_community<T, Kind>(
109   activity: T,
110   creator: &Person,
111   community: &Community,
112   object_actor: Option<Url>,
113   context: &LemmyContext,
114 ) -> Result<(), LemmyError>
115 where
116   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
117   Kind: Serialize,
118   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
119 {
120   // if this is a local community, we need to do an announce from the community instead
121   if community.local {
122     community
123       .send_announce(activity.into_any_base()?, object_actor, context)
124       .await?;
125   } else {
126     let inbox = community.get_shared_inbox_or_inbox_url();
127     check_is_apub_id_valid(&inbox, false)?;
128     debug!(
129       "Sending activity {:?} to community {}",
130       &activity.id_unchecked().map(ToString::to_string),
131       &community.actor_id
132     );
133     // dont send to object_actor here, as that is responsibility of the community itself
134     send_activity_internal(context, activity, creator, vec![inbox], true, false).await?;
135   }
136
137   Ok(())
138 }
139
140 /// Sends notification to any persons mentioned in a comment
141 ///
142 /// * `creator` person who created the comment
143 /// * `mentions` list of inboxes of persons which are mentioned in the comment
144 /// * `activity` either a `Create/Note` or `Update/Note`
145 pub(crate) async fn send_comment_mentions<T, Kind>(
146   creator: &Person,
147   mentions: Vec<Url>,
148   activity: T,
149   context: &LemmyContext,
150 ) -> Result<(), LemmyError>
151 where
152   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
153   Kind: Serialize,
154   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
155 {
156   debug!(
157     "Sending mentions activity {:?} to {:?}",
158     &activity.id_unchecked(),
159     &mentions
160   );
161   let mentions = mentions
162     .iter()
163     .filter(|inbox| check_is_apub_id_valid(inbox, false).is_ok())
164     .map(|i| i.to_owned())
165     .collect();
166   send_activity_internal(
167     context, activity, creator, mentions, false, // Don't create a new DB row
168     false,
169   )
170   .await?;
171   Ok(())
172 }
173
174 /// Create new `SendActivityTasks`, which will deliver the given activity to inboxes, as well as
175 /// handling signing and retrying failed deliveres.
176 ///
177 /// The caller of this function needs to remove any blocked domains from `to`,
178 /// using `check_is_apub_id_valid()`.
179 async fn send_activity_internal<T, Kind>(
180   context: &LemmyContext,
181   activity: T,
182   actor: &dyn ActorType,
183   inboxes: Vec<Url>,
184   insert_into_db: bool,
185   sensitive: bool,
186 ) -> Result<(), LemmyError>
187 where
188   T: AsObject<Kind> + Extends<Kind> + Debug,
189   Kind: Serialize,
190   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
191 {
192   if !Settings::get().federation().enabled || inboxes.is_empty() {
193     return Ok(());
194   }
195
196   // Don't send anything to ourselves
197   let hostname = Settings::get().get_hostname_without_port()?;
198   let inboxes: Vec<&Url> = inboxes
199     .iter()
200     .filter(|i| i.domain().expect("valid inbox url") != hostname)
201     .collect();
202
203   let activity = activity.into_any_base()?;
204   let serialised_activity = serde_json::to_string(&activity)?;
205
206   // This is necessary because send_comment and send_comment_mentions
207   // might send the same ap_id
208   if insert_into_db {
209     let id = activity.id().context(location_info!())?;
210     insert_activity(id, activity.clone(), true, sensitive, context.pool()).await?;
211   }
212
213   for i in inboxes {
214     let message = SendActivityTask {
215       activity: serialised_activity.to_owned(),
216       inbox: i.to_owned(),
217       actor_id: actor.actor_id(),
218       private_key: actor.private_key().context(location_info!())?,
219     };
220     if env::var("LEMMY_TEST_SEND_SYNC").is_ok() {
221       do_send(message, &Client::default()).await?;
222     } else {
223       context.activity_queue.queue::<SendActivityTask>(message)?;
224     }
225   }
226
227   Ok(())
228 }
229
230 #[derive(Clone, Debug, Deserialize, Serialize)]
231 struct SendActivityTask {
232   activity: String,
233   inbox: Url,
234   actor_id: Url,
235   private_key: String,
236 }
237
238 /// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
239 /// if the delivery failed.
240 impl ActixJob for SendActivityTask {
241   type State = MyState;
242   type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
243   const NAME: &'static str = "SendActivityTask";
244
245   const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
246   const BACKOFF: Backoff = Backoff::Exponential(2);
247
248   fn run(self, state: Self::State) -> Self::Future {
249     Box::pin(async move { do_send(self, &state.client).await })
250   }
251 }
252
253 async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
254   let mut headers = BTreeMap::<String, String>::new();
255   headers.insert("Content-Type".into(), APUB_JSON_CONTENT_TYPE.to_string());
256   let result = sign_and_send(
257     client,
258     headers,
259     &task.inbox,
260     task.activity.clone(),
261     &task.actor_id,
262     task.private_key.to_owned(),
263   )
264   .await;
265
266   if let Err(e) = result {
267     warn!("{}", e);
268     return Err(anyhow!(
269       "Failed to send activity {} to {}",
270       &task.activity,
271       task.inbox
272     ));
273   }
274   Ok(())
275 }
276
277 pub fn create_activity_queue() -> QueueHandle {
278   // Start the application server. This guards access to to the jobs store
279   let queue_handle = create_server(Storage::new());
280
281   // Configure and start our workers
282   WorkerConfig::new(|| MyState {
283     client: Client::default(),
284   })
285   .register::<SendActivityTask>()
286   .start(queue_handle.clone());
287
288   queue_handle
289 }
290
291 #[derive(Clone)]
292 struct MyState {
293   pub client: Client,
294 }