Skip to content
This repository has been archived by the owner on Mar 3, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into imlena/timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
incubos authored Feb 29, 2024
2 parents 848f15d + 913870a commit ffc3b79
Show file tree
Hide file tree
Showing 219 changed files with 14,241 additions and 3,823 deletions.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
217 changes: 42 additions & 175 deletions src/main/java/ru/vk/itmo/abramovilya/DaoImpl.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package ru.vk.itmo.abramovilya;

import ru.vk.itmo.BaseEntry;
import ru.vk.itmo.Config;
import ru.vk.itmo.Dao;
import ru.vk.itmo.Entry;
Expand All @@ -9,66 +8,23 @@
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class DaoImpl implements Dao<MemorySegment, Entry<MemorySegment>> {
private final ConcurrentNavigableMap<MemorySegment, Entry<MemorySegment>> map =
new ConcurrentSkipListMap<>(DaoImpl::compareMemorySegments);
private final Path storagePath;
private final Arena arena = Arena.ofShared();
private static final String SSTABLE_BASE_NAME = "storage";
private static final String INDEX_BASE_NAME = "table";
private final Path metaFilePath;
private final List<FileChannel> sstableFileChannels = new ArrayList<>();
private final List<MemorySegment> sstableMappedList = new ArrayList<>();
private final List<FileChannel> indexFileChannels = new ArrayList<>();
private final List<MemorySegment> indexMappedList = new ArrayList<>();
private final Storage storage;

public DaoImpl(Config config) throws IOException {
storagePath = config.basePath();

Files.createDirectories(storagePath);
metaFilePath = storagePath.resolve("meta");
if (!Files.exists(metaFilePath)) {
Files.createFile(metaFilePath);
Files.writeString(metaFilePath, "0", StandardOpenOption.WRITE);
}

int totalSSTables = Integer.parseInt(Files.readString(metaFilePath));
for (int sstableNum = 0; sstableNum < totalSSTables; sstableNum++) {
Path sstablePath = storagePath.resolve(SSTABLE_BASE_NAME + sstableNum);
Path indexPath = storagePath.resolve(INDEX_BASE_NAME + sstableNum);

FileChannel sstableFileChannel = FileChannel.open(sstablePath, StandardOpenOption.READ);
sstableFileChannels.add(sstableFileChannel);
MemorySegment sstableMapped =
sstableFileChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(sstablePath), arena);
sstableMappedList.add(sstableMapped);

FileChannel indexFileChannel = FileChannel.open(indexPath, StandardOpenOption.READ);
indexFileChannels.add(indexFileChannel);
MemorySegment indexMapped =
indexFileChannel.map(FileChannel.MapMode.READ_ONLY, 0, Files.size(indexPath), arena);
indexMappedList.add(indexMapped);
}
this.storage = new Storage(config, arena);
}

@Override
public Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to) {
return new DaoIterator(getTotalSStables(), from, to, sstableMappedList, indexMappedList, map);
}

@Override
public void upsert(Entry<MemorySegment> entry) {
map.put(entry.key(), entry);
return new DaoIterator(storage.getTotalSStables(), from, to, storage, map);
}

@Override
Expand All @@ -80,147 +36,47 @@ public Entry<MemorySegment> get(MemorySegment key) {
}
return null;
}

int totalSStables = getTotalSStables();
for (int sstableNum = totalSStables; sstableNum >= 0; sstableNum--) {
var foundEntry = seekForValueInFile(key, sstableNum);
if (foundEntry != null) {
if (foundEntry.value() != null) {
return foundEntry;
}
return null;
}
}
return null;
return storage.get(key);
}

private Entry<MemorySegment> seekForValueInFile(MemorySegment key, int sstableNum) {
if (sstableNum >= sstableFileChannels.size()) {
return null;
}

MemorySegment storageMapped = sstableMappedList.get(sstableNum);
MemorySegment indexMapped = indexMappedList.get(sstableNum);

int foundIndex = upperBound(key, storageMapped, indexMapped, indexMapped.byteSize());
long keyStorageOffset = getKeyStorageOffset(indexMapped, foundIndex);
long foundKeySize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, keyStorageOffset);
keyStorageOffset += Long.BYTES;

if (MemorySegment.mismatch(key,
0,
key.byteSize(),
storageMapped,
keyStorageOffset,
keyStorageOffset + foundKeySize) == -1) {
return getEntryFromIndexFile(storageMapped, indexMapped, foundIndex);
}
return null;
}

