Skip to content

Commit

Permalink
Change prewarm container
Browse files Browse the repository at this point in the history
- Add prewarm container
- Delete prewarm container
- Add test case
  • Loading branch information
ningyougang committed Jan 31, 2019
1 parent b3a3ef4 commit 36ceff8
Show file tree
Hide file tree
Showing 18 changed files with 476 additions and 66 deletions.
9 changes: 9 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ controller:
authentication:
spi: "{{ controller_authentication_spi | default('') }}"
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ controller_username | default(operation.username) }}"
password: "{{ controller_password | default(operation.password) }}"
entitlement:
spi: "{{ controller_entitlement_spi | default('') }}"
protocol: "{{ controller_protocol | default('https') }}"
Expand Down Expand Up @@ -183,6 +185,8 @@ invoker:
docker:
become: "{{ invoker_docker_become | default(false) }}"
loglevel: "{{ invoker_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ invoker_username | default(operation.username) }}"
password: "{{ invoker_password | default(operation.password) }}"
jmxremote:
jvmArgs: "{% if inventory_hostname in groups['invokers'] %}
{{ jmx.jvmCommonArgs }} -Djava.rmi.server.hostname={{ invokerHostname }} -Dcom.sun.management.jmxremote.rmi.port={{ jmx.rmiBasePortInvoker + groups['invokers'].index(inventory_hostname) }} -Dcom.sun.management.jmxremote.port={{ jmx.basePortInvoker + groups['invokers'].index(inventory_hostname) }}
Expand Down Expand Up @@ -379,3 +383,8 @@ metrics:
port: "{{ metrics_kamon_statsd_port | default('8125') }}"

user_events: "{{ user_events_enabled | default(false) }}"

# Operation configuation
operation:
username: "{{ operation_username | default('admin') }}"
password: "{{ operation_password | default('admin') }}"
12 changes: 12 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@
dest: "{{ controller.confdir }}/{{ controller_name }}/jmxremote.access"
mode: 0777

- name: copy controller auth username file
template:
src: "controllerauth.username.j2"
dest: "{{ controller.confdir }}/{{ controller_name }}/controllerauth.username"
mode: 0777

- name: copy controller auth password file
template:
src: "controllerauth.password.j2"
dest: "{{ controller.confdir }}/{{ controller_name }}/controllerauth.password"
mode: 0777

- name: "copy kafka truststore/keystore"
when: kafka.protocol == 'SSL'
copy:
Expand Down
12 changes: 12 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@
dest: "{{ invoker.confdir }}/{{ invoker_name }}/jmxremote.access"
mode: 0777

- name: copy invoker auth username file
template:
src: "invokerauth.username.j2"
dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/invokerauth.username"
mode: 0777

- name: copy invoker auth password file
template:
src: "invokerauth.password.j2"
dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/invokerauth.password"
mode: 0777

- name: add additional jvm params if jmxremote is enabled
when: jmx.enabled
set_fact:
Expand Down
1 change: 1 addition & 0 deletions ansible/templates/controllerauth.password.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ controller.password }}
1 change: 1 addition & 0 deletions ansible/templates/controllerauth.username.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ controller.username }}
1 change: 1 addition & 0 deletions ansible/templates/invokerauth.password.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ invoker.password }}
1 change: 1 addition & 0 deletions ansible/templates/invokerauth.username.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ invoker.username }}
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,12 @@ object EventMessage extends DefaultJsonProtocol {

def parse(msg: String) = format.read(msg.parseJson)
}

case class RuntimeMessage(runtime: String, operType: String) extends Message {
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
}

object RuntimeMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime", "operType")
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ protected[core] object ExecManifest {
mf
}

protected[core] def initializePrewarm(prewarmRuntime: String): Try[Runtimes] = {
val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes)
val mf = Try(prewarmRuntime.parseJson.asJsObject).flatMap(runtimes(_, rmc))
var manifest: Option[Runtimes] = None
mf.foreach(m => manifest = Some(m))
mf
}

/**
* Gets existing runtime manifests.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
Expand Down Expand Up @@ -78,6 +79,17 @@ class Controller(val instance: ControllerInstanceId,
implicit val logging: Logging)
extends BasicRasService {

val controllerUsername = {
val source = scala.io.Source.fromFile("/conf/controllerauth.username");
try source.mkString.replaceAll("\r|\n", "")
finally source.close()
}
val controllerPassword = {
val source = scala.io.Source.fromFile("/conf/controllerauth.password");
try source.mkString.replaceAll("\r|\n", "")
finally source.close()
}

TransactionId.controller.mark(
this,
LoggingMarkers.CONTROLLER_STARTUP(instance.asString),
Expand All @@ -95,7 +107,7 @@ class Controller(val instance: ControllerInstanceId,
(pathEndOrSingleSlash & get) {
complete(info)
}
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ changeRuntime
}

// initialize datastores
Expand Down Expand Up @@ -154,6 +166,38 @@ class Controller(val instance: ControllerInstanceId,
}
}

/**
* Handles POST/DELETE prewarm container
*/
private val changeRuntime = {
implicit val executionContext = actorSystem.dispatcher
(path("prewarmContainer") & (post | delete)) {
extractCredentials {
case Some(BasicHttpCredentials(username, password)) =>
if (username == controllerUsername && password == controllerPassword) {
extractMethod { method =>
entity(as[String]) { prewarmRuntime =>
val execManifest = ExecManifest.initializePrewarm(prewarmRuntime)
if (execManifest.isFailure) {
logging.error(
this,
s"Received invalid prewarm runtimes manifest with ${method.name} request:${execManifest.failed.get}")
complete(s"Received invalid prewarm runtimes manifest with ${method.name} request")
} else {
logging.info(this, s"Received valid prewarm runtimes manifest with ${method.name} request")
loadBalancer.sendRuntimeToManageInvoker(prewarmRuntime, method.name)
complete(s"Change prewarm container request with ${method.name} is already sent to managed invokers")
}
}
}
} else {
complete("username or password is wrong")
}
case _ => complete(StatusCodes.Unauthorized)
}
}
}

