]> Untitled Git - lemmy.git/blob - server/src/main.rs
Implement webfinger (fixes #149)
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::*;
8 use actix_web_actors::ws;
9 use lemmy_server::apub;
10 use lemmy_server::db::establish_connection;
11 use lemmy_server::feeds;
12 use lemmy_server::nodeinfo;
13 use lemmy_server::settings::Settings;
14 use lemmy_server::webfinger;
15 use lemmy_server::websocket::server::*;
16 use std::env;
17 use std::time::{Duration, Instant};
18
19 embed_migrations!();
20
21 /// How often heartbeat pings are sent
22 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
23 /// How long before lack of client response causes a timeout
24 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
25
26 /// Entry point for our route
27 fn chat_route(
28   req: HttpRequest,
29   stream: web::Payload,
30   chat_server: web::Data<Addr<ChatServer>>,
31 ) -> Result<HttpResponse, Error> {
32   ws::start(
33     WSSession {
34       cs_addr: chat_server.get_ref().to_owned(),
35       id: 0,
36       hb: Instant::now(),
37       ip: req
38         .connection_info()
39         .remote()
40         .unwrap_or("127.0.0.1:12345")
41         .split(":")
42         .next()
43         .unwrap_or("127.0.0.1")
44         .to_string(),
45     },
46     &req,
47     stream,
48   )
49 }
50
51 struct WSSession {
52   cs_addr: Addr<ChatServer>,
53   /// unique session id
54   id: usize,
55   ip: String,
56   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
57   /// otherwise we drop connection.
58   hb: Instant,
59 }
60
61 impl Actor for WSSession {
62   type Context = ws::WebsocketContext<Self>;
63
64   /// Method is called on actor start.
65   /// We register ws session with ChatServer
66   fn started(&mut self, ctx: &mut Self::Context) {
67     // we'll start heartbeat process on session start.
68     self.hb(ctx);
69
70     // register self in chat server. `AsyncContext::wait` register
71     // future within context, but context waits until this future resolves
72     // before processing any other events.
73     // across all routes within application
74     let addr = ctx.address();
75     self
76       .cs_addr
77       .send(Connect {
78         addr: addr.recipient(),
79         ip: self.ip.to_owned(),
80       })
81       .into_actor(self)
82       .then(|res, act, ctx| {
83         match res {
84           Ok(res) => act.id = res,
85           // something is wrong with chat server
86           _ => ctx.stop(),
87         }
88         fut::ok(())
89       })
90       .wait(ctx);
91   }
92
93   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
94     // notify chat server
95     self.cs_addr.do_send(Disconnect {
96       id: self.id,
97       ip: self.ip.to_owned(),
98     });
99     Running::Stop
100   }
101 }
102
103 /// Handle messages from chat server, we simply send it to peer websocket
104 /// These are room messages, IE sent to others in the room
105 impl Handler<WSMessage> for WSSession {
106   type Result = ();
107
108   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
109     // println!("id: {} msg: {}", self.id, msg.0);
110     ctx.text(msg.0);
111   }
112 }
113
114 /// WebSocket message handler
115 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
116   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
117     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
118     match msg {
119       ws::Message::Ping(msg) => {
120         self.hb = Instant::now();
121         ctx.pong(&msg);
122       }
123       ws::Message::Pong(_) => {
124         self.hb = Instant::now();
125       }
126       ws::Message::Text(text) => {
127         let m = text.trim().to_owned();
128         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
129
130         self
131           .cs_addr
132           .send(StandardMessage {
133             id: self.id,
134             msg: m,
135           })
136           .into_actor(self)
137           .then(|res, _, ctx| {
138             match res {
139               Ok(res) => ctx.text(res),
140               Err(e) => {
141                 eprintln!("{}", &e);
142               }
143             }
144             fut::ok(())
145           })
146           .wait(ctx);
147       }
148       ws::Message::Binary(_bin) => println!("Unexpected binary"),
149       ws::Message::Close(_) => {
150         ctx.stop();
151       }
152       _ => {}
153     }
154   }
155 }
156
157 impl WSSession {
158   /// helper method that sends ping to client every second.
159   ///
160   /// also this method checks heartbeats from client
161   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
162     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
163       // check client heartbeats
164       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
165         // heartbeat timed out
166         println!("Websocket Client heartbeat failed, disconnecting!");
167
168         // notify chat server
169         act.cs_addr.do_send(Disconnect {
170           id: act.id,
171           ip: act.ip.to_owned(),
172         });
173
174         // stop actor
175         ctx.stop();
176
177         // don't try to send a ping
178         return;
179       }
180
181       ctx.ping("");
182     });
183   }
184 }
185
186 fn main() {
187   let _ = env_logger::init();
188   let sys = actix::System::new("lemmy");
189
190   // Run the migrations from code
191   let conn = establish_connection();
192   embedded_migrations::run(&conn).unwrap();
193
194   // Start chat server actor in separate thread
195   let server = ChatServer::default().start();
196
197   let settings = Settings::get();
198
199   // Create Http server with websocket support
200   HttpServer::new(move || {
201     App::new()
202       .data(server.clone())
203       // Front end routes
204       .service(actix_files::Files::new("/static", front_end_dir()))
205       .route("/", web::get().to(index))
206       .route(
207         "/home/type/{type}/sort/{sort}/page/{page}",
208         web::get().to(index),
209       )
210       .route("/login", web::get().to(index))
211       .route("/create_post", web::get().to(index))
212       .route("/create_community", web::get().to(index))
213       .route("/communities/page/{page}", web::get().to(index))
214       .route("/communities", web::get().to(index))
215       .route("/post/{id}/comment/{id2}", web::get().to(index))
216       .route("/post/{id}", web::get().to(index))
217       .route("/c/{name}/sort/{sort}/page/{page}", web::get().to(index))
218       .route("/c/{name}", web::get().to(index))
219       .route("/community/{id}", web::get().to(index))
220       .route(
221         "/u/{username}/view/{view}/sort/{sort}/page/{page}",
222         web::get().to(index),
223       )
224       .route("/u/{username}", web::get().to(index))
225       .route("/user/{id}", web::get().to(index))
226       .route("/inbox", web::get().to(index))
227       .route("/modlog/community/{community_id}", web::get().to(index))
228       .route("/modlog", web::get().to(index))
229       .route("/setup", web::get().to(index))
230       .route(
231         "/search/q/{q}/type/{type}/sort/{sort}/page/{page}",
232         web::get().to(index),
233       )
234       .route("/search", web::get().to(index))
235       .route("/sponsors", web::get().to(index))
236       .route("/password_change/{token}", web::get().to(index))
237       // Websocket
238       .service(web::resource("/api/v1/ws").to(chat_route))
239       // NodeInfo
240       .route("/nodeinfo/2.0.json", web::get().to(nodeinfo::node_info))
241       .route(
242         "/.well-known/nodeinfo",
243         web::get().to(nodeinfo::node_info_well_known),
244       )
245       // RSS
246       .route("/feeds/{type}/{name}.xml", web::get().to(feeds::get_feed))
247       .route("/feeds/all.xml", web::get().to(feeds::get_all_feed))
248       // Federation
249       .route(
250         "/federation/c/{community_name}",
251         web::get().to(apub::community::get_apub_community),
252       )
253       .route(
254         "/federation/c/{community_name}/followers",
255         web::get().to(apub::community::get_apub_community_followers),
256       )
257       .route(
258         "/federation/u/{user_name}",
259         web::get().to(apub::user::get_apub_user))
260       .route(
261         ".well-known/webfinger",
262         web::get().to(webfinger::get_webfinger_response),
263       )
264   })
265   .bind((settings.bind, settings.port))
266   .unwrap()
267   .start();
268
269   println!("Started http server at {}:{}", settings.bind, settings.port);
270   let _ = sys.run();
271 }
272
273 fn index() -> Result<NamedFile, actix_web::error::Error> {
274   Ok(NamedFile::open(front_end_dir() + "/index.html")?)
275 }
276
277 fn front_end_dir() -> String {
278   env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
279 }