Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change prewarm container #4225

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ controller:
authentication:
spi: "{{ controller_authentication_spi | default('') }}"
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ controller_username | default(operation.username) }}"
password: "{{ controller_password | default(operation.password) }}"
entitlement:
spi: "{{ controller_entitlement_spi | default('') }}"
protocol: "{{ controller_protocol | default('https') }}"
Expand Down Expand Up @@ -183,6 +185,8 @@ invoker:
docker:
become: "{{ invoker_docker_become | default(false) }}"
loglevel: "{{ invoker_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ invoker_username | default(operation.username) }}"
password: "{{ invoker_password | default(operation.password) }}"
jmxremote:
jvmArgs: "{% if inventory_hostname in groups['invokers'] %}
{{ jmx.jvmCommonArgs }} -Djava.rmi.server.hostname={{ invokerHostname }} -Dcom.sun.management.jmxremote.rmi.port={{ jmx.rmiBasePortInvoker + groups['invokers'].index(inventory_hostname) }} -Dcom.sun.management.jmxremote.port={{ jmx.basePortInvoker + groups['invokers'].index(inventory_hostname) }}
Expand Down Expand Up @@ -379,3 +383,8 @@ metrics:
port: "{{ metrics_kamon_statsd_port | default('8125') }}"

user_events: "{{ user_events_enabled | default(false) }}"

# Operation configuation
operation:
username: "{{ operation_username | default('admin') }}"
password: "{{ operation_password | default('admin') }}"
12 changes: 12 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@
dest: "{{ controller.confdir }}/{{ controller_name }}/jmxremote.access"
mode: 0777

- name: copy controller auth username file
template:
src: "controllerauth.username.j2"
dest: "{{ controller.confdir }}/{{ controller_name }}/controllerauth.username"
mode: 0777

- name: copy controller auth password file
template:
src: "controllerauth.password.j2"
dest: "{{ controller.confdir }}/{{ controller_name }}/controllerauth.password"
mode: 0777

- name: "copy kafka truststore/keystore"
when: kafka.protocol == 'SSL'
copy:
Expand Down
12 changes: 12 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@
dest: "{{ invoker.confdir }}/{{ invoker_name }}/jmxremote.access"
mode: 0777

- name: copy invoker auth username file
template:
src: "invokerauth.username.j2"
dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/invokerauth.username"
mode: 0777

- name: copy invoker auth password file
template:
src: "invokerauth.password.j2"
dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/invokerauth.password"
mode: 0777

- name: add additional jvm params if jmxremote is enabled
when: jmx.enabled
set_fact:
Expand Down
1 change: 1 addition & 0 deletions ansible/templates/controllerauth.password.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ controller.password }}
1 change: 1 addition & 0 deletions ansible/templates/controllerauth.username.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ controller.username }}
1 change: 1 addition & 0 deletions ansible/templates/invokerauth.password.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ invoker.password }}
1 change: 1 addition & 0 deletions ansible/templates/invokerauth.username.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{{ invoker.username }}
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,12 @@ object EventMessage extends DefaultJsonProtocol {

def parse(msg: String) = format.read(msg.parseJson)
}

case class RuntimeMessage(runtime: String, operType: String) extends Message {
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
}

object RuntimeMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime", "operType")
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ protected[core] object ExecManifest {
mf
}

protected[core] def initializePrewarm(prewarmRuntime: String): Try[Runtimes] = {
val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes)
val mf = Try(prewarmRuntime.parseJson.asJsObject).flatMap(runtimes(_, rmc))
var manifest: Option[Runtimes] = None
mf.foreach(m => manifest = Some(m))
mf
}

/**
* Gets existing runtime manifests.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
Expand Down Expand Up @@ -78,6 +79,17 @@ class Controller(val instance: ControllerInstanceId,
implicit val logging: Logging)
extends BasicRasService {

val controllerUsername = {
val source = scala.io.Source.fromFile("/conf/controllerauth.username");
try source.mkString.replaceAll("\r|\n", "")
finally source.close()
}
val controllerPassword = {
val source = scala.io.Source.fromFile("/conf/controllerauth.password");
try source.mkString.replaceAll("\r|\n", "")
finally source.close()
}

TransactionId.controller.mark(
this,
LoggingMarkers.CONTROLLER_STARTUP(instance.asString),
Expand All @@ -95,7 +107,7 @@ class Controller(val instance: ControllerInstanceId,
(pathEndOrSingleSlash & get) {
complete(info)
}
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ changeRuntime
}

// initialize datastores
Expand Down Expand Up @@ -154,6 +166,38 @@ class Controller(val instance: ControllerInstanceId,
}
}

/**
* Handles POST/DELETE prewarm container
*/
private val changeRuntime = {
implicit val executionContext = actorSystem.dispatcher
(path("prewarmContainer") & (post | delete)) {
extractCredentials {
case Some(BasicHttpCredentials(username, password)) =>
if (username == controllerUsername && password == controllerPassword) {
extractMethod { method =>
entity(as[String]) { prewarmRuntime =>
val execManifest = ExecManifest.initializePrewarm(prewarmRuntime)
if (execManifest.isFailure) {
logging.error(
this,
s"Received invalid prewarm runtimes manifest with ${method.name} request:${execManifest.failed.get}")
complete(s"Received invalid prewarm runtimes manifest with ${method.name} request")
} else {
logging.info(this, s"Received valid prewarm runtimes manifest with ${method.name} request")
loadBalancer.sendRuntimeToManageInvoker(prewarmRuntime, method.name)
complete(s"Change prewarm container request with ${method.name} is already sent to managed invokers")
}
}
}
} else {
complete("username or password is wrong")
}
case _ => complete(StatusCodes.Unauthorized)
}
}
}

