3 chat_server::ChatServer,
4 messages::{Connect, Disconnect, StandardMessage, WSMessage},
10 use actix_web_actors::ws;
11 use lemmy_utils::utils::get_ip;
12 use log::{debug, error, info};
13 use std::time::{Duration, Instant};
15 /// How often heartbeat pings are sent
16 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
17 /// How long before lack of client response causes a timeout
18 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
20 /// Entry point for our route
21 pub async fn chat_route(
24 context: web::Data<LemmyContext>,
25 ) -> Result<HttpResponse, Error> {
28 cs_addr: context.chat_server().to_owned(),
31 ip: get_ip(&req.connection_info()),
39 cs_addr: Addr<ChatServer>,
43 /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
44 /// otherwise we drop connection.
48 impl Actor for WSSession {
49 type Context = ws::WebsocketContext<Self>;
51 /// Method is called on actor start.
52 /// We register ws session with ChatServer
53 fn started(&mut self, ctx: &mut Self::Context) {
54 // we'll start heartbeat process on session start.
57 // Register self in the chat server. `AsyncContext::wait` registers a
58 // future within the context, but the context waits until this future resolves
59 // before processing any other events.
60 // across all routes within application
61 let addr = ctx.address();
65 addr: addr.recipient(),
66 ip: self.ip.to_owned(),
69 .then(|res, act, ctx| {
71 Ok(res) => act.id = res,
72 // something is wrong with chat server
80 fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
82 self.cs_addr.do_send(Disconnect {
84 ip: self.ip.to_owned(),
90 /// Handle messages from the chat server; we simply forward them to the peer websocket.
91 /// These are room messages, i.e. sent to others in the room.
92 impl Handler<WSMessage> for WSSession {
95 fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
100 /// WebSocket message handler
101 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
102 fn handle(&mut self, result: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
103 let message = match result {
111 ws::Message::Ping(msg) => {
112 self.hb = Instant::now();
115 ws::Message::Pong(_) => {
116 self.hb = Instant::now();
118 ws::Message::Text(text) => {
119 let m = text.trim().to_owned();
120 info!("Message received: {:?} from id: {}", &m, self.id);
124 .send(StandardMessage {
129 .then(|res, _, ctx| {
131 Ok(Ok(res)) => ctx.text(res),
133 Err(e) => error!("{}", &e),
135 actix::fut::ready(())
139 ws::Message::Binary(_bin) => info!("Unexpected binary"),
140 ws::Message::Close(_) => {
149 /// Helper method that sends a ping to the client every `HEARTBEAT_INTERVAL` (5 seconds).
151 /// This method also checks heartbeats from the client.
152 fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
153 ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
154 // check client heartbeats
155 if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
156 // heartbeat timed out
157 debug!("Websocket Client heartbeat failed, disconnecting!");
159 // notify chat server
160 act.cs_addr.do_send(Disconnect {
162 ip: act.ip.to_owned(),
168 // don't try to send a ping