// Untitled Git - lemmy.git/blob - src/websocket/chat_server.rs
// Isomorphic docker (#1124)
// [lemmy.git] / src / websocket / chat_server.rs
1 use crate::{
2   websocket::{
3     handlers::{do_user_operation, to_json_string, Args},
4     messages::*,
5     UserOperation,
6   },
7   LemmyContext,
8 };
9 use actix::prelude::*;
10 use anyhow::Context as acontext;
11 use background_jobs::QueueHandle;
12 use diesel::{
13   r2d2::{ConnectionManager, Pool},
14   PgConnection,
15 };
16 use lemmy_api_structs::{comment::*, community::*, post::*, site::*, user::*};
17 use lemmy_rate_limit::RateLimit;
18 use lemmy_utils::{
19   location_info,
20   APIError,
21   CommunityId,
22   ConnectionId,
23   IPAddr,
24   LemmyError,
25   PostId,
26   UserId,
27 };
28 use rand::rngs::ThreadRng;
29 use reqwest::Client;
30 use serde::Serialize;
31 use serde_json::Value;
32 use std::{
33   collections::{HashMap, HashSet},
34   str::FromStr,
35 };
36
37 /// `ChatServer` manages chat rooms and responsible for coordinating chat
38 /// session.
39 pub struct ChatServer {
40   /// A map from generated random ID to session addr
41   pub sessions: HashMap<ConnectionId, SessionInfo>,
42
43   /// A map from post_id to set of connectionIDs
44   pub post_rooms: HashMap<PostId, HashSet<ConnectionId>>,
45
46   /// A map from community to set of connectionIDs
47   pub community_rooms: HashMap<CommunityId, HashSet<ConnectionId>>,
48
49   /// A map from user id to its connection ID for joined users. Remember a user can have multiple
50   /// sessions (IE clients)
51   pub(super) user_rooms: HashMap<UserId, HashSet<ConnectionId>>,
52
53   pub(super) rng: ThreadRng,
54
55   /// The DB Pool
56   pub(super) pool: Pool<ConnectionManager<PgConnection>>,
57
58   /// Rate limiting based on rate type and IP addr
59   pub(super) rate_limiter: RateLimit,
60
61   /// A list of the current captchas
62   pub(super) captchas: Vec<CaptchaItem>,
63
64   /// An HTTP Client
65   client: Client,
66
67   activity_queue: QueueHandle,
68 }
69
70 pub struct SessionInfo {
71   pub addr: Recipient<WSMessage>,
72   pub ip: IPAddr,
73 }
74
/// `ChatServer` is an actor. It maintains the list of connected client sessions
/// and manages the available rooms. Peers send messages to other peers in the
/// same room through `ChatServer`.
78 impl ChatServer {
79   pub fn startup(
80     pool: Pool<ConnectionManager<PgConnection>>,
81     rate_limiter: RateLimit,
82     client: Client,
83     activity_queue: QueueHandle,
84   ) -> ChatServer {
85     ChatServer {
86       sessions: HashMap::new(),
87       post_rooms: HashMap::new(),
88       community_rooms: HashMap::new(),
89       user_rooms: HashMap::new(),
90       rng: rand::thread_rng(),
91       pool,
92       rate_limiter,
93       captchas: Vec::new(),
94       client,
95       activity_queue,
96     }
97   }
98
99   pub fn join_community_room(
100     &mut self,
101     community_id: CommunityId,
102     id: ConnectionId,
103   ) -> Result<(), LemmyError> {
104     // remove session from all rooms
105     for sessions in self.community_rooms.values_mut() {
106       sessions.remove(&id);
107     }
108
109     // Also leave all post rooms
110     // This avoids double messages
111     for sessions in self.post_rooms.values_mut() {
112       sessions.remove(&id);
113     }
114
115     // If the room doesn't exist yet
116     if self.community_rooms.get_mut(&community_id).is_none() {
117       self.community_rooms.insert(community_id, HashSet::new());
118     }
119
120     self
121       .community_rooms
122       .get_mut(&community_id)
123       .context(location_info!())?
124       .insert(id);
125     Ok(())
126   }
127
128   pub fn join_post_room(&mut self, post_id: PostId, id: ConnectionId) -> Result<(), LemmyError> {
129     // remove session from all rooms
130     for sessions in self.post_rooms.values_mut() {
131       sessions.remove(&id);
132     }
133
134     // Also leave all communities
135     // This avoids double messages
136     // TODO found a bug, whereby community messages like
137     // delete and remove aren't sent, because
138     // you left the community room
139     for sessions in self.community_rooms.values_mut() {
140       sessions.remove(&id);
141     }
142
143     // If the room doesn't exist yet
144     if self.post_rooms.get_mut(&post_id).is_none() {
145       self.post_rooms.insert(post_id, HashSet::new());
146     }
147
148     self
149       .post_rooms
150       .get_mut(&post_id)
151       .context(location_info!())?
152       .insert(id);
153
154     Ok(())
155   }
156
157   pub fn join_user_room(&mut self, user_id: UserId, id: ConnectionId) -> Result<(), LemmyError> {
158     // remove session from all rooms
159     for sessions in self.user_rooms.values_mut() {
160       sessions.remove(&id);
161     }
162
163     // If the room doesn't exist yet
164     if self.user_rooms.get_mut(&user_id).is_none() {
165       self.user_rooms.insert(user_id, HashSet::new());
166     }
167
168     self
169       .user_rooms
170       .get_mut(&user_id)
171       .context(location_info!())?
172       .insert(id);
173
174     Ok(())
175   }
176
177   fn send_post_room_message<Response>(
178     &self,
179     op: &UserOperation,
180     response: &Response,
181     post_id: PostId,
182     websocket_id: Option<ConnectionId>,
183   ) -> Result<(), LemmyError>
184   where
185     Response: Serialize,
186   {
187     let res_str = &to_json_string(op, response)?;
188     if let Some(sessions) = self.post_rooms.get(&post_id) {
189       for id in sessions {
190         if let Some(my_id) = websocket_id {
191           if *id == my_id {
192             continue;
193           }
194         }
195         self.sendit(res_str, *id);
196       }
197     }
198     Ok(())
199   }
200
201   pub fn send_community_room_message<Response>(
202     &self,
203     op: &UserOperation,
204     response: &Response,
205     community_id: CommunityId,
206     websocket_id: Option<ConnectionId>,
207   ) -> Result<(), LemmyError>
208   where
209     Response: Serialize,
210   {
211     let res_str = &to_json_string(op, response)?;
212     if let Some(sessions) = self.community_rooms.get(&community_id) {
213       for id in sessions {
214         if let Some(my_id) = websocket_id {
215           if *id == my_id {
216             continue;
217           }
218         }
219         self.sendit(res_str, *id);
220       }
221     }
222     Ok(())
223   }
224
225   pub fn send_all_message<Response>(
226     &self,
227     op: &UserOperation,
228     response: &Response,
229     websocket_id: Option<ConnectionId>,
230   ) -> Result<(), LemmyError>
231   where
232     Response: Serialize,
233   {
234     let res_str = &to_json_string(op, response)?;
235     for id in self.sessions.keys() {
236       if let Some(my_id) = websocket_id {
237         if *id == my_id {
238           continue;
239         }
240       }
241       self.sendit(res_str, *id);
242     }
243     Ok(())
244   }
245
246   pub fn send_user_room_message<Response>(
247     &self,
248     op: &UserOperation,
249     response: &Response,
250     recipient_id: UserId,
251     websocket_id: Option<ConnectionId>,
252   ) -> Result<(), LemmyError>
253   where
254     Response: Serialize,
255   {
256     let res_str = &to_json_string(op, response)?;
257     if let Some(sessions) = self.user_rooms.get(&recipient_id) {
258       for id in sessions {
259         if let Some(my_id) = websocket_id {
260           if *id == my_id {
261             continue;
262           }
263         }
264         self.sendit(res_str, *id);
265       }
266     }
267     Ok(())
268   }
269
270   pub fn send_comment(
271     &self,
272     user_operation: &UserOperation,
273     comment: &CommentResponse,
274     websocket_id: Option<ConnectionId>,
275   ) -> Result<(), LemmyError> {
276     let mut comment_reply_sent = comment.clone();
277     comment_reply_sent.comment.my_vote = None;
278     comment_reply_sent.comment.user_id = None;
279
280     let mut comment_post_sent = comment_reply_sent.clone();
281     comment_post_sent.recipient_ids = Vec::new();
282
283     // Send it to the post room
284     self.send_post_room_message(
285       user_operation,
286       &comment_post_sent,
287       comment_post_sent.comment.post_id,
288       websocket_id,
289     )?;
290
291     // Send it to the recipient(s) including the mentioned users
292     for recipient_id in &comment_reply_sent.recipient_ids {
293       self.send_user_room_message(
294         user_operation,
295         &comment_reply_sent,
296         *recipient_id,
297         websocket_id,
298       )?;
299     }
300
301     // Send it to the community too
302     self.send_community_room_message(user_operation, &comment_post_sent, 0, websocket_id)?;
303     self.send_community_room_message(
304       user_operation,
305       &comment_post_sent,
306       comment.comment.community_id,
307       websocket_id,
308     )?;
309
310     Ok(())
311   }
312
313   pub fn send_post(
314     &self,
315     user_operation: &UserOperation,
316     post: &PostResponse,
317     websocket_id: Option<ConnectionId>,
318   ) -> Result<(), LemmyError> {
319     let community_id = post.post.community_id;
320
321     // Don't send my data with it
322     let mut post_sent = post.clone();
323     post_sent.post.my_vote = None;
324     post_sent.post.user_id = None;
325
326     // Send it to /c/all and that community
327     self.send_community_room_message(user_operation, &post_sent, 0, websocket_id)?;
328     self.send_community_room_message(user_operation, &post_sent, community_id, websocket_id)?;
329
330     // Send it to the post room
331     self.send_post_room_message(user_operation, &post_sent, post.post.id, websocket_id)?;
332
333     Ok(())
334   }
335
336   fn sendit(&self, message: &str, id: ConnectionId) {
337     if let Some(info) = self.sessions.get(&id) {
338       let _ = info.addr.do_send(WSMessage(message.to_owned()));
339     }
340   }
341
342   pub(super) fn parse_json_message(
343     &mut self,
344     msg: StandardMessage,
345     ctx: &mut Context<Self>,
346   ) -> impl Future<Output = Result<String, LemmyError>> {
347     let addr = ctx.address();
348     let pool = self.pool.clone();
349     let rate_limiter = self.rate_limiter.clone();
350
351     let ip: IPAddr = match self.sessions.get(&msg.id) {
352       Some(info) => info.ip.to_owned(),
353       None => "blank_ip".to_string(),
354     };
355
356     let client = self.client.clone();
357     let activity_queue = self.activity_queue.clone();
358     async move {
359       let msg = msg;
360       let json: Value = serde_json::from_str(&msg.msg)?;
361       let data = &json["data"].to_string();
362       let op = &json["op"].as_str().ok_or(APIError {
363         message: "Unknown op type".to_string(),
364       })?;
365
366       let user_operation: UserOperation = UserOperation::from_str(&op)?;
367
368       let context = LemmyContext {
369         pool,
370         chat_server: addr,
371         client,
372         activity_queue,
373       };
374       let args = Args {
375         context,
376         rate_limiter,
377         id: msg.id,
378         ip,
379         op: user_operation.clone(),
380         data,
381       };
382
383       match user_operation {
384         // User ops
385         UserOperation::Login => do_user_operation::<Login>(args).await,
386         UserOperation::Register => do_user_operation::<Register>(args).await,
387         UserOperation::GetCaptcha => do_user_operation::<GetCaptcha>(args).await,
388         UserOperation::GetUserDetails => do_user_operation::<GetUserDetails>(args).await,
389         UserOperation::GetReplies => do_user_operation::<GetReplies>(args).await,
390         UserOperation::AddAdmin => do_user_operation::<AddAdmin>(args).await,
391         UserOperation::BanUser => do_user_operation::<BanUser>(args).await,
392         UserOperation::GetUserMentions => do_user_operation::<GetUserMentions>(args).await,
393         UserOperation::MarkUserMentionAsRead => {
394           do_user_operation::<MarkUserMentionAsRead>(args).await
395         }
396         UserOperation::MarkAllAsRead => do_user_operation::<MarkAllAsRead>(args).await,
397         UserOperation::DeleteAccount => do_user_operation::<DeleteAccount>(args).await,
398         UserOperation::PasswordReset => do_user_operation::<PasswordReset>(args).await,
399         UserOperation::PasswordChange => do_user_operation::<PasswordChange>(args).await,
400         UserOperation::UserJoin => do_user_operation::<UserJoin>(args).await,
401         UserOperation::PostJoin => do_user_operation::<PostJoin>(args).await,
402         UserOperation::CommunityJoin => do_user_operation::<CommunityJoin>(args).await,
403         UserOperation::SaveUserSettings => do_user_operation::<SaveUserSettings>(args).await,
404
405         // Private Message ops
406         UserOperation::CreatePrivateMessage => {
407           do_user_operation::<CreatePrivateMessage>(args).await
408         }
409         UserOperation::EditPrivateMessage => do_user_operation::<EditPrivateMessage>(args).await,
410         UserOperation::DeletePrivateMessage => {
411           do_user_operation::<DeletePrivateMessage>(args).await
412         }
413         UserOperation::MarkPrivateMessageAsRead => {
414           do_user_operation::<MarkPrivateMessageAsRead>(args).await
415         }
416         UserOperation::GetPrivateMessages => do_user_operation::<GetPrivateMessages>(args).await,
417
418         // Site ops
419         UserOperation::GetModlog => do_user_operation::<GetModlog>(args).await,
420         UserOperation::CreateSite => do_user_operation::<CreateSite>(args).await,
421         UserOperation::EditSite => do_user_operation::<EditSite>(args).await,
422         UserOperation::GetSite => do_user_operation::<GetSite>(args).await,
423         UserOperation::GetSiteConfig => do_user_operation::<GetSiteConfig>(args).await,
424         UserOperation::SaveSiteConfig => do_user_operation::<SaveSiteConfig>(args).await,
425         UserOperation::Search => do_user_operation::<Search>(args).await,
426         UserOperation::TransferCommunity => do_user_operation::<TransferCommunity>(args).await,
427         UserOperation::TransferSite => do_user_operation::<TransferSite>(args).await,
428         UserOperation::ListCategories => do_user_operation::<ListCategories>(args).await,
429
430         // Community ops
431         UserOperation::GetCommunity => do_user_operation::<GetCommunity>(args).await,
432         UserOperation::ListCommunities => do_user_operation::<ListCommunities>(args).await,
433         UserOperation::CreateCommunity => do_user_operation::<CreateCommunity>(args).await,
434         UserOperation::EditCommunity => do_user_operation::<EditCommunity>(args).await,
435         UserOperation::DeleteCommunity => do_user_operation::<DeleteCommunity>(args).await,
436         UserOperation::RemoveCommunity => do_user_operation::<RemoveCommunity>(args).await,
437         UserOperation::FollowCommunity => do_user_operation::<FollowCommunity>(args).await,
438         UserOperation::GetFollowedCommunities => {
439           do_user_operation::<GetFollowedCommunities>(args).await
440         }
441         UserOperation::BanFromCommunity => do_user_operation::<BanFromCommunity>(args).await,
442         UserOperation::AddModToCommunity => do_user_operation::<AddModToCommunity>(args).await,
443
444         // Post ops
445         UserOperation::CreatePost => do_user_operation::<CreatePost>(args).await,
446         UserOperation::GetPost => do_user_operation::<GetPost>(args).await,
447         UserOperation::GetPosts => do_user_operation::<GetPosts>(args).await,
448         UserOperation::EditPost => do_user_operation::<EditPost>(args).await,
449         UserOperation::DeletePost => do_user_operation::<DeletePost>(args).await,
450         UserOperation::RemovePost => do_user_operation::<RemovePost>(args).await,
451         UserOperation::LockPost => do_user_operation::<LockPost>(args).await,
452         UserOperation::StickyPost => do_user_operation::<StickyPost>(args).await,
453         UserOperation::CreatePostLike => do_user_operation::<CreatePostLike>(args).await,
454         UserOperation::SavePost => do_user_operation::<SavePost>(args).await,
455
456         // Comment ops
457         UserOperation::CreateComment => do_user_operation::<CreateComment>(args).await,
458         UserOperation::EditComment => do_user_operation::<EditComment>(args).await,
459         UserOperation::DeleteComment => do_user_operation::<DeleteComment>(args).await,
460         UserOperation::RemoveComment => do_user_operation::<RemoveComment>(args).await,
461         UserOperation::MarkCommentAsRead => do_user_operation::<MarkCommentAsRead>(args).await,
462         UserOperation::SaveComment => do_user_operation::<SaveComment>(args).await,
463         UserOperation::GetComments => do_user_operation::<GetComments>(args).await,
464         UserOperation::CreateCommentLike => do_user_operation::<CreateCommentLike>(args).await,
465       }
466     }
467   }
468 }