Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DNM s3 random cache #9435

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions dbms/src/Storages/S3/S3RandomAccessFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ S3RandomAccessFile::S3RandomAccessFile(std::shared_ptr<TiFlashS3Client> client_p
{
RUNTIME_CHECK(client_ptr != nullptr);
RUNTIME_CHECK(initialize(), remote_fname);
// TODO Update parameters
prefetch = std::make_unique<PrefetchCache>(
1,
[this](auto && arg1, auto && arg2) {
return readImpl(std::forward<decltype(arg1)>(arg1), std::forward<decltype(arg2)>(arg2));
},
10);
}

std::string S3RandomAccessFile::getFileName() const
Expand All @@ -61,7 +68,7 @@ ssize_t S3RandomAccessFile::read(char * buf, size_t size)
{
while (true)
{
auto n = readImpl(buf, size);
auto n = prefetch->read(buf, size);
if (unlikely(n < 0 && isRetryableError(errno)))
{
// If it is a retryable error, then initialize again
Expand Down Expand Up @@ -147,7 +154,9 @@ off_t S3RandomAccessFile::seekImpl(off_t offset_, int whence)
}
Stopwatch sw;
auto & istr = read_result.GetBody();
if (!istr.ignore(offset_ - cur_offset))
auto ignore_count = offset_ - cur_offset;
auto direct_ignore_count = prefetch->skip(ignore_count);
if (!istr.ignore(direct_ignore_count))
{
LOG_ERROR(log, "Cannot ignore from istream, errmsg={}, cost={}ns", strerror(errno), sw.elapsed());
return -1;
Expand Down Expand Up @@ -178,6 +187,12 @@ bool S3RandomAccessFile::initialize()
{
Stopwatch sw;
bool request_succ = false;
if (prefetch != nullptr) {
auto to_revert = prefetch->getRevertCount();
LOG_INFO(log, "S3 revert cache {}", to_revert);
cur_offset -= to_revert;
}
prefetch = std::make_unique<PrefetchCache>(10, std::bind(&S3RandomAccessFile::readImpl, this, std::placeholders::_1, std::placeholders::_2), 1024 * 1024);
Aws::S3::Model::GetObjectRequest req;
req.SetRange(readRangeOfObject());
client_ptr->setBucketAndKeyWithRoot(req, remote_fname);
Expand Down
127 changes: 127 additions & 0 deletions dbms/src/Storages/S3/S3RandomAccessFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,131 @@ extern const int NOT_IMPLEMENTED;

namespace DB::S3
{
/// A small read-ahead cache layered over a sequential read function.
/// The first `hit_limit` reads bypass the cache (warm-up); after that, data is
/// served from an internal buffer that is refilled from `read_func` whenever it
/// has been fully consumed.
/// NOTE(review): `hit_count` is atomic but the rest of the state is not — this
/// struct is NOT thread-safe; confirm callers never share one instance across threads.
struct PrefetchCache
{
    // Matches the project-wide alias (DB::UInt32 == uint32_t); declared locally
    // so this header stays self-contained.
    using UInt32 = uint32_t;
    using ReadFunc = std::function<ssize_t(char *, size_t)>;

    /// @param hit_limit_   number of initial `read`/`skip` calls served directly,
    ///                     bypassing the cache
    /// @param read_func_   the underlying sequential reader; returns bytes read,
    ///                     or a negative value on error
    /// @param buffer_size_ capacity of the prefetch buffer in bytes
    PrefetchCache(UInt32 hit_limit_, ReadFunc read_func_, size_t buffer_size_)
        : hit_limit(hit_limit_)
        , hit_count(0)
        , read_func(std::move(read_func_))
        , buffer_size(buffer_size_)
        , pos(buffer_size_) // buffer starts empty: pos >= buffer_limit forces the first refill
    {
    }

    // - If the read is filled entirely by cache, then we will return the number
    //   of bytes copied from the cache.
    // - If the read is filled by both the cache and `read_func`,
    //   + If `read_func` returns a non-negative number, we add the contribution
    //     of the cache, and then return.
    //   + Otherwise, we return what `read_func` returns.
    ssize_t read(char * buf, size_t size)
    {
        if (hit_count++ < hit_limit)
        {
            // Warm-up phase: do not use the cache.
            return read_func(buf, size);
        }
        maybePrefetch();
        // Bytes currently available in the buffer; 0 when the buffer is drained
        // or the last prefetch failed (guards the size_t subtraction against underflow).
        const size_t avail = pos < buffer_limit ? buffer_limit - pos : 0;
        if (size <= avail)
        {
            // Served entirely from the cache.
            ::memcpy(buf, write_buffer.data() + pos, size);
            cache_read += size;
            pos += size;
            return static_cast<ssize_t>(size);
        }
        // Not enough data in cache: drain what is there, read the rest directly.
        if (avail > 0)
            ::memcpy(buf, write_buffer.data() + pos, avail);
        cache_read += avail;
        pos = buffer_limit;
        const auto res = read_func(buf + avail, size - avail);
        if (res < 0)
            return res;
        direct_read += static_cast<size_t>(res);
        // We may still return fewer than `size` bytes.
        return res + static_cast<ssize_t>(avail);
    }

    /// Advance the logical read position by `ignore_count` bytes. Bytes already
    /// sitting in the prefetch buffer are skipped in place; the return value is
    /// the number of bytes the caller still has to skip on the underlying stream.
    size_t skip(size_t ignore_count)
    {
        if (hit_count++ < hit_limit)
        {
            // Warm-up phase: nothing cached, the caller skips everything itself.
            return ignore_count;
        }
        maybePrefetch();
        const size_t avail = pos < buffer_limit ? buffer_limit - pos : 0;
        if (ignore_count <= avail)
        {
            // Fully absorbed by the cache.
            pos += ignore_count;
            return 0;
        }
        pos = buffer_limit;
        return ignore_count - avail;
    }

    enum class PrefetchRes
    {
        NeedNot,
        Ok,
    };

    /// Refill the buffer from `read_func` when it has been fully consumed.
    /// Returns Ok when new data was fetched, NeedNot otherwise.
    PrefetchRes maybePrefetch()
    {
        if (eof)
        {
            return PrefetchRes::NeedNot;
        }
        if (pos < buffer_limit)
        {
            // Still have unread data in the buffer.
            return PrefetchRes::NeedNot;
        }
        // resize (not reserve): we read back through data(), so the bytes must
        // be inside the vector's size.
        write_buffer.resize(buffer_size);
        const auto res = read_func(write_buffer.data(), buffer_size);
        if (res <= 0)
        {
            // Error (res < 0) or end of stream (res == 0): stop prefetching and
            // leave the buffer logically empty.
            eof = true;
            pos = 0;
            buffer_limit = 0;
            return PrefetchRes::NeedNot;
        }
        pos = 0;
        buffer_limit = static_cast<size_t>(res);
        return PrefetchRes::Ok;
    }

    size_t getCacheRead() const { return cache_read; }
    size_t getDirectRead() const { return direct_read; }
    /// Bytes fetched from the underlying stream but not yet consumed by the
    /// caller; the owner rewinds its stream offset by this amount when the
    /// cache is dropped. Only the unconsumed tail counts — already-consumed
    /// bytes must not be reverted.
    size_t getRevertCount() const
    {
        if (hit_count < hit_limit)
            return 0;
        return pos < buffer_limit ? buffer_limit - pos : 0;
    }
    bool needsRefill() const { return pos >= buffer_limit; }

private:
    UInt32 hit_limit;
    std::atomic<UInt32> hit_count;
    bool eof = false;
    ReadFunc read_func;
    // Capacity of `write_buffer`.
    size_t buffer_size;
    // Next unread byte inside `write_buffer`.
    size_t pos;
    // How much data is actually in the buffer.
    size_t buffer_limit = 0;
    std::vector<char> write_buffer;
    size_t direct_read = 0;
    size_t cache_read = 0;
};
class S3RandomAccessFile final : public RandomAccessFile
{
public:
Expand Down Expand Up @@ -100,6 +225,8 @@ class S3RandomAccessFile final : public RandomAccessFile

Int32 cur_retry = 0;
static constexpr Int32 max_retry = 3;

std::unique_ptr<PrefetchCache> prefetch;
};

} // namespace DB::S3
9 changes: 9 additions & 0 deletions dbms/src/Storages/S3/tests/gtest_s3file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ try
std::iota(expected.begin(), expected.end(), 1);
ASSERT_EQ(tmp_buf, expected);
}
{
    std::vector<char> tmp_buf(10);
    auto n = file.read(tmp_buf.data(), tmp_buf.size());
    ASSERT_EQ(n, tmp_buf.size());

    // The previous block consumed bytes 1..10 of the object, so this 10-byte
    // read must return exactly 11..20. `expected` must match tmp_buf's size —
    // a 256-element vector could never compare equal to a 10-element one.
    std::vector<char> expected(10);
    std::iota(expected.begin(), expected.end(), 11);
    ASSERT_EQ(tmp_buf, expected);
}
}
CATCH

Expand Down