Skip to content

GH-10163: Fix async mode dropping errors #10183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from

Conversation

rrileyca
Copy link
Contributor

@rrileyca rrileyca commented Jul 1, 2025

closes #10163

A clean branch fixing several issues from my "corrupt" previous PR #10164

This PR has the RabbitStreamMessageHandler fallback to a sendFailureChannelName of "errorChannel" when a channel name is not set and when it is operating in async mode.

@rrileyca
Copy link
Contributor Author

rrileyca commented Jul 1, 2025

@artembilan here is the clean branch. Sorry again for the confusion.

@rrileyca rrileyca changed the title Fix async mode dropping errors GH-10163: Fix async mode dropping errors Jul 1, 2025
@artembilan
Copy link
Member

A clean branch fixing several issues from my "corrupt" previous PR

Why don't you create a top-level branch as I suggested in our previous discussion?
Why make your life so hard trying to contribute from your main?
The rule of thumb is to never develop in main.
See more info in our Contribution Guidelines: https://github.com/spring-projects/spring-integration/blob/main/CONTRIBUTING.adoc

@rrileyca
Copy link
Contributor Author

rrileyca commented Jul 1, 2025

I must admit I don't do much fork style PRs. I will do that in future. Thanks for the advice.

Do you want me to switch to another branch still?

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, add your name to the @author list of the affected classes.

protected String getSendFailureChannelNameOrDefault() {
return this.sendFailureChannelName == null ?
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME :
sendFailureChannelName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should do that trick instead of your extra method calls:

	protected @Nullable MessageChannel getSendFailureChannel() {
		if (this.sendFailureChannel == null && (this.sendFailureChannelName != null || !this.sync)) {
			String sendFailureChannelNameToUse = this.sendFailureChannelName;
			if (sendFailureChannelNameToUse == null) {
				sendFailureChannelNameToUse = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME;
			}
			this.sendFailureChannel = getChannelResolver().resolveDestination(sendFailureChannelNameToUse);
		}
		return this.sendFailureChannel;
	}

Copy link
Contributor Author

@rrileyca rrileyca Jul 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will slightly modify this, as it doesn't allow for sync mode with a configured sendFailureChannelName - unless that is your intention that in sync mode, a sendFailureChannel shall never be used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you are talking about.
See this condition:

this.sendFailureChannelName != null || !this.sync

So, we go down in a sync mode and provided name.
Then we do if (sendFailureChannelNameToUse == null) {.
Which applies would not be true in sync mode and provided name.
What do I miss?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I misread this :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good. So, please, incorporate whatever I've suggestion.
So, far your code is not what we have discussed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

assertThat(errorMessage).isNotNull();
Throwable caughtException = errorMessage.getPayload();
assertThat(caughtException).isInstanceOf(StreamSendException.class);
assertThat(caughtException).isEqualTo(streamException);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can combine all of these into a single chained assert:

assertThat(errorMessage).extracting(Message::getPayload).isEqualTo(streamException);

I don't see a reason in extra isInstanceOf() since that isEqualTo() does cover this as well.

.build())
.build();
assertThatThrownBy(() -> handler.handleMessage(testMessage))
.isInstanceOf(MessageHandlingException.class);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We prefer to use opposite API: assertThatExceptionOfType()

RabbitStreamMessageHandler handler = RabbitStream.outboundStreamAdapter(streamTemplate)
.getObject();

String failureChannelName = handler.getSendFailureChannelNameOrDefault();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly!
I don't want to have any protected methods just to satisfy tests.
You technically covered this code path with that errorChanelAsync().
Which indeed is kind of an integration test, but it also includes a failureChannel path.
Therefore I don't see a reason in extra tests.

@artembilan
Copy link
Member

Do you want me to switch to another branch still?

It depends how far we still would go out of what we have agreed before 😉

Signed-off-by: rrileyca <[email protected]>
@rrileyca
Copy link
Contributor Author

rrileyca commented Jul 1, 2025

It depends how far we still would go out of what we have agreed before 😉

There was no intention to deviate on what we agreed on. This is my first time contributing to Spring, so I have a lot to learn. Thank you for your feedback and patience.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, see that contributing doc: https://github.com/spring-projects/spring-integration/blob/main/CONTRIBUTING.adoc#provide-a-link-to-the-github-issue-in-the-associated-pull-request.

We recommend to have a link to the issue like Fixes: exactly in the commit message.
Whatever you say on PR won't close issue automatically.

Plus, the first commit of the PR becomes very own PR title and description.
Therefore, here is a best practice for Git commits: https://cbea.ms/git-commit/.

And you already know that it is not very convenient to raise a PR against main.
Usually we pull your branch locally for final review and merge.
If that is done from main, then it causes some headache on merge, not only for your local project copy.

@artembilan
Copy link
Member

Now, since we are very close to the merge, tell us, please, if you are going to contribute similar fix for the KafkaProducerMessageHandler.
We have that discussed in the issue and that one is scheduled for Kafka module fix as well.
I don't think we need dedicated tests in that module since the functionality is exactly the same you have done here.
I'm asking for the fix over there as well, since the issue is also scheduled for back-port to previous versions.

Thank you again!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build has failed:

 Error: eckstyle] [ERROR] /home/runner/work/spring-integration/spring-integration/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandlerTests.java:43:59: Using a static member import should be avoided - org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType. [AvoidStaticImport]

Please, run ./gradlew :spring-integration-amqp:check locally before pushing.

@rrileyca
Copy link
Contributor Author

rrileyca commented Jul 1, 2025

tell us, please, if you are going to contribute similar fix for the KafkaProducerMessageHandler.

Yes, I can add that.

And you already know that it is not very convenient to raise a PR against main.

Apologies, I did not know. Does this mean I should create a new PR from the new branch?

@artembilan
Copy link
Member

Does this mean I should create a new PR from the new branch?

No, it's fine so far.
Just would be some inconvenience if I would do clean up myself for you and manual merge locally.
But right now I had to ask you to do all of that boring stuff around the code perfection 😉 .

And then, when we merge, it would be a problem for your local main to bring those your changes back to your copy.
Right now you would need to do reset force or just remove the branch and fetch from upstream.

@rrileyca
Copy link
Contributor Author

rrileyca commented Jul 1, 2025

Well, I have to squash all the commits in order to make a proper first git commit, even if I keep this fork as is (with the incorrect commits on the main branch.

Plus, I hope to make more contributions in the future, so I think it is a good idea to just clean up and make a new fork, with the proper feature branch and properly written first commit.

@rrileyca rrileyca closed this by deleting the head repository Jul 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

RabbitMQ Streams Client Errors Getting Lost
2 participants