Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an example to demonstrate multithreaded read_parquet pipelines #16828

Merged
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
ff2480b
Add the new multithreaded parquet example
mhaseeb123 Sep 18, 2024
d06f7f2
Set the default output path to the current path
mhaseeb123 Sep 18, 2024
c13a408
Style fix
mhaseeb123 Sep 18, 2024
12adeeb
Use stream pool for parquet write as well
mhaseeb123 Sep 18, 2024
a8ae50a
Add more details to the example
mhaseeb123 Sep 18, 2024
6679f89
Minor improvements
mhaseeb123 Sep 18, 2024
e04602c
Minor improvement
mhaseeb123 Sep 18, 2024
21ce7c7
Minor improvements
mhaseeb123 Sep 18, 2024
b649530
Merge branch 'branch-24.10' into fea-parquet-multithreaded-example
mhaseeb123 Sep 18, 2024
b8b8bb9
Move the vector to concatenate tables
mhaseeb123 Sep 19, 2024
188ce11
Minor improvement
mhaseeb123 Sep 23, 2024
1827654
Merge branch 'branch-24.10' into fea-parquet-multithreaded-example
mhaseeb123 Sep 23, 2024
e14916f
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Sep 23, 2024
a528eb3
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Sep 26, 2024
990f2bb
Make multithreaded parquet io example more sophisticated
mhaseeb123 Sep 27, 2024
06817d0
Minor updates
mhaseeb123 Sep 27, 2024
9dc8a1e
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Sep 27, 2024
af8ec6a
Minor improvements
mhaseeb123 Sep 27, 2024
d3778cc
Set default thread count = 1 instead of 2
mhaseeb123 Sep 27, 2024
c2b39cc
Minor improvement
mhaseeb123 Sep 27, 2024
8f39fb2
Add io source types
mhaseeb123 Oct 1, 2024
d0c2a62
Minor comment updates
mhaseeb123 Oct 1, 2024
945c0c0
Style fix and add to CI.
mhaseeb123 Oct 1, 2024
f30c801
Minor improvement
mhaseeb123 Oct 1, 2024
f701e34
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 1, 2024
719bfb6
Updates
mhaseeb123 Oct 1, 2024
2ade064
Style fix.
mhaseeb123 Oct 1, 2024
b559eaf
Print message when skipping a subdirectory
mhaseeb123 Oct 2, 2024
73de5bc
Update cpp/examples/parquet_io/io_source.hpp
mhaseeb123 Oct 2, 2024
52e6953
Update cpp/examples/parquet_io/common_utils.cpp
mhaseeb123 Oct 2, 2024
6194a50
Do not use `fmtlib`
mhaseeb123 Oct 2, 2024
3420c3f
Minor style fix
mhaseeb123 Oct 2, 2024
85d906d
Minor change
mhaseeb123 Oct 2, 2024
70ec6fd
Address minor nits from reviews
mhaseeb123 Oct 3, 2024
00390cd
Update cpp/examples/parquet_io/parquet_io_multithreaded.cpp
mhaseeb123 Oct 3, 2024
5ad8ecd
Move code to cpp files and minor refactoring
mhaseeb123 Oct 3, 2024
74763b0
Minor style fix
mhaseeb123 Oct 3, 2024
06afb49
Minor updates
mhaseeb123 Oct 4, 2024
db9aace
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 4, 2024
1a04409
Nits from code reviews
mhaseeb123 Oct 4, 2024
2fb523f
Merge branch 'fea-parquet-multithreaded-example' of https://github.co…
mhaseeb123 Oct 4, 2024
8be6710
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 7, 2024
2a6db5d
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 7, 2024
cc6242c
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 7, 2024
3a59027
Minor arg setting
mhaseeb123 Oct 10, 2024
60e5d75
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 10, 2024
7cfd7ae
Adjust spacing
mhaseeb123 Oct 11, 2024
d1fbad8
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 11, 2024
d9102f0
Apply suggestion
mhaseeb123 Oct 11, 2024
174e6c9
Merge branch 'fea-parquet-multithreaded-example' of https://github.co…
mhaseeb123 Oct 11, 2024
b61f18e
Minor
mhaseeb123 Oct 11, 2024
803e8c9
Merge branch 'branch-24.12' into fea-parquet-multithreaded-example
mhaseeb123 Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ci/run_cudf_examples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ compute-sanitizer --tool memcheck custom_optimized names.csv
compute-sanitizer --tool memcheck custom_prealloc names.csv
compute-sanitizer --tool memcheck custom_with_malloc names.csv

