Source code for pyenbc.filehelper.pig_helper

"""
Hadoop uses a Java implementation of Python: Jython.
This module provides helpers around it.

.. versionadded:: 1.1


:githublink:`%|py|8`
"""

import os
import glob
from pyquickhelper.loghelper import run_cmd, noLOG
from pyquickhelper.filehelper import change_file_status
from pyensae.datasource.http_retrieve import download_data
from .jython_helper import get_java_cmd, get_java_path

PIG_VERSION = "0.17.0"
HADOOP_VERSION = "3.2.0"


[docs]def download_pig_standalone(pig_version=PIG_VERSION,
                              hadoop_version=HADOOP_VERSION, fLOG=noLOG):
    """
    Downloads standalone distributions of :epkg:`Pig` and :epkg:`Hadoop`.
    The Hadoop version defaults to ``HADOOP_VERSION`` in order to match
    the cluster's version.

    :param pig_version: Pig version
    :param hadoop_version: Hadoop version
    :param fLOG: logging function
    :return: list of downloaded files

    This function might need to be run twice if the first try fails;
    the failure might be due to very long paths created while unzipping
    the downloaded file.

    :epkg:`Hadoop` is downloaded from one of the websites referenced at
    `Apache Software Foundation
    <http://www.apache.org/dyn/closer.cgi/hadoop/common/>`_.
    Check the source to see which one was chosen.

    :githublink:`%|py|41`
    """
    fbs = []

    # download winutils.exe
    d = os.path.join(os.path.abspath(os.path.dirname(__file__)), "winutils")
    if not os.path.exists(d):
        os.mkdir(d)
    exe = download_data(name="winutils.zip", whereTo=d, website="xd", fLOG=fLOG)
    fbs.append(exe)
    change_file_status(d)

    # download hadoop
    fLOG("download hadoop", hadoop_version)
    d = os.path.join(os.path.abspath(os.path.dirname(__file__)), "hadoopjar")
    if not os.path.exists(d):
        os.mkdir(d)
    fn = download_data(
        name="hadoop-%s.tar.gz" % hadoop_version, whereTo=d,
        website="http://apache.crihan.fr/dist/hadoop/common/hadoop-%s/" % hadoop_version,
        fLOG=fLOG)
    fbs.append(fn)
    change_file_status(d)

    # download pig
    fLOG("download pig", pig_version)
    d = os.path.join(os.path.abspath(os.path.dirname(__file__)), "pigjar")
    if not os.path.exists(d):
        os.mkdir(d)
    fn = download_data(
        name="pig-%s.tar.gz" % pig_version, whereTo=d, silent=True,
        website="http://apache.crihan.fr/dist/pig/pig-%s/" % pig_version,
        fLOG=fLOG)
    fbs.append(fn)
    change_file_status(d)
    return fbs
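

# Usage sketch (an illustrative addition, not part of the original module):
# fetch the standalone distributions once, before running any script.
# The helper name ``_example_download`` is hypothetical.
def _example_download():
    archives = download_pig_standalone(
        pig_version=PIG_VERSION, hadoop_version=HADOOP_VERSION, fLOG=print)
    # A second run may be needed if unpacking failed because of long paths.
    for archive in archives:
        print("downloaded:", archive)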


[docs]def get_pig_path():
    """
    This function assumes a folder ``pigjar`` is present in this
    directory; the function returns the folder.

    :return: absolute path

    :githublink:`%|py|87`
    """
    this = os.path.abspath(os.path.dirname(__file__))
    files = [os.path.join(this, _) for _ in os.listdir(this)]
    files = [_ for _ in files if "pig" in _ and os.path.isdir(_)]
    if len(files) == 0:
        raise FileNotFoundError("no pig folder found in " + this)
    if len(files) != 1:
        raise FileNotFoundError(
            "more than one folder found in " + this + "\n:" + "\n".join(files))
    return files[0]


[docs]def get_hadoop_path():
    """
    This function assumes a folder ``hadoopjar`` is present in this
    directory; the function returns the folder.

    :return: absolute path

    :githublink:`%|py|108`
    """
    this = os.path.abspath(os.path.dirname(__file__))
    files = [os.path.join(this, _) for _ in os.listdir(this)]
    files = [_ for _ in files if "hadoop" in _ and os.path.isdir(_)]
    if len(files) == 0:
        raise FileNotFoundError("no hadoop folder found in " + this)
    if len(files) != 1:
        raise FileNotFoundError(
            "more than one folder found in " + this + "\n:" + "\n".join(files))
    return files[0]
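

# Usage sketch (an illustrative addition): both getters raise
# FileNotFoundError when zero or several candidate folders exist, so the
# returned location is unambiguous. ``_example_locations`` is hypothetical.
def _example_locations():
    print("pig folder:", get_pig_path())        # e.g. .../pigjar
    print("hadoop folder:", get_hadoop_path())  # e.g. .../hadoopjar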


[docs]def get_pig_jars():
    """
    Returns the list of jars to include into the command line
    in order to run :epkg:`PIG`.

    :return: list of jars

    :githublink:`%|py|129`
    """
    path = get_pig_path()
    res = []
    for root, _, files in os.walk(path):
        for name in files:
            if os.path.splitext(name)[-1] == ".jar" and "lib" in root:
                if "h1" not in root and "h1" not in name \
                        and "hadoop1-runtime" not in name \
                        and "hadoop1-runtime" not in root \
                        and "test" not in root \
                        and "h2" not in name \
                        and ("pig-" + PIG_VERSION + "-withouthadoop-h2") not in name:
                    res.append(os.path.join(root, name))
    return res


[docs]def get_hadoop_jars():
    """
    Returns the list of jars to include into the command line
    in order to run :epkg:`HADOOP`.

    :return: list of jars

    :githublink:`%|py|151`
    """
    path = get_hadoop_path()
    res = []
    for root, _, files in os.walk(path):
        for name in files:
            if os.path.splitext(name)[-1] == ".jar":
                if "sources.jar" not in name and "-test-sources" not in name \
                        and "tests.jar" not in name:
                    res.append(os.path.join(root, name))
    return res
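

# Usage sketch (an illustrative addition): the jar lists are meant to be
# joined into a java classpath; run_pig below joins with ";" (the Windows
# separator), while os.pathsep would be the portable choice.
# ``_example_classpath`` is hypothetical.
def _example_classpath():
    jars = get_pig_jars() + get_hadoop_jars()
    classpath = os.pathsep.join(jars)
    print(len(jars), "jars on the classpath")
    return classpath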


[docs]def run_pig(pigfile, argv=None, pig_path=None, hadoop_path=None,
            jython_path=None, timeout=None, logpath="logs",
            pig_version=PIG_VERSION, hadoop_version=HADOOP_VERSION,
            jar_no_hadoop=True, fLOG=noLOG):
    """
    Runs a :epkg:`pig` script and returns the standard output and error.

    :param pigfile: pig file
    :param argv: arguments to send to the command line
    :param pig_path: path to pig 0.XX.0
    :param hadoop_path: path to hadoop
    :param jython_path: path to :epkg:`Jython` (currently unused)
    :param timeout: timeout
    :param logpath: path to the logs
    :param pig_version: PIG version (if *pig_path* is not defined)
    :param hadoop_version: Hadoop version (if *hadoop_path* is not defined)
    :param jar_no_hadoop: use :epkg:`pig` without :epkg:`hadoop`
    :param fLOG: logging function
    :return: out, err

    If *pig_path* is None, the function looks into this directory.

    :githublink:`%|py|184`
    """
    if pig_path is None:
        pig_path = os.path.join(get_pig_path(), "pig-%s" % pig_version)
    if hadoop_path is None:
        hadoop_path = get_hadoop_path()

    java = get_java_path()
    if "JAVA_HOME" not in os.environ:
        os.environ["JAVA_HOME"] = java
    if "PIG_CONF_DIR" not in os.environ:
        os.environ["PIG_CONF_DIR"] = os.path.normpath(
            os.path.join(pig_path, "conf"))
    if not os.path.exists(os.environ["PIG_CONF_DIR"]):
        raise FileNotFoundError(os.environ["PIG_CONF_DIR"])
    if "HADOOP_HOME" not in os.environ:
        os.environ["HADOOP_HOME"] = hadoop_path
    if not os.path.exists(os.environ["HADOOP_HOME"]):
        raise FileNotFoundError(os.environ["HADOOP_HOME"])
    if "HADOOP_CLIENT_OPTS" not in os.environ:
        os.environ["HADOOP_CLIENT_OPTS"] = "-Xmx1024m"
    fLOG("PIG_CONF_DIR=", os.environ["PIG_CONF_DIR"])

    def clean(i, p):
        "escapes quotes and quotes any argument containing spaces"
        if i == 0:
            return p
        if '"' in p:
            p = p.replace('"', '\\"')
        if " " in p:
            p = '"{0}"'.format(p)
        return p

    jars = []

    if not jar_no_hadoop:
        jars.extend(get_pig_jars())  # + get_hadoop_jars()
        folds = set(os.path.split(j)[0] for j in jars)
        jars = [os.path.join(f, "*.jar") for f in folds]
        # root of the unpacked hadoop distribution
        hd = os.path.join(hadoop_path, "hadoop-%s" % hadoop_version)
        jars.append(os.path.join(hd, "share", "hadoop", "common", "lib", "*.jar"))
        jars.append(os.path.join(hd, "share", "hadoop", "hdfs", "lib", "*.jar"))
        jars.append(os.path.join(hd, "share", "hadoop", "mapreduce", "lib", "*.jar"))
        jars.append(os.path.join(hd, "share", "hadoop", "httpfs", "tomcat", "lib", "*.jar"))
        jars.append(os.path.join(hd, "share", "hadoop", "tools", "lib", "*.jar"))
        jars.append(os.path.join(hd, "share", "hadoop", "yarn", "lib", "*.jar"))
        jars.append(os.path.join(hd, "share", "hadoop", "common",
                                 "hadoop-common-%s.jar" % hadoop_version))
        jars.append(os.path.join(hd, "share", "hadoop", "common",
                                 "hadoop-nfs-%s.jar" % hadoop_version))
        jars.append(os.path.join(hd, "share", "hadoop", "hdfs",
                                 "hadoop-hdfs-%s.jar" % hadoop_version))
        jars.append(os.path.join(hd, "share", "hadoop", "mapreduce", "*.jar"))
        jars.append(os.path.join(hd, "share", "hadoop", "yarn", "*.jar"))
        jars.append(os.path.join(pig_path, "pig-%s-core-h2.jar" % pig_version))
    else:
        jars.append(os.path.join(pig_path, "pig-%s" % pig_version, "legacy",
                                 "pig-%s-withouthadoop-h2.jar" % pig_version))

    jarsall = []
    for j in jars:
        jarsall.extend(glob.glob(j))
    jarsall.sort()

    jars = ";".join(jars)
    fLOG("jars", jars)

    cmd = [get_java_cmd(), "-Xmx1024m",
           "-classpath", jars,
           "-Dpig.log.dir=" + logpath,
           "-Dhadoop.log.dir=" + logpath,
           "-Dhadoop.tmp.dir=" + logpath,
           "-Dpig.log.file=pid.log",
           "-Djava.io.tmpdir=" + logpath,
           "-Dpig.home.dir=" + pig_path,
           # "-Dpig.schematuple=true",
           # "-Dpig.schematuple.local.dir=" + logpath,
           "org.apache.pig.Main",
           "-x", "local",
           pigfile,
           "-stop_on_failure"]

    cmd = " ".join(clean(i, _) for i, _ in enumerate(cmd))
    out, err = run_cmd(cmd, wait=True, sin=None, communicate=True,
                       timeout=timeout, shell=False)
    out = "PIG_CONF_DIR={0}\nJAVA_HOME={1}\nHADOOP_HOME={2}\n{3}\n{4}".format(
        os.environ.get('PIG_CONF_DIR', ''),
        os.environ.get('JAVA_HOME', ''),
        os.environ.get('HADOOP_HOME', ''),
        "\n".join("add jar: '{0}'".format(j) for j in jarsall),
        out)
    return out, err
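

# Usage sketch (an illustrative addition): run a local script with the
# standalone pig jar. ``myscript.pig`` and ``_example_run`` are hypothetical.
def _example_run():
    out, err = run_pig("myscript.pig", logpath="logs",
                       jar_no_hadoop=True, timeout=600)
    print(out)
    if err:
        print("stderr:", err)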