]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'dev' into websocket_scopes
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15
16 use crate::api::comment::*;
17 use crate::api::community::*;
18 use crate::api::post::*;
19 use crate::api::site::*;
20 use crate::api::user::*;
21 use crate::api::*;
22 use crate::websocket::UserOperation;
23 use crate::Settings;
24
25 type ConnectionId = usize;
26 type PostId = i32;
27 type CommunityId = i32;
28 type UserId = i32;
29 type IPAddr = String;
30
31 /// Chat server sends this messages to session
32 #[derive(Message)]
33 #[rtype(result = "()")]
34 pub struct WSMessage(pub String);
35
36 /// Message for chat server communications
37
38 /// New chat session is created
39 #[derive(Message)]
40 #[rtype(usize)]
41 pub struct Connect {
42   pub addr: Recipient<WSMessage>,
43   pub ip: IPAddr,
44 }
45
46 /// Session is disconnected
47 #[derive(Message)]
48 #[rtype(result = "()")]
49 pub struct Disconnect {
50   pub id: ConnectionId,
51   pub ip: IPAddr,
52 }
53
54 #[derive(Serialize, Deserialize, Message)]
55 #[rtype(String)]
56 pub struct StandardMessage {
57   /// Id of the client session
58   pub id: ConnectionId,
59   /// Peer message
60   pub msg: String,
61 }
62
63 #[derive(Debug)]
64 pub struct RateLimitBucket {
65   last_checked: SystemTime,
66   allowance: f64,
67 }
68
69 pub struct SessionInfo {
70   pub addr: Recipient<WSMessage>,
71   pub ip: IPAddr,
72 }
73
74 /// `ChatServer` manages chat rooms and responsible for coordinating chat
75 /// session.
76 pub struct ChatServer {
77   /// A map from generated random ID to session addr
78   sessions: HashMap<ConnectionId, SessionInfo>,
79
80   /// A map from post_id to set of connectionIDs
81   post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
82
83   /// A map from community to set of connectionIDs
84   community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
85
86   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
87   /// sessions (IE clients)
88   user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
89
90   /// Rate limiting based on IP addr
91   rate_limits: HashMap<IPAddr, RateLimitBucket>,
92
93   rng: ThreadRng,
94   db: Pool<ConnectionManager<PgConnection>>,
95 }
96
97 impl ChatServer {
98   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
99     ChatServer {
100       sessions: HashMap::new(),
101       rate_limits: HashMap::new(),
102       post_rooms: HashMap::new(),
103       community_rooms: HashMap::new(),
104       user_rooms: HashMap::new(),
105       rng: rand::thread_rng(),
106       db,
107     }
108   }
109
110   fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
111     // remove session from all rooms
112     for sessions in self.community_rooms.values_mut() {
113       sessions.remove(&id);
114     }
115
116     // If the room doesn't exist yet
117     if self.community_rooms.get_mut(&community_id).is_none() {
118       self.community_rooms.insert(community_id, HashSet::new());
119     }
120
121     self
122       .community_rooms
123       .get_mut(&community_id)
124       .unwrap()
125       .insert(id);
126   }
127
128   fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
129     // remove session from all rooms
130     for sessions in self.post_rooms.values_mut() {
131       sessions.remove(&id);
132     }
133
134     // If the room doesn't exist yet
135     if self.post_rooms.get_mut(&post_id).is_none() {
136       self.post_rooms.insert(post_id, HashSet::new());
137     }
138
139     self.post_rooms.get_mut(&post_id).unwrap().insert(id);
140   }
141
142   fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
143     // remove session from all rooms
144     for sessions in self.user_rooms.values_mut() {
145       sessions.remove(&id);
146     }
147
148     // If the room doesn't exist yet
149     if self.user_rooms.get_mut(&user_id).is_none() {
150       self.user_rooms.insert(user_id, HashSet::new());
151     }
152
153     self.user_rooms.get_mut(&user_id).unwrap().insert(id);
154   }
155
156   fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
157     if let Some(sessions) = self.post_rooms.get(&post_id) {
158       for id in sessions {
159         if *id != skip_id {
160           if let Some(info) = self.sessions.get(id) {
161             let _ = info.addr.do_send(WSMessage(message.to_owned()));
162           }
163         }
164       }
165     }
166   }
167
168   fn send_community_room_message(
169     &self,
170     community_id: CommunityId,
171     message: &str,
172     skip_id: ConnectionId,
173   ) {
174     if let Some(sessions) = self.community_rooms.get(&community_id) {
175       for id in sessions {
176         if *id != skip_id {
177           if let Some(info) = self.sessions.get(id) {
178             let _ = info.addr.do_send(WSMessage(message.to_owned()));
179           }
180         }
181       }
182     }
183   }
184
185   fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
186     if let Some(sessions) = self.user_rooms.get(&user_id) {
187       for id in sessions {
188         if *id != skip_id {
189           if let Some(info) = self.sessions.get(id) {
190             let _ = info.addr.do_send(WSMessage(message.to_owned()));
191           }
192         }
193       }
194     }
195   }
196
197   fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
198     for id in self.sessions.keys() {
199       if *id != skip_id {
200         if let Some(info) = self.sessions.get(id) {
201           let _ = info.addr.do_send(WSMessage(message.to_owned()));
202         }
203       }
204     }
205   }
206
207   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
208     self.check_rate_limit_full(
209       id,
210       Settings::get().rate_limit.register,
211       Settings::get().rate_limit.register_per_second,
212     )
213   }
214
215   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
216     self.check_rate_limit_full(
217       id,
218       Settings::get().rate_limit.post,
219       Settings::get().rate_limit.post_per_second,
220     )
221   }
222
223   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
224     self.check_rate_limit_full(
225       id,
226       Settings::get().rate_limit.message,
227       Settings::get().rate_limit.message_per_second,
228     )
229   }
230
231   #[allow(clippy::float_cmp)]
232   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
233     if let Some(info) = self.sessions.get(&id) {
234       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
235         // The initial value
236         if rate_limit.allowance == -2f64 {
237           rate_limit.allowance = rate as f64;
238         };
239
240         let current = SystemTime::now();
241         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
242         rate_limit.last_checked = current;
243         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
244         if rate_limit.allowance > rate as f64 {
245           rate_limit.allowance = rate as f64;
246         }
247
248         if rate_limit.allowance < 1.0 {
249           println!(
250             "Rate limited IP: {}, time_passed: {}, allowance: {}",
251             &info.ip, time_passed, rate_limit.allowance
252           );
253           Err(
254             APIError {
255               message: format!("Too many requests. {} per {} seconds", rate, per),
256             }
257             .into(),
258           )
259         } else {
260           rate_limit.allowance -= 1.0;
261           Ok(())
262         }
263       } else {
264         Ok(())
265       }
266     } else {
267       Ok(())
268     }
269   }
270 }
271
272 /// Make actor from `ChatServer`
273 impl Actor for ChatServer {
274   /// We are going to use simple Context, we just need ability to communicate
275   /// with other actors.
276   type Context = Context<Self>;
277 }
278
279 /// Handler for Connect message.
280 ///
281 /// Register new session and assign unique id to this session
282 impl Handler<Connect> for ChatServer {
283   type Result = usize;
284
285   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
286     // register session with random id
287     let id = self.rng.gen::<usize>();
288     println!("{} joined", &msg.ip);
289
290     self.sessions.insert(
291       id,
292       SessionInfo {
293         addr: msg.addr,
294         ip: msg.ip.to_owned(),
295       },
296     );
297
298     if self.rate_limits.get(&msg.ip).is_none() {
299       self.rate_limits.insert(
300         msg.ip,
301         RateLimitBucket {
302           last_checked: SystemTime::now(),
303           allowance: -2f64,
304         },
305       );
306     }
307
308     id
309   }
310 }
311
312 /// Handler for Disconnect message.
313 impl Handler<Disconnect> for ChatServer {
314   type Result = ();
315
316   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
317     // Remove connections from sessions and all 3 scopes
318     if self.sessions.remove(&msg.id).is_some() {
319       for sessions in self.user_rooms.values_mut() {
320         sessions.remove(&msg.id);
321       }
322
323       for sessions in self.post_rooms.values_mut() {
324         sessions.remove(&msg.id);
325       }
326
327       for sessions in self.community_rooms.values_mut() {
328         sessions.remove(&msg.id);
329       }
330     }
331   }
332 }
333
334 /// Handler for Message message.
335 impl Handler<StandardMessage> for ChatServer {
336   type Result = MessageResult<StandardMessage>;
337
338   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
339     let msg_out = match parse_json_message(self, msg) {
340       Ok(m) => m,
341       Err(e) => e.to_string(),
342     };
343
344     println!("Message Sent: {}", msg_out);
345     MessageResult(msg_out)
346   }
347 }
348
349 #[derive(Serialize)]
350 struct WebsocketResponse<T> {
351   op: String,
352   data: T,
353 }
354
355 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
356 where
357   T: Serialize,
358 {
359   let response = WebsocketResponse {
360     op: op.to_string(),
361     data,
362   };
363   Ok(serde_json::to_string(&response)?)
364 }
365
366 fn do_user_operation<'a, Data, Response>(
367   op: UserOperation,
368   data: &str,
369   conn: &PooledConnection<ConnectionManager<PgConnection>>,
370 ) -> Result<String, Error>
371 where
372   for<'de> Data: Deserialize<'de> + 'a,
373   Response: Serialize,
374   Oper<Data>: Perform<Response>,
375 {
376   let parsed_data: Data = serde_json::from_str(data)?;
377   let res = Oper::new(parsed_data).perform(&conn)?;
378   to_json_string(&op, &res)
379 }
380
381 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
382   let json: Value = serde_json::from_str(&msg.msg)?;
383   let data = &json["data"].to_string();
384   let op = &json["op"].as_str().ok_or(APIError {
385     message: "Unknown op type".to_string(),
386   })?;
387
388   let conn = chat.db.get()?;
389
390   let user_operation: UserOperation = UserOperation::from_str(&op)?;
391
392   // TODO: none of the chat messages are going to work if stuff is submitted via http api,
393   //       need to move that handling elsewhere
394   match user_operation {
395     UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
396     UserOperation::Register => {
397       chat.check_rate_limit_register(msg.id)?;
398       do_user_operation::<Register, LoginResponse>(user_operation, data, &conn)
399     }
400     UserOperation::GetUserDetails => {
401       do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
402     }
403     UserOperation::SaveUserSettings => {
404       do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
405     }
406     UserOperation::AddAdmin => {
407       let add_admin: AddAdmin = serde_json::from_str(data)?;
408       let res = Oper::new(add_admin).perform(&conn)?;
409       let res_str = to_json_string(&user_operation, &res)?;
410       chat.send_all_message(&res_str, msg.id);
411       Ok(res_str)
412     }
413     UserOperation::BanUser => {
414       let ban_user: BanUser = serde_json::from_str(data)?;
415       let res = Oper::new(ban_user).perform(&conn)?;
416       let res_str = to_json_string(&user_operation, &res)?;
417       chat.send_all_message(&res_str, msg.id);
418       Ok(res_str)
419     }
420     UserOperation::GetReplies => {
421       do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
422     }
423     UserOperation::GetUserMentions => {
424       do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
425     }
426     UserOperation::EditUserMention => {
427       do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
428     }
429     UserOperation::MarkAllAsRead => {
430       do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
431     }
432     UserOperation::GetCommunity => {
433       let get_community: GetCommunity = serde_json::from_str(data)?;
434       let mut res = Oper::new(get_community).perform(&conn)?;
435       let community_id = res.community.id;
436
437       chat.join_community_room(community_id, msg.id);
438
439       res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
440         community_users.len()
441       } else {
442         0
443       };
444
445       to_json_string(&user_operation, &res)
446     }
447     UserOperation::ListCommunities => {
448       do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
449     }
450     UserOperation::CreateCommunity => {
451       chat.check_rate_limit_register(msg.id)?;
452       do_user_operation::<CreateCommunity, CommunityResponse>(user_operation, data, &conn)
453     }
454     UserOperation::EditCommunity => {
455       let edit_community: EditCommunity = serde_json::from_str(data)?;
456       let res = Oper::new(edit_community).perform(&conn)?;
457       let mut community_sent: CommunityResponse = res.clone();
458       community_sent.community.user_id = None;
459       community_sent.community.subscribed = None;
460       let community_sent_str = to_json_string(&user_operation, &community_sent)?;
461       chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
462       to_json_string(&user_operation, &res)
463     }
464     UserOperation::FollowCommunity => {
465       do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
466     }
467     UserOperation::GetFollowedCommunities => do_user_operation::<
468       GetFollowedCommunities,
469       GetFollowedCommunitiesResponse,
470     >(user_operation, data, &conn),
471     UserOperation::BanFromCommunity => {
472       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
473       let community_id = ban_from_community.community_id;
474       let res = Oper::new(ban_from_community).perform(&conn)?;
475       let res_str = to_json_string(&user_operation, &res)?;
476       chat.send_community_room_message(community_id, &res_str, msg.id);
477       Ok(res_str)
478     }
479     UserOperation::AddModToCommunity => {
480       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
481       let community_id = mod_add_to_community.community_id;
482       let res = Oper::new(mod_add_to_community).perform(&conn)?;
483       let res_str = to_json_string(&user_operation, &res)?;
484       chat.send_community_room_message(community_id, &res_str, msg.id);
485       Ok(res_str)
486     }
487     UserOperation::ListCategories => {
488       do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
489     }
490     UserOperation::CreatePost => {
491       chat.check_rate_limit_post(msg.id)?;
492       let create_post: CreatePost = serde_json::from_str(data)?;
493       let community_id = create_post.community_id;
494       let res = Oper::new(create_post).perform(&conn)?;
495       let res_str = to_json_string(&user_operation, &res)?;
496
497       // Don't send my data with it
498       let mut post_sent = res;
499       post_sent.post.my_vote = None;
500       post_sent.post.user_id = None;
501       let post_sent_str = to_json_string(&user_operation, &post_sent)?;
502
503       // Send it to /c/all and that community
504       chat.send_community_room_message(0, &post_sent_str, msg.id);
505       chat.send_community_room_message(community_id, &post_sent_str, msg.id);
506
507       Ok(res_str)
508     }
509     UserOperation::GetPost => {
510       let get_post: GetPost = serde_json::from_str(data)?;
511       let post_id = get_post.id;
512       chat.join_post_room(post_id, msg.id);
513       let mut res = Oper::new(get_post).perform(&conn)?;
514
515       res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
516         post_users.len()
517       } else {
518         0
519       };
520
521       to_json_string(&user_operation, &res)
522     }
523     UserOperation::GetPosts => {
524       let get_posts: GetPosts = serde_json::from_str(data)?;
525       if get_posts.community_id.is_none() {
526         // 0 is the "all" community
527         chat.join_community_room(0, msg.id);
528       }
529       let res = Oper::new(get_posts).perform(&conn)?;
530       to_json_string(&user_operation, &res)
531     }
532     UserOperation::UserJoin => {
533       let user_join: UserJoin = serde_json::from_str(data)?;
534       let res = Oper::new(user_join).perform(&conn)?;
535       chat.join_user_room(res.user_id, msg.id);
536       to_json_string(&user_operation, &res)
537     }
538     UserOperation::CreatePostLike => {
539       chat.check_rate_limit_message(msg.id)?;
540       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
541       let res = Oper::new(create_post_like).perform(&conn)?;
542       let community_id = res.post.community_id;
543       let res_str = to_json_string(&user_operation, &res)?;
544
545       // Don't send my data with it
546       let mut post_sent = res;
547       post_sent.post.my_vote = None;
548       post_sent.post.user_id = None;
549       let post_sent_str = to_json_string(&user_operation, &post_sent)?;
550
551       // Send it to /c/all and that community
552       chat.send_community_room_message(0, &post_sent_str, msg.id);
553       chat.send_community_room_message(community_id, &post_sent_str, msg.id);
554
555       Ok(res_str)
556     }
557     UserOperation::EditPost => {
558       let edit_post: EditPost = serde_json::from_str(data)?;
559       let res = Oper::new(edit_post).perform(&conn)?;
560       let mut post_sent = res.clone();
561       post_sent.post.my_vote = None;
562       post_sent.post.user_id = None;
563       let post_sent_str = to_json_string(&user_operation, &post_sent)?;
564
565       // Send it to /c/all and that community
566       chat.send_community_room_message(0, &post_sent_str, msg.id);
567       chat.send_community_room_message(post_sent.post.community_id, &post_sent_str, msg.id);
568       to_json_string(&user_operation, &res)
569     }
570     UserOperation::SavePost => {
571       do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
572     }
573     UserOperation::CreateComment => {
574       chat.check_rate_limit_message(msg.id)?;
575       let create_comment: CreateComment = serde_json::from_str(data)?;
576       let post_id = create_comment.post_id;
577       let res = Oper::new(create_comment).perform(&conn)?;
578       let mut comment_sent = res.clone();
579       comment_sent.comment.my_vote = None;
580       comment_sent.comment.user_id = None;
581       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
582
583       // Send it to the post room
584       chat.send_post_room_message(post_id, &comment_sent_str, msg.id);
585
586       // Send it to the recipient(s) including the mentioned users
587       for recipient_id in comment_sent.recipient_ids {
588         chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id);
589       }
590
591       to_json_string(&user_operation, &res)
592     }
593     UserOperation::EditComment => {
594       let edit_comment: EditComment = serde_json::from_str(data)?;
595       let post_id = edit_comment.post_id;
596       let res = Oper::new(edit_comment).perform(&conn)?;
597       let mut comment_sent = res.clone();
598       comment_sent.comment.my_vote = None;
599       comment_sent.comment.user_id = None;
600       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
601
602       chat.send_post_room_message(post_id, &comment_sent_str, msg.id);
603
604       // Send it to the recipient(s) including the mentioned users
605       for recipient_id in comment_sent.recipient_ids {
606         chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id);
607       }
608
609       to_json_string(&user_operation, &res)
610     }
611     UserOperation::SaveComment => {
612       do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
613     }
614     UserOperation::CreateCommentLike => {
615       chat.check_rate_limit_message(msg.id)?;
616       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
617       let post_id = create_comment_like.post_id;
618       let res = Oper::new(create_comment_like).perform(&conn)?;
619       let mut comment_sent = res.clone();
620       comment_sent.comment.my_vote = None;
621       comment_sent.comment.user_id = None;
622       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
623
624       chat.send_post_room_message(post_id, &comment_sent_str, msg.id);
625
626       // Send it to the recipient(s) including the mentioned users
627       for recipient_id in comment_sent.recipient_ids {
628         chat.send_user_room_message(recipient_id, &comment_sent_str, msg.id);
629       }
630       to_json_string(&user_operation, &res)
631     }
632     UserOperation::GetModlog => {
633       do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
634     }
635     UserOperation::CreateSite => {
636       do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
637     }
638     UserOperation::EditSite => {
639       let edit_site: EditSite = serde_json::from_str(data)?;
640       let res = Oper::new(edit_site).perform(&conn)?;
641       let res_str = to_json_string(&user_operation, &res)?;
642       chat.send_all_message(&res_str, msg.id);
643       Ok(res_str)
644     }
645     UserOperation::GetSite => {
646       let get_site: GetSite = serde_json::from_str(data)?;
647       let mut res = Oper::new(get_site).perform(&conn)?;
648       res.online = chat.sessions.len();
649       to_json_string(&user_operation, &res)
650     }
651     UserOperation::Search => {
652       do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
653     }
654     UserOperation::TransferCommunity => {
655       do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
656     }
657     UserOperation::TransferSite => {
658       do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
659     }
660     UserOperation::DeleteAccount => {
661       do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
662     }
663     UserOperation::PasswordReset => {
664       do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
665     }
666     UserOperation::PasswordChange => {
667       do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
668     }
669     UserOperation::CreatePrivateMessage => {
670       chat.check_rate_limit_message(msg.id)?;
671       let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
672       let recipient_id = create_private_message.recipient_id;
673       let res = Oper::new(create_private_message).perform(&conn)?;
674       let res_str = to_json_string(&user_operation, &res)?;
675
676       chat.send_user_room_message(recipient_id, &res_str, msg.id);
677       Ok(res_str)
678     }
679     UserOperation::EditPrivateMessage => {
680       do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
681     }
682     UserOperation::GetPrivateMessages => {
683       do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
684     }
685   }
686 }