跳转至

API 参考 - 后处理模块 (Post-processing)

后处理模块 (Post-processing)

后处理模块 (Post-processing) 提供了在仿真运行后自动执行的分析和报告功能。 请在下方的标签页中选择您感兴趣的特定模块。

This module provides functions for plotting simulation results.

baseline_analysis(results_file_path, output_dir, **kwargs)

Generates baseline analysis plots and reports from a unified HDF5 results file.

Source code in tricys/postprocess/baseline_analysis.py
def baseline_analysis(results_file_path: str, output_dir: str, **kwargs) -> None:
    """Generates baseline analysis plots and reports from a unified HDF5 results file.

    Args:
        results_file_path: Path to the unified HDF5 results file.
        output_dir: Simulation output directory; plots and reports are written
            to a sibling "report" directory next to it.
        **kwargs: Optional settings forwarded to the plotting/report helpers.
            Keys read directly here:
                glossary_path: CSV glossary loaded before plotting.
                ai: When truthy, additionally run LLM analysis on the report.
                llm_env: Selector passed to get_llm_env for API credentials.

    Note:
        All failures are logged and swallowed; the function always returns None.
    """
    if not os.path.exists(results_file_path):
        logger.error(f"HDF5 file not found: {results_file_path}")
        return

    try:
        # Load translated labels first so subsequent plots/reports can use them.
        if "glossary_path" in kwargs:
            load_glossary(kwargs["glossary_path"])

        # Reports live in a "report" directory that is a sibling of output_dir.
        report_dir = Path(output_dir).parent / "report"
        os.makedirs(report_dir, exist_ok=True)

        stream_data = _collect_baseline_stream_data(results_file_path, **kwargs)
        if not stream_data:
            logger.warning("No data found to analyze for baseline analysis.")
            return

        # Pass the data-derived color map / turning-point label to the plot
        # helpers on a copy, so the caller's kwargs are not mutated.
        plot_kwargs = kwargs.copy()
        plot_kwargs["color_map"] = stream_data["color_map"]
        plot_kwargs["turning_label"] = stream_data["turning_label"]

        _plot_time_series_with_zoom_from_hdf5(
            results_file_path,
            stream_data["jobs_df"],
            stream_data["result_columns"],
            str(report_dir),
            **plot_kwargs,
        )
        _plot_final_values_bar_chart_from_series(
            stream_data["final_values"], str(report_dir), **plot_kwargs
        )

        # Base (non-AI) Markdown report; a falsy path means generation failed.
        base_report_path, base_report_content = (
            _generate_postprocess_report_from_stream_data(
                stream_data, str(report_dir), **kwargs
            )
        )

        # Optional LLM post-analysis, gated on the "ai" flag.
        if base_report_path and kwargs.get("ai", False):
            env = get_llm_env({"llm_env": kwargs.get("llm_env")})
            api_key = env.get("API_KEY")
            base_url = env.get("BASE_URL")
            # AI_MODELS (comma-separated list) takes precedence over AI_MODEL.
            ai_models_str = env.get("AI_MODELS") or env.get("AI_MODEL")

            if not api_key or not base_url or not ai_models_str:
                logger.warning(
                    "API_KEY, BASE_URL, or AI_MODELS/AI_MODEL not found in environment variables. Skipping LLM analysis."
                )
                return

            ai_models = [model.strip() for model in ai_models_str.split(",")]

            # One report (and optional academic summary) per configured model.
            for ai_model in ai_models:
                logger.info(f"Generating AI analysis for model: {ai_model}")
                # Keep only filename-safe characters from the model name.
                sanitized_model_name = "".join(
                    c for c in ai_model if c.isalnum() or c in ("-", "_")
                ).rstrip()
                model_report_filename = (
                    f"analysis_report_baseline_condition_{sanitized_model_name}.md"
                )
                model_report_path = os.path.join(report_dir, model_report_filename)

                # Start each per-model report from a fresh copy of the base report.
                with open(model_report_path, "w", encoding="utf-8") as file_obj:
                    file_obj.write(base_report_content)

                llm_analysis = _call_openai_for_postprocess_analysis(
                    api_key=api_key,
                    base_url=base_url,
                    ai_model=ai_model,
                    report_content=base_report_content,
                    **kwargs,
                )

                if llm_analysis:
                    # Append the LLM output as a fenced markdown section.
                    with open(model_report_path, "a", encoding="utf-8") as file_obj:
                        file_obj.write(
                            f"\n\n---\n\n# AI模型分析提示词 ({ai_model})\n\n"
                        )
                        file_obj.write("```markdown\n")
                        file_obj.write(llm_analysis)
                        file_obj.write("\n```\n")
                    logger.info(
                        f"Appended LLM analysis for model {ai_model} to {model_report_path}"
                    )

                    # Feed the per-model report into the academic summary step.
                    academic_kwargs = kwargs.copy()
                    academic_kwargs["report_filename"] = model_report_filename
                    generate_academic_report(
                        str(report_dir), ai_model=ai_model, **academic_kwargs
                    )
    except Exception as e:
        logger.error(f"Failed to run HDF5 baseline analysis: {e}", exc_info=True)

