Skip to content

Commit e46bee5

Browse files
committed
Merge branch '2.1'
2 parents af4955d + 249fac2 commit e46bee5

File tree

2 files changed

+103
-51
lines changed

2 files changed

+103
-51
lines changed

core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java

Lines changed: 102 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,27 @@
4747
import org.slf4j.Logger;
4848
import org.slf4j.LoggerFactory;
4949

50+
import com.google.common.base.Preconditions;
51+
5052
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
5153

54+
/**
55+
* This class uses Sequential Ephemeral ZooKeeper Nodes
56+
* (https://zookeeper.apache.org/doc/r3.9.5/zookeeperProgrammers.html#Sequence+Nodes+--+Unique+Naming)
57+
* to implement locks for Accumulo server processes. This class will create an ephemeral sequential
58+
* node under a base path using the prefix "zlock#" + UUID + "#". The ZooKeeper server will append a
59+
* 10-digit zero-padded number to this prefix using a one-up counter. The base path could be an HA
60+
* service like the Manager or a non-HA service like a TabletServer.
61+
*
62+
* When an instance of this class has the lowest counter number at the base path, then it has the
63+
* lock. When an instance of this class does not have the lowest counter number, then it watches the
64+
* node with the next lowest counter number. When the node that has the next lowest counter number
65+
* is deleted, then this instance could acquire the lock.
66+
*
67+
* Instance of this class also place a Watcher on the base path node. If the base path Watcher
68+
* receives a Session Expired event, then it calls lostLock which should end up halting the Accumulo
69+
* server process.
70+
*/
5271
@SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW",
5372
justification = "Constructor validation is required for proper initialization")
5473
public class ServiceLock implements Watcher {
@@ -89,38 +108,70 @@ public interface AccumuloLockWatcher extends LockWatcher {
89108
void failedToAcquireLock(Exception e);
90109
}
91110

111+
// the base path
92112
private final ServiceLockPath path;
113+
93114
protected final ZooSession zooKeeper;
115+
116+
// "zlock#" + UUID + "#"
94117
private final Prefix vmLockPrefix;
95118

119+
// A LockWatcher instance supplied to this class
120+
// by the caller when trying to acquire the lock
121+
// in ZooKeeper. This object does not represent
122+
// a Watcher in ZooKeeper, but allows an instance
123+
// of this class to communicate with the calling
124+
// object by invoking callback methods.
96125
private LockWatcher lockWatcher;
97-
private String lockNodeName;
126+
127+
// A variable which is initially null, then set
128+
// to the createdNodeName when the lock is acquired.
129+
// This variable is set to null when this instance
130+
// loses the lock.
131+
private volatile String lockNodeName;
132+
133+
// boolean to track if this instance has ever held the lock
98134
private volatile boolean lockWasAcquired;
99-
private volatile boolean watchingParent;
100135

101-
private String createdNodeName;
102-
private String watchingNodeName;
136+
// boolean to track if there is a watcher on the base path.
137+
private volatile boolean watchingBasePath;
138+
139+
// Represents the name of the ephemeral sequential node that
140+
// the ZooKeeper server created for us with the unique one-up
141+
// counter. This variable is set to null when this instance
142+
// acquires the lock.
143+
private volatile String createdNodeName;
144+
145+
// Represents the path of the ephemeral sequential node that
146+
// has the next lowest counter value. An instance of this class
147+
// will watch this node and will attempt to acquire the lock
148+
// when this node is deleted.
149+
private String watchingNodePath;
103150

104151
public ServiceLock(ZooSession zookeeper, ServiceLockPath path, UUID uuid) {
105152
this.zooKeeper = requireNonNull(zookeeper);
106153
this.path = requireNonNull(path);
107154
try {
108155
zooKeeper.exists(path.toString(), this);
109-
watchingParent = true;
156+
watchingBasePath = true;
110157
this.vmLockPrefix = new Prefix(ZLOCK_PREFIX + uuid.toString() + "#");
111158
} catch (KeeperException | InterruptedException ex) {
112159
LOG.error("Error setting initial watch", ex);
113160
throw new IllegalStateException(ex);
114161
}
115162
}
116163

117-
private static class LockWatcherWrapper implements AccumuloLockWatcher {
164+
// Watcher used in tryLock method that wraps the supplied LockWatcher
165+
// so that the supplied LockWatcher.failedToAcquireLock method is not
166+
// invoked. The supplied LockWatcher.failedToAcquireLock method may
167+
// halt the server, but we don't want to do that in a tryLock invocation.
168+
private static class TryLockWatcherWrapper implements AccumuloLockWatcher {
118169

119-
boolean acquiredLock = false;
120-
final LockWatcher lw;
170+
private boolean acquiredLock = false;
171+
private final LockWatcher delegate;
121172

122-
public LockWatcherWrapper(LockWatcher lw2) {
123-
this.lw = lw2;
173+
public TryLockWatcherWrapper(LockWatcher lw2) {
174+
this.delegate = lw2;
124175
}
125176

126177
@Override
@@ -135,20 +186,20 @@ public void failedToAcquireLock(Exception e) {
135186

136187
@Override
137188
public void lostLock(LockLossReason reason) {
138-
lw.lostLock(reason);
189+
delegate.lostLock(reason);
139190
}
140191

141192
@Override
142193
public void unableToMonitorLockNode(Exception e) {
143-
lw.unableToMonitorLockNode(e);
194+
delegate.unableToMonitorLockNode(e);
144195
}
145196

146197
}
147198

148199
public synchronized boolean tryLock(LockWatcher lw, ServiceLockData lockData)
149200
throws KeeperException, InterruptedException {
150201

151-
LockWatcherWrapper lww = new LockWatcherWrapper(lw);
202+
TryLockWatcherWrapper lww = new TryLockWatcherWrapper(lw);
152203

153204
lock(lww, lockData);
154205

@@ -263,48 +314,45 @@ public static String findLowestPrevPrefix(final List<String> children,
263314
return lowestPrevNode;
264315
}
265316

266-
private synchronized void determineLockOwnership(final String createdEphemeralNode,
267-
final AccumuloLockWatcher lw) throws KeeperException, InterruptedException {
317+
private synchronized void determineLockOwnership(final AccumuloLockWatcher lw)
318+
throws KeeperException, InterruptedException {
268319

269-
if (createdNodeName == null) {
270-
throw new IllegalStateException(
271-
"Called determineLockOwnership() when ephemeralNodeName == null");
272-
}
320+
Preconditions.checkState(createdNodeName != null, "createdNodeName cannot be null");
273321

274322
List<String> children = validateAndSort(path, zooKeeper.getChildren(path.toString(), null));
275323

276-
if (!children.contains(createdEphemeralNode)) {
277-
LOG.error("Expected ephemeral node {} to be in the list of children {}", createdEphemeralNode,
324+
if (!children.contains(createdNodeName)) {
325+
LOG.error("Expected ephemeral node {} to be in the list of children {}", createdNodeName,
278326
children);
279327
throw new IllegalStateException(
280-
"Lock attempt ephemeral node no longer exist " + createdEphemeralNode);
328+
"Lock attempt ephemeral node no longer exist " + createdNodeName);
281329
}
282330

283-
if (children.get(0).equals(createdEphemeralNode)) {
331+
if (children.get(0).equals(createdNodeName)) {
284332
LOG.debug("[{}] First candidate is my lock, acquiring...", vmLockPrefix);
285-
if (!watchingParent) {
333+
if (!watchingBasePath) {
286334
throw new IllegalStateException(
287335
"Can not acquire lock, no longer watching parent : " + path);
288336
}
289337
this.lockWatcher = lw;
290-
this.lockNodeName = createdEphemeralNode;
338+
this.lockNodeName = createdNodeName;
291339
createdNodeName = null;
292340
lockWasAcquired = true;
293341
lw.acquiredLock();
294342
} else {
295343
LOG.debug("[{}] Lock held by another process with ephemeral node: {}", vmLockPrefix,
296344
children.get(0));
297345

298-
String lowestPrevNode = findLowestPrevPrefix(children, createdEphemeralNode);
346+
String lowestPrevNode = findLowestPrevPrefix(children, createdNodeName);
299347

300-
watchingNodeName = path + "/" + lowestPrevNode;
301-
final String nodeToWatch = watchingNodeName;
348+
watchingNodePath = path + "/" + lowestPrevNode;
349+
final String nodeToWatch = watchingNodePath;
302350
LOG.debug("[{}] Establishing watch on prior node {}", vmLockPrefix, nodeToWatch);
303351
Watcher priorNodeWatcher = new Watcher() {
304352
@Override
305353
public void process(WatchedEvent event) {
306-
if (LOG.isTraceEnabled()) {
307-
LOG.trace("[{}] Processing {}", vmLockPrefix, event);
354+
if (LOG.isDebugEnabled()) {
355+
LOG.debug("[{}] Processing {}", vmLockPrefix, event);
308356
}
309357
boolean renew = true;
310358
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(nodeToWatch)) {
@@ -313,10 +361,10 @@ public void process(WatchedEvent event) {
313361
synchronized (ServiceLock.this) {
314362
try {
315363
if (createdNodeName != null) {
316-
determineLockOwnership(createdEphemeralNode, lw);
364+
determineLockOwnership(lw);
317365
} else if (LOG.isDebugEnabled()) {
318366
LOG.debug("[{}] While waiting for another lock {}, {} was deleted; {}",
319-
vmLockPrefix, nodeToWatch, createdEphemeralNode, event);
367+
vmLockPrefix, nodeToWatch, createdNodeName, event);
320368
}
321369
} catch (Exception e) {
322370
if (lockNodeName == null) {
@@ -340,20 +388,24 @@ public void process(WatchedEvent event) {
340388
renew = false;
341389
}
342390
if (renew) {
343-
try {
344-
Stat restat = zooKeeper.exists(nodeToWatch, this);
345-
if (restat == null) {
346-
// if stat is null from the zookeeper.exists(path, Watcher) call, then we just
347-
// created a Watcher on a node that does not exist. Delete the watcher we just
348-
// created.
349-
zooKeeper.removeWatches(nodeToWatch, this, WatcherType.Any, true);
350-
determineLockOwnership(createdEphemeralNode, lw);
351-
} else {
352-
LOG.debug("[{}] Renewed watch on prior node {}", vmLockPrefix, nodeToWatch);
391+
synchronized (ServiceLock.this) {
392+
if (createdNodeName != null) {
393+
try {
394+
Stat restat = zooKeeper.exists(nodeToWatch, this);
395+
if (restat == null) {
396+
// if stat is null from the zookeeper.exists(path, Watcher) call, then we just
397+
// created a Watcher on a node that does not exist. Delete the watcher we just
398+
// created.
399+
zooKeeper.removeWatches(nodeToWatch, this, WatcherType.Any, true);
400+
determineLockOwnership(lw);
401+
} else {
402+
LOG.debug("[{}] Renewed watch on prior node {}", vmLockPrefix, nodeToWatch);
403+
}
404+
} catch (KeeperException | InterruptedException e) {
405+
lw.failedToAcquireLock(
406+
new Exception("Failed to renew watch on prior node: " + nodeToWatch, e));
407+
}
353408
}
354-
} catch (KeeperException | InterruptedException e) {
355-
lw.failedToAcquireLock(
356-
new Exception("Failed to renew watch on other manager node", e));
357409
}
358410
}
359411
}
@@ -365,7 +417,7 @@ public void process(WatchedEvent event) {
365417
// if stat is null from the zookeeper.exists(path, Watcher) call, then we just
366418
// created a Watcher on a node that does not exist. Delete the watcher we just created.
367419
zooKeeper.removeWatches(nodeToWatch, priorNodeWatcher, WatcherType.Any, true);
368-
determineLockOwnership(createdEphemeralNode, lw);
420+
determineLockOwnership(lw);
369421
}
370422
}
371423

@@ -507,7 +559,7 @@ public void process(WatchedEvent event) {
507559
createdNodeName = pathForWatcher.substring(path.toString().length() + 1);
508560

509561
// We have created a node, do we own the lock?
510-
determineLockOwnership(createdNodeName, lw);
562+
determineLockOwnership(lw);
511563

512564
} catch (KeeperException | InterruptedException e) {
513565
lw.failedToAcquireLock(e);
@@ -571,7 +623,7 @@ public synchronized void unlock() throws InterruptedException, KeeperException {
571623
* @return path of node that this lock is watching
572624
*/
573625
public synchronized String getWatching() {
574-
return watchingNodeName;
626+
return watchingNodePath;
575627
}
576628

577629
public synchronized String getLockPath() {
@@ -628,15 +680,15 @@ public synchronized void process(WatchedEvent event) {
628680
LOG.debug("{}", event);
629681
}
630682

631-
watchingParent = false;
683+
watchingBasePath = false;
632684

633685
if (event.getState() == KeeperState.Expired && lockNodeName != null) {
634686
lostLock(LockLossReason.SESSION_EXPIRED);
635687
} else {
636688

637689
try { // set the watch on the parent node again
638690
zooKeeper.exists(path.toString(), this);
639-
watchingParent = true;
691+
watchingBasePath = true;
640692
} catch (KeeperException.ConnectionLossException ex) {
641693
// we can't look at the lock because we aren't connected, but our session is still good
642694
LOG.warn("lost connection to zookeeper", ex);

test/src/main/java/org/apache/accumulo/test/lock/ServiceLockIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,7 @@ public void testTryLock() throws Exception {
653653

654654
// make sure still watching parent even though a lot of events occurred for the parent
655655
synchronized (zl) {
656-
Field field = zl.getClass().getDeclaredField("watchingParent");
656+
Field field = zl.getClass().getDeclaredField("watchingBasePath");
657657
field.setAccessible(true);
658658
assertTrue((Boolean) field.get(zl));
659659
}

0 commit comments

Comments
 (0)