1 extern crate lemmy_server;
2 #[macro_use] extern crate diesel_migrations;
4 use std::time::{Instant, Duration};
6 use lemmy_server::actix::*;
7 use lemmy_server::actix_web::server::HttpServer;
8 use lemmy_server::actix_web::{ws, App, Error, HttpRequest, HttpResponse, fs::NamedFile, fs};
9 use lemmy_server::websocket::server::*;
10 use lemmy_server::establish_connection;
14 /// How often heartbeat pings are sent; this is the period passed to `run_interval` in `hb`.
15 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
16 /// How long the client may go without sending a ping or pong before the session times out.
17 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
19 /// This is our websocket route state. It is shared with all route
20 /// instances via `HttpContext::state()`.
21 struct WsChatSessionState {
22 addr: Addr<ChatServer>,
25 /// Entry point for our route
26 fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> {
32 ip: req.connection_info()
34 .unwrap_or("127.0.0.1:12345")
37 .unwrap_or("127.0.0.1")
47 /// The client must send a ping or pong at least once every 10 seconds (CLIENT_TIMEOUT);
48 /// otherwise we drop the connection.
52 impl Actor for WSSession {
53 type Context = ws::WebsocketContext<Self, WsChatSessionState>;
55 /// Method is called on actor start.
56 /// We register ws session with ChatServer
57 fn started(&mut self, ctx: &mut Self::Context) {
58 // we'll start heartbeat process on session start.
61 // Register self in the chat server. `AsyncContext::wait` registers the
62 // future within the context, but the context waits until this future resolves
63 // before processing any other events.
64 // `HttpContext::state()` is an instance of `WsChatSessionState`; the state is shared
65 // across all routes within the application.
66 let addr = ctx.address();
70 addr: addr.recipient(),
71 ip: self.ip.to_owned(),
74 .then(|res, act, ctx| {
76 Ok(res) => act.id = res,
77 // something is wrong with chat server
85 fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
87 ctx.state().addr.do_send(Disconnect {
89 ip: self.ip.to_owned(),
95 /// Handle messages from the chat server; we simply forward them to the peer websocket.
96 /// These are room messages, i.e. messages sent to others in the room.
97 impl Handler<WSMessage> for WSSession {
100 fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
101 // println!("id: {} msg: {}", self.id, msg.0);
106 /// WebSocket message handler
107 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
108 fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
109 // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
111 ws::Message::Ping(msg) => {
112 self.hb = Instant::now();
115 ws::Message::Pong(_) => {
116 self.hb = Instant::now();
118 ws::Message::Text(text) => {
119 let m = text.trim().to_owned();
123 .send(StandardMessage {
128 .then(|res, _, ctx| {
130 Ok(res) => ctx.text(res),
136 // Ok(res) => ctx.text(res),
137 // // something is wrong with chat server
143 // we check for /sss type of messages
144 // if m.starts_with('/') {
145 // let v: Vec<&str> = m.splitn(2, ' ').collect();
148 // // Send ListRooms message to chat server and wait for
150 // println!("List rooms");
155 // .then(|res, _, ctx| {
158 // for room in rooms {
162 // _ => println!("Something is wrong"),
167 // .wait(ctx) pauses all events in context,
168 // so the actor won't receive any new messages until it gets the list
173 // self.room = v[1].to_owned();
174 // ctx.state().addr.do_send(Join {
176 // name: self.room.clone(),
179 // ctx.text("joined");
181 // ctx.text("!!! room name is required");
186 // self.name = Some(v[1].to_owned());
188 // ctx.text("!!! name is required");
191 // _ => ctx.text(format!("!!! unknown command: {:?}", m)),
194 // let msg = if let Some(ref name) = self.name {
195 // format!("{}: {}", name, m)
199 // send message to chat server
200 // ctx.state().addr.do_send(ClientMessage {
203 // room: self.room.clone(),
207 ws::Message::Binary(_bin) => println!("Unexpected binary"),
208 ws::Message::Close(_) => {
216 /// Helper method that sends a ping to the client every `HEARTBEAT_INTERVAL` (5 seconds).
218 /// This method also checks heartbeats from the client.
219 fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) {
220 ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
221 // check client heartbeats
222 if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
223 // heartbeat timed out
224 println!("Websocket Client heartbeat failed, disconnecting!");
226 // notify chat server
229 .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() });
234 // don't try to send a ping
244 let _ = env_logger::init();
245 let sys = actix::System::new("lemmy");
247 // Run the migrations from code
248 let conn = establish_connection();
249 embedded_migrations::run(&conn).unwrap();
251 // Start chat server actor in separate thread
252 let server = Arbiter::start(|_| ChatServer::default());
254 // Create Http server with websocket support
255 HttpServer::new(move || {
256 // Websocket sessions state
257 let state = WsChatSessionState {
258 addr: server.clone(),
261 App::with_state(state)
262 // redirect to websocket.html
263 // .resource("/", |r| r.method(http::Method::GET).f(|_| {
264 // HttpResponse::Found()
265 // .header("LOCATION", "/static/websocket.html")
268 .resource("/service/ws", |r| r.route().f(chat_route))
270 .resource("/", |r| r.route().f(index))
273 fs::StaticFiles::new(front_end_dir()).unwrap()
276 }).bind("0.0.0.0:8536")
280 println!("Started http server: 0.0.0.0:8536");
284 fn index(_req: &HttpRequest<WsChatSessionState>) -> Result<NamedFile, actix_web::error::Error> {
285 Ok(NamedFile::open(front_end_dir() + "/index.html")?)
288 fn front_end_dir() -> String {
289 env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())