]> Untitled Git - lemmy.git/blob - crates/websocket/src/chat_server.rs
f1c936d6d4e208721515e085a7b5f592989af8f1
[lemmy.git] / crates / websocket / src / chat_server.rs
1 use crate::{messages::*, serialize_websocket_message, LemmyContext, UserOperation};
2 use actix::prelude::*;
3 use anyhow::Context as acontext;
4 use background_jobs::QueueHandle;
5 use diesel::{
6   r2d2::{ConnectionManager, Pool},
7   PgConnection,
8 };
9 use lemmy_api_common::{comment::*, post::*};
10 use lemmy_db_schema::{CommunityId, LocalUserId, PostId};
11 use lemmy_utils::{
12   location_info,
13   rate_limit::RateLimit,
14   ApiError,
15   ConnectionId,
16   IpAddr,
17   LemmyError,
18 };
19 use rand::rngs::ThreadRng;
20 use reqwest::Client;
21 use serde::Serialize;
22 use serde_json::Value;
23 use std::{
24   collections::{HashMap, HashSet},
25   str::FromStr,
26 };
27 use tokio::macros::support::Pin;
28
29 type MessageHandlerType = fn(
30   context: LemmyContext,
31   id: ConnectionId,
32   op: UserOperation,
33   data: &str,
34 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
35
36 /// `ChatServer` manages chat rooms and responsible for coordinating chat
37 /// session.
38 pub struct ChatServer {
39   /// A map from generated random ID to session addr
40   pub sessions: HashMap<ConnectionId, SessionInfo>,
41
42   /// A map from post_id to set of connectionIDs
43   pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
44
45   /// A map from community to set of connectionIDs
46   pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
47
48   pub mod_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
49
50   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
51   /// sessions (IE clients)
52   pub(super) user_rooms: HashMap<LocalUserId, HashSet<ConnectionId>>,
53
54   pub(super) rng: ThreadRng,
55
56   /// The DB Pool
57   pub(super) pool: Pool<ConnectionManager<PgConnection>>,
58
59   /// Rate limiting based on rate type and IP addr
60   pub(super) rate_limiter: RateLimit,
61
62   /// A list of the current captchas
63   pub(super) captchas: Vec<CaptchaItem>,
64
65   message_handler: MessageHandlerType,
66
67   /// An HTTP Client
68   client: Client,
69
70   activity_queue: QueueHandle,
71 }
72
73 pub struct SessionInfo {
74   pub addr: Recipient<WsMessage>,
75   pub ip: IpAddr,
76 }
77
78 /// `ChatServer` is an actor. It maintains list of connection client session.
79 /// And manages available rooms. Peers send messages to other peers in same
80 /// room through `ChatServer`.
81 impl ChatServer {
82   pub fn startup(
83     pool: Pool<ConnectionManager<PgConnection>>,
84     rate_limiter: RateLimit,
85     message_handler: MessageHandlerType,
86     client: Client,
87     activity_queue: QueueHandle,
88   ) -> ChatServer {
89     ChatServer {
90       sessions: HashMap::new(),
91       post_rooms: HashMap::new(),
92       community_rooms: HashMap::new(),
93       mod_rooms: HashMap::new(),
94       user_rooms: HashMap::new(),
95       rng: rand::thread_rng(),
96       pool,
97       rate_limiter,
98       captchas: Vec::new(),
99       message_handler,
100       client,
101       activity_queue,
102     }
103   }
104
105   pub fn join_community_room(
106     &mut self,
107     community_id: CommunityId,
108     id: ConnectionId,
109   ) -> Result<(), LemmyError> {
110     // remove session from all rooms
111     for sessions in self.community_rooms.values_mut() {
112       sessions.remove(&id);
113     }
114
115     // Also leave all post rooms
116     // This avoids double messages
117     for sessions in self.post_rooms.values_mut() {
118       sessions.remove(&id);
119     }
120
121     // If the room doesn't exist yet
122     if self.community_rooms.get_mut(&community_id).is_none() {
123       self.community_rooms.insert(community_id, HashSet::new());
124     }
125
126     self
127       .community_rooms
128       .get_mut(&community_id)
129       .context(location_info!())?
130       .insert(id);
131     Ok(())
132   }
133
134   pub fn join_mod_room(
135     &mut self,
136     community_id: CommunityId,
137     id: ConnectionId,
138   ) -> Result<(), LemmyError> {
139     // remove session from all rooms
140     for sessions in self.mod_rooms.values_mut() {
141       sessions.remove(&id);
142     }
143
144     // If the room doesn't exist yet
145     if self.mod_rooms.get_mut(&community_id).is_none() {
146       self.mod_rooms.insert(community_id, HashSet::new());
147     }
148
149     self
150       .mod_rooms
151       .get_mut(&community_id)
152       .context(location_info!())?
153       .insert(id);
154     Ok(())
155   }
156
157   pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
158     // remove session from all rooms
159     for sessions in self.post_rooms.values_mut() {
160       sessions.remove(&id);
161     }
162
163     // Also leave all communities
164     // This avoids double messages
165     // TODO found a bug, whereby community messages like
166     // delete and remove aren't sent, because
167     // you left the community room
168     for sessions in self.community_rooms.values_mut() {
169       sessions.remove(&id);
170     }
171
172     // If the room doesn't exist yet
173     if self.post_rooms.get_mut(&post_id).is_none() {
174       self.post_rooms.insert(post_id, HashSet::new());
175     }
176
177     self
178       .post_rooms
179       .get_mut(&post_id)
180       .context(location_info!())?
181       .insert(id);
182
183     Ok(())
184   }
185
186   pub fn join_user_room(
187     &mut self,
188     user_id: LocalUserId,
189     id: ConnectionId,
190   ) -> Result<(), LemmyError> {
191     // remove session from all rooms
192     for sessions in self.user_rooms.values_mut() {
193       sessions.remove(&id);
194     }
195
196     // If the room doesn't exist yet
197     if self.user_rooms.get_mut(&user_id).is_none() {
198       self.user_rooms.insert(user_id, HashSet::new());
199     }
200
201     self
202       .user_rooms
203       .get_mut(&user_id)
204       .context(location_info!())?
205       .insert(id);
206
207     Ok(())
208   }
209
210   fn send_post_room_message<Response>(
211     &self,
212     op: &UserOperation,
213     response: &Response,
214     post_id: PostId,
215     websocket_id: Option<ConnectionId>,
216   ) -> Result<(), LemmyError>
217   where
218     Response: Serialize,
219   {
220     let res_str = &serialize_websocket_message(op, response)?;
221     if let Some(sessions) = self.post_rooms.get(&post_id) {
222       for id in sessions {
223         if let Some(my_id) = websocket_id {
224           if *id == my_id {
225             continue;
226           }
227         }
228         self.sendit(res_str, *id);
229       }
230     }
231     Ok(())
232   }
233
234   pub fn send_community_room_message<Response>(
235     &self,
236     op: &UserOperation,
237     response: &Response,
238     community_id: CommunityId,
239     websocket_id: Option<ConnectionId>,
240   ) -> Result<(), LemmyError>
241   where
242     Response: Serialize,
243   {
244     let res_str = &serialize_websocket_message(op, response)?;
245     if let Some(sessions) = self.community_rooms.get(&community_id) {
246       for id in sessions {
247         if let Some(my_id) = websocket_id {
248           if *id == my_id {
249             continue;
250           }
251         }
252         self.sendit(res_str, *id);
253       }
254     }
255     Ok(())
256   }
257
258   pub fn send_mod_room_message<Response>(
259     &self,
260     op: &UserOperation,
261     response: &Response,
262     community_id: CommunityId,
263     websocket_id: Option<ConnectionId>,
264   ) -> Result<(), LemmyError>
265   where
266     Response: Serialize,
267   {
268     let res_str = &serialize_websocket_message(op, response)?;
269     if let Some(sessions) = self.mod_rooms.get(&community_id) {
270       for id in sessions {
271         if let Some(my_id) = websocket_id {
272           if *id == my_id {
273             continue;
274           }
275         }
276         self.sendit(res_str, *id);
277       }
278     }
279     Ok(())
280   }
281
282   pub fn send_all_message<Response>(
283     &self,
284     op: &UserOperation,
285     response: &Response,
286     websocket_id: Option<ConnectionId>,
287   ) -> Result<(), LemmyError>
288   where
289     Response: Serialize,
290   {
291     let res_str = &serialize_websocket_message(op, response)?;
292     for id in self.sessions.keys() {
293       if let Some(my_id) = websocket_id {
294         if *id == my_id {
295           continue;
296         }
297       }
298       self.sendit(res_str, *id);
299     }
300     Ok(())
301   }
302
303   pub fn send_user_room_message<Response>(
304     &self,
305     op: &UserOperation,
306     response: &Response,
307     recipient_id: LocalUserId,
308     websocket_id: Option<ConnectionId>,
309   ) -> Result<(), LemmyError>
310   where
311     Response: Serialize,
312   {
313     let res_str = &serialize_websocket_message(op, response)?;
314     if let Some(sessions) = self.user_rooms.get(&recipient_id) {
315       for id in sessions {
316         if let Some(my_id) = websocket_id {
317           if *id == my_id {
318             continue;
319           }
320         }
321         self.sendit(res_str, *id);
322       }
323     }
324     Ok(())
325   }
326
327   pub fn send_comment(
328     &self,
329     user_operation: &UserOperation,
330     comment: &CommentResponse,
331     websocket_id: Option<ConnectionId>,
332   ) -> Result<(), LemmyError> {
333     let mut comment_reply_sent = comment.clone();
334
335     // Strip out my specific user info
336     comment_reply_sent.comment_view.my_vote = None;
337
338     // Send it to the post room
339     let mut comment_post_sent = comment_reply_sent.clone();
340     // Remove the recipients here to separate mentions / user messages from post or community comments
341     comment_post_sent.recipient_ids = Vec::new();
342     self.send_post_room_message(
343       user_operation,
344       &comment_post_sent,
345       comment_post_sent.comment_view.post.id,
346       websocket_id,
347     )?;
348
349     // Send it to the community too
350     self.send_community_room_message(
351       user_operation,
352       &comment_post_sent,
353       CommunityId(0),
354       websocket_id,
355     )?;
356     self.send_community_room_message(
357       user_operation,
358       &comment_post_sent,
359       comment.comment_view.community.id,
360       websocket_id,
361     )?;
362
363     // Send it to the recipient(s) including the mentioned users
364     for recipient_id in &comment_reply_sent.recipient_ids {
365       self.send_user_room_message(
366         user_operation,
367         &comment_reply_sent,
368         *recipient_id,
369         websocket_id,
370       )?;
371     }
372
373     Ok(())
374   }
375
376   pub fn send_post(
377     &self,
378     user_operation: &UserOperation,
379     post_res: &PostResponse,
380     websocket_id: Option<ConnectionId>,
381   ) -> Result<(), LemmyError> {
382     let community_id = post_res.post_view.community.id;
383
384     // Don't send my data with it
385     let mut post_sent = post_res.clone();
386     post_sent.post_view.my_vote = None;
387
388     // Send it to /c/all and that community
389     self.send_community_room_message(user_operation, &post_sent, CommunityId(0), websocket_id)?;
390     self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
391
392     // Send it to the post room
393     self.send_post_room_message(
394       user_operation,
395       &post_sent,
396       post_res.post_view.post.id,
397       websocket_id,
398     )?;
399
400     Ok(())
401   }
402
403   fn sendit(&self, message: &str, id: ConnectionId) {
404     if let Some(info) = self.sessions.get(&id) {
405       let _ = info.addr.do_send(WsMessage(message.to_owned()));
406     }
407   }
408
409   pub(super) fn parse_json_message(
410     &mut self,
411     msg: StandardMessage,
412     ctx: &mut Context<Self>,
413   ) -> impl Future<Output = Result<String, LemmyError>> {
414     let rate_limiter = self.rate_limiter.clone();
415
416     let ip: IpAddr = match self.sessions.get(&msg.id) {
417       Some(info) => info.ip.to_owned(),
418       None => IpAddr("blank_ip".to_string()),
419     };
420
421     let context = LemmyContext {
422       pool: self.pool.clone(),
423       chat_server: ctx.address(),
424       client: self.client.to_owned(),
425       activity_queue: self.activity_queue.to_owned(),
426     };
427     let message_handler = self.message_handler;
428     async move {
429       let json: Value = serde_json::from_str(&msg.msg)?;
430       let data = &json["data"].to_string();
431       let op = &json["op"].as_str().ok_or(ApiError {
432         message: "Unknown op type".to_string(),
433       })?;
434
435       let user_operation = UserOperation::from_str(&op)?;
436       let fut = (message_handler)(context, msg.id, user_operation.clone(), data);
437       match user_operation {
438         UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
439         UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
440         UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
441         _ => rate_limiter.message().wrap(ip, fut).await,
442       }
443     }
444   }
445 }