1 use crate::context::LemmyContext;
2 use activitypub_federation::config::Data;
3 use futures::future::BoxFuture;
6 source::{comment::Comment, community::Community, person::Person, post::Post},
8 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
9 use once_cell::sync::{Lazy, OnceCell};
13 mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
19 type MatchOutgoingActivitiesBoxed =
20 Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>;
22 /// This static is necessary so that activities can be sent out synchronously for tests.
23 pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new();
26 pub enum SendActivityData {
29 CreateComment(Comment),
30 DeleteComment(Comment, Person, Community),
31 RemoveComment(Comment, Person, Community, Option<String>),
32 UpdateComment(Comment),
33 LikePostOrComment(DbUrl, Person, Community, i16),
34 FollowCommunity(Community, Person, bool),
37 // TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
38 // ctrl+c still works.
39 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
40 let (sender, receiver) = mpsc::unbounded_channel();
41 let weak_sender = sender.downgrade();
44 receiver: Mutex::new(receiver),
45 keepalive_sender: Mutex::new(Some(sender)),
49 pub struct ActivityChannel {
50 weak_sender: WeakUnboundedSender<SendActivityData>,
51 receiver: Mutex<UnboundedReceiver<SendActivityData>>,
52 keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
55 impl ActivityChannel {
56 pub async fn retrieve_activity() -> Option<SendActivityData> {
57 let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
61 pub async fn submit_activity(
62 data: SendActivityData,
63 context: &Data<LemmyContext>,
64 ) -> LemmyResult<()> {
65 if *SYNCHRONOUS_FEDERATION {
66 MATCH_OUTGOING_ACTIVITIES
68 .expect("retrieve function pointer")(data, context)
71 // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
72 // not sure which way is more efficient
73 else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
79 pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
80 ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
81 outgoing_activities_task.await??;