]> Untitled Git - lemmy.git/blob - crates/websocket/src/routes.rs
Fix API and clippy warnings
[lemmy.git] / crates / websocket / src / routes.rs
1 use crate::{
2   chat_server::ChatServer,
3   messages::{Connect, Disconnect, StandardMessage, WsMessage},
4   LemmyContext,
5 };
6 use actix::prelude::*;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_utils::{utils::get_ip, ConnectionId, IpAddr};
10 use log::{debug, error, info};
11 use std::time::{Duration, Instant};
12
13 /// How often heartbeat pings are sent
14 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
15 /// How long before lack of client response causes a timeout
16 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
17
18 pub fn config(cfg: &mut web::ServiceConfig) {
19   cfg.service(web::resource("/ws").to(chat_route));
20 }
21
22 /// Entry point for our route
23 async fn chat_route(
24   req: HttpRequest,
25   stream: web::Payload,
26   context: web::Data<LemmyContext>,
27 ) -> Result<HttpResponse, Error> {
28   ws::start(
29     WsSession {
30       cs_addr: context.chat_server().to_owned(),
31       id: 0,
32       hb: Instant::now(),
33       ip: get_ip(&req.connection_info()),
34     },
35     &req,
36     stream,
37   )
38 }
39
40 struct WsSession {
41   cs_addr: Addr<ChatServer>,
42   /// unique session id
43   id: ConnectionId,
44   ip: IpAddr,
45   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
46   /// otherwise we drop connection.
47   hb: Instant,
48 }
49
50 impl Actor for WsSession {
51   type Context = ws::WebsocketContext<Self>;
52
53   /// Method is called on actor start.
54   /// We register ws session with ChatServer
55   fn started(&mut self, ctx: &mut Self::Context) {
56     // we'll start heartbeat process on session start.
57     self.hb(ctx);
58
59     // register self in chat server. `AsyncContext::wait` register
60     // future within context, but context waits until this future resolves
61     // before processing any other events.
62     // across all routes within application
63     let addr = ctx.address();
64     self
65       .cs_addr
66       .send(Connect {
67         addr: addr.recipient(),
68         ip: self.ip.to_owned(),
69       })
70       .into_actor(self)
71       .then(|res, act, ctx| {
72         match res {
73           Ok(res) => act.id = res,
74           // something is wrong with chat server
75           _ => ctx.stop(),
76         }
77         actix::fut::ready(())
78       })
79       .wait(ctx);
80   }
81
82   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
83     // notify chat server
84     self.cs_addr.do_send(Disconnect {
85       id: self.id,
86       ip: self.ip.to_owned(),
87     });
88     Running::Stop
89   }
90 }
91
92 /// Handle messages from chat server, we simply send it to peer websocket
93 /// These are room messages, IE sent to others in the room
94 impl Handler<WsMessage> for WsSession {
95   type Result = ();
96
97   fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) {
98     ctx.text(msg.0);
99   }
100 }
101
102 /// WebSocket message handler
103 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSession {
104   fn handle(&mut self, result: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
105     let message = match result {
106       Ok(m) => m,
107       Err(e) => {
108         error!("{}", e);
109         return;
110       }
111     };
112     match message {
113       ws::Message::Ping(msg) => {
114         self.hb = Instant::now();
115         ctx.pong(&msg);
116       }
117       ws::Message::Pong(_) => {
118         self.hb = Instant::now();
119       }
120       ws::Message::Text(text) => {
121         let m = text.trim().to_owned();
122         info!("Message received: {:?} from id: {}", &m, self.id);
123
124         self
125           .cs_addr
126           .send(StandardMessage {
127             id: self.id,
128             msg: m,
129           })
130           .into_actor(self)
131           .then(|res, _, ctx| {
132             match res {
133               Ok(Ok(res)) => ctx.text(res),
134               Ok(Err(_)) => {}
135               Err(e) => error!("{}", &e),
136             }
137             actix::fut::ready(())
138           })
139           .spawn(ctx);
140       }
141       ws::Message::Binary(_bin) => info!("Unexpected binary"),
142       ws::Message::Close(_) => {
143         ctx.stop();
144       }
145       _ => {}
146     }
147   }
148 }
149
150 impl WsSession {
151   /// helper method that sends ping to client every second.
152   ///
153   /// also this method checks heartbeats from client
154   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
155     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
156       // check client heartbeats
157       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
158         // heartbeat timed out
159         debug!("Websocket Client heartbeat failed, disconnecting!");
160
161         // notify chat server
162         act.cs_addr.do_send(Disconnect {
163           id: act.id,
164           ip: act.ip.to_owned(),
165         });
166
167         // stop actor
168         ctx.stop();
169
170         // don't try to send a ping
171         return;
172       }
173
174       ctx.ping(b"");
175     });
176   }
177 }