Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.

Commit aebfeaa

Browse files
committed
Apply formatting
1 parent e8489f4 commit aebfeaa

29 files changed

+482
-564
lines changed

build.sbt

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,26 @@ developers := List(
1515
organizationName := "lightbend"
1616
organizationHomepage := Option(url("http://lightbend.com/"))
1717
homepage := scmInfo.value map (_.browseUrl)
18-
scmInfo := Option(ScmInfo(url("https://github.com/lightbend/kafka-streams-scala"), "[email protected]:lightbend/kafka-streams-scala.git"))
18+
scmInfo := Option(
19+
ScmInfo(url("https://github.com/lightbend/kafka-streams-scala"), "[email protected]:lightbend/kafka-streams-scala.git")
20+
)
1921

2022
parallelExecution in Test := false
2123
testFrameworks += new TestFramework("minitest.runner.Framework")
2224

2325
libraryDependencies ++= Seq(
24-
kafkaStreams excludeAll(ExclusionRule("org.slf4j", "slf4j-log4j12"), ExclusionRule("org.apache.zookeeper", "zookeeper")),
26+
kafkaStreams excludeAll (ExclusionRule("org.slf4j", "slf4j-log4j12"), ExclusionRule("org.apache.zookeeper",
27+
"zookeeper")),
2528
scalaLogging % "test",
26-
logback % "test",
27-
kafka % "test" excludeAll(ExclusionRule("org.slf4j", "slf4j-log4j12"), ExclusionRule("org.apache.zookeeper", "zookeeper")),
28-
curator % "test",
29-
minitest % "test",
29+
logback % "test",
30+
kafka % "test" excludeAll (ExclusionRule("org.slf4j", "slf4j-log4j12"), ExclusionRule("org.apache.zookeeper",
31+
"zookeeper")),
32+
curator % "test",
33+
minitest % "test",
3034
minitestLaws % "test",
31-
algebird % "test",
32-
chill % "test",
33-
avro4s % "test"
35+
algebird % "test",
36+
chill % "test",
37+
avro4s % "test"
3438
)
3539

3640
credentials += Credentials(Path.userHome / ".ivy2" / ".credentials")

project/Dependencies.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,22 @@ object Dependencies {
55

66
implicit class Exclude(module: ModuleID) {
77
def log4jExclude: ModuleID =
8-
module excludeAll(ExclusionRule("log4j"))
8+
module.excludeAll(ExclusionRule("log4j"))
99

1010
def driverExclusions: ModuleID =
11-
module.log4jExclude.exclude("com.google.guava", "guava")
11+
module.log4jExclude
12+
.exclude("com.google.guava", "guava")
1213
.excludeAll(ExclusionRule("org.slf4j"))
1314
}
1415

15-
val kafkaStreams = "org.apache.kafka" % "kafka-streams" % KafkaVersion
16-
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % ScalaLoggingVersion
17-
val logback = "ch.qos.logback" % "logback-classic" % LogbackVersion
18-
val kafka = "org.apache.kafka" %% "kafka" % KafkaVersion
19-
val curator = "org.apache.curator" % "curator-test" % CuratorVersion
20-
val minitest = "io.monix" %% "minitest" % MinitestVersion
21-
val minitestLaws = "io.monix" %% "minitest-laws" % MinitestVersion
22-
val algebird = "com.twitter" %% "algebird-core" % AlgebirdVersion
23-
val chill = "com.twitter" %% "chill" % ChillVersion
24-
val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % Avro4sVersion
16+
val kafkaStreams = "org.apache.kafka" % "kafka-streams" % KafkaVersion
17+
val scalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % ScalaLoggingVersion
18+
val logback = "ch.qos.logback" % "logback-classic" % LogbackVersion
19+
val kafka = "org.apache.kafka" %% "kafka" % KafkaVersion
20+
val curator = "org.apache.curator" % "curator-test" % CuratorVersion
21+
val minitest = "io.monix" %% "minitest" % MinitestVersion
22+
val minitestLaws = "io.monix" %% "minitest-laws" % MinitestVersion
23+
val algebird = "com.twitter" %% "algebird-core" % AlgebirdVersion
24+
val chill = "com.twitter" %% "chill" % ChillVersion
25+
val avro4s = "com.sksamuel.avro4s" %% "avro4s-core" % Avro4sVersion
2526
}

project/Versions.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
object Versions {
2-
val AlgebirdVersion = "0.13.0"
3-
val ChillVersion = "0.9.2"
4-
val LogbackVersion = "1.2.3"
5-
val KafkaVersion = "1.0.0"
2+
val AlgebirdVersion = "0.13.0"
3+
val ChillVersion = "0.9.2"
4+
val LogbackVersion = "1.2.3"
5+
val KafkaVersion = "1.0.0"
66
val ScalaLoggingVersion = "3.5.0"
7-
val CuratorVersion = "4.0.0"
8-
val MinitestVersion = "2.0.0"
9-
val JDKVersion = "1.8"
10-
val Scala_2_12_Version = "2.12.5"
11-
val Scala_2_11_Version = "2.11.11"
12-
val Avro4sVersion = "1.8.3"
13-
val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version )
7+
val CuratorVersion = "4.0.0"
8+
val MinitestVersion = "2.0.0"
9+
val JDKVersion = "1.8"
10+
val Scala_2_12_Version = "2.12.5"
11+
val Scala_2_11_Version = "2.11.11"
12+
val Avro4sVersion = "1.8.3"
13+
val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version)
1414
}

