]> Untitled Git - lemmy.git/blob - crates/api_common/src/send_activity.rs
Remove SendActivity and Perform traits, rely on channel (#3596)
[lemmy.git] / crates / api_common / src / send_activity.rs
1 use crate::context::LemmyContext;
2 use activitypub_federation::config::Data;
3 use futures::future::BoxFuture;
4 use lemmy_db_schema::source::post::Post;
5 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
6 use once_cell::sync::{Lazy, OnceCell};
7 use tokio::sync::{
8   mpsc,
9   mpsc::{UnboundedReceiver, UnboundedSender},
10   Mutex,
11 };
12
13 type MatchOutgoingActivitiesBoxed =
14   Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>;
15
16 /// This static is necessary so that activities can be sent out synchronously for tests.
17 pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new();
18
19 #[derive(Debug)]
20 pub enum SendActivityData {
21   CreatePost(Post),
22 }
23
24 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
25   let (sender, receiver) = mpsc::unbounded_channel();
26   ActivityChannel {
27     sender,
28     receiver: Mutex::new(receiver),
29   }
30 });
31
32 pub struct ActivityChannel {
33   sender: UnboundedSender<SendActivityData>,
34   receiver: Mutex<UnboundedReceiver<SendActivityData>>,
35 }
36
37 impl ActivityChannel {
38   pub async fn retrieve_activity() -> Option<SendActivityData> {
39     let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
40     lock.recv().await
41   }
42
43   pub async fn submit_activity(
44     data: SendActivityData,
45     context: &Data<LemmyContext>,
46   ) -> LemmyResult<()> {
47     if *SYNCHRONOUS_FEDERATION {
48       MATCH_OUTGOING_ACTIVITIES
49         .get()
50         .expect("retrieve function pointer")(data, context)
51       .await?;
52     } else {
53       let lock = &ACTIVITY_CHANNEL.sender;
54       lock.send(data)?;
55     }
56     Ok(())
57   }
58 }