Sistema de Colas de Procesos (Background Job Processing)¶
Descripción General¶
El sistema de colas de procesos permite ejecutar tareas en segundo plano de manera concurrente y eficiente, sin bloquear la aplicación principal. Esto es esencial para operaciones de larga duración como:
- Cálculo de nóminas con cientos o miles de empleados
- Generación de reportes complejos
- Envío masivo de correos electrónicos
- Procesamiento de datos en batch
Arquitectura¶
El sistema implementa dos backends con selección automática:
1. Dramatiq + Redis (Producción/Alta Escala)¶
Características: - Alta performance y escalabilidad horizontal - Workers multi-hilo y multi-proceso - Distribución de carga entre múltiples servidores - Reintentos automáticos con backoff exponencial
Requisitos:
- Redis disponible y accesible
- Variable de entorno REDIS_URL configurada
2. Huey + Filesystem (Desarrollo/Fallback)¶
Características: - Sin dependencias externas (no requiere Redis ni base de datos) - Persistencia en archivos del sistema - Thread-safe para ejecución concurrente local - Ideal para entornos pequeños o desarrollo
Requisitos:
- Permisos de lectura/escritura en el directorio de cola
- Variable de entorno COATI_QUEUE_PATH (opcional)
Selección Automática de Backend¶
El sistema selecciona automáticamente el backend apropiado:
- Si
REDIS_URLexiste y Redis responde → Usa Dramatiq - En caso contrario → Usa Huey con filesystem
from coati_payroll.queue import get_queue_driver
# Obtiene el driver apropiado automáticamente
queue = get_queue_driver()
# O fuerza un backend específico
queue = get_queue_driver(force_backend="huey")
Uso Básico¶
Registrar una Tarea¶
from coati_payroll.queue import get_queue_driver
queue = get_queue_driver()
def calculate_something(employee_id: str, value: int) -> dict:
# Lógica de cálculo
result = value * 2
return {"employee_id": employee_id, "result": result}
# Registrar la tarea
calculate_task = queue.register_task(
calculate_something,
name="calculate_employee",
max_retries=3,
min_backoff=15000, # 15 segundos
max_backoff=3600000, # 1 hora
)
Encolar una Tarea¶
# Encolar inmediatamente
task_id = queue.enqueue("calculate_employee", employee_id="123", value=100)
# Encolar con delay (5 minutos)
task_id = queue.enqueue("calculate_employee", employee_id="456", value=200, delay=300)
Obtener Feedback del Proceso¶
El sistema permite obtener feedback de las tareas encoladas:
# Obtener resultado de una tarea individual
result = queue.get_task_result(task_id)
print(f"Status: {result['status']}")
# Obtener feedback de múltiples tareas (x de y completadas)
task_ids = [task1, task2, task3, task4, task5]
bulk_results = queue.get_bulk_results(task_ids)
print(f"Completadas: {bulk_results['completed']} de {bulk_results['total']}")
print(f"Progreso: {bulk_results['progress_percentage']}%")
Caso de Uso: Procesamiento Paralelo de Nómina¶
Opción 1: Procesar todos los empleados en paralelo¶
from coati_payroll.queue.tasks import process_payroll_parallel_task
# Encolar procesamiento paralelo
result = process_payroll_parallel_task.send(
planilla_id="planilla_123",
periodo_inicio="2024-01-01",
periodo_fin="2024-01-15",
usuario="admin"
)
# Esto encolará una tarea por cada empleado activo
# Las tareas se procesan concurrentemente por los workers
Opción 2: Procesar empleados individuales¶
from coati_payroll.queue.tasks import calculate_employee_payroll_task
# Encolar cálculo para un empleado específico
task_id = calculate_employee_payroll_task.send(
empleado_id="emp_456",
planilla_id="planilla_123",
periodo_inicio="2024-01-01",
periodo_fin="2024-01-15",
usuario="admin"
)
# Obtener resultado cuando esté listo
result = queue.get_task_result(task_id)
if result['status'] == 'completed':
print(f"Salario neto: {result['result']['salario_neto']}")
Configuración¶
Variables de Entorno¶
# Backend principal (Dramatiq)
REDIS_URL=redis://localhost:6379/0
# Configuración de cola
QUEUE_ENABLED=1 # 0 para deshabilitar el sistema de colas
COATI_QUEUE_PATH=/var/lib/coati/queue # Para Huey filesystem (opcional)
Permisos de Filesystem (Huey)¶
Cuando se usa Huey con backend filesystem, el sistema verifica automáticamente los permisos de lectura/escritura:
# El sistema intentará estos directorios en orden:
# 1. /var/lib/coati/queue
# 2. ~/.local/share/coati-payroll/queue
# 3. ./.coati_queue (directorio actual)
# 4. /tmp/coati_queue (fallback)
# Para configurar un directorio personalizado:
export COATI_QUEUE_PATH=/ruta/personalizada/queue
Importante: Asegúrese de que el usuario que ejecuta la aplicación tenga permisos de lectura y escritura en el directorio seleccionado.
Ejecución de Workers¶
Dramatiq Workers¶
# Worker básico
dramatiq coati_payroll.queue.tasks
# Worker con configuración personalizada
dramatiq coati_payroll.queue.tasks \
--threads 8 \
--processes 2 \
--queues default,priority \
--verbose
# Worker en producción (systemd service)
dramatiq coati_payroll.queue.tasks --threads 8 --processes 4
Huey Workers¶
# Worker básico
huey_consumer coati_payroll.queue.drivers.huey_driver.huey
# Worker con múltiples workers
huey_consumer coati_payroll.queue.drivers.huey_driver.huey \
--workers 4 \
--verbose
# Worker en producción
huey_consumer coati_payroll.queue.drivers.huey_driver.huey \
--workers 8 \
--logfile /var/log/coati/huey.log
Monitoreo¶
Obtener Estadísticas de la Cola¶
from coati_payroll.queue import get_queue_driver
queue = get_queue_driver()
stats = queue.get_stats()
print(f"Driver: {stats['driver']}")
print(f"Backend: {stats['backend']}")
print(f"Tareas registradas: {stats['registered_tasks']}")
# Para Huey
if 'pending_tasks' in stats:
print(f"Tareas pendientes: {stats['pending_tasks']}")
# Para Dramatiq
if 'queues' in stats:
for queue_name, length in stats['queues'].items():
print(f"Cola {queue_name}: {length} mensajes")
Seguridad¶
Validación de IDs¶
Las tareas siempre validan los IDs antes de procesarlos:
def calculate_employee_payroll(empleado_id: str, planilla_id: str, ...):
# Validación automática por el ORM
empleado = db.session.get(Empleado, empleado_id)
if not empleado:
return {"success": False, "error": "Employee not found"}
planilla = db.session.get(Planilla, planilla_id)
if not planilla:
return {"success": False, "error": "Planilla not found"}
# Procesar solo si existen y son válidos
Permisos de Archivos¶
El sistema de Huey verifica automáticamente los permisos antes de inicializar:
- Crea directorios con permisos apropiados
- Valida capacidad de lectura/escritura
- Registra advertencias si usa directorios temporales
- Limpia archivos de prueba automáticamente
Ubicación de Archivos¶
Por seguridad, los archivos de cola de Huey se almacenan:
- Preferentemente en
/var/lib/coati/queue(fuera del directorio público) - Alternativamente en
~/.local/share/coati-payroll/queue(usuario actual) - Fallback en directorio temporal con advertencia
Escalabilidad¶
Dramatiq (Alta Escala)¶
- Escalado Horizontal: Agregar más workers conectados al mismo Redis
- Múltiples Procesos: Usar
--processespara aprovechar múltiples CPUs - Múltiples Threads: Usar
--threadspara I/O concurrente - Múltiples Colas: Separar tareas por prioridad
Ejemplo: 1000 empleados procesados en ~30 segundos con 4 workers de 8 threads
Huey (Escala Local)¶
- Múltiples Workers: Usar
--workerspara procesamiento concurrente - Thread-safe: Soporta ejecución multi-thread segura
- Limitación: Solo escala dentro de un servidor (no distribuido)
Ejemplo: 100 empleados procesados en ~10 segundos con 4 workers
Troubleshooting¶
Redis no disponible¶
WARNING: Failed to connect to Redis for Dramatiq: [Errno 111] Connection refused
INFO: Using Huey driver with filesystem backend
Solución: El sistema automáticamente cambia a Huey. Para usar Dramatiq, asegúrese de que Redis esté corriendo.
Permisos insuficientes (Huey)¶
ERROR: Insufficient permissions for queue storage at /var/lib/coati/queue
WARNING: Using temporary directory for queue storage: /tmp/coati_queue
Solución: Asegúrese de que el usuario tenga permisos en el directorio, o configure COATI_QUEUE_PATH a un directorio accesible.
Tareas no se procesan¶
- Verifique que los workers estén corriendo
- Revise los logs de los workers
- Verifique las estadísticas de la cola
- Para Dramatiq, verifique la conexión a Redis
- Para Huey, verifique permisos del directorio de cola
Ejemplos Completos¶
Ver /tests/test_queue.py para ejemplos completos de uso y testing.