compute-sanitizer --tool memcheck parquet_io
compute-sanitizer --tool memcheck parquet_io example.parquet
compute-sanitizer --tool memcheck parquet_io example.parquet output.parquet DELTA_BINARY_PACKED ZSTD TRUE

compute-sanitizer --tool memcheck parquet_io_multithreaded example.parquet
compute-sanitizer --tool memcheck parquet_io_multithreaded example.parquet 4 DEVICE_BUFFER 2 2

exit ${EXITCODE}
19 changes: 16 additions & 3 deletions cpp/examples/parquet_io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@ project(

include(../fetch_dependencies.cmake)

# Configure your project here
add_library(parquet_io_utils OBJECT common_utils.cpp io_source.cpp)
target_compile_features(parquet_io_utils PRIVATE cxx_std_17)
target_link_libraries(parquet_io_utils PRIVATE cudf::cudf)

# Build and install parquet_io
add_executable(parquet_io parquet_io.cpp)
target_link_libraries(parquet_io PRIVATE cudf::cudf)
target_link_libraries(parquet_io PRIVATE cudf::cudf nvToolsExt $<TARGET_OBJECTS:parquet_io_utils>)
target_compile_features(parquet_io PRIVATE cxx_std_17)

install(TARGETS parquet_io DESTINATION bin/examples/libcudf)

# Build and install parquet_io_multithreaded
add_executable(parquet_io_multithreaded parquet_io_multithreaded.cpp)
target_link_libraries(
parquet_io_multithreaded PRIVATE cudf::cudf nvToolsExt $<TARGET_OBJECTS:parquet_io_utils>
)
target_compile_features(parquet_io_multithreaded PRIVATE cxx_std_17)
install(TARGETS parquet_io_multithreaded DESTINATION bin/examples/libcudf)

# Install the example.parquet file
install(FILES ${CMAKE_CURRENT_LIST_DIR}/example.parquet DESTINATION bin/examples/libcudf)
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,27 @@
* limitations under the License.
*/

#pragma once
#include "common_utils.hpp"

#include <cudf/io/parquet.hpp>
#include <cudf/concatenate.hpp>
#include <cudf/io/types.hpp>
#include <cudf/join.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_device.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <chrono>
#include <iostream>
#include <optional>
#include <iomanip>
#include <string>

/**
* @brief Create memory resource for libcudf functions
* @file common_utils.cpp
* @brief Definitions for common utilities for `parquet_io` examples
*
* @param pool Whether to use a pool memory resource.
* @return Memory resource instance
*/

std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_pool_used)
{
auto cuda_mr = std::make_shared<rmm::mr::cuda_memory_resource>();
Expand All @@ -48,17 +45,11 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_
return cuda_mr;
}

