
Commit 2bed496

Internal: Cluster data helpers and upload_node_script into cluster_data module (#401)
1 parent c237501 commit 2bed496

15 files changed: +323 -223 lines changed

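In short: the per-cluster storage plumbing previously spread across the helpers and upload_node_scripts modules is consolidated into three classes in a new aztk.internal.cluster_data package. A condensed map of the surface introduced below, with signatures copied from the diffs (a sketch, not documentation):

    from aztk.internal.cluster_data import BlobData, ClusterData, NodeData

    # ClusterData(blob_client, cluster_id)       - one blob container per cluster
    #     .save_cluster_config(cfg) / .read_cluster_config()
    #     .upload_node_data(node_data)           -> BlobData
    # NodeData(cluster_config).add_core().done() - builds tmp/node-scripts.zip locally
    # BlobData.to_resource_file(dest=None)       -> batch ResourceFile backed by a SAS URL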

.vscode/settings.json

Lines changed: 16 additions & 13 deletions

@@ -1,15 +1,18 @@
 {
-    // "python.linting.pylintEnabled": false,
-    "search.exclude": {
-        "build/**": true,
-        "bin/**": true
-    },
-    "files.exclude": {
-        "**/__pycache__": true,
-        "*.egg-info": true,
-    },
-    "python.autoComplete.extraPaths": [
-        "${workspaceRoot}/node_scripts"
-    ],
-    "python.formatting.provider": "yapf"
+    // "python.linting.pylintEnabled": false,
+    "search.exclude": {
+        "build/**": true,
+        "bin/**": true
+    },
+    "files.exclude": {
+        "**/__pycache__": true,
+        "*.egg-info": true,
+    },
+    "python.autoComplete.extraPaths": [
+        "${workspaceRoot}/node_scripts"
+    ],
+    "python.formatting.provider": "yapf",
+    "python.formatting.yapfArgs": [
+        "--style=.style.yapf"
+    ]
 }

aztk/client.py

Lines changed: 10 additions & 2 deletions

@@ -16,7 +16,7 @@
 import azure.batch.models as batch_models
 from azure.batch.models import batch_error
 from Crypto.PublicKey import RSA
-
+from aztk.internal import cluster_data
 
 class Client:
     def __init__(self, secrets_config: models.SecretsConfiguration):
@@ -26,6 +26,14 @@ def __init__(self, secrets_config: models.SecretsConfiguration):
         self.batch_client = azure_api.make_batch_client(secrets_config)
         self.blob_client = azure_api.make_blob_client(secrets_config)
 
+    def get_cluster_config(self, cluster_id: str) -> models.ClusterConfiguration:
+        return self._get_cluster_data(cluster_id).read_cluster_config()
+
+    def _get_cluster_data(self, cluster_id: str) -> cluster_data.ClusterData:
+        """
+        Returns ClusterData object to manage data related to the given cluster id
+        """
+        return cluster_data.ClusterData(self.blob_client, cluster_id)
 
     '''
     General Batch Operations
@@ -66,7 +74,7 @@ def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, softw
         :param VmImageModel: the type of image to provision for the cluster
         :param wait: wait until the cluster is ready
         """
-        helpers.save_cluster_config(cluster_conf, self.blob_client)
+        self._get_cluster_data(cluster_conf.cluster_id).save_cluster_config(cluster_conf)
         # reuse pool_id as job_id
         pool_id = cluster_conf.cluster_id
         job_id = cluster_conf.cluster_id
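With these helpers on the base Client, callers read a cluster's stored configuration without touching blob paths themselves. A minimal usage sketch; the secrets construction is an assumption here, since aztk normally loads it from the workspace's secrets.yaml:

    from aztk import models
    from aztk.client import Client

    # Hypothetical secrets object; real values come from secrets.yaml.
    secrets = models.SecretsConfiguration()

    client = Client(secrets)

    # Reads cluster/config.yaml from the "my-cluster" container; returns None
    # (after logging a warning) when the blob is missing or the YAML is invalid.
    config = client.get_cluster_config("my-cluster")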
aztk/internal/cluster_data/__init__.py

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+from .blob_data import *
+from .node_data import *
+from .cluster_data import *
aztk/internal/cluster_data/blob_data.py

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+import azure.batch.models as batch_models
+import datetime
+from azure.storage.blob import BlockBlobService, BlobPermissions
+
+class BlobData:
+    """
+    Object mapping to a blob entry. Can generate resource files for batch
+    """
+    def __init__(self, blob_client: BlockBlobService, container: str, blob: str):
+        self.container = container
+        self.blob = blob
+        self.dest = blob
+        self.blob_client = blob_client
+
+
+    def to_resource_file(self, dest: str = None) -> batch_models.ResourceFile:
+        sas_token = self.blob_client.generate_blob_shared_access_signature(
+            self.container,
+            self.blob,
+            permission=BlobPermissions.READ,
+            expiry=datetime.datetime.utcnow() + datetime.timedelta(days=365))
+
+        sas_url = self.blob_client.make_blob_url(
+            self.container, self.blob, sas_token=sas_token)
+
+        return batch_models.ResourceFile(file_path=dest or self.dest, blob_source=sas_url)
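BlobData separates where the bytes live (container and blob name) from where Batch should place them on the node (dest). to_resource_file signs a read-only SAS token good for a year and wraps the resulting URL in a batch ResourceFile. A sketch; the storage account values are placeholders, not from this commit:

    from azure.storage.blob import BlockBlobService
    from aztk.internal.cluster_data import BlobData

    # Placeholder credentials; aztk builds this client from secrets.yaml in practice.
    blob_client = BlockBlobService(account_name="myaccount", account_key="<key>")

    blob_data = BlobData(blob_client, container="my-cluster", blob="cluster/node-scripts.zip")

    # Batch will download the blob next to the task as plain "node-scripts.zip".
    resource_file = blob_data.to_resource_file(dest="node-scripts.zip")

Overriding dest here mirrors what ClusterData.upload_cluster_file does below: the blob sits under cluster/ in storage but lands at the task's working-directory root on the node.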
aztk/internal/cluster_data/cluster_data.py

Lines changed: 57 additions & 0 deletions

@@ -0,0 +1,57 @@
+import yaml
+import logging
+import azure.common
+from azure.storage.blob import BlockBlobService
+from .node_data import NodeData
+from .blob_data import BlobData
+
+
+class ClusterData:
+    """
+    Class handling the management of data for a cluster
+    """
+    # All data related to the cluster (config, metadata, etc.) should be under this folder
+    CLUSTER_DIR = "cluster"
+    APPLICATIONS_DIR = "applications"
+    CLUSTER_CONFIG_FILE = "config.yaml"
+
+    def __init__(self, blob_client: BlockBlobService, cluster_id: str):
+        self.blob_client = blob_client
+        self.cluster_id = cluster_id
+        self._ensure_container()
+
+    def save_cluster_config(self, cluster_config):
+        blob_path = self.CLUSTER_DIR + "/" + self.CLUSTER_CONFIG_FILE
+        content = yaml.dump(cluster_config)
+        container_name = cluster_config.cluster_id
+        self.blob_client.create_blob_from_text(container_name, blob_path, content)
+
+    def read_cluster_config(self):
+        blob_path = self.CLUSTER_DIR + "/" + self.CLUSTER_CONFIG_FILE
+        try:
+            result = self.blob_client.get_blob_to_text(self.cluster_id, blob_path)
+            return yaml.load(result.content)
+        except azure.common.AzureMissingResourceHttpError:
+            logging.warn("Cluster %s doesn't have cluster configuration in storage", self.cluster_id)
+        except yaml.YAMLError:
+            logging.warn("Cluster %s contains invalid cluster configuration in blob", self.cluster_id)
+
+    def upload_file(self, blob_path: str, local_path: str) -> BlobData:
+        self.blob_client.create_blob_from_path(self.cluster_id, blob_path, local_path)
+        return BlobData(self.blob_client, self.cluster_id, blob_path)
+
+    def upload_cluster_file(self, blob_path: str, local_path: str) -> BlobData:
+        blob_data = self.upload_file(self.CLUSTER_DIR + "/" + blob_path, local_path)
+        blob_data.dest = blob_path
+        return blob_data
+
+    def upload_application_file(self, blob_path: str, local_path: str) -> BlobData:
+        blob_data = self.upload_file(self.APPLICATIONS_DIR + "/" + blob_path, local_path)
+        blob_data.dest = blob_path
+        return blob_data
+
+    def upload_node_data(self, node_data: NodeData) -> BlobData:
+        return self.upload_cluster_file("node-scripts.zip", node_data.zip_path)
+
+    def _ensure_container(self):
+        self.blob_client.create_container(self.cluster_id, fail_on_exist=False)
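ClusterData gives every cluster its own blob container (created on construction) and fixes the layout: cluster/ for metadata such as config.yaml and the node-scripts zip, applications/ for application files. A usage sketch with placeholder storage credentials, assuming ClusterConfiguration accepts this minimal form:

    from azure.storage.blob import BlockBlobService
    from aztk.spark import models
    from aztk.internal.cluster_data import ClusterData

    blob_client = BlockBlobService(account_name="myaccount", account_key="<key>")  # placeholder
    cluster_config = models.ClusterConfiguration(cluster_id="my-cluster")  # minimal; real configs set vm_size etc.

    data = ClusterData(blob_client, "my-cluster")   # ensures the container exists

    data.save_cluster_config(cluster_config)        # writes cluster/config.yaml
    restored = data.read_cluster_config()           # yaml round-trip, or None plus a warning

    # Stored at cluster/node-scripts.zip, but dest stays "node-scripts.zip" so the
    # generated ResourceFile downloads to the task's working-directory root.
    blob = data.upload_cluster_file("node-scripts.zip", "/tmp/node-scripts.zip")

One asymmetry worth noticing: save_cluster_config takes the container name from cluster_config.cluster_id, while the read and upload paths use the ClusterData's own cluster_id, so the two are expected to agree.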
aztk/internal/cluster_data/node_data.py

Lines changed: 165 additions & 0 deletions

@@ -0,0 +1,165 @@
+import fnmatch
+import io
+import json
+import os
+import yaml
+import zipfile
+from pathlib import Path
+from typing import List
+from aztk.spark import models
+from aztk.utils import constants, file_utils, secure_utils
+from aztk.error import InvalidCustomScriptError
+
+ROOT_PATH = constants.ROOT_PATH
+
+# Constants for node data
+NODE_SCRIPT_FOLDER = "node_scripts"
+CUSTOM_SCRIPT_FOLDER = "custom-scripts"
+CUSTOM_SCRIPT_METADATA_FILE = "custom-scripts.yaml"
+PLUGIN_FOLDER = "plugins"
+
+
+class NodeData:
+    """
+    Class made to bundle data to be uploaded to the node as a zip
+    """
+
+    def __init__(self, cluster_config: models.ClusterConfiguration):
+        self.zip_path = os.path.join(ROOT_PATH, "tmp/node-scripts.zip")
+        self.cluster_config = cluster_config
+        file_utils.ensure_dir(self.zip_path)
+        self.zipf = zipfile.ZipFile(self.zip_path, "w", zipfile.ZIP_DEFLATED)
+
+    def add_core(self):
+        self._add_node_scripts()
+        self._add_custom_scripts()
+        self._add_plugins()
+        self._add_spark_configuration()
+        self._add_user_conf()
+        self.add_file(os.path.join(constants.ROOT_PATH, 'aztk', 'utils', 'command_builder.py'), '', binary=False)
+        return self
+
+    def done(self):
+        self.zipf.close()
+        return self
+
+    def add_file(self, file: str, zip_dir: str, binary: bool = True):
+        if not file:
+            return
+        if isinstance(file, (str, bytes)):
+            full_file_path = Path(file)
+            with io.open(file, 'r') as f:
+                if binary:
+                    self.zipf.write(file, os.path.join(zip_dir, full_file_path.name))
+                else:
+                    self.zipf.writestr(os.path.join(zip_dir, full_file_path.name), f.read().replace('\r\n', '\n'))
+        elif isinstance(file, models.File):
+            self.zipf.writestr(os.path.join(zip_dir, file.name), file.payload.getvalue())
+
+    def add_files(self, file_paths: List[str], zip_dir, binary: bool = True):
+        """
+        Add a list of local files to the node data
+        """
+        for file in file_paths:
+            self.add_file(file, zip_dir, binary)
+
+    def add_dir(self, path: str, exclude: List[str] = []):
+        """
+        Zip all the files in the given directory into the zip file handler
+        """
+        for base, _, files in os.walk(path):
+            relative_folder = os.path.relpath(base, path)
+            for file in files:
+                if self._includeFile(file, exclude):
+                    with io.open(os.path.join(base, file), 'r') as f:
+                        self.zipf.writestr(os.path.join(relative_folder, file), f.read().replace('\r\n', '\n'))
+
+    def _add_custom_scripts(self):
+        data = []
+        if not self.cluster_config.custom_scripts:
+            return
+
+        for index, custom_script in enumerate(self.cluster_config.custom_scripts):
+            if isinstance(custom_script.script, (str, bytes)):
+                new_file_name = str(index) + '_' + os.path.basename(custom_script.script)
+                data.append(dict(script=new_file_name, runOn=str(custom_script.run_on)))
+                try:
+                    with io.open(custom_script.script, 'r') as f:
+                        self.zipf.writestr(
+                            os.path.join(CUSTOM_SCRIPT_FOLDER, new_file_name),
+                            f.read().replace('\r\n', '\n'))
+                except FileNotFoundError:
+                    raise InvalidCustomScriptError("Custom script '{0}' doesn't exist.".format(custom_script.script))
+            elif isinstance(custom_script.script, models.File):
+                new_file_name = str(index) + '_' + custom_script.script.name
+                self.zipf.writestr(os.path.join('custom-scripts', new_file_name), custom_script.script.payload.getvalue())
+
+        self.zipf.writestr(
+            os.path.join(CUSTOM_SCRIPT_FOLDER, CUSTOM_SCRIPT_METADATA_FILE), yaml.dump(data, default_flow_style=False))
+
+    def _add_spark_configuration(self):
+        spark_configuration = self.cluster_config.spark_configuration
+        if not spark_configuration:
+            return
+        self.add_files(
+            [
+                spark_configuration.spark_defaults_conf, spark_configuration.spark_env_sh,
+                spark_configuration.core_site_xml
+            ],
+            'conf',
+            binary=False)
+
+        # add ssh keys for passwordless ssh
+        self.zipf.writestr('id_rsa.pub', spark_configuration.ssh_key_pair['pub_key'])
+        self.zipf.writestr('id_rsa', spark_configuration.ssh_key_pair['priv_key'])
+
+        if spark_configuration.jars:
+            for jar in spark_configuration.jars:
+                self.add_file(jar, 'jars', binary=True)
+
+    def _add_user_conf(self):
+        user_conf = self.cluster_config.user_configuration
+        if not user_conf:
+            return
+        encrypted_aes_session_key, cipher_aes_nonce, tag, ciphertext = secure_utils.encrypt_password(
+            self.cluster_config.spark_configuration.ssh_key_pair['pub_key'], user_conf.password)
+        user_conf = yaml.dump({
+            'username': user_conf.username,
+            'password': ciphertext,
+            'ssh-key': user_conf.ssh_key,
+            'aes_session_key': encrypted_aes_session_key,
+            'cipher_aes_nonce': cipher_aes_nonce,
+            'tag': tag,
+            'cluster_id': self.cluster_config.cluster_id
+        })
+        self.zipf.writestr('user.yaml', user_conf)
+
+    def _add_plugins(self):
+        if not self.cluster_config.plugins:
+            return
+
+        data = []
+        for plugin in self.cluster_config.plugins:
+            for file in plugin.files:
+                zipf = self.zipf.writestr('plugins/{0}/{1}'.format(plugin.name, file.target), file.content())
+            if plugin.execute:
+                data.append(dict(
+                    name=plugin.name,
+                    execute='{0}/{1}'.format(plugin.name, plugin.execute),
+                    args=plugin.args,
+                    env=plugin.env,
+                    runOn=plugin.run_on.value,
+                ))
+
+        self.zipf.writestr(os.path.join('plugins', 'plugins-manifest.yaml'), yaml.dump(data))
+        return zipf
+
+    def _add_node_scripts(self):
+        self.add_dir(os.path.join(ROOT_PATH, NODE_SCRIPT_FOLDER), exclude=['*.pyc'])
+
+    def _includeFile(self, filename: str, exclude: List[str] = []) -> bool:
+        for pattern in exclude:
+            if fnmatch.fnmatch(filename, pattern):
+                return False
+
+        return True
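NodeData is a small builder over a zipfile handle: the constructor opens tmp/node-scripts.zip under ROOT_PATH, add_core() writes each section (node scripts, custom scripts, plugins, Spark conf, encrypted user record), and done() closes the archive so it can be uploaded. A sketch under the same minimal-configuration assumption as above:

    from aztk.spark import models
    from aztk.internal.cluster_data import NodeData

    # Minimal configuration; sections with no data (custom scripts, plugins,
    # spark/user configuration) are skipped by their respective _add_* helpers.
    cluster_config = models.ClusterConfiguration(cluster_id="my-cluster")

    node_data = NodeData(cluster_config).add_core().done()
    print(node_data.zip_path)  # <ROOT_PATH>/tmp/node-scripts.zip, ready for upload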

aztk/spark/client.py

Lines changed: 9 additions & 12 deletions

@@ -9,7 +9,8 @@
 from aztk.spark.helpers import submit as cluster_submit_helper
 from aztk.spark.helpers import job_submission as job_submit_helper
 from aztk.spark.helpers import get_log as get_log_helper
-from aztk.spark.utils import upload_node_scripts, util
+from aztk.spark.utils import util
+from aztk.internal.cluster_data import NodeData
 import yaml
 
 
@@ -22,14 +23,11 @@ def __init__(self, secrets_config):
     '''
     def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
         cluster_conf.validate()
-
+        cluster_data = self._get_cluster_data(cluster_conf.cluster_id)
         try:
-            zip_resource_files = upload_node_scripts.zip_scripts(self.blob_client,
-                                                                 cluster_conf.cluster_id,
-                                                                 cluster_conf.custom_scripts,
-                                                                 cluster_conf.spark_configuration,
-                                                                 cluster_conf.user_configuration,
-                                                                 cluster_conf.plugins)
+            zip_resource_files = None
+            node_data = NodeData(cluster_conf).add_core().done()
+            zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()
 
             start_task = create_cluster_helper.generate_cluster_start_task(self,
                                                                            zip_resource_files,
@@ -165,10 +163,9 @@ def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str)
     '''
     def submit_job(self, job_configuration):
         try:
-            zip_resource_files = upload_node_scripts.zip_scripts(self.blob_client,
-                                                                 job_configuration.id,
-                                                                 job_configuration.custom_scripts,
-                                                                 job_configuration.spark_configuration)
+            cluster_data = self._get_cluster_data(job_configuration.id)
+            node_data = NodeData(job_configuration.as_cluster_config()).add_core().done()
+            zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()
 
             start_task = create_cluster_helper.generate_cluster_start_task(self,
                                                                            zip_resource_files,
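Both call sites now share the same three-step flow: build the bundle locally with NodeData, push it through ClusterData, and hand the SAS-backed ResourceFile to the start task. Condensed from the diff above (client stands for the spark Client instance):

    cluster_data = client._get_cluster_data(cluster_conf.cluster_id)
    node_data = NodeData(cluster_conf).add_core().done()
    zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()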

aztk/spark/models/models.py

Lines changed: 8 additions & 0 deletions

@@ -222,6 +222,14 @@ def __init__(
         self.subnet_id = subnet_id
         self.worker_on_master = worker_on_master
 
+    def as_cluster_config(self):
+        return ClusterConfiguration(
+            cluster_id=self.id,
+            custom_scripts=self.custom_scripts,
+            vm_size=self.vm_size,
+            spark_configuration=self.spark_configuration,
+        )
+
 
 class JobState():
     complete = 'completed'
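This is what lets submit_job above feed a job through the cluster bundling path: the job configuration is projected onto a ClusterConfiguration carrying over only the id, custom scripts, VM size and Spark configuration. Sketch (job_configuration is assumed to be an instance of the class this method is defined on):

    # Project the job onto a cluster view, then bundle exactly as create_cluster does.
    cluster_config = job_configuration.as_cluster_config()
    node_data = NodeData(cluster_config).add_core().done()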
