]> Untitled Git - lemmy.git/blob - server/src/main.rs
Adding emoji support
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use] extern crate diesel_migrations;
3
4 use std::time::{Instant, Duration};
5 use std::env;
6 use lemmy_server::actix::*;
7 use lemmy_server::actix_web::server::HttpServer;
8 use lemmy_server::actix_web::{ws, App, Error, HttpRequest, HttpResponse, fs::NamedFile, fs};
9 use lemmy_server::websocket::server::*;
10 use lemmy_server::db::establish_connection;
11
12 embed_migrations!();
13
14 /// How often heartbeat pings are sent
15 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
16 /// How long before lack of client response causes a timeout
17 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
18
19 /// This is our websocket route state, this state is shared with all route
20 /// instances via `HttpContext::state()`
21 struct WsChatSessionState {
22   addr: Addr<ChatServer>,
23 }
24
25 /// Entry point for our route
26 fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> {
27   ws::start(
28     req,
29     WSSession {
30       id: 0,
31       hb: Instant::now(),
32       ip: req.connection_info()
33         .remote()
34         .unwrap_or("127.0.0.1:12345")
35         .split(":")
36         .next()
37         .unwrap_or("127.0.0.1")
38         .to_string()
39     },
40     )
41 }
42
43 struct WSSession {
44   /// unique session id
45   id: usize,
46   ip: String,
47   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
48   /// otherwise we drop connection.
49   hb: Instant
50 }
51
52 impl Actor for WSSession {
53   type Context = ws::WebsocketContext<Self, WsChatSessionState>;
54
55   /// Method is called on actor start.
56   /// We register ws session with ChatServer
57   fn started(&mut self, ctx: &mut Self::Context) {
58     // we'll start heartbeat process on session start.
59     self.hb(ctx);
60
61     // register self in chat server. `AsyncContext::wait` register
62     // future within context, but context waits until this future resolves
63     // before processing any other events.
64     // HttpContext::state() is instance of WsChatSessionState, state is shared
65     // across all routes within application
66     let addr = ctx.address();
67     ctx.state()
68       .addr
69       .send(Connect {
70         addr: addr.recipient(),
71         ip: self.ip.to_owned(),
72       })
73     .into_actor(self)
74       .then(|res, act, ctx| {
75         match res {
76           Ok(res) => act.id = res,
77           // something is wrong with chat server
78           _ => ctx.stop(),
79         }
80         fut::ok(())
81       })
82     .wait(ctx);
83   }
84
85   fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
86     // notify chat server
87     ctx.state().addr.do_send(Disconnect { 
88       id: self.id,
89       ip: self.ip.to_owned(),
90     });
91     Running::Stop
92   }
93 }
94
95 /// Handle messages from chat server, we simply send it to peer websocket
96 /// These are room messages, IE sent to others in the room
97 impl Handler<WSMessage> for WSSession {
98   type Result = ();
99
100   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
101     // println!("id: {} msg: {}", self.id, msg.0);
102     ctx.text(msg.0);
103   }
104 }
105
106 /// WebSocket message handler
107 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
108   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
109     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
110     match msg {
111       ws::Message::Ping(msg) => {
112         self.hb = Instant::now();
113         ctx.pong(&msg);
114       }
115       ws::Message::Pong(_) => {
116         self.hb = Instant::now();
117       }
118       ws::Message::Text(text) => {
119         let m = text.trim().to_owned();
120         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
121         
122         ctx.state()
123           .addr
124           .send(StandardMessage {
125             id: self.id,
126             msg: m,
127           })
128         .into_actor(self)
129           .then(|res, _, ctx| {
130             match res {
131               Ok(res) => ctx.text(res),
132               Err(e) => {
133                 eprintln!("{}", &e);
134               }
135             }
136             fut::ok(())
137           })
138         .wait(ctx);
139       }
140       ws::Message::Binary(_bin) => println!("Unexpected binary"),
141       ws::Message::Close(_) => {
142         ctx.stop();
143       },
144     }
145   }
146 }
147
148 impl WSSession {
149   /// helper method that sends ping to client every second.
150   ///
151   /// also this method checks heartbeats from client
152   fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) {
153     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
154       // check client heartbeats
155       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
156         // heartbeat timed out
157         println!("Websocket Client heartbeat failed, disconnecting!");
158
159         // notify chat server
160         ctx.state()
161           .addr
162           .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() });
163
164         // stop actor
165         ctx.stop();
166
167         // don't try to send a ping
168         return;
169       }
170
171       ctx.ping("");
172     });
173   }
174 }
175
176 fn main() {
177   let _ = env_logger::init();
178   let sys = actix::System::new("lemmy");
179
180   // Run the migrations from code
181   let conn = establish_connection();
182   embedded_migrations::run(&conn).unwrap();
183
184   // Start chat server actor in separate thread
185   let server = Arbiter::start(|_| ChatServer::default());
186
187   // Create Http server with websocket support
188   HttpServer::new(move || {
189     // Websocket sessions state
190     let state = WsChatSessionState {
191       addr: server.clone(),
192     };
193
194     App::with_state(state)
195       // .resource("/api/v1/rest", |r| r.method(http::Method::POST).f(|_| {})
196       .resource("/api/v1/ws", |r| r.route().f(chat_route))
197       // static resources
198       .resource("/", |r| r.route().f(index))
199       .handler(
200         "/static",
201         fs::StaticFiles::new(front_end_dir()).unwrap()
202       )
203       .finish()
204   }).bind("0.0.0.0:8536")
205   .unwrap()
206     .start();
207
208   println!("Started http server: 0.0.0.0:8536");
209   let _ = sys.run();
210 }
211
212 fn index(_req: &HttpRequest<WsChatSessionState>) -> Result<NamedFile, actix_web::error::Error> {
213   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
214 }
215
216 fn front_end_dir() -> String {
217   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
218 }