Source code for gluetool.glue

# pylint: disable=too-many-lines

import argparse
import collections
import ast
import enum
import imp
import inspect
import logging
import os
import sys
import warnings

from functools import partial

from six import iterkeys, itervalues, iteritems, ensure_str
from six.moves import configparser

import jinja2
import mock
import pkg_resources

from .action import Action
from .color import Colors, switch as switch_colors
from .help import LineWrapRawTextHelpFormatter, option_help, docstring_to_help, trim_docstring, eval_context_help
from .log import Logging, LoggerMixin, ContextAdapter, ModuleAdapter, log_dict, VERBOSE

# Type annotations
# pylint: disable=unused-import,wrong-import-order
from typing import TYPE_CHECKING, cast, overload, Any, Callable, Dict, Iterable, List, Optional, NoReturn  # noqa
from typing import Sequence, Tuple, Type, Union, NamedTuple  # noqa
from types import TracebackType  # noqa
from .log import LoggingFunctionType, ExceptionInfoType  # noqa

if TYPE_CHECKING:
    import gluetool.color  # noqa
    # pylint: disable=cyclic-import
    import gluetool.utils  # noqa

# Type definitions
# pylint: disable=invalid-name
SharedType = Callable[..., Any]


DEFAULT_MODULE_CONFIG_PATHS = [
    '/etc/gluetool.d/config',
    os.path.expanduser('~/.gluetool.d/config'),
    os.path.abspath('./.gluetool.d/config')
]

DEFAULT_DATA_PATH = '{}/data'.format(os.path.dirname(os.path.abspath(__file__)))

DEFAULT_MODULE_ENTRY_POINTS = [
    'gluetool.modules'
]  # type: List[str]

DEFAULT_MODULE_PATHS = [
    '{}/gluetool_modules'.format(sys.prefix)
]  # type: List[str]


# Install workarounds from Six - this makes templates compatible with both Python 2 and 3 when it comes
# to iterating over dictionaries.
jinja2.defaults.DEFAULT_NAMESPACE.update({
    'iteritems': iteritems,
    'iterkeys': iterkeys,
    'itervalues': itervalues
})


#
# NOTE: pipelines and shared functions.
#
# Each `Glue` instance manages one or more pipelines. The main one, requested by the user, can spawn additional
# "side-car" pipelines, and these can spawn their own children, and so on. Only a single pipeline is being processed
# at the moment - when a new pipeline is spawned, its parent gives up its time for its kid, and is awoken when the
# kid finishes. `Glue` keeps a stack of pipelines as their appear and disappear. There is always a "current pipeline",
# which has a "current" module, the one being currently executed.
#
# Shared functions are managed by *pipelines* - each pipeline takes care of shared functions exported by its modules.
# `Glue` instance, shared by these pipelines, then allows modules to access all shared functions from all other
# existing pipelines: when a module wished to call a shared function, first the current pipeline (the one running
# the module) is inspected, then its parent, after that parent's parent and so on. This is ensured by `Module`
# API "secretly" calling `Glue` methods, which take care of checking all the layers.
#


