1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
9 use log::{error, info, warn};
10 use rand::{rngs::ThreadRng, Rng};
11 use serde::{Deserialize, Serialize};
12 use serde_json::Value;
13 use std::collections::{HashMap, HashSet};
14 use std::str::FromStr;
15 use std::time::SystemTime;
16 use strum::IntoEnumIterator;
18 use crate::api::comment::*;
19 use crate::api::community::*;
20 use crate::api::post::*;
21 use crate::api::site::*;
22 use crate::api::user::*;
24 use crate::websocket::UserOperation;
27 type ConnectionId = usize;
29 type CommunityId = i32;
33 /// Chat server sends this messages to session
35 #[rtype(result = "()")]
36 pub struct WSMessage(pub String);
38 /// Message for chat server communications
40 /// New chat session is created
44 pub addr: Recipient<WSMessage>,
48 /// Session is disconnected
50 #[rtype(result = "()")]
51 pub struct Disconnect {
56 #[derive(Serialize, Deserialize, Message)]
58 pub struct StandardMessage {
59 /// Id of the client session
66 pub struct RateLimitBucket {
67 last_checked: SystemTime,
71 pub struct SessionInfo {
72 pub addr: Recipient<WSMessage>,
76 #[derive(Eq, PartialEq, Hash, Debug, EnumIter, Copy, Clone)]
77 pub enum RateLimitType {
83 /// `ChatServer` manages chat rooms and responsible for coordinating chat
85 pub struct ChatServer {
86 /// A map from generated random ID to session addr
87 sessions: HashMap<ConnectionId, SessionInfo>,
89 /// A map from post_id to set of connectionIDs
90 post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
92 /// A map from community to set of connectionIDs
93 community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
95 /// A map from user id to its connection ID for joined users. Remember a user can have multiple
96 /// sessions (IE clients)
97 user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
99 /// Rate limiting based on rate type and IP addr
100 rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
103 db: Pool<ConnectionManager<PgConnection>>,
107 pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
109 sessions: HashMap::new(),
110 rate_limit_buckets: HashMap::new(),
111 post_rooms: HashMap::new(),
112 community_rooms: HashMap::new(),
113 user_rooms: HashMap::new(),
114 rng: rand::thread_rng(),
119 fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
120 // remove session from all rooms
121 for sessions in self.community_rooms.values_mut() {
122 sessions.remove(&id);
125 // Also leave all post rooms
126 // This avoids double messages
127 for sessions in self.post_rooms.values_mut() {
128 sessions.remove(&id);
131 // If the room doesn't exist yet
132 if self.community_rooms.get_mut(&community_id).is_none() {
133 self.community_rooms.insert(community_id, HashSet::new());
138 .get_mut(&community_id)
143 fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
144 // remove session from all rooms
145 for sessions in self.post_rooms.values_mut() {
146 sessions.remove(&id);
149 // Also leave all communities
150 // This avoids double messages
151 for sessions in self.community_rooms.values_mut() {
152 sessions.remove(&id);
155 // If the room doesn't exist yet
156 if self.post_rooms.get_mut(&post_id).is_none() {
157 self.post_rooms.insert(post_id, HashSet::new());
160 self.post_rooms.get_mut(&post_id).unwrap().insert(id);
163 fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
164 // remove session from all rooms
165 for sessions in self.user_rooms.values_mut() {
166 sessions.remove(&id);
169 // If the room doesn't exist yet
170 if self.user_rooms.get_mut(&user_id).is_none() {
171 self.user_rooms.insert(user_id, HashSet::new());
174 self.user_rooms.get_mut(&user_id).unwrap().insert(id);
177 fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
178 if let Some(sessions) = self.post_rooms.get(&post_id) {
181 if let Some(info) = self.sessions.get(id) {
182 let _ = info.addr.do_send(WSMessage(message.to_owned()));
189 fn send_community_room_message(
191 community_id: CommunityId,
193 skip_id: ConnectionId,
195 if let Some(sessions) = self.community_rooms.get(&community_id) {
198 if let Some(info) = self.sessions.get(id) {
199 let _ = info.addr.do_send(WSMessage(message.to_owned()));
206 fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
207 if let Some(sessions) = self.user_rooms.get(&user_id) {
210 if let Some(info) = self.sessions.get(id) {
211 let _ = info.addr.do_send(WSMessage(message.to_owned()));
218 fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
219 for id in self.sessions.keys() {
221 if let Some(info) = self.sessions.get(id) {
222 let _ = info.addr.do_send(WSMessage(message.to_owned()));
230 user_operation: UserOperation,
231 comment: CommentResponse,
233 ) -> Result<String, Error> {
234 let mut comment_reply_sent = comment.clone();
235 comment_reply_sent.comment.my_vote = None;
236 comment_reply_sent.comment.user_id = None;
238 // For the post room ones, and the directs back to the user
239 // strip out the recipient_ids, so that
240 // users don't get double notifs
241 let mut comment_user_sent = comment.clone();
242 comment_user_sent.recipient_ids = Vec::new();
244 let mut comment_post_sent = comment_reply_sent.clone();
245 comment_post_sent.recipient_ids = Vec::new();
247 let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
248 let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
249 let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
251 // Send it to the post room
252 self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
254 // Send it to the recipient(s) including the mentioned users
255 for recipient_id in comment_reply_sent.recipient_ids {
256 self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
259 // Send it to the community too
260 self.send_community_room_message(0, &comment_post_sent_str, id);
261 self.send_community_room_message(comment.comment.community_id, &comment_post_sent_str, id);
263 Ok(comment_user_sent_str)
268 user_operation: UserOperation,
271 ) -> Result<String, Error> {
272 let community_id = post.post.community_id;
274 // Don't send my data with it
275 let mut post_sent = post.clone();
276 post_sent.post.my_vote = None;
277 post_sent.post.user_id = None;
278 let post_sent_str = to_json_string(&user_operation, &post_sent)?;
280 // Send it to /c/all and that community
281 self.send_community_room_message(0, &post_sent_str, id);
282 self.send_community_room_message(community_id, &post_sent_str, id);
284 // Send it to the post room
285 self.send_post_room_message(post_sent.post.id, &post_sent_str, id);
287 to_json_string(&user_operation, post)
290 fn check_rate_limit_register(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
291 self.check_rate_limit_full(
292 RateLimitType::Register,
294 Settings::get().rate_limit.register,
295 Settings::get().rate_limit.register_per_second,
300 fn check_rate_limit_post(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
301 self.check_rate_limit_full(
304 Settings::get().rate_limit.post,
305 Settings::get().rate_limit.post_per_second,
310 fn check_rate_limit_message(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
311 self.check_rate_limit_full(
312 RateLimitType::Message,
314 Settings::get().rate_limit.message,
315 Settings::get().rate_limit.message_per_second,
320 #[allow(clippy::float_cmp)]
321 fn check_rate_limit_full(
323 type_: RateLimitType,
328 ) -> Result<(), Error> {
329 if let Some(info) = self.sessions.get(&id) {
330 if let Some(bucket) = self.rate_limit_buckets.get_mut(&type_) {
331 if let Some(rate_limit) = bucket.get_mut(&info.ip) {
332 let current = SystemTime::now();
333 let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
336 if rate_limit.allowance == -2f64 {
337 rate_limit.allowance = rate as f64;
340 rate_limit.last_checked = current;
341 rate_limit.allowance += time_passed * (rate as f64 / per as f64);
342 if !check_only && rate_limit.allowance > rate as f64 {
343 rate_limit.allowance = rate as f64;
346 if rate_limit.allowance < 1.0 {
348 "Rate limited IP: {}, time_passed: {}, allowance: {}",
349 &info.ip, time_passed, rate_limit.allowance
353 message: format!("Too many requests. {} per {} seconds", rate, per),
359 rate_limit.allowance -= 1.0;
375 /// Make actor from `ChatServer`
376 impl Actor for ChatServer {
377 /// We are going to use simple Context, we just need ability to communicate
378 /// with other actors.
379 type Context = Context<Self>;
382 /// Handler for Connect message.
384 /// Register new session and assign unique id to this session
385 impl Handler<Connect> for ChatServer {
388 fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
389 // register session with random id
390 let id = self.rng.gen::<usize>();
391 info!("{} joined", &msg.ip);
393 self.sessions.insert(
397 ip: msg.ip.to_owned(),
401 for rate_limit_type in RateLimitType::iter() {
402 if self.rate_limit_buckets.get(&rate_limit_type).is_none() {
405 .insert(rate_limit_type, HashMap::new());
408 if let Some(bucket) = self.rate_limit_buckets.get_mut(&rate_limit_type) {
409 if bucket.get(&msg.ip).is_none() {
413 last_checked: SystemTime::now(),
425 /// Handler for Disconnect message.
426 impl Handler<Disconnect> for ChatServer {
429 fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
430 // Remove connections from sessions and all 3 scopes
431 if self.sessions.remove(&msg.id).is_some() {
432 for sessions in self.user_rooms.values_mut() {
433 sessions.remove(&msg.id);
436 for sessions in self.post_rooms.values_mut() {
437 sessions.remove(&msg.id);
440 for sessions in self.community_rooms.values_mut() {
441 sessions.remove(&msg.id);
447 /// Handler for Message message.
448 impl Handler<StandardMessage> for ChatServer {
449 type Result = MessageResult<StandardMessage>;
451 fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
452 match parse_json_message(self, msg) {
454 info!("Message Sent: {}", m);
458 error!("Error during message handling {}", e);
459 MessageResult(e.to_string())
466 struct WebsocketResponse<T> {
471 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
475 let response = WebsocketResponse {
479 Ok(serde_json::to_string(&response)?)
482 fn do_user_operation<'a, Data, Response>(
485 conn: &PooledConnection<ConnectionManager<PgConnection>>,
486 ) -> Result<String, Error>
488 for<'de> Data: Deserialize<'de> + 'a,
490 Oper<Data>: Perform<Response>,
492 let parsed_data: Data = serde_json::from_str(data)?;
493 let res = Oper::new(parsed_data).perform(&conn)?;
494 to_json_string(&op, &res)
497 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
498 let json: Value = serde_json::from_str(&msg.msg)?;
499 let data = &json["data"].to_string();
500 let op = &json["op"].as_str().ok_or(APIError {
501 message: "Unknown op type".to_string(),
504 let conn = chat.db.get()?;
506 let user_operation: UserOperation = UserOperation::from_str(&op)?;
509 chat.check_rate_limit_message(msg.id, false)?;
511 match user_operation {
512 UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
513 UserOperation::Register => {
514 chat.check_rate_limit_register(msg.id, true)?;
515 let register: Register = serde_json::from_str(data)?;
516 let res = Oper::new(register).perform(&conn)?;
517 chat.check_rate_limit_register(msg.id, false)?;
518 to_json_string(&user_operation, &res)
520 UserOperation::GetUserDetails => {
521 do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
523 UserOperation::SaveUserSettings => {
524 do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
526 UserOperation::AddAdmin => {
527 let add_admin: AddAdmin = serde_json::from_str(data)?;
528 let res = Oper::new(add_admin).perform(&conn)?;
529 let res_str = to_json_string(&user_operation, &res)?;
530 chat.send_all_message(&res_str, msg.id);
533 UserOperation::BanUser => {
534 let ban_user: BanUser = serde_json::from_str(data)?;
535 let res = Oper::new(ban_user).perform(&conn)?;
536 let res_str = to_json_string(&user_operation, &res)?;
537 chat.send_all_message(&res_str, msg.id);
540 UserOperation::GetReplies => {
541 do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
543 UserOperation::GetUserMentions => {
544 do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
546 UserOperation::EditUserMention => {
547 do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
549 UserOperation::MarkAllAsRead => {
550 do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
552 UserOperation::GetCommunity => {
553 let get_community: GetCommunity = serde_json::from_str(data)?;
555 let mut res = Oper::new(get_community).perform(&conn)?;
557 let community_id = res.community.id;
559 chat.join_community_room(community_id, msg.id);
561 res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
562 community_users.len()
567 to_json_string(&user_operation, &res)
569 UserOperation::ListCommunities => {
570 do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
572 UserOperation::CreateCommunity => {
573 chat.check_rate_limit_register(msg.id, true)?;
574 let create_community: CreateCommunity = serde_json::from_str(data)?;
575 let res = Oper::new(create_community).perform(&conn)?;
576 chat.check_rate_limit_register(msg.id, false)?;
577 to_json_string(&user_operation, &res)
579 UserOperation::EditCommunity => {
580 let edit_community: EditCommunity = serde_json::from_str(data)?;
581 let res = Oper::new(edit_community).perform(&conn)?;
582 let mut community_sent: CommunityResponse = res.clone();
583 community_sent.community.user_id = None;
584 community_sent.community.subscribed = None;
585 let community_sent_str = to_json_string(&user_operation, &community_sent)?;
586 chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
587 to_json_string(&user_operation, &res)
589 UserOperation::FollowCommunity => {
590 do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
592 UserOperation::GetFollowedCommunities => do_user_operation::<
593 GetFollowedCommunities,
594 GetFollowedCommunitiesResponse,
595 >(user_operation, data, &conn),
596 UserOperation::BanFromCommunity => {
597 let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
598 let community_id = ban_from_community.community_id;
599 let res = Oper::new(ban_from_community).perform(&conn)?;
600 let res_str = to_json_string(&user_operation, &res)?;
601 chat.send_community_room_message(community_id, &res_str, msg.id);
604 UserOperation::AddModToCommunity => {
605 let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
606 let community_id = mod_add_to_community.community_id;
607 let res = Oper::new(mod_add_to_community).perform(&conn)?;
608 let res_str = to_json_string(&user_operation, &res)?;
609 chat.send_community_room_message(community_id, &res_str, msg.id);
612 UserOperation::ListCategories => {
613 do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
615 UserOperation::GetPost => {
616 let get_post: GetPost = serde_json::from_str(data)?;
617 let post_id = get_post.id;
618 chat.join_post_room(post_id, msg.id);
619 let mut res = Oper::new(get_post).perform(&conn)?;
621 res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
627 to_json_string(&user_operation, &res)
629 UserOperation::GetPosts => {
630 let get_posts: GetPosts = serde_json::from_str(data)?;
632 if get_posts.community_id.is_none() {
633 // 0 is the "all" community
634 chat.join_community_room(0, msg.id);
636 let res = Oper::new(get_posts).perform(&conn)?;
637 to_json_string(&user_operation, &res)
639 UserOperation::GetComments => {
640 let get_comments: GetComments = serde_json::from_str(data)?;
641 if get_comments.community_id.is_none() {
642 // 0 is the "all" community
643 chat.join_community_room(0, msg.id);
645 let res = Oper::new(get_comments).perform(&conn)?;
646 to_json_string(&user_operation, &res)
648 UserOperation::CreatePost => {
649 chat.check_rate_limit_post(msg.id, true)?;
650 let create_post: CreatePost = serde_json::from_str(data)?;
651 let res = Oper::new(create_post).perform(&conn)?;
652 chat.check_rate_limit_post(msg.id, false)?;
654 chat.post_sends(UserOperation::CreatePost, res, msg.id)
656 UserOperation::CreatePostLike => {
657 let create_post_like: CreatePostLike = serde_json::from_str(data)?;
658 let res = Oper::new(create_post_like).perform(&conn)?;
660 chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
662 UserOperation::EditPost => {
663 let edit_post: EditPost = serde_json::from_str(data)?;
664 let res = Oper::new(edit_post).perform(&conn)?;
666 chat.post_sends(UserOperation::EditPost, res, msg.id)
668 UserOperation::SavePost => {
669 do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
671 UserOperation::CreateComment => {
672 let create_comment: CreateComment = serde_json::from_str(data)?;
673 let res = Oper::new(create_comment).perform(&conn)?;
675 chat.comment_sends(UserOperation::CreateComment, res, msg.id)
677 UserOperation::EditComment => {
678 let edit_comment: EditComment = serde_json::from_str(data)?;
679 let res = Oper::new(edit_comment).perform(&conn)?;
681 chat.comment_sends(UserOperation::EditComment, res, msg.id)
683 UserOperation::SaveComment => {
684 do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
686 UserOperation::CreateCommentLike => {
687 let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
688 let res = Oper::new(create_comment_like).perform(&conn)?;
690 chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
692 UserOperation::GetModlog => {
693 do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
695 UserOperation::CreateSite => {
696 do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
698 UserOperation::EditSite => {
699 let edit_site: EditSite = serde_json::from_str(data)?;
700 let res = Oper::new(edit_site).perform(&conn)?;
701 let res_str = to_json_string(&user_operation, &res)?;
702 chat.send_all_message(&res_str, msg.id);
705 UserOperation::GetSite => {
706 let get_site: GetSite = serde_json::from_str(data)?;
707 let mut res = Oper::new(get_site).perform(&conn)?;
708 res.online = chat.sessions.len();
709 to_json_string(&user_operation, &res)
711 UserOperation::GetSiteConfig => {
712 let get_site_config: GetSiteConfig = serde_json::from_str(data)?;
713 let res = Oper::new(get_site_config).perform(&conn)?;
714 to_json_string(&user_operation, &res)
716 UserOperation::SaveSiteConfig => {
717 let save_site_config: SaveSiteConfig = serde_json::from_str(data)?;
718 let res = Oper::new(save_site_config).perform(&conn)?;
719 to_json_string(&user_operation, &res)
721 UserOperation::Search => {
722 do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
724 UserOperation::TransferCommunity => {
725 do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
727 UserOperation::TransferSite => {
728 do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
730 UserOperation::DeleteAccount => {
731 do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
733 UserOperation::PasswordReset => {
734 do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
736 UserOperation::PasswordChange => {
737 do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
739 UserOperation::CreatePrivateMessage => {
740 let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
741 let recipient_id = create_private_message.recipient_id;
742 let res = Oper::new(create_private_message).perform(&conn)?;
743 let res_str = to_json_string(&user_operation, &res)?;
745 chat.send_user_room_message(recipient_id, &res_str, msg.id);
748 UserOperation::EditPrivateMessage => {
749 do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
751 UserOperation::GetPrivateMessages => {
752 do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
754 UserOperation::UserJoin => {
755 let user_join: UserJoin = serde_json::from_str(data)?;
756 let res = Oper::new(user_join).perform(&conn)?;
757 chat.join_user_room(res.user_id, msg.id);
758 to_json_string(&user_operation, &res)