3 serialize_websocket_message,
10 use anyhow::Context as acontext;
11 use background_jobs::QueueHandle;
13 r2d2::{ConnectionManager, Pool},
16 use lemmy_api_common::{comment::*, post::*};
17 use lemmy_db_schema::{
18 newtypes::{CommunityId, LocalUserId, PostId},
19 source::secret::Secret,
23 rate_limit::RateLimit,
24 settings::structs::Settings,
29 use rand::rngs::ThreadRng;
30 use reqwest_middleware::ClientWithMiddleware;
32 use serde_json::Value;
34 collections::{HashMap, HashSet},
38 use tokio::macros::support::Pin;
/// Function-pointer type of the handler stored in `ChatServer::message_handler`.
/// It receives a `LemmyContext` and returns a pinned, boxed future that resolves to
/// `Result<String, LemmyError>`.
40 type MessageHandlerType = fn(
41 context: LemmyContext,
45 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
/// Function-pointer type of the handler stored in `ChatServer::message_handler_crud`.
/// Same future-returning shape as `MessageHandlerType`, but the operation argument is a
/// `UserOperationCrud`.
47 type MessageHandlerCrudType = fn(
48 context: LemmyContext,
50 op: UserOperationCrud,
52 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
54 /// `ChatServer` manages chat rooms and is responsible for coordinating chat sessions.
56 pub struct ChatServer {
57 /// A map from generated random ID to session info (which holds the session addr)
58 pub sessions: HashMap<ConnectionId, SessionInfo>,
60 /// A map from post_id to set of connectionIDs
61 pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
63 /// A map from community to set of connectionIDs
64 pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
/// A second map from community to set of connectionIDs, kept separately from
/// `community_rooms` and used by the mod-room join / send functions
66 pub mod_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
68 /// A map from user id to its connection IDs for joined users. Remember a user can have
69 /// multiple sessions (i.e. clients)
70 pub(super) user_rooms: HashMap<LocalUserId, HashSet<ConnectionId>>,
/// Thread-local random number generator
/// NOTE(review): presumably used to generate connection IDs — confirm
72 pub(super) rng: ThreadRng,
/// The r2d2 pool of Postgres connections; cloned into each `LemmyContext`
75 pub(super) pool: Pool<ConnectionManager<PgConnection>>,
/// The settings, copied into each `LemmyContext`
78 pub(super) settings: Settings,
/// The secret, copied into each `LemmyContext`
81 pub(super) secret: Secret,
83 /// Rate limiting based on rate type and IP addr
84 pub(super) rate_limiter: RateLimit,
86 /// A list of the current captchas
87 pub(super) captchas: Vec<CaptchaItem>,
/// Handlers invoked for incoming websocket messages (non-CRUD and CRUD operations)
89 message_handler: MessageHandlerType,
90 message_handler_crud: MessageHandlerCrudType,
/// A `reqwest_middleware` client, copied into each `LemmyContext`
93 client: ClientWithMiddleware,
/// A `background_jobs` queue handle, copied into each `LemmyContext`
95 activity_queue: QueueHandle,
/// Per-connection data stored as the value type of `ChatServer::sessions`.
98 pub struct SessionInfo {
/// Recipient to which outgoing `WsMessage`s for this session are sent
99 pub addr: Recipient<WsMessage>,
103 /// `ChatServer` is an actor. It maintains the list of connected client sessions
104 /// and manages the available rooms. Peers send messages to other peers in the same
105 /// room through `ChatServer`.
// The constructor takes many collaborators as separate arguments, hence the lint allowance.
107 #![allow(clippy::too_many_arguments)]
109 pool: Pool<ConnectionManager<PgConnection>>,
110 rate_limiter: RateLimit,
111 message_handler: MessageHandlerType,
112 message_handler_crud: MessageHandlerCrudType,
113 client: ClientWithMiddleware,
114 activity_queue: QueueHandle,
// All room maps, the session map and the captcha list start out empty.
119 sessions: HashMap::new(),
120 post_rooms: HashMap::new(),
121 community_rooms: HashMap::new(),
122 mod_rooms: HashMap::new(),
123 user_rooms: HashMap::new(),
124 rng: rand::thread_rng(),
127 captchas: Vec::new(),
129 message_handler_crud,
/// Prepares the room for `community_id` for the connection `id`: the connection is first
/// removed from every community room and every post room, and an empty room is created
/// when none exists yet.
///
/// Returns an error (with location context) if the room cannot be fetched afterwards.
/// NOTE(review): `id` is presumably inserted into the fetched room — confirm.
137 pub fn join_community_room(
139 community_id: CommunityId,
141 ) -> Result<(), LemmyError> {
142 // Remove the session from all community rooms
143 for sessions in self.community_rooms.values_mut() {
144 sessions.remove(&id);
147 // Also leave all post rooms
148 // This avoids double messages
149 for sessions in self.post_rooms.values_mut() {
150 sessions.remove(&id);
153 // If the room doesn't exist yet, create it empty
154 if self.community_rooms.get_mut(&community_id).is_none() {
155 self.community_rooms.insert(community_id, HashSet::new());
// Fetch the room again; a missing room becomes an error rather than a panic
160 .get_mut(&community_id)
161 .context(location_info!())?
/// Prepares the mod room for `community_id` for the connection `id`: the connection is
/// first removed from every mod room, and an empty room is created when none exists yet.
/// Unlike `join_community_room`, no post or community rooms are touched here.
///
/// Returns an error (with location context) if the room cannot be fetched afterwards.
/// NOTE(review): `id` is presumably inserted into the fetched room — confirm.
166 pub fn join_mod_room(
168 community_id: CommunityId,
170 ) -> Result<(), LemmyError> {
171 // Remove the session from all mod rooms
172 for sessions in self.mod_rooms.values_mut() {
173 sessions.remove(&id);
176 // If the room doesn't exist yet, create it empty
177 if self.mod_rooms.get_mut(&community_id).is_none() {
178 self.mod_rooms.insert(community_id, HashSet::new());
// Fetch the room again; a missing room becomes an error rather than a panic
183 .get_mut(&community_id)
184 .context(location_info!())?
/// Prepares the room for `post_id` for the connection `id`: the connection is first
/// removed from every post room and every community room, and an empty room is created
/// when none exists yet.
///
/// Returns an error (with location context) if the room lookup fails afterwards.
/// NOTE(review): `id` is presumably inserted into the fetched room — confirm.
189 pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
190 // Remove the session from all post rooms
191 for sessions in self.post_rooms.values_mut() {
192 sessions.remove(&id);
195 // Also leave all communities
196 // This avoids double messages
197 // TODO: found a bug whereby community messages like
198 // delete and remove aren't sent, because
199 // you left the community room
200 for sessions in self.community_rooms.values_mut() {
201 sessions.remove(&id);
204 // If the room doesn't exist yet, create it empty
205 if self.post_rooms.get_mut(&post_id).is_none() {
206 self.post_rooms.insert(post_id, HashSet::new());
// A failed lookup becomes an error rather than a panic
212 .context(location_info!())?
/// Prepares the room for `user_id` for the connection `id`: the connection is first
/// removed from every user room, and an empty room is created when none exists yet.
///
/// Returns an error (with location context) if the room lookup fails afterwards.
/// NOTE(review): `id` is presumably inserted into the fetched room — confirm.
218 pub fn join_user_room(
220 user_id: LocalUserId,
222 ) -> Result<(), LemmyError> {
223 // Remove the session from all user rooms
224 for sessions in self.user_rooms.values_mut() {
225 sessions.remove(&id);
228 // If the room doesn't exist yet, create it empty
229 if self.user_rooms.get_mut(&user_id).is_none() {
230 self.user_rooms.insert(user_id, HashSet::new());
// A failed lookup becomes an error rather than a panic
236 .context(location_info!())?
/// Serializes `response` for `op` once and hands the resulting string to `sendit` for
/// connections found in the post room of `post_id`.
///
/// Returns an error if serialization fails.
/// NOTE(review): `websocket_id` presumably identifies the originating connection so it
/// can be skipped — confirm.
242 fn send_post_room_message<OP, Response>(
247 websocket_id: Option<ConnectionId>,
248 ) -> Result<(), LemmyError>
250 OP: OperationType + ToString,
// Serialize once up front; the same string is reused for every recipient
253 let res_str = &serialize_websocket_message(op, response)?;
254 if let Some(sessions) = self.post_rooms.get(&post_id) {
256 if let Some(my_id) = websocket_id {
261 self.sendit(res_str, *id);
/// Serializes `response` for `op` once and hands the resulting string to `sendit` for
/// connections found in the community room of `community_id`.
///
/// Returns an error if serialization fails.
/// NOTE(review): `websocket_id` presumably identifies the originating connection so it
/// can be skipped — confirm.
267 pub fn send_community_room_message<OP, Response>(
271 community_id: CommunityId,
272 websocket_id: Option<ConnectionId>,
273 ) -> Result<(), LemmyError>
275 OP: OperationType + ToString,
// Serialize once up front; the same string is reused for every recipient
278 let res_str = &serialize_websocket_message(op, response)?;
279 if let Some(sessions) = self.community_rooms.get(&community_id) {
281 if let Some(my_id) = websocket_id {
286 self.sendit(res_str, *id);
/// Serializes `response` for `op` once and hands the resulting string to `sendit` for
/// connections found in the mod room of `community_id`.
///
/// Returns an error if serialization fails.
/// NOTE(review): `websocket_id` presumably identifies the originating connection so it
/// can be skipped — confirm.
292 pub fn send_mod_room_message<OP, Response>(
296 community_id: CommunityId,
297 websocket_id: Option<ConnectionId>,
298 ) -> Result<(), LemmyError>
300 OP: OperationType + ToString,
// Serialize once up front; the same string is reused for every recipient
303 let res_str = &serialize_websocket_message(op, response)?;
304 if let Some(sessions) = self.mod_rooms.get(&community_id) {
306 if let Some(my_id) = websocket_id {
311 self.sendit(res_str, *id);
/// Serializes `response` for `op` once and hands the resulting string to `sendit` while
/// iterating over every connection ID in `self.sessions`, regardless of room membership.
///
/// Returns an error if serialization fails.
/// NOTE(review): `websocket_id` presumably identifies the originating connection so it
/// can be skipped — confirm.
317 pub fn send_all_message<OP, Response>(
321 websocket_id: Option<ConnectionId>,
322 ) -> Result<(), LemmyError>
324 OP: OperationType + ToString,
// Serialize once up front; the same string is reused for every recipient
327 let res_str = &serialize_websocket_message(op, response)?;
328 for id in self.sessions.keys() {
329 if let Some(my_id) = websocket_id {
334 self.sendit(res_str, *id);
/// Serializes `response` for `op` once and hands the resulting string to `sendit` for
/// connections found in the user room of `recipient_id`.
///
/// Returns an error if serialization fails.
/// NOTE(review): `websocket_id` presumably identifies the originating connection so it
/// can be skipped — confirm.
339 pub fn send_user_room_message<OP, Response>(
343 recipient_id: LocalUserId,
344 websocket_id: Option<ConnectionId>,
345 ) -> Result<(), LemmyError>
347 OP: OperationType + ToString,
// Serialize once up front; the same string is reused for every recipient
350 let res_str = &serialize_websocket_message(op, response)?;
351 if let Some(sessions) = self.user_rooms.get(&recipient_id) {
353 if let Some(my_id) = websocket_id {
358 self.sendit(res_str, *id);
/// Fans a comment response out to the rooms that should see it: the post room, the
/// community rooms, and the user room of each recipient listed in the response.
///
/// Works on clones of `comment`: the caller's vote is cleared on the outgoing copies,
/// and the copy sent to the post room additionally has its recipient list emptied.
364 pub fn send_comment<OP>(
367 comment: &CommentResponse,
368 websocket_id: Option<ConnectionId>,
369 ) -> Result<(), LemmyError>
371 OP: OperationType + ToString,
373 let mut comment_reply_sent = comment.clone();
375 // Strip out my specific user info
376 comment_reply_sent.comment_view.my_vote = None;
378 // Send it to the post room
379 let mut comment_post_sent = comment_reply_sent.clone();
380 // Remove the recipients here to separate mentions / user messages from post or community comments
381 comment_post_sent.recipient_ids = Vec::new();
382 self.send_post_room_message(
385 comment_post_sent.comment_view.post.id,
389 // Send it to the community too
// Two community-room sends follow; the second one targets the comment's own community
390 self.send_community_room_message(
396 self.send_community_room_message(
399 comment.comment_view.community.id,
403 // Send it to the recipient(s) including the mentioned users
// Iterates the reply copy, which still carries the recipient list
404 for recipient_id in &comment_reply_sent.recipient_ids {
405 self.send_user_room_message(
/// Fans a post response out to the community room with id 0, the room of the post's own
/// community, and the post's room.
///
/// The community-room sends use a clone of `post_res` with `my_vote` cleared, so the
/// caller's vote is not sent with them. Any send error is propagated.
416 pub fn send_post<OP>(
419 post_res: &PostResponse,
420 websocket_id: Option<ConnectionId>,
421 ) -> Result<(), LemmyError>
423 OP: OperationType + ToString,
425 let community_id = post_res.post_view.community.id;
427 // Don't send my data with it
428 let mut post_sent = post_res.clone();
429 post_sent.post_view.my_vote = None;
431 // Send it to /c/all (community id 0) and that community
432 self.send_community_room_message(user_operation, &post_sent, CommunityId(0), websocket_id)?;
433 self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
435 // Send it to the post room
436 self.send_post_room_message(
439 post_res.post_view.post.id,
/// Sends `message` to the session registered under `id`, if there is one.
/// The message is copied into an owned `WsMessage` and passed to the session's
/// recipient via `do_send`.
446 fn sendit(&self, message: &str, id: ConnectionId) {
447 if let Some(info) = self.sessions.get(&id) {
// The result of `do_send` is deliberately discarded
448 let _ = info.addr.do_send(WsMessage(message.to_owned()));
/// Builds the future that handles one incoming websocket message.
///
/// The message text is parsed as JSON and its `data` value is re-serialized to a string
/// for the handler. The operation name is tried as a `UserOperationCrud` first and as a
/// `UserOperation` otherwise, and the chosen handler future is wrapped in the rate
/// limiter that matches the operation.
///
/// Errors: invalid JSON, a missing operation ("missing op"), an operation name that
/// parses as neither enum, or whatever the wrapped handler future returns.
452 pub(super) fn parse_json_message(
454 msg: StandardMessage,
455 ctx: &mut Context<Self>,
456 ) -> impl Future<Output = Result<String, LemmyError>> {
457 let rate_limiter = self.rate_limiter.clone();
// Resolve the client IP from the session; unknown sessions get a placeholder value
459 let ip: IpAddr = match self.sessions.get(&msg.id) {
460 Some(info) => info.ip.to_owned(),
461 None => IpAddr("blank_ip".to_string()),
// Assemble a context from copies of this server's collaborators
464 let context = LemmyContext {
465 pool: self.pool.clone(),
466 chat_server: ctx.address(),
467 client: self.client.to_owned(),
468 activity_queue: self.activity_queue.to_owned(),
469 settings: self.settings.to_owned(),
470 secret: self.secret.to_owned(),
// Copy the handler function pointers into locals
472 let message_handler_crud = self.message_handler_crud;
473 let message_handler = self.message_handler;
475 let json: Value = serde_json::from_str(&msg.msg)?;
// The handlers take the payload as a JSON string, so `data` is serialized again
476 let data = &json["data"].to_string();
479 .ok_or_else(|| LemmyError::from_message("missing op"))?;
// CRUD operations are tried first
481 if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
482 let fut = (message_handler_crud)(context, msg.id, user_operation_crud.clone(), data);
483 match user_operation_crud {
484 UserOperationCrud::Register => rate_limiter.register().wrap(ip, fut).await,
485 UserOperationCrud::CreatePost => rate_limiter.post().wrap(ip, fut).await,
// Note: community creation goes through the `register()` limiter
486 UserOperationCrud::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
487 UserOperationCrud::CreateComment => rate_limiter.comment().wrap(ip, fut).await,
488 _ => rate_limiter.message().wrap(ip, fut).await,
// Not a CRUD operation: it must parse as a `UserOperation`, otherwise the error propagates
491 let user_operation = UserOperation::from_str(op)?;
492 let fut = (message_handler)(context, msg.id, user_operation.clone(), data);
493 match user_operation {
// Note: captcha requests go through the `post()` limiter
494 UserOperation::GetCaptcha => rate_limiter.post().wrap(ip, fut).await,
495 _ => rate_limiter.message().wrap(ip, fut).await,