# AIエージェント本番運用完全ガイド - 2025年8月エンタープライズ対応実践アーキテクチャ

## はじめに
AIエージェント開発の理論記事と実装ハンズオンガイドに続く第3弾として、本記事では本番環境での安全で効率的なAIエージェント運用に特化した実践的ガイドを提供します。
エンタープライズ環境での実際の運用で直面する課題と、それらを解決する実証済みアーキテクチャパターンを詳しく解説します。
## この記事のポイント

- **エンタープライズ対応**: 大規模組織での安全なAIエージェント運用アーキテクチャの構築
- **自動スケーリング**: 負荷に応じた動的リソース管理とコスト最適化の自動化
- **セキュリティ強化**: ゼロトラスト原則に基づくAIエージェントのセキュアな運用
- **包括的監視**: AIエージェントの動作を360度で監視・分析するObservabilityシステム
## エンタープライズアーキテクチャ設計

### 1. 全体アーキテクチャ概要
```mermaid
graph TB
    subgraph "API Gateway Layer"
        AGW[API Gateway] --> LB[Load Balancer]
    end
    subgraph "AI Agent Management Layer"
        LB --> AM[Agent Manager]
        AM --> AR[Agent Registry]
        AM --> AS[Agent Scheduler]
    end
    subgraph "Execution Layer"
        AS --> AE1[Agent Executor 1]
        AS --> AE2[Agent Executor 2]
        AS --> AE3[Agent Executor N]
    end
    subgraph "Data Layer"
        AE1 --> MCP[MCP Servers]
        AE2 --> DB[(Database)]
        AE3 --> FS[(File System)]
    end
    subgraph "Observability Layer"
        AM --> MON[Monitoring]
        AE1 --> LOG[Logging]
        AE2 --> TRC[Tracing]
        AE3 --> MET[Metrics]
    end
    subgraph "Security Layer"
        AGW --> IAM[Identity & Access]
        AM --> VLT[Secrets Vault]
        AE1 --> NET[Network Security]
    end
```

### 2. Agent Manager 実装
# infrastructure/agent_manager.py
import asyncio
import json
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import redis
import kubernetes
from prometheus_client import Counter, Histogram, Gauge
class AgentStatus(Enum):
    """Lifecycle states of an agent instance."""

    IDLE = "idle"
    BUSY = "busy"
    FAILED = "failed"
    MAINTENANCE = "maintenance"


@dataclass
class AgentInstance:
    """Registry record describing one agent."""

    id: str
    type: str
    status: AgentStatus
    capacity: int  # maximum number of tasks run concurrently
    current_load: int  # number of tasks currently in flight
    created_at: datetime
    last_heartbeat: datetime
    metadata: Dict[str, Any]


