]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Use a dedicated structure in order to search posts
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use failure::Error;
7 use rand::{rngs::ThreadRng, Rng};
8 use serde::{Deserialize, Serialize};
9 use serde_json::Value;
10 use std::collections::{HashMap, HashSet};
11 use std::str::FromStr;
12 use std::time::SystemTime;
13
14 use crate::api::comment::*;
15 use crate::api::community::*;
16 use crate::api::post::*;
17 use crate::api::site::*;
18 use crate::api::user::*;
19 use crate::api::*;
20 use crate::Settings;
21
22 /// Chat server sends this messages to session
23 #[derive(Message)]
24 pub struct WSMessage(pub String);
25
26 /// Message for chat server communications
27
28 /// New chat session is created
29 #[derive(Message)]
30 #[rtype(usize)]
31 pub struct Connect {
32   pub addr: Recipient<WSMessage>,
33   pub ip: String,
34 }
35
36 /// Session is disconnected
37 #[derive(Message)]
38 pub struct Disconnect {
39   pub id: usize,
40   pub ip: String,
41 }
42
43 /// Send message to specific room
44 #[derive(Message)]
45 pub struct ClientMessage {
46   /// Id of the client session
47   pub id: usize,
48   /// Peer message
49   pub msg: String,
50   /// Room name
51   pub room: String,
52 }
53
54 #[derive(Serialize, Deserialize)]
55 pub struct StandardMessage {
56   /// Id of the client session
57   pub id: usize,
58   /// Peer message
59   pub msg: String,
60 }
61
62 impl actix::Message for StandardMessage {
63   type Result = String;
64 }
65
66 #[derive(Debug)]
67 pub struct RateLimitBucket {
68   last_checked: SystemTime,
69   allowance: f64,
70 }
71
72 pub struct SessionInfo {
73   pub addr: Recipient<WSMessage>,
74   pub ip: String,
75 }
76
77 /// `ChatServer` manages chat rooms and responsible for coordinating chat
78 /// session. implementation is super primitive
79 pub struct ChatServer {
80   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
81   rate_limits: HashMap<String, RateLimitBucket>,
82   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
83   rng: ThreadRng,
84 }
85
86 impl Default for ChatServer {
87   fn default() -> ChatServer {
88     // default room
89     let rooms = HashMap::new();
90
91     ChatServer {
92       sessions: HashMap::new(),
93       rate_limits: HashMap::new(),
94       rooms: rooms,
95       rng: rand::thread_rng(),
96     }
97   }
98 }
99
100 impl ChatServer {
101   /// Send message to all users in the room
102   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
103     if let Some(sessions) = self.rooms.get(room) {
104       for id in sessions {
105         if *id != skip_id {
106           if let Some(info) = self.sessions.get(id) {
107             let _ = info.addr.do_send(WSMessage(message.to_owned()));
108           }
109         }
110       }
111     }
112   }
113
114   fn join_room(&mut self, room_id: i32, id: usize) {
115     // remove session from all rooms
116     for (_n, sessions) in &mut self.rooms {
117       sessions.remove(&id);
118     }
119
120     // If the room doesn't exist yet
121     if self.rooms.get_mut(&room_id).is_none() {
122       self.rooms.insert(room_id, HashSet::new());
123     }
124
125     &self.rooms.get_mut(&room_id).unwrap().insert(id);
126   }
127
128   fn send_community_message(
129     &self,
130     community_id: &i32,
131     message: &str,
132     skip_id: usize,
133   ) -> Result<(), Error> {
134     use crate::db::post_view::*;
135     use crate::db::*;
136     let conn = establish_connection();
137
138     let posts = PostViewQuery::create(
139       &conn,
140       ListingType::Community,
141       &SortType::New,
142       false,
143       false,
144       false,
145     )
146     .for_community_id(*community_id)
147     .limit(9999)
148     .list()?;
149
150     for post in posts {
151       self.send_room_message(&post.id, message, skip_id);
152     }
153
154     Ok(())
155   }
156
157   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
158     self.check_rate_limit_full(
159       id,
160       Settings::get().rate_limit_register,
161       Settings::get().rate_limit_register_per_second,
162     )
163   }
164
165   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
166     self.check_rate_limit_full(
167       id,
168       Settings::get().rate_limit_post,
169       Settings::get().rate_limit_post_per_second,
170     )
171   }
172
173   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
174     self.check_rate_limit_full(
175       id,
176       Settings::get().rate_limit_message,
177       Settings::get().rate_limit_message_per_second,
178     )
179   }
180
181   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
182     if let Some(info) = self.sessions.get(&id) {
183       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
184         // The initial value
185         if rate_limit.allowance == -2f64 {
186           rate_limit.allowance = rate as f64;
187         };
188
189         let current = SystemTime::now();
190         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
191         rate_limit.last_checked = current;
192         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
193         if rate_limit.allowance > rate as f64 {
194           rate_limit.allowance = rate as f64;
195         }
196
197         if rate_limit.allowance < 1.0 {
198           println!(
199             "Rate limited IP: {}, time_passed: {}, allowance: {}",
200             &info.ip, time_passed, rate_limit.allowance
201           );
202           Err(APIError {
203             op: "Rate Limit".to_string(),
204             message: format!("Too many requests. {} per {} seconds", rate, per),
205           })?
206         } else {
207           rate_limit.allowance -= 1.0;
208           Ok(())
209         }
210       } else {
211         Ok(())
212       }
213     } else {
214       Ok(())
215     }
216   }
217 }
218
219 /// Make actor from `ChatServer`
220 impl Actor for ChatServer {
221   /// We are going to use simple Context, we just need ability to communicate
222   /// with other actors.
223   type Context = Context<Self>;
224 }
225
226 /// Handler for Connect message.
227 ///
228 /// Register new session and assign unique id to this session
229 impl Handler<Connect> for ChatServer {
230   type Result = usize;
231
232   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
233     // notify all users in same room
234     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
235
236     // register session with random id
237     let id = self.rng.gen::<usize>();
238     println!("{} joined", &msg.ip);
239
240     self.sessions.insert(
241       id,
242       SessionInfo {
243         addr: msg.addr,
244         ip: msg.ip.to_owned(),
245       },
246     );
247
248     if self.rate_limits.get(&msg.ip).is_none() {
249       self.rate_limits.insert(
250         msg.ip,
251         RateLimitBucket {
252           last_checked: SystemTime::now(),
253           allowance: -2f64,
254         },
255       );
256     }
257
258     id
259   }
260 }
261
262 /// Handler for Disconnect message.
263 impl Handler<Disconnect> for ChatServer {
264   type Result = ();
265
266   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
267     // let mut rooms: Vec<i32> = Vec::new();
268
269     // remove address
270     if self.sessions.remove(&msg.id).is_some() {
271       // remove session from all rooms
272       for (_id, sessions) in &mut self.rooms {
273         if sessions.remove(&msg.id) {
274           // rooms.push(*id);
275         }
276       }
277     }
278   }
279 }
280
281 /// Handler for Message message.
282 impl Handler<StandardMessage> for ChatServer {
283   type Result = MessageResult<StandardMessage>;
284
285   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
286     let msg_out = match parse_json_message(self, msg) {
287       Ok(m) => m,
288       Err(e) => e.to_string(),
289     };
290
291     MessageResult(msg_out)
292   }
293 }
294
295 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
296   let json: Value = serde_json::from_str(&msg.msg)?;
297   let data = &json["data"].to_string();
298   let op = &json["op"].as_str().ok_or(APIError {
299     op: "Unknown op type".to_string(),
300     message: format!("Unknown op type"),
301   })?;
302
303   let user_operation: UserOperation = UserOperation::from_str(&op)?;
304
305   match user_operation {
306     UserOperation::Login => {
307       let login: Login = serde_json::from_str(data)?;
308       let res = Oper::new(user_operation, login).perform()?;
309       Ok(serde_json::to_string(&res)?)
310     }
311     UserOperation::Register => {
312       let register: Register = serde_json::from_str(data)?;
313       let res = Oper::new(user_operation, register).perform();
314       if res.is_ok() {
315         chat.check_rate_limit_register(msg.id)?;
316       }
317       Ok(serde_json::to_string(&res?)?)
318     }
319     UserOperation::GetUserDetails => {
320       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
321       let res = Oper::new(user_operation, get_user_details).perform()?;
322       Ok(serde_json::to_string(&res)?)
323     }
324     UserOperation::SaveUserSettings => {
325       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
326       let res = Oper::new(user_operation, save_user_settings).perform()?;
327       Ok(serde_json::to_string(&res)?)
328     }
329     UserOperation::AddAdmin => {
330       let add_admin: AddAdmin = serde_json::from_str(data)?;
331       let res = Oper::new(user_operation, add_admin).perform()?;
332       Ok(serde_json::to_string(&res)?)
333     }
334     UserOperation::BanUser => {
335       let ban_user: BanUser = serde_json::from_str(data)?;
336       let res = Oper::new(user_operation, ban_user).perform()?;
337       Ok(serde_json::to_string(&res)?)
338     }
339     UserOperation::GetReplies => {
340       let get_replies: GetReplies = serde_json::from_str(data)?;
341       let res = Oper::new(user_operation, get_replies).perform()?;
342       Ok(serde_json::to_string(&res)?)
343     }
344     UserOperation::GetUserMentions => {
345       let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
346       let res = Oper::new(user_operation, get_user_mentions).perform()?;
347       Ok(serde_json::to_string(&res)?)
348     }
349     UserOperation::EditUserMention => {
350       let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
351       let res = Oper::new(user_operation, edit_user_mention).perform()?;
352       Ok(serde_json::to_string(&res)?)
353     }
354     UserOperation::MarkAllAsRead => {
355       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
356       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
357       Ok(serde_json::to_string(&res)?)
358     }
359     UserOperation::GetCommunity => {
360       let get_community: GetCommunity = serde_json::from_str(data)?;
361       let res = Oper::new(user_operation, get_community).perform()?;
362       Ok(serde_json::to_string(&res)?)
363     }
364     UserOperation::ListCommunities => {
365       let list_communities: ListCommunities = serde_json::from_str(data)?;
366       let res = Oper::new(user_operation, list_communities).perform()?;
367       Ok(serde_json::to_string(&res)?)
368     }
369     UserOperation::CreateCommunity => {
370       chat.check_rate_limit_register(msg.id)?;
371       let create_community: CreateCommunity = serde_json::from_str(data)?;
372       let res = Oper::new(user_operation, create_community).perform()?;
373       Ok(serde_json::to_string(&res)?)
374     }
375     UserOperation::EditCommunity => {
376       let edit_community: EditCommunity = serde_json::from_str(data)?;
377       let res = Oper::new(user_operation, edit_community).perform()?;
378       let mut community_sent: CommunityResponse = res.clone();
379       community_sent.community.user_id = None;
380       community_sent.community.subscribed = None;
381       let community_sent_str = serde_json::to_string(&community_sent)?;
382       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
383       Ok(serde_json::to_string(&res)?)
384     }
385     UserOperation::FollowCommunity => {
386       let follow_community: FollowCommunity = serde_json::from_str(data)?;
387       let res = Oper::new(user_operation, follow_community).perform()?;
388       Ok(serde_json::to_string(&res)?)
389     }
390     UserOperation::GetFollowedCommunities => {
391       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
392       let res = Oper::new(user_operation, followed_communities).perform()?;
393       Ok(serde_json::to_string(&res)?)
394     }
395     UserOperation::BanFromCommunity => {
396       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
397       let community_id = ban_from_community.community_id;
398       let res = Oper::new(user_operation, ban_from_community).perform()?;
399       let res_str = serde_json::to_string(&res)?;
400       chat.send_community_message(&community_id, &res_str, msg.id)?;
401       Ok(res_str)
402     }
403     UserOperation::AddModToCommunity => {
404       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
405       let community_id = mod_add_to_community.community_id;
406       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
407       let res_str = serde_json::to_string(&res)?;
408       chat.send_community_message(&community_id, &res_str, msg.id)?;
409       Ok(res_str)
410     }
411     UserOperation::ListCategories => {
412       let list_categories: ListCategories = ListCategories;
413       let res = Oper::new(user_operation, list_categories).perform()?;
414       Ok(serde_json::to_string(&res)?)
415     }
416     UserOperation::CreatePost => {
417       chat.check_rate_limit_post(msg.id)?;
418       let create_post: CreatePost = serde_json::from_str(data)?;
419       let res = Oper::new(user_operation, create_post).perform()?;
420       Ok(serde_json::to_string(&res)?)
421     }
422     UserOperation::GetPost => {
423       let get_post: GetPost = serde_json::from_str(data)?;
424       chat.join_room(get_post.id, msg.id);
425       let res = Oper::new(user_operation, get_post).perform()?;
426       Ok(serde_json::to_string(&res)?)
427     }
428     UserOperation::GetPosts => {
429       let get_posts: GetPosts = serde_json::from_str(data)?;
430       let res = Oper::new(user_operation, get_posts).perform()?;
431       Ok(serde_json::to_string(&res)?)
432     }
433     UserOperation::CreatePostLike => {
434       chat.check_rate_limit_message(msg.id)?;
435       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
436       let res = Oper::new(user_operation, create_post_like).perform()?;
437       Ok(serde_json::to_string(&res)?)
438     }
439     UserOperation::EditPost => {
440       let edit_post: EditPost = serde_json::from_str(data)?;
441       let res = Oper::new(user_operation, edit_post).perform()?;
442       let mut post_sent = res.clone();
443       post_sent.post.my_vote = None;
444       let post_sent_str = serde_json::to_string(&post_sent)?;
445       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
446       Ok(serde_json::to_string(&res)?)
447     }
448     UserOperation::SavePost => {
449       let save_post: SavePost = serde_json::from_str(data)?;
450       let res = Oper::new(user_operation, save_post).perform()?;
451       Ok(serde_json::to_string(&res)?)
452     }
453     UserOperation::CreateComment => {
454       chat.check_rate_limit_message(msg.id)?;
455       let create_comment: CreateComment = serde_json::from_str(data)?;
456       let post_id = create_comment.post_id;
457       let res = Oper::new(user_operation, create_comment).perform()?;
458       let mut comment_sent = res.clone();
459       comment_sent.comment.my_vote = None;
460       comment_sent.comment.user_id = None;
461       let comment_sent_str = serde_json::to_string(&comment_sent)?;
462       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
463       Ok(serde_json::to_string(&res)?)
464     }
465     UserOperation::EditComment => {
466       let edit_comment: EditComment = serde_json::from_str(data)?;
467       let post_id = edit_comment.post_id;
468       let res = Oper::new(user_operation, edit_comment).perform()?;
469       let mut comment_sent = res.clone();
470       comment_sent.comment.my_vote = None;
471       comment_sent.comment.user_id = None;
472       let comment_sent_str = serde_json::to_string(&comment_sent)?;
473       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
474       Ok(serde_json::to_string(&res)?)
475     }
476     UserOperation::SaveComment => {
477       let save_comment: SaveComment = serde_json::from_str(data)?;
478       let res = Oper::new(user_operation, save_comment).perform()?;
479       Ok(serde_json::to_string(&res)?)
480     }
481     UserOperation::CreateCommentLike => {
482       chat.check_rate_limit_message(msg.id)?;
483       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
484       let post_id = create_comment_like.post_id;
485       let res = Oper::new(user_operation, create_comment_like).perform()?;
486       let mut comment_sent = res.clone();
487       comment_sent.comment.my_vote = None;
488       comment_sent.comment.user_id = None;
489       let comment_sent_str = serde_json::to_string(&comment_sent)?;
490       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
491       Ok(serde_json::to_string(&res)?)
492     }
493     UserOperation::GetModlog => {
494       let get_modlog: GetModlog = serde_json::from_str(data)?;
495       let res = Oper::new(user_operation, get_modlog).perform()?;
496       Ok(serde_json::to_string(&res)?)
497     }
498     UserOperation::CreateSite => {
499       let create_site: CreateSite = serde_json::from_str(data)?;
500       let res = Oper::new(user_operation, create_site).perform()?;
501       Ok(serde_json::to_string(&res)?)
502     }
503     UserOperation::EditSite => {
504       let edit_site: EditSite = serde_json::from_str(data)?;
505       let res = Oper::new(user_operation, edit_site).perform()?;
506       Ok(serde_json::to_string(&res)?)
507     }
508     UserOperation::GetSite => {
509       let online: usize = chat.sessions.len();
510       let get_site: GetSite = serde_json::from_str(data)?;
511       let mut res = Oper::new(user_operation, get_site).perform()?;
512       res.online = online;
513       Ok(serde_json::to_string(&res)?)
514     }
515     UserOperation::Search => {
516       let search: Search = serde_json::from_str(data)?;
517       let res = Oper::new(user_operation, search).perform()?;
518       Ok(serde_json::to_string(&res)?)
519     }
520     UserOperation::TransferCommunity => {
521       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
522       let res = Oper::new(user_operation, transfer_community).perform()?;
523       Ok(serde_json::to_string(&res)?)
524     }
525     UserOperation::TransferSite => {
526       let transfer_site: TransferSite = serde_json::from_str(data)?;
527       let res = Oper::new(user_operation, transfer_site).perform()?;
528       Ok(serde_json::to_string(&res)?)
529     }
530     UserOperation::DeleteAccount => {
531       let delete_account: DeleteAccount = serde_json::from_str(data)?;
532       let res = Oper::new(user_operation, delete_account).perform()?;
533       Ok(serde_json::to_string(&res)?)
534     }
535     UserOperation::PasswordReset => {
536       let password_reset: PasswordReset = serde_json::from_str(data)?;
537       let res = Oper::new(user_operation, password_reset).perform()?;
538       Ok(serde_json::to_string(&res)?)
539     }
540     UserOperation::PasswordChange => {
541       let password_change: PasswordChange = serde_json::from_str(data)?;
542       let res = Oper::new(user_operation, password_change).perform()?;
543       Ok(serde_json::to_string(&res)?)
544     }
545   }
546 }