1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::error::OkuDiscoveryError;
use futures::StreamExt;
use iroh::{
    bytes::{Hash, HashAndFormat},
    sync::NamespaceId,
    ticket::{BlobTicket, DocTicket},
};
use iroh_mainline_content_discovery::announce_dht;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{collections::BTreeSet, error::Error, str::FromStr, time::Duration};

/// The delay between republishing content to the mainline DHT (60 × 60 seconds, i.e. one hour).
pub const REPUBLISH_DELAY: Duration = Duration::from_secs(60 * 60);

/// The initial delay before publishing content to the mainline DHT (500 milliseconds).
pub const INITIAL_PUBLISH_DELAY: Duration = Duration::from_millis(500);

/// The port used for communication with other Oku filesystem nodes.
///
/// [`announce_replica`] passes this value to `announce_dht` when announcing content.
pub const DISCOVERY_PORT: u16 = 4938;

/// The number of parallel announcements to make to the mainline DHT.
///
/// [`announce_replica`] passes this value to `announce_dht` as its parallelism argument.
pub const ANNOUNCE_PARALLELISM: usize = 10;

/// Announces a local replica to the mainline DHT.
///
/// The replica is advertised as a single raw-format content item whose hash is
/// built from the namespace ID. The announcement is made through a default
/// `mainline::Dht` client using [`DISCOVERY_PORT`] and [`ANNOUNCE_PARALLELISM`].
///
/// # Arguments
///
/// * `namespace_id` - The ID of the replica to announce.
///
/// # Returns
///
/// `Ok(())` once the announcement stream has been fully consumed. A failed
/// announcement is reported on standard error and does not abort the function.
pub async fn announce_replica(
    namespace_id: NamespaceId,
) -> Result<(), Box<dyn Error + Send + Sync>> {
    // The replica is identified on the DHT by a hash derived from its namespace ID.
    let replica_content = HashAndFormat::raw(Hash::new(namespace_id));
    let announcements = announce_dht(
        mainline::Dht::default(),
        BTreeSet::from([replica_content]),
        DISCOVERY_PORT,
        ANNOUNCE_PARALLELISM,
    );
    tokio::pin!(announcements);
    // Drain the stream; each item pairs the announced content with its outcome.
    while let Some((announced, outcome)) = announcements.next().await {
        if let Err(e) = outcome {
            // Best effort: log the failure and keep processing the remaining items.
            eprintln!(
                "{}",
                OkuDiscoveryError::ProblemAnnouncingContent(announced.to_string(), e.to_string())
            );
        }
    }
    Ok(())
}

/*
The `ContentRequest` enum is derived from the `ContentArg` enum in the `iroh-examples` repository (https://github.com/n0-computer/iroh-examples/blob/6f184933efa72eec1d8cf2e8d07905650c0fdb46/content-discovery/iroh-mainline-content-discovery-cli/src/args.rs#L23).
*/
#[derive(Debug, Clone, derive_more::From)]
/// A request for content, which can be a raw hash, a hash and format pair, or a blob ticket.
///
/// The `derive_more::From` derive supplies the conversions from each payload type
/// that the `FromStr` implementation uses through `.into()`.
pub enum ContentRequest {
    /// A raw hash.
    Hash(Hash),
    /// A hash and format pair.
    HashAndFormat(HashAndFormat),
    /// A blob ticket.
    Ticket(BlobTicket),
}

impl ContentRequest {
    /// Returns the hash and format pair described by this request.
    ///
    /// A bare hash is wrapped with `HashAndFormat::raw`, an existing pair is
    /// copied as is, and a ticket contributes its own hash and format.
    pub fn hash_and_format(&self) -> HashAndFormat {
        match self {
            Self::Hash(raw_hash) => HashAndFormat::raw(*raw_hash),
            Self::HashAndFormat(pair) => *pair,
            Self::Ticket(blob_ticket) => {
                let hash = blob_ticket.hash();
                let format = blob_ticket.format();
                HashAndFormat { hash, format }
            }
        }
    }

    /// Returns the hash described by this request, whichever variant holds it.
    pub fn hash(&self) -> Hash {
        match self {
            Self::Hash(raw_hash) => *raw_hash,
            Self::HashAndFormat(pair) => pair.hash,
            Self::Ticket(blob_ticket) => blob_ticket.hash(),
        }
    }
}

impl FromStr for ContentRequest {
    type Err = Box<dyn Error + Send + Sync>;
    fn from_str(s: &str) -> Result<Self, Box<dyn Error + Send + Sync>> {
        if let Ok(hash) = Hash::from_str(s) {
            Ok(hash.into())
        } else if let Ok(haf) = HashAndFormat::from_str(s) {
            Ok(haf.into())
        } else if let Ok(ticket) = BlobTicket::from_str(s) {
            Ok(ticket.into())
        } else {
            Err(OkuDiscoveryError::InvalidHashAndFormat.into())
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// A content ticket sent in response to a peer requesting content.
///
/// NOTE(review): this type is serialized with serde; depending on the wire format in
/// use, variant order may be part of the encoding — confirm before reordering variants.
pub enum PeerTicketResponse {
    /// A ticket pointing to a replica.
    Document(DocTicket),
    /// A list of tickets pointing to files.
    Entries(Vec<BlobTicket>),
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Hash)]
/// A request for content from a peer.
///
/// Derives `Eq`, `PartialEq` and `Hash`, so requests can be compared and hashed.
pub struct PeerContentRequest {
    /// The ID of a requested replica.
    pub namespace_id: NamespaceId,
    /// An optional path of requested files within the replica.
    ///
    /// NOTE(review): presumably `None` requests the replica as a whole — confirm against
    /// the code that handles these requests.
    pub path: Option<PathBuf>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
/// A response to a peer requesting content.
///
/// Pairs the ticket(s) needed to fetch the content with the content's size.
pub struct PeerContentResponse {
    /// A ticket satisfying the content request.
    pub ticket_response: PeerTicketResponse,
    /// The size, in bytes, of the requested content.
    pub content_size: u64,
}