コンテンツにスキップ

AIエージェント本番運用完全ガイド - 2025年8月エンタープライズ対応実践アーキテクチャ

はじめに

AIエージェント開発の理論記事、実装ハンズオンガイドに続く第3弾として、本記事では本番環境で安全かつ効率的にAIエージェントを運用するための実践的ガイドを提供します。

エンタープライズ環境での実際の運用で直面する課題と、それらを解決する実証済みアーキテクチャパターンを詳しく解説します。

この記事のポイント

  • エンタープライズ対応

    大規模組織での安全なAIエージェント運用アーキテクチャの構築

  • 自動スケーリング

    負荷に応じた動的リソース管理とコスト最適化の自動化

  • セキュリティ強化

    ゼロトラスト原則に基づくAIエージェントのセキュアな運用

  • 包括的監視

    AIエージェントの動作をメトリクス・ログ・トレースの全方位から監視・分析するObservabilityシステム

エンタープライズアーキテクチャ設計

1. 全体アーキテクチャ概要

graph TB
    %% Request path: API Gateway -> Load Balancer -> Agent Manager -> Agent Scheduler -> Agent Executors.
    %% Executors reach the data layer (MCP servers, database, file system).
    %% The Observability and Security subgraphs show which components emit telemetry
    %% and which depend on IAM, the secrets vault and network security.
    subgraph "API Gateway Layer"
        AGW[API Gateway] --> LB[Load Balancer]
    end

    subgraph "AI Agent Management Layer"
        LB --> AM[Agent Manager]
        AM --> AR[Agent Registry]
        AM --> AS[Agent Scheduler]
    end

    subgraph "Execution Layer"
        AS --> AE1[Agent Executor 1]
        AS --> AE2[Agent Executor 2]
        AS --> AE3[Agent Executor N]
    end

    subgraph "Data Layer"
        AE1 --> MCP[MCP Servers]
        AE2 --> DB[(Database)]
        AE3 --> FS[(File System)]
    end

    subgraph "Observability Layer"
        AM --> MON[Monitoring]
        AE1 --> LOG[Logging]
        AE2 --> TRC[Tracing]
        AE3 --> MET[Metrics]
    end

    subgraph "Security Layer"
        AGW --> IAM[Identity & Access]
        AM --> VLT[Secrets Vault]
        AE1 --> NET[Network Security]
    end

2. Agent Manager 実装

# infrastructure/agent_manager.py
import asyncio
import json
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import redis
import kubernetes
from prometheus_client import Counter, Histogram, Gauge

class AgentStatus(Enum):
    """Lifecycle state of a managed agent.

    The string value is what gets written to Redis when an agent is persisted.
    """

    IDLE = "idle"                # eligible for new tasks
    BUSY = "busy"                # eligible only while current_load < capacity
    FAILED = "failed"            # set after a task execution raised
    MAINTENANCE = "maintenance"  # not assigned by the scheduling path shown here

@dataclass
class AgentInstance:
    """Registry record for one deployed agent."""

    # Identity
    id: str                     # UUID string, also used in pod/service names
    type: str                   # agent type; selects the container image
    # Scheduling state
    status: AgentStatus
    capacity: int               # maximum concurrent tasks
    current_load: int           # tasks currently in flight
    # Timestamps (naive local time from datetime.now())
    created_at: datetime
    last_heartbeat: datetime
    # Free-form, JSON-serialisable extras (e.g. {"auto_scaled": True})
    metadata: Dict[str, Any]

