]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'federation' into dev_1
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15
16 use crate::api::comment::*;
17 use crate::api::community::*;
18 use crate::api::post::*;
19 use crate::api::site::*;
20 use crate::api::user::*;
21 use crate::api::*;
22 use crate::apub::puller::*;
23 use crate::websocket::UserOperation;
24 use crate::Settings;
25
26 type ConnectionId = usize;
27 type PostId = i32;
28 type CommunityId = i32;
29 type UserId = i32;
30 type IPAddr = String;
31
32 /// Chat server sends this messages to session
33 #[derive(Message)]
34 #[rtype(result = "()")]
35 pub struct WSMessage(pub String);
36
37 /// Message for chat server communications
38
39 /// New chat session is created
40 #[derive(Message)]
41 #[rtype(usize)]
42 pub struct Connect {
43   pub addr: Recipient<WSMessage>,
44   pub ip: IPAddr,
45 }
46
47 /// Session is disconnected
48 #[derive(Message)]
49 #[rtype(result = "()")]
50 pub struct Disconnect {
51   pub id: ConnectionId,
52   pub ip: IPAddr,
53 }
54
55 #[derive(Serialize, Deserialize, Message)]
56 #[rtype(String)]
57 pub struct StandardMessage {
58   /// Id of the client session
59   pub id: ConnectionId,
60   /// Peer message
61   pub msg: String,
62 }
63
64 #[derive(Debug)]
65 pub struct RateLimitBucket {
66   last_checked: SystemTime,
67   allowance: f64,
68 }
69
70 pub struct SessionInfo {
71   pub addr: Recipient<WSMessage>,
72   pub ip: IPAddr,
73 }
74
75 /// `ChatServer` manages chat rooms and responsible for coordinating chat
76 /// session.
77 pub struct ChatServer {
78   /// A map from generated random ID to session addr
79   sessions: HashMap<ConnectionId, SessionInfo>,
80
81   /// A map from post_id to set of connectionIDs
82   post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
83
84   /// A map from community to set of connectionIDs
85   community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
86
87   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
88   /// sessions (IE clients)
89   user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
90
91   /// Rate limiting based on IP addr
92   rate_limits: HashMap<IPAddr, RateLimitBucket>,
93
94   rng: ThreadRng,
95   db: Pool<ConnectionManager<PgConnection>>,
96 }
97
98 impl ChatServer {
99   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
100     ChatServer {
101       sessions: HashMap::new(),
102       rate_limits: HashMap::new(),
103       post_rooms: HashMap::new(),
104       community_rooms: HashMap::new(),
105       user_rooms: HashMap::new(),
106       rng: rand::thread_rng(),
107       db,
108     }
109   }
110
111   fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
112     // remove session from all rooms
113     for sessions in self.community_rooms.values_mut() {
114       sessions.remove(&id);
115     }
116
117     // If the room doesn't exist yet
118     if self.community_rooms.get_mut(&community_id).is_none() {
119       self.community_rooms.insert(community_id, HashSet::new());
120     }
121
122     self
123       .community_rooms
124       .get_mut(&community_id)
125       .unwrap()
126       .insert(id);
127   }
128
129   fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
130     // remove session from all rooms
131     for sessions in self.post_rooms.values_mut() {
132       sessions.remove(&id);
133     }
134
135     // If the room doesn't exist yet
136     if self.post_rooms.get_mut(&post_id).is_none() {
137       self.post_rooms.insert(post_id, HashSet::new());
138     }
139
140     self.post_rooms.get_mut(&post_id).unwrap().insert(id);
141   }
142
143   fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
144     // remove session from all rooms
145     for sessions in self.user_rooms.values_mut() {
146       sessions.remove(&id);
147     }
148
149     // If the room doesn't exist yet
150     if self.user_rooms.get_mut(&user_id).is_none() {
151       self.user_rooms.insert(user_id, HashSet::new());
152     }
153
154     self.user_rooms.get_mut(&user_id).unwrap().insert(id);
155   }
156
157   fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
158     if let Some(sessions) = self.post_rooms.get(&post_id) {
159       for id in sessions {
160         if *id != skip_id {
161           if let Some(info) = self.sessions.get(id) {
162             let _ = info.addr.do_send(WSMessage(message.to_owned()));
163           }
164         }
165       }
166     }
167   }
168
169   fn send_community_room_message(
170     &self,
171     community_id: CommunityId,
172     message: &str,
173     skip_id: ConnectionId,
174   ) {
175     if let Some(sessions) = self.community_rooms.get(&community_id) {
176       for id in sessions {
177         if *id != skip_id {
178           if let Some(info) = self.sessions.get(id) {
179             let _ = info.addr.do_send(WSMessage(message.to_owned()));
180           }
181         }
182       }
183     }
184   }
185
186   fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
187     if let Some(sessions) = self.user_rooms.get(&user_id) {
188       for id in sessions {
189         if *id != skip_id {
190           if let Some(info) = self.sessions.get(id) {
191             let _ = info.addr.do_send(WSMessage(message.to_owned()));
192           }
193         }
194       }
195     }
196   }
197
198   fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
199     for id in self.sessions.keys() {
200       if *id != skip_id {
201         if let Some(info) = self.sessions.get(id) {
202           let _ = info.addr.do_send(WSMessage(message.to_owned()));
203         }
204       }
205     }
206   }
207
208   fn comment_sends(
209     &self,
210     user_operation: UserOperation,
211     comment: CommentResponse,
212     id: ConnectionId,
213   ) -> Result<String, Error> {
214     let mut comment_reply_sent = comment.clone();
215     comment_reply_sent.comment.my_vote = None;
216     comment_reply_sent.comment.user_id = None;
217
218     // For the post room ones, and the directs back to the user
219     // strip out the recipient_ids, so that
220     // users don't get double notifs
221     let mut comment_user_sent = comment.clone();
222     comment_user_sent.recipient_ids = Vec::new();
223
224     let mut comment_post_sent = comment_reply_sent.clone();
225     comment_post_sent.recipient_ids = Vec::new();
226
227     let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
228     let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
229     let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
230
231     // Send it to the post room
232     self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
233
234     // Send it to the recipient(s) including the mentioned users
235     for recipient_id in comment_reply_sent.recipient_ids {
236       self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
237     }
238
239     Ok(comment_user_sent_str)
240   }
241
242   fn post_sends(
243     &self,
244     user_operation: UserOperation,
245     post: PostResponse,
246     id: ConnectionId,
247   ) -> Result<String, Error> {
248     let community_id = post.post.community_id;
249
250     // Don't send my data with it
251     let mut post_sent = post.clone();
252     post_sent.post.my_vote = None;
253     post_sent.post.user_id = None;
254     let post_sent_str = to_json_string(&user_operation, &post_sent)?;
255
256     // Send it to /c/all and that community
257     self.send_community_room_message(0, &post_sent_str, id);
258     self.send_community_room_message(community_id, &post_sent_str, id);
259
260     to_json_string(&user_operation, post)
261   }
262
263   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
264     self.check_rate_limit_full(
265       id,
266       Settings::get().rate_limit.register,
267       Settings::get().rate_limit.register_per_second,
268     )
269   }
270
271   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
272     self.check_rate_limit_full(
273       id,
274       Settings::get().rate_limit.post,
275       Settings::get().rate_limit.post_per_second,
276     )
277   }
278
279   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
280     self.check_rate_limit_full(
281       id,
282       Settings::get().rate_limit.message,
283       Settings::get().rate_limit.message_per_second,
284     )
285   }
286
287   #[allow(clippy::float_cmp)]
288   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
289     if let Some(info) = self.sessions.get(&id) {
290       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
291         // The initial value
292         if rate_limit.allowance == -2f64 {
293           rate_limit.allowance = rate as f64;
294         };
295
296         let current = SystemTime::now();
297         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
298         rate_limit.last_checked = current;
299         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
300         if rate_limit.allowance > rate as f64 {
301           rate_limit.allowance = rate as f64;
302         }
303
304         if rate_limit.allowance < 1.0 {
305           println!(
306             "Rate limited IP: {}, time_passed: {}, allowance: {}",
307             &info.ip, time_passed, rate_limit.allowance
308           );
309           Err(
310             APIError {
311               message: format!("Too many requests. {} per {} seconds", rate, per),
312             }
313             .into(),
314           )
315         } else {
316           rate_limit.allowance -= 1.0;
317           Ok(())
318         }
319       } else {
320         Ok(())
321       }
322     } else {
323       Ok(())
324     }
325   }
326 }
327
328 /// Make actor from `ChatServer`
329 impl Actor for ChatServer {
330   /// We are going to use simple Context, we just need ability to communicate
331   /// with other actors.
332   type Context = Context<Self>;
333 }
334
335 /// Handler for Connect message.
336 ///
337 /// Register new session and assign unique id to this session
338 impl Handler<Connect> for ChatServer {
339   type Result = usize;
340
341   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
342     // register session with random id
343     let id = self.rng.gen::<usize>();
344     println!("{} joined", &msg.ip);
345
346     self.sessions.insert(
347       id,
348       SessionInfo {
349         addr: msg.addr,
350         ip: msg.ip.to_owned(),
351       },
352     );
353
354     if self.rate_limits.get(&msg.ip).is_none() {
355       self.rate_limits.insert(
356         msg.ip,
357         RateLimitBucket {
358           last_checked: SystemTime::now(),
359           allowance: -2f64,
360         },
361       );
362     }
363
364     id
365   }
366 }
367
368 /// Handler for Disconnect message.
369 impl Handler<Disconnect> for ChatServer {
370   type Result = ();
371
372   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
373     // Remove connections from sessions and all 3 scopes
374     if self.sessions.remove(&msg.id).is_some() {
375       for sessions in self.user_rooms.values_mut() {
376         sessions.remove(&msg.id);
377       }
378
379       for sessions in self.post_rooms.values_mut() {
380         sessions.remove(&msg.id);
381       }
382
383       for sessions in self.community_rooms.values_mut() {
384         sessions.remove(&msg.id);
385       }
386     }
387   }
388 }
389
390 /// Handler for Message message.
391 impl Handler<StandardMessage> for ChatServer {
392   type Result = MessageResult<StandardMessage>;
393
394   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
395     let msg_out = match parse_json_message(self, msg) {
396       Ok(m) => m,
397       Err(e) => e.to_string(),
398     };
399
400     println!("Message Sent: {}", msg_out);
401     MessageResult(msg_out)
402   }
403 }
404
405 #[derive(Serialize)]
406 struct WebsocketResponse<T> {
407   op: String,
408   data: T,
409 }
410
411 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
412 where
413   T: Serialize,
414 {
415   let response = WebsocketResponse {
416     op: op.to_string(),
417     data,
418   };
419   Ok(serde_json::to_string(&response)?)
420 }
421
422 fn do_user_operation<'a, Data, Response>(
423   op: UserOperation,
424   data: &str,
425   conn: &PooledConnection<ConnectionManager<PgConnection>>,
426 ) -> Result<String, Error>
427 where
428   for<'de> Data: Deserialize<'de> + 'a,
429   Response: Serialize,
430   Oper<Data>: Perform<Response>,
431 {
432   let parsed_data: Data = serde_json::from_str(data)?;
433   let res = Oper::new(parsed_data).perform(&conn)?;
434   to_json_string(&op, &res)
435 }
436
437 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
438   let json: Value = serde_json::from_str(&msg.msg)?;
439   let data = &json["data"].to_string();
440   let op = &json["op"].as_str().ok_or(APIError {
441     message: "Unknown op type".to_string(),
442   })?;
443
444   let conn = chat.db.get()?;
445
446   let user_operation: UserOperation = UserOperation::from_str(&op)?;
447
448   // TODO: none of the chat messages are going to work if stuff is submitted via http api,
449   //       need to move that handling elsewhere
450   match user_operation {
451     UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
452     UserOperation::Register => {
453       chat.check_rate_limit_register(msg.id)?;
454       do_user_operation::<Register, LoginResponse>(user_operation, data, &conn)
455     }
456     UserOperation::GetUserDetails => {
457       do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
458     }
459     UserOperation::SaveUserSettings => {
460       do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
461     }
462     UserOperation::AddAdmin => {
463       let add_admin: AddAdmin = serde_json::from_str(data)?;
464       let res = Oper::new(add_admin).perform(&conn)?;
465       let res_str = to_json_string(&user_operation, &res)?;
466       chat.send_all_message(&res_str, msg.id);
467       Ok(res_str)
468     }
469     UserOperation::BanUser => {
470       let ban_user: BanUser = serde_json::from_str(data)?;
471       let res = Oper::new(ban_user).perform(&conn)?;
472       let res_str = to_json_string(&user_operation, &res)?;
473       chat.send_all_message(&res_str, msg.id);
474       Ok(res_str)
475     }
476     UserOperation::GetReplies => {
477       do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
478     }
479     UserOperation::GetUserMentions => {
480       do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
481     }
482     UserOperation::EditUserMention => {
483       do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
484     }
485     UserOperation::MarkAllAsRead => {
486       do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
487     }
488     UserOperation::GetCommunity => {
489       let get_community: GetCommunity = serde_json::from_str(data)?;
490
491       let mut res = if Settings::get().federation_enabled {
492         if let Some(community_name) = get_community.name.to_owned() {
493           if community_name.contains('@') {
494             // TODO: need to support sort, filter etc for remote communities
495             get_remote_community(community_name)?
496           // TODO what is this about
497           // get_community.name = Some(name.replace("!", ""));
498           } else {
499             Oper::new(get_community).perform(&conn)?
500           }
501         } else {
502           Oper::new(get_community).perform(&conn)?
503         }
504       } else {
505         Oper::new(get_community).perform(&conn)?
506       };
507
508       let community_id = res.community.id;
509
510       chat.join_community_room(community_id, msg.id);
511
512       res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
513         community_users.len()
514       } else {
515         0
516       };
517
518       to_json_string(&user_operation, &res)
519     }
520     UserOperation::ListCommunities => {
521       if Settings::get().federation_enabled {
522         let res = get_all_communities()?;
523         let val = ListCommunitiesResponse { communities: res };
524         to_json_string(&user_operation, &val)
525       } else {
526         do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
527       }
528     }
529     UserOperation::CreateCommunity => {
530       chat.check_rate_limit_register(msg.id)?;
531       do_user_operation::<CreateCommunity, CommunityResponse>(user_operation, data, &conn)
532     }
533     UserOperation::EditCommunity => {
534       let edit_community: EditCommunity = serde_json::from_str(data)?;
535       let res = Oper::new(edit_community).perform(&conn)?;
536       let mut community_sent: CommunityResponse = res.clone();
537       community_sent.community.user_id = None;
538       community_sent.community.subscribed = None;
539       let community_sent_str = to_json_string(&user_operation, &community_sent)?;
540       chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
541       to_json_string(&user_operation, &res)
542     }
543     UserOperation::FollowCommunity => {
544       do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
545     }
546     UserOperation::GetFollowedCommunities => do_user_operation::<
547       GetFollowedCommunities,
548       GetFollowedCommunitiesResponse,
549     >(user_operation, data, &conn),
550     UserOperation::BanFromCommunity => {
551       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
552       let community_id = ban_from_community.community_id;
553       let res = Oper::new(ban_from_community).perform(&conn)?;
554       let res_str = to_json_string(&user_operation, &res)?;
555       chat.send_community_room_message(community_id, &res_str, msg.id);
556       Ok(res_str)
557     }
558     UserOperation::AddModToCommunity => {
559       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
560       let community_id = mod_add_to_community.community_id;
561       let res = Oper::new(mod_add_to_community).perform(&conn)?;
562       let res_str = to_json_string(&user_operation, &res)?;
563       chat.send_community_room_message(community_id, &res_str, msg.id);
564       Ok(res_str)
565     }
566     UserOperation::ListCategories => {
567       do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
568     }
569     UserOperation::GetPost => {
570       let get_post: GetPost = serde_json::from_str(data)?;
571       let post_id = get_post.id;
572       chat.join_post_room(post_id, msg.id);
573       let mut res = Oper::new(get_post).perform(&conn)?;
574
575       res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
576         post_users.len()
577       } else {
578         0
579       };
580
581       to_json_string(&user_operation, &res)
582     }
583     UserOperation::GetPosts => {
584       let get_posts: GetPosts = serde_json::from_str(data)?;
585       if get_posts.community_id.is_none() {
586         // 0 is the "all" community
587         chat.join_community_room(0, msg.id);
588       }
589       let res = Oper::new(get_posts).perform(&conn)?;
590       to_json_string(&user_operation, &res)
591     }
592     UserOperation::CreatePost => {
593       chat.check_rate_limit_post(msg.id)?;
594       let create_post: CreatePost = serde_json::from_str(data)?;
595       let res = Oper::new(create_post).perform(&conn)?;
596
597       chat.post_sends(UserOperation::CreatePost, res, msg.id)
598     }
599     UserOperation::CreatePostLike => {
600       chat.check_rate_limit_message(msg.id)?;
601       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
602       let res = Oper::new(create_post_like).perform(&conn)?;
603
604       chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
605     }
606     UserOperation::EditPost => {
607       let edit_post: EditPost = serde_json::from_str(data)?;
608       let res = Oper::new(edit_post).perform(&conn)?;
609
610       chat.post_sends(UserOperation::EditPost, res, msg.id)
611     }
612     UserOperation::SavePost => {
613       do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
614     }
615     UserOperation::CreateComment => {
616       chat.check_rate_limit_message(msg.id)?;
617       let create_comment: CreateComment = serde_json::from_str(data)?;
618       let res = Oper::new(create_comment).perform(&conn)?;
619
620       chat.comment_sends(UserOperation::CreateComment, res, msg.id)
621     }
622     UserOperation::EditComment => {
623       let edit_comment: EditComment = serde_json::from_str(data)?;
624       let res = Oper::new(edit_comment).perform(&conn)?;
625
626       chat.comment_sends(UserOperation::EditComment, res, msg.id)
627     }
628     UserOperation::SaveComment => {
629       do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
630     }
631     UserOperation::CreateCommentLike => {
632       chat.check_rate_limit_message(msg.id)?;
633       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
634       let res = Oper::new(create_comment_like).perform(&conn)?;
635
636       chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
637     }
638     UserOperation::GetModlog => {
639       do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
640     }
641     UserOperation::CreateSite => {
642       do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
643     }
644     UserOperation::EditSite => {
645       let edit_site: EditSite = serde_json::from_str(data)?;
646       let res = Oper::new(edit_site).perform(&conn)?;
647       let res_str = to_json_string(&user_operation, &res)?;
648       chat.send_all_message(&res_str, msg.id);
649       Ok(res_str)
650     }
651     UserOperation::GetSite => {
652       let get_site: GetSite = serde_json::from_str(data)?;
653       let mut res = Oper::new(get_site).perform(&conn)?;
654       res.online = chat.sessions.len();
655       to_json_string(&user_operation, &res)
656     }
657     UserOperation::Search => {
658       do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
659     }
660     UserOperation::TransferCommunity => {
661       do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
662     }
663     UserOperation::TransferSite => {
664       do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
665     }
666     UserOperation::DeleteAccount => {
667       do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
668     }
669     UserOperation::PasswordReset => {
670       do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
671     }
672     UserOperation::PasswordChange => {
673       do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
674     }
675     UserOperation::CreatePrivateMessage => {
676       chat.check_rate_limit_message(msg.id)?;
677       let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
678       let recipient_id = create_private_message.recipient_id;
679       let res = Oper::new(create_private_message).perform(&conn)?;
680       let res_str = to_json_string(&user_operation, &res)?;
681
682       chat.send_user_room_message(recipient_id, &res_str, msg.id);
683       Ok(res_str)
684     }
685     UserOperation::EditPrivateMessage => {
686       do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
687     }
688     UserOperation::GetPrivateMessages => {
689       do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
690     }
691     UserOperation::UserJoin => {
692       let user_join: UserJoin = serde_json::from_str(data)?;
693       let res = Oper::new(user_join).perform(&conn)?;
694       chat.join_user_room(res.user_id, msg.id);
695       to_json_string(&user_operation, &res)
696     }
697   }
698 }