]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Merge branch 'transfer_community' into dev
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use rand::{rngs::ThreadRng, Rng};
7 use std::collections::{HashMap, HashSet};
8 use serde::{Deserialize, Serialize};
9 use serde_json::{Value};
10 use std::str::FromStr;
11 use failure::Error;
12 use std::time::{SystemTime};
13
14 use crate::api::*;
15 use crate::api::user::*;
16 use crate::api::community::*;
17 use crate::api::post::*;
18 use crate::api::comment::*;
19 use crate::api::site::*;
20
21 const RATE_LIMIT_MESSAGES: i32 = 30;
22 const RATE_LIMIT_PER_SECOND: i32 = 60;
23 const RATE_LIMIT_REGISTER_MESSAGES: i32 = 3;
24 const RATE_LIMIT_REGISTER_PER_SECOND: i32 = 60*3;
25
26
27 /// Chat server sends this messages to session
28 #[derive(Message)]
29 pub struct WSMessage(pub String);
30
31 /// Message for chat server communications
32
33 /// New chat session is created
34 #[derive(Message)]
35 #[rtype(usize)]
36 pub struct Connect {
37   pub addr: Recipient<WSMessage>,
38   pub ip: String,
39 }
40
41 /// Session is disconnected
42 #[derive(Message)]
43 pub struct Disconnect {
44   pub id: usize,
45   pub ip: String,
46 }
47
48 /// Send message to specific room
49 #[derive(Message)]
50 pub struct ClientMessage {
51   /// Id of the client session
52   pub id: usize,
53   /// Peer message
54   pub msg: String,
55   /// Room name
56   pub room: String,
57 }
58
59 #[derive(Serialize, Deserialize)]
60 pub struct StandardMessage {
61   /// Id of the client session
62   pub id: usize,
63   /// Peer message
64   pub msg: String,
65 }
66
67 impl actix::Message for StandardMessage {
68   type Result = String;
69 }
70
71 #[derive(Debug)]
72 pub struct RateLimitBucket {
73   last_checked: SystemTime,
74   allowance: f64
75 }
76
77 pub struct SessionInfo {
78   pub addr: Recipient<WSMessage>,
79   pub ip: String,
80 }
81
82 /// `ChatServer` manages chat rooms and responsible for coordinating chat
83 /// session. implementation is super primitive
84 pub struct ChatServer {
85   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
86   rate_limits: HashMap<String, RateLimitBucket>,
87   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
88   rng: ThreadRng,
89 }
90
91 impl Default for ChatServer {
92   fn default() -> ChatServer {
93     // default room
94     let rooms = HashMap::new();
95
96     ChatServer {
97       sessions: HashMap::new(),
98       rate_limits: HashMap::new(),
99       rooms: rooms,
100       rng: rand::thread_rng(),
101     }
102   }
103 }
104
105 impl ChatServer {
106   /// Send message to all users in the room
107   fn send_room_message(&self, room: &i32, message: &str, skip_id: usize) {
108     if let Some(sessions) = self.rooms.get(room) {
109       for id in sessions {
110         if *id != skip_id {
111           if let Some(info) = self.sessions.get(id) {
112             let _ = info.addr.do_send(WSMessage(message.to_owned()));
113           }
114         }
115       }
116     }
117   }
118
119   fn join_room(&mut self, room_id: i32, id: usize) {
120     // remove session from all rooms
121     for (_n, sessions) in &mut self.rooms {
122       sessions.remove(&id);
123     }
124
125     // If the room doesn't exist yet
126     if self.rooms.get_mut(&room_id).is_none() {
127       self.rooms.insert(room_id, HashSet::new());
128     }
129
130     &self.rooms.get_mut(&room_id).unwrap().insert(id);
131   }
132
133   fn send_community_message(&self, community_id: &i32, message: &str, skip_id: usize) -> Result<(), Error> {
134     use crate::db::*;
135     use crate::db::post_view::*;
136     let conn = establish_connection();
137     let posts = PostView::list(
138       &conn,
139       PostListingType::Community, 
140       &SortType::New, 
141       Some(*community_id), 
142       None,
143       None, 
144       None,
145       None,
146       false,
147       false,
148       false,
149       None,
150       Some(9999))?;
151     for post in posts {
152       self.send_room_message(&post.id, message, skip_id);
153     }
154
155     Ok(())
156   }
157
158   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
159     self.check_rate_limit_full(id, RATE_LIMIT_REGISTER_MESSAGES, RATE_LIMIT_REGISTER_PER_SECOND)
160   }
161
162   fn check_rate_limit(&mut self, id: usize) -> Result<(), Error> {
163     self.check_rate_limit_full(id, RATE_LIMIT_MESSAGES, RATE_LIMIT_PER_SECOND)
164   }
165
166   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
167     if let Some(info) = self.sessions.get(&id) {
168       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
169         // The initial value
170         if rate_limit.allowance == -2f64 {
171           rate_limit.allowance = rate as f64;
172         };
173
174         let current = SystemTime::now();
175         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
176         rate_limit.last_checked = current;
177         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
178         if rate_limit.allowance > rate as f64 {
179           rate_limit.allowance = rate as f64;
180         }
181
182         if rate_limit.allowance < 1.0 {
183           println!("Rate limited IP: {}, time_passed: {}, allowance: {}", &info.ip, time_passed, rate_limit.allowance);
184           Err(APIError {
185             op: "Rate Limit".to_string(), 
186             message: format!("Too many requests. {} per {} seconds", rate, per),
187           })?
188         } else {
189           rate_limit.allowance -= 1.0;
190           Ok(())
191         }
192       } else {
193         Ok(())
194       }
195     } else {
196       Ok(())
197     }
198   }
199 }
200
201
202 /// Make actor from `ChatServer`
203 impl Actor for ChatServer {
204   /// We are going to use simple Context, we just need ability to communicate
205   /// with other actors.
206   type Context = Context<Self>;
207 }
208
209 /// Handler for Connect message.
210 ///
211 /// Register new session and assign unique id to this session
212 impl Handler<Connect> for ChatServer {
213   type Result = usize;
214
215   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
216
217     // notify all users in same room
218     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
219
220     // register session with random id
221     let id = self.rng.gen::<usize>();
222     println!("{} joined", &msg.ip);
223
224     self.sessions.insert(id, SessionInfo {
225       addr: msg.addr,
226       ip: msg.ip.to_owned(),
227     });
228
229     if self.rate_limits.get(&msg.ip).is_none() {
230       self.rate_limits.insert(msg.ip, RateLimitBucket {
231         last_checked: SystemTime::now(),
232         allowance: -2f64,
233       });
234     }
235
236     // for (k,v) in &self.rate_limits {
237     //   println!("{}: {:?}", k,v);
238     // }
239
240     // auto join session to Main room
241     // self.rooms.get_mut(&"Main".to_owned()).unwrap().insert(id);
242
243     // send id back
244     id
245   }
246 }
247
248 /// Handler for Disconnect message.
249 impl Handler<Disconnect> for ChatServer {
250   type Result = ();
251
252   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
253
254     // let mut rooms: Vec<i32> = Vec::new();
255
256     // remove address
257     if self.sessions.remove(&msg.id).is_some() {
258       // remove session from all rooms
259       for (_id, sessions) in &mut self.rooms {
260         if sessions.remove(&msg.id) {
261           // rooms.push(*id);
262         }
263       }
264     }
265   }
266 }
267
268 /// Handler for Message message.
269 impl Handler<StandardMessage> for ChatServer {
270   type Result = MessageResult<StandardMessage>;
271
272
273   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
274
275     let msg_out = match parse_json_message(self, msg) {
276       Ok(m) => m,
277       Err(e) => e.to_string()
278     };
279
280     MessageResult(msg_out)
281   }
282 }
283
284 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
285
286   let json: Value = serde_json::from_str(&msg.msg)?;
287   let data = &json["data"].to_string();
288   let op = &json["op"].as_str().unwrap();
289
290   let user_operation: UserOperation = UserOperation::from_str(&op)?;
291
292   match user_operation {
293     UserOperation::Login => {
294       let login: Login = serde_json::from_str(data)?;
295       let res = Oper::new(user_operation, login).perform()?;
296       Ok(serde_json::to_string(&res)?)
297     },
298     UserOperation::Register => {
299       chat.check_rate_limit_register(msg.id)?;
300       let register: Register = serde_json::from_str(data)?;
301       let res = Oper::new(user_operation, register).perform()?;
302       Ok(serde_json::to_string(&res)?)
303     },
304     UserOperation::GetUserDetails => {
305       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
306       let res = Oper::new(user_operation, get_user_details).perform()?;
307       Ok(serde_json::to_string(&res)?)
308     },
309     UserOperation::SaveUserSettings => {
310       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
311       let res = Oper::new(user_operation, save_user_settings).perform()?;
312       Ok(serde_json::to_string(&res)?)
313     },
314     UserOperation::AddAdmin => {
315       let add_admin: AddAdmin = serde_json::from_str(data)?;
316       let res = Oper::new(user_operation, add_admin).perform()?;
317       Ok(serde_json::to_string(&res)?)
318     },
319     UserOperation::BanUser => {
320       let ban_user: BanUser = serde_json::from_str(data)?;
321       let res = Oper::new(user_operation, ban_user).perform()?;
322       Ok(serde_json::to_string(&res)?)
323     },
324     UserOperation::GetReplies => {
325       let get_replies: GetReplies = serde_json::from_str(data)?;
326       let res = Oper::new(user_operation, get_replies).perform()?;
327       Ok(serde_json::to_string(&res)?)
328     },
329     UserOperation::MarkAllAsRead => {
330       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
331       let res = Oper::new(user_operation, mark_all_as_read).perform()?;
332       Ok(serde_json::to_string(&res)?)
333     },
334     UserOperation::GetCommunity => {
335       let get_community: GetCommunity = serde_json::from_str(data)?;
336       let res = Oper::new(user_operation, get_community).perform()?;
337       Ok(serde_json::to_string(&res)?)
338     },
339     UserOperation::ListCommunities => {
340       let list_communities: ListCommunities = serde_json::from_str(data)?;
341       let res = Oper::new(user_operation, list_communities).perform()?;
342       Ok(serde_json::to_string(&res)?)
343     },
344     UserOperation::CreateCommunity => {
345       chat.check_rate_limit_register(msg.id)?;
346       let create_community: CreateCommunity = serde_json::from_str(data)?;
347       let res = Oper::new(user_operation, create_community).perform()?;
348       Ok(serde_json::to_string(&res)?)
349     },
350     UserOperation::EditCommunity => {
351       let edit_community: EditCommunity = serde_json::from_str(data)?;
352       let res = Oper::new(user_operation, edit_community).perform()?;
353       let mut community_sent: CommunityResponse = res.clone();
354       community_sent.community.user_id = None;
355       community_sent.community.subscribed = None;
356       let community_sent_str = serde_json::to_string(&community_sent)?;
357       chat.send_community_message(&community_sent.community.id, &community_sent_str, msg.id)?;
358       Ok(serde_json::to_string(&res)?)
359     },
360     UserOperation::FollowCommunity => {
361       let follow_community: FollowCommunity = serde_json::from_str(data)?;
362       let res = Oper::new(user_operation, follow_community).perform()?;
363       Ok(serde_json::to_string(&res)?)
364     },
365     UserOperation::GetFollowedCommunities => {
366       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
367       let res = Oper::new(user_operation, followed_communities).perform()?;
368       Ok(serde_json::to_string(&res)?)
369     },
370     UserOperation::BanFromCommunity => {
371       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
372       let community_id = ban_from_community.community_id;
373       let res = Oper::new(user_operation, ban_from_community).perform()?;
374       let res_str = serde_json::to_string(&res)?;
375       chat.send_community_message(&community_id, &res_str, msg.id)?;
376       Ok(res_str)
377     },
378     UserOperation::AddModToCommunity => {
379       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
380       let community_id = mod_add_to_community.community_id;
381       let res = Oper::new(user_operation, mod_add_to_community).perform()?;
382       let res_str = serde_json::to_string(&res)?;
383       chat.send_community_message(&community_id, &res_str, msg.id)?;
384       Ok(res_str)
385     },
386     UserOperation::ListCategories => {
387       let list_categories: ListCategories = ListCategories;
388       let res = Oper::new(user_operation, list_categories).perform()?;
389       Ok(serde_json::to_string(&res)?)
390     },
391     UserOperation::CreatePost => {
392       chat.check_rate_limit_register(msg.id)?;
393       let create_post: CreatePost = serde_json::from_str(data)?;
394       let res = Oper::new(user_operation, create_post).perform()?;
395       Ok(serde_json::to_string(&res)?)
396     },
397     UserOperation::GetPost => {
398       let get_post: GetPost = serde_json::from_str(data)?;
399       chat.join_room(get_post.id, msg.id);
400       let res = Oper::new(user_operation, get_post).perform()?;
401       Ok(serde_json::to_string(&res)?)
402     },
403     UserOperation::GetPosts => {
404       let get_posts: GetPosts = serde_json::from_str(data)?;
405       let res = Oper::new(user_operation, get_posts).perform()?;
406       Ok(serde_json::to_string(&res)?)
407     },
408     UserOperation::CreatePostLike => {
409       chat.check_rate_limit(msg.id)?;
410       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
411       let res = Oper::new(user_operation, create_post_like).perform()?;
412       Ok(serde_json::to_string(&res)?)
413     },
414     UserOperation::EditPost => {
415       let edit_post: EditPost = serde_json::from_str(data)?;
416       let res = Oper::new(user_operation, edit_post).perform()?;
417       let mut post_sent = res.clone();
418       post_sent.post.my_vote = None;
419       let post_sent_str = serde_json::to_string(&post_sent)?;
420       chat.send_room_message(&post_sent.post.id, &post_sent_str, msg.id);
421       Ok(serde_json::to_string(&res)?)
422     },
423     UserOperation::SavePost => {
424       let save_post: SavePost = serde_json::from_str(data)?;
425       let res = Oper::new(user_operation, save_post).perform()?;
426       Ok(serde_json::to_string(&res)?)
427     },
428     UserOperation::CreateComment => {
429       chat.check_rate_limit(msg.id)?;
430       let create_comment: CreateComment = serde_json::from_str(data)?;
431       let post_id = create_comment.post_id;
432       let res = Oper::new(user_operation, create_comment).perform()?;
433       let mut comment_sent = res.clone();
434       comment_sent.comment.my_vote = None;
435       comment_sent.comment.user_id = None;
436       let comment_sent_str = serde_json::to_string(&comment_sent)?;
437       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
438       Ok(serde_json::to_string(&res)?)
439     },
440     UserOperation::EditComment => {
441       let edit_comment: EditComment = serde_json::from_str(data)?;
442       let post_id = edit_comment.post_id;
443       let res = Oper::new(user_operation, edit_comment).perform()?;
444       let mut comment_sent = res.clone();
445       comment_sent.comment.my_vote = None;
446       comment_sent.comment.user_id = None;
447       let comment_sent_str = serde_json::to_string(&comment_sent)?;
448       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
449       Ok(serde_json::to_string(&res)?)
450     },
451     UserOperation::SaveComment => {
452       let save_comment: SaveComment = serde_json::from_str(data)?;
453       let res = Oper::new(user_operation, save_comment).perform()?;
454       Ok(serde_json::to_string(&res)?)
455     },
456     UserOperation::CreateCommentLike => {
457       chat.check_rate_limit(msg.id)?;
458       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
459       let post_id = create_comment_like.post_id;
460       let res = Oper::new(user_operation, create_comment_like).perform()?;
461       let mut comment_sent = res.clone();
462       comment_sent.comment.my_vote = None;
463       comment_sent.comment.user_id = None;
464       let comment_sent_str = serde_json::to_string(&comment_sent)?;
465       chat.send_room_message(&post_id, &comment_sent_str, msg.id);
466       Ok(serde_json::to_string(&res)?)
467     },
468     UserOperation::GetModlog => {
469       let get_modlog: GetModlog = serde_json::from_str(data)?;
470       let res = Oper::new(user_operation, get_modlog).perform()?;
471       Ok(serde_json::to_string(&res)?)
472     },
473     UserOperation::CreateSite => {
474       let create_site: CreateSite = serde_json::from_str(data)?;
475       let res = Oper::new(user_operation, create_site).perform()?;
476       Ok(serde_json::to_string(&res)?)
477     },
478     UserOperation::EditSite => {
479       let edit_site: EditSite = serde_json::from_str(data)?;
480       let res = Oper::new(user_operation, edit_site).perform()?;
481       Ok(serde_json::to_string(&res)?)
482     },
483     UserOperation::GetSite => {
484       let get_site: GetSite = serde_json::from_str(data)?;
485       let res = Oper::new(user_operation, get_site).perform()?;
486       Ok(serde_json::to_string(&res)?)
487     },
488     UserOperation::Search => {
489       let search: Search = serde_json::from_str(data)?;
490       let res = Oper::new(user_operation, search).perform()?;
491       Ok(serde_json::to_string(&res)?)
492     },
493     UserOperation::TransferCommunity => {
494       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
495       let res = Oper::new(user_operation, transfer_community).perform()?;
496       Ok(serde_json::to_string(&res)?)
497     },
498     UserOperation::TransferSite => {
499       let transfer_site: TransferSite = serde_json::from_str(data)?;
500       let res = Oper::new(user_operation, transfer_site).perform()?;
501       Ok(serde_json::to_string(&res)?)
502     },
503   }
504 }