]> Untitled Git - lemmy.git/blobdiff - server/src/websocket/server.rs
Merge branch 'federation' into dev_1
[lemmy.git] / server / src / websocket / server.rs
index abdf9ea9bb1b6bb9aa8cdc74e92d273a622b89c3..0c606284537c4c6f3fa4de9b8edfca6d39a53a6d 100644 (file)
@@ -3,29 +3,35 @@
 //! room through `ChatServer`.
 
 use actix::prelude::*;
+use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
+use diesel::PgConnection;
+use failure::Error;
 use rand::{rngs::ThreadRng, Rng};
-use std::collections::{HashMap, HashSet};
 use serde::{Deserialize, Serialize};
-use serde_json::{Value};
+use serde_json::Value;
+use std::collections::{HashMap, HashSet};
 use std::str::FromStr;
-use failure::Error;
-use std::time::{SystemTime};
-
-use api::*;
-use api::user::*;
-use api::community::*;
-use api::post::*;
-use api::comment::*;
-use api::site::*;
-
-const RATE_LIMIT_MESSAGES: i32 = 30;
-const RATE_LIMIT_PER_SECOND: i32 = 60;
-const RATE_LIMIT_REGISTER_MESSAGES: i32 = 3;
-const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60*3;
-
+use std::time::SystemTime;
+
+use crate::api::comment::*;
+use crate::api::community::*;
+use crate::api::post::*;
+use crate::api::site::*;
+use crate::api::user::*;
+use crate::api::*;
+use crate::apub::puller::*;
+use crate::websocket::UserOperation;
+use crate::Settings;
+
+type ConnectionId = usize;
+type PostId = i32;
+type CommunityId = i32;
+type UserId = i32;
+type IPAddr = String;
 
 /// Chat server sends this messages to session
 #[derive(Message)]
+#[rtype(result = "()")]
 pub struct WSMessage(pub String);
 
 /// Message for chat server communications
@@ -35,77 +41,121 @@ pub struct WSMessage(pub String);
 #[rtype(usize)]
 pub struct Connect {
   pub addr: Recipient<WSMessage>,
-  pub ip: String,
+  pub ip: IPAddr,
 }
 
 /// Session is disconnected
 #[derive(Message)]
+#[rtype(result = "()")]
 pub struct Disconnect {
-  pub id: usize,
-  pub ip: String,
-}
-
-/// Send message to specific room
-#[derive(Message)]
-pub struct ClientMessage {
-  /// Id of the client session
-  pub id: usize,
-  /// Peer message
-  pub msg: String,
-  /// Room name
-  pub room: String,
+  pub id: ConnectionId,
+  pub ip: IPAddr,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Serialize, Deserialize, Message)]
+#[rtype(String)]
 pub struct StandardMessage {
   /// Id of the client session
-  pub id: usize,
+  pub id: ConnectionId,
   /// Peer message
   pub msg: String,
 }
 
-impl actix::Message for StandardMessage {
-  type Result = String;
-}
-
 #[derive(Debug)]
 pub struct RateLimitBucket {
   last_checked: SystemTime,
-  allowance: f64
+  allowance: f64,
 }
 
 pub struct SessionInfo {
   pub addr: Recipient<WSMessage>,
-  pub ip: String,
+  pub ip: IPAddr,
 }
 
 /// `ChatServer` manages chat rooms and responsible for coordinating chat
-/// session. implementation is super primitive
+/// session.
 pub struct ChatServer {
-  sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
-  rate_limits: HashMap<String, RateLimitBucket>,
-  rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
+  /// A map from generated random ID to session addr
+  sessions: HashMap<ConnectionId, SessionInfo>,
+
+  /// A map from post_id to set of connectionIDs
+  post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
+
+  /// A map from community to set of connectionIDs
+  community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
+
+  /// A map from user id to its connection ID for joined users. Remember a user can have multiple
+  /// sessions (IE clients)
+  user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
+
+  /// Rate limiting based on IP addr
+  rate_limits: HashMap<IPAddr, RateLimitBucket>,
+
   rng: ThreadRng,
+  db: Pool<ConnectionManager<PgConnection>>,
 }
 
