Complete AI Agent Production Operations Guide - August 2025 Enterprise Architecture Implementation¶
Introduction¶
Following the AI agent development theory article and the hands-on implementation guide, this third installment is a practical guide focused on running AI agents safely and efficiently in production.
It explains in detail the real challenges of operating agents in enterprise environments, together with proven architecture patterns for solving them.
Key Points¶
Enterprise Ready
Building a secure AI agent operations architecture for large organizations
Auto Scaling
Dynamic resource management based on load and automated cost optimization
Enhanced Security
Secure AI agent operations based on zero-trust principles
Comprehensive Monitoring
360-degree observability system for monitoring and analyzing AI agent behavior
Enterprise Architecture Design¶
1. Overall Architecture Overview¶
graph TB
subgraph "API Gateway Layer"
AGW[API Gateway] --> LB[Load Balancer]
end
subgraph "AI Agent Management Layer"
LB --> AM[Agent Manager]
AM --> AR[Agent Registry]
AM --> AS[Agent Scheduler]
end
subgraph "Execution Layer"
AS --> AE1[Agent Executor 1]
AS --> AE2[Agent Executor 2]
AS --> AE3[Agent Executor N]
end
subgraph "Data Layer"
AE1 --> MCP[MCP Servers]
AE2 --> DB[(Database)]
AE3 --> FS[(File System)]
end
subgraph "Observability Layer"
AM --> MON[Monitoring]
AE1 --> LOG[Logging]
AE2 --> TRC[Tracing]
AE3 --> MET[Metrics]
end
subgraph "Security Layer"
AGW --> IAM[Identity & Access]
AM --> VLT[Secrets Vault]
AE1 --> NET[Network Security]
end
2. Agent Manager Implementation¶
# infrastructure/agent_manager.py
import asyncio
import json
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import redis
import kubernetes
from prometheus_client import Counter, Histogram, Gauge
class AgentStatus(Enum):
    """Lifecycle state of a managed agent instance."""

    IDLE = "idle"  # no tasks in flight
    BUSY = "busy"  # at least one task in flight
    FAILED = "failed"  # last task failed; not scheduled until the next heartbeat
    MAINTENANCE = "maintenance"  # never set in this module; not scheduled


@dataclass
class AgentInstance:
    """In-memory record of one agent pod tracked by EnterpriseAgentManager."""

    id: str
    type: str  # also selects the container image ``ai-agent-<type>:latest``
    status: AgentStatus
    capacity: int  # maximum number of concurrent tasks
    current_load: int  # tasks currently in flight
    created_at: datetime
    last_heartbeat: datetime  # refreshed by EnterpriseAgentManager.record_heartbeat
    metadata: Dict[str, Any]


class EnterpriseAgentManager:
    """Enterprise AI agent manager.

    Keeps a registry of agent instances in memory (mirrored to the Redis hash
    ``agents``) and deploys one Kubernetes pod per agent. It routes each task
    to the least-loaded eligible agent. It also runs background loops for
    health checking, queued-task dispatch, auto-scaling and metrics export.

    Config keys:
        redis_url (required): Redis connection URL.
        manager_url (required): injected into agent pods as MANAGER_URL.
        kubernetes_namespace: namespace for agent pods (default "ai-agents").
        heartbeat_timeout_seconds: agents silent for longer are removed
            (default 60).
        max_agents_per_type: upper bound for auto-scaling (default: none).

    NOTE(review): the Redis and Kubernetes clients used here are synchronous,
    so every call to them blocks the event loop while it runs.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # Previously hard-coded as "ai-agents" even though the sample config
        # already provides ``kubernetes_namespace``.
        self.namespace: str = config.get("kubernetes_namespace", "ai-agents")
        self.heartbeat_timeout: float = float(config.get("heartbeat_timeout_seconds", 60))
        self.max_agents_per_type: Optional[int] = config.get("max_agents_per_type")

        self.redis_client = redis.Redis.from_url(config["redis_url"])
        self.k8s_client = kubernetes.client.ApiClient()

        # Prometheus metrics. ``agent_counter`` counts registrations only.
        # ``agent_population`` reports the live number of agents per
        # (type, status).
        self.agent_counter = Counter("ai_agents_total", "Total AI agents", ["type", "status"])
        self.agent_population = Gauge(
            "ai_agents_current", "Current AI agents by type and status", ["type", "status"]
        )
        self.task_duration = Histogram("ai_task_duration_seconds", "Task execution time")
        self.agent_load = Gauge("ai_agent_load", "Current agent load", ["agent_id"])

        # Agent registry and work queue.
        self.agents: Dict[str, AgentInstance] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self._known_types: set = set()  # every agent type seen so far
        self._background_jobs: set = set()  # strong refs to in-flight dispatch tasks

    async def start(self):
        """Start the manager and run its background loops until cancelled."""
        print("Starting Enterprise Agent Manager...")
        loops = [
            asyncio.create_task(self._health_monitor()),
            asyncio.create_task(self._task_scheduler()),
            asyncio.create_task(self._auto_scaler()),
            asyncio.create_task(self._metrics_collector()),
        ]
        await asyncio.gather(*loops)

    async def register_agent(self, agent_config: Dict[str, Any]) -> str:
        """Register an agent, persist it to Redis and deploy its pod.

        Args:
            agent_config: must contain ``type``. Optional keys are
                ``capacity`` (default 10) and ``metadata`` (default empty).

        Returns:
            The generated agent id.

        Raises:
            KeyError: if ``type`` is missing.
            Exception: Redis/Kubernetes errors are re-raised after the
                registration has been rolled back.

        NOTE(review): the agent becomes schedulable immediately, before its
        pod is ready to serve requests.
        """
        agent_id = str(uuid.uuid4())
        now = datetime.now()
        agent = AgentInstance(
            id=agent_id,
            type=agent_config["type"],
            status=AgentStatus.IDLE,
            capacity=agent_config.get("capacity", 10),
            current_load=0,
            created_at=now,
            last_heartbeat=now,
            metadata=agent_config.get("metadata", {}),
        )
        self.agents[agent_id] = agent
        try:
            await self._persist_agent(agent)
            await self._deploy_agent_pod(agent)
        except Exception:
            # Roll back. Previously a failed deploy left a ghost agent that
            # _select_best_agent would keep routing tasks to.
            self.agents.pop(agent_id, None)
            try:
                self.redis_client.hdel("agents", agent_id)
            except Exception as cleanup_error:
                print(f"Failed to clean up Redis entry for agent {agent_id}: {cleanup_error}")
            raise

        self._known_types.add(agent.type)
        self.agent_counter.labels(type=agent.type, status=agent.status.value).inc()
        print(f"Agent registered: {agent_id} (type: {agent.type})")
        return agent_id

    async def record_heartbeat(self, agent_id: str) -> bool:
        """Record a heartbeat from ``agent_id``.

        Nothing in this module receives heartbeats. Wire this method to
        whatever endpoint agent pods report to (they receive MANAGER_URL in
        their environment). Without heartbeats, ``_health_monitor`` removes
        every agent once ``heartbeat_timeout_seconds`` has passed since it
        registered. A heartbeat also returns a FAILED agent to scheduling.

        Returns:
            False if the agent is unknown, True otherwise.
        """
        agent = self.agents.get(agent_id)
        if agent is None:
            return False
        agent.last_heartbeat = datetime.now()
        if agent.status is AgentStatus.FAILED:
            agent.status = AgentStatus.BUSY if agent.current_load else AgentStatus.IDLE
        await self._persist_agent(agent)
        return True

    async def assign_task(
        self,
        task: Dict[str, Any],
        agent_type: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run ``task`` on the least-loaded eligible agent.

        Args:
            task: JSON-serialisable payload forwarded to the agent.
            agent_type: restrict selection to this type; None means any type.

        Returns:
            On success, ``{"success": True, "task_id", "agent_id", "result"}``.
            ``result`` is the dict produced by _execute_task, whose own
            ``status`` may still be "failed". When no agent can take the
            task, returns ``{"success": False, "error", "retry_after": 30}``.
        """
        agent = await self._select_best_agent(agent_type, task)
        if not agent:
            agent = await self._auto_scale_agent(agent_type)
        if not agent:
            return {
                "success": False,
                "error": "No agents available",
                "retry_after": 30
            }

        task_id = str(uuid.uuid4())
        execution_result = await self._execute_task(agent, task_id, task)
        return {
            "success": True,
            "task_id": task_id,
            "agent_id": agent.id,
            "result": execution_result
        }

    async def _select_best_agent(
        self,
        agent_type: Optional[str],
        task: Dict[str, Any]
    ) -> Optional[AgentInstance]:
        """Return the eligible agent with the lowest load ratio, or None.

        Only IDLE/BUSY agents with spare capacity qualify. The spare-capacity
        test also excludes capacity-0 agents, which previously caused a
        ZeroDivisionError in the ratio below. ``task`` is currently unused.
        """
        candidates = [
            agent for agent in self.agents.values()
            if agent.status in (AgentStatus.IDLE, AgentStatus.BUSY)
            and agent.current_load < agent.capacity
            and (not agent_type or agent.type == agent_type)
        ]
        if not candidates:
            return None
        return min(candidates, key=lambda a: a.current_load / a.capacity)

    async def _auto_scale_agent(self, agent_type: Optional[str]) -> Optional[AgentInstance]:
        """Request one more agent of ``agent_type`` when none can take a task.

        ``assign_task`` called this method, but it was never defined
        (AttributeError). It always returns None. The new pod cannot serve
        requests until its container has started, so the caller answers with
        ``retry_after`` instead of sending the task there immediately.
        """
        if not agent_type:
            # Without a type there is no image to deploy.
            return None
        if not self._can_scale_up(agent_type):
            return None
        try:
            await self._scale_up(agent_type)
        except Exception as e:
            print(f"On-demand scale-up of {agent_type} agents failed: {e}")
        return None

    async def _execute_task(
        self,
        agent: AgentInstance,
        task_id: str,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Run ``task`` on ``agent``, tracking its load, status and timing.

        Returns:
            ``{"status": "completed", "result", "duration"}`` on success, or
            ``{"status": "failed", "error", "duration"}``. Exceptions from the
            agent call are converted into the failed result.
        """
        start_time = datetime.now()
        try:
            agent.current_load += 1
            agent.status = AgentStatus.BUSY
            self.agent_load.labels(agent_id=agent.id).set(agent.current_load)

            result = await self._call_agent_api(agent, task)

            duration = (datetime.now() - start_time).total_seconds()
            self.task_duration.observe(duration)
            return {
                "status": "completed",
                "result": result,
                "duration": duration
            }
        except Exception as e:
            print(f"Task {task_id} execution failed: {e}")
            agent.status = AgentStatus.FAILED
            return {
                "status": "failed",
                "error": str(e),
                "duration": (datetime.now() - start_time).total_seconds()
            }
        finally:
            agent.current_load = max(0, agent.current_load - 1)
            # Keep FAILED until record_heartbeat clears it. Previously it was
            # reset to IDLE here, so a failure never took effect.
            if agent.status is not AgentStatus.FAILED and agent.current_load == 0:
                agent.status = AgentStatus.IDLE
            self.agent_load.labels(agent_id=agent.id).set(agent.current_load)

    async def _call_agent_api(
        self,
        agent: AgentInstance,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST ``task`` to the agent's /execute endpoint and return its JSON reply.

        Raises:
            Exception: on any non-200 response. aiohttp errors, including the
                300 s total timeout, propagate unchanged.

        NOTE(review): ``agent-<id>`` is the pod name. Pods are not resolvable
        in cluster DNS by name alone, so this only works if a Service or a
        hostname/subdomain setup (not shown) exposes that name. Confirm.
        """
        import aiohttp
        agent_url = f"http://agent-{agent.id}:8080/execute"
        async with aiohttp.ClientSession() as session:
            async with session.post(
                agent_url,
                json=task,
                timeout=aiohttp.ClientTimeout(total=300)  # 5 minute timeout
            ) as response:
                if response.status == 200:
                    return await response.json()
                raise Exception(f"Agent API error: {response.status}")

    async def _health_monitor(self):
        """Every 30 s, remove agents whose last heartbeat is too old."""
        while True:
            try:
                now = datetime.now()
                for agent in list(self.agents.values()):
                    silent_for = (now - agent.last_heartbeat).total_seconds()
                    if silent_for > self.heartbeat_timeout:
                        print(f"Agent {agent.id} is unhealthy, removing...")
                        await self._remove_agent(agent.id)
                await asyncio.sleep(30)  # Check every 30 seconds
            except Exception as e:
                print(f"Health monitor error: {e}")
                await asyncio.sleep(60)

    def _can_scale_up(self, agent_type: str) -> bool:
        """Return False when ``max_agents_per_type`` is set and already reached."""
        if self.max_agents_per_type is None:
            return True
        current = sum(1 for agent in self.agents.values() if agent.type == agent_type)
        return current < self.max_agents_per_type

    async def _auto_scaler(self):
        """Every minute, scale each agent type by its average load ratio.

        Above 0.8 one agent is added, subject to ``max_agents_per_type``.
        Below 0.2 one idle agent is removed, as long as more than one agent
        of that type remains.
        """
        while True:
            try:
                by_type: Dict[str, List[AgentInstance]] = {}
                for agent in list(self.agents.values()):
                    by_type.setdefault(agent.type, []).append(agent)

                for agent_type, group in by_type.items():
                    # max(..., 1) guards against capacity-0 agents.
                    avg_load = sum(
                        agent.current_load / max(agent.capacity, 1) for agent in group
                    ) / len(group)

                    if avg_load > 0.8:  # 80%+ load
                        if self._can_scale_up(agent_type):
                            print(f"Scaling up {agent_type} agents (load: {avg_load:.2f})")
                            await self._scale_up(agent_type)
                    elif avg_load < 0.2 and len(group) > 1:  # <20% load
                        print(f"Scaling down {agent_type} agents (load: {avg_load:.2f})")
                        await self._scale_down(agent_type)

                await asyncio.sleep(60)  # Check every minute
            except Exception as e:
                print(f"Auto scaler error: {e}")
                await asyncio.sleep(120)

    async def _scale_up(self, agent_type: str) -> str:
        """Register one additional agent of ``agent_type`` and return its id."""
        agent_config = {
            "type": agent_type,
            "capacity": 10,
            "metadata": {"auto_scaled": True}
        }
        return await self.register_agent(agent_config)

    async def _scale_down(self, agent_type: str):
        """Remove the oldest agent of ``agent_type`` that has no load, if any."""
        idle_agents = [
            agent for agent in self.agents.values()
            if agent.type == agent_type and agent.current_load == 0
        ]
        if idle_agents:
            oldest_agent = min(idle_agents, key=lambda a: a.created_at)
            await self._remove_agent(oldest_agent.id)

    async def _deploy_agent_pod(self, agent: AgentInstance):
        """Create the Kubernetes pod ``agent-<id>`` for ``agent``.

        Raises:
            Exception: whatever the Kubernetes client raised (also printed).
        """
        pod_spec = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": f"agent-{agent.id}",
                "labels": {
                    "app": "ai-agent",
                    "agent-type": agent.type,
                    "agent-id": agent.id
                }
            },
            "spec": {
                "containers": [{
                    "name": "agent",
                    "image": f"ai-agent-{agent.type}:latest",
                    "ports": [{"containerPort": 8080}],
                    "env": [
                        {"name": "AGENT_ID", "value": agent.id},
                        {"name": "AGENT_TYPE", "value": agent.type},
                        {"name": "MANAGER_URL", "value": self.config['manager_url']}
                    ],
                    "resources": {
                        "requests": {"cpu": "100m", "memory": "256Mi"},
                        "limits": {"cpu": "500m", "memory": "1Gi"}
                    },
                    "livenessProbe": {
                        "httpGet": {"path": "/health", "port": 8080},
                        "initialDelaySeconds": 30,
                        "periodSeconds": 10
                    }
                }],
                "restartPolicy": "Always"
            }
        }

        v1 = kubernetes.client.CoreV1Api(self.k8s_client)
        try:
            v1.create_namespaced_pod(namespace=self.namespace, body=pod_spec)
            print(f"Pod deployed for agent {agent.id}")
        except Exception as e:
            print(f"Failed to deploy pod for agent {agent.id}: {e}")
            raise

    async def _persist_agent(self, agent: AgentInstance):
        """Write ``agent`` as JSON into the Redis hash ``agents`` under its id."""
        agent_data = asdict(agent)
        agent_data['created_at'] = agent.created_at.isoformat()
        agent_data['last_heartbeat'] = agent.last_heartbeat.isoformat()
        agent_data['status'] = agent.status.value
        self.redis_client.hset("agents", agent.id, json.dumps(agent_data))

    async def _remove_agent(self, agent_id: str):
        """Delete the agent's pod, registry entry, Redis entry and load series.

        Pod deletion errors are printed and do not stop the cleanup.
        Unknown ids are ignored.
        """
        if agent_id not in self.agents:
            return

        v1 = kubernetes.client.CoreV1Api(self.k8s_client)
        try:
            v1.delete_namespaced_pod(name=f"agent-{agent_id}", namespace=self.namespace)
        except Exception as e:
            print(f"Failed to delete pod for agent {agent_id}: {e}")

        self.agents.pop(agent_id, None)
        self.redis_client.hdel("agents", agent_id)
        # Drop the per-agent gauge series so removed agents do not linger in
        # the exported metrics forever.
        try:
            self.agent_load.remove(agent_id)
        except KeyError:
            pass
        print(f"Agent removed: {agent_id}")

    async def _task_scheduler(self):
        """Consume ``task_queue`` and dispatch each queued task concurrently.

        ``start()`` referenced this loop, but it was never defined, so
        start-up failed with AttributeError. Nothing in this module enqueues
        tasks yet. Queue items are assumed to be task dicts whose optional
        ``"agent_type"`` key selects the agent type; confirm against the
        producers.
        """
        while True:
            task = await self.task_queue.get()
            job = asyncio.create_task(self._dispatch_queued_task(task))
            # The event loop keeps only weak references to tasks, so hold a
            # strong one until the job finishes.
            self._background_jobs.add(job)
            job.add_done_callback(self._background_jobs.discard)

    async def _dispatch_queued_task(self, task: Dict[str, Any]):
        """Run one queued task through assign_task and log any failure."""
        try:
            agent_type = task.get("agent_type") if isinstance(task, dict) else None
            outcome = await self.assign_task(task, agent_type)
            if not outcome.get("success"):
                print(f"Queued task not dispatched: {outcome.get('error')}")
        except Exception as e:
            print(f"Task scheduler error: {e}")
        finally:
            self.task_queue.task_done()

    async def _metrics_collector(self):
        """Publish the current number of agents per (type, status) every 15 s.

        Previously this loop called ``agent_counter.inc()`` for every agent on
        every pass, so the counter grew without bound instead of reflecting
        the current population.
        """
        while True:
            try:
                population: Dict[tuple, int] = {}
                for agent in list(self.agents.values()):
                    key = (agent.type, agent.status.value)
                    population[key] = population.get(key, 0) + 1
                    self._known_types.add(agent.type)

                # Report every status of every known type, so a combination
                # that drops to zero is exported as 0 and not left stale.
                for agent_type in self._known_types:
                    for status in AgentStatus:
                        self.agent_population.labels(type=agent_type, status=status.value).set(
                            population.get((agent_type, status.value), 0)
                        )
                await asyncio.sleep(15)  # Update metrics every 15 seconds
            except Exception as e:
                print(f"Metrics collector error: {e}")
                await asyncio.sleep(60)
