diff --git a/nsdb-cluster/src/main/resources/nsdb.conf b/nsdb-cluster/src/main/resources/nsdb.conf
index b90c583c4..ad3a1b350 100644
--- a/nsdb-cluster/src/main/resources/nsdb.conf
+++ b/nsdb-cluster/src/main/resources/nsdb.conf
@@ -42,6 +42,8 @@ nsdb {
     index-path = ${nsdb.storage.base-path}"/index"
     commit-log-path = ${nsdb.storage.base-path}"/cl"
     metadata-path = ${nsdb.storage.base-path}"/metadata"
+    strategy = "memory"
+    strategy = ${?STORAGE_STRATEGY}
   }
 
   commit-log {
diff --git a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala
index cbc10b063..4f8ad3426 100644
--- a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala
+++ b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala
@@ -38,10 +38,11 @@ import io.radicalbit.nsdb.cluster.createNodeName
 import io.radicalbit.nsdb.cluster.logic.CapacityWriteNodesSelectionLogic
 import io.radicalbit.nsdb.cluster.util.ErrorManagementUtils._
 import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
+import io.radicalbit.nsdb.common.configuration.NSDbConfig
 import io.radicalbit.nsdb.common.model.MetricInfo
 import io.radicalbit.nsdb.common.protocol.{Coordinates, NSDbSerializable}
 import io.radicalbit.nsdb.common.statement._
-import io.radicalbit.nsdb.index.DirectorySupport
+import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
 import io.radicalbit.nsdb.model.{Location, Schema}
 import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
 import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
@@ -69,6 +70,9 @@ class MetadataCoordinator(clusterListener: ActorRef, metadataCache: ActorRef, sc
     Timeout(config.getDuration("nsdb.metadata-coordinator.timeout", TimeUnit.SECONDS), TimeUnit.SECONDS)
   import context.dispatcher
 
+  override lazy val indexStorageStrategy: StorageStrategy =
+    StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))
+
   lazy val defaultShardingInterval: Long = config.getDuration("nsdb.sharding.interval").toMillis
 
diff --git a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/SchemaCoordinator.scala b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/SchemaCoordinator.scala
index 1b8265321..3f34e5e0f 100644
--- a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/SchemaCoordinator.scala
+++ b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/SchemaCoordinator.scala
@@ -23,8 +23,9 @@ import akka.pattern.{ask, pipe}
 import akka.util.Timeout
 import io.radicalbit.nsdb.cluster.coordinator.SchemaCoordinator.commands.DeleteNamespaceSchema
 import io.radicalbit.nsdb.cluster.coordinator.SchemaCoordinator.events.NamespaceSchemaDeleted
+import io.radicalbit.nsdb.common.configuration.NSDbConfig
 import io.radicalbit.nsdb.common.protocol.{Coordinates, NSDbSerializable}
-import io.radicalbit.nsdb.index.DirectorySupport
+import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
 import io.radicalbit.nsdb.model.Schema
 import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
 import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
@@ -45,6 +46,9 @@ class SchemaCoordinator(schemaCache: ActorRef) extends ActorPathLogging with Sta
                                            TimeUnit.SECONDS)
   import context.dispatcher
 
+  override lazy val indexStorageStrategy: StorageStrategy =
+    StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))
+
   /**
     * Checks if a newSchema is compatible with an oldSchema. If schemas are compatible, the metric schema will be updated.
     * @param namespace schema's namespace.
diff --git a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/WriteCoordinator.scala b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/WriteCoordinator.scala
index 41ac9391b..461012211 100644
--- a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/WriteCoordinator.scala
+++ b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/WriteCoordinator.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import akka.actor.{ActorRef, Props, Stash}
 import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
 import akka.util.Timeout
+import io.radicalbit.nsdb.cluster.NsdbPerfLogger
 import io.radicalbit.nsdb.cluster.PubSubTopics.{COORDINATORS_TOPIC, NODE_GUARDIANS_TOPIC}
 import io.radicalbit.nsdb.cluster.actor.MetricsDataActor.{
   AddRecordToLocation,
@@ -31,11 +32,11 @@ import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.{GetL
 import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events.LocationsGot
 import io.radicalbit.nsdb.cluster.coordinator.WriteCoordinator._
 import io.radicalbit.nsdb.cluster.util.ErrorManagementUtils._
-import io.radicalbit.nsdb.cluster.NsdbPerfLogger
 import io.radicalbit.nsdb.commit_log.CommitLogWriterActor._
+import io.radicalbit.nsdb.common.configuration.NSDbConfig
 import io.radicalbit.nsdb.common.protocol.Bit
 import io.radicalbit.nsdb.common.statement.DeleteSQLStatement
-import io.radicalbit.nsdb.index.DirectorySupport
+import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
 import io.radicalbit.nsdb.model.{Location, Schema}
 import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
 import io.radicalbit.nsdb.protocol.MessageProtocol.Events._
@@ -76,6 +77,9 @@ class WriteCoordinator(metadataCoordinator: ActorRef, schemaCoordinator: ActorRe
 
   lazy val shardingInterval: Duration = context.system.settings.config.getDuration("nsdb.sharding.interval")
 
+  override lazy val indexStorageStrategy: StorageStrategy =
+    StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))
+
   lazy val consistencyLevel: Int =
     context.system.settings.config.getInt("nsdb.cluster.consistency-level")
diff --git a/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ReplicatedSchemaCacheSpec.scala b/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ReplicatedSchemaCacheSpec.scala
index cef5bf701..d4b66e53f 100644
--- a/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ReplicatedSchemaCacheSpec.scala
+++ b/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ReplicatedSchemaCacheSpec.scala
@@ -47,7 +47,6 @@ object ReplicatedSchemaCacheSpec extends MultiNodeConfig {
     | publisher.scheduler.interval = 5 seconds
     | write.scheduler.interval = 15 seconds
     |
-    | index.base-path = "target/test_index/ReplicatedCacheSpec"
     | write-coordinator.timeout = 5 seconds
     | metadata-coordinator.timeout = 5 seconds
     | commit-log {
diff --git a/nsdb-cluster/src/test/resources/application.conf b/nsdb-cluster/src/test/resources/application.conf
index bfd886697..12fa8e08f 100644
--- a/nsdb-cluster/src/test/resources/application.conf
+++ b/nsdb-cluster/src/test/resources/application.conf
@@ -41,6 +41,7 @@ nsdb {
     index-path = ${nsdb.storage.base-path}"/index"
     commit-log-path = ${nsdb.storage.base-path}"/commit_log"
     metadata-path = ${nsdb.storage.base-path}"/metadata"
+    strategy = "memory"
   }
 
   commit-log {
diff --git a/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/index/LocationIndexTest.scala b/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/index/LocationIndexTest.scala
index cdf260b75..de7a77861 100644
--- a/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/index/LocationIndexTest.scala
+++ b/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/index/LocationIndexTest.scala
@@ -19,7 +19,8 @@ package io.radicalbit.nsdb.cluster.index
 import java.nio.file.Files
 import java.util.UUID
 
-import io.radicalbit.nsdb.index.DirectorySupport
+import io.radicalbit.nsdb.index.StorageStrategy.Memory
+import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
 import io.radicalbit.nsdb.model.Location
 import org.apache.lucene.analysis.standard.StandardAnalyzer
 import org.apache.lucene.index.{IndexWriter, IndexWriterConfig}
@@ -27,9 +28,11 @@ import org.scalatest.{FlatSpec, Matchers, OneInstancePerTest}
 
 class LocationIndexTest extends FlatSpec with Matchers with OneInstancePerTest with DirectorySupport {
 
+  override def indexStorageStrategy: StorageStrategy = Memory
+
   "LocationsIndex" should "write and read properly" in {
 
-    lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
+    lazy val directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
 
     implicit val writer = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer))
 
@@ -54,7 +57,7 @@ class LocationIndexTest extends FlatSpec with Matchers with OneInstancePerTest w
 
   "LocationsIndex" should "get a single location for a metric" in {
 
-    lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
+    lazy val directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
 
     implicit val writer = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer))
 
@@ -84,5 +87,4 @@ class LocationIndexTest extends FlatSpec with Matchers with OneInstancePerTest w
       Location(s"metric_0", s"node_0", 9, 10)
     )
   }
-
 }
diff --git a/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/index/MetricInfoIndexTest.scala b/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/index/MetricInfoIndexTest.scala
index aacda4a8d..a54557122 100644
--- a/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/index/MetricInfoIndexTest.scala
+++ b/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/index/MetricInfoIndexTest.scala
@@ -20,14 +20,17 @@ import java.nio.file.Files
 import java.util.UUID
 
 import io.radicalbit.nsdb.common.model.MetricInfo
-import io.radicalbit.nsdb.index.DirectorySupport
+import io.radicalbit.nsdb.index.StorageStrategy.Memory
+import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}
 import org.scalatest.{FlatSpec, Matchers, OneInstancePerTest}
 
 class MetricInfoIndexTest extends FlatSpec with Matchers with OneInstancePerTest with DirectorySupport {
 
+  override def indexStorageStrategy: StorageStrategy = Memory
+
   "MetricInfoIndex" should "write and read properly" in {
 
-    lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
+    lazy val directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
 
     val metricInfoIndex = new MetricInfoIndex(directory)
 
@@ -53,7 +56,7 @@ class MetricInfoIndexTest extends FlatSpec with Matchers with OneInstancePerTest
"MetricInfoIndex" should "write and delete properly" in { - lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString)) + lazy val directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString)) val metricInfoIndex = new MetricInfoIndex(directory) diff --git a/nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala b/nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala index 88bb1cdc0..787a4eb54 100644 --- a/nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala +++ b/nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala @@ -32,6 +32,7 @@ object NSDbConfig { final val CommitLogBufferSize = "nsdb.commit-log.buffer-size" final val StorageIndexPath = "nsdb.storage.index-path" + final val StorageStrategy = "nsdb.storage.strategy" final val GrpcInterface = "nsdb.grpc.interface" final val GrpcPort = "nsdb.grpc.port" diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricAccumulatorActor.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricAccumulatorActor.scala index 5ba6c579d..2a7ef3196 100644 --- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricAccumulatorActor.scala +++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricAccumulatorActor.scala @@ -25,7 +25,9 @@ import akka.routing.Broadcast import akka.util.Timeout import io.radicalbit.nsdb.actors.MetricAccumulatorActor.Refresh import io.radicalbit.nsdb.actors.MetricPerformerActor.PerformShardWrites +import io.radicalbit.nsdb.common.configuration.NSDbConfig import io.radicalbit.nsdb.common.protocol.NSDbSerializable +import io.radicalbit.nsdb.index.StorageStrategy import io.radicalbit.nsdb.model.Location import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._ import io.radicalbit.nsdb.protocol.MessageProtocol.Events._ @@ -58,6 +60,9 @@ class MetricAccumulatorActor(val basePath: String, implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher + override lazy val indexStorageStrategy: StorageStrategy = + StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy)) + /** * Actor responsible for the actual writes into indexes. */ @@ -218,6 +223,7 @@ class MetricAccumulatorActor(val basePath: String, sender() ! 
           sender() ! DeleteStatementFailed(db = db, namespace = namespace, metric = statement.metric, ex.getMessage)
       }
     case msg @ Refresh(writeIds, keys) =>
+      garbageCollectIndexes()
       opBufferMap --= writeIds
       performingOps = Map.empty
       keys.foreach { key =>
diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricPerformerActor.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricPerformerActor.scala
index 570554726..3577a5e55 100644
--- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricPerformerActor.scala
+++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricPerformerActor.scala
@@ -23,9 +23,12 @@ import akka.pattern.ask
 import akka.util.Timeout
 import io.radicalbit.nsdb.actors.MetricAccumulatorActor.Refresh
 import io.radicalbit.nsdb.actors.MetricPerformerActor.{PerformRetry, PerformShardWrites, PersistedBit, PersistedBits}
+import io.radicalbit.nsdb.common.configuration.NSDbConfig
 import io.radicalbit.nsdb.common.exception.TooManyRetriesException
 import io.radicalbit.nsdb.common.protocol.{Bit, NSDbSerializable}
 import io.radicalbit.nsdb.index.AllFacetIndexes
+import io.radicalbit.nsdb.common.protocol.Bit
+import io.radicalbit.nsdb.index.{AllFacetIndexes, StorageStrategy}
 import io.radicalbit.nsdb.model.Location
 import io.radicalbit.nsdb.statement.StatementParser
 import io.radicalbit.nsdb.util.ActorPathLogging
@@ -56,6 +59,9 @@ class MetricPerformerActor(val basePath: String,
 
   private val toRetryOperations: ListBuffer[(ShardOperation, Int)] = ListBuffer.empty
 
+  override lazy val indexStorageStrategy: StorageStrategy =
+    StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))
+
   private val maxAttempts = context.system.settings.config.getInt("nsdb.write.retry-attempts")
 
   def receive: Receive = {
@@ -153,7 +159,11 @@ class MetricPerformerActor(val basePath: String,
         val facetIndexes = getOrCreatefacetIndexesFor(loc)
 
         implicit val writer: IndexWriter = index.getWriter
-        val facets = new AllFacetIndexes(basePath = basePath, db = db, namespace = namespace, location = loc)
+        val facets = new AllFacetIndexes(basePath = basePath,
+                                         db = db,
+                                         namespace = namespace,
+                                         location = loc,
+                                         indexStorageStrategy = indexStorageStrategy)
         val facetsIndexWriter = facets.newIndexWriter
         val facetsTaxoWriter = facets.newDirectoryTaxonomyWriter
diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricsActor.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricsActor.scala
index e66756606..d5a220cb8 100644
--- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricsActor.scala
+++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricsActor.scala
@@ -92,7 +92,7 @@ trait MetricsActor extends DirectorySupport { this: Actor =>
     shards.getOrElse(
       location, {
         val directory =
-          createMmapDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}"))
+          getDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}"))
         val newIndex = new TimeSeriesIndex(directory)
         shards += (location -> newIndex)
         newIndex
@@ -120,7 +120,11 @@ trait MetricsActor extends DirectorySupport { this: Actor =>
     shardsAccess += (location -> System.currentTimeMillis())
     facetIndexShards.getOrElse(
       location, {
-        val facetIndexes = new AllFacetIndexes(basePath = basePath, db = db, namespace = namespace, location = location)
+        val facetIndexes = new AllFacetIndexes(basePath = basePath,
+                                               db = db,
+                                               namespace = namespace,
+                                               location = location,
+                                               indexStorageStrategy = indexStorageStrategy)
         facetIndexShards += (location -> facetIndexes)
         facetIndexes
       }
diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/ShardReaderActor.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/ShardReaderActor.scala
index 153b95915..4f606d8c7 100644
--- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/ShardReaderActor.scala
+++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/ShardReaderActor.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import akka.actor.{PoisonPill, Props, ReceiveTimeout}
 import io.radicalbit.nsdb.actors.ShardReaderActor.RefreshShard
+import io.radicalbit.nsdb.common.configuration.NSDbConfig
 import io.radicalbit.nsdb.common.protocol.{Bit, NSDbSerializable}
 import io.radicalbit.nsdb.common.{NSDbNumericType, NSDbType}
 import io.radicalbit.nsdb.index._
@@ -48,12 +49,16 @@ class ShardReaderActor(val basePath: String, val db: String, val namespace: Stri
     extends ActorPathLogging
     with DirectorySupport {
 
+  override lazy val indexStorageStrategy: StorageStrategy =
+    StorageStrategy.withValue(context.system.settings.config.getString(NSDbConfig.HighLevel.StorageStrategy))
+
   lazy val directory: Directory =
-    createMmapDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}"))
+    getDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}"))
 
   lazy val index = new TimeSeriesIndex(directory)
 
-  lazy val facetIndexes = new AllFacetIndexes(basePath = basePath, db = db, namespace = namespace, location = location)
+  lazy val facetIndexes =
+    new AllFacetIndexes(basePath = basePath, db = db, namespace = namespace, location = location, indexStorageStrategy)
 
   lazy val passivateAfter: FiniteDuration = FiniteDuration(
     context.system.settings.config.getDuration("nsdb.sharding.passivate-after").toNanos,
diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/AllFacetIndexes.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/AllFacetIndexes.scala
index 0afc7482b..16dba3591 100644
--- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/AllFacetIndexes.scala
+++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/AllFacetIndexes.scala
@@ -30,14 +30,18 @@ import org.apache.lucene.util.InfoStream
 import scala.annotation.tailrec
 import scala.util.{Failure, Success, Try}
 
-class AllFacetIndexes(basePath: String, db: String, namespace: String, location: Location)
+class AllFacetIndexes(basePath: String,
+                      db: String,
+                      namespace: String,
+                      location: Location,
+                      override val indexStorageStrategy: StorageStrategy)
     extends LazyLogging
     with DirectorySupport {
 
   private val directory =
-    createMmapDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}", "facet"))
+    getDirectory(Paths.get(basePath, db, namespace, "shards", s"${location.shardName}", "facet"))
 
-  private val taxoDirectory = createMmapDirectory(
+  private val taxoDirectory = getDirectory(
     Paths
       .get(basePath, db, namespace, "shards", s"${location.shardName}", "facet", "taxo"))
diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/DirectorySupport.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/DirectorySupport.scala
index fcb3af784..5acfd0dbf 100644
--- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/DirectorySupport.scala
+++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/DirectorySupport.scala
@@ -18,6 +18,9 @@ package io.radicalbit.nsdb.index
 
 import java.nio.file.Path
 
+import org.apache.lucene.store.{Directory, FileSwitchDirectory, MMapDirectory, NIOFSDirectory}
+
+import scala.collection.JavaConverters._
 import org.apache.lucene.store.MMapDirectory
 
 /**
@@ -25,6 +28,53 @@ import org.apache.lucene.store.MMapDirectory
  */
 trait DirectorySupport {
 
-  def createMmapDirectory(path: Path): MMapDirectory = new MMapDirectory(path)
+  def indexStorageStrategy: StorageStrategy
+
+  /**
+    * Extensions for norms, docvalues and term dictionaries.
+    */
+  private val PRIMARY_EXTENSIONS = Set("nvd", "dvd", "tim")
+
+  /**
+    * Creates an in-memory directory.
+    * The memory allocated is off-heap (mmap).
+    * @param path the root path.
+    * @return the mmap directory.
+    */
+  private def createMmapDirectory(path: Path): MMapDirectory = new MMapDirectory(path)
+
+  /**
+    * Creates a file system directory.
+    * @param path the root path.
+    * @return the file system directory.
+    */
+  private def createFileSystemDirectory(path: Path): NIOFSDirectory = new NIOFSDirectory(path)
+
+  /**
+    * Creates a hybrid Lucene Directory that maps in memory all the files with primary extensions; all other files are served through NIOFS.
+    * @param path the root path.
+    * @return the hybrid directory.
+    */
+  private def createHybridDirectory(path: Path): FileSwitchDirectory =
+    new FileSwitchDirectory(PRIMARY_EXTENSIONS.asJava, new MMapDirectory(path), new NIOFSDirectory(path), true) {
+
+      /**
+        * Avoids calling listAll() twice.
+        */
+      override def listAll(): Array[String] = getPrimaryDir.listAll()
+    }
+
+  /**
+    * Creates a Directory based on the configured [[StorageStrategy]].
+    * @param path the root path.
+    * @return the directory.
+    */
+  def getDirectory(path: Path): Directory = {
+    indexStorageStrategy match {
+      case StorageStrategy.Hybrid     => createHybridDirectory(path)
+      case StorageStrategy.Memory     => createMmapDirectory(path)
+      case StorageStrategy.FileSystem => createFileSystemDirectory(path)
+    }
+  }
 }
diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/SchemaIndex.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/SchemaIndex.scala
deleted file mode 100644
index 5cda05c48..000000000
--- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/SchemaIndex.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright 2018-2020 Radicalbit S.r.l.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.radicalbit.nsdb.index
-
-import io.radicalbit.nsdb.common.protocol._
-import io.radicalbit.nsdb.model.{Schema, SchemaField}
-import io.radicalbit.nsdb.statement.StatementParser.SimpleField
-import org.apache.lucene.document.Field.Store
-import org.apache.lucene.document.{Document, Field, StringField}
-import org.apache.lucene.index.{IndexWriter, Term}
-import org.apache.lucene.search.TermQuery
-import org.apache.lucene.store.Directory
-
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
-
-/**
-  * Index for entry of class [[Schema]].
-  * @param directory index base directory.
-  */
-class SchemaIndex(override val directory: Directory) extends SimpleIndex[Schema] {
-
-  import SchemaIndex._
-
-  override val _keyField: String = "metric"
-
-  override def validateRecord(data: Schema): Try[Seq[Field]] =
-    Success(
-      Seq(
-        new StringField(_keyField, data.metric, Store.YES)
-      ) ++
-        data.fieldsMap.map { case (_, e) => new StringField(e.name, stringFieldValue(e), Store.YES) }
-    )
-
-  override def write(data: Schema)(implicit writer: IndexWriter): Try[Long] = {
-    val doc = new Document
-    validateRecord(data) match {
-      case Success(fields) =>
-        Try {
-          fields.foreach(doc.add)
-          writer.addDocument(doc)
-        }
-      case Failure(t) => Failure(t)
-    }
-  }
-
-  override def toRecord(document: Document, fields: Seq[SimpleField]): Schema = {
-    val fields = document.getFields.asScala.filterNot(f => f.name() == _keyField || f.name() == _countField)
-    Schema(
-      document.get(_keyField),
-      fields.map { f =>
-        val (fieldType, indexType) = fieldValue(f.stringValue)
-        f.name() -> SchemaField(f.name(), fieldType, indexType)
-      }.toMap
-    )
-  }
-
-  def getSchema(metric: String): Option[Schema] = {
-    Try(query(_keyField, metric, Seq.empty, 1)(identity).headOption) match {
-      case Success(schemaOpt) => schemaOpt
-      case Failure(_)         => None
-    }
-  }
-
-  def update(metric: String, newSchema: Schema)(implicit writer: IndexWriter): Try[Long] = {
-    getSchema(metric) match {
-      case Some(oldSchema) =>
-        delete(oldSchema)
-        write(newSchema)
-      case None => write(newSchema)
-    }
-  }
-
-  override def delete(data: Schema)(implicit writer: IndexWriter): Try[Long] =
-    deleteMetricSchema(data.metric)
-
-  def deleteMetricSchema(metric: String)(implicit writer: IndexWriter): Try[Long] = {
-    Try {
-      val query = new TermQuery(new Term(_keyField, metric))
-      val result = writer.deleteDocuments(query)
-      writer.forceMergeDeletes(true)
-      result
-    }
-  }
-}
-
-object SchemaIndex {
-
-  def stringFieldValue(sf: SchemaField): String = s"${sf.fieldClassType}-${sf.indexType.getClass.getCanonicalName}"
-
-  def fieldValue(fieldSchemaType: String): (FieldClassType, IndexType[_]) = {
-    val array = fieldSchemaType.split("-")
-    val fieldType = array(0) match {
-      case "TimestampFieldType" => TimestampFieldType
-      case "ValueFieldType"     => ValueFieldType
-      case "DimensionFieldType" => DimensionFieldType
-      case "TagFieldType"       => TagFieldType
-    }
-    (fieldType, Class.forName(array(1)).newInstance().asInstanceOf[IndexType[_]])
-  }
-}
diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/StorageStrategy.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/StorageStrategy.scala
new file mode 100644
index 000000000..11fcb4f04
--- /dev/null
+++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/StorageStrategy.scala
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2018-2020 Radicalbit S.r.l.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.radicalbit.nsdb.index
+
+/**
+  * Index storage strategies. Subclasses are:
+  *
+  * - [[Memory]] the index is loaded in off-heap memory (mmap)
+  *
+  * - [[FileSystem]] the index is stored on the file system
+  *
+  * - [[Hybrid]] the index is loaded partly in memory (mmap) and partly on the file system
+  *
+  */
+sealed trait StorageStrategy
+
+object StorageStrategy {
+
+  def withValue(storage: String): StorageStrategy = storage.toLowerCase match {
+    case "hybrid"     => Hybrid
+    case "memory"     => Memory
+    case "filesystem" => FileSystem
+    case _            => throw new IllegalArgumentException(s"unrecognized name for StorageStrategy: $storage")
+  }
+
+  case object Hybrid     extends StorageStrategy
+  case object Memory     extends StorageStrategy
+  case object FileSystem extends StorageStrategy
+}
diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/TemporaryIndex.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/TemporaryIndex.scala
index ffb038320..952be0b9c 100644
--- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/TemporaryIndex.scala
+++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/index/TemporaryIndex.scala
@@ -19,9 +19,14 @@ package io.radicalbit.nsdb.index
 import java.nio.file.Files
 import java.util.UUID
 
+import org.apache.lucene.store.Directory
+
 /**
   * Concrete implementation of [[AbstractStructuredIndex]] which store data in memory.
   */
 class TemporaryIndex extends AbstractStructuredIndex with DirectorySupport {
-  override lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
+
+  override def indexStorageStrategy: StorageStrategy = StorageStrategy.Memory
+
+  override lazy val directory: Directory = getDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
 }
diff --git a/nsdb-core/src/test/resources/application.conf b/nsdb-core/src/test/resources/application.conf
index 3f794a0c6..f7f0769e4 100644
--- a/nsdb-core/src/test/resources/application.conf
+++ b/nsdb-core/src/test/resources/application.conf
@@ -42,6 +42,7 @@ nsdb{
     index-path = ${nsdb.storage.base-path}"/index"
     commit-log-path = ${nsdb.storage.base-path}"/commit_log"
     metadata-path = ${nsdb.storage.base-path}"/metadata"
+    strategy = "memory"
   }
 
   commit-log {
diff --git a/nsdb-core/src/test/scala/io/radicalbit/nsdb/index/FacetIndexTest.scala b/nsdb-core/src/test/scala/io/radicalbit/nsdb/index/FacetIndexTest.scala
index 1e17ad1ac..9a225d349 100644
--- a/nsdb-core/src/test/scala/io/radicalbit/nsdb/index/FacetIndexTest.scala
+++ b/nsdb-core/src/test/scala/io/radicalbit/nsdb/index/FacetIndexTest.scala
@@ -29,13 +29,16 @@ class FacetIndexTest extends FlatSpec with Matchers with OneInstancePerTest {
 
   val nodeName = "node1"
 
+  val indexStorageStrategy = StorageStrategy.Memory
+
   "FacetIndex" should "write and read properly on disk" in {
 
     val facetIndexes = new AllFacetIndexes(
       basePath = "target",
       db = "test_index",
       namespace = "test_facet_index",
-      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue)
+      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue),
+      indexStorageStrategy = indexStorageStrategy
     )
 
     implicit val writer = facetIndexes.newIndexWriter
@@ -82,7 +85,8 @@ class FacetIndexTest extends FlatSpec with Matchers with OneInstancePerTest {
       basePath = "target",
       db = "test_index",
      namespace = "test_facet_index",
-      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue)
+      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue),
+      indexStorageStrategy = indexStorageStrategy
     )
 
     implicit val writer = facetIndexes.newIndexWriter
@@ -125,7 +129,8 @@ class FacetIndexTest extends FlatSpec with Matchers with OneInstancePerTest {
       basePath = "target",
       db = "test_index",
       namespace = "test_facet_index",
-      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue)
+      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue),
+      indexStorageStrategy = indexStorageStrategy
     )
 
     implicit val writer = facetIndexes.newIndexWriter
@@ -167,7 +172,8 @@ class FacetIndexTest extends FlatSpec with Matchers with OneInstancePerTest {
       basePath = "target",
       db = "test_index",
       namespace = "test_facet_index",
-      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue)
+      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue),
+      indexStorageStrategy = indexStorageStrategy
     )
 
     implicit val writer = facetIndexes.newIndexWriter
@@ -212,7 +218,8 @@ class FacetIndexTest extends FlatSpec with Matchers with OneInstancePerTest {
       basePath = "target",
       db = "test_index",
       namespace = "test_facet_index",
-      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue)
+      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue),
+      indexStorageStrategy = indexStorageStrategy
     )
 
     implicit val writer = facetIndexes.newIndexWriter
@@ -275,7 +282,8 @@ class FacetIndexTest extends FlatSpec with Matchers with OneInstancePerTest {
       basePath = "target",
       db = "test_index",
       namespace = "test_facet_index",
-      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue)
+      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue),
+      indexStorageStrategy = indexStorageStrategy
     )
 
     implicit val writer = facetIndexes.newIndexWriter
@@ -315,7 +323,8 @@ class FacetIndexTest extends FlatSpec with Matchers with OneInstancePerTest {
       basePath = "target",
       db = "test_index",
       namespace = "test_facet_index",
-      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue)
+      location = Location(metric = UUID.randomUUID.toString, node = nodeName, from = 0, to = Long.MaxValue),
+      indexStorageStrategy = indexStorageStrategy
     )
 
     implicit val writer = facetIndexes.newIndexWriter
diff --git a/nsdb-core/src/test/scala/io/radicalbit/nsdb/index/SchemaIndexTest.scala b/nsdb-core/src/test/scala/io/radicalbit/nsdb/index/SchemaIndexTest.scala
deleted file mode 100644
index 80b2c60ad..000000000
--- a/nsdb-core/src/test/scala/io/radicalbit/nsdb/index/SchemaIndexTest.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2018-2020 Radicalbit S.r.l.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.radicalbit.nsdb.index
-
-import java.nio.file.Files
-import java.util.UUID
-
-import io.radicalbit.nsdb.common.protocol.{DimensionFieldType, TagFieldType}
-import io.radicalbit.nsdb.model.{Schema, SchemaField}
-import org.scalatest.{FlatSpec, Matchers, OneInstancePerTest}
-
-import scala.util.Success
-
-class SchemaIndexTest extends FlatSpec with Matchers with OneInstancePerTest with DirectorySupport {
-
-  "SchemaIndex" should "union schemas properly" in {
-
-    val schema1 = Schema(
-      s"metric",
-      Map(
-        "field1" -> SchemaField("field1", DimensionFieldType, BIGINT()),
-        "field2" -> SchemaField("field2", TagFieldType, VARCHAR()),
-        "field3" -> SchemaField("field3", DimensionFieldType, VARCHAR())
-      )
-    )
-
-    val schema2 = Schema(
-      s"metric",
-      Map(
-        "field1" -> SchemaField("field1", DimensionFieldType, BIGINT()),
-        "field2" -> SchemaField("field2", TagFieldType, VARCHAR())
-      )
-    )
-
-    Schema.union(schema1, schema2) shouldBe Success(schema1)
-
-  }
-
-  "SchemaIndex" should "write and read properly" in {
-
-    lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
-
-    val schemaIndex = new SchemaIndex(directory)
-
-    implicit val writer = schemaIndex.getWriter
-
-    (0 to 100).foreach { i =>
-      val testData = Schema(
-        s"metric_$i",
-        Map(
-          "field1" -> SchemaField("field1", DimensionFieldType, BIGINT()),
-          "field2" -> SchemaField("field2", DimensionFieldType, VARCHAR()),
-          s"field$i" -> SchemaField(s"field$i", DimensionFieldType, VARCHAR())
-        )
-      )
-      schemaIndex.write(testData)
-    }
-    writer.close()
-
-    val result = schemaIndex.query(schemaIndex._keyField, "metric_*", Seq.empty, 100)(identity)
-
-    result.size shouldBe 100
-
-    val firstSchema = schemaIndex.getSchema("metric_0")
-
-    firstSchema.get.metric shouldBe "metric_0"
-    firstSchema.get.fieldsMap("field1") shouldBe SchemaField("field1", DimensionFieldType, BIGINT())
-    firstSchema.get.fieldsMap("field2") shouldBe SchemaField("field2", DimensionFieldType, VARCHAR())
-    firstSchema.get.fieldsMap("field0") shouldBe SchemaField("field0", DimensionFieldType, VARCHAR())
-
-    firstSchema shouldBe Some(
-      Schema(
-        "metric_0",
-        Map(
-          "field1" -> SchemaField("field1", DimensionFieldType, BIGINT()),
-          "field2" -> SchemaField("field2", DimensionFieldType, VARCHAR()),
-          "field0" -> SchemaField("field0", DimensionFieldType, VARCHAR())
-        )
-      ))
-  }
-
-  "SchemaIndex" should "update records" in {
-
-    val testData = Schema(
-      "metric_1",
-      Map("field1" -> SchemaField("field1", DimensionFieldType, BIGINT()),
-          "field2" -> SchemaField("field2", DimensionFieldType, VARCHAR()))
-    )
-    val testData2 = Schema(
-      "metric_1",
-      Map("field1" -> SchemaField("field1", DimensionFieldType, INT()),
-          "field2" -> SchemaField("field2", DimensionFieldType, VARCHAR()))
-    )
-
-    lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
-
-    val schemaIndex = new SchemaIndex(directory)
-
-    implicit val writer = schemaIndex.getWriter
-
-    schemaIndex.write(testData)
-    writer.close()
-
-    schemaIndex.getSchema("metric_1") shouldBe Some(testData)
-
-    (1 to 100).foreach { i =>
-      val writer2 = schemaIndex.getWriter
-      schemaIndex.update("metric_1", testData2)(writer2)
-      writer2.close()
-      schemaIndex.refresh()
-    }
-
-    schemaIndex.all.size shouldBe 1
-
-    schemaIndex.getSchema("metric_1") shouldBe Some(testData2)
-  }
-
-  "SchemaIndex" should "drop schema" in {
-
-    lazy val directory = createMmapDirectory(Files.createTempDirectory(UUID.randomUUID().toString))
-
-    val schemaIndex = new SchemaIndex(directory)
-
-    implicit val writer = schemaIndex.getWriter
-
-    val testData = Schema(
-      s"metric_2",
-      Map("field1" -> SchemaField("field1", DimensionFieldType, BIGINT()),
-          "field2" -> SchemaField("field2", DimensionFieldType, VARCHAR()))
-    )
-    schemaIndex.write(testData)
-
-    val testDataBis = Schema(
-      s"metric_3",
-      Map("field1" -> SchemaField("field1", DimensionFieldType, BIGINT()),
-          "field2" -> SchemaField("field2", DimensionFieldType, VARCHAR()))
-    )
-    schemaIndex.write(testDataBis)
-    writer.close()
-
-    implicit val writerDrop = schemaIndex.getWriter
-
-    val result = schemaIndex.getSchema("metric_2")
-
-    result shouldBe Some(testData)
-
-    schemaIndex.delete(testData)(writerDrop)
-
-    writerDrop.close()
-
-    schemaIndex.refresh()
-
-    schemaIndex.getSchema("metric_2") shouldBe None
-    schemaIndex.getSchema("metric_3") shouldBe Some(testDataBis)
-
-  }
-
-}
diff --git a/nsdb-it/src/main/resources/nsdb-minicluster.conf b/nsdb-it/src/main/resources/nsdb-minicluster.conf
index ef12c2dbd..031746847 100644
--- a/nsdb-it/src/main/resources/nsdb-minicluster.conf
+++ b/nsdb-it/src/main/resources/nsdb-minicluster.conf
@@ -41,6 +41,7 @@ nsdb {
    index-path = ${nsdb.storage.base-path}"/index"
    commit-log-path = ${nsdb.storage.base-path}"/cl"
    metadata-path = ${nsdb.storage.base-path}"/metadata"
+   strategy = memory
  }
 
  commit-log {
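
Note on the configuration half of this patch: the `strategy` key is declared twice on purpose. In HOCON the last assignment wins, and a `${?VAR}` substitution is silently dropped when the variable is undefined, so the pair yields a "memory" default that an operator can override per deployment without editing the file. A minimal sketch (the key and variable names are exactly the ones added above; the exported value is just an example):

    nsdb.storage.strategy = "memory"             # default
    nsdb.storage.strategy = ${?STORAGE_STRATEGY} # only takes effect when the env var is set

    # e.g. exporting STORAGE_STRATEGY=hybrid before start-up resolves the key to "hybrid"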
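On the Scala side, `StorageStrategy.withValue` is the single place where that string is parsed: matching is case-insensitive thanks to the `toLowerCase`, and anything outside the three known names fails fast with an `IllegalArgumentException`. Restating the behaviour from the new file above:

    import io.radicalbit.nsdb.index.StorageStrategy

    StorageStrategy.withValue("memory")     // StorageStrategy.Memory
    StorageStrategy.withValue("FileSystem") // StorageStrategy.FileSystem
    StorageStrategy.withValue("mmap")       // throws IllegalArgumentException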
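Finally, every component mixing in `DirectorySupport` now supplies an `indexStorageStrategy` and calls `getDirectory` instead of the removed public `createMmapDirectory`, as the coordinator and actor hunks above show. A hypothetical consumer outside this patch would look like this (the object name and temp-directory prefix are illustrative only):

    import java.nio.file.Files

    import io.radicalbit.nsdb.index.{DirectorySupport, StorageStrategy}

    object DirectoryExample extends DirectorySupport {

      // the only member a mixer has to provide
      override def indexStorageStrategy: StorageStrategy = StorageStrategy.Hybrid

      def main(args: Array[String]): Unit = {
        // Under Hybrid this is the FileSwitchDirectory built by createHybridDirectory:
        // .nvd/.dvd/.tim files are mmap-ed, everything else goes through NIOFSDirectory.
        val directory = getDirectory(Files.createTempDirectory("nsdb-directory-example"))
        directory.close()
      }
    }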