]> Untitled Git - lemmy.git/blob - server/src/websocket/server.rs
small fix
[lemmy.git] / server / src / websocket / server.rs
1 //! `ChatServer` is an actor. It maintains list of connection client session.
2 //! And manages available rooms. Peers send messages to other peers in same
3 //! room through `ChatServer`.
4
5 use actix::prelude::*;
6 use diesel::r2d2::{ConnectionManager, Pool};
7 use diesel::PgConnection;
8 use failure::Error;
9 use rand::{rngs::ThreadRng, Rng};
10 use serde::{Deserialize, Serialize};
11 use serde_json::Value;
12 use std::collections::{HashMap, HashSet};
13 use std::str::FromStr;
14 use std::time::SystemTime;
15
16 use crate::api::comment::*;
17 use crate::api::community::*;
18 use crate::api::post::*;
19 use crate::api::site::*;
20 use crate::api::user::*;
21 use crate::api::*;
22 use crate::websocket::UserOperation;
23 use crate::Settings;
24
25 /// Chat server sends this messages to session
26 #[derive(Message)]
27 #[rtype(result = "()")]
28 pub struct WSMessage(pub String);
29
30 /// Message for chat server communications
31
32 /// New chat session is created
33 #[derive(Message)]
34 #[rtype(usize)]
35 pub struct Connect {
36   pub addr: Recipient<WSMessage>,
37   pub ip: String,
38 }
39
40 /// Session is disconnected
41 #[derive(Message)]
42 #[rtype(result = "()")]
43 pub struct Disconnect {
44   pub id: usize,
45   pub ip: String,
46 }
47
48 // TODO this is unused rn
49 /// Send message to specific room
50 #[derive(Message)]
51 #[rtype(result = "()")]
52 pub struct ClientMessage {
53   /// Id of the client session
54   pub id: usize,
55   /// Peer message
56   pub msg: String,
57   /// Room name
58   pub room: String,
59 }
60
61 #[derive(Serialize, Deserialize, Message)]
62 #[rtype(String)]
63 pub struct StandardMessage {
64   /// Id of the client session
65   pub id: usize,
66   /// Peer message
67   pub msg: String,
68 }
69
70 #[derive(Debug)]
71 pub struct RateLimitBucket {
72   last_checked: SystemTime,
73   allowance: f64,
74 }
75
76 pub struct SessionInfo {
77   pub addr: Recipient<WSMessage>,
78   pub ip: String,
79 }
80
81 /// `ChatServer` manages chat rooms and responsible for coordinating chat
82 /// session. implementation is super primitive
83 pub struct ChatServer {
84   sessions: HashMap<usize, SessionInfo>, // A map from generated random ID to session addr
85   rate_limits: HashMap<String, RateLimitBucket>,
86   rooms: HashMap<i32, HashSet<usize>>, // A map from room / post name to set of connectionIDs
87   rng: ThreadRng,
88   db: Pool<ConnectionManager<PgConnection>>,
89 }
90
91 impl ChatServer {
92   pub fn startup(db: Pool<ConnectionManager<PgConnection>>) -> ChatServer {
93     // default room
94     let rooms = HashMap::new();
95
96     ChatServer {
97       sessions: HashMap::new(),
98       rate_limits: HashMap::new(),
99       rooms,
100       rng: rand::thread_rng(),
101       db,
102     }
103   }
104
105   /// Send message to all users in the room
106   fn send_room_message(&self, room: i32, message: &str, skip_id: usize) {
107     if let Some(sessions) = self.rooms.get(&room) {
108       for id in sessions {
109         if *id != skip_id {
110           if let Some(info) = self.sessions.get(id) {
111             let _ = info.addr.do_send(WSMessage(message.to_owned()));
112           }
113         }
114       }
115     }
116   }
117
118   fn join_room(&mut self, room_id: i32, id: usize) {
119     // remove session from all rooms
120     for sessions in self.rooms.values_mut() {
121       sessions.remove(&id);
122     }
123
124     // If the room doesn't exist yet
125     if self.rooms.get_mut(&room_id).is_none() {
126       self.rooms.insert(room_id, HashSet::new());
127     }
128
129     self.rooms.get_mut(&room_id).unwrap().insert(id);
130   }
131
132   fn send_community_message(
133     &self,
134     community_id: i32,
135     message: &str,
136     skip_id: usize,
137   ) -> Result<(), Error> {
138     use crate::db::post_view::*;
139     use crate::db::*;
140
141     let conn = self.db.get()?;
142
143     let posts = PostQueryBuilder::create(&conn)
144       .listing_type(ListingType::Community)
145       .sort(&SortType::New)
146       .for_community_id(community_id)
147       .limit(9999)
148       .list()?;
149
150     for post in posts {
151       self.send_room_message(post.id, message, skip_id);
152     }
153
154     Ok(())
155   }
156
157   fn check_rate_limit_register(&mut self, id: usize) -> Result<(), Error> {
158     self.check_rate_limit_full(
159       id,
160       Settings::get().rate_limit.register,
161       Settings::get().rate_limit.register_per_second,
162     )
163   }
164
165   fn check_rate_limit_post(&mut self, id: usize) -> Result<(), Error> {
166     self.check_rate_limit_full(
167       id,
168       Settings::get().rate_limit.post,
169       Settings::get().rate_limit.post_per_second,
170     )
171   }
172
173   fn check_rate_limit_message(&mut self, id: usize) -> Result<(), Error> {
174     self.check_rate_limit_full(
175       id,
176       Settings::get().rate_limit.message,
177       Settings::get().rate_limit.message_per_second,
178     )
179   }
180
181   #[allow(clippy::float_cmp)]
182   fn check_rate_limit_full(&mut self, id: usize, rate: i32, per: i32) -> Result<(), Error> {
183     if let Some(info) = self.sessions.get(&id) {
184       if let Some(rate_limit) = self.rate_limits.get_mut(&info.ip) {
185         // The initial value
186         if rate_limit.allowance == -2f64 {
187           rate_limit.allowance = rate as f64;
188         };
189
190         let current = SystemTime::now();
191         let time_passed = current.duration_since(rate_limit.last_checked)?.as_secs() as f64;
192         rate_limit.last_checked = current;
193         rate_limit.allowance += time_passed * (rate as f64 / per as f64);
194         if rate_limit.allowance > rate as f64 {
195           rate_limit.allowance = rate as f64;
196         }
197
198         if rate_limit.allowance < 1.0 {
199           println!(
200             "Rate limited IP: {}, time_passed: {}, allowance: {}",
201             &info.ip, time_passed, rate_limit.allowance
202           );
203           Err(
204             APIError {
205               message: format!("Too many requests. {} per {} seconds", rate, per),
206             }
207             .into(),
208           )
209         } else {
210           rate_limit.allowance -= 1.0;
211           Ok(())
212         }
213       } else {
214         Ok(())
215       }
216     } else {
217       Ok(())
218     }
219   }
220 }
221
222 /// Make actor from `ChatServer`
223 impl Actor for ChatServer {
224   /// We are going to use simple Context, we just need ability to communicate
225   /// with other actors.
226   type Context = Context<Self>;
227 }
228
229 /// Handler for Connect message.
230 ///
231 /// Register new session and assign unique id to this session
232 impl Handler<Connect> for ChatServer {
233   type Result = usize;
234
235   fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
236     // notify all users in same room
237     // self.send_room_message(&"Main".to_owned(), "Someone joined", 0);
238
239     // register session with random id
240     let id = self.rng.gen::<usize>();
241     println!("{} joined", &msg.ip);
242
243     self.sessions.insert(
244       id,
245       SessionInfo {
246         addr: msg.addr,
247         ip: msg.ip.to_owned(),
248       },
249     );
250
251     if self.rate_limits.get(&msg.ip).is_none() {
252       self.rate_limits.insert(
253         msg.ip,
254         RateLimitBucket {
255           last_checked: SystemTime::now(),
256           allowance: -2f64,
257         },
258       );
259     }
260
261     id
262   }
263 }
264
265 /// Handler for Disconnect message.
266 impl Handler<Disconnect> for ChatServer {
267   type Result = ();
268
269   fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
270     // let mut rooms: Vec<i32> = Vec::new();
271
272     // remove address
273     if self.sessions.remove(&msg.id).is_some() {
274       // remove session from all rooms
275       for sessions in self.rooms.values_mut() {
276         if sessions.remove(&msg.id) {
277           // rooms.push(*id);
278         }
279       }
280     }
281   }
282 }
283
284 /// Handler for Message message.
285 impl Handler<StandardMessage> for ChatServer {
286   type Result = MessageResult<StandardMessage>;
287
288   fn handle(&mut self, msg: StandardMessage, _: &mut Context<Self>) -> Self::Result {
289     let msg_out = match parse_json_message(self, msg) {
290       Ok(m) => m,
291       Err(e) => e.to_string(),
292     };
293
294     MessageResult(msg_out)
295   }
296 }
297
298 fn to_json_string<T>(op: &UserOperation, data: T) -> Result<String, Error>
299 where
300   T: Serialize,
301 {
302   let mut json = serde_json::to_value(&data)?;
303   match json.as_object_mut() {
304     Some(j) => j.insert("op".to_string(), serde_json::to_value(op.to_string())?),
305     None => return Err(format_err!("")),
306   };
307   // TODO: it seems like this is never called?
308   let x = serde_json::to_string(&json)?;
309   Ok(x)
310 }
311
312 fn parse_json_message(chat: &mut ChatServer, msg: StandardMessage) -> Result<String, Error> {
313   let json: Value = serde_json::from_str(&msg.msg)?;
314   let data = &json["data"].to_string();
315   let op = &json["op"].as_str().ok_or(APIError {
316     message: "Unknown op type".to_string(),
317   })?;
318
319   let conn = chat.db.get()?;
320
321   let user_operation: UserOperation = UserOperation::from_str(&op)?;
322
323   match user_operation {
324     UserOperation::Login => {
325       let login: Login = serde_json::from_str(data)?;
326       let res = Oper::new(login).perform(&conn)?;
327       to_json_string(&user_operation, &res)
328     }
329     UserOperation::Register => {
330       let register: Register = serde_json::from_str(data)?;
331       let res = Oper::new(register).perform(&conn);
332       if res.is_ok() {
333         chat.check_rate_limit_register(msg.id)?;
334       }
335       to_json_string(&user_operation, &res?)
336     }
337     UserOperation::GetUserDetails => {
338       let get_user_details: GetUserDetails = serde_json::from_str(data)?;
339       let res = Oper::new(get_user_details).perform(&conn)?;
340       to_json_string(&user_operation, &res)
341     }
342     UserOperation::SaveUserSettings => {
343       let save_user_settings: SaveUserSettings = serde_json::from_str(data)?;
344       let res = Oper::new(save_user_settings).perform(&conn)?;
345       to_json_string(&user_operation, &res)
346     }
347     UserOperation::AddAdmin => {
348       let add_admin: AddAdmin = serde_json::from_str(data)?;
349       let res = Oper::new(add_admin).perform(&conn)?;
350       to_json_string(&user_operation, &res)
351     }
352     UserOperation::BanUser => {
353       let ban_user: BanUser = serde_json::from_str(data)?;
354       let res = Oper::new(ban_user).perform(&conn)?;
355       to_json_string(&user_operation, &res)
356     }
357     UserOperation::GetReplies => {
358       let get_replies: GetReplies = serde_json::from_str(data)?;
359       let res = Oper::new(get_replies).perform(&conn)?;
360       to_json_string(&user_operation, &res)
361     }
362     UserOperation::GetUserMentions => {
363       let get_user_mentions: GetUserMentions = serde_json::from_str(data)?;
364       let res = Oper::new(get_user_mentions).perform(&conn)?;
365       to_json_string(&user_operation, &res)
366     }
367     UserOperation::EditUserMention => {
368       let edit_user_mention: EditUserMention = serde_json::from_str(data)?;
369       let res = Oper::new(edit_user_mention).perform(&conn)?;
370       to_json_string(&user_operation, &res)
371     }
372     UserOperation::MarkAllAsRead => {
373       let mark_all_as_read: MarkAllAsRead = serde_json::from_str(data)?;
374       let res = Oper::new(mark_all_as_read).perform(&conn)?;
375       to_json_string(&user_operation, &res)
376     }
377     UserOperation::GetCommunity => {
378       let get_community: GetCommunity = serde_json::from_str(data)?;
379       let res = Oper::new(get_community).perform(&conn)?;
380       to_json_string(&user_operation, &res)
381     }
382     UserOperation::ListCommunities => {
383       let list_communities: ListCommunities = serde_json::from_str(data)?;
384       let res = Oper::new(list_communities).perform(&conn)?;
385       to_json_string(&user_operation, &res)
386     }
387     UserOperation::CreateCommunity => {
388       chat.check_rate_limit_register(msg.id)?;
389       let create_community: CreateCommunity = serde_json::from_str(data)?;
390       let res = Oper::new(create_community).perform(&conn)?;
391       to_json_string(&user_operation, &res)
392     }
393     UserOperation::EditCommunity => {
394       let edit_community: EditCommunity = serde_json::from_str(data)?;
395       let res = Oper::new(edit_community).perform(&conn)?;
396       let mut community_sent: CommunityResponse = res.clone();
397       community_sent.community.user_id = None;
398       community_sent.community.subscribed = None;
399       let community_sent_str = to_json_string(&user_operation, &community_sent)?;
400       chat.send_community_message(community_sent.community.id, &community_sent_str, msg.id)?;
401       to_json_string(&user_operation, &res)
402     }
403     UserOperation::FollowCommunity => {
404       let follow_community: FollowCommunity = serde_json::from_str(data)?;
405       let res = Oper::new(follow_community).perform(&conn)?;
406       to_json_string(&user_operation, &res)
407     }
408     UserOperation::GetFollowedCommunities => {
409       let followed_communities: GetFollowedCommunities = serde_json::from_str(data)?;
410       let res = Oper::new(followed_communities).perform(&conn)?;
411       to_json_string(&user_operation, &res)
412     }
413     UserOperation::BanFromCommunity => {
414       let ban_from_community: BanFromCommunity = serde_json::from_str(data)?;
415       let community_id = ban_from_community.community_id;
416       let res = Oper::new(ban_from_community).perform(&conn)?;
417       let res_str = to_json_string(&user_operation, &res)?;
418       chat.send_community_message(community_id, &res_str, msg.id)?;
419       Ok(res_str)
420     }
421     UserOperation::AddModToCommunity => {
422       let mod_add_to_community: AddModToCommunity = serde_json::from_str(data)?;
423       let community_id = mod_add_to_community.community_id;
424       let res = Oper::new(mod_add_to_community).perform(&conn)?;
425       let res_str = to_json_string(&user_operation, &res)?;
426       chat.send_community_message(community_id, &res_str, msg.id)?;
427       Ok(res_str)
428     }
429     UserOperation::ListCategories => {
430       let list_categories: ListCategories = ListCategories;
431       let res = Oper::new(list_categories).perform(&conn)?;
432       to_json_string(&user_operation, &res)
433     }
434     UserOperation::CreatePost => {
435       chat.check_rate_limit_post(msg.id)?;
436       let create_post: CreatePost = serde_json::from_str(data)?;
437       let res = Oper::new(create_post).perform(&conn)?;
438       to_json_string(&user_operation, &res)
439     }
440     UserOperation::GetPost => {
441       let get_post: GetPost = serde_json::from_str(data)?;
442       chat.join_room(get_post.id, msg.id);
443       let res = Oper::new(get_post).perform(&conn)?;
444       to_json_string(&user_operation, &res)
445     }
446     UserOperation::GetPosts => {
447       let get_posts: GetPosts = serde_json::from_str(data)?;
448       let res = Oper::new(get_posts).perform(&conn)?;
449       to_json_string(&user_operation, &res)
450     }
451     UserOperation::CreatePostLike => {
452       chat.check_rate_limit_message(msg.id)?;
453       let create_post_like: CreatePostLike = serde_json::from_str(data)?;
454       let res = Oper::new(create_post_like).perform(&conn)?;
455       to_json_string(&user_operation, &res)
456     }
457     UserOperation::EditPost => {
458       let edit_post: EditPost = serde_json::from_str(data)?;
459       let res = Oper::new(edit_post).perform(&conn)?;
460       let mut post_sent = res.clone();
461       post_sent.post.my_vote = None;
462       let post_sent_str = to_json_string(&user_operation, &post_sent)?;
463       chat.send_room_message(post_sent.post.id, &post_sent_str, msg.id);
464       to_json_string(&user_operation, &res)
465     }
466     UserOperation::SavePost => {
467       let save_post: SavePost = serde_json::from_str(data)?;
468       let res = Oper::new(save_post).perform(&conn)?;
469       to_json_string(&user_operation, &res)
470     }
471     UserOperation::CreateComment => {
472       chat.check_rate_limit_message(msg.id)?;
473       let create_comment: CreateComment = serde_json::from_str(data)?;
474       let post_id = create_comment.post_id;
475       let res = Oper::new(create_comment).perform(&conn)?;
476       let mut comment_sent = res.clone();
477       comment_sent.comment.my_vote = None;
478       comment_sent.comment.user_id = None;
479       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
480       chat.send_room_message(post_id, &comment_sent_str, msg.id);
481       to_json_string(&user_operation, &res)
482     }
483     UserOperation::EditComment => {
484       let edit_comment: EditComment = serde_json::from_str(data)?;
485       let post_id = edit_comment.post_id;
486       let res = Oper::new(edit_comment).perform(&conn)?;
487       let mut comment_sent = res.clone();
488       comment_sent.comment.my_vote = None;
489       comment_sent.comment.user_id = None;
490       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
491       chat.send_room_message(post_id, &comment_sent_str, msg.id);
492       to_json_string(&user_operation, &res)
493     }
494     UserOperation::SaveComment => {
495       let save_comment: SaveComment = serde_json::from_str(data)?;
496       let res = Oper::new(save_comment).perform(&conn)?;
497       to_json_string(&user_operation, &res)
498     }
499     UserOperation::CreateCommentLike => {
500       chat.check_rate_limit_message(msg.id)?;
501       let create_comment_like: CreateCommentLike = serde_json::from_str(data)?;
502       let post_id = create_comment_like.post_id;
503       let res = Oper::new(create_comment_like).perform(&conn)?;
504       let mut comment_sent = res.clone();
505       comment_sent.comment.my_vote = None;
506       comment_sent.comment.user_id = None;
507       let comment_sent_str = to_json_string(&user_operation, &comment_sent)?;
508       chat.send_room_message(post_id, &comment_sent_str, msg.id);
509       to_json_string(&user_operation, &res)
510     }
511     UserOperation::GetModlog => {
512       let get_modlog: GetModlog = serde_json::from_str(data)?;
513       let res = Oper::new(get_modlog).perform(&conn)?;
514       to_json_string(&user_operation, &res)
515     }
516     UserOperation::CreateSite => {
517       let create_site: CreateSite = serde_json::from_str(data)?;
518       let res = Oper::new(create_site).perform(&conn)?;
519       to_json_string(&user_operation, &res)
520     }
521     UserOperation::EditSite => {
522       let edit_site: EditSite = serde_json::from_str(data)?;
523       let res = Oper::new(edit_site).perform(&conn)?;
524       to_json_string(&user_operation, &res)
525     }
526     UserOperation::GetSite => {
527       let online: usize = chat.sessions.len();
528       let get_site: GetSite = serde_json::from_str(data)?;
529       let mut res = Oper::new(get_site).perform(&conn)?;
530       res.online = online;
531       to_json_string(&user_operation, &res)
532     }
533     UserOperation::Search => {
534       let search: Search = serde_json::from_str(data)?;
535       let res = Oper::new(search).perform(&conn)?;
536       to_json_string(&user_operation, &res)
537     }
538     UserOperation::TransferCommunity => {
539       let transfer_community: TransferCommunity = serde_json::from_str(data)?;
540       let res = Oper::new(transfer_community).perform(&conn)?;
541       to_json_string(&user_operation, &res)
542     }
543     UserOperation::TransferSite => {
544       let transfer_site: TransferSite = serde_json::from_str(data)?;
545       let res = Oper::new(transfer_site).perform(&conn)?;
546       to_json_string(&user_operation, &res)
547     }
548     UserOperation::DeleteAccount => {
549       let delete_account: DeleteAccount = serde_json::from_str(data)?;
550       let res = Oper::new(delete_account).perform(&conn)?;
551       to_json_string(&user_operation, &res)
552     }
553     UserOperation::PasswordReset => {
554       let password_reset: PasswordReset = serde_json::from_str(data)?;
555       let res = Oper::new(password_reset).perform(&conn)?;
556       to_json_string(&user_operation, &res)
557     }
558     UserOperation::PasswordChange => {
559       let password_change: PasswordChange = serde_json::from_str(data)?;
560       let res = Oper::new(password_change).perform(&conn)?;
561       to_json_string(&user_operation, &res)
562     }
563   }
564 }