1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
9 use log::{error, info, warn};
10 use rand::{rngs::ThreadRng, Rng};
11 use serde::{Deserialize, Serialize};
12 use serde_json::Value;
13 use std::collections::{HashMap, HashSet};
14 use std::str::FromStr;
15 use std::time::SystemTime;
16 use strum::IntoEnumIterator;
18 use crate::api::comment::*;
19 use crate::api::community::*;
20 use crate::api::post::*;
21 use crate::api::site::*;
22 use crate::api::user::*;
24 use crate::websocket::UserOperation;
27 type ConnectionId = usize;
29 type CommunityId = i32;
33 /// Chat server sends this messages to session
35 #[rtype(result = "()")]
36 pub struct WSMessage(pub String);
38 /// Message for chat server communications
40 /// New chat session is created
44 pub addr: Recipient<WSMessage>,
48 /// Session is disconnected
50 #[rtype(result = "()")]
51 pub struct Disconnect {
56 #[derive(Serialize, Deserialize, Message)]
58 pub struct StandardMessage {
59 /// Id of the client session
66 pub struct RateLimitBucket {
67 last_checked: SystemTime,
71 pub struct SessionInfo {
72 pub addr: Recipient<WSMessage>,
76 #[derive(Eq, PartialEq, Hash, Debug, EnumIter, Copy, Clone)]
77 pub enum RateLimitType {
83 /// `ChatServer` manages chat rooms and responsible for coordinating chat
85 pub struct ChatServer {
86 /// A map from generated random ID to session addr
87 sessions: HashMap<ConnectionId, SessionInfo>,
89 /// A map from post_id to set of connectionIDs
90 post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
92 /// A map from community to set of connectionIDs
93 community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
95 /// A map from user id to its connection ID for joined users. Remember a user can have multiple
96 /// sessions (IE clients)
97 user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
99 /// Rate limiting based on rate type and IP addr
100 rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
103 db: Pool<ConnectionManager<PgConnection>>,
107 pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
109 sessions: HashMap::new(),
110 rate_limit_buckets: HashMap::new(),
111 post_rooms: HashMap::new(),
112 community_rooms: HashMap::new(),
113 user_rooms: HashMap::new(),
114 rng: rand::thread_rng(),
119 fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
120 // remove session from all rooms
121 for sessions in self.community_rooms.values_mut() {
122 sessions.remove(&id);
125 // Also leave all post rooms
126 // This avoids double messages
127 for sessions in self.post_rooms.values_mut() {
128 sessions.remove(&id);
131 // If the room doesn't exist yet
132 if self.community_rooms.get_mut(&community_id).is_none() {
133 self.community_rooms.insert(community_id, HashSet::new());
138 .get_mut(&community_id)
143 fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
144 // remove session from all rooms
145 for sessions in self.post_rooms.values_mut() {
146 sessions.remove(&id);
149 // Also leave all communities
150 // This avoids double messages
151 for sessions in self.community_rooms.values_mut() {
152 sessions.remove(&id);
155 // If the room doesn't exist yet
156 if self.post_rooms.get_mut(&post_id).is_none() {
157 self.post_rooms.insert(post_id, HashSet::new());
160 self.post_rooms.get_mut(&post_id).unwrap().insert(id);
163 fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
164 // remove session from all rooms
165 for sessions in self.user_rooms.values_mut() {
166 sessions.remove(&id);
169 // If the room doesn't exist yet
170 if self.user_rooms.get_mut(&user_id).is_none() {
171 self.user_rooms.insert(user_id, HashSet::new());
174 self.user_rooms.get_mut(&user_id).unwrap().insert(id);
177 fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
178 if let Some(sessions) = self.post_rooms.get(&post_id) {
181 if let Some(info) = self.sessions.get(id) {
182 let _ = info.addr.do_send(WSMessage(message.to_owned()));
189 fn send_community_room_message(
191 community_id: CommunityId,
193 skip_id: ConnectionId,
195 if let Some(sessions) = self.community_rooms.get(&community_id) {
198 if let Some(info) = self.sessions.get(id) {
199 let _ = info.addr.do_send(WSMessage(message.to_owned()));
206 fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
207 if let Some(sessions) = self.user_rooms.get(&user_id) {
210 if let Some(info) = self.sessions.get(id) {
211 let _ = info.addr.do_send(WSMessage(message.to_owned()));
218 fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
219 for id in self.sessions.keys() {
221 if let Some(info) = self.sessions.get(id) {
222 let _ = info.addr.do_send(WSMessage(message.to_owned()));
230 user_operation: UserOperation,
231 comment: CommentResponse,
233 ) -> Result<String, Error> {
234 let mut comment_reply_sent = comment.clone();
235 comment_reply_sent.comment.my_vote = None;
236 comment_reply_sent.comment.user_id = None;
238 // For the post room ones, and the directs back to the user
239 // strip out the recipient_ids, so that
240 // users don't get double notifs
241 let mut comment_user_sent = comment.clone();
242 comment_user_sent.recipient_ids = Vec::new();
244 let mut comment_post_sent = comment_reply_sent.clone();
245 comment_post_sent.recipient_ids = Vec::new();
247 let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
248 let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
249 let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
251 // Send it to the post room
252 self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
254 // Send it to the recipient(s) including the mentioned users
255 for recipient_id in comment_reply_sent.recipient_ids {
256 self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
259 // Send it to the community too
260 self.send_community_room_message(0, &comment_post_sent_str, id);
261 self.send_community_room_message(comment.comment.community_id, &comment_post_sent_str, id);
263 Ok(comment_user_sent_str)
268 user_operation: UserOperation,
271 ) -> Result<String, Error> {
272 let community_id = post.post.community_id;
274 // Don't send my data with it
275 let mut post_sent = post.clone();
276 post_sent.post.my_vote = None;
277 post_sent.post.user_id = None;
278 let post_sent_str = to_json_string(&user_operation, &post_sent)?;
280 // Send it to /c/all and that community
281 self.send_community_room_message(0, &post_sent_str, id);
282 self.send_community_room_message(community_id, &post_sent_str, id);
284 // Send it to the post room
285 self.send_post_room_message(post_sent.post.id, &post_sent_str, id);
287 to_json_string(&user_operation, post)
290 fn check_rate_limit_register(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
291 self.check_rate_limit_full(
292 RateLimitType::Register,
294 Settings::get().rate_limit.register,
295 Settings::get().rate_limit.register_per_second,
300 fn check_rate_limit_post(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
301 self.check_rate_limit_full(
304 Settings::get().rate_limit.post,
305 Settings::get().rate_limit.post_per_second,
310 fn check_rate_limit_message(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
311 self.check_rate_limit_full(
312 RateLimitType::Message,
314 Settings::get().rate_limit.message,
315 Settings::get().rate_limit.message_per_second,
320 #[allow(clippy::float_cmp)]
321 fn check_rate_limit_full(
323 type_: RateLimitType,
328 ) -> Result<(), Error> {
329 if let Some(info) = self.sessions.get(&id) {
330 if let Some(bucket) = self.rate_limit_buckets.get_mut(&type_) {
331 if let Some(rate_limit) = bucket.get_mut(&info.ip) {
332 let current = SystemTime::now();
333 let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
336 if rate_limit.allowance == -2f64 {
337 rate_limit.allowance = rate as f64;
340 rate_limit.last_checked = current;
341 rate_limit.allowance += time_passed * (rate as f64 / per as f64);
342 if !check_only && rate_limit.allowance > rate as f64 {
343 rate_limit.allowance = rate as f64;
346 if rate_limit.allowance < 1.0 {
348 "Rate limited IP: {}, time_passed: {}, allowance: {}",
349 &info.ip, time_passed, rate_limit.allowance
353 message: format!("Too many requests. {} per {} seconds", rate, per),
359 rate_limit.allowance -= 1.0;
375 /// Make actor from `ChatServer`
376 impl Actor for ChatServer {
377 /// We are going to use simple Context, we just need ability to communicate
378 /// with other actors.
379 type Context = Context<Self>;
382 /// Handler for Connect message.
384 /// Register new session and assign unique id to this session
385 impl Handler<Connect> for ChatServer {
388 fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
389 // register session with random id
390 let id = self.rng.gen::<usize>();
391 info!("{} joined", &msg.ip);
393 self.sessions.insert(
397 ip: msg.ip.to_owned(),
401 for rate_limit_type in RateLimitType::iter() {
402 if self.rate_limit_buckets.get(&rate_limit_type).is_none() {
405 .insert(rate_limit_type, HashMap::new());
408 if let Some(bucket) = self.rate_limit_buckets.get_mut(&rate_limit_type) {
409 if bucket.get(&msg.ip).is_none() {
413 last_checked: SystemTime::now(),
425 /// Handler for Disconnect message.
426 impl Handler<Disconnect> for ChatServer {
429 fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
430 // Remove connections from sessions and all 3 scopes
431 if self.sessions.remove(&msg.id).is_some() {
432 for sessions in self.user_rooms.values_mut() {
433 sessions.remove(&msg.id);
436 for sessions in self.post_rooms.values_mut() {
437 sessions.remove(&msg.id);
440 for sessions in self.community_rooms.values_mut() {
441 sessions.remove(&msg.id);
447 /// Handler for Message message.
448 impl Handler<StandardMessage> for ChatServer {
449 type Result = MessageResult<StandardMessage>;
451 fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
452 match parse_json_message(self, msg) {
454 info!("Message Sent: {}", m);
458 error!("Error during message handling {}", e);
459 MessageResult(e.to_string())
466 struct WebsocketResponse<T> {
471 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
475 let response = WebsocketResponse {
479 Ok(serde_json::to_string(&response)?)
482 fn do_user_operation<'a, Data, Response>(
485 conn: &PooledConnection<ConnectionManager<PgConnection>>,
486 ) -> Result<String, Error>
488 for<'de> Data: Deserialize<'de> + 'a,
490 Oper<Data>: Perform<Response>,
492 let parsed_data: Data = serde_json::from_str(data)?;
493 let res = Oper::new(parsed_data).perform(&conn)?;
494 to_json_string(&op, &res)
497 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
498 let json: Value = serde_json::from_str(&msg.msg)?;
499 let data = &json["data"].to_string();
500 let op = &json["op"].as_str().ok_or(APIError {
501 message: "Unknown op type".to_string(),
504 let conn = chat.db.get()?;
506 let user_operation: UserOperation = UserOperation::from_str(&op)?;
508 // TODO: none of the chat messages are going to work if stuff is submitted via http api,
509 // need to move that handling elsewhere
512 chat.check_rate_limit_message(msg.id, false)?;
514 match user_operation {
515 UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
516 UserOperation::Register => {
517 chat.check_rate_limit_register(msg.id, true)?;
518 let register: Register = serde_json::from_str(data)?;
519 let res = Oper::new(register).perform(&conn)?;
520 chat.check_rate_limit_register(msg.id, false)?;
521 to_json_string(&user_operation, &res)
523 UserOperation::GetUserDetails => {
524 do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
526 UserOperation::SaveUserSettings => {
527 do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
529 UserOperation::AddAdmin => {
530 let add_admin: AddAdmin = serde_json::from_str(data)?;
531 let res = Oper::new(add_admin).perform(&conn)?;
532 let res_str = to_json_string(&user_operation, &res)?;
533 chat.send_all_message(&res_str, msg.id);
536 UserOperation::BanUser => {
537 let ban_user: BanUser = serde_json::from_str(data)?;
538 let res = Oper::new(ban_user).perform(&conn)?;
539 let res_str = to_json_string(&user_operation, &res)?;
540 chat.send_all_message(&res_str, msg.id);
543 UserOperation::GetReplies => {
544 do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
546 UserOperation::GetUserMentions => {
547 do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
549 UserOperation::EditUserMention => {
550 do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
552 UserOperation::MarkAllAsRead => {
553 do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
555 UserOperation::GetCommunity => {
556 let get_community: GetCommunity = serde_json::from_str(data)?;
557 let mut res = Oper::new(get_community).perform(&conn)?;
558 let community_id = res.community.id;
560 chat.join_community_room(community_id, msg.id);
562 res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
563 community_users.len()
568 to_json_string(&user_operation, &res)
570 UserOperation::ListCommunities => {
571 do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
573 UserOperation::CreateCommunity => {
574 chat.check_rate_limit_register(msg.id, true)?;
575 let create_community: CreateCommunity = serde_json::from_str(data)?;
576 let res = Oper::new(create_community).perform(&conn)?;
577 chat.check_rate_limit_register(msg.id, false)?;
578 to_json_string(&user_operation, &res)
580 UserOperation::EditCommunity => {
581 let edit_community: EditCommunity = serde_json::from_str(data)?;
582 let res = Oper::new(edit_community).perform(&conn)?;
583 let mut community_sent: CommunityResponse = res.clone();
584 community_sent.community.user_id = None;
585 community_sent.community.subscribed = None;
586 let community_sent_str = to_json_string(&user_operation, &community_sent)?;
587 chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
588 to_json_string(&user_operation, &res)
590 UserOperation::FollowCommunity => {
591 do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
593 UserOperation::GetFollowedCommunities => do_user_operation::<
594 GetFollowedCommunities,
595 GetFollowedCommunitiesResponse,
596 >(user_operation, data, &conn),
597 UserOperation::BanFromCommunity => {
598 let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
599 let community_id = ban_from_community.community_id;
600 let res = Oper::new(ban_from_community).perform(&conn)?;
601 let res_str = to_json_string(&user_operation, &res)?;
602 chat.send_community_room_message(community_id, &res_str, msg.id);
605 UserOperation::AddModToCommunity => {
606 let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
607 let community_id = mod_add_to_community.community_id;
608 let res = Oper::new(mod_add_to_community).perform(&conn)?;
609 let res_str = to_json_string(&user_operation, &res)?;
610 chat.send_community_room_message(community_id, &res_str, msg.id);
613 UserOperation::ListCategories => {
614 do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
616 UserOperation::GetPost => {
617 let get_post: GetPost = serde_json::from_str(data)?;
618 let post_id = get_post.id;
619 chat.join_post_room(post_id, msg.id);
620 let mut res = Oper::new(get_post).perform(&conn)?;
622 res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
628 to_json_string(&user_operation, &res)
630 UserOperation::GetPosts => {
631 let get_posts: GetPosts = serde_json::from_str(data)?;
632 if get_posts.community_id.is_none() {
633 // 0 is the "all" community
634 chat.join_community_room(0, msg.id);
636 let res = Oper::new(get_posts).perform(&conn)?;
637 to_json_string(&user_operation, &res)
639 UserOperation::GetComments => {
640 let get_comments: GetComments = serde_json::from_str(data)?;
641 if get_comments.community_id.is_none() {
642 // 0 is the "all" community
643 chat.join_community_room(0, msg.id);
645 let res = Oper::new(get_comments).perform(&conn)?;
646 to_json_string(&user_operation, &res)
648 UserOperation::CreatePost => {
649 chat.check_rate_limit_post(msg.id, true)?;
650 let create_post: CreatePost = serde_json::from_str(data)?;
651 let res = Oper::new(create_post).perform(&conn)?;
652 chat.check_rate_limit_post(msg.id, false)?;
654 chat.post_sends(UserOperation::CreatePost, res, msg.id)
656 UserOperation::CreatePostLike => {
657 let create_post_like: CreatePostLike = serde_json::from_str(data)?;
658 let res = Oper::new(create_post_like).perform(&conn)?;
660 chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
662 UserOperation::EditPost => {
663 let edit_post: EditPost = serde_json::from_str(data)?;
664 let res = Oper::new(edit_post).perform(&conn)?;
666 chat.post_sends(UserOperation::EditPost, res, msg.id)
668 UserOperation::SavePost => {
669 do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
671 UserOperation::CreateComment => {
672 let create_comment: CreateComment = serde_json::from_str(data)?;
673 let res = Oper::new(create_comment).perform(&conn)?;
675 chat.comment_sends(UserOperation::CreateComment, res, msg.id)
677 UserOperation::EditComment => {
678 let edit_comment: EditComment = serde_json::from_str(data)?;
679 let res = Oper::new(edit_comment).perform(&conn)?;
681 chat.comment_sends(UserOperation::EditComment, res, msg.id)
683 UserOperation::SaveComment => {
684 do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
686 UserOperation::CreateCommentLike => {
687 let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
688 let res = Oper::new(create_comment_like).perform(&conn)?;
690 chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
692 UserOperation::GetModlog => {
693 do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
695 UserOperation::CreateSite => {
696 do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
698 UserOperation::EditSite => {
699 let edit_site: EditSite = serde_json::from_str(data)?;
700 let res = Oper::new(edit_site).perform(&conn)?;
701 let res_str = to_json_string(&user_operation, &res)?;
702 chat.send_all_message(&res_str, msg.id);
705 UserOperation::GetSite => {
706 let get_site: GetSite = serde_json::from_str(data)?;
707 let mut res = Oper::new(get_site).perform(&conn)?;
708 res.online = chat.sessions.len();
709 to_json_string(&user_operation, &res)
711 UserOperation::GetSiteConfig => {
712 let get_site_config: GetSiteConfig = serde_json::from_str(data)?;
713 let res = Oper::new(get_site_config).perform(&conn)?;
714 to_json_string(&user_operation, &res)
716 UserOperation::SaveSiteConfig => {
717 let save_site_config: SaveSiteConfig = serde_json::from_str(data)?;
718 let res = Oper::new(save_site_config).perform(&conn)?;
719 to_json_string(&user_operation, &res)
721 UserOperation::Search => {
722 do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
724 UserOperation::TransferCommunity => {
725 do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
727 UserOperation::TransferSite => {
728 do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
730 UserOperation::DeleteAccount => {
731 do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
733 UserOperation::PasswordReset => {
734 do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
736 UserOperation::PasswordChange => {
737 do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
739 UserOperation::CreatePrivateMessage => {
740 let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
741 let recipient_id = create_private_message.recipient_id;
742 let res = Oper::new(create_private_message).perform(&conn)?;
743 let res_str = to_json_string(&user_operation, &res)?;
745 chat.send_user_room_message(recipient_id, &res_str, msg.id);
748 UserOperation::EditPrivateMessage => {
749 do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
751 UserOperation::GetPrivateMessages => {
752 do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
754 UserOperation::UserJoin => {
755 let user_join: UserJoin = serde_json::from_str(data)?;
756 let res = Oper::new(user_join).perform(&conn)?;
757 chat.join_user_room(res.user_id, msg.id);
758 to_json_string(&user_operation, &res)