๐ค Claude Code Autonomous & Scheduling Complete Guide - Implementing cron/batch Automated Execution¶
Search Needs Addressed in This Article
- "claude code autonomous" - Building autonomously operating systems
- "claude code cron" - Implementing scheduled execution systems
- "claude code batch" - Automating batch processing
- "claude code scheduling" - Complete automation of task management
๐ฏ Overview¶
With Claude Code's automated execution and scheduling features, you can build fully autonomous development workflows. This guide provides step-by-step practical instructions from implementation to operation of cron, batch processing, and autonomous systems.
๐ Table of Contents¶
- Fundamentals: How Claude Code Automated Execution Works
- Cron Integration: Building Scheduled Execution Systems
- Batch Processing: Automating Large-Scale Tasks
- Autonomous Systems: Fully Autonomous Workflows
- Practical: Production-Grade Scheduling Systems
- Operations, Monitoring & Troubleshooting
๐๏ธ Fundamentals: How Claude Code Automated Execution Works¶
Three Levels of Claude Code Automated Execution¶
graph TD
A[Level 1: Basic Automation] --> B[Auto-approve Commands]
A --> C[File Watch Execution]
D[Level 2: Scheduling] --> E[cron Integration]
D --> F[Batch Processing]
G[Level 3: Autonomous Systems] --> H[Fully Autonomous Execution]
G --> I[Automatic Error Recovery]Basic Automated Execution Configuration¶
# Claude Code auto-approve configuration
claude --auto-approve
# Automation configuration via environment variables
export CLAUDE_AUTO_APPROVE=true
export CLAUDE_BATCH_MODE=true
export CLAUDE_LOGGING=verbose
Automation Configuration via CLAUDE.md¶
## ๐ค Automation Configuration
### Basic Automated Execution
- `--auto-approve`: Auto-approve all tool executions
- `--batch`: Non-interactive execution in batch mode
- `--schedule`: Enable scheduling features
### Execution Level Settings
- `SAFE`: Auto-execute read-only operations only
- `MODERATE`: Auto-execute up to file editing
- `AGGRESSIVE`: Auto-execute all operations
โฐ Cron Integration: Building Scheduled Execution Systems¶
Basic Cron Configuration¶
# Open crontab editor
crontab -e
# Execute code quality check with Claude Code at minute 0 of every hour
0 * * * * /usr/local/bin/claude --auto-approve --batch "Check code quality and report improvement points"
# GA4 analysis & article creation daily at 6:00 AM
0 6 * * * cd /path/to/project && claude --auto-approve "Analyze GA4 data and create article"
# Security check every Monday at 9:00 AM
0 9 * * 1 claude --batch "Scan for security vulnerabilities and fix"
Advanced Cron Script¶
#!/bin/bash
# claude-cron-runner.sh - Claude Code dedicated cron runner
# Configuration
CLAUDE_BIN="/usr/local/bin/claude"
PROJECT_DIR="/home/user/projects/myapp"
LOG_DIR="/var/log/claude-cron"
DATE=$(date +"%Y%m%d_%H%M%S")
# Create log directory
mkdir -p "$LOG_DIR"
# Change to project directory
cd "$PROJECT_DIR" || exit 1
# Claude Code execution function
run_claude_task() {
local task="$1"
local log_file="$LOG_DIR/claude_${DATE}_$(echo "$task" | tr ' ' '_').log"
echo "๐ Starting: $task" | tee "$log_file"
# Execute Claude Code (30 minute timeout)
timeout 1800 "$CLAUDE_BIN" --auto-approve --batch "$task" 2>&1 | tee -a "$log_file"
local exit_code=$?
if [ $exit_code -eq 0 ]; then
echo "โ
Success: $task" | tee -a "$log_file"
# Slack notification (success)
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"โ
Claude Code Task Success: $task\"}" \
"$SLACK_WEBHOOK_URL"
else
echo "โ Failed: $task (Exit code: $exit_code)" | tee -a "$log_file"
# Slack notification (failure)
curl -X POST -H 'Content-type: application/json' \
--data "{\"text\":\"โ Claude Code Task Failed: $task\"}" \
"$SLACK_WEBHOOK_URL"
fi
return $exit_code
}
# Make executable
chmod +x claude-cron-runner.sh
# Register in crontab
echo "0 6 * * * /path/to/claude-cron-runner.sh" | crontab -
Cron Execution in Docker Environment¶
# Dockerfile.claude-cron
FROM python:3.12-slim
# Install Claude Code
RUN pip install claude-code
# Install cron
RUN apt-get update && apt-get install -y cron
# Cron job file
COPY claude-crontab /etc/cron.d/claude-jobs
RUN chmod 0644 /etc/cron.d/claude-jobs
RUN crontab /etc/cron.d/claude-jobs
# Entrypoint
COPY entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
CMD ["/entrypoint.sh"]
#!/bin/bash
# entrypoint.sh
# Start cron
service cron start
# Claude Code daemon
exec claude daemon --auto-approve --schedule
๐ฆ Batch Processing: Automating Large-Scale Tasks¶
Batch Processing Design Pattern¶
#!/usr/bin/env python3
# claude-batch-manager.py
import subprocess
import json
import time
from datetime import datetime
from pathlib import Path
class ClaudeBatchManager:
def __init__(self, config_file="batch-config.json"):
self.config = self.load_config(config_file)
self.log_dir = Path("logs/batch")
self.log_dir.mkdir(parents=True, exist_ok=True)
def load_config(self, config_file):
"""Load batch configuration"""
with open(config_file) as f:
return json.load(f)
def run_batch_task(self, task_config):
"""Execute single batch task"""
task_name = task_config["name"]
prompt = task_config["prompt"]
timeout = task_config.get("timeout", 3600)
retry_count = task_config.get("retry", 3)
for attempt in range(retry_count):
try:
print(f"๐ Running batch task: {task_name} (Attempt {attempt + 1})")
# Execute Claude Code
result = subprocess.run([
"claude", "--auto-approve", "--batch", prompt
], timeout=timeout, capture_output=True, text=True)
if result.returncode == 0:
self.log_success(task_name, result.stdout)
return True
else:
self.log_error(task_name, result.stderr)
except subprocess.TimeoutExpired:
self.log_error(task_name, f"Timeout after {timeout}s")
except Exception as e:
self.log_error(task_name, str(e))
return False
def run_batch_workflow(self, workflow_name):
"""Execute batch workflow"""
workflow = self.config["workflows"][workflow_name]
results = []
for task_config in workflow["tasks"]:
success = self.run_batch_task(task_config)
results.append({
"task": task_config["name"],
"success": success,
"timestamp": datetime.now().isoformat()
})
# Failure handling
if not success and workflow.get("stop_on_failure", True):
print(f"โ Workflow {workflow_name} stopped due to task failure")
break
# Interval
if "interval" in task_config:
time.sleep(task_config["interval"])
self.log_workflow_results(workflow_name, results)
return results
def log_success(self, task_name, output):
"""Log success"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = self.log_dir / f"{task_name}_{timestamp}_success.log"
log_file.write_text(output)
def log_error(self, task_name, error):
"""Log error"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = self.log_dir / f"{task_name}_{timestamp}_error.log"
log_file.write_text(error)
def log_workflow_results(self, workflow_name, results):
"""Log workflow results"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = self.log_dir / f"workflow_{workflow_name}_{timestamp}.json"
log_file.write_text(json.dumps(results, indent=2))
if __name__ == "__main__":
import sys
manager = ClaudeBatchManager()
if len(sys.argv) > 1:
workflow_name = sys.argv[1]
manager.run_batch_workflow(workflow_name)
else:
print("Usage: python claude-batch-manager.py <workflow_name>")
Batch Configuration File¶
{
"workflows": {
"daily_maintenance": {
"description": "Daily maintenance workflow",
"stop_on_failure": false,
"tasks": [
{
"name": "code_quality_check",
"prompt": "Check the quality of all code files and identify improvement points",
"timeout": 1800,
"retry": 2
},
{
"name": "security_scan",
"prompt": "Scan for security vulnerabilities and propose fixes",
"timeout": 2400,
"retry": 3,
"interval": 60
},
{
"name": "documentation_update",
"prompt": "Update README.md and comments to latest state",
"timeout": 1200,
"retry": 1
}
]
},
"analytics_processing": {
"description": "Analytics data processing workflow",
"stop_on_failure": true,
"tasks": [
{
"name": "ga4_analysis",
"prompt": "Analyze GA4 data and create trend report",
"timeout": 3600,
"retry": 2
},
{
"name": "article_generation",
"prompt": "Create technical article based on analysis results",
"timeout": 3600,
"retry": 1,
"interval": 300
}
]
}
}
}
๐ Autonomous Systems: Fully Autonomous Workflows¶
Autonomous System Architecture¶
graph TB
A[Event Detection] --> B[Task Generation]
B --> C[Claude Code Execution]
C --> D[Result Evaluation]
D --> E{Success?}
E -->|Yes| F[Generate Next Task]
E -->|No| G[Error Analysis]
G --> H[Formulate Repair Strategy]
H --> C
F --> I[Continuation Decision]
I --> J{Continue?}
J -->|Yes| A
J -->|No| K[Complete]Autonomous Engine Implementation¶
#!/usr/bin/env python3
# claude-autonomous-engine.py
import asyncio
import subprocess
import json
import time
from datetime import datetime
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class AutonomousEngine:
def __init__(self):
self.is_running = False
self.task_queue = asyncio.Queue()
self.results_log = []
async def start(self):
"""Start autonomous engine"""
self.is_running = True
print("๐ค Starting Claude Code autonomous engine")
# Start concurrent tasks
await asyncio.gather(
self.event_monitor(),
self.task_processor(),
self.health_checker(),
self.adaptive_learner()
)
async def event_monitor(self):
"""Event monitoring"""
observer = Observer()
handler = AutonomousEventHandler(self)
observer.schedule(handler, ".", recursive=True)
observer.start()
while self.is_running:
# Git change monitoring
await self.check_git_changes()
# Error log monitoring
await self.check_error_logs()
# External API change monitoring
await self.check_external_changes()
await asyncio.sleep(30)
async def task_processor(self):
"""Task processing engine"""
while self.is_running:
try:
task = await asyncio.wait_for(
self.task_queue.get(), timeout=60
)
result = await self.execute_claude_task(task)
await self.evaluate_result(task, result)
except asyncio.TimeoutError:
continue
async def execute_claude_task(self, task):
"""Execute Claude Code task"""
prompt = task["prompt"]
context = task.get("context", "")
priority = task.get("priority", "normal")
# Enhance prompt
enhanced_prompt = f"""
{context}
Task: {prompt}
Execution policy:
- Automatically attempt fixes if errors occur
- Propose next actions after completion
- Output results in structured JSON format
"""
try:
process = await asyncio.create_subprocess_exec(
"claude", "--auto-approve", "--batch", enhanced_prompt,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
return {
"success": process.returncode == 0,
"output": stdout.decode(),
"error": stderr.decode(),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"success": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
async def evaluate_result(self, task, result):
"""Evaluate result and determine next action"""
self.results_log.append({
"task": task,
"result": result
})
if result["success"]:
# On success: generate next tasks
next_tasks = await self.generate_next_tasks(task, result)
for next_task in next_tasks:
await self.task_queue.put(next_task)
else:
# On failure: generate repair task
repair_task = await self.generate_repair_task(task, result)
if repair_task:
await self.task_queue.put(repair_task)
async def generate_next_tasks(self, completed_task, result):
"""Generate next tasks"""
# Infer next actions from completed task results
analysis_prompt = f"""
Completed task: {completed_task['prompt']}
Execution result: {result['output']}
Based on this result, propose up to 3 tasks to execute next.
Output each task in JSON format with the following structure:
{{
"prompt": "specific task content",
"priority": "high|normal|low",
"context": "execution context"
}}
"""
# Request Claude Code to generate next tasks
next_result = await self.execute_claude_task({
"prompt": analysis_prompt,
"context": "next task generation"
})
# Parse results (simplified)
return [] # Implementation omitted
async def adaptive_learner(self):
"""Adaptive learning system"""
while self.is_running:
if len(self.results_log) >= 10:
# Learn from past results
await self.analyze_patterns()
await self.update_strategies()
await asyncio.sleep(300) # 5 minute interval
async def health_checker(self):
"""Health check"""
while self.is_running:
# Check system status
await self.check_system_health()
await asyncio.sleep(120)
class AutonomousEventHandler(FileSystemEventHandler):
def __init__(self, engine):
self.engine = engine
def on_modified(self, event):
if not event.is_directory:
# Generate task when file change detected
task = {
"prompt": f"File {event.src_path} has been modified. Analyze the impact and execute necessary actions",
"context": f"File change event: {event.src_path}",
"priority": "normal"
}
asyncio.create_task(self.engine.task_queue.put(task))
if __name__ == "__main__":
engine = AutonomousEngine()
asyncio.run(engine.start())
๐ Practical: Production-Grade Scheduling Systems¶
GitHub Actions Integration¶
# .github/workflows/claude-autonomous.yml
name: Claude Code Autonomous Workflow
on:
schedule:
# Daily at 6:00 AM
- cron: '0 6 * * *'
# Every hour at minute 15
- cron: '15 * * * *'
push:
branches: [ main ]
workflow_dispatch:
jobs:
claude-autonomous:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Claude Code
run: |
pip install claude-code
echo "${{ secrets.CLAUDE_API_KEY }}" > ~/.claude/api_key
- name: Morning Analytics & Article Creation
if: github.event.schedule == '0 6 * * *'
run: |
claude --auto-approve --batch "Analyze GA4 and GSC data to create search intent-focused technical article"
- name: Hourly Code Quality Check
if: github.event.schedule == '15 * * * *'
run: |
claude --auto-approve --batch "Check codebase quality and fix improvement points if any"
- name: Git Push Changed Files
run: |
git config user.name "Claude Code Bot"
git config user.email "claude@example.com"
git add .
git diff --staged --quiet || git commit -m "๐ค Claude Code auto-update - $(date)"
git push
Kubernetes CronJob Implementation¶
# claude-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: claude-autonomous-scheduler
spec:
schedule: "0 6 * * *"
jobTemplate:
spec:
template:
spec:
containers:
- name: claude-runner
image: claude-code:latest
command:
- /bin/bash
- -c
- |
cd /workspace
claude --auto-approve --batch "Analyze entire project and execute optimization and improvements"
env:
- name: CLAUDE_API_KEY
valueFrom:
secretKeyRef:
name: claude-secrets
key: api-key
volumeMounts:
- name: workspace
mountPath: /workspace
volumes:
- name: workspace
persistentVolumeClaim:
claimName: claude-workspace-pvc
restartPolicy: OnFailure
---
apiVersion: v1
kind: Secret
metadata:
name: claude-secrets
type: Opaque
data:
api-key: <base64-encoded-claude-api-key>
AWS Lambda Integration¶
# lambda_function.py
import json
import subprocess
import boto3
from datetime import datetime
def lambda_handler(event, context):
"""AWS Lambda Claude Code execution handler"""
# Scheduled execution from CloudWatch Events
if event.get('source') == 'aws.events':
task_type = event.get('detail-type', 'scheduled-task')
if task_type == 'daily-analytics':
result = run_claude_task(
"Analyze GA4 data and create today's trend article"
)
elif task_type == 'code-quality':
result = run_claude_task(
"Quality check and improve all code in GitHub repository"
)
else:
result = {"error": "Unknown task type"}
# Manual execution via API Gateway
elif event.get('httpMethod'):
body = json.loads(event.get('body', '{}'))
prompt = body.get('prompt', 'Execute default task')
result = run_claude_task(prompt)
else:
result = {"error": "Invalid event source"}
# Record results in CloudWatch Logs
print(f"Claude Code execution result: {json.dumps(result, ensure_ascii=False)}")
return {
'statusCode': 200,
'body': json.dumps(result, ensure_ascii=False),
'headers': {
'Content-Type': 'application/json; charset=utf-8'
}
}
def run_claude_task(prompt):
"""Execute Claude Code task"""
try:
# Execute Claude Code in Lambda environment
result = subprocess.run([
'/opt/bin/claude', '--auto-approve', '--batch', prompt
], capture_output=True, text=True, timeout=300)
return {
"success": result.returncode == 0,
"output": result.stdout,
"error": result.stderr if result.returncode != 0 else None,
"timestamp": datetime.now().isoformat()
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": "Task timeout (300s)",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"success": False,
"error": str(e),
"timestamp": datetime.now().isoformat()
}
๐ง Operations, Monitoring & Troubleshooting¶
Monitoring Dashboard¶
#!/usr/bin/env python3
# claude-monitoring-dashboard.py
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
from datetime import datetime, timedelta
import json
import subprocess
def main():
st.title("๐ค Claude Code Autonomous System Monitoring Dashboard")
# Sidebar
st.sidebar.header("Monitoring Settings")
time_range = st.sidebar.selectbox(
"Time Range", ["1 hour", "6 hours", "24 hours", "7 days"]
)
# Display metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Running Tasks", get_running_tasks())
with col2:
st.metric("Success Rate", f"{get_success_rate():.1f}%")
with col3:
st.metric("Avg Execution Time", f"{get_avg_execution_time():.1f}s")
with col4:
st.metric("Error Rate", f"{get_error_rate():.1f}%")
# Execution history graph
st.subheader("๐ Execution History")
chart_data = get_execution_history(time_range)
fig = go.Figure()
fig.add_trace(go.Scatter(
x=chart_data['timestamp'],
y=chart_data['success_count'],
name='Success',
line=dict(color='green')
))
fig.add_trace(go.Scatter(
x=chart_data['timestamp'],
y=chart_data['error_count'],
name='Error',
line=dict(color='red')
))
st.plotly_chart(fig, use_container_width=True)
# Live logs
st.subheader("๐ Live Logs")
if st.button("Refresh Logs"):
logs = get_recent_logs()
for log in logs:
if log['level'] == 'ERROR':
st.error(f"{log['timestamp']}: {log['message']}")
elif log['level'] == 'WARNING':
st.warning(f"{log['timestamp']}: {log['message']}")
else:
st.info(f"{log['timestamp']}: {log['message']}")
# Manual execution
st.subheader("๐ Manual Execution")
manual_prompt = st.text_area("Claude Code Prompt")
if st.button("Execute"):
if manual_prompt:
with st.spinner("Executing..."):
result = execute_manual_task(manual_prompt)
if result['success']:
st.success("Execution completed")
st.code(result['output'])
else:
st.error("Execution failed")
st.code(result['error'])
def get_running_tasks():
# Implementation omitted
return 3
def get_success_rate():
# Implementation omitted
return 94.2
def execute_manual_task(prompt):
try:
result = subprocess.run([
'claude', '--auto-approve', '--batch', prompt
], capture_output=True, text=True, timeout=300)
return {
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
if __name__ == "__main__":
main()
Troubleshooting Guide¶
Common Issues and Solutions¶
# 1. Claude Code not responding
# Cause: API key expired/network issues
# Solution:
claude auth refresh
ping api.anthropic.com
# 2. Automated execution stopped
# Cause: cron configuration error/process abnormal termination
# Solution:
sudo systemctl status cron
crontab -l
ps aux | grep claude
# 3. Out of memory error
# Cause: Simultaneous execution of too many tasks
# Solution:
# Limit batch size
export CLAUDE_MAX_CONCURRENT_TASKS=3
export CLAUDE_MEMORY_LIMIT=2G
# 4. File permission error
# Cause: Insufficient permissions for executing user
# Solution:
sudo chown -R claude-user:claude-group /path/to/project
chmod +x /path/to/claude-scripts/*
Log Analysis and Debugging¶
# Claude Code dedicated log analysis
tail -f /var/log/claude/autonomous.log | grep -E "(ERROR|FATAL)"
# Performance analysis
claude debug --profiling --duration=3600
# Memory usage monitoring
watch -n 5 'ps aux | grep claude | awk "{sum+=\$4} END {print \"Claude Memory Usage: \" sum \"%\"}"'
๐ฏ Summary¶
With Claude Code's automated execution and scheduling features, you can achieve:
๐ฅ Key Outcomes¶
- Full automation: 24-hour autonomous execution via cron/batch processing
- High reliability: Automatic error recovery and monitoring system integration
- Scalability: Docker/Kubernetes/AWS Lambda support
- Operational efficiency: Continuous improvement without human intervention
๐ Expected Benefits¶
- Development efficiency improved by 300%
- Error response time reduced by 80%
- Code quality continuously improved
- Operational costs reduced by 50%
๐ Next Steps¶
- Start with basic cron configuration
- Expand to batch processing system
- Build fully autonomous system
- Establish monitoring and operations framework
Related Articles
- โก Claude Code Automated Execution Guide - Basic automated execution configuration
- ๐ณ Claude Code Docker Integration - Execution in container environments
- ๐ GA4 Data-Driven Development - Implementing analytics automation
I hope this article helps you build Claude Code's automated execution and scheduling systems. Feel free to reach out with questions or suggestions for improvement!