# Create a Pipeline
Wire plugins, ontologies, and credentials into a complete document processing workflow with scheduled execution.
A pipeline connects your plugins, ontology, and credentials into a complete document processing workflow. This guide walks you through creating, executing, and monitoring a pipeline end to end.
## Prerequisites
Before creating a pipeline, ensure you have:
- A registered Source plugin (to ingest documents).
- A registered Classification plugin (to assign document types).
- A registered Extraction plugin (to extract structured data).
- A registered Ontology (to define the fields for extraction).
- Stored credentials (if your Source plugin requires authentication).
You can list your registered plugins, ontologies, and credentials at any time using the GET endpoints described below.
## Gather Your Components
First, identify the IDs of the resources you will wire together.
### List your plugins

```shell
curl -X GET "https://api.bizsupply.com/v1/plugins" \
  -H "Authorization: Bearer YOUR_API_KEY"
```

```json
{
  "data": [
    { "id": "plg_src_imap", "name": "imap-source", "type": "source" },
    { "id": "plg_cls_invoice", "name": "invoice-classifier", "type": "classification" },
    { "id": "plg_ext_invoice", "name": "invoice-extractor", "type": "extraction" },
    { "id": "plg_agg_spend", "name": "spend-aggregator", "type": "aggregation" }
  ],
  "meta": { "total": 4 }
}
```

### List your ontologies
```shell
curl -X GET "https://api.bizsupply.com/v1/ontologies" \
  -H "Authorization: Bearer YOUR_API_KEY"
```

```json
{
  "data": [
    { "id": "ont_invoice_v2", "taxonomy": "invoice", "field_count": 7 }
  ],
  "meta": { "total": 1 }
}
```

### List your credentials
```shell
curl -X GET "https://api.bizsupply.com/v1/credentials" \
  -H "Authorization: Bearer YOUR_API_KEY"
```

```json
{
  "data": [
    { "id": "cred_imap_ap", "name": "Accounts Payable Inbox", "type": "imap" }
  ],
  "meta": { "total": 1 }
}
```

## Create the Pipeline
Use the `POST /v1/pipelines` endpoint to create a new pipeline:
```shell
curl -X POST "https://api.bizsupply.com/v1/pipelines" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Invoice Processing Pipeline",
    "description": "Ingests invoices from email, classifies, and extracts structured data.",
    "source_plugin": "plg_src_imap",
    "classification_plugin": "plg_cls_invoice",
    "extraction_plugin": "plg_ext_invoice",
    "aggregation_plugin": "plg_agg_spend",
    "ontology_id": "ont_invoice_v2",
    "credentials": ["cred_imap_ap"],
    "config": {
      "plg_cls_invoice": {
        "confidence_threshold": 0.85
      },
      "plg_ext_invoice": {
        "max_content_length": 10000,
        "include_line_items": true
      }
    }
  }'
```

```json
{
  "id": "pip_a1b2c3d4",
  "name": "Invoice Processing Pipeline",
  "status": "active",
  "source_plugin": "plg_src_imap",
  "classification_plugin": "plg_cls_invoice",
  "extraction_plugin": "plg_ext_invoice",
  "aggregation_plugin": "plg_agg_spend",
  "ontology_id": "ont_invoice_v2",
  "created_at": "2026-01-20T14:00:00Z"
}
```

## Execute the Pipeline
Trigger a pipeline execution to create a new job:
```shell
curl -X POST "https://api.bizsupply.com/v1/pipelines/pip_a1b2c3d4/execute" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "options": {
      "max_documents": 100,
      "skip_duplicates": true
    }
  }'
```

```json
{
  "job_id": "job_x9y8z7",
  "pipeline_id": "pip_a1b2c3d4",
  "status": "queued",
  "created_at": "2026-01-20T14:05:00Z"
}
```

## Monitor the Job
Poll the job status endpoint to track progress:
```shell
curl -X GET "https://api.bizsupply.com/v1/jobs/job_x9y8z7" \
  -H "Authorization: Bearer YOUR_API_KEY"
```

```json
{
  "id": "job_x9y8z7",
  "pipeline_id": "pip_a1b2c3d4",
  "status": "running",
  "progress": {
    "total_documents": 47,
    "processed": 32,
    "succeeded": 30,
    "failed": 2,
    "pending": 15
  },
  "started_at": "2026-01-20T14:05:02Z",
  "updated_at": "2026-01-20T14:06:15Z"
}
```

The job transitions through these states: `queued`, `running`, `completed`, `failed`, `partial`, or `cancelled`.
For long-running jobs, poll every 5-10 seconds. The progress object updates in real time as documents are processed.
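The polling guidance above can be wrapped in a small helper. This is a minimal Python sketch, not an official SDK: the `fetch_status` callable is a stand-in for whatever HTTP client you use to call `GET /v1/jobs/:id`, and the interval and timeout defaults are illustrative.

```python
import time

# Terminal job states, as listed above.
TERMINAL_STATES = {"completed", "failed", "partial", "cancelled"}

def wait_for_job(fetch_status, interval=5.0, timeout=600.0, sleep=time.sleep):
    """Poll a job until it reaches a terminal state.

    fetch_status: callable returning the job JSON (a dict with a "status"
    key), e.g. a wrapper around GET /v1/jobs/:id.
    Returns the final job payload, or raises TimeoutError.
    """
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_status()
        if job["status"] in TERMINAL_STATES:
            return job
        if time.monotonic() > deadline:
            raise TimeoutError(f"job still {job['status']} after {timeout}s")
        sleep(interval)
```

Injecting `sleep` makes the loop testable without real waiting; in production code you would simply rely on the default.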
## Retrieve Processed Documents
Once the job completes, retrieve the processed documents with their extracted data:
```shell
curl -X GET "https://api.bizsupply.com/v1/jobs/job_x9y8z7/documents" \
  -H "Authorization: Bearer YOUR_API_KEY"
```

```json
{
  "data": [
    {
      "id": "doc_inv_001",
      "filename": "invoice-2026-001.pdf",
      "document_type": "invoice",
      "status": "completed",
      "fields": {
        "vendor_name": "Acme Corp",
        "invoice_number": "INV-2026-001",
        "invoice_date": "2026-01-15",
        "due_date": "2026-02-14",
        "total_amount": 2450.00,
        "currency": "USD",
        "line_items": [
          { "description": "Consulting Services", "quantity": 10, "unit_price": 200, "amount": 2000 },
          { "description": "Travel Expenses", "quantity": 1, "unit_price": 450, "amount": 450 }
        ]
      }
    }
  ],
  "meta": { "total": 45, "page": 1, "per_page": 20 }
}
```

## Plugin Execution Order
Plugins execute in a fixed order within every pipeline. Understanding this order is important for debugging and for designing plugins that depend on previous stages.
1. **Source plugin**: `fetch_documents()` is called once per job and returns a list of raw documents.
2. **Classification plugin**: `classify()` is called once per document. Each document is classified independently.
3. **Extraction plugin**: `extract()` is called once per classified document. The ontology fields are passed based on the classification result.
4. **Aggregation plugin**: `aggregate()` is called once per job after all documents are extracted. It receives the full list of processed documents.
If a document fails classification, it is not passed to the extraction stage. If extraction fails for a document, it is excluded from aggregation but other documents proceed normally.
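The failure semantics above (a failed classification skips extraction; a failed extraction is excluded from aggregation while other documents proceed) can be illustrated with a short simulation. This is a sketch, not the platform's actual plugin interface; the three callables stand in for the plugin methods named above.

```python
def run_pipeline(documents, classify, extract, aggregate):
    """Simulate the fixed stage order: classify -> extract -> aggregate.

    classify/extract return a result or raise; a failure drops that
    document from later stages without stopping the job.
    """
    processed, failed = [], []
    for doc in documents:
        try:
            doc_type = classify(doc)          # once per document
        except Exception:
            failed.append((doc, "classification"))
            continue                          # not passed to extraction
        try:
            fields = extract(doc, doc_type)   # once per classified document
        except Exception:
            failed.append((doc, "extraction"))
            continue                          # excluded from aggregation
        processed.append({"doc": doc, "type": doc_type, "fields": fields})
    summary = aggregate(processed)            # once per job, at the end
    return processed, failed, summary
```

Note that `aggregate` still runs even when some documents failed, which is why a job can end in the `partial` state.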
## Pre-conditions
You can define pre-conditions in the pipeline configuration to filter documents between stages. Pre-conditions are evaluated after classification and before extraction.
```yaml
# In the pipeline config, add pre-conditions:
preconditions:
  extraction:
    document_types:
      - invoice
      - purchase_order
    min_content_length: 100
    max_file_size_mb: 50
```

Documents that do not meet the pre-conditions are skipped (status set to `"skipped"`) and do not count as failures.
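The evaluation of these rules can be sketched as a simple filter. This is an illustration of the semantics, not the platform's implementation; the document keys (`document_type`, `content`, `file_size_mb`) are assumed field names for the sketch.

```python
def check_preconditions(doc, rules):
    """Return True if a classified document passes the extraction pre-conditions."""
    allowed = rules.get("document_types")
    if allowed is not None and doc["document_type"] not in allowed:
        return False
    min_len = rules.get("min_content_length")
    if min_len is not None and len(doc.get("content", "")) < min_len:
        return False
    max_mb = rules.get("max_file_size_mb")
    if max_mb is not None and doc.get("file_size_mb", 0) > max_mb:
        return False
    return True

def apply_preconditions(docs, rules):
    """Mark non-matching documents as skipped (not failed) and return the rest."""
    for doc in docs:
        if not check_preconditions(doc, rules):
            doc["status"] = "skipped"
    return [d for d in docs if d.get("status") != "skipped"]
```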
## Direct Execution (Without a Pipeline)
For ad-hoc processing, you can execute a plugin directly on a document without creating a full pipeline. This is useful for testing and one-off tasks.
```shell
curl -X POST "https://api.bizsupply.com/v1/plugins/plg_cls_invoice/execute" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "document_id": "doc_test_001"
  }'
```

```json
{
  "document_id": "doc_test_001",
  "plugin_id": "plg_cls_invoice",
  "result": "invoice",
  "execution_time_ms": 1240
}
```

## Pipeline Management
The API provides full CRUD operations for pipelines.
| Operation | Method | Endpoint | Description |
|---|---|---|---|
| List | `GET` | `/v1/pipelines` | List all pipelines with optional filters. |
| Get | `GET` | `/v1/pipelines/:id` | Retrieve a single pipeline by ID. |
| Create | `POST` | `/v1/pipelines` | Create a new pipeline. |
| Update | `PUT` | `/v1/pipelines/:id` | Update pipeline configuration, plugins, or schedule. |
| Delete | `DELETE` | `/v1/pipelines/:id` | Delete a pipeline (does not delete associated jobs or documents). |
| Execute | `POST` | `/v1/pipelines/:id/execute` | Trigger a new job for this pipeline. |
| Pause | `POST` | `/v1/pipelines/:id/pause` | Pause a scheduled pipeline. |
| Resume | `POST` | `/v1/pipelines/:id/resume` | Resume a paused pipeline. |
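The endpoint surface above is regular enough to wrap in a tiny request builder. A minimal sketch: it only constructs the method and URL (the base URL is taken from the examples in this guide), leaving the actual HTTP call to whatever client you prefer.

```python
BASE_URL = "https://api.bizsupply.com"

# (method, path template) per operation, mirroring the table above.
_OPERATIONS = {
    "list":    ("GET",    "/v1/pipelines"),
    "get":     ("GET",    "/v1/pipelines/{id}"),
    "create":  ("POST",   "/v1/pipelines"),
    "update":  ("PUT",    "/v1/pipelines/{id}"),
    "delete":  ("DELETE", "/v1/pipelines/{id}"),
    "execute": ("POST",   "/v1/pipelines/{id}/execute"),
    "pause":   ("POST",   "/v1/pipelines/{id}/pause"),
    "resume":  ("POST",   "/v1/pipelines/{id}/resume"),
}

def pipeline_request(operation, pipeline_id=None):
    """Return (method, url) for a pipeline management operation."""
    method, template = _OPERATIONS[operation]
    if "{id}" in template and pipeline_id is None:
        raise ValueError(f"{operation} requires a pipeline_id")
    path = template.format(id=pipeline_id) if pipeline_id else template
    return method, BASE_URL + path
```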
## Scheduled Execution
Pipelines can run automatically on a schedule using cron expressions. Set the `schedule` field when creating or updating a pipeline.
```shell
curl -X PUT "https://api.bizsupply.com/v1/pipelines/pip_a1b2c3d4" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "schedule": "0 */6 * * *"
  }'
```

Common schedule patterns:
| Schedule | Cron Expression | Description |
|---|---|---|
| Every 6 hours | `0 */6 * * *` | Runs at midnight, 6 AM, noon, and 6 PM. |
| Daily at 8 AM | `0 8 * * *` | Runs once per day at 8:00 AM UTC. |
| Weekdays at 9 AM | `0 9 * * 1-5` | Monday through Friday at 9:00 AM UTC. |
| Every hour | `0 * * * *` | Runs at the top of every hour. |
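To sanity-check a schedule before saving it, you can expand a single cron field yourself. This is a simplified sketch handling only `*`, `*/n`, ranges, and comma lists; real cron implementations support more syntax, so treat it as an aid for reading expressions like those in the table above.

```python
def expand_cron_field(field, lo, hi):
    """Expand one cron field into the sorted list of values it matches.

    Supports "*", "*/n", "a-b", "a-b/n", and comma-separated lists.
    lo/hi are the valid bounds for this field (e.g. 0-23 for hours).
    """
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_str = part.split("/")
            step = int(step_str)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            start_str, end_str = part.split("-")
            start, end = int(start_str), int(end_str)
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return sorted(values)
```

For example, expanding the hour field of `0 */6 * * *` yields the four run hours listed in the table's first row.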
## Common Issues
| Issue | Cause | Resolution |
|---|---|---|
| Job stuck in `queued` | No workers available to process the job. | Check platform status. Jobs are picked up in FIFO order. Free-tier accounts have lower worker priority. |
| All documents fail classification | Classification plugin returning invalid types, or the LLM prompt is too vague. | Test the Classification plugin individually using direct execution. Check the LLM prompt and valid type list. |
| Extraction returns empty fields | Ontology field names do not match what the LLM returns, or content is too short. | Verify ontology field names and descriptions. Increase `max_content_length` if documents are being truncated. |
| Credential errors | Stored credentials are expired or incorrect. | Update the credential via `PUT /v1/credentials/:id`. For OAuth2, refresh the token. |
| Duplicate documents | Source plugin fetching the same documents on repeated runs. | Enable `skip_duplicates` in execution options, or implement deduplication logic in your Source plugin. |
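The deduplication suggestion in the last row is commonly implemented with a content hash inside the Source plugin. A minimal sketch under that assumption: the `seen_hashes` set is in-memory here, but in practice you would persist it between runs.

```python
import hashlib

def dedupe_documents(raw_documents, seen_hashes):
    """Drop documents whose content hash was already ingested.

    raw_documents: iterable of (filename, content_bytes) pairs.
    seen_hashes: a set of hex digests, persisted across runs in practice.
    Returns the new documents and updates seen_hashes in place.
    """
    fresh = []
    for filename, content in raw_documents:
        digest = hashlib.sha256(content).hexdigest()
        if digest in seen_hashes:
            continue  # duplicate of a previously fetched document
        seen_hashes.add(digest)
        fresh.append((filename, content))
    return fresh
```

Hashing content rather than filenames catches the same document arriving under a different name, which is common with email attachments.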
## Best Practices
- **Start with a small test run**: use `max_documents: 5` on your first execution to verify the pipeline works before processing a full batch.
- **Enable `skip_duplicates`**: prevents reprocessing documents that have already been ingested in previous runs.
- **Use meaningful pipeline names**: you will have many pipelines; clear names like "AP Invoice Processing (IMAP)" are better than "Pipeline 1".
- **Monitor job progress**: do not assume jobs complete instantly. Always poll for status or use webhooks.
- **Set up scheduled pipelines**: for recurring ingestion (e.g., email inboxes), use cron schedules instead of manual execution.
- **Keep plugin configs in the pipeline**: override plugin defaults at the pipeline level rather than modifying plugin code for different use cases.