1 use crate::{messages::*, serialize_websocket_message, LemmyContext, UserOperation};
3 use anyhow::Context as acontext;
4 use background_jobs::QueueHandle;
6 r2d2::{ConnectionManager, Pool},
9 use lemmy_rate_limit::RateLimit;
10 use lemmy_structs::{comment::*, post::*};
21 use rand::rngs::ThreadRng;
24 use serde_json::Value;
26 collections::{HashMap, HashSet},
29 use tokio::macros::support::Pin;
31 type MessageHandlerType = fn(
32 context: LemmyContext,
36 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
38 /// `ChatServer` manages chat rooms and responsible for coordinating chat
40 pub struct ChatServer {
41 /// A map from generated random ID to session addr
42 pub sessions: HashMap<ConnectionId, SessionInfo>,
44 /// A map from post_id to set of connectionIDs
45 pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
47 /// A map from community to set of connectionIDs
48 pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
50 /// A map from user id to its connection ID for joined users. Remember a user can have multiple
51 /// sessions (IE clients)
52 pub(super) user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
54 pub(super) rng: ThreadRng,
57 pub(super) pool: Pool<ConnectionManager<PgConnection>>,
59 /// Rate limiting based on rate type and IP addr
60 pub(super) rate_limiter: RateLimit,
62 /// A list of the current captchas
63 pub(super) captchas: Vec<CaptchaItem>,
65 message_handler: MessageHandlerType,
70 activity_queue: QueueHandle,
73 pub struct SessionInfo {
74 pub addr: Recipient<WSMessage>,
78 /// `ChatServer` is an actor. It maintains list of connection client session.
79 /// And manages available rooms. Peers send messages to other peers in same
80 /// room through `ChatServer`.
83 pool: Pool<ConnectionManager<PgConnection>>,
84 rate_limiter: RateLimit,
85 message_handler: MessageHandlerType,
87 activity_queue: QueueHandle,
90 sessions: HashMap::new(),
91 post_rooms: HashMap::new(),
92 community_rooms: HashMap::new(),
93 user_rooms: HashMap::new(),
94 rng: rand::thread_rng(),
104 pub fn join_community_room(
106 community_id: CommunityId,
108 ) -> Result<(), LemmyError> {
109 // remove session from all rooms
110 for sessions in self.community_rooms.values_mut() {
111 sessions.remove(&id);
114 // Also leave all post rooms
115 // This avoids double messages
116 for sessions in self.post_rooms.values_mut() {
117 sessions.remove(&id);
120 // If the room doesn't exist yet
121 if self.community_rooms.get_mut(&community_id).is_none() {
122 self.community_rooms.insert(community_id, HashSet::new());
127 .get_mut(&community_id)
128 .context(location_info!())?
133 pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
134 // remove session from all rooms
135 for sessions in self.post_rooms.values_mut() {
136 sessions.remove(&id);
139 // Also leave all communities
140 // This avoids double messages
141 // TODO found a bug, whereby community messages like
142 // delete and remove aren't sent, because
143 // you left the community room
144 for sessions in self.community_rooms.values_mut() {
145 sessions.remove(&id);
148 // If the room doesn't exist yet
149 if self.post_rooms.get_mut(&post_id).is_none() {
150 self.post_rooms.insert(post_id, HashSet::new());
156 .context(location_info!())?
162 pub fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) -> Result<(), LemmyError> {
163 // remove session from all rooms
164 for sessions in self.user_rooms.values_mut() {
165 sessions.remove(&id);
168 // If the room doesn't exist yet
169 if self.user_rooms.get_mut(&user_id).is_none() {
170 self.user_rooms.insert(user_id, HashSet::new());
176 .context(location_info!())?
182 fn send_post_room_message<Response>(
187 websocket_id: Option<ConnectionId>,
188 ) -> Result<(), LemmyError>
192 let res_str = &serialize_websocket_message(op, response)?;
193 if let Some(sessions) = self.post_rooms.get(&post_id) {
195 if let Some(my_id) = websocket_id {
200 self.sendit(res_str, *id);
206 pub fn send_community_room_message<Response>(
210 community_id: CommunityId,
211 websocket_id: Option<ConnectionId>,
212 ) -> Result<(), LemmyError>
216 let res_str = &serialize_websocket_message(op, response)?;
217 if let Some(sessions) = self.community_rooms.get(&community_id) {
219 if let Some(my_id) = websocket_id {
224 self.sendit(res_str, *id);
230 pub fn send_all_message<Response>(
234 websocket_id: Option<ConnectionId>,
235 ) -> Result<(), LemmyError>
239 let res_str = &serialize_websocket_message(op, response)?;
240 for id in self.sessions.keys() {
241 if let Some(my_id) = websocket_id {
246 self.sendit(res_str, *id);
251 pub fn send_user_room_message<Response>(
255 recipient_id: UserId,
256 websocket_id: Option<ConnectionId>,
257 ) -> Result<(), LemmyError>
261 let res_str = &serialize_websocket_message(op, response)?;
262 if let Some(sessions) = self.user_rooms.get(&recipient_id) {
264 if let Some(my_id) = websocket_id {
269 self.sendit(res_str, *id);
277 user_operation: &UserOperation,
278 comment: &CommentResponse,
279 websocket_id: Option<ConnectionId>,
280 ) -> Result<(), LemmyError> {
281 let mut comment_reply_sent = comment.clone();
282 comment_reply_sent.comment.my_vote = None;
283 comment_reply_sent.comment.user_id = None;
285 let mut comment_post_sent = comment_reply_sent.clone();
286 comment_post_sent.recipient_ids = Vec::new();
288 // Send it to the post room
289 self.send_post_room_message(
292 comment_post_sent.comment.post_id,
296 // Send it to the recipient(s) including the mentioned users
297 for recipient_id in &comment_reply_sent.recipient_ids {
298 self.send_user_room_message(
306 // Send it to the community too
307 self.send_community_room_message(user_operation, &comment_post_sent, 0, websocket_id)?;
308 self.send_community_room_message(
311 comment.comment.community_id,
320 user_operation: &UserOperation,
322 websocket_id: Option<ConnectionId>,
323 ) -> Result<(), LemmyError> {
324 let community_id = post.post.community_id;
326 // Don't send my data with it
327 let mut post_sent = post.clone();
328 post_sent.post.my_vote = None;
329 post_sent.post.user_id = None;
331 // Send it to /c/all and that community
332 self.send_community_room_message(user_operation, &post_sent, 0, websocket_id)?;
333 self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
335 // Send it to the post room
336 self.send_post_room_message(user_operation, &post_sent, post.post.id, websocket_id)?;
341 fn sendit(&self, message: &str, id: ConnectionId) {
342 if let Some(info) = self.sessions.get(&id) {
343 let _ = info.addr.do_send(WSMessage(message.to_owned()));
347 pub(super) fn parse_json_message(
349 msg: StandardMessage,
350 ctx: &mut Context<Self>,
351 ) -> impl Future<Output = Result<String, LemmyError>> {
352 let rate_limiter = self.rate_limiter.clone();
354 let ip: IPAddr = match self.sessions.get(&msg.id) {
355 Some(info) => info.ip.to_owned(),
356 None => "blank_ip".to_string(),
359 let context = LemmyContext {
360 pool: self.pool.clone(),
361 chat_server: ctx.address(),
362 client: self.client.to_owned(),
363 activity_queue: self.activity_queue.to_owned(),
365 let message_handler = self.message_handler;
367 let json: Value = serde_json::from_str(&msg.msg)?;
368 let data = &json["data"].to_string();
369 let op = &json["op"].as_str().ok_or(APIError {
370 message: "Unknown op type".to_string(),
373 let user_operation = UserOperation::from_str(&op)?;
374 let fut = (message_handler)(context, msg.id, user_operation.clone(), data);
375 match user_operation {
376 UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
377 UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
378 UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
379 _ => rate_limiter.message().wrap(ip, fut).await,