]> Untitled Git - lemmy.git/blob - crates/api_common/src/send_activity.rs
994aea2a745096353ea22a013fc6aa6cdc1fc52f
[lemmy.git] / crates / api_common / src / send_activity.rs
1 use crate::context::LemmyContext;
2 use activitypub_federation::config::Data;
3 use futures::future::BoxFuture;
4 use lemmy_db_schema::source::{comment::Comment, post::Post};
5 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
6 use once_cell::sync::{Lazy, OnceCell};
7 use tokio::{
8   sync::{
9     mpsc,
10     mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
11     Mutex,
12   },
13   task::JoinHandle,
14 };
15
16 type MatchOutgoingActivitiesBoxed =
17   Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>;
18
19 /// This static is necessary so that activities can be sent out synchronously for tests.
20 pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new();
21
22 #[derive(Debug)]
23 pub enum SendActivityData {
24   CreatePost(Post),
25   CreateComment(Comment),
26 }
27
28 // TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
29 //       ctrl+c still works.
30 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
31   let (sender, receiver) = mpsc::unbounded_channel();
32   let weak_sender = sender.downgrade();
33   ActivityChannel {
34     weak_sender,
35     receiver: Mutex::new(receiver),
36     keepalive_sender: Mutex::new(Some(sender)),
37   }
38 });
39
40 pub struct ActivityChannel {
41   weak_sender: WeakUnboundedSender<SendActivityData>,
42   receiver: Mutex<UnboundedReceiver<SendActivityData>>,
43   keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
44 }
45
46 impl ActivityChannel {
47   pub async fn retrieve_activity() -> Option<SendActivityData> {
48     let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
49     lock.recv().await
50   }
51
52   pub async fn submit_activity(
53     data: SendActivityData,
54     context: &Data<LemmyContext>,
55   ) -> LemmyResult<()> {
56     if *SYNCHRONOUS_FEDERATION {
57       MATCH_OUTGOING_ACTIVITIES
58         .get()
59         .expect("retrieve function pointer")(data, context)
60       .await?;
61     }
62     // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
63     // not sure which way is more efficient
64     else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
65       sender.send(data)?;
66     }
67     Ok(())
68   }
69
70   pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
71     ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
72     outgoing_activities_task.await??;
73     Ok(())
74   }
75 }