extern crate diesel_migrations;
use actix::prelude::*;
-use actix_files::NamedFile;
use actix_web::*;
-use actix_web_actors::ws;
-use lemmy_server::db::establish_connection;
-use lemmy_server::nodeinfo;
-use lemmy_server::feeds;
+use diesel::r2d2::{ConnectionManager, Pool};
+use diesel::PgConnection;
+use failure::Error;
+use lemmy_server::apub::fetcher::fetch_all;
+use lemmy_server::db::code_migrations::run_advanced_migrations;
+use lemmy_server::routes::{api, federation, feeds, index, nodeinfo, webfinger, websocket};
+use lemmy_server::settings::Settings;
use lemmy_server::websocket::server::*;
-use std::env;
-use std::time::{Duration, Instant};
+use log::warn;
+use std::thread;
+use std::thread::sleep;
+use std::time::Duration;
embed_migrations!();
-/// How often heartbeat pings are sent
-const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
-/// How long before lack of client response causes a timeout
-const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
+#[actix_rt::main]
+async fn main() -> Result<(), Error> {
+ env_logger::init();
+ let settings = Settings::get();
-/// Entry point for our route
-fn chat_route(
- req: HttpRequest,
- stream: web::Payload,
- chat_server: web::Data<Addr<ChatServer>>,
-) -> Result<HttpResponse, Error> {
- ws::start(
- WSSession {
- cs_addr: chat_server.get_ref().to_owned(),
- id: 0,
- hb: Instant::now(),
- ip: req
- .connection_info()
- .remote()
- .unwrap_or("127.0.0.1:12345")
- .split(":")
- .next()
- .unwrap_or("127.0.0.1")
- .to_string(),
- },
- &req,
- stream,
- )
-}
-
-struct WSSession {
- cs_addr: Addr<ChatServer>,
- /// unique session id
- id: usize,
- ip: String,
- /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT),
- /// otherwise we drop connection.
- hb: Instant,
-}
-
-impl Actor for WSSession {
- type Context = ws::WebsocketContext<Self>;
-
- /// Method is called on actor start.
- /// We register ws session with ChatServer
- fn started(&mut self, ctx: &mut Self::Context) {
- // we'll start heartbeat process on session start.
- self.hb(ctx);
-
- // register self in chat server. `AsyncContext::wait` register
- // future within context, but context waits until this future resolves
- // before processing any other events.
- // across all routes within application
- let addr = ctx.address();
- self
- .cs_addr
- .send(Connect {
- addr: addr.recipient(),
- ip: self.ip.to_owned(),
- })
- .into_actor(self)
- .then(|res, act, ctx| {
- match res {
- Ok(res) => act.id = res,
- // something is wrong with chat server
- _ => ctx.stop(),
- }
- fut::ok(())
- })
- .wait(ctx);
- }
-
- fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
- // notify chat server
- self.cs_addr.do_send(Disconnect {
- id: self.id,
- ip: self.ip.to_owned(),
- });
- Running::Stop
- }
-}
-
-/// Handle messages from chat server, we simply send it to peer websocket
-/// These are room messages, IE sent to others in the room
-impl Handler<WSMessage> for WSSession {
- type Result = ();
-
- fn handle(&mut self, msg: WSMessage, ctx: &mut Self::Context) {
- // println!("id: {} msg: {}", self.id, msg.0);
- ctx.text(msg.0);
- }
-}
-
-/// WebSocket message handler
-impl StreamHandler<ws::Message, ws::ProtocolError> for WSSession {
- fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
- // println!("WEBSOCKET MESSAGE: {:?} from id: {}", msg, self.id);
- match msg {
- ws::Message::Ping(msg) => {
- self.hb = Instant::now();
- ctx.pong(&msg);
- }
- ws::Message::Pong(_) => {
- self.hb = Instant::now();
- }
- ws::Message::Text(text) => {
- let m = text.trim().to_owned();
- println!("WEBSOCKET MESSAGE: {:?} from id: {}", &m, self.id);
-
- self
- .cs_addr
- .send(StandardMessage {
- id: self.id,
- msg: m,
- })
- .into_actor(self)
- .then(|res, _, ctx| {
- match res {
- Ok(res) => ctx.text(res),
- Err(e) => {
- eprintln!("{}", &e);
- }
- }
- fut::ok(())
- })
- .wait(ctx);
- }
- ws::Message::Binary(_bin) => println!("Unexpected binary"),
- ws::Message::Close(_) => {
- ctx.stop();
- }
- _ => {}
- }
- }
-}
-
-impl WSSession {
- /// helper method that sends ping to client every second.
- ///
- /// also this method checks heartbeats from client
- fn hb(&self, ctx: &mut ws::WebsocketContext<Self>) {
- ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
- // check client heartbeats
- if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT {
- // heartbeat timed out
- println!("Websocket Client heartbeat failed, disconnecting!");
-
- // notify chat server
- act.cs_addr.do_send(Disconnect {
- id: act.id,
- ip: act.ip.to_owned(),
- });
-
- // stop actor
- ctx.stop();
-
- // don't try to send a ping
- return;
- }
-
- ctx.ping("");
- });
- }
-}
-
-fn main() {
- let _ = env_logger::init();
- let sys = actix::System::new("lemmy");
+ // Set up the r2d2 connection pool
+ let manager = ConnectionManager::<PgConnection>::new(&settings.get_database_url());
+ let pool = Pool::builder()
+ .max_size(settings.database.pool_size)
+ .build(manager)
+ .unwrap_or_else(|_| panic!("Error connecting to {}", settings.get_database_url()));
// Run the migrations from code
- let conn = establish_connection();
+ let conn = pool.get().unwrap();
embedded_migrations::run(&conn).unwrap();
+ run_advanced_migrations(&conn).unwrap();
+
+ // Set up websocket server
+ let server = ChatServer::startup(pool.clone()).start();
+
+ thread::spawn(move || {
+ // some work here
+ sleep(Duration::from_secs(5));
+ println!("Fetching apub data");
+ match fetch_all(&conn) {
+ Ok(_) => {}
+ Err(e) => warn!("Error during apub fetch: {}", e),
+ }
+ });
- // Start chat server actor in separate thread
- let server = ChatServer::default().start();
- // Create Http server with websocket support
-
- HttpServer::new(move || {
- App::new()
- .data(server.clone())
- .service(web::resource("/api/v1/ws").to(chat_route))
- // .service(web::resource("/api/v1/rest").route(web::post().to(||{})))
- .service(web::resource("/").to(index))
- .route("/nodeinfo/2.0.json", web::get().to(nodeinfo::node_info))
- .route(
- "/.well-known/nodeinfo",
- web::get().to(nodeinfo::node_info_well_known),
- )
- .route("/feeds/{type}/{name}.xml", web::get().to(feeds::get_feed))
- // TODO: probably need a different function for this (or just handle all of /feeds?
- // TODO: would be nice to use ListingType, but that doesnt include user
- .route("/feeds/all.xml", web::get().to(feeds::get_all_feed))
- // static resources
- .service(actix_files::Files::new("/static", front_end_dir()))
- })
- .bind("0.0.0.0:8536")
- .unwrap()
- .start();
-
- println!("Started http server: 0.0.0.0:8536");
- let _ = sys.run();
-}
-
-fn index() -> Result<NamedFile, actix_web::error::Error> {
- Ok(NamedFile::open(front_end_dir() + "/index.html")?)
-}
+ println!(
+ "Starting http server at {}:{}",
+ settings.bind, settings.port
+ );
-fn front_end_dir() -> String {
- env::var("LEMMY_FRONT_END_DIR").unwrap_or("../ui/dist".to_string())
+ // Create Http server with websocket support
+ Ok(
+ HttpServer::new(move || {
+ let settings = Settings::get();
+ App::new()
+ .wrap(middleware::Logger::default())
+ .data(pool.clone())
+ .data(server.clone())
+ // The routes
+ .configure(api::config)
+ .configure(federation::config)
+ .configure(feeds::config)
+ .configure(index::config)
+ .configure(nodeinfo::config)
+ .configure(webfinger::config)
+ .configure(websocket::config)
+ // static files
+ .service(actix_files::Files::new(
+ "/static",
+ settings.front_end_dir.to_owned(),
+ ))
+ .service(actix_files::Files::new(
+ "/docs",
+ settings.front_end_dir.to_owned() + "/documentation",
+ ))
+ })
+ .bind((settings.bind, settings.port))?
+ .run()
+ .await?,
+ )
}