From 29b6c57ea1fc7a26e593299ac79163926819f1e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Mon, 10 Sep 2018 16:47:10 +0200 Subject: [PATCH 01/17] Skeleton AMS Sketch monoid --- .../com/twitter/algebird/AMSSketch.scala | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala new file mode 100644 index 000000000..2296b912c --- /dev/null +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -0,0 +1,60 @@ +package com.twitter.algebird + +import cats.kernel.CommutativeMonoid + +import scala.annotation.tailrec +import scala.util.Random + + +/** + * AMS sketch : maintaining a array of counts with all element arriving. + * + * AMS is a matrix of d x t counters (d row of length t). + * - Each row j, a hash function hj(x) -> {1, ..., t} , x in U + * - A other hash function gj maps element from U to {-1, +1} + * + * */ + + +object AMSSketch { + def apply[A](buckets : Int, depth : Int)(implicit hash: Hash128[A]): AMSSketch = new AMSSketch() +} + +class AMSSketch { + +} + + + +class AMSSketchMonoid[K : CMSHash[K]]() extends Monoid[CMS[K]] with CommutativeMonoid[CMS[K]] { + override def zero: CMS[K] = ??? + + override def plus(x: CMS[K], y: CMS[K]): CMS[K] = ??? +} + +class AMSSketchZero[A]() extends AMSSketchMonoid[A] { +} + +object AMSFunction { + def fourwise[K : CMSHasher](a : Long, b : Long, c : Long, d : Long, x : Long)(implicit hash128: Hash128[Long]) : Long = { + 1L + } + + def generateHash[K : CMSHasher](numHashes : Int, counters : Int) : Seq[CMSHash[K]] = { + + @tailrec + def createHash(buffer : Seq[CMSHash[K]], idx : Int, seed : Int): Seq[CMSHash[K]] ={ + if (idx == 0 ) buffer else createHash(buffer:+CMSHasher[K](Random.nextInt(seed)), idx - 1, seed) + } + + + + null + } + +} + +sealed abstract class AMS[A] { + +} + From 37eec55a49a4226c1c8eac22ef53f7da6e745dc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Tue, 11 Sep 2018 18:32:35 +0200 Subject: [PATCH 02/17] AMSCounting trait + AMS sealed + AMSZero, Item, Instance --- .../com/twitter/algebird/AMSSketch.scala | 116 +++++++++++++++--- .../com/twitter/algebird/AMSSketchTest.scala | 23 ++++ 2 files changed, 122 insertions(+), 17 deletions(-) create mode 100644 algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index 2296b912c..08d5a40f6 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -1,6 +1,7 @@ package com.twitter.algebird import cats.kernel.CommutativeMonoid +import com.twitter.algebird.CMSInstance.CountsTable import scala.annotation.tailrec import scala.util.Random @@ -15,46 +16,127 @@ import scala.util.Random * * */ +case class AMSParams[K](hashes : Seq[CMSHash[K]], + depth : Int, + bucket : Int){ + require(depth > 0 && bucket > 0, "buckets and depth should be positive") + require( + hashes.size >= depth, + s"we require at least $depth hash functions") +} + +object AMSFunction { + + def fourwise[K : CMSHasher](a : Long, b : Long, c : Long, d : Long, x : Long) : Long = { + 1L + } + + def generateHash[K : CMSHasher](numHashes : Int, counters : Int) : Seq[CMSHash[K]] = { + + @tailrec + def createHash(buffer : Seq[CMSHash[K]], idx : Int, seed : Int): Seq[CMSHash[K]] ={ + if (idx == 0 ) buffer else createHash(buffer:+CMSHash[K](Random.nextInt(seed), 0, counters), idx - 1, seed) + } + createHash(Seq.empty[CMSHash[K]], numHashes, counters) + } +} + object AMSSketch { - def apply[A](buckets : Int, depth : Int)(implicit hash: Hash128[A]): AMSSketch = new AMSSketch() + def apply[A](buckets : Int, depth : Int): AMSSketch = new AMSSketch() } class AMSSketch { + } +trait AMSCounting[K, C[_]] { + + def +(item : K) + + def ++(other : C[K]) + + def f1 : Long = totalCount + def innerProduct(other: C[K]): Approximate[Long] -class AMSSketchMonoid[K : CMSHash[K]]() extends Monoid[CMS[K]] with CommutativeMonoid[CMS[K]] { - override def zero: CMS[K] = ??? + def f2 : Approximate[Long] - override def plus(x: CMS[K], y: CMS[K]): CMS[K] = ??? + def totalCount : Long } -class AMSSketchZero[A]() extends AMSSketchMonoid[A] { +class AMSMonoid[K : CMSHasher](depth : Int, buckets : Int) extends Monoid[AMS[K]] with CommutativeMonoid[AMS[K]] { + override def zero: AMS[K] = ??? + + override def plus(x: AMS[K], y: AMS[K]): AMS[K] = ??? } -object AMSFunction { - def fourwise[K : CMSHasher](a : Long, b : Long, c : Long, d : Long, x : Long)(implicit hash128: Hash128[Long]) : Long = { - 1L - } +class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { + override def depth: Int = 0 - def generateHash[K : CMSHasher](numHashes : Int, counters : Int) : Seq[CMSHash[K]] = { + override val totalCount: Long = 0 - @tailrec - def createHash(buffer : Seq[CMSHash[K]], idx : Int, seed : Int): Seq[CMSHash[K]] ={ - if (idx == 0 ) buffer else createHash(buffer:+CMSHasher[K](Random.nextInt(seed)), idx - 1, seed) - } + override def buckets: Int = 0 + override def +(item: A): Unit = ??? + override def ++(other: AMS[A]): Unit = ??? - null - } + override def innerProduct(other: AMS[A]): Approximate[Long] = ??? + + override def f2: Approximate[Long] = ??? +} + +class AMSItem[A](item : A, + override val totalCount : Long, + override val params: AMSParams[A]) + extends AMS[A](params) { + + override def depth: Int = 1 + + override def buckets: Int = 1 + + override def +(item: A): Unit = ??? + + override def ++(other: AMS[A]): Unit = ??? + + override def innerProduct(other: AMS[A]): Approximate[Long] = ??? + + override def f2: Approximate[Long] = ??? +} +class AMSInstances[A](countsTable: CountsTable[A], + override val params: AMSParams[A], + override val totalCount: Long) + extends AMS[A](params) { + + override def depth: Int = params.depth + + override def buckets: Int = params.bucket + + override def +(item: A): Unit = ??? + + override def ++(other: AMS[A]): Unit = ??? + + override def innerProduct(other: AMS[A]): Approximate[Long] = ??? + + override def f2: Approximate[Long] = ??? } -sealed abstract class AMS[A] { +object AMSInstances { + def apply[A](params: AMSParams[A]): AMSInstances[A] = { + val countsTable = CountsTable[A](params.depth, params.bucket) + new AMSInstances[A](countsTable, params, 0) + } +} + + +sealed abstract class AMS[A](val params: AMSParams[A]) extends AMSCounting[A, AMS] { + + def depth : Int + + def buckets : Int } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala new file mode 100644 index 000000000..52ffba385 --- /dev/null +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -0,0 +1,23 @@ +package com.twitter.algebird + +import org.scalatest.{Matchers, WordSpec} + +class AMSSketchFunction extends WordSpec with Matchers{ + + "An AMS Function" should { + "return proper number of hashes : " in { + val hashes = AMSFunction.generateHash[Int](10, 10) + assert(hashes.size == 10) + } + } +} + + +class AMSSketchTest extends WordSpec with Matchers{ + + "An AMS Sketch" should { + + + + } +} From 36ee869ec8309deac5851d3d57e49294ed014889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Tue, 11 Sep 2018 18:51:41 +0200 Subject: [PATCH 03/17] Fix value in trait --- .../com/twitter/algebird/AMSSketch.scala | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index 08d5a40f6..a9310d273 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -53,9 +53,9 @@ class AMSSketch { trait AMSCounting[K, C[_]] { - def +(item : K) + def +(item : K) : C[K] - def ++(other : C[K]) + def ++(other : C[K]) : C[K] def f1 : Long = totalCount @@ -67,46 +67,46 @@ trait AMSCounting[K, C[_]] { } class AMSMonoid[K : CMSHasher](depth : Int, buckets : Int) extends Monoid[AMS[K]] with CommutativeMonoid[AMS[K]] { - override def zero: AMS[K] = ??? + val params = AMSParams(AMSFunction.generateHash(depth, buckets), depth, buckets) - override def plus(x: AMS[K], y: AMS[K]): AMS[K] = ??? + override def zero: AMS[K] = AMSZero[K](params) + + override def plus(x: AMS[K], y: AMS[K]): AMS[K] = x ++ y } -class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { +case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { override def depth: Int = 0 override val totalCount: Long = 0 override def buckets: Int = 0 - override def +(item: A): Unit = ??? - - override def ++(other: AMS[A]): Unit = ??? override def innerProduct(other: AMS[A]): Approximate[Long] = ??? - override def f2: Approximate[Long] = ??? + override def +(item: A): AMS[A] = ??? + + override def ++(other: AMS[A]): AMS[A] = ??? } -class AMSItem[A](item : A, +case class AMSItem[A](item : A, override val totalCount : Long, override val params: AMSParams[A]) extends AMS[A](params) { - + override def depth: Int = 1 override def buckets: Int = 1 - override def +(item: A): Unit = ??? - - override def ++(other: AMS[A]): Unit = ??? override def innerProduct(other: AMS[A]): Approximate[Long] = ??? - override def f2: Approximate[Long] = ??? + override def +(item: A): AMS[A] = ??? + + override def ++(other: AMS[A]): AMS[A] = ??? } -class AMSInstances[A](countsTable: CountsTable[A], +case class AMSInstances[A](countsTable: CountsTable[A], override val params: AMSParams[A], override val totalCount: Long) extends AMS[A](params) { @@ -115,13 +115,12 @@ class AMSInstances[A](countsTable: CountsTable[A], override def buckets: Int = params.bucket - override def +(item: A): Unit = ??? - - override def ++(other: AMS[A]): Unit = ??? override def innerProduct(other: AMS[A]): Approximate[Long] = ??? - override def f2: Approximate[Long] = ??? + override def +(item: A): AMS[A] = ??? + + override def ++(other: AMS[A]): AMS[A] = ??? } @@ -138,5 +137,7 @@ sealed abstract class AMS[A](val params: AMSParams[A]) extends AMSCounting[A, AM def depth : Int def buckets : Int + + override val f2: Approximate[Long] = innerProduct(this) } From fc542dbff6c7aa5527c32df05731ad3b4cd0cca5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Thu, 13 Sep 2018 11:38:22 +0200 Subject: [PATCH 04/17] Add Sketch Item operator and first test --- .../com/twitter/algebird/AMSSketch.scala | 64 ++++++++++++++----- .../com/twitter/algebird/AMSSketchTest.scala | 14 +++- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index a9310d273..cca6946fa 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -35,7 +35,7 @@ object AMSFunction { @tailrec def createHash(buffer : Seq[CMSHash[K]], idx : Int, seed : Int): Seq[CMSHash[K]] ={ - if (idx == 0 ) buffer else createHash(buffer:+CMSHash[K](Random.nextInt(seed), 0, counters), idx - 1, seed) + if (idx == 0 ) buffer else createHash(buffer:+CMSHash[K](Random.nextInt(), 0, counters), idx - 1, seed) } createHash(Seq.empty[CMSHash[K]], numHashes, counters) } @@ -53,7 +53,9 @@ class AMSSketch { trait AMSCounting[K, C[_]] { - def +(item : K) : C[K] + def +(item: K): C[K] = this + (item, 1L) + + def +(item : K, count : Long) : C[K] def ++(other : C[K]) : C[K] @@ -67,11 +69,22 @@ trait AMSCounting[K, C[_]] { } class AMSMonoid[K : CMSHasher](depth : Int, buckets : Int) extends Monoid[AMS[K]] with CommutativeMonoid[AMS[K]] { - val params = AMSParams(AMSFunction.generateHash(depth, buckets), depth, buckets) + val params = AMSParams(AMSFunction.generateHash(depth, buckets), depth, buckets) override def zero: AMS[K] = AMSZero[K](params) override def plus(x: AMS[K], y: AMS[K]): AMS[K] = x ++ y + + /** + * Creates a sketch out of a single item. + */ + def create(item: K): AMS[K] = AMSItem[K](item, 1L, params) + + /** + * Creates a sketch out of multiple items. + */ + def create(data: Seq[K]): AMS[K] = ??? + } case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { @@ -81,12 +94,12 @@ case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) override def buckets: Int = 0 - override def innerProduct(other: AMS[A]): Approximate[Long] = ??? - override def +(item: A): AMS[A] = ??? - override def ++(other: AMS[A]): AMS[A] = ??? + override def ++(other: AMS[A]): AMS[A] = other + + override def +(item: A, count: Long): AMS[A] = AMSItem(item, count, params) } case class AMSItem[A](item : A, @@ -94,16 +107,24 @@ case class AMSItem[A](item : A, override val params: AMSParams[A]) extends AMS[A](params) { - override def depth: Int = 1 + override def depth: Int = params.depth - override def buckets: Int = 1 + override def buckets: Int = params.bucket - override def innerProduct(other: AMS[A]): Approximate[Long] = ??? + override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate[Long](0, 0, 0, 0.1) - override def +(item: A): AMS[A] = ??? + override def ++(other: AMS[A]): AMS[A] = other match { + case other : AMSZero[A] => this - override def ++(other: AMS[A]): AMS[A] = ??? + case other : AMSItem[A] => AMSInstances(params) + (item, totalCount) + (other.item, other.totalCount) + + case other : AMSInstances[A] => other + (item, totalCount) + } + + override def +(item: A, count: Long): AMS[A] = { + AMSInstances(params) + (this.item , totalCount) + (item, count) + } } case class AMSInstances[A](countsTable: CountsTable[A], @@ -115,12 +136,25 @@ case class AMSInstances[A](countsTable: CountsTable[A], override def buckets: Int = params.bucket - - override def innerProduct(other: AMS[A]): Approximate[Long] = ??? - - override def +(item: A): AMS[A] = ??? + override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate[Long](0, 0, 0, 0.1) override def ++(other: AMS[A]): AMS[A] = ??? + + override def +(item: A, count: Long): AMS[A] = { + require(count >= 0 , "cannot add negative count element to AMS Sketch") + + if (count != 0L){ + var offset = 0 + params.hashes.foreach(hash => { + val h = hash(item) % params.bucket + val mult = Random.nextInt(1) + if (mult == 1) countsTable + ((0, h) , count ) else countsTable + ((0, h) , -count ) + offset += 1 + }) + AMSInstances(countsTable, params, totalCount + count) + } + else this + } } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index 52ffba385..4bfd4c4de 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -12,12 +12,20 @@ class AMSSketchFunction extends WordSpec with Matchers{ } } +class AMSSketchItemTest extends WordSpec with Matchers { -class AMSSketchTest extends WordSpec with Matchers{ - - "An AMS Sketch" should { + val width = 10 + val buckets = 15 + "an AMSItem " should { + "return an instance with other item" in { + val params = AMSParams[String](AMSFunction.generateHash[String](width, buckets), width, buckets) + val amsIt = AMSItem[String]("item-0", 1, params) + val res = amsIt + ("item-1", 1) + assert(res.totalCount == 2) + assert(res.isInstanceOf[AMSInstances[String]]) + } } } From 197f5f45b379e97efdcb2b3c774eda290ff07cd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Thu, 13 Sep 2018 19:01:44 +0200 Subject: [PATCH 05/17] Add the fourwise guys in the + operator --- .../com/twitter/algebird/AMSSketch.scala | 160 ++++++++++-------- .../com/twitter/algebird/AMSSketchTest.scala | 32 +++- 2 files changed, 118 insertions(+), 74 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index cca6946fa..b18b9cc50 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -1,3 +1,4 @@ + package com.twitter.algebird import cats.kernel.CommutativeMonoid @@ -6,29 +7,41 @@ import com.twitter.algebird.CMSInstance.CountsTable import scala.annotation.tailrec import scala.util.Random - /** - * AMS sketch : maintaining a array of counts with all element arriving. - * - * AMS is a matrix of d x t counters (d row of length t). - * - Each row j, a hash function hj(x) -> {1, ..., t} , x in U - * - A other hash function gj maps element from U to {-1, +1} - * - * */ - -case class AMSParams[K](hashes : Seq[CMSHash[K]], - depth : Int, - bucket : Int){ + * AMS sketch : maintaining a array of counts with all element arriving. + * + * AMS is a matrix of d x t counters (d row of length t). + * - Each row j, a hash function hj(x) -> {1, ..., t} , x in U + * - A other hash function gj maps element from U to {-1, +1} + * + * */ +case class AMSParams[K : CMSHasher](depth: Int, bucket: Int) { require(depth > 0 && bucket > 0, "buckets and depth should be positive") - require( - hashes.size >= depth, - s"we require at least $depth hash functions") + + def randoms: Seq[Seq[Int]] = AMSFunction.generateRandom(depth) + + def hash(a : Int, b : Int, width : Int = Int.MaxValue) : CMSHash[K] = CMSHash[K](a, b, width) + } -object AMSFunction { +object AMSFunction { + val fourwiseSize = 6 + + def generateRandom(depht : Int) : Seq[Seq[Int]]= { + Seq.fill[Seq[Int]](fourwiseSize)(Seq.fill[Int](depht)(Random.nextInt().abs)) + } + + + def hashValue[K : CMSHasher](item : K, a : Int, b : Int, width : Int = Int.MaxValue) : Int = { + CMSHash[K](a, b, width).apply(item) + } + + def fourwise(a : Int, b : Int, c : Int, d : Int, itemHashed : Int) : Long = { + val hash1 = CMSHash[Int](itemHashed, a, Int.MaxValue).apply(b) + val hash2 = CMSHash[Int](hash1, itemHashed, Int.MaxValue).apply(c) + val hash3 = CMSHash[Int](hash2, itemHashed, Int.MaxValue).apply(d) - def fourwise[K : CMSHasher](a : Long, b : Long, c : Long, d : Long, x : Long) : Long = { - 1L + hash3 } def generateHash[K : CMSHasher](numHashes : Int, counters : Int) : Seq[CMSHash[K]] = { @@ -39,55 +52,56 @@ object AMSFunction { } createHash(Seq.empty[CMSHash[K]], numHashes, counters) } -} +} object AMSSketch { - def apply[A](buckets : Int, depth : Int): AMSSketch = new AMSSketch() + def apply[A](buckets: Int, depth: Int): AMSSketch = new AMSSketch() } -class AMSSketch { - - -} +class AMSSketch {} trait AMSCounting[K, C[_]] { def +(item: K): C[K] = this + (item, 1L) - def +(item : K, count : Long) : C[K] + def +(item: K, count: Long): C[K] - def ++(other : C[K]) : C[K] + def ++(other: C[K]): C[K] - def f1 : Long = totalCount + def f1: Long = totalCount def innerProduct(other: C[K]): Approximate[Long] - def f2 : Approximate[Long] + def f2: Approximate[Long] - def totalCount : Long + def frequency(item: K): Approximate[Long] + + def totalCount: Long } -class AMSMonoid[K : CMSHasher](depth : Int, buckets : Int) extends Monoid[AMS[K]] with CommutativeMonoid[AMS[K]] { - val params = AMSParams(AMSFunction.generateHash(depth, buckets), depth, buckets) +class AMSMonoid[K: CMSHasher](depth: Int, buckets: Int) + extends Monoid[AMS[K]] + with CommutativeMonoid[AMS[K]] { + val params = AMSParams[K](depth, buckets ) override def zero: AMS[K] = AMSZero[K](params) override def plus(x: AMS[K], y: AMS[K]): AMS[K] = x ++ y /** - * Creates a sketch out of a single item. - */ + * Creates a sketch out of a single item. + */ def create(item: K): AMS[K] = AMSItem[K](item, 1L, params) /** - * Creates a sketch out of multiple items. - */ + * Creates a sketch out of multiple items. + */ def create(data: Seq[K]): AMS[K] = ??? } -case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { +case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { override def depth: Int = 0 override val totalCount: Long = 0 @@ -96,82 +110,94 @@ case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) override def innerProduct(other: AMS[A]): Approximate[Long] = ??? - override def ++(other: AMS[A]): AMS[A] = other override def +(item: A, count: Long): AMS[A] = AMSItem(item, count, params) + + override def frequency(item: A): Approximate[Long] = Approximate.exact(0L) } -case class AMSItem[A](item : A, - override val totalCount : Long, - override val params: AMSParams[A]) - extends AMS[A](params) { +case class AMSItem[A](item: A, override val totalCount: Long, override val params: AMSParams[A]) + extends AMS[A](params) { override def depth: Int = params.depth override def buckets: Int = params.bucket - override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate[Long](0, 0, 0, 0.1) override def ++(other: AMS[A]): AMS[A] = other match { - case other : AMSZero[A] => this + case other: AMSZero[A] => this - case other : AMSItem[A] => AMSInstances(params) + (item, totalCount) + (other.item, other.totalCount) + case other: AMSItem[A] => AMSInstances(params) + (item, totalCount) + (other.item, other.totalCount) - case other : AMSInstances[A] => other + (item, totalCount) + case other: AMSInstances[A] => other + (item, totalCount) } - override def +(item: A, count: Long): AMS[A] = { - AMSInstances(params) + (this.item , totalCount) + (item, count) - } + override def +(item: A, count: Long): AMS[A] = + AMSInstances(params) + (this.item, totalCount) + (item, count) + + override def frequency(item: A): Approximate[Long] = + if (this.item == item) Approximate.exact(1L) else Approximate.exact(0L) } case class AMSInstances[A](countsTable: CountsTable[A], - override val params: AMSParams[A], - override val totalCount: Long) - extends AMS[A](params) { + override val params: AMSParams[A], + override val totalCount: Long) + extends AMS[A](params) { override def depth: Int = params.depth override def buckets: Int = params.bucket + // TODO override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate[Long](0, 0, 0, 0.1) override def ++(other: AMS[A]): AMS[A] = ??? override def +(item: A, count: Long): AMS[A] = { - require(count >= 0 , "cannot add negative count element to AMS Sketch") - - if (count != 0L){ + require(count >= 0, "cannot add negative count element to AMS Sketch") + if (count != 0L) { var offset = 0 - params.hashes.foreach(hash => { - val h = hash(item) % params.bucket - val mult = Random.nextInt(1) - if (mult == 1) countsTable + ((0, h) , count ) else countsTable + ((0, h) , -count ) + + for (j <- 0 until depth) { + + val hash = params.hash(params.randoms.head(j), params.randoms(1)(j), buckets).apply(item) + + val mult = AMSFunction.fourwise(params.randoms(2)(j), + params.randoms(3)(j), + params.randoms(4)(j), + params.randoms(5)(j), hash) + + // TODO : To be changed. + if ((mult & 1) == 1) countsTable + ((offset, hash), count) + else countsTable + ((offset, hash), -count) + offset += 1 - }) + } AMSInstances(countsTable, params, totalCount + count) - } - else this + } else this } -} + override def frequency(item: A): Approximate[Long] = { + for (n <- 0 until params.depth) {} + Approximate.exact(0L) + } +} object AMSInstances { def apply[A](params: AMSParams[A]): AMSInstances[A] = { val countsTable = CountsTable[A](params.depth, params.bucket) + new AMSInstances[A](countsTable, params, 0) } } - sealed abstract class AMS[A](val params: AMSParams[A]) extends AMSCounting[A, AMS] { - def depth : Int + def depth: Int - def buckets : Int + def buckets: Int - override val f2: Approximate[Long] = innerProduct(this) + override val f2: Approximate[Long] = innerProduct(this) } - diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index 4bfd4c4de..8c28c4f29 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -2,14 +2,17 @@ package com.twitter.algebird import org.scalatest.{Matchers, WordSpec} -class AMSSketchFunction extends WordSpec with Matchers{ +class AMSSketchFunction extends WordSpec with Matchers { + + " AMSFunction " should { + "return random number " in { + val randoms = AMSFunction.generateRandom(10) + assert(randoms.size == 6) - "An AMS Function" should { - "return proper number of hashes : " in { - val hashes = AMSFunction.generateHash[Int](10, 10) - assert(hashes.size == 10) } + } + } class AMSSketchItemTest extends WordSpec with Matchers { @@ -19,13 +22,28 @@ class AMSSketchItemTest extends WordSpec with Matchers { "an AMSItem " should { "return an instance with other item" in { - val params = AMSParams[String](AMSFunction.generateHash[String](width, buckets), width, buckets) - val amsIt = AMSItem[String]("item-0", 1, params) + val params = AMSParams[String]( width, buckets) + val amsIt = AMSItem[String]("item-0", 1, params) val res = amsIt + ("item-1", 1) assert(res.totalCount == 2) assert(res.isInstanceOf[AMSInstances[String]]) } + } +} + +class AMSSketchInstanceTest extends WordSpec with Matchers { + val width = 10 + val buckets = 15 + "AMSSketch instance " should { + + "add item and update the count " in { + val params = AMSParams[String]( width, buckets) + val aMSInstances = AMSInstances(params) + val res = aMSInstances + ("item-2", 1) + assert(res.totalCount == 1) + } } + } From cd07d8a7d5d19f7aeb977ca7bd92f28e74862db2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Fri, 14 Sep 2018 07:19:10 +0200 Subject: [PATCH 06/17] Improve add item to AMS instances + start frequency method --- .../com/twitter/algebird/AMSSketch.scala | 53 ++++++++++++------- .../com/twitter/algebird/AMSSketchTest.scala | 5 +- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index b18b9cc50..a20d3bfe7 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -1,4 +1,3 @@ - package com.twitter.algebird import cats.kernel.CommutativeMonoid @@ -15,28 +14,25 @@ import scala.util.Random * - A other hash function gj maps element from U to {-1, +1} * * */ -case class AMSParams[K : CMSHasher](depth: Int, bucket: Int) { +case class AMSParams[K: CMSHasher](depth: Int, bucket: Int) { require(depth > 0 && bucket > 0, "buckets and depth should be positive") - def randoms: Seq[Seq[Int]] = AMSFunction.generateRandom(depth) + val randoms: Seq[Seq[Int]] = AMSFunction.generateRandom(depth) - def hash(a : Int, b : Int, width : Int = Int.MaxValue) : CMSHash[K] = CMSHash[K](a, b, width) + def hash(a: Int, b: Int, width: Int = Int.MaxValue): CMSHash[K] = CMSHash[K](a, b, width) } object AMSFunction { val fourwiseSize = 6 - def generateRandom(depht : Int) : Seq[Seq[Int]]= { + def generateRandom(depht: Int): Seq[Seq[Int]] = Seq.fill[Seq[Int]](fourwiseSize)(Seq.fill[Int](depht)(Random.nextInt().abs)) - } - - def hashValue[K : CMSHasher](item : K, a : Int, b : Int, width : Int = Int.MaxValue) : Int = { + def hashValue[K: CMSHasher](item: K, a: Int, b: Int, width: Int = Int.MaxValue): Int = CMSHash[K](a, b, width).apply(item) - } - def fourwise(a : Int, b : Int, c : Int, d : Int, itemHashed : Int) : Long = { + def fourwise(a: Int, b: Int, c: Int, d: Int, itemHashed: Int): Long = { val hash1 = CMSHash[Int](itemHashed, a, Int.MaxValue).apply(b) val hash2 = CMSHash[Int](hash1, itemHashed, Int.MaxValue).apply(c) val hash3 = CMSHash[Int](hash2, itemHashed, Int.MaxValue).apply(d) @@ -44,15 +40,13 @@ object AMSFunction { hash3 } - def generateHash[K : CMSHasher](numHashes : Int, counters : Int) : Seq[CMSHash[K]] = { + def generateHash[K: CMSHasher](numHashes: Int, counters: Int): Seq[CMSHash[K]] = { @tailrec - def createHash(buffer : Seq[CMSHash[K]], idx : Int, seed : Int): Seq[CMSHash[K]] ={ - if (idx == 0 ) buffer else createHash(buffer:+CMSHash[K](Random.nextInt(), 0, counters), idx - 1, seed) - } + def createHash(buffer: Seq[CMSHash[K]], idx: Int, seed: Int): Seq[CMSHash[K]] = + if (idx == 0) buffer else createHash(buffer :+ CMSHash[K](Random.nextInt(), 0, counters), idx - 1, seed) createHash(Seq.empty[CMSHash[K]], numHashes, counters) } - } object AMSSketch { @@ -83,7 +77,7 @@ trait AMSCounting[K, C[_]] { class AMSMonoid[K: CMSHasher](depth: Int, buckets: Int) extends Monoid[AMS[K]] with CommutativeMonoid[AMS[K]] { - val params = AMSParams[K](depth, buckets ) + val params = AMSParams[K](depth, buckets) override def zero: AMS[K] = AMSZero[K](params) @@ -150,7 +144,7 @@ case class AMSInstances[A](countsTable: CountsTable[A], override def buckets: Int = params.bucket - // TODO + // TODO override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate[Long](0, 0, 0, 0.1) override def ++(other: AMS[A]): AMS[A] = ??? @@ -164,10 +158,12 @@ case class AMSInstances[A](countsTable: CountsTable[A], val hash = params.hash(params.randoms.head(j), params.randoms(1)(j), buckets).apply(item) - val mult = AMSFunction.fourwise(params.randoms(2)(j), + val mult = AMSFunction.fourwise( + params.randoms(2)(j), params.randoms(3)(j), params.randoms(4)(j), - params.randoms(5)(j), hash) + params.randoms(5)(j), + hash) // TODO : To be changed. if ((mult & 1) == 1) countsTable + ((offset, hash), count) @@ -180,7 +176,24 @@ case class AMSInstances[A](countsTable: CountsTable[A], } override def frequency(item: A): Approximate[Long] = { - for (n <- 0 until params.depth) {} + var estimate = Array.emptyLongArray + var offset = 0 + for (j <- 1 until params.depth) { + val hash = params.hash(params.randoms.head(j - 1), params.randoms(1)(j - 1), buckets).apply(item) + val mult = AMSFunction.fourwise( + params.randoms(2)(j - 1), + params.randoms(3)(j - 1), + params.randoms(4)(j - 1), + params.randoms(5)(j - 1), + hash) + if ((mult & 1) == 1) estimate = estimate.+:(countsTable.getCount((offset, hash))) + else estimate = estimate.+:(-countsTable.getCount((offset, hash))) + + offset += 1 + } + if (params.depth == 1) return Approximate.exact(estimate.head) + else if (params.depth == 2) Approximate(estimate(0), (estimate(0) + estimate(1)) / 2, estimate(1), 0.5) + Approximate.exact(0L) } } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index 8c28c4f29..48e860e31 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -22,10 +22,9 @@ class AMSSketchItemTest extends WordSpec with Matchers { "an AMSItem " should { "return an instance with other item" in { - val params = AMSParams[String]( width, buckets) + val params = AMSParams[String](width, buckets) val amsIt = AMSItem[String]("item-0", 1, params) val res = amsIt + ("item-1", 1) - assert(res.totalCount == 2) assert(res.isInstanceOf[AMSInstances[String]]) } @@ -38,7 +37,7 @@ class AMSSketchInstanceTest extends WordSpec with Matchers { "AMSSketch instance " should { "add item and update the count " in { - val params = AMSParams[String]( width, buckets) + val params = AMSParams[String](width, buckets) val aMSInstances = AMSInstances(params) val res = aMSInstances + ("item-2", 1) assert(res.totalCount == 1) From e09e83c4c93086217be5cf6c9a8c312a0be05906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Fri, 14 Sep 2018 15:20:35 +0200 Subject: [PATCH 07/17] Add Inerproduct + f2 moment + ++ operator in monoid --- .../com/twitter/algebird/AMSSketch.scala | 106 +++++++++++++----- .../com/twitter/algebird/AMSSketchTest.scala | 74 +++++++++++- 2 files changed, 145 insertions(+), 35 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index a20d3bfe7..926eff14e 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -36,10 +36,15 @@ object AMSFunction { val hash1 = CMSHash[Int](itemHashed, a, Int.MaxValue).apply(b) val hash2 = CMSHash[Int](hash1, itemHashed, Int.MaxValue).apply(c) val hash3 = CMSHash[Int](hash2, itemHashed, Int.MaxValue).apply(d) - hash3 } + // TODO : linear in average but ... not the best + def median(raw: Vector[Long]): Long = { + val (lower, upper) = raw.sortWith(_ < _).splitAt(raw.size / 2) + if (raw.size % 2 == 0) (lower.last + upper.head) / 2 else upper.head + } + def generateHash[K: CMSHasher](numHashes: Int, counters: Int): Seq[CMSHash[K]] = { @tailrec @@ -102,13 +107,15 @@ case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) override def buckets: Int = 0 - override def innerProduct(other: AMS[A]): Approximate[Long] = ??? + override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate.exact(0L) override def ++(other: AMS[A]): AMS[A] = other override def +(item: A, count: Long): AMS[A] = AMSItem(item, count, params) override def frequency(item: A): Approximate[Long] = Approximate.exact(0L) + + override def f2: Approximate[Long] = Approximate.exact(0L) } case class AMSItem[A](item: A, override val totalCount: Long, override val params: AMSParams[A]) @@ -118,7 +125,8 @@ case class AMSItem[A](item: A, override val totalCount: Long, override val param override def buckets: Int = params.bucket - override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate[Long](0, 0, 0, 0.1) + override def innerProduct(other: AMS[A]): Approximate[Long] = + Approximate.exact(totalCount) * other.frequency(item) override def ++(other: AMS[A]): AMS[A] = other match { case other: AMSZero[A] => this @@ -133,6 +141,8 @@ case class AMSItem[A](item: A, override val totalCount: Long, override val param override def frequency(item: A): Approximate[Long] = if (this.item == item) Approximate.exact(1L) else Approximate.exact(0L) + + override def f2: Approximate[Long] = innerProduct(this) } case class AMSInstances[A](countsTable: CountsTable[A], @@ -144,40 +154,63 @@ case class AMSInstances[A](countsTable: CountsTable[A], override def buckets: Int = params.bucket - // TODO - override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate[Long](0, 0, 0, 0.1) + private def compatible(other: AMSInstances[A]): Boolean = + other.params.depth == depth && other.buckets == buckets && other.params.randoms == params.randoms - override def ++(other: AMS[A]): AMS[A] = ??? + override def innerProduct(other: AMS[A]): Approximate[Long] = other match { + case other: AMSInstances[A] => + require(compatible(other)) + def innerProductAt(rawId: Int): Long = + (0 until buckets).iterator.map { w => + countsTable.getCount((rawId, w)) * other.countsTable.getCount((rawId, w)) + }.sum - override def +(item: A, count: Long): AMS[A] = { - require(count >= 0, "cannot add negative count element to AMS Sketch") - if (count != 0L) { - var offset = 0 + val estimate = (0 until depth).map(innerProductAt) + if (depth == 1) Approximate.exact(estimate.head) + else if (depth == 2) Approximate.exact((estimate.head + estimate.last) / 2) + else Approximate(0, AMSFunction.median(estimate.toVector), totalCount * other.totalCount, 0.5) - for (j <- 0 until depth) { + case _ => other.innerProduct(this) - val hash = params.hash(params.randoms.head(j), params.randoms(1)(j), buckets).apply(item) + } - val mult = AMSFunction.fourwise( - params.randoms(2)(j), - params.randoms(3)(j), - params.randoms(4)(j), - params.randoms(5)(j), - hash) + override def ++(other: AMS[A]): AMS[A] = other match { + case other: AMSItem[A] => this + (other.item, other.totalCount) + case other: AMSZero[A] => this + case other: AMSInstances[A] => + require(other.params.randoms == params.randoms) + + // tcheck integrity here. + val newCountTable = other.countsTable ++ countsTable + val newTotalCount = other.totalCount + totalCount + + AMSInstances[A](newCountTable, params, newTotalCount) + } - // TODO : To be changed. - if ((mult & 1) == 1) countsTable + ((offset, hash), count) - else countsTable + ((offset, hash), -count) + override def +(item: A, count: Long): AMS[A] = { + require(count >= 0, "cannot add negative count element to AMS Sketch") - offset += 1 + if (count != 0L) { + val newCountsTable = (0 until depth).foldLeft(countsTable) { + case (table, j) => + val hash = params.hash(params.randoms.head(j), params.randoms(1)(j), buckets).apply(item) + val mult = AMSFunction.fourwise( + params.randoms(2)(j), + params.randoms(3)(j), + params.randoms(4)(j), + params.randoms(5)(j), + hash) + if ((mult & 1) == 1) table + ((j, hash), count) + else table + ((j, hash), -count) } - AMSInstances(countsTable, params, totalCount + count) + AMSInstances(newCountsTable, params, totalCount + count) } else this } override def frequency(item: A): Approximate[Long] = { - var estimate = Array.emptyLongArray + var estimate = Vector.empty[Long] var offset = 0 + for (j <- 1 until params.depth) { val hash = params.hash(params.randoms.head(j - 1), params.randoms(1)(j - 1), buckets).apply(item) val mult = AMSFunction.fourwise( @@ -191,26 +224,37 @@ case class AMSInstances[A](countsTable: CountsTable[A], offset += 1 } - if (params.depth == 1) return Approximate.exact(estimate.head) + if (params.depth == 1) Approximate.exact(estimate.head) else if (params.depth == 2) Approximate(estimate(0), (estimate(0) + estimate(1)) / 2, estimate(1), 0.5) + else { + Approximate(0, AMSFunction.median(estimate), Int.MaxValue, 1.0) + } + } + + override def f2: Approximate[Long] = { + + def f2At(idx: Int): Long = + (0 until buckets).iterator.map { bucketIndex => + countsTable.getCount((idx, bucketIndex)) * countsTable.getCount((idx, bucketIndex)) + }.sum + + val estimate = (1 until depth).map(f2At) + + if (depth == 1) Approximate.exact(estimate(0)) + else if (depth == 2) Approximate.exact((estimate(0) + estimate(1)) / 2) + else Approximate.exact(AMSFunction.median(estimate.toVector)) - Approximate.exact(0L) } } object AMSInstances { def apply[A](params: AMSParams[A]): AMSInstances[A] = { val countsTable = CountsTable[A](params.depth, params.bucket) - new AMSInstances[A](countsTable, params, 0) } } sealed abstract class AMS[A](val params: AMSParams[A]) extends AMSCounting[A, AMS] { - def depth: Int - def buckets: Int - - override val f2: Approximate[Long] = innerProduct(this) } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index 48e860e31..008662d65 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -8,18 +8,15 @@ class AMSSketchFunction extends WordSpec with Matchers { "return random number " in { val randoms = AMSFunction.generateRandom(10) assert(randoms.size == 6) - + assert(randoms.forall(_.forall(_ >= 0))) } - } - } class AMSSketchItemTest extends WordSpec with Matchers { val width = 10 val buckets = 15 - "an AMSItem " should { "return an instance with other item" in { val params = AMSParams[String](width, buckets) @@ -40,9 +37,78 @@ class AMSSketchInstanceTest extends WordSpec with Matchers { val params = AMSParams[String](width, buckets) val aMSInstances = AMSInstances(params) val res = aMSInstances + ("item-2", 1) + assert(res.totalCount == 1) } + "determine frequency for one item " in { + val params = AMSParams[String](width, buckets) + val aMSInstances = AMSInstances(params) + val res = aMSInstances + ("item-2", 50) + assert(res.frequency("item-2").estimate == 50) + } + + "give a inner product between Two AMSinstances " in { + val params = AMSParams[String](width, buckets) + val aMSInstances1 = AMSInstances(params) + val aMSInstances2 = AMSInstances(params) + val res = aMSInstances1 + ("item-2", 50) + val res1 = aMSInstances2 + ("item-2", 50) + val inner = res.innerProduct(res1) + + assert(inner.estimate == 50 * 50) + } + + "give a inner product of itself " in { + val params = AMSParams[String](width, buckets) + val aMSInstances1 = AMSInstances(params) + val res = aMSInstances1 + ("item-2", 5) + assert(res.innerProduct(res).estimate == 5 * 5) + } + + "give a f2 moment of it " in { + val params = AMSParams[String](width, buckets) + val aMSInstances1 = AMSInstances(params) + val res = aMSInstances1 + ("item-2", 5) + val res1 = res + ("item-3", 67) + assert(res1.f2.estimate == 67 * 67 + 25) + } + + "give correct innerProduct with itself for several values " in { + + var ams: AMS[String] = null + + val data = Array( + Array(0, 45), + Array(3, 48), + Array(6, 51), + Array(9, 54), + Array(12, 57), + Array(15, 60), + Array(18, 63), + Array(21, 66), + Array(24, 69), + Array(27, 72), + Array(30, 75), + Array(33, 78), + Array(36, 81), + Array(39, 84), + Array(42, 87) + ) + + val params = AMSParams[String](width, buckets) + val aMSInstances1 = AMSInstances(params) + + data.foreach(d => { + if (ams == null) + ams = aMSInstances1 + (d(0).toString, d(1)) + else + ams = ams + (d(0).toString, d(1)) + }) + + assert(ams.innerProduct(ams).estimate > 0) + } + } } From fd7ac642572ca8c119ae82b2cc4ab6bd022846d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Fri, 14 Sep 2018 16:40:58 +0200 Subject: [PATCH 08/17] Add the sumOption method for monoid properties --- .../com/twitter/algebird/AMSSketch.scala | 156 +++++++++++++----- .../com/twitter/algebird/AMSSketchTest.scala | 19 +++ 2 files changed, 136 insertions(+), 39 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index 926eff14e..5c4c78be8 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -1,3 +1,19 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ + package com.twitter.algebird import cats.kernel.CommutativeMonoid @@ -6,14 +22,98 @@ import com.twitter.algebird.CMSInstance.CountsTable import scala.annotation.tailrec import scala.util.Random +case class AMSAggregator[K](amsMonoid: AMSMonoid[K]) extends MonoidAggregator[K, AMS[K], AMS[K]] { + val monoid = amsMonoid + + def prepare(value: K): AMS[K] = monoid.create(value) + + def present(cms: AMS[K]): AMS[K] = cms +} + /** - * AMS sketch : maintaining a array of counts with all element arriving. - * - * AMS is a matrix of d x t counters (d row of length t). - * - Each row j, a hash function hj(x) -> {1, ..., t} , x in U - * - A other hash function gj maps element from U to {-1, +1} - * - * */ + * AMSMonoid for a better f2 moment vector result. and a better joint between two of them. + * + * */ +class AMSMonoid[K: CMSHasher](depth: Int, buckets: Int) + extends Monoid[AMS[K]] + with CommutativeMonoid[AMS[K]] { + val params: AMSParams[K] = AMSParams[K](depth, buckets) + + var count : Long = 0 + + override def zero: AMS[K] = AMSZero[K](params) + + override def plus(x: AMS[K], y: AMS[K]): AMS[K] = x ++ y + + override def sumOption(iter: TraversableOnce[AMS[K]]): Option[AMS[K]] = + if (iter.isEmpty)None + else { + var sets = 0 + + var countsTableMonoid = CountsTable[K](depth, buckets) + + @inline def updateCountsTable(pos : (Int, Int), count : Long) : CountsTable[K] = { + countsTableMonoid + (pos, count) + } + + var oneItem : AMSItem[K] = null + + @inline def addItem(it : AMSItem[K]) : CountsTable[K] = { + sets += 1 + oneItem = it + count += 1 + (0 until depth).foldLeft(countsTableMonoid) { + case (table, j) => + val hash = params.hash(params.randoms.head(j), params.randoms(1)(j), buckets).apply(it.item) + val mult = AMSFunction.fourwise( + params.randoms(2)(j), + params.randoms(3)(j), + params.randoms(4)(j), + params.randoms(5)(j), + hash) + if ((mult & 1) == 1) table + ((j, hash), it.totalCount) + else table + ((j, hash), -it.totalCount) + } + } + iter.foreach( { + case AMSZero( _) => () + case it@ AMSItem(_, _, _) => countsTableMonoid = addItem(it) + case AMSInstances(amsCountTable , _, totalCount) => + sets += 1 + amsCountTable.counts.zipWithIndex.foreach(counts => counts._1.zipWithIndex.foreach(c =>{ + count += c._1 + updateCountsTable((counts._2, c._2), c._1) + })) + }) + if (sets == 0) Some(zero) + else if (sets == 1) Some(oneItem) + else Some(AMSInstances[K](params, countsTableMonoid, count)) + } + + + /** + * Creates a sketch out of a single item. + */ + def create(item: K): AMS[K] = AMSItem[K](item, 1L, params) + + /** + * Creates a sketch out of multiple items. + */ + def create(data: Seq[K]): AMS[K] = { + sum(data.map(AMSItem(_, 1, params ))) + } + +} + + +/** + * AMS sketch : maintaining a array of counts with all element arriving. + * + * AMS is a matrix of d x t counters (d row of length t). + * - Each row j, a hash function hj(x) -> {1, ..., t} , x in U + * - A other hash function gj maps element from U to {-1, +1} + * + * */ case class AMSParams[K: CMSHasher](depth: Int, bucket: Int) { require(depth > 0 && bucket > 0, "buckets and depth should be positive") @@ -33,10 +133,10 @@ object AMSFunction { CMSHash[K](a, b, width).apply(item) def fourwise(a: Int, b: Int, c: Int, d: Int, itemHashed: Int): Long = { - val hash1 = CMSHash[Int](itemHashed, a, Int.MaxValue).apply(b) - val hash2 = CMSHash[Int](hash1, itemHashed, Int.MaxValue).apply(c) - val hash3 = CMSHash[Int](hash2, itemHashed, Int.MaxValue).apply(d) - hash3 + var hash = CMSHash[Int](itemHashed, a, Int.MaxValue).apply(b) + hash = CMSHash[Int](hash, itemHashed, Int.MaxValue).apply(c) + hash = CMSHash[Int](hash, itemHashed, Int.MaxValue).apply(d) + hash } // TODO : linear in average but ... not the best @@ -54,12 +154,6 @@ object AMSFunction { } } -object AMSSketch { - def apply[A](buckets: Int, depth: Int): AMSSketch = new AMSSketch() -} - -class AMSSketch {} - trait AMSCounting[K, C[_]] { def +(item: K): C[K] = this + (item, 1L) @@ -79,27 +173,6 @@ trait AMSCounting[K, C[_]] { def totalCount: Long } -class AMSMonoid[K: CMSHasher](depth: Int, buckets: Int) - extends Monoid[AMS[K]] - with CommutativeMonoid[AMS[K]] { - val params = AMSParams[K](depth, buckets) - - override def zero: AMS[K] = AMSZero[K](params) - - override def plus(x: AMS[K], y: AMS[K]): AMS[K] = x ++ y - - /** - * Creates a sketch out of a single item. - */ - def create(item: K): AMS[K] = AMSItem[K](item, 1L, params) - - /** - * Creates a sketch out of multiple items. - */ - def create(data: Seq[K]): AMS[K] = ??? - -} - case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { override def depth: Int = 0 @@ -227,7 +300,7 @@ case class AMSInstances[A](countsTable: CountsTable[A], if (params.depth == 1) Approximate.exact(estimate.head) else if (params.depth == 2) Approximate(estimate(0), (estimate(0) + estimate(1)) / 2, estimate(1), 0.5) else { - Approximate(0, AMSFunction.median(estimate), Int.MaxValue, 1.0) + Approximate.exact(AMSFunction.median(estimate)) } } @@ -252,6 +325,11 @@ object AMSInstances { val countsTable = CountsTable[A](params.depth, params.bucket) new AMSInstances[A](countsTable, params, 0) } + + def apply[A](params: AMSParams[A], tables : CountsTable[A], count : Long): AMSInstances[A] = { + val countsTable = CountsTable[A](params.depth, params.bucket) + new AMSInstances[A](tables, params, count) + } } sealed abstract class AMS[A](val params: AMSParams[A]) extends AMSCounting[A, AMS] { diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index 008662d65..4c7f2b091 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -1,7 +1,26 @@ +/* +Copyright 2012 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ package com.twitter.algebird import org.scalatest.{Matchers, WordSpec} +class AMSSketchMonoidTest + + + class AMSSketchFunction extends WordSpec with Matchers { " AMSFunction " should { From 4ecac768d8518699cc56dea82d9c57eed77c906c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Sun, 16 Sep 2018 11:43:33 +0200 Subject: [PATCH 09/17] AMSSketch is a monoid --- .../com/twitter/algebird/AMSSketch.scala | 134 ++++++++++-------- .../com/twitter/algebird/AMSSketchTest.scala | 99 ++++++++++++- 2 files changed, 171 insertions(+), 62 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index 5c4c78be8..876a6b4b1 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -31,95 +31,96 @@ case class AMSAggregator[K](amsMonoid: AMSMonoid[K]) extends MonoidAggregator[K, } /** - * AMSMonoid for a better f2 moment vector result. and a better joint between two of them. - * - * */ + * AMSMonoid for a better f2 moment vector result. and a better joint between two of them. + * + * */ class AMSMonoid[K: CMSHasher](depth: Int, buckets: Int) - extends Monoid[AMS[K]] + extends Monoid[AMS[K]] with CommutativeMonoid[AMS[K]] { val params: AMSParams[K] = AMSParams[K](depth, buckets) - var count : Long = 0 - override def zero: AMS[K] = AMSZero[K](params) override def plus(x: AMS[K], y: AMS[K]): AMS[K] = x ++ y override def sumOption(iter: TraversableOnce[AMS[K]]): Option[AMS[K]] = - if (iter.isEmpty)None + if (iter.isEmpty) None else { var sets = 0 - + var count = 0L var countsTableMonoid = CountsTable[K](depth, buckets) - @inline def updateCountsTable(pos : (Int, Int), count : Long) : CountsTable[K] = { + @inline def updateCountsTable(pos: (Int, Int), count: Long): CountsTable[K] = countsTableMonoid + (pos, count) - } - var oneItem : AMSItem[K] = null + var oneItem: AMSItem[K] = null - @inline def addItem(it : AMSItem[K]) : CountsTable[K] = { + @inline def addItem(it: AMSItem[K]): CountsTable[K] = { sets += 1 oneItem = it - count += 1 + count += it.totalCount (0 until depth).foldLeft(countsTableMonoid) { case (table, j) => - val hash = params.hash(params.randoms.head(j), params.randoms(1)(j), buckets).apply(it.item) + val hash = params + .hash(params.randoms.head(j), params.randoms(1)(j), buckets) + .apply(it.item) val mult = AMSFunction.fourwise( params.randoms(2)(j), params.randoms(3)(j), params.randoms(4)(j), params.randoms(5)(j), hash) - if ((mult & 1) == 1) table + ((j, hash), it.totalCount) - else table + ((j, hash), -it.totalCount) + if ((mult & 1) == 1) { + table + ((j, hash), it.totalCount) + } else table + ((j, hash), -it.totalCount) + + case _ => countsTableMonoid } } - iter.foreach( { - case AMSZero( _) => () - case it@ AMSItem(_, _, _) => countsTableMonoid = addItem(it) - case AMSInstances(amsCountTable , _, totalCount) => - sets += 1 - amsCountTable.counts.zipWithIndex.foreach(counts => counts._1.zipWithIndex.foreach(c =>{ - count += c._1 - updateCountsTable((counts._2, c._2), c._1) - })) + iter.foreach({ + case AMSZero(_) => () + case it @ AMSItem(_, _, _) => countsTableMonoid = addItem(it) + case AMSInstances(amsCountTable, _, totalCount) => + count += totalCount + amsCountTable.counts.zipWithIndex.foreach(counts => + counts._1.zipWithIndex.foreach(c => { + sets += 1 + countsTableMonoid = updateCountsTable((counts._2, c._2), c._1) + })) }) if (sets == 0) Some(zero) else if (sets == 1) Some(oneItem) else Some(AMSInstances[K](params, countsTableMonoid, count)) } - /** - * Creates a sketch out of a single item. - */ + * Creates a sketch out of a single item. + */ def create(item: K): AMS[K] = AMSItem[K](item, 1L, params) /** - * Creates a sketch out of multiple items. - */ - def create(data: Seq[K]): AMS[K] = { - sum(data.map(AMSItem(_, 1, params ))) - } + * Creates a sketch out of multiple items. + */ + def create(data: Seq[K]): AMS[K] = + sum(data.map(AMSItem(_, 1, params))) } - /** - * AMS sketch : maintaining a array of counts with all element arriving. - * - * AMS is a matrix of d x t counters (d row of length t). - * - Each row j, a hash function hj(x) -> {1, ..., t} , x in U - * - A other hash function gj maps element from U to {-1, +1} - * - * */ + * AMS sketch : maintaining a array of counts with all element arriving. + * + * AMS is a matrix of d x t counters (d row of length t). + * - Each row j, a hash function hj(x) -> {1, ..., t} , x in U + * - A other hash function gj maps element from U to {-1, +1} + * + * */ case class AMSParams[K: CMSHasher](depth: Int, bucket: Int) { require(depth > 0 && bucket > 0, "buckets and depth should be positive") val randoms: Seq[Seq[Int]] = AMSFunction.generateRandom(depth) - def hash(a: Int, b: Int, width: Int = Int.MaxValue): CMSHash[K] = CMSHash[K](a, b, width) + def hash(a: Int, b: Int, width: Int = Int.MaxValue): CMSHash[K] = + CMSHash[K](a, b, width) } @@ -139,7 +140,7 @@ object AMSFunction { hash } - // TODO : linear in average but ... not the best + // TODO : linear in average but ... not the best, median select is good stuff def median(raw: Vector[Long]): Long = { val (lower, upper) = raw.sortWith(_ < _).splitAt(raw.size / 2) if (raw.size % 2 == 0) (lower.last + upper.head) / 2 else upper.head @@ -149,7 +150,9 @@ object AMSFunction { @tailrec def createHash(buffer: Seq[CMSHash[K]], idx: Int, seed: Int): Seq[CMSHash[K]] = - if (idx == 0) buffer else createHash(buffer :+ CMSHash[K](Random.nextInt(), 0, counters), idx - 1, seed) + if (idx == 0) buffer + else + createHash(buffer :+ CMSHash[K](Random.nextInt(), 0, counters), idx - 1, seed) createHash(Seq.empty[CMSHash[K]], numHashes, counters) } } @@ -173,6 +176,11 @@ trait AMSCounting[K, C[_]] { def totalCount: Long } +sealed abstract class AMS[A](val params: AMSParams[A]) extends AMSCounting[A, AMS] { + def depth: Int + def buckets: Int +} + case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { override def depth: Int = 0 @@ -180,7 +188,8 @@ case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) override def buckets: Int = 0 - override def innerProduct(other: AMS[A]): Approximate[Long] = Approximate.exact(0L) + override def innerProduct(other: AMS[A]): Approximate[Long] = + Approximate.exact(0L) override def ++(other: AMS[A]): AMS[A] = other @@ -204,13 +213,14 @@ case class AMSItem[A](item: A, override val totalCount: Long, override val param override def ++(other: AMS[A]): AMS[A] = other match { case other: AMSZero[A] => this - case other: AMSItem[A] => AMSInstances(params) + (item, totalCount) + (other.item, other.totalCount) + case other: AMSItem[A] => + AMSInstances[A](params) + (item, totalCount) + (other.item, other.totalCount) case other: AMSInstances[A] => other + (item, totalCount) } override def +(item: A, count: Long): AMS[A] = - AMSInstances(params) + (this.item, totalCount) + (item, count) + AMSInstances[A](params) + (this.item, totalCount) + (item, count) override def frequency(item: A): Approximate[Long] = if (this.item == item) Approximate.exact(1L) else Approximate.exact(0L) @@ -240,14 +250,17 @@ case class AMSInstances[A](countsTable: CountsTable[A], val estimate = (0 until depth).map(innerProductAt) if (depth == 1) Approximate.exact(estimate.head) - else if (depth == 2) Approximate.exact((estimate.head + estimate.last) / 2) - else Approximate(0, AMSFunction.median(estimate.toVector), totalCount * other.totalCount, 0.5) + else if (depth == 2) + Approximate.exact((estimate.head + estimate.last) / 2) + else + Approximate(0, AMSFunction.median(estimate.toVector), totalCount * other.totalCount, 0.5) case _ => other.innerProduct(this) } override def ++(other: AMS[A]): AMS[A] = other match { + case other: AMSItem[A] => this + (other.item, other.totalCount) case other: AMSZero[A] => this case other: AMSInstances[A] => @@ -256,17 +269,17 @@ case class AMSInstances[A](countsTable: CountsTable[A], // tcheck integrity here. val newCountTable = other.countsTable ++ countsTable val newTotalCount = other.totalCount + totalCount - AMSInstances[A](newCountTable, params, newTotalCount) } override def +(item: A, count: Long): AMS[A] = { require(count >= 0, "cannot add negative count element to AMS Sketch") - if (count != 0L) { val newCountsTable = (0 until depth).foldLeft(countsTable) { case (table, j) => - val hash = params.hash(params.randoms.head(j), params.randoms(1)(j), buckets).apply(item) + val hash = params + .hash(params.randoms.head(j), params.randoms(1)(j), buckets) + .apply(item) val mult = AMSFunction.fourwise( params.randoms(2)(j), params.randoms(3)(j), @@ -285,20 +298,24 @@ case class AMSInstances[A](countsTable: CountsTable[A], var offset = 0 for (j <- 1 until params.depth) { - val hash = params.hash(params.randoms.head(j - 1), params.randoms(1)(j - 1), buckets).apply(item) + val hash = params + .hash(params.randoms.head(j - 1), params.randoms(1)(j - 1), buckets) + .apply(item) val mult = AMSFunction.fourwise( params.randoms(2)(j - 1), params.randoms(3)(j - 1), params.randoms(4)(j - 1), params.randoms(5)(j - 1), hash) - if ((mult & 1) == 1) estimate = estimate.+:(countsTable.getCount((offset, hash))) + if ((mult & 1) == 1) + estimate = estimate.+:(countsTable.getCount((offset, hash))) else estimate = estimate.+:(-countsTable.getCount((offset, hash))) offset += 1 } if (params.depth == 1) Approximate.exact(estimate.head) - else if (params.depth == 2) Approximate(estimate(0), (estimate(0) + estimate(1)) / 2, estimate(1), 0.5) + else if (params.depth == 2) + Approximate(estimate(0), (estimate(0) + estimate(1)) / 2, estimate(1), 0.5) else { Approximate.exact(AMSFunction.median(estimate)) } @@ -326,13 +343,8 @@ object AMSInstances { new AMSInstances[A](countsTable, params, 0) } - def apply[A](params: AMSParams[A], tables : CountsTable[A], count : Long): AMSInstances[A] = { + def apply[A](params: AMSParams[A], tables: CountsTable[A], count: Long): AMSInstances[A] = { val countsTable = CountsTable[A](params.depth, params.bucket) new AMSInstances[A](tables, params, count) } } - -sealed abstract class AMS[A](val params: AMSParams[A]) extends AMSCounting[A, AMS] { - def depth: Int - def buckets: Int -} diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index 4c7f2b091..c14262b75 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -15,11 +15,98 @@ limitations under the License. */ package com.twitter.algebird +import com.twitter.algebird.CMSInstance.CountsTable +import org.scalacheck.Prop.forAll +import org.scalacheck.{Arbitrary, Gen} import org.scalatest.{Matchers, WordSpec} -class AMSSketchMonoidTest +object AMSTestUtils { + def toInstances[A](ams: AMS[A]): AMSInstances[A] = ams match { + case AMSZero(params) => AMSInstances(params) + case AMSItem(it, count, params) => + (AMSInstances.apply(params) + it).asInstanceOf[AMSInstances[A]] + case instance @ AMSInstances(_, _, _) => instance + } +} + +class AMSSketchMonoidTest extends CheckProperties { + + import BaseProperties._ + + val depht = 2 + val buckets = 2 + + implicit val amsMonoid: AMSMonoid[String] = + new AMSMonoid[String](depht, buckets) + + implicit val amsGen = Arbitrary { + val item = Gen.choose(1, 1000).map { v => + amsMonoid.create(v.toString) + } + val dense = Gen.listOf(item).map { it => + AMSTestUtils.toInstances[String](amsMonoid.sum(it)) + } + val zero = Gen.const(amsMonoid.zero) + Gen.frequency((4, item), (1, zero), (1, dense)) + } + + implicit def amsEquiv[K]: Equiv[AMS[K]] = + new Equiv[AMS[K]] { + def equiv(x: AMS[K], y: AMS[K]): Boolean = { + val r = x == y + r + } + } + property("AMSSKetch is a monoid ") { + commutativeMonoidLaws[AMS[String]] + } + + property("++ is the same as plus") { + forAll { (a: AMS[String], b: AMS[String]) => + Equiv[AMS[String]].equiv(a ++ b, amsMonoid.plus(a, b)) + } + } + +} + +class AMSMonoidSimpleProperties extends WordSpec with Matchers { + val semi: AMSMonoid[String] = new AMSMonoid[String](2, 2) + "an amsMonoid simple properties checker " should { + "check simple associative equivalency mixing AMSItem and AMSInstance" in { + val a = + new AMSInstances[String](CountsTable(Vector(Vector(-8L, -2L), Vector(-3L, -7L))), semi.params, 10) + val b = AMSItem[String]("907", 1, semi.params) + val c = AMSItem[String]("868", 1, semi.params) + + assert(semi.plus(semi.plus(a, b), c) == semi.plus(a, semi.plus(b, c))) + } + + "check simple sumpropertieswork for semigroups between two AMSItem " in { + val head = AMSItem[String]("739", 1, semi.params) + val tail = List[AMS[String]](AMSItem[String]("437", 1, semi.params)) + + val sumOPT = semi.sumOption(head :: tail).get + val plus = head ++ tail.head + + assert(sumOPT == plus) + } + + "check simple sumpropertieswork for semigroups between AMSItem and AMSInstance " in { + val head = AMSItem[String]("591", 1, semi.params) + val tail = List[AMS[String]]( + new AMSInstances[String](CountsTable(Vector(Vector(-2, -3), Vector(-3, -2))), semi.params, 5)) + + val sumOPT = semi.sumOption(head :: tail).get + val plus = head ++ tail.head + + assert(sumOPT == plus) + } + + } + +} class AMSSketchFunction extends WordSpec with Matchers { @@ -44,6 +131,16 @@ class AMSSketchItemTest extends WordSpec with Matchers { assert(res.totalCount == 2) assert(res.isInstanceOf[AMSInstances[String]]) } + + "return instance with exact count " in { + val params = AMSParams[String](width, buckets) + val amsIt = AMSItem[String]("item-0", 14, params) + var res = amsIt + ("item-1", 1) + println("count ==" + res.totalCount) + res = res + ("item-1", 10) + assert(res.totalCount == 25) + assert(res.isInstanceOf[AMSInstances[String]]) + } } } From f8adc4133101ea75ace174cf82f8708598cac6e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Sun, 16 Sep 2018 13:30:40 +0200 Subject: [PATCH 10/17] Add aggregator --- .../com/twitter/algebird/AMSSketch.scala | 8 ++++++- .../com/twitter/algebird/AMSSketchTest.scala | 24 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index 876a6b4b1..065680c5f 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -23,13 +23,19 @@ import scala.annotation.tailrec import scala.util.Random case class AMSAggregator[K](amsMonoid: AMSMonoid[K]) extends MonoidAggregator[K, AMS[K], AMS[K]] { - val monoid = amsMonoid + val monoid: AMSMonoid[K] = amsMonoid def prepare(value: K): AMS[K] = monoid.create(value) def present(cms: AMS[K]): AMS[K] = cms } +object AMSAggregator { + + def apply[K: CMSHasher](depth: Int, buckets: Int): AMSAggregator[K] = + AMSAggregator[K](new AMSMonoid[K](depth, buckets)) +} + /** * AMSMonoid for a better f2 moment vector result. and a better joint between two of them. * diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index c14262b75..af31aed51 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -20,6 +20,8 @@ import org.scalacheck.Prop.forAll import org.scalacheck.{Arbitrary, Gen} import org.scalatest.{Matchers, WordSpec} +import scala.util.Random + object AMSTestUtils { def toInstances[A](ams: AMS[A]): AMSInstances[A] = ams match { @@ -226,5 +228,27 @@ class AMSSketchInstanceTest extends WordSpec with Matchers { } } +} + +class AMSSketchAggregatorTest extends WordSpec with Matchers { + "An AMSSketchMonoid works as an aggregator " should { + "with random value " in { + + (0 to 10).foreach { _ => + { + val aggregator = AMSAggregator[String](10, 10) + val numEntries = 5 + val entries = (0 until numEntries).map(_ => Random.nextInt.toString) + val bf = aggregator(entries) + + entries.foreach { i => + assert(bf.frequency(i.toString).estimate > 0) + } + } + } + + } + + } } From 9e5104f160da2985e9c8d81bea1d7dabf782d5c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Sun, 16 Sep 2018 14:35:53 +0200 Subject: [PATCH 11/17] Add little doc --- .../com/twitter/algebird/AMSSketch.scala | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala index 065680c5f..f55cb47cd 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/AMSSketch.scala @@ -22,6 +22,9 @@ import com.twitter.algebird.CMSInstance.CountsTable import scala.annotation.tailrec import scala.util.Random +/** + * Aggregagtor for AMS + * */ case class AMSAggregator[K](amsMonoid: AMSMonoid[K]) extends MonoidAggregator[K, AMS[K], AMS[K]] { val monoid: AMSMonoid[K] = amsMonoid @@ -38,8 +41,25 @@ object AMSAggregator { /** * AMSMonoid for a better f2 moment vector result. and a better joint between two of them. + * reference : http://dimacs.rutgers.edu/%7Egraham/pubs/papers/encalgs-ams.pdf + * and https://www.cs.rutgers.edu/~muthu/ams.c * - * */ + * ==Join size estimation== + * + * https://people.cs.umass.edu/%7Emcgregor/711S12/sketches1.pdf p. 26 + * part : Comparing AMS and Count-Min sketches for join size estimation. + * + * what's join size ? + * used to answer to something like : + * SELECT COUNT(*) FROM F, F’ + * WHERE F.id = F’.id + * + * Count Min Sketch : + * f ·f' =F2 with error N^2 / depth + * + * AMS : + * f .f' = SQRT( F2(f) F2(f') / depth ) + **/ class AMSMonoid[K: CMSHasher](depth: Int, buckets: Int) extends Monoid[AMS[K]] with CommutativeMonoid[AMS[K]] { @@ -139,6 +159,11 @@ object AMSFunction { def hashValue[K: CMSHasher](item: K, a: Int, b: Int, width: Int = Int.MaxValue): Int = CMSHash[K](a, b, width).apply(item) + /** + * To ensure the random element is really random and "pure" see : + * https://lucatrevisan.wordpress.com/2009/11/12/the-large-deviation-of-fourwise-independent-random-variables/ + * to more details. + * */ def fourwise(a: Int, b: Int, c: Int, d: Int, itemHashed: Int): Long = { var hash = CMSHash[Int](itemHashed, a, Int.MaxValue).apply(b) hash = CMSHash[Int](hash, itemHashed, Int.MaxValue).apply(c) @@ -146,7 +171,7 @@ object AMSFunction { hash } - // TODO : linear in average but ... not the best, median select is good stuff + // TODO : linear in average but ... not the best, median select is better def median(raw: Vector[Long]): Long = { val (lower, upper) = raw.sortWith(_ < _).splitAt(raw.size / 2) if (raw.size % 2 == 0) (lower.last + upper.head) / 2 else upper.head @@ -157,12 +182,14 @@ object AMSFunction { @tailrec def createHash(buffer: Seq[CMSHash[K]], idx: Int, seed: Int): Seq[CMSHash[K]] = if (idx == 0) buffer - else - createHash(buffer :+ CMSHash[K](Random.nextInt(), 0, counters), idx - 1, seed) + else createHash(buffer :+ CMSHash[K](Random.nextInt(), 0, counters), idx - 1, seed) createHash(Seq.empty[CMSHash[K]], numHashes, counters) } } +/** + * All the method needed for user to take manipulate AMS : metrics and operators. + * */ trait AMSCounting[K, C[_]] { def +(item: K): C[K] = this + (item, 1L) @@ -182,11 +209,17 @@ trait AMSCounting[K, C[_]] { def totalCount: Long } +/** + * The abstract trait for AMS + * */ sealed abstract class AMS[A](val params: AMSParams[A]) extends AMSCounting[A, AMS] { def depth: Int def buckets: Int } +/** + * The AMSZero element + * */ case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) { override def depth: Int = 0 @@ -206,6 +239,9 @@ case class AMSZero[A](override val params: AMSParams[A]) extends AMS[A](params) override def f2: Approximate[Long] = Approximate.exact(0L) } +/** + * An AMS with just one item + * */ case class AMSItem[A](item: A, override val totalCount: Long, override val params: AMSParams[A]) extends AMS[A](params) { @@ -234,6 +270,9 @@ case class AMSItem[A](item: A, override val totalCount: Long, override val param override def f2: Approximate[Long] = innerProduct(this) } +/** + * The Instances AMS algorithm with several values inside. + * */ case class AMSInstances[A](countsTable: CountsTable[A], override val params: AMSParams[A], override val totalCount: Long) @@ -299,6 +338,11 @@ case class AMSInstances[A](countsTable: CountsTable[A], } else this } + /** + * Determine approximative count of a value. + * There's not enough doc on it + * TODO : Create proper approximation. + * */ override def frequency(item: A): Approximate[Long] = { var estimate = Vector.empty[Long] var offset = 0 @@ -327,6 +371,9 @@ case class AMSInstances[A](countsTable: CountsTable[A], } } + /** + * This is much easier and faster than the count min sketch algorithm. + * */ override def f2: Approximate[Long] = { def f2At(idx: Int): Long = @@ -349,8 +396,6 @@ object AMSInstances { new AMSInstances[A](countsTable, params, 0) } - def apply[A](params: AMSParams[A], tables: CountsTable[A], count: Long): AMSInstances[A] = { - val countsTable = CountsTable[A](params.depth, params.bucket) + def apply[A](params: AMSParams[A], tables: CountsTable[A], count: Long): AMSInstances[A] = new AMSInstances[A](tables, params, count) - } } From 7fe5aca68083e90f815b18fbc303a8534208d351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Sun, 16 Sep 2018 22:09:09 +0200 Subject: [PATCH 12/17] Add benchmark for jointure between AMS and CMS for result --- .../algebird/benchmark/AMSJoinBenchmark.scala | 52 ++++++++++++++++++ .../algebird/benchmark/CMSJoinBenchmark.scala | 55 +++++++++++++++++++ 2 files changed, 107 insertions(+) create mode 100644 algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala create mode 100644 algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala new file mode 100644 index 000000000..a231e0164 --- /dev/null +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala @@ -0,0 +1,52 @@ +package com.twitter.algebird +package benchmark +import org.openjdk.jmh.annotations._ + +import scala.util.Random.nextString + +/** + * [info] Benchmark (bucket) (depth) (size) Mode Cnt Score Error Units + * [info] AMSJoinBenchmark.amsJoinBenchmarkString 27 16 1000 thrpt 3 55539,549 ± 44815,541 ops/s + * [info] AMSJoinBenchmark.amsJoinBenchmarkString 543 16 1000 thrpt 3 3036,712 ± 2487,763 ops/s + * + * [info] Benchmark (delta) (eps) (size) Mode Cnt Score Error Units + * [info] CMSJoinBenchmark.amsJoinBenchmarkString 0.0000001 0.1 1000 thrpt 3 37416,918 ± 4578,109 ops/s + * [info] CMSJoinBenchmark.amsJoinBenchmarkString 0.0000001 0.005 1000 thrpt 3 3194,029 ± 335,681 ops/s + */ +object AMSJoinBenchmark { + + @State(Scope.Benchmark) + class AMSJoinState { + + @Param(Array("27", "543")) + var bucket = 0 + + @Param(Array("16")) + var depth = 0 + + @Param(Array("1000")) + var size: Int = 0 + + var amsMonoidString: AMSMonoid[String] = _ + + var amsSketch1: AMS[String] = _ + var amsSketch2: AMS[String] = _ + + @Setup(Level.Trial) + def setup(): Unit = { + val largeStringsSample1 = (1 to size).map(i => nextString(10)).toVector + val largeStringsSample2 = (1 to size).map(i => nextString(10)).toVector + + amsMonoidString = new AMSMonoid[String](depth, bucket) + amsSketch1 = amsMonoidString.create(largeStringsSample1) + amsSketch2 = amsMonoidString.create(largeStringsSample2) + } + } +} +class AMSJoinBenchmark { + import AMSJoinBenchmark._ + + @Benchmark + def amsJoinBenchmarkString(amsS: AMSJoinState): Unit = + amsS.amsSketch1.innerProduct(amsS.amsSketch2) +} diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala new file mode 100644 index 000000000..dc013317b --- /dev/null +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala @@ -0,0 +1,55 @@ +package com.twitter.algebird +package benchmark + +import org.openjdk.jmh.annotations._ + +import scala.util.Random.nextString + +/** + * [info] Benchmark (bucket) (depth) (size) Mode Cnt Score Error Units + * [info] AMSJoinBenchmark.amsJoinBenchmarkString 18 10 1000 thrpt 3 148071,642 ± 98359,117 ops/s + * [info] AMSJoinBenchmark.amsJoinBenchmarkString 18 200 1000 thrpt 3 5926,345 ± 332,592 ops/s + * + * */ +object CMSJoinBenchmark { + + @State(Scope.Benchmark) + class AMSJoinState { + + val Seed: Int = 1 + val MaxBits: Int = 2048 + + @Param(Array("0.1", "0.005")) + var eps: Double = 0.0 + + @Param(Array("0.0000001")) // 1e-8 + var delta: Double = 0.0 + + // number of data values to combine into a CMS + @Param(Array("1000")) + var size: Int = 0 + + var stringMonoid: CMSMonoid[String] = _ + + var cmsSketch1: CMS[String] = _ + var cmsSketch2: CMS[String] = _ + + @Setup(Level.Trial) + def setup(): Unit = { + val largeStringsSample1 = (1 to size).map(i => nextString(10)).toVector + val largeStringsSample2 = (1 to size).map(i => nextString(10)).toVector + + stringMonoid = CMS.monoid[String](eps, delta, Seed) + + cmsSketch1 = stringMonoid.create(largeStringsSample1) + cmsSketch2 = stringMonoid.create(largeStringsSample2) + } + } +} +class CMSJoinBenchmark { + import CMSJoinBenchmark._ + + @Benchmark + def amsJoinBenchmarkString(amsS: AMSJoinState): Unit = + amsS.cmsSketch1.innerProduct(amsS.cmsSketch2) +} From 250b364cfb6508b257751859632ba1636eaf957d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Sun, 16 Sep 2018 22:24:07 +0200 Subject: [PATCH 13/17] Add description benchmark --- .../algebird/benchmark/AMSJoinBenchmark.scala | 13 ++++--------- .../algebird/benchmark/CMSJoinBenchmark.scala | 7 ++----- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala index a231e0164..125b627c8 100644 --- a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala @@ -5,20 +5,15 @@ import org.openjdk.jmh.annotations._ import scala.util.Random.nextString /** - * [info] Benchmark (bucket) (depth) (size) Mode Cnt Score Error Units - * [info] AMSJoinBenchmark.amsJoinBenchmarkString 27 16 1000 thrpt 3 55539,549 ± 44815,541 ops/s - * [info] AMSJoinBenchmark.amsJoinBenchmarkString 543 16 1000 thrpt 3 3036,712 ± 2487,763 ops/s - * - * [info] Benchmark (delta) (eps) (size) Mode Cnt Score Error Units - * [info] CMSJoinBenchmark.amsJoinBenchmarkString 0.0000001 0.1 1000 thrpt 3 37416,918 ± 4578,109 ops/s - * [info] CMSJoinBenchmark.amsJoinBenchmarkString 0.0000001 0.005 1000 thrpt 3 3194,029 ± 335,681 ops/s - */ + * AMS is interesting to compute inner join between two of them. + * + * */ object AMSJoinBenchmark { @State(Scope.Benchmark) class AMSJoinState { - @Param(Array("27", "543")) + @Param(Array("27", "543", "5438")) var bucket = 0 @Param(Array("16")) diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala index dc013317b..7a1ba57ca 100644 --- a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala @@ -6,10 +6,7 @@ import org.openjdk.jmh.annotations._ import scala.util.Random.nextString /** - * [info] Benchmark (bucket) (depth) (size) Mode Cnt Score Error Units - * [info] AMSJoinBenchmark.amsJoinBenchmarkString 18 10 1000 thrpt 3 148071,642 ± 98359,117 ops/s - * [info] AMSJoinBenchmark.amsJoinBenchmarkString 18 200 1000 thrpt 3 5926,345 ± 332,592 ops/s - * + * CMSjoin is made to compare CMS and AMS on inner product between two of them. * */ object CMSJoinBenchmark { @@ -19,7 +16,7 @@ object CMSJoinBenchmark { val Seed: Int = 1 val MaxBits: Int = 2048 - @Param(Array("0.1", "0.005")) + @Param(Array("0.1", "0.005", "0.0005")) var eps: Double = 0.0 @Param(Array("0.0000001")) // 1e-8 From a3736e2ad67035c2a1344cf00c9a57eddb2ac978 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Mon, 17 Sep 2018 11:36:03 +0200 Subject: [PATCH 14/17] Compare inner product error between AMS and CMS --- .../algebird/benchmark/AMSJoinBenchmark.scala | 6 ++-- .../algebird/benchmark/CMSJoinBenchmark.scala | 2 +- .../com/twitter/algebird/AMSSketchTest.scala | 34 ++++++++++++++++++- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala index 125b627c8..b9f7942d5 100644 --- a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala @@ -5,9 +5,9 @@ import org.openjdk.jmh.annotations._ import scala.util.Random.nextString /** - * AMS is interesting to compute inner join between two of them. - * - * */ + * AMS is interesting to compute inner join between two of them. + * + * */ object AMSJoinBenchmark { @State(Scope.Benchmark) diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala index 7a1ba57ca..2b4d17c40 100644 --- a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/CMSJoinBenchmark.scala @@ -6,7 +6,7 @@ import org.openjdk.jmh.annotations._ import scala.util.Random.nextString /** - * CMSjoin is made to compare CMS and AMS on inner product between two of them. + * CMSjoin is made to compare CMS and AMS on inner product between two of them. * */ object CMSJoinBenchmark { diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index af31aed51..927b6964d 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -138,7 +138,6 @@ class AMSSketchItemTest extends WordSpec with Matchers { val params = AMSParams[String](width, buckets) val amsIt = AMSItem[String]("item-0", 14, params) var res = amsIt + ("item-1", 1) - println("count ==" + res.totalCount) res = res + ("item-1", 10) assert(res.totalCount == 25) assert(res.isInstanceOf[AMSInstances[String]]) @@ -248,6 +247,39 @@ class AMSSketchAggregatorTest extends WordSpec with Matchers { } } + } +} + +class AMSSketchInnerJoinCMS extends WordSpec with Matchers { + val numEntries = 10000 + + def innerProduct(arr1: Vector[Int], arr2: Vector[Int]): Long = + arr1.zip(arr2).map(p => p._1 * p._2) sum + + val amsMonoid = new AMSMonoid[Int](16, 543) + val cmsMonoid = CMS.monoid[Int](16, 543, 1) + + " an AMSSketch " should { + " have a better approximation of it inner product than CMS " in { + val entries = (0 until 100).map(_ => Random.nextInt(100)) + val entriesCompare = (0 until 100).map(_ => Random.nextInt(100).abs) + + val trueInner = innerProduct(entries.toVector, entriesCompare.toVector) + val ams = amsMonoid.create(entries) + val cms = cmsMonoid.create(entries) + + val amsCompare = amsMonoid.create(entriesCompare) + val cmsCompare = cmsMonoid.create(entriesCompare) + + val innerAMS = ams.innerProduct(amsCompare) + val innerCMS = cms.innerProduct(cmsCompare) + + val errorAMS = trueInner - innerAMS.estimate + val errorCMS = trueInner - innerCMS.estimate + + assert(errorAMS.abs <= errorCMS.abs) + + } } From 7c70a409d835c1b7a2a3fd47da12d34b32540e32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Mon, 17 Sep 2018 13:58:00 +0200 Subject: [PATCH 15/17] add test for example of use --- .../scala/com/twitter/algebird/AMSSketchTest.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index 927b6964d..a0d3c0a2e 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -69,6 +69,18 @@ class AMSSketchMonoidTest extends CheckProperties { Equiv[AMS[String]].equiv(a ++ b, amsMonoid.plus(a, b)) } } +} + +class AMSMonoidUse extends WordSpec with Matchers { + + "An AMSSketchMonoid " should { + "be used like an algebird monoid " in { + val aMSMonoid = new AMSMonoid[String](100, 100) + val sketch = aMSMonoid.create(Seq("aline", "aline", "aline")) + assert(sketch.f2 ~ 9 ) + } + + } } @@ -105,7 +117,6 @@ class AMSMonoidSimpleProperties extends WordSpec with Matchers { assert(sumOPT == plus) } - } } From 9027fd3ece9832f0b1854b9b82a8554e05591426 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Mon, 17 Sep 2018 15:01:32 +0200 Subject: [PATCH 16/17] little scalafmt on algebird-test --- .../src/test/scala/com/twitter/algebird/AMSSketchTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala index a0d3c0a2e..6deec0ebc 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AMSSketchTest.scala @@ -77,7 +77,7 @@ class AMSMonoidUse extends WordSpec with Matchers { "be used like an algebird monoid " in { val aMSMonoid = new AMSMonoid[String](100, 100) val sketch = aMSMonoid.create(Seq("aline", "aline", "aline")) - assert(sketch.f2 ~ 9 ) + assert(sketch.f2 ~ 9) } } From e3d6565104ceea26e800d5060c302346a802e0ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9sar=20Coll=C3=A9?= Date: Mon, 17 Sep 2018 16:41:00 +0200 Subject: [PATCH 17/17] Add benchmark for AMS --- .../algebird/benchmark/AMSJoinBenchmark.scala | 47 ------------------- 1 file changed, 47 deletions(-) delete mode 100644 algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala deleted file mode 100644 index b9f7942d5..000000000 --- a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AMSJoinBenchmark.scala +++ /dev/null @@ -1,47 +0,0 @@ -package com.twitter.algebird -package benchmark -import org.openjdk.jmh.annotations._ - -import scala.util.Random.nextString - -/** - * AMS is interesting to compute inner join between two of them. - * - * */ -object AMSJoinBenchmark { - - @State(Scope.Benchmark) - class AMSJoinState { - - @Param(Array("27", "543", "5438")) - var bucket = 0 - - @Param(Array("16")) - var depth = 0 - - @Param(Array("1000")) - var size: Int = 0 - - var amsMonoidString: AMSMonoid[String] = _ - - var amsSketch1: AMS[String] = _ - var amsSketch2: AMS[String] = _ - - @Setup(Level.Trial) - def setup(): Unit = { - val largeStringsSample1 = (1 to size).map(i => nextString(10)).toVector - val largeStringsSample2 = (1 to size).map(i => nextString(10)).toVector - - amsMonoidString = new AMSMonoid[String](depth, bucket) - amsSketch1 = amsMonoidString.create(largeStringsSample1) - amsSketch2 = amsMonoidString.create(largeStringsSample2) - } - } -} -class AMSJoinBenchmark { - import AMSJoinBenchmark._ - - @Benchmark - def amsJoinBenchmarkString(amsS: AMSJoinState): Unit = - amsS.amsSketch1.innerProduct(amsS.amsSketch2) -}