This repository was archived by the owner on Feb 3, 2021. It is now read-only.
Merged
29 commits
48bfc9c
start implementation of cluster debug utility
jafreck Mar 21, 2018
dc55972
update debug program
jafreck Mar 21, 2018
0cb1c2c
update debug
jafreck Mar 22, 2018
d67e474
fix output directory structure
jafreck Mar 22, 2018
878c56d
cleanup output, add error checking
jafreck Mar 22, 2018
9ae6f31
sort imports
jafreck Mar 22, 2018
ac4b706
start untar
jafreck Mar 22, 2018
33dd49f
extract tar
jafreck Mar 22, 2018
c789cc1
add debug.py to pylintc ignore, line too long
jafreck Mar 22, 2018
677326a
Merge branch 'master' into feature/spark-diagnostic-tool
jafreck Mar 22, 2018
0d35286
crlf->lf
jafreck Mar 22, 2018
cc7992a
Merge branch 'feature/spark-diagnostic-tool' of github.com:jafreck/az…
jafreck Mar 22, 2018
27fc883
add app logs
jafreck Mar 26, 2018
c2cbc4c
call get_spark_app_logs, typos
jafreck Mar 26, 2018
bbffe88
add docs
jafreck Mar 26, 2018
dd7034e
Merge branch 'master' into feature/spark-diagnostic-tool
jafreck Mar 26, 2018
1da088e
Merge branch 'master' into feature/spark-diagnostic-tool
jafreck Mar 27, 2018
044740d
Merge branch 'master' into feature/spark-diagnostic-tool
jafreck Apr 4, 2018
e0f37c6
remove debug.py from pylintrc ignore
jafreck Apr 5, 2018
39b5b3f
added debug.py back to pylint ignore
jafreck Apr 5, 2018
b881c9f
Merge branch 'master' into feature/spark-diagnostic-tool
jafreck Apr 5, 2018
f6377e1
change pylint ignore
jafreck Apr 5, 2018
0a19b72
Merge branch 'feature/spark-diagnostic-tool' of github.com:jafreck/az…
jafreck Apr 5, 2018
9c722d1
remove commented log
jafreck Apr 5, 2018
aa1b055
merge
jafreck Apr 9, 2018
dbee669
update cluster_run
jafreck Apr 9, 2018
964b075
refactor cluster_copy
jafreck Apr 9, 2018
c15c411
update debug, add spinner for run and copy
jafreck Apr 9, 2018
e43aa92
make new sdk cluster_download endpoint
jafreck Apr 9, 2018
44 changes: 26 additions & 18 deletions aztk/client.py
@@ -229,43 +229,48 @@ def __delete_user_on_pool(self, username, pool_id, nodes):
concurrent.futures.wait(futures)


def __cluster_run(self, cluster_id, container_name, command, internal):
def __cluster_run(self, cluster_id, command, internal, container_name=None):
pool, nodes = self.__get_pool_details(cluster_id)
nodes = [node for node in nodes]
if internal:
cluster_nodes = [models.RemoteLogin(ip_address=node.ip_address, port="22") for node in nodes]
cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
else:
cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
cluster_nodes = [(node, self.__get_remote_login_settings(pool.id, node.id)) for node in nodes]
try:
ssh_key = self.__create_user_on_pool('aztk', pool.id, nodes)
asyncio.get_event_loop().run_until_complete(ssh_lib.clus_exec_command(command,
container_name,
'aztk',
cluster_nodes,
ssh_key=ssh_key.exportKey().decode('utf-8')))
output = asyncio.get_event_loop().run_until_complete(ssh_lib.clus_exec_command(command,
'aztk',
cluster_nodes,
ssh_key=ssh_key.exportKey().decode('utf-8'),
container_name=container_name))
return output
except OSError as exc:
raise exc
finally:
self.__delete_user_on_pool('aztk', pool.id, nodes)

def __cluster_copy(self, cluster_id, container_name, source_path, destination_path, internal):
def __cluster_copy(self, cluster_id, source_path, destination_path, container_name=None, internal=False, get=False):
pool, nodes = self.__get_pool_details(cluster_id)
nodes = [node for node in nodes]
if internal:
cluster_nodes = [models.RemoteLogin(ip_address=node.ip_address, port="22") for node in nodes]
cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
else:
cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
cluster_nodes = [(node, self.__get_remote_login_settings(pool.id, node.id)) for node in nodes]
try:
ssh_key = self.__create_user_on_pool('aztk', pool.id, nodes)
asyncio.get_event_loop().run_until_complete(ssh_lib.clus_copy(container_name=container_name,
username='aztk',
nodes=cluster_nodes,
source_path=source_path,
destination_path=destination_path,
ssh_key=ssh_key.exportKey().decode('utf-8')))
self.__delete_user_on_pool('aztk', pool.id, nodes)
output = asyncio.get_event_loop().run_until_complete(
ssh_lib.clus_copy(container_name=container_name,
username='aztk',
nodes=cluster_nodes,
source_path=source_path,
destination_path=destination_path,
ssh_key=ssh_key.exportKey().decode('utf-8'),
get=get))
return output
except (OSError, batch_error.BatchErrorException) as exc:
raise exc
finally:
self.__delete_user_on_pool('aztk', pool.id, nodes)

def __submit_job(self,
job_configuration,
@@ -388,5 +393,8 @@ def cluster_run(self, cluster_id, command):
def cluster_copy(self, cluster_id, source_path, destination_path):
raise NotImplementedError()

def cluster_download(self, cluster_id, source_path, destination_path):
raise NotImplementedError()

def submit_job(self, job):
raise NotImplementedError()
24 changes: 20 additions & 4 deletions aztk/spark/client.py
@@ -9,6 +9,7 @@
from aztk.spark.helpers import submit as cluster_submit_helper
from aztk.spark.helpers import job_submission as job_submit_helper
from aztk.spark.helpers import get_log as get_log_helper
from aztk.spark.helpers import cluster_diagnostic_helper
from aztk.spark.utils import util
from aztk.internal.cluster_data import NodeData
import yaml
@@ -146,15 +147,23 @@ def get_application_status(self, cluster_id: str, app_name: str):
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_run(self, cluster_id: str, command: str, internal: bool = False):
def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False):
try:
return self.__cluster_run(cluster_id, 'spark', command, internal)
return self.__cluster_run(cluster_id, command, internal, container_name='spark' if not host else None)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, internal: bool = False):
def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False):
try:
return self.__cluster_copy(cluster_id, 'spark', source_path, destination_path, internal)
container_name = None if host else 'spark'
return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=False, internal=internal)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_download(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False):
try:
container_name = None if host else 'spark'
return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=True, internal=internal)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

@@ -272,3 +281,10 @@ def wait_until_job_finished(self, job_id):
def wait_until_all_jobs_finished(self, jobs):
for job in jobs:
self.wait_until_job_finished(job)

def run_cluster_diagnostics(self, cluster_id, output_directory):
try:
output = cluster_diagnostic_helper.run(self, cluster_id, output_directory)
return output
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
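
For reference, a minimal usage sketch of the endpoints this file now exposes; the cluster id, paths, and the pre-configured spark_client below are illustrative assumptions, not part of this PR:

# spark_client is assumed to be an already-authenticated aztk.spark.Client
spark_client.cluster_run("my-cluster", "ls /home/spark-current/logs")              # runs inside the 'spark' container
spark_client.cluster_run("my-cluster", "df -h", host=True)                         # host=True runs on the node itself
spark_client.cluster_copy("my-cluster", "./conf/my.conf", "/tmp/my.conf")          # push a file to every node
spark_client.cluster_download("my-cluster", "/tmp/debug.zip", "./out", host=True)  # new endpoint: pull files back (get=True)
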
26 changes: 26 additions & 0 deletions aztk/spark/helpers/cluster_diagnostic_helper.py
@@ -0,0 +1,26 @@
import os
from aztk.utils import ssh
from aztk.utils.command_builder import CommandBuilder
from aztk import models as aztk_models
import azure.batch.models as batch_models

def run(spark_client, cluster_id, output_directory):
    # copy debug program to each node
    spark_client.cluster_copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
    ssh_cmd = _build_diagnostic_ssh_command()
    run_output = spark_client.cluster_run(cluster_id, ssh_cmd, host=True)
    local_path = os.path.join(os.path.abspath(output_directory), "debug", "debug.zip")
    remote_path = "/tmp/debug.zip"
    output = spark_client.cluster_download(cluster_id, remote_path, local_path, host=True)
    # write run output to debug/ directory
    with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
        [f.write(line + '\n') for node_id, result in run_output for line in result]
    return output


def _build_diagnostic_ssh_command():
    return "sudo rm -rf /tmp/debug.zip; "\
           "sudo apt-get install -y python3-pip; "\
           "sudo -H pip3 install --upgrade pip; "\
           "sudo -H pip3 install docker; "\
           "sudo python3 /tmp/debug.py"
160 changes: 160 additions & 0 deletions aztk/spark/utils/debug.py
@@ -0,0 +1,160 @@
"""
Diagnostic program that runs on each node in the cluster
This program must be run with sudo
"""
import io
Review thread on the import block:
Contributor: nitpick: alpha sort
jafreck (Member, Author, Apr 5, 2018): These are PEP8 sorted, and alpha sorted within the groups.
Contributor: I have no idea why I felt these were not sorted... You're right.

import json
import os
import socket
import tarfile
from subprocess import STDOUT, CalledProcessError, check_output
from zipfile import ZIP_DEFLATED, ZipFile

import docker # pylint: disable=import-error


def main():
    zipf = create_zip_archive()

    # general node diagnostics
    zipf.writestr("hostname.txt", data=get_hostname())
    zipf.writestr("df.txt", data=get_disk_free())

    # docker container diagnostics
    docker_client = docker.from_env()
    for filename, data in get_docker_diagnostics(docker_client):
        zipf.writestr(filename, data=data)

    zipf.close()


def create_zip_archive():
    zip_file_path = "/tmp/debug.zip"
    return ZipFile(zip_file_path, "w", ZIP_DEFLATED)


def get_hostname():
    return socket.gethostname()


def cmd_check_output(cmd):
    try:
        output = check_output(cmd, shell=True, stderr=STDOUT)
    except CalledProcessError as e:
        return "CMD: {0}\n"\
               "returncode: {1}\n"\
               "output: {2}".format(e.cmd, e.returncode, e.output)
    else:
        return output


def get_disk_free():
    return cmd_check_output("df -h")


def get_docker_diagnostics(docker_client):
    '''
    returns list of tuples (filename, data) to be written in the zip
    '''
    output = []
    output.append(get_docker_images(docker_client))
    logs = get_docker_containers(docker_client)
    for item in logs:
        output.append(item)

    return output


def get_docker_images(docker_client):
    output = ""
    try:
        images = docker_client.images.list()
        for image in images:
            output += json.dumps(image.attrs, sort_keys=True, indent=4)
        return ("docker-images.txt", output)
    except docker.errors.APIError as e:
        return ("docker-images.err", e.__str__())


def get_docker_containers(docker_client):
    container_attrs = ""
    logs = []
    try:
        containers = docker_client.containers.list()
        for container in containers:
            container_attrs += json.dumps(container.attrs, sort_keys=True, indent=4)
            # get docker container logs
            logs.append((container.name + "/docker.log", container.logs()))
            logs.append(get_docker_process_status(container))
            if container.name == "spark": #TODO: find a more robust way to get specific info off specific containers
                logs.extend(get_container_aztk_script(container))
                logs.extend(get_spark_logs(container))
                logs.extend(get_spark_app_logs(container))

        logs.append(("docker-containers.txt", container_attrs))
        return logs
    except docker.errors.APIError as e:
        return [("docker-containers.err", e.__str__())]


def get_docker_process_status(container):
    try:
        exit_code, output = container.exec_run("ps -auxw", tty=True, privileged=True)
        out_file_name = container.name + "/ps_aux.txt"
        if exit_code == 0:
            return (out_file_name, output)
        else:
            return (out_file_name, "exit_code: {0}\n{1}".format(exit_code, output))
    except docker.errors.APIError as e:
        return (container.name + "ps_aux.err", e.__str__())


def get_container_aztk_script(container):
    aztk_path = "/mnt/batch/tasks/startup/wd"
    try:
        stream, _ = container.get_archive(aztk_path) # second item is stat info
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return (container.name + "/" + "aztk-scripts.err", e.__str__())


def get_spark_logs(container):
    spark_logs_path = "/home/spark-current/logs"
    try:
        stream, _ = container.get_archive(spark_logs_path) # second item is stat info
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return [(container.name + "/" + "spark-logs.err", e.__str__())]


def get_spark_app_logs(container):
    spark_app_logs_path = "/home/spark-current/work"
    try:
        stream, _ = container.get_archive(spark_app_logs_path)
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return [(container.name + "/" + "spark-work-logs.err", e.__str__())]


def filter_members(members):
    skip_files = ["id_rsa", "id_rsa.pub", "docker.log"]
    skip_extensions = [".pyc", ".zip"]
    for tarinfo in members:
        if (os.path.splitext(tarinfo.name)[1] not in skip_extensions and
                os.path.basename(tarinfo.name) not in skip_files):
            yield tarinfo


def extract_tar_in_memory(container, data):
    data = io.BytesIO(b''.join([item for item in data]))
    tarf = tarfile.open(fileobj=data)
    logs = []
    for member in filter_members(tarf):
        file_bytes = tarf.extractfile(member)
        if file_bytes is not None:
            logs.append((container.name + "/" + member.name, b''.join(file_bytes.readlines())))
    return logs


if __name__ == "__main__":
    main()