1 extern crate lemmy_server;
2 #[macro_use] extern crate diesel_migrations;
4 use std::time::{Instant, Duration};
6 use lemmy_server::actix::*;
7 use lemmy_server::actix_web::server::HttpServer;
8 use lemmy_server::actix_web::{ws, App, Error, HttpRequest, HttpResponse, fs::NamedFile, fs};
9 use lemmy_server::websocket::server::*;
10 use lemmy_server::db::establish_connection;
14 /// How often heartbeat pings are sent
15 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
16 /// How long before lack of client response causes a timeout
17 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
19 /// Websocket route state. This state is shared with all route
20 /// instances via `HttpContext::state()`.
21 struct WsChatSessionState {
22 addr: Addr<ChatServer>,
25 /// Entry point for our route
26 fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> {
32 ip: req.connection_info()
34 .unwrap_or("127.0.0.1:12345")
37 .unwrap_or("127.0.0.1")
47 /// The client must send a ping (or pong) at least once every 10 seconds (CLIENT_TIMEOUT);
48 /// otherwise we drop the connection.
52 impl Actor for WSSession {
53 type Context = ws::WebsocketContext<Self, WsChatSessionState>;
55 /// Method is called on actor start.
56 /// We register ws session with ChatServer
57 fn started(&mut self, ctx: &mut Self::Context) {
58 // we'll start heartbeat process on session start.
61 // Register self in the chat server. `AsyncContext::wait` registers a
62 // future within the context, and the context waits until this future resolves
63 // before processing any other events.
64 // `HttpContext::state()` is an instance of `WsChatSessionState`; the state is shared
65 // across all routes within the application.
66 let addr = ctx.address();
70 addr: addr.recipient(),
71 ip: self.ip.to_owned(),
74 .then(|res, act, ctx| {
76 Ok(res) => act.id = res,
77 // something is wrong with chat server
85 fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
87 ctx.state().addr.do_send(Disconnect {
89 ip: self.ip.to_owned(),
95 /// Handle messages from the chat server; we simply send them to the peer websocket.
96 /// These are room messages, i.e. messages sent to others in the room.
97 impl Handler<WSMessage> for WSSession {
100 fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
101 // println!("id: {} msg: {}", self.id, msg.0);
106 /// WebSocket message handler
107 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
108 fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
109 // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
111 ws::Message::Ping(msg) => {
112 self.hb = Instant::now();
115 ws::Message::Pong(_) => {
116 self.hb = Instant::now();
118 ws::Message::Text(text) => {
119 let m = text.trim().to_owned();
120 println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
124 .send(StandardMessage {
129 .then(|res, _, ctx| {
131 Ok(res) => ctx.text(res),
140 ws::Message::Binary(_bin) => println!("Unexpected binary"),
141 ws::Message::Close(_) => {
149 /// Helper method that sends a ping to the client every 5 seconds (HEARTBEAT_INTERVAL).
151 /// This method also checks heartbeats from the client.
152 fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) {
153 ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
154 // check client heartbeats
155 if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
156 // heartbeat timed out
157 println!("Websocket Client heartbeat failed, disconnecting!");
159 // notify chat server
162 .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() });
167 // don't try to send a ping
177 let _ = env_logger::init();
178 let sys = actix::System::new("lemmy");
180 // Run the migrations from code
181 let conn = establish_connection();
182 embedded_migrations::run(&conn).unwrap();
184 // Start chat server actor in separate thread
185 let server = Arbiter::start(|_| ChatServer::default());
187 // Create Http server with websocket support
188 HttpServer::new(move || {
189 // Websocket sessions state
190 let state = WsChatSessionState {
191 addr: server.clone(),
194 App::with_state(state)
195 // redirect to websocket.html
196 // .resource("/", |r| r.method(http::Method::GET).f(|_| {
197 // HttpResponse::Found()
198 // .header("LOCATION", "/static/websocket.html")
201 .resource("/service/ws", |r| r.route().f(chat_route))
203 .resource("/", |r| r.route().f(index))
206 fs::StaticFiles::new(front_end_dir()).unwrap()
209 }).bind("0.0.0.0:8536")
213 println!("Started http server: 0.0.0.0:8536");
217 fn index(_req: &HttpRequest<WsChatSessionState>) -> Result<NamedFile, actix_web::error::Error> {
218 Ok(NamedFile::open(front_end_dir() + "/index.html")?)
221 fn front_end_dir() -> String {
222 env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())