Skip to content

Commit bac8ba9

Browse files
authored
Fix PersistentWatcher not working with NamespaceFacade (#1262)
`NamespaceFacade` does not support `getCuratorListenable` while #520 use it to listen for `CuratorEventType.CLOSING` to fix CURATOR-729. This commit exports `CuratorFrameworkBase::client` to retrieve underlying framework client to listen for for `CuratorEventType.CLOSING`. Fixes #1259.
1 parent 3f631ac commit bac8ba9

File tree

5 files changed

+57
-1
lines changed

5 files changed

+57
-1
lines changed

curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@
5858
public abstract class CuratorFrameworkBase implements CuratorFramework {
5959
abstract NamespaceImpl getNamespaceImpl();
6060

61+
/**
62+
* Return the underlying client which is the one constructed from {@link org.apache.curator.framework.CuratorFrameworkFactory}.
63+
*/
64+
public abstract CuratorFramework client();
65+
6166
@Override
6267
public final CuratorFramework nonNamespaceView() {
6368
return usingNamespace(null);

curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,11 @@ public void clearWatcherReferences(Watcher watcher) {
254254
// NOP
255255
}
256256

257+
@Override
258+
public CuratorFramework client() {
259+
return this;
260+
}
261+
257262
@Override
258263
public CuratorFrameworkState getState() {
259264
return state.get();

curator-framework/src/main/java/org/apache/curator/framework/imps/DelegatingCuratorFramework.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ public DelegatingCuratorFramework(CuratorFrameworkBase client) {
4747
this.client = client;
4848
}
4949

50+
@Override
51+
public final CuratorFramework client() {
52+
return client.client();
53+
}
54+
5055
@Override
5156
public CuratorFrameworkState getState() {
5257
return client.getState();

curator-recipes/src/main/java/org/apache/curator/framework/recipes/watch/PersistentWatcher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.curator.framework.api.BackgroundCallback;
2828
import org.apache.curator.framework.api.CuratorClosedException;
2929
import org.apache.curator.framework.api.CuratorEventType;
30+
import org.apache.curator.framework.imps.CuratorFrameworkBase;
3031
import org.apache.curator.framework.listen.Listenable;
3132
import org.apache.curator.framework.listen.StandardListenerManager;
3233
import org.apache.curator.framework.state.ConnectionStateListener;
@@ -80,7 +81,8 @@ public PersistentWatcher(CuratorFramework client, String basePath, boolean recur
8081
public void start() {
8182
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Already started");
8283
client.getConnectionStateListenable().addListener(connectionStateListener);
83-
client.getCuratorListenable().addListener(((ignored, event) -> {
84+
// This could be a namespaced facade which does not support getCuratorListenable.
85+
((CuratorFrameworkBase) client).client().getCuratorListenable().addListener(((ignored, event) -> {
8486
if (event.getType() == CuratorEventType.CLOSING) {
8587
onClientClosed();
8688
}

curator-recipes/src/test/java/org/apache/curator/framework/recipes/watch/TestPersistentWatcher.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,45 @@ public void testConnectionLost() throws Exception {
4848
internalTest(false);
4949
}
5050

51+
@Test
52+
public void testNamespacedWatching() throws Exception {
53+
BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();
54+
55+
try (CuratorFramework client = CuratorFrameworkFactory.newClient(
56+
server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1))) {
57+
client.start();
58+
// given: connected curator client
59+
client.blockUntilConnected();
60+
61+
// given: started persistent watcher under namespaced facade
62+
PersistentWatcher persistentWatcher = new PersistentWatcher(client.usingNamespace("top"), "/main", true);
63+
persistentWatcher.getListenable().addListener(events::add);
64+
persistentWatcher.start();
65+
66+
// when: create paths
67+
client.create().forPath("/top/main");
68+
client.create().forPath("/top/main/a");
69+
70+
// then: receive node watch events
71+
WatchedEvent event1 = events.poll(5, TimeUnit.SECONDS);
72+
assertNotNull(event1);
73+
assertEquals(Watcher.Event.EventType.NodeCreated, event1.getType());
74+
assertEquals("/main", event1.getPath());
75+
76+
WatchedEvent event2 = events.poll(5, TimeUnit.SECONDS);
77+
assertNotNull(event2);
78+
assertEquals(Watcher.Event.EventType.NodeCreated, event2.getType());
79+
assertEquals("/main/a", event2.getPath());
80+
}
81+
82+
// when: curator client closed
83+
// then: listener get Closed notification
84+
WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
85+
assertNotNull(event);
86+
assertEquals(Watcher.Event.EventType.None, event.getType());
87+
assertEquals(Watcher.Event.KeeperState.Closed, event.getState());
88+
}
89+
5190
@Test
5291
public void testConcurrentClientClose() throws Exception {
5392
BlockingQueue<WatchedEvent> events = new LinkedBlockingQueue<>();

0 commit comments

Comments
 (0)