]> Untitled Git - lemmy.git/blob - src/routes/websocket.rs
Isomorphic docker (#1124)
[lemmy.git] / src / routes / websocket.rs
1 use crate::{
2   websocket::{
3     chat_server::ChatServer,
4     messages::{Connect, Disconnect, StandardMessage, WSMessage},
5   },
6   LemmyContext,
7 };
8 use actix::prelude::*;
9 use actix_web::*;
10 use actix_web_actors::ws;
11 use lemmy_utils::utils::get_ip;
12 use log::{debug, error, info};
13 use std::time::{Duration, Instant};
14
15 /// How often heartbeat pings are sent
16 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
17 /// How long before lack of client response causes a timeout
18 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
19
20 /// Entry point for our route
21 pub async fn chat_route(
22   req: HttpRequest,
23   stream: web::Payload,
24   context: web::Data<LemmyContext>,
25 ) -> Result<HttpResponse, Error> {
26   ws::start(
27     WSSession {
28       cs_addr: context.chat_server().to_owned(),
29       id: 0,
30       hb: Instant::now(),
31       ip: get_ip(&req.connection_info()),
32     },
33     &req,
34     stream,
35   )
36 }
37
38 struct WSSession {
39   cs_addr: Addr<ChatServer>,
40   /// unique session id
41   id: usize,
42   ip: String,
43   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
44   /// otherwise we drop connection.
45   hb: Instant,
46 }
47
48 impl Actor for WSSession {
49   type Context = ws::WebsocketContext<Self>;
50
51   /// Method is called on actor start.
52   /// We register ws session with ChatServer
53   fn started(&mut self, ctx: &mut Self::Context) {
54     // we'll start heartbeat process on session start.
55     self.hb(ctx);
56
57     // register self in chat server. `AsyncContext::wait` register
58     // future within context, but context waits until this future resolves
59     // before processing any other events.
60     // across all routes within application
61     let addr = ctx.address();
62     self
63       .cs_addr
64       .send(Connect {
65         addr: addr.recipient(),
66         ip: self.ip.to_owned(),
67       })
68       .into_actor(self)
69       .then(|res, act, ctx| {
70         match res {
71           Ok(res) => act.id = res,
72           // something is wrong with chat server
73           _ => ctx.stop(),
74         }
75         actix::fut::ready(())
76       })
77       .wait(ctx);
78   }
79
80   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
81     // notify chat server
82     self.cs_addr.do_send(Disconnect {
83       id: self.id,
84       ip: self.ip.to_owned(),
85     });
86     Running::Stop
87   }
88 }
89
90 /// Handle messages from chat server, we simply send it to peer websocket
91 /// These are room messages, IE sent to others in the room
92 impl Handler<WSMessage> for WSSession {
93   type Result = ();
94
95   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
96     ctx.text(msg.0);
97   }
98 }
99
100 /// WebSocket message handler
101 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
102   fn handle(&mut self, result: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
103     let message = match result {
104       Ok(m) => m,
105       Err(e) => {
106         error!("{}", e);
107         return;
108       }
109     };
110     match message {
111       ws::Message::Ping(msg) => {
112         self.hb = Instant::now();
113         ctx.pong(&msg);
114       }
115       ws::Message::Pong(_) => {
116         self.hb = Instant::now();
117       }
118       ws::Message::Text(text) => {
119         let m = text.trim().to_owned();
120         info!("Message received: {:?} from id: {}", &m, self.id);
121
122         self
123           .cs_addr
124           .send(StandardMessage {
125             id: self.id,
126             msg: m,
127           })
128           .into_actor(self)
129           .then(|res, _, ctx| {
130             match res {
131               Ok(Ok(res)) => ctx.text(res),
132               Ok(Err(_)) => {}
133               Err(e) => error!("{}", &e),
134             }
135             actix::fut::ready(())
136           })
137           .spawn(ctx);
138       }
139       ws::Message::Binary(_bin) => info!("Unexpected binary"),
140       ws::Message::Close(_) => {
141         ctx.stop();
142       }
143       _ => {}
144     }
145   }
146 }
147
148 impl WSSession {
149   /// helper method that sends ping to client every second.
150   ///
151   /// also this method checks heartbeats from client
152   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
153     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
154       // check client heartbeats
155       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
156         // heartbeat timed out
157         debug!("Websocket Client heartbeat failed, disconnecting!");
158
159         // notify chat server
160         act.cs_addr.do_send(Disconnect {
161           id: act.id,
162           ip: act.ip.to_owned(),
163         });
164
165         // stop actor
166         ctx.stop();
167
168         // don't try to send a ping
169         return;
170       }
171
172       ctx.ping(b"");
173     });
174   }
175 }