]> Untitled Git - lemmy.git/blob - crates/websocket/src/chat_server.rs
Merge pull request #1328 from LemmyNet/move_views_to_diesel
[lemmy.git] / crates / websocket / src / chat_server.rs
1 use crate::{messages::*, serialize_websocket_message, LemmyContext, UserOperation};
2 use actix::prelude::*;
3 use anyhow::Context as acontext;
4 use background_jobs::QueueHandle;
5 use diesel::{
6   r2d2::{ConnectionManager, Pool},
7   PgConnection,
8 };
9 use lemmy_structs::{comment::*, post::*};
10 use lemmy_utils::{
11   location_info,
12   rate_limit::RateLimit,
13   APIError,
14   CommunityId,
15   ConnectionId,
16   IPAddr,
17   LemmyError,
18   PostId,
19   UserId,
20 };
21 use rand::rngs::ThreadRng;
22 use reqwest::Client;
23 use serde::Serialize;
24 use serde_json::Value;
25 use std::{
26   collections::{HashMap, HashSet},
27   str::FromStr,
28 };
29 use tokio::macros::support::Pin;
30
31 type MessageHandlerType = fn(
32   context: LemmyContext,
33   id: ConnectionId,
34   op: UserOperation,
35   data: &str,
36 ) -> Pin<Box<dyn Future<Output = Result<String, LemmyError>> + '_>>;
37
38 /// `ChatServer` manages chat rooms and responsible for coordinating chat
39 /// session.
40 pub struct ChatServer {
41   /// A map from generated random ID to session addr
42   pub sessions: HashMap<ConnectionId, SessionInfo>,
43
44   /// A map from post_id to set of connectionIDs
45   pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
46
47   /// A map from community to set of connectionIDs
48   pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
49
50   pub mod_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
51
52   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
53   /// sessions (IE clients)
54   pub(super) user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
55
56   pub(super) rng: ThreadRng,
57
58   /// The DB Pool
59   pub(super) pool: Pool<ConnectionManager<PgConnection>>,
60
61   /// Rate limiting based on rate type and IP addr
62   pub(super) rate_limiter: RateLimit,
63
64   /// A list of the current captchas
65   pub(super) captchas: Vec<CaptchaItem>,
66
67   message_handler: MessageHandlerType,
68
69   /// An HTTP Client
70   client: Client,
71
72   activity_queue: QueueHandle,
73 }
74
75 pub struct SessionInfo {
76   pub addr: Recipient<WSMessage>,
77   pub ip: IPAddr,
78 }
79
80 /// `ChatServer` is an actor. It maintains list of connection client session.
81 /// And manages available rooms. Peers send messages to other peers in same
82 /// room through `ChatServer`.
83 impl ChatServer {
84   pub fn startup(
85     pool: Pool<ConnectionManager<PgConnection>>,
86     rate_limiter: RateLimit,
87     message_handler: MessageHandlerType,
88     client: Client,
89     activity_queue: QueueHandle,
90   ) -> ChatServer {
91     ChatServer {
92       sessions: HashMap::new(),
93       post_rooms: HashMap::new(),
94       community_rooms: HashMap::new(),
95       mod_rooms: HashMap::new(),
96       user_rooms: HashMap::new(),
97       rng: rand::thread_rng(),
98       pool,
99       rate_limiter,
100       captchas: Vec::new(),
101       message_handler,
102       client,
103       activity_queue,
104     }
105   }
106
107   pub fn join_community_room(
108     &mut self,
109     community_id: CommunityId,
110     id: ConnectionId,
111   ) -> Result<(), LemmyError> {
112     // remove session from all rooms
113     for sessions in self.community_rooms.values_mut() {
114       sessions.remove(&id);
115     }
116
117     // Also leave all post rooms
118     // This avoids double messages
119     for sessions in self.post_rooms.values_mut() {
120       sessions.remove(&id);
121     }
122
123     // If the room doesn't exist yet
124     if self.community_rooms.get_mut(&community_id).is_none() {
125       self.community_rooms.insert(community_id, HashSet::new());
126     }
127
128     self
129       .community_rooms
130       .get_mut(&community_id)
131       .context(location_info!())?
132       .insert(id);
133     Ok(())
134   }
135
136   pub fn join_mod_room(
137     &mut self,
138     community_id: CommunityId,
139     id: ConnectionId,
140   ) -> Result<(), LemmyError> {
141     // remove session from all rooms
142     for sessions in self.mod_rooms.values_mut() {
143       sessions.remove(&id);
144     }
145
146     // If the room doesn't exist yet
147     if self.mod_rooms.get_mut(&community_id).is_none() {
148       self.mod_rooms.insert(community_id, HashSet::new());
149     }
150
151     self
152       .mod_rooms
153       .get_mut(&community_id)
154       .context(location_info!())?
155       .insert(id);
156     Ok(())
157   }
158
159   pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
160     // remove session from all rooms
161     for sessions in self.post_rooms.values_mut() {
162       sessions.remove(&id);
163     }
164
165     // Also leave all communities
166     // This avoids double messages
167     // TODO found a bug, whereby community messages like
168     // delete and remove aren't sent, because
169     // you left the community room
170     for sessions in self.community_rooms.values_mut() {
171       sessions.remove(&id);
172     }
173
174     // If the room doesn't exist yet
175     if self.post_rooms.get_mut(&post_id).is_none() {
176       self.post_rooms.insert(post_id, HashSet::new());
177     }
178
179     self
180       .post_rooms
181       .get_mut(&post_id)
182       .context(location_info!())?
183       .insert(id);
184
185     Ok(())
186   }
187
188   pub fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) -> Result<(), LemmyError> {
189     // remove session from all rooms
190     for sessions in self.user_rooms.values_mut() {
191       sessions.remove(&id);
192     }
193
194     // If the room doesn't exist yet
195     if self.user_rooms.get_mut(&user_id).is_none() {
196       self.user_rooms.insert(user_id, HashSet::new());
197     }
198
199     self
200       .user_rooms
201       .get_mut(&user_id)
202       .context(location_info!())?
203       .insert(id);
204
205     Ok(())
206   }
207
208   fn send_post_room_message<Response>(
209     &self,
210     op: &UserOperation,
211     response: &Response,
212     post_id: PostId,
213     websocket_id: Option<ConnectionId>,
214   ) -> Result<(), LemmyError>
215   where
216     Response: Serialize,
217   {
218     let res_str = &serialize_websocket_message(op, response)?;
219     if let Some(sessions) = self.post_rooms.get(&post_id) {
220       for id in sessions {
221         if let Some(my_id) = websocket_id {
222           if *id == my_id {
223             continue;
224           }
225         }
226         self.sendit(res_str, *id);
227       }
228     }
229     Ok(())
230   }
231
232   pub fn send_community_room_message<Response>(
233     &self,
234     op: &UserOperation,
235     response: &Response,
236     community_id: CommunityId,
237     websocket_id: Option<ConnectionId>,
238   ) -> Result<(), LemmyError>
239   where
240     Response: Serialize,
241   {
242     let res_str = &serialize_websocket_message(op, response)?;
243     if let Some(sessions) = self.community_rooms.get(&community_id) {
244       for id in sessions {
245         if let Some(my_id) = websocket_id {
246           if *id == my_id {
247             continue;
248           }
249         }
250         self.sendit(res_str, *id);
251       }
252     }
253     Ok(())
254   }
255
256   pub fn send_mod_room_message<Response>(
257     &self,
258     op: &UserOperation,
259     response: &Response,
260     community_id: CommunityId,
261     websocket_id: Option<ConnectionId>,
262   ) -> Result<(), LemmyError>
263   where
264     Response: Serialize,
265   {
266     let res_str = &serialize_websocket_message(op, response)?;
267     if let Some(sessions) = self.mod_rooms.get(&community_id) {
268       for id in sessions {
269         if let Some(my_id) = websocket_id {
270           if *id == my_id {
271             continue;
272           }
273         }
274         self.sendit(res_str, *id);
275       }
276     }
277     Ok(())
278   }
279
280   pub fn send_all_message<Response>(
281     &self,
282     op: &UserOperation,
283     response: &Response,
284     websocket_id: Option<ConnectionId>,
285   ) -> Result<(), LemmyError>
286   where
287     Response: Serialize,
288   {
289     let res_str = &serialize_websocket_message(op, response)?;
290     for id in self.sessions.keys() {
291       if let Some(my_id) = websocket_id {
292         if *id == my_id {
293           continue;
294         }
295       }
296       self.sendit(res_str, *id);
297     }
298     Ok(())
299   }
300
301   pub fn send_user_room_message<Response>(
302     &self,
303     op: &UserOperation,
304     response: &Response,
305     recipient_id: UserId,
306     websocket_id: Option<ConnectionId>,
307   ) -> Result<(), LemmyError>
308   where
309     Response: Serialize,
310   {
311     let res_str = &serialize_websocket_message(op, response)?;
312     if let Some(sessions) = self.user_rooms.get(&recipient_id) {
313       for id in sessions {
314         if let Some(my_id) = websocket_id {
315           if *id == my_id {
316             continue;
317           }
318         }
319         self.sendit(res_str, *id);
320       }
321     }
322     Ok(())
323   }
324
325   pub fn send_comment(
326     &self,
327     user_operation: &UserOperation,
328     comment: &CommentResponse,
329     websocket_id: Option<ConnectionId>,
330   ) -> Result<(), LemmyError> {
331     let mut comment_reply_sent = comment.clone();
332
333     // Strip out my specific user info
334     comment_reply_sent.comment_view.my_vote = None;
335
336     // Send it to the post room
337     let mut comment_post_sent = comment_reply_sent.clone();
338     // Remove the recipients here to separate mentions / user messages from post or community comments
339     comment_post_sent.recipient_ids = Vec::new();
340     self.send_post_room_message(
341       user_operation,
342       &comment_post_sent,
343       comment_post_sent.comment_view.post.id,
344       websocket_id,
345     )?;
346
347     // Send it to the community too
348     self.send_community_room_message(user_operation, &comment_post_sent, 0, websocket_id)?;
349     self.send_community_room_message(
350       user_operation,
351       &comment_post_sent,
352       comment.comment_view.community.id,
353       websocket_id,
354     )?;
355
356     // Send it to the recipient(s) including the mentioned users
357     for recipient_id in &comment_reply_sent.recipient_ids {
358       self.send_user_room_message(
359         user_operation,
360         &comment_reply_sent,
361         *recipient_id,
362         websocket_id,
363       )?;
364     }
365
366     Ok(())
367   }
368
369   pub fn send_post(
370     &self,
371     user_operation: &UserOperation,
372     post_res: &PostResponse,
373     websocket_id: Option<ConnectionId>,
374   ) -> Result<(), LemmyError> {
375     let community_id = post_res.post_view.community.id;
376
377     // Don't send my data with it
378     let mut post_sent = post_res.clone();
379     post_sent.post_view.my_vote = None;
380
381     // Send it to /c/all and that community
382     self.send_community_room_message(user_operation, &post_sent, 0, websocket_id)?;
383     self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
384
385     // Send it to the post room
386     self.send_post_room_message(
387       user_operation,
388       &post_sent,
389       post_res.post_view.post.id,
390       websocket_id,
391     )?;
392
393     Ok(())
394   }
395
396   fn sendit(&self, message: &str, id: ConnectionId) {
397     if let Some(info) = self.sessions.get(&id) {
398       let _ = info.addr.do_send(WSMessage(message.to_owned()));
399     }
400   }
401
402   pub(super) fn parse_json_message(
403     &mut self,
404     msg: StandardMessage,
405     ctx: &mut Context<Self>,
406   ) -> impl Future<Output = Result<String, LemmyError>> {
407     let rate_limiter = self.rate_limiter.clone();
408
409     let ip: IPAddr = match self.sessions.get(&msg.id) {
410       Some(info) => info.ip.to_owned(),
411       None => "blank_ip".to_string(),
412     };
413
414     let context = LemmyContext {
415       pool: self.pool.clone(),
416       chat_server: ctx.address(),
417       client: self.client.to_owned(),
418       activity_queue: self.activity_queue.to_owned(),
419     };
420     let message_handler = self.message_handler;
421     async move {
422       let json: Value = serde_json::from_str(&msg.msg)?;
423       let data = &json["data"].to_string();
424       let op = &json["op"].as_str().ok_or(APIError {
425         message: "Unknown op type".to_string(),
426       })?;
427
428       let user_operation = UserOperation::from_str(&op)?;
429       let fut = (message_handler)(context, msg.id, user_operation.clone(), data);
430       match user_operation {
431         UserOperation::Register => rate_limiter.register().wrap(ip, fut).await,
432         UserOperation::CreatePost => rate_limiter.post().wrap(ip, fut).await,
433         UserOperation::CreateCommunity => rate_limiter.register().wrap(ip, fut).await,
434         _ => rate_limiter.message().wrap(ip, fut).await,
435       }
436     }
437   }
438 }