NOVATECS AGENTS · M2
Presentación técnica · Búsqueda Inteligente
← Volver a Agentes

Módulo 2 — Búsqueda Inteligente

Pipeline agéntico en Python puro (sin LangGraph) que encuentra proveedores nuevos cuando el catálogo interno no los tiene. 7 fuentes externas corren en paralelo, el scraper enriquece, el evaluador scorea con rúbrica explicable y el presentador arma la respuesta — todo streamea en vivo vía SSE.

1 supervisor Python + 15 sub-agentes ThreadPoolExecutor SSE streaming Hardgate $1.50 Multi-fuente Score explicable

Arquitectura — 1 supervisor + 15 sub-agentes

Supervisor custom en Python (no LangGraph) que orquesta el pipeline secuencial. 7 fuentes externas paralelizadas con ThreadPoolExecutor(workers=7). Cada sub-agente emite eventos SSE al frontend.

SUPERVISOR M2 (ORQUESTADOR)
Orquestador Python directo
Vive en agents/modulo2_busqueda/orquestador/supervisor.py. Ejecuta ejecutar_pipeline(): analizador → memoria → 7 fuentes paralelas → scraper → enriquecedor → evaluador → presentador. Controla hardgate $1.50 + emite eventos SSE. Sin routing LLM = cero overhead.
Python directoThreadPoolExecutorSSE emitter
1 · ANALIZADOR
NL → CriteriosBusqueda Pydantic
Convierte el query natural ("acero estructural en MTY con ISO 9001") a un objeto Pydantic: categoría, estado MX, num_resultados, certificaciones, criterios adicionales.
SonnetPydantic output
2 · CLARIFIER
Slot-questions si ambiguo
Si el query es vago ("proveedores"), sugiere preguntas al usuario ("¿qué categoría?", "¿qué estado?") con opciones pre-armadas. Evita búsquedas desperdiciadas y costosas.
Haikufew-shot
3 · MEMORIA
Lector de catalogo_descubierto
Busca primero en el catálogo interno del tenant. Si encuentra hits con alta confianza, puede skipear fuentes externas (ahorro de costo y latencia).
SQL directosin LLM
4 · DENUE
INEGI API oficial MX
Query a la API pública de DENUE (INEGI). Búsqueda por giro SCIAN + coordenadas + radio. Devuelve establecimientos con CURP, RFC, actividad oficial.
HaikuINEGI API
5 · GOOGLE MAPS
Places + CSE
Google Places API para geolocalización + Custom Search Engine para web snippets. Fallback a web_search si Places no tiene cobertura del giro.
HaikuPlaces + CSE
6 · OPENSTREETMAP
Nominatim + Overpass
Búsqueda geográfica alternativa. Útil para cobertura de pueblos y zonas donde Google Maps falla. Sin quotas pero rate-limit estricto.
HaikuOverpass
7 · WIKIDATA
SPARQL para empresas grandes
Query SPARQL a Wikidata para empresas corporativas con ficha pública. Captura filiales, adquisiciones, certificaciones declaradas.
HaikuSPARQL
8 · DIRECTORIOS
SecciónAmarilla + Empresite
Scrape de 2 directorios MX estándar. Cubre huecos de DENUE para PYMEs locales. Scrapling con cascade fallback a httpx + BeautifulSoup.
HaikuScrapling
9 · BUSCADOR_WEB
Búsqueda general complementaria
Web search genérico cuando los directorios específicos no devuelven suficiente. Extrae URLs de sitios propios del proveedor para paso siguiente.
HaikuCSE + DuckDuckGo
10 · SCRAPER
Enriquecer desde sitios propios
Visita sitios web de cada proveedor candidato. Extrae teléfono, email, dirección, certificaciones, productos. Scrapling primero (anti-bot), fallback a httpx.
PythonScrapling cascade
11 · REFINADOR
Dedup + normalizar
Mergea rows duplicadas (mismo RFC o mismo teléfono con fuentes distintas), normaliza direcciones y estados MX, deja un row canónico por proveedor.
Haikufuzzy match
12 · ENRIQUECEDOR_PRECIOS
Precios de referencia vivos
Si la categoría tiene commodity pricing (acero, cemento, etc), obtiene rangos de precio actuales web. Anexa al row del proveedor como contexto.
Haikuopcional
13 · EVALUADOR
Score explicable por rúbrica
Scorea cada candidato en 4 dimensiones: cobertura geográfica, evidencia de certificaciones, reputación online, fit con criterios. Devuelve score 0-100 + desglose por criterio.
Sonnetrúbrica prompt
14 · PRESENTADOR
Arma respuesta final
Ordena por score, genera resumen ejecutivo, arma tarjeta presentable para UI con CTA "invitar a onboarding" + "agregar a catálogo descubierto".
Sonnetmarkdown output

Flujo happy path — búsqueda completa

De query en NL a top scoreado, con SSE streaming al frontend en cada paso.

