
Implementing Multi-Agent SEO Systems: Complete Code Implementation
From architecture to running code: a complete implementation of distributed SEO intelligence
Building on the architectural design, this guide walks through working implementations of each system component, with abridged, production-oriented code excerpts below and links to the complete source on GitHub.
Implementation Stack
graph TB
subgraph "Application Layer"
API[FastAPI Web Interface]
CLI[Command Line Interface]
WEB[Web Dashboard]
end
subgraph "Agent Framework"
CREW[CrewAI Framework]
LANG[LangChain Tools]
ASYNC[AsyncIO Runtime]
end
subgraph "Data & Persistence"
PG[PostgreSQL Database]
REDIS[Redis Cache/Queue]
GRAPH[Neo4j Knowledge Graph]
end
subgraph "External Integration"
GOOGLE[Google APIs]
SEO_APIS[SEO Tool APIs]
WEB_SCRAPE[Web Scraping]
end
API --> CREW
CLI --> CREW
WEB --> API
CREW --> LANG
LANG --> ASYNC
CREW --> PG
CREW --> REDIS
CREW --> GRAPH
LANG --> GOOGLE
LANG --> SEO_APIS
LANG --> WEB_SCRAPE
1️⃣ Core Framework Implementation
Project Structure
seo_intelligence/
├── agents/
│ ├── __init__.py
│ ├── base_agent.py
│ ├── performance_monitor.py
│ ├── technical_audit.py
│ ├── competitor_intelligence.py
│ ├── link_analysis.py
│ └── content_optimization.py
├── tools/
│ ├── __init__.py
│ ├── google_apis.py
│ ├── seo_apis.py
│ ├── web_crawler.py
│ └── data_processors.py
├── core/
│ ├── __init__.py
│ ├── orchestrator.py
│ ├── message_bus.py
│ ├── event_system.py
│ └── knowledge_base.py
├── models/
│ ├── __init__.py
│ ├── events.py
│ ├── tasks.py
│ └── insights.py
├── config/
│ ├── __init__.py
│ ├── settings.py
│ └── logging.py
├── api/
│ ├── __init__.py
│ ├── main.py
│ └── routes/
├── tests/
├── docker/
├── scripts/
└── requirements.txt
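For orientation, config/settings.py from the tree above would typically centralize connection strings and credentials for the persistence and API layers. The sketch below is a hypothetical, minimal version driven by environment variables; the field names and defaults are assumptions, not the repository's actual settings module.
import os
from dataclasses import dataclass, field

@dataclass(frozen=True)
class Settings:
    # Connection strings for the persistence layer (defaults are illustrative)
    postgres_dsn: str = field(default_factory=lambda: os.getenv(
        "POSTGRES_DSN", "postgresql://seo:seo@localhost:5432/seo_intelligence"))
    redis_url: str = field(default_factory=lambda: os.getenv(
        "REDIS_URL", "redis://localhost:6379/0"))
    neo4j_uri: str = field(default_factory=lambda: os.getenv(
        "NEO4J_URI", "bolt://localhost:7687"))
    # Path to the Google service account key used by the API clients
    google_service_account: str = field(default_factory=lambda: os.getenv(
        "GOOGLE_SERVICE_ACCOUNT_FILE", "credentials/service_account.json"))

settings = Settings()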
Base Agent Framework
🔽 View BaseAgent Implementation
from abc import ABC
from crewai import Agent  # CrewAI agent wrapper

class BaseAgent(ABC):
    def __init__(self, config: AgentConfig, message_bus: MessageBus):
        self.config = config
        self.message_bus = message_bus
        self.crew_agent = Agent(
            role=config.role, goal=config.goal,
            tools=self._initialize_tools()
        )
        # Full implementation available on GitHub
🔗 Complete BaseAgent Implementation
Key Features:
- Abstract base class for all SEO agents
- CrewAI integration with task management
- Event-driven inter-agent communication
- Configurable concurrency and error handling
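To make the configuration and event-handling hooks concrete, here is a hedged sketch of what an AgentConfig and a small event-dispatch helper might look like. The names (AgentConfig fields, EventHandlerMixin, on, dispatch) are illustrative assumptions rather than the repository's exact API.
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Dict, List

@dataclass
class AgentConfig:
    role: str
    goal: str
    max_concurrent_tasks: int = 5                 # bounds parallel work per agent
    subscribed_events: List[str] = field(default_factory=list)

class EventHandlerMixin:
    """Illustrative event-routing helper an agent could mix in (hypothetical)."""
    def __init__(self, max_concurrent: int = 5):
        self._handlers: Dict[str, Callable] = {}
        self._semaphore = asyncio.Semaphore(max_concurrent)

    def on(self, event_type: str, handler: Callable):
        # Register a coroutine to run whenever this event type arrives
        self._handlers[event_type] = handler

    async def dispatch(self, event_type: str, payload: dict):
        handler = self._handlers.get(event_type)
        if handler is None:
            return
        async with self._semaphore:               # configurable concurrency limit
            await handler(payload)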
Message Bus Implementation
import json
from dataclasses import asdict
import redis.asyncio as redis  # async client so xadd can be awaited

class MessageBus:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.subscribers = {}

    async def publish(self, event: SEOEvent):
        await self.redis.xadd(f"seo_events:{event.type.value}",
                              fields={'data': json.dumps(asdict(event))}, id='*')
🔗 Complete MessageBus Implementation
Key Features:
- Redis Streams for reliable event delivery
- Asynchronous event processing
- Type-safe event routing and dispatch
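The consumer side is not shown above. A minimal sketch of how a subscriber might read the same Redis Streams (assuming redis.asyncio and stream names of the form seo_events:<type>, as used by publish) could look like this; the function name and polling parameters are assumptions.
import asyncio
import json
import redis.asyncio as redis

async def consume_events(redis_url: str, event_type: str, handler):
    """Poll a Redis Stream and hand each decoded event to `handler` (illustrative)."""
    client = redis.from_url(redis_url, decode_responses=True)
    stream = f"seo_events:{event_type}"
    last_id = "$"  # only read events published after we start listening
    while True:
        # Block for up to 5 seconds waiting for new entries
        entries = await client.xread({stream: last_id}, block=5000, count=10)
        for _stream, messages in entries:
            for message_id, fields in messages:
                await handler(json.loads(fields["data"]))
                last_id = message_id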
2️⃣ Agent Implementations
Performance Monitor Agent
class PerformanceMonitorAgent(BaseAgent):
    async def process_domain(self, domain: str) -> Dict[str, Any]:
        # Parallel data collection from Google APIs
        tasks = [
            self._collect_gsc_data(domain),
            self._collect_ga_data(domain),
            self._analyze_rankings(domain)
        ]
        gsc_data, ga_data, rankings = await asyncio.gather(*tasks)
        return await self._detect_anomalies(gsc_data, ga_data)
🔽 View Advanced Traffic Analysis
async def _detect_anomalies(self, gsc_data, ga_data):
    # Statistical anomaly detection (excerpt): flag drops of 20%+ against baseline
    if recent_traffic < baseline_traffic * 0.8:
        await self.publish_insight('traffic_drop', domain, {
            'severity': 'HIGH',
            'magnitude': drop_percentage
        })
🔗 Complete PerformanceMonitor Implementation
Key Analytics:
- Google Search Console & Analytics integration
- Statistical anomaly detection (2σ threshold; sketched below)
- Real-time traffic pattern analysis
- Automated alert generation for 20%+ drops
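As a rough illustration of the 2σ check noted above, the detection could compare the latest day against a rolling baseline. This is a simplified sketch with assumed function and field names, not the exact production logic.
import statistics
from typing import List, Optional

def detect_traffic_anomaly(daily_clicks: List[float],
                           sigma_threshold: float = 2.0,
                           drop_threshold: float = 0.20) -> Optional[dict]:
    """Flag the most recent day if it falls 2σ below, or 20%+ under, the baseline."""
    if len(daily_clicks) < 8:
        return None  # need a reasonable baseline window first
    baseline, latest = daily_clicks[:-1], daily_clicks[-1]
    mean = statistics.mean(baseline)
    stdev = statistics.pstdev(baseline)
    drop_pct = (mean - latest) / mean if mean else 0.0
    if latest < mean - sigma_threshold * stdev or drop_pct > drop_threshold:
        return {
            'type': 'traffic_drop',
            'severity': 'HIGH' if drop_pct > drop_threshold else 'MEDIUM',
            'magnitude': round(drop_pct * 100, 1),
        }
    return None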
Technical Audit Agent
class TechnicalAuditAgent(BaseAgent):
    async def process_domain(self, domain: str) -> Dict[str, Any]:
        # Parallel technical analysis
        audit_tasks = [
            self._crawl_and_analyze(domain),
            self._analyze_core_web_vitals(domain),
            self._validate_structured_data(domain)
        ]
        results = await asyncio.gather(*audit_tasks)
        return self._prioritize_issues(results)
🔽 View Technical Analysis Logic
def _analyze_page_elements(self, page_data):
    issues = []
    # Title analysis
    if not page_data.get('title'):
        issues.append({'type': 'missing_title', 'severity': 'HIGH'})
    # Core Web Vitals
    if page_data.get('load_time', 0) > 3.0:
        issues.append({'type': 'slow_load', 'severity': 'HIGH'})
    return self._prioritize_issues(issues)
🔗 Complete TechnicalAudit Implementation
Audit Coverage:
- PageSpeed Insights integration for Core Web Vitals
- Comprehensive crawling with BeautifulSoup
- Structured data validation
- Priority scoring algorithm (severity × impact)
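The priority score from the last bullet (severity × impact) could be computed along these lines; the weight tables and issue types below are illustrative assumptions, not the repository's values.
# Hypothetical severity and impact weights; the real implementation may differ.
SEVERITY_WEIGHTS = {'LOW': 1, 'MEDIUM': 2, 'HIGH': 3, 'CRITICAL': 4}
IMPACT_WEIGHTS = {'missing_title': 3, 'slow_load': 4, 'broken_schema': 2}

def prioritize_issues(issues: list[dict]) -> list[dict]:
    """Order audit findings by severity x estimated impact (illustrative)."""
    for issue in issues:
        severity = SEVERITY_WEIGHTS.get(issue.get('severity', 'LOW'), 1)
        impact = IMPACT_WEIGHTS.get(issue.get('type'), 1)
        issue['priority_score'] = severity * impact
    return sorted(issues, key=lambda i: i['priority_score'], reverse=True)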
Implementation Flow Diagram
sequenceDiagram
participant USER as User/Scheduler
participant ORCH as Orchestrator
participant PM as Performance Monitor
participant TA as Technical Audit
participant CI as Competitor Intelligence
participant KB as Knowledge Base
participant MSG as Message Bus
USER->>ORCH: Analyze domain "example.com"
ORCH->>PM: Start performance analysis
ORCH->>TA: Start technical audit
ORCH->>CI: Start competitor analysis
PM->>MSG: Collect GSC data
PM->>MSG: Collect GA data
TA->>MSG: Crawl website
CI->>MSG: Query competitor APIs
PM->>PM: Detect traffic anomaly
PM->>MSG: Publish TrafficAnomalyEvent
MSG->>TA: Route event to Technical Audit
MSG->>CI: Route event to Competitor Intelligence
TA->>TA: Emergency technical audit
CI->>CI: Competitor movement analysis
TA->>MSG: Publish findings
CI->>MSG: Publish findings
MSG->>KB: Correlate all findings
KB->>KB: Generate root cause analysis
KB->>ORCH: Return comprehensive analysis
ORCH->>USER: Deliver insights and recommendations
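Condensed into code, the flow above might be driven by an orchestrator entry point like the following sketch. It calls the agents directly for brevity, whereas the real orchestrator.py coordinates them through the message bus; the argument and method names here are assumptions.
import asyncio

async def analyze_domain(domain: str, performance, technical, competitor, knowledge_base):
    """Fan out to the three agents in parallel, then correlate their findings (sketch)."""
    perf_result, tech_result, comp_result = await asyncio.gather(
        performance.process_domain(domain),
        technical.process_domain(domain),
        competitor.process_domain(domain),
    )
    # The knowledge base correlates findings into a root-cause analysis
    return await knowledge_base.correlate(
        domain=domain,
        findings=[perf_result, tech_result, comp_result],
    )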
3️⃣ Data Integration & Processing
Google APIs Integration
from datetime import datetime

import pandas as pd
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build

class GoogleSearchConsoleAPI:
    def __init__(self, service_account_path: str):
        self.credentials = Credentials.from_service_account_file(service_account_path)
        self.service = build('searchconsole', 'v1', credentials=self.credentials)
        self.rate_limiter = AsyncRateLimit(calls=1200, period=60)

    async def get_performance_data(self, site_url: str, start_date: datetime,
                                   end_date: datetime) -> pd.DataFrame:
        await self.rate_limiter.acquire()
        # API call and DataFrame conversion logic
        return pd.DataFrame(data)
🔽 View Complete API Integration
class AsyncRateLimit:
    async def acquire(self):
        async with self.lock:
            now = datetime.now()
            # Sliding-window limit: wait until the oldest call falls outside the period
            if len(self.call_times) >= self.calls:
                sleep_time = self.period - (now - self.call_times[0]).total_seconds()
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
🔗 Complete Google APIs Implementation
Integration Features:
- Service account authentication for GSC & GA4
- Async rate limiting (1200 calls/minute)
- Pandas DataFrame output for analysis
- Error handling and retry logic
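For reference, the query elided above ('# API call and DataFrame conversion logic') typically boils down to a searchanalytics.query call against the Search Console API. The helper below is a hedged sketch; the exact dimensions and column handling in the repository may differ.
import pandas as pd

def query_search_analytics(service, site_url: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Run a Search Console searchanalytics.query and flatten rows into a DataFrame."""
    request = {
        'startDate': start_date,          # e.g. '2024-01-01'
        'endDate': end_date,
        'dimensions': ['date', 'query', 'page'],
        'rowLimit': 25000,
    }
    response = service.searchanalytics().query(siteUrl=site_url, body=request).execute()
    rows = response.get('rows', [])
    return pd.DataFrame([
        {
            'date': row['keys'][0], 'query': row['keys'][1], 'page': row['keys'][2],
            'clicks': row['clicks'], 'impressions': row['impressions'],
            'ctr': row['ctr'], 'position': row['position'],
        }
        for row in rows
    ])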
Web Crawler Implementation
class WebCrawler:
    async def crawl_domain(self, domain: str) -> Dict[str, Any]:
        async with aiohttp.ClientSession() as session:
            # Crawl in batches with respect for robots.txt
            batch_results = await asyncio.gather(*[
                self._crawl_page(url, domain) for url in batch
            ])
            return self._analyze_crawl_results(batch_results)

    def _extract_seo_elements(self, soup):
        return {
            'title': self._extract_title(soup),
            'meta_description': self._extract_meta_description(soup),
            'h1_tags': soup.find_all('h1'),
            'schema_markup': self._check_schema_markup(soup)
        }
🔽 View Detailed Crawling Logic
@dataclass
class CrawlConfig:
    max_pages: int = 1000
    delay_between_requests: float = 1.0
    respect_robots_txt: bool = True
    user_agent: str = "SEO-Intelligence-Bot/1.0"

async def _crawl_page(self, url, domain, config):
    # Handle redirects, extract technical elements
    if response.status in [301, 302, 303, 307, 308]:
        return {'redirect_url': str(response.url)}
    soup = BeautifulSoup(content, 'html.parser')
    return self._extract_technical_elements(soup)
🔗 Complete WebCrawler Implementation
Crawler Features:
- Async batched crawling with configurable concurrency
- Robots.txt compliance and rate limiting (sketched below)
- Comprehensive SEO element extraction
- Redirect handling and error recovery
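Robots.txt compliance can be handled with the standard library. A minimal sketch, assuming the crawler's SEO-Intelligence-Bot/1.0 user agent and a helper name chosen here for illustration:
from urllib.parse import urljoin
from urllib.robotparser import RobotFileParser

def build_robots_checker(domain: str, user_agent: str = "SEO-Intelligence-Bot/1.0"):
    """Return a can_fetch(url) callable backed by the domain's robots.txt (sketch)."""
    parser = RobotFileParser()
    parser.set_url(urljoin(f"https://{domain}", "/robots.txt"))
    parser.read()  # fetches and parses robots.txt (blocking; run in an executor if needed)

    def can_fetch(url: str) -> bool:
        return parser.can_fetch(user_agent, url)

    return can_fetch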
4️⃣ Deployment & Operations
Docker Configuration
FROM python:3.11-slim
WORKDIR /app
# Install dependencies and copy code
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Security: non-root user
RUN useradd --create-home app && chown -R app:app /app
USER app
EXPOSE 8000
CMD ["python", "-m", "api.main"]
Infrastructure Stack:
- PostgreSQL: Structured data and task queues
- Redis: Message bus and caching layer
- Neo4j: Knowledge graph relationships
- Agent Workers: Scaled processing (3 replicas)
- Scheduler: Automated task orchestration
System Monitoring
class SystemMonitor:
    async def start_monitoring(self):
        monitoring_tasks = [
            self._collect_system_metrics(),
            self._monitor_agent_health(),
            self._generate_alerts()
        ]
        await asyncio.gather(*monitoring_tasks)

    async def _generate_alerts(self):
        # Alert thresholds: CPU > 80%, Memory > 85%, Error rate > 10%
        if latest.error_rate > 0.1:
            await self._send_alert('high_error_rate', 'CRITICAL')
🔽 View Complete Monitoring System
@dataclass
class SystemMetrics:
    cpu_usage: float
    memory_usage: float
    active_agents: int
    tasks_in_queue: int
    error_rate: float
    response_time_avg: float

# Metrics collected every minute; alert thresholds evaluated every 5 minutes
async def _collect_system_metrics(self):  # SystemMonitor method (excerpt)
    cpu_usage = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory()
    await self.metrics_store.store_metrics(metrics)
🔗 Complete Monitoring Implementation
Monitoring Features:
- System Metrics: CPU, memory, disk usage tracking
- Application Metrics: Agent health, queue sizes, response times
- Automated Alerts: Configurable thresholds with severity levels
- Performance History: Rolling metrics for trend analysis
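The rolling history and alert thresholds described above might be implemented roughly as follows; the window size and class name are assumptions, while the CPU, memory, and error-rate thresholds mirror the values mentioned earlier.
from collections import deque

class MetricsHistory:
    """Keep a rolling window of metrics and evaluate simple alert thresholds (sketch)."""
    def __init__(self, window: int = 60):
        self.window = deque(maxlen=window)   # e.g. the last 60 one-minute samples

    def add(self, metrics: "SystemMetrics"):
        self.window.append(metrics)

    def check_alerts(self) -> list:
        if not self.window:
            return []
        alerts = []
        latest = self.window[-1]
        if latest.cpu_usage > 80:
            alerts.append(('high_cpu', 'WARNING'))
        if latest.memory_usage > 85:
            alerts.append(('high_memory', 'WARNING'))
        if latest.error_rate > 0.10:
            alerts.append(('high_error_rate', 'CRITICAL'))
        return alerts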
🚀 Quick Start
# Clone the complete implementation
git clone https://github.com/avinash-gupta-rdz/ai-seo-multi-agent-system
cd ai-seo-multi-agent-system
# Start with Docker
docker-compose up -d
# Initialize agents for domain analysis
python -m core.orchestrator --domain example.com
📈 System Architecture
This implementation demonstrates advanced AI engineering patterns:
- Event-driven coordination between specialized agents
- Async Python patterns for high-performance data processing
- Distributed deployment with Docker and message queuing
- Production monitoring with automated alerting
- Scalable architecture supporting multiple concurrent analyses
🔗 Complete Implementation Repository
The repository is ready for production deployment, with comprehensive error handling, monitoring, and scalability built in.