1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15 use strum::IntoEnumIterator;
17 use crate::api::comment::*;
18 use crate::api::community::*;
19 use crate::api::post::*;
20 use crate::api::site::*;
21 use crate::api::user::*;
23 use crate::websocket::UserOperation;
26 type ConnectionId = usize;
28 type CommunityId = i32;
32 /// Chat server sends this messages to session
34 #[rtype(result = "()")]
35 pub struct WSMessage(pub String);
37 /// Message for chat server communications
39 /// New chat session is created
43 pub addr: Recipient<WSMessage>,
47 /// Session is disconnected
49 #[rtype(result = "()")]
50 pub struct Disconnect {
55 #[derive(Serialize, Deserialize, Message)]
57 pub struct StandardMessage {
58 /// Id of the client session
65 pub struct RateLimitBucket {
66 last_checked: SystemTime,
70 pub struct SessionInfo {
71 pub addr: Recipient<WSMessage>,
75 #[derive(Eq, PartialEq, Hash, Debug, EnumIter, Copy, Clone)]
76 pub enum RateLimitType {
82 /// `ChatServer` manages chat rooms and responsible for coordinating chat
84 pub struct ChatServer {
85 /// A map from generated random ID to session addr
86 sessions: HashMap<ConnectionId, SessionInfo>,
88 /// A map from post_id to set of connectionIDs
89 post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
91 /// A map from community to set of connectionIDs
92 community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
94 /// A map from user id to its connection ID for joined users. Remember a user can have multiple
95 /// sessions (IE clients)
96 user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
98 /// Rate limiting based on rate type and IP addr
99 rate_limit_buckets: HashMap<RateLimitType, HashMap<IPAddr, RateLimitBucket>>,
102 db: Pool<ConnectionManager<PgConnection>>,
106 pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
108 sessions: HashMap::new(),
109 rate_limit_buckets: HashMap::new(),
110 post_rooms: HashMap::new(),
111 community_rooms: HashMap::new(),
112 user_rooms: HashMap::new(),
113 rng: rand::thread_rng(),
118 fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
119 // remove session from all rooms
120 for sessions in self.community_rooms.values_mut() {
121 sessions.remove(&id);
124 // If the room doesn't exist yet
125 if self.community_rooms.get_mut(&community_id).is_none() {
126 self.community_rooms.insert(community_id, HashSet::new());
131 .get_mut(&community_id)
136 fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
137 // remove session from all rooms
138 for sessions in self.post_rooms.values_mut() {
139 sessions.remove(&id);
142 // If the room doesn't exist yet
143 if self.post_rooms.get_mut(&post_id).is_none() {
144 self.post_rooms.insert(post_id, HashSet::new());
147 self.post_rooms.get_mut(&post_id).unwrap().insert(id);
150 fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
151 // remove session from all rooms
152 for sessions in self.user_rooms.values_mut() {
153 sessions.remove(&id);
156 // If the room doesn't exist yet
157 if self.user_rooms.get_mut(&user_id).is_none() {
158 self.user_rooms.insert(user_id, HashSet::new());
161 self.user_rooms.get_mut(&user_id).unwrap().insert(id);
164 fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
165 if let Some(sessions) = self.post_rooms.get(&post_id) {
168 if let Some(info) = self.sessions.get(id) {
169 let _ = info.addr.do_send(WSMessage(message.to_owned()));
176 fn send_community_room_message(
178 community_id: CommunityId,
180 skip_id: ConnectionId,
182 if let Some(sessions) = self.community_rooms.get(&community_id) {
185 if let Some(info) = self.sessions.get(id) {
186 let _ = info.addr.do_send(WSMessage(message.to_owned()));
193 fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
194 if let Some(sessions) = self.user_rooms.get(&user_id) {
197 if let Some(info) = self.sessions.get(id) {
198 let _ = info.addr.do_send(WSMessage(message.to_owned()));
205 fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
206 for id in self.sessions.keys() {
208 if let Some(info) = self.sessions.get(id) {
209 let _ = info.addr.do_send(WSMessage(message.to_owned()));
217 user_operation: UserOperation,
218 comment: CommentResponse,
220 ) -> Result<String, Error> {
221 let mut comment_reply_sent = comment.clone();
222 comment_reply_sent.comment.my_vote = None;
223 comment_reply_sent.comment.user_id = None;
225 // For the post room ones, and the directs back to the user
226 // strip out the recipient_ids, so that
227 // users don't get double notifs
228 let mut comment_user_sent = comment.clone();
229 comment_user_sent.recipient_ids = Vec::new();
231 let mut comment_post_sent = comment_reply_sent.clone();
232 comment_post_sent.recipient_ids = Vec::new();
234 let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
235 let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
236 let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
238 // Send it to the post room
239 self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
241 // Send it to the recipient(s) including the mentioned users
242 for recipient_id in comment_reply_sent.recipient_ids {
243 self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
246 Ok(comment_user_sent_str)
251 user_operation: UserOperation,
254 ) -> Result<String, Error> {
255 let community_id = post.post.community_id;
257 // Don't send my data with it
258 let mut post_sent = post.clone();
259 post_sent.post.my_vote = None;
260 post_sent.post.user_id = None;
261 let post_sent_str = to_json_string(&user_operation, &post_sent)?;
263 // Send it to /c/all and that community
264 self.send_community_room_message(0, &post_sent_str, id);
265 self.send_community_room_message(community_id, &post_sent_str, id);
267 to_json_string(&user_operation, post)
270 fn check_rate_limit_register(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
271 self.check_rate_limit_full(
272 RateLimitType::Register,
274 Settings::get().rate_limit.register,
275 Settings::get().rate_limit.register_per_second,
280 fn check_rate_limit_post(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
281 self.check_rate_limit_full(
284 Settings::get().rate_limit.post,
285 Settings::get().rate_limit.post_per_second,
290 fn check_rate_limit_message(&mut self, id: usize, check_only: bool) -> Result<(), Error> {
291 self.check_rate_limit_full(
292 RateLimitType::Message,
294 Settings::get().rate_limit.message,
295 Settings::get().rate_limit.message_per_second,
300 #[allow(clippy::float_cmp)]
301 fn check_rate_limit_full(
303 type_: RateLimitType,
308 ) -> Result<(), Error> {
309 if let Some(info) = self.sessions.get(&id) {
310 if let Some(bucket) = self.rate_limit_buckets.get_mut(&type_) {
311 if let Some(rate_limit) = bucket.get_mut(&info.ip) {
312 let current = SystemTime::now();
313 let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
316 if rate_limit.allowance == -2f64 {
317 rate_limit.allowance = rate as f64;
320 rate_limit.last_checked = current;
322 rate_limit.allowance += time_passed * (rate as f64 / per as f64);
323 if rate_limit.allowance > rate as f64 {
324 rate_limit.allowance = rate as f64;
328 if rate_limit.allowance < 1.0 {
330 "Rate limited IP: {}, time_passed: {}, allowance: {}",
331 &info.ip, time_passed, rate_limit.allowance
335 message: format!("Too many requests. {} per {} seconds", rate, per),
341 rate_limit.allowance -= 1.0;
357 /// Make actor from `ChatServer`
358 impl Actor for ChatServer {
359 /// We are going to use simple Context, we just need ability to communicate
360 /// with other actors.
361 type Context = Context<Self>;
364 /// Handler for Connect message.
366 /// Register new session and assign unique id to this session
367 impl Handler<Connect> for ChatServer {
370 fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
371 // register session with random id
372 let id = self.rng.gen::<usize>();
373 println!("{} joined", &msg.ip);
375 self.sessions.insert(
379 ip: msg.ip.to_owned(),
383 for rate_limit_type in RateLimitType::iter() {
384 if self.rate_limit_buckets.get(&rate_limit_type).is_none() {
387 .insert(rate_limit_type, HashMap::new());
390 if let Some(bucket) = self.rate_limit_buckets.get_mut(&rate_limit_type) {
391 if bucket.get(&msg.ip).is_none() {
395 last_checked: SystemTime::now(),
407 /// Handler for Disconnect message.
408 impl Handler<Disconnect> for ChatServer {
411 fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
412 // Remove connections from sessions and all 3 scopes
413 if self.sessions.remove(&msg.id).is_some() {
414 for sessions in self.user_rooms.values_mut() {
415 sessions.remove(&msg.id);
418 for sessions in self.post_rooms.values_mut() {
419 sessions.remove(&msg.id);
422 for sessions in self.community_rooms.values_mut() {
423 sessions.remove(&msg.id);
429 /// Handler for Message message.
430 impl Handler<StandardMessage> for ChatServer {
431 type Result = MessageResult<StandardMessage>;
433 fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
434 let msg_out = match parse_json_message(self, msg) {
436 Err(e) => e.to_string(),
439 println!("Message Sent: {}", msg_out);
440 MessageResult(msg_out)
445 struct WebsocketResponse<T> {
450 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
454 let response = WebsocketResponse {
458 Ok(serde_json::to_string(&response)?)
461 fn do_user_operation<'a, Data, Response>(
464 conn: &PooledConnection<ConnectionManager<PgConnection>>,
465 ) -> Result<String, Error>
467 for<'de> Data: Deserialize<'de> + 'a,
469 Oper<Data>: Perform<Response>,
471 let parsed_data: Data = serde_json::from_str(data)?;
472 let res = Oper::new(parsed_data).perform(&conn)?;
473 to_json_string(&op, &res)
476 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
477 let json: Value = serde_json::from_str(&msg.msg)?;
478 let data = &json["data"].to_string();
479 let op = &json["op"].as_str().ok_or(APIError {
480 message: "Unknown op type".to_string(),
483 let conn = chat.db.get()?;
485 let user_operation: UserOperation = UserOperation::from_str(&op)?;
487 // TODO: none of the chat messages are going to work if stuff is submitted via http api,
488 // need to move that handling elsewhere
491 chat.check_rate_limit_message(msg.id, false)?;
493 match user_operation {
494 UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
495 UserOperation::Register => {
496 chat.check_rate_limit_register(msg.id, true)?;
497 let register: Register = serde_json::from_str(data)?;
498 let res = Oper::new(register).perform(&conn)?;
499 chat.check_rate_limit_register(msg.id, false)?;
500 to_json_string(&user_operation, &res)
502 UserOperation::GetUserDetails => {
503 do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
505 UserOperation::SaveUserSettings => {
506 do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
508 UserOperation::AddAdmin => {
509 let add_admin: AddAdmin = serde_json::from_str(data)?;
510 let res = Oper::new(add_admin).perform(&conn)?;
511 let res_str = to_json_string(&user_operation, &res)?;
512 chat.send_all_message(&res_str, msg.id);
515 UserOperation::BanUser => {
516 let ban_user: BanUser = serde_json::from_str(data)?;
517 let res = Oper::new(ban_user).perform(&conn)?;
518 let res_str = to_json_string(&user_operation, &res)?;
519 chat.send_all_message(&res_str, msg.id);
522 UserOperation::GetReplies => {
523 do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
525 UserOperation::GetUserMentions => {
526 do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
528 UserOperation::EditUserMention => {
529 do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
531 UserOperation::MarkAllAsRead => {
532 do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
534 UserOperation::GetCommunity => {
535 let get_community: GetCommunity = serde_json::from_str(data)?;
536 let mut res = Oper::new(get_community).perform(&conn)?;
537 let community_id = res.community.id;
539 chat.join_community_room(community_id, msg.id);
541 res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
542 community_users.len()
547 to_json_string(&user_operation, &res)
549 UserOperation::ListCommunities => {
550 do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
552 UserOperation::CreateCommunity => {
553 chat.check_rate_limit_register(msg.id, true)?;
554 let create_community: CreateCommunity = serde_json::from_str(data)?;
555 let res = Oper::new(create_community).perform(&conn)?;
556 chat.check_rate_limit_register(msg.id, false)?;
557 to_json_string(&user_operation, &res)
559 UserOperation::EditCommunity => {
560 let edit_community: EditCommunity = serde_json::from_str(data)?;
561 let res = Oper::new(edit_community).perform(&conn)?;
562 let mut community_sent: CommunityResponse = res.clone();
563 community_sent.community.user_id = None;
564 community_sent.community.subscribed = None;
565 let community_sent_str = to_json_string(&user_operation, &community_sent)?;
566 chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
567 to_json_string(&user_operation, &res)
569 UserOperation::FollowCommunity => {
570 do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
572 UserOperation::GetFollowedCommunities => do_user_operation::<
573 GetFollowedCommunities,
574 GetFollowedCommunitiesResponse,
575 >(user_operation, data, &conn),
576 UserOperation::BanFromCommunity => {
577 let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
578 let community_id = ban_from_community.community_id;
579 let res = Oper::new(ban_from_community).perform(&conn)?;
580 let res_str = to_json_string(&user_operation, &res)?;
581 chat.send_community_room_message(community_id, &res_str, msg.id);
584 UserOperation::AddModToCommunity => {
585 let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
586 let community_id = mod_add_to_community.community_id;
587 let res = Oper::new(mod_add_to_community).perform(&conn)?;
588 let res_str = to_json_string(&user_operation, &res)?;
589 chat.send_community_room_message(community_id, &res_str, msg.id);
592 UserOperation::ListCategories => {
593 do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
595 UserOperation::GetPost => {
596 let get_post: GetPost = serde_json::from_str(data)?;
597 let post_id = get_post.id;
598 chat.join_post_room(post_id, msg.id);
599 let mut res = Oper::new(get_post).perform(&conn)?;
601 res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
607 to_json_string(&user_operation, &res)
609 UserOperation::GetPosts => {
610 let get_posts: GetPosts = serde_json::from_str(data)?;
611 if get_posts.community_id.is_none() {
612 // 0 is the "all" community
613 chat.join_community_room(0, msg.id);
615 let res = Oper::new(get_posts).perform(&conn)?;
616 to_json_string(&user_operation, &res)
618 UserOperation::CreatePost => {
619 chat.check_rate_limit_post(msg.id, true)?;
620 let create_post: CreatePost = serde_json::from_str(data)?;
621 let res = Oper::new(create_post).perform(&conn)?;
622 chat.check_rate_limit_post(msg.id, false)?;
624 chat.post_sends(UserOperation::CreatePost, res, msg.id)
626 UserOperation::CreatePostLike => {
627 let create_post_like: CreatePostLike = serde_json::from_str(data)?;
628 let res = Oper::new(create_post_like).perform(&conn)?;
630 chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
632 UserOperation::EditPost => {
633 let edit_post: EditPost = serde_json::from_str(data)?;
634 let res = Oper::new(edit_post).perform(&conn)?;
636 chat.post_sends(UserOperation::EditPost, res, msg.id)
638 UserOperation::SavePost => {
639 do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
641 UserOperation::CreateComment => {
642 let create_comment: CreateComment = serde_json::from_str(data)?;
643 let res = Oper::new(create_comment).perform(&conn)?;
645 chat.comment_sends(UserOperation::CreateComment, res, msg.id)
647 UserOperation::EditComment => {
648 let edit_comment: EditComment = serde_json::from_str(data)?;
649 let res = Oper::new(edit_comment).perform(&conn)?;
651 chat.comment_sends(UserOperation::EditComment, res, msg.id)
653 UserOperation::SaveComment => {
654 do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
656 UserOperation::CreateCommentLike => {
657 let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
658 let res = Oper::new(create_comment_like).perform(&conn)?;
660 chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
662 UserOperation::GetModlog => {
663 do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
665 UserOperation::CreateSite => {
666 do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
668 UserOperation::EditSite => {
669 let edit_site: EditSite = serde_json::from_str(data)?;
670 let res = Oper::new(edit_site).perform(&conn)?;
671 let res_str = to_json_string(&user_operation, &res)?;
672 chat.send_all_message(&res_str, msg.id);
675 UserOperation::GetSite => {
676 let get_site: GetSite = serde_json::from_str(data)?;
677 let mut res = Oper::new(get_site).perform(&conn)?;
678 res.online = chat.sessions.len();
679 to_json_string(&user_operation, &res)
681 UserOperation::Search => {
682 do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
684 UserOperation::TransferCommunity => {
685 do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
687 UserOperation::TransferSite => {
688 do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
690 UserOperation::DeleteAccount => {
691 do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
693 UserOperation::PasswordReset => {
694 do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
696 UserOperation::PasswordChange => {
697 do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
699 UserOperation::CreatePrivateMessage => {
700 let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
701 let recipient_id = create_private_message.recipient_id;
702 let res = Oper::new(create_private_message).perform(&conn)?;
703 let res_str = to_json_string(&user_operation, &res)?;
705 chat.send_user_room_message(recipient_id, &res_str, msg.id);
708 UserOperation::EditPrivateMessage => {
709 do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
711 UserOperation::GetPrivateMessages => {
712 do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
714 UserOperation::UserJoin => {
715 let user_join: UserJoin = serde_json::from_str(data)?;
716 let res = Oper::new(user_join).perform(&conn)?;
717 chat.join_user_room(res.user_id, msg.id);
718 to_json_string(&user_operation, &res)