Skip to content
This repository was archived by the owner on Feb 3, 2021. It is now read-only.

Commit 47000a5

Browse files
authored
Bug: add timeout handling to cluster_run and copy (#524)
* update cluster_run and copy to handle timeouts
* fix
* move timeout default to connect function
1 parent 9ccc1c6 commit 47000a5

File tree

4 files changed

+63
-31
lines changed

4 files changed

+63
-31
lines changed

aztk/client.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ def __delete_user_on_pool(self, username, pool_id, nodes):
229229
concurrent.futures.wait(futures)
230230

231231

232-
def __cluster_run(self, cluster_id, command, internal, container_name=None):
232+
def __cluster_run(self, cluster_id, command, internal, container_name=None, timeout=None):
233233
pool, nodes = self.__get_pool_details(cluster_id)
234234
nodes = [node for node in nodes]
235235
if internal:
@@ -242,14 +242,15 @@ def __cluster_run(self, cluster_id, command, internal, container_name=None):
242242
'aztk',
243243
cluster_nodes,
244244
ssh_key=ssh_key.exportKey().decode('utf-8'),
245-
container_name=container_name))
245+
container_name=container_name,
246+
timeout=timeout))
246247
return output
247248
except OSError as exc:
248249
raise exc
249250
finally:
250251
self.__delete_user_on_pool('aztk', pool.id, nodes)
251252

252-
def __cluster_copy(self, cluster_id, source_path, destination_path, container_name=None, internal=False, get=False):
253+
def __cluster_copy(self, cluster_id, source_path, destination_path, container_name=None, internal=False, get=False, timeout=None):
253254
pool, nodes = self.__get_pool_details(cluster_id)
254255
nodes = [node for node in nodes]
255256
if internal:
@@ -265,7 +266,8 @@ def __cluster_copy(self, cluster_id, source_path, destination_path, container_na
265266
source_path=source_path,
266267
destination_path=destination_path,
267268
ssh_key=ssh_key.exportKey().decode('utf-8'),
268-
get=get))
269+
get=get,
270+
timeout=timeout))
269271
return output
270272
except (OSError, batch_error.BatchErrorException) as exc:
271273
raise exc

aztk/spark/client.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -161,23 +161,39 @@ def get_application_status(self, cluster_id: str, app_name: str):
161161
except batch_error.BatchErrorException as e:
162162
raise error.AztkError(helpers.format_batch_exception(e))
163163

164-
def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False):
164+
def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False, timeout=None):
165165
try:
166-
return self.__cluster_run(cluster_id, command, internal, container_name='spark' if not host else None)
166+
return self.__cluster_run(cluster_id,
167+
command,
168+
internal,
169+
container_name='spark' if not host else None,
170+
timeout=timeout)
167171
except batch_error.BatchErrorException as e:
168172
raise error.AztkError(helpers.format_batch_exception(e))
169173

170-
def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False):
174+
def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout=None):
171175
try:
172176
container_name = None if host else 'spark'
173-
return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=False, internal=internal)
177+
return self.__cluster_copy(cluster_id,
178+
source_path,
179+
destination_path,
180+
container_name=container_name,
181+
get=False,
182+
internal=internal,
183+
timeout=timeout)
174184
except batch_error.BatchErrorException as e:
175185
raise error.AztkError(helpers.format_batch_exception(e))
176186

177-
def cluster_download(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False):
187+
def cluster_download(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout=None):
178188
try:
179189
container_name = None if host else 'spark'
180-
return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=True, internal=internal)
190+
return self.__cluster_copy(cluster_id,
191+
source_path,
192+
destination_path,
193+
container_name=container_name,
194+
get=True,
195+
internal=internal,
196+
timeout=timeout)
181197
except batch_error.BatchErrorException as e:
182198
raise error.AztkError(helpers.format_batch_exception(e))
183199

aztk/utils/ssh.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,42 +5,45 @@
55
import io
66
import os
77
import select
8+
import socket
89
import socketserver as SocketServer
910
import sys
1011
from concurrent.futures import ThreadPoolExecutor
1112

13+
from aztk.error import AztkError
1214
from . import helpers
1315

1416

1517
def connect(hostname,
1618
port=22,
1719
username=None,
1820
password=None,
19-
pkey=None):
21+
pkey=None,
22+
timeout=None):
2023
import paramiko
2124

2225
client = paramiko.SSHClient()
23-
2426
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
2527

2628
if pkey:
2729
ssh_key = paramiko.RSAKey.from_private_key(file_obj=io.StringIO(pkey))
2830
else:
2931
ssh_key = None
3032

