]> Untitled Git - lemmy.git/blobdiff - server/src/websocket/server.rs
Merge branch 'federation' into dev_1
[lemmy.git] / server / src / websocket / server.rs
index 5efcb7bf41fe6496c44a07ebcf219409d2bb059e..0c606284537c4c6f3fa4de9b8edfca6d39a53a6d 100644 (file)
@@ -3,7 +3,7 @@
 //! room through `ChatServer`.
 
 use actix::prelude::*;
-use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
 use diesel::PgConnection;
 use failure::Error;
 use rand::{rngs::ThreadRng, Rng};
@@ -19,8 +19,16 @@ use crate::api::post::*;
 use crate::api::site::*;
 use crate::api::user::*;
 use crate::api::*;
+use crate::apub::puller::*;
+use crate::websocket::UserOperation;
 use crate::Settings;
 
+type ConnectionId = usize;
+type PostId = i32;
+type CommunityId = i32;
+type UserId = i32;
+type IPAddr = String;
+
 /// Chat server sends this messages to session
 #[derive(Message)]
 #[rtype(result = "()")]
@@ -33,35 +41,22 @@ pub struct WSMessage(pub String);
 #[rtype(usize)]
 pub struct Connect {
   pub addr: Recipient<WSMessage>,
-  pub ip: String,
+  pub ip: IPAddr,
 }
 
 /// Session is disconnected
 #[derive(Message)]
 #[rtype(result = "()")]
 pub struct Disconnect {
-  pub id: usize,
-  pub ip: String,
-}
-
-// TODO this is unused rn
-/// Send message to specific room
-#[derive(Message)]
-#[rtype(result = "()")]
-pub struct ClientMessage {
-  /// Id of the client session
-  pub id: usize,
-  /// Peer message
-  pub msg: String,
-  /// Room name
-  pub room: String,
+  pub id: ConnectionId,
+  pub ip: IPAddr,
 }
 
 #[derive(Serialize, Deserialize, Message)]
 #[rtype(String)]
 pub struct StandardMessage {
   /// Id of the client session
-  pub id: usize,
+  pub id: ConnectionId,
   /// Peer message
   pub msg: String,
 }
