
Commit 44a0765

Feature: spark debug tool (#455)
* start implementation of cluster debug utility
* update debug program
* update debug
* fix output directory structure
* cleanup output, add error checking
* sort imports
* start untar
* extract tar
* add debug.py to pylintrc ignore, line too long
* crlf->lf
* add app logs
* call get_spark_app_logs, typos
* add docs
* remove debug.py from pylintrc ignore
* added debug.py back to pylint ignore
* change pylint ignore
* remove commented log
* update cluster_run
* refactor cluster_copy
* update debug, add spinner for run and copy
* make new sdk cluster_download endpoint
1 parent 61e7c59 commit 44a0765
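
For orientation, the whole feature is surfaced through the new run_cluster_diagnostics SDK endpoint added in aztk/spark/client.py below. A minimal, hedged usage sketch; constructing the client from a SecretsConfiguration is assumed from the existing aztk SDK and is not part of this commit:

```python
# Hedged sketch of driving the new debug tool from the SDK. The SecretsConfiguration /
# Client construction is assumed from the existing aztk API and is not part of this
# commit; run_cluster_diagnostics and cluster_download are what it adds.
import aztk.spark

secrets_config = aztk.spark.models.SecretsConfiguration()  # assumed: populated from .aztk/secrets.yaml in practice
client = aztk.spark.Client(secrets_config)

# run debug.py on every node, then download the per-node debug.zip and command output
client.run_cluster_diagnostics(cluster_id="my-cluster", output_directory="debug-out")
```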

File tree

10 files changed: +383, -71 lines changed

aztk/client.py

Lines changed: 26 additions & 18 deletions
@@ -229,43 +229,48 @@ def __delete_user_on_pool(self, username, pool_id, nodes):
         concurrent.futures.wait(futures)


-    def __cluster_run(self, cluster_id, container_name, command, internal):
+    def __cluster_run(self, cluster_id, command, internal, container_name=None):
         pool, nodes = self.__get_pool_details(cluster_id)
         nodes = [node for node in nodes]
         if internal:
-            cluster_nodes = [models.RemoteLogin(ip_address=node.ip_address, port="22") for node in nodes]
+            cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
         else:
-            cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
+            cluster_nodes = [(node, self.__get_remote_login_settings(pool.id, node.id)) for node in nodes]
         try:
             ssh_key = self.__create_user_on_pool('aztk', pool.id, nodes)
-            asyncio.get_event_loop().run_until_complete(ssh_lib.clus_exec_command(command,
-                                                                                  container_name,
-                                                                                  'aztk',
-                                                                                  cluster_nodes,
-                                                                                  ssh_key=ssh_key.exportKey().decode('utf-8')))
+            output = asyncio.get_event_loop().run_until_complete(ssh_lib.clus_exec_command(command,
+                                                                                            'aztk',
+                                                                                            cluster_nodes,
+                                                                                            ssh_key=ssh_key.exportKey().decode('utf-8'),
+                                                                                            container_name=container_name))
+            return output
         except OSError as exc:
             raise exc
         finally:
             self.__delete_user_on_pool('aztk', pool.id, nodes)

-    def __cluster_copy(self, cluster_id, container_name, source_path, destination_path, internal):
+    def __cluster_copy(self, cluster_id, source_path, destination_path, container_name=None, internal=False, get=False):
         pool, nodes = self.__get_pool_details(cluster_id)
         nodes = [node for node in nodes]
         if internal:
-            cluster_nodes = [models.RemoteLogin(ip_address=node.ip_address, port="22") for node in nodes]
+            cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
         else:
-            cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
+            cluster_nodes = [(node, self.__get_remote_login_settings(pool.id, node.id)) for node in nodes]
         try:
             ssh_key = self.__create_user_on_pool('aztk', pool.id, nodes)
-            asyncio.get_event_loop().run_until_complete(ssh_lib.clus_copy(container_name=container_name,
-                                                                          username='aztk',
-                                                                          nodes=cluster_nodes,
-                                                                          source_path=source_path,
-                                                                          destination_path=destination_path,
-                                                                          ssh_key=ssh_key.exportKey().decode('utf-8')))
-            self.__delete_user_on_pool('aztk', pool.id, nodes)
+            output = asyncio.get_event_loop().run_until_complete(
+                ssh_lib.clus_copy(container_name=container_name,
+                                  username='aztk',
+                                  nodes=cluster_nodes,
+                                  source_path=source_path,
+                                  destination_path=destination_path,
+                                  ssh_key=ssh_key.exportKey().decode('utf-8'),
+                                  get=get))
+            return output
         except (OSError, batch_error.BatchErrorException) as exc:
             raise exc
+        finally:
+            self.__delete_user_on_pool('aztk', pool.id, nodes)

     def __submit_job(self,
                      job_configuration,
@@ -388,5 +393,8 @@ def cluster_run(self, cluster_id, command):
     def cluster_copy(self, cluster_id, source_path, destination_path):
         raise NotImplementedError()

+    def cluster_download(self, cluster_id, source_path, destination_path):
+        raise NotImplementedError()
+
     def submit_job(self, job):
         raise NotImplementedError()
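
The refactor pairs every node with its RemoteLogin settings and returns the collected output instead of discarding it. The ssh_lib internals are not shown in this diff, so the following is only a sketch of the fan-out pattern that __cluster_run depends on; asyncssh is used purely for illustration, and run_on_node / run_on_cluster are hypothetical names rather than aztk functions.

```python
# Illustrative only: models how a command is fanned out to every (node, remote_login)
# pair concurrently and how per-node results come back. Not the actual ssh_lib code.
import asyncio
import asyncssh  # assumed dependency for this sketch


async def run_on_node(node_id, host, port, username, ssh_key_pem, command):
    # one SSH session per node; failures are captured so one bad node does not stop the rest
    try:
        key = asyncssh.import_private_key(ssh_key_pem)
        async with asyncssh.connect(host, port=int(port), username=username,
                                    client_keys=[key], known_hosts=None) as conn:
            result = await conn.run(command, check=False)
            return (node_id, result.stdout.splitlines())
    except (OSError, asyncssh.Error) as exc:
        return (node_id, [str(exc)])


async def run_on_cluster(cluster_nodes, username, ssh_key_pem, command):
    # cluster_nodes: list of (node, remote_login) pairs, as built in __cluster_run above
    tasks = [run_on_node(node.id, login.ip_address, login.port, username, ssh_key_pem, command)
             for node, login in cluster_nodes]
    return await asyncio.gather(*tasks)
```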

aztk/spark/client.py

Lines changed: 20 additions & 4 deletions
@@ -9,6 +9,7 @@
 from aztk.spark.helpers import submit as cluster_submit_helper
 from aztk.spark.helpers import job_submission as job_submit_helper
 from aztk.spark.helpers import get_log as get_log_helper
+from aztk.spark.helpers import cluster_diagnostic_helper
 from aztk.spark.utils import util
 from aztk.internal.cluster_data import NodeData
 import yaml
@@ -146,15 +147,23 @@ def get_application_status(self, cluster_id: str, app_name: str):
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))

-    def cluster_run(self, cluster_id: str, command: str, internal: bool = False):
+    def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False):
         try:
-            return self.__cluster_run(cluster_id, 'spark', command, internal)
+            return self.__cluster_run(cluster_id, command, internal, container_name='spark' if not host else None)
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))

-    def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, internal: bool = False):
+    def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False):
         try:
-            return self.__cluster_copy(cluster_id, 'spark', source_path, destination_path, internal)
+            container_name = None if host else 'spark'
+            return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=False, internal=internal)
+        except batch_error.BatchErrorException as e:
+            raise error.AztkError(helpers.format_batch_exception(e))
+
+    def cluster_download(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False):
+        try:
+            container_name = None if host else 'spark'
+            return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=True, internal=internal)
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))

@@ -272,3 +281,10 @@ def wait_until_job_finished(self, job_id):
     def wait_until_all_jobs_finished(self, jobs):
         for job in jobs:
             self.wait_until_job_finished(job)
+
+    def run_cluster_diagnostics(self, cluster_id, output_directory):
+        try:
+            output = cluster_diagnostic_helper.run(self, cluster_id, output_directory)
+            return output
+        except batch_error.BatchErrorException as e:
+            raise error.AztkError(helpers.format_batch_exception(e))
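
With the new host flag, the public client can target either the spark container (the default) or the node's host VM, and cluster_download is simply __cluster_copy running in the get direction. A short usage sketch, assuming client is an existing aztk.spark.Client and the file paths are placeholders:

```python
# host=False (default): the command runs inside the 'spark' Docker container.
# host=True: the command runs on the VM itself, which is what the debug tool needs.
container_env = client.cluster_run("my-cluster", "echo $SPARK_HOME")
host_disk = client.cluster_run("my-cluster", "df -h", host=True)

# push a file to every node, then pull a file back from every node
client.cluster_copy("my-cluster", "local/conf/spark-defaults.conf", "/tmp/spark-defaults.conf")
client.cluster_download("my-cluster", "/tmp/debug.zip", "debug-out/debug/debug.zip", host=True)
```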
aztk/spark/helpers/cluster_diagnostic_helper.py

Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
import os
from aztk.utils import ssh
from aztk.utils.command_builder import CommandBuilder
from aztk import models as aztk_models
import azure.batch.models as batch_models

def run(spark_client, cluster_id, output_directory):
    # copy debug program to each node
    spark_client.cluster_copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
    ssh_cmd = _build_diagnostic_ssh_command()
    run_output = spark_client.cluster_run(cluster_id, ssh_cmd, host=True)
    local_path = os.path.join(os.path.abspath(output_directory), "debug", "debug.zip")
    remote_path = "/tmp/debug.zip"
    output = spark_client.cluster_download(cluster_id, remote_path, local_path, host=True)
    # write run output to debug/ directory
    with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
        [f.write(line + '\n') for node_id, result in run_output for line in result]
    return output


def _build_diagnostic_ssh_command():
    return "sudo rm -rf /tmp/debug.zip; "\
           "sudo apt-get install -y python3-pip; "\
           "sudo -H pip3 install --upgrade pip; "\
           "sudo -H pip3 install docker; "\
           "sudo python3 /tmp/debug.py"

aztk/spark/utils/debug.py

Lines changed: 160 additions & 0 deletions
@@ -0,0 +1,160 @@
"""
Diagnostic program that runs on each node in the cluster
This program must be run with sudo
"""
import io
import json
import os
import socket
import tarfile
from subprocess import STDOUT, CalledProcessError, check_output
from zipfile import ZIP_DEFLATED, ZipFile

import docker # pylint: disable=import-error


def main():
    zipf = create_zip_archive()

    # general node diagnostics
    zipf.writestr("hostname.txt", data=get_hostname())
    zipf.writestr("df.txt", data=get_disk_free())

    # docker container diagnostics
    docker_client = docker.from_env()
    for filename, data in get_docker_diagnostics(docker_client):
        zipf.writestr(filename, data=data)

    zipf.close()


def create_zip_archive():
    zip_file_path = "/tmp/debug.zip"
    return ZipFile(zip_file_path, "w", ZIP_DEFLATED)


def get_hostname():
    return socket.gethostname()


def cmd_check_output(cmd):
    try:
        output = check_output(cmd, shell=True, stderr=STDOUT)
    except CalledProcessError as e:
        return "CMD: {0}\n"\
               "returncode: {1}"\
               "output: {2}".format(e.cmd, e.returncode, e.output)
    else:
        return output


def get_disk_free():
    return cmd_check_output("df -h")


def get_docker_diagnostics(docker_client):
    '''
        returns list of tuples (filename, data) to be written in the zip
    '''
    output = []
    output.append(get_docker_images(docker_client))
    logs = get_docker_containers(docker_client)
    for item in logs:
        output.append(item)

    return output


def get_docker_images(docker_client):
    output = ""
    try:
        images = docker_client.images.list()
        for image in images:
            output += json.dumps(image.attrs, sort_keys=True, indent=4)
        return ("docker-images.txt", output)
    except docker.errors.APIError as e:
        return ("docker-images.err", e.__str__())


def get_docker_containers(docker_client):
    container_attrs = ""
    logs = []
    try:
        containers = docker_client.containers.list()
        for container in containers:
            container_attrs += json.dumps(container.attrs, sort_keys=True, indent=4)
            # get docker container logs
            logs.append((container.name + "/docker.log", container.logs()))
            logs.append(get_docker_process_status(container))
            if container.name == "spark": #TODO: find a more robust way to get specific info off specific containers
                logs.extend(get_container_aztk_script(container))
                logs.extend(get_spark_logs(container))
                logs.extend(get_spark_app_logs(container))

        logs.append(("docker-containers.txt", container_attrs))
        return logs
    except docker.errors.APIError as e:
        return [("docker-containers.err", e.__str__())]


def get_docker_process_status(container):
    try:
        exit_code, output = container.exec_run("ps -auxw", tty=True, privileged=True)
        out_file_name = container.name + "/ps_aux.txt"
        if exit_code == 0:
            return (out_file_name, output)
        else:
            return (out_file_name, "exit_code: {0}\n{1}".format(exit_code, output))
    except docker.errors.APIError as e:
        return (container.name + "ps_aux.err", e.__str__())


def get_container_aztk_script(container):
    aztk_path = "/mnt/batch/tasks/startup/wd"
    try:
        stream, _ = container.get_archive(aztk_path) # second item is stat info
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return (container.name + "/" + "aztk-scripts.err", e.__str__())


def get_spark_logs(container):
    spark_logs_path = "/home/spark-current/logs"
    try:
        stream, _ = container.get_archive(spark_logs_path) # second item is stat info
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return [(container.name + "/" + "spark-logs.err", e.__str__())]


def get_spark_app_logs(container):
    spark_app_logs_path = "/home/spark-current/work"
    try:
        stream, _ = container.get_archive(spark_app_logs_path)
        return extract_tar_in_memory(container, stream)
    except docker.errors.APIError as e:
        return [(container.name + "/" + "spark-work-logs.err", e.__str__())]


def filter_members(members):
    skip_files = ["id_rsa", "id_rsa.pub", "docker.log"]
    skip_extensions = [".pyc", ".zip"]
    for tarinfo in members:
        if (os.path.splitext(tarinfo.name)[1] not in skip_extensions and
                os.path.basename(tarinfo.name) not in skip_files):
            yield tarinfo


def extract_tar_in_memory(container, data):
    data = io.BytesIO(b''.join([item for item in data]))
    tarf = tarfile.open(fileobj=data)
    logs = []
    for member in filter_members(tarf):
        file_bytes = tarf.extractfile(member)
        if file_bytes is not None:
            logs.append((container.name + "/" + member.name, b''.join(file_bytes.readlines())))
    return logs


if __name__ == "__main__":
    main()
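
The tar handling at the bottom of debug.py is easy to exercise locally. A hedged sanity check, assuming debug.py (and the docker SDK it imports) is on the import path; the container is faked with a stand-in object because extract_tar_in_memory only reads its name attribute:

```python
# Build an in-memory tar the way docker's get_archive streams one, then run it
# through filter_members / extract_tar_in_memory from debug.py.
import io
import tarfile
from types import SimpleNamespace

import debug  # the module added by this commit; assumes it is importable


def make_tar_stream(files):
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tarf:
        for name, payload in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(payload)
            tarf.addfile(info, io.BytesIO(payload))
    buf.seek(0)
    return [buf.read()]  # extract_tar_in_memory expects an iterable of byte chunks


stream = make_tar_stream({"logs/spark.log": b"started", "wd/id_rsa": b"secret", "app.pyc": b"\x00"})
fake_container = SimpleNamespace(name="spark")
print(debug.extract_tar_in_memory(fake_container, stream))
# expected: only ('spark/logs/spark.log', b'started'); id_rsa and .pyc entries are filtered out
```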

0 commit comments