This repository was archived by the owner on Feb 3, 2021. It is now read-only.
Merged
Changes from all commits
35 commits
024c304 Wip (timotheeguerin, Mar 22, 2018)
331aa19 Fix issues (timotheeguerin, Mar 23, 2018)
9809d91 more tweaks (timotheeguerin, Mar 23, 2018)
ee307e0 FIx more (timotheeguerin, Mar 23, 2018)
373c04a More env renaming (timotheeguerin, Mar 23, 2018)
77c82ee Start (timotheeguerin, Mar 23, 2018)
578657c Docker runs now (timotheeguerin, Mar 23, 2018)
8cee57b Wip docker run on node (timotheeguerin, Mar 23, 2018)
58e06db fix issues (timotheeguerin, Mar 23, 2018)
c77724b More fix (timotheeguerin, Mar 23, 2018)
7eece21 Starts (timotheeguerin, Mar 26, 2018)
ff7d310 Starting spark again (timotheeguerin, Mar 26, 2018)
d1e7382 Works (timotheeguerin, Mar 26, 2018)
fdaafca Fix (timotheeguerin, Mar 26, 2018)
3ecd48c More fixes (timotheeguerin, Mar 26, 2018)
ef60f2c Running plugins on the host works (timotheeguerin, Mar 26, 2018)
2ba79fb tweak (timotheeguerin, Mar 26, 2018)
6f44148 Merge master (timotheeguerin, Mar 26, 2018)
a2c5d0d Fix: tests (timotheeguerin, Mar 27, 2018)
35ff863 more (timotheeguerin, Apr 5, 2018)
0f06d69 Define plugin docs (timotheeguerin, Apr 5, 2018)
7f1e77e Added types (timotheeguerin, Apr 5, 2018)
a6c08c4 Fix jupyterlab (timotheeguerin, Apr 5, 2018)
90aceae merge master (timotheeguerin, Apr 13, 2018)
f8c0d74 fix merge issue (timotheeguerin, Apr 13, 2018)
d9938f1 Added test for invalid target and target role (timotheeguerin, Apr 18, 2018)
95e8aff Fix pylint (timotheeguerin, Apr 18, 2018)
58e6962 Rename (timotheeguerin, Apr 18, 2018)
b519bc7 Added docs for debug plugins (timotheeguerin, Apr 18, 2018)
66e1205 Merge master (timotheeguerin, Apr 23, 2018)
00952ea merge (timotheeguerin, Apr 23, 2018)
404159b Fix breaking change in dependency (timotheeguerin, Apr 23, 2018)
b6a9de3 fix master (timotheeguerin, Apr 23, 2018)
ba0c22d fix (timotheeguerin, Apr 23, 2018)
bd4a9f6 Merge branch 'master' into feature/pluginv2 (timotheeguerin, Apr 24, 2018)
4 changes: 3 additions & 1 deletion .vscode/settings.json
@@ -14,5 +14,7 @@
"python.formatting.provider": "yapf",
"python.formatting.yapfArgs": [
"--style=.style.yapf"
]
],
"python.venvPath": "${workspaceFolder}/ENV",
"python.pythonPath": "${workspaceFolder}\\ENV\\Scripts\\python.exe"
}
2 changes: 1 addition & 1 deletion aztk/error.py
@@ -6,7 +6,7 @@


class AztkError(Exception):
def __init__(self, message: str = None):
def __init__(self, message: str=None):
super().__init__(message)

class ClusterNotReadyError(AztkError):
1 change: 1 addition & 0 deletions aztk/internal/__init__.py
@@ -3,3 +3,4 @@
"""

from .configuration_base import *
from .docker_cmd import *
6 changes: 3 additions & 3 deletions aztk/internal/cluster_data/node_data.py
@@ -1,11 +1,10 @@
import fnmatch
import io
import json
import os
import yaml
import zipfile
from pathlib import Path
from typing import List
import yaml
from aztk.spark import models
from aztk.utils import constants, file_utils, secure_utils
from aztk.error import InvalidCustomScriptError
@@ -147,7 +146,8 @@ def _add_plugins(self):
execute='{0}/{1}'.format(plugin.name, plugin.execute),
args=plugin.args,
env=plugin.env,
runOn=plugin.run_on.value,
target=plugin.target.value,
target_role=plugin.target_role.value,
Review comment (Contributor):

This is a breaking change. All existing plugins use the runOn field, which this breaks. Should this change update all existing plugins, and maybe deprecate runOn instead of removing it for now?

Reply (Member):

Since plugins don't exist in the latest release (v0.6.0), this shouldn't be a breaking change. I think going forward we should limit deprecation to cross-release changes.

Anyone using plugins today is using master, and should not expect the same level of stability as if they were upgrading from a previous minor release.
))

self.zipf.writestr(os.path.join('plugins', 'plugins-manifest.yaml'), yaml.dump(data))
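The review thread above suggests deprecating runOn rather than dropping it. As a hedged sketch only (the PR removes the field outright, and the helper name below is invented for illustration), a backward-compatible shim over the manifest entries written by _add_plugins could look like this:

```python
import warnings

def normalize_plugin_entry(entry: dict) -> dict:
    """Hypothetical shim: map the legacy 'runOn' field onto the new
    'target'/'target_role' pair instead of breaking old manifests."""
    if "runOn" in entry and "target_role" not in entry:
        warnings.warn("'runOn' is deprecated; use 'target_role'", DeprecationWarning)
        entry["target_role"] = entry.pop("runOn")
        # old plugins always ran inside the spark container
        entry.setdefault("target", "spark-container")
    return entry
```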
38 changes: 38 additions & 0 deletions aztk/internal/docker_cmd.py
@@ -0,0 +1,38 @@
import os
from aztk.utils.command_builder import CommandBuilder

class DockerCmd:
"""
Helper for building a docker run command
"""

def __init__(self, name: str, docker_repo: str, cmd: str, gpu_enabled=False):
if gpu_enabled:
self.cmd = CommandBuilder('nvidia-docker run')
else:
self.cmd = CommandBuilder('docker run')
self.cmd.add_option('--net', 'host')
self.cmd.add_option('--name', name)
self.cmd.add_argument('-d')
self.cmd.add_argument(docker_repo)
self.cmd.add_argument(cmd)


def add_env(self, env: str, value: str):
self.cmd.add_option('-e', '{0}={1}'.format(env, value))

def pass_env(self, env: str):
"""
Forward the host process's value of an environment variable into the container
"""
self.cmd.add_option('-e', '{0}'.format(env))

def share_folder(self, folder: str):
self.cmd.add_option('-v', '{0}:{0}'.format(folder))

def open_port(self, port: int):
self.cmd.add_option('-p', '{0}:{0}'.format(port))  # map the same port on host and container


def to_str(self):
return self.cmd.to_str()
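For context, a minimal usage sketch of DockerCmd as defined above. The image name, paths, and port are illustrative, and it assumes CommandBuilder renders options before positional arguments, which is what lets __init__ add the image and command up front:

```python
from aztk.internal.docker_cmd import DockerCmd

cmd = DockerCmd(name="spark",
                docker_repo="aztk/base:latest",            # hypothetical image
                cmd="/bin/bash /mnt/aztk/docker_main.sh",  # hypothetical entry point
                gpu_enabled=False)
cmd.add_env("AZTK_WORKING_DIR", "/mnt/aztk")  # set an explicit value
cmd.pass_env("AZ_BATCH_ACCOUNT_NAME")         # forward the host's value
cmd.share_folder("/mnt/batch")                # same path on host and container
cmd.open_port(8080)                           # expose a UI port
print(cmd.to_str())
```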
1 change: 1 addition & 0 deletions aztk/models/plugins/internal/plugin_manager.py
@@ -20,6 +20,7 @@ class PluginManager:
jupyterlab=plugins.JupyterLabPlugin,
rstudio_server=plugins.RStudioServerPlugin,
hdfs=plugins.HDFSPlugin,
simple=plugins.SimplePlugin,
spark_ui_proxy=plugins.SparkUIProxyPlugin,
)

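For orientation, a hedged sketch of how a name-keyed registry like the one above can resolve plugin entries coming from cluster.yaml. PluginManager's public lookup API is not shown in this hunk, so the helper below is illustrative only:

```python
# Illustrative stand-in: a name -> class mapping like PluginManager's,
# with a lookup that fails loudly on unknown plugin names.
registry = {
    "jupyterlab": "plugins.JupyterLabPlugin",  # placeholders for the real classes
    "simple": "plugins.SimplePlugin",
    "spark_ui_proxy": "plugins.SparkUIProxyPlugin",
}

def resolve_plugin(name: str):
    try:
        return registry[name]
    except KeyError:
        raise ValueError("Unknown plugin '{0}'. Available: {1}".format(
            name, ", ".join(sorted(registry))))
```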
52 changes: 35 additions & 17 deletions aztk/models/plugins/plugin_configuration.py
@@ -1,8 +1,23 @@
import inspect
from typing import List, Union
from enum import Enum
from .plugin_file import PluginFile
from typing import List, Union
from aztk.internal import ConfigurationBase
from aztk.error import InvalidPluginConfigurationError
from .plugin_file import PluginFile

class PluginTarget(Enum):
"""
Where this plugin should run
"""
SparkContainer = "spark-container"
Host = "host"


class PluginTargetRole(Enum):
Master = "master"
Worker = "worker"
All = "all-nodes"



class PluginPort:
"""
@@ -12,8 +27,7 @@ class PluginPort:
:param name: [Optional] name to differentiate ports if you have multiple
"""

def __init__(self, internal: int, public: Union[int, bool] = False, name=None):

def __init__(self, internal: int, public: Union[int, bool]=False, name=None):
self.internal = internal
self.expose_publicly = bool(public)
self.public_port = None
@@ -26,11 +40,6 @@ def __init__(self, internal: int, public: Union[int, bool] = False, name=None):
self.name = name


class PluginRunTarget(Enum):
Master = "master"
Worker = "worker"
All = "all-nodes"



class PluginConfiguration(ConfigurationBase):
@@ -45,15 +54,17 @@ class PluginConfiguration(ConfigurationBase):

def __init__(self,
name: str,
ports: List[PluginPort] = None,
files: List[PluginFile] = None,
execute: str = None,
ports: List[PluginPort]=None,
files: List[PluginFile]=None,
execute: str=None,
args=None,
env=None,
run_on: PluginRunTarget = PluginRunTarget.Master):
target_role: PluginTargetRole=PluginTargetRole.Master,
target: PluginTarget=PluginTarget.SparkContainer):
self.name = name
# self.docker_image = docker_image
self.run_on = run_on
self.target = target
self.target_role = target_role
self.ports = ports or []
self.files = files or []
self.args = args or []
@@ -64,11 +75,18 @@ def has_arg(self, name: str):
for x in self.args:
if x.name == name:
return True
else:
return False
return False

def validate(self):
self._validate_required([
"name",
"execute",
])

if not isinstance(self.target, PluginTarget):
raise InvalidPluginConfigurationError(
"Target must be of type PluginTarget but was {0}".format(self.target))

if not isinstance(self.target_role, PluginTargetRole):
raise InvalidPluginConfigurationError(
"Target role must be of type PluginTargetRole but was {0}".format(self.target_role))
22 changes: 11 additions & 11 deletions aztk/node_scripts/core/config.py
@@ -16,14 +16,14 @@
'/providers/[^/]+'
'/[^/]+Accounts/(?P<account>[^/]+)$')

batch_account_name = os.environ["AZ_BATCH_ACCOUNT_NAME"]
batch_account_key = os.environ["BATCH_ACCOUNT_KEY"]
batch_service_url = os.environ["BATCH_SERVICE_URL"]
tenant_id = os.environ["SP_TENANT_ID"]
client_id = os.environ["SP_CLIENT_ID"]
credential = os.environ["SP_CREDENTIAL"]
batch_resource_id = os.environ["SP_BATCH_RESOURCE_ID"]
storage_resource_id = os.environ["SP_STORAGE_RESOURCE_ID"]
batch_account_name = os.environ.get("AZ_BATCH_ACCOUNT_NAME")
batch_account_key = os.environ.get("BATCH_ACCOUNT_KEY")
batch_service_url = os.environ.get("BATCH_SERVICE_URL")
tenant_id = os.environ.get("SP_TENANT_ID")
client_id = os.environ.get("SP_CLIENT_ID")
credential = os.environ.get("SP_CREDENTIAL")
batch_resource_id = os.environ.get("SP_BATCH_RESOURCE_ID")
storage_resource_id = os.environ.get("SP_STORAGE_RESOURCE_ID")

pool_id = os.environ["AZ_BATCH_POOL_ID"]
node_id = os.environ["AZ_BATCH_NODE_ID"]
@@ -33,9 +33,9 @@
spark_worker_ui_port = os.environ["SPARK_WORKER_UI_PORT"]
spark_job_ui_port = os.environ["SPARK_JOB_UI_PORT"]

storage_account_name = os.environ["STORAGE_ACCOUNT_NAME"]
storage_account_key = os.environ["STORAGE_ACCOUNT_KEY"]
storage_account_suffix = os.environ["STORAGE_ACCOUNT_SUFFIX"]
storage_account_name = os.environ.get("STORAGE_ACCOUNT_NAME")
storage_account_key = os.environ.get("STORAGE_ACCOUNT_KEY")
storage_account_suffix = os.environ.get("STORAGE_ACCOUNT_SUFFIX")

def get_blob_client() -> blob.BlockBlobService:
if not storage_resource_id:
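The switch from os.environ[...] to os.environ.get(...) is what makes the service-principal and shared-key settings optional: only one auth path is populated at a time, and indexing a missing variable would raise at import time. A one-line illustration:

```python
import os

# tenant_id = os.environ["SP_TENANT_ID"]    # KeyError under shared-key auth
tenant_id = os.environ.get("SP_TENANT_ID")  # None under shared-key auth; import continues
use_service_principal = tenant_id is not None
```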
11 changes: 6 additions & 5 deletions aztk/node_scripts/docker_main.sh
@@ -3,12 +3,13 @@
# This file is the entry point of the docker container.

set -e
echo "Initializing spark container"

# --------------------
# Setup custom scripts
# --------------------
custom_script_dir=$DOCKER_WORKING_DIR/custom-scripts
aztk_dir=$DOCKER_WORKING_DIR/aztk
custom_script_dir=$AZTK_WORKING_DIR/custom-scripts
aztk_dir=$AZTK_WORKING_DIR/aztk

# -----------------------
# Preload jupyter samples
@@ -28,11 +29,11 @@ done
echo "Starting setup using Docker"

$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/pip install -r $(dirname $0)/requirements.txt
export PYTHONPATH=$PYTHONPATH:$DOCKER_WORKING_DIR
echo 'export PYTHONPATH=$PYTHONPATH:$DOCKER_WORKING_DIR' >> ~/.bashrc
export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR
echo 'export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR' >> ~/.bashrc

echo "Running main.py script"
$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/python $(dirname $0)/main.py install
$(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/python $(dirname $0)/main.py setup-spark-container

# sleep to keep container running
while true; do sleep 1; done
6 changes: 3 additions & 3 deletions aztk/node_scripts/install/create_user.py
@@ -6,11 +6,11 @@
from datetime import datetime, timezone, timedelta
import yaml
'''
Creates a user if the user configuration file at $DOCKER_WORKING_DIR/user.yaml exists
Creates a user if the user configuration file at $AZTK_WORKING_DIR/user.yaml exists
'''

def create_user(batch_client):
path = os.path.join(os.environ['DOCKER_WORKING_DIR'], "user.yaml")
path = os.path.join(os.environ['AZTK_WORKING_DIR'], "user.yaml")

if not os.path.isfile(path):
print("No user to create.")
@@ -43,7 +43,7 @@ def decrypt_password(user_conf):
tag = user_conf['tag']

# Read private key
with open(os.path.join(os.environ['DOCKER_WORKING_DIR'], 'id_rsa'), encoding='UTF-8') as f:
with open(os.path.join(os.environ['AZTK_WORKING_DIR'], 'id_rsa'), encoding='UTF-8') as f:
private_key = RSA.import_key(f.read())
# Decrypt the session key with the public RSA key
cipher_rsa = PKCS1_OAEP.new(private_key)
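decrypt_password above is the receiving half of a standard pycryptodome hybrid scheme. A sketch of the plausible sending side, assuming AES in EAX mode for the password and the node's RSA public key for the session key; apart from tag, which appears in the diff, the field names are assumptions:

```python
from Crypto.PublicKey import RSA
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.Random import get_random_bytes

def encrypt_password(public_key_pem: str, password: str) -> dict:
    session_key = get_random_bytes(16)
    # Encrypt the session key with the node's public RSA key
    encrypted_key = PKCS1_OAEP.new(RSA.import_key(public_key_pem)).encrypt(session_key)
    # Encrypt the password itself with AES-EAX under the session key
    cipher_aes = AES.new(session_key, AES.MODE_EAX)
    ciphertext, tag = cipher_aes.encrypt_and_digest(password.encode('utf-8'))
    return dict(
        aes_session_key=encrypted_key,  # hypothetical field name
        nonce=cipher_aes.nonce,         # hypothetical field name
        tag=tag,                        # matches user_conf['tag'] above
        password=ciphertext,            # hypothetical field name
    )
```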
75 changes: 51 additions & 24 deletions aztk/node_scripts/install/install.py
@@ -1,47 +1,74 @@
import os
from core import config
from install import pick_master, spark, scripts, create_user, plugins
from install import pick_master, spark, scripts, create_user, plugins, spark_container
import wait_until_master_selected
from aztk.models.plugins import PluginTarget
from aztk.internal import cluster_data

def read_cluster_config():
data = cluster_data.ClusterData(config.blob_client, config.pool_id)
cluster_config = data.read_cluster_config()
print("Got cluster config", cluster_config)
return cluster_config

def setup_node():
def setup_host(docker_repo: str):
"""
Code to be run on the node (NOT in a container)
"""
client = config.batch_client

create_user.create_user(batch_client=client)

spark.setup_conf()

if os.environ['AZ_BATCH_NODE_IS_DEDICATED'] == "true" or os.environ['MIXED_MODE'] == "False":
if os.environ['AZ_BATCH_NODE_IS_DEDICATED'] == "true" or os.environ['AZTK_MIXED_MODE'] == "False":
is_master = pick_master.find_master(client)
else:
is_master = False
wait_until_master_selected.main()

is_worker = not is_master or os.environ["AZTK_WORKER_ON_MASTER"]
master_node_id = pick_master.get_master_node_id(config.batch_client.pool.get(config.pool_id))
master_node = config.batch_client.compute_node.get(config.pool_id, master_node_id)

os.environ["MASTER_IP"] = master_node.ip_address

if is_master:
setup_as_master()
plugins.setup_plugins(is_master=True, is_worker=True)
scripts.run_custom_scripts(is_master=True, is_worker=True)
else:
setup_as_worker()
plugins.setup_plugins(is_master=False, is_worker=True)
scripts.run_custom_scripts(is_master=False, is_worker=True)
os.environ["AZTK_IS_MASTER"] = "1"
if is_worker:
os.environ["AZTK_IS_WORKER"] = "1"

open("/tmp/setup_complete", 'a').close()
os.environ["AZTK_MASTER_IP"] = master_node.ip_address

cluster_conf = read_cluster_config()

spark_container.start_spark_container(
docker_repo=docker_repo,
gpu_enabled=os.environ.get("AZTK_GPU_ENABLED") == "true",
plugins=cluster_conf.plugins,
)
plugins.setup_plugins(target=PluginTarget.Host, is_master=is_master, is_worker=is_worker)


def setup_spark_container():
"""
Code run in the main spark container
"""
is_master = os.environ.get("AZTK_IS_MASTER")
is_worker = os.environ.get("AZTK_IS_WORKER")
print("Setting spark container. Master: ", is_master, ", Worker: ", is_worker)

print("Copying spark setup config")
spark.setup_conf()
print("Done copying spark setup config")

master_node_id = pick_master.get_master_node_id(config.batch_client.pool.get(config.pool_id))
master_node = config.batch_client.compute_node.get(config.pool_id, master_node_id)

def setup_as_master():
print("Setting up as master.")
spark.setup_connection()
spark.start_spark_master()
if os.environ["WORKER_ON_MASTER"] == "True":

if is_master:
spark.start_spark_master()

if is_worker:
spark.start_spark_worker()

def setup_as_worker():
print("Setting up as worker.")
spark.setup_connection()
spark.start_spark_worker()
plugins.setup_plugins(target=PluginTarget.SparkContainer, is_master=is_master, is_worker=is_worker)
scripts.run_custom_scripts(is_master=is_master, is_worker=is_worker)

open("/tmp/setup_complete", 'a').close()