Skip to content

Commit

Permalink
Merge pull request #48 from earthstar-project/store-trait-query
Browse files Browse the repository at this point in the history
Add query methods to Store trait
  • Loading branch information
sgwilym authored Aug 27, 2024
2 parents cc2a67e + 74e6202 commit ade83c7
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 11 deletions.
83 changes: 82 additions & 1 deletion data-model/src/lengthy_entry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Entry, NamespaceId, PayloadDigest, SubspaceId};
use crate::{AuthorisationToken, AuthorisedEntry, Entry, NamespaceId, PayloadDigest, SubspaceId};

/// An [`Entry`] together with information about how much of its payload a given [`Store`] holds.
///
Expand Down Expand Up @@ -36,4 +36,85 @@ where
pub fn available(&self) -> u64 {
self.available
}

/// Turn this into a regular [`Entry`].
pub fn into_entry(self) -> Entry<MCL, MCC, MPL, N, S, PD> {
self.entry
}
}

impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD>
AsRef<Entry<MCL, MCC, MPL, N, S, PD>> for LengthyEntry<MCL, MCC, MPL, N, S, PD>
where
N: NamespaceId,
S: SubspaceId,
PD: PayloadDigest,
{
fn as_ref(&self) -> &Entry<MCL, MCC, MPL, N, S, PD> {
&self.entry
}
}

/// An [`AuthorisedEntry`] together with information about how much of its payload a given [`Store`] holds.
pub struct LengthyAuthorisedEntry<
const MCL: usize,
const MCC: usize,
const MPL: usize,
N,
S,
PD,
AT,
> where
N: NamespaceId,
S: SubspaceId,
PD: PayloadDigest,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
/// The Entry in question.
entry: AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
/// The number of consecutive bytes from the start of the entry’s payload that the peer holds.
available: u64,
}

impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>
where
N: NamespaceId,
S: SubspaceId,
PD: PayloadDigest,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
/// Create a new lengthy entry from a given [`AuthorisedEntry`] and the number of consecutive bytes from the start of the entry’s payload that are held.
pub fn new(entry: AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>, available: u64) -> Self {
Self { entry, available }
}

/// The entry in question.
pub fn entry(&self) -> &AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT> {
&self.entry
}

/// The number of consecutive bytes from the start of the entry’s Payload that the peer holds.
pub fn available(&self) -> u64 {
self.available
}

/// Turn this into a [`AuthorisedEntry`].
pub fn into_authorised_entry(self) -> AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT> {
self.entry
}
}

impl<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
AsRef<AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>
for LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>
where
N: NamespaceId,
S: SubspaceId,
PD: PayloadDigest,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
fn as_ref(&self) -> &AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT> {
&self.entry
}
}
130 changes: 120 additions & 10 deletions data-model/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::future::Future;

use ufotofu::nb::BulkProducer;
use ufotofu::{local_nb::Producer, nb::BulkProducer};

use crate::{
entry::AuthorisedEntry,
grouping::{Area, AreaOfInterest},
parameters::{AuthorisationToken, NamespaceId, PayloadDigest, SubspaceId},
LengthyEntry, Path,
LengthyAuthorisedEntry, LengthyEntry, Path,
};

