"""Container for the result of running the sample (MCMC) method"""importmathimportosfromioimportStringIOfromtypingimport(Any,Dict,Hashable,List,MutableMapping,Optional,Sequence,Tuple,Union,)importnumpyasnpimportpandasaspdtry:importxarrayasxrXARRAY_INSTALLED=TrueexceptImportError:XARRAY_INSTALLED=Falsefromcmdstanpyimport_CMDSTAN_SAMPLING,_CMDSTAN_THIN,_CMDSTAN_WARMUP,_TMPDIRfromcmdstanpy.cmdstan_argsimportMethod,SamplerArgsfromcmdstanpy.utilsimport(EXTENSION,build_xarray_data,check_sampler_csv,cmdstan_path,cmdstan_version_before,create_named_text_file,do_command,flatten_chains,get_logger,)from.metadataimportInferenceMetadatafrom.runsetimportRunSet
[docs]classCmdStanMCMC:""" Container for outputs from CmdStan sampler run. Provides methods to summarize and diagnose the model fit and accessor methods to access the entire sample or individual items. Created by :meth:`CmdStanModel.sample` The sample is lazily instantiated on first access of either the resulting sample or the HMC tuning parameters, i.e., the step size and metric. """# pylint: disable=too-many-public-methodsdef__init__(self,runset:RunSet,)->None:"""Initialize object."""ifnotrunset.method==Method.SAMPLE:raiseValueError('Wrong runset method, expecting sample runset, ''found method {}'.format(runset.method))self.runset=runset# info from runset to be exposedsampler_args=self.runset._args.method_argsassertisinstance(sampler_args,SamplerArgs)# make the typechecker happyself._iter_sampling:int=_CMDSTAN_SAMPLINGifsampler_args.iter_samplingisnotNone:self._iter_sampling=sampler_args.iter_samplingself._iter_warmup:int=_CMDSTAN_WARMUPifsampler_args.iter_warmupisnotNone:self._iter_warmup=sampler_args.iter_warmupself._thin:int=_CMDSTAN_THINifsampler_args.thinisnotNone:self._thin=sampler_args.thinself._is_fixed_param=sampler_args.fixed_paramself._save_warmup:bool=sampler_args.save_warmupself._sig_figs=runset._args.sig_figs# info from CSV values, instantiated lazilyself._draws:np.ndarray=np.array(())# only valid when not is_fixed_paramself._metric:np.ndarray=np.array(())self._step_size:np.ndarray=np.array(())self._divergences:np.ndarray=np.zeros(self.runset.chains,dtype=int)self._max_treedepths:np.ndarray=np.zeros(self.runset.chains,dtype=int)# info from CSV initial comments and headerconfig=self._validate_csv_files()self._metadata:InferenceMetadata=InferenceMetadata(config)ifnotself._is_fixed_param:self._check_sampler_diagnostics()def__repr__(self)->str:repr='CmdStanMCMC: model={} chains={}{}'.format(self.runset.model,self.runset.chains,self.runset._args.method_args.compose(0,cmd=[]),)repr='{}\n csv_files:\n\t{}\n output_files:\n\t{}'.format(repr,'\n\t'.join(self.runset.csv_files),'\n\t'.join(self.runset.stdout_files),)# TODO - hamiltonian, profiling filesreturnreprdef__getattr__(self,attr:str)->np.ndarray:"""Synonymous with ``fit.stan_variable(attr)"""ifattr.startswith("_"):raiseAttributeError(f"Unknown variable name {attr}")try:returnself.stan_variable(attr)exceptValueErrorase:# pylint: disable=raise-missing-fromraiseAttributeError(*e.args)def__getstate__(self)->dict:# This function returns the mapping of objects to serialize with pickle.# See https://docs.python.org/3/library/pickle.html#object.__getstate__# for details. We call _assemble_draws to ensure posterior samples have# been loaded prior to serialization.self._assemble_draws()returnself.__dict__@propertydefchains(self)->int:"""Number of chains."""returnself.runset.chains@propertydefchain_ids(self)->List[int]:"""Chain ids."""returnself.runset.chain_ids@propertydefnum_draws_warmup(self)->int:"""Number of warmup draws per chain, i.e., thinned warmup iterations."""returnint(math.ceil((self._iter_warmup)/self._thin))@propertydefnum_draws_sampling(self)->int:""" Number of sampling (post-warmup) draws per chain, i.e., thinned sampling iterations. """returnint(math.ceil((self._iter_sampling)/self._thin))@propertydefmetadata(self)->InferenceMetadata:""" Returns object which contains CmdStan configuration as well as information about the names and structure of the inference method and model output variables. """returnself._metadata@propertydefcolumn_names(self)->Tuple[str,...]:""" Names of all outputs from the sampler, comprising sampler parameters and all components of all model parameters, transformed parameters, and quantities of interest. Corresponds to Stan CSV file header row, with names munged to array notation, e.g. `beta[1]` not `beta.1`. """returnself._metadata.cmdstan_config['column_names']# type: ignore@propertydefmetric_type(self)->Optional[str]:""" Metric type used for adaptation, either 'diag_e' or 'dense_e', according to CmdStan arg 'metric'. When sampler algorithm 'fixed_param' is specified, metric_type is None. """return(self._metadata.cmdstan_config['metric']ifnotself._is_fixed_paramelseNone)@propertydefmetric(self)->Optional[np.ndarray]:""" Metric used by sampler for each chain. When sampler algorithm 'fixed_param' is specified, metric is None. """ifself._is_fixed_param:returnNoneifself._metadata.cmdstan_config['metric']=='unit_e':get_logger().info('Unit diagnonal metric, inverse mass matrix size unknown.')returnNoneself._assemble_draws()returnself._metric@propertydefstep_size(self)->Optional[np.ndarray]:""" Step size used by sampler for each chain. When sampler algorithm 'fixed_param' is specified, step size is None. """self._assemble_draws()returnself._step_sizeifnotself._is_fixed_paramelseNone@propertydefthin(self)->int:""" Period between recorded iterations. (Default is 1). """returnself._thin@propertydefdivergences(self)->Optional[np.ndarray]:""" Per-chain total number of post-warmup divergent iterations. When sampler algorithm 'fixed_param' is specified, returns None. """returnself._divergencesifnotself._is_fixed_paramelseNone@propertydefmax_treedepths(self)->Optional[np.ndarray]:""" Per-chain total number of post-warmup iterations where the NUTS sampler reached the maximum allowed treedepth. When sampler algorithm 'fixed_param' is specified, returns None. """returnself._max_treedepthsifnotself._is_fixed_paramelseNone
[docs]defdraws(self,*,inc_warmup:bool=False,concat_chains:bool=False)->np.ndarray:""" Returns a numpy.ndarray over all draws from all chains which is stored column major so that the values for a parameter are contiguous in memory, likewise all draws from a chain are contiguous. By default, returns a 3D array arranged (draws, chains, columns); parameter ``concat_chains=True`` will return a 2D array where all chains are flattened into a single column, preserving chain order, so that given M chains of N draws, the first N draws are from chain 1, up through the last N draws from chain M. :param inc_warmup: When ``True`` and the warmup draws are present in the output, i.e., the sampler was run with ``save_warmup=True``, then the warmup draws are included. Default value is ``False``. :param concat_chains: When ``True`` return a 2D array flattening all all draws from all chains. Default value is ``False``. See Also -------- CmdStanMCMC.draws_pd CmdStanMCMC.draws_xr CmdStanGQ.draws """self._assemble_draws()ifinc_warmupandnotself._save_warmup:get_logger().warning("Sample doesn't contain draws from warmup iterations,"' rerun sampler with "save_warmup=True".')start_idx=0ifnotinc_warmupandself._save_warmup:start_idx=self.num_draws_warmupifconcat_chains:returnflatten_chains(self._draws[start_idx:,:,:])returnself._draws[start_idx:,:,:]
def_validate_csv_files(self)->Dict[str,Any]:""" Checks that Stan CSV output files for all chains are consistent and returns dict containing config and column names. Tabulates sampling iters which are divergent or at max treedepth Raises exception when inconsistencies detected. """dzero={}foriinrange(self.chains):ifi==0:dzero=check_sampler_csv(path=self.runset.csv_files[i],is_fixed_param=self._is_fixed_param,iter_sampling=self._iter_sampling,iter_warmup=self._iter_warmup,save_warmup=self._save_warmup,thin=self._thin,)ifnotself._is_fixed_param:self._divergences[i]=dzero['ct_divergences']self._max_treedepths[i]=dzero['ct_max_treedepth']else:drest=check_sampler_csv(path=self.runset.csv_files[i],is_fixed_param=self._is_fixed_param,iter_sampling=self._iter_sampling,iter_warmup=self._iter_warmup,save_warmup=self._save_warmup,thin=self._thin,)forkeyindzero:# check args that matter for parsing, plus name, versionif(keyin['stan_version_major','stan_version_minor','stan_version_patch','stanc_version','model','num_samples','num_warmup','save_warmup','thin','refresh',]anddzero[key]!=drest[key]):raiseValueError('CmdStan config mismatch in Stan CSV file {}: ''arg {} is {}, expected {}'.format(self.runset.csv_files[i],key,dzero[key],drest[key],))ifnotself._is_fixed_param:self._divergences[i]=drest['ct_divergences']self._max_treedepths[i]=drest['ct_max_treedepth']returndzerodef_check_sampler_diagnostics(self)->None:""" Warn if any iterations ended in divergences or hit maxtreedepth. """ifnp.any(self._divergences)ornp.any(self._max_treedepths):diagnostics=['Some chains may have failed to converge.']ct_iters=self._metadata.cmdstan_config['num_samples']foriinrange(self.runset._chains):ifself._divergences[i]>0:diagnostics.append(f'Chain {i+1} had {self._divergences[i]} ''divergent transitions 'f'({((self._divergences[i]/ct_iters)*100):.1f}%)')ifself._max_treedepths[i]>0:diagnostics.append(f'Chain {i+1} had {self._max_treedepths[i]} ''iterations at max treedepth 'f'({((self._max_treedepths[i]/ct_iters)*100):.1f}%)')diagnostics.append('Use the "diagnose()" method on the CmdStanMCMC object'' to see further information.')get_logger().warning('\n\t'.join(diagnostics))def_assemble_draws(self)->None:""" Allocates and populates the step size, metric, and sample arrays by parsing the validated stan_csv files. """ifself._draws.shape!=(0,):returnnum_draws=self.num_draws_samplingsampling_iter_start=0ifself._save_warmup:num_draws+=self.num_draws_warmupsampling_iter_start=self.num_draws_warmupself._draws=np.empty((num_draws,self.chains,len(self.column_names)),dtype=float,order='F',)self._step_size=np.empty(self.chains,dtype=float)forchaininrange(self.chains):withopen(self.runset.csv_files[chain],'r')asfd:line=fd.readline().strip()# read initial comments, CSV header rowwhilelen(line)>0andline.startswith('#'):line=fd.readline().strip()ifnotself._is_fixed_param:# handle warmup draws, if anyifself._save_warmup:foriinrange(self.num_draws_warmup):line=fd.readline().strip()xs=line.split(',')self._draws[i,chain,:]=[float(x)forxinxs]line=fd.readline().strip()ifline!='# Adaptation terminated':# shouldn't happen?whileline!='# Adaptation terminated':line=fd.readline().strip()# step_size, metric (diag_e and dense_e only)line=fd.readline().strip()_,step_size=line.split('=')self._step_size[chain]=float(step_size.strip())ifself._metadata.cmdstan_config['metric']!='unit_e':line=fd.readline().strip()# metric typeline=fd.readline().lstrip(' #\t').rstrip()num_unconstrained_params=len(line.split(','))ifchain==0:# can't allocate w/o num paramsifself.metric_type=='diag_e':self._metric=np.empty((self.chains,num_unconstrained_params),dtype=float,)else:self._metric=np.empty((self.chains,num_unconstrained_params,num_unconstrained_params,),dtype=float,)ifline:ifself.metric_type=='diag_e':xs=line.split(',')self._metric[chain,:]=[float(x)forxinxs]else:xs=line.strip().split(',')self._metric[chain,0,:]=[float(x)forxinxs]foriinrange(1,num_unconstrained_params):line=fd.readline().lstrip(' #\t').rstrip()xs=line.split(',')self._metric[chain,i,:]=[float(x)forxinxs]else:# unit_e changed in 2.34 to have an extra linepos=fd.tell()line=fd.readline().strip()ifnotline.startswith('#'):fd.seek(pos)# process drawsforiinrange(sampling_iter_start,num_draws):line=fd.readline().strip()xs=line.split(',')self._draws[i,chain,:]=[float(x)forxinxs]assertself._drawsisnotNone
[docs]defsummary(self,percentiles:Sequence[int]=(5,50,95),sig_figs:int=6,)->pd.DataFrame:""" Run cmdstan/bin/stansummary over all output CSV files, assemble summary into DataFrame object. The first row contains statistics for the total joint log probability `lp__`, but is omitted when the Stan model has no parameters. The remaining rows contain summary statistics for all parameters, transformed parameters, and generated quantities variables, in program declaration order. :param percentiles: Ordered non-empty sequence of percentiles to report. Must be integers from (1, 99), inclusive. Defaults to ``(5, 50, 95)`` :param sig_figs: Number of significant figures to report. Must be an integer between 1 and 18. If unspecified, the default precision for the system file I/O is used; the usual value is 6. If precision above 6 is requested, sample must have been produced by CmdStan version 2.25 or later and sampler output precision must equal to or greater than the requested summary precision. :return: pandas.DataFrame """iflen(percentiles)==0:raiseValueError('Invalid percentiles argument, must be ordered'' non-empty list from (1, 99), inclusive.')cur_pct=0forpctinpercentiles:ifpct>99ornotpct>cur_pct:raiseValueError('Invalid percentiles spec, must be ordered'' non-empty list from (1, 99), inclusive.')cur_pct=pctpercentiles_str=(f"--percentiles= {','.join(str(x)forxinpercentiles)}")ifnotisinstance(sig_figs,int)orsig_figs<1orsig_figs>18:raiseValueError('Keyword "sig_figs" must be an integer between 1 and 18,'' found {}'.format(sig_figs))csv_sig_figs=self._sig_figsor6ifsig_figs>csv_sig_figs:get_logger().warning('Requesting %d significant digits of output, but CSV files'' only have %d digits of precision.',sig_figs,csv_sig_figs,)sig_figs_str=f'--sig_figs={sig_figs}'cmd_path=os.path.join(cmdstan_path(),'bin','stansummary'+EXTENSION)tmp_csv_file='stansummary-{}-'.format(self.runset._args.model_name)tmp_csv_path=create_named_text_file(dir=_TMPDIR,prefix=tmp_csv_file,suffix='.csv',name_only=True)csv_str='--csv_filename={}'.format(tmp_csv_path)# TODO: remove at some future releaseifcmdstan_version_before(2,24):csv_str='--csv_file={}'.format(tmp_csv_path)cmd=[cmd_path,percentiles_str,sig_figs_str,csv_str,]+self.runset.csv_filesdo_command(cmd,fd_out=None)withopen(tmp_csv_path,'rb')asfd:summary_data=pd.read_csv(fd,delimiter=',',header=0,index_col=0,comment='#',float_precision='high',)mask=([notx.endswith('__')forxinsummary_data.index]ifself._is_fixed_paramelse[x=='lp__'ornotx.endswith('__')forxinsummary_data.index])summary_data.index.name=Nonereturnsummary_data[mask]
[docs]defdiagnose(self)->Optional[str]:""" Run cmdstan/bin/diagnose over all output CSV files, return console output. The diagnose utility reads the outputs of all chains and checks for the following potential problems: + Transitions that hit the maximum treedepth + Divergent transitions + Low E-BFMI values (sampler transitions HMC potential energy) + Low effective sample sizes + High R-hat values """cmd_path=os.path.join(cmdstan_path(),'bin','diagnose'+EXTENSION)cmd=[cmd_path]+self.runset.csv_filesresult=StringIO()do_command(cmd=cmd,fd_out=result)returnresult.getvalue()
[docs]defdraws_pd(self,vars:Union[List[str],str,None]=None,inc_warmup:bool=False,)->pd.DataFrame:""" Returns the sample draws as a pandas DataFrame. Flattens all chains into single column. Container variables (array, vector, matrix) will span multiple columns, one column per element. E.g. variable 'matrix[2,2] foo' spans 4 columns: 'foo[1,1], ... foo[2,2]'. :param vars: optional list of variable names. :param inc_warmup: When ``True`` and the warmup draws are present in the output, i.e., the sampler was run with ``save_warmup=True``, then the warmup draws are included. Default value is ``False``. See Also -------- CmdStanMCMC.draws CmdStanMCMC.draws_xr CmdStanGQ.draws_pd """ifvarsisnotNone:ifisinstance(vars,str):vars_list=[vars]else:vars_list=varsifinc_warmupandnotself._save_warmup:get_logger().warning('Draws from warmup iterations not available,'' must run sampler with "save_warmup=True".')self._assemble_draws()cols=[]ifvarsisnotNone:forvarindict.fromkeys(vars_list):ifvarinself._metadata.method_vars:cols.append(var)elifvarinself._metadata.stan_vars:info=self._metadata.stan_vars[var]cols.extend(self.column_names[info.start_idx:info.end_idx])elifvarin['chain__','iter__','draw__']:cols.append(var)else:raiseValueError(f'Unknown variable: {var}')else:cols=['chain__','iter__','draw__']+list(self.column_names)draws=self.draws(inc_warmup=inc_warmup)# add long-form columns for chain, iteration, drawn_draws,n_chains,_=draws.shapechains_col=(np.repeat(np.arange(1,n_chains+1),n_draws).reshape(1,n_chains,n_draws).T)iter_col=(np.tile(np.arange(1,n_draws+1),n_chains).reshape(1,n_chains,n_draws).T)draw_col=(np.arange(1,(n_draws*n_chains)+1).reshape(1,n_chains,n_draws).T)draws=np.concatenate([chains_col,iter_col,draw_col,draws],axis=2)returnpd.DataFrame(data=flatten_chains(draws),columns=['chain__','iter__','draw__']+list(self.column_names),)[cols]
[docs]defdraws_xr(self,vars:Union[str,List[str],None]=None,inc_warmup:bool=False)->"xr.Dataset":""" Returns the sampler draws as a xarray Dataset. :param vars: optional list of variable names. :param inc_warmup: When ``True`` and the warmup draws are present in the output, i.e., the sampler was run with ``save_warmup=True``, then the warmup draws are included. Default value is ``False``. See Also -------- CmdStanMCMC.draws CmdStanMCMC.draws_pd CmdStanGQ.draws_xr """ifnotXARRAY_INSTALLED:raiseRuntimeError('Package "xarray" is not installed, cannot produce draws array.')ifinc_warmupandnotself._save_warmup:get_logger().warning("Draws from warmup iterations not available,"' must run sampler with "save_warmup=True".')ifvarsisNone:vars_list=list(self._metadata.stan_vars.keys())elifisinstance(vars,str):vars_list=[vars]else:vars_list=varsself._assemble_draws()num_draws=self.num_draws_samplingmeta=self._metadata.cmdstan_configattrs:MutableMapping[Hashable,Any]={"stan_version":f"{meta['stan_version_major']}."f"{meta['stan_version_minor']}.{meta['stan_version_patch']}","model":meta["model"],"num_draws_sampling":num_draws,}ifinc_warmupandself._save_warmup:num_draws+=self.num_draws_warmupattrs["num_draws_warmup"]=self.num_draws_warmupdata:MutableMapping[Hashable,Any]={}coordinates:MutableMapping[Hashable,Any]={"chain":self.chain_ids,"draw":np.arange(num_draws),}forvarinvars_list:build_xarray_data(data,self._metadata.stan_vars[var],self.draws(inc_warmup=inc_warmup),)returnxr.Dataset(data,coords=coordinates,attrs=attrs).transpose('chain','draw',...)
[docs]defstan_variable(self,var:str,inc_warmup:bool=False,)->np.ndarray:""" Return a numpy.ndarray which contains the set of draws for the named Stan program variable. Flattens the chains, leaving the draws in chain order. The first array dimension, corresponds to number of draws or post-warmup draws in the sample, per argument ``inc_warmup``. The remaining dimensions correspond to the shape of the Stan program variable. Underlyingly draws are in chain order, i.e., for a sample with N chains of M draws each, the first M array elements are from chain 1, the next M are from chain 2, and the last M elements are from chain N. * If the variable is a scalar variable, the return array has shape ( draws * chains, 1). * If the variable is a vector, the return array has shape ( draws * chains, len(vector)) * If the variable is a matrix, the return array has shape ( draws * chains, size(dim 1), size(dim 2) ) * If the variable is an array with N dimensions, the return array has shape ( draws * chains, size(dim 1), ..., size(dim N)) For example, if the Stan program variable ``theta`` is a 3x3 matrix, and the sample consists of 4 chains with 1000 post-warmup draws, this function will return a numpy.ndarray with shape (4000,3,3). This functionaltiy is also available via a shortcut using ``.`` - writing ``fit.a`` is a synonym for ``fit.stan_variable("a")`` :param var: variable name :param inc_warmup: When ``True`` and the warmup draws are present in the output, i.e., the sampler was run with ``save_warmup=True``, then the warmup draws are included. Default value is ``False``. See Also -------- CmdStanMCMC.stan_variables CmdStanMLE.stan_variable CmdStanPathfinder.stan_variable CmdStanVB.stan_variable CmdStanGQ.stan_variable CmdStanLaplace.stan_variable """try:draws=self.draws(inc_warmup=inc_warmup,concat_chains=True)out:np.ndarray=self._metadata.stan_vars[var].extract_reshape(draws)returnoutexceptKeyError:# pylint: disable=raise-missing-fromraiseValueError(f'Unknown variable name: {var}\n''Available variables are '+", ".join(self._metadata.stan_vars.keys()))
[docs]defstan_variables(self)->Dict[str,np.ndarray]:""" Return a dictionary mapping Stan program variables names to the corresponding numpy.ndarray containing the inferred values. See Also -------- CmdStanMCMC.stan_variable CmdStanMLE.stan_variables CmdStanPathfinder.stan_variables CmdStanVB.stan_variables CmdStanGQ.stan_variables CmdStanLaplace.stan_variables """result={}fornameinself._metadata.stan_vars:result[name]=self.stan_variable(name)returnresult
[docs]defmethod_variables(self)->Dict[str,np.ndarray]:""" Returns a dictionary of all sampler variables, i.e., all output column names ending in `__`. Assumes that all variables are scalar variables where column name is variable name. Maps each column name to a numpy.ndarray (draws x chains x 1) containing per-draw diagnostic values. """self._assemble_draws()return{name:var.extract_reshape(self._draws)forname,varinself._metadata.method_vars.items()}
[docs]defsave_csvfiles(self,dir:Optional[str]=None)->None:""" Move output CSV files to specified directory. If files were written to the temporary session directory, clean filename. E.g., save 'bernoulli-201912081451-1-5nm6as7u.csv' as 'bernoulli-201912081451-1.csv'. :param dir: directory path See Also -------- stanfit.RunSet.save_csvfiles cmdstanpy.from_csv """self.runset.save_csvfiles(dir)