1 change: 1 addition & 0 deletions ansible/group_vars/all
@@ -66,6 +66,7 @@ controller:
heap: "{{ controller_heap | default('2g') }}"
arguments: "{{ controller_arguments | default('') }}"
blackboxFraction: "{{ controller_blackbox_fraction | default(0.10) }}"
timeoutFactor: "{{ controller_timeout_factor | default(2) }}"
instances: "{{ groups['controllers'] | length }}"
localBookkeeping: "{{ controller_local_bookkeeping | default('false') }}"
akka:
2 changes: 2 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
@@ -226,6 +226,8 @@
"{{ invoker.busyThreshold }}"
"CONFIG_whisk_loadbalancer_blackboxFraction":
"{{ controller.blackboxFraction }}"
"CONFIG_whisk_loadbalancer_timeoutFactor":
"{{ controller.timeoutFactor }}"

"CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
"CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
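The new CONFIG_whisk_loadbalancer_timeoutFactor entry mirrors the existing blackboxFraction entry: the ansible variable is exported as an environment variable on the controller container and is assumed to be turned into a Typesafe-config override by the image's startup scripts. A rough sketch of that mapping (the exact transformation is not part of this diff):

```scala
// Illustration only: assumed CONFIG_* -> config-key convention.
val envName = "CONFIG_whisk_loadbalancer_timeoutFactor"
val configKey = envName.stripPrefix("CONFIG_").replace('_', '.')
// configKey == "whisk.loadbalancer.timeoutFactor", assumed to override the
// timeout-factor default shipped in reference.conf.
```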
4 changes: 4 additions & 0 deletions core/controller/src/main/resources/reference.conf
@@ -8,6 +8,10 @@ whisk {
loadbalancer {
invoker-busy-threshold: 4
blackbox-fraction: 10%
# Factor by which to extend the timeout for forced active acks.
# timeout = time-limit.std * timeout-factor + 1m
# The default is 2 because init and run may each use the configured time limit fully.
timeout-factor = 2
}
controller {
protocol: http
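To make the formula in the comment concrete, here is a small sketch of the resulting forced active-ack timeout, assuming time-limit.std is the usual one-minute standard action limit (an assumption, not taken from this diff):

```scala
import scala.concurrent.duration._

// Sketch of: timeout = time-limit.std * timeout-factor + 1m
val stdLimit = 1.minute   // assumed standard action time limit
val timeoutFactor = 2     // default configured above
val forcedAckTimeout = stdLimit * timeoutFactor + 1.minute
// => 3 minutes: init and run may each consume the full limit, plus a minute of slack
```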
@@ -156,14 +156,16 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
None
}

private val lbConfig = loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer)

/** State related to invocations and throttling */
private val activations = TrieMap[ActivationId, ActivationEntry]()
private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
private val totalActivations = new LongAdder()
private val totalActivationMemory = new LongAdder()

/** State needed for scheduling. */
private val schedulingState = ShardingContainerPoolBalancerState()()
private val schedulingState = ShardingContainerPoolBalancerState()(lbConfig)

actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
@@ -254,7 +256,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
totalActivationMemory.add(action.limits.memory.megabytes)
activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment()

val timeout = action.limits.timeout.duration.max(TimeLimit.STD_DURATION) + 1.minute
// The timeout is derived from the action's configured time limit, bounded below by the standard action duration so
// that short limits do not produce overly tight timeouts. That base is then scaled by the configurable timeoutFactor,
// which in essence controls how much slack the message-bus topics are allowed before activations are reported as
// forcibly failed.
val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute

// Install a timeout handler for the catastrophic case where an active ack is not received at all
// (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
// the active ack is significantly delayed (possibly due to long queues, but the subject should not be penalized);
@@ -579,8 +586,9 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
*
* @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes
* @param invokerBusyThreshold how many slots an invoker has available in total
* @param timeoutFactor factor applied to the timeout period for forced active acks (time-limit.std * timeoutFactor + 1m)
*/
case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int, timeoutFactor: Int)

/**
* State kept for each activation until completion.
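For context, a minimal sketch of how the extended case class is populated when the balancer starts, assuming ConfigKeys.loadbalancer resolves to "whisk.loadbalancer" and that the pureconfig version already used for the existing fields derives the reader automatically:

```scala
import pureconfig._

// Sketch only: mirrors the loadConfigOrThrow call added in the diff above.
case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double,
                                               invokerBusyThreshold: Int,
                                               timeoutFactor: Int)

val lbConfig = loadConfigOrThrow[ShardingContainerPoolBalancerConfig]("whisk.loadbalancer")
// With the shipped defaults, lbConfig.timeoutFactor == 2.
```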
@@ -42,10 +42,13 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
def semaphores(count: Int, max: Int): IndexedSeq[ForcableSemaphore] =
IndexedSeq.fill(count)(new ForcableSemaphore(max))

def lbConfig(blackboxFraction: Double, invokerBusyThreshold: Int) =
ShardingContainerPoolBalancerConfig(blackboxFraction, invokerBusyThreshold, 1)

it should "update invoker's state, growing the slots data and keeping valid old data" in {
// start empty
val slots = 10
val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
state.invokers shouldBe 'empty
state.blackboxInvokers shouldBe 'empty
state.managedInvokers shouldBe 'empty
@@ -85,7 +88,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str

it should "allow managed partition to overlap with blackbox for small N" in {
Seq(0.1, 0.2, 0.3, 0.4, 0.5).foreach { bf =>
val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(bf, 1))
val state = ShardingContainerPoolBalancerState()(lbConfig(bf, 1))

(1 to 100).toSeq.foreach { i =>
state.updateInvokers((1 to i).map(_ => healthy(1)))
@@ -112,7 +115,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str

it should "update the cluster size, adjusting the invoker slots accordingly" in {
val slots = 10
val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
state.updateInvokers(IndexedSeq(healthy(0)))

state.invokerSlots.head.tryAcquire()
@@ -124,7 +127,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str

it should "fallback to a size of 1 (alone) if cluster size is < 1" in {
val slots = 10
val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
state.updateInvokers(IndexedSeq(healthy(0)))

state.invokerSlots.head.availablePermits shouldBe slots
@@ -141,7 +144,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str

it should "set the threshold to 1 if the cluster is bigger than there are slots on 1 invoker" in {
val slots = 10
val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
state.updateInvokers(IndexedSeq(healthy(0)))

state.invokerSlots.head.availablePermits shouldBe slots
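The tests above only pin the new field through the lbConfig helper; the timeout arithmetic itself is not covered by this diff. A hypothetical check of that arithmetic could look like the following, assuming TimeLimit.STD_DURATION is one minute:

```scala
it should "scale the forced active-ack timeout by the configured factor" in {
  import scala.concurrent.duration._
  val config = lbConfig(0.5, 10).copy(timeoutFactor = 2)
  val stdDuration = 1.minute // assumed value of TimeLimit.STD_DURATION
  (stdDuration * config.timeoutFactor + 1.minute) shouldBe 3.minutes
}
```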