From 35dd6de356162a200c008aa4ee8a20704332cd8d Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 17 Sep 2024 23:11:07 +0800 Subject: [PATCH 1/5] a Signed-off-by: Calvin Neo --- dbms/src/Storages/S3/S3RandomAccessFile.cpp | 8 +- dbms/src/Storages/S3/S3RandomAccessFile.h | 143 ++++++++++++++++++++ 2 files changed, 149 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp index 97e0180b07e..a32968b1e28 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.cpp +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -45,6 +45,8 @@ S3RandomAccessFile::S3RandomAccessFile(std::shared_ptr client_p { RUNTIME_CHECK(client_ptr != nullptr); RUNTIME_CHECK(initialize(), remote_fname); + // TODO Update parameters + prefetch = std::make_unique(1, std::bind(&S3RandomAccessFile::readImpl, this, std::placeholders::_1, std::placeholders::_2), 10240); } std::string S3RandomAccessFile::getFileName() const @@ -61,7 +63,7 @@ ssize_t S3RandomAccessFile::read(char * buf, size_t size) { while (true) { - auto n = readImpl(buf, size); + auto n = prefetch->read(buf, size); if (unlikely(n < 0 && isRetryableError(errno))) { // If it is a retryable error, then initialize again @@ -147,7 +149,9 @@ off_t S3RandomAccessFile::seekImpl(off_t offset_, int whence) } Stopwatch sw; auto & istr = read_result.GetBody(); - if (!istr.ignore(offset_ - cur_offset)) + auto ignore_count = offset_ - cur_offset; + auto direct_ignore_count = prefetch->skip(ignore_count); + if (!istr.ignore(direct_ignore_count)) { LOG_ERROR(log, "Cannot ignore from istream, errmsg={}, cost={}ns", strerror(errno), sw.elapsed()); return -1; diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.h b/dbms/src/Storages/S3/S3RandomAccessFile.h index c7809411619..95247751bc5 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.h +++ b/dbms/src/Storages/S3/S3RandomAccessFile.h @@ -40,6 +40,147 @@ extern const int NOT_IMPLEMENTED; namespace DB::S3 { +struct PrefetchCache 
+{ + using ReadFunc = std::function; + + PrefetchCache(UInt32 hit_limit_, ReadFunc read_func_, size_t buffer_size_) + : hit_limit(hit_limit_) + , hit_count(0) + , read_func(read_func_) + , buffer_size(buffer_size_) + { + pos = buffer_size; + } + + // - If the read is filled entirely by cache, then we will return the "gcount" of the cache. + // - If the read is filled by both the cache and `read_func`, + // + If the read_func returns a positive number, we will add the contribution of the cache, and then return. + // + Otherwise, we will return what `read_func` returns. + ssize_t read(char * buf, size_t size) + { + if (hit_count++ < hit_limit) + { + // Do not use the cache. + return read_func(buf, size); + } + maybePrefetch(); + if (pos + size > buffer_limit) + { + // LOG_INFO( + // DB::Logger::get(), + // "!!!! try read {} from cache + direct buffer_limit={} pos={}", + // size, + // buffer_limit, + // pos); + // No enough data in cache. + ::memcpy(buf, write_buffer.data() + pos, buffer_limit); + auto read_from_cache = buffer_limit - pos; + cache_read += read_from_cache; + pos = buffer_limit; + auto expected_direct_read_bytes = size - read_from_cache; + auto res = read_func(buf + read_from_cache, expected_direct_read_bytes); + // LOG_INFO( + // DB::Logger::get(), + // "!!!! refill pos={} size={} buffer_size={} buffer_limit={} expected_direct_read_bytes={} " + // "read_from_cache={} res={} direct={}", + // pos, + // size, + // buffer_size, + // buffer_limit, + // expected_direct_read_bytes, + // read_from_cache, + // res, + // direct_read); + if (res < 0) + return res; + direct_read += res; + // We may not read `size` data. + // LOG_INFO(DB::Logger::get(), "!!!! result res={} buffer_limit={} pos={}", res, buffer_limit, pos); + return res + read_from_cache; + } + else + { + // LOG_INFO(DB::Logger::get(), "!!!! 
try read {} from cache", size); + ::memcpy(buf, write_buffer.data() + pos, size); + cache_read += size; + pos += size; + return size; + } + } + + size_t skip(size_t ignore_count) { + if (hit_count++ < hit_limit) + { + return ignore_count; + } + maybePrefetch(); + if (pos + ignore_count > buffer_limit) + { + // No enough data in cache. + auto read_from_cache = buffer_limit - pos; + pos = buffer_limit; + auto expected_direct_read_bytes = ignore_count - read_from_cache; + return expected_direct_read_bytes; + } + else + { + pos += ignore_count; + return 0; + } + } + + enum class PrefetchRes + { + NeedNot, + Ok, + }; + + PrefetchRes maybePrefetch() + { + if (eof) + { + return PrefetchRes::NeedNot; + } + if (pos >= buffer_size) + { + write_buffer.reserve(buffer_size); + // TODO Check if it is OK to read when the rest of the chars are less than size. + auto res = read_func(write_buffer.data(), buffer_size); + // LOG_INFO(DB::Logger::get(), "!!!! prefetch res res={}", res); + if (res < 0) + { + // Error state. + eof = true; + } + else + { + // If we actually got some data. + pos = 0; + buffer_limit = res; + } + } + return PrefetchRes::NeedNot; + } + + size_t getCacheRead() const { return cache_read; } + size_t getDirectRead() const { return direct_read; } + bool needsRefill() const { return pos >= buffer_limit; } + +private: + UInt32 hit_limit; + std::atomic hit_count; + bool eof = false; + ReadFunc read_func; + // Equal to size of `write_buffer`. + size_t buffer_size; + size_t pos; + // How much data is actually in the buffer. 
+ size_t buffer_limit; + std::vector write_buffer; + size_t direct_read = 0; + size_t cache_read = 0; +}; class S3RandomAccessFile final : public RandomAccessFile { public: @@ -100,6 +241,8 @@ class S3RandomAccessFile final : public RandomAccessFile Int32 cur_retry = 0; static constexpr Int32 max_retry = 3; + + std::unique_ptr prefetch; }; } // namespace DB::S3 From fcd82e480460995f8c0fdc0bfa99806afbc123de Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 17 Sep 2024 23:22:05 +0800 Subject: [PATCH 2/5] format Signed-off-by: Calvin Neo --- dbms/src/Storages/S3/S3RandomAccessFile.cpp | 5 ++++- dbms/src/Storages/S3/S3RandomAccessFile.h | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp index a32968b1e28..8b6bbcceae6 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.cpp +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -46,7 +46,10 @@ S3RandomAccessFile::S3RandomAccessFile(std::shared_ptr client_p RUNTIME_CHECK(client_ptr != nullptr); RUNTIME_CHECK(initialize(), remote_fname); // TODO Update parameters - prefetch = std::make_unique(1, std::bind(&S3RandomAccessFile::readImpl, this, std::placeholders::_1, std::placeholders::_2), 10240); + prefetch = std::make_unique( + 1, + std::bind(&S3RandomAccessFile::readImpl, this, std::placeholders::_1, std::placeholders::_2), + 10240); } std::string S3RandomAccessFile::getFileName() const diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.h b/dbms/src/Storages/S3/S3RandomAccessFile.h index 95247751bc5..de24ba8d530 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.h +++ b/dbms/src/Storages/S3/S3RandomAccessFile.h @@ -109,7 +109,8 @@ struct PrefetchCache } } - size_t skip(size_t ignore_count) { + size_t skip(size_t ignore_count) + { if (hit_count++ < hit_limit) { return ignore_count; From b680f06161d792b5c728d30a96a71d389c9dc7bb Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 17 Sep 2024 23:56:41 +0800 
Subject: [PATCH 3/5] fix Signed-off-by: Calvin Neo --- dbms/src/Storages/S3/S3RandomAccessFile.cpp | 4 +++- dbms/src/Storages/S3/tests/gtest_s3file.cpp | 9 +++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp index 8b6bbcceae6..0a2ec7d6abc 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.cpp +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -48,7 +48,9 @@ S3RandomAccessFile::S3RandomAccessFile(std::shared_ptr client_p // TODO Update parameters prefetch = std::make_unique( 1, - std::bind(&S3RandomAccessFile::readImpl, this, std::placeholders::_1, std::placeholders::_2), + [this](auto && arg1, auto && arg2) { + readImpl(std::forward(arg1), std::forward(arg2)); + }, 10240); } diff --git a/dbms/src/Storages/S3/tests/gtest_s3file.cpp b/dbms/src/Storages/S3/tests/gtest_s3file.cpp index 23e36b93ae1..feb21d5658b 100644 --- a/dbms/src/Storages/S3/tests/gtest_s3file.cpp +++ b/dbms/src/Storages/S3/tests/gtest_s3file.cpp @@ -265,6 +265,15 @@ try std::iota(expected.begin(), expected.end(), 1); ASSERT_EQ(tmp_buf, expected); } + { + std::vector tmp_buf(10); + auto n = file.read(tmp_buf.data(), tmp_buf.size()); + ASSERT_EQ(n, tmp_buf.size()); + + std::vector expected(256); + std::iota(expected.begin(), expected.end(), 11); + ASSERT_EQ(tmp_buf, expected); + } } CATCH From 22ec003e2a09e9440ca28d36198a5facfe9820e7 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 18 Sep 2024 00:20:43 +0800 Subject: [PATCH 4/5] a Signed-off-by: Calvin Neo --- dbms/src/Storages/S3/S3RandomAccessFile.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp index 0a2ec7d6abc..66ff9795c66 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.cpp +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -49,7 +49,7 @@ S3RandomAccessFile::S3RandomAccessFile(std::shared_ptr client_p prefetch = 
std::make_unique( 1, [this](auto && arg1, auto && arg2) { - readImpl(std::forward(arg1), std::forward(arg2)); + return readImpl(std::forward(arg1), std::forward(arg2)); }, 10240); } From b5b889bfb02bc9390b7e64f45e79b120af2daeaf Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 18 Sep 2024 01:08:30 +0800 Subject: [PATCH 5/5] pick Signed-off-by: Calvin Neo --- dbms/src/Storages/S3/S3RandomAccessFile.cpp | 8 ++++++- dbms/src/Storages/S3/S3RandomAccessFile.h | 25 ++++----------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp index 66ff9795c66..9b03e25f317 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.cpp +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -51,7 +51,7 @@ S3RandomAccessFile::S3RandomAccessFile(std::shared_ptr client_p [this](auto && arg1, auto && arg2) { return readImpl(std::forward(arg1), std::forward(arg2)); }, - 10240); + 10); } std::string S3RandomAccessFile::getFileName() const @@ -187,6 +187,12 @@ bool S3RandomAccessFile::initialize() { Stopwatch sw; bool request_succ = false; + if (prefetch != nullptr) { + auto to_revert = prefetch->getRevertCount(); + LOG_INFO(log, "S3 revert cache {}", to_revert); + cur_offset -= to_revert; + } + prefetch = std::make_unique(10, std::bind(&S3RandomAccessFile::readImpl, this, std::placeholders::_1, std::placeholders::_2), 1024 * 1024); Aws::S3::Model::GetObjectRequest req; req.SetRange(readRangeOfObject()); client_ptr->setBucketAndKeyWithRoot(req, remote_fname); diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.h b/dbms/src/Storages/S3/S3RandomAccessFile.h index de24ba8d530..f58c67b6f0e 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.h +++ b/dbms/src/Storages/S3/S3RandomAccessFile.h @@ -67,12 +67,6 @@ struct PrefetchCache maybePrefetch(); if (pos + size > buffer_limit) { - // LOG_INFO( - // DB::Logger::get(), - // "!!!! 
try read {} from cache + direct buffer_limit={} pos={}", - // size, - // buffer_limit, - // pos); // No enough data in cache. ::memcpy(buf, write_buffer.data() + pos, buffer_limit); auto read_from_cache = buffer_limit - pos; @@ -80,28 +74,14 @@ struct PrefetchCache pos = buffer_limit; auto expected_direct_read_bytes = size - read_from_cache; auto res = read_func(buf + read_from_cache, expected_direct_read_bytes); - // LOG_INFO( - // DB::Logger::get(), - // "!!!! refill pos={} size={} buffer_size={} buffer_limit={} expected_direct_read_bytes={} " - // "read_from_cache={} res={} direct={}", - // pos, - // size, - // buffer_size, - // buffer_limit, - // expected_direct_read_bytes, - // read_from_cache, - // res, - // direct_read); if (res < 0) return res; direct_read += res; // We may not read `size` data. - // LOG_INFO(DB::Logger::get(), "!!!! result res={} buffer_limit={} pos={}", res, buffer_limit, pos); return res + read_from_cache; } else { - // LOG_INFO(DB::Logger::get(), "!!!! try read {} from cache", size); ::memcpy(buf, write_buffer.data() + pos, size); cache_read += size; pos += size; @@ -148,7 +128,6 @@ struct PrefetchCache write_buffer.reserve(buffer_size); // TODO Check if it is OK to read when the rest of the chars are less than size. auto res = read_func(write_buffer.data(), buffer_size); - // LOG_INFO(DB::Logger::get(), "!!!! prefetch res res={}", res); if (res < 0) { // Error state. @@ -166,6 +145,10 @@ struct PrefetchCache size_t getCacheRead() const { return cache_read; } size_t getDirectRead() const { return direct_read; } + size_t getRevertCount() const { + if (hit_count < hit_limit) return 0; + return buffer_limit; + } bool needsRefill() const { return pos >= buffer_limit; } private: