Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into hw5
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/ru/vk/itmo/abramovilya/DaoImpl.java
#	src/main/java/ru/vk/itmo/abramovilya/DaoIterator.java
#	src/main/java/ru/vk/itmo/abramovilya/Storage.java
#	src/main/java/ru/vk/itmo/abramovilya/StorageFileWriter.java
#	src/main/java/ru/vk/itmo/test/abramovilya/DaoFactoryImpl.java
  • Loading branch information
IlyaAbramovv committed Dec 11, 2023
2 parents a35d9ff + f402b3f commit c2acea0
Show file tree
Hide file tree
Showing 106 changed files with 7,393 additions and 1,622 deletions.
81 changes: 81 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,84 @@ NB! Под "не блокирует" ниже понимается отсутс
### Report
Когда всё будет готово, присылайте pull request в ветку `main` со своей реализацией на review.
Не забывайте подтягивать **новые тесты и изменения**, **отвечать на комментарии в PR** и **исправлять замечания**!

## Бонусные задания (deadline 2023-12-06 23:59:59 MSK)

После реализации **всех** предыдущих этапов, необходимо выбрать одну из **незанятых** фич, описанных ниже, и предварительно обсудить с преподавателем предполагаемый способ реализации.

При реализации фичи допускается изменение API `Dao`.

Добавление тестов, демонстрирующих работоспособность реализации, является **обязательным**.

### Feedback

Развёрнутая **конструктивная** обратная связь по курсу: достоинства и недостатки курса, сложность тем, предложения по улучшению.

### Autocompact

Регулярный автоматический фоновый compaction.

### Durability (WAL)

Гарантирует durability (отсутствие потерь подтверджённых записей/удалений) даже в случае "падения" процесса за счёт того, что операции модификации подтверждаются только после того, как попадут в write-ahead-log.
При инициализации стораджа обнаруженные записи WAL "проигрываются" перед началом обслуживания новых операций.
Устаревшие WAL должны ротироваться, чтобы не занимать лишнее место на диске.

Существуют разные подходы к реализации `sync()` на диск.

### Reverse Iterator

Текущий интерфейс `DAO` позволяет итерироваться по данным только в лексикографическом порядке.
Требуется реализовать возможность корректной итерации в обратном порядке, т.е. добавить метод `descendingGet(from, to)`.

### Expiration

Операция `upsert()` должна поддерживать опциональный параметр Time-To-Live (TTL) или время, после которого ячейка должна "пропасть".

"Протухшие" ячейки не должны отдаваться клиентам и должны вычищаться при compaction.

### Compression

Необходимо реализовать блочную компрессию в файлах на диске (используя готовые реализации LZ4, Snappy или zstd) и не забыть про compaction.

### Streaming

Необходимо реализовать механизм для записи и чтения значений **больше** чем Java Heap, например, принимая `InputStream` и выдавая `OutputStream` в качестве значения.

### Atomic Batches

Необходимо реализовать возможность атомарного применения набора модификаций (upsert или remove).

Например, можно принимать от клиента список модификаций, писать их в сериализованном виде в **отдельную таблицу** и удалять после успешного применения.
При инициализации стораджа должны "проигрываться" недоприменённые батчи.

### Transactions

