跳转至

API 参考 - 主程序仿真入口 (Simulation)

主程序仿真入口 (Simulation)

主程序仿真入口 (Simulation) 包含了 TRICYS 的主要执行脚本,负责启动标准仿真和分析工作流。 请在下方的标签页中选择您感兴趣的特定模块。

export_results_to_csv(results_dir, hdf_path)

Exports results from HDF5 to legacy CSV formats.

Source code in tricys/simulation/simulation.py
def export_results_to_csv(results_dir: str, hdf_path: str):
    """Exports results from HDF5 to legacy CSV formats.

    Reads the `results`, `jobs` and `summary` tables from the HDF5 store and
    writes two legacy CSV artifacts: a summary-metrics file (parameters joined
    in) and a wide time-series table with one shared time column plus one
    renamed column group per job.
    """
    logger.info(f"Exporting results from {hdf_path} to CSV...")
    try:
        with pd.HDFStore(hdf_path, mode="r") as store:
            if "/results" not in store.keys():
                logger.warning("No results found in HDF5 to export.")
                return

            # Full in-memory read: the wide pivot below needs every row at once.
            df_results = store.select("results")
            df_jobs = (
                store.select("jobs") if "/jobs" in store.keys() else pd.DataFrame()
            )

            if "/summary" in store.keys():
                df_summary = store.select("summary")
                # Attach per-job parameters so the summary CSV is self-describing.
                if not df_jobs.empty:
                    df_summary = pd.merge(df_jobs, df_summary, on="job_id", how="left")

                summary_csv_path = get_unique_filename(
                    results_dir, "summary_metrics.csv"
                )
                df_summary.to_csv(summary_csv_path, index=False)
                logger.info(f"Exported summary metrics to {summary_csv_path}")

        # Build a "k1=v1&k2=v2" suffix per job, used to rename variable columns.
        job_params_map = {}
        if not df_jobs.empty:
            for _, job_row in df_jobs.iterrows():
                clean_params = {
                    k: v
                    for k, v in job_row.drop("job_id").to_dict().items()
                    if pd.notna(v)  # drop unused/NaN parameter slots
                }
                job_params_map[job_row["job_id"]] = "&".join(
                    f"{k}={v}" for k, v in clean_params.items()
                )

        # Assemble the wide table: the first job contributes the time column,
        # then every job contributes its variable columns renamed with the
        # parameter suffix, concatenated side by side in job_id order.
        frames = []
        job_ids = sorted(df_results["job_id"].unique())

        for position, current_id in enumerate(job_ids):
            per_job = df_results[df_results["job_id"] == current_id].copy()
            per_job.sort_values("time", inplace=True)

            if position == 0:
                frames.append(per_job[["time"]].reset_index(drop=True))

            suffix = job_params_map.get(current_id, "")
            values = per_job.drop(columns=["time", "job_id"], errors="ignore")
            renamed = {
                name: f"{name}&{suffix}" if suffix else name
                for name in values.columns
            }
            frames.append(values.rename(columns=renamed).reset_index(drop=True))

        if frames:
            wide = pd.concat(frames, axis=1)
            # Drop any rows with an empty time value (defensive; the guard
            # above should already prevent them).
            wide.dropna(subset=["time"], inplace=True)

            csv_path = get_unique_filename(
                results_dir,
                "simulation_result.csv" if len(job_ids) == 1 else "sweep_results.csv",
            )
            wide.to_csv(csv_path, index=False)
            logger.info(f"Exported simulation results to {csv_path}")

    except Exception as e:
        logger.error(f"Failed to export CSVs: {e}", exc_info=True)

main(config_or_path, base_dir=None, export_csv=False)

Main entry point for the simulation runner.

This function prepares the configuration, sets up logging, and invokes the main run_simulation orchestrator.

Parameters:

Name Type Description Default
config_or_path Union[str, Dict[str, Any]]

The path to the JSON configuration file OR a config dict.

required
base_dir str

Optional base directory for resolving relative paths if a dict is passed.

None
export_csv bool

Whether to export results to CSV after HDF5 storage.

False
Source code in tricys/simulation/simulation.py
def main(
    config_or_path: Union[str, Dict[str, Any]],
    base_dir: str | None = None,
    export_csv: bool = False,
) -> None:
    """Main entry point for the simulation runner.

    This function prepares the configuration, sets up logging, and invokes
    the main `run_simulation` orchestrator.

    Args:
        config_or_path: The path to the JSON configuration file OR a config dict.
        base_dir: Optional base directory for resolving relative paths if a dict is passed.
        export_csv: Whether to export results to CSV after HDF5 storage.
    """
    config, original_config = basic_prepare_config(config_or_path, base_dir=base_dir)
    setup_logging(config, original_config)
    logger.info(
        "Loading configuration",
        extra={
            # A dict source has no filesystem path to report, so label it.
            "config_source": (
                os.path.abspath(config_or_path)
                if isinstance(config_or_path, str)
                else "Dictionary"
            ),
        },
    )
    try:
        run_simulation(config, export_csv=export_csv)
        logger.info("Main execution completed successfully")
    except Exception as e:
        # Exit non-zero so shell callers / CI pipelines see the failure.
        logger.error(
            "Main execution failed", exc_info=True, extra={"exception": str(e)}
        )
        sys.exit(1)

run_co_simulation_job(config, job_params, job_id=0)

Runs the full co-simulation workflow in an isolated directory.

This function sets up a self-contained workspace for a single co-simulation job to ensure thread safety. It copies the model package, handles asset files, generates an interceptor model, and executes a two-stage simulation: first to get primary inputs, and second to run the final simulation with the intercepted model.

Parameters:

Name Type Description Default
config dict

The main configuration dictionary.

required
job_params dict

A dictionary of parameters specific to this job.

required
job_id int

A unique identifier for the job, used for workspace naming. Defaults to 0.

0

Returns:

Type Description
str

The path to the final simulation result file, or an empty string if the simulation failed.

Note

Creates isolated workspace in temp_dir/job_{job_id}. Supports both single-file and multi-file Modelica packages. Handles external interceptor handlers. Cleans up workspace after completion. Two-stage simulation: stage 1 generates input CSV, stage 2 runs with interceptor model.

