]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Initial post-listing community non-local.
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
8 use failure::Error;
9 use log::{error, info, warn};
10 use rand::{rngs::ThreadRng, Rng};
11 use serde::{Deserialize, Serialize};
12 use serde_json::Value;
13 use std::collections::{HashMap, HashSet};
14 use std::str::FromStr;
15 use std::time::SystemTime;
16 use strum::IntoEnumIterator;
17
18 use crate::api::comment::*;
19 use crate::api::community::*;
20 use crate::api::post::*;
21 use crate::api::site::*;
22 use crate::api::user::*;
23 use crate::api::*;
24 use crate::websocket::UserOperation;
25 use crate::Settings;
26
27 type ConnectionId = usize;
28 type PostId = i32;
29 type CommunityId = i32;
30 type UserId = i32;
31 type IPAddr = String;
32
33 /// Chat server sends this messages to session
34 #[derive(Message)]
35 #[rtype(result = "()")]
36 pub struct WSMessage(pub String);
37
38 /// Message for chat server communications
39
40 /// New chat session is created
41 #[derive(Message)]
42 #[rtype(usize)]
43 pub struct Connect {
44   pub addr: Recipient<WSMessage>,
45   pub ip: IPAddr,
46 }
47
48 /// Session is disconnected
49 #[derive(Message)]
50 #[rtype(result = "()")]
51 pub struct Disconnect {
52   pub id: ConnectionId,
53   pub ip: IPAddr,
54 }
55
56 #[derive(Serialize, Deserialize, Message)]
57 #[rtype(String)]
58 pub struct StandardMessage {
59   /// Id of the client session
60   pub id: ConnectionId,
61   /// Peer message
62   pub msg: String,
63 }
64
65 #[derive(Debug)]
66 pub struct RateLimitBucket {
67   last_checked: SystemTime,
68   allowance: f64,
69 }
70
71 pub struct SessionInfo {
72   pub addr: Recipient<WSMessage>,
73   pub ip: IPAddr,
74 }
75
76 #[derive(Eq, PartialEq, Hash, Debug, EnumIter, Copy, Clone)]
77 pub enum RateLimitType {
78   Message,
79   Register,
80   Post,
81 }
82
83 /// `ChatServer` manages chat rooms and responsible for coordinating chat
84 /// session.
85 pub struct ChatServer {
86   /// A map from generated random ID to session addr
87   sessions: HashMap<ConnectionId, SessionInfo>,
88
89   /// A map from post_id to set of connectionIDs
90   post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
91
92   /// A map from community to set of connectionIDs
93   community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
94
95   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
96   /// sessions (IE clients)
97   user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
98
99   /// Rate limiting based on rate type and IP addr
100   rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
101
102   rng: ThreadRng,
103   db: Pool<ConnectionManager<PgConnection>>,
104 }
105
106 impl ChatServer {
107   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
108     ChatServer {
109       sessions: HashMap::new(),
110       rate_limit_buckets: HashMap::new(),
111       post_rooms: HashMap::new(),
112       community_rooms: HashMap::new(),
113       user_rooms: HashMap::new(),
114       rng: rand::thread_rng(),
115       db,
116     }
117   }
118
119   fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
120     // remove session from all rooms
121     for sessions in self.community_rooms.values_mut() {
122       sessions.remove(&id);
123     }
124
125     // Also leave all post rooms
126     // This avoids double messages
127     for sessions in self.post_rooms.values_mut() {
128       sessions.remove(&id);
129     }
130
131     // If the room doesn't exist yet
132     if self.community_rooms.get_mut(&community_id).is_none() {
133       self.community_rooms.insert(community_id, HashSet::new());
134     }
135
136     self
137       .community_rooms
138       .get_mut(&community_id)
139       .unwrap()
140       .insert(id);
141   }
142
143   fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
144     // remove session from all rooms
145     for sessions in self.post_rooms.values_mut() {
146       sessions.remove(&id);
147     }
148
149     // Also leave all communities
150     // This avoids double messages
151     for sessions in self.community_rooms.values_mut() {
152       sessions.remove(&id);
153     }
154
155     // If the room doesn't exist yet
156     if self.post_rooms.get_mut(&post_id).is_none() {
157       self.post_rooms.insert(post_id, HashSet::new());
158     }
159
160     self.post_rooms.get_mut(&post_id).unwrap().insert(id);
161   }
162
163   fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
164     // remove session from all rooms
165     for sessions in self.user_rooms.values_mut() {
166       sessions.remove(&id);
167     }
168
169     // If the room doesn't exist yet
170     if self.user_rooms.get_mut(&user_id).is_none() {
171       self.user_rooms.insert(user_id, HashSet::new());
172     }
173
174     self.user_rooms.get_mut(&user_id).unwrap().insert(id);
175   }
176
177   fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
178     if let Some(sessions) = self.post_rooms.get(&post_id) {
179       for id in sessions {
180         if *id != skip_id {
181           if let Some(info) = self.sessions.get(id) {
182             let _ = info.addr.do_send(WSMessage(message.to_owned()));
183           }
184         }
185       }
186     }
187   }
188
189   fn send_community_room_message(
190     &self,
191     community_id: CommunityId,
192     message: &str,
193     skip_id: ConnectionId,
194   ) {
195     if let Some(sessions) = self.community_rooms.get(&community_id) {
196       for id in sessions {
197         if *id != skip_id {
198           if let Some(info) = self.sessions.get(id) {
199             let _ = info.addr.do_send(WSMessage(message.to_owned()));
200           }
201         }
202       }
203     }
204   }
205
206   fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
207     if let Some(sessions) = self.user_rooms.get(&user_id) {
208       for id in sessions {
209         if *id != skip_id {
210           if let Some(info) = self.sessions.get(id) {
211             let _ = info.addr.do_send(WSMessage(message.to_owned()));
212           }
213         }
214       }
215     }
216   }
217
218   fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
219     for id in self.sessions.keys() {
220       if *id != skip_id {
221         if let Some(info) = self.sessions.get(id) {
222           let _ = info.addr.do_send(WSMessage(message.to_owned()));
223         }
224       }
225     }
226   }
227
228   fn comment_sends(
229     &self,
230     user_operation: UserOperation,
231     comment: CommentResponse,
232     id: ConnectionId,
233   ) -> Result<String, Error> {
234     let mut comment_reply_sent = comment.clone();
235     comment_reply_sent.comment.my_vote = None;
236     comment_reply_sent.comment.user_id = None;
237
238     // For the post room ones, and the directs back to the user
239     // strip out the recipient_ids, so that
240     // users don't get double notifs
241     let mut comment_user_sent = comment.clone();
242     comment_user_sent.recipient_ids = Vec::new();
243
244     let mut comment_post_sent = comment_reply_sent.clone();
245     comment_post_sent.recipient_ids = Vec::new();
246
247     let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
248     let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
249     let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
250
251     // Send it to the post room
252     self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
253
254     // Send it to the recipient(s) including the mentioned users
255     for recipient_id in comment_reply_sent.recipient_ids {
256       self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
257     }
258
259     // Send it to the community too
260     self.send_community_room_message(0, &comment_post_sent_str, id);
261     self.send_community_room_message(comment.comment.community_id, &comment_post_sent_str, id);
262
263     Ok(comment_user_sent_str)
264   }
265
266   fn post_sends(
267     &self,
268     user_operation: UserOperation,
269     post: PostResponse,
270     id: ConnectionId,
271   ) -> Result<String, Error> {
272     let community_id = post.post.community_id;
273
274     // Don't send my data with it
275     let mut post_sent = post.clone();
276     post_sent.post.my_vote = None;
277     post_sent.post.user_id = None;
278     let post_sent_str = to_json_string(&user_operation, &post_sent)?;
279
280     // Send it to /c/all and that community
281     self.send_community_room_message(0, &post_sent_str, id);
282     self.send_community_room_message(community_id, &post_sent_str, id);
283
284     // Send it to the post room
285     self.send_post_room_message(post_sent.post.id, &post_sent_str, id);
286
287     to_json_string(&user_operation, post)
288   }
289
290   fn check_rate_limit_register(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
291     self.check_rate_limit_full(
292       RateLimitType::Register,
293       id,
294       Settings::get().rate_limit.register,
295       Settings::get().rate_limit.register_per_second,
296       check_only,
297     )
298   }
299
300   fn check_rate_limit_post(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
301     self.check_rate_limit_full(
302       RateLimitType::Post,
303       id,
304       Settings::get().rate_limit.post,
305       Settings::get().rate_limit.post_per_second,
306       check_only,
307     )
308   }
309
310   fn check_rate_limit_message(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
311     self.check_rate_limit_full(
312       RateLimitType::Message,
313       id,
314       Settings::get().rate_limit.message,
315       Settings::get().rate_limit.message_per_second,
316       check_only,
317     )
318   }
319
320   #[allow(clippy::float_cmp)]
321   fn check_rate_limit_full(
322     &mut self,
323     type_: RateLimitType,
324     id: usize,
325     rate: i32,
326     per: i32,
327     check_only: bool,
328   ) -> Result<(), Error> {
329     if let Some(info) = self.sessions.get(&id) {
330       if let Some(bucket) = self.rate_limit_buckets.get_mut(&type_) {
331         if let Some(rate_limit) = bucket.get_mut(&info.ip) {
332           let current = SystemTime::now();
333           let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
334
335           // The initial value
336           if rate_limit.allowance == -2f64 {
337             rate_limit.allowance = rate as f64;
338           };
339
340           rate_limit.last_checked = current;
341           rate_limit.allowance += time_passed * (rate as f64 / per as f64);
342           if !check_only && rate_limit.allowance > rate as f64 {
343             rate_limit.allowance = rate as f64;
344           }
345
346           if rate_limit.allowance < 1.0 {
347             warn!(
348               "Rate limited IP: {}, time_passed: {}, allowance: {}",
349               &info.ip, time_passed, rate_limit.allowance
350             );
351             Err(
352               APIError {
353                 message: format!("Too many requests. {} per {} seconds", rate, per),
354               }
355               .into(),
356             )
357           } else {
358             if !check_only {
359               rate_limit.allowance -= 1.0;
360             }
361             Ok(())
362           }
363         } else {
364           Ok(())
365         }
366       } else {
367         Ok(())
368       }
369     } else {
370       Ok(())
371     }
372   }
373 }
374
375 /// Make actor from `ChatServer`
376 impl Actor for ChatServer {
377   /// We are going to use simple Context, we just need ability to communicate
378   /// with other actors.
379   type Context = Context<Self>;
380 }
381
382 /// Handler for Connect message.
383 ///
384 /// Register new session and assign unique id to this session
385 impl Handler<Connect> for ChatServer {
386   type Result = usize;
387
388   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
389     // register session with random id
390     let id = self.rng.gen::<usize>();
391     info!("{} joined", &msg.ip);
392
393     self.sessions.insert(
394       id,
395       SessionInfo {
396         addr: msg.addr,
397         ip: msg.ip.to_owned(),
398       },
399     );
400
401     for rate_limit_type in RateLimitType::iter() {
402       if self.rate_limit_buckets.get(&rate_limit_type).is_none() {
403         self
404           .rate_limit_buckets
405           .insert(rate_limit_type, HashMap::new());
406       }
407
408       if let Some(bucket) = self.rate_limit_buckets.get_mut(&rate_limit_type) {
409         if bucket.get(&msg.ip).is_none() {
410           bucket.insert(
411             msg.ip.to_owned(),
412             RateLimitBucket {
413               last_checked: SystemTime::now(),
414               allowance: -2f64,
415             },
416           );
417         }
418       }
419     }
420
421     id
422   }
423 }
424
425 /// Handler for Disconnect message.
426 impl Handler<Disconnect> for ChatServer {
427   type Result = ();
428
429   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
430     // Remove connections from sessions and all 3 scopes
431     if self.sessions.remove(&msg.id).is_some() {
432       for sessions in self.user_rooms.values_mut() {
433         sessions.remove(&msg.id);
434       }
435
436       for sessions in self.post_rooms.values_mut() {
437         sessions.remove(&msg.id);
438       }
439
440       for sessions in self.community_rooms.values_mut() {
441         sessions.remove(&msg.id);
442       }
443     }
444   }
445 }
446
447 /// Handler for Message message.
448 impl Handler<StandardMessage> for ChatServer {
449   type Result = MessageResult<StandardMessage>;
450
451   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
452     match parse_json_message(self, msg) {
453       Ok(m) => {
454         info!("Message Sent: {}", m);
455         MessageResult(m)
456       }
457       Err(e) => {
458         error!("Error during message handling {}", e);
459         MessageResult(e.to_string())
460       }
461     }
462   }
463 }
464
465 #[derive(Serialize)]
466 struct WebsocketResponse<T> {
467   op: String,
468   data: T,
469 }
470
471 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
472 where
473   T: Serialize,
474 {
475   let response = WebsocketResponse {
476     op: op.to_string(),
477     data,
478   };
479   Ok(serde_json::to_string(&response)?)
480 }
481
482 fn do_user_operation<'a, Data, Response>(
483   op: UserOperation,
484   data: &str,
485   conn: &PooledConnection<ConnectionManager<PgConnection>>,
486 ) -> Result<String, Error>
487 where
488   for<'de> Data: Deserialize<'de> + 'a,
489   Response: Serialize,
490   Oper<Data>: Perform<Response>,
491 {
492   let parsed_data: Data = serde_json::from_str(data)?;
493   let res = Oper::new(parsed_data).perform(&conn)?;
494   to_json_string(&op, &res)
495 }
496
497 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
498   let json: Value = serde_json::from_str(&msg.msg)?;
499   let data = &json["data"].to_string();
500   let op = &json["op"].as_str().ok_or(APIError {
501     message: "Unknown op type".to_string(),
502   })?;
503
504   let conn = chat.db.get()?;
505
506   let user_operation: UserOperation = UserOperation::from_str(&op)?;
507
508   // A DDOS check
509   chat.check_rate_limit_message(msg.id, false)?;
510
511   match user_operation {
512     UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
513     UserOperation::Register => {
514       chat.check_rate_limit_register(msg.id, true)?;
515       let register: Register = serde_json::from_str(data)?;
516       let res = Oper::new(register).perform(&conn)?;
517       chat.check_rate_limit_register(msg.id, false)?;
518       to_json_string(&user_operation, &res)
519     }
520     UserOperation::GetUserDetails => {
521       do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
522     }
523     UserOperation::SaveUserSettings => {
524       do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
525     }
526     UserOperation::AddAdmin => {
527       let add_admin: AddAdmin = serde_json::from_str(data)?;
528       let res = Oper::new(add_admin).perform(&conn)?;
529       let res_str = to_json_string(&user_operation, &res)?;
530       chat.send_all_message(&res_str, msg.id);
531       Ok(res_str)
532     }
533     UserOperation::BanUser => {
534       let ban_user: BanUser = serde_json::from_str(data)?;
535       let res = Oper::new(ban_user).perform(&conn)?;
536       let res_str = to_json_string(&user_operation, &res)?;
537       chat.send_all_message(&res_str, msg.id);
538       Ok(res_str)
539     }
540     UserOperation::GetReplies => {
541       do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
542     }
543     UserOperation::GetUserMentions => {
544       do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
545     }
546     UserOperation::EditUserMention => {
547       do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
548     }
549     UserOperation::MarkAllAsRead => {
550       do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
551     }
552     UserOperation::GetCommunity => {
553       let get_community: GetCommunity = serde_json::from_str(data)?;
554
555       let mut res = Oper::new(get_community).perform(&conn)?;
556
557       let community_id = res.community.id;
558
559       chat.join_community_room(community_id, msg.id);
560
561       res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
562         community_users.len()
563       } else {
564         0
565       };
566
567       to_json_string(&user_operation, &res)
568     }
569     UserOperation::ListCommunities => {
570       do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
571     }
572     UserOperation::CreateCommunity => {
573       chat.check_rate_limit_register(msg.id, true)?;
574       let create_community: CreateCommunity = serde_json::from_str(data)?;
575       let res = Oper::new(create_community).perform(&conn)?;
576       chat.check_rate_limit_register(msg.id, false)?;
577       to_json_string(&user_operation, &res)
578     }
579     UserOperation::EditCommunity => {
580       let edit_community: EditCommunity = serde_json::from_str(data)?;
581       let res = Oper::new(edit_community).perform(&conn)?;
582       let mut community_sent: CommunityResponse = res.clone();
583       community_sent.community.user_id = None;
584       community_sent.community.subscribed = None;
585       let community_sent_str = to_json_string(&user_operation, &community_sent)?;
586       chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
587       to_json_string(&user_operation, &res)
588     }
589     UserOperation::FollowCommunity => {
590       do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
591     }
592     UserOperation::GetFollowedCommunities => do_user_operation::<
593       GetFollowedCommunities,
594       GetFollowedCommunitiesResponse,
595     >(user_operation, data, &conn),
596     UserOperation::BanFromCommunity => {
597       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
598       let community_id = ban_from_community.community_id;
599       let res = Oper::new(ban_from_community).perform(&conn)?;
600       let res_str = to_json_string(&user_operation, &res)?;
601       chat.send_community_room_message(community_id, &res_str, msg.id);
602       Ok(res_str)
603     }
604     UserOperation::AddModToCommunity => {
605       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
606       let community_id = mod_add_to_community.community_id;
607       let res = Oper::new(mod_add_to_community).perform(&conn)?;
608       let res_str = to_json_string(&user_operation, &res)?;
609       chat.send_community_room_message(community_id, &res_str, msg.id);
610       Ok(res_str)
611     }
612     UserOperation::ListCategories => {
613       do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
614     }
615     UserOperation::GetPost => {
616       let get_post: GetPost = serde_json::from_str(data)?;
617       let post_id = get_post.id;
618       chat.join_post_room(post_id, msg.id);
619       let mut res = Oper::new(get_post).perform(&conn)?;
620
621       res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
622         post_users.len()
623       } else {
624         0
625       };
626
627       to_json_string(&user_operation, &res)
628     }
629     UserOperation::GetPosts => {
630       let get_posts: GetPosts = serde_json::from_str(data)?;
631
632       if get_posts.community_id.is_none() {
633         // 0 is the "all" community
634         chat.join_community_room(0, msg.id);
635       }
636       let res = Oper::new(get_posts).perform(&conn)?;
637       to_json_string(&user_operation, &res)
638     }
639     UserOperation::GetComments => {
640       let get_comments: GetComments = serde_json::from_str(data)?;
641       if get_comments.community_id.is_none() {
642         // 0 is the "all" community
643         chat.join_community_room(0, msg.id);
644       }
645       let res = Oper::new(get_comments).perform(&conn)?;
646       to_json_string(&user_operation, &res)
647     }
648     UserOperation::CreatePost => {
649       chat.check_rate_limit_post(msg.id, true)?;
650       let create_post: CreatePost = serde_json::from_str(data)?;
651       let res = Oper::new(create_post).perform(&conn)?;
652       chat.check_rate_limit_post(msg.id, false)?;
653
654       chat.post_sends(UserOperation::CreatePost, res, msg.id)
655     }
656     UserOperation::CreatePostLike => {
657       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
658       let res = Oper::new(create_post_like).perform(&conn)?;
659
660       chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
661     }
662     UserOperation::EditPost => {
663       let edit_post: EditPost = serde_json::from_str(data)?;
664       let res = Oper::new(edit_post).perform(&conn)?;
665
666       chat.post_sends(UserOperation::EditPost, res, msg.id)
667     }
668     UserOperation::SavePost => {
669       do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
670     }
671     UserOperation::CreateComment => {
672       let create_comment: CreateComment = serde_json::from_str(data)?;
673       let res = Oper::new(create_comment).perform(&conn)?;
674
675       chat.comment_sends(UserOperation::CreateComment, res, msg.id)
676     }
677     UserOperation::EditComment => {
678       let edit_comment: EditComment = serde_json::from_str(data)?;
679       let res = Oper::new(edit_comment).perform(&conn)?;
680
681       chat.comment_sends(UserOperation::EditComment, res, msg.id)
682     }
683     UserOperation::SaveComment => {
684       do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
685     }
686     UserOperation::CreateCommentLike => {
687       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
688       let res = Oper::new(create_comment_like).perform(&conn)?;
689
690       chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
691     }
692     UserOperation::GetModlog => {
693       do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
694     }
695     UserOperation::CreateSite => {
696       do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
697     }
698     UserOperation::EditSite => {
699       let edit_site: EditSite = serde_json::from_str(data)?;
700       let res = Oper::new(edit_site).perform(&conn)?;
701       let res_str = to_json_string(&user_operation, &res)?;
702       chat.send_all_message(&res_str, msg.id);
703       Ok(res_str)
704     }
705     UserOperation::GetSite => {
706       let get_site: GetSite = serde_json::from_str(data)?;
707       let mut res = Oper::new(get_site).perform(&conn)?;
708       res.online = chat.sessions.len();
709       to_json_string(&user_operation, &res)
710     }
711     UserOperation::Search => {
712       do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
713     }
714     UserOperation::TransferCommunity => {
715       do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
716     }
717     UserOperation::TransferSite => {
718       do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
719     }
720     UserOperation::DeleteAccount => {
721       do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
722     }
723     UserOperation::PasswordReset => {
724       do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
725     }
726     UserOperation::PasswordChange => {
727       do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
728     }
729     UserOperation::CreatePrivateMessage => {
730       let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
731       let recipient_id = create_private_message.recipient_id;
732       let res = Oper::new(create_private_message).perform(&conn)?;
733       let res_str = to_json_string(&user_operation, &res)?;
734
735       chat.send_user_room_message(recipient_id, &res_str, msg.id);
736       Ok(res_str)
737     }
738     UserOperation::EditPrivateMessage => {
739       do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
740     }
741     UserOperation::GetPrivateMessages => {
742       do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
743     }
744     UserOperation::UserJoin => {
745       let user_join: UserJoin = serde_json::from_str(data)?;
746       let res = Oper::new(user_join).perform(&conn)?;
747       chat.join_user_room(res.user_id, msg.id);
748       to_json_string(&user_operation, &res)
749     }
750   }
751 }