]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Fixing rate limit checking to only ping after a success. Fixes #516
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15 use strum::IntoEnumIterator;
16
17 use crate::api::comment::*;
18 use crate::api::community::*;
19 use crate::api::post::*;
20 use crate::api::site::*;
21 use crate::api::user::*;
22 use crate::api::*;
23 use crate::websocket::UserOperation;
24 use crate::Settings;
25
26 type ConnectionId = usize;
27 type PostId = i32;
28 type CommunityId = i32;
29 type UserId = i32;
30 type IPAddr = String;
31
32 /// Chat server sends this messages to session
33 #[derive(Message)]
34 #[rtype(result = "()")]
35 pub struct WSMessage(pub String);
36
37 /// Message for chat server communications
38
39 /// New chat session is created
40 #[derive(Message)]
41 #[rtype(usize)]
42 pub struct Connect {
43   pub addr: Recipient<WSMessage>,
44   pub ip: IPAddr,
45 }
46
47 /// Session is disconnected
48 #[derive(Message)]
49 #[rtype(result = "()")]
50 pub struct Disconnect {
51   pub id: ConnectionId,
52   pub ip: IPAddr,
53 }
54
55 #[derive(Serialize, Deserialize, Message)]
56 #[rtype(String)]
57 pub struct StandardMessage {
58   /// Id of the client session
59   pub id: ConnectionId,
60   /// Peer message
61   pub msg: String,
62 }
63
64 #[derive(Debug)]
65 pub struct RateLimitBucket {
66   last_checked: SystemTime,
67   allowance: f64,
68 }
69
70 pub struct SessionInfo {
71   pub addr: Recipient<WSMessage>,
72   pub ip: IPAddr,
73 }
74
75 #[derive(Eq, PartialEq, Hash, Debug, EnumIter, Copy, Clone)]
76 pub enum RateLimitType {
77   Message,
78   Register,
79   Post,
80 }
81
82 /// `ChatServer` manages chat rooms and responsible for coordinating chat
83 /// session.
84 pub struct ChatServer {
85   /// A map from generated random ID to session addr
86   sessions: HashMap<ConnectionId, SessionInfo>,
87
88   /// A map from post_id to set of connectionIDs
89   post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
90
91   /// A map from community to set of connectionIDs
92   community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
93
94   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
95   /// sessions (IE clients)
96   user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
97
98   /// Rate limiting based on rate type and IP addr
99   rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
100
101   rng: ThreadRng,
102   db: Pool<ConnectionManager<PgConnection>>,
103 }
104
105 impl ChatServer {
106   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
107     ChatServer {
108       sessions: HashMap::new(),
109       rate_limit_buckets: HashMap::new(),
110       post_rooms: HashMap::new(),
111       community_rooms: HashMap::new(),
112       user_rooms: HashMap::new(),
113       rng: rand::thread_rng(),
114       db,
115     }
116   }
117
118   fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
119     // remove session from all rooms
120     for sessions in self.community_rooms.values_mut() {
121       sessions.remove(&id);
122     }
123
124     // If the room doesn't exist yet
125     if self.community_rooms.get_mut(&community_id).is_none() {
126       self.community_rooms.insert(community_id, HashSet::new());
127     }
128
129     self
130       .community_rooms
131       .get_mut(&community_id)
132       .unwrap()
133       .insert(id);
134   }
135
136   fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
137     // remove session from all rooms
138     for sessions in self.post_rooms.values_mut() {
139       sessions.remove(&id);
140     }
141
142     // If the room doesn't exist yet
143     if self.post_rooms.get_mut(&post_id).is_none() {
144       self.post_rooms.insert(post_id, HashSet::new());
145     }
146
147     self.post_rooms.get_mut(&post_id).unwrap().insert(id);
148   }
149
150   fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
151     // remove session from all rooms
152     for sessions in self.user_rooms.values_mut() {
153       sessions.remove(&id);
154     }
155
156     // If the room doesn't exist yet
157     if self.user_rooms.get_mut(&user_id).is_none() {
158       self.user_rooms.insert(user_id, HashSet::new());
159     }
160
161     self.user_rooms.get_mut(&user_id).unwrap().insert(id);
162   }
163
164   fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
165     if let Some(sessions) = self.post_rooms.get(&post_id) {
166       for id in sessions {
167         if *id != skip_id {
168           if let Some(info) = self.sessions.get(id) {
169             let _ = info.addr.do_send(WSMessage(message.to_owned()));
170           }
171         }
172       }
173     }
174   }
175
176   fn send_community_room_message(
177     &self,
178     community_id: CommunityId,
179     message: &str,
180     skip_id: ConnectionId,
181   ) {
182     if let Some(sessions) = self.community_rooms.get(&community_id) {
183       for id in sessions {
184         if *id != skip_id {
185           if let Some(info) = self.sessions.get(id) {
186             let _ = info.addr.do_send(WSMessage(message.to_owned()));
187           }
188         }
189       }
190     }
191   }
192
193   fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
194     if let Some(sessions) = self.user_rooms.get(&user_id) {
195       for id in sessions {
196         if *id != skip_id {
197           if let Some(info) = self.sessions.get(id) {
198             let _ = info.addr.do_send(WSMessage(message.to_owned()));
199           }
200         }
201       }
202     }
203   }
204
205   fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
206     for id in self.sessions.keys() {
207       if *id != skip_id {
208         if let Some(info) = self.sessions.get(id) {
209           let _ = info.addr.do_send(WSMessage(message.to_owned()));
210         }
211       }
212     }
213   }
214
215   fn comment_sends(
216     &self,
217     user_operation: UserOperation,
218     comment: CommentResponse,
219     id: ConnectionId,
220   ) -> Result<String, Error> {
221     let mut comment_reply_sent = comment.clone();
222     comment_reply_sent.comment.my_vote = None;
223     comment_reply_sent.comment.user_id = None;
224
225     // For the post room ones, and the directs back to the user
226     // strip out the recipient_ids, so that
227     // users don't get double notifs
228     let mut comment_user_sent = comment.clone();
229     comment_user_sent.recipient_ids = Vec::new();
230
231     let mut comment_post_sent = comment_reply_sent.clone();
232     comment_post_sent.recipient_ids = Vec::new();
233
234     let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
235     let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
236     let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
237
238     // Send it to the post room
239     self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
240
241     // Send it to the recipient(s) including the mentioned users
242     for recipient_id in comment_reply_sent.recipient_ids {
243       self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
244     }
245
246     Ok(comment_user_sent_str)
247   }
248
249   fn post_sends(
250     &self,
251     user_operation: UserOperation,
252     post: PostResponse,
253     id: ConnectionId,
254   ) -> Result<String, Error> {
255     let community_id = post.post.community_id;
256
257     // Don't send my data with it
258     let mut post_sent = post.clone();
259     post_sent.post.my_vote = None;
260     post_sent.post.user_id = None;
261     let post_sent_str = to_json_string(&user_operation, &post_sent)?;
262
263     // Send it to /c/all and that community
264     self.send_community_room_message(0, &post_sent_str, id);
265     self.send_community_room_message(community_id, &post_sent_str, id);
266
267     to_json_string(&user_operation, post)
268   }
269
270   fn check_rate_limit_register(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
271     self.check_rate_limit_full(
272       RateLimitType::Register,
273       id,
274       Settings::get().rate_limit.register,
275       Settings::get().rate_limit.register_per_second,
276       check_only,
277     )
278   }
279
280   fn check_rate_limit_post(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
281     self.check_rate_limit_full(
282       RateLimitType::Post,
283       id,
284       Settings::get().rate_limit.post,
285       Settings::get().rate_limit.post_per_second,
286       check_only,
287     )
288   }
289
290   fn check_rate_limit_message(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
291     self.check_rate_limit_full(
292       RateLimitType::Message,
293       id,
294       Settings::get().rate_limit.message,
295       Settings::get().rate_limit.message_per_second,
296       check_only,
297     )
298   }
299
300   #[allow(clippy::float_cmp)]
301   fn check_rate_limit_full(
302     &mut self,
303     type_: RateLimitType,
304     id: usize,
305     rate: i32,
306     per: i32,
307     check_only: bool,
308   ) -> Result<(), Error> {
309     if let Some(info) = self.sessions.get(&id) {
310       if let Some(bucket) = self.rate_limit_buckets.get_mut(&type_) {
311         if let Some(rate_limit) = bucket.get_mut(&info.ip) {
312           let current = SystemTime::now();
313           let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
314
315           // The initial value
316           if rate_limit.allowance == -2f64 {
317             rate_limit.allowance = rate as f64;
318           };
319
320           rate_limit.last_checked = current;
321           if !check_only {
322             rate_limit.allowance += time_passed * (rate as f64 / per as f64);
323             if rate_limit.allowance > rate as f64 {
324               rate_limit.allowance = rate as f64;
325             }
326           }
327
328           if rate_limit.allowance < 1.0 {
329             println!(
330               "Rate limited IP: {}, time_passed: {}, allowance: {}",
331               &info.ip, time_passed, rate_limit.allowance
332             );
333             Err(
334               APIError {
335                 message: format!("Too many requests. {} per {} seconds", rate, per),
336               }
337               .into(),
338             )
339           } else {
340             if !check_only {
341               rate_limit.allowance -= 1.0;
342             }
343             Ok(())
344           }
345         } else {
346           Ok(())
347         }
348       } else {
349         Ok(())
350       }
351     } else {
352       Ok(())
353     }
354   }
355 }
356
357 /// Make actor from `ChatServer`
358 impl Actor for ChatServer {
359   /// We are going to use simple Context, we just need ability to communicate
360   /// with other actors.
361   type Context = Context<Self>;
362 }
363
364 /// Handler for Connect message.
365 ///
366 /// Register new session and assign unique id to this session
367 impl Handler<Connect> for ChatServer {
368   type Result = usize;
369
370   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
371     // register session with random id
372     let id = self.rng.gen::<usize>();
373     println!("{} joined", &msg.ip);
374
375     self.sessions.insert(
376       id,
377       SessionInfo {
378         addr: msg.addr,
379         ip: msg.ip.to_owned(),
380       },
381     );
382
383     for rate_limit_type in RateLimitType::iter() {
384       if self.rate_limit_buckets.get(&rate_limit_type).is_none() {
385         self
386           .rate_limit_buckets
387           .insert(rate_limit_type, HashMap::new());
388       }
389
390       if let Some(bucket) = self.rate_limit_buckets.get_mut(&rate_limit_type) {
391         if bucket.get(&msg.ip).is_none() {
392           bucket.insert(
393             msg.ip.to_owned(),
394             RateLimitBucket {
395               last_checked: SystemTime::now(),
396               allowance: -2f64,
397             },
398           );
399         }
400       }
401     }
402
403     id
404   }
405 }
406
407 /// Handler for Disconnect message.
408 impl Handler<Disconnect> for ChatServer {
409   type Result = ();
410
411   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
412     // Remove connections from sessions and all 3 scopes
413     if self.sessions.remove(&msg.id).is_some() {
414       for sessions in self.user_rooms.values_mut() {
415         sessions.remove(&msg.id);
416       }
417
418       for sessions in self.post_rooms.values_mut() {
419         sessions.remove(&msg.id);
420       }
421
422       for sessions in self.community_rooms.values_mut() {
423         sessions.remove(&msg.id);
424       }
425     }
426   }
427 }
428
429 /// Handler for Message message.
430 impl Handler<StandardMessage> for ChatServer {
431   type Result = MessageResult<StandardMessage>;
432
433   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
434     let msg_out = match parse_json_message(self, msg) {
435       Ok(m) => m,
436       Err(e) => e.to_string(),
437     };
438
439     println!("Message Sent: {}", msg_out);
440     MessageResult(msg_out)
441   }
442 }
443
444 #[derive(Serialize)]
445 struct WebsocketResponse<T> {
446   op: String,
447   data: T,
448 }
449
450 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
451 where
452   T: Serialize,
453 {
454   let response = WebsocketResponse {
455     op: op.to_string(),
456     data,
457   };
458   Ok(serde_json::to_string(&response)?)
459 }
460
461 fn do_user_operation<'a, Data, Response>(
462   op: UserOperation,
463   data: &str,
464   conn: &PooledConnection<ConnectionManager<PgConnection>>,
465 ) -> Result<String, Error>
466 where
467   for<'de> Data: Deserialize<'de> + 'a,
468   Response: Serialize,
469   Oper<Data>: Perform<Response>,
470 {
471   let parsed_data: Data = serde_json::from_str(data)?;
472   let res = Oper::new(parsed_data).perform(&conn)?;
473   to_json_string(&op, &res)
474 }
475
476 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
477   let json: Value = serde_json::from_str(&msg.msg)?;
478   let data = &json["data"].to_string();
479   let op = &json["op"].as_str().ok_or(APIError {
480     message: "Unknown op type".to_string(),
481   })?;
482
483   let conn = chat.db.get()?;
484
485   let user_operation: UserOperation = UserOperation::from_str(&op)?;
486
487   // TODO: none of the chat messages are going to work if stuff is submitted via http api,
488   //       need to move that handling elsewhere
489
490   // A DDOS check
491   chat.check_rate_limit_message(msg.id, false)?;
492
493   match user_operation {
494     UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
495     UserOperation::Register => {
496       chat.check_rate_limit_register(msg.id, true)?;
497       let register: Register = serde_json::from_str(data)?;
498       let res = Oper::new(register).perform(&conn)?;
499       chat.check_rate_limit_register(msg.id, false)?;
500       to_json_string(&user_operation, &res)
501     }
502     UserOperation::GetUserDetails => {
503       do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
504     }
505     UserOperation::SaveUserSettings => {
506       do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
507     }
508     UserOperation::AddAdmin => {
509       let add_admin: AddAdmin = serde_json::from_str(data)?;
510       let res = Oper::new(add_admin).perform(&conn)?;
511       let res_str = to_json_string(&user_operation, &res)?;
512       chat.send_all_message(&res_str, msg.id);
513       Ok(res_str)
514     }
515     UserOperation::BanUser => {
516       let ban_user: BanUser = serde_json::from_str(data)?;
517       let res = Oper::new(ban_user).perform(&conn)?;
518       let res_str = to_json_string(&user_operation, &res)?;
519       chat.send_all_message(&res_str, msg.id);
520       Ok(res_str)
521     }
522     UserOperation::GetReplies => {
523       do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
524     }
525     UserOperation::GetUserMentions => {
526       do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
527     }
528     UserOperation::EditUserMention => {
529       do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
530     }
531     UserOperation::MarkAllAsRead => {
532       do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
533     }
534     UserOperation::GetCommunity => {
535       let get_community: GetCommunity = serde_json::from_str(data)?;
536       let mut res = Oper::new(get_community).perform(&conn)?;
537       let community_id = res.community.id;
538
539       chat.join_community_room(community_id, msg.id);
540
541       res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
542         community_users.len()
543       } else {
544         0
545       };
546
547       to_json_string(&user_operation, &res)
548     }
549     UserOperation::ListCommunities => {
550       do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
551     }
552     UserOperation::CreateCommunity => {
553       chat.check_rate_limit_register(msg.id, true)?;
554       let create_community: CreateCommunity = serde_json::from_str(data)?;
555       let res = Oper::new(create_community).perform(&conn)?;
556       chat.check_rate_limit_register(msg.id, false)?;
557       to_json_string(&user_operation, &res)
558     }
559     UserOperation::EditCommunity => {
560       let edit_community: EditCommunity = serde_json::from_str(data)?;
561       let res = Oper::new(edit_community).perform(&conn)?;
562       let mut community_sent: CommunityResponse = res.clone();
563       community_sent.community.user_id = None;
564       community_sent.community.subscribed = None;
565       let community_sent_str = to_json_string(&user_operation, &community_sent)?;
566       chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
567       to_json_string(&user_operation, &res)
568     }
569     UserOperation::FollowCommunity => {
570       do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
571     }
572     UserOperation::GetFollowedCommunities => do_user_operation::<
573       GetFollowedCommunities,
574       GetFollowedCommunitiesResponse,
575     >(user_operation, data, &conn),
576     UserOperation::BanFromCommunity => {
577       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
578       let community_id = ban_from_community.community_id;
579       let res = Oper::new(ban_from_community).perform(&conn)?;
580       let res_str = to_json_string(&user_operation, &res)?;
581       chat.send_community_room_message(community_id, &res_str, msg.id);
582       Ok(res_str)
583     }
584     UserOperation::AddModToCommunity => {
585       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
586       let community_id = mod_add_to_community.community_id;
587       let res = Oper::new(mod_add_to_community).perform(&conn)?;
588       let res_str = to_json_string(&user_operation, &res)?;
589       chat.send_community_room_message(community_id, &res_str, msg.id);
590       Ok(res_str)
591     }
592     UserOperation::ListCategories => {
593       do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
594     }
595     UserOperation::GetPost => {
596       let get_post: GetPost = serde_json::from_str(data)?;
597       let post_id = get_post.id;
598       chat.join_post_room(post_id, msg.id);
599       let mut res = Oper::new(get_post).perform(&conn)?;
600
601       res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
602         post_users.len()
603       } else {
604         0
605       };
606
607       to_json_string(&user_operation, &res)
608     }
609     UserOperation::GetPosts => {
610       let get_posts: GetPosts = serde_json::from_str(data)?;
611       if get_posts.community_id.is_none() {
612         // 0 is the "all" community
613         chat.join_community_room(0, msg.id);
614       }
615       let res = Oper::new(get_posts).perform(&conn)?;
616       to_json_string(&user_operation, &res)
617     }
618     UserOperation::CreatePost => {
619       chat.check_rate_limit_post(msg.id, true)?;
620       let create_post: CreatePost = serde_json::from_str(data)?;
621       let res = Oper::new(create_post).perform(&conn)?;
622       chat.check_rate_limit_post(msg.id, false)?;
623
624       chat.post_sends(UserOperation::CreatePost, res, msg.id)
625     }
626     UserOperation::CreatePostLike => {
627       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
628       let res = Oper::new(create_post_like).perform(&conn)?;
629
630       chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
631     }
632     UserOperation::EditPost => {
633       let edit_post: EditPost = serde_json::from_str(data)?;
634       let res = Oper::new(edit_post).perform(&conn)?;
635
636       chat.post_sends(UserOperation::EditPost, res, msg.id)
637     }
638     UserOperation::SavePost => {
639       do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
640     }
641     UserOperation::CreateComment => {
642       let create_comment: CreateComment = serde_json::from_str(data)?;
643       let res = Oper::new(create_comment).perform(&conn)?;
644
645       chat.comment_sends(UserOperation::CreateComment, res, msg.id)
646     }
647     UserOperation::EditComment => {
648       let edit_comment: EditComment = serde_json::from_str(data)?;
649       let res = Oper::new(edit_comment).perform(&conn)?;
650
651       chat.comment_sends(UserOperation::EditComment, res, msg.id)
652     }
653     UserOperation::SaveComment => {
654       do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
655     }
656     UserOperation::CreateCommentLike => {
657       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
658       let res = Oper::new(create_comment_like).perform(&conn)?;
659
660       chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
661     }
662     UserOperation::GetModlog => {
663       do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
664     }
665     UserOperation::CreateSite => {
666       do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
667     }
668     UserOperation::EditSite => {
669       let edit_site: EditSite = serde_json::from_str(data)?;
670       let res = Oper::new(edit_site).perform(&conn)?;
671       let res_str = to_json_string(&user_operation, &res)?;
672       chat.send_all_message(&res_str, msg.id);
673       Ok(res_str)
674     }
675     UserOperation::GetSite => {
676       let get_site: GetSite = serde_json::from_str(data)?;
677       let mut res = Oper::new(get_site).perform(&conn)?;
678       res.online = chat.sessions.len();
679       to_json_string(&user_operation, &res)
680     }
681     UserOperation::Search => {
682       do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
683     }
684     UserOperation::TransferCommunity => {
685       do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
686     }
687     UserOperation::TransferSite => {
688       do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
689     }
690     UserOperation::DeleteAccount => {
691       do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
692     }
693     UserOperation::PasswordReset => {
694       do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
695     }
696     UserOperation::PasswordChange => {
697       do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
698     }
699     UserOperation::CreatePrivateMessage => {
700       let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
701       let recipient_id = create_private_message.recipient_id;
702       let res = Oper::new(create_private_message).perform(&conn)?;
703       let res_str = to_json_string(&user_operation, &res)?;
704
705       chat.send_user_room_message(recipient_id, &res_str, msg.id);
706       Ok(res_str)
707     }
708     UserOperation::EditPrivateMessage => {
709       do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
710     }
711     UserOperation::GetPrivateMessages => {
712       do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
713     }
714     UserOperation::UserJoin => {
715       let user_join: UserJoin = serde_json::from_str(data)?;
716       let res = Oper::new(user_join).perform(&conn)?;
717       chat.join_user_room(res.user_id, msg.id);
718       to_json_string(&user_operation, &res)
719     }
720   }
721 }