# -*- coding: utf-8 -*-
"""
Defines SQL commands to play with `sqlite3 <https://docs.python.org/3/library/sqlite3.html>`_.
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|7`
"""
import os
from IPython.core.magic import magics_class, line_magic
from IPython.core.magic import line_cell_magic
from pyquickhelper.ipythonhelper import MagicCommandParser, MagicClassWithHelpers
from .sql_interface import InterfaceSQL, InterfaceSQLException
[docs]@magics_class
class MagicSQL(MagicClassWithHelpers):
"""
Defines SQL commands to play with `sqlite3 <https://docs.python.org/3.4/library/sqlite3.html>`_
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|21`
"""
[docs] def get_connection(self, name):
"""
returns the connection stored in the workspace
:param name: variable name of the database
:return: object
:githublink:`%|py|29`
"""
if isinstance(name, str):
if self.shell is None:
raise Exception("No detected workspace.")
if name not in self.shell.user_ns:
raise KeyError(
"No opened sqlite3 database called: " + str(name))
res = self.shell.user_ns[name]
else:
res = name
if not isinstance(res, InterfaceSQL):
res = InterfaceSQL.create(res)
return res
[docs] @staticmethod
def SQL_connect_parser():
"""
defines the way to parse the magic command ``%SQL_connect``
:githublink:`%|py|49`
"""
parser = MagicCommandParser(prog="SQL_connect",
description='connect to a SQL database')
parser.add_argument('filename', type=str,
help='database filename', eval_type=str)
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_magic
def SQL_connect(self, line):
"""
define ``SQL_connect`` which connects to a SQL database,
it stores the database object in variable DB by default
.. nbref::
:tag: SQL
:title: SQL_connect_parser
The code for magic command ``%SQL_connect_parser`` is equivalent to::
from pyense.sql import InterfaceSQL
obj = InterfaceSQL.create(args.filename)
obj.connect()
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|78`
"""
parser = self.get_parser(MagicSQL.SQL_connect_parser, "SQL_connect")
args = self.get_args(line, parser)
if args is not None:
obj = InterfaceSQL.create(args.filename)
obj.connect()
self.shell.user_ns[args.variable] = obj
return obj
[docs] @staticmethod
def SQL_close_parser():
"""
defines the way to parse the magic command ``%SQL_close``
:githublink:`%|py|92`
"""
parser = MagicCommandParser(prog="SQL_close",
description='connect to a SQL database')
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object (and to close)')
return parser
[docs] @line_magic
def SQL_close(self, line=""):
"""
define ``SQL_close`` which closes a database
.. nbref::
:tag: SQL
:title: SQL_close
The code for magic command ``%SQL_close`` is equivalent to::
db.close()
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|116`
"""
parser = self.get_parser(MagicSQL.SQL_close_parser, "SQL_close")
args = self.get_args(line, parser)
if args is not None:
db = self.get_connection(args.variable)
r = db.close()
return r
[docs] @staticmethod
def SQL_tables_parser():
"""
defines the way to parse the magic command ``%SQL_tables``
:githublink:`%|py|129`
"""
parser = MagicCommandParser(prog="SQL_tables",
description='list the tables of a database')
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_magic
def SQL_tables(self, line=""):
"""
define ``%SQL_tables`` whichs lists the tables in a database
.. nbref::
:tag: SQL
:title: SQL_tables
The code for magic command ``%SQL_tables`` is equivalent to::
db.get_table_list()
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|153`
"""
parser = self.get_parser(MagicSQL.SQL_tables_parser, "SQL_tables")
args = self.get_args(line, parser)
if args is not None:
db = self.get_connection(args.variable)
return db.get_table_list()
[docs] @staticmethod
def SQL_drop_table_parser():
"""
defines the way to parse the magic command ``%SQL_drop_table``
:githublink:`%|py|165`
"""
parser = MagicCommandParser(prog="SQL_drop_table",
description='drop a table from a database')
parser.add_argument('table', type=str, help='table', eval_type=str)
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_magic
def SQL_drop_table(self, line):
"""
defines ``%SQL_drop_table`` which drops a table from a database
.. nbref::
:tag: SQL
:title: SQL_drop_table
The code for magic command ``%SQL_drop_table`` is equivalent to::
db.drop_table()
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|190`
"""
parser = self.get_parser(
MagicSQL.SQL_drop_table_parser, "SQL_drop_table")
args = self.get_args(line, parser)
if args is not None:
db = self.get_connection(args.variable)
return db.drop_table(args.table)
[docs] @staticmethod
def SQL_refresh_completion_parser():
"""
defines the way to parse the magic command ``%SQL_refresh_completion``
:githublink:`%|py|203`
"""
parser = MagicCommandParser(prog="SQL_refresh_completion",
description='refresh completion (tables names, ...)')
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_magic
def SQL_refresh_completion(self, line=""):
"""
defines ``%SQL_refresh_completion``
:githublink:`%|py|217`
"""
parser = self.get_parser(
MagicSQL.SQL_refresh_completion_parser, "SQL_refresh_completion")
args = self.get_args(line, parser)
if args is not None:
db = self.get_connection(args.variable)
db.refresh_completion()
[docs] @staticmethod
def SQL_schema_parser():
"""
defines the way to parse the magic command ``%SQL_schema``
:githublink:`%|py|230`
"""
parser = MagicCommandParser(prog="SQL_schema",
description='schema of a table')
parser.add_argument('table', type=str, help='table', eval_type=str)
parser.add_argument(
'--as_list', help='as a dictionary (False) or as a list (True)', action="store_true")
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_magic
def SQL_schema(self, line=""):
"""
define ``SQL_schema``
.. nbref::
:tag: SQL
:title: SQL_schema
The code for magic command ``%SQL_schema`` is equivalent to::
db.get_table_columns(<table>, as_dict=not <as_list>)
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|257`
"""
parser = self.get_parser(MagicSQL.SQL_schema_parser, "SQL_schema")
args = self.get_args(line, parser)
if args is not None:
db = self.get_connection(args.variable)
return db.get_table_columns(args.table, as_dict=not args.as_list)
[docs] @staticmethod
def SQL_import_tsv_parser():
"""
defines the way to parse the magic command ``%SQL_import_tsv``
:githublink:`%|py|269`
"""
parser = MagicCommandParser(prog="SQL_import_tsv",
description='import a tsv file into the database')
parser.add_argument('filename', type=str,
help='tsv file name', eval_type=str)
parser.add_argument('-t', '--table', type=str,
help='table name', default="-", eval_type=str)
parser.add_argument(
'--verbose', help='print progress', action="store_true")
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_magic
def SQL_import_tsv(self, line):
"""
defines ``%SQL_import_tsv`` whichs import a TSV file into a database
.. nbref::
:tag: SQL
:title: SQL_import_tsv
The code for magic command ``%SQL_import_tsv`` is equivalent to::
db.import_flat_file(<filename>, <table>)
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|299`
"""
parser = self.get_parser(
MagicSQL.SQL_import_tsv_parser, "SQL_import_tsv")
args = self.get_args(line, parser)
if args is not None:
if not os.path.exists(args.filename):
raise FileNotFoundError(args.filename)
db = self.get_connection(args.variable)
table = os.path.splitext(os.path.split(args.filename)[-1])[0] \
if len(args.table) == 0 or args.table == "-" else args.table
return db.import_flat_file(args.filename, table)
[docs] @staticmethod
def SQL_add_function_parser():
"""
defines the way to parse the magic command ``%SQL_add_function``
:githublink:`%|py|316`
"""
parser = MagicCommandParser(prog="SQL_add_function",
description='add a custom function to the database')
parser.add_argument('funct', type=str, help='function name')
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_magic
def SQL_add_function(self, line):
"""
defines ``%SQL_add_function`` which adds a function to the database
.. nbref::
:tag: SQL
:title: SQL_add_function
The code for magic command ``%SQL_add_function`` is equivalent to::
db.add_function(fu)
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|341`
"""
parser = self.get_parser(
MagicSQL.SQL_add_function_parser, "SQL_add_function")
args = self.get_args(line, parser)
if args is not None:
db = self.get_connection(args.variable)
if isinstance(args.funct, str):
if self.shell is not None:
if args.funct not in self.shell.user_ns:
raise KeyError(
"unable to find function %s in your workspace" %
args.funct)
fu = self.shell.user_ns[args.funct]
else:
raise Exception("unable to find IPython workspace")
else:
fu = args.funct
return db.add_function(fu)
[docs] @staticmethod
def SQL_import_df_parser():
"""
defines the way to parse the magic command ``%SQL_import_df``
:githublink:`%|py|365`
"""
parser = MagicCommandParser(prog="SQL_import_df",
description='import a dataframe into the database')
parser.add_argument('df', type=str, help='dataframe', no_eval=True)
parser.add_argument('-t', '--table', type=str,
help='table name', default="-", eval_type=str)
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_magic
def SQL_import_df(self, line):
"""
defines ``%SQL_import_df`` which imports a dataframe into a database
.. nbref::
:tag: SQL
:title: SQL_import_df
The code for magic command ``%SQL_import_df`` is equivalent to::
db.import_dataframe(<table>, <df>)
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|392`
"""
parser = self.get_parser(
MagicSQL.SQL_import_df_parser, "SQL_import_df")
args = self.get_args(line, parser)
if args is not None:
db = self.get_connection(args.variable)
df = args.df
if self.shell is not None:
if df not in self.shell.user_ns:
raise KeyError(
"unable to find dataframe %s in your workspace" %
df)
odf = self.shell.user_ns[df]
else:
raise Exception("unable to find IPython workspace")
table = df if len(
args.table) == 0 or args.table == "-" else args.table
return db.import_dataframe(table, odf)
[docs] @staticmethod
def SQL_parser():
"""
defines the way to parse the magic command ``%%SQL``
:githublink:`%|py|418`
"""
parser = MagicCommandParser(prog="SQL",
description='query the database')
parser.add_argument(
'--df', type=str, help='output dataframe', default="temp_view", no_eval=True)
parser.add_argument('-n', '--n', type=int,
help='number of first lines to display', default=10, eval_type=int)
parser.add_argument('-q', '--query', type=str,
help='when used in a single line (no cell), query is the SQL query, the command ' +
'returns the full dataframe', default="", eval_type=str)
parser.add_argument(
'-v',
'--variable',
default="DB",
help='variable name used to store the database object')
return parser
[docs] @line_cell_magic
def SQL(self, line, cell=None):
"""
defines command ``%%SQL``
.. nbref::
:tag: SQL
:title: SQL
The code for magic command ``%%SQL`` is equivalent to::
<variable> = db.execute(<cell>)
See notebook :ref:`pyensaesqlmagicrst`.
:githublink:`%|py|449`
"""
parser = self.get_parser(MagicSQL.SQL_parser, "SQL")
args = self.get_args(line, parser)
if args is not None:
full = False
if cell is None or len(cell) == 0:
cell = args.query
if cell is None or len(cell) == 0:
raise ValueError("no SQL query is defined")
query = cell
full = True
else:
query = cell
query = query.strip()
if len(query) > 0 and query[0] == '"' and query[-1] == '"':
query = query[1:-1]
db = self.get_connection(args.variable)
try:
df = db.execute(query)
ok = True
except InterfaceSQLException as e:
print(str(e))
ok = False
if ok:
self.shell.user_ns[args.df] = df
if full:
return df
else:
return df.head(n=args.n)
[docs]def register_sql_magics(ip=None):
"""
register magics function, can be called from a notebook
:param ip: from ``get_ipython()``
:githublink:`%|py|489`
"""
if ip is None:
from IPython import get_ipython
ip = get_ipython()
ip.register_magics(MagicSQL)