31-
client.connect(
32-
hostname,
33-
port=port,
34-
username=username,
35-
password=password,
36-
pkey=ssh_key
37-
)
33+
timeout = timeout or 20
34+
try:
35+
client.connect(hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
36+
except socket.timeout:
37+
raise AztkError("Connection timed out to: {}".format(hostname))
3838

3939
return client
4040

4141

42-
def node_exec_command(node_id, command, username, hostname, port, ssh_key=None, password=None, container_name=None):
43-
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key)
42+
def node_exec_command(node_id, command, username, hostname, port, ssh_key=None, password=None, container_name=None, timeout=None):
43+
try:
44+
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
45+
except AztkError as e:
46+
return (node_id, e)
4447
if container_name:
4548
cmd = 'sudo docker exec 2>&1 -t {0} /bin/bash -c \'set -e; set -o pipefail; {1}; wait\''.format(container_name, command)
4649
else:
@@ -51,7 +54,7 @@ def node_exec_command(node_id, command, username, hostname, port, ssh_key=None,
5154
return (node_id, output)
5255

5356

54-
async def clus_exec_command(command, username, nodes, ports=None, ssh_key=None, password=None, container_name=None):
57+
async def clus_exec_command(command, username, nodes, ports=None, ssh_key=None, password=None, container_name=None, timeout=None):
5558
return await asyncio.gather(
5659
*[asyncio.get_event_loop().run_in_executor(ThreadPoolExecutor(),
5760
node_exec_command,
@@ -62,12 +65,16 @@ async def clus_exec_command(command, username, nodes, ports=None, ssh_key=None,
6265
node_rls.port,
6366
ssh_key,
6467
password,
65-
container_name) for node, node_rls in nodes]
68+
container_name,
69+
timeout) for node, node_rls in nodes]
6670
)
6771

6872

69-
def copy_from_node(node_id, source_path, destination_path, username, hostname, port, ssh_key=None, password=None, container_name=None):
70-
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key)
73+
def copy_from_node(node_id, source_path, destination_path, username, hostname, port, ssh_key=None, password=None, container_name=None, timeout=None):
74+
try:
75+
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
76+
except AztkError as e:
77+
return (node_id, False, e)
7178
sftp_client = client.open_sftp()
7279
try:
7380
destination_path = os.path.join(os.path.dirname(destination_path), node_id, os.path.basename(destination_path))
@@ -82,8 +89,11 @@ def copy_from_node(node_id, source_path, destination_path, username, hostname, p
8289
client.close()
8390

8491

85-
def node_copy(node_id, source_path, destination_path, username, hostname, port, ssh_key=None, password=None, container_name=None):
86-
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key)
92+
def node_copy(node_id, source_path, destination_path, username, hostname, port, ssh_key=None, password=None, container_name=None, timeout=None):
93+
try:
94+
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
95+
except AztkError as e:
96+
return (node_id, False, e)
8797
sftp_client = client.open_sftp()
8898
try:
8999
if container_name:
@@ -108,7 +118,7 @@ def node_copy(node_id, source_path, destination_path, username, hostname, port,
108118
#TODO: progress bar
109119

110120

111-
async def clus_copy(username, nodes, source_path, destination_path, ssh_key=None, password=None, container_name=None, get=False):
121+
async def clus_copy(username, nodes, source_path, destination_path, ssh_key=None, password=None, container_name=None, get=False, timeout=None):
112122
return await asyncio.gather(
113123
*[asyncio.get_event_loop().run_in_executor(ThreadPoolExecutor(),
114124
copy_from_node if get else node_copy,
@@ -120,5 +130,6 @@ async def clus_copy(username, nodes, source_path, destination_path, ssh_key=None
120130
node_rls.port,
121131
ssh_key,
122132
password,
123-
container_name) for node, node_rls in nodes]
133+
container_name,
134+
timeout) for node, node_rls in nodes]
124135
)

aztk_cli/spark/endpoints/cluster/cluster_run.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,8 @@ def print_execute_result(node_id, result):
2828
print("-" * (len(node_id) + 6))
2929
print("| ", node_id, " |")
3030
print("-" * (len(node_id) + 6))
31-
for line in result:
32-
print(line)
31+
if isinstance(result, Exception):
32+
print(result + "\n")
[NOTE (review): this added line is buggy — `Exception + str` raises TypeError at
runtime; it should be `print(str(result) + "\n")` or `print(result, "\n")`.]
33+
else:
34+
for line in result:
35+
print(line)

0 commit comments

Comments
 (0)