Merge pull request #224 from InterestingLab/garyelephant.fea.sql-filter
added checkSQLSyntax in sql filter
RickyHuo authored Mar 5, 2019
2 parents b028cc1 + dc81b52 commit ccd0b3d
Showing 1 changed file with 20 additions and 1 deletion.
@@ -2,6 +2,7 @@ package io.github.interestinglab.waterdrop.filter
 
 import com.typesafe.config.{Config, ConfigFactory}
 import io.github.interestinglab.waterdrop.apis.BaseFilter
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
 class Sql extends BaseFilter {
@@ -24,11 +25,29 @@ class Sql extends BaseFilter
 
   override def checkConfig(): (Boolean, String) = {
     conf.hasPath("table_name") && conf.hasPath("sql") match {
-      case true => (true, "")
+      case true => checkSQLSyntax(conf.getString("sql"))
       case false => (false, "please specify [table_name] and [sql]")
     }
   }
 
+  private def checkSQLSyntax(sql: String): (Boolean, String) = {
+    val sparkSession = SparkSession.builder.getOrCreate
+    val logicalPlan = sparkSession.sessionState.sqlParser.parsePlan(sql)
+
+    if (!logicalPlan.analyzed) {
+      val logicPlanStr = logicalPlan.toString
+      logicPlanStr.toLowerCase.contains("unresolvedrelation") match {
+        case true => (true, "")
+        case false => {
+          val msg = "config[sql] cannot be passed through sql parser, sql[" + sql + "], logicPlan: \n" + logicPlanStr
+          (false, msg)
+        }
+      }
+    } else {
+      (true, "")
+    }
+  }
+
   override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {
     df.createOrReplaceTempView(this.conf.getString("table_name"))
     spark.sql(conf.getString("sql"))

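For context, the added check leans on Spark's SQL parser rather than on executing the statement: parsePlan only builds an unresolved logical plan and never consults the catalog, so a reference to the not-yet-registered temp view still parses, which is why the commit treats a plan containing an UnresolvedRelation as a pass (the view named by `table_name` is only created later in `process()`). Below is a minimal, self-contained sketch of the same idea; it is not part of this commit, the SparkSession setup and names such as `demoSpark` and `syntaxOk` are invented for illustration, and unlike the committed code it catches the parser's `ParseException` explicitly.

```scala
// Standalone sketch (not part of this commit): validate SQL syntax with Spark's
// parser only, without touching any catalog or running the query.
// `demoSpark`, `syntaxOk`, and the sample table name are illustrative, not Waterdrop APIs.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParseException

object ParseCheckDemo {
  def main(args: Array[String]): Unit = {
    val demoSpark = SparkSession.builder
      .master("local[1]")
      .appName("parse-check-demo")
      .getOrCreate()

    // Returns (true, "") when the statement parses; parsePlan only produces an
    // unresolved logical plan, so an unknown table name is not an error here.
    def syntaxOk(sql: String): (Boolean, String) =
      try {
        demoSpark.sessionState.sqlParser.parsePlan(sql)
        (true, "")
      } catch {
        case e: ParseException =>
          (false, "config[sql] cannot be parsed, sql[" + sql + "]: " + e.getMessage)
      }

    println(syntaxOk("select name, age from user_info where age > 18")) // (true, "")
    println(syntaxOk("selectt * frm user_info"))                        // (false, ...)

    demoSpark.stop()
  }
}
```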