]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'dev' into federation
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15 use strum::IntoEnumIterator;
16
17 use crate::api::comment::*;
18 use crate::api::community::*;
19 use crate::api::post::*;
20 use crate::api::site::*;
21 use crate::api::user::*;
22 use crate::api::*;
23 use crate::apub::puller::*;
24 use crate::websocket::UserOperation;
25 use crate::Settings;
26
27 type ConnectionId = usize;
28 type PostId = i32;
29 type CommunityId = i32;
30 type UserId = i32;
31 type IPAddr = String;
32
33 /// Chat server sends this messages to session
34 #[derive(Message)]
35 #[rtype(result = "()")]
36 pub struct WSMessage(pub String);
37
38 /// Message for chat server communications
39
40 /// New chat session is created
41 #[derive(Message)]
42 #[rtype(usize)]
43 pub struct Connect {
44   pub addr: Recipient<WSMessage>,
45   pub ip: IPAddr,
46 }
47
48 /// Session is disconnected
49 #[derive(Message)]
50 #[rtype(result = "()")]
51 pub struct Disconnect {
52   pub id: ConnectionId,
53   pub ip: IPAddr,
54 }
55
56 #[derive(Serialize, Deserialize, Message)]
57 #[rtype(String)]
58 pub struct StandardMessage {
59   /// Id of the client session
60   pub id: ConnectionId,
61   /// Peer message
62   pub msg: String,
63 }
64
65 #[derive(Debug)]
66 pub struct RateLimitBucket {
67   last_checked: SystemTime,
68   allowance: f64,
69 }
70
71 pub struct SessionInfo {
72   pub addr: Recipient<WSMessage>,
73   pub ip: IPAddr,
74 }
75
76 #[derive(Eq, PartialEq, Hash, Debug, EnumIter, Copy, Clone)]
77 pub enum RateLimitType {
78   Message,
79   Register,
80   Post,
81 }
82
83 /// `ChatServer` manages chat rooms and responsible for coordinating chat
84 /// session.
85 pub struct ChatServer {
86   /// A map from generated random ID to session addr
87   sessions: HashMap<ConnectionId, SessionInfo>,
88
89   /// A map from post_id to set of connectionIDs
90   post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
91
92   /// A map from community to set of connectionIDs
93   community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
94
95   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
96   /// sessions (IE clients)
97   user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
98
99   /// Rate limiting based on rate type and IP addr
100   rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
101
102   rng: ThreadRng,
103   db: Pool<ConnectionManager<PgConnection>>,
104 }
105
106 impl ChatServer {
107   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
108     ChatServer {
109       sessions: HashMap::new(),
110       rate_limit_buckets: HashMap::new(),
111       post_rooms: HashMap::new(),
112       community_rooms: HashMap::new(),
113       user_rooms: HashMap::new(),
114       rng: rand::thread_rng(),
115       db,
116     }
117   }
118
119   fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
120     // remove session from all rooms
121     for sessions in self.community_rooms.values_mut() {
122       sessions.remove(&id);
123     }
124
125     // If the room doesn't exist yet
126     if self.community_rooms.get_mut(&community_id).is_none() {
127       self.community_rooms.insert(community_id, HashSet::new());
128     }
129
130     self
131       .community_rooms
132       .get_mut(&community_id)
133       .unwrap()
134       .insert(id);
135   }
136
137   fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
138     // remove session from all rooms
139     for sessions in self.post_rooms.values_mut() {
140       sessions.remove(&id);
141     }
142
143     // If the room doesn't exist yet
144     if self.post_rooms.get_mut(&post_id).is_none() {
145       self.post_rooms.insert(post_id, HashSet::new());
146     }
147
148     self.post_rooms.get_mut(&post_id).unwrap().insert(id);
149   }
150
151   fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
152     // remove session from all rooms
153     for sessions in self.user_rooms.values_mut() {
154       sessions.remove(&id);
155     }
156
157     // If the room doesn't exist yet
158     if self.user_rooms.get_mut(&user_id).is_none() {
159       self.user_rooms.insert(user_id, HashSet::new());
160     }
161
162     self.user_rooms.get_mut(&user_id).unwrap().insert(id);
163   }
164
165   fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
166     if let Some(sessions) = self.post_rooms.get(&post_id) {
167       for id in sessions {
168         if *id != skip_id {
169           if let Some(info) = self.sessions.get(id) {
170             let _ = info.addr.do_send(WSMessage(message.to_owned()));
171           }
172         }
173       }
174     }
175   }
176
177   fn send_community_room_message(
178     &self,
179     community_id: CommunityId,
180     message: &str,
181     skip_id: ConnectionId,
182   ) {
183     if let Some(sessions) = self.community_rooms.get(&community_id) {
184       for id in sessions {
185         if *id != skip_id {
186           if let Some(info) = self.sessions.get(id) {
187             let _ = info.addr.do_send(WSMessage(message.to_owned()));
188           }
189         }
190       }
191     }
192   }
193
194   fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
195     if let Some(sessions) = self.user_rooms.get(&user_id) {
196       for id in sessions {
197         if *id != skip_id {
198           if let Some(info) = self.sessions.get(id) {
199             let _ = info.addr.do_send(WSMessage(message.to_owned()));
200           }
201         }
202       }
203     }
204   }
205
206   fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
207     for id in self.sessions.keys() {
208       if *id != skip_id {
209         if let Some(info) = self.sessions.get(id) {
210           let _ = info.addr.do_send(WSMessage(message.to_owned()));
211         }
212       }
213     }
214   }
215
216   fn comment_sends(
217     &self,
218     user_operation: UserOperation,
219     comment: CommentResponse,
220     id: ConnectionId,
221   ) -> Result<String, Error> {
222     let mut comment_reply_sent = comment.clone();
223     comment_reply_sent.comment.my_vote = None;
224     comment_reply_sent.comment.user_id = None;
225
226     // For the post room ones, and the directs back to the user
227     // strip out the recipient_ids, so that
228     // users don't get double notifs
229     let mut comment_user_sent = comment.clone();
230     comment_user_sent.recipient_ids = Vec::new();
231
232     let mut comment_post_sent = comment_reply_sent.clone();
233     comment_post_sent.recipient_ids = Vec::new();
234
235     let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
236     let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
237     let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
238
239     // Send it to the post room
240     self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
241
242     // Send it to the recipient(s) including the mentioned users
243     for recipient_id in comment_reply_sent.recipient_ids {
244       self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
245     }
246
247     Ok(comment_user_sent_str)
248   }
249
250   fn post_sends(
251     &self,
252     user_operation: UserOperation,
253     post: PostResponse,
254     id: ConnectionId,
255   ) -> Result<String, Error> {
256     let community_id = post.post.community_id;
257
258     // Don't send my data with it
259     let mut post_sent = post.clone();
260     post_sent.post.my_vote = None;
261     post_sent.post.user_id = None;
262     let post_sent_str = to_json_string(&user_operation, &post_sent)?;
263
264     // Send it to /c/all and that community
265     self.send_community_room_message(0, &post_sent_str, id);
266     self.send_community_room_message(community_id, &post_sent_str, id);
267
268     to_json_string(&user_operation, post)
269   }
270
271   fn check_rate_limit_register(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
272     self.check_rate_limit_full(
273       RateLimitType::Register,
274       id,
275       Settings::get().rate_limit.register,
276       Settings::get().rate_limit.register_per_second,
277       check_only,
278     )
279   }
280
281   fn check_rate_limit_post(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
282     self.check_rate_limit_full(
283       RateLimitType::Post,
284       id,
285       Settings::get().rate_limit.post,
286       Settings::get().rate_limit.post_per_second,
287       check_only,
288     )
289   }
290
291   fn check_rate_limit_message(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
292     self.check_rate_limit_full(
293       RateLimitType::Message,
294       id,
295       Settings::get().rate_limit.message,
296       Settings::get().rate_limit.message_per_second,
297       check_only,
298     )
299   }
300
301   #[allow(clippy::float_cmp)]
302   fn check_rate_limit_full(
303     &mut self,
304     type_: RateLimitType,
305     id: usize,
306     rate: i32,
307     per: i32,
308     check_only: bool,
309   ) -> Result<(), Error> {
310     if let Some(info) = self.sessions.get(&id) {
311       if let Some(bucket) = self.rate_limit_buckets.get_mut(&type_) {
312         if let Some(rate_limit) = bucket.get_mut(&info.ip) {
313           let current = SystemTime::now();
314           let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
315
316           // The initial value
317           if rate_limit.allowance == -2f64 {
318             rate_limit.allowance = rate as f64;
319           };
320
321           rate_limit.last_checked = current;
322           rate_limit.allowance += time_passed * (rate as f64 / per as f64);
323           if !check_only && rate_limit.allowance > rate as f64 {
324             rate_limit.allowance = rate as f64;
325           }
326
327           if rate_limit.allowance < 1.0 {
328             println!(
329               "Rate limited IP: {}, time_passed: {}, allowance: {}",
330               &info.ip, time_passed, rate_limit.allowance
331             );
332             Err(
333               APIError {
334                 message: format!("Too many requests. {} per {} seconds", rate, per),
335               }
336               .into(),
337             )
338           } else {
339             if !check_only {
340               rate_limit.allowance -= 1.0;
341             }
342             Ok(())
343           }
344         } else {
345           Ok(())
346         }
347       } else {
348         Ok(())
349       }
350     } else {
351       Ok(())
352     }
353   }
354 }
355
356 /// Make actor from `ChatServer`
357 impl Actor for ChatServer {
358   /// We are going to use simple Context, we just need ability to communicate
359   /// with other actors.
360   type Context = Context<Self>;
361 }
362
363 /// Handler for Connect message.
364 ///
365 /// Register new session and assign unique id to this session
366 impl Handler<Connect> for ChatServer {
367   type Result = usize;
368
369   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
370     // register session with random id
371     let id = self.rng.gen::<usize>();
372     println!("{} joined", &msg.ip);
373
374     self.sessions.insert(
375       id,
376       SessionInfo {
377         addr: msg.addr,
378         ip: msg.ip.to_owned(),
379       },
380     );
381
382     for rate_limit_type in RateLimitType::iter() {
383       if self.rate_limit_buckets.get(&rate_limit_type).is_none() {
384         self
385           .rate_limit_buckets
386           .insert(rate_limit_type, HashMap::new());
387       }
388
389       if let Some(bucket) = self.rate_limit_buckets.get_mut(&rate_limit_type) {
390         if bucket.get(&msg.ip).is_none() {
391           bucket.insert(
392             msg.ip.to_owned(),
393             RateLimitBucket {
394               last_checked: SystemTime::now(),
395               allowance: -2f64,
396             },
397           );
398         }
399       }
400     }
401
402     id
403   }
404 }
405
406 /// Handler for Disconnect message.
407 impl Handler<Disconnect> for ChatServer {
408   type Result = ();
409
410   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
411     // Remove connections from sessions and all 3 scopes
412     if self.sessions.remove(&msg.id).is_some() {
413       for sessions in self.user_rooms.values_mut() {
414         sessions.remove(&msg.id);
415       }
416
417       for sessions in self.post_rooms.values_mut() {
418         sessions.remove(&msg.id);
419       }
420
421       for sessions in self.community_rooms.values_mut() {
422         sessions.remove(&msg.id);
423       }
424     }
425   }
426 }
427
428 /// Handler for Message message.
429 impl Handler<StandardMessage> for ChatServer {
430   type Result = MessageResult<StandardMessage>;
431
432   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
433     let msg_out = match parse_json_message(self, msg) {
434       Ok(m) => m,
435       Err(e) => e.to_string(),
436     };
437
438     println!("Message Sent: {}", msg_out);
439     MessageResult(msg_out)
440   }
441 }
442
443 #[derive(Serialize)]
444 struct WebsocketResponse<T> {
445   op: String,
446   data: T,
447 }
448
449 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
450 where
451   T: Serialize,
452 {
453   let response = WebsocketResponse {
454     op: op.to_string(),
455     data,
456   };
457   Ok(serde_json::to_string(&response)?)
458 }
459
460 fn do_user_operation<'a, Data, Response>(
461   op: UserOperation,
462   data: &str,
463   conn: &PooledConnection<ConnectionManager<PgConnection>>,
464 ) -> Result<String, Error>
465 where
466   for<'de> Data: Deserialize<'de> + 'a,
467   Response: Serialize,
468   Oper<Data>: Perform<Response>,
469 {
470   let parsed_data: Data = serde_json::from_str(data)?;
471   let res = Oper::new(parsed_data).perform(&conn)?;
472   to_json_string(&op, &res)
473 }
474
475 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
476   let json: Value = serde_json::from_str(&msg.msg)?;
477   let data = &json["data"].to_string();
478   let op = &json["op"].as_str().ok_or(APIError {
479     message: "Unknown op type".to_string(),
480   })?;
481
482   let conn = chat.db.get()?;
483
484   let user_operation: UserOperation = UserOperation::from_str(&op)?;
485
486   // TODO: none of the chat messages are going to work if stuff is submitted via http api,
487   //       need to move that handling elsewhere
488
489   // A DDOS check
490   chat.check_rate_limit_message(msg.id, false)?;
491
492   match user_operation {
493     UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
494     UserOperation::Register => {
495       chat.check_rate_limit_register(msg.id, true)?;
496       let register: Register = serde_json::from_str(data)?;
497       let res = Oper::new(register).perform(&conn)?;
498       chat.check_rate_limit_register(msg.id, false)?;
499       to_json_string(&user_operation, &res)
500     }
501     UserOperation::GetUserDetails => {
502       do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
503     }
504     UserOperation::SaveUserSettings => {
505       do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
506     }
507     UserOperation::AddAdmin => {
508       let add_admin: AddAdmin = serde_json::from_str(data)?;
509       let res = Oper::new(add_admin).perform(&conn)?;
510       let res_str = to_json_string(&user_operation, &res)?;
511       chat.send_all_message(&res_str, msg.id);
512       Ok(res_str)
513     }
514     UserOperation::BanUser => {
515       let ban_user: BanUser = serde_json::from_str(data)?;
516       let res = Oper::new(ban_user).perform(&conn)?;
517       let res_str = to_json_string(&user_operation, &res)?;
518       chat.send_all_message(&res_str, msg.id);
519       Ok(res_str)
520     }
521     UserOperation::GetReplies => {
522       do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
523     }
524     UserOperation::GetUserMentions => {
525       do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
526     }
527     UserOperation::EditUserMention => {
528       do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
529     }
530     UserOperation::MarkAllAsRead => {
531       do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
532     }
533     UserOperation::GetCommunity => {
534       let get_community: GetCommunity = serde_json::from_str(data)?;
535
536       let mut res = if Settings::get().federation_enabled {
537         if let Some(community_name) = get_community.name.to_owned() {
538           if community_name.contains('@') {
539             // TODO: need to support sort, filter etc for remote communities
540             get_remote_community(community_name)?
541           // TODO what is this about
542           // get_community.name = Some(name.replace("!", ""));
543           } else {
544             Oper::new(get_community).perform(&conn)?
545           }
546         } else {
547           Oper::new(get_community).perform(&conn)?
548         }
549       } else {
550         Oper::new(get_community).perform(&conn)?
551       };
552
553       let community_id = res.community.id;
554
555       chat.join_community_room(community_id, msg.id);
556
557       res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
558         community_users.len()
559       } else {
560         0
561       };
562
563       to_json_string(&user_operation, &res)
564     }
565     UserOperation::ListCommunities => {
566       if Settings::get().federation_enabled {
567         let res = get_all_communities()?;
568         let val = ListCommunitiesResponse { communities: res };
569         to_json_string(&user_operation, &val)
570       } else {
571         do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
572       }
573     }
574     UserOperation::CreateCommunity => {
575       chat.check_rate_limit_register(msg.id, true)?;
576       let create_community: CreateCommunity = serde_json::from_str(data)?;
577       let res = Oper::new(create_community).perform(&conn)?;
578       chat.check_rate_limit_register(msg.id, false)?;
579       to_json_string(&user_operation, &res)
580     }
581     UserOperation::EditCommunity => {
582       let edit_community: EditCommunity = serde_json::from_str(data)?;
583       let res = Oper::new(edit_community).perform(&conn)?;
584       let mut community_sent: CommunityResponse = res.clone();
585       community_sent.community.user_id = None;
586       community_sent.community.subscribed = None;
587       let community_sent_str = to_json_string(&user_operation, &community_sent)?;
588       chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
589       to_json_string(&user_operation, &res)
590     }
591     UserOperation::FollowCommunity => {
592       do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
593     }
594     UserOperation::GetFollowedCommunities => do_user_operation::<
595       GetFollowedCommunities,
596       GetFollowedCommunitiesResponse,
597     >(user_operation, data, &conn),
598     UserOperation::BanFromCommunity => {
599       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
600       let community_id = ban_from_community.community_id;
601       let res = Oper::new(ban_from_community).perform(&conn)?;
602       let res_str = to_json_string(&user_operation, &res)?;
603       chat.send_community_room_message(community_id, &res_str, msg.id);
604       Ok(res_str)
605     }
606     UserOperation::AddModToCommunity => {
607       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
608       let community_id = mod_add_to_community.community_id;
609       let res = Oper::new(mod_add_to_community).perform(&conn)?;
610       let res_str = to_json_string(&user_operation, &res)?;
611       chat.send_community_room_message(community_id, &res_str, msg.id);
612       Ok(res_str)
613     }
614     UserOperation::ListCategories => {
615       do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
616     }
617     UserOperation::GetPost => {
618       let get_post: GetPost = serde_json::from_str(data)?;
619       let post_id = get_post.id;
620       chat.join_post_room(post_id, msg.id);
621       let mut res = Oper::new(get_post).perform(&conn)?;
622
623       res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
624         post_users.len()
625       } else {
626         0
627       };
628
629       to_json_string(&user_operation, &res)
630     }
631     UserOperation::GetPosts => {
632       let get_posts: GetPosts = serde_json::from_str(data)?;
633       if get_posts.community_id.is_none() {
634         // 0 is the "all" community
635         chat.join_community_room(0, msg.id);
636       }
637       let res = Oper::new(get_posts).perform(&conn)?;
638       to_json_string(&user_operation, &res)
639     }
640     UserOperation::CreatePost => {
641       chat.check_rate_limit_post(msg.id, true)?;
642       let create_post: CreatePost = serde_json::from_str(data)?;
643       let res = Oper::new(create_post).perform(&conn)?;
644       chat.check_rate_limit_post(msg.id, false)?;
645
646       chat.post_sends(UserOperation::CreatePost, res, msg.id)
647     }
648     UserOperation::CreatePostLike => {
649       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
650       let res = Oper::new(create_post_like).perform(&conn)?;
651
652       chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
653     }
654     UserOperation::EditPost => {
655       let edit_post: EditPost = serde_json::from_str(data)?;
656       let res = Oper::new(edit_post).perform(&conn)?;
657
658       chat.post_sends(UserOperation::EditPost, res, msg.id)
659     }
660     UserOperation::SavePost => {
661       do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
662     }
663     UserOperation::CreateComment => {
664       let create_comment: CreateComment = serde_json::from_str(data)?;
665       let res = Oper::new(create_comment).perform(&conn)?;
666
667       chat.comment_sends(UserOperation::CreateComment, res, msg.id)
668     }
669     UserOperation::EditComment => {
670       let edit_comment: EditComment = serde_json::from_str(data)?;
671       let res = Oper::new(edit_comment).perform(&conn)?;
672
673       chat.comment_sends(UserOperation::EditComment, res, msg.id)
674     }
675     UserOperation::SaveComment => {
676       do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
677     }
678     UserOperation::CreateCommentLike => {
679       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
680       let res = Oper::new(create_comment_like).perform(&conn)?;
681
682       chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
683     }
684     UserOperation::GetModlog => {
685       do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
686     }
687     UserOperation::CreateSite => {
688       do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
689     }
690     UserOperation::EditSite => {
691       let edit_site: EditSite = serde_json::from_str(data)?;
692       let res = Oper::new(edit_site).perform(&conn)?;
693       let res_str = to_json_string(&user_operation, &res)?;
694       chat.send_all_message(&res_str, msg.id);
695       Ok(res_str)
696     }
697     UserOperation::GetSite => {
698       let get_site: GetSite = serde_json::from_str(data)?;
699       let mut res = Oper::new(get_site).perform(&conn)?;
700       res.online = chat.sessions.len();
701       to_json_string(&user_operation, &res)
702     }
703     UserOperation::Search => {
704       do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
705     }
706     UserOperation::TransferCommunity => {
707       do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
708     }
709     UserOperation::TransferSite => {
710       do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
711     }
712     UserOperation::DeleteAccount => {
713       do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
714     }
715     UserOperation::PasswordReset => {
716       do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
717     }
718     UserOperation::PasswordChange => {
719       do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
720     }
721     UserOperation::CreatePrivateMessage => {
722       let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
723       let recipient_id = create_private_message.recipient_id;
724       let res = Oper::new(create_private_message).perform(&conn)?;
725       let res_str = to_json_string(&user_operation, &res)?;
726
727       chat.send_user_room_message(recipient_id, &res_str, msg.id);
728       Ok(res_str)
729     }
730     UserOperation::EditPrivateMessage => {
731       do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
732     }
733     UserOperation::GetPrivateMessages => {
734       do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
735     }
736     UserOperation::UserJoin => {
737       let user_join: UserJoin = serde_json::from_str(data)?;
738       let res = Oper::new(user_join).perform(&conn)?;
739       chat.join_user_room(res.user_id, msg.id);
740       to_json_string(&user_operation, &res)
741     }
742   }
743 }