// controller top level info
private val info = Controller.info(
whiskConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ trait LoadBalancer {
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]

/**
* send runtime to all managed invokers
*
* @param runtime
* @param operType add runtime or delete runtime
*/
def sendRuntimeToManageInvoker(runtime: String, operType: String): Unit

/**
* Returns a message indicating the health of the containers and/or container pool in general.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ class ShardingContainerPoolBalancer(
override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue())
override def clusterSize: Int = schedulingState.clusterSize

/** send runtime to all managed invokers*/
override def sendRuntimeToManageInvoker(runtime: String, operType: String): Unit = {
schedulingState.managedInvokers foreach { invokerHealth =>
sendRuntimeToInvoker(messageProducer, runtime, invokerHealth.id, operType)
}
}

/** 1. Publish a message to the loadbalancer */
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
Expand Down Expand Up @@ -423,6 +430,21 @@ class ShardingContainerPoolBalancer(
}
}

/** Send the runtime to the invoker */
private def sendRuntimeToInvoker(producer: MessageProducer,
runtime: String,
invoker: InvokerInstanceId,
operType: String): Future[RecordMetadata] = {
val topic = s"invoker${invoker.toInt}"
val runtimeMessage = RuntimeMessage(runtime, operType)
producer.send(topic, runtimeMessage).andThen {
case Success(_) =>
logging.info(this, s"Successfully posted runtime: ${runtime} to topic $topic")
case Failure(_) =>
logging.error(this, s"Failed posted runtime: ${runtime} to topic $topic")
}
}

/**
* Subscribes to active acks (completion messages from the invokers), and
* registers a handler for received active acks from invokers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import spray.json.{JsNumber, JsObject}

import scala.collection.immutable
import scala.concurrent.duration._
Expand All @@ -32,6 +33,9 @@ case object Busy extends WorkerState
case object Free extends WorkerState

case class WorkerData(data: ContainerData, state: WorkerState)
case class AddPreWarmConfigList(list: List[PrewarmingConfig])
case class DeletePreWarmConfigList(list: List[PrewarmingConfig])
case class PreWarmConfig(kind: String, memory: ByteSize)

/**
* A pool managing containers to run actions on.
Expand Down Expand Up @@ -93,6 +97,30 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}

def receive: Receive = {
case prewarmConfigList: AddPreWarmConfigList =>
prewarmConfigList.list foreach { config =>
logging.info(this, s"add extra pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.count).foreach { _ =>
prewarmContainer(config.exec, config.memoryLimit)
}
}

case prewarmConfigList: DeletePreWarmConfigList =>
prewarmConfigList.list foreach { config =>
logging.info(this, s"delete pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.count).foreach { _ =>
deletePrewarmContainer(config.exec.kind, config.memoryLimit)
}
}

case prewarmConfig: PreWarmConfig =>
val numberResponse = {
JsObject("number" -> JsNumber(getPrewarmContainerNumber(prewarmConfig.kind, prewarmConfig.memory)))
}
sender() ! numberResponse

// A job to run on a container
//
// Run messages are received either via the feed or from child containers which cannot process
Expand Down Expand Up @@ -293,6 +321,38 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
}

/**
* Delete the prewarm container
* @param kind
* @param memory
* @return
*/
def deletePrewarmContainer(kind: String, memory: ByteSize) = {
prewarmedPool
.find {
case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
case _ => false
}
.map {
case (ref, data) =>
ref ! Remove
prewarmedPool = prewarmedPool - ref
}
}

/**
* get the prewarm container number
* @param kind
* @param memory
* @return
*/
def getPrewarmContainerNumber(kind: String, memory: ByteSize) = {
prewarmedPool.filter {
case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
case _ => false
}.size
}

/** Removes a container and updates state accordingly. */
def removeContainer(toDelete: ActorRef) = {
toDelete ! Remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object Invoker {

protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")

var invokerReactive: Option[InvokerReactive] = None

/**
* An object which records the environment variables required for this component to run.
*/
Expand Down Expand Up @@ -157,7 +159,7 @@ object Invoker {
}
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
val invoker = try {
new InvokerReactive(config, invokerInstance, producer, poolConfig)
invokerReactive = Some(new InvokerReactive(config, invokerInstance, producer, poolConfig))
} catch {
case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
}
Expand Down
Loading