class EnterpriseAgentManager:
    """Enterprise-grade manager for a fleet of AI agents.

    Responsibilities:

    * keep an in-memory registry of ``AgentInstance`` objects, mirrored as JSON
      into the Redis hash ``agents``;
    * deploy one Kubernetes Pod plus a same-named Service per agent, so the
      agent is reachable at ``http://agent-<id>.<namespace>.svc:8080``
      (bare pod names are not resolvable through cluster DNS);
    * route tasks to the least-loaded eligible agent (``assign_task``) or
      through the internal queue (``enqueue_task``);
    * run background loops for heartbeat health checks, queued-task dispatch,
      auto-scaling and Prometheus metrics (``start``).

    Config keys:
        redis_url (required): Redis connection URL.
        manager_url (required): URL injected into agent pods as MANAGER_URL.
        kubernetes_namespace: namespace for agent pods/services
            (default ``"ai-agents"``).
        heartbeat_timeout: seconds without a heartbeat before an agent is
            removed (default 60).
        max_agents_per_type: optional cap for auto-scaling (default: no cap).

    The Redis and Kubernetes clients are synchronous; their calls are run in
    the default executor so they do not block the event loop.
    NOTE(review): no kube config is loaded here; load one (e.g.
    ``kubernetes.config.load_incluster_config()``) before constructing this
    class — confirm against your deployment.
    """

    DEFAULT_NAMESPACE = "ai-agents"
    DEFAULT_CAPACITY = 10
    AGENT_PORT = 8080

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.redis_client = redis.Redis.from_url(config['redis_url'])
        self.k8s_client = kubernetes.client.ApiClient()
        self.namespace = config.get('kubernetes_namespace', self.DEFAULT_NAMESPACE)
        self.heartbeat_timeout = config.get('heartbeat_timeout', 60)
        self.max_agents_per_type = config.get('max_agents_per_type')

        # Prometheus metrics.
        # ai_agents_total is a Counter: incremented exactly once per registration.
        self.agent_counter = Counter('ai_agents_total', 'Total AI agents', ['type', 'status'])
        self.task_duration = Histogram('ai_task_duration_seconds', 'Task execution time')
        self.agent_load = Gauge('ai_agent_load', 'Current agent load', ['agent_id'])
        # Point-in-time number of agents per (type, status), refreshed by _metrics_collector.
        self.agent_current = Gauge('ai_agents_current', 'Current AI agents', ['type', 'status'])
        self._status_series: set = set()

        # Agent registry.
        self.agents: Dict[str, AgentInstance] = {}
        # Queue of (task, agent_type, future) tuples fed by enqueue_task.
        self.task_queue = asyncio.Queue()
        # Strong references to in-flight dispatch jobs so they are not garbage-collected.
        self._dispatch_jobs: set = set()

    async def start(self):
        """Start the manager and run its background loops until cancelled."""
        print("Starting Enterprise Agent Manager...")

        tasks = [
            asyncio.create_task(self._health_monitor()),
            asyncio.create_task(self._task_scheduler()),
            asyncio.create_task(self._auto_scaler()),
            asyncio.create_task(self._metrics_collector())
        ]

        await asyncio.gather(*tasks)

    async def register_agent(self, agent_config: Dict[str, Any]) -> str:
        """Register a new agent, persist it and deploy its pod and service.

        Args:
            agent_config: ``type`` (required); optional ``capacity`` (int >= 1,
                default 10) and ``metadata`` (JSON-serialisable dict).

        Returns:
            The new agent's UUID.

        Raises:
            ValueError: if ``capacity`` is less than 1.
            Exception: whatever Redis/Kubernetes raise. The partially
                registered agent is rolled back before re-raising.
        """
        capacity = agent_config.get('capacity', self.DEFAULT_CAPACITY)
        if capacity < 1:
            # A zero capacity would divide by zero in the load calculations.
            raise ValueError(f"capacity must be >= 1, got {capacity}")

        agent_id = str(uuid.uuid4())
        now = datetime.now()
        agent = AgentInstance(
            id=agent_id,
            type=agent_config['type'],
            status=AgentStatus.IDLE,
            capacity=capacity,
            current_load=0,
            created_at=now,
            last_heartbeat=now,
            metadata=agent_config.get('metadata', {})
        )

        self.agents[agent_id] = agent

        try:
            await self._persist_agent(agent)
            await self._deploy_agent_pod(agent)
        except Exception:
            # Never leave a half-registered agent that could be scheduled.
            await self._rollback_registration(agent_id)
            raise

        self.agent_counter.labels(
            type=agent.type,
            status=agent.status.value
        ).inc()

        print(f"Agent registered: {agent_id} (type: {agent.type})")
        return agent_id

    def record_heartbeat(self, agent_id: str) -> bool:
        """Mark an agent as alive now.

        Agents must be reported alive at least every ``heartbeat_timeout``
        seconds, otherwise ``_health_monitor`` removes them.
        NOTE(review): nothing in this class exposes this over HTTP; wire it to
        whatever endpoint agents call at MANAGER_URL.

        Returns:
            False if the agent is unknown, True otherwise.
        """
        agent = self.agents.get(agent_id)
        if agent is None:
            return False
        agent.last_heartbeat = datetime.now()
        return True

    async def assign_task(
        self,
        task: Dict[str, Any],
        agent_type: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run a task on the least-loaded eligible agent.

        If no agent is eligible, one extra agent of ``agent_type`` is
        registered on demand.

        Returns:
            ``{"success": False, "error": ..., "retry_after": 30}`` when no
            agent can be obtained; otherwise ``{"success": True, "task_id",
            "agent_id", "result"}``. ``success`` reports that the task was
            *assigned* — check ``result["status"]`` for the execution outcome.
        """
        agent = await self._select_best_agent(agent_type, task)

        if not agent:
            agent = await self._auto_scale_agent(agent_type)
            if not agent:
                return {
                    "success": False,
                    "error": "No agents available",
                    "retry_after": 30
                }

        task_id = str(uuid.uuid4())
        execution_result = await self._execute_task(agent, task_id, task)

        return {
            "success": True,
            "task_id": task_id,
            "agent_id": agent.id,
            "result": execution_result
        }

    async def enqueue_task(
        self,
        task: Dict[str, Any],
        agent_type: Optional[str] = None
    ) -> "asyncio.Future":
        """Queue a task for the background scheduler started by ``start``.

        Returns:
            A future resolved with the dict ``assign_task`` returns, or with
            the exception it raised.
        """
        future = asyncio.get_running_loop().create_future()
        await self.task_queue.put((task, agent_type, future))
        return future

    async def _task_scheduler(self):
        """Consume ``task_queue`` forever, dispatching each entry concurrently."""
        while True:
            task, agent_type, future = await self.task_queue.get()
            job = asyncio.create_task(self._dispatch_queued(task, agent_type, future))
            self._dispatch_jobs.add(job)
            job.add_done_callback(self._dispatch_jobs.discard)

    async def _dispatch_queued(self, task, agent_type, future):
        """Run one queued task through ``assign_task`` and settle its future."""
        try:
            result = await self.assign_task(task, agent_type)
        except Exception as e:
            if not future.done():
                future.set_exception(e)
        else:
            if not future.done():  # the caller may have cancelled it
                future.set_result(result)
        finally:
            self.task_queue.task_done()

    async def _select_best_agent(
        self,
        agent_type: Optional[str],
        task: Dict[str, Any]
    ) -> Optional[AgentInstance]:
        """Return the eligible agent with the lowest load ratio, or None.

        Eligible means IDLE, or BUSY with spare capacity, and of
        ``agent_type`` when one is given. ``task`` is currently unused.
        """
        available_agents = [
            agent for agent in self.agents.values()
            if (agent.status == AgentStatus.IDLE or
                (agent.status == AgentStatus.BUSY and
                 agent.current_load < agent.capacity))
            and (not agent_type or agent.type == agent_type)
        ]

        if not available_agents:
            return None

        return min(
            available_agents,
            key=lambda a: a.current_load / a.capacity
        )

    async def _auto_scale_agent(self, agent_type: Optional[str]) -> Optional[AgentInstance]:
        """Register one extra agent on demand and return it, or None.

        Returns None when no ``agent_type`` was requested (nothing to scale),
        when ``max_agents_per_type`` is reached, or when registration fails.
        NOTE(review): the new pod was only just created, so the first task
        routed to it may fail until its container is ready.
        """
        if not agent_type or not self._below_type_cap(agent_type):
            return None
        try:
            agent_id = await self._scale_up(agent_type)
        except Exception as e:
            print(f"Auto-scaling {agent_type} failed: {e}")
            return None
        return self.agents.get(agent_id)

    def _below_type_cap(self, agent_type: str) -> bool:
        """True if another agent of ``agent_type`` fits under ``max_agents_per_type``."""
        if self.max_agents_per_type is None:
            return True
        count = sum(1 for a in self.agents.values() if a.type == agent_type)
        return count < self.max_agents_per_type

    @staticmethod
    def _refresh_status(agent: AgentInstance):
        """Recompute IDLE/BUSY from load; FAILED and MAINTENANCE are left untouched."""
        if agent.status in (AgentStatus.FAILED, AgentStatus.MAINTENANCE):
            return
        agent.status = AgentStatus.BUSY if agent.current_load >= agent.capacity else AgentStatus.IDLE

    async def _execute_task(
        self,
        agent: AgentInstance,
        task_id: str,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Execute one task on ``agent`` and report the outcome.

        ``current_load`` is held incremented for the duration of the call, and
        the ``ai_agent_load`` gauge mirrors it.

        Returns:
            ``{"status": "completed", "result", "duration"}`` or
            ``{"status": "failed", "error", "duration"}``; agent-side errors
            are reported, not raised.
        """
        start_time = datetime.now()

        try:
            agent.current_load += 1
            self._refresh_status(agent)
            self.agent_load.labels(agent_id=agent.id).set(agent.current_load)

            result = await self._call_agent_api(agent, task)
            # A successful call proves the agent is alive.
            agent.last_heartbeat = datetime.now()

            duration = (datetime.now() - start_time).total_seconds()
            self.task_duration.observe(duration)

            return {
                "status": "completed",
                "result": result,
                "duration": duration
            }

        except Exception as e:
            print(f"Task execution failed: {e}")
            # FAILED is sticky: the agent is no longer selected, and
            # _health_monitor removes it once its in-flight tasks drain.
            agent.status = AgentStatus.FAILED

            return {
                "status": "failed",
                "error": str(e),
                "duration": (datetime.now() - start_time).total_seconds()
            }

        finally:
            agent.current_load = max(0, agent.current_load - 1)
            # Previously this reset the agent to IDLE and erased FAILED.
            self._refresh_status(agent)
            if agent.id in self.agents:
                # Avoid re-creating the gauge series of an already removed agent.
                self.agent_load.labels(agent_id=agent.id).set(agent.current_load)

    async def _call_agent_api(
        self,
        agent: AgentInstance,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST the task to the agent's ``/execute`` endpoint (5-minute timeout).

        The host is the per-agent Service created by ``_deploy_agent_pod``.

        Raises:
            Exception: on any non-200 response; aiohttp errors propagate.
        """
        import aiohttp

        agent_url = f"http://agent-{agent.id}.{self.namespace}.svc:{self.AGENT_PORT}/execute"
        timeout = aiohttp.ClientTimeout(total=300)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(agent_url, json=task) as response:
                if response.status != 200:
                    raise Exception(f"Agent API error: {response.status}")
                return await response.json()

    async def _health_monitor(self):
        """Every 30 s, remove agents that are silent too long or failed and drained.

        An agent is removed when no heartbeat was recorded for more than
        ``heartbeat_timeout`` seconds, or when it is FAILED with no task in
        flight. After an unexpected error the loop backs off for 60 s.
        """
        while True:
            try:
                now = datetime.now()

                for agent in list(self.agents.values()):
                    silent_for = (now - agent.last_heartbeat).total_seconds()
                    if silent_for > self.heartbeat_timeout:
                        print(f"Agent {agent.id} is unhealthy, removing...")
                        await self._remove_agent(agent.id)
                    elif agent.status == AgentStatus.FAILED and agent.current_load == 0:
                        print(f"Agent {agent.id} failed, removing...")
                        await self._remove_agent(agent.id)

                await asyncio.sleep(30)

            except Exception as e:
                print(f"Health monitor error: {e}")
                await asyncio.sleep(60)

    async def _auto_scaler(self):
        """Every 60 s, scale each agent type by its average load ratio.

        Above 0.8 adds one agent (respecting ``max_agents_per_type``); below
        0.2 with more than one agent removes one idle agent. After an
        unexpected error the loop backs off for 120 s.
        """
        while True:
            try:
                by_type: Dict[str, List[AgentInstance]] = {}
                for agent in self.agents.values():
                    by_type.setdefault(agent.type, []).append(agent)

                for agent_type, agents_of_type in by_type.items():
                    avg_load = sum(
                        agent.current_load / agent.capacity
                        for agent in agents_of_type
                    ) / len(agents_of_type)

                    if avg_load > 0.8:
                        if self._below_type_cap(agent_type):
                            print(f"Scaling up {agent_type} agents (load: {avg_load:.2f})")
                            await self._scale_up(agent_type)
                    elif avg_load < 0.2 and len(agents_of_type) > 1:
                        print(f"Scaling down {agent_type} agents (load: {avg_load:.2f})")
                        await self._scale_down(agent_type)

                await asyncio.sleep(60)

            except Exception as e:
                print(f"Auto scaler error: {e}")
                await asyncio.sleep(120)

    async def _scale_up(self, agent_type: str) -> str:
        """Register one auto-scaled agent of ``agent_type`` and return its id."""
        agent_config = {
            "type": agent_type,
            "capacity": self.DEFAULT_CAPACITY,
            "metadata": {"auto_scaled": True}
        }

        return await self.register_agent(agent_config)

    async def _scale_down(self, agent_type: str):
        """Remove the oldest zero-load agent of ``agent_type``, if there is one."""
        idle_agents = [
            agent for agent in self.agents.values()
            if agent.type == agent_type and agent.current_load == 0
        ]

        if idle_agents:
            oldest_agent = min(idle_agents, key=lambda a: a.created_at)
            await self._remove_agent(oldest_agent.id)

    async def _deploy_agent_pod(self, agent: AgentInstance):
        """Create the agent's Pod and a Service named ``agent-<id>`` selecting it.

        The Service gives the pod a stable DNS name used by ``_call_agent_api``.

        Raises:
            Exception: any Kubernetes API error (logged, then re-raised).
        """
        name = f"agent-{agent.id}"
        labels = {
            "app": "ai-agent",
            "agent-type": agent.type,
            "agent-id": agent.id
        }

        pod_spec = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {"name": name, "labels": labels},
            "spec": {
                "containers": [{
                    "name": "agent",
                    "image": f"ai-agent-{agent.type}:latest",
                    "ports": [{"containerPort": self.AGENT_PORT}],
                    "env": [
                        {"name": "AGENT_ID", "value": agent.id},
                        {"name": "AGENT_TYPE", "value": agent.type},
                        {"name": "MANAGER_URL", "value": self.config['manager_url']}
                    ],
                    "resources": {
                        "requests": {"cpu": "100m", "memory": "256Mi"},
                        "limits": {"cpu": "500m", "memory": "1Gi"}
                    },
                    "livenessProbe": {
                        "httpGet": {"path": "/health", "port": self.AGENT_PORT},
                        "initialDelaySeconds": 30,
                        "periodSeconds": 10
                    }
                }],
                "restartPolicy": "Always"
            }
        }

        service_spec = {
            "apiVersion": "v1",
            "kind": "Service",
            "metadata": {"name": name, "labels": labels},
            "spec": {
                "selector": {"agent-id": agent.id},
                "ports": [{"port": self.AGENT_PORT, "targetPort": self.AGENT_PORT}]
            }
        }

        v1 = kubernetes.client.CoreV1Api(self.k8s_client)
        try:
            await self._run_blocking(v1.create_namespaced_pod, namespace=self.namespace, body=pod_spec)
            await self._run_blocking(v1.create_namespaced_service, namespace=self.namespace, body=service_spec)
            print(f"Pod deployed for agent {agent.id}")
        except Exception as e:
            print(f"Failed to deploy pod for agent {agent.id}: {e}")
            raise

    async def _persist_agent(self, agent: AgentInstance):
        """Store the agent as JSON in the Redis hash ``agents`` under its id."""
        agent_data = asdict(agent)
        # datetime and Enum values are not JSON-serialisable; store strings.
        agent_data['created_at'] = agent.created_at.isoformat()
        agent_data['last_heartbeat'] = agent.last_heartbeat.isoformat()
        agent_data['status'] = agent.status.value

        await self._run_blocking(
            self.redis_client.hset,
            "agents",
            agent.id,
            json.dumps(agent_data)
        )

    async def _remove_agent(self, agent_id: str):
        """Delete an agent's Kubernetes resources, registry entry and metrics."""
        agent = self.agents.get(agent_id)
        if agent is None:
            return

        # Make it unselectable while the (awaited) deletions run.
        agent.status = AgentStatus.MAINTENANCE
        await self._delete_agent_resources(agent_id)

        # pop(): another coroutine may have removed it during the awaits above.
        self.agents.pop(agent_id, None)
        await self._run_blocking(self.redis_client.hdel, "agents", agent_id)
        try:
            self.agent_load.remove(agent_id)  # drop the per-agent gauge series
        except KeyError:
            pass

        print(f"Agent removed: {agent_id}")

    async def _rollback_registration(self, agent_id: str):
        """Best-effort cleanup of an agent whose persistence or deployment failed."""
        self.agents.pop(agent_id, None)
        await self._delete_agent_resources(agent_id)
        try:
            await self._run_blocking(self.redis_client.hdel, "agents", agent_id)
        except Exception as e:
            print(f"Failed to roll back registration of agent {agent_id}: {e}")

    async def _delete_agent_resources(self, agent_id: str):
        """Delete the agent's Pod and Service, logging (not raising) failures."""
        v1 = kubernetes.client.CoreV1Api(self.k8s_client)
        name = f"agent-{agent_id}"
        for kind, delete in (("pod", v1.delete_namespaced_pod),
                             ("service", v1.delete_namespaced_service)):
            try:
                await self._run_blocking(delete, name=name, namespace=self.namespace)
            except Exception as e:
                print(f"Failed to delete {kind} for agent {agent_id}: {e}")

    async def _run_blocking(self, func, *args, **kwargs):
        """Run a synchronous client call in the default executor and await it."""
        import functools

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

    async def _metrics_collector(self):
        """Every 15 s, publish the current agent count per (type, status).

        Uses a Gauge, set to the current count. The previous version
        incremented the ``ai_agents_total`` Counter on every pass, so that
        counter grew without bound. Series for pairs that disappeared are
        removed.
        """
        while True:
            try:
                counts: Dict[tuple, int] = {}
                for agent in self.agents.values():
                    key = (agent.type, agent.status.value)
                    counts[key] = counts.get(key, 0) + 1

                for (agent_type, status), count in counts.items():
                    self.agent_current.labels(type=agent_type, status=status).set(count)
                for agent_type, status in self._status_series - set(counts):
                    self.agent_current.remove(agent_type, status)
                self._status_series = set(counts)

                await asyncio.sleep(15)

            except Exception as e:
                print(f"Metrics collector error: {e}")
                await asyncio.sleep(60)

# Usage example
async def start_enterprise_manager():
    """Run an EnterpriseAgentManager with the demo in-cluster configuration."""
    manager = EnterpriseAgentManager(
        {
            "redis_url": "redis://redis-cluster:6379",
            "manager_url": "http://agent-manager:8080",
            "kubernetes_namespace": "ai-agents",
        }
    )
    await manager.start()

if __name__ == "__main__":
    asyncio.run(start_enterprise_manager())

セキュリティアーキテクチャ

1. ゼロトラストセキュリティモデル

# security/zero-trust-policy.yml
# Zero-trust baseline for the ai-agents namespace:
#  - AuthorizationPolicy: only the agent-executor service account may POST
#    /api/v1/tasks, and only with the required JWT claims.
#  - ServiceAccount: agent-executor, bound to a GCP service account via
#    GKE Workload Identity.
#  - DestinationRule: Istio mutual TLS for services in the namespace.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: ai-agent-zero-trust
  namespace: ai-agents
spec:
  # No selector and no action: an ALLOW policy for every workload in the
  # namespace, so requests matching no rule below are denied.
  # NOTE(review): this also denies the agent manager's POST /execute calls to
  # agent pods — add a rule for its service account if they must pass.
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/ai-agents/sa/agent-executor"]
    to:
    - operation:
        methods: ["POST"]
        paths: ["/api/v1/tasks"]
    # Istio only accepts its predefined condition keys; arbitrary keys such as
    # custom.agent_authenticated are rejected by validation. JWT claims are
    # matched with request.auth.claims[...], which needs a RequestAuthentication
    # resource validating the caller's token.
    # NOTE(review): claims are assumed to carry the string "true" — confirm
    # against the token issuer.
    when:
    - key: request.auth.claims[agent_authenticated]
      values: ["true"]
    - key: request.auth.claims[task_validated]
      values: ["true"]

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: agent-executor
  namespace: ai-agents
  annotations:
    iam.gke.io/gcp-service-account: ai-agent-sa@project.iam.gserviceaccount.com

---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: ai-agent-tls
  namespace: ai-agents
spec:
  host: "*.ai-agents.svc.cluster.local"
  trafficPolicy:
    tls:
      mode: ISTIO_MUTUAL

2. シークレット管理システム

# security/secrets_manager.py
import base64
import json
from typing import Dict, Any, Optional
from cryptography.fernet import Fernet
from kubernetes import client, config
import hvac  # HashiCorp Vault client

class EnterpriseSecretsManager:
    """Enterprise secrets manager for per-agent credentials.

    Each agent's secrets are stored twice:

    * in HashiCorp Vault (KV v2) at ``agents/<agent_id>``, every value
      additionally Fernet-encrypted with a master key that is itself kept in
      Vault at ``encryption/master-key``;
    * as the Kubernetes Secret ``agent-secrets-<agent_id>`` in ``NAMESPACE``,
      which ``get_agent_secrets`` reads first for speed.

    NOTE(review): the Kubernetes copy holds the plaintext values, only
    base64-encoded — confirm the cluster encrypts Secrets at rest.
    """

    NAMESPACE = "ai-agents"
    MASTER_KEY_PATH = "encryption/master-key"

    def __init__(self, vault_url: str, vault_token: str):
        self.vault_client = hvac.Client(url=vault_url, token=vault_token)
        self.k8s_client = client.CoreV1Api()
        self.encryption_key = self._get_or_create_encryption_key()
        self.cipher = Fernet(self.encryption_key)

    @staticmethod
    def _vault_path(agent_id: str) -> str:
        """Vault KV v2 path holding the agent's encrypted secrets."""
        return f"agents/{agent_id}"

    @staticmethod
    def _k8s_secret_name(agent_id: str) -> str:
        """Name of the agent's Kubernetes Secret."""
        return f"agent-secrets-{agent_id}"

    async def store_agent_secrets(
        self,
        agent_id: str,
        secrets: Dict[str, str]
    ) -> bool:
        """Write ``secrets`` to Vault (encrypted) and to the Kubernetes Secret.

        An existing Kubernetes Secret is replaced, so this also serves
        rotation.

        Returns:
            True on success, False (after logging) on any failure.
            NOTE: Vault is written first; if the Kubernetes write then fails,
            the two copies disagree until the next successful store.
        """
        try:
            encrypted_secrets = {
                key: self.cipher.encrypt(value.encode()).decode()
                for key, value in secrets.items()
            }
            self.vault_client.secrets.kv.v2.create_or_update_secret(
                path=self._vault_path(agent_id),
                secret=encrypted_secrets
            )

            secret = client.V1Secret(
                metadata=client.V1ObjectMeta(
                    name=self._k8s_secret_name(agent_id),
                    namespace=self.NAMESPACE,
                    labels={"agent-id": agent_id}
                ),
                data={
                    key: base64.b64encode(value.encode()).decode()
                    for key, value in secrets.items()
                },
                type="Opaque"
            )
            self._upsert_k8s_secret(secret)

            return True

        except Exception as e:
            print(f"Failed to store secrets for agent {agent_id}: {e}")
            return False

    def _upsert_k8s_secret(self, secret):
        """Create the Secret, or replace it when it already exists (HTTP 409)."""
        try:
            self.k8s_client.create_namespaced_secret(
                namespace=self.NAMESPACE,
                body=secret
            )
        except client.exceptions.ApiException as e:
            if e.status != 409:
                raise
            self.k8s_client.replace_namespaced_secret(
                name=secret.metadata.name,
                namespace=self.NAMESPACE,
                body=secret
            )

    async def get_agent_secrets(self, agent_id: str) -> Dict[str, str]:
        """Return the agent's plaintext secrets, or ``{}`` on failure.

        Reads the Kubernetes Secret first. On a Kubernetes API error it falls
        back to Vault and decrypts the values there.
        """
        try:
            try:
                secret = self.k8s_client.read_namespaced_secret(
                    name=self._k8s_secret_name(agent_id),
                    namespace=self.NAMESPACE
                )
            except client.exceptions.ApiException:
                # Not in Kubernetes (or not readable there): use Vault.
                response = self.vault_client.secrets.kv.v2.read_secret_version(
                    path=self._vault_path(agent_id)
                )
                return {
                    key: self.cipher.decrypt(value.encode()).decode()
                    for key, value in response['data']['data'].items()
                }

            # .data is None for a Secret without keys.
            return {
                key: base64.b64decode(value).decode()
                for key, value in (secret.data or {}).items()
            }

        except Exception as e:
            print(f"Failed to get secrets for agent {agent_id}: {e}")
            return {}

    async def rotate_secrets(self, agent_id: str) -> bool:
        """Generate, store and announce fresh secrets for an agent.

        Returns:
            True only if the new secrets were stored and the agent notified.
            Previously a failed store was ignored and rotation reported success.
        """
        try:
            new_secrets = await self._generate_new_secrets(agent_id)

            if not await self.store_agent_secrets(agent_id, new_secrets):
                print(f"Failed to rotate secrets for agent {agent_id}: could not store new secrets")
                return False

            await self._notify_agent_secret_rotation(agent_id)

            print(f"Secrets rotated for agent {agent_id}")
            return True

        except Exception as e:
            print(f"Failed to rotate secrets for agent {agent_id}: {e}")
            return False

    async def audit_secret_access(self, agent_id: str, secret_key: str):
        """Emit an audit record for an access to ``secret_key`` of ``agent_id``."""
        from datetime import datetime

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "secret_key": secret_key,
            "action": "access",
            "source_ip": self._get_client_ip()
        }

        await self._send_audit_log(audit_entry)

    def _get_or_create_encryption_key(self) -> bytes:
        """Return the Fernet master key from Vault, creating it only if absent.

        Only a missing path (hvac ``InvalidPath``, HTTP 404) triggers creation.
        Any other error propagates. The previous bare ``except:`` generated and
        wrote a fresh key on e.g. a permission or network error, overwriting
        the real key and making every stored secret undecryptable.
        Creation uses KV v2 check-and-set (``cas=0``: write only if absent),
        so concurrent starters cannot overwrite each other; the loser re-reads.
        """
        from hvac.exceptions import InvalidPath, InvalidRequest

        kv = self.vault_client.secrets.kv.v2
        try:
            response = kv.read_secret_version(path=self.MASTER_KEY_PATH)
        except InvalidPath:
            new_key = Fernet.generate_key()
            try:
                kv.create_or_update_secret(
                    path=self.MASTER_KEY_PATH,
                    secret={"key": new_key.decode()},
                    cas=0
                )
            except InvalidRequest:
                # Check-and-set failed: another instance created the key first.
                response = kv.read_secret_version(path=self.MASTER_KEY_PATH)
            else:
                return new_key

        return response['data']['data']['key'].encode()

    async def _generate_new_secrets(self, agent_id: str) -> Dict[str, str]:
        """Generate a new 32-char API key, an agent token and a timestamp."""
        import secrets
        import string
        import uuid
        from datetime import datetime

        # Cryptographically strong random key from the `secrets` module.
        alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
        new_api_key = ''.join(secrets.choice(alphabet) for _ in range(32))

        return {
            "api_key": new_api_key,
            "agent_token": str(uuid.uuid4()),
            "generated_at": datetime.now().isoformat()
        }

    async def _notify_agent_secret_rotation(self, agent_id: str):
        """Hook called after a successful rotation.

        The default implementation only logs; override it to push the change
        to the running agent (e.g. by restarting its pod).
        """
        print(f"Agent {agent_id} must reload its secrets (no rotation notifier configured)")

    def _get_client_ip(self) -> Optional[str]:
        """Hook returning the caller's IP for audit records.

        This class has no request context, so the default returns None;
        override it in a request-scoped subclass.
        """
        return None

    async def _send_audit_log(self, audit_entry: Dict[str, Any]):
        """Hook shipping an audit record. Default: JSON line on the ``audit`` logger."""
        import logging

        logging.getLogger("audit").info(json.dumps(audit_entry, ensure_ascii=False))

監視・Observabilityシステム

1. 包括的監視アーキテクチャ

# monitoring/observability_stack.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
from dataclasses import dataclass
import aiohttp
from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
from opentelemetry import trace, metrics
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

@dataclass
class AlertRule:
    """Declarative alert: fire ``action`` once ``condition`` holds for ``duration``."""

    name: str
    condition: str     # e.g. "agent_success_rate < 0.9"; metric name comes first
    threshold: float   # value compared against the metric
    duration: int      # seconds the condition must persist before firing
    severity: str      # "warning" or "critical"
    action: str        # key of the automated response to run

class AIAgentObservability:
    """Observability system dedicated to AI agents.

    Metrics live in a private Prometheus ``CollectorRegistry`` (request count,
    response time, success rate, token usage). Spans go to Jaeger through a
    globally installed OpenTelemetry ``TracerProvider``. ``start_monitoring``
    runs background loops that refresh metrics, evaluate ``AlertRule``s,
    notify (Slack / e-mail / PagerDuty) and trigger automated actions.

    Config keys read here: ``jaeger_host``, ``jaeger_port``,
    ``jaeger_collector_endpoint``, ``slack_webhook``, ``email_enabled``,
    ``pagerduty_routing_key``, ``agent_manager_url``, ``cost_budget``.

    NOTE(review): ``opentelemetry-exporter-jaeger`` is deprecated upstream in
    favour of OTLP export (Jaeger ingests OTLP natively) — consider migrating.
    """

    # Jaeger ports: the agent takes compact Thrift on 6831/udp; the collector
    # takes Thrift over HTTP on 14268 (/api/traces).
    JAEGER_AGENT_PORT = 6831
    JAEGER_COLLECTOR_HTTP_PORT = 14268

    # USD per 1K tokens (input / output). List prices as of Aug 2025.
    # NOTE(review): verify against current provider pricing before use.
    MODEL_PRICING_PER_1K = {
        "gpt-5": {"input": 0.00125, "output": 0.01},
        "claude-opus-4.1": {"input": 0.015, "output": 0.075},
        "gemini-2.0-flash": {"input": 0.0001, "output": 0.0004}
    }

    def __init__(self, config: Dict[str, Any]):
        self.config = config

        # Private Prometheus registry
        self.registry = CollectorRegistry()

        self.agent_requests = Counter(
            'ai_agent_requests_total',
            'Total agent requests',
            ['agent_type', 'status', 'endpoint'],
            registry=self.registry
        )

        self.agent_response_time = Histogram(
            'ai_agent_response_time_seconds',
            'Agent response time',
            ['agent_type', 'complexity'],
            registry=self.registry
        )

        self.agent_success_rate = Gauge(
            'ai_agent_success_rate',
            'Agent success rate',
            ['agent_type'],
            registry=self.registry
        )

        self.token_usage = Counter(
            'ai_agent_token_usage_total',
            'Total token usage',
            ['agent_type', 'model'],
            registry=self.registry
        )

        # Tracing: global tracer provider that batches spans to Jaeger.
        trace.set_tracer_provider(TracerProvider())
        self.tracer = trace.get_tracer(__name__)
        span_processor = BatchSpanProcessor(self._build_jaeger_exporter())
        trace.get_tracer_provider().add_span_processor(span_processor)

        # Alert rules and state: rule name -> {"start_time", "rule", "current_value", "fired"}
        self.alert_rules = self._load_alert_rules()
        self.active_alerts = {}

    def _build_jaeger_exporter(self) -> "JaegerExporter":
        """Create the Jaeger exporter from config.

        ``jaeger_collector_endpoint`` selects HTTP export to a collector. So
        does ``jaeger_port == 14268``: that is the collector's HTTP port, and
        the original code mistakenly sent UDP agent traffic to it. Otherwise
        spans go to a Jaeger agent on ``jaeger_port`` (default 6831).
        """
        host = self.config.get('jaeger_host', 'localhost')
        port = self.config.get('jaeger_port', self.JAEGER_AGENT_PORT)
        endpoint = self.config.get('jaeger_collector_endpoint')
        if endpoint is None and port == self.JAEGER_COLLECTOR_HTTP_PORT:
            endpoint = f"http://{host}:{port}/api/traces?format=jaeger.thrift"
        if endpoint:
            return JaegerExporter(collector_endpoint=endpoint)
        return JaegerExporter(agent_host_name=host, agent_port=port)

    def _load_alert_rules(self) -> List[AlertRule]:
        """Return the built-in alert rules."""
        return [
            AlertRule(
                name="high_error_rate",
                condition="agent_success_rate < 0.9",
                threshold=0.9,
                duration=300,  # 5 minutes
                severity="warning",
                action="scale_up"
            ),
            AlertRule(
                name="critical_error_rate",
                condition="agent_success_rate < 0.8",
                threshold=0.8,
                duration=180,  # 3 minutes
                severity="critical",
                action="immediate_intervention"
            ),
            AlertRule(
                name="high_response_time",
                condition="agent_response_time > 30.0",
                threshold=30.0,
                duration=600,  # 10 minutes
                severity="warning",
                action="performance_investigation"
            ),
            AlertRule(
                name="token_budget_exceeded",
                condition="hourly_token_usage > 1000000",
                threshold=1000000,
                duration=60,
                severity="critical",
                action="rate_limiting"
            )
        ]

    async def start_monitoring(self):
        """Run all monitoring loops until cancelled."""
        print("Starting AI Agent Observability System...")

        tasks = [
            asyncio.create_task(self._metrics_collector()),
            asyncio.create_task(self._alert_manager()),
            asyncio.create_task(self._health_checker()),
            asyncio.create_task(self._cost_tracker()),
            asyncio.create_task(self._performance_analyzer())
        ]

        await asyncio.gather(*tasks)

    async def record_agent_request(
        self,
        agent_type: str,
        endpoint: str,
        status: str,
        response_time: float,
        token_count: int = 0,
        model: str = "unknown"
    ):
        """Record one completed agent request in metrics and as a span.

        The span is created after the fact, so its own duration is not
        meaningful; the latency is carried in ``agent.response_time``.
        """
        self.agent_requests.labels(
            agent_type=agent_type,
            status=status,
            endpoint=endpoint
        ).inc()

        self.agent_response_time.labels(
            agent_type=agent_type,
            complexity="standard"  # TODO: classify complexity dynamically
        ).observe(response_time)

        if token_count > 0:
            self.token_usage.labels(
                agent_type=agent_type,
                model=model
            ).inc(token_count)

        with self.tracer.start_as_current_span("agent_request") as span:
            span.set_attribute("agent.type", agent_type)
            span.set_attribute("agent.endpoint", endpoint)
            span.set_attribute("agent.status", status)
            span.set_attribute("agent.response_time", response_time)
            span.set_attribute("agent.token_count", token_count)

    async def _metrics_collector(self):
        """Every 30 s, refresh the success-rate gauge (60 s back-off on error)."""
        while True:
            try:
                success_rates = await self._calculate_success_rates()
                for agent_type, rate in success_rates.items():
                    self.agent_success_rate.labels(agent_type=agent_type).set(rate)

                await asyncio.sleep(30)

            except Exception as e:
                print(f"Metrics collector error: {e}")
                await asyncio.sleep(60)

    async def _calculate_success_rates(self) -> Dict[str, float]:
        """Return success rate per agent type.

        Placeholder: returns fixed dummy data instead of querying Prometheus.
        """
        return {
            "code_generator": 0.95,
            "security_scanner": 0.98,
            "performance_tester": 0.92
        }

    async def _health_checker(self):
        """Placeholder health-check loop.

        No health checks are implemented in this module. It returns
        immediately so the other loops in ``start_monitoring`` keep running.
        """
        return None

    async def _performance_analyzer(self):
        """Placeholder performance-analysis loop; returns immediately (see _health_checker)."""
        return None

    async def _alert_manager(self):
        """Evaluate every alert rule once per minute.

        Rules are isolated: an error in one rule no longer aborts the rest.
        """
        while True:
            for rule in self.alert_rules:
                try:
                    await self._evaluate_alert_rule(rule)
                except Exception as e:
                    print(f"Alert manager error: {e}")

            await asyncio.sleep(60)

    async def _get_metric_value(self, condition: str):
        """Return the current value of the metric named first in ``condition``.

        Only ``agent_success_rate`` has a data source here: the lowest rate
        across agent types from ``_calculate_success_rates``. Other metrics
        return None, which makes ``_evaluate_alert_rule`` skip the rule.
        NOTE(review): replace with a Prometheus query (``prometheus_url``).
        """
        parts = condition.split()
        metric = parts[0] if parts else ""
        if metric == "agent_success_rate":
            rates = await self._calculate_success_rates()
            return min(rates.values()) if rates else None
        return None

    async def _evaluate_alert_rule(self, rule: AlertRule):
        """Track a rule's pending/firing state and fire it once per episode.

        The first breach records a pending alert. Once the breach has lasted
        ``rule.duration`` seconds the alert fires, exactly once. Previously it
        re-fired on every evaluation, flooding notifications and re-running
        actions. When the condition clears, the alert is resolved.
        """
        current_value = await self._get_metric_value(rule.condition)

        if current_value is None:
            return

        if not self._condition_met(rule, current_value):
            if self.active_alerts.pop(rule.name, None) is not None:
                print(f"Alert resolved: {rule.name}")
            return

        alert = self.active_alerts.get(rule.name)
        if alert is None:
            self.active_alerts[rule.name] = {
                "start_time": datetime.now(),
                "rule": rule,
                "current_value": current_value,
                "fired": False
            }
            print(f"Alert triggered: {rule.name} (value: {current_value})")
            return

        alert["current_value"] = current_value
        duration = (datetime.now() - alert["start_time"]).total_seconds()
        if duration >= rule.duration and not alert["fired"]:
            await self._fire_alert(rule, current_value, duration)
            # Set only after success, so a failed notification is retried next cycle.
            alert["fired"] = True

    def _condition_met(self, rule: AlertRule, value: float) -> bool:
        """Compare ``value`` with ``rule.threshold`` using the operator in the condition."""
        if ">" in rule.condition:
            return value > rule.threshold
        elif "<" in rule.condition:
            return value < rule.threshold
        return False

    async def _fire_alert(self, rule: AlertRule, value: float, duration: float):
        """Send notifications for a firing alert and run its automated action."""
        alert_data = {
            "alert_name": rule.name,
            "severity": rule.severity,
            "current_value": value,
            "threshold": rule.threshold,
            "duration": duration,
            "timestamp": datetime.now().isoformat(),
            "action": rule.action
        }

        await self._send_alert_notification(alert_data)
        await self._execute_alert_action(rule.action, alert_data)

    async def _send_alert_notification(self, alert_data: Dict[str, Any]):
        """Fan out to Slack and e-mail (if configured) and PagerDuty (critical only)."""
        if self.config.get('slack_webhook'):
            await self._send_slack_alert(alert_data)

        if self.config.get('email_enabled'):
            await self._send_email_alert(alert_data)

        if alert_data['severity'] == 'critical':
            await self._send_pagerduty_alert(alert_data)

    async def _send_slack_alert(self, alert_data: Dict[str, Any]):
        """Post the alert to the Slack incoming webhook as an attachment."""
        color = "warning" if alert_data['severity'] == "warning" else "danger"

        message = {
            "attachments": [{
                "color": color,
                "title": f"🤖 AI Agent Alert: {alert_data['alert_name']}",
                "fields": [
                    {"title": "Severity", "value": alert_data['severity'], "short": True},
                    {"title": "Current Value", "value": str(alert_data['current_value']), "short": True},
                    {"title": "Threshold", "value": str(alert_data['threshold']), "short": True},
                    {"title": "Duration", "value": f"{alert_data['duration']:.0f}s", "short": True}
                ],
                "footer": "AI Agent Monitoring",
                "ts": int(datetime.now().timestamp())
            }]
        }

        await self._post_json(self.config['slack_webhook'], message, label="Slack alert")

    async def _send_email_alert(self, alert_data: Dict[str, Any]):
        """Placeholder: no SMTP transport is configured in this module; logs only."""
        print(f"E-mail alerting not implemented; skipping {alert_data['alert_name']}")

    async def _send_pagerduty_alert(self, alert_data: Dict[str, Any]):
        """Trigger a PagerDuty incident (Events API v2).

        Requires ``pagerduty_routing_key`` in config; otherwise logs and skips.
        ``dedup_key`` is derived from the alert name, so repeats update one incident.
        """
        routing_key = self.config.get('pagerduty_routing_key')
        if not routing_key:
            print(f"PagerDuty not configured; skipping {alert_data['alert_name']}")
            return

        event = {
            "routing_key": routing_key,
            "event_action": "trigger",
            "dedup_key": f"ai-agent-{alert_data['alert_name']}",
            "payload": {
                "summary": (
                    f"AI Agent Alert: {alert_data['alert_name']} "
                    f"(value {alert_data['current_value']}, threshold {alert_data['threshold']})"
                ),
                "source": "ai-agent-monitoring",
                "severity": alert_data['severity'],
                "custom_details": alert_data
            }
        }
        await self._post_json("https://events.pagerduty.com/v2/enqueue", event, label="PagerDuty alert")

    async def _execute_alert_action(self, action: str, alert_data: Dict[str, Any]):
        """Run the automated handler registered for ``action``; errors are logged."""
        actions = {
            "scale_up": self._action_scale_up,
            "immediate_intervention": self._action_immediate_intervention,
            "performance_investigation": self._action_performance_investigation,
            "rate_limiting": self._action_rate_limiting
        }

        if action in actions:
            try:
                await actions[action](alert_data)
                print(f"Alert action executed: {action}")
            except Exception as e:
                print(f"Failed to execute alert action {action}: {e}")

    async def _action_scale_up(self, alert_data: Dict[str, Any]):
        """Ask the Agent Manager to scale up (POST ``/scale-up``)."""
        await self._post_json(
            f"{self.config['agent_manager_url']}/scale-up",
            {"reason": "high_error_rate_alert"},
            label="scale-up request"
        )

    async def _action_immediate_intervention(self, alert_data: Dict[str, Any]):
        """Switch the Agent Manager to emergency mode (POST ``/emergency-mode``)."""
        await self._post_json(
            f"{self.config['agent_manager_url']}/emergency-mode",
            {"alert_data": alert_data},
            label="emergency-mode request"
        )

    async def _action_performance_investigation(self, alert_data: Dict[str, Any]):
        """Placeholder: no automated investigation; logs for manual follow-up."""
        print(f"Performance investigation required for {alert_data['alert_name']}")

    async def _action_rate_limiting(self, alert_data: Dict[str, Any]):
        """Placeholder: no rate-limit backend is wired up; logs for manual follow-up."""
        print(f"Rate limiting required for {alert_data['alert_name']}")

    async def _cost_tracker(self):
        """Hourly: compute model costs and check them against the budget."""
        while True:
            try:
                hourly_costs = await self._calculate_hourly_costs()
                await self._monitor_cost_budget(hourly_costs)

                await asyncio.sleep(3600)

            except Exception as e:
                print(f"Cost tracker error: {e}")
                await asyncio.sleep(3600)

    async def _calculate_hourly_costs(self) -> Dict[str, float]:
        """Return the last hour's cost per model.

        Placeholder: token usage is not broken down per hour or by
        input/output yet, so this returns an empty dict. Real code would
        price usage with ``MODEL_PRICING_PER_1K``.
        """
        costs: Dict[str, float] = {}
        return costs

    async def _monitor_cost_budget(self, hourly_costs: Dict[str, float]):
        """Warn when an hour's spend exceeds 1/24 of ``cost_budget.daily_limit``.

        Does nothing when no daily limit is configured.
        """
        daily_limit = (self.config.get('cost_budget') or {}).get('daily_limit')
        if daily_limit is None:
            return

        hourly_total = sum(hourly_costs.values())
        hourly_allowance = daily_limit / 24
        if hourly_total > hourly_allowance:
            await self._send_alert_notification({
                "alert_name": "cost_budget_exceeded",
                "severity": "warning",
                "current_value": hourly_total,
                "threshold": hourly_allowance,
                "duration": 3600,
                "timestamp": datetime.now().isoformat(),
                "action": "none"
            })

    async def _post_json(self, url: str, payload: Dict[str, Any], label: str) -> int:
        """POST ``payload`` as JSON with a 10 s timeout and return the HTTP status.

        Non-2xx responses are logged by ``label``. The URL is deliberately not
        logged, because webhook URLs are credentials.
        """
        timeout = aiohttp.ClientTimeout(total=10)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=payload) as response:
                if response.status >= 300:
                    print(f"{label} failed: HTTP {response.status}")
                return response.status

