SlidingArrivalCount.scala

/*
* Copyright 2015 data Artisans GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dataartisans.flink_demo.examples

import com.dataartisans.flink_demo.datatypes.{TaxiRide, GeoPoint}
import com.dataartisans.flink_demo.sinks.ElasticsearchUpsertSink
import com.dataartisans.flink_demo.sources.TaxiRideSource
import com.dataartisans.flink_demo.utils.{DemoStreamEnvironment, NycGeoUtils}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * Apache Flink DataStream API demo application.
 *
 * The program processes a stream of taxi ride events from the New York City Taxi and
 * Limousine Commission (TLC). Every five minutes it computes, for each location, the
 * total number of persons that arrived by taxi within the last 15 minutes.
 *
 * See
 *   http://github.com/dataartisans/flink-streaming-demo
 * for more detail.
 */
object SlidingArrivalCount {

  def main(args: Array[String]) {

    // input parameters
    val data = "./data/nycTaxiData.gz"
    val maxServingDelay = 60
    val servingSpeedFactor = 600f

    // window parameters
    val countWindowLength = 15   // window size in min
    val countWindowFrequency = 5 // window trigger interval in min
    val earlyCountThreshold = 50

    // Elasticsearch parameters
    val writeToElasticsearch = false
    val elasticsearchHost = ""
    val elasticsearchPort = -1

    // set up streaming execution environment
    val env: StreamExecutionEnvironment = DemoStreamEnvironment.env
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
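    // event-time processing: the windows below are driven by the timestamps carried in
    // the TaxiRide events, not by wall-clock time, so servingSpeedFactor only controls
    // how fast the recorded data is replayed (this assumes the TaxiRideSource assigns
    // event timestamps and emits watermarks, as an event-time job requires)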

    // define the data source
    val rides: DataStream[TaxiRide] = env.addSource(new TaxiRideSource(
      data, maxServingDelay, servingSpeedFactor))

    val cleansedRides = rides
      // filter for trip end events
      .filter( !_.isStart )
      // filter for events in NYC
      .filter( r => NycGeoUtils.isInNYC(r.location) )

    // map location coordinates to a grid cell Id and keep the passenger count
    val cellIds: DataStream[(Int, Short)] = cleansedRides
      .map( r => ( NycGeoUtils.mapToGridCell(r.location), r.passengerCnt ) )

    val passengerCnts: DataStream[(Int, Long, Int)] = cellIds
      // key stream by cell Id
      .keyBy(_._1)
      // define sliding window on keyed streams
      .timeWindow(Time.minutes(countWindowLength), Time.minutes(countWindowFrequency))
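      // (a 15-minute window sliding by 5 minutes: each ride end event falls into three
      //  overlapping windows, and every cell emits an updated count every 5 minutes of
      //  event time)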
      // sum up the passenger counts per cell and window
      .apply { (
        cell: Int,
        window: TimeWindow,
        events: Iterable[(Int, Short)],
        out: Collector[(Int, Long, Int)]) =>
          out.collect( ( cell, window.getEnd, events.map( _._2 ).sum ) )
      }

    // map cell Id back to GeoPoint
    val cntByLocation: DataStream[(Int, Long, GeoPoint, Int)] = passengerCnts
      .map( r => ( r._1, r._2, NycGeoUtils.getGridCellCenter(r._1), r._3 ) )
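    // each record now holds (cell Id, window end timestamp, cell center as GeoPoint, passenger count)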

    // print to console
    cntByLocation
      .print()

    if (writeToElasticsearch) {
      // write to Elasticsearch
      cntByLocation
        .addSink(new CntByLocTimeUpsert(elasticsearchHost, elasticsearchPort))
    }

    env.execute("Sliding passenger count per location")
  }
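
  /**
   * Upsert sink writing one document per grid cell and window end time.
   * Behavior as suggested by the overridden methods of ElasticsearchUpsertSink:
   * insertJson builds the full document the first time a key is written,
   * updateJson only refreshes the count for an existing document, and
   * indexKey derives the document id from cell Id and window end time.
   */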
  class CntByLocTimeUpsert(host: String, port: Int)
    extends ElasticsearchUpsertSink[(Int, Long, GeoPoint, Int)](
      host,
      port,
      "elasticsearch",
      "nyc-idx",
      "popular-locations") {

    override def insertJson(r: (Int, Long, GeoPoint, Int)): Map[String, AnyRef] = {
      Map(
        "location" -> (r._3.lat + "," + r._3.lon).asInstanceOf[AnyRef],
        "time" -> r._2.asInstanceOf[AnyRef],
        "cnt" -> r._4.asInstanceOf[AnyRef]
      )
    }

    override def updateJson(r: (Int, Long, GeoPoint, Int)): Map[String, AnyRef] = {
      Map[String, AnyRef](
        "cnt" -> r._4.asInstanceOf[AnyRef]
      )
    }

    override def indexKey(r: (Int, Long, GeoPoint, Int)): String = {
      // index by location and time
      r._1.toString + "/" + r._2.toString
    }
  }
}