@@ -74,36 +69,93 @@ pub struct RateLimitBucket {
 
 pub struct SessionInfo {
   pub addr: Recipient<WSMessage>,
-  pub ip: String,
+  pub ip: IPAddr,
 }
 
 /// `ChatServer` manages chat rooms and responsible for coordinating chat
-/// session. implementation is super primitive
+/// session.
 pub struct ChatServer {
-  sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
-  rate_limits: HashMap<String, RateLimitBucket>,
-  rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
+  /// A map from generated random ID to session addr
+  sessions: HashMap<ConnectionId, SessionInfo>,
+
+  /// A map from post_id to set of connectionIDs
+  post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
+
+  /// A map from community to set of connectionIDs
+  community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
+
+  /// A map from user id to its connection ID for joined users. Remember a user can have multiple
+  /// sessions (IE clients)
+  user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
+
+  /// Rate limiting based on IP addr
+  rate_limits: HashMap<IPAddr, RateLimitBucket>,
+
   rng: ThreadRng,
   db: Pool<ConnectionManager<PgConnection>>,
 }
 
 impl ChatServer {
   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
-    // default room
-    let rooms = HashMap::new();
-
     ChatServer {
       sessions: HashMap::new(),
       rate_limits: HashMap::new(),
-      rooms,
+      post_rooms: HashMap::new(),
+      community_rooms: HashMap::new(),
+      user_rooms: HashMap::new(),
       rng: rand::thread_rng(),
       db,
     }
   }
 
-  /// Send message to all users in the room
-  fn send_room_message(&self, room: i32, message: &str, skip_id: usize) {
-    if let Some(sessions) = self.rooms.get(&room) {
+  fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
+    // remove session from all rooms
+    for sessions in self.community_rooms.values_mut() {
+      sessions.remove(&id);
+    }
+
+    // If the room doesn't exist yet
+    if self.community_rooms.get_mut(&community_id).is_none() {
+      self.community_rooms.insert(community_id, HashSet::new());
+    }
+
+    self
+      .community_rooms
+      .get_mut(&community_id)
+      .unwrap()
+      .insert(id);
+  }
+
+  fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
+    // remove session from all rooms
+    for sessions in self.post_rooms.values_mut() {
+      sessions.remove(&id);
+    }
+
+    // If the room doesn't exist yet
+    if self.post_rooms.get_mut(&post_id).is_none() {
+      self.post_rooms.insert(post_id, HashSet::new());
+    }
+
+    self.post_rooms.get_mut(&post_id).unwrap().insert(id);
+  }
+
+  fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
+    // remove session from all rooms
+    for sessions in self.user_rooms.values_mut() {
+      sessions.remove(&id);
+    }
+
+    // If the room doesn't exist yet
+    if self.user_rooms.get_mut(&user_id).is_none() {
+      self.user_rooms.insert(user_id, HashSet::new());
+    }
+
+    self.user_rooms.get_mut(&user_id).unwrap().insert(id);
+  }
+
+  fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
+    if let Some(sessions) = self.post_rooms.get(&post_id) {
       for id in sessions {
         if *id != skip_id {
           if let Some(info) = self.sessions.get(id) {
@@ -114,43 +166,98 @@ impl ChatServer {
     }
   }
 
-  fn join_room(&mut self, room_id: i32, id: usize) {
-    // remove session from all rooms
-    for sessions in self.rooms.values_mut() {
-      sessions.remove(&id);
+  fn send_community_room_message(
+    &self,
+    community_id: CommunityId,
+    message: &str,
+    skip_id: ConnectionId,
+  ) {
+    if let Some(sessions) = self.community_rooms.get(&community_id) {
+      for id in sessions {
+        if *id != skip_id {
+          if let Some(info) = self.sessions.get(id) {
+            let _ = info.addr.do_send(WSMessage(message.to_owned()));
+          }
+        }
+      }
     }
+  }
 
-    // If the room doesn't exist yet
-    if self.rooms.get_mut(&room_id).is_none() {
-      self.rooms.insert(room_id, HashSet::new());
+  fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
+    if let Some(sessions) = self.user_rooms.get(&user_id) {
+      for id in sessions {
+        if *id != skip_id {
+          if let Some(info) = self.sessions.get(id) {
+            let _ = info.addr.do_send(WSMessage(message.to_owned()));
+          }
+        }
+      }
     }
+  }
 
-    self.rooms.get_mut(&room_id).unwrap().insert(id);
+  fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
+    for id in self.sessions.keys() {
+      if *id != skip_id {
+        if let Some(info) = self.sessions.get(id) {
+          let _ = info.addr.do_send(WSMessage(message.to_owned()));
+        }
+      }
+    }
   }
 
-  fn send_community_message(
+  fn comment_sends(
     &self,
-    community_id: i32,
-    message: &str,
-    skip_id: usize,
-  ) -> Result<(), Error> {
-    use crate::db::post_view::*;
-    use crate::db::*;
+    user_operation: UserOperation,
+    comment: CommentResponse,
+    id: ConnectionId,
+  ) -> Result<String, Error> {
+    let mut comment_reply_sent = comment.clone();
+    comment_reply_sent.comment.my_vote = None;
+    comment_reply_sent.comment.user_id = None;
 
-    let conn = self.db.get()?;
+    // For the post room ones, and the directs back to the user
+    // strip out the recipient_ids, so that
+    // users don't get double notifs
+    let mut comment_user_sent = comment.clone();
+    comment_user_sent.recipient_ids = Vec::new();
 
-    let posts = PostQueryBuilder::create(&conn)
-      .listing_type(ListingType::Community)
-      .sort(&SortType::New)
-      .for_community_id(community_id)
-      .limit(9999)
-      .list()?;
+    let mut comment_post_sent = comment_reply_sent.clone();
+    comment_post_sent.recipient_ids = Vec::new();
 
-    for post in posts {
-      self.send_room_message(post.id, message, skip_id);
+    let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
+    let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
+    let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
+
+    // Send it to the post room
+    self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
+
+    // Send it to the recipient(s) including the mentioned users
+    for recipient_id in comment_reply_sent.recipient_ids {
+      self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
     }
 
-    Ok(())
+    Ok(comment_user_sent_str)
+  }
+
+  fn post_sends(
+    &self,
+    user_operation: UserOperation,
+    post: PostResponse,
+    id: ConnectionId,
+  ) -> Result<String, Error> {
+    let community_id = post.post.community_id;
+
+    // Don't send my data with it
+    let mut post_sent = post.clone();
+    post_sent.post.my_vote = None;
+    post_sent.post.user_id = None;
+    let post_sent_str = to_json_string(&user_operation, &post_sent)?;
+
+    // Send it to /c/all and that community
+    self.send_community_room_message(0, &post_sent_str, id);
+    self.send_community_room_message(community_id, &post_sent_str, id);
+
+    to_json_string(&user_operation, post)
   }
 
   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
@@ -201,7 +308,6 @@ impl ChatServer {
           );
           Err(
             APIError {
-              op: "Rate Limit".to_string(),
               message: format!("Too many requests. {} per {} seconds", rate, per),
             }
             .into(),
@@ -233,9 +339,6 @@ impl Handler<Connect> for ChatServer {
   type Result = usize;
 
   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
-    // notify all users in same room
-    // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
-
     // register session with random id
     let id = self.rng.gen::<usize>();
     println!("{} joined", &msg.ip);
@@ -267,15 +370,18 @@ impl Handler<Disconnect> for ChatServer {
   type Result = ();
 
   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
-    // let mut rooms: Vec<i32> = Vec::new();
-
-    // remove address
+    // Remove connections from sessions and all 3 scopes
     if self.sessions.remove(&msg.id).is_some() {
-      // remove session from all rooms
-      for sessions in self.rooms.values_mut() {
-        if sessions.remove(&msg.id) {
-          // rooms.push(*id);
-        }
+      for sessions in self.user_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+
+      for sessions in self.post_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
+
+      for sessions in self.community_rooms.values_mut() {
+        sessions.remove(&msg.id);
       }
     }
   }
@@ -291,15 +397,47 @@ impl Handler<StandardMessage> for ChatServer {
       Err(e) => e.to_string(),
     };
 
+    println!("Message Sent: {}", msg_out);
     MessageResult(msg_out)
   }
 }
 
+#[derive(Serialize)]
+struct WebsocketResponse<T> {
+  op: String,
+  data: T,
+}
+
+fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
+where
+  T: Serialize,
+{
+  let response = WebsocketResponse {
+    op: op.to_string(),
+    data,
+  };
+  Ok(serde_json::to_string(&response)?)
+}
+
+fn do_user_operation<'a, Data, Response>(
+  op: UserOperation,
+  data: &str,
+  conn: &PooledConnection<ConnectionManager<PgConnection>>,
+) -> Result<String, Error>
+where
+  for<'de> Data: Deserialize<'de> + 'a,
+  Response: Serialize,
+  Oper<Data>: Perform<Response>,
+{
+  let parsed_data: Data = serde_json::from_str(data)?;
+  let res = Oper::new(parsed_data).perform(&conn)?;
+  to_json_string(&op, &res)
+}
+
 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
   let json: Value = serde_json::from_str(&msg.msg)?;
   let data = &json["data"].to_string();
   let op = &json["op"].as_str().ok_or(APIError {
-    op: "Unknown op type".to_string(),
     message: "Unknown op type".to_string(),
   })?;
 
@@ -307,261 +445,254 @@ fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<Str
 
   let user_operation: UserOperation = UserOperation::from_str(&op)?;
 
+  // TODO: none of the chat messages are going to work if stuff is submitted via http api,
+  //       need to move that handling elsewhere
   match user_operation {
-    UserOperation::Login => {
-      let login: Login = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, login).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
-    }
+    UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
     UserOperation::Register => {
-      let register: Register = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, register).perform(&conn);
-      if res.is_ok() {
-        chat.check_rate_limit_register(msg.id)?;
-      }
-      Ok(serde_json::to_string(&res?)?)
+      chat.check_rate_limit_register(msg.id)?;
+      do_user_operation::<Register, LoginResponse>(user_operation, data, &conn)
     }
     UserOperation::GetUserDetails => {
-      let get_user_details: GetUserDetails = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_user_details).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
     }
     UserOperation::SaveUserSettings => {
-      let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, save_user_settings).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
     }
     UserOperation::AddAdmin => {
       let add_admin: AddAdmin = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, add_admin).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      let res = Oper::new(add_admin).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_all_message(&res_str, msg.id);
+      Ok(res_str)
     }
     UserOperation::BanUser => {
       let ban_user: BanUser = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, ban_user).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      let res = Oper::new(ban_user).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_all_message(&res_str, msg.id);
+      Ok(res_str)
     }
     UserOperation::GetReplies => {
-      let get_replies: GetReplies = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_replies).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
     }
     UserOperation::GetUserMentions => {
-      let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_user_mentions).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
     }
     UserOperation::EditUserMention => {
-      let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, edit_user_mention).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
     }
     UserOperation::MarkAllAsRead => {
-      let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, mark_all_as_read).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
     }
     UserOperation::GetCommunity => {
       let get_community: GetCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_community).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+
