Skip to content

Conversation

oneby-wang
Copy link
Contributor

@oneby-wang oneby-wang commented Sep 19, 2025

Fixes #23897

Motivation

Set local policies will overwrite "number of bundles" passed during namespace creation. If we don't call any get namespace policies operation, local policies will not be created, so it will be overwritten.

private CompletableFuture<NamespaceBundles> copyToLocalPolicies(NamespaceName namespace) {
return pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
.thenCompose(optPolicies -> {
if (!optPolicies.isPresent()) {
return CompletableFuture.completedFuture(getBundles(namespace, Optional.empty()));
}
Policies policies = optPolicies.get();
LocalPolicies localPolicies = new LocalPolicies(policies.bundles,
null,
null);
return pulsar.getPulsarResources().getLocalPolicies()
.createLocalPoliciesAsync(namespace, localPolicies)
.thenApply(stat -> getBundles(namespace,
Optional.of(Pair.of(localPolicies, 0L))));
});
}

The following steps can reproduce the problem.

  1. pulsar-admin namespaces create t/n --bundles 10
  2. pulsar-admin namespaces set-bookie-affinity-group --primary-group temp t/n
  3. pulsar-admin namespaces bundles t/n

But the following steps will not reproduce the problem.

  1. pulsar-admin namespaces create t/n --bundles 10
  2. pulsar-admin namespaces bundles t/n
  3. pulsar-admin namespaces set-bookie-affinity-group --primary-group temp t/n
  4. pulsar-admin namespaces bundles t/n

Modifications

If local policies is not present, first try to use namespace policies bundle data, then fallback to config().getDefaultNumberOfNamespaceBundles()

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: oneby-wang#5

Copy link

@oneby-wang Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Sep 19, 2025
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the PR description, it seems possible to add a test?

@oneby-wang
Copy link
Contributor Author

oneby-wang commented Sep 19, 2025

From the PR description, it seems possible to add a test?

Thanks for advice, I'll add unit tests.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lhotari lhotari added this to the 4.2.0 milestone Sep 23, 2025
@Technoboy- Technoboy- changed the title [fix][admin] set local policies overwrites "number of bundles" passed during namespace creation [fix][admin] Set local policies overwrites "number of bundles" passed during namespace creation Sep 24, 2025
@oneby-wang
Copy link
Contributor Author

oneby-wang commented Sep 24, 2025

ExtensibleLoadManagerTest.testAntiaffinityPolicy() unit test failure in fork repository PR.
https://github.com/oneby-wang/pulsar/actions/runs/17970886848/job/51114222121?pr=5

Seems this default bundle config affects this unit test. Previous: static default 1 numBundles. After: namespace policies numBundles.

.orElseGet(() -> new LocalPolicies(defaultBundle(),
null, antiAffinityGroup))

PoliciesUtil.defaultBundle() method.

public static BundlesData defaultBundle() {
List<String> boundaries = new ArrayList<>();
boundaries.add(FIRST_BOUNDARY);
boundaries.add(LAST_BOUNDARY);
return BundlesData.builder()
.numBundles(1)
.boundaries(boundaries)
.build();
}

Seems some bugs in ExtensibleLoadManager or ExtensibleLoadManagerTest?
https://pulsar.apache.org/docs/3.0.x/administration-load-balance/#distribute-anti-affinity-namespaces-across-failure-domains

Not familiar with ExtensibleLoadManager, may give me some entry point, may be I can help fix this problem? Seems here?

public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
long startTime = System.nanoTime();
CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic)
.thenCompose(bundle -> {
// Do redirection if the cluster is in rollback or deploying.
return findRedirectLookupResultAsync(bundle).thenCompose(optResult -> {
if (optResult.isPresent()) {
LOG.info("[{}] Redirect lookup request to {} for topic {}",
pulsar.getBrokerId(), optResult.get(), topic);
return CompletableFuture.completedFuture(optResult);
}
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle, options);

@oneby-wang
Copy link
Contributor Author

oneby-wang commented Sep 24, 2025

Seems the anti-affinity works at the bundle level.

public static CompletableFuture<Map<String, Integer>> getAntiAffinityNamespaceOwnedBrokers(
final PulsarService pulsar, final String namespaceName,
Set<Map.Entry<String, ServiceUnitStateData>> bundleOwnershipData) {
CompletableFuture<Map<String, Integer>> antiAffinityNsBrokersResult = new CompletableFuture<>();
getNamespaceAntiAffinityGroupAsync(pulsar, namespaceName)
.thenAccept(antiAffinityGroupOptional -> {
if (antiAffinityGroupOptional.isEmpty()) {
antiAffinityNsBrokersResult.complete(null);
return;
}
final String antiAffinityGroup = antiAffinityGroupOptional.get();
final Map<String, Integer> brokerToAntiAffinityNamespaceCount = new ConcurrentHashMap<>();
final List<CompletableFuture<Void>> futures = new ArrayList<>();
bundleOwnershipData
.forEach(etr -> {
var stateData = etr.getValue();
var bundle = etr.getKey();
if (stateData.state() == ServiceUnitState.Owned
&& StringUtils.isNotBlank(stateData.dstBroker())) {
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
countAntiAffinityNamespaceOwnedBrokers
(stateData.dstBroker(),
LoadManagerShared.getNamespaceNameFromBundleName(bundle),
future, pulsar,
antiAffinityGroup, brokerToAntiAffinityNamespaceCount);
}
});
FutureUtil.waitForAll(futures)
.thenAccept(r -> antiAffinityNsBrokersResult.complete(brokerToAntiAffinityNamespaceCount));

I think this unit test may work in a wrong way due to this PR related issue, so I changed numBundles to 1 in createNamespace method.

for (int i = 0; i < activeBrokers.size(); i++) {
String namespace = antiAffinityEnabledNameSpace + "-" + i;
admin.namespaces().createNamespace(namespace, 10);
admin.namespaces().setNamespaceAntiAffinityGroup(namespace, namespaceAntiAffinityGroup);
admin.clusters().createFailureDomain(clusterName, namespaceAntiAffinityGroup, FailureDomain.builder()
.brokers(Set.of(activeBrokers.get(i))).build());
}

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@oneby-wang
Copy link
Contributor Author

/pulsarbot rerun-failure-checks

@codecov-commenter
Copy link

Codecov Report

❌ Patch coverage is 31.25000% with 11 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.16%. Comparing base (8879b3b) to head (1f566f8).
⚠️ Report is 40 commits behind head on master.

Files with missing lines Patch % Lines
...pache/pulsar/broker/admin/impl/NamespacesBase.java 31.25% 10 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #24762       +/-   ##
=============================================
+ Coverage     38.31%   74.16%   +35.84%     
- Complexity    13047    33228    +20181     
=============================================
  Files          1844     1904       +60     
  Lines        144274   148520     +4246     
  Branches      16726    17214      +488     
=============================================
+ Hits          55279   110145    +54866     
+ Misses        81481    29583    -51898     
- Partials       7514     8792     +1278     
Flag Coverage Δ
inttests 26.35% <0.00%> (-0.14%) ⬇️
systests 22.75% <0.00%> (+0.02%) ⬆️
unittests 73.67% <31.25%> (+39.20%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...pache/pulsar/broker/admin/impl/NamespacesBase.java 76.31% <31.25%> (+44.87%) ⬆️

... and 1408 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] set-bookie-affinity-group overwrites "number of bundles" passed during namespace creation

6 participants