Feature/add retry mechanism in cluster (#118)
* introduce test for MemberUp method

* plug method in MemberUp ClusterListener with retry mechanism

* minor

* aggregate failure cases in MemberUp method

* update retry function

* unit test for UnreachableMember msg

* apply retry policy into UnreachableMember handler

* modify await timing

* awaitAssert for ClusterListenerSpec, retry policy config

* test, remove cluster metrics extension

* sbt fix

* parametrize clusterMetricSystem in ClusterListener

* refactoring ClusterListener Props

* insert nsdb retry policy props for minicluster

* remove ActorSelection mock, minor
riccardo14 authored Feb 10, 2020
1 parent 57fbfb8 commit 60905f7
Showing 15 changed files with 314 additions and 91 deletions.
nsdb-cluster/src/main/resources/nsdb.conf (8 additions, 0 deletions)
@@ -148,4 +148,12 @@ nsdb {
retention-size = 10
retention-size = ${?WS_RETENTION_SIZE}
}

retry-policy {
delay = 1 second
delay = ${?RETRY_POLICY_DELAY}

n-retries = 2
n-retries = ${?RETRY_POLICY_N_RETRIES}
}
}
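
The new retry-policy block is consumed by ClusterListener further down in this diff via the Typesafe Config API. Below is a minimal, self-contained sketch of how these settings resolve; the inline HOCON mirrors the block added above, while the key paths "nsdb.retry-policy.delay" and "nsdb.retry-policy.n-retries" are assumptions (the commit reads them through the NSDbConfig.HighLevel constants retryPolicyDelay and retryPolicyNRetries, whose literal values are not shown here).

import java.util.concurrent.TimeUnit

import com.typesafe.config.ConfigFactory

import scala.concurrent.duration.FiniteDuration

object RetryPolicyConfigSketch extends App {

  // Same shape (and default values) as the block added to nsdb.conf above.
  val config = ConfigFactory.parseString(
    """
      |nsdb.retry-policy {
      |  delay = 1 second
      |  n-retries = 2
      |}
    """.stripMargin)

  // Key paths are assumptions; ClusterListener reads them through NSDbConfig.HighLevel constants.
  val delay: FiniteDuration =
    FiniteDuration(config.getDuration("nsdb.retry-policy.delay", TimeUnit.SECONDS), TimeUnit.SECONDS)
  val retries: Int = config.getInt("nsdb.retry-policy.n-retries")

  println(s"retry policy: delay = $delay, retries = $retries") // delay = 1 second, retries = 2
}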
@@ -27,10 +27,8 @@ import akka.management.scaladsl.AkkaManagement
import akka.util.Timeout
import io.radicalbit.nsdb.cluster.actor._

import scala.concurrent.ExecutionContextExecutor

/**
* Creates the top level actor [[DatabaseActorsGuardian]] and grpc endpoint [[GrpcEndpoint]] based on coordinators
* Creates the top level actor [[DatabaseActorsGuardian]] and grpc endpoint [[io.radicalbit.nsdb.cluster.endpoint.GrpcEndpoint]] based on coordinators
*/
trait NSDbActors {

@@ -39,8 +37,6 @@ trait NSDbActors {
implicit lazy val timeout: Timeout =
Timeout(system.settings.config.getDuration("nsdb.global.timeout", TimeUnit.SECONDS), TimeUnit.SECONDS)

implicit lazy val executionContext: ExecutionContextExecutor = system.dispatcher

def initTopLevelActors(): Unit = {
AkkaManagement(system).start()
ClusterBootstrap(system).start()
@@ -52,14 +48,15 @@
name = "databaseActorGuardian"
)

DistributedData(system).replicator

system.actorOf(
ClusterSingletonProxy.props(singletonManagerPath = "/user/databaseActorGuardian",
settings = ClusterSingletonProxySettings(system)),
name = "databaseActorGuardianProxy"
)

system.actorOf(Props[ClusterListener], name = s"cluster-listener_${createNodeName(Cluster(system).selfMember)}")
DistributedData(system).replicator

system.actorOf(ClusterListener.props(true),
name = s"cluster-listener_${createNodeName(Cluster(system).selfMember)}")
}
}
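
The guardian wiring now goes through ClusterListener.props(true) instead of Props[ClusterListener], making the flag that enables the cluster metrics extension explicit at the call site. A minimal, self-contained illustration of the same Props-factory pattern follows; the actor and object names here are invented for the example and are not part of the commit.

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical actor illustrating the pattern: constructor parameters are passed
// through a companion props method rather than relying on Props[...] and a
// no-argument constructor.
class FeatureToggledActor(featureEnabled: Boolean) extends Actor {

  override def preStart(): Unit =
    if (featureEnabled) context.system.log.info("optional subsystem enabled")

  def receive: Receive = { case msg => sender() ! msg }
}

object FeatureToggledActor {
  def props(featureEnabled: Boolean): Props = Props(new FeatureToggledActor(featureEnabled))
}

object PropsFactoryExample extends App {
  val system = ActorSystem("props-factory-example")

  // Production-style wiring enables the feature; a test can pass false instead,
  // mirroring ClusterListener.props(enableClusterMetricsExtension).
  system.actorOf(FeatureToggledActor.props(featureEnabled = true), "feature-toggled")

  system.terminate()
}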
@@ -17,42 +17,47 @@
package io.radicalbit.nsdb.cluster.actor

import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util.concurrent.TimeUnit

import akka.actor._
import akka.cluster.Cluster
import akka.cluster.{Cluster, Member}
import akka.cluster.ClusterEvent._
import akka.cluster.metrics.{ClusterMetricsChanged, ClusterMetricsExtension, Metric, NodeMetrics}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
import akka.event.LoggingAdapter
import akka.pattern.ask
import akka.remote.RemoteScope
import akka.util.Timeout
import io.radicalbit.nsdb.cluster.PubSubTopics._
import io.radicalbit.nsdb.cluster._
import io.radicalbit.nsdb.cluster.actor.ClusterListener.{DiskOccupationChanged, GetNodeMetrics, NodeMetricsGot}
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.commands.{AddLocations, RemoveNodeMetadata}
import io.radicalbit.nsdb.cluster.coordinator.MetadataCoordinator.events._
import io.radicalbit.nsdb.cluster.util.{ErrorManagementUtils, FileUtils}
import io.radicalbit.nsdb.cluster._
import io.radicalbit.nsdb.cluster.metrics.NSDbMetrics
import io.radicalbit.nsdb.model.LocationWithCoordinates
import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
import io.radicalbit.nsdb.cluster.util.{ErrorManagementUtils, FileUtils}
import io.radicalbit.nsdb.common.configuration.NSDbConfig.HighLevel._
import io.radicalbit.nsdb.common.protocol.NSDbSerializable
import io.radicalbit.nsdb.model.LocationWithCoordinates
import io.radicalbit.nsdb.protocol.MessageProtocol.Commands._
import io.radicalbit.nsdb.protocol.MessageProtocol.Events.{
CommitLogCoordinatorUnSubscribed,
MetricsDataActorUnSubscribed,
PublisherUnSubscribed
}
import io.radicalbit.nsdb.util.FutureRetryUtility

import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success, Try}
import scala.util.{Success, Try}

/**
* Actor subscribed to akka cluster events. It creates all the actors needed when a node joins the cluster
*/
class ClusterListener() extends Actor with ActorLogging {
class ClusterListener(enableClusterMetricsExtension: Boolean) extends Actor with ActorLogging with FutureRetryUtility {

import context.dispatcher

private lazy val cluster = Cluster(context.system)
private lazy val clusterMetricSystem = ClusterMetricsExtension(context.system)
@@ -63,8 +68,6 @@ class ClusterListener() extends Actor with ActorLogging {
private lazy val config = context.system.settings.config
private lazy val indexPath = config.getString(StorageIndexPath)

implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher

implicit val defaultTimeout: Timeout = Timeout(5.seconds)

/**
@@ -77,38 +80,69 @@
*/
private val nsdbMetrics: mutable.Map[String, Set[NodeMetrics]] = mutable.Map.empty

/**
* Retry policy
*/
private lazy val delay = FiniteDuration(config.getDuration(retryPolicyDelay, TimeUnit.SECONDS), TimeUnit.SECONDS)
private lazy val retries = config.getInt(retryPolicyNRetries)

override def preStart(): Unit = {
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
log.info("Created ClusterListener at path {} and subscribed to member events", self.path)
clusterMetricSystem.subscribe(self)
if (enableClusterMetricsExtension) clusterMetricSystem.subscribe(self)
mediator ! Subscribe(NSDB_METRICS_TOPIC, self)

}

override def postStop(): Unit = cluster.unsubscribe(self)

protected def createNodeActorsGuardian(): ActorRef =
context.system.actorOf(
NodeActorsGuardian.props(self).withDeploy(Deploy(scope = RemoteScope(cluster.selfMember.address))),
name = s"guardian_$selfNodeName"
)

protected def retrieveLocationsToAdd: List[LocationWithCoordinates] =
FileUtils.getLocationsFromFilesystem(indexPath, selfNodeName)

protected def onSuccessBehaviour(readCoordinator: ActorRef,
writeCoordinator: ActorRef,
metadataCoordinator: ActorRef,
publisherActor: ActorRef): Unit =
new NsdbNodeEndpoint(readCoordinator, writeCoordinator, metadataCoordinator, publisherActor)(context.system)

protected def onFailureBehaviour(member: Member, error: Any): Unit = {
log.error("received wrong response {}", error)
cluster.leave(member.address)
}

protected def onRemoveNodeMetadataResponse: RemoveNodeMetadataResponse => Unit = {
case NodeMetadataRemoved(nodeName) =>
log.info(s"metadata successfully removed for node $nodeName")
case RemoveNodeMetadataFailed(nodeName) =>
log.error(s"RemoveNodeMetadataFailed for node $nodeName")
}

def receive: Receive = {
case MemberUp(member) if member == cluster.selfMember =>
log.info("Member is Up: {}", member.address)

val nodeName = createNodeName(member)

val nodeActorsGuardian =
context.system.actorOf(NodeActorsGuardian.props(self).withDeploy(Deploy(scope = RemoteScope(member.address))),
name = s"guardian_$nodeName")
val nodeActorsGuardian = createNodeActorsGuardian()

(nodeActorsGuardian ? GetNodeChildActors)
.map {
case NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, publisherActor) =>
mediator ! Subscribe(NODE_GUARDIANS_TOPIC, nodeActorsGuardian)

val locationsToAdd: Seq[LocationWithCoordinates] =
FileUtils.getLocationsFromFilesystem(indexPath, nodeName)
val locationsToAdd: Seq[LocationWithCoordinates] = retrieveLocationsToAdd

val locationsGroupedBy: Map[(String, String), Seq[LocationWithCoordinates]] = locationsToAdd.groupBy {
case LocationWithCoordinates(database, namespace, _) => (database, namespace)
}

implicit val scheduler: Scheduler = context.system.scheduler
implicit val _log: LoggingAdapter = log

Future
.sequence {
locationsGroupedBy.map {
@@ -119,44 +153,45 @@
}
}
.map(ErrorManagementUtils.partitionResponses[LocationsAdded, AddLocationsFailed])
.retry(delay, retries) {
case (_, addLocationsFailedList) => addLocationsFailedList.isEmpty
}
.onComplete {
case Success((_, failures)) if failures.isEmpty =>
new NsdbNodeEndpoint(readCoordinator, writeCoordinator, metadataCoordinator, publisherActor)(
context.system)
case Success((_, failures)) =>
log.error(s" failures $failures")
cluster.leave(member.address)
case Failure(ex) =>
log.error(s" failure", ex)
cluster.leave(member.address)
onSuccessBehaviour(readCoordinator, writeCoordinator, metadataCoordinator, publisherActor)
case e =>
onFailureBehaviour(member, e)
}
case unknownResponse =>
log.error(s"unknown response from nodeActorsGuardian ? GetNodeChildActors $unknownResponse")
}
case UnreachableMember(member) =>
log.info("Member detected as unreachable: {}", member)

val nodeName = createNodeName(member)

implicit val scheduler: Scheduler = context.system.scheduler
implicit val _log: LoggingAdapter = log

(for {
NodeChildActorsGot(metadataCoordinator, writeCoordinator, readCoordinator, _) <- (context.actorSelection(
s"/user/guardian_$selfNodeName") ? GetNodeChildActors).mapTo[NodeChildActorsGot]
_ <- (readCoordinator ? UnsubscribeMetricsDataActor(createNodeName(member))).mapTo[MetricsDataActorUnSubscribed]
_ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(createNodeName(member)))
s"/user/guardian_$selfNodeName") ? GetNodeChildActors)
.mapTo[NodeChildActorsGot]
_ <- (readCoordinator ? UnsubscribeMetricsDataActor(nodeName)).mapTo[MetricsDataActorUnSubscribed]
_ <- (writeCoordinator ? UnSubscribeCommitLogCoordinator(nodeName))
.mapTo[CommitLogCoordinatorUnSubscribed]
_ <- (writeCoordinator ? UnSubscribePublisher(createNodeName(member))).mapTo[PublisherUnSubscribed]
_ <- (writeCoordinator ? UnsubscribeMetricsDataActor(createNodeName(member)))
_ <- (writeCoordinator ? UnSubscribePublisher(nodeName)).mapTo[PublisherUnSubscribed]
_ <- (writeCoordinator ? UnsubscribeMetricsDataActor(nodeName))
.mapTo[MetricsDataActorUnSubscribed]
_ <- (metadataCoordinator ? UnsubscribeMetricsDataActor(createNodeName(member)))
_ <- (metadataCoordinator ? UnsubscribeMetricsDataActor(nodeName))
.mapTo[MetricsDataActorUnSubscribed]
_ <- (metadataCoordinator ? UnSubscribeCommitLogCoordinator(createNodeName(member)))
_ <- (metadataCoordinator ? UnSubscribeCommitLogCoordinator(nodeName))
.mapTo[CommitLogCoordinatorUnSubscribed]
removeNodeMetadataResponse <- (metadataCoordinator ? RemoveNodeMetadata(createNodeName(member)))
removeNodeMetadataResponse <- (metadataCoordinator ? RemoveNodeMetadata(nodeName))
.mapTo[RemoveNodeMetadataResponse]
} yield removeNodeMetadataResponse).map {
case NodeMetadataRemoved(nodeName) =>
log.info(s"metadata successfully removed for node $nodeName")
case RemoveNodeMetadataFailed(nodeName) =>
log.error(s"RemoveNodeMetadataFailed for node $nodeName")
}
} yield removeNodeMetadataResponse)
.retry(delay, retries)(_.isInstanceOf[NodeMetadataRemoved])
.map(onRemoveNodeMetadataResponse)

case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}", member.address, previousStatus)
@@ -215,4 +250,6 @@ object ClusterListener {
*/
case class NodeMetricsGot(nodeMetrics: Set[NodeMetrics]) extends NSDbSerializable

def props(enableClusterMetricsExtension: Boolean) = Props(new ClusterListener(enableClusterMetricsExtension))

}
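
Both the MemberUp and UnreachableMember handlers above lean on the .retry(delay, retries)(predicate) combinator mixed in from FutureRetryUtility, whose implementation is not part of this excerpt. Below is a minimal sketch of such a retry-on-predicate helper, written as a plain method and assuming the attempt can be re-run as a thunk; the real utility exposes it as an extension on the future chain and may also retry failed futures, which this sketch does not.

import akka.actor.Scheduler
import akka.event.LoggingAdapter
import akka.pattern.after

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

object FutureRetrySketch {

  // Re-runs `attempt` until `wasSuccessful` holds or the retry budget is spent,
  // waiting `delay` between attempts via the Akka scheduler.
  def retry[T](attempt: () => Future[T], delay: FiniteDuration, retries: Int)(wasSuccessful: T => Boolean)(
      implicit ec: ExecutionContext,
      scheduler: Scheduler,
      log: LoggingAdapter): Future[T] =
    attempt().flatMap { result =>
      if (wasSuccessful(result) || retries <= 0) Future.successful(result)
      else {
        log.warning("retry predicate not satisfied, retrying in {} ({} retries left)", delay, retries - 1)
        after(delay, scheduler)(retry(attempt, delay, retries - 1)(wasSuccessful))
      }
    }
}

In this sketch, with the delay = 1 second and n-retries = 2 defaults added to nsdb.conf, an unsatisfied predicate leads to at most two additional attempts before the final result is handed back to the caller.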
@@ -85,9 +85,9 @@ class MetadataCoordinator(clusterListener: ActorRef, metadataCache: ActorRef, sc
private val metricsDataActors: mutable.Map[String, ActorRef] = mutable.Map.empty
private val commitLogCoordinators: mutable.Map[String, ActorRef] = mutable.Map.empty

private def getShardStartIstant(timestamp: Long, shardInterval: Long) = (timestamp / shardInterval) * shardInterval
private def getShardStartInstant(timestamp: Long, shardInterval: Long) = (timestamp / shardInterval) * shardInterval

private def getShardEndIstant(startShard: Long, shardInterval: Long) = startShard + shardInterval
private def getShardEndInstant(startShard: Long, shardInterval: Long) = startShard + shardInterval

private def performAddLocationIntoCache(db: String, namespace: String, locations: Seq[Location]) =
Future
@@ -435,8 +435,8 @@ class MetadataCoordinator(clusterListener: ActorRef, metadataCache: ActorRef, sc
shardInterval <- getShardInterval(db, namespace, metric)
nodeMetrics <- (clusterListener ? GetNodeMetrics).mapTo[NodeMetricsGot]
addLocationResult <- {
val start = getShardStartIstant(timestamp, shardInterval)
val end = getShardEndIstant(start, shardInterval)
val start = getShardStartInstant(timestamp, shardInterval)
val end = getShardEndInstant(start, shardInterval)

val nodes =
if (nodeMetrics.nodeMetrics.nonEmpty)
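
The renamed getShardStartInstant/getShardEndInstant helpers floor the timestamp to a multiple of the shard interval and add one interval for the end bound. A tiny worked example with illustrative values (not taken from the commit):

object ShardBoundsExample extends App {

  // Same integer arithmetic as getShardStartInstant / getShardEndInstant above.
  def getShardStartInstant(timestamp: Long, shardInterval: Long): Long = (timestamp / shardInterval) * shardInterval
  def getShardEndInstant(startShard: Long, shardInterval: Long): Long  = startShard + shardInterval

  val shardInterval = 3600000L       // 1 hour in milliseconds (illustrative value)
  val timestamp     = 1581340000000L // an event timestamp in milliseconds

  val start = getShardStartInstant(timestamp, shardInterval) // 1581339600000
  val end   = getShardEndInstant(start, shardInterval)       // 1581343200000

  println(s"shard interval for $timestamp = [$start, $end)")
}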
@@ -31,11 +31,11 @@ package object cluster {
* @param nodeName the node name [host]_[port]
*/
def createAddress(nodeName: String): Address = {
val splittedNodeName = nodeName.split("_")
val splitNodeName = nodeName.split("_")
Address("nsdb",
"NSDb",
Option(splittedNodeName(0)).getOrElse("noHost"),
Option(splittedNodeName(1)).map(_.toInt).getOrElse(0))
Option(splitNodeName(0)).getOrElse("noHost"),
Option(splitNodeName(1)).map(_.toInt).getOrElse(0))
}

final object PubSubTopics {
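
The renamed createAddress helper relies on the [host]_[port] node-name convention. A small usage sketch; the helper body is copied from the diff above so the example runs on its own.

import akka.actor.Address

object CreateAddressExample extends App {

  // Copied from the package object above: node names follow the [host]_[port] convention.
  def createAddress(nodeName: String): Address = {
    val splitNodeName = nodeName.split("_")
    Address("nsdb",
            "NSDb",
            Option(splitNodeName(0)).getOrElse("noHost"),
            Option(splitNodeName(1)).map(_.toInt).getOrElse(0))
  }

  // Prints: nsdb://NSDb@127.0.0.1:2552
  println(createAddress("127.0.0.1_2552"))
}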