Source code for pyquickhelper.serverdoc.documentation_server

# -*- coding:utf-8 -*-
"""
This file defines a simple local server which delivers the generated documentation.


:githublink:`%|py|6`
"""
import sys
import os
import subprocess
import copy
import datetime
try:
    from urllib.parse import urlparse, parse_qs
except ImportError:  # pragma: no cover
    from urlparse import urlparse, parse_qs
from threading import Thread
try:
    from http.server import BaseHTTPRequestHandler, HTTPServer
except ImportError:  # pragma: no cover
    from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer

# Import the logging and URL helpers. The way they are imported depends on
# whether this file is executed directly or imported as part of the package.
if __name__ == "__main__":  # pragma: no cover
    # Executed as a script: relative imports are not available, so two
    # candidate ``src`` folders are appended to ``sys.path`` before importing
    # from the ``pyquickhelper`` package.
    # First candidate: ``<this file>/../../../src``.
    path_ = os.path.normpath(os.path.abspath(
        os.path.join(os.path.split(__file__)[0], "..", "..", "..", "src")))
    if path_ not in sys.path:
        sys.path.append(path_)
    # Second candidate: ``<this file>/../../../../pyquickhelper/src``.
    path_ = os.path.normpath(os.path.abspath(os.path.join(os.path.split(__file__)[0],
                                                          "..", "..", "..", "..", "pyquickhelper", "src")))
    if path_ not in sys.path:
        sys.path.append(path_)
    from pyquickhelper.loghelper import fLOG, get_url_content
else:
    # Imported as a module of the package: use explicit relative imports.
    from ..loghelper.flog import fLOG
    from ..loghelper.url_helper import get_url_content


