"""
Makefile options for stanc and C++ compilers
"""
import io
import json
import os
import platform
import shutil
import subprocess
from copy import copy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Union
from cmdstanpy.utils import get_logger
from cmdstanpy.utils.cmdstan import (
EXTENSION,
cmdstan_path,
cmdstan_version,
cmdstan_version_before,
)
from cmdstanpy.utils.command import do_command
from cmdstanpy.utils.filesystem import SanitizedOrTmpFilePath
STANC_OPTS = [
'O',
'O0',
'O1',
'Oexperimental',
'allow-undefined',
'use-opencl',
'warn-uninitialized',
'include-paths',
'name',
'warn-pedantic',
]
# TODO(2.0): remove
STANC_DEPRECATED_OPTS = {
'allow_undefined': 'allow-undefined',
'include_paths': 'include-paths',
}
STANC_IGNORE_OPTS = [
'debug-lex',
'debug-parse',
'debug-ast',
'debug-decorated-ast',
'debug-generate-data',
'debug-mir',
'debug-mir-pretty',
'debug-optimized-mir',
'debug-optimized-mir-pretty',
'debug-transformed-mir',
'debug-transformed-mir-pretty',
'dump-stan-math-signatures',
'auto-format',
'print-canonical',
'print-cpp',
'o',
'help',
'version',
]
OptionalPath = Union[str, os.PathLike, None]
# TODO(2.0): can remove add function and other logic
[docs]class CompilerOptions:
"""
User-specified flags for stanc and C++ compiler.
Attributes:
stanc_options - stanc compiler flags, options
cpp_options - makefile options (NAME=value)
user_header - path to a user .hpp file to include during compilation
"""
def __init__(
self,
*,
stanc_options: Optional[Dict[str, Any]] = None,
cpp_options: Optional[Dict[str, Any]] = None,
user_header: OptionalPath = None,
) -> None:
"""Initialize object."""
self._stanc_options = stanc_options if stanc_options is not None else {}
self._cpp_options = cpp_options if cpp_options is not None else {}
self._user_header = str(user_header) if user_header is not None else ''
def __repr__(self) -> str:
return 'stanc_options={}, cpp_options={}'.format(
self._stanc_options, self._cpp_options
)
def __eq__(self, other: Any) -> bool:
"""Overrides the default implementation"""
if self.is_empty() and other is None: # equiv w/r/t compiler
return True
if not isinstance(other, CompilerOptions):
return False
return (
self._stanc_options == other.stanc_options
and self._cpp_options == other.cpp_options
and self._user_header == other.user_header
)
[docs] def is_empty(self) -> bool:
"""True if no options specified."""
return (
self._stanc_options == {}
and self._cpp_options == {}
and self._user_header == ''
)
@property
def stanc_options(self) -> Dict[str, Union[bool, int, str, Iterable[str]]]:
"""Stanc compiler options."""
return self._stanc_options
@property
def cpp_options(self) -> Dict[str, Union[bool, int]]:
"""C++ compiler options."""
return self._cpp_options
@property
def user_header(self) -> str:
"""user header."""
return self._user_header
[docs] def validate(self) -> None:
"""
Check compiler args.
Raise ValueError if invalid options are found.
"""
self.validate_stanc_opts()
self.validate_cpp_opts()
self.validate_user_header()
[docs] def validate_stanc_opts(self) -> None:
"""
Check stanc compiler args and consistency between stanc and C++ options.
Raise ValueError if bad config is found.
"""
# pylint: disable=no-member
if self._stanc_options is None:
return
ignore = []
paths = None
has_o_flag = False
for deprecated, replacement in STANC_DEPRECATED_OPTS.items():
if deprecated in self._stanc_options:
if replacement:
get_logger().warning(
'compiler option "%s" is deprecated, use "%s" instead',
deprecated,
replacement,
)
self._stanc_options[replacement] = copy(
self._stanc_options[deprecated]
)
del self._stanc_options[deprecated]
else:
get_logger().warning(
'compiler option "%s" is deprecated and '
'should not be used',
deprecated,
)
for key, val in self._stanc_options.items():
if key in STANC_IGNORE_OPTS:
get_logger().info('ignoring compiler option: %s', key)
ignore.append(key)
elif key not in STANC_OPTS:
raise ValueError(f'unknown stanc compiler option: {key}')
elif key == 'include-paths':
paths = val
if isinstance(val, str):
paths = val.split(',')
elif not isinstance(val, list):
raise ValueError(
'Invalid include-paths, expecting list or '
f'string, found type: {type(val)}.'
)
elif key == 'use-opencl':
if self._cpp_options is None:
self._cpp_options = {'STAN_OPENCL': 'TRUE'}
else:
self._cpp_options['STAN_OPENCL'] = 'TRUE'
elif key.startswith('O'):
if has_o_flag:
get_logger().warning(
'More than one of (O, O1, O2, Oexperimental)'
'optimizations passed. Only the last one will'
'be used'
)
else:
has_o_flag = True
for opt in ignore:
del self._stanc_options[opt]
if paths is not None:
bad_paths = [dir for dir in paths if not os.path.exists(dir)]
if any(bad_paths):
raise ValueError(
'invalid include paths: {}'.format(', '.join(bad_paths))
)
self._stanc_options['include-paths'] = [
os.path.abspath(os.path.expanduser(path)) for path in paths
]
[docs] def validate_cpp_opts(self) -> None:
"""
Check cpp compiler args.
Raise ValueError if bad config is found.
"""
if self._cpp_options is None:
return
for key in ['OPENCL_DEVICE_ID', 'OPENCL_PLATFORM_ID']:
if key in self._cpp_options:
self._cpp_options['STAN_OPENCL'] = 'TRUE'
val = self._cpp_options[key]
if not isinstance(val, int) or val < 0:
raise ValueError(
f'{key} must be a non-negative integer value,'
f' found {val}.'
)
[docs] def add(self, new_opts: "CompilerOptions") -> None: # noqa: disable=Q000
"""Adds options to existing set of compiler options."""
if new_opts.stanc_options is not None:
if self._stanc_options is None:
self._stanc_options = new_opts.stanc_options
else:
for key, val in new_opts.stanc_options.items():
if key == 'include-paths':
if isinstance(val, Iterable) and not isinstance(
val, str
):
for path in val:
self.add_include_path(str(path))
else:
self.add_include_path(str(val))
else:
self._stanc_options[key] = val
if new_opts.cpp_options is not None:
for key, val in new_opts.cpp_options.items():
self._cpp_options[key] = val
if new_opts._user_header != '' and self._user_header == '':
self._user_header = new_opts._user_header
[docs] def add_include_path(self, path: str) -> None:
"""Adds include path to existing set of compiler options."""
path = os.path.abspath(os.path.expanduser(path))
if 'include-paths' not in self._stanc_options:
self._stanc_options['include-paths'] = [path]
elif path not in self._stanc_options['include-paths']:
self._stanc_options['include-paths'].append(path)
def compose_stanc(self, filename_in_msg: Optional[str]) -> List[str]:
opts = []
if filename_in_msg is not None:
opts.append(f'--filename-in-msg={filename_in_msg}')
if self._stanc_options is not None and len(self._stanc_options) > 0:
for key, val in self._stanc_options.items():
if key == 'include-paths':
opts.append(
'--include-paths='
+ ','.join(
(
Path(p).as_posix()
for p in self._stanc_options['include-paths']
)
)
)
elif key == 'name':
opts.append(f'--name={val}')
else:
opts.append(f'--{key}')
return opts
[docs] def compose(self, filename_in_msg: Optional[str] = None) -> List[str]:
"""
Format makefile options as list of strings.
Parameters
----------
filename_in_msg : str, optional
filename to be displayed in stanc3 error messages
(if different from actual filename on disk), by default None
"""
opts = [
'STANCFLAGS+=' + flag.replace(" ", "\\ ")
for flag in self.compose_stanc(filename_in_msg)
]
if self._cpp_options is not None and len(self._cpp_options) > 0:
for key, val in self._cpp_options.items():
opts.append(f'{key}={val}')
return opts
def src_info(
stan_file: str, compiler_options: CompilerOptions
) -> Dict[str, Any]:
"""
Get source info for Stan program file.
This function is used in the implementation of
:meth:`CmdStanModel.src_info`, and should not be called directly.
"""
cmd = (
[os.path.join(cmdstan_path(), 'bin', 'stanc' + EXTENSION)]
# handle include-paths, allow-undefined etc
+ compiler_options.compose_stanc(None)
+ ['--info', str(stan_file)]
)
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
if proc.returncode:
raise ValueError(
f"Failed to get source info for Stan model "
f"'{stan_file}'. Console:\n{proc.stderr}"
)
result: Dict[str, Any] = json.loads(proc.stdout)
return result
[docs]def compile_stan_file(
src: Union[str, Path],
force: bool = False,
stanc_options: Optional[Dict[str, Any]] = None,
cpp_options: Optional[Dict[str, Any]] = None,
user_header: OptionalPath = None,
) -> str:
"""
Compile the given Stan program file. Translates the Stan code to
C++, then calls the C++ compiler.
By default, this function compares the timestamps on the source and
executable files; if the executable is newer than the source file, it
will not recompile the file, unless argument ``force`` is ``True``
or unless the compiler options have been changed.
:param src: Path to Stan program file.
:param force: When ``True``, always compile, even if the executable file
is newer than the source file. Used for Stan models which have
``#include`` directives in order to force recompilation when changes
are made to the included files.
:param stanc_options: Options for stanc compiler.
:param cpp_options: Options for C++ compiler.
:param user_header: A path to a header file to include during C++
compilation.
"""
src = Path(src).resolve()
if not src.exists():
raise ValueError(f'stan file does not exist: {src}')
compiler_options = CompilerOptions(
stanc_options=stanc_options,
cpp_options=cpp_options,
user_header=user_header,
)
compiler_options.validate()
exe_target = src.with_suffix(EXTENSION)
if exe_target.exists():
exe_time = os.path.getmtime(exe_target)
included_files = [src]
included_files.extend(
src_info(str(src), compiler_options).get('included_files', [])
)
out_of_date = any(
os.path.getmtime(included_file) > exe_time
for included_file in included_files
)
if not out_of_date and not force:
get_logger().debug('found newer exe file, not recompiling')
return str(exe_target)
compilation_failed = False
# if target path has spaces or special characters, use a copy in a
# temporary directory (GNU-Make constraint)
with SanitizedOrTmpFilePath(str(src)) as (stan_file, is_copied):
exe_file = os.path.splitext(stan_file)[0] + EXTENSION
hpp_file = os.path.splitext(exe_file)[0] + '.hpp'
if os.path.exists(hpp_file):
os.remove(hpp_file)
if os.path.exists(exe_file):
get_logger().debug('Removing %s', exe_file)
os.remove(exe_file)
get_logger().info(
'compiling stan file %s to exe file %s',
stan_file,
exe_target,
)
make = os.getenv(
'MAKE',
'make' if platform.system() != 'Windows' else 'mingw32-make',
)
cmd = [make]
cmd.extend(compiler_options.compose(filename_in_msg=src.name))
cmd.append(Path(exe_file).as_posix())
sout = io.StringIO()
try:
do_command(cmd=cmd, cwd=cmdstan_path(), fd_out=sout)
except RuntimeError as e:
sout.write(f'\n{str(e)}\n')
compilation_failed = True
finally:
console = sout.getvalue()
get_logger().debug('Console output:\n%s', console)
if not compilation_failed:
if is_copied:
shutil.copy(exe_file, exe_target)
get_logger().info('compiled model executable: %s', exe_target)
if 'Warning' in console:
lines = console.split('\n')
warnings = [x for x in lines if x.startswith('Warning')]
get_logger().warning(
'Stan compiler has produced %d warnings:',
len(warnings),
)
get_logger().warning(console)
if compilation_failed:
if 'PCH' in console or 'precompiled header' in console:
get_logger().warning(
"CmdStan's precompiled header (PCH) files "
"may need to be rebuilt."
"Please run cmdstanpy.rebuild_cmdstan().\n"
"If the issue persists please open a bug report"
)
raise ValueError(
f"Failed to compile Stan model '{src}'. " f"Console:\n{console}"
)
return str(exe_target)