"""Container for the result of running thegenerate quantities (GQ) method"""fromcollectionsimportCounterfromtypingimport(Any,Dict,Generic,Hashable,List,MutableMapping,NoReturn,Optional,Tuple,TypeVar,Union,overload,)importnumpyasnpimportpandasaspdtry:importxarrayasxrXARRAY_INSTALLED=TrueexceptImportError:XARRAY_INSTALLED=Falsefromcmdstanpy.cmdstan_argsimportMethodfromcmdstanpy.utilsimportbuild_xarray_data,flatten_chains,get_loggerfromcmdstanpy.utils.stancsvimportscan_generic_csvfrom.mcmcimportCmdStanMCMCfrom.metadataimportInferenceMetadatafrom.mleimportCmdStanMLEfrom.runsetimportRunSetfrom.vbimportCmdStanVBFit=TypeVar('Fit',CmdStanMCMC,CmdStanMLE,CmdStanVB)
[docs]classCmdStanGQ(Generic[Fit]):""" Container for outputs from CmdStan generate_quantities run. Created by :meth:`CmdStanModel.generate_quantities`. """def__init__(self,runset:RunSet,previous_fit:Fit,)->None:"""Initialize object."""ifnotrunset.method==Method.GENERATE_QUANTITIES:raiseValueError('Wrong runset method, expecting generate_quantities runset, ''found method {}'.format(runset.method))self.runset=runsetself.previous_fit:Fit=previous_fitself._draws:np.ndarray=np.array(())config=self._validate_csv_files()self._metadata=InferenceMetadata(config)def__repr__(self)->str:repr='CmdStanGQ: model={} chains={}{}'.format(self.runset.model,self.chains,self.runset._args.method_args.compose(0,cmd=[]),)repr='{}\n csv_files:\n\t{}\n output_files:\n\t{}'.format(repr,'\n\t'.join(self.runset.csv_files),'\n\t'.join(self.runset.stdout_files),)returnreprdef__getattr__(self,attr:str)->np.ndarray:"""Synonymous with ``fit.stan_variable(attr)"""ifattr.startswith("_"):raiseAttributeError(f"Unknown variable name {attr}")try:returnself.stan_variable(attr)exceptValueErrorase:# pylint: disable=raise-missing-fromraiseAttributeError(*e.args)def__getstate__(self)->dict:# This function returns the mapping of objects to serialize with pickle.# See https://docs.python.org/3/library/pickle.html#object.__getstate__# for details. We call _assemble_generated_quantities to ensure# the data are loaded prior to serialization.self._assemble_generated_quantities()returnself.__dict__def_validate_csv_files(self)->Dict[str,Any]:""" Checks that Stan CSV output files for all chains are consistent and returns dict containing config and column names. Raises exception when inconsistencies detected. """dzero={}foriinrange(self.chains):ifi==0:dzero=scan_generic_csv(path=self.runset.csv_files[i],)else:drest=scan_generic_csv(path=self.runset.csv_files[i],)forkeyindzero:if(keynotin['id','fitted_params','diagnostic_file','metric_file','profile_file','init','seed','start_datetime',]anddzero[key]!=drest[key]):raiseValueError('CmdStan config mismatch in Stan CSV file {}: ''arg {} is {}, expected {}'.format(self.runset.csv_files[i],key,dzero[key],drest[key],))returndzero@propertydefchains(self)->int:"""Number of chains."""returnself.runset.chains@propertydefchain_ids(self)->List[int]:"""Chain ids."""returnself.runset.chain_ids@propertydefcolumn_names(self)->Tuple[str,...]:""" Names of generated quantities of interest. """returnself._metadata.cmdstan_config['column_names']# type: ignore@propertydefmetadata(self)->InferenceMetadata:""" Returns object which contains CmdStan configuration as well as information about the names and structure of the inference method and model output variables. """returnself._metadata
[docs]defdraws(self,*,inc_warmup:bool=False,inc_iterations:bool=False,concat_chains:bool=False,inc_sample:bool=False,)->np.ndarray:""" Returns a numpy.ndarray over the generated quantities draws from all chains which is stored column major so that the values for a parameter are contiguous in memory, likewise all draws from a chain are contiguous. By default, returns a 3D array arranged (draws, chains, columns); parameter ``concat_chains=True`` will return a 2D array where all chains are flattened into a single column, preserving chain order, so that given M chains of N draws, the first N draws are from chain 1, ..., and the the last N draws are from chain M. :param inc_warmup: When ``True`` and the warmup draws are present in the output, i.e., the sampler was run with ``save_warmup=True``, then the warmup draws are included. Default value is ``False``. :param concat_chains: When ``True`` return a 2D array flattening all all draws from all chains. Default value is ``False``. :param inc_sample: When ``True`` include all columns in the previous_fit draws array as well, excepting columns for variables already present in the generated quantities drawset. Default value is ``False``. See Also -------- CmdStanGQ.draws_pd CmdStanGQ.draws_xr CmdStanMCMC.draws """self._assemble_generated_quantities()inc_warmup|=inc_iterationsifinc_warmup:if(isinstance(self.previous_fit,CmdStanMCMC)andnotself.previous_fit._save_warmup):get_logger().warning("Sample doesn't contain draws from warmup iterations,"' rerun sampler with "save_warmup=True".')elif(isinstance(self.previous_fit,CmdStanMLE)andnotself.previous_fit._save_iterations):get_logger().warning("MLE doesn't contain draws from pre-convergence iterations,"' rerun optimization with "save_iterations=True".')elifisinstance(self.previous_fit,CmdStanVB):get_logger().warning("Variational fit doesn't make sense with argument "'"inc_warmup=True"')ifinc_sample:cols_1=self.previous_fit.column_namescols_2=self.column_namesdups=[itemforitem,countinCounter(cols_1+cols_2).items()ifcount>1]drop_cols:List[int]=[]fordupindups:drop_cols.extend(self.previous_fit._metadata.stan_vars[dup].columns())start_idx,_=self._draws_start(inc_warmup)previous_draws=self._previous_draws(True)ifconcat_chainsandinc_sample:returnflatten_chains(np.dstack((np.delete(previous_draws,drop_cols,axis=1),self._draws,))[start_idx:,:,:])ifconcat_chains:returnflatten_chains(self._draws[start_idx:,:,:])ifinc_sample:returnnp.dstack((np.delete(previous_draws,drop_cols,axis=1),self._draws,))[start_idx:,:,:]returnself._draws[start_idx:,:,:]
[docs]defdraws_pd(self,vars:Union[List[str],str,None]=None,inc_warmup:bool=False,inc_sample:bool=False,)->pd.DataFrame:""" Returns the generated quantities draws as a pandas DataFrame. Flattens all chains into single column. Container variables (array, vector, matrix) will span multiple columns, one column per element. E.g. variable 'matrix[2,2] foo' spans 4 columns: 'foo[1,1], ... foo[2,2]'. :param vars: optional list of variable names. :param inc_warmup: When ``True`` and the warmup draws are present in the output, i.e., the sampler was run with ``save_warmup=True``, then the warmup draws are included. Default value is ``False``. See Also -------- CmdStanGQ.draws CmdStanGQ.draws_xr CmdStanMCMC.draws_pd """ifvarsisnotNone:ifisinstance(vars,str):vars_list=[vars]else:vars_list=varsvars_list=list(dict.fromkeys(vars_list))ifinc_warmup:if(isinstance(self.previous_fit,CmdStanMCMC)andnotself.previous_fit._save_warmup):get_logger().warning("Sample doesn't contain draws from warmup iterations,"' rerun sampler with "save_warmup=True".')elif(isinstance(self.previous_fit,CmdStanMLE)andnotself.previous_fit._save_iterations):get_logger().warning("MLE doesn't contain draws from pre-convergence iterations,"' rerun optimization with "save_iterations=True".')elifisinstance(self.previous_fit,CmdStanVB):get_logger().warning("Variational fit doesn't make sense with argument "'"inc_warmup=True"')self._assemble_generated_quantities()all_columns=['chain__','iter__','draw__']+list(self.column_names)gq_cols:List[str]=[]mcmc_vars:List[str]=[]ifvarsisnotNone:forvarinvars_list:ifvarinself._metadata.stan_vars:info=self._metadata.stan_vars[var]gq_cols.extend(self.column_names[info.start_idx:info.end_idx])elif(inc_sampleandvarinself.previous_fit._metadata.stan_vars):info=self.previous_fit._metadata.stan_vars[var]mcmc_vars.extend(self.previous_fit.column_names[info.start_idx:info.end_idx])elifvarin['chain__','iter__','draw__']:gq_cols.append(var)else:raiseValueError('Unknown variable: {}'.format(var))else:gq_cols=all_columnsvars_list=gq_colsprevious_draws_pd=self._previous_draws_pd(mcmc_vars,inc_warmup)draws=self.draws(inc_warmup=inc_warmup)# add long-form columns for chain, iteration, drawn_draws,n_chains,_=draws.shapechains_col=(np.repeat(np.arange(1,n_chains+1),n_draws).reshape(1,n_chains,n_draws).T)iter_col=(np.tile(np.arange(1,n_draws+1),n_chains).reshape(1,n_chains,n_draws).T)draw_col=(np.arange(1,(n_draws*n_chains)+1).reshape(1,n_chains,n_draws).T)draws=np.concatenate([chains_col,iter_col,draw_col,draws],axis=2)draws_pd=pd.DataFrame(data=flatten_chains(draws),columns=all_columns,)ifinc_sampleandmcmc_vars:ifgq_cols:returnpd.concat([previous_draws_pd,draws_pd[gq_cols],],axis='columns',)[vars_list]else:returnprevious_draws_pdelifinc_sampleandvarsisNone:cols_1=list(previous_draws_pd.columns)cols_2=list(draws_pd.columns)dups=[itemforitem,countinCounter(cols_1+cols_2).items()ifcount>1]returnpd.concat([previous_draws_pd.drop(columns=dups).reset_index(drop=True),draws_pd,],axis=1,)elifgq_cols:returndraws_pd[gq_cols]returndraws_pd
[docs]defdraws_xr(self,vars:Union[str,List[str],None]=None,inc_warmup:bool=False,inc_sample:bool=False,)->"xr.Dataset":""" Returns the generated quantities draws as a xarray Dataset. This method can only be called when the underlying fit was made through sampling, it cannot be used on MLE or VB outputs. :param vars: optional list of variable names. :param inc_warmup: When ``True`` and the warmup draws are present in the MCMC sample, then the warmup draws are included. Default value is ``False``. See Also -------- CmdStanGQ.draws CmdStanGQ.draws_pd CmdStanMCMC.draws_xr """ifnotXARRAY_INSTALLED:raiseRuntimeError('Package "xarray" is not installed, cannot produce draws array.')ifnotisinstance(self.previous_fit,CmdStanMCMC):raiseRuntimeError('Method "draws_xr" is only available when ''original fit is done via Sampling.')mcmc_vars_list=[]dup_vars=[]ifvarsisnotNone:ifisinstance(vars,str):vars_list=[vars]else:vars_list=varsforvarinvars_list:ifvarnotinself._metadata.stan_vars:ifinc_sampleand(varinself.previous_fit._metadata.stan_vars):mcmc_vars_list.append(var)dup_vars.append(var)else:raiseValueError('Unknown variable: {}'.format(var))else:vars_list=list(self._metadata.stan_vars.keys())ifinc_sample:forvarinself.previous_fit._metadata.stan_vars.keys():ifvarnotinvars_listandvarnotinmcmc_vars_list:mcmc_vars_list.append(var)forvarindup_vars:vars_list.remove(var)self._assemble_generated_quantities()num_draws=self.previous_fit.num_draws_samplingsample_config=self.previous_fit._metadata.cmdstan_configattrs:MutableMapping[Hashable,Any]={"stan_version":f"{sample_config['stan_version_major']}."f"{sample_config['stan_version_minor']}."f"{sample_config['stan_version_patch']}","model":sample_config["model"],"num_draws_sampling":num_draws,}ifinc_warmupandsample_config['save_warmup']:num_draws+=self.previous_fit.num_draws_warmupattrs["num_draws_warmup"]=self.previous_fit.num_draws_warmupdata:MutableMapping[Hashable,Any]={}coordinates:MutableMapping[Hashable,Any]={"chain":self.chain_ids,"draw":np.arange(num_draws),}forvarinvars_list:build_xarray_data(data,self._metadata.stan_vars[var],self.draws(inc_warmup=inc_warmup),)ifinc_sample:forvarinmcmc_vars_list:build_xarray_data(data,self.previous_fit._metadata.stan_vars[var],self.previous_fit.draws(inc_warmup=inc_warmup),)returnxr.Dataset(data,coords=coordinates,attrs=attrs).transpose('chain','draw',...)
[docs]defstan_variable(self,var:str,**kwargs:bool)->np.ndarray:""" Return a numpy.ndarray which contains the set of draws for the named Stan program variable. Flattens the chains, leaving the draws in chain order. The first array dimension, corresponds to number of draws in the sample. The remaining dimensions correspond to the shape of the Stan program variable. Underlyingly draws are in chain order, i.e., for a sample with N chains of M draws each, the first M array elements are from chain 1, the next M are from chain 2, and the last M elements are from chain N. * If the variable is a scalar variable, the return array has shape ( draws * chains, 1). * If the variable is a vector, the return array has shape ( draws * chains, len(vector)) * If the variable is a matrix, the return array has shape ( draws * chains, size(dim 1), size(dim 2) ) * If the variable is an array with N dimensions, the return array has shape ( draws * chains, size(dim 1), ..., size(dim N)) For example, if the Stan program variable ``theta`` is a 3x3 matrix, and the sample consists of 4 chains with 1000 post-warmup draws, this function will return a numpy.ndarray with shape (4000,3,3). This functionaltiy is also available via a shortcut using ``.`` - writing ``fit.a`` is a synonym for ``fit.stan_variable("a")`` :param var: variable name :param kwargs: Additional keyword arguments are passed to the underlying fit's ``stan_variable`` method if the variable is not a generated quantity. See Also -------- CmdStanGQ.stan_variables CmdStanMCMC.stan_variable CmdStanMLE.stan_variable CmdStanPathfinder.stan_variable CmdStanVB.stan_variable CmdStanLaplace.stan_variable """model_var_names=self.previous_fit._metadata.stan_vars.keys()gq_var_names=self._metadata.stan_vars.keys()ifnot(varinmodel_var_namesorvaringq_var_names):raiseValueError(f'Unknown variable name: {var}\n''Available variables are '+", ".join(model_var_names|gq_var_names))ifvarnotingq_var_names:# TODO(2.0) atleast1d may not be neededreturnnp.atleast_1d(# type: ignoreself.previous_fit.stan_variable(var,**kwargs))# is gq variableself._assemble_generated_quantities()draw1,_=self._draws_start(inc_warmup=kwargs.get('inc_warmup',False)orkwargs.get('inc_iterations',False))draws=flatten_chains(self._draws[draw1:])out:np.ndarray=self._metadata.stan_vars[var].extract_reshape(draws)returnout
[docs]defstan_variables(self,**kwargs:bool)->Dict[str,np.ndarray]:""" Return a dictionary mapping Stan program variables names to the corresponding numpy.ndarray containing the inferred values. :param kwargs: Additional keyword arguments are passed to the underlying fit's ``stan_variable`` method if the variable is not a generated quantity. See Also -------- CmdStanGQ.stan_variable CmdStanMCMC.stan_variables CmdStanMLE.stan_variables CmdStanPathfinder.stan_variables CmdStanVB.stan_variables CmdStanLaplace.stan_variables """result={}sample_var_names=self.previous_fit._metadata.stan_vars.keys()gq_var_names=self._metadata.stan_vars.keys()fornameingq_var_names:result[name]=self.stan_variable(name,**kwargs)fornameinsample_var_names:ifnamenotingq_var_names:result[name]=self.stan_variable(name,**kwargs)returnresult
def_assemble_generated_quantities(self)->None:ifself._draws.shape!=(0,):return# use numpy loadtxt_,num_draws=self._draws_start(inc_warmup=True)gq_sample:np.ndarray=np.empty((num_draws,self.chains,len(self.column_names)),dtype=float,order='F',)forchaininrange(self.chains):withopen(self.runset.csv_files[chain],'r')asfd:lines=(lineforlineinfdifnotline.startswith('#'))gq_sample[:,chain,:]=np.loadtxt(lines,dtype=np.ndarray,ndmin=2,skiprows=1,delimiter=',')self._draws=gq_sampledef_draws_start(self,inc_warmup:bool)->Tuple[int,int]:draw1=0p_fit=self.previous_fitifisinstance(p_fit,CmdStanMCMC):num_draws=p_fit.num_draws_samplingifp_fit._save_warmup:ifinc_warmup:num_draws+=p_fit.num_draws_warmupelse:draw1=p_fit.num_draws_warmupelifisinstance(p_fit,CmdStanMLE):num_draws=1ifp_fit._save_iterations:opt_iters=len(p_fit.optimized_iterations_np)# type: ignoreifinc_warmup:num_draws=opt_iterselse:draw1=opt_iters-1else:# CmdStanVB:draw1=1# skip meannum_draws=p_fit.variational_sample.shape[0]ifinc_warmup:num_draws+=1returndraw1,num_drawsdef_previous_draws(self,inc_warmup:bool)->np.ndarray:""" Extract the draws from self.previous_fit. Return is always 3-d """p_fit=self.previous_fitifisinstance(p_fit,CmdStanMCMC):returnp_fit.draws(inc_warmup=inc_warmup)elifisinstance(p_fit,CmdStanMLE):ifinc_warmupandp_fit._save_iterations:returnp_fit.optimized_iterations_np[:,None]# type: ignorereturnnp.atleast_2d(# type: ignorep_fit.optimized_params_np,)[:,None]else:# CmdStanVB:ifinc_warmup:returnnp.vstack([p_fit.variational_params_np,p_fit.variational_sample])[:,None]returnp_fit.variational_sample[:,None]def_previous_draws_pd(self,vars:List[str],inc_warmup:bool)->pd.DataFrame:ifvars:sel:Union[List[str],slice]=varselse:sel=slice(None,None)p_fit=self.previous_fitifisinstance(p_fit,CmdStanMCMC):returnp_fit.draws_pd(varsorNone,inc_warmup=inc_warmup)elifisinstance(p_fit,CmdStanMLE):ifinc_warmupandp_fit._save_iterations:returnp_fit.optimized_iterations_pd[sel]# type: ignoreelse:returnp_fit.optimized_params_pd[sel]else:# CmdStanVB:returnp_fit.variational_sample_pd[sel]
[docs]defsave_csvfiles(self,dir:Optional[str]=None)->None:""" Move output CSV files to specified directory. If files were written to the temporary session directory, clean filename. E.g., save 'bernoulli-201912081451-1-5nm6as7u.csv' as 'bernoulli-201912081451-1.csv'. :param dir: directory path See Also -------- stanfit.RunSet.save_csvfiles cmdstanpy.from_csv """self.runset.save_csvfiles(dir)
# TODO(2.0): remove@propertydefmcmc_sample(self)->Union[CmdStanMCMC,CmdStanMLE,CmdStanVB]:get_logger().warning("Property `mcmc_sample` is deprecated, use `previous_fit` instead")returnself.previous_fit