]> Untitled Git - lemmy.git/blob - server/src/main.rs
Implementing very basic federation including test setup
[lemmy.git] / server / src / main.rs
1 extern crate lemmy_server;
2 #[macro_use]
3 extern crate diesel_migrations;
4
5 use actix::prelude::*;
6 use actix_files::NamedFile;
7 use actix_web::web::Query;
8 use actix_web::*;
9 use actix_web_actors::ws;
10 use lemmy_server::api::community::ListCommunities;
11 use lemmy_server::api::Oper;
12 use lemmy_server::api::Perform;
13 use lemmy_server::api::UserOperation;
14 use lemmy_server::apub;
15 use lemmy_server::db::establish_connection;
16 use lemmy_server::feeds;
17 use lemmy_server::nodeinfo;
18 use lemmy_server::settings::Settings;
19 use lemmy_server::webfinger;
20 use lemmy_server::websocket::server::*;
21 use std::time::{Duration, Instant};
22
23 embed_migrations!();
24
25 /// How often heartbeat pings are sent
26 const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
27 /// How long before lack of client response causes a timeout
28 const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
29
30 /// Entry point for our route
31 fn chat_route(
32   req: HttpRequest,
33   stream: web::Payload,
34   chat_server: web::Data<Addr<ChatServer>>,
35 ) -> Result<HttpResponse, Error> {
36   ws::start(
37     WSSession {
38       cs_addr: chat_server.get_ref().to_owned(),
39       id: 0,
40       hb: Instant::now(),
41       ip: req
42         .connection_info()
43         .remote()
44         .unwrap_or("127.0.0.1:12345")
45         .split(":")
46         .next()
47         .unwrap_or("127.0.0.1")
48         .to_string(),
49     },
50     &req,
51     stream,
52   )
53 }
54
55 struct WSSession {
56   cs_addr: Addr<ChatServer>,
57   /// unique session id
58   id: usize,
59   ip: String,
60   /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
61   /// otherwise we drop connection.
62   hb: Instant,
63 }
64
65 impl Actor for WSSession {
66   type Context = ws::WebsocketContext<Self>;
67
68   /// Method is called on actor start.
69   /// We register ws session with ChatServer
70   fn started(&mut self, ctx: &mut Self::Context) {
71     // we'll start heartbeat process on session start.
72     self.hb(ctx);
73
74     // register self in chat server. `AsyncContext::wait` register
75     // future within context, but context waits until this future resolves
76     // before processing any other events.
77     // across all routes within application
78     let addr = ctx.address();
79     self
80       .cs_addr
81       .send(Connect {
82         addr: addr.recipient(),
83         ip: self.ip.to_owned(),
84       })
85       .into_actor(self)
86       .then(|res, act, ctx| {
87         match res {
88           Ok(res) => act.id = res,
89           // something is wrong with chat server
90           _ => ctx.stop(),
91         }
92         fut::ok(())
93       })
94       .wait(ctx);
95   }
96
97   fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
98     // notify chat server
99     self.cs_addr.do_send(Disconnect {
100       id: self.id,
101       ip: self.ip.to_owned(),
102     });
103     Running::Stop
104   }
105 }
106
107 /// Handle messages from chat server, we simply send it to peer websocket
108 /// These are room messages, IE sent to others in the room
109 impl Handler<WSMessage> for WSSession {
110   type Result = ();
111
112   fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
113     // println!("id: {} msg: {}", self.id, msg.0);
114     ctx.text(msg.0);
115   }
116 }
117
118 /// WebSocket message handler
119 impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
120   fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
121     // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
122     match msg {
123       ws::Message::Ping(msg) => {
124         self.hb = Instant::now();
125         ctx.pong(&msg);
126       }
127       ws::Message::Pong(_) => {
128         self.hb = Instant::now();
129       }
130       ws::Message::Text(text) => {
131         let m = text.trim().to_owned();
132         println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
133
134         self
135           .cs_addr
136           .send(StandardMessage {
137             id: self.id,
138             msg: m,
139           })
140           .into_actor(self)
141           .then(|res, _, ctx| {
142             match res {
143               Ok(res) => ctx.text(res),
144               Err(e) => {
145                 eprintln!("{}", &e);
146               }
147             }
148             fut::ok(())
149           })
150           .wait(ctx);
151       }
152       ws::Message::Binary(_bin) => println!("Unexpected binary"),
153       ws::Message::Close(_) => {
154         ctx.stop();
155       }
156       _ => {}
157     }
158   }
159 }
160
161 impl WSSession {
162   /// helper method that sends ping to client every second.
163   ///
164   /// also this method checks heartbeats from client
165   fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
166     ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
167       // check client heartbeats
168       if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
169         // heartbeat timed out
170         println!("Websocket Client heartbeat failed, disconnecting!");
171
172         // notify chat server
173         act.cs_addr.do_send(Disconnect {
174           id: act.id,
175           ip: act.ip.to_owned(),
176         });
177
178         // stop actor
179         ctx.stop();
180
181         // don't try to send a ping
182         return;
183       }
184
185       ctx.ping("");
186     });
187   }
188 }
189
190 fn main() {
191   let _ = env_logger::init();
192   let sys = actix::System::new("lemmy");
193
194   // Run the migrations from code
195   let conn = establish_connection();
196   embedded_migrations::run(&conn).unwrap();
197
198   // Start chat server actor in separate thread
199   let server = ChatServer::default().start();
200
201   let settings = Settings::get();
202
203   // Create Http server with websocket support
204   HttpServer::new(move || {
205     let app = App::new()
206       .data(server.clone())
207       // Front end routes
208       .service(actix_files::Files::new(
209         "/static",
210         settings.front_end_dir.to_owned(),
211       ))
212       .route("/", web::get().to(index))
213       .route(
214         "/home/type/{type}/sort/{sort}/page/{page}",
215         web::get().to(index),
216       )
217       .route("/login", web::get().to(index))
218       .route("/create_post", web::get().to(index))
219       .route("/create_community", web::get().to(index))
220       .route("/communities/page/{page}", web::get().to(index))
221       .route("/communities", web::get().to(index))
222       .route("/post/{id}/comment/{id2}", web::get().to(index))
223       .route("/post/{id}", web::get().to(index))
224       .route("/c/{name}/sort/{sort}/page/{page}", web::get().to(index))
225       .route("/c/{name}", web::get().to(index))
226       .route("/community/{id}", web::get().to(index))
227       .route(
228         "/u/{username}/view/{view}/sort/{sort}/page/{page}",
229         web::get().to(index),
230       )
231       .route("/u/{username}", web::get().to(index))
232       .route("/user/{id}", web::get().to(index))
233       .route("/inbox", web::get().to(index))
234       .route("/modlog/community/{community_id}", web::get().to(index))
235       .route("/modlog", web::get().to(index))
236       .route("/setup", web::get().to(index))
237       .route(
238         "/search/q/{q}/type/{type}/sort/{sort}/page/{page}",
239         web::get().to(index),
240       )
241       .route("/search", web::get().to(index))
242       .route("/sponsors", web::get().to(index))
243       .route("/password_change/{token}", web::get().to(index))
244       // Websocket
245       .service(web::resource("/api/v1/ws").to(chat_route))
246       // NodeInfo
247       .route("/nodeinfo/2.0.json", web::get().to(nodeinfo::node_info))
248       .route(
249         "/.well-known/nodeinfo",
250         web::get().to(nodeinfo::node_info_well_known),
251       )
252       // RSS
253       .route("/feeds/{type}/{name}.xml", web::get().to(feeds::get_feed))
254       .route("/feeds/all.xml", web::get().to(feeds::get_all_feed))
255       // Federation
256       .route(
257         "/federation/c/{community_name}",
258         web::get().to(apub::community::get_apub_community),
259       )
260       .route(
261         "/federation/c/{community_name}/followers",
262         web::get().to(apub::community::get_apub_community_followers),
263       )
264       .route(
265         "/federation/u/{user_name}",
266         web::get().to(apub::user::get_apub_user),
267       )
268       .route("/feeds/all.xml", web::get().to(feeds::get_all_feed));
269
270     // Federation
271     if Settings::get().federation_enabled {
272       println!("federation enabled, host is {}", Settings::get().hostname);
273       app
274         .route(
275           ".well-known/webfinger",
276           web::get().to(webfinger::get_webfinger_response),
277         )
278         // TODO: this is a very quick and dirty implementation for http api calls
279         .route(
280           "/api/v1/communities/list",
281           web::get().to(|query: Query<ListCommunities>| {
282             let res = Oper::new(UserOperation::ListCommunities, query.into_inner())
283               .perform()
284               .unwrap();
285             HttpResponse::Ok()
286               .content_type("application/json")
287               .body(serde_json::to_string(&res).unwrap())
288           }),
289         )
290     } else {
291       app
292     }
293   })
294   .bind((settings.bind, settings.port))
295   .unwrap()
296   .start();
297
298   println!("Started http server at {}:{}", settings.bind, settings.port);
299
300   let _ = sys.run();
301 }
302
303 fn index() -> Result<NamedFile, actix_web::error::Error> {
304   Ok(NamedFile::open(
305     Settings::get().front_end_dir.to_owned() + "/index.html",
306   )?)
307 }