Claude Code Subagent & MCP Implementation Practical Guide


title: "Claude Code Subagent & MCP Implementation Practical Guide"
description: "An implementation guide for leveraging Claude Code's subagent functionality and MCP integration in real projects. Complete tutorial with copy-paste code examples and troubleshooting."
tags: ["Claude Code", "MCP", "Subagent", "AI Development", "Implementation Guide"]
category: "🤖 AI Development & Automation"


Key Points

  • Specialized Agent Utilization

    Improve development efficiency with subagents specialized for specific tasks

  • External System Integration

    Automate API, database, and tool integration via MCP protocol

  • Implementation Templates

    Practical copy-paste code examples and best practices

  • Production Ready

    From troubleshooting to production environment deployment

📖 Subagent & MCP Overview

What Are Subagents?

Subagents are specialized AI assistants running in their own context window with custom system prompts, specific tool access, and independent permissions. They are invoked from the main agent through the Task tool.

Built-in Subagent Types

Claude Code provides six built-in subagent types.

graph TB
    A[Main Agent] --> B[Task Tool]
    B --> C[Explore]
    B --> D[Plan]
    B --> E[general-purpose]
    B --> F[Bash]
    B --> G[statusline-setup]
    B --> H[Claude Code Guide]
    C --> C1[Fast File Discovery & Code Search]
    D --> D1[Research & Planning]
    E --> E1[Complex Multi-step Tasks]
    F --> F1[Terminal Command Execution]
    G --> G1[Status Line Configuration]
    H --> H1[Claude Code Usage Q&A]

| Subagent | Model | Access | Purpose |
|---|---|---|---|
| Explore | Haiku (fast) | Read-only (Read, Glob, Grep, etc.) | File discovery and code search. Supports a thoroughness parameter: quick / medium / very thorough |
| Plan | Inherited (parent model) | Read-only tools | Research agent for plan mode. Design and implementation planning |
| general-purpose | Inherited (parent model) | Full tool access | Complex multi-step task execution. All operations including file editing |
| Bash | Inherited (parent model) | Terminal operations | Command execution in a separate context |
| statusline-setup | Sonnet | Limited | For /statusline command configuration |
| Claude Code Guide | Haiku (fast) | Read-only | Answering questions about Claude Code usage |

Custom Subagent Definition

You can define project-specific custom subagents by creating Markdown files in the .claude/agents/ directory.

---
name: code-reviewer
description: Reviews code for security and best practices
tools:
  - Read
  - Grep
  - Glob
model: sonnet
permissionMode: plan
maxTurns: 10
---

# Code Reviewer Agent

Detect security vulnerabilities, performance issues, and best practice violations.
Report review results in the following format:
- Severity (Critical / Warning / Info)
- Affected file and line number
- Problem description and suggested fix

Custom Subagent Frontmatter Fields

| Field | Description | Example Values |
|---|---|---|
| name | Agent identifier | code-reviewer |
| description | Agent description | Code review specialist agent |
| tools | List of available tools | [Read, Grep, Glob, Bash] |
| disallowedTools | Prohibited tools | [Write, Edit] |
| model | Model to use | sonnet / opus / haiku / inherit |
| permissionMode | Permission mode | default / acceptEdits / bypassPermissions / plan |
| maxTurns | Maximum number of turns | 10 |
| skills | Available skills | List of skill names |
| mcpServers | Available MCP servers | List of server names |
| hooks | Agent-specific hooks | Hook definitions |
| memory | Additional context information | Memory settings |
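A quick way to catch typos in an agent file is to compare its front matter keys with the fields listed above. The following is a minimal sketch, assuming a simple `key: value` front matter layout; `KNOWN_FIELDS` mirrors this guide's field list, `REQUIRED_FIELDS` is an assumption, and `check_frontmatter` is a hypothetical helper rather than part of Claude Code.

```python
KNOWN_FIELDS = {
    "name", "description", "tools", "disallowedTools", "model",
    "permissionMode", "maxTurns", "skills", "mcpServers", "hooks", "memory",
}
REQUIRED_FIELDS = {"name", "description"}  # assumption: these two identify the agent


def frontmatter_keys(text: str) -> list[str]:
    """Return the top-level keys found between the first pair of '---' lines."""
    lines = text.splitlines()
    if not lines or lines[0].strip() != "---":
        return []
    keys = []
    for line in lines[1:]:
        if line.strip() == "---":
            break
        # Top-level keys start in column 0; indented lines are list items or nested values
        if line and not line[0].isspace() and ":" in line:
            keys.append(line.split(":", 1)[0].strip())
    return keys


def check_frontmatter(text: str) -> dict:
    """Report keys that are not in KNOWN_FIELDS and required keys that are absent."""
    keys = set(frontmatter_keys(text))
    return {
        "unknown": sorted(keys - KNOWN_FIELDS),
        "missing": sorted(REQUIRED_FIELDS - keys),
    }


example = """---
name: code-reviewer
description: Reviews code for security and best practices
tools:
  - Read
  - Grep
permisionMode: plan
---
# Code Reviewer Agent
"""

print(check_frontmatter(example))
```

The misspelled `permisionMode` key is reported under `unknown`, which is the kind of mistake that otherwise fails silently.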

Subagent Constraints

Important Constraints

  • Subagents cannot spawn other subagents (no nesting)
  • Background subagents automatically deny permission requests that are not pre-approved
  • MCP tools are not available in background subagents
  • Disabling subagents: Add Task(agent-name) to permissions.deny to disable specific subagents
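The last constraint can be scripted. Below is a minimal sketch that adds a `Task(agent-name)` rule to the `permissions.deny` list of a settings dictionary; `disable_subagent` is a hypothetical helper, and reading or writing the actual settings file is left out.

```python
import json


def disable_subagent(settings: dict, agent_name: str) -> dict:
    """Return a copy of settings with Task(agent_name) added to permissions.deny."""
    updated = json.loads(json.dumps(settings))  # cheap deep copy for JSON-like data
    deny = updated.setdefault("permissions", {}).setdefault("deny", [])
    rule = f"Task({agent_name})"
    if rule not in deny:  # keep the list free of duplicates
        deny.append(rule)
    return updated


settings = {"permissions": {"deny": ["Task(Explore)"]}}
print(json.dumps(disable_subagent(settings, "code-reviewer"), indent=2))
```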

The Role of MCP (Model Context Protocol)

MCP is a standardized communication protocol between Claude Code and external tools.

graph LR
    A[Claude Code] --> B[MCP Protocol]
    B --> C[Database MCP]
    B --> D[API MCP]
    B --> E[File System MCP]
    C --> F[PostgreSQL/MySQL]
    D --> G[REST/GraphQL API]
    E --> H[Local/Remote Files]

Transport Types

MCP provides three transport types.

| Type | Use Case | Notes |
|---|---|---|
| HTTP | Remote server connections (recommended) | Recommended standard for connecting to remote MCP servers |
| SSE (Server-Sent Events) | Remote server connections | Deprecated. Migration to HTTP is recommended |
| stdio | Local process communication | For locally running MCP servers. Communicates via standard I/O |

MCP Scopes

MCP server configurations can be managed at three scopes.

| Scope | Configuration File | Applies To |
|---|---|---|
| local (default) | ~/.claude.json (stored under the project's path) | Current project only, private to you |
| project | .mcp.json (project root) | Shared across the project (checked into version control) |
| user | ~/.claude.json (global) | Available across all projects for this user |

When the same server name is defined at more than one scope, the local definition overrides the user-level default, while the project-level .mcp.json provides a shared configuration for all contributors to the project.
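The override behavior described above can be pictured as a dictionary merge. This is only a sketch under two assumptions that are not confirmed here: that precedence runs local, then project, then user, and that a narrower scope replaces a whole server definition rather than merging individual keys. `resolve_mcp_servers` and the `--debug` argument are hypothetical.

```python
def resolve_mcp_servers(local: dict, project: dict, user: dict) -> dict:
    """Merge server definitions; on a name clash the narrower scope wins."""
    merged = {}
    # Apply from lowest to highest precedence so later updates override earlier ones
    for scope in (user, project, local):
        merged.update(scope)
    return merged


user = {"database": {"command": "python", "args": ["global_db_server.py"]}}
project = {
    "database": {"command": "python", "args": ["database_mcp_server.py"]},
    "api": {"command": "python", "args": ["api_mcp_server.py"]},
}
local = {"api": {"command": "python", "args": ["api_mcp_server.py", "--debug"]}}

effective = resolve_mcp_servers(local, project, user)
for name, config in sorted(effective.items()):
    print(name, config["args"])
```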

Managed MCP (Enterprise)

In enterprise environments, you can deploy managed-mcp.json to system directories to manage organization-wide MCP servers. Use allowedMcpServers / deniedMcpServers policies to control which servers are available.

Output Limits

  • A warning is displayed when MCP tool output exceeds 10,000 tokens
  • Default maximum output is 25,000 tokens (configurable via the MAX_MCP_OUTPUT_TOKENS environment variable)

When MCP tool definitions would take up more than 10% of the context window, the tool search feature is enabled automatically. To enable it manually, set the ENABLE_TOOL_SEARCH environment variable.
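An MCP server can check its own output against these thresholds before returning it. The sketch below uses the 10,000 and 25,000 token figures stated above and reads MAX_MCP_OUTPUT_TOKENS from the environment. The four-characters-per-token estimate is a rough assumption, not how Claude Code counts tokens, and `classify_output` is a hypothetical helper.

```python
import os

WARN_TOKENS = 10_000          # warning threshold stated above
DEFAULT_MAX_TOKENS = 25_000   # default cap stated above


def estimate_tokens(text: str) -> int:
    """Very rough estimate: about 4 characters per token (an assumption)."""
    return (len(text) + 3) // 4


def classify_output(text: str, env=None) -> str:
    """Return 'ok', 'warning', or 'over_limit' for a tool output string."""
    env = os.environ if env is None else env
    max_tokens = int(env.get("MAX_MCP_OUTPUT_TOKENS", DEFAULT_MAX_TOKENS))
    tokens = estimate_tokens(text)
    if tokens > max_tokens:
        return "over_limit"
    if tokens > WARN_TOKENS:
        return "warning"
    return "ok"


print(classify_output("x" * 60_000, env={}))
```

A server that gets `over_limit` could truncate or paginate its result instead of returning everything at once.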

🚀 Beginner: Basic Setup

1. Basic Subagent Usage

Let's start with the simplest subagent invocation.

# basic_subagent_example.py
"""
Basic subagent usage example
Shows the input Claude Code passes to the Task tool when it delegates to a subagent
"""
import json

# The main agent issues this tool call itself during a session; it is not a
# Python API you call directly. The dictionary below mirrors the tool input.
task_input = {
    "description": "Execute code search",
    "prompt": "Search all Python files in the project for logging configuration and propose a unified logging strategy",
    "subagent_type": "general-purpose"
}

print(f"Task tool input: {json.dumps(task_input, indent=2)}")

2. Basic MCP Configuration

First, let's create a simple MCP server. Configuration is written in .mcp.json (project scope) or ~/.claude.json (user scope).

.mcp.json supports environment variable expansion using ${VAR} and ${VAR:-default} syntax.

{
  "mcpServers": {
    "simple-file-server": {
      "command": "python",
      "args": ["${PROJECT_ROOT:-/path/to}/simple_mcp_server.py"],
      "env": {
        "WORKSPACE_PATH": "${WORKSPACE_PATH:-.}"
      }
    }
  }
}

# simple_mcp_server.py
"""
Simple file operation MCP server
Basic implementation example for beginners
"""
import json
import sys
import os
from typing import Dict, Any, List

class SimpleMCPServer:
    def __init__(self):
        self.workspace_path = os.getenv('WORKSPACE_PATH', '.')

    def list_files(self, directory: str = ".") -> List[str]:
        """Get file list in specified directory"""
        try:
            full_path = os.path.join(self.workspace_path, directory)
            return os.listdir(full_path)
        except Exception as e:
            return [f"Error: {str(e)}"]

    def read_file(self, file_path: str) -> str:
        """Read file contents"""
        try:
            full_path = os.path.join(self.workspace_path, file_path)
            with open(full_path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            return f"Error: {str(e)}"

    def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle MCP request"""
        method = request.get('method')
        params = request.get('params', {})

        if method == "tools/list":
            return {
                "tools": [
                    {
                        "name": "list_files",
                        "description": "Get file list in directory",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "directory": {"type": "string", "default": "."}
                            }
                        }
                    },
                    {
                        "name": "read_file", 
                        "description": "Read file contents",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "file_path": {"type": "string"}
                            },
                            "required": ["file_path"]
                        }
                    }
                ]
            }

        elif method == "tools/call":
            tool_name = params.get('name')
            arguments = params.get('arguments', {})

            if tool_name == "list_files":
                result = self.list_files(arguments.get('directory', '.'))
                return {"content": [{"type": "text", "text": str(result)}]}

            elif tool_name == "read_file":
                result = self.read_file(arguments['file_path'])
                return {"content": [{"type": "text", "text": result}]}

        return {"error": "Unsupported method"}

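def main():
    """MCP server main loop (newline-delimited JSON-RPC 2.0 over stdio)"""
    server = SimpleMCPServer()

    for line in sys.stdin:
        if not line.strip():
            continue
        request_id = None
        try:
            request = json.loads(line)
            request_id = request.get('id')
            if request_id is None:
                continue  # Notification (e.g. notifications/initialized): no reply
            if request.get('method') == "initialize":
                # Handshake: echo the client's protocol version and advertise tools
                result = {
                    "protocolVersion": request.get('params', {}).get('protocolVersion', '2024-11-05'),
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "simple-file-server", "version": "0.1.0"}
                }
            else:
                result = server.handle_request(request)
            if "error" in result:
                response = {"jsonrpc": "2.0", "id": request_id,
                            "error": {"code": -32601, "message": result["error"]}}
            else:
                response = {"jsonrpc": "2.0", "id": request_id, "result": result}
        except Exception as e:
            response = {"jsonrpc": "2.0", "id": request_id,
                        "error": {"code": -32603, "message": str(e)}}
        print(json.dumps(response))
        sys.stdout.flush()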

if __name__ == "__main__":
    main()
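The `list_files` and `read_file` methods above join the requested path onto the workspace directory without checking the result, so a path such as `../secrets.txt` can reach outside the workspace. One way to guard against that is sketched below; `resolve_in_workspace` is a hypothetical helper that the server methods could call before touching the file system.

```python
import os
import tempfile


def resolve_in_workspace(workspace: str, user_path: str) -> str:
    """Return the absolute path for user_path, or raise if it escapes workspace."""
    root = os.path.realpath(workspace)
    candidate = os.path.realpath(os.path.join(root, user_path))
    # commonpath equals root only when candidate is root itself or lies below it
    if os.path.commonpath([root, candidate]) != root:
        raise ValueError(f"Path escapes workspace: {user_path}")
    return candidate


with tempfile.TemporaryDirectory() as workspace:
    print(resolve_in_workspace(workspace, "notes/todo.txt"))
    try:
        resolve_in_workspace(workspace, "../secrets.txt")
    except ValueError as exc:
        print(exc)
```

Resolving with `realpath` before comparing means symbolic links and `..` segments are normalized first, so the check applies to the location that would actually be opened.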

🎯 Intermediate: Practical Implementation Patterns

1. Database Integration MCP Server

A database operation MCP server commonly used in real projects.

# database_mcp_server.py
"""
Database operation MCP server
Practical implementation using SQLite (the same pattern can be extended to PostgreSQL)
"""
import json
import sys
import sqlite3
import os
from typing import Dict, Any, List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DatabaseMCPServer:
    def __init__(self):
        # Accept either a plain file path or a sqlite:/// URL in DATABASE_URL
        self.db_path = os.getenv('DATABASE_URL', 'app.db').removeprefix('sqlite:///')
        self.connection = None
        self.connect()

    def connect(self):
        """Connect to database"""
        try:
            self.connection = sqlite3.connect(self.db_path)
            self.connection.row_factory = sqlite3.Row
            logger.info(f"Connected to database: {self.db_path}")
        except Exception as e:
            logger.error(f"Database connection error: {e}")

    def execute_query(self, query: str, params: Optional[List] = None) -> List[Dict]:
        """Execute SQL query and return results"""
        try:
            cursor = self.connection.cursor()
            cursor.execute(query, params or [])

            # cursor.description is set for every statement that returns rows
            # (SELECT, PRAGMA, WITH ...), not only those starting with SELECT
            if cursor.description is not None:
                rows = cursor.fetchall()
                return [dict(row) for row in rows]
            else:
                self.connection.commit()
                return [{"affected_rows": cursor.rowcount}]

        except Exception as e:
            return [{"error": str(e)}]

    def get_table_schema(self, table_name: str) -> List[Dict]:
        """Get table schema"""
        # PRAGMA arguments cannot be bound as parameters, so validate the name
        if not table_name.isidentifier():
            return [{"error": f"Invalid table name: {table_name}"}]
        query = f"PRAGMA table_info({table_name})"
        return self.execute_query(query)

    def list_tables(self) -> List[str]:
        """Get list of tables in database"""
        query = "SELECT name FROM sqlite_master WHERE type='table'"
        result = self.execute_query(query)
        return [row['name'] for row in result]

    def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle MCP request"""
        method = request.get('method')
        params = request.get('params', {})

        if method == "tools/list":
            return {
                "tools": [
                    {
                        "name": "execute_sql",
                        "description": "Execute SQL query",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "query": {"type": "string"},
                                "params": {"type": "array", "items": {"type": "string"}}
                            },
                            "required": ["query"]
                        }
                    },
                    {
                        "name": "get_schema",
                        "description": "Get table schema",
                        "inputSchema": {
                            "type": "object", 
                            "properties": {
                                "table_name": {"type": "string"}
                            },
                            "required": ["table_name"]
                        }
                    },
                    {
                        "name": "list_tables",
                        "description": "Get list of database tables",
                        "inputSchema": {"type": "object", "properties": {}}
                    }
                ]
            }

        elif method == "tools/call":
            tool_name = params.get('name')
            arguments = params.get('arguments', {})

            if tool_name == "execute_sql":
                result = self.execute_query(
                    arguments['query'], 
                    arguments.get('params')
                )
                return {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}

            elif tool_name == "get_schema":
                result = self.get_table_schema(arguments['table_name'])
                return {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}

            elif tool_name == "list_tables":
                result = self.list_tables()
                return {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}

        return {"error": "Unsupported method"}

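def main():
    """MCP server main loop (newline-delimited JSON-RPC 2.0 over stdio)"""
    server = DatabaseMCPServer()

    for line in sys.stdin:
        if not line.strip():
            continue
        request_id = None
        try:
            request = json.loads(line)
            request_id = request.get('id')
            if request_id is None:
                continue  # Notification (e.g. notifications/initialized): no reply
            if request.get('method') == "initialize":
                # Handshake: echo the client's protocol version and advertise tools
                result = {
                    "protocolVersion": request.get('params', {}).get('protocolVersion', '2024-11-05'),
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "database", "version": "0.1.0"}
                }
            else:
                result = server.handle_request(request)
            if "error" in result:
                response = {"jsonrpc": "2.0", "id": request_id,
                            "error": {"code": -32601, "message": result["error"]}}
            else:
                response = {"jsonrpc": "2.0", "id": request_id, "result": result}
        except Exception as e:
            response = {"jsonrpc": "2.0", "id": request_id,
                        "error": {"code": -32603, "message": str(e)}}
        print(json.dumps(response))
        sys.stdout.flush()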

if __name__ == "__main__":
    main()
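Two details of the query handling are easy to try in isolation: binding values with `?` placeholders instead of formatting them into the SQL string, and using `cursor.description` to tell row-returning statements from the rest. The sketch below runs against an in-memory SQLite database; `run` is a hypothetical stand-in for the server's `execute_query`, and the `users` table is made up for the example.

```python
import sqlite3


def run(connection: sqlite3.Connection, query: str, params=()):
    """Run one statement; return rows as dicts, or the affected row count."""
    cursor = connection.execute(query, params)
    if cursor.description is not None:
        # Row-returning statement (SELECT, PRAGMA, ...): build dicts from column names
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    connection.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
run(conn, "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
inserted = run(conn, "INSERT INTO users (name) VALUES (?)", ("alice",))
rows = run(conn, "SELECT id, name FROM users WHERE name = ?", ("alice",))
schema = run(conn, "PRAGMA table_info(users)")

print(inserted)                            # number of inserted rows
print(rows)                                # matching rows as dictionaries
print([column["name"] for column in schema])  # column names from the PRAGMA
```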

2. API Integration MCP Server

An implementation example of an MCP server for external API integration.

# api_mcp_server.py
"""
External API integration MCP server
Implementation of REST API calls and response handling
"""
import json
import sys
import requests
import os
from typing import Dict, Any, Optional
import logging

class APIMCPServer:
    def __init__(self):
        self.base_url = os.getenv('API_BASE_URL', 'https://api.example.com')
        self.api_key = os.getenv('API_KEY')
        self.session = requests.Session()

        if self.api_key:
            self.session.headers.update({'Authorization': f'Bearer {self.api_key}'})

    def make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> Dict:
        """Execute API request"""
        try:
            url = f"{self.base_url}/{endpoint.lstrip('/')}"
            # Always set a timeout so a stalled API cannot hang the server
            response = self.session.request(method, url, json=data, timeout=30)
            response.raise_for_status()
            return {
                "status": response.status_code,
                "data": response.json() if response.content else None
            }
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers response bodies that are not valid JSON
            return {"error": str(e)}

    def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle MCP request"""
        method = request.get('method')
        params = request.get('params', {})

        if method == "tools/list":
            return {
                "tools": [
                    {
                        "name": "api_call",
                        "description": "Call external API",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "method": {
                                    "type": "string",
                                    "enum": ["GET", "POST", "PUT", "DELETE"]
                                },
                                "endpoint": {"type": "string"},
                                "data": {"type": "object"}
                            },
                            "required": ["method", "endpoint"]
                        }
                    }
                ]
            }

        elif method == "tools/call":
            tool_name = params.get('name')
            arguments = params.get('arguments', {})

            if tool_name == "api_call":
                result = self.make_request(
                    arguments['method'],
                    arguments['endpoint'],
                    arguments.get('data')
                )
                return {"content": [{"type": "text", "text": json.dumps(result, indent=2)}]}

        return {"error": "Unsupported method"}

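def main():
    """MCP server main loop (newline-delimited JSON-RPC 2.0 over stdio)"""
    server = APIMCPServer()

    for line in sys.stdin:
        if not line.strip():
            continue
        request_id = None
        try:
            request = json.loads(line)
            request_id = request.get('id')
            if request_id is None:
                continue  # Notification (e.g. notifications/initialized): no reply
            if request.get('method') == "initialize":
                # Handshake: echo the client's protocol version and advertise tools
                result = {
                    "protocolVersion": request.get('params', {}).get('protocolVersion', '2024-11-05'),
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "api-integration", "version": "0.1.0"}
                }
            else:
                result = server.handle_request(request)
            if "error" in result:
                response = {"jsonrpc": "2.0", "id": request_id,
                            "error": {"code": -32601, "message": result["error"]}}
            else:
                response = {"jsonrpc": "2.0", "id": request_id, "result": result}
        except Exception as e:
            response = {"jsonrpc": "2.0", "id": request_id,
                        "error": {"code": -32603, "message": str(e)}}
        print(json.dumps(response))
        sys.stdout.flush()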

if __name__ == "__main__":
    main()
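The URL-joining step in `make_request` assumes that API_BASE_URL carries no trailing slash. A slightly more tolerant variant is sketched below; `build_url` is a hypothetical helper. The comparison with `urllib.parse.urljoin` is included because `urljoin` replaces the last path segment of a base URL that lacks a trailing slash, which is easy to overlook when the base contains a version prefix.

```python
from urllib.parse import urljoin


def build_url(base_url: str, endpoint: str) -> str:
    """Join base_url and endpoint with exactly one slash between them."""
    return f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"


joined = build_url("https://api.example.com/v1/", "/users/42")
print(joined)

# urljoin treats "v1" as a file-like last segment and drops it
dropped = urljoin("https://api.example.com/v1", "users/42")
print(dropped)
```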

🏆 Advanced: Advanced Integration Patterns

1. Multi-Agent Coordination System

A system where multiple subagents collaborate to handle complex tasks.

# multi_agent_coordinator.py
"""
Multi-agent coordination system
Advanced implementation combining multiple subagents and MCP servers
"""
import asyncio
import json
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum

class AgentType(Enum):
    GENERAL_PURPOSE = "general-purpose"
    STATUSLINE_SETUP = "statusline-setup" 
    OUTPUT_STYLE_SETUP = "output-style-setup"

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    description: str
    agent_type: AgentType
    dependencies: List[str]
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: str = None

class MultiAgentCoordinator:
    def __init__(self):
        self.tasks: Dict[str, Task] = {}
        self.mcp_connections: Dict[str, Any] = {}

    def add_task(self, task: Task):
        """Add task"""
        self.tasks[task.id] = task

    def add_mcp_server(self, name: str, server_config: Dict):
        """Add MCP server connection"""
        self.mcp_connections[name] = server_config

    async def execute_task(self, task_id: str) -> bool:
        """Execute single task"""
        task = self.tasks.get(task_id)
        if not task:
            return False

        # Check dependencies
        for dep_id in task.dependencies:
            dep_task = self.tasks.get(dep_id)
            if not dep_task or dep_task.status != TaskStatus.COMPLETED:
                return False

        task.status = TaskStatus.IN_PROGRESS

        try:
            # Invoke subagent (pseudo-code)
            if task.agent_type == AgentType.GENERAL_PURPOSE:
                result = await self._call_general_purpose_agent(task)
            elif task.agent_type == AgentType.STATUSLINE_SETUP:
                result = await self._call_statusline_agent(task)
            elif task.agent_type == AgentType.OUTPUT_STYLE_SETUP:
                result = await self._call_output_style_agent(task)

            task.result = result
            task.status = TaskStatus.COMPLETED
            return True

        except Exception as e:
            task.error = str(e)
            task.status = TaskStatus.FAILED
            return False

    async def _call_general_purpose_agent(self, task: Task) -> Any:
        """Call general-purpose agent"""
        # Pseudo-implementation of Claude Code Task tool invocation
        agent_prompt = f"""
        Task: {task.description}

        Available MCP servers: {list(self.mcp_connections.keys())}

        Execute this task and return the result in structured format.
        """

        # Placeholder: a real implementation would hand agent_prompt to Claude Code
        # (for example through headless mode, `claude -p`) and parse the reply
        return {"status": "completed", "data": "Sample result"}

    async def _call_statusline_agent(self, task: Task) -> Any:
        """Call status line setup agent"""
        return {"status": "completed", "config": "Status line configuration"}

    async def _call_output_style_agent(self, task: Task) -> Any:
        """Call output style setup agent"""
        return {"status": "completed", "style": "Output style configuration"}

    async def execute_workflow(self, task_ids: List[str]) -> Dict[str, Any]:
        """Execute entire workflow"""
        results = {}

        while True:
            # Identify executable tasks
            executable_tasks = [
                task_id for task_id in task_ids
                if self.tasks[task_id].status == TaskStatus.PENDING
                and all(
                    self.tasks[dep_id].status == TaskStatus.COMPLETED
                    for dep_id in self.tasks[task_id].dependencies
                )
            ]

            if not executable_tasks:
                break

            # Execute in parallel
            tasks = [self.execute_task(task_id) for task_id in executable_tasks]
            await asyncio.gather(*tasks)

        # Aggregate results
        for task_id in task_ids:
            task = self.tasks[task_id]
            results[task_id] = {
                "status": task.status.value,
                "result": task.result,
                "error": task.error
            }

        return results

# Usage example
async def main():
    coordinator = MultiAgentCoordinator()

    # Add MCP servers
    coordinator.add_mcp_server("database", {
        "command": "python",
        "args": ["database_mcp_server.py"]
    })

    coordinator.add_mcp_server("api", {
        "command": "python", 
        "args": ["api_mcp_server.py"]
    })

    # Add tasks
    coordinator.add_task(Task(
        id="task1",
        description="Fetch user data from database",
        agent_type=AgentType.GENERAL_PURPOSE,
        dependencies=[]
    ))

    coordinator.add_task(Task(
        id="task2", 
        description="Send data to external system via API",
        agent_type=AgentType.GENERAL_PURPOSE,
        dependencies=["task1"]
    ))

    coordinator.add_task(Task(
        id="task3",
        description="Display processing status in status line",
        agent_type=AgentType.STATUSLINE_SETUP,
        dependencies=["task2"]
    ))

    # Execute workflow
    results = await coordinator.execute_workflow(["task1", "task2", "task3"])
    print(json.dumps(results, indent=2))

if __name__ == "__main__":
    asyncio.run(main())
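The scheduling idea inside `execute_workflow` (run every task whose dependencies are complete, then repeat) can be examined without any agents. The sketch below groups task ids into batches that could run in parallel. `dependency_batches` is a hypothetical helper, and `task4` is added only to show two tasks sharing a batch. Unlike the loop above, which stops quietly when nothing is runnable, this version raises on a cycle or an unknown dependency.

```python
def dependency_batches(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group task ids into batches; each batch depends only on earlier batches."""
    remaining = {task: set(deps) for task, deps in dependencies.items()}
    done: set[str] = set()
    batches = []
    while remaining:
        ready = sorted(task for task, deps in remaining.items() if deps <= done)
        if not ready:
            # Nothing can run: a cycle or a dependency on an unknown task
            raise ValueError(f"Unresolvable dependencies: {sorted(remaining)}")
        batches.append(ready)
        done.update(ready)
        for task in ready:
            del remaining[task]
    return batches


workflow = {"task1": [], "task2": ["task1"], "task3": ["task2"], "task4": ["task1"]}
print(dependency_batches(workflow))
# [['task1'], ['task2', 'task4'], ['task3']]
```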

2. Real-time Monitoring MCP Server

An MCP server for real-time system state monitoring.

# monitoring_mcp_server.py
"""
Real-time monitoring MCP server
Integrating system metrics, log monitoring, and alert functionality
"""
import json
import sys
import psutil
import time
import threading
from typing import Dict, Any, List
from queue import Queue
import logging
from datetime import datetime

class MonitoringMCPServer:
    def __init__(self):
        self.metrics_queue = Queue()
        self.alerts = []
        self.monitoring_active = False
        self.monitoring_thread = None

    def start_monitoring(self, interval: int = 5):
        """Start monitoring"""
        if self.monitoring_active:
            return  # Already running; avoid spawning a second monitor thread
        self.monitoring_active = True
        self.monitoring_thread = threading.Thread(
            target=self._monitor_loop, 
            args=(interval,)
        )
        self.monitoring_thread.daemon = True
        self.monitoring_thread.start()

    def stop_monitoring(self):
        """Stop monitoring"""
        self.monitoring_active = False
        if self.monitoring_thread:
            self.monitoring_thread.join()

    def _monitor_loop(self, interval: int):
        """Monitoring loop"""
        while self.monitoring_active:
            try:
                metrics = self._collect_metrics()
                self.metrics_queue.put(metrics)
                self._check_alerts(metrics)
                time.sleep(interval)
            except Exception as e:
                logging.error(f"Monitoring error: {e}")

    def _collect_metrics(self) -> Dict[str, Any]:
        """Collect system metrics"""
        return {
            "timestamp": datetime.now().isoformat(),
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent,
            "network_io": psutil.net_io_counters()._asdict(),
            "process_count": len(psutil.pids())
        }

    def _check_alerts(self, metrics: Dict[str, Any]):
        """Check alert conditions"""
        alerts = []

        if metrics["cpu_percent"] > 80:
            alerts.append({
                "level": "warning",
                "message": f"High CPU usage: {metrics['cpu_percent']}%",
                "timestamp": metrics["timestamp"]
            })

        if metrics["memory_percent"] > 85:
            alerts.append({
                "level": "critical",
                "message": f"High memory usage: {metrics['memory_percent']}%", 
                "timestamp": metrics["timestamp"]
            })

        if metrics["disk_usage"] > 90:
            alerts.append({
                "level": "critical",
                "message": f"High disk usage: {metrics['disk_usage']}%",
                "timestamp": metrics["timestamp"]
            })

        self.alerts.extend(alerts)

        # Limit alert count
        if len(self.alerts) > 100:
            self.alerts = self.alerts[-50:]

    def get_latest_metrics(self, count: int = 10) -> List[Dict[str, Any]]:
        """Get the most recent metrics, newest first"""
        # Drain the whole queue so the newest entries (at the tail) are reachable
        snapshot = []
        while not self.metrics_queue.empty():
            snapshot.append(self.metrics_queue.get())

        # Put everything back in the original order
        for metric in snapshot:
            self.metrics_queue.put(metric)

        return list(reversed(snapshot[-count:]))

    def get_alerts(self, level: str = None) -> List[Dict[str, Any]]:
        """Get alerts"""
        if level:
            return [alert for alert in self.alerts if alert["level"] == level]
        return self.alerts.copy()

    def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle MCP request"""
        method = request.get('method')
        params = request.get('params', {})

        if method == "tools/list":
            return {
                "tools": [
                    {
                        "name": "start_monitoring",
                        "description": "Start system monitoring",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "interval": {"type": "integer", "default": 5}
                            }
                        }
                    },
                    {
                        "name": "stop_monitoring", 
                        "description": "Stop system monitoring",
                        "inputSchema": {"type": "object", "properties": {}}
                    },
                    {
                        "name": "get_metrics",
                        "description": "Get latest system metrics",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "count": {"type": "integer", "default": 10}
                            }
                        }
                    },
                    {
                        "name": "get_alerts",
                        "description": "Get system alerts",
                        "inputSchema": {
                            "type": "object",
                            "properties": {
                                "level": {
                                    "type": "string",
                                    "enum": ["warning", "critical"]
                                }
                            }
                        }
                    }
                ]
            }

        elif method == "tools/call":
            tool_name = params.get('name')
            arguments = params.get('arguments', {})

            if tool_name == "start_monitoring":
                self.start_monitoring(arguments.get('interval', 5))
                return {"content": [{"type": "text", "text": "Monitoring started"}]}

            elif tool_name == "stop_monitoring":
                self.stop_monitoring()
                return {"content": [{"type": "text", "text": "Monitoring stopped"}]}

            elif tool_name == "get_metrics":
                metrics = self.get_latest_metrics(arguments.get('count', 10))
                return {"content": [{"type": "text", "text": json.dumps(metrics, indent=2)}]}

            elif tool_name == "get_alerts":
                alerts = self.get_alerts(arguments.get('level'))
                return {"content": [{"type": "text", "text": json.dumps(alerts, indent=2)}]}

        return {"error": "Unsupported method"}

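def main():
    """MCP server main loop (newline-delimited JSON-RPC 2.0 over stdio)"""
    server = MonitoringMCPServer()

    for line in sys.stdin:
        if not line.strip():
            continue
        request_id = None
        try:
            request = json.loads(line)
            request_id = request.get('id')
            if request_id is None:
                continue  # Notification (e.g. notifications/initialized): no reply
            if request.get('method') == "initialize":
                # Handshake: echo the client's protocol version and advertise tools
                result = {
                    "protocolVersion": request.get('params', {}).get('protocolVersion', '2024-11-05'),
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "monitoring", "version": "0.1.0"}
                }
            else:
                result = server.handle_request(request)
            if "error" in result:
                response = {"jsonrpc": "2.0", "id": request_id,
                            "error": {"code": -32601, "message": result["error"]}}
            else:
                response = {"jsonrpc": "2.0", "id": request_id, "result": result}
        except Exception as e:
            response = {"jsonrpc": "2.0", "id": request_id,
                        "error": {"code": -32603, "message": str(e)}}
        print(json.dumps(response))
        sys.stdout.flush()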

if __name__ == "__main__":
    main()
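The server above stores samples in a `Queue`, which grows without bound while monitoring runs and has to be drained to reach the newest entries. A bounded ring buffer is one alternative. The sketch below uses `collections.deque` with `maxlen` and a lock for access from the monitor thread; `MetricsBuffer` is a hypothetical class, not part of the server code.

```python
import threading
from collections import deque


class MetricsBuffer:
    """Keep only the most recent `maxlen` samples."""

    def __init__(self, maxlen: int = 100):
        self._samples = deque(maxlen=maxlen)  # oldest samples fall off the left end
        self._lock = threading.Lock()

    def add(self, sample: dict) -> None:
        with self._lock:
            self._samples.append(sample)

    def latest(self, count: int = 10) -> list:
        """Return up to `count` samples, newest first."""
        with self._lock:
            snapshot = list(self._samples)  # copy under the lock, work on the copy
        return snapshot[::-1][:count]


buffer = MetricsBuffer(maxlen=3)
for cpu in (10, 20, 30, 40):
    buffer.add({"cpu_percent": cpu})

print(buffer.latest(2))
# [{'cpu_percent': 40}, {'cpu_percent': 30}]
```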

🛠️ Integration Configuration and Deployment

Claude Code Configuration File

Configuration example for a complete integration environment. .mcp.json supports environment variable expansion (${VAR:-default} syntax).

{
  "mcpServers": {
    "database": {
      "command": "python",
      "args": ["/project/mcp/database_mcp_server.py"],
      "env": {
        "DATABASE_URL": "${DATABASE_URL:-sqlite:///app.db}",
        "LOG_LEVEL": "${LOG_LEVEL:-INFO}"
      }
    },
    "api-integration": {
      "command": "python",
      "args": ["/project/mcp/api_mcp_server.py"],
      "env": {
        "API_BASE_URL": "https://api.example.com",
        "API_KEY": "${API_KEY}"
      }
    },
    "monitoring": {
      "command": "python",
      "args": ["/project/mcp/monitoring_mcp_server.py"],
      "env": {
        "MONITORING_INTERVAL": "5"
      }
    }
  }
}
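To make the `${VAR}` and `${VAR:-default}` forms used in this configuration concrete, here is a small sketch of how such expansion can work. It is an illustration, not Claude Code's implementation. It assumes the default applies only when the variable is unset, and that an unset variable without a default is an error; `expand` is a hypothetical helper.

```python
import os
import re

_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand(value: str, env=None) -> str:
    """Expand ${VAR} and ${VAR:-default} in value using env (default: os.environ)."""
    env = os.environ if env is None else env

    def replace(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        # Assumption: a missing variable with no default is treated as an error
        raise KeyError(f"{name} is not set and has no default")

    return _PATTERN.sub(replace, value)


print(expand("${DATABASE_URL:-sqlite:///app.db}", env={}))
print(expand("${LOG_LEVEL:-INFO}", env={"LOG_LEVEL": "DEBUG"}))
```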

Hooks Configuration

Hooks are configured in a settings file: ~/.claude/settings.json (user), .claude/settings.json (project), or .claude/settings.local.json (local, not committed). Use the official event names.

{
  "hooks": {
    "PreToolUse": [
      {
        "matcher": "Bash",
        "hooks": [
          {
            "type": "command",
            "command": "echo 'Before tool execution: Bash'"
          }
        ]
      }
    ],
    "PostToolUse": [
      {
        "matcher": "Bash",
        "hooks": [
          {
            "type": "command",
            "command": "echo 'After tool execution: Bash'"
          }
        ]
      }
    ],
    "SubagentStart": [
      {
        "matcher": "general-purpose",
        "hooks": [
          {
            "type": "command",
            "command": "echo \"Subagent started: $(date)\""
          }
        ]
      }
    ],
    "SubagentStop": [
      {
        "matcher": "general-purpose",
        "hooks": [
          {
            "type": "command",
            "command": "echo \"Subagent stopped: $(date)\""
          }
        ]
      }
    ]
  }
}

Available Hook Events

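| Event Name | Timing |
| --- | --- |
| PreToolUse | Before tool execution |
| PostToolUse | After tool execution |
| SubagentStart | When a subagent starts |
| SubagentStop | When a subagent stops |
| Notification | When Claude Code sends a notification |
| Stop | When the main agent finishes responding |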
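
The echo commands above only print a message. A hook command can also be a script that inspects the event. The sketch below is a hypothetical PreToolUse hook written in Python. It assumes the hook receives a JSON payload on stdin with `tool_name` and `tool_input` fields, and that exit code 2 blocks the tool call and passes stderr back to Claude. Check both assumptions against the current hooks reference before relying on them. The deny-list is purely illustrative.

```python
import json
import sys

# Illustrative deny-list (assumption); adjust for your project
BLOCKED_SUBSTRINGS = ("rm -rf /", "git push --force")

def evaluate(payload):
    """Return (exit_code, message) for a hook payload dict."""
    if payload.get("tool_name") != "Bash":
        return 0, ""
    command = payload.get("tool_input", {}).get("command", "")
    for pattern in BLOCKED_SUBSTRINGS:
        if pattern in command:
            # Assumption: exit code 2 signals "block this tool call"
            return 2, f"Blocked by hook: command contains '{pattern}'"
    return 0, ""

def run_hook(stdin=sys.stdin, stderr=sys.stderr):
    """Read one JSON payload from stdin, write any message to stderr."""
    payload = json.load(stdin)
    code, message = evaluate(payload)
    if message:
        print(message, file=stderr)
    return code
```

To use this as a real hook, save it as a script that ends with `sys.exit(run_hook())` and point the hook's "command" field at it, for example `python /project/hooks/check_bash.py` (a hypothetical path).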

Docker Container Configuration

Build a complete development environment including MCP servers with Docker.

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    sqlite3 \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy MCP server files
COPY mcp/ ./mcp/
COPY config/ ./config/

# Claude Code configuration
RUN mkdir -p /root/.claude
COPY config/claude-config.json /root/.claude/config.json

# Set execution permissions
RUN chmod +x mcp/*.py

EXPOSE 8000

CMD ["python", "-m", "http.server", "8000"]

# docker-compose.yml
version: '3.8'

services:
  claude-mcp-environment:
    build: .
    volumes:
      - .:/app
      - claude-cache:/root/.claude
    environment:
      - DATABASE_URL=sqlite:///data/app.db
      - API_KEY=${API_KEY}
      - PYTHONPATH=/app
    ports:
      - "8000:8000"
    depends_on:
      - database

  database:
    image: postgres:14
    environment:
      - POSTGRES_DB=claude_mcp
      - POSTGRES_USER=claude
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

volumes:
  claude-cache:
  postgres_data:

🚨 Troubleshooting

Common Issues and Solutions

1. MCP Server Connection Error

Symptom: Unable to connect to MCP server

# Debugging steps
# 1. Test if MCP server runs standalone
echo '{"method": "tools/list", "params": {}}' | python mcp/simple_mcp_server.py

# 2. Check configuration file syntax
python -m json.tool config/claude-config.json

# 3. Verify environment variables
env | grep -E "(DATABASE_URL|API_KEY)"

Solution:

  • Check the server path and Python environment
  • Fix permission settings
  • Properly configure environment variables
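
The standalone test in step 1 can also be scripted, which is handy in CI. The sketch below mirrors the `echo ... | python ...` pipe: it sends one line of JSON to a server over stdin and parses the first line of the reply. It assumes the simplified line-delimited JSON exchange used by the example servers in this guide, not a full MCP handshake. The function name `probe_server` is hypothetical.

```python
import json
import subprocess
import sys

def probe_server(command, request, timeout=5.0):
    """Send one JSON request line to a stdio server and return the parsed reply.

    Hypothetical helper; assumes the server answers each input line with
    one line of JSON on stdout, as the example servers in this guide do.
    """
    proc = subprocess.run(
        command,
        input=json.dumps(request) + "\n",
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    lines = proc.stdout.splitlines()
    if not lines or not lines[0].strip():
        raise RuntimeError(
            f"No response (exit code {proc.returncode}); "
            f"stderr: {proc.stderr.strip()}"
        )
    return json.loads(lines[0])
```

For example, `probe_server([sys.executable, "mcp/simple_mcp_server.py"], {"method": "tools/list", "params": {}})` should return a dict with a "tools" key if the server starts correctly. A RuntimeError with the captured stderr usually points to a wrong path or a missing dependency.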

2. Subagent No Response

Symptom: Subagent does not respond or behaves unexpectedly

# Debug code
def debug_subagent_call():
    """Debug subagent invocation"""
    import logging
    logging.basicConfig(level=logging.DEBUG)

    # Enable logging for Task tool calls
    task_params = {
        "description": "Test task - display file list",
        "prompt": "Display file list in current directory. Report in detail including debug information.",
        "subagent_type": "general-purpose"
    }

    print(f"Invocation parameters: {task_params}")
    # Actually invoke Claude Code Task tool here

Solution:

  • Clarify the prompt
  • Verify the subagent type
  • Adjust timeout settings
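
A quick way to rule out a mistyped subagent type or an empty prompt is to validate the parameters before invoking the Task tool. The sketch below checks the three keys used in the debug snippet above. The helper `validate_task_params` is hypothetical, and the set of known types is an assumption taken from the built-in names listed in this guide's overview. Extend it with any custom subagents you define.

```python
REQUIRED_KEYS = ("description", "prompt", "subagent_type")

# Assumption: built-in names as listed in this guide's overview.
# Add the identifiers of any other built-in or custom subagents you use.
KNOWN_SUBAGENT_TYPES = {
    "Explore", "Plan", "general-purpose", "Bash", "statusline-setup",
}

def validate_task_params(params, known_types=KNOWN_SUBAGENT_TYPES):
    """Return a list of problems; an empty list means the params look sane."""
    problems = []
    for key in REQUIRED_KEYS:
        value = params.get(key)
        if not isinstance(value, str) or not value.strip():
            problems.append(f"'{key}' is missing or empty")
    subagent_type = params.get("subagent_type")
    if (
        isinstance(subagent_type, str)
        and subagent_type.strip()
        and subagent_type not in known_types
    ):
        problems.append(f"unknown subagent_type '{subagent_type}'")
    return problems
```

Calling `validate_task_params(task_params)` before the invocation and printing the returned list makes typos such as "general" instead of "general-purpose" visible immediately.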

3. Performance Issues

Symptom: Slow processing, high memory usage

# Performance optimization example
import asyncio
from concurrent.futures import ThreadPoolExecutor

class OptimizedMCPServer:
    def __init__(self):
        # Worker threads keep blocking handlers off the event loop
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def handle_async_request(self, request):
        """Asynchronous request handling"""
        # get_running_loop() is the supported call inside a coroutine
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self.handle_sync_request,
            request
        )

    def handle_sync_request(self, request):
        """Synchronous request handling (existing implementation)"""
        # Existing handle_request logic
        pass

Solution:

  • Introduce asynchronous processing
  • Implement caching
  • Optimize batch processing
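
Caching can be sketched in a few lines. The example below is one possible approach, not part of the servers shown earlier: a small in-memory cache with a time-to-live, keyed by request method and parameters. The names `TTLCache` and `cache_key` are hypothetical, and the sketch assumes that request parameters are JSON-serializable and that briefly stale results are acceptable for the cached tools.

```python
import json
import time

class TTLCache:
    """Small in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds=30.0, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self._clock = clock  # injectable for testing
        self._entries = {}   # key -> (expires_at, value)

    def get_or_compute(self, key, compute):
        """Return the cached value for key, or call compute() and store it."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]
        value = compute()
        self._entries[key] = (now + self.ttl_seconds, value)
        return value

def cache_key(request):
    """Build a stable key from the request's method and params."""
    return json.dumps(
        [request.get("method"), request.get("params")], sort_keys=True
    )
```

Inside a request handler this could be used as `cache.get_or_compute(cache_key(request), lambda: self.handle_sync_request(request))`. Only cache read-only tools. Calls that write files or modify a database should always run.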

📈 Best Practices

1. Error Handling

# Robust error handling example
import json
import traceback
from functools import wraps

def mcp_error_handler(func):
    """MCP server error handler"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            error_info = {
                "error": str(e),
                "type": type(e).__name__,
                "traceback": traceback.format_exc()
            }
            return {"content": [{"type": "text", "text": json.dumps(error_info)}]}
    return wrapper

class RobustMCPServer:
    @mcp_error_handler
    def handle_request(self, request):
        # Request handling logic
        pass

2. Logging Strategy

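# Structured logging implementation
import json
import logging
from datetime import datetime, timezone

class MCPLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        # Avoid attaching duplicate handlers when the same logger name is reused
        if not self.logger.handlers:
            # StreamHandler writes to stderr by default, keeping stdout free for MCP responses
            handler = logging.StreamHandler()
            handler.setFormatter(self.JSONFormatter())
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    class JSONFormatter(logging.Formatter):
        def format(self, record):
            log_entry = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName
            }
            # Fields passed via `extra` become record attributes; copy them into the entry
            for key in ("request", "response_status"):
                if hasattr(record, key):
                    log_entry[key] = getattr(record, key)
            return json.dumps(log_entry, default=str)

    def log_request(self, request, response):
        """Log request/response"""
        self.logger.info("MCP Request", extra={
            "request": request,
            "response_status": "success" if "error" not in response else "error"
        })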

3. Testing Strategy

# MCP server test example
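import os
import unittest

# Assumes SimpleMCPServer from the earlier section lives in
# mcp/simple_mcp_server.py and that the mcp/ directory is on sys.path
from simple_mcp_server import SimpleMCPServer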

class TestMCPServer(unittest.TestCase):
    def setUp(self):
        self.server = SimpleMCPServer()

    def test_tools_list(self):
        """Test tools/list method"""
        request = {"method": "tools/list", "params": {}}
        response = self.server.handle_request(request)

        self.assertIn("tools", response)
        self.assertIsInstance(response["tools"], list)

    def test_file_operations(self):
        """Test file operations"""
        # Create test file
        test_file = "test_file.txt"
        with open(test_file, 'w') as f:
            f.write("Test content")

        # Test file read
        request = {
            "method": "tools/call",
            "params": {
                "name": "read_file",
                "arguments": {"file_path": test_file}
            }
        }
        response = self.server.handle_request(request)

        self.assertIn("content", response)
        self.assertIn("Test content", response["content"][0]["text"])

        # Cleanup
        os.remove(test_file)

if __name__ == "__main__":
    unittest.main()

📝 Summary

Claude Code's subagents and MCP integration can streamline development workflows by delegating specialized tasks to dedicated agents and automating access to external APIs, databases, and tools.

Implementation Key Points

  1. Incremental Adoption: Start with simple MCP servers and expand functionality gradually
  2. Error Handling: Ensure stable production operation with robust error handling
  3. Performance Optimization: Handle large-scale projects with async processing and caching
  4. Test Automation: Maintain quality continuously with automated MCP server tests

Next Steps

  • Implement MCP servers for your own projects
  • Build collaborative systems with multiple subagents
  • Set up monitoring and alerting in production environments

Use this implementation guide as a reference to maximize the potential of AI-assisted development environments.