Source code in tricys/simulation/simulation.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
def run_co_simulation_job(config: dict, job_params: dict, job_id: int = 0) -> str:
    """Runs the full co-simulation workflow in an isolated directory.

    This function sets up a self-contained workspace for a single co-simulation
    job to ensure thread safety. It copies the model package, handles asset
    files, generates an interceptor model, and executes a two-stage
    simulation: first to get primary inputs, and second to run the final
    simulation with the intercepted model.

    Args:
        config: The main configuration dictionary.
        job_params: A dictionary of parameters specific to this job.
        job_id: A unique identifier for the job, used for workspace naming. Defaults to 0.

    Returns:
        The path to the final simulation result file, or an empty string if the simulation failed.

    Note:
        Creates isolated workspace in temp_dir/job_{job_id}. Supports both single-file
        and multi-file Modelica packages. Handles external interceptor handlers. Cleans
        up workspace after completion. Two-stage simulation: stage 1 generates input CSV,
        stage 2 runs with interceptor model.
    """
    paths_config = config["paths"]
    sim_config = config["simulation"]

    base_temp_dir = os.path.abspath(paths_config.get("temp_dir", "temp"))
    # The temp_dir from the config is now the self-contained workspace's temp folder.
    job_workspace = os.path.join(base_temp_dir, f"job_{job_id}")
    os.makedirs(job_workspace, exist_ok=True)

    omc = None

    try:
        original_package_path = os.path.abspath(paths_config["package_path"])

        # Determine if it's a single-file or multi-file package and copy accordingly.
        if os.path.isfile(original_package_path) and not original_package_path.endswith(
            "package.mo"
        ):
            # SINGLE-FILE: Copy the single .mo file into the root of the job_workspace.
            isolated_package_path = os.path.join(
                job_workspace, os.path.basename(original_package_path)
            )
            shutil.copy(original_package_path, isolated_package_path)
            logger.info(
                "Copied single-file package",
                extra={
                    "job_id": job_id,
                    "source_path": original_package_path,
                    "destination_path": isolated_package_path,
                },
            )
        else:
            # MULTI-FILE: Copy the entire package directory.
            # This handles both a directory path and a path to a package.mo file.
            if os.path.isfile(original_package_path):
                original_package_dir = os.path.dirname(original_package_path)
            else:  # It's a directory
                original_package_dir = original_package_path

            package_dir_name = os.path.basename(original_package_dir)
            isolated_package_dir = os.path.join(job_workspace, package_dir_name)

            if os.path.exists(isolated_package_dir):
                shutil.rmtree(isolated_package_dir)
            shutil.copytree(original_package_dir, isolated_package_dir)

            # Reconstruct the path to the main package file inside the new isolated directory
            if os.path.isfile(original_package_path):
                isolated_package_path = os.path.join(
                    isolated_package_dir, os.path.basename(original_package_path)
                )
            else:  # path was a directory, so we assume package.mo
                isolated_package_path = os.path.join(isolated_package_dir, "package.mo")

            logger.info(
                "Copied multi-file package",
                extra={
                    "job_id": job_id,
                    "source_dir": original_package_dir,
                    "destination_dir": isolated_package_dir,
                },
            )

        isolated_temp_dir = job_workspace
        results_dir = os.path.abspath(paths_config["results_dir"])
        os.makedirs(results_dir, exist_ok=True)

        # Parse co_simulation config - new format with mode at top level
        co_sim_config = config["co_simulation"]
        mode = co_sim_config.get("mode", "interceptor")  # Get mode from top level
        handlers = co_sim_config.get("handlers", [])  # Get handlers array

        # Validate that handlers is a list
        if not isinstance(handlers, list):
            handlers = [handlers]

        model_name = sim_config["model_name"]
        stop_time = sim_config["stop_time"]
        step_size = sim_config["step_size"]

        omc = get_om_session()
        if not load_modelica_package(omc, Path(isolated_package_path).as_posix()):
            raise RuntimeError(
                f"Failed to load Modelica package at {isolated_package_path}"
            )

        # Handle copying of any additional asset directories specified with a '_path' suffix
        for handler_config in handlers:
            if "params" in handler_config:
                # Iterate over a copy of items since we are modifying the dict
                for param_key, param_value in list(handler_config["params"].items()):
                    if isinstance(param_value, str) and param_key.endswith("_path"):
                        original_asset_path_str = param_value

                        # Paths in config are relative to project root. We need the absolute path.
                        original_asset_path = Path(
                            os.path.abspath(original_asset_path_str)
                        )
                        if not original_asset_path.exists():
                            logger.warning(
                                f"Asset file '{original_asset_path}' for parameter '{param_key}' not found. Skipping copy."
                            )
                            continue

                        # Create a destination directory based on the asset's parent folder name
                        # to avoid collisions if multiple files have the same name.
                        asset_dir_name = original_asset_path.parent.name
                        dest_dir = Path(job_workspace) / asset_dir_name
                        os.makedirs(dest_dir, exist_ok=True)

                        dest_path = dest_dir / original_asset_path.name

                        # Copy the file only if it hasn't been copied already
                        if not dest_path.exists():
                            shutil.copy(original_asset_path, dest_path)
                            logger.info(
                                "Copied asset file",
                                extra={
                                    "job_id": job_id,
                                    "source_path": original_asset_path,
                                    "destination_path": dest_path,
                                },
                            )

                        # Update the path in the config to point to the new location
                        handler_config["params"][param_key] = dest_path.as_posix()
                        logger.info(
                            "Updated asset parameter path",
                            extra={
                                "job_id": job_id,
                                "parameter_key": param_key,
                                "new_path": handler_config["params"][param_key],
                            },
                        )

        all_input_vars = []
        for handler_config in handlers:
            submodel_name = handler_config["submodel_name"]
            instance_name = handler_config["instance_name"]
            logger.info(
                "Identifying input ports for submodel",
                extra={
                    "job_id": job_id,
                    "submodel_name": submodel_name,
                },
            )
            components = omc.sendExpression(f"getComponents({submodel_name})")
            input_ports = [
                {"name": c[1], "dim": int(c[11][0]) if c[11] else 1}
                for c in components
                if c[0] == "Modelica.Blocks.Interfaces.RealInput"
            ]
            if not input_ports:
                logger.warning(f"No RealInput ports found in {submodel_name}.")
                continue

            logger.info(
                "Found input ports for instance",
                extra={
                    "job_id": job_id,
                    "instance_name": instance_name,
                    "input_ports": [p["name"] for p in input_ports],
                },
            )
            for port in input_ports:
                full_name = f"{instance_name}.{port['name']}".replace(".", "\\.")
                if port["dim"] > 1:
                    # Use an explicit alternation (e.g. "(1|2|...|12)") for the
                    # array indices. A character class like "[1-12]" is wrong
                    # for dims >= 10 (it only matches the single chars '1'-'1'
                    # and '2'), so it would silently drop elements.
                    indices = "|".join(str(i) for i in range(1, port["dim"] + 1))
                    full_name += f"\\[({indices})\\]"
                all_input_vars.append(full_name)

        variable_filter = "time|" + "|".join(all_input_vars)

        mod = ModelicaSystem(
            fileName=Path(isolated_package_path).as_posix(),
            modelName=model_name,
            variableFilter=variable_filter,
        )
        mod.setSimulationOptions(
            [f"stopTime={stop_time}", f"stepSize={step_size}", "outputFormat=csv"]
        )

        param_settings = [
            format_parameter_value(name, value) for name, value in job_params.items()
        ]
        if param_settings:
            logger.info(
                "Applying parameters for job",
                extra={
                    "job_id": job_id,
                    "param_settings": param_settings,
                },
            )
            mod.setParameters(param_settings)

        primary_result_filename = get_unique_filename(
            isolated_temp_dir, "primary_inputs.csv"
        )
        mod.simulate(resultfile=Path(primary_result_filename).as_posix())

        # Clean up the simulation result file
        if os.path.exists(primary_result_filename):
            try:
                df = pd.read_csv(primary_result_filename)
                df.drop_duplicates(subset=["time"], keep="last", inplace=True)
                df.dropna(subset=["time"], inplace=True)
                df.to_csv(primary_result_filename, index=False)
            except Exception as e:
                logger.warning(
                    "Failed to clean primary result file",
                    extra={
                        "job_id": job_id,
                        "file_path": primary_result_filename,
                        "error": str(e),
                    },
                )

        interception_configs = []
        for handler_config in handlers:
            handler_function_name = handler_config["handler_function"]
            module = None

            # New method: Load from a direct script path
            if "handler_script_path" in handler_config:
                script_path_str = handler_config["handler_script_path"]
                script_path = Path(script_path_str).resolve()
                module_name = script_path.stem

                logger.info(
                    "Loading co-simulation handler from script path",
                    extra={
                        "job_id": job_id,
                        "script_path": str(script_path),
                        "function": handler_function_name,
                    },
                )

                if not script_path.is_file():
                    raise FileNotFoundError(
                        f"Co-simulation handler script not found at {script_path}"
                    )

                spec = importlib.util.spec_from_file_location(module_name, script_path)
                if spec and spec.loader:
                    module = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(module)
                else:
                    raise ImportError(
                        f"Could not create module spec from script {script_path}"
                    )

            # Old method: Load from module name (backward compatibility)
            elif "handler_module" in handler_config:
                module_name = handler_config["handler_module"]
                logger.info(
                    "Loading co-simulation handler from module",
                    extra={
                        "job_id": job_id,
                        "module_name": module_name,
                        "function": handler_function_name,
                    },
                )
                module = importlib.import_module(module_name)

            else:
                # Error message must name the keys actually checked above.
                raise KeyError(
                    "Handler config must contain either 'handler_script_path' or 'handler_module'"
                )

            if not module:
                raise ImportError("Failed to load co-simulation handler module.")

            handler_function = getattr(module, handler_function_name)
            instance_name = handler_config["instance_name"]

            co_sim_output_filename = get_unique_filename(
                isolated_temp_dir, f"{instance_name}_outputs.csv"
            )

            output_placeholder = handler_function(
                temp_input_csv=primary_result_filename,
                temp_output_csv=co_sim_output_filename,
                **handler_config.get("params", {}),
            )

            interception_config = {
                "submodel_name": handler_config["submodel_name"],
                "instance_name": handler_config["instance_name"],
                "csv_uri": Path(os.path.abspath(co_sim_output_filename)).as_posix(),
                "output_placeholder": output_placeholder,
            }

            # Add mode from top-level co_simulation config
            interception_config["mode"] = mode

            interception_configs.append(interception_config)

        intercepted_model_paths = integrate_interceptor_model(
            package_path=isolated_package_path,
            model_name=model_name,
            interception_configs=interception_configs,
        )

        verif_config = config["simulation"]["variableFilter"]
        logger.info("Proceeding with Final simulation.", extra={"job_id": job_id})

        # Use mode from top-level config
        if mode == "replacement":
            # For direct replacement, the system model name stays the same
            logger.info(
                "Using direct replacement mode, system model unchanged",
                extra={"job_id": job_id},
            )
            final_model_name = model_name
            final_model_file = isolated_package_path
        else:
            # For interceptor mode, load the interceptor models and use modified system
            for model_path in intercepted_model_paths["interceptor_model_paths"]:
                omc.sendExpression(f"""loadFile("{Path(model_path).as_posix()}")""")
            omc.sendExpression(
                f"""loadFile("{Path(intercepted_model_paths["system_model_path"]).as_posix()}")"""
            )

            # NOTE(review): assumes model_name is "<Package>.<System>" (exactly
            # one dot); nested packages would need rsplit — confirm upstream.
            package_name, original_system_name = model_name.split(".")
            final_model_name = f"{package_name}.{original_system_name}_Intercepted"
            final_model_file = (
                Path(intercepted_model_paths["system_model_path"]).as_posix()
                if os.path.isfile(isolated_package_path)
                and not original_package_path.endswith("package.mo")
                else Path(isolated_package_path).as_posix()
            )

        verif_mod = ModelicaSystem(
            fileName=final_model_file,
            modelName=final_model_name,
            variableFilter=verif_config,
        )
        verif_mod.setSimulationOptions(
            [f"stopTime={stop_time}", f"stepSize={step_size}", "outputFormat=csv"]
        )
        if param_settings:
            verif_mod.setParameters(param_settings)

        default_result_path = get_unique_filename(
            job_workspace, "co_simulation_results.csv"
        )
        verif_mod.simulate(resultfile=Path(default_result_path).as_posix())

        # Clean up the simulation result file
        if os.path.exists(default_result_path):
            try:
                df = pd.read_csv(default_result_path)
                df.drop_duplicates(subset=["time"], keep="last", inplace=True)
                df.dropna(subset=["time"], inplace=True)
                df.to_csv(default_result_path, index=False)
            except Exception as e:
                logger.warning(
                    "Failed to clean final co-simulation result file",
                    extra={
                        "job_id": job_id,
                        "file_path": default_result_path,
                        "error": str(e),
                    },
                )

        if not os.path.exists(default_result_path):
            raise FileNotFoundError(
                f"Simulation for job {job_id} failed to produce a result file at {default_result_path}"
            )

        # Return the path to the result file inside the temporary workspace
        return Path(default_result_path).as_posix()
    except Exception:
        logger.error(
            "Co-simulation workflow failed", exc_info=True, extra={"job_id": job_id}
        )
        return ""
    finally:
        # Cleanup ModelicaSystem instances which might have their own sessions
        if "mod" in locals() and mod and hasattr(mod, "omc"):
            try:
                mod.omc.sendExpression("quit()")
                # mod.__del__() # triggering del might be safer but explicit quit is sure
            except Exception:
                pass

        if "verif_mod" in locals() and verif_mod and hasattr(verif_mod, "omc"):
            try:
                verif_mod.omc.sendExpression("quit()")
            except Exception:
                pass

        if omc:
            try:
                omc.sendExpression("quit()")
                logger.info("Closed OMPython session", extra={"job_id": job_id})
            except Exception:
                pass

        if not sim_config.get("keep_temp_files", True):
            if os.path.exists(job_workspace):
                shutil.rmtree(job_workspace)
                logger.info(
                    "Cleaned up job workspace",
                    extra={"job_id": job_id, "workspace": job_workspace},
                )

