3 handlers::{do_user_operation, to_json_string, Args},
10 use anyhow::Context as acontext;
11 use background_jobs::QueueHandle;
13 r2d2::{ConnectionManager, Pool},
16 use lemmy_api_structs::{comment::*, community::*, post::*, site::*, user::*};
17 use lemmy_rate_limit::RateLimit;
28 use rand::rngs::ThreadRng;
31 use serde_json::Value;
33 collections::{HashMap, HashSet},
37 /// `ChatServer` manages chat rooms and responsible for coordinating chat
39 pub struct ChatServer {
40 /// A map from generated random ID to session addr
41 pub sessions: HashMap<ConnectionId, SessionInfo>,
43 /// A map from post_id to set of connectionIDs
44 pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
46 /// A map from community to set of connectionIDs
47 pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
49 /// A map from user id to its connection ID for joined users. Remember a user can have multiple
50 /// sessions (IE clients)
51 pub(super) user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
53 pub(super) rng: ThreadRng,
56 pub(super) pool: Pool<ConnectionManager<PgConnection>>,
58 /// Rate limiting based on rate type and IP addr
59 pub(super) rate_limiter: RateLimit,
61 /// A list of the current captchas
62 pub(super) captchas: Vec<CaptchaItem>,
67 activity_queue: QueueHandle,
70 pub struct SessionInfo {
71 pub addr: Recipient<WSMessage>,
75 /// `ChatServer` is an actor. It maintains list of connection client session.
76 /// And manages available rooms. Peers send messages to other peers in same
77 /// room through `ChatServer`.
80 pool: Pool<ConnectionManager<PgConnection>>,
81 rate_limiter: RateLimit,
83 activity_queue: QueueHandle,
86 sessions: HashMap::new(),
87 post_rooms: HashMap::new(),
88 community_rooms: HashMap::new(),
89 user_rooms: HashMap::new(),
90 rng: rand::thread_rng(),
99 pub fn join_community_room(
101 community_id: CommunityId,
103 ) -> Result<(), LemmyError> {
104 // remove session from all rooms
105 for sessions in self.community_rooms.values_mut() {
106 sessions.remove(&id);
109 // Also leave all post rooms
110 // This avoids double messages
111 for sessions in self.post_rooms.values_mut() {
112 sessions.remove(&id);
115 // If the room doesn't exist yet
116 if self.community_rooms.get_mut(&community_id).is_none() {
117 self.community_rooms.insert(community_id, HashSet::new());
122 .get_mut(&community_id)
123 .context(location_info!())?
128 pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
129 // remove session from all rooms
130 for sessions in self.post_rooms.values_mut() {
131 sessions.remove(&id);
134 // Also leave all communities
135 // This avoids double messages
136 // TODO found a bug, whereby community messages like
137 // delete and remove aren't sent, because
138 // you left the community room
139 for sessions in self.community_rooms.values_mut() {
140 sessions.remove(&id);
143 // If the room doesn't exist yet
144 if self.post_rooms.get_mut(&post_id).is_none() {
145 self.post_rooms.insert(post_id, HashSet::new());
151 .context(location_info!())?
157 pub fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) -> Result<(), LemmyError> {
158 // remove session from all rooms
159 for sessions in self.user_rooms.values_mut() {
160 sessions.remove(&id);
163 // If the room doesn't exist yet
164 if self.user_rooms.get_mut(&user_id).is_none() {
165 self.user_rooms.insert(user_id, HashSet::new());
171 .context(location_info!())?
177 fn send_post_room_message<Response>(
182 websocket_id: Option<ConnectionId>,
183 ) -> Result<(), LemmyError>
187 let res_str = &to_json_string(op, response)?;
188 if let Some(sessions) = self.post_rooms.get(&post_id) {
190 if let Some(my_id) = websocket_id {
195 self.sendit(res_str, *id);
201 pub fn send_community_room_message<Response>(
205 community_id: CommunityId,
206 websocket_id: Option<ConnectionId>,
207 ) -> Result<(), LemmyError>
211 let res_str = &to_json_string(op, response)?;
212 if let Some(sessions) = self.community_rooms.get(&community_id) {
214 if let Some(my_id) = websocket_id {
219 self.sendit(res_str, *id);
225 pub fn send_all_message<Response>(
229 websocket_id: Option<ConnectionId>,
230 ) -> Result<(), LemmyError>
234 let res_str = &to_json_string(op, response)?;
235 for id in self.sessions.keys() {
236 if let Some(my_id) = websocket_id {
241 self.sendit(res_str, *id);
246 pub fn send_user_room_message<Response>(
250 recipient_id: UserId,
251 websocket_id: Option<ConnectionId>,
252 ) -> Result<(), LemmyError>
256 let res_str = &to_json_string(op, response)?;
257 if let Some(sessions) = self.user_rooms.get(&recipient_id) {
259 if let Some(my_id) = websocket_id {
264 self.sendit(res_str, *id);
272 user_operation: &UserOperation,
273 comment: &CommentResponse,
274 websocket_id: Option<ConnectionId>,
275 ) -> Result<(), LemmyError> {
276 let mut comment_reply_sent = comment.clone();
277 comment_reply_sent.comment.my_vote = None;
278 comment_reply_sent.comment.user_id = None;
280 let mut comment_post_sent = comment_reply_sent.clone();
281 comment_post_sent.recipient_ids = Vec::new();
283 // Send it to the post room
284 self.send_post_room_message(
287 comment_post_sent.comment.post_id,
291 // Send it to the recipient(s) including the mentioned users
292 for recipient_id in &comment_reply_sent.recipient_ids {
293 self.send_user_room_message(
301 // Send it to the community too
302 self.send_community_room_message(user_operation, &comment_post_sent, 0, websocket_id)?;
303 self.send_community_room_message(
306 comment.comment.community_id,
315 user_operation: &UserOperation,
317 websocket_id: Option<ConnectionId>,
318 ) -> Result<(), LemmyError> {
319 let community_id = post.post.community_id;
321 // Don't send my data with it
322 let mut post_sent = post.clone();
323 post_sent.post.my_vote = None;
324 post_sent.post.user_id = None;
326 // Send it to /c/all and that community
327 self.send_community_room_message(user_operation, &post_sent, 0, websocket_id)?;
328 self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
330 // Send it to the post room
331 self.send_post_room_message(user_operation, &post_sent, post.post.id, websocket_id)?;
336 fn sendit(&self, message: &str, id: ConnectionId) {
337 if let Some(info) = self.sessions.get(&id) {
338 let _ = info.addr.do_send(WSMessage(message.to_owned()));
342 pub(super) fn parse_json_message(
344 msg: StandardMessage,
345 ctx: &mut Context<Self>,
346 ) -> impl Future<Output = Result<String, LemmyError>> {
347 let addr = ctx.address();
348 let pool = self.pool.clone();
349 let rate_limiter = self.rate_limiter.clone();
351 let ip: IPAddr = match self.sessions.get(&msg.id) {
352 Some(info) => info.ip.to_owned(),
353 None => "blank_ip".to_string(),
356 let client = self.client.clone();
357 let activity_queue = self.activity_queue.clone();
360 let json: Value = serde_json::from_str(&msg.msg)?;
361 let data = &json["data"].to_string();
362 let op = &json["op"].as_str().ok_or(APIError {
363 message: "Unknown op type".to_string(),
366 let user_operation: UserOperation = UserOperation::from_str(&op)?;
368 let context = LemmyContext {
379 op: user_operation.clone(),
383 match user_operation {
385 UserOperation::Login => do_user_operation::<Login>(args).await,
386 UserOperation::Register => do_user_operation::<Register>(args).await,
387 UserOperation::GetCaptcha => do_user_operation::<GetCaptcha>(args).await,
388 UserOperation::GetUserDetails => do_user_operation::<GetUserDetails>(args).await,
389 UserOperation::GetReplies => do_user_operation::<GetReplies>(args).await,
390 UserOperation::AddAdmin => do_user_operation::<AddAdmin>(args).await,
391 UserOperation::BanUser => do_user_operation::<BanUser>(args).await,
392 UserOperation::GetUserMentions => do_user_operation::<GetUserMentions>(args).await,
393 UserOperation::MarkUserMentionAsRead => {
394 do_user_operation::<MarkUserMentionAsRead>(args).await
396 UserOperation::MarkAllAsRead => do_user_operation::<MarkAllAsRead>(args).await,
397 UserOperation::DeleteAccount => do_user_operation::<DeleteAccount>(args).await,
398 UserOperation::PasswordReset => do_user_operation::<PasswordReset>(args).await,
399 UserOperation::PasswordChange => do_user_operation::<PasswordChange>(args).await,
400 UserOperation::UserJoin => do_user_operation::<UserJoin>(args).await,
401 UserOperation::SaveUserSettings => do_user_operation::<SaveUserSettings>(args).await,
403 // Private Message ops
404 UserOperation::CreatePrivateMessage => {
405 do_user_operation::<CreatePrivateMessage>(args).await
407 UserOperation::EditPrivateMessage => do_user_operation::<EditPrivateMessage>(args).await,
408 UserOperation::DeletePrivateMessage => {
409 do_user_operation::<DeletePrivateMessage>(args).await
411 UserOperation::MarkPrivateMessageAsRead => {
412 do_user_operation::<MarkPrivateMessageAsRead>(args).await
414 UserOperation::GetPrivateMessages => do_user_operation::<GetPrivateMessages>(args).await,
417 UserOperation::GetModlog => do_user_operation::<GetModlog>(args).await,
418 UserOperation::CreateSite => do_user_operation::<CreateSite>(args).await,
419 UserOperation::EditSite => do_user_operation::<EditSite>(args).await,
420 UserOperation::GetSite => do_user_operation::<GetSite>(args).await,
421 UserOperation::GetSiteConfig => do_user_operation::<GetSiteConfig>(args).await,
422 UserOperation::SaveSiteConfig => do_user_operation::<SaveSiteConfig>(args).await,
423 UserOperation::Search => do_user_operation::<Search>(args).await,
424 UserOperation::TransferCommunity => do_user_operation::<TransferCommunity>(args).await,
425 UserOperation::TransferSite => do_user_operation::<TransferSite>(args).await,
426 UserOperation::ListCategories => do_user_operation::<ListCategories>(args).await,
429 UserOperation::GetCommunity => do_user_operation::<GetCommunity>(args).await,
430 UserOperation::ListCommunities => do_user_operation::<ListCommunities>(args).await,
431 UserOperation::CreateCommunity => do_user_operation::<CreateCommunity>(args).await,
432 UserOperation::EditCommunity => do_user_operation::<EditCommunity>(args).await,
433 UserOperation::DeleteCommunity => do_user_operation::<DeleteCommunity>(args).await,
434 UserOperation::RemoveCommunity => do_user_operation::<RemoveCommunity>(args).await,
435 UserOperation::FollowCommunity => do_user_operation::<FollowCommunity>(args).await,
436 UserOperation::GetFollowedCommunities => {
437 do_user_operation::<GetFollowedCommunities>(args).await
439 UserOperation::BanFromCommunity => do_user_operation::<BanFromCommunity>(args).await,
440 UserOperation::AddModToCommunity => do_user_operation::<AddModToCommunity>(args).await,
443 UserOperation::CreatePost => do_user_operation::<CreatePost>(args).await,
444 UserOperation::GetPost => do_user_operation::<GetPost>(args).await,
445 UserOperation::GetPosts => do_user_operation::<GetPosts>(args).await,
446 UserOperation::EditPost => do_user_operation::<EditPost>(args).await,
447 UserOperation::DeletePost => do_user_operation::<DeletePost>(args).await,
448 UserOperation::RemovePost => do_user_operation::<RemovePost>(args).await,
449 UserOperation::LockPost => do_user_operation::<LockPost>(args).await,
450 UserOperation::StickyPost => do_user_operation::<StickyPost>(args).await,
451 UserOperation::CreatePostLike => do_user_operation::<CreatePostLike>(args).await,
452 UserOperation::SavePost => do_user_operation::<SavePost>(args).await,
455 UserOperation::CreateComment => do_user_operation::<CreateComment>(args).await,
456 UserOperation::EditComment => do_user_operation::<EditComment>(args).await,
457 UserOperation::DeleteComment => do_user_operation::<DeleteComment>(args).await,
458 UserOperation::RemoveComment => do_user_operation::<RemoveComment>(args).await,
459 UserOperation::MarkCommentAsRead => do_user_operation::<MarkCommentAsRead>(args).await,
460 UserOperation::SaveComment => do_user_operation::<SaveComment>(args).await,
461 UserOperation::GetComments => do_user_operation::<GetComments>(args).await,
462 UserOperation::CreateCommentLike => do_user_operation::<CreateCommentLike>(args).await,