]> Untitled Git - lemmy.git/blob - src/websocket/chat_server.rs
611f44f9715ba6caa12e6b92e4c6d218787b8d7a
[lemmy.git] / src / websocket / chat_server.rs
1 use crate::{
2   websocket::handlers::{do_user_operation, to_json_string, Args},
3   LemmyContext,
4 };
5 use actix::prelude::*;
6 use anyhow::Context as acontext;
7 use background_jobs::QueueHandle;
8 use diesel::{
9   r2d2::{ConnectionManager, Pool},
10   PgConnection,
11 };
12 use lemmy_rate_limit::RateLimit;
13 use lemmy_structs::{comment::*, community::*, post::*, site::*, user::*, websocket::*};
14 use lemmy_utils::{
15   location_info,
16   APIError,
17   CommunityId,
18   ConnectionId,
19   IPAddr,
20   LemmyError,
21   PostId,
22   UserId,
23 };
24 use rand::rngs::ThreadRng;
25 use reqwest::Client;
26 use serde::Serialize;
27 use serde_json::Value;
28 use std::{
29   collections::{HashMap, HashSet},
30   str::FromStr,
31 };
32
33 /// `ChatServer` manages chat rooms and is responsible for coordinating chat
34 /// sessions.
35 pub struct ChatServer {
36   /// A map from connection ID to that connection's session info
37   pub sessions: HashMap<ConnectionId, SessionInfo>,
38
39   /// A map from post ID to the set of connection IDs in that post's room
40   pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
41
42   /// A map from community ID to the set of connection IDs in that community's room
43   pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
44
45   /// A map from user ID to the set of connection IDs of joined users. Remember a user can have
46   /// multiple sessions (i.e. clients)
47   pub(super) user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
48   /// Random number generator, obtained from `rand::thread_rng()` in `startup`
49   pub(super) rng: ThreadRng,
50
51   /// The DB pool
52   pub(super) pool: Pool<ConnectionManager<PgConnection>>,
53
54   /// Rate limiting based on rate type and IP addr
55   pub(super) rate_limiter: RateLimit,
56
57   /// A list of the current captchas
58   pub(super) captchas: Vec<CaptchaItem>,
59
60   /// An HTTP client; cloned into the `LemmyContext` built for each incoming message
61   client: Client,
62   /// Background job queue handle; cloned into the `LemmyContext` built for each incoming message
63   activity_queue: QueueHandle,
64 }
65 /// Per-connection data stored in `ChatServer::sessions`.
66 pub struct SessionInfo {
67   pub addr: Recipient<WSMessage>, // where `sendit` delivers `WSMessage`s for this connection
68   pub ip: IPAddr, // IP recorded for this connection; handed to handlers via `Args`
69 }
70
71 /// `ChatServer` is an actor. It maintains the list of connected client sessions
72 /// and manages the available rooms. Peers send messages to other peers in the same
73 /// room through `ChatServer`.
74 impl ChatServer {
75   pub fn startup(
76     pool: Pool<ConnectionManager<PgConnection>>,
77     rate_limiter: RateLimit,
78     client: Client,
79     activity_queue: QueueHandle,
80   ) -> ChatServer {
81     ChatServer {
82       sessions: HashMap::new(),
83       post_rooms: HashMap::new(),
84       community_rooms: HashMap::new(),
85       user_rooms: HashMap::new(),
86       rng: rand::thread_rng(),
87       pool,
88       rate_limiter,
89       captchas: Vec::new(),
90       client,
91       activity_queue,
92     }
93   }
94
95   pub fn join_community_room(
96     &mut self,
97     community_id: CommunityId,
98     id: ConnectionId,
99   ) -> Result<(), LemmyError> {
100     // remove session from all rooms
101     for sessions in self.community_rooms.values_mut() {
102       sessions.remove(&id);
103     }
104
105     // Also leave all post rooms
106     // This avoids double messages
107     for sessions in self.post_rooms.values_mut() {
108       sessions.remove(&id);
109     }
110
111     // If the room doesn't exist yet
112     if self.community_rooms.get_mut(&community_id).is_none() {
113       self.community_rooms.insert(community_id, HashSet::new());
114     }
115
116     self
117       .community_rooms
118       .get_mut(&community_id)
119       .context(location_info!())?
120       .insert(id);
121     Ok(())
122   }
123
124   pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
125     // remove session from all rooms
126     for sessions in self.post_rooms.values_mut() {
127       sessions.remove(&id);
128     }
129
130     // Also leave all communities
131     // This avoids double messages
132     // TODO found a bug, whereby community messages like
133     // delete and remove aren't sent, because
134     // you left the community room
135     for sessions in self.community_rooms.values_mut() {
136       sessions.remove(&id);
137     }
138
139     // If the room doesn't exist yet
140     if self.post_rooms.get_mut(&post_id).is_none() {
141       self.post_rooms.insert(post_id, HashSet::new());
142     }
143
144     self
145       .post_rooms
146       .get_mut(&post_id)
147       .context(location_info!())?
148       .insert(id);
149
150     Ok(())
151   }
152
153   pub fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) -> Result<(), LemmyError> {
154     // remove session from all rooms
155     for sessions in self.user_rooms.values_mut() {
156       sessions.remove(&id);
157     }
158
159     // If the room doesn't exist yet
160     if self.user_rooms.get_mut(&user_id).is_none() {
161       self.user_rooms.insert(user_id, HashSet::new());
162     }
163
164     self
165       .user_rooms
166       .get_mut(&user_id)
167       .context(location_info!())?
168       .insert(id);
169
170     Ok(())
171   }
172
173   fn send_post_room_message<Response>(
174     &self,
175     op: &UserOperation,
176     response: &Response,
177     post_id: PostId,
178     websocket_id: Option<ConnectionId>,
179   ) -> Result<(), LemmyError>
180   where
181     Response: Serialize,
182   {
183     let res_str = &to_json_string(op, response)?;
184     if let Some(sessions) = self.post_rooms.get(&post_id) {
185       for id in sessions {
186         if let Some(my_id) = websocket_id {
187           if *id == my_id {
188             continue;
189           }
190         }
191         self.sendit(res_str, *id);
192       }
193     }
194     Ok(())
195   }
196
197   pub fn send_community_room_message<Response>(
198     &self,
199     op: &UserOperation,
200     response: &Response,
201     community_id: CommunityId,
202     websocket_id: Option<ConnectionId>,
203   ) -> Result<(), LemmyError>
204   where
205     Response: Serialize,
206   {
207     let res_str = &to_json_string(op, response)?;
208     if let Some(sessions) = self.community_rooms.get(&community_id) {
209       for id in sessions {
210         if let Some(my_id) = websocket_id {
211           if *id == my_id {
212             continue;
213           }
214         }
215         self.sendit(res_str, *id);
216       }
217     }
218     Ok(())
219   }
220
221   pub fn send_all_message<Response>(
222     &self,
223     op: &UserOperation,
224     response: &Response,
225     websocket_id: Option<ConnectionId>,
226   ) -> Result<(), LemmyError>
227   where
228     Response: Serialize,
229   {
230     let res_str = &to_json_string(op, response)?;
231     for id in self.sessions.keys() {
232       if let Some(my_id) = websocket_id {
233         if *id == my_id {
234           continue;
235         }
236       }
237       self.sendit(res_str, *id);
238     }
239     Ok(())
240   }
241
242   pub fn send_user_room_message<Response>(
243     &self,
244     op: &UserOperation,
245     response: &Response,
246     recipient_id: UserId,
247     websocket_id: Option<ConnectionId>,
248   ) -> Result<(), LemmyError>
249   where
250     Response: Serialize,
251   {
252     let res_str = &to_json_string(op, response)?;
253     if let Some(sessions) = self.user_rooms.get(&recipient_id) {
254       for id in sessions {
255         if let Some(my_id) = websocket_id {
256           if *id == my_id {
257             continue;
258           }
259         }
260         self.sendit(res_str, *id);
261       }
262     }
263     Ok(())
264   }
265
266   pub fn send_comment(
267     &self,
268     user_operation: &UserOperation,
269     comment: &CommentResponse,
270     websocket_id: Option<ConnectionId>,
271   ) -> Result<(), LemmyError> {
272     let mut comment_reply_sent = comment.clone();
273     comment_reply_sent.comment.my_vote = None;
274     comment_reply_sent.comment.user_id = None;
275
276     let mut comment_post_sent = comment_reply_sent.clone();
277     comment_post_sent.recipient_ids = Vec::new();
278
279     // Send it to the post room
280     self.send_post_room_message(
281       user_operation,
282       &comment_post_sent,
283       comment_post_sent.comment.post_id,
284       websocket_id,
285     )?;
286
287     // Send it to the recipient(s) including the mentioned users
288     for recipient_id in &comment_reply_sent.recipient_ids {
289       self.send_user_room_message(
290         user_operation,
291         &comment_reply_sent,
292         *recipient_id,
293         websocket_id,
294       )?;
295     }
296
297     // Send it to the community too
298     self.send_community_room_message(user_operation, &comment_post_sent, 0, websocket_id)?;
299     self.send_community_room_message(
300       user_operation,
301       &comment_post_sent,
302       comment.comment.community_id,
303       websocket_id,
304     )?;
305
306     Ok(())
307   }
308
309   pub fn send_post(
310     &self,
311     user_operation: &UserOperation,
312     post: &PostResponse,
313     websocket_id: Option<ConnectionId>,
314   ) -> Result<(), LemmyError> {
315     let community_id = post.post.community_id;
316
317     // Don't send my data with it
318     let mut post_sent = post.clone();
319     post_sent.post.my_vote = None;
320     post_sent.post.user_id = None;
321
322     // Send it to /c/all and that community
323     self.send_community_room_message(user_operation, &post_sent, 0, websocket_id)?;
324     self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
325
326     // Send it to the post room
327     self.send_post_room_message(user_operation, &post_sent, post.post.id, websocket_id)?;
328
329     Ok(())
330   }
331
332   fn sendit(&self, message: &str, id: ConnectionId) {
333     if let Some(info) = self.sessions.get(&id) {
334       let _ = info.addr.do_send(WSMessage(message.to_owned()));
335     }
336   }
337
338   pub(super) fn parse_json_message(
339     &mut self,
340     msg: StandardMessage,
341     ctx: &mut Context<Self>,
342   ) -> impl Future<Output = Result<String, LemmyError>> {
343     let addr = ctx.address();
344     let pool = self.pool.clone();
345     let rate_limiter = self.rate_limiter.clone();
346
347     let ip: IPAddr = match self.sessions.get(&msg.id) {
348       Some(info) => info.ip.to_owned(),
349       None => "blank_ip".to_string(),
350     };
351
352     let client = self.client.clone();
353     let activity_queue = self.activity_queue.clone();
354     async move {
355       let msg = msg;
356       let json: Value = serde_json::from_str(&msg.msg)?;
357       let data = &json["data"].to_string();
358       let op = &json["op"].as_str().ok_or(APIError {
359         message: "Unknown op type".to_string(),
360       })?;
361
362       let user_operation: UserOperation = UserOperation::from_str(&op)?;
363
364       let context = LemmyContext::new(pool, addr, client, activity_queue);
365       let args = Args {
366         context,
367         rate_limiter,
368         id: msg.id,
369         ip,
370         op: user_operation.clone(),
371         data,
372       };
373
374       match user_operation {
375         // User ops
376         UserOperation::Login => do_user_operation::<Login>(args).await,
377         UserOperation::Register => do_user_operation::<Register>(args).await,
378         UserOperation::GetCaptcha => do_user_operation::<GetCaptcha>(args).await,
379         UserOperation::GetUserDetails => do_user_operation::<GetUserDetails>(args).await,
380         UserOperation::GetReplies => do_user_operation::<GetReplies>(args).await,
381         UserOperation::AddAdmin => do_user_operation::<AddAdmin>(args).await,
382         UserOperation::BanUser => do_user_operation::<BanUser>(args).await,
383         UserOperation::GetUserMentions => do_user_operation::<GetUserMentions>(args).await,
384         UserOperation::MarkUserMentionAsRead => {
385           do_user_operation::<MarkUserMentionAsRead>(args).await
386         }
387         UserOperation::MarkAllAsRead => do_user_operation::<MarkAllAsRead>(args).await,
388         UserOperation::DeleteAccount => do_user_operation::<DeleteAccount>(args).await,
389         UserOperation::PasswordReset => do_user_operation::<PasswordReset>(args).await,
390         UserOperation::PasswordChange => do_user_operation::<PasswordChange>(args).await,
391         UserOperation::UserJoin => do_user_operation::<UserJoin>(args).await,
392         UserOperation::PostJoin => do_user_operation::<PostJoin>(args).await,
393         UserOperation::CommunityJoin => do_user_operation::<CommunityJoin>(args).await,
394         UserOperation::SaveUserSettings => do_user_operation::<SaveUserSettings>(args).await,
395
396         // Private Message ops
397         UserOperation::CreatePrivateMessage => {
398           do_user_operation::<CreatePrivateMessage>(args).await
399         }
400         UserOperation::EditPrivateMessage => do_user_operation::<EditPrivateMessage>(args).await,
401         UserOperation::DeletePrivateMessage => {
402           do_user_operation::<DeletePrivateMessage>(args).await
403         }
404         UserOperation::MarkPrivateMessageAsRead => {
405           do_user_operation::<MarkPrivateMessageAsRead>(args).await
406         }
407         UserOperation::GetPrivateMessages => do_user_operation::<GetPrivateMessages>(args).await,
408
409         // Site ops
410         UserOperation::GetModlog => do_user_operation::<GetModlog>(args).await,
411         UserOperation::CreateSite => do_user_operation::<CreateSite>(args).await,
412         UserOperation::EditSite => do_user_operation::<EditSite>(args).await,
413         UserOperation::GetSite => do_user_operation::<GetSite>(args).await,
414         UserOperation::GetSiteConfig => do_user_operation::<GetSiteConfig>(args).await,
415         UserOperation::SaveSiteConfig => do_user_operation::<SaveSiteConfig>(args).await,
416         UserOperation::Search => do_user_operation::<Search>(args).await,
417         UserOperation::TransferCommunity => do_user_operation::<TransferCommunity>(args).await,
418         UserOperation::TransferSite => do_user_operation::<TransferSite>(args).await,
419         UserOperation::ListCategories => do_user_operation::<ListCategories>(args).await,
420
421         // Community ops
422         UserOperation::GetCommunity => do_user_operation::<GetCommunity>(args).await,
423         UserOperation::ListCommunities => do_user_operation::<ListCommunities>(args).await,
424         UserOperation::CreateCommunity => do_user_operation::<CreateCommunity>(args).await,
425         UserOperation::EditCommunity => do_user_operation::<EditCommunity>(args).await,
426         UserOperation::DeleteCommunity => do_user_operation::<DeleteCommunity>(args).await,
427         UserOperation::RemoveCommunity => do_user_operation::<RemoveCommunity>(args).await,
428         UserOperation::FollowCommunity => do_user_operation::<FollowCommunity>(args).await,
429         UserOperation::GetFollowedCommunities => {
430           do_user_operation::<GetFollowedCommunities>(args).await
431         }
432         UserOperation::BanFromCommunity => do_user_operation::<BanFromCommunity>(args).await,
433         UserOperation::AddModToCommunity => do_user_operation::<AddModToCommunity>(args).await,
434
435         // Post ops
436         UserOperation::CreatePost => do_user_operation::<CreatePost>(args).await,
437         UserOperation::GetPost => do_user_operation::<GetPost>(args).await,
438         UserOperation::GetPosts => do_user_operation::<GetPosts>(args).await,
439         UserOperation::EditPost => do_user_operation::<EditPost>(args).await,
440         UserOperation::DeletePost => do_user_operation::<DeletePost>(args).await,
441         UserOperation::RemovePost => do_user_operation::<RemovePost>(args).await,
442         UserOperation::LockPost => do_user_operation::<LockPost>(args).await,
443         UserOperation::StickyPost => do_user_operation::<StickyPost>(args).await,
444         UserOperation::CreatePostLike => do_user_operation::<CreatePostLike>(args).await,
445         UserOperation::SavePost => do_user_operation::<SavePost>(args).await,
446
447         // Comment ops
448         UserOperation::CreateComment => do_user_operation::<CreateComment>(args).await,
449         UserOperation::EditComment => do_user_operation::<EditComment>(args).await,
450         UserOperation::DeleteComment => do_user_operation::<DeleteComment>(args).await,
451         UserOperation::RemoveComment => do_user_operation::<RemoveComment>(args).await,
452         UserOperation::MarkCommentAsRead => do_user_operation::<MarkCommentAsRead>(args).await,
453         UserOperation::SaveComment => do_user_operation::<SaveComment>(args).await,
454         UserOperation::GetComments => do_user_operation::<GetComments>(args).await,
455         UserOperation::CreateCommentLike => do_user_operation::<CreateCommentLike>(args).await,
456       }
457     }
458   }
459 }