]> Untitled Git - lemmy.git/blob - crates/websocket/src/routes.rs
e5551b4180d445e15393f17630a2da3035d97c68
[lemmy.git] / crates / websocket / src / routes.rs
1 use crate::{
2   chat_server::ChatServer,
3   messages::{Connect, Disconnect, StandardMessage, WsMessage},
4   LemmyContext,
5 };
6 use actix::prelude::*;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_utils::{utils::get_ip, ConnectionId, IpAddr};
10 use std::time::{Duration, Instant};
11 use tracing::{debug, error, info};
12
13 /// How often heartbeat pings are sent
14 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
15 /// How long before lack of client response causes a timeout
16 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
17
18 /// Entry point for our route
19 pub async fn chat_route(
20   req: HttpRequest,
21   stream: web::Payload,
22   context: web::Data<LemmyContext>,
23 ) -> Result<HttpResponse, Error> {
24   ws::start(
25     WsSession {
26       cs_addr: context.chat_server().to_owned(),
27       id: 0,
28       hb: Instant::now(),
29       ip: get_ip(&req.connection_info()),
30     },
31     &req,
32     stream,
33   )
34 }
35
36 struct WsSession {
37   cs_addr: Addr<ChatServer>,
38   /// unique session id
39   id: ConnectionId,
40   ip: IpAddr,
41   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
42   /// otherwise we drop connection.
43   hb: Instant,
44 }
45
46 impl Actor for WsSession {
47   type Context = ws::WebsocketContext<Self>;
48
49   /// Method is called on actor start.
50   /// We register ws session with ChatServer
51   fn started(&mut self, ctx: &mut Self::Context) {
52     // we'll start heartbeat process on session start.
53     self.hb(ctx);
54
55     // register self in chat server. `AsyncContext::wait` register
56     // future within context, but context waits until this future resolves
57     // before processing any other events.
58     // across all routes within application
59     let addr = ctx.address();
60     self
61       .cs_addr
62       .send(Connect {
63         addr: addr.recipient(),
64         ip: self.ip.to_owned(),
65       })
66       .into_actor(self)
67       .then(|res, act, ctx| {
68         match res {
69           Ok(res) => act.id = res,
70           // something is wrong with chat server
71           _ => ctx.stop(),
72         }
73         actix::fut::ready(())
74       })
75       .wait(ctx);
76   }
77
78   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
79     // notify chat server
80     self.cs_addr.do_send(Disconnect {
81       id: self.id,
82       ip: self.ip.to_owned(),
83     });
84     Running::Stop
85   }
86 }
87
88 /// Handle messages from chat server, we simply send it to peer websocket
89 /// These are room messages, IE sent to others in the room
90 impl Handler<WsMessage> for WsSession {
91   type Result = ();
92
93   fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
94     ctx.text(msg.0);
95   }
96 }
97
98 /// WebSocket message handler
99 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSession {
100   fn handle(&mut self, result: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
101     let message = match result {
102       Ok(m) => m,
103       Err(e) => {
104         error!("{}", e);
105         return;
106       }
107     };
108     match message {
109       ws::Message::Ping(msg) => {
110         self.hb = Instant::now();
111         ctx.pong(&msg);
112       }
113       ws::Message::Pong(_) => {
114         self.hb = Instant::now();
115       }
116       ws::Message::Text(text) => {
117         let m = text.trim().to_owned();
118         info!("Message received: {:?} from id: {}", &m, self.id);
119
120         self
121           .cs_addr
122           .send(StandardMessage {
123             id: self.id,
124             msg: m,
125           })
126           .into_actor(self)
127           .then(|res, _, ctx| {
128             match res {
129               Ok(Ok(res)) => ctx.text(res),
130               Ok(Err(_)) => {}
131               Err(e) => error!("{}", &e),
132             }
133             actix::fut::ready(())
134           })
135           .spawn(ctx);
136       }
137       ws::Message::Binary(_bin) => info!("Unexpected binary"),
138       ws::Message::Close(_) => {
139         ctx.stop();
140       }
141       _ => {}
142     }
143   }
144 }
145
146 impl WsSession {
147   /// helper method that sends ping to client every second.
148   ///
149   /// also this method checks heartbeats from client
150   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
151     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
152       // check client heartbeats
153       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
154         // heartbeat timed out
155         debug!("Websocket Client heartbeat failed, disconnecting!");
156
157         // notify chat server
158         act.cs_addr.do_send(Disconnect {
159           id: act.id,
160           ip: act.ip.to_owned(),
161         });
162
163         // stop actor
164         ctx.stop();
165
166         // don't try to send a ping
167         return;
168       }
169
170       ctx.ping(b"");
171     });
172   }
173 }