]> Untitled Git - lemmy.git/blob - server/src/apub/fetcher.rs
Merge branch 'dev' into federation
[lemmy.git] / server / src / apub / fetcher.rs
1 use crate::apub::*;
2 use crate::db::community::{Community, CommunityForm};
3 use crate::db::post::{Post, PostForm};
4 use crate::db::user::{UserForm, User_};
5 use crate::db::Crud;
6 use crate::routes::nodeinfo::{NodeInfo, NodeInfoWellKnown};
7 use crate::settings::Settings;
8 use activitystreams::collection::{OrderedCollection, UnorderedCollection};
9 use activitystreams::object::Page;
10 use activitystreams::BaseBox;
11 use diesel::result::Error::NotFound;
12 use diesel::PgConnection;
13 use failure::Error;
14 use isahc::prelude::*;
15 use log::warn;
16 use serde::Deserialize;
17 use std::time::Duration;
18 use url::Url;
19
20 fn fetch_node_info(instance: &Instance) -> Result<NodeInfo, Error> {
21   let well_known_uri = Url::parse(&format!(
22     "{}://{}/.well-known/nodeinfo",
23     get_apub_protocol_string(),
24     instance.domain
25   ))?;
26   let well_known = fetch_remote_object::<NodeInfoWellKnown>(&well_known_uri)?;
27   Ok(fetch_remote_object::<NodeInfo>(&well_known.links.href)?)
28 }
29
30 fn fetch_communities_from_instance(
31   community_list: &Url,
32   conn: &PgConnection,
33 ) -> Result<Vec<Community>, Error> {
34   fetch_remote_object::<UnorderedCollection>(community_list)?
35     .collection_props
36     .get_many_items_base_boxes()
37     .unwrap()
38     .map(|b| -> Result<CommunityForm, Error> {
39       let group = b.to_owned().to_concrete::<GroupExt>()?;
40       Ok(CommunityForm::from_group(&group, conn)?)
41     })
42     .map(
43       |cf: Result<CommunityForm, Error>| -> Result<Community, Error> {
44         let cf2 = cf?;
45         let existing = Community::read_from_actor_id(conn, &cf2.actor_id);
46         match existing {
47           Err(NotFound {}) => Ok(Community::create(conn, &cf2)?),
48           Ok(c) => Ok(Community::update(conn, c.id, &cf2)?),
49           Err(e) => Err(Error::from(e)),
50         }
51       },
52     )
53     .collect()
54 }
55
56 // TODO: add an optional param last_updated and only fetch if its too old
57 pub fn fetch_remote_object<Response>(url: &Url) -> Result<Response, Error>
58 where
59   Response: for<'de> Deserialize<'de>,
60 {
61   if Settings::get().federation.tls_enabled && url.scheme() != "https" {
62     return Err(format_err!("Activitypub uri is insecure: {}", url));
63   }
64   // TODO: this function should return a future
65   let timeout = Duration::from_secs(60);
66   let text = Request::get(url.as_str())
67     .header("Accept", APUB_JSON_CONTENT_TYPE)
68     .connect_timeout(timeout)
69     .timeout(timeout)
70     .body(())?
71     .send()?
72     .text()?;
73   let res: Response = serde_json::from_str(&text)?;
74   Ok(res)
75 }
76
77 fn fetch_remote_community_posts(
78   community: &Community,
79   conn: &PgConnection,
80 ) -> Result<Vec<Post>, Error> {
81   let outbox_url = Url::parse(&community.get_outbox_url())?;
82   let outbox = fetch_remote_object::<OrderedCollection>(&outbox_url)?;
83   let items = outbox.collection_props.get_many_items_base_boxes();
84
85   Ok(
86     items
87       .unwrap()
88       .map(|obox: &BaseBox| -> Result<PostForm, Error> {
89         let page = obox.clone().to_concrete::<Page>()?;
90         PostForm::from_page(&page, conn)
91       })
92       .map(|pf: Result<PostForm, Error>| -> Result<Post, Error> {
93         let pf2 = pf?;
94         let existing = Post::read_from_apub_id(conn, &pf2.ap_id);
95         match existing {
96           Err(NotFound {}) => Ok(Post::create(conn, &pf2)?),
97           Ok(p) => Ok(Post::update(conn, p.id, &pf2)?),
98           Err(e) => Err(Error::from(e)),
99         }
100       })
101       .collect::<Result<Vec<Post>, Error>>()?,
102   )
103 }
104
105 // TODO: can probably merge these two methods?
106 pub fn fetch_remote_user(apub_id: &Url, conn: &PgConnection) -> Result<User_, Error> {
107   let person = fetch_remote_object::<PersonExt>(apub_id)?;
108   let uf = UserForm::from_person(&person)?;
109   let existing = User_::read_from_apub_id(conn, &uf.actor_id);
110   Ok(match existing {
111     Err(NotFound {}) => User_::create(conn, &uf)?,
112     Ok(u) => User_::update(conn, u.id, &uf)?,
113     Err(e) => return Err(Error::from(e)),
114   })
115 }
116 pub fn fetch_remote_community(apub_id: &Url, conn: &PgConnection) -> Result<Community, Error> {
117   let group = fetch_remote_object::<GroupExt>(apub_id)?;
118   let cf = CommunityForm::from_group(&group, conn)?;
119   let existing = Community::read_from_actor_id(conn, &cf.actor_id);
120   Ok(match existing {
121     Err(NotFound {}) => Community::create(conn, &cf)?,
122     Ok(u) => Community::update(conn, u.id, &cf)?,
123     Err(e) => return Err(Error::from(e)),
124   })
125 }
126
127 // TODO: in the future, this should only be done when an instance is followed for the first time
128 //       after that, we should rely in the inbox, and fetch on demand when needed
129 pub fn fetch_all(conn: &PgConnection) -> Result<(), Error> {
130   for instance in &get_following_instances() {
131     let node_info = fetch_node_info(instance)?;
132     if let Some(community_list) = node_info.metadata.community_list_url {
133       let communities = fetch_communities_from_instance(&community_list, conn)?;
134       for c in communities {
135         fetch_remote_community_posts(&c, conn)?;
136       }
137     } else {
138       warn!(
139         "{} is not a Lemmy instance, federation is not supported",
140         instance.domain
141       );
142     }
143   }
144   Ok(())
145 }