Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2@file 

3@brief A class to help connect with a remote machine and send command line. 

4""" 

5 

6import time 

7import socket 

8import os 

9import io 

10import warnings 

11 

12from pyquickhelper.loghelper import noLOG 

13from pyquickhelper.filehelper import is_file_string 

14 

15 

class ASSHClient():

    """
    A simple class to access to remote machine through SSH.
    It requires modules
    `paramiko <http://www.paramiko.org/>`_,
    `pycrypto <https://pypi.python.org/pypi/pycrypto/>`_,
    `ecdsa <https://pypi.python.org/pypi/ecdsa>`_.

    This class is used in magic command @see me remote_open.
    On Windows, the installation of pycrypto can be tricky.
    See `Pycrypto on Windows <http://www.xavierdupre.fr/blog/2014-10-21_nojs.html>`_.
    Those modules are part of the `Anaconda <http://docs.continuum.io/anaconda/pkg-docs.html>`_ distribution.

    The workflow is: create the client with the credentials,
    call @see me connect, run commands with @see me execute_command
    (or open an interactive shell with @see me open_session),
    then call @see me close.
    """

30 

31 def __init__(self, server, username, password): 

32 """ 

33 constructor 

34 

35 @param server server 

36 @param username username 

37 @param password password 

38 """ 

39 self.server = server 

40 self.username = username 

41 self.password = password 

42 self.connection = None 

43 self.session = None 

44 

45 def __str__(self): 

46 """ 

47 usual 

48 """ 

49 return "ASSHClient" 

50 

51 def connect(self): 

52 """ 

53 connect 

54 """ 

55 import paramiko # pylint: disable=C0415 

56 self.connection = paramiko.SSHClient() 

57 self.connection.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 

58 self.connection.connect( 

59 self.server, 

60 username=self.username, 

61 password=self.password) 

62 

63 def execute_command(self, command, no_exception=False, fill_stdin=None): 

64 """ 

65 execute a command line, it raises an error 

66 if there is an error 

67 

68 @param command command 

69 @param no_exception if True, do not raise any exception 

70 @param fill_stdin data to send on the stdin input 

71 @return stdout, stderr 

72 

73 Example of commands:: 

74 

75 ssh.execute_command("ls") 

76 ssh.execute_command("hdfs dfs -ls") 

77 

78 """ 

79 stdin, stdout, stderr = self.connection.exec_command(command) 

80 

81 if fill_stdin is not None: 

82 if isinstance(fill_stdin, list): 

83 fill_stdin = "\n".join(stdin) 

84 if isinstance(fill_stdin, str): 

85 stdin.write(fill_stdin) 

86 stdin.flush() 

87 else: 

88 raise TypeError( 

89 "fill_stdin must be a string, not: {0}".format( 

90 type(fill_stdin))) 

91 

92 stdin.close() 

93 

94 err = stderr.read() 

95 out = stdout.read() 

96 

97 # weird... 

98 if isinstance(err, str) and err.startswith("b'"): 

99 err = eval(err) 

100 if isinstance(out, str) and out.startswith("b'"): 

101 out = eval(out) 

102 

103 if isinstance(err, bytes): 

104 err = err.decode("utf-8") 

105 if isinstance(out, bytes): 

106 out = out.decode("utf-8") 

107 

108 if not no_exception and len(err) > 0: 

109 raise Exception( 

110 "unable to run: {0}\nOUT:\n{1}\nERR:\n{2}".format( 

111 command, 

112 out, 

113 err)) 

114 

115 return out, err 

116 

117 def close(self): 

118 """ 

119 close the connection 

120 """ 

121 self.connection.close() 

122 self.connection = None 

123 

124 def upload(self, localpath, remotepath): 

125 """ 

126 upload a file to the remote machine (not on the cluster) 

127 

128 @param localpath local file (or a list of files) 

129 @param remotepath remote file 

130 

131 .. versionchanged:: 1.1 

132 it can upload multiple files if localpath is a list 

