1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
6 use rand::{rngs::ThreadRng, Rng};
7 use std::collections::{HashMap, HashSet};
8 use serde::{Deserialize, Serialize};
9 use serde_json::{Value};
10 use std::str::FromStr;
12 use std::time::{SystemTime};
16 use api::community::*;
21 const RATE_LIMIT_MESSAGES: i32 = 30;
22 const RATE_LIMIT_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_REGISTER_MESSAGES: i32 = 3;
24 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60*3;
27 /// Chat server sends this messages to session
29 pub struct WSMessage(pub String);
31 /// Message for chat server communications
33 /// New chat session is created
37 pub addr: Recipient<WSMessage>,
41 /// Session is disconnected
43 pub struct Disconnect {
48 /// Send message to specific room
50 pub struct ClientMessage {
51 /// Id of the client session
59 #[derive(Serialize, Deserialize)]
60 pub struct StandardMessage {
61 /// Id of the client session
67 impl actix::Message for StandardMessage {
72 pub struct RateLimitBucket {
73 last_checked: SystemTime,
77 pub struct SessionInfo {
78 pub addr: Recipient<WSMessage>,
82 /// `ChatServer` manages chat rooms and responsible for coordinating chat
83 /// session. implementation is super primitive
84 pub struct ChatServer {
85 sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
86 rate_limits: HashMap<String, RateLimitBucket>,
87 rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
91 impl Default for ChatServer {
92 fn default() -> ChatServer {
94 let rooms = HashMap::new();
97 sessions: HashMap::new(),
98 rate_limits: HashMap::new(),
100 rng: rand::thread_rng(),
106 /// Send message to all users in the room
107 fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
108 if let Some(sessions) = self.rooms.get(room) {
111 if let Some(info) = self.sessions.get(id) {
112 let _ = info.addr.do_send(WSMessage(message.to_owned()));
119 fn join_room(&mut self, room_id: i32, id: usize) {
120 // remove session from all rooms
121 for (_n, mut sessions) in &mut self.rooms {
122 sessions.remove(&id);
125 // If the room doesn't exist yet
126 if self.rooms.get_mut(&room_id).is_none() {
127 self.rooms.insert(room_id, HashSet::new());
130 &self.rooms.get_mut(&room_id).unwrap().insert(id);
133 fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> {
135 use db::post_view::*;
136 let conn = establish_connection();
137 let posts = PostView::list(&conn,
138 PostListingType::Community,
149 self.send_room_message(&post.id, message, skip_id);
155 fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
156 self.check_rate_limit_full(id, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND)
159 fn check_rate_limit(&mut self, id: usize) -> Result<(), Error> {
160 self.check_rate_limit_full(id, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND)
163 fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
164 if let Some(info) = self.sessions.get(&id) {
165 if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
167 if rate_limit.allowance == -2f64 {
168 rate_limit.allowance = rate as f64;
171 let current = SystemTime::now();
172 let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
173 rate_limit.last_checked = current;
174 rate_limit.allowance += time_passed * (rate as f64 / per as f64);
175 if rate_limit.allowance > rate as f64 {
176 rate_limit.allowance = rate as f64;
179 if rate_limit.allowance < 1.0 {
180 println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance);
182 op: "Rate Limit".to_string(),
183 message: format!("Too many requests. {} per {} seconds", rate, per),
186 rate_limit.allowance -= 1.0;
199 /// Make actor from `ChatServer`
200 impl Actor for ChatServer {
201 /// We are going to use simple Context, we just need ability to communicate
202 /// with other actors.
203 type Context = Context<Self>;
206 /// Handler for Connect message.
208 /// Register new session and assign unique id to this session
209 impl Handler<Connect> for ChatServer {
212 fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
214 // notify all users in same room
215 // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
217 // register session with random id
218 let id = self.rng.gen::<usize>();
219 println!("{} joined", &msg.ip);
221 self.sessions.insert(id, SessionInfo {
223 ip: msg.ip.to_owned(),
226 if self.rate_limits.get(&msg.ip).is_none() {
227 self.rate_limits.insert(msg.ip, RateLimitBucket {
228 last_checked: SystemTime::now(),
233 // for (k,v) in &self.rate_limits {
234 // println!("{}: {:?}", k,v);
237 // auto join session to Main room
238 // self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
245 /// Handler for Disconnect message.
246 impl Handler<Disconnect> for ChatServer {
249 fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
251 // let mut rooms: Vec<i32> = Vec::new();
254 if self.sessions.remove(&msg.id).is_some() {
255 // remove session from all rooms
256 for (_id, sessions) in &mut self.rooms {
257 if sessions.remove(&msg.id) {
265 /// Handler for Message message.
266 impl Handler<StandardMessage> for ChatServer {
267 type Result = MessageResult<StandardMessage>;
270 fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
272 let msg_out = match parse_json_message(self, msg) {
274 Err(e) => e.to_string()
277 MessageResult(msg_out)
281 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
283 let json: Value = serde_json::from_str(&msg.msg)?;
284 let data = &json["data"].to_string();
285 let op = &json["op"].as_str().unwrap();
287 let user_operation: UserOperation = UserOperation::from_str(&op)?;
289 match user_operation {
290 UserOperation::Login => {
291 let login: Login = serde_json::from_str(data)?;
292 let res = Oper::new(user_operation, login).perform()?;
293 Ok(serde_json::to_string(&res)?)
295 UserOperation::Register => {
296 chat.check_rate_limit_register(msg.id)?;
297 let register: Register = serde_json::from_str(data)?;
298 let res = Oper::new(user_operation, register).perform()?;
299 Ok(serde_json::to_string(&res)?)
301 UserOperation::GetUserDetails => {
302 let get_user_details: GetUserDetails = serde_json::from_str(data)?;
303 let res = Oper::new(user_operation, get_user_details).perform()?;
304 Ok(serde_json::to_string(&res)?)
306 UserOperation::AddAdmin => {
307 let add_admin: AddAdmin = serde_json::from_str(data)?;
308 let res = Oper::new(user_operation, add_admin).perform()?;
309 Ok(serde_json::to_string(&res)?)
311 UserOperation::BanUser => {
312 let ban_user: BanUser = serde_json::from_str(data)?;
313 let res = Oper::new(user_operation, ban_user).perform()?;
314 Ok(serde_json::to_string(&res)?)
316 UserOperation::GetReplies => {
317 let get_replies: GetReplies = serde_json::from_str(data)?;
318 let res = Oper::new(user_operation, get_replies).perform()?;
319 Ok(serde_json::to_string(&res)?)
321 UserOperation::MarkAllAsRead => {
322 let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
323 let res = Oper::new(user_operation, mark_all_as_read).perform()?;
324 Ok(serde_json::to_string(&res)?)
326 UserOperation::GetCommunity => {
327 let get_community: GetCommunity = serde_json::from_str(data)?;
328 let res = Oper::new(user_operation, get_community).perform()?;
329 Ok(serde_json::to_string(&res)?)
331 UserOperation::ListCommunities => {
332 let list_communities: ListCommunities = serde_json::from_str(data)?;
333 let res = Oper::new(user_operation, list_communities).perform()?;
334 Ok(serde_json::to_string(&res)?)
336 UserOperation::CreateCommunity => {
337 chat.check_rate_limit_register(msg.id)?;
338 let create_community: CreateCommunity = serde_json::from_str(data)?;
339 let res = Oper::new(user_operation, create_community).perform()?;
340 Ok(serde_json::to_string(&res)?)
342 UserOperation::EditCommunity => {
343 let edit_community: EditCommunity = serde_json::from_str(data)?;
344 let res = Oper::new(user_operation, edit_community).perform()?;
345 let mut community_sent: CommunityResponse = res.clone();
346 community_sent.community.user_id = None;
347 community_sent.community.subscribed = None;
348 let community_sent_str = serde_json::to_string(&community_sent)?;
349 chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
350 Ok(serde_json::to_string(&res)?)
352 UserOperation::FollowCommunity => {
353 let follow_community: FollowCommunity = serde_json::from_str(data)?;
354 let res = Oper::new(user_operation, follow_community).perform()?;
355 Ok(serde_json::to_string(&res)?)
357 UserOperation::GetFollowedCommunities => {
358 let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
359 let res = Oper::new(user_operation, followed_communities).perform()?;
360 Ok(serde_json::to_string(&res)?)
362 UserOperation::BanFromCommunity => {
363 let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
364 let community_id = ban_from_community.community_id;
365 let res = Oper::new(user_operation, ban_from_community).perform()?;
366 let res_str = serde_json::to_string(&res)?;
367 chat.send_community_message(&community_id, &res_str, msg.id)?;
370 UserOperation::AddModToCommunity => {
371 let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
372 let community_id = mod_add_to_community.community_id;
373 let res = Oper::new(user_operation, mod_add_to_community).perform()?;
374 let res_str = serde_json::to_string(&res)?;
375 chat.send_community_message(&community_id, &res_str, msg.id)?;
378 UserOperation::ListCategories => {
379 let list_categories: ListCategories = ListCategories;
380 let res = Oper::new(user_operation, list_categories).perform()?;
381 Ok(serde_json::to_string(&res)?)
383 UserOperation::CreatePost => {
384 chat.check_rate_limit_register(msg.id)?;
385 let create_post: CreatePost = serde_json::from_str(data)?;
386 let res = Oper::new(user_operation, create_post).perform()?;
387 Ok(serde_json::to_string(&res)?)
389 UserOperation::GetPost => {
390 let get_post: GetPost = serde_json::from_str(data)?;
391 chat.join_room(get_post.id, msg.id);
392 let res = Oper::new(user_operation, get_post).perform()?;
393 Ok(serde_json::to_string(&res)?)
395 UserOperation::GetPosts => {
396 let get_posts: GetPosts = serde_json::from_str(data)?;
397 let res = Oper::new(user_operation, get_posts).perform()?;
398 Ok(serde_json::to_string(&res)?)
400 UserOperation::CreatePostLike => {
401 chat.check_rate_limit(msg.id)?;
402 let create_post_like: CreatePostLike = serde_json::from_str(data)?;
403 let res = Oper::new(user_operation, create_post_like).perform()?;
404 Ok(serde_json::to_string(&res)?)
406 UserOperation::EditPost => {
407 let edit_post: EditPost = serde_json::from_str(data)?;
408 let res = Oper::new(user_operation, edit_post).perform()?;
409 let mut post_sent = res.clone();
410 post_sent.post.my_vote = None;
411 let post_sent_str = serde_json::to_string(&post_sent)?;
412 chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
413 Ok(serde_json::to_string(&res)?)
415 UserOperation::SavePost => {
416 let save_post: SavePost = serde_json::from_str(data)?;
417 let res = Oper::new(user_operation, save_post).perform()?;
418 Ok(serde_json::to_string(&res)?)
420 UserOperation::CreateComment => {
421 chat.check_rate_limit(msg.id)?;
422 let create_comment: CreateComment = serde_json::from_str(data)?;
423 let post_id = create_comment.post_id;
424 let res = Oper::new(user_operation, create_comment).perform()?;
425 let mut comment_sent = res.clone();
426 comment_sent.comment.my_vote = None;
427 comment_sent.comment.user_id = None;
428 let comment_sent_str = serde_json::to_string(&comment_sent)?;
429 chat.send_room_message(&post_id, &comment_sent_str, msg.id);
430 Ok(serde_json::to_string(&res)?)
432 UserOperation::EditComment => {
433 let edit_comment: EditComment = serde_json::from_str(data)?;
434 let post_id = edit_comment.post_id;
435 let res = Oper::new(user_operation, edit_comment).perform()?;
436 let mut comment_sent = res.clone();
437 comment_sent.comment.my_vote = None;
438 comment_sent.comment.user_id = None;
439 let comment_sent_str = serde_json::to_string(&comment_sent)?;
440 chat.send_room_message(&post_id, &comment_sent_str, msg.id);
441 Ok(serde_json::to_string(&res)?)
443 UserOperation::SaveComment => {
444 let save_comment: SaveComment = serde_json::from_str(data)?;
445 let res = Oper::new(user_operation, save_comment).perform()?;
446 Ok(serde_json::to_string(&res)?)
448 UserOperation::CreateCommentLike => {
449 chat.check_rate_limit(msg.id)?;
450 let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
451 let post_id = create_comment_like.post_id;
452 let res = Oper::new(user_operation, create_comment_like).perform()?;
453 let mut comment_sent = res.clone();
454 comment_sent.comment.my_vote = None;
455 comment_sent.comment.user_id = None;
456 let comment_sent_str = serde_json::to_string(&comment_sent)?;
457 chat.send_room_message(&post_id, &comment_sent_str, msg.id);
458 Ok(serde_json::to_string(&res)?)
460 UserOperation::GetModlog => {
461 let get_modlog: GetModlog = serde_json::from_str(data)?;
462 let res = Oper::new(user_operation, get_modlog).perform()?;
463 Ok(serde_json::to_string(&res)?)
465 UserOperation::CreateSite => {
466 let create_site: CreateSite = serde_json::from_str(data)?;
467 let res = Oper::new(user_operation, create_site).perform()?;
468 Ok(serde_json::to_string(&res)?)
470 UserOperation::EditSite => {
471 let edit_site: EditSite = serde_json::from_str(data)?;
472 let res = Oper::new(user_operation, edit_site).perform()?;
473 Ok(serde_json::to_string(&res)?)
475 UserOperation::GetSite => {
476 let get_site: GetSite = serde_json::from_str(data)?;
477 let res = Oper::new(user_operation, get_site).perform()?;
478 Ok(serde_json::to_string(&res)?)
480 UserOperation::Search => {
481 let search: Search = serde_json::from_str(data)?;
482 let res = Oper::new(user_operation, search).perform()?;
483 Ok(serde_json::to_string(&res)?)