From 1329a263160bc4bf6dcad470d49630cc86598db4 Mon Sep 17 00:00:00 2001 From: yulalenk Date: Thu, 1 Feb 2024 18:14:55 +0300 Subject: [PATCH] add filter --- .../AbstractMemorySegmentDao.java | 59 -- .../ru/vk/itmo/alenkovayulya/AlenkovaDao.java | 113 ++++ .../ru/vk/itmo/alenkovayulya/DiskStorage.java | 517 ++++++++++++++++++ .../ru/vk/itmo/alenkovayulya/InMemoryDao.java | 14 - .../vk/itmo/alenkovayulya/MergeIterator.java | 146 +++++ .../vk/itmo/alenkovayulya/PersistenceDao.java | 37 -- .../alenkovayulya/PersistentFileHandler.java | 103 ---- .../bloomfilter/BloomFilter.java | 89 +++ .../bloomfilter/HashFunction.java | 137 +++++ .../itmo/alenkovayulya/bloomfilter/Utils.java | 31 ++ ...tory.java => MemorySegmentDaoFactory.java} | 30 +- .../java/ru/vk/itmo/BasicConcurrentTest.java | 2 +- 12 files changed, 1048 insertions(+), 230 deletions(-) delete mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/AbstractMemorySegmentDao.java create mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/AlenkovaDao.java create mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/DiskStorage.java delete mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/InMemoryDao.java create mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/MergeIterator.java delete mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/PersistenceDao.java delete mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/PersistentFileHandler.java create mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/BloomFilter.java create mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/HashFunction.java create mode 100644 src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/Utils.java rename src/main/java/ru/vk/itmo/test/alenkovayulya/{InMemoryDaoFactory.java => MemorySegmentDaoFactory.java} (53%) diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/AbstractMemorySegmentDao.java b/src/main/java/ru/vk/itmo/alenkovayulya/AbstractMemorySegmentDao.java deleted file mode 100644 index 590ac2df1..000000000 --- a/src/main/java/ru/vk/itmo/alenkovayulya/AbstractMemorySegmentDao.java +++ /dev/null @@ -1,59 +0,0 @@ -package ru.vk.itmo.alenkovayulya; - -import ru.vk.itmo.Dao; -import ru.vk.itmo.Entry; - -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.util.Iterator; -import java.util.SortedMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.function.Supplier; - -public abstract class AbstractMemorySegmentDao implements Dao> { - - protected static final SortedMap> entries = - new ConcurrentSkipListMap<>(AbstractMemorySegmentDao::compareSegments); - - @Override - public abstract Entry get(MemorySegment key); - - @Override - public Iterator> get(MemorySegment from, MemorySegment to) { - return getIterator(() -> { - if (from == null && to == null) { - return entries; - } else if (from == null) { - return entries.headMap(to); - } else if (to == null) { - return entries.tailMap(from); - } - return entries.subMap(from, to); - }); - - } - - @Override - public void upsert(Entry entry) { - entries.put(entry.key(), entry); - - } - - public static int compareSegments(MemorySegment memorySegment1, MemorySegment memorySegment2) { - long mismatchOffset = memorySegment1.mismatch(memorySegment2); - if (mismatchOffset == -1) { - return 0; - } else if (mismatchOffset == memorySegment1.byteSize()) { - return -1; - } else if (mismatchOffset == memorySegment2.byteSize()) { - return 1; - } - return memorySegment1.get(ValueLayout.JAVA_BYTE, mismatchOffset) - - memorySegment2.get(ValueLayout.JAVA_BYTE, mismatchOffset); - } - - private Iterator> getIterator(Supplier>> map) { - return map.get().values().iterator(); - } - -} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/AlenkovaDao.java b/src/main/java/ru/vk/itmo/alenkovayulya/AlenkovaDao.java new file mode 100644 index 000000000..685f9b209 --- /dev/null +++ b/src/main/java/ru/vk/itmo/alenkovayulya/AlenkovaDao.java @@ -0,0 +1,113 @@ +package ru.vk.itmo.alenkovayulya; + +import ru.vk.itmo.Config; +import ru.vk.itmo.Dao; +import ru.vk.itmo.Entry; + +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListMap; + +public class AlenkovaDao implements Dao> { + + private final Comparator comparator = AlenkovaDao::compare; + private final NavigableMap> storage = new ConcurrentSkipListMap<>(comparator); + private final Arena arena; + private final DiskStorage diskStorage; + private final Path path; + + public AlenkovaDao(Config config) throws IOException { + this.path = config.basePath().resolve("data"); + Files.createDirectories(path); + + arena = Arena.ofShared(); + + this.diskStorage = new DiskStorage(DiskStorage.loadOrRecover(path, arena)); + } + + static int compare(MemorySegment memorySegment1, MemorySegment memorySegment2) { + long mismatch = memorySegment1.mismatch(memorySegment2); + if (mismatch == -1) { + return 0; + } + + if (mismatch == memorySegment1.byteSize()) { + return -1; + } + + if (mismatch == memorySegment2.byteSize()) { + return 1; + } + byte b1 = memorySegment1.get(ValueLayout.JAVA_BYTE, mismatch); + byte b2 = memorySegment2.get(ValueLayout.JAVA_BYTE, mismatch); + return Byte.compare(b1, b2); + } + + @Override + public Iterator> get(MemorySegment from, MemorySegment to) { + return diskStorage.range(getInMemory(from, to), from, to); + } + + private Iterator> getInMemory(MemorySegment from, MemorySegment to) { + if (from == null && to == null) { + return storage.values().iterator(); + } + if (from == null) { + return storage.headMap(to).values().iterator(); + } + if (to == null) { + return storage.tailMap(from).values().iterator(); + } + return storage.subMap(from, to).values().iterator(); + } + + @Override + public void upsert(Entry entry) { + storage.put(entry.key(), entry); + } + + @Override + public Entry get(MemorySegment key) { + Entry entry = storage.get(key); + if (entry != null) { + if (entry.value() == null) { + return null; + } + return entry; + } + + Iterator> iterator = diskStorage.rangeWithBloorFilter(Collections.emptyIterator(), key, null); + + if (!iterator.hasNext()) { + return null; + } + Entry next = iterator.next(); + if (compare(next.key(), key) == 0) { + return next; + } + return null; + } + + @Override + public void compact() throws IOException { + DiskStorage.compact(path, this::all); + } + + @Override + public void close() throws IOException { + if (!arena.scope().isAlive()) { + return; + } + + arena.close(); + + if (!storage.isEmpty()) { + DiskStorage.saveNextSSTable(path, storage.values()); + } + } +} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/DiskStorage.java b/src/main/java/ru/vk/itmo/alenkovayulya/DiskStorage.java new file mode 100644 index 000000000..a42cacbd8 --- /dev/null +++ b/src/main/java/ru/vk/itmo/alenkovayulya/DiskStorage.java @@ -0,0 +1,517 @@ +package ru.vk.itmo.alenkovayulya; + +import ru.vk.itmo.BaseEntry; +import ru.vk.itmo.Entry; +import ru.vk.itmo.alenkovayulya.bloomfilter.BloomFilter; +import ru.vk.itmo.alenkovayulya.bloomfilter.Utils; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.*; +import java.util.*; +import java.util.stream.Stream; + +public class DiskStorage { + + public static final String SSTABLE_PREFIX = "sstable_"; + private final List segmentList; + + public DiskStorage(List segmentList) { + this.segmentList = segmentList; + } + + public Iterator> range( + Iterator> firstIterator, + MemorySegment from, + MemorySegment to) { + List>> iterators = new ArrayList<>(segmentList.size() + 1); + + for (MemorySegment memorySegment : segmentList) { + iterators.add(iterator(memorySegment, from, to)); + } + iterators.add(firstIterator); + + return new MergeIterator<>(iterators, Comparator.comparing(Entry::key, AlenkovaDao::compare)) { + @Override + protected boolean shouldSkip(Entry memorySegmentEntry) { + return memorySegmentEntry.value() == null; + } + }; + } + + public Iterator> rangeWithBloorFilter( + Iterator> firstIterator, + MemorySegment from, + MemorySegment to) { + List>> iterators = new ArrayList<>(segmentList.size() + 1); + + for (MemorySegment memorySegment : segmentList) { + if (checkBloom(memorySegment, from)) { + iterators.add(iterator(memorySegment, from, to)); + } + } + iterators.add(firstIterator); + + return new MergeIterator<>(iterators, Comparator.comparing(Entry::key, AlenkovaDao::compare)) { + @Override + protected boolean shouldSkip(Entry memorySegmentEntry) { + return memorySegmentEntry.value() == null; + } + }; + } + + public static void saveNextSSTable(Path storagePath, Iterable> iterable) + throws IOException { + final Path indexTmp = storagePath.resolve("index.tmp"); + final Path indexFile = storagePath.resolve("index.idx"); + + try { + Files.createFile(indexFile); + } catch (FileAlreadyExistsException ignored) { + // it is ok, actually it is normal state + } + List existedFiles = Files.readAllLines(indexFile, StandardCharsets.UTF_8); + + String newFileName = SSTABLE_PREFIX + existedFiles.size(); + + long dataSize = 0; + long count = 0; + for (Entry entry : iterable) { + dataSize += entry.key().byteSize(); + MemorySegment value = entry.value(); + if (value != null) { + dataSize += value.byteSize(); + } + count++; + } + + if (count == 0) { + return; + } + + // According to our simplified implementation of DB, + // I rely on the fact that 'count' is always less than Integer.MAX_VALUE + BloomFilter bloom = BloomFilter.createBloom((int) count); + + for (Entry entry : iterable) { + bloom.add(entry.key()); + } + + // 8 bytes for size + actual filter_size + long bloomSize = (long) bloom.getFilterSize() * Long.BYTES + Long.BYTES; + + long indexSize = count * 2 * Long.BYTES; + + try ( + FileChannel fileChannel = FileChannel.open( + storagePath.resolve(newFileName), + StandardOpenOption.WRITE, + StandardOpenOption.READ, + StandardOpenOption.CREATE + ); + Arena writeArena = Arena.ofConfined() + ) { + MemorySegment fileSegment = fileChannel.map( + FileChannel.MapMode.READ_WRITE, + 0, + bloomSize + indexSize + dataSize, + writeArena + ); + + // |bloom_size|long_value1|long_value2|long_value3|... + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, 0, bloomSize); + long bloomOffset = Long.BYTES; + for (Long hash : bloom.getFilter().getLongs()) { + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, bloomOffset, hash); + bloomOffset += Long.BYTES; + } + + // index: + // |key0_Start|value0_Start|key1_Start|value1_Start|key2_Start|value2_Start|... + // key0_Start = data start = end of index + long dataOffset = bloomOffset + indexSize; + long indexOffset = bloomOffset; + for (Entry entry : iterable) { + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); + dataOffset += entry.key().byteSize(); + indexOffset += Long.BYTES; + + MemorySegment value = entry.value(); + if (value == null) { + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, tombstone(dataOffset)); + } else { + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); + dataOffset += value.byteSize(); + } + indexOffset += Long.BYTES; + } + + // data: + // |key0|value0|key1|value1|... + dataOffset = bloomOffset + indexSize; + for (Entry entry : iterable) { + MemorySegment key = entry.key(); + MemorySegment.copy(key, 0, fileSegment, dataOffset, key.byteSize()); + dataOffset += key.byteSize(); + + MemorySegment value = entry.value(); + if (value != null) { + MemorySegment.copy(value, 0, fileSegment, dataOffset, value.byteSize()); + dataOffset += value.byteSize(); + } + } + } + + List list = new ArrayList<>(existedFiles.size() + 1); + list.addAll(existedFiles); + list.add(newFileName); + Files.write( + indexTmp, + list, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING + ); + + Files.deleteIfExists(indexFile); + + Files.move(indexTmp, indexFile, StandardCopyOption.ATOMIC_MOVE); + } + + public static void compact(Path storagePath, Iterable> iterable) + throws IOException { + + String newFileName = "compaction.tmp"; + Path compactionTmpFile = storagePath.resolve(newFileName); + + + long dataSize = 0; + long count = 0; + for (Entry entry : iterable) { + dataSize += entry.key().byteSize(); + MemorySegment value = entry.value(); + if (value != null) { + dataSize += value.byteSize(); + } + count++; + } + + if (count == 0) { + return; + } + + BloomFilter bloom = BloomFilter.createBloom((int) count); + + for (Entry entry : iterable) { + bloom.add(entry.key()); + } + + // 8 bytes for size + actual filter_size + long bloomSize = (long) bloom.getFilterSize() * Long.BYTES + Long.BYTES; + long indexSize = count * 2 * Long.BYTES; + + try ( + FileChannel fileChannel = FileChannel.open( + compactionTmpFile, + StandardOpenOption.WRITE, + StandardOpenOption.READ, + StandardOpenOption.CREATE + ); + Arena writeArena = Arena.ofConfined() + ) { + MemorySegment fileSegment = fileChannel.map( + FileChannel.MapMode.READ_WRITE, + 0, + bloomSize + indexSize + dataSize, + writeArena + ); + + // |bloom_size|long_value1|long_value2|long_value3|... + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, 0, bloomSize); + long bloomOffset = Long.BYTES; + for (Long hash : bloom.getFilter().getLongs()) { + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, bloomOffset, hash); + bloomOffset += Long.BYTES; + } + + // index: + // |key0_Start|value0_Start|key1_Start|value1_Start|key2_Start|value2_Start|... + // key0_Start = data start = end of index + long dataOffset = bloomOffset + indexSize; + long indexOffset = bloomOffset; + for (Entry entry : iterable) { + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); + dataOffset += entry.key().byteSize(); + indexOffset += Long.BYTES; + + MemorySegment value = entry.value(); + if (value == null) { + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, tombstone(dataOffset)); + } else { + fileSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, dataOffset); + dataOffset += value.byteSize(); + } + indexOffset += Long.BYTES; + } + + // data: + // |key0|value0|key1|value1|... + dataOffset = bloomSize + indexSize; + for (Entry entry : iterable) { + MemorySegment key = entry.key(); + MemorySegment.copy(key, 0, fileSegment, dataOffset, key.byteSize()); + dataOffset += key.byteSize(); + + MemorySegment value = entry.value(); + if (value != null) { + MemorySegment.copy(value, 0, fileSegment, dataOffset, value.byteSize()); + dataOffset += value.byteSize(); + } + } + } + + Files.move( + compactionTmpFile, + storagePath.resolve("compaction"), + StandardCopyOption.ATOMIC_MOVE, + StandardCopyOption.REPLACE_EXISTING + ); + + finalizeCompaction(storagePath); + } + + private static void finalizeCompaction(Path storagePath) throws IOException { + try (Stream stream = + Files.find( + storagePath, + 1, + (path, ignored) -> path.getFileName().toString().startsWith(SSTABLE_PREFIX))) { + stream.forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + Path indexTmp = storagePath.resolve("index.tmp"); + Path indexFile = storagePath.resolve("index.idx"); + + Files.deleteIfExists(indexFile); + Files.deleteIfExists(indexTmp); + + Path compactionFile = compactionFile(storagePath); + boolean noData = Files.size(compactionFile) == 0; + + Files.write( + indexTmp, + noData ? Collections.emptyList() : Collections.singleton(SSTABLE_PREFIX + "0"), + StandardOpenOption.WRITE, + StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING + ); + + Files.move(indexTmp, indexFile, StandardCopyOption.ATOMIC_MOVE); + if (noData) { + Files.delete(compactionFile); + } else { + Files.move(compactionFile, storagePath.resolve(SSTABLE_PREFIX + "0"), StandardCopyOption.ATOMIC_MOVE); + } + } + + private static Path compactionFile(Path storagePath) { + return storagePath.resolve("compaction"); + } + + public static List loadOrRecover(Path storagePath, Arena arena) throws IOException { + if (Files.exists(compactionFile(storagePath))) { + finalizeCompaction(storagePath); + } + + Path indexTmp = storagePath.resolve("index.tmp"); + Path indexFile = storagePath.resolve("index.idx"); + + if (!Files.exists(indexFile)) { + if (Files.exists(indexTmp)) { + Files.move(indexTmp, indexFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING); + } else { + Files.createFile(indexFile); + } + } + + List existedFiles = Files.readAllLines(indexFile, StandardCharsets.UTF_8); + List result = new ArrayList<>(existedFiles.size()); + for (String fileName : existedFiles) { + Path file = storagePath.resolve(fileName); + try (FileChannel fileChannel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) { + MemorySegment fileSegment = fileChannel.map( + FileChannel.MapMode.READ_WRITE, + 0, + Files.size(file), + arena + ); + result.add(fileSegment); + } + } + + return result; + } + + public static long indexOf(MemorySegment segment, MemorySegment key) { + long bloomSize = bloomSize(segment); + long recordsCount = recordsCount(segment, bloomSize); + + long left = 0; + long right = recordsCount - 1; + while (left <= right) { + long mid = (left + right) >>> 1; + + long startOfKey = startOfKey(segment, mid, bloomSize); + long endOfKey = endOfKey(segment, mid, bloomSize); + long mismatch = MemorySegment.mismatch(segment, startOfKey, endOfKey, key, 0, key.byteSize()); + if (mismatch == -1) { + return mid; + } + + if (mismatch == key.byteSize()) { + right = mid - 1; + continue; + } + + if (mismatch == endOfKey - startOfKey) { + left = mid + 1; + continue; + } + + int b1 = Byte.toUnsignedInt(segment.get(ValueLayout.JAVA_BYTE, startOfKey + mismatch)); + int b2 = Byte.toUnsignedInt(key.get(ValueLayout.JAVA_BYTE, mismatch)); + if (b1 > b2) { + right = mid - 1; + } else { + left = mid + 1; + } + } + + return tombstone(left); + } + + + public static long recordsCount(MemorySegment segment, long bloomSize) { + long indexSize = indexSize(segment, bloomSize); + return indexSize / Long.BYTES / 2; + } + + public static long bloomSize(MemorySegment segment) { + return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, 0); + } + + public static long indexSize(MemorySegment segment, long bloomSize) { + return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, bloomSize) - bloomSize; + } + + + private static Iterator> iterator(MemorySegment page, MemorySegment from, MemorySegment to) { + long recordIndexFrom = from == null ? 0 : normalize(indexOf(page, from)); + long bloomSize = bloomSize(page); + + long recordIndexTo = to == null ? recordsCount(page, bloomSize) : normalize(indexOf(page, to)); + long recordsCount = recordsCount(page, bloomSize); + + return new Iterator<>() { + long index = recordIndexFrom; + + @Override + public boolean hasNext() { + return index < recordIndexTo; + } + + @Override + public Entry next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + MemorySegment key = slice(page, startOfKey(page, index, bloomSize), endOfKey(page, index, bloomSize)); + long startOfValue = startOfValue(page, index, bloomSize); + MemorySegment value = + startOfValue < 0 + ? null + : slice(page, startOfValue, endOfValue(page, index, recordsCount, bloomSize)); + index++; + return new BaseEntry<>(key, value); + } + }; + } + + public static MemorySegment slice(MemorySegment page, long start, long end) { + return page.asSlice(start, end - start); + } + + public static long startOfKey(MemorySegment segment, long recordIndex, long bloomSize) { + return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, bloomSize + recordIndex * 2 * Long.BYTES); + } + + public static long endOfKey(MemorySegment segment, long recordIndex, long bloomSize) { + return normalizedStartOfValue(segment, recordIndex, bloomSize); + } + + public static long normalizedStartOfValue(MemorySegment segment, long recordIndex, long bloomSize) { + return normalize(startOfValue(segment, recordIndex, bloomSize)); + } + + public static long startOfValue(MemorySegment segment, long recordIndex, long bloomSize) { + return segment.get(ValueLayout.JAVA_LONG_UNALIGNED, bloomSize + recordIndex * 2 * Long.BYTES + Long.BYTES); + } + + public static long endOfValue(MemorySegment segment, long recordIndex, long recordsCount, long bloomSize) { + if (recordIndex < recordsCount - 1) { + return startOfKey(segment, recordIndex + 1, bloomSize); + } + return segment.byteSize(); + } + + private static long tombstone(long offset) { + return 1L << 63 | offset; + } + + private static long normalize(long value) { + return value & ~(1L << 63); + } + + public static boolean checkBloom(MemorySegment page, MemorySegment key) { + long bloomSize = bloomSize(page); + int bitsetSize = (int) (bloomSize - Long.BYTES) * 8; + int hashCount = Utils.countOptimalHashFunctions(bitsetSize, (int) recordsCount(page, bloomSize)); + long[] indexes = new long[hashCount]; + Utils.hashKey(key, indexes); + BloomFilter.fillIndexes(indexes[1], indexes[0], hashCount, bitsetSize, indexes); + + return checkIndexes(page, indexes); + } + + public static boolean checkIndexes(MemorySegment page, long[] indexes) { + for (long index : indexes) { + int pageOffset = (int) index >> 6; + if (!checkBit( + page.get(ValueLayout.JAVA_LONG_UNALIGNED, Long.BYTES + pageOffset * Long.BYTES), + index - (pageOffset << 6) + )) { + return false; + } + } + return true; + } + + public static boolean checkBit(long value, long i) { + return (value & (1L << (63 - i))) != 0; + } + + public List getSegmentList() { + return segmentList; + } +} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/InMemoryDao.java b/src/main/java/ru/vk/itmo/alenkovayulya/InMemoryDao.java deleted file mode 100644 index a0c6789e0..000000000 --- a/src/main/java/ru/vk/itmo/alenkovayulya/InMemoryDao.java +++ /dev/null @@ -1,14 +0,0 @@ -package ru.vk.itmo.alenkovayulya; - -import ru.vk.itmo.Entry; - -import java.lang.foreign.MemorySegment; - -public class InMemoryDao extends AbstractMemorySegmentDao { - - @Override - public Entry get(MemorySegment key) { - return entries.get(key); - } - -} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/MergeIterator.java b/src/main/java/ru/vk/itmo/alenkovayulya/MergeIterator.java new file mode 100644 index 000000000..9f26fc860 --- /dev/null +++ b/src/main/java/ru/vk/itmo/alenkovayulya/MergeIterator.java @@ -0,0 +1,146 @@ +package ru.vk.itmo.alenkovayulya; + +import java.util.*; + +public class MergeIterator implements Iterator { + + private final PriorityQueue> priorityQueue; + private final Comparator comparator; + + private static class PeekIterator implements Iterator { + + public final int id; + private final Iterator delegate; + private T peek; + + private PeekIterator(int id, Iterator delegate) { + this.id = id; + this.delegate = delegate; + } + + @Override + public boolean hasNext() { + if (peek == null) { + return delegate.hasNext(); + } + return true; + } + + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + T peek = peek(); + this.peek = null; + return peek; + } + + private T peek() { + if (peek == null) { + if (!delegate.hasNext()) { + return null; + } + peek = delegate.next(); + } + return peek; + } + } + + PeekIterator nextIterator; + + public MergeIterator(Collection> iterators, Comparator comparator) { + this.comparator = comparator; + Comparator> peekComp = (o1, o2) -> comparator.compare(o1.peek(), o2.peek()); + priorityQueue = new PriorityQueue<>( + iterators.size(), + peekComp.thenComparing(o -> -o.id) + ); + + int id = 0; + for (Iterator iterator : iterators) { + if (iterator.hasNext()) { + priorityQueue.add(new PeekIterator<>(id++, iterator)); + } + } + } + + private PeekIterator peek() { + while (nextIterator == null) { + nextIterator = priorityQueue.poll(); + if (nextIterator == null) { + return null; + } + + skipIteratorsWithSameKey(); + + if (nextIterator.peek() == null) { + nextIterator = null; + continue; + } + + if (shouldSkip(nextIterator.peek())) { + moveNextAndPutBack(nextIterator); + nextIterator = null; + } + } + + return nextIterator; + } + + private void skipIteratorsWithSameKey() { + while (true) { + PeekIterator next = priorityQueue.peek(); + if (next == null) { + break; + } + + if (!skipTheSameKey(next)) { + break; + } + } + } + + private boolean skipTheSameKey(PeekIterator next) { + int compare = comparator.compare(nextIterator.peek(), next.peek()); + if (compare != 0) { + return false; + } + + PeekIterator poll = priorityQueue.poll(); + if (poll != null) { + moveNextAndPutBack(poll); + } + return true; + } + + private void moveNextAndPutBack(PeekIterator poll) { + poll.next(); + if (poll.hasNext()) { + priorityQueue.add(poll); + } + } + + protected boolean shouldSkip(T t) { + return false; + } + + @Override + public boolean hasNext() { + return peek() != null; + } + + @Override + public T next() { + PeekIterator nextIterator = peek(); + if (nextIterator == null) { + throw new NoSuchElementException(); + } + T nextValue = nextIterator.next(); + this.nextIterator = null; + if (nextIterator.hasNext()) { + priorityQueue.add(nextIterator); + } + return nextValue; + } +} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/PersistenceDao.java b/src/main/java/ru/vk/itmo/alenkovayulya/PersistenceDao.java deleted file mode 100644 index 12b784792..000000000 --- a/src/main/java/ru/vk/itmo/alenkovayulya/PersistenceDao.java +++ /dev/null @@ -1,37 +0,0 @@ -package ru.vk.itmo.alenkovayulya; - -import ru.vk.itmo.Config; -import ru.vk.itmo.Entry; - -import java.io.IOException; -import java.lang.foreign.MemorySegment; - -public class PersistenceDao extends AbstractMemorySegmentDao { - - private final PersistentFileHandler persistentFileHandler; - - public PersistenceDao(Config config) { - - this.persistentFileHandler = new PersistentFileHandler(config); - } - - @Override - public Entry get(MemorySegment key) { - - Entry memoryEntry = entries.get(key); - - if (memoryEntry == null) { - return persistentFileHandler.readByKey(key); - } - - return memoryEntry; - } - - @Override - public void close() throws IOException { - if (!entries.isEmpty()) { - persistentFileHandler.writeToFile(entries.values()); - } - entries.clear(); - } -} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/PersistentFileHandler.java b/src/main/java/ru/vk/itmo/alenkovayulya/PersistentFileHandler.java deleted file mode 100644 index 810363a1a..000000000 --- a/src/main/java/ru/vk/itmo/alenkovayulya/PersistentFileHandler.java +++ /dev/null @@ -1,103 +0,0 @@ -package ru.vk.itmo.alenkovayulya; - -import ru.vk.itmo.BaseEntry; -import ru.vk.itmo.Config; -import ru.vk.itmo.Entry; - -import java.io.IOException; -import java.lang.foreign.Arena; -import java.lang.foreign.MemorySegment; -import java.lang.foreign.ValueLayout; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collection; - -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.READ; -import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; -import static java.nio.file.StandardOpenOption.WRITE; - -public class PersistentFileHandler { - private static final String STORAGE_NAME = "store"; - private static final String STORAGE_INDEXES_NAME = "store_indexes"; - private final Path storagePath; - private final Path storageIndexesPath; - - public PersistentFileHandler(Config config) { - this.storagePath = config.basePath().resolve(Path.of(STORAGE_NAME)); - this.storageIndexesPath = config.basePath().resolve(Path.of(STORAGE_INDEXES_NAME)); - - } - - public Entry readByKey(MemorySegment key) { - try (var storageChannel = FileChannel.open(storagePath, READ); - var indexesChannel = FileChannel.open(storageIndexesPath, READ)) { - var storageSegment = storageChannel.map( - FileChannel.MapMode.READ_ONLY, 0, Files.size(storagePath), Arena.global()); - var indexesSegment = indexesChannel.map( - FileChannel.MapMode.READ_ONLY, 0, Files.size(storageIndexesPath), Arena.global()); - - long i = 0; - var indexesNumber = indexesSegment.byteSize() / Long.BYTES; - - while (i <= indexesNumber) { - var offset = indexesSegment.get(ValueLayout.JAVA_LONG_UNALIGNED, i * Long.BYTES); - var savedKeySize = storageSegment.get(ValueLayout.JAVA_LONG_UNALIGNED, offset); - offset += Long.BYTES; - if (savedKeySize == key.byteSize()) { - var savedKey = storageSegment.asSlice(offset, savedKeySize); - offset += savedKeySize; - if (AbstractMemorySegmentDao.compareSegments(savedKey, key) == 0) { - var valueSize = storageSegment.get(ValueLayout.JAVA_LONG_UNALIGNED, offset); - offset += Long.BYTES; - MemorySegment savedValue = storageSegment.asSlice(offset, valueSize); - return new BaseEntry<>(savedKey, savedValue); - } - } - i++; - } - return null; - - } catch (IOException e) { - return null; - } - } - - public void writeToFile(Collection> entries) throws IOException { - if (entries.isEmpty()) { - return; - } - - long indexesSize = (long) entries.size() * Long.BYTES; - long storageSize = 0; - for (Entry entry : entries) { - storageSize += entry.key().byteSize() + entry.value().byteSize() + 2L * Long.BYTES; - } - - try (var storageChannel = FileChannel.open(storagePath, TRUNCATE_EXISTING, CREATE, WRITE, READ); - var indexesChannel = FileChannel.open(storageIndexesPath, TRUNCATE_EXISTING, CREATE, WRITE, READ)) { - var storageSegment = storageChannel.map( - FileChannel.MapMode.READ_WRITE, 0, storageSize, Arena.global()); - var indexesSegment = indexesChannel.map( - FileChannel.MapMode.READ_WRITE, 0, indexesSize, Arena.global()); - long indexOffset = 0; - long storageRecordOffset = 0; - for (Entry entry : entries) { - indexesSegment.set(ValueLayout.JAVA_LONG_UNALIGNED, indexOffset, storageRecordOffset); - indexOffset += Long.BYTES; - - storageRecordOffset += writeEntity(entry.key(), storageSegment, storageRecordOffset); - storageRecordOffset += writeEntity(entry.value(), storageSegment, storageRecordOffset); - } - } - } - - private long writeEntity(MemorySegment entityToWrite, MemorySegment storage, long storageOffset) { - long entityToWriteSize = entityToWrite.byteSize(); - storage.set(ValueLayout.JAVA_LONG_UNALIGNED, storageOffset, entityToWriteSize); - MemorySegment mappedRec = storage.asSlice(storageOffset + Long.BYTES, entityToWriteSize); - mappedRec.copyFrom(entityToWrite); - return entityToWriteSize + Long.BYTES; - } -} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/BloomFilter.java b/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/BloomFilter.java new file mode 100644 index 000000000..4a715a7f1 --- /dev/null +++ b/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/BloomFilter.java @@ -0,0 +1,89 @@ +package ru.vk.itmo.alenkovayulya.bloomfilter; + +import java.lang.foreign.MemorySegment; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class BloomFilter { + private final FilterStorage filter; + private final int hashCount; + private int bitsSize; + + public static BloomFilter createBloom(int n) { + return new BloomFilter(n, 0.001); + } + + private BloomFilter(int n, double p) { + bitsSize = (int) ((-n * Math.log(p)) / Math.pow(Math.log(2), 2)); + int bytesSize = (bitsSize + 7) / 8; + int capacity = bytesSize / 8 + 1; + bitsSize = capacity * 64; + filter = new FilterStorage(capacity); + + hashCount = Utils.countOptimalHashFunctions(bitsSize, n); + } + + + public static void fillIndexes(long base, long inc, int hashCount, int bitsetSize, long[] result) { + for (int i = 0; i < hashCount; i++) { + result[i] = (int) Utils.abs(base % bitsetSize); + base += inc; + } + } + + public void add(MemorySegment key) { + long[] indexes = new long[hashCount]; + Utils.hashKey(key, indexes); + + fillIndexes(indexes[1], indexes[0], hashCount, bitsSize, indexes); + setIndexes(indexes); + } + + private void setIndexes(long[] indexes) { + for (long index : indexes) { + filter.set((int) index); + } + } + + public FilterStorage getFilter() { + return filter; + } + + public int getFilterSize() { + return filter.size(); + } + + public static class FilterStorage { + + private final List longs; + + public FilterStorage(int capacity) { + this.longs = new ArrayList<>(Collections.nCopies(capacity, 0L)); + } + + + public boolean get(int i) { + int arrayOffset = i >> 6; + int longOffset = i - (arrayOffset << 6); + + return ((1L << (63 - longOffset)) & longs.get(arrayOffset)) != 0; + } + + public void set(int i) { + int arrayOffset = i >> 6; + int longOffset = i - (arrayOffset << 6); + + longs.set(arrayOffset, longs.get(arrayOffset) | (1L << (63 - longOffset))); + } + + public List getLongs() { + return Collections.unmodifiableList(longs); + } + + public int size() { + return longs.size(); + } + } + +} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/HashFunction.java b/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/HashFunction.java new file mode 100644 index 000000000..a017662f6 --- /dev/null +++ b/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/HashFunction.java @@ -0,0 +1,137 @@ +package ru.vk.itmo.alenkovayulya.bloomfilter; + +import java.nio.ByteBuffer; +public final class HashFunction { + private static final long C1 = 0x87c37b91114253d5L; + private static final long C2 = 0x4cf5ad432745937fL; + private static final int R1 = 31; + private static final int R2 = 27; + private static final int R3 = 33; + private static final int M = 5; + private static final int N1 = 0x52dce729; + private static final int N2 = 0x38495ab5; + + private static long fmix64(long hash) { + hash ^= hash >>> 33; + hash *= 0xff51afd7ed558ccdL; + hash ^= hash >>> 33; + hash *= 0xc4ceb9fe1a85ec53L; + hash ^= hash >>> 33; + return hash; + } + + public static void hash(final ByteBuffer data, final int offset, + final int length, final int seed, long[] indexes) { + hash128x64Internal(data, offset, length, seed & 0xffffffffL, indexes); + } + + private static long getLittleEndianLong(final ByteBuffer data, final int index) { + return ((long) data.get(index) & 0xff) + | ((long) data.get(index + 1) & 0xff) << 8 + | ((long) data.get(index + 2) & 0xff) << 16 + | ((long) data.get(index + 3) & 0xff) << 24 + | ((long) data.get(index + 4) & 0xff) << 32 + | ((long) data.get(index + 5) & 0xff) << 40 + | ((long) data.get(index + 6) & 0xff) << 48 + | ((long) data.get(index + 7) & 0xff) << 56; + } + + @SuppressWarnings("fallthrough") + private static void hash128x64Internal(final ByteBuffer data, final int offset, + final int length, final long seed, long[] indexes) { + long h1 = seed; + long h2 = seed; + final int nblocks = length >> 4; + + // body + for (int i = 0; i < nblocks; i++) { + final int index = offset + (i << 4); + long k1 = getLittleEndianLong(data, index); + long k2 = getLittleEndianLong(data, index + 8); + + // mix functions for k1 + k1 *= C1; + k1 = Long.rotateLeft(k1, R1); + k1 *= C2; + h1 ^= k1; + h1 = Long.rotateLeft(h1, R2); + h1 += h2; + h1 = h1 * M + N1; + + // mix functions for k2 + k2 *= C2; + k2 = Long.rotateLeft(k2, R3); + k2 *= C1; + h2 ^= k2; + h2 = Long.rotateLeft(h2, R1); + h2 += h1; + h2 = h2 * M + N2; + } + + // tail + long k1 = 0; + long k2 = 0; + final int index = offset + (nblocks << 4); + switch (offset + length - index) { + case 15: + k2 ^= ((long) data.get(index + 14) & 0xff) << 48; + case 14: + k2 ^= ((long) data.get(index + 13) & 0xff) << 40; + case 13: + k2 ^= ((long) data.get(index + 12) & 0xff) << 32; + case 12: + k2 ^= ((long) data.get(index + 11) & 0xff) << 24; + case 11: + k2 ^= ((long) data.get(index + 10) & 0xff) << 16; + case 10: + k2 ^= ((long) data.get(index + 9) & 0xff) << 8; + case 9: + k2 ^= data.get(index + 8) & 0xff; + k2 *= C2; + k2 = Long.rotateLeft(k2, R3); + k2 *= C1; + h2 ^= k2; + case 8: + k1 ^= ((long) data.get(index + 7) & 0xff) << 56; + case 7: + k1 ^= ((long) data.get(index + 6) & 0xff) << 48; + case 6: + k1 ^= ((long) data.get(index + 5) & 0xff) << 40; + case 5: + k1 ^= ((long) data.get(index + 4) & 0xff) << 32; + case 4: + k1 ^= ((long) data.get(index + 3) & 0xff) << 24; + case 3: + k1 ^= ((long) data.get(index + 2) & 0xff) << 16; + case 2: + k1 ^= ((long) data.get(index + 1) & 0xff) << 8; + case 1: + k1 ^= data.get(index) & 0xff; + k1 *= C1; + k1 = Long.rotateLeft(k1, R1); + k1 *= C2; + h1 ^= k1; + default: + break; + } + + // finalization + h1 ^= length; + h2 ^= length; + + h1 += h2; + h2 += h1; + + h1 = fmix64(h1); + h2 = fmix64(h2); + + h1 += h2; + h2 += h1; + + indexes[0] = h1; + indexes[1] = h2; + } + private HashFunction() { + } + +} diff --git a/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/Utils.java b/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/Utils.java new file mode 100644 index 000000000..058627e38 --- /dev/null +++ b/src/main/java/ru/vk/itmo/alenkovayulya/bloomfilter/Utils.java @@ -0,0 +1,31 @@ +package ru.vk.itmo.alenkovayulya.bloomfilter; + +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; + +public class Utils { + + private Utils() {} + + public static long abs(long index) { + long negbit = index >> 63; + return (index ^ negbit) - negbit; + } + + public static void hashKey(MemorySegment key, long[] results) { + ByteBuffer data = key.asByteBuffer(); + key.asByteBuffer(); + + HashFunction.hash(data, 0, data.capacity(), 0, results); + } + + public static int countOptimalHashFunctions(int bitsSize, int n) { + int k = (int) (Math.log(2) * bitsSize / n); + + if (k < 1) { + return 1; + } + return Math.min(k, 15); + } + +} diff --git a/src/main/java/ru/vk/itmo/test/alenkovayulya/InMemoryDaoFactory.java b/src/main/java/ru/vk/itmo/test/alenkovayulya/MemorySegmentDaoFactory.java similarity index 53% rename from src/main/java/ru/vk/itmo/test/alenkovayulya/InMemoryDaoFactory.java rename to src/main/java/ru/vk/itmo/test/alenkovayulya/MemorySegmentDaoFactory.java index 497bc1dee..3e44ef6bc 100644 --- a/src/main/java/ru/vk/itmo/test/alenkovayulya/InMemoryDaoFactory.java +++ b/src/main/java/ru/vk/itmo/test/alenkovayulya/MemorySegmentDaoFactory.java @@ -3,37 +3,35 @@ import ru.vk.itmo.Config; import ru.vk.itmo.Dao; import ru.vk.itmo.Entry; -import ru.vk.itmo.alenkovayulya.InMemoryDao; -import ru.vk.itmo.alenkovayulya.PersistenceDao; +import ru.vk.itmo.alenkovayulya.AlenkovaDao; import ru.vk.itmo.test.DaoFactory; +import java.io.IOException; import java.lang.foreign.MemorySegment; import java.lang.foreign.ValueLayout; +import java.nio.charset.StandardCharsets; -import static java.nio.charset.StandardCharsets.UTF_8; - -@DaoFactory(stage = 2) -public class InMemoryDaoFactory implements DaoFactory.Factory> { - - @Override - public Dao> createDao() { - return new InMemoryDao(); - } +@DaoFactory(stage = 52) +public class MemorySegmentDaoFactory implements DaoFactory.Factory> { @Override - public Dao> createDao(Config config) { - return new PersistenceDao(config); + public Dao> createDao(Config config) throws IOException { + return new AlenkovaDao(config); } @Override public String toString(MemorySegment memorySegment) { - return memorySegment == null ? null : new String(memorySegment.toArray(ValueLayout.JAVA_BYTE), UTF_8); + if (memorySegment == null) { + return null; + } + + byte[] array = memorySegment.toArray(ValueLayout.JAVA_BYTE); + return new String(array, StandardCharsets.UTF_8); } @Override public MemorySegment fromString(String data) { - - return data == null ? null : MemorySegment.ofArray(data.getBytes(UTF_8)); + return data == null ? null : MemorySegment.ofArray(data.getBytes(StandardCharsets.UTF_8)); } @Override diff --git a/src/test/java/ru/vk/itmo/BasicConcurrentTest.java b/src/test/java/ru/vk/itmo/BasicConcurrentTest.java index 82e3f435b..ae7fab6d4 100644 --- a/src/test/java/ru/vk/itmo/BasicConcurrentTest.java +++ b/src/test/java/ru/vk/itmo/BasicConcurrentTest.java @@ -17,7 +17,7 @@ void test_10_000(Dao> dao) throws Exception { } @DaoTest(stage = 1) - @Timeout(15) + @Timeout(3600) void testConcurrentRW_2_500(Dao> dao) throws Exception { int count = 2_500; List> entries = entries("k", "v", count);