run_post_processing(config, results_df, post_processing_output_dir, results_file_path=None)

Dynamically loads and runs post-processing modules.

This function iterates through the post-processing tasks defined in the configuration. For each task, it dynamically loads the specified module (from a module name or a script path) and executes the target function, passing the HDF5 results path and other parameters to it.

Parameters:

Name Type Description Default
config Dict[str, Any]

The main configuration dictionary.

required
results_df DataFrame

Deprecated legacy argument retained for call compatibility.

required
post_processing_output_dir str

The directory to save any output from the tasks.

required
results_file_path str

Path to the HDF5 results file.

None
Note

Supports two loading methods: 'script_path' for direct .py files, or 'module' for installed packages. Creates output_dir if it doesn't exist. Passes results_file_path, output_dir, and user-specified params to each task function. Logs errors for failed tasks but continues with remaining tasks.

Source code in tricys/simulation/simulation.py
def run_post_processing(
    config: Dict[str, Any],
    results_df: pd.DataFrame,
    post_processing_output_dir: str,
    results_file_path: str = None,
) -> None:
    """Dynamically loads and runs post-processing modules.

    Iterates over the post-processing tasks declared in the configuration,
    dynamically imports each task's module (either from a script file path or
    from an installed module name), and invokes the configured function with
    the HDF5 results path, the output directory, and any task parameters.

    Args:
        config: The main configuration dictionary.
        results_df: Deprecated legacy argument retained for call compatibility.
        post_processing_output_dir: The directory to save any output from the tasks.
        results_file_path: Path to the HDF5 results file.

    Note:
        Supports two loading methods: 'script_path' for direct .py files, or 'module'
        for installed packages. Creates the output directory if missing. A failing
        task is logged and skipped; remaining tasks still run.
    """
    tasks = config.get("post_processing")
    if not tasks:
        logger.info("No post-processing task configured, skipping this step.")
        return

    logger.info("Starting post-processing phase")

    output_dir = post_processing_output_dir
    os.makedirs(output_dir, exist_ok=True)
    logger.info(
        "Post-processing report will be saved",
        extra={"output_dir": output_dir},
    )

    if not results_file_path:
        logger.error("Post-processing requires a valid HDF5 results file path.")
        return

    for idx, task_cfg in enumerate(tasks):
        try:
            func_name = task_cfg["function"]
            task_params = task_cfg.get("params", {})
            loaded_module = None

            # Propagate shared LLM credentials unless the task overrides them.
            if "llm_env" not in task_params and isinstance(config.get("llm_env"), dict):
                task_params["llm_env"] = config["llm_env"]

            if "script_path" in task_cfg:
                # Preferred method: import the task directly from a .py file.
                resolved = Path(task_cfg["script_path"]).resolve()

                logger.info(
                    "Running post-processing task from script path",
                    extra={
                        "task_index": idx + 1,
                        "script_path": str(resolved),
                        "function": func_name,
                    },
                )

                if not resolved.is_file():
                    logger.error(
                        "Post-processing script not found",
                        extra={"path": str(resolved)},
                    )
                    continue

                spec = importlib.util.spec_from_file_location(resolved.stem, resolved)
                if not (spec and spec.loader):
                    logger.error(
                        "Could not create module spec from script",
                        extra={"path": str(resolved)},
                    )
                    continue
                loaded_module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(loaded_module)

            elif "module" in task_cfg:
                # Backward-compatible method: import by installed module name.
                target_module = task_cfg["module"]

                logger.info(
                    "Running post-processing task from module",
                    extra={
                        "task_index": idx + 1,
                        "module_name": target_module,
                        "function": func_name,
                    },
                )
                loaded_module = importlib.import_module(target_module)

            else:
                logger.warning(
                    "Post-processing task is missing 'script_path' or 'module' key. Skipping.",
                    extra={"task_index": idx + 1},
                )
                continue

            if loaded_module:
                task_func = getattr(loaded_module, func_name)
                task_func(
                    results_file_path=results_file_path,
                    output_dir=output_dir,
                    **task_params,
                )
            else:
                logger.error(
                    "Failed to load post-processing module.",
                    extra={"task_index": idx + 1},
                )

        except Exception:
            # One bad task must not abort the remaining tasks.
            logger.error(
                "Post-processing task failed",
                exc_info=True,
                extra={"task_index": idx + 1},
            )
    logger.info("Post-processing phase ended")

