1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
6 use rand::{rngs::ThreadRng, Rng};
7 use std::collections::{HashMap, HashSet};
8 use serde::{Deserialize, Serialize};
9 use serde_json::{Value};
10 use std::str::FromStr;
12 use std::time::{SystemTime};
15 use crate::api::user::*;
16 use crate::api::community::*;
17 use crate::api::post::*;
18 use crate::api::comment::*;
19 use crate::api::site::*;
21 const RATE_LIMIT_MESSAGES: i32 = 30;
22 const RATE_LIMIT_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_REGISTER_MESSAGES: i32 = 3;
24 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60*3;
27 /// Chat server sends this messages to session
29 pub struct WSMessage(pub String);
31 /// Message for chat server communications
33 /// New chat session is created
37 pub addr: Recipient<WSMessage>,
41 /// Session is disconnected
43 pub struct Disconnect {
48 /// Send message to specific room
50 pub struct ClientMessage {
51 /// Id of the client session
59 #[derive(Serialize, Deserialize)]
60 pub struct StandardMessage {
61 /// Id of the client session
67 impl actix::Message for StandardMessage {
72 pub struct RateLimitBucket {
73 last_checked: SystemTime,
77 pub struct SessionInfo {
78 pub addr: Recipient<WSMessage>,
82 /// `ChatServer` manages chat rooms and responsible for coordinating chat
83 /// session. implementation is super primitive
84 pub struct ChatServer {
85 sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
86 rate_limits: HashMap<String, RateLimitBucket>,
87 rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
91 impl Default for ChatServer {
92 fn default() -> ChatServer {
94 let rooms = HashMap::new();
97 sessions: HashMap::new(),
98 rate_limits: HashMap::new(),
100 rng: rand::thread_rng(),
106 /// Send message to all users in the room
107 fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
108 if let Some(sessions) = self.rooms.get(room) {
111 if let Some(info) = self.sessions.get(id) {
112 let _ = info.addr.do_send(WSMessage(message.to_owned()));
119 fn join_room(&mut self, room_id: i32, id: usize) {
120 // remove session from all rooms
121 for (_n, sessions) in &mut self.rooms {
122 sessions.remove(&id);
125 // If the room doesn't exist yet
126 if self.rooms.get_mut(&room_id).is_none() {
127 self.rooms.insert(room_id, HashSet::new());
130 &self.rooms.get_mut(&room_id).unwrap().insert(id);
133 fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> {
135 use crate::db::post_view::*;
136 let conn = establish_connection();
137 let posts = PostView::list(
139 PostListingType::Community,
152 self.send_room_message(&post.id, message, skip_id);
158 fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
159 self.check_rate_limit_full(id, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND)
162 fn check_rate_limit(&mut self, id: usize) -> Result<(), Error> {
163 self.check_rate_limit_full(id, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND)
166 fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
167 if let Some(info) = self.sessions.get(&id) {
168 if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
170 if rate_limit.allowance == -2f64 {
171 rate_limit.allowance = rate as f64;
174 let current = SystemTime::now();
175 let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
176 rate_limit.last_checked = current;
177 rate_limit.allowance += time_passed * (rate as f64 / per as f64);
178 if rate_limit.allowance > rate as f64 {
179 rate_limit.allowance = rate as f64;
182 if rate_limit.allowance < 1.0 {
183 println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance);
185 op: "Rate Limit".to_string(),
186 message: format!("Too many requests. {} per {} seconds", rate, per),
189 rate_limit.allowance -= 1.0;
202 /// Make actor from `ChatServer`
203 impl Actor for ChatServer {
204 /// We are going to use simple Context, we just need ability to communicate
205 /// with other actors.
206 type Context = Context<Self>;
209 /// Handler for Connect message.
211 /// Register new session and assign unique id to this session
212 impl Handler<Connect> for ChatServer {
215 fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
217 // notify all users in same room
218 // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
220 // register session with random id
221 let id = self.rng.gen::<usize>();
222 println!("{} joined", &msg.ip);
224 self.sessions.insert(id, SessionInfo {
226 ip: msg.ip.to_owned(),
229 if self.rate_limits.get(&msg.ip).is_none() {
230 self.rate_limits.insert(msg.ip, RateLimitBucket {
231 last_checked: SystemTime::now(),
236 // for (k,v) in &self.rate_limits {
237 // println!("{}: {:?}", k,v);
240 // auto join session to Main room
241 // self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
248 /// Handler for Disconnect message.
249 impl Handler<Disconnect> for ChatServer {
252 fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
254 // let mut rooms: Vec<i32> = Vec::new();
257 if self.sessions.remove(&msg.id).is_some() {
258 // remove session from all rooms
259 for (_id, sessions) in &mut self.rooms {
260 if sessions.remove(&msg.id) {
268 /// Handler for Message message.
269 impl Handler<StandardMessage> for ChatServer {
270 type Result = MessageResult<StandardMessage>;
273 fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
275 let msg_out = match parse_json_message(self, msg) {
277 Err(e) => e.to_string()
280 MessageResult(msg_out)
284 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
286 let json: Value = serde_json::from_str(&msg.msg)?;
287 let data = &json["data"].to_string();
288 let op = &json["op"].as_str().unwrap();
290 let user_operation: UserOperation = UserOperation::from_str(&op)?;
292 match user_operation {
293 UserOperation::Login => {
294 let login: Login = serde_json::from_str(data)?;
295 let res = Oper::new(user_operation, login).perform()?;
296 Ok(serde_json::to_string(&res)?)
298 UserOperation::Register => {
299 chat.check_rate_limit_register(msg.id)?;
300 let register: Register = serde_json::from_str(data)?;
301 let res = Oper::new(user_operation, register).perform()?;
302 Ok(serde_json::to_string(&res)?)
304 UserOperation::GetUserDetails => {
305 let get_user_details: GetUserDetails = serde_json::from_str(data)?;
306 let res = Oper::new(user_operation, get_user_details).perform()?;
307 Ok(serde_json::to_string(&res)?)
309 UserOperation::SaveUserSettings => {
310 let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
311 let res = Oper::new(user_operation, save_user_settings).perform()?;
312 Ok(serde_json::to_string(&res)?)
314 UserOperation::AddAdmin => {
315 let add_admin: AddAdmin = serde_json::from_str(data)?;
316 let res = Oper::new(user_operation, add_admin).perform()?;
317 Ok(serde_json::to_string(&res)?)
319 UserOperation::BanUser => {
320 let ban_user: BanUser = serde_json::from_str(data)?;
321 let res = Oper::new(user_operation, ban_user).perform()?;
322 Ok(serde_json::to_string(&res)?)
324 UserOperation::GetReplies => {
325 let get_replies: GetReplies = serde_json::from_str(data)?;
326 let res = Oper::new(user_operation, get_replies).perform()?;
327 Ok(serde_json::to_string(&res)?)
329 UserOperation::MarkAllAsRead => {
330 let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
331 let res = Oper::new(user_operation, mark_all_as_read).perform()?;
332 Ok(serde_json::to_string(&res)?)
334 UserOperation::GetCommunity => {
335 let get_community: GetCommunity = serde_json::from_str(data)?;
336 let res = Oper::new(user_operation, get_community).perform()?;
337 Ok(serde_json::to_string(&res)?)
339 UserOperation::ListCommunities => {
340 let list_communities: ListCommunities = serde_json::from_str(data)?;
341 let res = Oper::new(user_operation, list_communities).perform()?;
342 Ok(serde_json::to_string(&res)?)
344 UserOperation::CreateCommunity => {
345 chat.check_rate_limit_register(msg.id)?;
346 let create_community: CreateCommunity = serde_json::from_str(data)?;
347 let res = Oper::new(user_operation, create_community).perform()?;
348 Ok(serde_json::to_string(&res)?)
350 UserOperation::EditCommunity => {
351 let edit_community: EditCommunity = serde_json::from_str(data)?;
352 let res = Oper::new(user_operation, edit_community).perform()?;
353 let mut community_sent: CommunityResponse = res.clone();
354 community_sent.community.user_id = None;
355 community_sent.community.subscribed = None;
356 let community_sent_str = serde_json::to_string(&community_sent)?;
357 chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
358 Ok(serde_json::to_string(&res)?)
360 UserOperation::FollowCommunity => {
361 let follow_community: FollowCommunity = serde_json::from_str(data)?;
362 let res = Oper::new(user_operation, follow_community).perform()?;
363 Ok(serde_json::to_string(&res)?)
365 UserOperation::GetFollowedCommunities => {
366 let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
367 let res = Oper::new(user_operation, followed_communities).perform()?;
368 Ok(serde_json::to_string(&res)?)
370 UserOperation::BanFromCommunity => {
371 let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
372 let community_id = ban_from_community.community_id;
373 let res = Oper::new(user_operation, ban_from_community).perform()?;
374 let res_str = serde_json::to_string(&res)?;
375 chat.send_community_message(&community_id, &res_str, msg.id)?;
378 UserOperation::AddModToCommunity => {
379 let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
380 let community_id = mod_add_to_community.community_id;
381 let res = Oper::new(user_operation, mod_add_to_community).perform()?;
382 let res_str = serde_json::to_string(&res)?;
383 chat.send_community_message(&community_id, &res_str, msg.id)?;
386 UserOperation::ListCategories => {
387 let list_categories: ListCategories = ListCategories;
388 let res = Oper::new(user_operation, list_categories).perform()?;
389 Ok(serde_json::to_string(&res)?)
391 UserOperation::CreatePost => {
392 chat.check_rate_limit_register(msg.id)?;
393 let create_post: CreatePost = serde_json::from_str(data)?;
394 let res = Oper::new(user_operation, create_post).perform()?;
395 Ok(serde_json::to_string(&res)?)
397 UserOperation::GetPost => {
398 let get_post: GetPost = serde_json::from_str(data)?;
399 chat.join_room(get_post.id, msg.id);
400 let res = Oper::new(user_operation, get_post).perform()?;
401 Ok(serde_json::to_string(&res)?)
403 UserOperation::GetPosts => {
404 let get_posts: GetPosts = serde_json::from_str(data)?;
405 let res = Oper::new(user_operation, get_posts).perform()?;
406 Ok(serde_json::to_string(&res)?)
408 UserOperation::CreatePostLike => {
409 chat.check_rate_limit(msg.id)?;
410 let create_post_like: CreatePostLike = serde_json::from_str(data)?;
411 let res = Oper::new(user_operation, create_post_like).perform()?;
412 Ok(serde_json::to_string(&res)?)
414 UserOperation::EditPost => {
415 let edit_post: EditPost = serde_json::from_str(data)?;
416 let res = Oper::new(user_operation, edit_post).perform()?;
417 let mut post_sent = res.clone();
418 post_sent.post.my_vote = None;
419 let post_sent_str = serde_json::to_string(&post_sent)?;
420 chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
421 Ok(serde_json::to_string(&res)?)
423 UserOperation::SavePost => {
424 let save_post: SavePost = serde_json::from_str(data)?;
425 let res = Oper::new(user_operation, save_post).perform()?;
426 Ok(serde_json::to_string(&res)?)
428 UserOperation::CreateComment => {
429 chat.check_rate_limit(msg.id)?;
430 let create_comment: CreateComment = serde_json::from_str(data)?;
431 let post_id = create_comment.post_id;
432 let res = Oper::new(user_operation, create_comment).perform()?;
433 let mut comment_sent = res.clone();
434 comment_sent.comment.my_vote = None;
435 comment_sent.comment.user_id = None;
436 let comment_sent_str = serde_json::to_string(&comment_sent)?;
437 chat.send_room_message(&post_id, &comment_sent_str, msg.id);
438 Ok(serde_json::to_string(&res)?)
440 UserOperation::EditComment => {
441 let edit_comment: EditComment = serde_json::from_str(data)?;
442 let post_id = edit_comment.post_id;
443 let res = Oper::new(user_operation, edit_comment).perform()?;
444 let mut comment_sent = res.clone();
445 comment_sent.comment.my_vote = None;
446 comment_sent.comment.user_id = None;
447 let comment_sent_str = serde_json::to_string(&comment_sent)?;
448 chat.send_room_message(&post_id, &comment_sent_str, msg.id);
449 Ok(serde_json::to_string(&res)?)
451 UserOperation::SaveComment => {
452 let save_comment: SaveComment = serde_json::from_str(data)?;
453 let res = Oper::new(user_operation, save_comment).perform()?;
454 Ok(serde_json::to_string(&res)?)
456 UserOperation::CreateCommentLike => {
457 chat.check_rate_limit(msg.id)?;
458 let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
459 let post_id = create_comment_like.post_id;
460 let res = Oper::new(user_operation, create_comment_like).perform()?;
461 let mut comment_sent = res.clone();
462 comment_sent.comment.my_vote = None;
463 comment_sent.comment.user_id = None;
464 let comment_sent_str = serde_json::to_string(&comment_sent)?;
465 chat.send_room_message(&post_id, &comment_sent_str, msg.id);
466 Ok(serde_json::to_string(&res)?)
468 UserOperation::GetModlog => {
469 let get_modlog: GetModlog = serde_json::from_str(data)?;
470 let res = Oper::new(user_operation, get_modlog).perform()?;
471 Ok(serde_json::to_string(&res)?)
473 UserOperation::CreateSite => {
474 let create_site: CreateSite = serde_json::from_str(data)?;
475 let res = Oper::new(user_operation, create_site).perform()?;
476 Ok(serde_json::to_string(&res)?)
478 UserOperation::EditSite => {
479 let edit_site: EditSite = serde_json::from_str(data)?;
480 let res = Oper::new(user_operation, edit_site).perform()?;
481 Ok(serde_json::to_string(&res)?)
483 UserOperation::GetSite => {
484 let get_site: GetSite = serde_json::from_str(data)?;
485 let res = Oper::new(user_operation, get_site).perform()?;
486 Ok(serde_json::to_string(&res)?)
488 UserOperation::Search => {
489 let search: Search = serde_json::from_str(data)?;
490 let res = Oper::new(user_operation, search).perform()?;
491 Ok(serde_json::to_string(&res)?)
493 UserOperation::TransferCommunity => {
494 let transfer_community: TransferCommunity = serde_json::from_str(data)?;
495 let res = Oper::new(user_operation, transfer_community).perform()?;
496 Ok(serde_json::to_string(&res)?)
498 UserOperation::TransferSite => {
499 let transfer_site: TransferSite = serde_json::from_str(data)?;
500 let res = Oper::new(user_operation, transfer_site).perform()?;
501 Ok(serde_json::to_string(&res)?)