]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'dev'
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use rand::{rngs::ThreadRng, Rng};
7 use std::collections::{HashMap, HashSet};
8 use serde::{Deserialize, Serialize};
9 use serde_json::{Value};
10 use std::str::FromStr;
11 use failure::Error;
12 use std::time::{SystemTime};
13
14 use crate::api::*;
15 use crate::api::user::*;
16 use crate::api::community::*;
17 use crate::api::post::*;
18 use crate::api::comment::*;
19 use crate::api::site::*;
20
21 const RATE_LIMIT_MESSAGE: i32 = 30;
22 const RATE_LIMIT_MESSAGES_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_POST: i32 = 1;
24 const RATE_LIMIT_POSTS_PER_SECOND: i32 = 60*10;
25 const RATE_LIMIT_REGISTER: i32 = 1;
26 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60*60;
27
28
29 /// Chat server sends this messages to session
30 #[derive(Message)]
31 pub struct WSMessage(pub String);
32
33 /// Message for chat server communications
34
35 /// New chat session is created
36 #[derive(Message)]
37 #[rtype(usize)]
38 pub struct Connect {
39   pub addr: Recipient<WSMessage>,
40   pub ip: String,
41 }
42
43 /// Session is disconnected
44 #[derive(Message)]
45 pub struct Disconnect {
46   pub id: usize,
47   pub ip: String,
48 }
49
50 /// Send message to specific room
51 #[derive(Message)]
52 pub struct ClientMessage {
53   /// Id of the client session
54   pub id: usize,
55   /// Peer message
56   pub msg: String,
57   /// Room name
58   pub room: String,
59 }
60
61 #[derive(Serialize, Deserialize)]
62 pub struct StandardMessage {
63   /// Id of the client session
64   pub id: usize,
65   /// Peer message
66   pub msg: String,
67 }
68
69 impl actix::Message for StandardMessage {
70   type Result = String;
71 }
72
73 #[derive(Debug)]
74 pub struct RateLimitBucket {
75   last_checked: SystemTime,
76   allowance: f64
77 }
78
79 pub struct SessionInfo {
80   pub addr: Recipient<WSMessage>,
81   pub ip: String,
82 }
83
84 /// `ChatServer` manages chat rooms and responsible for coordinating chat
85 /// session. implementation is super primitive
86 pub struct ChatServer {
87   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
88   rate_limits: HashMap<String, RateLimitBucket>,
89   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
90   rng: ThreadRng,
91 }
92
93 impl Default for ChatServer {
94   fn default() -> ChatServer {
95     // default room
96     let rooms = HashMap::new();
97
98     ChatServer {
99       sessions: HashMap::new(),
100       rate_limits: HashMap::new(),
101       rooms: rooms,
102       rng: rand::thread_rng(),
103     }
104   }
105 }
106
107 impl ChatServer {
108   /// Send message to all users in the room
109   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
110     if let Some(sessions) = self.rooms.get(room) {
111       for id in sessions {
112         if *id != skip_id {
113           if let Some(info) = self.sessions.get(id) {
114             let _ = info.addr.do_send(WSMessage(message.to_owned()));
115           }
116         }
117       }
118     }
119   }
120
121   fn join_room(&mut self, room_id: i32, id: usize) {
122     // remove session from all rooms
123     for (_n, sessions) in &mut self.rooms {
124       sessions.remove(&id);
125     }
126
127     // If the room doesn't exist yet
128     if self.rooms.get_mut(&room_id).is_none() {
129       self.rooms.insert(room_id, HashSet::new());
130     }
131
132     &self.rooms.get_mut(&room_id).unwrap().insert(id);
133   }
134
135   fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> {
136     use crate::db::*;
137     use crate::db::post_view::*;
138     let conn = establish_connection();
139     let posts = PostView::list(
140       &conn,
141       PostListingType::Community, 
142       &SortType::New, 
143       Some(*community_id), 
144       None,
145       None, 
146       None,
147       None,
148       false,
149       false,
150       false,
151       None,
152       Some(9999))?;
153     for post in posts {
154       self.send_room_message(&post.id, message, skip_id);
155     }
156
157     Ok(())
158   }
159
160   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
161     self.check_rate_limit_full(id, RATE_LIMIT_REGISTER, RATE_LIMIT_REGISTER_PER_SECOND)
162   }
163
164   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
165     self.check_rate_limit_full(id, RATE_LIMIT_POST, RATE_LIMIT_POSTS_PER_SECOND)
166   }
167
168   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
169     self.check_rate_limit_full(id, RATE_LIMIT_MESSAGE, RATE_LIMIT_MESSAGES_PER_SECOND)
170   }
171
172   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
173     if let Some(info) = self.sessions.get(&id) {
174       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
175         // The initial value
176         if rate_limit.allowance == -2f64 {
177           rate_limit.allowance = rate as f64;
178         };
179
180         let current = SystemTime::now();
181         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
182         rate_limit.last_checked = current;
183         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
184         if rate_limit.allowance > rate as f64 {
185           rate_limit.allowance = rate as f64;
186         }
187
188         if rate_limit.allowance < 1.0 {
189           println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance);
190           Err(APIError {
191             op: "Rate Limit".to_string(), 
192             message: format!("Too many requests. {} per {} seconds", rate, per),
193           })?
194         } else {
195           rate_limit.allowance -= 1.0;
196           Ok(())
197         }
198       } else {
199         Ok(())
200       }
201     } else {
202       Ok(())
203     }
204   }
205 }
206
207
208 /// Make actor from `ChatServer`
209 impl Actor for ChatServer {
210   /// We are going to use simple Context, we just need ability to communicate
211   /// with other actors.
212   type Context = Context<Self>;
213 }
214
215 /// Handler for Connect message.
216 ///
217 /// Register new session and assign unique id to this session
218 impl Handler<Connect> for ChatServer {
219   type Result = usize;
220
221   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
222
223     // notify all users in same room
224     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
225
226     // register session with random id
227     let id = self.rng.gen::<usize>();
228     println!("{} joined", &msg.ip);
229
230     self.sessions.insert(id, SessionInfo {
231       addr: msg.addr,
232       ip: msg.ip.to_owned(),
233     });
234
235     if self.rate_limits.get(&msg.ip).is_none() {
236       self.rate_limits.insert(msg.ip, RateLimitBucket {
237         last_checked: SystemTime::now(),
238         allowance: -2f64,
239       });
240     }
241
242     id
243   }
244 }
245
246 /// Handler for Disconnect message.
247 impl Handler<Disconnect> for ChatServer {
248   type Result = ();
249
250   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
251
252     // let mut rooms: Vec<i32> = Vec::new();
253
254     // remove address
255     if self.sessions.remove(&msg.id).is_some() {
256       // remove session from all rooms
257       for (_id, sessions) in &mut self.rooms {
258         if sessions.remove(&msg.id) {
259           // rooms.push(*id);
260         }
261       }
262     }
263   }
264 }
265
266 /// Handler for Message message.
267 impl Handler<StandardMessage> for ChatServer {
268   type Result = MessageResult<StandardMessage>;
269
270
271   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
272
273     let msg_out = match parse_json_message(self, msg) {
274       Ok(m) => m,
275       Err(e) => e.to_string()
276     };
277
278     MessageResult(msg_out)
279   }
280 }
281
282 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
283
284   let json: Value = serde_json::from_str(&msg.msg)?;
285   let data = &json["data"].to_string();
286   let op = &json["op"].as_str().ok_or(APIError {
287     op: "Unknown op type".to_string(), 
288     message: format!("Unknown op type"),
289   })?;
290
291   let user_operation: UserOperation = UserOperation::from_str(&op)?;
292
293   match user_operation {
294     UserOperation::Login => {
295       let login: Login = serde_json::from_str(data)?;
296       let res = Oper::new(user_operation, login).perform()?;
297       Ok(serde_json::to_string(&res)?)
298     },
299     UserOperation::Register => {
300       chat.check_rate_limit_register(msg.id)?;
301       let register: Register = serde_json::from_str(data)?;
302       let res = Oper::new(user_operation, register).perform()?;
303       Ok(serde_json::to_string(&res)?)
304     },
305     UserOperation::GetUserDetails => {
306       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
307       let res = Oper::new(user_operation, get_user_details).perform()?;
308       Ok(serde_json::to_string(&res)?)
309     },
310     UserOperation::SaveUserSettings => {
311       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
312       let res = Oper::new(user_operation, save_user_settings).perform()?;
313       Ok(serde_json::to_string(&res)?)
314     },
315     UserOperation::AddAdmin => {
316       let add_admin: AddAdmin = serde_json::from_str(data)?;
317       let res = Oper::new(user_operation, add_admin).perform()?;
318       Ok(serde_json::to_string(&res)?)
319     },
320     UserOperation::BanUser => {
321       let ban_user: BanUser = serde_json::from_str(data)?;
322       let res = Oper::new(user_operation, ban_user).perform()?;
323       Ok(serde_json::to_string(&res)?)
324     },
325     UserOperation::GetReplies => {
326       let get_replies: GetReplies = serde_json::from_str(data)?;
327       let res = Oper::new(user_operation, get_replies).perform()?;
328       Ok(serde_json::to_string(&res)?)
329     },
330     UserOperation::MarkAllAsRead => {
331       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
332       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
333       Ok(serde_json::to_string(&res)?)
334     },
335     UserOperation::GetCommunity => {
336       let get_community: GetCommunity = serde_json::from_str(data)?;
337       let res = Oper::new(user_operation, get_community).perform()?;
338       Ok(serde_json::to_string(&res)?)
339     },
340     UserOperation::ListCommunities => {
341       let list_communities: ListCommunities = serde_json::from_str(data)?;
342       let res = Oper::new(user_operation, list_communities).perform()?;
343       Ok(serde_json::to_string(&res)?)
344     },
345     UserOperation::CreateCommunity => {
346       chat.check_rate_limit_register(msg.id)?;
347       let create_community: CreateCommunity = serde_json::from_str(data)?;
348       let res = Oper::new(user_operation, create_community).perform()?;
349       Ok(serde_json::to_string(&res)?)
350     },
351     UserOperation::EditCommunity => {
352       let edit_community: EditCommunity = serde_json::from_str(data)?;
353       let res = Oper::new(user_operation, edit_community).perform()?;
354       let mut community_sent: CommunityResponse = res.clone();
355       community_sent.community.user_id = None;
356       community_sent.community.subscribed = None;
357       let community_sent_str = serde_json::to_string(&community_sent)?;
358       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
359       Ok(serde_json::to_string(&res)?)
360     },
361     UserOperation::FollowCommunity => {
362       let follow_community: FollowCommunity = serde_json::from_str(data)?;
363       let res = Oper::new(user_operation, follow_community).perform()?;
364       Ok(serde_json::to_string(&res)?)
365     },
366     UserOperation::GetFollowedCommunities => {
367       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
368       let res = Oper::new(user_operation, followed_communities).perform()?;
369       Ok(serde_json::to_string(&res)?)
370     },
371     UserOperation::BanFromCommunity => {
372       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
373       let community_id = ban_from_community.community_id;
374       let res = Oper::new(user_operation, ban_from_community).perform()?;
375       let res_str = serde_json::to_string(&res)?;
376       chat.send_community_message(&community_id, &res_str, msg.id)?;
377       Ok(res_str)
378     },
379     UserOperation::AddModToCommunity => {
380       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
381       let community_id = mod_add_to_community.community_id;
382       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
383       let res_str = serde_json::to_string(&res)?;
384       chat.send_community_message(&community_id, &res_str, msg.id)?;
385       Ok(res_str)
386     },
387     UserOperation::ListCategories => {
388       let list_categories: ListCategories = ListCategories;
389       let res = Oper::new(user_operation, list_categories).perform()?;
390       Ok(serde_json::to_string(&res)?)
391     },
392     UserOperation::CreatePost => {
393       chat.check_rate_limit_post(msg.id)?;
394       let create_post: CreatePost = serde_json::from_str(data)?;
395       let res = Oper::new(user_operation, create_post).perform()?;
396       Ok(serde_json::to_string(&res)?)
397     },
398     UserOperation::GetPost => {
399       let get_post: GetPost = serde_json::from_str(data)?;
400       chat.join_room(get_post.id, msg.id);
401       let res = Oper::new(user_operation, get_post).perform()?;
402       Ok(serde_json::to_string(&res)?)
403     },
404     UserOperation::GetPosts => {
405       let get_posts: GetPosts = serde_json::from_str(data)?;
406       let res = Oper::new(user_operation, get_posts).perform()?;
407       Ok(serde_json::to_string(&res)?)
408     },
409     UserOperation::CreatePostLike => {
410       chat.check_rate_limit_message(msg.id)?;
411       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
412       let res = Oper::new(user_operation, create_post_like).perform()?;
413       Ok(serde_json::to_string(&res)?)
414     },
415     UserOperation::EditPost => {
416       let edit_post: EditPost = serde_json::from_str(data)?;
417       let res = Oper::new(user_operation, edit_post).perform()?;
418       let mut post_sent = res.clone();
419       post_sent.post.my_vote = None;
420       let post_sent_str = serde_json::to_string(&post_sent)?;
421       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
422       Ok(serde_json::to_string(&res)?)
423     },
424     UserOperation::SavePost => {
425       let save_post: SavePost = serde_json::from_str(data)?;
426       let res = Oper::new(user_operation, save_post).perform()?;
427       Ok(serde_json::to_string(&res)?)
428     },
429     UserOperation::CreateComment => {
430       chat.check_rate_limit_message(msg.id)?;
431       let create_comment: CreateComment = serde_json::from_str(data)?;
432       let post_id = create_comment.post_id;
433       let res = Oper::new(user_operation, create_comment).perform()?;
434       let mut comment_sent = res.clone();
435       comment_sent.comment.my_vote = None;
436       comment_sent.comment.user_id = None;
437       let comment_sent_str = serde_json::to_string(&comment_sent)?;
438       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
439       Ok(serde_json::to_string(&res)?)
440     },
441     UserOperation::EditComment => {
442       let edit_comment: EditComment = serde_json::from_str(data)?;
443       let post_id = edit_comment.post_id;
444       let res = Oper::new(user_operation, edit_comment).perform()?;
445       let mut comment_sent = res.clone();
446       comment_sent.comment.my_vote = None;
447       comment_sent.comment.user_id = None;
448       let comment_sent_str = serde_json::to_string(&comment_sent)?;
449       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
450       Ok(serde_json::to_string(&res)?)
451     },
452     UserOperation::SaveComment => {
453       let save_comment: SaveComment = serde_json::from_str(data)?;
454       let res = Oper::new(user_operation, save_comment).perform()?;
455       Ok(serde_json::to_string(&res)?)
456     },
457     UserOperation::CreateCommentLike => {
458       chat.check_rate_limit_message(msg.id)?;
459       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
460       let post_id = create_comment_like.post_id;
461       let res = Oper::new(user_operation, create_comment_like).perform()?;
462       let mut comment_sent = res.clone();
463       comment_sent.comment.my_vote = None;
464       comment_sent.comment.user_id = None;
465       let comment_sent_str = serde_json::to_string(&comment_sent)?;
466       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
467       Ok(serde_json::to_string(&res)?)
468     },
469     UserOperation::GetModlog => {
470       let get_modlog: GetModlog = serde_json::from_str(data)?;
471       let res = Oper::new(user_operation, get_modlog).perform()?;
472       Ok(serde_json::to_string(&res)?)
473     },
474     UserOperation::CreateSite => {
475       let create_site: CreateSite = serde_json::from_str(data)?;
476       let res = Oper::new(user_operation, create_site).perform()?;
477       Ok(serde_json::to_string(&res)?)
478     },
479     UserOperation::EditSite => {
480       let edit_site: EditSite = serde_json::from_str(data)?;
481       let res = Oper::new(user_operation, edit_site).perform()?;
482       Ok(serde_json::to_string(&res)?)
483     },
484     UserOperation::GetSite => {
485       let get_site: GetSite = serde_json::from_str(data)?;
486       let res = Oper::new(user_operation, get_site).perform()?;
487       Ok(serde_json::to_string(&res)?)
488     },
489     UserOperation::Search => {
490       let search: Search = serde_json::from_str(data)?;
491       let res = Oper::new(user_operation, search).perform()?;
492       Ok(serde_json::to_string(&res)?)
493     },
494     UserOperation::TransferCommunity => {
495       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
496       let res = Oper::new(user_operation, transfer_community).perform()?;
497       Ok(serde_json::to_string(&res)?)
498     },
499     UserOperation::TransferSite => {
500       let transfer_site: TransferSite = serde_json::from_str(data)?;
501       let res = Oper::new(user_operation, transfer_site).perform()?;
502       Ok(serde_json::to_string(&res)?)
503     },
504   }
505 }