run_sequential_sweep(config, jobs, post_job_callback=None, filter_schema=None)

Executes a parameter sweep sequentially.

This function runs a series of simulation jobs one after another, reusing the same OpenModelica session for efficiency. Intermediate results for each job are saved to a dedicated job workspace.

Parameters:

Name Type Description Default
config dict

The main configuration dictionary.

required
jobs List[Dict[str, Any]]

A list of job parameter dictionaries to execute.

required

Returns:

Type Description
List[str]

A list of paths to the result files for each job. Failed jobs will have

List[str]

an empty string as their path.

Note

Reuses single OMPython session for all jobs. Cleans result files by removing duplicate/NaN time values. Cleans up workspaces unless keep_temp_files is True. More efficient than parallel mode for small job counts.

Source code in tricys/simulation/simulation.py
def run_sequential_sweep(
    config: dict,
    jobs: List[Dict[str, Any]],
    post_job_callback: Callable[[int, Dict[str, Any], str], None] | None = None,
    filter_schema: List[Dict[str, Any]] | None = None,
) -> List[str]:
    """Executes a parameter sweep sequentially.

    This function runs a series of simulation jobs one after another, reusing
    the same OpenModelica session for efficiency. Intermediate results for each
    job are saved to a dedicated job workspace.

    Args:
        config: The main configuration dictionary.
        jobs: A list of job parameter dictionaries to execute.
        post_job_callback: Optional callable invoked after each successful job
            with ``(job_index, job_params, result_file_path)``. Exceptions
            raised by the callback are logged and do not abort the sweep.
        filter_schema: Optional list of filter rules. Jobs whose result traces
            violate the schema skip the per-job metric calculation.

    Returns:
        A list of paths to the result files for each job. Failed jobs will have
        an empty string as their path.

    Note:
        Reuses single OMPython session for all jobs. Cleans result files by removing
        duplicate/NaN time values. Cleans up workspaces unless keep_temp_files is True.
        More efficient than parallel mode for small job counts.
    """
    paths_config = config["paths"]
    sim_config = config["simulation"]

    # The temp_dir is the self-contained workspace's temp folder.
    base_temp_dir = os.path.abspath(paths_config.get("temp_dir", "temp"))
    os.makedirs(base_temp_dir, exist_ok=True)

    logger.info(
        "Running sequential sweep",
        extra={
            "mode": "sequential",
            "intermediate_files_dir": base_temp_dir,
        },
    )

    omc = None
    result_paths = []
    try:
        # One OM session is shared by every job in the sweep.
        omc = get_om_session()
        package_path = os.path.abspath(paths_config["package_path"])
        if not load_modelica_package(omc, Path(package_path).as_posix()):
            raise RuntimeError("Failed to load Modelica package for sequential sweep.")

        mod = ModelicaSystem(
            fileName=Path(package_path).as_posix(),
            modelName=sim_config["model_name"],
            variableFilter=sim_config["variableFilter"],
        )

        mod.setSimulationOptions(
            [
                f"stopTime={sim_config['stop_time']}",
                "tolerance=1e-6",
                "outputFormat=csv",
                f"stepSize={sim_config['step_size']}",
            ]
        )
        # mod.buildModel()

        for i, job_params in enumerate(jobs):
            try:
                # Standardized progress log for backend parsing (Pattern: Job X of Y)
                logger.info(f"Job {i+1} of {len(jobs)}")

                logger.info(
                    "Running sequential job",
                    extra={
                        "job_index": f"{i+1}/{len(jobs)}",
                        "job_params": job_params,
                    },
                )
                param_settings = [
                    format_parameter_value(name, value)
                    for name, value in job_params.items()
                ]
                if param_settings:
                    mod.setParameters(param_settings)

                # Each job gets its own workspace so intermediate files never collide.
                job_workspace = os.path.join(base_temp_dir, f"job_{i+1}")
                os.makedirs(job_workspace, exist_ok=True)
                result_filename = f"job_{i+1}_simulation_results.csv"
                result_file_path = os.path.join(job_workspace, result_filename)

                mod.simulate(resultfile=Path(result_file_path).as_posix())

                # Clean up the simulation result file: solvers may emit duplicate
                # or NaN time points which break downstream pivots.
                if os.path.exists(result_file_path):
                    try:
                        df = pd.read_csv(result_file_path)
                        df.drop_duplicates(subset=["time"], keep="last", inplace=True)
                        df.dropna(subset=["time"], inplace=True)
                        df.to_csv(result_file_path, index=False)
                    except Exception as e:
                        logger.warning(
                            "Failed to clean result file",
                            extra={
                                "job_index": i + 1,
                                "file_path": result_file_path,
                                "error": str(e),
                            },
                        )

                logger.info(
                    "Sequential job finished successfully",
                    extra={
                        "job_index": i + 1,
                        "result_path": result_file_path,
                    },
                )
                result_paths.append(result_file_path)

                # Calculate Summary Metrics
                metrics_definition = config.get("metrics_definition", {})

                # Pre-filter check before metric calc
                skip_metrics = False
                if filter_schema and os.path.exists(result_file_path):
                    try:
                        temp_df = pd.read_csv(result_file_path)
                        violations = find_filter_schema_violations(
                            temp_df, filter_schema
                        )
                        if violations:
                            skip_metrics = True
                            logger.info(
                                f"Job {i+1} matched filter_schema. Skipping metric calculations before HDF5 append."
                            )
                    except Exception as e:
                        # A failed pre-check must not abort the job; metrics are
                        # still attempted below, but the failure is now visible.
                        logger.warning(
                            f"Filter schema pre-check failed for job {i+1}: {e}"
                        )

                if (
                    not skip_metrics
                    and metrics_definition
                    and os.path.exists(result_file_path)
                ):
                    try:
                        df_metric = pd.read_csv(result_file_path)
                        single_job_metrics = calculate_single_job_metrics(
                            df_metric, metrics_definition
                        )

                        if single_job_metrics:
                            # Save to JSON in the job workspace
                            metrics_file_path = os.path.join(
                                job_workspace, "job_metrics.json"
                            )
                            with open(metrics_file_path, "w") as f:
                                json.dump(single_job_metrics, f, indent=4)

                            logger.info(
                                "Calculated and saved job metrics",
                                extra={
                                    "job_index": i + 1,
                                    "metrics": single_job_metrics,
                                },
                            )
                    except Exception as e:
                        logger.warning(
                            f"Failed to calculate metrics for job {i+1}: {e}"
                        )

                if post_job_callback:
                    try:
                        post_job_callback(i, job_params, result_file_path)
                    except Exception as e:
                        logger.error(
                            "Post-job callback failed",
                            exc_info=True,
                            extra={"job_index": i + 1, "error": str(e)},
                        )
            except Exception:
                # A single failed job yields an empty path but the sweep continues.
                logger.error(
                    "Sequential job failed", exc_info=True, extra={"job_index": i + 1}
                )
                result_paths.append("")

        return result_paths
    except Exception:
        # Setup failure (session/package/model): every job is reported as failed.
        logger.error("Sequential sweep setup failed", exc_info=True)
        return [""] * len(jobs)
    finally:
        if omc:
            omc.sendExpression("quit()")