# Usage example
async def start_enterprise_manager():
    """Build the manager from a sample configuration and run it until cancelled."""
    sample_config = {
        "redis_url": "redis://redis-cluster:6379",
        "manager_url": "http://agent-manager:8080",
        "kubernetes_namespace": "ai-agents",
    }
    manager = EnterpriseAgentManager(sample_config)
    await manager.start()


if __name__ == "__main__":
    asyncio.run(start_enterprise_manager())
Security Architecture¶
1. Zero Trust Security Model¶
# security/zero-trust-policy.yml
#
# NOTE(review): Istio rejects unknown condition keys. The original
# `custom.agent_authenticated` / `custom.task_validated` keys would fail
# validation, so they are expressed here as JWT claims. This requires a
# RequestAuthentication resource (not shown) whose tokens carry both claims
# with the value "true". Confirm how the issuer encodes them.
# NOTE(review): with no selector, this ALLOW policy applies to every workload
# in the namespace and denies all other requests. That includes the agent
# manager's POST /execute calls to agent pods on port 8080. Confirm the
# intended target workload and path.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: ai-agent-zero-trust
  namespace: ai-agents
spec:
  action: ALLOW
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/ai-agents/sa/agent-executor"]
    to:
    - operation:
        methods: ["POST"]
        paths: ["/api/v1/tasks"]
    when:
    - key: request.auth.claims[agent_authenticated]
      values: ["true"]
    - key: request.auth.claims[task_validated]
      values: ["true"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: agent-executor
  namespace: ai-agents
  annotations:
    iam.gke.io/gcp-service-account: ai-agent-sa@project.iam.gserviceaccount.com
---
# Enforce Istio mutual TLS for all traffic to services in the namespace.
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: ai-agent-tls
  namespace: ai-agents
spec:
  host: "*.ai-agents.svc.cluster.local"
  trafficPolicy:
    tls:
      mode: ISTIO_MUTUAL
2. Secrets Management System¶
# security/secrets_manager.py
import base64
import json
from typing import Dict, Any, Optional
from cryptography.fernet import Fernet
from kubernetes import client, config
import hvac # HashiCorp Vault client
class EnterpriseSecretsManager:
    """Enterprise secrets management for AI agents.

    Each agent's secrets are written to two places:

    * HashiCorp Vault (KV v2, path ``agents/<agent_id>``). Every value is
      Fernet-encrypted under a master key stored in Vault at
      ``encryption/master-key``.
    * A Kubernetes Secret ``agent-secrets-<agent_id>``, used for fast reads.
      Its values are only base64-encoded, as Kubernetes requires; they are
      not encrypted.

    NOTE(review): hvac and the kubernetes client are synchronous, so these
    ``async`` methods block the event loop during each remote call.
    NOTE(review): ``_notify_agent_secret_rotation``, ``_get_client_ip`` and
    ``_send_audit_log`` are called but not defined in this module.
    """

    def __init__(self, vault_url: str, vault_token: str, namespace: str = "ai-agents"):
        """Connect to Vault and Kubernetes, then load or create the master key.

        Args:
            vault_url: Vault server address.
            vault_token: token used for all Vault operations.
            namespace: Kubernetes namespace holding the agent Secrets. It was
                previously hard-coded; the default keeps that value.
        """
        self.vault_client = hvac.Client(url=vault_url, token=vault_token)
        self.k8s_client = client.CoreV1Api()
        self.namespace = namespace
        self.encryption_key = self._get_or_create_encryption_key()
        self.cipher = Fernet(self.encryption_key)

    async def store_agent_secrets(
        self,
        agent_id: str,
        secrets: Dict[str, str]
    ) -> bool:
        """Store ``secrets`` for ``agent_id`` in Vault and in Kubernetes.

        Storing again (for example during rotation) replaces the existing
        Kubernetes Secret. Previously the create call failed with HTTP 409 in
        that case.

        NOTE(review): if Vault succeeds but Kubernetes fails, the Kubernetes
        copy read first by get_agent_secrets may be stale.

        Returns:
            True on success, or False if any step failed (the error is
            printed).
        """
        try:
            encrypted = {
                key: self.cipher.encrypt(value.encode()).decode()
                for key, value in secrets.items()
            }
            self.vault_client.secrets.kv.v2.create_or_update_secret(
                path=f"agents/{agent_id}",
                secret=encrypted
            )

            # Kubernetes Secret ``data`` values must be base64-encoded.
            encoded = {
                key: base64.b64encode(value.encode()).decode()
                for key, value in secrets.items()
            }
            secret = client.V1Secret(
                metadata=client.V1ObjectMeta(
                    name=f"agent-secrets-{agent_id}",
                    namespace=self.namespace,
                    labels={"agent-id": agent_id}
                ),
                data=encoded,
                type="Opaque"
            )
            self._upsert_k8s_secret(secret)
            return True
        except Exception as e:
            print(f"Failed to store secrets for agent {agent_id}: {e}")
            return False

    def _upsert_k8s_secret(self, secret) -> None:
        """Create the Kubernetes Secret, or replace it if it already exists (HTTP 409)."""
        try:
            self.k8s_client.create_namespaced_secret(namespace=self.namespace, body=secret)
        except client.exceptions.ApiException as e:
            if e.status != 409:
                raise
            self.k8s_client.replace_namespaced_secret(
                name=secret.metadata.name,
                namespace=self.namespace,
                body=secret
            )

    async def get_agent_secrets(self, agent_id: str) -> Dict[str, str]:
        """Return the decoded secrets of ``agent_id``.

        Reads the Kubernetes Secret first and falls back to Vault (decrypting
        with the master key) if Kubernetes returns an API error.

        Returns:
            A key -> plaintext mapping, or ``{}`` on any failure (the error is
            printed).
        """
        try:
            try:
                secret = self.k8s_client.read_namespaced_secret(
                    name=f"agent-secrets-{agent_id}",
                    namespace=self.namespace
                )
            except client.exceptions.ApiException:
                # Not in Kubernetes (or not readable): fall back to Vault.
                response = self.vault_client.secrets.kv.v2.read_secret_version(
                    path=f"agents/{agent_id}"
                )
                return {
                    key: self.cipher.decrypt(value.encode()).decode()
                    for key, value in response['data']['data'].items()
                }
            # ``data`` is None for a Secret that has no keys.
            return {
                key: base64.b64decode(value).decode()
                for key, value in (secret.data or {}).items()
            }
        except Exception as e:
            print(f"Failed to get secrets for agent {agent_id}: {e}")
            return {}

    async def rotate_secrets(self, agent_id: str) -> bool:
        """Generate, store and announce fresh secrets for ``agent_id``.

        Returns:
            True only if the new secrets were stored and the agent was
            notified. Previously a failed store was ignored and the rotation
            was still reported as successful.
        """
        try:
            new_secrets = await self._generate_new_secrets(agent_id)
            if not await self.store_agent_secrets(agent_id, new_secrets):
                print(f"Failed to rotate secrets for agent {agent_id}: storage failed")
                return False
            await self._notify_agent_secret_rotation(agent_id)
            print(f"Secrets rotated for agent {agent_id}")
            return True
        except Exception as e:
            print(f"Failed to rotate secrets for agent {agent_id}: {e}")
            return False

    async def audit_secret_access(self, agent_id: str, secret_key: str):
        """Send an audit record for an access to ``secret_key`` by ``agent_id``."""
        # Local import: ``datetime`` was used here without being imported.
        from datetime import datetime

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "secret_key": secret_key,
            "action": "access",
            "source_ip": self._get_client_ip()
        }
        await self._send_audit_log(audit_entry)

    def _get_or_create_encryption_key(self) -> bytes:
        """Return the Fernet master key from Vault, creating it only if absent.

        Raises:
            Exception: any Vault error other than "path not found". The
                previous bare ``except`` generated and stored a new key on
                network or permission errors too. That overwrote the real key
                and made every stored secret undecryptable.
        """
        try:
            response = self.vault_client.secrets.kv.v2.read_secret_version(
                path="encryption/master-key"
            )
        except hvac.exceptions.InvalidPath:
            new_key = Fernet.generate_key()
            self.vault_client.secrets.kv.v2.create_or_update_secret(
                path="encryption/master-key",
                secret={"key": new_key.decode()}
            )
            return new_key
        return response['data']['data']['key'].encode()

    async def _generate_new_secrets(self, agent_id: str) -> Dict[str, str]:
        """Generate a new API key, agent token and generation timestamp."""
        # Local imports: ``uuid`` and ``datetime`` were used here without
        # being imported.
        import secrets
        import string
        import uuid
        from datetime import datetime

        # 32 characters drawn with the CSPRNG-backed ``secrets`` module.
        alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
        new_api_key = ''.join(secrets.choice(alphabet) for _ in range(32))
        return {
            "api_key": new_api_key,
            "agent_token": str(uuid.uuid4()),
            "generated_at": datetime.now().isoformat()
        }
