Create a Source Plugin

Build a source plugin that ingests documents from external systems like email, cloud storage, APIs, and databases.

Last updated: 2026-04-06

Source plugins are the entry point of every pipeline. They connect to external systems, fetch documents, and pass them into the platform for classification and extraction. The platform injects stored credentials and persists state between runs, so your plugin only has to implement the fetch logic, including which documents count as new.


What Source Plugins Do

A source plugin connects to an external system (email inbox, cloud storage, API endpoint, FTP server, etc.), retrieves documents, and yields them as DocumentInput objects for the platform to ingest.

  • Ingest documents from any external system with credentials
  • Track state between runs to enable incremental fetching (only new documents)
  • Support multiple credential types (API keys, OAuth tokens, username/password)
  • Yield documents one at a time to support large batches without memory issues

How Source Plugins Work

The platform executes source plugins in a five-step cycle:

  1. Credentials — The Engine loads stored credentials and injects them into the plugin via self.credentials.
  2. State — The Engine loads the last saved state (e.g., last fetched timestamp) and injects it into the plugin via self.state.
  3. Fetch — The Engine calls fetch(). Your plugin connects to the external system, retrieves documents, and yields DocumentInput objects.
  4. Create — The Engine creates Document records in the platform for each yielded DocumentInput.
  5. Save State — After a successful run, the Engine persists the updated state for the next run.
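
The cycle above can be sketched with stand-in classes. This is only an illustration of the ordering of the five steps: `DemoSource`, `DemoState`, and `run_source` are hypothetical names, a plain dict stands in for the platform's state storage, and none of it is the real Engine or SDK.

```python
from dataclasses import asdict, dataclass
from types import SimpleNamespace


@dataclass
class DemoState:
    """Stand-in for a state model with a single cursor field."""
    last_id: int = 0


class DemoSource:
    """Stand-in plugin: yields items whose id is above state.last_id."""

    def __init__(self, items):
        self.items = items
        self.credentials = None  # injected in step 1
        self.state = None        # injected in step 2

    def fetch(self):
        for item_id, content in self.items:
            if item_id > self.state.last_id:
                yield {"id": item_id, "content": content}
                self.state.last_id = item_id


def run_source(plugin, credentials, store):
    """Hypothetical engine loop that mirrors the five steps."""
    plugin.credentials = SimpleNamespace(**credentials)  # 1. Credentials
    plugin.state = DemoState(**store)                    # 2. State
    created = []
    for doc in plugin.fetch():                           # 3. Fetch
        created.append(doc)                              # 4. Create
    store.update(asdict(plugin.state))                   # 5. Save State
    return created


store = {"last_id": 0}
source = DemoSource([(1, b"first"), (2, b"second")])
first_run = run_source(source, {"api_key": "demo"}, store)
second_run = run_source(source, {"api_key": "demo"}, store)
print(len(first_run), len(second_run), store)
```

The second run yields nothing because the saved cursor already covers both items, which is the behaviour incremental fetching relies on.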

Step 1 — Define the State Model

Source plugins track state between runs so they only fetch new documents. Define a state model by extending BaseSourceState.

state.py (python)
from bizsupply_sdk import BaseSourceState
from datetime import datetime


class ImapSourceState(BaseSourceState):
    """Tracks the last fetched email UID for incremental fetching."""
    last_uid: int = 0
    last_run: datetime | None = None
    total_fetched: int = 0
💡 Tip

State is automatically serialized and persisted by the platform after each successful run. You only need to update self.state in your fetch() method — the Engine handles the rest.


Step 2 — Write the Plugin Code

Implement your source plugin by extending SourcePlugin. You must define source_type, source_state_model, credential_fields, and the fetch() method.

imap_source/plugin.py (python)
from bizsupply_sdk import (
    SourcePlugin, BaseSourceState, DocumentInput,
    DynamicCredential, PluginError,
)
from datetime import datetime
import imaplib
import email


class ImapSourceState(BaseSourceState):
    last_uid: int = 0
    last_run: datetime | None = None


class ImapSourcePlugin(SourcePlugin):
    """Fetches PDF and image attachments from an IMAP email inbox."""

    name = "imap-source"
    version = "1.0.0"
    description = "Ingests document attachments from email via IMAP."

    # Required: identifies the source type in the platform
    source_type = "imap"

    # Required: the state model class
    source_state_model = ImapSourceState

    # Required: declares what credentials this plugin needs
    credential_fields = [
        {"name": "host", "type": "string", "required": True},
        {"name": "port", "type": "integer", "required": True, "default": 993},
        {"name": "username", "type": "string", "required": True},
        {"name": "password", "type": "password", "required": True},
        {"name": "folder", "type": "string", "required": False, "default": "INBOX"},
    ]

    # Supported MIME types for attachments
    SUPPORTED_TYPES = {
        "application/pdf",
        "image/png", "image/jpeg", "image/tiff",
    }

    def fetch(self):
        """
        Connect to the IMAP server and yield new email attachments.

        Yields:
            DocumentInput objects for each document attachment.
        """
        cred = self.credentials  # DynamicCredential object
        state = self.state       # ImapSourceState object

        try:
            mail = imaplib.IMAP4_SSL(cred.host, cred.port)
            mail.login(cred.username, cred.password)
            mail.select(cred.folder or "INBOX")
        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback
            raise PluginError(f"IMAP connection failed: {e}", retryable=True) from e

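        # Search for emails newer than our last UID
        search_criteria = f"UID {state.last_uid + 1}:*"
        _, msg_ids = mail.uid("search", None, search_criteria)

        for uid in msg_ids[0].split():
            uid_int = int(uid)
            # "n:*" always matches the highest UID in the mailbox, even when
            # it is below n, so skip anything that was already processed.
            if uid_int <= state.last_uid:
                continue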
            _, msg_data = mail.uid("fetch", uid, "(RFC822)")
            raw_email = msg_data[0][1]
            msg = email.message_from_bytes(raw_email)

            for part in msg.walk():
                content_type = part.get_content_type()
                filename = part.get_filename()

                if filename and content_type in self.SUPPORTED_TYPES:
                    self.log("info", f"Found attachment: {filename} (UID {uid_int})")

                    yield DocumentInput(
                        content=part.get_payload(decode=True),
                        filename=filename,
                        mime_type=content_type,
                        metadata={
                            "source": "imap",
                            "email_subject": msg.get("Subject", ""),
                            "email_from": msg.get("From", ""),
                            "email_date": msg.get("Date", ""),
                            "email_uid": uid_int,
                        },
                    )

            # Update state after processing each email
            state.last_uid = max(state.last_uid, uid_int)

        state.last_run = datetime.utcnow()
        mail.logout()

    def has_new_data(self) -> bool:
        """
        Optional: quick check if there are new documents to fetch.
        The Engine calls this before fetch() to avoid unnecessary runs.
        """
        cred = self.credentials
        try:
            mail = imaplib.IMAP4_SSL(cred.host, cred.port)
            mail.login(cred.username, cred.password)
            mail.select(cred.folder or "INBOX")
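            _, msg_ids = mail.uid("search", None, f"UID {self.state.last_uid + 1}:*")
            mail.logout()
            # "n:*" always matches the highest existing UID, so compare explicitly
            return any(int(uid) > self.state.last_uid for uid in msg_ids[0].split())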
        except Exception:
            return True  # Assume there's data if we can't check
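
One IMAP detail is easy to miss when searching by UID: a range such as `43:*` always includes the message with the highest UID in the mailbox, even when that UID is below 43. A search for "everything after the last UID" can therefore return a message you have already processed. The helper below is a small sketch of a filter for the data returned by `imaplib`'s `uid("search", ...)` call; the name `new_uids` is hypothetical.

```python
def new_uids(search_data, last_uid):
    """Return the UIDs in a UID SEARCH response that are above last_uid.

    search_data is the second element returned by imaplib's uid("search", ...),
    for example [b"41 42 43"]. It can also be [b""] or [None] when nothing matched.
    """
    raw = search_data[0] or b""
    return [int(uid) for uid in raw.split() if int(uid) > last_uid]


# The server answers "UID 43:*" with the highest UID (42) although 42 < 43.
already_seen = new_uids([b"42"], last_uid=42)
fresh = new_uids([b"41 42 43"], last_uid=42)
```

Filtering this way keeps both `fetch()` and `has_new_data()` from treating the last processed message as new.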

Step 3 — Validate and Register

bash
# Validate
bizsupply validate ./plugin.py
# ✓ Plugin class found: ImapSourcePlugin
# ✓ Base class: SourcePlugin
# ✓ source_type defined: imap
# ✓ source_state_model defined: ImapSourceState
# ✓ credential_fields defined: 5 fields
# ✓ Required method implemented: fetch
# All checks passed.

# Register
curl -X POST https://api.bizsupply.com/v1/plugins \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "imap-source",
    "type": "source",
    "version": "1.0.0",
    "description": "Ingests document attachments from email via IMAP.",
    "module_path": "imap_source.plugin.ImapSourcePlugin"
  }'

Step 4 — Configure Credentials

After registering the plugin, configure credentials for each source instance. Credentials are encrypted at rest and injected into the plugin at runtime.

bash
curl -X POST https://api.bizsupply.com/v1/sources \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "plugin_id": "plg_a1b2c3d4",
    "name": "AP Inbox",
    "credentials": {
      "host": "imap.company.com",
      "port": 993,
      "username": "ap@company.com",
      "password": "your-app-password",
      "folder": "INBOX"
    }
  }'

Accessing Credentials

Credentials are available via self.credentials as a DynamicCredential object. Fields defined in credential_fields become attributes on this object.

python
# DynamicCredential provides type-safe access to credential fields
cred = self.credentials

# Access fields as attributes
host = cred.host          # str: "imap.company.com"
port = cred.port          # int: 993
username = cred.username  # str: "ap@company.com"
password = cred.password  # str: decrypted password

# Check if an optional field was provided
folder = cred.folder or "INBOX"

# For OAuth sources, tokens are refreshed automatically
token = cred.access_token  # Always a valid, non-expired token
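
As a rough mental model of how declared fields could turn into attributes, the sketch below resolves a `credential_fields` list against stored values, applying defaults and rejecting missing required fields. `resolve_credentials` is a hypothetical helper built on `types.SimpleNamespace`; it is not how `DynamicCredential` is implemented, and the real object may behave differently (for example around encryption and OAuth refresh).

```python
from types import SimpleNamespace


def resolve_credentials(fields, values):
    """Build an attribute-style object from field specs and stored values."""
    resolved = {}
    for spec in fields:
        name = spec["name"]
        if name in values:
            resolved[name] = values[name]
        elif "default" in spec:
            resolved[name] = spec["default"]       # fall back to the declared default
        elif spec.get("required"):
            raise ValueError(f"missing required credential: {name}")
        else:
            resolved[name] = None                  # optional field, not provided
    return SimpleNamespace(**resolved)


fields = [
    {"name": "host", "type": "string", "required": True},
    {"name": "port", "type": "integer", "required": True, "default": 993},
    {"name": "username", "type": "string", "required": True},
    {"name": "password", "type": "password", "required": True},
    {"name": "folder", "type": "string", "required": False, "default": "INBOX"},
]
cred = resolve_credentials(
    fields,
    {"host": "imap.company.com", "username": "ap@company.com", "password": "secret"},
)
```

Here `cred.port` and `cred.folder` come from the declared defaults, while a missing `host` would raise an error before the plugin ever runs.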

State Management

The platform automatically saves your plugin state after each successful run. Update self.state during fetch() to track progress.

python
# State is loaded before fetch() is called
def fetch(self):
    state = self.state
    print(state.last_uid)    # Loaded from the previous run
    print(state.last_run)    # datetime or None if first run

    # Process documents...
    for doc in new_documents:
        yield DocumentInput(...)
        state.last_uid = doc.uid  # Update incrementally

    state.last_run = datetime.utcnow()
    # State is auto-saved after fetch() returns successfully
⚠️ Warning

If fetch() raises an exception, state is NOT saved. This prevents partial state from causing missed documents on retry. Design your state updates to be safe for replay.
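
The sketch below illustrates what "safe for replay" means in practice. It assumes, for illustration only, that documents yielded before a failure have already been handed to the platform while the state from that run is discarded. `FlakySource` and `run` are hypothetical stand-ins, not SDK classes.

```python
import copy


class FlakySource:
    """Stand-in source that fails once, just before yielding item 2."""

    def __init__(self):
        self.state = {"last_id": 0}
        self.fail_once = True

    def fetch(self):
        for item_id in (1, 2, 3):
            if item_id <= self.state["last_id"]:
                continue
            if item_id == 2 and self.fail_once:
                self.fail_once = False
                raise ConnectionError("simulated outage")
            yield item_id
            self.state["last_id"] = item_id


def run(source, persisted, seen):
    """Hypothetical runner: keep the new state only when fetch() completes."""
    source.state = copy.deepcopy(persisted)
    try:
        for item_id in source.fetch():
            seen.append(item_id)
    except ConnectionError:
        return persisted          # state from the failed run is discarded
    return source.state


source = FlakySource()
seen = []
persisted = run(source, {"last_id": 0}, seen)   # fails after item 1
after_failure = dict(persisted)
persisted = run(source, persisted, seen)        # replays item 1, then 2 and 3
unique = sorted(set(seen))
```

Item 1 is delivered twice across the two runs, so anything that consumes the documents should tolerate duplicates, for example by keying on a stable identifier carried in the metadata.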


Document Metadata

Attach metadata to each DocumentInput to provide context for downstream classification and extraction plugins.

Field      Type   Required  Description
content    bytes  Yes       Raw file content (PDF, image, etc.)
filename   str    Yes       Original or suggested filename
mime_type  str    Yes       MIME type (application/pdf, image/png, etc.)
metadata   dict   No        Arbitrary key-value pairs passed to downstream plugins
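
A small helper can assemble these four fields consistently before you build a DocumentInput. The sketch below is one possible approach using only the standard library. `document_fields` is a hypothetical name, and falling back to `mimetypes.guess_type` and then to `application/octet-stream` is an assumption about what makes a sensible default, not platform behaviour.

```python
import mimetypes


def document_fields(content, filename, mime_type=None, metadata=None):
    """Return keyword arguments for a document, or None if there is no content."""
    if not content:
        return None                                  # nothing to ingest
    guessed, _ = mimetypes.guess_type(filename)      # guess from the file extension
    return {
        "content": bytes(content),
        "filename": filename,
        "mime_type": mime_type or guessed or "application/octet-stream",
        "metadata": dict(metadata or {}),
    }


pdf = document_fields(b"%PDF-1.7", "invoice.pdf", metadata={"source": "imap"})
unknown = document_fields(b"\x00\x01", "attachment")
empty = document_fields(b"", "empty.pdf")
```

In a plugin, the returned dict could be unpacked into the constructor, as in `DocumentInput(**fields)`, after checking that the helper did not return None.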

Common Mistakes

1. Not updating state incrementally

python
# WRONG — the cursor is only set at the end, so a failure midway
# loses all progress, and all_docs[-1] raises IndexError when
# there is nothing to fetch
def fetch(self):
    all_docs = self.get_all_documents()
    for doc in all_docs:
        yield DocumentInput(...)
    self.state.last_id = all_docs[-1].id  # Lost if exception

# CORRECT — fetch only what is new and update state as you go
def fetch(self):
    for doc in self.get_new_documents(since=self.state.last_id):
        yield DocumentInput(...)
        self.state.last_id = doc.id  # Advance the cursor per item

2. Using synchronous HTTP clients for API sources

python
# WRONG — requests blocks the worker thread
import requests
resp = requests.get("https://api.example.com/docs")

# CORRECT — use the SDK's built-in HTTP helpers
data = self.http_get("https://api.example.com/docs", headers={"Authorization": f"Bearer {self.credentials.api_key}"})

3. Yielding documents with empty content

python
# WRONG — empty content causes ingestion failure
yield DocumentInput(content=b"", filename="doc.pdf", mime_type="application/pdf")

# CORRECT — skip empty documents
if content:  # False for both None and b""
    yield DocumentInput(content=content, filename=filename, mime_type=mime_type)

4. Not handling pagination

python
# WRONG — only fetches the first page
resp = self.http_get(f"{api_url}/documents")
for doc in resp["items"]:
    yield DocumentInput(...)

# CORRECT — paginate through all results
next_url = f"{api_url}/documents?since={self.state.last_id}"
while next_url:
    resp = self.http_get(next_url)
    for doc in resp["items"]:
        yield DocumentInput(...)
        self.state.last_id = doc["id"]
    next_url = resp.get("next_page_url")
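
The pagination loop can be tried out without a live API by passing in a fake fetch function. In this sketch, `iter_items` is a hypothetical helper, the `PAGES` dict stands in for the remote service, and the `items` and `next_page_url` keys follow the response shape used in the example above.

```python
def iter_items(http_get, first_url):
    """Yield every item across pages by following next_page_url until it is empty."""
    next_url = first_url
    while next_url:
        page = http_get(next_url)
        yield from page.get("items", [])
        next_url = page.get("next_page_url")


# Fake two-page API keyed by URL; the last page has no next_page_url.
PAGES = {
    "https://api.example.com/documents?page=1": {
        "items": [{"id": 1}, {"id": 2}],
        "next_page_url": "https://api.example.com/documents?page=2",
    },
    "https://api.example.com/documents?page=2": {
        "items": [{"id": 3}],
        "next_page_url": None,
    },
}

ids = [item["id"] for item in iter_items(PAGES.__getitem__, "https://api.example.com/documents?page=1")]
```

Because the helper is a generator, items are still produced one at a time, which keeps memory use flat for large result sets.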

Example: Gmail OAuth Source

This example shows a source plugin that fetches attachments from Gmail using OAuth credentials. The platform handles token refresh automatically.

gmail_source/plugin.py (python)
from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput


class GmailSourceState(BaseSourceState):
    last_history_id: str = ""


class GmailSourcePlugin(SourcePlugin):
    name = "gmail-source"
    version = "1.0.0"
    source_type = "gmail"
    source_state_model = GmailSourceState

    credential_fields = [
        {"name": "access_token", "type": "oauth", "provider": "google", "required": True},
        {"name": "label", "type": "string", "required": False, "default": "INBOX"},
    ]

    def fetch(self):
        cred = self.credentials
        label = cred.label or "INBOX"
        headers = {"Authorization": f"Bearer {cred.access_token}"}

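        base = "https://gmail.googleapis.com/gmail/v1/users/me"

        # messages.list has no startHistoryId parameter, so incremental runs
        # use history.list; the first run lists the label's messages directly.
        if self.state.last_history_id:
            history = self.http_get(
                f"{base}/history?labelId={label}&historyTypes=messageAdded"
                f"&startHistoryId={self.state.last_history_id}",
                headers=headers,
            )
            added_ids = [
                added["message"]["id"]
                for record in history.get("history", [])
                for added in record.get("messagesAdded", [])
            ]
            # A message can appear in several history records; keep one of each
            message_ids = list(dict.fromkeys(added_ids))
        else:
            listing = self.http_get(f"{base}/messages?labelIds={label}", headers=headers)
            message_ids = [ref["id"] for ref in listing.get("messages", [])]

        for message_id in message_ids:
            msg = self.http_get(f"{base}/messages/{message_id}", headers=headers)

            for part in msg.get("payload", {}).get("parts", []):
                if part.get("filename") and part.get("body", {}).get("attachmentId"):
                    attachment = self.http_get(
                        f"{base}/messages/{message_id}/attachments/{part['body']['attachmentId']}",
                        headers=headers,
                    )

                    yield DocumentInput(
                        content=self.base64_decode(attachment["data"]),
                        filename=part["filename"],
                        mime_type=part.get("mimeType", "application/octet-stream"),
                        metadata={"source": "gmail", "message_id": message_id},
                    )

            # Each message resource carries its own historyId; keep the highest
            history_id = msg.get("historyId")
            if history_id and int(history_id) > int(self.state.last_history_id or 0):
                self.state.last_history_id = history_id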
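
Gmail returns attachment bodies as base64url text (the URL-safe alphabet, often without padding), so a plain base64 decoder can fail on them. The example relies on the SDK's `self.base64_decode` helper, whose behaviour is not documented here; the sketch below shows an equivalent using only the standard library, with `decode_base64url` as a hypothetical name.

```python
import base64


def decode_base64url(data: str) -> bytes:
    """Decode base64url text, restoring any padding that was stripped."""
    padding = "=" * (-len(data) % 4)
    return base64.urlsafe_b64decode(data + padding)


# Round trip: encode some bytes the way an API might send them (no padding).
payload = b"%PDF-1.7\n"
encoded = base64.urlsafe_b64encode(payload).rstrip(b"=").decode("ascii")
decoded = decode_base64url(encoded)

# "_" and "-" are the URL-safe replacements for "/" and "+".
special = decode_base64url("_-8")
```

If you ever need to decode attachment data outside the plugin (for example in a test), this is a drop-in way to do it.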

Example: Salesforce Source

This example fetches document attachments from Salesforce ContentDocument records.

salesforce_source/plugin.py (python)
from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput
from urllib.parse import quote


class SalesforceSourceState(BaseSourceState):
    last_modified: str = "1970-01-01T00:00:00Z"


class SalesforceSourcePlugin(SourcePlugin):
    name = "salesforce-source"
    version = "1.0.0"
    source_type = "salesforce"
    source_state_model = SalesforceSourceState

    credential_fields = [
        {"name": "instance_url", "type": "string", "required": True},
        {"name": "access_token", "type": "oauth", "provider": "salesforce", "required": True},
    ]

    def fetch(self):
        cred = self.credentials
        base_url = cred.instance_url.rstrip("/")
        headers = {"Authorization": f"Bearer {cred.access_token}"}

        # LatestPublishedVersionId points at the ContentVersion that holds the file
        query = (
            f"SELECT Id, Title, FileType, ContentSize, LastModifiedDate, "
            f"LatestPublishedVersionId "
            f"FROM ContentDocument "
            f"WHERE LastModifiedDate > {self.state.last_modified} "
            f"ORDER BY LastModifiedDate ASC"
        )

        # URL-encode the SOQL so spaces, ">" and "+" survive the query string
        result = self.http_get(
            f"{base_url}/services/data/v59.0/query?q={quote(query)}",
            headers=headers,
        )

        for record in result.get("records", []):
            # Fetch the file content from the document's latest ContentVersion
            version_id = record["LatestPublishedVersionId"]
            content = self.http_get_binary(
                f"{base_url}/services/data/v59.0/sobjects/ContentVersion/{version_id}/VersionData",
                headers=headers,
            )

            yield DocumentInput(
                content=content,
                filename=f"{record['Title']}.{record['FileType'].lower()}",
                mime_type=self.guess_mime_type(record["FileType"]),
                metadata={
                    "source": "salesforce",
                    "salesforce_id": record["Id"],
                    "file_size": record["ContentSize"],
                },
            )

            self.state.last_modified = record["LastModifiedDate"]

Next Steps

  • Create a Classification Plugin to categorize the documents your source ingests.
  • Create a Pipeline to connect your source to classification and extraction stages.
  • Create a Prompt to define reusable LLM instruction templates for document analysis.
  • Read the Plugin Service API Reference for the full list of available service methods.