class DocumentationHandler(BaseHTTPRequestHandler):
    """
    Defines a simple handler used by ``HTTPServer``, it just serves
    local content.

    A URL looks like ``http://host:port/<key>/<relative path>`` where
    ``<key>`` is looked up in :attr:`mappings` to find the local folder
    to read from. All the state (mappings, cache) is stored in class
    attributes and is therefore shared by every request.
    """

    # key (first URL component) -> local folder. The two entries below are
    # reserved markers, not folders: they select the "fetch a url" and the
    # "shutdown" branches of serve_content.
    mappings = {"__fetchurl__": "http://",
                "__shutdown__": "shut://"}

    # Page skeleton used by html_code_renderer, ``%s`` receives the title.
    html_header = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<html>\n'
        '<head>\n'
        '<title>%s</title>\n'
        '</head>\n'
        '<body>\n')

    html_footer = '\n</body>\n</html>\n'

    # path -> content, and path -> {"nb": number of hits, "date": insertion}
    cache = {}
    cache_attributes = {}
    # a cached file older than this delay (one day) is read again
    cache_refresh = datetime.timedelta(1)

    # extension -> (content type, reading mode); the mode is ``'r'`` (text),
    # ``'rb'`` (binary) or ``'execute'`` (python script to run)
    media_types = {
        ".js": ('application/javascript', 'r'),
        ".css": ("text/css", 'r'),
        ".html": ('text/html', 'r'),
        ".py": ('text/html', 'execute'),
        ".png": ('image/png', 'rb'),
        ".jpeg": ('image/jpeg', 'rb'),
        ".jpg": ('image/jpeg', 'rb'),
        ".ico": ('image/x-icon', 'rb'),
        ".gif": ('image/gif', 'rb'),
        ".eot": ('application/vnd.ms-fontobject', 'rb'),
        ".ttf": ('application/font-sfnt', 'rb'),
        ".otf": ('font/opentype', 'rb'),
        ".svg": ('image/svg+xml', 'r'),
        ".woff": ('application/font-woff', 'rb'),
    }

    # extensions displayed as highlighted source code instead of raw content
    _code_extensions = (".py", ".c", ".cpp", ".hpp", ".h", ".r", ".sql",
                        ".java")

    def LOG(self, *args, **kwargs):
        """
        Logging function, it forwards everything to ``fLOG``.
        """
        fLOG(*args, **kwargs)

    @staticmethod
    def add_mapping(key, value):
        """
        Adds a mapping associated to a local path to watch.

        :param key: key in ``http://localhost:8008/key/``
        :param value: local path
        :raises FileNotFoundError: if the local path does not exist

        The mappings are stored in a class attribute: if the server is
        run multiple times in the same process, the mappings stay.
        """
        value = os.path.normpath(value)
        if not os.path.exists(value):
            raise FileNotFoundError(value)  # pragma: no cover
        DocumentationHandler.mappings[key] = value

    @staticmethod
    def get_mappings():
        """
        Returns a copy of the mappings.

        :return: dictionary of mappings (shallow copy)
        """
        return copy.copy(DocumentationHandler.mappings)

    def __init__(self, request, client_address, server):
        """
        Regular constructor, an instance is created for each request,
        do not store any data for a longer time than a request.
        """
        BaseHTTPRequestHandler.__init__(self, request, client_address, server)

    def do_GET(self):
        """
        What to do in case of a GET request.
        """
        self.serve_content(urlparse(self.path), "GET")

    def do_POST(self):
        """
        What to do in case of a POST request.
        """
        # ``urlparse`` is the function imported at the top of the file,
        # not the Python 2 module: it must be called directly.
        self.serve_content(urlparse(self.path), "POST")

    def do_redirect(self, path="/index.html"):
        """
        Redirection when url is just the website.

        :param path: path to redirect to (a string)
        """
        self.send_response(301)
        self.send_header('Location', path)
        self.end_headers()

    @staticmethod
    def get_ftype(apath):
        """
        Defines the header to send (type of files) based on path.

        :param apath: location (a string)
        :return: htype, ftype (content type, reading mode), both are empty
            strings if the extension is unknown

        The extension is whatever follows the last dot and the lookup
        is not case sensitive.
        If a type is missing, you should look for the ``MIME TYPE``
        on a search engine. See also
        `media-types <http://www.iana.org/assignments/media-types/media-types.xhtml>`_.
        """
        # not os.path.splitext: callers also send bare extensions such as
        # ".html" which splitext considers as a name without extension
        ext = "." + apath.split(".")[-1].lower()
        return DocumentationHandler.media_types.get(ext, ('', ''))

    def send_headers(self, path):
        """
        Sends the ``Content-type`` header based on path and ends the headers.

        :param path: location (a string)
        :return: reading mode (``'r'``, ``'rb'``, ``'execute'`` or ``''``)

        ``text/plain`` is sent when the extension is unknown.
        """
        htype, ftype = self.get_ftype(path)
        self.send_header('Content-type', htype or 'text/plain')
        self.end_headers()
        return ftype

    def get_file_content(self, localpath, ftype, path=None):
        """
        Returns the content of a local file.

        :param localpath: local filename
        :param ftype: ``'r'`` or ``'execute'`` to read text (utf-8),
            anything else reads bytes
        :param path: if != None, the filename will be path/localpath
        :return: content (str or bytes), None if the file does not exist

        This function implements a simple cache mechanism.
        If the file is missing and its name contains ``_static/bootswatch``,
        the same name with ``bootstrap`` instead of ``bootswatch`` is tried.
        """
        tlocalpath = (localpath if path is None
                      else os.path.join(path, localpath))

        content = DocumentationHandler.get_from_cache(tlocalpath)
        if content is not None:
            self.LOG("serves cached", tlocalpath)
            return content

        if (not os.path.exists(tlocalpath) and
                "_static/bootswatch" in tlocalpath):
            access = tlocalpath.replace("bootswatch", "bootstrap")
        else:
            access = tlocalpath

        # isfile and not exists: a directory cannot be opened
        if not os.path.isfile(access):
            self.LOG("** w,unable to find: ", access)
            return None

        self.LOG("reading file ", access)
        if ftype in ("r", "execute"):
            with open(access, "r", encoding="utf8") as f:
                content = f.read()
        else:
            # the fallback location must be read, not the missing one
            with open(access, "rb") as f:
                content = f.read()
        DocumentationHandler.update_cache(tlocalpath, content)
        return content

    @staticmethod
    def get_from_cache(key):
        """
        Retrieves a file from the cache if it was cached,
        if the file was added more than a day ago
        (see ``cache_refresh``), it is removed and None is returned.

        :param key: key
        :return: content or None if not found or too old
        """
        content = DocumentationHandler.cache.get(key, None)
        if content is None:
            return None
        att = DocumentationHandler.cache_attributes[key]
        age = datetime.datetime.now() - att["date"]
        if age > DocumentationHandler.cache_refresh:
            del DocumentationHandler.cache[key]
            del DocumentationHandler.cache_attributes[key]
            return None
        att["nb"] += 1
        return content

    @staticmethod
    def update_cache(key, content):
        """
        Updates the cache.

        :param key: key
        :param content: content to place

        The cache is never cleaned here: it is shared by every
        session/user and removing entries would not be safe
        without adding some protection.
        """
        # this one first as a document existence is checked by using cache
        DocumentationHandler.cache_attributes[key] = {
            "nb": 1, "date": datetime.datetime.now()}
        DocumentationHandler.cache[key] = content

    @staticmethod
    def _print_cache(n=20):
        """
        Displays the most requested files (requested more than once).

        :param n: maximum number of files to display
        """
        al = [(v["nb"], k)
              for k, v in DocumentationHandler.cache_attributes.items()
              if v["nb"] > 1]
        for i, doc in enumerate(sorted(al, reverse=True)):
            if i >= n:
                break
            print("cache: {0} - {1}".format(*doc))

    @staticmethod
    def execute(localpath):
        """
        Locally executes a python script with the current interpreter.

        :param localpath: local python script
        :return: output, error (bytes)
        """
        exe = subprocess.Popen([sys.executable, localpath],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        out, error = exe.communicate()
        return out, error

    def feed(self, anys, script_python=False, params=None):
        """
        Displays something, it writes it on the response stream.

        :param anys: string or bytes, a string is encoded in utf-8
        :param script_python: if True, the function processes script sections
            (not implemented, it raises an exception)
        :param params: extra parameters when a script must be executed
            (should be a dictionary)
        :raises SystemError: if *script_python* is True and *anys* is bytes
        :raises NotImplementedError: if *script_python* is True and
            *anys* is a string

        A script section looks like:

        ::

            <script type="text/python">
            from pandas import DataFrame
            pars = [ { "key":k, "value":v } for k,v in params ]
            tbl = DataFrame (pars)
            print ( tbl.to_html(class_table="myclasstable") )
            </script>

        The server does not interpret Python, to do that, you need to use
        `pyrsslocal <http://www.xavierdupre.fr/app/pyrsslocal/helpsphinx/index.html>`_.
        """
        if isinstance(anys, bytes):
            if script_python:
                raise SystemError(  # pragma: no cover
                    "** w,unable to execute script from bytes")
            self.wfile.write(anys)
        else:
            if script_python:
                raise NotImplementedError(  # pragma: no cover
                    "unable to execute a python script")
            self.wfile.write(anys.encode("utf-8"))

    def shutdown(self):
        """
        Shuts down the service, not implemented in this class.
        """
        raise NotImplementedError()  # pragma: no cover

    @staticmethod
    def _parse_flag(text):
        """
        Converts a query string value into a boolean without evaluating it.

        :param text: value received from the url
        :return: True for ``True`` or ``1`` (not case sensitive),
            False for anything else
        """
        return str(text).strip().lower() in ("true", "1")

    def serve_content(self, cpath, method="GET"):
        """
        Tells what to do based on the path. The first component of the
        path is looked up in the mappings.
        If you type ``http://localhost:8080/root/file``,
        assuming ``root`` is mapped to a local folder,
        it will display this file.

        :param cpath: ParseResult
        :param method: GET or POST (not used by this implementation)

        An unknown key or a path containing ``..`` produces a 404 error.
        """
        if cpath.path in ("", "/"):
            self.serve_main_page()
            return

        params = parse_qs(cpath.query)
        params["__path__"] = cpath
        params["__url__"] = cpath

        spl = cpath.path.strip("/").split("/")
        project = spl[0]
        link = "/".join(spl[1:])

        value = DocumentationHandler.mappings.get(project, None)
        if value is None:
            self.LOG("can't serve", cpath)
            self.LOG("with params", params)
            # send_error and not send_response: the latter alone never
            # flushes the headers and the client would receive nothing
            self.send_error(404)
        elif value == "shut://":
            self.LOG("call shutdown")
            self.shutdown()
        elif value == "http://":
            self.send_response(200)
            self.send_headers("debug.html")
            url = cpath.path.replace("/%s/" % project, "")
            try:
                content = get_url_content(url)
            except Exception as e:  # pragma: no cover
                content = "<html><body>ERROR (2): %s</body></html>" % e
            self.feed(content, False, params={})
        elif ".." in link:
            # we avoid that case to prevent users from digging other paths
            # than the mapped ones, just in case the browser does not
            # remove them
            self.send_error(404, "Requested resource %s unavailable" % link)
        else:
            self._serve_local_file(project, value, link, params)

    def _serve_local_file(self, project, root, link, params):
        """
        Serves (or executes) a file which belongs to a mapped folder.

        :param project: key of the mapping
        :param root: local folder associated to the key
        :param link: path relative to *root*, ``index.html`` if empty
        :param params: parameters parsed from the url,
            ``execute`` and ``path`` are read
        """
        # we do not expect the documentation to point to the root
        # it must be relative paths
        localpath = link.lstrip("/") or "index.html"
        fullpath = os.path.join(root, localpath)
        self.LOG("localpath ", fullpath, os.path.isfile(fullpath))

        _, ftype = self.get_ftype(localpath)
        # the value comes from the url, it must never go through eval
        execute = self._parse_flag(params.get("execute", ["True"])[0])
        # NOTE(review): ``path`` also comes from the url and is joined in
        # front of the filename when the mapped folder is relative,
        # it is not checked against ``..`` -- confirm this is intended
        spath = params.get("path", [None])[0]
        unavailable = "Requested resource %s unavailable" % localpath

        if ftype == 'execute' and execute:
            self.LOG("execute file ", localpath)
            # NOTE(review): the script is looked up relative to the current
            # working directory (localpath), not in the mapped folder
            # (fullpath) -- kept as is, to be confirmed
            out, err = DocumentationHandler.execute(localpath)
            if err:
                self.send_error(404, unavailable)
                return
            self.send_response(200)
            self.send_headers(localpath)
            self.feed(out)
            return

        content = self.get_file_content(fullpath, ftype, spath)
        if content is None:
            self.LOG("** w,unable to get file for key:", spath)
            self.send_error(404, unavailable)
            return

        # the status is sent only once the content is known to exist
        self.send_response(200)
        ext = os.path.splitext(localpath)[-1].lower()
        if ext in DocumentationHandler._code_extensions:
            self.send_headers(".html")
            self.feed(
                DocumentationHandler.html_code_renderer(localpath, content))
        else:
            if ext == ".html":
                content = DocumentationHandler.process_html_path(
                    project, content)
            self.send_headers(localpath)
            self.feed(content)

    @staticmethod
    def process_html_path(project, content):
        """
        Processes a :epkg:`HTML` content, replaces path which are
        relative to the root and not the project.
        The current implementation returns the content unchanged.

        :param project: project, ex: ``pyquickhelper``
        :param content: page content
        :return: modified content
        """
        return content

    @staticmethod
    def _escape_html(text):
        """
        Escapes ``&``, ``<``, ``>`` so that *text* is displayed verbatim.
        """
        # & first, otherwise the entities produced below would be escaped
        return (text.replace("&", "&amp;")
                    .replace("<", "&lt;")
                    .replace(">", "&gt;"))

    @staticmethod
    def html_code_renderer(localpath, content):
        """
        Produces a html page which displays source code.

        :param localpath: local path to file (local or not), used as title
        :param content: content of the file (str or bytes,
            bytes are decoded as utf-8)
        :return: html string
        """
        if isinstance(content, bytes):
            # files without a registered media type are read as bytes
            content = content.decode("utf-8", errors="replace")
        escape = DocumentationHandler._escape_html
        res = [DocumentationHandler.html_header % escape(localpath),
               "<pre class=\"prettyprint\">",
               escape(content),
               "</pre>",
               DocumentationHandler.html_footer]
        return "\n".join(res)

    def serve_content_web(self, path, method, params):
        """
        Function to overload, this class never calls it.

        :param path: ParseResult
        :param method: GET or POST
        :param params: params parsed from the url + others
        """
        # NOTE(review): a 200 status followed by a 404 error looks
        # contradictory, kept as is because subclasses overload it
        self.send_response(200)
        self.send_headers("")
        self.feed("** w,unable to serve content for url: " +
                  path.geturl() + "\n" + str(params) + "\n")
        self.send_error(404)

    def serve_main_page(self):  # pragma: no cover
        """
        Displays all the mappings for the default path,
        keys starting with ``_`` are hidden.
        """
        rows = ["<html><body>"]
        rows.append("<h1>Documentation Server</h1>")
        rows.append("<ul>")
        for k in sorted(DocumentationHandler.mappings):
            if not k.startswith("_"):
                rows.append('<li><a href="{0}/">{0}</a></li>'.format(k))
        rows.append("</ul></body></html>")
        content = "\n".join(rows)

        self.send_response(200)
        self.send_headers(".html")
        self.feed(content)
class DocumentationThreadServer(Thread):
    """
    Thread which holds a web server and runs it until it is shut down.

    .. list-table::
        :widths: auto
        :header-rows: 1

        * - attribute
          - meaning
        * - server
          - the server this thread runs
    """

    def __init__(self, server):
        """
        :param server: server to run, the thread calls its methods
            ``serve_forever``, ``shutdown`` and ``server_close``
        """
        super().__init__()
        self.server = server

    def run(self):
        """
        Runs the server, this only returns once the server stops serving.
        """
        held = self.server
        held.serve_forever()

    def shutdown(self):
        """
        Shuts down the server: it first asks the server to stop serving,
        then closes it.
        """
        held = self.server
        held.shutdown()
        held.server_close()
def run_doc_server(server, mappings, thread=False, port=8079):
    """
    Runs the server.

    :param server: if None, it becomes
        ``HTTPServer(('localhost', port), DocumentationHandler)``,
        if it is a string, it is used as the host name instead of
        ``localhost``, otherwise it must be an ``HTTPServer``
    :param mappings: prefixes with local folders (dictionary),
        every item is registered with ``DocumentationHandler.add_mapping``
    :param thread: if True, the server is run in a thread
        and the function returns right away,
        otherwise, it runs the server
    :param port: port to use when the server is created by this function
    :return: server if thread is False, the thread otherwise
        (the thread is started)
    :raises TypeError: if *server* is none of None, a string,
        an ``HTTPServer``

    .. faqref::
        :title: How to run a local server which serves the documentation?

        The following code will create a local server:
        `http://localhost:8079/pyquickhelper/ <http://localhost:8079/pyquickhelper/>`_.

        ::

            this_fold = os.path.dirname(pyquickhelper.serverdoc.documentation_server.__file__)
            this_path = os.path.abspath( os.path.join( this_fold, "..", "..", "..", "dist", "html") )
            run_doc_server(None, mappings = { "pyquickhelper": this_path } )

        The same server can serve more than one project.
        More than one mapping can be sent.
    """
    for prefix, folder in mappings.items():
        DocumentationHandler.add_mapping(prefix, folder)

    if server is None or isinstance(server, str):
        # no server given: one is created on the requested host and port
        host = 'localhost' if server is None else server
        server = HTTPServer((host, port), DocumentationHandler)
    elif not isinstance(server, HTTPServer):
        raise TypeError(  # pragma: no cover
            "unexpected type for server: " + str(type(server)))

    if not thread:  # pragma: no cover
        # blocking mode: only returns once the server stops serving
        server.serve_forever()
        return server

    worker = DocumentationThreadServer(server)
    worker.start()
    return worker
if __name__ == '__main__':  # pragma: no cover
    # Script entry point: serves two documentation folders located
    # relative to this file, for example at
    # http://localhost:8079/pyquickhelper/
    run_server = True
    if run_server:
        here = os.path.abspath(os.path.dirname(__file__))
        doc_pyquickhelper = os.path.join(
            here, "..", "..", "..", "dist", "html")
        doc_teaching = os.path.join(
            here, "..", "..", "..", "..",
            "ensae_teaching_cs", "dist", "html3")

        # logs are printed on the standard output
        fLOG(OutputPrint=True)
        fLOG("running server")
        folders = {"pyquickhelper": doc_pyquickhelper,
                   "ensae_teaching_cs": doc_teaching}
        run_doc_server(None, mappings=folders)
        fLOG("end running server")