I noticed that stopping the Lemmy process with ctrl+c wasnt working
because the activity channel isnt properly closed. This is now fixed.
Later we should also move the channel from static into LemmyContext,
Im not doing that now to avoid conflicts with #3670.
use lemmy_db_schema::source::post::Post;
use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
use once_cell::sync::{Lazy, OnceCell};
use lemmy_db_schema::source::post::Post;
use lemmy_utils::{error::LemmyResult, SYNCHRONOUS_FEDERATION};
use once_cell::sync::{Lazy, OnceCell};
-use tokio::sync::{
- mpsc,
- mpsc::{UnboundedReceiver, UnboundedSender},
- Mutex,
+use tokio::{
+ sync::{
+ mpsc,
+ mpsc::{UnboundedReceiver, UnboundedSender, WeakUnboundedSender},
+ Mutex,
+ },
+ task::JoinHandle,
};
type MatchOutgoingActivitiesBoxed =
};
type MatchOutgoingActivitiesBoxed =
+// TODO: instead of static, move this into LemmyContext. make sure that stopping the process with
+// ctrl+c still works.
static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
let (sender, receiver) = mpsc::unbounded_channel();
static ACTIVITY_CHANNEL: Lazy<ActivityChannel> = Lazy::new(|| {
let (sender, receiver) = mpsc::unbounded_channel();
+ let weak_sender = sender.downgrade();
receiver: Mutex::new(receiver),
receiver: Mutex::new(receiver),
+ keepalive_sender: Mutex::new(Some(sender)),
}
});
pub struct ActivityChannel {
}
});
pub struct ActivityChannel {
- sender: UnboundedSender<SendActivityData>,
+ weak_sender: WeakUnboundedSender<SendActivityData>,
receiver: Mutex<UnboundedReceiver<SendActivityData>>,
receiver: Mutex<UnboundedReceiver<SendActivityData>>,
+ keepalive_sender: Mutex<Option<UnboundedSender<SendActivityData>>>,
.get()
.expect("retrieve function pointer")(data, context)
.await?;
.get()
.expect("retrieve function pointer")(data, context)
.await?;
- } else {
- let lock = &ACTIVITY_CHANNEL.sender;
- lock.send(data)?;
+ // could do `ACTIVITY_CHANNEL.keepalive_sender.lock()` instead and get rid of weak_sender,
+ // not sure which way is more efficient
+ else if let Some(sender) = ACTIVITY_CHANNEL.weak_sender.upgrade() {
+ sender.send(data)?;
+ }
+ Ok(())
+ }
+
+ pub async fn close(outgoing_activities_task: JoinHandle<LemmyResult<()>>) -> LemmyResult<()> {
+ ACTIVITY_CHANNEL.keepalive_sender.lock().await.take();
+ outgoing_activities_task.await??;
context::LemmyContext,
lemmy_db_views::structs::SiteView,
request::build_user_agent,
context::LemmyContext,
lemmy_db_views::structs::SiteView,
request::build_user_agent,
- send_activity::MATCH_OUTGOING_ACTIVITIES,
+ send_activity::{ActivityChannel, MATCH_OUTGOING_ACTIVITIES},
utils::{
check_private_instance_and_federation_enabled,
local_site_rate_limit_to_rate_limit_config,
utils::{
check_private_instance_and_federation_enabled,
local_site_rate_limit_to_rate_limit_config,
.await?;
// Wait for outgoing apub sends to complete
.await?;
// Wait for outgoing apub sends to complete
- outgoing_activities_task.await??;
+ ActivityChannel::close(outgoing_activities_task).await?;