project/plugins.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
2-
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.0")
2+
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.0")
Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
* Copyright 2017-2018 Alexis Seigneurin.
4-
*/
5-
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
* Copyright 2017-2018 Alexis Seigneurin.
4+
*/
65
package com.lightbend.kafka.scala.streams
76

87
import org.apache.kafka.common.serialization.{Serde, Serdes}
98

10-
119
/**
12-
* Implicit values for default serdes
13-
*/
10+
* Implicit values for default serdes
11+
*/
1412
object DefaultSerdes {
15-
implicit val stringSerde: Serde[String] = Serdes.String()
16-
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
17-
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
13+
implicit val stringSerde: Serde[String] = Serdes.String()
14+
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
15+
implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
1816
implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
19-
implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
20-
implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
21-
implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
17+
implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
18+
implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
19+
implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
2220
}

src/main/scala/com/lightbend/kafka/scala/streams/FunctionConversions.scala

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,29 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
* Copyright 2017-2018 Alexis Seigneurin.
4-
*/
5-
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
* Copyright 2017-2018 Alexis Seigneurin.
4+
*/
65
package com.lightbend.kafka.scala.streams
76

87
import org.apache.kafka.streams.KeyValue
98
import org.apache.kafka.streams.kstream._
109

1110
/**
12-
* Implicit classes that offer conversions of Scala function literals to
13-
* SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
14-
* more expressive, with less boilerplate and more succinct.
15-
*/
11+
* Implicit classes that offer conversions of Scala function literals to
12+
* SAM (Single Abstract Method) objects in Java. These make the Scala APIs much
13+
* more expressive, with less boilerplate and more succinct.
14+
*/
1615
object FunctionConversions {
1716

1817
implicit class PredicateFromFunction[K, V](val test: (K, V) => Boolean) extends AnyVal {
19-
def asPredicate: Predicate[K,V] = test(_,_)
18+
def asPredicate: Predicate[K, V] = test(_, _)
2019
}
2120

22-
implicit class MapperFromFunction[T, U, V](val f:(T,U) => V) extends AnyVal {
21+
implicit class MapperFromFunction[T, U, V](val f: (T, U) => V) extends AnyVal {
2322
def asKeyValueMapper: KeyValueMapper[T, U, V] = (k: T, v: U) => f(k, v)
24-
def asValueJoiner: ValueJoiner[T,U,V] = (v1, v2) => f(v1, v2)
23+
def asValueJoiner: ValueJoiner[T, U, V] = (v1, v2) => f(v1, v2)
2524
}
2625

27-
implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f:(K,V) => (KR, VR)) extends AnyVal {
26+
implicit class KeyValueMapperFromFunction[K, V, KR, VR](val f: (K, V) => (KR, VR)) extends AnyVal {
2827
def asKeyValueMapper: KeyValueMapper[K, V, KeyValue[KR, VR]] = (k, v) => {
2928
val (kr, vr) = f(k, v)
3029
KeyValue.pair(kr, vr)
@@ -36,10 +35,10 @@ object FunctionConversions {
3635
}
3736

3837
implicit class AggregatorFromFunction[K, V, VR](val f: (K, V, VR) => VR) extends AnyVal {
39-
def asAggregator: Aggregator[K, V, VR] = (k,v,r) => f(k,v,r)
38+
def asAggregator: Aggregator[K, V, VR] = (k, v, r) => f(k, v, r)
4039
}
4140

42-
implicit class MergerFromFunction[K,VR](val f: (K, VR, VR) => VR) extends AnyVal {
41+
implicit class MergerFromFunction[K, VR](val f: (K, VR, VR) => VR) extends AnyVal {
4342
def asMerger: Merger[K, VR] = (k, v1, v2) => f(k, v1, v2)
4443
}
4544

src/main/scala/com/lightbend/kafka/scala/streams/ImplicitConversions.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
* Copyright 2017-2018 Alexis Seigneurin.
4-
*/
5-
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
* Copyright 2017-2018 Alexis Seigneurin.
4+
*/
65
package com.lightbend.kafka.scala.streams
76

87
import org.apache.kafka.streams.kstream._
9-
import org.apache.kafka.streams.{ KeyValue, Consumed }
8+
import org.apache.kafka.streams.{Consumed, KeyValue}
109
import org.apache.kafka.common.serialization.Serde
1110

1211
import scala.language.implicitConversions
1312

1413
/**
15-
* Implicit conversions between the Scala wrapper objects and the underlying Java
16-
* objects.
17-
*/
14+
* Implicit conversions between the Scala wrapper objects and the underlying Java
15+
* objects.
16+
*/
1817
object ImplicitConversions {
1918

2019
implicit def wrapKStream[K, V](inner: KStream[K, V]): KStreamS[K, V] =
@@ -50,7 +49,8 @@ object ImplicitConversions {
5049
implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
5150
Produced.`with`(keySerde, valueSerde)
5251

53-
implicit def joinedFromKVOSerde[K, V, VO](implicit keySerde: Serde[K], valueSerde: Serde[V],
52+
implicit def joinedFromKVOSerde[K, V, VO](implicit keySerde: Serde[K],
53+
valueSerde: Serde[V],
5454
otherValueSerde: Serde[VO]): Joined[K, V, VO] =
5555
Joined.`with`(keySerde, valueSerde, otherValueSerde)
5656
}

src/main/scala/com/lightbend/kafka/scala/streams/KGroupedStreamS.scala

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
* Copyright 2017-2018 Alexis Seigneurin.
4-
*/
5-
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
* Copyright 2017-2018 Alexis Seigneurin.
4+
*/
65
package com.lightbend.kafka.scala.streams
76

87
import org.apache.kafka.streams.kstream._
@@ -12,10 +11,9 @@ import org.apache.kafka.common.serialization.Serde
1211
import ImplicitConversions._
1312
import FunctionConversions._
1413

15-
1614
/**
17-
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
18-
*/
15+
* Wraps the Java class KGroupedStream and delegates method calls to the underlying Java object.
16+
*/
1917
class KGroupedStreamS[K, V](inner: KGroupedStream[K, V]) {
2018

2119
def count(): KTableS[K, Long] = {
@@ -24,47 +22,41 @@ class KGroupedStreamS[K, V](inner: KGroupedStream[K, V]) {
2422
}
2523

2624
def count(store: String, keySerde: Option[Serde[K]] = None): KTableS[K, Long] = {
27-
val materialized = keySerde.foldLeft(Materialized.as[K, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](store))((m,serde)=> m.withKeySerde(serde))
25+
val materialized = keySerde.foldLeft(Materialized.as[K, java.lang.Long, KeyValueStore[Bytes, Array[Byte]]](store))(
26+
(m, serde) => m.withKeySerde(serde)
27+
)
2828

2929
val c: KTableS[K, java.lang.Long] = inner.count(materialized)
3030
c.mapValues[Long](Long2long _)
3131
}
3232

33-
def reduce(reducer: (V, V) => V): KTableS[K, V] = {
33+
def reduce(reducer: (V, V) => V): KTableS[K, V] =
3434
inner.reduce((v1, v2) => reducer(v1, v2))
35-
}
36-
37-
def reduce(reducer: (V, V) => V,
38-
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] = {
3935

36+
def reduce(reducer: (V, V) => V, materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
4037
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
4138
// works perfectly with Scala 2.12 though
4239
inner.reduce(((v1: V, v2: V) => reducer(v1, v2)).asReducer, materialized)
43-
}
44-
45-
def reduce(reducer: (V, V) => V,
46-
storeName: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): KTableS[K, V] = {
4740

41+
def reduce(reducer: (V, V) => V, storeName: String)(implicit keySerde: Serde[K],
42+
valueSerde: Serde[V]): KTableS[K, V] =
4843
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
4944
// works perfectly with Scala 2.12 though
50-
inner.reduce(((v1: V, v2: V) =>
51-
reducer(v1, v2)).asReducer,
52-
Materialized.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
45+
inner.reduce(
46+
((v1: V, v2: V) => reducer(v1, v2)).asReducer,
47+
Materialized
48+
.as[K, V, KeyValueStore[Bytes, Array[Byte]]](storeName)
5349
.withKeySerde(keySerde)
5450
.withValueSerde(valueSerde)
5551
)
56-
}
5752

58-
def aggregate[VR](initializer: () => VR,
59-
aggregator: (K, V, VR) => VR): KTableS[K, VR] = {
53+
def aggregate[VR](initializer: () => VR, aggregator: (K, V, VR) => VR): KTableS[K, VR] =
6054
inner.aggregate(initializer.asInitializer, aggregator.asAggregator)
61-
}
6255

6356
def aggregate[VR](initializer: () => VR,
64-
aggregator: (K, V, VR) => VR,
65-
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, VR] = {
57+
aggregator: (K, V, VR) => VR,
58+
materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, VR] =
6659
inner.aggregate(initializer.asInitializer, aggregator.asAggregator, materialized)
67-
}
6860

6961
def windowedBy(windows: SessionWindows): SessionWindowedKStreamS[K, V] =
7062
inner.windowedBy(windows)
Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
/**
2-
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3-
* Copyright 2017-2018 Alexis Seigneurin.
4-
*/
5-
2+
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
3+
* Copyright 2017-2018 Alexis Seigneurin.
4+
*/
65
package com.lightbend.kafka.scala.streams
76

87
import ImplicitConversions._
@@ -12,8 +11,8 @@ import org.apache.kafka.common.utils.Bytes
1211
import FunctionConversions._
1312

1413
/**
15-
* Wraps the Java class KGroupedTable and delegates method calls to the underlying Java object.
16-
*/
14+
* Wraps the Java class KGroupedTable and delegates method calls to the underlying Java object.
15+
*/
1716
class KGroupedTableS[K, V](inner: KGroupedTable[K, V]) {
1817

1918
type ByteArrayKVStore = KeyValueStore[Bytes, Array[Byte]]
@@ -26,35 +25,24 @@ class KGroupedTableS[K, V](inner: KGroupedTable[K, V]) {
2625
def count(materialized: Materialized[K, Long, ByteArrayKVStore]): KTableS[K, Long] =
2726
inner.count(materialized)
2827

29-
def reduce(adder: (V, V) => V,
30-
subTractor: (V, V) => V): KTableS[K, V] = {
31-
28+
def reduce(adder: (V, V) => V, subTractor: (V, V) => V): KTableS[K, V] =
3229
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
3330
// works perfectly with Scala 2.12 though
3431
inner.reduce(((v1, v2) => adder(v1, v2)).asReducer, ((v1, v2) => subTractor(v1, v2)).asReducer)
35-
}
3632

3733
def reduce(adder: (V, V) => V,
3834
subtractor: (V, V) => V,
39-
materialized: Materialized[K, V, ByteArrayKVStore]): KTableS[K, V] = {
40-
35+
materialized: Materialized[K, V, ByteArrayKVStore]): KTableS[K, V] =
4136
// need this explicit asReducer for Scala 2.11 or else the SAM conversion doesn't take place
4237
// works perfectly with Scala 2.12 though
4338
inner.reduce(((v1, v2) => adder(v1, v2)).asReducer, ((v1, v2) => subtractor(v1, v2)).asReducer, materialized)
44-
}
45-
46-
def aggregate[VR](initializer: () => VR,
47-
adder: (K, V, VR) => VR,
48-
subtractor: (K, V, VR) => VR): KTableS[K, VR] = {
4939

40+
def aggregate[VR](initializer: () => VR, adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR): KTableS[K, VR] =
5041
inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator)
51-
}
5242

5343
def aggregate[VR](initializer: () => VR,
5444
adder: (K, V, VR) => VR,
5545
subtractor: (K, V, VR) => VR,
56-
materialized: Materialized[K, VR, ByteArrayKVStore]): KTableS[K, VR] = {
57-
46+
materialized: Materialized[K, VR, ByteArrayKVStore]): KTableS[K, VR] =
5847
inner.aggregate(initializer.asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
59-
}
6048
}

0 commit comments

Comments
 (0)