// controller top level info
private val info = Controller.info(
whiskConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ trait LoadBalancer {
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]

/**
* send runtime to all managed invokers
*
* @param runtime
* @param operType add runtime or delete runtime
*/
def sendRuntimeToManageInvoker(runtime: String, operType: String): Unit

/**
* Returns a message indicating the health of the containers and/or container pool in general.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ class ShardingContainerPoolBalancer(
override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue())
override def clusterSize: Int = schedulingState.clusterSize

/** send runtime to all managed invokers*/
override def sendRuntimeToManageInvoker(runtime: String, operType: String): Unit = {
schedulingState.managedInvokers foreach { invokerHealth =>
sendRuntimeToInvoker(messageProducer, runtime, invokerHealth.id, operType)
}
}

/** 1. Publish a message to the loadbalancer */
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
Expand Down Expand Up @@ -423,6 +430,21 @@ class ShardingContainerPoolBalancer(
}
}

/** Send the runtime to the invoker */
private def sendRuntimeToInvoker(producer: MessageProducer,
runtime: String,
invoker: InvokerInstanceId,
operType: String): Future[RecordMetadata] = {
val topic = s"invoker${invoker.toInt}"
val runtimeMessage = RuntimeMessage(runtime, operType)
producer.send(topic, runtimeMessage).andThen {
case Success(_) =>
logging.info(this, s"Successfully posted runtime: ${runtime} to topic $topic")
case Failure(_) =>
logging.error(this, s"Failed posted runtime: ${runtime} to topic $topic")
}
}

/**
* Subscribes to active acks (completion messages from the invokers), and
* registers a handler for received active acks from invokers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import spray.json.{JsNumber, JsObject}

import scala.collection.immutable
import scala.concurrent.duration._
Expand All @@ -32,6 +33,9 @@ case object Busy extends WorkerState
case object Free extends WorkerState

case class WorkerData(data: ContainerData, state: WorkerState)
case class AddPreWarmConfigList(list: List[PrewarmingConfig])
case class DeletePreWarmConfigList(list: List[PrewarmingConfig])
case class PreWarmConfig(kind: String, memory: ByteSize)

/**
* A pool managing containers to run actions on.
Expand Down Expand Up @@ -93,6 +97,30 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}

def receive: Receive = {
case prewarmConfigList: AddPreWarmConfigList =>
prewarmConfigList.list foreach { config =>
logging.info(this, s"add extra pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.count).foreach { _ =>
prewarmContainer(config.exec, config.memoryLimit)
}
}

case prewarmConfigList: DeletePreWarmConfigList =>
prewarmConfigList.list foreach { config =>
logging.info(this, s"delete pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.count).foreach { _ =>
deletePrewarmContainer(config.exec.kind, config.memoryLimit)
}
}

case prewarmConfig: PreWarmConfig =>
val numberResponse = {
JsObject("number" -> JsNumber(getPrewarmContainerNumber(prewarmConfig.kind, prewarmConfig.memory)))
}
sender() ! numberResponse

// A job to run on a container
//
// Run messages are received either via the feed or from child containers which cannot process
Expand Down Expand Up @@ -293,6 +321,38 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
}

/**
* Delete the prewarm container
* @param kind
* @param memory
* @return
*/
def deletePrewarmContainer(kind: String, memory: ByteSize) = {
prewarmedPool
.find {
case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
case _ => false
}
.map {
case (ref, data) =>
ref ! Remove
prewarmedPool = prewarmedPool - ref
}
}

/**
* get the prewarm container number
* @param kind
* @param memory
* @return
*/
def getPrewarmContainerNumber(kind: String, memory: ByteSize) = {
prewarmedPool.filter {
case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
case _ => false
}.size
}

/** Removes a container and updates state accordingly. */
def removeContainer(toDelete: ActorRef) = {
toDelete ! Remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object Invoker {

protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")

var invokerReactive: Option[InvokerReactive] = None

/**
* An object which records the environment variables required for this component to run.
*/
Expand Down Expand Up @@ -157,7 +159,7 @@ object Invoker {
}
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
val invoker = try {
new InvokerReactive(config, invokerInstance, producer, poolConfig)
invokerReactive = Some(new InvokerReactive(config, invokerInstance, producer, poolConfig))
} catch {
case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
}
Expand Down
Loading

0 comments on commit 36ceff8

Please sign in to comment.