]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Starting to work on user message scope.
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15
16 use crate::api::comment::*;
17 use crate::api::community::*;
18 use crate::api::post::*;
19 use crate::api::site::*;
20 use crate::api::user::*;
21 use crate::api::*;
22 use crate::websocket::UserOperation;
23 use crate::Settings;
24
25 type ConnectionId = usize;
26 type PostId = i32;
27 type CommunityId = i32;
28 type UserId = i32;
29 type IPAddr = String;
30
31 /// Chat server sends this messages to session
32 #[derive(Message)]
33 #[rtype(result = "()")]
34 pub struct WSMessage(pub String);
35
36 /// Message for chat server communications
37
38 /// New chat session is created
39 #[derive(Message)]
40 #[rtype(usize)]
41 pub struct Connect {
42   pub addr: Recipient<WSMessage>,
43   pub ip: IPAddr,
44 }
45
46 /// Session is disconnected
47 #[derive(Message)]
48 #[rtype(result = "()")]
49 pub struct Disconnect {
50   pub id: ConnectionId,
51   pub ip: IPAddr,
52 }
53
54 #[derive(Serialize, Deserialize, Message)]
55 #[rtype(String)]
56 pub struct StandardMessage {
57   /// Id of the client session
58   pub id: ConnectionId,
59   /// Peer message
60   pub msg: String,
61 }
62
63 #[derive(Debug)]
64 pub struct RateLimitBucket {
65   last_checked: SystemTime,
66   allowance: f64,
67 }
68
69 pub struct SessionInfo {
70   pub addr: Recipient<WSMessage>,
71   pub ip: IPAddr,
72 }
73
74 /// `ChatServer` manages chat rooms and responsible for coordinating chat
75 /// session.
76 pub struct ChatServer {
77   /// A map from generated random ID to session addr
78   sessions: HashMap<ConnectionId, SessionInfo>,
79
80   /// A map from post_id to set of connectionIDs
81   post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
82
83   /// A map from community to set of connectionIDs
84   community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
85
86   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
87   /// sessions (IE clients)
88   user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
89
90   /// Rate limiting based on IP addr
91   rate_limits: HashMap<IPAddr, RateLimitBucket>,
92
93   rng: ThreadRng,
94   db: Pool<ConnectionManager<PgConnection>>,
95 }
96
97 // TODO show online users for communities too
98 // TODO GetPosts is the community / front page join.
99 // What is sent: New posts, post edits, post removes, post likes, community edits, community mod adds. Notifs for new posts?
100 // GetPost is the PostJoin, LeavePost is the leave
101 // What is sent: New comments, comment edits, comment likes
102 // UserJoin is the user join, a disconnect should remove you from all the scopes
103 impl ChatServer {
104   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
105     ChatServer {
106       sessions: HashMap::new(),
107       rate_limits: HashMap::new(),
108       post_rooms: HashMap::new(),
109       community_rooms: HashMap::new(),
110       user_rooms: HashMap::new(),
111       rng: rand::thread_rng(),
112       db,
113     }
114   }
115
116   fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
117     // remove session from all rooms
118     for sessions in self.community_rooms.values_mut() {
119       sessions.remove(&id);
120     }
121
122     // If the room doesn't exist yet
123     if self.community_rooms.get_mut(&community_id).is_none() {
124       self.community_rooms.insert(community_id, HashSet::new());
125     }
126
127     self
128       .community_rooms
129       .get_mut(&community_id)
130       .unwrap()
131       .insert(id);
132   }
133
134   fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
135     // remove session from all rooms
136     for sessions in self.post_rooms.values_mut() {
137       sessions.remove(&id);
138     }
139
140     // If the room doesn't exist yet
141     if self.post_rooms.get_mut(&post_id).is_none() {
142       self.post_rooms.insert(post_id, HashSet::new());
143     }
144
145     self.post_rooms.get_mut(&post_id).unwrap().insert(id);
146   }
147
148   fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
149     // remove session from all rooms
150     for sessions in self.user_rooms.values_mut() {
151       sessions.remove(&id);
152     }
153
154     // If the room doesn't exist yet
155     if self.user_rooms.get_mut(&user_id).is_none() {
156       self.user_rooms.insert(user_id, HashSet::new());
157     }
158
159     self.user_rooms.get_mut(&user_id).unwrap().insert(id);
160   }
161
162   fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
163     if let Some(sessions) = self.post_rooms.get(&post_id) {
164       for id in sessions {
165         if *id != skip_id {
166           if let Some(info) = self.sessions.get(id) {
167             let _ = info.addr.do_send(WSMessage(message.to_owned()));
168           }
169         }
170       }
171     }
172   }
173
174   fn send_community_room_message(
175     &self,
176     community_id: CommunityId,
177     message: &str,
178     skip_id: ConnectionId,
179   ) {
180     if let Some(sessions) = self.community_rooms.get(&community_id) {
181       for id in sessions {
182         if *id != skip_id {
183           if let Some(info) = self.sessions.get(id) {
184             let _ = info.addr.do_send(WSMessage(message.to_owned()));
185           }
186         }
187       }
188     }
189   }
190
191   fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
192     if let Some(sessions) = self.user_rooms.get(&user_id) {
193       for id in sessions {
194         if *id != skip_id {
195           if let Some(info) = self.sessions.get(id) {
196             let _ = info.addr.do_send(WSMessage(message.to_owned()));
197           }
198         }
199       }
200     }
201   }
202
203   fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
204     for id in self.sessions.keys() {
205       if *id != skip_id {
206         if let Some(info) = self.sessions.get(id) {
207           let _ = info.addr.do_send(WSMessage(message.to_owned()));
208         }
209       }
210     }
211   }
212
213   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
214     self.check_rate_limit_full(
215       id,
216       Settings::get().rate_limit.register,
217       Settings::get().rate_limit.register_per_second,
218     )
219   }
220
221   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
222     self.check_rate_limit_full(
223       id,
224       Settings::get().rate_limit.post,
225       Settings::get().rate_limit.post_per_second,
226     )
227   }
228
229   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
230     self.check_rate_limit_full(
231       id,
232       Settings::get().rate_limit.message,
233       Settings::get().rate_limit.message_per_second,
234     )
235   }
236
237   #[allow(clippy::float_cmp)]
238   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
239     if let Some(info) = self.sessions.get(&id) {
240       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
241         // The initial value
242         if rate_limit.allowance == -2f64 {
243           rate_limit.allowance = rate as f64;
244         };
245
246         let current = SystemTime::now();
247         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
248         rate_limit.last_checked = current;
249         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
250         if rate_limit.allowance > rate as f64 {
251           rate_limit.allowance = rate as f64;
252         }
253
254         if rate_limit.allowance < 1.0 {
255           println!(
256             "Rate limited IP: {}, time_passed: {}, allowance: {}",
257             &info.ip, time_passed, rate_limit.allowance
258           );
259           Err(
260             APIError {
261               message: format!("Too many requests. {} per {} seconds", rate, per),
262             }
263             .into(),
264           )
265         } else {
266           rate_limit.allowance -= 1.0;
267           Ok(())
268         }
269       } else {
270         Ok(())
271       }
272     } else {
273       Ok(())
274     }
275   }
276 }
277
278 /// Make actor from `ChatServer`
279 impl Actor for ChatServer {
280   /// We are going to use simple Context, we just need ability to communicate
281   /// with other actors.
282   type Context = Context<Self>;
283 }
284
285 /// Handler for Connect message.
286 ///
287 /// Register new session and assign unique id to this session
288 impl Handler<Connect> for ChatServer {
289   type Result = usize;
290
291   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
292     // register session with random id
293     let id = self.rng.gen::<usize>();
294     println!("{} joined", &msg.ip);
295
296     self.sessions.insert(
297       id,
298       SessionInfo {
299         addr: msg.addr,
300         ip: msg.ip.to_owned(),
301       },
302     );
303
304     if self.rate_limits.get(&msg.ip).is_none() {
305       self.rate_limits.insert(
306         msg.ip,
307         RateLimitBucket {
308           last_checked: SystemTime::now(),
309           allowance: -2f64,
310         },
311       );
312     }
313
314     id
315   }
316 }
317
318 /// Handler for Disconnect message.
319 impl Handler<Disconnect> for ChatServer {
320   type Result = ();
321
322   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
323     // Remove connections from sessions and all 3 scopes
324     if self.sessions.remove(&msg.id).is_some() {
325       for sessions in self.user_rooms.values_mut() {
326         sessions.remove(&msg.id);
327       }
328
329       for sessions in self.post_rooms.values_mut() {
330         sessions.remove(&msg.id);
331       }
332
333       for sessions in self.community_rooms.values_mut() {
334         sessions.remove(&msg.id);
335       }
336     }
337   }
338 }
339
340 /// Handler for Message message.
341 impl Handler<StandardMessage> for ChatServer {
342   type Result = MessageResult<StandardMessage>;
343
344   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
345     let msg_out = match parse_json_message(self, msg) {
346       Ok(m) => m,
347       Err(e) => e.to_string(),
348     };
349
350     println!("Message Sent: {}", msg_out);
351     MessageResult(msg_out)
352   }
353 }
354
355 #[derive(Serialize)]
356 struct WebsocketResponse<T> {
357   op: String,
358   data: T,
359 }
360
361 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
362 where
363   T: Serialize,
364 {
365   let response = WebsocketResponse {
366     op: op.to_string(),
367     data,
368   };
369   Ok(serde_json::to_string(&response)?)
370 }
371
372 fn do_user_operation<'a, Data, Response>(
373   op: UserOperation,
374   data: &str,
375   conn: &PooledConnection<ConnectionManager<PgConnection>>,
376 ) -> Result<String, Error>
377 where
378   for<'de> Data: Deserialize<'de> + 'a,
379   Response: Serialize,
380   Oper<Data>: Perform<Response>,
381 {
382   let parsed_data: Data = serde_json::from_str(data)?;
383   let res = Oper::new(parsed_data).perform(&conn)?;
384   to_json_string(&op, &res)
385 }
386
387 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
388   let json: Value = serde_json::from_str(&msg.msg)?;
389   let data = &json["data"].to_string();
390   let op = &json["op"].as_str().ok_or(APIError {
391     message: "Unknown op type".to_string(),
392   })?;
393
394   let conn = chat.db.get()?;
395
396   let user_operation: UserOperation = UserOperation::from_str(&op)?;
397
398   // TODO: none of the chat messages are going to work if stuff is submitted via http api,
399   //       need to move that handling elsewhere
400   match user_operation {
401     UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
402     UserOperation::Register => {
403       chat.check_rate_limit_register(msg.id)?;
404       do_user_operation::<Register, LoginResponse>(user_operation, data, &conn)
405     }
406     UserOperation::GetUserDetails => {
407       do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
408     }
409     UserOperation::SaveUserSettings => {
410       do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
411     }
412     UserOperation::AddAdmin => {
413       let add_admin: AddAdmin = serde_json::from_str(data)?;
414       let res = Oper::new(add_admin).perform(&conn)?;
415       let res_str = to_json_string(&user_operation, &res)?;
416       chat.send_all_message(&res_str, msg.id);
417       Ok(res_str)
418     }
419     UserOperation::BanUser => {
420       let ban_user: BanUser = serde_json::from_str(data)?;
421       let res = Oper::new(ban_user).perform(&conn)?;
422       let res_str = to_json_string(&user_operation, &res)?;
423       chat.send_all_message(&res_str, msg.id);
424       Ok(res_str)
425     }
426     UserOperation::GetReplies => {
427       do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
428     }
429     UserOperation::GetUserMentions => {
430       do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
431     }
432     UserOperation::EditUserMention => {
433       do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
434     }
435     UserOperation::MarkAllAsRead => {
436       do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
437     }
438     UserOperation::GetCommunity => {
439       let get_community: GetCommunity = serde_json::from_str(data)?;
440       let mut res = Oper::new(get_community).perform(&conn)?;
441       let community_id = res.community.id;
442
443       chat.join_community_room(community_id, msg.id);
444
445       res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
446         community_users.len()
447       } else {
448         0
449       };
450
451       to_json_string(&user_operation, &res)
452     }
453     UserOperation::ListCommunities => {
454       do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
455     }
456     UserOperation::CreateCommunity => {
457       chat.check_rate_limit_register(msg.id)?;
458       do_user_operation::<CreateCommunity, CommunityResponse>(user_operation, data, &conn)
459     }
460     UserOperation::EditCommunity => {
461       let edit_community: EditCommunity = serde_json::from_str(data)?;
462       let res = Oper::new(edit_community).perform(&conn)?;
463       let mut community_sent: CommunityResponse = res.clone();
464       community_sent.community.user_id = None;
465       community_sent.community.subscribed = None;
466       let community_sent_str = to_json_string(&user_operation, &community_sent)?;
467       chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
468       to_json_string(&user_operation, &res)
469     }
470     UserOperation::FollowCommunity => {
471       do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
472     }
473     UserOperation::GetFollowedCommunities => do_user_operation::<
474       GetFollowedCommunities,
475       GetFollowedCommunitiesResponse,
476     >(user_operation, data, &conn),
477     UserOperation::BanFromCommunity => {
478       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
479       let community_id = ban_from_community.community_id;
480       let res = Oper::new(ban_from_community).perform(&conn)?;
481       let res_str = to_json_string(&user_operation, &res)?;
482       chat.send_community_room_message(community_id, &res_str, msg.id);
483       Ok(res_str)
484     }
485     UserOperation::AddModToCommunity => {
486       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
487       let community_id = mod_add_to_community.community_id;
488       let res = Oper::new(mod_add_to_community).perform(&conn)?;
489       let res_str = to_json_string(&user_operation, &res)?;
490       chat.send_community_room_message(community_id, &res_str, msg.id);
491       Ok(res_str)
492     }
493     UserOperation::ListCategories => {
494       do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
495     }
496     UserOperation::CreatePost => {
497       chat.check_rate_limit_post(msg.id)?;
498       let create_post: CreatePost = serde_json::from_str(data)?;
499       let community_id = create_post.community_id;
500       let res = Oper::new(create_post).perform(&conn)?;
501       let res_str = to_json_string(&user_operation, &res)?;
502
503       // Don't send my data with it
504       let mut post_sent = res.clone();
505       post_sent.post.my_vote = None;
506       post_sent.post.user_id = None;
507       let post_sent_str = to_json_string(&user_operation, &post_sent)?;
508
509       // Send it to /c/all and that community
510       chat.send_community_room_message(0, &post_sent_str, msg.id);
511       chat.send_community_room_message(community_id, &post_sent_str, msg.id);
512
513       Ok(res_str)
514     }
515     UserOperation::GetPost => {
516       let get_post: GetPost = serde_json::from_str(data)?;
517       let post_id = get_post.id;
518       chat.join_post_room(post_id, msg.id);
519       let mut res = Oper::new(get_post).perform(&conn)?;
520
521       res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
522         post_users.len()
523       } else {
524         0
525       };
526
527       to_json_string(&user_operation, &res)
528     }
529     UserOperation::GetPosts => {
530       let get_posts: GetPosts = serde_json::from_str(data)?;
531       if get_posts.community_id.is_none() {
532         // 0 is the "all" community
533         chat.join_community_room(0, msg.id);
534       }
535       let res = Oper::new(get_posts).perform(&conn)?;
536       to_json_string(&user_operation, &res)
537     }
538     UserOperation::UserJoin => {
539       let user_join: UserJoin = serde_json::from_str(data)?;
540       let res = Oper::new(user_join).perform(&conn)?;
541       chat.join_user_room(res.user_id, msg.id);
542       to_json_string(&user_operation, &res)
543     }
544     UserOperation::CreatePostLike => {
545       chat.check_rate_limit_message(msg.id)?;
546       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
547       let res = Oper::new(create_post_like).perform(&conn)?;
548       let community_id = res.post.community_id;
549       let res_str = to_json_string(&user_operation, &res)?;
550
551       // Don't send my data with it
552       let mut post_sent = res.clone();
553       post_sent.post.my_vote = None;
554       post_sent.post.user_id = None;
555       let post_sent_str = to_json_string(&user_operation, &post_sent)?;
556
557       // Send it to /c/all and that community
558       chat.send_community_room_message(0, &post_sent_str, msg.id);
559       chat.send_community_room_message(community_id, &post_sent_str, msg.id);
560
561       Ok(res_str)
562     }
563     UserOperation::EditPost => {
564       let edit_post: EditPost = serde_json::from_str(data)?;
565       let res = Oper::new(edit_post).perform(&conn)?;
566       let mut post_sent = res.clone();
567       post_sent.post.my_vote = None;
568       post_sent.post.user_id = None;
569       let post_sent_str = to_json_string(&user_operation, &post_sent)?;
570
571       // Send it to /c/all and that community
572       chat.send_community_room_message(0, &post_sent_str, msg.id);
573       chat.send_community_room_message(post_sent.post.community_id, &post_sent_str, msg.id);
574       to_json_string(&user_operation, &res)
575     }
576     UserOperation::SavePost => {
577       do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
578     }
579     UserOperation::CreateComment => {
580       chat.check_rate_limit_message(msg.id)?;
581       let create_comment: CreateComment = serde_json::from_str(data)?;
582       let post_id = create_comment.post_id;
583       let res = Oper::new(create_comment).perform(&conn)?;
584       let mut comment_sent = res.clone();
585       comment_sent.comment.my_vote = None;
586       comment_sent.comment.user_id = None;
587       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
588
589       // Send it to the post room
590       chat.send_post_room_message(post_id, &comment_sent_str, msg.id);
591
592       // Send it to the recipient(s) including the mentioned users
593       for recipient_id in comment_sent.recipient_ids {
594         chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id);
595       }
596
597       to_json_string(&user_operation, &res)
598     }
599     UserOperation::EditComment => {
600       let edit_comment: EditComment = serde_json::from_str(data)?;
601       let post_id = edit_comment.post_id;
602       let res = Oper::new(edit_comment).perform(&conn)?;
603       let mut comment_sent = res.clone();
604       comment_sent.comment.my_vote = None;
605       comment_sent.comment.user_id = None;
606       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
607
608       chat.send_post_room_message(post_id, &comment_sent_str, msg.id);
609
610       // Send it to the recipient(s) including the mentioned users
611       for recipient_id in comment_sent.recipient_ids {
612         chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id);
613       }
614
615       to_json_string(&user_operation, &res)
616     }
617     UserOperation::SaveComment => {
618       do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
619     }
620     UserOperation::CreateCommentLike => {
621       chat.check_rate_limit_message(msg.id)?;
622       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
623       let post_id = create_comment_like.post_id;
624       let res = Oper::new(create_comment_like).perform(&conn)?;
625       let mut comment_sent = res.clone();
626       comment_sent.comment.my_vote = None;
627       comment_sent.comment.user_id = None;
628       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
629
630       chat.send_post_room_message(post_id, &comment_sent_str, msg.id);
631
632       // Send it to the recipient(s) including the mentioned users
633       for recipient_id in comment_sent.recipient_ids {
634         chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id);
635       }
636       to_json_string(&user_operation, &res)
637     }
638     UserOperation::GetModlog => {
639       do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
640     }
641     UserOperation::CreateSite => {
642       do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
643     }
644     UserOperation::EditSite => {
645       let edit_site: EditSite = serde_json::from_str(data)?;
646       let res = Oper::new(edit_site).perform(&conn)?;
647       let res_str = to_json_string(&user_operation, &res)?;
648       chat.send_all_message(&res_str, msg.id);
649       Ok(res_str)
650     }
651     UserOperation::GetSite => {
652       let get_site: GetSite = serde_json::from_str(data)?;
653       let mut res = Oper::new(get_site).perform(&conn)?;
654       res.online = chat.sessions.len();
655       to_json_string(&user_operation, &res)
656     }
657     UserOperation::Search => {
658       do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
659     }
660     UserOperation::TransferCommunity => {
661       do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
662     }
663     UserOperation::TransferSite => {
664       do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
665     }
666     UserOperation::DeleteAccount => {
667       do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
668     }
669     UserOperation::PasswordReset => {
670       do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
671     }
672     UserOperation::PasswordChange => {
673       do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
674     }
675     UserOperation::CreatePrivateMessage => {
676       chat.check_rate_limit_message(msg.id)?;
677       let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
678       let recipient_id = create_private_message.recipient_id;
679       let res = Oper::new(create_private_message).perform(&conn)?;
680       let res_str = to_json_string(&user_operation, &res)?;
681
682       chat.send_user_room_message(recipient_id, &res_str, msg.id);
683       Ok(res_str)
684     }
685     UserOperation::EditPrivateMessage => {
686       do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
687     }
688     UserOperation::GetPrivateMessages => {
689       do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
690     }
691   }
692 }