]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Upping post timeout.
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use failure::Error;
7 use rand::{rngs::ThreadRng, Rng};
8 use serde::{Deserialize, Serialize};
9 use serde_json::Value;
10 use std::collections::{HashMap, HashSet};
11 use std::str::FromStr;
12 use std::time::SystemTime;
13
14 use crate::api::comment::*;
15 use crate::api::community::*;
16 use crate::api::post::*;
17 use crate::api::site::*;
18 use crate::api::user::*;
19 use crate::api::*;
20
21 const RATE_LIMIT_MESSAGE: i32 = 30;
22 const RATE_LIMIT_MESSAGES_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_POST: i32 = 3;
24 const RATE_LIMIT_POSTS_PER_SECOND: i32 = 60 * 10;
25 const RATE_LIMIT_REGISTER: i32 = 1;
26 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60 * 60;
27
28 /// Chat server sends this messages to session
29 #[derive(Message)]
30 pub struct WSMessage(pub String);
31
32 /// Message for chat server communications
33
34 /// New chat session is created
35 #[derive(Message)]
36 #[rtype(usize)]
37 pub struct Connect {
38   pub addr: Recipient<WSMessage>,
39   pub ip: String,
40 }
41
42 /// Session is disconnected
43 #[derive(Message)]
44 pub struct Disconnect {
45   pub id: usize,
46   pub ip: String,
47 }
48
49 /// Send message to specific room
50 #[derive(Message)]
51 pub struct ClientMessage {
52   /// Id of the client session
53   pub id: usize,
54   /// Peer message
55   pub msg: String,
56   /// Room name
57   pub room: String,
58 }
59
60 #[derive(Serialize, Deserialize)]
61 pub struct StandardMessage {
62   /// Id of the client session
63   pub id: usize,
64   /// Peer message
65   pub msg: String,
66 }
67
68 impl actix::Message for StandardMessage {
69   type Result = String;
70 }
71
72 #[derive(Debug)]
73 pub struct RateLimitBucket {
74   last_checked: SystemTime,
75   allowance: f64,
76 }
77
78 pub struct SessionInfo {
79   pub addr: Recipient<WSMessage>,
80   pub ip: String,
81 }
82
83 /// `ChatServer` manages chat rooms and responsible for coordinating chat
84 /// session. implementation is super primitive
85 pub struct ChatServer {
86   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
87   rate_limits: HashMap<String, RateLimitBucket>,
88   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
89   rng: ThreadRng,
90 }
91
92 impl Default for ChatServer {
93   fn default() -> ChatServer {
94     // default room
95     let rooms = HashMap::new();
96
97     ChatServer {
98       sessions: HashMap::new(),
99       rate_limits: HashMap::new(),
100       rooms: rooms,
101       rng: rand::thread_rng(),
102     }
103   }
104 }
105
106 impl ChatServer {
107   /// Send message to all users in the room
108   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
109     if let Some(sessions) = self.rooms.get(room) {
110       for id in sessions {
111         if *id != skip_id {
112           if let Some(info) = self.sessions.get(id) {
113             let _ = info.addr.do_send(WSMessage(message.to_owned()));
114           }
115         }
116       }
117     }
118   }
119
120   fn join_room(&mut self, room_id: i32, id: usize) {
121     // remove session from all rooms
122     for (_n, sessions) in &mut self.rooms {
123       sessions.remove(&id);
124     }
125
126     // If the room doesn't exist yet
127     if self.rooms.get_mut(&room_id).is_none() {
128       self.rooms.insert(room_id, HashSet::new());
129     }
130
131     &self.rooms.get_mut(&room_id).unwrap().insert(id);
132   }
133
134   fn send_community_message(
135     &self,
136     community_id: &i32,
137     message: &str,
138     skip_id: usize,
139   ) -> Result<(), Error> {
140     use crate::db::post_view::*;
141     use crate::db::*;
142     let conn = establish_connection();
143     let posts = PostView::list(
144       &conn,
145       PostListingType::Community,
146       &SortType::New,
147       Some(*community_id),
148       None,
149       None,
150       None,
151       None,
152       false,
153       false,
154       false,
155       None,
156       Some(9999),
157     )?;
158     for post in posts {
159       self.send_room_message(&post.id, message, skip_id);
160     }
161
162     Ok(())
163   }
164
165   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
166     self.check_rate_limit_full(id, RATE_LIMIT_REGISTER, RATE_LIMIT_REGISTER_PER_SECOND)
167   }
168
169   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
170     self.check_rate_limit_full(id, RATE_LIMIT_POST, RATE_LIMIT_POSTS_PER_SECOND)
171   }
172
173   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
174     self.check_rate_limit_full(id, RATE_LIMIT_MESSAGE, RATE_LIMIT_MESSAGES_PER_SECOND)
175   }
176
177   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
178     if let Some(info) = self.sessions.get(&id) {
179       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
180         // The initial value
181         if rate_limit.allowance == -2f64 {
182           rate_limit.allowance = rate as f64;
183         };
184
185         let current = SystemTime::now();
186         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
187         rate_limit.last_checked = current;
188         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
189         if rate_limit.allowance > rate as f64 {
190           rate_limit.allowance = rate as f64;
191         }
192
193         if rate_limit.allowance < 1.0 {
194           println!(
195             "Rate limited IP: {}, time_passed: {}, allowance: {}",
196             &info.ip, time_passed, rate_limit.allowance
197           );
198           Err(APIError {
199             op: "Rate Limit".to_string(),
200             message: format!("Too many requests. {} per {} seconds", rate, per),
201           })?
202         } else {
203           rate_limit.allowance -= 1.0;
204           Ok(())
205         }
206       } else {
207         Ok(())
208       }
209     } else {
210       Ok(())
211     }
212   }
213 }
214
215 /// Make actor from `ChatServer`
216 impl Actor for ChatServer {
217   /// We are going to use simple Context, we just need ability to communicate
218   /// with other actors.
219   type Context = Context<Self>;
220 }
221
222 /// Handler for Connect message.
223 ///
224 /// Register new session and assign unique id to this session
225 impl Handler<Connect> for ChatServer {
226   type Result = usize;
227
228   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
229     // notify all users in same room
230     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
231
232     // register session with random id
233     let id = self.rng.gen::<usize>();
234     println!("{} joined", &msg.ip);
235
236     self.sessions.insert(
237       id,
238       SessionInfo {
239         addr: msg.addr,
240         ip: msg.ip.to_owned(),
241       },
242     );
243
244     if self.rate_limits.get(&msg.ip).is_none() {
245       self.rate_limits.insert(
246         msg.ip,
247         RateLimitBucket {
248           last_checked: SystemTime::now(),
249           allowance: -2f64,
250         },
251       );
252     }
253
254     id
255   }
256 }
257
258 /// Handler for Disconnect message.
259 impl Handler<Disconnect> for ChatServer {
260   type Result = ();
261
262   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
263     // let mut rooms: Vec<i32> = Vec::new();
264
265     // remove address
266     if self.sessions.remove(&msg.id).is_some() {
267       // remove session from all rooms
268       for (_id, sessions) in &mut self.rooms {
269         if sessions.remove(&msg.id) {
270           // rooms.push(*id);
271         }
272       }
273     }
274   }
275 }
276
277 /// Handler for Message message.
278 impl Handler<StandardMessage> for ChatServer {
279   type Result = MessageResult<StandardMessage>;
280
281   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
282     let msg_out = match parse_json_message(self, msg) {
283       Ok(m) => m,
284       Err(e) => e.to_string(),
285     };
286
287     MessageResult(msg_out)
288   }
289 }
290
291 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
292   let json: Value = serde_json::from_str(&msg.msg)?;
293   let data = &json["data"].to_string();
294   let op = &json["op"].as_str().ok_or(APIError {
295     op: "Unknown op type".to_string(),
296     message: format!("Unknown op type"),
297   })?;
298
299   let user_operation: UserOperation = UserOperation::from_str(&op)?;
300
301   match user_operation {
302     UserOperation::Login => {
303       let login: Login = serde_json::from_str(data)?;
304       let res = Oper::new(user_operation, login).perform()?;
305       Ok(serde_json::to_string(&res)?)
306     }
307     UserOperation::Register => {
308       chat.check_rate_limit_register(msg.id)?;
309       let register: Register = serde_json::from_str(data)?;
310       let res = Oper::new(user_operation, register).perform()?;
311       Ok(serde_json::to_string(&res)?)
312     }
313     UserOperation::GetUserDetails => {
314       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
315       let res = Oper::new(user_operation, get_user_details).perform()?;
316       Ok(serde_json::to_string(&res)?)
317     }
318     UserOperation::SaveUserSettings => {
319       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
320       let res = Oper::new(user_operation, save_user_settings).perform()?;
321       Ok(serde_json::to_string(&res)?)
322     }
323     UserOperation::AddAdmin => {
324       let add_admin: AddAdmin = serde_json::from_str(data)?;
325       let res = Oper::new(user_operation, add_admin).perform()?;
326       Ok(serde_json::to_string(&res)?)
327     }
328     UserOperation::BanUser => {
329       let ban_user: BanUser = serde_json::from_str(data)?;
330       let res = Oper::new(user_operation, ban_user).perform()?;
331       Ok(serde_json::to_string(&res)?)
332     }
333     UserOperation::GetReplies => {
334       let get_replies: GetReplies = serde_json::from_str(data)?;
335       let res = Oper::new(user_operation, get_replies).perform()?;
336       Ok(serde_json::to_string(&res)?)
337     }
338     UserOperation::MarkAllAsRead => {
339       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
340       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
341       Ok(serde_json::to_string(&res)?)
342     }
343     UserOperation::GetCommunity => {
344       let get_community: GetCommunity = serde_json::from_str(data)?;
345       let res = Oper::new(user_operation, get_community).perform()?;
346       Ok(serde_json::to_string(&res)?)
347     }
348     UserOperation::ListCommunities => {
349       let list_communities: ListCommunities = serde_json::from_str(data)?;
350       let res = Oper::new(user_operation, list_communities).perform()?;
351       Ok(serde_json::to_string(&res)?)
352     }
353     UserOperation::CreateCommunity => {
354       chat.check_rate_limit_register(msg.id)?;
355       let create_community: CreateCommunity = serde_json::from_str(data)?;
356       let res = Oper::new(user_operation, create_community).perform()?;
357       Ok(serde_json::to_string(&res)?)
358     }
359     UserOperation::EditCommunity => {
360       let edit_community: EditCommunity = serde_json::from_str(data)?;
361       let res = Oper::new(user_operation, edit_community).perform()?;
362       let mut community_sent: CommunityResponse = res.clone();
363       community_sent.community.user_id = None;
364       community_sent.community.subscribed = None;
365       let community_sent_str = serde_json::to_string(&community_sent)?;
366       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
367       Ok(serde_json::to_string(&res)?)
368     }
369     UserOperation::FollowCommunity => {
370       let follow_community: FollowCommunity = serde_json::from_str(data)?;
371       let res = Oper::new(user_operation, follow_community).perform()?;
372       Ok(serde_json::to_string(&res)?)
373     }
374     UserOperation::GetFollowedCommunities => {
375       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
376       let res = Oper::new(user_operation, followed_communities).perform()?;
377       Ok(serde_json::to_string(&res)?)
378     }
379     UserOperation::BanFromCommunity => {
380       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
381       let community_id = ban_from_community.community_id;
382       let res = Oper::new(user_operation, ban_from_community).perform()?;
383       let res_str = serde_json::to_string(&res)?;
384       chat.send_community_message(&community_id, &res_str, msg.id)?;
385       Ok(res_str)
386     }
387     UserOperation::AddModToCommunity => {
388       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
389       let community_id = mod_add_to_community.community_id;
390       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
391       let res_str = serde_json::to_string(&res)?;
392       chat.send_community_message(&community_id, &res_str, msg.id)?;
393       Ok(res_str)
394     }
395     UserOperation::ListCategories => {
396       let list_categories: ListCategories = ListCategories;
397       let res = Oper::new(user_operation, list_categories).perform()?;
398       Ok(serde_json::to_string(&res)?)
399     }
400     UserOperation::CreatePost => {
401       chat.check_rate_limit_post(msg.id)?;
402       let create_post: CreatePost = serde_json::from_str(data)?;
403       let res = Oper::new(user_operation, create_post).perform()?;
404       Ok(serde_json::to_string(&res)?)
405     }
406     UserOperation::GetPost => {
407       let get_post: GetPost = serde_json::from_str(data)?;
408       chat.join_room(get_post.id, msg.id);
409       let res = Oper::new(user_operation, get_post).perform()?;
410       Ok(serde_json::to_string(&res)?)
411     }
412     UserOperation::GetPosts => {
413       let get_posts: GetPosts = serde_json::from_str(data)?;
414       let res = Oper::new(user_operation, get_posts).perform()?;
415       Ok(serde_json::to_string(&res)?)
416     }
417     UserOperation::CreatePostLike => {
418       chat.check_rate_limit_message(msg.id)?;
419       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
420       let res = Oper::new(user_operation, create_post_like).perform()?;
421       Ok(serde_json::to_string(&res)?)
422     }
423     UserOperation::EditPost => {
424       let edit_post: EditPost = serde_json::from_str(data)?;
425       let res = Oper::new(user_operation, edit_post).perform()?;
426       let mut post_sent = res.clone();
427       post_sent.post.my_vote = None;
428       let post_sent_str = serde_json::to_string(&post_sent)?;
429       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
430       Ok(serde_json::to_string(&res)?)
431     }
432     UserOperation::SavePost => {
433       let save_post: SavePost = serde_json::from_str(data)?;
434       let res = Oper::new(user_operation, save_post).perform()?;
435       Ok(serde_json::to_string(&res)?)
436     }
437     UserOperation::CreateComment => {
438       chat.check_rate_limit_message(msg.id)?;
439       let create_comment: CreateComment = serde_json::from_str(data)?;
440       let post_id = create_comment.post_id;
441       let res = Oper::new(user_operation, create_comment).perform()?;
442       let mut comment_sent = res.clone();
443       comment_sent.comment.my_vote = None;
444       comment_sent.comment.user_id = None;
445       let comment_sent_str = serde_json::to_string(&comment_sent)?;
446       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
447       Ok(serde_json::to_string(&res)?)
448     }
449     UserOperation::EditComment => {
450       let edit_comment: EditComment = serde_json::from_str(data)?;
451       let post_id = edit_comment.post_id;
452       let res = Oper::new(user_operation, edit_comment).perform()?;
453       let mut comment_sent = res.clone();
454       comment_sent.comment.my_vote = None;
455       comment_sent.comment.user_id = None;
456       let comment_sent_str = serde_json::to_string(&comment_sent)?;
457       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
458       Ok(serde_json::to_string(&res)?)
459     }
460     UserOperation::SaveComment => {
461       let save_comment: SaveComment = serde_json::from_str(data)?;
462       let res = Oper::new(user_operation, save_comment).perform()?;
463       Ok(serde_json::to_string(&res)?)
464     }
465     UserOperation::CreateCommentLike => {
466       chat.check_rate_limit_message(msg.id)?;
467       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
468       let post_id = create_comment_like.post_id;
469       let res = Oper::new(user_operation, create_comment_like).perform()?;
470       let mut comment_sent = res.clone();
471       comment_sent.comment.my_vote = None;
472       comment_sent.comment.user_id = None;
473       let comment_sent_str = serde_json::to_string(&comment_sent)?;
474       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
475       Ok(serde_json::to_string(&res)?)
476     }
477     UserOperation::GetModlog => {
478       let get_modlog: GetModlog = serde_json::from_str(data)?;
479       let res = Oper::new(user_operation, get_modlog).perform()?;
480       Ok(serde_json::to_string(&res)?)
481     }
482     UserOperation::CreateSite => {
483       let create_site: CreateSite = serde_json::from_str(data)?;
484       let res = Oper::new(user_operation, create_site).perform()?;
485       Ok(serde_json::to_string(&res)?)
486     }
487     UserOperation::EditSite => {
488       let edit_site: EditSite = serde_json::from_str(data)?;
489       let res = Oper::new(user_operation, edit_site).perform()?;
490       Ok(serde_json::to_string(&res)?)
491     }
492     UserOperation::GetSite => {
493       let get_site: GetSite = serde_json::from_str(data)?;
494       let res = Oper::new(user_operation, get_site).perform()?;
495       Ok(serde_json::to_string(&res)?)
496     }
497     UserOperation::Search => {
498       let search: Search = serde_json::from_str(data)?;
499       let res = Oper::new(user_operation, search).perform()?;
500       Ok(serde_json::to_string(&res)?)
501     }
502     UserOperation::TransferCommunity => {
503       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
504       let res = Oper::new(user_operation, transfer_community).perform()?;
505       Ok(serde_json::to_string(&res)?)
506     }
507     UserOperation::TransferSite => {
508       let transfer_site: TransferSite = serde_json::from_str(data)?;
509       let res = Oper::new(user_operation, transfer_site).perform()?;
510       Ok(serde_json::to_string(&res)?)
511     }
512   }
513 }