API Reference - Post-processing Module

Post-processing Module

The post-processing module provides analysis and reporting functions that run automatically after a simulation completes. Select the specific sub-module of interest from the tabs below.

This module provides functions for plotting simulation results.

baseline_analysis(results_df, output_dir, **kwargs)

Generates baseline analysis plots and reports.

Creates three outputs:

1. A time-series plot with an overall view and a detailed zoom around the turning point
2. A bar chart showing the final values of all variables, sorted
3. An optional Markdown report with AI analysis (if the 'ai' flag is True)

Parameters:

results_df (DataFrame): The combined DataFrame of simulation results. Required.
output_dir (str): The directory in which to save the plots and report. Required.
**kwargs: Additional parameters from the config, including the 'ai' flag, 'detailed_var', 'glossary_path', and AI model settings. Default: {}.
Note

Removes duplicate rows before processing and creates bilingual plots (English and Chinese). If AI analysis is enabled, the API_KEY, BASE_URL, and AI_MODELS/AI_MODEL environment variables are required. Generates both an initial LLM analysis and an academic summary.

Source code in tricys/postprocess/baseline_analysis.py
def baseline_analysis(results_df: pd.DataFrame, output_dir: str, **kwargs) -> None:
    """Generates baseline analysis plots and reports.

    Creates three outputs:
    1. A time-series plot with overall view and detailed zoom around turning point
    2. A bar chart showing final values of all variables, sorted
    3. An optional Markdown report with AI analysis (if 'ai' flag is True)

    Args:
        results_df: The combined DataFrame of simulation results.
        output_dir: The directory to save the plots and report.
        **kwargs: Additional parameters from config, including 'ai' flag, 'detailed_var',
            'glossary_path', and AI model settings.

    Note:
        Removes duplicate rows before processing. Creates bilingual plots (English and
        Chinese). If AI analysis enabled, requires API_KEY, BASE_URL, and AI_MODELS/AI_MODEL
        environment variables. Generates both initial LLM analysis and academic summary.
    """
    if "time" not in results_df.columns:
        logger.error("Plotting failed: 'time' column not found in results DataFrame.")
        return

    if "glossary_path" in kwargs:
        load_glossary(kwargs["glossary_path"])

    os.removedirs(output_dir) if os.path.exists(output_dir) else None
    p = Path(output_dir)
    output_dir = p.parent / "report"
    os.makedirs(output_dir, exist_ok=True)

    df = results_df.copy()
    # Remove duplicate rows before processing
    df.drop_duplicates(inplace=True)
    df.reset_index(drop=True, inplace=True)

    # Create a unified color map for all variables
    all_plot_columns = sorted([col for col in df.columns if col != "time"])
    colors = sns.color_palette("turbo", len(all_plot_columns))
    color_map = dict(zip(all_plot_columns, colors))

    # Add the color map to kwargs to pass it to the helper functions
    plot_kwargs = kwargs.copy()
    plot_kwargs["color_map"] = color_map

    # Generate the time-series plot with zoom
    _plot_time_series_with_zoom(df, output_dir, **plot_kwargs)

    # Generate the bar chart of final values
    _plot_final_values_bar_chart(df, output_dir, **plot_kwargs)

    # --- Report Generation and AI Analysis ---
    base_report_path, base_report_content = _generate_postprocess_report(
        df, output_dir, **kwargs
    )

    if base_report_path and kwargs.get("ai", False):
        load_dotenv()
        api_key = os.environ.get("API_KEY")
        base_url = os.environ.get("BASE_URL")

        # Prioritize AI_MODELS, fallback to AI_MODEL
        ai_models_str = os.environ.get("AI_MODELS")
        if not ai_models_str:
            ai_models_str = os.environ.get("AI_MODEL")

        if not api_key or not base_url or not ai_models_str:
            logger.warning(
                "API_KEY, BASE_URL, or AI_MODELS/AI_MODEL not found in environment variables. Skipping LLM analysis."
            )
            return

        ai_models = [model.strip() for model in ai_models_str.split(",")]

        for ai_model in ai_models:
            logger.info(f"Generating AI analysis for model: {ai_model}")

            sanitized_model_name = "".join(
                c for c in ai_model if c.isalnum() or c in ("-", "_")
            ).rstrip()

            model_report_filename = (
                f"analysis_report_baseline_condition_{sanitized_model_name}.md"
            )
            model_report_path = os.path.join(output_dir, model_report_filename)

            with open(model_report_path, "w", encoding="utf-8") as f:
                f.write(base_report_content)

            llm_analysis = _call_openai_for_postprocess_analysis(
                api_key=api_key,
                base_url=base_url,
                ai_model=ai_model,
                report_content=base_report_content,
                **kwargs,
            )

            if llm_analysis:
                with open(model_report_path, "a", encoding="utf-8") as f:
                    f.write(f"\n\n---\n\n# AI模型分析提示词 ({ai_model})\n\n")
                    f.write("```markdown\n")
                    f.write(llm_analysis)
                    f.write("\n```\n")
                logger.info(
                    f"Appended LLM analysis for model {ai_model} to {model_report_path}"
                )

                # --- ADDED: Second AI call for academic summary ---
                academic_kwargs = kwargs.copy()
                academic_kwargs["report_filename"] = model_report_filename
                generate_academic_report(
                    output_dir, ai_model=ai_model, **academic_kwargs
                )
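
Example usage (a minimal sketch; the CSV path, output directory, and keyword values below are hypothetical, and AI analysis is disabled so no API credentials are needed). Note that the outputs are written to a 'report' directory created next to the given output_dir.

import pandas as pd

from tricys.postprocess.baseline_analysis import baseline_analysis

# Hypothetical merged results file: a 'time' column plus one column per variable.
results_df = pd.read_csv("results/combined_results.csv")

baseline_analysis(
    results_df,
    output_dir="results/postprocess",
    ai=False,                    # set True to enable LLM analysis (requires API_KEY, BASE_URL, AI_MODELS/AI_MODEL)
    detailed_var="sds.I[1]",     # hypothetical variable to highlight around the turning point
    glossary_path="sheets.csv",  # optional glossary for bilingual plot labels
)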

generate_academic_report(output_dir, ai_model, **kwargs)

Generates a professional academic analysis summary by sending the existing report and a glossary of terms to an LLM.

Source code in tricys/postprocess/baseline_analysis.py
def generate_academic_report(output_dir: str, ai_model: str, **kwargs) -> None:
    """
    Generates a professional academic analysis summary by sending the existing report
    and a glossary of terms to an LLM.
    """
    try:
        logger.info(
            f"Starting generation of the academic analysis summary for model {ai_model}."
        )

        # 1. Read the existing report
        report_filename = kwargs.get(
            "report_filename", "baseline_condition_analysis_report.md"
        )
        report_path = os.path.join(output_dir, report_filename)
        if not os.path.exists(report_path):
            logger.error(
                f"Cannot generate academic summary: Original report '{report_path}' not found."
            )
            return
        with open(report_path, "r", encoding="utf-8") as f:
            original_report_content = f.read()

        # 2. Read the glossary
        glossary_path = kwargs.get("glossary_path", "sheets.csv")
        if not os.path.exists(glossary_path):
            logger.error(
                f"Cannot generate academic summary: Glossary file '{glossary_path}' not found."
            )
            return
        with open(glossary_path, "r", encoding="utf-8") as f:
            glossary_content = f.read()

        # 3. Check for API credentials
        load_dotenv()
        api_key = os.environ.get("API_KEY")
        base_url = os.environ.get("BASE_URL")

        if not all([api_key, base_url, ai_model]):
            logger.warning(
                "API_KEY, BASE_URL, or AI_MODEL not found. Skipping academic summary generation."
            )
            return

        # 4. Construct the prompt
        role_prompt = """**角色:** 您是一位在核聚变工程,特别是氚燃料循环领域,具有深厚学术背景的资深科学家。

**任务:** 您收到了由程序自动生成的初步分析报告和一份专业术语表。请您基于这两份文件,撰写一份更加专业、正式、符合学术发表标准的深度分析总结报告。
"""

        # Find all plots to instruct the LLM to include them
        all_plots = [f for f in os.listdir(output_dir) if f.endswith((".svg", ".png"))]
        plot_list_str = "\n".join([f"    *   `{plot}`" for plot in all_plots])
        instructions_prompt = f"""**指令:**

1.  **专业化语言:** 将初步报告中的模型参数/缩写(例如 `sds.I[1]`, `detailed_var`)替换为术语表中对应的“中文翻译”或“英文术语”。例如,应将“`sds`的库存”表述为“储存与输送系统 (SDS) 的氚库存量 (Tritium Inventory)”。
2.  **学术化重述:** 用严谨、客观的学术语言重新组织和阐述初步报告中的发现。避免使用“看起来”、“好像”等模糊词汇。
3.  **图表和表格的呈现与引用:**
    *   **显示图表:** 在报告的“结果与讨论”部分,您**必须**使用Markdown语法 `![图表标题](图表文件名)` 来**直接嵌入**和显示初步报告中包含的所有图表。可用的图表文件如下:
{plot_list_str}
    *   **引用图表:** 在正文中分析和讨论图表内容时,请使用“如图1所示...”等方式对图表进行编号和文字引用。
    *   **显示表格:** 当呈现数据时(例如,关键阶段的抽样数据或最终值),您**必须**使用Markdown的管道表格(pipe-table)格式来清晰地展示它们。您可以直接复用或重新格式化初步报告中的数据表格。
4.  **结构化报告:** 您的报告是关于一个**基准工况(Baseline Operating Condition)**的模拟分析。报告应包含以下部分:
    *   **摘要 (Abstract):** 简要概括本次**基准工况**模拟的目的、关键发现和核心结论。
    *   **引言 (Introduction):** 描述**基准工况**模拟的背景和目标,提及关键的输入参数。
    *   **结果与讨论 (Results and Discussion):** 这是报告的核心。分点详细论述:
        *   关键性能指标(如氚自持时间、倍增时间等,如果数据可用)的总体趋势。
        *   对关键转折点(例如氚库存的最低点)的物理意义进行深入分析。
        *   评估系统在模拟结束时的最终状态,并讨论氚在各子系统中的分布情况。
    *   **结论 (Conclusion):** 总结本次模拟研究得出的主要学术结论。
5.  **输出格式:** 请直接输出完整的学术分析报告正文,确保所有内容(包括图表和表格)都遵循正确的Markdown语法。

**输入文件:**
"""

        analysis_prompt = f"""
---
### 1. 初步分析报告 (`baseline_condition_analysis_report.md`)
---
{original_report_content}

---
### 2. 专业术语表 (`sheets.csv`)
---
{glossary_content}
"""

        # 5. Call the API
        max_retries = 3
        for attempt in range(max_retries):
            try:
                client = openai.OpenAI(api_key=api_key, base_url=base_url)
                logger.info(
                    f"Sending request to OpenAI API for academic summary for model {ai_model} (Attempt {attempt + 1}/{max_retries})..."
                )

                full_text_prompt = "\n\n".join(
                    [role_prompt, instructions_prompt, analysis_prompt]
                )

                response = client.chat.completions.create(
                    model=ai_model,
                    messages=[{"role": "user", "content": full_text_prompt}],
                    max_tokens=4000,
                )
                academic_summary = response.choices[0].message.content

                # 6. Save the result
                sanitized_model_name = "".join(
                    c for c in ai_model if c.isalnum() or c in ("-", "_")
                ).rstrip()
                summary_filename = (
                    f"academic_analysis_summary_{sanitized_model_name}.md"
                )
                summary_path = os.path.join(output_dir, summary_filename)
                with open(summary_path, "w", encoding="utf-8") as f:
                    f.write(academic_summary)

                logger.info(
                    f"Successfully generated academic analysis summary: {summary_path}"
                )
                return  # Exit after success

            except Exception as e:
                logger.error(
                    f"Error calling OpenAI API for academic summary on attempt {attempt + 1}: {e}"
                )
                if attempt < max_retries - 1:
                    time.sleep(5)
                else:
                    logger.error(
                        f"Failed to generate academic summary for {ai_model} after {max_retries} attempts."
                    )
                    return  # Exit after all retries failed

    except Exception as e:
        logger.error(
            f"Error in generate_academic_report for model {ai_model}: {e}",
            exc_info=True,
        )
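
Example usage (a minimal sketch for calling this function directly; it is normally invoked from baseline_analysis after the initial LLM analysis). The model name, report filename, and paths are hypothetical; API_KEY and BASE_URL must be set in the environment or a .env file.

from tricys.postprocess.baseline_analysis import generate_academic_report

generate_academic_report(
    output_dir="results/report",
    ai_model="gpt-4o",  # hypothetical model identifier
    report_filename="analysis_report_baseline_condition_gpt-4o.md",
    glossary_path="sheets.csv",
)
# On success this writes academic_analysis_summary_gpt-4o.md to results/report.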

load_glossary(glossary_path)

Loads glossary data from the specified CSV path into global dictionaries.

Parameters:

glossary_path (str): Path to the glossary CSV file. Required.
Note

Expected columns: "模型参数 (Model Parameter)", "英文术语 (English Term)", "中文翻译 (Chinese Translation)". Clears existing glossaries on error. Updates global _english_glossary_map and _chinese_glossary_map.

Source code in tricys/postprocess/baseline_analysis.py
def load_glossary(glossary_path: str) -> None:
    """Loads glossary data from the specified CSV path into global dictionaries.

    Args:
        glossary_path: Path to the glossary CSV file.

    Note:
        Expected columns: "模型参数 (Model Parameter)", "英文术语 (English Term)",
        "中文翻译 (Chinese Translation)". Clears existing glossaries on error.
        Updates global _english_glossary_map and _chinese_glossary_map.
    """
    global _english_glossary_map, _chinese_glossary_map

    if not glossary_path or not os.path.exists(glossary_path):
        logger.warning(
            f"Glossary file not found at {glossary_path}. No labels will be loaded."
        )
        _english_glossary_map = {}
        _chinese_glossary_map = {}
        return

    try:
        df = pd.read_csv(glossary_path)
        if (
            "模型参数 (Model Parameter)" in df.columns
            and "英文术语 (English Term)" in df.columns
            and "中文翻译 (Chinese Translation)" in df.columns
        ):
            df.dropna(subset=["模型参数 (Model Parameter)"], inplace=True)
            _english_glossary_map = pd.Series(
                df["英文术语 (English Term)"].values,
                index=df["模型参数 (Model Parameter)"],
            ).to_dict()
            _chinese_glossary_map = pd.Series(
                df["中文翻译 (Chinese Translation)"].values,
                index=df["模型参数 (Model Parameter)"],
            ).to_dict()
            logger.info(f"Successfully loaded glossary from {glossary_path}.")
        else:
            logger.warning("Glossary CSV does not contain expected columns.")
            _english_glossary_map = {}
            _chinese_glossary_map = {}
    except Exception as e:
        logger.warning(f"Failed to load or parse glossary file. Error: {e}")
        _english_glossary_map = {}
        _chinese_glossary_map = {}
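
Example usage (a minimal sketch; the glossary row below is illustrative, and only the three column headers are required by the loader).

import pandas as pd

from tricys.postprocess.baseline_analysis import load_glossary

# Build an illustrative glossary CSV with the expected columns.
pd.DataFrame(
    {
        "模型参数 (Model Parameter)": ["sds.I[1]"],
        "英文术语 (English Term)": ["SDS Tritium Inventory"],
        "中文翻译 (Chinese Translation)": ["储存与输送系统氚库存量"],
    }
).to_csv("sheets.csv", index=False)

load_glossary("sheets.csv")  # populates the global label maps used by the plotting helpers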

set_plot_language(lang='en')

Sets the preferred language for plot labels.

Parameters:

lang (str): 'en' for English (default), 'cn' for Chinese. Default: 'en'.
Note

For Chinese, sets font to SimHei and adjusts unicode_minus. For English, restores matplotlib defaults. Changes apply globally to all subsequent plots.

Source code in tricys/postprocess/baseline_analysis.py
def set_plot_language(lang: str = "en") -> None:
    """Sets the preferred language for plot labels.

    Args:
        lang: 'en' for English (default), 'cn' for Chinese.

    Note:
        For Chinese, sets font to SimHei and adjusts unicode_minus. For English,
        restores matplotlib defaults. Changes apply globally to all subsequent plots.
    """
    global _use_chinese_labels
    _use_chinese_labels = lang.lower() == "cn"

    if _use_chinese_labels:
        # To display Chinese characters correctly, specify a list of fallback fonts.
        plt.rcParams["font.sans-serif"] = ["SimHei"]  # 替换成你电脑上有的字体
        plt.rcParams["axes.unicode_minus"] = False  # To display minus sign correctly.
        plt.rcParams["font.family"] = "sans-serif"  # 确保字体家族设置生效
    else:
        # Restore default settings
        plt.rcParams["font.sans-serif"] = plt.rcParamsDefault["font.sans-serif"]
        plt.rcParams["axes.unicode_minus"] = plt.rcParamsDefault["axes.unicode_minus"]

analyze_rise_dip(results_df, output_dir, **kwargs)

Analyzes parameter sweep results to identify curves that fail to exhibit the 'dip and rise' feature.

A curve exhibits the 'dip and rise' feature if:

1. It has a clear minimum point (not at the boundaries)
2. The values at both the start and the end are higher than the minimum (within a tolerance)

Parameters:

results_df (DataFrame): The combined DataFrame of simulation results, including time and multiple parameter combinations. Required.
output_dir (str): The directory to save the analysis report. Required.
**kwargs: Additional parameters from config, e.g., 'output_filename'. Default: {}.
Note

Uses a 0.1% smoothing window to handle noisy data. Column names are expected in the format 'variable&param1=v1&param2=v2'. Logs an ERROR for each curve that lacks the feature. Always generates rise_report.json with analysis results for all curves, including a 'rises' boolean flag.

Source code in tricys/postprocess/rise_analysis.py
def analyze_rise_dip(results_df: pd.DataFrame, output_dir: str, **kwargs) -> None:
    """Analyzes parameter sweep results to identify curves that fail to exhibit 'dip and rise' feature.

    A curve exhibits the 'dip and rise' feature if:
    1. It has a clear minimum point (not at boundaries)
    2. Values at both start and end are higher than the minimum (with tolerance)

    Args:
        results_df: The combined DataFrame of simulation results, including time and
            multiple parameter combinations.
        output_dir: The directory to save the analysis report.
        **kwargs: Additional parameters from config, e.g., 'output_filename'.

    Note:
        Uses 0.1% smoothing window to handle noisy data. Column names expected in
        format 'variable&param1=v1&param2=v2'. Logs ERROR for each curve without
        the feature. Always generates rise_report.json with analysis results for
        all curves, including 'rises' boolean flag.
    """
    logger.info("Starting post-processing: Analyzing curve rise/dip features...")
    all_curves_info = []
    error_count = 0

    # Iterate over each column of the DataFrame (except for the 'time' column)
    for col_name in results_df.columns:
        if col_name == "time":
            continue

        # Parse parameters from the column name 'variable&param1=v1&param2=v2'
        try:
            parts = col_name.split("&")
            if len(parts) < 2:  # Must have at least one variable name and one parameter
                logger.warning(
                    f"Column name '{col_name}' has an incorrect format, skipping."
                )
                continue

            # parts[0] is the variable name, parse parameters from parts[1:]
            param_parts = parts[1:]
            job_params = dict(item.split("=") for item in param_parts)
            job_params["variable"] = parts[
                0
            ]  # Also add the original variable name to the info

        except (ValueError, IndexError):
            logger.warning(
                f"Could not parse parameters from column name '{col_name}', skipping."
            )
            continue

        series = results_df[col_name]
        rises = False
        if len(series) > 2:
            # This logic is inspired by `time_of_turning_point` from `tricys/analysis/metric.py`.
            # It uses a smoothed series to determine if there is a 'dip and rise' trend.
            window_size = max(1, int(len(series) * 0.001))  # 0.1% smoothing window
            smoothed = series.rolling(
                window=window_size, center=True, min_periods=1
            ).mean()

            min_pos_index = smoothed.idxmin()
            min_val = smoothed.loc[min_pos_index]

            logger.info(
                f"Analyzing curve '{col_name}': min at index {min_pos_index} with value {min_val}"
            )

            # Check if the minimum is at the beginning or end of the series
            is_min_at_boundary = (min_pos_index == smoothed.index[0]) or (
                min_pos_index == smoothed.index[-1]
            )

            if not is_min_at_boundary:
                # Check if it dips from the start and rises to the end.
                # A small tolerance is used to avoid issues with noise.
                series_range = smoothed.max() - smoothed.min()
                # Avoid division by zero or NaN tolerance if series is flat
                if series_range > 1e-9:
                    tolerance = series_range * 0.001  # 0.1% of range as tolerance
                else:
                    tolerance = 0

                start_val = smoothed.iloc[0]
                end_val = smoothed.iloc[-1]

                if start_val > min_val + tolerance and end_val > min_val + tolerance:
                    rises = True

        # Record the analysis result for every curve
        info = job_params.copy()
        info["rises"] = bool(rises)
        all_curves_info.append(info)

        # If the feature is not detected, log it at the ERROR level
        if not rises:
            error_count += 1
            logger.error(
                f"Feature not detected: 'Dip and rise' feature was not found for the curve with parameters {job_params}."
            )

    # Generate a report file with all information unconditionally
    output_filename = kwargs.get("output_filename", "rise_report.json")
    report_path = os.path.join(output_dir, output_filename)

    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(all_curves_info, f, indent=4, ensure_ascii=False)

    if error_count > 0:
        logger.info(
            f"{error_count} curves did not exhibit the expected feature. See report for details: {report_path}"
        )
    else:
        logger.info(
            f"All curves exhibit the expected 'dip and rise' feature. Report generated at: {report_path}"
        )
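
Example usage (a minimal sketch with a synthetic sweep column; the variable and parameter names are illustrative). The single curve dips and then rises, so its entry in rise_report.json has "rises": true.

import numpy as np
import pandas as pd

from tricys.postprocess.rise_analysis import analyze_rise_dip

# Synthetic curve in the expected 'variable&param=value' column format.
t = np.linspace(0, 10, 501)
df = pd.DataFrame(
    {
        "time": t,
        "sds.I[1]&blanket.TBR=1.05": (t - 5) ** 2 + 1.0,
    }
)

analyze_rise_dip(df, output_dir=".", output_filename="rise_report.json")
# rise_report.json then contains, e.g.:
# [{"blanket.TBR": "1.05", "variable": "sds.I[1]", "rises": true}]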

check_thresholds(results_df, output_dir, rules, **kwargs)

Analyzes simulation results to check if specified columns fall within threshold ranges.

Supports both single tasks (column name as 'var') and parameter sweep tasks (column name as 'var&param=value').

Parameters:

results_df (DataFrame): Merged simulation results DataFrame. Required.
output_dir (str): Directory for saving alert reports. Required.
rules (List[Dict[str, Any]]): List of rules, where each rule defines columns and their min/max thresholds. Format: [{"columns": ["var1", "var2"], "min": value, "max": value}, ...]. Required.
**kwargs: Additional parameters from configuration, such as 'output_filename'. Default: {}.
Note

Logs an ERROR for each threshold violation, including the peak or dip value. Generates alarm_report.json with parsed parameter information and 'has_alarm' flags. For columns matching 'base_col_name&param=value', extracts the parameters into separate fields in the report. Reports the total alarm count in the logs.

Source code in tricys/postprocess/static_alarm.py
def check_thresholds(
    results_df: pd.DataFrame, output_dir: str, rules: List[Dict[str, Any]], **kwargs
) -> None:
    """Analyzes simulation results to check if specified columns fall within threshold ranges.

    Supports both single tasks (column name as 'var') and parameter sweep tasks
    (column name as 'var&param=value').

    Args:
        results_df: Merged simulation results DataFrame.
        output_dir: Directory for saving alert reports.
        rules: List of rules, where each rule defines columns and their min/max thresholds.
            Format: [{"columns": ["var1", "var2"], "min": value, "max": value}, ...]
        **kwargs: Additional parameters from configuration, such as 'output_filename'.

    Note:
        Logs ERROR for each threshold violation with peak/dip values. Generates
        alarm_report.json with parsed parameter information and 'has_alarm' flags.
        For columns matching 'base_col_name&param=value', extracts parameters into
        separate fields in the report. Reports total alarm count in logs.
    """
    logger.info("Starting post-processing: Checking thresholds...")

    # Use a dictionary to track the alarm status of each checked column
    checked_columns_status = {}

    for i, rule in enumerate(rules):
        min_val = rule.get("min")
        max_val = rule.get("max")
        columns_to_check = rule.get("columns", [])

        if not columns_to_check:
            logger.warning(f"Rule {i+1} does not specify 'columns', skipping.")
            continue

        # Iterate over each base column name specified in the rule
        for base_col_name in columns_to_check:
            # Iterate over all actual column names in the DataFrame to find matches
            for df_col_name in results_df.columns:
                if df_col_name == base_col_name or df_col_name.startswith(
                    base_col_name + "&"
                ):

                    # Initialize status for this column if it's the first time being checked
                    if df_col_name not in checked_columns_status:
                        checked_columns_status[df_col_name] = (
                            False  # Default to no alarm
                        )

                    # Check for values exceeding the maximum threshold
                    if max_val is not None:
                        exceeded_max = results_df[results_df[df_col_name] > max_val]
                        if not exceeded_max.empty:
                            peak_value = exceeded_max[df_col_name].max()
                            logger.error(
                                f"ALARM: Column '{df_col_name}' exceeds maximum threshold (Threshold: {max_val}, Value: {peak_value})"
                            )
                            checked_columns_status[df_col_name] = True

                    # Check for values falling below the minimum threshold
                    if min_val is not None:
                        exceeded_min = results_df[results_df[df_col_name] < min_val]
                        if not exceeded_min.empty:
                            dip_value = exceeded_min[df_col_name].min()
                            logger.error(
                                f"ALARM: Column '{df_col_name}' is below minimum threshold (Threshold: {min_val}, Value: {dip_value})"
                            )
                            checked_columns_status[df_col_name] = True

    # Convert to the final report format, parsing column names to include parameters
    final_report = []
    for col, status in checked_columns_status.items():
        try:
            report_item = {}
            parts = col.split("&")

            # For single runs, the column name may not contain '&'
            if len(parts) == 1:
                report_item["variable"] = parts[0]
            else:
                variable_name = parts[0]
                param_parts = parts[1:]
                report_item = dict(item.split("=") for item in param_parts)
                report_item["variable"] = variable_name

            report_item["has_alarm"] = status
            final_report.append(report_item)

        except (ValueError, IndexError):
            logger.warning(
                f"Could not parse column name '{col}' for the report, using the original name as a fallback."
            )
            # Fallback to the old format if parsing fails
            final_report.append({"column": col, "has_alarm": status})

    output_filename = kwargs.get("output_filename", "alarm_report.json")
    report_path = os.path.join(output_dir, output_filename)
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(final_report, f, indent=4, ensure_ascii=False)

    total_alarms = sum(1 for entry in final_report if entry["has_alarm"])
    if total_alarms > 0:
        logger.info(
            f"{total_alarms} columns with alarms were found. See logs for details. Report generated at: {report_path}"
        )
    else:
        logger.info(
            f"Threshold check complete. All checked columns are within their thresholds. Report generated at: {report_path}"
        )
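
Example usage (a minimal sketch; the variable and parameter names are illustrative). One rule is applied to both the plain column and the matching sweep column; the sweep column dips below the minimum and is flagged.

import pandas as pd

from tricys.postprocess.static_alarm import check_thresholds

df = pd.DataFrame(
    {
        "time": [0.0, 1.0, 2.0],
        "sds.I[1]": [5.0, 3.0, 4.0],
        "sds.I[1]&blanket.TBR=1.05": [5.0, 0.5, 4.0],  # dips below the minimum threshold
    }
)

rules = [{"columns": ["sds.I[1]"], "min": 1.0, "max": 10.0}]

check_thresholds(df, output_dir=".", rules=rules, output_filename="alarm_report.json")
# alarm_report.json then contains, e.g.:
# [{"variable": "sds.I[1]", "has_alarm": false},
#  {"blanket.TBR": "1.05", "variable": "sds.I[1]", "has_alarm": true}]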