2 use crate::websocket::server::*;
3 use actix_web::{Error, Result};
5 pub fn config(cfg: &mut web::ServiceConfig) {
6 cfg.service(web::resource("/api/v1/ws").to(chat_route));
9 /// How often heartbeat pings are sent
10 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
11 /// How long before lack of client response causes a timeout
12 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
14 /// Entry point for our route
18 chat_server: web::Data<Addr<ChatServer>>,
19 ) -> Result<HttpResponse, Error> {
22 cs_addr: chat_server.get_ref().to_owned(),
33 cs_addr: Addr<ChatServer>,
37 /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
38 /// otherwise we drop connection.
40 // db: Pool<ConnectionManager<PgConnection>>,
43 impl Actor for WSSession {
44 type Context = ws::WebsocketContext<Self>;
46 /// Method is called on actor start.
47 /// We register ws session with ChatServer
48 fn started(&mut self, ctx: &mut Self::Context) {
49 // we'll start heartbeat process on session start.
52 // register self in chat server. `AsyncContext::wait` register
53 // future within context, but context waits until this future resolves
54 // before processing any other events.
55 // across all routes within application
56 let addr = ctx.address();
60 addr: addr.recipient(),
61 ip: self.ip.to_owned(),
64 .then(|res, act, ctx| {
66 Ok(res) => act.id = res,
67 // something is wrong with chat server
75 fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
77 self.cs_addr.do_send(Disconnect {
79 ip: self.ip.to_owned(),
85 /// Handle messages from chat server, we simply send it to peer websocket
86 /// These are room messages, IE sent to others in the room
87 impl Handler<WSMessage> for WSSession {
90 fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
95 /// WebSocket message handler
96 impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WSSession {
97 fn handle(&mut self, result: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
98 let message = match result {
106 ws::Message::Ping(msg) => {
107 self.hb = Instant::now();
110 ws::Message::Pong(_) => {
111 self.hb = Instant::now();
113 ws::Message::Text(text) => {
114 let m = text.trim().to_owned();
115 info!("Message received: {:?} from id: {}", &m, self.id);
119 .send(StandardMessage {
124 .then(|res, _, ctx| {
126 Ok(Ok(res)) => ctx.text(res),
127 Ok(Err(e)) => error!("{}", e),
128 Err(e) => error!("{}", &e),
130 actix::fut::ready(())
134 ws::Message::Binary(_bin) => info!("Unexpected binary"),
135 ws::Message::Close(_) => {
144 /// helper method that sends ping to client every second.
146 /// also this method checks heartbeats from client
147 fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
148 ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
149 // check client heartbeats
150 if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
151 // heartbeat timed out
152 error!("Websocket Client heartbeat failed, disconnecting!");
154 // notify chat server
155 act.cs_addr.do_send(Disconnect {
157 ip: act.ip.to_owned(),
163 // don't try to send a ping