Skip to main content

Common Patterns

Practical Patterns

This guide covers recurring patterns you will encounter across the API: pagination strategies, large file uploads via presigned URLs, AI job polling, and error handling. These patterns apply to all resources described in the previous guides.

Pagination

The API uses two different pagination strategies depending on the resource type.

Page-Based Pagination

Used by: Providers, Collections, Files in Collections

These endpoints accept page (0-based) and pageSize parameters. The response includes totalPages, totalElements, and hasNext to help navigate through results.

from pydantic import BaseModel, Field
from typing import TypeVar, Generic

T = TypeVar("T")

class PageResponse(BaseModel, Generic[T]):
    """One page of results from a page-based listing endpoint.

    Field aliases map the API's camelCase JSON keys onto snake_case
    attributes, so this model validates raw response bodies directly.
    """

    # Items on this page; the element type depends on the endpoint.
    data: list[T]
    total_pages: int = Field(alias="totalPages")
    total_elements: int = Field(alias="totalElements")
    # True when at least one further page exists after this one.
    has_next: bool = Field(alias="hasNext")

def fetch_all_pages(endpoint: str, params: dict | None = None) -> list[dict]:
    """Collect every item from a page-based listing endpoint.

    Requests pages of 50 items starting at page 0 and keeps going until
    the response reports that no further page exists.

    Args:
        endpoint: Relative API path, e.g. "/collections".
        params: Extra query parameters merged into every page request.

    Returns:
        All items from every page, concatenated in order.
    """
    collected: list[dict] = []
    extra = dict(params or {})
    page_index = 0
    has_more = True

    while has_more:
        resp = client.get(
            endpoint,
            params={**extra, "page": page_index, "pageSize": 50},
        )
        resp.raise_for_status()
        payload = resp.json()

        collected.extend(payload["data"])
        # hasNext drives the loop; the server tells us when to stop.
        has_more = payload["hasNext"]
        page_index += 1

    return collected

# Example: gather every collection across all pages, then report the count.
all_collections = fetch_all_pages("/collections")
print(f"Total collections: {len(all_collections)}")

Cursor-Based Pagination

Used by: Provider Items

The items endpoint returns a nextCursor token instead of page numbers. This is more efficient for browsing large directory trees where the total count is not known upfront.

def fetch_all_items(provider_id: str, key: str = "") -> list[dict]:
    """Walk a provider's item listing using cursor-based pagination.

    Follows the nextCursor token returned by each response until the API
    stops supplying one, accumulating every item along the way.

    Args:
        provider_id: Identifier of the provider to list.
        key: Folder key to list under; "" lists the root.

    Returns:
        Every item reachable from `key`, in listing order.
    """
    items: list[dict] = []
    next_cursor: str | None = None

    while True:
        query: dict = {"key": key, "limit": 50}
        if next_cursor is not None:
            # Only send the cursor once the server has issued one.
            query["cursor"] = next_cursor

        resp = client.get(f"/providers/{provider_id}/items", params=query)
        resp.raise_for_status()
        payload = resp.json()
        items.extend(payload["items"])

        next_cursor = payload.get("nextCursor")
        if not next_cursor:
            return items

Large File Upload with Presigned URLs

For large files, uploading directly through the API (multipart) may be slow or impractical. The presigned URL flow lets you upload directly to the storage backend (S3, Azure, GCS), bypassing the API server.

The flow consists of three steps:

Step 1: Request a Presigned Upload URL

curl -X POST "https://<api-domain>/api/core/providers/${PROVIDER_ID}/items/upload-url" \
-H "Authorization: Bearer <your-pat>" \
-H "Content-Type: application/json" \
-d '{
"key": "reports/large-dataset.csv",
"size": 104857600,
"mediaType": "text/csv"
}'
Provider Support

Not all providers support presigned uploads. When the `supported` field in the upload-url response is `false`, fall back to multipart upload via the standard upload endpoint.

Step 2: Upload to the Presigned URL

Upload the file directly to the storage backend using an HTTP PUT. This request does not go through the Biolevate API.

curl -X PUT "${PRESIGNED_URL}" \
-H "Content-Type: text/csv" \
--data-binary @large-dataset.csv

Step 3: Confirm the Upload

After the file is uploaded to the storage backend, confirm the upload with the API so that the file appears in the provider's item listing.

