]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'master' into federation
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use failure::Error;
7 use rand::{rngs::ThreadRng, Rng};
8 use serde::{Deserialize, Serialize};
9 use serde_json::Value;
10 use std::collections::{HashMap, HashSet};
11 use std::str::FromStr;
12 use std::time::SystemTime;
13
14 use crate::api::comment::*;
15 use crate::api::community::*;
16 use crate::api::post::*;
17 use crate::api::site::*;
18 use crate::api::user::*;
19 use crate::api::*;
20 use crate::apub::puller::*;
21 use crate::Settings;
22
23 /// Chat server sends this messages to session
24 #[derive(Message)]
25 pub struct WSMessage(pub String);
26
27 /// Message for chat server communications
28
29 /// New chat session is created
30 #[derive(Message)]
31 #[rtype(usize)]
32 pub struct Connect {
33   pub addr: Recipient<WSMessage>,
34   pub ip: String,
35 }
36
37 /// Session is disconnected
38 #[derive(Message)]
39 pub struct Disconnect {
40   pub id: usize,
41   pub ip: String,
42 }
43
44 /// Send message to specific room
45 #[derive(Message)]
46 pub struct ClientMessage {
47   /// Id of the client session
48   pub id: usize,
49   /// Peer message
50   pub msg: String,
51   /// Room name
52   pub room: String,
53 }
54
55 #[derive(Serialize, Deserialize)]
56 pub struct StandardMessage {
57   /// Id of the client session
58   pub id: usize,
59   /// Peer message
60   pub msg: String,
61 }
62
63 impl actix::Message for StandardMessage {
64   type Result = String;
65 }
66
67 #[derive(Debug)]
68 pub struct RateLimitBucket {
69   last_checked: SystemTime,
70   allowance: f64,
71 }
72
73 pub struct SessionInfo {
74   pub addr: Recipient<WSMessage>,
75   pub ip: String,
76 }
77
78 /// `ChatServer` manages chat rooms and responsible for coordinating chat
79 /// session. implementation is super primitive
80 pub struct ChatServer {
81   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
82   rate_limits: HashMap<String, RateLimitBucket>,
83   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
84   rng: ThreadRng,
85 }
86
87 impl Default for ChatServer {
88   fn default() -> ChatServer {
89     // default room
90     let rooms = HashMap::new();
91
92     ChatServer {
93       sessions: HashMap::new(),
94       rate_limits: HashMap::new(),
95       rooms,
96       rng: rand::thread_rng(),
97     }
98   }
99 }
100
101 impl ChatServer {
102   /// Send message to all users in the room
103   fn send_room_message(&self, room: i32, message: &str, skip_id: usize) {
104     if let Some(sessions) = self.rooms.get(&room) {
105       for id in sessions {
106         if *id != skip_id {
107           if let Some(info) = self.sessions.get(id) {
108             let _ = info.addr.do_send(WSMessage(message.to_owned()));
109           }
110         }
111       }
112     }
113   }
114
115   fn join_room(&mut self, room_id: i32, id: usize) {
116     // remove session from all rooms
117     for sessions in self.rooms.values_mut() {
118       sessions.remove(&id);
119     }
120
121     // If the room doesn't exist yet
122     if self.rooms.get_mut(&room_id).is_none() {
123       self.rooms.insert(room_id, HashSet::new());
124     }
125
126     self.rooms.get_mut(&room_id).unwrap().insert(id);
127   }
128
129   fn send_community_message(
130     &self,
131     community_id: i32,
132     message: &str,
133     skip_id: usize,
134   ) -> Result<(), Error> {
135     use crate::db::post_view::*;
136     use crate::db::*;
137     let conn = establish_connection();
138
139     let posts = PostQueryBuilder::create(&conn)
140       .listing_type(ListingType::Community)
141       .sort(&SortType::New)
142       .for_community_id(community_id)
143       .limit(9999)
144       .list()?;
145
146     for post in posts {
147       self.send_room_message(post.id, message, skip_id);
148     }
149
150     Ok(())
151   }
152
153   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
154     self.check_rate_limit_full(
155       id,
156       Settings::get().rate_limit.register,
157       Settings::get().rate_limit.register_per_second,
158     )
159   }
160
161   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
162     self.check_rate_limit_full(
163       id,
164       Settings::get().rate_limit.post,
165       Settings::get().rate_limit.post_per_second,
166     )
167   }
168
169   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
170     self.check_rate_limit_full(
171       id,
172       Settings::get().rate_limit.message,
173       Settings::get().rate_limit.message_per_second,
174     )
175   }
176
177   #[allow(clippy::float_cmp)]
178   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
179     if let Some(info) = self.sessions.get(&id) {
180       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
181         // The initial value
182         if rate_limit.allowance == -2f64 {
183           rate_limit.allowance = rate as f64;
184         };
185
186         let current = SystemTime::now();
187         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
188         rate_limit.last_checked = current;
189         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
190         if rate_limit.allowance > rate as f64 {
191           rate_limit.allowance = rate as f64;
192         }
193
194         if rate_limit.allowance < 1.0 {
195           println!(
196             "Rate limited IP: {}, time_passed: {}, allowance: {}",
197             &info.ip, time_passed, rate_limit.allowance
198           );
199           Err(
200             APIError {
201               op: "Rate Limit".to_string(),
202               message: format!("Too many requests. {} per {} seconds", rate, per),
203             }
204             .into(),
205           )
206         } else {
207           rate_limit.allowance -= 1.0;
208           Ok(())
209         }
210       } else {
211         Ok(())
212       }
213     } else {
214       Ok(())
215     }
216   }
217 }
218
219 /// Make actor from `ChatServer`
220 impl Actor for ChatServer {
221   /// We are going to use simple Context, we just need ability to communicate
222   /// with other actors.
223   type Context = Context<Self>;
224 }
225
226 /// Handler for Connect message.
227 ///
228 /// Register new session and assign unique id to this session
229 impl Handler<Connect> for ChatServer {
230   type Result = usize;
231
232   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
233     // notify all users in same room
234     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
235
236     // register session with random id
237     let id = self.rng.gen::<usize>();
238     println!("{} joined", &msg.ip);
239
240     self.sessions.insert(
241       id,
242       SessionInfo {
243         addr: msg.addr,
244         ip: msg.ip.to_owned(),
245       },
246     );
247
248     if self.rate_limits.get(&msg.ip).is_none() {
249       self.rate_limits.insert(
250         msg.ip,
251         RateLimitBucket {
252           last_checked: SystemTime::now(),
253           allowance: -2f64,
254         },
255       );
256     }
257
258     id
259   }
260 }
261
262 /// Handler for Disconnect message.
263 impl Handler<Disconnect> for ChatServer {
264   type Result = ();
265
266   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
267     // let mut rooms: Vec<i32> = Vec::new();
268
269     // remove address
270     if self.sessions.remove(&msg.id).is_some() {
271       // remove session from all rooms
272       for sessions in self.rooms.values_mut() {
273         if sessions.remove(&msg.id) {
274           // rooms.push(*id);
275         }
276       }
277     }
278   }
279 }
280
281 /// Handler for Message message.
282 impl Handler<StandardMessage> for ChatServer {
283   type Result = MessageResult<StandardMessage>;
284
285   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
286     let msg_out = match parse_json_message(self, msg) {
287       Ok(m) => m,
288       Err(e) => e.to_string(),
289     };
290
291     MessageResult(msg_out)
292   }
293 }
294
295 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
296   let json: Value = serde_json::from_str(&msg.msg)?;
297   let data = &json["data"].to_string();
298   let op = &json["op"].as_str().ok_or(APIError {
299     op: "Unknown op type".to_string(),
300     message: "Unknown op type".to_string(),
301   })?;
302
303   let user_operation: UserOperation = UserOperation::from_str(&op)?;
304
305   match user_operation {
306     UserOperation::Login => {
307       let login: Login = serde_json::from_str(data)?;
308       let res = Oper::new(user_operation, login).perform()?;
309       Ok(serde_json::to_string(&res)?)
310     }
311     UserOperation::Register => {
312       let register: Register = serde_json::from_str(data)?;
313       let res = Oper::new(user_operation, register).perform();
314       if res.is_ok() {
315         chat.check_rate_limit_register(msg.id)?;
316       }
317       Ok(serde_json::to_string(&res?)?)
318     }
319     UserOperation::GetUserDetails => {
320       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
321       let res = Oper::new(user_operation, get_user_details).perform()?;
322       Ok(serde_json::to_string(&res)?)
323     }
324     UserOperation::SaveUserSettings => {
325       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
326       let res = Oper::new(user_operation, save_user_settings).perform()?;
327       Ok(serde_json::to_string(&res)?)
328     }
329     UserOperation::AddAdmin => {
330       let add_admin: AddAdmin = serde_json::from_str(data)?;
331       let res = Oper::new(user_operation, add_admin).perform()?;
332       Ok(serde_json::to_string(&res)?)
333     }
334     UserOperation::BanUser => {
335       let ban_user: BanUser = serde_json::from_str(data)?;
336       let res = Oper::new(user_operation, ban_user).perform()?;
337       Ok(serde_json::to_string(&res)?)
338     }
339     UserOperation::GetReplies => {
340       let get_replies: GetReplies = serde_json::from_str(data)?;
341       let res = Oper::new(user_operation, get_replies).perform()?;
342       Ok(serde_json::to_string(&res)?)
343     }
344     UserOperation::GetUserMentions => {
345       let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
346       let res = Oper::new(user_operation, get_user_mentions).perform()?;
347       Ok(serde_json::to_string(&res)?)
348     }
349     UserOperation::EditUserMention => {
350       let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
351       let res = Oper::new(user_operation, edit_user_mention).perform()?;
352       Ok(serde_json::to_string(&res)?)
353     }
354     UserOperation::MarkAllAsRead => {
355       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
356       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
357       Ok(serde_json::to_string(&res)?)
358     }
359     UserOperation::GetCommunity => {
360       let mut get_community: GetCommunity = serde_json::from_str(data)?;
361       if Settings::get().federation_enabled && get_community.name.is_some() {
362         let name = &get_community.name.unwrap();
363         let remote_community = if name.contains("@") {
364           // TODO: need to support sort, filter etc for remote communities
365           get_remote_community(name.to_owned())?
366         } else {
367           get_community.name = Some(name.replace("!", ""));
368           Oper::new(user_operation, get_community).perform()?
369         };
370         Ok(serde_json::to_string(&remote_community)?)
371       } else {
372         let res = Oper::new(user_operation, get_community).perform()?;
373         Ok(serde_json::to_string(&res)?)
374       }
375     }
376     UserOperation::ListCommunities => {
377       if Settings::get().federation_enabled {
378         let res = get_all_communities()?;
379         let val = ListCommunitiesResponse {
380           op: UserOperation::ListCommunities.to_string(),
381           communities: res,
382         };
383         Ok(serde_json::to_string(&val)?)
384       } else {
385         let list_communities: ListCommunities = serde_json::from_str(data)?;
386         let res = Oper::new(user_operation, list_communities).perform()?;
387         Ok(serde_json::to_string(&res)?)
388       }
389     }
390     UserOperation::CreateCommunity => {
391       chat.check_rate_limit_register(msg.id)?;
392       let create_community: CreateCommunity = serde_json::from_str(data)?;
393       let res = Oper::new(user_operation, create_community).perform()?;
394       Ok(serde_json::to_string(&res)?)
395     }
396     UserOperation::EditCommunity => {
397       let edit_community: EditCommunity = serde_json::from_str(data)?;
398       let res = Oper::new(user_operation, edit_community).perform()?;
399       let mut community_sent: CommunityResponse = res.clone();
400       community_sent.community.user_id = None;
401       community_sent.community.subscribed = None;
402       let community_sent_str = serde_json::to_string(&community_sent)?;
403       chat.send_community_message(community_sent.community.id, &community_sent_str, msg.id)?;
404       Ok(serde_json::to_string(&res)?)
405     }
406     UserOperation::FollowCommunity => {
407       let follow_community: FollowCommunity = serde_json::from_str(data)?;
408       let res = Oper::new(user_operation, follow_community).perform()?;
409       Ok(serde_json::to_string(&res)?)
410     }
411     UserOperation::GetFollowedCommunities => {
412       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
413       let res = Oper::new(user_operation, followed_communities).perform()?;
414       Ok(serde_json::to_string(&res)?)
415     }
416     UserOperation::BanFromCommunity => {
417       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
418       let community_id = ban_from_community.community_id;
419       let res = Oper::new(user_operation, ban_from_community).perform()?;
420       let res_str = serde_json::to_string(&res)?;
421       chat.send_community_message(community_id, &res_str, msg.id)?;
422       Ok(res_str)
423     }
424     UserOperation::AddModToCommunity => {
425       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
426       let community_id = mod_add_to_community.community_id;
427       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
428       let res_str = serde_json::to_string(&res)?;
429       chat.send_community_message(community_id, &res_str, msg.id)?;
430       Ok(res_str)
431     }
432     UserOperation::ListCategories => {
433       let list_categories: ListCategories = ListCategories;
434       let res = Oper::new(user_operation, list_categories).perform()?;
435       Ok(serde_json::to_string(&res)?)
436     }
437     UserOperation::CreatePost => {
438       chat.check_rate_limit_post(msg.id)?;
439       let create_post: CreatePost = serde_json::from_str(data)?;
440       let res = Oper::new(user_operation, create_post).perform()?;
441       Ok(serde_json::to_string(&res)?)
442     }
443     UserOperation::GetPost => {
444       let get_post: GetPost = serde_json::from_str(data)?;
445       chat.join_room(get_post.id, msg.id);
446       let res = Oper::new(user_operation, get_post).perform()?;
447       Ok(serde_json::to_string(&res)?)
448     }
449     UserOperation::GetPosts => {
450       let get_posts: GetPosts = serde_json::from_str(data)?;
451       let res = Oper::new(user_operation, get_posts).perform()?;
452       Ok(serde_json::to_string(&res)?)
453     }
454     UserOperation::CreatePostLike => {
455       chat.check_rate_limit_message(msg.id)?;
456       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
457       let res = Oper::new(user_operation, create_post_like).perform()?;
458       Ok(serde_json::to_string(&res)?)
459     }
460     UserOperation::EditPost => {
461       let edit_post: EditPost = serde_json::from_str(data)?;
462       let res = Oper::new(user_operation, edit_post).perform()?;
463       let mut post_sent = res.clone();
464       post_sent.post.my_vote = None;
465       let post_sent_str = serde_json::to_string(&post_sent)?;
466       chat.send_room_message(post_sent.post.id, &post_sent_str, msg.id);
467       Ok(serde_json::to_string(&res)?)
468     }
469     UserOperation::SavePost => {
470       let save_post: SavePost = serde_json::from_str(data)?;
471       let res = Oper::new(user_operation, save_post).perform()?;
472       Ok(serde_json::to_string(&res)?)
473     }
474     UserOperation::CreateComment => {
475       chat.check_rate_limit_message(msg.id)?;
476       let create_comment: CreateComment = serde_json::from_str(data)?;
477       let post_id = create_comment.post_id;
478       let res = Oper::new(user_operation, create_comment).perform()?;
479       let mut comment_sent = res.clone();
480       comment_sent.comment.my_vote = None;
481       comment_sent.comment.user_id = None;
482       let comment_sent_str = serde_json::to_string(&comment_sent)?;
483       chat.send_room_message(post_id, &comment_sent_str, msg.id);
484       Ok(serde_json::to_string(&res)?)
485     }
486     UserOperation::EditComment => {
487       let edit_comment: EditComment = serde_json::from_str(data)?;
488       let post_id = edit_comment.post_id;
489       let res = Oper::new(user_operation, edit_comment).perform()?;
490       let mut comment_sent = res.clone();
491       comment_sent.comment.my_vote = None;
492       comment_sent.comment.user_id = None;
493       let comment_sent_str = serde_json::to_string(&comment_sent)?;
494       chat.send_room_message(post_id, &comment_sent_str, msg.id);
495       Ok(serde_json::to_string(&res)?)
496     }
497     UserOperation::SaveComment => {
498       let save_comment: SaveComment = serde_json::from_str(data)?;
499       let res = Oper::new(user_operation, save_comment).perform()?;
500       Ok(serde_json::to_string(&res)?)
501     }
502     UserOperation::CreateCommentLike => {
503       chat.check_rate_limit_message(msg.id)?;
504       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
505       let post_id = create_comment_like.post_id;
506       let res = Oper::new(user_operation, create_comment_like).perform()?;
507       let mut comment_sent = res.clone();
508       comment_sent.comment.my_vote = None;
509       comment_sent.comment.user_id = None;
510       let comment_sent_str = serde_json::to_string(&comment_sent)?;
511       chat.send_room_message(post_id, &comment_sent_str, msg.id);
512       Ok(serde_json::to_string(&res)?)
513     }
514     UserOperation::GetModlog => {
515       let get_modlog: GetModlog = serde_json::from_str(data)?;
516       let res = Oper::new(user_operation, get_modlog).perform()?;
517       Ok(serde_json::to_string(&res)?)
518     }
519     UserOperation::CreateSite => {
520       let create_site: CreateSite = serde_json::from_str(data)?;
521       let res = Oper::new(user_operation, create_site).perform()?;
522       Ok(serde_json::to_string(&res)?)
523     }
524     UserOperation::EditSite => {
525       let edit_site: EditSite = serde_json::from_str(data)?;
526       let res = Oper::new(user_operation, edit_site).perform()?;
527       Ok(serde_json::to_string(&res)?)
528     }
529     UserOperation::GetSite => {
530       let online: usize = chat.sessions.len();
531       let get_site: GetSite = serde_json::from_str(data)?;
532       let mut res = Oper::new(user_operation, get_site).perform()?;
533       res.online = online;
534       Ok(serde_json::to_string(&res)?)
535     }
536     UserOperation::Search => {
537       let search: Search = serde_json::from_str(data)?;
538       let res = Oper::new(user_operation, search).perform()?;
539       Ok(serde_json::to_string(&res)?)
540     }
541     UserOperation::TransferCommunity => {
542       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
543       let res = Oper::new(user_operation, transfer_community).perform()?;
544       Ok(serde_json::to_string(&res)?)
545     }
546     UserOperation::TransferSite => {
547       let transfer_site: TransferSite = serde_json::from_str(data)?;
548       let res = Oper::new(user_operation, transfer_site).perform()?;
549       Ok(serde_json::to_string(&res)?)
550     }
551     UserOperation::DeleteAccount => {
552       let delete_account: DeleteAccount = serde_json::from_str(data)?;
553       let res = Oper::new(user_operation, delete_account).perform()?;
554       Ok(serde_json::to_string(&res)?)
555     }
556     UserOperation::PasswordReset => {
557       let password_reset: PasswordReset = serde_json::from_str(data)?;
558       let res = Oper::new(user_operation, password_reset).perform()?;
559       Ok(serde_json::to_string(&res)?)
560     }
561     UserOperation::PasswordChange => {
562       let password_change: PasswordChange = serde_json::from_str(data)?;
563       let res = Oper::new(user_operation, password_change).perform()?;
564       Ok(serde_json::to_string(&res)?)
565     }
566   }
567 }