# Kubernetes manifest with the observability configuration.
# The Slack webhook URL is a credential: it is not stored here. Supply it from
# a Kubernetes Secret (e.g. as env SLACK_WEBHOOK_URL). The previous
# "${{ secrets.SLACK_WEBHOOK_URL }}" was GitHub Actions syntax, which
# Kubernetes never interpolates.
observability_config = """
apiVersion: v1
kind: ConfigMap
metadata:
  name: observability-config
  namespace: ai-agents
data:
  config.yaml: |
    jaeger_host: jaeger-collector.monitoring.svc.cluster.local
    jaeger_port: 14268
    prometheus_url: http://prometheus.monitoring.svc.cluster.local:9090
    # slack_webhook: provided from a Secret (env SLACK_WEBHOOK_URL), never from this ConfigMap
    agent_manager_url: http://agent-manager.ai-agents.svc.cluster.local:8080
    cost_budget:
      daily_limit: 1000.0
      monthly_limit: 25000.0
    alert_channels:
      - slack
      - email
      - pagerduty
"""

# Usage example
async def start_observability_system():
    """Start the observability system with environment-overridable settings.

    Every setting can be overridden by an environment variable (JAEGER_HOST,
    JAEGER_PORT, SLACK_WEBHOOK_URL, AGENT_MANAGER_URL); the defaults are the
    example's previous hard-coded values. The Slack webhook is a credential,
    so it should be supplied via the environment rather than source code.
    """
    import os  # local import: this example's module imports are not shown here

    config = {
        "jaeger_host": os.environ.get("JAEGER_HOST", "jaeger-collector"),
        "jaeger_port": int(os.environ.get("JAEGER_PORT", "14268")),
        "slack_webhook": os.environ.get(
            "SLACK_WEBHOOK_URL", "https://hooks.slack.com/services/..."
        ),
        "agent_manager_url": os.environ.get(
            "AGENT_MANAGER_URL", "http://agent-manager:8080"
        ),
    }

    observability = AIAgentObservability(config)
    await observability.start_monitoring()