[docs]class DryRunLevels(enum.IntEnum): """ Dry-run levels. :cvar int DEFAULT: Default level - everything is allowed. :cvar int DRY: Well-known "dry-run" - no changes to the outside world are allowed. :cvar int ISOLATED: No interaction with the outside world is allowed (networks connections, reading files, etc.) """ DEFAULT = 0 DRY = 1 ISOLATED = 2
[docs]class GlueError(Exception): """ Generic ``gluetool`` exception. :param str message: Exception message, describing what happened. :param tuple caused_by: If set, contains tuple as returned by :py:func:`sys.exc_info`, describing the exception that caused this one to be born. If not set, constructor will try to auto-detect this information, and if there's no such information, instance property ``caused_by`` will be set to ``None``. :param list(str) sentry_fingerprint: if set, it is used as a Sentry fingerprint of the exception. See :py:meth:`sentry_fingerprint` for more details. :param dict(str, str) sentry_tags: if set, it is merged with other tags when submitting the exception. See :py:meth:`sentry_tags` for more details. :ivar str message: Exception message, describing what happened. :ivar tuple caused_by: If set, contains tuple as returned by :py:func:`sys.exc_info`, describing the exception that caused this one to be born. ``None`` otherwise. """ no_sentry_exceptions = [] # type: List[str] def __init__(self, message, caused_by=None, sentry_fingerprint=None, sentry_tags=None, **kwargs): # type: (str, Optional[ExceptionInfoType], Optional[List[str]], Optional[Dict[str, str]], **Any) -> None super(GlueError, self).__init__(message, **kwargs) # type: ignore # too many arguments but it's fine self.message = message self._sentry_fingerprint = sentry_fingerprint self._sentry_tags = sentry_tags # if not told explicitly, try to detect the cause if caused_by is None: caused_by = sys.exc_info() # if there's no cause, use None to signal that fact to the rest of the world if caused_by == (None, None, None): caused_by = None self.caused_by = caused_by @property def submit_to_sentry(self): # type: () -> bool # pylint: disable=no-self-use """ Decide whether the exception should be submitted to Sentry or not. By default, all exceptions are submitted. Exception listed in `no_sentry_exceptions` are not submitted. :rtype: bool :returns: ``True`` when the exception should be submitted to Sentry, ``False`` otherwise. """ exception_name = '.'.join([type(self).__module__, type(self).__name__]) if exception_name in self.no_sentry_exceptions: return False return True
[docs] def sentry_fingerprint(self, current): # type: (List[str]) -> List[str] # pylint: disable=no-self-use """ Default grouping of events into issues might be too general for some cases. This method gives users a chance to provide custom fingerprint Sentry could use to group events in a more suitable way. E.g. user might be interested in some sort of connection issues but they would like to have them grouped not by a traceback (which is the default method) but per remote host IP. For that, the Sentry integration code will call ``sentry_fingerprint`` method of a raised exception, and the method should return new fingerprint, let's say ``[<exception class name>, <remote IP>]``, and Sentry will group events using this fingerprint. If the exception was raised with ``sentry_fingerprint`` parameter set, it is returned instead of ``current``, after prefixing the list of tags with a name of the exception's class. :param list(str) current: current fingerprint. Usually ``['{{ default }}']`` telling Sentry to use its default method, but it could already be more specific. :rtype: list(str) :returns: new fingerprint, e.g. ``['FailedToConnectToAPI', '10.20.30.40']`` """ if self._sentry_fingerprint: return [ self.__class__.__name__ ] + self._sentry_fingerprint return current
[docs] def sentry_tags(self, current): # type: (Dict[str, str]) -> Dict[str, str] # pylint: disable=no-self-use """ Add, modify or remove tags attached to a Sentry event, reported when the exception was raised. Most common usage would be an addition of tags, e.g. ``remote-host`` to allow search for events related to the same remote address. If the exception was raised with ``sentry_tags`` parameter set, its value is injected to ``current`` before returning it. :param dict(str, str) current: current set of tags and their values. :rtype: dict(str, str) :returns: new set of tags. It is possible to add tags directly into ``current`` and then return it. """ if self._sentry_tags: current.update(self._sentry_tags) return current
[docs]class SoftGlueError(GlueError): """ **Soft** errors are errors Glue Ops and/or developers shouldn't be bothered with, things that are up to the user to fix, e.g. empty set of tests. **Hard** errors are supposed to warn Ops/Devel teams about important infrastructure issues, code deficiencies, bugs and other issues that are fixable only by actions of Glue staff. However, we still must provide notification to user(s), and since we expect them to fix the issues that led to raising the soft error, we must provide them with as much information as possible. Therefore modules dealing with notifications are expected to give these exceptions a chance to influence the outgoing messages, e.g. by letting them provide an e-mail body template. """
[docs]class GlueRetryError(GlueError): """ Retry gluetool exception """
[docs]class GlueCommandError(GlueError): """ Exception raised when external command failes. :param list cmd: Command as passed to gluetool.utils.run_command helper. :param gluetool.utils.ProcessOutput output: Process output data. :ivar list cmd: Command as passed to gluetool.utils.run_command helper. :ivar gluetool.utils.ProcessOutput output: Process output data. """ def __init__(self, cmd, output, **kwargs): # type: (List[str], gluetool.utils.ProcessOutput, **Any) -> None super(GlueCommandError, self).__init__("Command '{}' failed with exit code {}".format(cmd, output.exit_code), **kwargs) self.cmd = cmd self.output = output
[docs]class Failure(object): # pylint: disable=too-few-public-methods """ Bundles exception related info. Used to inform modules in their ``destroy()`` phase that ``gluetool`` session was killed because of exception raised by one of modules. :param gluetool.glue.Module module: module in which the error happened, or ``None``. :param tuple exc_info: Exception information as returned by :py:func:`sys.exc_info`. :ivar gluetool.glue.Module module: module in which the error happened, or ``None``. :ivar Exception exception: Shortcut to ``exc_info[1]``, if available, or ``None``. :ivar tuple exc_info: Exception information as returned by :py:func:`sys.exc_info`. :ivar str sentry_event_id: If set, the failure was reported to the Sentry under this ID. """ def __init__(self, module, exc_info): # type: (Optional[Module], ExceptionInfoType) -> None self.module = module self.exc_info = exc_info self.sentry_event_id = None # type: Optional[str] self.sentry_event_url = None # type: Optional[str] if exc_info: self.exception = exc_info[1] self.soft = isinstance(self.exception, SoftGlueError) else: self.exception = None self.soft = False
[docs]def retry(*args): # type: (*Any) -> Any """ Retry decorator This decorator catches given exceptions and returns libRetryError exception instead. usage: @retry(exception1, exception2, ..) """ def wrap(func): # type: (Any) -> Any def func_wrapper(obj, *fargs, **fkwargs): # type: (Any, *Any, **Any) -> Any try: func(obj, *fargs, **fkwargs) except args as e: if isinstance(e, GlueError): raise GlueRetryError(e.value) # type: ignore else: raise GlueRetryError(e) return func_wrapper return wrap
[docs]class PipelineStep(object): # pylint: disable=too-few-public-methods """ Step of ``gluetool``'s pipeline - which is basically just a list of steps. """
[docs] def to_module(self, glue): # type: (Glue) -> Any raise NotImplementedError()
[docs]class PipelineStepModule(PipelineStep): # pylint: disable=too-few-public-methods """ Step of ``gluetool``'s pipeline backed by a module. :param str module: name to give to the module instance. This name is used e.g. in logging or when searching for module's config file. :param str actual_module: The actual module class the step uses. Usually it is same as ``module`` but may differ, ``module`` is then a mere "alias". ``actual_module`` is used to locate a module class, whose instance is then given name ``module``. :param list(str) argv: list of options to be given to the module, in a form similar to :py:data:`sys.argv`. """ def __init__(self, module, actual_module=None, argv=None): # type: (str, Optional[str], Optional[List[str]]) -> None self.module = module self.actual_module = actual_module or module self.argv = argv or [] def __repr__(self): # type: () -> str return "PipelineStepModule('{}', actual_module='{}', argv={})".format( self.module, self.actual_module, self.argv )
[docs] def to_module(self, glue): # type: (Glue) -> Module return glue.init_module(self.module, actual_module_name=self.actual_module)
@property def module_designation(self): # type: () -> str return self.module if self.module == self.actual_module else '{}:{}'.format(self.module, self.actual_module)
[docs] def serialize_to_json(self): # type: () -> Dict[str, Any] return { field: getattr(self, field) for field in ('module', 'actual_module', 'argv') }
@classmethod
[docs] def unserialize_from_json(cls, serialized): # type: (Dict[str, Any]) -> PipelineStepModule return PipelineStepModule( serialized['module'], actual_module=serialized['actual_module'], argv=serialized['argv'] )
[docs]class PipelineStepCallback(PipelineStep): # pylint: disable=too-few-public-methods """ Step of ``gluetool``'s pipeline backed by callable. :param str name: name to give to the module instance. This name is used e.g. in logging or when searching for module's config file. :param callable callback: a callable to execute. """ def __init__(self, name, callback, *args, **kwargs): # type: (str, Callable[..., None], *Any, **Any) -> None self.name = name self.callback = callback self.args = self.argv = args # `argv` to match module-based pipelines self.kwargs = kwargs def __repr__(self): # type: () -> str return "PipelineStepCallback('{}', {})".format(self.name, self.callback)
[docs] def to_module(self, glue): # type: (Glue) -> CallbackModule return CallbackModule(self.name, glue, self.callback)
[docs] def serialize_to_json(self): # type: () -> Dict[str, Any] return { field: getattr(self, field) for field in ('name', 'callback') }
@classmethod
[docs] def unserialize_from_json(cls, serialized): # type: (Dict[str, Any]) -> NoReturn # pylint: disable=unused-argument raise GlueError('Cannot unserialize callback pipeline step')
#: Type of pipeline steps. # pylint: disable=invalid-name PipelineStepsType = Sequence[PipelineStep] #: Return type of a pipeline. # pylint: disable=invalid-name PipelineReturnType = Tuple[Optional[Failure], Optional[Failure]]
[docs]class PipelineAdapter(ContextAdapter): """ Custom logger adapter, adding pipeline name as a context. :param logging.Logger logger: parent logger this adapter modifies. """ def __init__(self, logger, pipeline_name): # type: (ContextAdapter, str) -> None super(PipelineAdapter, self).__init__(logger, {'ctx_pipeline_name': (5, pipeline_name)})
[docs]class Pipeline(LoggerMixin, object): """ Pipeline of ``gluetool`` modules. Defined by its steps, takes care of registering their shared functions, running modules and destroying the pipeline. To simplify the workflow, 2 primitives are defined: * :py:meth:`_safe_call` - calls a given callback, returns its return value. Any exception raised by the callback is wrapped by :py:class:`Failure` instance and returned instead of what callback would return. * :py:meth:`_for_each_module` - loop over given list of modules, calling given callback for each of the modules. :py:meth:`_safe_call` is used for the call, making sure we always have a return value and no exceptions. Coupled with the following rules, things clear up a bit: * Callbacks are allowed to return either ``None`` or ``Failure`` instance. * This rule matches ``_safe_call`` behavior. There's no need to worry about the return values, return value of a callback can be immediately passed through ``_safe_call``. * There are no other possible return values - return value is **always** either *nothing* or *failure*. * There are no "naked" exceptions, all are catched by ``_safe_call`` and converted to a common return value. * Users of ``_safe_call`` and ``_for_each_module`` also return either ``None`` or ``Failure`` instance, therefore it is very easy to end method by a ``return self._for_each_method(...)`` call - return types of callback, ``_safe_call``, ``_for_each_module`` and our method match, no need to translate them between these method. :param Glue glue: :py:class:`Glue` instance, taking care of this pipeline. :param list(PipelineStep) steps: modules to run and their options. :ivar list(Module) modules: list of instantiated modules forming the pipeline. :ivar Module current_module: if set, it is the module which is currently being executed. """ def __init__(self, glue, steps, logger=None): # type: (Glue, PipelineStepsType, Optional[ContextAdapter]) -> None logger = logger or glue.logger super(Pipeline, self).__init__(logger) self.glue = glue self.steps = steps # Materialized pipeline self.modules = [] # type: List[Module] # Current module (if applicable) self.current_module = None # type: Optional[Module] #: Shared function registry. #: funcname: (module, fn) self.shared_functions = {} # type: Dict[str, Tuple[Configurable, SharedType]] # Action wrapping runtime of the pipeline. Initialized when pipeline begins running, all following # actions (e.g. executing modules) are children of this action. self.action = None # type: Optional[Action]
[docs] def _add_shared(self, funcname, module, func): # type: (str, Configurable, SharedType) -> None """ Add a shared function. Overwrites previously registered shared function of the same name. Private part of API, for easier testing. :param str funcname: name of the shared function. :param Module module: module providing the shared function. :param callable func: the shared function. """ self.debug("registering shared function '{}' of module '{}'".format(funcname, module.unique_name)) self.shared_functions[funcname] = (module, func)
[docs] def add_shared(self, funcname, module): # type: (str, Module) -> None """ Add a shared function. Overwrites previously registered shared function of the same name. :param str funcname: name of the shared function. :param Module module: module providing the shared function. """ if not hasattr(module, funcname): raise GlueError("No such shared function '{}' of module '{}'".format(funcname, module.name)) self._add_shared(funcname, module, getattr(module, funcname))
[docs] def has_shared(self, funcname): # type: (str) -> bool """ Check whether a shared function of a given name exists. :param str funcname: name of the shared function. :rtype: bool """ return funcname in self.shared_functions
[docs] def get_shared(self, funcname): # type: (str) -> Optional[SharedType] """ Return a shared function. :param str funcname: name of the shared function. :returns: a callable (shared function), or ``None`` if no such shared function exists. """ if not self.has_shared(funcname): return None return self.shared_functions[funcname][1]
[docs] def _safe_call(self, callback, *args, **kwargs): # type: (Callable[..., Optional[Failure]], *Any, **Any) -> Optional[Failure] """ "Safe" call a function with given arguments, converting raised exceptions to :py:class:`Failure` instance. :param callable callback: callable to call. Must return either ``None`` or :py:class:`Failure` instance, although it can freely raise exceptions. :returns: value returned by ``callback``, or :py:class:`Failure` instance wrapping exception raise by ``callback``. """ try: return callback(*args, **kwargs) # pylint: disable=broad-except except Exception: return Failure(module=self.current_module, exc_info=sys.exc_info())
[docs] def _for_each_module(self, modules, callback, *args, **kwargs): # type: (Iterable[Module], Callable[..., Optional[Failure]], *Any, **Any) -> Optional[Failure] """ For each module in a list, call a given function with module as its first argument. If the call returns anything but ``None``, the value is returned by this function as well, ending the loop. :param list(Module) modules: list of modules to iterate over. :param callable callback: a callback, accepting at least one parameter, current module of the loop. Must return either ``None`` or :py:class:`Failure` instance, although it can freely raise exceptions. :returns: value returned by ``callback``, or ``None`` when loop finished. """ for module in modules: self.current_module = module # Given that we're using `_safe_call`, we shouldn't encounter any exception - `_safe_call` # would convert any exception into `Failure` instance. Callback also cannot return either # `None` or a failure. Therefore, if we got anything `True`-ish, we simply pass it to our # caller since it must be a failure. # # Note: This is enforced by type checks, we have no other power over the callback and its # return type. ret = self._safe_call(callback, module, *args, **kwargs) if ret: return ret return None
[docs] def _log_failure(self, module, failure, label=None): # type: (Module, Failure, Optional[str]) -> None """ Log a failure, and submit it to Sentry. :param Module module: module to use for logging - apparently, the failure appeared when this module was running. :param Failure failure: failure to log. :param str label: label for logging purposes. If it's set and exception exists, exception message is appended. If it's not set and exception exists, exception message is used. If failure has no exception, a generic message is the final choice. """ if label and failure.exception: label = '{}: {}'.format(label, failure.exception) elif failure.exception: label = str(failure.exception) else: label = 'Exception raised' module.error(label, exc_info=failure.exc_info) self.glue.sentry_submit_exception(failure, logger=self.logger)
[docs] def _setup(self): # type: () -> Optional[Failure] # Make a copy of steps - while setting modules up, we won't have access to module index, so we cannot # reach to `self.steps` for its arguments, but we can pop the first item of this list - it's always # the "current" module, the one currently being set up. steps = self.steps[:] for step in steps: module = step.to_module(self.glue) self.modules.append(module) def _do_setup(module): # type: (Module) -> None step = steps.pop(0) # type: ignore # sequence of modules does have `pop`... module.parse_config() if isinstance(step, PipelineStepModule): module.parse_args(step.argv) module.check_dryrun() return self._for_each_module(self.modules, _do_setup)
[docs] def _sanity(self): # type: () -> Optional[Failure] def _do_sanity(module): # type: (Module) -> Optional[Failure] failure = self._safe_call(module.sanity) if failure: self._log_failure(module, failure) return failure failure = self._safe_call(module.check_required_options) if failure: self._log_failure(module, failure) return failure return None return self._for_each_module(self.modules, _do_sanity)
[docs] def _execute(self): # type: () -> Optional[Failure] def _do_execute(module): # type: (Module) -> Optional[Failure] # Safely call module's `execute` method. We get either `None`, which is good, or a `Failure` # instance we can log, submit to Sentry and return to break the loop in `_for_each_module`. # The failure would then be propagated to `run()` method and it would represent the cause # that killed the pipeline. with Action( 'executing module', parent=self.action, logger=module.logger, tags={ 'unique-name': module.unique_name } ): failure = self._safe_call(module.execute) if failure: self._log_failure(module, failure, label='Exception raised') # Always register module's shared functions module.add_shared() return failure return self._for_each_module(self.modules, _do_execute)
[docs] def _destroy(self, failure=None): # type: (Optional[Failure]) -> Optional[Failure] """ "Destroy" the pipeline - call each module's ``destroy`` method, reversing the order of modules. If a ``destroy`` method raises an exception, loop ends and the failure is returned. :param Failure failure: if set, it represents a failure that caused pipeline to stop, which was followed by a call to currently running ``_destroy``. It is passed to modules' ``destroy`` methods. :returns: ``None`` if everything went well, or a :py:class:`Failure` instance if any ``destroy`` method raised an exception. """ if not self.modules: return None self.debug('destroying modules') def _destroy(module): # type: (Module) -> Optional[Failure] # If we simply called module's `destroy` method, possible exception would be logged as any # other exception, but we want to add "while destroying" message, and make sure it's sent # to the Sentry. Therefore, adding `_safe_call` (inside `_destroy` which itself was called via # `_safe_call`), catching and logging the failure. After that, we simply return the "destroy failure" # from `_destroy`, which then causes `_for_each_module` to quit loop immediately, propagating this # destroy failure even further. # We get either `None` or a failure if an exception was raised by `destroy`. If it's a failure, # we can log it with a bit more context. with Action( 'destroying module', parent=self.action, logger=module.logger, tags={ 'unique-name': module.unique_name } ): destroy_failure = self._safe_call(module.destroy, failure=failure) if destroy_failure: self._log_failure(module, destroy_failure, label='Exception raised while destroying module') # Just like in the case of `execute` above, return `destroy_failure` - it is either `None` # or genuine `Failure` instance, representing the cause that killed the destroy stage. return destroy_failure final_failure = self._for_each_module(reversed(self.modules), _destroy) self.current_module = None self.modules = [] return final_failure
[docs] def run(self): # type: () -> PipelineReturnType """ Run a pipeline - instantiate modules, prepare and execute each of them. When done, destroy all modules. :returns: tuple of two items, each of them either ``None`` or a :py:class:`Failure` instance. The first item represents the output of the pipeline, the second item represents the output of the destroy chain. If the item is ``None``, the stage finished without any issues, if it's a ``Failure`` instance, then an exception was raised during the stage, and ``Failure`` wraps it. """ with Action('running pipeline', logger=self.logger) as self.action: log_dict(self.debug, 'running a pipeline', self.steps) # Take a list of modules, and call a helper method for each module of the list. The helper function calls # modules' methods, and these methods may raise exceptions. Should that happen, _safe_call` inside # `_for_each_module` will wrap them with `Failure` instance, collecting the necessary data. # # Here we dispatch loops, wait for them to return, and if a failure got back to us, we stop # running and try to clean things up. # # We always return "output of the forward run" and "output of the destroy run" - these "output" values # are either `None` or `Failure` instances. We don't check too often, all involved methods can accept # these objects and decide what to do with them. failure = self._setup() if failure: return failure, self._destroy(failure=failure) failure = self._sanity() if failure: return failure, self._destroy(failure=failure) failure = self._execute() return failure, self._destroy(failure=failure)
[docs]class NamedPipeline(Pipeline): """ Pipeline with a name. The name is recorded in log messages emitted by the pipeline itself. :param Glue glue: :py:class:`Glue` instance, taking care of this pipeline. :param str name: name of the pipeline. :param list(PipelineStep) steps: modules to run and their options. :ivar list(Module) modules: list of instantiated modules forming the pipeline. :ivar Module current_module: if set, it is the module which is currently being executed. """ def __init__(self, glue, name, steps): # type: (Glue, str, PipelineStepsType) -> None super(NamedPipeline, self).__init__(glue, steps, logger=PipelineAdapter(glue.logger, name)) self.name = name
[docs]class ArgumentParser(argparse.ArgumentParser): """ Pretty much the :py:class:`argparse.ArgumentParser`, it overrides just the :py:meth:`argparse.ArgumentParser.error` method, to catch errors and to wrap them into nice and common :py:class:`GlueError` instances. The original prints (for us) useless message, including the program name, and raises ``SystemExit`` exception. Such action does not provide necessary information when encountered in Sentry, for example. """
[docs] def error(self, message): # type: ignore # incompatible with supertype because of unicode # type: (str) -> None """ Must not return - raising an exception is a good way to "not return". :raises gluetool.glue.GlueError: When argument parser encounters an error. """ raise GlueError('Parsing command-line options failed: {}'.format(message))
[docs]class Configurable(LoggerMixin, object): """ Base class of two main ``gluetool`` classes - :py:class:`gluetool.glue.Glue` and :py:class:`gluetool.glue.Module`. Gives them the ability to use `options`, settable from configuration files and/or command-line arguments. :ivar dict _config: internal configuration store. Values of all options are stored here, regardless of them being set on command-line or by the configuration file. """ options = {} # type: Union[Dict[Any, Any], List[Any]] """ The ``options`` variable defines options accepted by module, and their properties:: options = { <option name>: { <option properties> }, ... } where * ``<option name>`` is used to `name` the option in the parser, and two formats are accepted (don't add any leading dashes (``-`` nor ``--``): * ``<long name>`` * ``tuple(<short name>, <long name #1>, <long name #2>, ...)`` * the first of long names (``long name #1``) is used to identify the option - other long names are understood by argument parser but their values are stored under ``long name #1`` option. * dictionary ``<option properties>`` is passed to :py:meth:`argparse.ArgumentParser.add_argument` as keyword arguments when the option is being added to the parser, therefore any arguments recognized by :py:mod:`argparse` can be used. It is also possible to use groups:: options = [ (<group name>, <group options>), ... ] where * ``<group name>`` is the name of the group, e.g. ``Debugging options`` * ``<group options>`` is the ``dict`` with all group options, as described above. This way, you can split pile of options into conceptualy closer groups of options. A single ``dict`` you would have is split into multiple smaller dictionaries, and each one is coupled with the group name in a ``tuple``. """ required_options = [] # type: Iterable[str] """Iterable of names of required options.""" options_note = None # type: str """If set, it will be printed after all options as a help's epilog.""" supported_dryrun_level = DryRunLevels.DEFAULT """Highest supported level of dry-run.""" name = None # type: str """ Module name. Usually matches the name of the source file, no suffix. """ unique_name = None # type: Optional[str] """ Unque name of this (module) instance. Used by modules, has no meaning elsewhere, but since dry-run checks are done on this level, it must be declared here to make pylint happy :/ """ def __repr__(self): # type: () -> str return '<Module {}:{}:{}>'.format(self.unique_name, self.name, id(self)) @staticmethod
[docs] def _for_each_option(callback, options): # type: (Callable[..., None], Any) -> None """ Given dictionary defining options, call a callback for each of them. :param dict options: Dictionary of options, in a form ``option-name: option-params``. :param callable callback: Must accept at least 3 parameters: option name (``str``), all option names (short and long ones) (``tuple(str, str)``), and option params (``dict``). """ # Sort options by their names - no code has a strong option on their order, so force # one to all users of this helper. option_names = sorted(iterkeys(options), key=lambda x: x[1] if isinstance(x, tuple) else x) for names in option_names: params = options[names] name = names[1] if isinstance(names, tuple) else names callback(name, names, params)
@staticmethod
[docs] def _for_each_option_group(callback, options): # type: (Callable[..., None], Any) -> None """ Given set of options, call a callback for each option group. :param options: List of option groups, or a dict listing options directly. :param callable callback: Must accept at least 2 parameters: ``options`` (``dict``), listing options in the group, and keyword parameter ``group_name`` (``str``), which is set to group name when the ``options`` defines an option group. """ if isinstance(options, dict): callback(options) elif isinstance(options, (list, tuple)): for group in options: if isinstance(group, dict): callback(group) else: group_name, group_options = group callback(group_options, group_name=group_name)
def __init__(self, logger): # type: (ContextAdapter) -> None super(Configurable, self).__init__(logger) # Initialize configuration store self._config = {} # type: Dict[str, Any] # Initialize values in the store, and make sanity check of option names def _fail_name(name): # type: (str) -> None raise GlueError("Option name must be either a string or (<letter>, <string>), '{}' found".format(name)) def _verify_option(name, names, params): # type: (str, List[str], Dict[str, Any]) -> None if isinstance(names, str): self._config[name] = None elif isinstance(names, tuple): if not isinstance(names[0], str) or len(names[0]) != 1: _fail_name(name) if not isinstance(names[1], str) or len(names[1]) < 2: _fail_name(name) self._config[name] = None else: _fail_name(name) if 'help' not in params: return # Long help texts can be written using triple quotes and docstring-like # formatting. Convert every help string to a single line string. params['help'] = option_help(params['help']) def _verify_options(options, **kwargs): # type: (Dict[str, Dict[str, Any]], **Any) -> None # pylint: disable=unused-argument Configurable._for_each_option(_verify_option, options) Configurable._for_each_option_group(_verify_options, self.options)
[docs] def _parse_config(self, paths): # type: (List[str]) -> None """ Parse configuration files. Uses :py:mod:`ConfigParser` for the actual parsing. Updates module's configuration store with values found returned by the parser. :param list paths: List of paths to possible configuration files. """ log_dict(self.debug, 'Loading configuration from following paths', paths) parser = configparser.ConfigParser() parsed_paths = parser.read(paths) log_dict(self.debug, 'Read configuration files', parsed_paths) def _inject_value(name, names, params): # type: (str, Tuple[str, ...], Dict[str, Any]) -> None # pylint: disable=unused-argument try: value = parser.get('default', name) except (configparser.NoOptionError, configparser.NoSectionError): return if 'type' in params: try: value = params['type'](value) except ValueError as exc: raise GlueError( "Value of option '{}' expected to be '{}' but cannot be parsed: '{}'".format( name, params['type'].__name__, str(exc) ) ) self._config[name] = value self.debug("Option '{}' set to '{}' by config file".format(name, value)) # pylint: disable=not-callable def _inject_values(options, **kwargs): # type: (Any, **Any) -> None # pylint: disable=unused-argument Configurable._for_each_option(_inject_value, options) Configurable._for_each_option_group(_inject_values, self.options)
@classmethod
[docs] def _create_args_parser(cls, **kwargs): # type: (**Any) -> ArgumentParser """ Create an argument parser. Used by Sphinx to document "command-line" options of the module - which are, by the way, the module options as well. :param dict kwargs: Additional arguments passed to :py:class:`argparse.ArgumentParser`. """ root_parser = ArgumentParser(**kwargs) def _add_option(parser, name, names, params): # type: (ArgumentParser, str, Tuple[str, ...], Dict[str, Any]) -> None if params.get('raw', False) is True: final_names = (name,) # type: Tuple[str, ...] del params['raw'] else: if isinstance(names, str): final_names = ('--{}'.format(name),) else: final_names = ('-{}'.format(names[0]),) + tuple(['--{}'.format(n) for n in names[1:]]) parser.add_argument(*final_names, **params) def _add_options(group_options, group_name=None): # type: (Any, Optional[str]) -> None group_parser = None # type: Optional[Union[argparse.ArgumentParser, argparse._ArgumentGroup]] if group_name is None: group_parser = root_parser else: group_parser = root_parser.add_argument_group(group_name) assert group_parser is not None Configurable._for_each_option(partial(_add_option, group_parser), group_options) Configurable._for_each_option_group(_add_options, cls.options) return root_parser
[docs] def _parse_args(self, args, **kwargs): # type: (Any, **Any) -> None """ Parse command-line arguments. Uses :py:mod:`argparse` for the actual parsing. Updates module's configuration store with values returned by parser. :param list args: arguments passed to this module. Similar to what :py:data:`sys.argv` provides on program level. """ self.debug('Loading configuration from command-line arguments') # construct the parser parser = self._create_args_parser(**kwargs) # parse the added args args = parser.parse_args(args) # add the parsed args to options def _inject_value(name, names, params): # type: (str, Tuple[str, ...], Dict[str, Any]) -> None # pylint: disable=unused-argument dest = params.get('dest', name.replace('-', '_')) value = getattr(args, dest) # if the option was not specified, skip it if value is None and name in self._config: return # do not replace config options with default command line values if name in self._config and self._config[name] is not None: # if default parameter used if 'default' in params and value == params['default']: return # with action store_true, the default is False if params.get('action', '') == 'store_true' and value is False: return # with action store_false, the default is True if params.get('action', '') == 'store_false' and value is True: return self._config[name] = value self.debug("Option '{}' set to '{}' by command-line".format(name, value)) def _inject_values(options, **kwargs): # type: (Any, **Any) -> None # pylint: disable=unused-argument Configurable._for_each_option(_inject_value, options) Configurable._for_each_option_group(_inject_values, self.options)
[docs] def parse_config(self): # type: () -> None """ Public entry point to configuration parsing. Child classes must implement this method, e.g. by calling :py:meth:`gluetool.glue.Configurable._parse_config` which requires list of paths. """ # E.g. self._parse_config(<list of possible configuration files>) raise NotImplementedError('Implement this method to enable the actual parsing')
[docs] def parse_args(self, args): # type: (List[str]) -> None """ Public entry point to argument parsing. Child classes must implement this method, e.g. by calling :py:meth:`gluetool.glue.Configurable._parse_args` which makes use of additional :py:class:`argparse.ArgumentParser` options. """ # E.g. self._parse_args(args, description=...) raise NotImplementedError('Implement this method to enable the actual parsing')
[docs] def check_required_options(self): # type: () -> None if not self.required_options: self.debug('skipping checking of required options') return for name in self.required_options: if name not in self._config or not self._config[name]: raise GlueError("Missing required '{}' option".format(name))
# `option()` returns two different types based on the number of positional arguments: # # - 1 argument => returns just a single value (Any) # - more than 1 argument => returns a tuple of values # # `mypy` supports typechecking of such function by using @overload decorators, describing # each variant, followed by the actual body of the function. We just need to silence pylint # and flake since we're re-defining the method - these checkes does not understand @overload. # pylint: disable=function-redefined @overload def option(self, name): # type: (str) -> Any pass @overload # noqa def option(self, *names): # type: (*str) -> List[Any] pass
[docs] def option(self, *names): # type: ignore # noqa """ Return values of given options from module's configuration store. :param str names: names of requested options. :returns: either a value or ``None`` if such option does not exist. When multiple options are requested, a tuple of their values is returned, for a single option its value is **not** wrapped by a tuple. """ if not names: raise GlueError('Specify at least one option') values = tuple( self._config.get(name, None) for name in names ) return values[0] if len(values) == 1 else values
@property def dryrun_level(self): # type: () -> int """ Return current dry-run level. This must be implemented by class descendants because each one finds the necessary information in different places. """ raise NotImplementedError() @property def dryrun_enabled(self): # type: () -> bool """ ``True`` if dry-run level is enabled, on any level. """ return self.dryrun_level != DryRunLevels.DEFAULT
[docs] def _dryrun_allows(self, threshold, msg): # type: (int, str) -> bool """ Check whether current dry-run level allows an action. If the current dry-run level is equal of higher than ``threshold``, then the action is not allowed. E.g. when action's ``threshold`` is :py:attr:`DryRunLevels.ISOLATED`, and the current level is :py:attr:`DryRunLevels.DRY`, the action is allowed. :param DryRunLevels threshold: Dry-run level the action is not allowed. :param str msg: Message logged (as a warning) when the action is deemed not allowed. :returns: ``True`` when action is allowed, ``False`` otherwise. """ if self.dryrun_level >= threshold: self.warn('{} is not allowed by current dry-run level'.format(msg)) return False return True
[docs] def dryrun_allows(self, msg): # type: (str) -> bool """ Checks whether current dry-run level allows an action which is disallowed on :py:attr:`DryRunLevels.DRY` level. See :py:meth:`Configurable._dryrun_allows` for detailed description. """ return self._dryrun_allows(DryRunLevels.DRY, msg)
[docs] def isolatedrun_allows(self, msg): # type: (str) -> bool """ Checks whether current dry-run level allows an action which is disallowed on :py:attr:`DryRunLevels.ISOLATED` level. """ return self._dryrun_allows(DryRunLevels.ISOLATED, msg)
[docs] def check_dryrun(self): # type: () -> None """ Checks whether this object supports current dry-run level. """ if not self.dryrun_enabled: return if self.dryrun_level > self.supported_dryrun_level: dryrun_level_name = self.dryrun_level.name # type: ignore # `dryrun_level` is not pure int but Enum raise GlueError("Module '{}' does not support current dry-run level of '{}'".format(self.unique_name, dryrun_level_name))
@property def eval_context(self): # type: () -> Dict[str, Any] """ Return "evaluation context" - a dictionary of variable names (usually in uppercase) and their values, which is supposed to be used in various "evaluate *this*" operations like rendering of templates. To provide nice and readable documentation of variables, returned by a module's ``eval_context`` property, assign a dictionary, describing these variables, to a local variable named ``__content__``: .. code-block:: python ... @property def eval_context(self): __content__ = { 'FOO': 'This is an important variable, extracted from clouds.' } return { 'FOO': 42 } ``gluetool`` core will extract this information and will use it to generate different help texts like your module's help or a list of all known context variables. :rtype: dict """ return {}
[docs]class CallbackModule(mock.MagicMock): # type: ignore # MagicMock has type Any, complains about inheriting from it """ Stand-in replacement for common :py:`Module` instances which does not represent any real module. We need it only to simplify code pipeline code - it can keep working with ``Module``-like instances, since this class mocks each and every method, but calls given ``callback`` in its ``execute`` method. :param str name: name of the pseudo-module. :param Glue glue: ``Glue`` instance governing the pipeline this module is part of. :param callable callback: called in the ``execute`` method. Its arguments will be ``glue``, followed by remaining positional and keyword arguments (``args`` and ``kwargs``). :param tuple args: passed to ``callback``. :param dict kwargs: passed to ``callback``. """ def __init__(self, name, glue, callback, *args, **kwargs): # type: (str, Glue, Callable[..., None], *Any, **Any) -> None super(CallbackModule, self).__init__() self.glue = glue self.name = self.unique_name = name self._callback = callback self._args = args self._kwargs = kwargs
[docs] def _get_child_mock(self, **kw): # type: (**Any) -> mock.MagicMock # The default implementation uses the same class it's member of, i.e. ``CallbackModule``. Too complicated, # we're perfectly fine with child mocks being of ``MagicMock``. return mock.MagicMock(**kw)
[docs] def execute(self): # type: () -> None self._callback(self.glue, *self._args, **self._kwargs)
[docs] def sanity(self): # type: () -> None # pylint: disable-msg=no-self-use return None
[docs] def destroy(self, failure=None): # type: (Optional[Failure]) -> None # pylint: disable-msg=no-self-use,unused-argument return None
[docs] def check_required_options(self): # type: () -> None # pylint: disable-msg=no-self-use return None
[docs]class Module(Configurable): """ Base class of all ``gluetool`` modules. :param gluetool.glue.Glue glue: ``Glue`` instance owning the module. :ivar gluetool.glue.Glue glue: ``Glue`` instance owning the module. :ivar dict _config: internal configuration store. Values of all module options are stored here, regardless of them being set on command-line or in the configuration file. :ivar dict _overloaded_shared_functions: If a shared function added by this module overloades an older function of the same name, registered by a previous module, the overloaded one is added into this dictionary. The module can then call this saved function - using :py:meth:`overloaded_shared` - to implement a "chain" of shared functions, when one calls another, implementing the same operation. """ description = None # type: str """Short module description, displayed in ``gluetool``'s module listing.""" shared_functions = [] # type: List[str] """Iterable of names of shared functions exported by the module."""
[docs] def _paths_with_module(self, roots): # type: (List[str]) -> List[str] """ Return paths cretaed by joining roots with module's unique name. :param list(str) roots: List of root directories. """ assert self.unique_name is not None return [os.path.join(root, self.unique_name) for root in roots]
def __init__(self, glue, name): # type: (Glue, str) -> None # we need to save the unique name in case there are more aliases available self.unique_name = name super(Module, self).__init__(ModuleAdapter(glue.logger, self)) self.glue = glue # initialize data path if exists, else it will be None self.data_path = None for path in self._paths_with_module(self.glue.module_data_paths): if not os.path.exists(path): continue self.data_path = path self.debug('data file is {}'.format(path)) break else: self.debug('no data file found') self._overloaded_shared_functions = {} # type: Dict[str, SharedType] @property def dryrun_level(self): # type: () -> int return self.glue.dryrun_level
[docs] def parse_config(self): # type: () -> None self._parse_config(self._paths_with_module(self.glue.module_config_paths))
[docs] def _generate_shared_functions_help(self): # type: () -> str """ Generate help for shared functions provided by the module. :returns: Formatted help, describing module's shared functions. """ if not self.shared_functions: return '' from .help import functions_help functions = [] for name in self.shared_functions: if not hasattr(self, name): raise GlueError("No such shared function '{}' of module '{}'".format(name, self.unique_name)) functions.append((name, getattr(self, name))) return ensure_str( jinja2.Template( trim_docstring(""" {{ '** Shared functions **' | style(fg='yellow') }} {{ FUNCTIONS }} """) ).render(FUNCTIONS=functions_help(functions)) )
[docs] def parse_args(self, args): # type: (Any) -> None epilog = [ '' if self.options_note is None else docstring_to_help(self.options_note), self._generate_shared_functions_help(), eval_context_help(self) ] self._parse_args(args, usage='{} [options]'.format(Colors.style(self.unique_name, fg='cyan')), description=docstring_to_help(self.__doc__ or ''), epilog='\n'.join(epilog).strip(), formatter_class=LineWrapRawTextHelpFormatter)
[docs] def add_shared(self): # type: () -> None """ Add all shared functions declared by the module. """ for funcname in self.shared_functions: original_shared = self.glue.get_shared(funcname) if original_shared: self._overloaded_shared_functions[funcname] = original_shared self.glue.add_shared(funcname, self)
[docs] def has_shared(self, funcname): # type: (str) -> bool """ Check whether a shared function of a given name exists. :param str funcname: name of the shared function. :rtype: bool """ # A proxy for Glue's `has_shared`, exists to simplify modules. return self.glue.has_shared(funcname)
[docs] def require_shared(self, *names, **kwargs): # type: (*str, **str) -> bool """ Make sure given shared functions exist. :param tuple(str) names: iterable of shared function names. :param bool warn_only: if set, only warning is emitted. Otherwise, when any required shared function didn't exist, an exception is raised. """ # A proxy for Glue's `require_shared`, exists to simplify modules. return self.glue.require_shared(*names, **kwargs)
[docs] def get_shared(self, funcname): # type: (str) -> Optional[SharedType] """ Return a shared function. :param str funcname: name of the shared function. :returns: a callable (shared function), or ``None`` if no such shared function exists. """ # A proxy for Glue's `get_shared`, exists to simplify modules. return self.glue.get_shared(funcname)
[docs] def shared(self, funcname, *args, **kwargs): # type: (str, *Any, **Any) -> Any """ Call a shared function, passing it all positional and keyword arguments. """ # A proxy for Glue's `shared`, exists to simplify modules. return self.glue.shared(funcname, *args, **kwargs)
[docs] def overloaded_shared(self, funcname, *args, **kwargs): # type: (str, *Any, **Any) -> Any """ Call a shared function overloaded by the one provided by this module. This way, a module can give chance to other implementations of the same name, e.g. function named ``publish_message``, working with message bus A, would call previously shared function holding of this name, registered by a module earlier in the pipeline, which works with message bus B. """ # *Not* a proxy for Glue's `overloaded_shared` - Glue core doesn't care about one module overloading # another module's shared function, Glue handles just the whole pipelines. if funcname not in self._overloaded_shared_functions: return None self.debug("calling overloaded shared function '{}'".format(funcname)) return self._overloaded_shared_functions[funcname](*args, **kwargs)
[docs] def sanity(self): # type: () -> None # pylint: disable-msg=no-self-use """ In this method, modules can define additional checks before execution. Some examples: * Advanced checks on passed options * Check for additional requirements (tools, data, etc.) By default, this method does nothing. Reimplement as needed. """
[docs] def execute(self): # type: () -> None # pylint: disable-msg=no-self-use """ In this method, modules can perform any work they deemed necessary for completing their purpose. E.g. if the module promises to run some tests, this is the place where the code belongs to. By default, this method does nothing. Reimplement as needed. """
[docs] def destroy(self, failure=None): # type: (Optional[Failure]) -> None # pylint: disable-msg=no-self-use,unused-argument """ Here should go any code that needs to be run on exit, like job cleanup etc. :param gluetool.glue.Failure failure: if set, carries information about failure that made ``gluetool`` to destroy the whole session. Modules might want to take actions based on provided information, e.g. send different notifications. """
[docs] def run_module(self, module, args=None): # type: (str, Optional[List[str]]) -> None self.glue.run_module(module, args or [])
#: Describes one discovered ``gluetool`` module. #: #: :ivar Module klass: a module class. #: :ivar str group: group the module belongs to. DiscoveredModule = NamedTuple('DiscoveredModule', ( ('klass', Type[Module]), ('group', str) )) #: Module registry type. ModuleRegistryType = Dict[str, DiscoveredModule]
[docs]class Glue(Configurable): # pylint: disable=too-many-public-methods """ Main workhorse of the ``gluetool``. Manages modules, their instances and runs them as requested. :param gluetool.tool.Tool tool: If set, it's an ``gluetool``-like tool that created this instance. Some functionality may need it to gain access to bits like its command-name. :param gluetool.sentry.Sentry sentry: If set, it provides interface to Sentry. """ name = 'gluetool core' options = [ ('Global options', { ('l', 'list-modules'): { 'help': 'List all available modules. If a GROUP is set, limits list to the given module group.', 'action': 'append', 'nargs': '?', 'metavar': 'GROUP', 'const': True }, ('L', 'list-shared'): { 'help': 'List all available shared functions.', 'action': 'store_true', 'default': False }, ('E', 'list-eval-context'): { 'help': 'List all available variables provided by modules in their evaluation contexts.', 'action': 'store_true', 'default': False, }, ('r', 'retries'): { 'help': 'Number of retries', 'type': int, 'default': 0 }, ('V', 'version'): { 'help': 'Print version', 'action': 'store_true' }, 'no-sentry-exceptions': { 'help': 'List of exception names, which are not reported to Sentry (Default: none)', 'action': 'append', 'default': [] } }), ('Output control', { ('c', 'colors'): { 'help': 'Colorize logging on the terminal', 'action': 'store_true' }, ('d', 'debug'): { 'help': 'Log debugging messages to the terminal (WARNING: very verbose!).', 'action': 'store_true' }, ('i', 'info'): { 'help': 'Print command-line that would re-run the gluetool session', 'action': 'store_true' }, ('j', 'json-file'): { 'help': """ If set, all log messages (including ``VERBOSE``) are stored in this file in a form of JSON structures (default: %(default)s). """, 'default': None }, ('o', 'debug-file', 'output'): { 'help': 'Log messages with at least ``DEBUG`` level are sent to this file.' }, 'verbose-file': { 'help': 'Log messages with ``VERBOSE`` level sent to this file.', 'default': None }, ('p', 'pid'): { 'help': 'Log PID of gluetool process', 'action': 'store_true' }, ('q', 'quiet'): { 'help': 'Silence info messages', 'action': 'store_true' }, 'show-traceback': { 'help': """ Display exception tracebacks on terminal (besides the debug file when ``--debug-file`` is used) (default: %(default)s). """, 'action': 'store_true', 'default': False }, ('v', 'verbose'): { 'help': 'Log **all** messages to the terminal (WARNING: even more verbose than ``-d``!).', 'action': 'store_true' } }), ('Directories', { 'module-path': { 'help': 'Specify directory with modules.', 'metavar': 'DIR', 'action': 'append', 'default': [] }, 'module-data-path': { 'help': 'Specify directory with module data files.', 'metavar': 'DIR', 'action': 'append', 'default': [] }, 'module-config-path': { 'help': 'Specify directory with module configuration files.', 'metavar': 'DIR', 'action': 'append', 'default': [] }, 'module-entry-point': { 'help': 'Specify setuptools entry point for modules.', 'metavar': 'ENTRY-POINT', 'action': 'append', 'default': [] } }), ('Dry run options', { 'dry-run': { 'help': 'Modules that support this option will make no changes to the outside world.', 'action': 'store_true', 'default': False }, 'isolated-run': { 'help': 'Modules that support this option will not interact with the outside world.', 'action': 'store_true', 'default': False } }), { 'pipeline': { 'raw': True, 'help': 'List of modules and their options, passed after gluetool options.', 'nargs': argparse.REMAINDER } } ] @property def module_entry_points(self): # type: () -> List[str] """ List of setuptools entry points to which modules are attached. """ from .utils import normalize_multistring_option return normalize_multistring_option(self.option('module-entry-point')) or DEFAULT_MODULE_ENTRY_POINTS @property def module_paths(self): # type: () -> List[str] """ List of paths in which modules reside. """ from .utils import normalize_path_option return normalize_path_option(self.option('module-path')) or DEFAULT_MODULE_PATHS @property def module_data_paths(self): # type: () -> List[str] """ List of paths in which module data files reside. """ from .utils import normalize_path_option return normalize_path_option(self.option('module-data-path')) or [DEFAULT_DATA_PATH] @property def module_config_paths(self): # type: () -> List[str] """ List of paths in which module config files reside. """ from .utils import normalize_path_option return normalize_path_option(self.option('module-config-path')) or DEFAULT_MODULE_CONFIG_PATHS # pylint: disable=method-hidden
[docs] def sentry_submit_exception(self, *args, **kwargs): # type: (*Any, **Any) -> None """ Submits exceptions to the Sentry server. Does nothing by default, unless this instance is initialized with a :py:class:`gluetool.sentry.Sentry` instance which actually does the job. See :py:meth:`gluetool.sentry.Sentry.submit_exception`. """
# pylint: disable=method-hidden
[docs] def sentry_submit_message(self, *args, **kwargs): # type: (*Any, **Any) -> None """ Submits message to the Sentry server. Does nothing by default, unless this instance is initialized with a :py:class:`gluetool.sentry.Sentry` instance which actually does the job. See :py:meth:`gluetool.sentry.Sentry.submit_warning`. """
[docs] def add_shared(self, funcname, module): # type: (str, Module) -> None """ Add a shared function. Overwrites previously registered shared function of the same name. :param str funcname: name of the shared function. :param Module module: module providing the shared function. """ # A proxy for current pipeline's `add-shared.` self.current_pipeline.add_shared(funcname, module)
[docs] def has_shared(self, funcname): # type: (str) -> bool """ Check whether a shared function of a given name exists. :param str funcname: name of the shared function. :rtype: bool """ # Check all running pieplines, start with the most recent one. for pipeline in reversed(self.pipelines): if pipeline.has_shared(funcname): return True return False
[docs] def require_shared(self, *names, **kwargs): # type: (*str, **str) -> bool """ Make sure given shared functions exist. :param tuple(str) names: iterable of shared function names. :param bool warn_only: if set, only warning is emitted. Otherwise, when any required shared function didn't exist, an exception is raised. """ warn_only = kwargs.get('warn_only', False) def _check(name): # type: (str) -> bool if self.has_shared(name): return True # pylint: disable=line-too-long msg = "Shared function '{}' is required. See `gluetool -L` to find out which module provides it.".format(name) # Ignore PEP8Bear if warn_only is True: self.warn(msg, sentry=True) return False raise GlueError(msg) return all([_check(name) for name in names])
[docs] def get_shared(self, funcname): # type: (str) -> Optional[SharedType] """ Return a shared function. :param str funcname: name of the shared function. :returns: a callable (shared function), or ``None`` if no such shared function exists. """ # Check all running pieplines, start with the most recent one. for pipeline in reversed(self.pipelines): if pipeline.has_shared(funcname): return pipeline.get_shared(funcname) return None
[docs] def shared(self, funcname, *args, **kwargs): # type: (str, *Any, **Any) -> Any """ Call a shared function, passing it all positional and keyword arguments. """ func = self.get_shared(funcname) if not func: return None return func(*args, **kwargs)
@property def eval_context(self): # type: () -> Dict[str, Any] """ Returns "global" evaluation context - some variables that are nice to have in all contexts. """ # pylint: disable=unused-variable __content__ = { # noqa 'ENV': 'Dictionary representing environment variables.', 'PIPELINE': 'Current pipeline, represented as a list of ``PipelineStep`` instances.' } return { 'ENV': dict(os.environ), 'PIPELINE': self.current_pipeline.steps }
[docs] def _eval_context_module_caller(self): # type: () -> Optional[Module] """ Infere module instance calling the eval context shared function. :rtype: gluetool.glue.Module """ stack = inspect.stack() log_dict(self.verbose, 'stack', stack) # When being called as a regular shared function, the call stack layout should be # as follows: # # Glue._eval_context_module_caller - this helper method, caller of inspect.stack() # Glue._eval_context - our caller, the actual shared function body # Glue.shared - shared function call dispatcher of Glue class, calls Glue._eval_context # Module.shared - shared function call dispatcher of Module class, calls Glue.shared internally # pylint: disable=too-many-boolean-expressions if len(stack) < 4 \ or stack[0][3] != '_eval_context_module_caller' \ or stack[1][3] != '_eval_context' \ or stack[2][3] != 'shared' \ or stack[3][3] != 'shared' \ or 'self' not in stack[3][0].f_locals: self.warn('Cannot infer calling module of eval_context') return None return cast(Module, stack[3][0].f_locals['self'])
[docs] def _eval_context(self): # type: () -> Dict[str, Any] """ Gather contexts of all modules in a pipeline and merge them together. **Always** returns a unique dictionary object, therefore it is safe for caller to update it. The return value is not cached in any way, therefore the change if its content won't affect future callers. Provided as a shared function, registered by the Glue instance itself. :rtype: dict """ self.debug('gather pipeline eval context') context = { 'MODULE': self._eval_context_module_caller() } # 1st "module" is always this instance of ``Glue``. # Then we walk through all pipelines, from the oldest to the most recent ones, and call their modules # in the order they were specified. context.update(self.eval_context) for pipeline in self.pipelines: for module in pipeline.modules: context.update(module.eval_context) log_dict(self.verbose, 'eval context', context) return context
@property def current_pipeline(self): # type: () -> Pipeline return self.pipelines[-1] @property def current_module(self): # type: () -> Optional[Module] return self.current_pipeline.current_module # # Module discovery and loading #
[docs] def _register_module(self, registry, group_name, klass, filepath): # type: (ModuleRegistryType, str, Type[Module], str) -> None """ Register one discovered ``gluetool`` module. :param dict(str, DiscoveredModule) registry: module registry to add module to. :param str group_name: group the module belongs to. :param Module klass: module class. :param str filepath: path to a file module comes from. """ names = getattr(klass, 'name', None) # type: Union[None, List[str], Tuple[str]] if not names: raise GlueError('No name specified by module class {}:{}'.format(filepath, klass.__name__)) def _do_register_module(name): # type: (str) -> None if name in registry: raise GlueError("Name '{}' of class {}:{} is a duplicate module name".format( name, filepath, klass.__name__ )) self.debug("registering module '{}' from {}:{}".format(name, filepath, klass.__name__)) registry[name] = DiscoveredModule(klass, group_name) if isinstance(names, (list, tuple)): for alias in names: _do_register_module(alias) else: _do_register_module(names)
[docs] def _check_pm_file(self, filepath): # type: (str) -> bool """ Make sure a file looks like a ``gluetool`` module: - can be processed by Python parser, - imports :py:class:`gluetool.glue.Glue` and :py:class:`gluetool.glue.Module`, - contains child class of :py:class:`gluetool.glue.Module`. :param str filepath: path to a file. :returns: ``True`` if file contains ``gluetool`` module, ``False`` otherwise. :raises gluetool.glue.GlueError: when it's not possible to finish the check. """ self.debug("check possible module file '{}'".format(filepath)) try: with open(filepath) as f: node = ast.parse(f.read()) # check for gluetool import def imports_gluetool(item): # type: (Any) -> bool """ Return ``True`` if item is an ``import`` statement, and imports ``gluetool``. """ class_name = item.__class__.__name__ is_import = class_name == 'Import' and item.names[0].name == 'gluetool' is_import_from = class_name == 'ImportFrom' and item.module == 'gluetool' return cast(bool, is_import or is_import_from) if not any((imports_gluetool(item) for item in node.__dict__['body'])): self.debug(" no 'import gluetool' found") return False # check for gluetool.Module class definition def has_module_class(item): # type: (Any) -> bool """ Return ``True`` if item is a class definition, and any of the base classes is gluetool.glue.Module. """ if item.__class__.__name__ != 'ClassDef': return False for base in item.bases: if (hasattr(base, 'id') and base.id == 'Module') \ or (hasattr(base, 'attr') and base.attr == 'Module'): return True return False if not any((has_module_class(item) for item in node.__dict__['body'])): self.debug(' no child of gluetool.Module found') return False return True # pylint: disable=broad-except except Exception as e: raise GlueError("Unable to check check module file '{}': {}".format(filepath, e))
[docs] def _do_import_pm(self, filepath, pm_name): # type: (str, str) -> Any """ Attempt to import a file as a Python module. :param str filepath: a file to import. :param str pm_name: name assigned to the imported module. :returns: imported Python module. :raises gluetool.glue.GlueError: when import failed. """ self.debug("try to import '{}' as a module '{}'".format(filepath, pm_name)) try: with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) pm = imp.load_source(pm_name, filepath) self.debug('imported file {} as a Python module {}'.format(filepath, pm_name)) return pm # pylint: disable=broad-except except Exception as exc: raise GlueError("Unable to import file '{}' as a module: {}".format(filepath, exc))
[docs] def _import_pm(self, filepath, pm_name): # type: (str, str) -> Any """ If a file contains ``gluetool`` modules, import the file as a Python module. If the file does not look like it contains ``gluetool`` modules, or when it's not possible to import the Python module successfully, method simply warns user and ignores the file. :param str filepath: file to load. :param str pm_name: Python module name for the loaded module. :returns: loaded Python module. :raises gluetool.glue.GlueError: when import failed. """ # Check content of the file, look for Glue and Module stuff. try: if not self._check_pm_file(filepath): return except GlueError as exc: self.warn("ignoring file '{}': {}".format(filepath, exc)) return # Try to import the file. try: pm = self._do_import_pm(filepath, pm_name) except GlueError as exc: self.warn("ignoring file '{}': {}".format(filepath, exc)) return return pm
[docs] def _discover_gm_in_file(self, registry, filepath, pm_name, group_name): # type: (ModuleRegistryType, str, str, str) -> None """ Discover ``gluetool`` modules in a file. Attempts to import the file as a Python module, and then checks its content and looks for ``gluetool`` modules. :param dict(str, DiscoveredModule) registry: registry of modules to which new ones would be added. :param str filepath: path to a file. :param str pm_name: a Python module name to use for the imported module. :param str group_name: a ``gluetool`` module group name assigned to ``gluetool`` modules discovered in the file. """ pm = self._import_pm(filepath, pm_name) if not pm: return # Look for gluetool modules in imported Python module's members, and register them. for _, member in inspect.getmembers(pm, inspect.isclass): if not isinstance(member, type) or not issubclass(member, Module) or member == Module: continue assert issubclass(member, Module) self._register_module(registry, group_name, member, filepath)
[docs] def _discover_gm_in_dir(self, dirpath, registry, pm_prefix): # type: (str, ModuleRegistryType, str) -> None """ Discover ``gluetool`` modules in a directory tree. In essence, it scans directory and its subdirectories for files with ``.py`` suffix, and searches for classes derived from :py:class:`gluetool.glue.Module` in these files. :param str dirpath: path to a directory. :param dict(str, DiscoveredModule) registry: registry of modules to which new ones would be added. :param str pm_prefix: a string used to prefix all imported Python module names. """ self.debug('discovering modules in directory {}'.format(dirpath)) for root, _, files in os.walk(dirpath): for filename in sorted(files): if not filename.endswith('.py'): continue # A group of the module is defined by the directories it lies in under the ``dirpath``. if root == dirpath: group_name = '' else: group_name = root.replace(dirpath + os.sep, '') pm_name = '{}.{}.{}'.format( pm_prefix, group_name.replace(os.sep, '.'), os.path.splitext(filename)[0] ) self._discover_gm_in_file(registry, os.path.join(root, filename), pm_name, group_name)
[docs] def _discover_gm_in_entry_point(self, entry_point, registry): # type: (str, ModuleRegistryType) -> None self.debug('discovering modules in entry point {}'.format(entry_point)) for ep_entry in pkg_resources.iter_entry_points(entry_point): klass = ep_entry.load() assert ep_entry.dist is not None self._register_module(registry, getattr(klass, 'group', ''), klass, ep_entry.dist.location)
[docs] def discover_modules(self, entry_points=None, paths=None): # type: (Optional[List[str]], Optional[List[str]]) -> ModuleRegistryType """ Discover and load all accessible modules. Two sources are examined: 1. entry points, handled by setuptools, to which Python packages can attach ``gluetool`` modules they provide, 2. directory trees. :param list(str) entry_points: list of entry point names to which ``gluetool`` modules are attached. If not set, entry points set byt the configuration (``--module-entry-point`` option) are used. :param list(str) paths: list of directories to search for ``gluetool`` modules. If not set, paths set by the configuration (``--module-path`` option) are used. :rtype: dict(str, DiscoveredModule) :returns: mapping between module names and ``DiscoveredModule`` instances, describing each module. """ entry_points = entry_points or self.module_entry_points paths = paths or self.module_paths log_dict(self.debug, 'discovering modules under following entry points', entry_points) log_dict(self.debug, 'discovering modules under following paths', paths) modules_registry = {} # type: ModuleRegistryType for entry_point in entry_points: self._discover_gm_in_entry_point(entry_point, modules_registry) for path in paths: self._discover_gm_in_dir(path, modules_registry, 'gluetool.file_modules') log_dict(self.debug, 'discovered modules', modules_registry) return modules_registry
def __init__(self, tool=None, sentry=None): # type: (Optional[Any], Optional[Any]) -> None # Initialize logging methods before doing anything else. # Right now, we don't know the desired log level, or if # output file is in play, just get simple logger before # the actual configuration is known. self._sentry = sentry if sentry is not None: self.sentry_submit_exception = sentry.submit_exception # type: ignore self.sentry_submit_message = sentry.submit_message # type: ignore Logging.setup_logger(sentry=sentry) super(Glue, self).__init__(Logging.get_logger()) self.tool = tool self._dryrun_level = DryRunLevels.DEFAULT # module types dictionary self.modules = {} # type: ModuleRegistryType # Pipeline stack - start with a mock pipeline: we need a place to register our shared functions. self.pipelines = [ Pipeline(self, []) ] # pylint: disable=protected-access self.current_pipeline._add_shared('eval_context', self, self._eval_context) # pylint: disable=arguments-differ
[docs] def parse_config(self, paths): # type: ignore # signature differs on purpose # type: (List[str]) -> None self._parse_config(paths)
[docs] def parse_args(self, args): # type: (Any) -> None module_dirs = '\n'.join([' - {}'.format(directory) for directory in DEFAULT_MODULE_PATHS]) data_dirs = '\n'.join([' - {}'.format(directory) for directory in [DEFAULT_DATA_PATH]]) module_config_dirs = '\n'.join([' - {}'.format(directory) for directory in DEFAULT_MODULE_CONFIG_PATHS]) epilog = trim_docstring(""" Default paths: * modules are searched under (--module-path): {} * module data files are searched under (--module-data-path): {} * module configuration files are searched under (--module-config-path): {} Supported environment variables: * GLUETOOL_TRACING_DISABLE - when set, tracing won't be enabled * GLUETOOL_TRACING_SERVICE_NAME (string) - name of the trace produced by tool execution * GLUETOOL_TRACING_REPORTING_HOST (string) - a hostname where tracing collector listens * GLUETOOL_TRACING_REPORTING_PORT (int) - a port on which tracing collector listens """).format(module_dirs, data_dirs, module_config_dirs) self._parse_args(args, usage='%(prog)s [options] [module1 [module1 options] module2 [module2 options] ...]', epilog=epilog, formatter_class=LineWrapRawTextHelpFormatter) # re-create logger - now we have all necessary configuration if self.option('verbose'): level = VERBOSE elif self.option('debug'): level = logging.DEBUG elif self.option('quiet'): level = logging.WARNING else: level = logging.INFO from .utils import normalize_bool_option switch_colors(normalize_bool_option(self.option('colors'))) debug_file = self.option('debug-file') verbose_file = self.option('verbose-file') json_file = self.option('json-file') if debug_file and not verbose_file: verbose_file = '{}.verbose'.format(debug_file) Logging.setup_logger( level=level, debug_file=debug_file, verbose_file=verbose_file, json_file=json_file, sentry=self._sentry, show_traceback=normalize_bool_option(self.option('show-traceback')) ) if level == logging.DEBUG and not verbose_file: # pylint: disable=line-too-long self.warn('Debug output enabled but no verbose destination set.') self.warn('Either use ``-v`` to display verbose messages on screen, or ``--verbose-file`` to store them in a file.') if self.option('isolated-run'): self._dryrun_level = DryRunLevels.ISOLATED elif self.option('dry-run'): self._dryrun_level = DryRunLevels.DRY
@property def dryrun_level(self): # type: () -> int return self._dryrun_level
[docs] def init_module(self, module_name, actual_module_name=None): # type: (str, Optional[str]) -> Module """ Given a name of the module, create its instance and give it a name. :param str module_name: Name under which will be the module instance known. :param str actual_module_name: Name of the module to instantiate. It does not have to match ``module_name`` - ``actual_module_name`` refers to the list of known ``gluetool`` modules while ``module_name`` is basically an arbitrary name new instance calls itself. If it's not set, which is the most common situation, it defaults to ``module_name``. :returns: A :py:class:`Module` instance. """ actual_module_name = actual_module_name or module_name klass = self.modules[actual_module_name].klass # type: Type[Module] return klass(self, module_name)
[docs] def run_pipeline(self, pipeline): # type: (Pipeline) -> PipelineReturnType self.pipelines.append(pipeline) try: return pipeline.run() finally: self.pipelines.pop(-1)
[docs] def run_modules(self, steps): # type: (PipelineStepsType) -> PipelineReturnType return self.run_pipeline(Pipeline(self, steps))
[docs] def run_module(self, module_name, module_argv=None, actual_module_name=None): # type: (str, Optional[List[str]], Optional[str]) -> PipelineReturnType """ Syntax sugar for :py:meth:`run_modules`, in the case you want to run just a one-shot module. :param str module_name: Name under which will be the module instance known. :param list(str) module_argv: Arguments of the module. :param str actual_module_name: Name of the module to instantiate. It does not have to match ``module_name`` - ``actual_module_name`` refers to the list of known ``gluetool`` modules while ``module_name`` is basically an arbitrary name new instance calls itself. If it's not set, which is the most common situation, it defaults to ``module_name``. """ step = PipelineStepModule(module_name, actual_module=actual_module_name, argv=module_argv) return self.run_modules([step])
[docs] def modules_as_groups(self, modules=None): # type: (Optional[ModuleRegistryType]) -> Dict[str, ModuleRegistryType] """ Gathers modules by their groups. :rtype: dict(str, dict(str, DiscoveredModule)) :returns: dictonary where keys represent module groups, and values are mappings between module names and the corresponding modules. """ modules = modules or self.modules groups = collections.defaultdict(dict) # type: Dict[str, ModuleRegistryType] for name, module_info in iteritems(modules): groups[module_info.group][name] = module_info return groups
[docs] def modules_descriptions(self, modules=None, groups=None): # type: (Optional[ModuleRegistryType], Optional[List[str]]) -> str """ Returns a string with modules and their descriptions. :param dict(str, DiscoveredModule) modules: mapping with modules. If not set, current known modules are used. :param list(str) groups: if set, limit descriptions to modules belinging to the given groups. :rtype: str """ modules = modules or self.modules as_groups = self.modules_as_groups(modules=modules) # List of lines, will be merged with `\n` before printing. descriptions = [] # type: List[str] if groups: descriptions += [ 'Available modules in group(s) {}'.format(', '.join(groups)) ] else: descriptions += [ 'Available modules' ] def _add_no_modules(): # type: () -> None descriptions.extend([ '', ' -- no modules found --' ]) if not as_groups: _add_no_modules() else: descriptions += [ '' ] for group_name, group in sorted(iteritems(as_groups)): # skip groups that are not in the list # note that groups is None if all groups should be shown if groups and group_name not in groups: continue group = as_groups[group_name] if not group: _add_no_modules() continue for module_name, module in sorted(iteritems(group)): # Indent module name by 4 spaces, and reserve 32 characters for each module name, # starting all descriptions at the same offset. descriptions.append(' {:32} {}'.format(module_name, module.klass.description)) return '\n'.join(descriptions)