]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'actix-2.0' into dev
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use failure::Error;
7 use rand::{rngs::ThreadRng, Rng};
8 use serde::{Deserialize, Serialize};
9 use serde_json::Value;
10 use std::collections::{HashMap, HashSet};
11 use std::str::FromStr;
12 use std::time::SystemTime;
13
14 use crate::api::comment::*;
15 use crate::api::community::*;
16 use crate::api::post::*;
17 use crate::api::site::*;
18 use crate::api::user::*;
19 use crate::api::*;
20 use crate::Settings;
21
22 /// Chat server sends this messages to session
23 #[derive(Message)]
24 #[rtype(result = "()")]
25 pub struct WSMessage(pub String);
26
27 /// Message for chat server communications
28
29 /// New chat session is created
30 #[derive(Message)]
31 #[rtype(usize)]
32 pub struct Connect {
33   pub addr: Recipient<WSMessage>,
34   pub ip: String,
35 }
36
37 /// Session is disconnected
38 #[derive(Message)]
39 #[rtype(result = "()")]
40 pub struct Disconnect {
41   pub id: usize,
42   pub ip: String,
43 }
44
45 /// Send message to specific room
46 #[derive(Message)]
47 #[rtype(result = "()")]
48 pub struct ClientMessage {
49   /// Id of the client session
50   pub id: usize,
51   /// Peer message
52   pub msg: String,
53   /// Room name
54   pub room: String,
55 }
56
57 #[derive(Serialize, Deserialize, Message)]
58 #[rtype(String)]
59 pub struct StandardMessage {
60   /// Id of the client session
61   pub id: usize,
62   /// Peer message
63   pub msg: String,
64 }
65
66 #[derive(Debug)]
67 pub struct RateLimitBucket {
68   last_checked: SystemTime,
69   allowance: f64,
70 }
71
72 pub struct SessionInfo {
73   pub addr: Recipient<WSMessage>,
74   pub ip: String,
75 }
76
77 /// `ChatServer` manages chat rooms and responsible for coordinating chat
78 /// session. implementation is super primitive
79 pub struct ChatServer {
80   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
81   rate_limits: HashMap<String, RateLimitBucket>,
82   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
83   rng: ThreadRng,
84 }
85
86 impl Default for ChatServer {
87   fn default() -> ChatServer {
88     // default room
89     let rooms = HashMap::new();
90
91     ChatServer {
92       sessions: HashMap::new(),
93       rate_limits: HashMap::new(),
94       rooms,
95       rng: rand::thread_rng(),
96     }
97   }
98 }
99
100 impl ChatServer {
101   /// Send message to all users in the room
102   fn send_room_message(&self, room: i32, message: &str, skip_id: usize) {
103     if let Some(sessions) = self.rooms.get(&room) {
104       for id in sessions {
105         if *id != skip_id {
106           if let Some(info) = self.sessions.get(id) {
107             let _ = info.addr.do_send(WSMessage(message.to_owned()));
108           }
109         }
110       }
111     }
112   }
113
114   fn join_room(&mut self, room_id: i32, id: usize) {
115     // remove session from all rooms
116     for sessions in self.rooms.values_mut() {
117       sessions.remove(&id);
118     }
119
120     // If the room doesn't exist yet
121     if self.rooms.get_mut(&room_id).is_none() {
122       self.rooms.insert(room_id, HashSet::new());
123     }
124
125     self.rooms.get_mut(&room_id).unwrap().insert(id);
126   }
127
128   fn send_community_message(
129     &self,
130     community_id: i32,
131     message: &str,
132     skip_id: usize,
133   ) -> Result<(), Error> {
134     use crate::db::post_view::*;
135     use crate::db::*;
136     let conn = establish_connection();
137
138     let posts = PostQueryBuilder::create(&conn)
139       .listing_type(ListingType::Community)
140       .sort(&SortType::New)
141       .for_community_id(community_id)
142       .limit(9999)
143       .list()?;
144
145     for post in posts {
146       self.send_room_message(post.id, message, skip_id);
147     }
148
149     Ok(())
150   }
151
152   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
153     self.check_rate_limit_full(
154       id,
155       Settings::get().rate_limit.register,
156       Settings::get().rate_limit.register_per_second,
157     )
158   }
159
160   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
161     self.check_rate_limit_full(
162       id,
163       Settings::get().rate_limit.post,
164       Settings::get().rate_limit.post_per_second,
165     )
166   }
167
168   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
169     self.check_rate_limit_full(
170       id,
171       Settings::get().rate_limit.message,
172       Settings::get().rate_limit.message_per_second,
173     )
174   }
175
176   #[allow(clippy::float_cmp)]
177   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
178     if let Some(info) = self.sessions.get(&id) {
179       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
180         // The initial value
181         if rate_limit.allowance == -2f64 {
182           rate_limit.allowance = rate as f64;
183         };
184
185         let current = SystemTime::now();
186         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
187         rate_limit.last_checked = current;
188         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
189         if rate_limit.allowance > rate as f64 {
190           rate_limit.allowance = rate as f64;
191         }
192
193         if rate_limit.allowance < 1.0 {
194           println!(
195             "Rate limited IP: {}, time_passed: {}, allowance: {}",
196             &info.ip, time_passed, rate_limit.allowance
197           );
198           Err(
199             APIError {
200               op: "Rate Limit".to_string(),
201               message: format!("Too many requests. {} per {} seconds", rate, per),
202             }
203             .into(),
204           )
205         } else {
206           rate_limit.allowance -= 1.0;
207           Ok(())
208         }
209       } else {
210         Ok(())
211       }
212     } else {
213       Ok(())
214     }
215   }
216 }
217
218 /// Make actor from `ChatServer`
219 impl Actor for ChatServer {
220   /// We are going to use simple Context, we just need ability to communicate
221   /// with other actors.
222   type Context = Context<Self>;
223 }
224
225 /// Handler for Connect message.
226 ///
227 /// Register new session and assign unique id to this session
228 impl Handler<Connect> for ChatServer {
229   type Result = usize;
230
231   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
232     // notify all users in same room
233     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
234
235     // register session with random id
236     let id = self.rng.gen::<usize>();
237     println!("{} joined", &msg.ip);
238
239     self.sessions.insert(
240       id,
241       SessionInfo {
242         addr: msg.addr,
243         ip: msg.ip.to_owned(),
244       },
245     );
246
247     if self.rate_limits.get(&msg.ip).is_none() {
248       self.rate_limits.insert(
249         msg.ip,
250         RateLimitBucket {
251           last_checked: SystemTime::now(),
252           allowance: -2f64,
253         },
254       );
255     }
256
257     id
258   }
259 }
260
261 /// Handler for Disconnect message.
262 impl Handler<Disconnect> for ChatServer {
263   type Result = ();
264
265   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
266     // let mut rooms: Vec<i32> = Vec::new();
267
268     // remove address
269     if self.sessions.remove(&msg.id).is_some() {
270       // remove session from all rooms
271       for sessions in self.rooms.values_mut() {
272         if sessions.remove(&msg.id) {
273           // rooms.push(*id);
274         }
275       }
276     }
277   }
278 }
279
280 /// Handler for Message message.
281 impl Handler<StandardMessage> for ChatServer {
282   type Result = MessageResult<StandardMessage>;
283
284   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
285     let msg_out = match parse_json_message(self, msg) {
286       Ok(m) => m,
287       Err(e) => e.to_string(),
288     };
289
290     MessageResult(msg_out)
291   }
292 }
293
294 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
295   let json: Value = serde_json::from_str(&msg.msg)?;
296   let data = &json["data"].to_string();
297   let op = &json["op"].as_str().ok_or(APIError {
298     op: "Unknown op type".to_string(),
299     message: "Unknown op type".to_string(),
300   })?;
301
302   let user_operation: UserOperation = UserOperation::from_str(&op)?;
303
304   match user_operation {
305     UserOperation::Login => {
306       let login: Login = serde_json::from_str(data)?;
307       let res = Oper::new(user_operation, login).perform()?;
308       Ok(serde_json::to_string(&res)?)
309     }
310     UserOperation::Register => {
311       let register: Register = serde_json::from_str(data)?;
312       let res = Oper::new(user_operation, register).perform();
313       if res.is_ok() {
314         chat.check_rate_limit_register(msg.id)?;
315       }
316       Ok(serde_json::to_string(&res?)?)
317     }
318     UserOperation::GetUserDetails => {
319       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
320       let res = Oper::new(user_operation, get_user_details).perform()?;
321       Ok(serde_json::to_string(&res)?)
322     }
323     UserOperation::SaveUserSettings => {
324       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
325       let res = Oper::new(user_operation, save_user_settings).perform()?;
326       Ok(serde_json::to_string(&res)?)
327     }
328     UserOperation::AddAdmin => {
329       let add_admin: AddAdmin = serde_json::from_str(data)?;
330       let res = Oper::new(user_operation, add_admin).perform()?;
331       Ok(serde_json::to_string(&res)?)
332     }
333     UserOperation::BanUser => {
334       let ban_user: BanUser = serde_json::from_str(data)?;
335       let res = Oper::new(user_operation, ban_user).perform()?;
336       Ok(serde_json::to_string(&res)?)
337     }
338     UserOperation::GetReplies => {
339       let get_replies: GetReplies = serde_json::from_str(data)?;
340       let res = Oper::new(user_operation, get_replies).perform()?;
341       Ok(serde_json::to_string(&res)?)
342     }
343     UserOperation::GetUserMentions => {
344       let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
345       let res = Oper::new(user_operation, get_user_mentions).perform()?;
346       Ok(serde_json::to_string(&res)?)
347     }
348     UserOperation::EditUserMention => {
349       let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
350       let res = Oper::new(user_operation, edit_user_mention).perform()?;
351       Ok(serde_json::to_string(&res)?)
352     }
353     UserOperation::MarkAllAsRead => {
354       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
355       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
356       Ok(serde_json::to_string(&res)?)
357     }
358     UserOperation::GetCommunity => {
359       let get_community: GetCommunity = serde_json::from_str(data)?;
360       let res = Oper::new(user_operation, get_community).perform()?;
361       Ok(serde_json::to_string(&res)?)
362     }
363     UserOperation::ListCommunities => {
364       let list_communities: ListCommunities = serde_json::from_str(data)?;
365       let res = Oper::new(user_operation, list_communities).perform()?;
366       Ok(serde_json::to_string(&res)?)
367     }
368     UserOperation::CreateCommunity => {
369       chat.check_rate_limit_register(msg.id)?;
370       let create_community: CreateCommunity = serde_json::from_str(data)?;
371       let res = Oper::new(user_operation, create_community).perform()?;
372       Ok(serde_json::to_string(&res)?)
373     }
374     UserOperation::EditCommunity => {
375       let edit_community: EditCommunity = serde_json::from_str(data)?;
376       let res = Oper::new(user_operation, edit_community).perform()?;
377       let mut community_sent: CommunityResponse = res.clone();
378       community_sent.community.user_id = None;
379       community_sent.community.subscribed = None;
380       let community_sent_str = serde_json::to_string(&community_sent)?;
381       chat.send_community_message(community_sent.community.id, &community_sent_str, msg.id)?;
382       Ok(serde_json::to_string(&res)?)
383     }
384     UserOperation::FollowCommunity => {
385       let follow_community: FollowCommunity = serde_json::from_str(data)?;
386       let res = Oper::new(user_operation, follow_community).perform()?;
387       Ok(serde_json::to_string(&res)?)
388     }
389     UserOperation::GetFollowedCommunities => {
390       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
391       let res = Oper::new(user_operation, followed_communities).perform()?;
392       Ok(serde_json::to_string(&res)?)
393     }
394     UserOperation::BanFromCommunity => {
395       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
396       let community_id = ban_from_community.community_id;
397       let res = Oper::new(user_operation, ban_from_community).perform()?;
398       let res_str = serde_json::to_string(&res)?;
399       chat.send_community_message(community_id, &res_str, msg.id)?;
400       Ok(res_str)
401     }
402     UserOperation::AddModToCommunity => {
403       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
404       let community_id = mod_add_to_community.community_id;
405       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
406       let res_str = serde_json::to_string(&res)?;
407       chat.send_community_message(community_id, &res_str, msg.id)?;
408       Ok(res_str)
409     }
410     UserOperation::ListCategories => {
411       let list_categories: ListCategories = ListCategories;
412       let res = Oper::new(user_operation, list_categories).perform()?;
413       Ok(serde_json::to_string(&res)?)
414     }
415     UserOperation::CreatePost => {
416       chat.check_rate_limit_post(msg.id)?;
417       let create_post: CreatePost = serde_json::from_str(data)?;
418       let res = Oper::new(user_operation, create_post).perform()?;
419       Ok(serde_json::to_string(&res)?)
420     }
421     UserOperation::GetPost => {
422       let get_post: GetPost = serde_json::from_str(data)?;
423       chat.join_room(get_post.id, msg.id);
424       let res = Oper::new(user_operation, get_post).perform()?;
425       Ok(serde_json::to_string(&res)?)
426     }
427     UserOperation::GetPosts => {
428       let get_posts: GetPosts = serde_json::from_str(data)?;
429       let res = Oper::new(user_operation, get_posts).perform()?;
430       Ok(serde_json::to_string(&res)?)
431     }
432     UserOperation::CreatePostLike => {
433       chat.check_rate_limit_message(msg.id)?;
434       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
435       let res = Oper::new(user_operation, create_post_like).perform()?;
436       Ok(serde_json::to_string(&res)?)
437     }
438     UserOperation::EditPost => {
439       let edit_post: EditPost = serde_json::from_str(data)?;
440       let res = Oper::new(user_operation, edit_post).perform()?;
441       let mut post_sent = res.clone();
442       post_sent.post.my_vote = None;
443       let post_sent_str = serde_json::to_string(&post_sent)?;
444       chat.send_room_message(post_sent.post.id, &post_sent_str, msg.id);
445       Ok(serde_json::to_string(&res)?)
446     }
447     UserOperation::SavePost => {
448       let save_post: SavePost = serde_json::from_str(data)?;
449       let res = Oper::new(user_operation, save_post).perform()?;
450       Ok(serde_json::to_string(&res)?)
451     }
452     UserOperation::CreateComment => {
453       chat.check_rate_limit_message(msg.id)?;
454       let create_comment: CreateComment = serde_json::from_str(data)?;
455       let post_id = create_comment.post_id;
456       let res = Oper::new(user_operation, create_comment).perform()?;
457       let mut comment_sent = res.clone();
458       comment_sent.comment.my_vote = None;
459       comment_sent.comment.user_id = None;
460       let comment_sent_str = serde_json::to_string(&comment_sent)?;
461       chat.send_room_message(post_id, &comment_sent_str, msg.id);
462       Ok(serde_json::to_string(&res)?)
463     }
464     UserOperation::EditComment => {
465       let edit_comment: EditComment = serde_json::from_str(data)?;
466       let post_id = edit_comment.post_id;
467       let res = Oper::new(user_operation, edit_comment).perform()?;
468       let mut comment_sent = res.clone();
469       comment_sent.comment.my_vote = None;
470       comment_sent.comment.user_id = None;
471       let comment_sent_str = serde_json::to_string(&comment_sent)?;
472       chat.send_room_message(post_id, &comment_sent_str, msg.id);
473       Ok(serde_json::to_string(&res)?)
474     }
475     UserOperation::SaveComment => {
476       let save_comment: SaveComment = serde_json::from_str(data)?;
477       let res = Oper::new(user_operation, save_comment).perform()?;
478       Ok(serde_json::to_string(&res)?)
479     }
480     UserOperation::CreateCommentLike => {
481       chat.check_rate_limit_message(msg.id)?;
482       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
483       let post_id = create_comment_like.post_id;
484       let res = Oper::new(user_operation, create_comment_like).perform()?;
485       let mut comment_sent = res.clone();
486       comment_sent.comment.my_vote = None;
487       comment_sent.comment.user_id = None;
488       let comment_sent_str = serde_json::to_string(&comment_sent)?;
489       chat.send_room_message(post_id, &comment_sent_str, msg.id);
490       Ok(serde_json::to_string(&res)?)
491     }
492     UserOperation::GetModlog => {
493       let get_modlog: GetModlog = serde_json::from_str(data)?;
494       let res = Oper::new(user_operation, get_modlog).perform()?;
495       Ok(serde_json::to_string(&res)?)
496     }
497     UserOperation::CreateSite => {
498       let create_site: CreateSite = serde_json::from_str(data)?;
499       let res = Oper::new(user_operation, create_site).perform()?;
500       Ok(serde_json::to_string(&res)?)
501     }
502     UserOperation::EditSite => {
503       let edit_site: EditSite = serde_json::from_str(data)?;
504       let res = Oper::new(user_operation, edit_site).perform()?;
505       Ok(serde_json::to_string(&res)?)
506     }
507     UserOperation::GetSite => {
508       let online: usize = chat.sessions.len();
509       let get_site: GetSite = serde_json::from_str(data)?;
510       let mut res = Oper::new(user_operation, get_site).perform()?;
511       res.online = online;
512       Ok(serde_json::to_string(&res)?)
513     }
514     UserOperation::Search => {
515       let search: Search = serde_json::from_str(data)?;
516       let res = Oper::new(user_operation, search).perform()?;
517       Ok(serde_json::to_string(&res)?)
518     }
519     UserOperation::TransferCommunity => {
520       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
521       let res = Oper::new(user_operation, transfer_community).perform()?;
522       Ok(serde_json::to_string(&res)?)
523     }
524     UserOperation::TransferSite => {
525       let transfer_site: TransferSite = serde_json::from_str(data)?;
526       let res = Oper::new(user_operation, transfer_site).perform()?;
527       Ok(serde_json::to_string(&res)?)
528     }
529     UserOperation::DeleteAccount => {
530       let delete_account: DeleteAccount = serde_json::from_str(data)?;
531       let res = Oper::new(user_operation, delete_account).perform()?;
532       Ok(serde_json::to_string(&res)?)
533     }
534     UserOperation::PasswordReset => {
535       let password_reset: PasswordReset = serde_json::from_str(data)?;
536       let res = Oper::new(user_operation, password_reset).perform()?;
537       Ok(serde_json::to_string(&res)?)
538     }
539     UserOperation::PasswordChange => {
540       let password_change: PasswordChange = serde_json::from_str(data)?;
541       let res = Oper::new(user_operation, password_change).perform()?;
542       Ok(serde_json::to_string(&res)?)
543     }
544   }
545 }