SNOW-1369280 Close channels asynchronously #841
Conversation
Force-pushed from 37db638 to b10f038
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java (resolved comment thread)
// Whether to close streaming channels in parallel.
public static final String SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL =
    "snowflake.streaming.closeChannelsInParallel.enabled";
public static final boolean SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL_DEFAULT = false;
This doesn't feel like a configuration that should be exposed to customers. As a customer, I don't know whether I should close the channels in parallel or not; in fact, I don't really care, I just want my system to run without any failures. WDYT?
Agreed, but I think we want to try this out for customers who have a lot of partitions in a single task, so enabling it by default won't benefit most customers.
- It's also good to have param protection.
- We should stress test this and roll it out slowly over a couple of releases.
What's the risk of having it default to true and then getting rid of the parameter in a future release?
@sfc-gh-tzhang
This is not a publicly announced parameter. It's going to behave as a knob that lets the customer easily roll the change back in case something goes wrong. Exactly what @sfc-gh-japatel wrote.
@sfc-gh-xhuang
Imo it's better to be safe than sorry. I'm not sure how the parallelization is going to behave in customer environments, as channel closing is handed off to a shared ForkJoinPool that seems to exist as a single instance shared across all tasks on the same host.
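(For context, a minimal sketch of that sharing concern - the class and task names are illustrative, not from the connector. Work submitted via CompletableFuture without an explicit executor lands on the single JVM-wide common pool.)

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSharingSketch {
  public static void main(String[] args) {
    // The one JVM-wide pool used by CompletableFuture.runAsync(...) when no executor
    // is supplied; its parallelism is roughly availableProcessors() - 1.
    System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());

    // Two connector "tasks" submitting channel closes without an explicit executor
    // end up competing for the same shared pool on the same host.
    CompletableFuture<Void> taskA =
        CompletableFuture.runAsync(() -> System.out.println("task A closing channels"));
    CompletableFuture<Void> taskB =
        CompletableFuture.runAsync(() -> System.out.println("task B closing channels"));
    CompletableFuture.allOf(taskA, taskB).join();
  }
}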
I suggest we approach the "change rollout" parameters the following way:
1. Protect the change with a parameter and disable it by default. If some customers experience problems that can be solved by the change, they can turn it on.
2. Enable the change by default. If some customers experience problems caused by the change, they can turn it off and communicate that to us.
3. Get rid of the older implementation.
Understood, but I was just thinking that there is not much difference/value between steps 1 and 2. Still, yes, we can be a bit safer this way.
Would this still be a behavior change after we finish step 3? Once this configuration is gone, the customer's KC won't be able to start, and they will need to get the logs to understand why. Assuming they know how to look into the logs, they will realize that the configuration has been removed and will probably need to reach out to Snowflake to understand why it was removed.
It seems to me a better way is to do a beta release with the fix so customers can try it (or just provide a private jar)? Other customers won't use this version since it's a beta version.
@sfc-gh-tzhang I don't see any schema validation for configuration properties. We will just stop respecting the snowflake.streaming.closeChannelsInParallel.enabled config key, but no errors should be observed. So I suppose there will be no behavioral change afterwards.
Or am I missing anything?
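(Roughly the lookup shape on the connector side; this helper is a sketch with a made-up class and method name, and only the two constants come from the diff quoted above.)

import java.util.Map;

public class CloseChannelsFlagLookupSketch {
  static final String SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL =
      "snowflake.streaming.closeChannelsInParallel.enabled";
  static final boolean SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL_DEFAULT = false;

  // If the key is absent, or a future release simply stops reading it, the default wins;
  // an extra unrecognized key in the connector config map does not by itself fail startup.
  static boolean shouldCloseChannelsInParallel(Map<String, String> connectorConfig) {
    return Boolean.parseBoolean(
        connectorConfig.getOrDefault(
            SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL,
            String.valueOf(SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL_DEFAULT)));
  }
}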
I see, then that sounds better, thanks for confirming.
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java (outdated, resolved comment thread)
test/rest_request_template/schema_evolution_nullable_values_after_smt.json (resolved comment thread)
Added a couple of comments, but overall LGTM!
Force-pushed from b10f038 to 48922a6
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java (two resolved comment threads)
LGTM
StreamingClientProvider.getStreamingClientProviderInstance()
    .closeClient(this.connectorConfig, this.streamingIngestClient);

private void closeAllInParallel() {
Thinking about it more, I'm not sure this approach will help a lot. If you look into the close function in the SDK, it flushes everything in the client and then calls get status on this particular channel. If we do this in parallel, the same number of flushes will be issued, and every flush will still run serially and block the other flushes. I thought most of the time was spent waiting for the server side to commit, which will stay the same since everything gets flushed at once. Have you verified locally that it reduces the close time dramatically with a large number of channels?
Hi @sfc-gh-tzhang, as Lex is OOO, I prepared a short test for that. I am not sure whether the IT is sufficient to give meaningful information, but I ran such a test closing 50 channels, and it shows that the parallelism sped the execution up: 13513.7 ms mean sequential execution time vs. 1508.0 ms parallel.
However, the scenario does not cover the case where we have rows present in the net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal buffer (the scenario you described) - I could not reproduce it even though I played with the buffering parameters.
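(For reference, the rough shape of such a comparison - a sketch only; closeChannel stands in for whatever the integration test actually calls, and the numbers above come from the real test, not from this snippet.)

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CloseTimingSketch {
  // Placeholder for the per-channel close performed by the integration test.
  static void closeChannel(String channelName) {
    // ... the real test closes a streaming ingest channel here ...
  }

  static long timeSequentialMs(List<String> channels) {
    long start = System.nanoTime();
    channels.forEach(CloseTimingSketch::closeChannel);
    return (System.nanoTime() - start) / 1_000_000;
  }

  static long timeParallelMs(List<String> channels) {
    long start = System.nanoTime();
    CompletableFuture<?>[] futures =
        channels.stream()
            .map(c -> CompletableFuture.runAsync(() -> closeChannel(c)))
            .toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).join();
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) {
    List<String> channels = List.of("channel_0", "channel_1", "channel_2");
    System.out.println("sequential ms: " + timeSequentialMs(channels));
    System.out.println("parallel ms:   " + timeParallelMs(channels));
  }
}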
Hmmm, do you mean that closing 50 empty channels serially takes 13.5 s? That doesn't sound right; any idea where it spends most of the time?
Approved to unblock the PR, but I think this is a risky change, since I'm not sure whether something weird would happen if we close a few hundred channels in parallel, given that every close spins up a background flush thread. Another concern is that calling get status in parallel could potentially overload the DS node. Longer term, I think the better solution is to add a close function to the SDK which doesn't call flush or get status, so it will be very fast. Could we create a JIRA to track this? Thanks!
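(If the sheer fan-out ever does turn out to be the problem, one possible mitigation - purely a sketch, not something in this PR - is to cap how many closes run at once, e.g. with a semaphore; the cap value below is an arbitrary placeholder.)

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

public class BoundedCloseSketch {
  // Arbitrary placeholder for the maximum number of in-flight channel closes.
  private static final int MAX_CONCURRENT_CLOSES = 10;
  private static final Semaphore PERMITS = new Semaphore(MAX_CONCURRENT_CLOSES);

  static CompletableFuture<Void> closeWithCap(Runnable closeChannel) {
    return CompletableFuture.runAsync(
        () -> {
          PERMITS.acquireUninterruptibly(); // wait for a free slot
          try {
            closeChannel.run();
          } finally {
            PERMITS.release();
          }
        });
  }

  public static void main(String[] args) {
    List<Runnable> closes =
        List.of(
            () -> System.out.println("close channel 0"),
            () -> System.out.println("close channel 1"));
    CompletableFuture.allOf(
            closes.stream().map(BoundedCloseSketch::closeWithCap).toArray(CompletableFuture[]::new))
        .join();
  }
}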
As this is for better performance, is there a way to monitor it, please? This could help debugging crash/rebalance issues. Thank you, can't wait to test it!
      })
      .toArray(CompletableFuture[]::new);

  CompletableFuture.allOf(futures).join();
}
To my understanding, CompletableFuture uses ForkJoinPool.commonPool() as its default thread pool, with (Runtime.getRuntime().availableProcessors() - 1) threads. We could cheat by setting the JVM CPU count, but I would rather suggest creating a dedicated thread pool like this:
import java.util.concurrent.*;

private void closeAllInParallel() {
  ExecutorService executor = Executors.newFixedThreadPool(CONFIG_THREAD_POOL_SIZE);
  CompletableFuture<?>[] futures =
      partitionsToChannel.entrySet().stream()
          .map(
              entry -> {
                String channelKey = entry.getKey();
                TopicPartitionChannel topicPartitionChannel = entry.getValue();
                LOGGER.info("Closing partition channel:{}", channelKey);
                return CompletableFuture.runAsync(
                    topicPartitionChannel::closeChannelAsync, executor);
              })
          .toArray(CompletableFuture[]::new);
  CompletableFuture.allOf(futures).join();
  executor.shutdown(); // Don't forget to shut down the executor
}
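(One caveat with the sketch above: if any closeChannelAsync call fails, join() rethrows before executor.shutdown() is reached, so wrapping the executor in try/finally, or reusing a long-lived pool as suggested further below, avoids leaking threads.)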
Thanks, @ebuildy, for providing an example. Let me merge the PR in its current form; I'll check with the SDK team on a possible built-in solution, and if the timeline is not optimistic, I'll propose this change to the rest of the team.
Fantastic! Any plan to do the same for open channel operations?

> The default thread pool used by
> Would be better to create a static thread pool

Thanks, created SNOW-1437461
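(A minimal sketch of what a shared static pool could look like - the holder class, pool size, and thread name are all illustrative, not taken from the connector.)

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChannelCloseExecutorHolder {
  // One pool reused across rebalances instead of creating and shutting down a fresh
  // executor on every close; the size 8 is an illustrative placeholder.
  private static final ExecutorService CHANNEL_CLOSE_POOL =
      Executors.newFixedThreadPool(
          8,
          r -> {
            Thread t = new Thread(r, "channel-close-worker");
            t.setDaemon(true); // don't keep the JVM alive for these worker threads
            return t;
          });

  static ExecutorService channelClosePool() {
    return CHANNEL_CLOSE_POOL;
  }

  public static void main(String[] args) {
    CompletableFuture.runAsync(
            () -> System.out.println("closing on shared pool"), channelClosePool())
        .join();
  }
}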
Overview

SNOW-1369280

By closing channels in parallel, this change speeds up task rebalancing. As in #839, SFException is ignored and doesn't fail a connector task.

Pre-review checklist
- This change is param protected by snowflake.streaming.closeChannelsInParallel.enabled.
  - Yes - Added end to end and Unit Tests.
  - No - Suggest why it is not param protected