133 """ 

134 sftp = self.connection.open_sftp() 

135 if isinstance(localpath, str): 

136 if not os.path.exists(localpath): 

137 raise FileNotFoundError(localpath) 

138 sftp.put(localpath, remotepath) 

139 else: 

140 for f in localpath: 

141 if not os.path.exists(f): 

142 raise FileNotFoundError(f) 

143 sftp.put(f, remotepath + "/" + os.path.split(f)[-1]) 

144 sftp.close() 

145 

146 def upload_cluster(self, localpath, remotepath): 

147 """ 

148 the function directly uploads the file to the cluster, it first goes 

149 to the bridge, uploads it to the cluster and deletes it from the bridge 

150 

151 @param localpath local filename (or list of files) 

152 @param remotepath path to the cluster 

153 @return filename 

154 

155 .. versionadded:: 1.1 

156 """ 

157 if isinstance(localpath, str): 

158 filename = os.path.split(localpath)[-1] 

159 self.upload(localpath, filename) 

160 self.execute_command( 

161 "hdfs dfs -put {0} {1}".format(filename, remotepath)) 

162 self.execute_command("rm {0}".format(filename)) 

163 else: 

164 self.upload(localpath, ".") 

165 for afile in localpath: 

166 filename = os.path.split(afile)[-1] 

167 self.execute_command( 

168 "hdfs dfs -put {0} {1}".format(filename, remotepath)) 

169 self.execute_command("rm {0}".format(filename)) 

170 

171 return remotepath 

172 

173 def download(self, remotepath, localpath): 

174 """ 

175 download a file from the remote machine (not on the cluster) 

176 @param localpath local file 

177 @param remotepath remote file (it can be a list, localpath is a folder in that case) 

178 

179 .. versionchanged:: 1.1 

180 remotepath can be a list of paths 

181 """ 

182 sftp = self.connection.open_sftp() 

183 if isinstance(remotepath, str): 

184 sftp.get(remotepath, localpath) 

185 else: 

186 for path in remotepath: 

187 filename = os.path.split(path)[-1] 

188 sftp.get(path, localpath + "/" + filename) 

189 sftp.close() 

190 

191 def download_cluster(self, remotepath, localpath, merge=False): 

192 """ 

193 download a file directly from the cluster to the local machine 

194 @param localpath local file 

195 @param remotepath remote file (it can be a list, localpath is a folder in that case) 

196 @param merge True to use getmerge instead of get 

197 

198 .. versionadded:: 1.1 

199 """ 

200 cget = "getmerge" if merge else "get" 

201 if isinstance(remotepath, str): 

202 filename = os.path.split(localpath)[-1] 

203 self.execute_command( 

204 "hdfs dfs -{2} {0} {1}".format(remotepath, filename, cget)) 

205 self.download(filename, localpath) 

206 self.execute_command("rm {0}".format(filename)) 

207 else: 

208 tod = [] 

209 for afile in remotepath: 

210 filename = os.path.split(afile)[-1] 

211 self.execute_command( 

212 "hdfs dfs -{2} {0} {1}".format(afile, filename, cget)) 

213 tod.append(filename) 

214 self.download(tod, localpath) 

215 for afile in tod: 

216 self.execute_command("rm {0}".format(afile)) 

217 

218 return remotepath 

219 

220 _allowed_form = {None: None, "plain": None, "html": None} 

221 

222 @staticmethod 

223 def _get_out_format(format): 

224 """ 

225 Returns a function which converts an :epkg:`ANSI` string into a 

226 different format. 

227 

228 @param format string 

229 @return function 

230 """ 

231 if format not in ASSHClient._allowed_form: 

232 raise KeyError( 

233 "unexpected format, it should be in " + 

234 ",".join( 

235 ASSHClient._allowed_form.keys())) 

236 func = ASSHClient._allowed_form[format] 

237 if func is None: 

238 if format is None: 

239 def idfunc(s): 

240 "local function" 

241 return s 

242 func = idfunc 

243 elif format == "plain": 

244 import ansiconv # pylint: disable=C0415 

245 

246 def convert_plain(s): 

247 "local function" 

248 return ansiconv.to_plain(s) 

249 func = convert_plain 

250 elif format == "html": 

251 from ansi2html import Ansi2HTMLConverter 

252 conv = Ansi2HTMLConverter() 

253 

254 def convert_html(s): 

255 "local function" 

256 return conv.convert(s) 

257 func = convert_html 

258 ASSHClient._allowed_form[format] = func 

259 return func 

260 

261 def open_session(self, no_exception=False, timeout=1.0, 

262 add_eol=True, prompts=("~$", ">>>"), out_format=None): 

263 """ 

