Skip to content

Commit f0e8dc4

Browse files
committed
introduce new scalding-quotation module
1 parent 98ea650 commit f0e8dc4

File tree

50 files changed

+1619
-465
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

50 files changed

+1619
-465
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ project/plugins/lib_managed/
1313
project/plugins/src_managed/
1414
/.idea/
1515
/.idea_modules/
16+
.project
17+
.classpath
18+
.cache-main
19+
.cache-tests
20+
.tmpBin
21+
bin
1622
*.iml
1723
sonatype.sbt
1824
tutorial/data/cofollows.tsv

build.sbt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ lazy val scalding = Project(
216216
.aggregate(
217217
scaldingArgs,
218218
scaldingDate,
219+
scaldingQuotation,
219220
scaldingCore,
220221
scaldingCommons,
221222
scaldingAvro,
@@ -242,6 +243,7 @@ lazy val scaldingAssembly = Project(
242243
.aggregate(
243244
scaldingArgs,
244245
scaldingDate,
246+
scaldingQuotation,
245247
scaldingCore,
246248
scaldingCommons,
247249
scaldingAvro,
@@ -312,6 +314,13 @@ lazy val scaldingBenchmarks = module("benchmarks")
312314
parallelExecution in Test := false
313315
).dependsOn(scaldingCore)
314316

317+
lazy val scaldingQuotation = module("quotation").settings(
318+
libraryDependencies ++= Seq(
319+
"org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided",
320+
"org.scala-lang" % "scala-compiler" % scalaVersion.value % "provided"
321+
)
322+
)
323+
315324
lazy val scaldingCore = module("core").settings(
316325
libraryDependencies ++= Seq(
317326
"cascading" % "cascading-core" % cascadingVersion,
@@ -333,7 +342,7 @@ lazy val scaldingCore = module("core").settings(
333342
"org.slf4j" % "slf4j-api" % slf4jVersion,
334343
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "provided"),
335344
addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full)
336-
).dependsOn(scaldingArgs, scaldingDate, scaldingSerialization, maple)
345+
).dependsOn(scaldingArgs, scaldingDate, scaldingSerialization, maple, scaldingQuotation)
337346

338347
lazy val scaldingCommons = module("commons").settings(
339348
libraryDependencies ++= Seq(

project/plugins.sbt

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ resolvers ++= Seq(
66
"Twitter Maven" at "http://maven.twttr.com"
77
)
88

9-
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
10-
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
11-
addSbtPlugin("com.fortysevendeg" % "sbt-microsites" % "0.3.3")
12-
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")
13-
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
14-
addSbtPlugin("com.twitter" %% "scrooge-sbt-plugin" % "4.14.0")
15-
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.14")
16-
addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4")
17-
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2")
18-
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
19-
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
20-
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")
21-
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.1.1")
9+
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.10.2")
10+
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
11+
addSbtPlugin("com.fortysevendeg" % "sbt-microsites" % "0.3.3")
12+
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")
13+
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
14+
addSbtPlugin("com.twitter" %% "scrooge-sbt-plugin" % "4.14.0")
15+
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.14")
16+
addSbtPlugin("com.typesafe.sbt" % "sbt-ghpages" % "0.5.4")
17+
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2")
18+
addSbtPlugin("com.typesafe.sbt" % "sbt-scalariform" % "1.3.0")
19+
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
20+
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")
21+
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.1.1")
22+
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.3")

scalding-avro/src/main/scala/com/twitter/scalding/avro/package.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package com.twitter.scalding
1717

18+
import com.twitter.scalding.quotation.Quoted
1819
import cascading.flow.FlowDef
1920
import org.apache.avro.Schema
2021
import collection.JavaConverters._
@@ -26,7 +27,8 @@ package object avro {
2627
conv: TupleConverter[T],
2728
set: TupleSetter[T],
2829
flow: FlowDef,
29-
mode: Mode): Unit = {
30+
mode: Mode,
31+
q: Quoted): Unit = {
3032
val sink = PackedAvroSource[T](path)
3133
pipe.write(sink)
3234
}
@@ -35,7 +37,8 @@ package object avro {
3537
conv: TupleConverter[T],
3638
set: TupleSetter[T],
3739
flow: FlowDef,
38-
mode: Mode): Unit = {
40+
mode: Mode,
41+
q: Quoted): Unit = {
3942
import Dsl._
4043
val sink = UnpackedAvroSource[T](path, Some(schema))
4144
val outFields = {

scalding-commons/src/main/scala/com/twitter/scalding/commons/extensions/Checkpoint.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package com.twitter.scalding.commons.extensions
1818

1919
import com.twitter.scalding._
20+
import com.twitter.scalding.quotation.Quoted
2021
import com.twitter.scalding.Dsl._
2122

2223
import cascading.flow.FlowDef
@@ -111,7 +112,7 @@ object Checkpoint {
111112

112113
// Wrapper for Checkpoint when using a TypedPipe
113114
def apply[A](checkpointName: String)(flow: => TypedPipe[A])(implicit args: Args, mode: Mode, flowDef: FlowDef,
114-
conv: TupleConverter[A], setter: TupleSetter[A]): TypedPipe[A] = {
115+
conv: TupleConverter[A], setter: TupleSetter[A], q: Quoted): TypedPipe[A] = {
115116
val rPipe = apply(checkpointName, Dsl.intFields(0 until conv.arity)) {
116117
flow.toPipe(Dsl.intFields(0 until conv.arity))
117118
}

scalding-commons/src/main/scala/com/twitter/scalding/commons/source/VersionedKeyValSource.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import com.twitter.scalding.commons.tap.VersionedTap.TapMode
3232
import com.twitter.scalding.source.{ CheckedInversion, MaxFailuresCheck }
3333
import com.twitter.scalding.typed.KeyedListLike
3434
import com.twitter.scalding.typed.TypedSink
35+
import com.twitter.scalding.quotation.Quoted
3536
import org.apache.hadoop.mapred.JobConf
3637
import scala.collection.JavaConverters._
3738

@@ -224,7 +225,7 @@ class TypedRichPipeEx[K: Ordering, V: Monoid](pipe: TypedPipe[(K, V)]) extends j
224225
// the pipe in using an implicit `Monoid[V]` and sinks all results
225226
// into the `sinkVersion` of data (or a new version) specified by
226227
// `src`.
227-
def writeIncremental(src: VersionedKeyValSource[K, V], reducers: Int = 1)(implicit flowDef: FlowDef, mode: Mode): TypedPipe[(K, V)] = {
228+
def writeIncremental(src: VersionedKeyValSource[K, V], reducers: Int = 1)(implicit flowDef: FlowDef, mode: Mode, q: Quoted): TypedPipe[(K, V)] = {
228229
val outPipe =
229230
if (!src.resourceExists(mode))
230231
pipe

scalding-commons/src/main/scala/com/twitter/scalding/examples/KMeans.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.twitter.scalding.examples
22

33
import com.twitter.scalding._
44
import com.twitter.scalding.typed.ComputedValue
5+
import com.twitter.scalding.quotation.Quoted
56

67
object KMeans {
78

@@ -88,12 +89,12 @@ object KMeans {
8889
}
8990
}
9091

91-
def initializeClusters(k: Int, points: TypedPipe[Vector[Double]]): (ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector]) = {
92+
def initializeClusters(k: Int, points: TypedPipe[Vector[Double]])(implicit q: Quoted): (ValuePipe[List[LabeledVector]], TypedPipe[LabeledVector]) = {
9293
val rng = new java.util.Random(123)
9394
// take a random k vectors:
9495
val clusters = points.map { v => (rng.nextDouble, v) }
9596
.groupAll
96-
.sortedTake(k)(Ordering.by(_._1))
97+
.sortedTake(k)(Ordering.by(_._1), q)
9798
.mapValues { randk =>
9899
randk.iterator
99100
.zipWithIndex

scalding-core/src/main/scala/com/twitter/scalding/Config.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,11 @@ trait Config extends Serializable {
417417
*/
418418
def setVerboseFileSourceLogging(b: Boolean): Config =
419419
this + (VerboseFileSourceLoggingKey -> b.toString)
420+
421+
def getAutomaticProjectionPushdown: Boolean =
422+
get(ExperimentalAutomaticProjectionPushdown)
423+
.map(_.toBoolean)
424+
.getOrElse(false)
420425

421426
override def hashCode = toMap.hashCode
422427
override def equals(that: Any) = that match {
@@ -487,6 +492,11 @@ object Config {
487492
*/
488493
val HashJoinAutoForceRight: String = "scalding.hashjoin.autoforceright"
489494

495+
/**
496+
* Enables automatic projection pushdown (experimental)
497+
*/
498+
val ExperimentalAutomaticProjectionPushdown: String = "scalding.experimental.automatic_projection_pushdown"
499+
490500
val empty: Config = Config(Map.empty)
491501

492502
/*

scalding-core/src/main/scala/com/twitter/scalding/CumulativeSum.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.twitter.scalding.typed
22

33
import com.twitter.algebird._
4+
import com.twitter.scalding.quotation.Quoted
45

56
/**
67
* Extension for TypedPipe to add a cumulativeSum method.
@@ -39,7 +40,8 @@ object CumulativeSum {
3940
def cumulativeSum(
4041
implicit sg: Semigroup[V],
4142
ordU: Ordering[U],
42-
ordK: Ordering[K]): SortedGrouped[K, (U, V)] = {
43+
ordK: Ordering[K],
44+
quoted: Quoted): SortedGrouped[K, (U, V)] = {
4345
pipe.group
4446
.sortBy { case (u, _) => u }
4547
.scanLeft(Nil: List[(U, V)]) {
@@ -62,7 +64,8 @@ object CumulativeSum {
6264
implicit ordS: Ordering[S],
6365
sg: Semigroup[V],
6466
ordU: Ordering[U],
65-
ordK: Ordering[K]): TypedPipe[(K, (U, V))] = {
67+
ordK: Ordering[K],
68+
q: Quoted): TypedPipe[(K, (U, V))] = {
6669

6770
val sumPerS = pipe
6871
.map { case (k, (u, v)) => (k, partition(u)) -> v }

scalding-core/src/main/scala/com/twitter/scalding/bdd/TBddDsl.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,12 @@ import com.twitter.scalding._
55
import com.twitter.scalding.source.TypedText
66
import scala.collection.mutable.Buffer
77
import TDsl._
8+
import com.twitter.scalding.quotation.Quoted
89

910
trait TBddDsl extends FieldConversions with TypedPipeOperationsConversions {
1011

12+
private implicit val q: Quoted = Quoted.internal
13+
1114
def Given[TypeIn](source: TypedTestSource[TypeIn]): TestCaseGiven1[TypeIn] = new TestCaseGiven1[TypeIn](source)
1215

1316
def GivenSources(sources: List[TypedTestSource[_]]): TestCaseGivenList = new TestCaseGivenList(sources)

scalding-core/src/main/scala/com/twitter/scalding/mathematics/Matrix.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import cascading.tap._
3030
import com.twitter.scalding.Dsl._
3131
import scala.math.max
3232
import scala.annotation.tailrec
33+
import com.twitter.scalding.quotation.Quoted
3334

3435
/**
3536
* Matrix class - represents an infinite (hopefully sparse) matrix.
@@ -138,10 +139,11 @@ class MatrixMappableExtensions[T](mappable: Mappable[T])(implicit fd: FlowDef, m
138139
}
139140

140141
def toBlockMatrix[Group, Row, Col, Val](implicit ev: <:<[T, (Group, Row, Col, Val)], ord: Ordering[(Group, Row)],
141-
setter: TupleSetter[(Group, Row, Col, Val)]): BlockMatrix[Group, Row, Col, Val] =
142+
setter: TupleSetter[(Group, Row, Col, Val)],
143+
q: Quoted): BlockMatrix[Group, Row, Col, Val] =
142144
mapToBlockMatrix { _.asInstanceOf[(Group, Row, Col, Val)] }
143145

144-
def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)]): BlockMatrix[Group, Row, Col, Val] = {
146+
def mapToBlockMatrix[Group, Row, Col, Val](fn: (T) => (Group, Row, Col, Val))(implicit ord: Ordering[(Group, Row)], q: Quoted): BlockMatrix[Group, Row, Col, Val] = {
145147
val matPipe = TypedPipe
146148
.from(mappable)
147149
.map(fn)

0 commit comments

Comments
 (0)