]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'reorg' into dev
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use rand::{rngs::ThreadRng, Rng};
7 use std::collections::{HashMap, HashSet};
8 use serde::{Deserialize, Serialize};
9 use serde_json::{Value};
10 use std::str::FromStr;
11 use failure::Error;
12 use std::time::{SystemTime};
13
14 use api::*;
15 use api::user::*;
16 use api::community::*;
17 use api::post::*;
18 use api::comment::*;
19 use api::site::*;
20
21 const RATE_LIMIT_MESSAGES: i32 = 30;
22 const RATE_LIMIT_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_REGISTER_MESSAGES: i32 = 1;
24 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60;
25
26
27 /// Chat server sends this messages to session
28 #[derive(Message)]
29 pub struct WSMessage(pub String);
30
31 /// Message for chat server communications
32
33 /// New chat session is created
34 #[derive(Message)]
35 #[rtype(usize)]
36 pub struct Connect {
37   pub addr: Recipient<WSMessage>,
38   pub ip: String,
39 }
40
41 /// Session is disconnected
42 #[derive(Message)]
43 pub struct Disconnect {
44   pub id: usize,
45   pub ip: String,
46 }
47
48 /// Send message to specific room
49 #[derive(Message)]
50 pub struct ClientMessage {
51   /// Id of the client session
52   pub id: usize,
53   /// Peer message
54   pub msg: String,
55   /// Room name
56   pub room: String,
57 }
58
59 #[derive(Serialize, Deserialize)]
60 pub struct StandardMessage {
61   /// Id of the client session
62   pub id: usize,
63   /// Peer message
64   pub msg: String,
65 }
66
67 impl actix::Message for StandardMessage {
68   type Result = String;
69 }
70
71 #[derive(Debug)]
72 pub struct RateLimitBucket {
73   last_checked: SystemTime,
74   allowance: f64
75 }
76
77 pub struct SessionInfo {
78   pub addr: Recipient<WSMessage>,
79   pub ip: String,
80 }
81
82 /// `ChatServer` manages chat rooms and responsible for coordinating chat
83 /// session. implementation is super primitive
84 pub struct ChatServer {
85   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
86   rate_limits: HashMap<String, RateLimitBucket>,
87   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
88   rng: ThreadRng,
89 }
90
91 impl Default for ChatServer {
92   fn default() -> ChatServer {
93     // default room
94     let rooms = HashMap::new();
95
96     ChatServer {
97       sessions: HashMap::new(),
98       rate_limits: HashMap::new(),
99       rooms: rooms,
100       rng: rand::thread_rng(),
101     }
102   }
103 }
104
105 impl ChatServer {
106   /// Send message to all users in the room
107   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
108     if let Some(sessions) = self.rooms.get(room) {
109       for id in sessions {
110         if *id != skip_id {
111           if let Some(info) = self.sessions.get(id) {
112             let _ = info.addr.do_send(WSMessage(message.to_owned()));
113           }
114         }
115       }
116     }
117   }
118
119   fn join_room(&mut self, room_id: i32, id: usize) {
120     // remove session from all rooms
121     for (_n, mut sessions) in &mut self.rooms {
122       sessions.remove(&id);
123     }
124
125     // If the room doesn't exist yet
126     if self.rooms.get_mut(&room_id).is_none() {
127       self.rooms.insert(room_id, HashSet::new());
128     }
129
130     &self.rooms.get_mut(&room_id).unwrap().insert(id);
131   }
132
133   fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> {
134     use db::*;
135     use db::post_view::*;
136     let conn = establish_connection();
137     let posts = PostView::list(&conn,
138                                PostListingType::Community, 
139                                &SortType::New, 
140                                Some(*community_id), 
141                                None,
142                                None, 
143                                None,
144                                false,
145                                false,
146                                None,
147                                Some(9999))?;
148     for post in posts {
149       self.send_room_message(&post.id, message, skip_id);
150     }
151
152     Ok(())
153   }
154
155   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
156     self.check_rate_limit_full(id, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND)
157   }
158
159   fn check_rate_limit(&mut self, id: usize) -> Result<(), Error> {
160     self.check_rate_limit_full(id, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND)
161   }
162
163   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
164     if let Some(info) = self.sessions.get(&id) {
165       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
166         // The initial value
167         if rate_limit.allowance == -2f64 {
168           rate_limit.allowance = rate as f64;
169         };
170
171         let current = SystemTime::now();
172         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
173         rate_limit.last_checked = current;
174         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
175         if rate_limit.allowance > rate as f64 {
176           rate_limit.allowance = rate as f64;
177         }
178
179         if rate_limit.allowance < 1.0 {
180           println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance);
181           Err(APIError {
182             op: "Rate Limit".to_string(), 
183             message: format!("Too many requests. {} per {} seconds", rate, per),
184           })?
185         } else {
186           rate_limit.allowance -= 1.0;
187           Ok(())
188         }
189       } else {
190         Ok(())
191       }
192     } else {
193       Ok(())
194     }
195   }
196 }
197
198
199 /// Make actor from `ChatServer`
200 impl Actor for ChatServer {
201   /// We are going to use simple Context, we just need ability to communicate
202   /// with other actors.
203   type Context = Context<Self>;
204 }
205
206 /// Handler for Connect message.
207 ///
208 /// Register new session and assign unique id to this session
209 impl Handler<Connect> for ChatServer {
210   type Result = usize;
211
212   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
213
214     // notify all users in same room
215     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
216
217     // register session with random id
218     let id = self.rng.gen::<usize>();
219     println!("{} joined", &msg.ip);
220
221     self.sessions.insert(id, SessionInfo {
222       addr: msg.addr,
223       ip: msg.ip.to_owned(),
224     });
225
226     if self.rate_limits.get(&msg.ip).is_none() {
227       self.rate_limits.insert(msg.ip, RateLimitBucket {
228         last_checked: SystemTime::now(),
229         allowance: -2f64,
230       });
231     }
232
233     // for (k,v) in &self.rate_limits {
234     //   println!("{}: {:?}", k,v);
235     // }
236
237     // auto join session to Main room
238     // self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
239
240     // send id back
241     id
242   }
243 }
244
245 /// Handler for Disconnect message.
246 impl Handler<Disconnect> for ChatServer {
247   type Result = ();
248
249   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
250
251     // let mut rooms: Vec<i32> = Vec::new();
252
253     // remove address
254     if self.sessions.remove(&msg.id).is_some() {
255       // remove session from all rooms
256       for (_id, sessions) in &mut self.rooms {
257         if sessions.remove(&msg.id) {
258           // rooms.push(*id);
259         }
260       }
261     }
262   }
263 }
264
265 /// Handler for Message message.
266 impl Handler<StandardMessage> for ChatServer {
267   type Result = MessageResult<StandardMessage>;
268
269
270   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
271
272     let msg_out = match parse_json_message(self, msg) {
273       Ok(m) => m,
274       Err(e) => e.to_string()
275     };
276
277     MessageResult(msg_out)
278   }
279 }
280
281 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
282
283   let json: Value = serde_json::from_str(&msg.msg)?;
284   let data = &json["data"].to_string();
285   let op = &json["op"].as_str().unwrap();
286
287   let user_operation: UserOperation = UserOperation::from_str(&op)?;
288
289   match user_operation {
290     UserOperation::Login => {
291       let login: Login = serde_json::from_str(data)?;
292       let res = Oper::new(user_operation, login).perform()?;
293       Ok(serde_json::to_string(&res)?)
294     },
295     UserOperation::Register => {
296       chat.check_rate_limit_register(msg.id)?;
297       let register: Register = serde_json::from_str(data)?;
298       let res = Oper::new(user_operation, register).perform()?;
299       Ok(serde_json::to_string(&res)?)
300     },
301     UserOperation::GetUserDetails => {
302       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
303       let res = Oper::new(user_operation, get_user_details).perform()?;
304       Ok(serde_json::to_string(&res)?)
305     },
306     UserOperation::AddAdmin => {
307       let add_admin: AddAdmin = serde_json::from_str(data)?;
308       let res = Oper::new(user_operation, add_admin).perform()?;
309       Ok(serde_json::to_string(&res)?)
310     },
311     UserOperation::BanUser => {
312       let ban_user: BanUser = serde_json::from_str(data)?;
313       let res = Oper::new(user_operation, ban_user).perform()?;
314       Ok(serde_json::to_string(&res)?)
315     },
316     UserOperation::GetReplies => {
317       let get_replies: GetReplies = serde_json::from_str(data)?;
318       let res = Oper::new(user_operation, get_replies).perform()?;
319       Ok(serde_json::to_string(&res)?)
320     },
321     UserOperation::MarkAllAsRead => {
322       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
323       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
324       Ok(serde_json::to_string(&res)?)
325     },
326     UserOperation::GetCommunity => {
327       let get_community: GetCommunity = serde_json::from_str(data)?;
328       let res = Oper::new(user_operation, get_community).perform()?;
329       Ok(serde_json::to_string(&res)?)
330     },
331     UserOperation::ListCommunities => {
332       let list_communities: ListCommunities = serde_json::from_str(data)?;
333       let res = Oper::new(user_operation, list_communities).perform()?;
334       Ok(serde_json::to_string(&res)?)
335     },
336     UserOperation::CreateCommunity => {
337       chat.check_rate_limit_register(msg.id)?;
338       let create_community: CreateCommunity = serde_json::from_str(data)?;
339       let res = Oper::new(user_operation, create_community).perform()?;
340       Ok(serde_json::to_string(&res)?)
341     },
342     UserOperation::EditCommunity => {
343       let edit_community: EditCommunity = serde_json::from_str(data)?;
344       let res = Oper::new(user_operation, edit_community).perform()?;
345       let mut community_sent: CommunityResponse = res.clone();
346       community_sent.community.user_id = None;
347       community_sent.community.subscribed = None;
348       let community_sent_str = serde_json::to_string(&community_sent)?;
349       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
350       Ok(serde_json::to_string(&res)?)
351     },
352     UserOperation::FollowCommunity => {
353       let follow_community: FollowCommunity = serde_json::from_str(data)?;
354       let res = Oper::new(user_operation, follow_community).perform()?;
355       Ok(serde_json::to_string(&res)?)
356     },
357     UserOperation::GetFollowedCommunities => {
358       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
359       let res = Oper::new(user_operation, followed_communities).perform()?;
360       Ok(serde_json::to_string(&res)?)
361     },
362     UserOperation::BanFromCommunity => {
363       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
364       let community_id = ban_from_community.community_id;
365       let res = Oper::new(user_operation, ban_from_community).perform()?;
366       let res_str = serde_json::to_string(&res)?;
367       chat.send_community_message(&community_id, &res_str, msg.id)?;
368       Ok(res_str)
369     },
370     UserOperation::AddModToCommunity => {
371       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
372       let community_id = mod_add_to_community.community_id;
373       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
374       let res_str = serde_json::to_string(&res)?;
375       chat.send_community_message(&community_id, &res_str, msg.id)?;
376       Ok(res_str)
377     },
378     UserOperation::ListCategories => {
379       let list_categories: ListCategories = ListCategories;
380       let res = Oper::new(user_operation, list_categories).perform()?;
381       Ok(serde_json::to_string(&res)?)
382     },
383     UserOperation::CreatePost => {
384       chat.check_rate_limit_register(msg.id)?;
385       let create_post: CreatePost = serde_json::from_str(data)?;
386       let res = Oper::new(user_operation, create_post).perform()?;
387       Ok(serde_json::to_string(&res)?)
388     },
389     UserOperation::GetPost => {
390       let get_post: GetPost = serde_json::from_str(data)?;
391       chat.join_room(get_post.id, msg.id);
392       let res = Oper::new(user_operation, get_post).perform()?;
393       Ok(serde_json::to_string(&res)?)
394     },
395     UserOperation::GetPosts => {
396       let get_posts: GetPosts = serde_json::from_str(data)?;
397       let res = Oper::new(user_operation, get_posts).perform()?;
398       Ok(serde_json::to_string(&res)?)
399     },
400     UserOperation::CreatePostLike => {
401       chat.check_rate_limit(msg.id)?;
402       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
403       let res = Oper::new(user_operation, create_post_like).perform()?;
404       Ok(serde_json::to_string(&res)?)
405     },
406     UserOperation::EditPost => {
407       let edit_post: EditPost = serde_json::from_str(data)?;
408       let res = Oper::new(user_operation, edit_post).perform()?;
409       let mut post_sent = res.clone();
410       post_sent.post.my_vote = None;
411       let post_sent_str = serde_json::to_string(&post_sent)?;
412       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
413       Ok(serde_json::to_string(&res)?)
414     },
415     UserOperation::SavePost => {
416       let save_post: SavePost = serde_json::from_str(data)?;
417       let res = Oper::new(user_operation, save_post).perform()?;
418       Ok(serde_json::to_string(&res)?)
419     },
420     UserOperation::CreateComment => {
421       chat.check_rate_limit(msg.id)?;
422       let create_comment: CreateComment = serde_json::from_str(data)?;
423       let post_id = create_comment.post_id;
424       let res = Oper::new(user_operation, create_comment).perform()?;
425       let mut comment_sent = res.clone();
426       comment_sent.comment.my_vote = None;
427       comment_sent.comment.user_id = None;
428       let comment_sent_str = serde_json::to_string(&comment_sent)?;
429       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
430       Ok(serde_json::to_string(&res)?)
431     },
432     UserOperation::EditComment => {
433       let edit_comment: EditComment = serde_json::from_str(data)?;
434       let post_id = edit_comment.post_id;
435       let res = Oper::new(user_operation, edit_comment).perform()?;
436       let mut comment_sent = res.clone();
437       comment_sent.comment.my_vote = None;
438       comment_sent.comment.user_id = None;
439       let comment_sent_str = serde_json::to_string(&comment_sent)?;
440       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
441       Ok(serde_json::to_string(&res)?)
442     },
443     UserOperation::SaveComment => {
444       let save_comment: SaveComment = serde_json::from_str(data)?;
445       let res = Oper::new(user_operation, save_comment).perform()?;
446       Ok(serde_json::to_string(&res)?)
447     },
448     UserOperation::CreateCommentLike => {
449       chat.check_rate_limit(msg.id)?;
450       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
451       let post_id = create_comment_like.post_id;
452       let res = Oper::new(user_operation, create_comment_like).perform()?;
453       let mut comment_sent = res.clone();
454       comment_sent.comment.my_vote = None;
455       comment_sent.comment.user_id = None;
456       let comment_sent_str = serde_json::to_string(&comment_sent)?;
457       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
458       Ok(serde_json::to_string(&res)?)
459     },
460     UserOperation::GetModlog => {
461       let get_modlog: GetModlog = serde_json::from_str(data)?;
462       let res = Oper::new(user_operation, get_modlog).perform()?;
463       Ok(serde_json::to_string(&res)?)
464     },
465     UserOperation::CreateSite => {
466       let create_site: CreateSite = serde_json::from_str(data)?;
467       let res = Oper::new(user_operation, create_site).perform()?;
468       Ok(serde_json::to_string(&res)?)
469     },
470     UserOperation::EditSite => {
471       let edit_site: EditSite = serde_json::from_str(data)?;
472       let res = Oper::new(user_operation, edit_site).perform()?;
473       Ok(serde_json::to_string(&res)?)
474     },
475     UserOperation::GetSite => {
476       let get_site: GetSite = serde_json::from_str(data)?;
477       let res = Oper::new(user_operation, get_site).perform()?;
478       Ok(serde_json::to_string(&res)?)
479     },
480     UserOperation::Search => {
481       let search: Search = serde_json::from_str(data)?;
482       let res = Oper::new(user_operation, search).perform()?;
483       Ok(serde_json::to_string(&res)?)
484     },
485   }
486 }