]> Untitled Git - lemmy.git/blob - crates/apub/src/activity_queue.rs
Rewrite activitypub following, person, community, pm (#1692)
[lemmy.git] / crates / apub / src / activity_queue.rs
1 use crate::{
2   activities::community::announce::{AnnouncableActivities, AnnounceActivity},
3   check_is_apub_id_valid,
4   extensions::signatures::sign_and_send,
5   insert_activity,
6   ActorType,
7   CommunityType,
8   APUB_JSON_CONTENT_TYPE,
9 };
10 use activitystreams::{
11   base::{BaseExt, Extends, ExtendsExt},
12   object::AsObject,
13 };
14 use anyhow::{anyhow, Context, Error};
15 use background_jobs::{
16   create_server,
17   memory_storage::Storage,
18   ActixJob,
19   Backoff,
20   MaxRetries,
21   QueueHandle,
22   WorkerConfig,
23 };
24 use itertools::Itertools;
25 use lemmy_db_schema::source::{community::Community, person::Person};
26 use lemmy_utils::{location_info, settings::structs::Settings, LemmyError};
27 use lemmy_websocket::LemmyContext;
28 use log::{debug, info, warn};
29 use reqwest::Client;
30 use serde::{Deserialize, Serialize};
31 use std::{collections::BTreeMap, env, fmt::Debug, future::Future, pin::Pin};
32 use url::Url;
33
34 /// From a local community, send activity to all remote followers.
35 ///
36 /// * `activity` the apub activity to send
37 /// * `community` the sending community
38 /// * `extra_inbox` actor inbox which should receive the activity, in addition to followers
39 pub(crate) async fn send_to_community_followers<T, Kind>(
40   activity: T,
41   community: &Community,
42   extra_inbox: Option<Url>,
43   context: &LemmyContext,
44 ) -> Result<(), LemmyError>
45 where
46   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
47   Kind: Serialize,
48   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
49 {
50   let extra_inbox: Vec<Url> = extra_inbox.into_iter().collect();
51   let follower_inboxes: Vec<Url> = vec![
52     community.get_follower_inboxes(context.pool()).await?,
53     extra_inbox,
54   ]
55   .iter()
56   .flatten()
57   .unique()
58   .filter(|inbox| inbox.host_str() != Some(&Settings::get().hostname))
59   .filter(|inbox| check_is_apub_id_valid(inbox, false).is_ok())
60   .map(|inbox| inbox.to_owned())
61   .collect();
62   debug!(
63     "Sending activity {:?} to followers of {}",
64     &activity.id_unchecked().map(ToString::to_string),
65     &community.actor_id
66   );
67
68   send_activity_internal(context, activity, community, follower_inboxes, true, false).await?;
69
70   Ok(())
71 }
72
73 /// Sends an activity from a local person to a remote community.
74 ///
75 /// * `activity` the activity to send
76 /// * `creator` the creator of the activity
77 /// * `community` the destination community
78 /// * `object_actor` if the object of the activity is an actor, it should be passed here so it can
79 ///                  be sent directly to the actor
80 ///
81 pub(crate) async fn send_to_community<T, Kind>(
82   activity: T,
83   creator: &Person,
84   community: &Community,
85   object_actor: Option<Url>,
86   context: &LemmyContext,
87 ) -> Result<(), LemmyError>
88 where
89   T: AsObject<Kind> + Extends<Kind> + Debug + BaseExt<Kind>,
90   Kind: Serialize,
91   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
92 {
93   // if this is a local community, we need to do an announce from the community instead
94   if community.local {
95     community
96       .send_announce(activity.into_any_base()?, object_actor, context)
97       .await?;
98   } else {
99     let inbox = community.get_shared_inbox_or_inbox_url();
100     check_is_apub_id_valid(&inbox, false)?;
101     debug!(
102       "Sending activity {:?} to community {}",
103       &activity.id_unchecked().map(ToString::to_string),
104       &community.actor_id
105     );
106     // dont send to object_actor here, as that is responsibility of the community itself
107     send_activity_internal(context, activity, creator, vec![inbox], true, false).await?;
108   }
109
110   Ok(())
111 }
112
113 pub(crate) async fn send_to_community_new(
114   activity: AnnouncableActivities,
115   activity_id: &Url,
116   actor: &dyn ActorType,
117   community: &Community,
118   additional_inboxes: Vec<Url>,
119   context: &LemmyContext,
120 ) -> Result<(), LemmyError> {
121   // if this is a local community, we need to do an announce from the community instead
122   if community.local {
123     insert_activity(activity_id, activity.clone(), true, false, context.pool()).await?;
124     AnnounceActivity::send(activity, community, additional_inboxes, context).await?;
125   } else {
126     let mut inboxes = additional_inboxes;
127     inboxes.push(community.get_shared_inbox_or_inbox_url());
128     send_activity_new(context, &activity, activity_id, actor, inboxes, false).await?;
129   }
130
131   Ok(())
132 }
133
134 pub(crate) async fn send_activity_new<T>(
135   context: &LemmyContext,
136   activity: &T,
137   activity_id: &Url,
138   actor: &dyn ActorType,
139   inboxes: Vec<Url>,
140   sensitive: bool,
141 ) -> Result<(), LemmyError>
142 where
143   T: Serialize,
144 {
145   if !Settings::get().federation.enabled || inboxes.is_empty() {
146     return Ok(());
147   }
148
149   info!("Sending activity {}", activity_id.to_string());
150
151   // Don't send anything to ourselves
152   // TODO: this should be a debug assert
153   let hostname = Settings::get().get_hostname_without_port()?;
154   let inboxes: Vec<&Url> = inboxes
155     .iter()
156     .filter(|i| i.domain().expect("valid inbox url") != hostname)
157     .collect();
158
159   let serialised_activity = serde_json::to_string(&activity)?;
160
161   insert_activity(
162     activity_id,
163     serialised_activity.clone(),
164     true,
165     sensitive,
166     context.pool(),
167   )
168   .await?;
169
170   for i in inboxes {
171     let message = SendActivityTask {
172       activity: serialised_activity.to_owned(),
173       inbox: i.to_owned(),
174       actor_id: actor.actor_id(),
175       private_key: actor.private_key().context(location_info!())?,
176     };
177     if env::var("LEMMY_TEST_SEND_SYNC").is_ok() {
178       do_send(message, &Client::default()).await?;
179     } else {
180       context.activity_queue.queue::<SendActivityTask>(message)?;
181     }
182   }
183
184   Ok(())
185 }
186
187 /// Create new `SendActivityTasks`, which will deliver the given activity to inboxes, as well as
188 /// handling signing and retrying failed deliveres.
189 ///
190 /// The caller of this function needs to remove any blocked domains from `to`,
191 /// using `check_is_apub_id_valid()`.
192 async fn send_activity_internal<T, Kind>(
193   context: &LemmyContext,
194   activity: T,
195   actor: &dyn ActorType,
196   inboxes: Vec<Url>,
197   insert_into_db: bool,
198   sensitive: bool,
199 ) -> Result<(), LemmyError>
200 where
201   T: AsObject<Kind> + Extends<Kind> + Debug,
202   Kind: Serialize,
203   <T as Extends<Kind>>::Error: From<serde_json::Error> + Send + Sync + 'static,
204 {
205   if !Settings::get().federation.enabled || inboxes.is_empty() {
206     return Ok(());
207   }
208
209   // Don't send anything to ourselves
210   let hostname = Settings::get().get_hostname_without_port()?;
211   let inboxes: Vec<&Url> = inboxes
212     .iter()
213     .filter(|i| i.domain().expect("valid inbox url") != hostname)
214     .collect();
215
216   let activity = activity.into_any_base()?;
217   let serialised_activity = serde_json::to_string(&activity)?;
218
219   // This is necessary because send_comment and send_comment_mentions
220   // might send the same ap_id
221   if insert_into_db {
222     let id = activity.id().context(location_info!())?;
223     insert_activity(id, activity.clone(), true, sensitive, context.pool()).await?;
224   }
225
226   for i in inboxes {
227     let message = SendActivityTask {
228       activity: serialised_activity.to_owned(),
229       inbox: i.to_owned(),
230       actor_id: actor.actor_id(),
231       private_key: actor.private_key().context(location_info!())?,
232     };
233     if env::var("LEMMY_TEST_SEND_SYNC").is_ok() {
234       do_send(message, &Client::default()).await?;
235     } else {
236       context.activity_queue.queue::<SendActivityTask>(message)?;
237     }
238   }
239
240   Ok(())
241 }
242
243 #[derive(Clone, Debug, Deserialize, Serialize)]
244 struct SendActivityTask {
245   activity: String,
246   inbox: Url,
247   actor_id: Url,
248   private_key: String,
249 }
250
251 /// Signs the activity with the sending actor's key, and delivers to the given inbox. Also retries
252 /// if the delivery failed.
253 impl ActixJob for SendActivityTask {
254   type State = MyState;
255   type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
256   const NAME: &'static str = "SendActivityTask";
257
258   const MAX_RETRIES: MaxRetries = MaxRetries::Count(10);
259   const BACKOFF: Backoff = Backoff::Exponential(2);
260
261   fn run(self, state: Self::State) -> Self::Future {
262     Box::pin(async move { do_send(self, &state.client).await })
263   }
264 }
265
266 async fn do_send(task: SendActivityTask, client: &Client) -> Result<(), Error> {
267   let mut headers = BTreeMap::<String, String>::new();
268   headers.insert("Content-Type".into(), APUB_JSON_CONTENT_TYPE.to_string());
269   let result = sign_and_send(
270     client,
271     headers,
272     &task.inbox,
273     task.activity.clone(),
274     &task.actor_id,
275     task.private_key.to_owned(),
276   )
277   .await;
278
279   if let Err(e) = result {
280     warn!("{}", e);
281     return Err(anyhow!(
282       "Failed to send activity {} to {}",
283       &task.activity,
284       task.inbox
285     ));
286   }
287   Ok(())
288 }
289
290 pub fn create_activity_queue() -> QueueHandle {
291   // Start the application server. This guards access to to the jobs store
292   let queue_handle = create_server(Storage::new());
293
294   // Configure and start our workers
295   WorkerConfig::new(|| MyState {
296     client: Client::default(),
297   })
298   .register::<SendActivityTask>()
299   .start(queue_handle.clone());
300
301   queue_handle
302 }
303
304 #[derive(Clone)]
305 struct MyState {
306   pub client: Client,
307 }