
Complete AI Agent Production Operations Guide - August 2025 Enterprise Architecture Implementation


Introduction

Following the AI Agent Development Theory article and the Implementation Hands-on Guide, this third installment is a practical guide to operating AI agents safely and efficiently in production environments.

It covers the concrete challenges that arise when operating AI agents in enterprise environments and the architecture patterns used to address them.

Key Points

  • Enterprise Ready

    A secure operating architecture for AI agents in large organizations

  • Auto Scaling

    Dynamic resource management based on load and automated cost optimization

  • Enhanced Security

    Secure AI agent operations based on zero-trust principles

  • Comprehensive Monitoring

    End-to-end observability for monitoring and analyzing AI agent behavior: metrics, tracing, alerting, and cost tracking
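The auto-scaling point above comes down to comparing average utilization with an upper and a lower threshold. The following is a minimal, stdlib-only sketch of that decision, assuming the same 80% and 20% thresholds the agent manager below uses; `AgentLoad`, `scaling_decision`, and the `min_replicas` floor are illustrative names, not part of the original code.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class AgentLoad:
    current_load: int  # tasks currently running on the agent
    capacity: int      # maximum concurrent tasks


def scaling_decision(
    agents: List[AgentLoad],
    scale_up_at: float = 0.8,
    scale_down_at: float = 0.2,
    min_replicas: int = 1,  # illustrative floor: never scale below this many agents
) -> str:
    """Return "up", "down", or "hold" from the average utilization of one agent type."""
    if not agents:
        return "hold"  # nothing to measure yet
    avg = sum(a.current_load / a.capacity for a in agents) / len(agents)
    if avg > scale_up_at:
        return "up"
    if avg < scale_down_at and len(agents) > min_replicas:
        return "down"
    return "hold"


print(scaling_decision([AgentLoad(9, 10), AgentLoad(10, 10)]))  # up (avg 0.95)
print(scaling_decision([AgentLoad(1, 10), AgentLoad(0, 10)]))   # down (avg 0.05)
print(scaling_decision([AgentLoad(1, 10)]))                     # hold (already at the floor)
```

The gap between the two thresholds acts as a dead band: utilization between 20% and 80% keeps the current size, which stops the scaler from flapping when load hovers around a single cut-off.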

Enterprise Architecture Design

1. Overall Architecture Overview

graph TB
    subgraph "API Gateway Layer"
        AGW[API Gateway] --> LB[Load Balancer]
    end

    subgraph "AI Agent Management Layer"
        LB --> AM[Agent Manager]
        AM --> AR[Agent Registry]
        AM --> AS[Agent Scheduler]
    end

    subgraph "Execution Layer"
        AS --> AE1[Agent Executor 1]
        AS --> AE2[Agent Executor 2]
        AS --> AE3[Agent Executor N]
    end

    subgraph "Data Layer"
        AE1 --> MCP[MCP Servers]
        AE2 --> DB[(Database)]
        AE3 --> FS[(File System)]
    end

    subgraph "Observability Layer"
        AM --> MON[Monitoring]
        AE1 --> LOG[Logging]
        AE2 --> TRC[Tracing]
        AE3 --> MET[Metrics]
    end

    subgraph "Security Layer"
        AGW --> IAM[Identity & Access]
        AM --> VLT[Secrets Vault]
        AE1 --> NET[Network Security]
    end
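In the diagram, the Agent Scheduler fans requests out to the executors. A common routing policy, and the one `_select_best_agent` in the manager below applies, is least-loaded selection: among agents with a free slot, pick the one with the lowest load-to-capacity ratio. A stdlib-only sketch; the `Agent` tuple is illustrative and omits the status filtering the manager also performs.

```python
from typing import List, NamedTuple, Optional


class Agent(NamedTuple):
    id: str
    type: str
    current_load: int
    capacity: int


def pick_least_loaded(agents: List[Agent], agent_type: Optional[str] = None) -> Optional[Agent]:
    """Return the eligible agent with the lowest load/capacity ratio, or None."""
    eligible = [
        a for a in agents
        if a.current_load < a.capacity                    # has at least one free slot
        and (agent_type is None or a.type == agent_type)  # matches the requested type
    ]
    if not eligible:
        return None  # caller decides: scale up, or tell the client to retry later
    return min(eligible, key=lambda a: a.current_load / a.capacity)


pool = [
    Agent("a1", "code_generator", 8, 10),
    Agent("a2", "code_generator", 2, 4),
    Agent("a3", "security_scanner", 0, 10),
]
print(pick_least_loaded(pool, "code_generator").id)  # a2 (ratio 0.5 beats 0.8)
```

Comparing ratios rather than raw task counts lets agents with different capacities share one pool fairly: `a2` runs fewer tasks than `a1` in absolute terms and is also proportionally less busy.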

2. Agent Manager Implementation

# infrastructure/agent_manager.py
import asyncio
import json
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import redis
import kubernetes
from prometheus_client import Counter, Histogram, Gauge

class AgentStatus(Enum):
    IDLE = "idle"
    BUSY = "busy"
    FAILED = "failed"
    MAINTENANCE = "maintenance"

@dataclass
class AgentInstance:
    id: str
    type: str
    status: AgentStatus
    capacity: int
    current_load: int
    created_at: datetime
    last_heartbeat: datetime
    metadata: Dict[str, Any]

class EnterpriseAgentManager:
    """Enterprise AI Agent Manager"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.redis_client = redis.Redis.from_url(config['redis_url'])
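        # Load cluster credentials before creating the API client: the pod's
        # service account when running in-cluster, otherwise ~/.kube/config
        try:
            kubernetes.config.load_incluster_config()
        except kubernetes.config.ConfigException:
            kubernetes.config.load_kube_config()
        self.k8s_client = kubernetes.client.ApiClient()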

        # Prometheus metrics
        self.agent_counter = Counter('ai_agents_total', 'Total AI agents', ['type', 'status'])
        self.task_duration = Histogram('ai_task_duration_seconds', 'Task execution time')
        self.agent_load = Gauge('ai_agent_load', 'Current agent load', ['agent_id'])

        # Agent registry
        self.agents: Dict[str, AgentInstance] = {}
        self.task_queue = asyncio.Queue()

    async def start(self):
        """Start the manager"""
        print("Starting Enterprise Agent Manager...")

        # Start background tasks
        tasks = [
            asyncio.create_task(self._health_monitor()),
            asyncio.create_task(self._task_scheduler()),
            asyncio.create_task(self._auto_scaler()),
            asyncio.create_task(self._metrics_collector())
        ]

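        await asyncio.gather(*tasks)

    async def _task_scheduler(self):
        """Dispatch queued work; producers are assumed to enqueue (task, agent_type) tuples"""
        in_flight = set()  # keep references so running tasks are not garbage-collected
        while True:
            task, agent_type = await self.task_queue.get()
            # Dispatch concurrently so one long-running task does not stall the queue
            job = asyncio.create_task(self._dispatch_queued_task(task, agent_type))
            in_flight.add(job)
            job.add_done_callback(in_flight.discard)
            self.task_queue.task_done()

    async def _dispatch_queued_task(self, task: Dict[str, Any], agent_type: Optional[str]):
        """Run one queued task; requeue it after a delay if no agent is available"""
        try:
            result = await self.assign_task(task, agent_type)
            if not result["success"]:
                await asyncio.sleep(result.get("retry_after", 30))
                await self.task_queue.put((task, agent_type))
        except Exception as e:
            print(f"Queued task dispatch failed: {e}")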

    async def register_agent(self, agent_config: Dict[str, Any]) -> str:
        """Register an agent"""
        agent_id = str(uuid.uuid4())

        agent = AgentInstance(
            id=agent_id,
            type=agent_config['type'],
            status=AgentStatus.IDLE,
            capacity=agent_config.get('capacity', 10),
            current_load=0,
            created_at=datetime.now(),
            last_heartbeat=datetime.now(),
            metadata=agent_config.get('metadata', {})
        )

        # Deploy agent pod to Kubernetes first, so a failed deployment does not
        # leave a phantom entry in the registry and in Redis
        await self._deploy_agent_pod(agent)

        self.agents[agent_id] = agent

        # Persist to Redis
        await self._persist_agent(agent)

        # Update metrics
        self.agent_counter.labels(
            type=agent.type,
            status=agent.status.value
        ).inc()

        print(f"Agent registered: {agent_id} (type: {agent.type})")
        return agent_id

    async def assign_task(
        self,
        task: Dict[str, Any],
        agent_type: Optional[str] = None
    ) -> Dict[str, Any]:
        """Assign a task"""

        # Select optimal agent
        agent = await self._select_best_agent(agent_type, task)

        if not agent:
            # Auto-scale if no agents available
            agent = await self._auto_scale_agent(agent_type)
            if not agent:
                return {
                    "success": False,
                    "error": "No agents available",
                    "retry_after": 30
                }

        # Execute task
        task_id = str(uuid.uuid4())
        execution_result = await self._execute_task(agent, task_id, task)

        return {
            "success": True,
            "task_id": task_id,
            "agent_id": agent.id,
            "result": execution_result
        }

    async def _select_best_agent(
        self,
        agent_type: Optional[str],
        task: Dict[str, Any]
    ) -> Optional[AgentInstance]:
        """Select the best agent"""

        available_agents = [
            agent for agent in self.agents.values()
            if (agent.status == AgentStatus.IDLE or
                (agent.status == AgentStatus.BUSY and
                 agent.current_load < agent.capacity))
            and (not agent_type or agent.type == agent_type)
        ]

        if not available_agents:
            return None

        # Select optimal agent based on load
        return min(
            available_agents,
            key=lambda a: a.current_load / a.capacity
        )

    async def _execute_task(
        self,
        agent: AgentInstance,
        task_id: str,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Execute a task"""

        start_time = datetime.now()

        try:
            # Update agent load
            agent.current_load += 1
            agent.status = AgentStatus.BUSY if agent.current_load >= agent.capacity else AgentStatus.IDLE

            # Update metrics
            self.agent_load.labels(agent_id=agent.id).set(agent.current_load)

            # Execute actual task (call agent pod API)
            result = await self._call_agent_api(agent, task)

            # Record execution time
            duration = (datetime.now() - start_time).total_seconds()
            self.task_duration.observe(duration)

            return {
                "status": "completed",
                "result": result,
                "duration": duration
            }

        except Exception as e:
            print(f"Task execution failed: {e}")
            agent.status = AgentStatus.FAILED

            return {
                "status": "failed",
                "error": str(e),
                "duration": (datetime.now() - start_time).total_seconds()
            }

        finally:
            # Reduce load
            agent.current_load = max(0, agent.current_load - 1)
            if agent.current_load == 0:
                agent.status = AgentStatus.IDLE

            self.agent_load.labels(agent_id=agent.id).set(agent.current_load)

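    async def _call_agent_api(
        self,
        agent: AgentInstance,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Call agent API"""
        import aiohttp

        # A bare Pod name is not resolvable via cluster DNS; this assumes a Service
        # named agent-<id> (not shown) fronts each agent pod in the namespace
        namespace = self.config.get('kubernetes_namespace', 'ai-agents')
        agent_url = f"http://agent-{agent.id}.{namespace}.svc.cluster.local:8080/execute"

        async with aiohttp.ClientSession() as session:
            async with session.post(
                agent_url,
                json=task,
                timeout=aiohttp.ClientTimeout(total=300)  # 5-minute timeout
            ) as response:
                if response.status == 200:
                    return await response.json()
                # Include part of the response body so failures are diagnosable
                body = await response.text()
                raise RuntimeError(f"Agent API error {response.status}: {body[:200]}")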

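    def record_heartbeat(self, agent_id: str) -> None:
        """Mark an agent as alive; call from whatever endpoint receives agent heartbeats (not shown)"""
        agent = self.agents.get(agent_id)
        if agent is not None:
            agent.last_heartbeat = datetime.now()

    async def _health_monitor(self):
        """Agent health monitoring"""
        while True:
            try:
                current_time = datetime.now()

                for agent in list(self.agents.values()):
                    # Heartbeat check: agents must report at least every 60 s via
                    # record_heartbeat(), otherwise they are treated as dead
                    if (current_time - agent.last_heartbeat).total_seconds() > 60: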
                        print(f"Agent {agent.id} is unhealthy, removing...")
                        await self._remove_agent(agent.id)

                await asyncio.sleep(30)  # Check every 30 seconds

            except Exception as e:
                print(f"Health monitor error: {e}")
                await asyncio.sleep(60)

    async def _auto_scaler(self):
        """Auto scaling"""
        while True:
            try:
                # Analyze load status for each agent type
                agent_types = set(agent.type for agent in self.agents.values())

                for agent_type in agent_types:
                    agents_of_type = [
                        agent for agent in self.agents.values()
                        if agent.type == agent_type
                    ]

                    if not agents_of_type:
                        continue

                    # Calculate average load
                    avg_load = sum(
                        agent.current_load / agent.capacity
                        for agent in agents_of_type
                    ) / len(agents_of_type)

                    # Scaling decision
                    if avg_load > 0.8:  # 80%+ load
                        print(f"Scaling up {agent_type} agents (load: {avg_load:.2f})")
                        await self._scale_up(agent_type)
                    elif avg_load < 0.2 and len(agents_of_type) > 1:  # <20% load
                        print(f"Scaling down {agent_type} agents (load: {avg_load:.2f})")
                        await self._scale_down(agent_type)

                await asyncio.sleep(60)  # Check every minute

            except Exception as e:
                print(f"Auto scaler error: {e}")
                await asyncio.sleep(120)

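    async def _scale_up(self, agent_type: str) -> str:
        """Scale up by registering one more agent; returns the new agent ID"""
        agent_config = {
            "type": agent_type,
            "capacity": 10,
            "metadata": {"auto_scaled": True}
        }

        return await self.register_agent(agent_config)

    async def _auto_scale_agent(self, agent_type: Optional[str]) -> Optional[AgentInstance]:
        """On-demand scale-up used by assign_task when no agent has spare capacity"""
        if agent_type is None:
            return None  # no type requested, so there is no agent image to start

        try:
            agent_id = await self._scale_up(agent_type)
        except Exception as e:
            print(f"On-demand scale-up failed for {agent_type}: {e}")
            return None

        # The new pod may still be starting, so the first task can fail until it is ready
        return self.agents.get(agent_id)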

    async def _scale_down(self, agent_type: str):
        """Scale down"""
        agents_of_type = [
            agent for agent in self.agents.values()
            if agent.type == agent_type and agent.current_load == 0
        ]

        if agents_of_type:
            # Remove oldest idle agent
            oldest_agent = min(agents_of_type, key=lambda a: a.created_at)
            await self._remove_agent(oldest_agent.id)

    async def _deploy_agent_pod(self, agent: AgentInstance):
        """Deploy Kubernetes agent pod"""

        pod_spec = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": f"agent-{agent.id}",
                "labels": {
                    "app": "ai-agent",
                    "agent-type": agent.type,
                    "agent-id": agent.id
                }
            },
            "spec": {
                "containers": [{
                    "name": "agent",
                    "image": f"ai-agent-{agent.type}:latest",
                    "ports": [{"containerPort": 8080}],
                    "env": [
                        {"name": "AGENT_ID", "value": agent.id},
                        {"name": "AGENT_TYPE", "value": agent.type},
                        {"name": "MANAGER_URL", "value": self.config['manager_url']}
                    ],
                    "resources": {
                        "requests": {"cpu": "100m", "memory": "256Mi"},
                        "limits": {"cpu": "500m", "memory": "1Gi"}
                    },
                    "livenessProbe": {
                        "httpGet": {"path": "/health", "port": 8080},
                        "initialDelaySeconds": 30,
                        "periodSeconds": 10
                    }
                }],
                "restartPolicy": "Always"
            }
        }

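        # Create pod using Kubernetes API. The client is synchronous, so run the
        # call in a worker thread instead of blocking the event loop.
        v1 = kubernetes.client.CoreV1Api(self.k8s_client)
        try:
            await asyncio.to_thread(
                v1.create_namespaced_pod,
                namespace=self.config.get('kubernetes_namespace', 'ai-agents'),
                body=pod_spec
            )
            print(f"Pod deployed for agent {agent.id}")
        except Exception as e:
            print(f"Failed to deploy pod for agent {agent.id}: {e}")
            raise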

    async def _persist_agent(self, agent: AgentInstance):
        """Persist agent information"""
        agent_data = asdict(agent)
        agent_data['created_at'] = agent.created_at.isoformat()
        agent_data['last_heartbeat'] = agent.last_heartbeat.isoformat()
        agent_data['status'] = agent.status.value

        self.redis_client.hset(
            "agents",
            agent.id,
            json.dumps(agent_data)
        )

    async def _remove_agent(self, agent_id: str):
        """Remove agent"""
        if agent_id in self.agents:
            agent = self.agents[agent_id]

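            # Delete Kubernetes pod (synchronous client, so run it off the event loop)
            v1 = kubernetes.client.CoreV1Api(self.k8s_client)
            try:
                await asyncio.to_thread(
                    v1.delete_namespaced_pod,
                    name=f"agent-{agent_id}",
                    namespace=self.config.get('kubernetes_namespace', 'ai-agents')
                )
            except Exception as e:
                print(f"Failed to delete pod for agent {agent_id}: {e}")

            # Remove from memory and Redis, and drop the agent's load time series
            del self.agents[agent_id]
            self.redis_client.hdel("agents", agent_id)
            try:
                self.agent_load.remove(agent_id)
            except KeyError:
                pass  # no samples were ever recorded for this agent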

            print(f"Agent removed: {agent_id}")

    async def _metrics_collector(self):
        """Metrics collection"""
        while True:
            try:
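                # Refresh per-agent load gauges. The ai_agents_total counter is
                # incremented only on registration; re-incrementing it for every
                # agent on each cycle would make it grow without bound.
                for agent in list(self.agents.values()):
                    self.agent_load.labels(agent_id=agent.id).set(agent.current_load)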

                await asyncio.sleep(15)  # Update metrics every 15 seconds

            except Exception as e:
                print(f"Metrics collector error: {e}")
                await asyncio.sleep(60)

# Usage example
async def start_enterprise_manager():
    """Start enterprise manager"""
    config = {
        "redis_url": "redis://redis-cluster:6379",
        "manager_url": "http://agent-manager:8080",
        "kubernetes_namespace": "ai-agents"
    }

    manager = EnterpriseAgentManager(config)
    await manager.start()

if __name__ == "__main__":
    asyncio.run(start_enterprise_manager())
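`_health_monitor` removes any agent whose last heartbeat is more than 60 seconds old, and a new agent's `last_heartbeat` starts at its registration time. A freshly created pod may need longer than that to pull its image and pass the 30-second `initialDelaySeconds` of its liveness probe, so one option is to exempt agents during a startup grace period. A minimal sketch under that assumption; the 120-second grace value and the function name are illustrative, not from the original.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List


@dataclass
class Heartbeat:
    created_at: datetime
    last_heartbeat: datetime


def find_stale_agents(
    agents: Dict[str, Heartbeat],
    now: datetime,
    timeout: timedelta = timedelta(seconds=60),
    startup_grace: timedelta = timedelta(seconds=120),  # assumed value, tune to pod start time
) -> List[str]:
    """Return sorted IDs of agents that missed heartbeats and are past their startup grace."""
    stale = []
    for agent_id, hb in agents.items():
        still_starting = now - hb.created_at < startup_grace
        missed_heartbeat = now - hb.last_heartbeat > timeout
        if missed_heartbeat and not still_starting:
            stale.append(agent_id)
    return sorted(stale)


t0 = datetime(2025, 8, 1, 12, 0, 0)
agents = {
    # Registered 90 s ago and has not reported yet: still inside the grace period
    "new": Heartbeat(t0 - timedelta(seconds=90), t0 - timedelta(seconds=90)),
    "healthy": Heartbeat(t0 - timedelta(hours=1), t0 - timedelta(seconds=10)),
    "dead": Heartbeat(t0 - timedelta(hours=1), t0 - timedelta(minutes=5)),
}
print(find_stale_agents(agents, now=t0))  # ['dead']
```

With the 60-second timeout alone, the `new` agent above would already be removed even though its pod may simply still be starting.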

Security Architecture

1. Zero Trust Security Model

# security/zero-trust-policy.yml
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: ai-agent-zero-trust
  namespace: ai-agents
spec:
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/ai-agents/sa/agent-executor"]
    to:
    - operation:
        methods: ["POST"]
        paths: ["/api/v1/tasks"]
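    # Istio accepts only its documented attribute keys here; custom.* keys are
    # rejected. The claim names below are hypothetical and require a
    # RequestAuthentication (JWT) policy for this namespace, which is not shown.
    when:
    - key: request.auth.claims[agent_authenticated]
      values: ["true"]
    - key: request.auth.claims[task_validated]
      values: ["true"]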

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: agent-executor
  namespace: ai-agents
  annotations:
    iam.gke.io/gcp-service-account: ai-agent-sa@project.iam.gserviceaccount.com

---
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: ai-agent-tls
  namespace: ai-agents
spec:
  host: "*.ai-agents.svc.cluster.local"
  trafficPolicy:
    tls:
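      mode: ISTIO_MUTUAL

---
# The DestinationRule only makes clients send mTLS; a STRICT PeerAuthentication
# makes workloads in the namespace reject plaintext traffic as well.
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: ai-agent-strict-mtls
  namespace: ai-agents
spec:
  mtls:
    mode: STRICT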
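The `principals` entry in the AuthorizationPolicy is a workload identity of the form `<trust-domain>/ns/<namespace>/sa/<service-account>`, i.e. the SPIFFE ID from the peer's mTLS certificate without the `spiffe://` prefix. Istio's sidecar performs this matching itself; the sketch below only mirrors it in plain Python, for example to lint policy inputs in CI. The function names are illustrative, and Istio's prefix/suffix wildcards are not handled.

```python
from typing import NamedTuple, Optional, Set


class WorkloadIdentity(NamedTuple):
    trust_domain: str
    namespace: str
    service_account: str


def parse_principal(principal: str) -> Optional[WorkloadIdentity]:
    """Parse '<trust-domain>/ns/<namespace>/sa/<service-account>'; None if malformed."""
    if principal.startswith("spiffe://"):
        principal = principal[len("spiffe://"):]  # accept the full SPIFFE ID form too
    parts = principal.split("/")
    if len(parts) != 5 or parts[1] != "ns" or parts[3] != "sa" or not all(parts):
        return None
    return WorkloadIdentity(parts[0], parts[2], parts[4])


def is_allowed(principal: str, allowed: Set[str]) -> bool:
    """Exact-match check against an allowlist of principals (no wildcard support)."""
    identity = parse_principal(principal)
    if identity is None:
        return False  # deny by default: malformed identities never match
    canonical = f"{identity.trust_domain}/ns/{identity.namespace}/sa/{identity.service_account}"
    return canonical in allowed


ALLOWED = {"cluster.local/ns/ai-agents/sa/agent-executor"}
print(is_allowed("spiffe://cluster.local/ns/ai-agents/sa/agent-executor", ALLOWED))  # True
print(is_allowed("cluster.local/ns/default/sa/agent-executor", ALLOWED))             # False
```

The second call fails because the namespace is part of the identity: the same service-account name in another namespace is a different principal, which is what lets the policy pin access to `agent-executor` in `ai-agents` only.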

2. Secrets Management System

# security/secrets_manager.py
import base64
import json
import uuid
from datetime import datetime
from typing import Dict, Any, Optional
from cryptography.fernet import Fernet
from kubernetes import client, config
import hvac  # HashiCorp Vault client

class EnterpriseSecretsManager:
    """Enterprise secrets management"""

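    def __init__(self, vault_url: str, vault_token: str):
        self.vault_client = hvac.Client(url=vault_url, token=vault_token)

        # Load Kubernetes credentials before creating the API client
        try:
            config.load_incluster_config()
        except config.ConfigException:
            config.load_kube_config()
        self.k8s_client = client.CoreV1Api()

        self.encryption_key = self._get_or_create_encryption_key()
        self.cipher = Fernet(self.encryption_key)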

    async def store_agent_secrets(
        self,
        agent_id: str,
        secrets: Dict[str, str]
    ) -> bool:
        """Store agent secrets"""

        try:
            # Encrypt and store in Vault
            encrypted_secrets = {
                key: self.cipher.encrypt(value.encode()).decode()
                for key, value in secrets.items()
            }

            vault_path = f"agents/{agent_id}"
            self.vault_client.secrets.kv.v2.create_or_update_secret(
                path=vault_path,
                secret=encrypted_secrets
            )

            # Also store as a Kubernetes Secret for fast access. Secret data is only
            # base64-encoded, so enable etcd encryption at rest and restrict RBAC.
            k8s_secret_data = {
                key: base64.b64encode(value.encode()).decode()
                for key, value in secrets.items()
            }

            secret_metadata = client.V1ObjectMeta(
                name=f"agent-secrets-{agent_id}",
                namespace="ai-agents",
                labels={"agent-id": agent_id}
            )

            secret = client.V1Secret(
                metadata=secret_metadata,
                data=k8s_secret_data,
                type="Opaque"
            )

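            try:
                self.k8s_client.create_namespaced_secret(
                    namespace="ai-agents",
                    body=secret
                )
            except client.exceptions.ApiException as e:
                if e.status != 409:
                    raise
                # Secret already exists (e.g. during rotation): overwrite it
                self.k8s_client.replace_namespaced_secret(
                    name=f"agent-secrets-{agent_id}",
                    namespace="ai-agents",
                    body=secret
                )

            return True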

        except Exception as e:
            print(f"Failed to store secrets for agent {agent_id}: {e}")
            return False

    async def get_agent_secrets(self, agent_id: str) -> Dict[str, str]:
        """Get agent secrets"""

        try:
            # Try Kubernetes first (fast)
            try:
                secret = self.k8s_client.read_namespaced_secret(
                    name=f"agent-secrets-{agent_id}",
                    namespace="ai-agents"
                )

                return {
                    key: base64.b64decode(value).decode()
                    for key, value in secret.data.items()
                }

            except client.exceptions.ApiException:
                # Fallback to Vault if not in K8s
                vault_path = f"agents/{agent_id}"
                response = self.vault_client.secrets.kv.v2.read_secret_version(
                    path=vault_path
                )

                encrypted_secrets = response['data']['data']

                return {
                    key: self.cipher.decrypt(value.encode()).decode()
                    for key, value in encrypted_secrets.items()
                }

        except Exception as e:
            print(f"Failed to get secrets for agent {agent_id}: {e}")
            return {}

    async def rotate_secrets(self, agent_id: str) -> bool:
        """Rotate secrets"""

        try:
            # Generate new API keys
            new_secrets = await self._generate_new_secrets(agent_id)

            # Store new secrets; stop before notifying the agent if storage failed
            if not await self.store_agent_secrets(agent_id, new_secrets):
                return False

            # Notify agent of new secrets
            await self._notify_agent_secret_rotation(agent_id)

            print(f"Secrets rotated for agent {agent_id}")
            return True

        except Exception as e:
            print(f"Failed to rotate secrets for agent {agent_id}: {e}")
            return False

    async def audit_secret_access(self, agent_id: str, secret_key: str):
        """Audit secret access"""

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "secret_key": secret_key,
            "action": "access",
            "source_ip": self._get_client_ip()
        }

        # Send audit log to secure logging system
        await self._send_audit_log(audit_entry)

    def _get_or_create_encryption_key(self) -> bytes:
        """Get or create encryption key"""

        try:
            # Get encryption key from Vault
            response = self.vault_client.secrets.kv.v2.read_secret_version(
                path="encryption/master-key"
            )
            return response['data']['data']['key'].encode()

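        except hvac.exceptions.InvalidPath:
            # Generate a key only when none exists (Vault returned 404). A bare
            # except would also fire on transient Vault errors and overwrite the
            # master key, leaving every previously encrypted secret undecryptable.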
            # Generate new key and store in Vault
            new_key = Fernet.generate_key()

            self.vault_client.secrets.kv.v2.create_or_update_secret(
                path="encryption/master-key",
                secret={"key": new_key.decode()}
            )

            return new_key

    async def _generate_new_secrets(self, agent_id: str) -> Dict[str, str]:
        """Generate new secrets"""
        import secrets
        import string

        # Generate strong random password
        alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
        new_api_key = ''.join(secrets.choice(alphabet) for _ in range(32))

        return {
            "api_key": new_api_key,
            "agent_token": str(uuid.uuid4()),
            "generated_at": datetime.now().isoformat()
        }
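`_generate_new_secrets` draws characters from a custom alphabet, and `rotate_secrets` switches to the new key in one step, so a client still holding the old key fails until it receives the new one. One common alternative, sketched here with only the standard library, is to generate keys with `secrets.token_urlsafe`, persist only a SHA-256 digest, and keep accepting the previous key for a short overlap window. The `KeyRing` class and the 10-minute window are assumptions for illustration, not part of the original design.

```python
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional


def _digest(key: str) -> str:
    """SHA-256 hex digest; only this value is stored, never the key itself."""
    return hashlib.sha256(key.encode()).hexdigest()


class KeyRing:
    """Holds digests of the current API key and, briefly, the previous one."""

    def __init__(self, overlap: timedelta = timedelta(minutes=10)) -> None:
        self.overlap = overlap  # assumed grace window for clients still using the old key
        self.current_hash: Optional[str] = None
        self.previous_hash: Optional[str] = None
        self.previous_valid_until: Optional[datetime] = None

    def rotate(self, now: datetime) -> str:
        """Issue a new key and return its plaintext once, for delivery to the agent."""
        new_key = secrets.token_urlsafe(32)  # 32 random bytes, URL-safe base64 text
        if self.current_hash is not None:
            self.previous_hash = self.current_hash
            self.previous_valid_until = now + self.overlap
        self.current_hash = _digest(new_key)
        return new_key

    def verify(self, presented: str, now: datetime) -> bool:
        """Accept the current key, or the previous key while the overlap window is open."""
        candidate = _digest(presented)
        if self.current_hash is not None and secrets.compare_digest(candidate, self.current_hash):
            return True
        if (
            self.previous_hash is not None
            and self.previous_valid_until is not None
            and now <= self.previous_valid_until
        ):
            return secrets.compare_digest(candidate, self.previous_hash)
        return False


t0 = datetime(2025, 8, 1, 12, 0)
ring = KeyRing()
old_key = ring.rotate(t0)
new_key = ring.rotate(t0 + timedelta(days=30))
print(ring.verify(new_key, t0 + timedelta(days=30)))              # True
print(ring.verify(old_key, t0 + timedelta(days=30, minutes=5)))   # True, inside the overlap
print(ring.verify(old_key, t0 + timedelta(days=30, minutes=11)))  # False, overlap expired
```

A fast hash such as SHA-256 is adequate here because the keys are 256-bit random values rather than user-chosen passwords, and `secrets.compare_digest` keeps each comparison constant-time.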

Monitoring & Observability System

1. Comprehensive Monitoring Architecture
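The alert manager in the stack below implements a pending-then-firing pattern: a breached condition must hold for the rule's `duration` before it fires, much like the `for:` clause of a Prometheus alerting rule. A stdlib-only sketch of that state machine, which emits its notification exactly once per incident; `AlertState` and its return values are illustrative names, not part of the original code.

```python
from datetime import datetime, timedelta
from typing import Optional


class AlertState:
    """One rule's lifecycle: inactive -> pending -> firing -> back to inactive on recovery."""

    def __init__(self, threshold: float, duration: timedelta, below: bool = True) -> None:
        self.threshold = threshold
        self.duration = duration  # how long the breach must persist before firing
        self.below = below        # True for "value < threshold" rules such as success rate
        self.pending_since: Optional[datetime] = None
        self.fired = False

    def evaluate(self, value: float, now: datetime) -> str:
        breached = value < self.threshold if self.below else value > self.threshold
        if not breached:
            was_active = self.pending_since is not None
            self.pending_since, self.fired = None, False
            return "resolved" if was_active else "ok"
        if self.pending_since is None:
            self.pending_since = now  # breach starts: enter pending
        if self.fired:
            return "firing"  # already notified for this incident; stay quiet
        if now - self.pending_since >= self.duration:
            self.fired = True
            return "fire"  # the single point where notifications/actions should run
        return "pending"


# Mirrors the high_error_rate rule: success rate < 0.9 for 5 minutes
rule = AlertState(threshold=0.9, duration=timedelta(minutes=5))
t0 = datetime(2025, 8, 1, 12, 0)
for minute, rate in [(0, 0.85), (3, 0.86), (5, 0.84), (6, 0.80), (7, 0.95)]:
    print(minute, rule.evaluate(rate, t0 + timedelta(minutes=minute)))
# 0 pending, 3 pending, 5 fire, 6 firing, 7 resolved
```

Separating "fire" from "firing" matters when an alert triggers an automated action such as `scale_up`: the action runs once per incident instead of on every evaluation cycle while the breach lasts.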

# monitoring/observability_stack.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
from dataclasses import dataclass
import aiohttp
from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
from opentelemetry import trace
# The Jaeger Thrift exporter is deprecated; Jaeger ingests OTLP natively
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

@dataclass
class AlertRule:
    name: str
    condition: str
    threshold: float
    duration: int  # seconds
    severity: str
    action: str

class AIAgentObservability:
    """AI Agent dedicated observability system"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

        # Prometheus registry
        self.registry = CollectorRegistry()

        # Metrics definitions
        self.agent_requests = Counter(
            'ai_agent_requests_total',
            'Total agent requests',
            ['agent_type', 'status', 'endpoint'],
            registry=self.registry
        )

        self.agent_response_time = Histogram(
            'ai_agent_response_time_seconds',
            'Agent response time',
            ['agent_type', 'complexity'],
            registry=self.registry
        )

        self.agent_success_rate = Gauge(
            'ai_agent_success_rate',
            'Agent success rate',
            ['agent_type'],
            registry=self.registry
        )

        self.token_usage = Counter(
            'ai_agent_token_usage_total',
            'Total token usage',
            ['agent_type', 'model'],
            registry=self.registry
        )

        # Tracing setup: build the provider, attach the exporter, then install it globally.
        # 'otlp_endpoint' is an assumed config key; 4317 is Jaeger's OTLP gRPC port.
        provider = TracerProvider()
        otlp_exporter = OTLPSpanExporter(
            endpoint=config.get('otlp_endpoint', 'http://localhost:4317'),
            insecure=True,  # plaintext gRPC; enable TLS outside the service mesh
        )
        provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
        trace.set_tracer_provider(provider)
        self.tracer = trace.get_tracer(__name__)

        # Alert rules
        self.alert_rules = self._load_alert_rules()
        self.active_alerts = {}

    def _load_alert_rules(self) -> List[AlertRule]:
        """Load alert rules"""
        return [
            AlertRule(
                name="high_error_rate",
                condition="agent_success_rate < 0.9",
                threshold=0.9,
                duration=300,  # 5 minutes
                severity="warning",
                action="scale_up"
            ),
            AlertRule(
                name="critical_error_rate",
                condition="agent_success_rate < 0.8",
                threshold=0.8,
                duration=180,  # 3 minutes
                severity="critical",
                action="immediate_intervention"
            ),
            AlertRule(
                name="high_response_time",
                condition="agent_response_time > 30.0",
                threshold=30.0,
                duration=600,  # 10 minutes
                severity="warning",
                action="performance_investigation"
            ),
            AlertRule(
                name="token_budget_exceeded",
                condition="hourly_token_usage > 1000000",
                threshold=1000000,
                duration=60,
                severity="critical",
                action="rate_limiting"
            )
        ]

    async def start_monitoring(self):
        """Start monitoring system"""
        print("Starting AI Agent Observability System...")

        tasks = [
            asyncio.create_task(self._metrics_collector()),
            asyncio.create_task(self._alert_manager()),
            asyncio.create_task(self._health_checker()),
            asyncio.create_task(self._cost_tracker()),
            asyncio.create_task(self._performance_analyzer())
        ]

        await asyncio.gather(*tasks)

    async def record_agent_request(
        self,
        agent_type: str,
        endpoint: str,
        status: str,
        response_time: float,
        token_count: int = 0,
        model: str = "unknown"
    ):
        """Record agent request"""

        # Update metrics
        self.agent_requests.labels(
            agent_type=agent_type,
            status=status,
            endpoint=endpoint
        ).inc()

        self.agent_response_time.labels(
            agent_type=agent_type,
            complexity="standard"  # Hard-coded placeholder; classify each task in a real deployment
        ).observe(response_time)

        if token_count > 0:
            self.token_usage.labels(
                agent_type=agent_type,
                model=model
            ).inc(token_count)

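        # Tracing: this span is opened after the request has finished, so its own
        # duration is near zero and the real latency is carried as an attribute.
        # Wrap the actual agent call in the span to get true timings.
        with trace.get_tracer(__name__).start_as_current_span("agent_request") as span: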
            span.set_attribute("agent.type", agent_type)
            span.set_attribute("agent.endpoint", endpoint)
            span.set_attribute("agent.status", status)
            span.set_attribute("agent.response_time", response_time)
            span.set_attribute("agent.token_count", token_count)

    async def _metrics_collector(self):
        """Metrics collection"""
        while True:
            try:
                # Calculate success rates
                success_rates = await self._calculate_success_rates()
                for agent_type, rate in success_rates.items():
                    self.agent_success_rate.labels(agent_type=agent_type).set(rate)

                await asyncio.sleep(30)  # Every 30 seconds

            except Exception as e:
                print(f"Metrics collector error: {e}")
                await asyncio.sleep(60)

    async def _calculate_success_rates(self) -> Dict[str, float]:
        """Calculate agent success rates"""

        # Get metrics from Prometheus and calculate success rates
        # Simplified implementation returns dummy data
        return {
            "code_generator": 0.95,
            "security_scanner": 0.98,
            "performance_tester": 0.92
        }

    async def _alert_manager(self):
        """Alert management"""
        while True:
            try:
                for rule in self.alert_rules:
                    await self._evaluate_alert_rule(rule)

                await asyncio.sleep(60)  # Evaluate alerts every minute

            except Exception as e:
                print(f"Alert manager error: {e}")
                await asyncio.sleep(120)

    async def _evaluate_alert_rule(self, rule: AlertRule):
        """Evaluate alert rule"""

        current_value = await self._get_metric_value(rule.condition)

        if current_value is None:
            return

        # Check if condition is met
        if self._condition_met(rule, current_value):
            if rule.name not in self.active_alerts:
                # New alert
                self.active_alerts[rule.name] = {
                    "start_time": datetime.now(),
                    "rule": rule,
                    "current_value": current_value
                }
                print(f"Alert triggered: {rule.name} (value: {current_value})")

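            else:
                # Check existing alert duration
                alert = self.active_alerts[rule.name]
                alert["current_value"] = current_value
                duration = (datetime.now() - alert["start_time"]).total_seconds()

                if duration >= rule.duration and not alert.get("fired"):
                    # Fire once per incident; without this flag the notification and
                    # its automated action would repeat on every evaluation cycle
                    alert["fired"] = True
                    await self._fire_alert(rule, current_value, duration)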

        else:
            # Condition resolved
            if rule.name in self.active_alerts:
                del self.active_alerts[rule.name]
                print(f"Alert resolved: {rule.name}")

    def _condition_met(self, rule: AlertRule, value: float) -> bool:
        """Check alert condition"""
        if ">" in rule.condition:
            return value > rule.threshold
        elif "<" in rule.condition:
            return value < rule.threshold
        return False

    async def _fire_alert(self, rule: AlertRule, value: float, duration: float):
        """Fire alert"""

        alert_data = {
            "alert_name": rule.name,
            "severity": rule.severity,
            "current_value": value,
            "threshold": rule.threshold,
            "duration": duration,
            "timestamp": datetime.now().isoformat(),
            "action": rule.action
        }

        # Send notifications
        await self._send_alert_notification(alert_data)

        # Execute automated response actions
        await self._execute_alert_action(rule.action, alert_data)

    async def _send_alert_notification(self, alert_data: Dict[str, Any]):
        """Send alert notification"""

        # Slack notification
        if self.config.get('slack_webhook'):
            await self._send_slack_alert(alert_data)

        # Email notification
        if self.config.get('email_enabled'):
            await self._send_email_alert(alert_data)

        # PagerDuty notification (for critical alerts)
        if alert_data['severity'] == 'critical':
            await self._send_pagerduty_alert(alert_data)

    async def _send_slack_alert(self, alert_data: Dict[str, Any]):
        """Slack notification"""

        color = "warning" if alert_data['severity'] == "warning" else "danger"

        message = {
            "attachments": [{
                "color": color,
                "title": f"🤖 AI Agent Alert: {alert_data['alert_name']}",
                "fields": [
                    {"title": "Severity", "value": alert_data['severity'], "short": True},
                    {"title": "Current Value", "value": str(alert_data['current_value']), "short": True},
                    {"title": "Threshold", "value": str(alert_data['threshold']), "short": True},
                    {"title": "Duration", "value": f"{alert_data['duration']:.0f}s", "short": True}
                ],
                "footer": "AI Agent Monitoring",
                "ts": int(datetime.now().timestamp())
            }]
        }

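        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.config['slack_webhook'],
                json=message
            ) as response:
                if response.status != 200:
                    print(f"Slack notification failed: HTTP {response.status}")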

    async def _execute_alert_action(self, action: str, alert_data: Dict[str, Any]):
        """Execute alert action"""

        actions = {
            "scale_up": self._action_scale_up,
            "immediate_intervention": self._action_immediate_intervention,
            "performance_investigation": self._action_performance_investigation,
            "rate_limiting": self._action_rate_limiting
        }

        if action in actions:
            try:
                await actions[action](alert_data)
                print(f"Alert action executed: {action}")
            except Exception as e:
                print(f"Failed to execute alert action {action}: {e}")

    async def _action_scale_up(self, alert_data: Dict[str, Any]):
        """Scale up action"""
        # Send auto-scaling instruction to Agent Manager
        async with aiohttp.ClientSession() as session:
            await session.post(
                f"{self.config['agent_manager_url']}/scale-up",
                json={"reason": "high_error_rate_alert"}
            )

    async def _action_immediate_intervention(self, alert_data: Dict[str, Any]):
        """Immediate intervention action"""
        # Immediately notify operations team
        # Implement temporary load limiting
        async with aiohttp.ClientSession() as session:
            await session.post(
                f"{self.config['agent_manager_url']}/emergency-mode",
                json={"alert_data": alert_data}
            )

    async def _cost_tracker(self):
        """Cost tracking"""
        while True:
            try:
                # Calculate costs from token usage
                hourly_costs = await self._calculate_hourly_costs()

                # Monitor cost budget
                await self._monitor_cost_budget(hourly_costs)

                await asyncio.sleep(3600)  # Every hour

            except Exception as e:
                print(f"Cost tracker error: {e}")
                await asyncio.sleep(3600)

    async def _calculate_hourly_costs(self) -> Dict[str, float]:
        """Calculate hourly costs"""

        # Model pricing table (USD per 1K tokens; illustrative, verify against current provider pricing)
        pricing = {
            "gpt-5": {"input": 0.002, "output": 0.006},  # per 1k tokens
            "claude-opus-4.1": {"input": 0.015, "output": 0.075},
            "gemini-2.0-flash": {"input": 0.001, "output": 0.002}
        }

        costs = {}

        # Omitted for brevity: fetch each model's token usage for the past hour
        # and convert it to USD with the pricing table above

        return costs

# Kubernetes ConfigMap manifest for deploying the observability configuration
observability_config = """
apiVersion: v1
kind: ConfigMap
metadata:
  name: observability-config
  namespace: ai-agents
data:
  config.yaml: |
    jaeger_host: jaeger-collector.monitoring.svc.cluster.local
    jaeger_port: 14268
    prometheus_url: http://prometheus.monitoring.svc.cluster.local:9090
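    # slack_webhook is a credential: inject it from a Kubernetes Secret (e.g. as an env var).
    # GitHub Actions "${{ }}" expressions are not expanded inside a ConfigMap.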
    agent_manager_url: http://agent-manager.ai-agents.svc.cluster.local:8080
    cost_budget:
      daily_limit: 1000.0
      monthly_limit: 25000.0
    alert_channels:
      - slack
      - email
      - pagerduty
"""

# Usage example
async def start_observability_system():
    """Start observability system"""
    config = {
        "jaeger_host": "jaeger-collector",
        "jaeger_port": 14268,
        "slack_webhook": "https://hooks.slack.com/services/...",
        "agent_manager_url": "http://agent-manager:8080"
    }

    observability = AIAgentObservability(config)
    await observability.start_monitoring()

if __name__ == "__main__":
    asyncio.run(start_observability_system())
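`_calculate_hourly_costs` above leaves the actual calculation out. A minimal sketch of that step, assuming token usage has already been aggregated per model into input/output counts (the `usage` dictionary shape and the `calculate_costs` name are hypothetical), multiplies each count by the per-1K-token price from the same illustrative pricing table:

```python
from typing import Dict

# Illustrative prices in USD per 1K tokens (same shape as the pricing table above)
PRICING: Dict[str, Dict[str, float]] = {
    "gpt-5": {"input": 0.002, "output": 0.006},
    "claude-opus-4.1": {"input": 0.015, "output": 0.075},
    "gemini-2.0-flash": {"input": 0.001, "output": 0.002},
}


def calculate_costs(
    usage: Dict[str, Dict[str, int]],
    pricing: Dict[str, Dict[str, float]] = PRICING,
) -> Dict[str, float]:
    """Convert per-model token counts into USD costs.

    `usage` maps model name -> {"input": tokens, "output": tokens}.
    Models missing from the pricing table are skipped rather than priced at zero.
    """
    costs: Dict[str, float] = {}
    for model, tokens in usage.items():
        price = pricing.get(model)
        if price is None:
            continue  # Unknown model: report it elsewhere instead of guessing a price
        costs[model] = (
            tokens.get("input", 0) / 1000 * price["input"]
            + tokens.get("output", 0) / 1000 * price["output"]
        )
    return costs


if __name__ == "__main__":
    hourly_usage = {
        "gpt-5": {"input": 120_000, "output": 40_000},
        "claude-opus-4.1": {"input": 10_000, "output": 5_000},
    }
    print(calculate_costs(hourly_usage))
```

With these illustrative prices, 120K input and 40K output tokens on `gpt-5` come to about $0.48, and 10K/5K tokens on `claude-opus-4.1` to about $0.525. Skipping unpriced models keeps a missing table entry from silently lowering the reported spend.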

Performance Optimization and Scaling

1. Dynamic Load Balancing System

# performance/load_balancer.py
import asyncio
import random
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import aiohttp
import time

class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_connections"
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    AI_OPTIMIZED = "ai_optimized"

@dataclass
class AgentEndpoint:
    id: str
    url: str
    weight: int
    current_connections: int
    average_response_time: float
    success_rate: float
    capacity: int
    agent_type: str

class IntelligentLoadBalancer:
    """AI-optimized load balancer"""

    def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.AI_OPTIMIZED):
        self.strategy = strategy
        self.endpoints: Dict[str, AgentEndpoint] = {}
        self.request_history = []
        self.round_robin_counter = 0

    def register_endpoint(self, endpoint: AgentEndpoint):
        """Register endpoint"""
        self.endpoints[endpoint.id] = endpoint
        print(f"Endpoint registered: {endpoint.id} ({endpoint.agent_type})")

    async def select_endpoint(
        self,
        request_type: str,
        estimated_complexity: float = 1.0
    ) -> Optional[AgentEndpoint]:
        """Select optimal endpoint"""

        available_endpoints = [
            ep for ep in self.endpoints.values()
            if ep.current_connections < ep.capacity
        ]

        if not available_endpoints:
            return None

        if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
            return self._round_robin_select(available_endpoints)

        elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
            return self._least_connections_select(available_endpoints)

        elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
            return self._weighted_round_robin_select(available_endpoints)

        elif self.strategy == LoadBalancingStrategy.AI_OPTIMIZED:
            return await self._ai_optimized_select(
                available_endpoints,
                request_type,
                estimated_complexity
            )

    def _round_robin_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Round robin selection"""
        selected = endpoints[self.round_robin_counter % len(endpoints)]
        self.round_robin_counter += 1
        return selected

    def _least_connections_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Least connections selection"""
        return min(endpoints, key=lambda ep: ep.current_connections)

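    def _weighted_round_robin_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Weighted round robin selection"""
        # Repeat each endpoint in proportion to its weight
        weighted_endpoints = []
        for ep in endpoints:
            weighted_endpoints.extend([ep] * max(ep.weight, 0))

        if not weighted_endpoints:
            return endpoints[0]

        # Cycle through the weighted sequence in order
        selected = weighted_endpoints[self.round_robin_counter % len(weighted_endpoints)]
        self.round_robin_counter += 1
        return selected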

    async def _ai_optimized_select(
        self,
        endpoints: List[AgentEndpoint],
        request_type: str,
        estimated_complexity: float
    ) -> AgentEndpoint:
        """AI-optimized selection"""

        # Calculate score for each endpoint
        scores = []

        for ep in endpoints:
            # Base score (success rate and response time)
            success_score = ep.success_rate
            response_time_score = 1.0 / (1.0 + ep.average_response_time)

            # Load score
            load_score = 1.0 - (ep.current_connections / ep.capacity)

            # Compatibility score (request type and agent type compatibility)
            compatibility_score = await self._calculate_compatibility(
                request_type,
                ep.agent_type
            )

            # Complexity fitness score
            complexity_score = await self._calculate_complexity_fitness(
                estimated_complexity,
                ep
            )

            # Total score
            total_score = (
                success_score * 0.3 +
                response_time_score * 0.25 +
                load_score * 0.2 +
                compatibility_score * 0.15 +
                complexity_score * 0.1
            )

            scores.append((ep, total_score))

        # Select highest scoring endpoint
        return max(scores, key=lambda x: x[1])[0]

    async def _calculate_compatibility(self, request_type: str, agent_type: str) -> float:
        """Calculate request and agent compatibility"""

        compatibility_matrix = {
            "code_generation": {
                "code_generator": 1.0,
                "security_scanner": 0.3,
                "performance_tester": 0.2
            },
            "security_analysis": {
                "security_scanner": 1.0,
                "code_generator": 0.4,
                "performance_tester": 0.1
            },
            "performance_test": {
                "performance_tester": 1.0,
                "code_generator": 0.3,
                "security_scanner": 0.2
            }
        }

        return compatibility_matrix.get(request_type, {}).get(agent_type, 0.5)

    async def _calculate_complexity_fitness(
        self,
        estimated_complexity: float,
        endpoint: AgentEndpoint
    ) -> float:
        """Calculate complexity fitness"""

        # Match endpoint capability (weight, assumed to be on a 0-10 scale) to task complexity (0-1)
        if estimated_complexity <= 0.3:  # Simple tasks
            return 1.0  # Any endpoint can handle

        elif estimated_complexity <= 0.7:  # Medium tasks
            # Average capability endpoints are optimal
            return 1.0 - abs(endpoint.weight - 5) / 5.0

        else:  # Complex tasks
            # High-performance endpoints needed
            return endpoint.weight / 10.0

    async def execute_request(
        self,
        request_data: Dict[str, Any],
        request_type: str,
        estimated_complexity: float = 1.0
    ) -> Dict[str, Any]:
        """Execute request"""

        endpoint = await self.select_endpoint(request_type, estimated_complexity)

        if not endpoint:
            return {
                "success": False,
                "error": "No available endpoints",
                "retry_after": 30
            }

        # Increase connection count
        endpoint.current_connections += 1
        start_time = time.time()

        try:
            # Execute actual request
            result = await self._execute_on_endpoint(endpoint, request_data)

            execution_time = time.time() - start_time

            # Update statistics
            await self._update_endpoint_stats(endpoint, execution_time, True)

            return {
                "success": True,
                "result": result,
                "endpoint_id": endpoint.id,
                "execution_time": execution_time
            }

        except Exception as e:
            execution_time = time.time() - start_time
            await self._update_endpoint_stats(endpoint, execution_time, False)

            return {
                "success": False,
                "error": str(e),
                "endpoint_id": endpoint.id,
                "execution_time": execution_time
            }

        finally:
            # Decrease connection count
            endpoint.current_connections = max(0, endpoint.current_connections - 1)

    async def _execute_on_endpoint(
        self,
        endpoint: AgentEndpoint,
        request_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Execute request on endpoint"""

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{endpoint.url}/execute",
                json=request_data,
                timeout=aiohttp.ClientTimeout(total=300)
            ) as response:

                if response.status == 200:
                    return await response.json()
                else:
                    raise Exception(f"HTTP {response.status}: {await response.text()}")

    async def _update_endpoint_stats(
        self,
        endpoint: AgentEndpoint,
        execution_time: float,
        success: bool
    ):
        """Update endpoint statistics"""

        # Update response time moving average
        alpha = 0.1  # Smoothing factor
        endpoint.average_response_time = (
            alpha * execution_time +
            (1 - alpha) * endpoint.average_response_time
        )

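        # Record the result per endpoint: request_history holds (endpoint_id, success) pairs
        self.request_history.append((endpoint.id, success))
        # Bound memory use: keep only the most recent 1,000 results overall
        del self.request_history[:-1000]

        # Success rate over this endpoint's own last 100 requests
        recent_history = [ok for ep_id, ok in self.request_history if ep_id == endpoint.id][-100:]
        endpoint.success_rate = sum(recent_history) / len(recent_history)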

    async def health_check(self):
        """Endpoint health check"""

        while True:
            try:
                for endpoint in list(self.endpoints.values()):
                    try:
                        async with aiohttp.ClientSession() as session:
                            async with session.get(
                                f"{endpoint.url}/health",
                                timeout=aiohttp.ClientTimeout(total=10)
                            ) as response:

                                if response.status != 200:
                                    print(f"Endpoint {endpoint.id} health check failed")
                                    # TODO: temporarily take the endpoint out of rotation

                    except Exception as e:
                        print(f"Health check error for {endpoint.id}: {e}")

                await asyncio.sleep(30)  # Health check every 30 seconds

            except Exception as e:
                print(f"Health check loop error: {e}")
                await asyncio.sleep(60)

# Usage example
async def example_load_balancer():
    """Load balancer usage example"""

    balancer = IntelligentLoadBalancer(LoadBalancingStrategy.AI_OPTIMIZED)

    # Register endpoints
    endpoints = [
        AgentEndpoint(
            id="agent-1",
            url="http://agent-1:8080",
            weight=8,
            current_connections=0,
            average_response_time=2.5,
            success_rate=0.95,
            capacity=10,
            agent_type="code_generator"
        ),
        AgentEndpoint(
            id="agent-2",
            url="http://agent-2:8080",
            weight=6,
            current_connections=0,
            average_response_time=3.1,
            success_rate=0.92,
            capacity=8,
            agent_type="security_scanner"
        )
    ]

    for endpoint in endpoints:
        balancer.register_endpoint(endpoint)

    # Start health check
    health_task = asyncio.create_task(balancer.health_check())

    # Request execution example
    request_result = await balancer.execute_request(
        request_data={"task": "generate_function", "language": "python"},
        request_type="code_generation",
        estimated_complexity=0.5
    )

    print(f"Request result: {request_result}")

    # Stop the background health-check loop before exiting
    health_task.cancel()

if __name__ == "__main__":
    asyncio.run(example_load_balancer())
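To see how the AI_OPTIMIZED weights combine, the scoring inside `_ai_optimized_select` can be pulled out into a pure function. This sketch reuses the same weights (0.3, 0.25, 0.2, 0.15, 0.1) and the example endpoint values; the `score_endpoint` name is hypothetical, and the compatibility and complexity-fitness values are passed in directly rather than computed by the balancer.

```python
def score_endpoint(
    success_rate: float,
    average_response_time: float,
    current_connections: int,
    capacity: int,
    compatibility: float,
    complexity_fitness: float,
) -> float:
    """Weighted score used by the AI_OPTIMIZED strategy (higher is better)."""
    response_time_score = 1.0 / (1.0 + average_response_time)  # seconds -> (0, 1]
    load_score = 1.0 - current_connections / capacity           # share of free capacity
    return (
        success_rate * 0.3
        + response_time_score * 0.25
        + load_score * 0.2
        + compatibility * 0.15
        + complexity_fitness * 0.1
    )


if __name__ == "__main__":
    # Example endpoints, code_generation request with estimated_complexity=0.5:
    # complexity fitness = 1 - |weight - 5| / 5 -> 0.4 for weight 8, 0.8 for weight 6
    agent_1 = score_endpoint(0.95, 2.5, 0, 10, compatibility=1.0, complexity_fitness=0.4)
    agent_2 = score_endpoint(0.92, 3.1, 0, 8, compatibility=0.3, complexity_fitness=0.8)
    print(f"agent-1: {agent_1:.4f}, agent-2: {agent_2:.4f}")
```

Here agent-1 scores about 0.746 and agent-2 about 0.662. The compatibility term alone contributes 0.105 in agent-1's favor, more than the final margin of about 0.084, because the medium-complexity rule actually prefers agent-2's weight of 6. The weights themselves are tuning choices, not derived values.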

2. Cost Optimization System

# cost/optimization_engine.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
from dataclasses import dataclass
import numpy as np

@dataclass
class CostMetrics:
    model_name: str
    input_tokens: int
    output_tokens: int
    request_count: int
    total_cost: float
    timestamp: datetime

class CostOptimizationEngine:
    """Cost optimization engine"""

    def __init__(self, budget_config: Dict[str, Any]):
        self.budget_config = budget_config
        self.cost_history: List[CostMetrics] = []

        # Model pricing table (USD per 1K tokens; illustrative, verify against current provider pricing)
        self.pricing = {
            "gpt-5": {"input": 0.002, "output": 0.006},
            "gpt-4o": {"input": 0.0015, "output": 0.002},
            "claude-opus-4.1": {"input": 0.015, "output": 0.075},
            "claude-sonnet-3.5": {"input": 0.003, "output": 0.015},
            "gemini-2.0-flash": {"input": 0.001, "output": 0.002}
        }

        # Model performance table (relative quality scores)
        self.model_performance = {
            "gpt-5": {"quality": 0.95, "speed": 0.8},
            "gpt-4o": {"quality": 0.9, "speed": 0.9},
            "claude-opus-4.1": {"quality": 0.92, "speed": 0.7},
            "claude-sonnet-3.5": {"quality": 0.88, "speed": 0.85},
            "gemini-2.0-flash": {"quality": 0.85, "speed": 0.95}
        }

    async def optimize_model_selection(
        self,
        task_requirements: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Optimize model selection based on task requirements"""

        complexity = task_requirements.get("complexity", 0.5)
        quality_priority = task_requirements.get("quality_priority", 0.7)
        speed_priority = task_requirements.get("speed_priority", 0.3)
        budget_limit = task_requirements.get("budget_limit")

        model_scores = {}

        for model, performance in self.model_performance.items():
            # Estimate cost calculation
            estimated_tokens = self._estimate_token_usage(task_requirements, model)
            estimated_cost = self._calculate_cost(model, estimated_tokens)

            # Budget constraint check (a limit of 0 is still a limit)
            if budget_limit is not None and estimated_cost > budget_limit:
                continue

            # Score calculation
            quality_score = performance["quality"]
            speed_score = performance["speed"]
            cost_score = 1.0 / (1.0 + estimated_cost)  # Lower cost = higher score

            # Adjust quality requirements based on complexity
            quality_weight = quality_priority * (0.5 + complexity)

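            # Remaining priority goes to cost; clamp so an over-allocated
            # quality/speed split never rewards expensive models
            cost_weight = max(0.0, 1.0 - quality_priority - speed_priority)

            total_score = (
                quality_score * quality_weight +
                speed_score * speed_priority +
                cost_score * cost_weight
            )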

            model_scores[model] = {
                "score": total_score,
                "estimated_cost": estimated_cost,
                "estimated_tokens": estimated_tokens,
                "quality_score": quality_score,
                "speed_score": speed_score
            }

        if not model_scores:
            return {"error": "No models meet budget constraints"}

        # Select highest scoring model
        best_model = max(model_scores.items(), key=lambda x: x[1]["score"])

        return {
            "recommended_model": best_model[0],
            "reasoning": best_model[1],
            "alternatives": dict(sorted(
                model_scores.items(),
                key=lambda x: x[1]["score"],
                reverse=True
            ))
        }

    def _estimate_token_usage(
        self,
        task_requirements: Dict[str, Any],
        model: str
    ) -> Dict[str, int]:
        """Estimate token usage"""

        complexity = task_requirements.get("complexity", 0.5)
        task_type = task_requirements.get("type", "general")

        # Base tokens by task type
        base_tokens = {
            "code_generation": {"input": 500, "output": 2000},
            "security_analysis": {"input": 1000, "output": 1500},
            "performance_testing": {"input": 800, "output": 1200},
            "general": {"input": 300, "output": 800}
        }

        base = base_tokens.get(task_type, base_tokens["general"])

        # Adjust for complexity
        complexity_multiplier = 0.5 + complexity * 1.5

        # Adjust for model characteristics
        model_efficiency = {
            "gpt-5": 1.0,
            "gpt-4o": 1.1,
            "claude-opus-4.1": 0.9,
            "claude-sonnet-3.5": 1.0,
            "gemini-2.0-flash": 1.2
        }

        efficiency = model_efficiency.get(model, 1.0)

        return {
            "input": int(base["input"] * complexity_multiplier * efficiency),
            "output": int(base["output"] * complexity_multiplier * efficiency)
        }

    def _calculate_cost(self, model: str, token_usage: Dict[str, int]) -> float:
        """Calculate cost"""

        if model not in self.pricing:
            return 0.0

        pricing = self.pricing[model]

        input_cost = (token_usage["input"] / 1000) * pricing["input"]
        output_cost = (token_usage["output"] / 1000) * pricing["output"]

        return input_cost + output_cost

    async def analyze_spending_patterns(self) -> Dict[str, Any]:
        """Analyze spending patterns"""

        if not self.cost_history:
            return {"error": "No cost history available"}

        # Prepare time series data
        daily_costs = {}
        model_costs = {}

        for metric in self.cost_history:
            date_key = metric.timestamp.date().isoformat()

            if date_key not in daily_costs:
                daily_costs[date_key] = 0.0
            daily_costs[date_key] += metric.total_cost

            if metric.model_name not in model_costs:
                model_costs[metric.model_name] = 0.0
            model_costs[metric.model_name] += metric.total_cost

        # Trend analysis
        dates = sorted(daily_costs.keys())
        costs = [daily_costs[date] for date in dates]

        if len(costs) >= 7:
            # Slope of a linear fit over the last 7 daily totals
            # (change in daily spend per day, in USD/day)
            week_trend = np.polyfit(range(len(costs[-7:])), costs[-7:], 1)[0]
        else:
            week_trend = 0.0

        # Budget burn rate
        total_spent = sum(costs)
        monthly_budget = self.budget_config.get("monthly_limit", 10000)
        burn_rate = total_spent / monthly_budget

        # Generate optimization suggestions
        optimization_suggestions = await self._generate_optimization_suggestions(
            model_costs, total_spent, week_trend
        )

        return {
            "total_spent": total_spent,
            "daily_average": np.mean(costs) if costs else 0.0,
            "weekly_trend": week_trend,
            "budget_burn_rate": burn_rate,
            "model_breakdown": model_costs,
            "optimization_suggestions": optimization_suggestions,
            "projected_monthly_cost": float(np.mean(costs[-7:])) * 30  # Recent daily average held flat for 30 days
        }

    async def _generate_optimization_suggestions(
        self,
        model_costs: Dict[str, float],
        total_spent: float,
        trend: float
    ) -> List[Dict[str, Any]]:
        """Generate optimization suggestions"""

        suggestions = []

        # High-cost model analysis
        if model_costs:
            most_expensive = max(model_costs.items(), key=lambda x: x[1])

            if most_expensive[1] > total_spent * 0.4:  # 40%+ of total
                suggestions.append({
                    "type": "model_substitution",
                    "priority": "high",
                    "description": f"{most_expensive[0]} accounts for over 40% of total spend",
                    "action": "Consider cheaper alternative models",
                    "potential_savings": most_expensive[1] * 0.3
                })

        # Increasing trend warning
        if trend > 0:
            suggestions.append({
                "type": "spending_trend",
                "priority": "medium",
                "description": f"Daily spending rising by about ${trend:.2f} per day over the last week",
                "action": "Enhance usage monitoring",
                "potential_savings": trend * 30  # Rough: growth in the daily bill after 30 more days at this rate
            })

        # Budget optimization
        monthly_budget = self.budget_config.get("monthly_limit", 10000)
        if total_spent > monthly_budget * 0.8:
            suggestions.append({
                "type": "budget_warning",
                "priority": "critical",
                "description": "Exceeded 80% of monthly budget",
                "action": "Emergency cost reduction measures",
                "potential_savings": total_spent - monthly_budget * 0.7
            })

        return suggestions

    async def implement_cost_controls(
        self,
        control_config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build the cost-control configuration (this method assembles settings; it does not enforce them)"""

        implemented_controls = []

        # Rate limiting setup
        if control_config.get("enable_rate_limiting"):
            rate_limits = {
                "requests_per_hour": control_config.get("max_requests_per_hour", 1000),
                "tokens_per_hour": control_config.get("max_tokens_per_hour", 100000),
                "cost_per_hour": control_config.get("max_cost_per_hour", 100.0)
            }

            implemented_controls.append({
                "type": "rate_limiting",
                "config": rate_limits
            })

        # Auto model downgrade
        if control_config.get("enable_auto_downgrade"):
            downgrade_rules = {
                "cost_threshold": control_config.get("downgrade_cost_threshold", 0.1),
                "model_mapping": {
                    "gpt-5": "gpt-4o",
                    "claude-opus-4.1": "claude-sonnet-3.5"
                }
            }

            implemented_controls.append({
                "type": "auto_downgrade",
                "config": downgrade_rules
            })

        # Budget alerts
        if control_config.get("enable_budget_alerts"):
            alert_thresholds = [0.5, 0.8, 0.9, 1.0]  # 50%, 80%, 90%, 100%

            implemented_controls.append({
                "type": "budget_alerts",
                "config": {"thresholds": alert_thresholds}
            })

        return {
            "success": True,
            "implemented_controls": implemented_controls,
            "effective_date": datetime.now().isoformat()
        }

# Usage example
async def demonstrate_cost_optimization():
    """Cost optimization usage example"""

    budget_config = {
        "daily_limit": 500.0,
        "monthly_limit": 15000.0,
        "alert_thresholds": [0.5, 0.8, 0.9]
    }

    optimizer = CostOptimizationEngine(budget_config)

    # Task optimization example
    task_requirements = {
        "type": "code_generation",
        "complexity": 0.7,
        "quality_priority": 0.8,
        "speed_priority": 0.2,
        "budget_limit": 0.5
    }

    optimization_result = await optimizer.optimize_model_selection(task_requirements)
    print(f"Optimization result: {optimization_result}")

    # Spending analysis example
    spending_analysis = await optimizer.analyze_spending_patterns()
    print(f"Spending analysis: {spending_analysis}")

if __name__ == "__main__":
    asyncio.run(demonstrate_cost_optimization())
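In `analyze_spending_patterns`, `np.polyfit(x, y, 1)[0]` is the slope of a least-squares line through the last seven daily totals, so its unit is dollars per day, per day: how much the daily spend grows each day, not a weekly total. A minimal sketch of reading it that way, assuming a hypothetical `summarize_trend` helper and a 30-day month, computes the same slope in plain Python:

```python
from statistics import fmean
from typing import Dict, List


def summarize_trend(daily_costs: List[float], days_in_month: int = 30) -> Dict[str, float]:
    """Least-squares slope of the last 7 daily totals plus a flat monthly projection.

    The slope equals np.polyfit(range(n), window, 1)[0] for the same window.
    """
    window = daily_costs[-7:]
    n = len(window)
    if n < 2:
        raise ValueError("need at least two days of data to fit a trend")
    x_mean = (n - 1) / 2  # mean of day indices 0, 1, ..., n-1
    y_mean = fmean(window)
    numerator = sum((x - x_mean) * (y - y_mean) for x, y in enumerate(window))
    denominator = sum((x - x_mean) ** 2 for x in range(n))
    return {
        "daily_change": numerator / denominator,  # USD/day change per day
        # Naive projection: recent average daily spend held flat for a month
        "projected_monthly_cost": y_mean * days_in_month,
    }


if __name__ == "__main__":
    print(summarize_trend([100, 110, 120, 130, 140, 150, 160]))
```

With daily totals rising from $100 to $160 over a week, the slope is 10 (the daily bill grows by $10 each day) and the flat projection is $3,900. Extrapolating the slope instead would give a higher figure; which projection to report is a modelling choice.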

Deployment Automation

1. Kubernetes Helm Chart

# helm/ai-agents/Chart.yaml
apiVersion: v2
name: ai-agents
description: Enterprise AI Agent Platform
type: application
version: 1.0.0
appVersion: "1.0.0"

---
# helm/ai-agents/values.yaml
global:
  imageRegistry: "your-registry.com"
  imagePullSecrets:
    - regcred

agentManager:
  replicaCount: 3
  image:
    repository: ai-agent-manager
    tag: "latest"  # Pin an immutable version in production; with IfNotPresent, "latest" may not re-pull
    pullPolicy: IfNotPresent

  service:
    type: ClusterIP
    port: 8080

  resources:
    limits:
      cpu: 1000m
      memory: 2Gi
    requests:
      cpu: 500m
      memory: 1Gi

  autoscaling:
    enabled: true
    minReplicas: 3
    maxReplicas: 10
    targetCPUUtilizationPercentage: 70

loadBalancer:
  replicaCount: 2
  image:
    repository: ai-load-balancer
    tag: "latest"

  service:
    type: LoadBalancer
    port: 80
    targetPort: 8080

agents:
  codeGenerator:
    enabled: true
    replicaCount: 2
    image:
      repository: ai-code-generator
      tag: "latest"
    resources:
      limits:
        cpu: 2000m
        memory: 4Gi
      requests:
        cpu: 1000m
        memory: 2Gi

  securityScanner:
    enabled: true
    replicaCount: 1
    image:
      repository: ai-security-scanner
      tag: "latest"
    resources:
      limits:
        cpu: 1000m
        memory: 2Gi
      requests:
        cpu: 500m
        memory: 1Gi

redis:
  enabled: true
  architecture: replication
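  auth:
    enabled: true
    # Helm does not expand GitHub Actions "${{ }}" expressions; reference a
    # pre-created Secret instead (the name below is an example)
    existingSecret: redis-credentials
    existingSecretPasswordKey: redis-password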

monitoring:
  prometheus:
    enabled: true
  grafana:
    enabled: true
  jaeger:
    enabled: true

security:
  networkPolicies:
    enabled: true
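  # PodSecurityPolicy was removed in Kubernetes 1.25; use Pod Security Admission
  # (e.g. the pod-security.kubernetes.io/enforce: restricted namespace label) instead
  podSecurityAdmission:
    enforce: restricted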
  serviceAccount:
    create: true
    name: ai-agent-sa

---
# helm/ai-agents/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "ai-agents.fullname" . }}-manager
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
    component: manager
spec:
  replicas: {{ .Values.agentManager.replicaCount }}
  selector:
    matchLabels:
      {{- include "ai-agents.selectorLabels" . | nindent 6 }}
      component: manager
  template:
    metadata:
      labels:
        {{- include "ai-agents.selectorLabels" . | nindent 8 }}
        component: manager
    spec:
      serviceAccountName: {{ include "ai-agents.serviceAccountName" . }}
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 2000
      containers:
      - name: manager
        image: "{{ .Values.global.imageRegistry }}/{{ .Values.agentManager.image.repository }}:{{ .Values.agentManager.image.tag }}"
        imagePullPolicy: {{ .Values.agentManager.image.pullPolicy }}
        ports:
        - name: http
          containerPort: 8080
          protocol: TCP
        - name: metrics
          containerPort: 9090
          protocol: TCP
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
        env:
        - name: REDIS_URL
          value: "redis://{{ include "ai-agents.redis.fullname" . }}:6379"
        - name: REDIS_PASSWORD
          valueFrom:
            secretKeyRef:
              name: {{ include "ai-agents.redis.secretName" . }}
              key: redis-password
        - name: KUBERNETES_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        resources:
          {{- toYaml .Values.agentManager.resources | nindent 10 }}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          readOnlyRootFilesystem: true

---
# helm/ai-agents/templates/hpa.yaml
{{- if .Values.agentManager.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: {{ include "ai-agents.fullname" . }}-manager-hpa
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "ai-agents.fullname" . }}-manager
  minReplicas: {{ .Values.agentManager.autoscaling.minReplicas }}
  maxReplicas: {{ .Values.agentManager.autoscaling.maxReplicas }}
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: {{ .Values.agentManager.autoscaling.targetCPUUtilizationPercentage }}
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods  # Needs a custom metrics adapter (e.g. Prometheus Adapter) exposing ai_agent_queue_length
    pods:
      metric:
        name: ai_agent_queue_length
      target:
        type: AverageValue
        averageValue: "10"
{{- end }}

---
# helm/ai-agents/templates/networkpolicy.yaml
{{- if .Values.security.networkPolicies.enabled }}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: {{ include "ai-agents.fullname" . }}-network-policy
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
spec:
  podSelector:
    matchLabels:
      {{- include "ai-agents.selectorLabels" . | nindent 6 }}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          {{- include "ai-agents.selectorLabels" . | nindent 10 }}
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: monitoring
    ports:
    - protocol: TCP
      port: 8080
    - protocol: TCP
      port: 9090
  egress:
  - to:
    - podSelector:
        matchLabels:
          app.kubernetes.io/name: redis
    ports:
    - protocol: TCP
      port: 6379
  - to: []
    ports:
    - protocol: TCP
      port: 443  # HTTPS for external API calls
    - protocol: TCP
      port: 53   # DNS
    - protocol: UDP
      port: 53   # DNS
{{- end }}
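The HPA template above combines CPU, memory, and queue-length metrics. Per the Kubernetes HPA documentation, each metric proposes desiredReplicas = ceil(currentReplicas * currentValue / targetValue) (CPU and memory utilization are measured against the containers' resource requests), the controller takes the largest proposal, and a metric whose ratio is within a tolerance (0.1 by default) proposes no change; the result is clamped to minReplicas/maxReplicas. A simplified sketch of that rule, ignoring stabilization windows, scaling policies, and pod readiness handling (the `desired_replicas` name is hypothetical):

```python
import math
from typing import List, Tuple


def desired_replicas(
    current_replicas: int,
    metrics: List[Tuple[float, float]],  # (current value, target value) per metric
    min_replicas: int,
    max_replicas: int,
    tolerance: float = 0.1,
) -> int:
    """Simplified HPA recommendation: max over per-metric proposals, then clamp."""
    proposals = []
    for current, target in metrics:
        ratio = current / target
        if abs(ratio - 1.0) <= tolerance:
            proposals.append(current_replicas)  # Within tolerance: keep current size
        else:
            proposals.append(math.ceil(current_replicas * ratio))
    desired = max(proposals) if proposals else current_replicas
    return max(min_replicas, min(max_replicas, desired))


if __name__ == "__main__":
    # 3 replicas; CPU 91% (target 70), memory 60% (target 80), queue length 25 (target 10)
    print(desired_replicas(3, [(91, 70), (60, 80), (25, 10)], min_replicas=3, max_replicas=10))
```

In this example the memory metric alone would keep 3 replicas, CPU would ask for 4, and the queue-length metric asks for 8, so the recommendation is 8. Taking the maximum means the most pressured metric drives scale-up.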

2. CI/CD Pipeline

# .github/workflows/ai-agents-cicd.yml
name: AI Agents CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'ai-agents/**'
      - 'helm/ai-agents/**'
      - '.github/workflows/ai-agents-cicd.yml'
  pull_request:
    branches: [main]
    paths:
      - 'ai-agents/**'
      - 'helm/ai-agents/**'

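env:
  REGISTRY: ghcr.io
  IMAGE_PREFIX: ${{ github.repository }}/ai-agents

# Explicit token scopes: pushing to GHCR, uploading SARIF results,
# and keyless cosign signing (OIDC) each need a permission
permissions:
  contents: read
  packages: write
  security-events: write
  id-token: write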

jobs:
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Run Trivy vulnerability scanner
        # Pin to a release tag rather than @master for reproducible scans
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: './ai-agents'
          format: 'sarif'
          output: 'trivy-results.sarif'

      - name: Upload Trivy scan results
        # CodeQL Action v2 is deprecated; v3 is the current major version
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: 'trivy-results.sarif'

  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.11', '3.12']

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          cd ai-agents
          pip install -r requirements.txt
          pip install pytest pytest-cov pytest-asyncio

      - name: Run unit tests
        run: |
          cd ai-agents
          pytest tests/ --cov=src --cov-report=xml --cov-report=html

      - name: Upload coverage reports
        uses: codecov/codecov-action@v3
        with:
          file: ./ai-agents/coverage.xml
          flags: unittests
          name: codecov-umbrella

  build:
    needs: [security-scan, test]
    runs-on: ubuntu-latest
    strategy:
      matrix:
        component: [manager, load-balancer, code-generator, security-scanner]

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

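      - name: Set up QEMU
        # Needed to build the linux/arm64 image on an amd64 runner
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_PREFIX }}/${{ matrix.component }}
          # type=sha,format=long,prefix= produces the full commit SHA tag that
          # the deploy jobs pass as global.imageTag
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,format=long,prefix=
            type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}

      - name: Build and push Docker image
        id: build
        uses: docker/build-push-action@v5
        with:
          context: ./ai-agents/${{ matrix.component }}
          platforms: linux/amd64,linux/arm64
          # Pull requests (including forks, whose token cannot push) only build
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Install cosign
        if: github.event_name != 'pull_request'
        uses: sigstore/cosign-installer@v3

      - name: Sign container image
        if: github.event_name != 'pull_request'
        run: |
          # Keyless signing with the workflow's OIDC token; signing by digest
          # binds the signature to the exact image that was pushed
          cosign sign --yes ${{ env.REGISTRY }}/${{ env.IMAGE_PREFIX }}/${{ matrix.component }}@${{ steps.build.outputs.digest }}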

  helm-test:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Helm
        uses: azure/setup-helm@v3
        with:
          version: 'v3.12.0'

      - name: Lint Helm chart
        run: |
          helm lint helm/ai-agents

      - name: Template Helm chart
        run: |
          helm template ai-agents helm/ai-agents \
            --values helm/ai-agents/values.yaml \
            --output-dir /tmp/helm-output

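      - name: Validate Kubernetes manifests
        run: |
          # kubectl apply --dry-run=client still needs API discovery from a live
          # cluster, so validate offline against the Kubernetes JSON schemas;
          # CRDs such as Istio resources have no bundled schema and are skipped
          go install github.com/yannh/kubeconform/cmd/kubeconform@latest
          "$(go env GOPATH)/bin/kubeconform" -strict -summary -ignore-missing-schemas /tmp/helm-output/ai-agents/templates/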

  deploy-staging:
    if: github.ref == 'refs/heads/develop'
    needs: [build, helm-test]
    runs-on: ubuntu-latest
    environment: staging

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-west-2

      - name: Update kubeconfig
        run: |
          aws eks update-kubeconfig --name ai-agents-staging

      - name: Deploy to staging
        run: |
          helm upgrade --install ai-agents-staging helm/ai-agents \
            --namespace ai-agents-staging \
            --create-namespace \
            --values helm/ai-agents/values-staging.yaml \
            --set global.imageTag=${{ github.sha }} \
            --wait --timeout=600s

      - name: Run integration tests
        run: |
          kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=ai-agents \
            -n ai-agents-staging --timeout=300s
          ./scripts/integration-tests.sh staging

  deploy-production:
    if: github.ref == 'refs/heads/main'
    needs: [build, helm-test]
    runs-on: ubuntu-latest
    environment: production

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_PROD }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_PROD }}
          aws-region: us-west-2

      - name: Update kubeconfig
        run: |
          aws eks update-kubeconfig --name ai-agents-production

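      - name: Determine target color
        id: color
        run: |
          # Read the subset that currently receives traffic ("none" on first deploy)
          ACTIVE=$(kubectl get virtualservice ai-agents-vs -n ai-agents-prod \
            -o jsonpath='{.spec.http[0].route[0].destination.subset}' 2>/dev/null || true)
          if [ "$ACTIVE" = "blue" ]; then TARGET=green; else TARGET=blue; fi
          echo "active=${ACTIVE:-none}" >> "$GITHUB_OUTPUT"
          echo "target=${TARGET}" >> "$GITHUB_OUTPUT"

      - name: Blue-Green Deployment
        run: |
          # Deploy the new version to the idle color; the live color keeps serving
          TARGET=${{ steps.color.outputs.target }}
          helm upgrade --install ai-agents-${TARGET} helm/ai-agents \
            --namespace ai-agents-${TARGET} \
            --create-namespace \
            --values helm/ai-agents/values-production.yaml \
            --set global.imageTag=${{ github.sha }} \
            --set service.type=ClusterIP \
            --wait --timeout=600s

      - name: Health check and smoke tests
        run: |
          ./scripts/health-check.sh ${{ steps.color.outputs.target }}
          ./scripts/smoke-tests.sh ${{ steps.color.outputs.target }}

      - name: Switch traffic to the new color
        run: |
          # Update Istio VirtualService to switch traffic
          kubectl patch virtualservice ai-agents-vs -n ai-agents-prod \
            --type='json' \
            -p='[{"op": "replace", "path": "/spec/http/0/route/0/destination/subset", "value": "${{ steps.color.outputs.target }}"}]'

      - name: Monitor deployment
        run: |
          ./scripts/monitor-deployment.sh 300  # Monitor for 5 minutes

      - name: Roll back traffic on failure
        if: failure() && steps.color.outputs.active != 'none'
        run: |
          kubectl patch virtualservice ai-agents-vs -n ai-agents-prod \
            --type='json' \
            -p='[{"op": "replace", "path": "/spec/http/0/route/0/destination/subset", "value": "${{ steps.color.outputs.active }}"}]'

      - name: Cleanup old version
        if: success() && steps.color.outputs.active != 'none'
        run: |
          helm uninstall ai-agents-${{ steps.color.outputs.active }} -n ai-agents-${{ steps.color.outputs.active }} || true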

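  notify:
    if: always() && github.event_name == 'push'
    needs: [deploy-staging, deploy-production]
    runs-on: ubuntu-latest

    steps:
      - name: Notify Slack
        uses: 8398a7/action-slack@v3
        with:
          # job.status would report this notify job itself, so derive the
          # status from the deploy jobs (one of them is always skipped)
          status: ${{ contains(needs.*.result, 'failure') && 'failure' || (contains(needs.*.result, 'cancelled') && 'cancelled' || 'success') }}
          channel: '#ai-agents-deployments'
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}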
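The production job calls `./scripts/monitor-deployment.sh 300`, whose contents are not shown. Below is one possible shape for that check, written in Python. It rests on a few assumptions:

- The caller supplies a `sample_error_rate` function. In practice this might run a Prometheus query for the failed-request ratio.
- The 1% threshold is taken from the operations checklist in the next section.
- `monitor_deployment` and its parameters are hypothetical names.

```python
import time
from typing import Callable


def monitor_deployment(sample_error_rate: Callable[[], float],
                       duration_s: float = 300,
                       interval_s: float = 15,
                       max_error_rate: float = 0.01,
                       max_consecutive_breaches: int = 3,
                       sleep: Callable[[float], None] = time.sleep) -> bool:
    """Return True if the new version stays healthy for the whole window.

    A single noisy sample does not fail the rollout; only
    `max_consecutive_breaches` samples in a row above `max_error_rate` do.
    """
    breaches = 0
    elapsed = 0.0
    while elapsed < duration_s:
        rate = sample_error_rate()
        # Count consecutive breaches; any healthy sample resets the streak
        breaches = breaches + 1 if rate > max_error_rate else 0
        if breaches >= max_consecutive_breaches:
            return False
        sleep(interval_s)
        elapsed += interval_s
    return True
```

Wrap the function in a small CLI that exits nonzero when it returns `False`. The monitoring step then fails, and the `if: success()` cleanup step is skipped. Requiring consecutive breaches trades a slightly slower reaction for fewer false rollbacks caused by a single transient error spike.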

Operations Manual and Best Practices

1. Operations Checklist

# AI Agent Operations Checklist

## Daily Operations Check

### System Health Verification
- [ ] Verify the status of all agents
- [ ] Monitor response times (target: p95 < 5 seconds)
- [ ] Monitor error rates (target: < 1%)
- [ ] Check resource utilization (CPU < 80%, Memory < 85%)

### Cost Monitoring
- [ ] Daily cost verification
- [ ] Budget burn rate check
- [ ] Investigate abnormal usage

### Security Verification
- [ ] Check for suspicious access logs
- [ ] Monitor authentication failure count
- [ ] Check secret rotation status

## Weekly Operations Check

### Performance Analysis
- [ ] Review weekly performance report
- [ ] Analyze scaling history
- [ ] Identify bottlenecks and improvement plans

### Capacity Planning
- [ ] Analyze resource usage trends
- [ ] Forecast next month's capacity
- [ ] Adjust scaling thresholds

### Update Management
- [ ] Apply security patches
- [ ] Check dependency updates
- [ ] Apply configuration changes

## Monthly Operations Check

### Comprehensive Review
- [ ] Create monthly operations report
- [ ] Verify SLA compliance
- [ ] Analyze incidents and track action items
- [ ] Identify operations process improvements

### Disaster Recovery Testing
- [ ] Backup recovery test
- [ ] DR environment verification
- [ ] Update recovery procedures
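Several of the daily checks above are plain threshold comparisons, so they are easy to automate. The sketch below assumes the raw inputs are already exported from your metrics store: per-request latencies, request and error counts, and peak CPU and memory percentages. `p95` and `daily_health_violations` are hypothetical helpers. The percentile uses the nearest-rank method, which is one of several common definitions.

```python
from typing import List, Sequence


def p95(samples: Sequence[float]) -> float:
    """95th percentile using the nearest-rank method."""
    if not samples:
        raise ValueError("p95 requires at least one sample")
    ordered = sorted(samples)
    # Nearest rank = ceil(0.95 * n), computed with integers to avoid float error
    rank = (95 * len(ordered) + 99) // 100
    return ordered[rank - 1]


def daily_health_violations(latencies_s: Sequence[float],
                            total_requests: int,
                            error_requests: int,
                            cpu_percent: float,
                            memory_percent: float) -> List[str]:
    """Compare one day's metrics against the checklist targets."""
    violations = []
    latency_p95 = p95(latencies_s)
    if latency_p95 >= 5.0:
        violations.append(f"p95 latency {latency_p95:.2f}s (target < 5s)")
    error_rate = error_requests / total_requests if total_requests else 0.0
    if error_rate >= 0.01:
        violations.append(f"error rate {error_rate:.2%} (target < 1%)")
    if cpu_percent >= 80:
        violations.append(f"CPU {cpu_percent:.0f}% (target < 80%)")
    if memory_percent >= 85:
        violations.append(f"memory {memory_percent:.0f}% (target < 85%)")
    return violations
```

An empty list means every automated item in the System Health Verification group passed. Anything else can be posted to the same channel used for deployment notifications.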

2. Troubleshooting Guide

# troubleshooting/diagnostic_tools.py
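import asyncio
import json
import subprocess
import time
from datetime import datetime
from typing import Any, Dict, List
from urllib.parse import urlparse

import aiohttp

class DiagnosticTools:
    """AI agent diagnostic tools"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.checks = {
            "connectivity": self._check_connectivity,
            "performance": self._check_performance,
            "resources": self._check_resources,
            "security": self._check_security,
            "costs": self._check_costs
        }

    async def run_full_diagnostic(self) -> Dict[str, Any]:
        """Run full diagnostic"""

        print("Starting comprehensive diagnostic...")
        results = {}

        for check_name, check_func in self.checks.items():
            try:
                print(f"Running {check_name} check...")
                results[check_name] = await check_func()
                print(f"✅ {check_name} check completed")

            except Exception as e:
                print(f"❌ {check_name} check failed: {e}")
                results[check_name] = {"error": str(e)}

        # Overall assessment
        overall_health = self._calculate_overall_health(results)

        return {
            "timestamp": datetime.now().isoformat(),
            "overall_health": overall_health,
            "detailed_results": results,
            "recommendations": self._generate_recommendations(results)
        }

    async def _check_connectivity(self) -> Dict[str, Any]:
        """Connectivity check"""

        http_endpoints = [
            {"name": "Agent Manager", "url": f"{self.config['agent_manager_url']}/health"},
            {"name": "Prometheus", "url": f"{self.config['prometheus_url']}/api/v1/targets"}
        ]

        results = []
        timeout = aiohttp.ClientTimeout(total=10)

        # One session is reused for all HTTP endpoints
        async with aiohttp.ClientSession(timeout=timeout) as session:
            for endpoint in http_endpoints:
                try:
                    start_time = time.monotonic()
                    async with session.get(endpoint["url"]) as response:
                        response_time = time.monotonic() - start_time

                        results.append({
                            "endpoint": endpoint["name"],
                            "status": "healthy" if response.status == 200 else "unhealthy",
                            "response_time": response_time,
                            "status_code": response.status
                        })

                except Exception as e:
                    results.append({
                        "endpoint": endpoint["name"],
                        "status": "error",
                        "error": str(e)
                    })

        # Redis speaks its own protocol over TCP, not HTTP, so it needs its own probe
        results.append(await self._check_redis(self.config["redis_url"]))

        return {
            "status": "healthy" if all(r["status"] == "healthy" for r in results) else "degraded",
            "endpoints": results
        }

    async def _check_redis(self, redis_url: str) -> Dict[str, Any]:
        """Send an inline PING and expect +PONG (a server requiring AUTH replies -NOAUTH)"""

        parsed = urlparse(redis_url)
        host, port = parsed.hostname or "localhost", parsed.port or 6379

        try:
            start_time = time.monotonic()
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=10
            )
            try:
                writer.write(b"PING\r\n")
                await writer.drain()
                reply = await asyncio.wait_for(reader.readline(), timeout=10)
                response_time = time.monotonic() - start_time
            finally:
                writer.close()
                await writer.wait_closed()

            return {
                "endpoint": "Redis",
                "status": "healthy" if reply.startswith(b"+PONG") else "unhealthy",
                "response_time": response_time,
                "reply": reply.decode(errors="replace").strip()
            }

        except Exception as e:
            return {"endpoint": "Redis", "status": "error", "error": str(e)}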

    async def _check_performance(self) -> Dict[str, Any]:
        """Performance check"""

        # Get metrics from Prometheus; sum()/count() collapse per-pod series
        # so that result[0] below is the single aggregate value
        metrics_queries = {
            "avg_response_time": "avg(ai_agent_response_time_seconds)",
            # Fraction of requests that failed (0.05 = 5%), not errors per second
            "error_rate": (
                "sum(rate(ai_agent_requests_total{status='error'}[5m]))"
                " / sum(rate(ai_agent_requests_total[5m]))"
            ),
            "throughput": "sum(rate(ai_agent_requests_total[5m]))",
            "active_agents": "count(up{job='ai-agents'} == 1)"
        }

        results = {}

        async with aiohttp.ClientSession() as session:
            for metric_name, query in metrics_queries.items():
                try:
                    async with session.get(
                        f"{self.config['prometheus_url']}/api/v1/query",
                        params={"query": query}
                    ) as response:
                        data = await response.json()

                        if data["data"]["result"]:
                            value = float(data["data"]["result"][0]["value"][1])
                            results[metric_name] = value
                        else:
                            results[metric_name] = None

                except Exception as e:
                    results[f"{metric_name}_error"] = str(e)

        # Performance evaluation (a metric with no data counts as 0)
        performance_status = "healthy"

        if (results.get("avg_response_time") or 0) > 10:
            performance_status = "degraded"
        if (results.get("error_rate") or 0) > 0.05:
            performance_status = "unhealthy"

        return {
            "status": performance_status,
            "metrics": results,
            "thresholds": {
                "max_response_time": 10.0,
                "max_error_rate": 0.05
            }
        }

    async def _check_resources(self) -> Dict[str, Any]:
        """Resource check"""

        # Check Kubernetes resource usage
        try:
            # Execute kubectl top pods
            result = subprocess.run(
                ["kubectl", "top", "pods", "-n", "ai-agents", "--no-headers"],
                capture_output=True,
                text=True,
                check=True
            )

            pod_resources = []
            for line in result.stdout.strip().split('\n'):
                if line:
                    parts = line.split()
                    pod_resources.append({
                        "pod": parts[0],
                        "cpu": parts[1],
                        "memory": parts[2]
                    })

            # Check node resources
            node_result = subprocess.run(
                ["kubectl", "top", "nodes", "--no-headers"],
                capture_output=True,
                text=True,
                check=True
            )

            node_resources = []
            for line in node_result.stdout.strip().split('\n'):
                if line:
                    parts = line.split()
                    node_resources.append({
                        "node": parts[0],
                        "cpu": parts[1],
                        "cpu_percent": parts[2],
                        "memory": parts[3],
                        "memory_percent": parts[4]
                    })

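            # Flag nodes at or above the checklist thresholds (CPU 80%, memory 85%);
            # non-numeric values (e.g. nodes without metrics) are skipped
            status = "healthy"
            for node in node_resources:
                cpu = node["cpu_percent"].rstrip("%")
                memory = node["memory_percent"].rstrip("%")
                if (cpu.isdigit() and int(cpu) >= 80) or (memory.isdigit() and int(memory) >= 85):
                    status = "warning"

            return {
                "status": status,
                "pods": pod_resources,
                "nodes": node_resources
            }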

        except subprocess.CalledProcessError as e:
            return {
                "status": "error",
                "error": f"Failed to get resource information: {e}"
            }

    async def _check_security(self) -> Dict[str, Any]:
        """Security check"""

        security_checks = []

        # Check secret expiration
        try:
            result = subprocess.run(
                ["kubectl", "get", "secrets", "-n", "ai-agents", "-o", "json"],
                capture_output=True,
                text=True,
                check=True
            )

            secrets_data = json.loads(result.stdout)

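            for secret in secrets_data["items"]:
                # Skip Helm release records: one is kept per revision, so their
                # age says nothing about credential rotation
                if secret.get("type") == "helm.sh/release.v1":
                    continue

                creation_time = datetime.fromisoformat(
                    secret["metadata"]["creationTimestamp"].replace('Z', '+00:00')
                )
                age_days = (datetime.now(creation_time.tzinfo) - creation_time).days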

                status = "healthy"
                if age_days > 90:  # 90+ days old
                    status = "warning"
                if age_days > 180:  # 180+ days old
                    status = "critical"

                security_checks.append({
                    "type": "secret_age",
                    "name": secret["metadata"]["name"],
                    "age_days": age_days,
                    "status": status
                })

        except Exception as e:
            security_checks.append({
                "type": "secret_check_error",
                "error": str(e)
            })

        # Check network policies (kubectl prints "No resources found" to stderr,
        # so inspect the JSON item list instead of scraping stdout)
        try:
            result = subprocess.run(
                ["kubectl", "get", "networkpolicies", "-n", "ai-agents", "-o", "json"],
                capture_output=True,
                text=True,
                check=True
            )

            if not json.loads(result.stdout).get("items"):
                security_checks.append({
                    "type": "network_policy",
                    "status": "warning",
                    "message": "No network policies found"
                })
            else:
                security_checks.append({
                    "type": "network_policy",
                    "status": "healthy",
                    "message": "Network policies configured"
                })

        except Exception as e:
            security_checks.append({
                "type": "network_policy_error",
                "error": str(e)
            })

        overall_status = "healthy"
        if any(check.get("status") == "warning" for check in security_checks):
            overall_status = "warning"
        if any(check.get("status") == "critical" for check in security_checks):
            overall_status = "critical"

        return {
            "status": overall_status,
            "checks": security_checks
        }

    async def _check_costs(self) -> Dict[str, Any]:
        """Cost check"""

        # Compare with cost budget (simplified)
        daily_budget = self.config.get("daily_budget", 500.0)
        current_spend = await self._get_current_daily_spend()

        budget_utilization = current_spend / daily_budget

        status = "healthy"
        if budget_utilization > 0.8:
            status = "warning"
        if budget_utilization > 1.0:
            status = "critical"

        return {
            "status": status,
            "daily_budget": daily_budget,
            "current_spend": current_spend,
            "budget_utilization": budget_utilization,
            "projected_monthly": current_spend * 30
        }

    async def _get_current_daily_spend(self) -> float:
        """Get current daily spend"""
        # In actual implementation, get from billing API or metrics
        return 423.50  # Dummy value

    def _calculate_overall_health(self, results: Dict[str, Any]) -> str:
        """Calculate the overall health status (the worst status wins)"""

        statuses = []
        for check_result in results.values():
            if not isinstance(check_result, dict):
                continue
            if "status" in check_result:
                statuses.append(check_result["status"])
            elif "error" in check_result:
                # A check that raised an exception has no status; count it as an error
                statuses.append("error")

        if "error" in statuses or "critical" in statuses:
            return "critical"
        elif "unhealthy" in statuses:
            return "unhealthy"
        elif "warning" in statuses or "degraded" in statuses:
            return "warning"
        else:
            return "healthy"

    def _generate_recommendations(self, results: Dict[str, Any]) -> List[str]:
        """Generate recommended actions"""

        recommendations = []

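        # Connectivity related
        conn_result = results.get("connectivity", {})
        if conn_result.get("status") == "degraded" or "error" in conn_result:
            recommendations.append("Check endpoint health, DNS, and network policies for unreachable dependencies")

        # Performance related
        perf_result = results.get("performance", {})
        if perf_result.get("status") in ["degraded", "unhealthy"]:
            recommendations.append("Investigate error sources and consider scaling up for performance improvement")

        # Resource related
        resource_result = results.get("resources", {})
        if resource_result.get("status") == "warning":
            recommendations.append("Review resource requests/limits or add node capacity")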

        # Security related
        security_result = results.get("security", {})
        if security_result.get("status") in ["warning", "critical"]:
            recommendations.append("Perform secret rotation and security configuration review")

        # Cost related
        cost_result = results.get("costs", {})
        if cost_result.get("status") in ["warning", "critical"]:
            recommendations.append("Review model selection and rate limiting for cost optimization")

        return recommendations

# Usage example and command line tool
async def main():
    """Main execution for diagnostic tool"""

    config = {
        "agent_manager_url": "http://agent-manager.ai-agents.svc.cluster.local:8080",
        "redis_url": "redis://redis.ai-agents.svc.cluster.local:6379",
        "prometheus_url": "http://prometheus.monitoring.svc.cluster.local:9090",
        "daily_budget": 500.0
    }

    diagnostic = DiagnosticTools(config)
    results = await diagnostic.run_full_diagnostic()

    print(json.dumps(results, indent=2))

if __name__ == "__main__":
    asyncio.run(main())
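The if/elif chain in `_calculate_overall_health` encodes a "worst status wins" rule. As a design alternative, the minimal self-contained sketch below expresses the same mapping as a severity table. `SEVERITY`, `LABELS`, and `overall_health` are hypothetical names. The sketch treats two cases as critical: a check result that carries only an `error` key, and any unrecognized status string. As a result, a failed or misreported check can never make the system look healthy.

```python
from typing import Any, Dict

# Higher number = worse; "degraded" and "warning" share a level,
# as in the original chain
SEVERITY = {"healthy": 0, "warning": 1, "degraded": 1,
            "unhealthy": 2, "critical": 3, "error": 3}
LABELS = {0: "healthy", 1: "warning", 2: "unhealthy", 3: "critical"}


def overall_health(results: Dict[str, Dict[str, Any]]) -> str:
    """Return the label of the worst status across all check results."""
    worst = 0
    for result in results.values():
        if "status" in result:
            status = result["status"]
        elif "error" in result:
            status = "error"  # the check itself failed
        else:
            continue
        # Unknown status strings are treated as the worst case
        worst = max(worst, SEVERITY.get(status, SEVERITY["error"]))
    return LABELS[worst]
```

A table keeps the ordering in one place, so adding a new status means adding one entry rather than re-ordering branches. A cron job could also map the result to an exit code (for example, nonzero for anything other than `healthy`) so that alerting picks it up.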

Summary

This article has covered the architecture and best practices needed to run AI agents, built with the latest AI agent development tools as of August 2025, in production.

Key Implementation Points

  • Enterprise Architecture: Scalable and reliable system design
  • Zero Trust Security: Strong security through multi-layered defense
  • Comprehensive Monitoring: End-to-end visibility into agent behavior through metrics, logs, and traces
  • Cost Optimization: Automated cost management and budget control

Phased Implementation Plan

  1. Phase 1: Build basic architecture and security configuration
  2. Phase 2: Implement monitoring systems and automation
  3. Phase 3: Move to full production operation and continuously refine cost optimization

Keys to Operational Success

  • Continuous Monitoring: Constant system state monitoring and rapid response
  • Automation Focus: Automate routine operations to reduce human error
  • Security First: Design with security as the highest priority
  • Cost Awareness: Keep cost efficiency in view in every operational decision

With the right architecture and operational processes, production AI agents can deliver sustained value to the enterprise.