/// Returned when an entry could be ingested into a [`Store`].
Expand All @@ -21,8 +21,6 @@ pub enum EntryIngestionSuccess<
> {
/// The entry was successfully ingested.
Success,
/// The entry was successfully ingested and prefix pruned some entries.
SuccessAndPruned(Vec<AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>),
/// The entry was not ingested because a newer entry with same
Obsolete {
/// The obsolete entry which was not ingested.
Expand Down Expand Up @@ -113,7 +111,76 @@ pub enum PayloadAppendError<OE> {
}

/// Returned when no entry was found for some criteria.
pub struct NoSuchEntryError();
pub struct NoSuchEntryError;

/// The order by which entries should be returned for a given query.
pub enum QueryOrder {
/// Ordered by subspace, then path, then timestamp.
Subspace,
/// Ordered by path, then by an arbitrary order determined by the implementation.
Path,
/// Ordered by timestamp, then by an arbitrary order determined by the implementation.
Timestamp,
/// An arbitrary order chosen by the implementation, hopefully the most efficient one.
Arbitrary,
}

/// Describes an [`AuthorisedEntry`] which was pruned and the [`AuthorisedEntry`] which triggered the pruning.
pub struct PruneEvent<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
where
N: NamespaceId,
S: SubspaceId,
PD: PayloadDigest,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
/// The entry which was pruned.
pub pruned: AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
/// The entry which triggered the pruning.
pub by: AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>,
}

/// An event which took place within a [`Store`].
/// Each event includes a *progress ID* which can be used to *resume* a subscription at any point in the future.
pub enum StoreEvent<const MCL: usize, const MCC: usize, const MPL: usize, N, S, PD, AT>
where
N: NamespaceId,
S: SubspaceId,
PD: PayloadDigest,
AT: AuthorisationToken<MCL, MCC, MPL, N, S, PD>,
{
/// A new entry was ingested.
Ingested(u64, AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>),
/// An existing entry received a portion of its corresponding payload.
Appended(u64, LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>),
/// An entry was forgotten.
EntryForgotten(u64, AuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>),
/// A payload was forgotten.
PayloadForgotten(u64, PD),
/// An entry was pruned via prefix pruning.
Pruned(u64, PruneEvent<MCL, MCC, MPL, N, S, PD, AT>),
}

/// Returned when the store chooses to not resume a subscription.
pub struct ResumptionFailedError(pub u64);

/// Describes which entries to ignore during a query.
#[derive(Default)]
pub struct QueryIgnoreParams {
/// Omit entries with locally incomplete corresponding payloads.
pub ignore_incomplete_payloads: bool,
/// Omit entries whose payload is the empty string.
pub ignore_empty_payloads: bool,
}

impl QueryIgnoreParams {
pub fn ignore_incomplete_payloads(&mut self) {
self.ignore_incomplete_payloads = true;
}

pub fn ignore_empty_payloads(&mut self) {
self.ignore_empty_payloads = true;
}
}

/// Returned when a payload could not be forgotten.
pub enum ForgetPayloadError {
Expand Down Expand Up @@ -185,7 +252,7 @@ where
/// This method **cannot** verify the integrity of partial payloads. This means that arbitrary (and possibly malicious) payloads smaller than the expected size will be stored unless partial verification is implemented upstream (e.g. during [the Willow General Sync Protocol's payload transformation](https://willowprotocol.org/specs/sync/index.html#sync_payloads_transform)).
fn append_payload<Producer>(
&self,
expected_digest: PD,
expected_digest: &PD,
expected_size: u64,
payload_source: &mut Producer,
) -> impl Future<
Expand All @@ -197,14 +264,15 @@ where
where
Producer: BulkProducer<Item = u8>;

/// Locally forget an entry with a given [path] and [subspace] id, succeeding if there is no entry at that path and subspace ID, or returning an implementation-specific error if something went wrong.
/// Locally forget an entry with a given [`Path`] and [subspace](https://willowprotocol.org/specs/data-model/index.html#subspace) id, returning the forgotten entry, or an error if no entry with that path and subspace ID are held by this store.
///
/// If the `traceless` parameter is `true`, the store will keep no record of ever having had the entry. If `false`, it *may* persist what was forgotten for an arbitrary amount of time.
///
/// Forgetting is not the same as [pruning](https://willowprotocol.org/specs/data-model/index.html#prefix_pruning)! Subsequent joins with other [`Store`]s may bring the forgotten entry back.
fn forget_entry(
&self,
path: &Path<MCL, MCC, MPL>,
subspace_id: S,
subspace_id: &S,
traceless: bool,
) -> impl Future<Output = Result<(), Self::OperationsError>>;

Expand All @@ -216,6 +284,7 @@ where
///
/// Forgetting is not the same as [pruning](https://willowprotocol.org/specs/data-model/index.html#prefix_pruning)! Subsequent joins with other [`Store`]s may bring the forgotten entries back.
fn forget_area(
&self,
area: &AreaOfInterest<MCL, MCC, MPL, S>,
protected: Option<Area<MCL, MCC, MPL, S>>,
traceless: bool,
Expand All @@ -236,8 +305,6 @@ where
///
/// If the `traceless` parameter is `true`, the store will keep no record of ever having had the payload. If `false`, it *may* persist what was forgetten for an arbitrary amount of time.
///
/// If the `even_if_referred_to_by_other_entries` parameter is `true`, the payload will be forgotten even if other entries with a different path and/or subspace ID refer to this payload.
///
/// Forgetting is not the same as [pruning](https://willowprotocol.org/specs/data-model/index.html#prefix_pruning)! Subsequent joins with other [`Store`]s may bring the forgotten payload back.
fn forget_payload_unchecked(
path: &Path<MCL, MCC, MPL>,
Expand All @@ -251,6 +318,7 @@ where
///
/// Forgetting is not the same as [pruning](https://willowprotocol.org/specs/data-model/index.html#prefix_pruning)! Subsequent joins with other [`Store`]s may bring the forgotten payloads back.
fn forget_area_payloads(
&self,
area: &AreaOfInterest<MCL, MCC, MPL, S>,
traceless: bool,
) -> impl Future<Output = Vec<PD>>;
Expand All @@ -261,10 +329,52 @@ where
///
/// Forgetting is not the same as [pruning](https://willowprotocol.org/specs/data-model/index.html#prefix_pruning)! Subsequent joins with other [`Store`]s may bring the forgotten payloads back.
fn forget_everything_but_area_payloads(
&self,
area: &AreaOfInterest<MCL, MCC, MPL, S>,
traceless: bool,
) -> impl Future<Output = Vec<PD>>;

/// Force persistence of all previous mutations
fn flush() -> impl Future<Output = Result<(), Self::FlushError>>;

/// Return a [`LengthyAuthorisedEntry`] with the given [`Path`] and [subspace](https://willowprotocol.org/specs/data-model/index.html#subspace) ID, if present.
///
/// If `ignore_incomplete_payloads` is `true`, will return `None` if the entry's corresponding payload is incomplete, even if there is an entry present.
/// If `ignore_empty_payloads` is `true`, will return `None` if the entry's payload length is `0`, even if there is an entry present.
fn entry(
&self,
path: &Path<MCL, MCC, MPL>,
subspace_id: &S,
ignore: Option<QueryIgnoreParams>,
) -> impl Future<Output = LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>;

/// Query which entries are [included](https://willowprotocol.org/specs/grouping-entries/index.html#area_include) by an [`AreaOfInterest`], returning a producer of [`LengthyAuthorisedEntry`].
///
/// If `ignore_incomplete_payloads` is `true`, the producer will not produce entries with incomplete corresponding payloads.
/// If `ignore_empty_payloads` is `true`, the producer will not produce entries with a `payload_length` of `0`.
fn query_area(
&self,
area: &AreaOfInterest<MCL, MCC, MPL, S>,
order: &QueryOrder,
reverse: bool,
ignore: Option<QueryIgnoreParams>,
) -> impl Producer<Item = LengthyAuthorisedEntry<MCL, MCC, MPL, N, S, PD, AT>>;

/// Subscribe to events concerning entries [included](https://willowprotocol.org/specs/grouping-entries/index.html#area_include) by an [`AreaOfInterest`], returning a producer of `StoreEvent`s which occurred since the moment of calling this function.
///
/// If `ignore_incomplete_payloads` is `true`, the producer will not produce entries with incomplete corresponding payloads.
/// If `ignore_empty_payloads` is `true`, the producer will not produce entries with a `payload_length` of `0`.
fn subscribe_area(
&self,
area: &AreaOfInterest<MCL, MCC, MPL, S>,
ignore: Option<QueryIgnoreParams>,
) -> impl Producer<Item = StoreEvent<MCL, MCC, MPL, N, S, PD, AT>>;

/// Attempt to resume a subscription using a *progress ID* obtained from a previous subscription, or return an error if this store implementation is unable to resume the subscription.
fn resume_subscription(
&self,
progress_id: u64,
area: &AreaOfInterest<MCL, MCC, MPL, S>,
ignore: Option<QueryIgnoreParams>,
) -> impl Producer<Item = StoreEvent<MCL, MCC, MPL, N, S, PD, AT>>;
}

0 comments on commit ade83c7

Please sign in to comment.