generate_academic_report(output_dir, ai_model, **kwargs)

Generates a professional academic analysis summary by sending the existing report and a glossary of terms to an LLM.

Source code in tricys/postprocess/baseline_analysis.py
def generate_academic_report(output_dir: str, ai_model: str, **kwargs) -> None:
    """
    Generates a professional academic analysis summary by sending the existing report
    and a glossary of terms to an LLM.

    Args:
        output_dir: Directory holding the preliminary report and plots; the
            academic summary is written back into this same directory.
        ai_model: Name of the LLM model used for the summary.
        **kwargs: Optional settings. Keys read here:
            report_filename: Preliminary report to read
                (default "baseline_condition_analysis_report.md").
            glossary_path: Terminology CSV to include (default "sheets.csv").
            llm_env: Selector passed to get_llm_env for API credentials.

    Note:
        All failures are logged; the function never raises and returns None.
    """
    try:
        logger.info(
            f"Starting generation of the academic analysis summary for model {ai_model}."
        )

        # 1. Read the existing report
        report_filename = kwargs.get(
            "report_filename", "baseline_condition_analysis_report.md"
        )
        report_path = os.path.join(output_dir, report_filename)
        if not os.path.exists(report_path):
            logger.error(
                f"Cannot generate academic summary: Original report '{report_path}' not found."
            )
            return
        with open(report_path, "r", encoding="utf-8") as f:
            original_report_content = f.read()

        # 2. Read the glossary
        glossary_path = kwargs.get("glossary_path", "sheets.csv")
        if not os.path.exists(glossary_path):
            logger.error(
                f"Cannot generate academic summary: Glossary file '{glossary_path}' not found."
            )
            return
        with open(glossary_path, "r", encoding="utf-8") as f:
            glossary_content = f.read()

        # 3. Check for API credentials
        env = get_llm_env({"llm_env": kwargs.get("llm_env")})
        api_key = env.get("API_KEY")
        base_url = env.get("BASE_URL")

        if not all([api_key, base_url, ai_model]):
            logger.warning(
                "API_KEY, BASE_URL, or AI_MODEL not found. Skipping academic summary generation."
            )
            return

        # 4. Construct the prompt (role / instructions / input files).
        # NOTE: the prompt text is part of runtime behavior and is kept verbatim.
        role_prompt = """**角色:** 您是一位在核聚变工程,特别是氚燃料循环领域,具有深厚学术背景的资深科学家。

**任务:** 您收到了由程序自动生成的初步分析报告和一份专业术语表。请您基于这两份文件,撰写一份更加专业、正式、符合学术发表标准的深度分析总结报告。
"""

        # Find all plots to instruct the LLM to include them
        all_plots = [f for f in os.listdir(output_dir) if f.endswith((".svg", ".png"))]
        plot_list_str = "\n".join([f"    *   `{plot}`" for plot in all_plots])
        instructions_prompt = f"""**指令:**

1.  **专业化语言:** 将初步报告中的模型参数/缩写(例如 `sds.I[1]`, `detailed_var`)替换为术语表中对应的“中文翻译”或“英文术语”。例如,应将“`sds`的库存”表述为“储存与输送系统 (SDS) 的氚库存量 (Tritium Inventory)”。
2.  **学术化重述:** 用严谨、客观的学术语言重新组织和阐述初步报告中的发现。避免使用“看起来”、“好像”等模糊词汇。
3.  **图表和表格的呈现与引用:**
    *   **显示图表:** 在报告的“结果与讨论”部分,您**必须**使用Markdown语法 `![图表标题](图表文件名)` 来**直接嵌入**和显示初步报告中包含的所有图表。可用的图表文件如下:
{plot_list_str}
    *   **引用图表:** 在正文中分析和讨论图表内容时,请使用“如图1所示...”等方式对图表进行编号和文字引用。
    *   **显示表格:** 当呈现数据时(例如,关键阶段的抽样数据或最终值),您**必须**使用Markdown的管道表格(pipe-table)格式来清晰地展示它们。您可以直接复用或重新格式化初步报告中的数据表格。
4.  **结构化报告:** 您的报告是关于一个**基准工况(Baseline Operating Condition)**的模拟分析。报告应包含以下部分:
    *   **摘要 (Abstract):** 简要概括本次**基准工况**模拟的目的、关键发现和核心结论。
    *   **引言 (Introduction):** 描述**基准工况**模拟的背景和目标,提及关键的输入参数。
    *   **结果与讨论 (Results and Discussion):** 这是报告的核心。分点详细论述:
        *   关键性能指标(如氚自持时间、倍增时间等,如果数据可用)的总体趋势。
        *   对关键转折点(例如氚库存的最低点)的物理意义进行深入分析。
        *   评估系统在模拟结束时的最终状态,并讨论氚在各子系统中的分布情况。
    *   **结论 (Conclusion):** 总结本次模拟研究得出的主要学术结论。
5.  **输出格式:** 请直接输出完整的学术分析报告正文,确保所有内容(包括图表和表格)都遵循正确的Markdown语法。

**输入文件:**
"""

        analysis_prompt = f"""
---
### 1. 初步分析报告 (`baseline_condition_analysis_report.md`)
---
{original_report_content}

---
### 2. 专业术语表 (`sheets.csv`)
---
{glossary_content}
"""

        # 5. Call the API (retried up to max_retries times with a 5 s back-off).
        max_retries = 3
        for attempt in range(max_retries):
            try:
                client = openai.OpenAI(api_key=api_key, base_url=base_url)
                logger.info(
                    f"Sending request to OpenAI API for academic summary for model {ai_model} (Attempt {attempt + 1}/{max_retries})..."
                )

                full_text_prompt = "\n\n".join(
                    [role_prompt, instructions_prompt, analysis_prompt]
                )

                response = client.chat.completions.create(
                    model=ai_model,
                    messages=[{"role": "user", "content": full_text_prompt}],
                    max_tokens=4000,
                )
                academic_summary = response.choices[0].message.content

                # 6. Save the result (model name reduced to filename-safe chars).
                sanitized_model_name = "".join(
                    c for c in ai_model if c.isalnum() or c in ("-", "_")
                ).rstrip()
                summary_filename = (
                    f"academic_analysis_summary_{sanitized_model_name}.md"
                )
                summary_path = os.path.join(output_dir, summary_filename)
                with open(summary_path, "w", encoding="utf-8") as f:
                    f.write(academic_summary)

                logger.info(
                    f"Successfully generated academic analysis summary: {summary_path}"
                )
                return  # Exit after success

            except Exception as e:
                logger.error(
                    f"Error calling OpenAI API for academic summary on attempt {attempt + 1}: {e}"
                )
                if attempt < max_retries - 1:
                    time.sleep(5)
                else:
                    logger.error(
                        f"Failed to generate academic summary for {ai_model} after {max_retries} attempts."
                    )
                    return  # Exit after all retries failed

    except Exception as e:
        logger.error(
            f"Error in generate_academic_report for model {ai_model}: {e}",
            exc_info=True,
        )

