import sys
from pathlib import Path
from typing import List, Union
import logging
import pandas as pd
from cascade_at.context.model_context import Context
from cascade_at.core.log import get_loggers, LEVELS
from cascade_at.dismod.api.dismod_io import DismodIO
from cascade_at.dismod.api.fill_extract_helpers.data_tables import prep_data_avgint
from cascade_at.dismod.api.fill_extract_helpers.posterior_to_prior import get_prior_avgint_grid
from cascade_at.dismod.api.multithreading import _DismodThread, dmdismod_in_parallel
from cascade_at.dismod.api.run_dismod import run_dismod_commands
from cascade_at.executor.args.arg_utils import ArgumentList
from cascade_at.executor.args.args import LogLevel, BoolArg, ListArg
from cascade_at.executor.args.args import ModelVersionID, ParentLocationID, SexID, NSim, NPool
from cascade_at.executor.dismod_db import save_predictions
from cascade_at.inputs.measurement_inputs import MeasurementInputs
from cascade_at.model.grid_alchemy import Alchemy
from cascade_at.model.utilities.integrand_grids import integrand_grids
from cascade_at.settings.settings import SettingsConfig
LOG = get_loggers(__name__)
ARG_LIST = ArgumentList([
ModelVersionID(),
ParentLocationID(),
SexID(),
NSim(),
NPool(),
ListArg('--child-locations', help='child locations to make predictions for', type=int, required=False),
ListArg('--child-sexes', help='sexes to make predictions for', type=int, required=False),
BoolArg('--prior-grid', help='whether to predict on the prior grid or the regular avgint grid'),
BoolArg('--save-fit', help='whether to save the results of the predict sample as the fit'),
BoolArg('--save-final', help='whether to save results as final'),
BoolArg('--sample', help='whether to predict from the sample table or the fit_var table'),
LogLevel()
])
[docs]def fill_avgint_with_priors_grid(inputs: MeasurementInputs, alchemy: Alchemy, settings: SettingsConfig,
source_db_path: Union[str, Path],
child_locations: List[int], child_sexes: List[int]):
"""
Fill the average integrand table with the grid that the priors are on.
This is so that we can "predict" the prior for the next level of the cascade.
Parameters
----------
inputs
An inputs object
alchemy
A grid alchemy object
settings
A settings configuration object
source_db_path
The path of the source database that has had a fit on it
child_locations
The child locations to predict for
child_sexes
The child sexes to predict for
"""
sourceDB = DismodIO(path=source_db_path)
rates = [r.rate for r in settings.rate]
grids = integrand_grids(alchemy=alchemy, integrands=rates)
posterior_grid = get_prior_avgint_grid(
grids=grids,
sexes=child_sexes,
locations=child_locations,
midpoint=False
)
posterior_grid = inputs.add_covariates_to_data(df=posterior_grid)
posterior_grid = prep_data_avgint(
df=posterior_grid,
node_df=sourceDB.node,
covariate_df=sourceDB.covariate
)
posterior_grid.rename(columns={'sex_id': 'c_sex_id'}, inplace=True)
sourceDB.avgint = posterior_grid
[docs]class Predict(_DismodThread):
"""
Predicts for a database in parallel. Chops up the sample table
into a bunch of copies, each with only one sample.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def _process(self, db: str):
dbio = DismodIO(path=db)
n_var = len(dbio.var)
this_sample = dbio.sample.loc[dbio.sample.sample_index == self.index].copy()
this_sample['sample_index'] = 0
this_sample['sample_id'] = this_sample['var_id']
dbio.sample = this_sample
del dbio
run_dismod_commands(
dm_file=db,
commands=[f'predict sample']
)
dbio = DismodIO(path=db)
predict = dbio.predict
predict['sample_index'] = self.index
return predict
[docs]def predict_sample_sequence(path: Union[str, Path], table: str):
"""
Runs predict for either fit_var or sample, based on the table.
"""
run_dismod_commands(
dm_file=path,
commands=[f'predict {table}']
)
[docs]def predict_sample_pool(main_db: Union[str, Path], index_file_pattern: str,
n_sim: int, n_pool: int):
"""
Run predict sample in a pool by making copies of the existing database
and splitting out the sample table into n_sim databases, running
predict sample on each of them, and combining the results back
into the main database.
"""
predict = Predict(
main_db=main_db,
index_file_pattern=index_file_pattern
)
predictions = dmdismod_in_parallel(
dm_thread=predict,
sims=list(range(n_sim)),
n_pool=n_pool
)
predictions = pd.DataFrame().append(predictions).reset_index(drop=True)
d = DismodIO(path=main_db)
return predictions[['sample_index', 'avgint_id', 'avg_integrand']]
[docs]def predict_sample(model_version_id: int, parent_location_id: int, sex_id: int,
child_locations: List[int], child_sexes: List[int],
prior_grid: bool = True, save_fit: bool = False, save_final: bool = False,
sample: bool = False, n_sim: int = 1, n_pool: int = 1) -> None:
"""
Takes a database that has already had a fit and simulate sample run on it,
fills the avgint table for the child_locations and child_sexes you want to make
predictions for, and then predicts on that grid. Makes predictions on the grid
that is specified for the primary rates in the model, for the primary rates only.
Parameters
----------
model_version_id
The model version ID
parent_location_id
The parent location ID that specifies where the database is stored
sex_id
The sex ID that specifies where the database is stored
child_locations
The child locations to make predictions for on the rate grid
child_sexes
The child sexes to make predictions for on the rate grid
prior_grid
Whether or not to replace the default gbd-avgint grid with
a prior grid for the rates.
save_fit
Whether or not to save the fit for upload later.
save_final
Whether or not to save the final for upload later.
sample
Whether to predict from the sample table or the fit_var table
n_sim
The number of simulations to predict for
n_pool
The number of multiprocessing pools to create. If 1, then will not
run with pools but just run all simulations together in one dmdismod command.
"""
predictions = None
context = Context(model_version_id=model_version_id)
inputs, alchemy, settings = context.read_inputs()
main_db = context.db_file(location_id=parent_location_id, sex_id=sex_id)
index_file_pattern = context.db_index_file_pattern(location_id=parent_location_id, sex_id=sex_id)
if sample:
table = 'sample'
else:
table = 'fit_var'
if prior_grid:
fill_avgint_with_priors_grid(
inputs=inputs, alchemy=alchemy, settings=settings, source_db_path=main_db,
child_locations=child_locations, child_sexes=child_sexes
)
if sample and (n_pool > 1):
predictions = predict_sample_pool(
main_db=main_db, index_file_pattern=index_file_pattern,
n_sim=n_sim, n_pool=n_pool
)
else:
predict_sample_sequence(path=main_db, table=table)
if save_fit or save_final:
if len(child_locations) == 0:
locations = inputs.location_dag.parent_children(parent_location_id)
else:
locations = child_locations
if len(child_sexes) == 0:
sexes = [sex_id]
else:
sexes = child_sexes
out_dirs = []
if save_fit:
out_dirs.append(context.fit_dir)
if save_final:
out_dirs.append(context.draw_dir)
for folder in out_dirs:
save_predictions(
db_file=main_db,
locations=locations, sexes=sexes,
model_version_id=model_version_id,
gbd_round_id=settings.gbd_round_id,
out_dir=folder,
sample=sample,
predictions=predictions
)
def main():
args = ARG_LIST.parse_args(sys.argv[1:])
logging.basicConfig(level=LEVELS[args.log_level])
predict_sample(
model_version_id=args.model_version_id,
parent_location_id=args.parent_location_id,
sex_id=args.sex_id,
child_locations=args.child_locations,
child_sexes=args.child_sexes,
prior_grid=args.prior_grid,
save_fit=args.save_fit,
save_final=args.save_final,
sample=args.sample,
n_sim=args.n_sim,
n_pool=args.n_pool
)
if __name__ == '__main__':
main()