if __name__ == "__main__":
    # Run the observability system until the process is stopped.
    entrypoint = start_observability_system()
    asyncio.run(entrypoint)

パフォーマンス最適化とスケーリング

1. 動的負荷分散システム

# performance/load_balancer.py
import asyncio
import random
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import aiohttp
import time

class LoadBalancingStrategy(Enum):
    """Endpoint-selection policies understood by ``IntelligentLoadBalancer``."""

    # Cycle through the available endpoints in order.
    ROUND_ROBIN = "round_robin"
    # Pick the endpoint with the fewest in-flight requests.
    LEAST_CONNECTIONS = "least_connections"
    # Favour endpoints in proportion to their configured weight.
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    # Score endpoints on success rate, latency, load, type fit and complexity fit.
    AI_OPTIMIZED = "ai_optimized"

@dataclass
class AgentEndpoint:
    """Mutable runtime record for one agent backend known to the load balancer."""

    id: str  # unique key the endpoint is registered under
    url: str  # base URL; requests go to f"{url}/execute", probes to f"{url}/health"
    weight: int  # relative capability/priority used by weighted selection
    current_connections: int  # requests currently in flight on this endpoint
    average_response_time: float  # moving average of request duration, in seconds
    success_rate: float  # fraction of recent requests that succeeded (0.0-1.0)
    capacity: int  # max concurrent requests before the endpoint is skipped
    agent_type: str  # e.g. "code_generator"; matched against the request type

