Storage Strategy (#117)
* introduce storage strategy

* finalize storage strategy

* make concrete constructors for file system privates

* remove obsolete index

* add storage strategy in production config file
Saverio Veltri authored Feb 6, 2020
1 parent 488a92e commit e4cc73e
Showing 22 changed files with 188 additions and 317 deletions.
2 changes: 2 additions & 0 deletions nsdb-cluster/src/main/resources/nsdb.conf
@@ -42,6 +42,8 @@ nsdb {
index-path = ${nsdb.storage.base-path}"/index"
commit-log-path = ${nsdb.storage.base-path}"/cl"
metadata-path = ${nsdb.storage.base-path}"/metadata"
strategy = "memory"
strategy = ${?STORAGE_STRATEGY}
}

commit-log {
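The duplicated `strategy` key uses HOCON's optional substitution: the first line sets the default to "memory", and `${?STORAGE_STRATEGY}` overrides it only when that environment variable is defined at load time. A minimal sketch of reading the resolved value with Typesafe Config (the standalone loading here is illustrative; the actors below read it from the actor system's settings instead):

import com.typesafe.config.ConfigFactory

// Resolves to "memory" unless the STORAGE_STRATEGY environment variable is set.
val config   = ConfigFactory.load()
val strategy = config.getString("nsdb.storage.strategy")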
MetadataCoordinator.scala
@@ -38,10 +38,11 @@ import io.radicalbit.nsdb.cluster.createNodeName
import io.radicalbit.nsdb.cluster.logic.CapacityWriteNodesSelectionLogic
import io.radicalbit.nsdb.cluster.util.ErrorManagementUtils._
import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
import io.radicalbit.nsdb.common.configuration.NSDbConfig
import io.radicalbit.nsdb.common.model.MetricInfo
import io.radicalbit.nsdb.common.protocol.{Coordinates, NSDbSerializable}
import io.radicalbit.nsdb.common.statement._
import io.radicalbit.nsdb.index.DirectorySupport
import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
import io.radicalbit.nsdb.model.{Location, Schema}
import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
@@ -69,6 +70,9 @@ class MetadataCoordinator(clusterListener: ActorRef, metadataCache: ActorRef, sc
Timeout(config.getDuration("nsdb.metadata-coordinator.timeout", TimeUnit.SECONDS), TimeUnit.SECONDS)
import context.dispatcher

override lazy val indexStorageStrategy: StorageStrategy =
StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))

lazy val defaultShardingInterval: Long =
config.getDuration("nsdb.sharding.interval").toMillis

SchemaCoordinator.scala
@@ -23,8 +23,9 @@ import akka.pattern.{ask, pipe}
import akka.util.Timeout
import io.radicalbit.nsdb.cluster.coordinator.SchemaCoordinator.commands.DeleteNamespaceSchema
import io.radicalbit.nsdb.cluster.coordinator.SchemaCoordinator.events.NamespaceSchemaDeleted
import io.radicalbit.nsdb.common.configuration.NSDbConfig
import io.radicalbit.nsdb.common.protocol.{Coordinates, NSDbSerializable}
import io.radicalbit.nsdb.index.DirectorySupport
import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
import io.radicalbit.nsdb.model.Schema
import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
@@ -45,6 +46,9 @@ class SchemaCoordinator(schemaCache: ActorRef) extends ActorPathLogging with Sta
TimeUnit.SECONDS)
import context.dispatcher

override lazy val indexStorageStrategy: StorageStrategy =
StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))

/**
* Checks if a newSchema is compatible with an oldSchema. If schemas are compatible, the metric schema will be updated.
* @param namespace schema's namespace.
WriteCoordinator.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, Props, Stash}
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
import akka.util.Timeout
import io.radicalbit.nsdb.cluster.NsdbPerfLogger
import io.radicalbit.nsdb.cluster.PubSubTopics.{COORDINATORS_TOPIC, NODE_GUARDIANS_TOPIC}
import io.radicalbit.nsdb.cluster.actor.MetricsDataActor.{
AddRecordToLocation,
@@ -31,11 +32,11 @@ import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.{GetL
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events.LocationsGot
import io.radicalbit.nsdb.cluster.coordinator.WriteCoordinator._
import io.radicalbit.nsdb.cluster.util.ErrorManagementUtils._
import io.radicalbit.nsdb.cluster.NsdbPerfLogger
import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
import io.radicalbit.nsdb.common.configuration.NSDbConfig
import io.radicalbit.nsdb.common.protocol.Bit
import io.radicalbit.nsdb.common.statement.DeleteSQLStatement
import io.radicalbit.nsdb.index.DirectorySupport
import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
import io.radicalbit.nsdb.model.{Location, Schema}
import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
@@ -76,6 +77,9 @@ class WriteCoordinator(metadataCoordinator: ActorRef, schemaCoordinator: ActorRe

lazy val shardingInterval: Duration = context.system.settings.config.getDuration("nsdb.sharding.interval")

override lazy val indexStorageStrategy: StorageStrategy =
StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))

lazy val consistencyLevel: Int =
context.system.settings.config.getInt("nsdb.cluster.consistency-level")

ReplicatedSchemaCacheSpec.scala
@@ -47,7 +47,6 @@ object ReplicatedSchemaCacheSpec extends MultiNodeConfig {
| publisher.scheduler.interval = 5 seconds
| write.scheduler.interval = 15 seconds
|
| index.base-path = "target/test_index/ReplicatedCacheSpec"
| write-coordinator.timeout = 5 seconds
| metadata-coordinator.timeout = 5 seconds
| commit-log {
1 change: 1 addition & 0 deletions nsdb-cluster/src/test/resources/application.conf
@@ -41,6 +41,7 @@ nsdb {
index-path = ${nsdb.storage.base-path}"/index"
commit-log-path = ${nsdb.storage.base-path}"/commit_log"
metadata-path = ${nsdb.storage.base-path}"/metadata"
strategy = "memory"
}

commit-log {
LocationIndexTest.scala
@@ -19,17 +19,20 @@ package io.radicalbit.nsdb.cluster.index
import java.nio.file.Files
import java.util.UUID

import io.radicalbit.nsdb.index.DirectorySupport
import io.radicalbit.nsdb.index.StorageStrategy.Memory
import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
import io.radicalbit.nsdb.model.Location
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
import org.scalatest.{FlatSpec, Matchers, OneInstancePerTest}

class LocationIndexTest extends FlatSpec with Matchers with OneInstancePerTest with DirectorySupport {

override def indexStorageStrategy: StorageStrategy = Memory

"LocationsIndex" should "write and read properly" in {

lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
lazy val directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString))

implicit val writer = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer))

@@ -54,7 +57,7 @@ class LocationIndexTest extends FlatSpec with Matchers with OneInstancePerTest w

"LocationsIndex" should "get a single location for a metric" in {

lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
lazy val directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString))

implicit val writer = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer))

@@ -84,5 +87,4 @@ class LocationIndexTest extends FlatSpec with Matchers with OneInstancePerTest w
Location(s"metric_0", s"node_0", 9, 10)
)
}

}
MetricInfoIndexTest.scala
@@ -20,14 +20,17 @@ import java.nio.file.Files
import java.util.UUID

import io.radicalbit.nsdb.common.model.MetricInfo
import io.radicalbit.nsdb.index.DirectorySupport
import io.radicalbit.nsdb.index.StorageStrategy.Memory
import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
import org.scalatest.{FlatSpec, Matchers, OneInstancePerTest}

class MetricInfoIndexTest extends FlatSpec with Matchers with OneInstancePerTest with DirectorySupport {

override def indexStorageStrategy: StorageStrategy = Memory

"MetricInfoIndex" should "write and read properly" in {

lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
lazy val directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString))

val metricInfoIndex = new MetricInfoIndex(directory)

@@ -53,7 +56,7 @@ class MetricInfoIndexTest extends FlatSpec with Matchers with OneInstancePerTest

"MetricInfoIndex" should "write and delete properly" in {

lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
lazy val directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString))

val metricInfoIndex = new MetricInfoIndex(directory)

NSDbConfig.scala
@@ -32,6 +32,7 @@ object NSDbConfig {
final val CommitLogBufferSize = "nsdb.commit-log.buffer-size"

final val StorageIndexPath = "nsdb.storage.index-path"
final val StorageStrategy = "nsdb.storage.strategy"

final val GrpcInterface = "nsdb.grpc.interface"
final val GrpcPort = "nsdb.grpc.port"
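Each actor resolves this key through StorageStrategy.withValue. The enum itself is not part of this diff; a hypothetical sketch of its shape, assuming one case per directory factory in DirectorySupport (the "hybrid" and "file_system" string values are assumptions; only "memory" appears in nsdb.conf):

// Hypothetical sketch; the real io.radicalbit.nsdb.index.StorageStrategy is not shown in this commit.
sealed abstract class StorageStrategy(val value: String)

object StorageStrategy {
  case object Memory     extends StorageStrategy("memory")
  case object Hybrid     extends StorageStrategy("hybrid")
  case object FileSystem extends StorageStrategy("file_system")

  def withValue(value: String): StorageStrategy = value match {
    case "memory"      => Memory
    case "hybrid"      => Hybrid
    case "file_system" => FileSystem
    case other         => throw new IllegalArgumentException(s"Unknown storage strategy: $other")
  }
}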
MetricAccumulatorActor.scala
@@ -25,7 +25,9 @@ import akka.routing.Broadcast
import akka.util.Timeout
import io.radicalbit.nsdb.actors.MetricAccumulatorActor.Refresh
import io.radicalbit.nsdb.actors.MetricPerformerActor.PerformShardWrites
import io.radicalbit.nsdb.common.configuration.NSDbConfig
import io.radicalbit.nsdb.common.protocol.NSDbSerializable
import io.radicalbit.nsdb.index.StorageStrategy
import io.radicalbit.nsdb.model.Location
import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
@@ -58,6 +60,9 @@ class MetricAccumulatorActor(val basePath: String,

implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

override lazy val indexStorageStrategy: StorageStrategy =
StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))

/**
* Actor responsible for the actual writes into indexes.
*/
@@ -218,6 +223,7 @@ class MetricAccumulatorActor(val basePath: String,
sender() ! DeleteStatementFailed(db = db, namespace = namespace, metric = statement.metric, ex.getMessage)
}
case msg @ Refresh(writeIds, keys) =>
garbageCollectIndexes()
opBufferMap --= writeIds
performingOps = Map.empty
keys.foreach { key =>
MetricPerformerActor.scala
@@ -23,9 +23,12 @@ import akka.pattern.ask
import akka.util.Timeout
import io.radicalbit.nsdb.actors.MetricAccumulatorActor.Refresh
import io.radicalbit.nsdb.actors.MetricPerformerActor.{PerformRetry, PerformShardWrites, PersistedBit, PersistedBits}
import io.radicalbit.nsdb.common.configuration.NSDbConfig
import io.radicalbit.nsdb.common.exception.TooManyRetriesException
import io.radicalbit.nsdb.common.protocol.{Bit, NSDbSerializable}
import io.radicalbit.nsdb.index.AllFacetIndexes
import io.radicalbit.nsdb.common.protocol.Bit
import io.radicalbit.nsdb.index.{AllFacetIndexes, StorageStrategy}
import io.radicalbit.nsdb.model.Location
import io.radicalbit.nsdb.statement.StatementParser
import io.radicalbit.nsdb.util.ActorPathLogging
@@ -56,6 +59,9 @@ class MetricPerformerActor(val basePath: String,

private val toRetryOperations: ListBuffer[(ShardOperation, Int)] = ListBuffer.empty

override lazy val indexStorageStrategy: StorageStrategy =
StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))

private val maxAttempts = context.system.settings.config.getInt("nsdb.write.retry-attempts")

def receive: Receive = {
@@ -153,7 +159,11 @@ class MetricPerformerActor(val basePath: String,
val facetIndexes = getOrCreatefacetIndexesFor(loc)
implicit val writer: IndexWriter = index.getWriter

val facets = new AllFacetIndexes(basePath = basePath, db = db, namespace = namespace, location = loc)
val facets = new AllFacetIndexes(basePath = basePath,
db = db,
namespace = namespace,
location = loc,
indexStorageStrategy = indexStorageStrategy)
val facetsIndexWriter = facets.newIndexWriter
val facetsTaxoWriter = facets.newDirectoryTaxonomyWriter

MetricsActor.scala
@@ -92,7 +92,7 @@ trait MetricsActor extends DirectorySupport { this: Actor =>
shards.getOrElse(
location, {
val directory =
createMmapDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}"))
getDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}"))
val newIndex = new TimeSeriesIndex(directory)
shards += (location -> newIndex)
newIndex
@@ -120,7 +120,11 @@ trait MetricsActor extends DirectorySupport { this: Actor =>
shardsAccess += (location -> System.currentTimeMillis())
facetIndexShards.getOrElse(
location, {
val facetIndexes = new AllFacetIndexes(basePath = basePath, db = db, namespace = namespace, location = location)
val facetIndexes = new AllFacetIndexes(basePath = basePath,
db = db,
namespace = namespace,
location = location,
indexStorageStrategy = indexStorageStrategy)
facetIndexShards += (location -> facetIndexes)
facetIndexes
}
ShardReaderActor.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit

import akka.actor.{PoisonPill, Props, ReceiveTimeout}
import io.radicalbit.nsdb.actors.ShardReaderActor.RefreshShard
import io.radicalbit.nsdb.common.configuration.NSDbConfig
import io.radicalbit.nsdb.common.protocol.{Bit, NSDbSerializable}
import io.radicalbit.nsdb.common.{NSDbNumericType, NSDbType}
import io.radicalbit.nsdb.index._
@@ -48,12 +49,16 @@ class ShardReaderActor(val basePath: String, val db: String, val namespace: Stri
extends ActorPathLogging
with DirectorySupport {

override lazy val indexStorageStrategy: StorageStrategy =
StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))

lazy val directory: Directory =
createMmapDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}"))
getDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}"))

lazy val index = new TimeSeriesIndex(directory)

lazy val facetIndexes = new AllFacetIndexes(basePath = basePath, db = db, namespace = namespace, location = location)
lazy val facetIndexes =
new AllFacetIndexes(basePath = basePath, db = db, namespace = namespace, location = location, indexStorageStrategy)

lazy val passivateAfter: FiniteDuration = FiniteDuration(
context.system.settings.config.getDuration("nsdb.sharding.passivate-after").toNanos,
AllFacetIndexes.scala
@@ -30,14 +30,18 @@ import org.apache.lucene.util.InfoStream
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

class AllFacetIndexes(basePath: String, db: String, namespace: String, location: Location)
class AllFacetIndexes(basePath: String,
db: String,
namespace: String,
location: Location,
override val indexStorageStrategy: StorageStrategy)
extends LazyLogging
with DirectorySupport {

private val directory =
createMmapDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}", "facet"))
getDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}", "facet"))

private val taxoDirectory = createMmapDirectory(
private val taxoDirectory = getDirectory(
Paths
.get(basePath, db, namespace, "shards", s"${location.shardName}", "facet", "taxo"))

DirectorySupport.scala
@@ -18,13 +18,63 @@ package io.radicalbit.nsdb.index

import java.nio.file.Path

import org.apache.lucene.store.{Directory, FileSwitchDirectory, MMapDirectory, NIOFSDirectory}

import scala.collection.JavaConverters._
import org.apache.lucene.store.MMapDirectory

/**
* Trait containing common Lucene [[org.apache.lucene.store.Directory]] custom factory methods.
*/
trait DirectorySupport {

def createMmapDirectory(path: Path): MMapDirectory = new MMapDirectory(path)
def indexStorageStrategy: StorageStrategy

/**
* extensions for norms, docvalues and term dictionaries
*/
private val PRIMARY_EXTENSIONS = Set("nvd", "dvd", "tim")

/**
* Creates an in-memory directory.
* The memory allocated is off-heap (mmap).
* @param path the root path.
* @return the mmap directory.
*/
private def createMmapDirectory(path: Path): MMapDirectory = new MMapDirectory(path)

/**
* Creates a file system directory.
* @param path the root path.
* @return the file system directory.
*/
private def createFileSystemDirectory(path: Path): NIOFSDirectory = new NIOFSDirectory(path)

/**
* Creates a hybrid Lucene Directory subclass that memory-maps all files with the primary extensions, while all other files are served through NIOFS.
* @param path the root path.
* @return the hybrid directory.
*/
private def createHybridDirectory(path: Path): FileSwitchDirectory =
new FileSwitchDirectory(PRIMARY_EXTENSIONS.asJava, new MMapDirectory(path), new NIOFSDirectory(path), true) {

/**
* Overridden so that listAll() is served by the primary directory only, avoiding duplicate listings (both directories share the same path).
*/
override def listAll(): Array[String] = getPrimaryDir.listAll()
}

/**
* Creates a Directory based on the configured [[StorageStrategy]].
* @param path the root path.
* @return the directory.
*/
def getDirectory(path: Path): Directory = {
indexStorageStrategy match {
case StorageStrategy.Hybrid => createHybridDirectory(path)
case StorageStrategy.Memory => createMmapDirectory(path)
case StorageStrategy.FileSystem => createFileSystemDirectory(path)
}
}

}
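Taken together, a component mixes in DirectorySupport, supplies a StorageStrategy (from configuration in the actors above, hard-coded in the tests), and obtains the matching Directory through getDirectory. A minimal, self-contained usage sketch; the object name and temporary path are illustrative only, not part of this commit:

import java.nio.file.Files

import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}

// Illustrative mix-in that fixes the strategy to Memory, as the unit tests in this commit do.
object DirectoryExample extends DirectorySupport {

  override def indexStorageStrategy: StorageStrategy = StorageStrategy.Memory

  def main(args: Array[String]): Unit = {
    // getDirectory returns an MMapDirectory here; the Hybrid or FileSystem
    // strategies would yield a FileSwitchDirectory or NIOFSDirectory instead.
    val directory = getDirectory(Files.createTempDirectory("nsdb-example"))
    val writer    = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer))
    writer.close()
    directory.close()
  }
}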