This repository was archived by the owner on Feb 3, 2021. It is now read-only.
Feature: spark debug tool #455
Merged
Commits (29, all by jafreck):

48bfc9c  start implementation of cluster debug utility
dc55972  update debug program
0cb1c2c  update debug
d67e474  fix output directory structure
878c56d  cleanup output, add error checking
9ae6f31  sort imports
ac4b706  start untar
33dd49f  extract tar
c789cc1  add debug.py to pylintrc ignore, line too long
677326a  Merge branch 'master' into feature/spark-diagnostic-tool
0d35286  crlf->lf
cc7992a  Merge branch 'feature/spark-diagnostic-tool' of github.com:jafreck/az…
27fc883  add app logs
c2cbc4c  call get_spark_app_logs, typos
bbffe88  add docs
dd7034e  Merge branch 'master' into feature/spark-diagnostic-tool
1da088e  Merge branch 'master' into feature/spark-diagnostic-tool
044740d  Merge branch 'master' into feature/spark-diagnostic-tool
e0f37c6  remove debug.py from pylintrc ignore
39b5b3f  added debug.py back to pylint ignore
b881c9f  Merge branch 'master' into feature/spark-diagnostic-tool
f6377e1  change pylint ignore
0a19b72  Merge branch 'feature/spark-diagnostic-tool' of github.com:jafreck/az…
9c722d1  remove commented log
aa1b055  merge
dbee669  update cluster_run
964b075  refactor cluster_copy
c15c411  update debug, add spinner for run and copy
e43aa92  make new sdk cluster_download endpoint
New file (path not shown in this view; @@ -0,0 +1,26 @@):

```python
import os
from aztk.utils import ssh
from aztk.utils.command_builder import CommandBuilder
from aztk import models as aztk_models
import azure.batch.models as batch_models


def run(spark_client, cluster_id, output_directory):
    # copy debug program to each node
    spark_client.cluster_copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
    ssh_cmd = _build_diagnostic_ssh_command()
    run_output = spark_client.cluster_run(cluster_id, ssh_cmd, host=True)
    local_path = os.path.join(os.path.abspath(output_directory), "debug", "debug.zip")
    remote_path = "/tmp/debug.zip"
    output = spark_client.cluster_download(cluster_id, remote_path, local_path, host=True)
    # write run output to debug/ directory
    with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
        for node_id, result in run_output:
            for line in result:
                f.write(line + '\n')
    return output


def _build_diagnostic_ssh_command():
    return "sudo rm -rf /tmp/debug.zip; "\
           "sudo apt-get install -y python3-pip; "\
           "sudo -H pip3 install --upgrade pip; "\
           "sudo -H pip3 install docker; "\
           "sudo python3 /tmp/debug.py"
```
New file aztk/spark/utils/debug.py (@@ -0,0 +1,160 @@):

```python
"""
Diagnostic program that runs on each node in the cluster
This program must be run with sudo
"""
import io
import json
import os
import socket
import tarfile
from subprocess import STDOUT, CalledProcessError, check_output
from zipfile import ZIP_DEFLATED, ZipFile

import docker  # pylint: disable=import-error


def main():
    zipf = create_zip_archive()

    # general node diagnostics
    zipf.writestr("hostname.txt", data=get_hostname())
    zipf.writestr("df.txt", data=get_disk_free())

    # docker container diagnostics
    docker_client = docker.from_env()
    for filename, data in get_docker_diagnostics(docker_client):
        zipf.writestr(filename, data=data)

    zipf.close()


def create_zip_archive():
    zip_file_path = "/tmp/debug.zip"
    return ZipFile(zip_file_path, "w", ZIP_DEFLATED)


def get_hostname():
    return socket.gethostname()


def cmd_check_output(cmd):
    try:
        output = check_output(cmd, shell=True, stderr=STDOUT)
    except CalledProcessError as e:
        return "CMD: {0}\n"\
               "returncode: {1}\n"\
               "output: {2}".format(e.cmd, e.returncode, e.output)
    else:
        return output


def get_disk_free():
    return cmd_check_output("df -h")


def get_docker_diagnostics(docker_client):
    '''
    returns list of tuples (filename, data) to be written in the zip
    '''
    output = []
    output.append(get_docker_images(docker_client))
    logs = get_docker_containers(docker_client)
    for item in logs:
        output.append(item)

    return output


def get_docker_images(docker_client):
    output = ""
    try:
        images = docker_client.images.list()
        for image in images:
            output += json.dumps(image.attrs, sort_keys=True, indent=4)
        return ("docker-images.txt", output)
    except docker.errors.APIError as e:
        return ("docker-images.err", e.__str__())


def get_docker_containers(docker_client):
    container_attrs = ""
    logs = []
    try:
        containers = docker_client.containers.list()
        for container in containers:
            container_attrs += json.dumps(container.attrs, sort_keys=True, indent=4)
            # get docker container logs
            logs.append((container.name + "/docker.log", container.logs()))
            logs.append(get_docker_process_status(container))
            if container.name == "spark":  # TODO: find a more robust way to get specific info off specific containers
                logs.extend(get_container_aztk_script(container))
                logs.extend(get_spark_logs(container))
                logs.extend(get_spark_app_logs(container))

        logs.append(("docker-containers.txt", container_attrs))
        return logs
    except docker.errors.APIError as e:
        return [("docker-containers.err", e.__str__())]


def get_docker_process_status(container):
    try:
        exit_code, output = container.exec_run("ps -auxw", tty=True, privileged=True)
        out_file_name = container.name + "/ps_aux.txt"
        if exit_code == 0:
            return (out_file_name, output)
        else:
            return (out_file_name, "exit_code: {0}\n{1}".format(exit_code, output))
    except docker.errors.APIError as e:
        return (container.name + "/ps_aux.err", e.__str__())


def get_container_aztk_script(container):
    aztk_path = "/mnt/batch/tasks/startup/wd"
    try:
        stream, _ = container.get_archive(aztk_path)  # second item is stat info
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        # return a list so the caller's extend() adds a single (filename, data) pair
        return [(container.name + "/" + "aztk-scripts.err", e.__str__())]


def get_spark_logs(container):
    spark_logs_path = "/home/spark-current/logs"
    try:
        stream, _ = container.get_archive(spark_logs_path)  # second item is stat info
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return [(container.name + "/" + "spark-logs.err", e.__str__())]


def get_spark_app_logs(container):
    spark_app_logs_path = "/home/spark-current/work"
    try:
        stream, _ = container.get_archive(spark_app_logs_path)
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return [(container.name + "/" + "spark-work-logs.err", e.__str__())]


def filter_members(members):
    skip_files = ["id_rsa", "id_rsa.pub", "docker.log"]
    skip_extensions = [".pyc", ".zip"]
    for tarinfo in members:
        if (os.path.splitext(tarinfo.name)[1] not in skip_extensions and
                os.path.basename(tarinfo.name) not in skip_files):
            yield tarinfo


def extract_tar_in_memory(container, data):
    data = io.BytesIO(b''.join([item for item in data]))
    tarf = tarfile.open(fileobj=data)
    logs = []
    for member in filter_members(tarf):
        file_bytes = tarf.extractfile(member)
        if file_bytes is not None:
            logs.append((container.name + "/" + member.name, b''.join(file_bytes.readlines())))
    return logs


if __name__ == "__main__":
    main()
```
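For anyone iterating on debug.py itself, a minimal local smoke test; everything here is an assumption for illustration (a box with a Docker daemon, sudo rights, and the script copied to /tmp/debug.py, mirroring what `_build_diagnostic_ssh_command` runs on each node), not part of the PR:

```python
# Hypothetical smoke test -- not part of this PR. Assumes a local Docker
# daemon, sudo rights, and debug.py copied to /tmp/debug.py, mirroring the
# command _build_diagnostic_ssh_command issues on every cluster node.
import subprocess
from zipfile import ZipFile

subprocess.check_call(["sudo", "python3", "/tmp/debug.py"])

# debug.py always writes its archive to /tmp/debug.zip
with ZipFile("/tmp/debug.zip") as zipf:
    for name in zipf.namelist():
        print(name)
```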
Review discussion:

nitpick: alpha sort
These are PEP8 sorted, and alpha sorted within the groups.
I have no idea why I felt these were not sorted... You're right.
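For reference, the grouping that reply describes, as it stands at the top of debug.py: standard-library imports in one alpha-sorted group, third-party imports in a second group, per PEP 8:

```python
# Standard library group, alpha-sorted
import io
import json
import os
import socket
import tarfile
from subprocess import STDOUT, CalledProcessError, check_output
from zipfile import ZIP_DEFLATED, ZipFile

# Third-party group, separated by a blank line per PEP 8
import docker  # pylint: disable=import-error
```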