Skip to content
This repository was archived by the owner on Feb 3, 2021. It is now read-only.

Commit 8fea9ce

Browse files
Feature: Disable scheduling on group of nodes (#540)
1 parent 02f336b commit 8fea9ce

File tree

20 files changed

+227
-23
lines changed

20 files changed

+227
-23
lines changed

aztk/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@
1515
from .vm_image import VmImage
1616
from .software import Software
1717
from .cluster import Cluster
18+
from .scheduling_target import SchedulingTarget
1819
from .plugins import *

aztk/models/cluster_configuration.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import aztk.error as error
22
from aztk.core.models import Model, fields
3-
from aztk.utils import deprecate, helpers
3+
from aztk.utils import deprecated,deprecate, helpers
44

55
from .custom_script import CustomScript
66
from .file_share import FileShare
77
from .plugins import PluginConfiguration
88
from .toolkit import Toolkit
99
from .user_configuration import UserConfiguration
10-
10+
from .scheduling_target import SchedulingTarget
1111

1212
class ClusterConfiguration(Model):
1313
"""
@@ -37,6 +37,7 @@ class ClusterConfiguration(Model):
3737
custom_scripts = fields.List(CustomScript)
3838
file_shares = fields.List(FileShare)
3939
user_configuration = fields.Model(UserConfiguration, default=None)
40+
scheduling_target = fields.Enum(SchedulingTarget, default=None)
4041

4142
def __init__(self, *args, **kwargs):
4243
if 'vm_count' in kwargs:
@@ -49,6 +50,26 @@ def __init__(self, *args, **kwargs):
4950

5051
super().__init__(*args, **kwargs)
5152

53+
@property
54+
@deprecated()
55+
def vm_count(self):
56+
return self.size
57+
58+
@vm_count.setter
59+
@deprecated()
60+
def vm_count(self, value):
61+
self.size = value
62+
63+
@property
64+
@deprecated()
65+
def vm_low_pri_count(self):
66+
return self.size_low_priority
67+
68+
@vm_low_pri_count.setter
69+
@deprecated()
70+
def vm_low_pri_count(self, value):
71+
self.size_low_priority = value
72+
5273
def mixed_mode(self) -> bool:
5374
"""
5475
Return:
@@ -81,3 +102,6 @@ def __validate__(self) -> bool:
81102

82103
if self.custom_scripts:
83104
deprecate("Custom scripts are DEPRECATED and will be removed in 0.8.0. Use plugins instead See https://aztk.readthedocs.io/en/v0.7.0/15-plugins.html")
105+
106+
if self.scheduling_target == SchedulingTarget.Dedicated and self.vm_count == 0:
107+
raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")

aztk/models/scheduling_target.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from enum import Enum
2+
3+
class SchedulingTarget(Enum):
4+
"""
5+
Target where task will get scheduled.
6+
For spark this is where the driver will live.
7+
"""
8+
9+
Master = "master"
10+
"""
11+
Only master is allowed to run task
12+
"""
13+
14+
Dedicated = "dedicated"
15+
"""
16+
Any dedicated node is allowed to run task(Default)
17+
"""
18+
19+
Any = "any"
20+
"""
21+
Any node(Not reconmmended if using low pri)
22+
"""

aztk/node_scripts/core/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .logger import log

aztk/node_scripts/core/config.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import azure.batch.batch_service_client as batch
55
import azure.storage.blob as blob
66
import azure.batch.batch_auth as batchauth
7+
from core import log
78
from azure.common.credentials import ServicePrincipalCredentials
89
from azure.mgmt.batch import BatchManagementClient
910
from azure.mgmt.storage import StorageManagementClient
@@ -28,7 +29,7 @@
2829
cluster_id = os.environ.get("AZTK_CLUSTER_ID")
2930
pool_id = os.environ["AZ_BATCH_POOL_ID"]
3031
node_id = os.environ["AZ_BATCH_NODE_ID"]
31-
is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"]
32+
is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"] == "true"
3233

3334
spark_web_ui_port = os.environ["SPARK_WEB_UI_PORT"]
3435
spark_worker_ui_port = os.environ["SPARK_WORKER_UI_PORT"]
@@ -87,7 +88,7 @@ def get_batch_client() -> batch.BatchServiceClient:
8788
batch_client = get_batch_client()
8889
blob_client = get_blob_client()
8990

90-
logging.info("Pool id is %s", pool_id)
91-
logging.info("Node id is %s", node_id)
92-
logging.info("Batch account name %s", batch_account_name)
93-
logging.info("Is dedicated %s", is_dedicated)
91+
log.info("Pool id is %s", pool_id)
92+
log.info("Node id is %s", node_id)
93+
log.info("Batch account name %s", batch_account_name)
94+
log.info("Is dedicated %s", is_dedicated)

aztk/node_scripts/core/logger.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import sys
2+
import logging
3+
4+
log = logging.getLogger("aztk.node-agent")
5+
6+
DEFAULT_FORMAT = '%(message)s'
7+
8+
def setup_logging():
9+
for handler in logging.root.handlers[:]:
10+
logging.root.removeHandler(handler)
11+
12+
log.setLevel(logging.INFO)
13+
logging.basicConfig(stream=sys.stdout, format=DEFAULT_FORMAT)

aztk/node_scripts/install/install.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import wait_until_master_selected
55
from aztk.models.plugins import PluginTarget
66
from aztk.internal import cluster_data
7+
from .node_scheduling import setup_node_scheduling
8+
79

810
def read_cluster_config():
911
data = cluster_data.ClusterData(config.blob_client, config.cluster_id)
@@ -41,6 +43,8 @@ def setup_host(docker_repo: str):
4143

4244
cluster_conf = read_cluster_config()
4345

46+
setup_node_scheduling(client, cluster_conf, is_master)
47+
4448
#TODO pass azure file shares
4549
spark_container.start_spark_container(
4650
docker_repo=docker_repo,
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
from azure import batch
2+
from aztk.models import ClusterConfiguration, SchedulingTarget
3+
from core import config, log
4+
5+
def disable_scheduling(batch_client: batch.BatchServiceClient):
6+
"""
7+
Disable scheduling for the current node
8+
"""
9+
pool_id = config.pool_id
10+
node_id = config.node_id
11+
12+
node = batch_client.compute_node.get(pool_id, node_id)
13+
if node.scheduling_state == batch.models.SchedulingState.enabled:
14+
batch_client.compute_node.disable_scheduling(pool_id, node_id)
15+
log.info("Disabled task scheduling for this node")
16+
else:
17+
log.info("Task scheduling is already disabled for this node")
18+
19+
def enable_scheduling(batch_client: batch.BatchServiceClient):
20+
"""
21+
Disable scheduling for the current node
22+
"""
23+
pool_id = config.pool_id
24+
node_id = config.node_id
25+
26+
node = batch_client.compute_node.get(pool_id, node_id)
27+
if node.scheduling_state == batch.models.SchedulingState.disabled:
28+
batch_client.compute_node.disable_scheduling(pool_id, node_id)
29+
log.info("Enabled task scheduling for this node")
30+
else:
31+
log.info("Task scheduling is already enabled for this node")
32+
33+
34+
def setup_node_scheduling(
35+
batch_client: batch.BatchServiceClient,
36+
cluster_config: ClusterConfiguration,
37+
is_master: bool):
38+
39+
is_dedicated = config.is_dedicated
40+
enable = False
41+
log.info("Resolving scheduling for this node")
42+
log.info(" Scheduling target: %s", cluster_config.scheduling_target and cluster_config.scheduling_target.value)
43+
log.info(" Is node dedicated: %s", str(is_dedicated))
44+
log.info(" Is node master: %s", str(is_master))
45+
46+
if cluster_config.scheduling_target == SchedulingTarget.Any or cluster_config.scheduling_target is None:
47+
enable = True
48+
elif cluster_config.scheduling_target == SchedulingTarget.Dedicated and is_dedicated:
49+
enable = True
50+
elif cluster_config.scheduling_target == SchedulingTarget.Master and is_master:
51+
enable = True
52+
53+
if enable:
54+
log.info("Scheduling will be enabled on this node as it satifies the right conditions")
55+
enable_scheduling(batch_client)
56+
else:
57+
log.info("Scheduling will be disabled on this node as it does NOT satifies the right conditions")
58+
disable_scheduling(batch_client)

aztk/node_scripts/main.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import sys
2-
import aztk.spark
32
from install import install
3+
from core import logger
4+
5+
46

57
def run():
68
if len(sys.argv) < 2:
@@ -18,4 +20,5 @@ def run():
1820

1921

2022
if __name__ == "__main__":
23+
logger.setup_logging()
2124
run()

aztk/spark/client.py

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ class Client(BaseClient):
2121
Args:
2222
secrets_config(aztk.spark.models.models.SecretsConfiguration): Configuration with all the needed credentials
2323
"""
24-
def __init__(self, secrets_config):
25-
super().__init__(secrets_config)
2624

2725
def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
2826
"""
@@ -35,6 +33,7 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool =
3533
Returns:
3634
aztk.spark.models.Cluster
3735
"""
36+
cluster_conf = _apply_default_for_cluster_config(cluster_conf)
3837
cluster_conf.validate()
3938

4039
cluster_data = self._get_cluster_data(cluster_conf.cluster_id)
@@ -200,8 +199,9 @@ def cluster_download(self, cluster_id: str, source_path: str, destination_path:
200199
'''
201200
job submission
202201
'''
203-
def submit_job(self, job_configuration):
202+
def submit_job(self, job_configuration: models.JobConfiguration):
204203
try:
204+
job_configuration = _apply_default_for_job_config(job_configuration)
205205
job_configuration.validate()
206206
cluster_data = self._get_cluster_data(job_configuration.id)
207207
node_data = NodeData(job_configuration.to_cluster_config()).add_core().done()
@@ -319,3 +319,23 @@ def run_cluster_diagnostics(self, cluster_id, output_directory):
319319
return output
320320
except batch_error.BatchErrorException as e:
321321
raise error.AztkError(helpers.format_batch_exception(e))
322+
323+
324+
def _default_scheduling_target(vm_count: int):
325+
if vm_count == 0:
326+
return models.SchedulingTarget.Any
327+
else:
328+
return models.SchedulingTarget.Dedicated
329+
330+
def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration):
331+
cluster_conf = models.ClusterConfiguration()
332+
cluster_conf.merge(configuration)
333+
if cluster_conf.scheduling_target is None:
334+
cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
335+
return cluster_conf
336+
337+
def _apply_default_for_job_config(job_conf: models.JobConfiguration):
338+
if job_conf.scheduling_target is None:
339+
job_conf.scheduling_target = _default_scheduling_target(job_conf.max_dedicated_nodes)
340+
341+
return job_conf

0 commit comments

Comments
 (0)