Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2@file
3@brief A class to help connect with a remote machine and send command line.
4"""
6import time
7import socket
8import os
9import io
10import warnings
12from pyquickhelper.loghelper import noLOG
13from pyquickhelper.filehelper import is_file_string
class ASSHClient():
    """
    A simple class to access to remote machine through SSH.
    It requires modules
    `paramiko <http://www.paramiko.org/>`_,
    `pycrypto <https://pypi.python.org/pypi/pycrypto/>`_,
    `ecdsa <https://pypi.python.org/pypi/ecdsa>`_.

    This class is used in magic command @see me remote_open.
    On Windows, the installation of pycrypto can be tricky.
    See `Pycrypto on Windows <http://www.xavierdupre.fr/blog/2014-10-21_nojs.html>`_.
    Those modules are part of the `Anaconda <http://docs.continuum.io/anaconda/pkg-docs.html>`_ distribution.

    The workflow is: @see me connect, then @see me execute_command /
    @see me upload / @see me download (or the ``dfs_*`` variants which
    wrap ``hdfs dfs`` commands), finally @see me close.  An interactive
    shell can also be opened with @see me open_session.
    """
31 def __init__(self, server, username, password):
32 """
33 constructor
35 @param server server
36 @param username username
37 @param password password
38 """
39 self.server = server
40 self.username = username
41 self.password = password
42 self.connection = None
43 self.session = None
45 def __str__(self):
46 """
47 usual
48 """
49 return "ASSHClient"
51 def connect(self):
52 """
53 connect
54 """
55 import paramiko # pylint: disable=C0415
56 self.connection = paramiko.SSHClient()
57 self.connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
58 self.connection.connect(
59 self.server,
60 username=self.username,
61 password=self.password)
63 def execute_command(self, command, no_exception=False, fill_stdin=None):
64 """
65 execute a command line, it raises an error
66 if there is an error
68 @param command command
69 @param no_exception if True, do not raise any exception
70 @param fill_stdin data to send on the stdin input
71 @return stdout, stderr
73 Example of commands::
75 ssh.execute_command("ls")
76 ssh.execute_command("hdfs dfs -ls")
78 """
79 stdin, stdout, stderr = self.connection.exec_command(command)
81 if fill_stdin is not None:
82 if isinstance(fill_stdin, list):
83 fill_stdin = "\n".join(stdin)
84 if isinstance(fill_stdin, str):
85 stdin.write(fill_stdin)
86 stdin.flush()
87 else:
88 raise TypeError(
89 "fill_stdin must be a string, not: {0}".format(
90 type(fill_stdin)))
92 stdin.close()
94 err = stderr.read()
95 out = stdout.read()
97 # weird...
98 if isinstance(err, str) and err.startswith("b'"):
99 err = eval(err)
100 if isinstance(out, str) and out.startswith("b'"):
101 out = eval(out)
103 if isinstance(err, bytes):
104 err = err.decode("utf-8")
105 if isinstance(out, bytes):
106 out = out.decode("utf-8")
108 if not no_exception and len(err) > 0:
109 raise Exception(
110 "unable to run: {0}\nOUT:\n{1}\nERR:\n{2}".format(
111 command,
112 out,
113 err))
115 return out, err
117 def close(self):
118 """
119 close the connection
120 """
121 self.connection.close()
122 self.connection = None
124 def upload(self, localpath, remotepath):
125 """
126 upload a file to the remote machine (not on the cluster)
128 @param localpath local file (or a list of files)
129 @param remotepath remote file
131 .. versionchanged:: 1.1
132 it can upload multiple files if localpath is a list
133 """
134 sftp = self.connection.open_sftp()
135 if isinstance(localpath, str):
136 if not os.path.exists(localpath):
137 raise FileNotFoundError(localpath)
138 sftp.put(localpath, remotepath)
139 else:
140 for f in localpath:
141 if not os.path.exists(f):
142 raise FileNotFoundError(f)
143 sftp.put(f, remotepath + "/" + os.path.split(f)[-1])
144 sftp.close()
146 def upload_cluster(self, localpath, remotepath):
147 """
148 the function directly uploads the file to the cluster, it first goes
149 to the bridge, uploads it to the cluster and deletes it from the bridge
151 @param localpath local filename (or list of files)
152 @param remotepath path to the cluster
153 @return filename
155 .. versionadded:: 1.1
156 """
157 if isinstance(localpath, str):
158 filename = os.path.split(localpath)[-1]
159 self.upload(localpath, filename)
160 self.execute_command(
161 "hdfs dfs -put {0} {1}".format(filename, remotepath))
162 self.execute_command("rm {0}".format(filename))
163 else:
164 self.upload(localpath, ".")
165 for afile in localpath:
166 filename = os.path.split(afile)[-1]
167 self.execute_command(
168 "hdfs dfs -put {0} {1}".format(filename, remotepath))
169 self.execute_command("rm {0}".format(filename))
171 return remotepath
173 def download(self, remotepath, localpath):
174 """
175 download a file from the remote machine (not on the cluster)
176 @param localpath local file
177 @param remotepath remote file (it can be a list, localpath is a folder in that case)
179 .. versionchanged:: 1.1
180 remotepath can be a list of paths
181 """
182 sftp = self.connection.open_sftp()
183 if isinstance(remotepath, str):
184 sftp.get(remotepath, localpath)
185 else:
186 for path in remotepath:
187 filename = os.path.split(path)[-1]
188 sftp.get(path, localpath + "/" + filename)
189 sftp.close()
191 def download_cluster(self, remotepath, localpath, merge=False):
192 """
193 download a file directly from the cluster to the local machine
194 @param localpath local file
195 @param remotepath remote file (it can be a list, localpath is a folder in that case)
196 @param merge True to use getmerge instead of get
198 .. versionadded:: 1.1
199 """
200 cget = "getmerge" if merge else "get"
201 if isinstance(remotepath, str):
202 filename = os.path.split(localpath)[-1]
203 self.execute_command(
204 "hdfs dfs -{2} {0} {1}".format(remotepath, filename, cget))
205 self.download(filename, localpath)
206 self.execute_command("rm {0}".format(filename))
207 else:
208 tod = []
209 for afile in remotepath:
210 filename = os.path.split(afile)[-1]
211 self.execute_command(
212 "hdfs dfs -{2} {0} {1}".format(afile, filename, cget))
213 tod.append(filename)
214 self.download(tod, localpath)
215 for afile in tod:
216 self.execute_command("rm {0}".format(afile))
218 return remotepath
    # Cache mapping an output format name (None, "plain", "html") to its
    # converter function; values start as None and are filled lazily by
    # _get_out_format the first time a format is requested.
    _allowed_form = {None: None, "plain": None, "html": None}
222 @staticmethod
223 def _get_out_format(format):
224 """
225 Returns a function which converts an :epkg:`ANSI` string into a
226 different format.
228 @param format string
229 @return function
230 """
231 if format not in ASSHClient._allowed_form:
232 raise KeyError(
233 "unexpected format, it should be in " +
234 ",".join(
235 ASSHClient._allowed_form.keys()))
236 func = ASSHClient._allowed_form[format]
237 if func is None:
238 if format is None:
239 def idfunc(s):
240 "local function"
241 return s
242 func = idfunc
243 elif format == "plain":
244 import ansiconv # pylint: disable=C0415
246 def convert_plain(s):
247 "local function"
248 return ansiconv.to_plain(s)
249 func = convert_plain
250 elif format == "html":
251 from ansi2html import Ansi2HTMLConverter
252 conv = Ansi2HTMLConverter()
254 def convert_html(s):
255 "local function"
256 return conv.convert(s)
257 func = convert_html
258 ASSHClient._allowed_form[format] = func
259 return func
261 def open_session(self, no_exception=False, timeout=1.0,
262 add_eol=True, prompts=("~$", ">>>"), out_format=None):
263 """
264 Opens a session with method
265 `invoke_shell <http://docs.paramiko.org/en/latest/api/client.html?highlight=invoke_shell#paramiko.client.SSHClient.invoke_shell>`_.
267 @param no_exception if True, do not raise any exception in case of error
268 @param timeout timeout in s
269 @param add_eol if True, the function will add a EOL to the sent command if it does not have one
270 @param prompts if function terminates if the output ends by one of those strings.
271 @param out_format None, plain, html
273 .. exref::
274 :title: How to open a remote shell?
275 :tag: Hadoop
277 ::
279 ssh = ASSHClient( "<server>",
280 "<login>",
281 "<password>")
282 ssh.connect()
283 out = ssh.send_recv_session("ls")
284 print( ssh.send_recv_session("python") )
285 print( ssh.send_recv_session("print('3')") )
286 print( ssh.send_recv_session("import sys\\nsys.executable") )
287 print( ssh.send_recv_session("sys.exit()") )
288 print( ssh.send_recv_session(None) )
289 ssh.close_session()
290 ssh.close()
292 The notebook :ref:`exampleofsshclientcommunicationrst` illustrates
293 the output of these instructions.
294 """
295 if self.connection is None:
296 raise Exception("No open connection.")
297 if self.session is not None:
298 raise Exception(
299 "A session is already open. Cannot open a second one.")
300 if out_format not in ASSHClient._allowed_form:
301 raise KeyError(
302 "unexpected format, it should be in {0}".format(
303 ";".join(
304 str(_) for _ in ASSHClient._allowed_form.keys())))
306 self.session = self.connection.invoke_shell(width=300, height=1000)
307 self.session_params = {
308 "no_exception": no_exception,
309 "timeout": timeout,
310 "add_eol": add_eol,
311 "prompts": [] if prompts is None else prompts,
312 "out_format": out_format,
313 "out_func": ASSHClient._get_out_format(out_format)
314 }
316 self.session.settimeout(timeout)
317 return self.session
319 def close_session(self):
320 """
321 close a session
322 """
323 if self.session is None:
324 raise Exception("No open session. Cannot close anything.")
326 self.session.close()
327 self.session = None
329 def send_recv_session(self, fillin):
330 """
331 Send something through a session,
332 the function is supposed to return when the execute of the given command is done,
333 but this is quite difficult to detect without knowing what exactly was send.
335 So we add a timeout just to tell the function it has to return even if nothing
336 tells the command has finished. It fillin is None, the function will just
337 listen to the output.
339 @param fillin sent to stdin
340 @return stdout
342 The output contains
343 `escape codes <http://ascii-table.com/ansi-escape-sequences-vt-100.php>`_.
344 They can be converted to plain text or HTML
345 by using the module `ansiconv <http://pythonhosted.org/ansiconv/>`_
346 and `ansi2html <https://github.com/ralphbean/ansi2html/>`_.
347 This can be specified when opening the session.
348 """
349 prompts = self.session_params["prompts"]
350 timeout = self.session_params["timeout"]
351 add_eol = self.session_params["add_eol"]
352 func = self.session_params["out_func"]
354 if fillin is not None:
355 self.session.send(fillin.encode("utf-8"))
356 if add_eol and not fillin.endswith('\n'):
357 self.session.send("\n".encode("utf-8"))
359 buff = ''
360 begin = time.perf_counter()
361 while True:
362 try:
363 resp = self.session.recv(9999)
364 except socket.timeout:
365 resp = b""
366 dec = resp.decode("unicode_escape")
367 buff += dec
368 for p in prompts:
369 if buff.endswith(p):
370 break
371 if time.perf_counter() - begin > timeout:
372 break
374 return func(buff.replace("\r", ""))
376 @staticmethod
377 def parse_lsout(out, local_schema=True):
378 """
379 parses the output of a command ls
381 @param out output
382 @param local_schema schema for the bridge or the cluster (False)
383 @return DataFrame
385 .. versionadded:: 1.1
386 """
387 import pandas # pylint: disable=C0415
388 if local_schema:
389 names = ["attributes", "code", "alias", "folder",
390 "size", "unit", "name"]
391 else:
392 names = ["attributes", "code", "alias", "folder",
393 "size", "date", "time", "name"]
394 kout = out
395 out = out.replace("\r", "").split("\n")
396 out = [_ for _ in out if len(_.split()) > 3]
397 if len(out) == 0:
398 df = pandas.DataFrame(columns=names)
399 return df
401 try:
402 out_ = [_.split() for _ in out]
403 if len(out_) > 0 and len(out_[0]) != len(names):
404 if names[5] == "date" and len(out_[0]) == len(names) + 1:
405 # we merge 2 columns
406 out_ = [_[:5] + [" ".join(_[5:7])] + _[7:] for _ in out_]
407 df = pandas.DataFrame(data=out_, columns=names)
408 except (AssertionError, ValueError) as e:
409 out = "\n".join(out)
410 buf = io.StringIO(out)
411 try:
412 df = pandas.read_fwf(buf, names=names, index=False)
413 except ValueError as e:
414 raise ValueError(
415 "unable to parse output:\nSCHEMA:\n{1}\nOUT:\n{0}".format(kout, ",".join(names))) from e
417 df["isdir"] = df.apply(lambda r: r["attributes"][0] == "d", axis=1)
418 return df
420 def ls(self, path):
421 """
422 return the content of a folder on the bridge as a DataFrame
424 @param path path on the bridge
425 @return DataFrame
427 .. versionadded:: 1.1
428 """
429 out, err = self.execute_command("ls -l " + path)
430 if len(err) > 0:
431 raise Exception("unable to execute ls " + path + "\nERR:\n" + err)
432 return ASSHClient.parse_lsout(out)
434 def dfs_ls(self, path):
435 """
436 return the content of a folder on the cluster as a DataFrame
438 @param path path on the cluster
439 @return DataFrame
441 .. versionadded:: 1.1
442 """
443 out, err = self.execute_command("hdfs dfs -ls " + path)
444 if len(err) > 0:
445 raise Exception(
446 "unable to execute hdfs dfs -ls " +
447 path +
448 "\nERR:\n" +
449 err)
450 return ASSHClient.parse_lsout(out, False)
452 def exists(self, path):
453 """
454 tells if a file exists on the bridge
456 @param path path
457 @return boolean
459 .. versionadded:: 1.1
460 """
461 try:
462 df = self.ls(path)
463 except Exception as e:
464 if "No such file or directory" in str(e):
465 return False
466 ex = df[df.name == path]
467 return len(ex) > 0
469 def dfs_exists(self, path):
470 """
471 tells if a file exists on the cluster
473 @param path path
474 @return boolean
476 .. versionadded:: 1.1
477 """
478 try:
479 df = self.dfs_ls(path)
480 except Exception as e:
481 if "No such file or directory" in str(e):
482 return False
483 else:
484 raise e
485 if len(df) == 0:
486 # it is a folder
487 return True
488 ex = df[df.name == path]
489 if len(ex) > 0:
490 return True
491 ex = df[df.apply(lambda r: r["name"].startswith(path + "/"), axis=1)]
492 if len(ex) > 0:
493 return True
494 return False
496 def dfs_mkdir(self, path):
497 """
498 creates a directory on the cluster
500 @param path path
502 .. versionadded:: 1.1
503 """
504 return self.execute_command("hdfs dfs -mkdir " + path)
506 def dfs_rm(self, path, recursive=False):
507 """
508 removes a file on the cluster
510 @param path path
511 @param recursive boolean
513 .. versionadded:: 1.1
514 """
515 cmd = "hdfs dfs -rm "
516 if recursive:
517 cmd += "-r "
518 out, err = self.execute_command(cmd + path, no_exception=True)
519 if out.startswith("Moved"):
520 return out, err
521 else:
522 raise Exception(
523 "unable to remove " +
524 path +
525 "\nOUT\n" +
526 out +
527 "\nERR:\n" +
528 err)
530 @staticmethod
531 def build_command_line_parameters(params, command_name="-param"):
532 """
533 builds a string for ``pig`` based on the parameters in params
535 @param params dictionary
536 @param command_name ``-param`` or ``-hiveconf``
537 @return string
539 .. versionadded:: 1.1
540 """
541 if params is None:
542 return ""
543 res = []
544 for k, v in sorted(params.items()):
545 if '"' in v:
546 v = v.replace('"', '\\"')
547 one = '{2} {0}="{1}"'.format(k, v, command_name)
548 res.append(one)
549 return " ".join(res)
551 def pig_submit(self, pig_file,
552 dependencies=None,
553 params=None,
554 redirection="redirection.pig",
555 local=False,
556 stop_on_failure=False,
557 check=False,
558 no_exception=True,
559 fLOG=noLOG):
560 """
561 Submits a :epkg:`PIG` script, it first upload the script
562 to the default folder and submits it.
564 @param pig_file pig script (local)
565 @param dependencies others files to upload (still in the default folder)
566 @param params parameters to send to the job
567 @param redirection string empty or not
568 @param local local run or not (option `-x local <https://cwiki.apache.org/confluence/display/PIG/PigTutorial>`_)
569 (in that case, redirection will be empty)
570 @param stop_on_failure if True, add option ``-stop_on_failure`` on the command line
571 @param check if True, add option ``-check`` (in that case, redirection will be empty)
572 @param no_exception sent to @see me execute_command
573 @param fLOG logging function
574 @return out, err from @see me execute_command
576 If *redirection* is not empty, the job is submitted but
577 the function returns after the standard output and error were
578 redirected to ``redirection.out`` and ``redirection.err``.
580 The first file will contain the results of commands
581 `DESCRIBE <http://pig.apache.org/docs/r0.14.0/test.html#describe>`_
582 `DUMP <http://pig.apache.org/docs/r0.14.0/test.html#dump>`_,
583 `EXPLAIN <http://pig.apache.org/docs/r0.14.0/test.html#explain>`_.
584 The standard error receives logs and exceptions.
586 The function executes the command line::
588 pig -execute -f <filename>
590 With redirection::
592 pig -execute -f <filename> 2> redirection.pig.err 1> redirection.pig.out &
594 .. versionadded:: 1.1
595 """
596 dest = os.path.split(pig_file)[-1]
597 self.upload(pig_file, dest)
598 if dependencies is not None:
599 for py in dependencies:
600 self.upload(py, os.path.split(py)[-1])
602 slocal = " -x local" if local else ""
603 sstop_on_failure = " -stop_on_failure" if stop_on_failure else ""
604 scheck = " -check" if check else ""
606 if local or check:
607 redirection = None
609 if params is not None:
610 sparams = ASSHClient.build_command_line_parameters(params)
611 if len(sparams) > 0:
612 sparams = " " + sparams
613 else:
614 sparams = ""
616 if redirection is None:
617 cmd = "pig{0}{1}{2} -execute -f {3}{4}".format(
618 slocal,
619 sstop_on_failure,
620 scheck,
621 dest,
622 sparams)
623 else:
624 cmd = "pig{2}{3}{4} -execute -f {0}{5} 2> {1}.err 1> {1}.out &".format(
625 dest,
626 redirection,
627 slocal,
628 sstop_on_failure,
629 scheck,
630 sparams)
632 if isinstance(cmd, list):
633 raise TypeError("this should not happen:" + str(cmd))
635 fLOG("[pig_submit]:", cmd)
636 out, err = self.execute_command(cmd, no_exception=no_exception)
637 return out, err
639 def hive_submit(self, hive_file_or_query,
640 params=None,
641 redirection="redirection.hive",
642 no_exception=True,
643 fLOG=noLOG):
644 """
645 submits a PIG script, it first upload the script
646 to the default folder and submit it
648 @param hive_file_or_query pig script (local)
649 @param params parameters to send to the job
650 @param redirection string empty or not
651 @param no_exception sent to @see me execute_command
652 @param fLOG logging function
653 @return out, err from @see me execute_command
655 If *redirection* is not empty, the job is submitted but
656 the function returns after the standard output and error were
657 redirected to ``redirection.hive.out`` and ``redirection.hive.err``.
659 The function executes the command line::
661 hive -f <filename>
663 Or::
665 hive -e <query>
667 With redirection::
669 hive -execute -f <filename> 2> redirection.hive.err 1> redirection.hive.out &
671 If there is no redirection, the function
672 waits and return the output.
674 .. exref::
675 :title: Submit a HIVE query
676 :tag: Hadoop
678 ::
680 client = ASSHClient()
682 hive_sql = '''
683 DROP TABLE IF EXISTS bikes20;
684 CREATE TABLE bikes20 (sjson STRING);
685 LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20;
686 SELECT * FROM bikes20 LIMIT 10;
687 '''.replace("__USERNAME__", self.client.username)
689 out,err = client.hive_submit(hive_sql, redirection=None)
691 .. versionadded:: 1.1
692 """
693 if is_file_string(hive_file_or_query) and os.path.exists(hive_file_or_query):
694 dest = os.path.split(hive_file_or_query)[-1]
695 self.upload(hive_file_or_query, dest)
696 command = "-f"
697 else:
698 command = "-e"
699 dest = hive_file_or_query.replace(
700 "\n", " ").replace("\r", "").replace("\t", " ")
701 dest = dest.replace("'", "\\'")
702 dest = "'{}'".format(dest.strip())
704 if params is not None:
705 sparams = ASSHClient.build_command_line_parameters(
706 params, "-hiveconf")
707 if len(sparams) > 0:
708 sparams = " " + sparams
709 else:
710 sparams = ""
712 if redirection is None:
713 cmd = "hive {0} {1}{2}".format(
714 command,
715 dest,
716 sparams)
717 else:
718 cmd = "hive {0} {1}{2} 2> {3}.err 1> {3}.out &".format(
719 command,
720 dest,
721 sparams,
722 redirection)
724 if isinstance(cmd, list):
725 raise TypeError("this should not happen:" + str(cmd))
727 warnings.warn("Hive submission is not tested. It will probably fail.")
729 fLOG("[hive_submit]:", cmd)
730 out, err = self.execute_command(cmd, no_exception=no_exception)
731 return out, err