diff --git a/ci/run_cudf_examples.sh b/ci/run_cudf_examples.sh index 0819eacf636..2439af5b644 100755 --- a/ci/run_cudf_examples.sh +++ b/ci/run_cudf_examples.sh @@ -23,7 +23,10 @@ compute-sanitizer --tool memcheck custom_optimized names.csv compute-sanitizer --tool memcheck custom_prealloc names.csv compute-sanitizer --tool memcheck custom_with_malloc names.csv -compute-sanitizer --tool memcheck parquet_io +compute-sanitizer --tool memcheck parquet_io example.parquet compute-sanitizer --tool memcheck parquet_io example.parquet output.parquet DELTA_BINARY_PACKED ZSTD TRUE +compute-sanitizer --tool memcheck parquet_io_multithreaded example.parquet +compute-sanitizer --tool memcheck parquet_io_multithreaded example.parquet 4 DEVICE_BUFFER 2 2 + exit ${EXITCODE} diff --git a/cpp/examples/parquet_io/CMakeLists.txt b/cpp/examples/parquet_io/CMakeLists.txt index d8e9205ffd4..a7d0146b170 100644 --- a/cpp/examples/parquet_io/CMakeLists.txt +++ b/cpp/examples/parquet_io/CMakeLists.txt @@ -16,10 +16,23 @@ project( include(../fetch_dependencies.cmake) -# Configure your project here +add_library(parquet_io_utils OBJECT common_utils.cpp io_source.cpp) +target_compile_features(parquet_io_utils PRIVATE cxx_std_17) +target_link_libraries(parquet_io_utils PRIVATE cudf::cudf) + +# Build and install parquet_io add_executable(parquet_io parquet_io.cpp) -target_link_libraries(parquet_io PRIVATE cudf::cudf) +target_link_libraries(parquet_io PRIVATE cudf::cudf nvToolsExt $) target_compile_features(parquet_io PRIVATE cxx_std_17) - install(TARGETS parquet_io DESTINATION bin/examples/libcudf) + +# Build and install parquet_io_multithreaded +add_executable(parquet_io_multithreaded parquet_io_multithreaded.cpp) +target_link_libraries( + parquet_io_multithreaded PRIVATE cudf::cudf nvToolsExt $ +) +target_compile_features(parquet_io_multithreaded PRIVATE cxx_std_17) +install(TARGETS parquet_io_multithreaded DESTINATION bin/examples/libcudf) + +# Install the example.parquet file install(FILES 
${CMAKE_CURRENT_LIST_DIR}/example.parquet DESTINATION bin/examples/libcudf) diff --git a/cpp/examples/parquet_io/parquet_io.hpp b/cpp/examples/parquet_io/common_utils.cpp similarity index 50% rename from cpp/examples/parquet_io/parquet_io.hpp rename to cpp/examples/parquet_io/common_utils.cpp index e27cbec4fce..a79ca48af86 100644 --- a/cpp/examples/parquet_io/parquet_io.hpp +++ b/cpp/examples/parquet_io/common_utils.cpp @@ -14,30 +14,27 @@ * limitations under the License. */ -#pragma once +#include "common_utils.hpp" -#include +#include #include #include #include -#include -#include #include #include #include #include -#include -#include +#include #include /** - * @brief Create memory resource for libcudf functions + * @file common_utils.cpp + * @brief Definitions for common utilities for `parquet_io` examples * - * @param pool Whether to use a pool memory resource. - * @return Memory resource instance */ + std::shared_ptr create_memory_resource(bool is_pool_used) { auto cuda_mr = std::make_shared(); @@ -48,17 +45,11 @@ std::shared_ptr create_memory_resource(bool is_ return cuda_mr; } -/** - * @brief Get encoding type from the keyword - * - * @param name encoding keyword name - * @return corresponding column encoding type - */ -[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name) +cudf::io::column_encoding get_encoding_type(std::string name) { using encoding_type = cudf::io::column_encoding; - static const std::unordered_map map = { + static std::unordered_map const map = { {"DEFAULT", encoding_type::USE_DEFAULT}, {"DICTIONARY", encoding_type::DICTIONARY}, {"PLAIN", encoding_type::PLAIN}, @@ -69,26 +60,18 @@ std::shared_ptr create_memory_resource(bool is_ std::transform(name.begin(), name.end(), name.begin(), ::toupper); if (map.find(name) != map.end()) { return map.at(name); } - throw std::invalid_argument("FATAL: " + std::string(name) + + throw std::invalid_argument(name + " is not a valid encoding type.\n\n" "Available encoding types: 
DEFAULT, DICTIONARY, PLAIN,\n" "DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY,\n" - "DELTA_BYTE_ARRAY\n" - "\n" - "Exiting...\n"); + "DELTA_BYTE_ARRAY\n\n"); } -/** - * @brief Get compression type from the keyword - * - * @param name compression keyword name - * @return corresponding compression type - */ -[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name) +cudf::io::compression_type get_compression_type(std::string name) { using compression_type = cudf::io::compression_type; - static const std::unordered_map map = { + static std::unordered_map const map = { {"NONE", compression_type::NONE}, {"AUTO", compression_type::AUTO}, {"SNAPPY", compression_type::SNAPPY}, @@ -97,30 +80,58 @@ std::shared_ptr create_memory_resource(bool is_ std::transform(name.begin(), name.end(), name.begin(), ::toupper); if (map.find(name) != map.end()) { return map.at(name); } - throw std::invalid_argument("FATAL: " + std::string(name) + + throw std::invalid_argument(name + " is not a valid compression type.\n\n" - "Available compression_type types: NONE, AUTO, SNAPPY,\n" - "LZ4, ZSTD\n" - "\n" - "Exiting...\n"); + "Available compression types: NONE, AUTO, SNAPPY,\n" + "LZ4, ZSTD\n\n"); } -/** - * @brief Get the optional page size stat frequency from they keyword - * - * @param use_stats keyword affirmation string such as: Y, T, YES, TRUE, ON - * @return optional page statistics frequency set to full (STATISTICS_COLUMN) - */ -[[nodiscard]] std::optional get_page_size_stats(std::string use_stats) +bool get_boolean(std::string input) { - std::transform(use_stats.begin(), use_stats.end(), use_stats.begin(), ::toupper); + std::transform(input.begin(), input.end(), input.begin(), ::toupper); // Check if the input string matches to any of the following - if (not use_stats.compare("ON") or not use_stats.compare("TRUE") or - not use_stats.compare("YES") or not use_stats.compare("Y") or not use_stats.compare("T")) { - // Full column and offset indices - STATISTICS_COLUMN 
- return std::make_optional(cudf::io::statistics_freq::STATISTICS_COLUMN); + return input == "ON" or input == "TRUE" or input == "YES" or input == "Y" or input == "T"; +} + +void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table) +{ + try { + // Left anti-join the original and transcoded tables + // identical tables should not throw an exception and + // return an empty indices vector + auto const indices = cudf::left_anti_join(lhs_table, rhs_table, cudf::null_equality::EQUAL); + + // No exception thrown, check indices + auto const valid = indices->size() == 0; + std::cout << "Tables identical: " << valid << "\n\n"; + } catch (std::exception& e) { + std::cerr << e.what() << std::endl << std::endl; + throw std::runtime_error("Tables identical: false\n\n"); } +} - return std::nullopt; +std::unique_ptr concatenate_tables(std::vector> tables, + rmm::cuda_stream_view stream) +{ + if (tables.size() == 1) { return std::move(tables[0]); } + + std::vector table_views; + table_views.reserve(tables.size()); + std::transform( + tables.begin(), tables.end(), std::back_inserter(table_views), [&](auto const& tbl) { + return tbl->view(); + }); + // Construct the final table + return cudf::concatenate(table_views, stream); +} + +std::string current_date_and_time() +{ + auto const time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); + auto const local_time = *std::localtime(&time); + // Stringstream to format the date and time + std::stringstream ss; + ss << std::put_time(&local_time, "%Y-%m-%d-%H-%M-%S"); + return ss.str(); } diff --git a/cpp/examples/parquet_io/common_utils.hpp b/cpp/examples/parquet_io/common_utils.hpp new file mode 100644 index 00000000000..12896e61a0d --- /dev/null +++ b/cpp/examples/parquet_io/common_utils.hpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include + +#include +#include + +/** + * @file common_utils.hpp + * @brief Common utilities for `parquet_io` examples + * + */ + +/** + * @brief Create memory resource for libcudf functions + * + * @param is_pool_used Whether to use a pool memory resource. + * @return Memory resource instance + */ +std::shared_ptr create_memory_resource(bool is_pool_used); + +/** + * @brief Get encoding type from the keyword + * + * @param name encoding keyword name + * @return corresponding column encoding type + */ +[[nodiscard]] cudf::io::column_encoding get_encoding_type(std::string name); + +/** + * @brief Get compression type from the keyword + * + * @param name compression keyword name + * @return corresponding compression type + */ +[[nodiscard]] cudf::io::compression_type get_compression_type(std::string name); + +/** + * @brief Get boolean from the keyword + * + * @param input keyword affirmation string such as: Y, T, YES, TRUE, ON + * @return true or false + */ +[[nodiscard]] bool get_boolean(std::string input); + +/** + * @brief Check if two tables are identical, throw an error otherwise + * + * @param lhs_table View to lhs table + * @param rhs_table View to rhs table + */ +void check_tables_equal(cudf::table_view const& lhs_table, cudf::table_view const& rhs_table); + +/** + * @brief Concatenate a vector of tables and return the resultant table + * + * @param 
tables Vector of tables to concatenate + * @param stream CUDA stream to use + * + * @return Unique pointer to the resultant concatenated table. + */ +std::unique_ptr concatenate_tables(std::vector> tables, + rmm::cuda_stream_view stream); + +/** + * @brief Returns a string containing current date and time + * + */ +std::string current_date_and_time(); diff --git a/cpp/examples/parquet_io/io_source.cpp b/cpp/examples/parquet_io/io_source.cpp new file mode 100644 index 00000000000..019b3f96474 --- /dev/null +++ b/cpp/examples/parquet_io/io_source.cpp @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "io_source.hpp" + +#include +#include + +#include +#include + +#include + +#include +#include +#include + +rmm::host_async_resource_ref pinned_memory_resource() +{ + static auto mr = rmm::mr::pinned_host_memory_resource{}; + return mr; +} + +io_source_type get_io_source_type(std::string name) +{ + static std::unordered_map const map = { + {"FILEPATH", io_source_type::FILEPATH}, + {"HOST_BUFFER", io_source_type::HOST_BUFFER}, + {"PINNED_BUFFER", io_source_type::PINNED_BUFFER}, + {"DEVICE_BUFFER", io_source_type::DEVICE_BUFFER}}; + + std::transform(name.begin(), name.end(), name.begin(), ::toupper); + if (map.find(name) != map.end()) { + return map.at(name); + } else { + throw std::invalid_argument(name + + " is not a valid io source type. 
Available: FILEPATH,\n" + "HOST_BUFFER, PINNED_BUFFER, DEVICE_BUFFER.\n\n"); + } +} + +io_source::io_source(std::string_view file_path, io_source_type type, rmm::cuda_stream_view stream) + : pinned_buffer({pinned_memory_resource(), stream}), d_buffer{0, stream} +{ + std::string const file_name{file_path}; + auto const file_size = std::filesystem::file_size(file_name); + + // For filepath make a quick source_info and return early + if (type == io_source_type::FILEPATH) { + source_info = cudf::io::source_info(file_name); + return; + } + + std::ifstream file{file_name, std::ifstream::binary}; + + // Copy file contents to the specified io source buffer + switch (type) { + case io_source_type::HOST_BUFFER: { + h_buffer.resize(file_size); + file.read(h_buffer.data(), file_size); + source_info = cudf::io::source_info(h_buffer.data(), file_size); + break; + } + case io_source_type::PINNED_BUFFER: { + pinned_buffer.resize(file_size); + file.read(pinned_buffer.data(), file_size); + source_info = cudf::io::source_info(pinned_buffer.data(), file_size); + break; + } + case io_source_type::DEVICE_BUFFER: { + h_buffer.resize(file_size); + file.read(h_buffer.data(), file_size); + d_buffer.resize(file_size, stream); + CUDF_CUDA_TRY(cudaMemcpyAsync( + d_buffer.data(), h_buffer.data(), file_size, cudaMemcpyDefault, stream.value())); + + source_info = cudf::io::source_info(d_buffer); + break; + } + default: { + throw std::runtime_error("Encountered unexpected source type\n\n"); + } + } +} diff --git a/cpp/examples/parquet_io/io_source.hpp b/cpp/examples/parquet_io/io_source.hpp new file mode 100644 index 00000000000..a614d348fae --- /dev/null +++ b/cpp/examples/parquet_io/io_source.hpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include + +#include + +#include + +/** + * @file io_source.hpp + * @brief Utilities for constructing the specified IO sources from the input parquet files. + * + */ + +/** + * @brief Available IO source types + */ +enum class io_source_type { FILEPATH, HOST_BUFFER, PINNED_BUFFER, DEVICE_BUFFER }; + +/** + * @brief Get io source type from the string keyword argument + * + * @param name io source type keyword name + * @return io source type + */ +[[nodiscard]] io_source_type get_io_source_type(std::string name); + +/** + * @brief Create and return a reference to a static pinned memory pool + * + * @return Reference to a static pinned memory pool + */ +rmm::host_async_resource_ref pinned_memory_resource(); + +/** + * @brief Custom allocator for pinned_buffer via RMM. 
+ */ +template +struct pinned_allocator : public std::allocator { + pinned_allocator(rmm::host_async_resource_ref _mr, rmm::cuda_stream_view _stream) + : mr{_mr}, stream{_stream} + { + } + + T* allocate(std::size_t n) + { + auto ptr = mr.allocate_async(n * sizeof(T), rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + stream.synchronize(); + return static_cast(ptr); + } + + void deallocate(T* ptr, std::size_t n) + { + mr.deallocate_async(ptr, n * sizeof(T), rmm::RMM_DEFAULT_HOST_ALIGNMENT, stream); + } + + private: + rmm::host_async_resource_ref mr; + rmm::cuda_stream_view stream; +}; + +/** + * @brief Class to create a cudf::io::source_info of given type from the input parquet file + * + */ +class io_source { + public: + io_source(std::string_view file_path, io_source_type io_type, rmm::cuda_stream_view stream); + + // Get the internal source info + [[nodiscard]] cudf::io::source_info get_source_info() const { return source_info; } + + private: + // alias for pinned vector + template + using pinned_vector = thrust::host_vector>; + cudf::io::source_info source_info; + std::vector h_buffer; + pinned_vector pinned_buffer; + rmm::device_uvector d_buffer; +}; diff --git a/cpp/examples/parquet_io/parquet_io.cpp b/cpp/examples/parquet_io/parquet_io.cpp index 9cda22d0695..c11b8de82b5 100644 --- a/cpp/examples/parquet_io/parquet_io.cpp +++ b/cpp/examples/parquet_io/parquet_io.cpp @@ -14,11 +14,15 @@ * limitations under the License. */ -#include "parquet_io.hpp" - #include "../utilities/timer.hpp" +#include "common_utils.hpp" +#include "io_source.hpp" + +#include +#include +#include -#include +#include /** * @file parquet_io.cpp @@ -81,6 +85,18 @@ void write_parquet(cudf::table_view input, cudf::io::write_parquet(options); } +/** + * @brief Function to print example usage and argument information. 
+ */ +void print_usage() +{ + std::cout << "\nUsage: parquet_io \n" + " \n\n" + "Available encoding types: DEFAULT, DICTIONARY, PLAIN, DELTA_BINARY_PACKED,\n" + " DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY\n\n" + "Available compression types: NONE, AUTO, SNAPPY, LZ4, ZSTD\n\n"; +} + /** * @brief Main for nested_types examples * @@ -97,29 +113,28 @@ void write_parquet(cudf::table_view input, */ int main(int argc, char const** argv) { - std::string input_filepath; - std::string output_filepath; - cudf::io::column_encoding encoding; - cudf::io::compression_type compression; - std::optional page_stats; + std::string input_filepath = "example.parquet"; + std::string output_filepath = "output.parquet"; + cudf::io::column_encoding encoding = get_encoding_type("DELTA_BINARY_PACKED"); + cudf::io::compression_type compression = get_compression_type("ZSTD"); + std::optional page_stats = std::nullopt; switch (argc) { - case 1: - input_filepath = "example.parquet"; - output_filepath = "output.parquet"; - encoding = get_encoding_type("DELTA_BINARY_PACKED"); - compression = get_compression_type("ZSTD"); - break; - case 6: page_stats = get_page_size_stats(argv[5]); [[fallthrough]]; - case 5: - input_filepath = argv[1]; - output_filepath = argv[2]; - encoding = get_encoding_type(argv[3]); - compression = get_compression_type(argv[4]); - break; - default: - throw std::runtime_error( - "Either provide all command-line arguments, or none to use defaults\n"); + case 6: + page_stats = get_boolean(argv[5]) + ? 
std::make_optional(cudf::io::statistics_freq::STATISTICS_COLUMN) + : std::nullopt; + [[fallthrough]]; + case 5: compression = get_compression_type(argv[4]); [[fallthrough]]; + case 4: encoding = get_encoding_type(argv[3]); [[fallthrough]]; + case 3: output_filepath = argv[2]; [[fallthrough]]; + case 2: // Check if instead of input_paths, the first argument is `-h` or `--help` + if (auto arg = std::string{argv[1]}; arg != "-h" and arg != "--help") { + input_filepath = std::move(arg); + break; + } + [[fallthrough]]; + default: print_usage(); throw std::runtime_error(""); } // Create and use a memory pool @@ -130,18 +145,16 @@ int main(int argc, char const** argv) // Read input parquet file // We do not want to time the initial read time as it may include // time for nvcomp, cufile loading and RMM growth - std::cout << std::endl << "Reading " << input_filepath << "..." << std::endl; + std::cout << "\nReading " << input_filepath << "...\n"; std::cout << "Note: Not timing the initial parquet read as it may include\n" - "times for nvcomp, cufile loading and RMM growth." - << std::endl - << std::endl; + "times for nvcomp, cufile loading and RMM growth.\n\n"; auto [input, metadata] = read_parquet(input_filepath); // Status string to indicate if page stats are set to be written or not auto page_stat_string = (page_stats.has_value()) ? "page stats" : "no page stats"; // Write parquet file with the specified encoding and compression std::cout << "Writing " << output_filepath << " with encoding, compression and " - << page_stat_string << ".." << std::endl; + << page_stat_string << "..\n"; // `timer` is automatically started here cudf::examples::timer timer; @@ -149,7 +162,7 @@ int main(int argc, char const** argv) timer.print_elapsed_millis(); // Read the parquet file written with encoding and compression - std::cout << "Reading " << output_filepath << "..." 
<< std::endl; + std::cout << "Reading " << output_filepath << "...\n"; // Reset the timer timer.reset(); @@ -157,23 +170,7 @@ int main(int argc, char const** argv) timer.print_elapsed_millis(); // Check for validity - try { - // Left anti-join the original and transcoded tables - // identical tables should not throw an exception and - // return an empty indices vector - auto const indices = cudf::left_anti_join(input->view(), - transcoded_input->view(), - cudf::null_equality::EQUAL, - cudf::get_default_stream(), - resource.get()); - - // No exception thrown, check indices - auto const valid = indices->size() == 0; - std::cout << "Transcoding valid: " << std::boolalpha << valid << std::endl; - } catch (std::exception& e) { - std::cerr << e.what() << std::endl << std::endl; - std::cout << "Transcoding valid: false" << std::endl; - } + check_tables_equal(input->view(), transcoded_input->view()); return 0; } diff --git a/cpp/examples/parquet_io/parquet_io_multithreaded.cpp b/cpp/examples/parquet_io/parquet_io_multithreaded.cpp new file mode 100644 index 00000000000..6ad4b862240 --- /dev/null +++ b/cpp/examples/parquet_io/parquet_io_multithreaded.cpp @@ -0,0 +1,466 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../utilities/timer.hpp" +#include "common_utils.hpp" +#include "io_source.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + +/** + * @file parquet_io_multithreaded.cpp + * @brief Demonstrates reading parquet data from the specified io source using multiple threads. + * + * The input parquet data is provided via files which are converted to the specified io source type + * to be read using multiple threads. Optionally, the parquet data read by each thread can be + * written to corresponding files and checked for validity of the output files against the input + * data. + * + * Run: ``parquet_io_multithreaded -h`` to see help with input args and more information. + * + * The following io source types are supported: + * IO source types: FILEPATH, HOST_BUFFER, PINNED_BUFFER, DEVICE_BUFFER + * + */ + +// Type alias for unique ptr to cudf table +using table_t = std::unique_ptr; + +/** + * @brief Behavior when handling the read tables by multiple threads + */ +enum class read_mode { + NO_CONCATENATE, ///< Only read and discard tables + CONCATENATE_THREAD, ///< Read and concatenate tables from each thread + CONCATENATE_ALL, ///< Read and concatenate everything to a single table +}; + +/** + * @brief Functor for multithreaded parquet reading based on the provided read_mode + */ +template +struct read_fn { + std::vector const& input_sources; + std::vector& tables; + int const thread_id; + int const thread_count; + rmm::cuda_stream_view stream; + + void operator()() + { + // Tables read by this thread + std::vector tables_this_thread; + + // Sweep the available input files + for (auto curr_file_idx = thread_id; curr_file_idx < input_sources.size(); + curr_file_idx += thread_count) { + auto builder = + cudf::io::parquet_reader_options::builder(input_sources[curr_file_idx].get_source_info()); + auto const options = builder.build(); + if constexpr (read_mode != read_mode::NO_CONCATENATE) 
{ + tables_this_thread.push_back(cudf::io::read_parquet(options, stream).tbl); + } else { + cudf::io::read_parquet(options, stream); + } + } + + // Concatenate the tables read by this thread if not NO_CONCATENATE read_mode. + if constexpr (read_mode != read_mode::NO_CONCATENATE) { + auto table = concatenate_tables(std::move(tables_this_thread), stream); + stream.synchronize_no_throw(); + tables[thread_id] = std::move(table); + } else { + // Just synchronize this stream and exit + stream.synchronize_no_throw(); + } + } +}; + +/** + * @brief Function to setup and launch multithreaded parquet reading. + * + * @tparam read_mode Specifies if to concatenate and return the actual + * tables or discard them and return an empty vector + * + * @param input_sources List of input sources to read + * @param thread_count Number of threads + * @param stream_pool CUDA stream pool to use for threads + * + * @return Vector of read tables. + */ +template +std::vector read_parquet_multithreaded(std::vector const& input_sources, + int32_t thread_count, + rmm::cuda_stream_pool& stream_pool) +{ + // Tables read by each thread + std::vector tables(thread_count); + + // Table reading tasks + std::vector> read_tasks; + read_tasks.reserve(thread_count); + + // Create the read tasks + std::for_each( + thrust::make_counting_iterator(0), thrust::make_counting_iterator(thread_count), [&](auto tid) { + read_tasks.emplace_back( + read_fn{input_sources, tables, tid, thread_count, stream_pool.get_stream()}); + }); + + // Create threads with tasks + std::vector threads; + threads.reserve(thread_count); + for (auto& c : read_tasks) { + threads.emplace_back(c); + } + for (auto& t : threads) { + t.join(); + } + + // If CONCATENATE_ALL mode, then concatenate to a vector of one final table. 
+ if (read_mode == read_mode::CONCATENATE_ALL) { + auto stream = stream_pool.get_stream(); + auto final_tbl = concatenate_tables(std::move(tables), stream); + stream.synchronize(); + tables.clear(); + tables.emplace_back(std::move(final_tbl)); + } + + return tables; +} + +/** + * @brief Functor for multithreaded parquet writing + */ +struct write_fn { + std::string const& output_path; + std::vector const& table_views; + int const thread_id; + rmm::cuda_stream_view stream; + + void operator()() + { + // Create a sink + cudf::io::sink_info const sink_info{output_path + "/table_" + std::to_string(thread_id) + + ".parquet"}; + // Writer options builder + auto builder = cudf::io::parquet_writer_options::builder(sink_info, table_views[thread_id]); + // Create a new metadata for the table + auto table_metadata = cudf::io::table_input_metadata{table_views[thread_id]}; + + builder.metadata(table_metadata); + auto options = builder.build(); + + // Write parquet data + cudf::io::write_parquet(options, stream); + + // Done with this stream + stream.synchronize_no_throw(); + } +}; + +/** + * @brief Function to setup and launch multithreaded writing parquet files. + * + * @param output_path Path to output directory + * @param tables List of at least `thread_count` table views to be written + * @param thread_count Number of threads to use for writing tables. 
+ * @param stream_pool CUDA stream pool to use for threads + * + */ +void write_parquet_multithreaded(std::string const& output_path, + std::vector const& tables, + int32_t thread_count, + rmm::cuda_stream_pool& stream_pool) +{ + // Table writing tasks + std::vector write_tasks; + write_tasks.reserve(thread_count); + std::for_each( + thrust::make_counting_iterator(0), thrust::make_counting_iterator(thread_count), [&](auto tid) { + write_tasks.emplace_back(write_fn{output_path, tables, tid, stream_pool.get_stream()}); + }); + + // Writer threads + std::vector threads; + threads.reserve(thread_count); + for (auto& c : write_tasks) { + threads.emplace_back(c); + } + for (auto& t : threads) { + t.join(); + } +} + +/** + * @brief Function to print example usage and argument information. + */ +void print_usage() +{ + std::cout + << "\nUsage: parquet_io_multithreaded \n" + " \n" + " \n\n" + "Available IO source types: FILEPATH, HOST_BUFFER, PINNED_BUFFER (Default), " + "DEVICE_BUFFER\n\n" + "Note: Provide as many arguments as you like in the above order. Default values\n" + " for the unprovided arguments will be used. All input parquet files will\n" + " be converted to the specified IO source type before reading\n\n"; +} + +/** + * @brief Function to process comma delimited input paths string to parquet files and/or dirs + * and convert them to specified io sources. + * + * Process the input path string containing directories (of parquet files) and/or individual + * parquet files into a list of input parquet files, multiply the list by `input_multiplier`, + * make sure to have at least `thread_count` files to satisfy at least one file per parallel thread, + * and convert the final list of files to a list of `io_source` and return. 
+ * + * @param paths Comma delimited input paths string + * @param input_multiplier Multiplier for the input files list + * @param thread_count Number of threads being used in the example + * @param io_source_type Specified IO source type to convert input files to + * @param stream CUDA stream to use + * + * @return Vector of input sources for the given paths + */ +std::vector extract_input_sources(std::string const& paths, + int32_t input_multiplier, + int32_t thread_count, + io_source_type io_source_type, + rmm::cuda_stream_view stream) +{ + // Get the delimited paths to directory and/or files. + std::vector const delimited_paths = [&]() { + std::vector paths_list; + std::stringstream strstream{paths}; + std::string path; + // Extract the delimited paths. + while (std::getline(strstream, path, char{','})) { + paths_list.push_back(path); + } + return paths_list; + }(); + + // List of parquet files + std::vector parquet_files; + std::for_each(delimited_paths.cbegin(), delimited_paths.cend(), [&](auto const& path_string) { + std::filesystem::path path{path_string}; + // If this is a parquet file, add it. + if (std::filesystem::is_regular_file(path)) { + parquet_files.push_back(path_string); + } + // If this is a directory, add all files in the directory. 
+ else if (std::filesystem::is_directory(path)) { + for (auto const& file : std::filesystem::directory_iterator(path)) { + if (std::filesystem::is_regular_file(file.path())) { + parquet_files.push_back(file.path().string()); + } else { + std::cout << "Skipping sub-directory: " << file.path().string() << "\n"; + } + } + } else { + print_usage(); + throw std::runtime_error("Encountered an invalid input path\n"); + } + }); + + // Current size of list of parquet files + auto const initial_size = parquet_files.size(); + if (initial_size == 0) { return {}; } + + // Reserve space + parquet_files.reserve(std::max(thread_count, input_multiplier * parquet_files.size())); + + // Append the input files by input_multiplier times + std::for_each(thrust::make_counting_iterator(1), + thrust::make_counting_iterator(input_multiplier), + [&](auto i) { + parquet_files.insert(parquet_files.end(), + parquet_files.begin(), + parquet_files.begin() + initial_size); + }); + + // Cycle append parquet files from the existing ones if less than the thread_count + std::cout << "Warning: Number of input sources < thread count. 
Cycling from\n" + "and appending to current input sources such that the number of\n" + "input source == thread count\n"; + for (size_t idx = 0; thread_count > static_cast(parquet_files.size()); idx++) { + parquet_files.emplace_back(parquet_files[idx % initial_size]); + } + + // Vector of io sources + std::vector input_sources; + input_sources.reserve(parquet_files.size()); + // Transform input files to the specified io sources + std::transform(parquet_files.begin(), + parquet_files.end(), + std::back_inserter(input_sources), + [&](auto const& file_name) { + return io_source{file_name, io_source_type, stream}; + }); + stream.synchronize(); + return input_sources; +} + +/** + * @brief The main function + */ +int32_t main(int argc, char const** argv) +{ + // Set arguments to defaults + std::string input_paths = "example.parquet"; + int32_t input_multiplier = 1; + int32_t num_reads = 1; + int32_t thread_count = 1; + io_source_type io_source_type = io_source_type::PINNED_BUFFER; + bool write_and_validate = false; + + // Set to the provided args + switch (argc) { + case 7: write_and_validate = get_boolean(argv[6]); [[fallthrough]]; + case 6: thread_count = std::max(thread_count, std::stoi(std::string{argv[5]})); [[fallthrough]]; + case 5: num_reads = std::max(1, std::stoi(argv[4])); [[fallthrough]]; + case 4: io_source_type = get_io_source_type(argv[3]); [[fallthrough]]; + case 3: + input_multiplier = std::max(input_multiplier, std::stoi(std::string{argv[2]})); + [[fallthrough]]; + case 2: + // Check if instead of input_paths, the first argument is `-h` or `--help` + if (auto arg = std::string{argv[1]}; arg != "-h" and arg != "--help") { + input_paths = std::move(arg); + break; + } + [[fallthrough]]; + default: print_usage(); throw std::runtime_error(""); + } + + // Initialize mr, default stream and stream pool + auto const is_pool_used = true; + auto resource = create_memory_resource(is_pool_used); + auto default_stream = cudf::get_default_stream(); + auto stream_pool = 
rmm::cuda_stream_pool(thread_count); + auto stats_mr = + rmm::mr::statistics_resource_adaptor(resource.get()); + rmm::mr::set_current_device_resource(&stats_mr); + + // List of input sources from the input_paths string. + auto const input_sources = extract_input_sources( + input_paths, input_multiplier, thread_count, io_source_type, default_stream); + + // Check if there is nothing to do + if (input_sources.empty()) { + print_usage(); + throw std::runtime_error("No input files to read. Exiting early.\n"); + } + + // Read the same parquet files specified times with multiple threads and discard the read tables + { + // Print status + std::cout << "\nReading " << input_sources.size() << " input sources " << num_reads + << " time(s) using " << thread_count + << " threads and discarding output " + "tables..\n"; + + if (io_source_type == io_source_type::FILEPATH) { + std::cout << "Note that the first read may include times for nvcomp, cufile loading and RMM " + "growth.\n\n"; + } + + cudf::examples::timer timer; + std::for_each(thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_reads), + [&](auto i) { // Read parquet files and discard the tables + std::ignore = read_parquet_multithreaded( + input_sources, thread_count, stream_pool); + }); + default_stream.synchronize(); + timer.print_elapsed_millis(); + } + + // Write parquet files and validate if needed + if (write_and_validate) { + // read_mode::CONCATENATE_THREADS returns a vector of `thread_count` tables + auto const tables = read_parquet_multithreaded( + input_sources, thread_count, stream_pool); + default_stream.synchronize(); + + // Construct a vector of table views for write_parquet_multithreaded + auto const table_views = [&tables]() { + std::vector table_views; + table_views.reserve(tables.size()); + std::transform( + tables.cbegin(), tables.cend(), std::back_inserter(table_views), [](auto const& tbl) { + return tbl->view(); + }); + return table_views; + }(); + + // Write tables to parquet 
+ std::cout << "Writing parquet output files..\n"; + + // Create a directory at the tmpdir path. + std::string output_path = + std::filesystem::temp_directory_path().string() + "/output_" + current_date_and_time(); + std::filesystem::create_directory({output_path}); + cudf::examples::timer timer; + write_parquet_multithreaded(output_path, table_views, thread_count, stream_pool); + default_stream.synchronize(); + timer.print_elapsed_millis(); + + // Verify the output + std::cout << "Verifying output..\n"; + + // Simply concatenate the previously read tables from input sources + auto const input_table = cudf::concatenate(table_views, default_stream); + + // Sources from written parquet files + auto const written_pq_sources = extract_input_sources( + output_path, input_multiplier, thread_count, io_source_type, default_stream); + + // read_mode::CONCATENATE_ALL returns a concatenated vector of 1 table only + auto const transcoded_table = std::move(read_parquet_multithreaded( + written_pq_sources, thread_count, stream_pool) + .back()); + default_stream.synchronize(); + + // Check if the tables are identical + check_tables_equal(input_table->view(), transcoded_table->view()); + + // Remove the created temp directory and parquet data + std::filesystem::remove_all(output_path); + } + + // Print peak memory + std::cout << "Peak memory: " << (stats_mr.get_bytes_counter().peak / 1048576.0) << " MB\n\n"; + + return 0; +}