]> Untitled Git - lemmy.git/blob - server/src/main.rs
Implement config (fixes #351)
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_server::apub;
10 use lemmy_server::db::establish_connection;
11 use lemmy_server::feeds;
12 use lemmy_server::nodeinfo;
13 use lemmy_server::settings::Settings;
14 use lemmy_server::websocket::server::*;
15 use std::env;
16 use std::time::{Duration, Instant};
17
18 embed_migrations!();
19
20 /// How often heartbeat pings are sent
21 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
22 /// How long before lack of client response causes a timeout
23 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
24
25 /// Entry point for our route
26 fn chat_route(
27   req: HttpRequest,
28   stream: web::Payload,
29   chat_server: web::Data<Addr<ChatServer>>,
30 ) -> Result<HttpResponse, Error> {
31   ws::start(
32     WSSession {
33       cs_addr: chat_server.get_ref().to_owned(),
34       id: 0,
35       hb: Instant::now(),
36       ip: req
37         .connection_info()
38         .remote()
39         .unwrap_or("127.0.0.1:12345")
40         .split(":")
41         .next()
42         .unwrap_or("127.0.0.1")
43         .to_string(),
44     },
45     &req,
46     stream,
47   )
48 }
49
50 struct WSSession {
51   cs_addr: Addr<ChatServer>,
52   /// unique session id
53   id: usize,
54   ip: String,
55   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
56   /// otherwise we drop connection.
57   hb: Instant,
58 }
59
60 impl Actor for WSSession {
61   type Context = ws::WebsocketContext<Self>;
62
63   /// Method is called on actor start.
64   /// We register ws session with ChatServer
65   fn started(&mut self, ctx: &mut Self::Context) {
66     // we'll start heartbeat process on session start.
67     self.hb(ctx);
68
69     // register self in chat server. `AsyncContext::wait` register
70     // future within context, but context waits until this future resolves
71     // before processing any other events.
72     // across all routes within application
73     let addr = ctx.address();
74     self
75       .cs_addr
76       .send(Connect {
77         addr: addr.recipient(),
78         ip: self.ip.to_owned(),
79       })
80       .into_actor(self)
81       .then(|res, act, ctx| {
82         match res {
83           Ok(res) => act.id = res,
84           // something is wrong with chat server
85           _ => ctx.stop(),
86         }
87         fut::ok(())
88       })
89       .wait(ctx);
90   }
91
92   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
93     // notify chat server
94     self.cs_addr.do_send(Disconnect {
95       id: self.id,
96       ip: self.ip.to_owned(),
97     });
98     Running::Stop
99   }
100 }
101
102 /// Handle messages from chat server, we simply send it to peer websocket
103 /// These are room messages, IE sent to others in the room
104 impl Handler<WSMessage> for WSSession {
105   type Result = ();
106
107   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
108     // println!("id: {} msg: {}", self.id, msg.0);
109     ctx.text(msg.0);
110   }
111 }
112
113 /// WebSocket message handler
114 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
115   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
116     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
117     match msg {
118       ws::Message::Ping(msg) => {
119         self.hb = Instant::now();
120         ctx.pong(&msg);
121       }
122       ws::Message::Pong(_) => {
123         self.hb = Instant::now();
124       }
125       ws::Message::Text(text) => {
126         let m = text.trim().to_owned();
127         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
128
129         self
130           .cs_addr
131           .send(StandardMessage {
132             id: self.id,
133             msg: m,
134           })
135           .into_actor(self)
136           .then(|res, _, ctx| {
137             match res {
138               Ok(res) => ctx.text(res),
139               Err(e) => {
140                 eprintln!("{}", &e);
141               }
142             }
143             fut::ok(())
144           })
145           .wait(ctx);
146       }
147       ws::Message::Binary(_bin) => println!("Unexpected binary"),
148       ws::Message::Close(_) => {
149         ctx.stop();
150       }
151       _ => {}
152     }
153   }
154 }
155
156 impl WSSession {
157   /// helper method that sends ping to client every second.
158   ///
159   /// also this method checks heartbeats from client
160   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
161     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
162       // check client heartbeats
163       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
164         // heartbeat timed out
165         println!("Websocket Client heartbeat failed, disconnecting!");
166
167         // notify chat server
168         act.cs_addr.do_send(Disconnect {
169           id: act.id,
170           ip: act.ip.to_owned(),
171         });
172
173         // stop actor
174         ctx.stop();
175
176         // don't try to send a ping
177         return;
178       }
179
180       ctx.ping("");
181     });
182   }
183 }
184
185 fn main() {
186   let _ = env_logger::init();
187   let sys = actix::System::new("lemmy");
188
189   // Run the migrations from code
190   let conn = establish_connection();
191   embedded_migrations::run(&conn).unwrap();
192
193   // Start chat server actor in separate thread
194   let server = ChatServer::default().start();
195
196   let settings = Settings::get();
197
198   // Create Http server with websocket support
199   HttpServer::new(move || {
200     App::new()
201       .data(server.clone())
202       // Front end routes
203       .service(actix_files::Files::new("/static", front_end_dir()))
204       .route("/", web::get().to(index))
205       .route(
206         "/home/type/{type}/sort/{sort}/page/{page}",
207         web::get().to(index),
208       )
209       .route("/login", web::get().to(index))
210       .route("/create_post", web::get().to(index))
211       .route("/create_community", web::get().to(index))
212       .route("/communities/page/{page}", web::get().to(index))
213       .route("/communities", web::get().to(index))
214       .route("/post/{id}/comment/{id2}", web::get().to(index))
215       .route("/post/{id}", web::get().to(index))
216       .route("/c/{name}/sort/{sort}/page/{page}", web::get().to(index))
217       .route("/c/{name}", web::get().to(index))
218       .route("/community/{id}", web::get().to(index))
219       .route(
220         "/u/{username}/view/{view}/sort/{sort}/page/{page}",
221         web::get().to(index),
222       )
223       .route("/u/{username}", web::get().to(index))
224       .route("/user/{id}", web::get().to(index))
225       .route("/inbox", web::get().to(index))
226       .route("/modlog/community/{community_id}", web::get().to(index))
227       .route("/modlog", web::get().to(index))
228       .route("/setup", web::get().to(index))
229       .route(
230         "/search/q/{q}/type/{type}/sort/{sort}/page/{page}",
231         web::get().to(index),
232       )
233       .route("/search", web::get().to(index))
234       .route("/sponsors", web::get().to(index))
235       .route("/password_change/{token}", web::get().to(index))
236       // Websocket
237       .service(web::resource("/api/v1/ws").to(chat_route))
238       // NodeInfo
239       .route("/nodeinfo/2.0.json", web::get().to(nodeinfo::node_info))
240       .route(
241         "/.well-known/nodeinfo",
242         web::get().to(nodeinfo::node_info_well_known),
243       )
244       // RSS
245       .route("/feeds/{type}/{name}.xml", web::get().to(feeds::get_feed))
246       .route("/feeds/all.xml", web::get().to(feeds::get_all_feed))
247       // Federation
248       .route(
249         "/federation/c/{community_name}",
250         web::get().to(apub::community::get_apub_community),
251       )
252       .route(
253         "/federation/c/{community_name}/followers",
254         web::get().to(apub::community::get_apub_community_followers),
255       )
256       .route(
257         "/federation/u/{user_name}",
258         web::get().to(apub::user::get_apub_user),
259       )
260   })
261   .bind((settings.bind, settings.port))
262   .unwrap()
263   .start();
264
265   println!("Started http server at {}:{}", settings.bind, settings.port);
266   let _ = sys.run();
267 }
268
269 fn index() -> Result<NamedFile, actix_web::error::Error> {
270   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
271 }
272
273 fn front_end_dir() -> String {
274   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
275 }