]> Untitled Git - lemmy.git/blob - src/routes/websocket.rs
aaa7ef37aead80dd841467b11baba14bc80e06a6
[lemmy.git] / src / routes / websocket.rs
1 use crate::{websocket::chat_server::ChatServer, LemmyContext};
2 use actix::prelude::*;
3 use actix_web::*;
4 use actix_web_actors::ws;
5 use lemmy_structs::websocket::{Connect, Disconnect, StandardMessage, WSMessage};
6 use lemmy_utils::utils::get_ip;
7 use log::{debug, error, info};
8 use std::time::{Duration, Instant};
9
10 /// How often heartbeat pings are sent
11 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
12 /// How long before lack of client response causes a timeout
13 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
14
15 /// Entry point for our route
16 pub async fn chat_route(
17   req: HttpRequest,
18   stream: web::Payload,
19   context: web::Data<LemmyContext>,
20 ) -> Result<HttpResponse, Error> {
21   ws::start(
22     WSSession {
23       cs_addr: context.chat_server().to_owned(),
24       id: 0,
25       hb: Instant::now(),
26       ip: get_ip(&req.connection_info()),
27     },
28     &req,
29     stream,
30   )
31 }
32
33 struct WSSession {
34   cs_addr: Addr<ChatServer>,
35   /// unique session id
36   id: usize,
37   ip: String,
38   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
39   /// otherwise we drop connection.
40   hb: Instant,
41 }
42
43 impl Actor for WSSession {
44   type Context = ws::WebsocketContext<Self>;
45
46   /// Method is called on actor start.
47   /// We register ws session with ChatServer
48   fn started(&mut self, ctx: &mut Self::Context) {
49     // we'll start heartbeat process on session start.
50     self.hb(ctx);
51
52     // register self in chat server. `AsyncContext::wait` register
53     // future within context, but context waits until this future resolves
54     // before processing any other events.
55     // across all routes within application
56     let addr = ctx.address();
57     self
58       .cs_addr
59       .send(Connect {
60         addr: addr.recipient(),
61         ip: self.ip.to_owned(),
62       })
63       .into_actor(self)
64       .then(|res, act, ctx| {
65         match res {
66           Ok(res) => act.id = res,
67           // something is wrong with chat server
68           _ => ctx.stop(),
69         }
70         actix::fut::ready(())
71       })
72       .wait(ctx);
73   }
74
75   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
76     // notify chat server
77     self.cs_addr.do_send(Disconnect {
78       id: self.id,
79       ip: self.ip.to_owned(),
80     });
81     Running::Stop
82   }
83 }
84
85 /// Handle messages from chat server, we simply send it to peer websocket
86 /// These are room messages, IE sent to others in the room
87 impl Handler<WSMessage> for WSSession {
88   type Result = ();
89
90   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
91     ctx.text(msg.0);
92   }
93 }
94
95 /// WebSocket message handler
96 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
97   fn handle(&mut self, result: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
98     let message = match result {
99       Ok(m) => m,
100       Err(e) => {
101         error!("{}", e);
102         return;
103       }
104     };
105     match message {
106       ws::Message::Ping(msg) => {
107         self.hb = Instant::now();
108         ctx.pong(&msg);
109       }
110       ws::Message::Pong(_) => {
111         self.hb = Instant::now();
112       }
113       ws::Message::Text(text) => {
114         let m = text.trim().to_owned();
115         info!("Message received: {:?} from id: {}", &m, self.id);
116
117         self
118           .cs_addr
119           .send(StandardMessage {
120             id: self.id,
121             msg: m,
122           })
123           .into_actor(self)
124           .then(|res, _, ctx| {
125             match res {
126               Ok(Ok(res)) => ctx.text(res),
127               Ok(Err(_)) => {}
128               Err(e) => error!("{}", &e),
129             }
130             actix::fut::ready(())
131           })
132           .spawn(ctx);
133       }
134       ws::Message::Binary(_bin) => info!("Unexpected binary"),
135       ws::Message::Close(_) => {
136         ctx.stop();
137       }
138       _ => {}
139     }
140   }
141 }
142
143 impl WSSession {
144   /// helper method that sends ping to client every second.
145   ///
146   /// also this method checks heartbeats from client
147   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
148     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
149       // check client heartbeats
150       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
151         // heartbeat timed out
152         debug!("Websocket Client heartbeat failed, disconnecting!");
153
154         // notify chat server
155         act.cs_addr.do_send(Disconnect {
156           id: act.id,
157           ip: act.ip.to_owned(),
158         });
159
160         // stop actor
161         ctx.stop();
162
163         // don't try to send a ping
164         return;
165       }
166
167       ctx.ping(b"");
168     });
169   }
170 }