Skip to content

Commit

Permalink
[FLINK-36607][table-planner] Introduce configuration options for adap…
Browse files Browse the repository at this point in the history
…tive broadcast join.
  • Loading branch information
SinBex authored and zhuzhurk committed Jan 3, 2025
1 parent c3daf84 commit 2b48260
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>table.optimizer.adaptive-broadcast-join.strategy</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">none</td>
<td><p>Enum</p></td>
<td>Flink will perform broadcast hash join optimization when the runtime statistics on one side of a join operator is less than the threshold `table.optimizer.join.broadcast-threshold`. The value of this configuration option decides when Flink should perform this optimization. AUTO means Flink will automatically choose the timing for optimization, RUNTIME_ONLY means broadcast hash join optimization is only performed at runtime, and NONE means the optimization is only carried out at compile time.<br /><br />Possible values:<ul><li>"auto": Flink will automatically choose the timing for optimization</li><li>"runtime_only": Broadcast hash join optimization is only performed at runtime.</li><li>"none": Broadcast hash join optimization is only carried out at compile time.</li></ul></td>
</tr>
<tr>
<td><h5>table.optimizer.agg-phase-strategy</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">AUTO</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;

/**
* This class holds configuration constants used by Flink's table planner module.
Expand Down Expand Up @@ -158,6 +161,22 @@ public class OptimizerConfigOptions {
"A flag to enable or disable the runtime filter. "
+ "When it is true, the optimizer will try to inject a runtime filter for eligible join.");

@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
public static final ConfigOption<AdaptiveBroadcastJoinStrategy>
TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY =
key("table.optimizer.adaptive-broadcast-join.strategy")
.enumType(AdaptiveBroadcastJoinStrategy.class)
.defaultValue(AdaptiveBroadcastJoinStrategy.NONE)
.withDescription(
"Flink will perform broadcast hash join optimization when the runtime "
+ "statistics on one side of a join operator is less than the "
+ "threshold `table.optimizer.join.broadcast-threshold`. The "
+ "value of this configuration option decides when Flink should "
+ "perform this optimization. AUTO means Flink will automatically "
+ "choose the timing for optimization, RUNTIME_ONLY means broadcast "
+ "hash join optimization is only performed at runtime, and NONE "
+ "means the optimization is only carried out at compile time.");

/**
* The data volume of build side needs to be under this value. If the data volume of build side
* is too large, the building overhead will be too large, which may lead to a negative impact on
Expand Down Expand Up @@ -304,4 +323,33 @@ public enum NonDeterministicUpdateStrategy {
*/
IGNORE
}

/** Strategies used for {@link #TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY}. */
@PublicEvolving
public enum AdaptiveBroadcastJoinStrategy implements DescribedEnum {
AUTO("auto", text("Flink will automatically choose the timing for optimization")),
RUNTIME_ONLY(
"runtime_only",
text("Broadcast hash join optimization is only performed at runtime.")),
NONE("none", text("Broadcast hash join optimization is only carried out at compile time."));

private final String value;

private final InlineElement description;

AdaptiveBroadcastJoinStrategy(String value, InlineElement description) {
this.value = value;
this.description = description;
}

@Override
public String toString() {
return value;
}

@Override
public InlineElement getDescription() {
return description;
}
}
}

0 comments on commit 2b48260

Please sign in to comment.