]> Untitled Git - lemmy.git/blob - server/src/main.rs
got it working
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_server::db::establish_connection;
10 use lemmy_server::websocket::server::*;
11 use std::env;
12 use std::time::{Duration, Instant};
13 use lemmy_server::nodeinfo;
14
15 embed_migrations!();
16
17 /// How often heartbeat pings are sent
18 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
19 /// How long before lack of client response causes a timeout
20 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
21
22 /// Entry point for our route
23 fn chat_route(
24   req: HttpRequest,
25   stream: web::Payload,
26   chat_server: web::Data<Addr<ChatServer>>,
27 ) -> Result<HttpResponse, Error> {
28   ws::start(
29     WSSession {
30       cs_addr: chat_server.get_ref().to_owned(),
31       id: 0,
32       hb: Instant::now(),
33       ip: req
34         .connection_info()
35         .remote()
36         .unwrap_or("127.0.0.1:12345")
37         .split(":")
38         .next()
39         .unwrap_or("127.0.0.1")
40         .to_string(),
41     },
42     &req,
43     stream,
44   )
45 }
46
47 struct WSSession {
48   cs_addr: Addr<ChatServer>,
49   /// unique session id
50   id: usize,
51   ip: String,
52   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
53   /// otherwise we drop connection.
54   hb: Instant,
55 }
56
57 impl Actor for WSSession {
58   type Context = ws::WebsocketContext<Self>;
59
60   /// Method is called on actor start.
61   /// We register ws session with ChatServer
62   fn started(&mut self, ctx: &mut Self::Context) {
63     // we'll start heartbeat process on session start.
64     self.hb(ctx);
65
66     // register self in chat server. `AsyncContext::wait` register
67     // future within context, but context waits until this future resolves
68     // before processing any other events.
69     // across all routes within application
70     let addr = ctx.address();
71     self
72       .cs_addr
73       .send(Connect {
74         addr: addr.recipient(),
75         ip: self.ip.to_owned(),
76       })
77       .into_actor(self)
78       .then(|res, act, ctx| {
79         match res {
80           Ok(res) => act.id = res,
81           // something is wrong with chat server
82           _ => ctx.stop(),
83         }
84         fut::ok(())
85       })
86       .wait(ctx);
87   }
88
89   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
90     // notify chat server
91     self.cs_addr.do_send(Disconnect {
92       id: self.id,
93       ip: self.ip.to_owned(),
94     });
95     Running::Stop
96   }
97 }
98
99 /// Handle messages from chat server, we simply send it to peer websocket
100 /// These are room messages, IE sent to others in the room
101 impl Handler<WSMessage> for WSSession {
102   type Result = ();
103
104   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
105     // println!("id: {} msg: {}", self.id, msg.0);
106     ctx.text(msg.0);
107   }
108 }
109
110 /// WebSocket message handler
111 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
112   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
113     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
114     match msg {
115       ws::Message::Ping(msg) => {
116         self.hb = Instant::now();
117         ctx.pong(&msg);
118       }
119       ws::Message::Pong(_) => {
120         self.hb = Instant::now();
121       }
122       ws::Message::Text(text) => {
123         let m = text.trim().to_owned();
124         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
125
126         self
127           .cs_addr
128           .send(StandardMessage {
129             id: self.id,
130             msg: m,
131           })
132           .into_actor(self)
133           .then(|res, _, ctx| {
134             match res {
135               Ok(res) => ctx.text(res),
136               Err(e) => {
137                 eprintln!("{}", &e);
138               }
139             }
140             fut::ok(())
141           })
142           .wait(ctx);
143       }
144       ws::Message::Binary(_bin) => println!("Unexpected binary"),
145       ws::Message::Close(_) => {
146         ctx.stop();
147       }
148       _ => {}
149     }
150   }
151 }
152
153 impl WSSession {
154   /// helper method that sends ping to client every second.
155   ///
156   /// also this method checks heartbeats from client
157   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
158     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
159       // check client heartbeats
160       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
161         // heartbeat timed out
162         println!("Websocket Client heartbeat failed, disconnecting!");
163
164         // notify chat server
165         act.cs_addr.do_send(Disconnect {
166           id: act.id,
167           ip: act.ip.to_owned(),
168         });
169
170         // stop actor
171         ctx.stop();
172
173         // don't try to send a ping
174         return;
175       }
176
177       ctx.ping("");
178     });
179   }
180 }
181
182 fn main() {
183   let _ = env_logger::init();
184   let sys = actix::System::new("lemmy");
185
186   // Run the migrations from code
187   let conn = establish_connection();
188   embedded_migrations::run(&conn).unwrap();
189
190   // Start chat server actor in separate thread
191   let server = ChatServer::default().start();
192   // Create Http server with websocket support
193   HttpServer::new(move || {
194     App::new()
195       .data(server.clone())
196       .service(web::resource("/api/v1/ws").to(chat_route))
197       //            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
198       .service(web::resource("/").to(index))
199       // static resources
200       .service(actix_files::Files::new("/static", front_end_dir()))
201       .route("/nodeinfo/2.0.json", web::get().to(nodeinfo::node_info))
202   })
203   .bind("0.0.0.0:8536")
204   .unwrap()
205   .start();
206
207   println!("Started http server: 0.0.0.0:8536");
208   let _ = sys.run();
209 }
210
211 fn index() -> Result<NamedFile, actix_web::error::Error> {
212   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
213 }
214
215 fn front_end_dir() -> String {
216   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
217 }