]> Untitled Git - lemmy.git/blobdiff - server/src/main.rs
Merge branch 'dev' into federation
[lemmy.git] / server / src / main.rs
index 5e9a1dae8831ad8dc8f558d8543fbde0e2293489..88d62eb997c9fd374a4a1fc3661257d1f64029a0 100644 (file)
@@ -3,211 +3,85 @@ extern crate lemmy_server;
 extern crate diesel_migrations;
 
 use actix::prelude::*;
-use actix_files::NamedFile;
 use actix_web::*;
-use actix_web_actors::ws;
-use lemmy_server::db::establish_connection;
+use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::PgConnection;
+use failure::Error;
+use lemmy_server::apub::fetcher::fetch_all;
+use lemmy_server::db::code_migrations::run_advanced_migrations;
+use lemmy_server::routes::{api, federation, feeds, index, nodeinfo, webfinger, websocket};
+use lemmy_server::settings::Settings;
 use lemmy_server::websocket::server::*;
-use std::env;
-use std::time::{Duration, Instant};
+use log::warn;
+use std::thread;
+use std::thread::sleep;
+use std::time::Duration;
 
 embed_migrations!();
 
-/// How often heartbeat pings are sent
-const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
-/// How long before lack of client response causes a timeout
-const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
+#[actix_rt::main]
+async fn main() -> Result<(), Error> {
+  env_logger::init();
+  let settings = Settings::get();
 
-/// Entry point for our route
-fn chat_route(
-  req: HttpRequest,
-  stream: web::Payload,
-  chat_server: web::Data<Addr<ChatServer>>,
-  ) -> Result<HttpResponse, Error> {
-  ws::start(
-    WSSession {
-      cs_addr: chat_server.get_ref().to_owned(),
-      id: 0,
-      hb: Instant::now(),
-      ip: req
-        .connection_info()
-        .remote()
-        .unwrap_or("127.0.0.1:12345")
-        .split(":")
-        .next()
-        .unwrap_or("127.0.0.1")
-        .to_string(),
-    },
-    &req,
-    stream,
-    )
-}
-
-struct WSSession {
-  cs_addr: Addr<ChatServer>,
-  /// unique session id
-  id: usize,
-  ip: String,
-  /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
-  /// otherwise we drop connection.
-  hb: Instant,
-}
-
-impl Actor for WSSession {
-  type Context = ws::WebsocketContext<Self>;
-
-  /// Method is called on actor start.
-  /// We register ws session with ChatServer
-  fn started(&mut self, ctx: &mut Self::Context) {
-    // we'll start heartbeat process on session start.
-    self.hb(ctx);
-
-    // register self in chat server. `AsyncContext::wait` register
-    // future within context, but context waits until this future resolves
-    // before processing any other events.
-    // across all routes within application
-    let addr = ctx.address();
-    self.cs_addr
-      .send(Connect {
-        addr: addr.recipient(),
-        ip: self.ip.to_owned(),
-      })
-    .into_actor(self)
-      .then(|res, act, ctx| {
-        match res {
-          Ok(res) => act.id = res,
-          // something is wrong with chat server
-          _ => ctx.stop(),
-        }
-        fut::ok(())
-      })
-    .wait(ctx);
-  }
-
-  fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
-    // notify chat server
-    self.cs_addr.do_send(Disconnect {
-      id: self.id,
-      ip: self.ip.to_owned(),
-    });
-    Running::Stop
-  }
-}
-
-/// Handle messages from chat server, we simply send it to peer websocket
-/// These are room messages, IE sent to others in the room
-impl Handler<WSMessage> for WSSession {
-  type Result = ();
-
-  fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
-    // println!("id: {} msg: {}", self.id, msg.0);
-    ctx.text(msg.0);
-  }
-}
-
-/// WebSocket message handler
-impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
-  fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
-    // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
-    match msg {
-      ws::Message::Ping(msg) => {
-        self.hb = Instant::now();
-        ctx.pong(&msg);
-      }
-      ws::Message::Pong(_) => {
-        self.hb = Instant::now();
-      }
-      ws::Message::Text(text) => {
-        let m = text.trim().to_owned();
-        println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
-
-        self.cs_addr
-          .send(StandardMessage {
-            id: self.id,
-            msg: m,
-          })
-        .into_actor(self)
-          .then(|res, _, ctx| {
-            match res {
-              Ok(res) => ctx.text(res),
-              Err(e) => {
-                eprintln!("{}", &e);
-              }
-            }
-            fut::ok(())
-          })
-        .wait(ctx);
-      }
-      ws::Message::Binary(_bin) => println!("Unexpected binary"),
-      ws::Message::Close(_) => {
-        ctx.stop();
-      }
-      _ => {}
-    }
-  }
-}
-
-impl WSSession {
-  /// helper method that sends ping to client every second.
-  ///
-  /// also this method checks heartbeats from client
-  fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
-    ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
-      // check client heartbeats
-      if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
-        // heartbeat timed out
-        println!("Websocket Client heartbeat failed, disconnecting!");
-
-        // notify chat server
-        act.cs_addr.do_send(Disconnect {
-          id: act.id,
-          ip: act.ip.to_owned(),
-        });
-
-        // stop actor
-        ctx.stop();
-
-        // don't try to send a ping
-        return;
-      }
-
-      ctx.ping("");
-    });
-  }
-}
-
-fn main() {
-  let _ = env_logger::init();
-  let sys = actix::System::new("lemmy");
+  // Set up the r2d2 connection pool
+  let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
+  let pool = Pool::builder()
+    .max_size(settings.database.pool_size)
+    .build(manager)
+    .unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
 
   // Run the migrations from code
-  let conn = establish_connection();
+  let conn = pool.get().unwrap();
   embedded_migrations::run(&conn).unwrap();
+  run_advanced_migrations(&conn).unwrap();
+
+  // Set up websocket server
+  let server = ChatServer::startup(pool.clone()).start();
+
+  thread::spawn(move || {
+    // some work here
+    sleep(Duration::from_secs(5));
+    println!("Fetching apub data");
+    match fetch_all(&conn) {
+      Ok(_) => {}
+      Err(e) => warn!("Error during apub fetch: {}", e),
+    }
+  });
 
-  // Start chat server actor in separate thread
-  let server = ChatServer::default().start();
-  // Create Http server with websocket support
-  HttpServer::new(move || {
-    App::new()
-      .data(server.clone())
-      .service(web::resource("/api/v1/ws").to(chat_route))
-      //            .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
-      .service(web::resource("/").to(index))
-      // static resources
-      .service(actix_files::Files::new("/static", front_end_dir()))
-  })
-  .bind("0.0.0.0:8536")
-    .unwrap()
-    .start();
-
-  println!("Started http server: 0.0.0.0:8536");
-  let _ = sys.run();
-}
-
-fn index() -> Result<NamedFile, actix_web::error::Error> {
-  Ok(NamedFile::open(front_end_dir() + "/index.html")?)
-}
+  println!(
+    "Starting http server at {}:{}",
+    settings.bind, settings.port
+  );
 
-fn front_end_dir() -> String {
-  env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
+  // Create Http server with websocket support
+  Ok(
+    HttpServer::new(move || {
+      let settings = Settings::get();
+      App::new()
+        .wrap(middleware::Logger::default())
+        .data(pool.clone())
+        .data(server.clone())
+        // The routes
+        .configure(api::config)
+        .configure(federation::config)
+        .configure(feeds::config)
+        .configure(index::config)
+        .configure(nodeinfo::config)
+        .configure(webfinger::config)
+        .configure(websocket::config)
+        // static files
+        .service(actix_files::Files::new(
+          "/static",
+          settings.front_end_dir.to_owned(),
+        ))
+        .service(actix_files::Files::new(
+          "/docs",
+          settings.front_end_dir.to_owned() + "/documentation",
+        ))
+    })
+    .bind((settings.bind, settings.port))?
+    .run()
+    .await?,
+  )
 }