Skip to content
This repository was archived by the owner on Feb 3, 2021. It is now read-only.

Commit 8d00a2c

Browse files
authored
Feature: enable mixed mode for jobs (#442)
* enable mixed mode for jobs * simplify * add job configuration validation * whitespace
1 parent 9253aac commit 8d00a2c

File tree

4 files changed

+41
-15
lines changed

4 files changed

+41
-15
lines changed

aztk/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ def __submit_job(self,
300300
auto_scale_formula=autoscale_formula,
301301
auto_scale_evaluation_interval=timedelta(minutes=5),
302302
start_task=start_task,
303-
enable_inter_node_communication=True,
303+
enable_inter_node_communication=not job_configuration.mixed_mode(),
304304
network_configuration=network_conf,
305305
max_tasks_per_node=1,
306306
metadata=[

aztk/models/models.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ def merge(self, other):
5050
])
5151

5252

53-
5453
class ClusterConfiguration(ConfigurationBase):
5554
"""
5655
Cluster configuration model

aztk/spark/client.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,16 @@ def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str)
163163
'''
164164
def submit_job(self, job_configuration):
165165
try:
166+
job_configuration.validate()
166167
cluster_data = self._get_cluster_data(job_configuration.id)
167-
node_data = NodeData(job_configuration.as_cluster_config()).add_core().done()
168+
node_data = NodeData(job_configuration.to_cluster_config()).add_core().done()
168169
zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()
169170

170171
start_task = create_cluster_helper.generate_cluster_start_task(self,
171172
zip_resource_files,
172173
job_configuration.gpu_enabled,
173174
job_configuration.docker_repo,
175+
mixed_mode=job_configuration.mixed_mode(),
174176
worker_on_master=job_configuration.worker_on_master)
175177

176178
application_tasks = []
@@ -189,17 +191,10 @@ def submit_job(self, job_configuration):
189191
offer='UbuntuServer',
190192
sku='16.04')
191193

192-
if job_configuration.max_dedicated_nodes and not job_configuration.max_low_pri_nodes:
193-
autoscale_formula = "maxNumberofVMs = {0}; targetNumberofVMs = {1};" \
194-
" $TargetDedicatedNodes=min(maxNumberofVMs, targetNumberofVMs)".format(
195-
job_configuration.max_dedicated_nodes, job_configuration.max_dedicated_nodes)
196-
elif job_configuration.max_low_pri_nodes and not job_configuration.max_dedicated_nodes:
197-
autoscale_formula = "maxNumberofVMs = {0}; targetNumberofVMs = {1};" \
198-
" $TargetLowPriorityNodes=min(maxNumberofVMs, targetNumberofVMs)".format(
199-
job_configuration.max_low_pri_nodes, job_configuration.max_low_pri_nodes)
200-
else:
201-
raise error.AztkError("Jobs do not support both dedicated and low priority nodes." \
202-
" JobConfiguration fields max_dedicated_nodes and max_low_pri_nodes are mutually exclusive values.")
194+
autoscale_formula = "$TargetDedicatedNodes = {0}; " \
195+
"$TargetLowPriorityNodes = {1}".format(
196+
job_configuration.max_dedicated_nodes,
197+
job_configuration.max_low_pri_nodes)
203198

204199
job = self.__submit_job(
205200
job_configuration=job_configuration,

aztk/spark/models/models.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from Crypto.PublicKey import RSA
33
from typing import List
44
import aztk.models
5+
from aztk import error
56
from aztk.utils import constants, helpers
67
import azure.batch.models as batch_models
78

@@ -222,14 +223,45 @@ def __init__(
222223
self.subnet_id = subnet_id
223224
self.worker_on_master = worker_on_master
224225

225-
def as_cluster_config(self):
226+
def to_cluster_config(self):
226227
return ClusterConfiguration(
227228
cluster_id = self.id,
228229
custom_scripts = self.custom_scripts,
230+
docker_repo=self.docker_repo,
229231
vm_size=self.vm_size,
232+
vm_count=self.max_dedicated_nodes,
233+
vm_low_pri_count=self.max_low_pri_nodes,
234+
subnet_id=self.subnet_id,
235+
worker_on_master=self.worker_on_master,
230236
spark_configuration=self.spark_configuration,
231237
)
232238

239+
def mixed_mode(self) -> bool:
240+
return self.max_dedicated_nodes > 0 and self.max_low_pri_nodes > 0
241+
242+
def validate(self) -> bool:
243+
"""
244+
Validate the config at its current state.
245+
Raises: Error if invalid
246+
"""
247+
if self.id is None:
248+
raise error.AztkError("Please supply an ID for the Job in your configuration.")
249+
250+
if self.max_dedicated_nodes == 0 and self.max_low_pri_nodes == 0:
251+
raise error.AztkError(
252+
"Please supply a valid (greater than 0) value for either max_dedicated_nodes or max_low_pri_nodes in your configuration."
253+
)
254+
255+
if self.vm_size is None:
256+
raise error.AztkError(
257+
"Please supply a vm_size in your configuration."
258+
)
259+
260+
if self.mixed_mode() and not self.subnet_id:
261+
raise error.AztkError(
262+
"You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes) and pass the subnet_id in your configuration.."
263+
)
264+
233265

234266
class JobState():
235267
complete = 'completed'

0 commit comments

Comments
 (0)