]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'dev'
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use failure::Error;
7 use rand::{rngs::ThreadRng, Rng};
8 use serde::{Deserialize, Serialize};
9 use serde_json::Value;
10 use std::collections::{HashMap, HashSet};
11 use std::str::FromStr;
12 use std::time::SystemTime;
13
14 use crate::api::comment::*;
15 use crate::api::community::*;
16 use crate::api::post::*;
17 use crate::api::site::*;
18 use crate::api::user::*;
19 use crate::api::*;
20
21 const RATE_LIMIT_MESSAGE: i32 = 30;
22 const RATE_LIMIT_MESSAGES_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_POST: i32 = 3;
24 const RATE_LIMIT_POSTS_PER_SECOND: i32 = 60 * 10;
25 const RATE_LIMIT_REGISTER: i32 = 1;
26 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60 * 60;
27
28 /// Chat server sends this messages to session
29 #[derive(Message)]
30 pub struct WSMessage(pub String);
31
32 /// Message for chat server communications
33
34 /// New chat session is created
35 #[derive(Message)]
36 #[rtype(usize)]
37 pub struct Connect {
38   pub addr: Recipient<WSMessage>,
39   pub ip: String,
40 }
41
42 /// Session is disconnected
43 #[derive(Message)]
44 pub struct Disconnect {
45   pub id: usize,
46   pub ip: String,
47 }
48
49 /// Send message to specific room
50 #[derive(Message)]
51 pub struct ClientMessage {
52   /// Id of the client session
53   pub id: usize,
54   /// Peer message
55   pub msg: String,
56   /// Room name
57   pub room: String,
58 }
59
60 #[derive(Serialize, Deserialize)]
61 pub struct StandardMessage {
62   /// Id of the client session
63   pub id: usize,
64   /// Peer message
65   pub msg: String,
66 }
67
68 impl actix::Message for StandardMessage {
69   type Result = String;
70 }
71
72 #[derive(Debug)]
73 pub struct RateLimitBucket {
74   last_checked: SystemTime,
75   allowance: f64,
76 }
77
78 pub struct SessionInfo {
79   pub addr: Recipient<WSMessage>,
80   pub ip: String,
81 }
82
83 /// `ChatServer` manages chat rooms and responsible for coordinating chat
84 /// session. implementation is super primitive
85 pub struct ChatServer {
86   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
87   rate_limits: HashMap<String, RateLimitBucket>,
88   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
89   rng: ThreadRng,
90 }
91
92 impl Default for ChatServer {
93   fn default() -> ChatServer {
94     // default room
95     let rooms = HashMap::new();
96
97     ChatServer {
98       sessions: HashMap::new(),
99       rate_limits: HashMap::new(),
100       rooms: rooms,
101       rng: rand::thread_rng(),
102     }
103   }
104 }
105
106 impl ChatServer {
107   /// Send message to all users in the room
108   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
109     if let Some(sessions) = self.rooms.get(room) {
110       for id in sessions {
111         if *id != skip_id {
112           if let Some(info) = self.sessions.get(id) {
113             let _ = info.addr.do_send(WSMessage(message.to_owned()));
114           }
115         }
116       }
117     }
118   }
119
120   fn join_room(&mut self, room_id: i32, id: usize) {
121     // remove session from all rooms
122     for (_n, sessions) in &mut self.rooms {
123       sessions.remove(&id);
124     }
125
126     // If the room doesn't exist yet
127     if self.rooms.get_mut(&room_id).is_none() {
128       self.rooms.insert(room_id, HashSet::new());
129     }
130
131     &self.rooms.get_mut(&room_id).unwrap().insert(id);
132   }
133
134   fn send_community_message(
135     &self,
136     community_id: &i32,
137     message: &str,
138     skip_id: usize,
139   ) -> Result<(), Error> {
140     use crate::db::post_view::*;
141     use crate::db::*;
142     let conn = establish_connection();
143     let posts = PostView::list(
144       &conn,
145       PostListingType::Community,
146       &SortType::New,
147       Some(*community_id),
148       None,
149       None,
150       None,
151       None,
152       false,
153       false,
154       false,
155       None,
156       Some(9999),
157     )?;
158     for post in posts {
159       self.send_room_message(&post.id, message, skip_id);
160     }
161
162     Ok(())
163   }
164
165   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
166     self.check_rate_limit_full(id, RATE_LIMIT_REGISTER, RATE_LIMIT_REGISTER_PER_SECOND)
167   }
168
169   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
170     self.check_rate_limit_full(id, RATE_LIMIT_POST, RATE_LIMIT_POSTS_PER_SECOND)
171   }
172
173   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
174     self.check_rate_limit_full(id, RATE_LIMIT_MESSAGE, RATE_LIMIT_MESSAGES_PER_SECOND)
175   }
176
177   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
178     if let Some(info) = self.sessions.get(&id) {
179       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
180         // The initial value
181         if rate_limit.allowance == -2f64 {
182           rate_limit.allowance = rate as f64;
183         };
184
185         let current = SystemTime::now();
186         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
187         rate_limit.last_checked = current;
188         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
189         if rate_limit.allowance > rate as f64 {
190           rate_limit.allowance = rate as f64;
191         }
192
193         if rate_limit.allowance < 1.0 {
194           println!(
195             "Rate limited IP: {}, time_passed: {}, allowance: {}",
196             &info.ip, time_passed, rate_limit.allowance
197           );
198           Err(APIError {
199             op: "Rate Limit".to_string(),
200             message: format!("Too many requests. {} per {} seconds", rate, per),
201           })?
202         } else {
203           rate_limit.allowance -= 1.0;
204           Ok(())
205         }
206       } else {
207         Ok(())
208       }
209     } else {
210       Ok(())
211     }
212   }
213 }
214
215 /// Make actor from `ChatServer`
216 impl Actor for ChatServer {
217   /// We are going to use simple Context, we just need ability to communicate
218   /// with other actors.
219   type Context = Context<Self>;
220 }
221
222 /// Handler for Connect message.
223 ///
224 /// Register new session and assign unique id to this session
225 impl Handler<Connect> for ChatServer {
226   type Result = usize;
227
228   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
229     // notify all users in same room
230     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
231
232     // register session with random id
233     let id = self.rng.gen::<usize>();
234     println!("{} joined", &msg.ip);
235
236     self.sessions.insert(
237       id,
238       SessionInfo {
239         addr: msg.addr,
240         ip: msg.ip.to_owned(),
241       },
242     );
243
244     if self.rate_limits.get(&msg.ip).is_none() {
245       self.rate_limits.insert(
246         msg.ip,
247         RateLimitBucket {
248           last_checked: SystemTime::now(),
249           allowance: -2f64,
250         },
251       );
252     }
253
254     id
255   }
256 }
257
258 /// Handler for Disconnect message.
259 impl Handler<Disconnect> for ChatServer {
260   type Result = ();
261
262   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
263     // let mut rooms: Vec<i32> = Vec::new();
264
265     // remove address
266     if self.sessions.remove(&msg.id).is_some() {
267       // remove session from all rooms
268       for (_id, sessions) in &mut self.rooms {
269         if sessions.remove(&msg.id) {
270           // rooms.push(*id);
271         }
272       }
273     }
274   }
275 }
276
277 /// Handler for Message message.
278 impl Handler<StandardMessage> for ChatServer {
279   type Result = MessageResult<StandardMessage>;
280
281   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
282     let msg_out = match parse_json_message(self, msg) {
283       Ok(m) => m,
284       Err(e) => e.to_string(),
285     };
286
287     MessageResult(msg_out)
288   }
289 }
290
291 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
292   let json: Value = serde_json::from_str(&msg.msg)?;
293   let data = &json["data"].to_string();
294   let op = &json["op"].as_str().ok_or(APIError {
295     op: "Unknown op type".to_string(),
296     message: format!("Unknown op type"),
297   })?;
298
299   let user_operation: UserOperation = UserOperation::from_str(&op)?;
300
301   match user_operation {
302     UserOperation::Login => {
303       let login: Login = serde_json::from_str(data)?;
304       let res = Oper::new(user_operation, login).perform()?;
305       Ok(serde_json::to_string(&res)?)
306     }
307     UserOperation::Register => {
308       let register: Register = serde_json::from_str(data)?;
309       let res = Oper::new(user_operation, register).perform();
310       if res.is_ok() {
311         chat.check_rate_limit_register(msg.id)?;
312       }
313       Ok(serde_json::to_string(&res?)?)
314     }
315     UserOperation::GetUserDetails => {
316       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
317       let res = Oper::new(user_operation, get_user_details).perform()?;
318       Ok(serde_json::to_string(&res)?)
319     }
320     UserOperation::SaveUserSettings => {
321       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
322       let res = Oper::new(user_operation, save_user_settings).perform()?;
323       Ok(serde_json::to_string(&res)?)
324     }
325     UserOperation::AddAdmin => {
326       let add_admin: AddAdmin = serde_json::from_str(data)?;
327       let res = Oper::new(user_operation, add_admin).perform()?;
328       Ok(serde_json::to_string(&res)?)
329     }
330     UserOperation::BanUser => {
331       let ban_user: BanUser = serde_json::from_str(data)?;
332       let res = Oper::new(user_operation, ban_user).perform()?;
333       Ok(serde_json::to_string(&res)?)
334     }
335     UserOperation::GetReplies => {
336       let get_replies: GetReplies = serde_json::from_str(data)?;
337       let res = Oper::new(user_operation, get_replies).perform()?;
338       Ok(serde_json::to_string(&res)?)
339     }
340     UserOperation::MarkAllAsRead => {
341       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
342       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
343       Ok(serde_json::to_string(&res)?)
344     }
345     UserOperation::GetCommunity => {
346       let get_community: GetCommunity = serde_json::from_str(data)?;
347       let res = Oper::new(user_operation, get_community).perform()?;
348       Ok(serde_json::to_string(&res)?)
349     }
350     UserOperation::ListCommunities => {
351       let list_communities: ListCommunities = serde_json::from_str(data)?;
352       let res = Oper::new(user_operation, list_communities).perform()?;
353       Ok(serde_json::to_string(&res)?)
354     }
355     UserOperation::CreateCommunity => {
356       chat.check_rate_limit_register(msg.id)?;
357       let create_community: CreateCommunity = serde_json::from_str(data)?;
358       let res = Oper::new(user_operation, create_community).perform()?;
359       Ok(serde_json::to_string(&res)?)
360     }
361     UserOperation::EditCommunity => {
362       let edit_community: EditCommunity = serde_json::from_str(data)?;
363       let res = Oper::new(user_operation, edit_community).perform()?;
364       let mut community_sent: CommunityResponse = res.clone();
365       community_sent.community.user_id = None;
366       community_sent.community.subscribed = None;
367       let community_sent_str = serde_json::to_string(&community_sent)?;
368       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
369       Ok(serde_json::to_string(&res)?)
370     }
371     UserOperation::FollowCommunity => {
372       let follow_community: FollowCommunity = serde_json::from_str(data)?;
373       let res = Oper::new(user_operation, follow_community).perform()?;
374       Ok(serde_json::to_string(&res)?)
375     }
376     UserOperation::GetFollowedCommunities => {
377       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
378       let res = Oper::new(user_operation, followed_communities).perform()?;
379       Ok(serde_json::to_string(&res)?)
380     }
381     UserOperation::BanFromCommunity => {
382       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
383       let community_id = ban_from_community.community_id;
384       let res = Oper::new(user_operation, ban_from_community).perform()?;
385       let res_str = serde_json::to_string(&res)?;
386       chat.send_community_message(&community_id, &res_str, msg.id)?;
387       Ok(res_str)
388     }
389     UserOperation::AddModToCommunity => {
390       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
391       let community_id = mod_add_to_community.community_id;
392       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
393       let res_str = serde_json::to_string(&res)?;
394       chat.send_community_message(&community_id, &res_str, msg.id)?;
395       Ok(res_str)
396     }
397     UserOperation::ListCategories => {
398       let list_categories: ListCategories = ListCategories;
399       let res = Oper::new(user_operation, list_categories).perform()?;
400       Ok(serde_json::to_string(&res)?)
401     }
402     UserOperation::CreatePost => {
403       chat.check_rate_limit_post(msg.id)?;
404       let create_post: CreatePost = serde_json::from_str(data)?;
405       let res = Oper::new(user_operation, create_post).perform()?;
406       Ok(serde_json::to_string(&res)?)
407     }
408     UserOperation::GetPost => {
409       let get_post: GetPost = serde_json::from_str(data)?;
410       chat.join_room(get_post.id, msg.id);
411       let res = Oper::new(user_operation, get_post).perform()?;
412       Ok(serde_json::to_string(&res)?)
413     }
414     UserOperation::GetPosts => {
415       let get_posts: GetPosts = serde_json::from_str(data)?;
416       let res = Oper::new(user_operation, get_posts).perform()?;
417       Ok(serde_json::to_string(&res)?)
418     }
419     UserOperation::CreatePostLike => {
420       chat.check_rate_limit_message(msg.id)?;
421       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
422       let res = Oper::new(user_operation, create_post_like).perform()?;
423       Ok(serde_json::to_string(&res)?)
424     }
425     UserOperation::EditPost => {
426       let edit_post: EditPost = serde_json::from_str(data)?;
427       let res = Oper::new(user_operation, edit_post).perform()?;
428       let mut post_sent = res.clone();
429       post_sent.post.my_vote = None;
430       let post_sent_str = serde_json::to_string(&post_sent)?;
431       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
432       Ok(serde_json::to_string(&res)?)
433     }
434     UserOperation::SavePost => {
435       let save_post: SavePost = serde_json::from_str(data)?;
436       let res = Oper::new(user_operation, save_post).perform()?;
437       Ok(serde_json::to_string(&res)?)
438     }
439     UserOperation::CreateComment => {
440       chat.check_rate_limit_message(msg.id)?;
441       let create_comment: CreateComment = serde_json::from_str(data)?;
442       let post_id = create_comment.post_id;
443       let res = Oper::new(user_operation, create_comment).perform()?;
444       let mut comment_sent = res.clone();
445       comment_sent.comment.my_vote = None;
446       comment_sent.comment.user_id = None;
447       let comment_sent_str = serde_json::to_string(&comment_sent)?;
448       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
449       Ok(serde_json::to_string(&res)?)
450     }
451     UserOperation::EditComment => {
452       let edit_comment: EditComment = serde_json::from_str(data)?;
453       let post_id = edit_comment.post_id;
454       let res = Oper::new(user_operation, edit_comment).perform()?;
455       let mut comment_sent = res.clone();
456       comment_sent.comment.my_vote = None;
457       comment_sent.comment.user_id = None;
458       let comment_sent_str = serde_json::to_string(&comment_sent)?;
459       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
460       Ok(serde_json::to_string(&res)?)
461     }
462     UserOperation::SaveComment => {
463       let save_comment: SaveComment = serde_json::from_str(data)?;
464       let res = Oper::new(user_operation, save_comment).perform()?;
465       Ok(serde_json::to_string(&res)?)
466     }
467     UserOperation::CreateCommentLike => {
468       chat.check_rate_limit_message(msg.id)?;
469       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
470       let post_id = create_comment_like.post_id;
471       let res = Oper::new(user_operation, create_comment_like).perform()?;
472       let mut comment_sent = res.clone();
473       comment_sent.comment.my_vote = None;
474       comment_sent.comment.user_id = None;
475       let comment_sent_str = serde_json::to_string(&comment_sent)?;
476       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
477       Ok(serde_json::to_string(&res)?)
478     }
479     UserOperation::GetModlog => {
480       let get_modlog: GetModlog = serde_json::from_str(data)?;
481       let res = Oper::new(user_operation, get_modlog).perform()?;
482       Ok(serde_json::to_string(&res)?)
483     }
484     UserOperation::CreateSite => {
485       let create_site: CreateSite = serde_json::from_str(data)?;
486       let res = Oper::new(user_operation, create_site).perform()?;
487       Ok(serde_json::to_string(&res)?)
488     }
489     UserOperation::EditSite => {
490       let edit_site: EditSite = serde_json::from_str(data)?;
491       let res = Oper::new(user_operation, edit_site).perform()?;
492       Ok(serde_json::to_string(&res)?)
493     }
494     UserOperation::GetSite => {
495       let get_site: GetSite = serde_json::from_str(data)?;
496       let res = Oper::new(user_operation, get_site).perform()?;
497       Ok(serde_json::to_string(&res)?)
498     }
499     UserOperation::Search => {
500       let search: Search = serde_json::from_str(data)?;
501       let res = Oper::new(user_operation, search).perform()?;
502       Ok(serde_json::to_string(&res)?)
503     }
504     UserOperation::TransferCommunity => {
505       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
506       let res = Oper::new(user_operation, transfer_community).perform()?;
507       Ok(serde_json::to_string(&res)?)
508     }
509     UserOperation::TransferSite => {
510       let transfer_site: TransferSite = serde_json::from_str(data)?;
511       let res = Oper::new(user_operation, transfer_site).perform()?;
512       Ok(serde_json::to_string(&res)?)
513     }
514   }
515 }