]> Untitled Git - lemmy.git/blob - crates/api_common/src/send_activity.rs
8580c21756439381379dd8d04f0d45cb84f91337
[lemmy.git] / crates / api_common / src / send_activity.rs
1 use crate::context::LemmyContext;
2 use activitypub_federation::config::Data;
3 use futures::future::BoxFuture;
4 use lemmy_db_schema::{
5   newtypes::DbUrl,
6   source::{comment::Comment, community::Community, person::Person, post::Post},
7 };
8 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
9 use once_cell::sync::{Lazy, OnceCell};
10 use tokio::{
11   sync::{
12     mpsc,
13     mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
14     Mutex,
15   },
16   task::JoinHandle,
17 };
18
19 type MatchOutgoingActivitiesBoxed =
20   Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>;
21
22 /// This static is necessary so that activities can be sent out synchronously for tests.
23 pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new();
24
25 #[derive(Debug)]
26 pub enum SendActivityData {
27   CreatePost(Post),
28   UpdatePost(Post),
29   CreateComment(Comment),
30   DeleteComment(Comment, Person, Community),
31   RemoveComment(Comment, Person, Community, Option<String>),
32   UpdateComment(Comment),
33   LikePostOrComment(DbUrl, Person, Community, i16),
34   FollowCommunity(Community, Person, bool),
35 }
36
37 // TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
38 //       ctrl+c still works.
39 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
40   let (sender, receiver) = mpsc::unbounded_channel();
41   let weak_sender = sender.downgrade();
42   ActivityChannel {
43     weak_sender,
44     receiver: Mutex::new(receiver),
45     keepalive_sender: Mutex::new(Some(sender)),
46   }
47 });
48
49 pub struct ActivityChannel {
50   weak_sender: WeakUnboundedSender<SendActivityData>,
51   receiver: Mutex<UnboundedReceiver<SendActivityData>>,
52   keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
53 }
54
55 impl ActivityChannel {
56   pub async fn retrieve_activity() -> Option<SendActivityData> {
57     let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
58     lock.recv().await
59   }
60
61   pub async fn submit_activity(
62     data: SendActivityData,
63     context: &Data<LemmyContext>,
64   ) -> LemmyResult<()> {
65     if *SYNCHRONOUS_FEDERATION {
66       MATCH_OUTGOING_ACTIVITIES
67         .get()
68         .expect("retrieve function pointer")(data, context)
69       .await?;
70     }
71     // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
72     // not sure which way is more efficient
73     else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
74       sender.send(data)?;
75     }
76     Ok(())
77   }
78
79   pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
80     ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
81     outgoing_activities_task.await??;
82     Ok(())
83   }
84 }