+      let mut res = if Settings::get().federation_enabled {
+        if let Some(community_name) = get_community.name.to_owned() {
+          if community_name.contains('@') {
+            // TODO: need to support sort, filter etc for remote communities
+            get_remote_community(community_name)?
+          // TODO what is this about
+          // get_community.name = Some(name.replace("!", ""));
+          } else {
+            Oper::new(get_community).perform(&conn)?
+          }
+        } else {
+          Oper::new(get_community).perform(&conn)?
+        }
+      } else {
+        Oper::new(get_community).perform(&conn)?
+      };
+
+      let community_id = res.community.id;
+
+      chat.join_community_room(community_id, msg.id);
+
+      res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
+        community_users.len()
+      } else {
+        0
+      };
+
+      to_json_string(&user_operation, &res)
     }
     UserOperation::ListCommunities => {
-      let list_communities: ListCommunities = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, list_communities).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      if Settings::get().federation_enabled {
+        let res = get_all_communities()?;
+        let val = ListCommunitiesResponse { communities: res };
+        to_json_string(&user_operation, &val)
+      } else {
+        do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
+      }
     }
     UserOperation::CreateCommunity => {
       chat.check_rate_limit_register(msg.id)?;
-      let create_community: CreateCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_community).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<CreateCommunity, CommunityResponse>(user_operation, data, &conn)
     }
     UserOperation::EditCommunity => {
       let edit_community: EditCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, edit_community).perform(&conn)?;