264 Opens a session with method 

265 `invoke_shell <http://docs.paramiko.org/en/latest/api/client.html?highlight=invoke_shell#paramiko.client.SSHClient.invoke_shell>`_. 

266 

267 @param no_exception if True, do not raise any exception in case of error 

268 @param timeout timeout in s 

269 @param add_eol if True, the function will add a EOL to the sent command if it does not have one 

270 @param prompts if function terminates if the output ends by one of those strings. 

271 @param out_format None, plain, html 

272 

273 .. exref:: 

274 :title: How to open a remote shell? 

275 :tag: Hadoop 

276 

277 :: 

278 

279 ssh = ASSHClient( "<server>", 

280 "<login>", 

281 "<password>") 

282 ssh.connect() 

283 out = ssh.send_recv_session("ls") 

284 print( ssh.send_recv_session("python") ) 

285 print( ssh.send_recv_session("print('3')") ) 

286 print( ssh.send_recv_session("import sys\\nsys.executable") ) 

287 print( ssh.send_recv_session("sys.exit()") ) 

288 print( ssh.send_recv_session(None) ) 

289 ssh.close_session() 

290 ssh.close() 

291 

292 The notebook :ref:`exampleofsshclientcommunicationrst` illustrates 

293 the output of these instructions. 

294 """ 

295 if self.connection is None: 

296 raise Exception("No open connection.") 

297 if self.session is not None: 

298 raise Exception( 

299 "A session is already open. Cannot open a second one.") 

300 if out_format not in ASSHClient._allowed_form: 

301 raise KeyError( 

302 "unexpected format, it should be in {0}".format( 

303 ";".join( 

304 str(_) for _ in ASSHClient._allowed_form.keys()))) 

305 

306 self.session = self.connection.invoke_shell(width=300, height=1000) 

307 self.session_params = { 

308 "no_exception": no_exception, 

309 "timeout": timeout, 

310 "add_eol": add_eol, 

311 "prompts": [] if prompts is None else prompts, 

312 "out_format": out_format, 

313 "out_func": ASSHClient._get_out_format(out_format) 

314 } 

315 

316 self.session.settimeout(timeout) 

317 return self.session 

318 

319 def close_session(self): 

320 """ 

321 close a session 

322 """ 

323 if self.session is None: 

324 raise Exception("No open session. Cannot close anything.") 

325 

326 self.session.close() 

327 self.session = None 

328 

329 def send_recv_session(self, fillin): 

330 """ 

331 Send something through a session, 

332 the function is supposed to return when the execute of the given command is done, 

333 but this is quite difficult to detect without knowing what exactly was send. 

334 

335 So we add a timeout just to tell the function it has to return even if nothing 

336 tells the command has finished. It fillin is None, the function will just 

337 listen to the output. 

338 

339 @param fillin sent to stdin 

340 @return stdout 

341 

342 The output contains 

343 `escape codes <http://ascii-table.com/ansi-escape-sequences-vt-100.php>`_. 

344 They can be converted to plain text or HTML 

345 by using the module `ansiconv <http://pythonhosted.org/ansiconv/>`_ 

346 and `ansi2html <https://github.com/ralphbean/ansi2html/>`_. 

347 This can be specified when opening the session. 

