]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'master' into federation
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15
16 use crate::api::comment::*;
17 use crate::api::community::*;
18 use crate::api::post::*;
19 use crate::api::site::*;
20 use crate::api::user::*;
21 use crate::api::*;
22 use crate::apub::puller::*;
23 use crate::Settings;
24
25 /// Chat server sends this messages to session
26 #[derive(Message)]
27 #[rtype(result = "()")]
28 pub struct WSMessage(pub String);
29
30 /// Message for chat server communications
31
32 /// New chat session is created
33 #[derive(Message)]
34 #[rtype(usize)]
35 pub struct Connect {
36   pub addr: Recipient<WSMessage>,
37   pub ip: String,
38 }
39
40 /// Session is disconnected
41 #[derive(Message)]
42 #[rtype(result = "()")]
43 pub struct Disconnect {
44   pub id: usize,
45   pub ip: String,
46 }
47
48 // TODO this is unused rn
49 /// Send message to specific room
50 #[derive(Message)]
51 #[rtype(result = "()")]
52 pub struct ClientMessage {
53   /// Id of the client session
54   pub id: usize,
55   /// Peer message
56   pub msg: String,
57   /// Room name
58   pub room: String,
59 }
60
61 #[derive(Serialize, Deserialize, Message)]
62 #[rtype(String)]
63 pub struct StandardMessage {
64   /// Id of the client session
65   pub id: usize,
66   /// Peer message
67   pub msg: String,
68 }
69
70 #[derive(Debug)]
71 pub struct RateLimitBucket {
72   last_checked: SystemTime,
73   allowance: f64,
74 }
75
76 pub struct SessionInfo {
77   pub addr: Recipient<WSMessage>,
78   pub ip: String,
79 }
80
81 /// `ChatServer` manages chat rooms and responsible for coordinating chat
82 /// session. implementation is super primitive
83 pub struct ChatServer {
84   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
85   rate_limits: HashMap<String, RateLimitBucket>,
86   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
87   rng: ThreadRng,
88   db: Pool<ConnectionManager<PgConnection>>,
89 }
90
91 // impl Default for ChatServer {
92 //   fn default(nah: String) -> ChatServer {
93 //     // default room
94 //     let rooms = HashMap::new();
95
96 //     ChatServer {
97 //       sessions: HashMap::new(),
98 //       rate_limits: HashMap::new(),
99 //       rooms,
100 //       rng: rand::thread_rng(),
101 //       nah: nah,
102 //     }
103 //   }
104 // }
105
106 impl ChatServer {
107   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
108     // default room
109     let rooms = HashMap::new();
110
111     ChatServer {
112       sessions: HashMap::new(),
113       rate_limits: HashMap::new(),
114       rooms,
115       rng: rand::thread_rng(),
116       db,
117     }
118   }
119
120   /// Send message to all users in the room
121   fn send_room_message(&self, room: i32, message: &str, skip_id: usize) {
122     if let Some(sessions) = self.rooms.get(&room) {
123       for id in sessions {
124         if *id != skip_id {
125           if let Some(info) = self.sessions.get(id) {
126             let _ = info.addr.do_send(WSMessage(message.to_owned()));
127           }
128         }
129       }
130     }
131   }
132
133   fn join_room(&mut self, room_id: i32, id: usize) {
134     // remove session from all rooms
135     for sessions in self.rooms.values_mut() {
136       sessions.remove(&id);
137     }
138
139     // If the room doesn't exist yet
140     if self.rooms.get_mut(&room_id).is_none() {
141       self.rooms.insert(room_id, HashSet::new());
142     }
143
144     self.rooms.get_mut(&room_id).unwrap().insert(id);
145   }
146
147   fn send_community_message(
148     &self,
149     community_id: i32,
150     message: &str,
151     skip_id: usize,
152   ) -> Result<(), Error> {
153     use crate::db::post_view::*;
154     use crate::db::*;
155
156     let conn = self.db.get()?;
157
158     let posts = PostQueryBuilder::create(&conn)
159       .listing_type(ListingType::Community)
160       .sort(&SortType::New)
161       .for_community_id(community_id)
162       .limit(9999)
163       .list()?;
164
165     for post in posts {
166       self.send_room_message(post.id, message, skip_id);
167     }
168
169     Ok(())
170   }
171
172   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
173     self.check_rate_limit_full(
174       id,
175       Settings::get().rate_limit.register,
176       Settings::get().rate_limit.register_per_second,
177     )
178   }
179
180   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
181     self.check_rate_limit_full(
182       id,
183       Settings::get().rate_limit.post,
184       Settings::get().rate_limit.post_per_second,
185     )
186   }
187
188   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
189     self.check_rate_limit_full(
190       id,
191       Settings::get().rate_limit.message,
192       Settings::get().rate_limit.message_per_second,
193     )
194   }
195
196   #[allow(clippy::float_cmp)]
197   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
198     if let Some(info) = self.sessions.get(&id) {
199       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
200         // The initial value
201         if rate_limit.allowance == -2f64 {
202           rate_limit.allowance = rate as f64;
203         };
204
205         let current = SystemTime::now();
206         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
207         rate_limit.last_checked = current;
208         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
209         if rate_limit.allowance > rate as f64 {
210           rate_limit.allowance = rate as f64;
211         }
212
213         if rate_limit.allowance < 1.0 {
214           println!(
215             "Rate limited IP: {}, time_passed: {}, allowance: {}",
216             &info.ip, time_passed, rate_limit.allowance
217           );
218           Err(
219             APIError {
220               op: "Rate Limit".to_string(),
221               message: format!("Too many requests. {} per {} seconds", rate, per),
222             }
223             .into(),
224           )
225         } else {
226           rate_limit.allowance -= 1.0;
227           Ok(())
228         }
229       } else {
230         Ok(())
231       }
232     } else {
233       Ok(())
234     }
235   }
236 }
237
238 /// Make actor from `ChatServer`
239 impl Actor for ChatServer {
240   /// We are going to use simple Context, we just need ability to communicate
241   /// with other actors.
242   type Context = Context<Self>;
243 }
244
245 /// Handler for Connect message.
246 ///
247 /// Register new session and assign unique id to this session
248 impl Handler<Connect> for ChatServer {
249   type Result = usize;
250
251   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
252     // notify all users in same room
253     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
254
255     // register session with random id
256     let id = self.rng.gen::<usize>();
257     println!("{} joined", &msg.ip);
258
259     self.sessions.insert(
260       id,
261       SessionInfo {
262         addr: msg.addr,
263         ip: msg.ip.to_owned(),
264       },
265     );
266
267     if self.rate_limits.get(&msg.ip).is_none() {
268       self.rate_limits.insert(
269         msg.ip,
270         RateLimitBucket {
271           last_checked: SystemTime::now(),
272           allowance: -2f64,
273         },
274       );
275     }
276
277     id
278   }
279 }
280
281 /// Handler for Disconnect message.
282 impl Handler<Disconnect> for ChatServer {
283   type Result = ();
284
285   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
286     // let mut rooms: Vec<i32> = Vec::new();
287
288     // remove address
289     if self.sessions.remove(&msg.id).is_some() {
290       // remove session from all rooms
291       for sessions in self.rooms.values_mut() {
292         if sessions.remove(&msg.id) {
293           // rooms.push(*id);
294         }
295       }
296     }
297   }
298 }
299
300 /// Handler for Message message.
301 impl Handler<StandardMessage> for ChatServer {
302   type Result = MessageResult<StandardMessage>;
303
304   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
305     let msg_out = match parse_json_message(self, msg) {
306       Ok(m) => m,
307       Err(e) => e.to_string(),
308     };
309
310     MessageResult(msg_out)
311   }
312 }
313
314 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
315   let json: Value = serde_json::from_str(&msg.msg)?;
316   let data = &json["data"].to_string();
317   let op = &json["op"].as_str().ok_or(APIError {
318     op: "Unknown op type".to_string(),
319     message: "Unknown op type".to_string(),
320   })?;
321
322   let conn = chat.db.get()?;
323
324   let user_operation: UserOperation = UserOperation::from_str(&op)?;
325
326   match user_operation {
327     UserOperation::Login => {
328       let login: Login = serde_json::from_str(data)?;
329       let res = Oper::new(user_operation, login).perform(&conn)?;
330       Ok(serde_json::to_string(&res)?)
331     }
332     UserOperation::Register => {
333       let register: Register = serde_json::from_str(data)?;
334       let res = Oper::new(user_operation, register).perform(&conn);
335       if res.is_ok() {
336         chat.check_rate_limit_register(msg.id)?;
337       }
338       Ok(serde_json::to_string(&res?)?)
339     }
340     UserOperation::GetUserDetails => {
341       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
342       let res = Oper::new(user_operation, get_user_details).perform(&conn)?;
343       Ok(serde_json::to_string(&res)?)
344     }
345     UserOperation::SaveUserSettings => {
346       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
347       let res = Oper::new(user_operation, save_user_settings).perform(&conn)?;
348       Ok(serde_json::to_string(&res)?)
349     }
350     UserOperation::AddAdmin => {
351       let add_admin: AddAdmin = serde_json::from_str(data)?;
352       let res = Oper::new(user_operation, add_admin).perform(&conn)?;
353       Ok(serde_json::to_string(&res)?)
354     }
355     UserOperation::BanUser => {
356       let ban_user: BanUser = serde_json::from_str(data)?;
357       let res = Oper::new(user_operation, ban_user).perform(&conn)?;
358       Ok(serde_json::to_string(&res)?)
359     }
360     UserOperation::GetReplies => {
361       let get_replies: GetReplies = serde_json::from_str(data)?;
362       let res = Oper::new(user_operation, get_replies).perform(&conn)?;
363       Ok(serde_json::to_string(&res)?)
364     }
365     UserOperation::GetUserMentions => {
366       let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
367       let res = Oper::new(user_operation, get_user_mentions).perform(&conn)?;
368       Ok(serde_json::to_string(&res)?)
369     }
370     UserOperation::EditUserMention => {
371       let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
372       let res = Oper::new(user_operation, edit_user_mention).perform(&conn)?;
373       Ok(serde_json::to_string(&res)?)
374     }
375     UserOperation::MarkAllAsRead => {
376       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
377       let res = Oper::new(user_operation, mark_all_as_read).perform(&conn)?;
378       Ok(serde_json::to_string(&res)?)
379     }
380     UserOperation::GetCommunity => {
381       let mut get_community: GetCommunity = serde_json::from_str(data)?;
382       if Settings::get().federation_enabled && get_community.name.is_some() {
383         let name = &get_community.name.unwrap();
384         let remote_community = if name.contains('@') {
385           // TODO: need to support sort, filter etc for remote communities
386           get_remote_community(name.to_owned())?
387         } else {
388           get_community.name = Some(name.replace("!", ""));
389           Oper::new(user_operation, get_community).perform(&conn)?
390         };
391         Ok(serde_json::to_string(&remote_community)?)
392       } else {
393         let res = Oper::new(user_operation, get_community).perform(&conn)?;
394         Ok(serde_json::to_string(&res)?)
395       }
396     }
397     UserOperation::ListCommunities => {
398       if Settings::get().federation_enabled {
399         let res = get_all_communities()?;
400         let val = ListCommunitiesResponse {
401           op: UserOperation::ListCommunities.to_string(),
402           communities: res,
403         };
404         Ok(serde_json::to_string(&val)?)
405       } else {
406         let list_communities: ListCommunities = serde_json::from_str(data)?;
407         let res = Oper::new(user_operation, list_communities).perform(&conn)?;
408         Ok(serde_json::to_string(&res)?)
409       }
410     }
411     UserOperation::CreateCommunity => {
412       chat.check_rate_limit_register(msg.id)?;
413       let create_community: CreateCommunity = serde_json::from_str(data)?;
414       let res = Oper::new(user_operation, create_community).perform(&conn)?;
415       Ok(serde_json::to_string(&res)?)
416     }
417     UserOperation::EditCommunity => {
418       let edit_community: EditCommunity = serde_json::from_str(data)?;
419       let res = Oper::new(user_operation, edit_community).perform(&conn)?;
420       let mut community_sent: CommunityResponse = res.clone();
421       community_sent.community.user_id = None;
422       community_sent.community.subscribed = None;
423       let community_sent_str = serde_json::to_string(&community_sent)?;
424       chat.send_community_message(community_sent.community.id, &community_sent_str, msg.id)?;
425       Ok(serde_json::to_string(&res)?)
426     }
427     UserOperation::FollowCommunity => {
428       let follow_community: FollowCommunity = serde_json::from_str(data)?;
429       let res = Oper::new(user_operation, follow_community).perform(&conn)?;
430       Ok(serde_json::to_string(&res)?)
431     }
432     UserOperation::GetFollowedCommunities => {
433       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
434       let res = Oper::new(user_operation, followed_communities).perform(&conn)?;
435       Ok(serde_json::to_string(&res)?)
436     }
437     UserOperation::BanFromCommunity => {
438       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
439       let community_id = ban_from_community.community_id;
440       let res = Oper::new(user_operation, ban_from_community).perform(&conn)?;
441       let res_str = serde_json::to_string(&res)?;
442       chat.send_community_message(community_id, &res_str, msg.id)?;
443       Ok(res_str)
444     }
445     UserOperation::AddModToCommunity => {
446       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
447       let community_id = mod_add_to_community.community_id;
448       let res = Oper::new(user_operation, mod_add_to_community).perform(&conn)?;
449       let res_str = serde_json::to_string(&res)?;
450       chat.send_community_message(community_id, &res_str, msg.id)?;
451       Ok(res_str)
452     }
453     UserOperation::ListCategories => {
454       let list_categories: ListCategories = ListCategories;
455       let res = Oper::new(user_operation, list_categories).perform(&conn)?;
456       Ok(serde_json::to_string(&res)?)
457     }
458     UserOperation::CreatePost => {
459       chat.check_rate_limit_post(msg.id)?;
460       let create_post: CreatePost = serde_json::from_str(data)?;
461       let res = Oper::new(user_operation, create_post).perform(&conn)?;
462       Ok(serde_json::to_string(&res)?)
463     }
464     UserOperation::GetPost => {
465       let get_post: GetPost = serde_json::from_str(data)?;
466       chat.join_room(get_post.id, msg.id);
467       let res = Oper::new(user_operation, get_post).perform(&conn)?;
468       Ok(serde_json::to_string(&res)?)
469     }
470     UserOperation::GetPosts => {
471       let get_posts: GetPosts = serde_json::from_str(data)?;
472       let res = Oper::new(user_operation, get_posts).perform(&conn)?;
473       Ok(serde_json::to_string(&res)?)
474     }
475     UserOperation::CreatePostLike => {
476       chat.check_rate_limit_message(msg.id)?;
477       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
478       let res = Oper::new(user_operation, create_post_like).perform(&conn)?;
479       Ok(serde_json::to_string(&res)?)
480     }
481     UserOperation::EditPost => {
482       let edit_post: EditPost = serde_json::from_str(data)?;
483       let res = Oper::new(user_operation, edit_post).perform(&conn)?;
484       let mut post_sent = res.clone();
485       post_sent.post.my_vote = None;
486       let post_sent_str = serde_json::to_string(&post_sent)?;
487       chat.send_room_message(post_sent.post.id, &post_sent_str, msg.id);
488       Ok(serde_json::to_string(&res)?)
489     }
490     UserOperation::SavePost => {
491       let save_post: SavePost = serde_json::from_str(data)?;
492       let res = Oper::new(user_operation, save_post).perform(&conn)?;
493       Ok(serde_json::to_string(&res)?)
494     }
495     UserOperation::CreateComment => {
496       chat.check_rate_limit_message(msg.id)?;
497       let create_comment: CreateComment = serde_json::from_str(data)?;
498       let post_id = create_comment.post_id;
499       let res = Oper::new(user_operation, create_comment).perform(&conn)?;
500       let mut comment_sent = res.clone();
501       comment_sent.comment.my_vote = None;
502       comment_sent.comment.user_id = None;
503       let comment_sent_str = serde_json::to_string(&comment_sent)?;
504       chat.send_room_message(post_id, &comment_sent_str, msg.id);
505       Ok(serde_json::to_string(&res)?)
506     }
507     UserOperation::EditComment => {
508       let edit_comment: EditComment = serde_json::from_str(data)?;
509       let post_id = edit_comment.post_id;
510       let res = Oper::new(user_operation, edit_comment).perform(&conn)?;
511       let mut comment_sent = res.clone();
512       comment_sent.comment.my_vote = None;
513       comment_sent.comment.user_id = None;
514       let comment_sent_str = serde_json::to_string(&comment_sent)?;
515       chat.send_room_message(post_id, &comment_sent_str, msg.id);
516       Ok(serde_json::to_string(&res)?)
517     }
518     UserOperation::SaveComment => {
519       let save_comment: SaveComment = serde_json::from_str(data)?;
520       let res = Oper::new(user_operation, save_comment).perform(&conn)?;
521       Ok(serde_json::to_string(&res)?)
522     }
523     UserOperation::CreateCommentLike => {
524       chat.check_rate_limit_message(msg.id)?;
525       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
526       let post_id = create_comment_like.post_id;
527       let res = Oper::new(user_operation, create_comment_like).perform(&conn)?;
528       let mut comment_sent = res.clone();
529       comment_sent.comment.my_vote = None;
530       comment_sent.comment.user_id = None;
531       let comment_sent_str = serde_json::to_string(&comment_sent)?;
532       chat.send_room_message(post_id, &comment_sent_str, msg.id);
533       Ok(serde_json::to_string(&res)?)
534     }
535     UserOperation::GetModlog => {
536       let get_modlog: GetModlog = serde_json::from_str(data)?;
537       let res = Oper::new(user_operation, get_modlog).perform(&conn)?;
538       Ok(serde_json::to_string(&res)?)
539     }
540     UserOperation::CreateSite => {
541       let create_site: CreateSite = serde_json::from_str(data)?;
542       let res = Oper::new(user_operation, create_site).perform(&conn)?;
543       Ok(serde_json::to_string(&res)?)
544     }
545     UserOperation::EditSite => {
546       let edit_site: EditSite = serde_json::from_str(data)?;
547       let res = Oper::new(user_operation, edit_site).perform(&conn)?;
548       Ok(serde_json::to_string(&res)?)
549     }
550     UserOperation::GetSite => {
551       let online: usize = chat.sessions.len();
552       let get_site: GetSite = serde_json::from_str(data)?;
553       let mut res = Oper::new(user_operation, get_site).perform(&conn)?;
554       res.online = online;
555       Ok(serde_json::to_string(&res)?)
556     }
557     UserOperation::Search => {
558       let search: Search = serde_json::from_str(data)?;
559       let res = Oper::new(user_operation, search).perform(&conn)?;
560       Ok(serde_json::to_string(&res)?)
561     }
562     UserOperation::TransferCommunity => {
563       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
564       let res = Oper::new(user_operation, transfer_community).perform(&conn)?;
565       Ok(serde_json::to_string(&res)?)
566     }
567     UserOperation::TransferSite => {
568       let transfer_site: TransferSite = serde_json::from_str(data)?;
569       let res = Oper::new(user_operation, transfer_site).perform(&conn)?;
570       Ok(serde_json::to_string(&res)?)
571     }
572     UserOperation::DeleteAccount => {
573       let delete_account: DeleteAccount = serde_json::from_str(data)?;
574       let res = Oper::new(user_operation, delete_account).perform(&conn)?;
575       Ok(serde_json::to_string(&res)?)
576     }
577     UserOperation::PasswordReset => {
578       let password_reset: PasswordReset = serde_json::from_str(data)?;
579       let res = Oper::new(user_operation, password_reset).perform(&conn)?;
580       Ok(serde_json::to_string(&res)?)
581     }
582     UserOperation::PasswordChange => {
583       let password_change: PasswordChange = serde_json::from_str(data)?;
584       let res = Oper::new(user_operation, password_change).perform(&conn)?;
585       Ok(serde_json::to_string(&res)?)
586     }
587   }
588 }