class IntelligentLoadBalancer:
    """AI-optimised load balancer for agent endpoints.

    Routes each request to one registered ``AgentEndpoint`` according to the
    configured ``LoadBalancingStrategy``. Endpoints whose
    ``current_connections`` has reached ``capacity`` are never selected. After
    every request, the chosen endpoint's moving-average latency and success rate
    are updated in place.
    """

    # Affinity of request types (outer key) for agent types (inner key). Pairs
    # not listed score 0.5. Defined once instead of being rebuilt per call.
    _COMPATIBILITY_MATRIX: Dict[str, Dict[str, float]] = {
        "code_generation": {
            "code_generator": 1.0,
            "security_scanner": 0.3,
            "performance_tester": 0.2,
        },
        "security_analysis": {
            "security_scanner": 1.0,
            "code_generator": 0.4,
            "performance_tester": 0.1,
        },
        "performance_test": {
            "performance_tester": 1.0,
            "code_generator": 0.3,
            "security_scanner": 0.2,
        },
    }

    def __init__(
        self,
        strategy: LoadBalancingStrategy = LoadBalancingStrategy.AI_OPTIMIZED,
        history_window: int = 100,
    ):
        """Create a load balancer.

        Args:
            strategy: Endpoint-selection policy.
            history_window: How many of an endpoint's most recent request
                outcomes feed its ``success_rate`` (default 100, must be >= 1).

        Raises:
            ValueError: If ``history_window`` is smaller than 1.
        """
        if history_window < 1:
            raise ValueError("history_window must be >= 1")
        self.strategy = strategy
        self.endpoints: Dict[str, AgentEndpoint] = {}
        self.history_window = history_window
        # Most recent outcomes across all endpoints, trimmed to history_window
        # (still a list for any caller that reads it).
        self.request_history: List[bool] = []
        # Most recent outcomes per endpoint id, trimmed to history_window, so an
        # endpoint's success_rate reflects only its own requests.
        self._endpoint_history: Dict[str, List[bool]] = {}
        # Smooth weighted round-robin state: running "current weight" per endpoint id.
        self._wrr_current_weights: Dict[str, int] = {}
        self.round_robin_counter = 0

    def register_endpoint(self, endpoint: AgentEndpoint):
        """Register (or replace) an endpoint under ``endpoint.id``."""
        self.endpoints[endpoint.id] = endpoint
        print(f"Endpoint registered: {endpoint.id} ({endpoint.agent_type})")

    async def select_endpoint(
        self,
        request_type: str,
        estimated_complexity: float = 1.0
    ) -> Optional[AgentEndpoint]:
        """Choose the best endpoint for a request under the current strategy.

        Args:
            request_type: Kind of request, e.g. ``"code_generation"``.
            estimated_complexity: Task complexity; the AI-optimised strategy
                treats <= 0.3 as simple and > 0.7 as complex.

        Returns:
            The selected endpoint, or ``None`` when every endpoint is at
            capacity (or the strategy is not recognised).
        """
        available_endpoints = [
            ep for ep in self.endpoints.values()
            if ep.current_connections < ep.capacity
        ]

        if not available_endpoints:
            return None

        if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
            return self._round_robin_select(available_endpoints)

        elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
            return self._least_connections_select(available_endpoints)

        elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
            return self._weighted_round_robin_select(available_endpoints)

        elif self.strategy == LoadBalancingStrategy.AI_OPTIMIZED:
            return await self._ai_optimized_select(
                available_endpoints,
                request_type,
                estimated_complexity
            )

        return None

    def _round_robin_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Round-robin selection over the currently available endpoints."""
        selected = endpoints[self.round_robin_counter % len(endpoints)]
        self.round_robin_counter += 1
        return selected

    def _least_connections_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Pick the endpoint with the fewest in-flight requests (first wins ties)."""
        return min(endpoints, key=lambda ep: ep.current_connections)

    def _weighted_round_robin_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Weighted round-robin selection (smooth WRR, the algorithm nginx uses).

        With a stable set of endpoints, each cycle of sum(weights) calls picks
        every positively weighted endpoint exactly ``weight`` times, and picks
        are interleaved rather than bursty. The previous version picked at
        random in proportion to weight, which is not round robin. If no
        endpoint has a positive weight, the first endpoint is returned.
        """
        weighted = [ep for ep in endpoints if ep.weight > 0]
        if not weighted:
            return endpoints[0]

        total_weight = sum(ep.weight for ep in weighted)
        best: Optional[AgentEndpoint] = None
        best_current = 0
        for ep in weighted:
            current = self._wrr_current_weights.get(ep.id, 0) + ep.weight
            self._wrr_current_weights[ep.id] = current
            if best is None or current > best_current:
                best, best_current = ep, current

        # Charge the winner the whole cycle's weight so others catch up.
        self._wrr_current_weights[best.id] -= total_weight
        return best

    async def _ai_optimized_select(
        self,
        endpoints: List[AgentEndpoint],
        request_type: str,
        estimated_complexity: float
    ) -> AgentEndpoint:
        """Score every candidate and return the highest-scoring endpoint.

        The score is a weighted sum of success rate (0.3), latency (0.25),
        spare capacity (0.2), request/agent-type fit (0.15) and complexity fit
        (0.1). Ties go to the earliest endpoint in ``endpoints``.
        """
        scores = []

        for ep in endpoints:
            # Base scores: success rate and response time
            success_score = ep.success_rate
            response_time_score = 1.0 / (1.0 + ep.average_response_time)

            # Load score: fraction of capacity still free (capacity > 0 here
            # because the caller only passes endpoints below capacity).
            load_score = 1.0 - (ep.current_connections / ep.capacity)

            # Fit between the request type and the agent type
            compatibility_score = await self._calculate_compatibility(
                request_type,
                ep.agent_type
            )

            # Fit between task complexity and endpoint capability
            complexity_score = await self._calculate_complexity_fitness(
                estimated_complexity,
                ep
            )

            total_score = (
                success_score * 0.3 +
                response_time_score * 0.25 +
                load_score * 0.2 +
                compatibility_score * 0.15 +
                complexity_score * 0.1
            )

            scores.append((ep, total_score))

        return max(scores, key=lambda x: x[1])[0]

    async def _calculate_compatibility(self, request_type: str, agent_type: str) -> float:
        """Return how well an agent type suits a request type (0.5 if unknown)."""
        return self._COMPATIBILITY_MATRIX.get(request_type, {}).get(agent_type, 0.5)

    async def _calculate_complexity_fitness(
        self,
        estimated_complexity: float,
        endpoint: AgentEndpoint
    ) -> float:
        """Return how well an endpoint's weight suits the task complexity (0..1).

        Simple tasks (<= 0.3) fit any endpoint. Medium tasks (<= 0.7) prefer a
        weight near 5. Complex tasks prefer high weights, scaled by 1/10.
        """
        if estimated_complexity <= 0.3:  # simple task
            fitness = 1.0
        elif estimated_complexity <= 0.7:  # medium task: mid-capability is best
            fitness = 1.0 - abs(endpoint.weight - 5) / 5.0
        else:  # complex task: needs a high-capability endpoint
            fitness = endpoint.weight / 10.0

        # Weights outside 0..10 would otherwise push this term below 0 or
        # above 1 and let it distort the combined score.
        return min(1.0, max(0.0, fitness))

    async def execute_request(
        self,
        request_data: Dict[str, Any],
        request_type: str,
        estimated_complexity: float = 1.0
    ) -> Dict[str, Any]:
        """Route a request to an endpoint, execute it and record statistics.

        Returns:
            On success ``{"success": True, "result", "endpoint_id",
            "execution_time"}``. On failure ``{"success": False, "error", ...}``;
            when no endpoint has spare capacity, the failure dict includes
            ``"retry_after": 30``.
        """
        endpoint = await self.select_endpoint(request_type, estimated_complexity)

        if not endpoint:
            return {
                "success": False,
                "error": "No available endpoints",
                "retry_after": 30
            }

        endpoint.current_connections += 1
        # Monotonic clock: durations must not jump if the wall clock is adjusted.
        start_time = time.monotonic()

        try:
            result = await self._execute_on_endpoint(endpoint, request_data)

            execution_time = time.monotonic() - start_time
            await self._update_endpoint_stats(endpoint, execution_time, True)

            return {
                "success": True,
                "result": result,
                "endpoint_id": endpoint.id,
                "execution_time": execution_time
            }

        except Exception as e:
            execution_time = time.monotonic() - start_time
            await self._update_endpoint_stats(endpoint, execution_time, False)

            return {
                "success": False,
                "error": str(e),
                "endpoint_id": endpoint.id,
                "execution_time": execution_time
            }

        finally:
            endpoint.current_connections = max(0, endpoint.current_connections - 1)

    async def _execute_on_endpoint(
        self,
        endpoint: AgentEndpoint,
        request_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST ``request_data`` to ``{endpoint.url}/execute`` (300 s timeout).

        Raises:
            Exception: On any non-200 status, carrying the status and body text.
        """
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{endpoint.url}/execute",
                json=request_data,
                timeout=aiohttp.ClientTimeout(total=300)
            ) as response:

                if response.status == 200:
                    return await response.json()
                else:
                    raise Exception(f"HTTP {response.status}: {await response.text()}")

    async def _update_endpoint_stats(
        self,
        endpoint: AgentEndpoint,
        execution_time: float,
        success: bool
    ):
        """Fold one request outcome into the endpoint's running statistics."""
        # Exponential moving average of response time.
        alpha = 0.1  # smoothing factor
        endpoint.average_response_time = (
            alpha * execution_time +
            (1 - alpha) * endpoint.average_response_time
        )

        # Success rate over this endpoint's own last `history_window` requests
        # (previously one history was shared by all endpoints).
        history = self._endpoint_history.setdefault(endpoint.id, [])
        history.append(success)
        del history[:-self.history_window]
        endpoint.success_rate = sum(history) / len(history)

        # Global history, bounded so it cannot grow without limit.
        self.request_history.append(success)
        del self.request_history[:-self.history_window]

    async def health_check(self):
        """Probe every endpoint's ``/health`` every 30 s, forever.

        Failures are only printed. Endpoints are not yet taken out of rotation.
        """
        while True:
            try:
                # One HTTP session per sweep instead of one per endpoint.
                async with aiohttp.ClientSession() as session:
                    for endpoint in list(self.endpoints.values()):
                        try:
                            async with session.get(
                                f"{endpoint.url}/health",
                                timeout=aiohttp.ClientTimeout(total=10)
                            ) as response:

                                if response.status != 200:
                                    print(f"Endpoint {endpoint.id} health check failed")
                                    # TODO: temporarily take the endpoint out of rotation.

                        except Exception as e:
                            print(f"Health check error for {endpoint.id}: {e}")

                await asyncio.sleep(30)  # check every 30 seconds

            except Exception as e:
                print(f"Health check loop error: {e}")
                await asyncio.sleep(60)

# Usage example
async def example_load_balancer():
    """Demonstrate registering endpoints and routing one request.

    The background health-check loop runs only for the duration of the example
    and is then cancelled explicitly. Previously the task was left running,
    which would outlive the demo when it is awaited from a longer-lived loop.
    """
    balancer = IntelligentLoadBalancer(LoadBalancingStrategy.AI_OPTIMIZED)

    endpoints = [
        AgentEndpoint(
            id="agent-1",
            url="http://agent-1:8080",
            weight=8,
            current_connections=0,
            average_response_time=2.5,
            success_rate=0.95,
            capacity=10,
            agent_type="code_generator"
        ),
        AgentEndpoint(
            id="agent-2",
            url="http://agent-2:8080",
            weight=6,
            current_connections=0,
            average_response_time=3.1,
            success_rate=0.92,
            capacity=8,
            agent_type="security_scanner"
        )
    ]

    for endpoint in endpoints:
        balancer.register_endpoint(endpoint)

    # health_check() loops forever, so keep a handle and stop it when done.
    health_task = asyncio.create_task(balancer.health_check())

    try:
        request_result = await balancer.execute_request(
            request_data={"task": "generate_function", "language": "python"},
            request_type="code_generation",
            estimated_complexity=0.5
        )

        print(f"Request result: {request_result}")
    finally:
        health_task.cancel()
        try:
            await health_task
        except asyncio.CancelledError:
            pass  # expected: we just cancelled it

if __name__ == "__main__":
    # Run the load-balancer demo once.
    demo = example_load_balancer()
    asyncio.run(demo)

2. コスト最適化システム

# cost/optimization_engine.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
from dataclasses import dataclass
import numpy as np

@dataclass
class CostMetrics:
    """One usage/cost record for a single model, as analysed by the cost engine."""

    model_name: str  # model identifier, e.g. "gpt-5"
    input_tokens: int  # input token count for this record
    output_tokens: int  # output token count for this record
    request_count: int  # number of requests this record covers
    total_cost: float  # cost of this record; summed per day and per model in analysis
    timestamp: datetime  # when the usage occurred; its date() is the daily bucket