-impl Default for ChatServer {
-  fn default() -> ChatServer {
-    // default room
-    let rooms = HashMap::new();
-
+impl ChatServer {
+  pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
     ChatServer {
       sessions: HashMap::new(),
       rate_limits: HashMap::new(),
-      rooms: rooms,
+      post_rooms: HashMap::new(),
+      community_rooms: HashMap::new(),
+      user_rooms: HashMap::new(),
       rng: rand::thread_rng(),
+      db,
     }
   }
-}
 
-impl ChatServer {
-  /// Send message to all users in the room
-  fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
-    if let Some(sessions) = self.rooms.get(room) {
+  fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
+    // remove session from all rooms
+    for sessions in self.community_rooms.values_mut() {
+      sessions.remove(&id);
+    }
+
+    // If the room doesn't exist yet
+    if self.community_rooms.get_mut(&community_id).is_none() {
+      self.community_rooms.insert(community_id, HashSet::new());
+    }
+
+    self
+      .community_rooms
+      .get_mut(&community_id)
+      .unwrap()
+      .insert(id);
+  }
+
+  fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
+    // remove session from all rooms
+    for sessions in self.post_rooms.values_mut() {
+      sessions.remove(&id);
+    }
+
+    // If the room doesn't exist yet
+    if self.post_rooms.get_mut(&post_id).is_none() {
+      self.post_rooms.insert(post_id, HashSet::new());
+    }
+
+    self.post_rooms.get_mut(&post_id).unwrap().insert(id);
+  }
+
+  fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
+    // remove session from all rooms
+    for sessions in self.user_rooms.values_mut() {
+      sessions.remove(&id);
+    }
+
+    // If the room doesn't exist yet
+    if self.user_rooms.get_mut(&user_id).is_none() {
+      self.user_rooms.insert(user_id, HashSet::new());
+    }
+
+    self.user_rooms.get_mut(&user_id).unwrap().insert(id);
+  }
+
+  fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
+    if let Some(sessions) = self.post_rooms.get(&post_id) {
       for id in sessions {
         if *id != skip_id {
           if let Some(info) = self.sessions.get(id) {
@@ -116,50 +166,125 @@ impl ChatServer {
     }
   }
 
-  fn join_room(&mut self, room_id: i32, id: usize) {
-    // remove session from all rooms
-    for (_n, mut sessions) in &mut self.rooms {
-      sessions.remove(&id);
+  fn send_community_room_message(
+    &self,
+    community_id: CommunityId,
+    message: &str,
+    skip_id: ConnectionId,
+  ) {
+    if let Some(sessions) = self.community_rooms.get(&community_id) {
+      for id in sessions {
+        if *id != skip_id {
+          if let Some(info) = self.sessions.get(id) {
+            let _ = info.addr.do_send(WSMessage(message.to_owned()));
+          }
+        }
+      }
     }
+  }
 
-    // If the room doesn't exist yet
-    if self.rooms.get_mut(&room_id).is_none() {
-      self.rooms.insert(room_id, HashSet::new());
+  fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
+    if let Some(sessions) = self.user_rooms.get(&user_id) {
+      for id in sessions {
+        if *id != skip_id {
+          if let Some(info) = self.sessions.get(id) {
+            let _ = info.addr.do_send(WSMessage(message.to_owned()));
+          }
+        }
+      }
     }
+  }
 
-    &self.rooms.get_mut(&room_id).unwrap().insert(id);
+  fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
+    for id in self.sessions.keys() {
+      if *id != skip_id {
+        if let Some(info) = self.sessions.get(id) {
+          let _ = info.addr.do_send(WSMessage(message.to_owned()));
+        }
+      }
+    }
   }
 
-  fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> {
-    use db::*;
-    use db::post_view::*;
-    let conn = establish_connection();
-    let posts = PostView::list(&conn,
-                               PostListingType::Community, 
-                               &SortType::New, 
-                               Some(*community_id), 
-                               None,
-                               None, 
-                               None,
-                               false,
-                               false,
-                               None,
-                               Some(9999))?;
-    for post in posts {
-      self.send_room_message(&post.id, message, skip_id);
-    }
-
-    Ok(())
+  fn comment_sends(
+    &self,
+    user_operation: UserOperation,
+    comment: CommentResponse,
+    id: ConnectionId,
+  ) -> Result<String, Error> {
+    let mut comment_reply_sent = comment.clone();
+    comment_reply_sent.comment.my_vote = None;
+    comment_reply_sent.comment.user_id = None;
+
+    // For the post room ones, and the directs back to the user
+    // strip out the recipient_ids, so that
+    // users don't get double notifs
+    let mut comment_user_sent = comment.clone();
+    comment_user_sent.recipient_ids = Vec::new();
+
+    let mut comment_post_sent = comment_reply_sent.clone();
+    comment_post_sent.recipient_ids = Vec::new();
+
+    let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
+    let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
+    let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
+
+    // Send it to the post room
+    self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
+
+    // Send it to the recipient(s) including the mentioned users
+    for recipient_id in comment_reply_sent.recipient_ids {
+      self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
+    }
+
+    Ok(comment_user_sent_str)
+  }
+
+  fn post_sends(
+    &self,
+    user_operation: UserOperation,
+    post: PostResponse,
+    id: ConnectionId,
+  ) -> Result<String, Error> {
+    let community_id = post.post.community_id;
+
+    // Don't send my data with it
+    let mut post_sent = post.clone();
+    post_sent.post.my_vote = None;
+    post_sent.post.user_id = None;
+    let post_sent_str = to_json_string(&user_operation, &post_sent)?;
+
+    // Send it to /c/all and that community
+    self.send_community_room_message(0, &post_sent_str, id);
+    self.send_community_room_message(community_id, &post_sent_str, id);
+
+    to_json_string(&user_operation, post)
   }
 
   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
-    self.check_rate_limit_full(id, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND)
+    self.check_rate_limit_full(
+      id,
+      Settings::get().rate_limit.register,
+      Settings::get().rate_limit.register_per_second,
+    )
+  }
+
+  fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
+    self.check_rate_limit_full(
+      id,
+      Settings::get().rate_limit.post,
+      Settings::get().rate_limit.post_per_second,
+    )
   }
 
-  fn check_rate_limit(&mut self, id: usize) -> Result<(), Error> {
-    self.check_rate_limit_full(id, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND)
+  fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
+    self.check_rate_limit_full(
+      id,
+      Settings::get().rate_limit.message,
+      Settings::get().rate_limit.message_per_second,
+    )
   }
 
+  #[allow(clippy::float_cmp)]
   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
     if let Some(info) = self.sessions.get(&id) {
       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
@@ -177,11 +302,16 @@ impl ChatServer {
         }
 
         if rate_limit.allowance < 1.0 {
-          println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance);
-          Err(APIError {
-            op: "Rate Limit".to_string(), 
-            message: format!("Too many requests. {} per {} seconds", rate, per),
-          })?
+          println!(
+            "Rate limited IP: {}, time_passed: {}, allowance: {}",
+            &info.ip, time_passed, rate_limit.allowance
+          );
+          Err(
+            APIError {
+              message: format!("Too many requests. {} per {} seconds", rate, per),
+            }
+            .into(),
+          )
         } else {
           rate_limit.allowance -= 1.0;
           Ok(())
@@ -195,7 +325,6 @@ impl ChatServer {
   }
 }
 
-
 /// Make actor from `ChatServer`
 impl Actor for ChatServer {
   /// We are going to use simple Context, we just need ability to communicate
@@ -210,34 +339,28 @@ impl Handler<Connect> for ChatServer {
   type Result = usize;
 
   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
-
-    // notify all users in same room
-    // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
-
     // register session with random id
     let id = self.rng.gen::<usize>();
     println!("{} joined", &msg.ip);
 
-    self.sessions.insert(id, SessionInfo {
-      addr: msg.addr,
-      ip: msg.ip.to_owned(),
-    });
+    self.sessions.insert(
+      id,
+      SessionInfo {
+        addr: msg.addr,
+        ip: msg.ip.to_owned(),
+      },
+    );
 
     if self.rate_limits.get(&msg.ip).is_none() {
-      self.rate_limits.insert(msg.ip, RateLimitBucket {
-        last_checked: SystemTime::now(),
-        allowance: -2f64,
-      });
+      self.rate_limits.insert(
+        msg.ip,
+        RateLimitBucket {
+          last_checked: SystemTime::now(),
+          allowance: -2f64,
+        },
+      );
     }
 
-    // for (k,v) in &self.rate_limits {
-    //   println!("{}: {:?}", k,v);
-    // }
-
-    // auto join session to Main room
-    // self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
-
-    // send id back
     id
   }
 }
@@ -247,16 +370,18 @@ impl Handler<Disconnect> for ChatServer {
   type Result = ();
 
   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
+    // Remove connections from sessions and all 3 scopes
+    if self.sessions.remove(&msg.id).is_some() {
+      for sessions in self.user_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
 
-    // let mut rooms: Vec<i32> = Vec::new();
+      for sessions in self.post_rooms.values_mut() {
+        sessions.remove(&msg.id);
+      }
 
-    // remove address
-    if self.sessions.remove(&msg.id).is_some() {
-      // remove session from all rooms
-      for (_id, sessions) in &mut self.rooms {
-        if sessions.remove(&msg.id) {
-          // rooms.push(*id);
-        }
+      for sessions in self.community_rooms.values_mut() {
+        sessions.remove(&msg.id);
       }
     }
   }
@@ -266,221 +391,308 @@ impl Handler<Disconnect> for ChatServer {
 impl Handler<StandardMessage> for ChatServer {
   type Result = MessageResult<StandardMessage>;
 
-
   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
-
     let msg_out = match parse_json_message(self, msg) {
       Ok(m) => m,
-      Err(e) => e.to_string()
+      Err(e) => e.to_string(),
     };
 
+    println!("Message Sent: {}", msg_out);
     MessageResult(msg_out)
   }
 }
 
-fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
+#[derive(Serialize)]
+struct WebsocketResponse<T> {
+  op: String,
+  data: T,
+}
+
+fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
+where
+  T: Serialize,
+{
+  let response = WebsocketResponse {
+    op: op.to_string(),
+    data,
+  };
+  Ok(serde_json::to_string(&response)?)
+}
+
+fn do_user_operation<'a, Data, Response>(
+  op: UserOperation,
+  data: &str,
+  conn: &PooledConnection<ConnectionManager<PgConnection>>,
+) -> Result<String, Error>
+where
+  for<'de> Data: Deserialize<'de> + 'a,
+  Response: Serialize,
+  Oper<Data>: Perform<Response>,
+{
+  let parsed_data: Data = serde_json::from_str(data)?;
+  let res = Oper::new(parsed_data).perform(&conn)?;
+  to_json_string(&op, &res)
+}
 
+fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
   let json: Value = serde_json::from_str(&msg.msg)?;
   let data = &json["data"].to_string();
-  let op = &json["op"].as_str().unwrap();
+  let op = &json["op"].as_str().ok_or(APIError {
+    message: "Unknown op type".to_string(),
+  })?;
+
+  let conn = chat.db.get()?;
 
   let user_operation: UserOperation = UserOperation::from_str(&op)?;
 
+  // TODO: none of the chat messages are going to work if stuff is submitted via http api,
+  //       need to move that handling elsewhere
   match user_operation {
-    UserOperation::Login => {
-      let login: Login = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, login).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+    UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
     UserOperation::Register => {
       chat.check_rate_limit_register(msg.id)?;
-      let register: Register = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, register).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<Register, LoginResponse>(user_operation, data, &conn)
+    }
     UserOperation::GetUserDetails => {
-      let get_user_details: GetUserDetails = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_user_details).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
+    }
+    UserOperation::SaveUserSettings => {
+      do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
+    }
     UserOperation::AddAdmin => {
       let add_admin: AddAdmin = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, add_admin).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      let res = Oper::new(add_admin).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_all_message(&res_str, msg.id);
+      Ok(res_str)
+    }
     UserOperation::BanUser => {
       let ban_user: BanUser = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, ban_user).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      let res = Oper::new(ban_user).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_all_message(&res_str, msg.id);
+      Ok(res_str)
+    }
     UserOperation::GetReplies => {
-      let get_replies: GetReplies = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_replies).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
+    }
+    UserOperation::GetUserMentions => {
+      do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
+    }
+    UserOperation::EditUserMention => {
+      do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
+    }
     UserOperation::MarkAllAsRead => {
-      let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, mark_all_as_read).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
+    }
     UserOperation::GetCommunity => {
       let get_community: GetCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_community).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+