load_glossary(glossary_path)

Loads glossary data from the specified CSV path into global dictionaries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `glossary_path` | `str` | Path to the glossary CSV file. | *required* |
Note

Expected columns: "模型参数 (Model Parameter)", "英文术语 (English Term)", "中文翻译 (Chinese Translation)". Clears existing glossaries on error. Updates global _english_glossary_map and _chinese_glossary_map.

Source code in tricys/postprocess/baseline_analysis.py
def load_glossary(glossary_path: str) -> None:
    """Populate the module-level glossary maps from a CSV file.

    Args:
        glossary_path: Path to the glossary CSV file.

    Note:
        The CSV must provide the columns "模型参数 (Model Parameter)",
        "英文术语 (English Term)" and "中文翻译 (Chinese Translation)".
        On any failure (missing file, missing columns, parse error) both
        global maps are reset to empty dicts.
    """
    global _english_glossary_map, _chinese_glossary_map

    param_col = "模型参数 (Model Parameter)"
    english_col = "英文术语 (English Term)"
    chinese_col = "中文翻译 (Chinese Translation)"

    if not glossary_path or not os.path.exists(glossary_path):
        logger.warning(
            f"Glossary file not found at {glossary_path}. No labels will be loaded."
        )
        _english_glossary_map = {}
        _chinese_glossary_map = {}
        return

    try:
        df = pd.read_csv(glossary_path)
        if all(col in df.columns for col in (param_col, english_col, chinese_col)):
            # Rows without a model-parameter key cannot be mapped; drop them.
            df.dropna(subset=[param_col], inplace=True)
            _english_glossary_map = pd.Series(
                df[english_col].values, index=df[param_col]
            ).to_dict()
            _chinese_glossary_map = pd.Series(
                df[chinese_col].values, index=df[param_col]
            ).to_dict()
            logger.info(f"Successfully loaded glossary from {glossary_path}.")
            return
        logger.warning("Glossary CSV does not contain expected columns.")
    except Exception as e:
        logger.warning(f"Failed to load or parse glossary file. Error: {e}")

    # Single failure path: clear both maps so stale labels are never reused.
    _english_glossary_map = {}
    _chinese_glossary_map = {}