class EnterpriseAgentManager:
    """Enterprise-grade AI agent manager.

    Keeps an in-memory registry of agents (mirrored to the Redis hash
    ``agents``), deploys one Kubernetes pod per agent, dispatches tasks to the
    least-loaded agent and runs background loops for health monitoring,
    auto-scaling and metrics.

    NOTE(review): ``start`` schedules ``self._task_scheduler()``, which is not
    defined in this class; it must be provided elsewhere (subclass/mixin) or
    ``start`` raises ``AttributeError`` — confirm.
    """

    def __init__(self, config: Dict[str, Any]):
        """Create the clients, the metrics and an empty registry.

        Args:
            config: Requires ``redis_url``; ``manager_url`` is read when a pod
                is deployed; ``kubernetes_namespace`` is optional and defaults
                to ``"ai-agents"``.
        """
        self.config = config
        # Previously hard-coded in every Kubernetes call although the sample
        # configuration already carries this key.
        self.namespace = config.get('kubernetes_namespace', 'ai-agents')
        self.redis_client = redis.Redis.from_url(config['redis_url'])
        self.k8s_client = kubernetes.client.ApiClient()
        # Prometheus metrics
        self.agent_counter = Counter('ai_agents_total', 'Total AI agents', ['type', 'status'])
        self.agent_state = Gauge('ai_agents_current', 'Current AI agents by state', ['type', 'status'])
        self.task_duration = Histogram('ai_task_duration_seconds', 'Task execution time')
        self.agent_load = Gauge('ai_agent_load', 'Current agent load', ['agent_id'])
        # Agent registry
        self.agents: Dict[str, AgentInstance] = {}
        self.task_queue = asyncio.Queue()
        # (type, status) label pairs already exported, so stale ones can be zeroed.
        self._seen_state_labels: set = set()

    async def _run_blocking(self, func, *args, **kwargs):
        """Run a blocking client call in the default executor.

        The Redis and Kubernetes clients used here are synchronous; calling
        them directly would stall every other coroutine on the event loop.
        """
        import functools

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))

    async def start(self):
        """Start the background loops and wait on them (normally forever)."""
        print("Starting Enterprise Agent Manager...")
        background = [
            asyncio.create_task(self._health_monitor()),
            asyncio.create_task(self._task_scheduler()),
            asyncio.create_task(self._auto_scaler()),
            asyncio.create_task(self._metrics_collector())
        ]
        await asyncio.gather(*background)

    def record_heartbeat(self, agent_id: str) -> bool:
        """Refresh ``last_heartbeat`` for a registered agent.

        Without this entry point nothing ever updated the timestamp, so the
        health monitor evicted every agent 60 seconds after registration.

        Returns:
            True if the agent is registered, False otherwise.
        """
        agent = self.agents.get(agent_id)
        if agent is None:
            return False
        agent.last_heartbeat = datetime.now()
        return True

    async def register_agent(self, agent_config: Dict[str, Any]) -> str:
        """Register a new agent, persist it and deploy its pod.

        Args:
            agent_config: Requires ``type``; optional ``capacity`` (default 10)
                and ``metadata`` (default ``{}``).

        Returns:
            The generated agent id.

        Raises:
            Exception: Whatever persistence or pod deployment raised; in that
                case the agent is not left in the registry or in Redis.
        """
        agent_id = str(uuid.uuid4())
        now = datetime.now()
        agent = AgentInstance(
            id=agent_id,
            type=agent_config['type'],
            status=AgentStatus.IDLE,
            capacity=agent_config.get('capacity', 10),
            current_load=0,
            created_at=now,
            last_heartbeat=now,
            metadata=agent_config.get('metadata', {})
        )
        await self._persist_agent(agent)
        try:
            await self._deploy_agent_pod(agent)
        except Exception:
            # Undo the Redis write so a failed deployment leaves no orphan.
            try:
                await self._run_blocking(self.redis_client.hdel, "agents", agent_id)
            except Exception as cleanup_error:
                print(f"Failed to roll back agent {agent_id}: {cleanup_error}")
            raise
        # Only schedulable once its pod has actually been created.
        self.agents[agent_id] = agent
        self.agent_counter.labels(
            type=agent.type,
            status=agent.status.value
        ).inc()
        print(f"Agent registered: {agent_id} (type: {agent.type})")
        return agent_id

    async def assign_task(
        self,
        task: Dict[str, Any],
        agent_type: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run ``task`` on the best available agent.

        When no agent is available and ``agent_type`` is given, one scale-up
        is attempted before giving up.

        Returns:
            ``{"success": False, "error": ..., "retry_after": 30}`` when no
            agent could be found; otherwise a dict with ``success``,
            ``task_id``, ``agent_id`` and ``result``. ``success`` reflects
            whether the execution completed.
        """
        agent = await self._select_best_agent(agent_type, task)
        if agent is None and agent_type:
            # The original called an undefined ``_auto_scale_agent``.
            try:
                await self._scale_up(agent_type)
            except Exception as e:
                print(f"Auto scaler error: {e}")
            else:
                agent = await self._select_best_agent(agent_type, task)
        if agent is None:
            return {
                "success": False,
                "error": "No agents available",
                "retry_after": 30
            }
        task_id = str(uuid.uuid4())
        execution_result = await self._execute_task(agent, task_id, task)
        return {
            "success": execution_result["status"] == "completed",
            "task_id": task_id,
            "agent_id": agent.id,
            "result": execution_result
        }

    async def _select_best_agent(
        self,
        agent_type: Optional[str],
        task: Dict[str, Any]
    ) -> Optional[AgentInstance]:
        """Return the least-loaded agent with spare capacity, or None.

        Only IDLE/BUSY agents are considered. Agents with ``capacity <= 0``
        are skipped, which also avoids a division by zero in the load ratio.
        """
        selectable = (AgentStatus.IDLE, AgentStatus.BUSY)
        candidates = [
            agent for agent in self.agents.values()
            if agent.status in selectable
            and agent.capacity > 0
            and agent.current_load < agent.capacity
            and (not agent_type or agent.type == agent_type)
        ]
        if not candidates:
            return None
        return min(candidates, key=lambda a: a.current_load / a.capacity)

    async def _execute_task(
        self,
        agent: AgentInstance,
        task_id: str,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Execute ``task`` on ``agent`` and keep its load bookkeeping in sync.

        Returns:
            ``{"status": "completed", "result", "duration"}`` on success or
            ``{"status": "failed", "error", "duration"}`` on any exception.
        """
        start_time = datetime.now()
        try:
            agent.current_load += 1
            agent.status = AgentStatus.BUSY if agent.current_load >= agent.capacity else AgentStatus.IDLE
            self.agent_load.labels(agent_id=agent.id).set(agent.current_load)
            # Actual execution: call the agent pod's API.
            result = await self._call_agent_api(agent, task)
            duration = (datetime.now() - start_time).total_seconds()
            self.task_duration.observe(duration)
            return {
                "status": "completed",
                "result": result,
                "duration": duration
            }
        except Exception as e:
            print(f"Task execution failed: {e}")
            agent.status = AgentStatus.FAILED
            return {
                "status": "failed",
                "error": str(e),
                "duration": (datetime.now() - start_time).total_seconds()
            }
        finally:
            agent.current_load = max(0, agent.current_load - 1)
            # NOTE(review): this also resets a FAILED agent to IDLE once it has
            # drained, so FAILED is only transient — confirm that is intended.
            if agent.current_load == 0:
                agent.status = AgentStatus.IDLE
            self.agent_load.labels(agent_id=agent.id).set(agent.current_load)

    async def _call_agent_api(
        self,
        agent: AgentInstance,
        task: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST ``task`` to the agent pod and return the decoded JSON reply.

        Raises:
            Exception: If the agent answers with a status other than 200.
        """
        import aiohttp

        agent_url = f"http://agent-{agent.id}:8080/execute"
        async with aiohttp.ClientSession() as session:
            async with session.post(
                agent_url,
                json=task,
                timeout=aiohttp.ClientTimeout(total=300)  # 5 minute timeout
            ) as response:
                if response.status != 200:
                    raise Exception(f"Agent API error: {response.status}")
                return await response.json()

    async def _health_monitor(self):
        """Every 30 s, remove agents whose last heartbeat is older than 60 s."""
        while True:
            try:
                current_time = datetime.now()
                # Iterate over a snapshot: _remove_agent mutates the registry.
                for agent in list(self.agents.values()):
                    if (current_time - agent.last_heartbeat).total_seconds() > 60:
                        print(f"Agent {agent.id} is unhealthy, removing...")
                        await self._remove_agent(agent.id)
                await asyncio.sleep(30)
            except Exception as e:
                print(f"Health monitor error: {e}")
                await asyncio.sleep(60)

    async def _auto_scaler(self):
        """Every 60 s, scale each agent type up or down by average load."""
        while True:
            try:
                agent_types = {agent.type for agent in self.agents.values()}
                for agent_type in agent_types:
                    agents_of_type = [
                        agent for agent in self.agents.values()
                        if agent.type == agent_type
                    ]
                    # Zero-capacity agents have no meaningful load ratio.
                    ratios = [
                        agent.current_load / agent.capacity
                        for agent in agents_of_type
                        if agent.capacity > 0
                    ]
                    if not ratios:
                        continue
                    avg_load = sum(ratios) / len(ratios)
                    if avg_load > 0.8:  # above 80% load
                        print(f"Scaling up {agent_type} agents (load: {avg_load:.2f})")
                        await self._scale_up(agent_type)
                    elif avg_load < 0.2 and len(agents_of_type) > 1:  # below 20% load
                        print(f"Scaling down {agent_type} agents (load: {avg_load:.2f})")
                        await self._scale_down(agent_type)
                await asyncio.sleep(60)
            except Exception as e:
                print(f"Auto scaler error: {e}")
                await asyncio.sleep(120)

    async def _scale_up(self, agent_type: str):
        """Register one additional agent of ``agent_type`` with capacity 10."""
        await self.register_agent({
            "type": agent_type,
            "capacity": 10,
            "metadata": {"auto_scaled": True}
        })

    async def _scale_down(self, agent_type: str):
        """Remove the oldest agent of ``agent_type`` that has no load."""
        idle_agents = [
            agent for agent in self.agents.values()
            if agent.type == agent_type and agent.current_load == 0
        ]
        if idle_agents:
            oldest_agent = min(idle_agents, key=lambda a: a.created_at)
            await self._remove_agent(oldest_agent.id)

    async def _deploy_agent_pod(self, agent: AgentInstance):
        """Create the Kubernetes pod ``agent-<id>`` for ``agent``.

        Raises:
            Exception: Re-raises whatever the Kubernetes client raised.
        """
        pod_spec = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": f"agent-{agent.id}",
                "labels": {
                    "app": "ai-agent",
                    "agent-type": agent.type,
                    "agent-id": agent.id
                }
            },
            "spec": {
                "containers": [{
                    "name": "agent",
                    "image": f"ai-agent-{agent.type}:latest",
                    "ports": [{"containerPort": 8080}],
                    "env": [
                        {"name": "AGENT_ID", "value": agent.id},
                        {"name": "AGENT_TYPE", "value": agent.type},
                        {"name": "MANAGER_URL", "value": self.config['manager_url']}
                    ],
                    "resources": {
                        "requests": {"cpu": "100m", "memory": "256Mi"},
                        "limits": {"cpu": "500m", "memory": "1Gi"}
                    },
                    "livenessProbe": {
                        "httpGet": {"path": "/health", "port": 8080},
                        "initialDelaySeconds": 30,
                        "periodSeconds": 10
                    }
                }],
                "restartPolicy": "Always"
            }
        }
        v1 = kubernetes.client.CoreV1Api(self.k8s_client)
        try:
            await self._run_blocking(
                v1.create_namespaced_pod,
                namespace=self.namespace,
                body=pod_spec
            )
            print(f"Pod deployed for agent {agent.id}")
        except Exception as e:
            print(f"Failed to deploy pod for agent {agent.id}: {e}")
            raise

    async def _persist_agent(self, agent: AgentInstance):
        """Store ``agent`` as JSON in the Redis hash ``agents``."""
        record = asdict(agent)
        # datetime and Enum values are not JSON serializable as-is.
        record.update(
            created_at=agent.created_at.isoformat(),
            last_heartbeat=agent.last_heartbeat.isoformat(),
            status=agent.status.value,
        )
        await self._run_blocking(
            self.redis_client.hset, "agents", agent.id, json.dumps(record)
        )

    async def _remove_agent(self, agent_id: str):
        """Remove an agent from the registry, Kubernetes and Redis.

        A failed pod deletion is logged and does not stop the removal.
        """
        # Pop first: two loops may try to remove the same agent concurrently.
        agent = self.agents.pop(agent_id, None)
        if agent is None:
            return
        v1 = kubernetes.client.CoreV1Api(self.k8s_client)
        try:
            await self._run_blocking(
                v1.delete_namespaced_pod,
                name=f"agent-{agent_id}",
                namespace=self.namespace
            )
        except Exception as e:
            print(f"Failed to delete pod for agent {agent_id}: {e}")
        await self._run_blocking(self.redis_client.hdel, "agents", agent_id)
        try:
            # Drop the per-agent series so label cardinality does not grow forever.
            self.agent_load.remove(agent_id)
        except KeyError:
            pass
        print(f"Agent removed: {agent_id}")

    def _publish_agent_state(self):
        """Export the current number of agents per (type, status) pair."""
        counts: Dict[tuple, int] = {}
        for agent in self.agents.values():
            key = (agent.type, agent.status.value)
            counts[key] = counts.get(key, 0) + 1
        # Label pairs that no longer match any agent must drop back to zero.
        for agent_type, status in self._seen_state_labels - set(counts):
            self.agent_state.labels(type=agent_type, status=status).set(0)
        for (agent_type, status), count in counts.items():
            self.agent_state.labels(type=agent_type, status=status).set(count)
        self._seen_state_labels |= set(counts)

    async def _metrics_collector(self):
        """Every 15 s, refresh the agent-state gauge.

        The original incremented the ``ai_agents_total`` counter for every
        agent on every pass, so the counter grew without bound; the counter is
        now only incremented on registration.
        """
        while True:
            try:
                self._publish_agent_state()
                await asyncio.sleep(15)
            except Exception as e:
                print(f"Metrics collector error: {e}")
                await asyncio.sleep(60)
# Usage example
async def start_enterprise_manager():
    """Build a sample configuration and run the enterprise manager."""
    settings = dict(
        redis_url="redis://redis-cluster:6379",
        manager_url="http://agent-manager:8080",
        kubernetes_namespace="ai-agents",
    )
    await EnterpriseAgentManager(settings).start()


if __name__ == "__main__":
    asyncio.run(start_enterprise_manager())
## セキュリティアーキテクチャ

### 1. ゼロトラストセキュリティモデル
# security/zero-trust-policy.yml
# Allows only the agent-executor service account to POST /api/v1/tasks
# inside the ai-agents namespace.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: ai-agent-zero-trust
  namespace: ai-agents
spec:
  rules:
    - from:
        - source:
            principals: ["cluster.local/ns/ai-agents/sa/agent-executor"]
      to:
        - operation:
            methods: ["POST"]
            paths: ["/api/v1/tasks"]
      when:
        # NOTE(review): "custom.*" does not look like a condition key Istio
        # supports (e.g. request.headers[...], request.auth.claims[...]) —
        # confirm against the AuthorizationPolicy conditions reference.
        - key: custom.agent_authenticated
          values: ["true"]
        - key: custom.task_validated
          values: ["true"]
---
# Service account the agent executors run as.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: agent-executor
  namespace: ai-agents
  annotations:
    iam.gke.io/gcp-service-account: ai-agent-sa@project.iam.gserviceaccount.com
---
# Use Istio mutual TLS for traffic to every service in the namespace.
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: ai-agent-tls
  namespace: ai-agents
spec:
  host: "*.ai-agents.svc.cluster.local"
  trafficPolicy:
    tls:
      mode: ISTIO_MUTUAL
### 2. シークレット管理システム
# security/secrets_manager.py
import base64
import json
from typing import Dict, Any, Optional
from cryptography.fernet import Fernet
from kubernetes import client, config
import hvac # HashiCorp Vault client
class EnterpriseSecretsManager:
    """Enterprise secrets management backed by Vault and Kubernetes.

    Secrets are Fernet-encrypted before being written to Vault (KV v2) and
    mirrored into a Kubernetes ``Opaque`` secret for faster reads.

    NOTE(review): ``_notify_agent_secret_rotation``, ``_send_audit_log`` and
    ``_get_client_ip`` are called but not defined in this class; they must be
    provided elsewhere (subclass/mixin) — confirm.
    NOTE(review): the Kubernetes copy is only base64-encoded, not encrypted,
    so it bypasses the Fernet protection applied to the Vault copy.
    """

    def __init__(self, vault_url: str, vault_token: str):
        """Connect to Vault and Kubernetes and load the master key."""
        self.vault_client = hvac.Client(url=vault_url, token=vault_token)
        self.k8s_client = client.CoreV1Api()
        self.encryption_key = self._get_or_create_encryption_key()
        self.cipher = Fernet(self.encryption_key)

    async def store_agent_secrets(
        self,
        agent_id: str,
        secrets: Dict[str, str]
    ) -> bool:
        """Store ``secrets`` for an agent in Vault and Kubernetes.

        Returns:
            True on success, False if any step failed (the error is printed).
        """
        try:
            # Encrypt before writing to Vault.
            encrypted_secrets = {
                key: self.cipher.encrypt(value.encode()).decode()
                for key, value in secrets.items()
            }
            self.vault_client.secrets.kv.v2.create_or_update_secret(
                path=f"agents/{agent_id}",
                secret=encrypted_secrets
            )
            # Mirror into a Kubernetes secret (faster access).
            secret_name = f"agent-secrets-{agent_id}"
            secret = client.V1Secret(
                metadata=client.V1ObjectMeta(
                    name=secret_name,
                    namespace="ai-agents",
                    labels={"agent-id": agent_id}
                ),
                data={
                    key: base64.b64encode(value.encode()).decode()
                    for key, value in secrets.items()
                },
                type="Opaque"
            )
            try:
                self.k8s_client.create_namespaced_secret(
                    namespace="ai-agents",
                    body=secret
                )
            except client.exceptions.ApiException as e:
                # 409 Conflict: the secret already exists (every rotation
                # after the first), so replace it instead of failing.
                if e.status != 409:
                    raise
                self.k8s_client.replace_namespaced_secret(
                    name=secret_name,
                    namespace="ai-agents",
                    body=secret
                )
            return True
        except Exception as e:
            print(f"Failed to store secrets for agent {agent_id}: {e}")
            return False

    async def get_agent_secrets(self, agent_id: str) -> Dict[str, str]:
        """Return the agent's secrets, or ``{}`` if they cannot be read.

        Kubernetes is tried first; on an API error the Vault copy is read and
        decrypted instead.
        """
        try:
            try:
                secret = self.k8s_client.read_namespaced_secret(
                    name=f"agent-secrets-{agent_id}",
                    namespace="ai-agents"
                )
                # ``data`` is None for a secret that has no entries.
                return {
                    key: base64.b64decode(value).decode()
                    for key, value in (secret.data or {}).items()
                }
            except client.exceptions.ApiException:
                # Not available from Kubernetes: fall back to Vault.
                response = self.vault_client.secrets.kv.v2.read_secret_version(
                    path=f"agents/{agent_id}"
                )
                encrypted_secrets = response['data']['data']
                return {
                    key: self.cipher.decrypt(value.encode()).decode()
                    for key, value in encrypted_secrets.items()
                }
        except Exception as e:
            print(f"Failed to get secrets for agent {agent_id}: {e}")
            return {}

    async def rotate_secrets(self, agent_id: str) -> bool:
        """Generate, store and announce new secrets for an agent.

        Returns:
            True only if the new secrets were stored and the agent notified.
        """
        try:
            new_secrets = await self._generate_new_secrets(agent_id)
            # store_agent_secrets reports failure via its return value; the
            # original ignored it and announced secrets that were never saved.
            if not await self.store_agent_secrets(agent_id, new_secrets):
                print(f"Failed to rotate secrets for agent {agent_id}: store failed")
                return False
            await self._notify_agent_secret_rotation(agent_id)
            print(f"Secrets rotated for agent {agent_id}")
            return True
        except Exception as e:
            print(f"Failed to rotate secrets for agent {agent_id}: {e}")
            return False

    async def audit_secret_access(self, agent_id: str, secret_key: str):
        """Send an audit record for one secret access."""
        # Imported locally: this module's import block has no ``datetime``.
        from datetime import datetime

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "secret_key": secret_key,
            "action": "access",
            "source_ip": self._get_client_ip()
        }
        await self._send_audit_log(audit_entry)

    def _get_or_create_encryption_key(self) -> bytes:
        """Return the Fernet master key, creating it only if it is missing.

        Raises:
            Exception: Any Vault error other than "path not found". The
                original bare ``except`` generated and stored a new key on
                every error, which would overwrite the real key after a
                transient outage and make existing secrets undecryptable.
        """
        kv = self.vault_client.secrets.kv.v2
        try:
            response = kv.read_secret_version(path="encryption/master-key")
        except hvac.exceptions.InvalidPath:
            new_key = Fernet.generate_key()
            kv.create_or_update_secret(
                path="encryption/master-key",
                secret={"key": new_key.decode()}
            )
            return new_key
        return response['data']['data']['key'].encode()

    async def _generate_new_secrets(self, agent_id: str) -> Dict[str, str]:
        """Generate a fresh API key, agent token and timestamp."""
        # ``uuid`` and ``datetime`` are not in this module's import block.
        import secrets
        import string
        import uuid
        from datetime import datetime

        alphabet = string.ascii_letters + string.digits + "!@#$%^&*"
        new_api_key = ''.join(secrets.choice(alphabet) for _ in range(32))
        return {
            "api_key": new_api_key,
            "agent_token": str(uuid.uuid4()),
            "generated_at": datetime.now().isoformat()
        }
## 監視・Observabilityシステム

### 1. 包括的監視アーキテクチャ
# monitoring/observability_stack.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
from dataclasses import dataclass
import aiohttp
from prometheus_client import CollectorRegistry, Counter, Histogram, Gauge
from opentelemetry import trace, metrics
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
@dataclass
class AlertRule:
    """One alerting rule evaluated by ``AIAgentObservability``."""

    name: str
    condition: str  # e.g. "agent_success_rate < 0.9"; the operator is parsed from it
    threshold: float
    duration: int  # seconds the condition must hold before the alert fires
    severity: str
    action: str  # name of the automatic action, e.g. "scale_up"


class AIAgentObservability:
    """Observability system dedicated to AI agents.

    Exposes Prometheus metrics, exports spans to Jaeger, evaluates alert
    rules and dispatches notifications and automatic actions.

    NOTE(review): ``_health_checker``, ``_performance_analyzer``,
    ``_get_metric_value``, ``_monitor_cost_budget``, ``_send_email_alert``,
    ``_send_pagerduty_alert``, ``_action_performance_investigation`` and
    ``_action_rate_limiting`` are referenced but not defined in this class;
    they must be provided elsewhere (subclass/mixin) — confirm.
    """

    # Upper bound for outgoing webhook / manager calls, in seconds.
    HTTP_TIMEOUT_SECONDS = 10

    def __init__(self, config: Dict[str, Any]):
        """Create the metrics, configure tracing and load the alert rules.

        Args:
            config: Optional keys read here are ``jaeger_host`` and
                ``jaeger_port``; other methods read ``slack_webhook``,
                ``email_enabled`` and ``agent_manager_url``.
        """
        self.config = config
        # Prometheus registry
        self.registry = CollectorRegistry()
        # Metric definitions
        self.agent_requests = Counter(
            'ai_agent_requests_total',
            'Total agent requests',
            ['agent_type', 'status', 'endpoint'],
            registry=self.registry
        )
        self.agent_response_time = Histogram(
            'ai_agent_response_time_seconds',
            'Agent response time',
            ['agent_type', 'complexity'],
            registry=self.registry
        )
        self.agent_success_rate = Gauge(
            'ai_agent_success_rate',
            'Agent success rate',
            ['agent_type'],
            registry=self.registry
        )
        self.token_usage = Counter(
            'ai_agent_token_usage_total',
            'Total token usage',
            ['agent_type', 'model'],
            registry=self.registry
        )
        # Tracing setup
        trace.set_tracer_provider(TracerProvider())
        # ``agent_port`` is the Jaeger agent's UDP port (6831 by default);
        # 14268 is the collector's HTTP port and is not valid here.
        # NOTE(review): the sample configurations still pass 14268 — confirm.
        jaeger_exporter = JaegerExporter(
            agent_host_name=config.get('jaeger_host', 'localhost'),
            agent_port=config.get('jaeger_port', 6831),
        )
        trace.get_tracer_provider().add_span_processor(
            BatchSpanProcessor(jaeger_exporter)
        )
        # Alert rules and the alerts currently firing, keyed by rule name.
        self.alert_rules = self._load_alert_rules()
        self.active_alerts = {}

    def _load_alert_rules(self) -> List[AlertRule]:
        """Return the built-in alert rules."""
        return [
            AlertRule(
                name="high_error_rate",
                condition="agent_success_rate < 0.9",
                threshold=0.9,
                duration=300,  # 5 minutes
                severity="warning",
                action="scale_up"
            ),
            AlertRule(
                name="critical_error_rate",
                condition="agent_success_rate < 0.8",
                threshold=0.8,
                duration=180,  # 3 minutes
                severity="critical",
                action="immediate_intervention"
            ),
            AlertRule(
                name="high_response_time",
                condition="agent_response_time > 30.0",
                threshold=30.0,
                duration=600,  # 10 minutes
                severity="warning",
                action="performance_investigation"
            ),
            AlertRule(
                name="token_budget_exceeded",
                condition="hourly_token_usage > 1000000",
                threshold=1000000,
                duration=60,
                severity="critical",
                action="rate_limiting"
            )
        ]

    async def start_monitoring(self):
        """Start all monitoring loops and wait on them (normally forever)."""
        print("Starting AI Agent Observability System...")
        tasks = [
            asyncio.create_task(self._metrics_collector()),
            asyncio.create_task(self._alert_manager()),
            asyncio.create_task(self._health_checker()),
            asyncio.create_task(self._cost_tracker()),
            asyncio.create_task(self._performance_analyzer())
        ]
        await asyncio.gather(*tasks)

    async def record_agent_request(
        self,
        agent_type: str,
        endpoint: str,
        status: str,
        response_time: float,
        token_count: int = 0,
        model: str = "unknown"
    ):
        """Record one finished agent request in the metrics and as a span."""
        self.agent_requests.labels(
            agent_type=agent_type,
            status=status,
            endpoint=endpoint
        ).inc()
        self.agent_response_time.labels(
            agent_type=agent_type,
            complexity="standard"  # should be determined dynamically
        ).observe(response_time)
        if token_count > 0:
            self.token_usage.labels(
                agent_type=agent_type,
                model=model
            ).inc(token_count)
        # Tracing
        with trace.get_tracer(__name__).start_as_current_span("agent_request") as span:
            span.set_attribute("agent.type", agent_type)
            span.set_attribute("agent.endpoint", endpoint)
            span.set_attribute("agent.status", status)
            span.set_attribute("agent.response_time", response_time)
            span.set_attribute("agent.token_count", token_count)

    async def _metrics_collector(self):
        """Every 30 s, publish the per-agent-type success rate."""
        while True:
            try:
                success_rates = await self._calculate_success_rates()
                for agent_type, rate in success_rates.items():
                    self.agent_success_rate.labels(agent_type=agent_type).set(rate)
                await asyncio.sleep(30)
            except Exception as e:
                print(f"Metrics collector error: {e}")
                await asyncio.sleep(60)

    async def _calculate_success_rates(self) -> Dict[str, float]:
        """Return the success rate per agent type.

        Placeholder: returns fixed dummy data instead of querying Prometheus.
        """
        return {
            "code_generator": 0.95,
            "security_scanner": 0.98,
            "performance_tester": 0.92
        }

    async def _alert_manager(self):
        """Every 60 s, evaluate every alert rule."""
        while True:
            try:
                for rule in self.alert_rules:
                    await self._evaluate_alert_rule(rule)
                await asyncio.sleep(60)
            except Exception as e:
                print(f"Alert manager error: {e}")
                await asyncio.sleep(120)

    async def _evaluate_alert_rule(self, rule: AlertRule):
        """Evaluate one rule and update its alert state.

        An alert fires once the condition has held for ``rule.duration``
        seconds, and then at most once per ``rule.duration`` seconds while it
        keeps holding. The original re-fired (notification plus automatic
        action) on every evaluation pass.
        """
        current_value = await self._get_metric_value(rule.condition)
        if current_value is None:
            return
        if not self._condition_met(rule, current_value):
            # Condition cleared.
            if self.active_alerts.pop(rule.name, None) is not None:
                print(f"Alert resolved: {rule.name}")
            return
        now = datetime.now()
        alert = self.active_alerts.get(rule.name)
        if alert is None:
            # New alert: start the clock, do not fire yet.
            self.active_alerts[rule.name] = {
                "start_time": now,
                "rule": rule,
                "current_value": current_value,
                "last_fired": None
            }
            print(f"Alert triggered: {rule.name} (value: {current_value})")
            return
        alert["current_value"] = current_value
        sustained = (now - alert["start_time"]).total_seconds()
        if sustained < rule.duration:
            return
        last_fired = alert.get("last_fired")
        if last_fired is not None and (now - last_fired).total_seconds() < rule.duration:
            return
        alert["last_fired"] = now
        await self._fire_alert(rule, current_value, sustained)

    def _condition_met(self, rule: AlertRule, value: float) -> bool:
        """Compare ``value`` with ``rule.threshold`` using the rule's operator.

        The operator is taken from ``rule.condition``. ``>=`` and ``<=`` are
        checked before ``>`` and ``<`` so they are not treated as strict.
        Returns False when no comparison operator is present.
        """
        condition = rule.condition
        if ">=" in condition:
            return value >= rule.threshold
        if "<=" in condition:
            return value <= rule.threshold
        if ">" in condition:
            return value > rule.threshold
        if "<" in condition:
            return value < rule.threshold
        return False

    async def _fire_alert(self, rule: AlertRule, value: float, duration: float):
        """Send notifications for an alert and run its automatic action."""
        alert_data = {
            "alert_name": rule.name,
            "severity": rule.severity,
            "current_value": value,
            "threshold": rule.threshold,
            "duration": duration,
            "timestamp": datetime.now().isoformat(),
            "action": rule.action
        }
        await self._send_alert_notification(alert_data)
        await self._execute_alert_action(rule.action, alert_data)

    async def _send_alert_notification(self, alert_data: Dict[str, Any]):
        """Send the alert through every enabled channel.

        Each channel is isolated: a failing (or unimplemented) channel is
        logged and neither blocks the other channels nor the automatic action
        that follows.
        """
        sender_names = []
        if self.config.get('slack_webhook'):
            sender_names.append("_send_slack_alert")
        if self.config.get('email_enabled'):
            sender_names.append("_send_email_alert")
        # PagerDuty only for critical alerts.
        if alert_data['severity'] == 'critical':
            sender_names.append("_send_pagerduty_alert")
        for sender_name in sender_names:
            try:
                await getattr(self, sender_name)(alert_data)
            except Exception as e:
                print(f"Failed to send alert notification via {sender_name}: {e}")

    async def _post_json(self, url: str, payload: Dict[str, Any]):
        """POST ``payload`` as JSON; raise on a timeout or an error status."""
        timeout = aiohttp.ClientTimeout(total=self.HTTP_TIMEOUT_SECONDS)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(url, json=payload) as response:
                response.raise_for_status()

    async def _send_slack_alert(self, alert_data: Dict[str, Any]):
        """Post the alert to the configured Slack webhook."""
        color = "warning" if alert_data['severity'] == "warning" else "danger"
        message = {
            "attachments": [{
                "color": color,
                "title": f"🤖 AI Agent Alert: {alert_data['alert_name']}",
                "fields": [
                    {"title": "Severity", "value": alert_data['severity'], "short": True},
                    {"title": "Current Value", "value": str(alert_data['current_value']), "short": True},
                    {"title": "Threshold", "value": str(alert_data['threshold']), "short": True},
                    {"title": "Duration", "value": f"{alert_data['duration']:.0f}s", "short": True}
                ],
                "footer": "AI Agent Monitoring",
                "ts": int(datetime.now().timestamp())
            }]
        }
        await self._post_json(self.config['slack_webhook'], message)

    async def _execute_alert_action(self, action: str, alert_data: Dict[str, Any]):
        """Run the automatic action named ``action``, if it is implemented.

        The handler is looked up lazily. The original built a dict that
        referenced undefined handlers, so every action — including the
        implemented ones — raised ``AttributeError``.
        """
        known_actions = (
            "scale_up",
            "immediate_intervention",
            "performance_investigation",
            "rate_limiting",
        )
        if action not in known_actions:
            return
        handler = getattr(self, f"_action_{action}", None)
        if handler is None:
            print(f"Failed to execute alert action {action}: not implemented")
            return
        try:
            await handler(alert_data)
            print(f"Alert action executed: {action}")
        except Exception as e:
            print(f"Failed to execute alert action {action}: {e}")

    async def _action_scale_up(self, alert_data: Dict[str, Any]):
        """Ask the agent manager to scale up."""
        await self._post_json(
            f"{self.config['agent_manager_url']}/scale-up",
            {"reason": "high_error_rate_alert"}
        )

    async def _action_immediate_intervention(self, alert_data: Dict[str, Any]):
        """Switch the agent manager to emergency mode."""
        await self._post_json(
            f"{self.config['agent_manager_url']}/emergency-mode",
            {"alert_data": alert_data}
        )

    async def _cost_tracker(self):
        """Every hour, compute costs and check them against the budget."""
        while True:
            try:
                hourly_costs = await self._calculate_hourly_costs()
                await self._monitor_cost_budget(hourly_costs)
                await asyncio.sleep(3600)
            except Exception as e:
                print(f"Cost tracker error: {e}")
                await asyncio.sleep(3600)

    async def _calculate_hourly_costs(self) -> Dict[str, float]:
        """Return the hourly cost per model.

        Placeholder: the pricing table is defined but no usage is read yet,
        so the result is always an empty dict.
        """
        # Price table per model (per 1k tokens); not used yet.
        pricing = {
            "gpt-5": {"input": 0.002, "output": 0.006},
            "claude-opus-4.1": {"input": 0.015, "output": 0.075},
            "gemini-2.0-flash": {"input": 0.001, "output": 0.002}
        }
        costs = {}
        return costs
# Kubernetes manifest used for configuration and deployment.
# The YAML nesting is restored: without indentation the ``config.yaml: |``
# block scalar and the nested mappings are not valid YAML.
# NOTE(review): "${{ secrets.SLACK_WEBHOOK_URL }}" is CI templating syntax and
# must be substituted before this manifest is applied — confirm.
observability_config = """
apiVersion: v1
kind: ConfigMap
metadata:
  name: observability-config
  namespace: ai-agents
data:
  config.yaml: |
    jaeger_host: jaeger-collector.monitoring.svc.cluster.local
    jaeger_port: 14268
    prometheus_url: http://prometheus.monitoring.svc.cluster.local:9090
    slack_webhook: "${{ secrets.SLACK_WEBHOOK_URL }}"
    agent_manager_url: http://agent-manager.ai-agents.svc.cluster.local:8080
    cost_budget:
      daily_limit: 1000.0
      monthly_limit: 25000.0
    alert_channels:
      - slack
      - email
      - pagerduty
"""
# Usage example
async def start_observability_system():
    """Build a sample configuration and run the observability loops."""
    settings = dict(
        jaeger_host="jaeger-collector",
        jaeger_port=14268,
        slack_webhook="https://hooks.slack.com/services/...",
        agent_manager_url="http://agent-manager:8080",
    )
    await AIAgentObservability(settings).start_monitoring()


if __name__ == "__main__":
    asyncio.run(start_observability_system())
## パフォーマンス最適化とスケーリング

### 1. 動的負荷分散システム
# performance/load_balancer.py
import asyncio
import random
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
import aiohttp
import time
class LoadBalancingStrategy(Enum):
    """Endpoint selection strategies supported by the load balancer."""

    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_connections"
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    AI_OPTIMIZED = "ai_optimized"


@dataclass
class AgentEndpoint:
    """One agent endpoint together with its live statistics."""

    id: str
    url: str
    weight: int  # relative capability; the scoring below treats 10 as the top
    current_connections: int
    average_response_time: float  # seconds, exponentially smoothed
    success_rate: float  # share of this endpoint's recent successful requests
    capacity: int  # maximum number of concurrent connections
    agent_type: str


class IntelligentLoadBalancer:
    """AI-optimized load balancer for agent endpoints."""

    # Number of most recent requests used for the success rate.
    HISTORY_WINDOW = 100

    def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.AI_OPTIMIZED):
        """Create an empty balancer that uses ``strategy``."""
        self.strategy = strategy
        self.endpoints: Dict[str, AgentEndpoint] = {}
        # Recent outcomes across all endpoints (bounded to HISTORY_WINDOW).
        self.request_history = []
        # Recent outcomes per endpoint id; drives AgentEndpoint.success_rate.
        self._endpoint_history: Dict[str, List[bool]] = {}
        # Ids of endpoints whose last health check failed.
        self.unhealthy_endpoints: set = set()
        self.round_robin_counter = 0

    def register_endpoint(self, endpoint: AgentEndpoint):
        """Register (or replace) an endpoint under its id."""
        self.endpoints[endpoint.id] = endpoint
        print(f"Endpoint registered: {endpoint.id} ({endpoint.agent_type})")

    async def select_endpoint(
        self,
        request_type: str,
        estimated_complexity: float = 1.0
    ) -> Optional[AgentEndpoint]:
        """Pick an endpoint using the configured strategy.

        Returns:
            The chosen endpoint, or None when no healthy endpoint has spare
            capacity (or the strategy is unknown).
        """
        candidates = [
            ep for ep in self.endpoints.values()
            if ep.id not in self.unhealthy_endpoints
            and ep.current_connections < ep.capacity
        ]
        if not candidates:
            return None
        if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
            return self._round_robin_select(candidates)
        if self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
            return self._least_connections_select(candidates)
        if self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
            return self._weighted_round_robin_select(candidates)
        if self.strategy == LoadBalancingStrategy.AI_OPTIMIZED:
            return await self._ai_optimized_select(
                candidates,
                request_type,
                estimated_complexity
            )
        return None

    def _round_robin_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Return the next endpoint in rotation."""
        selected = endpoints[self.round_robin_counter % len(endpoints)]
        self.round_robin_counter += 1
        return selected

    def _least_connections_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Return the endpoint with the fewest open connections."""
        return min(endpoints, key=lambda ep: ep.current_connections)

    def _weighted_round_robin_select(self, endpoints: List[AgentEndpoint]) -> AgentEndpoint:
        """Return a random endpoint with probability proportional to its weight.

        Uses ``random.choices`` instead of materializing a list with one
        entry per weight unit. Falls back to the first endpoint when no
        endpoint has a positive weight.
        """
        weights = [max(ep.weight, 0) for ep in endpoints]
        if sum(weights) <= 0:
            return endpoints[0]
        return random.choices(endpoints, weights=weights, k=1)[0]

    async def _ai_optimized_select(
        self,
        endpoints: List[AgentEndpoint],
        request_type: str,
        estimated_complexity: float
    ) -> AgentEndpoint:
        """Return the endpoint with the highest weighted composite score."""
        scores = []
        for ep in endpoints:
            # Base scores: success rate and response time.
            success_score = ep.success_rate
            response_time_score = 1.0 / (1.0 + ep.average_response_time)
            # Load score: free share of the endpoint's capacity.
            load_score = 1.0 - (ep.current_connections / ep.capacity)
            # Fit between the request type and the agent type.
            compatibility_score = await self._calculate_compatibility(
                request_type,
                ep.agent_type
            )
            # Fit between the task complexity and the endpoint's capability.
            complexity_score = await self._calculate_complexity_fitness(
                estimated_complexity,
                ep
            )
            total_score = (
                success_score * 0.3 +
                response_time_score * 0.25 +
                load_score * 0.2 +
                compatibility_score * 0.15 +
                complexity_score * 0.1
            )
            scores.append((ep, total_score))
        return max(scores, key=lambda x: x[1])[0]

    async def _calculate_compatibility(self, request_type: str, agent_type: str) -> float:
        """Return how well ``agent_type`` suits ``request_type``.

        Unknown combinations score a neutral 0.5.
        """
        compatibility_matrix = {
            "code_generation": {
                "code_generator": 1.0,
                "security_scanner": 0.3,
                "performance_tester": 0.2
            },
            "security_analysis": {
                "security_scanner": 1.0,
                "code_generator": 0.4,
                "performance_tester": 0.1
            },
            "performance_test": {
                "performance_tester": 1.0,
                "code_generator": 0.3,
                "security_scanner": 0.2
            }
        }
        return compatibility_matrix.get(request_type, {}).get(agent_type, 0.5)

    async def _calculate_complexity_fitness(
        self,
        estimated_complexity: float,
        endpoint: AgentEndpoint
    ) -> float:
        """Return a 0..1 score for how well ``endpoint`` fits the complexity.

        The result is clamped to [0, 1]: a weight above 10 previously gave a
        negative score for medium tasks and a score above 1 for complex ones,
        distorting the composite score.
        """
        if estimated_complexity <= 0.3:
            # Simple task: any endpoint can handle it.
            return 1.0
        if estimated_complexity <= 0.7:
            # Medium task: endpoints of average capability fit best.
            fitness = 1.0 - abs(endpoint.weight - 5) / 5.0
        else:
            # Complex task: needs a high-capability endpoint.
            fitness = endpoint.weight / 10.0
        return min(1.0, max(0.0, fitness))

    async def execute_request(
        self,
        request_data: Dict[str, Any],
        request_type: str,
        estimated_complexity: float = 1.0
    ) -> Dict[str, Any]:
        """Select an endpoint, run the request there and update its stats.

        Returns:
            A dict with ``success``; on success also ``result``, otherwise
            ``error``. ``endpoint_id`` and ``execution_time`` are included
            whenever an endpoint was selected; ``retry_after`` when none was.
        """
        endpoint = await self.select_endpoint(request_type, estimated_complexity)
        if not endpoint:
            return {
                "success": False,
                "error": "No available endpoints",
                "retry_after": 30
            }
        endpoint.current_connections += 1
        start_time = time.time()
        try:
            result = await self._execute_on_endpoint(endpoint, request_data)
            execution_time = time.time() - start_time
            await self._update_endpoint_stats(endpoint, execution_time, True)
            return {
                "success": True,
                "result": result,
                "endpoint_id": endpoint.id,
                "execution_time": execution_time
            }
        except Exception as e:
            execution_time = time.time() - start_time
            await self._update_endpoint_stats(endpoint, execution_time, False)
            return {
                "success": False,
                "error": str(e),
                "endpoint_id": endpoint.id,
                "execution_time": execution_time
            }
        finally:
            endpoint.current_connections = max(0, endpoint.current_connections - 1)

    async def _execute_on_endpoint(
        self,
        endpoint: AgentEndpoint,
        request_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST the request to the endpoint and return the decoded JSON reply.

        Raises:
            Exception: If the endpoint answers with a status other than 200.
        """
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{endpoint.url}/execute",
                json=request_data,
                timeout=aiohttp.ClientTimeout(total=300)
            ) as response:
                if response.status != 200:
                    raise Exception(f"HTTP {response.status}: {await response.text()}")
                return await response.json()

    async def _update_endpoint_stats(
        self,
        endpoint: AgentEndpoint,
        execution_time: float,
        success: bool
    ):
        """Update the smoothed response time and the success rate.

        The success rate is computed from this endpoint's own last
        ``HISTORY_WINDOW`` outcomes. The original used one history shared by
        all endpoints, so every endpoint reported the global rate, and the
        list grew without bound.
        """
        alpha = 0.1  # smoothing factor of the moving average
        endpoint.average_response_time = (
            alpha * execution_time +
            (1 - alpha) * endpoint.average_response_time
        )
        history = self._endpoint_history.setdefault(endpoint.id, [])
        history.append(success)
        del history[:-self.HISTORY_WINDOW]
        endpoint.success_rate = sum(history) / len(history)
        self.request_history.append(success)
        del self.request_history[:-self.HISTORY_WINDOW]

    async def health_check(self):
        """Every 30 s, probe each endpoint's ``/health`` URL.

        Endpoints that fail the probe are excluded from selection until a
        later probe succeeds.
        """
        while True:
            try:
                async with aiohttp.ClientSession() as session:
                    for endpoint in list(self.endpoints.values()):
                        try:
                            async with session.get(
                                f"{endpoint.url}/health",
                                timeout=aiohttp.ClientTimeout(total=10)
                            ) as response:
                                if response.status != 200:
                                    print(f"Endpoint {endpoint.id} health check failed")
                                    self.unhealthy_endpoints.add(endpoint.id)
                                else:
                                    self.unhealthy_endpoints.discard(endpoint.id)
                        except Exception as e:
                            print(f"Health check error for {endpoint.id}: {e}")
                            self.unhealthy_endpoints.add(endpoint.id)
                await asyncio.sleep(30)
            except Exception as e:
                print(f"Health check loop error: {e}")
                await asyncio.sleep(60)
# Usage example
async def example_load_balancer():
    """Register two sample endpoints and run one request through the balancer.

    The background health check is cancelled before returning; the original
    left it running, which keeps probing forever when this coroutine is
    awaited from an already running event loop.
    """
    balancer = IntelligentLoadBalancer(LoadBalancingStrategy.AI_OPTIMIZED)
    # Register the endpoints
    endpoints = [
        AgentEndpoint(
            id="agent-1",
            url="http://agent-1:8080",
            weight=8,
            current_connections=0,
            average_response_time=2.5,
            success_rate=0.95,
            capacity=10,
            agent_type="code_generator"
        ),
        AgentEndpoint(
            id="agent-2",
            url="http://agent-2:8080",
            weight=6,
            current_connections=0,
            average_response_time=3.1,
            success_rate=0.92,
            capacity=8,
            agent_type="security_scanner"
        )
    ]
    for endpoint in endpoints:
        balancer.register_endpoint(endpoint)
    # Start the health check in the background
    health_task = asyncio.create_task(balancer.health_check())
    try:
        # Example request
        request_result = await balancer.execute_request(
            request_data={"task": "generate_function", "language": "python"},
            request_type="code_generation",
            estimated_complexity=0.5
        )
        print(f"Request result: {request_result}")
    finally:
        health_task.cancel()
        try:
            await health_task
        except asyncio.CancelledError:
            pass


if __name__ == "__main__":
    asyncio.run(example_load_balancer())
### 2. コスト最適化システム
# cost/optimization_engine.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Tuple
from dataclasses import dataclass
import numpy as np
@dataclass
class CostMetrics:
    """One recorded cost entry, as stored in ``CostOptimizationEngine.cost_history``."""

    # Model the cost is attributed to; used as the per-model grouping key.
    model_name: str
    # Token counts for the entry (presumably prompt vs. completion tokens; confirm).
    input_tokens: int
    output_tokens: int
    # Number of requests covered by this entry.
    request_count: int
    # Cost of the entry; summed per day and per model by the spending analysis.
    total_cost: float
    # Time of the entry; its calendar date is the per-day grouping key.
    timestamp: datetime
class CostOptimizationEngine:
    """Cost optimization engine.

    Scores the known models for a task, analyses the spend recorded in
    ``cost_history`` and assembles cost-control configurations.
    """

    # Baseline token counts per task type, before complexity/model scaling.
    _BASE_TOKENS = {
        "code_generation": {"input": 500, "output": 2000},
        "security_analysis": {"input": 1000, "output": 1500},
        "performance_testing": {"input": 800, "output": 1200},
        "general": {"input": 300, "output": 800}
    }

    # Per-model multiplier applied to the estimated token counts.
    _MODEL_EFFICIENCY = {
        "gpt-5": 1.0,
        "gpt-4o": 1.1,
        "claude-opus-4.1": 0.9,
        "claude-sonnet-3.5": 1.0,
        "gemini-2.0-flash": 1.2
    }

    def __init__(self, budget_config: Dict[str, Any]):
        """Store the budget configuration and the built-in model tables.

        Args:
            budget_config: Budget settings; ``monthly_limit`` is read by the
                spending analysis (default 10000 when absent).
        """
        self.budget_config = budget_config
        self.cost_history: List[CostMetrics] = []
        # Model price table (per 1K tokens).
        self.pricing = {
            "gpt-5": {"input": 0.002, "output": 0.006},
            "gpt-4o": {"input": 0.0015, "output": 0.002},
            "claude-opus-4.1": {"input": 0.015, "output": 0.075},
            "claude-sonnet-3.5": {"input": 0.003, "output": 0.015},
            "gemini-2.0-flash": {"input": 0.001, "output": 0.002}
        }
        # Model performance table (relative scores).
        self.model_performance = {
            "gpt-5": {"quality": 0.95, "speed": 0.8},
            "gpt-4o": {"quality": 0.9, "speed": 0.9},
            "claude-opus-4.1": {"quality": 0.92, "speed": 0.7},
            "claude-sonnet-3.5": {"quality": 0.88, "speed": 0.85},
            "gemini-2.0-flash": {"quality": 0.85, "speed": 0.95}
        }

    async def optimize_model_selection(
        self,
        task_requirements: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Pick the best-scoring model for a task.

        Args:
            task_requirements: Optional keys ``type``, ``complexity``
                (default 0.5), ``quality_priority`` (default 0.7),
                ``speed_priority`` (default 0.3) and ``budget_limit``
                (maximum estimated cost per request; ``None`` or absent
                means unlimited).

        Returns:
            ``{"recommended_model", "reasoning", "alternatives"}``, where
            ``alternatives`` holds every affordable model ordered by
            descending score, or ``{"error": ...}`` when no model fits the
            budget.
        """
        complexity = task_requirements.get("complexity", 0.5)
        quality_priority = task_requirements.get("quality_priority", 0.7)
        speed_priority = task_requirements.get("speed_priority", 0.3)
        budget_limit = task_requirements.get("budget_limit")

        # Quality matters more for complex tasks.
        quality_weight = quality_priority * (0.5 + complexity)
        # Whatever is not assigned to quality/speed goes to cost. Clamp at
        # zero: a negative weight would reward expensive models.
        cost_weight = max(0.0, 1.0 - quality_priority - speed_priority)

        model_scores = {}
        for model, performance in self.model_performance.items():
            estimated_tokens = self._estimate_token_usage(task_requirements, model)
            estimated_cost = self._calculate_cost(model, estimated_tokens)

            # Compare against None explicitly: a budget of 0 is a real
            # (maximally strict) limit, not "no limit".
            if budget_limit is not None and estimated_cost > budget_limit:
                continue

            quality_score = performance["quality"]
            speed_score = performance["speed"]
            cost_score = 1.0 / (1.0 + estimated_cost)  # cheaper => higher

            total_score = (
                quality_score * quality_weight +
                speed_score * speed_priority +
                cost_score * cost_weight
            )
            model_scores[model] = {
                "score": total_score,
                "estimated_cost": estimated_cost,
                "estimated_tokens": estimated_tokens,
                "quality_score": quality_score,
                "speed_score": speed_score
            }

        if not model_scores:
            return {"error": "No models meet budget constraints"}

        ranked = sorted(
            model_scores.items(),
            key=lambda item: item[1]["score"],
            reverse=True
        )
        # sorted() is stable, so ties resolve to the first model in table
        # order, the same choice max() would make.
        best_name, best_details = ranked[0]
        return {
            "recommended_model": best_name,
            "reasoning": best_details,
            "alternatives": dict(ranked)
        }

    def _estimate_token_usage(
        self,
        task_requirements: Dict[str, Any],
        model: str
    ) -> Dict[str, int]:
        """Estimate input/output token counts for a task on ``model``.

        Unknown task types fall back to the ``general`` baseline and unknown
        models to a multiplier of 1.0.
        """
        complexity = task_requirements.get("complexity", 0.5)
        task_type = task_requirements.get("type", "general")
        base = self._BASE_TOKENS.get(task_type, self._BASE_TOKENS["general"])
        # Scales from 0.5x (complexity 0) to 2.0x (complexity 1).
        complexity_multiplier = 0.5 + complexity * 1.5
        efficiency = self._MODEL_EFFICIENCY.get(model, 1.0)
        return {
            "input": int(base["input"] * complexity_multiplier * efficiency),
            "output": int(base["output"] * complexity_multiplier * efficiency)
        }

    def _calculate_cost(self, model: str, token_usage: Dict[str, int]) -> float:
        """Return the cost of ``token_usage`` on ``model`` (prices are per 1K tokens).

        Models missing from ``self.pricing`` cost 0.0.
        """
        if model not in self.pricing:
            return 0.0
        pricing = self.pricing[model]
        input_cost = (token_usage["input"] / 1000) * pricing["input"]
        output_cost = (token_usage["output"] / 1000) * pricing["output"]
        return input_cost + output_cost

    async def analyze_spending_patterns(self) -> Dict[str, Any]:
        """Summarise ``cost_history`` by day and by model.

        Returns:
            A report with totals, the daily average, the recent trend, the
            budget burn rate, the per-model breakdown and optimization
            suggestions, or ``{"error": ...}`` when there is no history.
        """
        if not self.cost_history:
            return {"error": "No cost history available"}

        daily_costs: Dict[str, float] = {}
        model_costs: Dict[str, float] = {}
        for metric in self.cost_history:
            date_key = metric.timestamp.date().isoformat()
            daily_costs[date_key] = daily_costs.get(date_key, 0.0) + metric.total_cost
            model_costs[metric.model_name] = (
                model_costs.get(metric.model_name, 0.0) + metric.total_cost
            )

        # ISO dates sort chronologically as plain strings.
        costs = [daily_costs[date] for date in sorted(daily_costs)]

        if len(costs) >= 7:
            last_week = costs[-7:]
            # Slope of a straight-line fit over the last 7 daily totals.
            # NOTE(review): this slope is per day, while downstream texts and
            # the "* 4" projection treat it as per week; confirm the units.
            week_trend = float(np.polyfit(range(len(last_week)), last_week, 1)[0])
        else:
            week_trend = 0.0

        total_spent = sum(costs)
        monthly_budget = self.budget_config.get("monthly_limit", 10000)
        if monthly_budget > 0:
            burn_rate = total_spent / monthly_budget
        else:
            # No positive budget: any spend is an unbounded overrun. Avoids
            # a ZeroDivisionError on a zero limit.
            burn_rate = float("inf") if total_spent > 0 else 0.0

        optimization_suggestions = await self._generate_optimization_suggestions(
            model_costs, total_spent, week_trend
        )
        return {
            "total_spent": total_spent,
            # Plain floats so the report carries no NumPy scalar types.
            "daily_average": float(np.mean(costs)) if costs else 0.0,
            "weekly_trend": week_trend,
            "budget_burn_rate": burn_rate,
            "model_breakdown": model_costs,
            "optimization_suggestions": optimization_suggestions,
            "projected_monthly_cost": total_spent + (week_trend * 4)
        }

    async def _generate_optimization_suggestions(
        self,
        model_costs: Dict[str, float],
        total_spent: float,
        trend: float
    ) -> List[Dict[str, Any]]:
        """Build suggestions from the per-model costs, total spend and trend.

        Each suggestion has ``type``, ``priority``, ``description``,
        ``action`` and ``potential_savings``.
        """
        suggestions = []

        # A single model taking more than 40% of the spend.
        if model_costs:
            top_model, top_cost = max(model_costs.items(), key=lambda item: item[1])
            if top_cost > total_spent * 0.4:
                suggestions.append({
                    "type": "model_substitution",
                    "priority": "high",
                    "description": f"{top_model}の使用頻度が高すぎます",
                    "action": "より安価な代替モデルの検討",
                    "potential_savings": top_cost * 0.3
                })

        # Rising spend.
        if trend > 0:
            suggestions.append({
                "type": "spending_trend",
                "priority": "medium",
                "description": f"支出が週次で${trend:.2f}増加中",
                "action": "使用量の監視強化",
                "potential_savings": trend * 4  # monthly projection
            })

        # More than 80% of the monthly budget used.
        monthly_budget = self.budget_config.get("monthly_limit", 10000)
        if total_spent > monthly_budget * 0.8:
            suggestions.append({
                "type": "budget_warning",
                "priority": "critical",
                "description": "月次予算の80%を超過",
                "action": "緊急コスト削減措置",
                "potential_savings": total_spent - monthly_budget * 0.7
            })
        return suggestions

    async def implement_cost_controls(
        self,
        control_config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Assemble the cost-control settings enabled in ``control_config``.

        This only builds and returns the configuration; nothing in this
        method applies it to a running system.

        Args:
            control_config: Feature flags ``enable_rate_limiting``,
                ``enable_auto_downgrade`` and ``enable_budget_alerts``, plus
                their optional thresholds.

        Returns:
            ``{"success": True, "implemented_controls": [...],
            "effective_date": <ISO timestamp>}``.
        """
        implemented_controls = []

        if control_config.get("enable_rate_limiting"):
            implemented_controls.append({
                "type": "rate_limiting",
                "config": {
                    "requests_per_hour": control_config.get("max_requests_per_hour", 1000),
                    "tokens_per_hour": control_config.get("max_tokens_per_hour", 100000),
                    "cost_per_hour": control_config.get("max_cost_per_hour", 100.0)
                }
            })

        if control_config.get("enable_auto_downgrade"):
            implemented_controls.append({
                "type": "auto_downgrade",
                "config": {
                    "cost_threshold": control_config.get("downgrade_cost_threshold", 0.1),
                    # Expensive model -> cheaper substitute.
                    "model_mapping": {
                        "gpt-5": "gpt-4o",
                        "claude-opus-4.1": "claude-sonnet-3.5"
                    }
                }
            })

        if control_config.get("enable_budget_alerts"):
            implemented_controls.append({
                "type": "budget_alerts",
                "config": {"thresholds": [0.5, 0.8, 0.9, 1.0]}  # 50/80/90/100%
            })

        return {
            "success": True,
            "implemented_controls": implemented_controls,
            "effective_date": datetime.now().isoformat()
        }
# 使用例
async def demonstrate_cost_optimization():
    """Usage example: select a model for one task, then print the spend report."""
    engine = CostOptimizationEngine(
        {
            "daily_limit": 500.0,
            "monthly_limit": 15000.0,
            "alert_thresholds": [0.5, 0.8, 0.9],
        }
    )

    # Model selection for a quality-focused code-generation task.
    sample_task = {
        "type": "code_generation",
        "complexity": 0.7,
        "quality_priority": 0.8,
        "speed_priority": 0.2,
        "budget_limit": 0.5,
    }
    selection = await engine.optimize_model_selection(sample_task)
    print(f"Optimization result: {selection}")

    # Spending report for the engine's recorded history.
    report = await engine.analyze_spending_patterns()
    print(f"Spending analysis: {report}")
# Script entry point: run the cost-optimization example on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(demonstrate_cost_optimization())
デプロイメント自動化¶
1. Kubernetes Helm Chart¶
# helm/ai-agents/Chart.yaml
# Chart metadata for the AI agent platform (Helm 3 chart, apiVersion v2).
# NOTE(review): values.yaml configures a `redis` section, but no
# `dependencies:` are declared here; confirm how Redis is meant to be deployed.
apiVersion: v2
name: ai-agents
description: Enterprise AI Agent Platform
type: application
# Chart version.
version: 1.0.0
# Version of the application the chart deploys.
appVersion: "1.0.0"
---
# helm/ai-agents/values.yaml
global:
  imageRegistry: "your-registry.com"
  # Image tag override. The CI/CD pipeline deploys with
  # `--set global.imageTag=<git sha>`; leave empty to use each component's
  # own `image.tag`.
  imageTag: ""
  imagePullSecrets:
    - regcred

agentManager:
  replicaCount: 3
  image:
    repository: ai-agent-manager
    # A mutable tag such as "latest" combined with pullPolicy IfNotPresent can
    # leave nodes running stale images; prefer an immutable tag.
    tag: "latest"
    pullPolicy: IfNotPresent
  service:
    type: ClusterIP
    port: 8080
  resources:
    limits:
      cpu: 1000m
      memory: 2Gi
    requests:
      cpu: 500m
      memory: 1Gi
  autoscaling:
    enabled: true
    minReplicas: 3
    maxReplicas: 10
    targetCPUUtilizationPercentage: 70

loadBalancer:
  replicaCount: 2
  image:
    repository: ai-load-balancer
    tag: "latest"
  service:
    type: LoadBalancer
    port: 80
    targetPort: 8080

agents:
  codeGenerator:
    enabled: true
    replicaCount: 2
    image:
      repository: ai-code-generator
      tag: "latest"
    resources:
      limits:
        cpu: 2000m
        memory: 4Gi
      requests:
        cpu: 1000m
        memory: 2Gi
  securityScanner:
    enabled: true
    replicaCount: 1
    image:
      repository: ai-security-scanner
      tag: "latest"
    resources:
      limits:
        cpu: 1000m
        memory: 2Gi
      requests:
        cpu: 500m
        memory: 1Gi

redis:
  enabled: true
  architecture: replication
  auth:
    enabled: true
    # Helm does not evaluate GitHub Actions expressions: the previous value
    # "${{ secrets.REDIS_PASSWORD }}" would have been used verbatim as the
    # password. Never commit a real password here. Supply it at deploy time
    # (`--set redis.auth.password=...`) or reference a pre-created Secret.
    password: ""
    # Name of an existing Secret that holds the password (preferred).
    existingSecret: ""
    existingSecretPasswordKey: "redis-password"

monitoring:
  prometheus:
    enabled: true
  grafana:
    enabled: true
  jaeger:
    enabled: true

security:
  networkPolicies:
    enabled: true
  # NOTE: the PodSecurityPolicy API was removed in Kubernetes v1.25. On newer
  # clusters use Pod Security Admission (namespace labels) instead.
  podSecurityPolicy:
    enabled: true

serviceAccount:
  create: true
  name: ai-agent-sa
---
# helm/ai-agents/templates/deployment.yaml
# Deployment of the agent manager component.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "ai-agents.fullname" . }}-manager
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
    component: manager
spec:
  {{- /* When the HPA is enabled it owns the replica count; setting it here
         would reset the scale on every `helm upgrade`. */}}
  {{- if not .Values.agentManager.autoscaling.enabled }}
  replicas: {{ .Values.agentManager.replicaCount }}
  {{- end }}
  selector:
    matchLabels:
      {{- include "ai-agents.selectorLabels" . | nindent 6 }}
      component: manager
  template:
    metadata:
      labels:
        {{- include "ai-agents.selectorLabels" . | nindent 8 }}
        component: manager
    spec:
      {{- /* Apply the registry credentials declared under global.imagePullSecrets. */}}
      {{- with .Values.global.imagePullSecrets }}
      imagePullSecrets:
      {{- range . }}
      - name: {{ . }}
      {{- end }}
      {{- end }}
      serviceAccountName: {{ include "ai-agents.serviceAccountName" . }}
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 2000
      containers:
      - name: manager
        {{- /* global.imageTag (set by the pipeline) wins; otherwise fall back
               to the component's own tag. */}}
        image: "{{ .Values.global.imageRegistry }}/{{ .Values.agentManager.image.repository }}:{{ .Values.global.imageTag | default .Values.agentManager.image.tag }}"
        imagePullPolicy: {{ .Values.agentManager.image.pullPolicy }}
        ports:
        - name: http
          containerPort: 8080
          protocol: TCP
        - name: metrics
          containerPort: 9090
          protocol: TCP
        livenessProbe:
          httpGet:
            path: /health
            port: http
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: http
          initialDelaySeconds: 5
          periodSeconds: 5
        env:
        - name: REDIS_URL
          value: "redis://{{ include "ai-agents.redis.fullname" . }}:6379"
        # The password is injected from a Secret, never from plain values.
        - name: REDIS_PASSWORD
          valueFrom:
            secretKeyRef:
              name: {{ include "ai-agents.redis.secretName" . }}
              key: redis-password
        - name: KUBERNETES_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        resources:
          {{- toYaml .Values.agentManager.resources | nindent 10 }}
        securityContext:
          allowPrivilegeEscalation: false
          capabilities:
            drop:
            - ALL
          # The root filesystem is read-only; mount a volume for any path the
          # application needs to write to.
          readOnlyRootFilesystem: true
---
# helm/ai-agents/templates/hpa.yaml
# HorizontalPodAutoscaler for the agent manager Deployment.
# Rendered only when agentManager.autoscaling.enabled is true.
{{- if .Values.agentManager.autoscaling.enabled }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: {{ include "ai-agents.fullname" . }}-manager-hpa
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
spec:
  # Scales the "<fullname>-manager" Deployment.
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: {{ include "ai-agents.fullname" . }}-manager
  minReplicas: {{ .Values.agentManager.autoscaling.minReplicas }}
  maxReplicas: {{ .Values.agentManager.autoscaling.maxReplicas }}
  metrics:
  # CPU utilization target, taken from values.
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: {{ .Values.agentManager.autoscaling.targetCPUUtilizationPercentage }}
  # Memory utilization target, fixed at 80% (not configurable through values).
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  # Per-pod queue length, target average of 10.
  # NOTE(review): Pods-type metrics come from the custom metrics API;
  # presumably a metrics adapter exposes ai_agent_queue_length. Confirm.
  - type: Pods
    pods:
      metric:
        name: ai_agent_queue_length
      target:
        type: AverageValue
        averageValue: "10"
{{- end }}
---
# helm/ai-agents/templates/networkpolicy.yaml
# NetworkPolicy restricting traffic to and from the chart's pods.
# Rendered only when security.networkPolicies.enabled is true.
{{- if .Values.security.networkPolicies.enabled }}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: {{ include "ai-agents.fullname" . }}-network-policy
  labels:
    {{- include "ai-agents.labels" . | nindent 4 }}
spec:
  podSelector:
    matchLabels:
      {{- include "ai-agents.selectorLabels" . | nindent 6 }}
  policyTypes:
  - Ingress
  - Egress
  ingress:
  # Allow the chart's own pods and the monitoring namespace to reach the
  # application (8080) and metrics (9090) ports.
  - from:
    - podSelector:
        matchLabels:
          {{- include "ai-agents.selectorLabels" . | nindent 10 }}
    - namespaceSelector:
        matchLabels:
          # `kubernetes.io/metadata.name` is set automatically on every
          # namespace (Kubernetes >= 1.22). The previous `name: monitoring`
          # label only matches if someone labelled the namespace by hand.
          kubernetes.io/metadata.name: monitoring
    ports:
    - protocol: TCP
      port: 8080
    - protocol: TCP
      port: 9090
  egress:
  # Redis pods in the same namespace.
  - to:
    - podSelector:
        matchLabels:
          app.kubernetes.io/name: redis
    ports:
    - protocol: TCP
      port: 6379
  # Any destination, but only on the ports below.
  - to: []
    ports:
    - protocol: TCP
      port: 443  # HTTPS for external API calls
    - protocol: TCP
      port: 53   # DNS
    - protocol: UDP
      port: 53   # DNS
{{- end }}
2. CI/CD パイプライン¶
# .github/workflows/ai-agents-cicd.yml
name: AI Agents CI/CD Pipeline

on:
  push:
    branches: [main, develop]
    paths:
      - 'ai-agents/**'
      - 'helm/ai-agents/**'
      - '.github/workflows/ai-agents-cicd.yml'
  pull_request:
    branches: [main]
    paths:
      - 'ai-agents/**'
      - 'helm/ai-agents/**'

# Least privilege by default; jobs that need more declare it themselves.
permissions:
  contents: read

env:
  REGISTRY: ghcr.io
  IMAGE_PREFIX: ${{ github.repository }}/ai-agents

jobs:
  security-scan:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      security-events: write  # required to upload SARIF results
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Run Trivy vulnerability scanner
        # Pinned release instead of the moving `master` branch.
        uses: aquasecurity/trivy-action@0.28.0
        with:
          scan-type: 'fs'
          scan-ref: './ai-agents'
          format: 'sarif'
          output: 'trivy-results.sarif'
      - name: Upload Trivy scan results
        # v2 of the CodeQL actions is deprecated.
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: 'trivy-results.sarif'

  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.11', '3.12']
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          cd ai-agents
          pip install -r requirements.txt
          pip install pytest pytest-cov pytest-asyncio
      - name: Run unit tests
        run: |
          cd ai-agents
          pytest tests/ --cov=src --cov-report=xml --cov-report=html
      - name: Upload coverage reports
        uses: codecov/codecov-action@v3
        with:
          file: ./ai-agents/coverage.xml
          flags: unittests
          name: codecov-umbrella

  build:
    needs: [security-scan, test]
    runs-on: ubuntu-latest
    permissions:
      contents: read
      packages: write  # push to ghcr.io
      id-token: write  # keyless cosign signing (OIDC)
    strategy:
      matrix:
        component: [manager, load-balancer, code-generator, security-scanner]
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Install cosign
        # cosign is not preinstalled on the runner; the signing step needs it.
        if: github.event_name != 'pull_request'
        uses: sigstore/cosign-installer@v3
      - name: Log in to Container Registry
        if: github.event_name != 'pull_request'
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_PREFIX }}/${{ matrix.component }}
          # The raw full-SHA tag is the one the deploy jobs pass as
          # global.imageTag; without it that tag would not exist.
          tags: |
            type=ref,event=branch
            type=ref,event=pr
            type=sha,prefix=${{ github.ref_name }}-,enable=${{ github.event_name != 'pull_request' }}
            type=raw,value=${{ github.sha }},enable=${{ github.event_name != 'pull_request' }}
            type=raw,value=latest,enable=${{ github.ref == 'refs/heads/main' }}
      - name: Build and push Docker image
        id: build
        uses: docker/build-push-action@v5
        with:
          context: ./ai-agents/${{ matrix.component }}
          platforms: linux/amd64,linux/arm64
          # Pull requests only build; they never publish images.
          push: ${{ github.event_name != 'pull_request' }}
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          cache-from: type=gha
          cache-to: type=gha,mode=max
      - name: Sign container image
        if: github.event_name != 'pull_request'
        env:
          TAGS: ${{ steps.meta.outputs.tags }}
          DIGEST: ${{ steps.build.outputs.digest }}
        # Sign by digest so the signature covers exactly what was pushed.
        run: |
          echo "${TAGS}" | xargs -I {} cosign sign --yes {}@${DIGEST}

  helm-test:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Helm
        uses: azure/setup-helm@v3
        with:
          version: 'v3.12.0'
      - name: Lint Helm chart
        run: |
          helm lint helm/ai-agents
      - name: Template Helm chart
        run: |
          helm template ai-agents helm/ai-agents \
            --values helm/ai-agents/values.yaml \
            --output-dir /tmp/helm-output
      - name: Validate Kubernetes manifests
        # NOTE(review): `kubectl apply --dry-run=client` may still need to
        # reach an API server for resource discovery; confirm that a cluster
        # is available to this job.
        run: |
          kubectl --dry-run=client apply -f /tmp/helm-output/ai-agents/templates/

  deploy-staging:
    if: github.ref == 'refs/heads/develop'
    needs: [build, helm-test]
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: us-west-2
      - name: Update kubeconfig
        run: |
          aws eks update-kubeconfig --name ai-agents-staging
      - name: Deploy to staging
        run: |
          helm upgrade --install ai-agents-staging helm/ai-agents \
            --namespace ai-agents-staging \
            --create-namespace \
            --values helm/ai-agents/values-staging.yaml \
            --set global.imageTag=${{ github.sha }} \
            --wait --timeout=600s
      - name: Run integration tests
        run: |
          kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=ai-agents \
            -n ai-agents-staging --timeout=300s
          ./scripts/integration-tests.sh staging

  deploy-production:
    if: github.ref == 'refs/heads/main'
    needs: [build, helm-test]
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID_PROD }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY_PROD }}
          aws-region: us-west-2
      - name: Update kubeconfig
        run: |
          aws eks update-kubeconfig --name ai-agents-production
      - name: Blue-Green Deployment
        run: |
          # Record the currently deployed version. The lookup is tolerant so
          # the very first deployment (no release yet) does not fail the step.
          CURRENT_VERSION=$(helm get values ai-agents-prod -n ai-agents-prod -o json 2>/dev/null \
            | jq -r '.global.imageTag // "none"' || echo "none")
          echo "Currently deployed version: ${CURRENT_VERSION}"
          # Deploy the new version to the blue environment.
          # The chart's Service type lives under loadBalancer.service.type;
          # there is no top-level service.type value.
          helm upgrade --install ai-agents-blue helm/ai-agents \
            --namespace ai-agents-blue \
            --create-namespace \
            --values helm/ai-agents/values-production.yaml \
            --set global.imageTag=${{ github.sha }} \
            --set loadBalancer.service.type=ClusterIP \
            --wait --timeout=600s
      - name: Health check and smoke tests
        run: |
          ./scripts/health-check.sh blue
          ./scripts/smoke-tests.sh blue
      - name: Switch traffic to blue
        run: |
          # Update the Istio VirtualService to switch traffic.
          kubectl patch virtualservice ai-agents-vs -n ai-agents-prod \
            --type='json' \
            -p='[{"op": "replace", "path": "/spec/http/0/route/0/destination/subset", "value": "blue"}]'
      - name: Monitor deployment
        run: |
          ./scripts/monitor-deployment.sh 300  # monitor for 5 minutes
      - name: Roll back traffic on failure
        # NOTE(review): assumes the previous version is served by the "green"
        # subset, matching the cleanup step below; confirm.
        if: failure()
        run: |
          kubectl patch virtualservice ai-agents-vs -n ai-agents-prod \
            --type='json' \
            -p='[{"op": "replace", "path": "/spec/http/0/route/0/destination/subset", "value": "green"}]'
      - name: Cleanup old version
        if: success()
        run: |
          helm uninstall ai-agents-green -n ai-agents-green || true

  notify:
    # Report on the whole pipeline, including build/test failures that cause
    # the deploy jobs to be skipped. Pull requests do not notify.
    if: always() && github.event_name == 'push'
    needs: [build, helm-test, deploy-staging, deploy-production]
    runs-on: ubuntu-latest
    steps:
      - name: Notify Slack
        uses: 8398a7/action-slack@v3
        with:
          # job.status would describe this notify job itself, not the
          # pipeline; derive the status from the upstream job results.
          status: ${{ (contains(needs.*.result, 'failure') && 'failure') || (contains(needs.*.result, 'cancelled') && 'cancelled') || 'success' }}
          channel: '#ai-agents-deployments'
        env:
          # The action reads the webhook from this environment variable.
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
運用手順書とベストプラクティス¶
1. 運用チェックリスト¶
# AIエージェント運用チェックリスト
## 日次運用チェック
### システムヘルス確認
- [ ] 全エージェントの稼働状況確認
- [ ] レスポンス時間監視(目標: 95%ile < 5秒)
- [ ] エラー率監視(目標: < 1%)
- [ ] リソース使用率確認(CPU < 80%, Memory < 85%)
### コスト監視
- [ ] 日次コスト確認
- [ ] 予算消化率チェック
- [ ] 異常な使用量の調査
### セキュリティ確認
- [ ] 不審なアクセスログの確認
- [ ] 認証失敗回数の監視
- [ ] シークレットローテーション状況
## 週次運用チェック
### パフォーマンス分析
- [ ] 週次パフォーマンスレポート確認
- [ ] スケーリング履歴の分析
- [ ] ボトルネック特定と改善計画
### 容量計画
- [ ] リソース使用傾向の分析
- [ ] 来月の容量予測
- [ ] スケーリング閾値の調整
### 更新管理
- [ ] セキュリティパッチの適用
- [ ] 依存関係の更新確認
- [ ] 設定変更の反映
## 月次運用チェック
### 包括的レビュー
- [ ] 月次運用レポート作成
- [ ] SLA達成状況確認
- [ ] インシデント分析とアクション項目
- [ ] 運用プロセス改善点の特定
### 災害復旧テスト
- [ ] バックアップ復旧テスト
- [ ] DR環境での動作確認
- [ ] 復旧手順書の更新
2. トラブルシューティングガイド¶
# troubleshooting/diagnostic_tools.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any
import aiohttp
import subprocess
class DiagnosticTools:
    """Diagnostic tool for the AI agent platform.

    Runs connectivity, performance, resource, security and cost checks and
    folds them into an overall health verdict with recommendations.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration and register the available checks.

        Args:
            config: Reads ``agent_manager_url``, ``redis_url``,
                ``prometheus_url``, and optionally ``daily_budget`` (default
                500.0) and ``namespace`` (default ``"ai-agents"``).
        """
        self.config = config
        self.checks = {
            "connectivity": self._check_connectivity,
            "performance": self._check_performance,
            "resources": self._check_resources,
            "security": self._check_security,
            "costs": self._check_costs
        }

    async def run_full_diagnostic(self) -> Dict[str, Any]:
        """Run every registered check and return the combined report.

        A check that raises is recorded with status ``"error"`` and does not
        stop the remaining checks.
        """
        print("Starting comprehensive diagnostic...")
        results = {}
        for check_name, check_func in self.checks.items():
            try:
                print(f"Running {check_name} check...")
                results[check_name] = await check_func()
                print(f"✅ {check_name} check completed")
            except Exception as e:
                print(f"❌ {check_name} check failed: {e}")
                # Carry an explicit status so a crashed check cannot be
                # overlooked when the overall health is computed.
                results[check_name] = {"status": "error", "error": str(e)}

        overall_health = self._calculate_overall_health(results)
        return {
            "timestamp": datetime.now().isoformat(),
            "overall_health": overall_health,
            "detailed_results": results,
            "recommendations": self._generate_recommendations(results)
        }

    def _namespace(self) -> str:
        """Return the Kubernetes namespace to inspect."""
        return self.config.get("namespace", "ai-agents")

    def _run_kubectl(self, *args: str) -> subprocess.CompletedProcess:
        """Run ``kubectl`` with ``args`` and return the completed process.

        Raises:
            subprocess.CalledProcessError: kubectl exited with a non-zero code.
            subprocess.TimeoutExpired: kubectl did not finish within 30 seconds.
            FileNotFoundError: kubectl is not installed.
        """
        return subprocess.run(
            ["kubectl", *args],
            capture_output=True,
            text=True,
            check=True,
            timeout=30
        )

    async def _probe_tcp(self, url: str, default_port: int, timeout: float = 10.0) -> None:
        """Open and close a TCP connection to the host/port named by ``url``.

        Raises whatever the connection attempt raises (including a timeout).
        """
        from urllib.parse import urlparse

        parsed = urlparse(url)
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(parsed.hostname, parsed.port or default_port),
            timeout
        )
        writer.close()
        await writer.wait_closed()

    async def _check_connectivity(self) -> Dict[str, Any]:
        """Check that the agent manager, Redis and Prometheus are reachable.

        HTTP endpoints are healthy on a 200 response. Redis does not speak
        HTTP, so it is probed with a plain TCP connect; an HTTP GET against a
        ``redis://`` URL can never succeed.
        """
        endpoints = [
            {"name": "Agent Manager", "url": f"{self.config['agent_manager_url']}/health"},
            {"name": "Redis", "url": self.config["redis_url"]},
            {"name": "Prometheus", "url": f"{self.config['prometheus_url']}/api/v1/targets"}
        ]
        results = []
        # One session for all probes, each limited to 10 seconds in total.
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10)
        ) as session:
            for endpoint in endpoints:
                url = endpoint["url"]
                start_time = datetime.now()
                try:
                    if url.startswith(("http://", "https://")):
                        async with session.get(url) as response:
                            response_time = (datetime.now() - start_time).total_seconds()
                            status_code = response.status
                        healthy = status_code == 200
                    else:
                        await self._probe_tcp(url, default_port=6379)
                        response_time = (datetime.now() - start_time).total_seconds()
                        status_code = None  # not an HTTP endpoint
                        healthy = True
                    results.append({
                        "endpoint": endpoint["name"],
                        "status": "healthy" if healthy else "unhealthy",
                        "response_time": response_time,
                        "status_code": status_code
                    })
                except Exception as e:
                    results.append({
                        "endpoint": endpoint["name"],
                        "status": "error",
                        "error": str(e)
                    })
        return {
            "status": "healthy" if all(r["status"] == "healthy" for r in results) else "degraded",
            "endpoints": results
        }

    async def _check_performance(self) -> Dict[str, Any]:
        """Query Prometheus for key metrics and grade them against thresholds.

        A metric with no data is reported as ``None``; a failed query is
        reported under ``"<metric>_error"``. Neither counts as a threshold
        breach.
        """
        metrics_queries = {
            "avg_response_time": "avg(ai_agent_response_time_seconds)",
            # NOTE(review): this is an absolute errors-per-second rate, but
            # the 0.05 threshold below reads like a ratio; confirm.
            "error_rate": "rate(ai_agent_requests_total{status='error'}[5m])",
            "throughput": "rate(ai_agent_requests_total[5m])",
            "active_agents": "count(up{job='ai-agents'})"
        }
        results = {}
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10)
        ) as session:
            for metric_name, query in metrics_queries.items():
                try:
                    async with session.get(
                        f"{self.config['prometheus_url']}/api/v1/query",
                        params={"query": query}
                    ) as response:
                        data = await response.json()
                    series = data["data"]["result"]
                    # An instant-query sample is [timestamp, "value"].
                    results[metric_name] = float(series[0]["value"][1]) if series else None
                except Exception as e:
                    results[f"{metric_name}_error"] = str(e)

        # Compare only real numbers: a metric without data is stored as None,
        # and ``None > 10`` would raise TypeError.
        avg_response_time = results.get("avg_response_time")
        error_rate = results.get("error_rate")
        performance_status = "healthy"
        if avg_response_time is not None and avg_response_time > 10:
            performance_status = "degraded"
        if error_rate is not None and error_rate > 0.05:
            performance_status = "unhealthy"
        return {
            "status": performance_status,
            "metrics": results,
            "thresholds": {
                "max_response_time": 10.0,
                "max_error_rate": 0.05
            }
        }

    async def _check_resources(self) -> Dict[str, Any]:
        """Collect pod and node resource usage via ``kubectl top``."""
        try:
            pods_output = self._run_kubectl(
                "top", "pods", "-n", self._namespace(), "--no-headers"
            ).stdout
            pod_resources = []
            for line in pods_output.strip().split('\n'):
                parts = line.split()
                if len(parts) < 3:
                    continue  # blank or unexpected line
                pod_resources.append({
                    "pod": parts[0],
                    "cpu": parts[1],
                    "memory": parts[2]
                })

            nodes_output = self._run_kubectl("top", "nodes", "--no-headers").stdout
            node_resources = []
            for line in nodes_output.strip().split('\n'):
                parts = line.split()
                if len(parts) < 5:
                    continue  # blank or unexpected line
                node_resources.append({
                    "node": parts[0],
                    "cpu": parts[1],
                    "cpu_percent": parts[2],
                    "memory": parts[3],
                    "memory_percent": parts[4]
                })
            return {
                "status": "healthy",
                "pods": pod_resources,
                "nodes": node_resources
            }
        except (subprocess.SubprocessError, OSError) as e:
            # Covers a non-zero exit, a timeout and a missing kubectl binary.
            return {
                "status": "error",
                "error": f"Failed to get resource information: {e}"
            }

    async def _check_security(self) -> Dict[str, Any]:
        """Check the age of secrets and the presence of network policies.

        Secrets older than 90 days are a warning and older than 180 days are
        critical. A sub-check that cannot run is recorded without a status
        and therefore does not affect the overall verdict.
        """
        security_checks = []

        # Secret age.
        try:
            result = self._run_kubectl(
                "get", "secrets", "-n", self._namespace(), "-o", "json"
            )
            secrets_data = json.loads(result.stdout)
            for secret in secrets_data["items"]:
                creation_time = datetime.fromisoformat(
                    secret["metadata"]["creationTimestamp"].replace('Z', '+00:00')
                )
                age_days = (datetime.now(creation_time.tzinfo) - creation_time).days
                status = "healthy"
                if age_days > 90:
                    status = "warning"
                if age_days > 180:
                    status = "critical"
                security_checks.append({
                    "type": "secret_age",
                    "name": secret["metadata"]["name"],
                    "age_days": age_days,
                    "status": status
                })
        except Exception as e:
            security_checks.append({
                "type": "secret_check_error",
                "error": str(e)
            })

        # Network policies.
        try:
            result = self._run_kubectl(
                "get", "networkpolicies", "-n", self._namespace()
            )
            # kubectl prints "No resources found" on stderr and leaves stdout
            # empty, so testing stdout for that text never matched.
            if not result.stdout.strip() or "No resources found" in result.stderr:
                security_checks.append({
                    "type": "network_policy",
                    "status": "warning",
                    "message": "No network policies found"
                })
            else:
                security_checks.append({
                    "type": "network_policy",
                    "status": "healthy",
                    "message": "Network policies configured"
                })
        except Exception as e:
            security_checks.append({
                "type": "network_policy_error",
                "error": str(e)
            })

        overall_status = "healthy"
        if any(check.get("status") == "warning" for check in security_checks):
            overall_status = "warning"
        if any(check.get("status") == "critical" for check in security_checks):
            overall_status = "critical"
        return {
            "status": overall_status,
            "checks": security_checks
        }

    async def _check_costs(self) -> Dict[str, Any]:
        """Compare today's spend with the configured daily budget (simplified)."""
        daily_budget = self.config.get("daily_budget", 500.0)
        current_spend = await self._get_current_daily_spend()
        if daily_budget > 0:
            budget_utilization = current_spend / daily_budget
        else:
            # No positive budget: any spend is an unbounded overrun. Avoids
            # a ZeroDivisionError on a zero budget.
            budget_utilization = float("inf") if current_spend > 0 else 0.0
        status = "healthy"
        if budget_utilization > 0.8:
            status = "warning"
        if budget_utilization > 1.0:
            status = "critical"
        return {
            "status": status,
            "daily_budget": daily_budget,
            "current_spend": current_spend,
            "budget_utilization": budget_utilization,
            "projected_monthly": current_spend * 30
        }

    async def _get_current_daily_spend(self) -> float:
        """Return the current daily spend.

        Placeholder: a real implementation would read a billing API or metrics.
        """
        return 423.50  # dummy value

    def _calculate_overall_health(self, results: Dict[str, Any]) -> str:
        """Reduce the per-check statuses to one verdict.

        Severity order: ``critical`` (any error or critical check), then
        ``unhealthy``, then ``warning`` (any warning or degraded check), then
        ``healthy``.
        """
        statuses = []
        for check_result in results.values():
            if not isinstance(check_result, dict):
                continue
            if "status" in check_result:
                statuses.append(check_result["status"])
            elif "error" in check_result:
                # A failure recorded without an explicit status is still a failure.
                statuses.append("error")
        if "error" in statuses or "critical" in statuses:
            return "critical"
        elif "unhealthy" in statuses:
            return "unhealthy"
        elif "warning" in statuses or "degraded" in statuses:
            return "warning"
        else:
            return "healthy"

    def _generate_recommendations(self, results: Dict[str, Any]) -> List[str]:
        """Turn the failing check results into recommended actions."""
        recommendations = []

        # Performance: both degraded and the more severe unhealthy state
        # call for action.
        perf_result = results.get("performance", {})
        if perf_result.get("status") in ["degraded", "unhealthy"]:
            recommendations.append("パフォーマンス改善のためのスケールアップを検討してください")

        # Security.
        security_result = results.get("security", {})
        if security_result.get("status") in ["warning", "critical"]:
            recommendations.append("シークレットのローテーションとセキュリティ設定の確認を行ってください")

        # Costs.
        cost_result = results.get("costs", {})
        if cost_result.get("status") in ["warning", "critical"]:
            recommendations.append("コスト最適化のためのモデル選択やレート制限の見直しを行ってください")
        return recommendations