+      let mut res = if Settings::get().federation_enabled {
+        if let Some(community_name) = get_community.name.to_owned() {
+          if community_name.contains('@') {
+            // TODO: need to support sort, filter etc for remote communities
+            get_remote_community(community_name)?
+          // TODO what is this about
+          // get_community.name = Some(name.replace("!", ""));
+          } else {
+            Oper::new(get_community).perform(&conn)?
+          }
+        } else {
+          Oper::new(get_community).perform(&conn)?
+        }
+      } else {
+        Oper::new(get_community).perform(&conn)?
+      };
+
+      let community_id = res.community.id;
+
+      chat.join_community_room(community_id, msg.id);
+
+      res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
+        community_users.len()
+      } else {
+        0
+      };
+
+      to_json_string(&user_operation, &res)
+    }
     UserOperation::ListCommunities => {
-      let list_communities: ListCommunities = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, list_communities).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      if Settings::get().federation_enabled {
+        let res = get_all_communities()?;
+        let val = ListCommunitiesResponse { communities: res };
+        to_json_string(&user_operation, &val)
+      } else {
+        do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
+      }
+    }
     UserOperation::CreateCommunity => {
       chat.check_rate_limit_register(msg.id)?;
-      let create_community: CreateCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_community).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<CreateCommunity, CommunityResponse>(user_operation, data, &conn)
+    }
     UserOperation::EditCommunity => {
       let edit_community: EditCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, edit_community).perform()?;
