Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
Expand Down Expand Up @@ -79,6 +81,10 @@
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.zookeeper.server.ContainerManager;
import org.apache.zookeeper.server.DataNode;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -98,7 +104,9 @@ public class ZkTestBase {
protected static HelixZkClient _gZkClient;
protected static ClusterSetup _gSetupTool;
protected static BaseDataAccessor<ZNRecord> _baseAccessor;
protected static ContainerManager _containerManager;
protected static MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
protected static AtomicLong _fakeElapsed = new AtomicLong(0);

private final Map<String, Map<String, HelixZkClient>> _liveInstanceOwners = new HashMap<>();

Expand All @@ -116,6 +124,7 @@ public class ZkTestBase {
*/
// The following maps hold ZK connect string as keys
protected static final Map<String, ZkServer> _zkServerMap = new HashMap<>();
protected static final Map<ZkServer, ContainerManager> _zkServerContainerManagerMap = new HashMap<>();
protected static final Map<String, HelixZkClient> _helixZkClientMap = new HashMap<>();
protected static final Map<String, ClusterSetup> _clusterSetupMap = new HashMap<>();
protected static final Map<String, BaseDataAccessor> _baseDataAccessorMap = new HashMap<>();
Expand Down Expand Up @@ -170,6 +179,7 @@ public void beforeSuite() throws Exception {
_gZkClient = _helixZkClientMap.get(ZK_ADDR);
_gSetupTool = _clusterSetupMap.get(ZK_ADDR);
_baseAccessor = _baseDataAccessorMap.get(ZK_ADDR);
_containerManager = _zkServerContainerManagerMap.get(_zkServer);

// Clean up all JMX objects
for (ObjectName mbean : _server.queryNames(null, null)) {
Expand All @@ -192,6 +202,40 @@ private static synchronized void startZooKeeper(int i) {
_helixZkClientMap.computeIfAbsent(zkAddress, ZkTestBase::createZkClient);
_clusterSetupMap.computeIfAbsent(zkAddress, key -> new ClusterSetup(_helixZkClientMap.get(key)));
_baseDataAccessorMap.computeIfAbsent(zkAddress, key -> new ZkBaseDataAccessor(_helixZkClientMap.get(key)));
_zkServerContainerManagerMap.computeIfAbsent(_zkServerMap.get(zkAddress), ZkTestBase::createContainerManager);
}

/**
* Advances the fake elapsed time used by the ContainerManager
* @param additionalTime time to add in milliseconds
*/
public static void advanceFakeElapsedTime(long additionalTime) {
_fakeElapsed.addAndGet(additionalTime);
}

private static ContainerManager createContainerManager(ZkServer zkServer) {
try {
ZooKeeperServer zooKeeperServer = zkServer.getZooKeeperServer();

Field firstProcessorField = ZooKeeperServer.class.getDeclaredField("firstProcessor");
firstProcessorField.setAccessible(true);
RequestProcessor firstProcessor = (RequestProcessor) firstProcessorField.get(zooKeeperServer);

// Create a ContainerManager with a custom elapsed time logic
return new ContainerManager(
zooKeeperServer.getZKDatabase(),
firstProcessor,
100, // Check interval in ms
100 // Max containers to check per interval
) {
@Override
protected long getElapsed(DataNode node) {
return _fakeElapsed.get();
}
};
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Failed to access firstProcessor field in ZooKeeperServer", e);
}
}

private static ZkServer createZookeeperServer(String zkAddress) {
Expand Down Expand Up @@ -226,6 +270,7 @@ public void afterSuite() throws IOException {
_clusterSetupMap.values().forEach(ClusterSetup::close);
_helixZkClientMap.values().forEach(HelixZkClient::close);
_zkServerMap.values().forEach(TestHelper::stopZkServer);
_zkServerContainerManagerMap.values().forEach(ContainerManager::stop);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void testSyncCreate() {
}

@Test
public void testSyncCreateWithTTL() {
public void testSyncCreateWithTTL() throws InterruptedException {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
Expand Down Expand Up @@ -271,6 +271,10 @@ public void testSyncCreateWithTTL() {
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getSimpleFields().size(), 0);

// Check if the TTL znode expires or not.
advanceFakeElapsedTime(2000);
_containerManager.checkContainers();
Assert.assertFalse(accessor.exists(path, 0));
System.clearProperty("zookeeper.extendedTypesEnabled");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
Expand Down Expand Up @@ -300,7 +304,6 @@ public void testSyncCreateContainer() {
getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getSimpleFields().size(), 0);

System.clearProperty("zookeeper.extendedTypesEnabled");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,13 @@ public void shutdown() {
public ZkClient getZkClient() {
return _zkClient;
}

/**
* Get the ZooKeeper server instance.
* @return The ZooKeeper server instance
*/
public ZooKeeperServer getZooKeeperServer() {
return _zk;
}
Comment on lines +158 to +160
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you using Helix style to format it? It looked wired. For the place you touched, we should fix it. Maybe it was not handled before.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @junkaixue , What do we mean by "helix style" here? I had earlier formatted the method in same way just as the getZkClient method above this.


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,22 @@
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Field;
import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

import org.apache.commons.io.FileUtils;
import org.apache.helix.zookeeper.constant.TestConstants;
import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.zookeeper.server.ContainerManager;
import org.apache.zookeeper.server.DataNode;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -60,8 +66,18 @@ public class ZkTestBase {
*/
// The following maps hold ZK connect string as keys
protected static final Map<String, ZkServer> _zkServerMap = new ConcurrentHashMap<>();
protected static final Map<ZkServer, ContainerManager> _zkServerContainerManagerMap = new ConcurrentHashMap<>();
protected static AtomicLong _fakeElapsed = new AtomicLong(0);
protected static int _numZk = 1; // Initial value

/**
* Advances the fake elapsed time used by the ContainerManager
* @param additionalTime time to add in milliseconds
*/
public static void advanceFakeElapsedTime(long additionalTime) {
_fakeElapsed.addAndGet(additionalTime);
}

@BeforeSuite
public void beforeSuite() throws IOException {
// Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
Expand Down Expand Up @@ -91,6 +107,9 @@ public void afterSuite() throws IOException {
}
}

// Shut down ContainerManagers
_zkServerContainerManagerMap.values().forEach(ContainerManager::stop);

// Shut down all ZkServers
_zkServerMap.values().forEach(ZkServer::shutdown);
}
Expand All @@ -115,6 +134,36 @@ private void setupZooKeepers() {
for (int i = 0; i < _numZk; i++) {
String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
_zkServerMap.computeIfAbsent(zkAddress, ZkTestBase::startZkServer);
_zkServerContainerManagerMap.computeIfAbsent(_zkServerMap.get(zkAddress), ZkTestBase::createContainerManager);
}
}

/**
* Creates a ContainerManager with custom elapsed time functionality for a ZkServer
*/
private static ContainerManager createContainerManager(ZkServer zkServer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function has been repeated. Better make it as Util or in base class.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One is in helix-core module and other is in zookeeper-api. And in zookeeper-api I dont see any packaging of test classes being done. So if we make this common then we will need to modify pom.xml in some places.

try {
ZooKeeperServer zooKeeperServer = zkServer.getZooKeeperServer();

Field firstProcessorField = ZooKeeperServer.class.getDeclaredField("firstProcessor");
firstProcessorField.setAccessible(true);
RequestProcessor firstProcessor = (RequestProcessor) firstProcessorField.get(zooKeeperServer);

// Create a ContainerManager with a custom elapsed time logic
return new ContainerManager(
zooKeeperServer.getZKDatabase(),
firstProcessor,
10, // Check interval in ms
100, // Max containers to check per interval
10 // the max time in milliseconds that a container that has never had any children is retained
) {
@Override
protected long getElapsed(DataNode node) {
return _fakeElapsed.get();
}
};
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Failed to access firstProcessor field in ZooKeeperServer", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.ZkTestBase;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.zookeeper.server.ContainerManager;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -138,7 +140,7 @@ public void testRealmAwareZkClientCreateContainer() {
* Test creating a sequential TTL node.
*/
@Test(dependsOnMethods = "testRealmAwareZkClientCreateContainer")
public void testRealmAwareZkClientCreateSequentialWithTTL() {
public void testRealmAwareZkClientCreateSequentialWithTTL() throws InterruptedException {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
// Test writing and reading data
_realmAwareZkClient.createPersistent(TEST_VALID_PATH, true);
Expand All @@ -149,6 +151,12 @@ public void testRealmAwareZkClientCreateSequentialWithTTL() {
Assert.assertEquals(DUMMY_RECORD.getSimpleField("Dummy"),
retrievedRecord.getSimpleField("Dummy"));

// Check if the TTL znode expires or not.
advanceFakeElapsedTime(2000);
ContainerManager containerManager = _zkServerContainerManagerMap.get(_zkServerMap.get(ZkTestBase.ZK_ADDR));
containerManager.checkContainers();
Assert.assertFalse(_realmAwareZkClient.exists(childPath));

// Clean up
_realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
System.clearProperty("zookeeper.extendedTypesEnabled");
Expand All @@ -158,7 +166,7 @@ public void testRealmAwareZkClientCreateSequentialWithTTL() {
* Test creating a TTL node.
*/
@Test(dependsOnMethods = "testRealmAwareZkClientCreateSequentialWithTTL")
public void testRealmAwareZkClientCreateWithTTL() {
public void testRealmAwareZkClientCreateWithTTL() throws InterruptedException {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
// Test with createParents = true
long ttl = 1L;
Expand All @@ -172,6 +180,12 @@ public void testRealmAwareZkClientCreateWithTTL() {
Assert.assertEquals(DUMMY_RECORD.getSimpleField("Dummy"),
retrievedRecord.getSimpleField("Dummy"));

// Check if the TTL znode expires or not.
advanceFakeElapsedTime(2000);
ContainerManager containerManager = _zkServerContainerManagerMap.get(_zkServerMap.get(ZkTestBase.ZK_ADDR));
containerManager.checkContainers();
Assert.assertFalse(_realmAwareZkClient.exists(childPath));

// Clean up
_realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
System.clearProperty("zookeeper.extendedTypesEnabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
import org.apache.helix.zookeeper.constant.TestConstants;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.ZkTestBase;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.ContainerManager;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -184,7 +186,7 @@ public void testRealmAwareZkClientCreateContainer() {
* Test creating a sequential TTL node.
*/
@Test(dependsOnMethods = "testRealmAwareZkClientCreateContainer")
public void testRealmAwareZkClientCreateSequentialWithTTL() {
public void testRealmAwareZkClientCreateSequentialWithTTL() throws InterruptedException {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
// Create a dummy ZNRecord
ZNRecord znRecord = new ZNRecord("DummyRecord");
Expand All @@ -199,6 +201,12 @@ public void testRealmAwareZkClientCreateSequentialWithTTL() {
Assert.assertEquals(znRecord.getSimpleField("Dummy"),
retrievedRecord.getSimpleField("Dummy"));

// Check if the TTL znode expires or not.
advanceFakeElapsedTime(2000);
ContainerManager containerManager = _zkServerContainerManagerMap.get(_zkServerMap.get(ZkTestBase.ZK_ADDR));
containerManager.checkContainers();
Assert.assertFalse(_realmAwareZkClient.exists(childPath));

// Clean up
_realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
System.clearProperty("zookeeper.extendedTypesEnabled");
Expand All @@ -208,7 +216,7 @@ public void testRealmAwareZkClientCreateSequentialWithTTL() {
* Test creating a TTL node.
*/
@Test(dependsOnMethods = "testRealmAwareZkClientCreateSequentialWithTTL")
public void testRealmAwareZkClientCreateWithTTL() {
public void testRealmAwareZkClientCreateWithTTL() throws InterruptedException {
System.setProperty("zookeeper.extendedTypesEnabled", "true");
// Create a dummy ZNRecord
ZNRecord znRecord = new ZNRecord("DummyRecord");
Expand All @@ -226,6 +234,12 @@ public void testRealmAwareZkClientCreateWithTTL() {
Assert.assertEquals(znRecord.getSimpleField("Dummy"),
retrievedRecord.getSimpleField("Dummy"));

// Check if the TTL znode expires or not.
advanceFakeElapsedTime(2000);
ContainerManager containerManager = _zkServerContainerManagerMap.get(_zkServerMap.get(ZkTestBase.ZK_ADDR));
containerManager.checkContainers();
Assert.assertFalse(_realmAwareZkClient.exists(childPath));

// Clean up
_realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
System.clearProperty("zookeeper.extendedTypesEnabled");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ContainerManager;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -115,7 +116,7 @@ void testUnimplementedTypes() {
}

@Test
void testCreatePersistentWithTTL() {
void testCreatePersistentWithTTL() throws InterruptedException {
// Enable extended types and create a ZkClient
System.setProperty("zookeeper.extendedTypesEnabled", "true");
ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
Expand Down Expand Up @@ -145,14 +146,19 @@ void testCreatePersistentWithTTL() {
zkClient.createPersistentWithTTL(path, true, ttl);
AssertJUnit.assertTrue(zkClient.exists(path));

// Check if the TTL znode expires or not.
advanceFakeElapsedTime(2000);
ContainerManager containerManager = _zkServerContainerManagerMap.get(_zkServerMap.get(ZkTestBase.ZK_ADDR));
containerManager.checkContainers();

// Clean up
zkClient.deleteRecursively(parentPath);
zkClient.close();
System.clearProperty("zookeeper.extendedTypesEnabled");
}

@Test
void testCreatePersistentSequentialWithTTL() {
void testCreatePersistentSequentialWithTTL() throws InterruptedException {
// Enable extended types and create a ZkClient
System.setProperty("zookeeper.extendedTypesEnabled", "true");
ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
Expand All @@ -178,6 +184,11 @@ void testCreatePersistentSequentialWithTTL() {
ZNRecord retrievedRecord = zkClient.readData(path + "0000000000");
AssertJUnit.assertEquals(value, retrievedRecord.getSimpleField(key));

// Check if the TTL znode expires or not.
advanceFakeElapsedTime(2000);
ContainerManager containerManager = _zkServerContainerManagerMap.get(_zkServerMap.get(ZkTestBase.ZK_ADDR));
containerManager.checkContainers();

// Clean up
zkClient.deleteRecursively(parentPath);
zkClient.close();
Expand Down