Skip to content

Commit 2c018fa

Browse files
authored
Merge pull request #564 from samspills/sam/559/fairness
Add a `fairness` parameter
2 parents 10d86a6 + 325172e commit 2c018fa

File tree

10 files changed

+615
-6
lines changed

10 files changed

+615
-6
lines changed

NOTICE

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,9 @@ Licensed under the MIT License (see LICENSE)
55
This software contains portions of code derived from http-client
66
https://github.com/snoyberg/http-client
77
Copyright (c) 2013 Michael Snoyman
8-
Licensed under MIT (see licenses/LICENSE_http-client)
8+
Licensed under MIT (see licenses/LICENSE_http-client)
9+
10+
This software contains portions of code derived from cats-effect
11+
https://github.com/typelevel/cats-effect
12+
Copyright (c) 2020-2023 The Typelevel Cats-effect Project Developers
13+
Licensed under Apache License 2.0 (see licenses/LICENSE_cats-effect)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2019 Typelevel
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package org.typelevel.keypool
23+
24+
/**
25+
* Fairness defines the order in which pending requests acquire a connection from the pool.
26+
*
27+
* Lifo will process requests in last-in-first-out order. Fifo will process requests in
28+
* first-in-first-out order.
29+
*/
30+
sealed trait Fairness extends Product with Serializable
31+
object Fairness {
32+
case object Lifo extends Fairness
33+
case object Fifo extends Fairness
34+
}