+      let res = Oper::new(edit_community).perform(&conn)?;
       let mut community_sent: CommunityResponse = res.clone();
       community_sent.community.user_id = None;
       community_sent.community.subscribed = None;
-      let community_sent_str = serde_json::to_string(&community_sent)?;
-      chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      let community_sent_str = to_json_string(&user_operation, &community_sent)?;
+      chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
+      to_json_string(&user_operation, &res)
+    }
     UserOperation::FollowCommunity => {
-      let follow_community: FollowCommunity = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, follow_community).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
-    UserOperation::GetFollowedCommunities => {
-      let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, followed_communities).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
+    }
+    UserOperation::GetFollowedCommunities => do_user_operation::<
+      GetFollowedCommunities,
+      GetFollowedCommunitiesResponse,
+    >(user_operation, data, &conn),
     UserOperation::BanFromCommunity => {
       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
       let community_id = ban_from_community.community_id;
-      let res = Oper::new(user_operation, ban_from_community).perform()?;
-      let res_str = serde_json::to_string(&res)?;
-      chat.send_community_message(&community_id, &res_str, msg.id)?;
+      let res = Oper::new(ban_from_community).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_community_room_message(community_id, &res_str, msg.id);
       Ok(res_str)
-    },
+    }
     UserOperation::AddModToCommunity => {
       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
       let community_id = mod_add_to_community.community_id;
-      let res = Oper::new(user_operation, mod_add_to_community).perform()?;
-      let res_str = serde_json::to_string(&res)?;
-      chat.send_community_message(&community_id, &res_str, msg.id)?;
+      let res = Oper::new(mod_add_to_community).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_community_room_message(community_id, &res_str, msg.id);
       Ok(res_str)
-    },
+    }
     UserOperation::ListCategories => {
-      let list_categories: ListCategories = ListCategories;
-      let res = Oper::new(user_operation, list_categories).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
-    UserOperation::CreatePost => {
-      chat.check_rate_limit_register(msg.id)?;
-      let create_post: CreatePost = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_post).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
+    }
     UserOperation::GetPost => {
       let get_post: GetPost = serde_json::from_str(data)?;
-      chat.join_room(get_post.id, msg.id);
-      let res = Oper::new(user_operation, get_post).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      let post_id = get_post.id;
+      chat.join_post_room(post_id, msg.id);
+      let mut res = Oper::new(get_post).perform(&conn)?;
+
+      res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
+        post_users.len()
+      } else {
+        0
+      };
+
+      to_json_string(&user_operation, &res)
+    }
     UserOperation::GetPosts => {
       let get_posts: GetPosts = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_posts).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      if get_posts.community_id.is_none() {
+        // 0 is the "all" community
+        chat.join_community_room(0, msg.id);
+      }
+      let res = Oper::new(get_posts).perform(&conn)?;
+      to_json_string(&user_operation, &res)
+    }
+    UserOperation::CreatePost => {
+      chat.check_rate_limit_post(msg.id)?;
+      let create_post: CreatePost = serde_json::from_str(data)?;
+      let res = Oper::new(create_post).perform(&conn)?;
+
+      chat.post_sends(UserOperation::CreatePost, res, msg.id)
+    }
     UserOperation::CreatePostLike => {
-      chat.check_rate_limit(msg.id)?;
+      chat.check_rate_limit_message(msg.id)?;
       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_post_like).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      let res = Oper::new(create_post_like).perform(&conn)?;
