from datetime import timedelta
from itertools import product
from math import nan, inf
import numpy as np
import pandas as pd
from cascade_at.core.log import get_loggers
from cascade_at.dismod.constants import PriorKindEnum
# Module-level logger obtained from the project's logging helper.
LOG = get_loggers(__name__)
# Reciprocal of the number of seconds in 365 days, i.e. one second expressed
# as a fraction of a 365-day year (presumably ages and times are measured in
# years -- TODO confirm). AgeTimeGrid.__setitem__ widens each requested
# age/time range by this amount on both ends before selecting grid points.
GRID_SNAP_DISTANCE = 1 / timedelta(days=365).total_seconds()
"""Times within one second are considered equal."""
class AgeTimeGrid:
    """The AgeTime grid holds rows of a table at each age and time value.

    At each age and time point is a DataFrame row consisting of the columns
    given in the constructor. Getting an item returns a DataFrame with those
    columns. Setting an item assigns those columns for every grid point in
    the requested age and time range.

    Besides the grid, each AgeTimeGrid carries one single-row "mulstd"
    DataFrame per member of ``PriorKindEnum`` (value, dage, dtime).

    >>> atg = AgeTimeGrid([0, 10, 20], [1990, 2000, 2010], ["height", "weight"])
    >>> atg[:, :] = [6.1, 195]
    >>> atg[10, 2000] = [5.7, 180]
    >>> atg[5:17, 1980:1990] = [5.5, 125]
    >>> assert (atg[20, 2000].weight == 195).all()
    >>> assert isinstance(atg[0, 1990], pd.DataFrame)

    Slices are accepted only when assigning. Reading requires one exact
    (age, time) point and returns a new DataFrame, so assigning to a column
    of the result does not modify the grid.

    If a column has the same name as a DataFrame method (mean), then read it
    with getitem,

    >>> mean_column = AgeTimeGrid([0], [1990], ["mean"])[0, 1990]["mean"]

    Why is this in Pandas, when it's a regular array of data with an
    index, which makes it better suited to XArray, or even a
    Numpy array? It needs to interface with a database representation,
    and Pandas is a better match there.
    """

    def __init__(self, ages, times, columns):
        """
        Args:
            ages: A scalar or array-like of ages, converted to sorted floats.
            times: A scalar or array-like of times, converted to sorted floats.
            columns (str or iterable of str): Names of the value columns
                stored at every grid point. All start out as NaN.

        Raises:
            TypeError: If ages or times cannot be converted to floats, or if
                columns is not a string or an iterable of strings.
        """
        try:
            self.ages = np.sort(np.atleast_1d(ages).astype(float))
            self.times = np.sort(np.atleast_1d(times).astype(float))
        except TypeError:
            raise TypeError(f"Ages and times should be arrays of floats {(ages, times)}.")
        type_constraint = "Columns should be either a string or an iterable of strings."
        if isinstance(columns, str):
            columns = [columns]
        try:
            self.columns = list(columns)
        except TypeError:
            raise TypeError(f"{type_constraint} {columns}")
        for column in self.columns:
            if not isinstance(column, str):
                raise TypeError(f"{type_constraint} {column}")

        # Build from self.columns, not the raw argument: a one-shot iterable
        # (generator) has already been exhausted by list() above.
        empty_columns = {column: nan for column in self.columns}

        # Ages vary slowest and times fastest, the same order age_time()
        # yields. repeat/tile also copes with an empty age or time axis.
        self.grid = pd.DataFrame(dict(
            age=np.repeat(self.ages, len(self.times)),
            time=np.tile(self.times, len(self.ages)),
        )).assign(**empty_columns)

        self._mulstd = dict()
        # Each mulstd is one record.
        for kind in PriorKindEnum:
            mulstd_df = pd.DataFrame(dict(
                age=[nan],
                time=[nan],
            ))
            self._mulstd[kind.name] = mulstd_df.assign(**empty_columns)

    @property
    def mulstd(self):
        """Dictionary from prior-kind name to its one-row mulstd DataFrame."""
        return self._mulstd

    def age_time(self):
        """Yield every (age, time) pair of the grid, ages varying slowest."""
        yield from zip(np.repeat(self.ages, len(self.times)), np.tile(self.times, len(self.ages)))

    def __getitem__(self, age_time):
        """
        Args:
            age_time (float, float): Gets all rows with this (age, time).

        Returns:
            pd.DataFrame restricted to the value columns.

        Raises:
            TypeError: If the index is not an (age, time) pair, or if either
                part is a slice.
            ValueError: If the index is iterable but does not unpack into
                exactly two values.
            KeyError: If no grid row has exactly this age and time.
        """
        try:
            age, time = age_time
        except TypeError as err:
            # Do not inspect the interpreter's message: its wording changed
            # between Python versions ("is not iterable" vs "non-iterable").
            raise TypeError(f"Index should be two floats for getting, not {age_time}.") from err
        if isinstance(age, slice) or isinstance(time, slice):
            raise TypeError("Cannot get a slice from an AgeTimeGrid.")
        # The query string refers to the local names `age` and `time`.
        rows = self.grid.query("age == @age and time == @time")
        if len(rows) > 0:
            return rows[self.columns]
        else:
            raise KeyError(f"Age {age} and time {time} not found.")

    def __setitem__(self, at_slice, value):
        """
        Args:
            at_slice (slice or float, slice or float): Which ages and times
                to change, given as values (not integer offsets). A slice
                selects every grid point whose age or time lies between its
                start and stop, both inclusive; a missing bound is unbounded.
                Each bound is widened by GRID_SNAP_DISTANCE.
            value: Assigned to the value columns of every selected row.

        Raises:
            ValueError: If the index is not a pair, if a slice has a step,
                or if no grid age or time falls inside the requested range.
        """
        try:
            if len(at_slice) != 2:
                raise ValueError("Set value at an age and time, so two arguments.")
        except TypeError:
            raise ValueError("Set value at an age and time, so two arguments")
        at_range = list()
        for one_slice in at_slice:
            if not isinstance(one_slice, slice):
                # A single point is the degenerate range [point, point].
                one_slice = slice(one_slice, one_slice)
            if one_slice.step is not None:
                raise ValueError("Slice in age or time, without a step.")
            start = one_slice.start if one_slice.start is not None else -inf
            stop = one_slice.stop if one_slice.stop is not None else inf
            at_range.append([start - GRID_SNAP_DISTANCE, stop + GRID_SNAP_DISTANCE])
        ages = self.ages[(at_range[0][0] <= self.ages) & (self.ages <= at_range[0][1])]
        times = self.times[(at_range[1][0] <= self.times) & (self.times <= at_range[1][1])]
        if len(ages) == 0:
            raise ValueError(f"No ages within range {at_range[0]} "
                             "Are you looking for a point not in the grid?")
        if len(times) == 0:
            raise ValueError(f"No times within range {at_range[1]} "
                             "Are you looking for a point not in the grid?")
        # np.isin replaces the deprecated np.in1d.
        selected = np.isin(self.grid.age, ages) & np.isin(self.grid.time, times)
        self.grid.loc[selected, self.columns] = value

    def __len__(self):
        return self.variable_count()

    def variable_count(self):
        """Number of grid points plus the number of mulstd records that have
        at least one non-NaN value column."""
        mulstd_cnt = sum(not df[self.columns].dropna(how="all").empty for df in self._mulstd.values())
        return self.ages.shape[0] * self.times.shape[0] + mulstd_cnt

    def __str__(self):
        return f"AgeTimeGrid({len(self.ages)}, {len(self.times)}) with {self.variable_count()} model variables."

    def __repr__(self):
        return f"AgeTimeGrid({self.ages}, {self.times})"

    def __eq__(self, other):
        """Two grids are equal when their mulstd records and grid frames match.

        The grid comparison ignores row and column order and compares values
        approximately (check_like=True, check_exact=False). Any mismatch,
        including a different shape or dtype, yields False rather than an
        exception.
        """
        if not isinstance(other, type(self)):
            LOG.debug(f"AgeTimeGrid not equal to {other}")
            return NotImplemented
        if set(self.mulstd.keys()) != set(other.mulstd.keys()):
            LOG.debug("Different number of mulstd keys")
            return False
        for mul_key in self.mulstd.keys():
            try:
                pd.testing.assert_frame_equal(self.mulstd[mul_key], other.mulstd[mul_key])
            except AssertionError:
                LOG.debug("assert frame equal false on mulstd")
                return False
        try:
            pd.testing.assert_frame_equal(self.grid, other.grid, check_like=True, check_exact=False)
        except AssertionError:
            LOG.debug("assert frame equal false on grid")
            return False
        return True