Post-processing Modules

The post-processing modules provide analysis and reporting functions that are executed automatically after a simulation run. Three modules are documented below: baseline_analysis, rise_analysis, and static_alarm.
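
Assuming the package layout implied by the source paths on this page, the documented entry points can be imported as in the following illustrative sketch:

from tricys.postprocess.baseline_analysis import (
    baseline_analysis,
    generate_academic_report,
    load_glossary,
    set_plot_language,
)
from tricys.postprocess.rise_analysis import analyze_rise_dip
from tricys.postprocess.static_alarm import check_thresholds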

The tricys.postprocess.baseline_analysis module provides functions for plotting simulation results and generating analysis reports.

baseline_analysis(results_file_path, output_dir, **kwargs)

Generates baseline analysis plots and reports from a unified HDF5 results file.

Source code in tricys/postprocess/baseline_analysis.py
def baseline_analysis(results_file_path: str, output_dir: str, **kwargs) -> None:
    """Generates baseline analysis plots and reports from a unified HDF5 results file."""
    if not os.path.exists(results_file_path):
        logger.error(f"HDF5 file not found: {results_file_path}")
        return

    try:
        if "glossary_path" in kwargs:
            load_glossary(kwargs["glossary_path"])

        report_dir = Path(output_dir).parent / "report"
        os.makedirs(report_dir, exist_ok=True)

        stream_data = _collect_baseline_stream_data(results_file_path, **kwargs)
        if not stream_data:
            logger.warning("No data found to analyze for baseline analysis.")
            return

        plot_kwargs = kwargs.copy()
        plot_kwargs["color_map"] = stream_data["color_map"]
        plot_kwargs["turning_label"] = stream_data["turning_label"]

        _plot_time_series_with_zoom_from_hdf5(
            results_file_path,
            stream_data["jobs_df"],
            stream_data["result_columns"],
            str(report_dir),
            **plot_kwargs,
        )
        _plot_final_values_bar_chart_from_series(
            stream_data["final_values"], str(report_dir), **plot_kwargs
        )

        base_report_path, base_report_content = (
            _generate_postprocess_report_from_stream_data(
                stream_data, str(report_dir), **kwargs
            )
        )

        if base_report_path and kwargs.get("ai", False):
            env = get_llm_env({"llm_env": kwargs.get("llm_env")})
            api_key = env.get("API_KEY")
            base_url = env.get("BASE_URL")
            ai_models_str = env.get("AI_MODELS") or env.get("AI_MODEL")

            if not api_key or not base_url or not ai_models_str:
                logger.warning(
                    "API_KEY, BASE_URL, or AI_MODELS/AI_MODEL not found in environment variables. Skipping LLM analysis."
                )
                return

            ai_models = [model.strip() for model in ai_models_str.split(",")]

            for ai_model in ai_models:
                logger.info(f"Generating AI analysis for model: {ai_model}")
                sanitized_model_name = "".join(
                    c for c in ai_model if c.isalnum() or c in ("-", "_")
                ).rstrip()
                model_report_filename = (
                    f"analysis_report_baseline_condition_{sanitized_model_name}.md"
                )
                model_report_path = os.path.join(report_dir, model_report_filename)

                with open(model_report_path, "w", encoding="utf-8") as file_obj:
                    file_obj.write(base_report_content)

                llm_analysis = _call_openai_for_postprocess_analysis(
                    api_key=api_key,
                    base_url=base_url,
                    ai_model=ai_model,
                    report_content=base_report_content,
                    **kwargs,
                )

                if llm_analysis:
                    with open(model_report_path, "a", encoding="utf-8") as file_obj:
                        file_obj.write(
                            f"\n\n---\n\n# AIๆจกๅž‹ๅˆ†ๆžๆ็คบ่ฏ ({ai_model})\n\n"
                        )
                        file_obj.write("```markdown\n")
                        file_obj.write(llm_analysis)
                        file_obj.write("\n```\n")
                    logger.info(
                        f"Appended LLM analysis for model {ai_model} to {model_report_path}"
                    )

                    academic_kwargs = kwargs.copy()
                    academic_kwargs["report_filename"] = model_report_filename
                    generate_academic_report(
                        str(report_dir), ai_model=ai_model, **academic_kwargs
                    )
    except Exception as e:
        logger.error(f"Failed to run HDF5 baseline analysis: {e}", exc_info=True)

generate_academic_report(output_dir, ai_model, **kwargs)

Generates a professional academic analysis summary by sending the existing report and a glossary of terms to an LLM.

Source code in tricys/postprocess/baseline_analysis.py
def generate_academic_report(output_dir: str, ai_model: str, **kwargs) -> None:
    """
    Generates a professional academic analysis summary by sending the existing report
    and a glossary of terms to an LLM.
    """
    try:
        logger.info(
            f"Starting generation of the academic analysis summary for model {ai_model}."
        )

        # 1. Read the existing report
        report_filename = kwargs.get(
            "report_filename", "baseline_condition_analysis_report.md"
        )
        report_path = os.path.join(output_dir, report_filename)
        if not os.path.exists(report_path):
            logger.error(
                f"Cannot generate academic summary: Original report '{report_path}' not found."
            )
            return
        with open(report_path, "r", encoding="utf-8") as f:
            original_report_content = f.read()

        # 2. Read the glossary
        glossary_path = kwargs.get("glossary_path", "sheets.csv")
        if not os.path.exists(glossary_path):
            logger.error(
                f"Cannot generate academic summary: Glossary file '{glossary_path}' not found."
            )
            return
        with open(glossary_path, "r", encoding="utf-8") as f:
            glossary_content = f.read()

        # 3. Check for API credentials
        env = get_llm_env({"llm_env": kwargs.get("llm_env")})
        api_key = env.get("API_KEY")
        base_url = env.get("BASE_URL")

        if not all([api_key, base_url, ai_model]):
            logger.warning(
                "API_KEY, BASE_URL, or AI_MODEL not found. Skipping academic summary generation."
            )
            return

        # 4. Construct the prompt
        role_prompt = """**่ง’่‰ฒ๏ผš** ๆ‚จๆ˜ฏไธ€ไฝๅœจๆ ธ่šๅ˜ๅทฅ็จ‹๏ผŒ็‰นๅˆซๆ˜ฏๆฐš็‡ƒๆ–™ๅพช็Žฏ้ข†ๅŸŸ๏ผŒๅ…ทๆœ‰ๆทฑๅŽšๅญฆๆœฏ่ƒŒๆ™ฏ็š„่ต„ๆทฑ็ง‘ๅญฆๅฎถใ€‚

**ไปปๅŠก๏ผš** ๆ‚จๆ”ถๅˆฐไบ†็”ฑ็จ‹ๅบ่‡ชๅŠจ็”Ÿๆˆ็š„ๅˆๆญฅๅˆ†ๆžๆŠฅๅ‘Šๅ’Œไธ€ไปฝไธ“ไธšๆœฏ่ฏญ่กจใ€‚่ฏทๆ‚จๅŸบไบŽ่ฟ™ไธคไปฝๆ–‡ไปถ๏ผŒๆ’ฐๅ†™ไธ€ไปฝๆ›ดๅŠ ไธ“ไธšใ€ๆญฃๅผใ€็ฌฆๅˆๅญฆๆœฏๅ‘่กจๆ ‡ๅ‡†็š„ๆทฑๅบฆๅˆ†ๆžๆ€ป็ป“ๆŠฅๅ‘Šใ€‚
"""

        # Find all plots to instruct the LLM to include them
        all_plots = [f for f in os.listdir(output_dir) if f.endswith((".svg", ".png"))]
        plot_list_str = "\n".join([f"    *   `{plot}`" for plot in all_plots])
        instructions_prompt = f"""**ๆŒ‡ไปค๏ผš**

1.  **ไธ“ไธšๅŒ–่ฏญ่จ€๏ผš** ๅฐ†ๅˆๆญฅๆŠฅๅ‘Šไธญ็š„ๆจกๅž‹ๅ‚ๆ•ฐ/็ผฉๅ†™๏ผˆไพ‹ๅฆ‚ `sds.I[1]`, `detailed_var`๏ผ‰ๆ›ฟๆขไธบๆœฏ่ฏญ่กจไธญๅฏนๅบ”็š„โ€œไธญๆ–‡็ฟป่ฏ‘โ€ๆˆ–โ€œ่‹ฑๆ–‡ๆœฏ่ฏญโ€ใ€‚ไพ‹ๅฆ‚๏ผŒๅบ”ๅฐ†โ€œ`sds`็š„ๅบ“ๅญ˜โ€่กจ่ฟฐไธบโ€œๅ‚จๅญ˜ไธŽ่พ“้€็ณป็ปŸ (SDS) ็š„ๆฐšๅบ“ๅญ˜้‡ (Tritium Inventory)โ€ใ€‚
2.  **ๅญฆๆœฏๅŒ–้‡่ฟฐ๏ผš** ็”จไธฅ่ฐจใ€ๅฎข่ง‚็š„ๅญฆๆœฏ่ฏญ่จ€้‡ๆ–ฐ็ป„็ป‡ๅ’Œ้˜่ฟฐๅˆๆญฅๆŠฅๅ‘Šไธญ็š„ๅ‘็Žฐใ€‚้ฟๅ…ไฝฟ็”จโ€œ็œ‹่ตทๆฅโ€ใ€โ€œๅฅฝๅƒโ€็ญ‰ๆจก็ณŠ่ฏๆฑ‡ใ€‚
3.  **ๅ›พ่กจๅ’Œ่กจๆ ผ็š„ๅ‘ˆ็ŽฐไธŽๅผ•็”จ๏ผš**
    *   **ๆ˜พ็คบๅ›พ่กจ๏ผš** ๅœจๆŠฅๅ‘Š็š„โ€œ็ป“ๆžœไธŽ่ฎจ่ฎบโ€้ƒจๅˆ†๏ผŒๆ‚จ**ๅฟ…้กป**ไฝฟ็”จMarkdown่ฏญๆณ• `![ๅ›พ่กจๆ ‡้ข˜](ๅ›พ่กจๆ–‡ไปถๅ)` ๆฅ**็›ดๆŽฅๅตŒๅ…ฅ**ๅ’Œๆ˜พ็คบๅˆๆญฅๆŠฅๅ‘ŠไธญๅŒ…ๅซ็š„ๆ‰€ๆœ‰ๅ›พ่กจใ€‚ๅฏ็”จ็š„ๅ›พ่กจๆ–‡ไปถๅฆ‚ไธ‹๏ผš
{plot_list_str}
    *   **ๅผ•็”จๅ›พ่กจ๏ผš** ๅœจๆญฃๆ–‡ไธญๅˆ†ๆžๅ’Œ่ฎจ่ฎบๅ›พ่กจๅ†…ๅฎนๆ—ถ๏ผŒ่ฏทไฝฟ็”จโ€œๅฆ‚ๅ›พ1ๆ‰€็คบ...โ€็ญ‰ๆ–นๅผๅฏนๅ›พ่กจ่ฟ›่กŒ็ผ–ๅทๅ’Œๆ–‡ๅญ—ๅผ•็”จใ€‚
    *   **ๆ˜พ็คบ่กจๆ ผ๏ผš** ๅฝ“ๅ‘ˆ็Žฐๆ•ฐๆฎๆ—ถ๏ผˆไพ‹ๅฆ‚๏ผŒๅ…ณ้”ฎ้˜ถๆฎต็š„ๆŠฝๆ ทๆ•ฐๆฎๆˆ–ๆœ€็ปˆๅ€ผ๏ผ‰๏ผŒๆ‚จ**ๅฟ…้กป**ไฝฟ็”จMarkdown็š„็ฎก้“่กจๆ ผ๏ผˆpipe-table๏ผ‰ๆ ผๅผๆฅๆธ…ๆ™ฐๅœฐๅฑ•็คบๅฎƒไปฌใ€‚ๆ‚จๅฏไปฅ็›ดๆŽฅๅค็”จๆˆ–้‡ๆ–ฐๆ ผๅผๅŒ–ๅˆๆญฅๆŠฅๅ‘Šไธญ็š„ๆ•ฐๆฎ่กจๆ ผใ€‚
4.  **็ป“ๆž„ๅŒ–ๆŠฅๅ‘Š๏ผš** ๆ‚จ็š„ๆŠฅๅ‘Šๆ˜ฏๅ…ณไบŽไธ€ไธช**ๅŸบๅ‡†ๅทฅๅ†ต๏ผˆBaseline Operating Condition๏ผ‰**็š„ๆจกๆ‹Ÿๅˆ†ๆžใ€‚ๆŠฅๅ‘Šๅบ”ๅŒ…ๅซไปฅไธ‹้ƒจๅˆ†๏ผš
    *   **ๆ‘˜่ฆ (Abstract):** ็ฎ€่ฆๆฆ‚ๆ‹ฌๆœฌๆฌก**ๅŸบๅ‡†ๅทฅๅ†ต**ๆจกๆ‹Ÿ็š„็›ฎ็š„ใ€ๅ…ณ้”ฎๅ‘็Žฐๅ’Œๆ ธๅฟƒ็ป“่ฎบใ€‚
    *   **ๅผ•่จ€ (Introduction):** ๆ่ฟฐ**ๅŸบๅ‡†ๅทฅๅ†ต**ๆจกๆ‹Ÿ็š„่ƒŒๆ™ฏๅ’Œ็›ฎๆ ‡๏ผŒๆๅŠๅ…ณ้”ฎ็š„่พ“ๅ…ฅๅ‚ๆ•ฐใ€‚
    *   **็ป“ๆžœไธŽ่ฎจ่ฎบ (Results and Discussion):** ่ฟ™ๆ˜ฏๆŠฅๅ‘Š็š„ๆ ธๅฟƒใ€‚ๅˆ†็‚น่ฏฆ็ป†่ฎบ่ฟฐ๏ผš
        *   ๅ…ณ้”ฎๆ€ง่ƒฝๆŒ‡ๆ ‡๏ผˆๅฆ‚ๆฐš่‡ชๆŒๆ—ถ้—ดใ€ๅ€ๅขžๆ—ถ้—ด็ญ‰๏ผŒๅฆ‚ๆžœๆ•ฐๆฎๅฏ็”จ๏ผ‰็š„ๆ€ปไฝ“่ถ‹ๅŠฟใ€‚
        *   ๅฏนๅ…ณ้”ฎ่ฝฌๆŠ˜็‚น๏ผˆไพ‹ๅฆ‚ๆฐšๅบ“ๅญ˜็š„ๆœ€ไฝŽ็‚น๏ผ‰็š„็‰ฉ็†ๆ„ไน‰่ฟ›่กŒๆทฑๅ…ฅๅˆ†ๆžใ€‚
        *   ่ฏ„ไผฐ็ณป็ปŸๅœจๆจกๆ‹Ÿ็ป“ๆŸๆ—ถ็š„ๆœ€็ปˆ็Šถๆ€๏ผŒๅนถ่ฎจ่ฎบๆฐšๅœจๅ„ๅญ็ณป็ปŸไธญ็š„ๅˆ†ๅธƒๆƒ…ๅ†ตใ€‚
    *   **็ป“่ฎบ (Conclusion):** ๆ€ป็ป“ๆœฌๆฌกๆจกๆ‹Ÿ็ ”็ฉถๅพ—ๅ‡บ็š„ไธป่ฆๅญฆๆœฏ็ป“่ฎบใ€‚
5.  **่พ“ๅ‡บๆ ผๅผ๏ผš** ่ฏท็›ดๆŽฅ่พ“ๅ‡บๅฎŒๆ•ด็š„ๅญฆๆœฏๅˆ†ๆžๆŠฅๅ‘Šๆญฃๆ–‡๏ผŒ็กฎไฟๆ‰€ๆœ‰ๅ†…ๅฎน๏ผˆๅŒ…ๆ‹ฌๅ›พ่กจๅ’Œ่กจๆ ผ๏ผ‰้ƒฝ้ตๅพชๆญฃ็กฎ็š„Markdown่ฏญๆณ•ใ€‚

**่พ“ๅ…ฅๆ–‡ไปถ๏ผš**
"""

        analysis_prompt = f"""
---
### 1. ๅˆๆญฅๅˆ†ๆžๆŠฅๅ‘Š (`baseline_condition_analysis_report.md`)
---
{original_report_content}

---
### 2. ไธ“ไธšๆœฏ่ฏญ่กจ (`sheets.csv`)
---
{glossary_content}
"""

        # 5. Call the API
        max_retries = 3
        for attempt in range(max_retries):
            try:
                client = openai.OpenAI(api_key=api_key, base_url=base_url)
                logger.info(
                    f"Sending request to OpenAI API for academic summary for model {ai_model} (Attempt {attempt + 1}/{max_retries})..."
                )

                full_text_prompt = "\n\n".join(
                    [role_prompt, instructions_prompt, analysis_prompt]
                )

                response = client.chat.completions.create(
                    model=ai_model,
                    messages=[{"role": "user", "content": full_text_prompt}],
                    max_tokens=4000,
                )
                academic_summary = response.choices[0].message.content

                # 6. Save the result
                sanitized_model_name = "".join(
                    c for c in ai_model if c.isalnum() or c in ("-", "_")
                ).rstrip()
                summary_filename = (
                    f"academic_analysis_summary_{sanitized_model_name}.md"
                )
                summary_path = os.path.join(output_dir, summary_filename)
                with open(summary_path, "w", encoding="utf-8") as f:
                    f.write(academic_summary)

                logger.info(
                    f"Successfully generated academic analysis summary: {summary_path}"
                )
                return  # Exit after success

            except Exception as e:
                logger.error(
                    f"Error calling OpenAI API for academic summary on attempt {attempt + 1}: {e}"
                )
                if attempt < max_retries - 1:
                    time.sleep(5)
                else:
                    logger.error(
                        f"Failed to generate academic summary for {ai_model} after {max_retries} attempts."
                    )
                    return  # Exit after all retries failed

    except Exception as e:
        logger.error(
            f"Error in generate_academic_report for model {ai_model}: {e}",
            exc_info=True,
        )
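
Although generate_academic_report is normally invoked from baseline_analysis, it can also be called directly. An illustrative sketch (the report directory and model name are assumptions; the file names match the defaults in the source above):

from tricys.postprocess.baseline_analysis import generate_academic_report

# Assumes the baseline report and glossary already exist, and that API_KEY
# and BASE_URL are available via the configured LLM environment.
generate_academic_report(
    "results/report",  # hypothetical report directory
    ai_model="gpt-4o",  # hypothetical model name
    report_filename="baseline_condition_analysis_report.md",
    glossary_path="sheets.csv",
)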

load_glossary(glossary_path)

Loads glossary data from the specified CSV path into global dictionaries.

Parameters:

    glossary_path (str): Path to the glossary CSV file. [required]
Note

Expected columns: "ๆจกๅž‹ๅ‚ๆ•ฐ (Model Parameter)", "่‹ฑๆ–‡ๆœฏ่ฏญ (English Term)", "ไธญๆ–‡็ฟป่ฏ‘ (Chinese Translation)". Clears existing glossaries on error. Updates global _english_glossary_map and _chinese_glossary_map.

Source code in tricys/postprocess/baseline_analysis.py
def load_glossary(glossary_path: str) -> None:
    """Loads glossary data from the specified CSV path into global dictionaries.

    Args:
        glossary_path: Path to the glossary CSV file.

    Note:
        Expected columns: "ๆจกๅž‹ๅ‚ๆ•ฐ (Model Parameter)", "่‹ฑๆ–‡ๆœฏ่ฏญ (English Term)",
        "ไธญๆ–‡็ฟป่ฏ‘ (Chinese Translation)". Clears existing glossaries on error.
        Updates global _english_glossary_map and _chinese_glossary_map.
    """
    global _english_glossary_map, _chinese_glossary_map

    if not glossary_path or not os.path.exists(glossary_path):
        logger.warning(
            f"Glossary file not found at {glossary_path}. No labels will be loaded."
        )
        _english_glossary_map = {}
        _chinese_glossary_map = {}
        return

    try:
        df = pd.read_csv(glossary_path)
        if (
            "ๆจกๅž‹ๅ‚ๆ•ฐ (Model Parameter)" in df.columns
            and "่‹ฑๆ–‡ๆœฏ่ฏญ (English Term)" in df.columns
            and "ไธญๆ–‡็ฟป่ฏ‘ (Chinese Translation)" in df.columns
        ):
            df.dropna(subset=["ๆจกๅž‹ๅ‚ๆ•ฐ (Model Parameter)"], inplace=True)
            _english_glossary_map = pd.Series(
                df["่‹ฑๆ–‡ๆœฏ่ฏญ (English Term)"].values,
                index=df["ๆจกๅž‹ๅ‚ๆ•ฐ (Model Parameter)"],
            ).to_dict()
            _chinese_glossary_map = pd.Series(
                df["ไธญๆ–‡็ฟป่ฏ‘ (Chinese Translation)"].values,
                index=df["ๆจกๅž‹ๅ‚ๆ•ฐ (Model Parameter)"],
            ).to_dict()
            logger.info(f"Successfully loaded glossary from {glossary_path}.")
        else:
            logger.warning("Glossary CSV does not contain expected columns.")
            _english_glossary_map = {}
            _chinese_glossary_map = {}
    except Exception as e:
        logger.warning(f"Failed to load or parse glossary file. Error: {e}")
        _english_glossary_map = {}
        _chinese_glossary_map = {}
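
A glossary file with the expected header row can be produced as follows; the data row is a hypothetical example consistent with the parameter names used elsewhere on this page:

import csv

from tricys.postprocess.baseline_analysis import load_glossary

rows = [
    ["ๆจกๅž‹ๅ‚ๆ•ฐ (Model Parameter)", "่‹ฑๆ–‡ๆœฏ่ฏญ (English Term)", "ไธญๆ–‡็ฟป่ฏ‘ (Chinese Translation)"],
    ["sds.I[1]", "SDS Tritium Inventory", "ๅ‚จๅญ˜ไธŽ่พ“้€็ณป็ปŸๆฐšๅบ“ๅญ˜้‡"],  # hypothetical entry
]
with open("sheets.csv", "w", encoding="utf-8", newline="") as f:
    csv.writer(f).writerows(rows)

load_glossary("sheets.csv")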

set_plot_language(lang='en')

Sets the preferred language for plot labels.

Parameters:

    lang (str): 'en' for English (default), 'cn' for Chinese. [default: 'en']
Note

For Chinese, sets font to SimHei and adjusts unicode_minus. For English, restores matplotlib defaults. Changes apply globally to all subsequent plots.

Source code in tricys/postprocess/baseline_analysis.py
def set_plot_language(lang: str = "en") -> None:
    """Sets the preferred language for plot labels.

    Args:
        lang: 'en' for English (default), 'cn' for Chinese.

    Note:
        For Chinese, sets font to SimHei and adjusts unicode_minus. For English,
        restores matplotlib defaults. Changes apply globally to all subsequent plots.
    """
    global _use_chinese_labels
    _use_chinese_labels = lang.lower() == "cn"

    if _use_chinese_labels:
        # To display Chinese characters correctly, specify a list of fallback fonts.
        plt.rcParams["font.sans-serif"] = ["SimHei"]  # ๆ›ฟๆขๆˆไฝ ็”ต่„‘ไธŠๆœ‰็š„ๅญ—ไฝ“
        plt.rcParams["axes.unicode_minus"] = False  # To display minus sign correctly.
        plt.rcParams["font.family"] = "sans-serif"  # ็กฎไฟๅญ—ไฝ“ๅฎถๆ—่ฎพ็ฝฎ็”Ÿๆ•ˆ
    else:
        # Restore default settings
        plt.rcParams["font.sans-serif"] = plt.rcParamsDefault["font.sans-serif"]
        plt.rcParams["axes.unicode_minus"] = plt.rcParamsDefault["axes.unicode_minus"]

analyze_rise_dip(results_file_path, output_dir, **kwargs)

Analyzes HDF5 simulation results to identify curves that fail to exhibit the 'dip and rise' feature.

Parameters:

    results_file_path (str): Path to the HDF5 file containing 'results' and 'jobs' tables. [required]
    output_dir (str): The directory to save the analysis report. [required]
    **kwargs: Additional parameters. [default: {}]
Source code in tricys/postprocess/rise_analysis.py
def analyze_rise_dip(results_file_path: str, output_dir: str, **kwargs) -> None:
    """Analyzes HDF5 simulation results to identify curves that fail to exhibit 'dip and rise' feature.

    Args:
        results_file_path: Path to the HDF5 file containing 'results' and 'jobs' tables.
        output_dir: The directory to save the analysis report.
        **kwargs: Additional parameters.
    """
    logger.info("Starting HDF5 post-processing: Analyzing curve rise/dip features...")
    all_curves_info = []
    error_count = 0

    if not os.path.exists(results_file_path):
        logger.error(f"Results file not found: {results_file_path}")
        return

    try:
        with pd.HDFStore(results_file_path, mode="r") as store:
            if f"/{RESULTS_KEY}" not in store.keys():
                logger.error("HDF5 file missing 'results' table.")
                return

            try:
                jobs_key = get_jobs_key(store)
            except KeyError:
                logger.error("HDF5 file missing 'jobs' table.")
                return

            jobs_df = store.select(jobs_key)
            jobs_map = jobs_df.set_index("job_id").to_dict(orient="index")
            job_ids = sorted(jobs_map.keys())

            def check_curve(series, job_params, var_name):
                rises = False
                if len(series) > 2:
                    window_size = max(1, int(len(series) * 0.001))
                    smoothed = series.rolling(
                        window=window_size, center=True, min_periods=1
                    ).mean()

                    min_pos_index = smoothed.idxmin()
                    min_val = smoothed.loc[min_pos_index]
                    is_min_at_boundary = (min_pos_index == smoothed.index[0]) or (
                        min_pos_index == smoothed.index[-1]
                    )

                    if not is_min_at_boundary:
                        series_range = smoothed.max() - smoothed.min()
                        tolerance = series_range * 0.001 if series_range > 1e-9 else 0
                        start_val = smoothed.iloc[0]
                        end_val = smoothed.iloc[-1]

                        if (
                            start_val > min_val + tolerance
                            and end_val > min_val + tolerance
                        ):
                            rises = True

                info = job_params.copy()
                info["variable"] = var_name
                info["rises"] = rises
                return info, rises

            batch_size = 100
            total_jobs = len(job_ids)

            for i in range(0, total_jobs, batch_size):
                batch_ids = job_ids[i : i + batch_size]
                min_id = min(batch_ids)
                max_id = max(batch_ids)

                try:
                    res_batch = store.select(
                        RESULTS_KEY,
                        where=f"job_id >= {min_id} & job_id <= {max_id}",
                    )
                except Exception as e:
                    logger.warning(f"Failed to load batch {min_id}-{max_id}: {e}")
                    continue

                grouped = res_batch.groupby("job_id")

                for j_id, group in grouped:
                    if j_id not in jobs_map:
                        continue

                    params = jobs_map[j_id]

                    for col in group.columns:
                        if col in ["time", "job_id"]:
                            continue

                        info, rises = check_curve(
                            group[col].reset_index(drop=True), params, col
                        )
                        all_curves_info.append(info)

                        if not rises:
                            error_count += 1
                            logger.error(
                                f"Feature not detected for Job {j_id}, Var '{col}' (Params: {params})"
                            )

    except Exception as e:
        logger.error(f"HDF5 processing failed: {e}", exc_info=True)

    # Generate a report file with all information unconditionally
    output_filename = kwargs.get("output_filename", "rise_report.json")
    report_path = os.path.join(output_dir, output_filename)

    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(all_curves_info, f, indent=4, ensure_ascii=False)

    if error_count > 0:
        logger.info(f"{error_count} curves failed checks. Report: {report_path}")
    else:
        logger.info(f"All curves passed. Report: {report_path}")

check_thresholds(results_file_path, output_dir, rules, **kwargs)

Analyzes HDF5 simulation results to check if specified columns fall within threshold ranges.

Parameters:

    results_file_path (str): Path to the HDF5 results file. [required]
    output_dir (str): Directory for saving alert reports. [required]
    rules (List[Dict[str, Any]]): List of rules; each rule is a dict with optional "min" and "max" bounds and a "columns" list of variables to check. [required]
    **kwargs: Additional parameters. [default: {}]
Source code in tricys/postprocess/static_alarm.py
def check_thresholds(
    results_file_path: str, output_dir: str, rules: List[Dict[str, Any]], **kwargs
) -> None:
    """Analyzes HDF5 simulation results to check if specified columns fall within threshold ranges.

    Args:
        results_file_path: Path to HDF5 results.
        output_dir: Directory for saving alert reports.
        rules: List of rules; each rule is a dict with optional "min"/"max" bounds and a "columns" list.
        **kwargs: Additional parameters.
    """
    logger.info("Starting HDF5 post-processing: Checking thresholds...")

    final_report = []
    total_alarms = 0
    report_only_alarms = kwargs.get("report_only_alarms", False)

    if not os.path.exists(results_file_path):
        logger.error(f"Results file not found: {results_file_path}")
        return

    try:
        with pd.HDFStore(results_file_path, mode="r") as store:
            if f"/{RESULTS_KEY}" not in store.keys():
                return

            try:
                jobs_key = get_jobs_key(store)
            except KeyError:
                return

            jobs_df = store.select(jobs_key)
            jobs_map = jobs_df.set_index("job_id").to_dict(orient="index")
            available_vars = store.get_storer(RESULTS_KEY).table.colnames

            for rule in rules:
                min_val = rule.get("min")
                max_val = rule.get("max")
                columns_to_check = rule.get("columns", [])

                for col in columns_to_check:
                    if col not in available_vars:
                        continue

                    alarm_job_ids = set()

                    if max_val is not None:
                        try:
                            res = store.select(
                                RESULTS_KEY,
                                where=f"{col} > {max_val}",
                                columns=["job_id", col],
                            )
                            if not res.empty:
                                ids = res["job_id"].unique()
                                alarm_job_ids.update(ids)
                                for j_id in ids:
                                    peak = res[res["job_id"] == j_id][col].max()
                                    logger.error(
                                        f"ALARM: Job {j_id}, Var '{col}' > {max_val} (Peak: {peak})"
                                    )
                        except Exception as e:
                            logger.error(f"Query failed for {col} > {max_val}: {e}")

                    if min_val is not None:
                        try:
                            res = store.select(
                                RESULTS_KEY,
                                where=f"{col} < {min_val}",
                                columns=["job_id", col],
                            )
                            if not res.empty:
                                ids = res["job_id"].unique()
                                alarm_job_ids.update(ids)
                                for j_id in ids:
                                    dip = res[res["job_id"] == j_id][col].min()
                                    logger.error(
                                        f"ALARM: Job {j_id}, Var '{col}' < {min_val} (Dip: {dip})"
                                    )
                        except Exception as e:
                            logger.error(f"Query failed for {col} < {min_val}: {e}")

                    target_job_ids = (
                        alarm_job_ids if report_only_alarms else jobs_map.keys()
                    )

                    for j_id in target_job_ids:
                        if j_id in jobs_map:
                            has_alarm = j_id in alarm_job_ids
                            item = jobs_map[j_id].copy()
                            item["variable"] = col
                            item["has_alarm"] = has_alarm
                            item["job_id"] = int(j_id)
                            final_report.append(item)
                            if has_alarm:
                                total_alarms += 1

    except Exception as e:
        logger.error(f"HDF5 threshold check failed: {e}", exc_info=True)

    output_filename = kwargs.get("output_filename", "alarm_report.json")
    report_path = os.path.join(output_dir, output_filename)
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(final_report, f, indent=4, ensure_ascii=False)

    if total_alarms > 0:
        logger.info(f"Found {total_alarms} alarms. Report: {report_path}")
    else:
        logger.info(f"No alarms found. Report: {report_path}")