Source code for cascade_at.model.model

from collections import Counter
from collections.abc import Mapping
from typing import List, Optional, Dict

import numpy as np

from cascade_at.dismod.constants import WeightEnum
from cascade_at.model.covariate import Covariate
from cascade_at.model.dismod_groups import DismodGroups
from cascade_at.model.var import Var
from cascade_at.model.smooth_grid import uninformative_grid_from_var


class Model(DismodGroups):
    """
    A DismodGroups container of SmoothGrid.

    Uses locations as given and translates them into nodes for Dismod-AT.
    Uses ages and times as given and translates them into ``age_id``
    and ``time_id`` for Dismod-AT.
    """

    def __init__(self, nonzero_rates: List[str], parent_location: int,
                 child_location: Optional[List[int]] = None,
                 covariates: Optional[List[Covariate]] = None,
                 weights: Optional[Dict[str, Var]] = None):
        """
        Build an empty model for one parent location and its children.

        >>> from cascade_at.inputs.locations import LocationDAG
        >>> locations = LocationDAG(location_set_version_id=429)
        >>> m = Model(["chi", "omega", "iota"], 6, locations.dag.successors(6))

        Parameters
        ----------
        nonzero_rates
            A list of rates, using the Dismod-AT terms for the rates,
            so they are "iota", "chi", "omega", "rho", and "pini".
        parent_location
            The location ID for the parent.
        child_location
            The children. Any iterable of location IDs is accepted; it is
            copied into a list.
        covariates
            A list of covariate objects. This supplies the reference values
            and max differences, used to exclude data by covariate value.
        weights
            There are four kinds of weights: "constant", "susceptible",
            "with_condition", and "total". No other weights are used.

        Raises
        ------
        ValueError
            If a location ID appears more than once among the children and
            the parent, or if a weight name is not a ``WeightEnum`` member.
        TypeError
            If a covariate is not a ``Covariate``, if ``weights`` is not a
            mapping, or if a weight is not a ``Var``.
        """
        super().__init__()
        self.nonzero_rates = nonzero_rates
        self.location_id = parent_location
        # Copy into a list so that iterators (such as the result of
        # ``dag.successors``), tuples and arrays work with the list
        # concatenation below and with later repeated iteration.
        self.child_location = list(child_location) if child_location is not None else []
        location_counts = Counter(self.child_location + [parent_location])
        double_locations = [loc for (loc, count) in location_counts.items() if count > 1]
        if double_locations:
            raise ValueError(f"Multiple locations have same ID {double_locations}")

        # Covariates are here because their reference values are part of
        # the model. Even though avgint and data use them, a model is always
        # written before the avgint and data are written.
        self.covariates = covariates if covariates else list()
        # These asserts are kept so the exception type seen by existing
        # callers does not change.
        assert isinstance(self.covariates, list)
        if len(self.covariates) > 0:
            assert isinstance(self.covariates[0], Covariate)
        self._check_covariates(self.covariates)

        # There are always four weights, constant, susceptible,
        # with_condition, and total.
        if weights:
            self.weights = weights
        else:
            self.weights = dict()
        self._check_weights(self.weights)

        self._scale = None
        self.scale_set_by_user = False

    @property
    def scale(self) -> Var:
        """The scale is a Var, so it has a value for every model variable.
        It is the value of the model variable at which to evaluate the
        derivative of its base log-likelihood. This derivative sets the
        baseline against which nonlinear optimization will compare later
        values. If the derivative is zero, then the optimization will
        ignore this variable.
        """
        return self._scale

    @scale.setter
    def scale(self, value):
        """Dismod-AT usually calculates this for you. If you've set it by
        hand, this records that this is the case so that it will rewrite
        what Dismod-AT calculates."""
        self.scale_set_by_user = True
        self._scale = value

    def model_like(self):
        """Make another model with the same structure as this one but
        without priors.

        The new model is constructed from this model's rates, locations,
        covariates and weights. If the scale was set by the user, the scale
        value is copied too.
        """
        model = Model(self.nonzero_rates, self.location_id, self.child_location,
                      self.covariates, self.weights)
        if self.scale_set_by_user:
            # NOTE(review): this assigns ``_scale`` directly, so the copy's
            # ``scale_set_by_user`` stays False. Confirm that is intended.
            model._scale = self._scale
        return model

    def get_age_array(self) -> np.ndarray:
        """
        Gets an array of ages used across grids in the model.

        The ages of every grid in every group are appended in iteration
        order; duplicates are not removed.
        """
        ages = np.empty((0,), dtype=float)
        for group in self.values():
            for grid in group.values():
                ages = np.append(ages, grid.ages)
        return ages

    def get_time_array(self) -> np.ndarray:
        """
        Gets an array of times used across grids in the model.

        The times of every grid in every group are appended in iteration
        order; duplicates are not removed.
        """
        times = np.empty((0,), dtype=float)
        for group in self.values():
            for grid in group.values():
                times = np.append(times, grid.times)
        return times

    def get_weights(self):
        """
        Gets the weights to be written for the model.

        Returns a shallow copy of ``self.weights`` in which every
        ``WeightEnum`` member that has no weight gets a constant weight
        with mean 1.0. ``self.weights`` itself is not modified.

        Raises
        ------
        StopIteration
            If the model has no rate grids yet.
        """
        weights = self.weights.copy()
        self._fill_missing_weights(weights)
        return weights

    def var_from_mean(self):
        """Build a DismodGroups of Vars from the mean of every grid.

        A random-effect grid whose key has ``None`` as its child applies to
        all children, so it is expanded into one Var per child location.
        """
        # Call the mean mu because mean is a function.
        mu = DismodGroups()
        for group_name, group in self.items():
            if group_name != "random_effect":
                for key, grid in group.items():
                    mu[group_name][key] = grid.var_from_mean()
            else:
                for key, grid in group.items():
                    # One Random Effect grid creates many child vars.
                    if key[1] is None:
                        for child in self.child_location:
                            mu[group_name][(key[0], child)] = grid.var_from_mean()
                    else:
                        mu[group_name][key] = grid.var_from_mean()
        return mu

    def __eq__(self, other):
        """Compare grids, rates, locations, covariates, weights and the
        user-set scale.

        Rates, child locations and covariates are compared as sets, so
        their order does not matter.
        """
        if not isinstance(other, type(self)):
            return NotImplemented
        if not super().__eq__(other):
            return False
        # NOTE(review): the scale check is one-directional. When only
        # ``other`` has a user-set scale this still compares equal, so
        # ``a == b`` and ``b == a`` can disagree. Left unchanged to
        # preserve existing comparison results.
        if self.scale_set_by_user and not (other.scale_set_by_user and self._scale == other._scale):
            return False
        return (
            set(self.nonzero_rates) == set(other.nonzero_rates)
            and self.location_id == other.location_id
            and set(self.child_location) == set(other.child_location)
            and set(self.covariates) == set(other.covariates)
            and self.weights == other.weights
        )

    def _fill_missing_weights(self, weights):
        """Add a constant weight of 1.0 to ``weights``, in place, for every
        ``WeightEnum`` member that is not already a key."""
        # Find an age and time already in the model because adding an
        # age and time outside the model can change the integration ranges.
        arbitrary_grid = next(iter(self.rate.values()))
        one_age_time = (arbitrary_grid.ages[0:1], arbitrary_grid.times[0:1])
        for kind in (weight.name for weight in WeightEnum):
            if kind not in weights:
                weights[kind] = Var(*one_age_time)
                weights[kind].grid.loc[:, "mean"] = 1.0

    def _ensure_weights(self):
        """If weights weren't made, then make them constant. Must be done
        after there is data in the Model. Modifies ``self.weights``."""
        self._fill_missing_weights(self.weights)

    def _check(self):
        """Raise ValueError if a rate has both a child-specific random
        effect and an all-children (child is ``None``) random effect."""
        child_specific_rate = dict()
        for rate, child in self.random_effect:
            child_specific = child is not None
            if rate in child_specific_rate and child_specific_rate[rate] != child_specific:
                raise ValueError(
                    f"Model random effect for {rate} has both child-specific "
                    f"and all-child specifications")
            else:
                child_specific_rate[rate] = child_specific

    @staticmethod
    def _check_covariates(covariates):
        """Raise TypeError if any entry is not a ``Covariate``."""
        for c in covariates:
            if not isinstance(c, Covariate):
                raise TypeError(f"Covariate passed to model isn't an instance of covariate {c}.")

    @staticmethod
    def _check_weights(weights):
        """Validate the weights mapping.

        Raises
        ------
        TypeError
            If ``weights`` is not a mapping or a value is not a ``Var``.
        ValueError
            If a key is not the name of a ``WeightEnum`` member.
        """
        if not isinstance(weights, Mapping):
            raise TypeError(f"Weights are a dictionary from string to Var classes, not {type(weights)}.")
        for name, weight in weights.items():
            if not isinstance(weight, Var):
                raise TypeError(f"Each weight should be a Var object, not {name}={type(weight)}.")
            # Check against the enum's member names only. ``dir(WeightEnum)``
            # also lists dunder attributes such as ``__doc__``, which would
            # let invalid names through.
            if name not in WeightEnum.__members__:
                raise ValueError(f"Weights should be one of {[w.name for w in WeightEnum]}")

    @classmethod
    def from_var(cls, var, parent_location, weights=None, covariates=None,
                 multiple_random_effects=False):
        """
        Given values across all rates, construct a model with loose priors
        in order to be able to predict from those rates.

        Args:
            var (DismodGroups[Var]): Values on grids.
            parent_location (int): A parent location, because that isn't
                in the keys.
            weights (Dict[Var]): Population weights for integrands.
            covariates(List[Covariate]): Covariate objects.
            multiple_random_effects (bool): Create a separate smooth grid for
                each random effect in the Var. The default is to create a
                single smooth grid for all random effects.

        Returns:
            Model: with Uniform distributions everywhere and no mulstd.
        """
        # The second element of each random-effect key is the child location.
        child_locations = sorted({k[1] for k in var.random_effect.keys()})
        nonzero_rates = list(var.rate.keys())
        model = cls(nonzero_rates, parent_location, child_locations,
                    weights=weights, covariates=covariates)

        # Only groups listed here get grids that must be positive.
        strictly_positive = dict(rate=True)
        for group_name, group in var.items():
            skip_re_children = group_name == "random_effect" and not multiple_random_effects
            # ``group_var`` rather than ``var`` so the parameter is not
            # rebound while its items are being iterated.
            for key, group_var in group.items():
                if skip_re_children:
                    # Make one smooth grid for all children.
                    assign_key = (key[0], None)
                else:
                    assign_key = key

                # With a shared key, only the first child creates the grid.
                if assign_key not in model[group_name]:
                    must_be_positive = strictly_positive.get(group_name, False)
                    model[group_name][assign_key] = uninformative_grid_from_var(group_var, must_be_positive)
        return model