Skip to content
This repository was archived by the owner on Feb 3, 2021. It is now read-only.

Commit de78983

Browse files
Feature: Plugin V2: Running plugin on host (#461)
1 parent 12450fb commit de78983

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

43 files changed

+481
-230
lines changed

.vscode/settings.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,7 @@
1414
"python.formatting.provider": "yapf",
1515
"python.formatting.yapfArgs": [
1616
"--style=.style.yapf"
17-
]
17+
],
18+
"python.venvPath": "${workspaceFolder}/ENV",
19+
"python.pythonPath": "${workspaceFolder}\\ENV\\Scripts\\python.exe"
1820
}

aztk/error.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77

88
class AztkError(Exception):
9-
def __init__(self, message: str = None):
9+
def __init__(self, message: str=None):
1010
super().__init__(message)
1111

1212
class ClusterNotReadyError(AztkError):

aztk/internal/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
"""
44

55
from .configuration_base import *
6+
from .docker_cmd import *

aztk/internal/cluster_data/node_data.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
import fnmatch
22
import io
3-
import json
43
import os
5-
import yaml
64
import zipfile
75
from pathlib import Path
86
from typing import List
7+
import yaml
98
from aztk.spark import models
109
from aztk.utils import constants, file_utils, secure_utils
1110
from aztk.error import InvalidCustomScriptError
@@ -147,7 +146,8 @@ def _add_plugins(self):
147146
execute='{0}/{1}'.format(plugin.name, plugin.execute),
148147
args=plugin.args,
149148
env=plugin.env,
150-
runOn=plugin.run_on.value,
149+
target=plugin.target.value,
150+
target_role=plugin.target_role.value,
151151
))
152152

153153
self.zipf.writestr(os.path.join('plugins', 'plugins-manifest.yaml'), yaml.dump(data))

aztk/internal/docker_cmd.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import os
from aztk.utils.command_builder import CommandBuilder


class DockerCmd:
    """
    Helper for assembling a `docker run` command line.
    """

    def __init__(self, name: str, docker_repo: str, cmd: str, gpu_enabled=False):
        # Use the NVIDIA runtime wrapper when GPU support is requested.
        runtime = 'nvidia-docker run' if gpu_enabled else 'docker run'
        self.cmd = CommandBuilder(runtime)
        self.cmd.add_option('--net', 'host')
        self.cmd.add_option('--name', name)
        self.cmd.add_argument('-d')
        self.cmd.add_argument(docker_repo)
        self.cmd.add_argument(cmd)

    def add_env(self, env: str, value: str):
        """Set an environment variable to an explicit value inside the container."""
        self.cmd.add_option('-e', '{0}={1}'.format(env, value))

    def pass_env(self, env: str):
        """
        Give the value of an environment variable in the main process to the docker image
        """
        self.cmd.add_option('-e', '{0}'.format(env))

    def share_folder(self, folder: str):
        """Bind-mount a host folder at the same path inside the container."""
        self.cmd.add_option('-v', '{0}:{0}'.format(folder))

    def open_port(self, port: int):
        """Publish a container port on the same port number of the host."""
        self.cmd.add_option('-p', '{0}:{0}'.format(port))

    def to_str(self):
        """Return the complete docker command as a single string."""
        return self.cmd.to_str()

aztk/models/plugins/internal/plugin_manager.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class PluginManager:
2020
jupyterlab=plugins.JupyterLabPlugin,
2121
rstudio_server=plugins.RStudioServerPlugin,
2222
hdfs=plugins.HDFSPlugin,
23+
simple=plugins.SimplePlugin,
2324
spark_ui_proxy=plugins.SparkUIProxyPlugin,
2425
)
2526

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
1-
import inspect
2-
from typing import List, Union
31
from enum import Enum
4-
from .plugin_file import PluginFile
2+
from typing import List, Union
53
from aztk.internal import ConfigurationBase
4+
from aztk.error import InvalidPluginConfigurationError
5+
from .plugin_file import PluginFile
6+
7+
class PluginTarget(Enum):
    """
    Where this plugin should run: inside the Spark docker container
    or directly on the host VM.
    """
    # NOTE: no trailing commas here. The original declarations ended with
    # commas, which silently turned each member's value into a 1-tuple
    # ("spark-container",) instead of a plain string — breaking `.value`
    # serialization (e.g. the plugins manifest) and `PluginTarget("host")`
    # value lookup.
    SparkContainer = "spark-container"
    Host = "host"
13+
14+
15+
class PluginTargetRole(Enum):
    """Which cluster nodes a plugin applies to."""
    # Run only on the Spark master node.
    Master = "master"
    # Run only on the worker nodes.
    Worker = "worker"
    # Run on every node in the cluster.
    All = "all-nodes"
19+
20+
621

722
class PluginPort:
823
"""
@@ -12,8 +27,7 @@ class PluginPort:
1227
:param name: [Optional] name to differentiate ports if you have multiple
1328
"""
1429

15-
def __init__(self, internal: int, public: Union[int, bool] = False, name=None):
16-
30+
def __init__(self, internal: int, public: Union[int, bool]=False, name=None):
1731
self.internal = internal
1832
self.expose_publicly = bool(public)
1933
self.public_port = None
@@ -26,11 +40,6 @@ def __init__(self, internal: int, public: Union[int, bool] = False, name=None):
2640
self.name = name
2741

2842

29-
class PluginRunTarget(Enum):
30-
Master = "master"
31-
Worker = "worker"
32-
All = "all-nodes"
33-
3443

3544

3645
class PluginConfiguration(ConfigurationBase):
@@ -45,15 +54,17 @@ class PluginConfiguration(ConfigurationBase):
4554

4655
def __init__(self,
4756
name: str,
48-
ports: List[PluginPort] = None,
49-
files: List[PluginFile] = None,
50-
execute: str = None,
57+
ports: List[PluginPort]=None,
58+
files: List[PluginFile]=None,
59+
execute: str=None,
5160
args=None,
5261
env=None,
53-
run_on: PluginRunTarget = PluginRunTarget.Master):
62+
target_role: PluginTargetRole=PluginTargetRole.Master,
63+
target: PluginTarget=PluginTarget.SparkContainer):
5464
self.name = name
5565
# self.docker_image = docker_image
56-
self.run_on = run_on
66+
self.target = target
67+
self.target_role = target_role
5768
self.ports = ports or []
5869
self.files = files or []
5970
self.args = args or []
@@ -64,11 +75,18 @@ def has_arg(self, name: str):
6475
for x in self.args:
6576
if x.name == name:
6677
return True
67-
else:
68-
return False
78+
return False
6979

7080
def validate(self):
    """Check that this plugin configuration is complete and well-typed.

    Raises:
        InvalidPluginConfigurationError: if ``target`` is not a
            ``PluginTarget`` or ``target_role`` is not a
            ``PluginTargetRole``. ``_validate_required`` presumably raises
            as well when "name" or "execute" is missing — confirm against
            ConfigurationBase.
    """
    # "name" and "execute" are mandatory for every plugin.
    self._validate_required([
        "name",
        "execute",
    ])

    if not isinstance(self.target, PluginTarget):
        raise InvalidPluginConfigurationError(
            "Target must be of type Plugin target but was {0}".format(self.target))

    if not isinstance(self.target_role, PluginTargetRole):
        # Bug fix: the original interpolated self.target here, so the error
        # message reported the wrong attribute's value.
        raise InvalidPluginConfigurationError(
            "Target role must be of type Plugin target role but was {0}".format(self.target_role))

aztk/node_scripts/core/config.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
'/providers/[^/]+'
1717
'/[^/]+Accounts/(?P<account>[^/]+)$')
1818

19-
batch_account_name = os.environ["AZ_BATCH_ACCOUNT_NAME"]
20-
batch_account_key = os.environ["BATCH_ACCOUNT_KEY"]
21-
batch_service_url = os.environ["BATCH_SERVICE_URL"]
22-
tenant_id = os.environ["SP_TENANT_ID"]
23-
client_id = os.environ["SP_CLIENT_ID"]
24-
credential = os.environ["SP_CREDENTIAL"]
25-
batch_resource_id = os.environ["SP_BATCH_RESOURCE_ID"]
26-
storage_resource_id = os.environ["SP_STORAGE_RESOURCE_ID"]
19+
batch_account_name = os.environ.get("AZ_BATCH_ACCOUNT_NAME")
20+
batch_account_key = os.environ.get("BATCH_ACCOUNT_KEY")
21+
batch_service_url = os.environ.get("BATCH_SERVICE_URL")
22+
tenant_id = os.environ.get("SP_TENANT_ID")
23+
client_id = os.environ.get("SP_CLIENT_ID")
24+
credential = os.environ.get("SP_CREDENTIAL")
25+
batch_resource_id = os.environ.get("SP_BATCH_RESOURCE_ID")
26+
storage_resource_id = os.environ.get("SP_STORAGE_RESOURCE_ID")
2727

2828
pool_id = os.environ["AZ_BATCH_POOL_ID"]
2929
node_id = os.environ["AZ_BATCH_NODE_ID"]
@@ -33,9 +33,9 @@
3333
spark_worker_ui_port = os.environ["SPARK_WORKER_UI_PORT"]
3434
spark_job_ui_port = os.environ["SPARK_JOB_UI_PORT"]
3535

36-
storage_account_name = os.environ["STORAGE_ACCOUNT_NAME"]
37-
storage_account_key = os.environ["STORAGE_ACCOUNT_KEY"]
38-
storage_account_suffix = os.environ["STORAGE_ACCOUNT_SUFFIX"]
36+
storage_account_name = os.environ.get("STORAGE_ACCOUNT_NAME")
37+
storage_account_key = os.environ.get("STORAGE_ACCOUNT_KEY")
38+
storage_account_suffix = os.environ.get("STORAGE_ACCOUNT_SUFFIX")
3939

4040
def get_blob_client() -> blob.BlockBlobService:
4141
if not storage_resource_id:

aztk/node_scripts/docker_main.sh

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33
# This file is the entry point of the docker container.
44

55
set -e
6+
echo "Initializing spark container"
67

78
# --------------------
89
# Setup custom scripts
910
# --------------------
10-
custom_script_dir=$DOCKER_WORKING_DIR/custom-scripts
11-
aztk_dir=$DOCKER_WORKING_DIR/aztk
11+
custom_script_dir=$AZTK_WORKING_DIR/custom-scripts
12+
aztk_dir=$AZTK_WORKING_DIR/aztk
1213

1314
# -----------------------
1415
# Preload jupyter samples
@@ -28,11 +29,11 @@ done
2829
echo "Starting setup using Docker"
2930

3031
$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/pip install -r $(dirname $0)/requirements.txt
31-
export PYTHONPATH=$PYTHONPATH:$DOCKER_WORKING_DIR
32-
echo 'export PYTHONPATH=$PYTHONPATH:$DOCKER_WORKING_DIR' >> ~/.bashrc
32+
export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR
33+
echo 'export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR' >> ~/.bashrc
3334

3435
echo "Running main.py script"
35-
$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/python $(dirname $0)/main.py install
36+
$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/python $(dirname $0)/main.py setup-spark-container
3637

3738
# sleep to keep container running
3839
while true; do sleep 1; done

aztk/node_scripts/install/create_user.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
from datetime import datetime, timezone, timedelta
77
import yaml
88
'''
9-
Creates a user if the user configuration file at $DOCKER_WORKING_DIR/user.yaml exists
9+
Creates a user if the user configuration file at $AZTK_WORKING_DIR/user.yaml exists
1010
'''
1111

1212
def create_user(batch_client):
13-
path = os.path.join(os.environ['DOCKER_WORKING_DIR'], "user.yaml")
13+
path = os.path.join(os.environ['AZTK_WORKING_DIR'], "user.yaml")
1414

1515
if not os.path.isfile(path):
1616
print("No user to create.")
@@ -43,7 +43,7 @@ def decrypt_password(user_conf):
4343
tag = user_conf['tag']
4444

4545
# Read private key
46-
with open(os.path.join(os.environ['DOCKER_WORKING_DIR'], 'id_rsa'), encoding='UTF-8') as f:
46+
with open(os.path.join(os.environ['AZTK_WORKING_DIR'], 'id_rsa'), encoding='UTF-8') as f:
4747
private_key = RSA.import_key(f.read())
4848
# Decrypt the session key with the public RSA key
4949
cipher_rsa = PKCS1_OAEP.new(private_key)

0 commit comments

Comments (0)