]> Untitled Git - lemmy.git/blob - crates/api_common/src/send_activity.rs
add enable_federated_downvotes site option
[lemmy.git] / crates / api_common / src / send_activity.rs
1 use crate::{
2   community::BanFromCommunity,
3   context::LemmyContext,
4   person::BanPerson,
5   post::{DeletePost, RemovePost},
6 };
7 use activitypub_federation::config::Data;
8 use futures::future::BoxFuture;
9 use lemmy_db_schema::{
10   newtypes::{CommunityId, DbUrl, PersonId},
11   source::{
12     comment::Comment,
13     community::Community,
14     person::Person,
15     post::Post,
16     private_message::PrivateMessage,
17   },
18 };
19 use lemmy_db_views::structs::PrivateMessageView;
20 use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
21 use once_cell::sync::{Lazy, OnceCell};
22 use tokio::{
23   sync::{
24     mpsc,
25     mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
26     Mutex,
27   },
28   task::JoinHandle,
29 };
30 use url::Url;
31
/// Boxed function pointer used to dispatch an outgoing activity directly.
///
/// The function takes ownership of a [`SendActivityData`] plus a borrowed
/// `Data<LemmyContext>` and returns a boxed future that resolves to `LemmyResult<()>`.
/// The future may borrow from the context for the lifetime `'a`.
type MatchOutgoingActivitiesBoxed =
  Box<for<'a> fn(SendActivityData, &'a Data<LemmyContext>) -> BoxFuture<'a, LemmyResult<()>>>;
34
/// This static is necessary so that activities can be sent out synchronously for tests.
///
/// `ActivityChannel::submit_activity` reads this cell and calls the stored function whenever
/// `SYNCHRONOUS_FEDERATION` is set. The cell must have been initialized by then, otherwise
/// that call panics with "retrieve function pointer".
pub static MATCH_OUTGOING_ACTIVITIES: OnceCell<MatchOutgoingActivitiesBoxed> = OnceCell::new();
37
/// One outgoing activity together with the data needed to send it.
///
/// Values of this type are handed to `ActivityChannel::submit_activity` and come back out of
/// `ActivityChannel::retrieve_activity`. Each variant names the action and carries the
/// database objects involved.
// NOTE(review): the meaning of the bare `bool`, `i16`, `String` and `Option<String>` payloads
// is not visible in this file (presumably flags, a vote score and a reason text) — confirm
// against the code that constructs and matches on these variants.
#[derive(Debug)]
pub enum SendActivityData {
  CreatePost(Post),
  UpdatePost(Post),
  DeletePost(Post, Person, DeletePost),
  RemovePost(Post, Person, RemovePost),
  LockPost(Post, Person, bool),
  FeaturePost(Post, Person, bool),
  CreateComment(Comment),
  UpdateComment(Comment),
  DeleteComment(Comment, Person, Community),
  RemoveComment(Comment, Person, Community, Option<String>),
  LikePostOrComment(DbUrl, Person, Community, i16),
  FollowCommunity(Community, Person, bool),
  UpdateCommunity(Person, Community),
  DeleteCommunity(Person, Community, bool),
  RemoveCommunity(Person, Community, Option<String>, bool),
  AddModToCommunity(Person, CommunityId, PersonId, bool),
  BanFromCommunity(Person, CommunityId, Person, BanFromCommunity),
  BanFromSite(Person, Person, BanPerson),
  CreatePrivateMessage(PrivateMessageView),
  UpdatePrivateMessage(PrivateMessageView),
  DeletePrivateMessage(Person, PrivateMessage, bool),
  DeleteUser(Person),
  CreateReport(Url, Person, Community, String),
}
64
65 // TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
66 //       ctrl+c still works.
67 static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
68   let (sender, receiver) = mpsc::unbounded_channel();
69   let weak_sender = sender.downgrade();
70   ActivityChannel {
71     weak_sender,
72     receiver: Mutex::new(receiver),
73     keepalive_sender: Mutex::new(Some(sender)),
74   }
75 });
76
/// Queue of outgoing activities built on an unbounded tokio mpsc channel.
///
/// All methods are associated functions that operate on the single global instance
/// stored in `ACTIVITY_CHANNEL`.
pub struct ActivityChannel {
  /// Weak handle that `submit_activity` upgrades each time it enqueues an activity.
  weak_sender: WeakUnboundedSender<SendActivityData>,
  /// Receiving half, locked by `retrieve_activity` while it waits for the next item.
  receiver: Mutex<UnboundedReceiver<SendActivityData>>,
  /// The strong sender; stays `Some` until `close` takes it out.
  keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
}
82
83 impl ActivityChannel {
84   pub async fn retrieve_activity() -> Option<SendActivityData> {
85     let mut lock = ACTIVITY_CHANNEL.receiver.lock().await;
86     lock.recv().await
87   }
88
89   pub async fn submit_activity(
90     data: SendActivityData,
91     context: &Data<LemmyContext>,
92   ) -> LemmyResult<()> {
93     if *SYNCHRONOUS_FEDERATION {
94       MATCH_OUTGOING_ACTIVITIES
95         .get()
96         .expect("retrieve function pointer")(data, context)
97       .await?;
98     }
99     // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
100     // not sure which way is more efficient
101     else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
102       sender.send(data)?;
103     }
104     Ok(())
105   }
106
107   pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
108     ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
109     outgoing_activities_task.await??;
110     Ok(())
111   }
112 }