run_simulation(config, export_csv=False)

Orchestrates the main simulation workflow.

Simplified Mode Logic (Unified HDF5 Storage): 1. Concurrent Mode (concurrent=True): - Uses "Enhanced" execution (Compile Once, Run Many). - Streams results directly to HDF5.

  2. Sequential Mode (concurrent=False):
     - Uses "Standard" execution (Reuse OMPython Session).
     - Also streams results directly to HDF5.

Parameters:

Name Type Description Default
config Dict[str, Any]

The main configuration dictionary for the run.

required
export_csv bool

Whether to export results to CSV files at the end.

False
Source code in tricys/simulation/simulation.py
def run_simulation(config: Dict[str, Any], export_csv: bool = False) -> None:
    """Orchestrates the main simulation workflow.

    Simplified Mode Logic (Unified HDF5 Storage):
    1. Concurrent Mode (concurrent=True):
       - Uses "Enhanced" execution (Compile Once, Run Many).
       - Streams results directly to HDF5.

    2. Sequential Mode (concurrent=False):
       - Uses "Standard" execution (Reuse OMPython Session).
       - Also streams results directly to HDF5.

    Args:
        config: The main configuration dictionary for the run.
        export_csv: Whether to export results to CSV files at the end.
    """
    jobs = generate_simulation_jobs(config.get("simulation_parameters", {}))

    try:
        results_dir = os.path.abspath(config["paths"]["results_dir"])
        temp_dir = os.path.abspath(config["paths"].get("temp_dir", "temp"))
    except KeyError as e:
        logger.error(f"Missing required path key in configuration file: {e}")
        sys.exit(1)

    os.makedirs(results_dir, exist_ok=True)
    os.makedirs(temp_dir, exist_ok=True)

    sim_config = config["simulation"]
    use_concurrent = sim_config.get("concurrent", False)
    maximize_workers = sim_config.get("maximize_workers", False)
    max_workers = get_safe_max_workers(
        sim_config.get("max_workers"),
        maximize=maximize_workers,
        task_count=len(jobs),
    )
    is_co_sim = config.get("co_simulation") is not None
    metrics_definition = config.get("metrics_definition", {})
    filter_schema = config.get("filter_schema")

    # HDF5 Setup (Unified)
    hdf_filename = "sweep_results.h5"
    hdf_path = get_unique_filename(results_dir, hdf_filename)

    logger.info(
        f"Starting Simulation (Concurrent={use_concurrent}, CoSim={is_co_sim}). Storage: {hdf_path}"
    )

    with pd.HDFStore(hdf_path, mode="w", complib="blosc", complevel=9) as store:
        # Save configuration alongside the results for reproducibility.
        try:
            config_df = pd.DataFrame({"config_json": [json.dumps(config)]})
            config_df = config_df.astype(object)
            store.put("config", config_df, format="fixed")
        except Exception as e:
            logger.warning(f"Failed to save config to HDF5: {e}")

        # LogCapture wraps both execution modes so the captured logs can be
        # persisted once at the end (previously duplicated per branch).
        from tricys.utils.log_capture import LogCapture

        with LogCapture() as log_handler:
            # --- Concurrent Mode ---
            if use_concurrent:
                pool_args = []
                wrapper_func = None

                if not is_co_sim:
                    # 1. Compile Model Once, then fan out per-job executions.
                    master_exe, master_xml, om_bin = _build_model_only(config)
                    for i, job_params in enumerate(jobs):
                        kwargs = {
                            "exe_source": master_exe,
                            "xml_source": master_xml,
                            "om_bin_path": om_bin,
                            "base_temp_dir": temp_dir,
                            "sim_config": sim_config,
                            "variable_filter": sim_config.get("variableFilter"),
                            "inplace_execution": True,
                        }
                        pool_args.append((job_params, i + 1, kwargs))
                    wrapper_func = _mp_run_fast_subprocess_job_wrapper
                else:
                    for i, job_params in enumerate(jobs):
                        pool_args.append((config, job_params, i + 1))
                    wrapper_func = _mp_run_co_simulation_job_wrapper

                # Execute Pool; results are streamed into HDF5 as jobs finish.
                completed_count = 0
                total_jobs = len(jobs)
                with multiprocessing.Pool(processes=max_workers) as pool:
                    for job_id, job_p, result_path, error in pool.imap_unordered(
                        wrapper_func, pool_args
                    ):
                        completed_count += 1
                        logger.info(f"Job {completed_count} of {total_jobs}")
                        if error:
                            logger.error(f"Job {job_id} failed: {error}")
                        else:
                            _process_h5_result(
                                store,
                                job_id,
                                job_p,
                                result_path,
                                metrics_definition,
                                filter_schema=filter_schema,
                            )

            # --- Sequential Mode ---
            else:
                if is_co_sim:
                    for i, job_params in enumerate(jobs):
                        job_id = i + 1
                        logger.info(f"Running job {job_id}/{len(jobs)}")
                        try:
                            result_path = run_co_simulation_job(
                                config, job_params, job_id=job_id
                            )
                            _process_h5_result(
                                store,
                                job_id,
                                job_params,
                                result_path,
                                metrics_definition,
                                filter_schema=filter_schema,
                            )
                        except Exception as e:
                            logger.error(f"Job {job_id} failed: {e}")
                else:
                    # Standard Sequential using callback to stream to HDF5
                    logger.info("Running sequential sweep with HDF5 streaming.")

                    def h5_callback(idx, params, res_path):
                        _process_h5_result(
                            store,
                            idx + 1,
                            params,
                            res_path,
                            metrics_definition,
                            filter_schema=filter_schema,
                        )

                    run_sequential_sweep(
                        config,
                        jobs,
                        post_job_callback=h5_callback,
                        filter_schema=filter_schema,
                    )

            # Persist captured logs (single shared path for both modes).
            try:
                logs_json = log_handler.to_json()
                log_df = pd.DataFrame({"log": [logs_json]})
                log_df = log_df.astype(object)
                store.put("log", log_df, format="fixed")
            except Exception as e:
                logger.warning(f"Failed to save logs to HDF5: {e}")

    # Export CSV if requested
    if export_csv:
        export_results_to_csv(results_dir, hdf_path)

    # Post-processing (Unified HDF5)
    post_processing_output_dir = os.path.join(results_dir, "post_processing")
    run_post_processing(
        config, None, post_processing_output_dir, results_file_path=hdf_path
    )

    # Cleanup: wipe and recreate temp_dir unless the user asked to keep files.
    if not sim_config.get("keep_temp_files", True):
        if os.path.exists(temp_dir):
            try:
                shutil.rmtree(temp_dir)
                os.makedirs(temp_dir)
                logger.info("Cleaned up temp directory.")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp dir: {e}")