class CostOptimizationEngine:
    """Cost optimisation engine.

    Recommends a model for a task by trading off quality, speed and estimated
    cost, analyses recorded spend in ``cost_history`` and describes cost-control
    settings. Prices are per 1K tokens; quality/speed scores are relative (0..1).
    """

    def __init__(self, budget_config: Dict[str, Any]):
        """Create the engine.

        Args:
            budget_config: Budget settings. ``"monthly_limit"`` (default 10000
                when absent) drives the burn rate and budget warnings.
        """
        self.budget_config = budget_config
        # Spend records read by analyze_spending_patterns(). Nothing in this
        # class appends to it — presumably the caller populates it.
        self.cost_history: List[CostMetrics] = []

        # Model price table (per 1K tokens)
        self.pricing = {
            "gpt-5": {"input": 0.002, "output": 0.006},
            "gpt-4o": {"input": 0.0015, "output": 0.002},
            "claude-opus-4.1": {"input": 0.015, "output": 0.075},
            "claude-sonnet-3.5": {"input": 0.003, "output": 0.015},
            "gemini-2.0-flash": {"input": 0.001, "output": 0.002}
        }

        # Model performance table (relative quality / speed scores)
        self.model_performance = {
            "gpt-5": {"quality": 0.95, "speed": 0.8},
            "gpt-4o": {"quality": 0.9, "speed": 0.9},
            "claude-opus-4.1": {"quality": 0.92, "speed": 0.7},
            "claude-sonnet-3.5": {"quality": 0.88, "speed": 0.85},
            "gemini-2.0-flash": {"quality": 0.85, "speed": 0.95}
        }

    async def optimize_model_selection(
        self,
        task_requirements: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Recommend a model for a task based on its requirements.

        Args:
            task_requirements: All keys optional: ``complexity`` (default 0.5),
                ``quality_priority`` (default 0.7), ``speed_priority`` (default
                0.3), ``budget_limit`` (max estimated cost per task; ``None``
                means no limit) and ``type`` (task type for token estimation).

        Returns:
            ``{"recommended_model", "reasoning", "alternatives"}``, where
            ``alternatives`` holds every affordable model's scores best-first.
            Returns ``{"error": ...}`` when no model fits the budget.
        """
        complexity = task_requirements.get("complexity", 0.5)
        quality_priority = task_requirements.get("quality_priority", 0.7)
        speed_priority = task_requirements.get("speed_priority", 0.3)
        budget_limit = task_requirements.get("budget_limit")

        # Harder tasks put more weight on quality.
        quality_weight = quality_priority * (0.5 + complexity)
        # Cost gets whatever priority quality and speed leave over. Clamp at 0:
        # if the two priorities sum to more than 1, a negative weight would make
        # more expensive models score higher.
        cost_weight = max(0.0, 1.0 - quality_priority - speed_priority)

        model_scores = {}

        for model, performance in self.model_performance.items():
            estimated_tokens = self._estimate_token_usage(task_requirements, model)
            estimated_cost = self._calculate_cost(model, estimated_tokens)

            # Compare with None so an explicit budget of 0 is enforced instead
            # of being read as "no limit".
            if budget_limit is not None and estimated_cost > budget_limit:
                continue

            quality_score = performance["quality"]
            speed_score = performance["speed"]
            cost_score = 1.0 / (1.0 + estimated_cost)  # cheaper -> higher score

            total_score = (
                quality_score * quality_weight +
                speed_score * speed_priority +
                cost_score * cost_weight
            )

            model_scores[model] = {
                "score": total_score,
                "estimated_cost": estimated_cost,
                "estimated_tokens": estimated_tokens,
                "quality_score": quality_score,
                "speed_score": speed_score
            }

        if not model_scores:
            return {"error": "No models meet budget constraints"}

        best_model = max(model_scores.items(), key=lambda x: x[1]["score"])

        return {
            "recommended_model": best_model[0],
            "reasoning": best_model[1],
            "alternatives": dict(sorted(
                model_scores.items(),
                key=lambda x: x[1]["score"],
                reverse=True
            ))
        }

    def _estimate_token_usage(
        self,
        task_requirements: Dict[str, Any],
        model: str
    ) -> Dict[str, int]:
        """Estimate input/output token counts for a task on a given model.

        Base counts depend on the task ``type``, are scaled by
        ``0.5 + 1.5 * complexity`` and by a per-model verbosity factor.
        """
        complexity = task_requirements.get("complexity", 0.5)
        task_type = task_requirements.get("type", "general")

        # Baseline token counts per task type
        base_tokens = {
            "code_generation": {"input": 500, "output": 2000},
            "security_analysis": {"input": 1000, "output": 1500},
            "performance_testing": {"input": 800, "output": 1200},
            "general": {"input": 300, "output": 800}
        }

        base = base_tokens.get(task_type, base_tokens["general"])

        complexity_multiplier = 0.5 + complexity * 1.5

        # Per-model token-efficiency factor
        model_efficiency = {
            "gpt-5": 1.0,
            "gpt-4o": 1.1,
            "claude-opus-4.1": 0.9,
            "claude-sonnet-3.5": 1.0,
            "gemini-2.0-flash": 1.2
        }

        efficiency = model_efficiency.get(model, 1.0)

        return {
            "input": int(base["input"] * complexity_multiplier * efficiency),
            "output": int(base["output"] * complexity_multiplier * efficiency)
        }

    def _calculate_cost(self, model: str, token_usage: Dict[str, int]) -> float:
        """Return the cost of ``token_usage`` on ``model`` (0.0 if unpriced)."""
        if model not in self.pricing:
            return 0.0

        pricing = self.pricing[model]

        input_cost = (token_usage["input"] / 1000) * pricing["input"]
        output_cost = (token_usage["output"] / 1000) * pricing["output"]

        return input_cost + output_cost

    async def analyze_spending_patterns(self) -> Dict[str, Any]:
        """Summarise ``cost_history``: totals, trend, burn rate and suggestions.

        Returns:
            Analysis dict, or ``{"error": ...}`` when there is no history. The
            burn rate is ``inf`` when the monthly budget is 0 or negative but
            money was spent.
        """
        if not self.cost_history:
            return {"error": "No cost history available"}

        # Aggregate cost per calendar day and per model.
        daily_costs: Dict[str, float] = {}
        model_costs: Dict[str, float] = {}

        for metric in self.cost_history:
            date_key = metric.timestamp.date().isoformat()
            daily_costs[date_key] = daily_costs.get(date_key, 0.0) + metric.total_cost
            model_costs[metric.model_name] = (
                model_costs.get(metric.model_name, 0.0) + metric.total_cost
            )

        dates = sorted(daily_costs)
        costs = [daily_costs[date] for date in dates]

        if len(costs) >= 7:
            # Slope of a linear fit over the last 7 recorded days.
            # NOTE(review): the slope is "change in daily cost per recorded day"
            # (days without records are skipped) although it is reported as a
            # weekly trend — confirm the intended unit.
            recent = costs[-7:]
            week_trend = np.polyfit(range(len(recent)), recent, 1)[0]
        else:
            week_trend = 0.0

        total_spent = sum(costs)
        monthly_budget = self.budget_config.get("monthly_limit", 10000)
        # Guard against a zero/negative budget instead of dividing by it.
        if monthly_budget > 0:
            burn_rate = total_spent / monthly_budget
        else:
            burn_rate = float("inf") if total_spent > 0 else 0.0

        optimization_suggestions = await self._generate_optimization_suggestions(
            model_costs, total_spent, week_trend
        )

        return {
            "total_spent": total_spent,
            "daily_average": np.mean(costs),  # costs is non-empty here
            "weekly_trend": week_trend,
            "budget_burn_rate": burn_rate,
            "model_breakdown": model_costs,
            "optimization_suggestions": optimization_suggestions,
            "projected_monthly_cost": total_spent + (week_trend * 4)
        }

    async def _generate_optimization_suggestions(
        self,
        model_costs: Dict[str, float],
        total_spent: float,
        trend: float
    ) -> List[Dict[str, Any]]:
        """Build cost-saving suggestions from per-model spend and the trend."""
        suggestions = []

        # Flag a single model that dominates spend
        if model_costs:
            most_expensive = max(model_costs.items(), key=lambda x: x[1])

            if most_expensive[1] > total_spent * 0.4:  # more than 40% of spend
                suggestions.append({
                    "type": "model_substitution",
                    "priority": "high",
                    "description": f"{most_expensive[0]}の使用頻度が高すぎます",
                    "action": "より安価な代替モデルの検討",
                    "potential_savings": most_expensive[1] * 0.3
                })

        # Warn about rising spend
        if trend > 0:
            suggestions.append({
                "type": "spending_trend",
                "priority": "medium",
                "description": f"支出が週次で${trend:.2f}増加中",
                "action": "使用量の監視強化",
                "potential_savings": trend * 4  # monthly projection
            })

        # Budget warning once 80% of the monthly budget is used
        monthly_budget = self.budget_config.get("monthly_limit", 10000)
        if total_spent > monthly_budget * 0.8:
            suggestions.append({
                "type": "budget_warning",
                "priority": "critical",
                "description": "月次予算の80%を超過",
                "action": "緊急コスト削減措置",
                "potential_savings": total_spent - monthly_budget * 0.7
            })

        return suggestions

    async def implement_cost_controls(
        self,
        control_config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Describe the cost controls enabled in ``control_config``.

        Recognised flags: ``enable_rate_limiting``, ``enable_auto_downgrade`` and
        ``enable_budget_alerts``. The optional ``alert_thresholds`` key overrides
        the default budget-alert fractions ``[0.5, 0.8, 0.9, 1.0]``.
        """
        implemented_controls = []

        if control_config.get("enable_rate_limiting"):
            rate_limits = {
                "requests_per_hour": control_config.get("max_requests_per_hour", 1000),
                "tokens_per_hour": control_config.get("max_tokens_per_hour", 100000),
                "cost_per_hour": control_config.get("max_cost_per_hour", 100.0)
            }

            implemented_controls.append({
                "type": "rate_limiting",
                "config": rate_limits
            })

        # Automatic downgrade to a cheaper model in the same family
        if control_config.get("enable_auto_downgrade"):
            downgrade_rules = {
                "cost_threshold": control_config.get("downgrade_cost_threshold", 0.1),
                "model_mapping": {
                    "gpt-5": "gpt-4o",
                    "claude-opus-4.1": "claude-sonnet-3.5"
                }
            }

            implemented_controls.append({
                "type": "auto_downgrade",
                "config": downgrade_rules
            })

        if control_config.get("enable_budget_alerts"):
            # Budget fractions that trigger an alert (50/80/90/100% by default).
            alert_thresholds = list(
                control_config.get("alert_thresholds", [0.5, 0.8, 0.9, 1.0])
            )

            implemented_controls.append({
                "type": "budget_alerts",
                "config": {"thresholds": alert_thresholds}
            })

        return {
            "success": True,
            "implemented_controls": implemented_controls,
            "effective_date": datetime.now().isoformat()
        }

# Usage example
async def demonstrate_cost_optimization():
    """Walk through model selection and spend analysis with sample settings."""
    engine = CostOptimizationEngine({
        "daily_limit": 500.0,
        "monthly_limit": 15000.0,
        "alert_thresholds": [0.5, 0.8, 0.9],
    })

    # Choose a model for a fairly complex code-generation task capped at 0.5.
    selection = await engine.optimize_model_selection({
        "type": "code_generation",
        "complexity": 0.7,
        "quality_priority": 0.8,
        "speed_priority": 0.2,
        "budget_limit": 0.5,
    })
    print(f"Optimization result: {selection}")

    # Spend analysis; no cost history is recorded in this demo.
    analysis = await engine.analyze_spending_patterns()
    print(f"Spending analysis: {analysis}")

if __name__ == "__main__":
    # Run the cost-optimisation demo once.
    demo = demonstrate_cost_optimization()
    asyncio.run(demo)

デプロイメント自動化

1. Kubernetes Helm Chart

# helm/ai-agents/Chart.yaml
# Chart metadata for the AI agent platform (apiVersion v2 = Helm 3 chart format).
apiVersion: v2
name: ai-agents
description: Enterprise AI Agent Platform
type: application
# Version of the chart itself; bump on every chart change.
version: 1.0.0
# Version of the application packaged by this chart.
appVersion: "1.0.0"
# NOTE(review): values.yaml has a `redis:` section (architecture/auth keys in
# the Bitnami chart's format) but no `dependencies:` entry is declared here —
# presumably Redis is meant to be a subchart; confirm and declare it if so.

---
# helm/ai-agents/values.yaml
# Default values for the ai-agents chart. CI layers values-staging.yaml /
# values-production.yaml on top and sets global.imageTag at deploy time.
global:
  imageRegistry: "your-registry.com"
  imagePullSecrets:
    - regcred

agentManager:
  replicaCount: 3
  image:
    repository: ai-agent-manager
    tag: "latest"
    # With a mutable "latest" tag, IfNotPresent lets nodes keep running a
    # stale cached image; always pull until an immutable tag is pinned.
    pullPolicy: Always

  service:
    type: ClusterIP
    port: 8080

  resources:
    limits:
      cpu: 1000m
      memory: 2Gi
    requests:
      cpu: 500m
      memory: 1Gi

  autoscaling:
    enabled: true
    minReplicas: 3
    maxReplicas: 10
    targetCPUUtilizationPercentage: 70

loadBalancer:
  replicaCount: 2
  image:
    repository: ai-load-balancer
    tag: "latest"

  service:
    type: LoadBalancer
    port: 80
    targetPort: 8080

agents:
  codeGenerator:
    enabled: true
    replicaCount: 2
    image:
      repository: ai-code-generator
      tag: "latest"
    resources:
      limits:
        cpu: 2000m
        memory: 4Gi
      requests:
        cpu: 1000m
        memory: 2Gi

  securityScanner:
    enabled: true
    replicaCount: 1
    image:
      repository: ai-security-scanner
      tag: "latest"
    resources:
      limits:
        cpu: 1000m
        memory: 2Gi
      requests:
        cpu: 500m
        memory: 1Gi

redis:
  enabled: true
  architecture: replication
  auth:
    enabled: true
    # Do not set `password` here. GitHub Actions `${{ secrets.* }}` expressions
    # are not expanded inside Helm values files, so the literal text would
    # become the Redis password and be committed to the repository.
    # Read the password from a pre-created Secret instead.
    # NOTE(review): placeholder Secret name — create it before installing.
    existingSecret: ai-agents-redis
    existingSecretPasswordKey: redis-password

monitoring:
  prometheus:
    enabled: true
  grafana:
    enabled: true
  jaeger:
    enabled: true

security:
  networkPolicies:
    enabled: true
  # PodSecurityPolicy was removed in Kubernetes 1.25, so rendering one fails on
  # current clusters; enforce pod security with Pod Security Admission instead.
  podSecurityPolicy:
    enabled: false
  serviceAccount:
    create: true
    name: ai-agent-sa

---
# helm/ai-agents/templates/deployment.yaml
# Agent-manager Deployment.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "ai-agents.fullname" . }}-manager
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
    component: manager
spec:
  # When the HPA manages this Deployment, omit `replicas` so that every
  # `helm upgrade` does not reset the replica count the HPA has chosen.
  {{- if not .Values.agentManager.autoscaling.enabled }}
  replicas: {{ .Values.agentManager.replicaCount }}
  {{- end }}
  selector:
    matchLabels:
      {{- include "ai-agents.selectorLabels" . | nindent 6 }}
      component: manager
  template:
    metadata:
      labels:
        {{- include "ai-agents.selectorLabels" . | nindent 8 }}
        component: manager
    spec:
      serviceAccountName: {{ include "ai-agents.serviceAccountName" . }}
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 2000
      containers:
      - name: manager
        # CI deploys with `--set global.imageTag=<git sha>`; fall back to the
        # component's own tag when no global tag is supplied.
        image: "{{ .Values.global.imageRegistry }}/{{ .Values.agentManager.image.repository }}:{{ .Values.global.imageTag | default .Values.agentManager.image.tag }}"
        imagePullPolicy: {{ .Values.agentManager.image.pullPolicy }}
        ports:
        - name: http
          containerPort: 8080
          protocol: TCP
        - name: metrics
          containerPort: 9090
          protocol: TCP
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
        env:
        - name: REDIS_URL
          value: "redis://{{ include "ai-agents.redis.fullname" . }}:6379"
        - name: REDIS_PASSWORD
          valueFrom:
            secretKeyRef:
              name: {{ include "ai-agents.redis.secretName" . }}
              key: redis-password
        - name: KUBERNETES_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        resources:
          {{- toYaml .Values.agentManager.resources | nindent 10 }}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          # NOTE(review): with a read-only root filesystem, anything writing to
          # /tmp (e.g. Python's tempfile) needs an emptyDir mounted there —
          # confirm the manager does not write to disk.
          readOnlyRootFilesystem: true

---
# helm/ai-agents/templates/hpa.yaml
# HorizontalPodAutoscaler for the agent-manager Deployment; rendered only when
# agentManager.autoscaling.enabled is true.
{{- if .Values.agentManager.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: {{ include "ai-agents.fullname" . }}-manager-hpa
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
spec:
  # Must match the Deployment name rendered by deployment.yaml.
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "ai-agents.fullname" . }}-manager
  minReplicas: {{ .Values.agentManager.autoscaling.minReplicas }}
  maxReplicas: {{ .Values.agentManager.autoscaling.maxReplicas }}
  metrics:
  # CPU utilisation target, as a percentage of the containers' CPU requests.
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: {{ .Values.agentManager.autoscaling.targetCPUUtilizationPercentage }}
  # Memory utilisation target; hard-coded at 80% rather than read from values.
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  # Per-pod custom metric: aim for an average queue length of 10 per pod.
  # NOTE(review): Pods metrics come from the custom metrics API, so this needs
  # an adapter (e.g. prometheus-adapter) exposing ai_agent_queue_length —
  # confirm one is installed, otherwise the HPA reports this metric as unavailable.
  - type: Pods
    pods:
      metric:
        name: ai_agent_queue_length
      target:
        type: AverageValue
        averageValue: "10"
{{- end }}

---
# helm/ai-agents/templates/networkpolicy.yaml
# Restricts ingress and egress for every pod matched by this release's selector labels.
{{- if .Values.security.networkPolicies.enabled }}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: {{ include "ai-agents.fullname" . }}-network-policy
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
spec:
  podSelector:
    matchLabels:
      {{- include "ai-agents.selectorLabels" . | nindent 6 }}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  # HTTP (8080) and metrics (9090) from this release's pods OR from any pod in
  # the monitoring namespace (separate `from` items are OR-ed).
  - from:
    - podSelector:
        matchLabels:
          {{- include "ai-agents.selectorLabels" . | nindent 10 }}
    - namespaceSelector:
        matchLabels:
          # Namespaces carry no `name` label by default; Kubernetes (1.21+)
          # sets `kubernetes.io/metadata.name` on every namespace.
          kubernetes.io/metadata.name: monitoring
    ports:
    - protocol: TCP
      port: 8080
    - protocol: TCP
      port: 9090
  egress:
  # Redis
  - to:
    - podSelector:
        matchLabels:
          app.kubernetes.io/name: redis
    ports:
    - protocol: TCP
      port: 6379
  # Traffic between this release's own pods on the service port 8080 (e.g.
  # manager -> agents); without this rule egress only allowed Redis, HTTPS
  # and DNS, so those in-cluster calls were dropped.
  - to:
    - podSelector:
        matchLabels:
          {{- include "ai-agents.selectorLabels" . | nindent 10 }}
    ports:
    - protocol: TCP
      port: 8080
  # `to: []` matches every destination.
  - to: []
    ports:
    - protocol: TCP
      port: 443  # HTTPS for external API calls
    - protocol: TCP
      port: 53   # DNS
    - protocol: UDP
      port: 53   # DNS
{{- end }}

2. CI/CD パイプライン

# .github/workflows/ai-agents-cicd.yml
name: AI Agents CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'ai-agents/**'
      - 'helm/ai-agents/**'
      - '.github/workflows/ai-agents-cicd.yml'
  pull_request:
    branches: [main]
    paths:
      - 'ai-agents/**'
      - 'helm/ai-agents/**'

env:
  REGISTRY: ghcr.io
  IMAGE_PREFIX: ${{ github.repository }}/ai-agents

# Least-privilege GITHUB_TOKEN by default; jobs that need more request it.
permissions:
  contents: read

jobs:
  security-scan:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      security-events: write  # required to upload SARIF results
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Run Trivy vulnerability scanner
        # NOTE(review): pin this action to a release tag or commit SHA rather
        # than the moving @master branch.
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: './ai-agents'
          format: 'sarif'
          output: 'trivy-results.sarif'

      - name: Upload Trivy scan results
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: 'trivy-results.sarif'

  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.11', '3.12']

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          cd ai-agents
          pip install -r requirements.txt
          pip install pytest pytest-cov pytest-asyncio

      - name: Run unit tests
        run: |
          cd ai-agents
          pytest tests/ --cov=src --cov-report=xml --cov-report=html

      - name: Upload coverage reports
        uses: codecov/codecov-action@v3
        with:
          file: ./ai-agents/coverage.xml
          flags: unittests
          name: codecov-umbrella

  build:
    needs: [security-scan, test]
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write  # push images to GHCR
      id-token: write  # OIDC token for keyless cosign signing
    strategy:
      matrix:
        component: [manager, load-balancer, code-generator, security-scanner]

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      # QEMU is needed to build the linux/arm64 image on an amd64 runner.
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to Container Registry
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      # - The branch-prefixed sha tag is skipped on PRs: there github.ref_name is
      #   "<n>/merge", and "/" is not allowed in an image tag.
      # - `type=sha,prefix=,format=long` publishes the full commit SHA as a tag,
      #   which the deploy jobs pass as global.imageTag.
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_PREFIX }}/${{ matrix.component }}
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix=${{ github.ref_name }}-,enable=${{ github.event_name != 'pull_request' }}
            type=sha,prefix=,format=long
            type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}

      - name: Build and push Docker image
        id: build
        uses: docker/build-push-action@v5
        with:
          context: ./ai-agents/${{ matrix.component }}
          platforms: linux/amd64,linux/arm64
          # Pull requests only check that the image builds; never publish them.
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Install cosign
        if: github.event_name != 'pull_request'
        uses: sigstore/cosign-installer@v3

      - name: Sign container image
        if: github.event_name != 'pull_request'
        run: |
          # Sign the pushed digest so the signature covers exactly this image
          # (tags are mutable and can be moved after signing).
          cosign sign --yes ${{ env.REGISTRY }}/${{ env.IMAGE_PREFIX }}/${{ matrix.component }}@${{ steps.build.outputs.digest }}

  helm-test:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Helm
        uses: azure/setup-helm@v3
        with:
          version: 'v3.12.0'

      - name: Lint Helm chart
        run: |
          helm lint helm/ai-agents

      - name: Template Helm chart
        run: |
          helm template ai-agents helm/ai-agents \
            --values helm/ai-agents/values.yaml \
            --output-dir /tmp/helm-output

      - name: Validate Kubernetes manifests
        # NOTE(review): `kubectl apply --dry-run=client` still contacts an API
        # server for resource discovery; with no cluster configured on the
        # runner this likely fails — consider an offline schema validator.
        run: |
          kubectl --dry-run=client apply -f /tmp/helm-output/ai-agents/templates/

  deploy-staging:
    if: github.ref == 'refs/heads/develop'
    needs: [build, helm-test]
    runs-on: ubuntu-latest
    environment: staging

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-west-2

      - name: Update kubeconfig
        run: |
          aws eks update-kubeconfig --name ai-agents-staging

      - name: Deploy to staging
        run: |
          helm upgrade --install ai-agents-staging helm/ai-agents \
            --namespace ai-agents-staging \
            --create-namespace \
            --values helm/ai-agents/values-staging.yaml \
            --set global.imageTag=${{ github.sha }} \
            --wait --timeout=600s

      - name: Run integration tests
        run: |
          kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=ai-agents \
            -n ai-agents-staging --timeout=300s
          ./scripts/integration-tests.sh staging

  deploy-production:
    if: github.ref == 'refs/heads/main'
    needs: [build, helm-test]
    runs-on: ubuntu-latest
    environment: production

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_PROD }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_PROD }}
          aws-region: us-west-2

      - name: Update kubeconfig
        run: |
          aws eks update-kubeconfig --name ai-agents-production

      - name: Blue-Green Deployment
        id: bluegreen
        run: |
          # The live color is whatever subset the VirtualService routes to now.
          # Deploy the new version to the *other* color so live traffic is never
          # served by a release that has not passed the checks below.
          LIVE_COLOR=$(kubectl get virtualservice ai-agents-vs -n ai-agents-prod \
            -o jsonpath='{.spec.http[0].route[0].destination.subset}' 2>/dev/null || true)
          if [ "$LIVE_COLOR" = "blue" ]; then TARGET_COLOR=green; else TARGET_COLOR=blue; fi
          echo "live=${LIVE_COLOR}" >> "$GITHUB_OUTPUT"
          echo "target=${TARGET_COLOR}" >> "$GITHUB_OUTPUT"

          helm upgrade --install "ai-agents-${TARGET_COLOR}" helm/ai-agents \
            --namespace "ai-agents-${TARGET_COLOR}" \
            --create-namespace \
            --values helm/ai-agents/values-production.yaml \
            --set global.imageTag=${{ github.sha }} \
            --set service.type=ClusterIP \
            --wait --timeout=600s

      - name: Health check and smoke tests
        run: |
          ./scripts/health-check.sh ${{ steps.bluegreen.outputs.target }}
          ./scripts/smoke-tests.sh ${{ steps.bluegreen.outputs.target }}

      - name: Switch traffic to the new color
        run: |
          # Update the Istio VirtualService to send traffic to the new release.
          kubectl patch virtualservice ai-agents-vs -n ai-agents-prod \
            --type='json' \
            -p='[{"op": "replace", "path": "/spec/http/0/route/0/destination/subset", "value": "${{ steps.bluegreen.outputs.target }}"}]'

      - name: Monitor deployment
        run: |
          ./scripts/monitor-deployment.sh 300  # watch for 5 minutes

      - name: Cleanup old version
        if: success() && steps.bluegreen.outputs.live != ''
        run: |
          helm uninstall "ai-agents-${{ steps.bluegreen.outputs.live }}" \
            -n "ai-agents-${{ steps.bluegreen.outputs.live }}" || true

  notify:
    # Pull requests never deploy, so there is nothing to report for them.
    if: always() && github.event_name == 'push'
    needs: [deploy-staging, deploy-production]
    runs-on: ubuntu-latest

    steps:
      - name: Notify Slack
        uses: 8398a7/action-slack@v3
        with:
          # `job.status` would be this notify job's own (successful) status;
          # report the outcome of the deploy jobs instead.
          status: ${{ (contains(needs.*.result, 'failure') && 'failure') || (contains(needs.*.result, 'cancelled') && 'cancelled') || 'success' }}
          channel: '#ai-agents-deployments'
        env:
          # action-slack v3 reads the webhook from this environment variable.
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

