Source code for recipe.ingredients

from functools import total_ordering
from uuid import uuid4
from sqlalchemy import Float, String, and_, between, case, cast, func, or_, text, not_
from recipe.exceptions import BadIngredient
from recipe.utils import AttrDict, filter_to_string
from recipe.utils.datatype import (
    convert_date,
    convert_datetime,
    determine_datatype,
    datatype_from_column_expression,
)
from typing import List

ALLOWED_OPERATORS = set(
    [
        "eq",
        "ne",
        "lt",
        "lte",
        "gt",
        "gte",
        "is",
        "isnot",
        "like",
        "ilike",
        "quickselect",
        "in",
        "notin",
        "between",
    ]
)


def is_nested_condition(v) -> bool:
    return isinstance(v, dict) and "operator" in v and "value" in v


def contains_complex_values(v: List) -> bool:
    """Check if any of the values in a list requires special handling to filter on"""
    return None in v or any(map(is_nested_condition, v))


[docs]@total_ordering class Ingredient(object): """Ingredients combine to make a SQLAlchemy query. Any unknown keyword arguments provided to an Ingredient during initialization are stored in a meta object. .. code:: python # icon is an unknown keyword argument m = Metric(func.sum(MyTable.sales), icon='cog') print(m.meta.icon) >>> 'cog' This meta storage can be used to add new capabilities to ingredients. Args: id (:obj:`str`): An id to identify this Ingredient. If ingredients are added to a Shelf, the id is automatically set as the key in the shelf. columns (:obj:`list` of :obj:`ColumnElement`): A list of SQLAlchemy columns to use in a query select. filters (:obj:`list` of :obj:`BinaryExpression`): A list of SQLAlchemy BinaryExpressions to use in the .filter() clause of a query. havings (:obj:`list` of :obj:`BinaryExpression`): A list of SQLAlchemy BinaryExpressions to use in the .having() clause of a query. columns (:obj:`list` of :obj:`ColumnElement`): A list of SQLAlchemy columns to use in the `group_by` clause of a query. formatters: (:obj:`list` of :obj:`callable`): A list of callables to apply to the result values. If formatters exist, property `{ingredient.id}_raw` will exist on each result row containing the unformatted value. cache_context (:obj:`str`): Extra context when caching this ingredient. DEPRECATED ordering (`string`, 'asc' or 'desc'): One of 'asc' or 'desc'. 'asc' is the default value. The default ordering of this ingredient if it is used in a ``recipe.order_by``. This is added to the ingredient when the ingredient is used in a ``recipe.order_by``. group_by_strategy (:obj:`str`): A strategy to use when preparing group_bys for the query "labels" is the default strategy which will use the labels assigned to each column. "direct" will use the column expression directly. This alternative is useful when there might be more than one column with the same label being used in the query. quickselects (:obj:`list` of named filters): A list of named filters that can be accessed through ``build_filter``. Named filters are dictionaries with a ``name`` (:obj:str) property and a ``condition`` property (:obj:`BinaryExpression`) datatype (:obj:`str`): The identified datatype (num, str, date, bool, datetime) of the parsed expression datatype_by_role (:obj:`dict`): The identified datatype (num, str, date, bool, datetime) for each role. Returns: An Ingredient object. """ def __init__(self, **kwargs): self.id = kwargs.pop("id", uuid4().hex[:12]) self.columns = kwargs.pop("columns", []) self.filters = kwargs.pop("filters", []) self.havings = kwargs.pop("havings", []) self.group_by = kwargs.pop("group_by", []) self.formatters = kwargs.pop("formatters", []) self.quickselects = kwargs.pop("quickselects", []) self.column_suffixes = kwargs.pop("column_suffixes", None) self.cache_context = kwargs.pop("cache_context", "") self.datatype = kwargs.pop("datatype", None) self.datatype_by_role = kwargs.pop("datatype_by_role", dict()) self.anonymize = False self.roles = {} self._labels = [] self.error = kwargs.pop("error", None) # What order should this be in self.ordering = kwargs.pop("ordering", "asc") self.group_by_strategy = kwargs.pop("group_by_strategy", "labels") if not isinstance(self.formatters, (list, tuple)): raise BadIngredient( "formatters passed to an ingredient must be a list or tuple" ) # If explicit suffixes are passed in, there must be one for each column if self.column_suffixes is not None and len(self.column_suffixes) != len( self.columns ): raise BadIngredient("column_suffixes must be the same length as columns") # Any remaining passed properties are available in self.meta self.meta = AttrDict(kwargs) def __hash__(self): return hash(self.describe()) def __repr__(self): return self.describe() def _stringify(self): """Return a relevant string based on ingredient type for repr and ordering. Ingredients with the same classname, id and _stringify value are considered the same.""" return " ".join(str(col) for col in self.columns)
[docs] def describe(self): """A string representation of the ingredient.""" return "({}){} {}".format(self.__class__.__name__, self.id, self._stringify())
def _format_value(self, value): """Formats value using any stored formatters.""" for f in self.formatters: value = f(value) return value
[docs] def make_column_suffixes(self): """Make sure we have the right column suffixes. These will be appended to `id` when generating the query. Developers note: These are generated when the query runs because the recipe may be run with anonymization on or off, which will inject a formatter. """ if self.column_suffixes: return self.column_suffixes if len(self.columns) == 0: return () elif len(self.columns) == 1: return ("_raw",) if self.formatters else ("",) else: raise BadIngredient( "column_suffixes must be supplied if there is more than one column" )
@property def query_columns(self): """Yield labeled columns to be used as a select in a query.""" self._labels = [] for column, suffix in zip(self.columns, self.make_column_suffixes()): self._labels.append(self.id + suffix) yield column.label(self.id + suffix) @property def order_by_columns(self): """Yield columns to be used in an order by using this ingredient. Column ordering is in reverse order of columns. When grouping, recipe supports two strategies. group_by_strategy == "labels" uses the labels added to columns. This is preferable and is supported by some databases. SQL Server requires grouping by the original column expressions """ # Ensure the labels are generated if not self._labels: list(self.query_columns) if self.group_by_strategy == "labels": if self.ordering == "desc": suffix = " DESC" else: suffix = "" return [ text(f"{lbl}{suffix}") for _, lbl in reversed(list(zip(self.columns, self._labels))) ] else: if self.ordering == "desc": return [col.desc() for col in reversed(self.columns)] else: return reversed(self.columns) @property def cauldron_extras(self): """Yield extra tuples containing a field name and a callable that takes a row. """ if self.formatters: yield self.id, lambda row: self._format_value( getattr(row, f"{self.id}_raw") ) def _order(self): """Ingredients are sorted by subclass then by id.""" if isinstance(self, Dimension): return (0, self.id) elif isinstance(self, Metric): return (1, self.id) elif isinstance(self, Filter): return (2, self.id) elif isinstance(self, Having): return (3, self.id) else: return (4, self.id) def __lt__(self, other): """Make ingredients sortable.""" return self._order() < other._order() def __eq__(self, other): """Make ingredients sortable.""" return self._order() == other._order() def __ne__(self, other): """Make ingredients sortable.""" return not (self._order() == other._order()) def _build_scalar_filter(self, value, operator=None, target_role=None): """Build a Filter given a single value. Args: value (a string, number, boolean or None): operator (`str`) A valid scalar operator. The default operator is `eq` target_role (`str`) An optional role to build the filter against Returns: A Filter object """ # Developer's note: Valid operators should appear in ALLOWED_OPERATORS # This is used by the AutomaticFilter extension. if operator is None: operator = "eq" if target_role and target_role in self.roles: filter_column = self.roles.get(target_role) datatype = determine_datatype(self, target_role) else: filter_column = self.columns[0] datatype = determine_datatype(self) # Ensure that the filter_column and value have compatible data types # Support passing ILIKE in Paginate extensions if datatype == "date": value = convert_date(value) elif datatype == "datetime": value = convert_datetime(value) if isinstance(value, str) and datatype != "str": filter_column = cast(filter_column, String) if operator == "eq": # Default operator is 'eq' so if no operator is provided, handle # like an 'eq' if value is None: return filter_column.is_(value) else: return filter_column == value if operator == "ne": return filter_column != value elif operator == "lt": return filter_column < value elif operator == "lte": return filter_column <= value elif operator == "gt": return filter_column > value elif operator == "gte": return filter_column >= value elif operator == "is": return filter_column.is_(value) elif operator == "isnot": return filter_column.isnot(value) elif operator == "like": value = str(value) return filter_column.like(value) elif operator == "ilike": value = str(value) return filter_column.ilike(value) elif operator == "quickselect": for qs in self.quickselects: if qs.get("name") == value: return qs.get("condition") raise ValueError( "quickselect {} was not found in " "ingredient {}".format(value, self.id) ) else: raise ValueError("Unknown operator {}".format(operator)) def _build_vector_filter(self, value, operator=None, target_role=None): """Build a Filter given a list of values. Args: value (a list of string, number, boolean or None): operator (:obj:`str`) A valid vector operator. The default operator is `in`. target_role (`str`) An optional role to build the filter against Returns: A Filter object """ # Developer's note: Valid operators should appear in ALLOWED_OPERATORS # This is used by the AutomaticFilter extension. if operator is None: operator = "in" if target_role and target_role in self.roles: filter_column = self.roles.get(target_role) datatype = determine_datatype(self, target_role) else: filter_column = self.columns[0] datatype = determine_datatype(self) if datatype == "date": value = list(map(convert_date, value)) elif datatype == "datetime": value = list(map(convert_datetime, value)) if operator == "and": conditions = [self.build_filter(x["value"], x["operator"]) for x in value] return and_(*conditions) if operator in ("in", "notin"): if contains_complex_values(value): # A list may contain additional operators or nones # Convert from: # department__in: [None, "A", 'B"] # # to the SQL # # department in ("A", "B") OR department is null simple_values = sorted( [v for v in value if v is not None and not is_nested_condition(v)] ) nested_conditions = [v for v in value if is_nested_condition(v)] conditions = [] if None in value: conditions.append(filter_column.is_(None)) if simple_values: conditions.append(filter_column.in_(simple_values)) conditions.extend( self.build_filter(cond["value"], operator=cond["operator"]) for cond in nested_conditions ) cond = or_(*conditions) else: # Sort to generate deterministic query sql for caching cond = filter_column.in_(sorted(value)) return not_(cond) if operator == "notin" else cond elif operator == "between": if len(value) != 2: ValueError( "When using between, you can only supply a " "lower and upper bounds." ) lower_bound, upper_bound = value return between(filter_column, lower_bound, upper_bound) elif operator == "quickselect": qs_conditions = [] for v in value: qs_found = False for qs in self.quickselects: if qs.get("name") == v: qs_found = True qs_conditions.append(qs.get("condition")) break if not qs_found: raise ValueError( "quickselect {} was not found in " "ingredient {}".format(value, self.id) ) return or_(*qs_conditions) else: raise ValueError("Unknown operator {}".format(operator))
[docs] def build_filter(self, value, operator=None, target_role=None): """ Builds a filter based on a supplied value and optional operator. If no operator is supplied an ``in`` filter will be used for a list and a ``eq`` filter if we get a scalar value. ``build_filter`` is used by the AutomaticFilter extension. Args: value: A value or list of values to operate against operator (:obj:`str`) An operator that determines the type of comparison to do against value. The default operator is 'in' if value is a list and 'eq' if value is a string, number, boolean or None. target_role (`str`) An optional role to build the filter against Returns: A SQLAlchemy boolean expression """ value_is_scalar = not isinstance(value, (list, tuple)) if value_is_scalar: return self._build_scalar_filter( value, operator=operator, target_role=target_role ) else: return self._build_vector_filter( value, operator=operator, target_role=target_role )
@property def expression(self): """An accessor for the SQLAlchemy expression representing this Ingredient.""" if self.columns: return self.columns[0] else: return None
[docs]class Filter(Ingredient): """A simple filter created from a single expression.""" def __init__(self, expression, **kwargs): super(Filter, self).__init__(**kwargs) self.filters = [expression] self.datatype = "bool" def _stringify(self): return filter_to_string(self) @property def expression(self): """An accessor for the SQLAlchemy expression representing this Ingredient.""" if self.filters: return self.filters[0] else: return None
[docs]class Having(Ingredient): """A Having that limits results based on an aggregate boolean clause""" def __init__(self, expression, **kwargs): super(Having, self).__init__(**kwargs) self.havings = [expression] self.datatype = "bool" def _stringify(self): return " ".join(str(expr) for expr in self.havings) @property def expression(self): """An accessor for the SQLAlchemy expression representing this Ingredient.""" if self.havings: return self.havings[0] else: return None
[docs]class Dimension(Ingredient): """A Dimension is an Ingredient that adds columns and groups by those columns. Columns should be non-aggregate SQLAlchemy expressions. The required expression supplies the dimension's "value" role. Additional expressions can be provided in keyword arguments with keys that look like "{role}_expression". The role is suffixed to the end of the SQL column name. For instance, the following .. code:: python Dimension(Hospitals.name, latitude_expression=Hospitals.lat longitude_expression=Hospitals.lng, id='hospital') would add columns named "hospital", "hospital_latitude", and "hospital_longitude" to the recipes results. All three of these expressions would be used as group bys. Two special roles that can be added are "id" and "order_by". If a keyword argument "id_expression" is passed, this expression will appear first in the list of columns and group_bys. This "id" will be used if you call `build_filter` on the dimension. If the keyword argument "order_by_expression" is passed, this expression will appear last in the list of columns and group_bys. The following additional keyword parameters are also supported: Args: lookup (:obj:`dict`): A dictionary that is used to map values to new values. Note: Lookup adds a ``formatter`` callable as the first item in the list of formatters. lookup_default (:obj:`object`) A default to show if the value can't be found in the lookup dictionary. Returns: A Filter object :param lookup: dict A dictionary to translate values into :param lookup_default: A default to show if the value can't be found in the lookup dictionary. """ def __init__(self, expression, **kwargs): super(Dimension, self).__init__(**kwargs) if self.datatype is None: self.datatype = datatype_from_column_expression(expression) # We must always have a value role self.roles = {"value": expression} for k, v in kwargs.items(): role = None if k.endswith("_expression"): # Remove _expression to get the role role = k[:-11] if role: if role == "raw": raise BadIngredient("raw is a reserved role in dimensions") self.roles[role] = v if not self.datatype_by_role: for k, expr in self.roles.items(): self.datatype_by_role[k] = datatype_from_column_expression(expr) self.columns = [] self._group_by = [] self.role_keys = [] if "id" in self.roles: self.columns.append(self.roles["id"]) self._group_by.append(self.roles["id"]) self.role_keys.append("id") if "value" in self.roles: self.columns.append(self.roles["value"]) self._group_by.append(self.roles["value"]) self.role_keys.append("value") # Add all the other columns in sorted order of role # with order_by coming last # For instance, if the following are passed # expression, id_expression, order_by_expresion, zed_expression the order of # columns would be "id", "value", "zed", "order_by" # When using group_bys for ordering we put them in reverse order. ordered_roles = [ k for k in sorted(self.roles.keys()) if k not in ("id", "value") ] # Move order_by to the end if "order_by" in ordered_roles: ordered_roles.remove("order_by") ordered_roles.append("order_by") for k in ordered_roles: self.columns.append(self.roles[k]) self._group_by.append(self.roles[k]) self.role_keys.append(k) if "lookup" in kwargs: self.lookup = kwargs.get("lookup") if not isinstance(self.lookup, dict): raise BadIngredient("lookup must be a dictionary") # Inject a formatter that performs the lookup if "lookup_default" in kwargs: self.lookup_default = kwargs.get("lookup_default") self.formatters.insert( 0, lambda value: self.lookup.get(value, self.lookup_default) ) else: self.formatters.insert(0, lambda value: self.lookup.get(value, value)) @property def group_by(self): # Ensure the labels are generated if not self._labels: list(self.query_columns) if self.group_by_strategy == "labels": return [lbl for _, lbl in zip(self._group_by, self._labels)] else: return self._group_by @group_by.setter def group_by(self, value): self._group_by = value @property def cauldron_extras(self): """Yield extra tuples containing a field name and a callable that takes a row """ # This will format the value field for extra in super(Dimension, self).cauldron_extras: yield extra yield self.id + "_id", lambda row: getattr(row, self.id_prop)
[docs] def make_column_suffixes(self): """Make sure we have the right column suffixes. These will be appended to `id` when generating the query. """ if self.formatters: value_suffix = "_raw" else: value_suffix = "" return tuple( value_suffix if role == "value" else "_" + role for role in self.role_keys )
@property def id_prop(self): """The label of this dimensions id in the query columns""" if "id" in self.role_keys: return self.id + "_id" else: # Use the value dimension if self.formatters: return self.id + "_raw" else: return self.id
[docs]class IdValueDimension(Dimension): """ DEPRECATED: A convenience class for creating a Dimension with a separate ``id_expression``. The following are identical. .. code:: python d = Dimension(Student.student_name, id_expression=Student.student_id) d = IdValueDimension(Student.student_id, Student.student_name) The former approach is recommended. Args: id_expression (:obj:`ColumnElement`) A column expression that is used to identify the id for a Dimension value_expression (:obj:`ColumnElement`) A column expression that is used to identify the value for a Dimension """ def __init__(self, id_expression, value_expression, **kwargs): kwargs["id_expression"] = id_expression super(IdValueDimension, self).__init__(value_expression, **kwargs)
[docs]class LookupDimension(Dimension): """DEPRECATED Returns the expression value looked up in a lookup dictionary""" def __init__(self, expression, lookup, **kwargs): """A Dimension that replaces values using a lookup table. :param expression: The dimension field :type value: object :param lookup: A dictionary of key/value pairs. If the keys will be replaced by values in the value of this Dimension :type operator: dict :param default: The value to use if a dimension value isn't found in the lookup table. The default behavior is to show the original value if the value isn't found in the lookup table. :type default: object """ if "default" in kwargs: kwargs["lookup_default"] = kwargs.pop("default") kwargs["lookup"] = lookup super(LookupDimension, self).__init__(expression, **kwargs)
[docs]class Metric(Ingredient): """A simple metric created from a single expression""" def __init__(self, expression, **kwargs): super(Metric, self).__init__(**kwargs) self.columns = [expression] if self.datatype is None: self.datatype = datatype_from_column_expression(expression) # We must always have a value role self.roles = {"value": expression}
[docs] def build_filter(self, value, operator=None): """Building filters with Metric returns Having objects.""" f = super().build_filter(value, operator=operator) return Having(f.filters[0])
[docs]class DivideMetric(Metric): """A metric that divides a numerator by a denominator handling several possible error conditions The default strategy is to add an small value to the denominator Passing ifzero allows you to give a different value if the denominator is zero. """ def __init__(self, numerator, denominator, **kwargs): ifzero = kwargs.pop("ifzero", "epsilon") epsilon = kwargs.pop("epsilon", 0.000000001) if ifzero == "epsilon": # Add an epsilon value to denominator to avoid divide by zero # errors expression = cast(numerator, Float) / ( func.coalesce(cast(denominator, Float), 0.0) + epsilon ) else: # If the denominator is zero, return the ifzero value otherwise do # the division expression = case( ((cast(denominator, Float) == 0.0, ifzero),), else_=cast(numerator, Float) / cast(denominator, Float), ) super(DivideMetric, self).__init__(expression, **kwargs)
[docs]class WtdAvgMetric(DivideMetric): """A metric that generates the weighted average of a metric by a weight.""" def __init__(self, expression, weight_expression, **kwargs): numerator = func.sum(expression * weight_expression) denominator = func.sum(weight_expression) super(WtdAvgMetric, self).__init__(numerator, denominator, **kwargs)
class InvalidIngredient(Ingredient): pass