]> Untitled Git - lemmy.git/blob - crates/websocket/src/routes.rs
Check user accepted before sending jwt in password reset (fixes #2591) (#2597)
[lemmy.git] / crates / websocket / src / routes.rs
1 use crate::{
2   chat_server::ChatServer,
3   messages::{Connect, Disconnect, StandardMessage, WsMessage},
4   LemmyContext,
5 };
6 use actix::prelude::*;
7 use actix_web::{web, Error, HttpRequest, HttpResponse};
8 use actix_web_actors::ws;
9 use lemmy_utils::{rate_limit::RateLimitCell, utils::get_ip, ConnectionId, IpAddr};
10 use std::time::{Duration, Instant};
11 use tracing::{debug, error, info};
12
13 /// How often heartbeat pings are sent
14 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
15 /// How long before lack of client response causes a timeout
16 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
17
18 /// Entry point for our route
19 pub async fn chat_route(
20   req: HttpRequest,
21   stream: web::Payload,
22   context: web::Data<LemmyContext>,
23   rate_limiter: web::Data<RateLimitCell>,
24 ) -> Result<HttpResponse, Error> {
25   ws::start(
26     WsSession {
27       cs_addr: context.chat_server().clone(),
28       id: 0,
29       hb: Instant::now(),
30       ip: get_ip(&req.connection_info()),
31       rate_limiter: rate_limiter.as_ref().clone(),
32     },
33     &req,
34     stream,
35   )
36 }
37
38 struct WsSession {
39   cs_addr: Addr<ChatServer>,
40   /// unique session id
41   id: ConnectionId,
42   ip: IpAddr,
43   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
44   /// otherwise we drop connection.
45   hb: Instant,
46   /// A rate limiter for websocket joins
47   rate_limiter: RateLimitCell,
48 }
49
50 impl Actor for WsSession {
51   type Context = ws::WebsocketContext<Self>;
52
53   /// Method is called on actor start.
54   /// We register ws session with ChatServer
55   fn started(&mut self, ctx: &mut Self::Context) {
56     // we'll start heartbeat process on session start.
57     WsSession::hb(ctx);
58
59     // register self in chat server. `AsyncContext::wait` register
60     // future within context, but context waits until this future resolves
61     // before processing any other events.
62     // across all routes within application
63     let addr = ctx.address();
64
65     if !self.rate_limit_check(ctx) {
66       return;
67     }
68
69     self
70       .cs_addr
71       .send(Connect {
72         addr: addr.recipient(),
73         ip: self.ip.clone(),
74       })
75       .into_actor(self)
76       .then(|res, act, ctx| {
77         match res {
78           Ok(res) => act.id = res,
79           // something is wrong with chat server
80           _ => ctx.stop(),
81         }
82         actix::fut::ready(())
83       })
84       .wait(ctx);
85   }
86
87   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
88     // notify chat server
89     self.cs_addr.do_send(Disconnect {
90       id: self.id,
91       ip: self.ip.clone(),
92     });
93     Running::Stop
94   }
95 }
96
97 /// Handle messages from chat server, we simply send it to peer websocket
98 /// These are room messages, IE sent to others in the room
99 impl Handler<WsMessage> for WsSession {
100   type Result = ();
101
102   fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
103     ctx.text(msg.0);
104   }
105 }
106
107 /// WebSocket message handler
108 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSession {
109   fn handle(&mut self, result: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
110     if !self.rate_limit_check(ctx) {
111       return;
112     }
113
114     let message = match result {
115       Ok(m) => m,
116       Err(e) => {
117         error!("{}", e);
118         return;
119       }
120     };
121     match message {
122       ws::Message::Ping(msg) => {
123         self.hb = Instant::now();
124         ctx.pong(&msg);
125       }
126       ws::Message::Pong(_) => {
127         self.hb = Instant::now();
128       }
129       ws::Message::Text(text) => {
130         let m = text.trim().to_owned();
131
132         self
133           .cs_addr
134           .send(StandardMessage {
135             id: self.id,
136             msg: m,
137           })
138           .into_actor(self)
139           .then(|res, _, ctx| {
140             match res {
141               Ok(Ok(res)) => ctx.text(res),
142               Ok(Err(_)) => {}
143               Err(e) => error!("{}", &e),
144             }
145             actix::fut::ready(())
146           })
147           .spawn(ctx);
148       }
149       ws::Message::Binary(_bin) => info!("Unexpected binary"),
150       ws::Message::Close(_) => {
151         ctx.stop();
152       }
153       _ => {}
154     }
155   }
156 }
157
158 impl WsSession {
159   /// helper method that sends ping to client every second.
160   ///
161   /// also this method checks heartbeats from client
162   fn hb(ctx: &mut ws::WebsocketContext<Self>) {
163     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
164       // check client heartbeats
165       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
166         // heartbeat timed out
167         debug!("Websocket Client heartbeat failed, disconnecting!");
168
169         // notify chat server
170         act.cs_addr.do_send(Disconnect {
171           id: act.id,
172           ip: act.ip.clone(),
173         });
174
175         // stop actor
176         ctx.stop();
177
178         // don't try to send a ping
179         return;
180       }
181
182       ctx.ping(b"");
183     });
184   }
185
186   /// Check the rate limit, and stop the ctx if it fails
187   fn rate_limit_check(&mut self, ctx: &mut ws::WebsocketContext<Self>) -> bool {
188     let check = self.rate_limiter.message().check(self.ip.clone());
189     if !check {
190       debug!("Websocket join with IP: {} has been rate limited.", self.ip);
191       ctx.stop()
192     }
193     check
194   }
195 }