Skip to content

Commit

Permalink
Add an example to demonstrate multithreaded read_parquet pipelines (#…
Browse files Browse the repository at this point in the history
…16828)

Closes #16717. This PR adds a new example to read multiple parquet files using multiple threads.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)
  - David Wendt (https://github.com/davidwendt)
  - Basit Ayantunde (https://github.com/lamarrr)

URL: #16828
  • Loading branch information
mhaseeb123 authored Oct 11, 2024
1 parent c8a56a5 commit be1dd32
Show file tree
Hide file tree
Showing 8 changed files with 875 additions and 98 deletions.
5 changes: 4 additions & 1 deletion ci/run_cudf_examples.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ compute-sanitizer --tool memcheck custom_optimized names.csv
compute-sanitizer --tool memcheck custom_prealloc names.csv
compute-sanitizer --tool memcheck custom_with_malloc names.csv

compute-sanitizer --tool memcheck parquet_io
compute-sanitizer --tool memcheck parquet_io example.parquet
compute-sanitizer --tool memcheck parquet_io example.parquet output.parquet DELTA_BINARY_PACKED ZSTD TRUE

compute-sanitizer --tool memcheck parquet_io_multithreaded example.parquet
compute-sanitizer --tool memcheck parquet_io_multithreaded example.parquet 4 DEVICE_BUFFER 2 2

exit ${EXITCODE}
19 changes: 16 additions & 3 deletions cpp/examples/parquet_io/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,23 @@ project(

include(../fetch_dependencies.cmake)

# Configure your project here
add_library(parquet_io_utils OBJECT common_utils.cpp io_source.cpp)
target_compile_features(parquet_io_utils PRIVATE cxx_std_17)
target_link_libraries(parquet_io_utils PRIVATE cudf::cudf)

# Build and install parquet_io
add_executable(parquet_io parquet_io.cpp)
target_link_libraries(parquet_io PRIVATE cudf::cudf)
target_link_libraries(parquet_io PRIVATE cudf::cudf nvToolsExt $<TARGET_OBJECTS:parquet_io_utils>)
target_compile_features(parquet_io PRIVATE cxx_std_17)

install(TARGETS parquet_io DESTINATION bin/examples/libcudf)

# Build and install parquet_io_multithreaded
add_executable(parquet_io_multithreaded parquet_io_multithreaded.cpp)
target_link_libraries(
parquet_io_multithreaded PRIVATE cudf::cudf nvToolsExt $<TARGET_OBJECTS:parquet_io_utils>
)
target_compile_features(parquet_io_multithreaded PRIVATE cxx_std_17)
install(TARGETS parquet_io_multithreaded DESTINATION bin/examples/libcudf)

# Install the example.parquet file
install(FILES ${CMAKE_CURRENT_LIST_DIR}/example.parquet DESTINATION bin/examples/libcudf)
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,27 @@
* limitations under the License.
*/

#pragma once
#include "common_utils.hpp"

#include <cudf/io/parquet.hpp>
#include <cudf/concatenate.hpp>
#include <cudf/io/types.hpp>
#include <cudf/join.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_device.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <rmm/mr/device/owning_wrapper.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>

#include <chrono>
#include <iostream>
#include <optional>
#include <iomanip>
#include <string>

/**
* @brief Create memory resource for libcudf functions
* @file common_utils.cpp
* @brief Definitions for common utilities for `parquet_io` examples
*
* @param pool Whether to use a pool memory resource.
* @return Memory resource instance
*/

std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_pool_used)
{
auto cuda_mr = std::make_shared<rmm::mr::cuda_memory_resource>();
Expand All @@ -48,17 +45,11 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_
return cuda_mr;
}

/**
* @brief Get encoding type from the keyword
*
* @param name encoding keyword name
* @return corresponding column encoding type
*/
[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name)
cudf::io::column_encoding get_encoding_type(std::string name)
{
using encoding_type = cudf::io::column_encoding;

static const std::unordered_map<std::string_view, cudf::io::column_encoding> map = {
static std::unordered_map<std::string_view, encoding_type> const map = {
{"DEFAULT", encoding_type::USE_DEFAULT},
{"DICTIONARY", encoding_type::DICTIONARY},
{"PLAIN", encoding_type::PLAIN},
Expand All @@ -69,26 +60,18 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_

std::transform(name.begin(), name.end(), name.begin(), ::toupper);
if (map.find(name) != map.end()) { return map.at(name); }
throw std::invalid_argument("FATAL: " + std::string(name) +
throw std::invalid_argument(name +
" is not a valid encoding type.\n\n"
"Available encoding types: DEFAULT, DICTIONARY, PLAIN,\n"
"DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY,\n"
"DELTA_BYTE_ARRAY\n"
"\n"
"Exiting...\n");
"DELTA_BYTE_ARRAY\n\n");
}

/**
* @brief Get compression type from the keyword
*
* @param name compression keyword name
* @return corresponding compression type
*/
[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name)
cudf::io::compression_type get_compression_type(std::string name)
{
using compression_type = cudf::io::compression_type;

static const std::unordered_map<std::string_view, cudf::io::compression_type> map = {
static std::unordered_map<std::string_view, compression_type> const map = {
{"NONE", compression_type::NONE},
{"AUTO", compression_type::AUTO},
{"SNAPPY", compression_type::SNAPPY},
Expand All @@ -97,30 +80,58 @@ std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_

std::transform(name.begin(), name.end(), name.begin(), ::toupper);
if (map.find(name) != map.end()) { return map.at(name); }
throw std::invalid_argument("FATAL: " + std::string(name) +
throw std::invalid_argument(name +
" is not a valid compression type.\n\n"
"Available compression_type types: NONE, AUTO, SNAPPY,\n"
"LZ4, ZSTD\n"
"\n"
"Exiting...\n");
"Available compression types: NONE, AUTO, SNAPPY,\n"
"LZ4, ZSTD\n\n");
}

/**
* @brief Get the optional page size stat frequency from they keyword
*
* @param use_stats keyword affirmation string such as: Y, T, YES, TRUE, ON
* @return optional page statistics frequency set to full (STATISTICS_COLUMN)
*/
[[nodiscard]] std::optional<cudf::io::statistics_freq> get_page_size_stats(std::string use_stats)
bool get_boolean(std::string input)
{
std::transform(use_stats.begin(), use_stats.end(), use_stats.begin(), ::toupper);
std::transform(input.begin(), input.end(), input.begin(), ::toupper);

// Check if the input string matches to any of the following
if (not use_stats.compare("ON") or not use_stats.compare("TRUE") or
not use_stats.compare("YES") or not use_stats.compare("Y") or not use_stats.compare("T")) {
// Full column and offset indices - STATISTICS_COLUMN
return std::make_optional(cudf::io::statistics_freq::STATISTICS_COLUMN);
return input == "ON" or input == "TRUE" or input == "YES" or input == "Y" or input == "T";
}

void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table)
{
try {
// Left anti-join the original and transcoded tables
// identical tables should not throw an exception and
// return an empty indices vector
auto const indices = cudf::left_anti_join(lhs_table, rhs_table, cudf::null_equality::EQUAL);

// No exception thrown, check indices
auto const valid = indices->size() == 0;
std::cout << "Tables identical: " << valid << "\n\n";
} catch (std::exception& e) {
std::cerr << e.what() << std::endl << std::endl;
throw std::runtime_error("Tables identical: false\n\n");
}
}

return std::nullopt;
std::unique_ptr<cudf::table> concatenate_tables(std::vector<std::unique_ptr<cudf::table>> tables,
rmm::cuda_stream_view stream)
{
if (tables.size() == 1) { return std::move(tables[0]); }

std::vector<cudf::table_view> table_views;
table_views.reserve(tables.size());
std::transform(
tables.begin(), tables.end(), std::back_inserter(table_views), [&](auto const& tbl) {
return tbl->view();
});
// Construct the final table
return cudf::concatenate(table_views, stream);
}

std::string current_date_and_time()
{
auto const time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
auto const local_time = *std::localtime(&time);
// Stringstream to format the date and time
std::stringstream ss;
ss << std::put_time(&local_time, "%Y-%m-%d-%H-%M-%S");
return ss.str();
}
89 changes: 89 additions & 0 deletions cpp/examples/parquet_io/common_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cudf/io/types.hpp>
#include <cudf/table/table_view.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>

#include <memory>
#include <string>

/**
* @file common_utils.hpp
* @brief Common utilities for `parquet_io` examples
*
*/

/**
* @brief Create memory resource for libcudf functions
*
* @param pool Whether to use a pool memory resource.
* @return Memory resource instance
*/
std::shared_ptr<rmm::mr::device_memory_resource> create_memory_resource(bool is_pool_used);

/**
* @brief Get encoding type from the keyword
*
* @param name encoding keyword name
* @return corresponding column encoding type
*/
[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name);

/**
* @brief Get compression type from the keyword
*
* @param name compression keyword name
* @return corresponding compression type
*/
[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name);

/**
* @brief Get boolean from they keyword
*
* @param input keyword affirmation string such as: Y, T, YES, TRUE, ON
* @return true or false
*/
[[nodiscard]] bool get_boolean(std::string input);

/**
* @brief Check if two tables are identical, throw an error otherwise
*
* @param lhs_table View to lhs table
* @param rhs_table View to rhs table
*/
void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table);

/**
* @brief Concatenate a vector of tables and return the resultant table
*
* @param tables Vector of tables to concatenate
* @param stream CUDA stream to use
*
* @return Unique pointer to the resultant concatenated table.
*/
std::unique_ptr<cudf::table> concatenate_tables(std::vector<std::unique_ptr<cudf::table>> tables,
rmm::cuda_stream_view stream);

/**
* @brief Returns a string containing current date and time
*
*/
std::string current_date_and_time();
97 changes: 97 additions & 0 deletions cpp/examples/parquet_io/io_source.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "io_source.hpp"

#include <cudf/io/types.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/pinned_host_memory_resource.hpp>

#include <thrust/host_vector.h>

#include <filesystem>
#include <fstream>
#include <string>

rmm::host_async_resource_ref pinned_memory_resource()
{
static auto mr = rmm::mr::pinned_host_memory_resource{};
return mr;
}

io_source_type get_io_source_type(std::string name)
{
static std::unordered_map<std::string_view, io_source_type> const map = {
{"FILEPATH", io_source_type::FILEPATH},
{"HOST_BUFFER", io_source_type::HOST_BUFFER},
{"PINNED_BUFFER", io_source_type::PINNED_BUFFER},
{"DEVICE_BUFFER", io_source_type::DEVICE_BUFFER}};

std::transform(name.begin(), name.end(), name.begin(), ::toupper);
if (map.find(name) != map.end()) {
return map.at(name);
} else {
throw std::invalid_argument(name +
" is not a valid io source type. Available: FILEPATH,\n"
"HOST_BUFFER, PINNED_BUFFER, DEVICE_BUFFER.\n\n");
}
}

io_source::io_source(std::string_view file_path, io_source_type type, rmm::cuda_stream_view stream)
: pinned_buffer({pinned_memory_resource(), stream}), d_buffer{0, stream}
{
std::string const file_name{file_path};
auto const file_size = std::filesystem::file_size(file_name);

// For filepath make a quick source_info and return early
if (type == io_source_type::FILEPATH) {
source_info = cudf::io::source_info(file_name);
return;
}

std::ifstream file{file_name, std::ifstream::binary};

// Copy file contents to the specified io source buffer
switch (type) {
case io_source_type::HOST_BUFFER: {
h_buffer.resize(file_size);
file.read(h_buffer.data(), file_size);
source_info = cudf::io::source_info(h_buffer.data(), file_size);
break;
}
case io_source_type::PINNED_BUFFER: {
pinned_buffer.resize(file_size);
file.read(pinned_buffer.data(), file_size);
source_info = cudf::io::source_info(pinned_buffer.data(), file_size);
break;
}
case io_source_type::DEVICE_BUFFER: {
h_buffer.resize(file_size);
file.read(h_buffer.data(), file_size);
d_buffer.resize(file_size, stream);
CUDF_CUDA_TRY(cudaMemcpyAsync(
d_buffer.data(), h_buffer.data(), file_size, cudaMemcpyDefault, stream.value()));

source_info = cudf::io::source_info(d_buffer);
break;
}
default: {
throw std::runtime_error("Encountered unexpected source type\n\n");
}
}
}
Loading

0 comments on commit be1dd32

Please sign in to comment.