diff --git a/pom.xml b/pom.xml
index f084ee611..d4d576a8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,12 @@
fastutil
+
+ com.yahoo.datasketches
+ sketches-core
+ 0.12.0
+
+
junit
diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/ICardinality.java b/src/main/java/com/clearspring/analytics/stream/cardinality/ICardinality.java
index f015b6f8a..9b36b5ea9 100644
--- a/src/main/java/com/clearspring/analytics/stream/cardinality/ICardinality.java
+++ b/src/main/java/com/clearspring/analytics/stream/cardinality/ICardinality.java
@@ -16,6 +16,8 @@
package com.clearspring.analytics.stream.cardinality;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
import java.io.IOException;
@@ -69,4 +71,8 @@ public interface ICardinality {
* @throws CardinalityMergeException If at least one of the estimators is not compatible with this one
*/
ICardinality merge(ICardinality... estimators) throws CardinalityMergeException;
+
+ default ICardinality intersect(ICardinality... estimators) throws CardinalityMergeException{
+ throw new NotImplementedException();
+ }
}
diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/Kmv.java b/src/main/java/com/clearspring/analytics/stream/cardinality/Kmv.java
new file mode 100644
index 000000000..b517d16a7
--- /dev/null
+++ b/src/main/java/com/clearspring/analytics/stream/cardinality/Kmv.java
@@ -0,0 +1,74 @@
+package com.clearspring.analytics.stream.cardinality;
+
+import com.clearspring.analytics.util.IBuilder;
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.theta.Intersection;
+import com.yahoo.sketches.theta.Sketch;
+import com.yahoo.sketches.theta.Sketches;
+import com.yahoo.sketches.theta.Union;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public abstract class Kmv implements ICardinality {
+
+ protected Sketch sketch;
+ public Sketch getSketch(){
+ return this.sketch;
+ }
+
+ @Override
+ public long cardinality() {
+ return (long)sketch.getEstimate();
+ }
+
+ @Override
+ public int sizeof() {
+ return sketch.getRetainedEntries();
+ }
+
+ @Override
+ public byte[] getBytes() throws IOException {
+ return sketch.toByteArray();
+ }
+
+ @Override
+ public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException {
+ Union union = Union.builder().buildUnion();
+ for(ICardinality estimator: estimators){
+ if (estimator instanceof Kmv) {
+ union.update(((Kmv) estimator).getSketch());
+ } else {
+ throw new KmvMergeException("Estimators must be of either type UpdateableKmv or NonUpdateableKmv to merge");
+ }
+ }
+
+ union.update(this.sketch);
+ return new NonUpdateableKmv(union.getResult());
+
+ }
+
+ @Override
+ public ICardinality intersect(ICardinality... estimators) throws CardinalityMergeException {
+ Intersection intersection = Intersection.builder().buildIntersection();
+ for(ICardinality estimator: estimators){
+ if (estimator instanceof Kmv) {
+ intersection.update(((Kmv) estimator).getSketch());
+ } else {
+ throw new KmvMergeException("Estimators must be of either type UpdateableKmv or NonUpdateableKmv to merge");
+ }
+ }
+
+ intersection.update(this.sketch);
+ return new NonUpdateableKmv(intersection.getResult());
+ }
+
+
+
+ protected static class KmvMergeException extends CardinalityMergeException {
+
+ public KmvMergeException(String message) {
+ super(message);
+ }
+ }
+}
diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/NonUpdateableKmv.java b/src/main/java/com/clearspring/analytics/stream/cardinality/NonUpdateableKmv.java
new file mode 100644
index 000000000..ec591cd8f
--- /dev/null
+++ b/src/main/java/com/clearspring/analytics/stream/cardinality/NonUpdateableKmv.java
@@ -0,0 +1,39 @@
+package com.clearspring.analytics.stream.cardinality;
+
+import com.yahoo.sketches.theta.CompactSketch;
+
+import java.io.IOException;
+
+public class NonUpdateableKmv extends Kmv {
+ public NonUpdateableKmv(CompactSketch sketch){
+ this.sketch = sketch;
+ }
+
+ @Override
+ public boolean offer(Object o) {
+ return false;
+ }
+
+ @Override
+ public boolean offerHashed(long hashedLong) {
+ return false;
+ }
+
+ @Override
+ public boolean offerHashed(int hashedInt) {
+ return false;
+ }
+
+ @Override
+ public int hashCode(){
+ return sketch.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o){
+ if (o instanceof NonUpdateableKmv){
+ return this.sketch.equals(((NonUpdateableKmv)o).getSketch());
+ } else return false;
+ }
+
+}
diff --git a/src/main/java/com/clearspring/analytics/stream/cardinality/UpdateableKmv.java b/src/main/java/com/clearspring/analytics/stream/cardinality/UpdateableKmv.java
new file mode 100644
index 000000000..12a2938f7
--- /dev/null
+++ b/src/main/java/com/clearspring/analytics/stream/cardinality/UpdateableKmv.java
@@ -0,0 +1,99 @@
+package com.clearspring.analytics.stream.cardinality;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.util.IBuilder;
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.theta.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+public class UpdateableKmv extends Kmv implements ICardinality {
+
+ public UpdateableKmv(){
+ this.sketch = new UpdateSketchBuilder().setNominalEntries(4096).build();
+ }
+ public UpdateableKmv(int nominalEntries){
+ this.sketch = new UpdateSketchBuilder().setNominalEntries(nominalEntries).build();
+ }
+
+ public UpdateableKmv(UpdateSketch sketch){
+ this.sketch = sketch;
+ }
+
+
+ @Override
+ public boolean offer(Object o) {
+ final int x = MurmurHash.hash(o);
+ return offerHashed(x);
+ }
+
+ @Override
+ public boolean offerHashed(long hashedLong) {
+ UpdateReturnState updateState = ((UpdateSketch)sketch).update(hashedLong);
+ return wasUpdated(updateState);
+ }
+
+ @Override
+ public boolean offerHashed(int hashedInt) {
+ UpdateReturnState updateState = ((UpdateSketch)sketch).update(hashedInt);
+ return wasUpdated(updateState);
+ }
+
+ private boolean wasUpdated(UpdateReturnState state){
+ boolean updated = false;
+ switch (state){
+ case InsertedCountIncremented:
+ updated = true;
+ break;
+ case InsertedCountNotIncremented:
+ updated = true;
+ break;
+ default:
+ updated = false;
+ }
+ return updated;
+ }
+
+ @Override
+ public int hashCode(){
+ return sketch.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o){
+ if (o instanceof UpdateableKmv){
+ return this.sketch.equals(((UpdateableKmv)o).getSketch());
+ } else return false;
+ }
+
+ public static class Builder implements IBuilder, Serializable {
+ private final int n;
+
+ public Builder(int n) {
+ this.n = n;
+ }
+ public Builder(){ this.n = 4096;}
+
+ @Override
+ public UpdateableKmv build() {
+ return new UpdateableKmv(n);
+ }
+
+ @Override
+ public int sizeof() {
+ return n;
+ }
+
+ public static UpdateableKmv build(byte[] bytes) throws IOException {
+ Sketch sketch = Sketches.wrapSketch(Memory.wrap(bytes));
+ try {
+ return new UpdateableKmv(((UpdateSketch)sketch));
+ }catch(Exception ex){
+ throw new IOException("unable to build UpdateSketch from bytes");
+ }
+ }
+ }
+}