set_plot_language(lang='en')

Sets the preferred language for plot labels.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `lang` | `str` | `'en'` for English (default), `'cn'` for Chinese. | `'en'` |
Note

For Chinese, sets font to SimHei and adjusts unicode_minus. For English, restores matplotlib defaults. Changes apply globally to all subsequent plots.

Source code in tricys/postprocess/baseline_analysis.py
def set_plot_language(lang: str = "en") -> None:
    """Select the language used for plot labels.

    Args:
        lang: 'en' for English (default), 'cn' for Chinese.

    Note:
        Chinese mode switches matplotlib to the SimHei font and disables
        unicode-minus rendering; English mode restores matplotlib defaults.
        The change is global and affects all subsequent plots.
    """
    global _use_chinese_labels
    use_chinese = lang.lower() == "cn"
    _use_chinese_labels = use_chinese

    if use_chinese:
        # SimHei provides CJK glyphs; the plain ASCII hyphen is used for the
        # minus sign because SimHei lacks the unicode minus glyph.
        plt.rcParams["font.sans-serif"] = ["SimHei"]
        plt.rcParams["axes.unicode_minus"] = False
        # Ensure the sans-serif family above is actually selected.
        plt.rcParams["font.family"] = "sans-serif"
    else:
        # Roll both touched settings back to matplotlib's shipped defaults.
        for key in ("font.sans-serif", "axes.unicode_minus"):
            plt.rcParams[key] = plt.rcParamsDefault[key]

analyze_rise_dip(results_file_path, output_dir, **kwargs)

