1 //! `ChatServer` is an actor. It maintains the list of connected client sessions
2 //! and manages the available rooms. Peers send messages to other peers in the same
3 //! room through `ChatServer`.
6 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
7 use diesel::PgConnection;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
16 use crate::api::comment::*;
17 use crate::api::community::*;
18 use crate::api::post::*;
19 use crate::api::site::*;
20 use crate::api::user::*;
22 use crate::apub::puller::*;
23 use crate::websocket::UserOperation;
/// Identifier of one websocket session; the `Connect` handler draws it as a random `usize`.
26 type ConnectionId = usize;
/// Key type of the community rooms map.
28 type CommunityId = i32;
32 /// Text message the chat server sends to a client session.
34 #[rtype(result = "()")]
35 pub struct WSMessage(pub String);
37 /// Message for chat server communications
39 /// Sent when a new chat session is created.
/// Recipient for `WSMessage`s.
// NOTE(review): presumably the address of the connecting session, stored in its `SessionInfo` - confirm.
43 pub addr: Recipient<WSMessage>,
47 /// Sent when a session is disconnected. The server's handler removes the
/// session id from the session map and from every user, post and community room.
49 #[rtype(result = "()")]
50 pub struct Disconnect {
/// A request coming from a client session. The server's handler passes it to
/// `parse_json_message` and replies with the resulting string (the error text
/// when processing fails).
55 #[derive(Serialize, Deserialize, Message)]
57 pub struct StandardMessage {
58 /// Id of the client session
/// Rate-limit state; the server keeps one bucket per IP address.
65 pub struct RateLimitBucket {
/// Time of the most recent rate-limit check against this bucket.
66 last_checked: SystemTime,
/// Per-connection data stored in the server's session map.
70 pub struct SessionInfo {
/// Recipient used to push `WSMessage`s to this session.
71 pub addr: Recipient<WSMessage>,
75 /// `ChatServer` manages chat rooms and is responsible for coordinating chat
77 pub struct ChatServer {
78 /// A map from the randomly generated connection id to the session's info
79 sessions: HashMap<ConnectionId, SessionInfo>,
81 /// A map from post id to the set of connection ids in that post's room
82 post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
84 /// A map from community id to the set of connection ids in that community's room
85 community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
87 /// A map from user id to the connection ids of that user's joined sessions. Remember a user
88 /// can have multiple sessions (i.e. clients)
89 user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
91 /// Rate limiting state, keyed by IP address
92 rate_limits: HashMap<IPAddr, RateLimitBucket>,
/// Pool of PostgreSQL connections; one connection is taken from it for each
/// incoming message.
95 db: Pool<ConnectionManager<PgConnection>>,
/// Creates a `ChatServer` whose session, rate-limit and room maps all start
/// empty. `db` is the database connection pool the server will use.
99 pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
101 sessions: HashMap::new(),
102 rate_limits: HashMap::new(),
103 post_rooms: HashMap::new(),
104 community_rooms: HashMap::new(),
105 user_rooms: HashMap::new(),
// Random number generator; the `Connect` handler uses it to draw session ids.
106 rng: rand::thread_rng(),
111 fn join_community_room(&mut self, community_id: CommunityId, id: ConnectionId) {
112 // remove session from all rooms
113 for sessions in self.community_rooms.values_mut() {
114 sessions.remove(&id);
117 // If the room doesn't exist yet
118 if self.community_rooms.get_mut(&community_id).is_none() {
119 self.community_rooms.insert(community_id, HashSet::new());
124 .get_mut(&community_id)
129 fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) {
130 // remove session from all rooms
131 for sessions in self.post_rooms.values_mut() {
132 sessions.remove(&id);
135 // If the room doesn't exist yet
136 if self.post_rooms.get_mut(&post_id).is_none() {
137 self.post_rooms.insert(post_id, HashSet::new());
140 self.post_rooms.get_mut(&post_id).unwrap().insert(id);
143 fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) {
144 // remove session from all rooms
145 for sessions in self.user_rooms.values_mut() {
146 sessions.remove(&id);
149 // If the room doesn't exist yet
150 if self.user_rooms.get_mut(&user_id).is_none() {
151 self.user_rooms.insert(user_id, HashSet::new());
154 self.user_rooms.get_mut(&user_id).unwrap().insert(id);
/// Sends `message` to sessions in the room of post `post_id`. Does nothing
/// when no room exists for that post.
// NOTE(review): the lines iterating over `sessions` are not visible here;
// presumably the connection `skip_id` is excluded from delivery - confirm.
157 fn send_post_room_message(&self, post_id: PostId, message: &str, skip_id: ConnectionId) {
158 if let Some(sessions) = self.post_rooms.get(&post_id) {
// Ids without an entry in the session map are silently skipped.
161 if let Some(info) = self.sessions.get(id) {
// Each recipient gets its own copy of the text; whatever `do_send` returns is discarded.
162 let _ = info.addr.do_send(WSMessage(message.to_owned()));
/// Sends `message` to sessions in the room of community `community_id`. Does
/// nothing when no room exists for that community.
// NOTE(review): the lines iterating over `sessions` are not visible here;
// presumably the connection `skip_id` is excluded from delivery - confirm.
169 fn send_community_room_message(
171 community_id: CommunityId,
173 skip_id: ConnectionId,
175 if let Some(sessions) = self.community_rooms.get(&community_id) {
// Ids without an entry in the session map are silently skipped.
178 if let Some(info) = self.sessions.get(id) {
// Each recipient gets its own copy of the text; whatever `do_send` returns is discarded.
179 let _ = info.addr.do_send(WSMessage(message.to_owned()));
/// Sends `message` to sessions in the room of user `user_id`. Does nothing
/// when no room exists for that user.
// NOTE(review): the lines iterating over `sessions` are not visible here;
// presumably the connection `skip_id` is excluded from delivery - confirm.
186 fn send_user_room_message(&self, user_id: UserId, message: &str, skip_id: ConnectionId) {
187 if let Some(sessions) = self.user_rooms.get(&user_id) {
// Ids without an entry in the session map are silently skipped.
190 if let Some(info) = self.sessions.get(id) {
// Each recipient gets its own copy of the text; whatever `do_send` returns is discarded.
191 let _ = info.addr.do_send(WSMessage(message.to_owned()));
/// Sends `message` to the sessions registered in the session map.
// NOTE(review): the line between the loop header and the lookup is not visible
// here; presumably the connection `skip_id` is excluded from delivery - confirm.
198 fn send_all_message(&self, message: &str, skip_id: ConnectionId) {
199 for id in self.sessions.keys() {
201 if let Some(info) = self.sessions.get(id) {
// Each recipient gets its own copy of the text; whatever `do_send` returns is discarded.
202 let _ = info.addr.do_send(WSMessage(message.to_owned()));
// Fans a comment response out to the post room and to the comment's
// recipients, and returns the JSON string meant for the originating session.
//
// Three serialized variants are built from `comment`:
// * reply variant: `my_vote` and `user_id` cleared, `recipient_ids` kept;
//   sent to the user room of every recipient;
// * post variant: the reply variant with `recipient_ids` emptied; sent to the
//   room of the comment's post;
// * user variant: the original with only `recipient_ids` emptied; returned.
//
// Any serialization error is propagated to the caller.
210 user_operation: UserOperation,
211 comment: CommentResponse,
213 ) -> Result<String, Error> {
// Drop the fields specific to the sending user before showing the comment to others.
214 let mut comment_reply_sent = comment.clone();
215 comment_reply_sent.comment.my_vote = None;
216 comment_reply_sent.comment.user_id = None;
218 // For the post room ones, and the directs back to the user
219 // strip out the recipient_ids, so that
220 // users don't get double notifs
221 let mut comment_user_sent = comment.clone();
222 comment_user_sent.recipient_ids = Vec::new();
224 let mut comment_post_sent = comment_reply_sent.clone();
225 comment_post_sent.recipient_ids = Vec::new();
// All three variants are serialized before anything is sent, so a
// serialization failure returns early without sending anything.
227 let comment_reply_sent_str = to_json_string(&user_operation, &comment_reply_sent)?;
228 let comment_post_sent_str = to_json_string(&user_operation, &comment_post_sent)?;
229 let comment_user_sent_str = to_json_string(&user_operation, &comment_user_sent)?;
231 // Send it to the post room
232 self.send_post_room_message(comment.comment.post_id, &comment_post_sent_str, id);
234 // Send it to the recipient(s) including the mentioned users
235 for recipient_id in comment_reply_sent.recipient_ids {
236 self.send_user_room_message(recipient_id, &comment_reply_sent_str, id);
239 Ok(comment_user_sent_str)
// Sends a post response to the "all" community room (id 0) and to the room of
// the post's own community, and returns the JSON string of the unmodified
// `post` for the originating session. The copy sent to the rooms has
// `my_vote` and `user_id` cleared. Serialization errors are propagated.
244 user_operation: UserOperation,
247 ) -> Result<String, Error> {
248 let community_id = post.post.community_id;
250 // Don't send my data with it
251 let mut post_sent = post.clone();
252 post_sent.post.my_vote = None;
253 post_sent.post.user_id = None;
254 let post_sent_str = to_json_string(&user_operation, &post_sent)?;
256 // Send it to /c/all and that community
257 self.send_community_room_message(0, &post_sent_str, id);
258 self.send_community_room_message(community_id, &post_sent_str, id);
// The caller gets the full response, including its own vote and user id.
260 to_json_string(&user_operation, post)
/// Runs `check_rate_limit_full` with the `register` rate and period from the
/// settings. `parse_json_message` applies it to both `Register` and
/// `CreateCommunity`.
263 fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
264 self.check_rate_limit_full(
266 Settings::get().rate_limit.register,
267 Settings::get().rate_limit.register_per_second,
/// Runs `check_rate_limit_full` with the `post` rate and period from the
/// settings.
271 fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
272 self.check_rate_limit_full(
274 Settings::get().rate_limit.post,
275 Settings::get().rate_limit.post_per_second,
/// Runs `check_rate_limit_full` with the `message` rate and period from the
/// settings.
279 fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
280 self.check_rate_limit_full(
282 Settings::get().rate_limit.message,
283 Settings::get().rate_limit.message_per_second,
/// Checks the rate-limit bucket belonging to the IP of session `id`.
///
/// The bucket's allowance is refilled at `rate / per` per elapsed second,
/// capped at `rate`. An allowance below 1.0 is treated as rate limited;
/// otherwise 1.0 is deducted.
// NOTE(review): the rejection branch and the fall-through results (unknown
// session, missing bucket) are on lines not visible here; presumably the
// rejection returns an error carrying the "Too many requests" message - confirm.
// `float_cmp` is allowed because of the exact comparison with the -2 marker below.
287 #[allow(clippy::float_cmp)]
288 fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
// Buckets are keyed by the session's IP address, not by the session id.
289 if let Some(info) = self.sessions.get(&id) {
290 if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
// An allowance of exactly -2 is replaced by the full `rate`.
// NOTE(review): presumably -2 is the initial value given to a fresh bucket - confirm.
292 if rate_limit.allowance == -2f64 {
293 rate_limit.allowance = rate as f64;
296 let current = SystemTime::now();
// NOTE(review): `duration_since` fails if `last_checked` is later than
// `current` (e.g. the system clock was set back), and `?` turns that into an
// error for the caller. `as_secs()` yields whole seconds only; since
// `last_checked` is reset on every call, calls less than a second apart add
// no allowance.
297 let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
298 rate_limit.last_checked = current;
299 rate_limit.allowance += time_passed * (rate as f64 / per as f64);
// Never accumulate more than `rate` requests of allowance.
300 if rate_limit.allowance > rate as f64 {
301 rate_limit.allowance = rate as f64;
304 if rate_limit.allowance < 1.0 {
306 "Rate limited IP: {}, time_passed: {}, allowance: {}",
307 &info.ip, time_passed, rate_limit.allowance
311 message: format!("Too many requests. {} per {} seconds", rate, per),
// Allowed: charge one unit for this request.
316 rate_limit.allowance -= 1.0;
328 /// Makes `ChatServer` an actor.
329 impl Actor for ChatServer {
330 /// A plain `Context` is enough: the server only needs the ability to communicate
331 /// with other actors.
332 type Context = Context<Self>;
335 /// Handler for the `Connect` message.
337 /// Registers the new session under a freshly generated id.
338 impl Handler<Connect> for ChatServer {
341 fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
342 // register session with random id
// NOTE(review): the id is a random `usize`; no collision check against
// existing sessions is visible in this block.
343 let id = self.rng.gen::<usize>();
344 println!("{} joined", &msg.ip);
346 self.sessions.insert(
350 ip: msg.ip.to_owned(),
// First connection seen from this IP: create its rate-limit bucket. An
// existing bucket is left untouched.
354 if self.rate_limits.get(&msg.ip).is_none() {
355 self.rate_limits.insert(
358 last_checked: SystemTime::now(),
368 /// Handler for the `Disconnect` message: forgets the session and removes it
/// from every room.
369 impl Handler<Disconnect> for ChatServer {
372 fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
373 // Remove connections from sessions and all 3 scopes
// Rooms are only scanned when the id was actually a registered session.
374 if self.sessions.remove(&msg.id).is_some() {
375 for sessions in self.user_rooms.values_mut() {
376 sessions.remove(&msg.id);
379 for sessions in self.post_rooms.values_mut() {
380 sessions.remove(&msg.id);
383 for sessions in self.community_rooms.values_mut() {
384 sessions.remove(&msg.id);
390 /// Handler for `StandardMessage`: processes the request with
/// `parse_json_message` and replies with the resulting string.
391 impl Handler<StandardMessage> for ChatServer {
392 type Result = MessageResult<StandardMessage>;
394 fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
395 let msg_out = match parse_json_message(self, msg) {
// A failure is not propagated: its text becomes the reply.
397 Err(e) => e.to_string(),
// Every reply is also printed to stdout.
400 println!("Message Sent: {}", msg_out);
401 MessageResult(msg_out)
/// Response wrapper, generic over the payload type `T`, that `to_json_string`
/// builds and serializes to JSON.
406 struct WebsocketResponse<T> {
/// Builds a `WebsocketResponse` for operation `op` with payload `data` and
/// serializes it with `serde_json::to_string`. A serialization error is
/// returned to the caller.
// NOTE(review): the bounds on `T` and the fields filled in from `op` and
// `data` are on lines not visible here.
411 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
415 let response = WebsocketResponse {
419 Ok(serde_json::to_string(&response)?)
/// Generic path for operations that need no room or broadcast handling:
/// deserializes `data` into `Data`, runs it through `Oper::new(..).perform(..)`
/// on `conn`, and serializes the response with `to_json_string`.
///
/// Errors from deserialization, from `perform` and from serialization are
/// propagated to the caller.
422 fn do_user_operation<'a, Data, Response>(
425 conn: &PooledConnection<ConnectionManager<PgConnection>>,
426 ) -> Result<String, Error>
428 for<'de> Data: Deserialize<'de> + 'a,
430 Oper<Data>: Perform<Response>,
432 let parsed_data: Data = serde_json::from_str(data)?;
433 let res = Oper::new(parsed_data).perform(&conn)?;
434 to_json_string(&op, &res)
/// Decodes one client message and dispatches it to the matching API operation.
///
/// `msg.msg` must be JSON with a string `op` naming a `UserOperation` and a
/// `data` payload for that operation. Depending on the operation the handler
/// may also rate-limit the sender, move the connection `msg.id` into a
/// community, post or user room, or pass the serialized result on to other
/// sessions.
///
/// Errors from JSON parsing, the connection pool, `UserOperation::from_str`,
/// rate limiting and the operation itself are propagated with `?`.
437 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
438 let json: Value = serde_json::from_str(&msg.msg)?;
// Re-serialize the `data` field so each operation can deserialize it into
// its own request type.
439 let data = &json["data"].to_string();
// `op` must be a JSON string; otherwise the "Unknown op type" error is used.
440 let op = &json["op"].as_str().ok_or(APIError {
441 message: "Unknown op type".to_string(),
// Take a database connection from the pool for this message.
444 let conn = chat.db.get()?;
446 let user_operation: UserOperation = UserOperation::from_str(&op)?;
448 // TODO: none of the chat messages are going to work if stuff is submitted via http api,
449 // need to move that handling elsewhere
450 match user_operation {
451 UserOperation::Login => do_user_operation::<Login, LoginResponse>(user_operation, data, &conn),
452 UserOperation::Register => {
453 chat.check_rate_limit_register(msg.id)?;
454 do_user_operation::<Register, LoginResponse>(user_operation, data, &conn)
456 UserOperation::GetUserDetails => {
457 do_user_operation::<GetUserDetails, GetUserDetailsResponse>(user_operation, data, &conn)
459 UserOperation::SaveUserSettings => {
460 do_user_operation::<SaveUserSettings, LoginResponse>(user_operation, data, &conn)
462 UserOperation::AddAdmin => {
463 let add_admin: AddAdmin = serde_json::from_str(data)?;
464 let res = Oper::new(add_admin).perform(&conn)?;
465 let res_str = to_json_string(&user_operation, &res)?;
// Site-wide change: pass the result on via `send_all_message`.
466 chat.send_all_message(&res_str, msg.id);
469 UserOperation::BanUser => {
470 let ban_user: BanUser = serde_json::from_str(data)?;
471 let res = Oper::new(ban_user).perform(&conn)?;
472 let res_str = to_json_string(&user_operation, &res)?;
473 chat.send_all_message(&res_str, msg.id);
476 UserOperation::GetReplies => {
477 do_user_operation::<GetReplies, GetRepliesResponse>(user_operation, data, &conn)
479 UserOperation::GetUserMentions => {
480 do_user_operation::<GetUserMentions, GetUserMentionsResponse>(user_operation, data, &conn)
482 UserOperation::EditUserMention => {
483 do_user_operation::<EditUserMention, UserMentionResponse>(user_operation, data, &conn)
485 UserOperation::MarkAllAsRead => {
486 do_user_operation::<MarkAllAsRead, GetRepliesResponse>(user_operation, data, &conn)
488 UserOperation::GetCommunity => {
489 let get_community: GetCommunity = serde_json::from_str(data)?;
// With federation enabled, a community name containing '@' is fetched with
// `get_remote_community`; the other visible branches query the local database.
491 let mut res = if Settings::get().federation_enabled {
492 if let Some(community_name) = get_community.name.to_owned() {
493 if community_name.contains('@') {
494 // TODO: need to support sort, filter etc for remote communities
495 get_remote_community(community_name)?
496 // TODO what is this about
497 // get_community.name = Some(name.replace("!", ""));
499 Oper::new(get_community).perform(&conn)?
502 Oper::new(get_community).perform(&conn)?
505 Oper::new(get_community).perform(&conn)?
508 let community_id = res.community.id;
// Viewing a community moves this connection into that community's room.
510 chat.join_community_room(community_id, msg.id);
// Report how many connections are currently in the room.
512 res.online = if let Some(community_users) = chat.community_rooms.get(&community_id) {
513 community_users.len()
518 to_json_string(&user_operation, &res)
520 UserOperation::ListCommunities => {
521 if Settings::get().federation_enabled {
522 let res = get_all_communities()?;
523 let val = ListCommunitiesResponse { communities: res };
524 to_json_string(&user_operation, &val)
526 do_user_operation::<ListCommunities, ListCommunitiesResponse>(user_operation, data, &conn)
529 UserOperation::CreateCommunity => {
// Community creation shares the `register` rate limit.
530 chat.check_rate_limit_register(msg.id)?;
531 do_user_operation::<CreateCommunity, CommunityResponse>(user_operation, data, &conn)
533 UserOperation::EditCommunity => {
534 let edit_community: EditCommunity = serde_json::from_str(data)?;
535 let res = Oper::new(edit_community).perform(&conn)?;
// The copy sent to the community room has the editor's `user_id` and
// `subscribed` cleared; the editor receives the full response.
536 let mut community_sent: CommunityResponse = res.clone();
537 community_sent.community.user_id = None;
538 community_sent.community.subscribed = None;
539 let community_sent_str = to_json_string(&user_operation, &community_sent)?;
540 chat.send_community_room_message(community_sent.community.id, &community_sent_str, msg.id);
541 to_json_string(&user_operation, &res)
543 UserOperation::FollowCommunity => {
544 do_user_operation::<FollowCommunity, CommunityResponse>(user_operation, data, &conn)
546 UserOperation::GetFollowedCommunities => do_user_operation::<
547 GetFollowedCommunities,
548 GetFollowedCommunitiesResponse,
549 >(user_operation, data, &conn),
550 UserOperation::BanFromCommunity => {
551 let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
552 let community_id = ban_from_community.community_id;
553 let res = Oper::new(ban_from_community).perform(&conn)?;
554 let res_str = to_json_string(&user_operation, &res)?;
555 chat.send_community_room_message(community_id, &res_str, msg.id);
558 UserOperation::AddModToCommunity => {
559 let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
560 let community_id = mod_add_to_community.community_id;
561 let res = Oper::new(mod_add_to_community).perform(&conn)?;
562 let res_str = to_json_string(&user_operation, &res)?;
563 chat.send_community_room_message(community_id, &res_str, msg.id);
566 UserOperation::ListCategories => {
567 do_user_operation::<ListCategories, ListCategoriesResponse>(user_operation, data, &conn)
569 UserOperation::GetPost => {
570 let get_post: GetPost = serde_json::from_str(data)?;
571 let post_id = get_post.id;
// The connection joins the post room before the post is fetched, so it stays
// in the room even if `perform` fails.
572 chat.join_post_room(post_id, msg.id);
573 let mut res = Oper::new(get_post).perform(&conn)?;
575 res.online = if let Some(post_users) = chat.post_rooms.get(&post_id) {
581 to_json_string(&user_operation, &res)
583 UserOperation::GetPosts => {
584 let get_posts: GetPosts = serde_json::from_str(data)?;
585 if get_posts.community_id.is_none() {
586 // 0 is the "all" community
587 chat.join_community_room(0, msg.id);
589 let res = Oper::new(get_posts).perform(&conn)?;
590 to_json_string(&user_operation, &res)
592 UserOperation::CreatePost => {
593 chat.check_rate_limit_post(msg.id)?;
594 let create_post: CreatePost = serde_json::from_str(data)?;
595 let res = Oper::new(create_post).perform(&conn)?;
597 chat.post_sends(UserOperation::CreatePost, res, msg.id)
599 UserOperation::CreatePostLike => {
600 chat.check_rate_limit_message(msg.id)?;
601 let create_post_like: CreatePostLike = serde_json::from_str(data)?;
602 let res = Oper::new(create_post_like).perform(&conn)?;
604 chat.post_sends(UserOperation::CreatePostLike, res, msg.id)
606 UserOperation::EditPost => {
607 let edit_post: EditPost = serde_json::from_str(data)?;
608 let res = Oper::new(edit_post).perform(&conn)?;
610 chat.post_sends(UserOperation::EditPost, res, msg.id)
612 UserOperation::SavePost => {
613 do_user_operation::<SavePost, PostResponse>(user_operation, data, &conn)
615 UserOperation::CreateComment => {
616 chat.check_rate_limit_message(msg.id)?;
617 let create_comment: CreateComment = serde_json::from_str(data)?;
618 let res = Oper::new(create_comment).perform(&conn)?;
620 chat.comment_sends(UserOperation::CreateComment, res, msg.id)
622 UserOperation::EditComment => {
623 let edit_comment: EditComment = serde_json::from_str(data)?;
624 let res = Oper::new(edit_comment).perform(&conn)?;
626 chat.comment_sends(UserOperation::EditComment, res, msg.id)
628 UserOperation::SaveComment => {
629 do_user_operation::<SaveComment, CommentResponse>(user_operation, data, &conn)
631 UserOperation::CreateCommentLike => {
632 chat.check_rate_limit_message(msg.id)?;
633 let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
634 let res = Oper::new(create_comment_like).perform(&conn)?;
636 chat.comment_sends(UserOperation::CreateCommentLike, res, msg.id)
638 UserOperation::GetModlog => {
639 do_user_operation::<GetModlog, GetModlogResponse>(user_operation, data, &conn)
641 UserOperation::CreateSite => {
642 do_user_operation::<CreateSite, SiteResponse>(user_operation, data, &conn)
644 UserOperation::EditSite => {
645 let edit_site: EditSite = serde_json::from_str(data)?;
646 let res = Oper::new(edit_site).perform(&conn)?;
647 let res_str = to_json_string(&user_operation, &res)?;
648 chat.send_all_message(&res_str, msg.id);
651 UserOperation::GetSite => {
652 let get_site: GetSite = serde_json::from_str(data)?;
653 let mut res = Oper::new(get_site).perform(&conn)?;
// Site-wide online count: the number of registered sessions.
654 res.online = chat.sessions.len();
655 to_json_string(&user_operation, &res)
657 UserOperation::Search => {
658 do_user_operation::<Search, SearchResponse>(user_operation, data, &conn)
660 UserOperation::TransferCommunity => {
661 do_user_operation::<TransferCommunity, GetCommunityResponse>(user_operation, data, &conn)
663 UserOperation::TransferSite => {
664 do_user_operation::<TransferSite, GetSiteResponse>(user_operation, data, &conn)
666 UserOperation::DeleteAccount => {
667 do_user_operation::<DeleteAccount, LoginResponse>(user_operation, data, &conn)
669 UserOperation::PasswordReset => {
670 do_user_operation::<PasswordReset, PasswordResetResponse>(user_operation, data, &conn)
672 UserOperation::PasswordChange => {
673 do_user_operation::<PasswordChange, LoginResponse>(user_operation, data, &conn)
675 UserOperation::CreatePrivateMessage => {
676 chat.check_rate_limit_message(msg.id)?;
677 let create_private_message: CreatePrivateMessage = serde_json::from_str(data)?;
678 let recipient_id = create_private_message.recipient_id;
679 let res = Oper::new(create_private_message).perform(&conn)?;
680 let res_str = to_json_string(&user_operation, &res)?;
// Deliver the message to the recipient's user room.
682 chat.send_user_room_message(recipient_id, &res_str, msg.id);
685 UserOperation::EditPrivateMessage => {
686 do_user_operation::<EditPrivateMessage, PrivateMessageResponse>(user_operation, data, &conn)
688 UserOperation::GetPrivateMessages => {
689 do_user_operation::<GetPrivateMessages, PrivateMessagesResponse>(user_operation, data, &conn)
691 UserOperation::UserJoin => {
692 let user_join: UserJoin = serde_json::from_str(data)?;
693 let res = Oper::new(user_join).perform(&conn)?;
// Register this connection in the room of the user id returned by the operation.
694 chat.join_user_room(res.user_id, msg.id);
695 to_json_string(&user_operation, &res)