19
19
package com .metamx .tranquility .beam
20
20
21
21
import com .fasterxml .jackson .databind .ObjectMapper
22
+ import com .github .nscala_time .time .Imports ._
22
23
import com .google .common .util .concurrent .ThreadFactoryBuilder
23
24
import com .metamx .common .scala .Logging
24
25
import com .metamx .common .scala .Predef ._
@@ -30,22 +31,17 @@ import com.metamx.common.scala.timekeeper.Timekeeper
30
31
import com .metamx .common .scala .untyped ._
31
32
import com .metamx .emitter .service .ServiceEmitter
32
33
import com .metamx .tranquility .typeclass .Timestamper
33
- import com .twitter .util .Future
34
- import com .twitter .util .FuturePool
35
- import com .twitter .util .Promise
36
- import com .twitter .util .Return
37
- import com .twitter .util .Throw
34
+ import com .twitter .util ._
38
35
import java .util .UUID
39
36
import java .util .concurrent .Executors
40
37
import java .util .concurrent .atomic .AtomicBoolean
41
38
import org .apache .curator .framework .CuratorFramework
42
39
import org .apache .curator .framework .recipes .locks .InterProcessSemaphoreMutex
43
40
import org .apache .zookeeper .KeeperException .NodeExistsException
41
+ import org .joda .time .chrono .ISOChronology
44
42
import org .joda .time .DateTime
45
43
import org .joda .time .DateTimeZone
46
44
import org .joda .time .Interval
47
- import org .joda .time .chrono .ISOChronology
48
- import org .scala_tools .time .Implicits ._
49
45
import scala .collection .JavaConverters ._
50
46
import scala .collection .mutable
51
47
import scala .language .reflectiveCalls
@@ -190,16 +186,16 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
190
186
private [this ] def beam (timestamp : DateTime , now : DateTime ): Future [Beam [EventType ]] = {
191
187
val bucket = tuning.segmentBucket(timestamp)
192
188
val creationInterval = new Interval (
193
- tuning.segmentBucket(now - tuning.windowPeriod).start.millis ,
194
- tuning.segmentBucket(Seq (now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.millis )).end.millis ,
189
+ tuning.segmentBucket(now - tuning.windowPeriod).start.getMillis ,
190
+ tuning.segmentBucket(Seq (now + tuning.warmingPeriod, now + tuning.windowPeriod).maxBy(_.getMillis )).end.getMillis ,
195
191
ISOChronology .getInstanceUTC
196
192
)
197
193
val windowInterval = new Interval (
198
- tuning.segmentBucket(now - tuning.windowPeriod).start.millis ,
199
- tuning.segmentBucket(now + tuning.windowPeriod).end.millis ,
194
+ tuning.segmentBucket(now - tuning.windowPeriod).start.getMillis ,
195
+ tuning.segmentBucket(now + tuning.windowPeriod).end.getMillis ,
200
196
ISOChronology .getInstanceUTC
201
197
)
202
- val futureBeamOption = beams.get(timestamp.millis ) match {
198
+ val futureBeamOption = beams.get(timestamp.getMillis ) match {
203
199
case _ if ! open => Future .value(None )
204
200
case Some (x) if windowInterval.overlaps(bucket) => Future .value(Some (x))
205
201
case Some (x) => Future .value(None )
@@ -210,7 +206,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
210
206
// This could be more efficient, but it's happening infrequently so it's probably not a big deal.
211
207
data.modify {
212
208
prev =>
213
- val prevBeamDicts = prev.beamDictss.getOrElse(timestamp.millis , Nil )
209
+ val prevBeamDicts = prev.beamDictss.getOrElse(timestamp.getMillis , Nil )
214
210
if (prevBeamDicts.size >= tuning.partitions) {
215
211
log.info(
216
212
" Merged beam already created for identifier[%s] timestamp[%s], with sufficient partitions (target = %d, actual = %d)" ,
@@ -236,8 +232,8 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
236
232
val numSegmentsToCover = tuning.minSegmentsPerBeam +
237
233
rand.nextInt(tuning.maxSegmentsPerBeam - tuning.minSegmentsPerBeam + 1 )
238
234
val intervalToCover = new Interval (
239
- timestamp.millis ,
240
- tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).millis ,
235
+ timestamp.getMillis ,
236
+ tuning.segmentGranularity.increment(timestamp, numSegmentsToCover).getMillis ,
241
237
ISOChronology .getInstanceUTC
242
238
)
243
239
val timestampsToCover = tuning.segmentGranularity.getIterable(intervalToCover).asScala.map(_.start)
@@ -249,7 +245,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
249
245
// Expire old beamDicts
250
246
tuning.segmentGranularity.increment(new DateTime (millis)) + tuning.windowPeriod < now
251
247
}) ++ (for (ts <- timestampsToCover) yield {
252
- val tsPrevDicts = prev.beamDictss.getOrElse(ts.millis , Nil )
248
+ val tsPrevDicts = prev.beamDictss.getOrElse(ts.getMillis , Nil )
253
249
log.info(
254
250
" Creating new merged beam for identifier[%s] timestamp[%s] (target = %d, actual = %d)" ,
255
251
identifier,
@@ -272,10 +268,10 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
272
268
}
273
269
)
274
270
})
275
- (ts.millis , tsNewDicts)
271
+ (ts.getMillis , tsNewDicts)
276
272
})
277
273
val newLatestCloseTime = new DateTime (
278
- (Seq (prev.latestCloseTime.millis ) ++ (prev.beamDictss.keySet -- newBeamDictss.keySet)).max,
274
+ (Seq (prev.latestCloseTime.getMillis ) ++ (prev.beamDictss.keySet -- newBeamDictss.keySet)).max,
279
275
ISOChronology .getInstanceUTC
280
276
)
281
277
ClusteredBeamMeta (
@@ -298,12 +294,12 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
298
294
// Only add the beams we actually wanted at this time. This is because there might be other beams in ZK
299
295
// that we don't want to add just yet, on account of maybe they need their partitions expanded (this only
300
296
// happens when they are the necessary ones).
301
- if (! beams.contains(timestamp.millis ) && meta.beamDictss.contains(timestamp.millis )) {
302
- val beamDicts = meta.beamDictss(timestamp.millis )
297
+ if (! beams.contains(timestamp.getMillis ) && meta.beamDictss.contains(timestamp.getMillis )) {
298
+ val beamDicts = meta.beamDictss(timestamp.getMillis )
303
299
log.info(" Adding beams for identifier[%s] timestamp[%s]: %s" , identifier, timestamp, beamDicts)
304
300
// Should have better handling of unparseable zk data. Changing BeamMaker implementations currently
305
301
// just causes exceptions until the old dicts are cleared out.
306
- beams(timestamp.millis ) = beamMergeFn(
302
+ beams(timestamp.getMillis ) = beamMergeFn(
307
303
beamDicts.zipWithIndex map {
308
304
case (beamDict, partitionNum) =>
309
305
val decorate = beamDecorateFn(tuning.segmentBucket(timestamp), partitionNum)
@@ -319,7 +315,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
319
315
beams.remove(timestamp)
320
316
}
321
317
// Return requested beam. It may not have actually been created, so it's an Option.
322
- beams.get(timestamp.millis ) ifEmpty {
318
+ beams.get(timestamp.getMillis ) ifEmpty {
323
319
log.info(
324
320
" Turns out we decided not to actually make beams for identifier[%s] timestamp[%s]. Returning None." ,
325
321
identifier,
@@ -352,7 +348,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
352
348
val grouped : Seq [(DateTime , IndexedSeq [(EventType , Promise [SendResult ])])] = (eventsWithPromises groupBy {
353
349
case (event, promise) =>
354
350
tuning.segmentBucket(timestamper(event)).start
355
- }).toSeq.sortBy(_._1.millis )
351
+ }).toSeq.sortBy(_._1.getMillis )
356
352
// Possibly warm up future beams
357
353
def toBeWarmed (dt : DateTime , end : DateTime ): List [DateTime ] = {
358
354
if (dt <= end) {
@@ -362,7 +358,7 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
362
358
}
363
359
}
364
360
val latestEventTimestamp : Option [DateTime ] = grouped.lastOption map { case (truncatedTimestamp, group) =>
365
- val event : EventType = group.maxBy(tuple => timestamper(tuple._1).millis )._1
361
+ val event : EventType = group.maxBy(tuple => timestamper(tuple._1).getMillis )._1
366
362
timestamper(event)
367
363
}
368
364
val warmingBeams : Future [Seq [Beam [EventType ]]] = Future .collect(
@@ -406,13 +402,13 @@ class ClusteredBeam[EventType: Timestamper, InnerBeamType <: Beam[EventType]](
406
402
data.modify {
407
403
prev =>
408
404
ClusteredBeamMeta (
409
- Seq (prev.latestCloseTime, timestamp).maxBy(_.millis ),
410
- prev.beamDictss - timestamp.millis
405
+ Seq (prev.latestCloseTime, timestamp).maxBy(_.getMillis ),
406
+ prev.beamDictss - timestamp.getMillis
411
407
)
412
408
} onSuccess {
413
409
meta =>
414
410
beamWriteMonitor.synchronized {
415
- beams.remove(timestamp.millis )
411
+ beams.remove(timestamp.getMillis )
416
412
}
417
413
} map (_ => SendResult .Dropped )
418
414
} else {
@@ -479,7 +475,7 @@ case class ClusteredBeamMeta(latestCloseTime: DateTime, beamDictss: Map[Long, Se
479
475
Dict (
480
476
// latestTime is only being written for backwards compatibility
481
477
" latestTime" -> new DateTime (
482
- (Seq (latestCloseTime.millis ) ++ beamDictss.map(_._1)).max,
478
+ (Seq (latestCloseTime.getMillis ) ++ beamDictss.map(_._1)).max,
483
479
ISOChronology .getInstanceUTC
484
480
).toString(),
485
481
" latestCloseTime" -> latestCloseTime.toString(),
@@ -499,7 +495,7 @@ object ClusteredBeamMeta
499
495
case (k, vs) =>
500
496
val ts = new DateTime (k, ISOChronology .getInstanceUTC)
501
497
val beamDicts = list(vs) map (dict(_))
502
- (ts.millis , beamDicts)
498
+ (ts.getMillis , beamDicts)
503
499
}
504
500
val latestCloseTime = new DateTime (d.getOrElse(" latestCloseTime" , 0L ), ISOChronology .getInstanceUTC)
505
501
Right (ClusteredBeamMeta (latestCloseTime, beams))
0 commit comments