# 使用例とコマンドラインツール
async def main():
    """Run the full diagnostic against the in-cluster services and print the report as JSON."""
    config = {
        "agent_manager_url": "http://agent-manager.ai-agents.svc.cluster.local:8080",
        "redis_url": "redis://redis.ai-agents.svc.cluster.local:6379",
        "prometheus_url": "http://prometheus.monitoring.svc.cluster.local:9090",
        "daily_budget": 500.0
    }
    diagnostic = DiagnosticTools(config)
    results = await diagnostic.run_full_diagnostic()
    # The recommendations are Japanese text; without ensure_ascii=False they
    # would be printed as unreadable \uXXXX escapes.
    print(json.dumps(results, indent=2, ensure_ascii=False))
# Script entry point: run the diagnostic tool on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
まとめ¶
本記事では、2025年8月時点で最新のAIエージェント開発ツールを本番環境で運用するために必要な、包括的なアーキテクチャとベストプラクティスを詳しく解説しました。
重要な実装ポイント¶
- エンタープライズアーキテクチャ: スケーラブルで信頼性の高いシステム設計
- ゼロトラストセキュリティ: 多層防御による強固なセキュリティ確保
- 包括的監視: Observabilityによる運用状況の完全な可視化
- コスト最適化: 自動化されたコスト管理と予算制御
段階的導入計画¶
- Phase 1: 基本アーキテクチャの構築とセキュリティ設定
- Phase 2: 監視システムの導入と自動化の実装
- Phase 3: 本格運用とコスト最適化の継続改善
運用成功の鍵¶
- 継続的監視: システム状態の常時監視と迅速な対応
- 自動化重視: 人的ミスを排除する包括的な自動化
- セキュリティファースト: セキュリティを最優先とした設計
- コスト意識: 常にコスト効率を意識した運用
AIエージェントの本番運用は、適切なアーキテクチャと運用プロセスにより、確実に企業価値を向上させることができます。