Pipeline Automation¶
This guide covers batch case processing through the automated extraction pipeline.
Overview¶
Pipeline automation enables:
- Batch extraction across multiple cases
- Background processing via Celery tasks
- Real-time progress monitoring
- Queue management for bulk operations
Prerequisites¶
Pipeline automation requires:
| Service | Port | Purpose |
|---|---|---|
| Redis | 6379 | Message broker |
| Celery Worker | - | Background tasks |
| ProEthica | 5000 | Web interface |
| OntServe MCP | 8082 | Ontology validation |
Starting Services¶
Option 1: Single Command (Recommended)¶
./scripts/start_all.sh start
This starts all required services in the correct order.
Option 2: Manual Start¶
Terminal 1 - Redis:
redis-server
Terminal 2 - Celery Worker:
cd /path/to/proethica
source venv-proethica/bin/activate
PYTHONPATH=/path/to/parent:$PYTHONPATH celery -A celery_config worker --loglevel=info
Terminal 3 - Flask:
cd /path/to/proethica
source venv-proethica/bin/activate
PYTHONPATH=/path/to/parent:$PYTHONPATH python run.py
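The `celery_config` module referenced in these commands defines the Celery application. A minimal sketch of what such a module contains, assuming the Redis layout described in this guide (broker on DB 0, results on DB 1; the app name is an assumption, and the real file is authoritative):

```python
# Minimal sketch of a celery_config.py, assuming the Redis layout
# described in this guide; consult the real file for the actual settings.
from celery import Celery

celery_app = Celery(
    "proethica",                         # app name is an assumption
    broker="redis://localhost:6379/0",   # Redis message broker
    backend="redis://localhost:6379/1",  # results stored in Redis DB 1
)

celery_app.conf.update(
    task_time_limit=7200,          # 2 hour hard limit
    task_soft_time_limit=6000,     # 100 minute soft limit
    worker_prefetch_multiplier=1,  # one task at a time
)
```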
Service Management¶
./scripts/start_all.sh status # Check status
./scripts/start_all.sh stop # Stop services
./scripts/start_all.sh restart # Restart all
Pipeline Dashboard¶
Accessing Dashboard¶
Navigate to: Tools > Pipeline Dashboard (requires admin login)
Direct URL: /pipeline/dashboard

Dashboard Layout¶
| Section | Description |
|---|---|
| Service Status | Health of Redis, Celery, Workers (top right) |
| Stats Cards | Total Cases, Active Runs, Queued count |
| Recent Runs | History with status, duration, and actions |
| Controls | Manage Queue, Reprocess, Details buttons |
Processing Time¶
Typical extraction time per case:
| Metric | Time |
|---|---|
| Average | ~10 minutes |
| Range | 8-15 minutes depending on case complexity |
| Steps | 5 extraction passes (Steps 1, 1b, 2, 2b, 3) |
Processing is currently sequential (one case at a time). Parallel processing across multiple workers is planned for future releases to reduce batch processing time.
Service Status¶
Status Widget¶
The header widget shows real-time status:
| Indicator | Meaning |
|---|---|
| Redis | Message broker connection |
| Celery | Worker process status |
| Workers | Active worker count |
| Tasks | Queue depth |
Overall Health¶
| Status | Description |
|---|---|
| Healthy | All services operational |
| Degraded | Some services unavailable |
| Critical | Core services offline |
Status API¶
Programmatic access:
curl http://localhost:5000/pipeline/api/service-status
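The same check from Python, for scripting or monitoring. The exact JSON schema is an assumption; inspect a live response to confirm the field names:

```python
# Sketch: query the service-status endpoint and print the result.
import requests

resp = requests.get("http://localhost:5000/pipeline/api/service-status", timeout=5)
resp.raise_for_status()
print(resp.json())  # per-service health plus an overall indicator (schema assumed)
```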
Queue Management¶
Accessing the Queue¶
Navigate to: Pipeline Dashboard > Manage Queue button
Or direct URL: /pipeline/queue
*Queue management page showing pending cases and queue controls*
Queue Page Features¶
| Section | Description |
|---|---|
| Available Cases | Cases not yet processed, ready to add to queue |
| Current Queue | Cases waiting to be processed |
| Queue Controls | Add selected cases, clear queue, process next |
Adding Cases to Queue¶
- Check the cases you want in the Available Cases list
- Click Add Selected to Queue
- The cases move to the Current Queue section
Queue Actions¶
| Action | Description |
|---|---|
| Add Selected | Add checked cases to queue |
| Clear Queue | Remove all pending cases from queue |
| Process Next | Start processing the next queued case |
| Remove | Remove individual case from queue |
Queue Statuses¶
| Status | Meaning |
|---|---|
| queued | Waiting in queue |
| running | Currently processing |
| completed | Successfully finished |
| failed | Error occurred |
| cancelled | User cancelled |
| superseded | Replaced by reprocess |
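For scripts that poll run status, the values above can be captured in an enum (a convenience sketch, not project code):

```python
from enum import Enum

class RunStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    SUPERSEDED = "superseded"

# Statuses that will not change again without user action
TERMINAL = {RunStatus.COMPLETED, RunStatus.FAILED,
            RunStatus.CANCELLED, RunStatus.SUPERSEDED}
```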
Running Pipelines¶
Single Case¶
To process a single case:
- Go to the case detail page
- Click Queue for Processing
- Monitor progress in the Pipeline Dashboard
Batch Processing¶
To process multiple cases:
- Go to the Pipeline Dashboard
- Select cases from the list
- Click Add to Queue
- Monitor progress
Priority Processing¶
Cases are processed in queue order. To prioritize a case:
- Use direct extraction (non-automated)
- Or reorder the queue manually
Monitoring Progress¶
Real-Time Updates¶
Active runs show:
- Current step (1, 1b, 2, 2b, 3)
- Entity count extracted
- Progress percentage
- Estimated time remaining
Refresh¶
The dashboard auto-refreshes every 5 seconds; a manual refresh is also available.
Logs¶
View detailed logs:
- Click a run in Active Runs
- Expand the log section
- View the extraction output
Pipeline Controls¶
Cancel Running¶
To cancel a running pipeline:
- Click Cancel on the active run
- Confirm the cancellation
- The Celery task is revoked
- The run is marked cancelled
Reprocess Case¶
To reprocess a completed case:
- Click the Reprocess button
- Existing entities are cleared
- The old run is marked superseded
- A fresh extraction starts
Clear Case¶
To clear a case's extractions without reprocessing:
python scripts/clear_case_extractions.py <case_id>
Options:
- --include-runs: Also clear pipeline_run records
Pipeline Architecture¶
Extraction Flow¶
Queue Request
↓
Celery Task Created
↓
Step 1: Facts (Roles, States, Resources)
↓
Step 1b: Discussion (Roles, States, Resources)
↓
Step 2: Facts (Principles, Obligations, Constraints, Capabilities)
↓
Step 2b: Discussion (Principles, Obligations, Constraints, Capabilities)
↓
Step 3: Temporal (Actions, Events, Causal Relationships)
↓
Complete
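Conceptually, this flow maps to a single Celery task that runs the five passes in order. The following is an illustrative sketch only; the names are assumptions and the real task in the ProEthica codebase differs in detail:

```python
from celery_config import celery_app  # assumed app module (see Celery Configuration)

STEPS = ["1", "1b", "2", "2b", "3"]  # the five extraction passes

def extract_step(case_id, step):
    """Placeholder for one extraction pass; the real logic lives in ProEthica."""

@celery_app.task(bind=True)
def run_pipeline(self, case_id):
    # Each pass updates pipeline_run.current_step and entity_count as it goes
    for step in STEPS:
        extract_step(case_id, step)
    return {"case_id": case_id, "status": "completed"}
```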
Entity Storage¶
During the pipeline run:
- Entities are stored in `temporary_rdf_storage`
- Entities are linked by `extraction_session_id`
- Prompts are stored in `extraction_prompts`
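To inspect what a run has stored so far, you can query these tables directly. A sketch assuming a PostgreSQL database and connection URL (both assumptions; the table and column names come from the list above):

```python
import sqlalchemy as sa

engine = sa.create_engine("postgresql://localhost/proethica")  # assumed URL
with engine.connect() as conn:
    count = conn.execute(
        sa.text("SELECT COUNT(*) FROM temporary_rdf_storage "
                "WHERE extraction_session_id = :sid"),
        {"sid": 42},  # example session id
    ).scalar()
print(f"Entities stored for session 42: {count}")
```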
Pipeline Run Record¶
Each run creates a `pipeline_run` record:
| Field | Description |
|---|---|
| `id` | Run identifier |
| `case_id` | Case being processed |
| `status` | Current status |
| `current_step` | Active step |
| `entity_count` | Entities extracted |
| `started_at` | Start timestamp |
| `completed_at` | End timestamp |
| `error_message` | Error if failed |
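For reference when scripting against run records, the fields map to something like this dataclass (the field types are assumptions):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class PipelineRun:
    id: int
    case_id: int
    status: str                  # queued / running / completed / ...
    current_step: Optional[str]  # "1", "1b", "2", "2b", or "3"
    entity_count: int
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
    error_message: Optional[str]
```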
Celery Configuration¶
Task Settings¶
From celery_config.py:
task_time_limit = 7200 # 2 hour hard limit
task_soft_time_limit = 6000 # 100 minute soft limit
worker_prefetch_multiplier = 1 # One task at a time
Queue Name¶
Default queue: `celery`
Result Backend¶
Results stored in Redis DB 1.
Error Handling¶
Task Failures¶
If a task fails:
- The error is captured in `pipeline_run`
- Partially extracted entities are preserved
- The case can be retried or reprocessed
Timeout¶
If a task times out:
- The soft limit triggers a graceful stop (see the sketch below)
- The hard limit forces termination
- Partial progress is saved
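The graceful stop works because Celery raises `SoftTimeLimitExceeded` inside the task when the soft limit passes, leaving the window before the hard limit to save partial progress. A generic sketch, not the project's actual handler (`save_partial_progress` is a hypothetical helper):

```python
from celery.exceptions import SoftTimeLimitExceeded
from celery_config import celery_app  # assumed app module

def save_partial_progress(case_id):
    """Hypothetical helper: persist whatever has been extracted so far."""

@celery_app.task(bind=True)
def run_pipeline(self, case_id):
    try:
        ...  # extraction passes
    except SoftTimeLimitExceeded:
        save_partial_progress(case_id)  # soft limit hit: save, then re-raise
        raise
```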
Recovery¶
After failure:
- Review error message
- Fix underlying issue
- Reprocess case
Utility Scripts¶
Clear Case Extractions¶
python scripts/clear_case_extractions.py <case_id> [--include-runs]
Cleanup Orphaned Entities¶
python scripts/cleanup_orphaned_entities.py [--delete]
Check Queue¶
celery -A celery_config inspect active
celery -A celery_config inspect reserved
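The same inspection is available programmatically through Celery's inspect API:

```python
from celery_config import celery_app  # assumed app module

insp = celery_app.control.inspect()
print(insp.active())    # tasks currently executing, keyed by worker
print(insp.reserved())  # tasks prefetched but not yet started
```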
Troubleshooting¶
Services Not Starting¶
Check each service:
redis-cli ping # Should return PONG
celery -A celery_config status # Should show workers
curl http://localhost:5000/ # Should return page
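The Redis and Flask checks can also be scripted from Python (use the `celery ... status` command above for the worker check):

```python
import redis
import requests

try:
    redis.Redis(host="localhost", port=6379).ping()  # equivalent of redis-cli ping
    print("Redis: OK")
except redis.exceptions.ConnectionError:
    print("Redis: DOWN")

try:
    requests.get("http://localhost:5000/", timeout=5).raise_for_status()
    print("Flask: OK")
except requests.RequestException:
    print("Flask: DOWN")
```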
Tasks Not Processing¶
- Check that the Celery worker is running
- Verify the Redis connection
- Check the worker logs for errors
Memory Issues¶
Long-running extraction tasks can accumulate memory:
- Monitor worker memory usage
- Restart workers periodically (see the recycling sketch below)
- Process fewer cases at once
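One built-in way to restart workers periodically is Celery's process recycling, set in `celery_config.py` (the value here is illustrative):

```python
# Recycle each worker process after N tasks to cap memory growth
worker_max_tasks_per_child = 10
```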
Orphaned Entities¶
If entities exist without an extraction session:
python scripts/cleanup_orphaned_entities.py --delete
Related Guides¶
- Running Extractions - Manual extraction
- Entity Review - Reviewing results
- Settings - Configuration options