Monitoring & Observability System¶
1. Comprehensive Monitoring Architecture¶
# monitoring/observability_stack.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
from dataclasses import dataclass
import aiohttp
from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
from opentelemetry import trace, metrics
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
@dataclass
class AlertRule:
    """Threshold alert evaluated periodically by AIAgentObservability."""

    name: str
    condition: str  # "<metric> <op> <value>"; only the operator is parsed (see _condition_met)
    threshold: float
    duration: int  # seconds the condition must hold before the alert fires
    severity: str  # "warning" or "critical" in the built-in rules
    action: str  # key into the action table of _execute_alert_action


class AIAgentObservability:
    """Observability system dedicated to AI agents.

    Owns a private Prometheus registry holding request, latency, success-rate
    and token metrics, and exports traces to Jaeger. It evaluates threshold
    alert rules, sends notifications, and triggers automated remediation
    through the agent manager's HTTP API.

    Config keys: ``jaeger_host``, ``jaeger_port`` (collector HTTP port,
    default 14268), ``jaeger_collector_endpoint`` (overrides both),
    ``slack_webhook``, ``email_enabled``, ``agent_manager_url``.

    NOTE(review): several helpers this class relies on are not defined in
    this module: _get_metric_value, _health_checker, _performance_analyzer,
    _monitor_cost_budget, _send_email_alert, _send_pagerduty_alert,
    _action_performance_investigation and _action_rate_limiting. Start-up,
    notifications and actions tolerate their absence. The other gaps surface
    as errors printed by the background loops.
    NOTE(review): metrics go to a private CollectorRegistry that nothing in
    this module exposes for scraping.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.registry = CollectorRegistry()

        # Metrics definitions
        self.agent_requests = Counter(
            'ai_agent_requests_total',
            'Total agent requests',
            ['agent_type', 'status', 'endpoint'],
            registry=self.registry
        )
        self.agent_response_time = Histogram(
            'ai_agent_response_time_seconds',
            'Agent response time',
            ['agent_type', 'complexity'],
            registry=self.registry
        )
        self.agent_success_rate = Gauge(
            'ai_agent_success_rate',
            'Agent success rate',
            ['agent_type'],
            registry=self.registry
        )
        self.token_usage = Counter(
            'ai_agent_token_usage_total',
            'Total token usage',
            ['agent_type', 'model'],
            registry=self.registry
        )

        # Tracing. The global provider can only be set once per process, and
        # later calls are ignored. The old code then attached another span
        # processor to whichever provider was global, duplicating exports.
        # Spans are now emitted through a tracer bound to our own provider.
        provider = TracerProvider()
        provider.add_span_processor(BatchSpanProcessor(self._build_jaeger_exporter(config)))
        trace.set_tracer_provider(provider)
        self.tracer = provider.get_tracer(__name__)

        self.alert_rules = self._load_alert_rules()
        self.active_alerts: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _build_jaeger_exporter(config: Dict[str, Any]) -> "JaegerExporter":
        """Build a Jaeger exporter that sends to the collector's HTTP endpoint.

        The sample configs point ``jaeger_host`` at a collector on port 14268,
        the collector's HTTP port. The old code passed that value as
        ``agent_port``, the UDP port of a Jaeger *agent*, so spans would not
        reach the collector.
        NOTE(review): the Jaeger exporter is deprecated upstream in favour of
        OTLP.
        """
        endpoint = config.get('jaeger_collector_endpoint')
        if not endpoint:
            host = config.get('jaeger_host', 'localhost')
            port = config.get('jaeger_port', 14268)
            endpoint = f"http://{host}:{port}/api/traces"
        return JaegerExporter(collector_endpoint=endpoint)

    def _load_alert_rules(self) -> List[AlertRule]:
        """Return the built-in alert rules."""
        return [
            AlertRule(
                name="high_error_rate",
                condition="agent_success_rate < 0.9",
                threshold=0.9,
                duration=300,  # 5 minutes
                severity="warning",
                action="scale_up"
            ),
            AlertRule(
                name="critical_error_rate",
                condition="agent_success_rate < 0.8",
                threshold=0.8,
                duration=180,  # 3 minutes
                severity="critical",
                action="immediate_intervention"
            ),
            AlertRule(
                name="high_response_time",
                condition="agent_response_time > 30.0",
                threshold=30.0,
                duration=600,  # 10 minutes
                severity="warning",
                action="performance_investigation"
            ),
            AlertRule(
                name="token_budget_exceeded",
                condition="hourly_token_usage > 1000000",
                threshold=1000000,
                duration=60,
                severity="critical",
                action="rate_limiting"
            )
        ]

    async def start_monitoring(self):
        """Run all monitoring loops concurrently until cancelled.

        Loops that are not implemented are skipped with a warning. Previously
        the whole start-up failed with AttributeError.
        """
        print("Starting AI Agent Observability System...")
        loop_names = [
            "_metrics_collector",
            "_alert_manager",
            "_health_checker",
            "_cost_tracker",
            "_performance_analyzer",
        ]
        tasks = []
        for name in loop_names:
            loop_fn = getattr(self, name, None)
            if loop_fn is None:
                print(f"Monitoring loop not implemented, skipping: {name}")
                continue
            tasks.append(asyncio.create_task(loop_fn()))
        await asyncio.gather(*tasks)

    async def record_agent_request(
        self,
        agent_type: str,
        endpoint: str,
        status: str,
        response_time: float,
        token_count: int = 0,
        model: str = "unknown"
    ):
        """Record one completed agent request in metrics and as a trace span.

        Args:
            response_time: seconds the request took.
            token_count: tokens consumed; only recorded when positive.
        """
        import time

        self.agent_requests.labels(
            agent_type=agent_type,
            status=status,
            endpoint=endpoint
        ).inc()
        self.agent_response_time.labels(
            agent_type=agent_type,
            complexity="standard"  # TODO: determine dynamically
        ).observe(response_time)
        if token_count > 0:
            self.token_usage.labels(agent_type=agent_type, model=model).inc(token_count)

        # The request has already finished, so backdate the span start. Its
        # duration then equals the measured response time instead of ~0.
        end_ns = time.time_ns()
        span = self.tracer.start_span(
            "agent_request",
            start_time=end_ns - int(response_time * 1e9),
            attributes={
                "agent.type": agent_type,
                "agent.endpoint": endpoint,
                "agent.status": status,
                "agent.response_time": response_time,
                "agent.token_count": token_count,
            },
        )
        span.end(end_time=end_ns)

    async def _metrics_collector(self):
        """Refresh the success-rate gauge every 30 seconds."""
        while True:
            try:
                success_rates = await self._calculate_success_rates()
                for agent_type, rate in success_rates.items():
                    self.agent_success_rate.labels(agent_type=agent_type).set(rate)
                await asyncio.sleep(30)  # Every 30 seconds
            except Exception as e:
                print(f"Metrics collector error: {e}")
                await asyncio.sleep(60)

    async def _calculate_success_rates(self) -> Dict[str, float]:
        """Return success rate per agent type.

        Placeholder: returns fixed sample values instead of querying
        Prometheus.
        """
        return {
            "code_generator": 0.95,
            "security_scanner": 0.98,
            "performance_tester": 0.92
        }

    async def _alert_manager(self):
        """Evaluate every alert rule once a minute."""
        while True:
            try:
                for rule in self.alert_rules:
                    await self._evaluate_alert_rule(rule)
                await asyncio.sleep(60)  # Evaluate alerts every minute
            except Exception as e:
                print(f"Alert manager error: {e}")
                await asyncio.sleep(120)

    async def _evaluate_alert_rule(self, rule: AlertRule):
        """Open, fire or resolve the alert for ``rule``.

        The first breaching evaluation opens the alert. A later evaluation
        fires it once the condition has held for ``rule.duration`` seconds.
        It fires only once per episode; previously it re-fired, and re-ran
        its remediation action (e.g. another scale-up), on every evaluation
        while the condition persisted. A non-breaching value resolves it.
        """
        current_value = await self._get_metric_value(rule.condition)
        if current_value is None:
            return

        if not self._condition_met(rule, current_value):
            if self.active_alerts.pop(rule.name, None) is not None:
                print(f"Alert resolved: {rule.name}")
            return

        alert = self.active_alerts.get(rule.name)
        if alert is None:
            self.active_alerts[rule.name] = {
                "start_time": datetime.now(),
                "rule": rule,
                "current_value": current_value,
                "fired": False,
            }
            print(f"Alert triggered: {rule.name} (value: {current_value})")
            return

        alert["current_value"] = current_value
        duration = (datetime.now() - alert["start_time"]).total_seconds()
        if duration >= rule.duration and not alert.get("fired"):
            # Mark first so a failing notification/action is not retried every minute.
            alert["fired"] = True
            await self._fire_alert(rule, current_value, duration)

    def _condition_met(self, rule: AlertRule, value: float) -> bool:
        """Compare ``value`` with the threshold using the operator in ``rule.condition``."""
        if ">" in rule.condition:
            return value > rule.threshold
        if "<" in rule.condition:
            return value < rule.threshold
        return False

    async def _fire_alert(self, rule: AlertRule, value: float, duration: float):
        """Send notifications for ``rule`` and run its automated action."""
        alert_data = {
            "alert_name": rule.name,
            "severity": rule.severity,
            "current_value": value,
            "threshold": rule.threshold,
            "duration": duration,
            "timestamp": datetime.now().isoformat(),
            "action": rule.action
        }
        await self._send_alert_notification(alert_data)
        await self._execute_alert_action(rule.action, alert_data)

    async def _send_alert_notification(self, alert_data: Dict[str, Any]):
        """Notify every applicable channel.

        Slack is used when ``slack_webhook`` is set, email when
        ``email_enabled`` is set, and PagerDuty for critical alerts. Each
        channel is isolated: one failing or unimplemented sender no longer
        blocks the others, or the automated action that follows.
        """
        channels = []
        if self.config.get('slack_webhook'):
            channels.append("_send_slack_alert")
        if self.config.get('email_enabled'):
            channels.append("_send_email_alert")
        if alert_data['severity'] == 'critical':
            channels.append("_send_pagerduty_alert")

        for sender_name in channels:
            try:
                await getattr(self, sender_name)(alert_data)
            except Exception as e:
                print(f"Alert notification via {sender_name} failed: {e}")

    async def _send_slack_alert(self, alert_data: Dict[str, Any]):
        """Post ``alert_data`` to the Slack webhook.

        Raises:
            Exception: if Slack answers with an HTTP error status.
        """
        color = "warning" if alert_data['severity'] == "warning" else "danger"
        message = {
            "attachments": [{
                "color": color,
                "title": f"🤖 AI Agent Alert: {alert_data['alert_name']}",
                "fields": [
                    {"title": "Severity", "value": alert_data['severity'], "short": True},
                    {"title": "Current Value", "value": str(alert_data['current_value']), "short": True},
                    {"title": "Threshold", "value": str(alert_data['threshold']), "short": True},
                    {"title": "Duration", "value": f"{alert_data['duration']:.0f}s", "short": True}
                ],
                "footer": "AI Agent Monitoring",
                "ts": int(datetime.now().timestamp())
            }]
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.config['slack_webhook'],
                json=message,
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                if response.status >= 400:
                    raise Exception(f"Slack webhook error: {response.status}")

    async def _execute_alert_action(self, action: str, alert_data: Dict[str, Any]):
        """Run the automated remediation registered for ``action``.

        Handlers are looked up by name. The old dict referenced every bound
        method eagerly, so one undefined handler made *all* actions fail with
        AttributeError. Unknown or unimplemented actions are logged and
        skipped.
        """
        handler_names = {
            "scale_up": "_action_scale_up",
            "immediate_intervention": "_action_immediate_intervention",
            "performance_investigation": "_action_performance_investigation",
            "rate_limiting": "_action_rate_limiting"
        }
        handler_name = handler_names.get(action)
        if handler_name is None:
            return
        handler = getattr(self, handler_name, None)
        if handler is None:
            print(f"No handler implemented for alert action: {action}")
            return
        try:
            await handler(alert_data)
            print(f"Alert action executed: {action}")
        except Exception as e:
            print(f"Failed to execute alert action {action}: {e}")

    async def _post_to_agent_manager(self, path: str, payload: Dict[str, Any]):
        """POST ``payload`` to the agent manager and raise on HTTP errors."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.config['agent_manager_url']}{path}",
                json=payload
            ) as response:
                if response.status >= 400:
                    raise Exception(f"Agent manager error on {path}: {response.status}")

    async def _action_scale_up(self, alert_data: Dict[str, Any]):
        """Ask the agent manager to scale up."""
        await self._post_to_agent_manager("/scale-up", {"reason": "high_error_rate_alert"})

    async def _action_immediate_intervention(self, alert_data: Dict[str, Any]):
        """Put the agent manager into emergency mode."""
        await self._post_to_agent_manager("/emergency-mode", {"alert_data": alert_data})

    async def _cost_tracker(self):
        """Compute costs and check the budget once an hour."""
        while True:
            try:
                hourly_costs = await self._calculate_hourly_costs()
                await self._monitor_cost_budget(hourly_costs)
                await asyncio.sleep(3600)  # Every hour
            except Exception as e:
                print(f"Cost tracker error: {e}")
                await asyncio.sleep(3600)

    async def _calculate_hourly_costs(self) -> Dict[str, float]:
        """Return hourly cost per model.

        Placeholder: always returns an empty dict; ``pricing`` is not used
        yet.
        """
        # USD per 1k tokens. NOTE(review): illustrative figures; confirm
        # against current provider price lists before use.
        pricing = {
            "gpt-5": {"input": 0.002, "output": 0.006},
            "claude-opus-4.1": {"input": 0.015, "output": 0.075},
            "gemini-2.0-flash": {"input": 0.001, "output": 0.002}
        }
        costs: Dict[str, float] = {}
        return costs
