Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@
@StateModelInfo(initialState = "OFFLINE", states = {"LEADER", "STANDBY"})
public class DistClusterControllerStateModel extends AbstractHelixLeaderStandbyStateModel {
private static Logger logger = LoggerFactory.getLogger(DistClusterControllerStateModel.class);
protected Optional<HelixManager> _controllerOpt = Optional.empty();
protected volatile Optional<HelixManager> _controllerOpt = Optional.empty();
private final Set<Pipeline.Type> _enabledPipelineTypes;

// dedicated lock object to avoid cross-instance contention from Optional.empty() singleton
private final Object _controllerLock = new Object();

/**
 * Convenience constructor that delegates to the two-argument constructor, enabling the
 * {@code DEFAULT} and {@code TASK} pipeline types.
 *
 * @param zkAddr passed through unchanged to the delegated constructor
 *               (presumably the ZooKeeper connection address - confirm against the base class)
 */
public DistClusterControllerStateModel(String zkAddr) {
this(zkAddr, Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK));
}
Expand All @@ -62,7 +65,7 @@ public void onBecomeLeaderFromStandby(Message message, NotificationContext conte

logger.info(controllerName + " becoming leader from standby for " + clusterName);

synchronized (_controllerOpt) {
synchronized (_controllerLock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Optional.empty() is the root cause of the locking problem, why not just make the method synchronized instead of creating a new lock object?

Copy link
Author

@LZD-PratyushBhatt LZD-PratyushBhatt Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Appreciate the suggestion @junkaixue, that’s a fair point. I agree that synchronized at method level on this would technically provide mutual exclusion in this case.

I opted for a dedicated lock object (_controllerLock) to keep the synchronization scope narrowly focused on controller lifecycle transitions, and I also didn't want to change the existing dedicated-lock logic. This makes the locking intent more explicit and avoids potential contention if other, unrelated synchronized methods are ever added to this class (now, later, or by mistake).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know what you think. If we are sure this class will stay small and simple in the future, then synchronized methods would make more sense.

Copy link

@laxman-ch laxman-ch Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junkaixue: a new lock-based implementation stays close to the existing implementation while fixing the lock contention problem described in the issue (caused by the JDK implementing Optional.empty() as a singleton).

IMHO, making the methods synchronized would have a larger scope, since we would need to re-examine the thread safety of the other instance variables in this class and its base classes. So, if you would like that change, should it be addressed in a separate issue and PR?

if (!_controllerOpt.isPresent()) {
HelixManager newController = HelixManagerFactory
.getZKHelixManager(clusterName, controllerName, InstanceType.CONTROLLER, _zkAddr);
Expand Down Expand Up @@ -112,7 +115,7 @@ public String getStateModeInstanceDescription(String partitionName, String insta

@Override
public void reset() {
synchronized (_controllerOpt) {
synchronized (_controllerLock) {
if (_controllerOpt.isPresent()) {
logger.info("Disconnecting controller: " + _controllerOpt.get().getInstanceName() + " for "
+ _controllerOpt.get().getClusterName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,19 @@
import org.apache.helix.model.Message.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class TestDistControllerStateModel extends ZkUnitTestBase {
private static Logger LOG = LoggerFactory.getLogger(TestDistControllerStateModel.class);

Expand Down Expand Up @@ -124,4 +133,136 @@ public void testReset() {
stateModel.reset();
}

/**
 * Verifies that every {@link DistClusterControllerStateModel} instance owns a distinct
 * {@code _controllerLock} object and that threads synchronizing on the locks of different
 * instances do not block one another.
 *
 * <p>The lock field is private, so it is read reflectively. Ten workers each hold their own
 * instance's lock for 100 ms; if the locks were shared, the holds would serialize to roughly
 * one second and exceed the 500 ms completion budget.
 *
 * @throws Exception if reflective access to the lock field fails or the test thread is
 *                   interrupted while waiting
 */
@Test()
public void testNoSharedLockAcrossInstances() throws Exception {
  LOG.info("Testing that lock objects are not shared across DistClusterControllerStateModel instances");

  // Verify different instances have different lock objects
  DistClusterControllerStateModel instance1 = new DistClusterControllerStateModel(ZK_ADDR);
  DistClusterControllerStateModel instance2 = new DistClusterControllerStateModel(ZK_ADDR);

  Field lockField = DistClusterControllerStateModel.class.getDeclaredField("_controllerLock");
  lockField.setAccessible(true);

  Object lock1 = lockField.get(instance1);
  Object lock2 = lockField.get(instance2);

  Assert.assertNotNull(lock1, "First instance should have a lock object");
  Assert.assertNotNull(lock2, "Second instance should have a lock object");
  Assert.assertNotSame(lock1, lock2, "Different instances must have different lock objects");

  // Verify concurrent access doesn't block across instances
  final int numInstances = 10;
  ExecutorService executor = Executors.newFixedThreadPool(numInstances);
  CountDownLatch readyLatch = new CountDownLatch(numInstances);
  CountDownLatch startLatch = new CountDownLatch(1);
  CountDownLatch completionLatch = new CountDownLatch(numInstances);
  AtomicInteger completedInstances = new AtomicInteger(0);

  boolean allCompleted;
  try {
    for (int i = 0; i < numInstances; i++) {
      final int instanceId = i;
      // Resolve the lock on the test thread so that a reflection failure fails the test
      // directly instead of being logged and swallowed inside a worker.
      final Object instanceLock = lockField.get(new DistClusterControllerStateModel(ZK_ADDR));

      executor.submit(() -> {
        // Signal that this worker is running before parking on the start gate.
        readyLatch.countDown();
        try {
          startLatch.await();

          // Simulate state transition operations that would use the lock
          synchronized (instanceLock) {
            // hold the lock here briefly to simulate real state transition work
            Thread.sleep(100);
            completedInstances.incrementAndGet();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          LOG.error("Instance {} failed during concurrent test", instanceId, e);
        } finally {
          completionLatch.countDown();
        }
      });
    }

    // Wait until every worker is actually running so that thread start-up time is not
    // charged against the completion budget below.
    Assert.assertTrue(readyLatch.await(5, TimeUnit.SECONDS),
        "All worker threads should start before the timed section begins");

    // start all threads simultaneously
    startLatch.countDown();

    // All instances should complete within reasonable time since they don't block each other
    allCompleted = completionLatch.await(500, TimeUnit.MILLISECONDS);
  } finally {
    // Always release the pool, even when an assertion or interrupt aborts the test early.
    executor.shutdownNow();
    executor.awaitTermination(2, TimeUnit.SECONDS);
  }

  Assert.assertTrue(allCompleted, "All instances should complete without blocking each other");
  Assert.assertEquals(completedInstances.get(), numInstances,
      "All instances should successfully complete their synchronized work");
}

/**
 * Verifies that while one thread holds the {@code _controllerLock} of one
 * {@link DistClusterControllerStateModel} instance, a second thread can still acquire the
 * lock of a different instance without waiting.
 *
 * <p>The blocking thread holds {@code lock1} for five seconds, far longer than the 200 ms
 * window in which the second thread must finish. The timed window only opens once the
 * blocking thread has confirmed that it holds {@code lock1}, so slow thread start-up cannot
 * cause a spurious failure.
 *
 * @throws Exception if reflective access to the lock field fails or the test thread is
 *                   interrupted while waiting
 */
@Test()
public void testExplicitLockIndependence() throws Exception {
  LOG.info("Testing explicit lock independence - one blocked, other should complete");

  DistClusterControllerStateModel instance1 = new DistClusterControllerStateModel(ZK_ADDR);
  DistClusterControllerStateModel instance2 = new DistClusterControllerStateModel(ZK_ADDR);

  Field lockField = DistClusterControllerStateModel.class.getDeclaredField("_controllerLock");
  lockField.setAccessible(true);

  Object lock1 = lockField.get(instance1);
  Object lock2 = lockField.get(instance2);

  Assert.assertNotSame(lock1, lock2, "Different instances must have different lock objects");

  CountDownLatch instance1Started = new CountDownLatch(1);
  CountDownLatch instance2Completed = new CountDownLatch(1);
  AtomicBoolean instance1Interrupted = new AtomicBoolean(false);

  // Thread 1: Hold lock1 for 5 seconds
  Thread thread1 = new Thread(() -> {
    try {
      synchronized (lock1) {
        instance1Started.countDown();
        Thread.sleep(5000); // Hold much longer than test timeout
      }
    } catch (InterruptedException e) {
      instance1Interrupted.set(true);
      Thread.currentThread().interrupt();
    }
  }, "BlockingThread");

  // Thread 2: Should complete immediately since it uses lock2
  Thread thread2 = new Thread(() -> {
    try {
      synchronized (lock2) {
        // Should acquire immediately since lock2 != lock1
        Thread.sleep(50);
        instance2Completed.countDown();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }, "NonBlockingThread");

  boolean instance2CompletedQuickly;
  thread1.start();
  try {
    // Only start the timed section once thread1 is known to hold lock1; otherwise the
    // 200 ms window would also have to absorb thread1's start-up latency.
    Assert.assertTrue(instance1Started.await(5, TimeUnit.SECONDS),
        "BlockingThread should acquire lock1 before the independence check starts");

    thread2.start();

    // Instance2 should complete immediately even though instance1 is blocked
    instance2CompletedQuickly = instance2Completed.await(200, TimeUnit.MILLISECONDS);
  } finally {
    // Clean up on every path so the 5-second sleeper never outlives the test.
    thread1.interrupt();
    thread1.join(1000);
    thread2.join(1000);
  }

  Assert.assertTrue(instance2CompletedQuickly,
      "Instance2 should complete immediately, proving locks are not shared");
  Assert.assertTrue(instance1Interrupted.get(),
      "Instance1 should have been interrupted while holding its lock");
}
}