運用手順書とベストプラクティス

1. 運用チェックリスト

# AIエージェント運用チェックリスト

## 日次運用チェック

### システムヘルス確認
- [ ] 全エージェントの稼働状況確認
- [ ] レスポンス時間監視(目標: 95%ile < 5秒)
- [ ] エラー率監視(目標: < 1%)
- [ ] リソース使用率確認(CPU < 80%, Memory < 85%)

### コスト監視
- [ ] 日次コスト確認
- [ ] 予算消化率チェック
- [ ] 異常な使用量の調査

### セキュリティ確認
- [ ] 不審なアクセスログの確認
- [ ] 認証失敗回数の監視
- [ ] シークレットローテーション状況

## 週次運用チェック

### パフォーマンス分析
- [ ] 週次パフォーマンスレポート確認
- [ ] スケーリング履歴の分析
- [ ] ボトルネック特定と改善計画

### 容量計画
- [ ] リソース使用傾向の分析
- [ ] 来月の容量予測
- [ ] スケーリング閾値の調整

### 更新管理
- [ ] セキュリティパッチの適用
- [ ] 依存関係の更新確認
- [ ] 設定変更の反映

## 月次運用チェック

### 包括的レビュー
- [ ] 月次運用レポート作成
- [ ] SLA達成状況確認
- [ ] インシデント分析とアクション項目
- [ ] 運用プロセス改善点の特定

### 災害復旧テスト
- [ ] バックアップ復旧テスト
- [ ] DR環境での動作確認
- [ ] 復旧手順書の更新

2. トラブルシューティングガイド

# troubleshooting/diagnostic_tools.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
import aiohttp
import subprocess

