# use allowlist only for remote communities, and posts/comments in local communities
# (meaning remote communities will show content from arbitrary instances).
strict_allowlist: true
+ # Number of workers for sending outgoing activities.
+ worker_count: 16
}
captcha: {
# Whether captcha is required for signup
},
protocol::tests::file_to_json_object,
};
- use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{
source::{
community::Community,
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_community_moderators() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let (new_mod, site) = parse_lemmy_person(&context).await;
let community = parse_lemmy_community(&context).await;
let community_id = community.id;
protocol::tests::file_to_json_object,
};
use assert_json_diff::assert_json_include;
- use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::source::site::Site;
use serial_test::serial;
#[actix_rt::test]
#[serial]
pub(crate) async fn test_parse_lemmy_comment() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await;
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_comment() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await;
objects::{instance::tests::parse_lemmy_instance, tests::init_context},
protocol::tests::file_to_json_object,
};
- use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{source::site::Site, traits::Crud};
use serial_test::serial;
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_community() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let site = parse_lemmy_instance(&context).await;
let community = parse_lemmy_community(&context).await;
pub(crate) mod tests {
use super::*;
use crate::{objects::tests::init_context, protocol::tests::file_to_json_object};
- use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::traits::Crud;
use serial_test::serial;
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_instance() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let site = parse_lemmy_instance(&context).await;
assert_eq!(site.name, "Enterprise");
#[cfg(test)]
pub(crate) mod tests {
use actix::Actor;
- use background_jobs::QueueHandle;
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
+ use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{
establish_unpooled_connection,
get_database_url_from_env,
// TODO: would be nice if we didnt have to use a full context for tests.
// or at least write a helper function so this code is shared with main.rs
- pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext {
+ pub(crate) fn init_context() -> LemmyContext {
+ let client = reqwest::Client::new().into();
+ // activity queue isnt used in tests, so worker count makes no difference
+ let queue_manager = create_activity_queue(client, 4);
+ let activity_queue = queue_manager.queue_handle().clone();
// call this to run migrations
establish_unpooled_connection();
let settings = Settings::init().unwrap();
},
protocol::{objects::instance::Instance, tests::file_to_json_object},
};
- use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{source::site::Site, traits::Crud};
use serial_test::serial;
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_person() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let (person, site) = parse_lemmy_person(&context).await;
assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string()));
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_person() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
// create and parse a fake pleroma instance actor, to avoid network request during test
let mut json: Instance = file_to_json_object("assets/lemmy/objects/instance.json").unwrap();
},
protocol::tests::file_to_json_object,
};
- use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::source::site::Site;
use serial_test::serial;
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_post() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let (person, site) = parse_lemmy_person(&context).await;
let community = parse_lemmy_community(&context).await;
protocol::tests::file_to_json_object,
};
use assert_json_diff::assert_json_include;
- use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::source::site::Site;
use serial_test::serial;
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_pm() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await;
let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json").unwrap();
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_pm() {
- let client = reqwest::Client::new().into();
- let manager = create_activity_queue(client);
- let context = init_context(manager.queue_handle().clone());
+ let context = init_context();
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await;
let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap();
}
} else {
activity_queue.queue::<SendActivityTask>(message).await?;
+ let stats = activity_queue.get_stats().await?;
+ info!(
+ "Activity queue stats: pending: {}, running: {}, dead (this hour): {}, complete (this hour): {}",
+ stats.pending,
+ stats.running,
+ stats.dead.this_hour(),
+ stats.complete.this_hour()
+ );
}
}
r
}
-pub fn create_activity_queue(client: ClientWithMiddleware) -> Manager {
+pub fn create_activity_queue(client: ClientWithMiddleware, worker_count: u64) -> Manager {
// Configure and start our workers
WorkerConfig::new_managed(Storage::new(), move |_| MyState {
client: client.clone(),
})
.register::<SendActivityTask>()
+ .set_worker_count("default", worker_count)
.start()
}
/// (meaning remote communities will show content from arbitrary instances).
#[default(true)]
pub strict_allowlist: bool,
+ /// Number of workers for sending outgoing activities.
+ #[default(16)]
+ pub worker_count: u64,
}
#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]
let client = ClientBuilder::new(client).with(TracingMiddleware).build();
- let queue_manager = create_activity_queue(client.clone());
+ let queue_manager = create_activity_queue(client.clone(), settings.federation.worker_count);
let activity_queue = queue_manager.queue_handle().clone();