core/src/main/scala/org/typelevel/keypool/KeyPool.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ package org.typelevel.keypool
2424
import cats._
2525
import cats.effect.kernel._
2626
import cats.effect.kernel.syntax.spawn._
27-
import cats.effect.std.Semaphore
2827
import cats.syntax.all._
2928
import scala.concurrent.duration._
3029
import org.typelevel.keypool.internal._
@@ -67,7 +66,7 @@ object KeyPool {
6766
private[keypool] val kpMaxPerKey: A => Int,
6867
private[keypool] val kpMaxIdle: Int,
6968
private[keypool] val kpMaxTotal: Int,
70-
private[keypool] val kpMaxTotalSem: Semaphore[F],
69+
private[keypool] val kpMaxTotalSem: RequestSemaphore[F],
7170
private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]]
7271
) extends KeyPool[F, A, B] {
7372

@@ -322,6 +321,7 @@ object KeyPool {
322321
val kpMaxPerKey: A => Int,
323322
val kpMaxIdle: Int,
324323
val kpMaxTotal: Int,
324+
val fairness: Fairness,
325325
val onReaperException: Throwable => F[Unit]
326326
) {
327327
private def copy(
@@ -332,6 +332,7 @@ object KeyPool {
332332
kpMaxPerKey: A => Int = this.kpMaxPerKey,
333333
kpMaxIdle: Int = this.kpMaxIdle,
334334
kpMaxTotal: Int = this.kpMaxTotal,
335+
fairness: Fairness = this.fairness,
335336
onReaperException: Throwable => F[Unit] = this.onReaperException
336337
): Builder[F, A, B] = new Builder[F, A, B](
337338
kpRes,
@@ -341,6 +342,7 @@ object KeyPool {
341342
kpMaxPerKey,
342343
kpMaxIdle,
343344
kpMaxTotal,
345+
fairness,
344346
onReaperException
345347
)
346348

@@ -370,6 +372,9 @@ object KeyPool {
370372
def withMaxTotal(total: Int): Builder[F, A, B] =
371373
copy(kpMaxTotal = total)
372374

375+
def withFairness(fairness: Fairness): Builder[F, A, B] =
376+
copy(fairness = fairness)
377+
373378
def withOnReaperException(f: Throwable => F[Unit]): Builder[F, A, B] =
374379
copy(onReaperException = f)
375380

@@ -380,7 +385,7 @@ object KeyPool {
380385
kpVar <- Resource.make(
381386
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
382387
)(kpVar => KeyPool.destroy(kpVar))
383-
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong))
388+
kpMaxTotalSem <- Resource.eval(RequestSemaphore[F](fairness, kpMaxTotal))
384389
_ <- (idleTimeAllowedInPool, durationBetweenEvictionRuns) match {
385390
case (fdI: FiniteDuration, fdE: FiniteDuration) if fdE >= 0.seconds =>
386391
val idleNanos = 0.seconds.max(fdI)
@@ -414,6 +419,7 @@ object KeyPool {
414419
Defaults.maxPerKey,
415420
Defaults.maxIdle,
416421
Defaults.maxTotal,
422+
Defaults.fairness,
417423
Defaults.onReaperException[F]
418424
)
419425

@@ -430,6 +436,7 @@ object KeyPool {
430436
def maxPerKey[K](k: K): Int = Function.const(100)(k)
431437
val maxIdle = 100
432438
val maxTotal = 100
439+
val fairness = Fairness.Fifo
433440
def onReaperException[F[_]: Applicative] = { (t: Throwable) =>
434441
Function.const(Applicative[F].unit)(t)
435442
}

core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import cats._
2626
import cats.syntax.all._
2727
import cats.effect.kernel._
2828
import cats.effect.kernel.syntax.spawn._
29-
import cats.effect.std.Semaphore
3029
import scala.concurrent.duration._
30+
import org.typelevel.keypool.internal._
3131

3232
@deprecated("use KeyPool.Builder", "0.4.7")
3333
final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
@@ -91,7 +91,7 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private (
9191
kpVar <- Resource.make(
9292
Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]]))
9393
)(kpVar => KeyPool.destroy(kpVar))
94-
kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong))
94+
kpMaxTotalSem <- Resource.eval(RequestSemaphore[F](Fairness.Fifo, kpMaxTotal))
9595
_ <- idleTimeAllowedInPool match {
9696
case fd: FiniteDuration =>
9797
val nanos = 0.seconds.max(fd)

core/src/main/scala/org/typelevel/keypool/Pool.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ object Pool {
7777
val durationBetweenEvictionRuns: Duration,
7878
val kpMaxIdle: Int,
7979
val kpMaxTotal: Int,
80+
val fairness: Fairness,
8081
val onReaperException: Throwable => F[Unit]
8182
) {
8283
private def copy(
@@ -86,6 +87,7 @@ object Pool {
8687
durationBetweenEvictionRuns: Duration = this.durationBetweenEvictionRuns,
8788
kpMaxIdle: Int = this.kpMaxIdle,
8889
kpMaxTotal: Int = this.kpMaxTotal,
90+
fairness: Fairness = this.fairness,
8991
onReaperException: Throwable => F[Unit] = this.onReaperException
9092
): Builder[F, B] = new Builder[F, B](
9193
kpRes,
@@ -94,6 +96,7 @@ object Pool {
9496
durationBetweenEvictionRuns,
9597
kpMaxIdle,
9698
kpMaxTotal,
99+
fairness,
97100
onReaperException
98101
)
99102

@@ -120,6 +123,9 @@ object Pool {
120123
def withMaxTotal(total: Int): Builder[F, B] =
121124
copy(kpMaxTotal = total)
122125

126+
def withFairness(fairness: Fairness): Builder[F, B] =
127+
copy(fairness = fairness)
128+
123129
def withOnReaperException(f: Throwable => F[Unit]): Builder[F, B] =
124130
copy(onReaperException = f)
125131

@@ -132,6 +138,7 @@ object Pool {
132138
kpMaxPerKey = _ => kpMaxTotal,
133139
kpMaxIdle = kpMaxIdle,
134140
kpMaxTotal = kpMaxTotal,
141+
fairness = fairness,
135142
onReaperException = onReaperException
136143
)
137144

@@ -155,6 +162,7 @@ object Pool {
155162
Defaults.durationBetweenEvictionRuns,
156163
Defaults.maxIdle,
157164
Defaults.maxTotal,
165+
Defaults.fairness,
158166
Defaults.onReaperException[F]
159167
)
160168

@@ -170,6 +178,7 @@ object Pool {
170178
val durationBetweenEvictionRuns = 5.seconds
171179
val maxIdle = 100
172180
val maxTotal = 100
181+
val fairness = Fairness.Fifo
173182
def onReaperException[F[_]: Applicative] = { (t: Throwable) =>
174183
Function.const(Applicative[F].unit)(t)
175184
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright (c) 2019 Typelevel
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package org.typelevel.keypool.internal
23+
24+
import cats.syntax.all._
25+
import cats.effect.kernel.syntax.all._
26+
import cats.effect.kernel._
27+
import scala.collection.immutable.{Queue => ScalaQueue}
28+
import scala.annotation.nowarn
29+
30+
import org.typelevel.keypool.Fairness
31+
32+
/**
33+
* RequestSemaphore moderates access to pooled connections by setting the number of permits
34+
* available to the total number of connections. This is a custom semaphore implementation that only
35+
* provides the `permit` operation. Additionally it takes a [[Fairness]] parameter, used to toggle
36+
* the order in which requests acquire a permit.
37+
*
38+
* Derived from cats-effect MiniSemaphore
39+
* https://github.com/typelevel/cats-effect/blob/v3.5.4/kernel/shared/src/main/scala/cats/effect/kernel/MiniSemaphore.scala#L29
40+
*/
41+
private[keypool] abstract class RequestSemaphore[F[_]] {
42+
def permit: Resource[F, Unit]
43+
}
44+
45+
private[keypool] object RequestSemaphore {
46+
private trait BackingQueue[CC[_], A] {
47+
def cleanup(fa: CC[A], elem: A): CC[A]
48+
def offer(fa: CC[A], elem: A): CC[A]
49+
def take(fa: CC[A]): (CC[A], A)
50+
def nonEmpty(fa: CC[A]): Boolean
51+
}
52+
53+
private implicit def listQueue[A <: AnyRef]: BackingQueue[List, A] = new BackingQueue[List, A] {
54+
def cleanup(fa: List[A], elem: A) = fa.filterNot(_ eq elem)
55+
def offer(fa: List[A], elem: A) = elem :: fa
56+
def take(fa: List[A]) = (fa.tail, fa.head)
57+
def nonEmpty(fa: List[A]) = fa.nonEmpty
58+
}
59+
60+
private implicit def queueQueue[A <: AnyRef]: BackingQueue[ScalaQueue, A] =
61+
new BackingQueue[ScalaQueue, A] {
62+
def cleanup(fa: ScalaQueue[A], elem: A) = fa.filterNot(_ eq elem)
63+
def offer(fa: ScalaQueue[A], elem: A) = fa :+ elem
64+
def take(fa: ScalaQueue[A]) = (fa.tail, fa.head)
65+
def nonEmpty(fa: ScalaQueue[A]) = fa.nonEmpty
66+
}
67+
68+
private case class State[CC[_], A](waiting: CC[A], permits: Int)(implicit
69+
@nowarn ev: BackingQueue[CC, A]
70+
)
71+
72+
def apply[F[_]](fairness: Fairness, numPermits: Int)(implicit
73+
F: Concurrent[F]
74+
): F[RequestSemaphore[F]] = {
75+
76+
def require(condition: Boolean, errorMessage: => String): F[Unit] =
77+
if (condition) F.unit else new IllegalArgumentException(errorMessage).raiseError[F, Unit]
78+
79+
require(numPermits >= 0, s"numPermits must be nonnegative, was: $numPermits") *> {
80+
fairness match {
81+
case Fairness.Fifo =>
82+
F.ref(State(ScalaQueue.empty[Deferred[F, Unit]], numPermits)).map(semaphore(_))
83+
case Fairness.Lifo =>
84+
F.ref(State(List.empty[Deferred[F, Unit]], numPermits)).map(semaphore(_))
85+
}
86+
}
87+
}
88+
89+
private def semaphore[F[_], CC[_]](
90+
state: Ref[F, State[CC, Deferred[F, Unit]]]
91+
)(implicit
92+
F: GenConcurrent[F, _],
93+
B: BackingQueue[CC, Deferred[F, Unit]]
94+
): RequestSemaphore[F] = {
95+
new RequestSemaphore[F] {
96+
private def acquire: F[Unit] =
97+
F.deferred[Unit].flatMap { wait =>
98+
val cleanup = state.update { case s @ State(waiting, permits) =>
99+
if (B.nonEmpty(waiting))
100+
State(B.cleanup(waiting, wait), permits)
101+
else s
102+
}
103+
104+
state.flatModifyFull { case (poll, State(waiting, permits)) =>
105+
if (permits == 0) {
106+
State(B.offer(waiting, wait), permits) -> poll(wait.get).onCancel(cleanup)
107+
} else
108+
State(waiting, permits - 1) -> F.unit
109+
}
110+
}
111+
112+
private def release: F[Unit] =
113+
state.flatModify { case State(waiting, permits) =>
114+
if (B.nonEmpty(waiting)) {
115+
val (rest, next) = B.take(waiting)
116+
State(rest, permits) -> next.complete(()).void
117+
} else
118+
State(waiting, permits + 1) -> F.unit
119+
}
120+
121+
def permit: Resource[F, Unit] =
122+
Resource.makeFull((poll: Poll[F]) => poll(acquire))(_ => release)
123+
}
124+
}
125+
}

core/src/test/scala/org/typelevel/keypool/KeyPoolSpec.scala

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,65 @@ class KeyPoolSpec extends CatsEffectSuite {
228228
}
229229
}
230230

231+
test("requests served in FIFO order by default") {
232+
TestControl.executeEmbed {
233+
KeyPool
234+
.Builder(
235+
(i: Int) => Ref.of[IO, Int](i),
236+
nothing
237+
)
238+
.withMaxTotal(1)
239+
.build
240+
.use { pool =>
241+
for {
242+
ref <- IO.ref(List.empty[Int])
243+
f1 <- reqAction(pool, ref, 1).start <* IO.sleep(1.milli)
244+
f2 <- reqAction(pool, ref, 2).start <* IO.sleep(1.milli)
245+
f3 <- reqAction(pool, ref, 3).start <* IO.sleep(1.milli)
246+
f4 <- reqAction(pool, ref, 4).start <* IO.sleep(1.milli)
247+
_ <- f1.cancel
248+
_ <- f2.join *> f3.join *> f4.join
249+
order <- ref.get
250+
} yield assertEquals(order, List(1, 2, 3, 4))
251+
}
252+
}
253+
}
254+
255+
test("requests served in LIFO order if fairness is false") {
256+
TestControl.executeEmbed {
257+
KeyPool
258+
.Builder(
259+
(i: Int) => Ref.of[IO, Int](i),
260+
nothing
261+
)
262+
.withMaxTotal(1)
263+
.withFairness(Fairness.Lifo)
264+
.build
265+
.use { pool =>
266+
for {
267+
ref <- IO.ref(List.empty[Int])
268+
f1 <- reqAction(pool, ref, 1).start <* IO.sleep(1.milli)
269+
f2 <- reqAction(pool, ref, 2).start <* IO.sleep(1.milli)
270+
f3 <- reqAction(pool, ref, 3).start <* IO.sleep(1.milli)
271+
f4 <- reqAction(pool, ref, 4).start <* IO.sleep(1.milli)
272+
_ <- f1.cancel
273+
_ <- f2.join *> f3.join *> f4.join
274+
order <- ref.get
275+
} yield assertEquals(order, List(1, 4, 3, 2))
276+
}
277+
}
278+
}
279+
280+
private def reqAction(
281+
pool: KeyPool[IO, Int, Ref[IO, Int]],
282+
ref: Ref[IO, List[Int]],
283+
id: Int
284+
) =
285+
if (id == 1)
286+
pool.take(1).use(_ => ref.update(l => l :+ id) *> IO.never)
287+
else
288+
pool.take(1).use(_ => ref.update(l => l :+ id))
289+
231290
private def nothing(ref: Ref[IO, Int]): IO[Unit] =
232291
ref.get.void
233292

0 commit comments

Comments
 (0)