Implementing Multi-Agent SEO Systems: Complete Code Implementation

From architecture to running code - complete implementation of distributed SEO intelligence

Building on the architectural design, this guide provides complete working implementations of each system component with production-ready code examples.

Implementation Stack

graph TB
    subgraph "Application Layer"
        API[FastAPI Web Interface]
        CLI[Command Line Interface]  
        WEB[Web Dashboard]
    end
    
    subgraph "Agent Framework"
        CREW[CrewAI Framework]
        LANG[LangChain Tools]
        ASYNC[AsyncIO Runtime]
    end
    
    subgraph "Data & Persistence"
        PG[PostgreSQL Database]
        REDIS[Redis Cache/Queue]
        GRAPH[Neo4j Knowledge Graph]
    end
    
    subgraph "External Integration"
        GOOGLE[Google APIs]
        SEO_APIS[SEO Tool APIs]
        WEB_SCRAPE[Web Scraping]
    end
    
    API --> CREW
    CLI --> CREW
    WEB --> API
    
    CREW --> LANG
    LANG --> ASYNC
    
    CREW --> PG
    CREW --> REDIS
    CREW --> GRAPH
    
    LANG --> GOOGLE
    LANG --> SEO_APIS  
    LANG --> WEB_SCRAPE

1️⃣ Core Framework Implementation

Project Structure

seo_intelligence/
├── agents/
│   ├── __init__.py
│   ├── base_agent.py
│   ├── performance_monitor.py
│   ├── technical_audit.py
│   ├── competitor_intelligence.py
│   ├── link_analysis.py
│   └── content_optimization.py
├── tools/
│   ├── __init__.py
│   ├── google_apis.py
│   ├── seo_apis.py
│   ├── web_crawler.py
│   └── data_processors.py
├── core/
│   ├── __init__.py
│   ├── orchestrator.py
│   ├── message_bus.py
│   ├── event_system.py
│   └── knowledge_base.py
├── models/
│   ├── __init__.py
│   ├── events.py
│   ├── tasks.py
│   └── insights.py
├── config/
│   ├── __init__.py
│   ├── settings.py
│   └── logging.py
├── api/
│   ├── __init__.py
│   ├── main.py
│   └── routes/
├── tests/
├── docker/
├── scripts/
└── requirements.txt
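
The config/ package centralizes runtime settings for the agents and data stores. As a minimal sketch of what config/settings.py could look like (the environment-variable names and defaults here are illustrative assumptions, not the repository's actual configuration):

# config/settings.py - illustrative sketch; field names and defaults are assumptions
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    postgres_dsn: str = os.getenv("POSTGRES_DSN", "postgresql://localhost/seo_intelligence")
    neo4j_uri: str = os.getenv("NEO4J_URI", "bolt://localhost:7687")
    google_service_account: str = os.getenv("GOOGLE_SERVICE_ACCOUNT", "service_account.json")
    max_concurrent_agents: int = int(os.getenv("MAX_CONCURRENT_AGENTS", "5"))

settings = Settings()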

Base Agent Framework

🔽 View BaseAgent Implementation
from abc import ABC

from crewai import Agent

class BaseAgent(ABC):
    def __init__(self, config: AgentConfig, message_bus: MessageBus):
        self.config = config
        self.message_bus = message_bus
        self.crew_agent = Agent(
            role=config.role, goal=config.goal,
            tools=self._initialize_tools()
        )
        # Full implementation available on GitHub

🔗 Complete BaseAgent Implementation

Key Features:

  • Abstract base class for all SEO agents
  • CrewAI integration with task management
  • Event-driven inter-agent communication
  • Configurable concurrency and error handling
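
To make the abstract contract and the event-driven communication listed above concrete, here is a minimal sketch of a subclass. The subscribe() method on the message bus and the _fetch_backlink_profile() helper are assumptions for illustration; only publish() is shown in this article.

# Illustrative sketch; subscribe() and _fetch_backlink_profile() are assumed, not shown above
class LinkAnalysisAgent(BaseAgent):
    def _initialize_tools(self):
        return []  # CrewAI/LangChain tools would be wired up here

    async def process_domain(self, domain: str) -> dict:
        backlinks = await self._fetch_backlink_profile(domain)  # hypothetical helper
        return {"domain": domain, "backlinks": backlinks}

    def register_handlers(self, message_bus: MessageBus):
        # React to traffic anomalies published by the Performance Monitor agent
        message_bus.subscribe("traffic_drop", self.on_traffic_drop)

    async def on_traffic_drop(self, event: dict):
        await self.process_domain(event["domain"])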

Message Bus Implementation

import json
from dataclasses import asdict

import redis.asyncio as redis

class MessageBus:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.subscribers = {}

    async def publish(self, event: SEOEvent):
        # Append the event to a Redis Stream keyed by event type
        await self.redis.xadd(f"seo_events:{event.type.value}",
                              fields={'data': json.dumps(asdict(event))}, id='*')

🔗 Complete MessageBus Implementation

Key Features:

  • Redis Streams for reliable event delivery
  • Asynchronous event processing
  • Type-safe event routing and dispatch
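
To complement the publish path shown above, a consumer loop over the same Redis Stream might look like the following sketch (the stream naming matches the snippet above; the handler dispatch is an illustrative assumption):

import json

import redis.asyncio as redis

async def consume_events(redis_url: str, event_type: str, handler):
    # Block on the per-type stream and dispatch each new event to a handler
    client = redis.from_url(redis_url)
    last_id = "$"  # only deliver events published after we start listening
    while True:
        entries = await client.xread({f"seo_events:{event_type}": last_id}, block=5000)
        for _stream, messages in entries:
            for message_id, fields in messages:
                last_id = message_id
                await handler(json.loads(fields[b"data"]))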

2️⃣ Agent Implementations

Performance Monitor Agent

class PerformanceMonitorAgent(BaseAgent):
    async def process_domain(self, domain: str) -> Dict[str, Any]:
        # Parallel data collection from Google APIs
        tasks = [
            self._collect_gsc_data(domain),
            self._collect_ga_data(domain),
            self._analyze_rankings(domain)
        ]

        gsc_data, ga_data, rankings = await asyncio.gather(*tasks)
        return await self._detect_anomalies(gsc_data, ga_data, domain)
🔽 View Advanced Traffic Analysis
async def _detect_anomalies(self, gsc_data, ga_data, domain):
    # recent_traffic and baseline_traffic are aggregated from the collected data
    # in the full implementation; the 0.8 factor flags drops of 20% or more
    if recent_traffic < baseline_traffic * 0.8:
        drop_percentage = (1 - recent_traffic / baseline_traffic) * 100
        await self.publish_insight('traffic_drop', domain, {
            'severity': 'HIGH',
            'magnitude': drop_percentage
        })

🔗 Complete PerformanceMonitor Implementation

Key Analytics:

  • Google Search Console & Analytics integration
  • Statistical anomaly detection (2σ threshold)
  • Real-time traffic pattern analysis
  • Automated alert generation for 20%+ drops
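
As a concrete illustration of the 2σ check listed above, daily click counts could be tested against their trailing baseline like this (the column choice and window size are assumptions):

import pandas as pd

def is_traffic_anomaly(daily_clicks: pd.Series, window: int = 28) -> bool:
    # Compare the latest day against the mean and standard deviation of the trailing window
    baseline = daily_clicks.iloc[-(window + 1):-1]
    mean, std = baseline.mean(), baseline.std()
    latest = daily_clicks.iloc[-1]
    # Flag values more than two standard deviations below the baseline mean
    return bool(std > 0 and latest < mean - 2 * std)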

Technical Audit Agent

class TechnicalAuditAgent(BaseAgent):
    async def process_domain(self, domain: str) -> Dict[str, Any]:
        # Parallel technical analysis
        audit_tasks = [
            self._crawl_and_analyze(domain),
            self._analyze_core_web_vitals(domain), 
            self._validate_structured_data(domain)
        ]
        
        results = await asyncio.gather(*audit_tasks)
        return self._prioritize_issues(results)
🔽 View Technical Analysis Logic
def _analyze_page_elements(self, page_data):
    issues = []
    
    # Title analysis
    if not page_data.get('title'):
        issues.append({'type': 'missing_title', 'severity': 'HIGH'})
    
    # Core Web Vitals
    if page_data.get('load_time', 0) > 3.0:
        issues.append({'type': 'slow_load', 'severity': 'HIGH'})
        
    return self._prioritize_issues(issues)

🔗 Complete TechnicalAudit Implementation

Audit Coverage:

  • PageSpeed Insights integration for Core Web Vitals
  • Comprehensive crawling with BeautifulSoup
  • Structured data validation
  • Priority scoring algorithm (severity × impact)
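
The priority score mentioned above (severity × impact) could be computed along these lines (the weights and field names are illustrative assumptions):

# Illustrative severity weights; the real scoring model may differ
SEVERITY_WEIGHTS = {"LOW": 1, "MEDIUM": 2, "HIGH": 3, "CRITICAL": 4}

def prioritize_issues(issues: list[dict]) -> list[dict]:
    # Score each issue as severity weight times estimated impact (e.g. pages affected)
    for issue in issues:
        severity = SEVERITY_WEIGHTS.get(issue.get("severity", "LOW"), 1)
        impact = issue.get("affected_pages", 1)
        issue["priority_score"] = severity * impact
    return sorted(issues, key=lambda i: i["priority_score"], reverse=True)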

Implementation Flow Diagram

sequenceDiagram
    participant USER as User/Scheduler
    participant ORCH as Orchestrator
    participant PM as Performance Monitor
    participant TA as Technical Audit
    participant CI as Competitor Intelligence
    participant KB as Knowledge Base
    participant MSG as Message Bus
    
    USER->>ORCH: Analyze domain "example.com"
    ORCH->>PM: Start performance analysis
    ORCH->>TA: Start technical audit
    ORCH->>CI: Start competitor analysis
    
    PM->>MSG: Collect GSC data
    PM->>MSG: Collect GA data
    TA->>MSG: Crawl website
    CI->>MSG: Query competitor APIs
    
    PM->>PM: Detect traffic anomaly
    PM->>MSG: Publish TrafficAnomalyEvent
    
    MSG->>TA: Route event to Technical Audit
    MSG->>CI: Route event to Competitor Intelligence
    
    TA->>TA: Emergency technical audit
    CI->>CI: Competitor movement analysis
    
    TA->>MSG: Publish findings
    CI->>MSG: Publish findings
    
    MSG->>KB: Correlate all findings
    KB->>KB: Generate root cause analysis
    
    KB->>ORCH: Return comprehensive analysis
    ORCH->>USER: Deliver insights and recommendations
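
The fan-out at the top of this sequence - the orchestrator starting all three agents in parallel - maps naturally onto asyncio.gather. A sketch, assuming each agent exposes the process_domain coroutine shown earlier (the function name and result shape are assumptions):

import asyncio

async def analyze_domain(domain: str, agents: list) -> dict:
    # Run every registered agent concurrently and collect their findings;
    # correlation into a root-cause analysis happens downstream in the knowledge base
    results = await asyncio.gather(
        *(agent.process_domain(domain) for agent in agents),
        return_exceptions=True,  # one failing agent should not abort the whole run
    )
    return {type(agent).__name__: result for agent, result in zip(agents, results)}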

3️⃣ Data Integration & Processing

Google APIs Integration

from datetime import datetime

import pandas as pd
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build

class GoogleSearchConsoleAPI:
    def __init__(self, service_account_path: str):
        self.credentials = Credentials.from_service_account_file(service_account_path)
        self.service = build('searchconsole', 'v1', credentials=self.credentials)
        self.rate_limiter = AsyncRateLimit(calls=1200, period=60)

    async def get_performance_data(self, site_url: str, start_date: datetime,
                                   end_date: datetime) -> pd.DataFrame:
        await self.rate_limiter.acquire()
        # Query the Search Analytics endpoint and convert the rows to a DataFrame
        return pd.DataFrame(data)
🔽 View Complete API Integration
class AsyncRateLimit:
    def __init__(self, calls: int, period: int):
        self.calls, self.period = calls, period
        self.call_times, self.lock = [], asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = datetime.now()
            self.call_times = [t for t in self.call_times if (now - t).total_seconds() < self.period]  # drop calls outside the window
            if len(self.call_times) >= self.calls:
                sleep_time = self.period - (now - self.call_times[0]).total_seconds()
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
            self.call_times.append(now)

🔗 Complete Google APIs Implementation

Integration Features:

  • Service account authentication for GSC & GA4
  • Async rate limiting (1200 calls/minute)
  • Pandas DataFrame output for analysis
  • Error handling and retry logic
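
The retry logic noted above would typically follow an exponential-backoff pattern; here is a sketch under the assumption that transient Google API failures surface as HttpError with a 429/5xx status (attempt counts and delays are illustrative):

import asyncio

from googleapiclient.errors import HttpError

async def with_retries(call, max_attempts: int = 4, base_delay: float = 1.0):
    # Retry transient API failures with exponentially increasing delays
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except HttpError as exc:
            if attempt == max_attempts or exc.resp.status not in (429, 500, 503):
                raise
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))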

Web Crawler Implementation

class WebCrawler:
    async def crawl_domain(self, domain: str) -> Dict[str, Any]:
        async with aiohttp.ClientSession() as session:
            # Crawl in batches with respect for robots.txt; each 'batch' is the
            # next group of URLs taken from the crawl frontier
            batch_results = await asyncio.gather(*[
                self._crawl_page(url, domain) for url in batch
            ])

            return self._analyze_crawl_results(batch_results)

    def _extract_seo_elements(self, soup):
        return {
            'title': self._extract_title(soup),
            'meta_description': self._extract_meta_description(soup),
            'h1_tags': soup.find_all('h1'),
            'schema_markup': self._check_schema_markup(soup)
        }
🔽 View Detailed Crawling Logic
from dataclasses import dataclass

@dataclass
class CrawlConfig:
    max_pages: int = 1000
    delay_between_requests: float = 1.0
    respect_robots_txt: bool = True
    user_agent: str = "SEO-Intelligence-Bot/1.0"

async def _crawl_page(self, url, domain, config):
    # 'response' and 'content' come from the aiohttp fetch performed earlier in
    # the method; redirects are recorded, other pages are parsed for SEO elements
    if response.status in [301, 302, 303, 307, 308]:
        return {'redirect_url': str(response.url)}

    soup = BeautifulSoup(content, 'html.parser')
    return self._extract_technical_elements(soup)

🔗 Complete WebCrawler Implementation

Crawler Features:

  • Async batched crawling with configurable concurrency
  • Robots.txt compliance and rate limiting
  • Comprehensive SEO element extraction
  • Redirect handling and error recovery
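
Robots.txt compliance, listed above, can be handled with the standard library; here is a sketch of the kind of check the crawler might run before fetching each URL (the wiring into WebCrawler is an assumption):

from urllib import robotparser
from urllib.parse import urlparse

def build_robots_checker(domain: str, user_agent: str = "SEO-Intelligence-Bot/1.0"):
    # Parse the site's robots.txt once, then reuse the parser for every URL in the crawl
    parser = robotparser.RobotFileParser()
    parser.set_url(f"https://{domain}/robots.txt")
    parser.read()

    def is_allowed(url: str) -> bool:
        return parser.can_fetch(user_agent, urlparse(url).path or "/")

    return is_allowed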

4️⃣ Deployment & Operations

Docker Configuration

FROM python:3.11-slim
WORKDIR /app

# Install dependencies and copy code
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# Security: non-root user
RUN useradd --create-home app && chown -R app:app /app
USER app

EXPOSE 8000
CMD ["python", "-m", "api.main"]

🔗 Complete Docker Setup

Infrastructure Stack:

  • PostgreSQL: Structured data and task queues
  • Redis: Message bus and caching layer
  • Neo4j: Knowledge graph relationships
  • Agent Workers: Scaled processing (3 replicas)
  • Scheduler: Automated task orchestration

System Monitoring

class SystemMonitor:
    async def start_monitoring(self):
        monitoring_tasks = [
            self._collect_system_metrics(),
            self._monitor_agent_health(),
            self._generate_alerts()
        ]
        await asyncio.gather(*monitoring_tasks)

    async def _generate_alerts(self):
        # Alert thresholds: CPU > 80%, memory > 85%, error rate > 10%;
        # 'latest' is the most recent SystemMetrics sample from the metrics store
        if latest.error_rate > 0.1:
            await self._send_alert('high_error_rate', 'CRITICAL')
🔽 View Complete Monitoring System
from dataclasses import dataclass

@dataclass
class SystemMetrics:
    cpu_usage: float
    memory_usage: float
    active_agents: int
    tasks_in_queue: int
    error_rate: float
    response_time_avg: float

# Metrics are sampled every minute; alert evaluation runs every five minutes
async def _collect_system_metrics(self):
    cpu_usage = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory()
    # A SystemMetrics sample is built from these values plus agent and queue state,
    # then persisted for trend analysis
    await self.metrics_store.store_metrics(metrics)

🔗 Complete Monitoring Implementation

Monitoring Features:

  • System Metrics: CPU, memory, disk usage tracking
  • Application Metrics: Agent health, queue sizes, response times
  • Automated Alerts: Configurable thresholds with severity levels
  • Performance History: Rolling metrics for trend analysis
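
For the rolling metrics mentioned above, a fixed-size window of recent samples is enough for simple trend analysis; a minimal sketch (the window size and trend definition are assumptions):

from collections import deque
from statistics import mean

class RollingMetrics:
    def __init__(self, window: int = 60):
        # Keep the most recent samples, e.g. one hour at one sample per minute
        self.samples = deque(maxlen=window)

    def add(self, metrics):
        self.samples.append(metrics)

    def error_rate_trend(self) -> float:
        # Positive values mean the error rate is climbing across the window
        if len(self.samples) < 2:
            return 0.0
        half = len(self.samples) // 2
        older = mean(m.error_rate for m in list(self.samples)[:half])
        recent = mean(m.error_rate for m in list(self.samples)[half:])
        return recent - older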

🚀 Quick Start

# Clone the complete implementation
git clone https://github.com/avinash-gupta-rdz/ai-seo-multi-agent-system
cd ai-seo-multi-agent-system

# Start with Docker
docker-compose up -d

# Initialize agents for domain analysis
python -m core.orchestrator --domain example.com

📈 System Architecture

This implementation demonstrates advanced AI engineering patterns:

  • Event-driven coordination between specialized agents
  • Async Python patterns for high-performance data processing
  • Distributed deployment with Docker and message queuing
  • Production monitoring with automated alerting
  • Scalable architecture supporting multiple concurrent analyses

🔗 Complete Implementation Repository


Ready for production deployment with comprehensive error handling, monitoring, and scalability built-in.