curl -X POST "https://<api-domain>/api/core/providers/${PROVIDER_ID}/items/confirm" \
-H "Authorization: Bearer <your-pat>" \
-H "Content-Type: application/json" \
-d '{"key": "reports/large-dataset.csv"}'

Complete Presigned Upload Helper

Here is a reusable function that handles the full presigned upload flow with fallback to multipart.

from pathlib import Path

def upload_file(
    provider_id: str,
    file_path: Path,
    target_folder_key: str = "",
    media_type: str = "application/octet-stream",
) -> ProviderItem:
    """Upload a file using presigned URL if supported, with multipart fallback."""
    size_bytes = file_path.stat().st_size
    # Item key is the folder key with the filename appended.
    # NOTE(review): assumes target_folder_key carries its own trailing
    # separator (e.g. "reports/") — confirm against the API's key format.
    item_key = f"{target_folder_key}{file_path.name}"

    # Step 1: ask the API whether this provider can hand out a presigned URL.
    url_resp = client.post(
        f"/providers/{provider_id}/items/upload-url",
        json={
            "key": item_key,
            "size": size_bytes,
            "mediaType": media_type,
        },
    )
    url_resp.raise_for_status()
    info = UploadUrlResponse.model_validate(url_resp.json())

    if not (info.supported and info.url):
        # Fallback: stream the file through the API as multipart form data.
        with file_path.open("rb") as fh:
            multipart_resp = client.post(
                f"/providers/{provider_id}/items",
                params={"key": target_folder_key},
                files={"file": (file_path.name, fh, media_type)},
            )
        multipart_resp.raise_for_status()
        return ProviderItem.model_validate(multipart_resp.json())

    # Step 2: PUT the bytes straight to the storage backend (not the API).
    with file_path.open("rb") as fh:
        storage_resp = httpx.put(
            info.url,
            content=fh,
            headers={"Content-Type": media_type},
        )
    storage_resp.raise_for_status()

    # Step 3: confirm so the file shows up in the provider's item listing.
    confirm_resp = client.post(
        f"/providers/{provider_id}/items/confirm",
        json={"key": item_key},
    )
    confirm_resp.raise_for_status()
    return ProviderItem.model_validate(confirm_resp.json())

AI Job Polling

Question Answering and Extraction jobs run asynchronously. After creating a job, you must poll until the status reaches a terminal value — SUCCESS, FAILED, or ABORTED. The pattern is identical for both job types — only the base path differs (/qa/jobs vs /extraction/jobs).

import time
from pydantic import BaseModel, Field

class Job(BaseModel):
    """Status snapshot of an asynchronous QA or Extraction job."""

    job_id: str = Field(alias="jobId")
    # Current job state; terminal values are listed in TERMINAL_STATUSES.
    # NOTE(review): the full status vocabulary (e.g. PENDING/RUNNING) is not
    # visible here — confirm against the API reference.
    status: str
    # Populated by the API when the job fails; None otherwise.
    error_message: str | None = Field(default=None, alias="errorMessage")

# Statuses after which a job's state will never change again.
TERMINAL_STATUSES = {"SUCCESS", "FAILED", "ABORTED"}

def poll_job(
    base_path: str,
    job_id: str,
    poll_interval: float = 3.0,
    timeout: float | None = None,
) -> Job:
    """Poll a QA or Extraction job until it reaches a terminal status.

    Args:
        base_path: Job-type prefix, e.g. "qa" or "extraction".
        job_id: Identifier returned when the job was created.
        poll_interval: Seconds to sleep between status checks.
        timeout: Optional overall limit in seconds. The default of None
            polls indefinitely, preserving the original behavior.

    Returns:
        The final Job once its status is in TERMINAL_STATUSES.

    Raises:
        TimeoutError: If `timeout` elapses before the job finishes.
    """
    # monotonic() is immune to wall-clock adjustments while we wait.
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        response = client.get(f"/{base_path}/jobs/{job_id}")
        response.raise_for_status()
        job = Job.model_validate(response.json())

        if job.status in TERMINAL_STATUSES:
            return job

        # Guard against spinning forever on a stuck or orphaned job.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Job {job_id} did not finish within {timeout} seconds"
            )
        time.sleep(poll_interval)