+      let res = Oper::new(edit_community).perform(&conn)?;
       let mut community_sent: CommunityResponse = res.clone();
       community_sent.community.user_id = None;
       community_sent.community.subscribed = None;
-      let community_sent_str = serde_json::to_string(&community_sent)?;
-      chat.send_community_message(community_sent.community.id, &community_sent_str, msg.id)?;
-      Ok(serde_json::to_string(&res)?)
+      let community_sent_str = to_json_string(&user_operation, &community_sent)?;
+      chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
+      to_json_string(&user_operation, &res)
     }
     UserOperation::FollowCommunity => {
-      let follow_community: FollowCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, follow_community).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
-    }
-    UserOperation::GetFollowedCommunities => {
-      let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, followed_communities).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
     }
+    UserOperation::GetFollowedCommunities => do_user_operation::<
+      GetFollowedCommunities,
+      GetFollowedCommunitiesResponse,
+    >(user_operation, data, &conn),
     UserOperation::BanFromCommunity => {
       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
       let community_id = ban_from_community.community_id;
-      let res = Oper::new(user_operation, ban_from_community).perform(&conn)?;
-      let res_str = serde_json::to_string(&res)?;
-      chat.send_community_message(community_id, &res_str, msg.id)?;
+      let res = Oper::new(ban_from_community).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_community_room_message(community_id, &res_str, msg.id);
       Ok(res_str)
     }
     UserOperation::AddModToCommunity => {
       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
       let community_id = mod_add_to_community.community_id;
-      let res = Oper::new(user_operation, mod_add_to_community).perform(&conn)?;
-      let res_str = serde_json::to_string(&res)?;
-      chat.send_community_message(community_id, &res_str, msg.id)?;
+      let res = Oper::new(mod_add_to_community).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_community_room_message(community_id, &res_str, msg.id);
       Ok(res_str)
     }
     UserOperation::ListCategories => {
-      let list_categories: ListCategories = ListCategories;
-      let res = Oper::new(user_operation, list_categories).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
-    }
-    UserOperation::CreatePost => {
-      chat.check_rate_limit_post(msg.id)?;
-      let create_post: CreatePost = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_post).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
     }
     UserOperation::GetPost => {
       let get_post: GetPost = serde_json::from_str(data)?;
-      chat.join_room(get_post.id, msg.id);
-      let res = Oper::new(user_operation, get_post).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      let post_id = get_post.id;
+      chat.join_post_room(post_id, msg.id);
+      let mut res = Oper::new(get_post).perform(&conn)?;
+
+      res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
+        post_users.len()
+      } else {
+        0
+      };
+
+      to_json_string(&user_operation, &res)
     }
     UserOperation::GetPosts => {
       let get_posts: GetPosts = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_posts).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      if get_posts.community_id.is_none() {
+        // 0 is the "all" community
+        chat.join_community_room(0, msg.id);
+      }
+      let res = Oper::new(get_posts).perform(&conn)?;
+      to_json_string(&user_operation, &res)
+    }
+    UserOperation::CreatePost => {
+      chat.check_rate_limit_post(msg.id)?;
+      let create_post: CreatePost = serde_json::from_str(data)?;
+      let res = Oper::new(create_post).perform(&conn)?;
+
+      chat.post_sends(UserOperation::CreatePost, res, msg.id)
     }
     UserOperation::CreatePostLike => {
       chat.check_rate_limit_message(msg.id)?;
       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_post_like).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      let res = Oper::new(create_post_like).perform(&conn)?;
