Source code for cascade_at.model.grid_alchemy

from collections import defaultdict
from typing import Dict, Tuple, Optional
import numpy as np
import pandas as pd
import itertools

from cascade_at.core.log import get_loggers
from cascade_at.model.model import Model
from cascade_at.settings.settings import SettingsConfig
from cascade_at.inputs.locations import LocationDAG
from cascade_at.inputs.covariate_specs import CovariateSpecs
from cascade_at.model.utilities.grid_helpers import smooth_grid_from_smoothing_form
from cascade_at.model.utilities.grid_helpers import rectangular_data_to_var
from cascade_at.model.utilities.grid_helpers import constraint_from_rectangular_data
from cascade_at.model.utilities.grid_helpers import estimate_grid_from_draws
from cascade_at.settings.settings_config import Smoothing
from cascade_at.model.var import Var
from cascade_at.model.smooth_grid import SmoothGrid, _PriorGrid
from cascade_at.model.priors import _Prior

LOG = get_loggers(__name__)


MOST_DETAILED_CASCADE_LEVEL = 'most_detailed'


[docs]class Alchemy: def __init__(self, settings: SettingsConfig): """ An object initialized with model settings from cascade.settings.configuration.Configuration that can be used to construct parent-child location-specific models with the attribute ModelConstruct.construct_two_level_model(). Examples -------- >>> from cascade_at.settings.base_case import BASE_CASE >>> from cascade_at.settings.settings import load_settings >>> from cascade_at.inputs.measurement_inputs import MeasurementInputsFromSettings >>> settings = load_settings(BASE_CASE) >>> mc = Alchemy(settings) >>> i = MeasurementInputsFromSettings(settings) >>> i.get_raw_inputs() >>> mc.construct_two_level_model(location_dag=i.location_dag, >>> parent_location_id=102, >>> covariate_specs=i.covariate_specs) """ self.settings = settings self.age_time_grid = self.construct_age_time_grid() self.single_age_time_grid = self.construct_single_age_time_grid() self.model = None
[docs] def construct_age_time_grid(self) -> Dict[str, np.ndarray]: """ Construct a DEFAULT age-time grid, to be updated when we initialize the model. """ default_age_time = dict() default_age_time["age"] = np.linspace(0, 100, 21) default_age_time["time"] = np.linspace(1990, 2015, 6) for kind in ["age", "time"]: default_grid = getattr(self.settings.model, f"default_{kind}_grid") if default_grid is not None: default_age_time[kind] = np.sort(np.array(default_grid, dtype=float)) return default_age_time
[docs] def construct_single_age_time_grid(self) -> Tuple[np.ndarray, np.ndarray]: """ Construct a single age-time grid. Use this age and time when a smooth grid doesn't depend on age and time. """ single_age = self.age_time_grid["age"][:1] single_time = np.array([self.age_time_grid["time"][len(self.age_time_grid["time"]) // 2]]) single_age_time = (single_age, single_time) return single_age_time
[docs] def get_smoothing_grid(self, rate: Smoothing) -> SmoothGrid: """ Construct a smoothing grid for any rate in the model. Parameters ---------- rate Some smoothing form for a rate. Returns ------- The rate translated into a SmoothGrid based on the model settings' default age and time grids. """ return smooth_grid_from_smoothing_form( default_age_time=self.age_time_grid, single_age_time=self.single_age_time_grid, smooth=rate )
[docs] def get_all_rates_grids(self) -> Dict[str, SmoothGrid]: """ Get a dictionary of all the rates and their grids in the model. """ return {c.rate: self.get_smoothing_grid(rate=c) for c in self.settings.rate}
[docs] @staticmethod def override_priors(rate_grid: SmoothGrid, update_dict=Dict[str, np.ndarray], new_prior_distribution: Optional[str] = 'gaussian'): """ Override priors for rates. This is used when we want to do posterior to prior, so we are overriding the global settings with location-specific settings based on parent posteriors. Parameters ---------- rate_grid SmoothGrid object for a rate update_dict Dictionary with ages and times vectors and draws for values, dage, and dtime to use in overriding the prior. new_prior_distribution The new prior distribution to override the existing priors. """ # Check that the prior grid lines up with this rate # grid. If it doesn't, we have a problem. assert (update_dict['ages'] == rate_grid.ages).all() assert (update_dict['times'] == rate_grid.times).all() # For each of the types of priors, update rate_grid # with the new prior information from the update_prior # object that has info from a different model fit if 'value' in update_dict: estimate_grid_from_draws( grid_priors=rate_grid.value, draws=update_dict['value'], ages=rate_grid.ages, times=rate_grid.times, new_prior_distribution=new_prior_distribution ) if 'dage' in update_dict: estimate_grid_from_draws( grid_priors=rate_grid.dage, draws=update_dict['dage'], ages=rate_grid.ages[:-1], times=rate_grid.times, new_prior_distribution=new_prior_distribution ) if 'dtime' in update_dict: estimate_grid_from_draws( grid_priors=rate_grid.dtime, draws=update_dict['dtime'], ages=rate_grid.ages, times=rate_grid.times[:-1], new_prior_distribution=new_prior_distribution )
[docs] @staticmethod def apply_min_cv_to_prior_grid(prior_grid: _PriorGrid, min_cv: float, min_std: float = 1e-10) -> None: """ Applies the minimum coefficient of variation to a _PriorGrid to enforce that minCV across all variables in the grid. Updates the _PriorGrid in place. """ prior_grid.grid['std'] = prior_grid.grid.apply( lambda row: max(min_std, row['std'], np.abs(row['mean']) * min_cv ), axis=1 )
[docs] def construct_two_level_model(self, location_dag: LocationDAG, parent_location_id: int, covariate_specs: CovariateSpecs, weights: Optional[Dict[str, Var]] = None, omega_df: Optional[pd.DataFrame] = None, update_prior: Optional[Dict[str, Dict[str, np.ndarray]]] = None, min_cv: Optional[Dict[str, Dict[str, float]]] = None, update_mulcov_prior: Optional[Dict[Tuple[str, str, str], _Prior]] = None): """ Construct a Model object for a parent location and its children. Parameters ---------- location_dag Location DAG specifying the location hierarchy parent_location_id Parent location to build the model for covariate_specs covariate specifications, specifically will use covariate_specs.covariate_multipliers weights omega_df data frame with omega values in it (other cause mortality) update_prior dictionary of dictionary for prior updates to rates update_mulcov_prior dictionary of mulcov prior updates min_cv dictionary (can be defaultdict) for minimum coefficient of variation keyed by cascade level, then by rate """ children = location_dag.children(parent_location_id) cascade_level = str(location_dag.depth(parent_location_id)) # min_cv lookup expects a string key is_leaf = location_dag.is_leaf(parent_location_id) if is_leaf: cascade_level = MOST_DETAILED_CASCADE_LEVEL model = Model( nonzero_rates=self.settings.rate, parent_location=parent_location_id, child_location=children, covariates=covariate_specs.covariate_list, weights=weights ) # First construct the rate grid, and update with prior # information from a parent for value, dage, and dtime. for smooth in self.settings.rate: rate_grid = self.get_smoothing_grid(rate=smooth) if update_prior is not None: if smooth.rate in update_prior: self.override_priors(rate_grid=rate_grid, update_dict=update_prior[smooth.rate]) if min_cv is not None: self.apply_min_cv_to_prior_grid( prior_grid=rate_grid.value, min_cv=min_cv[cascade_level][smooth.rate] ) model.rate[smooth.rate] = rate_grid # Second construct the covariate grids for mulcov in covariate_specs.covariate_multipliers: grid = smooth_grid_from_smoothing_form( default_age_time=self.age_time_grid, single_age_time=self.single_age_time_grid, smooth=mulcov.grid_spec ) if update_mulcov_prior is not None and (mulcov.group, *mulcov.key) in update_mulcov_prior: ages = grid.ages times = grid.times for age, time in itertools.product(ages, times): lb = grid.value[age, time].lower ub = grid.value[age, time].upper update_mulcov_prior[(mulcov.group, *mulcov.key)].lower = lb update_mulcov_prior[(mulcov.group, *mulcov.key)].upper = ub grid.value[age, time] = update_mulcov_prior[(mulcov.group, *mulcov.key)] model[mulcov.group][mulcov.key] = grid # Construct the random effect grids, based on the parent location # specified. if self.settings.random_effect: random_effect_by_rate = defaultdict(list) for smooth in self.settings.random_effect: re_grid = smooth_grid_from_smoothing_form( default_age_time=self.age_time_grid, single_age_time=self.single_age_time_grid, smooth=smooth ) if not smooth.is_field_unset("location") and smooth.location in model.child_location: location = smooth.location else: location = None model.random_effect[(smooth.rate, location)] = re_grid random_effect_by_rate[smooth.rate].append(location) for rate_to_check, locations in random_effect_by_rate.items(): if locations != [None] and set(locations) != set(model.child_location): raise RuntimeError(f"Random effect for {rate_to_check} does not have " f"entries for all child locations, only {locations} " f"instead of {model.child_location}.") # Lastly, constrain omega for the parent and the random effects for the children. if self.settings.model.constrain_omega: LOG.info("Adding the omega constraint.") if omega_df is None: raise RuntimeError("Need an omega data frame in order to constrain omega.") parent_omega = omega_df.loc[omega_df.location_id == parent_location_id].copy() if parent_omega.empty: raise RuntimeError(f"No omega values for location {parent_location_id}.") omega = rectangular_data_to_var(gridded_data=parent_omega) model.rate["omega"] = constraint_from_rectangular_data( rate_var=omega, default_age_time=self.age_time_grid ) locations = set(omega_df.location_id.unique().tolist()) children_without_omega = set(children) - set(locations) if children_without_omega: LOG.warning(f"Children of {parent_location_id} missing omega {children_without_omega}" f"so not including child omega constraints") else: for child in children: child_omega = omega_df.loc[omega_df.location_id == child].copy() assert not child_omega.empty child_rate = rectangular_data_to_var(gridded_data=child_omega) def child_effect(age, time): return np.log(child_rate(age, time) / omega(age, time)) model.random_effect[("omega", child)] = constraint_from_rectangular_data( rate_var=child_effect, default_age_time=self.age_time_grid ) return model