1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
6 use diesel::r2d2::{ConnectionManager, Pool};
7 use diesel::PgConnection;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
16 use crate::api::comment::*;
17 use crate::api::community::*;
18 use crate::api::post::*;
19 use crate::api::site::*;
20 use crate::api::user::*;
22 use crate::apub::puller::*;
25 /// Chat server sends this messages to session
27 #[rtype(result = "()")]
28 pub struct WSMessage(pub String);
30 /// Message for chat server communications
32 /// New chat session is created
36 pub addr: Recipient<WSMessage>,
40 /// Session is disconnected
42 #[rtype(result = "()")]
43 pub struct Disconnect {
48 // TODO this is unused rn
49 /// Send message to specific room
51 #[rtype(result = "()")]
52 pub struct ClientMessage {
53 /// Id of the client session
61 #[derive(Serialize, Deserialize, Message)]
63 pub struct StandardMessage {
64 /// Id of the client session
71 pub struct RateLimitBucket {
72 last_checked: SystemTime,
76 pub struct SessionInfo {
77 pub addr: Recipient<WSMessage>,
81 /// `ChatServer` manages chat rooms and responsible for coordinating chat
82 /// session. implementation is super primitive
83 pub struct ChatServer {
84 sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
85 rate_limits: HashMap<String, RateLimitBucket>,
86 rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
88 db: Pool<ConnectionManager<PgConnection>>,
91 // impl Default for ChatServer {
92 // fn default(nah: String) -> ChatServer {
94 // let rooms = HashMap::new();
97 // sessions: HashMap::new(),
98 // rate_limits: HashMap::new(),
100 // rng: rand::thread_rng(),
107 pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
109 let rooms = HashMap::new();
112 sessions: HashMap::new(),
113 rate_limits: HashMap::new(),
115 rng: rand::thread_rng(),
120 /// Send message to all users in the room
121 fn send_room_message(&self, room: i32, message: &str, skip_id: usize) {
122 if let Some(sessions) = self.rooms.get(&room) {
125 if let Some(info) = self.sessions.get(id) {
126 let _ = info.addr.do_send(WSMessage(message.to_owned()));
133 fn join_room(&mut self, room_id: i32, id: usize) {
134 // remove session from all rooms
135 for sessions in self.rooms.values_mut() {
136 sessions.remove(&id);
139 // If the room doesn't exist yet
140 if self.rooms.get_mut(&room_id).is_none() {
141 self.rooms.insert(room_id, HashSet::new());
144 self.rooms.get_mut(&room_id).unwrap().insert(id);
147 fn send_community_message(
152 ) -> Result<(), Error> {
153 use crate::db::post_view::*;
156 let conn = self.db.get()?;
158 let posts = PostQueryBuilder::create(&conn)
159 .listing_type(ListingType::Community)
160 .sort(&SortType::New)
161 .for_community_id(community_id)
166 self.send_room_message(post.id, message, skip_id);
172 fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
173 self.check_rate_limit_full(
175 Settings::get().rate_limit.register,
176 Settings::get().rate_limit.register_per_second,
180 fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
181 self.check_rate_limit_full(
183 Settings::get().rate_limit.post,
184 Settings::get().rate_limit.post_per_second,
188 fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
189 self.check_rate_limit_full(
191 Settings::get().rate_limit.message,
192 Settings::get().rate_limit.message_per_second,
196 #[allow(clippy::float_cmp)]
197 fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
198 if let Some(info) = self.sessions.get(&id) {
199 if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
201 if rate_limit.allowance == -2f64 {
202 rate_limit.allowance = rate as f64;
205 let current = SystemTime::now();
206 let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
207 rate_limit.last_checked = current;
208 rate_limit.allowance += time_passed * (rate as f64 / per as f64);
209 if rate_limit.allowance > rate as f64 {
210 rate_limit.allowance = rate as f64;
213 if rate_limit.allowance < 1.0 {
215 "Rate limited IP: {}, time_passed: {}, allowance: {}",
216 &info.ip, time_passed, rate_limit.allowance
220 op: "Rate Limit".to_string(),
221 message: format!("Too many requests. {} per {} seconds", rate, per),
226 rate_limit.allowance -= 1.0;
238 /// Make actor from `ChatServer`
239 impl Actor for ChatServer {
240 /// We are going to use simple Context, we just need ability to communicate
241 /// with other actors.
242 type Context = Context<Self>;
245 /// Handler for Connect message.
247 /// Register new session and assign unique id to this session
248 impl Handler<Connect> for ChatServer {
251 fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
252 // notify all users in same room
253 // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
255 // register session with random id
256 let id = self.rng.gen::<usize>();
257 println!("{} joined", &msg.ip);
259 self.sessions.insert(
263 ip: msg.ip.to_owned(),
267 if self.rate_limits.get(&msg.ip).is_none() {
268 self.rate_limits.insert(
271 last_checked: SystemTime::now(),
281 /// Handler for Disconnect message.
282 impl Handler<Disconnect> for ChatServer {
285 fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
286 // let mut rooms: Vec<i32> = Vec::new();
289 if self.sessions.remove(&msg.id).is_some() {
290 // remove session from all rooms
291 for sessions in self.rooms.values_mut() {
292 if sessions.remove(&msg.id) {
300 /// Handler for Message message.
301 impl Handler<StandardMessage> for ChatServer {
302 type Result = MessageResult<StandardMessage>;
304 fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
305 let msg_out = match parse_json_message(self, msg) {
307 Err(e) => e.to_string(),
310 MessageResult(msg_out)
314 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
315 let json: Value = serde_json::from_str(&msg.msg)?;
316 let data = &json["data"].to_string();
317 let op = &json["op"].as_str().ok_or(APIError {
318 op: "Unknown op type".to_string(),
319 message: "Unknown op type".to_string(),
322 let conn = chat.db.get()?;
324 let user_operation: UserOperation = UserOperation::from_str(&op)?;
326 match user_operation {
327 UserOperation::Login => {
328 let login: Login = serde_json::from_str(data)?;
329 let res = Oper::new(user_operation, login).perform(&conn)?;
330 Ok(serde_json::to_string(&res)?)
332 UserOperation::Register => {
333 let register: Register = serde_json::from_str(data)?;
334 let res = Oper::new(user_operation, register).perform(&conn);
336 chat.check_rate_limit_register(msg.id)?;
338 Ok(serde_json::to_string(&res?)?)
340 UserOperation::GetUserDetails => {
341 let get_user_details: GetUserDetails = serde_json::from_str(data)?;
342 let res = Oper::new(user_operation, get_user_details).perform(&conn)?;
343 Ok(serde_json::to_string(&res)?)
345 UserOperation::SaveUserSettings => {
346 let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
347 let res = Oper::new(user_operation, save_user_settings).perform(&conn)?;
348 Ok(serde_json::to_string(&res)?)
350 UserOperation::AddAdmin => {
351 let add_admin: AddAdmin = serde_json::from_str(data)?;
352 let res = Oper::new(user_operation, add_admin).perform(&conn)?;
353 Ok(serde_json::to_string(&res)?)
355 UserOperation::BanUser => {
356 let ban_user: BanUser = serde_json::from_str(data)?;
357 let res = Oper::new(user_operation, ban_user).perform(&conn)?;
358 Ok(serde_json::to_string(&res)?)
360 UserOperation::GetReplies => {
361 let get_replies: GetReplies = serde_json::from_str(data)?;
362 let res = Oper::new(user_operation, get_replies).perform(&conn)?;
363 Ok(serde_json::to_string(&res)?)
365 UserOperation::GetUserMentions => {
366 let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
367 let res = Oper::new(user_operation, get_user_mentions).perform(&conn)?;
368 Ok(serde_json::to_string(&res)?)
370 UserOperation::EditUserMention => {
371 let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
372 let res = Oper::new(user_operation, edit_user_mention).perform(&conn)?;
373 Ok(serde_json::to_string(&res)?)
375 UserOperation::MarkAllAsRead => {
376 let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
377 let res = Oper::new(user_operation, mark_all_as_read).perform(&conn)?;
378 Ok(serde_json::to_string(&res)?)
380 UserOperation::GetCommunity => {
381 let mut get_community: GetCommunity = serde_json::from_str(data)?;
382 if Settings::get().federation_enabled && get_community.name.is_some() {
383 let name = &get_community.name.unwrap();
384 let remote_community = if name.contains('@') {
385 // TODO: need to support sort, filter etc for remote communities
386 get_remote_community(name.to_owned())?
388 get_community.name = Some(name.replace("!", ""));
389 Oper::new(user_operation, get_community).perform(&conn)?
391 Ok(serde_json::to_string(&remote_community)?)
393 let res = Oper::new(user_operation, get_community).perform(&conn)?;
394 Ok(serde_json::to_string(&res)?)
397 UserOperation::ListCommunities => {
398 if Settings::get().federation_enabled {
399 let res = get_all_communities()?;
400 let val = ListCommunitiesResponse {
401 op: UserOperation::ListCommunities.to_string(),
404 Ok(serde_json::to_string(&val)?)
406 let list_communities: ListCommunities = serde_json::from_str(data)?;
407 let res = Oper::new(user_operation, list_communities).perform(&conn)?;
408 Ok(serde_json::to_string(&res)?)
411 UserOperation::CreateCommunity => {
412 chat.check_rate_limit_register(msg.id)?;
413 let create_community: CreateCommunity = serde_json::from_str(data)?;
414 let res = Oper::new(user_operation, create_community).perform(&conn)?;
415 Ok(serde_json::to_string(&res)?)
417 UserOperation::EditCommunity => {
418 let edit_community: EditCommunity = serde_json::from_str(data)?;
419 let res = Oper::new(user_operation, edit_community).perform(&conn)?;
420 let mut community_sent: CommunityResponse = res.clone();
421 community_sent.community.user_id = None;
422 community_sent.community.subscribed = None;
423 let community_sent_str = serde_json::to_string(&community_sent)?;
424 chat.send_community_message(community_sent.community.id, &community_sent_str, msg.id)?;
425 Ok(serde_json::to_string(&res)?)
427 UserOperation::FollowCommunity => {
428 let follow_community: FollowCommunity = serde_json::from_str(data)?;
429 let res = Oper::new(user_operation, follow_community).perform(&conn)?;
430 Ok(serde_json::to_string(&res)?)
432 UserOperation::GetFollowedCommunities => {
433 let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
434 let res = Oper::new(user_operation, followed_communities).perform(&conn)?;
435 Ok(serde_json::to_string(&res)?)
437 UserOperation::BanFromCommunity => {
438 let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
439 let community_id = ban_from_community.community_id;
440 let res = Oper::new(user_operation, ban_from_community).perform(&conn)?;
441 let res_str = serde_json::to_string(&res)?;
442 chat.send_community_message(community_id, &res_str, msg.id)?;
445 UserOperation::AddModToCommunity => {
446 let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
447 let community_id = mod_add_to_community.community_id;
448 let res = Oper::new(user_operation, mod_add_to_community).perform(&conn)?;
449 let res_str = serde_json::to_string(&res)?;
450 chat.send_community_message(community_id, &res_str, msg.id)?;
453 UserOperation::ListCategories => {
454 let list_categories: ListCategories = ListCategories;
455 let res = Oper::new(user_operation, list_categories).perform(&conn)?;
456 Ok(serde_json::to_string(&res)?)
458 UserOperation::CreatePost => {
459 chat.check_rate_limit_post(msg.id)?;
460 let create_post: CreatePost = serde_json::from_str(data)?;
461 let res = Oper::new(user_operation, create_post).perform(&conn)?;
462 Ok(serde_json::to_string(&res)?)
464 UserOperation::GetPost => {
465 let get_post: GetPost = serde_json::from_str(data)?;
466 chat.join_room(get_post.id, msg.id);
467 let res = Oper::new(user_operation, get_post).perform(&conn)?;
468 Ok(serde_json::to_string(&res)?)
470 UserOperation::GetPosts => {
471 let get_posts: GetPosts = serde_json::from_str(data)?;
472 let res = Oper::new(user_operation, get_posts).perform(&conn)?;
473 Ok(serde_json::to_string(&res)?)
475 UserOperation::CreatePostLike => {
476 chat.check_rate_limit_message(msg.id)?;
477 let create_post_like: CreatePostLike = serde_json::from_str(data)?;
478 let res = Oper::new(user_operation, create_post_like).perform(&conn)?;
479 Ok(serde_json::to_string(&res)?)
481 UserOperation::EditPost => {
482 let edit_post: EditPost = serde_json::from_str(data)?;
483 let res = Oper::new(user_operation, edit_post).perform(&conn)?;
484 let mut post_sent = res.clone();
485 post_sent.post.my_vote = None;
486 let post_sent_str = serde_json::to_string(&post_sent)?;
487 chat.send_room_message(post_sent.post.id, &post_sent_str, msg.id);
488 Ok(serde_json::to_string(&res)?)
490 UserOperation::SavePost => {
491 let save_post: SavePost = serde_json::from_str(data)?;
492 let res = Oper::new(user_operation, save_post).perform(&conn)?;
493 Ok(serde_json::to_string(&res)?)
495 UserOperation::CreateComment => {
496 chat.check_rate_limit_message(msg.id)?;
497 let create_comment: CreateComment = serde_json::from_str(data)?;
498 let post_id = create_comment.post_id;
499 let res = Oper::new(user_operation, create_comment).perform(&conn)?;
500 let mut comment_sent = res.clone();
501 comment_sent.comment.my_vote = None;
502 comment_sent.comment.user_id = None;
503 let comment_sent_str = serde_json::to_string(&comment_sent)?;
504 chat.send_room_message(post_id, &comment_sent_str, msg.id);
505 Ok(serde_json::to_string(&res)?)
507 UserOperation::EditComment => {
508 let edit_comment: EditComment = serde_json::from_str(data)?;
509 let post_id = edit_comment.post_id;
510 let res = Oper::new(user_operation, edit_comment).perform(&conn)?;
511 let mut comment_sent = res.clone();
512 comment_sent.comment.my_vote = None;
513 comment_sent.comment.user_id = None;
514 let comment_sent_str = serde_json::to_string(&comment_sent)?;
515 chat.send_room_message(post_id, &comment_sent_str, msg.id);
516 Ok(serde_json::to_string(&res)?)
518 UserOperation::SaveComment => {
519 let save_comment: SaveComment = serde_json::from_str(data)?;
520 let res = Oper::new(user_operation, save_comment).perform(&conn)?;
521 Ok(serde_json::to_string(&res)?)
523 UserOperation::CreateCommentLike => {
524 chat.check_rate_limit_message(msg.id)?;
525 let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
526 let post_id = create_comment_like.post_id;
527 let res = Oper::new(user_operation, create_comment_like).perform(&conn)?;
528 let mut comment_sent = res.clone();
529 comment_sent.comment.my_vote = None;
530 comment_sent.comment.user_id = None;
531 let comment_sent_str = serde_json::to_string(&comment_sent)?;
532 chat.send_room_message(post_id, &comment_sent_str, msg.id);
533 Ok(serde_json::to_string(&res)?)
535 UserOperation::GetModlog => {
536 let get_modlog: GetModlog = serde_json::from_str(data)?;
537 let res = Oper::new(user_operation, get_modlog).perform(&conn)?;
538 Ok(serde_json::to_string(&res)?)
540 UserOperation::CreateSite => {
541 let create_site: CreateSite = serde_json::from_str(data)?;
542 let res = Oper::new(user_operation, create_site).perform(&conn)?;
543 Ok(serde_json::to_string(&res)?)
545 UserOperation::EditSite => {
546 let edit_site: EditSite = serde_json::from_str(data)?;
547 let res = Oper::new(user_operation, edit_site).perform(&conn)?;
548 Ok(serde_json::to_string(&res)?)
550 UserOperation::GetSite => {
551 let online: usize = chat.sessions.len();
552 let get_site: GetSite = serde_json::from_str(data)?;
553 let mut res = Oper::new(user_operation, get_site).perform(&conn)?;
555 Ok(serde_json::to_string(&res)?)
557 UserOperation::Search => {
558 let search: Search = serde_json::from_str(data)?;
559 let res = Oper::new(user_operation, search).perform(&conn)?;
560 Ok(serde_json::to_string(&res)?)
562 UserOperation::TransferCommunity => {
563 let transfer_community: TransferCommunity = serde_json::from_str(data)?;
564 let res = Oper::new(user_operation, transfer_community).perform(&conn)?;
565 Ok(serde_json::to_string(&res)?)
567 UserOperation::TransferSite => {
568 let transfer_site: TransferSite = serde_json::from_str(data)?;
569 let res = Oper::new(user_operation, transfer_site).perform(&conn)?;
570 Ok(serde_json::to_string(&res)?)
572 UserOperation::DeleteAccount => {
573 let delete_account: DeleteAccount = serde_json::from_str(data)?;
574 let res = Oper::new(user_operation, delete_account).perform(&conn)?;
575 Ok(serde_json::to_string(&res)?)
577 UserOperation::PasswordReset => {
578 let password_reset: PasswordReset = serde_json::from_str(data)?;
579 let res = Oper::new(user_operation, password_reset).perform(&conn)?;
580 Ok(serde_json::to_string(&res)?)
582 UserOperation::PasswordChange => {
583 let password_change: PasswordChange = serde_json::from_str(data)?;
584 let res = Oper::new(user_operation, password_change).perform(&conn)?;
585 Ok(serde_json::to_string(&res)?)