]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Fixing crash on unknown op type.
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use rand::{rngs::ThreadRng, Rng};
7 use std::collections::{HashMap, HashSet};
8 use serde::{Deserialize, Serialize};
9 use serde_json::{Value};
10 use std::str::FromStr;
11 use failure::Error;
12 use std::time::{SystemTime};
13
14 use crate::api::*;
15 use crate::api::user::*;
16 use crate::api::community::*;
17 use crate::api::post::*;
18 use crate::api::comment::*;
19 use crate::api::site::*;
20
21 const RATE_LIMIT_MESSAGES: i32 = 30;
22 const RATE_LIMIT_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_REGISTER_MESSAGES: i32 = 3;
24 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60*3;
25
26
27 /// Chat server sends this messages to session
28 #[derive(Message)]
29 pub struct WSMessage(pub String);
30
31 /// Message for chat server communications
32
33 /// New chat session is created
34 #[derive(Message)]
35 #[rtype(usize)]
36 pub struct Connect {
37   pub addr: Recipient<WSMessage>,
38   pub ip: String,
39 }
40
41 /// Session is disconnected
42 #[derive(Message)]
43 pub struct Disconnect {
44   pub id: usize,
45   pub ip: String,
46 }
47
48 /// Send message to specific room
49 #[derive(Message)]
50 pub struct ClientMessage {
51   /// Id of the client session
52   pub id: usize,
53   /// Peer message
54   pub msg: String,
55   /// Room name
56   pub room: String,
57 }
58
59 #[derive(Serialize, Deserialize)]
60 pub struct StandardMessage {
61   /// Id of the client session
62   pub id: usize,
63   /// Peer message
64   pub msg: String,
65 }
66
67 impl actix::Message for StandardMessage {
68   type Result = String;
69 }
70
71 #[derive(Debug)]
72 pub struct RateLimitBucket {
73   last_checked: SystemTime,
74   allowance: f64
75 }
76
77 pub struct SessionInfo {
78   pub addr: Recipient<WSMessage>,
79   pub ip: String,
80 }
81
82 /// `ChatServer` manages chat rooms and responsible for coordinating chat
83 /// session. implementation is super primitive
84 pub struct ChatServer {
85   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
86   rate_limits: HashMap<String, RateLimitBucket>,
87   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
88   rng: ThreadRng,
89 }
90
91 impl Default for ChatServer {
92   fn default() -> ChatServer {
93     // default room
94     let rooms = HashMap::new();
95
96     ChatServer {
97       sessions: HashMap::new(),
98       rate_limits: HashMap::new(),
99       rooms: rooms,
100       rng: rand::thread_rng(),
101     }
102   }
103 }
104
105 impl ChatServer {
106   /// Send message to all users in the room
107   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
108     if let Some(sessions) = self.rooms.get(room) {
109       for id in sessions {
110         if *id != skip_id {
111           if let Some(info) = self.sessions.get(id) {
112             let _ = info.addr.do_send(WSMessage(message.to_owned()));
113           }
114         }
115       }
116     }
117   }
118
119   fn join_room(&mut self, room_id: i32, id: usize) {
120     // remove session from all rooms
121     for (_n, sessions) in &mut self.rooms {
122       sessions.remove(&id);
123     }
124
125     // If the room doesn't exist yet
126     if self.rooms.get_mut(&room_id).is_none() {
127       self.rooms.insert(room_id, HashSet::new());
128     }
129
130     &self.rooms.get_mut(&room_id).unwrap().insert(id);
131   }
132
133   fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> {
134     use crate::db::*;
135     use crate::db::post_view::*;
136     let conn = establish_connection();
137     let posts = PostView::list(
138       &conn,
139       PostListingType::Community, 
140       &SortType::New, 
141       Some(*community_id), 
142       None,
143       None, 
144       None,
145       None,
146       false,
147       false,
148       false,
149       None,
150       Some(9999))?;
151     for post in posts {
152       self.send_room_message(&post.id, message, skip_id);
153     }
154
155     Ok(())
156   }
157
158   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
159     self.check_rate_limit_full(id, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND)
160   }
161
162   fn check_rate_limit(&mut self, id: usize) -> Result<(), Error> {
163     self.check_rate_limit_full(id, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND)
164   }
165
166   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
167     if let Some(info) = self.sessions.get(&id) {
168       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
169         // The initial value
170         if rate_limit.allowance == -2f64 {
171           rate_limit.allowance = rate as f64;
172         };
173
174         let current = SystemTime::now();
175         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
176         rate_limit.last_checked = current;
177         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
178         if rate_limit.allowance > rate as f64 {
179           rate_limit.allowance = rate as f64;
180         }
181
182         if rate_limit.allowance < 1.0 {
183           println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance);
184           Err(APIError {
185             op: "Rate Limit".to_string(), 
186             message: format!("Too many requests. {} per {} seconds", rate, per),
187           })?
188         } else {
189           rate_limit.allowance -= 1.0;
190           Ok(())
191         }
192       } else {
193         Ok(())
194       }
195     } else {
196       Ok(())
197     }
198   }
199 }
200
201
202 /// Make actor from `ChatServer`
203 impl Actor for ChatServer {
204   /// We are going to use simple Context, we just need ability to communicate
205   /// with other actors.
206   type Context = Context<Self>;
207 }
208
209 /// Handler for Connect message.
210 ///
211 /// Register new session and assign unique id to this session
212 impl Handler<Connect> for ChatServer {
213   type Result = usize;
214
215   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
216
217     // notify all users in same room
218     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
219
220     // register session with random id
221     let id = self.rng.gen::<usize>();
222     println!("{} joined", &msg.ip);
223
224     self.sessions.insert(id, SessionInfo {
225       addr: msg.addr,
226       ip: msg.ip.to_owned(),
227     });
228
229     if self.rate_limits.get(&msg.ip).is_none() {
230       self.rate_limits.insert(msg.ip, RateLimitBucket {
231         last_checked: SystemTime::now(),
232         allowance: -2f64,
233       });
234     }
235
236     id
237   }
238 }
239
240 /// Handler for Disconnect message.
241 impl Handler<Disconnect> for ChatServer {
242   type Result = ();
243
244   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
245
246     // let mut rooms: Vec<i32> = Vec::new();
247
248     // remove address
249     if self.sessions.remove(&msg.id).is_some() {
250       // remove session from all rooms
251       for (_id, sessions) in &mut self.rooms {
252         if sessions.remove(&msg.id) {
253           // rooms.push(*id);
254         }
255       }
256     }
257   }
258 }
259
260 /// Handler for Message message.
261 impl Handler<StandardMessage> for ChatServer {
262   type Result = MessageResult<StandardMessage>;
263
264
265   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
266
267     let msg_out = match parse_json_message(self, msg) {
268       Ok(m) => m,
269       Err(e) => e.to_string()
270     };
271
272     MessageResult(msg_out)
273   }
274 }
275
276 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
277
278   let json: Value = serde_json::from_str(&msg.msg)?;
279   let data = &json["data"].to_string();
280   let op = &json["op"].as_str().ok_or(APIError {
281     op: "Unknown op type".to_string(), 
282     message: format!("Unknown op type"),
283   })?;
284
285   let user_operation: UserOperation = UserOperation::from_str(&op)?;
286
287   match user_operation {
288     UserOperation::Login => {
289       let login: Login = serde_json::from_str(data)?;
290       let res = Oper::new(user_operation, login).perform()?;
291       Ok(serde_json::to_string(&res)?)
292     },
293     UserOperation::Register => {
294       chat.check_rate_limit_register(msg.id)?;
295       let register: Register = serde_json::from_str(data)?;
296       let res = Oper::new(user_operation, register).perform()?;
297       Ok(serde_json::to_string(&res)?)
298     },
299     UserOperation::GetUserDetails => {
300       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
301       let res = Oper::new(user_operation, get_user_details).perform()?;
302       Ok(serde_json::to_string(&res)?)
303     },
304     UserOperation::SaveUserSettings => {
305       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
306       let res = Oper::new(user_operation, save_user_settings).perform()?;
307       Ok(serde_json::to_string(&res)?)
308     },
309     UserOperation::AddAdmin => {
310       let add_admin: AddAdmin = serde_json::from_str(data)?;
311       let res = Oper::new(user_operation, add_admin).perform()?;
312       Ok(serde_json::to_string(&res)?)
313     },
314     UserOperation::BanUser => {
315       let ban_user: BanUser = serde_json::from_str(data)?;
316       let res = Oper::new(user_operation, ban_user).perform()?;
317       Ok(serde_json::to_string(&res)?)
318     },
319     UserOperation::GetReplies => {
320       let get_replies: GetReplies = serde_json::from_str(data)?;
321       let res = Oper::new(user_operation, get_replies).perform()?;
322       Ok(serde_json::to_string(&res)?)
323     },
324     UserOperation::MarkAllAsRead => {
325       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
326       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
327       Ok(serde_json::to_string(&res)?)
328     },
329     UserOperation::GetCommunity => {
330       let get_community: GetCommunity = serde_json::from_str(data)?;
331       let res = Oper::new(user_operation, get_community).perform()?;
332       Ok(serde_json::to_string(&res)?)
333     },
334     UserOperation::ListCommunities => {
335       let list_communities: ListCommunities = serde_json::from_str(data)?;
336       let res = Oper::new(user_operation, list_communities).perform()?;
337       Ok(serde_json::to_string(&res)?)
338     },
339     UserOperation::CreateCommunity => {
340       chat.check_rate_limit_register(msg.id)?;
341       let create_community: CreateCommunity = serde_json::from_str(data)?;
342       let res = Oper::new(user_operation, create_community).perform()?;
343       Ok(serde_json::to_string(&res)?)
344     },
345     UserOperation::EditCommunity => {
346       let edit_community: EditCommunity = serde_json::from_str(data)?;
347       let res = Oper::new(user_operation, edit_community).perform()?;
348       let mut community_sent: CommunityResponse = res.clone();
349       community_sent.community.user_id = None;
350       community_sent.community.subscribed = None;
351       let community_sent_str = serde_json::to_string(&community_sent)?;
352       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
353       Ok(serde_json::to_string(&res)?)
354     },
355     UserOperation::FollowCommunity => {
356       let follow_community: FollowCommunity = serde_json::from_str(data)?;
357       let res = Oper::new(user_operation, follow_community).perform()?;
358       Ok(serde_json::to_string(&res)?)
359     },
360     UserOperation::GetFollowedCommunities => {
361       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
362       let res = Oper::new(user_operation, followed_communities).perform()?;
363       Ok(serde_json::to_string(&res)?)
364     },
365     UserOperation::BanFromCommunity => {
366       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
367       let community_id = ban_from_community.community_id;
368       let res = Oper::new(user_operation, ban_from_community).perform()?;
369       let res_str = serde_json::to_string(&res)?;
370       chat.send_community_message(&community_id, &res_str, msg.id)?;
371       Ok(res_str)
372     },
373     UserOperation::AddModToCommunity => {
374       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
375       let community_id = mod_add_to_community.community_id;
376       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
377       let res_str = serde_json::to_string(&res)?;
378       chat.send_community_message(&community_id, &res_str, msg.id)?;
379       Ok(res_str)
380     },
381     UserOperation::ListCategories => {
382       let list_categories: ListCategories = ListCategories;
383       let res = Oper::new(user_operation, list_categories).perform()?;
384       Ok(serde_json::to_string(&res)?)
385     },
386     UserOperation::CreatePost => {
387       chat.check_rate_limit_register(msg.id)?;
388       let create_post: CreatePost = serde_json::from_str(data)?;
389       let res = Oper::new(user_operation, create_post).perform()?;
390       Ok(serde_json::to_string(&res)?)
391     },
392     UserOperation::GetPost => {
393       let get_post: GetPost = serde_json::from_str(data)?;
394       chat.join_room(get_post.id, msg.id);
395       let res = Oper::new(user_operation, get_post).perform()?;
396       Ok(serde_json::to_string(&res)?)
397     },
398     UserOperation::GetPosts => {
399       let get_posts: GetPosts = serde_json::from_str(data)?;
400       let res = Oper::new(user_operation, get_posts).perform()?;
401       Ok(serde_json::to_string(&res)?)
402     },
403     UserOperation::CreatePostLike => {
404       chat.check_rate_limit(msg.id)?;
405       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
406       let res = Oper::new(user_operation, create_post_like).perform()?;
407       Ok(serde_json::to_string(&res)?)
408     },
409     UserOperation::EditPost => {
410       let edit_post: EditPost = serde_json::from_str(data)?;
411       let res = Oper::new(user_operation, edit_post).perform()?;
412       let mut post_sent = res.clone();
413       post_sent.post.my_vote = None;
414       let post_sent_str = serde_json::to_string(&post_sent)?;
415       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
416       Ok(serde_json::to_string(&res)?)
417     },
418     UserOperation::SavePost => {
419       let save_post: SavePost = serde_json::from_str(data)?;
420       let res = Oper::new(user_operation, save_post).perform()?;
421       Ok(serde_json::to_string(&res)?)
422     },
423     UserOperation::CreateComment => {
424       chat.check_rate_limit(msg.id)?;
425       let create_comment: CreateComment = serde_json::from_str(data)?;
426       let post_id = create_comment.post_id;
427       let res = Oper::new(user_operation, create_comment).perform()?;
428       let mut comment_sent = res.clone();
429       comment_sent.comment.my_vote = None;
430       comment_sent.comment.user_id = None;
431       let comment_sent_str = serde_json::to_string(&comment_sent)?;
432       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
433       Ok(serde_json::to_string(&res)?)
434     },
435     UserOperation::EditComment => {
436       let edit_comment: EditComment = serde_json::from_str(data)?;
437       let post_id = edit_comment.post_id;
438       let res = Oper::new(user_operation, edit_comment).perform()?;
439       let mut comment_sent = res.clone();
440       comment_sent.comment.my_vote = None;
441       comment_sent.comment.user_id = None;
442       let comment_sent_str = serde_json::to_string(&comment_sent)?;
443       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
444       Ok(serde_json::to_string(&res)?)
445     },
446     UserOperation::SaveComment => {
447       let save_comment: SaveComment = serde_json::from_str(data)?;
448       let res = Oper::new(user_operation, save_comment).perform()?;
449       Ok(serde_json::to_string(&res)?)
450     },
451     UserOperation::CreateCommentLike => {
452       chat.check_rate_limit(msg.id)?;
453       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
454       let post_id = create_comment_like.post_id;
455       let res = Oper::new(user_operation, create_comment_like).perform()?;
456       let mut comment_sent = res.clone();
457       comment_sent.comment.my_vote = None;
458       comment_sent.comment.user_id = None;
459       let comment_sent_str = serde_json::to_string(&comment_sent)?;
460       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
461       Ok(serde_json::to_string(&res)?)
462     },
463     UserOperation::GetModlog => {
464       let get_modlog: GetModlog = serde_json::from_str(data)?;
465       let res = Oper::new(user_operation, get_modlog).perform()?;
466       Ok(serde_json::to_string(&res)?)
467     },
468     UserOperation::CreateSite => {
469       let create_site: CreateSite = serde_json::from_str(data)?;
470       let res = Oper::new(user_operation, create_site).perform()?;
471       Ok(serde_json::to_string(&res)?)
472     },
473     UserOperation::EditSite => {
474       let edit_site: EditSite = serde_json::from_str(data)?;
475       let res = Oper::new(user_operation, edit_site).perform()?;
476       Ok(serde_json::to_string(&res)?)
477     },
478     UserOperation::GetSite => {
479       let get_site: GetSite = serde_json::from_str(data)?;
480       let res = Oper::new(user_operation, get_site).perform()?;
481       Ok(serde_json::to_string(&res)?)
482     },
483     UserOperation::Search => {
484       let search: Search = serde_json::from_str(data)?;
485       let res = Oper::new(user_operation, search).perform()?;
486       Ok(serde_json::to_string(&res)?)
487     },
488     UserOperation::TransferCommunity => {
489       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
490       let res = Oper::new(user_operation, transfer_community).perform()?;
491       Ok(serde_json::to_string(&res)?)
492     },
493     UserOperation::TransferSite => {
494       let transfer_site: TransferSite = serde_json::from_str(data)?;
495       let res = Oper::new(user_operation, transfer_site).perform()?;
496       Ok(serde_json::to_string(&res)?)
497     },
498   }
499 }