class DiagnosticTools:
    """Diagnostic toolkit for the AI agent platform.

    Runs a fixed set of health checks (connectivity, performance, resources,
    security, costs) and aggregates them into an overall health rating plus a
    list of recommended actions.

    Recognised ``config`` keys (a check only needs the keys it uses):
        agent_manager_url, redis_url, prometheus_url: endpoints probed by the
            connectivity and performance checks.
        namespace: Kubernetes namespace inspected via kubectl (default "ai-agents").
        daily_budget: daily spend budget for the cost check (default 500.0).
        http_timeout, kubectl_timeout: per-call timeouts in seconds (10 / 30).
        max_response_time, max_error_rate: performance thresholds (10.0 / 0.05).
        max_node_cpu_percent, max_node_memory_percent: node thresholds (80 / 85).
        secret_warning_days, secret_critical_days: secret age limits (90 / 180).
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # Ordered mapping check name -> coroutine function; run_full_diagnostic
        # executes and reports the checks in this order.
        self.checks = {
            "connectivity": self._check_connectivity,
            "performance": self._check_performance,
            "resources": self._check_resources,
            "security": self._check_security,
            "costs": self._check_costs,
        }

    async def run_full_diagnostic(self) -> Dict[str, Any]:
        """Run every registered check and build the diagnostic report.

        A check that raises is recorded as ``{"status": "error", "error": ...}``
        so that it counts towards the overall health instead of being ignored.

        Returns:
            Dict with ``timestamp``, ``overall_health``, ``detailed_results``
            (one entry per check) and ``recommendations``.
        """
        self._log("Starting comprehensive diagnostic...")
        results: Dict[str, Any] = {}

        for check_name, check_func in self.checks.items():
            self._log(f"Running {check_name} check...")
            try:
                results[check_name] = await check_func()
            except Exception as e:  # boundary: one broken check must not abort the run
                self._log(f"❌ {check_name} check failed: {e}")
                results[check_name] = {"status": "error", "error": str(e) or type(e).__name__}
            else:
                self._log(f"✅ {check_name} check completed")

        return {
            "timestamp": datetime.now().isoformat(),
            "overall_health": self._calculate_overall_health(results),
            "detailed_results": results,
            "recommendations": self._generate_recommendations(results),
        }

    @staticmethod
    def _log(message: str) -> None:
        """Write a progress message to stderr so stdout carries only the report."""
        import sys

        print(message, file=sys.stderr)

    async def _check_connectivity(self) -> Dict[str, Any]:
        """Probe each dependency; "healthy" only if every endpoint answered correctly."""
        timeout = self.config.get("http_timeout", 10)
        agent_manager_url = f"{self.config['agent_manager_url']}/health"
        prometheus_url = f"{self.config['prometheus_url']}/api/v1/targets"

        # One shared session for all HTTP probes instead of one per endpoint.
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
            results = [
                await self._probe_http(session, "Agent Manager", agent_manager_url),
                # redis:// is not HTTP; speak the Redis protocol directly.
                await self._probe_redis("Redis", self.config["redis_url"], timeout),
                await self._probe_http(session, "Prometheus", prometheus_url),
            ]

        return {
            "status": "healthy" if all(r["status"] == "healthy" for r in results) else "degraded",
            "endpoints": results,
        }

    @staticmethod
    async def _probe_http(session: "aiohttp.ClientSession", name: str, url: str) -> Dict[str, Any]:
        """GET *url*; report "healthy" only on HTTP 200, "error" if the request fails."""
        loop = asyncio.get_running_loop()
        start = loop.time()  # monotonic clock: immune to wall-clock adjustments
        try:
            async with session.get(url) as response:
                return {
                    "endpoint": name,
                    "status": "healthy" if response.status == 200 else "unhealthy",
                    "response_time": loop.time() - start,
                    "status_code": response.status,
                }
        except Exception as e:  # connection errors, timeouts, malformed URLs
            return {"endpoint": name, "status": "error", "error": str(e) or type(e).__name__}

    async def _probe_redis(self, name: str, url: str, timeout: float) -> Dict[str, Any]:
        """Send PING over the Redis protocol and expect a ``+PONG`` reply.

        Connects over TCP (TLS for ``rediss://``). If the URL carries
        credentials, AUTH is sent first; an error reply to AUTH is reported
        as the probe's reply. The whole exchange is bounded by *timeout*.
        """
        from urllib.parse import unquote, urlsplit

        loop = asyncio.get_running_loop()
        start = loop.time()
        try:
            parts = urlsplit(url)
            host = parts.hostname or "localhost"
            port = parts.port or 6379
            commands = []
            if parts.password:
                credentials = [unquote(parts.username)] if parts.username else []
                credentials.append(unquote(parts.password))
                commands.append(["AUTH", *credentials])
            commands.append(["PING"])

            async def exchange() -> bytes:
                reader, writer = await asyncio.open_connection(
                    host, port, ssl=parts.scheme == "rediss"
                )
                try:
                    reply = b""
                    for command in commands:
                        writer.write(self._encode_redis_command(*command))
                        await writer.drain()
                        reply = await reader.readline()
                        if not reply.startswith(b"+"):
                            break  # error reply (e.g. rejected AUTH): report it as-is
                    return reply
                finally:
                    writer.close()

            reply = await asyncio.wait_for(exchange(), timeout)
        except Exception as e:  # DNS/connection errors, timeouts, bad URL/port
            return {"endpoint": name, "status": "error", "error": str(e) or type(e).__name__}

        return {
            "endpoint": name,
            "status": "healthy" if reply.startswith(b"+PONG") else "unhealthy",
            "response_time": loop.time() - start,
            "reply": reply.decode("utf-8", errors="replace").strip(),
        }

    @staticmethod
    def _encode_redis_command(*args: str) -> bytes:
        """Encode a command as a RESP array of bulk strings."""
        chunks = [f"*{len(args)}\r\n".encode()]
        for arg in args:
            data = arg.encode("utf-8")
            chunks.append(f"${len(data)}\r\n".encode() + data + b"\r\n")
        return b"".join(chunks)

    async def _check_performance(self) -> Dict[str, Any]:
        """Query Prometheus for latency, error ratio, throughput and live agents.

        Status is "unhealthy" when the error ratio exceeds ``max_error_rate``.
        It is "degraded" when latency exceeds ``max_response_time`` or any
        query failed, and "healthy" otherwise. ``issues`` lists which
        conditions fired.
        """
        import math

        max_response_time = self.config.get("max_response_time", 10.0)
        max_error_rate = self.config.get("max_error_rate", 0.05)

        # Aggregate with sum(): a bare rate() returns one series per label set,
        # and only the first series would be read. error_rate is errors / all
        # requests, a ratio comparable with max_error_rate (not errors/second).
        metrics_queries = {
            "avg_response_time": "avg(ai_agent_response_time_seconds)",
            "error_rate": (
                "sum(rate(ai_agent_requests_total{status='error'}[5m]))"
                " / sum(rate(ai_agent_requests_total[5m]))"
            ),
            "throughput": "sum(rate(ai_agent_requests_total[5m]))",
            # up is 1 for a successful scrape; count() would also count down targets.
            "active_agents": "sum(up{job='ai-agents'})",
        }

        results: Dict[str, Any] = {}
        failed_queries: List[str] = []
        timeout = aiohttp.ClientTimeout(total=self.config.get("http_timeout", 10))
        query_url = f"{self.config['prometheus_url']}/api/v1/query"

        async with aiohttp.ClientSession(timeout=timeout) as session:
            for metric_name, query in metrics_queries.items():
                try:
                    async with session.get(query_url, params={"query": query}) as response:
                        data = await response.json()
                        http_status = response.status
                    if data.get("status") != "success":
                        raise ValueError(data.get("error") or f"Prometheus returned HTTP {http_status}")
                    series = data["data"]["result"]
                    value = float(series[0]["value"][1]) if series else None
                    # 0/0 (no traffic) comes back as NaN; report it as "no data".
                    results[metric_name] = value if value is not None and math.isfinite(value) else None
                except Exception as e:
                    results[f"{metric_name}_error"] = str(e) or type(e).__name__
                    failed_queries.append(metric_name)

        issues: List[str] = []
        # `or 0`: a None value ("no data") must not be compared with a number.
        if (results.get("avg_response_time") or 0) > max_response_time:
            issues.append("response_time")
        if (results.get("error_rate") or 0) > max_error_rate:
            issues.append("error_rate")
        if failed_queries:
            issues.append("metrics_unavailable")

        if "error_rate" in issues:
            performance_status = "unhealthy"
        elif issues:
            performance_status = "degraded"
        else:
            performance_status = "healthy"

        return {
            "status": performance_status,
            "metrics": results,
            "issues": issues,
            "thresholds": {
                "max_response_time": max_response_time,
                "max_error_rate": max_error_rate,
            },
        }

    async def _run_kubectl(self, *args: str) -> str:
        """Run ``kubectl *args`` in a worker thread and return its stdout.

        Raises:
            subprocess.CalledProcessError: kubectl exited non-zero.
            subprocess.TimeoutExpired: kubectl exceeded ``kubectl_timeout``.
            FileNotFoundError: kubectl is not on PATH.
        """
        timeout = self.config.get("kubectl_timeout", 30)
        loop = asyncio.get_running_loop()
        # run_in_executor keeps the blocking subprocess call off the event loop.
        completed = await loop.run_in_executor(
            None,
            lambda: subprocess.run(
                ["kubectl", *args],
                capture_output=True,
                text=True,
                check=True,
                timeout=timeout,
            ),
        )
        return completed.stdout

    async def _check_resources(self) -> Dict[str, Any]:
        """Report pod/node usage from ``kubectl top`` and flag overloaded nodes.

        Status is "warning" if any node's CPU% or memory% exceeds the configured
        threshold, and "error" if kubectl could not be run.
        """
        namespace = self.config.get("namespace", "ai-agents")
        max_cpu = self.config.get("max_node_cpu_percent", 80.0)
        max_memory = self.config.get("max_node_memory_percent", 85.0)

        try:
            pod_output = await self._run_kubectl("top", "pods", "-n", namespace, "--no-headers")
            node_output = await self._run_kubectl("top", "nodes", "--no-headers")
        except (subprocess.SubprocessError, OSError) as e:
            return {
                "status": "error",
                "error": f"Failed to get resource information: {e}",
            }

        # kubectl top pods columns: NAME CPU(cores) MEMORY(bytes)
        pod_resources = [
            {"pod": cols[0], "cpu": cols[1], "memory": cols[2]}
            for cols in (line.split() for line in pod_output.splitlines())
            if len(cols) >= 3
        ]
        # kubectl top nodes columns: NAME CPU(cores) CPU% MEMORY(bytes) MEMORY%
        node_resources = [
            {
                "node": cols[0],
                "cpu": cols[1],
                "cpu_percent": cols[2],
                "memory": cols[3],
                "memory_percent": cols[4],
            }
            for cols in (line.split() for line in node_output.splitlines())
            if len(cols) >= 5
        ]

        overloaded_nodes = [
            node["node"]
            for node in node_resources
            if (self._parse_percent(node["cpu_percent"]) or 0) > max_cpu
            or (self._parse_percent(node["memory_percent"]) or 0) > max_memory
        ]

        return {
            "status": "warning" if overloaded_nodes else "healthy",
            "pods": pod_resources,
            "nodes": node_resources,
            "overloaded_nodes": overloaded_nodes,
            "thresholds": {
                "max_node_cpu_percent": max_cpu,
                "max_node_memory_percent": max_memory,
            },
        }

    @staticmethod
    def _parse_percent(value: str) -> "float | None":
        """Parse kubectl's ``"42%"``; return None for ``<unknown>`` or other non-numbers."""
        try:
            return float(value.rstrip("%"))
        except ValueError:
            return None

    async def _check_security(self) -> Dict[str, Any]:
        """Check secret ages and that at least one NetworkPolicy exists.

        Overall status is the most severe of the sub-checks:
        critical > error > warning > healthy.
        """
        namespace = self.config.get("namespace", "ai-agents")
        warning_days = self.config.get("secret_warning_days", 90)
        critical_days = self.config.get("secret_critical_days", 180)
        security_checks: List[Dict[str, Any]] = []

        # Secret age check
        try:
            secrets_data = json.loads(
                await self._run_kubectl("get", "secrets", "-n", namespace, "-o", "json")
            )
            for secret in secrets_data["items"]:
                # Helm keeps release history in secrets of this type; they are
                # bookkeeping records, not credentials, so rotation does not apply.
                if secret.get("type") == "helm.sh/release.v1":
                    continue
                created = datetime.fromisoformat(
                    secret["metadata"]["creationTimestamp"].replace("Z", "+00:00")
                )
                age_days = (datetime.now(created.tzinfo) - created).days

                if age_days > critical_days:
                    status = "critical"
                elif age_days > warning_days:
                    status = "warning"
                else:
                    status = "healthy"

                security_checks.append({
                    "type": "secret_age",
                    "name": secret["metadata"]["name"],
                    "age_days": age_days,
                    "status": status,
                })
        except Exception as e:
            security_checks.append({
                "type": "secret_check_error",
                "status": "error",
                "error": str(e) or type(e).__name__,
            })

        # NetworkPolicy check. kubectl prints "No resources found" to stderr, not
        # stdout, so inspect the JSON item list instead of the text output.
        try:
            policies = json.loads(
                await self._run_kubectl("get", "networkpolicies", "-n", namespace, "-o", "json")
            )
            if policies.get("items"):
                security_checks.append({
                    "type": "network_policy",
                    "status": "healthy",
                    "message": "Network policies configured",
                })
            else:
                security_checks.append({
                    "type": "network_policy",
                    "status": "warning",
                    "message": "No network policies found",
                })
        except Exception as e:
            security_checks.append({
                "type": "network_policy_error",
                "status": "error",
                "error": str(e) or type(e).__name__,
            })

        statuses = {check.get("status") for check in security_checks}
        overall_status = next(
            (level for level in ("critical", "error", "warning") if level in statuses),
            "healthy",
        )

        return {
            "status": overall_status,
            "checks": security_checks,
        }

    async def _check_costs(self) -> Dict[str, Any]:
        """Compare today's spend with ``daily_budget`` (>80% warning, >100% critical)."""
        daily_budget = self.config.get("daily_budget", 500.0)
        current_spend = await self._get_current_daily_spend()

        if daily_budget <= 0:
            # Utilisation is undefined without a positive budget: report a config
            # error instead of raising ZeroDivisionError or emitting inf/negatives.
            return {
                "status": "error",
                "error": "daily_budget must be a positive number",
                "daily_budget": daily_budget,
                "current_spend": current_spend,
            }

        budget_utilization = current_spend / daily_budget

        if budget_utilization > 1.0:
            status = "critical"
        elif budget_utilization > 0.8:
            status = "warning"
        else:
            status = "healthy"

        return {
            "status": status,
            "daily_budget": daily_budget,
            "current_spend": current_spend,
            "budget_utilization": budget_utilization,
            # Naive projection: today's spend so far multiplied by a 30-day month.
            "projected_monthly": current_spend * 30,
        }

    async def _get_current_daily_spend(self) -> float:
        """Return today's spend so far.

        Placeholder: always returns a fixed dummy value. A real implementation
        would read a billing API or cost metrics.
        """
        return 423.50  # dummy value

    def _calculate_overall_health(self, results: Dict[str, Any]) -> str:
        """Collapse per-check statuses into critical / unhealthy / warning / healthy."""
        statuses = set()
        for check_result in results.values():
            if not isinstance(check_result, dict):
                continue
            # A result carrying only an "error" key is treated as a failed check.
            statuses.add(
                check_result.get("status", "error" if "error" in check_result else None)
            )

        if statuses & {"error", "critical"}:
            return "critical"
        if "unhealthy" in statuses:
            return "unhealthy"
        if statuses & {"warning", "degraded"}:
            return "warning"
        return "healthy"

    def _generate_recommendations(self, results: Dict[str, Any]) -> List[str]:
        """Map check outcomes to recommended follow-up actions (Japanese text)."""
        recommendations: List[str] = []

        # Checks that could not run at all
        failed_checks = [
            name for name, result in results.items()
            if isinstance(result, dict) and result.get("status") == "error"
        ]
        if failed_checks:
            recommendations.append(
                f"次のチェックを実行できませんでした: {', '.join(failed_checks)}。"
                "診断ツールの権限と接続設定を確認してください"
            )

        # Connectivity
        if results.get("connectivity", {}).get("status") == "degraded":
            recommendations.append("到達できない依存サービスがあります。ネットワーク設定と各サービスの稼働状況を確認してください")

        # Performance
        perf_result = results.get("performance", {})
        perf_issues = perf_result.get("issues")
        if perf_issues is None:  # result without issue codes: fall back to its status
            perf_issues = ["response_time"] if perf_result.get("status") == "degraded" else []
        if "response_time" in perf_issues:
            recommendations.append("パフォーマンス改善のためのスケールアップを検討してください")
        if "error_rate" in perf_issues:
            recommendations.append("エラー率が閾値を超えています。直近のデプロイやエラーログを調査してください")
        if "metrics_unavailable" in perf_issues:
            recommendations.append("Prometheusからメトリクスを取得できませんでした。監視基盤の状態を確認してください")

        # Resources
        if results.get("resources", {}).get("status") == "warning":
            recommendations.append("CPU/メモリ使用率が閾値を超えているノードがあります。ノード追加やリソース制限の見直しを検討してください")

        # Security
        security_result = results.get("security", {})
        if security_result.get("status") in ["warning", "critical"]:
            recommendations.append("シークレットのローテーションとセキュリティ設定の確認を行ってください")

        # Costs
        cost_result = results.get("costs", {})
        if cost_result.get("status") in ["warning", "critical"]:
            recommendations.append("コスト最適化のためのモデル選択やレート制限の見直しを行ってください")

        return recommendations

# 使用例とコマンドラインツール
async def main():
    """Run the full diagnostic and print the report as JSON on stdout.

    Endpoint URLs and the daily budget default to the in-cluster values below.
    They can be overridden with the environment variables AGENT_MANAGER_URL,
    REDIS_URL, PROMETHEUS_URL and DAILY_BUDGET, e.g. when running from outside
    the cluster.
    """
    import os

    config = {
        "agent_manager_url": os.environ.get(
            "AGENT_MANAGER_URL", "http://agent-manager.ai-agents.svc.cluster.local:8080"
        ),
        "redis_url": os.environ.get(
            "REDIS_URL", "redis://redis.ai-agents.svc.cluster.local:6379"
        ),
        "prometheus_url": os.environ.get(
            "PROMETHEUS_URL", "http://prometheus.monitoring.svc.cluster.local:9090"
        ),
        "daily_budget": float(os.environ.get("DAILY_BUDGET", "500.0")),
    }

    diagnostic = DiagnosticTools(config)
    results = await diagnostic.run_full_diagnostic()

    # ensure_ascii=False keeps the Japanese recommendations readable instead of
    # emitting \uXXXX escapes.
    print(json.dumps(results, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    asyncio.run(main())

まとめ

本記事では、2025年8月最新のAIエージェント開発ツールの本番環境での運用に必要な包括的なアーキテクチャとベストプラクティスを詳しく解説しました。

重要な実装ポイント

  • エンタープライズアーキテクチャ: スケーラブルで信頼性の高いシステム設計
  • ゼロトラストセキュリティ: 多層防御による強固なセキュリティ確保
  • 包括的監視: Observabilityによる運用状況の完全な可視化
  • コスト最適化: 自動化されたコスト管理と予算制御

段階的導入計画

  1. Phase 1: 基本アーキテクチャの構築とセキュリティ設定
  2. Phase 2: 監視システムの導入と自動化の実装
  3. Phase 3: 本格運用とコスト最適化の継続改善

運用成功の鍵

  • 継続的監視: システム状態の常時監視と迅速な対応
  • 自動化重視: 人的ミスを減らす包括的な自動化
  • セキュリティファースト: セキュリティを最優先とした設計
  • コスト意識: 常にコスト効率を意識した運用

適切なアーキテクチャと運用プロセスを整えることで、AIエージェントの本番運用を企業価値の向上につなげることができます。

関連記事