348 """ 

349 prompts = self.session_params["prompts"] 

350 timeout = self.session_params["timeout"] 

351 add_eol = self.session_params["add_eol"] 

352 func = self.session_params["out_func"] 

353 

354 if fillin is not None: 

355 self.session.send(fillin.encode("utf-8")) 

356 if add_eol and not fillin.endswith('\n'): 

357 self.session.send("\n".encode("utf-8")) 

358 

359 buff = '' 

360 begin = time.perf_counter() 

361 while True: 

362 try: 

363 resp = self.session.recv(9999) 

364 except socket.timeout: 

365 resp = b"" 

366 dec = resp.decode("unicode_escape") 

367 buff += dec 

368 for p in prompts: 

369 if buff.endswith(p): 

370 break 

371 if time.perf_counter() - begin > timeout: 

372 break 

373 

374 return func(buff.replace("\r", "")) 

375 

376 @staticmethod 

377 def parse_lsout(out, local_schema=True): 

378 """ 

379 parses the output of a command ls 

380 

381 @param out output 

382 @param local_schema schema for the bridge or the cluster (False) 

383 @return DataFrame 

384 

385 .. versionadded:: 1.1 

386 """ 

387 import pandas # pylint: disable=C0415 

388 if local_schema: 

389 names = ["attributes", "code", "alias", "folder", 

390 "size", "unit", "name"] 

391 else: 

392 names = ["attributes", "code", "alias", "folder", 

393 "size", "date", "time", "name"] 

394 kout = out 

395 out = out.replace("\r", "").split("\n") 

396 out = [_ for _ in out if len(_.split()) > 3] 

397 if len(out) == 0: 

398 df = pandas.DataFrame(columns=names) 

399 return df 

400 

401 try: 

402 out_ = [_.split() for _ in out] 

403 if len(out_) > 0 and len(out_[0]) != len(names): 

404 if names[5] == "date" and len(out_[0]) == len(names) + 1: 

405 # we merge 2 columns 

406 out_ = [_[:5] + [" ".join(_[5:7])] + _[7:] for _ in out_] 

407 df = pandas.DataFrame(data=out_, columns=names) 

408 except (AssertionError, ValueError) as e: 

409 out = "\n".join(out) 

410 buf = io.StringIO(out) 

411 try: 

412 df = pandas.read_fwf(buf, names=names, index=False) 

413 except ValueError as e: 

414 raise ValueError( 

415 "unable to parse output:\nSCHEMA:\n{1}\nOUT:\n{0}".format(kout, ",".join(names))) from e 

416 

417 df["isdir"] = df.apply(lambda r: r["attributes"][0] == "d", axis=1) 

418 return df 

419 

420 def ls(self, path): 

421 """ 

422 return the content of a folder on the bridge as a DataFrame 

423 

424 @param path path on the bridge 

425 @return DataFrame 

426 

427 .. versionadded:: 1.1 

428 """ 

429 out, err = self.execute_command("ls -l " + path) 

430 if len(err) > 0: 

431 raise Exception("unable to execute ls " + path + "\nERR:\n" + err) 

432 return ASSHClient.parse_lsout(out) 

433 

434 def dfs_ls(self, path): 

435 """ 

436 return the content of a folder on the cluster as a DataFrame 

437 

438 @param path path on the cluster 

439 @return DataFrame 

440 

441 .. versionadded:: 1.1 

442 """ 

443 out, err = self.execute_command("hdfs dfs -ls " + path) 

444 if len(err) > 0: 

445 raise Exception( 

446 "unable to execute hdfs dfs -ls " + 

447 path + 

448 "\nERR:\n" + 

449 err) 

450 return ASSHClient.parse_lsout(out, False) 

451 

452 def exists(self, path): 

453 """ 

454 tells if a file exists on the bridge 

455 

456 @param path path 

457 @return boolean 

458 

459 .. versionadded:: 1.1 

460 """ 

461 try: 

462 df = self.ls(path) 

463 except Exception as e: 

464 if "No such file or directory" in str(e): 

465 return False 

466 ex = df[df.name == path] 

467 return len(ex) > 0 

468 

469 def dfs_exists(self, path): 

470 """ 

471 tells if a file exists on the cluster 

472 

473 @param path path 

474 @return boolean 

475 

476 .. versionadded:: 1.1 

