20 changes: 20 additions & 0 deletions bin/spark-cluster-create
@@ -17,6 +17,7 @@ if __name__ == '__main__':
_pool_id = None
_vm_count = None
_vm_size = None
_custom_script = None

_wait = True

@@ -29,6 +30,8 @@ if __name__ == '__main__':
help="number of vms in your cluster")
parser.add_argument("--cluster-vm-size", required=True,
help="size of each vm in your cluster")
parser.add_argument("--custom-script",
help="absolute path of custom bash script (.sh) to run on each node")
parser.add_argument('--wait', dest='wait', action='store_true')
parser.add_argument('--no-wait', dest='wait', action='store_false')
parser.set_defaults(wait=False)
@@ -48,6 +51,10 @@ if __name__ == '__main__':
_vm_size = args.cluster_vm_size
print("spark cluster vm size: %s" % _vm_size)

if args.custom_script is not None:
_custom_script = args.custom_script
print("path to custom script: %s" % _custom_script)
Review comment (Contributor Author):
minor: going forward I usually prefer print('foo {}'.format('myString')). Do you have an opinion?
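
For reference, the two styles this comment contrasts print the same line here; a minimal illustration (the path is made up):

    # %-style formatting, as used in this PR
    print("path to custom script: %s" % "/home/user/setup.sh")
    # str.format, as suggested above; type-agnostic and easier to
    # extend to multiple fields
    print("path to custom script: {}".format("/home/user/setup.sh"))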


if args.wait is not None:
if args.wait == False:
_wait = False
@@ -62,15 +69,28 @@ if __name__ == '__main__':
batch_account_name = global_config.get('Batch', 'batchaccountname')
batch_service_url = global_config.get('Batch', 'batchserviceurl')

# Set up storage configuration
storage_account_key = global_config.get('Storage', 'storageaccountkey')
storage_account_name = global_config.get('Storage', 'storageaccountname')
storage_account_suffix = global_config.get('Storage', 'storageaccountsuffix')

# create batch client
batch_client = util.create_batch_client(
batch_account_key,
batch_account_name,
batch_service_url)

# create storage client
blob_client = util.create_blob_client(
storage_account_key,
storage_account_name,
storage_account_suffix)

# create spark cluster
clusterlib.create_cluster(
batch_client,
blob_client,
_custom_script,
Review comment (Contributor Author):
why do some variables start with _ and not others?

_pool_id,
_vm_count,
_vm_size,
12 changes: 8 additions & 4 deletions bin/spark-submit
@@ -60,15 +60,18 @@ if __name__ == '__main__':

parser.add_argument("--jars",
help="Comma-separated list of local jars to include \
on the driver and executor classpaths")
on the driver and executor classpaths. Use \
absolute path to reference files.")

parser.add_argument("--py-files",
help="Comma-separated list of .zip, .egg, or .py files \
to place on the PYTHONPATH for Python apps.")
to place on the PYTHONPATH for Python apps. Use \
absolute path to reference files.")

parser.add_argument("--files",
help="Comma-separated list of .zip, .egg, or .py files \
to place on the PYTHONPATH for Python apps.")
to place on the PYTHONPATH for Python apps. Use \
absolute path to reference files.")

parser.add_argument("--driver-java-options",
help="Extra Java options to pass to the driver.")
@@ -95,7 +98,8 @@ if __name__ == '__main__':
available cores on the worker")

parser.add_argument("application", nargs='*',
help="App jar OR python file to execute")
help="App jar OR python file to execute. Use absolute \
path to reference file.")

args = parser.parse_args()
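
Several of these flags take comma-separated lists of absolute paths; a caller can build such a list with os.path.abspath rather than typing the paths by hand. A small sketch (the jar names are hypothetical):

    import os

    # Build the comma-separated, absolute-path list that --jars expects.
    jars = ",".join(os.path.abspath(p) for p in ["lib/dep1.jar", "lib/dep2.jar"])
    print(jars)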

29 changes: 27 additions & 2 deletions dtde/clusterlib.py
@@ -5,12 +5,23 @@
import azure.batch.models as batch_models
from subprocess import call

def cluster_install_cmd():
def cluster_install_cmd(custom_script_file):

run_custom_script = ''
Review comment (Contributor Author):
some places use "", while others use ''. We should consolidate on one.

if custom_script_file is not None:
run_custom_script = '/bin/sh -c ' + custom_script_file
Review comment (Contributor Author):
use '/bin/sh -c {}'.format(custom_script_file)
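
The suggested str.format call works, but either way the path reaches the shell unquoted. A sketch that also quotes it, using Python 3's shlex.quote (pipes.quote on Python 2); the quoting is an addition, not part of this PR:

    import shlex

    def build_custom_script_cmd(custom_script_file):
        # '/bin/sh -c {}' per the review suggestion; shlex.quote guards
        # against spaces or shell metacharacters in the path.
        return '/bin/sh -c {}'.format(shlex.quote(custom_script_file))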


return [
# setup spark home and permissions for spark folder
'export SPARK_HOME=/dsvm/tools/spark/current',
'export PATH=$PATH:$SPARK_HOME/bin',
'chmod -R 777 $SPARK_HOME',
'chmod -R 777 /usr/local/share/jupyter/kernels',

# To avoid error: "sudo: sorry, you must have a tty to run sudo"
'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers',
run_custom_script,

'exit 0'
]
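
These commands are later collapsed into a single command line by util.wrap_commands_in_shell (see create_cluster below). That helper is not shown in this diff; a plausible shape, modeled on the Azure Batch Python samples, is:

    def wrap_commands_in_shell(commands):
        # Run the commands sequentially in one /bin/bash -c invocation,
        # stopping on the first failure. Blank entries (e.g. when no
        # custom script was given) are dropped.
        joined = '; '.join(c for c in commands if c)
        return "/bin/bash -c 'set -e; set -o pipefail; {}; wait'".format(joined)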

@@ -106,6 +117,8 @@ def cluster_start_cmd(webui_port, jupyter_port):

def create_cluster(
batch_client,
blob_client,
custom_script,
pool_id,
vm_count,
vm_size,
@@ -122,8 +135,19 @@
# reuse pool_id as job_id
job_id = pool_id

# Upload custom script file
resource_files = []
if custom_script is not None:
resource_files.append(
util.upload_file_to_container(
blob_client,
container_name = pool_id,
file_path = custom_script,
use_full_path = True))
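
util.upload_file_to_container is also not part of this diff; judging from the call site, it uploads the script to the cluster's blob container and returns a batch_models.ResourceFile whose URL carries a read SAS token. A hypothetical sketch, modeled on the Azure Batch Python samples (names, SAS lifetime, and the blob-name handling are assumptions):

    import datetime
    import os

    import azure.batch.models as batch_models
    import azure.storage.blob as azureblob

    def upload_file_to_container(blob_client, container_name, file_path,
                                 use_full_path=False):
        # Preserve the full path when requested so the start task can
        # reference the script at the same location it had locally.
        blob_name = file_path.strip('/') if use_full_path \
            else os.path.basename(file_path)
        blob_client.create_container(container_name, fail_on_exist=False)
        blob_client.create_blob_from_path(container_name, blob_name, file_path)
        sas_token = blob_client.generate_blob_shared_access_signature(
            container_name, blob_name,
            permission=azureblob.BlobPermissions.READ,
            expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))
        sas_url = blob_client.make_blob_url(
            container_name, blob_name, sas_token=sas_token)
        return batch_models.ResourceFile(
            file_path=blob_name, blob_source=sas_url)

Batch downloads each resource file into the start task's working directory before command_line runs, which is presumably how the generated '/bin/sh -c <path>' invocation finds the uploaded script.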

# start task command
start_task_commands = cluster_install_cmd()
start_task_commands = \
cluster_install_cmd(custom_script)

# Get a verified node agent sku
sku_to_use, image_ref_to_use = \
@@ -140,6 +164,7 @@
target_dedicated = vm_count,
start_task = batch_models.StartTask(
command_line = util.wrap_commands_in_shell(start_task_commands),
resource_files = resource_files,
user_identity = batch_models.UserIdentity(
auto_user = batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.pool,