diff --git a/nsdb-cluster/src/main/resources/nsdb.conf b/nsdb-cluster/src/main/resources/nsdb.conf index ad3a1b350..c0d299108 100644 --- a/nsdb-cluster/src/main/resources/nsdb.conf +++ b/nsdb-cluster/src/main/resources/nsdb.conf @@ -148,4 +148,12 @@ nsdb { retention-size = 10 retention-size = ${?WS_RETENTION_SIZE} } + + retry-policy { + delay = 1 second + delay = ${?RETRY_POLICY_DELAY} + + n-retries = 2 + n-retries = ${?RETRY_POLICY_N_RETRIES} + } } \ No newline at end of file diff --git a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NSDbActors.scala b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NSDbActors.scala index bf4dd2740..26250ce1e 100644 --- a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NSDbActors.scala +++ b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/NSDbActors.scala @@ -27,10 +27,8 @@ import akka.management.scaladsl.AkkaManagement import akka.util.Timeout import io.radicalbit.nsdb.cluster.actor._ -import scala.concurrent.ExecutionContextExecutor - /** - * Creates the top level actor [[DatabaseActorsGuardian]] and grpc endpoint [[GrpcEndpoint]] based on coordinators + * Creates the top level actor [[DatabaseActorsGuardian]] and grpc endpoint [[io.radicalbit.nsdb.cluster.endpoint.GrpcEndpoint]] based on coordinators */ trait NSDbActors { @@ -39,8 +37,6 @@ trait NSDbActors { implicit lazy val timeout: Timeout = Timeout(system.settings.config.getDuration("nsdb.global.timeout", TimeUnit.SECONDS), TimeUnit.SECONDS) - implicit lazy val executionContext: ExecutionContextExecutor = system.dispatcher - def initTopLevelActors(): Unit = { AkkaManagement(system).start() ClusterBootstrap(system).start() @@ -52,14 +48,15 @@ trait NSDbActors { name = "databaseActorGuardian" ) - DistributedData(system).replicator - system.actorOf( ClusterSingletonProxy.props(singletonManagerPath = "/user/databaseActorGuardian", settings = ClusterSingletonProxySettings(system)), name = "databaseActorGuardianProxy" ) - system.actorOf(Props[ClusterListener], name = s"cluster-listener_${createNodeName(Cluster(system).selfMember)}") + DistributedData(system).replicator + + system.actorOf(ClusterListener.props(true), + name = s"cluster-listener_${createNodeName(Cluster(system).selfMember)}") } } diff --git a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ClusterListener.scala b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ClusterListener.scala index a950b97a0..64594126c 100644 --- a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ClusterListener.scala +++ b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/actor/ClusterListener.scala @@ -17,42 +17,47 @@ package io.radicalbit.nsdb.cluster.actor import java.nio.file.{Files, NoSuchFileException, Paths} +import java.util.concurrent.TimeUnit import akka.actor._ -import akka.cluster.Cluster +import akka.cluster.{Cluster, Member} import akka.cluster.ClusterEvent._ import akka.cluster.metrics.{ClusterMetricsChanged, ClusterMetricsExtension, Metric, NodeMetrics} import akka.cluster.pubsub.DistributedPubSub import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe} +import akka.event.LoggingAdapter import akka.pattern.ask import akka.remote.RemoteScope import akka.util.Timeout import io.radicalbit.nsdb.cluster.PubSubTopics._ +import io.radicalbit.nsdb.cluster._ import io.radicalbit.nsdb.cluster.actor.ClusterListener.{DiskOccupationChanged, GetNodeMetrics, NodeMetricsGot} import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.{AddLocations, 
RemoveNodeMetadata} import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events._ -import io.radicalbit.nsdb.cluster.util.{ErrorManagementUtils, FileUtils} -import io.radicalbit.nsdb.cluster._ import io.radicalbit.nsdb.cluster.metrics.NSDbMetrics -import io.radicalbit.nsdb.model.LocationWithCoordinates -import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._ +import io.radicalbit.nsdb.cluster.util.{ErrorManagementUtils, FileUtils} import io.radicalbit.nsdb.common.configuration.NSDbConfig.HighLevel._ import io.radicalbit.nsdb.common.protocol.NSDbSerializable +import io.radicalbit.nsdb.model.LocationWithCoordinates +import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._ import io.radicalbit.nsdb.protocol.MessageProtocol.Events.{ CommitLogCoordinatorUnSubscribed, MetricsDataActorUnSubscribed, PublisherUnSubscribed } +import io.radicalbit.nsdb.util.FutureRetryUtility import scala.collection.mutable +import scala.concurrent.Future import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContextExecutor, Future} -import scala.util.{Failure, Success, Try} +import scala.util.{Success, Try} /** * Actor subscribed to akka cluster events. It creates all the actors needed when a node joins the cluster */ -class ClusterListener() extends Actor with ActorLogging { +class ClusterListener(enableClusterMetricsExtension: Boolean) extends Actor with ActorLogging with FutureRetryUtility { + + import context.dispatcher private lazy val cluster = Cluster(context.system) private lazy val clusterMetricSystem = ClusterMetricsExtension(context.system) @@ -63,8 +68,6 @@ class ClusterListener() extends Actor with ActorLogging { private lazy val config = context.system.settings.config private lazy val indexPath = config.getString(StorageIndexPath) - implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher - implicit val defaultTimeout: Timeout = Timeout(5.seconds) /** @@ -77,38 +80,69 @@ class ClusterListener() extends Actor with ActorLogging { */ private val nsdbMetrics: mutable.Map[String, Set[NodeMetrics]] = mutable.Map.empty + /** + * Retry policy + */ + private lazy val delay = FiniteDuration(config.getDuration(retryPolicyDelay, TimeUnit.SECONDS), TimeUnit.SECONDS) + private lazy val retries = config.getInt(retryPolicyNRetries) + override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) log.info("Created ClusterListener at path {} and subscribed to member events", self.path) - clusterMetricSystem.subscribe(self) + if (enableClusterMetricsExtension) clusterMetricSystem.subscribe(self) mediator ! 
Subscribe(NSDB_METRICS_TOPIC, self) } override def postStop(): Unit = cluster.unsubscribe(self) + protected def createNodeActorsGuardian(): ActorRef = + context.system.actorOf( + NodeActorsGuardian.props(self).withDeploy(Deploy(scope = RemoteScope(cluster.selfMember.address))), + name = s"guardian_$selfNodeName" + ) + + protected def retrieveLocationsToAdd: List[LocationWithCoordinates] = + FileUtils.getLocationsFromFilesystem(indexPath, selfNodeName) + + protected def onSuccessBehaviour(readCoordinator: ActorRef, + writeCoordinator: ActorRef, + metadataCoordinator: ActorRef, + publisherActor: ActorRef): Unit = + new NsdbNodeEndpoint(readCoordinator, writeCoordinator, metadataCoordinator, publisherActor)(context.system) + + protected def onFailureBehaviour(member: Member, error: Any): Unit = { + log.error("received wrong response {}", error) + cluster.leave(member.address) + } + + protected def onRemoveNodeMetadataResponse: RemoveNodeMetadataResponse => Unit = { + case NodeMetadataRemoved(nodeName) => + log.info(s"metadata successfully removed for node $nodeName") + case RemoveNodeMetadataFailed(nodeName) => + log.error(s"RemoveNodeMetadataFailed for node $nodeName") + } + def receive: Receive = { case MemberUp(member) if member == cluster.selfMember => log.info("Member is Up: {}", member.address) - val nodeName = createNodeName(member) - - val nodeActorsGuardian = - context.system.actorOf(NodeActorsGuardian.props(self).withDeploy(Deploy(scope = RemoteScope(member.address))), - name = s"guardian_$nodeName") + val nodeActorsGuardian = createNodeActorsGuardian() (nodeActorsGuardian ? GetNodeChildActors) .map { case NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, publisherActor) => mediator ! Subscribe(NODE_GUARDIANS_TOPIC, nodeActorsGuardian) - val locationsToAdd: Seq[LocationWithCoordinates] = - FileUtils.getLocationsFromFilesystem(indexPath, nodeName) + val locationsToAdd: Seq[LocationWithCoordinates] = retrieveLocationsToAdd val locationsGroupedBy: Map[(String, String), Seq[LocationWithCoordinates]] = locationsToAdd.groupBy { case LocationWithCoordinates(database, namespace, _) => (database, namespace) } + implicit val scheduler: Scheduler = context.system.scheduler + implicit val _log: LoggingAdapter = log + Future .sequence { locationsGroupedBy.map { @@ -119,16 +153,14 @@ class ClusterListener() extends Actor with ActorLogging { } } .map(ErrorManagementUtils.partitionResponses[LocationsAdded, AddLocationsFailed]) + .retry(delay, retries) { + case (_, addLocationsFailedList) => addLocationsFailedList.isEmpty + } .onComplete { case Success((_, failures)) if failures.isEmpty => - new NsdbNodeEndpoint(readCoordinator, writeCoordinator, metadataCoordinator, publisherActor)( - context.system) - case Success((_, failures)) => - log.error(s" failures $failures") - cluster.leave(member.address) - case Failure(ex) => - log.error(s" failure", ex) - cluster.leave(member.address) + onSuccessBehaviour(readCoordinator, writeCoordinator, metadataCoordinator, publisherActor) + case e => + onFailureBehaviour(member, e) } case unknownResponse => log.error(s"unknown response from nodeActorsGuardian ? 
GetNodeChildActors $unknownResponse") @@ -136,27 +168,30 @@ class ClusterListener() extends Actor with ActorLogging { case UnreachableMember(member) => log.info("Member detected as unreachable: {}", member) + val nodeName = createNodeName(member) + + implicit val scheduler: Scheduler = context.system.scheduler + implicit val _log: LoggingAdapter = log + (for { NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, _) <- (context.actorSelection( - s"/user/guardian_$selfNodeName") ? GetNodeChildActors).mapTo[NodeChildActorsGot] - _ <- (readCoordinator ? UnsubscribeMetricsDataActor(createNodeName(member))).mapTo[MetricsDataActorUnSubscribed] - _ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(createNodeName(member))) + s"/user/guardian_$selfNodeName") ? GetNodeChildActors) + .mapTo[NodeChildActorsGot] + _ <- (readCoordinator ? UnsubscribeMetricsDataActor(nodeName)).mapTo[MetricsDataActorUnSubscribed] + _ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(nodeName)) .mapTo[CommitLogCoordinatorUnSubscribed] - _ <- (writeCoordinator ? UnSubscribePublisher(createNodeName(member))).mapTo[PublisherUnSubscribed] - _ <- (writeCoordinator ? UnsubscribeMetricsDataActor(createNodeName(member))) + _ <- (writeCoordinator ? UnSubscribePublisher(nodeName)).mapTo[PublisherUnSubscribed] + _ <- (writeCoordinator ? UnsubscribeMetricsDataActor(nodeName)) .mapTo[MetricsDataActorUnSubscribed] - _ <- (metadataCoordinator ? UnsubscribeMetricsDataActor(createNodeName(member))) + _ <- (metadataCoordinator ? UnsubscribeMetricsDataActor(nodeName)) .mapTo[MetricsDataActorUnSubscribed] - _ <- (metadataCoordinator ? UnSubscribeCommitLogCoordinator(createNodeName(member))) + _ <- (metadataCoordinator ? UnSubscribeCommitLogCoordinator(nodeName)) .mapTo[CommitLogCoordinatorUnSubscribed] - removeNodeMetadataResponse <- (metadataCoordinator ? RemoveNodeMetadata(createNodeName(member))) + removeNodeMetadataResponse <- (metadataCoordinator ? 
RemoveNodeMetadata(nodeName)) .mapTo[RemoveNodeMetadataResponse] - } yield removeNodeMetadataResponse).map { - case NodeMetadataRemoved(nodeName) => - log.info(s"metadata successfully removed for node $nodeName") - case RemoveNodeMetadataFailed(nodeName) => - log.error(s"RemoveNodeMetadataFailed for node $nodeName") - } + } yield removeNodeMetadataResponse) + .retry(delay, retries)(_.isInstanceOf[NodeMetadataRemoved]) + .map(onRemoveNodeMetadataResponse) case MemberRemoved(member, previousStatus) => log.info("Member is Removed: {} after {}", member.address, previousStatus) @@ -215,4 +250,6 @@ object ClusterListener { */ case class NodeMetricsGot(nodeMetrics: Set[NodeMetrics]) extends NSDbSerializable + def props(enableClusterMetricsExtension: Boolean) = Props(new ClusterListener(enableClusterMetricsExtension)) + } diff --git a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala index 4f8ad3426..8e898bcb7 100644 --- a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala +++ b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinator.scala @@ -85,9 +85,9 @@ class MetadataCoordinator(clusterListener: ActorRef, metadataCache: ActorRef, sc private val metricsDataActors: mutable.Map[String, ActorRef] = mutable.Map.empty private val commitLogCoordinators: mutable.Map[String, ActorRef] = mutable.Map.empty - private def getShardStartIstant(timestamp: Long, shardInterval: Long) = (timestamp / shardInterval) * shardInterval + private def getShardStartInstant(timestamp: Long, shardInterval: Long) = (timestamp / shardInterval) * shardInterval - private def getShardEndIstant(startShard: Long, shardInterval: Long) = startShard + shardInterval + private def getShardEndInstant(startShard: Long, shardInterval: Long) = startShard + shardInterval private def performAddLocationIntoCache(db: String, namespace: String, locations: Seq[Location]) = Future @@ -435,8 +435,8 @@ class MetadataCoordinator(clusterListener: ActorRef, metadataCache: ActorRef, sc shardInterval <- getShardInterval(db, namespace, metric) nodeMetrics <- (clusterListener ? 
GetNodeMetrics).mapTo[NodeMetricsGot] addLocationResult <- { - val start = getShardStartIstant(timestamp, shardInterval) - val end = getShardEndIstant(start, shardInterval) + val start = getShardStartInstant(timestamp, shardInterval) + val end = getShardEndInstant(start, shardInterval) val nodes = if (nodeMetrics.nodeMetrics.nonEmpty) diff --git a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/package.scala b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/package.scala index 4c67b7910..930310082 100644 --- a/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/package.scala +++ b/nsdb-cluster/src/main/scala/io/radicalbit/nsdb/cluster/package.scala @@ -31,11 +31,11 @@ package object cluster { * @param nodeName the node name [host]_[port] */ def createAddress(nodeName: String): Address = { - val splittedNodeName = nodeName.split("_") + val splitNodeName = nodeName.split("_") Address("nsdb", "NSDb", - Option(splittedNodeName(0)).getOrElse("noHost"), - Option(splittedNodeName(1)).map(_.toInt).getOrElse(0)) + Option(splitNodeName(0)).getOrElse("noHost"), + Option(splitNodeName(1)).map(_.toInt).getOrElse(0)) } final object PubSubTopics { diff --git a/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ClusterListenerSpec.scala b/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ClusterListenerSpec.scala new file mode 100644 index 000000000..a487c9914 --- /dev/null +++ b/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ClusterListenerSpec.scala @@ -0,0 +1,166 @@ +package io.radicalbit.nsdb.cluster.actor + +import akka.actor.{Actor, ActorLogging, ActorRef, ActorSelection, ActorSystem, Props} +import akka.cluster.ClusterEvent.UnreachableMember +import akka.cluster.pubsub.DistributedPubSubMediator.SubscribeAck +import akka.cluster.{Cluster, Member} +import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec} +import akka.testkit.{ImplicitSender, TestProbe} +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.{AddLocations, RemoveNodeMetadata} +import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events +import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events.{ + AddLocationsFailed, + LocationsAdded, + NodeMetadataRemoved, + RemoveNodeMetadataFailed +} +import io.radicalbit.nsdb.model.{Location, LocationWithCoordinates} +import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._ +import io.radicalbit.nsdb.protocol.MessageProtocol.Events.{ + CommitLogCoordinatorUnSubscribed, + MetricsDataActorUnSubscribed, + PublisherUnSubscribed +} +import io.radicalbit.rtsae.STMultiNodeSpec + +import scala.concurrent.duration._ + +object ClusterListenerSpecConfig extends MultiNodeConfig { + val node1 = role("node-1") + val node2 = role("node-2") + + commonConfig(ConfigFactory.parseString(""" + |akka.loglevel = ERROR + |akka.actor.provider = "cluster" + |nsdb { + | retry-policy { + | delay = 1 second + | n-retries = 2 + | } + |} + |""".stripMargin)) + +} + +class ClusterListenerSpecMultiJvmNode1 extends ClusterListenerSpec +class ClusterListenerSpecMultiJvmNode2 extends ClusterListenerSpec + +sealed trait TestType +case object SuccessTest extends TestType +case object FailureTest extends TestType + +class MetaDataCoordinatorForTest extends Actor with ActorLogging { + def receive: Receive = { + case AddLocations("success", namespace, locations) => + sender() ! 
LocationsAdded("success", namespace, locations) + case AddLocations("failure", namespace, locations) => + sender() ! AddLocationsFailed("failure", namespace, locations) + case UnsubscribeMetricsDataActor(nodeName) => sender() ! MetricsDataActorUnSubscribed(nodeName) + case UnSubscribeCommitLogCoordinator(nodeName) => sender() ! CommitLogCoordinatorUnSubscribed(nodeName) + case RemoveNodeMetadata(nodeName) => sender() ! RemoveNodeMetadataFailed(nodeName) + case _ => + log.warning("Unhandled message on purpose") + } +} + +class ReadCoordinatorForTest extends Actor with ActorLogging { + def receive: Receive = { + case UnsubscribeMetricsDataActor(nodeName) => sender() ! MetricsDataActorUnSubscribed(nodeName) + } +} + +class WriteCoordinatorForTest extends Actor with ActorLogging { + def receive: Receive = { + case UnSubscribeCommitLogCoordinator(nodeName) => sender() ! CommitLogCoordinatorUnSubscribed(nodeName) + case UnSubscribePublisher(nodeName) => sender() ! PublisherUnSubscribed(nodeName) + case UnsubscribeMetricsDataActor(nodeName) => sender() ! MetricsDataActorUnSubscribed(nodeName) + } +} + +class NodeActorsGuardianForTest extends Actor with ActorLogging { + private lazy val metaDataCoordinator = context.actorOf(Props(new MetaDataCoordinatorForTest)) + private lazy val writeCoordinator = context.actorOf(Props(new WriteCoordinatorForTest)) + private lazy val readCoordinator = context.actorOf(Props(new ReadCoordinatorForTest)) + + def receive: Receive = { + case GetNodeChildActors => + sender() ! NodeChildActorsGot(metaDataCoordinator, writeCoordinator, readCoordinator, ActorRef.noSender) + } +} + +class ClusterListenerForTest(resultActor: ActorRef, testType: TestType) + extends ClusterListener(false) { + + val nodeActorsGuardianForTest = + context.actorOf(Props(new NodeActorsGuardianForTest)) + + override val defaultTimeout = Timeout(1.seconds) + + override def receive: Receive = super.receive orElse { + case SubscribeAck(subscribe) => log.info("subscribe {}", subscribe) + } + override def createNodeActorsGuardian(): ActorRef = nodeActorsGuardianForTest + + override def retrieveLocationsToAdd(): List[LocationWithCoordinates] = testType match { + case SuccessTest => + List(LocationWithCoordinates("success", "namespace", Location("metric", "node", 0L, 1L))) + case FailureTest => + List(LocationWithCoordinates("failure", "namespace", Location("metric", "node", 0L, 1L))) + } + + override def onSuccessBehaviour(readCoordinator: ActorRef, + writeCoordinator: ActorRef, + metadataCoordinator: ActorRef, + publisherActor: ActorRef): Unit = { + resultActor ! "Success" + } + + override protected def onFailureBehaviour(member: Member, error: Any): Unit = { + resultActor ! "Failure" + } + + override protected def onRemoveNodeMetadataResponse: events.RemoveNodeMetadataResponse => Unit = { + case NodeMetadataRemoved(_) => //ignore + case RemoveNodeMetadataFailed(_) => resultActor ! 
"Failure" + } +} + +class ClusterListenerSpec extends MultiNodeSpec(ClusterListenerSpecConfig) with STMultiNodeSpec with ImplicitSender { + + import ClusterListenerSpecConfig._ + + def initialParticipants: Int = roles.size + + private val cluster = Cluster(system) + + "ClusterListener" must { + "successfully create a NsdbNodeEndpoint when a new member in the cluster is Up" in { + val resultActor = TestProbe("resultActor") + cluster.system.actorOf(Props(new ClusterListenerForTest(resultActor.testActor, SuccessTest))) + cluster.join(node(node1).address) + cluster.join(node(node2).address) + enterBarrier(5 seconds, "nodes joined") + awaitAssert(resultActor.expectMsg("Success")) + } + + "return a failure and leave the cluster" in { + val resultActor = TestProbe("resultActor") + cluster.system.actorOf(Props(new ClusterListenerForTest(resultActor.testActor, FailureTest))) + cluster.join(node(node1).address) + cluster.join(node(node2).address) + enterBarrier(5 seconds, "nodes joined") + awaitAssert(resultActor.expectMsg("Failure")) + } + + "correctly handle 'UnreachableMember' msg" in { + val resultActor = TestProbe("resultActor") + val clusterListener = + cluster.system.actorOf(Props(new ClusterListenerForTest(resultActor.testActor, FailureTest)), + name = "clusterListener") + awaitAssert { clusterListener ! UnreachableMember(cluster.selfMember); resultActor.expectMsg("Failure") } + } + } + +} diff --git a/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ClusterListenerTestActor.scala b/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ClusterListenerTestActor.scala index 85dc75902..e5deffe7b 100644 --- a/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ClusterListenerTestActor.scala +++ b/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/ClusterListenerTestActor.scala @@ -9,7 +9,7 @@ import akka.remote.RemoteScope import io.radicalbit.nsdb.cluster.PubSubTopics.NODE_GUARDIANS_TOPIC import io.radicalbit.nsdb.cluster.createNodeName -class ClusterListenerTestActor() extends Actor with ActorLogging { +class ClusterListenerTestActor extends Actor with ActorLogging { private lazy val cluster = Cluster(context.system) private lazy val mediator = DistributedPubSub(context.system).mediator diff --git a/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/MetadataSpec.scala b/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/MetadataSpec.scala index ac6bbeac7..7bd20ea5b 100644 --- a/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/MetadataSpec.scala +++ b/nsdb-cluster/src/multi-jvm/scala/io/radicalbit/nsdb/cluster/actor/MetadataSpec.scala @@ -1,6 +1,6 @@ package io.radicalbit.nsdb.cluster.actor -import akka.actor.{ActorRef, ActorSelection, Props} +import akka.actor.{ActorSelection, Props} import akka.cluster.pubsub.DistributedPubSub import akka.cluster.{Cluster, MemberStatus} import akka.remote.testconductor.RoleName @@ -14,14 +14,8 @@ import io.radicalbit.nsdb.common.model.MetricInfo import io.radicalbit.nsdb.common.protocol.{DimensionFieldType, TagFieldType, TimestampFieldType, ValueFieldType} import io.radicalbit.nsdb.index.{BIGINT, DECIMAL, VARCHAR} import io.radicalbit.nsdb.model.{Location, Schema, SchemaField} -import io.radicalbit.nsdb.protocol.MessageProtocol.Commands.{ - GetDbs, - GetMetricInfo, - GetMetrics, - GetNamespaces, - GetSchema -} -import io.radicalbit.nsdb.protocol.MessageProtocol.Events.{DbsGot, MetricInfoGot, MetricsGot, NamespacesGot, SchemaGot} +import 
io.radicalbit.nsdb.protocol.MessageProtocol.Commands._ +import io.radicalbit.nsdb.protocol.MessageProtocol.Events._ import io.radicalbit.rtsae.STMultiNodeSpec import scala.concurrent.Await @@ -33,7 +27,7 @@ object MetadataSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" |akka.loglevel = ERROR - |akka.actor{ + |akka.actor { | provider = "cluster" | | serialization-bindings { @@ -45,7 +39,7 @@ object MetadataSpec extends MultiNodeConfig { | } |} |akka.log-dead-letters-during-shutdown = off - |nsdb{ + |nsdb { | | grpc { | interface = "0.0.0.0" diff --git a/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinatorSpec.scala b/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinatorSpec.scala index 8cec8b597..7925742f2 100644 --- a/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinatorSpec.scala +++ b/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/coordinator/MetadataCoordinatorSpec.scala @@ -67,7 +67,7 @@ class MetadataCoordinatorSpec system.actorOf(SchemaCoordinator.props(schemaCache), "schemacoordinator") val metricsDataActorProbe = TestProbe() val metadataCache = system.actorOf(Props[LocalMetadataCache]) - val clusterListener = system.actorOf(Props[ClusterListener]) + val clusterListener = system.actorOf(ClusterListener.props(true)) val metadataCoordinator = system.actorOf(MetadataCoordinator.props(clusterListener, metadataCache, schemaCache, probe.ref)) diff --git a/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/coordinator/RetentionSpec.scala b/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/coordinator/RetentionSpec.scala index 700ee0365..9dda95896 100644 --- a/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/coordinator/RetentionSpec.scala +++ b/nsdb-cluster/src/test/scala/io/radicalbit/nsdb/cluster/coordinator/RetentionSpec.scala @@ -98,7 +98,10 @@ class RetentionSpec val metadataCoordinator = system.actorOf( MetadataCoordinator - .props(system.actorOf(Props[ClusterListener]), localMetadataCache, schemaCache, system.actorOf(Props.empty)) + .props(system.actorOf(ClusterListener.props(true)), + localMetadataCache, + schemaCache, + system.actorOf(Props.empty)) .withDispatcher("akka.actor.control-aware-dispatcher"), "metadata-coordinator" ) diff --git a/nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala b/nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala index 787a4eb54..828df0749 100644 --- a/nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala +++ b/nsdb-common/src/main/scala/io/radicalbit/nsdb/common/configuration/NSDbConfig.scala @@ -44,6 +44,9 @@ object NSDbConfig { final val ClusterMode = "nsdb.cluster.mode" final val MonitoringEnabled = "nsdb.monitoring.enabled" + + final val retryPolicyDelay = "nsdb.retry-policy.delay" + final val retryPolicyNRetries = "nsdb.retry-policy.n-retries" } object LowLevel { diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricReaderActor.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricReaderActor.scala index e8b04a6ba..80f6381b7 100644 --- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricReaderActor.scala +++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/actors/MetricReaderActor.scala @@ -291,7 +291,7 @@ class MetricReaderActor(val basePath: String, nodeName: String, val db: String, } shardResults.pipeTo(sender) - case Right(ParsedAggregatedQuery(_, _, _, _ : 
InternalCountSimpleAggregation, _, _)) => + case Right(ParsedAggregatedQuery(_, _, _, _: InternalCountSimpleAggregation, _, _)) => val filteredIndexes = actorsForLocations(locations) @@ -323,7 +323,7 @@ class MetricReaderActor(val basePath: String, nodeName: String, val db: String, } rawResult.pipeTo(sender) - case Right(_ : ParsedTemporalAggregatedQuery) => + case Right(_: ParsedTemporalAggregatedQuery) => val actors = actorsForLocations(locations) diff --git a/nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FutureRetryUtility.scala b/nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FutureRetryUtility.scala index e1aaef613..56b648c2d 100644 --- a/nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FutureRetryUtility.scala +++ b/nsdb-core/src/main/scala/io/radicalbit/nsdb/util/FutureRetryUtility.scala @@ -25,24 +25,30 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} /** - * Utility Component for handling future retry mechanism - */ + * Utility Component for handling future retry mechanism + */ trait FutureRetryUtility { implicit class FutureRetry[T](f: => Future[T]) { - def retry(delay: FiniteDuration, - retries: Int)(implicit ec: ExecutionContext, s: Scheduler, log: LoggingAdapter): Future[T] = - f recoverWith { - case t if retries > 0 => log.warning("{}. Retrying...", t); after(delay, s)(retry(delay, retries - 1)) + def retry(delay: FiniteDuration, retries: Int)( + wasSuccessful: T => Boolean)(implicit ec: ExecutionContext, s: Scheduler, log: LoggingAdapter): Future[T] = + (for { + a <- f + result <- if (wasSuccessful(a) || retries < 1) Future(a) + else { log.warning("{}. Retrying...", a); after(delay, s)(retry(delay, retries - 1)(wasSuccessful)) } + } yield result) recoverWith { + case t if retries > 0 => + log.warning("{}. Retrying...", t); after(delay, s)(retry(delay, retries - 1)(wasSuccessful)) } } implicit class PipeToFutureRetry[T](f: => Future[T]) { - def pipeTo(delay: FiniteDuration, retries: Int, recipient: ActorRef)(implicit ec: ExecutionContext, - s: Scheduler, - log: LoggingAdapter, - sender: ActorRef = Actor.noSender) = - f.retry(delay, retries) andThen { + def pipeTo(delay: FiniteDuration, retries: Int, recipient: ActorRef)(wasSuccessful: T => Boolean = _ => true)( + implicit ec: ExecutionContext, + s: Scheduler, + log: LoggingAdapter, + sender: ActorRef = Actor.noSender) = + f.retry(delay, retries)(wasSuccessful) andThen { case Success(r) ⇒ recipient ! r case Failure(f) ⇒ recipient ! 
Status.Failure(f) } diff --git a/nsdb-core/src/test/scala/io/radicalbit/nsdb/util/FutureRetryUtilitySpec.scala b/nsdb-core/src/test/scala/io/radicalbit/nsdb/util/FutureRetryUtilitySpec.scala index 92dea9989..01bd5bc82 100644 --- a/nsdb-core/src/test/scala/io/radicalbit/nsdb/util/FutureRetryUtilitySpec.scala +++ b/nsdb-core/src/test/scala/io/radicalbit/nsdb/util/FutureRetryUtilitySpec.scala @@ -39,26 +39,27 @@ class FutureRetryUtilitySpec private final val retries: Int = 3 private def future(flag: Boolean) = - if (flag) Future.successful("Success") else Future.failed(new RuntimeException("Failure")) + if (flag) Future.successful(3) else Future.failed(new RuntimeException("Failure")) "retry function in FutureRetryUtility" must { "successfully returns whether, after retries, the future is eventually successful" in { - Await.result(future(true).retry(delay, retries), Duration.Inf) shouldBe "Success" + Await.result(future(true).retry(delay, retries)(_ > 2), Duration.Inf) shouldBe 3 } "thrown an Exception whether, after retries, the future eventually returns an Exception" in { - an[RuntimeException] shouldBe thrownBy(Await.result(future(false).retry(delay, retries), Duration.Inf)) + an[RuntimeException] shouldBe thrownBy(Await.result(future(false).retry(delay, retries)(_ => true), Duration.Inf)) } "consider the number of retries" in { val q = mutable.Queue(0) def future = { val nRetries = q.dequeue() - if (nRetries < 2) { q.enqueue(nRetries + 1); Future.failed(new RuntimeException) } - else { q.enqueue(nRetries + 1); Future.successful(nRetries) } + if (nRetries < 2) { q.enqueue(nRetries + 1); Future.failed(new RuntimeException) } else { + q.enqueue(nRetries + 1); Future.successful(nRetries) + } } - Await.result(future.retry(delay, retries), Duration.Inf) shouldBe 2 + Await.result(future.retry(delay, retries)(_ > 2), Duration.Inf) shouldBe 3 } } @@ -66,13 +67,13 @@ class FutureRetryUtilitySpec "returns a successful future and send the content of it through pipe" in { val testProbe = TestProbe("actor-test") - future(true).pipeTo(delay, retries, testProbe.testActor) - testProbe.expectMsg("Success") + future(true).pipeTo(delay, retries, testProbe.testActor)() + testProbe.expectMsg(3) } "return a failed future and send a status failure through pipe" in { val testProbe = TestProbe("actor-test") - future(false).pipeTo(delay, retries, testProbe.testActor) + future(false).pipeTo(delay, retries, testProbe.testActor)() testProbe.expectMsgAllClassOf(classOf[Status.Failure]) } } diff --git a/nsdb-it/src/main/resources/nsdb-minicluster.conf b/nsdb-it/src/main/resources/nsdb-minicluster.conf index 031746847..5d6a2254a 100644 --- a/nsdb-it/src/main/resources/nsdb-minicluster.conf +++ b/nsdb-it/src/main/resources/nsdb-minicluster.conf @@ -131,4 +131,12 @@ nsdb { //Websocket retention size retention-size = 10 } + + retry-policy { + delay = 1 second + delay = ${?RETRY_POLICY_DELAY} + + n-retries = 2 + n-retries = ${?RETRY_POLICY_N_RETRIES} + } } \ No newline at end of file
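
Note on the new retry API (not part of the patch): `FutureRetryUtility.retry` now takes a success predicate in a second parameter list, so a Future that completes with a value the caller does not consider successful is retried as well, not only a failed Future. Below is a minimal usage sketch under stated assumptions — the `ActorSystem` name and the `lookup` operation are illustrative only; the delay and retry count mirror the `nsdb.retry-policy` defaults introduced in `nsdb.conf`.

```scala
// Minimal sketch, assuming a local ActorSystem and a hypothetical `lookup` operation.
// It illustrates the new retry(delay, retries)(wasSuccessful) signature from this patch.
import akka.actor.{ActorSystem, Scheduler}
import akka.event.{Logging, LoggingAdapter}
import io.radicalbit.nsdb.util.FutureRetryUtility

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

object RetryUsageSketch extends FutureRetryUtility {

  // Hypothetical operation standing in for e.g. an AddLocations or metadata-removal call.
  def lookup(implicit ec: ExecutionContext): Future[Int] = Future(42)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem  = ActorSystem("retry-sketch")
    implicit val ec: ExecutionContext = system.dispatcher
    implicit val scheduler: Scheduler = system.scheduler
    implicit val log: LoggingAdapter  = Logging(system, getClass)

    // Retry up to 2 times with a 1 second delay (the nsdb.retry-policy defaults).
    // The second argument list is the success predicate: a completed Future whose
    // value does not satisfy it is retried, in addition to failed Futures.
    val result: Future[Int] = lookup.retry(1.second, 2)(_ > 0)

    result.foreach(r => log.info("got {}", r))
  }
}
```

The same predicate is threaded through `pipeTo(delay, retries, recipient)(wasSuccessful)`, which is why existing call sites in `FutureRetryUtilitySpec` now pass an extra empty argument list `()` to keep the default `_ => true` behaviour.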