
Commit

fixed
garyelephant committed Feb 16, 2018
1 parent 6d98656 commit 6f886b2
Showing 6 changed files with 31 additions and 27 deletions.
RowConstant.scala (new file)
@@ -0,0 +1,6 @@
+package io.github.interestinglab.waterdrop.core
+
+object RowConstant {
+  val ROOT = "__root__"
+  val TMP = "__tmp__"
+}
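
These two values are sentinels shared by every filter touched below: a target_field of "__root__" tells a filter to promote its parsed fields to top-level columns, and "__tmp__" names the staging column used while doing so. A minimal sketch of that convention (hypothetical helper, not part of this commit; df is assumed to be a DataFrame whose RowConstant.TMP column holds a struct or map):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import io.github.interestinglab.waterdrop.core.RowConstant

// Promote each named field out of the staging column, then drop the staging column.
def expandToRoot(df: DataFrame, fields: Seq[String]): DataFrame = {
  var out = df
  for (field <- fields) {
    out = out.withColumn(field, col(RowConstant.TMP)(field))
  }
  out.drop(RowConstant.TMP)
}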
Date.scala
@@ -2,6 +2,7 @@ package io.github.interestinglab.waterdrop.filter
 
 import com.typesafe.config.{Config, ConfigFactory}
 import io.github.interestinglab.waterdrop.apis.BaseFilter
+import io.github.interestinglab.waterdrop.core.RowConstant
 import io.github.interestinglab.waterdrop.utils.{FormatParser, StringTemplate, UnixMSParser, UnixParser}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -26,7 +27,7 @@ class Date(var config: Config) extends BaseFilter(config) {
     super.prepare(spark, ssc)
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "source_field" -> Json.ROOT,
+        "source_field" -> RowConstant.ROOT,
         "target_field" -> "datetime",
         "source_time_format" -> "UNIX_MS",
         "target_time_format" -> "yyyy/MM/dd HH:mm:ss",
@@ -59,7 +60,7 @@ class Date(var config: Config) extends BaseFilter(config) {
     })
 
     config.getString("source_field") match {
-      case Json.ROOT => df.withColumn(targetField, func(lit(System.currentTimeMillis().toString)))
+      case RowConstant.ROOT => df.withColumn(targetField, func(lit(System.currentTimeMillis().toString)))
       case srcField: String => df.withColumn(targetField, func(col(srcField)))
     }
   }
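
In effect, RowConstant.ROOT in source_field means "no input column": the filter stamps each row with the processing-time clock instead of converting an existing field. A sketch of the two branches under stated assumptions (df is some input DataFrame; the real formatting UDF is built from the filter's parsers, here inlined with the default output format):

import org.apache.spark.sql.functions.{col, lit, udf}

// Assumed stand-in for the filter's formatting UDF: unix milliseconds
// (as a string) in, "yyyy/MM/dd HH:mm:ss" out.
val func = udf((ms: String) =>
  new java.text.SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new java.util.Date(ms.toLong)))

// source_field == RowConstant.ROOT: every row gets the current wall-clock time.
val stamped = df.withColumn("datetime", func(lit(System.currentTimeMillis().toString)))
// Any other source_field: convert that column's value (column name illustrative).
val converted = df.withColumn("datetime", func(col("log_time_ms")))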
Grok.scala
@@ -7,6 +7,7 @@ import java.util
 import com.typesafe.config.{Config, ConfigFactory}
 import io.github.interestinglab.waterdrop.apis.BaseFilter
 import io.github.interestinglab.waterdrop.config.Common
+import io.github.interestinglab.waterdrop.core.RowConstant
 import io.thekraken.grok.api.{Grok => GrokLib}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.streaming.StreamingContext
@@ -18,7 +19,6 @@ class Grok(var conf: Config) extends BaseFilter(conf) {
 
   val grok = GrokLib.EMPTY
 
-
   def this() = {
     this(ConfigFactory.empty())
   }
@@ -47,7 +47,7 @@ class Grok(var conf: Config) extends BaseFilter(conf) {
           .toString,
         "named_captures_only" -> true,
         "source_field" -> "raw_message",
-        "target_field" -> Json.ROOT
+        "target_field" -> RowConstant.ROOT
       ).asJava
     )
     conf = conf.withFallback(defaultConfig)
@@ -65,13 +65,13 @@ class Grok(var conf: Config) extends BaseFilter(conf) {
     val grokUDF = udf((str: String) => grokMatch(str))
     val keys = getKeysOfPattern(conf.getString("pattern"))
     conf.getString("target_field") match {
-      case Json.ROOT => {
-        var tmpDf = df.withColumn(Json.TMP, grokUDF(col(conf.getString("source_field"))))
+      case RowConstant.ROOT => {
+        var tmpDf = df.withColumn(RowConstant.TMP, grokUDF(col(conf.getString("source_field"))))
         while (keys.hasNext) {
           val field = keys.next()
-          tmpDf = tmpDf.withColumn(field, col(Json.TMP)(field))
+          tmpDf = tmpDf.withColumn(field, col(RowConstant.TMP)(field))
         }
-        tmpDf.drop(Json.TMP)
+        tmpDf.drop(RowConstant.TMP)
       }
       case targetField => {
         df.withColumn(targetField, grokUDF(col(conf.getString("source_field"))))
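
With these defaults, a user config only needs a pattern; captures land as top-level columns because target_field falls back to RowConstant.ROOT. A hypothetical user configuration (pattern and capture names illustrative), resolved the same way prepare() resolves its defaults:

import com.typesafe.config.ConfigFactory

val userConf = ConfigFactory.parseString(
  """
    |pattern = "%{IP:client} %{WORD:method} %{URIPATHPARAM:request}"
    |source_field = "raw_message"
  """.stripMargin)
// After userConf.withFallback(defaultConfig), target_field resolves to
// "__root__", so client, method and request become top-level columns.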
Json.scala
@@ -3,6 +3,7 @@ package io.github.interestinglab.waterdrop.filter
 import scala.collection.JavaConversions._
 import com.typesafe.config.{Config, ConfigFactory}
 import io.github.interestinglab.waterdrop.apis.BaseFilter
+import io.github.interestinglab.waterdrop.core.RowConstant
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.sql.functions._
@@ -26,7 +27,7 @@ class Json(var conf: Config) extends BaseFilter(conf) {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
         "source_field" -> "raw_message",
-        "target_field" -> Json.ROOT
+        "target_field" -> RowConstant.ROOT
       )
     )
     conf = conf.withFallback(defaultConfig)
@@ -38,19 +39,19 @@ class Json(var conf: Config) extends BaseFilter(conf) {
     import spark.implicits._
 
     conf.getString("target_field") match {
-      case Json.ROOT => {
+      case RowConstant.ROOT => {
 
         val stringDataSet = df.select(srcField).as[String]
 
         val newDF = srcField match {
           case "raw_message" => spark.read.json(stringDataSet)
           case s: String => {
             val schema = spark.read.json(stringDataSet).schema
-            var tmpDf = df.withColumn(Json.TMP, from_json(col(s), schema))
+            var tmpDf = df.withColumn(RowConstant.TMP, from_json(col(s), schema))
             schema.map { field =>
-              tmpDf = tmpDf.withColumn(field.name, col(Json.TMP)(field.name))
+              tmpDf = tmpDf.withColumn(field.name, col(RowConstant.TMP)(field.name))
             }
-            tmpDf.drop(Json.TMP)
+            tmpDf.drop(RowConstant.TMP)
           }
         }
 
@@ -64,8 +65,3 @@ class Json(var conf: Config) extends BaseFilter(conf) {
     }
   }
 }
-
-object Json {
-  val ROOT = "__root__"
-  val TMP = "__tmp__"
-}
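
The deletion at the end is the point of the commit: ROOT and TMP used to live on the Json companion object, which is why every other filter had to reference Json.ROOT; they now come from the shared RowConstant object. The schema-inference pattern in the previous hunk, shown in isolation (a sketch, assuming an active SparkSession named spark and a DataFrame df whose string column "payload" holds JSON):

import org.apache.spark.sql.functions.{col, from_json}
import spark.implicits._

// Read the column once as a JSON dataset purely to infer its schema...
val schema = spark.read.json(df.select("payload").as[String]).schema
// ...then parse in place and promote each field to a top-level column.
var tmpDf = df.withColumn(RowConstant.TMP, from_json(col("payload"), schema))
schema.foreach(field => tmpDf = tmpDf.withColumn(field.name, col(RowConstant.TMP)(field.name)))
val result = tmpDf.drop(RowConstant.TMP)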
Kv.scala
@@ -4,6 +4,7 @@ import java.util
 
 import com.typesafe.config.{Config, ConfigFactory}
 import io.github.interestinglab.waterdrop.apis.BaseFilter
+import io.github.interestinglab.waterdrop.core.RowConstant
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.sql.functions.{col, udf}
@@ -33,15 +34,15 @@ class Kv(var conf: Config) extends BaseFilter(conf) {
         "include_fields" -> util.Arrays.asList(),
         "exclude_fields" -> util.Arrays.asList(),
         "source_field" -> "raw_message",
-        "target_field" -> Json.ROOT
+        "target_field" -> RowConstant.ROOT
       )
     )
     conf = conf.withFallback(defaultConfig)
   }
 
   override def process(spark: SparkSession, df: DataFrame): DataFrame = {
     conf.getString("target_field") match {
-      case Json.ROOT => df // TODO: implement
+      case RowConstant.ROOT => df // TODO: implement
       case targetField: String => {
         val kvUDF = udf((s: String) => kv(s))
         df.withColumn(targetField, kvUDF(col(conf.getString("source_field"))))
@@ -64,8 +65,7 @@ class Kv(var conf: Config) extends BaseFilter(conf) {
 
     if (includeFields.length == 0 && excludeFields.length == 0) {
       map += (conf.getString("field_prefix") + key -> value)
-    }
-    else if (includeFields.length > 0 && includeFields.contains(key)) {
+    } else if (includeFields.length > 0 && includeFields.contains(key)) {
       map += (conf.getString("field_prefix") + key -> value)
     } else if (excludeFields.length > 0 && !excludeFields.contains(key)) {
       map += (conf.getString("field_prefix") + key -> value)
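
The brace change above is cosmetic, but the chain it touches carries the filter's whole include/exclude policy; condensed into a standalone predicate (hypothetical helper, logic taken directly from the branches above):

// A key/value pair is kept if no lists are configured, if the key is on a
// non-empty include list, or if it is absent from a non-empty exclude list.
def keep(key: String, includeFields: Seq[String], excludeFields: Seq[String]): Boolean =
  (includeFields.isEmpty && excludeFields.isEmpty) ||
    (includeFields.nonEmpty && includeFields.contains(key)) ||
    (excludeFields.nonEmpty && !excludeFields.contains(key))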
Split.scala
@@ -2,6 +2,7 @@ package io.github.interestinglab.waterdrop.filter
 
 import com.typesafe.config.{Config, ConfigFactory}
 import io.github.interestinglab.waterdrop.apis.BaseFilter
+import io.github.interestinglab.waterdrop.core.RowConstant
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.streaming.StreamingContext
@@ -28,7 +29,7 @@ class Split(var conf: Config) extends BaseFilter(conf) {
       Map(
         "delimiter" -> " ",
         "source_field" -> "raw_message",
-        "target_field" -> Json.ROOT
+        "target_field" -> RowConstant.ROOT
       )
     )
 
@@ -42,15 +43,15 @@ class Split(var conf: Config) extends BaseFilter(conf) {
 
     // https://stackoverflow.com/a/33345698/1145750
     conf.getString("target_field") match {
-      case Json.ROOT => {
+      case RowConstant.ROOT => {
         val func = udf((s: String) => {
           split(s, conf.getString("delimiter"), keys.size())
         })
-        var filterDf = df.withColumn(Json.TMP, func(col(srcField)))
+        var filterDf = df.withColumn(RowConstant.TMP, func(col(srcField)))
         for (i <- 0 until keys.size()) {
-          filterDf = filterDf.withColumn(keys.get(i), col(Json.TMP)(i))
+          filterDf = filterDf.withColumn(keys.get(i), col(RowConstant.TMP)(i))
         }
-        filterDf.drop(Json.TMP)
+        filterDf.drop(RowConstant.TMP)
       }
       case targetField: String => {
         val func = udf((s: String) => {
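
The split(s, delimiter, limit) helper itself is not shown in this commit; a plausible shape, padding the parts so col(RowConstant.TMP)(i) is defined for every configured key:

import java.util.regex.Pattern

// Hypothetical sketch: always return exactly `limit` entries, padding with
// "" so each keys.get(i) maps to an array slot even on short input.
def split(s: String, delimiter: String, limit: Int): Seq[String] = {
  val parts = s.split(Pattern.quote(delimiter), limit).toSeq
  parts ++ Seq.fill(limit - parts.length)("")
}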
