feat: AWS glue catalog support for iceberg_scan() #51

Open · wants to merge 6 commits into main
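A minimal usage sketch of what this PR enables, for context before the diff. It assumes `iceberg_scan()` exposes the same `catalog_type`, `catalog`, `region` and `database_name` named parameters that the diff below adds to `iceberg_metadata()`; the table, database and region names are placeholders.

```cpp
// Hypothetical usage sketch (not part of this diff): reading an Iceberg table
// registered in AWS Glue through DuckDB's C++ API. Names are placeholders and
// AWS credentials are assumed to come from the usual environment/config chain.
#include "duckdb.hpp"

int main() {
	duckdb::DuckDB db(nullptr);
	duckdb::Connection con(db);
	con.Query("LOAD iceberg");  // assumes a locally built/installed extension
	auto result = con.Query(
	    "SELECT count(*) FROM iceberg_scan('my_iceberg_table', "
	    "catalog_type='glue', database_name='my_glue_db', region='us-east-1')");
	result->Print();
	return 0;
}
```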
10 changes: 8 additions & 2 deletions CMakeLists.txt
@@ -47,6 +47,9 @@
PROPERTIES IMPORTED_LOCATION ${AVROCPP_LIBRARY_DEBUG}
INTERFACE_INCLUDE_DIRECTORIES ${AVROCPP_INCLUDE_DIR})

# For the Glue catalog support.
find_package(AWSSDK REQUIRED COMPONENTS core;glue)

# Note: for some reason the avro-cpp port does not properly handle static deps, so we
# need to manually ensure avro's deps are linked too
set(SNAPPY_HOME "${AVROCPP_ROOT_FIND_DIR}")
@@ -55,6 +58,9 @@
filesystem)
find_package(Snappy CONFIG REQUIRED)
find_package(ZLIB REQUIRED)

target_include_directories(${EXTENSION_NAME}
PUBLIC $<BUILD_INTERFACE:${AWSSDK_INCLUDE_DIRS}>)
target_link_libraries(
avro_static_release
INTERFACE Boost::boost
@@ -76,9 +82,9 @@

# Link dependencies into extension
target_link_libraries(${EXTENSION_NAME} PUBLIC optimized avro_static_release
debug avro_static_debug)
debug avro_static_debug ${AWSSDK_LINK_LIBRARIES})
target_link_libraries(${TARGET_NAME}_loadable_extension optimized

Check failure on line 86 in CMakeLists.txt (GitHub Actions / Build extension binaries / Linux (linux_amd64, ubuntu:18.04, x64-linux)):
Cannot specify link libraries for target "AWS::crypto_loadable_extension"
avro_static_release debug avro_static_debug)
avro_static_release debug avro_static_debug ${AWSSDK_LINK_LIBRARIES})

install(
TARGETS ${EXTENSION_NAME} ${TARGET_NAME}_loadable_extension
10 changes: 5 additions & 5 deletions README.md
@@ -1,6 +1,6 @@
> **Disclaimer:** This extension is currently in an experimental state. Feel free to try it out, but be aware that things may not work as expected

# DuckDB extension for Apache Iceberg
# DuckDB extension for Apache Iceberg

This repository contains a DuckDB extension that adds support for [Apache Iceberg](https://iceberg.apache.org/). In its current state, the extension offers some basic features that allow listing snapshots and reading specific snapshots
of an iceberg table.
@@ -13,15 +13,15 @@ See the [Iceberg page in the DuckDB documentation](https://duckdb.org/docs/exten

### Dependencies

This extension has several dependencies. Currently, the main way to install them is through vcpkg. To install vcpkg,
This extension has several dependencies. Currently, the main way to install them is through vcpkg. To install vcpkg,
check out the docs [here](https://vcpkg.io/en/getting-started.html). Note that this extension contains a custom vcpkg port
that overrides the existing 'avro-cpp' port of vcpkg. This is necessary because other versions of avro-cpp have
an issue that causes problems with the avro files produced by the Spark iceberg extension.

### Test data generation

To generate test data, the script in 'scripts/test_data_generator' is used to have spark generate some test data. This is
based on pyspark 3.5, which you can install through pip.
To generate test data, the script in 'scripts/test_data_generator' is used to have spark generate some test data. This is
based on pyspark 3.5, which you can install through pip.

### Building the extension

@@ -53,7 +53,7 @@ running `python3 -m pip install duckdb pyspark[sql]==3.5.0` should do the trick.
#### Running unit tests

```shell
make test
make test
```

#### Running the local S3 test server
76 changes: 64 additions & 12 deletions src/common/iceberg.cpp
@@ -10,14 +10,16 @@
#include "avro/ValidSchema.hh"
#include "avro/Stream.hh"

namespace duckdb {
#include <regex>
#include <aws/core/Aws.h>
#include <aws/glue/GlueClient.h>
#include <aws/glue/model/GetTableRequest.h>
#include <aws/glue/model/GetTableResult.h>

IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs,
bool allow_moved_paths, string metadata_compression_codec) {
IcebergTable ret;
ret.path = iceberg_path;
ret.snapshot = snapshot;
namespace duckdb {

IcebergTable::IcebergTable(const string &iceberg_path, IcebergSnapshot &snapshot, FileSystem &fs,
bool allow_moved_paths, string metadata_compression_codec) : snapshot(snapshot), path(iceberg_path) {
auto manifest_list_full_path = allow_moved_paths
? IcebergUtils::GetFullPath(iceberg_path, snapshot.manifest_list, fs)
: snapshot.manifest_list;
@@ -29,10 +31,8 @@ IcebergTable IcebergTable::Load(const string &iceberg_path, IcebergSnapshot &sna
: manifest.manifest_path;
auto manifest_paths = ReadManifestEntries(manifest_entry_full_path, fs, snapshot.iceberg_format_version);

ret.entries.push_back({std::move(manifest), std::move(manifest_paths)});
entries.push_back({std::move(manifest), std::move(manifest_paths)});
}

return ret;
}

vector<IcebergManifest> IcebergTable::ReadManifestListFile(const string &path, FileSystem &fs, idx_t iceberg_format_version) {
@@ -175,7 +175,59 @@ string GenerateMetaDataUrl(FileSystem &fs, const string &meta_path, const string
return fs.JoinPath(meta_path, "v" + table_version + ".gz.metadata.json");
}

// Read the current table metadata location from AWS Glue via the AWS SDK.
string IcebergSnapshot::ReadMetaDataFromAWSGlue(const string &path, FileSystem &fs, string metadata_compression_codec) {
Aws::SDKOptions options;
Aws::InitAPI(options);
try {
Aws::Client::ClientConfiguration config;
config.region = this->region;

Aws::Glue::GlueClient glueClient(config);

Aws::Glue::Model::GetTableRequest request;
if (!this->catalog.empty()) {
request.SetCatalogId(this->catalog);
}

request.SetDatabaseName(this->database_name);
request.SetName(path);

auto get_table_result = glueClient.GetTable(request);
if (!get_table_result.IsSuccess()) {
const Aws::Client::AWSError<Aws::Glue::GlueErrors>& error = get_table_result.GetError();
throw IOException("AWS Glue: Error calling GetTable API for table " + path + " " + error.GetExceptionName() + " - " + error.GetMessage());
}
const Aws::Glue::Model::Table& table = get_table_result.GetResult().GetTable();

const Aws::Map<Aws::String, Aws::String>& table_parameters = table.GetParameters();
auto table_type = table_parameters.find("table_type");
if (table_type == table_parameters.end()) {
throw IOException("AWS Glue: table_type is not defined for table, is it an Iceberg table?");
}
if (table_type->second != "ICEBERG") {
throw IOException("AWS Glue: type_type is not set to 'ICEBERG', is this an Iceberg table?");
}
auto table_metadata_location = table_parameters.find("metadata_location");
if(table_metadata_location == table_parameters.end()) {
throw IOException("AWS Glue: No Iceberg metadata location is specified for the table.");
}
Aws::ShutdownAPI(options);
return IcebergUtils::FileToString(table_metadata_location->second, fs);
} catch (...) {
Aws::ShutdownAPI(options);
// Rethrow the original exception so its concrete type (e.g. IOException) is preserved.
throw;
}
}
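Side note on the error handling above: a small RAII sketch (not part of this PR) showing how the `Aws::InitAPI`/`Aws::ShutdownAPI` pair could be tied to scope so shutdown runs on every exit path, including exceptions thrown by the Glue calls. The struct name is hypothetical.

```cpp
// Hypothetical RAII guard: constructing one at the top of
// ReadMetaDataFromAWSGlue would make Aws::ShutdownAPI run automatically,
// removing the need to call it in both the success and error branches.
#include <aws/core/Aws.h>

struct AwsApiGuard {
	Aws::SDKOptions options;
	AwsApiGuard() { Aws::InitAPI(options); }
	~AwsApiGuard() { Aws::ShutdownAPI(options); }
	AwsApiGuard(const AwsApiGuard &) = delete;
	AwsApiGuard &operator=(const AwsApiGuard &) = delete;
};
```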


string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string metadata_compression_codec) {
// If a catalog type was specified, resolve the current table metadata location through
// that remote catalog (currently only AWS Glue) instead of treating the path as a file location.
if (catalog_type == "glue") {
return ReadMetaDataFromAWSGlue(path, fs, metadata_compression_codec);
}

string metadata_file_path;
if (StringUtil::EndsWith(path, ".json")) {
metadata_file_path = path;
@@ -193,9 +245,9 @@ string IcebergSnapshot::ReadMetaData(const string &path, FileSystem &fs, string

IcebergSnapshot IcebergSnapshot::ParseSnapShot(yyjson_val *snapshot, idx_t iceberg_format_version, idx_t schema_id,
vector<yyjson_val *> &schemas, string metadata_compression_codec,
bool skip_schema_inference) {
IcebergSnapshot ret;
auto snapshot_tag = yyjson_get_tag(snapshot);
bool skip_schema_inference) {
IcebergSnapshot ret(catalog_type, catalog, region, database_name);
auto snapshot_tag = yyjson_get_type(snapshot);
if (snapshot_tag != YYJSON_TYPE_OBJ) {
throw IOException("Invalid snapshot field found parsing iceberg metadata.json");
}
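The header changes that back these calls are not visible in this view. For reference, a sketch of the catalog-related `IcebergSnapshot` state that the new constructor calls appear to rely on; the class name, field names and comments below are assumptions, not the PR's actual declarations.

```cpp
// Hypothetical sketch of the state implied by the calls above, e.g.
// IcebergSnapshot ret(catalog_type, catalog, region, database_name); and the
// this->region / this->catalog / this->database_name reads in
// ReadMetaDataFromAWSGlue. The real header is outside this diff.
#include <string>
#include <utility>

class IcebergSnapshotSketch {
public:
	IcebergSnapshotSketch(std::string catalog_type, std::string catalog,
	                      std::string region, std::string database_name)
	    : catalog_type(std::move(catalog_type)), catalog(std::move(catalog)),
	      region(std::move(region)), database_name(std::move(database_name)) {}

	std::string catalog_type;   // "glue" routes ReadMetaData through AWS Glue
	std::string catalog;        // optional Glue catalog id; empty falls back to the caller's account
	std::string region;         // AWS region used to configure the GlueClient
	std::string database_name;  // Glue database that holds the table
};
```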
14 changes: 7 additions & 7 deletions src/common/schema.cpp
@@ -9,7 +9,7 @@ namespace duckdb {
static LogicalType ParseType(yyjson_val *type);

static LogicalType ParseStruct(yyjson_val *struct_type) {
D_ASSERT(yyjson_get_tag(struct_type) == YYJSON_TYPE_OBJ);
D_ASSERT(yyjson_get_type(struct_type) == YYJSON_TYPE_OBJ);
D_ASSERT(IcebergUtils::TryGetStrFromObject(struct_type, "type") == "struct");

child_list_t<LogicalType> children;
@@ -28,7 +28,7 @@ static LogicalType ParseStruct(yyjson_val *struct_type) {
}

static LogicalType ParseList(yyjson_val *list_type) {
D_ASSERT(yyjson_get_tag(list_type) == YYJSON_TYPE_OBJ);
D_ASSERT(yyjson_get_type(list_type) == YYJSON_TYPE_OBJ);
D_ASSERT(IcebergUtils::TryGetStrFromObject(list_type, "type") == "list");

// NOTE: 'element-id', 'element-required' are ignored for now
@@ -38,7 +38,7 @@ static LogicalType ParseList(yyjson_val *list_type) {
}

static LogicalType ParseMap(yyjson_val *map_type) {
D_ASSERT(yyjson_get_tag(map_type) == YYJSON_TYPE_OBJ);
D_ASSERT(yyjson_get_type(map_type) == YYJSON_TYPE_OBJ);
D_ASSERT(IcebergUtils::TryGetStrFromObject(map_type, "type") == "map");

// NOTE: 'key-id', 'value-id', 'value-required' are ignored for now
@@ -51,7 +51,7 @@ static LogicalType ParseMap(yyjson_val *map_type) {
}

static LogicalType ParseComplexType(yyjson_val *type) {
D_ASSERT(yyjson_get_tag(type) == YYJSON_TYPE_OBJ);
D_ASSERT(yyjson_get_type(type) == YYJSON_TYPE_OBJ);
auto type_str = IcebergUtils::TryGetStrFromObject(type, "type");

if (type_str == "struct") {
@@ -73,10 +73,10 @@ static LogicalType ParseComplexType(yyjson_val *type) {
if (!val) {
throw IOException("Invalid field found while parsing field: type");
}
if (yyjson_get_tag(val) == YYJSON_TYPE_OBJ) {
if (yyjson_get_type(val) == YYJSON_TYPE_OBJ) {
return ParseComplexType(val);
}
if (yyjson_get_tag(val) != YYJSON_TYPE_STR) {
if (yyjson_get_type(val) != YYJSON_TYPE_STR) {
throw IOException("Invalid field found while parsing field: type");
}

@@ -154,7 +154,7 @@ static vector<IcebergColumnDefinition> ParseSchemaFromJson(yyjson_val *schema_js
if (type_str != "struct") {
throw IOException("Schema in JSON Metadata is invalid");
}
D_ASSERT(yyjson_get_tag(schema_json) == YYJSON_TYPE_OBJ);
D_ASSERT(yyjson_get_type(schema_json) == YYJSON_TYPE_OBJ);
D_ASSERT(IcebergUtils::TryGetStrFromObject(schema_json, "type") == "struct");
yyjson_val *field;
size_t max, idx;
6 changes: 3 additions & 3 deletions src/common/utils.cpp
@@ -37,23 +37,23 @@ string IcebergUtils::GetFullPath(const string &iceberg_path, const string &relat

uint64_t IcebergUtils::TryGetNumFromObject(yyjson_val *obj, const string &field) {
auto val = yyjson_obj_getn(obj, field.c_str(), field.size());
if (!val || yyjson_get_tag(val) != YYJSON_TYPE_NUM) {
if (!val || yyjson_get_type(val) != YYJSON_TYPE_NUM) {
throw IOException("Invalid field found while parsing field: " + field);
}
return yyjson_get_uint(val);
}

bool IcebergUtils::TryGetBoolFromObject(yyjson_val *obj, const string &field) {
auto val = yyjson_obj_getn(obj, field.c_str(), field.size());
if (!val || yyjson_get_tag(val) != YYJSON_TYPE_BOOL) {
if (!val || yyjson_get_type(val) != YYJSON_TYPE_BOOL) {
throw IOException("Invalid field found while parsing field: " + field);
}
return yyjson_get_bool(val);
}

string IcebergUtils::TryGetStrFromObject(yyjson_val *obj, const string &field) {
auto val = yyjson_obj_getn(obj, field.c_str(), field.size());
if (!val || yyjson_get_tag(val) != YYJSON_TYPE_STR) {
if (!val || yyjson_get_type(val) != YYJSON_TYPE_STR) {
throw IOException("Invalid field found while parsing field: " + field);
}
return yyjson_get_str(val);
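The repeated `yyjson_get_tag` to `yyjson_get_type` swap in this file (and in `schema.cpp` above) matters because yyjson's tag packs subtype bits together with the type. A small illustration; the include path and namespace of DuckDB's vendored yyjson are assumptions here.

```cpp
// Sketch of why the old tag comparisons could misfire: yyjson_get_tag()
// returns type | subtype, so a JSON `true` carries
// YYJSON_TYPE_BOOL | YYJSON_SUBTYPE_TRUE and never equals the bare
// YYJSON_TYPE_BOOL constant (likewise signed integers and reals versus
// YYJSON_TYPE_NUM). yyjson_get_type() strips the subtype bits, which is what
// the checks in utils.cpp and schema.cpp intend.
#include "yyjson.hpp"

static bool IsJsonBool(yyjson_val *val) {
	return val && yyjson_get_type(val) == YYJSON_TYPE_BOOL;
}
```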
45 changes: 38 additions & 7 deletions src/iceberg_functions/iceberg_metadata.cpp
@@ -57,7 +57,12 @@ static unique_ptr<FunctionData> IcebergMetaDataBind(ClientContext &context, Tabl
bool allow_moved_paths = false;
string metadata_compression_codec = "none";
bool skip_schema_inference = false;


string catalog_type = "";
string catalog = "";
string region = "";
string database_name = "";

for (auto &kv : input.named_parameters) {
auto loption = StringUtil::Lower(kv.first);
if (loption == "allow_moved_paths") {
@@ -66,24 +71,32 @@
metadata_compression_codec = StringValue::Get(kv.second);
} else if (loption == "skip_schema_inference") {
skip_schema_inference = BooleanValue::Get(kv.second);
} else if (loption == "catalog_type") {
catalog_type = StringValue::Get(kv.second);
} else if (loption == "catalog") {
catalog = StringValue::Get(kv.second);
} else if (loption == "region") {
region = StringValue::Get(kv.second);
} else if (loption == "database_name") {
database_name = StringValue::Get(kv.second);
}
}
IcebergSnapshot snapshot_to_scan;
IcebergSnapshot snapshot_to_scan(catalog_type, catalog, region, database_name);

if (input.inputs.size() > 1) {
if (input.inputs[1].type() == LogicalType::UBIGINT) {
snapshot_to_scan = IcebergSnapshot::GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = snapshot_to_scan.GetSnapshotById(iceberg_path, fs, input.inputs[1].GetValue<uint64_t>(), metadata_compression_codec, skip_schema_inference);
} else if (input.inputs[1].type() == LogicalType::TIMESTAMP) {
snapshot_to_scan =
IcebergSnapshot::GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
snapshot_to_scan.GetSnapshotByTimestamp(iceberg_path, fs, input.inputs[1].GetValue<timestamp_t>(), metadata_compression_codec, skip_schema_inference);
} else {
throw InvalidInputException("Unknown argument type in IcebergScanBindReplace.");
}
} else {
snapshot_to_scan = IcebergSnapshot::GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
snapshot_to_scan = snapshot_to_scan.GetLatestSnapshot(iceberg_path, fs, metadata_compression_codec, skip_schema_inference);
}

ret->iceberg_table =
make_uniq<IcebergTable>(IcebergTable::Load(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec));
ret->iceberg_table = make_uniq<IcebergTable>(IcebergTable(iceberg_path, snapshot_to_scan, fs, allow_moved_paths, metadata_compression_codec));

auto manifest_types = IcebergManifest::Types();
return_types.insert(return_types.end(), manifest_types.begin(), manifest_types.end());
@@ -146,20 +159,38 @@ TableFunctionSet IcebergFunctions::GetIcebergMetadataFunction() {
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;

fun.named_parameters["catalog_type"] = LogicalType::VARCHAR;
fun.named_parameters["catalog"] = LogicalType::VARCHAR;
fun.named_parameters["region"] = LogicalType::VARCHAR;
fun.named_parameters["database_name"] = LogicalType::VARCHAR;

function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::UBIGINT}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;

fun.named_parameters["catalog_type"] = LogicalType::VARCHAR;
fun.named_parameters["catalog"] = LogicalType::VARCHAR;
fun.named_parameters["region"] = LogicalType::VARCHAR;
fun.named_parameters["database_name"] = LogicalType::VARCHAR;

function_set.AddFunction(fun);

fun = TableFunction({LogicalType::VARCHAR, LogicalType::TIMESTAMP}, IcebergMetaDataFunction, IcebergMetaDataBind,
IcebergMetaDataGlobalTableFunctionState::Init);
fun.named_parameters["skip_schema_inference"] = LogicalType::BOOLEAN;
fun.named_parameters["allow_moved_paths"] = LogicalType::BOOLEAN;
fun.named_parameters["metadata_compression_codec"] = LogicalType::VARCHAR;

fun.named_parameters["catalog_type"] = LogicalType::VARCHAR;
fun.named_parameters["catalog"] = LogicalType::VARCHAR;
fun.named_parameters["region"] = LogicalType::VARCHAR;
fun.named_parameters["database_name"] = LogicalType::VARCHAR;

function_set.AddFunction(fun);

return function_set;