]> Untitled Git - lemmy.git/blob - server/src/main.rs
Reorganizing files before splitting out API
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use] extern crate diesel_migrations;
3
4 use std::time::{Instant, Duration};
5 use std::env;
6 use lemmy_server::actix::*;
7 use lemmy_server::actix_web::server::HttpServer;
8 use lemmy_server::actix_web::{ws, App, Error, HttpRequest, HttpResponse, fs::NamedFile, fs};
9 use lemmy_server::websocket::server::*;
10 use lemmy_server::establish_connection;
11
12 embed_migrations!();
13
14 /// How often heartbeat pings are sent
15 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
16 /// How long before lack of client response causes a timeout
17 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
18
19 /// This is our websocket route state, this state is shared with all route
20 /// instances via `HttpContext::state()`
21 struct WsChatSessionState {
22   addr: Addr<ChatServer>,
23 }
24
25 /// Entry point for our route
26 fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> {
27   ws::start(
28     req,
29     WSSession {
30       id: 0,
31       hb: Instant::now(),
32       ip: req.connection_info().remote().unwrap_or("127.0.0.1:12345").split(":").next().unwrap_or("127.0.0.1").to_string()
33     },
34     )
35 }
36
37 struct WSSession {
38   /// unique session id
39   id: usize,
40   ip: String,
41   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
42   /// otherwise we drop connection.
43   hb: Instant
44 }
45
46 impl Actor for WSSession {
47   type Context = ws::WebsocketContext<Self, WsChatSessionState>;
48
49   /// Method is called on actor start.
50   /// We register ws session with ChatServer
51   fn started(&mut self, ctx: &mut Self::Context) {
52     // we'll start heartbeat process on session start.
53     self.hb(ctx);
54
55     // register self in chat server. `AsyncContext::wait` register
56     // future within context, but context waits until this future resolves
57     // before processing any other events.
58     // HttpContext::state() is instance of WsChatSessionState, state is shared
59     // across all routes within application
60     let addr = ctx.address();
61     ctx.state()
62       .addr
63       .send(Connect {
64         addr: addr.recipient(),
65         ip: self.ip.to_owned(),
66       })
67     .into_actor(self)
68       .then(|res, act, ctx| {
69         match res {
70           Ok(res) => act.id = res,
71           // something is wrong with chat server
72           _ => ctx.stop(),
73         }
74         fut::ok(())
75       })
76     .wait(ctx);
77   }
78
79   fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
80     // notify chat server
81     ctx.state().addr.do_send(Disconnect { 
82       id: self.id,
83       ip: self.ip.to_owned(),
84     });
85     Running::Stop
86   }
87 }
88
89 /// Handle messages from chat server, we simply send it to peer websocket
90 /// These are room messages, IE sent to others in the room
91 impl Handler<WSMessage> for WSSession {
92   type Result = ();
93
94   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
95     // println!("id: {} msg: {}", self.id, msg.0);
96     ctx.text(msg.0);
97   }
98 }
99
100 /// WebSocket message handler
101 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
102   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
103     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
104     match msg {
105       ws::Message::Ping(msg) => {
106         self.hb = Instant::now();
107         ctx.pong(&msg);
108       }
109       ws::Message::Pong(_) => {
110         self.hb = Instant::now();
111       }
112       ws::Message::Text(text) => {
113         let m = text.trim().to_owned();
114         
115         ctx.state()
116           .addr
117           .send(StandardMessage {
118             id: self.id,
119             msg: m,
120           })
121         .into_actor(self)
122           .then(|res, _, ctx| {
123             match res {
124               Ok(res) => ctx.text(res),
125               Err(e) => {
126                 eprintln!("{}", &e);
127                 // ctx.text(e);
128               }
129             }
130             // Ok(res) => ctx.text(res),
131             // // something is wrong with chat server
132             // _ => ctx.stop(),
133             fut::ok(())
134           })
135         .wait(ctx);
136
137         // we check for /sss type of messages
138         // if m.starts_with('/') {
139         //     let v: Vec<&str> = m.splitn(2, ' ').collect();
140         //     match v[0] {
141         //         "/list" => {
142         //             // Send ListRooms message to chat server and wait for
143         //             // response
144         //             println!("List rooms");
145         //             ctx.state()
146         //                 .addr
147         //                 .send(ListRooms)
148         //                 .into_actor(self)
149         //                 .then(|res, _, ctx| {
150         //                     match res {
151         //                         Ok(rooms) => {
152         //                             for room in rooms {
153         //                                 ctx.text(room);
154         //                             }
155         //                         }
156         //                         _ => println!("Something is wrong"),
157         //                     }
158         //                     fut::ok(())
159         //                 })
160         //                 .wait(ctx)
161         // .wait(ctx) pauses all events in context,
162         // so actor wont receive any new messages until it get list
163         // of rooms back
164         // }
165         // "/join" => {
166         //     if v.len() == 2 {
167         //         self.room = v[1].to_owned();
168         //         ctx.state().addr.do_send(Join {
169         //             id: self.id,
170         //             name: self.room.clone(),
171         //         });
172
173         //         ctx.text("joined");
174         //     } else {
175         //         ctx.text("!!! room name is required");
176         //     }
177         // }
178         // "/name" => {
179         //     if v.len() == 2 {
180         //         self.name = Some(v[1].to_owned());
181         //     } else {
182         //         ctx.text("!!! name is required");
183         //     }
184         // }
185         // _ => ctx.text(format!("!!! unknown command: {:?}", m)),
186         // }
187         // } else {
188         // let msg = if let Some(ref name) = self.name {
189         //     format!("{}: {}", name, m)
190         // } else {
191         //     m.to_owned()
192         // };
193         // send message to chat server
194         // ctx.state().addr.do_send(ClientMessage {
195         // id: self.id,
196         // msg: msg,
197         // room: self.room.clone(),
198         // })
199         // }
200       }
201       ws::Message::Binary(_bin) => println!("Unexpected binary"),
202       ws::Message::Close(_) => {
203         ctx.stop();
204       },
205     }
206   }
207 }
208
209 impl WSSession {
210   /// helper method that sends ping to client every second.
211   ///
212   /// also this method checks heartbeats from client
213   fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) {
214     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
215       // check client heartbeats
216       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
217         // heartbeat timed out
218         println!("Websocket Client heartbeat failed, disconnecting!");
219
220         // notify chat server
221         ctx.state()
222           .addr
223           .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() });
224
225         // stop actor
226         ctx.stop();
227
228         // don't try to send a ping
229         return;
230       }
231
232       ctx.ping("");
233     });
234   }
235 }
236
237 fn main() {
238   let _ = env_logger::init();
239   let sys = actix::System::new("lemmy");
240
241   // Run the migrations from code
242   let conn = establish_connection();
243   embedded_migrations::run(&conn).unwrap();
244
245   // Start chat server actor in separate thread
246   let server = Arbiter::start(|_| ChatServer::default());
247
248   // Create Http server with websocket support
249   HttpServer::new(move || {
250     // Websocket sessions state
251     let state = WsChatSessionState {
252       addr: server.clone(),
253     };
254
255     App::with_state(state)
256       // redirect to websocket.html
257       // .resource("/", |r| r.method(http::Method::GET).f(|_| {
258       // HttpResponse::Found()
259       // .header("LOCATION", "/static/websocket.html")
260       // .finish()
261       // }))
262       .resource("/service/ws", |r| r.route().f(chat_route))
263       // static resources
264       .resource("/", |r| r.route().f(index))
265       .handler(
266         "/static",
267         fs::StaticFiles::new(front_end_dir()).unwrap()
268       )
269       .finish()
270   }).bind("0.0.0.0:8536")
271   .unwrap()
272     .start();
273
274   println!("Started http server: 0.0.0.0:8536");
275   let _ = sys.run();
276 }
277
278 fn index(_req: &HttpRequest<WsChatSessionState>) -> Result<NamedFile, actix_web::error::Error> {
279   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
280 }
281
282 fn front_end_dir() -> String {
283   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
284 }