Необходимо обеспечить возможность транзакционного выполнения набора любых операций (upsert/remove/get).
При возникновении конфликта (любой другой транзакции, работающей с теми же ключами несовместимым способом, т.е. не read/read конфликта) клиент должен получать `ConcurrentModificationException`.
Пример реализации -- [NewSQL](https://habr.com/ru/company/odnoklassniki/blog/417593/).

### Bloom Filters

Для каждой таблицы на диске необходимо поддерживать Bloom Filter для содержащихся в ней ключей, чтобы пропускать таблицы, гарантированно не содержащие запрашиваемых ключей.

Очевидно, что это будет работать только в случае "точечных", а не range-запросов.

### Column Families

Поддержка независимых таблиц/keyspace/database/namespace/whatever.

### Snapshots

Получение слепка БД на текущий момент времени с возможностью чтения из него вне зависимости от "развития" основной БД.

Здесь могут помочь hard links.

### Custom Comparators

Возможность указания клиентом пользовательского `Comparator` вместо лексикографического.

### Real 64-bit

* Поддержка файлов больше 2ГБ
* Модульные тесты для ключей/значений больше 2ГБ
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/ru/vk/itmo/abramovilya/DaoImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class DaoImpl implements Dao<MemorySegment, Entry<MemorySegment>> {

public DaoImpl(Config config) throws IOException {
flushThresholdBytes = config.flushThresholdBytes();
storage = new Storage(config);
this.storage = new Storage(config);
}

Iterator<Entry<MemorySegment>> firstNsstablesIterator(int n) {
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/ru/vk/itmo/abramovilya/Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Storage implements Closeable {
static final String COMPACTING_SUFFIX = "_compacting";
static final String SSTABLE_BASE_NAME = "storage";
static final String INDEX_BASE_NAME = "index";
private static final int INDEX_ENTRY_SIZE = Integer.BYTES + Long.BYTES;
final Path storagePath;
final Path metaFilePath;
final Path compactedTablesAmountFilePath;
Expand All @@ -45,7 +46,9 @@ class Storage implements Closeable {
metaFilePath = storagePath.resolve("meta");
if (!Files.exists(metaFilePath)) {
Files.createFile(metaFilePath);
Files.writeString(metaFilePath, "0", StandardOpenOption.WRITE);

int totalSStables = 0;
Files.writeString(metaFilePath, String.valueOf(totalSStables), StandardOpenOption.WRITE);
}

// Restore consistent state if db was dropped during compaction
Expand Down Expand Up @@ -128,7 +131,7 @@ private Entry<MemorySegment> getEntryFromIndexFile(MemorySegment sstableMapped,
int entryNum) {
long offsetInStorageFile = indexMapped.get(
ValueLayout.JAVA_LONG_UNALIGNED,
(long) (Integer.BYTES + Long.BYTES) * entryNum + Integer.BYTES
(long) INDEX_ENTRY_SIZE * entryNum + Integer.BYTES
);

long keySize = sstableMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, offsetInStorageFile);
Expand Down Expand Up @@ -265,7 +268,7 @@ long findOffsetInIndex(MemorySegment from, MemorySegment to, int fileNum) {
to, storageMapped, keyStorageOffset, keySize) <= 0)) {
return -1;
}
return (long) foundIndex * (Integer.BYTES + Long.BYTES) + Integer.BYTES;
return (long) foundIndex * INDEX_ENTRY_SIZE + Integer.BYTES;
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/ru/vk/itmo/abramovilya/table/MemTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class MemTable implements Table {
final int number;

public MemTable(NavigableMap<MemorySegment, Entry<MemorySegment>> map, int number) {
iterator = map.values().iterator();
this.iterator = map.values().iterator();
this.number = number;
if (iterator.hasNext()) {
currentEntry = new MemTableEntry(iterator.next(), this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class MemTableEntry implements TableEntry {
private final MemTable memTable;

public MemTableEntry(Entry<MemorySegment> entry, MemTable memTable) {
current = entry;
this.current = entry;
this.memTable = memTable;
}

Expand Down
72 changes: 72 additions & 0 deletions src/main/java/ru/vk/itmo/bazhenovkirill/MemorySegmentUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package ru.vk.itmo.bazhenovkirill;

import ru.vk.itmo.BaseEntry;
import ru.vk.itmo.Entry;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;

public final class MemorySegmentUtils {

private MemorySegmentUtils() {

}

public static MemorySegment getSlice(MemorySegment segment, long start, long end) {
return segment.asSlice(start, end - start);
}

public static long startOfKey(MemorySegment segment, long inx) {
return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, inx * 2 * Long.BYTES);
}

public static long endOfKey(MemorySegment segment, long inx) {
return normalize(startOfValue(segment, inx));
}

public static MemorySegment getKey(MemorySegment segment, long inx) {
return getSlice(segment, startOfKey(segment, inx), endOfKey(segment, inx));
}

public static long startOfValue(MemorySegment segment, long inx) {
return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, Long.BYTES + inx * 2 * Long.BYTES);
}

public static long endOfValue(MemorySegment segment, long inx) {
if (inx < recordsCount(segment) - 1) {
return startOfKey(segment, inx + 1);
}
return segment.byteSize();
}

public static Entry<MemorySegment> getEntry(MemorySegment segment, long inx) {
MemorySegment key = getKey(segment, inx);
MemorySegment value = getValue(segment, inx);
return new BaseEntry<>(key, value);
}

public static MemorySegment getValue(MemorySegment segment, long inx) {
long start = startOfValue(segment, inx);
if (start < 0) {
return null;
}
return getSlice(segment, start, endOfValue(segment, inx));
}

public static long tombstone(long offset) {
return 1L << 63 | offset;
}

public static long normalize(long offset) {
return offset & ~(1L << 63);
}

public static long recordsCount(MemorySegment segment) {
return indexSize(segment) / (2 * Long.BYTES);
}

public static long indexSize(MemorySegment segment) {
return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, 0);
}

}
123 changes: 123 additions & 0 deletions src/main/java/ru/vk/itmo/bazhenovkirill/MergeIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package ru.vk.itmo.bazhenovkirill;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

public class MergeIterator<T> implements Iterator<T> {

private final PriorityQueue<PeekIterator<T>> priorityQueue;

private final Comparator<T> comparator;

private PeekIterator<T> peek;

private static class PeekIterator<T> implements Iterator<T> {

private final Iterator<T> iterator;
private T next;
private final int id;

public PeekIterator(int id, Iterator<T> iterator) {
this.id = id;
this.iterator = iterator;
if (iterator.hasNext()) {
next = iterator.next();
}
}

private T peek() {
return next;
}

@Override
public boolean hasNext() {
return next != null || iterator.hasNext();
}

@Override
public T next() {
T curr = next;
next = iterator.hasNext() ? iterator.next() : null;
return curr;
}
}

public MergeIterator(Collection<Iterator<T>> iterators, Comparator<T> comparator) {
this.comparator = comparator;
Comparator<PeekIterator<T>> peekComp = (o1, o2) -> comparator.compare(o1.peek(), o2.peek());
priorityQueue = new PriorityQueue<>(
iterators.size(),
peekComp.thenComparing(o -> -o.id)
);

int id = 0;
for (Iterator<T> iterator : iterators) {
if (iterator.hasNext()) {
priorityQueue.add(new PeekIterator<>(id++, iterator));
}
}
}

protected boolean skip(T t) {
return t == null;
}

private PeekIterator<T> peek() {
while (peek == null) {
peek = priorityQueue.poll();
if (peek == null) {
return null;
}

PeekIterator<T> next = priorityQueue.peek();
while (next != null && comparator.compare(peek.peek(), next.peek()) == 0) {
PeekIterator<T> poll = priorityQueue.poll();
if (poll != null) {
skipElement(poll);
}
next = priorityQueue.peek();
}

if (!peek.hasNext()) {
peek = null;
continue;
}

if (skip(peek.peek())) {
skipElement(peek);
peek = null;
}
}

return peek;
}

private void skipElement(PeekIterator<T> iterator) {
iterator.next();
if (iterator.hasNext()) {
priorityQueue.add(iterator);
}
}

@Override
public boolean hasNext() {
return peek() != null;
}

@Override
public T next() {
PeekIterator<T> peekIterator = peek();
if (peekIterator == null) {
throw new NoSuchElementException();
}
T next = peek.next();
this.peek = null;
if (peekIterator.hasNext()) {
priorityQueue.add(peekIterator);
}
return next;
}
}
11 changes: 11 additions & 0 deletions src/main/java/ru/vk/itmo/bazhenovkirill/Offset.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package ru.vk.itmo.bazhenovkirill;

public class Offset {
long data;
long index;

Offset(long data, long index) {
this.data = data;
this.index = index;
}
}
Loading

0 comments on commit c2acea0

Please sign in to comment.