Skip to content

First sketch at a sketch join #758

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jan 30, 2014
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Copyright 2014 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.typed

import com.twitter.algebird.{CMS,MurmurHash128}

case class Sketched[K,V]
(pipe: TypedPipe[(K,V)],
delta: Double,
eps: Double,
seed: Int,
reducers: Option[Int])
(implicit serialization: K => Array[Byte],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a type that does not extend function here? When you have an implicit function in scope, it can be used for implicit conversions, which will give K all the methods of Array[Byte](there are a lot: http://www.scala-lang.org/api/current/index.html#scala.Array) and might be confusing.

trait Encoder[K, T] {
  def apply(k: K): T
}

or we can punt, add this to bijection, and make bijection a dep of scalding.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just copying what SketchMap did. It seems like importing bijection provides something this can use for String, at least, though TBH I'm not sure exactly what's going on there. I'm +1 on an explicit Encoder typeclass though, and I think having Scalding depend on bijection is inevitable and fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... BTW since really this is about hashing, it could also be an argument for a Hashable typeclass in scalding, which I know has come up before.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is probably cleaner, or Hashable in Algebird.

There was a proposal, but I could never get something looked really good. Something like:

trait Hashable32[K] {
  def hash(k: K): Int
}

trait Hashable64[K] extends Hashable32[K] {
  def hash2(k: K): Long
  def hash(k: K): Int = {
    val l = hash2(k)
    ((l >>> 32) ^ l).toInt
   } 
}

trait Hashable128[K] extends Hashable64[K] {
  def hash4(k: K): (Long, Long)
  def hash2(k: K) = {
    val h = hash4(k)
    h._1 ^ h._2
  }
}

might do. I went overboard last time (shocker).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks pretty good, though I might use hash64 and hash128 for the method names, for consistency with the trait names?

So... does this go in algebird or in bijection?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess algebird. algebird-hash? and then depend on that in algebird-core?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds right. Do we want to block this PR on that? My vote is that we get this into 0.9.0 as is (once I've addressed other feedback), then try to do algebird-hash "right" for the next round of releases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. Let's get this in with (K => Array[Byte]) and fix all later.

ordering: Ordering[K])
extends WithReducers[Sketched[K,V]] {

def withReducers(n: Int) = Sketched(pipe, delta, eps, seed, Some(n))

private lazy val murmurHash = MurmurHash128(seed)
def hash(key: K) = murmurHash(serialization(key))._1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type on all public methods, otherwise binary compatibility is difficult.
hash(k: K): Long


private lazy implicit val cms = CMS.monoid(eps, delta, seed)
lazy val sketch = pipe.map{kv => cms.create(hash(kv._1))}.sum
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lazy val sketch: TypedPipe[CountMin] or make it private.


def cogroup[V2,R](right: TypedPipe[(K,V2)])(joiner: (K, V, Iterable[V2]) => Iterator[R]) =
new SketchJoined(this, right, reducers)(joiner)

def join[V2](right: TypedPipe[(K,V2)]) = cogroup(right)(Joiner.hashInner2)
def leftJoin[V2](right: TypedPipe[(K,V2)]) = cogroup(right)(Joiner.hashLeft2)
}

case class SketchJoined[K:Ordering,V,V2,R]
(left: Sketched[K,V],
right: TypedPipe[(K,V2)],
reducers: Option[Int])
(joiner: (K, V, Iterable[V2]) => Iterator[R])
extends WithReducers[SketchJoined[K,V,V2,R]] {

private lazy val numReducers = reducers.getOrElse(sys.error("Must specify number of reducers"))
def withReducers(n: Int) = SketchJoined(left, right, Some(n))(joiner)

//the most of any one reducer we want to try to take up with a single key
val maxReducerFraction = 0.1

private def flatMapWithReplicas[V](pipe: TypedPipe[(K,V)])(fn: Int => Iterable[Int]) =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this V parameter is confusing since there is also a V on the class. Are they distinct? I think so. Can you make this W or something not used in the class?

pipe.flatMapWithValue(left.sketch){(v,sketchOpt) =>
sketchOpt.toList.flatMap{cms =>
val maxPerReducer = (cms.totalCount / numReducers) * maxReducerFraction + 1
val maxReplicas = (cms.frequency(left.hash(v._1)).estimate.toDouble / maxPerReducer)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments around the following cases:

  1. if the cms.freq == 0, we don't send to reducers.
  2. if the cms.freq < maxPerReducer we still send something (the ceil.toInt seems to do this, but a comment will help).


//if the frequency is 0, maxReplicas.ceil will be 0 so we will filter out this key entirely
//if it's < maxPerReducer, the ceil will round maxReplicas up to 1 to ensure we still see it
val replicas = fn(maxReplicas.ceil.toInt.min(numReducers))
replicas.toList.map{i => (i,v._1) -> v._2}
}
}

lazy val toTypedPipe = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toTypedPipe: TypedPipe[(K, R)]

lazy val rand = new scala.util.Random(left.seed)
val lhs = flatMapWithReplicas(left.pipe){n => Some(rand.nextInt(n) + 1)}
val rhs = flatMapWithReplicas(right){n => 1.to(n)}

lhs
.group
.cogroup(rhs.group){(k,itv,itu) => itv.flatMap{v => joiner(k._2,v,itu)}}
.withReducers(numReducers)
.map{case ((r,k),v) => (k,v)}
}
}

object SketchJoined {
implicit def toTypedPipe[K,V, V2, R]
(joined: SketchJoined[K, V, V2, R]): TypedPipe[(K, R)] = joined.toTypedPipe
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,14 @@ trait TypedPipe[+T] extends Serializable {
.hashLeftJoin(grouped)
.map { case (t, (_, optV)) => (t, optV) }

def sketch[K,V]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include a zero-arg version, for convenience? Otherwise you need to always do sketch()

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but question: Should this be in an enrichment rather than adding the method to TypedPipe? I am wondering how to draw the line of what we add to TypedPipe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think anything we accept into scalding-core should go on TypedPipe. From a discoverability point of view I think we want a small number of very rich APIs, not stuff scattered around everywhere.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why all into one class, rather than an enrichment pattern? Having a CMS joiner is very powerful, but hard to call it 'core' to a typed pipe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, my feeling is this: it's hard to dispute that joins are core; almost anyone using joins is going to run into problems with skew at some point, and ask how to solve that; having a good answer for that seems pretty fundamental.

In the very earliest days of Scalding, one piece of FUD that was often directed at is was "but Pig has SKEW JOIN, we can't take anything seriously if it doesn't have SKEW JOIN". I think having a definitive reply to that counts as 'core'.

(delta: Double = 0.05,
eps: Double = 0.01,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you comment on the eps/delta choice? Since we are dealing with map/reduce, it makes little sense to default to computing a small CMS. I think defaulting to something like 10MB should be done. Otherwise, the overhead to computing the CMS overwhelms the value we get.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I didn't give it a ton of thought because even a crappy small CMS makes such an incredible difference; the main thing is to reliably identify the large keys without including too many of the small keys, and it doesn't take much to do that.

10MB seems reasonable. My guess is that we want to make this CMS deeper and narrower than usual: we care less about the precision of the estimates than we do about false positives where a key spuriously registers as large. But depth has diminishing returns... even depth 20 is probably complete overkill. And at 10MB that would put us at width 100,000, is that right?

Suggestions?

seed: Int = 12345)
(implicit ev: TypedPipe[T] <:< TypedPipe[(K,V)],
serialization: K => Array[Byte],
ordering: Ordering[K]): Sketched[K,V] =
Sketched(ev(this), delta, eps, seed, None)
}


Expand Down