3 serialize_websocket_message,
10 use anyhow::Context as acontext;
11 use background_jobs::QueueHandle;
13 r2d2::{ConnectionManager, Pool},
16 use lemmy_api_common::{comment::*, post::*};
17 use lemmy_db_schema::{
18 newtypes::{CommunityId, LocalUserId, PostId},
19 source::secret::Secret,
23 rate_limit::RateLimit,
24 settings::structs::Settings,
30 use rand::rngs::ThreadRng;
33 use serde_json::Value;
35 collections::{HashMap, HashSet},
39 use tokio::macros::support::Pin;
41 type MessageHandlerType = fn(
42 context: LemmyContext,
46 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
48 type MessageHandlerCrudType = fn(
49 context: LemmyContext,
51 op: UserOperationCrud,
53 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
55 /// `ChatServer` manages chat rooms and is responsible for coordinating chat
57 pub struct ChatServer {
58 /// A map from generated random ID to session addr
59 pub sessions: HashMap<ConnectionId, SessionInfo>,
61 /// A map from post_id to set of connectionIDs
62 pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
64 /// A map from community to set of connectionIDs
65 pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
67 pub mod_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
69 /// A map from user id to its connection IDs for joined users. Remember that a user can have
70 /// multiple sessions (i.e. clients)
71 pub(super) user_rooms: HashMap<LocalUserId, HashSet<ConnectionId>>,
73 pub(super) rng: ThreadRng,
76 pub(super) pool: Pool<ConnectionManager<PgConnection>>,
79 pub(super) settings: Settings,
82 pub(super) secret: Secret,
84 /// Rate limiting based on rate type and IP addr
85 pub(super) rate_limiter: RateLimit,
87 /// A list of the current captchas
88 pub(super) captchas: Vec<CaptchaItem>,
90 message_handler: MessageHandlerType,
91 message_handler_crud: MessageHandlerCrudType,
96 activity_queue: QueueHandle,
99 pub struct SessionInfo {
100 pub addr: Recipient<WsMessage>,
104 /// `ChatServer` is an actor. It maintains the list of connected client sessions
105 /// and manages the available rooms. Peers send messages to other peers in the same
106 /// room through `ChatServer`.
108 #![allow(clippy::too_many_arguments)]
110 pool: Pool<ConnectionManager<PgConnection>>,
111 rate_limiter: RateLimit,
112 message_handler: MessageHandlerType,
113 message_handler_crud: MessageHandlerCrudType,
115 activity_queue: QueueHandle,
120 sessions: HashMap::new(),
121 post_rooms: HashMap::new(),
122 community_rooms: HashMap::new(),
123 mod_rooms: HashMap::new(),
124 user_rooms: HashMap::new(),
125 rng: rand::thread_rng(),
128 captchas: Vec::new(),
130 message_handler_crud,
138 pub fn join_community_room(
140 community_id: CommunityId,
142 ) -> Result<(), LemmyError> {
143 // remove session from all rooms
144 for sessions in self.community_rooms.values_mut() {
145 sessions.remove(&id);
148 // Also leave all post rooms
149 // This avoids double messages
150 for sessions in self.post_rooms.values_mut() {
151 sessions.remove(&id);
154 // If the room doesn't exist yet
155 if self.community_rooms.get_mut(&community_id).is_none() {
156 self.community_rooms.insert(community_id, HashSet::new());
161 .get_mut(&community_id)
162 .context(location_info!())?
167 pub fn join_mod_room(
169 community_id: CommunityId,
171 ) -> Result<(), LemmyError> {
172 // remove session from all mod rooms
173 for sessions in self.mod_rooms.values_mut() {
174 sessions.remove(&id);
177 // If the room doesn't exist yet
178 if self.mod_rooms.get_mut(&community_id).is_none() {
179 self.mod_rooms.insert(community_id, HashSet::new());
184 .get_mut(&community_id)
185 .context(location_info!())?
190 pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
191 // remove session from all rooms
192 for sessions in self.post_rooms.values_mut() {
193 sessions.remove(&id);
196 // Also leave all communities
197 // This avoids double messages
198 // TODO: found a bug whereby community messages like
199 // delete and remove aren't sent, because
200 // you have left the community room
201 for sessions in self.community_rooms.values_mut() {
202 sessions.remove(&id);
205 // If the room doesn't exist yet
206 if self.post_rooms.get_mut(&post_id).is_none() {
207 self.post_rooms.insert(post_id, HashSet::new());
213 .context(location_info!())?
219 pub fn join_user_room(
221 user_id: LocalUserId,
223 ) -> Result<(), LemmyError> {
224 // remove session from all user rooms
225 for sessions in self.user_rooms.values_mut() {
226 sessions.remove(&id);
229 // If the room doesn't exist yet
230 if self.user_rooms.get_mut(&user_id).is_none() {
231 self.user_rooms.insert(user_id, HashSet::new());
237 .context(location_info!())?
243 fn send_post_room_message<OP, Response>(
248 websocket_id: Option<ConnectionId>,
249 ) -> Result<(), LemmyError>
251 OP: OperationType + ToString,
254 let res_str = &serialize_websocket_message(op, response)?;
255 if let Some(sessions) = self.post_rooms.get(&post_id) {
257 if let Some(my_id) = websocket_id {
262 self.sendit(res_str, *id);
268 pub fn send_community_room_message<OP, Response>(
272 community_id: CommunityId,
273 websocket_id: Option<ConnectionId>,
274 ) -> Result<(), LemmyError>
276 OP: OperationType + ToString,
279 let res_str = &serialize_websocket_message(op, response)?;
280 if let Some(sessions) = self.community_rooms.get(&community_id) {
282 if let Some(my_id) = websocket_id {
287 self.sendit(res_str, *id);
293 pub fn send_mod_room_message<OP, Response>(
297 community_id: CommunityId,
298 websocket_id: Option<ConnectionId>,
299 ) -> Result<(), LemmyError>
301 OP: OperationType + ToString,
304 let res_str = &serialize_websocket_message(op, response)?;
305 if let Some(sessions) = self.mod_rooms.get(&community_id) {
307 if let Some(my_id) = websocket_id {
312 self.sendit(res_str, *id);
318 pub fn send_all_message<OP, Response>(
322 websocket_id: Option<ConnectionId>,
323 ) -> Result<(), LemmyError>
325 OP: OperationType + ToString,
328 let res_str = &serialize_websocket_message(op, response)?;
329 for id in self.sessions.keys() {
330 if let Some(my_id) = websocket_id {
335 self.sendit(res_str, *id);
340 pub fn send_user_room_message<OP, Response>(
344 recipient_id: LocalUserId,
345 websocket_id: Option<ConnectionId>,
346 ) -> Result<(), LemmyError>
348 OP: OperationType + ToString,
351 let res_str = &serialize_websocket_message(op, response)?;
352 if let Some(sessions) = self.user_rooms.get(&recipient_id) {
354 if let Some(my_id) = websocket_id {
359 self.sendit(res_str, *id);
365 pub fn send_comment<OP>(
368 comment: &CommentResponse,
369 websocket_id: Option<ConnectionId>,
370 ) -> Result<(), LemmyError>
372 OP: OperationType + ToString,
374 let mut comment_reply_sent = comment.clone();
376 // Strip out my specific user info
377 comment_reply_sent.comment_view.my_vote = None;
379 // Send it to the post room
380 let mut comment_post_sent = comment_reply_sent.clone();
381 // Remove the recipients here to separate mentions / user messages from post or community comments
382 comment_post_sent.recipient_ids = Vec::new();
383 self.send_post_room_message(
386 comment_post_sent.comment_view.post.id,
390 // Send it to the community too
391 self.send_community_room_message(
397 self.send_community_room_message(
400 comment.comment_view.community.id,
404 // Send it to the recipient(s) including the mentioned users
405 for recipient_id in &comment_reply_sent.recipient_ids {
406 self.send_user_room_message(
417 pub fn send_post<OP>(
420 post_res: &PostResponse,
421 websocket_id: Option<ConnectionId>,
422 ) -> Result<(), LemmyError>
424 OP: OperationType + ToString,
426 let community_id = post_res.post_view.community.id;
428 // Don't send my data with it
429 let mut post_sent = post_res.clone();
430 post_sent.post_view.my_vote = None;
432 // Send it to /c/all and that community
433 self.send_community_room_message(user_operation, &post_sent, CommunityId(0), websocket_id)?;
434 self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
436 // Send it to the post room
437 self.send_post_room_message(
440 post_res.post_view.post.id,
447 fn sendit(&self, message: &str, id: ConnectionId) {
448 if let Some(info) = self.sessions.get(&id) {
449 let _ = info.addr.do_send(WsMessage(message.to_owned()));
453 pub(super) fn parse_json_message(
455 msg: StandardMessage,
456 ctx: &mut Context<Self>,
457 ) -> impl Future<Output = Result<String, LemmyError>> {
458 let rate_limiter = self.rate_limiter.clone();
460 let ip: IpAddr = match self.sessions.get(&msg.id) {
461 Some(info) => info.ip.to_owned(),
462 None => IpAddr("blank_ip".to_string()),
465 let context = LemmyContext {
466 pool: self.pool.clone(),
467 chat_server: ctx.address(),
468 client: self.client.to_owned(),
469 activity_queue: self.activity_queue.to_owned(),
470 settings: self.settings.to_owned(),
471 secret: self.secret.to_owned(),
473 let message_handler_crud = self.message_handler_crud;
474 let message_handler = self.message_handler;
476 let json: Value = serde_json::from_str(&msg.msg)?;
477 let data = &json["data"].to_string();
480 .ok_or_else(|| ApiError::err_plain("missing op"))?;
482 if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
483 let fut = (message_handler_crud)(context, msg.id, user_operation_crud.clone(), data);
484 match user_operation_crud {
485 UserOperationCrud::Register => rate_limiter.register().wrap(ip, fut).await,
486 UserOperationCrud::CreatePost => rate_limiter.post().wrap(ip, fut).await,
487 UserOperationCrud::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
488 UserOperationCrud::CreateComment => rate_limiter.comment().wrap(ip, fut).await,
489 _ => rate_limiter.message().wrap(ip, fut).await,
492 let user_operation = UserOperation::from_str(op)?;
493 let fut = (message_handler)(context, msg.id, user_operation.clone(), data);
494 rate_limiter.message().wrap(ip, fut).await