Analyzes HDF5 simulation results to identify curves that fail to exhibit 'dip and rise' feature.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `results_file_path` | `str` | Path to the HDF5 file containing 'results' and 'jobs' tables. | *required* |
| `output_dir` | `str` | The directory to save the analysis report. | *required* |
| `**kwargs` | | Additional parameters. | `{}` |
Source code in tricys/postprocess/rise_analysis.py
def analyze_rise_dip(results_file_path: str, output_dir: str, **kwargs) -> None:
    """Analyzes HDF5 simulation results to identify curves that fail to exhibit 'dip and rise' feature.

    Args:
        results_file_path: Path to the HDF5 file containing 'results' and 'jobs' tables.
        output_dir: The directory to save the analysis report.
        **kwargs: Additional parameters. Key read here:
            output_filename: Name of the JSON report (default "rise_report.json").

    Note:
        A JSON report describing every inspected curve is written even when
        the HDF5 processing fails partway through.
    """
    logger.info("Starting HDF5 post-processing: Analyzing curve rise/dip features...")
    all_curves_info = []  # One record per (job, variable) with a "rises" verdict.
    error_count = 0

    if not os.path.exists(results_file_path):
        logger.error(f"Results file not found: {results_file_path}")
        return

    try:
        with pd.HDFStore(results_file_path, mode="r") as store:
            if f"/{RESULTS_KEY}" not in store.keys():
                logger.error("HDF5 file missing 'results' table.")
                return

            try:
                jobs_key = get_jobs_key(store)
            except KeyError:
                logger.error("HDF5 file missing 'jobs' table.")
                return

            jobs_df = store.select(jobs_key)
            # job_id -> parameter dict, used to annotate each curve record.
            jobs_map = jobs_df.set_index("job_id").to_dict(orient="index")
            job_ids = sorted(jobs_map.keys())

            def check_curve(series, job_params, var_name):
                """Classify one curve; return (record_dict, rises_flag).

                A curve "rises" when, after rolling-mean smoothing, its
                minimum lies strictly inside the series and both endpoints
                sit above that minimum by more than a small tolerance.
                """
                rises = False
                if len(series) > 2:
                    # Smoothing window is 0.1% of the series length
                    # (at least 1 sample) to suppress numerical noise.
                    window_size = max(1, int(len(series) * 0.001))
                    smoothed = series.rolling(
                        window=window_size, center=True, min_periods=1
                    ).mean()

                    min_pos_index = smoothed.idxmin()
                    min_val = smoothed.loc[min_pos_index]
                    # A minimum at either end means there is no interior dip.
                    is_min_at_boundary = (min_pos_index == smoothed.index[0]) or (
                        min_pos_index == smoothed.index[-1]
                    )

                    if not is_min_at_boundary:
                        # Tolerance of 0.1% of the value range guards against
                        # effectively flat curves being classified as dipping.
                        series_range = smoothed.max() - smoothed.min()
                        tolerance = series_range * 0.001 if series_range > 1e-9 else 0
                        start_val = smoothed.iloc[0]
                        end_val = smoothed.iloc[-1]

                        if (
                            start_val > min_val + tolerance
                            and end_val > min_val + tolerance
                        ):
                            rises = True

                info = job_params.copy()
                info["variable"] = var_name
                info["rises"] = rises
                return info, rises

            # Load results in job_id-range batches to bound memory usage.
            batch_size = 100
            total_jobs = len(job_ids)

            for i in range(0, total_jobs, batch_size):
                batch_ids = job_ids[i : i + batch_size]
                # job_ids is sorted, so [min_id, max_id] covers exactly this batch.
                min_id = min(batch_ids)
                max_id = max(batch_ids)

                try:
                    res_batch = store.select(
                        RESULTS_KEY,
                        where=f"job_id >= {min_id} & job_id <= {max_id}",
                    )
                except Exception as e:
                    logger.warning(f"Failed to load batch {min_id}-{max_id}: {e}")
                    continue

                grouped = res_batch.groupby("job_id")

                for j_id, group in grouped:
                    if j_id not in jobs_map:
                        continue

                    params = jobs_map[j_id]

                    # Every column except the bookkeeping ones is a curve.
                    for col in group.columns:
                        if col in ["time", "job_id"]:
                            continue

                        info, rises = check_curve(
                            group[col].reset_index(drop=True), params, col
                        )
                        all_curves_info.append(info)

                        if not rises:
                            error_count += 1
                            logger.error(
                                f"Feature not detected for Job {j_id}, Var '{col}' (Params: {params})"
                            )

    except Exception as e:
        logger.error(f"HDF5 processing failed: {e}", exc_info=True)

    # Generate a report file with all information unconditionally
    output_filename = kwargs.get("output_filename", "rise_report.json")
    report_path = os.path.join(output_dir, output_filename)

    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(all_curves_info, f, indent=4, ensure_ascii=False)

    if error_count > 0:
        logger.info(f"{error_count} curves failed checks. Report: {report_path}")
    else:
        logger.info(f"All curves passed. Report: {report_path}")

check_thresholds(results_file_path, output_dir, rules, **kwargs)