# Configuration and deployment Kubernetes manifest.
# The Slack webhook is a credential, so it is deliberately kept out of this
# ConfigMap: provide it from a Kubernetes Secret, e.g. as the
# SLACK_WEBHOOK_URL environment variable. The previous value
# "${{ secrets.SLACK_WEBHOOK_URL }}" is GitHub Actions expression syntax,
# which Kubernetes never substitutes, so the literal text would have been
# used as the URL.
observability_config = """
apiVersion: v1
kind: ConfigMap
metadata:
  name: observability-config
  namespace: ai-agents
data:
  config.yaml: |
    jaeger_host: jaeger-collector.monitoring.svc.cluster.local
    jaeger_port: 14268
    prometheus_url: http://prometheus.monitoring.svc.cluster.local:9090
    agent_manager_url: http://agent-manager.ai-agents.svc.cluster.local:8080
    cost_budget:
      daily_limit: 1000.0
      monthly_limit: 25000.0
    alert_channels:
      - slack
      - email
      - pagerduty
"""
# Usage example
async def start_observability_system():
    """Start the observability system with a sample configuration.

    The Slack webhook is read from the ``SLACK_WEBHOOK_URL`` environment
    variable instead of a hard-coded placeholder URL, which made every Slack
    post fail. When the variable is unset the value is None, and
    ``_send_alert_notification`` only posts to Slack when ``slack_webhook``
    is truthy.
    """
    import os

    config = {
        "jaeger_host": "jaeger-collector",
        "jaeger_port": 14268,
        "slack_webhook": os.environ.get("SLACK_WEBHOOK_URL"),
        "agent_manager_url": "http://agent-manager:8080"
    }
    observability = AIAgentObservability(config)
    await observability.start_monitoring()


if __name__ == "__main__":
    asyncio.run(start_observability_system())
Performance Optimization and Scaling¶
1. Dynamic Load Balancing System¶
# performance/load_balancer.py
import asyncio
import random
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import aiohttp
import time
class LoadBalancingStrategy(Enum):
ROUND_ROBIN = "round_robin"
LEAST_CONNECTIONS = "least_connections"
WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
AI_OPTIMIZED = "ai_optimized"
@dataclass
class AgentEndpoint:
id: str
url: str
weight: int
current_connections: int
average_response_time: float
success_rate: float
capacity: int
agent_type: str
class IntelligentLoadBalancer:
"""AI-optimized load balancer"""
def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.AI_OPTIMIZED):
self.strategy = strategy
self.endpoints: Dict[str, AgentEndpoint] = {}
self.request_history = []
self.round_robin_counter = 0
def register_endpoint(self, endpoint: AgentEndpoint):
"""Register endpoint"""
self.endpoints[endpoint.id] = endpoint
print(f"Endpoint registered: {endpoint.id} ({endpoint.agent_type})")
async def select_endpoint(
self,
request_type: str,
estimated_complexity: float = 1.0
) -> Optional[AgentEndpoint]:
"""Select optimal endpoint"""
available_endpoints = [
ep for ep in self.endpoints.values()
if ep.current_connections < ep.capacity
]
if not available_endpoints:
return None
if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
return self._round_robin_select(available_endpoints)
elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
return self._least_connections_select(available_endpoints)
elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
return self._weighted_round_robin_select(available_endpoints)
elif self.strategy == LoadBalancingStrategy.AI_OPTIMIZED:
return await self._ai_optimized_select(
available_endpoints,
request_type,
estimated_complexity
)
def _round_robin_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
"""Round robin selection"""
selected = endpoints[self.round_robin_counter % len(endpoints)]
self.round_robin_counter += 1
return selected
def _least_connections_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
"""Least connections selection"""
return min(endpoints, key=lambda ep: ep.current_connections)
def _weighted_round_robin_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
"""Weighted round robin selection"""
weighted_endpoints = []
for ep in endpoints:
weighted_endpoints.extend([ep] * ep.weight)
if weighted_endpoints:
return random.choice(weighted_endpoints)
return endpoints[0]
async def _ai_optimized_select(
self,
endpoints: List[AgentEndpoint],
request_type: str,
estimated_complexity: float
) -> AgentEndpoint:
"""AI-optimized selection"""
# Calculate score for each endpoint
scores = []
for ep in endpoints:
# Base score (success rate and response time)
success_score = ep.success_rate
response_time_score = 1.0 / (1.0 + ep.average_response_time)
# Load score
load_score = 1.0 - (ep.current_connections / ep.capacity)
# Compatibility score (request type and agent type compatibility)
compatibility_score = await self._calculate_compatibility(
request_type,
ep.agent_type
)
# Complexity fitness score
complexity_score = await self._calculate_complexity_fitness(
estimated_complexity,
ep
)
# Total score
total_score = (
success_score * 0.3 +
response_time_score * 0.25 +
load_score * 0.2 +
compatibility_score * 0.15 +
complexity_score * 0.1
)
scores.append((ep, total_score))
# Select highest scoring endpoint
return max(scores, key=lambda x: x[1])[0]
async def _calculate_compatibility(self, request_type: str, agent_type: str) -> float:
"""Calculate request and agent compatibility"""
compatibility_matrix = {
"code_generation": {
"code_generator": 1.0,
"security_scanner": 0.3,
"performance_tester": 0.2
},
"security_analysis": {
"security_scanner": 1.0,
"code_generator": 0.4,
"performance_tester": 0.1
},
"performance_test": {
"performance_tester": 1.0,
"code_generator": 0.3,
"security_scanner": 0.2
}
}
return compatibility_matrix.get(request_type, {}).get(agent_type, 0.5)
async def _calculate_complexity_fitness(
self,
estimated_complexity: float,
endpoint: AgentEndpoint
) -> float:
"""Calculate complexity fitness"""
# Fitness between endpoint processing capability and estimated complexity
if estimated_complexity <= 0.3: # Simple tasks
return 1.0 # Any endpoint can handle
elif estimated_complexity <= 0.7: # Medium tasks
# Average capability endpoints are optimal
return 1.0 - abs(endpoint.weight - 5) / 5.0
else: # Complex tasks
# High-performance endpoints needed
return endpoint.weight / 10.0
async def execute_request(
self,
request_data: Dict[str, Any],
request_type: str,
estimated_complexity: float = 1.0
) -> Dict[str, Any]:
"""Execute request"""
endpoint = await self.select_endpoint(request_type, estimated_complexity)
if not endpoint:
return {
"success": False,
"error": "No available endpoints",
"retry_after": 30
}
# Increase connection count
endpoint.current_connections += 1
start_time = time.time()
try:
# Execute actual request
result = await self._execute_on_endpoint(endpoint, request_data)
execution_time = time.time() - start_time
# Update statistics
await self._update_endpoint_stats(endpoint, execution_time, True)
return {
"success": True,
"result": result,
"endpoint_id": endpoint.id,
"execution_time": execution_time
}
except Exception as e:
execution_time = time.time() - start_time
await self._update_endpoint_stats(endpoint, execution_time, False)
return {
"success": False,
"error": str(e),
"endpoint_id": endpoint.id,
"execution_time": execution_time
}
finally:
# Decrease connection count
endpoint.current_connections = max(0, endpoint.current_connections - 1)
async def _execute_on_endpoint(
    self,
    endpoint: AgentEndpoint,
    request_data: Dict[str, Any]
) -> Dict[str, Any]:
    """POST ``request_data`` as JSON to ``{endpoint.url}/execute``.

    The request has a 300-second total timeout.

    Returns:
        The decoded JSON body when the endpoint answers HTTP 200.

    Raises:
        Exception: For any other status, with the message
            ``"HTTP <status>: <response text>"``.
    """
    target_url = f"{endpoint.url}/execute"
    request_timeout = aiohttp.ClientTimeout(total=300)
    async with aiohttp.ClientSession() as session:
        async with session.post(target_url, json=request_data, timeout=request_timeout) as response:
            if response.status != 200:
                body_text = await response.text()
                raise Exception(f"HTTP {response.status}: {body_text}")
            return await response.json()