# Usage
# Block until the QA job finishes, then fail loudly on any non-success
# outcome (FAILED and ABORTED both land in the error branch).
qa_job = poll_job("qa", job_id)
if qa_job.status != "SUCCESS":
    raise RuntimeError(f"QA job failed: {qa_job.error_message}")

# Identical pattern for extraction jobs — only the base path differs.
extraction_job = poll_job("extraction", job_id)
if extraction_job.status != "SUCCESS":
    raise RuntimeError(f"Extraction job failed: {extraction_job.error_message}")
Recommended poll interval

Start with a 3-second interval. For large document sets, jobs can take tens of seconds; exponential backoff (3s → 6s → 12s, capped at 30s) avoids unnecessary requests without adding significant latency.

Error Handling

The API returns standard HTTP status codes with JSON error bodies when something goes wrong.

Common Error Codes

| Status Code | Meaning | Common Causes |
| --- | --- | --- |
| 400 | Bad Request | Missing required field, unsupported file extension, invalid parameter |
| 401 | Unauthorized | Missing or expired PAT |
| 403 | Forbidden | Insufficient permissions on the resource |
| 404 | Not Found | Resource does not exist or has been deleted |

Handling Errors in Code

import httpx

# Map the common HTTP failure codes to actionable messages; anything else
# falls through to a generic report including the raw response body.
try:
    response = client.get(f"/files/{file_id}")
    response.raise_for_status()
except httpx.HTTPStatusError as exc:
    code = exc.response.status_code
    if code == 401:
        print("Authentication failed. Check your PAT.")
    elif code == 403:
        print("You do not have permission to access this resource.")
    elif code == 404:
        print("Resource not found.")
    else:
        print(f"API error {exc.response.status_code}: {exc.response.text}")

End-to-End Workflow

Here is a complete example that demonstrates the full workflow: browse a provider, upload a file, index it, add it to a collection, and run a QA job on it.

import httpx
from pathlib import Path
from pydantic import BaseModel, Field

# Shared client: base URL plus PAT auth header applied to every request.
client = httpx.Client(
    base_url="https://<api-domain>/api/core",
    headers={"Authorization": "Bearer <your-pat>"},
)

# 1. Pick a provider. The listing is page-based; we take the first entry.
#    NOTE(review): id is nested as {"id": {"id": ...}} per this code — confirm
#    against the API's response schema.
providers = client.get("/providers").json()["data"]
provider_id = providers[0]["id"]["id"]
print(f"Using provider: {providers[0]['name']}")

# 2. Upload a local PDF to the provider root via multipart form data.
file_path = Path("research-paper.pdf")
with file_path.open("rb") as f:
    upload_resp = client.post(
        f"/providers/{provider_id}/items",
        params={"key": ""},
        files={"file": (file_path.name, f, "application/pdf")},
    )
upload_resp.raise_for_status()
print(f"Uploaded: {file_path.name}")

# 3. Register the uploaded item as a file so it can be indexed/queried.
file_resp = client.post(
    "/files",
    json={
        "providerId": provider_id,
        "key": file_path.name,
    },
)
file_resp.raise_for_status()
file_id = file_resp.json()["id"]["id"]
print(f"EliseFile created: {file_id}")

# 4. Create a collection to group related files.
coll_resp = client.post(
    "/collections",
    json={"name": "My Research", "description": "Research papers"},
)
coll_resp.raise_for_status()
collection_id = coll_resp.json()["id"]["id"]

# 5. Attach the file to the new collection.
client.post(
    f"/collections/{collection_id}/files",
    json={"fileId": file_id},
).raise_for_status()
print(f"File added to collection: {collection_id}")

# Run a QA job on the indexed file
# 6. One question, answered as a single string value.
qa_resp = client.post(
    "/qa/jobs",
    json={
        "files": {"fileIds": [file_id], "collectionIds": []},
        "questions": [
            {
                "id": "q1",
                "question": "What is the main conclusion of this document?",
                "answerType": {"dataType": "STRING", "multiValued": False},
            }
        ],
    },
)
qa_resp.raise_for_status()
qa_job_id = qa_resp.json()["jobId"]

# 7. Poll until the job terminates, then read the first answer's raw value.
qa_job = poll_job("qa", qa_job_id)
if qa_job.status == "SUCCESS":
    results = client.get(f"/qa/jobs/{qa_job_id}/results").json()["results"]
    print(f"Answer: {results[0]['rawValue']}")