+
+      chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
+    }
     UserOperation::EditPost => {
       let edit_post: EditPost = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, edit_post).perform()?;
-      let mut post_sent = res.clone();
-      post_sent.post.my_vote = None;
-      let post_sent_str = serde_json::to_string(&post_sent)?;
-      chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
-      Ok(serde_json::to_string(&res)?)
-    },
+      let res = Oper::new(edit_post).perform(&conn)?;
+
+      chat.post_sends(UserOperation::EditPost, res, msg.id)
+    }
     UserOperation::SavePost => {
-      let save_post: SavePost = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, save_post).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
+    }
     UserOperation::CreateComment => {
-      chat.check_rate_limit(msg.id)?;
+      chat.check_rate_limit_message(msg.id)?;
       let create_comment: CreateComment = serde_json::from_str(data)?;
-      let post_id = create_comment.post_id;
-      let res = Oper::new(user_operation, create_comment).perform()?;
-      let mut comment_sent = res.clone();
-      comment_sent.comment.my_vote = None;
-      comment_sent.comment.user_id = None;
-      let comment_sent_str = serde_json::to_string(&comment_sent)?;
-      chat.send_room_message(&post_id, &comment_sent_str, msg.id);
-      Ok(serde_json::to_string(&res)?)
-    },
+      let res = Oper::new(create_comment).perform(&conn)?;
+
+      chat.comment_sends(UserOperation::CreateComment, res, msg.id)
+    }
     UserOperation::EditComment => {
       let edit_comment: EditComment = serde_json::from_str(data)?;
-      let post_id = edit_comment.post_id;
-      let res = Oper::new(user_operation, edit_comment).perform()?;
-      let mut comment_sent = res.clone();
-      comment_sent.comment.my_vote = None;
-      comment_sent.comment.user_id = None;
-      let comment_sent_str = serde_json::to_string(&comment_sent)?;
-      chat.send_room_message(&post_id, &comment_sent_str, msg.id);
-      Ok(serde_json::to_string(&res)?)
-    },
+      let res = Oper::new(edit_comment).perform(&conn)?;
+
+      chat.comment_sends(UserOperation::EditComment, res, msg.id)
+    }
     UserOperation::SaveComment => {
-      let save_comment: SaveComment = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, save_comment).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
+    }
     UserOperation::CreateCommentLike => {
-      chat.check_rate_limit(msg.id)?;
+      chat.check_rate_limit_message(msg.id)?;
       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
-      let post_id = create_comment_like.post_id;
-      let res = Oper::new(user_operation, create_comment_like).perform()?;
-      let mut comment_sent = res.clone();
-      comment_sent.comment.my_vote = None;
-      comment_sent.comment.user_id = None;
-      let comment_sent_str = serde_json::to_string(&comment_sent)?;
-      chat.send_room_message(&post_id, &comment_sent_str, msg.id);
-      Ok(serde_json::to_string(&res)?)
-    },
+      let res = Oper::new(create_comment_like).perform(&conn)?;
+
+      chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
+    }
     UserOperation::GetModlog => {
-      let get_modlog: GetModlog = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_modlog).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
+    }
     UserOperation::CreateSite => {
-      let create_site: CreateSite = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, create_site).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
+    }
     UserOperation::EditSite => {
       let edit_site: EditSite = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, edit_site).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      let res = Oper::new(edit_site).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+      chat.send_all_message(&res_str, msg.id);