main(config_or_path, base_dir=None)

Main entry point for a simulation analysis run.

This function prepares the configuration for an analysis run, sets up logging, and calls the main run_simulation orchestrator for analysis.

Parameters:

Name Type Description Default
config_or_path Union[str, Dict[str, Any]]

The path to the JSON configuration file OR a config dict.

required
base_dir str

Optional base directory for resolving relative paths.

None
Source code in tricys/simulation/simulation_analysis.py
def main(config_or_path: Union[str, Dict[str, Any]], base_dir: str | None = None) -> None:
    """Main entry point for a simulation analysis run.

    This function prepares the configuration for an analysis run, sets up
    logging, and calls the main `run_simulation` orchestrator for analysis.

    Args:
        config_or_path: The path to the JSON configuration file OR a config dict.
        base_dir: Optional base directory for resolving relative paths.

    Raises:
        SystemExit: With status 1 if the simulation run fails.
    """
    config, original_config = analysis_prepare_config(config_or_path, base_dir=base_dir)
    setup_logging(config, original_config)
    logger.info(
        "Loading configuration",
        extra={
            # Log the resolved path when given a file, a placeholder otherwise.
            "config_source": (
                os.path.abspath(config_or_path)
                if isinstance(config_or_path, str)
                else "Dictionary"
            ),
        },
    )
    try:
        run_simulation(config)
        logger.info("Main execution completed successfully")
    except Exception as e:
        logger.error(
            "Main execution failed", exc_info=True, extra={"exception": str(e)}
        )
        sys.exit(1)

