Source code for impedance_agent.cli.main

# impedance_agent/cli/main.py
# Standard library imports
import asyncio
import asyncio.exceptions
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional

# Third-party imports
import typer

# Matplotlib imports (with config)
import matplotlib

matplotlib.use("Agg")  # Use non-interactive backend
import matplotlib.pyplot as plt

# Local imports
from ..agent.analysis import ImpedanceAnalysisAgent
from ..core.config import Config
from ..core.env import env
from ..core.exceptions import ExportError
from ..core.exporters import ResultExporter
from ..core.loaders import ImpedanceLoader
from ..core.logging import setup_logging
from ..core.plotting import PlotManager


app = typer.Typer()


[docs] async def run_async_tasks( result, output_path, output_format, plot, plot_dir, plot_format, show_plots ): """Run export and plotting tasks concurrently""" try: # Set up and execute tasks if output_path: # Execute both export tasks concurrently await asyncio.gather( ResultExporter.export_async(result, output_path, output_format), ResultExporter.export_async( result, Path(output_path).parent / "analysis_summary.md", "md" ), ) if plot and plot_dir: try: await asyncio.to_thread( PlotManager.create_plots, result=result, output_dir=plot_dir, file_format=plot_format, show=show_plots, dpi=300, ) except ExportError as e: logging.error(f"Plot export failed: {e}") except Exception as e: logging.error(f"Unexpected error during plotting: {e}") except ExportError as e: logging.error(f"Export failed: {e}") except Exception as e: logging.error(f"Error in async tasks: {str(e)}")
[docs] @app.command() def analyze( data_path: str = typer.Argument(..., help="Path to impedance data file"), provider: str = typer.Option("deepseek", help="LLM provider (deepseek/openai)"), ecm: Optional[str] = typer.Option( None, help="Path to the equivalent circuit model(ECM) configuration file" ), output_path: Optional[str] = typer.Option(None, help="Path for output files"), output_format: str = typer.Option("json", help="Output format (json/csv/excel)"), plot_format: str = typer.Option("png", help="Plot format (png/pdf/svg)"), plot: bool = typer.Option(True, help="Generate plots"), show_plots: bool = typer.Option(False, help="Display plots in window"), log_level: str = typer.Option(env.log_level, help="Logging level"), debug: bool = typer.Option(False, help="Enable debug mode"), workers: int = typer.Option(None, help="Number of worker processes"), ): """Analyze impedance data using ECM and/or DRT analysis with parallel processing""" loop = None try: # Setup logging level = "DEBUG" if debug else log_level setup_logging(level) logger = logging.getLogger(__name__) # Validate provider available_providers = env.get_available_providers() if not available_providers: logger.error( "No LLM providers are properly configured. Please check your .env file." ) raise typer.Exit(1) if provider not in available_providers: logger.error( f"Provider '{provider}' not available. Available: {', '.join(available_providers)}" ) raise typer.Exit(1) # Early validation of plot settings if plot and not output_path: logger.warning( "Plot generation requires an output path. Creating default 'results' directory." ) output_path = "results/analysis.json" # Create output directories if needed plot_dir = None if output_path: output_dir = Path(output_path).parent output_dir.mkdir(parents=True, exist_ok=True) if plot: plot_dir = output_dir / "plots" plot_dir.mkdir(parents=True, exist_ok=True) logger.info(f"Plot files will be saved to: {plot_dir}") if debug: logger.debug(f"Loading file: {data_path}") logger.debug(f"Using provider: {provider}") if ecm: logger.debug(f"Using equivalent circuit model config: {ecm}") # Determine optimal number of workers if not specified if workers is None: workers = min(os.cpu_count(), 4) # Use up to 4 cores by default # Load data and config concurrently async def load_data_and_config(): with ThreadPoolExecutor(max_workers=2) as pool: loop = asyncio.get_running_loop() data_future = loop.run_in_executor( pool, ImpedanceLoader.load, data_path ) if ecm: config_future = loop.run_in_executor(pool, Config.load_model, ecm) data, ecm_cfg = await asyncio.gather(data_future, config_future) else: data = await data_future ecm_cfg = None return data, ecm_cfg # Set up event loop with policy if os.name == "posix": policy = asyncio.get_event_loop_policy() policy.set_event_loop(policy.new_event_loop()) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Load data data, ecm_cfg = loop.run_until_complete(load_data_and_config()) # Run analysis with specified provider logger.info(f"Starting impedance analysis using {provider}") agent = ImpedanceAnalysisAgent(provider=provider) result = agent.analyze(data, ecm_cfg) # Run export and plotting concurrently if output_path or (plot and plot_dir): loop.run_until_complete( run_async_tasks( result, output_path, output_format, plot, plot_dir, plot_format, show_plots, ) ) # Handle plot display if requested if show_plots and not matplotlib.is_interactive(): plt.ion() plt.show(block=False) plt.pause(0.1) # Give time for display # Print summary to console typer.echo("\nAnalysis Summary:") typer.echo(result.summary) except asyncio.CancelledError: logger.error("Async operation was cancelled") raise typer.Exit(1) except Exception as e: logger.error(f"Analysis failed: {str(e)}", exc_info=True) raise typer.Exit(1) finally: # Cleanup if loop and loop.is_running(): try: pending = asyncio.all_tasks(loop) for task in pending: task.cancel() loop.run_until_complete( asyncio.gather(*pending, return_exceptions=True) ) except Exception as e: logger.error(f"Error during task cleanup: {str(e)}") # Close plots and loop plt.close("all") if loop and not loop.is_closed(): loop.close()
[docs] @app.command() def list_providers(): """List available LLM providers""" providers = env.get_available_providers() if providers: typer.echo("Available LLM providers:") for provider in providers: config = env.get_provider_config(provider) typer.echo(f" - {provider} (model: {config.model})") else: typer.echo("No LLM providers are configured. Please check your .env file.")
[docs] @app.command() def version(): """Show the version of the impedance agent""" typer.echo("Impedance Agent v0.1.0")
[docs] def main(): """Main entry point for the CLI""" app()
if __name__ == "__main__": main()