async def _update_endpoint_stats(
    self,
    endpoint: AgentEndpoint,
    execution_time: float,
    success: bool
):
    """Fold one request outcome into ``endpoint``'s running statistics.

    - ``endpoint.average_response_time`` becomes an exponential moving average
      with a smoothing factor of 0.1.
    - ``endpoint.success_rate`` becomes the fraction of successes among that
      endpoint's own most recent 100 requests.

    The outcome is also appended to the shared ``self.request_history`` list,
    as before, so any other reader of that list keeps working.

    Args:
        endpoint: Endpoint that served the request; mutated in place.
        execution_time: Observed latency in seconds.
        success: Whether the request succeeded.
    """
    from collections import deque

    # Update response time moving average
    alpha = 0.1  # Smoothing factor
    endpoint.average_response_time = (
        alpha * execution_time +
        (1 - alpha) * endpoint.average_response_time
    )
    # Kept for compatibility with other readers of the shared list.
    # NOTE(review): this method never trims it, so it grows without bound.
    self.request_history.append(success)
    # Track outcomes per endpoint. Deriving an endpoint's success rate from the
    # shared history mixes in every other endpoint's results, so one failing
    # endpoint would drag down healthy ones (and vice versa). The map is
    # created lazily so this method does not depend on the constructor.
    windows = getattr(self, "_endpoint_outcomes", None)
    if windows is None:
        windows = self._endpoint_outcomes = {}
    window = windows.get(endpoint.id)
    if window is None:
        # The deque's maxlen keeps only the most recent 100 outcomes.
        window = windows[endpoint.id] = deque(maxlen=100)
    window.append(success)
    endpoint.success_rate = sum(window) / len(window)
async def health_check(self):
    """Probe every registered endpoint's ``/health`` route, forever.

    Each sweep works on a snapshot of ``self.endpoints``:

    - It sends ``GET {endpoint.url}/health`` with a 10-second timeout.
    - It prints a message for any non-200 response or request error.
    - It then sleeps 30 seconds.

    If a sweep fails as a whole, the error is printed and the loop waits
    60 seconds before retrying. Failing endpoints are only reported; nothing
    here disables them.
    """
    probe_timeout = aiohttp.ClientTimeout(total=10)
    while True:
        try:
            # Share one session (and its connection pool) across the whole
            # sweep instead of opening and tearing down a session per endpoint.
            async with aiohttp.ClientSession() as session:
                for endpoint in list(self.endpoints.values()):
                    try:
                        async with session.get(
                            f"{endpoint.url}/health",
                            timeout=probe_timeout
                        ) as response:
                            if response.status != 200:
                                print(f"Endpoint {endpoint.id} health check failed")
                                # TODO: temporarily disable the endpoint (not implemented)
                    except Exception as e:
                        print(f"Health check error for {endpoint.id}: {e}")
            await asyncio.sleep(30)  # Health check every 30 seconds
        except Exception as e:
            print(f"Health check loop error: {e}")
            await asyncio.sleep(60)
# Usage example
async def example_load_balancer():
    """Load balancer usage example.

    Steps:

    1. Register two endpoints with an AI-optimized balancer.
    2. Start its background health checker.
    3. Send one code-generation request and print the result.

    The health-check task is cancelled and awaited before returning. That way
    the demo leaves no background task behind if it is awaited from a
    longer-lived event loop rather than via ``asyncio.run``.
    """
    balancer = IntelligentLoadBalancer(LoadBalancingStrategy.AI_OPTIMIZED)
    # Register endpoints
    endpoints = [
        AgentEndpoint(
            id="agent-1",
            url="http://agent-1:8080",
            weight=8,
            current_connections=0,
            average_response_time=2.5,
            success_rate=0.95,
            capacity=10,
            agent_type="code_generator"
        ),
        AgentEndpoint(
            id="agent-2",
            url="http://agent-2:8080",
            weight=6,
            current_connections=0,
            average_response_time=3.1,
            success_rate=0.92,
            capacity=8,
            agent_type="security_scanner"
        )
    ]
    for endpoint in endpoints:
        balancer.register_endpoint(endpoint)
    # Start health check (runs until cancelled below)
    health_task = asyncio.create_task(balancer.health_check())
    try:
        # Request execution example
        request_result = await balancer.execute_request(
            request_data={"task": "generate_function", "language": "python"},
            request_type="code_generation",
            estimated_complexity=0.5
        )
        print(f"Request result: {request_result}")
    finally:
        # health_check() loops forever; stop it explicitly so it does not leak.
        health_task.cancel()
        try:
            await health_task
        except asyncio.CancelledError:
            pass
