55import io
66import os
77import select
8+ import socket
89import socketserver as SocketServer
910import sys
1011from concurrent .futures import ThreadPoolExecutor
1112
13+ from aztk .error import AztkError
1214from . import helpers
1315
1416
1517def connect (hostname ,
1618 port = 22 ,
1719 username = None ,
1820 password = None ,
19- pkey = None ):
21+ pkey = None ,
22+ timeout = None ):
2023 import paramiko
2124
2225 client = paramiko .SSHClient ()
23-
2426 client .set_missing_host_key_policy (paramiko .AutoAddPolicy ())
2527
2628 if pkey :
2729 ssh_key = paramiko .RSAKey .from_private_key (file_obj = io .StringIO (pkey ))
2830 else :
2931 ssh_key = None
3032
31- client .connect (
32- hostname ,
33- port = port ,
34- username = username ,
35- password = password ,
36- pkey = ssh_key
37- )
33+ timeout = timeout or 20
34+ try :
35+ client .connect (hostname , port = port , username = username , password = password , pkey = ssh_key , timeout = timeout )
36+ except socket .timeout :
37+ raise AztkError ("Connection timed out to: {}" .format (hostname ))
3838
3939 return client
4040
4141
42- def node_exec_command (node_id , command , username , hostname , port , ssh_key = None , password = None , container_name = None ):
43- client = connect (hostname = hostname , port = port , username = username , password = password , pkey = ssh_key )
42+ def node_exec_command (node_id , command , username , hostname , port , ssh_key = None , password = None , container_name = None , timeout = None ):
43+ try :
44+ client = connect (hostname = hostname , port = port , username = username , password = password , pkey = ssh_key , timeout = timeout )
45+ except AztkError as e :
46+ return (node_id , e )
4447 if container_name :
4548 cmd = 'sudo docker exec 2>&1 -t {0} /bin/bash -c \' set -e; set -o pipefail; {1}; wait\' ' .format (container_name , command )
4649 else :
@@ -51,7 +54,7 @@ def node_exec_command(node_id, command, username, hostname, port, ssh_key=None,
5154 return (node_id , output )
5255
5356
54- async def clus_exec_command (command , username , nodes , ports = None , ssh_key = None , password = None , container_name = None ):
57+ async def clus_exec_command (command , username , nodes , ports = None , ssh_key = None , password = None , container_name = None , timeout = None ):
5558 return await asyncio .gather (
5659 * [asyncio .get_event_loop ().run_in_executor (ThreadPoolExecutor (),
5760 node_exec_command ,
@@ -62,12 +65,16 @@ async def clus_exec_command(command, username, nodes, ports=None, ssh_key=None,
6265 node_rls .port ,
6366 ssh_key ,
6467 password ,
65- container_name ) for node , node_rls in nodes ]
68+ container_name ,
69+ timeout ) for node , node_rls in nodes ]
6670 )
6771
6872
69- def copy_from_node (node_id , source_path , destination_path , username , hostname , port , ssh_key = None , password = None , container_name = None ):
70- client = connect (hostname = hostname , port = port , username = username , password = password , pkey = ssh_key )
73+ def copy_from_node (node_id , source_path , destination_path , username , hostname , port , ssh_key = None , password = None , container_name = None , timeout = None ):
74+ try :
75+ client = connect (hostname = hostname , port = port , username = username , password = password , pkey = ssh_key , timeout = timeout )
76+ except AztkError as e :
77+ return (node_id , False , e )
7178 sftp_client = client .open_sftp ()
7279 try :
7380 destination_path = os .path .join (os .path .dirname (destination_path ), node_id , os .path .basename (destination_path ))
@@ -82,8 +89,11 @@ def copy_from_node(node_id, source_path, destination_path, username, hostname, p
8289 client .close ()
8390
8491
85- def node_copy (node_id , source_path , destination_path , username , hostname , port , ssh_key = None , password = None , container_name = None ):
86- client = connect (hostname = hostname , port = port , username = username , password = password , pkey = ssh_key )
92+ def node_copy (node_id , source_path , destination_path , username , hostname , port , ssh_key = None , password = None , container_name = None , timeout = None ):
93+ try :
94+ client = connect (hostname = hostname , port = port , username = username , password = password , pkey = ssh_key , timeout = timeout )
95+ except AztkError as e :
96+ return (node_id , False , e )
8797 sftp_client = client .open_sftp ()
8898 try :
8999 if container_name :
@@ -108,7 +118,7 @@ def node_copy(node_id, source_path, destination_path, username, hostname, port,
108118 #TODO: progress bar
109119
110120
111- async def clus_copy (username , nodes , source_path , destination_path , ssh_key = None , password = None , container_name = None , get = False ):
121+ async def clus_copy (username , nodes , source_path , destination_path , ssh_key = None , password = None , container_name = None , get = False , timeout = None ):
112122 return await asyncio .gather (
113123 * [asyncio .get_event_loop ().run_in_executor (ThreadPoolExecutor (),
114124 copy_from_node if get else node_copy ,
@@ -120,5 +130,6 @@ async def clus_copy(username, nodes, source_path, destination_path, ssh_key=None
120130 node_rls .port ,
121131 ssh_key ,
122132 password ,
123- container_name ) for node , node_rls in nodes ]
133+ container_name ,
134+ timeout ) for node , node_rls in nodes ]
124135 )
0 commit comments