Skip to content

Commit

Permalink
Commit Log Passivation (#76)
Browse files Browse the repository at this point in the history
* - improve resource allocation for commit log
- add passivation for commit log writer actors

* add test for commit log passivation
  • Loading branch information
Saverio Veltri authored Aug 27, 2019
1 parent 01818a3 commit 75eddbc
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 66 deletions.
2 changes: 1 addition & 1 deletion nsdb-cluster/src/main/resources/cluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ nsdb {
directory = "/tmp/nsdb"
// Size expressed in KB
max-size = 10000000
check-interval = 30 seconds
passivate-after = 1h
}

cluster{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import io.radicalbit.nsdb.actors.MetricPerformerActor
import io.radicalbit.nsdb.actors.MetricPerformerActor.PersistedBits
import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
import io.radicalbit.nsdb.commit_log.RollingCommitLogFileWriter
import io.radicalbit.nsdb.common.protocol.Coordinates
import io.radicalbit.nsdb.util.ActorPathLogging

import scala.collection.mutable
import scala.concurrent.Future

/**
Expand All @@ -38,19 +36,13 @@ import scala.concurrent.Future
*/
class CommitLogCoordinator extends ActorPathLogging {

private val commitLoggerWriters: mutable.Map[Coordinates, ActorRef] = mutable.Map.empty

private def getWriter(db: String, namespace: String, metric: String): ActorRef = {
commitLoggerWriters.getOrElse(
Coordinates(db, namespace, metric), {
val commitLogWriter =
context.actorOf(RollingCommitLogFileWriter.props(db, namespace, metric),
s"commit-log-writer-$db-$namespace-$metric")
commitLoggerWriters += (Coordinates(db, namespace, metric) -> commitLogWriter)
commitLogWriter
}
)
}
private def getWriter(db: String, namespace: String, metric: String): ActorRef =
context
.child(s"commit-log-writer-$db-$namespace-$metric")
.getOrElse(
context.actorOf(RollingCommitLogFileWriter.props(db, namespace, metric),
s"commit-log-writer-$db-$namespace-$metric")
)

implicit val timeout: Timeout = Timeout(
context.system.settings.config.getDuration("nsdb.write-coordinator.timeout", TimeUnit.SECONDS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ object MetadataSpec extends MultiNodeConfig {
| writer = "io.radicalbit.nsdb.commit_log.RollingCommitLogFileWriter"
| directory = "target/commitLog"
| max-size = 50000
| passivate-after = 5s
| }
|}
""".stripMargin))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ object ReplicatedMetadataCacheSpec extends MultiNodeConfig {
| writer = "io.radicalbit.nsdb.commit_log.RollingCommitLogFileWriter"
| directory = "target/commitLog"
| max-size = 50000
| passivate-after = 5s
| }
|}
""".stripMargin))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object ReplicatedSchemaCacheSpec extends MultiNodeConfig {
| writer = "io.radicalbit.nsdb.commit_log.RollingCommitLogFileWriter"
| directory = "target/commitLog"
| max-size = 50000
| passivate-after = 5s
| }
|}
""".stripMargin))
Expand Down
2 changes: 1 addition & 1 deletion nsdb-cluster/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ nsdb {
writer = "io.radicalbit.nsdb.commit_log.RollingCommitLogFileWriter"
directory = "target/commit-logs/"
max-size = 50000
check-interval = 30 seconds
passivate-after = 5s
}

cluster{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ object CommitLogFile {
r = inputStream.read(contents)
}

inputStream.close()
(pending.toList, closedEntries.toList)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ trait CommitLogWriterActor extends ActorPathLogging {

def receive: Receive = {

case _ @WriteToCommitLog(db, namespace, metric, timestamp, bitEntryAction: ReceivedEntryAction, location) =>
case WriteToCommitLog(db, namespace, metric, timestamp, bitEntryAction: ReceivedEntryAction, location) =>
createEntry(
ReceivedEntry(db,
namespace,
Expand All @@ -184,7 +184,7 @@ trait CommitLogWriterActor extends ActorPathLogging {
sender() ! WriteToCommitLogFailed(db, namespace, timestamp, metric, ex.getMessage)
}

case _ @WriteToCommitLog(db, namespace, metric, timestamp, bitEntryAction: AccumulatedEntryAction, location) =>
case WriteToCommitLog(db, namespace, metric, timestamp, bitEntryAction: AccumulatedEntryAction, location) =>
createEntry(
AccumulatedEntry(db,
namespace,
Expand All @@ -196,7 +196,7 @@ trait CommitLogWriterActor extends ActorPathLogging {
case Failure(ex) =>
sender() ! WriteToCommitLogFailed(db, namespace, timestamp, metric, ex.getMessage)
}
case _ @WriteToCommitLog(db, namespace, metric, timestamp, bitEntryAction: PersistedEntryAction, location) =>
case WriteToCommitLog(db, namespace, metric, timestamp, bitEntryAction: PersistedEntryAction, location) =>
createEntry(
PersistedEntry(db,
namespace,
Expand All @@ -208,7 +208,7 @@ trait CommitLogWriterActor extends ActorPathLogging {
case Failure(ex) =>
sender() ! WriteToCommitLogFailed(db, namespace, timestamp, metric, ex.getMessage)
}
case _ @WriteToCommitLog(db, namespace, metric, timestamp, bitEntryAction: RejectedEntryAction, location) =>
case WriteToCommitLog(db, namespace, metric, timestamp, bitEntryAction: RejectedEntryAction, location) =>
createEntry(
RejectedEntry(db,
namespace,
Expand All @@ -231,13 +231,13 @@ trait CommitLogWriterActor extends ActorPathLogging {
case Failure(ex) =>
sender() ! WriteToCommitLogFailed(db, namespace, timestamp, metric, ex.getMessage)
}
case _ @WriteToCommitLog(db, namespace, metric, timestamp, DeleteMetricAction, location) =>
case WriteToCommitLog(db, namespace, metric, timestamp, DeleteMetricAction, location) =>
createEntry(DeleteMetricEntry(db, namespace, metric, timestamp)) match {
case Success(_) => sender() ! WriteToCommitLogSucceeded(db, namespace, timestamp, metric, location)
case Failure(ex) =>
sender() ! WriteToCommitLogFailed(db, namespace, timestamp, metric, ex.getMessage)
}
case _ @WriteToCommitLog(db, namespace, _, timestamp, DeleteNamespaceAction, location) =>
case WriteToCommitLog(db, namespace, _, timestamp, DeleteNamespaceAction, location) =>
createEntry(DeleteNamespaceEntry(db, namespace, timestamp)) match {
case Success(_) => sender() ! WriteToCommitLogSucceeded(db, namespace, timestamp, "", location)
case Failure(ex) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RollingCommitLogFileChecker(db: String, namespace: String, metric: String)
implicit val serializer: CommitLogSerializer =
Class.forName(serializerClass).newInstance().asInstanceOf[CommitLogSerializer]

val pendingOutdatedEntries: mutable.Map[File, (ListBuffer[Int], ListBuffer[Int])] = mutable.Map.empty
val pendingOutdatedEntries: mutable.Map[String, (ListBuffer[Int], ListBuffer[Int])] = mutable.Map.empty

private def isOlder(fileName: String, actualFileName: String): Boolean = {
fileName.split(fileNameSeparator).toList.last.toInt < actualFileName.split(fileNameSeparator).toList.last.toInt
Expand All @@ -79,9 +79,9 @@ class RollingCommitLogFileChecker(db: String, namespace: String, metric: String)
val processedFile = new File(s"$directory/$fileName")
val (pendingEntries, closedEntries) = processedFile.checkPendingEntries

pendingOutdatedEntries.get(processedFile) match {
pendingOutdatedEntries.get(fileName) match {
case None =>
pendingOutdatedEntries += (processedFile -> (pendingEntries.to[ListBuffer], closedEntries.to[ListBuffer]))
pendingOutdatedEntries += (fileName -> (pendingEntries.to[ListBuffer], closedEntries.to[ListBuffer]))
case Some(_) =>
}

Expand All @@ -90,20 +90,20 @@ class RollingCommitLogFileChecker(db: String, namespace: String, metric: String)
pendingOutdatedEntries.foreach {
case (file, (pending, _)) =>
if (pending.toList.contains(closedEntry)) {
log.debug(s"removing entry: $closedEntry in file ${file.getName} processing file: $fileName")
pendingOutdatedEntries(file)._1 -= closedEntry
pendingOutdatedEntries(processedFile)._2 -= closedEntry
log.debug(s"removing entry: $closedEntry in file $fileName processing file: $fileName")
pendingOutdatedEntries(fileName)._1 -= closedEntry
pendingOutdatedEntries(fileName)._2 -= closedEntry
pending -= closedEntry
}
log.debug(s"pending entries for file: ${file.getName} are : ${pending.size}")
log.debug(s"pending entries for file: $fileName are : ${pending.size}")
}
}
})
pendingOutdatedEntries.foreach {
case (file, (pending, _)) if pending.isEmpty =>
log.debug(s"deleting file: ${file.getName}")
pendingOutdatedEntries -= file
file.delete()
case (fileName, (pending, _)) if pending.isEmpty =>
log.debug(s"deleting file: $fileName")
pendingOutdatedEntries -= fileName
new File(s"$directory/$fileName").delete()
case _ =>
}
case msg =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ package io.radicalbit.nsdb.commit_log

import java.io._
import java.nio.file.Paths
import java.util.concurrent.TimeUnit

import akka.actor.Props
import akka.actor.{PoisonPill, Props, ReceiveTimeout}
import com.typesafe.config.Config
import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
import io.radicalbit.nsdb.commit_log.RollingCommitLogFileChecker.CheckFiles
import io.radicalbit.nsdb.util.Config._

import scala.concurrent.duration.FiniteDuration
import scala.util.Try

object RollingCommitLogFileWriter {
Expand Down Expand Up @@ -81,16 +83,20 @@ class RollingCommitLogFileWriter(db: String, namespace: String, metric: String)

private val childName = s"commit-log-checker-$db-$namespace-$metric"

log.info("Initializing the commit log serializer {}...", serializerClass)
private var file: File = _
private var fileOS: FileOutputStream = _

override protected implicit val serializer: CommitLogSerializer =
Class.forName(serializerClass).newInstance().asInstanceOf[CommitLogSerializer]
log.info("Commit log serializer {} initialized successfully.", serializerClass)

private var file: File = _
private var fileOS: FileOutputStream = _
lazy val passivateAfter = FiniteDuration(
context.system.settings.config.getDuration("nsdb.commit-log.passivate-after").toNanos,
TimeUnit.NANOSECONDS)

override def preStart(): Unit = {
context.setReceiveTimeout(passivateAfter)

override def preStart(): Unit = {
log.info("Initializing the commit log serializer {}...", serializerClass)
val checker = context.actorOf(RollingCommitLogFileChecker.props(db, namespace, metric), childName)

new File(directory).mkdirs()
Expand All @@ -112,6 +118,12 @@ class RollingCommitLogFileWriter(db: String, namespace: String, metric: String)

checker ! CheckFiles(file)

log.info("Commit log serializer {} initialized successfully.", serializerClass)
}

override def postStop(): Unit = {
log.debug(s"closing input stream for file $file")
Option(fileOS).foreach(os => os.close())
}

override protected def createEntry(entry: CommitLogEntry): Try[Unit] = {
Expand All @@ -128,8 +140,6 @@ class RollingCommitLogFileWriter(db: String, namespace: String, metric: String)
operation
}

protected def close(): Unit = fileOS.close()

protected def appendToDisk(entry: CommitLogEntry): Unit = {
fileOS.write(serializer.serialize(entry))
fileOS.flush()
Expand All @@ -142,7 +152,7 @@ class RollingCommitLogFileWriter(db: String, namespace: String, metric: String)
val f = newFile(current)

context.child(childName).foreach {
log.debug(s"Sending commitlog check for actual file : ${f.getName}")
log.debug(s"Sending commit log check for actual file : ${f.getName}")
_ ! CheckFiles(f)
}

Expand All @@ -160,11 +170,13 @@ class RollingCommitLogFileWriter(db: String, namespace: String, metric: String)
protected def newOutputStream(file: File): FileOutputStream = new FileOutputStream(file, true)

override def receive: Receive = super.receive orElse {
case ReceiveTimeout =>
self ! PoisonPill
case ForceRolling =>
val f = newFile(file)
file = f
context.child(childName).foreach {
log.debug(s"Sending commitlog check for actual file : ${f.getName}")
log.debug(s"Sending commit log check for actual file : ${f.getName}")
_ ! CheckFiles(f)
}
fileOS.close()
Expand Down
2 changes: 1 addition & 1 deletion nsdb-core/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ nsdb{
writer = "io.radicalbit.nsdb.commit_log.RollingCommitLogFileWriter"
directory = "target/commit_log"
max-size = 50000
check-interval = 5 seconds
passivate-after = 6s
}

read-coordinator.timeout = 10 seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.file.Paths
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import io.radicalbit.nsdb.commit_log.CommitLogWriterActor.{RejectedEntryAction, WriteToCommitLog}
import io.radicalbit.nsdb.commit_log.RollingCommitLogFileWriter.ForceRolling
import io.radicalbit.nsdb.model.Location
Expand All @@ -40,9 +40,8 @@ class RollingCommitLogFileWriterSpec
private val prefix = RollingCommitLogFileWriter.fileNamePrefix
private val fileNameSeparator = RollingCommitLogFileWriter.fileNameSeparator

private val interval = FiniteDuration(
system.settings.config.getDuration("nsdb.commit-log.check-interval", TimeUnit.SECONDS),
TimeUnit.SECONDS)
lazy val passivateAfter =
FiniteDuration(system.settings.config.getDuration("nsdb.commit-log.passivate-after").toNanos, TimeUnit.NANOSECONDS)

before {
new File(directory).mkdirs()
Expand Down Expand Up @@ -83,15 +82,14 @@ class RollingCommitLogFileWriterSpec

system.actorOf(RollingCommitLogFileWriter.props(db, namespace, metric))

Thread.sleep(interval.toMillis + 1000)

val existingFiles = Option(Paths.get(directory).toFile.list())
.map(_.toSet)
.getOrElse(Set.empty)

existingFiles.size shouldBe 1
existingFiles shouldBe Set(secondFileName)
awaitAssert {
val existingFiles = Option(Paths.get(directory).toFile.list())
.map(_.toSet)
.getOrElse(Set.empty)

existingFiles.size shouldBe 1
existingFiles shouldBe Set(secondFileName)
}
}

"check and delete old files when they are not balanced" in {
Expand All @@ -110,20 +108,26 @@ class RollingCommitLogFileWriterSpec

val rolling = system.actorOf(RollingCommitLogFileWriter.props(db, namespace, metric))

Thread.sleep(interval.toMillis + 1000)

rolling ! WriteToCommitLog(db, namespace, metric, 1, RejectedEntryAction(bit1), Location(metric, "node", 0, 0))

rolling ! ForceRolling

Thread.sleep(interval.toMillis + 1000)
awaitAssert {
val existingFiles = Option(Paths.get(directory).toFile.list())
.map(_.toSet)
.getOrElse(Set.empty)

val existingFiles = Option(Paths.get(directory).toFile.list())
.map(_.toSet)
.getOrElse(Set.empty)
existingFiles.size shouldBe 1
new File(s"$directory/${existingFiles.head}").length() shouldBe 0
}
}

"passivate itself after a period of inactivity" in {
val rolling = system.actorOf(RollingCommitLogFileWriter.props(db, namespace, metric))

existingFiles.size shouldBe 1
new File(s"$directory/${existingFiles.head}").length() shouldBe 0
val probe = TestProbe()
probe.watch(rolling)
probe.expectTerminated(rolling, passivateAfter)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion nsdb-it/src/test/resources/minicluster.conf
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ nsdb {
writer = "io.radicalbit.nsdb.commit_log.RollingCommitLogFileWriter"
directory = "/tmp/"
max-size = 50000
check-interval = 30 seconds
passivate-after = 1h
}

cluster{
Expand Down

0 comments on commit 75eddbc

Please sign in to comment.