diff --git a/core/src/main/java/com/airwallex/airskiff/spark/AbstractSparkCompiler.java b/core/src/main/java/com/airwallex/airskiff/spark/AbstractSparkCompiler.java index a7a6c08..e069417 100644 --- a/core/src/main/java/com/airwallex/airskiff/spark/AbstractSparkCompiler.java +++ b/core/src/main/java/com/airwallex/airskiff/spark/AbstractSparkCompiler.java @@ -150,9 +150,9 @@ private Dataset compileSql(SqlStream op) { // forced encoding change Dataset> dataset = compile(op.stream).map((MapFunction, Tuple2>) v1 -> v1, Encoders.tuple(Encoders.LONG(), Utils.encodeBean(op.stream.getClazz()))); - dataset.printSchema(); - dataset.show(); - if (!StringUtils.isBlank(debugDir)) { + if (DEBUG) { + dataset.printSchema(); + dataset.show(); dataset.coalesce(1).write().format("json").save(debugDir + "/sql-input-" + UUID.randomUUID().toString()); } @@ -215,9 +215,11 @@ private Dataset compileSql(SqlStream op) { Column[] cc = new Column[cols.size()]; cc = cols.toArray(cc); Dataset fatDs = fatResult.withColumn("_tempDataStruct", struct(cc)); - fatResult.show(); - fatResult.printSchema(); + if (DEBUG) { + fatResult.show(); + fatResult.printSchema(); + } // drop previously added columns for (Field prevField : fields) { @@ -339,8 +341,10 @@ private Dataset compileWindowedViaSQL(WindowedStream sparkSession.udf().register("riskyAgg", udaf(agg, Utils.encodeBean(inClz))); String tempTableName = "windowedTempTable"; rowDs.createOrReplaceTempView(tempTableName); - rowDs.show(); - rowDs.printSchema(); + if (DEBUG){ + rowDs.show(); + rowDs.printSchema(); + } String query = "select ts, key, riskyAgg(" + valueExpr + ") over (PARTITION BY key ORDER BY ts RANGE BETWEEN " + size + " PRECEDING AND CURRENT ROW) as agg_result from " + tempTableName; Dataset sqlResult = sparkSession.sql(query); Dataset> typedDs = sqlResult.as(Encoders.tuple(Encoders.LONG(), Utils.encodeBean(op.keyClass()), Utils.encodeBean(op.uc))); diff --git a/gradle.properties b/gradle.properties index 8ffb3ad..ac6a98c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group=com.airwallex.airskiff name=core -version=2.0.52 +version=2.0.53 org.gradle.daemon=true org.gradle.parallel=true file.encoding=utf-8