Skip to content
This repository has been archived by the owner on Jul 7, 2020. It is now read-only.

WIP:implemnt kmv in icardinality #152

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<artifactId>fastutil</artifactId>
</dependency>

<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.12.0</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.clearspring.analytics.stream.cardinality;

import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.io.IOException;


Expand Down Expand Up @@ -69,4 +71,8 @@ public interface ICardinality {
* @throws CardinalityMergeException If at least one of the estimators is not compatible with this one
*/
ICardinality merge(ICardinality... estimators) throws CardinalityMergeException;

default ICardinality intersect(ICardinality... estimators) throws CardinalityMergeException{
throw new NotImplementedException();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.clearspring.analytics.stream.cardinality;

import com.clearspring.analytics.util.IBuilder;
import com.yahoo.memory.Memory;
import com.yahoo.sketches.theta.Intersection;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;

import java.io.IOException;
import java.io.Serializable;

public abstract class Kmv implements ICardinality {

protected Sketch sketch;
public Sketch getSketch(){
return this.sketch;
}

@Override
public long cardinality() {
return (long)sketch.getEstimate();
}

@Override
public int sizeof() {
return sketch.getRetainedEntries();
}

@Override
public byte[] getBytes() throws IOException {
return sketch.toByteArray();
}

@Override
public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException {
Union union = Union.builder().buildUnion();
for(ICardinality estimator: estimators){
if (estimator instanceof Kmv) {
union.update(((Kmv) estimator).getSketch());
} else {
throw new KmvMergeException("Estimators must be of either type UpdateableKmv or NonUpdateableKmv to merge");
}
}

union.update(this.sketch);
return new NonUpdateableKmv(union.getResult());

}

@Override
public ICardinality intersect(ICardinality... estimators) throws CardinalityMergeException {
Intersection intersection = Intersection.builder().buildIntersection();
for(ICardinality estimator: estimators){
if (estimator instanceof Kmv) {
intersection.update(((Kmv) estimator).getSketch());
} else {
throw new KmvMergeException("Estimators must be of either type UpdateableKmv or NonUpdateableKmv to merge");
}
}

intersection.update(this.sketch);
return new NonUpdateableKmv(intersection.getResult());
}



protected static class KmvMergeException extends CardinalityMergeException {

public KmvMergeException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.clearspring.analytics.stream.cardinality;

import com.yahoo.sketches.theta.CompactSketch;

import java.io.IOException;

public class NonUpdateableKmv extends Kmv {
public NonUpdateableKmv(CompactSketch sketch){
this.sketch = sketch;
}

@Override
public boolean offer(Object o) {
return false;
}

@Override
public boolean offerHashed(long hashedLong) {
return false;
}

@Override
public boolean offerHashed(int hashedInt) {
return false;
}

@Override
public int hashCode(){
return sketch.hashCode();
}

@Override
public boolean equals(Object o){
if (o instanceof NonUpdateableKmv){
return this.sketch.equals(((NonUpdateableKmv)o).getSketch());
} else return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.clearspring.analytics.stream.cardinality;

import com.clearspring.analytics.hash.MurmurHash;
import com.clearspring.analytics.util.IBuilder;
import com.yahoo.memory.Memory;
import com.yahoo.sketches.theta.*;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.Serializable;

public class UpdateableKmv extends Kmv implements ICardinality {

public UpdateableKmv(){
this.sketch = new UpdateSketchBuilder().setNominalEntries(4096).build();
}
public UpdateableKmv(int nominalEntries){
this.sketch = new UpdateSketchBuilder().setNominalEntries(nominalEntries).build();
}

public UpdateableKmv(UpdateSketch sketch){
this.sketch = sketch;
}


@Override
public boolean offer(Object o) {
final int x = MurmurHash.hash(o);
return offerHashed(x);
}

@Override
public boolean offerHashed(long hashedLong) {
UpdateReturnState updateState = ((UpdateSketch)sketch).update(hashedLong);
return wasUpdated(updateState);
}

@Override
public boolean offerHashed(int hashedInt) {
UpdateReturnState updateState = ((UpdateSketch)sketch).update(hashedInt);
return wasUpdated(updateState);
}

private boolean wasUpdated(UpdateReturnState state){
boolean updated = false;
switch (state){
case InsertedCountIncremented:
updated = true;
break;
case InsertedCountNotIncremented:
updated = true;
break;
default:
updated = false;
}
return updated;
}

@Override
public int hashCode(){
return sketch.hashCode();
}

@Override
public boolean equals(Object o){
if (o instanceof UpdateableKmv){
return this.sketch.equals(((UpdateableKmv)o).getSketch());
} else return false;
}

public static class Builder implements IBuilder<ICardinality>, Serializable {
private final int n;

public Builder(int n) {
this.n = n;
}
public Builder(){ this.n = 4096;}

@Override
public UpdateableKmv build() {
return new UpdateableKmv(n);
}

@Override
public int sizeof() {
return n;
}

public static UpdateableKmv build(byte[] bytes) throws IOException {
Sketch sketch = Sketches.wrapSketch(Memory.wrap(bytes));
try {
return new UpdateableKmv(((UpdateSketch)sketch));
}catch(Exception ex){
throw new IOException("unable to build UpdateSketch from bytes");
}
}
}
}