static int upperBound(MemorySegment key, MemorySegment storageMapped, MemorySegment indexMapped, long indexSize) {
int l = -1;
int r = indexMapped.get(ValueLayout.JAVA_INT_UNALIGNED, indexSize - Long.BYTES - Integer.BYTES);

while (r - l > 1) {
int m = (r + l) / 2;
long keyStorageOffset = getKeyStorageOffset(indexMapped, m);
long keySize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, keyStorageOffset);
keyStorageOffset += Long.BYTES;

if (compareMemorySegmentsUsingOffset(key, storageMapped, keyStorageOffset, keySize) > 0) {
l = m;
} else {
r = m;
}
}
return r;
}

static long getKeyStorageOffset(MemorySegment indexMapped, int entryNum) {
return indexMapped.get(
ValueLayout.JAVA_LONG_UNALIGNED,
(long) (Integer.BYTES + Long.BYTES) * entryNum + Integer.BYTES
);
@Override
public void upsert(Entry<MemorySegment> entry) {
map.put(entry.key(), entry);
}

private Entry<MemorySegment> getEntryFromIndexFile(MemorySegment storageMapped,
MemorySegment indexMapped,
int entryNum) {
long offsetInStorageFile = indexMapped.get(
ValueLayout.JAVA_LONG_UNALIGNED,
(long) (Integer.BYTES + Long.BYTES) * entryNum + Integer.BYTES
);

long keySize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, offsetInStorageFile);
offsetInStorageFile += Long.BYTES;
offsetInStorageFile += keySize;

long valueSize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, offsetInStorageFile);
offsetInStorageFile += Long.BYTES;
MemorySegment key = storageMapped.asSlice(offsetInStorageFile - keySize - Long.BYTES, keySize);
MemorySegment value;
if (valueSize == -1) {
value = null;
} else {
value = storageMapped.asSlice(offsetInStorageFile, valueSize);
@Override
public void compact() throws IOException {
var iterator = get(null, null);
if (!iterator.hasNext()) {
return;
}
return new BaseEntry<>(key, value);
storage.compact(iterator, get(null, null));
map.clear();
}

@Override
public void flush() throws IOException {
writeMapIntoFile();
if (!map.isEmpty()) incTotalSStablesAmount();
}

private void incTotalSStablesAmount() throws IOException {
int totalSStables = getTotalSStables();
Files.writeString(metaFilePath, String.valueOf(totalSStables + 1));
}

@Override
public void close() throws IOException {
if (arena.scope().isAlive()) {
arena.close();
}
flush();
for (FileChannel fc : sstableFileChannels) {
if (fc.isOpen()) fc.close();
}
for (FileChannel fc : indexFileChannels) {
if (fc.isOpen()) fc.close();
if (!map.isEmpty()) {
writeMapIntoFile();
storage.incTotalSStablesAmount();
}
}

private void writeMapIntoFile() throws IOException {
if (map.isEmpty()) {
return;
}

int currSStableNum = getTotalSStables();
Path sstablePath = storagePath.resolve(SSTABLE_BASE_NAME + currSStableNum);
Path indexPath = storagePath.resolve(INDEX_BASE_NAME + currSStableNum);

StorageWriter.writeSStableAndIndex(sstablePath,
calcMapByteSizeInFile(),
indexPath,
calcIndexByteSizeInFile(),
map);
}

private int getTotalSStables() {
return sstableFileChannels.size();
}

private long calcIndexByteSizeInFile() {
return (long) map.size() * (Integer.BYTES + Long.BYTES);
storage.writeMapIntoFile(
mapByteSizeInFile(),
indexByteSizeInFile(),
map
);
}

private long calcMapByteSizeInFile() {
private long mapByteSizeInFile() {
long size = 0;
for (var entry : map.values()) {
size += 2 * Long.BYTES;
size += Storage.BYTES_TO_STORE_ENTRY_SIZE;
size += entry.key().byteSize();
if (entry.value() != null) {
size += entry.value().byteSize();
Expand All @@ -229,17 +85,29 @@ private long calcMapByteSizeInFile() {
return size;
}

private long indexByteSizeInFile() {
return (long) map.size() * Storage.INDEX_ENTRY_SIZE;
}

@Override
public void close() throws IOException {
if (arena.scope().isAlive()) {
arena.close();
}
flush();
storage.close();
}

public static int compareMemorySegments(MemorySegment segment1, MemorySegment segment2) {
long mismatch = segment1.mismatch(segment2);
if (mismatch == -1) {
long offset = segment1.mismatch(segment2);
if (offset == -1) {
return 0;
} else if (mismatch == segment1.byteSize()) {
} else if (offset == segment1.byteSize()) {
return -1;
} else if (mismatch == segment2.byteSize()) {
} else if (offset == segment2.byteSize()) {
return 1;
}
return Byte.compare(segment1.get(ValueLayout.JAVA_BYTE, mismatch),
segment2.get(ValueLayout.JAVA_BYTE, mismatch));
return Byte.compare(segment1.get(ValueLayout.JAVA_BYTE, offset), segment2.get(ValueLayout.JAVA_BYTE, offset));
}

public static int compareMemorySegmentsUsingOffset(MemorySegment segment1,
Expand All @@ -261,6 +129,5 @@ public static int compareMemorySegmentsUsingOffset(MemorySegment segment1,
}
return Byte.compare(segment1.get(ValueLayout.JAVA_BYTE, mismatch),
segment2.get(ValueLayout.JAVA_BYTE, segment2Offset + mismatch));

}
}
46 changes: 8 additions & 38 deletions src/main/java/ru/vk/itmo/abramovilya/DaoIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
import ru.vk.itmo.abramovilya.table.TableEntry;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
Expand All @@ -18,19 +16,17 @@ class DaoIterator implements Iterator<Entry<MemorySegment>> {
private final PriorityQueue<TableEntry> priorityQueue = new PriorityQueue<>();
private final MemorySegment from;
private final MemorySegment to;
private final List<MemorySegment> sstableMappedList;
private final List<MemorySegment> indexMappedList;
private final Storage storage;

DaoIterator(int totalSStables,
MemorySegment from,
MemorySegment to,
List<MemorySegment> sstableMappedList,
List<MemorySegment> indexMappedList,
Storage storage,
NavigableMap<MemorySegment, Entry<MemorySegment>> memTable) {

this.from = from;
this.to = to;
this.sstableMappedList = sstableMappedList;
this.indexMappedList = indexMappedList;
this.storage = storage;

NavigableMap<MemorySegment, Entry<MemorySegment>> subMap = getSubMap(memTable);
for (int i = 0; i < totalSStables; i++) {
Expand All @@ -39,8 +35,8 @@ class DaoIterator implements Iterator<Entry<MemorySegment>> {
priorityQueue.add(new SSTable(
i,
offset,
sstableMappedList.get(i),
indexMappedList.get(i)
storage.mappedSStable(i),
storage.mappedIndex(i)
).currentEntry());
}
}
Expand Down Expand Up @@ -115,33 +111,7 @@ private void cleanUpSStableQueue() {
}
}

private long findOffsetInIndex(MemorySegment from, MemorySegment to, int i) {
long readOffset = 0;
MemorySegment storageMapped = sstableMappedList.get(i);
MemorySegment indexMapped = indexMappedList.get(i);

if (from == null && to == null) {
return Integer.BYTES;
} else if (from == null) {
long firstKeySize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, readOffset);
readOffset += Long.BYTES;
MemorySegment firstKey = storageMapped.asSlice(readOffset, firstKeySize);
if (DaoImpl.compareMemorySegments(firstKey, to) >= 0) {
return -1;
}
return Integer.BYTES;
} else {
int foundIndex = DaoImpl.upperBound(from, storageMapped, indexMapped, indexMapped.byteSize());
long keyStorageOffset = DaoImpl.getKeyStorageOffset(indexMapped, foundIndex);
long keySize = storageMapped.get(ValueLayout.JAVA_LONG_UNALIGNED, keyStorageOffset);
keyStorageOffset += Long.BYTES;

if (DaoImpl.compareMemorySegmentsUsingOffset(from, storageMapped, keyStorageOffset, keySize) > 0
|| (to != null && DaoImpl.compareMemorySegmentsUsingOffset(
to, storageMapped, keyStorageOffset, keySize) <= 0)) {
return -1;
}
return (long) foundIndex * (Integer.BYTES + Long.BYTES) + Integer.BYTES;
}
private long findOffsetInIndex(MemorySegment from, MemorySegment to, int fileNum) {
return storage.findOffsetInIndex(from, to, fileNum);
}
}
Loading

0 comments on commit ffc3b79

Please sign in to comment.