# Run the load-balancer demo when this file is executed as a script.
if __name__ == "__main__":
asyncio.run(example_load_balancer())
2. Cost Optimization System¶
# cost/optimization_engine.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
from dataclasses import dataclass
import numpy as np
@dataclass
class CostMetrics:
"""One recorded spend sample for a single model.

CostOptimizationEngine.analyze_spending_patterns() sums ``total_cost`` per
calendar day of ``timestamp`` and per ``model_name``. That method does not
read the token or request counts.
"""
model_name: str  # presumably a key of CostOptimizationEngine.pricing — confirm
input_tokens: int  # input tokens counted in this sample
output_tokens: int  # output tokens counted in this sample
request_count: int  # assumed: number of requests aggregated into this sample
total_cost: float  # spend for this sample (budget messages format it with "$")
timestamp: datetime  # when the spend occurred; grouped by .date() in the analysis
class CostOptimizationEngine:
    """Cost optimization engine.

    The engine does three things:

    - Recommends a model for a task from static price, quality and speed tables.
    - Analyses recorded spend held in ``cost_history``.
    - Describes cost-control settings.

    Args:
        budget_config: Budget settings. This class reads only
            ``monthly_limit`` (default 10000).
    """

    def __init__(self, budget_config: Dict[str, Any]):
        self.budget_config = budget_config
        # Spend samples read by analyze_spending_patterns(). This class never
        # appends to it, so callers are expected to populate it.
        self.cost_history: List[CostMetrics] = []
        # Model pricing table (price per 1K tokens; input and output priced separately)
        self.pricing = {
            "gpt-5": {"input": 0.002, "output": 0.006},
            "gpt-4o": {"input": 0.0015, "output": 0.002},
            "claude-opus-4.1": {"input": 0.015, "output": 0.075},
            "claude-sonnet-3.5": {"input": 0.003, "output": 0.015},
            "gemini-2.0-flash": {"input": 0.001, "output": 0.002}
        }
        # Model performance table (relative quality/speed scores)
        self.model_performance = {
            "gpt-5": {"quality": 0.95, "speed": 0.8},
            "gpt-4o": {"quality": 0.9, "speed": 0.9},
            "claude-opus-4.1": {"quality": 0.92, "speed": 0.7},
            "claude-sonnet-3.5": {"quality": 0.88, "speed": 0.85},
            "gemini-2.0-flash": {"quality": 0.85, "speed": 0.95}
        }

    async def optimize_model_selection(
        self,
        task_requirements: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Rank the known models for a task and recommend the best one.

        Each affordable model is scored as::

            quality * quality_priority * (0.5 + complexity)
            + speed * speed_priority
            + cost_score * max(0, 1 - quality_priority - speed_priority)

        where ``cost_score = 1 / (1 + estimated_cost)``. With the default
        priorities (0.7 + 0.3), cost carries essentially no weight.

        Args:
            task_requirements: The following keys are read:

                - ``complexity`` (default 0.5)
                - ``quality_priority`` (default 0.7)
                - ``speed_priority`` (default 0.3)
                - ``budget_limit`` (default None, meaning no limit)
                - ``type``, used for token estimation

        Returns:
            On success, ``{"recommended_model", "reasoning", "alternatives"}``.
            ``alternatives`` maps every affordable model to its score details,
            sorted by descending score. If no model fits the budget, returns
            ``{"error": ...}`` instead.
        """
        complexity = task_requirements.get("complexity", 0.5)
        quality_priority = task_requirements.get("quality_priority", 0.7)
        speed_priority = task_requirements.get("speed_priority", 0.3)
        budget_limit = task_requirements.get("budget_limit")
        # Adjust quality requirements based on complexity
        quality_weight = quality_priority * (0.5 + complexity)
        # Whatever priority is left over goes to cost. Clamp at zero: when the
        # two priorities sum to more than 1, a negative weight would make more
        # expensive models score *higher*.
        cost_weight = max(0.0, 1.0 - quality_priority - speed_priority)
        model_scores = {}
        for model, performance in self.model_performance.items():
            # Estimate cost calculation
            estimated_tokens = self._estimate_token_usage(task_requirements, model)
            estimated_cost = self._calculate_cost(model, estimated_tokens)
            # Budget constraint check. `is not None` so an explicit budget of 0
            # is honoured instead of being treated as "no limit".
            if budget_limit is not None and estimated_cost > budget_limit:
                continue
            # Score calculation
            quality_score = performance["quality"]
            speed_score = performance["speed"]
            cost_score = 1.0 / (1.0 + estimated_cost)  # Lower cost = higher score
            total_score = (
                quality_score * quality_weight +
                speed_score * speed_priority +
                cost_score * cost_weight
            )
            model_scores[model] = {
                "score": total_score,
                "estimated_cost": estimated_cost,
                "estimated_tokens": estimated_tokens,
                "quality_score": quality_score,
                "speed_score": speed_score
            }
        if not model_scores:
            return {"error": "No models meet budget constraints"}
        # Select highest scoring model
        best_model = max(model_scores.items(), key=lambda x: x[1]["score"])
        return {
            "recommended_model": best_model[0],
            "reasoning": best_model[1],
            "alternatives": dict(sorted(
                model_scores.items(),
                key=lambda x: x[1]["score"],
                reverse=True
            ))
        }

    def _estimate_token_usage(
        self,
        task_requirements: Dict[str, Any],
        model: str
    ) -> Dict[str, int]:
        """Estimate input and output tokens for ``model`` on this task.

        The base counts depend on the task ``type``; unknown types use
        ``"general"``. They are scaled by ``0.5 + 1.5 * complexity`` and by a
        per-model factor, then truncated to int.
        """
        complexity = task_requirements.get("complexity", 0.5)
        task_type = task_requirements.get("type", "general")
        # Base tokens by task type
        base_tokens = {
            "code_generation": {"input": 500, "output": 2000},
            "security_analysis": {"input": 1000, "output": 1500},
            "performance_testing": {"input": 800, "output": 1200},
            "general": {"input": 300, "output": 800}
        }
        base = base_tokens.get(task_type, base_tokens["general"])
        # Adjust for complexity
        complexity_multiplier = 0.5 + complexity * 1.5
        # Adjust for model characteristics
        model_efficiency = {
            "gpt-5": 1.0,
            "gpt-4o": 1.1,
            "claude-opus-4.1": 0.9,
            "claude-sonnet-3.5": 1.0,
            "gemini-2.0-flash": 1.2
        }
        efficiency = model_efficiency.get(model, 1.0)
        return {
            "input": int(base["input"] * complexity_multiplier * efficiency),
            "output": int(base["output"] * complexity_multiplier * efficiency)
        }

    def _calculate_cost(self, model: str, token_usage: Dict[str, int]) -> float:
        """Price ``token_usage`` for ``model`` using the per-1K-token table.

        Models missing from ``self.pricing`` are priced at 0.0.
        """
        if model not in self.pricing:
            return 0.0
        pricing = self.pricing[model]
        input_cost = (token_usage["input"] / 1000) * pricing["input"]
        output_cost = (token_usage["output"] / 1000) * pricing["output"]
        return input_cost + output_cost

    async def analyze_spending_patterns(self) -> Dict[str, Any]:
        """Summarise the spend recorded in ``self.cost_history``.

        Returns:
            ``{"error": ...}`` when there is no history. Otherwise a dict with:

            - ``total_spent``: all-time total.
            - ``daily_average``: mean over days that have recorded spend.
            - ``weekly_trend``: slope of daily spend per calendar day, fitted
              over the last 7 days that have recorded spend; 0.0 when there
              are fewer than 7 such days.
            - ``budget_burn_rate``: total relative to ``monthly_limit``.
            - ``model_breakdown``: per-model totals.
            - ``optimization_suggestions``.
            - ``projected_monthly_cost``.
        """
        if not self.cost_history:
            return {"error": "No cost history available"}
        # Aggregate spend per calendar day and per model.
        daily_costs = {}
        model_costs = {}
        for metric in self.cost_history:
            day = metric.timestamp.date()
            daily_costs[day] = daily_costs.get(day, 0.0) + metric.total_cost
            model_costs[metric.model_name] = (
                model_costs.get(metric.model_name, 0.0) + metric.total_cost
            )
        # Trend analysis
        dates = sorted(daily_costs)
        costs = [daily_costs[day] for day in dates]
        if len(costs) >= 7:
            # Regress against real day offsets, not list positions. Days with
            # no recorded spend then stretch the x axis instead of being
            # silently collapsed. For 7 consecutive days this equals range(7).
            recent_days = dates[-7:]
            first_ordinal = recent_days[0].toordinal()
            x = [day.toordinal() - first_ordinal for day in recent_days]
            week_trend = np.polyfit(x, costs[-7:], 1)[0]
        else:
            week_trend = 0.0
        # NOTE(review): week_trend is a per-day slope, but the suggestion text
        # ("... weekly") and the `* 4` projections below treat it as a weekly
        # figure. Confirm the intended units before relying on those numbers.
        # Budget burn rate (all recorded history vs. one month's budget)
        total_spent = sum(costs)
        monthly_budget = self.budget_config.get("monthly_limit", 10000)
        burn_rate = total_spent / monthly_budget
        # Generate optimization suggestions
        optimization_suggestions = await self._generate_optimization_suggestions(
            model_costs, total_spent, week_trend
        )
        return {
            "total_spent": total_spent,
            "daily_average": np.mean(costs) if costs else 0.0,
            "weekly_trend": week_trend,
            "budget_burn_rate": burn_rate,
            "model_breakdown": model_costs,
            "optimization_suggestions": optimization_suggestions,
            "projected_monthly_cost": total_spent + (week_trend * 4)
        }

    async def _generate_optimization_suggestions(
        self,
        model_costs: Dict[str, float],
        total_spent: float,
        trend: float
    ) -> List[Dict[str, Any]]:
        """Build suggestion dicts from per-model spend, total spend and trend.

        Emits up to three suggestions:

        - ``model_substitution``: one model accounts for more than 40% of spend.
        - ``spending_trend``: the trend is positive.
        - ``budget_warning``: total spend exceeds 80% of ``monthly_limit``.
        """
        suggestions = []
        # High-cost model analysis
        if model_costs:
            most_expensive = max(model_costs.items(), key=lambda x: x[1])
            if most_expensive[1] > total_spent * 0.4:  # 40%+ of total
                suggestions.append({
                    "type": "model_substitution",
                    "priority": "high",
                    "description": f"{most_expensive[0]} usage frequency is too high",
                    "action": "Consider cheaper alternative models",
                    "potential_savings": most_expensive[1] * 0.3
                })
        # Increasing trend warning
        if trend > 0:
            suggestions.append({
                "type": "spending_trend",
                "priority": "medium",
                "description": f"Spending increasing by ${trend:.2f} weekly",
                "action": "Enhance usage monitoring",
                "potential_savings": trend * 4  # Monthly projection
            })
        # Budget optimization
        monthly_budget = self.budget_config.get("monthly_limit", 10000)
        if total_spent > monthly_budget * 0.8:
            suggestions.append({
                "type": "budget_warning",
                "priority": "critical",
                "description": "Exceeded 80% of monthly budget",
                "action": "Emergency cost reduction measures",
                "potential_savings": total_spent - monthly_budget * 0.7
            })
        return suggestions

    async def implement_cost_controls(
        self,
        control_config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Describe the cost controls enabled by ``control_config``.

        Each enabled flag adds one entry to ``implemented_controls``:

        - ``enable_rate_limiting`` adds a ``rate_limiting`` entry.
        - ``enable_auto_downgrade`` adds an ``auto_downgrade`` entry.
        - ``enable_budget_alerts`` adds a ``budget_alerts`` entry. Its
          thresholds can be overridden with ``alert_thresholds``; the default
          is 50/80/90/100%.

        Only the configuration is returned; nothing is enforced here.
        """
        implemented_controls = []
        # Rate limiting setup
        if control_config.get("enable_rate_limiting"):
            rate_limits = {
                "requests_per_hour": control_config.get("max_requests_per_hour", 1000),
                "tokens_per_hour": control_config.get("max_tokens_per_hour", 100000),
                "cost_per_hour": control_config.get("max_cost_per_hour", 100.0)
            }
            implemented_controls.append({
                "type": "rate_limiting",
                "config": rate_limits
            })
        # Auto model downgrade
        if control_config.get("enable_auto_downgrade"):
            downgrade_rules = {
                "cost_threshold": control_config.get("downgrade_cost_threshold", 0.1),
                "model_mapping": {
                    "gpt-5": "gpt-4o",
                    "claude-opus-4.1": "claude-sonnet-3.5"
                }
            }
            implemented_controls.append({
                "type": "auto_downgrade",
                "config": downgrade_rules
            })
        # Budget alerts (fractions of the budget; configurable, default 50/80/90/100%)
        if control_config.get("enable_budget_alerts"):
            alert_thresholds = control_config.get("alert_thresholds", [0.5, 0.8, 0.9, 1.0])
            implemented_controls.append({
                "type": "budget_alerts",
                "config": {"thresholds": alert_thresholds}
            })
        return {
            "success": True,
            "implemented_controls": implemented_controls,
            "effective_date": datetime.now().isoformat()
        }
# Usage example
async def demonstrate_cost_optimization():
    """Walk through CostOptimizationEngine end to end.

    Picks a model for a sample code-generation task, then runs the spending
    analysis. A fresh engine has an empty ``cost_history``, so the analysis
    reports that no history is available.
    """
    engine = CostOptimizationEngine({
        "daily_limit": 500.0,
        "monthly_limit": 15000.0,
        "alert_thresholds": [0.5, 0.8, 0.9]
    })
    sample_task = dict(
        type="code_generation",
        complexity=0.7,
        quality_priority=0.8,
        speed_priority=0.2,
        budget_limit=0.5,
    )
    recommendation = await engine.optimize_model_selection(sample_task)
    print(f"Optimization result: {recommendation}")
    spend_report = await engine.analyze_spending_patterns()
    print(f"Spending analysis: {spend_report}")
# Run the cost-optimization demo when this file is executed as a script.
if __name__ == "__main__":
asyncio.run(demonstrate_cost_optimization())
Deployment Automation¶
1. Kubernetes Helm Chart¶
# helm/ai-agents/Chart.yaml
# Chart metadata; apiVersion v2 is the Helm 3 chart format.
apiVersion: v2
name: ai-agents
description: Enterprise AI Agent Platform
type: application
# Chart version (SemVer): bump whenever templates or default values change.
version: 1.0.0
# Version of the application being deployed; quoted so YAML keeps it a string.
appVersion: "1.0.0"
---
# helm/ai-agents/values.yaml
global:
  imageRegistry: "your-registry.com"
  # When non-empty, overrides the component image tag in templates that honour
  # it (the manager Deployment does). CI sets it to the commit SHA with
  # `--set global.imageTag=<sha>` so each release pins an immutable image.
  imageTag: ""
  # docker-registry Secrets used to pull images from imageRegistry.
  imagePullSecrets:
    - regcred

agentManager:
  replicaCount: 3
  image:
    repository: ai-agent-manager
    # Fallback when global.imageTag is empty. A mutable tag such as "latest"
    # combined with IfNotPresent lets nodes keep running stale cached images;
    # prefer immutable tags.
    tag: "latest"
    pullPolicy: IfNotPresent
  service:
    type: ClusterIP
    port: 8080
  resources:
    limits:
      cpu: 1000m
      memory: 2Gi
    requests:
      cpu: 500m
      memory: 1Gi
  autoscaling:
    enabled: true
    minReplicas: 3
    maxReplicas: 10
    targetCPUUtilizationPercentage: 70

loadBalancer:
  replicaCount: 2
  image:
    repository: ai-load-balancer
    tag: "latest"
  service:
    type: LoadBalancer
    port: 80
    targetPort: 8080

agents:
  codeGenerator:
    enabled: true
    replicaCount: 2
    image:
      repository: ai-code-generator
      tag: "latest"
    resources:
      limits:
        cpu: 2000m
        memory: 4Gi
      requests:
        cpu: 1000m
        memory: 2Gi
  securityScanner:
    enabled: true
    replicaCount: 1
    image:
      repository: ai-security-scanner
      tag: "latest"
    resources:
      limits:
        cpu: 1000m
        memory: 2Gi
      requests:
        cpu: 500m
        memory: 1Gi

redis:
  enabled: true
  architecture: replication
  auth:
    enabled: true
    # Helm never evaluates GitHub Actions `${{ ... }}` expressions, so a value
    # such as "${{ secrets.REDIS_PASSWORD }}" here would become the literal
    # password. Reference a pre-created Secret instead. The key names assume
    # the Bitnami redis subchart, which `architecture: replication` suggests;
    # confirm before use. Example:
    #   kubectl create secret generic ai-agents-redis \
    #     --from-literal=redis-password="$REDIS_PASSWORD"
    existingSecret: ai-agents-redis
    existingSecretPasswordKey: redis-password

monitoring:
  prometheus:
    enabled: true
  grafana:
    enabled: true
  jaeger:
    enabled: true

security:
  networkPolicies:
    enabled: true
  # NOTE: the PodSecurityPolicy API was removed in Kubernetes 1.25. On newer
  # clusters, enforce pod security with Pod Security Admission instead.
  podSecurityPolicy:
    enabled: true

serviceAccount:
  create: true
  name: ai-agent-sa
---
# helm/ai-agents/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "ai-agents.fullname" . }}-manager
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
    component: manager
spec:
  replicas: {{ .Values.agentManager.replicaCount }}
  selector:
    matchLabels:
      {{- include "ai-agents.selectorLabels" . | nindent 6 }}
      component: manager
  template:
    metadata:
      labels:
        {{- include "ai-agents.selectorLabels" . | nindent 8 }}
        component: manager
    spec:
      serviceAccountName: {{ include "ai-agents.serviceAccountName" . }}
      # Render global.imagePullSecrets; without this, images in a private
      # registry cannot be pulled.
      {{- with .Values.global.imagePullSecrets }}
      imagePullSecrets:
      {{- range . }}
      - name: {{ . }}
      {{- end }}
      {{- end }}
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 2000
      containers:
      - name: manager
        # global.imageTag (set by CI to the commit SHA) wins over the
        # per-component tag, so `--set global.imageTag=...` actually deploys
        # that build.
        image: "{{ .Values.global.imageRegistry }}/{{ .Values.agentManager.image.repository }}:{{ .Values.global.imageTag | default .Values.agentManager.image.tag }}"
        imagePullPolicy: {{ .Values.agentManager.image.pullPolicy }}
        ports:
        - name: http
          containerPort: 8080
          protocol: TCP
        - name: metrics
          containerPort: 9090
          protocol: TCP
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
        env:
        - name: REDIS_URL
          value: "redis://{{ include "ai-agents.redis.fullname" . }}:6379"
        - name: REDIS_PASSWORD
          valueFrom:
            secretKeyRef:
              name: {{ include "ai-agents.redis.secretName" . }}
              key: redis-password
        - name: KUBERNETES_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        resources:
          {{- toYaml .Values.agentManager.resources | nindent 10 }}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          readOnlyRootFilesystem: true
---
# helm/ai-agents/templates/hpa.yaml
# Rendered only when agentManager.autoscaling.enabled is true. Scales the
# "-manager" Deployment between minReplicas and maxReplicas. With several
# metrics, the HPA uses the largest replica count that any metric proposes.
{{- if .Values.agentManager.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "ai-agents.fullname" . }}-manager-hpa
labels:
{{- include "ai-agents.labels" . | nindent 4 }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "ai-agents.fullname" . }}-manager
minReplicas: {{ .Values.agentManager.autoscaling.minReplicas }}
maxReplicas: {{ .Values.agentManager.autoscaling.maxReplicas }}
metrics:
# CPU target comes from values (targetCPUUtilizationPercentage).
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: {{ .Values.agentManager.autoscaling.targetCPUUtilizationPercentage }}
# Memory target is hard-coded at 80% (not configurable through values).
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
# NOTE(review): a Pods metric reaches the HPA only through the custom metrics
# API, so an adapter (e.g. prometheus-adapter) must expose ai_agent_queue_length
# per pod. Confirm one is installed, otherwise this metric cannot be read.
- type: Pods
pods:
metric:
name: ai_agent_queue_length
target:
type: AverageValue
averageValue: "10"
{{- end }}
---
# helm/ai-agents/templates/networkpolicy.yaml
{{- if .Values.security.networkPolicies.enabled }}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: {{ include "ai-agents.fullname" . }}-network-policy
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
spec:
  podSelector:
    matchLabels:
      {{- include "ai-agents.selectorLabels" . | nindent 6 }}
  policyTypes:
  - Ingress
  - Egress
  # NOTE(review): only the chart's own pods and the monitoring namespace may
  # connect. If the LoadBalancer-type service fronts pods matched by this
  # selector, external client traffic is dropped too; confirm that is intended.
  ingress:
  - from:
    - podSelector:
        matchLabels:
          {{- include "ai-agents.selectorLabels" . | nindent 10 }}
    # Namespaces carry no `name` label by default. Kubernetes (1.21+)
    # automatically sets kubernetes.io/metadata.name on every namespace.
    - namespaceSelector:
        matchLabels:
          kubernetes.io/metadata.name: monitoring
    ports:
    - protocol: TCP
      port: 8080
    - protocol: TCP
      port: 9090
  egress:
  - to:
    - podSelector:
        matchLabels:
          app.kubernetes.io/name: redis
    ports:
    - protocol: TCP
      port: 6379
  - to: []
    ports:
    - protocol: TCP
      port: 443  # HTTPS for external API calls
    - protocol: TCP
      port: 53  # DNS
    - protocol: UDP
      port: 53  # DNS
{{- end }}
2. CI/CD Pipeline¶
# .github/workflows/ai-agents-cicd.yml
name: AI Agents CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'ai-agents/**'
      - 'helm/ai-agents/**'
      - '.github/workflows/ai-agents-cicd.yml'
  pull_request:
    branches: [main]
    paths:
      - 'ai-agents/**'
      - 'helm/ai-agents/**'

