Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Коротких Виктор / ИТМО DWS / Stage 6 / Compression #304

Closed
wants to merge 21 commits into from

Conversation

vitekkor
Copy link
Contributor

@vitekkor vitekkor commented Dec 31, 2023

Реализация сжатия данных, хранящихся на диске в sstable.

Записывается 3 файла:

  1. compression info:
1b 1b 4b 4b 4b 4b 4b
isCompressed algorithm blocksCount uncompressedBlockSize block1Offset block2Offset blockNOffset
  1. index:
1b 8b 4b 4b 4b 4b
hasNoTombstones entriesSize key1BlockNumber key1SizeBlockOffset key2BlockNumber key2SizeBlockOffset

keyNBlockNumber - номер блока для начала ключа номер N (key1Size|key1|value1Size|value1)

keyNSizeBlockOffset - смещение начала размера ключа внутри блока

  1. sstableN.db - данные
block1 block2 ... blockN

Запись ведётся с помощью реализации AbstractSSTableWriter:

  • BaseSSTableWriter - запись без сжатия. Вместо блоков записываются обычные данные key1Size|key1|value1Size|value1
  • CompressedSSTableWriter - запись с сжатием. Для сжатия используются реализации интерфейса Compressor

Чтение ведётся с помощью реализации AbstractSSTableReader:

  • BaseSSTableReader - читает sstable, у которых в compressionInfo isCompressed == false (0).
  • CompressedSSTableReader - читает сжатые данные, разжимает их по-блочно в буфер, из которого по смещениям выделяет entity

CompressedSSTableReader также использует буфер для расжатых данных и сохраняет информацию о последнем прочитанном блоке, чтобы если необходимо будет прочитать данные из того же блока, то не приходилось заново читать и разжимать данные. Особенно полезно для итератора.

Чтение данных происходит следующим образом:

  1. Бинарный поиск log(CountOfEntities)
  2. Получили номер блока
  3. Получили смещение начала размера ключа внутри блока
  4. Разжимаем блок (или читаем просто данные из буфера)
  5. Разжимаем блоки и читаем данные пока не получим entity (вычитываем размер ключа, ключ, размер значения, значение)

Также сжатие можно настраивать через конфиг при открытии дао

  • включить/отключить сжатие - при отключении сжатия любые старые данные на диске будут доступны на чтение, новые данные будут записаны без сжатия. При compaction просто решим нужно ли их сжать при записи
  • размер блока сжатия
  • алгоритм

По сути изменение конфига при переоткрытии будет влиять на новые флашнутые файлы и на результат compaction.

Алгоритмы сжатия:

  • LZ4
  • ZSTD

@vitekkor vitekkor changed the title Коротких Виктор / ИТМО DWS / Stage 6 Коротких Виктор / ИТМО DWS / Stage 6 / Compression Dec 31, 2023
@vitekkor
Copy link
Contributor Author

vitekkor commented Jan 9, 2024

@incubos Добрый день! Можете, пожалуйста, посмотреть моё решение 6го этапа, как будет время?

@incubos incubos self-requested a review January 9, 2024 12:05
@incubos incubos self-assigned this Jan 9, 2024
Copy link
Member

@incubos incubos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ревьюил бегло. Есть некоторые замечания. Хороший дизайн и набор тестов, хотя всегда есть, что улучшать. Интересное решение с кеширование разжатого блока. Не хватает бенчмарков: время чтения/флаша с компрессией vs выигрыш по месту на диске -- но там многое зависит от набора данных. Тем не менее, предлагаю на этом остановиться.
50 баллов за бонусную фичу.

@Override
public byte[] compress(byte[] src, int len) throws IOException {
byte[] dst = new byte[len];
long originalSize = Zstd.compressByteArray(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Что будет, если не влезет в dst?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

По идее в самых вырожденных случаях сжатые данные будут по размеру равны несжатым.
Но вы правы - тут лучше было бы сделать так:

byte[] dst = new byte[(int) Zstd.compressBound(len)];

Тогда мы будем аллоцировать больше памяти, но зато будем уверены, что 100% влезем в массив

if (from != null) {
fromPosition = getEntryOffset(from, SearchOption.GTE);
if (fromPosition == -1) {
return new BaseSSTableIterator(0, -1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Можно возвращать single empty iterator и не аллоцировать, верно?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, не учёл этот момент. Но надо тогда создать имплементацию LSMPointerIterator:

public class EmptyLSMPointerIterator extends LSMPointerIterator {
    ...
}

if (to != null) {
toPosition = getEntryOffset(to, SearchOption.LT);
if (toPosition == -1) {
return new BaseSSTableIterator(0, -1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Можно возвращать single empty iterator и не аллоцировать, верно?

Comment on lines +46 to +47
long fromPosition = getMinKeySizeOffset();
long toPosition = getMaxKeySizeOffset();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Видимо, эта инициализация должна быть в ветках else ниже? Reassignment подобных переменных усложняет понимание кода.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь подразумевалось, что мы изначально инициализируем эти переменные границами всей sstable, а потом если у нас заданы границы from и to, то переназначаем новыми значениями.
Соглашусь, что наверное через else ветки было бы понятнее

Comment on lines +171 to +173
long keySize = mappedSSTable.get(ValueLayout.JAVA_LONG_UNALIGNED, fromPosition);
long valueOffset = fromPosition + Long.BYTES + keySize;
long valueSize = mappedSSTable.get(ValueLayout.JAVA_LONG_UNALIGNED, valueOffset);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Этот код дублируется в нескольких местах. Отрефачить бы...

Comment on lines +369 to +370
// ZSTD specific
if (decompressor instanceof ZstdDecompressor) return uncompressedBlockSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Текут абстракции :/

Comment on lines +405 to +408
for (int i = Long.BYTES - 1; i >= 0; i--) {
value <<= 8;
value |= (bytes[i] & 0xFFL);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не факт, что соптимизируется. Надёжнее по старинке без цикла собрать long из восьми байт.

*/
public void write(
boolean isCompacted,
Supplier<? extends Iterator<? extends Entry<MemorySegment>>> iteratorSupplier,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Зачем здесь Supplier, если get() на нём вызывается ровно один раз?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Остатки старого кода, в котором несколько раз пробегались по итератору

while (entries.hasNext()) {
// Then write the entry
final Entry<MemorySegment> entry = entries.next();
hasNoTombstones = entry.value() != null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Значение может многократно меняться между true и false, верно?

Suggested change
hasNoTombstones = entry.value() != null;
hasNoTombstones &= entry.value() != null;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, тут сам себя запутал. Ваше решение более корректное

@@ -54,7 +54,7 @@ private static String duration(int seconds) {
}

@DaoTest(stage = 3)
void database(Dao<String, Entry<String>> dao) throws Exception {
public void database(Dao<String, Entry<String>> dao) throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Зачем это?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тестировал сжатие данных с помощью реализаций существующих тестов, а так как пакеты разные, то пришлось сделать public.

Не смог придумать ничего лучше, чтобы при этом не копировать код тестов. В целом можно было бы попробовать по умолчанию в тестах использовать конфиг с включённым сжатием (затронуло бы только мою реализацию), но тогда есть шанс, что тесты по таймаутам будут падать, потому что сжатие/разжатие будет съедать время (и памяти из-за буфферов будет требоваться больше)

@incubos incubos closed this Jan 11, 2024
@incubos
Copy link
Member

incubos commented Jan 11, 2024

Спасибо за ответы.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants