-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathEarlyArrivalCount.scala
192 lines (159 loc) · 6.19 KB
/
EarlyArrivalCount.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/*
* Copyright 2015 data Artisans GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.dataartisans.flink_demo.examples
import com.dataartisans.flink_demo.datatypes.{TaxiRide, GeoPoint}
import com.dataartisans.flink_demo.sinks.ElasticsearchUpsertSink
import com.dataartisans.flink_demo.sources.TaxiRideSource
import com.dataartisans.flink_demo.utils.{DemoStreamEnvironment, NycGeoUtils}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.triggers.Trigger.{TriggerResult, TriggerContext}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
* Apache Flink DataStream API demo application.
*
* The program processes a stream of taxi ride events from the New York City Taxi and Limousine
* Commission (TLC).
* It computes every five minutes for each location the total number of persons that arrived
* within the last 15 minutes by taxi. The program emits early partial count results whenever more
than 50 persons (or a multiple of 50 persons) arrive at a location within 15 minutes.
*
* See
* http://github.com/dataartisans/flink-streaming-demo
* for more detail.
*
*/
object EarlyArrivalCount {

  def main(args: Array[String]) {

    // ----- source configuration -----
    val inputPath = "./data/nycTaxiData.gz"
    val maxDelaySecs = 60          // maximum out-of-orderness of served events
    val speedupFactor = 600f       // replay speed relative to real time

    // ----- sliding window configuration (minutes) -----
    val windowSizeMin = 15         // each window covers 15 minutes of arrivals
    val windowSlideMin = 5         // a new window result every 5 minutes
    val earlyFireThreshold = 50    // emit an early count every 50 passengers

    // ----- Elasticsearch sink configuration -----
    val writeToElasticsearch = false
    val elasticsearchHost = ""
    val elasticsearchPort = -1

    // streaming environment running on event time
    val env: StreamExecutionEnvironment = DemoStreamEnvironment.env
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // ingest the taxi ride events
    val rides: DataStream[TaxiRide] =
      env.addSource(new TaxiRideSource(inputPath, maxDelaySecs, speedupFactor))

    // keep only trip-end events, and only those located within NYC
    val nycEndEvents = rides
      .filter(r => !r.isStart)
      .filter(r => NycGeoUtils.isInNYC(r.location))

    // translate each event into (grid cell id, passenger count)
    val cellEvents: DataStream[(Int, Short)] =
      nycEndEvents.map(r => (NycGeoUtils.mapToGridCell(r.location), r.passengerCnt))

    // per cell: sum passenger counts over a sliding window, with early firings
    val windowCounts: DataStream[(Int, Long, Int)] = cellEvents
      .keyBy(_._1)
      .timeWindow(Time.minutes(windowSizeMin), Time.minutes(windowSlideMin))
      .trigger(new EarlyCountTrigger(earlyFireThreshold))
      .apply {
        (cell: Int,
         window: TimeWindow,
         events: Iterable[(Int, Short)],
         out: Collector[(Int, Long, Int)]) =>
          // emit (cell, window end time, total passengers seen so far)
          out.collect((cell, window.getEnd, events.map(_._2).sum))
      }

    // resolve each cell id back to the geographic center of the cell
    val cntByLocation: DataStream[(Int, Long, GeoPoint, Int)] =
      windowCounts.map(c => (c._1, c._2, NycGeoUtils.getGridCellCenter(c._1), c._3))

    // always print results to stdout
    cntByLocation.print()

    // optionally upsert results into Elasticsearch
    if (writeToElasticsearch) {
      cntByLocation.addSink(new CntByLocTimeUpsert(elasticsearchHost, elasticsearchPort))
    }

    env.execute("Early arrival counts per location")
  }

  /**
   * Window trigger that fires an early (non-purging) evaluation each time the
   * accumulated passenger count reaches the configured threshold, and fires a
   * final, purging evaluation when the window's event-time end is reached.
   *
   * @param triggerCnt passenger count that causes an early firing
   */
  class EarlyCountTrigger(triggerCnt: Int) extends Trigger[(Int, Short), TimeWindow] {

    override def onElement(
        event: (Int, Short),
        timestamp: Long,
        window: TimeWindow,
        ctx: TriggerContext): TriggerResult = {

      // schedule the final evaluation for the window's end timestamp
      ctx.registerEventTimeTimer(window.getEnd)

      // running passenger count held in keyed state
      val cntState = ctx.getKeyValueState[Integer]("personCnt", 0)
      val updatedCnt = cntState.value() + event._2

      if (updatedCnt >= triggerCnt) {
        // threshold reached: reset the running count and emit an early result
        cntState.update(0)
        TriggerResult.FIRE
      } else {
        // below threshold: remember the new count and keep waiting
        cntState.update(updatedCnt)
        TriggerResult.CONTINUE
      }
    }

    override def onEventTime(
        time: Long,
        window: TimeWindow,
        ctx: TriggerContext): TriggerResult = {
      // window end reached: emit the final count and discard the window
      TriggerResult.FIRE_AND_PURGE
    }

    override def onProcessingTime(
        time: Long,
        window: TimeWindow,
        ctx: TriggerContext): TriggerResult = {
      throw new UnsupportedOperationException("I am not a processing time trigger")
    }
  }

  /**
   * Elasticsearch upsert sink for (cell id, time, location, count) records,
   * indexed by cell id and window end time.
   */
  class CntByLocTimeUpsert(host: String, port: Int)
      extends ElasticsearchUpsertSink[(Int, Long, GeoPoint, Int)](
        host,
        port,
        "elasticsearch",
        "nyc-idx",
        "popular-locations") {

    // document written on first insert: location, window end time, and count
    override def insertJson(r: (Int, Long, GeoPoint, Int)): Map[String, AnyRef] = {
      Map(
        "location" -> s"${r._3.lat},${r._3.lon}".asInstanceOf[AnyRef],
        "time" -> r._2.asInstanceOf[AnyRef],
        "cnt" -> r._4.asInstanceOf[AnyRef]
      )
    }

    // on update only the count changes
    override def updateJson(r: (Int, Long, GeoPoint, Int)): Map[String, AnyRef] =
      Map[String, AnyRef]("cnt" -> r._4.asInstanceOf[AnyRef])

    // index key: "<cellId>/<windowEndTime>"
    override def indexKey(r: (Int, Long, GeoPoint, Int)): String =
      s"${r._1}/${r._2}"
  }
}