]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
Trying to add r2d2 connection pooling to websockets.
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15
16 use crate::api::comment::*;
17 use crate::api::community::*;
18 use crate::api::post::*;
19 use crate::api::site::*;
20 use crate::api::user::*;
21 use crate::api::*;
22 use crate::Settings;
23
24 /// Chat server sends this messages to session
25 #[derive(Message)]
26 #[rtype(result = "()")]
27 pub struct WSMessage(pub String);
28
29 /// Message for chat server communications
30
31 /// New chat session is created
32 #[derive(Message)]
33 #[rtype(usize)]
34 pub struct Connect {
35   pub addr: Recipient<WSMessage>,
36   pub ip: String,
37 }
38
39 /// Session is disconnected
40 #[derive(Message)]
41 #[rtype(result = "()")]
42 pub struct Disconnect {
43   pub id: usize,
44   pub ip: String,
45 }
46
47 // TODO this is unused rn
48 /// Send message to specific room
49 #[derive(Message)]
50 #[rtype(result = "()")]
51 pub struct ClientMessage {
52   /// Id of the client session
53   pub id: usize,
54   /// Peer message
55   pub msg: String,
56   /// Room name
57   pub room: String,
58 }
59
60 #[derive(Serialize, Deserialize, Message)]
61 #[rtype(String)]
62 pub struct StandardMessage {
63   /// Id of the client session
64   pub id: usize,
65   /// Peer message
66   pub msg: String,
67 }
68
69 #[derive(Debug)]
70 pub struct RateLimitBucket {
71   last_checked: SystemTime,
72   allowance: f64,
73 }
74
75 pub struct SessionInfo {
76   pub addr: Recipient<WSMessage>,
77   pub ip: String,
78 }
79
80 /// `ChatServer` manages chat rooms and responsible for coordinating chat
81 /// session. implementation is super primitive
82 pub struct ChatServer {
83   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
84   rate_limits: HashMap<String, RateLimitBucket>,
85   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
86   rng: ThreadRng,
87   db: Pool<ConnectionManager<PgConnection>>,
88 }
89
90 // impl Default for ChatServer {
91 //   fn default(nah: String) -> ChatServer {
92 //     // default room
93 //     let rooms = HashMap::new();
94
95 //     ChatServer {
96 //       sessions: HashMap::new(),
97 //       rate_limits: HashMap::new(),
98 //       rooms,
99 //       rng: rand::thread_rng(),
100 //       nah: nah,
101 //     }
102 //   }
103 // }
104
105 impl ChatServer {
106   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
107     // default room
108     let rooms = HashMap::new();
109
110     ChatServer {
111       sessions: HashMap::new(),
112       rate_limits: HashMap::new(),
113       rooms,
114       rng: rand::thread_rng(),
115       db,
116     }
117   }
118
119   /// Send message to all users in the room
120   fn send_room_message(&self, room: i32, message: &str, skip_id: usize) {
121     if let Some(sessions) = self.rooms.get(&room) {
122       for id in sessions {
123         if *id != skip_id {
124           if let Some(info) = self.sessions.get(id) {
125             let _ = info.addr.do_send(WSMessage(message.to_owned()));
126           }
127         }
128       }
129     }
130   }
131
132   fn join_room(&mut self, room_id: i32, id: usize) {
133     // remove session from all rooms
134     for sessions in self.rooms.values_mut() {
135       sessions.remove(&id);
136     }
137
138     // If the room doesn't exist yet
139     if self.rooms.get_mut(&room_id).is_none() {
140       self.rooms.insert(room_id, HashSet::new());
141     }
142
143     self.rooms.get_mut(&room_id).unwrap().insert(id);
144   }
145
146   fn send_community_message(
147     &self,
148     community_id: i32,
149     message: &str,
150     skip_id: usize,
151   ) -> Result<(), Error> {
152     use crate::db::post_view::*;
153     use crate::db::*;
154
155     let conn = self.db.get()?;
156
157     let posts = PostQueryBuilder::create(&conn)
158       .listing_type(ListingType::Community)
159       .sort(&SortType::New)
160       .for_community_id(community_id)
161       .limit(9999)
162       .list()?;
163
164     for post in posts {
165       self.send_room_message(post.id, message, skip_id);
166     }
167
168     Ok(())
169   }
170
171   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
172     self.check_rate_limit_full(
173       id,
174       Settings::get().rate_limit.register,
175       Settings::get().rate_limit.register_per_second,
176     )
177   }
178
179   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
180     self.check_rate_limit_full(
181       id,
182       Settings::get().rate_limit.post,
183       Settings::get().rate_limit.post_per_second,
184     )
185   }
186
187   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
188     self.check_rate_limit_full(
189       id,
190       Settings::get().rate_limit.message,
191       Settings::get().rate_limit.message_per_second,
192     )
193   }
194
195   #[allow(clippy::float_cmp)]
196   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
197     if let Some(info) = self.sessions.get(&id) {
198       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
199         // The initial value
200         if rate_limit.allowance == -2f64 {
201           rate_limit.allowance = rate as f64;
202         };
203
204         let current = SystemTime::now();
205         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
206         rate_limit.last_checked = current;
207         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
208         if rate_limit.allowance > rate as f64 {
209           rate_limit.allowance = rate as f64;
210         }
211
212         if rate_limit.allowance < 1.0 {
213           println!(
214             "Rate limited IP: {}, time_passed: {}, allowance: {}",
215             &info.ip, time_passed, rate_limit.allowance
216           );
217           Err(
218             APIError {
219               op: "Rate Limit".to_string(),
220               message: format!("Too many requests. {} per {} seconds", rate, per),
221             }
222             .into(),
223           )
224         } else {
225           rate_limit.allowance -= 1.0;
226           Ok(())
227         }
228       } else {
229         Ok(())
230       }
231     } else {
232       Ok(())
233     }
234   }
235 }
236
237 /// Make actor from `ChatServer`
238 impl Actor for ChatServer {
239   /// We are going to use simple Context, we just need ability to communicate
240   /// with other actors.
241   type Context = Context<Self>;
242 }
243
244 /// Handler for Connect message.
245 ///
246 /// Register new session and assign unique id to this session
247 impl Handler<Connect> for ChatServer {
248   type Result = usize;
249
250   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
251     // notify all users in same room
252     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
253
254     // register session with random id
255     let id = self.rng.gen::<usize>();
256     println!("{} joined", &msg.ip);
257
258     self.sessions.insert(
259       id,
260       SessionInfo {
261         addr: msg.addr,
262         ip: msg.ip.to_owned(),
263       },
264     );
265
266     if self.rate_limits.get(&msg.ip).is_none() {
267       self.rate_limits.insert(
268         msg.ip,
269         RateLimitBucket {
270           last_checked: SystemTime::now(),
271           allowance: -2f64,
272         },
273       );
274     }
275
276     id
277   }
278 }
279
280 /// Handler for Disconnect message.
281 impl Handler<Disconnect> for ChatServer {
282   type Result = ();
283
284   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
285     // let mut rooms: Vec<i32> = Vec::new();
286
287     // remove address
288     if self.sessions.remove(&msg.id).is_some() {
289       // remove session from all rooms
290       for sessions in self.rooms.values_mut() {
291         if sessions.remove(&msg.id) {
292           // rooms.push(*id);
293         }
294       }
295     }
296   }
297 }
298
299 /// Handler for Message message.
300 impl Handler<StandardMessage> for ChatServer {
301   type Result = MessageResult<StandardMessage>;
302
303   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
304     let msg_out = match parse_json_message(self, msg) {
305       Ok(m) => m,
306       Err(e) => e.to_string(),
307     };
308
309     MessageResult(msg_out)
310   }
311 }
312
313 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
314   let json: Value = serde_json::from_str(&msg.msg)?;
315   let data = &json["data"].to_string();
316   let op = &json["op"].as_str().ok_or(APIError {
317     op: "Unknown op type".to_string(),
318     message: "Unknown op type".to_string(),
319   })?;
320
321   let conn = chat.db.get()?;
322
323   let user_operation: UserOperation = UserOperation::from_str(&op)?;
324
325   match user_operation {
326     UserOperation::Login => {
327       let login: Login = serde_json::from_str(data)?;
328       let res = Oper::new(user_operation, login).perform(&conn)?;
329       Ok(serde_json::to_string(&res)?)
330     }
331     UserOperation::Register => {
332       let register: Register = serde_json::from_str(data)?;
333       let res = Oper::new(user_operation, register).perform(&conn);
334       if res.is_ok() {
335         chat.check_rate_limit_register(msg.id)?;
336       }
337       Ok(serde_json::to_string(&res?)?)
338     }
339     UserOperation::GetUserDetails => {
340       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
341       let res = Oper::new(user_operation, get_user_details).perform(&conn)?;
342       Ok(serde_json::to_string(&res)?)
343     }
344     UserOperation::SaveUserSettings => {
345       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
346       let res = Oper::new(user_operation, save_user_settings).perform(&conn)?;
347       Ok(serde_json::to_string(&res)?)
348     }
349     UserOperation::AddAdmin => {
350       let add_admin: AddAdmin = serde_json::from_str(data)?;
351       let res = Oper::new(user_operation, add_admin).perform(&conn)?;
352       Ok(serde_json::to_string(&res)?)
353     }
354     UserOperation::BanUser => {
355       let ban_user: BanUser = serde_json::from_str(data)?;
356       let res = Oper::new(user_operation, ban_user).perform(&conn)?;
357       Ok(serde_json::to_string(&res)?)
358     }
359     UserOperation::GetReplies => {
360       let get_replies: GetReplies = serde_json::from_str(data)?;
361       let res = Oper::new(user_operation, get_replies).perform(&conn)?;
362       Ok(serde_json::to_string(&res)?)
363     }
364     UserOperation::GetUserMentions => {
365       let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
366       let res = Oper::new(user_operation, get_user_mentions).perform(&conn)?;
367       Ok(serde_json::to_string(&res)?)
368     }
369     UserOperation::EditUserMention => {
370       let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
371       let res = Oper::new(user_operation, edit_user_mention).perform(&conn)?;
372       Ok(serde_json::to_string(&res)?)
373     }
374     UserOperation::MarkAllAsRead => {
375       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
376       let res = Oper::new(user_operation, mark_all_as_read).perform(&conn)?;
377       Ok(serde_json::to_string(&res)?)
378     }
379     UserOperation::GetCommunity => {
380       let get_community: GetCommunity = serde_json::from_str(data)?;
381       let res = Oper::new(user_operation, get_community).perform(&conn)?;
382       Ok(serde_json::to_string(&res)?)
383     }
384     UserOperation::ListCommunities => {
385       let list_communities: ListCommunities = serde_json::from_str(data)?;
386       let res = Oper::new(user_operation, list_communities).perform(&conn)?;
387       Ok(serde_json::to_string(&res)?)
388     }
389     UserOperation::CreateCommunity => {
390       chat.check_rate_limit_register(msg.id)?;
391       let create_community: CreateCommunity = serde_json::from_str(data)?;
392       let res = Oper::new(user_operation, create_community).perform(&conn)?;
393       Ok(serde_json::to_string(&res)?)
394     }
395     UserOperation::EditCommunity => {
396       let edit_community: EditCommunity = serde_json::from_str(data)?;
397       let res = Oper::new(user_operation, edit_community).perform(&conn)?;
398       let mut community_sent: CommunityResponse = res.clone();
399       community_sent.community.user_id = None;
400       community_sent.community.subscribed = None;
401       let community_sent_str = serde_json::to_string(&community_sent)?;
402       chat.send_community_message(community_sent.community.id, &community_sent_str, msg.id)?;
403       Ok(serde_json::to_string(&res)?)
404     }
405     UserOperation::FollowCommunity => {
406       let follow_community: FollowCommunity = serde_json::from_str(data)?;
407       let res = Oper::new(user_operation, follow_community).perform(&conn)?;
408       Ok(serde_json::to_string(&res)?)
409     }
410     UserOperation::GetFollowedCommunities => {
411       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
412       let res = Oper::new(user_operation, followed_communities).perform(&conn)?;
413       Ok(serde_json::to_string(&res)?)
414     }
415     UserOperation::BanFromCommunity => {
416       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
417       let community_id = ban_from_community.community_id;
418       let res = Oper::new(user_operation, ban_from_community).perform(&conn)?;
419       let res_str = serde_json::to_string(&res)?;
420       chat.send_community_message(community_id, &res_str, msg.id)?;
421       Ok(res_str)
422     }
423     UserOperation::AddModToCommunity => {
424       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
425       let community_id = mod_add_to_community.community_id;
426       let res = Oper::new(user_operation, mod_add_to_community).perform(&conn)?;
427       let res_str = serde_json::to_string(&res)?;
428       chat.send_community_message(community_id, &res_str, msg.id)?;
429       Ok(res_str)
430     }
431     UserOperation::ListCategories => {
432       let list_categories: ListCategories = ListCategories;
433       let res = Oper::new(user_operation, list_categories).perform(&conn)?;
434       Ok(serde_json::to_string(&res)?)
435     }
436     UserOperation::CreatePost => {
437       chat.check_rate_limit_post(msg.id)?;
438       let create_post: CreatePost = serde_json::from_str(data)?;
439       let res = Oper::new(user_operation, create_post).perform(&conn)?;
440       Ok(serde_json::to_string(&res)?)
441     }
442     UserOperation::GetPost => {
443       let get_post: GetPost = serde_json::from_str(data)?;
444       chat.join_room(get_post.id, msg.id);
445       let res = Oper::new(user_operation, get_post).perform(&conn)?;
446       Ok(serde_json::to_string(&res)?)
447     }
448     UserOperation::GetPosts => {
449       let get_posts: GetPosts = serde_json::from_str(data)?;
450       let res = Oper::new(user_operation, get_posts).perform(&conn)?;
451       Ok(serde_json::to_string(&res)?)
452     }
453     UserOperation::CreatePostLike => {
454       chat.check_rate_limit_message(msg.id)?;
455       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
456       let res = Oper::new(user_operation, create_post_like).perform(&conn)?;
457       Ok(serde_json::to_string(&res)?)
458     }
459     UserOperation::EditPost => {
460       let edit_post: EditPost = serde_json::from_str(data)?;
461       let res = Oper::new(user_operation, edit_post).perform(&conn)?;
462       let mut post_sent = res.clone();
463       post_sent.post.my_vote = None;
464       let post_sent_str = serde_json::to_string(&post_sent)?;
465       chat.send_room_message(post_sent.post.id, &post_sent_str, msg.id);
466       Ok(serde_json::to_string(&res)?)
467     }
468     UserOperation::SavePost => {
469       let save_post: SavePost = serde_json::from_str(data)?;
470       let res = Oper::new(user_operation, save_post).perform(&conn)?;
471       Ok(serde_json::to_string(&res)?)
472     }
473     UserOperation::CreateComment => {
474       chat.check_rate_limit_message(msg.id)?;
475       let create_comment: CreateComment = serde_json::from_str(data)?;
476       let post_id = create_comment.post_id;
477       let res = Oper::new(user_operation, create_comment).perform(&conn)?;
478       let mut comment_sent = res.clone();
479       comment_sent.comment.my_vote = None;
480       comment_sent.comment.user_id = None;
481       let comment_sent_str = serde_json::to_string(&comment_sent)?;
482       chat.send_room_message(post_id, &comment_sent_str, msg.id);
483       Ok(serde_json::to_string(&res)?)
484     }
485     UserOperation::EditComment => {
486       let edit_comment: EditComment = serde_json::from_str(data)?;
487       let post_id = edit_comment.post_id;
488       let res = Oper::new(user_operation, edit_comment).perform(&conn)?;
489       let mut comment_sent = res.clone();
490       comment_sent.comment.my_vote = None;
491       comment_sent.comment.user_id = None;
492       let comment_sent_str = serde_json::to_string(&comment_sent)?;
493       chat.send_room_message(post_id, &comment_sent_str, msg.id);
494       Ok(serde_json::to_string(&res)?)
495     }
496     UserOperation::SaveComment => {
497       let save_comment: SaveComment = serde_json::from_str(data)?;
498       let res = Oper::new(user_operation, save_comment).perform(&conn)?;
499       Ok(serde_json::to_string(&res)?)
500     }
501     UserOperation::CreateCommentLike => {
502       chat.check_rate_limit_message(msg.id)?;
503       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
504       let post_id = create_comment_like.post_id;
505       let res = Oper::new(user_operation, create_comment_like).perform(&conn)?;
506       let mut comment_sent = res.clone();
507       comment_sent.comment.my_vote = None;
508       comment_sent.comment.user_id = None;
509       let comment_sent_str = serde_json::to_string(&comment_sent)?;
510       chat.send_room_message(post_id, &comment_sent_str, msg.id);
511       Ok(serde_json::to_string(&res)?)
512     }
513     UserOperation::GetModlog => {
514       let get_modlog: GetModlog = serde_json::from_str(data)?;
515       let res = Oper::new(user_operation, get_modlog).perform(&conn)?;
516       Ok(serde_json::to_string(&res)?)
517     }
518     UserOperation::CreateSite => {
519       let create_site: CreateSite = serde_json::from_str(data)?;
520       let res = Oper::new(user_operation, create_site).perform(&conn)?;
521       Ok(serde_json::to_string(&res)?)
522     }
523     UserOperation::EditSite => {
524       let edit_site: EditSite = serde_json::from_str(data)?;
525       let res = Oper::new(user_operation, edit_site).perform(&conn)?;
526       Ok(serde_json::to_string(&res)?)
527     }
528     UserOperation::GetSite => {
529       let online: usize = chat.sessions.len();
530       let get_site: GetSite = serde_json::from_str(data)?;
531       let mut res = Oper::new(user_operation, get_site).perform(&conn)?;
532       res.online = online;
533       Ok(serde_json::to_string(&res)?)
534     }
535     UserOperation::Search => {
536       let search: Search = serde_json::from_str(data)?;
537       let res = Oper::new(user_operation, search).perform(&conn)?;
538       Ok(serde_json::to_string(&res)?)
539     }
540     UserOperation::TransferCommunity => {
541       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
542       let res = Oper::new(user_operation, transfer_community).perform(&conn)?;
543       Ok(serde_json::to_string(&res)?)
544     }
545     UserOperation::TransferSite => {
546       let transfer_site: TransferSite = serde_json::from_str(data)?;
547       let res = Oper::new(user_operation, transfer_site).perform(&conn)?;
548       Ok(serde_json::to_string(&res)?)
549     }
550     UserOperation::DeleteAccount => {
551       let delete_account: DeleteAccount = serde_json::from_str(data)?;
552       let res = Oper::new(user_operation, delete_account).perform(&conn)?;
553       Ok(serde_json::to_string(&res)?)
554     }
555     UserOperation::PasswordReset => {
556       let password_reset: PasswordReset = serde_json::from_str(data)?;
557       let res = Oper::new(user_operation, password_reset).perform(&conn)?;
558       Ok(serde_json::to_string(&res)?)
559     }
560     UserOperation::PasswordChange => {
561       let password_change: PasswordChange = serde_json::from_str(data)?;
562       let res = Oper::new(user_operation, password_change).perform(&conn)?;
563       Ok(serde_json::to_string(&res)?)
564     }
565   }
566 }