use lemmy_db_schema::source::post::Post;
use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
use once_cell::sync::{Lazy, OnceCell};
-use tokio::sync::{
- mpsc,
- mpsc::{UnboundedReceiver, UnboundedSender},
- Mutex,
+use tokio::{
+ sync::{
+ mpsc,
+ mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
+ Mutex,
+ },
+ task::JoinHandle,
};
type MatchOutgoingActivitiesBoxed =
CreatePost(Post),
}
+// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
+// ctrl+c still works.
static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
let (sender, receiver) = mpsc::unbounded_channel();
+ let weak_sender = sender.downgrade();
ActivityChannel {
- sender,
+ weak_sender,
receiver: Mutex::new(receiver),
+ keepalive_sender: Mutex::new(Some(sender)),
}
});
pub struct ActivityChannel {
- sender: UnboundedSender<SendActivityData>,
+ weak_sender: WeakUnboundedSender<SendActivityData>,
receiver: Mutex<UnboundedReceiver<SendActivityData>>,
+ keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
}
impl ActivityChannel {
.get()
.expect("retrieve function pointer")(data, context)
.await?;
- } else {
- let lock = &ACTIVITY_CHANNEL.sender;
- lock.send(data)?;
}
+ // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
+ // not sure which way is more efficient
+ else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
+ sender.send(data)?;
+ }
+ Ok(())
+ }
+
+ pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
+ ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
+ outgoing_activities_task.await??;
Ok(())
}
}
context::LemmyContext,
lemmy_db_views::structs::SiteView,
request::build_user_agent,
- send_activity::MATCH_OUTGOING_ACTIVITIES,
+ send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES},
utils::{
check_private_instance_and_federation_enabled,
local_site_rate_limit_to_rate_limit_config,
.await?;
// Wait for outgoing apub sends to complete
- outgoing_activities_task.await??;
+ ActivityChannel::close(outgoing_activities_task).await?;
Ok(())
}