/**
* @brief Get encoding type from the keyword
*
* @param name encoding keyword name
* @return corresponding column encoding type
*/
[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name)
cudf::io::column_encoding get_encoding_type(std::string name)
{
using encoding_type = cudf::io::column_encoding;

static const std::unordered_map<std::string_view, cudf::io::column_encoding> map = {
static std::unordered_map<std::string_view, encoding_type> const map = {
{"DEFAULT", encoding_type::USE_DEFAULT},
{"DICTIONARY", encoding_type::DICTIONARY},
{"PLAIN", encoding_type::PLAIN},
Expand All @@ -69,26 +60,18 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_

std::transform(name.begin(), name.end(), name.begin(), ::toupper);
if (map.find(name) != map.end()) { return map.at(name); }
throw std::invalid_argument("FATAL: " + std::string(name) +
throw std::invalid_argument(name +
" is not a valid encoding type.\n\n"
"Available encoding types: DEFAULT, DICTIONARY, PLAIN,\n"
"DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY,\n"
"DELTA_BYTE_ARRAY\n"
"\n"
"Exiting...\n");
"DELTA_BYTE_ARRAY\n\n");
}

/**
* @brief Get compression type from the keyword
*
* @param name compression keyword name
* @return corresponding compression type
*/
[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name)
cudf::io::compression_type get_compression_type(std::string name)
{
using compression_type = cudf::io::compression_type;

static const std::unordered_map<std::string_view, cudf::io::compression_type> map = {
static std::unordered_map<std::string_view, compression_type> const map = {
{"NONE", compression_type::NONE},
{"AUTO", compression_type::AUTO},
{"SNAPPY", compression_type::SNAPPY},
Expand All @@ -97,30 +80,58 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_

std::transform(name.begin(), name.end(), name.begin(), ::toupper);
if (map.find(name) != map.end()) { return map.at(name); }
throw std::invalid_argument("FATAL: " + std::string(name) +
throw std::invalid_argument(name +
" is not a valid compression type.\n\n"
"Available compression_type types: NONE, AUTO, SNAPPY,\n"
"LZ4, ZSTD\n"
"\n"
"Exiting...\n");
"Available compression types: NONE, AUTO, SNAPPY,\n"
"LZ4, ZSTD\n\n");
}

/**
* @brief Get the optional page size stat frequency from they keyword
*
* @param use_stats keyword affirmation string such as: Y, T, YES, TRUE, ON
* @return optional page statistics frequency set to full (STATISTICS_COLUMN)
*/
[[nodiscard]] std::optional<cudf::io::statistics_freq> get_page_size_stats(std::string use_stats)
bool get_boolean(std::string input)
{
std::transform(use_stats.begin(), use_stats.end(), use_stats.begin(), ::toupper);
std::transform(input.begin(), input.end(), input.begin(), ::toupper);

// Check if the input string matches to any of the following
if (not use_stats.compare("ON") or not use_stats.compare("TRUE") or
not use_stats.compare("YES") or not use_stats.compare("Y") or not use_stats.compare("T")) {
// Full column and offset indices - STATISTICS_COLUMN
return std::make_optional(cudf::io::statistics_freq::STATISTICS_COLUMN);
return input == "ON" or input == "TRUE" or input == "YES" or input == "Y" or input == "T";
}

void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table)
{
try {
// Left anti-join the original and transcoded tables
// identical tables should not throw an exception and
// return an empty indices vector
auto const indices = cudf::left_anti_join(lhs_table, rhs_table, cudf::null_equality::EQUAL);

// No exception thrown, check indices
auto const valid = indices->size() == 0;
std::cout << "Tables identical: " << valid << "\n\n";
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
throw std::runtime_error("Tables identical: false\n\n");
}
}

return std::nullopt;
std::unique_ptr<cudf::table> concatenate_tables(std::vector<std::unique_ptr<cudf::table>> tables,
rmm::cuda_stream_view stream)
{
if (tables.size() == 1) { return std::move(tables[0]); }

std::vector<cudf::table_view> table_views;
table_views.reserve(tables.size());
std::transform(
tables.begin(), tables.end(), std::back_inserter(table_views), [&](auto const& tbl) {
return tbl->view();
});
// Construct the final table
return cudf::concatenate(table_views, stream);
}

std::string current_date_and_time()
{
auto const time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
auto const local_time = *std::localtime(&time);
// Stringstream to format the date and time
std::stringstream ss;
ss << std::put_time(&local_time, "%Y-%m-%d-%H-%M-%S");
return ss.str();
}
89 changes: 89 additions & 0 deletions cpp/examples/parquet_io/common_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

#include <memory>
#include <string>
mhaseeb123 marked this conversation as resolved.
Show resolved Hide resolved

/**
* @file common_utils.hpp
* @brief Common utilities for `parquet_io` examples
*
*/

/**
* @brief Create memory resource for libcudf functions
*
* @param pool Whether to use a pool memory resource.
* @return Memory resource instance
*/
std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_pool_used);

/**
* @brief Get encoding type from the keyword
*
* @param name encoding keyword name
* @return corresponding column encoding type
*/
[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name);

/**
* @brief Get compression type from the keyword
*
* @param name compression keyword name
* @return corresponding compression type
*/
[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name);

/**
* @brief Get boolean from they keyword
*
* @param input keyword affirmation string such as: Y, T, YES, TRUE, ON
* @return true or false
*/
[[nodiscard]] bool get_boolean(std::string input);

/**
* @brief Check if two tables are identical, throw an error otherwise
*
* @param lhs_table View to lhs table
* @param rhs_table View to rhs table
*/
void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table);

/**
* @brief Concatenate a vector of tables and return the resultant table
*
* @param tables Vector of tables to concatenate
* @param stream CUDA stream to use
*
* @return Unique pointer to the resultant concatenated table.
*/
std::unique_ptr<cudf::table> concatenate_tables(std::vector<std::unique_ptr<cudf::table>> tables,
rmm::cuda_stream_view stream);

/**
* @brief Returns a string containing current date and time
*
*/
std::string current_date_and_time();
97 changes: 97 additions & 0 deletions cpp/examples/parquet_io/io_source.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "io_source.hpp"

#include <cudf/io/types.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/pinned_host_memory_resource.hpp>

#include <thrust/host_vector.h>

#include <filesystem>
#include <fstream>
#include <string>

rmm::host_async_resource_ref pinned_memory_resource()
{
static auto mr = rmm::mr::pinned_host_memory_resource{};
return mr;
}

io_source_type get_io_source_type(std::string name)
{
static std::unordered_map<std::string_view, io_source_type> const map = {
{"FILEPATH", io_source_type::FILEPATH},
{"HOST_BUFFER", io_source_type::HOST_BUFFER},
{"PINNED_BUFFER", io_source_type::PINNED_BUFFER},
{"DEVICE_BUFFER", io_source_type::DEVICE_BUFFER}};

std::transform(name.begin(), name.end(), name.begin(), ::toupper);
if (map.find(name) != map.end()) {
return map.at(name);
} else {
throw std::invalid_argument(name +
" is not a valid io source type. Available: FILEPATH,\n"
"HOST_BUFFER, PINNED_BUFFER, DEVICE_BUFFER.\n\n");
}
}

io_source::io_source(std::string_view file_path, io_source_type type, rmm::cuda_stream_view stream)
: pinned_buffer({pinned_memory_resource(), stream}), d_buffer{0, stream}
{
std::string const file_name{file_path};
auto const file_size = std::filesystem::file_size(file_name);

// For filepath make a quick source_info and return early
if (type == io_source_type::FILEPATH) {
source_info = cudf::io::source_info(file_name);
return;
}

std::ifstream file{file_name, std::ifstream::binary};

// Copy file contents to the specified io source buffer
switch (type) {
case io_source_type::HOST_BUFFER: {
h_buffer.resize(file_size);
file.read(h_buffer.data(), file_size);
source_info = cudf::io::source_info(h_buffer.data(), file_size);
break;
}
case io_source_type::PINNED_BUFFER: {
pinned_buffer.resize(file_size);
file.read(pinned_buffer.data(), file_size);
source_info = cudf::io::source_info(pinned_buffer.data(), file_size);
break;
}
case io_source_type::DEVICE_BUFFER: {
h_buffer.resize(file_size);
file.read(h_buffer.data(), file_size);
d_buffer.resize(file_size, stream);
CUDF_CUDA_TRY(cudaMemcpyAsync(
d_buffer.data(), h_buffer.data(), file_size, cudaMemcpyDefault, stream.value()));

source_info = cudf::io::source_info(d_buffer);
break;
}
default: {
throw std::runtime_error("Encountered unexpected source type\n\n");
}
}
}
Loading
Loading