]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Implementing very basic federation including test setup
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use failure::Error;
7 use rand::{rngs::ThreadRng, Rng};
8 use serde::{Deserialize, Serialize};
9 use serde_json::Value;
10 use std::collections::{HashMap, HashSet};
11 use std::str::FromStr;
12 use std::time::SystemTime;
13
14 use crate::api::comment::*;
15 use crate::api::community::*;
16 use crate::api::post::*;
17 use crate::api::site::*;
18 use crate::api::user::*;
19 use crate::api::*;
20 use crate::apub::puller::*;
21 use crate::Settings;
22
23 /// Chat server sends this messages to session
24 #[derive(Message)]
25 pub struct WSMessage(pub String);
26
27 /// Message for chat server communications
28
29 /// New chat session is created
30 #[derive(Message)]
31 #[rtype(usize)]
32 pub struct Connect {
33   pub addr: Recipient<WSMessage>,
34   pub ip: String,
35 }
36
37 /// Session is disconnected
38 #[derive(Message)]
39 pub struct Disconnect {
40   pub id: usize,
41   pub ip: String,
42 }
43
44 /// Send message to specific room
45 #[derive(Message)]
46 pub struct ClientMessage {
47   /// Id of the client session
48   pub id: usize,
49   /// Peer message
50   pub msg: String,
51   /// Room name
52   pub room: String,
53 }
54
55 #[derive(Serialize, Deserialize)]
56 pub struct StandardMessage {
57   /// Id of the client session
58   pub id: usize,
59   /// Peer message
60   pub msg: String,
61 }
62
63 impl actix::Message for StandardMessage {
64   type Result = String;
65 }
66
67 #[derive(Debug)]
68 pub struct RateLimitBucket {
69   last_checked: SystemTime,
70   allowance: f64,
71 }
72
73 pub struct SessionInfo {
74   pub addr: Recipient<WSMessage>,
75   pub ip: String,
76 }
77
78 /// `ChatServer` manages chat rooms and responsible for coordinating chat
79 /// session. implementation is super primitive
80 pub struct ChatServer {
81   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
82   rate_limits: HashMap<String, RateLimitBucket>,
83   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
84   rng: ThreadRng,
85 }
86
87 impl Default for ChatServer {
88   fn default() -> ChatServer {
89     // default room
90     let rooms = HashMap::new();
91
92     ChatServer {
93       sessions: HashMap::new(),
94       rate_limits: HashMap::new(),
95       rooms: rooms,
96       rng: rand::thread_rng(),
97     }
98   }
99 }
100
101 impl ChatServer {
102   /// Send message to all users in the room
103   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
104     if let Some(sessions) = self.rooms.get(room) {
105       for id in sessions {
106         if *id != skip_id {
107           if let Some(info) = self.sessions.get(id) {
108             let _ = info.addr.do_send(WSMessage(message.to_owned()));
109           }
110         }
111       }
112     }
113   }
114
115   fn join_room(&mut self, room_id: i32, id: usize) {
116     // remove session from all rooms
117     for (_n, sessions) in &mut self.rooms {
118       sessions.remove(&id);
119     }
120
121     // If the room doesn't exist yet
122     if self.rooms.get_mut(&room_id).is_none() {
123       self.rooms.insert(room_id, HashSet::new());
124     }
125
126     &self.rooms.get_mut(&room_id).unwrap().insert(id);
127   }
128
129   fn send_community_message(
130     &self,
131     community_id: &i32,
132     message: &str,
133     skip_id: usize,
134   ) -> Result<(), Error> {
135     use crate::db::post_view::*;
136     use crate::db::*;
137     let conn = establish_connection();
138
139     let posts = PostQueryBuilder::create(&conn)
140       .listing_type(ListingType::Community)
141       .sort(&SortType::New)
142       .for_community_id(*community_id)
143       .limit(9999)
144       .list()?;
145
146     for post in posts {
147       self.send_room_message(&post.id, message, skip_id);
148     }
149
150     Ok(())
151   }
152
153   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
154     self.check_rate_limit_full(
155       id,
156       Settings::get().rate_limit.register,
157       Settings::get().rate_limit.register_per_second,
158     )
159   }
160
161   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
162     self.check_rate_limit_full(
163       id,
164       Settings::get().rate_limit.post,
165       Settings::get().rate_limit.post_per_second,
166     )
167   }
168
169   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
170     self.check_rate_limit_full(
171       id,
172       Settings::get().rate_limit.message,
173       Settings::get().rate_limit.message_per_second,
174     )
175   }
176
177   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
178     if let Some(info) = self.sessions.get(&id) {
179       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
180         // The initial value
181         if rate_limit.allowance == -2f64 {
182           rate_limit.allowance = rate as f64;
183         };
184
185         let current = SystemTime::now();
186         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
187         rate_limit.last_checked = current;
188         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
189         if rate_limit.allowance > rate as f64 {
190           rate_limit.allowance = rate as f64;
191         }
192
193         if rate_limit.allowance < 1.0 {
194           println!(
195             "Rate limited IP: {}, time_passed: {}, allowance: {}",
196             &info.ip, time_passed, rate_limit.allowance
197           );
198           Err(APIError {
199             op: "Rate Limit".to_string(),
200             message: format!("Too many requests. {} per {} seconds", rate, per),
201           })?
202         } else {
203           rate_limit.allowance -= 1.0;
204           Ok(())
205         }
206       } else {
207         Ok(())
208       }
209     } else {
210       Ok(())
211     }
212   }
213 }
214
215 /// Make actor from `ChatServer`
216 impl Actor for ChatServer {
217   /// We are going to use simple Context, we just need ability to communicate
218   /// with other actors.
219   type Context = Context<Self>;
220 }
221
222 /// Handler for Connect message.
223 ///
224 /// Register new session and assign unique id to this session
225 impl Handler<Connect> for ChatServer {
226   type Result = usize;
227
228   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
229     // notify all users in same room
230     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
231
232     // register session with random id
233     let id = self.rng.gen::<usize>();
234     println!("{} joined", &msg.ip);
235
236     self.sessions.insert(
237       id,
238       SessionInfo {
239         addr: msg.addr,
240         ip: msg.ip.to_owned(),
241       },
242     );
243
244     if self.rate_limits.get(&msg.ip).is_none() {
245       self.rate_limits.insert(
246         msg.ip,
247         RateLimitBucket {
248           last_checked: SystemTime::now(),
249           allowance: -2f64,
250         },
251       );
252     }
253
254     id
255   }
256 }
257
258 /// Handler for Disconnect message.
259 impl Handler<Disconnect> for ChatServer {
260   type Result = ();
261
262   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
263     // let mut rooms: Vec<i32> = Vec::new();
264
265     // remove address
266     if self.sessions.remove(&msg.id).is_some() {
267       // remove session from all rooms
268       for (_id, sessions) in &mut self.rooms {
269         if sessions.remove(&msg.id) {
270           // rooms.push(*id);
271         }
272       }
273     }
274   }
275 }
276
277 /// Handler for Message message.
278 impl Handler<StandardMessage> for ChatServer {
279   type Result = MessageResult<StandardMessage>;
280
281   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
282     let msg_out = match parse_json_message(self, msg) {
283       Ok(m) => m,
284       Err(e) => e.to_string(),
285     };
286
287     MessageResult(msg_out)
288   }
289 }
290
291 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
292   let json: Value = serde_json::from_str(&msg.msg)?;
293   let data = &json["data"].to_string();
294   let op = &json["op"].as_str().ok_or(APIError {
295     op: "Unknown op type".to_string(),
296     message: format!("Unknown op type"),
297   })?;
298
299   let user_operation: UserOperation = UserOperation::from_str(&op)?;
300
301   match user_operation {
302     UserOperation::Login => {
303       let login: Login = serde_json::from_str(data)?;
304       let res = Oper::new(user_operation, login).perform()?;
305       Ok(serde_json::to_string(&res)?)
306     }
307     UserOperation::Register => {
308       let register: Register = serde_json::from_str(data)?;
309       let res = Oper::new(user_operation, register).perform();
310       if res.is_ok() {
311         chat.check_rate_limit_register(msg.id)?;
312       }
313       Ok(serde_json::to_string(&res?)?)
314     }
315     UserOperation::GetUserDetails => {
316       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
317       let res = Oper::new(user_operation, get_user_details).perform()?;
318       Ok(serde_json::to_string(&res)?)
319     }
320     UserOperation::SaveUserSettings => {
321       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
322       let res = Oper::new(user_operation, save_user_settings).perform()?;
323       Ok(serde_json::to_string(&res)?)
324     }
325     UserOperation::AddAdmin => {
326       let add_admin: AddAdmin = serde_json::from_str(data)?;
327       let res = Oper::new(user_operation, add_admin).perform()?;
328       Ok(serde_json::to_string(&res)?)
329     }
330     UserOperation::BanUser => {
331       let ban_user: BanUser = serde_json::from_str(data)?;
332       let res = Oper::new(user_operation, ban_user).perform()?;
333       Ok(serde_json::to_string(&res)?)
334     }
335     UserOperation::GetReplies => {
336       let get_replies: GetReplies = serde_json::from_str(data)?;
337       let res = Oper::new(user_operation, get_replies).perform()?;
338       Ok(serde_json::to_string(&res)?)
339     }
340     UserOperation::GetUserMentions => {
341       let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
342       let res = Oper::new(user_operation, get_user_mentions).perform()?;
343       Ok(serde_json::to_string(&res)?)
344     }
345     UserOperation::EditUserMention => {
346       let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
347       let res = Oper::new(user_operation, edit_user_mention).perform()?;
348       Ok(serde_json::to_string(&res)?)
349     }
350     UserOperation::MarkAllAsRead => {
351       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
352       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
353       Ok(serde_json::to_string(&res)?)
354     }
355     UserOperation::GetCommunity => {
356       let mut get_community: GetCommunity = serde_json::from_str(data)?;
357       if Settings::get().federation_enabled && get_community.name.is_some() {
358         let name = &get_community.name.unwrap();
359         let remote_community = if name.contains("@") {
360           // TODO: need to support sort, filter etc for remote communities
361           get_remote_community(name.to_owned())?
362         } else {
363           get_community.name = Some(name.replace("!", ""));
364           Oper::new(user_operation, get_community).perform()?
365         };
366         Ok(serde_json::to_string(&remote_community)?)
367       } else {
368         let res = Oper::new(user_operation, get_community).perform()?;
369         Ok(serde_json::to_string(&res)?)
370       }
371     }
372     UserOperation::ListCommunities => {
373       if Settings::get().federation_enabled {
374         let res = get_all_communities()?;
375         let val = ListCommunitiesResponse {
376           op: UserOperation::ListCommunities.to_string(),
377           communities: res,
378         };
379         Ok(serde_json::to_string(&val)?)
380       } else {
381         let list_communities: ListCommunities = serde_json::from_str(data)?;
382         let res = Oper::new(user_operation, list_communities).perform()?;
383         Ok(serde_json::to_string(&res)?)
384       }
385     }
386     UserOperation::CreateCommunity => {
387       chat.check_rate_limit_register(msg.id)?;
388       let create_community: CreateCommunity = serde_json::from_str(data)?;
389       let res = Oper::new(user_operation, create_community).perform()?;
390       Ok(serde_json::to_string(&res)?)
391     }
392     UserOperation::EditCommunity => {
393       let edit_community: EditCommunity = serde_json::from_str(data)?;
394       let res = Oper::new(user_operation, edit_community).perform()?;
395       let mut community_sent: CommunityResponse = res.clone();
396       community_sent.community.user_id = None;
397       community_sent.community.subscribed = None;
398       let community_sent_str = serde_json::to_string(&community_sent)?;
399       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
400       Ok(serde_json::to_string(&res)?)
401     }
402     UserOperation::FollowCommunity => {
403       let follow_community: FollowCommunity = serde_json::from_str(data)?;
404       let res = Oper::new(user_operation, follow_community).perform()?;
405       Ok(serde_json::to_string(&res)?)
406     }
407     UserOperation::GetFollowedCommunities => {
408       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
409       let res = Oper::new(user_operation, followed_communities).perform()?;
410       Ok(serde_json::to_string(&res)?)
411     }
412     UserOperation::BanFromCommunity => {
413       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
414       let community_id = ban_from_community.community_id;
415       let res = Oper::new(user_operation, ban_from_community).perform()?;
416       let res_str = serde_json::to_string(&res)?;
417       chat.send_community_message(&community_id, &res_str, msg.id)?;
418       Ok(res_str)
419     }
420     UserOperation::AddModToCommunity => {
421       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
422       let community_id = mod_add_to_community.community_id;
423       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
424       let res_str = serde_json::to_string(&res)?;
425       chat.send_community_message(&community_id, &res_str, msg.id)?;
426       Ok(res_str)
427     }
428     UserOperation::ListCategories => {
429       let list_categories: ListCategories = ListCategories;
430       let res = Oper::new(user_operation, list_categories).perform()?;
431       Ok(serde_json::to_string(&res)?)
432     }
433     UserOperation::CreatePost => {
434       chat.check_rate_limit_post(msg.id)?;
435       let create_post: CreatePost = serde_json::from_str(data)?;
436       let res = Oper::new(user_operation, create_post).perform()?;
437       Ok(serde_json::to_string(&res)?)
438     }
439     UserOperation::GetPost => {
440       let get_post: GetPost = serde_json::from_str(data)?;
441       chat.join_room(get_post.id, msg.id);
442       let res = Oper::new(user_operation, get_post).perform()?;
443       Ok(serde_json::to_string(&res)?)
444     }
445     UserOperation::GetPosts => {
446       let get_posts: GetPosts = serde_json::from_str(data)?;
447       let res = Oper::new(user_operation, get_posts).perform()?;
448       Ok(serde_json::to_string(&res)?)
449     }
450     UserOperation::CreatePostLike => {
451       chat.check_rate_limit_message(msg.id)?;
452       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
453       let res = Oper::new(user_operation, create_post_like).perform()?;
454       Ok(serde_json::to_string(&res)?)
455     }
456     UserOperation::EditPost => {
457       let edit_post: EditPost = serde_json::from_str(data)?;
458       let res = Oper::new(user_operation, edit_post).perform()?;
459       let mut post_sent = res.clone();
460       post_sent.post.my_vote = None;
461       let post_sent_str = serde_json::to_string(&post_sent)?;
462       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
463       Ok(serde_json::to_string(&res)?)
464     }
465     UserOperation::SavePost => {
466       let save_post: SavePost = serde_json::from_str(data)?;
467       let res = Oper::new(user_operation, save_post).perform()?;
468       Ok(serde_json::to_string(&res)?)
469     }
470     UserOperation::CreateComment => {
471       chat.check_rate_limit_message(msg.id)?;
472       let create_comment: CreateComment = serde_json::from_str(data)?;
473       let post_id = create_comment.post_id;
474       let res = Oper::new(user_operation, create_comment).perform()?;
475       let mut comment_sent = res.clone();
476       comment_sent.comment.my_vote = None;
477       comment_sent.comment.user_id = None;
478       let comment_sent_str = serde_json::to_string(&comment_sent)?;
479       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
480       Ok(serde_json::to_string(&res)?)
481     }
482     UserOperation::EditComment => {
483       let edit_comment: EditComment = serde_json::from_str(data)?;
484       let post_id = edit_comment.post_id;
485       let res = Oper::new(user_operation, edit_comment).perform()?;
486       let mut comment_sent = res.clone();
487       comment_sent.comment.my_vote = None;
488       comment_sent.comment.user_id = None;
489       let comment_sent_str = serde_json::to_string(&comment_sent)?;
490       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
491       Ok(serde_json::to_string(&res)?)
492     }
493     UserOperation::SaveComment => {
494       let save_comment: SaveComment = serde_json::from_str(data)?;
495       let res = Oper::new(user_operation, save_comment).perform()?;
496       Ok(serde_json::to_string(&res)?)
497     }
498     UserOperation::CreateCommentLike => {
499       chat.check_rate_limit_message(msg.id)?;
500       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
501       let post_id = create_comment_like.post_id;
502       let res = Oper::new(user_operation, create_comment_like).perform()?;
503       let mut comment_sent = res.clone();
504       comment_sent.comment.my_vote = None;
505       comment_sent.comment.user_id = None;
506       let comment_sent_str = serde_json::to_string(&comment_sent)?;
507       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
508       Ok(serde_json::to_string(&res)?)
509     }
510     UserOperation::GetModlog => {
511       let get_modlog: GetModlog = serde_json::from_str(data)?;
512       let res = Oper::new(user_operation, get_modlog).perform()?;
513       Ok(serde_json::to_string(&res)?)
514     }
515     UserOperation::CreateSite => {
516       let create_site: CreateSite = serde_json::from_str(data)?;
517       let res = Oper::new(user_operation, create_site).perform()?;
518       Ok(serde_json::to_string(&res)?)
519     }
520     UserOperation::EditSite => {
521       let edit_site: EditSite = serde_json::from_str(data)?;
522       let res = Oper::new(user_operation, edit_site).perform()?;
523       Ok(serde_json::to_string(&res)?)
524     }
525     UserOperation::GetSite => {
526       let online: usize = chat.sessions.len();
527       let get_site: GetSite = serde_json::from_str(data)?;
528       let mut res = Oper::new(user_operation, get_site).perform()?;
529       res.online = online;
530       Ok(serde_json::to_string(&res)?)
531     }
532     UserOperation::Search => {
533       let search: Search = serde_json::from_str(data)?;
534       let res = Oper::new(user_operation, search).perform()?;
535       Ok(serde_json::to_string(&res)?)
536     }
537     UserOperation::TransferCommunity => {
538       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
539       let res = Oper::new(user_operation, transfer_community).perform()?;
540       Ok(serde_json::to_string(&res)?)
541     }
542     UserOperation::TransferSite => {
543       let transfer_site: TransferSite = serde_json::from_str(data)?;
544       let res = Oper::new(user_operation, transfer_site).perform()?;
545       Ok(serde_json::to_string(&res)?)
546     }
547     UserOperation::DeleteAccount => {
548       let delete_account: DeleteAccount = serde_json::from_str(data)?;
549       let res = Oper::new(user_operation, delete_account).perform()?;
550       Ok(serde_json::to_string(&res)?)
551     }
552     UserOperation::PasswordReset => {
553       let password_reset: PasswordReset = serde_json::from_str(data)?;
554       let res = Oper::new(user_operation, password_reset).perform()?;
555       Ok(serde_json::to_string(&res)?)
556     }
557     UserOperation::PasswordChange => {
558       let password_change: PasswordChange = serde_json::from_str(data)?;
559       let res = Oper::new(user_operation, password_change).perform()?;
560       Ok(serde_json::to_string(&res)?)
561     }
562   }
563 }