Skip to content

Commit

Permalink
Merge pull request #278 from Kazark/ce3
Browse files Browse the repository at this point in the history
feat: upgrade to Cats Effect 3
  • Loading branch information
ChristopherDavenport authored Aug 2, 2021
2 parents 9b9adfa + 211a94d commit 3c98e90
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 200 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ lazy val reload = project.in(file("modules/reload"))
)

val catsV = "2.6.1"
val catsEffectV = "2.5.1"
val catsEffectV = "3.1.0"
val catsCollectionV = "0.9.2"

val munitV = "0.7.25"
Expand All @@ -96,12 +96,12 @@ lazy val commonSettings = Seq(
libraryDependencies ++= Seq(
"org.typelevel" %% "cats-core" % catsV,
"org.typelevel" %% "cats-effect" % catsEffectV,
"io.chrisdavenport" %% "mapref" % "0.1.1",
"io.chrisdavenport" %% "mapref" % "0.2.0-M1",

"org.typelevel" %% "cats-effect-laws" % catsEffectV % Test,
"org.scalameta" %% "munit" % munitV % Test,
"org.scalameta" %% "munit-scalacheck" % munitV % Test,
"org.typelevel" %% "munit-cats-effect-2" % munitCEV % Test,
"org.typelevel" %% "munit-cats-effect-3" % munitCEV % Test,
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.openjdk.jmh.annotations._

import cats.implicits._
import cats.effect._
import cats.effect.unsafe.IORuntime
import io.chrisdavenport.mules.caffeine.CaffeineCache


Expand All @@ -14,37 +15,37 @@ class LookUpBench {
import LookUpBench._

@Benchmark
def contentionSingleImmutableMap(in: BenchStateRef) =
testUnderContention(in.memoryCache, in.readList, in.writeList)(in.CS)
def contentionSingleImmutableMap(in: BenchStateRef) =
testUnderContention(in.memoryCache, in.readList, in.writeList)(in.R)

@Benchmark
def contentionConcurrentHashMap(in: BenchStateCHM) =
testUnderContention(in.memoryCache, in.readList, in.writeList)(in.CS)
testUnderContention(in.memoryCache, in.readList, in.writeList)(in.R)

@Benchmark
def contentionCaffeine(in: BenchStateCaffeine) =
testUnderContention(in.cache, in.readList, in.writeList)(in.CS)
testUnderContention(in.cache, in.readList, in.writeList)(in.R)

def testUnderContention(m: Cache[IO, Int, String], r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = {
def testUnderContention(m: Cache[IO, Int, String], r: List[Int], w: List[Int])(implicit R: IORuntime) = {
val set = w.traverse( m.insert(_, "foo"))
val read = r.traverse(m.lookup(_))
val action = (set, read).parMapN((_, _) => ())
action.unsafeRunSync()
}

@Benchmark
def contentionReadsSingleImmutableMap(in: BenchStateRef) =
underContentionWaitReads(in.memoryCache, in.readList, in.writeList)(in.CS)
def contentionReadsSingleImmutableMap(in: BenchStateRef) =
underContentionWaitReads(in.memoryCache, in.readList, in.writeList)(in.R)

@Benchmark
def contentionReadsConcurrentHashMap(in: BenchStateCHM) =
underContentionWaitReads(in.memoryCache, in.readList, in.writeList)(in.CS)
def contentionReadsConcurrentHashMap(in: BenchStateCHM) =
underContentionWaitReads(in.memoryCache, in.readList, in.writeList)(in.R)

@Benchmark
def contentionReadsCaffeine(in: BenchStateCaffeine) =
underContentionWaitReads(in.cache, in.readList, in.writeList)(in.CS)
underContentionWaitReads(in.cache, in.readList, in.writeList)(in.R)

def underContentionWaitReads(m: Cache[IO, Int, String], r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = {
def underContentionWaitReads(m: Cache[IO, Int, String], r: List[Int], w: List[Int])(implicit R: IORuntime) = {
val set = w.traverse(m.insert(_, "foo"))
val read = r.traverse(m.lookup(_))
Concurrent[IO].bracket(set.start)(
Expand All @@ -53,18 +54,18 @@ class LookUpBench {
}

@Benchmark
def contentionWritesSingleImmutableMap(in: BenchStateRef) =
underContentionWaitWrites(in.memoryCache, in.readList, in.writeList)(in.CS)
def contentionWritesSingleImmutableMap(in: BenchStateRef) =
underContentionWaitWrites(in.memoryCache, in.readList, in.writeList)(in.R)

@Benchmark
def contentionWritesConcurrentHashMap(in: BenchStateCHM) =
underContentionWaitWrites(in.memoryCache, in.readList, in.writeList)(in.CS)
def contentionWritesConcurrentHashMap(in: BenchStateCHM) =
underContentionWaitWrites(in.memoryCache, in.readList, in.writeList)(in.R)

@Benchmark
def contentionWritesCaffeine(in: BenchStateCaffeine) =
underContentionWaitWrites(in.cache, in.readList, in.writeList)(in.CS)
def contentionWritesCaffeine(in: BenchStateCaffeine) =
underContentionWaitWrites(in.cache, in.readList, in.writeList)(in.R)

def underContentionWaitWrites(m: Cache[IO, Int, String],r: List[Int], w: List[Int])(implicit CS: ContextShift[IO]) = {
def underContentionWaitWrites(m: Cache[IO, Int, String],r: List[Int], w: List[Int])(implicit R: IORuntime) = {
val set = w.traverse( m.insert(_, "foo"))
val read = r.traverse(m.lookup(_))
Concurrent[IO].bracket(read.start)(
Expand All @@ -80,8 +81,7 @@ object LookUpBench {
var memoryCache: MemoryCache[IO, Int, String] = _
val writeList: List[Int] = (1 to 100).toList
val readList : List[Int] = (1 to 100).toList
implicit val T = IO.timer(scala.concurrent.ExecutionContext.global)
implicit val CS = IO.contextShift(scala.concurrent.ExecutionContext.global)
implicit val R = IORuntime.global

@Setup(Level.Trial)
def setup(): Unit = {
Expand All @@ -94,8 +94,7 @@ object LookUpBench {
var memoryCache: MemoryCache[IO, Int, String] = _
val writeList: List[Int] = (1 to 100).toList
val readList : List[Int] = (1 to 100).toList
implicit val T = IO.timer(scala.concurrent.ExecutionContext.global)
implicit val CS = IO.contextShift(scala.concurrent.ExecutionContext.global)
implicit val R = IORuntime.global

@Setup(Level.Trial)
def setup(): Unit = {
Expand All @@ -110,13 +109,12 @@ object LookUpBench {
var cache: Cache[IO, Int, String] = _
val writeList: List[Int] = (1 to 100).toList
val readList : List[Int] = (1 to 100).toList
implicit val T = IO.timer(scala.concurrent.ExecutionContext.global)
implicit val CS = IO.contextShift(scala.concurrent.ExecutionContext.global)
implicit val R = IORuntime.global

@Setup(Level.Trial)
def setup(): Unit = {
cache = CaffeineCache.build[IO, Int, String](None, None, None).unsafeRunSync()
cache.insert(1, "yellow").unsafeRunSync()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class CaffeineCacheSpec extends CatsEffectSuite {
for {
cache <- CaffeineCache.build[IO, String, Int](Some(TimeSpec.unsafeFromDuration(1.second)), None, None)
_ <- cache.insert("Foo", 1)
_ <- Timer[IO].sleep(1.milli)
_ <- Temporal[IO].sleep(1.milli)
value <- cache.lookup("Foo")
} yield {
assertEquals(value, Some(1))
Expand All @@ -34,7 +34,7 @@ class CaffeineCacheSpec extends CatsEffectSuite {
for {
cache <- CaffeineCache.build[IO, String, Int](Some(TimeSpec.unsafeFromDuration(1.second)), None, None)
_ <- cache.insert("Foo", 1)
_ <- Timer[IO].sleep(2.second)
_ <- Temporal[IO].sleep(2.second)
value <- cache.lookup("Foo")
} yield {
assertEquals(value, None)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package io.chrisdavenport.mules

import cats.effect._
import cats.effect.concurrent._
import cats.effect.implicits._
import cats.implicits._
import scala.concurrent.duration._
import scala.collection.immutable.Map

import io.chrisdavenport.mapref.MapRef
Expand Down Expand Up @@ -37,19 +35,19 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
private val purgeExpiredEntries: Long => F[List[K]] =
purgeExpiredEntriesOpt.getOrElse(purgeExpiredEntriesDefault)

private val emptyFV = F.pure(Option.empty[TryableDeferred[F, Either[Throwable, V]]])
private val emptyFV = F.pure(Option.empty[Deferred[F, Either[Throwable, V]]])

private val createEmptyIfUnset: K => F[Option[TryableDeferred[F, Either[Throwable, V]]]] =
k => Deferred.tryable[F, Either[Throwable, V]].flatMap{deferred =>
C.monotonic(NANOSECONDS).flatMap{ now =>
val timeout = defaultExpiration.map(ts => TimeSpec.unsafeFromNanos(now + ts.nanos))
private val createEmptyIfUnset: K => F[Option[Deferred[F, Either[Throwable, V]]]] =
k => Deferred[F, Either[Throwable, V]].flatMap{deferred =>
C.monotonic.flatMap{ now =>
val timeout = defaultExpiration.map(ts => TimeSpec.unsafeFromNanos(now.toNanos + ts.nanos))
mapRef(k).modify{
case None => (DispatchOneCacheItem[F, V](deferred, timeout).some, deferred.some)
case s@Some(_) => (s, None)
}}
}

private val updateIfFailedThenCreate: (K, DispatchOneCacheItem[F, V]) => F[Option[TryableDeferred[F, Either[Throwable, V]]]] =
private val updateIfFailedThenCreate: (K, DispatchOneCacheItem[F, V]) => F[Option[Deferred[F, Either[Throwable, V]]]] =
(k, cacheItem) => cacheItem.item.tryGet.flatMap{
case Some(Left(_)) =>
mapRef(k).modify{
Expand All @@ -72,8 +70,8 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
maybeDeferred.bracketCase(_.traverse_{ deferred =>
action(k).attempt.flatMap(e => deferred.complete(e).attempt.void)
}){
case (Some(deferred), ExitCase.Canceled) => deferred.complete(CancelationDuringDispatchOneCacheInsertProcessing.asLeft).attempt.void
case (Some(deferred), ExitCase.Error(e)) => deferred.complete(e.asLeft).attempt.void
case (Some(deferred), Outcome.Canceled()) => deferred.complete(CancelationDuringDispatchOneCacheInsertProcessing.asLeft).attempt.void
case (Some(deferred), Outcome.Errored(e)) => deferred.complete(e.asLeft).attempt.void
case _ => F.unit
}
}
Expand All @@ -84,11 +82,11 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
* gets the value in the system
**/
def lookupOrLoad(k: K, action: K => F[V]): F[V] = {
C.monotonic(NANOSECONDS)
C.monotonic
.flatMap{now =>
mapRef(k).modify[Option[DispatchOneCacheItem[F, V]]]{
case s@Some(value) =>
if (DispatchOneCache.isExpired(now, value)){
if (DispatchOneCache.isExpired(now.toNanos, value)){
(None, None)
} else {
(s, s)
Expand All @@ -108,22 +106,22 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (

def insertWith(k: K, action: K => F[V]): F[Unit] = {
for {
defer <- Deferred.tryable[F, Either[Throwable, V]]
now <- Clock[F].monotonic(NANOSECONDS)
item = DispatchOneCacheItem(defer, defaultExpiration.map(spec => TimeSpec.unsafeFromNanos(now + spec.nanos))).some
defer <- Deferred[F, Either[Throwable, V]]
now <- Clock[F].monotonic
item = DispatchOneCacheItem(defer, defaultExpiration.map(spec => TimeSpec.unsafeFromNanos(now.toNanos + spec.nanos))).some
out <- mapRef(k).getAndSet(item)
.bracketCase{oldDeferOpt =>
action(k).flatMap[Unit]{ a =>
val set = a.asRight
oldDeferOpt.traverse_(oldDefer => oldDefer.item.complete(set)).attempt >>
defer.complete(set)
defer.complete(set).void
}
}{
case (_, ExitCase.Completed) => F.unit
case (oldItem, ExitCase.Canceled) =>
case (_, Outcome.Succeeded(_)) => F.unit
case (oldItem, Outcome.Canceled()) =>
val set = CancelationDuringDispatchOneCacheInsertProcessing.asLeft
oldItem.traverse_(_.item.complete(set)).attempt >> defer.complete(set).attempt.void
case (oldItem, ExitCase.Error(e)) =>
case (oldItem, Outcome.Errored(e)) =>
val set = e.asLeft
oldItem.traverse_(_.item.complete(set)).attempt >> defer.complete(set).attempt.void
}
Expand All @@ -134,11 +132,11 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
* Overrides any background insert
**/
def insert(k: K, v: V): F[Unit] = for {
defered <- Deferred.tryable[F, Either[Throwable, V]]
defered <- Deferred[F, Either[Throwable, V]]
setAs = v.asRight
_ <- defered.complete(setAs)
now <- C.monotonic(NANOSECONDS)
item = DispatchOneCacheItem(defered, defaultExpiration.map(spec => TimeSpec.unsafeFromNanos(now + spec.nanos))).some
now <- C.monotonic
item = DispatchOneCacheItem(defered, defaultExpiration.map(spec => TimeSpec.unsafeFromNanos(now.toNanos + spec.nanos))).some
action <- mapRef(k).modify{
case None =>
(item, F.unit)
Expand All @@ -153,11 +151,11 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
* Overrides any background insert
**/
def insertWithTimeout(optionTimeout: Option[TimeSpec])(k: K, v: V): F[Unit] = for {
defered <- Deferred.tryable[F, Either[Throwable, V]]
defered <- Deferred[F, Either[Throwable, V]]
setAs = v.asRight
_ <- defered.complete(setAs)
now <- C.monotonic(NANOSECONDS)
item = DispatchOneCacheItem(defered, optionTimeout.map(spec => TimeSpec.unsafeFromNanos(now + spec.nanos))).some
now <- C.monotonic
item = DispatchOneCacheItem(defered, optionTimeout.map(spec => TimeSpec.unsafeFromNanos(now.toNanos + spec.nanos))).some
action <- mapRef(k).modify{
case None =>
(item, F.unit)
Expand All @@ -168,11 +166,11 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
} yield out

def lookup(k: K): F[Option[V]] = {
C.monotonic(NANOSECONDS)
C.monotonic
.flatMap{now =>
mapRef(k).modify[Option[DispatchOneCacheItem[F, V]]]{
case s@Some(value) =>
if (DispatchOneCache.isExpired(now, value)){
if (DispatchOneCache.isExpired(now.toNanos, value)){
(None, None)
} else {
(s, s)
Expand Down Expand Up @@ -208,16 +206,16 @@ final class DispatchOneCache[F[_], K, V] private[DispatchOneCache] (
**/
def purgeExpired: F[Unit] = {
for {
now <- C.monotonic(NANOSECONDS)
_ <- purgeExpiredEntries(now)
now <- C.monotonic
_ <- purgeExpiredEntries(now.toNanos)
} yield ()
}

}

object DispatchOneCache {
private case class DispatchOneCacheItem[F[_], A](
item: TryableDeferred[F, Either[Throwable, A]],
item: Deferred[F, Either[Throwable, A]],
itemExpiration: Option[TimeSpec]
)
private case object CancelationDuringDispatchOneCacheInsertProcessing extends scala.util.control.NoStackTrace
Expand All @@ -231,13 +229,13 @@ object DispatchOneCache {
*
* @return an `Resource[F, Unit]` that will keep removing expired entries in the background.
**/
def liftToAuto[F[_]: Concurrent: Timer, K, V](
def liftToAuto[F[_]: Temporal, K, V](
DispatchOneCache: DispatchOneCache[F, K, V],
checkOnExpirationsEvery: TimeSpec
): Resource[F, Unit] = {
def runExpiration(cache: DispatchOneCache[F, K, V]): F[Unit] = {
val check = TimeSpec.toDuration(checkOnExpirationsEvery)
Timer[F].sleep(check) >> cache.purgeExpired >> runExpiration(cache)
Temporal[F].sleep(check) >> cache.purgeExpired >> runExpiration(cache)
}

Resource.make(runExpiration(DispatchOneCache).start)(_.cancel).void
Expand All @@ -248,7 +246,7 @@ object DispatchOneCache {
*
* If the specified default expiration value is None, items inserted by insert will never expire.
**/
def ofSingleImmutableMap[F[_]: Concurrent: Clock, K, V](
def ofSingleImmutableMap[F[_]: Async, K, V](
defaultExpiration: Option[TimeSpec]
): F[DispatchOneCache[F, K, V]] =
Ref.of[F, Map[K, DispatchOneCacheItem[F, V]]](Map.empty[K, DispatchOneCacheItem[F, V]])
Expand All @@ -258,7 +256,7 @@ object DispatchOneCache {
defaultExpiration
))

def ofShardedImmutableMap[F[_]: Concurrent : Clock, K, V](
def ofShardedImmutableMap[F[_]: Async, K, V](
shardCount: Int,
defaultExpiration: Option[TimeSpec]
): F[DispatchOneCache[F, K, V]] =
Expand All @@ -270,7 +268,7 @@ object DispatchOneCache {
)
}

def ofConcurrentHashMap[F[_]: Concurrent: Clock, K, V](
def ofConcurrentHashMap[F[_]: Async, K, V](
defaultExpiration: Option[TimeSpec],
initialCapacity: Int = 16,
loadFactor: Float = 0.75f,
Expand Down
Loading

0 comments on commit 3c98e90

Please sign in to comment.