Skip to content

Commit bed0942

Browse files
committed
Spark 4.0: Display write metrics on SQL UI
1 parent 78156e7 commit bed0942

29 files changed

+2242
-1
lines changed

11340.diff

Lines changed: 1264 additions & 0 deletions
Large diffs are not rendered by default.

core/src/main/java/org/apache/iceberg/BaseTable.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.iceberg.io.LocationProvider;
2828
import org.apache.iceberg.metrics.LoggingMetricsReporter;
2929
import org.apache.iceberg.metrics.MetricsReporter;
30+
import org.apache.iceberg.metrics.MetricsReporters;
3031
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3132

3233
/**
@@ -41,7 +42,7 @@
4142
public class BaseTable implements Table, HasTableOperations, Serializable {
4243
private final TableOperations ops;
4344
private final String name;
44-
private final MetricsReporter reporter;
45+
private MetricsReporter reporter;
4546

4647
public BaseTable(TableOperations ops, String name) {
4748
this(ops, name, LoggingMetricsReporter.instance());
@@ -58,6 +59,10 @@ public MetricsReporter reporter() {
5859
return reporter;
5960
}
6061

62+
public void combineMetricsReporter(MetricsReporter metricsReporter) {
63+
this.reporter = MetricsReporters.combine(this.reporter, metricsReporter);
64+
}
65+
6166
@Override
6267
public TableOperations operations() {
6368
return ops;

core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iceberg.metrics;
2020

21+
import javax.annotation.Nullable;
2122
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2223

2324
public class InMemoryMetricsReporter implements MetricsReporter {
@@ -35,4 +36,13 @@ public ScanReport scanReport() {
3536
"Metrics report is not a scan report");
3637
return (ScanReport) metricsReport;
3738
}
39+
40+
@Nullable
41+
public CommitReport commitReport() {
42+
if (metricsReport instanceof CommitReport) {
43+
return (CommitReport) metricsReport;
44+
} else {
45+
return null;
46+
}
47+
}
3848
}

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/SparkWriteUtil.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,38 @@
2323
import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE;
2424

2525
import java.util.Arrays;
26+
import java.util.List;
2627
import org.apache.iceberg.DistributionMode;
2728
import org.apache.iceberg.MetadataColumns;
2829
import org.apache.iceberg.Table;
30+
import org.apache.iceberg.metrics.CommitMetricsResult;
31+
import org.apache.iceberg.metrics.CommitReport;
32+
import org.apache.iceberg.metrics.CounterResult;
33+
import org.apache.iceberg.metrics.InMemoryMetricsReporter;
34+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
2935
import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
36+
import org.apache.iceberg.spark.source.metrics.AddedDataFiles;
37+
import org.apache.iceberg.spark.source.metrics.AddedDeleteFiles;
38+
import org.apache.iceberg.spark.source.metrics.AddedEqualityDeleteFiles;
39+
import org.apache.iceberg.spark.source.metrics.AddedEqualityDeletes;
40+
import org.apache.iceberg.spark.source.metrics.AddedFileSizeInBytes;
41+
import org.apache.iceberg.spark.source.metrics.AddedPositionalDeleteFiles;
42+
import org.apache.iceberg.spark.source.metrics.AddedPositionalDeletes;
43+
import org.apache.iceberg.spark.source.metrics.AddedRecords;
44+
import org.apache.iceberg.spark.source.metrics.RemovedDataFiles;
45+
import org.apache.iceberg.spark.source.metrics.RemovedDeleteFiles;
46+
import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeleteFiles;
47+
import org.apache.iceberg.spark.source.metrics.RemovedEqualityDeletes;
48+
import org.apache.iceberg.spark.source.metrics.RemovedFileSizeInBytes;
49+
import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeleteFiles;
50+
import org.apache.iceberg.spark.source.metrics.RemovedPositionalDeletes;
51+
import org.apache.iceberg.spark.source.metrics.RemovedRecords;
52+
import org.apache.iceberg.spark.source.metrics.TotalDataFiles;
53+
import org.apache.iceberg.spark.source.metrics.TotalDeleteFiles;
54+
import org.apache.iceberg.spark.source.metrics.TotalEqualityDeletes;
55+
import org.apache.iceberg.spark.source.metrics.TotalFileSizeInBytes;
56+
import org.apache.iceberg.spark.source.metrics.TotalPositionalDeletes;
57+
import org.apache.iceberg.spark.source.metrics.TotalRecords;
3058
import org.apache.iceberg.types.Types;
3159
import org.apache.iceberg.util.SortOrderUtil;
3260
import org.apache.spark.sql.connector.distributions.Distribution;
@@ -36,6 +64,8 @@
3664
import org.apache.spark.sql.connector.expressions.NamedReference;
3765
import org.apache.spark.sql.connector.expressions.SortDirection;
3866
import org.apache.spark.sql.connector.expressions.SortOrder;
67+
import org.apache.spark.sql.connector.metric.CustomMetric;
68+
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
3969
import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
4070

4171
/**
@@ -256,4 +286,101 @@ private static SortOrder[] orderBy(Expression... exprs) {
256286
private static SortOrder sort(Expression expr) {
257287
return Expressions.sort(expr, SortDirection.ASCENDING);
258288
}
289+
290+
public static CustomMetric[] supportedCustomMetrics() {
291+
return new CustomMetric[] {
292+
new AddedDataFiles(),
293+
new AddedDeleteFiles(),
294+
new AddedEqualityDeletes(),
295+
new AddedEqualityDeleteFiles(),
296+
new AddedFileSizeInBytes(),
297+
new AddedPositionalDeletes(),
298+
new AddedPositionalDeleteFiles(),
299+
new AddedRecords(),
300+
new RemovedDataFiles(),
301+
new RemovedDeleteFiles(),
302+
new RemovedRecords(),
303+
new RemovedEqualityDeleteFiles(),
304+
new RemovedEqualityDeletes(),
305+
new RemovedFileSizeInBytes(),
306+
new RemovedPositionalDeleteFiles(),
307+
new RemovedPositionalDeletes(),
308+
new TotalDataFiles(),
309+
new TotalDeleteFiles(),
310+
new TotalEqualityDeletes(),
311+
new TotalFileSizeInBytes(),
312+
new TotalPositionalDeletes(),
313+
new TotalRecords()
314+
};
315+
}
316+
317+
public static CustomTaskMetric[] customTaskMetrics(InMemoryMetricsReporter metricsReporter) {
318+
List<CustomTaskMetric> taskMetrics = Lists.newArrayList();
319+
if (metricsReporter != null) {
320+
CommitReport commitReport = metricsReporter.commitReport();
321+
if (commitReport != null) {
322+
CommitMetricsResult metricsResult = commitReport.commitMetrics();
323+
addMetricValue(new AddedDataFiles(), metricsResult.addedDataFiles(), taskMetrics);
324+
addMetricValue(new AddedDeleteFiles(), metricsResult.addedDeleteFiles(), taskMetrics);
325+
addMetricValue(
326+
new AddedEqualityDeletes(), metricsResult.addedEqualityDeletes(), taskMetrics);
327+
addMetricValue(
328+
new AddedEqualityDeleteFiles(), metricsResult.addedEqualityDeleteFiles(), taskMetrics);
329+
addMetricValue(
330+
new AddedFileSizeInBytes(), metricsResult.addedFilesSizeInBytes(), taskMetrics);
331+
addMetricValue(
332+
new AddedPositionalDeletes(), metricsResult.addedPositionalDeletes(), taskMetrics);
333+
addMetricValue(
334+
new AddedPositionalDeleteFiles(),
335+
metricsResult.addedPositionalDeleteFiles(),
336+
taskMetrics);
337+
addMetricValue(new AddedRecords(), metricsResult.addedRecords(), taskMetrics);
338+
addMetricValue(new RemovedDataFiles(), metricsResult.removedDataFiles(), taskMetrics);
339+
addMetricValue(new RemovedDeleteFiles(), metricsResult.removedDeleteFiles(), taskMetrics);
340+
addMetricValue(new RemovedRecords(), metricsResult.removedRecords(), taskMetrics);
341+
addMetricValue(
342+
new RemovedEqualityDeleteFiles(),
343+
metricsResult.removedEqualityDeleteFiles(),
344+
taskMetrics);
345+
addMetricValue(
346+
new RemovedEqualityDeletes(), metricsResult.removedEqualityDeletes(), taskMetrics);
347+
addMetricValue(
348+
new RemovedFileSizeInBytes(), metricsResult.removedFilesSizeInBytes(), taskMetrics);
349+
addMetricValue(
350+
new RemovedPositionalDeleteFiles(),
351+
metricsResult.removedPositionalDeleteFiles(),
352+
taskMetrics);
353+
addMetricValue(
354+
new RemovedPositionalDeletes(), metricsResult.removedPositionalDeletes(), taskMetrics);
355+
addMetricValue(new TotalDataFiles(), metricsResult.totalDataFiles(), taskMetrics);
356+
addMetricValue(new TotalDeleteFiles(), metricsResult.totalDeleteFiles(), taskMetrics);
357+
addMetricValue(
358+
new TotalEqualityDeletes(), metricsResult.totalEqualityDeletes(), taskMetrics);
359+
addMetricValue(
360+
new TotalFileSizeInBytes(), metricsResult.totalFilesSizeInBytes(), taskMetrics);
361+
addMetricValue(
362+
new TotalPositionalDeletes(), metricsResult.totalPositionalDeletes(), taskMetrics);
363+
addMetricValue(new TotalRecords(), metricsResult.totalRecords(), taskMetrics);
364+
}
365+
}
366+
return taskMetrics.toArray(new CustomTaskMetric[0]);
367+
}
368+
369+
private static void addMetricValue(
370+
CustomMetric metric, CounterResult result, List<CustomTaskMetric> taskMetrics) {
371+
if (result != null) {
372+
taskMetrics.add(
373+
new CustomTaskMetric() {
374+
@Override
375+
public String name() {
376+
return metric.name();
377+
}
378+
379+
@Override
380+
public long value() {
381+
return result.value();
382+
}
383+
});
384+
}
385+
}
259386
}

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.List;
3030
import java.util.Map;
3131
import java.util.function.Function;
32+
import org.apache.iceberg.BaseTable;
3233
import org.apache.iceberg.ContentFile;
3334
import org.apache.iceberg.DataFile;
3435
import org.apache.iceberg.DeleteFile;
@@ -65,12 +66,14 @@
6566
import org.apache.iceberg.io.PartitioningWriter;
6667
import org.apache.iceberg.io.PositionDeltaWriter;
6768
import org.apache.iceberg.io.WriteResult;
69+
import org.apache.iceberg.metrics.InMemoryMetricsReporter;
6870
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
6971
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
7072
import org.apache.iceberg.spark.CommitMetadata;
7173
import org.apache.iceberg.spark.SparkSchemaUtil;
7274
import org.apache.iceberg.spark.SparkWriteConf;
7375
import org.apache.iceberg.spark.SparkWriteRequirements;
76+
import org.apache.iceberg.spark.SparkWriteUtil;
7477
import org.apache.iceberg.types.Types;
7578
import org.apache.iceberg.util.CharSequenceSet;
7679
import org.apache.iceberg.util.DeleteFileSet;
@@ -81,6 +84,8 @@
8184
import org.apache.spark.sql.catalyst.InternalRow;
8285
import org.apache.spark.sql.connector.distributions.Distribution;
8386
import org.apache.spark.sql.connector.expressions.SortOrder;
87+
import org.apache.spark.sql.connector.metric.CustomMetric;
88+
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
8489
import org.apache.spark.sql.connector.write.DeltaBatchWrite;
8590
import org.apache.spark.sql.connector.write.DeltaWrite;
8691
import org.apache.spark.sql.connector.write.DeltaWriter;
@@ -113,6 +118,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
113118
private final Map<String, String> writeProperties;
114119

115120
private boolean cleanupOnAbort = false;
121+
private InMemoryMetricsReporter metricsReporter;
116122

117123
SparkPositionDeltaWrite(
118124
SparkSession spark,
@@ -136,6 +142,11 @@ class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrde
136142
this.writeRequirements = writeConf.positionDeltaRequirements(command);
137143
this.context = new Context(dataSchema, writeConf, info, writeRequirements);
138144
this.writeProperties = writeConf.writeProperties();
145+
146+
if (this.table instanceof BaseTable) {
147+
this.metricsReporter = new InMemoryMetricsReporter();
148+
((BaseTable) this.table).combineMetricsReporter(metricsReporter);
149+
}
139150
}
140151

141152
@Override
@@ -169,6 +180,16 @@ public DeltaBatchWrite toBatch() {
169180
return new PositionDeltaBatchWrite();
170181
}
171182

183+
@Override
184+
public CustomMetric[] supportedCustomMetrics() {
185+
return SparkWriteUtil.supportedCustomMetrics();
186+
}
187+
188+
@Override
189+
public CustomTaskMetric[] reportDriverMetrics() {
190+
return SparkWriteUtil.customTaskMetrics(metricsReporter);
191+
}
192+
172193
private class PositionDeltaBatchWrite implements DeltaBatchWrite {
173194

174195
@Override

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Map;
2828
import java.util.stream.Collectors;
2929
import org.apache.iceberg.AppendFiles;
30+
import org.apache.iceberg.BaseTable;
3031
import org.apache.iceberg.DataFile;
3132
import org.apache.iceberg.FileFormat;
3233
import org.apache.iceberg.FileScanTask;
@@ -51,12 +52,14 @@
5152
import org.apache.iceberg.io.OutputFileFactory;
5253
import org.apache.iceberg.io.PartitioningWriter;
5354
import org.apache.iceberg.io.RollingDataWriter;
55+
import org.apache.iceberg.metrics.InMemoryMetricsReporter;
5456
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
5557
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
5658
import org.apache.iceberg.spark.CommitMetadata;
5759
import org.apache.iceberg.spark.FileRewriteCoordinator;
5860
import org.apache.iceberg.spark.SparkWriteConf;
5961
import org.apache.iceberg.spark.SparkWriteRequirements;
62+
import org.apache.iceberg.spark.SparkWriteUtil;
6063
import org.apache.iceberg.util.DataFileSet;
6164
import org.apache.spark.TaskContext;
6265
import org.apache.spark.TaskContext$;
@@ -67,6 +70,7 @@
6770
import org.apache.spark.sql.catalyst.InternalRow;
6871
import org.apache.spark.sql.connector.distributions.Distribution;
6972
import org.apache.spark.sql.connector.expressions.SortOrder;
73+
import org.apache.spark.sql.connector.metric.CustomTaskMetric;
7074
import org.apache.spark.sql.connector.write.BatchWrite;
7175
import org.apache.spark.sql.connector.write.DataWriter;
7276
import org.apache.spark.sql.connector.write.DataWriterFactory;
@@ -103,6 +107,7 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
103107
private final Map<String, String> writeProperties;
104108

105109
private boolean cleanupOnAbort = false;
110+
private InMemoryMetricsReporter metricsReporter;
106111

107112
SparkWrite(
108113
SparkSession spark,
@@ -130,6 +135,11 @@ abstract class SparkWrite implements Write, RequiresDistributionAndOrdering {
130135
this.writeRequirements = writeRequirements;
131136
this.outputSpecId = writeConf.outputSpecId();
132137
this.writeProperties = writeConf.writeProperties();
138+
139+
if (this.table instanceof BaseTable) {
140+
this.metricsReporter = new InMemoryMetricsReporter();
141+
((BaseTable) this.table).combineMetricsReporter(metricsReporter);
142+
}
133143
}
134144

135145
@Override
@@ -260,6 +270,11 @@ private List<DataFile> files(WriterCommitMessage[] messages) {
260270
return files;
261271
}
262272

273+
@Override
274+
public CustomTaskMetric[] reportDriverMetrics() {
275+
return SparkWriteUtil.customTaskMetrics(metricsReporter);
276+
}
277+
263278
@Override
264279
public String toString() {
265280
return String.format("IcebergWrite(table=%s, format=%s)", table, format);

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import org.apache.iceberg.spark.SparkUtil;
3131
import org.apache.iceberg.spark.SparkWriteConf;
3232
import org.apache.iceberg.spark.SparkWriteRequirements;
33+
import org.apache.iceberg.spark.SparkWriteUtil;
3334
import org.apache.iceberg.types.TypeUtil;
3435
import org.apache.spark.sql.SparkSession;
36+
import org.apache.spark.sql.connector.metric.CustomMetric;
3537
import org.apache.spark.sql.connector.read.Scan;
3638
import org.apache.spark.sql.connector.write.BatchWrite;
3739
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
@@ -159,6 +161,11 @@ public StreamingWrite toStreaming() {
159161
return asStreamingAppend();
160162
}
161163
}
164+
165+
@Override
166+
public CustomMetric[] supportedCustomMetrics() {
167+
return SparkWriteUtil.supportedCustomMetrics();
168+
}
162169
};
163170
}
164171

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.spark.source.metrics;
20+
21+
import org.apache.spark.sql.connector.metric.CustomSumMetric;
22+
23+
public class AddedDataFiles extends CustomSumMetric {
24+
25+
static final String NAME = "addedDataFiles";
26+
27+
@Override
28+
public String name() {
29+
return NAME;
30+
}
31+
32+
@Override
33+
public String description() {
34+
return "number of added data files";
35+
}
36+
}

0 commit comments

Comments
 (0)