From b15c406924f40e6c3a508f70963ab5cc04033b07 Mon Sep 17 00:00:00 2001 From: Felix Ableitner Date: Mon, 31 Aug 2020 17:20:13 +0200 Subject: [PATCH] Refactor websocket to split it into multiple files --- server/src/api/comment.rs | 2 +- server/src/api/community.rs | 2 +- server/src/api/post.rs | 2 +- server/src/api/site.rs | 2 +- server/src/api/user.rs | 2 +- server/src/apub/inbox/activities/create.rs | 2 +- server/src/apub/inbox/activities/delete.rs | 2 +- server/src/apub/inbox/activities/dislike.rs | 2 +- server/src/apub/inbox/activities/like.rs | 2 +- server/src/apub/inbox/activities/remove.rs | 2 +- server/src/apub/inbox/activities/undo.rs | 2 +- server/src/apub/inbox/activities/update.rs | 2 +- server/src/apub/inbox/user_inbox.rs | 2 +- server/src/lib.rs | 6 +- server/src/main.rs | 2 +- server/src/routes/websocket.rs | 5 +- .../websocket/{server.rs => chat_server.rs} | 436 +----------------- server/src/websocket/handlers.rs | 291 ++++++++++++ server/src/websocket/messages.rs | 137 ++++++ server/src/websocket/mod.rs | 4 +- 20 files changed, 470 insertions(+), 437 deletions(-) rename server/src/websocket/{server.rs => chat_server.rs} (60%) create mode 100644 server/src/websocket/handlers.rs create mode 100644 server/src/websocket/messages.rs diff --git a/server/src/api/comment.rs b/server/src/api/comment.rs index f2effab5..0d250a7b 100644 --- a/server/src/api/comment.rs +++ b/server/src/api/comment.rs @@ -11,7 +11,7 @@ use crate::{ apub::{ApubLikeableType, ApubObjectType}, blocking, websocket::{ - server::{JoinCommunityRoom, SendComment}, + messages::{JoinCommunityRoom, SendComment}, UserOperation, }, ConnectionId, diff --git a/server/src/api/community.rs b/server/src/api/community.rs index c94ca59b..a7b3b6c7 100644 --- a/server/src/api/community.rs +++ b/server/src/api/community.rs @@ -4,7 +4,7 @@ use crate::{ apub::ActorType, blocking, websocket::{ - server::{GetCommunityUsersOnline, JoinCommunityRoom, SendCommunityRoomMessage}, + messages::{GetCommunityUsersOnline, JoinCommunityRoom, SendCommunityRoomMessage}, UserOperation, }, ConnectionId, diff --git a/server/src/api/post.rs b/server/src/api/post.rs index 9f0fb3be..84842173 100644 --- a/server/src/api/post.rs +++ b/server/src/api/post.rs @@ -13,7 +13,7 @@ use crate::{ blocking, fetch_iframely_and_pictrs_data, websocket::{ - server::{GetPostUsersOnline, JoinCommunityRoom, JoinPostRoom, SendPost}, + messages::{GetPostUsersOnline, JoinCommunityRoom, JoinPostRoom, SendPost}, UserOperation, }, ConnectionId, diff --git a/server/src/api/site.rs b/server/src/api/site.rs index 8f5f0e93..2ea4a390 100644 --- a/server/src/api/site.rs +++ b/server/src/api/site.rs @@ -13,7 +13,7 @@ use crate::{ blocking, version, websocket::{ - server::{GetUsersOnline, SendAllMessage}, + messages::{GetUsersOnline, SendAllMessage}, UserOperation, }, ConnectionId, diff --git a/server/src/api/user.rs b/server/src/api/user.rs index e6cf2a82..32a9d2b8 100644 --- a/server/src/api/user.rs +++ b/server/src/api/user.rs @@ -12,7 +12,7 @@ use crate::{ blocking, captcha_espeak_wav_base64, websocket::{ - server::{CaptchaItem, CheckCaptcha, JoinUserRoom, SendAllMessage, SendUserRoomMessage}, + messages::{CaptchaItem, CheckCaptcha, JoinUserRoom, SendAllMessage, SendUserRoomMessage}, UserOperation, }, ConnectionId, diff --git a/server/src/apub/inbox/activities/create.rs b/server/src/apub/inbox/activities/create.rs index caba560d..a7d7d575 100644 --- a/server/src/apub/inbox/activities/create.rs +++ b/server/src/apub/inbox/activities/create.rs @@ -15,7 +15,7 @@ use crate::{ }, blocking, websocket::{ - server::{SendComment, SendPost}, + messages::{SendComment, SendPost}, UserOperation, }, LemmyContext, diff --git a/server/src/apub/inbox/activities/delete.rs b/server/src/apub/inbox/activities/delete.rs index 9c4f0bee..65180ca4 100644 --- a/server/src/apub/inbox/activities/delete.rs +++ b/server/src/apub/inbox/activities/delete.rs @@ -14,7 +14,7 @@ use crate::{ }, blocking, websocket::{ - server::{SendComment, SendCommunityRoomMessage, SendPost}, + messages::{SendComment, SendCommunityRoomMessage, SendPost}, UserOperation, }, LemmyContext, diff --git a/server/src/apub/inbox/activities/dislike.rs b/server/src/apub/inbox/activities/dislike.rs index 4d59dd47..441e36f1 100644 --- a/server/src/apub/inbox/activities/dislike.rs +++ b/server/src/apub/inbox/activities/dislike.rs @@ -12,7 +12,7 @@ use crate::{ }, blocking, websocket::{ - server::{SendComment, SendPost}, + messages::{SendComment, SendPost}, UserOperation, }, LemmyContext, diff --git a/server/src/apub/inbox/activities/like.rs b/server/src/apub/inbox/activities/like.rs index a3f19b3c..67aefaa0 100644 --- a/server/src/apub/inbox/activities/like.rs +++ b/server/src/apub/inbox/activities/like.rs @@ -12,7 +12,7 @@ use crate::{ }, blocking, websocket::{ - server::{SendComment, SendPost}, + messages::{SendComment, SendPost}, UserOperation, }, LemmyContext, diff --git a/server/src/apub/inbox/activities/remove.rs b/server/src/apub/inbox/activities/remove.rs index 83eb6f33..f0e98be2 100644 --- a/server/src/apub/inbox/activities/remove.rs +++ b/server/src/apub/inbox/activities/remove.rs @@ -15,7 +15,7 @@ use crate::{ }, blocking, websocket::{ - server::{SendComment, SendCommunityRoomMessage, SendPost}, + messages::{SendComment, SendCommunityRoomMessage, SendPost}, UserOperation, }, LemmyContext, diff --git a/server/src/apub/inbox/activities/undo.rs b/server/src/apub/inbox/activities/undo.rs index 9a589554..9bf9e96a 100644 --- a/server/src/apub/inbox/activities/undo.rs +++ b/server/src/apub/inbox/activities/undo.rs @@ -14,7 +14,7 @@ use crate::{ }, blocking, websocket::{ - server::{SendComment, SendCommunityRoomMessage, SendPost}, + messages::{SendComment, SendCommunityRoomMessage, SendPost}, UserOperation, }, LemmyContext, diff --git a/server/src/apub/inbox/activities/update.rs b/server/src/apub/inbox/activities/update.rs index f8e6603f..d5d01235 100644 --- a/server/src/apub/inbox/activities/update.rs +++ b/server/src/apub/inbox/activities/update.rs @@ -16,7 +16,7 @@ use crate::{ }, blocking, websocket::{ - server::{SendComment, SendPost}, + messages::{SendComment, SendPost}, UserOperation, }, LemmyContext, diff --git a/server/src/apub/inbox/user_inbox.rs b/server/src/apub/inbox/user_inbox.rs index 27d58ebc..ddb97109 100644 --- a/server/src/apub/inbox/user_inbox.rs +++ b/server/src/apub/inbox/user_inbox.rs @@ -8,7 +8,7 @@ use crate::{ FromApub, }, blocking, - websocket::{server::SendUserRoomMessage, UserOperation}, + websocket::{messages::SendUserRoomMessage, UserOperation}, LemmyContext, LemmyError, }; diff --git a/server/src/lib.rs b/server/src/lib.rs index 32b43ef8..79182be9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -30,11 +30,9 @@ pub mod routes; pub mod version; pub mod websocket; -use crate::{ - request::{retry, RecvError}, - websocket::server::ChatServer, -}; +use crate::request::{retry, RecvError}; +use crate::websocket::chat_server::ChatServer; use actix::Addr; use actix_web::dev::ConnectionInfo; use anyhow::anyhow; diff --git a/server/src/main.rs b/server/src/main.rs index 72fce5c0..30fcdaab 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -24,7 +24,7 @@ use lemmy_server::{ code_migrations::run_advanced_migrations, rate_limit::{rate_limiter::RateLimiter, RateLimit}, routes::*, - websocket::server::*, + websocket::chat_server::ChatServer, LemmyContext, LemmyError, }; diff --git a/server/src/routes/websocket.rs b/server/src/routes/websocket.rs index 7c787d66..954b39b2 100644 --- a/server/src/routes/websocket.rs +++ b/server/src/routes/websocket.rs @@ -1,6 +1,9 @@ use crate::{ get_ip, - websocket::server::{ChatServer, *}, + websocket::{ + chat_server::ChatServer, + messages::{Connect, Disconnect, StandardMessage, WSMessage}, + }, LemmyContext, }; use actix::prelude::*; diff --git a/server/src/websocket/server.rs b/server/src/websocket/chat_server.rs similarity index 60% rename from server/src/websocket/server.rs rename to server/src/websocket/chat_server.rs index 2a4c558c..6ee22f8c 100644 --- a/server/src/websocket/server.rs +++ b/server/src/websocket/chat_server.rs @@ -1,12 +1,12 @@ -//! `ChatServer` is an actor. It maintains list of connection client session. -//! And manages available rooms. Peers send messages to other peers in same -//! room through `ChatServer`. - use super::*; use crate::{ api::{comment::*, community::*, post::*, site::*, user::*, *}, rate_limit::RateLimit, - websocket::UserOperation, + websocket::{ + handlers::{do_user_operation, to_json_string, Args}, + messages::*, + UserOperation, + }, CommunityId, ConnectionId, IPAddr, @@ -15,145 +15,11 @@ use crate::{ PostId, UserId, }; -use actix_web::web; use anyhow::Context as acontext; use background_jobs::QueueHandle; -use lemmy_db::naive_now; use lemmy_utils::location_info; use reqwest::Client; -/// Chat server sends this messages to session -#[derive(Message)] -#[rtype(result = "()")] -pub struct WSMessage(pub String); - -/// Message for chat server communications - -/// New chat session is created -#[derive(Message)] -#[rtype(usize)] -pub struct Connect { - pub addr: Recipient, - pub ip: IPAddr, -} - -/// Session is disconnected -#[derive(Message)] -#[rtype(result = "()")] -pub struct Disconnect { - pub id: ConnectionId, - pub ip: IPAddr, -} - -/// The messages sent to websocket clients -#[derive(Serialize, Deserialize, Message)] -#[rtype(result = "Result")] -pub struct StandardMessage { - /// Id of the client session - pub id: ConnectionId, - /// Peer message - pub msg: String, -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct SendAllMessage { - pub op: UserOperation, - pub response: Response, - pub websocket_id: Option, -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct SendUserRoomMessage { - pub op: UserOperation, - pub response: Response, - pub recipient_id: UserId, - pub websocket_id: Option, -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct SendCommunityRoomMessage { - pub op: UserOperation, - pub response: Response, - pub community_id: CommunityId, - pub websocket_id: Option, -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct SendPost { - pub op: UserOperation, - pub post: PostResponse, - pub websocket_id: Option, -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct SendComment { - pub op: UserOperation, - pub comment: CommentResponse, - pub websocket_id: Option, -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct JoinUserRoom { - pub user_id: UserId, - pub id: ConnectionId, -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct JoinCommunityRoom { - pub community_id: CommunityId, - pub id: ConnectionId, -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct JoinPostRoom { - pub post_id: PostId, - pub id: ConnectionId, -} - -#[derive(Message)] -#[rtype(usize)] -pub struct GetUsersOnline; - -#[derive(Message)] -#[rtype(usize)] -pub struct GetPostUsersOnline { - pub post_id: PostId, -} - -#[derive(Message)] -#[rtype(usize)] -pub struct GetCommunityUsersOnline { - pub community_id: CommunityId, -} - -pub struct SessionInfo { - pub addr: Recipient, - pub ip: IPAddr, -} - -#[derive(Message, Debug)] -#[rtype(result = "()")] -pub struct CaptchaItem { - pub uuid: String, - pub answer: String, - pub expires: chrono::NaiveDateTime, -} - -#[derive(Message)] -#[rtype(bool)] -pub struct CheckCaptcha { - pub uuid: String, - pub answer: String, -} - /// `ChatServer` manages chat rooms and responsible for coordinating chat /// session. pub struct ChatServer { @@ -168,18 +34,18 @@ pub struct ChatServer { /// A map from user id to its connection ID for joined users. Remember a user can have multiple /// sessions (IE clients) - user_rooms: HashMap>, + pub(super) user_rooms: HashMap>, - rng: ThreadRng, + pub(super) rng: ThreadRng, /// The DB Pool - pool: Pool>, + pub(super) pool: Pool>, /// Rate limiting based on rate type and IP addr - rate_limiter: RateLimit, + pub(super) rate_limiter: RateLimit, /// A list of the current captchas - captchas: Vec, + pub(super) captchas: Vec, /// An HTTP Client client: Client, @@ -187,6 +53,14 @@ pub struct ChatServer { activity_queue: QueueHandle, } +pub struct SessionInfo { + pub addr: Recipient, + pub ip: IPAddr, +} + +/// `ChatServer` is an actor. It maintains list of connection client session. +/// And manages available rooms. Peers send messages to other peers in same +/// room through `ChatServer`. impl ChatServer { pub fn startup( pool: Pool>, @@ -451,7 +325,7 @@ impl ChatServer { } } - fn parse_json_message( + pub(super) fn parse_json_message( &mut self, msg: StandardMessage, ctx: &mut Context, @@ -576,275 +450,3 @@ impl ChatServer { } } } - -struct Args<'a> { - context: LemmyContext, - rate_limiter: RateLimit, - id: ConnectionId, - ip: IPAddr, - op: UserOperation, - data: &'a str, -} - -async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result -where - for<'de> Data: Deserialize<'de> + 'a, - Data: Perform, -{ - let Args { - context, - rate_limiter, - id, - ip, - op, - data, - } = args; - - let data = data.to_string(); - let op2 = op.clone(); - - let fut = async move { - let parsed_data: Data = serde_json::from_str(&data)?; - let res = parsed_data - .perform(&web::Data::new(context), Some(id)) - .await?; - to_json_string(&op, &res) - }; - - match op2 { - UserOperation::Register => rate_limiter.register().wrap(ip, fut).await, - UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await, - UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await, - _ => rate_limiter.message().wrap(ip, fut).await, - } -} - -/// Make actor from `ChatServer` -impl Actor for ChatServer { - /// We are going to use simple Context, we just need ability to communicate - /// with other actors. - type Context = Context; -} - -/// Handler for Connect message. -/// -/// Register new session and assign unique id to this session -impl Handler for ChatServer { - type Result = usize; - - fn handle(&mut self, msg: Connect, _ctx: &mut Context) -> Self::Result { - // register session with random id - let id = self.rng.gen::(); - info!("{} joined", &msg.ip); - - self.sessions.insert( - id, - SessionInfo { - addr: msg.addr, - ip: msg.ip, - }, - ); - - id - } -} - -/// Handler for Disconnect message. -impl Handler for ChatServer { - type Result = (); - - fn handle(&mut self, msg: Disconnect, _: &mut Context) { - // Remove connections from sessions and all 3 scopes - if self.sessions.remove(&msg.id).is_some() { - for sessions in self.user_rooms.values_mut() { - sessions.remove(&msg.id); - } - - for sessions in self.post_rooms.values_mut() { - sessions.remove(&msg.id); - } - - for sessions in self.community_rooms.values_mut() { - sessions.remove(&msg.id); - } - } - } -} - -/// Handler for Message message. -impl Handler for ChatServer { - type Result = ResponseFuture>; - - fn handle(&mut self, msg: StandardMessage, ctx: &mut Context) -> Self::Result { - let fut = self.parse_json_message(msg, ctx); - Box::pin(async move { - match fut.await { - Ok(m) => { - // info!("Message Sent: {}", m); - Ok(m) - } - Err(e) => { - error!("Error during message handling {}", e); - Ok(e.to_string()) - } - } - }) - } -} - -impl Handler> for ChatServer -where - Response: Serialize, -{ - type Result = (); - - fn handle(&mut self, msg: SendAllMessage, _: &mut Context) { - self - .send_all_message(&msg.op, &msg.response, msg.websocket_id) - .ok(); - } -} - -impl Handler> for ChatServer -where - Response: Serialize, -{ - type Result = (); - - fn handle(&mut self, msg: SendUserRoomMessage, _: &mut Context) { - self - .send_user_room_message(&msg.op, &msg.response, msg.recipient_id, msg.websocket_id) - .ok(); - } -} - -impl Handler> for ChatServer -where - Response: Serialize, -{ - type Result = (); - - fn handle(&mut self, msg: SendCommunityRoomMessage, _: &mut Context) { - self - .send_community_room_message(&msg.op, &msg.response, msg.community_id, msg.websocket_id) - .ok(); - } -} - -impl Handler for ChatServer { - type Result = (); - - fn handle(&mut self, msg: SendPost, _: &mut Context) { - self.send_post(&msg.op, &msg.post, msg.websocket_id).ok(); - } -} - -impl Handler for ChatServer { - type Result = (); - - fn handle(&mut self, msg: SendComment, _: &mut Context) { - self - .send_comment(&msg.op, &msg.comment, msg.websocket_id) - .ok(); - } -} - -impl Handler for ChatServer { - type Result = (); - - fn handle(&mut self, msg: JoinUserRoom, _: &mut Context) { - self.join_user_room(msg.user_id, msg.id).ok(); - } -} - -impl Handler for ChatServer { - type Result = (); - - fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context) { - self.join_community_room(msg.community_id, msg.id).ok(); - } -} - -impl Handler for ChatServer { - type Result = (); - - fn handle(&mut self, msg: JoinPostRoom, _: &mut Context) { - self.join_post_room(msg.post_id, msg.id).ok(); - } -} - -impl Handler for ChatServer { - type Result = usize; - - fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context) -> Self::Result { - self.sessions.len() - } -} - -impl Handler for ChatServer { - type Result = usize; - - fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context) -> Self::Result { - if let Some(users) = self.post_rooms.get(&msg.post_id) { - users.len() - } else { - 0 - } - } -} - -impl Handler for ChatServer { - type Result = usize; - - fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context) -> Self::Result { - if let Some(users) = self.community_rooms.get(&msg.community_id) { - users.len() - } else { - 0 - } - } -} - -#[derive(Serialize)] -struct WebsocketResponse { - op: String, - data: T, -} - -fn to_json_string(op: &UserOperation, data: &Response) -> Result -where - Response: Serialize, -{ - let response = WebsocketResponse { - op: op.to_string(), - data, - }; - Ok(serde_json::to_string(&response)?) -} - -impl Handler for ChatServer { - type Result = (); - - fn handle(&mut self, msg: CaptchaItem, _: &mut Context) { - self.captchas.push(msg); - } -} - -impl Handler for ChatServer { - type Result = bool; - - fn handle(&mut self, msg: CheckCaptcha, _: &mut Context) -> Self::Result { - // Remove all the ones that are past the expire time - self.captchas.retain(|x| x.expires.gt(&naive_now())); - - let check = self - .captchas - .iter() - .any(|r| r.uuid == msg.uuid && r.answer == msg.answer); - - // Remove this uuid so it can't be re-checked (Checks only work once) - self.captchas.retain(|x| x.uuid != msg.uuid); - - check - } -} diff --git a/server/src/websocket/handlers.rs b/server/src/websocket/handlers.rs new file mode 100644 index 00000000..812ef921 --- /dev/null +++ b/server/src/websocket/handlers.rs @@ -0,0 +1,291 @@ +use super::*; +use crate::{ + api::Perform, + rate_limit::RateLimit, + websocket::{ + chat_server::{ChatServer, SessionInfo}, + messages::*, + UserOperation, + }, + ConnectionId, + IPAddr, + LemmyContext, + LemmyError, +}; +use actix_web::web; +use lemmy_db::naive_now; + +pub(super) struct Args<'a> { + pub(super) context: LemmyContext, + pub(super) rate_limiter: RateLimit, + pub(super) id: ConnectionId, + pub(super) ip: IPAddr, + pub(super) op: UserOperation, + pub(super) data: &'a str, +} + +pub(super) async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result +where + for<'de> Data: Deserialize<'de> + 'a, + Data: Perform, +{ + let Args { + context, + rate_limiter, + id, + ip, + op, + data, + } = args; + + let data = data.to_string(); + let op2 = op.clone(); + + let fut = async move { + let parsed_data: Data = serde_json::from_str(&data)?; + let res = parsed_data + .perform(&web::Data::new(context), Some(id)) + .await?; + to_json_string(&op, &res) + }; + + match op2 { + UserOperation::Register => rate_limiter.register().wrap(ip, fut).await, + UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await, + UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await, + _ => rate_limiter.message().wrap(ip, fut).await, + } +} + +/// Make actor from `ChatServer` +impl Actor for ChatServer { + /// We are going to use simple Context, we just need ability to communicate + /// with other actors. + type Context = Context; +} + +/// Handler for Connect message. +/// +/// Register new session and assign unique id to this session +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: Connect, _ctx: &mut Context) -> Self::Result { + // register session with random id + let id = self.rng.gen::(); + info!("{} joined", &msg.ip); + + self.sessions.insert( + id, + SessionInfo { + addr: msg.addr, + ip: msg.ip, + }, + ); + + id + } +} + +/// Handler for Disconnect message. +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: Disconnect, _: &mut Context) { + // Remove connections from sessions and all 3 scopes + if self.sessions.remove(&msg.id).is_some() { + for sessions in self.user_rooms.values_mut() { + sessions.remove(&msg.id); + } + + for sessions in self.post_rooms.values_mut() { + sessions.remove(&msg.id); + } + + for sessions in self.community_rooms.values_mut() { + sessions.remove(&msg.id); + } + } + } +} + +/// Handler for Message message. +impl Handler for ChatServer { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: StandardMessage, ctx: &mut Context) -> Self::Result { + let fut = self.parse_json_message(msg, ctx); + Box::pin(async move { + match fut.await { + Ok(m) => { + // info!("Message Sent: {}", m); + Ok(m) + } + Err(e) => { + error!("Error during message handling {}", e); + Ok(e.to_string()) + } + } + }) + } +} + +impl Handler> for ChatServer +where + Response: Serialize, +{ + type Result = (); + + fn handle(&mut self, msg: SendAllMessage, _: &mut Context) { + self + .send_all_message(&msg.op, &msg.response, msg.websocket_id) + .ok(); + } +} + +impl Handler> for ChatServer +where + Response: Serialize, +{ + type Result = (); + + fn handle(&mut self, msg: SendUserRoomMessage, _: &mut Context) { + self + .send_user_room_message(&msg.op, &msg.response, msg.recipient_id, msg.websocket_id) + .ok(); + } +} + +impl Handler> for ChatServer +where + Response: Serialize, +{ + type Result = (); + + fn handle(&mut self, msg: SendCommunityRoomMessage, _: &mut Context) { + self + .send_community_room_message(&msg.op, &msg.response, msg.community_id, msg.websocket_id) + .ok(); + } +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: SendPost, _: &mut Context) { + self.send_post(&msg.op, &msg.post, msg.websocket_id).ok(); + } +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: SendComment, _: &mut Context) { + self + .send_comment(&msg.op, &msg.comment, msg.websocket_id) + .ok(); + } +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: JoinUserRoom, _: &mut Context) { + self.join_user_room(msg.user_id, msg.id).ok(); + } +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context) { + self.join_community_room(msg.community_id, msg.id).ok(); + } +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: JoinPostRoom, _: &mut Context) { + self.join_post_room(msg.post_id, msg.id).ok(); + } +} + +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context) -> Self::Result { + self.sessions.len() + } +} + +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context) -> Self::Result { + if let Some(users) = self.post_rooms.get(&msg.post_id) { + users.len() + } else { + 0 + } + } +} + +impl Handler for ChatServer { + type Result = usize; + + fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context) -> Self::Result { + if let Some(users) = self.community_rooms.get(&msg.community_id) { + users.len() + } else { + 0 + } + } +} + +#[derive(Serialize)] +struct WebsocketResponse { + op: String, + data: T, +} + +pub(super) fn to_json_string( + op: &UserOperation, + data: &Response, +) -> Result +where + Response: Serialize, +{ + let response = WebsocketResponse { + op: op.to_string(), + data, + }; + Ok(serde_json::to_string(&response)?) +} + +impl Handler for ChatServer { + type Result = (); + + fn handle(&mut self, msg: CaptchaItem, _: &mut Context) { + self.captchas.push(msg); + } +} + +impl Handler for ChatServer { + type Result = bool; + + fn handle(&mut self, msg: CheckCaptcha, _: &mut Context) -> Self::Result { + // Remove all the ones that are past the expire time + self.captchas.retain(|x| x.expires.gt(&naive_now())); + + let check = self + .captchas + .iter() + .any(|r| r.uuid == msg.uuid && r.answer == msg.answer); + + // Remove this uuid so it can't be re-checked (Checks only work once) + self.captchas.retain(|x| x.uuid != msg.uuid); + + check + } +} diff --git a/server/src/websocket/messages.rs b/server/src/websocket/messages.rs new file mode 100644 index 00000000..41b0cbc6 --- /dev/null +++ b/server/src/websocket/messages.rs @@ -0,0 +1,137 @@ +use super::*; +use crate::{ + api::{comment::*, post::*}, + websocket::UserOperation, + CommunityId, + ConnectionId, + IPAddr, + PostId, + UserId, +}; + +/// Chat server sends this messages to session +#[derive(Message)] +#[rtype(result = "()")] +pub struct WSMessage(pub String); + +/// Message for chat server communications + +/// New chat session is created +#[derive(Message)] +#[rtype(usize)] +pub struct Connect { + pub addr: Recipient, + pub ip: IPAddr, +} + +/// Session is disconnected +#[derive(Message)] +#[rtype(result = "()")] +pub struct Disconnect { + pub id: ConnectionId, + pub ip: IPAddr, +} + +/// The messages sent to websocket clients +#[derive(Serialize, Deserialize, Message)] +#[rtype(result = "Result")] +pub struct StandardMessage { + /// Id of the client session + pub id: ConnectionId, + /// Peer message + pub msg: String, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendAllMessage { + pub op: UserOperation, + pub response: Response, + pub websocket_id: Option, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendUserRoomMessage { + pub op: UserOperation, + pub response: Response, + pub recipient_id: UserId, + pub websocket_id: Option, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendCommunityRoomMessage { + pub op: UserOperation, + pub response: Response, + pub community_id: CommunityId, + pub websocket_id: Option, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendPost { + pub op: UserOperation, + pub post: PostResponse, + pub websocket_id: Option, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct SendComment { + pub op: UserOperation, + pub comment: CommentResponse, + pub websocket_id: Option, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct JoinUserRoom { + pub user_id: UserId, + pub id: ConnectionId, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct JoinCommunityRoom { + pub community_id: CommunityId, + pub id: ConnectionId, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct JoinPostRoom { + pub post_id: PostId, + pub id: ConnectionId, +} + +#[derive(Message)] +#[rtype(usize)] +pub struct GetUsersOnline; + +#[derive(Message)] +#[rtype(usize)] +pub struct GetPostUsersOnline { + pub post_id: PostId, +} + +#[derive(Message)] +#[rtype(usize)] +pub struct GetCommunityUsersOnline { + pub community_id: CommunityId, +} + +#[derive(Message, Debug)] +#[rtype(result = "()")] +pub struct CaptchaItem { + pub uuid: String, + pub answer: String, + pub expires: chrono::NaiveDateTime, +} + +#[derive(Message)] +#[rtype(bool)] +pub struct CheckCaptcha { + pub uuid: String, + pub answer: String, +} diff --git a/server/src/websocket/mod.rs b/server/src/websocket/mod.rs index 1430d89a..8bf8766d 100644 --- a/server/src/websocket/mod.rs +++ b/server/src/websocket/mod.rs @@ -1,4 +1,6 @@ -pub mod server; +pub mod chat_server; +pub mod handlers; +pub mod messages; use actix::prelude::*; use diesel::{ -- 2.44.1