Skip to content

Commit 063a35c

Browse files
committed
Fix close race condition.
1 parent 44a60e1 commit 063a35c

File tree

1 file changed

+52
-35
lines changed

1 file changed

+52
-35
lines changed

periodic-jdk/src/main/scala/ca/dvgi/periodic/jdk/JdkAutoUpdatingVar.scala

+52-35
Original file line numberDiff line numberDiff line change
@@ -66,45 +66,53 @@ class JdkAutoUpdatingVar[T](
6666
override def latest: T = variable.getOrElse(throw UnreadyAutoUpdatingVarException)
6767

6868
override def close(): Unit = {
69-
nextTask.cancel(true)
69+
CloseLock.synchronized {
70+
closed = true
71+
nextTask.foreach(_.cancel(true))
72+
}
7073
if (executorOverride.isEmpty)
7174
executor.shutdownNow()
7275
super.close()
7376
}
7477

78+
private case object CloseLock
79+
80+
@volatile private var closed = false
81+
7582
@volatile private var variable: Option[T] = None
7683

7784
private val _ready = Promise[Unit]()
7885

79-
@volatile private var nextTask: ScheduledFuture[_] =
80-
executor.schedule(
81-
new Runnable {
82-
def run(): Unit = {
83-
val tryV =
84-
Try(try {
85-
try {
86-
updateVar
87-
} catch {
88-
case NonFatal(e) =>
89-
log.error(logString("Failed to initialize var"), e)
90-
throw e
91-
}
92-
} catch (handleInitializationError))
93-
94-
tryV match {
95-
case Success(value) =>
96-
variable = Some(value)
97-
_ready.complete(Success(()))
98-
log.info(logString("Successfully initialized"))
99-
scheduleUpdate(updateInterval.duration(value))
100-
case Failure(e) =>
101-
_ready.complete(Failure(e))
102-
}
86+
@volatile private var nextTask: Option[ScheduledFuture[_]] = None
87+
88+
executor.schedule(
89+
new Runnable {
90+
def run(): Unit = {
91+
val tryV =
92+
Try(try {
93+
try {
94+
updateVar
95+
} catch {
96+
case NonFatal(e) =>
97+
log.error(logString("Failed to initialize var"), e)
98+
throw e
99+
}
100+
} catch (handleInitializationError))
101+
102+
tryV match {
103+
case Success(value) =>
104+
variable = Some(value)
105+
_ready.complete(Success(()))
106+
log.info(logString("Successfully initialized"))
107+
scheduleUpdate(updateInterval.duration(value))
108+
case Failure(e) =>
109+
_ready.complete(Failure(e))
103110
}
104-
},
105-
0,
106-
TimeUnit.NANOSECONDS
107-
)
111+
}
112+
},
113+
0,
114+
TimeUnit.NANOSECONDS
115+
)
108116

109117
blockUntilReadyTimeout.foreach { timeout =>
110118
Await.result(ready, timeout)
@@ -113,7 +121,10 @@ class JdkAutoUpdatingVar[T](
113121
private def scheduleUpdate(nextUpdate: FiniteDuration): Unit = {
114122
log.info(logString(s"Scheduling update of var in: $nextUpdate"))
115123

116-
nextTask = executor.schedule(new UpdateVar(1), nextUpdate.length, nextUpdate.unit)
124+
CloseLock.synchronized {
125+
if (!closed)
126+
nextTask = Some(executor.schedule(new UpdateVar(1), nextUpdate.length, nextUpdate.unit))
127+
}
117128
()
118129
}
119130

@@ -144,11 +155,17 @@ class JdkAutoUpdatingVar[T](
144155
logString(s"Unhandled exception when trying to update var, retrying in $delay"),
145156
e
146157
)
147-
executor.schedule(
148-
new UpdateVar(attempt + 1),
149-
delay.length,
150-
delay.unit
151-
)
158+
159+
CloseLock.synchronized {
160+
if (!closed)
161+
nextTask = Some(
162+
executor.schedule(
163+
new UpdateVar(attempt + 1),
164+
delay.length,
165+
delay.unit
166+
)
167+
)
168+
}
152169
()
153170
}
154171
}

0 commit comments

Comments
 (0)