477 """ 

478 try: 

479 df = self.dfs_ls(path) 

480 except Exception as e: 

481 if "No such file or directory" in str(e): 

482 return False 

483 else: 

484 raise e 

485 if len(df) == 0: 

486 # it is a folder 

487 return True 

488 ex = df[df.name == path] 

489 if len(ex) > 0: 

490 return True 

491 ex = df[df.apply(lambda r: r["name"].startswith(path + "/"), axis=1)] 

492 if len(ex) > 0: 

493 return True 

494 return False 

495 

496 def dfs_mkdir(self, path): 

497 """ 

498 creates a directory on the cluster 

499 

500 @param path path 

501 

502 .. versionadded:: 1.1 

503 """ 

504 return self.execute_command("hdfs dfs -mkdir " + path) 

505 

506 def dfs_rm(self, path, recursive=False): 

507 """ 

508 removes a file on the cluster 

509 

510 @param path path 

511 @param recursive boolean 

512 

513 .. versionadded:: 1.1 

514 """ 

515 cmd = "hdfs dfs -rm " 

516 if recursive: 

517 cmd += "-r " 

518 out, err = self.execute_command(cmd + path, no_exception=True) 

519 if out.startswith("Moved"): 

520 return out, err 

521 else: 

522 raise Exception( 

523 "unable to remove " + 

524 path + 

525 "\nOUT\n" + 

526 out + 

527 "\nERR:\n" + 

528 err) 

529 

530 @staticmethod 

531 def build_command_line_parameters(params, command_name="-param"): 

532 """ 

533 builds a string for ``pig`` based on the parameters in params 

534 

535 @param params dictionary 

536 @param command_name ``-param`` or ``-hiveconf`` 

537 @return string 

538 

539 .. versionadded:: 1.1 

540 """ 

541 if params is None: 

542 return "" 

543 res = [] 

544 for k, v in sorted(params.items()): 

545 if '"' in v: 

546 v = v.replace('"', '\\"') 

547 one = '{2} {0}="{1}"'.format(k, v, command_name) 

548 res.append(one) 

549 return " ".join(res) 

550 

551 def pig_submit(self, pig_file, 

552 dependencies=None, 

553 params=None, 

554 redirection="redirection.pig", 

555 local=False, 

556 stop_on_failure=False, 

557 check=False, 

558 no_exception=True, 

559 fLOG=noLOG): 

560 """ 

561 Submits a :epkg:`PIG` script, it first upload the script 

562 to the default folder and submits it. 

563 

564 @param pig_file pig script (local) 

565 @param dependencies others files to upload (still in the default folder) 

566 @param params parameters to send to the job 

567 @param redirection string empty or not 

568 @param local local run or not (option `-x local <https://cwiki.apache.org/confluence/display/PIG/PigTutorial>`_) 

569 (in that case, redirection will be empty) 

570 @param stop_on_failure if True, add option ``-stop_on_failure`` on the command line 

571 @param check if True, add option ``-check`` (in that case, redirection will be empty) 

572 @param no_exception sent to @see me execute_command 

573 @param fLOG logging function 

574 @return out, err from @see me execute_command 

575 

576 If *redirection* is not empty, the job is submitted but 

577 the function returns after the standard output and error were 

578 redirected to ``redirection.out`` and ``redirection.err``. 

579 

580 The first file will contain the results of commands 

581 `DESCRIBE <http://pig.apache.org/docs/r0.14.0/test.html#describe>`_ 

582 `DUMP <http://pig.apache.org/docs/r0.14.0/test.html#dump>`_, 

583 `EXPLAIN <http://pig.apache.org/docs/r0.14.0/test.html#explain>`_. 

584 The standard error receives logs and exceptions. 

585 

586 The function executes the command line:: 

587 

588 pig -execute -f <filename> 

589 

590 With redirection:: 

591 

592 pig -execute -f <filename> 2> redirection.pig.err 1> redirection.pig.out & 

593 

594 .. versionadded:: 1.1 

