use crate::discovery::{announce_replica, INITIAL_PUBLISH_DELAY, REPUBLISH_DELAY};
use crate::discovery::{
PeerContentRequest, PeerContentResponse, PeerTicketResponse, DISCOVERY_PORT,
};
use crate::{discovery::ContentRequest, error::OkuFsError};
use bytes::Bytes;
use futures::{pin_mut, StreamExt};
use iroh::client::Entry;
use iroh::rpc_protocol::BlobDownloadRequest;
use iroh::ticket::BlobTicket;
use iroh::{
bytes::Hash,
net::discovery::{ConcurrentDiscovery, Discovery},
node::FsNode,
rpc_protocol::ShareMode,
sync::{Author, AuthorId, NamespaceId},
};
use iroh_mainline_content_discovery::protocol::{Query, QueryFlags};
use iroh_mainline_content_discovery::to_infohash;
use iroh_pkarr_node_discovery::PkarrNodeDiscovery;
use path_clean::PathClean;
use rand_core::OsRng;
use serde::{Deserialize, Serialize};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::{error::Error, path::PathBuf};
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::net::TcpStream;
pub const FS_PATH: &str = ".oku";
pub const ALPN_DOCUMENT_TICKET_FETCH: &[u8] = b"oku/document-ticket/fetch/v0";
pub const ALPN_INITIAL_RELAY_CONNECTION: &[u8] = b"oku/relay/connect/v0";
pub const ALPN_RELAY_FETCH: &[u8] = b"oku/relay/fetch/v0";
/// Anchors `path` at the filesystem root and resolves `.`/`..` components,
/// so equivalent relative and absolute paths normalise to the same value.
fn normalise_path(path: PathBuf) -> PathBuf {
    let rooted = PathBuf::from("/").join(path);
    rooted.clean()
}
/// Converts a file path into the byte key used for replica entries.
///
/// The path is normalised first so equivalent paths map to the same key,
/// then encoded and terminated with a NUL byte.
pub fn path_to_entry_key(path: PathBuf) -> Bytes {
    // `path` is owned, so it can be consumed directly — the previous
    // `path.clone()` was a redundant allocation.
    let path = normalise_path(path);
    let mut path_bytes = path.into_os_string().into_encoded_bytes();
    // Trailing NUL terminates the key; presumably this keeps "/a" from
    // acting as a prefix of "/ab" in prefix queries — TODO confirm.
    path_bytes.push(b'\0');
    path_bytes.into()
}
/// Configuration for an Oku file system node, persisted on disk as TOML
/// (see `load_or_create_config`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct OkuFsConfig {
    // Optional relay server address; parsed as a `SocketAddr`
    // ("host:port") when connecting in `connect_to_relay`.
    pub relay_address: Option<String>,
}
/// An instance of an Oku file system backed by a persistent iroh node.
///
/// Cloning is cheap in the sense used throughout this file: clones are
/// handed to spawned tasks that share the same underlying node.
#[derive(Clone, Debug)]
pub struct OkuFs {
    // The underlying iroh node (docs, blobs, networking).
    node: FsNode,
    // Author identity used for all entries written by this node.
    author_id: AuthorId,
    // Configuration loaded from disk at startup.
    config: OkuFsConfig,
}
impl OkuFs {
    /// Starts an Oku file system: spawns a persistent iroh node under
    /// `FS_PATH/node`, selects (or creates) a default author, loads the
    /// on-disk configuration, publishes the node address for discovery, and
    /// spawns background tasks for the relay connection, the ticket-fetch
    /// listener, and periodic replica announcements.
    pub async fn start() -> Result<OkuFs, Box<dyn Error + Send + Sync>> {
        let node_path = PathBuf::from(FS_PATH).join("node");
        let node = FsNode::persistent(node_path).await?.spawn().await?;
        let authors = node.authors.list().await?;
        futures::pin_mut!(authors);
        // Counting consumes the stream, so the author list is fetched a
        // second time below when an existing author must be read out.
        let authors_count = authors.as_mut().count().await.to_owned();
        let author_id = if authors_count == 0 {
            node.authors.create().await?
        } else {
            let authors = node.authors.list().await?;
            futures::pin_mut!(authors);
            let authors_list: Vec<AuthorId> = authors.map(|author| author.unwrap()).collect().await;
            // Use the first listed author as this node's default identity.
            authors_list[0]
        };
        let config = load_or_create_config()?;
        let oku_fs = OkuFs {
            node,
            author_id,
            config,
        };
        let oku_fs_clone = oku_fs.clone();
        // Publish this node's address via pkarr so peers can discover it.
        // NOTE(review): this duplicates `create_discovery_service`, and the
        // service is dropped when `start` returns — confirm a one-shot
        // publish is the intended behavior here.
        let node_addr = oku_fs.node.my_addr().await?;
        let addr_info = node_addr.info;
        let magic_endpoint = oku_fs.node.magic_endpoint();
        let secret_key = magic_endpoint.secret_key();
        let mut discovery_service = ConcurrentDiscovery::new();
        let pkarr = PkarrNodeDiscovery::builder().secret_key(secret_key).build();
        discovery_service.add(pkarr);
        discovery_service.publish(&addr_info);
        let docs_client = &oku_fs.node.docs;
        let docs_client = docs_client.clone();
        // If a relay is configured, keep a connection to it on a background
        // task. NOTE(review): the task unwraps errors, so a failed relay
        // connection panics that task — confirm this is acceptable.
        if let Some(relay_address) = oku_fs_clone.config.relay_address {
            let oku_fs_clone = oku_fs.clone();
            tokio::spawn(async move {
                oku_fs_clone
                    .connect_to_relay(relay_address.to_string())
                    .await
                    .unwrap();
            });
        }
        let oku_fs_clone = oku_fs.clone();
        // Serve incoming document-ticket fetch requests from peers.
        tokio::spawn(async move {
            oku_fs_clone
                .listen_for_document_ticket_fetch_requests()
                .await
                .unwrap()
        });
        // Periodically announce every local replica: wait the initial delay,
        // announce all replicas, then sleep the remainder of the republish
        // interval before repeating.
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(INITIAL_PUBLISH_DELAY).await;
                let replicas = docs_client.list().await.unwrap();
                pin_mut!(replicas);
                while let Some(replica) = replicas.next().await {
                    let (namespace_id, _) = replica.unwrap();
                    announce_replica(namespace_id).await.unwrap();
                }
                tokio::time::sleep(REPUBLISH_DELAY - INITIAL_PUBLISH_DELAY).await;
            }
        });
        Ok(oku_fs)
    }
    /// Creates a pkarr-backed concurrent discovery service and publishes
    /// this node's current address information to it.
    pub async fn create_discovery_service(
        &self,
    ) -> Result<ConcurrentDiscovery, Box<dyn Error + Send + Sync>> {
        let node_addr = self.node.my_addr().await?;
        let addr_info = node_addr.info;
        let magic_endpoint = self.node.magic_endpoint();
        let secret_key = magic_endpoint.secret_key();
        let mut discovery_service = ConcurrentDiscovery::new();
        let pkarr = PkarrNodeDiscovery::builder().secret_key(secret_key).build();
        discovery_service.add(pkarr);
        discovery_service.publish(&addr_info);
        Ok(discovery_service)
    }
    /// Shuts down the underlying iroh node, consuming this handle.
    pub fn shutdown(self) {
        self.node.shutdown();
    }
    /// Creates a new replica (document) and returns its namespace ID.
    /// The document handle is closed immediately; only the ID is kept.
    pub async fn create_replica(&self) -> Result<NamespaceId, Box<dyn Error + Send + Sync>> {
        let docs_client = &self.node.docs;
        let new_document = docs_client.create().await?;
        let document_id = new_document.id();
        new_document.close().await?;
        Ok(document_id)
    }
    /// Deletes the replica with the given namespace ID from the local node.
    pub async fn delete_replica(
        &self,
        namespace_id: NamespaceId,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let docs_client = &self.node.docs;
        Ok(docs_client.drop_doc(namespace_id).await?)
    }
    /// Lists the namespace IDs of all replicas held by this node.
    pub async fn list_replicas(&self) -> Result<Vec<NamespaceId>, Box<dyn Error + Send + Sync>> {
        let docs_client = &self.node.docs;
        let replicas = docs_client.list().await?;
        pin_mut!(replicas);
        let replica_ids: Vec<NamespaceId> =
            replicas.map(|replica| replica.unwrap().0).collect().await;
        Ok(replica_ids)
    }
    /// Lists the latest entry for every key in the given replica.
    ///
    /// Errors with `OkuFsError::FsEntryNotFound` if the replica is not held
    /// locally.
    pub async fn list_files(
        &self,
        namespace_id: NamespaceId,
    ) -> Result<Vec<Entry>, Box<dyn Error + Send + Sync>> {
        let docs_client = &self.node.docs;
        let document = docs_client
            .open(namespace_id)
            .await?
            .ok_or(OkuFsError::FsEntryNotFound)?;
        let query = iroh::sync::store::Query::single_latest_per_key().build();
        let entries = document.get_many(query).await?;
        pin_mut!(entries);
        let files: Vec<Entry> = entries.map(|entry| entry.unwrap()).collect().await;
        Ok(files)
    }
    /// Creates or overwrites the file at `path` in the given replica with
    /// `data`, written under this node's author. Returns the content hash
    /// of the new entry.
    pub async fn create_or_modify_file(
        &self,
        namespace_id: NamespaceId,
        path: PathBuf,
        data: impl Into<Bytes>,
    ) -> Result<Hash, Box<dyn Error + Send + Sync>> {
        let file_key = path_to_entry_key(path);
        let data_bytes = data.into();
        let docs_client = &self.node.docs;
        let document = docs_client
            .open(namespace_id)
            .await?
            .ok_or(OkuFsError::FsEntryNotFound)?;
        let entry_hash = document
            .set_bytes(self.author_id, file_key, data_bytes)
            .await?;
        Ok(entry_hash)
    }
    /// Deletes the file at `path` in the given replica, returning the
    /// number of entries removed.
    pub async fn delete_file(
        &self,
        namespace_id: NamespaceId,
        path: PathBuf,
    ) -> Result<usize, Box<dyn Error + Send + Sync>> {
        let file_key = path_to_entry_key(path);
        let docs_client = &self.node.docs;
        let document = docs_client
            .open(namespace_id)
            .await?
            .ok_or(OkuFsError::FsEntryNotFound)?;
        let entries_deleted = document.del(self.author_id, file_key).await?;
        Ok(entries_deleted)
    }
    /// Reads the content of the file at `path` in the given replica,
    /// looking up the entry written by this node's author.
    pub async fn read_file(
        &self,
        namespace_id: NamespaceId,
        path: PathBuf,
    ) -> Result<Bytes, Box<dyn Error + Send + Sync>> {
        let file_key = path_to_entry_key(path);
        let docs_client = &self.node.docs;
        let document = docs_client
            .open(namespace_id)
            .await?
            .ok_or(OkuFsError::FsEntryNotFound)?;
        let entry = document
            .get_exact(self.author_id, file_key, false)
            .await?
            .ok_or(OkuFsError::FsEntryNotFound)?;
        Ok(entry.content_bytes(self.node.client()).await?)
    }
    /// Moves a file within a replica by copying its content to `to` and
    /// then deleting `from`. Returns the new entry's hash and the number of
    /// entries deleted at the old path.
    ///
    /// NOTE(review): this is not atomic — a failure between the write and
    /// the delete leaves the content at both paths.
    pub async fn move_file(
        &self,
        namespace_id: NamespaceId,
        from: PathBuf,
        to: PathBuf,
    ) -> Result<(Hash, usize), Box<dyn Error + Send + Sync>> {
        let data = self.read_file(namespace_id, from.clone()).await?;
        let hash = self
            .create_or_modify_file(namespace_id, to.clone(), data)
            .await?;
        let entries_deleted = self.delete_file(namespace_id, from).await?;
        Ok((hash, entries_deleted))
    }
    /// Deletes the directory at `path` in the given replica, returning the
    /// number of entries removed.
    pub async fn delete_directory(
        &self,
        namespace_id: NamespaceId,
        path: PathBuf,
    ) -> Result<usize, Box<dyn Error + Send + Sync>> {
        // `join("")` appends a trailing separator, so the key below ends in
        // "/" — presumably `del` treats it as a directory prefix; confirm
        // against the iroh docs API.
        let path = normalise_path(path).join("");
        let docs_client = &self.node.docs;
        let document = docs_client
            .open(namespace_id)
            .await?
            .ok_or(OkuFsError::FsEntryNotFound)?;
        let entries_deleted = document
            .del(self.author_id, format!("{}", path.display()))
            .await?;
        Ok(entries_deleted)
    }
    /// Answers a peer's content request with either a read-capable document
    /// ticket (no path requested) or per-entry blob tickets (path
    /// requested), along with the total content size in bytes.
    pub async fn respond_to_content_request(
        &self,
        request: PeerContentRequest,
    ) -> Result<PeerContentResponse, Box<dyn Error + Send + Sync>> {
        let docs_client = &self.node.docs;
        let document = docs_client
            .open(request.namespace_id)
            .await?
            .ok_or(OkuFsError::FsEntryNotFound)?;
        match request.path {
            None => {
                // Whole-document request: share a read-only ticket and sum
                // the sizes of the latest entry per key.
                let document_ticket = document.share(ShareMode::Read).await?;
                let query = iroh::sync::store::Query::single_latest_per_key().build();
                let entries = document.get_many(query).await?;
                pin_mut!(entries);
                let file_sizes: Vec<u64> = entries
                    .map(|entry| entry.unwrap().content_len())
                    .collect()
                    .await;
                let content_length = file_sizes.iter().sum();
                Ok(PeerContentResponse {
                    ticket_response: PeerTicketResponse::Document(document_ticket),
                    content_size: content_length,
                })
            }
            Some(blob_path) => {
                // Path-specific request: issue one blob ticket per entry
                // whose key starts with the path's entry-key prefix.
                let blobs_client = &self.node.blobs;
                let entry_prefix = path_to_entry_key(blob_path);
                let query = iroh::sync::store::Query::single_latest_per_key()
                    .key_prefix(entry_prefix)
                    .build();
                let entries = document.get_many(query).await?;
                pin_mut!(entries);
                let entry_hashes_and_sizes: Vec<(Hash, u64)> = entries
                    .map(|entry| {
                        (
                            entry.as_ref().unwrap().content_hash(),
                            entry.unwrap().content_len(),
                        )
                    })
                    .collect()
                    .await;
                let entry_tickets: Vec<BlobTicket> =
                    futures::future::try_join_all(entry_hashes_and_sizes.iter().map(|entry| {
                        blobs_client.share(
                            entry.0,
                            iroh::bytes::BlobFormat::Raw,
                            iroh::client::ShareTicketOptions::RelayAndAddresses,
                        )
                    }))
                    .await?;
                let content_length = entry_hashes_and_sizes
                    .iter()
                    .map(|entry| entry.1)
                    .collect::<Vec<u64>>()
                    .iter()
                    .sum();
                Ok(PeerContentResponse {
                    ticket_response: PeerTicketResponse::Entries(entry_tickets),
                    content_size: content_length,
                })
            }
        }
    }
    /// Listens on `DISCOVERY_PORT` (all IPv4 interfaces) and serves
    /// document-ticket fetch requests: a request is the
    /// `ALPN_DOCUMENT_TICKET_FETCH` header line followed by a JSON-encoded
    /// `PeerContentRequest`; the reply is a JSON `PeerContentResponse`.
    pub async fn listen_for_document_ticket_fetch_requests(
        &self,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let socket = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DISCOVERY_PORT);
        let listener = TcpListener::bind(socket).await?;
        loop {
            let (mut stream, _) = listener.accept().await?;
            let self_clone = self.clone();
            // Each connection is handled on its own task; errors end the
            // task via its `Result` and are not propagated to the listener.
            tokio::spawn(async move {
                let mut buf_reader = BufReader::new(&mut stream);
                // NOTE(review): `fill_buf` yields only what is currently
                // buffered — a request split across TCP segments would be
                // truncated; confirm peers send requests in a single write.
                let received: Vec<u8> = buf_reader.fill_buf().await?.to_vec();
                buf_reader.consume(received.len());
                // Split on b'\n' (byte 10): first line is the protocol
                // identifier, the remainder is the JSON payload.
                let mut incoming_lines = received.split(|x| *x == 10);
                if let Some(first_line) = incoming_lines.next() {
                    if first_line == ALPN_DOCUMENT_TICKET_FETCH {
                        let remaining_lines: Vec<Vec<u8>> =
                            incoming_lines.map(|x| x.to_owned()).collect();
                        let peer_content_request_bytes = remaining_lines.concat();
                        let peer_content_request_str =
                            String::from_utf8_lossy(&peer_content_request_bytes).to_string();
                        let peer_content_request = serde_json::from_str(&peer_content_request_str)?;
                        let peer_content_response = self_clone
                            .respond_to_content_request(peer_content_request)
                            .await?;
                        let peer_content_response_string =
                            serde_json::to_string(&peer_content_response)?;
                        stream
                            .write_all(peer_content_response_string.as_bytes())
                            .await?;
                        stream.flush().await?;
                    }
                }
                Ok::<(), Box<dyn Error + Send + Sync>>(())
            });
        }
    }
    /// Fetches a replica from peers discovered via the mainline DHT.
    ///
    /// `path` of `None` requests the whole document; `Some` requests the
    /// entries under that path. `partial` permits incomplete copies and
    /// `verified` restricts to verified content (forwarded as query flags).
    pub async fn get_external_replica(
        &self,
        namespace_id: NamespaceId,
        path: Option<PathBuf>,
        partial: bool,
        verified: bool,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let content = ContentRequest::Hash(Hash::new(namespace_id));
        let dht = mainline::Dht::default();
        let q = Query {
            content: content.hash_and_format(),
            flags: QueryFlags {
                complete: !partial,
                verified,
            },
        };
        let info_hash = to_infohash(q.content);
        let peer_content_request = PeerContentRequest { namespace_id, path };
        let peer_content_request_string = serde_json::to_string(&peer_content_request)?;
        let docs_client = &self.node.docs;
        let mut addrs = dht.get_peers(info_hash);
        for peer_response in &mut addrs {
            // Stop contacting peers once the replica is available locally.
            // NOTE(review): `open` likely returns `Ok(None)` when the
            // replica is absent, making `is_ok()` true before any fetch
            // succeeds — confirm this early-exit condition.
            if docs_client.open(namespace_id).await.is_ok() {
                break;
            }
            let peer_content_request_string = peer_content_request_string.clone();
            let docs_client = docs_client.clone();
            let self_clone = self.clone();
            tokio::spawn(async move {
                // Build the ALPN header plus JSON request, then send it in
                // a single write.
                let mut stream = TcpStream::connect(peer_response.peer).await?;
                let mut request = Vec::new();
                request.write_all(ALPN_DOCUMENT_TICKET_FETCH).await?;
                request.write_all(b"\n").await?;
                request
                    .write_all(peer_content_request_string.as_bytes())
                    .await?;
                request.flush().await?;
                stream.write_all(&request).await?;
                stream.flush().await?;
                let mut response_bytes = Vec::new();
                stream.read_to_end(&mut response_bytes).await?;
                let response: PeerContentResponse =
                    serde_json::from_str(String::from_utf8_lossy(&response_bytes).as_ref())?;
                match response.ticket_response {
                    PeerTicketResponse::Document(document_ticket) => {
                        // Ignore tickets that are for a different namespace.
                        if document_ticket.capability.id() != namespace_id {
                            return Ok::<(), Box<dyn Error + Send + Sync>>(());
                        }
                        docs_client.import(document_ticket).await?;
                        Ok::<(), Box<dyn Error + Send + Sync>>(())
                    }
                    PeerTicketResponse::Entries(entry_tickets) => {
                        let blobs_client = &self_clone.node.blobs;
                        // NOTE(review): only the first ticket is downloaded
                        // before `break` — confirm a single blob (rather
                        // than every entry) is the intended fetch here.
                        for blob_ticket in entry_tickets {
                            let ticket_parts = blob_ticket.into_parts();
                            let blob_download_request = BlobDownloadRequest {
                                hash: ticket_parts.1,
                                format: ticket_parts.2,
                                peer: ticket_parts.0,
                                tag: iroh::rpc_protocol::SetTagOption::Auto,
                            };
                            blobs_client.download(blob_download_request).await?;
                            break;
                        }
                        Ok::<(), Box<dyn Error + Send + Sync>>(())
                    }
                }
            });
        }
        Ok(())
    }
    /// Connects to a relay server, announces the local replica list, and
    /// re-sends that list whenever the relay requests it with
    /// `ALPN_RELAY_FETCH`. Runs until the connection errors.
    pub async fn connect_to_relay(
        &self,
        relay_address: String,
    ) -> Result<(), Box<dyn Error + Send + Sync>> {
        let relay_addr = relay_address.parse::<SocketAddr>()?;
        let mut stream = TcpStream::connect(relay_addr).await?;
        let all_replicas = self.list_replicas().await?;
        let all_replicas_str = serde_json::to_string(&all_replicas)?;
        let mut request = Vec::new();
        request.write_all(ALPN_INITIAL_RELAY_CONNECTION).await?;
        request.write_all(b"\n").await?;
        request.write_all(all_replicas_str.as_bytes()).await?;
        request.flush().await?;
        stream.write_all(&request).await?;
        stream.flush().await?;
        loop {
            // NOTE(review): `read_to_end` only completes at EOF; after the
            // relay closes its write half, subsequent iterations read zero
            // bytes and this loop spins — confirm the intended framing.
            let mut response_bytes = Vec::new();
            stream.read_to_end(&mut response_bytes).await?;
            if response_bytes == ALPN_RELAY_FETCH {
                let all_replicas = self.list_replicas().await?;
                let all_replicas_str = serde_json::to_string(&all_replicas)?;
                stream.write_all(all_replicas_str.as_bytes()).await?;
                stream.flush().await?;
            }
        }
        // Unreachable: the loop above never breaks.
        Ok(())
    }
}
/// Loads this node's author key from `FS_PATH/author`, creating and
/// persisting a new random author if the file does not exist or is
/// unreadable.
///
/// # Errors
/// Returns an error if the existing author file is shorter than 32 bytes,
/// or if writing a newly created author to disk fails.
pub fn load_or_create_author() -> Result<Author, Box<dyn Error + Send + Sync>> {
    let path = PathBuf::from(FS_PATH).join("author");
    let author_file = std::fs::read(path.clone());
    match author_file {
        Ok(bytes) => {
            // `bytes[..32]` would panic on a truncated file; use the
            // checked `get` so a corrupt file surfaces as an error instead.
            let key_bytes: [u8; 32] = bytes
                .get(..32)
                .ok_or("author file is shorter than 32 bytes")?
                .try_into()?;
            Ok(Author::from_bytes(&key_bytes))
        }
        Err(_) => {
            let mut rng = OsRng;
            let author = Author::new(&mut rng);
            let author_bytes = author.to_bytes();
            std::fs::write(path, author_bytes)?;
            Ok(author)
        }
    }
}
/// Loads the node configuration from `FS_PATH/config` (TOML), or creates,
/// persists, and returns a default configuration when the file cannot be
/// read.
///
/// # Errors
/// Returns an error if an existing file contains invalid TOML, or if
/// serialising/writing the default configuration fails.
pub fn load_or_create_config() -> Result<OkuFsConfig, Box<dyn Error + Send + Sync>> {
    let path = PathBuf::from(FS_PATH).join("config");
    match std::fs::read_to_string(&path) {
        Ok(contents) => Ok(toml::from_str(&contents)?),
        Err(_) => {
            // No readable config: fall back to defaults and persist them.
            let config = OkuFsConfig {
                relay_address: None,
            };
            std::fs::write(&path, toml::to_string(&config)?)?;
            Ok(config)
        }
    }
}