From 17054df237e041c5976842d39653aadc8b0726b3 Mon Sep 17 00:00:00 2001 From: ibenian Date: Sat, 24 Feb 2018 19:23:46 -0500 Subject: [PATCH] Lazy minhash implementation for scalability. --- .../com/twitter/algebird/LazyMinHasher.scala | 60 +++++++++++++++++++ .../com/twitter/algebird/MinHasherTest.scala | 36 ++++++++++- 2 files changed, 93 insertions(+), 3 deletions(-) create mode 100644 algebird-core/src/main/scala/com/twitter/algebird/LazyMinHasher.scala diff --git a/algebird-core/src/main/scala/com/twitter/algebird/LazyMinHasher.scala b/algebird-core/src/main/scala/com/twitter/algebird/LazyMinHasher.scala new file mode 100644 index 000000000..3d05be42f --- /dev/null +++ b/algebird-core/src/main/scala/com/twitter/algebird/LazyMinHasher.scala @@ -0,0 +1,60 @@ +package com.twitter.algebird + +import java.nio._ + +/** + * Base class minhash signatures; LazyMinHashSignature and MinHashSignature. + * The lazy minhash signature is used to store the values until the actual minhash signature is needed at + * aggregation time. This is to avoid GC overload the early by avoiding early allocation of many large + * byte arrays. Instead, only the values are stored using LazyMinHashWithValue, and we allocate the + * buffer and pass it to LazyMinHashWithBytes. + */ +abstract class LazyMinHash extends java.io.Serializable +case class LazyMinHashWithValue(value: String) extends LazyMinHash +case class LazyMinHashWithBytes(bytes: Array[Byte]) extends LazyMinHash + +/** + * This is used to make minhash algorithm more scalable by deferring per-element minhash signature generation + * to aggregation time. The minhash byte array is created only when LazyMinHasher.plus() + * is called and immediately thrown away after aggregation. This reduces the burden on GC as well as + * reducing the memory footprint significantly. + * Since lazy minhashes are wrappers, we also made the lazy minhaser a wrapper of any minhasher type. + */ +class LazyMinHasher[H](minHasher: MinHasher[H])(implicit n: Numeric[H]) + extends Monoid[LazyMinHash] { + /** Create a minhash signature for a single String value */ + def init(value: String): LazyMinHash = LazyMinHashWithValue(value) + /** Signature for empty set, needed to be a proper Monoid */ + val zero: LazyMinHash = LazyMinHashWithBytes(minHasher.zero.bytes) + + /** Set union */ + override def plus(l: LazyMinHash, r: LazyMinHash): LazyMinHash = { + (l, r) match { + case (LazyMinHashWithBytes(l), LazyMinHashWithValue(r)) => { + LazyMinHashWithBytes(minHasher.plus(MinHashSignature(l), minHasher.init(r)).bytes) + } + case (LazyMinHashWithValue(l), LazyMinHashWithBytes(r)) => { + LazyMinHashWithBytes(minHasher.plus(minHasher.init(l), MinHashSignature(r)).bytes) + } + case (LazyMinHashWithBytes(l), LazyMinHashWithBytes(r)) => { + LazyMinHashWithBytes(minHasher.plus(MinHashSignature(l), MinHashSignature(r)).bytes) + } + case (LazyMinHashWithValue(l), LazyMinHashWithValue(r)) => { + LazyMinHashWithBytes(minHasher.plus(minHasher.init(l), minHasher.init(r)).bytes) + } + case _ => throw new Exception(s"Unhandled term types in plus operation for LazyMinHasher") + } + } + + def toMinHash(lazySig: LazyMinHash): MinHashSignature = { + lazySig match { + case LazyMinHashWithValue(k) => minHasher.init(k) + case LazyMinHashWithBytes(b) => MinHashSignature(b) + case _ => throw new Exception(s"Unhandled LazyMinHash type ${lazySig.getClass.getName}") + } + } + + /** Esimate Jaccard similarity (size of union / size of intersection) */ + def similarity(left: LazyMinHash, right: LazyMinHash): Double = + minHasher.similarity(toMinHash(left), toMinHash(right)) +} diff --git a/algebird-test/src/test/scala/com/twitter/algebird/MinHasherTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/MinHasherTest.scala index b4c309180..956376639 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/MinHasherTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/MinHasherTest.scala @@ -31,6 +31,15 @@ class MinHasherSpec extends WordSpec with Matchers { assert(error < epsilon) } + def testLazyMinHasher[H](mh: LazyMinHasher[H], similarity: Double, epsilon: Double) = { + val (set1, set2) = randomSets(similarity) + + val exact = exactSimilarity(set1, set2) + val sim = lazyApproxSimilarity(mh, set1, set2) + val error: Double = math.abs(exact - sim) + assert(error < epsilon) + } + def randomSets(similarity: Double) = { val s = 10000 val uniqueFraction = if (similarity == 1.0) 0.0 else (1 - similarity) / (1 + similarity) @@ -52,15 +61,36 @@ class MinHasherSpec extends WordSpec with Matchers { mh.similarity(sig1, sig2) } + def lazyApproxSimilarity[T, H](mh: LazyMinHasher[H], x: Set[T], y: Set[T]) = { + val sig1 = x.map{ l => mh.init(l.toString) }.reduce{ (a, b) => mh.plus(a, b) } + val sig2 = y.map{ l => mh.init(l.toString) }.reduce{ (a, b) => mh.plus(a, b) } + mh.similarity(sig1, sig2) + } + + val bands: Int = 1024 + "MinHasher32" should { "measure 0.5 similarity in 1024 bytes with < 0.1 error" in { - test(new MinHasher32(0.5, 1024), 0.5, 0.1) + test(new MinHasher32(0.5, bands), 0.5, 0.1) } "measure 0.8 similarity in 1024 bytes with < 0.1 error" in { - test(new MinHasher32(0.8, 1024), 0.8, 0.1) + test(new MinHasher32(0.8, bands), 0.8, 0.1) } "measure 1.0 similarity in 1024 bytes with < 0.01 error" in { - test(new MinHasher32(1.0, 1024), 1.0, 0.01) + test(new MinHasher32(1.0, bands), 1.0, 0.01) } } + + "LazyMinHasher" should { + "measure 0.5 similarity in 1024 bytes with < 0.1 error" in { + testLazyMinHasher(new LazyMinHasher(new MinHasher32(0.5, bands)), 0.5, 0.1) + } + "measure 0.8 similarity in 1024 bytes with < 0.1 error" in { + testLazyMinHasher(new LazyMinHasher(new MinHasher32(0.8, bands)), 0.8, 0.1) + } + "measure 1.0 similarity in 1024 bytes with < 0.01 error" in { + testLazyMinHasher(new LazyMinHasher(new MinHasher32(1.0, bands)), 1.0, 0.01) + } + } + }