Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/scala/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ whisk.spi {
LoadBalancerProvider = org.apache.openwhisk.core.loadBalancer.ShardingContainerPoolBalancer
EntitlementSpiProvider = org.apache.openwhisk.core.entitlement.LocalEntitlementProvider
AuthenticationDirectiveProvider = org.apache.openwhisk.core.controller.BasicAuthenticationDirective
InvokerProvider = org.apache.openwhisk.core.invoker.InvokerReactive
InvokerServerProvider = org.apache.openwhisk.core.invoker.DefaultInvokerServer
}

dispatchers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,22 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
import pureconfig.loadConfigOrThrow
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.WhiskConfig._
import org.apache.openwhisk.core.connector.{MessagingProvider, PingMessage}
import org.apache.openwhisk.core.connector.{MessageProducer, MessagingProvider}
import org.apache.openwhisk.core.containerpool.ContainerPoolConfig
import org.apache.openwhisk.core.entity.{ExecManifest, InvokerInstanceId}
import org.apache.openwhisk.core.entity.ActivationEntityLimit
import org.apache.openwhisk.core.entity.{ActivationEntityLimit, ConcurrencyLimitConfig, ExecManifest, InvokerInstanceId}
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.{BasicHttpService, BasicRasService}
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.spi.{Spi, SpiLoader}
import org.apache.openwhisk.utils.ExecutionContextFactory
import pureconfig.loadConfigOrThrow

import scala.concurrent.duration._
import scala.concurrent.Await
import scala.util.{Failure, Try}
import scala.concurrent.{Await, ExecutionContext}
import scala.util.Try

case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)

Expand Down Expand Up @@ -71,6 +70,7 @@ object Invoker {
ActorSystem(name = "invoker-actor-system", defaultExecutionContext = Some(ec))
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
val poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
val limitConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit)

// Prepare Kamon shutdown
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseActorSystemTerminate, "shutdownKamon") { () =>
Expand Down Expand Up @@ -156,25 +156,50 @@ object Invoker {
.isFailure) {
abort(s"failure during msgProvider.ensureTopic for topic $topicName")
}

val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))
val invoker = try {
new InvokerReactive(config, invokerInstance, producer, poolConfig)
SpiLoader.get[InvokerProvider].instance(config, invokerInstance, producer, poolConfig, limitConfig)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered many options to apply SPI in invoker.
For example, it would be possible to keep InvokerReactive and apply SPI to ContainerPool and ContainerProxy level.
But I noticed that they are also highly dependent on InvokerReactive.
So it would be more natural to apply SPI at the highest level of implementation.
It will abstract and hide the fundamental implementation such as ContainerPool and ContainerProxy from the top level Invoker implementation.

Also, there could be some dependent RestAPI implementations corresponding to the Invoker implementation.
So I added SPI provider for InvokerServer which provides REST API as well.

} catch {
case e: Exception => abort(s"Failed to initialize reactive invoker: ${e.getMessage}")
}

Scheduler.scheduleWaitAtMost(1.seconds)(() => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some invoker implementations, health checking may not be performed via Kafka.
So I moved this to InvokerReactive.

producer.send("health", PingMessage(invokerInstance)).andThen {
case Failure(t) => logger.error(this, s"failed to ping the controller: $t")
}
})

val port = config.servicePort.toInt
val httpsConfig =
if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None

BasicHttpService.startHttpService(new BasicRasService {}.route, port, httpsConfig)(
val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker)
BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(
actorSystem,
ActorMaterializer.create(actorSystem))
}
}

/**
* An Spi for providing invoker implementation.
*/
trait InvokerProvider extends Spi {
def instance(config: WhiskConfig,
instance: InvokerInstanceId,
producer: MessageProducer,
poolConfig: ContainerPoolConfig,
limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore
}

// this trait can be used to add common implementation
trait InvokerCore {}

/**
* An Spi for providing RestAPI implementation for invoker.
* The given invoker may require corresponding RestAPI implementation.
*/
trait InvokerServerProvider extends Spi {
def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
}

object DefaultInvokerServer extends InvokerServerProvider {
override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new BasicRasService {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,24 @@ import akka.actor.{ActorRefFactory, ActorSystem, Props}
import akka.event.Logging.InfoLevel
import akka.stream.ActorMaterializer
import org.apache.kafka.common.errors.RecordTooLargeException
import pureconfig._
import spray.json._
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.common._
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool._
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database._
import org.apache.openwhisk.core.database.{UserContext, _}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.core.database.UserContext
import pureconfig._
import spray.json._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object InvokerReactive {
object InvokerReactive extends InvokerProvider {

/**
* An method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages
Expand All @@ -58,6 +56,15 @@ object InvokerReactive {
* @param Boolean is true this is resource free message and false if this is a result forwarding message
*/
type ActiveAck = (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any]

override def instance(
config: WhiskConfig,
instance: InvokerInstanceId,
producer: MessageProducer,
poolConfig: ContainerPoolConfig,
limitsConfig: ConcurrencyLimitConfig)(implicit actorSystem: ActorSystem, logging: Logging): InvokerCore =
new InvokerReactive(config, instance, producer, poolConfig, limitsConfig)

}

class InvokerReactive(
Expand All @@ -67,7 +74,8 @@ class InvokerReactive(
poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool),
limitsConfig: ConcurrencyLimitConfig = loadConfigOrThrow[ConcurrencyLimitConfig](ConfigKeys.concurrencyLimit))(
implicit actorSystem: ActorSystem,
logging: Logging) {
logging: Logging)
extends InvokerCore {

implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = actorSystem.dispatcher
Expand Down Expand Up @@ -299,4 +307,10 @@ class InvokerReactive(
})
}

private val healthProducer = msgProvider.getProducer(config)
Scheduler.scheduleWaitAtMost(1.seconds)(() => {
healthProducer.send("health", PingMessage(instance)).andThen {
case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
}
})
}