- Claude Code
- MCP
- Model Context Protocol
- Real-time Processing
- Data Integration
- AI Development
- TypeScript
- Python
- mcp-implementation categories:
- AI Development & Automation
- Development Efficiency author: Claude Code status: production-ready content_type: practical-guide
Complete Guide to Claude Code MCP Implementation: From External Data Integration to Real-time Processing¶
Introduction¶
One of the flagship features of Claude Code GA introduced in this morning's article is MCP (Model Context Protocol) support. MCP is a new protocol that seamlessly connects external data sources with AI models, enabling real-time data processing and context expansion. This article provides an in-depth explanation of MCP implementation methods with concrete code examples.
Key Points¶
Real-time Data Integration
Bidirectional communication with external APIs, databases, and streaming services
Dynamic Context Management
Dynamically expand and update model context at runtime
Secure Authentication Mechanisms
Support for diverse authentication methods including OAuth2, API Key, and mTLS
Event-driven Processing
Automated processing based on webhooks and streaming events
MCP Basic Architecture¶
Protocol Overview¶
MCP consists of three components:
graph LR
A[Claude Code Client] -->|MCP Request| B[MCP Server]
B -->|Data Response| A
B <-->|Fetch/Push| C[External Data Source]
style A fill:#f9f,stroke:#333,stroke-width:2px
style B fill:#bbf,stroke:#333,stroke-width:2px
style C fill:#bfb,stroke:#333,stroke-width:2pxChoosing a Transport Method¶
// MCP configuration file: mcp-config.ts
export interface MCPConfig {
transport: 'sse' | 'http' | 'websocket';
endpoint: string;
authentication?: AuthConfig;
retry?: RetryConfig;
timeout?: number;
}
// SSE (Server-Sent Events) - for real-time data
const sseConfig: MCPConfig = {
transport: 'sse',
endpoint: 'https://api.example.com/mcp/stream',
authentication: {
type: 'bearer',
token: process.env.MCP_API_KEY
},
retry: {
maxAttempts: 3,
backoffMultiplier: 2
}
};
// HTTP - for request/response pattern processing
const httpConfig: MCPConfig = {
transport: 'http',
endpoint: 'https://api.example.com/mcp/query',
timeout: 30000 // 30 seconds
};
Implementation Step 1: Building an MCP Server¶
MCP Server Implementation in Node.js/TypeScript¶
// mcp-server.ts
import express from 'express';
import { MCPServer, MCPRequest, MCPResponse } from '@anthropic/mcp-sdk';
class CustomMCPServer extends MCPServer {
private dataCache = new Map<string, any>();
constructor() {
super({
capabilities: ['query', 'subscribe', 'update'],
version: '1.0.0'
});
}
// Data query handler
async handleQuery(request: MCPRequest): Promise<MCPResponse> {
const { query, parameters } = request.data;
switch (query) {
case 'user_data':
return await this.fetchUserData(parameters.userId);
case 'analytics':
return await this.getAnalytics(parameters);
case 'code_context':
return await this.getCodeContext(parameters.repository);
default:
throw new Error(`Unknown query: ${query}`);
}
}
// Real-time subscription
async handleSubscribe(request: MCPRequest): Promise<AsyncGenerator<MCPResponse>> {
const { topic, filters } = request.data;
async function* streamData() {
const eventSource = new EventSource(`/events/${topic}`);
for await (const event of eventSource) {
if (this.matchesFilters(event, filters)) {
yield {
type: 'event',
data: event.data,
timestamp: new Date().toISOString()
};
}
}
}
return streamData();
}
// Data update handler
async handleUpdate(request: MCPRequest): Promise<MCPResponse> {
const { target, data, validation } = request.data;
// Validation
if (validation) {
await this.validateData(data, validation);
}
// Update processing
await this.updateDataSource(target, data);
// Notify Claude Code
return {
status: 'success',
updated: true,
timestamp: new Date().toISOString()
};
}
}
// Express integration
const app = express();
const mcpServer = new CustomMCPServer();
app.post('/mcp/query', async (req, res) => {
try {
const response = await mcpServer.handleQuery(req.body);
res.json(response);
} catch (error) {
res.status(400).json({ error: error.message });
}
});
app.get('/mcp/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
const subscription = await mcpServer.handleSubscribe(req.query);
for await (const event of subscription) {
res.write(`data: ${JSON.stringify(event)}\n\n`);
}
});
app.listen(3000, () => {
console.log('MCP Server running on port 3000');
});
Python Implementation Example¶
# mcp_server.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from typing import AsyncGenerator, Dict, Any
import asyncio
import json
from datetime import datetime
app = FastAPI()
class MCPServer:
def __init__(self):
self.data_sources = {}
self.subscriptions = {}
async def query_handler(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Data query processing"""
query_type = request.get('query')
params = request.get('parameters', {})
handlers = {
'database_query': self.handle_db_query,
'api_fetch': self.handle_api_fetch,
'cache_lookup': self.handle_cache_lookup
}
handler = handlers.get(query_type)
if not handler:
raise ValueError(f"Unknown query type: {query_type}")
return await handler(params)
async def handle_db_query(self, params: Dict) -> Dict:
"""Execute database query"""
# Actual DB connection and query execution
import asyncpg
conn = await asyncpg.connect(
host='localhost',
database=params.get('database'),
user=params.get('user'),
password=params.get('password')
)
try:
result = await conn.fetch(params.get('sql'))
return {
'data': [dict(row) for row in result],
'count': len(result),
'timestamp': datetime.utcnow().isoformat()
}
finally:
await conn.close()
async def stream_handler(self, topic: str) -> AsyncGenerator[str, None]:
"""Real-time streaming processing"""
queue = asyncio.Queue()
self.subscriptions[topic] = queue
try:
while True:
data = await queue.get()
yield f"data: {json.dumps(data)}\n\n"
finally:
del self.subscriptions[topic]
mcp_server = MCPServer()
@app.post("/mcp/query")
async def query_endpoint(request: Dict[str, Any]):
try:
result = await mcp_server.query_handler(request)
return result
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/mcp/stream/{topic}")
async def stream_endpoint(topic: str):
return StreamingResponse(
mcp_server.stream_handler(topic),
media_type="text/event-stream"
)
Implementation Step 2: Claude Code Client Configuration¶
Implementation Using TypeScript SDK¶
// claude-code-mcp-client.ts
import { ClaudeCodeClient, MCPClient } from '@anthropic/claude-code-sdk';
class EnhancedClaudeClient {
private claudeClient: ClaudeCodeClient;
private mcpClient: MCPClient;
constructor(apiKey: string, mcpConfig: MCPConfig) {
this.claudeClient = new ClaudeCodeClient({ apiKey });
this.mcpClient = new MCPClient(mcpConfig);
// Setup MCP event handlers
this.setupEventHandlers();
}
private setupEventHandlers() {
// Data update events
this.mcpClient.on('data-update', async (event) => {
console.log('Received data update:', event);
await this.processDataUpdate(event);
});
// Error handling
this.mcpClient.on('error', (error) => {
console.error('MCP Error:', error);
this.handleMCPError(error);
});
// Connection status monitoring
this.mcpClient.on('connection-status', (status) => {
console.log('MCP Connection:', status);
});
}
// Code generation with external data
async generateWithContext(prompt: string, dataQuery: any) {
// Fetch data via MCP
const contextData = await this.mcpClient.query(dataQuery);
// Send to Claude Code with context
const response = await this.claudeClient.complete({
prompt: prompt,
context: {
external_data: contextData,
timestamp: new Date().toISOString()
},
model: 'claude-opus-4'
});
return response;
}
// Real-time collaboration
async startCollaboration(sessionId: string) {
// Start MCP stream
const stream = await this.mcpClient.subscribe({
topic: `collaboration/${sessionId}`,
filters: {
event_types: ['code_change', 'cursor_move', 'selection']
}
});
// Process stream events
for await (const event of stream) {
await this.handleCollaborationEvent(event);
}
}
private async handleCollaborationEvent(event: any) {
switch (event.type) {
case 'code_change':
await this.syncCodeChange(event.data);
break;
case 'cursor_move':
await this.updateCursorPosition(event.data);
break;
case 'selection':
await this.syncSelection(event.data);
break;
}
}
}
// Usage example
const client = new EnhancedClaudeClient(
process.env.CLAUDE_API_KEY!,
{
transport: 'sse',
endpoint: 'https://mcp.example.com',
authentication: {
type: 'oauth2',
clientId: process.env.OAUTH_CLIENT_ID!,
clientSecret: process.env.OAUTH_CLIENT_SECRET!,
tokenEndpoint: 'https://auth.example.com/token'
}
}
);
// Generate code based on database schema
const generatedCode = await client.generateWithContext(
'Create a TypeScript interface for the user table',
{
query: 'database_schema',
parameters: {
table: 'users',
include_relations: true
}
}
);
Implementation Step 3: Authentication and Security¶
Multi-layer Authentication Implementation¶
// mcp-auth.ts
import jwt from 'jsonwebtoken';
import { createHash } from 'crypto';
export class MCPAuthManager {
private secretKey: string;
private allowedOrigins: Set<string>;
constructor(config: AuthConfig) {
this.secretKey = config.secretKey;
this.allowedOrigins = new Set(config.allowedOrigins);
}
// JWT authentication
async authenticateJWT(token: string): Promise<boolean> {
try {
const decoded = jwt.verify(token, this.secretKey) as any;
// Additional validation
if (decoded.exp < Date.now() / 1000) {
throw new Error('Token expired');
}
if (!decoded.permissions?.includes('mcp:access')) {
throw new Error('Insufficient permissions');
}
return true;
} catch (error) {
console.error('JWT Auth failed:', error);
return false;
}
}
// API Key authentication
async authenticateAPIKey(apiKey: string): Promise<boolean> {
const hashedKey = createHash('sha256').update(apiKey).digest('hex');
// Validate hashed key in database
const isValid = await this.validateAPIKey(hashedKey);
if (isValid) {
// Rate limit check
await this.checkRateLimit(apiKey);
}
return isValid;
}
// CORS validation
validateOrigin(origin: string): boolean {
return this.allowedOrigins.has(origin);
}
// Request signature validation
validateRequestSignature(
payload: string,
signature: string,
timestamp: number
): boolean {
// Timestamp validation (within 5 minutes)
if (Math.abs(Date.now() - timestamp) > 300000) {
return false;
}
// HMAC signature validation
const expectedSignature = createHash('sha256')
.update(`${timestamp}.${payload}`)
.digest('hex');
return signature === expectedSignature;
}
}
// Express middleware
export const mcpAuthMiddleware = (authManager: MCPAuthManager) => {
return async (req: any, res: any, next: any) => {
try {
// Bearer token authentication
const authHeader = req.headers.authorization;
if (authHeader?.startsWith('Bearer ')) {
const token = authHeader.slice(7);
if (await authManager.authenticateJWT(token)) {
return next();
}
}
// API Key authentication
const apiKey = req.headers['x-api-key'];
if (apiKey && await authManager.authenticateAPIKey(apiKey)) {
return next();
}
// Signature authentication
const signature = req.headers['x-signature'];
const timestamp = parseInt(req.headers['x-timestamp'] || '0');
if (signature && authManager.validateRequestSignature(
JSON.stringify(req.body),
signature,
timestamp
)) {
return next();
}
res.status(401).json({ error: 'Unauthorized' });
} catch (error) {
res.status(500).json({ error: 'Authentication error' });
}
};
};
Practical Use Cases¶
1. Database-driven Code Generation¶
// database-code-generator.ts
class DatabaseCodeGenerator {
private mcpClient: MCPClient;
private claudeClient: ClaudeCodeClient;
async generateORMModels(databaseUrl: string) {
// Fetch database schema
const schema = await this.mcpClient.query({
query: 'database_schema',
parameters: {
url: databaseUrl,
include_indexes: true,
include_constraints: true
}
});
// Generate models for each table
const models = [];
for (const table of schema.tables) {
const model = await this.claudeClient.complete({
prompt: `Generate a Prisma model for table: ${table.name}`,
context: {
schema: table,
relationships: schema.relationships.filter(
r => r.from_table === table.name || r.to_table === table.name
)
}
});
models.push(model);
}
// Generate schema.prisma file
return this.combineModels(models);
}
}
2. Real-time Log Analysis¶
// realtime-log-analyzer.ts
class RealtimeLogAnalyzer {
private mcpClient: MCPClient;
async startAnalysis(logSource: string) {
const stream = await this.mcpClient.subscribe({
topic: 'logs',
filters: {
source: logSource,
severity: ['error', 'warning'],
pattern: '.*exception.*|.*error.*'
}
});
for await (const logEntry of stream) {
// Analyze logs with Claude Code
const analysis = await this.analyzeLog(logEntry);
if (analysis.requiresAction) {
await this.handleCriticalIssue(analysis);
}
}
}
private async analyzeLog(logEntry: any) {
return await this.claudeClient.complete({
prompt: 'Analyze this log entry and suggest fixes',
context: {
log: logEntry,
recent_logs: await this.getRecentLogs(logEntry.source),
system_state: await this.getSystemState()
}
});
}
}
3. CI/CD Pipeline Integration¶
# .github/workflows/mcp-enhanced-ci.yml
name: MCP Enhanced CI/CD
on:
push:
branches: [main]
pull_request:
jobs:
mcp-analysis:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup MCP Connection
run: |
npm install @anthropic/claude-code-sdk
export MCP_ENDPOINT=${{ secrets.MCP_ENDPOINT }}
export MCP_API_KEY=${{ secrets.MCP_API_KEY }}
- name: Code Analysis with External Context
run: |
node scripts/mcp-code-analysis.js \
--context production-metrics \
--include-dependencies \
--suggest-optimizations
- name: Generate Test Cases
run: |
node scripts/mcp-test-generator.js \
--coverage-target 90 \
--use-production-data \
--mock-external-services
Troubleshooting¶
Common Issues and Solutions¶
// mcp-troubleshooter.ts
export class MCPTroubleshooter {
// Connection timeout mitigation
static handleConnectionTimeout() {
return {
retry: {
maxAttempts: 5,
backoff: 'exponential',
initialDelay: 1000
},
fallback: {
useCache: true,
degradedMode: true
}
};
}
// Data sync error mitigation
static handleSyncError(error: any) {
const strategies = {
'version_mismatch': () => this.resolveVersionConflict(),
'data_corruption': () => this.validateAndRepair(),
'rate_limit': () => this.implementBackoff(),
'auth_failure': () => this.refreshCredentials()
};
const strategy = strategies[error.type];
return strategy ? strategy() : this.defaultErrorHandler(error);
}
// Performance optimization
static optimizePerformance() {
return {
batching: {
enabled: true,
maxBatchSize: 100,
maxWaitTime: 50 // ms
},
caching: {
ttl: 300, // 5 minutes
maxSize: '100MB'
},
compression: {
enabled: true,
algorithm: 'gzip'
}
};
}
}
Debugging Tools¶
// mcp-debugger.ts
export class MCPDebugger {
private logs: any[] = [];
enableDebugMode() {
// Request/response logging
this.mcpClient.interceptors.request.use((config) => {
console.log('MCP Request:', {
url: config.url,
method: config.method,
headers: config.headers,
data: config.data
});
return config;
});
// Metrics collection
this.mcpClient.on('metrics', (metrics) => {
console.log('MCP Metrics:', {
latency: metrics.latency,
throughput: metrics.throughput,
errorRate: metrics.errorRate
});
});
}
// Trace functionality
async trace(operation: string, fn: Function) {
const traceId = generateTraceId();
const startTime = Date.now();
try {
console.log(`[${traceId}] Starting ${operation}`);
const result = await fn();
console.log(`[${traceId}] Completed in ${Date.now() - startTime}ms`);
return result;
} catch (error) {
console.error(`[${traceId}] Failed:`, error);
throw error;
}
}
}
Performance Optimization¶
Batch Processing and Caching¶
// mcp-optimizer.ts
export class MCPOptimizer {
private batchQueue: Map<string, any[]> = new Map();
private cache: LRUCache<string, any>;
constructor() {
this.cache = new LRUCache({
max: 1000,
ttl: 1000 * 60 * 5 // 5 minutes
});
// Execute batch processing periodically
setInterval(() => this.processBatches(), 100);
}
// Batch query
async batchQuery(query: any): Promise<any> {
const cacheKey = this.generateCacheKey(query);
// Check cache
const cached = this.cache.get(cacheKey);
if (cached) return cached;
// Add to batch queue
return new Promise((resolve) => {
const queueKey = query.type;
if (!this.batchQueue.has(queueKey)) {
this.batchQueue.set(queueKey, []);
}
this.batchQueue.get(queueKey)!.push({
query,
resolve,
cacheKey
});
});
}
private async processBatches() {
for (const [type, batch] of this.batchQueue) {
if (batch.length === 0) continue;
// Execute batch request
const queries = batch.map(b => b.query);
const results = await this.mcpClient.batchQuery({
type,
queries
});
// Distribute results to each request
batch.forEach((item, index) => {
const result = results[index];
this.cache.set(item.cacheKey, result);
item.resolve(result);
});
// Clear queue
this.batchQueue.set(type, []);
}
}
}
Summary¶
- MCP Implementation Basics: Implementation of SSE/HTTP transport, authentication setup, and error handling is crucial
- Security: Properly implement multi-layer authentication, signature validation, and rate limiting
- Performance: Optimize response with batch processing, caching, and compression
- Practical Applications: Support diverse use cases including database integration, real-time analysis, and CI/CD integration
- Operational Tips: Establish proper monitoring, debugging tools, and troubleshooting procedures
By leveraging MCP, Claude Code evolves from a simple code generation tool to a powerful development platform that integrates with external systems. Use the implementation patterns introduced here as a reference to start utilizing MCP for your own use cases.