2 community::BanFromCommunity,
5 post::{DeletePost, RemovePost},
7 use activitypub_federation::config::Data;
8 use futures::future::BoxFuture;
10 newtypes::{CommunityId, DbUrl, PersonId},
16 private_message::PrivateMessage,
19 use lemmy_db_views::structs::PrivateMessageView;
20 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
21 use once_cell::sync::{Lazy, OnceCell};
25 mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
32 type MatchOutgoingActivitiesBoxed =
33 Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>;
35 /// This static is necessary so that activities can be sent out synchronously for tests.
36 pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new();
39 pub enum SendActivityData {
42 DeletePost(Post, Person, DeletePost),
43 RemovePost(Post, Person, RemovePost),
44 LockPost(Post, Person, bool),
45 FeaturePost(Post, Person, bool),
46 CreateComment(Comment),
47 UpdateComment(Comment),
48 DeleteComment(Comment, Person, Community),
49 RemoveComment(Comment, Person, Community, Option<String>),
50 LikePostOrComment(DbUrl, Person, Community, i16),
51 FollowCommunity(Community, Person, bool),
52 UpdateCommunity(Person, Community),
53 DeleteCommunity(Person, Community, bool),
54 RemoveCommunity(Person, Community, Option<String>, bool),
55 AddModToCommunity(Person, CommunityId, PersonId, bool),
56 BanFromCommunity(Person, CommunityId, Person, BanFromCommunity),
57 BanFromSite(Person, Person, BanPerson),
58 CreatePrivateMessage(PrivateMessageView),
59 UpdatePrivateMessage(PrivateMessageView),
60 DeletePrivateMessage(Person, PrivateMessage, bool),
62 CreateReport(Url, Person, Community, String),
65 // TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
66 // ctrl+c still works.
67 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
68 let (sender, receiver) = mpsc::unbounded_channel();
69 let weak_sender = sender.downgrade();
72 receiver: Mutex::new(receiver),
73 keepalive_sender: Mutex::new(Some(sender)),
77 pub struct ActivityChannel {
78 weak_sender: WeakUnboundedSender<SendActivityData>,
79 receiver: Mutex<UnboundedReceiver<SendActivityData>>,
80 keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
83 impl ActivityChannel {
84 pub async fn retrieve_activity() -> Option<SendActivityData> {
85 let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
89 pub async fn submit_activity(
90 data: SendActivityData,
91 context: &Data<LemmyContext>,
92 ) -> LemmyResult<()> {
93 if *SYNCHRONOUS_FEDERATION {
94 MATCH_OUTGOING_ACTIVITIES
96 .expect("retrieve function pointer")(data, context)
99 // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
100 // not sure which way is more efficient
101 else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
107 pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
108 ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
109 outgoing_activities_task.await??;