retry_analysis(timestamp)

Retries a failed AI analysis for a given run timestamp.

This function restores the configuration from the log file of a previous run and re-triggers the AI-dependent parts of the analysis, including report generation and consolidation.

Parameters:

Name Type Description Default
timestamp str

The timestamp of the run to retry (e.g., "20230101_120000").

required
Source code in tricys/simulation/simulation_analysis.py
def retry_analysis(timestamp: str) -> None:
    """Retries a failed AI analysis for a given run timestamp.

    This function restores the configuration from the log file of a previous
    run and re-triggers the AI-dependent parts of the analysis, including
    report generation and consolidation.

    Args:
        timestamp (str): The timestamp of the run to retry (e.g., "20230101_120000").

    Raises:
        SystemExit: With status 1 if the configuration cannot be restored,
            validation fails, or case workspaces cannot be set up.
    """
    config, original_config = restore_configs_from_log(timestamp)
    if not config or not original_config:
        # Error is printed inside the helper function
        sys.exit(1)

    config["run_timestamp"] = timestamp

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        stream=sys.stdout,
    )
    # Note: use the module-level logger directly; rebinding a local
    # `logger = logging.getLogger(__name__)` here only shadowed the same object.
    logger.info(
        f"Successfully restored configuration for timestamp {timestamp} for retry."
    )

    logger.info("Starting in AI analysis retry mode...")
    if not analysis_validate_config(config):
        sys.exit(1)

    case_configs = analysis_setup_analysis_cases_workspaces(config)
    if not case_configs:
        logger.error("Could not set up case workspaces for retry. Aborting.")
        sys.exit(1)

    retry_ai_analysis(case_configs, config)
    consolidate_reports(case_configs, config)

    logger.info("AI analysis retry and consolidation complete.")

run_simulation(config)

Orchestrates the simulation analysis workflow (Default: Enhanced Mode).