595 """ 

596 dest = os.path.split(pig_file)[-1] 

597 self.upload(pig_file, dest) 

598 if dependencies is not None: 

599 for py in dependencies: 

600 self.upload(py, os.path.split(py)[-1]) 

601 

602 slocal = " -x local" if local else "" 

603 sstop_on_failure = " -stop_on_failure" if stop_on_failure else "" 

604 scheck = " -check" if check else "" 

605 

606 if local or check: 

607 redirection = None 

608 

609 if params is not None: 

610 sparams = ASSHClient.build_command_line_parameters(params) 

611 if len(sparams) > 0: 

612 sparams = " " + sparams 

613 else: 

614 sparams = "" 

615 

616 if redirection is None: 

617 cmd = "pig{0}{1}{2} -execute -f {3}{4}".format( 

618 slocal, 

619 sstop_on_failure, 

620 scheck, 

621 dest, 

622 sparams) 

623 else: 

624 cmd = "pig{2}{3}{4} -execute -f {0}{5} 2> {1}.err 1> {1}.out &".format( 

625 dest, 

626 redirection, 

627 slocal, 

628 sstop_on_failure, 

629 scheck, 

630 sparams) 

631 

632 if isinstance(cmd, list): 

633 raise TypeError("this should not happen:" + str(cmd)) 

634 

635 fLOG("[pig_submit]:", cmd) 

636 out, err = self.execute_command(cmd, no_exception=no_exception) 

637 return out, err 

638 

    def hive_submit(self, hive_file_or_query,
                    params=None,
                    redirection="redirection.hive",
                    no_exception=True,
                    fLOG=noLOG):
        """
        submits a HIVE script or query: a file is first uploaded
        to the default folder and run with ``-f``, anything else is
        inlined with ``-e``

        @param hive_file_or_query hive script (local) or a query string
        @param params parameters to send to the job (``-hiveconf``)
        @param redirection string empty or not
        @param no_exception sent to @see me execute_command
        @param fLOG logging function
        @return out, err from @see me execute_command

        If *redirection* is not empty, the job is submitted but
        the function returns after the standard output and error were
        redirected to ``redirection.hive.out`` and ``redirection.hive.err``.

        The function executes the command line::

            hive -f <filename>

        Or::

            hive -e <query>

        With redirection::

            hive -execute -f <filename> 2> redirection.hive.err 1> redirection.hive.out &

        If there is no redirection, the function
        waits and return the output.

        .. exref::
            :title: Submit a HIVE query
            :tag: Hadoop

            ::

                client = ASSHClient()

                hive_sql = '''
                DROP TABLE IF EXISTS bikes20;
                CREATE TABLE bikes20 (sjson STRING);
                LOAD DATA INPATH "/user/__USERNAME__/unittest2/paris*.txt" INTO TABLE bikes20;
                SELECT * FROM bikes20 LIMIT 10;
                '''.replace("__USERNAME__", self.client.username)

                out,err = client.hive_submit(hive_sql, redirection=None)

        .. versionadded:: 1.1
        """
        # a path pointing to an existing file is uploaded and run with -f
        if is_file_string(hive_file_or_query) and os.path.exists(hive_file_or_query):
            dest = os.path.split(hive_file_or_query)[-1]
            self.upload(hive_file_or_query, dest)
            command = "-f"
        else:
            # anything else is treated as an inline query (-e):
            # newlines and tabs are flattened, single quotes escaped,
            # the whole query wrapped in single quotes
            command = "-e"
            dest = hive_file_or_query.replace(
                "\n", " ").replace("\r", "").replace("\t", " ")
            dest = dest.replace("'", "\\'")
            dest = "'{}'".format(dest.strip())

        if params is not None:
            sparams = ASSHClient.build_command_line_parameters(
                params, "-hiveconf")
            if len(sparams) > 0:
                sparams = " " + sparams
        else:
            sparams = ""

        if redirection is None:
            cmd = "hive {0} {1}{2}".format(
                command,
                dest,
                sparams)
        else:
            # run in the background, output redirected to files
            cmd = "hive {0} {1}{2} 2> {3}.err 1> {3}.out &".format(
                command,
                dest,
                sparams,
                redirection)

        if isinstance(cmd, list):
            raise TypeError("this should not happen:" + str(cmd))

        warnings.warn("Hive submission is not tested. It will probably fail.")

        fLOG("[hive_submit]:", cmd)
        out, err = self.execute_command(cmd, no_exception=no_exception)
        return out, err