+
+      chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
     }
     UserOperation::EditPost => {
       let edit_post: EditPost = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, edit_post).perform(&conn)?;
-      let mut post_sent = res.clone();
-      post_sent.post.my_vote = None;
-      let post_sent_str = serde_json::to_string(&post_sent)?;
-      chat.send_room_message(post_sent.post.id, &post_sent_str, msg.id);
-      Ok(serde_json::to_string(&res)?)
+      let res = Oper::new(edit_post).perform(&conn)?;
+
+      chat.post_sends(UserOperation::EditPost, res, msg.id)
     }
     UserOperation::SavePost => {
-      let save_post: SavePost = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, save_post).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
     }
     UserOperation::CreateComment => {
       chat.check_rate_limit_message(msg.id)?;
       let create_comment: CreateComment = serde_json::from_str(data)?;
-      let post_id = create_comment.post_id;
-      let res = Oper::new(user_operation, create_comment).perform(&conn)?;
-      let mut comment_sent = res.clone();
-      comment_sent.comment.my_vote = None;
-      comment_sent.comment.user_id = None;
-      let comment_sent_str = serde_json::to_string(&comment_sent)?;
-      chat.send_room_message(post_id, &comment_sent_str, msg.id);
-      Ok(serde_json::to_string(&res)?)
+      let res = Oper::new(create_comment).perform(&conn)?;
+
+      chat.comment_sends(UserOperation::CreateComment, res, msg.id)
     }
     UserOperation::EditComment => {
       let edit_comment: EditComment = serde_json::from_str(data)?;
-      let post_id = edit_comment.post_id;
-      let res = Oper::new(user_operation, edit_comment).perform(&conn)?;
-      let mut comment_sent = res.clone();
-      comment_sent.comment.my_vote = None;
-      comment_sent.comment.user_id = None;
-      let comment_sent_str = serde_json::to_string(&comment_sent)?;
-      chat.send_room_message(post_id, &comment_sent_str, msg.id);
-      Ok(serde_json::to_string(&res)?)
+      let res = Oper::new(edit_comment).perform(&conn)?;
+
+      chat.comment_sends(UserOperation::EditComment, res, msg.id)
     }
     UserOperation::SaveComment => {
-      let save_comment: SaveComment = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, save_comment).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
     }
     UserOperation::CreateCommentLike => {
       chat.check_rate_limit_message(msg.id)?;
       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
-      let post_id = create_comment_like.post_id;
-      let res = Oper::new(user_operation, create_comment_like).perform(&conn)?;
-      let mut comment_sent = res.clone();
-      comment_sent.comment.my_vote = None;
-      comment_sent.comment.user_id = None;
-      let comment_sent_str = serde_json::to_string(&comment_sent)?;
-      chat.send_room_message(post_id, &comment_sent_str, msg.id);
-      Ok(serde_json::to_string(&res)?)
+      let res = Oper::new(create_comment_like).perform(&conn)?;
+
+      chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
     }
     UserOperation::GetModlog => {
-      let get_modlog: GetModlog = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_modlog).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
     }
     UserOperation::CreateSite => {
-      let create_site: CreateSite = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_site).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
     }
     UserOperation::EditSite => {
       let edit_site: EditSite = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, edit_site).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      let res = Oper::new(edit_site).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_all_message(&res_str, msg.id);