Source code in tricys/simulation/simulation_analysis.py
def run_simulation(config: Dict[str, Any]) -> None:
    """Orchestrates the simulation analysis workflow (Default: Enhanced Mode).

    Generates jobs, builds the model once when possible, runs the sweep either
    concurrently or sequentially, streams traces and summary metrics into a
    unified HDF5 store, then runs sensitivity analysis and post-processing.

    Args:
        config: The main configuration dictionary for the analysis run.
    """

    # [Analysis Cases Handling - Preserved]
    if _handle_analysis_cases(config):
        return

    # Core Execution Logic
    jobs = generate_simulation_jobs(config.get("simulation_parameters", {}))
    _add_baseline_jobs(config, jobs)  # Helper to de-clutter (implied logic moved/kept)

    try:
        results_dir = os.path.abspath(config["paths"]["results_dir"])
        temp_dir = os.path.abspath(config["paths"].get("temp_dir", "temp"))
    except KeyError as e:
        logger.error(f"Missing config path: {e}")
        sys.exit(1)

    os.makedirs(results_dir, exist_ok=True)
    os.makedirs(temp_dir, exist_ok=True)

    hdf_filename = "sweep_results.h5"
    hdf_path = get_unique_filename(results_dir, hdf_filename)

    is_co_sim = config.get("co_simulation") is not None
    use_concurrent = config["simulation"].get("concurrent", False)
    maximize_workers = config["simulation"].get("maximize_workers", False)
    max_workers = get_safe_max_workers(
        config["simulation"].get("max_workers"),
        maximize=maximize_workers,
        task_count=len(jobs),
    )

    logger.info(
        f"Starting Simulation Analysis (Enhanced Mode). CoSim={is_co_sim}, Concurrent={use_concurrent}"
    )

    # Prepare Context
    fast_context = {}
    if not is_co_sim:
        # Build Once for Standard Jobs (Compile Once, Run Many)
        try:
            exe, xml, om_bin = _build_model_only(config)
            fast_context = {
                "exe": exe,
                "xml": xml,
                "om_bin": om_bin,
                "temp_dir": temp_dir,
            }
        except Exception as e:
            logger.error(f"Build failed: {e}")
            sys.exit(1)

    # Run Sweep
    final_results = []

    from tricys.utils.log_capture import LogCapture

    # Get metrics definition for summary calculation
    metrics_definition = config.get("sensitivity_analysis", {}).get(
        "metrics_definition", {}
    )

    # Capture logs for HDF5 storage
    with LogCapture() as log_handler:
        try:
            with pd.HDFStore(hdf_path, mode="w", complib="blosc", complevel=9) as store:
                # Metadata: one row per job with its parameters and job_id.
                meta_df = pd.DataFrame(jobs)
                if not meta_df.empty:
                    meta_df["job_id"] = range(1, len(jobs) + 1)
                    # Force object dtype only for string/object columns to avoid HDF5 issues
                    for col in meta_df.select_dtypes(
                        include=["object", "string"]
                    ).columns:
                        meta_df[col] = meta_df[col].astype(object)
                    store.put("jobs", meta_df, format="table", data_columns=True)

                # Results Handler: reads each job's CSV ONCE, writes the trace
                # and summary metrics, then deletes the job workspace.
                # BUG FIX: previously the workspace was removed right after the
                # trace append, so the later metrics step found res_path missing
                # and the "summary" table was silently never written.
                def _handle_result(job_id, params, result_data):
                    opts, vals, res_path = result_data

                    # Single read of the trace, shared by both consumers below.
                    df = None
                    if res_path and os.path.exists(res_path):
                        try:
                            df = pd.read_csv(res_path)
                        except Exception as e:
                            logger.error(
                                f"Failed to read result file for job {job_id}: {e}"
                            )

                    # Save trace to HDF5 (copy so the metrics input stays free
                    # of the synthetic job_id column).
                    if df is not None:
                        try:
                            df_trace = df.copy()
                            df_trace["job_id"] = job_id
                            store.append(
                                "results", df_trace, index=False, data_columns=True
                            )
                        except Exception as e:
                            logger.error(f"HDF5 write failed for job {job_id}: {e}")

                    # Calculate and Save Summary Metrics from the in-memory frame.
                    if df is not None and metrics_definition:
                        try:
                            single_job_metrics = calculate_single_job_metrics(
                                df, metrics_definition
                            )

                            if single_job_metrics:
                                summary_df = build_single_job_summary_df(
                                    job_id,
                                    single_job_metrics,
                                    metrics_definition,
                                )
                                if not summary_df.empty:
                                    store.append(
                                        "summary",
                                        summary_df,
                                        index=False,
                                        data_columns=True,
                                    )

                        except Exception as e:
                            logger.warning(
                                f"Failed to calculate/save summary metrics for job {job_id}: {e}"
                            )

                    # Cleanup Job Workspace only after every read is done.
                    if res_path and os.path.exists(res_path):
                        job_dir = os.path.dirname(res_path)
                        if "job_" in os.path.basename(job_dir):
                            shutil.rmtree(job_dir, ignore_errors=True)

                    # Collect Summary
                    entry = params.copy()
                    entry.update(opts)
                    entry.update(vals)
                    final_results.append(entry)

                # Prepare Execution Args
                if not is_co_sim:
                    # STANDARD (FAST) PATH
                    pool_args = [
                        (job, i + 1, config, fast_context) for i, job in enumerate(jobs)
                    ]
                    wrapper = _enhanced_runner_wrapper
                else:
                    # CO-SIM PATH
                    pool_args = [(config, job, i + 1) for i, job in enumerate(jobs)]
                    wrapper = _co_sim_runner_wrapper

                # Run Execution
                # Counter for progress logging
                completed_count = 0
                total_jobs = len(jobs)

                if use_concurrent:
                    with multiprocessing.Pool(max_workers) as pool:
                        for job_id, parsed_params, res_data, err in pool.imap_unordered(
                            wrapper, pool_args
                        ):
                            completed_count += 1
                            if err:
                                logger.error(f"Job {job_id} failed: {err}")
                            else:
                                _handle_result(job_id, parsed_params, res_data)

                            # Log progress
                            logger.info(f"Job {completed_count} of {total_jobs}")
                else:
                    # Sequential Loop
                    for args in pool_args:
                        job_id, parsed_params, res_data, err = wrapper(args)
                        completed_count += 1
                        if err:
                            logger.error(f"Job {job_id} failed: {err}")
                        else:
                            _handle_result(job_id, parsed_params, res_data)

                        # Log progress
                        logger.info(f"Job {completed_count} of {total_jobs}")

                # Save Config and Logs at the end
                try:
                    # Save Config
                    config_df = pd.DataFrame({"config_json": [json.dumps(config)]})
                    config_df = config_df.astype(object)
                    store.put("config", config_df, format="fixed")

                    # Save Logs
                    logger.info("Saving execution logs to HDF5.")
                    logs_json = log_handler.to_json()
                    log_df = pd.DataFrame({"log": [logs_json]})
                    log_df = log_df.astype(object)
                    store.put("log", log_df, format="fixed")
                except Exception as e:
                    logger.warning(f"Failed to save finalize data to HDF5: {e}")

        except Exception as e:
            logger.error(f"Sweep failed: {e}", exc_info=True)

    # Post Analysis
    if final_results and _get_optimization_tasks(config):
        # NOTE(review): "requierd" is a typo but kept as-is — downstream
        # consumers may depend on this exact filename.
        pd.DataFrame(final_results).to_csv(
            os.path.join(results_dir, "requierd_tbr_summary.csv"), index=False
        )

    # Sensitivity & Plots
    _run_sensitivity_analysis(config, results_dir, jobs)

    # General Post Processing
    run_post_processing(
        config,
        None,
        os.path.join(results_dir, "post_processing"),
        results_file_path=hdf_path,
    )

    # Final Cleanup
    if not config["simulation"].get("keep_temp_files", True):
        shutil.rmtree(temp_dir, ignore_errors=True)
        os.makedirs(temp_dir, exist_ok=True)