Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: refactor manifest related traits to encapsulate path handling #2738

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 24 additions & 6 deletions rust/lance-table/src/io/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use async_trait::async_trait;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_arrow::DataTypeExt;
use lance_file::{version::LanceFileVersion, writer::ManifestProvider};
use object_store::path::Path;
Expand All @@ -19,10 +20,10 @@
object_store::ObjectStore,
object_writer::ObjectWriter,
traits::{WriteExt, Writer},
utils::read_message,

Check warning on line 23 in rust/lance-table/src/io/manifest.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/io/manifest.rs
};

use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC};
use crate::format::{pb, DataStorageFormat, Index, Manifest, MAGIC, MAJOR_VERSION, MINOR_VERSION};

/// Read Manifest on URI.
///
Expand Down Expand Up @@ -116,10 +117,10 @@
async fn do_write_manifest(
writer: &mut dyn Writer,
manifest: &mut Manifest,
indices: Option<Vec<Index>>,
indices: &[Index],
) -> Result<usize> {
// Write indices if presented.
if let Some(indices) = indices.as_ref() {
if !indices.is_empty() {
let section = pb::IndexSection {
indices: indices.iter().map(|i| i.into()).collect(),
};
Expand All @@ -134,7 +135,7 @@
pub async fn write_manifest(
writer: &mut dyn Writer,
manifest: &mut Manifest,
indices: Option<Vec<Index>>,
indices: &[Index],
) -> Result<usize> {
// Write dictionary values.
let max_field_id = manifest.schema.max_field_id().unwrap_or(-1);
Expand Down Expand Up @@ -187,6 +188,23 @@
do_write_manifest(writer, manifest, indices).await
}

pub fn write_manifest_file_to_path<'a>(
object_store: &'a ObjectStore,
manifest: &'a mut Manifest,
indices: &'a [Index],
path: &'a Path,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async {
let mut object_writer = ObjectWriter::new(object_store, path).await?;
let pos = write_manifest(&mut object_writer, manifest, indices).await?;
object_writer
.write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
.await?;
object_writer.shutdown().await?;
Ok(())
})
}

/// Implementation of ManifestProvider that describes a Lance file by writing
/// a manifest that contains nothing but default fields and the schema
pub struct ManifestDescribing {}
Expand All @@ -202,7 +220,7 @@
Arc::new(vec![]),
DataStorageFormat::new(LanceFileVersion::Legacy),
);
let pos = do_write_manifest(object_writer, &mut manifest, None).await?;
let pos = do_write_manifest(object_writer, &mut manifest, &[]).await?;
Ok(Some(pos))
}
}
Expand Down Expand Up @@ -244,7 +262,7 @@
ArrowSchema::new(vec![ArrowField::new(long_name, DataType::Int64, false)]);
let schema = Schema::try_from(&arrow_schema).unwrap();
let mut manifest = Manifest::new(schema, Arc::new(vec![]), DataStorageFormat::default());
let pos = write_manifest(&mut writer, &mut manifest, None)
let pos = write_manifest(&mut writer, &mut manifest, &[])
.await
.unwrap();
writer
Expand Down
1 change: 1 addition & 0 deletions rust/lance-table/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
pub mod feature_flags;
pub mod format;
pub mod io;
pub mod manifest_store;
pub mod rowids;
pub mod utils;
97 changes: 97 additions & 0 deletions rust/lance-table/src/manifest_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! [ManifestStore] encapsulates the logic for reading, writing and managing
//! manifest files. Different implementations may be used, depending on the
//! capabilities of the underlying storage system.

use futures::future::Future;

use futures::stream::BoxStream;
use lance_core::{Error, Result};
use lance_io::traits::Reader;

Check warning on line 13 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
use crate::format::{Index, Manifest};

pub mod legacy;
#[cfg(feature = "dynamodb")]
pub mod dynamodb;

const MANIFEST_EXTENSION: &str = "manifest";

/// A store of manifests. This provides fast access to the latest version
/// of the dataset and allows for listing and opening older versions.
pub trait ManifestStore {

Check warning on line 24 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
/// Get the latest version of the dataset.
fn latest_version(&self) -> impl Future<Output = Result<ManifestVersion>>;

/// Open the latest manifest file.
fn open_latest_manifest(&self) -> impl Future<Output = Result<Box<dyn Reader>>>;

/// Open the manifest file for the given version.

Check warning on line 31 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
///
/// Should use the provided size if available to avoid an extra HEAD request.
fn open_manifest(&self, version: impl Into<ManifestVersion>) -> impl Future<Output = Result<Box<dyn Reader>>>;

/// List all the versions of the dataset.
///
/// This should return them in descending order.
fn list_versions(&self) -> BoxStream<Result<ManifestVersion>>;

/// Try to commit the given manifest as the given version.
///
/// If the version already exists, this should return an error, even if
/// the version was created by a concurrent process.
///

Check warning on line 45 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
/// Any temporary files created during the commit should be cleaned up
/// if the commit fails.
///
/// The `manifest` is mutable because the offsets to certain internal
/// structures are updated during the writing process.
fn try_commit(
&self,
manifest: &mut Manifest,
indices: &[Index],
) -> impl Future<Output = std::result::Result<(), CommitError>>;

// TODO: what about cleanup?
}

pub struct ManifestVersion {
version: u64,
known_size: Option<u64>,
}

impl From<u64> for ManifestVersion {
fn from(version: u64) -> Self {
Self {
version,
known_size: None,
}
}

Check warning on line 71 in rust/lance-table/src/manifest_store.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store.rs
}


/// Errors that can occur when committing a manifest.
#[derive(Debug)]
pub enum CommitError {
/// Another transaction has already been written to the path
CommitConflict,
/// Something else went wrong
OtherError(Error),
}

impl From<Error> for CommitError {
fn from(e: Error) -> Self {
Self::OtherError(e)
}
}

// Goal: make the paths opaque, so that the store implementation can choose how
// the paths are represented.

// Goal 2: separate idea of commit handler (what happens when we write the manifest)
// from the idea of the store (how we read the manifests). Allow customizing both.

// This is really just a cleaned up version of CommitHandler. We can provide
// an adapter for now.
3 changes: 3 additions & 0 deletions rust/lance-table/src/manifest_store/dynamodb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// SPDX-License-Identifier: Apache-2.0

Check warning on line 1 in rust/lance-table/src/manifest_store/dynamodb.rs

View workflow job for this annotation

GitHub Actions / linux-arm

Diff in /runner/_work/lance/lance/rust/lance-table/src/manifest_store/dynamodb.rs
// SPDX-FileCopyrightText: Copyright The Lance Authors

Loading
Loading