+      Ok(res_str)
     }
     UserOperation::GetSite => {
-      let online: usize = chat.sessions.len();
       let get_site: GetSite = serde_json::from_str(data)?;
-      let mut res = Oper::new(user_operation, get_site).perform(&conn)?;
-      res.online = online;
-      Ok(serde_json::to_string(&res)?)
+      let mut res = Oper::new(get_site).perform(&conn)?;
+      res.online = chat.sessions.len();
+      to_json_string(&user_operation, &res)
     }
     UserOperation::Search => {
-      let search: Search = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, search).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
     }
     UserOperation::TransferCommunity => {
-      let transfer_community: TransferCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, transfer_community).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
     }
     UserOperation::TransferSite => {
-      let transfer_site: TransferSite = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, transfer_site).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
     }
     UserOperation::DeleteAccount => {
-      let delete_account: DeleteAccount = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, delete_account).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
     }
     UserOperation::PasswordReset => {
-      let password_reset: PasswordReset = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, password_reset).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
     }
     UserOperation::PasswordChange => {
-      let password_change: PasswordChange = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, password_change).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
     }
     UserOperation::CreatePrivateMessage => {
       chat.check_rate_limit_message(msg.id)?;
       let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_private_message).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      let recipient_id = create_private_message.recipient_id;
+      let res = Oper::new(create_private_message).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+
+      chat.send_user_room_message(recipient_id, &res_str, msg.id);
+      Ok(res_str)
     }
     UserOperation::EditPrivateMessage => {
-      let edit_private_message: EditPrivateMessage = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, edit_private_message).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
     }
     UserOperation::GetPrivateMessages => {
-      let messages: GetPrivateMessages = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, messages).perform(&conn)?;
-      Ok(serde_json::to_string(&res)?)
+      do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
+    }
+    UserOperation::UserJoin => {
+      let user_join: UserJoin = serde_json::from_str(data)?;
+      let res = Oper::new(user_join).perform(&conn)?;
+      chat.join_user_room(res.user_id, msg.id);
+      to_json_string(&user_operation, &res)
     }
   }
 }