From: Dessalines Date: Thu, 13 Apr 2023 10:53:55 +0000 (-0400) Subject: Making the chat server an actor. (#2793) X-Git-Url: http://these/git/%7B%60%24%7BwebArchiveUrl%7D/%22%7B%7D/%24%7B%60data:application/%22https:/hacktivis.me/%7BpictrsAvatarThumbnail%28this.props.site.site.icon%29%7D?a=commitdiff_plain;h=63f54a3103684fd375428331f730b9a104add3a9;p=lemmy.git Making the chat server an actor. (#2793) * Making the chat server an actor. - Fixes #2778 - #2787 * Forgot to add handlers folder. * Some cleanup. * Forgot to remove a comment. * Address PR comments. * Using ToString for enum operations. --- diff --git a/Cargo.lock b/Cargo.lock index 77424468..b98acdc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,6 +58,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f728064aca1c318585bf4bb04ffcfac9e75e508ab4e8b1bd9ba5dfe04e2cbed5" dependencies = [ "actix-rt", + "actix_derive", "bitflags", "bytes", "crossbeam-channel", @@ -324,16 +325,14 @@ dependencies = [ ] [[package]] -name = "actix-ws" -version = "0.2.5" +name = "actix_derive" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "535aec173810be3ca6f25dd5b4d431ae7125d62000aa3cbae1ec739921b02cf3" +checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7" dependencies = [ - "actix-codec", - "actix-http", - "actix-web", - "futures-core", - "tokio", + "proc-macro2 1.0.47", + "quote 1.0.21", + "syn 1.0.103", ] [[package]] @@ -2409,9 +2408,9 @@ dependencies = [ name = "lemmy_api_common" version = "0.17.1" dependencies = [ + "actix", "actix-rt", "actix-web", - "actix-ws", "anyhow", "chrono", "encoding", @@ -2461,6 +2460,7 @@ name = "lemmy_apub" version = "0.17.1" dependencies = [ "activitypub_federation", + "actix", "actix-rt", "actix-web", "anyhow", @@ -2589,10 +2589,10 @@ name = "lemmy_server" version = "0.17.1" dependencies = [ "activitypub_federation", + "actix", "actix-rt", "actix-web", "actix-web-actors", - "actix-ws", "clokwerk", "console-subscriber", "diesel", diff --git a/Cargo.toml b/Cargo.toml index 3a97f620..52e1a8be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ rosetta-i18n = "0.1.2" rand = "0.8.5" opentelemetry = { version = "0.17.0", features = ["rt-tokio"] } tracing-opentelemetry = { version = "0.17.2" } -actix-ws = "0.2.0" +actix = "0.13" [dependencies] lemmy_api = { workspace = true } @@ -133,7 +133,7 @@ doku = { workspace = true } reqwest-retry = { workspace = true } serde_json = { workspace = true } futures = { workspace = true } -actix-ws = { workspace = true } +actix = { workspace = true } tracing-opentelemetry = { workspace = true, optional = true } opentelemetry = { workspace = true, optional = true } actix-web-actors = { version = "4.1.0", default-features = false } diff --git a/crates/api/src/comment/like.rs b/crates/api/src/comment/like.rs index fba61ed1..3d216872 100644 --- a/crates/api/src/comment/like.rs +++ b/crates/api/src/comment/like.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ comment::{CommentResponse, CreateCommentLike}, context::LemmyContext, utils::{check_community_ban, check_downvotes_enabled, get_local_user_view_from_jwt}, - websocket::{send::send_comment_ws_message, UserOperation}, + websocket::UserOperation, }; use lemmy_db_schema::{ newtypes::LocalUserId, @@ -78,15 +78,15 @@ impl Perform for CreateCommentLike { .map_err(|e| LemmyError::from_error_message(e, "couldnt_like_comment"))?; } - send_comment_ws_message( - data.comment_id, - UserOperation::CreateCommentLike, - websocket_id, - None, - Some(local_user_view.person.id), - recipient_ids, - context, - ) - .await + context + .send_comment_ws_message( + &UserOperation::CreateCommentLike, + data.comment_id, + websocket_id, + None, + Some(local_user_view.person.id), + recipient_ids, + ) + .await } } diff --git a/crates/api/src/comment_report/create.rs b/crates/api/src/comment_report/create.rs index 9badc31a..fe2471ed 100644 --- a/crates/api/src/comment_report/create.rs +++ b/crates/api/src/comment_report/create.rs @@ -69,15 +69,12 @@ impl Perform for CreateCommentReport { comment_report_view, }; - context - .chat_server() - .send_mod_room_message( - UserOperation::CreateCommentReport, - &res, - comment_view.community.id, - websocket_id, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::CreateCommentReport, + &res, + comment_view.community.id, + websocket_id, + )?; Ok(res) } diff --git a/crates/api/src/comment_report/resolve.rs b/crates/api/src/comment_report/resolve.rs index de4f418d..f5fd6b12 100644 --- a/crates/api/src/comment_report/resolve.rs +++ b/crates/api/src/comment_report/resolve.rs @@ -49,15 +49,12 @@ impl Perform for ResolveCommentReport { comment_report_view, }; - context - .chat_server() - .send_mod_room_message( - UserOperation::ResolveCommentReport, - &res, - report.community.id, - websocket_id, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::ResolveCommentReport, + &res, + report.community.id, + websocket_id, + )?; Ok(res) } diff --git a/crates/api/src/community/add_mod.rs b/crates/api/src/community/add_mod.rs index 28f56cff..7052f7cd 100644 --- a/crates/api/src/community/add_mod.rs +++ b/crates/api/src/community/add_mod.rs @@ -70,15 +70,13 @@ impl Perform for AddModToCommunity { let moderators = CommunityModeratorView::for_community(context.pool(), community_id).await?; let res = AddModToCommunityResponse { moderators }; - context - .chat_server() - .send_community_room_message( - &UserOperation::AddModToCommunity, - &res, - community_id, - websocket_id, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::AddModToCommunity, + &res, + community_id, + websocket_id, + )?; + Ok(res) } } diff --git a/crates/api/src/community/ban.rs b/crates/api/src/community/ban.rs index b2d4260e..d3341340 100644 --- a/crates/api/src/community/ban.rs +++ b/crates/api/src/community/ban.rs @@ -4,7 +4,11 @@ use lemmy_api_common::{ community::{BanFromCommunity, BanFromCommunityResponse}, context::LemmyContext, utils::{get_local_user_view_from_jwt, is_mod_or_admin, remove_user_data_in_community}, - websocket::UserOperation, + websocket::{ + handlers::messages::SendCommunityRoomMessage, + serialize_websocket_message, + UserOperation, + }, }; use lemmy_db_schema::{ source::{ @@ -95,15 +99,13 @@ impl Perform for BanFromCommunity { banned: data.ban, }; - context - .chat_server() - .send_community_room_message( - &UserOperation::BanFromCommunity, - &res, - community_id, - websocket_id, - ) - .await?; + // A custom ban message + let message = serialize_websocket_message(&UserOperation::BanFromCommunity, &res)?; + context.chat_server().do_send(SendCommunityRoomMessage { + community_id, + message, + websocket_id, + }); Ok(res) } diff --git a/crates/api/src/community/hide.rs b/crates/api/src/community/hide.rs index 94ce7d74..8e181157 100644 --- a/crates/api/src/community/hide.rs +++ b/crates/api/src/community/hide.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ community::{CommunityResponse, HideCommunity}, context::LemmyContext, utils::{get_local_user_view_from_jwt, is_admin}, - websocket::{send::send_community_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -50,7 +50,13 @@ impl Perform for HideCommunity { ModHideCommunity::create(context.pool(), &mod_hide_community_form).await?; - let op = UserOperationCrud::EditCommunity; - send_community_ws_message(data.community_id, op, websocket_id, None, context).await + context + .send_community_ws_message( + &UserOperationCrud::EditCommunity, + data.community_id, + websocket_id, + None, + ) + .await } } diff --git a/crates/api/src/local_user/add_admin.rs b/crates/api/src/local_user/add_admin.rs index ca30b939..ad8b67e1 100644 --- a/crates/api/src/local_user/add_admin.rs +++ b/crates/api/src/local_user/add_admin.rs @@ -56,10 +56,7 @@ impl Perform for AddAdmin { let res = AddAdminResponse { admins }; - context - .chat_server() - .send_all_message(UserOperation::AddAdmin, &res, websocket_id) - .await?; + context.send_all_ws_message(&UserOperation::AddAdmin, &res, websocket_id)?; Ok(res) } diff --git a/crates/api/src/local_user/ban_person.rs b/crates/api/src/local_user/ban_person.rs index d2c9789e..55069742 100644 --- a/crates/api/src/local_user/ban_person.rs +++ b/crates/api/src/local_user/ban_person.rs @@ -79,10 +79,7 @@ impl Perform for BanPerson { banned: data.ban, }; - context - .chat_server() - .send_all_message(UserOperation::BanPerson, &res, websocket_id) - .await?; + context.send_all_ws_message(&UserOperation::BanPerson, &res, websocket_id)?; Ok(res) } diff --git a/crates/api/src/local_user/get_captcha.rs b/crates/api/src/local_user/get_captcha.rs index 3671f79c..00ec45a1 100644 --- a/crates/api/src/local_user/get_captcha.rs +++ b/crates/api/src/local_user/get_captcha.rs @@ -5,7 +5,7 @@ use chrono::Duration; use lemmy_api_common::{ context::LemmyContext, person::{CaptchaResponse, GetCaptcha, GetCaptchaResponse}, - websocket::structs::CaptchaItem, + websocket::{handlers::captcha::AddCaptcha, structs::CaptchaItem}, }; use lemmy_db_schema::{source::local_site::LocalSite, utils::naive_now}; use lemmy_utils::{error::LemmyError, ConnectionId}; @@ -47,7 +47,9 @@ impl Perform for GetCaptcha { }; // Stores the captcha item on the queue - context.chat_server().add_captcha(captcha_item)?; + context.chat_server().do_send(AddCaptcha { + captcha: captcha_item, + }); Ok(GetCaptchaResponse { ok: Some(CaptchaResponse { png, wav, uuid }), diff --git a/crates/api/src/post/feature.rs b/crates/api/src/post/feature.rs index 16ba7bfb..e1dd1e3d 100644 --- a/crates/api/src/post/feature.rs +++ b/crates/api/src/post/feature.rs @@ -10,7 +10,7 @@ use lemmy_api_common::{ is_admin, is_mod_or_admin, }, - websocket::{send::send_post_ws_message, UserOperation}, + websocket::UserOperation, }; use lemmy_db_schema::{ source::{ @@ -82,13 +82,13 @@ impl Perform for FeaturePost { ModFeaturePost::create(context.pool(), &form).await?; - send_post_ws_message( - data.post_id, - UserOperation::FeaturePost, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await + context + .send_post_ws_message( + &UserOperation::FeaturePost, + data.post_id, + websocket_id, + Some(local_user_view.person.id), + ) + .await } } diff --git a/crates/api/src/post/like.rs b/crates/api/src/post/like.rs index 59135ff2..53e6c13c 100644 --- a/crates/api/src/post/like.rs +++ b/crates/api/src/post/like.rs @@ -10,7 +10,7 @@ use lemmy_api_common::{ get_local_user_view_from_jwt, mark_post_as_read, }, - websocket::{send::send_post_ws_message, UserOperation}, + websocket::UserOperation, }; use lemmy_db_schema::{ source::{ @@ -69,13 +69,13 @@ impl Perform for CreatePostLike { // Mark the post as read mark_post_as_read(person_id, post_id, context.pool()).await?; - send_post_ws_message( - data.post_id, - UserOperation::CreatePostLike, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await + context + .send_post_ws_message( + &UserOperation::CreatePostLike, + data.post_id, + websocket_id, + Some(local_user_view.person.id), + ) + .await } } diff --git a/crates/api/src/post/lock.rs b/crates/api/src/post/lock.rs index c5fa2a0f..cd7db908 100644 --- a/crates/api/src/post/lock.rs +++ b/crates/api/src/post/lock.rs @@ -9,7 +9,7 @@ use lemmy_api_common::{ get_local_user_view_from_jwt, is_mod_or_admin, }, - websocket::{send::send_post_ws_message, UserOperation}, + websocket::UserOperation, }; use lemmy_db_schema::{ source::{ @@ -71,13 +71,13 @@ impl Perform for LockPost { }; ModLockPost::create(context.pool(), &form).await?; - send_post_ws_message( - data.post_id, - UserOperation::LockPost, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await + context + .send_post_ws_message( + &UserOperation::LockPost, + data.post_id, + websocket_id, + Some(local_user_view.person.id), + ) + .await } } diff --git a/crates/api/src/post_report/create.rs b/crates/api/src/post_report/create.rs index 9ada7b1d..423f8b81 100644 --- a/crates/api/src/post_report/create.rs +++ b/crates/api/src/post_report/create.rs @@ -69,15 +69,12 @@ impl Perform for CreatePostReport { let res = PostReportResponse { post_report_view }; - context - .chat_server() - .send_mod_room_message( - UserOperation::CreatePostReport, - &res, - post_view.community.id, - websocket_id, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::CreatePostReport, + &res, + post_view.community.id, + websocket_id, + )?; Ok(res) } diff --git a/crates/api/src/post_report/resolve.rs b/crates/api/src/post_report/resolve.rs index 615b7f82..0426262d 100644 --- a/crates/api/src/post_report/resolve.rs +++ b/crates/api/src/post_report/resolve.rs @@ -46,15 +46,12 @@ impl Perform for ResolvePostReport { let res = PostReportResponse { post_report_view }; - context - .chat_server() - .send_mod_room_message( - UserOperation::ResolvePostReport, - &res, - report.community.id, - websocket_id, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::ResolvePostReport, + &res, + report.community.id, + websocket_id, + )?; Ok(res) } diff --git a/crates/api/src/private_message/mark_read.rs b/crates/api/src/private_message/mark_read.rs index 9c3c3d8a..ba5cf8ea 100644 --- a/crates/api/src/private_message/mark_read.rs +++ b/crates/api/src/private_message/mark_read.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ context::LemmyContext, private_message::{MarkPrivateMessageAsRead, PrivateMessageResponse}, utils::get_local_user_view_from_jwt, - websocket::{send::send_pm_ws_message, UserOperation}, + websocket::UserOperation, }; use lemmy_db_schema::{ source::private_message::{PrivateMessage, PrivateMessageUpdateForm}, @@ -45,7 +45,12 @@ impl Perform for MarkPrivateMessageAsRead { .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?; // No need to send an apub update - let op = UserOperation::MarkPrivateMessageAsRead; - send_pm_ws_message(data.private_message_id, op, websocket_id, context).await + context + .send_pm_ws_message( + &UserOperation::MarkPrivateMessageAsRead, + data.private_message_id, + websocket_id, + ) + .await } } diff --git a/crates/api/src/private_message_report/create.rs b/crates/api/src/private_message_report/create.rs index 5ca2d6a6..2a132b6f 100644 --- a/crates/api/src/private_message_report/create.rs +++ b/crates/api/src/private_message_report/create.rs @@ -68,15 +68,12 @@ impl Perform for CreatePrivateMessageReport { private_message_report_view, }; - context - .chat_server() - .send_mod_room_message( - UserOperation::CreatePrivateMessageReport, - &res, - CommunityId(0), - websocket_id, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::CreatePrivateMessageReport, + &res, + CommunityId(0), + websocket_id, + )?; // TODO: consider federating this diff --git a/crates/api/src/private_message_report/resolve.rs b/crates/api/src/private_message_report/resolve.rs index a48a458d..9fe014c0 100644 --- a/crates/api/src/private_message_report/resolve.rs +++ b/crates/api/src/private_message_report/resolve.rs @@ -48,15 +48,12 @@ impl Perform for ResolvePrivateMessageReport { private_message_report_view, }; - context - .chat_server() - .send_mod_room_message( - UserOperation::ResolvePrivateMessageReport, - &res, - CommunityId(0), - websocket_id, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::ResolvePrivateMessageReport, + &res, + CommunityId(0), + websocket_id, + )?; Ok(res) } diff --git a/crates/api/src/websocket.rs b/crates/api/src/websocket.rs index 041cb775..522991ae 100644 --- a/crates/api/src/websocket.rs +++ b/crates/api/src/websocket.rs @@ -3,15 +3,18 @@ use actix_web::web::Data; use lemmy_api_common::{ context::LemmyContext, utils::get_local_user_view_from_jwt, - websocket::structs::{ - CommunityJoin, - CommunityJoinResponse, - ModJoin, - ModJoinResponse, - PostJoin, - PostJoinResponse, - UserJoin, - UserJoinResponse, + websocket::{ + handlers::join_rooms::{JoinCommunityRoom, JoinModRoom, JoinPostRoom, JoinUserRoom}, + structs::{ + CommunityJoin, + CommunityJoinResponse, + ModJoin, + ModJoinResponse, + PostJoin, + PostJoinResponse, + UserJoin, + UserJoinResponse, + }, }, }; use lemmy_utils::{error::LemmyError, ConnectionId}; @@ -30,10 +33,11 @@ impl Perform for UserJoin { let local_user_view = get_local_user_view_from_jwt(&data.auth, context.pool(), context.secret()).await?; - if let Some(ws_id) = websocket_id { - context - .chat_server() - .join_user_room(local_user_view.local_user.id, ws_id)?; + if let Some(id) = websocket_id { + context.chat_server().do_send(JoinUserRoom { + user_id: local_user_view.local_user.id, + id, + }); } Ok(UserJoinResponse { joined: true }) @@ -52,10 +56,11 @@ impl Perform for CommunityJoin { ) -> Result { let data: &CommunityJoin = self; - if let Some(ws_id) = websocket_id { - context - .chat_server() - .join_community_room(data.community_id, ws_id)?; + if let Some(id) = websocket_id { + context.chat_server().do_send(JoinCommunityRoom { + community_id: data.community_id, + id, + }); } Ok(CommunityJoinResponse { joined: true }) @@ -74,10 +79,11 @@ impl Perform for ModJoin { ) -> Result { let data: &ModJoin = self; - if let Some(ws_id) = websocket_id { - context - .chat_server() - .join_mod_room(data.community_id, ws_id)?; + if let Some(id) = websocket_id { + context.chat_server().do_send(JoinModRoom { + community_id: data.community_id, + id, + }); } Ok(ModJoinResponse { joined: true }) @@ -96,8 +102,11 @@ impl Perform for PostJoin { ) -> Result { let data: &PostJoin = self; - if let Some(ws_id) = websocket_id { - context.chat_server().join_post_room(data.post_id, ws_id)?; + if let Some(id) = websocket_id { + context.chat_server().do_send(JoinPostRoom { + post_id: data.post_id, + id, + }); } Ok(PostJoinResponse { joined: true }) diff --git a/crates/api_common/Cargo.toml b/crates/api_common/Cargo.toml index 15e8d370..59352ef5 100644 --- a/crates/api_common/Cargo.toml +++ b/crates/api_common/Cargo.toml @@ -40,7 +40,7 @@ serde_json = { workspace = true } anyhow = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } -actix-ws = { workspace = true } +actix = { workspace = true } futures = { workspace = true } uuid = { workspace = true } actix-rt = { workspace = true } diff --git a/crates/api_common/src/context.rs b/crates/api_common/src/context.rs index 4c91c2c5..1301bf36 100644 --- a/crates/api_common/src/context.rs +++ b/crates/api_common/src/context.rs @@ -1,4 +1,5 @@ use crate::websocket::chat_server::ChatServer; +use actix::Addr; use lemmy_db_schema::{source::secret::Secret, utils::DbPool}; use lemmy_utils::{ rate_limit::RateLimitCell, @@ -10,7 +11,7 @@ use std::sync::Arc; #[derive(Clone)] pub struct LemmyContext { pool: DbPool, - chat_server: Arc, + chat_server: Addr, client: Arc, secret: Arc, rate_limit_cell: RateLimitCell, @@ -19,7 +20,7 @@ pub struct LemmyContext { impl LemmyContext { pub fn create( pool: DbPool, - chat_server: Arc, + chat_server: Addr, client: ClientWithMiddleware, secret: Secret, rate_limit_cell: RateLimitCell, @@ -35,7 +36,7 @@ impl LemmyContext { pub fn pool(&self) -> &DbPool { &self.pool } - pub fn chat_server(&self) -> &Arc { + pub fn chat_server(&self) -> &Addr { &self.chat_server } pub fn client(&self) -> &ClientWithMiddleware { diff --git a/crates/api_common/src/websocket/chat_server.rs b/crates/api_common/src/websocket/chat_server.rs index 3c92f238..fa06987c 100644 --- a/crates/api_common/src/websocket/chat_server.rs +++ b/crates/api_common/src/websocket/chat_server.rs @@ -1,30 +1,16 @@ -use crate::{ - comment::CommentResponse, - post::PostResponse, - websocket::{serialize_websocket_message, structs::CaptchaItem, OperationType}, +use crate::websocket::{ + handlers::{SessionInfo, WsMessage}, + structs::CaptchaItem, }; -use actix_ws::Session; -use anyhow::Context as acontext; -use futures::future::join_all; +use actix::{Actor, Context}; use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId}; -use lemmy_utils::{error::LemmyError, location_info, ConnectionId}; +use lemmy_utils::ConnectionId; use rand::{rngs::StdRng, SeedableRng}; -use serde::Serialize; -use std::{ - collections::{HashMap, HashSet}, - sync::{Mutex, MutexGuard}, -}; -use tracing::log::warn; +use std::collections::{HashMap, HashSet}; -/// `ChatServer` manages chat rooms and responsible for coordinating chat -/// session. pub struct ChatServer { - inner: Mutex, -} - -pub struct ChatServerInner { /// A map from generated random ID to session addr - pub sessions: HashMap, + pub sessions: HashMap, /// A map from post_id to set of connectionIDs pub post_rooms: HashMap>, @@ -48,347 +34,44 @@ pub struct ChatServerInner { /// And manages available rooms. Peers send messages to other peers in same /// room through `ChatServer`. impl ChatServer { - pub fn startup() -> ChatServer { + pub fn new() -> ChatServer { ChatServer { - inner: Mutex::new(ChatServerInner { - sessions: Default::default(), - post_rooms: Default::default(), - community_rooms: Default::default(), - mod_rooms: Default::default(), - user_rooms: Default::default(), - rng: StdRng::from_entropy(), - captchas: vec![], - }), - } - } - - pub fn join_community_room( - &self, - community_id: CommunityId, - id: ConnectionId, - ) -> Result<(), LemmyError> { - let mut inner = self.inner()?; - // remove session from all rooms - for sessions in inner.community_rooms.values_mut() { - sessions.remove(&id); - } - - // Also leave all post rooms - // This avoids double messages - for sessions in inner.post_rooms.values_mut() { - sessions.remove(&id); - } - - // If the room doesn't exist yet - if inner.community_rooms.get_mut(&community_id).is_none() { - inner.community_rooms.insert(community_id, HashSet::new()); - } - - inner - .community_rooms - .get_mut(&community_id) - .context(location_info!())? - .insert(id); - Ok(()) - } - - pub fn join_mod_room( - &self, - community_id: CommunityId, - id: ConnectionId, - ) -> Result<(), LemmyError> { - let mut inner = self.inner()?; - // remove session from all rooms - for sessions in inner.mod_rooms.values_mut() { - sessions.remove(&id); - } - - // If the room doesn't exist yet - if inner.mod_rooms.get_mut(&community_id).is_none() { - inner.mod_rooms.insert(community_id, HashSet::new()); - } - - inner - .mod_rooms - .get_mut(&community_id) - .context(location_info!())? - .insert(id); - Ok(()) - } - - pub fn join_post_room(&self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> { - let mut inner = self.inner()?; - // remove session from all rooms - for sessions in inner.post_rooms.values_mut() { - sessions.remove(&id); - } - - // Also leave all communities - // This avoids double messages - // TODO found a bug, whereby community messages like - // delete and remove aren't sent, because - // you left the community room - for sessions in inner.community_rooms.values_mut() { - sessions.remove(&id); - } - - // If the room doesn't exist yet - if inner.post_rooms.get_mut(&post_id).is_none() { - inner.post_rooms.insert(post_id, HashSet::new()); - } - - inner - .post_rooms - .get_mut(&post_id) - .context(location_info!())? - .insert(id); - - Ok(()) - } - - pub fn join_user_room(&self, user_id: LocalUserId, id: ConnectionId) -> Result<(), LemmyError> { - let mut inner = self.inner()?; - // remove session from all rooms - for sessions in inner.user_rooms.values_mut() { - sessions.remove(&id); - } - - // If the room doesn't exist yet - if inner.user_rooms.get_mut(&user_id).is_none() { - inner.user_rooms.insert(user_id, HashSet::new()); - } - - inner - .user_rooms - .get_mut(&user_id) - .context(location_info!())? - .insert(id); - Ok(()) - } - - async fn send_post_room_message( - &self, - op: &OP, - response: &Response, - post_id: PostId, - websocket_id: Option, - ) -> Result<(), LemmyError> - where - OP: OperationType + ToString, - Response: Serialize, - { - let msg = serialize_websocket_message(op, response)?; - let room = self.inner()?.post_rooms.get(&post_id).cloned(); - self.send_message_in_room(&msg, room, websocket_id).await?; - Ok(()) - } - - /// Send message to all users viewing the given community. - pub async fn send_community_room_message( - &self, - op: &OP, - response: &Response, - community_id: CommunityId, - websocket_id: Option, - ) -> Result<(), LemmyError> - where - OP: OperationType + ToString, - Response: Serialize, - { - let msg = serialize_websocket_message(op, response)?; - let room = self.inner()?.community_rooms.get(&community_id).cloned(); - self.send_message_in_room(&msg, room, websocket_id).await?; - Ok(()) - } - - /// Send message to mods of a given community. Set community_id = 0 to send to site admins. - pub async fn send_mod_room_message( - &self, - op: OP, - response: &Response, - community_id: CommunityId, - websocket_id: Option, - ) -> Result<(), LemmyError> - where - OP: OperationType + ToString, - Response: Serialize, - { - let msg = serialize_websocket_message(&op, response)?; - let room = self.inner()?.mod_rooms.get(&community_id).cloned(); - self.send_message_in_room(&msg, room, websocket_id).await?; - Ok(()) - } - - pub async fn send_all_message( - &self, - op: OP, - response: &Response, - exclude_connection: Option, - ) -> Result<(), LemmyError> - where - OP: OperationType + ToString, - Response: Serialize, - { - let msg = &serialize_websocket_message(&op, response)?; - let sessions = self.inner()?.sessions.clone(); - // Note, this will ignore any errors, such as closed connections - join_all( - sessions - .into_iter() - .filter(|(id, _)| Some(id) != exclude_connection.as_ref()) - .map(|(_, mut s): (_, Session)| async move { s.text(msg).await }), - ) - .await; - Ok(()) - } - - pub async fn send_user_room_message( - &self, - op: &OP, - response: &Response, - recipient_id: LocalUserId, - websocket_id: Option, - ) -> Result<(), LemmyError> - where - OP: OperationType + ToString, - Response: Serialize, - { - let msg = serialize_websocket_message(op, response)?; - let room = self.inner()?.user_rooms.get(&recipient_id).cloned(); - self.send_message_in_room(&msg, room, websocket_id).await?; - Ok(()) - } - - pub async fn send_comment( - &self, - user_operation: &OP, - comment: &CommentResponse, - websocket_id: Option, - ) -> Result<(), LemmyError> - where - OP: OperationType + ToString, - { - let mut comment_reply_sent = comment.clone(); - - // Strip out my specific user info - comment_reply_sent.comment_view.my_vote = None; - - // Send it to the post room - let mut comment_post_sent = comment_reply_sent.clone(); - // Remove the recipients here to separate mentions / user messages from post or community comments - comment_post_sent.recipient_ids = Vec::new(); - self - .send_post_room_message( - user_operation, - &comment_post_sent, - comment_post_sent.comment_view.post.id, - websocket_id, - ) - .await?; - - // Send it to the community too - self - .send_community_room_message( - user_operation, - &comment_post_sent, - CommunityId(0), - websocket_id, - ) - .await?; - self - .send_community_room_message( - user_operation, - &comment_post_sent, - comment.comment_view.community.id, - websocket_id, - ) - .await?; - - // Send it to the recipient(s) including the mentioned users - for recipient_id in &comment_reply_sent.recipient_ids { - self - .send_user_room_message( - user_operation, - &comment_reply_sent, - *recipient_id, - websocket_id, - ) - .await?; + sessions: Default::default(), + post_rooms: Default::default(), + community_rooms: Default::default(), + mod_rooms: Default::default(), + user_rooms: Default::default(), + rng: StdRng::from_entropy(), + captchas: vec![], } - - Ok(()) - } - - pub async fn send_post( - &self, - user_operation: &OP, - post_res: &PostResponse, - websocket_id: Option, - ) -> Result<(), LemmyError> - where - OP: OperationType + ToString, - { - let community_id = post_res.post_view.community.id; - - // Don't send my data with it - let mut post_sent = post_res.clone(); - post_sent.post_view.my_vote = None; - - // Send it to /c/all and that community - self - .send_community_room_message(user_operation, &post_sent, CommunityId(0), websocket_id) - .await?; - self - .send_community_room_message(user_operation, &post_sent, community_id, websocket_id) - .await?; - - // Send it to the post room - self - .send_post_room_message( - user_operation, - &post_sent, - post_res.post_view.post.id, - websocket_id, - ) - .await?; - - Ok(()) } - /// Send websocket message in all sessions which joined a specific room. - /// - /// `message` - The json message body to send - /// `room` - Connection IDs which should receive the message - /// `exclude_connection` - Dont send to user who initiated the api call, as that - /// would result in duplicate notification - async fn send_message_in_room( + pub fn send_message( &self, + connections: &HashSet, message: &str, - room: Option>, exclude_connection: Option, - ) -> Result<(), LemmyError> { - let mut session = self.inner()?.sessions.clone(); - if let Some(room) = room { - // Note, this will ignore any errors, such as closed connections - join_all( - room - .into_iter() - .filter(|c| Some(c) != exclude_connection.as_ref()) - .filter_map(|c| session.remove(&c)) - .map(|mut s: Session| async move { s.text(message).await }), - ) - .await; + ) { + for id in connections + .iter() + .filter(|c| Some(*c) != exclude_connection.as_ref()) + { + if let Some(session) = self.sessions.get(id) { + session.addr.do_send(WsMessage(message.to_owned())); + } } - Ok(()) } +} - pub(in crate::websocket) fn inner(&self) -> Result, LemmyError> { - match self.inner.lock() { - Ok(g) => Ok(g), - Err(e) => { - warn!("Failed to lock chatserver mutex: {}", e); - Err(LemmyError::from_message("Failed to lock chatserver mutex")) - } - } +impl Default for ChatServer { + fn default() -> Self { + Self::new() } } + +/// Make actor from `ChatServer` +impl Actor for ChatServer { + /// We are going to use simple Context, we just need ability to communicate + /// with other actors. + type Context = Context; +} diff --git a/crates/api_common/src/websocket/handlers.rs b/crates/api_common/src/websocket/handlers.rs deleted file mode 100644 index afdbfd59..00000000 --- a/crates/api_common/src/websocket/handlers.rs +++ /dev/null @@ -1,83 +0,0 @@ -use crate::websocket::{chat_server::ChatServer, structs::CaptchaItem}; -use actix_ws::Session; -use lemmy_db_schema::{ - newtypes::{CommunityId, PostId}, - utils::naive_now, -}; -use lemmy_utils::{error::LemmyError, ConnectionId}; -use rand::Rng; - -impl ChatServer { - /// Handler for Connect message. - /// - /// Register new session and assign unique id to this session - pub fn handle_connect(&self, session: Session) -> Result { - let mut inner = self.inner()?; - // register session with random id - let id = inner.rng.gen::(); - - inner.sessions.insert(id, session); - Ok(id) - } - - /// Handler for Disconnect message. - pub fn handle_disconnect(&self, connection_id: &ConnectionId) -> Result<(), LemmyError> { - let mut inner = self.inner()?; - // Remove connections from sessions and all 3 scopes - if inner.sessions.remove(connection_id).is_some() { - for sessions in inner.user_rooms.values_mut() { - sessions.remove(connection_id); - } - - for sessions in inner.post_rooms.values_mut() { - sessions.remove(connection_id); - } - - for sessions in inner.community_rooms.values_mut() { - sessions.remove(connection_id); - } - } - Ok(()) - } - - pub fn get_users_online(&self) -> Result { - Ok(self.inner()?.sessions.len()) - } - - pub fn get_post_users_online(&self, post_id: PostId) -> Result { - if let Some(users) = self.inner()?.post_rooms.get(&post_id) { - Ok(users.len()) - } else { - Ok(0) - } - } - - pub fn get_community_users_online(&self, community_id: CommunityId) -> Result { - if let Some(users) = self.inner()?.community_rooms.get(&community_id) { - Ok(users.len()) - } else { - Ok(0) - } - } - - pub fn add_captcha(&self, captcha: CaptchaItem) -> Result<(), LemmyError> { - self.inner()?.captchas.push(captcha); - Ok(()) - } - - pub fn check_captcha(&self, uuid: String, answer: String) -> Result { - let mut inner = self.inner()?; - // Remove all the ones that are past the expire time - inner.captchas.retain(|x| x.expires.gt(&naive_now())); - - let check = inner - .captchas - .iter() - .any(|r| r.uuid == uuid && r.answer.to_lowercase() == answer.to_lowercase()); - - // Remove this uuid so it can't be re-checked (Checks only work once) - inner.captchas.retain(|x| x.uuid != uuid); - - Ok(check) - } -} diff --git a/crates/api_common/src/websocket/handlers/captcha.rs b/crates/api_common/src/websocket/handlers/captcha.rs new file mode 100644 index 00000000..e3ff6093 --- /dev/null +++ b/crates/api_common/src/websocket/handlers/captcha.rs @@ -0,0 +1,45 @@ +use crate::websocket::{chat_server::ChatServer, structs::CaptchaItem}; +use actix::{Context, Handler, Message}; +use lemmy_db_schema::utils::naive_now; + +/// Adding a Captcha +#[derive(Message)] +#[rtype(result = "()")] +pub struct AddCaptcha { + pub captcha: CaptchaItem, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: AddCaptcha, _: &mut Context) -> Self::Result { + self.captchas.push(msg.captcha); + } +} + +/// Checking a Captcha +#[derive(Message)] +#[rtype(bool)] +pub struct CheckCaptcha { + pub uuid: String, + pub answer: String, +} + +impl Handler for ChatServer { + type Result = bool; + + fn handle(&mut self, msg: CheckCaptcha, _: &mut Context) -> Self::Result { + // Remove all the ones that are past the expire time + self.captchas.retain(|x| x.expires.gt(&naive_now())); + + let check = self + .captchas + .iter() + .any(|r| r.uuid == msg.uuid && r.answer.to_lowercase() == msg.answer.to_lowercase()); + + // Remove this uuid so it can't be re-checked (Checks only work once) + self.captchas.retain(|x| x.uuid != msg.uuid); + + check + } +} diff --git a/crates/api_common/src/websocket/handlers/connect.rs b/crates/api_common/src/websocket/handlers/connect.rs new file mode 100644 index 00000000..f3e7e3b6 --- /dev/null +++ b/crates/api_common/src/websocket/handlers/connect.rs @@ -0,0 +1,62 @@ +use crate::websocket::{ + chat_server::ChatServer, + handlers::{SessionInfo, WsMessage}, +}; +use actix::{Context, Handler, Message, Recipient}; +use lemmy_utils::ConnectionId; +use rand::Rng; + +/// New chat session is created +#[derive(Message)] +#[rtype(ConnectionId)] +pub struct Connect { + pub addr: Recipient, +} + +/// Handler for Connect message. +/// +/// Register new session and assign unique id to this session +impl Handler for ChatServer { + type Result = ConnectionId; + + fn handle(&mut self, msg: Connect, _: &mut Context) -> Self::Result { + // register session with random id + let id = self.rng.gen::(); + let session = SessionInfo { addr: msg.addr }; + self.sessions.insert(id, session); + + // send id back + id + } +} + +/// Session is disconnected +#[derive(Message)] +#[rtype(result = "()")] +pub struct Disconnect { + pub id: ConnectionId, +} + +/// Handler for Disconnect message. +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Disconnect, _: &mut Context) -> Self::Result { + // remove address + if self.sessions.remove(&msg.id).is_some() { + // remove session from all rooms + for sessions in self.user_rooms.values_mut() { + sessions.remove(&msg.id); + } + for sessions in self.post_rooms.values_mut() { + sessions.remove(&msg.id); + } + for sessions in self.community_rooms.values_mut() { + sessions.remove(&msg.id); + } + for sessions in self.mod_rooms.values_mut() { + sessions.remove(&msg.id); + } + } + } +} diff --git a/crates/api_common/src/websocket/handlers/join_rooms.rs b/crates/api_common/src/websocket/handlers/join_rooms.rs new file mode 100644 index 00000000..10235e3f --- /dev/null +++ b/crates/api_common/src/websocket/handlers/join_rooms.rs @@ -0,0 +1,120 @@ +use crate::websocket::chat_server::ChatServer; +use actix::{Context, Handler, Message}; +use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId}; +use lemmy_utils::ConnectionId; +use std::collections::HashSet; + +/// Joining a Post room +#[derive(Message)] +#[rtype(result = "()")] +pub struct JoinPostRoom { + pub post_id: PostId, + pub id: ConnectionId, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: JoinPostRoom, _: &mut Context) -> Self::Result { + // remove session from all rooms + for sessions in self.post_rooms.values_mut() { + sessions.remove(&msg.id); + } + + // Also leave all communities + // This avoids double messages + // TODO found a bug, whereby community messages like + // delete and remove aren't sent, because + // you left the community room + for sessions in self.community_rooms.values_mut() { + sessions.remove(&msg.id); + } + + self + .post_rooms + .entry(msg.post_id) + .or_insert_with(HashSet::new) + .insert(msg.id); + } +} + +/// Joining a Community Room +#[derive(Message)] +#[rtype(result = "()")] +pub struct JoinCommunityRoom { + pub community_id: CommunityId, + pub id: ConnectionId, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context) -> Self::Result { + // remove session from all rooms + for sessions in self.community_rooms.values_mut() { + sessions.remove(&msg.id); + } + + // Also leave all post rooms + // This avoids double messages + for sessions in self.post_rooms.values_mut() { + sessions.remove(&msg.id); + } + + self + .community_rooms + .entry(msg.community_id) + .or_insert_with(HashSet::new) + .insert(msg.id); + } +} + +/// Joining a Mod room +#[derive(Message)] +#[rtype(result = "()")] +pub struct JoinModRoom { + pub community_id: CommunityId, + pub id: ConnectionId, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: JoinModRoom, _: &mut Context) -> Self::Result { + // remove session from all rooms + for sessions in self.mod_rooms.values_mut() { + sessions.remove(&msg.id); + } + + self + .mod_rooms + .entry(msg.community_id) + .or_insert_with(HashSet::new) + .insert(msg.id); + } +} + +/// Joining a User room +#[derive(Message)] +#[rtype(result = "()")] +pub struct JoinUserRoom { + pub user_id: LocalUserId, + pub id: ConnectionId, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: JoinUserRoom, _: &mut Context) -> Self::Result { + // remove session from all rooms + for sessions in self.user_rooms.values_mut() { + sessions.remove(&msg.id); + } + + self + .user_rooms + .entry(msg.user_id) + .or_insert_with(HashSet::new) + .insert(msg.id); + } +} diff --git a/crates/api_common/src/websocket/handlers/messages.rs b/crates/api_common/src/websocket/handlers/messages.rs new file mode 100644 index 00000000..3b49302a --- /dev/null +++ b/crates/api_common/src/websocket/handlers/messages.rs @@ -0,0 +1,130 @@ +use crate::websocket::chat_server::ChatServer; +use actix::{Context, Handler, Message}; +use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId}; +use lemmy_utils::ConnectionId; +use std::collections::HashSet; + +/// Sending a post room message +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendPostRoomMessage { + pub post_id: PostId, + pub message: String, + pub websocket_id: Option, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: SendPostRoomMessage, _: &mut Context) -> Self::Result { + let room_connections = self.post_rooms.get(&msg.post_id); + if let Some(connections) = room_connections { + self.send_message(connections, &msg.message, msg.websocket_id); + } + } +} + +/// Sending a community room message +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendCommunityRoomMessage { + pub community_id: CommunityId, + pub message: String, + pub websocket_id: Option, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: SendCommunityRoomMessage, _: &mut Context) -> Self::Result { + let room_connections = self.community_rooms.get(&msg.community_id); + if let Some(connections) = room_connections { + self.send_message(connections, &msg.message, msg.websocket_id); + } + } +} + +/// Sending a mod room message +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendModRoomMessage { + pub community_id: CommunityId, + pub message: String, + pub websocket_id: Option, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: SendModRoomMessage, _: &mut Context) -> Self::Result { + let room_connections = self.community_rooms.get(&msg.community_id); + if let Some(connections) = room_connections { + self.send_message(connections, &msg.message, msg.websocket_id); + } + } +} + +/// Sending a user room message +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendUserRoomMessage { + pub recipient_id: LocalUserId, + pub message: String, + pub websocket_id: Option, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: SendUserRoomMessage, _: &mut Context) -> Self::Result { + let room_connections = self.user_rooms.get(&msg.recipient_id); + if let Some(connections) = room_connections { + self.send_message(connections, &msg.message, msg.websocket_id); + } + } +} + +/// Sending a message to every session +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendAllMessage { + pub message: String, + pub websocket_id: Option, +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: SendAllMessage, _: &mut Context) -> Self::Result { + let connections: HashSet = self.sessions.keys().cloned().collect(); + self.send_message(&connections, &msg.message, msg.websocket_id); + } +} + +///// Send websocket message in all sessions which joined a specific room. +///// +///// `message` - The json message body to send +///// `room` - Connection IDs which should receive the message +///// `exclude_connection` - Dont send to user who initiated the api call, as that +///// would result in duplicate notification +//async fn send_message_in_room( +// &self, +// message: &str, +// room: Option>, +// exclude_connection: Option, +//) -> Result<(), LemmyError> { +// let mut session = self.inner()?.sessions.clone(); +// if let Some(room) = room { +// // Note, this will ignore any errors, such as closed connections +// join_all( +// room +// .into_iter() +// .filter(|c| Some(c) != exclude_connection.as_ref()) +// .filter_map(|c| session.remove(&c)) +// .map(|mut s: Session| async move { s.text(message).await }), +// ) +// .await; +// } +// Ok(()) +//} +//} diff --git a/crates/api_common/src/websocket/handlers/mod.rs b/crates/api_common/src/websocket/handlers/mod.rs new file mode 100644 index 00000000..d989a44e --- /dev/null +++ b/crates/api_common/src/websocket/handlers/mod.rs @@ -0,0 +1,18 @@ +use actix::{Message, Recipient}; + +pub mod captcha; +pub mod connect; +pub mod join_rooms; +pub mod messages; +pub mod online_users; + +/// A string message sent to a websocket session +#[derive(Message)] +#[rtype(result = "()")] +pub struct WsMessage(pub String); + +// TODO move this? +pub struct SessionInfo { + pub addr: Recipient, + // pub ip: IpAddr +} diff --git a/crates/api_common/src/websocket/handlers/online_users.rs b/crates/api_common/src/websocket/handlers/online_users.rs new file mode 100644 index 00000000..2833b68f --- /dev/null +++ b/crates/api_common/src/websocket/handlers/online_users.rs @@ -0,0 +1,55 @@ +use crate::websocket::chat_server::ChatServer; +use actix::{Context, Handler, Message}; +use lemmy_db_schema::newtypes::{CommunityId, PostId}; + +/// Getting the number of online connections +#[derive(Message)] +#[rtype(usize)] +pub struct GetUsersOnline; + +/// Handler for Disconnect message. +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context) -> Self::Result { + self.sessions.len() + } +} + +/// Getting the number of post users online +#[derive(Message)] +#[rtype(usize)] +pub struct GetPostUsersOnline { + pub post_id: PostId, +} + +/// Handler for Disconnect message. +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context) -> Self::Result { + self + .post_rooms + .get(&msg.post_id) + .map_or(1, std::collections::HashSet::len) + } +} + +/// Getting the number of post users online +#[derive(Message)] +#[rtype(usize)] +pub struct GetCommunityUsersOnline { + pub community_id: CommunityId, +} + +/// Handler for Disconnect message. +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context) -> Self::Result { + self + .community_rooms + .get(&msg.community_id) + .map_or(1, std::collections::HashSet::len) + } +} diff --git a/crates/api_common/src/websocket/mod.rs b/crates/api_common/src/websocket/mod.rs index 21c53385..2726b7d4 100644 --- a/crates/api_common/src/websocket/mod.rs +++ b/crates/api_common/src/websocket/mod.rs @@ -1,3 +1,4 @@ +use actix::{Message, Recipient}; use lemmy_utils::error::LemmyError; use serde::Serialize; @@ -6,13 +7,22 @@ pub mod handlers; pub mod send; pub mod structs; +/// A string message sent to a websocket session +#[derive(Message)] +#[rtype(result = "()")] +pub struct WsMessage(pub String); + +pub struct SessionInfo { + pub addr: Recipient, +} + #[derive(Serialize)] struct WebsocketResponse { op: String, data: T, } -pub fn serialize_websocket_message( +pub fn serialize_websocket_message( op: &OP, data: &Response, ) -> Result @@ -133,11 +143,3 @@ pub enum UserOperationApub { Search, ResolveObject, } - -pub trait OperationType {} - -impl OperationType for UserOperationCrud {} - -impl OperationType for UserOperation {} - -impl OperationType for UserOperationApub {} diff --git a/crates/api_common/src/websocket/send.rs b/crates/api_common/src/websocket/send.rs index 0e8a78a2..e983b85c 100644 --- a/crates/api_common/src/websocket/send.rs +++ b/crates/api_common/src/websocket/send.rs @@ -1,3 +1,13 @@ +use super::{ + handlers::messages::{ + SendAllMessage, + SendCommunityRoomMessage, + SendModRoomMessage, + SendPostRoomMessage, + SendUserRoomMessage, + }, + serialize_websocket_message, +}; use crate::{ comment::CommentResponse, community::CommunityResponse, @@ -5,7 +15,6 @@ use crate::{ post::PostResponse, private_message::PrivateMessageResponse, utils::{check_person_block, get_interface_language, send_email_to_user}, - websocket::OperationType, }; use lemmy_db_schema::{ newtypes::{CommentId, CommunityId, LocalUserId, PersonId, PostId, PrivateMessageId}, @@ -23,253 +32,374 @@ use lemmy_db_schema::{ use lemmy_db_views::structs::{CommentView, LocalUserView, PostView, PrivateMessageView}; use lemmy_db_views_actor::structs::CommunityView; use lemmy_utils::{error::LemmyError, utils::mention::MentionData, ConnectionId}; +use serde::Serialize; + +impl LemmyContext { + #[tracing::instrument(skip_all)] + pub async fn send_post_ws_message( + &self, + op: &OP, + post_id: PostId, + websocket_id: Option, + person_id: Option, + ) -> Result + where + OP: ToString, + { + let post_view = PostView::read(self.pool(), post_id, person_id, Some(true)).await?; + + let res = PostResponse { post_view }; + + // Send it to the post room + // Don't send my data with it + let mut post_sent = res.clone(); + post_sent.post_view.my_vote = None; + let message = serialize_websocket_message(op, &post_sent)?; + + self.chat_server().do_send(SendPostRoomMessage { + post_id, + message: message.clone(), + websocket_id, + }); + + // Send it to /c/all and that community + self.chat_server().do_send(SendCommunityRoomMessage { + community_id: CommunityId(0), + message: message.clone(), + websocket_id, + }); + + self.chat_server().do_send(SendCommunityRoomMessage { + community_id: post_sent.post_view.community.id, + message, + websocket_id, + }); + + Ok(res) + } -#[tracing::instrument(skip_all)] -pub async fn send_post_ws_message( - post_id: PostId, - op: OP, - websocket_id: Option, - person_id: Option, - context: &LemmyContext, -) -> Result { - let post_view = PostView::read(context.pool(), post_id, person_id, Some(true)).await?; - - let res = PostResponse { post_view }; - - context - .chat_server() - .send_post(&op, &res, websocket_id) - .await?; - - Ok(res) -} - -// TODO: in many call sites in apub crate, we are setting an empty vec for recipient_ids, -// we should get the actual recipient actors from somewhere -#[tracing::instrument(skip_all)] -pub async fn send_comment_ws_message_simple( - comment_id: CommentId, - op: OP, - context: &LemmyContext, -) -> Result { - send_comment_ws_message(comment_id, op, None, None, None, vec![], context).await -} - -#[tracing::instrument(skip_all)] -pub async fn send_comment_ws_message( - comment_id: CommentId, - op: OP, - websocket_id: Option, - form_id: Option, - person_id: Option, - recipient_ids: Vec, - context: &LemmyContext, -) -> Result { - let view = CommentView::read(context.pool(), comment_id, person_id).await?; - - let mut res = CommentResponse { - comment_view: view, - recipient_ids, - // The sent out form id should be null - form_id: None, - }; - - context - .chat_server() - .send_comment(&op, &res, websocket_id) - .await?; - - // The recipient_ids should be empty for returns - res.recipient_ids = Vec::new(); - res.form_id = form_id; - - Ok(res) -} + // TODO: in many call sites in apub crate, we are setting an empty vec for recipient_ids, + // we should get the actual recipient actors from somewhere + #[tracing::instrument(skip_all)] + pub async fn send_comment_ws_message_simple( + &self, + op: &OP, + comment_id: CommentId, + ) -> Result + where + OP: ToString, + { + self + .send_comment_ws_message(op, comment_id, None, None, None, vec![]) + .await + } -#[tracing::instrument(skip_all)] -pub async fn send_community_ws_message( - community_id: CommunityId, - op: OP, - websocket_id: Option, - person_id: Option, - context: &LemmyContext, -) -> Result { - let community_view = - CommunityView::read(context.pool(), community_id, person_id, Some(true)).await?; - let discussion_languages = CommunityLanguage::read(context.pool(), community_id).await?; - - let mut res = CommunityResponse { - community_view, - discussion_languages, - }; - - // Strip out the person id and subscribed when sending to others - res.community_view.subscribed = SubscribedType::NotSubscribed; - - context - .chat_server() - .send_community_room_message(&op, &res, res.community_view.community.id, websocket_id) - .await?; - - Ok(res) -} + #[tracing::instrument(skip_all)] + pub async fn send_comment_ws_message( + &self, + op: &OP, + comment_id: CommentId, + websocket_id: Option, + form_id: Option, + person_id: Option, + recipient_ids: Vec, + ) -> Result + where + OP: ToString, + { + let view = CommentView::read(self.pool(), comment_id, person_id).await?; + + let mut res = CommentResponse { + comment_view: view, + recipient_ids, + form_id, + }; + + // Strip out my specific user info + let mut sent_recipient_comment = res.clone(); + sent_recipient_comment.form_id = None; + sent_recipient_comment.comment_view.my_vote = None; + let recipient_message = serialize_websocket_message(op, &sent_recipient_comment)?; + + // Send it to the recipient(s) including the mentioned users + for recipient_id in &sent_recipient_comment.recipient_ids { + self.chat_server().do_send(SendUserRoomMessage { + recipient_id: *recipient_id, + message: recipient_message.clone(), + websocket_id, + }); + } -#[tracing::instrument(skip_all)] -pub async fn send_pm_ws_message( - private_message_id: PrivateMessageId, - op: OP, - websocket_id: Option, - context: &LemmyContext, -) -> Result { - let view = PrivateMessageView::read(context.pool(), private_message_id).await?; - - let res = PrivateMessageResponse { - private_message_view: view, - }; - - // Send notifications to the local recipient, if one exists - if res.private_message_view.recipient.local { - let recipient_id = res.private_message_view.recipient.id; - let local_recipient = LocalUserView::read_person(context.pool(), recipient_id).await?; - - context - .chat_server() - .send_user_room_message(&op, &res, local_recipient.local_user.id, websocket_id) - .await?; + // Remove the recipients here to separate mentions / user messages from post or community comments + let mut sent_post_comment = sent_recipient_comment; + sent_post_comment.recipient_ids = Vec::new(); + let post_message = serialize_websocket_message(op, &sent_post_comment)?; + + // Send it to the post room + self.chat_server().do_send(SendPostRoomMessage { + post_id: sent_post_comment.comment_view.post.id, + message: post_message.clone(), + websocket_id, + }); + + // Send it to the community too + self.chat_server().do_send(SendCommunityRoomMessage { + community_id: sent_post_comment.comment_view.community.id, + message: post_message, + websocket_id, + }); + // TODO should I send it to all? Seems excessive + // self + // .send_community_room_message( + // user_operation, + // &comment_post_sent, + // CommunityId(0), + // websocket_id, + // ) + // .await?; + + // No need to return recipients + res.recipient_ids = Vec::new(); + + Ok(res) } - Ok(res) -} - -#[tracing::instrument(skip_all)] -pub async fn send_local_notifs( - mentions: Vec, - comment: &Comment, - person: &Person, - post: &Post, - do_send_email: bool, - context: &LemmyContext, -) -> Result, LemmyError> { - let mut recipient_ids = Vec::new(); - let inbox_link = format!("{}/inbox", context.settings().get_protocol_and_hostname()); - - // Send the local mentions - for mention in mentions - .iter() - .filter(|m| m.is_local(&context.settings().hostname) && m.name.ne(&person.name)) - .collect::>() + #[tracing::instrument(skip_all)] + pub async fn send_community_ws_message( + &self, + op: &OP, + community_id: CommunityId, + websocket_id: Option, + person_id: Option, + ) -> Result + where + OP: ToString, { - let mention_name = mention.name.clone(); - let user_view = LocalUserView::read_from_name(context.pool(), &mention_name).await; - if let Ok(mention_user_view) = user_view { - // TODO - // At some point, make it so you can't tag the parent creator either - // This can cause two notifications, one for reply and the other for mention - recipient_ids.push(mention_user_view.local_user.id); - - let user_mention_form = PersonMentionInsertForm { - recipient_id: mention_user_view.person.id, - comment_id: comment.id, - read: None, - }; - - // Allow this to fail softly, since comment edits might re-update or replace it - // Let the uniqueness handle this fail - PersonMention::create(context.pool(), &user_mention_form) - .await - .ok(); - - // Send an email to those local users that have notifications on - if do_send_email { - let lang = get_interface_language(&mention_user_view); - send_email_to_user( - &mention_user_view, - &lang.notification_mentioned_by_subject(&person.name), - &lang.notification_mentioned_by_body(&comment.content, &inbox_link, &person.name), - context.settings(), - ) - } - } + let community_view = + CommunityView::read(self.pool(), community_id, person_id, Some(true)).await?; + let discussion_languages = CommunityLanguage::read(self.pool(), community_id).await?; + + let mut res = CommunityResponse { + community_view, + discussion_languages, + }; + + // Strip out the person id and subscribed when sending to others + res.community_view.subscribed = SubscribedType::NotSubscribed; + let message = serialize_websocket_message(op, &res)?; + + self.chat_server().do_send(SendCommunityRoomMessage { + community_id: res.community_view.community.id, + message, + websocket_id, + }); + + Ok(res) } - // Send comment_reply to the parent commenter / poster - if let Some(parent_comment_id) = comment.parent_comment_id() { - let parent_comment = Comment::read(context.pool(), parent_comment_id).await?; + #[tracing::instrument(skip_all)] + pub async fn send_pm_ws_message( + &self, + op: &OP, + private_message_id: PrivateMessageId, + websocket_id: Option, + ) -> Result + where + OP: ToString, + { + let view = PrivateMessageView::read(self.pool(), private_message_id).await?; - // Get the parent commenter local_user - let parent_creator_id = parent_comment.creator_id; + let res = PrivateMessageResponse { + private_message_view: view, + }; - // Only add to recipients if that person isn't blocked - let creator_blocked = check_person_block(person.id, parent_creator_id, context.pool()) - .await - .is_err(); + // Send notifications to the local recipient, if one exists + if res.private_message_view.recipient.local { + let recipient_id = res.private_message_view.recipient.id; + let local_recipient = LocalUserView::read_person(self.pool(), recipient_id).await?; - // Don't send a notif to yourself - if parent_comment.creator_id != person.id && !creator_blocked { - let user_view = LocalUserView::read_person(context.pool(), parent_creator_id).await; - if let Ok(parent_user_view) = user_view { - recipient_ids.push(parent_user_view.local_user.id); + let message = serialize_websocket_message(op, &res)?; - let comment_reply_form = CommentReplyInsertForm { - recipient_id: parent_user_view.person.id, + self.chat_server().do_send(SendUserRoomMessage { + recipient_id: local_recipient.local_user.id, + message, + websocket_id, + }); + } + + Ok(res) + } + + #[tracing::instrument(skip_all)] + pub async fn send_local_notifs( + &self, + mentions: Vec, + comment: &Comment, + person: &Person, + post: &Post, + do_send_email: bool, + ) -> Result, LemmyError> { + let mut recipient_ids = Vec::new(); + let inbox_link = format!("{}/inbox", self.settings().get_protocol_and_hostname()); + + // Send the local mentions + for mention in mentions + .iter() + .filter(|m| m.is_local(&self.settings().hostname) && m.name.ne(&person.name)) + .collect::>() + { + let mention_name = mention.name.clone(); + let user_view = LocalUserView::read_from_name(self.pool(), &mention_name).await; + if let Ok(mention_user_view) = user_view { + // TODO + // At some point, make it so you can't tag the parent creator either + // This can cause two notifications, one for reply and the other for mention + recipient_ids.push(mention_user_view.local_user.id); + + let user_mention_form = PersonMentionInsertForm { + recipient_id: mention_user_view.person.id, comment_id: comment.id, read: None, }; // Allow this to fail softly, since comment edits might re-update or replace it // Let the uniqueness handle this fail - CommentReply::create(context.pool(), &comment_reply_form) + PersonMention::create(self.pool(), &user_mention_form) .await .ok(); + // Send an email to those local users that have notifications on if do_send_email { - let lang = get_interface_language(&parent_user_view); + let lang = get_interface_language(&mention_user_view); send_email_to_user( - &parent_user_view, - &lang.notification_comment_reply_subject(&person.name), - &lang.notification_comment_reply_body(&comment.content, &inbox_link, &person.name), - context.settings(), + &mention_user_view, + &lang.notification_mentioned_by_subject(&person.name), + &lang.notification_mentioned_by_body(&comment.content, &inbox_link, &person.name), + self.settings(), ) } } } - } else { - // If there's no parent, its the post creator - // Only add to recipients if that person isn't blocked - let creator_blocked = check_person_block(person.id, post.creator_id, context.pool()) - .await - .is_err(); - if post.creator_id != person.id && !creator_blocked { - let creator_id = post.creator_id; - let parent_user = LocalUserView::read_person(context.pool(), creator_id).await; - if let Ok(parent_user_view) = parent_user { - recipient_ids.push(parent_user_view.local_user.id); + // Send comment_reply to the parent commenter / poster + if let Some(parent_comment_id) = comment.parent_comment_id() { + let parent_comment = Comment::read(self.pool(), parent_comment_id).await?; - let comment_reply_form = CommentReplyInsertForm { - recipient_id: parent_user_view.person.id, - comment_id: comment.id, - read: None, - }; + // Get the parent commenter local_user + let parent_creator_id = parent_comment.creator_id; - // Allow this to fail softly, since comment edits might re-update or replace it - // Let the uniqueness handle this fail - CommentReply::create(context.pool(), &comment_reply_form) - .await - .ok(); - - if do_send_email { - let lang = get_interface_language(&parent_user_view); - send_email_to_user( - &parent_user_view, - &lang.notification_post_reply_subject(&person.name), - &lang.notification_post_reply_body(&comment.content, &inbox_link, &person.name), - context.settings(), - ) + // Only add to recipients if that person isn't blocked + let creator_blocked = check_person_block(person.id, parent_creator_id, self.pool()) + .await + .is_err(); + + // Don't send a notif to yourself + if parent_comment.creator_id != person.id && !creator_blocked { + let user_view = LocalUserView::read_person(self.pool(), parent_creator_id).await; + if let Ok(parent_user_view) = user_view { + recipient_ids.push(parent_user_view.local_user.id); + + let comment_reply_form = CommentReplyInsertForm { + recipient_id: parent_user_view.person.id, + comment_id: comment.id, + read: None, + }; + + // Allow this to fail softly, since comment edits might re-update or replace it + // Let the uniqueness handle this fail + CommentReply::create(self.pool(), &comment_reply_form) + .await + .ok(); + + if do_send_email { + let lang = get_interface_language(&parent_user_view); + send_email_to_user( + &parent_user_view, + &lang.notification_comment_reply_subject(&person.name), + &lang.notification_comment_reply_body(&comment.content, &inbox_link, &person.name), + self.settings(), + ) + } + } + } + } else { + // If there's no parent, its the post creator + // Only add to recipients if that person isn't blocked + let creator_blocked = check_person_block(person.id, post.creator_id, self.pool()) + .await + .is_err(); + + if post.creator_id != person.id && !creator_blocked { + let creator_id = post.creator_id; + let parent_user = LocalUserView::read_person(self.pool(), creator_id).await; + if let Ok(parent_user_view) = parent_user { + recipient_ids.push(parent_user_view.local_user.id); + + let comment_reply_form = CommentReplyInsertForm { + recipient_id: parent_user_view.person.id, + comment_id: comment.id, + read: None, + }; + + // Allow this to fail softly, since comment edits might re-update or replace it + // Let the uniqueness handle this fail + CommentReply::create(self.pool(), &comment_reply_form) + .await + .ok(); + + if do_send_email { + let lang = get_interface_language(&parent_user_view); + send_email_to_user( + &parent_user_view, + &lang.notification_post_reply_subject(&person.name), + &lang.notification_post_reply_body(&comment.content, &inbox_link, &person.name), + self.settings(), + ) + } } } } + + Ok(recipient_ids) + } + + #[tracing::instrument(skip_all)] + pub fn send_all_ws_message( + &self, + op: &OP, + data: Data, + websocket_id: Option, + ) -> Result<(), LemmyError> + where + Data: Serialize, + OP: ToString, + { + let message = serialize_websocket_message(op, &data)?; + self.chat_server().do_send(SendAllMessage { + message, + websocket_id, + }); + Ok(()) } - Ok(recipient_ids) + #[tracing::instrument(skip_all)] + pub fn send_mod_ws_message( + &self, + op: &OP, + data: Data, + community_id: CommunityId, + websocket_id: Option, + ) -> Result<(), LemmyError> + where + Data: Serialize, + OP: ToString, + { + let message = serialize_websocket_message(op, &data)?; + self.chat_server().do_send(SendModRoomMessage { + community_id, + message, + websocket_id, + }); + Ok(()) + } } diff --git a/crates/api_crud/src/comment/create.rs b/crates/api_crud/src/comment/create.rs index d9f4f07c..70997378 100644 --- a/crates/api_crud/src/comment/create.rs +++ b/crates/api_crud/src/comment/create.rs @@ -13,10 +13,7 @@ use lemmy_api_common::{ local_site_to_slur_regex, EndpointType, }, - websocket::{ - send::{send_comment_ws_message, send_local_notifs}, - UserOperationCrud, - }, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -131,15 +128,15 @@ impl PerformCrud for CreateComment { // Scan the comment for user mentions, add those rows let post_id = post.id; let mentions = scrape_text_for_mentions(&content_slurs_removed); - let recipient_ids = send_local_notifs( - mentions, - &updated_comment, - &local_user_view.person, - &post, - true, - context, - ) - .await?; + let recipient_ids = context + .send_local_notifs( + mentions, + &updated_comment, + &local_user_view.person, + &post, + true, + ) + .await?; // You like your own comment by default let like_form = CommentLikeForm { @@ -182,15 +179,15 @@ impl PerformCrud for CreateComment { } } - send_comment_ws_message( - inserted_comment.id, - UserOperationCrud::CreateComment, - websocket_id, - data.form_id.clone(), - Some(local_user_view.person.id), - recipient_ids, - context, - ) - .await + context + .send_comment_ws_message( + &UserOperationCrud::CreateComment, + inserted_comment.id, + websocket_id, + data.form_id.clone(), + Some(local_user_view.person.id), + recipient_ids, + ) + .await } } diff --git a/crates/api_crud/src/comment/delete.rs b/crates/api_crud/src/comment/delete.rs index 211d0477..a96b4cc7 100644 --- a/crates/api_crud/src/comment/delete.rs +++ b/crates/api_crud/src/comment/delete.rs @@ -4,10 +4,7 @@ use lemmy_api_common::{ comment::{CommentResponse, DeleteComment}, context::LemmyContext, utils::{check_community_ban, get_local_user_view_from_jwt}, - websocket::{ - send::{send_comment_ws_message, send_local_notifs}, - UserOperationCrud, - }, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -65,26 +62,26 @@ impl PerformCrud for DeleteComment { let post_id = updated_comment.post_id; let post = Post::read(context.pool(), post_id).await?; - let recipient_ids = send_local_notifs( - vec![], - &updated_comment, - &local_user_view.person, - &post, - false, - context, - ) - .await?; + let recipient_ids = context + .send_local_notifs( + vec![], + &updated_comment, + &local_user_view.person, + &post, + false, + ) + .await?; - let res = send_comment_ws_message( - data.comment_id, - UserOperationCrud::DeleteComment, - websocket_id, - None, // TODO a comment delete might clear forms? - Some(local_user_view.person.id), - recipient_ids, - context, - ) - .await?; + let res = context + .send_comment_ws_message( + &UserOperationCrud::DeleteComment, + data.comment_id, + websocket_id, + None, + Some(local_user_view.person.id), + recipient_ids, + ) + .await?; Ok(res) } diff --git a/crates/api_crud/src/comment/remove.rs b/crates/api_crud/src/comment/remove.rs index 1032f54c..add01aa3 100644 --- a/crates/api_crud/src/comment/remove.rs +++ b/crates/api_crud/src/comment/remove.rs @@ -4,10 +4,7 @@ use lemmy_api_common::{ comment::{CommentResponse, RemoveComment}, context::LemmyContext, utils::{check_community_ban, get_local_user_view_from_jwt, is_mod_or_admin}, - websocket::{ - send::{send_comment_ws_message, send_local_notifs}, - UserOperationCrud, - }, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -73,26 +70,26 @@ impl PerformCrud for RemoveComment { let post_id = updated_comment.post_id; let post = Post::read(context.pool(), post_id).await?; - let recipient_ids = send_local_notifs( - vec![], - &updated_comment, - &local_user_view.person.clone(), - &post, - false, - context, - ) - .await?; + let recipient_ids = context + .send_local_notifs( + vec![], + &updated_comment, + &local_user_view.person.clone(), + &post, + false, + ) + .await?; - let res = send_comment_ws_message( - data.comment_id, - UserOperationCrud::RemoveComment, - websocket_id, - None, // TODO maybe this might clear other forms - Some(local_user_view.person.id), - recipient_ids, - context, - ) - .await?; + let res = context + .send_comment_ws_message( + &UserOperationCrud::RemoveComment, + data.comment_id, + websocket_id, + None, // TODO maybe this might clear other forms + Some(local_user_view.person.id), + recipient_ids, + ) + .await?; Ok(res) } diff --git a/crates/api_crud/src/comment/update.rs b/crates/api_crud/src/comment/update.rs index 06536330..1d5f1c5a 100644 --- a/crates/api_crud/src/comment/update.rs +++ b/crates/api_crud/src/comment/update.rs @@ -4,10 +4,7 @@ use lemmy_api_common::{ comment::{CommentResponse, EditComment}, context::LemmyContext, utils::{check_community_ban, get_local_user_view_from_jwt, local_site_to_slur_regex}, - websocket::{ - send::{send_comment_ws_message, send_local_notifs}, - UserOperationCrud, - }, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -81,25 +78,25 @@ impl PerformCrud for EditComment { // Do the mentions / recipients let updated_comment_content = updated_comment.content.clone(); let mentions = scrape_text_for_mentions(&updated_comment_content); - let recipient_ids = send_local_notifs( - mentions, - &updated_comment, - &local_user_view.person, - &orig_comment.post, - false, - context, - ) - .await?; + let recipient_ids = context + .send_local_notifs( + mentions, + &updated_comment, + &local_user_view.person, + &orig_comment.post, + false, + ) + .await?; - send_comment_ws_message( - data.comment_id, - UserOperationCrud::EditComment, - websocket_id, - data.form_id.clone(), - None, - recipient_ids, - context, - ) - .await + context + .send_comment_ws_message( + &UserOperationCrud::EditComment, + data.comment_id, + websocket_id, + data.form_id.clone(), + None, + recipient_ids, + ) + .await } } diff --git a/crates/api_crud/src/community/delete.rs b/crates/api_crud/src/community/delete.rs index 4720d110..7ce685d8 100644 --- a/crates/api_crud/src/community/delete.rs +++ b/crates/api_crud/src/community/delete.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ community::{CommunityResponse, DeleteCommunity}, context::LemmyContext, utils::{get_local_user_view_from_jwt, is_top_mod}, - websocket::{send::send_community_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::community::{Community, CommunityUpdateForm}, @@ -48,14 +48,14 @@ impl PerformCrud for DeleteCommunity { .await .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_community"))?; - let res = send_community_ws_message( - data.community_id, - UserOperationCrud::DeleteCommunity, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await?; + let res = context + .send_community_ws_message( + &UserOperationCrud::DeleteCommunity, + data.community_id, + websocket_id, + Some(local_user_view.person.id), + ) + .await?; Ok(res) } diff --git a/crates/api_crud/src/community/remove.rs b/crates/api_crud/src/community/remove.rs index 80e23bf7..f7914762 100644 --- a/crates/api_crud/src/community/remove.rs +++ b/crates/api_crud/src/community/remove.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ community::{CommunityResponse, RemoveCommunity}, context::LemmyContext, utils::{get_local_user_view_from_jwt, is_admin}, - websocket::{send::send_community_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -56,14 +56,14 @@ impl PerformCrud for RemoveCommunity { }; ModRemoveCommunity::create(context.pool(), &form).await?; - let res = send_community_ws_message( - data.community_id, - UserOperationCrud::RemoveCommunity, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await?; + let res = context + .send_community_ws_message( + &UserOperationCrud::RemoveCommunity, + data.community_id, + websocket_id, + Some(local_user_view.person.id), + ) + .await?; Ok(res) } diff --git a/crates/api_crud/src/community/update.rs b/crates/api_crud/src/community/update.rs index e3d1acfa..9628615b 100644 --- a/crates/api_crud/src/community/update.rs +++ b/crates/api_crud/src/community/update.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ community::{CommunityResponse, EditCommunity}, context::LemmyContext, utils::{get_local_user_view_from_jwt, local_site_to_slur_regex}, - websocket::{send::send_community_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ newtypes::PersonId, @@ -78,7 +78,13 @@ impl PerformCrud for EditCommunity { .await .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_community"))?; - let op = UserOperationCrud::EditCommunity; - send_community_ws_message(data.community_id, op, websocket_id, None, context).await + context + .send_community_ws_message( + &UserOperationCrud::EditCommunity, + data.community_id, + websocket_id, + None, + ) + .await } } diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index 65559250..2ceade21 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -14,7 +14,7 @@ use lemmy_api_common::{ mark_post_as_read, EndpointType, }, - websocket::{send::send_post_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ impls::actor_language::default_post_language, @@ -173,13 +173,13 @@ impl PerformCrud for CreatePost { } } - send_post_ws_message( - inserted_post.id, - UserOperationCrud::CreatePost, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await + context + .send_post_ws_message( + &UserOperationCrud::CreatePost, + inserted_post.id, + websocket_id, + Some(local_user_view.person.id), + ) + .await } } diff --git a/crates/api_crud/src/post/delete.rs b/crates/api_crud/src/post/delete.rs index 57e5adb1..fae5b009 100644 --- a/crates/api_crud/src/post/delete.rs +++ b/crates/api_crud/src/post/delete.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ context::LemmyContext, post::{DeletePost, PostResponse}, utils::{check_community_ban, check_community_deleted_or_removed, get_local_user_view_from_jwt}, - websocket::{send::send_post_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::post::{Post, PostUpdateForm}, @@ -57,14 +57,14 @@ impl PerformCrud for DeletePost { ) .await?; - let res = send_post_ws_message( - data.post_id, - UserOperationCrud::DeletePost, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await?; + let res = context + .send_post_ws_message( + &UserOperationCrud::DeletePost, + data.post_id, + websocket_id, + Some(local_user_view.person.id), + ) + .await?; Ok(res) } diff --git a/crates/api_crud/src/post/read.rs b/crates/api_crud/src/post/read.rs index a6612cbb..0dfb67c4 100644 --- a/crates/api_crud/src/post/read.rs +++ b/crates/api_crud/src/post/read.rs @@ -9,6 +9,7 @@ use lemmy_api_common::{ is_mod_or_admin_opt, mark_post_as_read, }, + websocket::handlers::online_users::GetPostUsersOnline, }; use lemmy_db_schema::{ aggregates::structs::{PersonPostAggregates, PersonPostAggregatesForm}, @@ -95,7 +96,10 @@ impl PerformCrud for GetPost { let moderators = CommunityModeratorView::for_community(context.pool(), community_id).await?; - let online = context.chat_server().get_post_users_online(post_id)?; + let online = context + .chat_server() + .send(GetPostUsersOnline { post_id }) + .await?; // Return the jwt Ok(GetPostResponse { diff --git a/crates/api_crud/src/post/remove.rs b/crates/api_crud/src/post/remove.rs index 66af2ca9..b53a468e 100644 --- a/crates/api_crud/src/post/remove.rs +++ b/crates/api_crud/src/post/remove.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ context::LemmyContext, post::{PostResponse, RemovePost}, utils::{check_community_ban, get_local_user_view_from_jwt, is_mod_or_admin}, - websocket::{send::send_post_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -66,14 +66,14 @@ impl PerformCrud for RemovePost { }; ModRemovePost::create(context.pool(), &form).await?; - let res = send_post_ws_message( - data.post_id, - UserOperationCrud::RemovePost, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await?; + let res = context + .send_post_ws_message( + &UserOperationCrud::RemovePost, + data.post_id, + websocket_id, + Some(local_user_view.person.id), + ) + .await?; Ok(res) } diff --git a/crates/api_crud/src/post/update.rs b/crates/api_crud/src/post/update.rs index 4e43770d..27cfabe5 100644 --- a/crates/api_crud/src/post/update.rs +++ b/crates/api_crud/src/post/update.rs @@ -5,7 +5,7 @@ use lemmy_api_common::{ post::{EditPost, PostResponse}, request::fetch_site_data, utils::{check_community_ban, get_local_user_view_from_jwt, local_site_to_slur_regex}, - websocket::{send::send_post_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -113,13 +113,13 @@ impl PerformCrud for EditPost { return Err(LemmyError::from_error_message(e, err_type)); } - send_post_ws_message( - data.post_id, - UserOperationCrud::EditPost, - websocket_id, - Some(local_user_view.person.id), - context, - ) - .await + context + .send_post_ws_message( + &UserOperationCrud::EditPost, + data.post_id, + websocket_id, + Some(local_user_view.person.id), + ) + .await } } diff --git a/crates/api_crud/src/private_message/create.rs b/crates/api_crud/src/private_message/create.rs index 91556c9d..28c243f6 100644 --- a/crates/api_crud/src/private_message/create.rs +++ b/crates/api_crud/src/private_message/create.rs @@ -12,7 +12,7 @@ use lemmy_api_common::{ send_email_to_user, EndpointType, }, - websocket::{send::send_pm_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -80,13 +80,13 @@ impl PerformCrud for CreatePrivateMessage { .await .map_err(|e| LemmyError::from_error_message(e, "couldnt_create_private_message"))?; - let res = send_pm_ws_message( - inserted_private_message.id, - UserOperationCrud::CreatePrivateMessage, - websocket_id, - context, - ) - .await?; + let res = context + .send_pm_ws_message( + &UserOperationCrud::CreatePrivateMessage, + inserted_private_message.id, + websocket_id, + ) + .await?; // Send email to the local recipient, if one exists if res.private_message_view.recipient.local { diff --git a/crates/api_crud/src/private_message/delete.rs b/crates/api_crud/src/private_message/delete.rs index 4f58065a..e6a643a7 100644 --- a/crates/api_crud/src/private_message/delete.rs +++ b/crates/api_crud/src/private_message/delete.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ context::LemmyContext, private_message::{DeletePrivateMessage, PrivateMessageResponse}, utils::get_local_user_view_from_jwt, - websocket::{send::send_pm_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::private_message::{PrivateMessage, PrivateMessageUpdateForm}, @@ -46,7 +46,12 @@ impl PerformCrud for DeletePrivateMessage { .await .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?; - let op = UserOperationCrud::DeletePrivateMessage; - send_pm_ws_message(data.private_message_id, op, websocket_id, context).await + context + .send_pm_ws_message( + &UserOperationCrud::DeletePrivateMessage, + data.private_message_id, + websocket_id, + ) + .await } } diff --git a/crates/api_crud/src/private_message/update.rs b/crates/api_crud/src/private_message/update.rs index 19da0cf5..06bd349f 100644 --- a/crates/api_crud/src/private_message/update.rs +++ b/crates/api_crud/src/private_message/update.rs @@ -4,7 +4,7 @@ use lemmy_api_common::{ context::LemmyContext, private_message::{EditPrivateMessage, PrivateMessageResponse}, utils::{get_local_user_view_from_jwt, local_site_to_slur_regex}, - websocket::{send::send_pm_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -52,7 +52,12 @@ impl PerformCrud for EditPrivateMessage { .await .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?; - let op = UserOperationCrud::EditPrivateMessage; - send_pm_ws_message(data.private_message_id, op, websocket_id, context).await + context + .send_pm_ws_message( + &UserOperationCrud::EditPrivateMessage, + data.private_message_id, + websocket_id, + ) + .await } } diff --git a/crates/api_crud/src/site/read.rs b/crates/api_crud/src/site/read.rs index 0687fdf6..2330d0f7 100644 --- a/crates/api_crud/src/site/read.rs +++ b/crates/api_crud/src/site/read.rs @@ -4,6 +4,7 @@ use lemmy_api_common::{ context::LemmyContext, site::{GetSite, GetSiteResponse, MyUserInfo}, utils::{build_federated_instances, get_local_user_settings_view_from_jwt_opt}, + websocket::handlers::online_users::GetUsersOnline, }; use lemmy_db_schema::source::{ actor_language::{LocalUserLanguage, SiteLanguage}, @@ -36,7 +37,7 @@ impl PerformCrud for GetSite { let admins = PersonView::admins(context.pool()).await?; - let online = context.chat_server().get_users_online()?; + let online = context.chat_server().send(GetUsersOnline).await?; // Build the local user let my_user = if let Some(local_user_view) = get_local_user_settings_view_from_jwt_opt( diff --git a/crates/api_crud/src/site/update.rs b/crates/api_crud/src/site/update.rs index 36a09628..3922fdab 100644 --- a/crates/api_crud/src/site/update.rs +++ b/crates/api_crud/src/site/update.rs @@ -192,10 +192,7 @@ impl PerformCrud for EditSite { let res = SiteResponse { site_view }; - context - .chat_server() - .send_all_message(UserOperationCrud::EditSite, &res, websocket_id) - .await?; + context.send_all_ws_message(&UserOperationCrud::EditSite, &res, websocket_id)?; Ok(res) } diff --git a/crates/api_crud/src/user/create.rs b/crates/api_crud/src/user/create.rs index bf48b5f5..0554da05 100644 --- a/crates/api_crud/src/user/create.rs +++ b/crates/api_crud/src/user/create.rs @@ -15,6 +15,7 @@ use lemmy_api_common::{ send_verification_email, EndpointType, }, + websocket::handlers::captcha::CheckCaptcha, }; use lemmy_db_schema::{ aggregates::structs::PersonAggregates, @@ -78,10 +79,13 @@ impl PerformCrud for Register { // If the site is set up, check the captcha if local_site.site_setup && local_site.captcha_enabled { - let check = context.chat_server().check_captcha( - data.captcha_uuid.clone().unwrap_or_default(), - data.captcha_answer.clone().unwrap_or_default(), - )?; + let check = context + .chat_server() + .send(CheckCaptcha { + uuid: data.captcha_uuid.clone().unwrap_or_default(), + answer: data.captcha_answer.clone().unwrap_or_default(), + }) + .await?; if !check { return Err(LemmyError::from_message("captcha_incorrect")); } diff --git a/crates/apub/Cargo.toml b/crates/apub/Cargo.toml index e50915fa..8d586059 100644 --- a/crates/apub/Cargo.toml +++ b/crates/apub/Cargo.toml @@ -26,6 +26,7 @@ serde_json = { workspace = true } serde = { workspace = true } actix-web = { workspace = true } actix-rt = { workspace = true } +actix = { workspace = true } tracing = { workspace = true } strum_macros = { workspace = true } url = { workspace = true } diff --git a/crates/apub/src/activities/community/report.rs b/crates/apub/src/activities/community/report.rs index a5fab8a0..9830be13 100644 --- a/crates/apub/src/activities/community/report.rs +++ b/crates/apub/src/activities/community/report.rs @@ -143,15 +143,12 @@ impl ActivityHandler for Report { let post_report_view = PostReportView::read(context.pool(), report.id, actor.id).await?; - context - .chat_server() - .send_mod_room_message( - UserOperation::CreateCommentReport, - &PostReportResponse { post_report_view }, - post.community_id, - None, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::CreateCommentReport, + &PostReportResponse { post_report_view }, + post.community_id, + None, + )?; } PostOrComment::Comment(comment) => { let report_form = CommentReportForm { @@ -167,17 +164,14 @@ impl ActivityHandler for Report { CommentReportView::read(context.pool(), report.id, actor.id).await?; let community_id = comment_report_view.community.id; - context - .chat_server() - .send_mod_room_message( - UserOperation::CreateCommentReport, - &CommentReportResponse { - comment_report_view, - }, - community_id, - None, - ) - .await?; + context.send_mod_ws_message( + &UserOperation::CreateCommentReport, + &CommentReportResponse { + comment_report_view, + }, + community_id, + None, + )?; } }; Ok(()) diff --git a/crates/apub/src/activities/community/update.rs b/crates/apub/src/activities/community/update.rs index f0eaa09c..85581e2a 100644 --- a/crates/apub/src/activities/community/update.rs +++ b/crates/apub/src/activities/community/update.rs @@ -21,7 +21,7 @@ use lemmy_api_common::{ community::{CommunityResponse, EditCommunity, HideCommunity}, context::LemmyContext, utils::get_local_user_view_from_jwt, - websocket::{send::send_community_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{source::community::Community, traits::Crud}; use lemmy_utils::error::LemmyError; @@ -102,14 +102,14 @@ impl ActivityHandler for UpdateCommunity { let updated_community = Community::update(context.pool(), community.id, &community_update_form).await?; - send_community_ws_message( - updated_community.id, - UserOperationCrud::EditCommunity, - None, - None, - context, - ) - .await?; + context + .send_community_ws_message( + &UserOperationCrud::EditCommunity, + updated_community.id, + None, + None, + ) + .await?; Ok(()) } } diff --git a/crates/apub/src/activities/create_or_update/comment.rs b/crates/apub/src/activities/create_or_update/comment.rs index 55d5e3f0..d07a8e66 100644 --- a/crates/apub/src/activities/create_or_update/comment.rs +++ b/crates/apub/src/activities/create_or_update/comment.rs @@ -28,7 +28,7 @@ use lemmy_api_common::{ comment::{CommentResponse, CreateComment, EditComment}, context::LemmyContext, utils::{check_post_deleted_or_removed, is_mod_or_admin}, - websocket::{send::send_comment_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ newtypes::PersonId, @@ -199,10 +199,9 @@ impl ActivityHandler for CreateOrUpdateNote { CreateOrUpdateType::Create => UserOperationCrud::CreateComment, CreateOrUpdateType::Update => UserOperationCrud::EditComment, }; - send_comment_ws_message( - comment.id, notif_type, None, None, None, recipients, context, - ) - .await?; + context + .send_comment_ws_message(¬if_type, comment.id, None, None, None, recipients) + .await?; Ok(()) } } diff --git a/crates/apub/src/activities/create_or_update/mod.rs b/crates/apub/src/activities/create_or_update/mod.rs index b1fcfb6e..9e4fa51b 100644 --- a/crates/apub/src/activities/create_or_update/mod.rs +++ b/crates/apub/src/activities/create_or_update/mod.rs @@ -1,6 +1,6 @@ use crate::objects::person::ApubPerson; use activitypub_federation::{config::Data, fetch::object_id::ObjectId}; -use lemmy_api_common::{context::LemmyContext, websocket::send::send_local_notifs}; +use lemmy_api_common::context::LemmyContext; use lemmy_db_schema::{ newtypes::LocalUserId, source::{comment::Comment, post::Post}, @@ -29,5 +29,7 @@ async fn get_comment_notif_recipients( // anyway. // TODO: for compatibility with other projects, it would be much better to read this from cc or tags let mentions = scrape_text_for_mentions(&comment.content); - send_local_notifs(mentions, comment, &actor, &post, do_send_email, context).await + context + .send_local_notifs(mentions, comment, &actor, &post, do_send_email) + .await } diff --git a/crates/apub/src/activities/create_or_update/post.rs b/crates/apub/src/activities/create_or_update/post.rs index 169db5ea..771e3c3c 100644 --- a/crates/apub/src/activities/create_or_update/post.rs +++ b/crates/apub/src/activities/create_or_update/post.rs @@ -25,7 +25,7 @@ use activitypub_federation::{ use lemmy_api_common::{ context::LemmyContext, post::{CreatePost, EditPost, PostResponse}, - websocket::{send::send_post_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ newtypes::PersonId, @@ -196,7 +196,9 @@ impl ActivityHandler for CreateOrUpdatePage { CreateOrUpdateType::Create => UserOperationCrud::CreatePost, CreateOrUpdateType::Update => UserOperationCrud::EditPost, }; - send_post_ws_message(post.id, notif_type, None, None, context).await?; + context + .send_post_ws_message(¬if_type, post.id, None, None) + .await?; Ok(()) } } diff --git a/crates/apub/src/activities/create_or_update/private_message.rs b/crates/apub/src/activities/create_or_update/private_message.rs index cacfc1a8..85051344 100644 --- a/crates/apub/src/activities/create_or_update/private_message.rs +++ b/crates/apub/src/activities/create_or_update/private_message.rs @@ -16,7 +16,7 @@ use activitypub_federation::{ use lemmy_api_common::{ context::LemmyContext, private_message::{CreatePrivateMessage, EditPrivateMessage, PrivateMessageResponse}, - websocket::{send::send_pm_ws_message, UserOperationCrud}, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ newtypes::PersonId, @@ -124,7 +124,9 @@ impl ActivityHandler for CreateOrUpdateChatMessage { CreateOrUpdateType::Create => UserOperationCrud::CreatePrivateMessage, CreateOrUpdateType::Update => UserOperationCrud::EditPrivateMessage, }; - send_pm_ws_message(private_message.id, notif_type, None, context).await?; + context + .send_pm_ws_message(¬if_type, private_message.id, None) + .await?; Ok(()) } diff --git a/crates/apub/src/activities/deletion/delete.rs b/crates/apub/src/activities/deletion/delete.rs index 337ba9e8..1bb3b6c4 100644 --- a/crates/apub/src/activities/deletion/delete.rs +++ b/crates/apub/src/activities/deletion/delete.rs @@ -8,13 +8,7 @@ use crate::{ protocol::{activities::deletion::delete::Delete, IdOrNestedObject}, }; use activitypub_federation::{config::Data, kinds::activity::DeleteType, traits::ActivityHandler}; -use lemmy_api_common::{ - context::LemmyContext, - websocket::{ - send::{send_comment_ws_message_simple, send_community_ws_message, send_post_ws_message}, - UserOperationCrud, - }, -}; +use lemmy_api_common::{context::LemmyContext, websocket::UserOperationCrud}; use lemmy_db_schema::{ source::{ comment::{Comment, CommentUpdateForm}, @@ -134,7 +128,9 @@ pub(in crate::activities) async fn receive_remove_action( ) .await?; - send_community_ws_message(deleted_community.id, RemoveCommunity, None, None, context).await?; + context + .send_community_ws_message(&RemoveCommunity, deleted_community.id, None, None) + .await?; } DeletableObjects::Post(post) => { let form = ModRemovePostForm { @@ -151,7 +147,9 @@ pub(in crate::activities) async fn receive_remove_action( ) .await?; - send_post_ws_message(removed_post.id, RemovePost, None, None, context).await?; + context + .send_post_ws_message(&RemovePost, removed_post.id, None, None) + .await?; } DeletableObjects::Comment(comment) => { let form = ModRemoveCommentForm { @@ -168,7 +166,9 @@ pub(in crate::activities) async fn receive_remove_action( ) .await?; - send_comment_ws_message_simple(removed_comment.id, RemoveComment, context).await?; + context + .send_comment_ws_message_simple(&RemoveComment, removed_comment.id) + .await?; } DeletableObjects::PrivateMessage(_) => unimplemented!(), } diff --git a/crates/apub/src/activities/deletion/mod.rs b/crates/apub/src/activities/deletion/mod.rs index 7f901d59..690b1d3a 100644 --- a/crates/apub/src/activities/deletion/mod.rs +++ b/crates/apub/src/activities/deletion/mod.rs @@ -35,15 +35,7 @@ use lemmy_api_common::{ post::{DeletePost, PostResponse, RemovePost}, private_message::{DeletePrivateMessage, PrivateMessageResponse}, utils::get_local_user_view_from_jwt, - websocket::{ - send::{ - send_comment_ws_message_simple, - send_community_ws_message, - send_pm_ws_message, - send_post_ws_message, - }, - UserOperationCrud, - }, + websocket::UserOperationCrud, }; use lemmy_db_schema::{ source::{ @@ -410,14 +402,14 @@ async fn receive_delete_action( .build(), ) .await?; - send_community_ws_message( - community.id, - UserOperationCrud::DeleteCommunity, - None, - None, - context, - ) - .await?; + context + .send_community_ws_message( + &UserOperationCrud::DeleteCommunity, + community.id, + None, + None, + ) + .await?; } DeletableObjects::Post(post) => { if deleted != post.deleted { @@ -427,14 +419,9 @@ async fn receive_delete_action( &PostUpdateForm::builder().deleted(Some(deleted)).build(), ) .await?; - send_post_ws_message( - deleted_post.id, - UserOperationCrud::DeletePost, - None, - None, - context, - ) - .await?; + context + .send_post_ws_message(&UserOperationCrud::DeletePost, deleted_post.id, None, None) + .await?; } } DeletableObjects::Comment(comment) => { @@ -445,12 +432,9 @@ async fn receive_delete_action( &CommentUpdateForm::builder().deleted(Some(deleted)).build(), ) .await?; - send_comment_ws_message_simple( - deleted_comment.id, - UserOperationCrud::DeleteComment, - context, - ) - .await?; + context + .send_comment_ws_message_simple(&UserOperationCrud::DeleteComment, deleted_comment.id) + .await?; } } DeletableObjects::PrivateMessage(pm) => { @@ -463,13 +447,13 @@ async fn receive_delete_action( ) .await?; - send_pm_ws_message( - deleted_private_message.id, - UserOperationCrud::DeletePrivateMessage, - None, - context, - ) - .await?; + context + .send_pm_ws_message( + &UserOperationCrud::DeletePrivateMessage, + deleted_private_message.id, + None, + ) + .await?; } } Ok(()) diff --git a/crates/apub/src/activities/deletion/undo_delete.rs b/crates/apub/src/activities/deletion/undo_delete.rs index d1401838..13a32cab 100644 --- a/crates/apub/src/activities/deletion/undo_delete.rs +++ b/crates/apub/src/activities/deletion/undo_delete.rs @@ -8,13 +8,7 @@ use crate::{ protocol::activities::deletion::{delete::Delete, undo_delete::UndoDelete}, }; use activitypub_federation::{config::Data, kinds::activity::UndoType, traits::ActivityHandler}; -use lemmy_api_common::{ - context::LemmyContext, - websocket::{ - send::{send_comment_ws_message_simple, send_community_ws_message, send_post_ws_message}, - UserOperationCrud, - }, -}; +use lemmy_api_common::{context::LemmyContext, websocket::UserOperationCrud}; use lemmy_db_schema::{ source::{ comment::{Comment, CommentUpdateForm}, @@ -125,7 +119,9 @@ impl UndoDelete { &CommunityUpdateForm::builder().removed(Some(false)).build(), ) .await?; - send_community_ws_message(deleted_community.id, EditCommunity, None, None, context).await?; + context + .send_community_ws_message(&EditCommunity, deleted_community.id, None, None) + .await?; } DeletableObjects::Post(post) => { let form = ModRemovePostForm { @@ -141,7 +137,9 @@ impl UndoDelete { &PostUpdateForm::builder().removed(Some(false)).build(), ) .await?; - send_post_ws_message(removed_post.id, EditPost, None, None, context).await?; + context + .send_post_ws_message(&EditPost, removed_post.id, None, None) + .await?; } DeletableObjects::Comment(comment) => { let form = ModRemoveCommentForm { @@ -157,7 +155,9 @@ impl UndoDelete { &CommentUpdateForm::builder().removed(Some(false)).build(), ) .await?; - send_comment_ws_message_simple(removed_comment.id, EditComment, context).await?; + context + .send_comment_ws_message_simple(&EditComment, removed_comment.id) + .await?; } DeletableObjects::PrivateMessage(_) => unimplemented!(), } diff --git a/crates/apub/src/activities/following/accept.rs b/crates/apub/src/activities/following/accept.rs index 250fbf54..c57ec0b5 100644 --- a/crates/apub/src/activities/following/accept.rs +++ b/crates/apub/src/activities/following/accept.rs @@ -12,7 +12,11 @@ use activitypub_federation::{ use lemmy_api_common::{ community::CommunityResponse, context::LemmyContext, - websocket::UserOperation, + websocket::{ + handlers::messages::SendUserRoomMessage, + serialize_websocket_message, + UserOperation, + }, }; use lemmy_db_schema::{ source::{actor_language::CommunityLanguage, community::CommunityFollower}, @@ -85,20 +89,18 @@ impl ActivityHandler for AcceptFollow { .id; let discussion_languages = CommunityLanguage::read(context.pool(), community_id).await?; - let response = CommunityResponse { + let res = CommunityResponse { community_view, discussion_languages, }; - context - .chat_server() - .send_user_room_message( - &UserOperation::FollowCommunity, - &response, - local_recipient_id, - None, - ) - .await?; + let message = serialize_websocket_message(&UserOperation::FollowCommunity, &res)?; + + context.chat_server().do_send(SendUserRoomMessage { + recipient_id: local_recipient_id, + message, + websocket_id: None, + }); Ok(()) } diff --git a/crates/apub/src/activities/voting/mod.rs b/crates/apub/src/activities/voting/mod.rs index 5ab31da5..89832148 100644 --- a/crates/apub/src/activities/voting/mod.rs +++ b/crates/apub/src/activities/voting/mod.rs @@ -16,10 +16,7 @@ use lemmy_api_common::{ post::{CreatePostLike, PostResponse}, sensitive::Sensitive, utils::get_local_user_view_from_jwt, - websocket::{ - send::{send_comment_ws_message_simple, send_post_ws_message}, - UserOperation, - }, + websocket::UserOperation, }; use lemmy_db_schema::{ newtypes::CommunityId, @@ -125,7 +122,9 @@ async fn vote_comment( CommentLike::remove(context.pool(), person_id, comment_id).await?; CommentLike::like(context.pool(), &like_form).await?; - send_comment_ws_message_simple(comment_id, UserOperation::CreateCommentLike, context).await?; + context + .send_comment_ws_message_simple(&UserOperation::CreateCommentLike, comment_id) + .await?; Ok(()) } @@ -146,7 +145,9 @@ async fn vote_post( PostLike::remove(context.pool(), person_id, post_id).await?; PostLike::like(context.pool(), &like_form).await?; - send_post_ws_message(post.id, UserOperation::CreatePostLike, None, None, context).await?; + context + .send_post_ws_message(&UserOperation::CreatePostLike, post.id, None, None) + .await?; Ok(()) } @@ -160,7 +161,9 @@ async fn undo_vote_comment( let person_id = actor.id; CommentLike::remove(context.pool(), person_id, comment_id).await?; - send_comment_ws_message_simple(comment_id, UserOperation::CreateCommentLike, context).await?; + context + .send_comment_ws_message_simple(&UserOperation::CreateCommentLike, comment_id) + .await?; Ok(()) } @@ -174,6 +177,8 @@ async fn undo_vote_post( let person_id = actor.id; PostLike::remove(context.pool(), person_id, post_id).await?; - send_post_ws_message(post_id, UserOperation::CreatePostLike, None, None, context).await?; + context + .send_post_ws_message(&UserOperation::CreatePostLike, post_id, None, None) + .await?; Ok(()) } diff --git a/crates/apub/src/api/read_community.rs b/crates/apub/src/api/read_community.rs index 5547fcff..3dc56a94 100644 --- a/crates/apub/src/api/read_community.rs +++ b/crates/apub/src/api/read_community.rs @@ -8,6 +8,7 @@ use lemmy_api_common::{ community::{GetCommunity, GetCommunityResponse}, context::LemmyContext, utils::{check_private_instance, get_local_user_view_from_jwt_opt, is_mod_or_admin_opt}, + websocket::handlers::online_users::GetCommunityUsersOnline, }; use lemmy_db_schema::{ impls::actor_language::default_post_language, @@ -76,7 +77,8 @@ impl PerformApub for GetCommunity { let online = context .chat_server() - .get_community_users_online(community_id)?; + .send(GetCommunityUsersOnline { community_id }) + .await?; let site_id = Site::instance_actor_id_from_url(community_view.community.actor_id.clone().into()); diff --git a/crates/apub/src/objects/mod.rs b/crates/apub/src/objects/mod.rs index 6d2ff291..ff4182dd 100644 --- a/crates/apub/src/objects/mod.rs +++ b/crates/apub/src/objects/mod.rs @@ -55,6 +55,7 @@ pub(crate) fn verify_is_remote_object(id: &Url, settings: &Settings) -> Result<( #[cfg(test)] pub(crate) mod tests { use activitypub_federation::config::{Data, FederationConfig}; + use actix::Actor; use anyhow::anyhow; use lemmy_api_common::{ context::LemmyContext, @@ -69,7 +70,6 @@ pub(crate) mod tests { }; use reqwest::{Client, Request, Response}; use reqwest_middleware::{ClientBuilder, Middleware, Next}; - use std::sync::Arc; use task_local_extensions::Extensions; struct BlockedMiddleware; @@ -110,7 +110,7 @@ pub(crate) mod tests { let rate_limit_config = RateLimitConfig::builder().build(); let rate_limit_cell = RateLimitCell::new(rate_limit_config).await; - let chat_server = Arc::new(ChatServer::startup()); + let chat_server = ChatServer::default().start(); let context = LemmyContext::create(pool, chat_server, client, secret, rate_limit_cell.clone()); let config = FederationConfig::builder() .domain("example.com") diff --git a/src/api_routes_websocket.rs b/src/api_routes_websocket.rs index 3ab1efc7..b3ce85bd 100644 --- a/src/api_routes_websocket.rs +++ b/src/api_routes_websocket.rs @@ -1,8 +1,18 @@ use activitypub_federation::config::Data as ContextData; +use actix::{ + fut, + Actor, + ActorContext, + ActorFutureExt, + AsyncContext, + ContextFutureSpawner, + Handler, + Running, + StreamHandler, + WrapFuture, +}; use actix_web::{web, Error, HttpRequest, HttpResponse}; use actix_web_actors::ws; -use actix_ws::{MessageStream, Session}; -use futures::stream::StreamExt; use lemmy_api::Perform; use lemmy_api_common::{ comment::{ @@ -101,6 +111,10 @@ use lemmy_api_common::{ Search, }, websocket::{ + handlers::{ + connect::{Connect, Disconnect}, + WsMessage, + }, serialize_websocket_message, structs::{CommunityJoin, ModJoin, PostJoin, UserJoin}, UserOperation, @@ -117,21 +131,36 @@ use std::{ ops::Deref, result, str::FromStr, - sync::{Arc, Mutex}, time::{Duration, Instant}, }; -use tracing::{debug, error, info}; +use tracing::{debug, error}; + +/// How often heartbeat pings are sent +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25); + +/// How long before lack of client response causes a timeout +const CLIENT_TIMEOUT: Duration = Duration::from_secs(60); + +pub struct WsChatSession { + /// unique session id + pub id: ConnectionId, + + pub ip: IpAddr, + + /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), + /// otherwise we drop connection. + pub hb: Instant, + + /// The context data + apub_data: ContextData, +} -/// Entry point for our route pub async fn websocket( req: HttpRequest, body: web::Payload, - context: web::Data, rate_limiter: web::Data, apub_data: ContextData, ) -> Result { - let (response, session, stream) = actix_ws::handle(&req, body)?; - let client_ip = IpAddr( req .connection_info() @@ -146,111 +175,158 @@ pub async fn websocket( "Websocket join with IP: {} has been rate limited.", &client_ip ); - session.close(None).await.map_err(LemmyError::from)?; - return Ok(response); + return Ok(HttpResponse::TooManyRequests().finish()); } - let connection_id = context.chat_server().handle_connect(session.clone())?; - info!("{} joined", &client_ip); + ws::start( + WsChatSession { + id: 0, + ip: client_ip, + hb: Instant::now(), + apub_data, + }, + &req, + body, + ) +} + +/// helper method that sends ping to client every few seconds (HEARTBEAT_INTERVAL). +/// +/// also this method checks heartbeats from client +fn hb(ctx: &mut ws::WebsocketContext) { + ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { + // check client heartbeats + if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + // heartbeat timed out - let alive = Arc::new(Mutex::new(Instant::now())); - heartbeat(session.clone(), alive.clone()); + // notify chat server + act + .apub_data + .chat_server() + .do_send(Disconnect { id: act.id }); - actix_rt::spawn(handle_messages( - stream, - client_ip, - session, - connection_id, - alive, - rate_limiter, - apub_data, - )); + // stop actor + ctx.stop(); - Ok(response) + // don't try to send a ping + return; + } + + ctx.ping(b""); + }); } -async fn handle_messages( - mut stream: MessageStream, - client_ip: IpAddr, - mut session: Session, - connection_id: ConnectionId, - alive: Arc>, - rate_limiter: web::Data, - context: ContextData, -) -> Result<(), LemmyError> { - while let Some(Ok(msg)) = stream.next().await { - match msg { - ws::Message::Ping(bytes) => { - if session.pong(&bytes).await.is_err() { - break; +impl Actor for WsChatSession { + type Context = ws::WebsocketContext; + + /// Method is called on actor start. + /// We register ws session with ChatServer + fn started(&mut self, ctx: &mut Self::Context) { + // we'll start heartbeat process on session start. + hb(ctx); + + // register self in chat server. `AsyncContext::wait` register + // future within context, but context waits until this future resolves + // before processing any other events. + // HttpContext::state() is instance of WsChatSessionState, state is shared + // across all routes within application + let addr = ctx.address(); + self + .apub_data + .chat_server() + .send(Connect { + addr: addr.recipient(), + }) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(res) => act.id = res, + // something is wrong with chat server + _ => ctx.stop(), } + fut::ready(()) + }) + .wait(ctx); + } + fn stopping(&mut self, _: &mut Self::Context) -> Running { + // notify chat server + self + .apub_data + .chat_server() + .do_send(Disconnect { id: self.id }); + Running::Stop + } +} + +/// Handle messages from chat server, we simply send it to peer websocket +impl Handler for WsChatSession { + type Result = (); + + fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) { + ctx.text(msg.0); + } +} + +/// WebSocket message handler +impl StreamHandler> for WsChatSession { + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { + let msg = match msg { + Err(_) => { + ctx.stop(); + return; + } + Ok(msg) => msg, + }; + + match msg { + ws::Message::Ping(msg) => { + self.hb = Instant::now(); + ctx.pong(&msg); } ws::Message::Pong(_) => { - let mut lock = alive - .lock() - .expect("Failed to acquire websocket heartbeat alive lock"); - *lock = Instant::now(); + self.hb = Instant::now(); } ws::Message::Text(text) => { - let msg = text.trim().to_string(); - let executed = parse_json_message( - msg, - client_ip.clone(), - connection_id, - rate_limiter.get_ref(), - context.reset_request_count(), - ) - .await; - - let res = executed.unwrap_or_else(|e| { - error!("Error during message handling {}", e); - e.to_json() - .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#)) + let ip_clone = self.ip.clone(); + let id_clone = self.id.to_owned(); + let context_clone = self.apub_data.reset_request_count(); + + let fut = Box::pin(async move { + let msg = text.trim().to_string(); + parse_json_message(msg, ip_clone, id_clone, context_clone).await }); - session.text(res).await?; - } - ws::Message::Close(_) => { - session.close(None).await?; - context.chat_server().handle_disconnect(&connection_id)?; - break; + fut + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(res) => ctx.text(res), + Err(e) => error!("{}", &e), + } + actix::fut::ready(()) + }) + .spawn(ctx); } - ws::Message::Binary(_) => info!("Unexpected binary"), - _ => {} - } - } - Ok(()) -} - -fn heartbeat(mut session: Session, alive: Arc>) { - actix_rt::spawn(async move { - let mut interval = actix_rt::time::interval(Duration::from_secs(5)); - loop { - if session.ping(b"").await.is_err() { - break; + ws::Message::Binary(_) => println!("Unexpected binary"), + ws::Message::Close(reason) => { + ctx.close(reason); + ctx.stop(); } - - let duration_since = { - let alive_lock = alive - .lock() - .expect("Failed to acquire websocket heartbeat alive lock"); - Instant::now().duration_since(*alive_lock) - }; - if duration_since > Duration::from_secs(10) { - let _ = session.close(None).await; - break; + ws::Message::Continuation(_) => { + ctx.stop(); } - interval.tick().await; + ws::Message::Nop => (), } - }); + } } +/// Entry point for our websocket route async fn parse_json_message( msg: String, ip: IpAddr, connection_id: ConnectionId, - rate_limiter: &RateLimitCell, context: ContextData, ) -> Result { + let rate_limiter = context.settings_updated_channel(); let json: Value = serde_json::from_str(&msg)?; let data = json .get("data") diff --git a/src/lib.rs b/src/lib.rs index 152e3ded..124cecc5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub mod telemetry; use crate::{code_migrations::run_advanced_migrations, root_span_builder::QuieterRootSpanBuilder}; use activitypub_federation::config::{FederationConfig, FederationMiddleware}; +use actix::Actor; use actix_web::{middleware, web::Data, App, HttpServer, Result}; use doku::json::{AutoComments, CommentsStyle, Formatting, ObjectsStyle}; use lemmy_api_common::{ @@ -35,7 +36,7 @@ use reqwest::Client; use reqwest_middleware::ClientBuilder; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use reqwest_tracing::TracingMiddleware; -use std::{env, sync::Arc, thread, time::Duration}; +use std::{env, thread, time::Duration}; use tracing::subscriber::set_global_default; use tracing_actix_web::TracingLogger; use tracing_error::ErrorLayer; @@ -133,7 +134,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> { scheduled_tasks::setup(db_url, user_agent).expect("Couldn't set up scheduled_tasks"); }); - let chat_server = Arc::new(ChatServer::startup()); + let chat_server = ChatServer::default().start(); // Create Http server with websocket support let settings_bind = settings.clone();