4 changes: 2 additions & 2 deletions aztk/node_scripts/submit.py
@@ -114,7 +114,7 @@ def __app_submit_cmd(
         spark_submit_cmd.add_option('--executor-cores', str(executor_cores))

     spark_submit_cmd.add_argument(
-        os.environ['AZ_BATCH_TASK_WORKING_DIR'] + '/' + app + ' ' +
+        os.path.expandvars(app) + ' ' +
         ' '.join(['\'' + str(app_arg) + '\'' for app_arg in (app_args or [])]))

     with open("spark-submit.txt", mode="w", encoding="UTF-8") as stream:
@@ -156,7 +156,7 @@ def recieve_submit_request(application_file_path):

     cmd = __app_submit_cmd(
         name=application['name'],
-        app=os.path.basename(application['application']),
+        app=application['application'],
         app_args=application['application_args'],
         main_class=application['main_class'],
         jars=application['jars'],
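The two changes above work together: `recieve_submit_request` now forwards the application path untouched, and `__app_submit_cmd` resolves it with `os.path.expandvars`. A locally submitted app has already been rewritten upstream to `$AZ_BATCH_TASK_WORKING_DIR/<name>`, so the variable expands to the task's working directory, while a remote URI contains no variables and passes through unchanged. A minimal sketch of that behavior (the working-directory value below is a made-up example, not taken from this PR):

```python
import os

# Hypothetical value; on a real node Azure Batch sets this variable itself.
os.environ['AZ_BATCH_TASK_WORKING_DIR'] = '/mnt/batch/tasks/workitems/job-1/wd'

uploaded_app = '$AZ_BATCH_TASK_WORKING_DIR/pi.py'  # path rewritten after upload
remote_app = 'wasbs://path@remote/pi.py'           # remote path, no variables

print(os.path.expandvars(uploaded_app))  # /mnt/batch/tasks/workitems/job-1/wd/pi.py
print(os.path.expandvars(remote_app))    # wasbs://path@remote/pi.py (unchanged)
```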
4 changes: 2 additions & 2 deletions aztk/spark/client.py
@@ -109,9 +109,9 @@ def get_remote_login_settings(self, cluster_id: str, node_id: str):
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))

-    def submit(self, cluster_id: str, application: models.ApplicationConfiguration, wait: bool = False):
+    def submit(self, cluster_id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
         try:
-            cluster_submit_helper.submit_application(self, cluster_id, application, wait)
+            cluster_submit_helper.submit_application(self, cluster_id, application, remote, wait)
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
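From the SDK side, the new keyword is opt-in and defaults to `False`, so existing callers are unaffected. A hedged usage sketch (the secrets object and the remote path are placeholders, not part of this diff):

```python
import aztk.spark

# Placeholder: assumes a SecretsConfiguration populated elsewhere.
client = aztk.spark.Client(secrets_config)

app = aztk.spark.models.ApplicationConfiguration(
    name='pipy',
    application='wasbs://path@remote/pi.py',  # must already be reachable from the cluster
    application_args=['100'])

# remote=True skips the blob upload; the path is handed to spark-submit as-is.
client.submit(cluster_id='spark', application=app, remote=True)
```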
25 changes: 14 additions & 11 deletions aztk/spark/helpers/submit.py
@@ -17,17 +17,21 @@ def __get_node(spark_client, node_id: str, cluster_id: str) -> batch_models.ComputeNode:
     return spark_client.batch_client.compute_node.get(cluster_id, node_id)


-def generate_task(spark_client, container_id, application):
+def generate_task(spark_client, container_id, application, remote=False):
     resource_files = []

-    app_resource_file = helpers.upload_file_to_container(container_name=container_id,
-                                                         application_name=application.name,
-                                                         file_path=application.application,
-                                                         blob_client=spark_client.blob_client,
-                                                         use_full_path=False)
+    # The application provided is not hosted remotely and therefore must be uploaded
+    if not remote:
+        app_resource_file = helpers.upload_file_to_container(container_name=container_id,
+                                                             application_name=application.name,
+                                                             file_path=application.application,
+                                                             blob_client=spark_client.blob_client,
+                                                             use_full_path=False)

-    # Upload application file
-    resource_files.append(app_resource_file)
+        # Upload application file
+        resource_files.append(app_resource_file)
+
+        application.application = '$AZ_BATCH_TASK_WORKING_DIR/' + os.path.basename(application.application)

     # Upload dependent JARS
     jar_resource_file_paths = []
@@ -64,7 +68,6 @@ def generate_task(spark_client, container_id, application):
         resource_files.append(files_resource_file_path)

     # Upload application definition
-    application.application = os.path.basename(application.application)
     application.jars = [os.path.basename(jar) for jar in application.jars]
     application.py_files = [os.path.basename(py_files) for py_files in application.py_files]
     application.files = [os.path.basename(files) for files in application.files]
@@ -112,11 +115,11 @@ def affinitize_task_to_master(spark_client, cluster_id, task):
     return task


-def submit_application(spark_client, cluster_id, application, wait: bool = False):
+def submit_application(spark_client, cluster_id, application, remote: bool = False, wait: bool = False):
     """
     Submit a spark app
     """
-    task = generate_task(spark_client, cluster_id, application)
+    task = generate_task(spark_client, cluster_id, application, remote)
     task = affinitize_task_to_master(spark_client, cluster_id, task)

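The branch in `generate_task` is the heart of the feature: only local applications are uploaded and rewritten to live under `$AZ_BATCH_TASK_WORKING_DIR`; remote paths skip both steps, so they are never added to the task's resource files. A condensed sketch of that control flow, using a stand-in `upload` callable rather than the real `helpers.upload_file_to_container`:

```python
import os

def resolve_application(application, remote, upload):
    """Stand-in for the upload branch in generate_task (illustrative only).

    upload: callable that pushes a local file to blob storage and returns a
    Batch resource file (stands in for helpers.upload_file_to_container).
    """
    resource_files = []
    if not remote:
        # Local file: upload it, then point spark-submit at the task working
        # directory, where Batch downloads resource files before the task runs.
        resource_files.append(upload(application.application))
        application.application = (
            '$AZ_BATCH_TASK_WORKING_DIR/' + os.path.basename(application.application))
    # Remote path: left untouched and never added to resource_files.
    return application, resource_files
```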
12 changes: 10 additions & 2 deletions aztk_cli/spark/endpoints/cluster/cluster_submit.py
@@ -70,9 +70,16 @@ def setup_parser(parser: argparse.ArgumentParser):
                         help='Path to the file you wish to output to. If not \
                             specified, output is printed to stdout')

+    parser.add_argument('--remote', action='store_true',
+                        help='Do not upload the app to the cluster, assume it is \
+                            already accessible at the given path')
+
     parser.add_argument('app',
-                        help='App jar OR python file to execute. Use absolute \
-                            path to reference file.')
+                        help='App jar OR python file to execute. A path to a local \
+                            file is expected, unless used in conjunction with \
+                            the --remote flag. When the --remote flag is set, a \
+                            remote path that is accessible from the cluster is \
+                            expected. Remote paths are not validated up-front.')

     parser.add_argument('app_args', nargs='*',
                         help='Arguments for the application')
@@ -146,6 +153,7 @@ def execute(args: typing.NamedTuple):
             executor_cores=args.executor_cores,
             max_retry_count=args.max_retry_count
         ),
+        remote=args.remote,
         wait=False
     )

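Because `--remote` is declared with `action='store_true'`, it defaults to `False`, so existing command lines keep the upload behavior. A quick illustrative check of those argparse semantics (paths are made up):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--remote', action='store_true')
parser.add_argument('app')

print(parser.parse_args(['pi.py']).remote)                          # False
print(parser.parse_args(['--remote', 'wasbs://c@a/pi.py']).remote)  # True
```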
7 changes: 6 additions & 1 deletion docs/20-spark-submit.md
@@ -9,11 +9,16 @@ Run a Spark job:
 aztk spark cluster submit --id <name_of_spark_cluster> --name <name_of_spark_job> <executable> <executable_params>
 ```

-For example, run a local pi.py file on a Spark cluster
+For example, to run a local pi.py file on a Spark cluster, simply specify the local path of the file:
 ```sh
 aztk spark cluster submit --id spark --name pipy examples/src/main/python/pi.py 100
 ```

+To run a remotely hosted pi.py file on a Spark cluster, specify the remote path of the file and use the '--remote' flag:
+```sh
+aztk spark cluster submit --id spark --name pipy --remote wasbs://path@remote/pi.py 100
+```
+
 NOTE: The job name (--name) must be at least 3 characters long, can only contain alphanumeric characters including hyphens but excluding underscores, and cannot contain uppercase letters. Each job you submit **must** have a unique name.

 ## Monitoring job