]> Untitled Git - lemmy.git/blobdiff - server/src/main.rs
Merge branch 'dev' into federation
[lemmy.git] / server / src / main.rs
index 75a48865be575672094cd37c32c15bff15a80743..88d62eb997c9fd374a4a1fc3661257d1f64029a0 100644 (file)
@@ -2,206 +2,86 @@ extern crate lemmy_server;
 #[macro_use]
 extern crate diesel_migrations;
 
-use std::time::{Instant, Duration};
-use std::env;
-use actix_web::*;
 use actix::prelude::*;
-use actix_files::NamedFile;
-use actix_web_actors::ws;
+use actix_web::*;
+use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::PgConnection;
+use failure::Error;
+use lemmy_server::apub::fetcher::fetch_all;
+use lemmy_server::db::code_migrations::run_advanced_migrations;
+use lemmy_server::routes::{api, federation, feeds, index, nodeinfo, webfinger, websocket};
+use lemmy_server::settings::Settings;
 use lemmy_server::websocket::server::*;
-use lemmy_server::db::establish_connection;
+use log::warn;
+use std::thread;
+use std::thread::sleep;
+use std::time::Duration;
 
 embed_migrations!();
 
-/// How often heartbeat pings are sent
-const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
-/// How long before lack of client response causes a timeout
-const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
-
-
-/// Entry point for our route
-fn chat_route(req: HttpRequest, stream: web::Payload, chat_server: web::Data<Addr<ChatServer>>) -> Result<HttpResponse, Error> {
-    ws::start(
-        WSSession {
-            cs_addr: chat_server.get_ref().to_owned(),
-            id: 0,
-            hb: Instant::now(),
-            ip: req.connection_info()
-                .remote()
-                .unwrap_or("127.0.0.1:12345")
-                .split(":")
-                .next()
-                .unwrap_or("127.0.0.1")
-                .to_string(),
-        },
-        &req,
-        stream,
-    )
-}
-
-struct WSSession {
-    cs_addr: Addr<ChatServer>,
-    /// unique session id
-    id: usize,
-    ip: String,
-    /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
-    /// otherwise we drop connection.
-    hb: Instant,
-}
-
-impl Actor for WSSession {
-    type Context = ws::WebsocketContext<Self>;
-
-    /// Method is called on actor start.
-    /// We register ws session with ChatServer
-    fn started(&mut self, ctx: &mut Self::Context) {
-        // we'll start heartbeat process on session start.
-        self.hb(ctx);
-
-        // register self in chat server. `AsyncContext::wait` register
-        // future within context, but context waits until this future resolves
-        // before processing any other events.
-        // across all routes within application
-        let addr = ctx.address();
-        self.cs_addr
-            .send(Connect {
-                addr: addr.recipient(),
-                ip: self.ip.to_owned(),
-            })
-            .into_actor(self)
-            .then(|res, act, ctx| {
-                match res {
-                    Ok(res) => act.id = res,
-                    // something is wrong with chat server
-                    _ => ctx.stop(),
-                }
-                fut::ok(())
-            })
-            .wait(ctx);
+#[actix_rt::main]
+async fn main() -> Result<(), Error> {
+  env_logger::init();
+  let settings = Settings::get();
+
+  // Set up the r2d2 connection pool
+  let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
+  let pool = Pool::builder()
+    .max_size(settings.database.pool_size)
+    .build(manager)
+    .unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
+
+  // Run the migrations from code
+  let conn = pool.get().unwrap();
+  embedded_migrations::run(&conn).unwrap();
+  run_advanced_migrations(&conn).unwrap();
+
+  // Set up websocket server
+  let server = ChatServer::startup(pool.clone()).start();
+
+  thread::spawn(move || {
+    // some work here
+    sleep(Duration::from_secs(5));
+    println!("Fetching apub data");
+    match fetch_all(&conn) {
+      Ok(_) => {}
+      Err(e) => warn!("Error during apub fetch: {}", e),
     }
+  });
 
-    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
-        // notify chat server
-        self.cs_addr.do_send(Disconnect {
-            id: self.id,
-            ip: self.ip.to_owned(),
-        });
-        Running::Stop
-    }
-}
+  println!(
+    "Starting http server at {}:{}",
+    settings.bind, settings.port
+  );
 
-/// Handle messages from chat server, we simply send it to peer websocket
-/// These are room messages, IE sent to others in the room
-impl Handler<WSMessage> for WSSession {
-    type Result = ();
-
-    fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
-        // println!("id: {} msg: {}", self.id, msg.0);
-        ctx.text(msg.0);
-    }
-}
-
-/// WebSocket message handler
-impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
-    fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
-        // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
-        match msg {
-            ws::Message::Ping(msg) => {
-                self.hb = Instant::now();
-                ctx.pong(&msg);
-            }
-            ws::Message::Pong(_) => {
-                self.hb = Instant::now();
-            }
-            ws::Message::Text(text) => {
-                let m = text.trim().to_owned();
-                println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
-
-                self.cs_addr
-                    .send(StandardMessage {
-                        id: self.id,
-                        msg: m,
-                    })
-                    .into_actor(self)
-                    .then(|res, _, ctx| {
-                        match res {
-                            Ok(res) => ctx.text(res),
-                            Err(e) => {
-                                eprintln!("{}", &e);
-                            }
-                        }
-                        fut::ok(())
-                    })
-                    .wait(ctx);
-            }
-            ws::Message::Binary(_bin) => println!("Unexpected binary"),
-            ws::Message::Close(_) => {
-                ctx.stop();
-            }
-            _ => {}
-        }
-    }
-}
-
-impl WSSession {
-    /// helper method that sends ping to client every second.
-    ///
-    /// also this method checks heartbeats from client
-    fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
-        ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
-            // check client heartbeats
-            if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
-                // heartbeat timed out
-                println!("Websocket Client heartbeat failed, disconnecting!");
-
-                // notify chat server
-                act.cs_addr
-                    .do_send(Disconnect { id: act.id, ip: act.ip.to_owned() });
-
-                // stop actor
-                ctx.stop();
-
-                // don't try to send a ping
-                return;
-            }
-
-            ctx.ping("");
-        });
-    }
-}
-
-fn main() {
-    let _ = env_logger::init();
-    let sys = actix::System::new("lemmy");
-
-    // Run the migrations from code
-    let conn = establish_connection();
-    embedded_migrations::run(&conn).unwrap();
-
-    // Start chat server actor in separate thread
-    let server = ChatServer::default().start();
-    // Create Http server with websocket support
+  // Create Http server with websocket support
+  Ok(
     HttpServer::new(move || {
-
-        App::new()
-            .data(server.clone())
-            .service(web::resource("/api/v1/ws").to(chat_route))
-//            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
-            .service(web::resource("/").to(index))
-            // static resources
-            .service(actix_files::Files::new("/static", front_end_dir()))
-    }).bind("0.0.0.0:8536")
-        .unwrap()
-        .start();
-
-    println!("Started http server: 0.0.0.0:8536");
-    let _ = sys.run();
-}
-
-fn index() -> Result<NamedFile, actix_web::error::Error> {
-    Ok(NamedFile::open(front_end_dir() + "/index.html")?)
-}
-
-fn front_end_dir() -> String {
-    env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
+      let settings = Settings::get();
+      App::new()
+        .wrap(middleware::Logger::default())
+        .data(pool.clone())
+        .data(server.clone())
+        // The routes
+        .configure(api::config)
+        .configure(federation::config)
+        .configure(feeds::config)
+        .configure(index::config)
+        .configure(nodeinfo::config)
+        .configure(webfinger::config)
+        .configure(websocket::config)
+        // static files
+        .service(actix_files::Files::new(
+          "/static",
+          settings.front_end_dir.to_owned(),
+        ))
+        .service(actix_files::Files::new(
+          "/docs",
+          settings.front_end_dir.to_owned() + "/documentation",
+        ))
+    })
+    .bind((settings.bind, settings.port))?
+    .run()
+    .await?,
+  )
 }