Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If "visibility" flag is not available for some object it MUST be assumed to be true #17

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
*/
@Slf4j
public class ParallelBinaryWriter implements Closeable {

/**
* Capacity of Queue.
*/
public static final int QUEUE_CAPACITY = 1000000;

/**
* Number of threads to use.
*/
Expand Down Expand Up @@ -60,7 +66,7 @@ private boolean writeHeader(final BoundBox boundBox) {
public ParallelBinaryWriter(final OutputStream outputStream, final int noThreads, final BoundBox boundBox) {
this.writer = new BlobWriter(outputStream);
this.threads = noThreads;
writeQueue = new LinkedBlockingQueue<>(noThreads);
writeQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
if (!writeHeader(boundBox)) {
throw new RuntimeException("Error while creating writer and writing header");
}
Expand Down
151 changes: 115 additions & 36 deletions src/main/java/com/wolt/osm/parallelpbf/io/OSMWriter.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.wolt.osm.parallelpbf.io;

import com.wolt.osm.parallelpbf.blob.BlobWriter;
import com.wolt.osm.parallelpbf.encoder.DenseNodesEncoder;
import com.wolt.osm.parallelpbf.encoder.OsmEncoder;
import com.wolt.osm.parallelpbf.encoder.OsmEntityEncoder;
import com.wolt.osm.parallelpbf.encoder.RelationEncoder;
import com.wolt.osm.parallelpbf.encoder.StringTableEncoder;
import com.wolt.osm.parallelpbf.encoder.OsmEncoder;
import com.wolt.osm.parallelpbf.encoder.DenseNodesEncoder;
import com.wolt.osm.parallelpbf.encoder.WayEncoder;
import com.wolt.osm.parallelpbf.encoder.RelationEncoder;
import com.wolt.osm.parallelpbf.entity.Node;
import com.wolt.osm.parallelpbf.entity.OsmEntity;
import com.wolt.osm.parallelpbf.entity.Relation;
import com.wolt.osm.parallelpbf.entity.Way;
import crosby.binary.Osmformat;
import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Main handler for the OSM entities. Accepts entities over
Expand Down Expand Up @@ -61,16 +62,64 @@ public final class OSMWriter implements Runnable {
*/
private StringTableEncoder stringEncoder;

/**
* Time spent to write Nodes in nanoseconds.
*/
private Long encodingNodes = 0L;

/**
* Time spent to write Ways in nanoseconds.
*/
private Long encodingWays = 0L;

/**
* Time spent to write Relations in nanoseconds.
*/
private Long encodingRelations = 0L;

/**
* Total number of Nodes written.
*/
private Long totalNodes = 0L;

/**
* Total number of Ways written.
*/
private Long totalWays = 0L;

/**
* Total number of Relations written.
*/
private Long totalRelations = 0L;

/**
* Time to write buffered elements to disk in nanoseconds.
*/
private Long totalFlushTime = 0L;


/**
* OSMWriter constructor.
*
* @param output Shared BlobWriter
* @param queue input queue with entities.
*/
public OSMWriter(final BlobWriter output, final LinkedBlockingQueue<OsmEntity> queue) {
this.writer = output;
this.writeQueue = queue;
encodersReset();
}

/**
* Writes contents of encoders to the writer
* and resets encoders.
*
* @param nodesSize Estimated size of nodes group.
* @param waysSize Estimated size of ways group.
* @param nodesSize Estimated size of nodes group.
* @param waysSize Estimated size of ways group.
* @param relationSize Estimated size of relations group.
*/
private void flush(final int nodesSize, final int waysSize, final int relationSize) {
long startTime = System.currentTimeMillis();
if (nodesSize + waysSize + relationSize > 0) {
Osmformat.PrimitiveBlock.Builder block = Osmformat.PrimitiveBlock.newBuilder()
.setStringtable(stringEncoder.getStrings());
Expand All @@ -91,6 +140,10 @@ private void flush(final int nodesSize, final int waysSize, final int relationSi
}

encodersReset();
long endTime = System.currentTimeMillis();

log.info("Time spent flushing in ms {}", endTime - startTime);
totalFlushTime += endTime - startTime;
}

/**
Expand All @@ -103,46 +156,72 @@ private void encodersReset() {
this.relationEncoder = new RelationEncoder(this.stringEncoder);
}

/**
* OSMWriter constructor.
*
* @param output Shared BlobWriter
* @param queue input queue with entities.
*/
public OSMWriter(final BlobWriter output, final LinkedBlockingQueue<OsmEntity> queue) {
this.writer = output;
this.writeQueue = queue;
encodersReset();
}

@SuppressWarnings("java:S2189")
@Override
public void run() {
Thread.currentThread().setName("OSMWriter");
while (true) {
try {
OsmEntity entity = writeQueue.take();
if (entity instanceof Node) {
nodesEncoder.add((Node) entity);
} else if (entity instanceof Way) {
wayEncoder.add((Way) entity);
} else if (entity instanceof Relation) {
relationEncoder.add((Relation) entity);
} else {
log.error("Unknown entity type: {}", entity);
}

int nodesSize = nodesEncoder.estimateSize();
int waysSize = wayEncoder.estimateSize();
int relationSize = relationEncoder.estimateSize();
int blobSize = nodesSize + waysSize + relationSize + stringEncoder.getStringSize();
if (blobSize > LIMIT_BLOB_SIZE) {
flush(nodesSize, waysSize, relationSize);
}
process(entity);
} catch (InterruptedException e) {
// Thread is getting ready to die, but first,
// drain remaining elements on the queue and process them.
final LinkedList<OsmEntity> remainingObjects = new LinkedList<>();
writeQueue.drainTo(remainingObjects);
for (OsmEntity entity : remainingObjects) {
process(entity);
}
flush(nodesEncoder.estimateSize(), wayEncoder.estimateSize(), relationEncoder.estimateSize());
log.debug("OSMWriter requested to stop");
if (log.isDebugEnabled()) {
log.debug("OSMWriter requested to stop");
log.debug("Time spend encoding nodes {} seconds.", TimeUnit.NANOSECONDS.toSeconds(encodingNodes));
log.debug("Time spend encoding ways {} seconds.", TimeUnit.NANOSECONDS.toSeconds(encodingWays));
log.debug("Time spend encoding relations {} seconds.",
TimeUnit.NANOSECONDS.toSeconds(encodingRelations));
log.debug("Total time spent in flush calls in ms {}.", totalFlushTime);
log.debug("Nodes processed {}, ways processed {}, relations processed {}", totalNodes, totalWays,
totalRelations);
}
return;
}
}
}

/**
* Converts given {@link OsmEntity} to protobuf element and flushes queue to disk if necessary.
* @param entity a Node, Way or Relation to be persisted.
*/
private void process(final OsmEntity entity) {
if (entity instanceof Node) {
long startTime = System.nanoTime();
nodesEncoder.add((Node) entity);
long endTime = System.nanoTime();
encodingNodes += endTime - startTime;
totalNodes += 1;
} else if (entity instanceof Way) {
long startTime = System.nanoTime();
wayEncoder.add((Way) entity);
long endTime = System.nanoTime();
encodingWays += endTime - startTime;
totalWays += 1;
} else if (entity instanceof Relation) {
long startTime = System.nanoTime();
relationEncoder.add((Relation) entity);
long endTime = System.nanoTime();
encodingRelations += endTime - startTime;
totalRelations += 1;
} else {
log.error("Unknown entity type: {}", entity);
}

int nodesSize = nodesEncoder.estimateSize();
int waysSize = wayEncoder.estimateSize();
int relationSize = relationEncoder.estimateSize();

int blobSize = nodesSize + waysSize + relationSize + stringEncoder.getStringSize();
if (blobSize > LIMIT_BLOB_SIZE) {
flush(nodesSize, waysSize, relationSize);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ Info parseInfo(final Osmformat.Relation message) {
private Info convertInfo(final Osmformat.Info infoMessage) {
if (infoMessage != null) {
String username = stringTable.getS(infoMessage.getUserSid()).toStringUtf8();
boolean isVisible = !infoMessage.hasVisible() || infoMessage.getVisible();
return new Info(infoMessage.getUid(),
username,
infoMessage.getVersion(),
infoMessage.getTimestamp(),
infoMessage.getChangeset(),
infoMessage.getVisible());
isVisible);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ void testParser() {
assertEquals(1, taggedWay.getInfo().getVersion());
assertEquals(1334007464L, taggedWay.getInfo().getTimestamp());
assertEquals(11245909, taggedWay.getInfo().getChangeset());
assertFalse(taggedWay.getInfo().isVisible());
assertTrue(taggedWay.getInfo().isVisible());

testRelation();
assertEquals(24119, taggedRelation.getInfo().getUid());
assertEquals("Mauls", taggedRelation.getInfo().getUsername());
assertEquals(81, taggedRelation.getInfo().getVersion());
assertEquals(1337419064L, taggedRelation.getInfo().getTimestamp());
assertEquals(11640673, taggedRelation.getInfo().getChangeset());
assertFalse(taggedRelation.getInfo().isVisible());
assertTrue(taggedRelation.getInfo().isVisible());
}

@Test
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/com/wolt/osm/parallelpbf/TestObjectsFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public class TestObjectsFactory {
public static final Info info = new Info(1, "test", 3, 4, 5, true);

public static final Osmformat.Info infoMessage = Osmformat.Info.newBuilder().setUid(1).setUserSid(2).setVersion(3).setTimestamp(4).setChangeset(5).setVisible(true).build();
public static final Osmformat.Info infoMessageWithNullVisibleFlag = Osmformat.Info.newBuilder()
.setUid(1).setUserSid(2).setVersion(3).setTimestamp(4).setChangeset(5).build();

public static final Osmformat.StringTable stringTable = Osmformat.StringTable.newBuilder()
.addS(ByteString.copyFromUtf8(""))
.addS(ByteString.copyFromUtf8("fail"))
Expand Down Expand Up @@ -65,6 +68,14 @@ public class TestObjectsFactory {
.addRefs(9000)
.build();

public static final Osmformat.Way wayMessageWithNullVisibleFlag = Osmformat.Way.newBuilder()
.setId(1)
.addKeys(3)
.addVals(4)
.setInfo(TestObjectsFactory.infoMessageWithNullVisibleFlag)
.addRefs(9000)
.build();

public static final Osmformat.Relation relationMessage = Osmformat.Relation.newBuilder()
.setId(1)
.addKeys(3)
Expand Down
13 changes: 13 additions & 0 deletions src/test/java/com/wolt/osm/parallelpbf/parser/InfoParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ void testWayInfo() {
Assertions.assertEquals(TestObjectsFactory.info, actual);
}

@Test
void testWayInfoWithNullVisibleFlag() {
var way = Osmformat.Way.newBuilder()
.setId(1)
.setInfo(TestObjectsFactory.infoMessageWithNullVisibleFlag)
.build();

var testedObject = new InfoParser(null, TestObjectsFactory.stringTable);

var actual = testedObject.parseInfo(way);
Assertions.assertEquals(TestObjectsFactory.info, actual);
}

@Test
void testRelationInfoMissing() {
var relation = Osmformat.Relation.newBuilder()
Expand Down