# pylint: disable=too-many-lines
"""
Various helpers.
"""
import collections
import contextlib
import errno
import functools
import httplib
import json
import os
import pipes
import re
import shlex
import subprocess
import sys
import threading
import time
import urllib2
import warnings
import bs4
import urlnorm
import jinja2
import requests as original_requests
# Don't know why pylint reports "Relative import 'ruamel.yaml', should be 'gluetool.ruamel.yaml'" :(
# pylint: disable=relative-import
import ruamel.yaml
from gluetool import GlueError, SoftGlueError, GlueCommandError
from gluetool.log import Logging, ContextAdapter, log_blob, BlobLogger, format_dict, log_dict, print_wrapper, \
PackageAdapter
try:
# pylint: disable=ungrouped-imports
from subprocess import DEVNULL
except ImportError:
DEVNULL = open(os.devnull, 'wb')
[docs]def deprecated(func):
"""
This is a decorator which can be used to mark functions as deprecated. It will result in a warning being emitted
when the function is used.
"""
@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.simplefilter('always', DeprecationWarning) # turn off filter
warnings.warn('Function {} is deprecated.'.format(func.__name__), category=DeprecationWarning,
stacklevel=2)
warnings.simplefilter('default', DeprecationWarning) # reset filter
return func(*args, **kwargs)
return new_func
[docs]def dict_update(dst, *args):
"""
Python's ``dict.update`` does not return the dictionary just updated but a ``None``. This function
is a helper that does updates the dictionary *and* returns it. So, instead of:
.. code-block:: python
d.update(other)
return d
you can use:
.. code-block:: python
return dict_update(d, other)
:param dict dst: dictionary to be updated.
:param args: dictionaries to update ``dst`` with.
"""
for other in args:
assert isinstance(other, dict)
dst.update(other)
return dst
[docs]def normalize_bool_option(option_value):
"""
Convert option value to Python's boolean.
``option_value`` is what all those internal option processing return,
which may be a default value set for an option, or what user passed in.
As switches, options with values can be used:
.. code-block:: bash
--foo=yes|no
--foo=true|false
--foo=1|0
--foo=Y|N
--foo=on|off
With combination of ``store_true``/``store_false`` and a default value module developer
sets for the option, simple form without value is evaluated as easily. With ``store_true``
and ``False`` default, following option turn the feature `foo` on:
.. code-block:: bash
--enable-foo
With ``store_false`` and ``True`` default, following simple option turn the feature `foo` off:
.. code-block:: bash
--disable-foo
"""
if str(option_value).strip().lower() in ('yes', 'true', '1', 'y', 'on'):
return True
return False
[docs]def normalize_multistring_option(option_value, separator=','):
"""
Reduce string, representing comma-separated list of items, or possibly a list of such strings,
to a simple list of items. Strips away the whitespace wrapping such items.
.. code-block:: bash
foo --option value1 --option value2, value3
foo --option value1,value2,value3
Or, when option is set by a config file:
.. code-block:: bash
option = value1
option = value1, value2, \
value3
After processing, different variants can be found when ``option('option')`` is called,
``['value1', 'value2,value3']``, ``['value1,value2,value3']``, ``'value1'`` and ``value1, value2, value3``.
To reduce the necessary work, use this helper function to treat such option's value,
and get simple ``['value1', 'value2', 'value3']`` structure.
"""
if option_value is None:
return []
# If the value is string, convert it to list - it comes from a config file,
# command-line parsing always produces a list. This reduces config file values
# to the same structure command-line produces.
values = [option_value] if isinstance(option_value, str) else option_value
# Now deal with possibly multiple paths, separated by comma and some white space, inside
# every item of the list. Split the paths in the item by the given separator, strip the
# white space, and concatenate these lists (one for each item in the main `values` list)
# using sum - with an empty list as a start, it works for lists just as nicely.
return sum([
[value.strip() for value in item.split(separator)] for item in values
], [])
[docs]def normalize_shell_option(option_value):
"""
Reduce string, using a shell-like syntax, or possibly a list of such strings,
to a simple list of items. Strips away the whitespace wrapping such items.
.. code-block:: bash
foo --option value1 --option value2\\ value3 --option "value4 value5"
Or, when option is set by a config file:
.. code-block:: bash
option = value1 value2\\ value3 "value4 value5"
After processing, different variants can be found when ``option('option')`` is called,
``['value1', 'value2,value3']``, ``['value1,value2,value3']``, ``'value1'`` and ``value1, value2, value3``.
To reduce the necessary work, use this helper function to treat such option's value,
and get simple ``['value1', 'value2 value3', 'value4 value5']`` structure.
"""
if not option_value:
return []
# If the value is string, convert it to list - it comes from a config file,
# command-line parsing always produces a list. This reduces config file values
# to the same structure command-line produces.
values = [option_value] if isinstance(option_value, str) else option_value
# Now split each item using shlex, and merge these lists into a single one.
return sum([
shlex.split(value) for value in values
], [])
[docs]def normalize_path(path):
"""
Apply common treatments on a given path:
* replace home directory reference (``~`` and similar), and
* convert ``path`` to a normalized absolutized version of the pathname.
"""
return os.path.abspath(os.path.expanduser(path))
[docs]def normalize_path_option(option_value, separator=','):
"""
Reduce many ways how list of paths is specified by user, to a simple list of paths. See
:py:func:`normalize_multistring_option` for more details.
"""
return [normalize_path(path) for path in normalize_multistring_option(option_value, separator=separator)]
[docs]class IncompatibleOptionsError(SoftGlueError):
pass
[docs]class Bunch(object):
# pylint: disable=too-few-public-methods
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
[docs]class ThreadAdapter(ContextAdapter):
"""
Custom logger adapter, adding thread name as a context.
:param gluetool.log.ContextAdapter logger: parent logger whose methods will be used for logging.
:param threading.Thread thread: thread whose name will be added.
"""
def __init__(self, logger, thread):
super(ThreadAdapter, self).__init__(logger, {'ctx_thread_name': (5, thread.name)})
[docs]class WorkerThread(threading.Thread):
"""
Worker threads gets a job to do, and returns a result. It gets a callable, ``fn``,
which will be called in thread's ``run()`` method, and thread's ``result`` property
will be the result - value returned by ``fn``, or exception raised during the
runtime of ``fn``.
:param gluetool.log.ContextAdapter logger: logger to use for logging.
:param fn: thread will start `fn` to do the job.
:param fn_args: arguments for `fn`
:param fn_kwargs: keyword arguments for `fn`
"""
def __init__(self, logger, fn, fn_args=None, fn_kwargs=None, **kwargs):
threading.Thread.__init__(self, **kwargs)
self.logger = ThreadAdapter(logger, self)
self.logger.connect(self)
self._fn = fn
self._args = fn_args or ()
self._kwargs = fn_kwargs or {}
self.result = None
[docs] def run(self):
self.debug('worker thread started')
try:
self.result = self._fn(*self._args, **self._kwargs)
# pylint: disable=broad-except
except Exception as e:
self.exception('exception raised in worker thread: {}'.format(str(e)))
self.result = e
finally:
self.debug('worker thread finished')
[docs]class StreamReader(object):
def __init__(self, stream, name=None, block=16):
"""
Wrap blocking ``stream`` with a reading thread. The threads read from
the (normal, blocking) `stream` and adds bits and pieces into the `queue`.
``StreamReader`` user then can check the `queue` for new data.
"""
self._stream = stream
self._name = name or stream.name
# List would fine as well, however deque is better optimized for
# FIFO operations, and it provides the same thread safety.
self._queue = collections.deque()
self._content = []
def _enqueue():
"""
Read what's available in stream and add it into the queue
"""
while True:
data = self._stream.read(block)
if not data:
# signal EOF
self._queue.append('')
return
self._queue.append(data)
self._content.append(data)
self._thread = threading.Thread(target=_enqueue)
self._thread.daemon = True
self._thread.start()
@property
def name(self):
return self._name
@property
def content(self):
return ''.join(self._content)
[docs] def wait(self):
self._thread.join()
[docs] def read(self):
try:
return self._queue.popleft()
except IndexError:
return None
[docs]class ProcessOutput(object):
"""
Result of external process.
"""
# pylint: disable=too-many-arguments,too-few-public-methods
def __init__(self, cmd, exit_code, stdout, stderr, kwargs):
self.cmd = cmd
self.kwargs = kwargs
self.exit_code = exit_code
self.stdout = stdout
self.stderr = stderr
[docs] def log_stream(self, stream, logger):
content = getattr(self, stream)
if content is None:
if stream in self.kwargs:
logger.debug('{}:\n command produced no output'.format(stream))
else:
logger.debug('{}:\n command forwarded the output to its parent'.format(stream))
else:
log_blob(logger.verbose, stream, content)
[docs] def log(self, logger):
logger.debug('command exited with code {}'.format(self.exit_code))
self.log_stream('stdout', logger)
self.log_stream('stderr', logger)
[docs]class Command(object):
"""
Wrap an external command, its options and other information, necessary for running the command.
The main purpose is to gather all relevant pieces into a single space, call :py:class:`subprocess.Popen`,
and log everything.
By default, both standard output and error output are of the process are captured and returned back to
caller. Under some conditions, caller might want to see the output in "real-time". For that purpose,
they can pass callable via ``inspect_callback`` parameter - such callable will be called for every received
bit of input on both standard and error outputs. E.g.
.. code-block:: python
def foo(stream, s, flush=False):
if s is not None and 'a' in s:
print s
Command(['/bin/foo']).run(inspect=foo)
This example will print all substrings containing letter `a`. Strings passed to ``foo`` may be of arbitrary
lengths, and may change between subsequent use of ``Command`` class.
:param list executable: Executable to run. Feel free to use the whole command, including its options,
if you have no intention to modify them before running the command.
:param list options: If set, it's a list of options to pass to the ``executable``. Options are
specified in a separate list to allow modifications of ``executable`` and ``options``
before actually running the command.
:param gluetool.log.ContextAdapter logger: Parent logger whose methods will be used for logging.
.. versionadded:: 1.1
"""
# pylint: disable=too-few-public-methods
def __init__(self, executable, options=None, logger=None):
self.logger = logger or ContextAdapter(Logging.get_logger())
self.logger.connect(self)
self.executable = executable
self.options = options or []
self.use_shell = False
self.quote_args = False
self._command = None
self._popen_kwargs = None
self._process = None
self._exit_code = None
self._stdout = None
self._stderr = None
[docs] def _apply_quotes(self):
"""
Return options to pass to ``Popen``. Applies quotes as necessary.
"""
if not self.quote_args:
return self.executable + self.options
# escape apostrophes in strings and adds them around strings with space
return [('"{}"'.format(option.replace('"', r'\"')) if ' ' in option and not
(
(option.startswith('"') and option.endswith('"')) or
(option.startswith("'") and option.endswith("'")))
else option) for option in self.executable + self.options]
[docs] def _communicate_batch(self):
self._stdout, self._stderr = self._process.communicate()
[docs] def _communicate_inspect(self, inspect_callback):
# let's capture *both* streams - capturing just a single one leads to so many ifs
# and elses and messy code
p_stdout = StreamReader(self._process.stdout, name='<stdout>')
p_stderr = StreamReader(self._process.stderr, name='<stderr>')
if inspect_callback is None:
def stdout_write(stream, data, flush=False):
# pylint: disable=unused-argument
if data is None:
return
# Not suitable for multiple simultaneous commands. Shuffled output will
# ruin your day. And night. And few following weeks, full of debugging, as well.
sys.stdout.write(data)
sys.stdout.flush()
inspect_callback = stdout_write
inputs = (p_stdout, p_stderr)
with BlobLogger('Output of command: {}'.format(format_command_line([self._command])),
outro='End of command output',
writer=self.info):
self.debug("output of command is inspected by the caller")
self.debug('following blob-like header and footer are expected to be empty')
self.debug('the captured output will follow them')
# As long as process runs, keep calling callbacks with incoming data
while True:
for stream in inputs:
inspect_callback(stream, stream.read())
if self._process.poll() is not None:
break
# give up OS' attention and let others run
# time.sleep(0) is a Python synonym for "thread yields the rest of its quantum"
time.sleep(0.1)
# OK, process finished but we have to wait for our readers to finish as well
p_stdout.wait()
p_stderr.wait()
for stream in inputs:
while True:
data = stream.read()
if data in ('', None):
break
inspect_callback(stream, data)
inspect_callback(stream, None, flush=True)
self._stdout, self._stderr = p_stdout.content, p_stderr.content
[docs] def _construct_output(self):
output = ProcessOutput(self._command, self._exit_code, self._stdout, self._stderr, self._popen_kwargs)
output.log(self.logger)
return output
[docs] def run(self, inspect=False, inspect_callback=None, **kwargs):
"""
Run the command, wait for it to finish and return the output.
:param bool inspect: If set, ``inspect_callback`` will receive the output of command in "real-time".
:param callable inspect_callback: callable that will receive command output. If not set, default
"write to ``sys.stdout``" is used.
:rtype: gluetool.utils.ProcessOutput instance
:returns: :py:class:`gluetool.utils.ProcessOutput` instance whose attributes contain data returned
by the child process.
:raises gluetool.glue.GlueError: When somethign went wrong.
:raises gluetool.glue.GlueCommandError: When command exited with non-zero exit code.
"""
# pylint: disable=too-many-branches
def _check_types(items):
if not isinstance(items, list):
raise GlueError('Only list of strings is accepted')
if not all((isinstance(s, str) for s in items)):
raise GlueError('Only list of strings is accepted, {} found'.format([type(s) for s in items]))
_check_types(self.executable)
_check_types(self.options)
self._command = self._apply_quotes()
if self.use_shell is True:
self._command = [' '.join(self._command)]
# Set default stdout/stderr, unless told otherwise
if 'stdout' not in kwargs:
kwargs['stdout'] = subprocess.PIPE
if 'stderr' not in kwargs:
kwargs['stderr'] = subprocess.PIPE
if self.use_shell:
kwargs['shell'] = True
self._popen_kwargs = kwargs
def _format_stream(stream):
if stream == subprocess.PIPE:
return 'PIPE'
if stream == DEVNULL:
return 'DEVNULL'
if stream == subprocess.STDOUT:
return 'STDOUT'
return stream
printable_kwargs = kwargs.copy()
for stream in ('stdout', 'stderr'):
if stream in printable_kwargs:
printable_kwargs[stream] = _format_stream(printable_kwargs[stream])
log_dict(self.debug, 'command', self._command)
log_dict(self.debug, 'kwargs', printable_kwargs)
log_blob(self.debug, 'runnable (copy & paste)', format_command_line([self._command]))
try:
self._process = subprocess.Popen(self._command, **self._popen_kwargs)
if inspect is True:
self._communicate_inspect(inspect_callback)
else:
self._communicate_batch()
except OSError as e:
if e.errno == errno.ENOENT:
raise GlueError("Command '{}' not found".format(self._command[0]))
raise e
self._exit_code = self._process.poll()
output = self._construct_output()
if self._exit_code != 0:
raise GlueCommandError(self._command, output)
return output
@deprecated
[docs]def run_command(cmd, logger=None, **kwargs):
"""
Wrapper for ``Command(...).run().
Provided for backward compatibility.
.. deprecated:: 1.1
Use :py:class:`gluetool.utils.Command` instead.
"""
return Command(cmd, logger=logger).run(**kwargs)
[docs]def check_for_commands(cmds):
""" Checks if all commands in list cmds are valid """
for cmd in cmds:
try:
Command(['/bin/bash', '-c', 'command -v {}'.format(cmd)]).run(stdout=DEVNULL)
except GlueError:
raise GlueError("Command '{}' not found on the system".format(cmd))
[docs]class cached_property(object):
# pylint: disable=invalid-name,too-few-public-methods
"""
``property``-like decorator - at first access, it calls decorated
method to acquire the real value, and then replaces itself with
this value, making it effectively "cached". Useful for properties
whose value does not change over time, and where getting the real
value could penalize execution with unnecessary (network, memory)
overhead.
Delete attribute to clear the cached value - on next access, decorated
method will be called again, to acquire the real value.
Of possible options, only read-only instance attribute access is
supported so far.
"""
def __init__(self, method):
self._method = method
self.__doc__ = getattr(method, '__doc__')
def __get__(self, obj, cls):
# does not support class attribute access, only instance
assert obj is not None
# get the real value of this property
value = self._method(obj)
# replace cached_property instance with the value
obj.__dict__[self._method.__name__] = value
return value
@deprecated
[docs]def fetch_url(url, logger=None, success_codes=(200,)):
"""
"Get me content of this URL" helper.
Very thin wrapper around urllib. Added value is logging, and converting
possible errors to :py:class:`gluetool.glue.GlueError` exception.
:param str url: URL to get.
:param gluetool.log.ContextLogger logger: Logger used for logging.
:param tuple success_codes: tuple of HTTP response codes representing successfull request.
:returns: tuple ``(response, content)`` where ``response`` is what :py:func:`urllib2.urlopen`
returns, and ``content`` is the payload of the response.
"""
logger = logger or Logging.get_logger()
logger.debug("opening URL '{}'".format(url))
try:
response = urllib2.urlopen(url)
code, content = response.getcode(), response.read()
except urllib2.HTTPError as exc:
logger.exception(exc)
raise GlueError("Failed to fetch URL '{}': {}".format(url, exc.message))
log_blob(logger.debug, '{}: {}'.format(url, code), content)
if code not in success_codes:
raise GlueError("Unsuccessfull response from '{}'".format(url))
return response, content
@contextlib.contextmanager
[docs]def requests(logger=None):
"""
Wrap :py:mod:`requests` with few layers providing us with the logging and better insight into
what has been happening when ``requests`` did their job.
Used as a context manager, yields a patched ``requests`` module. As long as inside the context,
detailed information about HTTP traffic are logged via given logger.
.. note::
The original ``requests`` library is returned, with slight modifications for better integration
with ``gluetool`` logging facilities. Each and every ``requests`` API feature is available
and , hopefully, enhancements applied by this wrapper wouldn't interact with ``requests``
functionality.
.. code-block:: python
with gluetool.utils.requests() as R:
R.get(...).json()
...
r = R.post(...)
assert r.code == 404
...
:param logger: used for logging.
:returns: :py:mod:`requests` module.
"""
# Enable httplib debugging. It's being used underneath ``requests`` and ``urllib3``,
# but it's stupid - uses "print" instead of a logger, therefore we have to capture it
# and disable debug logging when leaving the context.
logger = logger or Logging.get_logger()
httplib_logger = PackageAdapter(logger, 'httplib')
httplib.HTTPConnection.debuglevel = 1
# Start capturing ``print`` statements - they are used to provide debug messages, therefore
# using ``debug`` level.
with print_wrapper(log_fn=httplib_logger.debug):
# To log responses and their content, we must take a look at ``Response`` instance
# returned by several entry methods (``get``, ``post``, ...). To do that, we have
# a simple wrapper function.
# ``original_method`` is the actual ``requests.foo`` (``get``, ``post``, ...), wrapper
# calls it to do the job, and logs response when it's done.
def _verbose_request(original_method, *args, **kwargs):
ret = original_method(*args, **kwargs)
log_blob(logger.debug, 'response content', ret.text)
return ret
# gather the original methods...
methods = {
method_name: getattr(original_requests, method_name)
for method_name in ('head', 'get', 'post', 'put', 'patch', 'delete')
}
# ... and replace them with our wrapper, giving it the original method as the first argument
for method_name, original_method in methods.iteritems():
setattr(original_requests, method_name, functools.partial(_verbose_request, original_method))
try:
yield original_requests
finally:
# put original methods back...
for method_name, original_method in methods.iteritems():
setattr(original_requests, method_name, original_method)
# ... and disable httplib debugging
httplib.HTTPConnection.debuglevel = 0
[docs]def treat_url(url, logger=None):
"""
Remove "weird" artifacts from the given URL. Collapse adjacent '.'s, apply '..', etc.
:param str url: URL to clear.
:param gluetool.log.ContextAdapter logger: logger to use for logging.
:rtype: str
:returns: Treated URL.
"""
logger = logger or Logging.get_logger()
logger.debug("treating a URL '{}'".format(url))
try:
url = str(urlnorm.norm(url))
except urlnorm.InvalidUrl as exc:
# urlnorm cannot handle localhost: https://github.com/jehiah/urlnorm/issues/3
if exc.message == "host u'localhost' is not valid":
pass
else:
raise exc
return url.strip()
[docs]def render_template(template, logger=None, **kwargs):
"""
Render Jinja2 template. Logs errors, and raises an exception when it's not possible
to correctly render the template.
:param template: Template to render. It can be either :py:class:`jinja2.environment.Template` instance,
or a string.
:param dict kwargs: Keyword arguments passed to render process.
:rtype: str
:returns: Rendered template.
:raises gluetool.glue.GlueError: when the rednering failed.
"""
logger = logger or Logging.get_logger()
try:
def _render(template):
log_blob(logger.debug, 'rendering template', template.source)
log_dict(logger.verbose, 'context', kwargs)
return str(template.render(**kwargs).strip())
if isinstance(template, (str, unicode)):
source, template = template, jinja2.Template(template)
template.source = source
return _render(template)
if isinstance(template, jinja2.environment.Template):
if template.filename != '<template>':
with open(template.filename, 'r') as f:
template.source = f.read()
else:
template.source = '<unknown template source>'
return _render(template)
raise GlueError('Unhandled template type {}'.format(type(template)))
except Exception as exc:
raise GlueError('Cannot render template: {}'.format(exc))
# pylint: disable=invalid-name
[docs]def YAML():
"""
Provides YAML read/write interface with common settings.
:rtype: ruamel.yaml.YAML
"""
yaml = ruamel.yaml.YAML()
yaml.indent(sequence=4, mapping=4, offset=2)
return yaml
[docs]def from_yaml(yaml_string):
"""
Convert YAML in a string into Python data structures.
Uses internal YAML parser to produce result. Paired with :py:func:`load_yaml` and their
JSON siblings to provide unified access to JSON and YAML.
"""
return YAML().load(yaml_string)
[docs]def load_yaml(filepath, logger=None):
"""
Load data stored in YAML file, and return their Python representation.
:param str filepath: Path to a file. ``~`` or ``~<username>`` are expanded before using.
:param gluetool.log.ContextLogger logger: Logger used for logging.
:rtype: object
:returns: structures representing data in the file.
:raises gluetool.glue.GlueError: if it was not possible to successfully load content of the file.
"""
if not filepath:
raise GlueError('File path is not valid: {}'.format(filepath))
logger = logger or Logging.get_logger()
real_filepath = normalize_path(filepath)
logger.debug("attempt to load YAML from '{}' (maps to '{}')".format(filepath, real_filepath))
if not os.path.exists(real_filepath):
raise GlueError("File '{}' does not exist".format(filepath))
try:
with open(real_filepath, 'r') as f:
data = YAML().load(f)
logger.debug("loaded YAML data from '{}':\n{}".format(filepath, format_dict(data)))
return data
except ruamel.yaml.YAMLError as e:
raise GlueError("Unable to load YAML file '{}': {}".format(filepath, str(e)))
[docs]def dump_yaml(data, filepath, logger=None):
"""
Save data stored in variable to YAML file.
:param object data: Data to store in YAML file
:param str filepath: Path to an output file.
:raises gluetool.glue.GlueError: if it was not possible to successfully save data to file.
"""
if not filepath:
raise GlueError("File path is not valid: '{}'".format(filepath))
logger = logger or Logging.get_logger()
real_filepath = normalize_path(filepath)
dirpath = os.path.dirname(real_filepath)
if not os.path.exists(dirpath):
raise GlueError("Cannot save file in nonexistent directory '{}'".format(dirpath))
try:
with open(real_filepath, 'w') as f:
YAML().dump(data, f)
f.flush()
except ruamel.yaml.YAMLError as e:
raise GlueError("Unable to save YAML file '{}': {}".format(filepath, str(e)))
[docs]def _json_byteify(data, ignore_dicts=False):
# if this is a unicode string, return its string representation
if isinstance(data, unicode):
return data.encode('utf-8')
# if this is a list of values, return list of byteified values
if isinstance(data, list):
return [_json_byteify(item, ignore_dicts=True) for item in data]
# if this is a dictionary, return dictionary of byteified keys and values
# but only if we haven't already byteified it
if isinstance(data, dict) and not ignore_dicts:
return {
_json_byteify(key, ignore_dicts=True): _json_byteify(value, ignore_dicts=True)
for key, value in data.iteritems()
}
# if it's anything else, return it in its original form
return data
[docs]def from_json(json_string):
"""
Convert JSON in a string into Python data structures.
Similar to :py:func:`json.loads` but uses special object hook to avoid unicode strings
in the output..
"""
return _json_byteify(json.loads(json_string, object_hook=_json_byteify), ignore_dicts=True)
[docs]def load_json(filepath, logger=None):
"""
Load data stored in JSON file, and return their Python representation.
:param str filepath: Path to a file. ``~`` or ``~<username>`` are expanded before using.
:param gluetool.log.ContextLogger logger: Logger used for logging.
:rtype: object
:returns: structures representing data in the file.
:raises gluetool.glue.GlueError: if it was not possible to successfully load content of the file.
"""
if not filepath:
raise GlueError('File path is not valid: {}'.format(filepath))
logger = logger or Logging.get_logger()
real_filepath = normalize_path(filepath)
logger.debug("attempt to load JSON from '{}' (maps to '{}')".format(filepath, real_filepath))
if not os.path.exists(real_filepath):
raise GlueError("File '{}' does not exist".format(filepath))
try:
with open(real_filepath, 'r') as f:
data = _json_byteify(json.load(f, object_hook=_json_byteify), ignore_dicts=True)
logger.debug("loaded JSON data from '{}':\n{}".format(filepath, format_dict(data)))
return data
except Exception as exc:
raise GlueError("Unable to load JSON file '{}': {}".format(filepath, str(exc)))
[docs]def _load_yaml_variables(data, enabled=True, logger=None):
"""
Load all variables from files referenced by a YAML, and return function to render a string
as a template using these variables. The files containing variables are mentioned in comments,
in a form ``# !include <filepath>`` form.
:param data: data loaded from a YAML file.
:param bool enabled: when set to ``False``, variables are not loaded and a simple no-op
function is returned.
:param gluetool.log.ContextLogger logger: Logger used for logging.
:returns: Function accepting a string and returning a rendered template.
"""
logger = logger or Logging.get_logger()
def _render_template_nop(s):
return s
if not enabled:
return _render_template_nop
# Our YAML reader preserves comments, they are accessible via `ca` attribute of the
# YAML data structure (which behaves like a dict or list, but it has additional
# powers).
if not hasattr(data, 'ca') or not hasattr(data.ca, 'comment') or len(data.ca.comment) <= 1:
logger.debug('when looking for !import directives, no comments found in YAML data')
return _render_template_nop
# Ok, so this YAML data contains comments. Check their values to find `!include` directives.
# Load referenced files and merged them into a single context.
context = {}
for comment in data.ca.comment[1]:
value = comment.value.strip()
if not value.startswith('# !include'):
continue
try:
variables_map_path = shlex.split(value[2:])[1]
except Exception as exc:
raise GlueError("Cannot extract filename to include from '{}': {}".format(value, exc))
logger.debug("loading variables from '{}'".format(variables_map_path))
context.update(load_yaml(variables_map_path, logger=logger))
def _render_template(s):
if isinstance(s, str):
return render_template(s, logger=logger, **context)
if isinstance(s, list):
return [
render_template(t, logger=logger, **context)
for t in s
]
raise GlueError("Don't know how to render object of type {}".format(type(s)))
return _render_template
[docs]class SimplePatternMap(object):
# pylint: disable=too-few-public-methods
"""
`Pattern map` is a list of ``<pattern>``: ``<result>`` pairs. ``Pattern`` is a
regular expression used to match a string, ``result`` is what the matching
string maps to.
Basically an ordered dictionary with regexp matching of keys, backed by an YAML file.
:param str filepath: Path to a YAML file with map definition.
:param gluetool.log.ContextLogger logger: Logger used for logging.
:param bool allow_variables: if set, both patterns and converters are first treated as templates,
and as such are rendered before doing anything else. Map may contain special comments,
``# !include <path>``, where path refers to a YAML file providing the necessary variables.
"""
def __init__(self, filepath, logger=None, allow_variables=False):
self.logger = logger or Logging.get_logger()
logger.connect(self)
pattern_map = load_yaml(filepath, logger=self.logger)
if pattern_map is None:
raise GlueError("pattern map '{}' does not contain any patterns".format(filepath))
_render_template = _load_yaml_variables(pattern_map, enabled=allow_variables, logger=self.logger)
self._compiled_map = []
for pattern_dict in pattern_map:
if not isinstance(pattern_dict, dict):
raise GlueError("Invalid format: '- <pattern>: <result>' expected, '{}' found".format(pattern_dict))
pattern = pattern_dict.keys()[0]
result = pattern_dict[pattern].strip()
# Apply variables if requested.
pattern = _render_template(pattern)
result = _render_template(result)
log_dict(logger.debug, "rendered mapping '{}'".format(pattern), result)
try:
pattern = re.compile(pattern)
except re.error as exc:
raise GlueError("Pattern '{}' is not valid: {}".format(pattern, str(exc)))
self._compiled_map.append((pattern, result))
[docs] def match(self, s):
"""
Try to match ``s`` by the map. If the match is found - the first one wins - then its
transformation is applied to the ``s``.
:rtype: str
:returns: if matched, output of the corresponding transformation.
"""
self.debug("trying to match string '{}' with patterns in the map".format(s))
for pattern, result in self._compiled_map:
self.debug("testing pattern '{}'".format(pattern.pattern))
match = pattern.match(s)
if match is None:
continue
self.debug(' matched!')
return result
raise GlueError("Could not match string '{}' with any pattern".format(s))
[docs]class PatternMap(object):
# pylint: disable=too-few-public-methods
"""
`Pattern map` is a list of ``<pattern>``: ``<converter>`` pairs. ``Pattern`` is a
regular expression used to match a string, ``converter`` is a function that transforms
a string into another one, accepting the pattern and the string as arguments.
It is defined in a YAML file:
.. code-block:: yaml
---
- 'foo-(\\d+)': 'bar-\\1'
- 'baz-(\\d+)': 'baz, find_the_most_recent, append_dot'
- 'bar-(\\d+)':
- 'bar, find_the_most_recent, append_dot'
- 'bar, find_the_oldest, append_dot'
Patterns are the keys in each pair, while ``converter`` is a string (or list of strings),
consisting of multiple items, separated by comma. The first item is **always** a string,
let's call it ``R``. ``R``, given input string ``S1`` and the pattern, is used to transform
``S1`` to a new string, ``S2``, by calling ``pattern.sub(R, S1)``. ``R`` can make use of anything
:py:meth:`re.sub` supports, including capturing groups.
If there are other items in the ``converter`` string, they are names of `spices`, additional
functions that will be called with ``pattern`` and the output of the previous spicing function,
starting with ``S2`` in the case of the first `spice`.
To allow spicing, user of ``PatternMap`` class must provide `spice makers` - mapping between
`spice` names and functions that generate spicing functions. E.g.:
.. code-block:: python
def create_spice_append_dot(previous_spice):
def _spice(pattern, s):
s = previous_spice(pattern, s)
return s + '.'
return _spice
``create_spice_append_dot`` is a `spice maker`, used during creation of a pattern map after
its definition is read, ``_spice`` is the actual spicing function used during the transformation
process.
There can be multiple converters for a single pattern, resulting in multiple values returned
when the input string matches the corresponding pattern.
:param str filepath: Path to a YAML file with map definition.
:param dict spices: apping between `spices` and their `makers`.
:param gluetool.log.ContextLogger logger: Logger used for logging.
:param bool allow_variables: if set, both patterns and converters are first treated as templates,
and as such are rendered before doing anything else. Map may contain special comments,
``# !include <path>``, where path refers to a YAML file providing the necessary variables.
"""
def __init__(self, filepath, spices=None, logger=None, allow_variables=False):
self.logger = logger or Logging.get_logger()
logger.connect(self)
spices = spices or {}
pattern_map = load_yaml(filepath, logger=self.logger)
if pattern_map is None:
raise GlueError("pattern map '{}' does not contain any patterns".format(filepath))
_render_template = _load_yaml_variables(pattern_map, enabled=allow_variables, logger=self.logger)
def _create_simple_repl(repl):
def _replace(pattern, target):
"""
Use `repl` to construct image from `target`, honoring all backreferences made by `pattern`.
"""
self.debug("pattern '{}', repl '{}', target '{}'".format(pattern.pattern, repl, target))
try:
return pattern.sub(repl, target)
except re.error as e:
raise GlueError("Cannot transform pattern '{}' with target '{}', repl '{}': {}".format(
pattern.pattern, target, repl, str(e)))
return _replace
self._compiled_map = []
for pattern_dict in pattern_map:
log_dict(logger.debug, 'pattern dict', pattern_dict)
if not isinstance(pattern_dict, dict):
raise GlueError("Invalid format: '- <pattern>: <transform>' expected, '{}' found".format(pattern_dict))
# There is always just a single key, the pattern.
pattern_key = pattern_dict.keys()[0]
# Apply variables if requested.
pattern = _render_template(pattern_key)
converter_chains = _render_template(pattern_dict[pattern_key])
log_dict(logger.debug, "rendered mapping '{}'".format(pattern), converter_chains)
if isinstance(converter_chains, str):
converter_chains = [converter_chains]
try:
pattern = re.compile(pattern)
except re.error as e:
raise GlueError("Pattern '{}' is not valid: {}".format(pattern, str(e)))
compiled_chains = []
for chain in converter_chains:
converters = [s.strip() for s in chain.split(',')]
# first item in `converters` is always a simple string used by `pattern.sub()` call
converter = _create_simple_repl(converters.pop(0))
# if there any any items left, they name "spices" to apply, one by one,
# on the result of the first operation
for spice in converters:
if spice not in spices:
raise GlueError("Unknown 'spice' function '{}'".format(spice))
converter = spices[spice](converter)
compiled_chains.append(converter)
self._compiled_map.append((pattern, compiled_chains))
[docs] def match(self, s, multiple=False):
"""
Try to match ``s`` by the map. If the match is found - the first one wins - then its
conversions are applied to the ``s``.
There can be multiple conversions for a pattern, by default only the product of
the first one is returned. If ``multiple`` is set to ``True``, list of all products
is returned instead.
:rtype: str
:returns: if matched, output of the corresponding transformation.
"""
self.debug("trying to match string '{}' with patterns in the map".format(s))
for pattern, converters in self._compiled_map:
self.debug("testing pattern '{}'".format(pattern.pattern))
match = pattern.match(s)
if match is None:
continue
self.debug(' matched!')
if multiple is not True:
return converters[0](pattern, s)
return [
converter(pattern, s) for converter in converters
]
raise GlueError("Could not match string '{}' with any pattern".format(s))
[docs]def wait(label, check, timeout=None, tick=30, logger=None):
"""
Wait for a condition to be true.
:param str label: printable label used for logging.
:param callable check: called to test the condition. If its return value evaluates as ``True``,
the condition is assumed to pass the test and waiting ends.
:param int timeout: fail after this many seconds. ``None`` means test forever.
:param int tick: test condition every ``tick`` seconds.
:param gluetool.log.ContextAdapter logger: parent logger whose methods will be used for logging.
:raises gluetool.glue.GlueError: when ``timeout`` elapses while condition did not pass the check.
"""
if not isinstance(tick, int):
raise GlueError('Tick must be an integer')
if tick < 0:
raise GlueError('Tick must be a positive integer')
logger = logger or Logging.get_logger()
if timeout is not None:
end_time = time.time() + timeout
def _timeout():
return '{} seconds'.format(int(end_time - time.time())) if timeout is not None else 'infinite'
logger.debug("waiting for condition '{}', timeout {}, check every {} seconds".format(label, _timeout(),
tick))
while timeout is None or time.time() < end_time:
logger.debug("calling callback function")
ret = check()
if ret:
logger.debug('check passed, assuming success')
return ret
logger.debug("check failed with {}, assuming failure".format(ret))
logger.debug('{} left, sleeping for {} seconds'.format(_timeout(), tick))
time.sleep(tick)
raise GlueError("Condition '{}' failed to pass within given time".format(label))
[docs]def new_xml_element(tag_name, _parent=None, **attrs):
"""
Create new XML element.
:param str tag_name: Name of the element.
:param element _parent: If set, the newly created element will be appended to this element.
:param dict attrs: Attributes to set on the newly created element.
:returns: Newly created XML element.
"""
element = bs4.BeautifulSoup('', 'xml').new_tag(tag_name)
for name, value in attrs.iteritems():
element[name] = value
if _parent is not None:
_parent.append(element)
return element