]> Untitled Git - lemmy.git/blobdiff - lemmy_websocket/src/chat_server.rs
Move websocket code into workspace (#107)
[lemmy.git] / lemmy_websocket / src / chat_server.rs
similarity index 58%
rename from src/websocket/chat_server.rs
rename to lemmy_websocket/src/chat_server.rs
index 611f44f9715ba6caa12e6b92e4c6d218787b8d7a..8346a32f60450ef8f727fe9e8bb9e65972dd2555 100644 (file)
@@ -1,7 +1,4 @@
-use crate::{
-  websocket::handlers::{do_user_operation, to_json_string, Args},
-  LemmyContext,
-};
+use crate::{messages::*, serialize_websocket_message, LemmyContext, UserOperation};
 use actix::prelude::*;
 use anyhow::Context as acontext;
 use background_jobs::QueueHandle;
@@ -10,7 +7,7 @@ use diesel::{
   PgConnection,
 };
 use lemmy_rate_limit::RateLimit;
-use lemmy_structs::{comment::*, community::*, post::*, site::*, user::*, websocket::*};
+use lemmy_structs::{comment::*, post::*};
 use lemmy_utils::{
   location_info,
   APIError,
@@ -29,6 +26,14 @@ use std::{
   collections::{HashMap, HashSet},
   str::FromStr,
 };
+use tokio::macros::support::Pin;
+
+type MessageHandlerType = fn(
+  context: LemmyContext,
+  id: ConnectionId,
+  op: UserOperation,
+  data: &str,
+) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
 
 /// `ChatServer` manages chat rooms and responsible for coordinating chat
 /// session.
@@ -57,6 +62,8 @@ pub struct ChatServer {
   /// A list of the current captchas
   pub(super) captchas: Vec<CaptchaItem>,
 
+  message_handler: MessageHandlerType,
+
   /// An HTTP Client
   client: Client,
 
@@ -75,6 +82,7 @@ impl ChatServer {
   pub fn startup(
     pool: Pool<ConnectionManager<PgConnection>>,
     rate_limiter: RateLimit,
+    message_handler: MessageHandlerType,
     client: Client,
     activity_queue: QueueHandle,
   ) -> ChatServer {
@@ -87,6 +95,7 @@ impl ChatServer {
       pool,
       rate_limiter,
       captchas: Vec::new(),
+      message_handler,
       client,
       activity_queue,
     }
@@ -180,7 +189,7 @@ impl ChatServer {
   where
     Response: Serialize,
   {
-    let res_str = &to_json_string(op, response)?;
+    let res_str = &serialize_websocket_message(op, response)?;
     if let Some(sessions) = self.post_rooms.get(&post_id) {
       for id in sessions {
         if let Some(my_id) = websocket_id {
@@ -204,7 +213,7 @@ impl ChatServer {
   where
     Response: Serialize,
   {
-    let res_str = &to_json_string(op, response)?;
+    let res_str = &serialize_websocket_message(op, response)?;
     if let Some(sessions) = self.community_rooms.get(&community_id) {
       for id in sessions {
         if let Some(my_id) = websocket_id {
@@ -227,7 +236,7 @@ impl ChatServer {
   where
     Response: Serialize,
   {
-    let res_str = &to_json_string(op, response)?;
+    let res_str = &serialize_websocket_message(op, response)?;
     for id in self.sessions.keys() {
       if let Some(my_id) = websocket_id {
         if *id == my_id {
@@ -249,7 +258,7 @@ impl ChatServer {
   where
     Response: Serialize,
   {
-    let res_str = &to_json_string(op, response)?;
+    let res_str = &serialize_websocket_message(op, response)?;
     if let Some(sessions) = self.user_rooms.get(&recipient_id) {
       for id in sessions {
         if let Some(my_id) = websocket_id {
@@ -340,8 +349,6 @@ impl ChatServer {
     msg: StandardMessage,
     ctx: &mut Context<Self>,
   ) -> impl Future<Output = Result<String, LemmyError>> {
-    let addr = ctx.address();
-    let pool = self.pool.clone();
     let rate_limiter = self.rate_limiter.clone();
 
     let ip: IPAddr = match self.sessions.get(&msg.id) {
@@ -349,110 +356,27 @@ impl ChatServer {
       None => "blank_ip".to_string(),
     };
 
-    let client = self.client.clone();
-    let activity_queue = self.activity_queue.clone();
+    let context = LemmyContext {
+      pool: self.pool.clone(),
+      chat_server: ctx.address(),
+      client: self.client.to_owned(),
+      activity_queue: self.activity_queue.to_owned(),
+    };
+    let message_handler = self.message_handler;
     async move {
-      let msg = msg;
       let json: Value = serde_json::from_str(&msg.msg)?;
       let data = &json["data"].to_string();
       let op = &json["op"].as_str().ok_or(APIError {
         message: "Unknown op type".to_string(),
       })?;
 
-      let user_operation: UserOperation = UserOperation::from_str(&op)?;
-
-      let context = LemmyContext::new(pool, addr, client, activity_queue);
-      let args = Args {
-        context,
-        rate_limiter,
-        id: msg.id,
-        ip,
-        op: user_operation.clone(),
-        data,
-      };
-
+      let user_operation = UserOperation::from_str(&op)?;
+      let fut = (message_handler)(context, msg.id, user_operation.clone(), data);
       match user_operation {
-        // User ops
-        UserOperation::Login => do_user_operation::<Login>(args).await,
-        UserOperation::Register => do_user_operation::<Register>(args).await,
-        UserOperation::GetCaptcha => do_user_operation::<GetCaptcha>(args).await,
-        UserOperation::GetUserDetails => do_user_operation::<GetUserDetails>(args).await,
-        UserOperation::GetReplies => do_user_operation::<GetReplies>(args).await,
-        UserOperation::AddAdmin => do_user_operation::<AddAdmin>(args).await,
-        UserOperation::BanUser => do_user_operation::<BanUser>(args).await,
-        UserOperation::GetUserMentions => do_user_operation::<GetUserMentions>(args).await,
-        UserOperation::MarkUserMentionAsRead => {
-          do_user_operation::<MarkUserMentionAsRead>(args).await
-        }
-        UserOperation::MarkAllAsRead => do_user_operation::<MarkAllAsRead>(args).await,
-        UserOperation::DeleteAccount => do_user_operation::<DeleteAccount>(args).await,
-        UserOperation::PasswordReset => do_user_operation::<PasswordReset>(args).await,
-        UserOperation::PasswordChange => do_user_operation::<PasswordChange>(args).await,
-        UserOperation::UserJoin => do_user_operation::<UserJoin>(args).await,
-        UserOperation::PostJoin => do_user_operation::<PostJoin>(args).await,
-        UserOperation::CommunityJoin => do_user_operation::<CommunityJoin>(args).await,
-        UserOperation::SaveUserSettings => do_user_operation::<SaveUserSettings>(args).await,
-
-        // Private Message ops
-        UserOperation::CreatePrivateMessage => {
-          do_user_operation::<CreatePrivateMessage>(args).await
-        }
-        UserOperation::EditPrivateMessage => do_user_operation::<EditPrivateMessage>(args).await,
-        UserOperation::DeletePrivateMessage => {
-          do_user_operation::<DeletePrivateMessage>(args).await
-        }
-        UserOperation::MarkPrivateMessageAsRead => {
-          do_user_operation::<MarkPrivateMessageAsRead>(args).await
-        }
-        UserOperation::GetPrivateMessages => do_user_operation::<GetPrivateMessages>(args).await,
-
-        // Site ops
-        UserOperation::GetModlog => do_user_operation::<GetModlog>(args).await,
-        UserOperation::CreateSite => do_user_operation::<CreateSite>(args).await,
-        UserOperation::EditSite => do_user_operation::<EditSite>(args).await,
-        UserOperation::GetSite => do_user_operation::<GetSite>(args).await,
-        UserOperation::GetSiteConfig => do_user_operation::<GetSiteConfig>(args).await,
-        UserOperation::SaveSiteConfig => do_user_operation::<SaveSiteConfig>(args).await,
-        UserOperation::Search => do_user_operation::<Search>(args).await,
-        UserOperation::TransferCommunity => do_user_operation::<TransferCommunity>(args).await,
-        UserOperation::TransferSite => do_user_operation::<TransferSite>(args).await,
-        UserOperation::ListCategories => do_user_operation::<ListCategories>(args).await,
-
-        // Community ops
-        UserOperation::GetCommunity => do_user_operation::<GetCommunity>(args).await,
-        UserOperation::ListCommunities => do_user_operation::<ListCommunities>(args).await,
-        UserOperation::CreateCommunity => do_user_operation::<CreateCommunity>(args).await,
-        UserOperation::EditCommunity => do_user_operation::<EditCommunity>(args).await,
-        UserOperation::DeleteCommunity => do_user_operation::<DeleteCommunity>(args).await,
-        UserOperation::RemoveCommunity => do_user_operation::<RemoveCommunity>(args).await,
-        UserOperation::FollowCommunity => do_user_operation::<FollowCommunity>(args).await,
-        UserOperation::GetFollowedCommunities => {
-          do_user_operation::<GetFollowedCommunities>(args).await
-        }
-        UserOperation::BanFromCommunity => do_user_operation::<BanFromCommunity>(args).await,
-        UserOperation::AddModToCommunity => do_user_operation::<AddModToCommunity>(args).await,
-
-        // Post ops
-        UserOperation::CreatePost => do_user_operation::<CreatePost>(args).await,
-        UserOperation::GetPost => do_user_operation::<GetPost>(args).await,
-        UserOperation::GetPosts => do_user_operation::<GetPosts>(args).await,
-        UserOperation::EditPost => do_user_operation::<EditPost>(args).await,
-        UserOperation::DeletePost => do_user_operation::<DeletePost>(args).await,
-        UserOperation::RemovePost => do_user_operation::<RemovePost>(args).await,
-        UserOperation::LockPost => do_user_operation::<LockPost>(args).await,
-        UserOperation::StickyPost => do_user_operation::<StickyPost>(args).await,
-        UserOperation::CreatePostLike => do_user_operation::<CreatePostLike>(args).await,
-        UserOperation::SavePost => do_user_operation::<SavePost>(args).await,
-
-        // Comment ops
-        UserOperation::CreateComment => do_user_operation::<CreateComment>(args).await,
-        UserOperation::EditComment => do_user_operation::<EditComment>(args).await,
-        UserOperation::DeleteComment => do_user_operation::<DeleteComment>(args).await,
-        UserOperation::RemoveComment => do_user_operation::<RemoveComment>(args).await,
-        UserOperation::MarkCommentAsRead => do_user_operation::<MarkCommentAsRead>(args).await,
-        UserOperation::SaveComment => do_user_operation::<SaveComment>(args).await,
-        UserOperation::GetComments => do_user_operation::<GetComments>(args).await,
-        UserOperation::CreateCommentLike => do_user_operation::<CreateCommentLike>(args).await,
+        UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
+        UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
+        UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
+        _ => rate_limiter.message().wrap(ip, fut).await,
       }
     }
   }