API Reference - Utilities

Utilities

The utilities provide helper functions for configuration handling, file operations, logging, and database interaction. Select the module of interest from the tabs below.

Configuration utility functions for tricys.

analysis_prepare_config(config_path)

Loads, validates, and prepares the configuration from the given path.

Source code in tricys/utils/config_utils.py
def analysis_prepare_config(config_path: str) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """Loads, validates, and prepares the configuration from the given path."""
    try:
        config_path = os.path.abspath(config_path)
        with open(config_path, "r") as f:
            base_config = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(
            f"ERROR: Failed to load or parse config file {config_path}: {e}",
            file=sys.stderr,
        )
        sys.exit(1)

    original_config_dir = os.path.dirname(config_path)
    absolute_config = convert_relative_paths_to_absolute(
        base_config, original_config_dir
    )

    # Perform all validation on the config with absolute paths
    analysis_validate_config(absolute_config)

    config = json.loads(json.dumps(absolute_config))
    config["run_timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S")
    run_workspace = os.path.abspath(config["run_timestamp"])

    config["paths"]["log_dir"] = run_workspace
    if "paths" not in config:
        config["paths"] = {}

    original_string = config["simulation"]["variableFilter"]
    config["simulation"]["variableFilter"] = original_string.replace(
        "[", "\\[["
    ).replace("]", "]\\]")

    return config, base_config
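
A minimal usage sketch (the configuration path is illustrative, not part of tricys):

from tricys.utils.config_utils import analysis_prepare_config

# Hypothetical config file; any JSON file with the required structure works.
runtime_config, original_config = analysis_prepare_config("examples/analysis_config.json")

# runtime_config now holds absolute paths, a fresh "run_timestamp", and an
# escaped "variableFilter"; original_config mirrors the file as written.
print(runtime_config["run_timestamp"])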

analysis_setup_analysis_cases_workspaces(config)

Set up independent working directories and configuration files for multiple analysis_cases

This function will:

1. Create independent working directories for each analysis_case in the current working directory
2. Convert relative paths in the original configuration to absolute paths
3. Convert analysis_cases format to standard analysis_case format
4. Generate independent config.json files for each case

Parameters:

Name Type Description Default
config Dict[str, Any]

Original configuration dictionary containing analysis_cases

required

Returns:

Type Description
List[Dict[str, Any]]

A list containing information for each case; each element contains:

  • index: Case index
  • workspace: Working directory path
  • config_path: Configuration file path
  • config: Configuration applicable to this case
  • case_data: Original case data
Source code in tricys/utils/config_utils.py
def analysis_setup_analysis_cases_workspaces(
    config: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """
    Set up independent working directories and configuration files for multiple analysis_cases

    This function will:
    1. Create independent working directories for each analysis_case in the current working directory
    2. Convert relative paths in the original configuration to absolute paths
    3. Convert analysis_cases format to standard analysis_case format
    4. Generate independent config.json files for each case

    Args:
        config: Original configuration dictionary containing analysis_cases

    Returns:
        List containing information for each case, each element contains:
        - index: Case index
        - workspace: Working directory path
        - config_path: Configuration file path
        - config: Configuration applicable to this case
        - case_data: Original case data
    """

    analysis_cases_raw = config["sensitivity_analysis"]["analysis_cases"]

    # Unified processing into list format
    if isinstance(analysis_cases_raw, dict):
        # Single analysis_case object
        analysis_cases = [analysis_cases_raw]
        logger.info(
            "Detected single analysis_case object, converting to list format for processing"
        )
    else:
        # Already in list format
        analysis_cases = analysis_cases_raw

    # The main run workspace is the timestamped directory, already created by initialize_run.
    # We will create the case workspaces inside it.
    run_workspace = os.path.abspath(config["run_timestamp"])

    # Determine the main log file path to be shared with all cases
    main_log_file_name = f"simulation_{config['run_timestamp']}.log"
    main_log_path = os.path.join(run_workspace, main_log_file_name)

    logger.info(
        f"Detected {len(analysis_cases)} analysis cases, creating independent workspaces inside: {run_workspace}"
    )

    case_configs = []

    for i, analysis_case in enumerate(analysis_cases):
        try:
            # Generate case working directory name
            workspace_name = analysis_case.get("name", f"case_{i}")
            # Create the case workspace directly inside the main run workspace
            case_workspace = os.path.join(run_workspace, workspace_name)
            os.makedirs(case_workspace, exist_ok=True)

            # Create standard configuration (inlined from _create_standard_config_for_case)
            base_config = config
            original_config_dir = os.path.dirname(
                base_config.get("paths", {}).get("package_path", os.getcwd())
            )
            absolute_config = convert_relative_paths_to_absolute(
                base_config, original_config_dir
            )
            standard_config = json.loads(json.dumps(absolute_config))

            # if analysis_case.get("name") == "SALib_Analysis":
            if isinstance(
                analysis_case.get("independent_variable"), list
            ) and isinstance(analysis_case.get("independent_variable_sampling"), dict):
                sensitivity_analysis = standard_config["sensitivity_analysis"]
                if "analysis_cases" in sensitivity_analysis:
                    del sensitivity_analysis["analysis_cases"]
                sensitivity_analysis["analysis_case"] = analysis_case.copy()
            else:
                # Get independent variable and sampling from the current analysis case
                independent_var = analysis_case["independent_variable"]
                independent_sampling = analysis_case["independent_variable_sampling"]
                logger.debug(
                    f"independent_sampling configuration: {independent_sampling}"
                )

                # Ensure simulation_parameters exists at the top level
                if "simulation_parameters" not in standard_config:
                    standard_config["simulation_parameters"] = {}

                # If the specific analysis_case has its own simulation_parameters, merge them into the top-level ones
                # This allows for case-specific parameter overrides or additions
                if "simulation_parameters" in analysis_case:
                    case_sim_params = analysis_case.get("simulation_parameters", {})

                    # Identify and handle virtual parameters (e.g., Required_TBR) used for metric configuration
                    virtual_params = {
                        k: v
                        for k, v in case_sim_params.items()
                        if k.startswith("Required_") and isinstance(v, dict)
                    }

                    if virtual_params:
                        # Merge virtual parameter config into the case's metrics_definition
                        metrics_def = standard_config.setdefault(
                            "sensitivity_analysis", {}
                        ).setdefault("metrics_definition", {})
                        for key, value in virtual_params.items():
                            if key in metrics_def:
                                metrics_def[key].update(value)
                            else:
                                metrics_def[key] = value

                    # Get real parameters by excluding virtual ones
                    real_params = {
                        k: v
                        for k, v in case_sim_params.items()
                        if k not in virtual_params
                    }

                    # Update standard_config's simulation_parameters with only real parameters for job generation
                    standard_config["simulation_parameters"].update(real_params)

                # Fetch default values for both independent and simulation parameters
                omc = None
                try:
                    # Get all sim params from the case, which may include virtual parameters
                    all_case_sim_params = analysis_case.get("simulation_parameters", {})
                    # Filter out virtual parameters before fetching default values
                    sim_param_keys = [
                        k
                        for k, v in all_case_sim_params.items()
                        if not (k.startswith("Required_") and isinstance(v, dict))
                    ]
                    # Ensure independent_var is a list for consistent processing, as it can be a list in SALib cases
                    ind_param_keys = (
                        [independent_var]
                        if isinstance(independent_var, str)
                        else independent_var
                    )

                    param_keys_to_fetch = sim_param_keys + ind_param_keys

                    if param_keys_to_fetch:
                        logger.info(
                            f"Fetching default values for parameters: {param_keys_to_fetch}"
                        )
                        omc = get_om_session()
                        if load_modelica_package(
                            omc,
                            Path(standard_config["paths"]["package_path"]).as_posix(),
                        ):
                            all_defaults = get_model_default_parameters(
                                omc, standard_config["simulation"]["model_name"]
                            )

                            # Helper function to handle array access like 'param[1]'
                            def get_specific_default(key, defaults):
                                if key in defaults:
                                    return defaults[key]
                                if "[" in key and key.endswith("]"):
                                    try:
                                        base_name, index_str = key.rsplit("[", 1)
                                        # Modelica is 1-based, Python is 0-based
                                        index = int(index_str[:-1]) - 1
                                        if base_name in defaults:
                                            default_array = defaults[base_name]
                                            if isinstance(
                                                default_array, list
                                            ) and 0 <= index < len(default_array):
                                                return default_array[index]
                                    except (ValueError, IndexError):
                                        pass  # Malformed index or out of bounds
                                return "N/A"

                            # Get defaults for simulation_parameters
                            default_sim_values = {
                                p: get_specific_default(p, all_defaults)
                                for p in sim_param_keys
                            }
                            analysis_case["default_simulation_values"] = (
                                default_sim_values
                            )

                            # Get defaults for independent_variable
                            default_ind_values = {
                                p: get_specific_default(p, all_defaults)
                                for p in ind_param_keys
                            }
                            analysis_case["default_independent_values"] = (
                                default_ind_values
                            )

                except Exception as e:
                    logger.warning(
                        f"Could not fetch default parameter values. Defaults will be empty. Error: {e}"
                    )
                    analysis_case["default_simulation_values"] = {}
                    analysis_case["default_independent_values"] = {}
                finally:
                    if omc:
                        omc.sendExpression("quit()")

                # Add the primary independent_variable_sampling for the current analysis case
                standard_config["simulation_parameters"][
                    independent_var
                ] = independent_sampling

                # Update sensitivity_analysis configuration
                sensitivity_analysis = standard_config["sensitivity_analysis"]

                # Remove analysis_cases and replace with single analysis_case
                if "analysis_cases" in sensitivity_analysis:
                    del sensitivity_analysis["analysis_cases"]

                sensitivity_analysis["analysis_case"] = analysis_case.copy()

            # Update paths in configuration to be relative to case working directory
            case_config = standard_config.copy()
            case_config["paths"]["results_dir"] = os.path.join(
                case_workspace, "results"
            )
            case_config["paths"]["temp_dir"] = os.path.join(case_workspace, "temp")
            case_config["paths"]["db_path"] = os.path.join(
                case_workspace, "data", "parameters.db"
            )

            # If there's logging configuration, also update log directory
            if "paths" in case_config and "log_dir" in case_config["paths"]:
                case_config["paths"]["log_dir"] = os.path.join(case_workspace, "log")
                # Inject the main log path for dual logging
                if "logging" in case_config:
                    case_config["logging"]["main_log_path"] = main_log_path

            # Save standard configuration file to case working directory
            config_file_path = os.path.join(case_workspace, "config.json")
            with open(config_file_path, "w", encoding="utf-8") as f:
                json.dump(standard_config, f, indent=4, ensure_ascii=False)

            # Record case information
            case_info = {
                "index": i,
                "workspace": case_workspace,
                "config_path": config_file_path,
                "config": case_config,
                "case_data": analysis_case,
            }
            case_configs.append(case_info)

            logger.info(
                f"Workspace for case {i+1} created successfully",
                extra={
                    "case_index": i,
                    "case_name": analysis_case.get("name", f"case_{i}"),
                    "workspace": case_workspace,
                    "config_path": config_file_path,
                },
            )

        except Exception as e:
            logger.error(f"✗ Error processing case {i}: {e}", exc_info=True)
            continue

    logger.info(
        f"Successfully created independent working directories for {len(case_configs)} analysis cases"
    )
    return case_configs
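
An illustrative sketch of how a prepared configuration flows into per-case workspaces (the path and printed fields are examples; the config file is assumed to define sensitivity_analysis.analysis_cases):

from tricys.utils.config_utils import (
    analysis_prepare_config,
    analysis_setup_analysis_cases_workspaces,
)

config, _ = analysis_prepare_config("examples/analysis_config.json")
case_infos = analysis_setup_analysis_cases_workspaces(config)

for info in case_infos:
    # Each entry carries index, workspace, config_path, config and case_data.
    print(info["index"], info["workspace"], info["config_path"])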

analysis_validate_analysis_cases_config(config)

Validates analysis_cases configuration format supporting both list and single object.

This function validates:

1. Basic structure and required fields of analysis_cases
2. Simulation parameters compatibility (single job requirement)
3. Required_TBR configuration completeness if used in dependent_variables

Parameters:

Name Type Description Default
config Dict[str, Any]

Configuration dictionary to validate.

required

Returns:

Type Description
bool

True if configuration is valid, False otherwise.

Note

Supports either a single analysis_case dict or a list of cases. Required fields per case: name, independent_variable, independent_variable_sampling. Rejects top-level simulation_parameters when analysis_cases is defined (case-specific parameters belong inside each case). Checks Required_TBR completeness in metrics_definition.

Source code in tricys/utils/config_utils.py
def analysis_validate_analysis_cases_config(config: Dict[str, Any]) -> bool:
    """Validates analysis_cases configuration format supporting both list and single object.

    This function validates:
    1. Basic structure and required fields of analysis_cases
    2. Simulation parameters compatibility (single job requirement)
    3. Required_TBR configuration completeness if used in dependent_variables

    Args:
        config: Configuration dictionary to validate.

    Returns:
        True if configuration is valid, False otherwise.

    Note:
        Supports both single analysis_case dict or list of cases. Required fields per case:
        name, independent_variable, independent_variable_sampling. Validates simulation_parameters
        contain only single job (no sweep). Checks Required_TBR completeness in metrics_definition.
    """
    if "sensitivity_analysis" not in config:
        logger.error("Missing sensitivity_analysis")
        return False

    sensitivity_analysis = config["sensitivity_analysis"]
    if "analysis_cases" not in sensitivity_analysis:
        logger.error("Missing analysis_cases")
        return False

    analysis_cases = sensitivity_analysis["analysis_cases"]

    # Support both single object and list formats
    if isinstance(analysis_cases, dict):
        # Single analysis_case object
        cases_to_check = [analysis_cases]
    elif isinstance(analysis_cases, list) and len(analysis_cases) > 0:
        # analysis_cases list
        cases_to_check = analysis_cases
    else:
        logger.error("analysis_cases must be a non-empty list or a single object")
        return False

    # Check required fields for each analysis_case
    required_fields = ["name", "independent_variable", "independent_variable_sampling"]
    for i, case in enumerate(cases_to_check):
        if not isinstance(case, dict):
            logger.error(f"analysis_cases[{i}] must be an object")
            return False
        for field in required_fields:
            if field not in case:
                logger.error(f"Missing required field '{field}' in analysis_cases[{i}]")
                return False

    # Check if top-level simulation_parameters are used, which is disallowed in analysis_cases mode
    if config.get("simulation_parameters"):
        logger.error(
            "The top-level 'simulation_parameters' field cannot be used when 'analysis_cases' is defined. "
            "Please move any shared or case-specific parameters into the 'simulation_parameters' field "
            "inside each object within the 'analysis_cases' list."
        )
        return False

    # Check Required_TBR configuration completeness if it exists in dependent_variables
    metrics_definition = sensitivity_analysis.get("metrics_definition", {})
    for i, case in enumerate(cases_to_check):
        dependent_vars = case.get("dependent_variables", [])
        if "Required_TBR" in dependent_vars:
            # Check if Required_TBR exists in metrics_definition
            if "Required_TBR" not in metrics_definition:
                logger.error(
                    f"Required_TBR is in dependent_variables of analysis_cases[{i}] but missing from metrics_definition"
                )
                return False

            # Check if Required_TBR configuration is complete
            required_tbr_config = metrics_definition["Required_TBR"]
            required_fields = [
                "method",
                "parameter_to_optimize",
                "search_range",
                "tolerance",
                "max_iterations",
            ]
            missing_fields = [
                field for field in required_fields if field not in required_tbr_config
            ]
            if missing_fields:
                logger.error(
                    f"Required_TBR configuration in metrics_definition is incomplete. Missing fields: {missing_fields}"
                )
                return False

    return True
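
A minimal configuration fragment that would pass this validator; the case name, parameter name, and sampling values are illustrative only:

from tricys.utils.config_utils import analysis_validate_analysis_cases_config

config = {
    "sensitivity_analysis": {
        "analysis_cases": [
            {
                "name": "tbr_scan",
                "independent_variable": "blanket.TBR",
                "independent_variable_sampling": [1.05, 1.10, 1.15],
            }
        ]
    }
}

# Passes: required fields are present and no top-level simulation_parameters is defined.
assert analysis_validate_analysis_cases_config(config) is True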

analysis_validate_config(config, required_keys=ANALYSIS_REQUIRED_CONFIG_KEYS, parent_key='')

Recursively validates the configuration's structure and values.

Source code in tricys/utils/config_utils.py
def analysis_validate_config(
    config: Dict[str, Any],
    required_keys: Dict = ANALYSIS_REQUIRED_CONFIG_KEYS,
    parent_key: str = "",
) -> None:
    """
    Recursively validates the configuration's structure and values.
    """
    # --- Structural Validation ---
    for key, expected in required_keys.items():
        full_key_path = f"{parent_key}.{key}" if parent_key else key
        if key not in config:
            print(
                f"ERROR: Missing required configuration key: '{full_key_path}'",
                file=sys.stderr,
            )
            sys.exit(1)

        if isinstance(expected, dict):
            if not isinstance(config[key], dict):
                print(
                    f"ERROR: Configuration key '{full_key_path}' must be a dictionary.",
                    file=sys.stderr,
                )
                sys.exit(1)
            analysis_validate_config(config[key], expected, parent_key=full_key_path)
        elif not isinstance(config[key], expected):
            print(
                f"ERROR: Configuration key '{full_key_path}' has incorrect type. Expected {expected}, got {type(config[key])}.",
                file=sys.stderr,
            )
            sys.exit(1)

    # 2. Validate variableFilter format
    variable_filter = config.get("simulation", {}).get("variableFilter")
    if variable_filter:
        # Regex for a valid Modelica identifier (simplified)
        ident = r"[a-zA-Z_][a-zA-Z0-9_]*"
        # Regex for a valid substring in the filter:
        # - time
        # - class.name
        # - class.name[index]
        # - class.name[start-end]
        valid_substring_re = re.compile(rf"^time$|^{ident}\.{ident}(\[\d+(-\d+)?\])?$")

        substrings = variable_filter.split("|")
        for sub in substrings:
            if not valid_substring_re.match(sub):
                print(
                    f"ERROR: Invalid format in 'simulation.variableFilter'. Substring '{sub}' does not match required format. "
                    f"Valid formats are 'time', 'classname.typename', 'classname.typename[1]', or 'classname.typename[1-5]'.",
                    file=sys.stderr,
                )
                sys.exit(1)

    # --- Value and Conditional Validation (only on top-level call) ---
    if not parent_key:
        # Check for package_path existence
        package_path = config.get("paths", {}).get("package_path")
        if package_path and not os.path.exists(package_path):
            print(
                f"ERROR: File specified in 'paths.package_path' not found: {package_path}",
                file=sys.stderr,
            )
            sys.exit(1)

        # Analysis-specific validation
        sa_config = config.get("sensitivity_analysis", {})
        if sa_config.get("enabled", False):
            has_sim_params = (
                "simulation_parameters" in config and config["simulation_parameters"]
            )
            has_analysis_cases = (
                "analysis_cases" in sa_config and sa_config["analysis_cases"]
            )

            if not has_sim_params and not has_analysis_cases:
                print(
                    "ERROR: When 'sensitivity_analysis' is enabled, either 'simulation_parameters' or 'sensitivity_analysis.analysis_cases' must be defined.",
                    file=sys.stderr,
                )
                sys.exit(1)

            if has_analysis_cases:
                if not analysis_validate_analysis_cases_config(config):
                    # The original function uses a logger which is not yet configured.
                    # Add a print statement to ensure the user sees an error.
                    print(
                        "ERROR: 'analysis_cases' configuration is invalid. See previous logs for details.",
                        file=sys.stderr,
                    )
                    sys.exit(1)

        check_ai_config(config)

basic_prepare_config(config_path)

Loads and prepares the configuration from the given path.

Parameters:

Name Type Description Default
config_path str

Path to the JSON configuration file.

required

Returns:

Type Description
tuple[Dict[str, Any], Dict[str, Any]]

A tuple of (runtime_config, original_config).

Raises:

Type Description
SystemExit

If config file loading/parsing fails or validation fails.

Note

Converts relative paths to absolute, validates config structure, adds run_timestamp, creates workspace directories, and processes variableFilter for regex escaping. Sets up log_dir, temp_dir, and results_dir within run workspace.

Source code in tricys/utils/config_utils.py
def basic_prepare_config(config_path: str) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """Loads and prepares the configuration from the given path.

    Args:
        config_path: Path to the JSON configuration file.

    Returns:
        A tuple of (runtime_config, original_config).

    Raises:
        SystemExit: If config file loading/parsing fails or validation fails.

    Note:
        Converts relative paths to absolute, validates config structure, adds run_timestamp,
        creates workspace directories, and processes variableFilter for regex escaping.
        Sets up log_dir, temp_dir, and results_dir within run workspace.
    """
    try:
        config_path = os.path.abspath(config_path)
        with open(config_path, "r") as f:
            base_config = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        # Logger is not set up yet, so print directly to stderr
        print(
            f"ERROR: Failed to load or parse config file {config_path}: {e}",
            file=sys.stderr,
        )
        sys.exit(1)

    original_config_dir = os.path.dirname(config_path)

    absolute_config = convert_relative_paths_to_absolute(
        base_config, original_config_dir
    )

    # Perform all validation on the config with absolute paths
    basic_validate_config(absolute_config)

    config = json.loads(json.dumps(absolute_config))
    config["run_timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S")

    run_workspace = os.path.abspath(config["run_timestamp"])

    if "paths" not in config:
        config["paths"] = {}

    original_string = config["simulation"]["variableFilter"]
    config["simulation"]["variableFilter"] = original_string.replace(
        "[", "\\[["
    ).replace("]", "]\\]")

    config["paths"]["log_dir"] = os.path.join(
        run_workspace, base_config["paths"].get("log_dir", "log")
    )
    config["paths"]["temp_dir"] = os.path.join(
        run_workspace, base_config["paths"].get("temp_dir", "temp")
    )
    config["paths"]["results_dir"] = os.path.join(
        run_workspace, base_config["paths"].get("results_dir", "results")
    )

    os.makedirs(config["paths"]["log_dir"], exist_ok=True)
    os.makedirs(config["paths"]["temp_dir"], exist_ok=True)
    os.makedirs(config["paths"]["results_dir"], exist_ok=True)

    return config, base_config

basic_validate_config(config, required_keys=BASIC_REQUIRED_CONFIG_KEYS, parent_key='')

Recursively validates the configuration against required structure.

Parameters:

Name Type Description Default
config Dict[str, Any]

Configuration dictionary to validate.

required
required_keys Dict

Dictionary defining required keys and their expected types.

BASIC_REQUIRED_CONFIG_KEYS
parent_key str

Parent key path for nested validation (used internally).

''

Raises:

Type Description
SystemExit

If validation fails (exits with code 1).

Note

Performs structural validation (required keys and types) and value validation (path existence, variableFilter format). Uses regex to validate variableFilter against Modelica identifier patterns. Only validates values on top-level call.

Source code in tricys/utils/config_utils.py
def basic_validate_config(
    config: Dict[str, Any],
    required_keys: Dict = BASIC_REQUIRED_CONFIG_KEYS,
    parent_key: str = "",
) -> None:
    """Recursively validates the configuration against required structure.

    Args:
        config: Configuration dictionary to validate.
        required_keys: Dictionary defining required keys and their expected types.
        parent_key: Parent key path for nested validation (used internally).

    Raises:
        SystemExit: If validation fails (exits with code 1).

    Note:
        Performs structural validation (required keys and types) and value validation
        (path existence, variableFilter format). Uses regex to validate variableFilter
        against Modelica identifier patterns. Only validates values on top-level call.
    """
    # --- Structural Validation ---
    for key, expected_type_or_dict in required_keys.items():
        full_key_path = f"{parent_key}.{key}" if parent_key else key

        if key not in config:
            print(
                f"ERROR: Missing required configuration key: '{full_key_path}'",
                file=sys.stderr,
            )
            sys.exit(1)

        if isinstance(expected_type_or_dict, dict):
            if not isinstance(config[key], dict):
                print(
                    f"ERROR: Configuration key '{full_key_path}' must be a dictionary.",
                    file=sys.stderr,
                )
                sys.exit(1)
            # Recurse for nested dictionaries
            basic_validate_config(
                config[key], expected_type_or_dict, parent_key=full_key_path
            )
        else:
            # Perform type checking for leaf keys
            if not isinstance(config[key], expected_type_or_dict):
                print(
                    f"ERROR: Configuration key '{full_key_path}' has incorrect type. "
                    f"Expected {expected_type_or_dict}, but got {type(config[key])}.",
                    file=sys.stderr,
                )
                sys.exit(1)

    # --- Value Validation (only on top-level call) ---
    if not parent_key:
        # 1. Check if package_path exists
        package_path = config.get("paths", {}).get("package_path")
        if package_path and not os.path.exists(package_path):
            print(
                f"ERROR: File specified in 'paths.package_path' not found: {package_path}",
                file=sys.stderr,
            )
            sys.exit(1)

        # 2. Validate variableFilter format
        variable_filter = config.get("simulation", {}).get("variableFilter")
        if variable_filter:
            # Regex for a valid Modelica identifier (simplified)
            ident = r"[a-zA-Z_][a-zA-Z0-9_]*"
            # Regex for a valid substring in the filter:
            # - time
            # - class.name
            # - class.name[index]
            # - class.name[start-end]
            valid_substring_re = re.compile(
                rf"^time$|^{ident}\.{ident}(\[\d+(-\d+)?\])?$"
            )

            substrings = variable_filter.split("|")
            for sub in substrings:
                if not valid_substring_re.match(sub):
                    print(
                        f"ERROR: Invalid format in 'simulation.variableFilter'. Substring '{sub}' does not match required format. "
                        f"Valid formats are 'time', 'classname.typename', 'classname.typename[1]', or 'classname.typename[1-5]'.",
                        file=sys.stderr,
                    )
                    sys.exit(1)

        check_ai_config(config)
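
To illustrate the variableFilter check, the same pattern used above can be exercised on its own; the filter entries below are made-up examples:

import re

ident = r"[a-zA-Z_][a-zA-Z0-9_]*"
valid_substring_re = re.compile(rf"^time$|^{ident}\.{ident}(\[\d+(-\d+)?\])?$")

# Accepted forms: 'time', 'classname.typename', optionally with an index or range.
for ok in ["time", "sds.I", "blanket.T[1]", "blanket.T[1-5]"]:
    assert valid_substring_re.match(ok)

# Rejected forms cause basic_validate_config to exit with an error.
for bad in ["sds", "sds.I[1:5]", "blanket..T"]:
    assert not valid_substring_re.match(bad)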

check_ai_config(config)

Checks for AI-related environment variables if 'ai: true' is found in the config.

Parameters:

Name Type Description Default
config Dict[str, Any]

The configuration dictionary.

required

Raises:

Type Description
SystemExit

If AI is enabled in the config but required environment variables are missing.

Note

If any part of the configuration contains "ai": true, this function verifies that API_KEY, BASE_URL, and either AI_MODEL or AI_MODELS are set as environment variables.

Source code in tricys/utils/config_utils.py
def check_ai_config(config: Dict[str, Any]) -> None:
    """
    Checks for AI-related environment variables if 'ai: true' is found in the config.

    Args:
        config: The configuration dictionary.

    Raises:
        SystemExit: If AI is enabled in the config but required environment
                    variables are missing.

    Note:
        If any part of the configuration contains `"ai": true`, this function verifies
        that `API_KEY`, `BASE_URL`, and either `AI_MODEL` or `AI_MODELS` are set as
        environment variables.
    """
    if _search_dict(config, "ai", True):
        logger.info(
            "AI feature enabled in config, checking for required environment variables..."
        )
        load_dotenv()
        api_key = os.environ.get("API_KEY")
        base_url = os.environ.get("BASE_URL")
        ai_model = os.environ.get("AI_MODEL")
        ai_models = os.environ.get("AI_MODELS")

        missing_vars = []
        if not api_key:
            missing_vars.append("API_KEY")
        if not base_url:
            missing_vars.append("BASE_URL")
        if not ai_model and not ai_models:
            missing_vars.append("AI_MODEL or AI_MODELS")

        if missing_vars:
            print(
                f"ERROR: 'ai: true' is set in the configuration, but the following required environment variables are missing: {', '.join(missing_vars)}. "
                "Please set them in your environment or a .env file.",
                file=sys.stderr,
            )
            sys.exit(1)
        else:
            logger.info("All required AI environment variables are present.")

convert_relative_paths_to_absolute(config, base_dir)

Recursively converts relative paths to absolute paths in configuration.

Parameters:

Name Type Description Default
config Dict[str, Any]

Configuration dictionary to process.

required
base_dir str

Base directory path for resolving relative paths.

required

Returns:

Type Description
Dict[str, Any]

Configuration dictionary with converted absolute paths.

Note

Processes path keys including package_path, db_path, results_dir, temp_dir, log_dir, glossary_path, and any key ending with '_path'. Converts relative paths to absolute using base_dir. Handles nested dictionaries and lists recursively.

Source code in tricys/utils/config_utils.py
def convert_relative_paths_to_absolute(
    config: Dict[str, Any], base_dir: str
) -> Dict[str, Any]:
    """Recursively converts relative paths to absolute paths in configuration.

    Args:
        config: Configuration dictionary to process.
        base_dir: Base directory path for resolving relative paths.

    Returns:
        Configuration dictionary with converted absolute paths.

    Note:
        Processes path keys including package_path, db_path, results_dir, temp_dir,
        log_dir, glossary_path, and any key ending with '_path'. Converts relative
        paths to absolute using base_dir. Handles nested dictionaries and lists recursively.
    """

    def _process_value(value, key_name="", parent_dict=None):
        if isinstance(value, dict):
            return {k: _process_value(v, k, value) for k, v in value.items()}
        elif isinstance(value, list):
            return [_process_value(item, parent_dict=parent_dict) for item in value]
        elif isinstance(value, str):
            # Check if it's a path-related key name (extended support for more path fields)
            path_keys = [
                "package_path",
                "db_path",
                "results_dir",
                "temp_dir",
                "log_dir",
                "glossary_path",
            ]

            if key_name.endswith("_path") or key_name in path_keys:
                # If it's a relative path, convert to absolute path
                if not os.path.isabs(value) and value:
                    abs_path = os.path.abspath(os.path.join(base_dir, value))
                    logger.debug(
                        "Converted path",
                        extra={
                            "key_name": key_name,
                            "original_value": value,
                            "absolute_path": abs_path,
                        },
                    )
                    return abs_path
            return value
        else:
            return value

    return _process_value(config)
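
A short illustration with made-up paths (output shown for a POSIX system):

from tricys.utils.config_utils import convert_relative_paths_to_absolute

raw = {
    "paths": {"package_path": "models/package.mo", "db_path": "data/parameters.db"},
    "simulation": {"model_name": "example_model.Cycle"},
}
resolved = convert_relative_paths_to_absolute(raw, "/home/user/project")

# resolved["paths"]["package_path"] -> "/home/user/project/models/package.mo"
# resolved["paths"]["db_path"]      -> "/home/user/project/data/parameters.db"
# "model_name" is left untouched because it is not a path-like key.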

Utility functions for file and directory management.

This module provides helper functions for creating unique filenames and managing log file rotation.

archive_run(timestamp)

Archives a run (simulation or analysis) based on its configuration.

Parameters:

Name Type Description Default
timestamp str

The timestamp directory name of the run to archive.

required
Note

Determines run type (analysis vs simulation) from configuration. Delegates to _archive_run() with appropriate run_type. Extracts configuration from log files using restore_configs_from_log().

Source code in tricys/utils/file_utils.py
def archive_run(timestamp: str) -> None:
    """Archives a run (simulation or analysis) based on its configuration.

    Args:
        timestamp: The timestamp directory name of the run to archive.

    Note:
        Determines run type (analysis vs simulation) from configuration. Delegates
        to _archive_run() with appropriate run_type. Extracts configuration from
        log files using restore_configs_from_log().
    """

    configs = restore_configs_from_log(timestamp)
    if not configs:
        return
    runtime_config, original_config = configs
    logger.info("Successfully extracted both runtime and original configurations.")

    is_analysis = "sensitivity_analysis" in original_config and original_config.get(
        "sensitivity_analysis", {}
    ).get("enabled", False)

    if is_analysis:
        _archive_run(timestamp, "analysis")
    else:
        _archive_run(timestamp, "simulation")

get_unique_filename(base_path, filename)

Generates a unique filename by appending a counter if the file already exists.

Parameters:

Name Type Description Default
base_path str

The directory path where the file will be saved.

required
filename str

The desired filename, including the extension.

required

Returns:

Type Description
str

A unique, non-existing file path.

Note

Appends _1, _2, etc. before the extension until a non-existing filename is found. Example: if "data.csv" exists, returns "data_1.csv", then "data_2.csv", etc.

Source code in tricys/utils/file_utils.py
def get_unique_filename(base_path: str, filename: str) -> str:
    """Generates a unique filename by appending a counter if the file already exists.

    Args:
        base_path: The directory path where the file will be saved.
        filename: The desired filename, including the extension.

    Returns:
        A unique, non-existing file path.

    Note:
        Appends _1, _2, etc. before the extension until a non-existing filename is found.
        Example: if "data.csv" exists, returns "data_1.csv", then "data_2.csv", etc.
    """
    base_name, ext = os.path.splitext(filename)
    counter = 0
    new_filename = filename
    new_filepath = os.path.join(base_path, new_filename)

    while os.path.exists(new_filepath):
        counter += 1
        new_filename = f"{base_name}_{counter}{ext}"
        new_filepath = os.path.join(base_path, new_filename)

    return new_filepath
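
A quick illustration with a hypothetical results directory:

from tricys.utils.file_utils import get_unique_filename

# If "results/data.csv" already exists this returns "results/data_1.csv",
# then "results/data_2.csv" on the next call, and so on.
out_path = get_unique_filename("results", "data.csv")
print(out_path)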

unarchive_run(zip_file)

Unarchives a simulation run from a zip file.

Parameters:

Name Type Description Default
zip_file str

Path to the zip file to extract.

required

Raises:

Type Description
SystemExit

If zip file not found or extraction fails.

Note

Extracts to current directory if empty, otherwise creates new directory named after the zip file. Sets up basic logging for the unarchive process. Handles BadZipFile exceptions gracefully.

Source code in tricys/utils/file_utils.py
def unarchive_run(zip_file: str) -> None:
    """Unarchives a simulation run from a zip file.

    Args:
        zip_file: Path to the zip file to extract.

    Raises:
        SystemExit: If zip file not found or extraction fails.

    Note:
        Extracts to current directory if empty, otherwise creates new directory
        named after the zip file. Sets up basic logging for the unarchive process.
        Handles BadZipFile exceptions gracefully.
    """
    # Basic logging setup for unarchive command
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        stream=sys.stdout,
    )
    logger = logging.getLogger(__name__)

    if not os.path.isfile(zip_file):
        logger.error(f"Archive file not found: {zip_file}")
        sys.exit(1)

    target_dir = "."
    if os.listdir("."):  # If the list of CWD contents is not empty
        dir_name = os.path.splitext(os.path.basename(zip_file))[0]
        target_dir = dir_name
        logger.info(
            f"Current directory is not empty. Extracting to new directory: {target_dir}"
        )
        os.makedirs(target_dir, exist_ok=True)
    else:
        logger.info("Current directory is empty. Extracting to current directory.")

    # Unzip the file
    try:
        with zipfile.ZipFile(zip_file, "r") as zip_ref:
            zip_ref.extractall(target_dir)
        logger.info(
            f"Successfully unarchived '{zip_file}' to '{os.path.abspath(target_dir)}'"
        )
    except zipfile.BadZipFile:
        logger.error(f"Error: '{zip_file}' is not a valid zip file.")
        sys.exit(1)
    except Exception as e:
        logger.error(f"An error occurred during unarchiving: {e}")
        sys.exit(1)

delete_old_logs(log_path, max_files)

Deletes the oldest log files in a directory to meet a specified limit.

Checks the number of .log files in the given directory and removes the oldest ones based on modification time until the file count matches the max_files limit.

Parameters:

Name Type Description Default
log_path str

The path to the directory containing log files.

required
max_files int

The maximum number of .log files to retain.

required
Note

Only processes files with .log extension. Sorts by modification time (oldest first) before deletion. Does nothing if current count <= max_files.

Source code in tricys/utils/log_utils.py
def delete_old_logs(log_path: str, max_files: int) -> None:
    """Deletes the oldest log files in a directory to meet a specified limit.

    Checks the number of `.log` files in the given directory and removes the
    oldest ones based on modification time until the file count matches the
    `max_files` limit.

    Args:
        log_path: The path to the directory containing log files.
        max_files: The maximum number of `.log` files to retain.

    Note:
        Only processes files with .log extension. Sorts by modification time
        (oldest first) before deletion. Does nothing if current count <= max_files.
    """
    log_files = [
        os.path.join(log_path, f) for f in os.listdir(log_path) if f.endswith(".log")
    ]

    if len(log_files) > max_files:
        # Sort by modification time, oldest first
        log_files.sort(key=os.path.getmtime)

        # Calculate how many files to delete
        files_to_delete_count = len(log_files) - max_files

        # Delete the oldest files
        for i in range(files_to_delete_count):
            os.remove(log_files[i])

log_execution_time(func)

A decorator to log the execution time of a function.

Parameters:

Name Type Description Default
func Callable

The function to be decorated.

required

Returns:

Type Description
Callable

The wrapped function that logs execution time.

Note

Measures execution time using time.perf_counter(). Logs function name, module, and duration in milliseconds. Uses structured logging with extra fields.

Source code in tricys/utils/log_utils.py
def log_execution_time(func: Callable) -> Callable:
    """A decorator to log the execution time of a function.

    Args:
        func: The function to be decorated.

    Returns:
        The wrapped function that logs execution time.

    Note:
        Measures execution time using time.perf_counter(). Logs function name,
        module, and duration in milliseconds. Uses structured logging with extra fields.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        duration_ms = (end_time - start_time) * 1000

        logger.info(
            "Function executed",
            extra={
                "function_name": func.__name__,
                "function_module": func.__module__,
                "duration_ms": round(duration_ms, 2),
            },
        )
        return result

    return wrapper
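
A minimal decorator sketch; the decorated function is illustrative:

from tricys.utils.log_utils import log_execution_time

@log_execution_time
def run_step(n: int) -> int:
    return sum(range(n))

run_step(1_000_000)
# Emits a "Function executed" log record with function_name, function_module,
# and duration_ms via the module's structured logger.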

restore_configs_from_log(timestamp)

Finds the log file for a given timestamp and restores configurations.

Parameters:

Name Type Description Default
timestamp str

The timestamp directory name to search for log files.

required

Returns:

Type Description
tuple[Dict[str, Any] | None, Dict[str, Any] | None]

A tuple of (runtime_config, original_config) or (None, None) if not found.

Note

Searches in timestamp/simulation_{timestamp}.log and timestamp/log/ directory. Parses JSON log entries to find "Runtime Configuration" and "Original Configuration" messages. Returns parsed configurations as dictionaries.

Source code in tricys/utils/log_utils.py
def restore_configs_from_log(
    timestamp: str,
) -> tuple[Dict[str, Any] | None, Dict[str, Any] | None]:
    """Finds the log file for a given timestamp and restores configurations.

    Args:
        timestamp: The timestamp directory name to search for log files.

    Returns:
        A tuple of (runtime_config, original_config) or (None, None) if not found.

    Note:
        Searches in timestamp/simulation_{timestamp}.log and timestamp/log/ directory.
        Parses JSON log entries to find "Runtime Configuration" and "Original Configuration"
        messages. Returns parsed configurations as dictionaries.
    """
    log_file_path = None
    # Define potential locations for the log file
    search_paths = [
        os.path.join(timestamp, f"simulation_{timestamp}.log"),  # analysis style
        os.path.join(timestamp, "log"),  # simulation style
    ]

    for path in search_paths:
        if os.path.isfile(path):
            log_file_path = path
            break
        if os.path.isdir(path):
            for f in os.listdir(path):
                if f.startswith("simulation_") and f.endswith(".log"):
                    log_file_path = os.path.join(path, f)
                    break
            if log_file_path:
                break

    if not log_file_path:
        print(
            f"ERROR: Main log file not found for timestamp {timestamp}", file=sys.stderr
        )
        return None, None

    runtime_config_str = None
    original_config_str = None
    try:
        with open(log_file_path, "r", encoding="utf-8") as f:
            for line in f:
                try:
                    log_entry = json.loads(line)
                    if "message" in log_entry:
                        if log_entry["message"].startswith(
                            "Runtime Configuration (compact JSON):"
                        ):
                            runtime_config_str = log_entry["message"].replace(
                                "Runtime Configuration (compact JSON): ", ""
                            )
                        elif log_entry["message"].startswith(
                            "Original Configuration (compact JSON):"
                        ):
                            original_config_str = log_entry["message"].replace(
                                "Original Configuration (compact JSON): ", ""
                            )
                    if runtime_config_str and original_config_str:
                        break
                except json.JSONDecodeError:
                    continue
    except Exception as e:
        print(f"ERROR: Failed to read log file {log_file_path}: {e}", file=sys.stderr)
        return None, None

    if not runtime_config_str or not original_config_str:
        print(
            "ERROR: Could not find runtime and/or original configuration in log file.",
            file=sys.stderr,
        )
        return None, None

    try:
        runtime_config = json.loads(runtime_config_str)
        original_config = json.loads(original_config_str)
        return runtime_config, original_config
    except json.JSONDecodeError as e:
        print(
            f"ERROR: Failed to parse configuration from log file: {e}", file=sys.stderr
        )
        return None, None

setup_logging(config, original_config=None)

Configures the logging module based on the application configuration.

Parameters:

Name Type Description Default
config Dict[str, Any]

The main configuration dictionary containing logging settings.

required
original_config Dict[str, Any]

Optional original configuration for additional logging.

None
Note

Sets up JSON formatted logging to console and/or file. Manages log file rotation via delete_old_logs(). Supports main_log_path for analysis cases. Logs both runtime and original configurations in compact JSON format. Clears existing handlers to prevent duplicates.

Source code in tricys/utils/log_utils.py
def setup_logging(
    config: Dict[str, Any], original_config: Dict[str, Any] = None
) -> None:
    """Configures the logging module based on the application configuration.

    Args:
        config: The main configuration dictionary containing logging settings.
        original_config: Optional original configuration for additional logging.

    Note:
        Sets up JSON formatted logging to console and/or file. Manages log file rotation
        via delete_old_logs(). Supports main_log_path for analysis cases. Logs both
        runtime and original configurations in compact JSON format. Clears existing
        handlers to prevent duplicates.
    """
    log_config = config.get("logging", {})
    log_level_str = log_config.get("log_level", "INFO").upper()
    log_level = getattr(logging, log_level_str, logging.INFO)
    log_to_console = log_config.get("log_to_console", True)
    run_timestamp = config.get("run_timestamp")

    log_dir_path = config.get("paths", {}).get("log_dir")
    log_count = log_config.get("log_count", 5)

    root_logger = logging.getLogger()
    root_logger.setLevel(log_level)

    # Clear any existing handlers to prevent duplicate logs
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
        handler.close()

    formatter = jsonlogger.JsonFormatter(
        "%(asctime)s %(name)s %(levelname)s %(message)s"
    )

    if log_to_console:
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        root_logger.addHandler(console_handler)

    if log_dir_path:
        abs_log_dir = os.path.abspath(log_dir_path)
        os.makedirs(abs_log_dir, exist_ok=True)
        delete_old_logs(abs_log_dir, log_count)
        log_file_path = os.path.join(abs_log_dir, f"simulation_{run_timestamp}.log")

        file_handler = logging.FileHandler(log_file_path, mode="a", encoding="utf-8")
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)

        # If a main log path is provided (for analysis cases), add it as an additional handler
        main_log_path = log_config.get("main_log_path")
        if main_log_path:
            try:
                # Ensure the directory for the main log exists, just in case
                os.makedirs(os.path.dirname(main_log_path), exist_ok=True)

                main_log_handler = logging.FileHandler(
                    main_log_path, mode="a", encoding="utf-8"
                )
                main_log_handler.setFormatter(formatter)
                root_logger.addHandler(main_log_handler)
                logger.info(f"Also logging to main log file: {main_log_path}")
            except Exception as e:
                logger.warning(
                    f"Failed to attach main log handler for {main_log_path}: {e}"
                )

        logger.info(f"Logging to file: {log_file_path}")
        # Log the full runtime configuration in a compact JSON format
        logger.info(
            f"Runtime Configuration (compact JSON): {json.dumps(config, separators=(',', ':'), ensure_ascii=False)}"
        )
        if original_config:
            logger.info(
                f"Original Configuration (compact JSON): {json.dumps(original_config, separators=(',', ':'), ensure_ascii=False)}"
            )
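
An illustrative configuration fragment showing only the keys read by setup_logging; the timestamp and directory values are made up:

from tricys.utils.log_utils import setup_logging

config = {
    "run_timestamp": "20240101_120000",
    "paths": {"log_dir": "20240101_120000/log"},
    "logging": {"log_level": "DEBUG", "log_to_console": True, "log_count": 5},
}
setup_logging(config)
# Writes JSON-formatted records to the console and to
# 20240101_120000/log/simulation_20240101_120000.log, and logs the runtime
# configuration in compact JSON.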

Utilities for interacting with the simulation parameter SQLite database.

This module provides functions to create, store, update, and retrieve simulation parameter data from a SQLite database file.

create_parameters_table(db_path)

Creates the parameters table in the database if it does not exist.

Parameters:

Name Type Description Default
db_path str

The path to the SQLite database file.

required

Raises:

Type Description
Error

If a database error occurs during table creation.

Note

Creates parent directories if they don't exist. Table schema includes: name (TEXT PRIMARY KEY), type, default_value, sweep_values, description, dimensions. Uses CREATE TABLE IF NOT EXISTS for safe repeated calls.

Source code in tricys/utils/sqlite_utils.py
def create_parameters_table(db_path: str) -> None:
    """Creates the parameters table in the database if it does not exist.

    Args:
        db_path: The path to the SQLite database file.

    Raises:
        sqlite3.Error: If a database error occurs during table creation.

    Note:
        Creates parent directories if they don't exist. Table schema includes:
        name (TEXT PRIMARY KEY), type, default_value, sweep_values, description, dimensions.
        Uses CREATE TABLE IF NOT EXISTS for safe repeated calls.
    """
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    logger.debug(f"Ensuring 'parameters' table exists in {db_path}")
    try:
        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS parameters (
                    name TEXT PRIMARY KEY,
                    type TEXT,
                    default_value TEXT,
                    sweep_values TEXT,
                    description TEXT,
                    dimensions TEXT
                )
            """
            )
            conn.commit()
    except sqlite3.Error as e:
        logger.error(f"Database error while creating table: {e}", exc_info=True)
        raise

get_parameters_from_db(db_path)

Retrieves parameter details from the database.

Parameters:

Name Type Description Default
db_path str

The path to the SQLite database file.

required

Returns:

Type Description
List[Dict[str, Any]]

A list of parameter dictionaries, each containing the name, default_value, description, and sweep_values.

Note

JSON-decodes stored values. Returns empty string for sweep_values if None. Result dict keys: name, default_value, description, sweep_values.

Source code in tricys/utils/sqlite_utils.py
def get_parameters_from_db(db_path: str) -> List[Dict[str, Any]]:
    """Retrieves parameter details from the database.

    Args:
        db_path: The path to the SQLite database file.

    Returns:
        A list of parameter dictionaries, each containing the name, default_value,
        description, and sweep_values.

    Note:
        JSON-decodes stored values. Returns empty string for sweep_values if None.
        Result dict keys: name, default_value, description, sweep_values.
    """
    with sqlite3.connect(db_path) as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT name, default_value, description, sweep_values FROM parameters"
        )
        params = []
        for name, default_value, description, sweep_values in cursor.fetchall():
            params.append(
                {
                    "name": name,
                    "default_value": json.loads(default_value),
                    "description": description,
                    "sweep_values": json.loads(sweep_values) if sweep_values else "",
                }
            )
    return params

store_parameters_in_db(db_path, params_data)

Stores or replaces a list of parameter details in the database.

Parameters:

Name Type Description Default
db_path str

The path to the SQLite database file.

required
params_data List[Dict[str, Any]]

A list of dictionaries, where each dictionary contains details for a single parameter.

required

Raises:

Type Description
Error

If a database error occurs during insertion.

Note

Uses INSERT OR REPLACE for upsert behavior. JSON-encodes defaultValue and stores dimensions with '()' default. Skips parameters without names. Expected param dict keys: name, type, defaultValue, comment, dimensions.

Source code in tricys/utils/sqlite_utils.py
def store_parameters_in_db(db_path: str, params_data: List[Dict[str, Any]]) -> None:
    """Stores or replaces a list of parameter details in the database.

    Args:
        db_path: The path to the SQLite database file.
        params_data: A list of dictionaries, where each dictionary contains
            details for a single parameter.

    Raises:
        sqlite3.Error: If a database error occurs during insertion.

    Note:
        Uses INSERT OR REPLACE for upsert behavior. JSON-encodes defaultValue
        and stores dimensions with '()' default. Skips parameters without names.
        Expected param dict keys: name, type, defaultValue, comment, dimensions.
    """
    logger.info(f"Storing {len(params_data)} parameters into '{db_path}'")
    if not params_data:
        logger.warning("Parameter data is empty, nothing to store.")
        return

    try:
        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()
            for param in params_data:
                name = param.get("name")
                if not name:
                    continue

                value_json = json.dumps(param.get("defaultValue"))
                dimensions = param.get(
                    "dimensions", "()"
                )  # Default to '()' if not present

                cursor.execute(
                    """
                    INSERT OR REPLACE INTO parameters (name, type, default_value, sweep_values, description, dimensions)
                    VALUES (?, ?, ?, ?, ?, ?)
                """,
                    (
                        name,
                        param.get("type", "Real"),
                        value_json,
                        None,
                        param.get("comment", ""),
                        dimensions,
                    ),
                )
            conn.commit()
        logger.info("Successfully stored/updated parameters in the database.")
    except sqlite3.Error as e:
        logger.error(f"Database error while storing parameters: {e}", exc_info=True)
        raise

update_sweep_values_in_db(db_path, param_sweep)

Updates the 'sweep_values' for specified parameters in the database.

Parameters:

Name Type Description Default
db_path str

The path to the SQLite database file.

required
param_sweep Dict[str, Any]

A dictionary where keys are parameter names and values are the corresponding sweep values (e.g., a list).

required

Raises:

Type Description
Error

If a database error occurs during the update.

Note

Converts numpy arrays to lists before JSON encoding. Warns if parameter not found in database. Uses UPDATE statement so parameters must exist before calling this function.

Source code in tricys/utils/sqlite_utils.py
def update_sweep_values_in_db(db_path: str, param_sweep: Dict[str, Any]) -> None:
    """Updates the 'sweep_values' for specified parameters in the database.

    Args:
        db_path: The path to the SQLite database file.
        param_sweep: A dictionary where keys are parameter names and values are
            the corresponding sweep values (e.g., a list).

    Raises:
        sqlite3.Error: If a database error occurs during the update.

    Note:
        Converts numpy arrays to lists before JSON encoding. Warns if parameter
        not found in database. Uses UPDATE statement so parameters must exist
        before calling this function.
    """
    logger.info(f"Updating sweep values in '{db_path}'")
    if not param_sweep:
        logger.warning("param_sweep dictionary is empty. No values to update.")
        return

    try:
        with sqlite3.connect(db_path) as conn:
            cursor = conn.cursor()
            for param_name, sweep_values in param_sweep.items():
                if isinstance(sweep_values, np.ndarray):
                    sweep_values = sweep_values.tolist()

                sweep_values_json = json.dumps(sweep_values)

                cursor.execute(
                    """
                    UPDATE parameters SET sweep_values = ? WHERE name = ?
                """,
                    (sweep_values_json, param_name),
                )

                if cursor.rowcount == 0:
                    logger.warning(
                        f"Parameter '{param_name}' not found in database. No sweep value updated."
                    )
            conn.commit()
        logger.info("Sweep values updated successfully.")
    except sqlite3.Error as e:
        logger.error(f"Database error while updating sweep values: {e}", exc_info=True)
        raise
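
A round-trip sketch of the database helpers; the database path and parameter data are illustrative:

from tricys.utils.sqlite_utils import (
    create_parameters_table,
    get_parameters_from_db,
    store_parameters_in_db,
    update_sweep_values_in_db,
)

db_path = "data/parameters.db"
create_parameters_table(db_path)

# Store a single made-up parameter record.
store_parameters_in_db(
    db_path,
    [{"name": "blanket.TBR", "type": "Real", "defaultValue": 1.05,
      "comment": "Tritium breeding ratio", "dimensions": "()"}],
)

# Attach sweep values, then read everything back.
update_sweep_values_in_db(db_path, {"blanket.TBR": [1.05, 1.10, 1.15]})

for row in get_parameters_from_db(db_path):
    print(row["name"], row["default_value"], row["sweep_values"])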