]> Untitled Git - lemmy.git/commitdiff
Refactor websocket to split it into multiple files
authorFelix Ableitner <me@nutomic.com>
Mon, 31 Aug 2020 15:20:13 +0000 (17:20 +0200)
committerFelix Ableitner <me@nutomic.com>
Mon, 31 Aug 2020 15:20:13 +0000 (17:20 +0200)
20 files changed:
server/src/api/comment.rs
server/src/api/community.rs
server/src/api/post.rs
server/src/api/site.rs
server/src/api/user.rs
server/src/apub/inbox/activities/create.rs
server/src/apub/inbox/activities/delete.rs
server/src/apub/inbox/activities/dislike.rs
server/src/apub/inbox/activities/like.rs
server/src/apub/inbox/activities/remove.rs
server/src/apub/inbox/activities/undo.rs
server/src/apub/inbox/activities/update.rs
server/src/apub/inbox/user_inbox.rs
server/src/lib.rs
server/src/main.rs
server/src/routes/websocket.rs
server/src/websocket/chat_server.rs [moved from server/src/websocket/server.rs with 60% similarity]
server/src/websocket/handlers.rs [new file with mode: 0644]
server/src/websocket/messages.rs [new file with mode: 0644]
server/src/websocket/mod.rs

index f2effab58b48e8aa65225f9ae8dac69783440a7a..0d250a7b2d2e65dd10da096237e91e6ff1977e21 100644 (file)
@@ -11,7 +11,7 @@ use crate::{
   apub::{ApubLikeableType, ApubObjectType},
   blocking,
   websocket::{
-    server::{JoinCommunityRoom, SendComment},
+    messages::{JoinCommunityRoom, SendComment},
     UserOperation,
   },
   ConnectionId,
index c94ca59b7ba284fbab78c2bd0500cb8f6e0f83a4..a7b3b6c7c702a5ee03e9696bbc8d3b73dbd72e68 100644 (file)
@@ -4,7 +4,7 @@ use crate::{
   apub::ActorType,
   blocking,
   websocket::{
-    server::{GetCommunityUsersOnline, JoinCommunityRoom, SendCommunityRoomMessage},
+    messages::{GetCommunityUsersOnline, JoinCommunityRoom, SendCommunityRoomMessage},
     UserOperation,
   },
   ConnectionId,
index 9f0fb3be051d3a5a6c202326392712f9e31880c9..84842173e8d0c75cfde823d76b21f594c9b7b282 100644 (file)
@@ -13,7 +13,7 @@ use crate::{
   blocking,
   fetch_iframely_and_pictrs_data,
   websocket::{
-    server::{GetPostUsersOnline, JoinCommunityRoom, JoinPostRoom, SendPost},
+    messages::{GetPostUsersOnline, JoinCommunityRoom, JoinPostRoom, SendPost},
     UserOperation,
   },
   ConnectionId,
index 8f5f0e93082f3feb318cb9aae8b01e4e672e3d03..2ea4a390c83197bb82be14c372dcaca0d09eda9c 100644 (file)
@@ -13,7 +13,7 @@ use crate::{
   blocking,
   version,
   websocket::{
-    server::{GetUsersOnline, SendAllMessage},
+    messages::{GetUsersOnline, SendAllMessage},
     UserOperation,
   },
   ConnectionId,
index e6cf2a820ad61fbcf29d1a244ce8edd58bb212ef..32a9d2b82ac2f0e2735cac286888ab00c462c62d 100644 (file)
@@ -12,7 +12,7 @@ use crate::{
   blocking,
   captcha_espeak_wav_base64,
   websocket::{
-    server::{CaptchaItem, CheckCaptcha, JoinUserRoom, SendAllMessage, SendUserRoomMessage},
+    messages::{CaptchaItem, CheckCaptcha, JoinUserRoom, SendAllMessage, SendUserRoomMessage},
     UserOperation,
   },
   ConnectionId,
index caba560dceaf33729955b2416b815eab9ae32028..a7d7d5750f849088acb347a02c2b7cc449930ce9 100644 (file)
@@ -15,7 +15,7 @@ use crate::{
   },
   blocking,
   websocket::{
-    server::{SendComment, SendPost},
+    messages::{SendComment, SendPost},
     UserOperation,
   },
   LemmyContext,
index 9c4f0beeffe688f19cf6ede99784d92472c00267..65180ca43ef12e466dd6b4ceb8010009aa957c33 100644 (file)
@@ -14,7 +14,7 @@ use crate::{
   },
   blocking,
   websocket::{
-    server::{SendComment, SendCommunityRoomMessage, SendPost},
+    messages::{SendComment, SendCommunityRoomMessage, SendPost},
     UserOperation,
   },
   LemmyContext,
index 4d59dd4703e7fb920bdd5f6c4508a71ae6db7b1a..441e36f1a3eb7a6f052db5ecf47563b562cb2261 100644 (file)
@@ -12,7 +12,7 @@ use crate::{
   },
   blocking,
   websocket::{
-    server::{SendComment, SendPost},
+    messages::{SendComment, SendPost},
     UserOperation,
   },
   LemmyContext,
index a3f19b3cee30ab70c876bc1cd47e2f64fe0207c0..67aefaa08c048f2f2af6f8db0cfbd68ed4a8a872 100644 (file)
@@ -12,7 +12,7 @@ use crate::{
   },
   blocking,
   websocket::{
-    server::{SendComment, SendPost},
+    messages::{SendComment, SendPost},
     UserOperation,
   },
   LemmyContext,
index 83eb6f3394172e6b19fa2844e7a4f254df537165..f0e98be2b162a565ef8c91b77471227fad7068ad 100644 (file)
@@ -15,7 +15,7 @@ use crate::{
   },
   blocking,
   websocket::{
-    server::{SendComment, SendCommunityRoomMessage, SendPost},
+    messages::{SendComment, SendCommunityRoomMessage, SendPost},
     UserOperation,
   },
   LemmyContext,
index 9a589554d8686c5fe3919930c8b785c1382d4cf8..9bf9e96ad10427670fa932328fbc3d95391cf43a 100644 (file)
@@ -14,7 +14,7 @@ use crate::{
   },
   blocking,
   websocket::{
-    server::{SendComment, SendCommunityRoomMessage, SendPost},
+    messages::{SendComment, SendCommunityRoomMessage, SendPost},
     UserOperation,
   },
   LemmyContext,
index f8e6603f69cd6a4481eac9effcbe0da0cdd13cb0..d5d01235005c903a348d0ae30c38cfaf535f5167 100644 (file)
@@ -16,7 +16,7 @@ use crate::{
   },
   blocking,
   websocket::{
-    server::{SendComment, SendPost},
+    messages::{SendComment, SendPost},
     UserOperation,
   },
   LemmyContext,
index 27d58ebcd3b469a0efa2e117de30de78a177b94b..ddb97109c45d50fdd7da3b001f3fd8ab24b801ca 100644 (file)
@@ -8,7 +8,7 @@ use crate::{
     FromApub,
   },
   blocking,
-  websocket::{server::SendUserRoomMessage, UserOperation},
+  websocket::{messages::SendUserRoomMessage, UserOperation},
   LemmyContext,
   LemmyError,
 };
index 32b43ef8454aedc4bd8a527901f2aedde07a8f1d..79182be90bad22ef601f47ec7e7b2c6113bdbf2b 100644 (file)
@@ -30,11 +30,9 @@ pub mod routes;
 pub mod version;
 pub mod websocket;
 
-use crate::{
-  request::{retry, RecvError},
-  websocket::server::ChatServer,
-};
+use crate::request::{retry, RecvError};
 
+use crate::websocket::chat_server::ChatServer;
 use actix::Addr;
 use actix_web::dev::ConnectionInfo;
 use anyhow::anyhow;
index 72fce5c00f5834fb15fbab64b16902c920211b38..30fcdaab38441149d4053831cb01f4110e905bbd 100644 (file)
@@ -24,7 +24,7 @@ use lemmy_server::{
   code_migrations::run_advanced_migrations,
   rate_limit::{rate_limiter::RateLimiter, RateLimit},
   routes::*,
-  websocket::server::*,
+  websocket::chat_server::ChatServer,
   LemmyContext,
   LemmyError,
 };
index 7c787d66d79bce0e4f0a17fd98ac7fbab22ff5ee..954b39b22f81fa60c12601d5fe60a9c6acbf925e 100644 (file)
@@ -1,6 +1,9 @@
 use crate::{
   get_ip,
-  websocket::server::{ChatServer, *},
+  websocket::{
+    chat_server::ChatServer,
+    messages::{Connect, Disconnect, StandardMessage, WSMessage},
+  },
   LemmyContext,
 };
 use actix::prelude::*;
similarity index 60%
rename from server/src/websocket/server.rs
rename to server/src/websocket/chat_server.rs
index 2a4c558cd383408adbd56278fc85c1d051e37b02..6ee22f8c7624be530a368888341f68445bb17047 100644 (file)
@@ -1,12 +1,12 @@
-//! `ChatServer` is an actor. It maintains list of connection client session.
-//! And manages available rooms. Peers send messages to other peers in same
-//! room through `ChatServer`.
-
 use super::*;
 use crate::{
   api::{comment::*, community::*, post::*, site::*, user::*, *},
   rate_limit::RateLimit,
-  websocket::UserOperation,
+  websocket::{
+    handlers::{do_user_operation, to_json_string, Args},
+    messages::*,
+    UserOperation,
+  },
   CommunityId,
   ConnectionId,
   IPAddr,
@@ -15,145 +15,11 @@ use crate::{
   PostId,
   UserId,
 };
-use actix_web::web;
 use anyhow::Context as acontext;
 use background_jobs::QueueHandle;
-use lemmy_db::naive_now;
 use lemmy_utils::location_info;
 use reqwest::Client;
 
-/// Chat server sends this messages to session
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct WSMessage(pub String);
-
-/// Message for chat server communications
-
-/// New chat session is created
-#[derive(Message)]
-#[rtype(usize)]
-pub struct Connect {
-  pub addr: Recipient<WSMessage>,
-  pub ip: IPAddr,
-}
-
-/// Session is disconnected
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct Disconnect {
-  pub id: ConnectionId,
-  pub ip: IPAddr,
-}
-
-/// The messages sent to websocket clients
-#[derive(Serialize, Deserialize, Message)]
-#[rtype(result = "Result<String, std::convert::Infallible>")]
-pub struct StandardMessage {
-  /// Id of the client session
-  pub id: ConnectionId,
-  /// Peer message
-  pub msg: String,
-}
-
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct SendAllMessage<Response> {
-  pub op: UserOperation,
-  pub response: Response,
-  pub websocket_id: Option<ConnectionId>,
-}
-
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct SendUserRoomMessage<Response> {
-  pub op: UserOperation,
-  pub response: Response,
-  pub recipient_id: UserId,
-  pub websocket_id: Option<ConnectionId>,
-}
-
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct SendCommunityRoomMessage<Response> {
-  pub op: UserOperation,
-  pub response: Response,
-  pub community_id: CommunityId,
-  pub websocket_id: Option<ConnectionId>,
-}
-
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct SendPost {
-  pub op: UserOperation,
-  pub post: PostResponse,
-  pub websocket_id: Option<ConnectionId>,
-}
-
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct SendComment {
-  pub op: UserOperation,
-  pub comment: CommentResponse,
-  pub websocket_id: Option<ConnectionId>,
-}
-
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct JoinUserRoom {
-  pub user_id: UserId,
-  pub id: ConnectionId,
-}
-
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct JoinCommunityRoom {
-  pub community_id: CommunityId,
-  pub id: ConnectionId,
-}
-
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct JoinPostRoom {
-  pub post_id: PostId,
-  pub id: ConnectionId,
-}
-
-#[derive(Message)]
-#[rtype(usize)]
-pub struct GetUsersOnline;
-
-#[derive(Message)]
-#[rtype(usize)]
-pub struct GetPostUsersOnline {
-  pub post_id: PostId,
-}
-
-#[derive(Message)]
-#[rtype(usize)]
-pub struct GetCommunityUsersOnline {
-  pub community_id: CommunityId,
-}
-
-pub struct SessionInfo {
-  pub addr: Recipient<WSMessage>,
-  pub ip: IPAddr,
-}
-
-#[derive(Message, Debug)]
-#[rtype(result = "()")]
-pub struct CaptchaItem {
-  pub uuid: String,
-  pub answer: String,
-  pub expires: chrono::NaiveDateTime,
-}
-
-#[derive(Message)]
-#[rtype(bool)]
-pub struct CheckCaptcha {
-  pub uuid: String,
-  pub answer: String,
-}
-
 /// `ChatServer` manages chat rooms and responsible for coordinating chat
 /// session.
 pub struct ChatServer {
@@ -168,18 +34,18 @@ pub struct ChatServer {
 
   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
   /// sessions (IE clients)
-  user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
+  pub(super) user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
 
-  rng: ThreadRng,
+  pub(super) rng: ThreadRng,
 
   /// The DB Pool
-  pool: Pool<ConnectionManager<PgConnection>>,
+  pub(super) pool: Pool<ConnectionManager<PgConnection>>,
 
   /// Rate limiting based on rate type and IP addr
-  rate_limiter: RateLimit,
+  pub(super) rate_limiter: RateLimit,
 
   /// A list of the current captchas
-  captchas: Vec<CaptchaItem>,
+  pub(super) captchas: Vec<CaptchaItem>,
 
   /// An HTTP Client
   client: Client,
@@ -187,6 +53,14 @@ pub struct ChatServer {
   activity_queue: QueueHandle,
 }
 
+pub struct SessionInfo {
+  pub addr: Recipient<WSMessage>,
+  pub ip: IPAddr,
+}
+
+/// `ChatServer` is an actor. It maintains list of connection client session.
+/// And manages available rooms. Peers send messages to other peers in same
+/// room through `ChatServer`.
 impl ChatServer {
   pub fn startup(
     pool: Pool<ConnectionManager<PgConnection>>,
@@ -451,7 +325,7 @@ impl ChatServer {
     }
   }
 
-  fn parse_json_message(
+  pub(super) fn parse_json_message(
     &mut self,
     msg: StandardMessage,
     ctx: &mut Context<Self>,
@@ -576,275 +450,3 @@ impl ChatServer {
     }
   }
 }
-
-struct Args<'a> {
-  context: LemmyContext,
-  rate_limiter: RateLimit,
-  id: ConnectionId,
-  ip: IPAddr,
-  op: UserOperation,
-  data: &'a str,
-}
-
-async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result<String, LemmyError>
-where
-  for<'de> Data: Deserialize<'de> + 'a,
-  Data: Perform,
-{
-  let Args {
-    context,
-    rate_limiter,
-    id,
-    ip,
-    op,
-    data,
-  } = args;
-
-  let data = data.to_string();
-  let op2 = op.clone();
-
-  let fut = async move {
-    let parsed_data: Data = serde_json::from_str(&data)?;
-    let res = parsed_data
-      .perform(&web::Data::new(context), Some(id))
-      .await?;
-    to_json_string(&op, &res)
-  };
-
-  match op2 {
-    UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
-    UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
-    UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
-    _ => rate_limiter.message().wrap(ip, fut).await,
-  }
-}
-
-/// Make actor from `ChatServer`
-impl Actor for ChatServer {
-  /// We are going to use simple Context, we just need ability to communicate
-  /// with other actors.
-  type Context = Context<Self>;
-}
-
-/// Handler for Connect message.
-///
-/// Register new session and assign unique id to this session
-impl Handler<Connect> for ChatServer {
-  type Result = usize;
-
-  fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
-    // register session with random id
-    let id = self.rng.gen::<usize>();
-    info!("{} joined", &msg.ip);
-
-    self.sessions.insert(
-      id,
-      SessionInfo {
-        addr: msg.addr,
-        ip: msg.ip,
-      },
-    );
-
-    id
-  }
-}
-
-/// Handler for Disconnect message.
-impl Handler<Disconnect> for ChatServer {
-  type Result = ();
-
-  fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
-    // Remove connections from sessions and all 3 scopes
-    if self.sessions.remove(&msg.id).is_some() {
-      for sessions in self.user_rooms.values_mut() {
-        sessions.remove(&msg.id);
-      }
-
-      for sessions in self.post_rooms.values_mut() {
-        sessions.remove(&msg.id);
-      }
-
-      for sessions in self.community_rooms.values_mut() {
-        sessions.remove(&msg.id);
-      }
-    }
-  }
-}
-
-/// Handler for Message message.
-impl Handler<StandardMessage> for ChatServer {
-  type Result = ResponseFuture<Result<String, std::convert::Infallible>>;
-
-  fn handle(&mut self, msg: StandardMessage, ctx: &mut Context<Self>) -> Self::Result {
-    let fut = self.parse_json_message(msg, ctx);
-    Box::pin(async move {
-      match fut.await {
-        Ok(m) => {
-          // info!("Message Sent: {}", m);
-          Ok(m)
-        }
-        Err(e) => {
-          error!("Error during message handling {}", e);
-          Ok(e.to_string())
-        }
-      }
-    })
-  }
-}
-
-impl<Response> Handler<SendAllMessage<Response>> for ChatServer
-where
-  Response: Serialize,
-{
-  type Result = ();
-
-  fn handle(&mut self, msg: SendAllMessage<Response>, _: &mut Context<Self>) {
-    self
-      .send_all_message(&msg.op, &msg.response, msg.websocket_id)
-      .ok();
-  }
-}
-
-impl<Response> Handler<SendUserRoomMessage<Response>> for ChatServer
-where
-  Response: Serialize,
-{
-  type Result = ();
-
-  fn handle(&mut self, msg: SendUserRoomMessage<Response>, _: &mut Context<Self>) {
-    self
-      .send_user_room_message(&msg.op, &msg.response, msg.recipient_id, msg.websocket_id)
-      .ok();
-  }
-}
-
-impl<Response> Handler<SendCommunityRoomMessage<Response>> for ChatServer
-where
-  Response: Serialize,
-{
-  type Result = ();
-
-  fn handle(&mut self, msg: SendCommunityRoomMessage<Response>, _: &mut Context<Self>) {
-    self
-      .send_community_room_message(&msg.op, &msg.response, msg.community_id, msg.websocket_id)
-      .ok();
-  }
-}
-
-impl Handler<SendPost> for ChatServer {
-  type Result = ();
-
-  fn handle(&mut self, msg: SendPost, _: &mut Context<Self>) {
-    self.send_post(&msg.op, &msg.post, msg.websocket_id).ok();
-  }
-}
-
-impl Handler<SendComment> for ChatServer {
-  type Result = ();
-
-  fn handle(&mut self, msg: SendComment, _: &mut Context<Self>) {
-    self
-      .send_comment(&msg.op, &msg.comment, msg.websocket_id)
-      .ok();
-  }
-}
-
-impl Handler<JoinUserRoom> for ChatServer {
-  type Result = ();
-
-  fn handle(&mut self, msg: JoinUserRoom, _: &mut Context<Self>) {
-    self.join_user_room(msg.user_id, msg.id).ok();
-  }
-}
-
-impl Handler<JoinCommunityRoom> for ChatServer {
-  type Result = ();
-
-  fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context<Self>) {
-    self.join_community_room(msg.community_id, msg.id).ok();
-  }
-}
-
-impl Handler<JoinPostRoom> for ChatServer {
-  type Result = ();
-
-  fn handle(&mut self, msg: JoinPostRoom, _: &mut Context<Self>) {
-    self.join_post_room(msg.post_id, msg.id).ok();
-  }
-}
-
-impl Handler<GetUsersOnline> for ChatServer {
-  type Result = usize;
-
-  fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context<Self>) -> Self::Result {
-    self.sessions.len()
-  }
-}
-
-impl Handler<GetPostUsersOnline> for ChatServer {
-  type Result = usize;
-
-  fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context<Self>) -> Self::Result {
-    if let Some(users) = self.post_rooms.get(&msg.post_id) {
-      users.len()
-    } else {
-      0
-    }
-  }
-}
-
-impl Handler<GetCommunityUsersOnline> for ChatServer {
-  type Result = usize;
-
-  fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context<Self>) -> Self::Result {
-    if let Some(users) = self.community_rooms.get(&msg.community_id) {
-      users.len()
-    } else {
-      0
-    }
-  }
-}
-
-#[derive(Serialize)]
-struct WebsocketResponse<T> {
-  op: String,
-  data: T,
-}
-
-fn to_json_string<Response>(op: &UserOperation, data: &Response) -> Result<String, LemmyError>
-where
-  Response: Serialize,
-{
-  let response = WebsocketResponse {
-    op: op.to_string(),
-    data,
-  };
-  Ok(serde_json::to_string(&response)?)
-}
-
-impl Handler<CaptchaItem> for ChatServer {
-  type Result = ();
-
-  fn handle(&mut self, msg: CaptchaItem, _: &mut Context<Self>) {
-    self.captchas.push(msg);
-  }
-}
-
-impl Handler<CheckCaptcha> for ChatServer {
-  type Result = bool;
-
-  fn handle(&mut self, msg: CheckCaptcha, _: &mut Context<Self>) -> Self::Result {
-    // Remove all the ones that are past the expire time
-    self.captchas.retain(|x| x.expires.gt(&naive_now()));
-
-    let check = self
-      .captchas
-      .iter()
-      .any(|r| r.uuid == msg.uuid && r.answer == msg.answer);
-
-    // Remove this uuid so it can't be re-checked (Checks only work once)
-    self.captchas.retain(|x| x.uuid != msg.uuid);
-
-    check
-  }
-}
diff --git a/server/src/websocket/handlers.rs b/server/src/websocket/handlers.rs
new file mode 100644 (file)
index 0000000..812ef92
--- /dev/null
@@ -0,0 +1,291 @@
+use super::*;
+use crate::{
+  api::Perform,
+  rate_limit::RateLimit,
+  websocket::{
+    chat_server::{ChatServer, SessionInfo},
+    messages::*,
+    UserOperation,
+  },
+  ConnectionId,
+  IPAddr,
+  LemmyContext,
+  LemmyError,
+};
+use actix_web::web;
+use lemmy_db::naive_now;
+
+pub(super) struct Args<'a> {
+  pub(super) context: LemmyContext,
+  pub(super) rate_limiter: RateLimit,
+  pub(super) id: ConnectionId,
+  pub(super) ip: IPAddr,
+  pub(super) op: UserOperation,
+  pub(super) data: &'a str,
+}
+
+pub(super) async fn do_user_operation<'a, 'b, Data>(args: Args<'b>) -> Result<String, LemmyError>
+where
+  for<'de> Data: Deserialize<'de> + 'a,
+  Data: Perform,
+{
+  let Args {
+    context,
+    rate_limiter,
+    id,
+    ip,
+    op,
+    data,
+  } = args;
+
+  let data = data.to_string();
+  let op2 = op.clone();
+
+  let fut = async move {
+    let parsed_data: Data = serde_json::from_str(&data)?;
+    let res = parsed_data
+      .perform(&web::Data::new(context), Some(id))
+      .await?;
+    to_json_string(&op, &res)
+  };
+
+  match op2 {
+    UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
+    UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
+    UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
+    _ => rate_limiter.message().wrap(ip, fut).await,
+  }
+}
+
+/// Make actor from `ChatServer`
+impl Actor for ChatServer {
+  /// We are going to use simple Context, we just need ability to communicate
+  /// with other actors.
+  type Context = Context<Self>;
+}
+
+/// Handler for Connect message.
+///
+/// Register new session and assign unique id to this session
+impl Handler<Connect> for ChatServer {
+  type Result = usize;
+
+  fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
+    // register session with random id
+    let id = self.rng.gen::<usize>();
+    info!("{} joined", &msg.ip);
+
+    self.sessions.insert(
+      id,
+      SessionInfo {
+        addr: msg.addr,
+        ip: msg.ip,
+      },
+    );
+
+    id
+  }
+}
+
+/// Handler for Disconnect message.
+impl Handler<Disconnect> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
+    // Remove connections from sessions and all 3 scopes
+    if self.sessions.remove(&msg.id).is_some() {
+      for sessions in self.user_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+
+      for sessions in self.post_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+
+      for sessions in self.community_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+    }
+  }
+}
+
+/// Handler for Message message.
+impl Handler<StandardMessage> for ChatServer {
+  type Result = ResponseFuture<Result<String, std::convert::Infallible>>;
+
+  fn handle(&mut self, msg: StandardMessage, ctx: &mut Context<Self>) -> Self::Result {
+    let fut = self.parse_json_message(msg, ctx);
+    Box::pin(async move {
+      match fut.await {
+        Ok(m) => {
+          // info!("Message Sent: {}", m);
+          Ok(m)
+        }
+        Err(e) => {
+          error!("Error during message handling {}", e);
+          Ok(e.to_string())
+        }
+      }
+    })
+  }
+}
+
+impl<Response> Handler<SendAllMessage<Response>> for ChatServer
+where
+  Response: Serialize,
+{
+  type Result = ();
+
+  fn handle(&mut self, msg: SendAllMessage<Response>, _: &mut Context<Self>) {
+    self
+      .send_all_message(&msg.op, &msg.response, msg.websocket_id)
+      .ok();
+  }
+}
+
+impl<Response> Handler<SendUserRoomMessage<Response>> for ChatServer
+where
+  Response: Serialize,
+{
+  type Result = ();
+
+  fn handle(&mut self, msg: SendUserRoomMessage<Response>, _: &mut Context<Self>) {
+    self
+      .send_user_room_message(&msg.op, &msg.response, msg.recipient_id, msg.websocket_id)
+      .ok();
+  }
+}
+
+impl<Response> Handler<SendCommunityRoomMessage<Response>> for ChatServer
+where
+  Response: Serialize,
+{
+  type Result = ();
+
+  fn handle(&mut self, msg: SendCommunityRoomMessage<Response>, _: &mut Context<Self>) {
+    self
+      .send_community_room_message(&msg.op, &msg.response, msg.community_id, msg.websocket_id)
+      .ok();
+  }
+}
+
+impl Handler<SendPost> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: SendPost, _: &mut Context<Self>) {
+    self.send_post(&msg.op, &msg.post, msg.websocket_id).ok();
+  }
+}
+
+impl Handler<SendComment> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: SendComment, _: &mut Context<Self>) {
+    self
+      .send_comment(&msg.op, &msg.comment, msg.websocket_id)
+      .ok();
+  }
+}
+
+impl Handler<JoinUserRoom> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: JoinUserRoom, _: &mut Context<Self>) {
+    self.join_user_room(msg.user_id, msg.id).ok();
+  }
+}
+
+impl Handler<JoinCommunityRoom> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: JoinCommunityRoom, _: &mut Context<Self>) {
+    self.join_community_room(msg.community_id, msg.id).ok();
+  }
+}
+
+impl Handler<JoinPostRoom> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: JoinPostRoom, _: &mut Context<Self>) {
+    self.join_post_room(msg.post_id, msg.id).ok();
+  }
+}
+
+impl Handler<GetUsersOnline> for ChatServer {
+  type Result = usize;
+
+  fn handle(&mut self, _msg: GetUsersOnline, _: &mut Context<Self>) -> Self::Result {
+    self.sessions.len()
+  }
+}
+
+impl Handler<GetPostUsersOnline> for ChatServer {
+  type Result = usize;
+
+  fn handle(&mut self, msg: GetPostUsersOnline, _: &mut Context<Self>) -> Self::Result {
+    if let Some(users) = self.post_rooms.get(&msg.post_id) {
+      users.len()
+    } else {
+      0
+    }
+  }
+}
+
+impl Handler<GetCommunityUsersOnline> for ChatServer {
+  type Result = usize;
+
+  fn handle(&mut self, msg: GetCommunityUsersOnline, _: &mut Context<Self>) -> Self::Result {
+    if let Some(users) = self.community_rooms.get(&msg.community_id) {
+      users.len()
+    } else {
+      0
+    }
+  }
+}
+
+#[derive(Serialize)]
+struct WebsocketResponse<T> {
+  op: String,
+  data: T,
+}
+
+pub(super) fn to_json_string<Response>(
+  op: &UserOperation,
+  data: &Response,
+) -> Result<String, LemmyError>
+where
+  Response: Serialize,
+{
+  let response = WebsocketResponse {
+    op: op.to_string(),
+    data,
+  };
+  Ok(serde_json::to_string(&response)?)
+}
+
+impl Handler<CaptchaItem> for ChatServer {
+  type Result = ();
+
+  fn handle(&mut self, msg: CaptchaItem, _: &mut Context<Self>) {
+    self.captchas.push(msg);
+  }
+}
+
+impl Handler<CheckCaptcha> for ChatServer {
+  type Result = bool;
+
+  fn handle(&mut self, msg: CheckCaptcha, _: &mut Context<Self>) -> Self::Result {
+    // Remove all the ones that are past the expire time
+    self.captchas.retain(|x| x.expires.gt(&naive_now()));
+
+    let check = self
+      .captchas
+      .iter()
+      .any(|r| r.uuid == msg.uuid && r.answer == msg.answer);
+
+    // Remove this uuid so it can't be re-checked (Checks only work once)
+    self.captchas.retain(|x| x.uuid != msg.uuid);
+
+    check
+  }
+}
diff --git a/server/src/websocket/messages.rs b/server/src/websocket/messages.rs
new file mode 100644 (file)
index 0000000..41b0cbc
--- /dev/null
@@ -0,0 +1,137 @@
+use super::*;
+use crate::{
+  api::{comment::*, post::*},
+  websocket::UserOperation,
+  CommunityId,
+  ConnectionId,
+  IPAddr,
+  PostId,
+  UserId,
+};
+
+/// Chat server sends this messages to session
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct WSMessage(pub String);
+
+/// Message for chat server communications
+
+/// New chat session is created
+#[derive(Message)]
+#[rtype(usize)]
+pub struct Connect {
+  pub addr: Recipient<WSMessage>,
+  pub ip: IPAddr,
+}
+
+/// Session is disconnected
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct Disconnect {
+  pub id: ConnectionId,
+  pub ip: IPAddr,
+}
+
+/// The messages sent to websocket clients
+#[derive(Serialize, Deserialize, Message)]
+#[rtype(result = "Result<String, std::convert::Infallible>")]
+pub struct StandardMessage {
+  /// Id of the client session
+  pub id: ConnectionId,
+  /// Peer message
+  pub msg: String,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendAllMessage<Response> {
+  pub op: UserOperation,
+  pub response: Response,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendUserRoomMessage<Response> {
+  pub op: UserOperation,
+  pub response: Response,
+  pub recipient_id: UserId,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendCommunityRoomMessage<Response> {
+  pub op: UserOperation,
+  pub response: Response,
+  pub community_id: CommunityId,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendPost {
+  pub op: UserOperation,
+  pub post: PostResponse,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct SendComment {
+  pub op: UserOperation,
+  pub comment: CommentResponse,
+  pub websocket_id: Option<ConnectionId>,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinUserRoom {
+  pub user_id: UserId,
+  pub id: ConnectionId,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinCommunityRoom {
+  pub community_id: CommunityId,
+  pub id: ConnectionId,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct JoinPostRoom {
+  pub post_id: PostId,
+  pub id: ConnectionId,
+}
+
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetUsersOnline;
+
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetPostUsersOnline {
+  pub post_id: PostId,
+}
+
+#[derive(Message)]
+#[rtype(usize)]
+pub struct GetCommunityUsersOnline {
+  pub community_id: CommunityId,
+}
+
+#[derive(Message, Debug)]
+#[rtype(result = "()")]
+pub struct CaptchaItem {
+  pub uuid: String,
+  pub answer: String,
+  pub expires: chrono::NaiveDateTime,
+}
+
+#[derive(Message)]
+#[rtype(bool)]
+pub struct CheckCaptcha {
+  pub uuid: String,
+  pub answer: String,
+}
index 1430d89ae1555874e7f44132967bd5b3501bbd2d..8bf8766da4250b450f547de080388639f1ef755c 100644 (file)
@@ -1,4 +1,6 @@
-pub mod server;
+pub mod chat_server;
+pub mod handlers;
+pub mod messages;
 
 use actix::prelude::*;
 use diesel::{