env:
  REGISTRY: ghcr.io
  IMAGE_PREFIX: ${{ github.repository }}/ai-agents

# Read-only token by default; jobs that need more ask for it explicitly.
permissions:
  contents: read

jobs:
  security-scan:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      security-events: write  # required to upload SARIF to code scanning
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Run Trivy vulnerability scanner
        # TODO: pin to a release tag or commit SHA instead of the moving `master` branch.
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: './ai-agents'
          format: 'sarif'
          output: 'trivy-results.sarif'

      - name: Upload Trivy scan results
        # CodeQL Action v2 is deprecated; v3 is the supported major version.
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: 'trivy-results.sarif'

  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.11', '3.12']
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          cd ai-agents
          pip install -r requirements.txt
          pip install pytest pytest-cov pytest-asyncio

      - name: Run unit tests
        run: |
          cd ai-agents
          pytest tests/ --cov=src --cov-report=xml --cov-report=html

      - name: Upload coverage reports
        uses: codecov/codecov-action@v4
        with:
          files: ./ai-agents/coverage.xml
          flags: unittests
          name: codecov-umbrella
          token: ${{ secrets.CODECOV_TOKEN }}

  build:
    needs: [security-scan, test]
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write  # push images to GHCR with GITHUB_TOKEN
      id-token: write  # OIDC token for keyless cosign signing
    strategy:
      matrix:
        component: [manager, load-balancer, code-generator, security-scanner]
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up QEMU
        # Needed to build the linux/arm64 variant on an amd64 runner.
        uses: docker/setup-qemu-action@v3

      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3

      - name: Log in to Container Registry
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}

      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_PREFIX }}/${{ matrix.component }}
          # Tag notes:
          # - `type=sha,format=long,prefix=` publishes the bare full commit SHA,
          #   which is the tag the deploy jobs pass as global.imageTag.
          # - The branch-prefixed SHA tag is skipped for PRs, whose ref name
          #   ("<n>/merge") contains a slash, which is invalid in an image tag.
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix=${{ github.ref_name }}-,enable=${{ github.event_name != 'pull_request' }}
            type=sha,format=long,prefix=
            type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}

      - name: Build and push Docker image
        id: build
        uses: docker/build-push-action@v5
        with:
          context: ./ai-agents/${{ matrix.component }}
          platforms: linux/amd64,linux/arm64
          # Pull requests only verify that the image builds. Fork PRs get a
          # token that cannot write packages, and PR images are never deployed.
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

      - name: Install cosign
        if: github.event_name != 'pull_request'
        uses: sigstore/cosign-installer@v3

      - name: Sign container image
        if: github.event_name != 'pull_request'
        # Sign the pushed digest so the signature is bound to exactly the image
        # built above. Keyless signing uses the job's OIDC token.
        run: |
          cosign sign --yes ${{ env.REGISTRY }}/${{ env.IMAGE_PREFIX }}/${{ matrix.component }}@${{ steps.build.outputs.digest }}

  helm-test:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Helm
        uses: azure/setup-helm@v4
        with:
          version: 'v3.12.0'

      - name: Lint Helm chart
        run: |
          helm lint helm/ai-agents

      - name: Template Helm chart
        run: |
          helm template ai-agents helm/ai-agents \
            --values helm/ai-agents/values.yaml \
            --output-dir /tmp/helm-output

      - name: Validate Kubernetes manifests
        run: |
          kubectl --dry-run=client apply -f /tmp/helm-output/ai-agents/templates/

  deploy-staging:
    if: github.ref == 'refs/heads/develop'
    needs: [build, helm-test]
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-west-2

      - name: Update kubeconfig
        run: |
          aws eks update-kubeconfig --name ai-agents-staging

      - name: Deploy to staging
        # global.imageTag matches the full-SHA image tag pushed by the build job.
        run: |
          helm upgrade --install ai-agents-staging helm/ai-agents \
            --namespace ai-agents-staging \
            --create-namespace \
            --values helm/ai-agents/values-staging.yaml \
            --set global.imageTag=${{ github.sha }} \
            --wait --timeout=600s

      - name: Run integration tests
        run: |
          kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=ai-agents \
            -n ai-agents-staging --timeout=300s
          ./scripts/integration-tests.sh staging

  deploy-production:
    if: github.ref == 'refs/heads/main'
    needs: [build, helm-test]
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_PROD }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_PROD }}
          aws-region: us-west-2

      - name: Update kubeconfig
        run: |
          aws eks update-kubeconfig --name ai-agents-production

      # NOTE(review): every run deploys to "blue" and uninstalls "green", so the
      # two colours never alternate, and the CURRENT_VERSION below is unused.
      # Confirm the intended blue/green rotation before relying on rollback.
      - name: Blue-Green Deployment
        run: |
          # Check current version
          CURRENT_VERSION=$(helm get values ai-agents-prod -n ai-agents-prod -o json | jq -r '.global.imageTag')
          # Deploy new version to blue environment
          helm upgrade --install ai-agents-blue helm/ai-agents \
            --namespace ai-agents-blue \
            --create-namespace \
            --values helm/ai-agents/values-production.yaml \
            --set global.imageTag=${{ github.sha }} \
            --set service.type=ClusterIP \
            --wait --timeout=600s

      - name: Health check and smoke tests
        run: |
          ./scripts/health-check.sh blue
          ./scripts/smoke-tests.sh blue

      - name: Switch traffic to blue
        run: |
          # Update Istio VirtualService to switch traffic
          kubectl patch virtualservice ai-agents-vs -n ai-agents-prod \
            --type='json' \
            -p='[{"op": "replace", "path": "/spec/http/0/route/0/destination/subset", "value": "blue"}]'

      - name: Monitor deployment
        run: |
          ./scripts/monitor-deployment.sh 300  # Monitor for 5 minutes

      - name: Cleanup old version
        if: success()
        run: |
          helm uninstall ai-agents-green -n ai-agents-green || true

  notify:
    # Only pushes build and deploy; pull requests get no Slack notification.
    if: always() && github.event_name == 'push'
    needs: [build, helm-test, deploy-staging, deploy-production]
    runs-on: ubuntu-latest
    steps:
      - name: Notify Slack
        uses: 8398a7/action-slack@v3
        with:
          # `job.status` here would be this notify job's own status (always
          # success). Report the pipeline outcome instead:
          # failure > cancelled > success.
          status: ${{ contains(needs.*.result, 'failure') && 'failure' || (contains(needs.*.result, 'cancelled') && 'cancelled' || 'success') }}
          channel: '#ai-agents-deployments'
          webhook_url: ${{ secrets.SLACK_WEBHOOK }}
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
Operations Manual and Best Practices¶
1. Operations Checklist¶
# AI Agent Operations Checklist
## Daily Operations Check
### System Health Verification
- [ ] Verify the status of all agents
- [ ] Monitor response times (target: p95 < 5 seconds)
- [ ] Monitor error rates (target: < 1%)
- [ ] Check resource utilization (CPU < 80%, Memory < 85%)
### Cost Monitoring
- [ ] Daily cost verification
- [ ] Budget burn rate check
- [ ] Investigate abnormal usage
### Security Verification
- [ ] Check for suspicious access logs
- [ ] Monitor authentication failure count
- [ ] Check secret rotation status
## Weekly Operations Check
### Performance Analysis
- [ ] Review weekly performance report
- [ ] Analyze scaling history
- [ ] Identify bottlenecks and improvement plans
### Capacity Planning
- [ ] Analyze resource usage trends
- [ ] Forecast next month's capacity needs
- [ ] Adjust scaling thresholds
### Update Management
- [ ] Apply security patches
- [ ] Check dependency updates
- [ ] Apply configuration changes
## Monthly Operations Check
### Comprehensive Review
- [ ] Create monthly operations report
- [ ] Verify SLA achievement
- [ ] Analyze incidents and action items
- [ ] Identify operations process improvements
### Disaster Recovery Testing
- [ ] Backup recovery test
- [ ] DR environment verification
- [ ] Update recovery procedures
2. Troubleshooting Guide¶
# troubleshooting/diagnostic_tools.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
import aiohttp
import subprocess
class DiagnosticTools:
    """AI agent diagnostic tools.

    Runs a fixed set of health checks (connectivity, performance, resources,
    security, costs). Their statuses are folded into one overall verdict plus
    recommended actions.

    ``config`` keys read here:

    - ``agent_manager_url``, ``redis_url``, ``prometheus_url``: used by the
      connectivity and performance checks.
    - ``daily_budget``: default 500.0.
    - ``namespace``: Kubernetes namespace for the kubectl-based checks;
      default ``"ai-agents"``.
    """

    # Timeout (seconds) for each HTTP request and Redis probe.
    _PROBE_TIMEOUT = 10
    # Upper bound (seconds) on each kubectl call, so an unreachable API server
    # cannot hang the whole diagnostic.
    _KUBECTL_TIMEOUT = 60

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.namespace = config.get("namespace", "ai-agents")
        self.checks = {
            "connectivity": self._check_connectivity,
            "performance": self._check_performance,
            "resources": self._check_resources,
            "security": self._check_security,
            "costs": self._check_costs
        }

    async def run_full_diagnostic(self) -> Dict[str, Any]:
        """Run every check in order.

        A check that raises is recorded as ``{"error": ...}`` rather than
        aborting the run.
        """
        print("Starting comprehensive diagnostic...")
        results = {}
        for check_name, check_func in self.checks.items():
            try:
                print(f"Running {check_name} check...")
                results[check_name] = await check_func()
                print(f"✅ {check_name} check completed")
            except Exception as e:
                print(f"❌ {check_name} check failed: {e}")
                results[check_name] = {"error": str(e)}
        # Overall assessment
        overall_health = self._calculate_overall_health(results)
        return {
            "timestamp": datetime.now().isoformat(),
            "overall_health": overall_health,
            "detailed_results": results,
            "recommendations": self._generate_recommendations(results)
        }

    async def _check_connectivity(self) -> Dict[str, Any]:
        """Probe the agent manager, Redis and Prometheus.

        The two HTTP services are healthy on a 200 response. Redis does not
        speak HTTP, so it is probed with a raw ``PING`` over TCP. An HTTP GET
        on a ``redis://`` URL can never succeed.
        """
        agent_url = f"{self.config['agent_manager_url']}/health"
        redis_url = self.config["redis_url"]
        prometheus_url = f"{self.config['prometheus_url']}/api/v1/targets"
        results = []
        # One session for both HTTP probes.
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self._PROBE_TIMEOUT)
        ) as session:
            results.append(await self._probe_http(session, "Agent Manager", agent_url))
            results.append(await self._check_redis("Redis", redis_url))
            results.append(await self._probe_http(session, "Prometheus", prometheus_url))
        return {
            "status": "healthy" if all(r["status"] == "healthy" for r in results) else "degraded",
            "endpoints": results
        }

    async def _probe_http(self, session, name: str, url: str) -> Dict[str, Any]:
        """GET ``url`` and report it healthy on HTTP 200; never raises."""
        loop = asyncio.get_running_loop()
        start = loop.time()  # monotonic clock
        try:
            async with session.get(url) as response:
                return {
                    "endpoint": name,
                    "status": "healthy" if response.status == 200 else "unhealthy",
                    "response_time": loop.time() - start,
                    "status_code": response.status
                }
        except Exception as e:
            return {
                "endpoint": name,
                "status": "error",
                "error": str(e)
            }

    async def _check_redis(self, name: str, url: str) -> Dict[str, Any]:
        """Send an inline ``PING`` to the Redis server at ``url``; never raises.

        Outcomes:

        - ``+PONG``: healthy.
        - ``-NOAUTH``: also healthy. The server is up and answering; it just
          wants a password, which this probe does not send.
        - Any other reply: unhealthy.
        - Connection failure or timeout: error.

        TLS (``rediss://``) is not supported by this probe.
        """
        from urllib.parse import urlparse

        loop = asyncio.get_running_loop()
        start = loop.time()
        try:
            parsed = urlparse(url)
            host = parsed.hostname or "localhost"
            port = parsed.port or 6379
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port), timeout=self._PROBE_TIMEOUT
            )
            try:
                writer.write(b"PING\r\n")
                await writer.drain()
                reply = await asyncio.wait_for(reader.readline(), timeout=self._PROBE_TIMEOUT)
            finally:
                writer.close()
        except Exception as e:
            return {
                "endpoint": name,
                "status": "error",
                "error": str(e)
            }
        healthy = reply.startswith((b"+PONG", b"-NOAUTH"))
        return {
            "endpoint": name,
            "status": "healthy" if healthy else "unhealthy",
            "response_time": loop.time() - start,
            "reply": reply.decode("utf-8", errors="replace").strip()
        }

    async def _check_performance(self) -> Dict[str, Any]:
        """Query Prometheus for latency, error ratio, throughput and agent count."""
        # Every query aggregates to a single series, because only the first
        # result is read below. For an unaggregated rate(), that would be an
        # arbitrary series. error_rate is a ratio, matching max_error_rate.
        metrics_queries = {
            "avg_response_time": "avg(ai_agent_response_time_seconds)",
            "error_rate": (
                "sum(rate(ai_agent_requests_total{status='error'}[5m]))"
                " / sum(rate(ai_agent_requests_total[5m]))"
            ),
            "throughput": "sum(rate(ai_agent_requests_total[5m]))",
            "active_agents": "count(up{job='ai-agents'})"
        }
        results = {}
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self._PROBE_TIMEOUT)
        ) as session:
            for metric_name, query in metrics_queries.items():
                try:
                    async with session.get(
                        f"{self.config['prometheus_url']}/api/v1/query",
                        params={"query": query}
                    ) as response:
                        data = await response.json()
                        if data["data"]["result"]:
                            value = float(data["data"]["result"][0]["value"][1])
                            results[metric_name] = value
                        else:
                            results[metric_name] = None
                except Exception as e:
                    results[f"{metric_name}_error"] = str(e)
        return {
            "status": self._evaluate_performance(results),
            "metrics": results,
            "thresholds": {
                "max_response_time": 10.0,
                "max_error_rate": 0.05
            }
        }

    @staticmethod
    def _evaluate_performance(metrics: Dict[str, Any]) -> str:
        """Map metric values to ``healthy``, ``degraded`` or ``unhealthy``.

        An error rate above 0.05 is ``unhealthy``, which takes precedence over
        an average response time above 10s (``degraded``). Missing metrics,
        whether a query error or ``None`` for no series, count as 0 instead
        of raising ``TypeError`` in the comparison.
        """
        response_time = metrics.get("avg_response_time") or 0.0
        error_rate = metrics.get("error_rate") or 0.0
        if error_rate > 0.05:
            return "unhealthy"
        if response_time > 10:
            return "degraded"
        return "healthy"

    async def _check_resources(self) -> Dict[str, Any]:
        """Collect ``kubectl top`` usage for pods in the namespace and for nodes.

        The status is always ``healthy`` when kubectl succeeds; usage is
        reported, not judged. A kubectl failure or timeout yields ``error``.
        kubectl runs synchronously and blocks the event loop while it runs.
        """
        try:
            # Execute kubectl top pods
            result = subprocess.run(
                ["kubectl", "top", "pods", "-n", self.namespace, "--no-headers"],
                capture_output=True,
                text=True,
                check=True,
                timeout=self._KUBECTL_TIMEOUT
            )
            pod_resources = []
            for line in result.stdout.strip().split('\n'):
                if line:
                    parts = line.split()
                    pod_resources.append({
                        "pod": parts[0],
                        "cpu": parts[1],
                        "memory": parts[2]
                    })
            # Check node resources
            node_result = subprocess.run(
                ["kubectl", "top", "nodes", "--no-headers"],
                capture_output=True,
                text=True,
                check=True,
                timeout=self._KUBECTL_TIMEOUT
            )
            node_resources = []
            for line in node_result.stdout.strip().split('\n'):
                if line:
                    parts = line.split()
                    node_resources.append({
                        "node": parts[0],
                        "cpu": parts[1],
                        "cpu_percent": parts[2],
                        "memory": parts[3],
                        "memory_percent": parts[4]
                    })
            return {
                "status": "healthy",
                "pods": pod_resources,
                "nodes": node_resources
            }
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
            return {
                "status": "error",
                "error": f"Failed to get resource information: {e}"
            }

    async def _check_security(self) -> Dict[str, Any]:
        """Flag old secrets and a missing NetworkPolicy in the namespace.

        Secrets older than 90 days are ``warning``; older than 180 days are
        ``critical``. The overall status is the worst status found.
        """
        security_checks = []
        # Check secret age
        try:
            result = subprocess.run(
                ["kubectl", "get", "secrets", "-n", self.namespace, "-o", "json"],
                capture_output=True,
                text=True,
                check=True,
                timeout=self._KUBECTL_TIMEOUT
            )
            secrets_data = json.loads(result.stdout)
            for secret in secrets_data["items"]:
                creation_time = datetime.fromisoformat(
                    secret["metadata"]["creationTimestamp"].replace('Z', '+00:00')
                )
                age_days = (datetime.now(creation_time.tzinfo) - creation_time).days
                status = "healthy"
                if age_days > 90:  # 90+ days old
                    status = "warning"
                if age_days > 180:  # 180+ days old
                    status = "critical"
                security_checks.append({
                    "type": "secret_age",
                    "name": secret["metadata"]["name"],
                    "age_days": age_days,
                    "status": status
                })
        except Exception as e:
            security_checks.append({
                "type": "secret_check_error",
                "error": str(e)
            })
        # Check network policies
        try:
            result = subprocess.run(
                ["kubectl", "get", "networkpolicies", "-n", self.namespace, "-o", "json"],
                capture_output=True,
                text=True,
                check=True,
                timeout=self._KUBECTL_TIMEOUT
            )
            # kubectl prints "No resources found" on stderr, not stdout, so
            # inspect the returned item list instead of scanning stdout text.
            policies = json.loads(result.stdout).get("items", [])
            if not policies:
                security_checks.append({
                    "type": "network_policy",
                    "status": "warning",
                    "message": "No network policies found"
                })
            else:
                security_checks.append({
                    "type": "network_policy",
                    "status": "healthy",
                    "message": "Network policies configured"
                })
        except Exception as e:
            security_checks.append({
                "type": "network_policy_error",
                "error": str(e)
            })
        overall_status = "healthy"
        if any(check.get("status") == "warning" for check in security_checks):
            overall_status = "warning"
        if any(check.get("status") == "critical" for check in security_checks):
            overall_status = "critical"
        return {
            "status": overall_status,
            "checks": security_checks
        }

    async def _check_costs(self) -> Dict[str, Any]:
        """Compare today's spend with ``daily_budget``.

        Above 80% of the budget is ``warning``; above 100% is ``critical``.
        """
        # Compare with cost budget (simplified)
        daily_budget = self.config.get("daily_budget", 500.0)
        current_spend = await self._get_current_daily_spend()
        budget_utilization = current_spend / daily_budget
        status = "healthy"
        if budget_utilization > 0.8:
            status = "warning"
        if budget_utilization > 1.0:
            status = "critical"
        return {
            "status": status,
            "daily_budget": daily_budget,
            "current_spend": current_spend,
            "budget_utilization": budget_utilization,
            "projected_monthly": current_spend * 30
        }

    async def _get_current_daily_spend(self) -> float:
        """Get current daily spend (placeholder)."""
        # In actual implementation, get from billing API or metrics
        return 423.50  # Dummy value

    def _calculate_overall_health(self, results: Dict[str, Any]) -> str:
        """Reduce per-check results to the worst overall status.

        Severity order: critical (error or critical) > unhealthy > warning
        (warning or degraded) > healthy.
        """
        statuses = []
        for check_result in results.values():
            if not isinstance(check_result, dict):
                continue
            if "status" in check_result:
                statuses.append(check_result["status"])
            elif "error" in check_result:
                # A check that raised is recorded as {"error": ...} with no
                # status; count it as an error instead of silently ignoring it.
                statuses.append("error")
        if "error" in statuses or "critical" in statuses:
            return "critical"
        elif "unhealthy" in statuses:
            return "unhealthy"
        elif "warning" in statuses or "degraded" in statuses:
            return "warning"
        else:
            return "healthy"

    def _generate_recommendations(self, results: Dict[str, Any]) -> List[str]:
        """Generate recommended actions for unhealthy check results."""
        recommendations = []
        # Performance related (degraded latency or unhealthy error ratio)
        perf_result = results.get("performance", {})
        if perf_result.get("status") in ("degraded", "unhealthy"):
            recommendations.append("Consider scaling up for performance improvement")
        # Security related
        security_result = results.get("security", {})
        if security_result.get("status") in ["warning", "critical"]:
            recommendations.append("Perform secret rotation and security configuration review")
        # Cost related
        cost_result = results.get("costs", {})
        if cost_result.get("status") in ["warning", "critical"]:
            recommendations.append("Review model selection and rate limiting for cost optimization")
        return recommendations
