]> Untitled Git - lemmy.git/commitdiff
Some refactoring of puller.rs
authorFelix Ableitner <me@nutomic.com>
Wed, 8 Apr 2020 12:08:33 +0000 (14:08 +0200)
committerFelix Ableitner <me@nutomic.com>
Wed, 8 Apr 2020 12:08:33 +0000 (14:08 +0200)
server/src/apub/puller.rs

index 01a70d9645e540e05c74a6804c7ec55cd967321e..7e7a86ec02f30333e02dcc16b26e643055258b4d 100644 (file)
@@ -12,6 +12,7 @@ use diesel::result::Error::NotFound;
 use diesel::PgConnection;
 use failure::Error;
 use isahc::prelude::*;
+use log::warn;
 use serde::Deserialize;
 use std::time::Duration;
 
@@ -26,30 +27,29 @@ fn fetch_node_info(domain: &str) -> Result<NodeInfo, Error> {
 }
 
 fn fetch_communities_from_instance(
-  domain: &str,
+  community_list_url: &str,
   conn: &PgConnection,
-) -> Result<Vec<CommunityForm>, Error> {
-  let node_info = fetch_node_info(domain)?;
-
-  if let Some(community_list_url) = node_info.metadata.community_list_url {
-    let collection = fetch_remote_object::<UnorderedCollection>(&community_list_url)?;
-    let object_boxes = collection
-      .collection_props
-      .get_many_items_base_boxes()
-      .unwrap();
-    let communities: Result<Vec<CommunityForm>, Error> = object_boxes
-      .map(|c| {
-        let group = c.to_owned().to_concrete::<GroupExt>()?;
-        CommunityForm::from_group(&group, conn)
-      })
-      .collect();
-    Ok(communities?)
-  } else {
-    Err(format_err!(
-      "{} is not a Lemmy instance, federation is not supported",
-      domain
-    ))
-  }
+) -> Result<Vec<Community>, Error> {
+  fetch_remote_object::<UnorderedCollection>(community_list_url)?
+    .collection_props
+    .get_many_items_base_boxes()
+    .unwrap()
+    .map(|b| -> Result<CommunityForm, Error> {
+      let group = b.to_owned().to_concrete::<GroupExt>()?;
+      Ok(CommunityForm::from_group(&group, conn)?)
+    })
+    .map(
+      |cf: Result<CommunityForm, Error>| -> Result<Community, Error> {
+        let cf2 = cf?;
+        let existing = Community::read_from_actor_id(conn, &cf2.actor_id);
+        match existing {
+          Err(NotFound {}) => Ok(Community::create(conn, &cf2)?),
+          Ok(c) => Ok(Community::update(conn, c.id, &cf2)?),
+          Err(e) => Err(Error::from(e)),
+        }
+      },
+    )
+    .collect()
 }
 
 // TODO: add an optional param last_updated and only fetch if its too old
@@ -60,7 +60,6 @@ where
   if Settings::get().federation.tls_enabled && !uri.starts_with("https://") {
     return Err(format_err!("Activitypub uri is insecure: {}", uri));
   }
-  // TODO: should cache responses here when we are in production
   // TODO: this function should return a future
   let timeout = Duration::from_secs(60);
   let text = Request::get(uri)
@@ -76,23 +75,35 @@ where
 
 fn fetch_remote_community_posts(
   instance: &str,
-  community: &str,
+  community: &Community,
   conn: &PgConnection,
-) -> Result<Vec<PostForm>, Error> {
-  let endpoint = format!("http://{}/federation/c/{}", instance, community);
-  let community = fetch_remote_object::<GroupExt>(&endpoint)?;
-  let outbox_uri = &community.extension.get_outbox().to_string();
+) -> Result<Vec<Post>, Error> {
+  let endpoint = format!("http://{}/federation/c/{}", instance, community.name);
+  let group = fetch_remote_object::<GroupExt>(&endpoint)?;
+  let outbox_uri = &group.extension.get_outbox().to_string();
+  // TODO: outbox url etc should be stored in local db
   let outbox = fetch_remote_object::<OrderedCollection>(outbox_uri)?;
   let items = outbox.collection_props.get_many_items_base_boxes();
 
-  let posts = items
-    .unwrap()
-    .map(|obox: &BaseBox| {
-      let page = obox.clone().to_concrete::<Page>().unwrap();
-      PostForm::from_page(&page, conn)
-    })
-    .collect::<Result<Vec<PostForm>, Error>>()?;
-  Ok(posts)
+  Ok(
+    items
+      .unwrap()
+      .map(|obox: &BaseBox| -> Result<PostForm, Error> {
+        let page = obox.clone().to_concrete::<Page>()?;
+        PostForm::from_page(&page, conn)
+      })
+      .map(|pf: Result<PostForm, Error>| -> Result<Post, Error> {
+        let mut pf2 = pf?;
+        pf2.community_id = community.id;
+        let existing = Post::read_from_apub_id(conn, &pf2.ap_id);
+        match existing {
+          Err(NotFound {}) => Ok(Post::create(conn, &pf2)?),
+          Ok(p) => Ok(Post::update(conn, p.id, &pf2)?),
+          Err(e) => Err(Error::from(e)),
+        }
+      })
+      .collect::<Result<Vec<Post>, Error>>()?,
+  )
 }
 
 pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result<User_, Error> {
@@ -110,25 +121,17 @@ pub fn fetch_remote_user(apub_id: &str, conn: &PgConnection) -> Result<User_, Er
 //       after that, we should rely in the inbox, and fetch on demand when needed
 pub fn fetch_all(conn: &PgConnection) -> Result<(), Error> {
   for instance in &get_following_instances() {
-    let communities = fetch_communities_from_instance(instance, conn)?;
-
-    for community in &communities {
-      let existing = Community::read_from_actor_id(conn, &community.actor_id);
-      let community_id = match existing {
-        Err(NotFound {}) => Community::create(conn, community)?.id,
-        Ok(c) => Community::update(conn, c.id, community)?.id,
-        Err(e) => return Err(Error::from(e)),
-      };
-      let mut posts = fetch_remote_community_posts(instance, &community.name, conn)?;
-      for post_ in &mut posts {
-        post_.community_id = community_id;
-        let existing = Post::read_from_apub_id(conn, &post_.ap_id);
-        match existing {
-          Err(NotFound {}) => Post::create(conn, post_)?,
-          Ok(p) => Post::update(conn, p.id, post_)?,
-          Err(e) => return Err(Error::from(e)),
-        };
+    let node_info = fetch_node_info(instance)?;
+    if let Some(community_list_url) = node_info.metadata.community_list_url {
+      let communities = fetch_communities_from_instance(&community_list_url, conn)?;
+      for c in communities {
+        fetch_remote_community_posts(instance, &c, conn)?;
       }
+    } else {
+      warn!(
+        "{} is not a Lemmy instance, federation is not supported",
+        instance
+      );
     }
   }
   Ok(())