[[package]]
name = "background-jobs"
-version = "0.9.1"
+version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bb7df0fd6abf9d55139d4c9e569c0a8cd271ec265862c41bd215b46b36c52397"
+checksum = "77f4508c6c5b5cfc6c18d43d0ba6ecda339710206854da9e1c9ac9dfb7e3eb6f"
dependencies = [
"background-jobs-actix",
"background-jobs-core",
[[package]]
name = "background-jobs-actix"
-version = "0.9.6"
+version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38aebb545b0fac45046421993890eb49cc04896a93b85bbfb1b9017decc413f9"
+checksum = "5dabf6a2204fe034db7910a38f8e2d183fe24eb92abd4c0aaca59f8cacf4e48b"
dependencies = [
"actix-rt",
"anyhow",
"async-trait",
"background-jobs-core",
"chrono",
- "log",
"num_cpus",
"serde",
"serde_json",
"thiserror",
"tokio",
+ "tracing",
+ "tracing-futures",
"uuid",
]
[[package]]
name = "background-jobs-core"
-version = "0.9.5"
+version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ee8604ff89c62ca8eefc1ea2c3f359a53b7930e640fb22bf7890eab13b4640d2"
+checksum = "174d36b170699ecc13b7513bda9eff6f12cc889eae5d16b792daa3f7b21be452"
dependencies = [
"actix-rt",
"anyhow",
"async-mutex",
"async-trait",
"chrono",
- "log",
"serde",
"serde_json",
"thiserror",
- "tokio",
+ "tracing",
+ "tracing-futures",
"uuid",
]
captcha = "0.0.8"
anyhow = "1.0.44"
thiserror = "1.0.29"
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
reqwest = { version = "0.11.4", features = ["json"] }
async-trait = "0.1.51"
anyhow = "1.0.44"
thiserror = "1.0.29"
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
reqwest = { version = "0.11.4", features = ["json"] }
webmention = "0.4.0"
async-trait = "0.1.51"
anyhow = "1.0.44"
thiserror = "1.0.29"
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
reqwest = { version = "0.11.4", features = ["json"] }
html2md = "0.2.13"
once_cell = "1.8.0"
person::tests::parse_lemmy_person,
tests::{file_to_json_object, init_context},
};
+ use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{
source::{
community::Community,
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_community_moderators() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await;
let community_id = community.id;
tests::{file_to_json_object, init_context},
};
use assert_json_diff::assert_json_include;
+ use lemmy_apub_lib::activity_queue::create_activity_queue;
use serial_test::serial;
async fn prepare_comment_test(
#[actix_rt::test]
#[serial]
pub(crate) async fn test_parse_lemmy_comment() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await;
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_comment() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/comment/38741").unwrap();
let data = prepare_comment_test(&url, &context).await;
pub(crate) mod tests {
use super::*;
use crate::objects::tests::{file_to_json_object, init_context};
+ use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::traits::Crud;
use serial_test::serial;
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_community() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await;
assert_eq!(community.title, "Ten Forward");
#[cfg(test)]
pub(crate) mod tests {
use actix::Actor;
+ use background_jobs::QueueHandle;
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
- use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::{
establish_unpooled_connection,
get_database_url_from_env,
// TODO: would be nice if we didnt have to use a full context for tests.
// or at least write a helper function so this code is shared with main.rs
- pub(crate) fn init_context() -> LemmyContext {
+ pub(crate) fn init_context(activity_queue: QueueHandle) -> LemmyContext {
// call this to run migrations
establish_unpooled_connection();
let settings = Settings::init().unwrap();
.user_agent(build_user_agent(&settings))
.build()
.unwrap();
- let activity_queue = create_activity_queue();
let secret = Secret {
id: 0,
jwt_secret: "".to_string(),
pub(crate) mod tests {
use super::*;
use crate::objects::tests::{file_to_json_object, init_context};
+ use lemmy_apub_lib::activity_queue::create_activity_queue;
use lemmy_db_schema::traits::Crud;
use serial_test::serial;
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_person() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let person = parse_lemmy_person(&context).await;
assert_eq!(person.display_name, Some("Jean-Luc Picard".to_string()));
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_person() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let json = file_to_json_object("assets/pleroma/objects/person.json");
let url = Url::parse("https://queer.hacktivis.me/users/lanodan").unwrap();
let mut request_counter = 0;
post::ApubPost,
tests::{file_to_json_object, init_context},
};
+ use lemmy_apub_lib::activity_queue::create_activity_queue;
use serial_test::serial;
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_post() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let community = parse_lemmy_community(&context).await;
let person = parse_lemmy_person(&context).await;
tests::{file_to_json_object, init_context},
};
use assert_json_diff::assert_json_include;
+ use lemmy_apub_lib::activity_queue::create_activity_queue;
use serial_test::serial;
async fn prepare_comment_test(url: &Url, context: &LemmyContext) -> (ApubPerson, ApubPerson) {
#[actix_rt::test]
#[serial]
async fn test_parse_lemmy_pm() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await;
let json: ChatMessage = file_to_json_object("assets/lemmy/objects/chat_message.json");
#[actix_rt::test]
#[serial]
async fn test_parse_pleroma_pm() {
- let context = init_context();
+ let manager = create_activity_queue();
+ let context = init_context(manager.queue_handle().clone());
let url = Url::parse("https://enterprise.lemmy.ml/private_message/1621").unwrap();
let data = prepare_comment_test(&url, &context).await;
let pleroma_url = Url::parse("https://queer.hacktivis.me/objects/2").unwrap();
actix-web = { version = "4.0.0-beta.9", default-features = false }
http-signature-normalization-actix = { version = "0.5.0-beta.10", default-features = false, features = ["server", "sha-2"] }
http-signature-normalization-reqwest = { version = "0.2.0", default-features = false, features = ["sha-2"] }
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
diesel = "1.4.8"
use crate::{signatures::sign_and_send, traits::ActorType};
use anyhow::{anyhow, Context, Error};
use background_jobs::{
- create_server,
memory_storage::Storage,
ActixJob,
Backoff,
+ Manager,
MaxRetries,
QueueHandle,
WorkerConfig,
if env::var("APUB_TESTING_SEND_SYNC").is_ok() {
do_send(message, client).await?;
} else {
- activity_queue.queue::<SendActivityTask>(message)?;
+ activity_queue.queue::<SendActivityTask>(message).await?;
}
}
Ok(())
}
-pub fn create_activity_queue() -> QueueHandle {
- // Start the application server. This guards access to to the jobs store
- let queue_handle = create_server(Storage::new());
- let arbiter = actix_web::rt::Arbiter::new();
-
+pub fn create_activity_queue() -> Manager {
// Configure and start our workers
- WorkerConfig::new(|| MyState {
+ WorkerConfig::new_managed(Storage::new(), |_| MyState {
client: Client::default(),
})
.register::<SendActivityTask>()
- .start_in_arbiter(&arbiter, queue_handle.clone());
-
- queue_handle
+ .start()
}
#[derive(Clone)]
actix = "0.12.0"
anyhow = "1.0.44"
diesel = "1.4.8"
-background-jobs = "0.9.1"
+background-jobs = "0.11.0"
tokio = "1.12.0"
strum = "0.21.0"
strum_macros = "0.21.1"
.user_agent(build_user_agent(&settings))
.build()?;
- let activity_queue = create_activity_queue();
+ let queue_manager = create_activity_queue();
+
+ let activity_queue = queue_manager.queue_handle().clone();
let chat_server = ChatServer::startup(
pool.clone(),
.run()
.await?;
+ drop(queue_manager);
+
Ok(())
}