From dc81b52761e14d859da66cbf5c01fc700ab6f862 Mon Sep 17 00:00:00 2001
From: garyelephant
Date: Fri, 18 Jan 2019 00:01:08 +0800
Subject: [PATCH] added checkSQLSyntax in sql filter

---
 .../interestinglab/waterdrop/filter/Sql.scala | 21 ++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Sql.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Sql.scala
index 6be16ef527c..380e3153dc4 100644
--- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Sql.scala
+++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/filter/Sql.scala
@@ -2,6 +2,7 @@ package io.github.interestinglab.waterdrop.filter
 
 import com.typesafe.config.{Config, ConfigFactory}
 import io.github.interestinglab.waterdrop.apis.BaseFilter
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
 class Sql extends BaseFilter {
@@ -24,11 +25,29 @@ class Sql extends BaseFilter {
 
   override def checkConfig(): (Boolean, String) = {
     conf.hasPath("table_name") && conf.hasPath("sql") match {
-      case true => (true, "")
+      case true => checkSQLSyntax(conf.getString("sql"))
       case false => (false, "please specify [table_name] and [sql]")
     }
   }
 
+  private def checkSQLSyntax(sql: String): (Boolean, String) = {
+    val sparkSession = SparkSession.builder.getOrCreate
+    val logicalPlan = sparkSession.sessionState.sqlParser.parsePlan(sql)
+
+    if (!logicalPlan.analyzed) {
+      val logicPlanStr = logicalPlan.toString
+      logicPlanStr.toLowerCase.contains("unresolvedrelation") match {
+        case true => (true, "")
+        case false => {
+          val msg = "config[sql] cannot be passed through sql parser, sql[" + sql + "], logicPlan: \n" + logicPlanStr
+          (false, msg)
+        }
+      }
+    } else {
+      (true, "")
+    }
+  }
+
   override def process(spark: SparkSession, df: Dataset[Row]): Dataset[Row] = {
     df.createOrReplaceTempView(this.conf.getString("table_name"))
     spark.sql(conf.getString("sql"))
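
Note (not part of the patch): the new checkSQLSyntax leans on Spark's SQL parser via sessionState.sqlParser.parsePlan. Parsing malformed SQL throws a ParseException, while valid SQL against a table that has not been registered yet parses into an unanalyzed plan containing an UnresolvedRelation node, which is why the check treats that string as acceptable. Below is a minimal, hedged sketch of that parser behavior; the object name ParsePlanCheckSketch, the local-master session, the table name user_info, and both SQL strings are illustrative assumptions, not anything defined by this patch or by Waterdrop.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParseException

// Illustrative sketch only; names and SQL strings are made up for demonstration.
object ParsePlanCheckSketch {
  def main(args: Array[String]): Unit = {
    // Local session just for the sketch; Waterdrop normally owns the SparkSession.
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("parse-plan-sketch")
      .getOrCreate()

    val parser = spark.sessionState.sqlParser

    // Valid SQL against a table that is not registered yet:
    // parsePlan succeeds and the unanalyzed plan contains an UnresolvedRelation node.
    val plan = parser.parsePlan("SELECT name, age FROM user_info WHERE age > 18")
    println(plan.toString.toLowerCase.contains("unresolvedrelation")) // prints: true

    // Malformed SQL: parsePlan throws a ParseException before any plan is produced.
    try {
      parser.parsePlan("SELEC name FROM user_info")
    } catch {
      case e: ParseException => println("syntax error: " + e.getMessage)
    }

    spark.stop()
  }
}

Under these assumptions, a syntactically broken [sql] value would surface as a ParseException rather than as the (false, msg) tuple built inside checkSQLSyntax, since the patch does not wrap parsePlan in a try/catch.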