3 serialize_websocket_message,
10 use anyhow::Context as acontext;
12 r2d2::{ConnectionManager, Pool},
15 use lemmy_api_common::{comment::*, post::*};
16 use lemmy_db_schema::{
17 newtypes::{CommunityId, LocalUserId, PostId},
18 source::secret::Secret,
23 rate_limit::RateLimit,
24 settings::structs::Settings,
28 use rand::rngs::ThreadRng;
29 use reqwest_middleware::ClientWithMiddleware;
31 use serde_json::Value;
33 collections::{HashMap, HashSet},
37 use tokio::macros::support::Pin;
/// Function-pointer type of the `message_handler` stored on `ChatServer`.
///
/// The handler takes a `LemmyContext` (its remaining parameters are elided from
/// this excerpt) and returns a pinned, boxed future that resolves to the
/// response string or a `LemmyError`.
39 type MessageHandlerType = fn(
40 context: LemmyContext,
44 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
/// Function-pointer type of the `message_handler_crud` stored on `ChatServer`.
///
/// Same shape as `MessageHandlerType`, but the operation argument is a
/// `UserOperationCrud`. Some parameters are elided from this excerpt. The
/// result is a pinned, boxed future resolving to the response string or a
/// `LemmyError`.
46 type MessageHandlerCrudType = fn(
47 context: LemmyContext,
49 op: UserOperationCrud,
51 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
53 /// `ChatServer` manages chat rooms and is responsible for coordinating chat
55 pub struct ChatServer {
56 /// A map from generated random ID to session addr
57 pub sessions: HashMap<ConnectionId, SessionInfo>,
59 /// A map from post_id to set of connectionIDs
60 pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
62 /// A map from community to set of connectionIDs
63 pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
/// A map from community to the set of connectionIDs in that community's mod room
65 pub mod_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
67 /// A map from user id to its connection IDs for joined users. Remember a user can have multiple
68 /// sessions (i.e. clients)
69 pub(super) user_rooms: HashMap<LocalUserId, HashSet<ConnectionId>>,
/// Thread-local random number generator
// NOTE(review): presumably the source of the random connection IDs — confirm.
71 pub(super) rng: ThreadRng,
/// r2d2 pool of `PgConnection`s; cloned into every `LemmyContext` built for a message
74 pub(super) pool: Pool<ConnectionManager<PgConnection>>,
/// Settings value; cloned into every `LemmyContext` built for a message
77 pub(super) settings: Settings,
/// Secret value; cloned into every `LemmyContext` built for a message
80 pub(super) secret: Secret,
82 /// Rate limiting based on rate type and IP addr
83 pub(super) rate_limiter: RateLimit,
85 /// A list of the current captchas
86 pub(super) captchas: Vec<CaptchaItem>,
/// Handler invoked for operations that parse as `UserOperation`
88 message_handler: MessageHandlerType,
/// Handler invoked for operations that parse as `UserOperationCrud`
89 message_handler_crud: MessageHandlerCrudType,
/// reqwest client with middleware; cloned into every `LemmyContext` built for a message
92 client: ClientWithMiddleware,
/// Per-connection data stored as the value type of `ChatServer::sessions`.
///
/// Other fields are elided from this excerpt (an `ip` field is read elsewhere
/// in the file).
95 pub struct SessionInfo {
/// Recipient used to deliver `WsMessage`s to this connection
96 pub addr: Recipient<WsMessage>,
100 /// `ChatServer` is an actor. It maintains the list of connected client sessions
101 /// and manages the available rooms. Peers send messages to other peers in the same
102 /// room through `ChatServer`.
104 #![allow(clippy::too_many_arguments)]
106 pool: Pool<ConnectionManager<PgConnection>>,
107 rate_limiter: RateLimit,
108 message_handler: MessageHandlerType,
109 message_handler_crud: MessageHandlerCrudType,
110 client: ClientWithMiddleware,
115 sessions: HashMap::new(),
116 post_rooms: HashMap::new(),
117 community_rooms: HashMap::new(),
118 mod_rooms: HashMap::new(),
119 user_rooms: HashMap::new(),
120 rng: rand::thread_rng(),
123 captchas: Vec::new(),
125 message_handler_crud,
/// Moves a connection into the room for `community_id`.
///
/// The connection is first removed from every community room and every post
/// room, and the target room is created if it does not exist yet. The
/// receiver, the connection-id parameter and the tail of the body are elided
/// from this excerpt; presumably the connection is then inserted into the
/// looked-up room.
///
/// # Errors
/// Propagates a `LemmyError` if the room lookup after creation yields nothing.
// NOTE(review): the `is_none()` check, `insert` and the later `get_mut` are three
// lookups on the same key; `entry(community_id).or_default()` would need one
// and would make the `context(...)?` unnecessary.
132 pub fn join_community_room(
134 community_id: CommunityId,
136 ) -> Result<(), LemmyError> {
137 // Remove the session from every community room
138 for sessions in self.community_rooms.values_mut() {
139 sessions.remove(&id);
142 // Also leave all post rooms
143 // This avoids double messages
144 for sessions in self.post_rooms.values_mut() {
145 sessions.remove(&id);
148 // If the room doesn't exist yet
149 if self.community_rooms.get_mut(&community_id).is_none() {
150 self.community_rooms.insert(community_id, HashSet::new());
155 .get_mut(&community_id)
156 .context(location_info!())?
/// Moves a connection into the mod room for `community_id`.
///
/// Only `mod_rooms` is touched: the connection is removed from every mod room
/// and the target room is created if absent. Community and post rooms are not
/// modified by the visible code. The receiver, the connection-id parameter and
/// the tail of the body are elided from this excerpt.
///
/// # Errors
/// Propagates a `LemmyError` if the room lookup after creation yields nothing.
// NOTE(review): same triple-lookup pattern as the other `join_*` functions;
// `entry(community_id).or_default()` would be the single-lookup form.
161 pub fn join_mod_room(
163 community_id: CommunityId,
165 ) -> Result<(), LemmyError> {
166 // Remove the session from every mod room (other room kinds are left alone)
167 for sessions in self.mod_rooms.values_mut() {
168 sessions.remove(&id);
171 // If the room doesn't exist yet
172 if self.mod_rooms.get_mut(&community_id).is_none() {
173 self.mod_rooms.insert(community_id, HashSet::new());
178 .get_mut(&community_id)
179 .context(location_info!())?
/// Moves connection `id` into the room for `post_id`.
///
/// The connection is removed from every post room and every community room,
/// and the target post room is created if absent. The tail of the body is
/// elided from this excerpt; presumably `id` is then inserted into the room.
///
/// # Errors
/// Propagates a `LemmyError` if the room lookup after creation yields nothing.
// NOTE(review): same triple-lookup pattern as the other `join_*` functions;
// `entry(post_id).or_default()` would be the single-lookup form.
184 pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
185 // Remove the session from every post room
186 for sessions in self.post_rooms.values_mut() {
187 sessions.remove(&id);
190 // Also leave all communities
191 // This avoids double messages
192 // TODO found a bug, whereby community messages like
193 // delete and remove aren't sent, because
194 // you left the community room
195 for sessions in self.community_rooms.values_mut() {
196 sessions.remove(&id);
199 // If the room doesn't exist yet
200 if self.post_rooms.get_mut(&post_id).is_none() {
201 self.post_rooms.insert(post_id, HashSet::new());
207 .context(location_info!())?
/// Moves a connection into the room for `user_id`.
///
/// Only `user_rooms` is touched: the connection is removed from every user
/// room and the target room is created if absent. The receiver, the
/// connection-id parameter and the tail of the body are elided from this
/// excerpt.
///
/// # Errors
/// Propagates a `LemmyError` if the room lookup after creation yields nothing.
// NOTE(review): same triple-lookup pattern as the other `join_*` functions;
// `entry(user_id).or_default()` would be the single-lookup form.
213 pub fn join_user_room(
215 user_id: LocalUserId,
217 ) -> Result<(), LemmyError> {
218 // Remove the session from every user room (other room kinds are left alone)
219 for sessions in self.user_rooms.values_mut() {
220 sessions.remove(&id);
223 // If the room doesn't exist yet
224 if self.user_rooms.get_mut(&user_id).is_none() {
225 self.user_rooms.insert(user_id, HashSet::new());
231 .context(location_info!())?
/// Sends a serialized `response` for `op` to the sessions in the room of `post_id`.
///
/// The message is serialized once and delivered through `sendit`. Nothing is
/// sent if no room exists for the post. `websocket_id`, when `Some`, is
/// inspected per session — presumably to skip the originating connection; the
/// comparison itself is elided from this excerpt.
///
/// # Errors
/// Propagates the error from `serialize_websocket_message`.
237 fn send_post_room_message<OP, Response>(
242 websocket_id: Option<ConnectionId>,
243 ) -> Result<(), LemmyError>
245 OP: OperationType + ToString,
// Serialize once; the same string is reused for every recipient
248 let res_str = &serialize_websocket_message(op, response)?;
249 if let Some(sessions) = self.post_rooms.get(&post_id) {
251 if let Some(my_id) = websocket_id {
256 self.sendit(res_str, *id);
/// Sends a serialized `response` for `op` to the sessions in the room of `community_id`.
///
/// The message is serialized once and delivered through `sendit`. Nothing is
/// sent if no room exists for the community. `websocket_id`, when `Some`, is
/// inspected per session — presumably to skip the originating connection; the
/// comparison itself is elided from this excerpt.
///
/// # Errors
/// Propagates the error from `serialize_websocket_message`.
262 pub fn send_community_room_message<OP, Response>(
266 community_id: CommunityId,
267 websocket_id: Option<ConnectionId>,
268 ) -> Result<(), LemmyError>
270 OP: OperationType + ToString,
// Serialize once; the same string is reused for every recipient
273 let res_str = &serialize_websocket_message(op, response)?;
274 if let Some(sessions) = self.community_rooms.get(&community_id) {
276 if let Some(my_id) = websocket_id {
281 self.sendit(res_str, *id);
/// Sends a serialized `response` for `op` to the sessions in the mod room of `community_id`.
///
/// The message is serialized once and delivered through `sendit`. Nothing is
/// sent if no mod room exists for the community. `websocket_id`, when `Some`,
/// is inspected per session — presumably to skip the originating connection;
/// the comparison itself is elided from this excerpt.
///
/// # Errors
/// Propagates the error from `serialize_websocket_message`.
287 pub fn send_mod_room_message<OP, Response>(
291 community_id: CommunityId,
292 websocket_id: Option<ConnectionId>,
293 ) -> Result<(), LemmyError>
295 OP: OperationType + ToString,
// Serialize once; the same string is reused for every recipient
298 let res_str = &serialize_websocket_message(op, response)?;
299 if let Some(sessions) = self.mod_rooms.get(&community_id) {
301 if let Some(my_id) = websocket_id {
306 self.sendit(res_str, *id);
/// Sends a serialized `response` for `op` to the sessions registered in `sessions`.
///
/// Iterates over all keys of `self.sessions`, independent of room membership,
/// and delivers through `sendit`. `websocket_id`, when `Some`, is inspected per
/// session — presumably to skip the originating connection; the comparison
/// itself is elided from this excerpt.
///
/// # Errors
/// Propagates the error from `serialize_websocket_message`.
312 pub fn send_all_message<OP, Response>(
316 websocket_id: Option<ConnectionId>,
317 ) -> Result<(), LemmyError>
319 OP: OperationType + ToString,
// Serialize once; the same string is reused for every recipient
322 let res_str = &serialize_websocket_message(op, response)?;
323 for id in self.sessions.keys() {
324 if let Some(my_id) = websocket_id {
329 self.sendit(res_str, *id);
/// Sends a serialized `response` for `op` to the sessions in the room of `recipient_id`.
///
/// The message is serialized once and delivered through `sendit`. Nothing is
/// sent if the user has no room. `websocket_id`, when `Some`, is inspected per
/// session — presumably to skip the originating connection; the comparison
/// itself is elided from this excerpt.
///
/// # Errors
/// Propagates the error from `serialize_websocket_message`.
334 pub fn send_user_room_message<OP, Response>(
338 recipient_id: LocalUserId,
339 websocket_id: Option<ConnectionId>,
340 ) -> Result<(), LemmyError>
342 OP: OperationType + ToString,
// Serialize once; the same string is reused for every recipient
345 let res_str = &serialize_websocket_message(op, response)?;
346 if let Some(sessions) = self.user_rooms.get(&recipient_id) {
348 if let Some(my_id) = websocket_id {
353 self.sendit(res_str, *id);
/// Fans a `CommentResponse` out to the post room, community rooms and recipients.
///
/// Two copies of `comment` are made. `comment_reply_sent` has `my_vote`
/// cleared and goes to each id in `recipient_ids` via
/// `send_user_room_message`. `comment_post_sent` additionally has
/// `recipient_ids` emptied and is the copy associated with the post-room send.
/// `send_community_room_message` is called twice; the arguments of the first
/// call are elided from this excerpt and the second targets the comment's own
/// community.
///
/// # Errors
/// Returns `Result<(), LemmyError>`; how errors from the individual sends are
/// handled is elided from this excerpt.
359 pub fn send_comment<OP>(
362 comment: &CommentResponse,
363 websocket_id: Option<ConnectionId>,
364 ) -> Result<(), LemmyError>
366 OP: OperationType + ToString,
368 let mut comment_reply_sent = comment.clone();
370 // Strip out the sender-specific vote so it is not broadcast to other users
371 comment_reply_sent.comment_view.my_vote = None;
373 // Send it to the post room
374 let mut comment_post_sent = comment_reply_sent.clone();
375 // Remove the recipients here to separate mentions / user messages from post or community comments
376 comment_post_sent.recipient_ids = Vec::new();
377 self.send_post_room_message(
380 comment_post_sent.comment_view.post.id,
384 // Send it to the community too
// NOTE(review): the first call's target is elided; presumably `CommunityId(0)` as in `send_post` — confirm.
385 self.send_community_room_message(
391 self.send_community_room_message(
394 comment.comment_view.community.id,
398 // Send it to the recipient(s) including the mentioned users
399 for recipient_id in &comment_reply_sent.recipient_ids {
400 self.send_user_room_message(
/// Fans a `PostResponse` out to the community rooms and the post's own room.
///
/// A copy of `post_res` with `my_vote` cleared is sent to the room keyed by
/// `CommunityId(0)` (the "/c/all" room per the inline comment) and to the room
/// of the post's community. A further message goes to the post room; its
/// payload argument is elided from this excerpt.
///
/// # Errors
/// Propagates the first `LemmyError` returned by the community-room sends;
/// handling of the post-room send's result is elided from this excerpt.
411 pub fn send_post<OP>(
414 post_res: &PostResponse,
415 websocket_id: Option<ConnectionId>,
416 ) -> Result<(), LemmyError>
418 OP: OperationType + ToString,
420 let community_id = post_res.post_view.community.id;
422 // Don't send the sender-specific vote with it
423 let mut post_sent = post_res.clone();
424 post_sent.post_view.my_vote = None;
426 // Send it to /c/all and that community
427 self.send_community_room_message(user_operation, &post_sent, CommunityId(0), websocket_id)?;
428 self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
430 // Send it to the post room
431 self.send_post_room_message(
434 post_res.post_view.post.id,
/// Delivers `message` to the session registered under `id`, if there is one.
///
/// The text is copied into a new `WsMessage` and handed to the session's
/// `addr` with `do_send`. No handling for an unknown `id` is visible in this
/// excerpt.
441 fn sendit(&self, message: &str, id: ConnectionId) {
442 if let Some(info) = self.sessions.get(&id) {
// One owned copy of the message per recipient
443 info.addr.do_send(WsMessage(message.to_owned()));
447 pub(super) fn parse_json_message(
449 msg: StandardMessage,
450 ctx: &mut Context<Self>,
451 ) -> impl Future<Output = Result<String, LemmyError>> {
452 let rate_limiter = self.rate_limiter.clone();
454 let ip: IpAddr = match self.sessions.get(&msg.id) {
455 Some(info) => info.ip.to_owned(),
456 None => IpAddr("blank_ip".to_string()),
459 let context = LemmyContext {
460 pool: self.pool.clone(),
461 chat_server: ctx.address(),
462 client: self.client.to_owned(),
463 settings: self.settings.to_owned(),
464 secret: self.secret.to_owned(),
466 let message_handler_crud = self.message_handler_crud;
467 let message_handler = self.message_handler;
469 let json: Value = serde_json::from_str(&msg.msg)?;
470 let data = &json["data"].to_string();
473 .ok_or_else(|| LemmyError::from_message("missing op"))?;
475 // check if api call passes the rate limit, and generate future for later execution
476 let (passed, fut) = if let Ok(user_operation_crud) = UserOperationCrud::from_str(op) {
477 let passed = match user_operation_crud {
478 UserOperationCrud::Register => rate_limiter.register().check(ip),
479 UserOperationCrud::CreatePost => rate_limiter.post().check(ip),
480 UserOperationCrud::CreateCommunity => rate_limiter.register().check(ip),
481 UserOperationCrud::CreateComment => rate_limiter.comment().check(ip),
482 _ => rate_limiter.message().check(ip),
484 let fut = (message_handler_crud)(context, msg.id, user_operation_crud, data);
487 let user_operation = UserOperation::from_str(op)?;
488 let passed = match user_operation {
489 UserOperation::GetCaptcha => rate_limiter.post().check(ip),
490 UserOperation::Search => rate_limiter.search().check(ip),
491 _ => rate_limiter.message().check(ip),
493 let fut = (message_handler)(context, msg.id, user_operation, data);
497 // if rate limit passed, execute api call future
501 // if rate limit was hit, respond with empty message