Analyzes HDF5 simulation results to check if specified columns fall within threshold ranges.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `results_file_path` | `str` | Path to HDF5 results. | *required* |
| `output_dir` | `str` | Directory for saving alert reports. | *required* |
| `rules` | `List[Dict[str, Any]]` | List of rules. | *required* |
| `**kwargs` | | Additional parameters. | `{}` |
Source code in tricys/postprocess/static_alarm.py
def check_thresholds(
    results_file_path: str, output_dir: str, rules: List[Dict[str, Any]], **kwargs
) -> None:
    """Analyzes HDF5 simulation results to check if specified columns fall within threshold ranges.

    Args:
        results_file_path: Path to HDF5 results.
        output_dir: Directory for saving alert reports.
        rules: List of rules; each rule may carry "min", "max" and "columns".
        **kwargs: Additional parameters. Keys read here:
            report_only_alarms: When truthy, only alarming jobs appear in the report.
            output_filename: Name of the JSON report (default "alarm_report.json").

    Note:
        A JSON report is written after processing; early returns for a
        missing file or missing HDF5 tables skip the report entirely.
    """
    logger.info("Starting HDF5 post-processing: Checking thresholds...")

    final_report = []
    total_alarms = 0
    report_only_alarms = kwargs.get("report_only_alarms", False)

    if not os.path.exists(results_file_path):
        logger.error(f"Results file not found: {results_file_path}")
        return

    def _find_violations(store, col, operator, bound, label):
        """Return job_ids where `col operator bound` holds, logging each alarm.

        Shared by the max (">", "Peak") and min ("<", "Dip") checks, which
        were previously duplicated near-identical blocks.
        """
        violating_ids = set()
        try:
            res = store.select(
                RESULTS_KEY,
                where=f"{col} {operator} {bound}",
                columns=["job_id", col],
            )
        except Exception as e:
            logger.error(f"Query failed for {col} {operator} {bound}: {e}")
            return violating_ids
        if not res.empty:
            ids = res["job_id"].unique()
            violating_ids.update(ids)
            for j_id in ids:
                values = res[res["job_id"] == j_id][col]
                # Log the most extreme violating value for this job.
                extreme = values.max() if operator == ">" else values.min()
                logger.error(
                    f"ALARM: Job {j_id}, Var '{col}' {operator} {bound} ({label}: {extreme})"
                )
        return violating_ids

    try:
        with pd.HDFStore(results_file_path, mode="r") as store:
            if f"/{RESULTS_KEY}" not in store.keys():
                return

            try:
                jobs_key = get_jobs_key(store)
            except KeyError:
                return

            jobs_df = store.select(jobs_key)
            # job_id -> parameter dict, used to annotate each report entry.
            jobs_map = jobs_df.set_index("job_id").to_dict(orient="index")
            # Columns queryable in the results table; rules on absent columns are skipped.
            available_vars = store.get_storer(RESULTS_KEY).table.colnames

            for rule in rules:
                min_val = rule.get("min")
                max_val = rule.get("max")

                for col in rule.get("columns", []):
                    if col not in available_vars:
                        continue

                    alarm_job_ids = set()
                    if max_val is not None:
                        alarm_job_ids |= _find_violations(
                            store, col, ">", max_val, "Peak"
                        )
                    if min_val is not None:
                        alarm_job_ids |= _find_violations(
                            store, col, "<", min_val, "Dip"
                        )

                    # Either report every job or only the alarming ones.
                    target_job_ids = (
                        alarm_job_ids if report_only_alarms else jobs_map.keys()
                    )

                    for j_id in target_job_ids:
                        if j_id not in jobs_map:
                            continue
                        has_alarm = j_id in alarm_job_ids
                        item = jobs_map[j_id].copy()
                        item["variable"] = col
                        item["has_alarm"] = has_alarm
                        item["job_id"] = int(j_id)
                        final_report.append(item)
                        if has_alarm:
                            total_alarms += 1

    except Exception as e:
        logger.error(f"HDF5 threshold check failed: {e}", exc_info=True)

    output_filename = kwargs.get("output_filename", "alarm_report.json")
    report_path = os.path.join(output_dir, output_filename)
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(final_report, f, indent=4, ensure_ascii=False)

    if total_alarms > 0:
        logger.info(f"Found {total_alarms} alarms. Report: {report_path}")
    else:
        logger.info(f"No alarms found. Report: {report_path}")