2 websocket::handlers::{do_user_operation, to_json_string, Args},
6 use anyhow::Context as acontext;
7 use background_jobs::QueueHandle;
9 r2d2::{ConnectionManager, Pool},
12 use lemmy_rate_limit::RateLimit;
13 use lemmy_structs::{comment::*, community::*, post::*, site::*, user::*, websocket::*};
24 use rand::rngs::ThreadRng;
27 use serde_json::Value;
29 collections::{HashMap, HashSet},
33 /// `ChatServer` manages chat rooms and is responsible for coordinating chat
35 pub struct ChatServer {
36 /// A map from generated random connection ID to that session's info
37 pub sessions: HashMap<ConnectionId, SessionInfo>,
39 /// A map from post_id to the set of connection IDs in that post's room
40 pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
42 /// A map from community id to the set of connection IDs in that community's room
43 pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
45 /// A map from user id to the set of connection IDs of that user's joined sessions. Remember a
46 /// user can have multiple sessions (i.e. clients)
47 pub(super) user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
/// Random number generator handle; presumably the source of the random connection IDs — confirm
49 pub(super) rng: ThreadRng,
/// `r2d2` connection pool handing out `PgConnection`s
52 pub(super) pool: Pool<ConnectionManager<PgConnection>>,
54 /// Rate limiting based on rate type and IP addr
55 pub(super) rate_limiter: RateLimit,
57 /// A list of the current captchas
58 pub(super) captchas: Vec<CaptchaItem>,
/// Handle to the `background_jobs` queue
63 activity_queue: QueueHandle,
/// Per-connection data stored as the value type of `ChatServer::sessions`.
66 pub struct SessionInfo {
/// Recipient that `WSMessage`s addressed to this session are sent to
67 pub addr: Recipient<WSMessage>,
71 /// `ChatServer` is an actor. It maintains the list of connected client sessions
72 /// and manages the available rooms. Peers send messages to other peers in the same
73 /// room through `ChatServer`.
// NOTE(review): the enclosing signature is not visible in this view; the typed entries below
// look like constructor parameters — confirm.
76 pool: Pool<ConnectionManager<PgConnection>>,
77 rate_limiter: RateLimit,
79 activity_queue: QueueHandle,
// The session map and every room map start out empty.
82 sessions: HashMap::new(),
83 post_rooms: HashMap::new(),
84 community_rooms: HashMap::new(),
85 user_rooms: HashMap::new(),
86 rng: rand::thread_rng(),
95 pub fn join_community_room(
97 community_id: CommunityId,
99 ) -> Result<(), LemmyError> {
100 // remove session from all rooms
101 for sessions in self.community_rooms.values_mut() {
102 sessions.remove(&id);
105 // Also leave all post rooms
106 // This avoids double messages
107 for sessions in self.post_rooms.values_mut() {
108 sessions.remove(&id);
111 // If the room doesn't exist yet
112 if self.community_rooms.get_mut(&community_id).is_none() {
113 self.community_rooms.insert(community_id, HashSet::new());
118 .get_mut(&community_id)
119 .context(location_info!())?
124 pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
125 // remove session from all rooms
126 for sessions in self.post_rooms.values_mut() {
127 sessions.remove(&id);
130 // Also leave all communities
131 // This avoids double messages
132 // TODO found a bug, whereby community messages like
133 // delete and remove aren't sent, because
134 // you left the community room
135 for sessions in self.community_rooms.values_mut() {
136 sessions.remove(&id);
139 // If the room doesn't exist yet
140 if self.post_rooms.get_mut(&post_id).is_none() {
141 self.post_rooms.insert(post_id, HashSet::new());
147 .context(location_info!())?
153 pub fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) -> Result<(), LemmyError> {
154 // remove session from all rooms
155 for sessions in self.user_rooms.values_mut() {
156 sessions.remove(&id);
159 // If the room doesn't exist yet
160 if self.user_rooms.get_mut(&user_id).is_none() {
161 self.user_rooms.insert(user_id, HashSet::new());
167 .context(location_info!())?
/// Serializes `response` for `op` with `to_json_string` and hands the resulting string to
/// `sendit` for connection IDs found in the room registered for `post_id`.
///
/// Nothing is sent when `post_rooms` has no entry for `post_id`.
/// NOTE(review): `websocket_id` is presumably the sender's own connection, which gets skipped —
/// the comparison itself is not visible here; confirm.
///
/// # Errors
/// Propagates the error from `to_json_string`.
173 fn send_post_room_message<Response>(
178 websocket_id: Option<ConnectionId>,
179 ) -> Result<(), LemmyError>
// Serialize once up front; the same string is reused for every send.
183 let res_str = &to_json_string(op, response)?;
184 if let Some(sessions) = self.post_rooms.get(&post_id) {
186 if let Some(my_id) = websocket_id {
191 self.sendit(res_str, *id);
/// Serializes `response` for `op` with `to_json_string` and hands the resulting string to
/// `sendit` for connection IDs found in the room registered for `community_id`.
///
/// Nothing is sent when `community_rooms` has no entry for `community_id`.
/// NOTE(review): `websocket_id` is presumably the sender's own connection, which gets skipped —
/// the comparison itself is not visible here; confirm.
///
/// # Errors
/// Propagates the error from `to_json_string`.
197 pub fn send_community_room_message<Response>(
201 community_id: CommunityId,
202 websocket_id: Option<ConnectionId>,
203 ) -> Result<(), LemmyError>
// Serialize once up front; the same string is reused for every send.
207 let res_str = &to_json_string(op, response)?;
208 if let Some(sessions) = self.community_rooms.get(&community_id) {
210 if let Some(my_id) = websocket_id {
215 self.sendit(res_str, *id);
/// Serializes `response` for `op` with `to_json_string` and hands the resulting string to
/// `sendit` while iterating over the keys of `sessions`, i.e. every registered connection
/// rather than the members of a single room.
///
/// NOTE(review): `websocket_id` is presumably the sender's own connection, which gets skipped —
/// the comparison itself is not visible here; confirm.
///
/// # Errors
/// Propagates the error from `to_json_string`.
221 pub fn send_all_message<Response>(
225 websocket_id: Option<ConnectionId>,
226 ) -> Result<(), LemmyError>
// Serialize once up front; the same string is reused for every send.
230 let res_str = &to_json_string(op, response)?;
231 for id in self.sessions.keys() {
232 if let Some(my_id) = websocket_id {
237 self.sendit(res_str, *id);
/// Serializes `response` for `op` with `to_json_string` and hands the resulting string to
/// `sendit` for connection IDs found in the room registered for user `recipient_id`.
///
/// Nothing is sent when `user_rooms` has no entry for `recipient_id`.
/// NOTE(review): `websocket_id` is presumably the sender's own connection, which gets skipped —
/// the comparison itself is not visible here; confirm.
///
/// # Errors
/// Propagates the error from `to_json_string`.
242 pub fn send_user_room_message<Response>(
246 recipient_id: UserId,
247 websocket_id: Option<ConnectionId>,
248 ) -> Result<(), LemmyError>
// Serialize once up front; the same string is reused for every send.
252 let res_str = &to_json_string(op, response)?;
253 if let Some(sessions) = self.user_rooms.get(&recipient_id) {
255 if let Some(my_id) = websocket_id {
260 self.sendit(res_str, *id);
268 user_operation: &UserOperation,
269 comment: &CommentResponse,
270 websocket_id: Option<ConnectionId>,
271 ) -> Result<(), LemmyError> {
// Fans a comment response out to the post room, the user rooms of its recipients and
// the community rooms. Any error from the individual send calls is propagated with `?`.

// Work on a copy with the per-user fields (`my_vote`, `user_id`) cleared.
272 let mut comment_reply_sent = comment.clone();
273 comment_reply_sent.comment.my_vote = None;
274 comment_reply_sent.comment.user_id = None;
// Second copy with an empty recipient list; this is the one passed to the room broadcasts.
276 let mut comment_post_sent = comment_reply_sent.clone();
277 comment_post_sent.recipient_ids = Vec::new();
279 // Send it to the post room
280 self.send_post_room_message(
283 comment_post_sent.comment.post_id,
287 // Send it to the recipient(s) including the mentioned users
288 for recipient_id in &comment_reply_sent.recipient_ids {
289 self.send_user_room_message(
297 // Send it to the community too
// NOTE(review): community id 0 is presumably the room covering all communities — confirm.
298 self.send_community_room_message(user_operation, &comment_post_sent, 0, websocket_id)?;
299 self.send_community_room_message(
302 comment.comment.community_id,
311 user_operation: &UserOperation,
313 websocket_id: Option<ConnectionId>,
314 ) -> Result<(), LemmyError> {
// Fans a post response out to community room 0, the post's own community room and the
// post's room. Any error from the individual send calls is propagated with `?`.
315 let community_id = post.post.community_id;
317 // Don't send my data with it
// The copy has `my_vote` and `user_id` cleared; the original `post` is left unchanged.
318 let mut post_sent = post.clone();
319 post_sent.post.my_vote = None;
320 post_sent.post.user_id = None;
322 // Send it to /c/all and that community
// Community id 0 is the room used for /c/all here.
323 self.send_community_room_message(user_operation, &post_sent, 0, websocket_id)?;
324 self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
326 // Send it to the post room
327 self.send_post_room_message(user_operation, &post_sent, post.post.id, websocket_id)?;
332 fn sendit(&self, message: &str, id: ConnectionId) {
333 if let Some(info) = self.sessions.get(&id) {
334 let _ = info.addr.do_send(WSMessage(message.to_owned()));
338 pub(super) fn parse_json_message(
340 msg: StandardMessage,
341 ctx: &mut Context<Self>,
342 ) -> impl Future<Output = Result<String, LemmyError>> {
343 let addr = ctx.address();
344 let pool = self.pool.clone();
345 let rate_limiter = self.rate_limiter.clone();
347 let ip: IPAddr = match self.sessions.get(&msg.id) {
348 Some(info) => info.ip.to_owned(),
349 None => "blank_ip".to_string(),
352 let client = self.client.clone();
353 let activity_queue = self.activity_queue.clone();
356 let json: Value = serde_json::from_str(&msg.msg)?;
357 let data = &json["data"].to_string();
358 let op = &json["op"].as_str().ok_or(APIError {
359 message: "Unknown op type".to_string(),
362 let user_operation: UserOperation = UserOperation::from_str(&op)?;
364 let context = LemmyContext {
375 op: user_operation.clone(),
379 match user_operation {
381 UserOperation::Login => do_user_operation::<Login>(args).await,
382 UserOperation::Register => do_user_operation::<Register>(args).await,
383 UserOperation::GetCaptcha => do_user_operation::<GetCaptcha>(args).await,
384 UserOperation::GetUserDetails => do_user_operation::<GetUserDetails>(args).await,
385 UserOperation::GetReplies => do_user_operation::<GetReplies>(args).await,
386 UserOperation::AddAdmin => do_user_operation::<AddAdmin>(args).await,
387 UserOperation::BanUser => do_user_operation::<BanUser>(args).await,
388 UserOperation::GetUserMentions => do_user_operation::<GetUserMentions>(args).await,
389 UserOperation::MarkUserMentionAsRead => {
390 do_user_operation::<MarkUserMentionAsRead>(args).await
392 UserOperation::MarkAllAsRead => do_user_operation::<MarkAllAsRead>(args).await,
393 UserOperation::DeleteAccount => do_user_operation::<DeleteAccount>(args).await,
394 UserOperation::PasswordReset => do_user_operation::<PasswordReset>(args).await,
395 UserOperation::PasswordChange => do_user_operation::<PasswordChange>(args).await,
396 UserOperation::UserJoin => do_user_operation::<UserJoin>(args).await,
397 UserOperation::PostJoin => do_user_operation::<PostJoin>(args).await,
398 UserOperation::CommunityJoin => do_user_operation::<CommunityJoin>(args).await,
399 UserOperation::SaveUserSettings => do_user_operation::<SaveUserSettings>(args).await,
401 // Private Message ops
402 UserOperation::CreatePrivateMessage => {
403 do_user_operation::<CreatePrivateMessage>(args).await
405 UserOperation::EditPrivateMessage => do_user_operation::<EditPrivateMessage>(args).await,
406 UserOperation::DeletePrivateMessage => {
407 do_user_operation::<DeletePrivateMessage>(args).await
409 UserOperation::MarkPrivateMessageAsRead => {
410 do_user_operation::<MarkPrivateMessageAsRead>(args).await
412 UserOperation::GetPrivateMessages => do_user_operation::<GetPrivateMessages>(args).await,
415 UserOperation::GetModlog => do_user_operation::<GetModlog>(args).await,
416 UserOperation::CreateSite => do_user_operation::<CreateSite>(args).await,
417 UserOperation::EditSite => do_user_operation::<EditSite>(args).await,
418 UserOperation::GetSite => do_user_operation::<GetSite>(args).await,
419 UserOperation::GetSiteConfig => do_user_operation::<GetSiteConfig>(args).await,
420 UserOperation::SaveSiteConfig => do_user_operation::<SaveSiteConfig>(args).await,
421 UserOperation::Search => do_user_operation::<Search>(args).await,
422 UserOperation::TransferCommunity => do_user_operation::<TransferCommunity>(args).await,
423 UserOperation::TransferSite => do_user_operation::<TransferSite>(args).await,
424 UserOperation::ListCategories => do_user_operation::<ListCategories>(args).await,
427 UserOperation::GetCommunity => do_user_operation::<GetCommunity>(args).await,
428 UserOperation::ListCommunities => do_user_operation::<ListCommunities>(args).await,
429 UserOperation::CreateCommunity => do_user_operation::<CreateCommunity>(args).await,
430 UserOperation::EditCommunity => do_user_operation::<EditCommunity>(args).await,
431 UserOperation::DeleteCommunity => do_user_operation::<DeleteCommunity>(args).await,
432 UserOperation::RemoveCommunity => do_user_operation::<RemoveCommunity>(args).await,
433 UserOperation::FollowCommunity => do_user_operation::<FollowCommunity>(args).await,
434 UserOperation::GetFollowedCommunities => {
435 do_user_operation::<GetFollowedCommunities>(args).await
437 UserOperation::BanFromCommunity => do_user_operation::<BanFromCommunity>(args).await,
438 UserOperation::AddModToCommunity => do_user_operation::<AddModToCommunity>(args).await,
441 UserOperation::CreatePost => do_user_operation::<CreatePost>(args).await,
442 UserOperation::GetPost => do_user_operation::<GetPost>(args).await,
443 UserOperation::GetPosts => do_user_operation::<GetPosts>(args).await,
444 UserOperation::EditPost => do_user_operation::<EditPost>(args).await,
445 UserOperation::DeletePost => do_user_operation::<DeletePost>(args).await,
446 UserOperation::RemovePost => do_user_operation::<RemovePost>(args).await,
447 UserOperation::LockPost => do_user_operation::<LockPost>(args).await,
448 UserOperation::StickyPost => do_user_operation::<StickyPost>(args).await,
449 UserOperation::CreatePostLike => do_user_operation::<CreatePostLike>(args).await,
450 UserOperation::SavePost => do_user_operation::<SavePost>(args).await,
453 UserOperation::CreateComment => do_user_operation::<CreateComment>(args).await,
454 UserOperation::EditComment => do_user_operation::<EditComment>(args).await,
455 UserOperation::DeleteComment => do_user_operation::<DeleteComment>(args).await,
456 UserOperation::RemoveComment => do_user_operation::<RemoveComment>(args).await,
457 UserOperation::MarkCommentAsRead => do_user_operation::<MarkCommentAsRead>(args).await,
458 UserOperation::SaveComment => do_user_operation::<SaveComment>(args).await,
459 UserOperation::GetComments => do_user_operation::<GetComments>(args).await,
460 UserOperation::CreateCommentLike => do_user_operation::<CreateCommentLike>(args).await,