+      Ok(res_str)
+    }
     UserOperation::GetSite => {
       let get_site: GetSite = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, get_site).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      let mut res = Oper::new(get_site).perform(&conn)?;
+      res.online = chat.sessions.len();
+      to_json_string(&user_operation, &res)
+    }
     UserOperation::Search => {
-      let search: Search = serde_json::from_str(data)?;
-      let res = Oper::new(user_operation, search).perform()?;
-      Ok(serde_json::to_string(&res)?)
-    },
+      do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
+    }
+    UserOperation::TransferCommunity => {
+      do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
+    }
+    UserOperation::TransferSite => {
+      do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
+    }
+    UserOperation::DeleteAccount => {
+      do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
+    }
+    UserOperation::PasswordReset => {
+      do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
+    }
+    UserOperation::PasswordChange => {
+      do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
+    }
+    UserOperation::CreatePrivateMessage => {
+      chat.check_rate_limit_message(msg.id)?;
+      let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
+      let recipient_id = create_private_message.recipient_id;
+      let res = Oper::new(create_private_message).perform(&conn)?;
+      let res_str = to_json_string(&user_operation, &res)?;
+
+      chat.send_user_room_message(recipient_id, &res_str, msg.id);
+      Ok(res_str)
+    }
+    UserOperation::EditPrivateMessage => {
+      do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
+    }
+    UserOperation::GetPrivateMessages => {
+      do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
+    }
+    UserOperation::UserJoin => {
+      let user_join: UserJoin = serde_json::from_str(data)?;
+      let res = Oper::new(user_join).perform(&conn)?;
+      chat.join_user_room(res.user_id, msg.id);
+      to_json_string(&user_operation, &res)
+    }
   }
 }