# Usage example and command line tool
async def main():
    """Command-line entry point for the diagnostic tool.

    Runs every diagnostic against the in-cluster service addresses and prints
    the combined report as indented JSON.
    """
    cluster_config = dict(
        agent_manager_url="http://agent-manager.ai-agents.svc.cluster.local:8080",
        redis_url="redis://redis.ai-agents.svc.cluster.local:6379",
        prometheus_url="http://prometheus.monitoring.svc.cluster.local:9090",
        daily_budget=500.0,
    )
    report = await DiagnosticTools(cluster_config).run_full_diagnostic()
    print(json.dumps(report, indent=2))
# Run the full diagnostic when this file is executed as a script.
if __name__ == "__main__":
asyncio.run(main())
Summary¶
This article has explained the architecture and best practices needed to operate AI agents built with the latest (August 2025) development tools in production environments.
Key Implementation Points¶
- Enterprise Architecture: Scalable and reliable system design
- Zero Trust Security: Strong security through multi-layered defense
- Comprehensive Monitoring: Complete visibility of operations through observability
- Cost Optimization: Automated cost management and budget control
Phased Implementation Plan¶
- Phase 1: Build basic architecture and security configuration
- Phase 2: Implement monitoring systems and automation
- Phase 3: Full operation and continuous improvement of cost optimization
Keys to Operational Success¶
- Continuous Monitoring: Constant system state monitoring and rapid response
- Automation Focus: Automate routine operations to minimize human error
- Security First: Design with security as the highest priority
- Cost Awareness: Operations with constant awareness of cost efficiency
With the right architecture and operational processes, AI agents running in production can reliably deliver business value to the enterprise.