
Pipeline Automation

This guide covers batch case processing through the automated extraction pipeline.

Overview

Pipeline automation enables:

  • Batch extraction across multiple cases
  • Background processing via Celery tasks
  • Real-time progress monitoring
  • Queue management for bulk operations

Prerequisites

Pipeline automation requires:

| Service | Port | Purpose |
|---------|------|---------|
| Redis | 6379 | Message broker |
| Celery Worker | - | Background tasks |
| ProEthica | 5000 | Web interface |
| OntServe MCP | 8082 | Ontology validation |

Starting Services

Option 1: Start Script

./scripts/start_all.sh start

This starts all required services in the correct order.

Option 2: Manual Start

Terminal 1 - Redis:

redis-server

Terminal 2 - Celery Worker:

cd /path/to/proethica
source venv-proethica/bin/activate
PYTHONPATH=/path/to/parent:$PYTHONPATH celery -A celery_config worker --loglevel=info

Terminal 3 - Flask:

cd /path/to/proethica
source venv-proethica/bin/activate
PYTHONPATH=/path/to/parent:$PYTHONPATH python run.py

Service Management

./scripts/start_all.sh status   # Check status
./scripts/start_all.sh stop     # Stop services
./scripts/start_all.sh restart  # Restart all

Pipeline Dashboard

Accessing Dashboard

Navigate to: Tools > Batch Processing (requires admin login)

Direct URL: /pipeline/dashboard


Dashboard Layout

| Section | Description |
|---------|-------------|
| Service Status | Health of Redis, Celery, Workers (top right) |
| Stats Cards | Total Cases, Active Runs, Queued count |
| Recent Runs | History with status, duration, and actions |
| Controls | Manage Queue, Reprocess, Details buttons |

Processing Time

Typical extraction time per case:

| Metric | Time |
|--------|------|
| Average | ~15-20 minutes |
| Range | 12-30 minutes, depending on case complexity |
| Steps | Steps 1-4, Reconcile, 2× OntServe commits, QC audit |

Processing is sequential (one case at a time).
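
Because processing is sequential, the per-case figures above multiply directly into a batch estimate. The helper below is an illustrative planning sketch, not part of the pipeline code; the 15-20 minute defaults come from the table above.

```python
def batch_eta_minutes(case_count: int,
                      per_case_min: float = 15.0,
                      per_case_max: float = 20.0) -> tuple[float, float]:
    """Estimate total wall-clock time for a sequential batch.

    One case runs at a time, so the batch estimate is simply the
    per-case time range multiplied by the number of cases.
    """
    return case_count * per_case_min, case_count * per_case_max

low, high = batch_eta_minutes(10)
print(f"10 cases: ~{low:.0f}-{high:.0f} minutes")
```

For a 10-case batch this suggests roughly 2.5 to 3.5 hours of wall-clock time.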

Service Status

Status Widget

The header widget shows real-time status:

| Indicator | Meaning |
|-----------|---------|
| Redis | Message broker connection |
| Celery | Worker process status |
| Workers | Active worker count |
| Tasks | Queue depth |

Overall Health

| Status | Description |
|--------|-------------|
| Healthy | All services operational |
| Degraded | Some services unavailable |
| Critical | Core services offline |

Status API

Programmatic access:

curl http://localhost:5000/pipeline/api/service-status
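
The endpoint can also be polled from a script. The JSON field names below (`redis`, `celery`, `workers`) are assumptions for illustration; check the actual response returned by your instance before relying on them.

```python
import json
from urllib.request import urlopen

def fetch_service_status(base_url: str = "http://localhost:5000") -> dict:
    """Fetch the pipeline service-status JSON (field names assumed)."""
    with urlopen(f"{base_url}/pipeline/api/service-status") as resp:
        return json.load(resp)

def overall_health(status: dict) -> str:
    """Map per-service flags to the dashboard's three health levels."""
    core = [status.get("redis"), status.get("celery")]
    if all(core) and status.get("workers", 0) > 0:
        return "Healthy"   # all core services up, workers available
    if any(core):
        return "Degraded"  # some services unavailable
    return "Critical"      # core services offline
```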

Queue Management

Accessing the Queue

Navigate to: Tools > Batch Queue or click Manage Queue from the batch processing page.

Direct URL: /pipeline/queue

[Screenshot: Pipeline Queue, the queue management page showing pending cases and queue controls]

Queue Page Features

| Section | Description |
|---------|-------------|
| Available Cases | Cases not yet processed, ready to add to queue |
| Current Queue | Cases waiting to be processed |
| Queue Controls | Add selected cases, clear queue, process next |

Adding Cases to Queue

  1. Check cases in the Available Cases list
  2. Click Add Selected to Queue
  3. Cases move to Current Queue section

Queue Actions

| Action | Description |
|--------|-------------|
| Add Selected | Add checked cases to queue |
| Clear Queue | Remove all pending cases from queue |
| Process Next | Start processing the next queued case |
| Remove | Remove individual case from queue |

Queue Statuses

| Status | Meaning |
|--------|---------|
| queued | Waiting in queue |
| running | Currently processing |
| completed | Successfully finished |
| failed | Error occurred |
| cancelled | User cancelled |
| superseded | Replaced by reprocess |
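
These statuses form a simple lifecycle. The transition map below is an illustrative reading of the table above, not the application's actual state machine.

```python
# Allowed status transitions, inferred from the status table above.
TRANSITIONS = {
    "queued": {"running", "cancelled"},
    "running": {"completed", "failed", "cancelled"},
    "completed": {"superseded"},  # a reprocess supersedes the old run
    "failed": {"superseded"},     # reprocessing after a failure
}

def can_transition(current: str, target: str) -> bool:
    """Check whether a status change is allowed in this sketch."""
    return target in TRANSITIONS.get(current, set())

assert can_transition("queued", "running")
assert not can_transition("completed", "running")  # use Reprocess instead
```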

Running Pipelines

Single Case

To process a single case:

  1. Go to case detail page
  2. Click Queue for Processing
  3. Monitor in the batch processing dashboard

Batch Processing

To process multiple cases:

  1. Go to Tools > Batch Processing
  2. Select cases from list
  3. Click Add to Queue
  4. Monitor progress

Priority Processing

Cases are processed in queue order. To prioritize a case:

  1. Use direct extraction (non-automated)
  2. Or reorder queue manually

Monitoring Progress

Real-Time Updates

Active runs show:

  • Current step (1 Facts, 1 Discussion, 2 Facts, 2 Discussion, 3, Reconcile, Commit, 4, Commit, QC)
  • Entity count extracted
  • Progress percentage
  • Estimated time remaining
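
A rough progress percentage can be derived from the step sequence above. The equal-weight calculation below is a naive sketch, not the dashboard's actual formula, and the second commit is labeled "Commit 2" here only so each step has a unique name.

```python
# Step sequence from the dashboard, in execution order (second
# commit renamed "Commit 2" for uniqueness).
STEPS = ["1 Facts", "1 Discussion", "2 Facts", "2 Discussion",
         "3", "Reconcile", "Commit", "4", "Commit 2", "QC"]

def progress_percent(current_step: str) -> float:
    """Naive progress: fraction of steps completed before this one."""
    return 100.0 * STEPS.index(current_step) / len(STEPS)
```

In practice steps take unequal time (Step 4 synthesis tends to dominate), so a real estimate would weight steps by observed durations.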

Refresh

The dashboard auto-refreshes every 5 seconds; a manual refresh is also available.

Logs

View detailed logs:

  1. Click run in Active Runs
  2. Expand log section
  3. View extraction output

Pipeline Controls

Cancel Running

To cancel a running pipeline:

  1. Click Cancel on the active run
  2. Confirm the cancellation
  3. The Celery task is revoked
  4. The run is marked cancelled

Reprocess Case

To reprocess a completed case:

  1. Click the Reprocess button
  2. Existing entities are cleared
  3. The old run is marked superseded
  4. A fresh extraction starts

Clear Case

There is no standalone clear action: the Reprocess button on the batch processing page clears existing entities and then re-runs extraction.

Pipeline Architecture

Extraction Flow

Queue Request
    ↓
Celery Task Created
    ↓
Uncommit (clear any previous OntServe entities)
    ↓
Clear old entities from temporary storage
    ↓
Step 1: Facts (Roles → States || Resources)
    ↓
Step 1: Discussion (Roles → States || Resources)
    ↓
Step 2: Facts (Principles → Obligations → Constraints || Capabilities)
    ↓
Step 2: Discussion (same)
    ↓
Step 3: Unified temporal extraction (LangGraph)
    ↓
Reconcile: Entity deduplication
    ↓
OntServe Commit #1 (Steps 1-3 entities)
    ↓
Step 4: Whole-case synthesis (Phases 2A-2E, 3, 4)
    ↓
OntServe Commit #2 (Step 4 entities)
    ↓
QC Audit (V0-V9 checks across 17 entity types)
    ↓
Complete
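
The flow above can be sketched as a sequential runner. The step names below are placeholders for illustration; the real step implementations live in the ProEthica extraction code and execute inside a Celery task.

```python
from typing import Callable

def make_step(name: str) -> Callable[[dict], None]:
    """Build a placeholder step that just records its name.

    Real steps would perform extraction, reconciliation, OntServe
    commits, and QC against the shared run state.
    """
    def step(state: dict) -> None:
        state["log"].append(name)
    return step

# Step order mirrors the extraction flow diagram above.
PIPELINE = [make_step(name) for name in [
    "uncommit",
    "clear_temp_entities",
    "step1_facts", "step1_discussion",
    "step2_facts", "step2_discussion",
    "step3_temporal",
    "reconcile",
    "ontserve_commit_1",
    "step4_synthesis",
    "ontserve_commit_2",
    "qc_audit",
]]

def run_pipeline(case_id: int) -> dict:
    """Run every step in order, threading shared state through."""
    state = {"case_id": case_id, "log": []}
    for step in PIPELINE:
        step(state)
    return state
```

Running steps against one shared state dict mirrors why processing is sequential: each stage depends on the entities produced by the previous one.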

Entity Storage

During pipeline:

  • Entities stored in temporary_rdf_storage
  • Linked by extraction_session_id
  • Prompts stored in extraction_prompts
  • Published flag set on OntServe commit

Pipeline Run Record

Each run creates a pipeline_run record:

| Field | Description |
|-------|-------------|
| id | Run identifier |
| case_id | Case being processed |
| status | Current status |
| current_step | Active step |
| entity_count | Entities extracted |
| started_at | Start timestamp |
| completed_at | End timestamp |
| error_message | Error if failed |
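
These fields map naturally onto a record type. The dataclass below is an illustrative mirror of the field list, not the application's actual database model.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class PipelineRun:
    """Illustrative mirror of a pipeline_run record."""
    id: int
    case_id: int
    status: str = "queued"              # queued/running/completed/...
    current_step: Optional[str] = None  # active step name
    entity_count: int = 0               # entities extracted so far
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    error_message: Optional[str] = None # populated only on failure
```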

Celery Configuration

Task Settings

From celery_config.py:

task_time_limit = 7200      # 2 hour hard limit
task_soft_time_limit = 6000 # 100 minute soft limit
worker_prefetch_multiplier = 1  # One task at a time

Queue Name

Default queue: celery

Result Backend

Results stored in Redis DB 1.
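
Putting these settings together, a minimal celery_config.py might look like the sketch below. The Redis URLs assume a default local install, with the broker on DB 0 and results on DB 1 as noted above; check them against your deployment.

```python
from celery import Celery

# Broker on Redis DB 0, results on DB 1 (assumed local defaults).
app = Celery(
    "proethica",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_time_limit=7200,          # 2 hour hard limit
    task_soft_time_limit=6000,     # 100 minute soft limit
    worker_prefetch_multiplier=1,  # one task at a time
)
```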

Error Handling

Task Failures

If a task fails:

  1. Error captured in pipeline_run
  2. Partial entities preserved
  3. Can retry or reprocess

Timeout

If a task times out:

  1. Soft limit triggers graceful stop
  2. Hard limit forces termination
  3. Partial progress saved

Recovery

After failure:

  1. Review error message
  2. Fix underlying issue
  3. Reprocess case

Utility Scripts

Check Queue

celery -A celery_config inspect active
celery -A celery_config inspect reserved

Troubleshooting

Services Not Starting

Check each service:

redis-cli ping  # Should return PONG
celery -A celery_config status  # Should show workers
curl http://localhost:5000/  # Should return page

Tasks Not Processing

  1. Check that the Celery worker is running
  2. Verify the Redis connection
  3. Check the worker logs for errors

Memory Issues

Long-running extraction tasks can accumulate memory. To mitigate:

  1. Monitor worker memory
  2. Restart workers periodically
  3. Process fewer cases at once