]> Untitled Git - lemmy.git/blob - server/src/main.rs
More reorg
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use] extern crate diesel_migrations;
3
4 use std::time::{Instant, Duration};
5 use std::env;
6 use lemmy_server::actix::*;
7 use lemmy_server::actix_web::server::HttpServer;
8 use lemmy_server::actix_web::{ws, App, Error, HttpRequest, HttpResponse, fs::NamedFile, fs};
9 use lemmy_server::websocket::server::*;
10 use lemmy_server::establish_connection;
11
12 embed_migrations!();
13
14 /// How often heartbeat pings are sent
15 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
16 /// How long before lack of client response causes a timeout
17 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
18
19 /// This is our websocket route state, this state is shared with all route
20 /// instances via `HttpContext::state()`
21 struct WsChatSessionState {
22   addr: Addr<ChatServer>,
23 }
24
25 /// Entry point for our route
26 fn chat_route(req: &HttpRequest<WsChatSessionState>) -> Result<HttpResponse, Error> {
27   ws::start(
28     req,
29     WSSession {
30       id: 0,
31       hb: Instant::now(),
32       ip: req.connection_info()
33         .remote()
34         .unwrap_or("127.0.0.1:12345")
35         .split(":")
36         .next()
37         .unwrap_or("127.0.0.1")
38         .to_string()
39     },
40     )
41 }
42
43 struct WSSession {
44   /// unique session id
45   id: usize,
46   ip: String,
47   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
48   /// otherwise we drop connection.
49   hb: Instant
50 }
51
52 impl Actor for WSSession {
53   type Context = ws::WebsocketContext<Self, WsChatSessionState>;
54
55   /// Method is called on actor start.
56   /// We register ws session with ChatServer
57   fn started(&mut self, ctx: &mut Self::Context) {
58     // we'll start heartbeat process on session start.
59     self.hb(ctx);
60
61     // register self in chat server. `AsyncContext::wait` register
62     // future within context, but context waits until this future resolves
63     // before processing any other events.
64     // HttpContext::state() is instance of WsChatSessionState, state is shared
65     // across all routes within application
66     let addr = ctx.address();
67     ctx.state()
68       .addr
69       .send(Connect {
70         addr: addr.recipient(),
71         ip: self.ip.to_owned(),
72       })
73     .into_actor(self)
74       .then(|res, act, ctx| {
75         match res {
76           Ok(res) => act.id = res,
77           // something is wrong with chat server
78           _ => ctx.stop(),
79         }
80         fut::ok(())
81       })
82     .wait(ctx);
83   }
84
85   fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
86     // notify chat server
87     ctx.state().addr.do_send(Disconnect { 
88       id: self.id,
89       ip: self.ip.to_owned(),
90     });
91     Running::Stop
92   }
93 }
94
95 /// Handle messages from chat server, we simply send it to peer websocket
96 /// These are room messages, IE sent to others in the room
97 impl Handler<WSMessage> for WSSession {
98   type Result = ();
99
100   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
101     // println!("id: {} msg: {}", self.id, msg.0);
102     ctx.text(msg.0);
103   }
104 }
105
106 /// WebSocket message handler
107 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
108   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
109     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
110     match msg {
111       ws::Message::Ping(msg) => {
112         self.hb = Instant::now();
113         ctx.pong(&msg);
114       }
115       ws::Message::Pong(_) => {
116         self.hb = Instant::now();
117       }
118       ws::Message::Text(text) => {
119         let m = text.trim().to_owned();
120         
121         ctx.state()
122           .addr
123           .send(StandardMessage {
124             id: self.id,
125             msg: m,
126           })
127         .into_actor(self)
128           .then(|res, _, ctx| {
129             match res {
130               Ok(res) => ctx.text(res),
131               Err(e) => {
132                 eprintln!("{}", &e);
133                 // ctx.text(e);
134               }
135             }
136             // Ok(res) => ctx.text(res),
137             // // something is wrong with chat server
138             // _ => ctx.stop(),
139             fut::ok(())
140           })
141         .wait(ctx);
142
143         // we check for /sss type of messages
144         // if m.starts_with('/') {
145         //     let v: Vec<&str> = m.splitn(2, ' ').collect();
146         //     match v[0] {
147         //         "/list" => {
148         //             // Send ListRooms message to chat server and wait for
149         //             // response
150         //             println!("List rooms");
151         //             ctx.state()
152         //                 .addr
153         //                 .send(ListRooms)
154         //                 .into_actor(self)
155         //                 .then(|res, _, ctx| {
156         //                     match res {
157         //                         Ok(rooms) => {
158         //                             for room in rooms {
159         //                                 ctx.text(room);
160         //                             }
161         //                         }
162         //                         _ => println!("Something is wrong"),
163         //                     }
164         //                     fut::ok(())
165         //                 })
166         //                 .wait(ctx)
167         // .wait(ctx) pauses all events in context,
168         // so actor wont receive any new messages until it get list
169         // of rooms back
170         // }
171         // "/join" => {
172         //     if v.len() == 2 {
173         //         self.room = v[1].to_owned();
174         //         ctx.state().addr.do_send(Join {
175         //             id: self.id,
176         //             name: self.room.clone(),
177         //         });
178
179         //         ctx.text("joined");
180         //     } else {
181         //         ctx.text("!!! room name is required");
182         //     }
183         // }
184         // "/name" => {
185         //     if v.len() == 2 {
186         //         self.name = Some(v[1].to_owned());
187         //     } else {
188         //         ctx.text("!!! name is required");
189         //     }
190         // }
191         // _ => ctx.text(format!("!!! unknown command: {:?}", m)),
192         // }
193         // } else {
194         // let msg = if let Some(ref name) = self.name {
195         //     format!("{}: {}", name, m)
196         // } else {
197         //     m.to_owned()
198         // };
199         // send message to chat server
200         // ctx.state().addr.do_send(ClientMessage {
201         // id: self.id,
202         // msg: msg,
203         // room: self.room.clone(),
204         // })
205         // }
206       }
207       ws::Message::Binary(_bin) => println!("Unexpected binary"),
208       ws::Message::Close(_) => {
209         ctx.stop();
210       },
211     }
212   }
213 }
214
215 impl WSSession {
216   /// helper method that sends ping to client every second.
217   ///
218   /// also this method checks heartbeats from client
219   fn hb(&self, ctx: &mut ws::WebsocketContext<Self, WsChatSessionState>) {
220     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
221       // check client heartbeats
222       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
223         // heartbeat timed out
224         println!("Websocket Client heartbeat failed, disconnecting!");
225
226         // notify chat server
227         ctx.state()
228           .addr
229           .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() });
230
231         // stop actor
232         ctx.stop();
233
234         // don't try to send a ping
235         return;
236       }
237
238       ctx.ping("");
239     });
240   }
241 }
242
243 fn main() {
244   let _ = env_logger::init();
245   let sys = actix::System::new("lemmy");
246
247   // Run the migrations from code
248   let conn = establish_connection();
249   embedded_migrations::run(&conn).unwrap();
250
251   // Start chat server actor in separate thread
252   let server = Arbiter::start(|_| ChatServer::default());
253
254   // Create Http server with websocket support
255   HttpServer::new(move || {
256     // Websocket sessions state
257     let state = WsChatSessionState {
258       addr: server.clone(),
259     };
260
261     App::with_state(state)
262       // redirect to websocket.html
263       // .resource("/", |r| r.method(http::Method::GET).f(|_| {
264       // HttpResponse::Found()
265       // .header("LOCATION", "/static/websocket.html")
266       // .finish()
267       // }))
268       .resource("/service/ws", |r| r.route().f(chat_route))
269       // static resources
270       .resource("/", |r| r.route().f(index))
271       .handler(
272         "/static",
273         fs::StaticFiles::new(front_end_dir()).unwrap()
274       )
275       .finish()
276   }).bind("0.0.0.0:8536")
277   .unwrap()
278     .start();
279
280   println!("Started http server: 0.0.0.0:8536");
281   let _ = sys.run();
282 }
283
284 fn index(_req: &HttpRequest<WsChatSessionState>) -> Result<NamedFile, actix_web::error::Error> {
285   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
286 }
287
288 fn front_end_dir() -> String {
289   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
290 }