diff --git a/ansible/group_vars/all b/ansible/group_vars/all index a80620016ec..86b7b024c78 100755 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -88,6 +88,8 @@ controller: authentication: spi: "{{ controller_authentication_spi | default('') }}" loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}" + username: "{{ controller_username | default(operation.username) }}" + password: "{{ controller_password | default(operation.password) }}" entitlement: spi: "{{ controller_entitlement_spi | default('') }}" protocol: "{{ controller_protocol | default('https') }}" @@ -183,6 +185,8 @@ invoker: docker: become: "{{ invoker_docker_become | default(false) }}" loglevel: "{{ invoker_loglevel | default(whisk_loglevel) | default('INFO') }}" + username: "{{ invoker_username | default(operation.username) }}" + password: "{{ invoker_password | default(operation.password) }}" jmxremote: jvmArgs: "{% if inventory_hostname in groups['invokers'] %} {{ jmx.jvmCommonArgs }} -Djava.rmi.server.hostname={{ invokerHostname }} -Dcom.sun.management.jmxremote.rmi.port={{ jmx.rmiBasePortInvoker + groups['invokers'].index(inventory_hostname) }} -Dcom.sun.management.jmxremote.port={{ jmx.basePortInvoker + groups['invokers'].index(inventory_hostname) }} @@ -379,3 +383,8 @@ metrics: port: "{{ metrics_kamon_statsd_port | default('8125') }}" user_events: "{{ user_events_enabled | default(false) }}" + +# Operation configuation +operation: + username: "{{ operation_username | default('admin') }}" + password: "{{ operation_password | default('admin') }}" diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index e13b6f93ce0..e09c78dd29e 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -57,6 +57,18 @@ dest: "{{ controller.confdir }}/{{ controller_name }}/jmxremote.access" mode: 0777 +- name: copy controller auth username file + template: + src: "controllerauth.username.j2" + dest: "{{ controller.confdir }}/{{ controller_name }}/controllerauth.username" + mode: 0777 + +- name: copy controller auth password file + template: + src: "controllerauth.password.j2" + dest: "{{ controller.confdir }}/{{ controller_name }}/controllerauth.password" + mode: 0777 + - name: "copy kafka truststore/keystore" when: kafka.protocol == 'SSL' copy: diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 9e37b0308b1..3c816a7db40 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -159,6 +159,18 @@ dest: "{{ invoker.confdir }}/{{ invoker_name }}/jmxremote.access" mode: 0777 +- name: copy invoker auth username file + template: + src: "invokerauth.username.j2" + dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/invokerauth.username" + mode: 0777 + +- name: copy invoker auth password file + template: + src: "invokerauth.password.j2" + dest: "{{ invoker.confdir }}/invoker{{ groups['invokers'].index(inventory_hostname) }}/invokerauth.password" + mode: 0777 + - name: add additional jvm params if jmxremote is enabled when: jmx.enabled set_fact: diff --git a/ansible/templates/controllerauth.password.j2 b/ansible/templates/controllerauth.password.j2 new file mode 100644 index 00000000000..46e7f119989 --- /dev/null +++ b/ansible/templates/controllerauth.password.j2 @@ -0,0 +1 @@ +{{ controller.password }} diff --git a/ansible/templates/controllerauth.username.j2 b/ansible/templates/controllerauth.username.j2 new file mode 100644 index 00000000000..7739661801f --- /dev/null +++ b/ansible/templates/controllerauth.username.j2 @@ -0,0 +1 @@ +{{ controller.username }} diff --git a/ansible/templates/invokerauth.password.j2 b/ansible/templates/invokerauth.password.j2 new file mode 100644 index 00000000000..2d31b481cd3 --- /dev/null +++ b/ansible/templates/invokerauth.password.j2 @@ -0,0 +1 @@ +{{ invoker.password }} diff --git a/ansible/templates/invokerauth.username.j2 b/ansible/templates/invokerauth.username.j2 new file mode 100644 index 00000000000..cd915356156 --- /dev/null +++ b/ansible/templates/invokerauth.username.j2 @@ -0,0 +1 @@ +{{ invoker.username }} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index 07277cff509..a074b4828cc 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -282,3 +282,12 @@ object EventMessage extends DefaultJsonProtocol { def parse(msg: String) = format.read(msg.parseJson) } + +case class RuntimeMessage(runtime: String, operType: String) extends Message { + override def serialize = RuntimeMessage.serdes.write(this).compactPrint +} + +object RuntimeMessage extends DefaultJsonProtocol { + def parse(msg: String) = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime", "operType") +} diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala index 33439c5b28b..3ef0845cbc5 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala @@ -53,6 +53,14 @@ protected[core] object ExecManifest { mf } + protected[core] def initializePrewarm(prewarmRuntime: String): Try[Runtimes] = { + val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes) + val mf = Try(prewarmRuntime.parseJson.asJsObject).flatMap(runtimes(_, rmc)) + var manifest: Option[Runtimes] = None + mf.foreach(m => manifest = Some(m)) + mf + } + /** * Gets existing runtime manifests. * diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala index 5136a66b402..751feac5048 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala @@ -21,7 +21,8 @@ import akka.Done import akka.actor.{ActorSystem, CoordinatedShutdown} import akka.event.Logging.InfoLevel import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ -import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.{StatusCodes, Uri} +import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import kamon.Kamon @@ -78,6 +79,17 @@ class Controller(val instance: ControllerInstanceId, implicit val logging: Logging) extends BasicRasService { + val controllerUsername = { + val source = scala.io.Source.fromFile("/conf/controllerauth.username"); + try source.mkString.replaceAll("\r|\n", "") + finally source.close() + } + val controllerPassword = { + val source = scala.io.Source.fromFile("/conf/controllerauth.password"); + try source.mkString.replaceAll("\r|\n", "") + finally source.close() + } + TransactionId.controller.mark( this, LoggingMarkers.CONTROLLER_STARTUP(instance.asString), @@ -95,7 +107,7 @@ class Controller(val instance: ControllerInstanceId, (pathEndOrSingleSlash & get) { complete(info) } - } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth + } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ changeRuntime } // initialize datastores @@ -154,6 +166,38 @@ class Controller(val instance: ControllerInstanceId, } } + /** + * Handles POST/DELETE prewarm container + */ + private val changeRuntime = { + implicit val executionContext = actorSystem.dispatcher + (path("prewarmContainer") & (post | delete)) { + extractCredentials { + case Some(BasicHttpCredentials(username, password)) => + if (username == controllerUsername && password == controllerPassword) { + extractMethod { method => + entity(as[String]) { prewarmRuntime => + val execManifest = ExecManifest.initializePrewarm(prewarmRuntime) + if (execManifest.isFailure) { + logging.error( + this, + s"Received invalid prewarm runtimes manifest with ${method.name} request:${execManifest.failed.get}") + complete(s"Received invalid prewarm runtimes manifest with ${method.name} request") + } else { + logging.info(this, s"Received valid prewarm runtimes manifest with ${method.name} request") + loadBalancer.sendRuntimeToManageInvoker(prewarmRuntime, method.name) + complete(s"Change prewarm container request with ${method.name} is already sent to managed invokers") + } + } + } + } else { + complete("username or password is wrong") + } + case _ => complete(StatusCodes.Unauthorized) + } + } + } + // controller top level info private val info = Controller.info( whiskConfig, diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala index da1e1b4fd69..ff7428f1159 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala @@ -59,6 +59,14 @@ trait LoadBalancer { def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] + /** + * send runtime to all managed invokers + * + * @param runtime + * @param operType add runtime or delete runtime + */ + def sendRuntimeToManageInvoker(runtime: String, operType: String): Unit + /** * Returns a message indicating the health of the containers and/or container pool in general. * diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 914b3acafb5..cee13bdc049 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -288,6 +288,13 @@ class ShardingContainerPoolBalancer( override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue()) override def clusterSize: Int = schedulingState.clusterSize + /** send runtime to all managed invokers*/ + override def sendRuntimeToManageInvoker(runtime: String, operType: String): Unit = { + schedulingState.managedInvokers foreach { invokerHealth => + sendRuntimeToInvoker(messageProducer, runtime, invokerHealth.id, operType) + } + } + /** 1. Publish a message to the loadbalancer */ override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = { @@ -423,6 +430,21 @@ class ShardingContainerPoolBalancer( } } + /** Send the runtime to the invoker */ + private def sendRuntimeToInvoker(producer: MessageProducer, + runtime: String, + invoker: InvokerInstanceId, + operType: String): Future[RecordMetadata] = { + val topic = s"invoker${invoker.toInt}" + val runtimeMessage = RuntimeMessage(runtime, operType) + producer.send(topic, runtimeMessage).andThen { + case Success(_) => + logging.info(this, s"Successfully posted runtime: ${runtime} to topic $topic") + case Failure(_) => + logging.error(this, s"Failed posted runtime: ${runtime} to topic $topic") + } + } + /** * Subscribes to active acks (completion messages from the invokers), and * registers a handler for received active acks from invokers. diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 81c542714fd..0e8ac5ffa33 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -22,6 +22,7 @@ import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId} import org.apache.openwhisk.core.connector.MessageFeed import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ +import spray.json.{JsNumber, JsObject} import scala.collection.immutable import scala.concurrent.duration._ @@ -32,6 +33,9 @@ case object Busy extends WorkerState case object Free extends WorkerState case class WorkerData(data: ContainerData, state: WorkerState) +case class AddPreWarmConfigList(list: List[PrewarmingConfig]) +case class DeletePreWarmConfigList(list: List[PrewarmingConfig]) +case class PreWarmConfig(kind: String, memory: ByteSize) /** * A pool managing containers to run actions on. @@ -93,6 +97,30 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, } def receive: Receive = { + case prewarmConfigList: AddPreWarmConfigList => + prewarmConfigList.list foreach { config => + logging.info(this, s"add extra pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")( + TransactionId.invokerWarmup) + (1 to config.count).foreach { _ => + prewarmContainer(config.exec, config.memoryLimit) + } + } + + case prewarmConfigList: DeletePreWarmConfigList => + prewarmConfigList.list foreach { config => + logging.info(this, s"delete pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")( + TransactionId.invokerWarmup) + (1 to config.count).foreach { _ => + deletePrewarmContainer(config.exec.kind, config.memoryLimit) + } + } + + case prewarmConfig: PreWarmConfig => + val numberResponse = { + JsObject("number" -> JsNumber(getPrewarmContainerNumber(prewarmConfig.kind, prewarmConfig.memory))) + } + sender() ! numberResponse + // A job to run on a container // // Run messages are received either via the feed or from child containers which cannot process @@ -293,6 +321,38 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, } } + /** + * Delete the prewarm container + * @param kind + * @param memory + * @return + */ + def deletePrewarmContainer(kind: String, memory: ByteSize) = { + prewarmedPool + .find { + case (_, PreWarmedData(_, `kind`, `memory`, _)) => true + case _ => false + } + .map { + case (ref, data) => + ref ! Remove + prewarmedPool = prewarmedPool - ref + } + } + + /** + * get the prewarm container number + * @param kind + * @param memory + * @return + */ + def getPrewarmContainerNumber(kind: String, memory: ByteSize) = { + prewarmedPool.filter { + case (_, PreWarmedData(_, `kind`, `memory`, _)) => true + case _ => false + }.size + } + /** Removes a container and updates state accordingly. */ def removeContainer(toDelete: ActorRef) = { toDelete ! Remove diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 2f35afbf81a..94ebfbf0ec5 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -46,6 +46,8 @@ object Invoker { protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol") + var invokerReactive: Option[InvokerReactive] = None + /** * An object which records the environment variables required for this component to run. */ @@ -157,7 +159,7 @@ object Invoker { } val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)) val invoker = try { - new InvokerReactive(config, invokerInstance, producer, poolConfig) + invokerReactive = Some(new InvokerReactive(config, invokerInstance, producer, poolConfig)) } catch { case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}") } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 5f682d71b4e..42fe92ce301 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -20,8 +20,9 @@ package org.apache.openwhisk.core.invoker import java.nio.charset.StandardCharsets import java.time.Instant -import akka.actor.{ActorRefFactory, ActorSystem, Props} +import akka.actor.{ActorRef, ActorRefFactory, ActorSystem, Props} import akka.event.Logging.InfoLevel +import akka.http.javadsl.model.HttpMethods import akka.stream.ActorMaterializer import org.apache.kafka.common.errors.RecordTooLargeException import pureconfig._ @@ -195,76 +196,107 @@ class InvokerReactive( private val pool = actorSystem.actorOf(ContainerPool.props(childFactory, poolConfig, activationFeed, prewarmingConfigs)) + def getContainerPoolInstance: ActorRef = { + pool + } + /** Is called when an ActivationMessage is read from Kafka */ def processActivationMessage(bytes: Array[Byte]): Future[Unit] = { - Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8))) + Future( + ActivationMessage + .parse(new String(bytes, StandardCharsets.UTF_8)) + .orElse(RuntimeMessage.parse(new String(bytes, StandardCharsets.UTF_8)))) .flatMap(Future.fromTry) - .flatMap { msg => - // The message has been parsed correctly, thus the following code needs to *always* produce at least an - // active-ack. - - implicit val transid: TransactionId = msg.transid - - //set trace context to continue tracing - WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext) - - if (!namespaceBlacklist.isBlacklisted(msg.user)) { - val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) - val namespace = msg.action.path - val name = msg.action.name - val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) - val subject = msg.user.subject - - logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") - - // caching is enabled since actions have revision id and an updated - // action will not hit in the cache due to change in the revision id; - // if the doc revision is missing, then bypass cache - if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}") - - WhiskAction - .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) - .flatMap { action => - action.toExecutableWhiskAction match { - case Some(executable) => - pool ! Run(executable, msg) + .flatMap { + case msg: ActivationMessage => + // The message has been parsed correctly, thus the following code needs to *always* produce at least an + // active-ack. + + implicit val transid: TransactionId = msg.transid + + //set trace context to continue tracing + WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext) + + if (!namespaceBlacklist.isBlacklisted(msg.user)) { + val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) + val namespace = msg.action.path + val name = msg.action.name + val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) + val subject = msg.user.subject + + logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") + + // caching is enabled since actions have revision id and an updated + // action will not hit in the cache due to change in the revision id; + // if the doc revision is missing, then bypass cache + if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}") + + WhiskAction + .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) + .flatMap { action => + action.toExecutableWhiskAction match { + case Some(executable) => + pool ! Run(executable, msg) + Future.successful(()) + case None => + logging.error( + this, + s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") + Future.failed(new IllegalStateException("non-executable action reached the invoker")) + } + } + .recoverWith { + case t => + // If the action cannot be found, the user has concurrently deleted it, + // making this an application error. All other errors are considered system + // errors and should cause the invoker to be considered unhealthy. + val response = t match { + case _: NoDocumentException => + ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking) + case _: DocumentTypeMismatchException | _: DocumentUnreadable => + ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking) + case _ => + ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) + } + + val context = UserContext(msg.user) + val activation = generateFallbackActivation(msg, response) + activationFeed ! MessageFeed.Processed + ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true) + store(msg.transid, activation, context) Future.successful(()) - case None => - logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") - Future.failed(new IllegalStateException("non-executable action reached the invoker")) } - } - .recoverWith { - case t => - // If the action cannot be found, the user has concurrently deleted it, - // making this an application error. All other errors are considered system - // errors and should cause the invoker to be considered unhealthy. - val response = t match { - case _: NoDocumentException => - ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking) - case _: DocumentTypeMismatchException | _: DocumentUnreadable => - ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking) - case _ => - ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) + } else { + // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol + // Due to the protective nature of the blacklist, a database entry is not written. + activationFeed ! MessageFeed.Processed + val activation = + generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) + ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true) + logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") + Future.successful(()) + } + case msg: RuntimeMessage => + val execManifest = ExecManifest.initializePrewarm(msg.runtime) + if (execManifest.isFailure) { + logging.error( + this, + s"Received invalid prewarm runtimes manifest with operType:${msg.operType}:${execManifest.failed.get}") + Future.failed(new Exception(s"Received invalid prewarm runtimes manifest with operType:${msg.operType}")) + } else { + val prewarmingConfigs: List[PrewarmingConfig] = execManifest.get.stemcells.flatMap { + case (mf, cells) => + cells.map { cell => + PrewarmingConfig(cell.count, new CodeExecAsString(mf, "", None), cell.memory) } - - val context = UserContext(msg.user) - val activation = generateFallbackActivation(msg, response) - activationFeed ! MessageFeed.Processed - ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true) - store(msg.transid, activation, context) - Future.successful(()) + }.toList + if (msg.operType == HttpMethods.POST.name) { + pool ! AddPreWarmConfigList(prewarmingConfigs) + } else if (msg.operType == HttpMethods.DELETE.name) { + pool ! DeletePreWarmConfigList(prewarmingConfigs) } - } else { - // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol - // Due to the protective nature of the blacklist, a database entry is not written. - activationFeed ! MessageFeed.Processed - val activation = - generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) - ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true) - logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") + } Future.successful(()) - } } .recoverWith { case t => diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerServer.scala new file mode 100644 index 00000000000..c8d722c871b --- /dev/null +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerServer.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.invoker + +import akka.actor.ActorSystem +import akka.http.javadsl.model.HttpMethods +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.headers.BasicHttpCredentials +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.server.Route +import akka.pattern.ask +import akka.util.Timeout +import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.containerpool._ +import org.apache.openwhisk.core.entity.{CodeExecAsString, ExecManifest} +import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.http.BasicRasService +import spray.json.JsObject + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ + +/** + * Implements web server to handle certain REST API calls. + */ +class InvokerServer(implicit val ec: ExecutionContext, + implicit val actorSystem: ActorSystem, + implicit val logger: Logging) + extends BasicRasService { + + val invokerUsername = { + val source = scala.io.Source.fromFile("/conf/invokerauth.username"); + try source.mkString.replaceAll("\r|\n", "") + finally source.close() + } + val invokerPassword = { + val source = scala.io.Source.fromFile("/conf/invokerauth.password"); + try source.mkString.replaceAll("\r|\n", "") + finally source.close() + } + + override def routes(implicit transid: TransactionId): Route = { + super.routes ~ { + (path("prewarmContainer") & (post | delete)) { + extractCredentials { + case Some(BasicHttpCredentials(username, password)) => + if (username == invokerUsername && password == invokerPassword) { + extractMethod { method => + entity(as[String]) { prewarmRuntime => + val execManifest = ExecManifest.initializePrewarm(prewarmRuntime) + if (execManifest.isFailure) { + logger.error( + this, + s"Received invalid prewarm runtimes manifest with ${method.name} request:${execManifest.failed.get}") + complete(s"Received invalid prewarm runtimes manifest with ${method.name} request") + } else { + val prewarmingConfigs: List[PrewarmingConfig] = execManifest.get.stemcells.flatMap { + case (mf, cells) => + cells.map { cell => + PrewarmingConfig(cell.count, new CodeExecAsString(mf, "", None), cell.memory) + } + }.toList + if (method.name == HttpMethods.POST.name) { + Invoker.invokerReactive.get.getContainerPoolInstance ! AddPreWarmConfigList(prewarmingConfigs) + } else { + Invoker.invokerReactive.get.getContainerPoolInstance ! DeletePreWarmConfigList(prewarmingConfigs) + } + complete(s"Change prewarm container request with ${method.name} is handling") + } + } + } + } else { + complete("username or password is wrong") + } + case _ => complete(StatusCodes.Unauthorized) + } + } + } ~ { + (path("prewarmContainerNumber") & get) { + parameter('kind.as[String], 'memory.as[Int]) { (kind, memory) => + implicit val timeout = Timeout(5.seconds) + val numberFuture = Future { + Invoker.invokerReactive.get.getContainerPoolInstance + .ask(PreWarmConfig(kind, memory.MB)) + .mapTo[JsObject] + }.flatten + complete(numberFuture) + } + } + } + } +} diff --git a/docs/operation.md b/docs/operation.md new file mode 100644 index 00000000000..53fa2248704 --- /dev/null +++ b/docs/operation.md @@ -0,0 +1,80 @@ + + +# Prewarm container operation +## Add prewarm container on assigned invoker, e.g: +``` +curl -u ${username}:${password} -X POST http://${invokerAddress}:${invokerPort}/prewarmContainer -d ' +{ + "runtimes": { + "nodejs": [{ + "kind": "nodejs:6", + "default": true, + "image": { + "prefix": "openwhisk", + "name": "nodejs6action", + "tag": "latest" + }, + "deprecated": false, + "attached": { + "attachmentName": "codefile", + "attachmentType": "text/plain" + }, + "stemCells": [{ + "count": 2, + "memory": "128 MB" + }] + }] + } +} +' +``` +You can also send this request to controller to add prewarm container to all managed invokers +## Delete prewarm container on assigned invoker, e.g: +``` +curl -u ${username}:${password} -X DELETE http://${invokerAddress}:${invokerPort}/prewarmContainer -d ' +{ + "runtimes": { + "nodejs": [{ + "kind": "nodejs:6", + "default": true, + "image": { + "prefix": "openwhisk", + "name": "nodejs6action", + "tag": "latest" + }, + "deprecated": false, + "attached": { + "attachmentName": "codefile", + "attachmentType": "text/plain" + }, + "stemCells": [{ + "count": 2, + "memory": "128 MB" + }] + }] + } +} +' +``` +You can also send this request to controller to delete prewarm container to all managed invokers +## Get prewarm container from assigned invoker +``` +curl -X GET 'http://${invokerAddress}:${invokerPort}/prewarmContainerNumber?kind=nodejs:6&memory=128' +``` diff --git a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala index 9e586392170..3bf2ee23bd1 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/controller/test/ControllerTestCommon.scala @@ -235,6 +235,7 @@ class DegenerateLoadBalancerService(config: WhiskConfig)(implicit ec: ExecutionC // unit tests that need an activation via active ack/fast path should set this to value expected var whiskActivationStub: Option[(FiniteDuration, WhiskActivation)] = None + override def sendRuntimeToManageInvoker(runtime: String, operType: String): Unit = {} override def totalActiveActivations = Future.successful(0) override def activeActivationsFor(namespace: UUID) = Future.successful(0)