Pipeline agéntico en Python puro (sin LangGraph) que encuentra proveedores nuevos cuando el catálogo interno no los tiene. 7 fuentes externas corren en paralelo, el scraper enriquece, el evaluador scorea con rúbrica explicable y el presentador arma la respuesta — todo streamea en vivo vía SSE.
Supervisor custom en Python (no LangGraph) que orquesta el pipeline secuencial. 7 fuentes externas paralelizadas con ThreadPoolExecutor(workers=7). Cada sub-agente emite eventos SSE al frontend.
agents/modulo2_busqueda/orquestador/supervisor.py. Ejecuta ejecutar_pipeline(): analizador → memoria → 7 fuentes paralelas → scraper → enriquecedor → evaluador → presentador. Controla hardgate $1.50 + emite eventos SSE. Sin routing LLM = cero overhead.De query en NL a top scoreado, con SSE streaming al frontend en cada paso.
catalogo_descubierto. Si hay hits con confianza > 0.85 y cubren la query, pipeline puede terminar temprano (ahorro).busquedas_log con costo_usd, duración, corte_hardgate. Rows nuevas en catalogo_descubierto si el usuario los guarda.Cada búsqueda progresa por estos estados. Emitidos vía SSE al frontend en vivo.
pendiente ─→ analizado ─→ fuentes_ejecutadas ─→ enriquecido ─→ evaluado ─→ presentado
│ │
↓ ↓
clarifier_espera corte_hardgate (costo_usd > $1.50)
│
└──→ devuelve parcial con banner
3 tablas per-tenant específicas de M2. Complementan las tablas M1 cuando un descubierto se convierte en proveedor.
| Tabla | Propósito | Campos clave |
|---|---|---|
catalogo_descubierto | Candidatos persistidos | rfc, nombre_empresa, telefono, email, direccion, estado, fuente, score, criterios_match |
busquedas_log | Log por query | query_nl, criterios, num_resultados, costo_usd, duracion_ms, corte_hardgate, usuario_id |
invitaciones_catalogo | Invites a onboarding | catalogo_id, proveedor_id, canal (wa/email), aceptada, fecha |
Endpoint público con token. Query SCIAN + coordenadas + radio. Cobertura completa MX.
Places API para geolocalización. Custom Search Engine para web snippets. Quotas pagadas.
Nominatim + Overpass. Sin costo. Rate-limit estricto. Cobertura pueblos pequeños.
Empresas corporativas con ficha pública. Filiales, adquisiciones, certificaciones.
SecciónAmarilla + Empresite. Scrape con Scrapling (anti-bot) con cascade a httpx+bs4.
Servicio dedicado puerto 8502 (gunicorn gevent). Eventos agent_start, agent_progress, agent_done, error.
Invitación desde catálogo crea proveedor en M1 con fuente='m2'. Flujo onboarding normal sigue.
Fetcher anti-bot. Primario para sitios de proveedores. Fallback httpx+BeautifulSoup si falla.
/api/busqueda/*POST /api/busqueda # dispara pipeline + devuelve search_id GET /api/busqueda/<id> # estado + resultados si terminó GET /api/busqueda/<id>/stream # SSE stream en vivo (puerto 8502) POST /api/busqueda/<id>/clarificar # responde slot-questions del Clarifier GET /api/catalogo # catalogo_descubierto con filtros POST /api/catalogo/<id>/invitar # dispara invitación WA/email (crea proveedor M1) DELETE /api/catalogo/<id> # borra del catálogo GET /api/busquedas # log histórico GET /api/busquedas/metricas # costo_usd 30d por usuario
Cost tracker acumula tokens + API calls. Si excede $1.50, levanta HardgateException y supervisor devuelve parcial.
URLs validadas antes de fetch. Bloquea IPs privadas, localhost, metadata cloud. Solo https + puertos 80/443.
Cada worker tiene timeout 8-15s según fuente. Si timeout, pipeline continúa con las otras. Sin cascada de falla.
TenantScope en cada request. Catálogos aislados por schema tenant_<slug>. Sin cross-contamination.
Max N búsquedas simultáneas por tenant. Evita burst que agoten quotas Google o IP bans en Scrapling.
Prompts evaluador + presentador no pueden ser overrideados por contenido scraped. Defensive prompting.