1
Comprador escribe query NL en UI
"Busco proveedor de acero estructural en Nuevo León con ISO 9001 y capacidad 50 ton/semana".
API: POST /api/busqueda
2
Analizador extrae Criterios
Pydantic: categoria='acero estructural', estado='NL', certificaciones=['ISO 9001'], capacidad_min=50, num_resultados=10.
analizador
3
Si ambiguo → Clarifier pregunta
Solo si faltan campos obligatorios. Pausa pipeline, UI muestra slot-questions, reanuda tras respuesta.
clarifier (opcional)
4
Memoria consulta catálogo interno
SQL directo a catalogo_descubierto. Si hay hits con confianza > 0.85 y cubren la query, pipeline puede terminar temprano (ahorro).
memoria
5
7 fuentes corren en paralelo
DENUE · Google Maps · OSM · Wikidata · Directorios · Web · (precios si aplica). ThreadPoolExecutor workers=7. Timeout por fuente. SSE event por cada fuente que termina.
pool paralelo
6
Scraper enriquece sitios propios
Para cada candidato con URL, visita, extrae contactos + productos. Scrapling con anti-bot; si falla, httpx puro.
scraper
7
Refinador dedup + normaliza
Mergea duplicados entre fuentes (mismo RFC/tel). Normaliza estados MX y formato direcciones. Output: lista canónica.
refinador
8
Evaluador scorea con rúbrica
4 dimensiones ponderadas. Sonnet explica por qué cada candidato recibió cada sub-score. Rúbrica visible al usuario.
evaluador
9
Presentador arma respuesta UI
Top N cards con score + explicación + CTA "invitar a onboarding M1" + "agregar a catálogo".
presentador
10
Log en busquedas_log + catalogo
Row en busquedas_log con costo_usd, duración, corte_hardgate. Rows nuevas en catalogo_descubierto si el usuario los guarda.
persistencia

Estados del pipeline

Cada búsqueda progresa por estos estados. Emitidos vía SSE al frontend en vivo.

pendiente
analizado
fuentes_ejecutadas
enriquecido
evaluado
presentado
corte_hardgate
error
pendiente ─→ analizado ─→ fuentes_ejecutadas ─→ enriquecido ─→ evaluado ─→ presentado
                  │                    │
                  ↓                    ↓
           clarifier_espera    corte_hardgate (costo_usd > $1.50)
                                        │
                                        └──→ devuelve parcial con banner

Modelo de datos

3 tablas per-tenant específicas de M2. Complementan las tablas M1 cuando un descubierto se convierte en proveedor.

TablaPropósitoCampos clave
catalogo_descubiertoCandidatos persistidosrfc, nombre_empresa, telefono, email, direccion, estado, fuente, score, criterios_match
busquedas_logLog por queryquery_nl, criterios, num_resultados, costo_usd, duracion_ms, corte_hardgate, usuario_id
invitaciones_catalogoInvites a onboardingcatalogo_id, proveedor_id, canal (wa/email), aceptada, fecha

Integraciones externas

← DENUE API (INEGI)

Endpoint público con token. Query SCIAN + coordenadas + radio. Cobertura completa MX.

← Google Places + CSE

Places API para geolocalización. Custom Search Engine para web snippets. Quotas pagadas.

← OpenStreetMap

Nominatim + Overpass. Sin costo. Rate-limit estricto. Cobertura pueblos pequeños.

← Wikidata SPARQL

Empresas corporativas con ficha pública. Filiales, adquisiciones, certificaciones.

← Directorios MX

SecciónAmarilla + Empresite. Scrape con Scrapling (anti-bot) con cascade a httpx+bs4.

→ SSE streaming

Servicio dedicado puerto 8502 (gunicorn gevent). Eventos agent_start, agent_progress, agent_done, error.

→ M1 Alta

Invitación desde catálogo crea proveedor en M1 con fuente='m2'. Flujo onboarding normal sigue.

→ Scrapling

Fetcher anti-bot. Primario para sitios de proveedores. Fallback httpx+BeautifulSoup si falla.

API — Blueprint /api/busqueda/*

POST   /api/busqueda                   # dispara pipeline + devuelve search_id
GET    /api/busqueda/<id>              # estado + resultados si terminó
GET    /api/busqueda/<id>/stream       # SSE stream en vivo (puerto 8502)
POST   /api/busqueda/<id>/clarificar   # responde slot-questions del Clarifier
GET    /api/catalogo                   # catalogo_descubierto con filtros
POST   /api/catalogo/<id>/invitar      # dispara invitación WA/email (crea proveedor M1)
DELETE /api/catalogo/<id>              # borra del catálogo
GET    /api/busquedas                  # log histórico
GET    /api/busquedas/metricas         # costo_usd 30d por usuario

Seguridad y costo

Hardgate $1.50 por búsqueda

Cost tracker acumula tokens + API calls. Si excede $1.50, levanta HardgateException y supervisor devuelve parcial.

SSRF guard en scraper

URLs validadas antes de fetch. Bloquea IPs privadas, localhost, metadata cloud. Solo https + puertos 80/443.

Timeout por fuente

Cada worker tiene timeout 8-15s según fuente. Si timeout, pipeline continúa con las otras. Sin cascada de falla.

Multi-tenant strict

TenantScope en cada request. Catálogos aislados por schema tenant_<slug>. Sin cross-contamination.

Rate limit por tenant

Max N búsquedas simultáneas por tenant. Evita burst que agoten quotas Google o IP bans en Scrapling.

Prompt injection A6

Prompts evaluador + presentador no pueden ser overrideados por contenido scraped. Defensive prompting.

Números del build

1+15
Supervisor + sub-agentes
7
Fuentes paralelas
$1.50
Hardgate por búsqueda
8502
Puerto SSE streaming
3
Tablas per-tenant M2
~15
Endpoints HTTP