]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'dev' into federation
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15 use strum::IntoEnumIterator;
16
17 use crate::api::comment::*;
18 use crate::api::community::*;
19 use crate::api::post::*;
20 use crate::api::site::*;
21 use crate::api::user::*;
22 use crate::api::*;
23 use crate::apub::puller::*;
24 use crate::websocket::UserOperation;
25 use crate::Settings;
26
27 type ConnectionId = usize;
28 type PostId = i32;
29 type CommunityId = i32;
30 type UserId = i32;
31 type IPAddr = String;
32
33 /// Chat server sends this messages to session
34 #[derive(Message)]
35 #[rtype(result = "()")]
36 pub struct WSMessage(pub String);
37
38 /// Message for chat server communications
39
40 /// New chat session is created
41 #[derive(Message)]
42 #[rtype(usize)]
43 pub struct Connect {
44   pub addr: Recipient<WSMessage>,
45   pub ip: IPAddr,
46 }
47
48 /// Session is disconnected
49 #[derive(Message)]
50 #[rtype(result = "()")]
51 pub struct Disconnect {
52   pub id: ConnectionId,
53   pub ip: IPAddr,
54 }
55
56 #[derive(Serialize, Deserialize, Message)]
57 #[rtype(String)]
58 pub struct StandardMessage {
59   /// Id of the client session
60   pub id: ConnectionId,
61   /// Peer message
62   pub msg: String,
63 }
64
65 #[derive(Debug)]
66 pub struct RateLimitBucket {
67   last_checked: SystemTime,
68   allowance: f64,
69 }
70
71 pub struct SessionInfo {
72   pub addr: Recipient<WSMessage>,
73   pub ip: IPAddr,
74 }
75
76 #[derive(Eq, PartialEq, Hash, Debug, EnumIter, Copy, Clone)]
77 pub enum RateLimitType {
78   Message,
79   Register,
80   Post,
81 }
82
83 /// `ChatServer` manages chat rooms and responsible for coordinating chat
84 /// session.
85 pub struct ChatServer {
86   /// A map from generated random ID to session addr
87   sessions: HashMap<ConnectionId, SessionInfo>,
88
89   /// A map from post_id to set of connectionIDs
90   post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
91
92   /// A map from community to set of connectionIDs
93   community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
94
95   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
96   /// sessions (IE clients)
97   user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
98
99   /// Rate limiting based on rate type and IP addr
100   rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
101
102   rng: ThreadRng,
103   db: Pool<ConnectionManager<PgConnection>>,
104 }
105
106 impl ChatServer {
107   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
108     ChatServer {
109       sessions: HashMap::new(),
110       rate_limit_buckets: HashMap::new(),
111       post_rooms: HashMap::new(),
112       community_rooms: HashMap::new(),
113       user_rooms: HashMap::new(),
114       rng: rand::thread_rng(),
115       db,
116     }
117   }
118
119   fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
120     // remove session from all rooms
121     for sessions in self.community_rooms.values_mut() {
122       sessions.remove(&id);
123     }
124
125     // Also leave all post rooms
126     // This avoids double messages
127     for sessions in self.post_rooms.values_mut() {
128       sessions.remove(&id);
129     }
130
131     // If the room doesn't exist yet
132     if self.community_rooms.get_mut(&community_id).is_none() {
133       self.community_rooms.insert(community_id, HashSet::new());
134     }
135
136     self
137       .community_rooms
138       .get_mut(&community_id)
139       .unwrap()
140       .insert(id);
141   }
142
143   fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
144     // remove session from all rooms
145     for sessions in self.post_rooms.values_mut() {
146       sessions.remove(&id);
147     }
148
149     // Also leave all communities
150     // This avoids double messages
151     for sessions in self.community_rooms.values_mut() {
152       sessions.remove(&id);
153     }
154
155     // If the room doesn't exist yet
156     if self.post_rooms.get_mut(&post_id).is_none() {
157       self.post_rooms.insert(post_id, HashSet::new());
158     }
159
160     self.post_rooms.get_mut(&post_id).unwrap().insert(id);
161   }
162
163   fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
164     // remove session from all rooms
165     for sessions in self.user_rooms.values_mut() {
166       sessions.remove(&id);
167     }
168
169     // If the room doesn't exist yet
170     if self.user_rooms.get_mut(&user_id).is_none() {
171       self.user_rooms.insert(user_id, HashSet::new());
172     }
173
174     self.user_rooms.get_mut(&user_id).unwrap().insert(id);
175   }
176
177   fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
178     if let Some(sessions) = self.post_rooms.get(&post_id) {
179       for id in sessions {
180         if *id != skip_id {
181           if let Some(info) = self.sessions.get(id) {
182             let _ = info.addr.do_send(WSMessage(message.to_owned()));
183           }
184         }
185       }
186     }
187   }
188
189   fn send_community_room_message(
190     &self,
191     community_id: CommunityId,
192     message: &str,
193     skip_id: ConnectionId,
194   ) {
195     if let Some(sessions) = self.community_rooms.get(&community_id) {
196       for id in sessions {
197         if *id != skip_id {
198           if let Some(info) = self.sessions.get(id) {
199             let _ = info.addr.do_send(WSMessage(message.to_owned()));
200           }
201         }
202       }
203     }
204   }
205
206   fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
207     if let Some(sessions) = self.user_rooms.get(&user_id) {
208       for id in sessions {
209         if *id != skip_id {
210           if let Some(info) = self.sessions.get(id) {
211             let _ = info.addr.do_send(WSMessage(message.to_owned()));
212           }
213         }
214       }
215     }
216   }
217
218   fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
219     for id in self.sessions.keys() {
220       if *id != skip_id {
221         if let Some(info) = self.sessions.get(id) {
222           let _ = info.addr.do_send(WSMessage(message.to_owned()));
223         }
224       }
225     }
226   }
227
228   fn comment_sends(
229     &self,
230     user_operation: UserOperation,
231     comment: CommentResponse,
232     id: ConnectionId,
233   ) -> Result<String, Error> {
234     let mut comment_reply_sent = comment.clone();
235     comment_reply_sent.comment.my_vote = None;
236     comment_reply_sent.comment.user_id = None;
237
238     // For the post room ones, and the directs back to the user
239     // strip out the recipient_ids, so that
240     // users don't get double notifs
241     let mut comment_user_sent = comment.clone();
242     comment_user_sent.recipient_ids = Vec::new();
243
244     let mut comment_post_sent = comment_reply_sent.clone();
245     comment_post_sent.recipient_ids = Vec::new();
246
247     let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
248     let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
249     let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
250
251     // Send it to the post room
252     self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
253
254     // Send it to the recipient(s) including the mentioned users
255     for recipient_id in comment_reply_sent.recipient_ids {
256       self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
257     }
258
259     // Send it to the community too
260     self.send_community_room_message(0, &comment_post_sent_str, id);
261     self.send_community_room_message(comment.comment.community_id, &comment_post_sent_str, id);
262
263     Ok(comment_user_sent_str)
264   }
265
266   fn post_sends(
267     &self,
268     user_operation: UserOperation,
269     post: PostResponse,
270     id: ConnectionId,
271   ) -> Result<String, Error> {
272     let community_id = post.post.community_id;
273
274     // Don't send my data with it
275     let mut post_sent = post.clone();
276     post_sent.post.my_vote = None;
277     post_sent.post.user_id = None;
278     let post_sent_str = to_json_string(&user_operation, &post_sent)?;
279
280     // Send it to /c/all and that community
281     self.send_community_room_message(0, &post_sent_str, id);
282     self.send_community_room_message(community_id, &post_sent_str, id);
283
284     // Send it to the post room
285     self.send_post_room_message(post_sent.post.id, &post_sent_str, id);
286
287     to_json_string(&user_operation, post)
288   }
289
290   fn check_rate_limit_register(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
291     self.check_rate_limit_full(
292       RateLimitType::Register,
293       id,
294       Settings::get().rate_limit.register,
295       Settings::get().rate_limit.register_per_second,
296       check_only,
297     )
298   }
299
300   fn check_rate_limit_post(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
301     self.check_rate_limit_full(
302       RateLimitType::Post,
303       id,
304       Settings::get().rate_limit.post,
305       Settings::get().rate_limit.post_per_second,
306       check_only,
307     )
308   }
309
310   fn check_rate_limit_message(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
311     self.check_rate_limit_full(
312       RateLimitType::Message,
313       id,
314       Settings::get().rate_limit.message,
315       Settings::get().rate_limit.message_per_second,
316       check_only,
317     )
318   }
319
320   #[allow(clippy::float_cmp)]
321   fn check_rate_limit_full(
322     &mut self,
323     type_: RateLimitType,
324     id: usize,
325     rate: i32,
326     per: i32,
327     check_only: bool,
328   ) -> Result<(), Error> {
329     if let Some(info) = self.sessions.get(&id) {
330       if let Some(bucket) = self.rate_limit_buckets.get_mut(&type_) {
331         if let Some(rate_limit) = bucket.get_mut(&info.ip) {
332           let current = SystemTime::now();
333           let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
334
335           // The initial value
336           if rate_limit.allowance == -2f64 {
337             rate_limit.allowance = rate as f64;
338           };
339
340           rate_limit.last_checked = current;
341           rate_limit.allowance += time_passed * (rate as f64 / per as f64);
342           if !check_only && rate_limit.allowance > rate as f64 {
343             rate_limit.allowance = rate as f64;
344           }
345
346           if rate_limit.allowance < 1.0 {
347             println!(
348               "Rate limited IP: {}, time_passed: {}, allowance: {}",
349               &info.ip, time_passed, rate_limit.allowance
350             );
351             Err(
352               APIError {
353                 message: format!("Too many requests. {} per {} seconds", rate, per),
354               }
355               .into(),
356             )
357           } else {
358             if !check_only {
359               rate_limit.allowance -= 1.0;
360             }
361             Ok(())
362           }
363         } else {
364           Ok(())
365         }
366       } else {
367         Ok(())
368       }
369     } else {
370       Ok(())
371     }
372   }
373 }
374
375 /// Make actor from `ChatServer`
376 impl Actor for ChatServer {
377   /// We are going to use simple Context, we just need ability to communicate
378   /// with other actors.
379   type Context = Context<Self>;
380 }
381
382 /// Handler for Connect message.
383 ///
384 /// Register new session and assign unique id to this session
385 impl Handler<Connect> for ChatServer {
386   type Result = usize;
387
388   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
389     // register session with random id
390     let id = self.rng.gen::<usize>();
391     println!("{} joined", &msg.ip);
392
393     self.sessions.insert(
394       id,
395       SessionInfo {
396         addr: msg.addr,
397         ip: msg.ip.to_owned(),
398       },
399     );
400
401     for rate_limit_type in RateLimitType::iter() {
402       if self.rate_limit_buckets.get(&rate_limit_type).is_none() {
403         self
404           .rate_limit_buckets
405           .insert(rate_limit_type, HashMap::new());
406       }
407
408       if let Some(bucket) = self.rate_limit_buckets.get_mut(&rate_limit_type) {
409         if bucket.get(&msg.ip).is_none() {
410           bucket.insert(
411             msg.ip.to_owned(),
412             RateLimitBucket {
413               last_checked: SystemTime::now(),
414               allowance: -2f64,
415             },
416           );
417         }
418       }
419     }
420
421     id
422   }
423 }
424
425 /// Handler for Disconnect message.
426 impl Handler<Disconnect> for ChatServer {
427   type Result = ();
428
429   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
430     // Remove connections from sessions and all 3 scopes
431     if self.sessions.remove(&msg.id).is_some() {
432       for sessions in self.user_rooms.values_mut() {
433         sessions.remove(&msg.id);
434       }
435
436       for sessions in self.post_rooms.values_mut() {
437         sessions.remove(&msg.id);
438       }
439
440       for sessions in self.community_rooms.values_mut() {
441         sessions.remove(&msg.id);
442       }
443     }
444   }
445 }
446
447 /// Handler for Message message.
448 impl Handler<StandardMessage> for ChatServer {
449   type Result = MessageResult<StandardMessage>;
450
451   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
452     let msg_out = match parse_json_message(self, msg) {
453       Ok(m) => m,
454       Err(e) => e.to_string(),
455     };
456
457     println!("Message Sent: {}", msg_out);
458     MessageResult(msg_out)
459   }
460 }
461
462 #[derive(Serialize)]
463 struct WebsocketResponse<T> {
464   op: String,
465   data: T,
466 }
467
468 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
469 where
470   T: Serialize,
471 {
472   let response = WebsocketResponse {
473     op: op.to_string(),
474     data,
475   };
476   Ok(serde_json::to_string(&response)?)
477 }
478
479 fn do_user_operation<'a, Data, Response>(
480   op: UserOperation,
481   data: &str,
482   conn: &PooledConnection<ConnectionManager<PgConnection>>,
483 ) -> Result<String, Error>
484 where
485   for<'de> Data: Deserialize<'de> + 'a,
486   Response: Serialize,
487   Oper<Data>: Perform<Response>,
488 {
489   let parsed_data: Data = serde_json::from_str(data)?;
490   let res = Oper::new(parsed_data).perform(&conn)?;
491   to_json_string(&op, &res)
492 }
493
494 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
495   let json: Value = serde_json::from_str(&msg.msg)?;
496   let data = &json["data"].to_string();
497   let op = &json["op"].as_str().ok_or(APIError {
498     message: "Unknown op type".to_string(),
499   })?;
500
501   let conn = chat.db.get()?;
502
503   let user_operation: UserOperation = UserOperation::from_str(&op)?;
504
505   // TODO: none of the chat messages are going to work if stuff is submitted via http api,
506   //       need to move that handling elsewhere
507
508   // A DDOS check
509   chat.check_rate_limit_message(msg.id, false)?;
510
511   match user_operation {
512     UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
513     UserOperation::Register => {
514       chat.check_rate_limit_register(msg.id, true)?;
515       let register: Register = serde_json::from_str(data)?;
516       let res = Oper::new(register).perform(&conn)?;
517       chat.check_rate_limit_register(msg.id, false)?;
518       to_json_string(&user_operation, &res)
519     }
520     UserOperation::GetUserDetails => {
521       do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
522     }
523     UserOperation::SaveUserSettings => {
524       do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
525     }
526     UserOperation::AddAdmin => {
527       let add_admin: AddAdmin = serde_json::from_str(data)?;
528       let res = Oper::new(add_admin).perform(&conn)?;
529       let res_str = to_json_string(&user_operation, &res)?;
530       chat.send_all_message(&res_str, msg.id);
531       Ok(res_str)
532     }
533     UserOperation::BanUser => {
534       let ban_user: BanUser = serde_json::from_str(data)?;
535       let res = Oper::new(ban_user).perform(&conn)?;
536       let res_str = to_json_string(&user_operation, &res)?;
537       chat.send_all_message(&res_str, msg.id);
538       Ok(res_str)
539     }
540     UserOperation::GetReplies => {
541       do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
542     }
543     UserOperation::GetUserMentions => {
544       do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
545     }
546     UserOperation::EditUserMention => {
547       do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
548     }
549     UserOperation::MarkAllAsRead => {
550       do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
551     }
552     UserOperation::GetCommunity => {
553       let get_community: GetCommunity = serde_json::from_str(data)?;
554
555       let mut res = if Settings::get().federation_enabled {
556         if let Some(community_name) = get_community.name.to_owned() {
557           if community_name.contains('@') {
558             // TODO: need to support sort, filter etc for remote communities
559             get_remote_community(community_name)?
560           // TODO what is this about
561           // get_community.name = Some(name.replace("!", ""));
562           } else {
563             Oper::new(get_community).perform(&conn)?
564           }
565         } else {
566           Oper::new(get_community).perform(&conn)?
567         }
568       } else {
569         Oper::new(get_community).perform(&conn)?
570       };
571
572       let community_id = res.community.id;
573
574       chat.join_community_room(community_id, msg.id);
575
576       res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
577         community_users.len()
578       } else {
579         0
580       };
581
582       to_json_string(&user_operation, &res)
583     }
584     UserOperation::ListCommunities => {
585       if Settings::get().federation_enabled {
586         let res = get_all_communities()?;
587         let val = ListCommunitiesResponse { communities: res };
588         to_json_string(&user_operation, &val)
589       } else {
590         do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
591       }
592     }
593     UserOperation::CreateCommunity => {
594       chat.check_rate_limit_register(msg.id, true)?;
595       let create_community: CreateCommunity = serde_json::from_str(data)?;
596       let res = Oper::new(create_community).perform(&conn)?;
597       chat.check_rate_limit_register(msg.id, false)?;
598       to_json_string(&user_operation, &res)
599     }
600     UserOperation::EditCommunity => {
601       let edit_community: EditCommunity = serde_json::from_str(data)?;
602       let res = Oper::new(edit_community).perform(&conn)?;
603       let mut community_sent: CommunityResponse = res.clone();
604       community_sent.community.user_id = None;
605       community_sent.community.subscribed = None;
606       let community_sent_str = to_json_string(&user_operation, &community_sent)?;
607       chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
608       to_json_string(&user_operation, &res)
609     }
610     UserOperation::FollowCommunity => {
611       do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
612     }
613     UserOperation::GetFollowedCommunities => do_user_operation::<
614       GetFollowedCommunities,
615       GetFollowedCommunitiesResponse,
616     >(user_operation, data, &conn),
617     UserOperation::BanFromCommunity => {
618       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
619       let community_id = ban_from_community.community_id;
620       let res = Oper::new(ban_from_community).perform(&conn)?;
621       let res_str = to_json_string(&user_operation, &res)?;
622       chat.send_community_room_message(community_id, &res_str, msg.id);
623       Ok(res_str)
624     }
625     UserOperation::AddModToCommunity => {
626       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
627       let community_id = mod_add_to_community.community_id;
628       let res = Oper::new(mod_add_to_community).perform(&conn)?;
629       let res_str = to_json_string(&user_operation, &res)?;
630       chat.send_community_room_message(community_id, &res_str, msg.id);
631       Ok(res_str)
632     }
633     UserOperation::ListCategories => {
634       do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
635     }
636     UserOperation::GetPost => {
637       let get_post: GetPost = serde_json::from_str(data)?;
638       let post_id = get_post.id;
639       chat.join_post_room(post_id, msg.id);
640       let mut res = Oper::new(get_post).perform(&conn)?;
641
642       res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
643         post_users.len()
644       } else {
645         0
646       };
647
648       to_json_string(&user_operation, &res)
649     }
650     UserOperation::GetPosts => {
651       let get_posts: GetPosts = serde_json::from_str(data)?;
652       if get_posts.community_id.is_none() {
653         // 0 is the "all" community
654         chat.join_community_room(0, msg.id);
655       }
656       let res = Oper::new(get_posts).perform(&conn)?;
657       to_json_string(&user_operation, &res)
658     }
659     UserOperation::GetComments => {
660       let get_comments: GetComments = serde_json::from_str(data)?;
661       if get_comments.community_id.is_none() {
662         // 0 is the "all" community
663         chat.join_community_room(0, msg.id);
664       }
665       let res = Oper::new(get_comments).perform(&conn)?;
666       to_json_string(&user_operation, &res)
667     }
668     UserOperation::CreatePost => {
669       chat.check_rate_limit_post(msg.id, true)?;
670       let create_post: CreatePost = serde_json::from_str(data)?;
671       let res = Oper::new(create_post).perform(&conn)?;
672       chat.check_rate_limit_post(msg.id, false)?;
673
674       chat.post_sends(UserOperation::CreatePost, res, msg.id)
675     }
676     UserOperation::CreatePostLike => {
677       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
678       let res = Oper::new(create_post_like).perform(&conn)?;
679
680       chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
681     }
682     UserOperation::EditPost => {
683       let edit_post: EditPost = serde_json::from_str(data)?;
684       let res = Oper::new(edit_post).perform(&conn)?;
685
686       chat.post_sends(UserOperation::EditPost, res, msg.id)
687     }
688     UserOperation::SavePost => {
689       do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
690     }
691     UserOperation::CreateComment => {
692       let create_comment: CreateComment = serde_json::from_str(data)?;
693       let res = Oper::new(create_comment).perform(&conn)?;
694
695       chat.comment_sends(UserOperation::CreateComment, res, msg.id)
696     }
697     UserOperation::EditComment => {
698       let edit_comment: EditComment = serde_json::from_str(data)?;
699       let res = Oper::new(edit_comment).perform(&conn)?;
700
701       chat.comment_sends(UserOperation::EditComment, res, msg.id)
702     }
703     UserOperation::SaveComment => {
704       do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
705     }
706     UserOperation::CreateCommentLike => {
707       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
708       let res = Oper::new(create_comment_like).perform(&conn)?;
709
710       chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
711     }
712     UserOperation::GetModlog => {
713       do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
714     }
715     UserOperation::CreateSite => {
716       do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
717     }
718     UserOperation::EditSite => {
719       let edit_site: EditSite = serde_json::from_str(data)?;
720       let res = Oper::new(edit_site).perform(&conn)?;
721       let res_str = to_json_string(&user_operation, &res)?;
722       chat.send_all_message(&res_str, msg.id);
723       Ok(res_str)
724     }
725     UserOperation::GetSite => {
726       let get_site: GetSite = serde_json::from_str(data)?;
727       let mut res = Oper::new(get_site).perform(&conn)?;
728       res.online = chat.sessions.len();
729       to_json_string(&user_operation, &res)
730     }
731     UserOperation::Search => {
732       do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
733     }
734     UserOperation::TransferCommunity => {
735       do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
736     }
737     UserOperation::TransferSite => {
738       do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
739     }
740     UserOperation::DeleteAccount => {
741       do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
742     }
743     UserOperation::PasswordReset => {
744       do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
745     }
746     UserOperation::PasswordChange => {
747       do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
748     }
749     UserOperation::CreatePrivateMessage => {
750       let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
751       let recipient_id = create_private_message.recipient_id;
752       let res = Oper::new(create_private_message).perform(&conn)?;
753       let res_str = to_json_string(&user_operation, &res)?;
754
755       chat.send_user_room_message(recipient_id, &res_str, msg.id);
756       Ok(res_str)
757     }
758     UserOperation::EditPrivateMessage => {
759       do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
760     }
761     UserOperation::GetPrivateMessages => {
762       do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
763     }
764     UserOperation::UserJoin => {
765       let user_join: UserJoin = serde_json::from_str(data)?;
766       let res = Oper::new(user_join).perform(&conn)?;
767       chat.join_user_room(res.user_id, msg.id);
768       to_json_string(&user_operation, &res)
769     }
770   }
771 }