]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'dev' into nsfw
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use rand::{rngs::ThreadRng, Rng};
7 use std::collections::{HashMap, HashSet};
8 use serde::{Deserialize, Serialize};
9 use serde_json::{Value};
10 use std::str::FromStr;
11 use failure::Error;
12 use std::time::{SystemTime};
13
14 use crate::api::*;
15 use crate::api::user::*;
16 use crate::api::community::*;
17 use crate::api::post::*;
18 use crate::api::comment::*;
19 use crate::api::site::*;
20
21 const RATE_LIMIT_MESSAGES: i32 = 30;
22 const RATE_LIMIT_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_REGISTER_MESSAGES: i32 = 3;
24 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60*3;
25
26
27 /// Chat server sends this messages to session
28 #[derive(Message)]
29 pub struct WSMessage(pub String);
30
31 /// Message for chat server communications
32
33 /// New chat session is created
34 #[derive(Message)]
35 #[rtype(usize)]
36 pub struct Connect {
37   pub addr: Recipient<WSMessage>,
38   pub ip: String,
39 }
40
41 /// Session is disconnected
42 #[derive(Message)]
43 pub struct Disconnect {
44   pub id: usize,
45   pub ip: String,
46 }
47
48 /// Send message to specific room
49 #[derive(Message)]
50 pub struct ClientMessage {
51   /// Id of the client session
52   pub id: usize,
53   /// Peer message
54   pub msg: String,
55   /// Room name
56   pub room: String,
57 }
58
59 #[derive(Serialize, Deserialize)]
60 pub struct StandardMessage {
61   /// Id of the client session
62   pub id: usize,
63   /// Peer message
64   pub msg: String,
65 }
66
67 impl actix::Message for StandardMessage {
68   type Result = String;
69 }
70
71 #[derive(Debug)]
72 pub struct RateLimitBucket {
73   last_checked: SystemTime,
74   allowance: f64
75 }
76
77 pub struct SessionInfo {
78   pub addr: Recipient<WSMessage>,
79   pub ip: String,
80 }
81
82 /// `ChatServer` manages chat rooms and responsible for coordinating chat
83 /// session. implementation is super primitive
84 pub struct ChatServer {
85   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
86   rate_limits: HashMap<String, RateLimitBucket>,
87   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
88   rng: ThreadRng,
89 }
90
91 impl Default for ChatServer {
92   fn default() -> ChatServer {
93     // default room
94     let rooms = HashMap::new();
95
96     ChatServer {
97       sessions: HashMap::new(),
98       rate_limits: HashMap::new(),
99       rooms: rooms,
100       rng: rand::thread_rng(),
101     }
102   }
103 }
104
105 impl ChatServer {
106   /// Send message to all users in the room
107   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
108     if let Some(sessions) = self.rooms.get(room) {
109       for id in sessions {
110         if *id != skip_id {
111           if let Some(info) = self.sessions.get(id) {
112             let _ = info.addr.do_send(WSMessage(message.to_owned()));
113           }
114         }
115       }
116     }
117   }
118
119   fn join_room(&mut self, room_id: i32, id: usize) {
120     // remove session from all rooms
121     for (_n, sessions) in &mut self.rooms {
122       sessions.remove(&id);
123     }
124
125     // If the room doesn't exist yet
126     if self.rooms.get_mut(&room_id).is_none() {
127       self.rooms.insert(room_id, HashSet::new());
128     }
129
130     &self.rooms.get_mut(&room_id).unwrap().insert(id);
131   }
132
133   fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> {
134     use crate::db::*;
135     use crate::db::post_view::*;
136     let conn = establish_connection();
137     let posts = PostView::list(
138       &conn,
139       PostListingType::Community, 
140       &SortType::New, 
141       Some(*community_id), 
142       None,
143       None, 
144       None,
145       false,
146       false,
147       false,
148       None,
149       Some(9999))?;
150     for post in posts {
151       self.send_room_message(&post.id, message, skip_id);
152     }
153
154     Ok(())
155   }
156
157   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
158     self.check_rate_limit_full(id, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND)
159   }
160
161   fn check_rate_limit(&mut self, id: usize) -> Result<(), Error> {
162     self.check_rate_limit_full(id, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND)
163   }
164
165   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
166     if let Some(info) = self.sessions.get(&id) {
167       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
168         // The initial value
169         if rate_limit.allowance == -2f64 {
170           rate_limit.allowance = rate as f64;
171         };
172
173         let current = SystemTime::now();
174         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
175         rate_limit.last_checked = current;
176         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
177         if rate_limit.allowance > rate as f64 {
178           rate_limit.allowance = rate as f64;
179         }
180
181         if rate_limit.allowance < 1.0 {
182           println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance);
183           Err(APIError {
184             op: "Rate Limit".to_string(), 
185             message: format!("Too many requests. {} per {} seconds", rate, per),
186           })?
187         } else {
188           rate_limit.allowance -= 1.0;
189           Ok(())
190         }
191       } else {
192         Ok(())
193       }
194     } else {
195       Ok(())
196     }
197   }
198 }
199
200
201 /// Make actor from `ChatServer`
202 impl Actor for ChatServer {
203   /// We are going to use simple Context, we just need ability to communicate
204   /// with other actors.
205   type Context = Context<Self>;
206 }
207
208 /// Handler for Connect message.
209 ///
210 /// Register new session and assign unique id to this session
211 impl Handler<Connect> for ChatServer {
212   type Result = usize;
213
214   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
215
216     // notify all users in same room
217     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
218
219     // register session with random id
220     let id = self.rng.gen::<usize>();
221     println!("{} joined", &msg.ip);
222
223     self.sessions.insert(id, SessionInfo {
224       addr: msg.addr,
225       ip: msg.ip.to_owned(),
226     });
227
228     if self.rate_limits.get(&msg.ip).is_none() {
229       self.rate_limits.insert(msg.ip, RateLimitBucket {
230         last_checked: SystemTime::now(),
231         allowance: -2f64,
232       });
233     }
234
235     // for (k,v) in &self.rate_limits {
236     //   println!("{}: {:?}", k,v);
237     // }
238
239     // auto join session to Main room
240     // self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
241
242     // send id back
243     id
244   }
245 }
246
247 /// Handler for Disconnect message.
248 impl Handler<Disconnect> for ChatServer {
249   type Result = ();
250
251   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
252
253     // let mut rooms: Vec<i32> = Vec::new();
254
255     // remove address
256     if self.sessions.remove(&msg.id).is_some() {
257       // remove session from all rooms
258       for (_id, sessions) in &mut self.rooms {
259         if sessions.remove(&msg.id) {
260           // rooms.push(*id);
261         }
262       }
263     }
264   }
265 }
266
267 /// Handler for Message message.
268 impl Handler<StandardMessage> for ChatServer {
269   type Result = MessageResult<StandardMessage>;
270
271
272   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
273
274     let msg_out = match parse_json_message(self, msg) {
275       Ok(m) => m,
276       Err(e) => e.to_string()
277     };
278
279     MessageResult(msg_out)
280   }
281 }
282
283 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
284
285   let json: Value = serde_json::from_str(&msg.msg)?;
286   let data = &json["data"].to_string();
287   let op = &json["op"].as_str().unwrap();
288
289   let user_operation: UserOperation = UserOperation::from_str(&op)?;
290
291   match user_operation {
292     UserOperation::Login => {
293       let login: Login = serde_json::from_str(data)?;
294       let res = Oper::new(user_operation, login).perform()?;
295       Ok(serde_json::to_string(&res)?)
296     },
297     UserOperation::Register => {
298       chat.check_rate_limit_register(msg.id)?;
299       let register: Register = serde_json::from_str(data)?;
300       let res = Oper::new(user_operation, register).perform()?;
301       Ok(serde_json::to_string(&res)?)
302     },
303     UserOperation::GetUserDetails => {
304       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
305       let res = Oper::new(user_operation, get_user_details).perform()?;
306       Ok(serde_json::to_string(&res)?)
307     },
308     UserOperation::AddAdmin => {
309       let add_admin: AddAdmin = serde_json::from_str(data)?;
310       let res = Oper::new(user_operation, add_admin).perform()?;
311       Ok(serde_json::to_string(&res)?)
312     },
313     UserOperation::BanUser => {
314       let ban_user: BanUser = serde_json::from_str(data)?;
315       let res = Oper::new(user_operation, ban_user).perform()?;
316       Ok(serde_json::to_string(&res)?)
317     },
318     UserOperation::GetReplies => {
319       let get_replies: GetReplies = serde_json::from_str(data)?;
320       let res = Oper::new(user_operation, get_replies).perform()?;
321       Ok(serde_json::to_string(&res)?)
322     },
323     UserOperation::MarkAllAsRead => {
324       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
325       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
326       Ok(serde_json::to_string(&res)?)
327     },
328     UserOperation::GetCommunity => {
329       let get_community: GetCommunity = serde_json::from_str(data)?;
330       let res = Oper::new(user_operation, get_community).perform()?;
331       Ok(serde_json::to_string(&res)?)
332     },
333     UserOperation::ListCommunities => {
334       let list_communities: ListCommunities = serde_json::from_str(data)?;
335       let res = Oper::new(user_operation, list_communities).perform()?;
336       Ok(serde_json::to_string(&res)?)
337     },
338     UserOperation::CreateCommunity => {
339       chat.check_rate_limit_register(msg.id)?;
340       let create_community: CreateCommunity = serde_json::from_str(data)?;
341       let res = Oper::new(user_operation, create_community).perform()?;
342       Ok(serde_json::to_string(&res)?)
343     },
344     UserOperation::EditCommunity => {
345       let edit_community: EditCommunity = serde_json::from_str(data)?;
346       let res = Oper::new(user_operation, edit_community).perform()?;
347       let mut community_sent: CommunityResponse = res.clone();
348       community_sent.community.user_id = None;
349       community_sent.community.subscribed = None;
350       let community_sent_str = serde_json::to_string(&community_sent)?;
351       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
352       Ok(serde_json::to_string(&res)?)
353     },
354     UserOperation::FollowCommunity => {
355       let follow_community: FollowCommunity = serde_json::from_str(data)?;
356       let res = Oper::new(user_operation, follow_community).perform()?;
357       Ok(serde_json::to_string(&res)?)
358     },
359     UserOperation::GetFollowedCommunities => {
360       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
361       let res = Oper::new(user_operation, followed_communities).perform()?;
362       Ok(serde_json::to_string(&res)?)
363     },
364     UserOperation::BanFromCommunity => {
365       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
366       let community_id = ban_from_community.community_id;
367       let res = Oper::new(user_operation, ban_from_community).perform()?;
368       let res_str = serde_json::to_string(&res)?;
369       chat.send_community_message(&community_id, &res_str, msg.id)?;
370       Ok(res_str)
371     },
372     UserOperation::AddModToCommunity => {
373       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
374       let community_id = mod_add_to_community.community_id;
375       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
376       let res_str = serde_json::to_string(&res)?;
377       chat.send_community_message(&community_id, &res_str, msg.id)?;
378       Ok(res_str)
379     },
380     UserOperation::ListCategories => {
381       let list_categories: ListCategories = ListCategories;
382       let res = Oper::new(user_operation, list_categories).perform()?;
383       Ok(serde_json::to_string(&res)?)
384     },
385     UserOperation::CreatePost => {
386       chat.check_rate_limit_register(msg.id)?;
387       let create_post: CreatePost = serde_json::from_str(data)?;
388       let res = Oper::new(user_operation, create_post).perform()?;
389       Ok(serde_json::to_string(&res)?)
390     },
391     UserOperation::GetPost => {
392       let get_post: GetPost = serde_json::from_str(data)?;
393       chat.join_room(get_post.id, msg.id);
394       let res = Oper::new(user_operation, get_post).perform()?;
395       Ok(serde_json::to_string(&res)?)
396     },
397     UserOperation::GetPosts => {
398       let get_posts: GetPosts = serde_json::from_str(data)?;
399       let res = Oper::new(user_operation, get_posts).perform()?;
400       Ok(serde_json::to_string(&res)?)
401     },
402     UserOperation::CreatePostLike => {
403       chat.check_rate_limit(msg.id)?;
404       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
405       let res = Oper::new(user_operation, create_post_like).perform()?;
406       Ok(serde_json::to_string(&res)?)
407     },
408     UserOperation::EditPost => {
409       let edit_post: EditPost = serde_json::from_str(data)?;
410       let res = Oper::new(user_operation, edit_post).perform()?;
411       let mut post_sent = res.clone();
412       post_sent.post.my_vote = None;
413       let post_sent_str = serde_json::to_string(&post_sent)?;
414       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
415       Ok(serde_json::to_string(&res)?)
416     },
417     UserOperation::SavePost => {
418       let save_post: SavePost = serde_json::from_str(data)?;
419       let res = Oper::new(user_operation, save_post).perform()?;
420       Ok(serde_json::to_string(&res)?)
421     },
422     UserOperation::CreateComment => {
423       chat.check_rate_limit(msg.id)?;
424       let create_comment: CreateComment = serde_json::from_str(data)?;
425       let post_id = create_comment.post_id;
426       let res = Oper::new(user_operation, create_comment).perform()?;
427       let mut comment_sent = res.clone();
428       comment_sent.comment.my_vote = None;
429       comment_sent.comment.user_id = None;
430       let comment_sent_str = serde_json::to_string(&comment_sent)?;
431       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
432       Ok(serde_json::to_string(&res)?)
433     },
434     UserOperation::EditComment => {
435       let edit_comment: EditComment = serde_json::from_str(data)?;
436       let post_id = edit_comment.post_id;
437       let res = Oper::new(user_operation, edit_comment).perform()?;
438       let mut comment_sent = res.clone();
439       comment_sent.comment.my_vote = None;
440       comment_sent.comment.user_id = None;
441       let comment_sent_str = serde_json::to_string(&comment_sent)?;
442       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
443       Ok(serde_json::to_string(&res)?)
444     },
445     UserOperation::SaveComment => {
446       let save_comment: SaveComment = serde_json::from_str(data)?;
447       let res = Oper::new(user_operation, save_comment).perform()?;
448       Ok(serde_json::to_string(&res)?)
449     },
450     UserOperation::CreateCommentLike => {
451       chat.check_rate_limit(msg.id)?;
452       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
453       let post_id = create_comment_like.post_id;
454       let res = Oper::new(user_operation, create_comment_like).perform()?;
455       let mut comment_sent = res.clone();
456       comment_sent.comment.my_vote = None;
457       comment_sent.comment.user_id = None;
458       let comment_sent_str = serde_json::to_string(&comment_sent)?;
459       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
460       Ok(serde_json::to_string(&res)?)
461     },
462     UserOperation::GetModlog => {
463       let get_modlog: GetModlog = serde_json::from_str(data)?;
464       let res = Oper::new(user_operation, get_modlog).perform()?;
465       Ok(serde_json::to_string(&res)?)
466     },
467     UserOperation::CreateSite => {
468       let create_site: CreateSite = serde_json::from_str(data)?;
469       let res = Oper::new(user_operation, create_site).perform()?;
470       Ok(serde_json::to_string(&res)?)
471     },
472     UserOperation::EditSite => {
473       let edit_site: EditSite = serde_json::from_str(data)?;
474       let res = Oper::new(user_operation, edit_site).perform()?;
475       Ok(serde_json::to_string(&res)?)
476     },
477     UserOperation::GetSite => {
478       let get_site: GetSite = serde_json::from_str(data)?;
479       let res = Oper::new(user_operation, get_site).perform()?;
480       Ok(serde_json::to_string(&res)?)
481     },
482     UserOperation::Search => {
483       let search: Search = serde_json::from_str(data)?;
484       let res = Oper::new(user_operation, search).perform()?;
485       Ok(serde_json::to_string(&res)?)
486     },
487   }
488 }