]> Untitled Git - lemmy.git/commitdiff
Making the chat server an actor. (#2793)
authorDessalines <dessalines@users.noreply.github.com>
Thu, 13 Apr 2023 10:53:55 +0000 (06:53 -0400)
committerGitHub <noreply@github.com>
Thu, 13 Apr 2023 10:53:55 +0000 (06:53 -0400)
* Making the chat server an actor.

- Fixes #2778
- #2787

* Forgot to add handlers folder.

* Some cleanup.

* Forgot to remove a comment.

* Address PR comments.

* Using ToString for enum operations.

66 files changed:
Cargo.lock
Cargo.toml
crates/api/src/comment/like.rs
crates/api/src/comment_report/create.rs
crates/api/src/comment_report/resolve.rs
crates/api/src/community/add_mod.rs
crates/api/src/community/ban.rs
crates/api/src/community/hide.rs
crates/api/src/local_user/add_admin.rs
crates/api/src/local_user/ban_person.rs
crates/api/src/local_user/get_captcha.rs
crates/api/src/post/feature.rs
crates/api/src/post/like.rs
crates/api/src/post/lock.rs
crates/api/src/post_report/create.rs
crates/api/src/post_report/resolve.rs
crates/api/src/private_message/mark_read.rs
crates/api/src/private_message_report/create.rs
crates/api/src/private_message_report/resolve.rs
crates/api/src/websocket.rs
crates/api_common/Cargo.toml
crates/api_common/src/context.rs
crates/api_common/src/websocket/chat_server.rs
crates/api_common/src/websocket/handlers.rs [deleted file]
crates/api_common/src/websocket/handlers/captcha.rs [new file with mode: 0644]
crates/api_common/src/websocket/handlers/connect.rs [new file with mode: 0644]
crates/api_common/src/websocket/handlers/join_rooms.rs [new file with mode: 0644]
crates/api_common/src/websocket/handlers/messages.rs [new file with mode: 0644]
crates/api_common/src/websocket/handlers/mod.rs [new file with mode: 0644]
crates/api_common/src/websocket/handlers/online_users.rs [new file with mode: 0644]
crates/api_common/src/websocket/mod.rs
crates/api_common/src/websocket/send.rs
crates/api_crud/src/comment/create.rs
crates/api_crud/src/comment/delete.rs
crates/api_crud/src/comment/remove.rs
crates/api_crud/src/comment/update.rs
crates/api_crud/src/community/delete.rs
crates/api_crud/src/community/remove.rs
crates/api_crud/src/community/update.rs
crates/api_crud/src/post/create.rs
crates/api_crud/src/post/delete.rs
crates/api_crud/src/post/read.rs
crates/api_crud/src/post/remove.rs
crates/api_crud/src/post/update.rs
crates/api_crud/src/private_message/create.rs
crates/api_crud/src/private_message/delete.rs
crates/api_crud/src/private_message/update.rs
crates/api_crud/src/site/read.rs
crates/api_crud/src/site/update.rs
crates/api_crud/src/user/create.rs
crates/apub/Cargo.toml
crates/apub/src/activities/community/report.rs
crates/apub/src/activities/community/update.rs
crates/apub/src/activities/create_or_update/comment.rs
crates/apub/src/activities/create_or_update/mod.rs
crates/apub/src/activities/create_or_update/post.rs
crates/apub/src/activities/create_or_update/private_message.rs
crates/apub/src/activities/deletion/delete.rs
crates/apub/src/activities/deletion/mod.rs
crates/apub/src/activities/deletion/undo_delete.rs
crates/apub/src/activities/following/accept.rs
crates/apub/src/activities/voting/mod.rs
crates/apub/src/api/read_community.rs
crates/apub/src/objects/mod.rs
src/api_routes_websocket.rs
src/lib.rs

index 774244683ab446259c4908d27e23b1d5a1c11e69..b98acdc2be06f0ce3bdbdb6d4419b792ef76d658 100644 (file)
@@ -58,6 +58,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f728064aca1c318585bf4bb04ffcfac9e75e508ab4e8b1bd9ba5dfe04e2cbed5"
 dependencies = [
  "actix-rt",
+ "actix_derive",
  "bitflags",
  "bytes",
  "crossbeam-channel",
@@ -324,16 +325,14 @@ dependencies = [
 ]
 
 [[package]]
-name = "actix-ws"
-version = "0.2.5"
+name = "actix_derive"
+version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "535aec173810be3ca6f25dd5b4d431ae7125d62000aa3cbae1ec739921b02cf3"
+checksum = "6d44b8fee1ced9671ba043476deddef739dd0959bf77030b26b738cc591737a7"
 dependencies = [
- "actix-codec",
- "actix-http",
- "actix-web",
- "futures-core",
- "tokio",
+ "proc-macro2 1.0.47",
+ "quote 1.0.21",
+ "syn 1.0.103",
 ]
 
 [[package]]
@@ -2409,9 +2408,9 @@ dependencies = [
 name = "lemmy_api_common"
 version = "0.17.1"
 dependencies = [
+ "actix",
  "actix-rt",
  "actix-web",
- "actix-ws",
  "anyhow",
  "chrono",
  "encoding",
@@ -2461,6 +2460,7 @@ name = "lemmy_apub"
 version = "0.17.1"
 dependencies = [
  "activitypub_federation",
+ "actix",
  "actix-rt",
  "actix-web",
  "anyhow",
@@ -2589,10 +2589,10 @@ name = "lemmy_server"
 version = "0.17.1"
 dependencies = [
  "activitypub_federation",
+ "actix",
  "actix-rt",
  "actix-web",
  "actix-web-actors",
- "actix-ws",
  "clokwerk",
  "console-subscriber",
  "diesel",
index 3a97f62060bac29a8f0fd9e251db62629e28b0bb..52e1a8beb6452f891fd00f6d3afa01ca1b90bc8b 100644 (file)
@@ -104,7 +104,7 @@ rosetta-i18n = "0.1.2"
 rand = "0.8.5"
 opentelemetry = { version = "0.17.0", features = ["rt-tokio"] }
 tracing-opentelemetry = { version = "0.17.2" }
-actix-ws = "0.2.0"
+actix = "0.13"
 
 [dependencies]
 lemmy_api = { workspace = true }
@@ -133,7 +133,7 @@ doku = { workspace = true }
 reqwest-retry = { workspace = true }
 serde_json = { workspace = true }
 futures = { workspace = true }
-actix-ws = { workspace = true }
+actix = { workspace = true }
 tracing-opentelemetry = { workspace = true, optional = true }
 opentelemetry = { workspace = true, optional = true }
 actix-web-actors = { version = "4.1.0", default-features = false }
index fba61ed13ee83169b8b679e5c0c5dfc7d6087201..3d216872a43a48b08284a072c16c2bfd8b9fddc4 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   comment::{CommentResponse, CreateCommentLike},
   context::LemmyContext,
   utils::{check_community_ban, check_downvotes_enabled, get_local_user_view_from_jwt},
-  websocket::{send::send_comment_ws_message, UserOperation},
+  websocket::UserOperation,
 };
 use lemmy_db_schema::{
   newtypes::LocalUserId,
@@ -78,15 +78,15 @@ impl Perform for CreateCommentLike {
         .map_err(|e| LemmyError::from_error_message(e, "couldnt_like_comment"))?;
     }
 
-    send_comment_ws_message(
-      data.comment_id,
-      UserOperation::CreateCommentLike,
-      websocket_id,
-      None,
-      Some(local_user_view.person.id),
-      recipient_ids,
-      context,
-    )
-    .await
+    context
+      .send_comment_ws_message(
+        &UserOperation::CreateCommentLike,
+        data.comment_id,
+        websocket_id,
+        None,
+        Some(local_user_view.person.id),
+        recipient_ids,
+      )
+      .await
   }
 }
index 9badc31a07694a2b45cdf678c17a40253e155970..fe2471ede2d677b2347d8909e16cebcea1728cce 100644 (file)
@@ -69,15 +69,12 @@ impl Perform for CreateCommentReport {
       comment_report_view,
     };
 
-    context
-      .chat_server()
-      .send_mod_room_message(
-        UserOperation::CreateCommentReport,
-        &res,
-        comment_view.community.id,
-        websocket_id,
-      )
-      .await?;
+    context.send_mod_ws_message(
+      &UserOperation::CreateCommentReport,
+      &res,
+      comment_view.community.id,
+      websocket_id,
+    )?;
 
     Ok(res)
   }
index de4f418da2353d62c8bd9a1ecf02e695a12727e1..f5fd6b124242fd3f4aef0fc08d645a3c31cd2ebf 100644 (file)
@@ -49,15 +49,12 @@ impl Perform for ResolveCommentReport {
       comment_report_view,
     };
 
-    context
-      .chat_server()
-      .send_mod_room_message(
-        UserOperation::ResolveCommentReport,
-        &res,
-        report.community.id,
-        websocket_id,
-      )
-      .await?;
+    context.send_mod_ws_message(
+      &UserOperation::ResolveCommentReport,
+      &res,
+      report.community.id,
+      websocket_id,
+    )?;
 
     Ok(res)
   }
index 28f56cffd2b71df24fef98dbb223dd895890c3d8..7052f7cdf50a6ec792137c59e3aa97aadff29215 100644 (file)
@@ -70,15 +70,13 @@ impl Perform for AddModToCommunity {
     let moderators = CommunityModeratorView::for_community(context.pool(), community_id).await?;
 
     let res = AddModToCommunityResponse { moderators };
-    context
-      .chat_server()
-      .send_community_room_message(
-        &UserOperation::AddModToCommunity,
-        &res,
-        community_id,
-        websocket_id,
-      )
-      .await?;
+    context.send_mod_ws_message(
+      &UserOperation::AddModToCommunity,
+      &res,
+      community_id,
+      websocket_id,
+    )?;
+
     Ok(res)
   }
 }
index b2d4260e0daa021da501e8d77c1ecd9f60540f1d..d3341340e297440a7729612b6c4c4f577b0a145c 100644 (file)
@@ -4,7 +4,11 @@ use lemmy_api_common::{
   community::{BanFromCommunity, BanFromCommunityResponse},
   context::LemmyContext,
   utils::{get_local_user_view_from_jwt, is_mod_or_admin, remove_user_data_in_community},
-  websocket::UserOperation,
+  websocket::{
+    handlers::messages::SendCommunityRoomMessage,
+    serialize_websocket_message,
+    UserOperation,
+  },
 };
 use lemmy_db_schema::{
   source::{
@@ -95,15 +99,13 @@ impl Perform for BanFromCommunity {
       banned: data.ban,
     };
 
-    context
-      .chat_server()
-      .send_community_room_message(
-        &UserOperation::BanFromCommunity,
-        &res,
-        community_id,
-        websocket_id,
-      )
-      .await?;
+    // A custom ban message
+    let message = serialize_websocket_message(&UserOperation::BanFromCommunity, &res)?;
+    context.chat_server().do_send(SendCommunityRoomMessage {
+      community_id,
+      message,
+      websocket_id,
+    });
 
     Ok(res)
   }
index 94ce7d745134dddd9645aa8907334f4b9c5b444b..8e18115782559dafc037a6821fc7f2b0f8d5b432 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   community::{CommunityResponse, HideCommunity},
   context::LemmyContext,
   utils::{get_local_user_view_from_jwt, is_admin},
-  websocket::{send::send_community_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -50,7 +50,13 @@ impl Perform for HideCommunity {
 
     ModHideCommunity::create(context.pool(), &mod_hide_community_form).await?;
 
-    let op = UserOperationCrud::EditCommunity;
-    send_community_ws_message(data.community_id, op, websocket_id, None, context).await
+    context
+      .send_community_ws_message(
+        &UserOperationCrud::EditCommunity,
+        data.community_id,
+        websocket_id,
+        None,
+      )
+      .await
   }
 }
index ca30b939e7df6b644b3143bef3ec8fa33f757177..ad8b67e12358e5c46d733d24eec52127bd96f259 100644 (file)
@@ -56,10 +56,7 @@ impl Perform for AddAdmin {
 
     let res = AddAdminResponse { admins };
 
-    context
-      .chat_server()
-      .send_all_message(UserOperation::AddAdmin, &res, websocket_id)
-      .await?;
+    context.send_all_ws_message(&UserOperation::AddAdmin, &res, websocket_id)?;
 
     Ok(res)
   }
index d2c9789e92e1ca51ec0d9b69b7ce71854f423eff..55069742c49799bdd0836836cd5971dcfc45f5a8 100644 (file)
@@ -79,10 +79,7 @@ impl Perform for BanPerson {
       banned: data.ban,
     };
 
-    context
-      .chat_server()
-      .send_all_message(UserOperation::BanPerson, &res, websocket_id)
-      .await?;
+    context.send_all_ws_message(&UserOperation::BanPerson, &res, websocket_id)?;
 
     Ok(res)
   }
index 3671f79cc23cd80db1b2dd9914a284759dcad314..00ec45a131f0f22311636c58c256779eb061b20e 100644 (file)
@@ -5,7 +5,7 @@ use chrono::Duration;
 use lemmy_api_common::{
   context::LemmyContext,
   person::{CaptchaResponse, GetCaptcha, GetCaptchaResponse},
-  websocket::structs::CaptchaItem,
+  websocket::{handlers::captcha::AddCaptcha, structs::CaptchaItem},
 };
 use lemmy_db_schema::{source::local_site::LocalSite, utils::naive_now};
 use lemmy_utils::{error::LemmyError, ConnectionId};
@@ -47,7 +47,9 @@ impl Perform for GetCaptcha {
     };
 
     // Stores the captcha item on the queue
-    context.chat_server().add_captcha(captcha_item)?;
+    context.chat_server().do_send(AddCaptcha {
+      captcha: captcha_item,
+    });
 
     Ok(GetCaptchaResponse {
       ok: Some(CaptchaResponse { png, wav, uuid }),
index 16ba7bfb2bbcfb67ccb9a11172df25e929eb16ae..e1dd1e3d8bc5e7f76cabea8269c86091c9dac8e9 100644 (file)
@@ -10,7 +10,7 @@ use lemmy_api_common::{
     is_admin,
     is_mod_or_admin,
   },
-  websocket::{send::send_post_ws_message, UserOperation},
+  websocket::UserOperation,
 };
 use lemmy_db_schema::{
   source::{
@@ -82,13 +82,13 @@ impl Perform for FeaturePost {
 
     ModFeaturePost::create(context.pool(), &form).await?;
 
-    send_post_ws_message(
-      data.post_id,
-      UserOperation::FeaturePost,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await
+    context
+      .send_post_ws_message(
+        &UserOperation::FeaturePost,
+        data.post_id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await
   }
 }
index 59135ff216f89db3f75a9d0ba4dc32068a5a662d..53e6c13c6177c16d865f19e59f4ecded8a11284d 100644 (file)
@@ -10,7 +10,7 @@ use lemmy_api_common::{
     get_local_user_view_from_jwt,
     mark_post_as_read,
   },
-  websocket::{send::send_post_ws_message, UserOperation},
+  websocket::UserOperation,
 };
 use lemmy_db_schema::{
   source::{
@@ -69,13 +69,13 @@ impl Perform for CreatePostLike {
     // Mark the post as read
     mark_post_as_read(person_id, post_id, context.pool()).await?;
 
-    send_post_ws_message(
-      data.post_id,
-      UserOperation::CreatePostLike,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await
+    context
+      .send_post_ws_message(
+        &UserOperation::CreatePostLike,
+        data.post_id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await
   }
 }
index c5fa2a0f55183a56508dd3695741609c8ad6d95b..cd7db90841daec7e780c27b5ad9fa922b1c73c5e 100644 (file)
@@ -9,7 +9,7 @@ use lemmy_api_common::{
     get_local_user_view_from_jwt,
     is_mod_or_admin,
   },
-  websocket::{send::send_post_ws_message, UserOperation},
+  websocket::UserOperation,
 };
 use lemmy_db_schema::{
   source::{
@@ -71,13 +71,13 @@ impl Perform for LockPost {
     };
     ModLockPost::create(context.pool(), &form).await?;
 
-    send_post_ws_message(
-      data.post_id,
-      UserOperation::LockPost,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await
+    context
+      .send_post_ws_message(
+        &UserOperation::LockPost,
+        data.post_id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await
   }
 }
index 9ada7b1d639d6733329a10668fb6f7d79d844fb8..423f8b8142171a4ed96ee043119b48c5d08cb4f0 100644 (file)
@@ -69,15 +69,12 @@ impl Perform for CreatePostReport {
 
     let res = PostReportResponse { post_report_view };
 
-    context
-      .chat_server()
-      .send_mod_room_message(
-        UserOperation::CreatePostReport,
-        &res,
-        post_view.community.id,
-        websocket_id,
-      )
-      .await?;
+    context.send_mod_ws_message(
+      &UserOperation::CreatePostReport,
+      &res,
+      post_view.community.id,
+      websocket_id,
+    )?;
 
     Ok(res)
   }
index 615b7f828b1666fd83fc3de62de3a7d1d78121f2..0426262df0f1e6e477d16cd8fa14a145cc0e5a2b 100644 (file)
@@ -46,15 +46,12 @@ impl Perform for ResolvePostReport {
 
     let res = PostReportResponse { post_report_view };
 
-    context
-      .chat_server()
-      .send_mod_room_message(
-        UserOperation::ResolvePostReport,
-        &res,
-        report.community.id,
-        websocket_id,
-      )
-      .await?;
+    context.send_mod_ws_message(
+      &UserOperation::ResolvePostReport,
+      &res,
+      report.community.id,
+      websocket_id,
+    )?;
 
     Ok(res)
   }
index 9c3c3d8ae06763a8210a67fa60684e227cb80503..ba5cf8eaa74cda6b329caea0f57a9dfa1bea0136 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   context::LemmyContext,
   private_message::{MarkPrivateMessageAsRead, PrivateMessageResponse},
   utils::get_local_user_view_from_jwt,
-  websocket::{send::send_pm_ws_message, UserOperation},
+  websocket::UserOperation,
 };
 use lemmy_db_schema::{
   source::private_message::{PrivateMessage, PrivateMessageUpdateForm},
@@ -45,7 +45,12 @@ impl Perform for MarkPrivateMessageAsRead {
     .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?;
 
     // No need to send an apub update
-    let op = UserOperation::MarkPrivateMessageAsRead;
-    send_pm_ws_message(data.private_message_id, op, websocket_id, context).await
+    context
+      .send_pm_ws_message(
+        &UserOperation::MarkPrivateMessageAsRead,
+        data.private_message_id,
+        websocket_id,
+      )
+      .await
   }
 }
index 5ca2d6a628a0fdb8e09dd6d52dccdd118ae7d797..2a132b6f4d8879ad67dd800a672971d175fd20d3 100644 (file)
@@ -68,15 +68,12 @@ impl Perform for CreatePrivateMessageReport {
       private_message_report_view,
     };
 
-    context
-      .chat_server()
-      .send_mod_room_message(
-        UserOperation::CreatePrivateMessageReport,
-        &res,
-        CommunityId(0),
-        websocket_id,
-      )
-      .await?;
+    context.send_mod_ws_message(
+      &UserOperation::CreatePrivateMessageReport,
+      &res,
+      CommunityId(0),
+      websocket_id,
+    )?;
 
     // TODO: consider federating this
 
index a48a458d01c41536e5d9de136b30fe30178ac20e..9fe014c0f836463cc2f01f8e2005a15498bb7a17 100644 (file)
@@ -48,15 +48,12 @@ impl Perform for ResolvePrivateMessageReport {
       private_message_report_view,
     };
 
-    context
-      .chat_server()
-      .send_mod_room_message(
-        UserOperation::ResolvePrivateMessageReport,
-        &res,
-        CommunityId(0),
-        websocket_id,
-      )
-      .await?;
+    context.send_mod_ws_message(
+      &UserOperation::ResolvePrivateMessageReport,
+      &res,
+      CommunityId(0),
+      websocket_id,
+    )?;
 
     Ok(res)
   }
index 041cb775e8a3670c31e173e25284e6bdeb0502d2..522991ae9621273c306198d97e864764bab80053 100644 (file)
@@ -3,15 +3,18 @@ use actix_web::web::Data;
 use lemmy_api_common::{
   context::LemmyContext,
   utils::get_local_user_view_from_jwt,
-  websocket::structs::{
-    CommunityJoin,
-    CommunityJoinResponse,
-    ModJoin,
-    ModJoinResponse,
-    PostJoin,
-    PostJoinResponse,
-    UserJoin,
-    UserJoinResponse,
+  websocket::{
+    handlers::join_rooms::{JoinCommunityRoom, JoinModRoom, JoinPostRoom, JoinUserRoom},
+    structs::{
+      CommunityJoin,
+      CommunityJoinResponse,
+      ModJoin,
+      ModJoinResponse,
+      PostJoin,
+      PostJoinResponse,
+      UserJoin,
+      UserJoinResponse,
+    },
   },
 };
 use lemmy_utils::{error::LemmyError, ConnectionId};
@@ -30,10 +33,11 @@ impl Perform for UserJoin {
     let local_user_view =
       get_local_user_view_from_jwt(&data.auth, context.pool(), context.secret()).await?;
 
-    if let Some(ws_id) = websocket_id {
-      context
-        .chat_server()
-        .join_user_room(local_user_view.local_user.id, ws_id)?;
+    if let Some(id) = websocket_id {
+      context.chat_server().do_send(JoinUserRoom {
+        user_id: local_user_view.local_user.id,
+        id,
+      });
     }
 
     Ok(UserJoinResponse { joined: true })
@@ -52,10 +56,11 @@ impl Perform for CommunityJoin {
   ) -> Result<CommunityJoinResponse, LemmyError> {
     let data: &CommunityJoin = self;
 
-    if let Some(ws_id) = websocket_id {
-      context
-        .chat_server()
-        .join_community_room(data.community_id, ws_id)?;
+    if let Some(id) = websocket_id {
+      context.chat_server().do_send(JoinCommunityRoom {
+        community_id: data.community_id,
+        id,
+      });
     }
 
     Ok(CommunityJoinResponse { joined: true })
@@ -74,10 +79,11 @@ impl Perform for ModJoin {
   ) -> Result<ModJoinResponse, LemmyError> {
     let data: &ModJoin = self;
 
-    if let Some(ws_id) = websocket_id {
-      context
-        .chat_server()
-        .join_mod_room(data.community_id, ws_id)?;
+    if let Some(id) = websocket_id {
+      context.chat_server().do_send(JoinModRoom {
+        community_id: data.community_id,
+        id,
+      });
     }
 
     Ok(ModJoinResponse { joined: true })
@@ -96,8 +102,11 @@ impl Perform for PostJoin {
   ) -> Result<PostJoinResponse, LemmyError> {
     let data: &PostJoin = self;
 
-    if let Some(ws_id) = websocket_id {
-      context.chat_server().join_post_room(data.post_id, ws_id)?;
+    if let Some(id) = websocket_id {
+      context.chat_server().do_send(JoinPostRoom {
+        post_id: data.post_id,
+        id,
+      });
     }
 
     Ok(PostJoinResponse { joined: true })
index 15e8d3707913a620998f2cad7fd40df54acab546..59352ef51741f3cd757fbfadc01296b873edc526 100644 (file)
@@ -40,7 +40,7 @@ serde_json = { workspace = true }
 anyhow = { workspace = true }
 strum = { workspace = true }
 strum_macros = { workspace = true }
-actix-ws = { workspace = true }
+actix = { workspace = true }
 futures = { workspace = true }
 uuid = { workspace = true }
 actix-rt = { workspace = true }
index 4c91c2c5bcd9412b03bbd54ce82843f3f85b2692..1301bf36f6618d1d0eeec43b0efd71a88b2c365d 100644 (file)
@@ -1,4 +1,5 @@
 use crate::websocket::chat_server::ChatServer;
+use actix::Addr;
 use lemmy_db_schema::{source::secret::Secret, utils::DbPool};
 use lemmy_utils::{
   rate_limit::RateLimitCell,
@@ -10,7 +11,7 @@ use std::sync::Arc;
 #[derive(Clone)]
 pub struct LemmyContext {
   pool: DbPool,
-  chat_server: Arc<ChatServer>,
+  chat_server: Addr<ChatServer>,
   client: Arc<ClientWithMiddleware>,
   secret: Arc<Secret>,
   rate_limit_cell: RateLimitCell,
@@ -19,7 +20,7 @@ pub struct LemmyContext {
 impl LemmyContext {
   pub fn create(
     pool: DbPool,
-    chat_server: Arc<ChatServer>,
+    chat_server: Addr<ChatServer>,
     client: ClientWithMiddleware,
     secret: Secret,
     rate_limit_cell: RateLimitCell,
@@ -35,7 +36,7 @@ impl LemmyContext {
   pub fn pool(&self) -> &DbPool {
     &self.pool
   }
-  pub fn chat_server(&self) -> &Arc<ChatServer> {
+  pub fn chat_server(&self) -> &Addr<ChatServer> {
     &self.chat_server
   }
   pub fn client(&self) -> &ClientWithMiddleware {
index 3c92f238df97be1163ae2cd174b07b37ab754abc..fa06987c5c5d160835a70223d039b004a69c85bb 100644 (file)
@@ -1,30 +1,16 @@
-use crate::{
-  comment::CommentResponse,
-  post::PostResponse,
-  websocket::{serialize_websocket_message, structs::CaptchaItem, OperationType},
+use crate::websocket::{
+  handlers::{SessionInfo, WsMessage},
+  structs::CaptchaItem,
 };
-use actix_ws::Session;
-use anyhow::Context as acontext;
-use futures::future::join_all;
+use actix::{Actor, Context};
 use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId};
-use lemmy_utils::{error::LemmyError, location_info, ConnectionId};
+use lemmy_utils::ConnectionId;
 use rand::{rngs::StdRng, SeedableRng};
-use serde::Serialize;
-use std::{
-  collections::{HashMap, HashSet},
-  sync::{Mutex, MutexGuard},
-};
-use tracing::log::warn;
+use std::collections::{HashMap, HashSet};
 
-/// `ChatServer` manages chat rooms and responsible for coordinating chat
-/// session.
 pub struct ChatServer {
-  inner: Mutex<ChatServerInner>,
-}
-
-pub struct ChatServerInner {
   /// A map from generated random ID to session addr
-  pub sessions: HashMap<ConnectionId, Session>,
+  pub sessions: HashMap<ConnectionId, SessionInfo>,
 
   /// A map from post_id to set of connectionIDs
   pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
@@ -48,347 +34,44 @@ pub struct ChatServerInner {
 /// And manages available rooms. Peers send messages to other peers in same
 /// room through `ChatServer`.
 impl ChatServer {
-  pub fn startup() -> ChatServer {
+  pub fn new() -> ChatServer {
     ChatServer {
-      inner: Mutex::new(ChatServerInner {
-        sessions: Default::default(),
-        post_rooms: Default::default(),
-        community_rooms: Default::default(),
-        mod_rooms: Default::default(),
-        user_rooms: Default::default(),
-        rng: StdRng::from_entropy(),
-        captchas: vec![],
-      }),
-    }
-  }
-
-  pub fn join_community_room(
-    &self,
-    community_id: CommunityId,
-    id: ConnectionId,
-  ) -> Result<(), LemmyError> {
-    let mut inner = self.inner()?;
-    // remove session from all rooms
-    for sessions in inner.community_rooms.values_mut() {
-      sessions.remove(&id);
-    }
-
-    // Also leave all post rooms
-    // This avoids double messages
-    for sessions in inner.post_rooms.values_mut() {
-      sessions.remove(&id);
-    }
-
-    // If the room doesn't exist yet
-    if inner.community_rooms.get_mut(&community_id).is_none() {
-      inner.community_rooms.insert(community_id, HashSet::new());
-    }
-
-    inner
-      .community_rooms
-      .get_mut(&community_id)
-      .context(location_info!())?
-      .insert(id);
-    Ok(())
-  }
-
-  pub fn join_mod_room(
-    &self,
-    community_id: CommunityId,
-    id: ConnectionId,
-  ) -> Result<(), LemmyError> {
-    let mut inner = self.inner()?;
-    // remove session from all rooms
-    for sessions in inner.mod_rooms.values_mut() {
-      sessions.remove(&id);
-    }
-
-    // If the room doesn't exist yet
-    if inner.mod_rooms.get_mut(&community_id).is_none() {
-      inner.mod_rooms.insert(community_id, HashSet::new());
-    }
-
-    inner
-      .mod_rooms
-      .get_mut(&community_id)
-      .context(location_info!())?
-      .insert(id);
-    Ok(())
-  }
-
-  pub fn join_post_room(&self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
-    let mut inner = self.inner()?;
-    // remove session from all rooms
-    for sessions in inner.post_rooms.values_mut() {
-      sessions.remove(&id);
-    }
-
-    // Also leave all communities
-    // This avoids double messages
-    // TODO found a bug, whereby community messages like
-    // delete and remove aren't sent, because
-    // you left the community room
-    for sessions in inner.community_rooms.values_mut() {
-      sessions.remove(&id);
-    }
-
-    // If the room doesn't exist yet
-    if inner.post_rooms.get_mut(&post_id).is_none() {
-      inner.post_rooms.insert(post_id, HashSet::new());
-    }
-
-    inner
-      .post_rooms
-      .get_mut(&post_id)
-      .context(location_info!())?
-      .insert(id);
-
-    Ok(())
-  }
-
-  pub fn join_user_room(&self, user_id: LocalUserId, id: ConnectionId) -> Result<(), LemmyError> {
-    let mut inner = self.inner()?;
-    // remove session from all rooms
-    for sessions in inner.user_rooms.values_mut() {
-      sessions.remove(&id);
-    }
-
-    // If the room doesn't exist yet
-    if inner.user_rooms.get_mut(&user_id).is_none() {
-      inner.user_rooms.insert(user_id, HashSet::new());
-    }
-
-    inner
-      .user_rooms
-      .get_mut(&user_id)
-      .context(location_info!())?
-      .insert(id);
-    Ok(())
-  }
-
-  async fn send_post_room_message<OP, Response>(
-    &self,
-    op: &OP,
-    response: &Response,
-    post_id: PostId,
-    websocket_id: Option<ConnectionId>,
-  ) -> Result<(), LemmyError>
-  where
-    OP: OperationType + ToString,
-    Response: Serialize,
-  {
-    let msg = serialize_websocket_message(op, response)?;
-    let room = self.inner()?.post_rooms.get(&post_id).cloned();
-    self.send_message_in_room(&msg, room, websocket_id).await?;
-    Ok(())
-  }
-
-  /// Send message to all users viewing the given community.
-  pub async fn send_community_room_message<OP, Response>(
-    &self,
-    op: &OP,
-    response: &Response,
-    community_id: CommunityId,
-    websocket_id: Option<ConnectionId>,
-  ) -> Result<(), LemmyError>
-  where
-    OP: OperationType + ToString,
-    Response: Serialize,
-  {
-    let msg = serialize_websocket_message(op, response)?;
-    let room = self.inner()?.community_rooms.get(&community_id).cloned();
-    self.send_message_in_room(&msg, room, websocket_id).await?;
-    Ok(())
-  }
-
-  /// Send message to mods of a given community. Set community_id = 0 to send to site admins.
-  pub async fn send_mod_room_message<OP, Response>(
-    &self,
-    op: OP,
-    response: &Response,
-    community_id: CommunityId,
-    websocket_id: Option<ConnectionId>,
-  ) -> Result<(), LemmyError>
-  where
-    OP: OperationType + ToString,
-    Response: Serialize,
-  {
-    let msg = serialize_websocket_message(&op, response)?;
-    let room = self.inner()?.mod_rooms.get(&community_id).cloned();
-    self.send_message_in_room(&msg, room, websocket_id).await?;
-    Ok(())
-  }
-
-  pub async fn send_all_message<OP, Response>(
-    &self,
-    op: OP,
-    response: &Response,
-    exclude_connection: Option<ConnectionId>,
-  ) -> Result<(), LemmyError>
-  where
-    OP: OperationType + ToString,
-    Response: Serialize,
-  {
-    let msg = &serialize_websocket_message(&op, response)?;
-    let sessions = self.inner()?.sessions.clone();
-    // Note, this will ignore any errors, such as closed connections
-    join_all(
-      sessions
-        .into_iter()
-        .filter(|(id, _)| Some(id) != exclude_connection.as_ref())
-        .map(|(_, mut s): (_, Session)| async move { s.text(msg).await }),
-    )
-    .await;
-    Ok(())
-  }
-
-  pub async fn send_user_room_message<OP, Response>(
-    &self,
-    op: &OP,
-    response: &Response,
-    recipient_id: LocalUserId,
-    websocket_id: Option<ConnectionId>,
-  ) -> Result<(), LemmyError>
-  where
-    OP: OperationType + ToString,
-    Response: Serialize,
-  {
-    let msg = serialize_websocket_message(op, response)?;
-    let room = self.inner()?.user_rooms.get(&recipient_id).cloned();
-    self.send_message_in_room(&msg, room, websocket_id).await?;
-    Ok(())
-  }
-
-  pub async fn send_comment<OP>(
-    &self,
-    user_operation: &OP,
-    comment: &CommentResponse,
-    websocket_id: Option<ConnectionId>,
-  ) -> Result<(), LemmyError>
-  where
-    OP: OperationType + ToString,
-  {
-    let mut comment_reply_sent = comment.clone();
-
-    // Strip out my specific user info
-    comment_reply_sent.comment_view.my_vote = None;
-
-    // Send it to the post room
-    let mut comment_post_sent = comment_reply_sent.clone();
-    // Remove the recipients here to separate mentions / user messages from post or community comments
-    comment_post_sent.recipient_ids = Vec::new();
-    self
-      .send_post_room_message(
-        user_operation,
-        &comment_post_sent,
-        comment_post_sent.comment_view.post.id,
-        websocket_id,
-      )
-      .await?;
-
-    // Send it to the community too
-    self
-      .send_community_room_message(
-        user_operation,
-        &comment_post_sent,
-        CommunityId(0),
-        websocket_id,
-      )
-      .await?;
-    self
-      .send_community_room_message(
-        user_operation,
-        &comment_post_sent,
-        comment.comment_view.community.id,
-        websocket_id,
-      )
-      .await?;
-
-    // Send it to the recipient(s) including the mentioned users
-    for recipient_id in &comment_reply_sent.recipient_ids {
-      self
-        .send_user_room_message(
-          user_operation,
-          &comment_reply_sent,
-          *recipient_id,
-          websocket_id,
-        )
-        .await?;
+      sessions: Default::default(),
+      post_rooms: Default::default(),
+      community_rooms: Default::default(),
+      mod_rooms: Default::default(),
+      user_rooms: Default::default(),
+      rng: StdRng::from_entropy(),
+      captchas: vec![],
     }
-
-    Ok(())
-  }
-
-  pub async fn send_post<OP>(
-    &self,
-    user_operation: &OP,
-    post_res: &PostResponse,
-    websocket_id: Option<ConnectionId>,
-  ) -> Result<(), LemmyError>
-  where
-    OP: OperationType + ToString,
-  {
-    let community_id = post_res.post_view.community.id;
-
-    // Don't send my data with it
-    let mut post_sent = post_res.clone();
-    post_sent.post_view.my_vote = None;
-
-    // Send it to /c/all and that community
-    self
-      .send_community_room_message(user_operation, &post_sent, CommunityId(0), websocket_id)
-      .await?;
-    self
-      .send_community_room_message(user_operation, &post_sent, community_id, websocket_id)
-      .await?;
-
-    // Send it to the post room
-    self
-      .send_post_room_message(
-        user_operation,
-        &post_sent,
-        post_res.post_view.post.id,
-        websocket_id,
-      )
-      .await?;
-
-    Ok(())
   }
 
-  /// Send websocket message in all sessions which joined a specific room.
-  ///
-  /// `message` - The json message body to send
-  /// `room` - Connection IDs which should receive the message
-  /// `exclude_connection` - Dont send to user who initiated the api call, as that
-  ///                        would result in duplicate notification
-  async fn send_message_in_room(
+  pub fn send_message(
     &self,
+    connections: &HashSet<ConnectionId>,
     message: &str,
-    room: Option<HashSet<ConnectionId>>,
     exclude_connection: Option<ConnectionId>,
-  ) -> Result<(), LemmyError> {
-    let mut session = self.inner()?.sessions.clone();
-    if let Some(room) = room {
-      // Note, this will ignore any errors, such as closed connections
-      join_all(
-        room
-          .into_iter()
-          .filter(|c| Some(c) != exclude_connection.as_ref())
-          .filter_map(|c| session.remove(&c))
-          .map(|mut s: Session| async move { s.text(message).await }),
-      )
-      .await;
+  ) {
+    for id in connections
+      .iter()
+      .filter(|c| Some(*c) != exclude_connection.as_ref())
+    {
+      if let Some(session) = self.sessions.get(id) {
+        session.addr.do_send(WsMessage(message.to_owned()));
+      }
     }
-    Ok(())
   }
+}
 
-  pub(in crate::websocket) fn inner(&self) -> Result<MutexGuard<'_, ChatServerInner>, LemmyError> {
-    match self.inner.lock() {
-      Ok(g) => Ok(g),
-      Err(e) => {
-        warn!("Failed to lock chatserver mutex: {}", e);
-        Err(LemmyError::from_message("Failed to lock chatserver mutex"))
-      }
-    }
+impl Default for ChatServer {
+  fn default() -> Self {
+    Self::new()
   }
 }
+
+/// Make actor from `ChatServer`
+impl Actor for ChatServer {
+  /// We are going to use simple Context, we just need ability to communicate
+  /// with other actors.
+  type Context = Context<Self>;
+}
diff --git a/crates/api_common/src/websocket/handlers.rs b/crates/api_common/src/websocket/handlers.rs
deleted file mode 100644 (file)
index afdbfd5..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-use crate::websocket::{chat_server::ChatServer, structs::CaptchaItem};
-use actix_ws::Session;
-use lemmy_db_schema::{
-  newtypes::{CommunityId, PostId},
-  utils::naive_now,
-};
-use lemmy_utils::{error::LemmyError, ConnectionId};
-use rand::Rng;
-
-impl ChatServer {
-  /// Handler for Connect message.
-  ///
-  /// Register new session and assign unique id to this session
-  pub fn handle_connect(&self, session: Session) -> Result<ConnectionId, LemmyError> {
-    let mut inner = self.inner()?;
-    // register session with random id
-    let id = inner.rng.gen::<usize>();
-
-    inner.sessions.insert(id, session);
-    Ok(id)
-  }
-
-  /// Handler for Disconnect message.
-  pub fn handle_disconnect(&self, connection_id: &ConnectionId) -> Result<(), LemmyError> {
-    let mut inner = self.inner()?;
-    // Remove connections from sessions and all 3 scopes
-    if inner.sessions.remove(connection_id).is_some() {
-      for sessions in inner.user_rooms.values_mut() {
-        sessions.remove(connection_id);
-      }
-
-      for sessions in inner.post_rooms.values_mut() {
-        sessions.remove(connection_id);
-      }
-
-      for sessions in inner.community_rooms.values_mut() {
-        sessions.remove(connection_id);
-      }
-    }
-    Ok(())
-  }
-
-  pub fn get_users_online(&self) -> Result<usize, LemmyError> {
-    Ok(self.inner()?.sessions.len())
-  }
-
-  pub fn get_post_users_online(&self, post_id: PostId) -> Result<usize, LemmyError> {
-    if let Some(users) = self.inner()?.post_rooms.get(&post_id) {
-      Ok(users.len())
-    } else {
-      Ok(0)
-    }
-  }
-
-  pub fn get_community_users_online(&self, community_id: CommunityId) -> Result<usize, LemmyError> {
-    if let Some(users) = self.inner()?.community_rooms.get(&community_id) {
-      Ok(users.len())
-    } else {
-      Ok(0)
-    }
-  }
-
-  pub fn add_captcha(&self, captcha: CaptchaItem) -> Result<(), LemmyError> {
-    self.inner()?.captchas.push(captcha);
-    Ok(())
-  }
-
-  pub fn check_captcha(&self, uuid: String, answer: String) -> Result<bool, LemmyError> {
-    let mut inner = self.inner()?;
-    // Remove all the ones that are past the expire time
-    inner.captchas.retain(|x| x.expires.gt(&naive_now()));
-
-    let check = inner
-      .captchas
-      .iter()
-      .any(|r| r.uuid == uuid && r.answer.to_lowercase() == answer.to_lowercase());
-
-    // Remove this uuid so it can't be re-checked (Checks only work once)
-    inner.captchas.retain(|x| x.uuid != uuid);
-
-    Ok(check)
-  }
-}
diff --git a/crates/api_common/src/websocket/handlers/captcha.rs b/crates/api_common/src/websocket/handlers/captcha.rs
new file mode 100644 (file)
index 0000000..e3ff609
--- /dev/null
@@ -0,0 +1,45 @@
+use crate::websocket::{chat_server::ChatServer, structs::CaptchaItem};
+use actix::{Context, Handler, Message};
+use lemmy_db_schema::utils::naive_now;
+
+/// Adding a Captcha
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct AddCaptcha {
+  pub captcha: CaptchaItem,
+}
+
+impl Handler<AddCaptcha> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: AddCaptcha, _: &mut Context<Self>) -> Self::Result {
+    self.captchas.push(msg.captcha);
+  }
+}
+
+/// Checking a Captcha
+#[derive(Message)]
+#[rtype(bool)]
+pub struct CheckCaptcha {
+  pub uuid: String,
+  pub answer: String,
+}
+
+impl Handler<CheckCaptcha> for ChatServer {
+  type Result = bool;
+
+  fn handle(&mut self, msg: CheckCaptcha, _: &mut Context<Self>) -> Self::Result {
+    // Remove all the ones that are past the expire time
+    self.captchas.retain(|x| x.expires.gt(&naive_now()));
+
+    let check = self
+      .captchas
+      .iter()
+      .any(|r| r.uuid == msg.uuid && r.answer.to_lowercase() == msg.answer.to_lowercase());
+
+    // Remove this uuid so it can't be re-checked (Checks only work once)
+    self.captchas.retain(|x| x.uuid != msg.uuid);
+
+    check
+  }
+}
diff --git a/crates/api_common/src/websocket/handlers/connect.rs b/crates/api_common/src/websocket/handlers/connect.rs
new file mode 100644 (file)
index 0000000..f3e7e3b
--- /dev/null
@@ -0,0 +1,62 @@
+use crate::websocket::{
+  chat_server::ChatServer,
+  handlers::{SessionInfo, WsMessage},
+};
+use actix::{Context, Handler, Message, Recipient};
+use lemmy_utils::ConnectionId;
+use rand::Rng;
+
+/// New chat session is created
+#[derive(Message)]
+#[rtype(ConnectionId)]
+pub struct Connect {
+  pub addr: Recipient<WsMessage>,
+}
+
+/// Handler for Connect message.
+///
+/// Register new session and assign unique id to this session
+impl Handler<Connect> for ChatServer {
+  type Result = ConnectionId;
+
+  fn handle(&mut self, msg: Connect, _: &mut Context<Self>) -> Self::Result {
+    // register session with random id
+    let id = self.rng.gen::<usize>();
+    let session = SessionInfo { addr: msg.addr };
+    self.sessions.insert(id, session);
+
+    // send id back
+    id
+  }
+}
+
+/// Session is disconnected
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct Disconnect {
+  pub id: ConnectionId,
+}
+
+/// Handler for Disconnect message.
+impl Handler<Disconnect> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
+    // remove address
+    if self.sessions.remove(&msg.id).is_some() {
+      // remove session from all rooms
+      for sessions in self.user_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+      for sessions in self.post_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+      for sessions in self.community_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+      for sessions in self.mod_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+    }
+  }
+}
diff --git a/crates/api_common/src/websocket/handlers/join_rooms.rs b/crates/api_common/src/websocket/handlers/join_rooms.rs
new file mode 100644 (file)
index 0000000..10235e3
--- /dev/null
@@ -0,0 +1,120 @@
+use crate::websocket::chat_server::ChatServer;
+use actix::{Context, Handler, Message};
+use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId};
+use lemmy_utils::ConnectionId;
+use std::collections::HashSet;
+
+/// Joining a Post room
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinPostRoom {
+  pub post_id: PostId,
+  pub id: ConnectionId,
+}
+
+impl Handler<JoinPostRoom> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: JoinPostRoom, _: &mut Context<Self>) -> Self::Result {
+    // remove session from all rooms
+    for sessions in self.post_rooms.values_mut() {
+      sessions.remove(&msg.id);
+    }
+
+    // Also leave all communities
+    // This avoids double messages
+    // TODO found a bug, whereby community messages like
+    // delete and remove aren't sent, because
+    // you left the community room
+    for sessions in self.community_rooms.values_mut() {
+      sessions.remove(&msg.id);
+    }
+
+    self
+      .post_rooms
+      .entry(msg.post_id)
+      .or_insert_with(HashSet::new)
+      .insert(msg.id);
+  }
+}
+
+/// Joining a Community Room
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinCommunityRoom {
+  pub community_id: CommunityId,
+  pub id: ConnectionId,
+}
+
+impl Handler<JoinCommunityRoom> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context<Self>) -> Self::Result {
+    // remove session from all rooms
+    for sessions in self.community_rooms.values_mut() {
+      sessions.remove(&msg.id);
+    }
+
+    // Also leave all post rooms
+    // This avoids double messages
+    for sessions in self.post_rooms.values_mut() {
+      sessions.remove(&msg.id);
+    }
+
+    self
+      .community_rooms
+      .entry(msg.community_id)
+      .or_insert_with(HashSet::new)
+      .insert(msg.id);
+  }
+}
+
+/// Joining a Mod room
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinModRoom {
+  pub community_id: CommunityId,
+  pub id: ConnectionId,
+}
+
+impl Handler<JoinModRoom> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: JoinModRoom, _: &mut Context<Self>) -> Self::Result {
+    // remove session from all rooms
+    for sessions in self.mod_rooms.values_mut() {
+      sessions.remove(&msg.id);
+    }
+
+    self
+      .mod_rooms
+      .entry(msg.community_id)
+      .or_insert_with(HashSet::new)
+      .insert(msg.id);
+  }
+}
+
+/// Joining a User room
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinUserRoom {
+  pub user_id: LocalUserId,
+  pub id: ConnectionId,
+}
+
+impl Handler<JoinUserRoom> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: JoinUserRoom, _: &mut Context<Self>) -> Self::Result {
+    // remove session from all rooms
+    for sessions in self.user_rooms.values_mut() {
+      sessions.remove(&msg.id);
+    }
+
+    self
+      .user_rooms
+      .entry(msg.user_id)
+      .or_insert_with(HashSet::new)
+      .insert(msg.id);
+  }
+}
diff --git a/crates/api_common/src/websocket/handlers/messages.rs b/crates/api_common/src/websocket/handlers/messages.rs
new file mode 100644 (file)
index 0000000..3b49302
--- /dev/null
@@ -0,0 +1,130 @@
+use crate::websocket::chat_server::ChatServer;
+use actix::{Context, Handler, Message};
+use lemmy_db_schema::newtypes::{CommunityId, LocalUserId, PostId};
+use lemmy_utils::ConnectionId;
+use std::collections::HashSet;
+
+/// Sending a post room message
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendPostRoomMessage {
+  pub post_id: PostId,
+  pub message: String,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendPostRoomMessage> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: SendPostRoomMessage, _: &mut Context<Self>) -> Self::Result {
+    let room_connections = self.post_rooms.get(&msg.post_id);
+    if let Some(connections) = room_connections {
+      self.send_message(connections, &msg.message, msg.websocket_id);
+    }
+  }
+}
+
+/// Sending a community room message
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendCommunityRoomMessage {
+  pub community_id: CommunityId,
+  pub message: String,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendCommunityRoomMessage> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: SendCommunityRoomMessage, _: &mut Context<Self>) -> Self::Result {
+    let room_connections = self.community_rooms.get(&msg.community_id);
+    if let Some(connections) = room_connections {
+      self.send_message(connections, &msg.message, msg.websocket_id);
+    }
+  }
+}
+
+/// Sending a mod room message
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendModRoomMessage {
+  pub community_id: CommunityId,
+  pub message: String,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendModRoomMessage> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: SendModRoomMessage, _: &mut Context<Self>) -> Self::Result {
+    let room_connections = self.community_rooms.get(&msg.community_id);
+    if let Some(connections) = room_connections {
+      self.send_message(connections, &msg.message, msg.websocket_id);
+    }
+  }
+}
+
+/// Sending a user room message
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendUserRoomMessage {
+  pub recipient_id: LocalUserId,
+  pub message: String,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendUserRoomMessage> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: SendUserRoomMessage, _: &mut Context<Self>) -> Self::Result {
+    let room_connections = self.user_rooms.get(&msg.recipient_id);
+    if let Some(connections) = room_connections {
+      self.send_message(connections, &msg.message, msg.websocket_id);
+    }
+  }
+}
+
+/// Sending a message to every session
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendAllMessage {
+  pub message: String,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+impl Handler<SendAllMessage> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: SendAllMessage, _: &mut Context<Self>) -> Self::Result {
+    let connections: HashSet<ConnectionId> = self.sessions.keys().cloned().collect();
+    self.send_message(&connections, &msg.message, msg.websocket_id);
+  }
+}
+
+///// Send websocket message in all sessions which joined a specific room.
+/////
+///// `message` - The json message body to send
+///// `room` - Connection IDs which should receive the message
+///// `exclude_connection` - Dont send to user who initiated the api call, as that
+/////                        would result in duplicate notification
+//async fn send_message_in_room(
+//  &self,
+//  message: &str,
+//  room: Option<HashSet<ConnectionId>>,
+//  exclude_connection: Option<ConnectionId>,
+//) -> Result<(), LemmyError> {
+//  let mut session = self.inner()?.sessions.clone();
+//  if let Some(room) = room {
+//    // Note, this will ignore any errors, such as closed connections
+//    join_all(
+//      room
+//        .into_iter()
+//        .filter(|c| Some(c) != exclude_connection.as_ref())
+//        .filter_map(|c| session.remove(&c))
+//        .map(|mut s: Session| async move { s.text(message).await }),
+//    )
+//    .await;
+//  }
+//  Ok(())
+//}
+//}
diff --git a/crates/api_common/src/websocket/handlers/mod.rs b/crates/api_common/src/websocket/handlers/mod.rs
new file mode 100644 (file)
index 0000000..d989a44
--- /dev/null
@@ -0,0 +1,18 @@
+use actix::{Message, Recipient};
+
+pub mod captcha;
+pub mod connect;
+pub mod join_rooms;
+pub mod messages;
+pub mod online_users;
+
+/// A string message sent to a websocket session
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct WsMessage(pub String);
+
+// TODO move this?
+pub struct SessionInfo {
+  pub addr: Recipient<WsMessage>,
+  // pub ip: IpAddr
+}
diff --git a/crates/api_common/src/websocket/handlers/online_users.rs b/crates/api_common/src/websocket/handlers/online_users.rs
new file mode 100644 (file)
index 0000000..2833b68
--- /dev/null
@@ -0,0 +1,55 @@
+use crate::websocket::chat_server::ChatServer;
+use actix::{Context, Handler, Message};
+use lemmy_db_schema::newtypes::{CommunityId, PostId};
+
+/// Getting the number of online connections
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetUsersOnline;
+
+/// Handler for Disconnect message.
+impl Handler<GetUsersOnline> for ChatServer {
+  type Result = usize;
+
+  fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context<Self>) -> Self::Result {
+    self.sessions.len()
+  }
+}
+
+/// Getting the number of post users online
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetPostUsersOnline {
+  pub post_id: PostId,
+}
+
+/// Handler for Disconnect message.
+impl Handler<GetPostUsersOnline> for ChatServer {
+  type Result = usize;
+
+  fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context<Self>) -> Self::Result {
+    self
+      .post_rooms
+      .get(&msg.post_id)
+      .map_or(1, std::collections::HashSet::len)
+  }
+}
+
+/// Getting the number of post users online
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetCommunityUsersOnline {
+  pub community_id: CommunityId,
+}
+
+/// Handler for Disconnect message.
+impl Handler<GetCommunityUsersOnline> for ChatServer {
+  type Result = usize;
+
+  fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context<Self>) -> Self::Result {
+    self
+      .community_rooms
+      .get(&msg.community_id)
+      .map_or(1, std::collections::HashSet::len)
+  }
+}
index 21c5338541aba35e76c0aa4d338924985285bddc..2726b7d4cb788f982900d2caf0c44185a52fbd3a 100644 (file)
@@ -1,3 +1,4 @@
+use actix::{Message, Recipient};
 use lemmy_utils::error::LemmyError;
 use serde::Serialize;
 
@@ -6,13 +7,22 @@ pub mod handlers;
 pub mod send;
 pub mod structs;
 
+/// A string message sent to a websocket session
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct WsMessage(pub String);
+
+pub struct SessionInfo {
+  pub addr: Recipient<WsMessage>,
+}
+
 #[derive(Serialize)]
 struct WebsocketResponse<T> {
   op: String,
   data: T,
 }
 
-pub fn serialize_websocket_message<OP, Response>(
+pub fn serialize_websocket_message<Response, OP>(
   op: &OP,
   data: &Response,
 ) -> Result<String, LemmyError>
@@ -133,11 +143,3 @@ pub enum UserOperationApub {
   Search,
   ResolveObject,
 }
-
-pub trait OperationType {}
-
-impl OperationType for UserOperationCrud {}
-
-impl OperationType for UserOperation {}
-
-impl OperationType for UserOperationApub {}
index 0e8a78a209695e7822a886345dc8da0f798e91e4..e983b85c57babfa9704b6b4c12325133a0e2f895 100644 (file)
@@ -1,3 +1,13 @@
+use super::{
+  handlers::messages::{
+    SendAllMessage,
+    SendCommunityRoomMessage,
+    SendModRoomMessage,
+    SendPostRoomMessage,
+    SendUserRoomMessage,
+  },
+  serialize_websocket_message,
+};
 use crate::{
   comment::CommentResponse,
   community::CommunityResponse,
@@ -5,7 +15,6 @@ use crate::{
   post::PostResponse,
   private_message::PrivateMessageResponse,
   utils::{check_person_block, get_interface_language, send_email_to_user},
-  websocket::OperationType,
 };
 use lemmy_db_schema::{
   newtypes::{CommentId, CommunityId, LocalUserId, PersonId, PostId, PrivateMessageId},
@@ -23,253 +32,374 @@ use lemmy_db_schema::{
 use lemmy_db_views::structs::{CommentView, LocalUserView, PostView, PrivateMessageView};
 use lemmy_db_views_actor::structs::CommunityView;
 use lemmy_utils::{error::LemmyError, utils::mention::MentionData, ConnectionId};
+use serde::Serialize;
+
+impl LemmyContext {
+  #[tracing::instrument(skip_all)]
+  pub async fn send_post_ws_message<OP>(
+    &self,
+    op: &OP,
+    post_id: PostId,
+    websocket_id: Option<ConnectionId>,
+    person_id: Option<PersonId>,
+  ) -> Result<PostResponse, LemmyError>
+  where
+    OP: ToString,
+  {
+    let post_view = PostView::read(self.pool(), post_id, person_id, Some(true)).await?;
+
+    let res = PostResponse { post_view };
+
+    // Send it to the post room
+    // Don't send my data with it
+    let mut post_sent = res.clone();
+    post_sent.post_view.my_vote = None;
+    let message = serialize_websocket_message(op, &post_sent)?;
+
+    self.chat_server().do_send(SendPostRoomMessage {
+      post_id,
+      message: message.clone(),
+      websocket_id,
+    });
+
+    // Send it to /c/all and that community
+    self.chat_server().do_send(SendCommunityRoomMessage {
+      community_id: CommunityId(0),
+      message: message.clone(),
+      websocket_id,
+    });
+
+    self.chat_server().do_send(SendCommunityRoomMessage {
+      community_id: post_sent.post_view.community.id,
+      message,
+      websocket_id,
+    });
+
+    Ok(res)
+  }
 
-#[tracing::instrument(skip_all)]
-pub async fn send_post_ws_message<OP: ToString + Send + OperationType + 'static>(
-  post_id: PostId,
-  op: OP,
-  websocket_id: Option<ConnectionId>,
-  person_id: Option<PersonId>,
-  context: &LemmyContext,
-) -> Result<PostResponse, LemmyError> {
-  let post_view = PostView::read(context.pool(), post_id, person_id, Some(true)).await?;
-
-  let res = PostResponse { post_view };
-
-  context
-    .chat_server()
-    .send_post(&op, &res, websocket_id)
-    .await?;
-
-  Ok(res)
-}
-
-// TODO: in many call sites in apub crate, we are setting an empty vec for recipient_ids,
-//       we should get the actual recipient actors from somewhere
-#[tracing::instrument(skip_all)]
-pub async fn send_comment_ws_message_simple<OP: ToString + Send + OperationType + 'static>(
-  comment_id: CommentId,
-  op: OP,
-  context: &LemmyContext,
-) -> Result<CommentResponse, LemmyError> {
-  send_comment_ws_message(comment_id, op, None, None, None, vec![], context).await
-}
-
-#[tracing::instrument(skip_all)]
-pub async fn send_comment_ws_message<OP: ToString + Send + OperationType + 'static>(
-  comment_id: CommentId,
-  op: OP,
-  websocket_id: Option<ConnectionId>,
-  form_id: Option<String>,
-  person_id: Option<PersonId>,
-  recipient_ids: Vec<LocalUserId>,
-  context: &LemmyContext,
-) -> Result<CommentResponse, LemmyError> {
-  let view = CommentView::read(context.pool(), comment_id, person_id).await?;
-
-  let mut res = CommentResponse {
-    comment_view: view,
-    recipient_ids,
-    // The sent out form id should be null
-    form_id: None,
-  };
-
-  context
-    .chat_server()
-    .send_comment(&op, &res, websocket_id)
-    .await?;
-
-  // The recipient_ids should be empty for returns
-  res.recipient_ids = Vec::new();
-  res.form_id = form_id;
-
-  Ok(res)
-}
+  // TODO: in many call sites in apub crate, we are setting an empty vec for recipient_ids,
+  //       we should get the actual recipient actors from somewhere
+  #[tracing::instrument(skip_all)]
+  pub async fn send_comment_ws_message_simple<OP>(
+    &self,
+    op: &OP,
+    comment_id: CommentId,
+  ) -> Result<CommentResponse, LemmyError>
+  where
+    OP: ToString,
+  {
+    self
+      .send_comment_ws_message(op, comment_id, None, None, None, vec![])
+      .await
+  }
 
-#[tracing::instrument(skip_all)]
-pub async fn send_community_ws_message<OP: ToString + Send + OperationType + 'static>(
-  community_id: CommunityId,
-  op: OP,
-  websocket_id: Option<ConnectionId>,
-  person_id: Option<PersonId>,
-  context: &LemmyContext,
-) -> Result<CommunityResponse, LemmyError> {
-  let community_view =
-    CommunityView::read(context.pool(), community_id, person_id, Some(true)).await?;
-  let discussion_languages = CommunityLanguage::read(context.pool(), community_id).await?;
-
-  let mut res = CommunityResponse {
-    community_view,
-    discussion_languages,
-  };
-
-  // Strip out the person id and subscribed when sending to others
-  res.community_view.subscribed = SubscribedType::NotSubscribed;
-
-  context
-    .chat_server()
-    .send_community_room_message(&op, &res, res.community_view.community.id, websocket_id)
-    .await?;
-
-  Ok(res)
-}
+  #[tracing::instrument(skip_all)]
+  pub async fn send_comment_ws_message<OP>(
+    &self,
+    op: &OP,
+    comment_id: CommentId,
+    websocket_id: Option<ConnectionId>,
+    form_id: Option<String>,
+    person_id: Option<PersonId>,
+    recipient_ids: Vec<LocalUserId>,
+  ) -> Result<CommentResponse, LemmyError>
+  where
+    OP: ToString,
+  {
+    let view = CommentView::read(self.pool(), comment_id, person_id).await?;
+
+    let mut res = CommentResponse {
+      comment_view: view,
+      recipient_ids,
+      form_id,
+    };
+
+    // Strip out my specific user info
+    let mut sent_recipient_comment = res.clone();
+    sent_recipient_comment.form_id = None;
+    sent_recipient_comment.comment_view.my_vote = None;
+    let recipient_message = serialize_websocket_message(op, &sent_recipient_comment)?;
+
+    // Send it to the recipient(s) including the mentioned users
+    for recipient_id in &sent_recipient_comment.recipient_ids {
+      self.chat_server().do_send(SendUserRoomMessage {
+        recipient_id: *recipient_id,
+        message: recipient_message.clone(),
+        websocket_id,
+      });
+    }
 
-#[tracing::instrument(skip_all)]
-pub async fn send_pm_ws_message<OP: ToString + Send + OperationType + 'static>(
-  private_message_id: PrivateMessageId,
-  op: OP,
-  websocket_id: Option<ConnectionId>,
-  context: &LemmyContext,
-) -> Result<PrivateMessageResponse, LemmyError> {
-  let view = PrivateMessageView::read(context.pool(), private_message_id).await?;
-
-  let res = PrivateMessageResponse {
-    private_message_view: view,
-  };
-
-  // Send notifications to the local recipient, if one exists
-  if res.private_message_view.recipient.local {
-    let recipient_id = res.private_message_view.recipient.id;
-    let local_recipient = LocalUserView::read_person(context.pool(), recipient_id).await?;
-
-    context
-      .chat_server()
-      .send_user_room_message(&op, &res, local_recipient.local_user.id, websocket_id)
-      .await?;
+    // Remove the recipients here to separate mentions / user messages from post or community comments
+    let mut sent_post_comment = sent_recipient_comment;
+    sent_post_comment.recipient_ids = Vec::new();
+    let post_message = serialize_websocket_message(op, &sent_post_comment)?;
+
+    // Send it to the post room
+    self.chat_server().do_send(SendPostRoomMessage {
+      post_id: sent_post_comment.comment_view.post.id,
+      message: post_message.clone(),
+      websocket_id,
+    });
+
+    // Send it to the community too
+    self.chat_server().do_send(SendCommunityRoomMessage {
+      community_id: sent_post_comment.comment_view.community.id,
+      message: post_message,
+      websocket_id,
+    });
+    // TODO should I send it to all? Seems excessive
+    //  self
+    //    .send_community_room_message(
+    //      user_operation,
+    //      &comment_post_sent,
+    //      CommunityId(0),
+    //      websocket_id,
+    //    )
+    //    .await?;
+
+    // No need to return recipients
+    res.recipient_ids = Vec::new();
+
+    Ok(res)
   }
 
-  Ok(res)
-}
-
-#[tracing::instrument(skip_all)]
-pub async fn send_local_notifs(
-  mentions: Vec<MentionData>,
-  comment: &Comment,
-  person: &Person,
-  post: &Post,
-  do_send_email: bool,
-  context: &LemmyContext,
-) -> Result<Vec<LocalUserId>, LemmyError> {
-  let mut recipient_ids = Vec::new();
-  let inbox_link = format!("{}/inbox", context.settings().get_protocol_and_hostname());
-
-  // Send the local mentions
-  for mention in mentions
-    .iter()
-    .filter(|m| m.is_local(&context.settings().hostname) && m.name.ne(&person.name))
-    .collect::<Vec<&MentionData>>()
+  #[tracing::instrument(skip_all)]
+  pub async fn send_community_ws_message<OP>(
+    &self,
+    op: &OP,
+    community_id: CommunityId,
+    websocket_id: Option<ConnectionId>,
+    person_id: Option<PersonId>,
+  ) -> Result<CommunityResponse, LemmyError>
+  where
+    OP: ToString,
   {
-    let mention_name = mention.name.clone();
-    let user_view = LocalUserView::read_from_name(context.pool(), &mention_name).await;
-    if let Ok(mention_user_view) = user_view {
-      // TODO
-      // At some point, make it so you can't tag the parent creator either
-      // This can cause two notifications, one for reply and the other for mention
-      recipient_ids.push(mention_user_view.local_user.id);
-
-      let user_mention_form = PersonMentionInsertForm {
-        recipient_id: mention_user_view.person.id,
-        comment_id: comment.id,
-        read: None,
-      };
-
-      // Allow this to fail softly, since comment edits might re-update or replace it
-      // Let the uniqueness handle this fail
-      PersonMention::create(context.pool(), &user_mention_form)
-        .await
-        .ok();
-
-      // Send an email to those local users that have notifications on
-      if do_send_email {
-        let lang = get_interface_language(&mention_user_view);
-        send_email_to_user(
-          &mention_user_view,
-          &lang.notification_mentioned_by_subject(&person.name),
-          &lang.notification_mentioned_by_body(&comment.content, &inbox_link, &person.name),
-          context.settings(),
-        )
-      }
-    }
+    let community_view =
+      CommunityView::read(self.pool(), community_id, person_id, Some(true)).await?;
+    let discussion_languages = CommunityLanguage::read(self.pool(), community_id).await?;
+
+    let mut res = CommunityResponse {
+      community_view,
+      discussion_languages,
+    };
+
+    // Strip out the person id and subscribed when sending to others
+    res.community_view.subscribed = SubscribedType::NotSubscribed;
+    let message = serialize_websocket_message(op, &res)?;
+
+    self.chat_server().do_send(SendCommunityRoomMessage {
+      community_id: res.community_view.community.id,
+      message,
+      websocket_id,
+    });
+
+    Ok(res)
   }
 
-  // Send comment_reply to the parent commenter / poster
-  if let Some(parent_comment_id) = comment.parent_comment_id() {
-    let parent_comment = Comment::read(context.pool(), parent_comment_id).await?;
+  #[tracing::instrument(skip_all)]
+  pub async fn send_pm_ws_message<OP>(
+    &self,
+    op: &OP,
+    private_message_id: PrivateMessageId,
+    websocket_id: Option<ConnectionId>,
+  ) -> Result<PrivateMessageResponse, LemmyError>
+  where
+    OP: ToString,
+  {
+    let view = PrivateMessageView::read(self.pool(), private_message_id).await?;
 
-    // Get the parent commenter local_user
-    let parent_creator_id = parent_comment.creator_id;
+    let res = PrivateMessageResponse {
+      private_message_view: view,
+    };
 
-    // Only add to recipients if that person isn't blocked
-    let creator_blocked = check_person_block(person.id, parent_creator_id, context.pool())
-      .await
-      .is_err();
+    // Send notifications to the local recipient, if one exists
+    if res.private_message_view.recipient.local {
+      let recipient_id = res.private_message_view.recipient.id;
+      let local_recipient = LocalUserView::read_person(self.pool(), recipient_id).await?;
 
-    // Don't send a notif to yourself
-    if parent_comment.creator_id != person.id && !creator_blocked {
-      let user_view = LocalUserView::read_person(context.pool(), parent_creator_id).await;
-      if let Ok(parent_user_view) = user_view {
-        recipient_ids.push(parent_user_view.local_user.id);
+      let message = serialize_websocket_message(op, &res)?;
 
-        let comment_reply_form = CommentReplyInsertForm {
-          recipient_id: parent_user_view.person.id,
+      self.chat_server().do_send(SendUserRoomMessage {
+        recipient_id: local_recipient.local_user.id,
+        message,
+        websocket_id,
+      });
+    }
+
+    Ok(res)
+  }
+
+  #[tracing::instrument(skip_all)]
+  pub async fn send_local_notifs(
+    &self,
+    mentions: Vec<MentionData>,
+    comment: &Comment,
+    person: &Person,
+    post: &Post,
+    do_send_email: bool,
+  ) -> Result<Vec<LocalUserId>, LemmyError> {
+    let mut recipient_ids = Vec::new();
+    let inbox_link = format!("{}/inbox", self.settings().get_protocol_and_hostname());
+
+    // Send the local mentions
+    for mention in mentions
+      .iter()
+      .filter(|m| m.is_local(&self.settings().hostname) && m.name.ne(&person.name))
+      .collect::<Vec<&MentionData>>()
+    {
+      let mention_name = mention.name.clone();
+      let user_view = LocalUserView::read_from_name(self.pool(), &mention_name).await;
+      if let Ok(mention_user_view) = user_view {
+        // TODO
+        // At some point, make it so you can't tag the parent creator either
+        // This can cause two notifications, one for reply and the other for mention
+        recipient_ids.push(mention_user_view.local_user.id);
+
+        let user_mention_form = PersonMentionInsertForm {
+          recipient_id: mention_user_view.person.id,
           comment_id: comment.id,
           read: None,
         };
 
         // Allow this to fail softly, since comment edits might re-update or replace it
         // Let the uniqueness handle this fail
-        CommentReply::create(context.pool(), &comment_reply_form)
+        PersonMention::create(self.pool(), &user_mention_form)
           .await
           .ok();
 
+        // Send an email to those local users that have notifications on
         if do_send_email {
-          let lang = get_interface_language(&parent_user_view);
+          let lang = get_interface_language(&mention_user_view);
           send_email_to_user(
-            &parent_user_view,
-            &lang.notification_comment_reply_subject(&person.name),
-            &lang.notification_comment_reply_body(&comment.content, &inbox_link, &person.name),
-            context.settings(),
+            &mention_user_view,
+            &lang.notification_mentioned_by_subject(&person.name),
+            &lang.notification_mentioned_by_body(&comment.content, &inbox_link, &person.name),
+            self.settings(),
           )
         }
       }
     }
-  } else {
-    // If there's no parent, its the post creator
-    // Only add to recipients if that person isn't blocked
-    let creator_blocked = check_person_block(person.id, post.creator_id, context.pool())
-      .await
-      .is_err();
 
-    if post.creator_id != person.id && !creator_blocked {
-      let creator_id = post.creator_id;
-      let parent_user = LocalUserView::read_person(context.pool(), creator_id).await;
-      if let Ok(parent_user_view) = parent_user {
-        recipient_ids.push(parent_user_view.local_user.id);
+    // Send comment_reply to the parent commenter / poster
+    if let Some(parent_comment_id) = comment.parent_comment_id() {
+      let parent_comment = Comment::read(self.pool(), parent_comment_id).await?;
 
-        let comment_reply_form = CommentReplyInsertForm {
-          recipient_id: parent_user_view.person.id,
-          comment_id: comment.id,
-          read: None,
-        };
+      // Get the parent commenter local_user
+      let parent_creator_id = parent_comment.creator_id;
 
-        // Allow this to fail softly, since comment edits might re-update or replace it
-        // Let the uniqueness handle this fail
-        CommentReply::create(context.pool(), &comment_reply_form)
-          .await
-          .ok();
-
-        if do_send_email {
-          let lang = get_interface_language(&parent_user_view);
-          send_email_to_user(
-            &parent_user_view,
-            &lang.notification_post_reply_subject(&person.name),
-            &lang.notification_post_reply_body(&comment.content, &inbox_link, &person.name),
-            context.settings(),
-          )
+      // Only add to recipients if that person isn't blocked
+      let creator_blocked = check_person_block(person.id, parent_creator_id, self.pool())
+        .await
+        .is_err();
+
+      // Don't send a notif to yourself
+      if parent_comment.creator_id != person.id && !creator_blocked {
+        let user_view = LocalUserView::read_person(self.pool(), parent_creator_id).await;
+        if let Ok(parent_user_view) = user_view {
+          recipient_ids.push(parent_user_view.local_user.id);
+
+          let comment_reply_form = CommentReplyInsertForm {
+            recipient_id: parent_user_view.person.id,
+            comment_id: comment.id,
+            read: None,
+          };
+
+          // Allow this to fail softly, since comment edits might re-update or replace it
+          // Let the uniqueness handle this fail
+          CommentReply::create(self.pool(), &comment_reply_form)
+            .await
+            .ok();
+
+          if do_send_email {
+            let lang = get_interface_language(&parent_user_view);
+            send_email_to_user(
+              &parent_user_view,
+              &lang.notification_comment_reply_subject(&person.name),
+              &lang.notification_comment_reply_body(&comment.content, &inbox_link, &person.name),
+              self.settings(),
+            )
+          }
+        }
+      }
+    } else {
+      // If there's no parent, its the post creator
+      // Only add to recipients if that person isn't blocked
+      let creator_blocked = check_person_block(person.id, post.creator_id, self.pool())
+        .await
+        .is_err();
+
+      if post.creator_id != person.id && !creator_blocked {
+        let creator_id = post.creator_id;
+        let parent_user = LocalUserView::read_person(self.pool(), creator_id).await;
+        if let Ok(parent_user_view) = parent_user {
+          recipient_ids.push(parent_user_view.local_user.id);
+
+          let comment_reply_form = CommentReplyInsertForm {
+            recipient_id: parent_user_view.person.id,
+            comment_id: comment.id,
+            read: None,
+          };
+
+          // Allow this to fail softly, since comment edits might re-update or replace it
+          // Let the uniqueness handle this fail
+          CommentReply::create(self.pool(), &comment_reply_form)
+            .await
+            .ok();
+
+          if do_send_email {
+            let lang = get_interface_language(&parent_user_view);
+            send_email_to_user(
+              &parent_user_view,
+              &lang.notification_post_reply_subject(&person.name),
+              &lang.notification_post_reply_body(&comment.content, &inbox_link, &person.name),
+              self.settings(),
+            )
+          }
         }
       }
     }
+
+    Ok(recipient_ids)
+  }
+
+  #[tracing::instrument(skip_all)]
+  pub fn send_all_ws_message<Data, OP>(
+    &self,
+    op: &OP,
+    data: Data,
+    websocket_id: Option<ConnectionId>,
+  ) -> Result<(), LemmyError>
+  where
+    Data: Serialize,
+    OP: ToString,
+  {
+    let message = serialize_websocket_message(op, &data)?;
+    self.chat_server().do_send(SendAllMessage {
+      message,
+      websocket_id,
+    });
+    Ok(())
   }
 
-  Ok(recipient_ids)
+  #[tracing::instrument(skip_all)]
+  pub fn send_mod_ws_message<Data, OP>(
+    &self,
+    op: &OP,
+    data: Data,
+    community_id: CommunityId,
+    websocket_id: Option<ConnectionId>,
+  ) -> Result<(), LemmyError>
+  where
+    Data: Serialize,
+    OP: ToString,
+  {
+    let message = serialize_websocket_message(op, &data)?;
+    self.chat_server().do_send(SendModRoomMessage {
+      community_id,
+      message,
+      websocket_id,
+    });
+    Ok(())
+  }
 }
index d9f4f07cbc58d7fd97ad74a3eb157e19a18cd562..70997378dc80d3d90d4022ec26638fdf36a408f4 100644 (file)
@@ -13,10 +13,7 @@ use lemmy_api_common::{
     local_site_to_slur_regex,
     EndpointType,
   },
-  websocket::{
-    send::{send_comment_ws_message, send_local_notifs},
-    UserOperationCrud,
-  },
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -131,15 +128,15 @@ impl PerformCrud for CreateComment {
     // Scan the comment for user mentions, add those rows
     let post_id = post.id;
     let mentions = scrape_text_for_mentions(&content_slurs_removed);
-    let recipient_ids = send_local_notifs(
-      mentions,
-      &updated_comment,
-      &local_user_view.person,
-      &post,
-      true,
-      context,
-    )
-    .await?;
+    let recipient_ids = context
+      .send_local_notifs(
+        mentions,
+        &updated_comment,
+        &local_user_view.person,
+        &post,
+        true,
+      )
+      .await?;
 
     // You like your own comment by default
     let like_form = CommentLikeForm {
@@ -182,15 +179,15 @@ impl PerformCrud for CreateComment {
       }
     }
 
-    send_comment_ws_message(
-      inserted_comment.id,
-      UserOperationCrud::CreateComment,
-      websocket_id,
-      data.form_id.clone(),
-      Some(local_user_view.person.id),
-      recipient_ids,
-      context,
-    )
-    .await
+    context
+      .send_comment_ws_message(
+        &UserOperationCrud::CreateComment,
+        inserted_comment.id,
+        websocket_id,
+        data.form_id.clone(),
+        Some(local_user_view.person.id),
+        recipient_ids,
+      )
+      .await
   }
 }
index 211d04776747bdf06b8090f61d2260560686bee8..a96b4cc7ab69134a3a2f20015de287a37fac4ca4 100644 (file)
@@ -4,10 +4,7 @@ use lemmy_api_common::{
   comment::{CommentResponse, DeleteComment},
   context::LemmyContext,
   utils::{check_community_ban, get_local_user_view_from_jwt},
-  websocket::{
-    send::{send_comment_ws_message, send_local_notifs},
-    UserOperationCrud,
-  },
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -65,26 +62,26 @@ impl PerformCrud for DeleteComment {
 
     let post_id = updated_comment.post_id;
     let post = Post::read(context.pool(), post_id).await?;
-    let recipient_ids = send_local_notifs(
-      vec![],
-      &updated_comment,
-      &local_user_view.person,
-      &post,
-      false,
-      context,
-    )
-    .await?;
+    let recipient_ids = context
+      .send_local_notifs(
+        vec![],
+        &updated_comment,
+        &local_user_view.person,
+        &post,
+        false,
+      )
+      .await?;
 
-    let res = send_comment_ws_message(
-      data.comment_id,
-      UserOperationCrud::DeleteComment,
-      websocket_id,
-      None, // TODO a comment delete might clear forms?
-      Some(local_user_view.person.id),
-      recipient_ids,
-      context,
-    )
-    .await?;
+    let res = context
+      .send_comment_ws_message(
+        &UserOperationCrud::DeleteComment,
+        data.comment_id,
+        websocket_id,
+        None,
+        Some(local_user_view.person.id),
+        recipient_ids,
+      )
+      .await?;
 
     Ok(res)
   }
index 1032f54c9f985614bd8961ff82184e6219a4ba7b..add01aa30a67e740b29cf7d371c50f4febbeaedc 100644 (file)
@@ -4,10 +4,7 @@ use lemmy_api_common::{
   comment::{CommentResponse, RemoveComment},
   context::LemmyContext,
   utils::{check_community_ban, get_local_user_view_from_jwt, is_mod_or_admin},
-  websocket::{
-    send::{send_comment_ws_message, send_local_notifs},
-    UserOperationCrud,
-  },
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -73,26 +70,26 @@ impl PerformCrud for RemoveComment {
 
     let post_id = updated_comment.post_id;
     let post = Post::read(context.pool(), post_id).await?;
-    let recipient_ids = send_local_notifs(
-      vec![],
-      &updated_comment,
-      &local_user_view.person.clone(),
-      &post,
-      false,
-      context,
-    )
-    .await?;
+    let recipient_ids = context
+      .send_local_notifs(
+        vec![],
+        &updated_comment,
+        &local_user_view.person.clone(),
+        &post,
+        false,
+      )
+      .await?;
 
-    let res = send_comment_ws_message(
-      data.comment_id,
-      UserOperationCrud::RemoveComment,
-      websocket_id,
-      None, // TODO maybe this might clear other forms
-      Some(local_user_view.person.id),
-      recipient_ids,
-      context,
-    )
-    .await?;
+    let res = context
+      .send_comment_ws_message(
+        &UserOperationCrud::RemoveComment,
+        data.comment_id,
+        websocket_id,
+        None, // TODO maybe this might clear other forms
+        Some(local_user_view.person.id),
+        recipient_ids,
+      )
+      .await?;
 
     Ok(res)
   }
index 06536330f30146898f6212cc67f59631e8e52ff1..1d5f1c5a2745b6492039fc6cbdaece1785797350 100644 (file)
@@ -4,10 +4,7 @@ use lemmy_api_common::{
   comment::{CommentResponse, EditComment},
   context::LemmyContext,
   utils::{check_community_ban, get_local_user_view_from_jwt, local_site_to_slur_regex},
-  websocket::{
-    send::{send_comment_ws_message, send_local_notifs},
-    UserOperationCrud,
-  },
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -81,25 +78,25 @@ impl PerformCrud for EditComment {
     // Do the mentions / recipients
     let updated_comment_content = updated_comment.content.clone();
     let mentions = scrape_text_for_mentions(&updated_comment_content);
-    let recipient_ids = send_local_notifs(
-      mentions,
-      &updated_comment,
-      &local_user_view.person,
-      &orig_comment.post,
-      false,
-      context,
-    )
-    .await?;
+    let recipient_ids = context
+      .send_local_notifs(
+        mentions,
+        &updated_comment,
+        &local_user_view.person,
+        &orig_comment.post,
+        false,
+      )
+      .await?;
 
-    send_comment_ws_message(
-      data.comment_id,
-      UserOperationCrud::EditComment,
-      websocket_id,
-      data.form_id.clone(),
-      None,
-      recipient_ids,
-      context,
-    )
-    .await
+    context
+      .send_comment_ws_message(
+        &UserOperationCrud::EditComment,
+        data.comment_id,
+        websocket_id,
+        data.form_id.clone(),
+        None,
+        recipient_ids,
+      )
+      .await
   }
 }
index 4720d1103bbbe2d28996bc0c9cd6e89f5be0b7ae..7ce685d8ba871e4d06b91b827ce319e127267f44 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   community::{CommunityResponse, DeleteCommunity},
   context::LemmyContext,
   utils::{get_local_user_view_from_jwt, is_top_mod},
-  websocket::{send::send_community_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::community::{Community, CommunityUpdateForm},
@@ -48,14 +48,14 @@ impl PerformCrud for DeleteCommunity {
     .await
     .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_community"))?;
 
-    let res = send_community_ws_message(
-      data.community_id,
-      UserOperationCrud::DeleteCommunity,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await?;
+    let res = context
+      .send_community_ws_message(
+        &UserOperationCrud::DeleteCommunity,
+        data.community_id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await?;
 
     Ok(res)
   }
index 80e23bf71568b7e3c08a4af60197a25c6f9225dd..f791476227b758beef5d57a68a3a7e381a34d43b 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   community::{CommunityResponse, RemoveCommunity},
   context::LemmyContext,
   utils::{get_local_user_view_from_jwt, is_admin},
-  websocket::{send::send_community_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -56,14 +56,14 @@ impl PerformCrud for RemoveCommunity {
     };
     ModRemoveCommunity::create(context.pool(), &form).await?;
 
-    let res = send_community_ws_message(
-      data.community_id,
-      UserOperationCrud::RemoveCommunity,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await?;
+    let res = context
+      .send_community_ws_message(
+        &UserOperationCrud::RemoveCommunity,
+        data.community_id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await?;
 
     Ok(res)
   }
index e3d1acfa345692da0b8387f7be2af9dae5a448d4..9628615bbb56d22792d725715ba4d7da77e961cb 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   community::{CommunityResponse, EditCommunity},
   context::LemmyContext,
   utils::{get_local_user_view_from_jwt, local_site_to_slur_regex},
-  websocket::{send::send_community_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   newtypes::PersonId,
@@ -78,7 +78,13 @@ impl PerformCrud for EditCommunity {
       .await
       .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_community"))?;
 
-    let op = UserOperationCrud::EditCommunity;
-    send_community_ws_message(data.community_id, op, websocket_id, None, context).await
+    context
+      .send_community_ws_message(
+        &UserOperationCrud::EditCommunity,
+        data.community_id,
+        websocket_id,
+        None,
+      )
+      .await
   }
 }
index 65559250648d6bebdc17b1b1c61308c33eed514e..2ceade21f6303d3a1df258fbeb794d210b96fea2 100644 (file)
@@ -14,7 +14,7 @@ use lemmy_api_common::{
     mark_post_as_read,
     EndpointType,
   },
-  websocket::{send::send_post_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   impls::actor_language::default_post_language,
@@ -173,13 +173,13 @@ impl PerformCrud for CreatePost {
       }
     }
 
-    send_post_ws_message(
-      inserted_post.id,
-      UserOperationCrud::CreatePost,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await
+    context
+      .send_post_ws_message(
+        &UserOperationCrud::CreatePost,
+        inserted_post.id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await
   }
 }
index 57e5adb15f81b6b054231b7275f70114b688bf86..fae5b0096213654078919b35fd685561a1fbf6ae 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   context::LemmyContext,
   post::{DeletePost, PostResponse},
   utils::{check_community_ban, check_community_deleted_or_removed, get_local_user_view_from_jwt},
-  websocket::{send::send_post_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::post::{Post, PostUpdateForm},
@@ -57,14 +57,14 @@ impl PerformCrud for DeletePost {
     )
     .await?;
 
-    let res = send_post_ws_message(
-      data.post_id,
-      UserOperationCrud::DeletePost,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await?;
+    let res = context
+      .send_post_ws_message(
+        &UserOperationCrud::DeletePost,
+        data.post_id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await?;
 
     Ok(res)
   }
index a6612cbb44cd4e15c9523c7e4c075d5222b9b082..0dfb67c411970873592b3142ca7f4628bb1f4ccf 100644 (file)
@@ -9,6 +9,7 @@ use lemmy_api_common::{
     is_mod_or_admin_opt,
     mark_post_as_read,
   },
+  websocket::handlers::online_users::GetPostUsersOnline,
 };
 use lemmy_db_schema::{
   aggregates::structs::{PersonPostAggregates, PersonPostAggregatesForm},
@@ -95,7 +96,10 @@ impl PerformCrud for GetPost {
 
     let moderators = CommunityModeratorView::for_community(context.pool(), community_id).await?;
 
-    let online = context.chat_server().get_post_users_online(post_id)?;
+    let online = context
+      .chat_server()
+      .send(GetPostUsersOnline { post_id })
+      .await?;
 
     // Return the jwt
     Ok(GetPostResponse {
index 66af2ca959f25474d0ab0707edb1720d3b712ffa..b53a468e95eb5306f8e5d198fa34d5bbcda4d807 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   context::LemmyContext,
   post::{PostResponse, RemovePost},
   utils::{check_community_ban, get_local_user_view_from_jwt, is_mod_or_admin},
-  websocket::{send::send_post_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -66,14 +66,14 @@ impl PerformCrud for RemovePost {
     };
     ModRemovePost::create(context.pool(), &form).await?;
 
-    let res = send_post_ws_message(
-      data.post_id,
-      UserOperationCrud::RemovePost,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await?;
+    let res = context
+      .send_post_ws_message(
+        &UserOperationCrud::RemovePost,
+        data.post_id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await?;
 
     Ok(res)
   }
index 4e43770de1c2201153959bf1d0f8b2390bbf0aba..27cfabe5b8f4b56a5b903fe7f27bb0773de241f6 100644 (file)
@@ -5,7 +5,7 @@ use lemmy_api_common::{
   post::{EditPost, PostResponse},
   request::fetch_site_data,
   utils::{check_community_ban, get_local_user_view_from_jwt, local_site_to_slur_regex},
-  websocket::{send::send_post_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -113,13 +113,13 @@ impl PerformCrud for EditPost {
       return Err(LemmyError::from_error_message(e, err_type));
     }
 
-    send_post_ws_message(
-      data.post_id,
-      UserOperationCrud::EditPost,
-      websocket_id,
-      Some(local_user_view.person.id),
-      context,
-    )
-    .await
+    context
+      .send_post_ws_message(
+        &UserOperationCrud::EditPost,
+        data.post_id,
+        websocket_id,
+        Some(local_user_view.person.id),
+      )
+      .await
   }
 }
index 91556c9d0b63e3b6d2043aa049a720446e407d6b..28c243f6549f6d135129c490a6e986be21f5ddca 100644 (file)
@@ -12,7 +12,7 @@ use lemmy_api_common::{
     send_email_to_user,
     EndpointType,
   },
-  websocket::{send::send_pm_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -80,13 +80,13 @@ impl PerformCrud for CreatePrivateMessage {
     .await
     .map_err(|e| LemmyError::from_error_message(e, "couldnt_create_private_message"))?;
 
-    let res = send_pm_ws_message(
-      inserted_private_message.id,
-      UserOperationCrud::CreatePrivateMessage,
-      websocket_id,
-      context,
-    )
-    .await?;
+    let res = context
+      .send_pm_ws_message(
+        &UserOperationCrud::CreatePrivateMessage,
+        inserted_private_message.id,
+        websocket_id,
+      )
+      .await?;
 
     // Send email to the local recipient, if one exists
     if res.private_message_view.recipient.local {
index 4f58065a3eda550dbac04b67715867d0def4d31b..e6a643a7fae80ac84726afc1b64cbf5e416d4395 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   context::LemmyContext,
   private_message::{DeletePrivateMessage, PrivateMessageResponse},
   utils::get_local_user_view_from_jwt,
-  websocket::{send::send_pm_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::private_message::{PrivateMessage, PrivateMessageUpdateForm},
@@ -46,7 +46,12 @@ impl PerformCrud for DeletePrivateMessage {
     .await
     .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?;
 
-    let op = UserOperationCrud::DeletePrivateMessage;
-    send_pm_ws_message(data.private_message_id, op, websocket_id, context).await
+    context
+      .send_pm_ws_message(
+        &UserOperationCrud::DeletePrivateMessage,
+        data.private_message_id,
+        websocket_id,
+      )
+      .await
   }
 }
index 19da0cf5527a3b59420040b0a9a2a2c2d87005f1..06bd349f4cbbabcb277bcbc2cedfc34c463335c3 100644 (file)
@@ -4,7 +4,7 @@ use lemmy_api_common::{
   context::LemmyContext,
   private_message::{EditPrivateMessage, PrivateMessageResponse},
   utils::{get_local_user_view_from_jwt, local_site_to_slur_regex},
-  websocket::{send::send_pm_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -52,7 +52,12 @@ impl PerformCrud for EditPrivateMessage {
     .await
     .map_err(|e| LemmyError::from_error_message(e, "couldnt_update_private_message"))?;
 
-    let op = UserOperationCrud::EditPrivateMessage;
-    send_pm_ws_message(data.private_message_id, op, websocket_id, context).await
+    context
+      .send_pm_ws_message(
+        &UserOperationCrud::EditPrivateMessage,
+        data.private_message_id,
+        websocket_id,
+      )
+      .await
   }
 }
index 0687fdf6653cca4cce0965d6459fa06e10b3e62f..2330d0f739dbec1fd1616372fbb7193393f77ab0 100644 (file)
@@ -4,6 +4,7 @@ use lemmy_api_common::{
   context::LemmyContext,
   site::{GetSite, GetSiteResponse, MyUserInfo},
   utils::{build_federated_instances, get_local_user_settings_view_from_jwt_opt},
+  websocket::handlers::online_users::GetUsersOnline,
 };
 use lemmy_db_schema::source::{
   actor_language::{LocalUserLanguage, SiteLanguage},
@@ -36,7 +37,7 @@ impl PerformCrud for GetSite {
 
     let admins = PersonView::admins(context.pool()).await?;
 
-    let online = context.chat_server().get_users_online()?;
+    let online = context.chat_server().send(GetUsersOnline).await?;
 
     // Build the local user
     let my_user = if let Some(local_user_view) = get_local_user_settings_view_from_jwt_opt(
index 36a09628023f957011f587d88ad915eca8c0f69f..3922fdab7a4f0b9d55545a2a21a779306537174c 100644 (file)
@@ -192,10 +192,7 @@ impl PerformCrud for EditSite {
 
     let res = SiteResponse { site_view };
 
-    context
-      .chat_server()
-      .send_all_message(UserOperationCrud::EditSite, &res, websocket_id)
-      .await?;
+    context.send_all_ws_message(&UserOperationCrud::EditSite, &res, websocket_id)?;
 
     Ok(res)
   }
index bf48b5f57b3a973f703a6df948ed16dff5f93462..0554da0557a9ccbf142451eb5f1f82e26151612b 100644 (file)
@@ -15,6 +15,7 @@ use lemmy_api_common::{
     send_verification_email,
     EndpointType,
   },
+  websocket::handlers::captcha::CheckCaptcha,
 };
 use lemmy_db_schema::{
   aggregates::structs::PersonAggregates,
@@ -78,10 +79,13 @@ impl PerformCrud for Register {
 
     // If the site is set up, check the captcha
     if local_site.site_setup && local_site.captcha_enabled {
-      let check = context.chat_server().check_captcha(
-        data.captcha_uuid.clone().unwrap_or_default(),
-        data.captcha_answer.clone().unwrap_or_default(),
-      )?;
+      let check = context
+        .chat_server()
+        .send(CheckCaptcha {
+          uuid: data.captcha_uuid.clone().unwrap_or_default(),
+          answer: data.captcha_answer.clone().unwrap_or_default(),
+        })
+        .await?;
       if !check {
         return Err(LemmyError::from_message("captcha_incorrect"));
       }
index e50915fa43db19652a7d7b97b3985cd6ab8360a9..8d586059f6b63396ff3af44ca4f8e7ff78ba2f86 100644 (file)
@@ -26,6 +26,7 @@ serde_json = { workspace = true }
 serde = { workspace = true }
 actix-web = { workspace = true }
 actix-rt = { workspace = true }
+actix = { workspace = true }
 tracing = { workspace = true }
 strum_macros = { workspace = true }
 url = { workspace = true }
index a5fab8a01a9d5c4081dbc593057b4a8a57a186b7..9830be13fb9f3be4751549512d8bffbb6d2c621b 100644 (file)
@@ -143,15 +143,12 @@ impl ActivityHandler for Report {
 
         let post_report_view = PostReportView::read(context.pool(), report.id, actor.id).await?;
 
-        context
-          .chat_server()
-          .send_mod_room_message(
-            UserOperation::CreateCommentReport,
-            &PostReportResponse { post_report_view },
-            post.community_id,
-            None,
-          )
-          .await?;
+        context.send_mod_ws_message(
+          &UserOperation::CreateCommentReport,
+          &PostReportResponse { post_report_view },
+          post.community_id,
+          None,
+        )?;
       }
       PostOrComment::Comment(comment) => {
         let report_form = CommentReportForm {
@@ -167,17 +164,14 @@ impl ActivityHandler for Report {
           CommentReportView::read(context.pool(), report.id, actor.id).await?;
         let community_id = comment_report_view.community.id;
 
-        context
-          .chat_server()
-          .send_mod_room_message(
-            UserOperation::CreateCommentReport,
-            &CommentReportResponse {
-              comment_report_view,
-            },
-            community_id,
-            None,
-          )
-          .await?;
+        context.send_mod_ws_message(
+          &UserOperation::CreateCommentReport,
+          &CommentReportResponse {
+            comment_report_view,
+          },
+          community_id,
+          None,
+        )?;
       }
     };
     Ok(())
index f0eaa09c6230ebbad94a129adbead246168bd580..85581e2a168ba092aa543d199892168a93467f24 100644 (file)
@@ -21,7 +21,7 @@ use lemmy_api_common::{
   community::{CommunityResponse, EditCommunity, HideCommunity},
   context::LemmyContext,
   utils::get_local_user_view_from_jwt,
-  websocket::{send::send_community_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{source::community::Community, traits::Crud};
 use lemmy_utils::error::LemmyError;
@@ -102,14 +102,14 @@ impl ActivityHandler for UpdateCommunity {
     let updated_community =
       Community::update(context.pool(), community.id, &community_update_form).await?;
 
-    send_community_ws_message(
-      updated_community.id,
-      UserOperationCrud::EditCommunity,
-      None,
-      None,
-      context,
-    )
-    .await?;
+    context
+      .send_community_ws_message(
+        &UserOperationCrud::EditCommunity,
+        updated_community.id,
+        None,
+        None,
+      )
+      .await?;
     Ok(())
   }
 }
index 55d5e3f0201754d4d525f3b1e29c972589a9b444..d07a8e663738310dedf534d9c1978451e7183627 100644 (file)
@@ -28,7 +28,7 @@ use lemmy_api_common::{
   comment::{CommentResponse, CreateComment, EditComment},
   context::LemmyContext,
   utils::{check_post_deleted_or_removed, is_mod_or_admin},
-  websocket::{send::send_comment_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   newtypes::PersonId,
@@ -199,10 +199,9 @@ impl ActivityHandler for CreateOrUpdateNote {
       CreateOrUpdateType::Create => UserOperationCrud::CreateComment,
       CreateOrUpdateType::Update => UserOperationCrud::EditComment,
     };
-    send_comment_ws_message(
-      comment.id, notif_type, None, None, None, recipients, context,
-    )
-    .await?;
+    context
+      .send_comment_ws_message(&notif_type, comment.id, None, None, None, recipients)
+      .await?;
     Ok(())
   }
 }
index b1fcfb6ee112f48937d35ed6693d223dac2d11dc..9e4fa51b233b476774e3a91e6953dabd2049da55 100644 (file)
@@ -1,6 +1,6 @@
 use crate::objects::person::ApubPerson;
 use activitypub_federation::{config::Data, fetch::object_id::ObjectId};
-use lemmy_api_common::{context::LemmyContext, websocket::send::send_local_notifs};
+use lemmy_api_common::context::LemmyContext;
 use lemmy_db_schema::{
   newtypes::LocalUserId,
   source::{comment::Comment, post::Post},
@@ -29,5 +29,7 @@ async fn get_comment_notif_recipients(
   // anyway.
   // TODO: for compatibility with other projects, it would be much better to read this from cc or tags
   let mentions = scrape_text_for_mentions(&comment.content);
-  send_local_notifs(mentions, comment, &actor, &post, do_send_email, context).await
+  context
+    .send_local_notifs(mentions, comment, &actor, &post, do_send_email)
+    .await
 }
index 169db5eabef174136033a23efc7c86d6f04f59c1..771e3c3c4580f0c30ec3a404f1c00d3e0606506e 100644 (file)
@@ -25,7 +25,7 @@ use activitypub_federation::{
 use lemmy_api_common::{
   context::LemmyContext,
   post::{CreatePost, EditPost, PostResponse},
-  websocket::{send::send_post_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   newtypes::PersonId,
@@ -196,7 +196,9 @@ impl ActivityHandler for CreateOrUpdatePage {
       CreateOrUpdateType::Create => UserOperationCrud::CreatePost,
       CreateOrUpdateType::Update => UserOperationCrud::EditPost,
     };
-    send_post_ws_message(post.id, notif_type, None, None, context).await?;
+    context
+      .send_post_ws_message(&notif_type, post.id, None, None)
+      .await?;
     Ok(())
   }
 }
index cacfc1a8dece1d76d8735d0c377ee800a66177ed..85051344f657e0c6b10767a1397bf1ce40ad7257 100644 (file)
@@ -16,7 +16,7 @@ use activitypub_federation::{
 use lemmy_api_common::{
   context::LemmyContext,
   private_message::{CreatePrivateMessage, EditPrivateMessage, PrivateMessageResponse},
-  websocket::{send::send_pm_ws_message, UserOperationCrud},
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   newtypes::PersonId,
@@ -124,7 +124,9 @@ impl ActivityHandler for CreateOrUpdateChatMessage {
       CreateOrUpdateType::Create => UserOperationCrud::CreatePrivateMessage,
       CreateOrUpdateType::Update => UserOperationCrud::EditPrivateMessage,
     };
-    send_pm_ws_message(private_message.id, notif_type, None, context).await?;
+    context
+      .send_pm_ws_message(&notif_type, private_message.id, None)
+      .await?;
 
     Ok(())
   }
index 337ba9e8afa271c37388ec1bdd63bacbc96e194c..1bb3b6c417df43f9a299e904c6cf247c007cb9e1 100644 (file)
@@ -8,13 +8,7 @@ use crate::{
   protocol::{activities::deletion::delete::Delete, IdOrNestedObject},
 };
 use activitypub_federation::{config::Data, kinds::activity::DeleteType, traits::ActivityHandler};
-use lemmy_api_common::{
-  context::LemmyContext,
-  websocket::{
-    send::{send_comment_ws_message_simple, send_community_ws_message, send_post_ws_message},
-    UserOperationCrud,
-  },
-};
+use lemmy_api_common::{context::LemmyContext, websocket::UserOperationCrud};
 use lemmy_db_schema::{
   source::{
     comment::{Comment, CommentUpdateForm},
@@ -134,7 +128,9 @@ pub(in crate::activities) async fn receive_remove_action(
       )
       .await?;
 
-      send_community_ws_message(deleted_community.id, RemoveCommunity, None, None, context).await?;
+      context
+        .send_community_ws_message(&RemoveCommunity, deleted_community.id, None, None)
+        .await?;
     }
     DeletableObjects::Post(post) => {
       let form = ModRemovePostForm {
@@ -151,7 +147,9 @@ pub(in crate::activities) async fn receive_remove_action(
       )
       .await?;
 
-      send_post_ws_message(removed_post.id, RemovePost, None, None, context).await?;
+      context
+        .send_post_ws_message(&RemovePost, removed_post.id, None, None)
+        .await?;
     }
     DeletableObjects::Comment(comment) => {
       let form = ModRemoveCommentForm {
@@ -168,7 +166,9 @@ pub(in crate::activities) async fn receive_remove_action(
       )
       .await?;
 
-      send_comment_ws_message_simple(removed_comment.id, RemoveComment, context).await?;
+      context
+        .send_comment_ws_message_simple(&RemoveComment, removed_comment.id)
+        .await?;
     }
     DeletableObjects::PrivateMessage(_) => unimplemented!(),
   }
index 7f901d596ea183479830b1455133655936787e66..690b1d3a36422c22aeb9468722de68f6a44cb66e 100644 (file)
@@ -35,15 +35,7 @@ use lemmy_api_common::{
   post::{DeletePost, PostResponse, RemovePost},
   private_message::{DeletePrivateMessage, PrivateMessageResponse},
   utils::get_local_user_view_from_jwt,
-  websocket::{
-    send::{
-      send_comment_ws_message_simple,
-      send_community_ws_message,
-      send_pm_ws_message,
-      send_post_ws_message,
-    },
-    UserOperationCrud,
-  },
+  websocket::UserOperationCrud,
 };
 use lemmy_db_schema::{
   source::{
@@ -410,14 +402,14 @@ async fn receive_delete_action(
           .build(),
       )
       .await?;
-      send_community_ws_message(
-        community.id,
-        UserOperationCrud::DeleteCommunity,
-        None,
-        None,
-        context,
-      )
-      .await?;
+      context
+        .send_community_ws_message(
+          &UserOperationCrud::DeleteCommunity,
+          community.id,
+          None,
+          None,
+        )
+        .await?;
     }
     DeletableObjects::Post(post) => {
       if deleted != post.deleted {
@@ -427,14 +419,9 @@ async fn receive_delete_action(
           &PostUpdateForm::builder().deleted(Some(deleted)).build(),
         )
         .await?;
-        send_post_ws_message(
-          deleted_post.id,
-          UserOperationCrud::DeletePost,
-          None,
-          None,
-          context,
-        )
-        .await?;
+        context
+          .send_post_ws_message(&UserOperationCrud::DeletePost, deleted_post.id, None, None)
+          .await?;
       }
     }
     DeletableObjects::Comment(comment) => {
@@ -445,12 +432,9 @@ async fn receive_delete_action(
           &CommentUpdateForm::builder().deleted(Some(deleted)).build(),
         )
         .await?;
-        send_comment_ws_message_simple(
-          deleted_comment.id,
-          UserOperationCrud::DeleteComment,
-          context,
-        )
-        .await?;
+        context
+          .send_comment_ws_message_simple(&UserOperationCrud::DeleteComment, deleted_comment.id)
+          .await?;
       }
     }
     DeletableObjects::PrivateMessage(pm) => {
@@ -463,13 +447,13 @@ async fn receive_delete_action(
       )
       .await?;
 
-      send_pm_ws_message(
-        deleted_private_message.id,
-        UserOperationCrud::DeletePrivateMessage,
-        None,
-        context,
-      )
-      .await?;
+      context
+        .send_pm_ws_message(
+          &UserOperationCrud::DeletePrivateMessage,
+          deleted_private_message.id,
+          None,
+        )
+        .await?;
     }
   }
   Ok(())
index d14018382021a28e5585e843aecd7da4caaf4b70..13a32cabcc66082115a35feb62781b550f88fe6b 100644 (file)
@@ -8,13 +8,7 @@ use crate::{
   protocol::activities::deletion::{delete::Delete, undo_delete::UndoDelete},
 };
 use activitypub_federation::{config::Data, kinds::activity::UndoType, traits::ActivityHandler};
-use lemmy_api_common::{
-  context::LemmyContext,
-  websocket::{
-    send::{send_comment_ws_message_simple, send_community_ws_message, send_post_ws_message},
-    UserOperationCrud,
-  },
-};
+use lemmy_api_common::{context::LemmyContext, websocket::UserOperationCrud};
 use lemmy_db_schema::{
   source::{
     comment::{Comment, CommentUpdateForm},
@@ -125,7 +119,9 @@ impl UndoDelete {
           &CommunityUpdateForm::builder().removed(Some(false)).build(),
         )
         .await?;
-        send_community_ws_message(deleted_community.id, EditCommunity, None, None, context).await?;
+        context
+          .send_community_ws_message(&EditCommunity, deleted_community.id, None, None)
+          .await?;
       }
       DeletableObjects::Post(post) => {
         let form = ModRemovePostForm {
@@ -141,7 +137,9 @@ impl UndoDelete {
           &PostUpdateForm::builder().removed(Some(false)).build(),
         )
         .await?;
-        send_post_ws_message(removed_post.id, EditPost, None, None, context).await?;
+        context
+          .send_post_ws_message(&EditPost, removed_post.id, None, None)
+          .await?;
       }
       DeletableObjects::Comment(comment) => {
         let form = ModRemoveCommentForm {
@@ -157,7 +155,9 @@ impl UndoDelete {
           &CommentUpdateForm::builder().removed(Some(false)).build(),
         )
         .await?;
-        send_comment_ws_message_simple(removed_comment.id, EditComment, context).await?;
+        context
+          .send_comment_ws_message_simple(&EditComment, removed_comment.id)
+          .await?;
       }
       DeletableObjects::PrivateMessage(_) => unimplemented!(),
     }
index 250fbf5454df88a0afdd0d58b4433b9a6d3e94c4..c57ec0b53054b315837ba0d98419691d7ffa5da1 100644 (file)
@@ -12,7 +12,11 @@ use activitypub_federation::{
 use lemmy_api_common::{
   community::CommunityResponse,
   context::LemmyContext,
-  websocket::UserOperation,
+  websocket::{
+    handlers::messages::SendUserRoomMessage,
+    serialize_websocket_message,
+    UserOperation,
+  },
 };
 use lemmy_db_schema::{
   source::{actor_language::CommunityLanguage, community::CommunityFollower},
@@ -85,20 +89,18 @@ impl ActivityHandler for AcceptFollow {
       .id;
     let discussion_languages = CommunityLanguage::read(context.pool(), community_id).await?;
 
-    let response = CommunityResponse {
+    let res = CommunityResponse {
       community_view,
       discussion_languages,
     };
 
-    context
-      .chat_server()
-      .send_user_room_message(
-        &UserOperation::FollowCommunity,
-        &response,
-        local_recipient_id,
-        None,
-      )
-      .await?;
+    let message = serialize_websocket_message(&UserOperation::FollowCommunity, &res)?;
+
+    context.chat_server().do_send(SendUserRoomMessage {
+      recipient_id: local_recipient_id,
+      message,
+      websocket_id: None,
+    });
 
     Ok(())
   }
index 5ab31da5551c6d057c488ef4ad2b29f133fb1714..89832148ff3f0e45b961cb7161e50f88433199e9 100644 (file)
@@ -16,10 +16,7 @@ use lemmy_api_common::{
   post::{CreatePostLike, PostResponse},
   sensitive::Sensitive,
   utils::get_local_user_view_from_jwt,
-  websocket::{
-    send::{send_comment_ws_message_simple, send_post_ws_message},
-    UserOperation,
-  },
+  websocket::UserOperation,
 };
 use lemmy_db_schema::{
   newtypes::CommunityId,
@@ -125,7 +122,9 @@ async fn vote_comment(
   CommentLike::remove(context.pool(), person_id, comment_id).await?;
   CommentLike::like(context.pool(), &like_form).await?;
 
-  send_comment_ws_message_simple(comment_id, UserOperation::CreateCommentLike, context).await?;
+  context
+    .send_comment_ws_message_simple(&UserOperation::CreateCommentLike, comment_id)
+    .await?;
   Ok(())
 }
 
@@ -146,7 +145,9 @@ async fn vote_post(
   PostLike::remove(context.pool(), person_id, post_id).await?;
   PostLike::like(context.pool(), &like_form).await?;
 
-  send_post_ws_message(post.id, UserOperation::CreatePostLike, None, None, context).await?;
+  context
+    .send_post_ws_message(&UserOperation::CreatePostLike, post.id, None, None)
+    .await?;
   Ok(())
 }
 
@@ -160,7 +161,9 @@ async fn undo_vote_comment(
   let person_id = actor.id;
   CommentLike::remove(context.pool(), person_id, comment_id).await?;
 
-  send_comment_ws_message_simple(comment_id, UserOperation::CreateCommentLike, context).await?;
+  context
+    .send_comment_ws_message_simple(&UserOperation::CreateCommentLike, comment_id)
+    .await?;
   Ok(())
 }
 
@@ -174,6 +177,8 @@ async fn undo_vote_post(
   let person_id = actor.id;
   PostLike::remove(context.pool(), person_id, post_id).await?;
 
-  send_post_ws_message(post_id, UserOperation::CreatePostLike, None, None, context).await?;
+  context
+    .send_post_ws_message(&UserOperation::CreatePostLike, post_id, None, None)
+    .await?;
   Ok(())
 }
index 5547fcff0d6ec117a81294b5587bbc8b5e70d7c8..3dc56a940f46e36636d63734c438be3ad412a682 100644 (file)
@@ -8,6 +8,7 @@ use lemmy_api_common::{
   community::{GetCommunity, GetCommunityResponse},
   context::LemmyContext,
   utils::{check_private_instance, get_local_user_view_from_jwt_opt, is_mod_or_admin_opt},
+  websocket::handlers::online_users::GetCommunityUsersOnline,
 };
 use lemmy_db_schema::{
   impls::actor_language::default_post_language,
@@ -76,7 +77,8 @@ impl PerformApub for GetCommunity {
 
     let online = context
       .chat_server()
-      .get_community_users_online(community_id)?;
+      .send(GetCommunityUsersOnline { community_id })
+      .await?;
 
     let site_id =
       Site::instance_actor_id_from_url(community_view.community.actor_id.clone().into());
index 6d2ff291cb66ae84c8d86fbc36fb6e489d114958..ff4182dd21aaa215d1b0c013c193c69d525b2dc9 100644 (file)
@@ -55,6 +55,7 @@ pub(crate) fn verify_is_remote_object(id: &Url, settings: &Settings) -> Result<(
 #[cfg(test)]
 pub(crate) mod tests {
   use activitypub_federation::config::{Data, FederationConfig};
+  use actix::Actor;
   use anyhow::anyhow;
   use lemmy_api_common::{
     context::LemmyContext,
@@ -69,7 +70,6 @@ pub(crate) mod tests {
   };
   use reqwest::{Client, Request, Response};
   use reqwest_middleware::{ClientBuilder, Middleware, Next};
-  use std::sync::Arc;
   use task_local_extensions::Extensions;
 
   struct BlockedMiddleware;
@@ -110,7 +110,7 @@ pub(crate) mod tests {
     let rate_limit_config = RateLimitConfig::builder().build();
     let rate_limit_cell = RateLimitCell::new(rate_limit_config).await;
 
-    let chat_server = Arc::new(ChatServer::startup());
+    let chat_server = ChatServer::default().start();
     let context = LemmyContext::create(pool, chat_server, client, secret, rate_limit_cell.clone());
     let config = FederationConfig::builder()
       .domain("example.com")
index 3ab1efc7007d918fa20df5c6d3b686d321c9269e..b3ce85bdbfd9711edcdaaa1a7dd53bfbc168a479 100644 (file)
@@ -1,8 +1,18 @@
 use activitypub_federation::config::Data as ContextData;
+use actix::{
+  fut,
+  Actor,
+  ActorContext,
+  ActorFutureExt,
+  AsyncContext,
+  ContextFutureSpawner,
+  Handler,
+  Running,
+  StreamHandler,
+  WrapFuture,
+};
 use actix_web::{web, Error, HttpRequest, HttpResponse};
 use actix_web_actors::ws;
-use actix_ws::{MessageStream, Session};
-use futures::stream::StreamExt;
 use lemmy_api::Perform;
 use lemmy_api_common::{
   comment::{
@@ -101,6 +111,10 @@ use lemmy_api_common::{
     Search,
   },
   websocket::{
+    handlers::{
+      connect::{Connect, Disconnect},
+      WsMessage,
+    },
     serialize_websocket_message,
     structs::{CommunityJoin, ModJoin, PostJoin, UserJoin},
     UserOperation,
@@ -117,21 +131,36 @@ use std::{
   ops::Deref,
   result,
   str::FromStr,
-  sync::{Arc, Mutex},
   time::{Duration, Instant},
 };
-use tracing::{debug, error, info};
+use tracing::{debug, error};
+
+/// How often heartbeat pings are sent
+const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(25);
+
+/// How long before lack of client response causes a timeout
+const CLIENT_TIMEOUT: Duration = Duration::from_secs(60);
+
+pub struct WsChatSession {
+  /// unique session id
+  pub id: ConnectionId,
+
+  pub ip: IpAddr,
+
+  /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
+  /// otherwise we drop connection.
+  pub hb: Instant,
+
+  /// The context data
+  apub_data: ContextData<LemmyContext>,
+}
 
-/// Entry point for our route
 pub async fn websocket(
   req: HttpRequest,
   body: web::Payload,
-  context: web::Data<LemmyContext>,
   rate_limiter: web::Data<RateLimitCell>,
   apub_data: ContextData<LemmyContext>,
 ) -> Result<HttpResponse, Error> {
-  let (response, session, stream) = actix_ws::handle(&req, body)?;
-
   let client_ip = IpAddr(
     req
       .connection_info()
@@ -146,111 +175,158 @@ pub async fn websocket(
       "Websocket join with IP: {} has been rate limited.",
       &client_ip
     );
-    session.close(None).await.map_err(LemmyError::from)?;
-    return Ok(response);
+    return Ok(HttpResponse::TooManyRequests().finish());
   }
 
-  let connection_id = context.chat_server().handle_connect(session.clone())?;
-  info!("{} joined", &client_ip);
+  ws::start(
+    WsChatSession {
+      id: 0,
+      ip: client_ip,
+      hb: Instant::now(),
+      apub_data,
+    },
+    &req,
+    body,
+  )
+}
+
+/// helper method that sends ping to client every few seconds (HEARTBEAT_INTERVAL).
+///
+/// also this method checks heartbeats from client
+fn hb(ctx: &mut ws::WebsocketContext<WsChatSession>) {
+  ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
+    // check client heartbeats
+    if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
+      // heartbeat timed out
 
-  let alive = Arc::new(Mutex::new(Instant::now()));
-  heartbeat(session.clone(), alive.clone());
+      // notify chat server
+      act
+        .apub_data
+        .chat_server()
+        .do_send(Disconnect { id: act.id });
 
-  actix_rt::spawn(handle_messages(
-    stream,
-    client_ip,
-    session,
-    connection_id,
-    alive,
-    rate_limiter,
-    apub_data,
-  ));
+      // stop actor
+      ctx.stop();
 
-  Ok(response)
+      // don't try to send a ping
+      return;
+    }
+
+    ctx.ping(b"");
+  });
 }
 
-async fn handle_messages(
-  mut stream: MessageStream,
-  client_ip: IpAddr,
-  mut session: Session,
-  connection_id: ConnectionId,
-  alive: Arc<Mutex<Instant>>,
-  rate_limiter: web::Data<RateLimitCell>,
-  context: ContextData<LemmyContext>,
-) -> Result<(), LemmyError> {
-  while let Some(Ok(msg)) = stream.next().await {
-    match msg {
-      ws::Message::Ping(bytes) => {
-        if session.pong(&bytes).await.is_err() {
-          break;
+impl Actor for WsChatSession {
+  type Context = ws::WebsocketContext<Self>;
+
+  /// Method is called on actor start.
+  /// We register ws session with ChatServer
+  fn started(&mut self, ctx: &mut Self::Context) {
+    // we'll start heartbeat process on session start.
+    hb(ctx);
+
+    // register self in chat server. `AsyncContext::wait` register
+    // future within context, but context waits until this future resolves
+    // before processing any other events.
+    // HttpContext::state() is instance of WsChatSessionState, state is shared
+    // across all routes within application
+    let addr = ctx.address();
+    self
+      .apub_data
+      .chat_server()
+      .send(Connect {
+        addr: addr.recipient(),
+      })
+      .into_actor(self)
+      .then(|res, act, ctx| {
+        match res {
+          Ok(res) => act.id = res,
+          // something is wrong with chat server
+          _ => ctx.stop(),
         }
+        fut::ready(())
+      })
+      .wait(ctx);
+  }
+  fn stopping(&mut self, _: &mut Self::Context) -> Running {
+    // notify chat server
+    self
+      .apub_data
+      .chat_server()
+      .do_send(Disconnect { id: self.id });
+    Running::Stop
+  }
+}
+
+/// Handle messages from chat server, we simply send it to peer websocket
+impl Handler<WsMessage> for WsChatSession {
+  type Result = ();
+
+  fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
+    ctx.text(msg.0);
+  }
+}
+
+/// WebSocket message handler
+impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsChatSession {
+  fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
+    let msg = match msg {
+      Err(_) => {
+        ctx.stop();
+        return;
+      }
+      Ok(msg) => msg,
+    };
+
+    match msg {
+      ws::Message::Ping(msg) => {
+        self.hb = Instant::now();
+        ctx.pong(&msg);
       }
       ws::Message::Pong(_) => {
-        let mut lock = alive
-          .lock()
-          .expect("Failed to acquire websocket heartbeat alive lock");
-        *lock = Instant::now();
+        self.hb = Instant::now();
       }
       ws::Message::Text(text) => {
-        let msg = text.trim().to_string();
-        let executed = parse_json_message(
-          msg,
-          client_ip.clone(),
-          connection_id,
-          rate_limiter.get_ref(),
-          context.reset_request_count(),
-        )
-        .await;
-
-        let res = executed.unwrap_or_else(|e| {
-          error!("Error during message handling {}", e);
-          e.to_json()
-            .unwrap_or_else(|_| String::from(r#"{"error":"failed to serialize json"}"#))
+        let ip_clone = self.ip.clone();
+        let id_clone = self.id.to_owned();
+        let context_clone = self.apub_data.reset_request_count();
+
+        let fut = Box::pin(async move {
+          let msg = text.trim().to_string();
+          parse_json_message(msg, ip_clone, id_clone, context_clone).await
         });
-        session.text(res).await?;
-      }
-      ws::Message::Close(_) => {
-        session.close(None).await?;
-        context.chat_server().handle_disconnect(&connection_id)?;
-        break;
+        fut
+          .into_actor(self)
+          .then(|res, _, ctx| {
+            match res {
+              Ok(res) => ctx.text(res),
+              Err(e) => error!("{}", &e),
+            }
+            actix::fut::ready(())
+          })
+          .spawn(ctx);
       }
-      ws::Message::Binary(_) => info!("Unexpected binary"),
-      _ => {}
-    }
-  }
-  Ok(())
-}
-
-fn heartbeat(mut session: Session, alive: Arc<Mutex<Instant>>) {
-  actix_rt::spawn(async move {
-    let mut interval = actix_rt::time::interval(Duration::from_secs(5));
-    loop {
-      if session.ping(b"").await.is_err() {
-        break;
+      ws::Message::Binary(_) => println!("Unexpected binary"),
+      ws::Message::Close(reason) => {
+        ctx.close(reason);
+        ctx.stop();
       }
-
-      let duration_since = {
-        let alive_lock = alive
-          .lock()
-          .expect("Failed to acquire websocket heartbeat alive lock");
-        Instant::now().duration_since(*alive_lock)
-      };
-      if duration_since > Duration::from_secs(10) {
-        let _ = session.close(None).await;
-        break;
+      ws::Message::Continuation(_) => {
+        ctx.stop();
       }
-      interval.tick().await;
+      ws::Message::Nop => (),
     }
-  });
+  }
 }
 
+/// Entry point for our websocket route
 async fn parse_json_message(
   msg: String,
   ip: IpAddr,
   connection_id: ConnectionId,
-  rate_limiter: &RateLimitCell,
   context: ContextData<LemmyContext>,
 ) -> Result<String, LemmyError> {
+  let rate_limiter = context.settings_updated_channel();
   let json: Value = serde_json::from_str(&msg)?;
   let data = json
     .get("data")
index 152e3ded3a96eb8653f65ee50d89bdbbe7820eb6..124cecc590443b11188cdb5ec44af4131be40ced 100644 (file)
@@ -8,6 +8,7 @@ pub mod telemetry;
 
 use crate::{code_migrations::run_advanced_migrations, root_span_builder::QuieterRootSpanBuilder};
 use activitypub_federation::config::{FederationConfig, FederationMiddleware};
+use actix::Actor;
 use actix_web::{middleware, web::Data, App, HttpServer, Result};
 use doku::json::{AutoComments, CommentsStyle, Formatting, ObjectsStyle};
 use lemmy_api_common::{
@@ -35,7 +36,7 @@ use reqwest::Client;
 use reqwest_middleware::ClientBuilder;
 use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
 use reqwest_tracing::TracingMiddleware;
-use std::{env, sync::Arc, thread, time::Duration};
+use std::{env, thread, time::Duration};
 use tracing::subscriber::set_global_default;
 use tracing_actix_web::TracingLogger;
 use tracing_error::ErrorLayer;
@@ -133,7 +134,7 @@ pub async fn start_lemmy_server() -> Result<(), LemmyError> {
     scheduled_tasks::setup(db_url, user_agent).expect("Couldn't set up scheduled_tasks");
   });
 
-  let chat_server = Arc::new(ChatServer::startup());
+  let chat_server = ChatServer::default().start();
 
   // Create Http server with websocket support
   let settings_bind = settings.clone();