]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'teromene-config_dif_addr' into config_dif_addr_merge
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use failure::Error;
7 use rand::{rngs::ThreadRng, Rng};
8 use serde::{Deserialize, Serialize};
9 use serde_json::Value;
10 use std::collections::{HashMap, HashSet};
11 use std::str::FromStr;
12 use std::time::SystemTime;
13
14 use crate::api::comment::*;
15 use crate::api::community::*;
16 use crate::api::post::*;
17 use crate::api::site::*;
18 use crate::api::user::*;
19 use crate::api::*;
20 use crate::Settings;
21
22 /// Chat server sends this messages to session
23 #[derive(Message)]
24 pub struct WSMessage(pub String);
25
26 /// Message for chat server communications
27
28 /// New chat session is created
29 #[derive(Message)]
30 #[rtype(usize)]
31 pub struct Connect {
32   pub addr: Recipient<WSMessage>,
33   pub ip: String,
34 }
35
36 /// Session is disconnected
37 #[derive(Message)]
38 pub struct Disconnect {
39   pub id: usize,
40   pub ip: String,
41 }
42
43 /// Send message to specific room
44 #[derive(Message)]
45 pub struct ClientMessage {
46   /// Id of the client session
47   pub id: usize,
48   /// Peer message
49   pub msg: String,
50   /// Room name
51   pub room: String,
52 }
53
54 #[derive(Serialize, Deserialize)]
55 pub struct StandardMessage {
56   /// Id of the client session
57   pub id: usize,
58   /// Peer message
59   pub msg: String,
60 }
61
62 impl actix::Message for StandardMessage {
63   type Result = String;
64 }
65
66 #[derive(Debug)]
67 pub struct RateLimitBucket {
68   last_checked: SystemTime,
69   allowance: f64,
70 }
71
72 pub struct SessionInfo {
73   pub addr: Recipient<WSMessage>,
74   pub ip: String,
75 }
76
77 /// `ChatServer` manages chat rooms and responsible for coordinating chat
78 /// session. implementation is super primitive
79 pub struct ChatServer {
80   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
81   rate_limits: HashMap<String, RateLimitBucket>,
82   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
83   rng: ThreadRng,
84 }
85
86 impl Default for ChatServer {
87   fn default() -> ChatServer {
88     // default room
89     let rooms = HashMap::new();
90
91     ChatServer {
92       sessions: HashMap::new(),
93       rate_limits: HashMap::new(),
94       rooms: rooms,
95       rng: rand::thread_rng(),
96     }
97   }
98 }
99
100 impl ChatServer {
101   /// Send message to all users in the room
102   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
103     if let Some(sessions) = self.rooms.get(room) {
104       for id in sessions {
105         if *id != skip_id {
106           if let Some(info) = self.sessions.get(id) {
107             let _ = info.addr.do_send(WSMessage(message.to_owned()));
108           }
109         }
110       }
111     }
112   }
113
114   fn join_room(&mut self, room_id: i32, id: usize) {
115     // remove session from all rooms
116     for (_n, sessions) in &mut self.rooms {
117       sessions.remove(&id);
118     }
119
120     // If the room doesn't exist yet
121     if self.rooms.get_mut(&room_id).is_none() {
122       self.rooms.insert(room_id, HashSet::new());
123     }
124
125     &self.rooms.get_mut(&room_id).unwrap().insert(id);
126   }
127
128   fn send_community_message(
129     &self,
130     community_id: &i32,
131     message: &str,
132     skip_id: usize,
133   ) -> Result<(), Error> {
134     use crate::db::post_view::*;
135     use crate::db::*;
136     let conn = establish_connection();
137
138     let posts = PostQueryBuilder::create(&conn)
139       .listing_type(ListingType::Community)
140       .sort(&SortType::New)
141       .for_community_id(*community_id)
142       .limit(9999)
143       .list()?;
144
145     for post in posts {
146       self.send_room_message(&post.id, message, skip_id);
147     }
148
149     Ok(())
150   }
151
152   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
153     self.check_rate_limit_full(
154       id,
155       Settings::get().rate_limit_register,
156       Settings::get().rate_limit_register_per_second,
157     )
158   }
159
160   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
161     self.check_rate_limit_full(
162       id,
163       Settings::get().rate_limit_post,
164       Settings::get().rate_limit_post_per_second,
165     )
166   }
167
168   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
169     self.check_rate_limit_full(
170       id,
171       Settings::get().rate_limit_message,
172       Settings::get().rate_limit_message_per_second,
173     )
174   }
175
176   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
177     if let Some(info) = self.sessions.get(&id) {
178       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
179         // The initial value
180         if rate_limit.allowance == -2f64 {
181           rate_limit.allowance = rate as f64;
182         };
183
184         let current = SystemTime::now();
185         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
186         rate_limit.last_checked = current;
187         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
188         if rate_limit.allowance > rate as f64 {
189           rate_limit.allowance = rate as f64;
190         }
191
192         if rate_limit.allowance < 1.0 {
193           println!(
194             "Rate limited IP: {}, time_passed: {}, allowance: {}",
195             &info.ip, time_passed, rate_limit.allowance
196           );
197           Err(APIError {
198             op: "Rate Limit".to_string(),
199             message: format!("Too many requests. {} per {} seconds", rate, per),
200           })?
201         } else {
202           rate_limit.allowance -= 1.0;
203           Ok(())
204         }
205       } else {
206         Ok(())
207       }
208     } else {
209       Ok(())
210     }
211   }
212 }
213
214 /// Make actor from `ChatServer`
215 impl Actor for ChatServer {
216   /// We are going to use simple Context, we just need ability to communicate
217   /// with other actors.
218   type Context = Context<Self>;
219 }
220
221 /// Handler for Connect message.
222 ///
223 /// Register new session and assign unique id to this session
224 impl Handler<Connect> for ChatServer {
225   type Result = usize;
226
227   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
228     // notify all users in same room
229     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
230
231     // register session with random id
232     let id = self.rng.gen::<usize>();
233     println!("{} joined", &msg.ip);
234
235     self.sessions.insert(
236       id,
237       SessionInfo {
238         addr: msg.addr,
239         ip: msg.ip.to_owned(),
240       },
241     );
242
243     if self.rate_limits.get(&msg.ip).is_none() {
244       self.rate_limits.insert(
245         msg.ip,
246         RateLimitBucket {
247           last_checked: SystemTime::now(),
248           allowance: -2f64,
249         },
250       );
251     }
252
253     id
254   }
255 }
256
257 /// Handler for Disconnect message.
258 impl Handler<Disconnect> for ChatServer {
259   type Result = ();
260
261   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
262     // let mut rooms: Vec<i32> = Vec::new();
263
264     // remove address
265     if self.sessions.remove(&msg.id).is_some() {
266       // remove session from all rooms
267       for (_id, sessions) in &mut self.rooms {
268         if sessions.remove(&msg.id) {
269           // rooms.push(*id);
270         }
271       }
272     }
273   }
274 }
275
276 /// Handler for Message message.
277 impl Handler<StandardMessage> for ChatServer {
278   type Result = MessageResult<StandardMessage>;
279
280   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
281     let msg_out = match parse_json_message(self, msg) {
282       Ok(m) => m,
283       Err(e) => e.to_string(),
284     };
285
286     MessageResult(msg_out)
287   }
288 }
289
290 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
291   let json: Value = serde_json::from_str(&msg.msg)?;
292   let data = &json["data"].to_string();
293   let op = &json["op"].as_str().ok_or(APIError {
294     op: "Unknown op type".to_string(),
295     message: format!("Unknown op type"),
296   })?;
297
298   let user_operation: UserOperation = UserOperation::from_str(&op)?;
299
300   match user_operation {
301     UserOperation::Login => {
302       let login: Login = serde_json::from_str(data)?;
303       let res = Oper::new(user_operation, login).perform()?;
304       Ok(serde_json::to_string(&res)?)
305     }
306     UserOperation::Register => {
307       let register: Register = serde_json::from_str(data)?;
308       let res = Oper::new(user_operation, register).perform();
309       if res.is_ok() {
310         chat.check_rate_limit_register(msg.id)?;
311       }
312       Ok(serde_json::to_string(&res?)?)
313     }
314     UserOperation::GetUserDetails => {
315       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
316       let res = Oper::new(user_operation, get_user_details).perform()?;
317       Ok(serde_json::to_string(&res)?)
318     }
319     UserOperation::SaveUserSettings => {
320       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
321       let res = Oper::new(user_operation, save_user_settings).perform()?;
322       Ok(serde_json::to_string(&res)?)
323     }
324     UserOperation::AddAdmin => {
325       let add_admin: AddAdmin = serde_json::from_str(data)?;
326       let res = Oper::new(user_operation, add_admin).perform()?;
327       Ok(serde_json::to_string(&res)?)
328     }
329     UserOperation::BanUser => {
330       let ban_user: BanUser = serde_json::from_str(data)?;
331       let res = Oper::new(user_operation, ban_user).perform()?;
332       Ok(serde_json::to_string(&res)?)
333     }
334     UserOperation::GetReplies => {
335       let get_replies: GetReplies = serde_json::from_str(data)?;
336       let res = Oper::new(user_operation, get_replies).perform()?;
337       Ok(serde_json::to_string(&res)?)
338     }
339     UserOperation::GetUserMentions => {
340       let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
341       let res = Oper::new(user_operation, get_user_mentions).perform()?;
342       Ok(serde_json::to_string(&res)?)
343     }
344     UserOperation::EditUserMention => {
345       let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
346       let res = Oper::new(user_operation, edit_user_mention).perform()?;
347       Ok(serde_json::to_string(&res)?)
348     }
349     UserOperation::MarkAllAsRead => {
350       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
351       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
352       Ok(serde_json::to_string(&res)?)
353     }
354     UserOperation::GetCommunity => {
355       let get_community: GetCommunity = serde_json::from_str(data)?;
356       let res = Oper::new(user_operation, get_community).perform()?;
357       Ok(serde_json::to_string(&res)?)
358     }
359     UserOperation::ListCommunities => {
360       let list_communities: ListCommunities = serde_json::from_str(data)?;
361       let res = Oper::new(user_operation, list_communities).perform()?;
362       Ok(serde_json::to_string(&res)?)
363     }
364     UserOperation::CreateCommunity => {
365       chat.check_rate_limit_register(msg.id)?;
366       let create_community: CreateCommunity = serde_json::from_str(data)?;
367       let res = Oper::new(user_operation, create_community).perform()?;
368       Ok(serde_json::to_string(&res)?)
369     }
370     UserOperation::EditCommunity => {
371       let edit_community: EditCommunity = serde_json::from_str(data)?;
372       let res = Oper::new(user_operation, edit_community).perform()?;
373       let mut community_sent: CommunityResponse = res.clone();
374       community_sent.community.user_id = None;
375       community_sent.community.subscribed = None;
376       let community_sent_str = serde_json::to_string(&community_sent)?;
377       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
378       Ok(serde_json::to_string(&res)?)
379     }
380     UserOperation::FollowCommunity => {
381       let follow_community: FollowCommunity = serde_json::from_str(data)?;
382       let res = Oper::new(user_operation, follow_community).perform()?;
383       Ok(serde_json::to_string(&res)?)
384     }
385     UserOperation::GetFollowedCommunities => {
386       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
387       let res = Oper::new(user_operation, followed_communities).perform()?;
388       Ok(serde_json::to_string(&res)?)
389     }
390     UserOperation::BanFromCommunity => {
391       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
392       let community_id = ban_from_community.community_id;
393       let res = Oper::new(user_operation, ban_from_community).perform()?;
394       let res_str = serde_json::to_string(&res)?;
395       chat.send_community_message(&community_id, &res_str, msg.id)?;
396       Ok(res_str)
397     }
398     UserOperation::AddModToCommunity => {
399       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
400       let community_id = mod_add_to_community.community_id;
401       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
402       let res_str = serde_json::to_string(&res)?;
403       chat.send_community_message(&community_id, &res_str, msg.id)?;
404       Ok(res_str)
405     }
406     UserOperation::ListCategories => {
407       let list_categories: ListCategories = ListCategories;
408       let res = Oper::new(user_operation, list_categories).perform()?;
409       Ok(serde_json::to_string(&res)?)
410     }
411     UserOperation::CreatePost => {
412       chat.check_rate_limit_post(msg.id)?;
413       let create_post: CreatePost = serde_json::from_str(data)?;
414       let res = Oper::new(user_operation, create_post).perform()?;
415       Ok(serde_json::to_string(&res)?)
416     }
417     UserOperation::GetPost => {
418       let get_post: GetPost = serde_json::from_str(data)?;
419       chat.join_room(get_post.id, msg.id);
420       let res = Oper::new(user_operation, get_post).perform()?;
421       Ok(serde_json::to_string(&res)?)
422     }
423     UserOperation::GetPosts => {
424       let get_posts: GetPosts = serde_json::from_str(data)?;
425       let res = Oper::new(user_operation, get_posts).perform()?;
426       Ok(serde_json::to_string(&res)?)
427     }
428     UserOperation::CreatePostLike => {
429       chat.check_rate_limit_message(msg.id)?;
430       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
431       let res = Oper::new(user_operation, create_post_like).perform()?;
432       Ok(serde_json::to_string(&res)?)
433     }
434     UserOperation::EditPost => {
435       let edit_post: EditPost = serde_json::from_str(data)?;
436       let res = Oper::new(user_operation, edit_post).perform()?;
437       let mut post_sent = res.clone();
438       post_sent.post.my_vote = None;
439       let post_sent_str = serde_json::to_string(&post_sent)?;
440       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
441       Ok(serde_json::to_string(&res)?)
442     }
443     UserOperation::SavePost => {
444       let save_post: SavePost = serde_json::from_str(data)?;
445       let res = Oper::new(user_operation, save_post).perform()?;
446       Ok(serde_json::to_string(&res)?)
447     }
448     UserOperation::CreateComment => {
449       chat.check_rate_limit_message(msg.id)?;
450       let create_comment: CreateComment = serde_json::from_str(data)?;
451       let post_id = create_comment.post_id;
452       let res = Oper::new(user_operation, create_comment).perform()?;
453       let mut comment_sent = res.clone();
454       comment_sent.comment.my_vote = None;
455       comment_sent.comment.user_id = None;
456       let comment_sent_str = serde_json::to_string(&comment_sent)?;
457       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
458       Ok(serde_json::to_string(&res)?)
459     }
460     UserOperation::EditComment => {
461       let edit_comment: EditComment = serde_json::from_str(data)?;
462       let post_id = edit_comment.post_id;
463       let res = Oper::new(user_operation, edit_comment).perform()?;
464       let mut comment_sent = res.clone();
465       comment_sent.comment.my_vote = None;
466       comment_sent.comment.user_id = None;
467       let comment_sent_str = serde_json::to_string(&comment_sent)?;
468       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
469       Ok(serde_json::to_string(&res)?)
470     }
471     UserOperation::SaveComment => {
472       let save_comment: SaveComment = serde_json::from_str(data)?;
473       let res = Oper::new(user_operation, save_comment).perform()?;
474       Ok(serde_json::to_string(&res)?)
475     }
476     UserOperation::CreateCommentLike => {
477       chat.check_rate_limit_message(msg.id)?;
478       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
479       let post_id = create_comment_like.post_id;
480       let res = Oper::new(user_operation, create_comment_like).perform()?;
481       let mut comment_sent = res.clone();
482       comment_sent.comment.my_vote = None;
483       comment_sent.comment.user_id = None;
484       let comment_sent_str = serde_json::to_string(&comment_sent)?;
485       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
486       Ok(serde_json::to_string(&res)?)
487     }
488     UserOperation::GetModlog => {
489       let get_modlog: GetModlog = serde_json::from_str(data)?;
490       let res = Oper::new(user_operation, get_modlog).perform()?;
491       Ok(serde_json::to_string(&res)?)
492     }
493     UserOperation::CreateSite => {
494       let create_site: CreateSite = serde_json::from_str(data)?;
495       let res = Oper::new(user_operation, create_site).perform()?;
496       Ok(serde_json::to_string(&res)?)
497     }
498     UserOperation::EditSite => {
499       let edit_site: EditSite = serde_json::from_str(data)?;
500       let res = Oper::new(user_operation, edit_site).perform()?;
501       Ok(serde_json::to_string(&res)?)
502     }
503     UserOperation::GetSite => {
504       let online: usize = chat.sessions.len();
505       let get_site: GetSite = serde_json::from_str(data)?;
506       let mut res = Oper::new(user_operation, get_site).perform()?;
507       res.online = online;
508       Ok(serde_json::to_string(&res)?)
509     }
510     UserOperation::Search => {
511       let search: Search = serde_json::from_str(data)?;
512       let res = Oper::new(user_operation, search).perform()?;
513       Ok(serde_json::to_string(&res)?)
514     }
515     UserOperation::TransferCommunity => {
516       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
517       let res = Oper::new(user_operation, transfer_community).perform()?;
518       Ok(serde_json::to_string(&res)?)
519     }
520     UserOperation::TransferSite => {
521       let transfer_site: TransferSite = serde_json::from_str(data)?;
522       let res = Oper::new(user_operation, transfer_site).perform()?;
523       Ok(serde_json::to_string(&res)?)
524     }
525     UserOperation::DeleteAccount => {
526       let delete_account: DeleteAccount = serde_json::from_str(data)?;
527       let res = Oper::new(user_operation, delete_account).perform()?;
528       Ok(serde_json::to_string(&res)?)
529     }
530     UserOperation::PasswordReset => {
531       let password_reset: PasswordReset = serde_json::from_str(data)?;
532       let res = Oper::new(user_operation, password_reset).perform()?;
533       Ok(serde_json::to_string(&res)?)
534     }
535     UserOperation::PasswordChange => {
536       let password_change: PasswordChange = serde_json::from_str(data)?;
537       let res = Oper::new(user_operation, password_change).perform()?;
538       Ok(serde_json::to_string(&res)?)
539     }
540   }
541 }