Common Patterns
This guide covers recurring patterns you will encounter across the API: pagination strategies, large file uploads via presigned URLs, AI job polling, and error handling. These patterns apply to all resources described in the previous guides.
Pagination
The API uses two different pagination strategies depending on the resource type.
Page-Based Pagination
Used by: Providers, Collections, Files in Collections
These endpoints accept page (0-based) and pageSize parameters. The response includes totalPages, totalElements, and hasNext to help navigate through results.
- Python (httpx)
- R
- SDK
from pydantic import BaseModel, Field
from typing import Generic, TypeVar

T = TypeVar("T")


class PageResponse(BaseModel, Generic[T]):
    """One page of a page-based listing response."""

    data: list[T]
    total_pages: int = Field(alias="totalPages")
    total_elements: int = Field(alias="totalElements")
    has_next: bool = Field(alias="hasNext")


def fetch_all_pages(endpoint: str, params: dict | None = None) -> list[dict]:
    """Walk every page of a page-based endpoint and return the combined items."""
    collected: list[dict] = []
    extra = dict(params or {})
    current_page = 0
    while True:
        resp = client.get(
            endpoint,
            params={**extra, "page": current_page, "pageSize": 50},
        )
        resp.raise_for_status()
        payload = resp.json()
        collected.extend(payload["data"])
        # hasNext tells us whether another page exists; stop as soon as it is false.
        if not payload["hasNext"]:
            return collected
        current_page += 1


all_collections = fetch_all_pages("/collections")
print(f"Total collections: {len(all_collections)}")
# Collect every page from a page-based endpoint into a single list.
fetch_all_pages <- function(endpoint, params = list()) {
  collected <- list()
  current_page <- 0
  repeat {
    resp <- base_req |>
      req_url_path_append(endpoint) |>
      req_url_query(!!!params, page = current_page, pageSize = 50) |>
      req_perform()
    body <- resp_body_json(resp)
    collected <- c(collected, body$data)
    # hasNext signals whether another page exists.
    if (!body$hasNext) break
    current_page <- current_page + 1
  }
  collected
}

all_collections <- fetch_all_pages("collections")
cat(sprintf("Total collections: %d\n", length(all_collections)))
import asyncio

from biolevate import BiolevateClient


async def main():
    """List every collection by following page-based pagination."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        all_collections = []
        current = 0
        while True:
            page = await client.collections.list(page=current, page_size=50)
            all_collections.extend(page.data)
            if not page.has_next:
                break
            current += 1
        print(f"Total collections: {len(all_collections)}")


asyncio.run(main())
Cursor-Based Pagination
Used by: Provider Items
The items endpoint returns a nextCursor token instead of page numbers. This is more efficient for browsing large directory trees where the total count is not known upfront.
- Python (httpx)
- R
- SDK
def fetch_all_items(provider_id: str, key: str = "") -> list[dict]:
    """Follow nextCursor tokens to list every item under ``key`` for a provider."""
    collected: list[dict] = []
    cursor: str | None = None
    while True:
        query: dict = {"key": key, "limit": 50}
        # Only send the cursor once the server has handed one back.
        if cursor:
            query["cursor"] = cursor
        resp = client.get(f"/providers/{provider_id}/items", params=query)
        resp.raise_for_status()
        payload = resp.json()
        collected.extend(payload["items"])
        cursor = payload.get("nextCursor")
        # An absent/empty nextCursor means the listing is exhausted.
        if not cursor:
            return collected
# Follow nextCursor tokens to list every item under `key` for a provider.
fetch_all_items <- function(provider_id, key = "") {
  collected <- list()
  cursor <- NULL
  repeat {
    req <- base_req |>
      req_url_path_append("providers", provider_id, "items") |>
      req_url_query(key = key, limit = 50)
    # Only send the cursor once the server has handed one back.
    if (!is.null(cursor)) {
      req <- req |> req_url_query(cursor = cursor)
    }
    body <- resp_body_json(req_perform(req))
    collected <- c(collected, body$items)
    # A missing nextCursor means the listing is exhausted.
    if (is.null(body$nextCursor)) break
    cursor <- body$nextCursor
  }
  collected
}
import asyncio

from biolevate import BiolevateClient


async def main():
    """List every item of a provider using cursor-based pagination."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        provider_id = "550e8400-e29b-41d4-a716-446655440000"
        all_items = []
        cursor = None
        while True:
            page = await client.items.list(provider_id, key="", cursor=cursor, limit=50)
            all_items.extend(page.items)
            cursor = page.next_cursor
            if not cursor:
                break
        print(f"Total items: {len(all_items)}")


asyncio.run(main())
Large File Upload with Presigned URLs
For large files, uploading directly through the API (multipart) may be slow or impractical. The presigned URL flow lets you upload directly to the storage backend (S3, Azure, GCS), bypassing the API server.
The flow consists of three steps:
Step 1: Request a Presigned Upload URL
- cURL
- Python (httpx)
- R
- SDK
# Step 1: request a presigned upload URL (size is in bytes; 104857600 = 100 MiB).
curl -X POST "https://<api-domain>/api/core/providers/${PROVIDER_ID}/items/upload-url" \
-H "Authorization: Bearer <your-pat>" \
-H "Content-Type: application/json" \
-d '{
"key": "reports/large-dataset.csv",
"size": 104857600,
"mediaType": "text/csv"
}'
from pydantic import BaseModel, Field


class UploadUrlResponse(BaseModel):
    """Result of requesting a presigned upload URL."""

    # url is absent when the provider does not support presigned uploads.
    url: str | None = None
    expires_in_seconds: int = Field(alias="expiresInSeconds")
    supported: bool


resp = client.post(
    f"/providers/{provider_id}/items/upload-url",
    json={
        "key": "reports/large-dataset.csv",
        "size": 104857600,
        "mediaType": "text/csv",
    },
)
resp.raise_for_status()
upload_info = UploadUrlResponse.model_validate(resp.json())
if upload_info.supported:
    print(f"Upload URL obtained (expires in {upload_info.expires_in_seconds}s)")
else:
    print("Provider does not support presigned uploads. Use multipart upload instead.")
# Step 1: ask the API for a presigned upload URL.
resp <- base_req |>
  req_url_path_append("providers", provider_id, "items", "upload-url") |>
  req_body_json(list(
    key = "reports/large-dataset.csv",
    size = 104857600,
    mediaType = "text/csv"
  )) |>
  req_perform()
upload_info <- resp_body_json(resp)
if (upload_info$supported) {
  cat(sprintf("Upload URL obtained (expires in %ds)\n", upload_info$expiresInSeconds))
} else {
  cat("Provider does not support presigned uploads. Use multipart upload instead.\n")
}
import asyncio

from biolevate import BiolevateClient


async def main():
    """Step 1: request a presigned upload URL from the API."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        provider_id = "550e8400-e29b-41d4-a716-446655440000"
        upload_info = await client.items.get_upload_url(
            provider_id=provider_id,
            key="reports/large-dataset.csv",
            size=104857600,
            media_type="text/csv",
        )
        if upload_info.supported:
            print(f"Upload URL obtained (expires in {upload_info.expires_in_seconds}s)")
        else:
            print("Provider does not support presigned uploads.")


asyncio.run(main())
Not all providers support presigned uploads. When supported is false, fall back to multipart upload via the standard upload endpoint.
Step 2: Upload to the Presigned URL
Upload the file directly to the storage backend using an HTTP PUT. This request does not go through the Biolevate API.
- cURL
- Python (httpx)
- R
- SDK
# Step 2: PUT the file straight to the storage backend (bypasses the API server).
curl -X PUT "${PRESIGNED_URL}" \
-H "Content-Type: text/csv" \
--data-binary @large-dataset.csv
from pathlib import Path

# Step 2: PUT the file straight to the storage backend (not via the API).
source = Path("large-dataset.csv")
with source.open("rb") as stream:
    result = httpx.put(
        upload_info.url,
        content=stream,
        headers={"Content-Type": "text/csv"},
    )
result.raise_for_status()
print("File uploaded to storage backend")
# Step 2: PUT the file straight to the storage backend (not via the API).
# req_body_file streams the file from disk rather than loading it into memory.
request(upload_info$url) |>
req_method("PUT") |>
req_body_file("large-dataset.csv", type = "text/csv") |>
req_perform()
cat("File uploaded to storage backend\n")
import asyncio

import httpx


async def main():
    """Step 2: PUT the file bytes directly to the presigned URL."""
    with open("large-dataset.csv", "rb") as f:
        payload = f.read()
    async with httpx.AsyncClient() as http:
        resp = await http.put(
            upload_info.url,
            content=payload,
            headers={"Content-Type": "text/csv"},
        )
        resp.raise_for_status()
    print("File uploaded to storage backend")


asyncio.run(main())
Step 3: Confirm the Upload
After the file is uploaded to the storage backend, confirm the upload with the API so that the file appears in the provider's item listing.
- cURL
- Python (httpx)
- R
- SDK
# Step 3: confirm the upload so the object appears in the provider's listing.
curl -X POST "https://<api-domain>/api/core/providers/${PROVIDER_ID}/items/confirm" \
-H "Authorization: Bearer <your-pat>" \
-H "Content-Type: application/json" \
-d '{"key": "reports/large-dataset.csv"}'
# Step 3: tell the API the object now exists so it shows up in listings.
confirm_resp = client.post(
    f"/providers/{provider_id}/items/confirm",
    json={"key": "reports/large-dataset.csv"},
)
confirm_resp.raise_for_status()
confirmed = ProviderItem.model_validate(confirm_resp.json())
print(f"Confirmed: {confirmed.key}")
# Step 3: confirm the upload so the file appears in the provider listing.
confirm_resp <- base_req |>
  req_url_path_append("providers", provider_id, "items", "confirm") |>
  req_body_json(list(key = "reports/large-dataset.csv")) |>
  req_perform()
confirmed <- resp_body_json(confirm_resp)
cat(sprintf("Confirmed: %s\n", confirmed$key))
import asyncio

from biolevate import BiolevateClient


async def main():
    """Step 3: confirm the upload so the file appears in listings."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        provider_id = "550e8400-e29b-41d4-a716-446655440000"
        confirmed = await client.items.confirm_upload(
            provider_id=provider_id,
            key="reports/large-dataset.csv",
        )
        print(f"Confirmed: {confirmed.key}")


asyncio.run(main())
Complete Presigned Upload Helper
Here is a reusable function that handles the full presigned upload flow with fallback to multipart.
- Python (httpx)
- R
- SDK
from pathlib import Path


def upload_file(
    provider_id: str,
    file_path: Path,
    target_folder_key: str = "",
    media_type: str = "application/octet-stream",
) -> ProviderItem:
    """Upload a file using presigned URL if supported, with multipart fallback."""
    size = file_path.stat().st_size
    # Destination key is folder key + file name concatenated as-is,
    # so target_folder_key should carry any trailing separator.
    file_key = f"{target_folder_key}{file_path.name}"

    # Step 1: ask whether the provider supports presigned uploads.
    info_resp = client.post(
        f"/providers/{provider_id}/items/upload-url",
        json={
            "key": file_key,
            "size": size,
            "mediaType": media_type,
        },
    )
    info_resp.raise_for_status()
    info = UploadUrlResponse.model_validate(info_resp.json())

    if not (info.supported and info.url):
        # Fallback: multipart upload through the API server.
        with file_path.open("rb") as handle:
            fallback = client.post(
                f"/providers/{provider_id}/items",
                params={"key": target_folder_key},
                files={"file": (file_path.name, handle, media_type)},
            )
        fallback.raise_for_status()
        return ProviderItem.model_validate(fallback.json())

    # Step 2: PUT the bytes straight to the storage backend.
    with file_path.open("rb") as handle:
        put_resp = httpx.put(
            info.url,
            content=handle,
            headers={"Content-Type": media_type},
        )
        put_resp.raise_for_status()

    # Step 3: confirm so the object appears in the provider listing.
    confirm = client.post(
        f"/providers/{provider_id}/items/confirm",
        json={"key": file_key},
    )
    confirm.raise_for_status()
    return ProviderItem.model_validate(confirm.json())
# Upload a file via presigned URL when supported, falling back to multipart.
upload_file <- function(provider_id, file_path, target_folder_key = "", media_type = "application/octet-stream") {
  # Destination key is folder key + base file name concatenated as-is.
  file_key <- paste0(target_folder_key, basename(file_path))
  size <- file.info(file_path)$size

  # Step 1: ask whether the provider supports presigned uploads.
  upload_info <- base_req |>
    req_url_path_append("providers", provider_id, "items", "upload-url") |>
    req_body_json(list(
      key = file_key,
      size = size,
      mediaType = media_type
    )) |>
    req_perform() |>
    resp_body_json()

  if (upload_info$supported && !is.null(upload_info$url)) {
    # Step 2: PUT the bytes straight to the storage backend.
    request(upload_info$url) |>
      req_method("PUT") |>
      req_body_file(file_path, type = media_type) |>
      req_perform()
    # Step 3: confirm so the file appears in the provider listing.
    confirm_resp <- base_req |>
      req_url_path_append("providers", provider_id, "items", "confirm") |>
      req_body_json(list(key = file_key)) |>
      req_perform()
    return(resp_body_json(confirm_resp))
  }

  # Fallback: multipart upload through the API server.
  resp <- base_req |>
    req_url_path_append("providers", provider_id, "items") |>
    req_url_query(key = target_folder_key) |>
    req_body_multipart(file = curl::form_file(file_path, media_type)) |>
    req_perform()
  resp_body_json(resp)
}
import asyncio
import io
import os

import httpx
from biolevate import BiolevateClient


async def upload_file(
    client: BiolevateClient,
    provider_id: str,
    file_path: str,
    target_folder_key: str = "",
    media_type: str = "application/octet-stream",
):
    """Upload a file via presigned URL when supported, else multipart.

    Args:
        client: an open BiolevateClient.
        provider_id: UUID of the target provider.
        file_path: local path of the file to upload.
        target_folder_key: destination folder key; the file name is appended
            as-is, so include any trailing separator the provider expects.
        media_type: MIME type sent with the upload.

    Returns:
        The created provider item.
    """
    with open(file_path, "rb") as f:
        file_bytes = f.read()
    # os.path.basename respects the OS path separator; the previous
    # split("/") produced a wrong key for Windows-style paths.
    file_name = os.path.basename(file_path)
    file_key = f"{target_folder_key}{file_name}"
    upload_info = await client.items.get_upload_url(
        provider_id=provider_id,
        key=file_key,
        size=len(file_bytes),
        media_type=media_type,
    )
    if upload_info.supported and upload_info.url:
        # Presigned path: PUT straight to storage, then confirm with the API.
        async with httpx.AsyncClient() as http:
            response = await http.put(
                upload_info.url,
                content=file_bytes,
                headers={"Content-Type": media_type},
            )
            response.raise_for_status()
        return await client.items.confirm_upload(provider_id, key=file_key)
    # Fallback: multipart upload through the API server.
    return await client.items.upload(
        provider_id=provider_id,
        key=target_folder_key,
        file=io.BytesIO(file_bytes),
        file_name=file_name,
        mime_type=media_type,
    )
AI Job Polling
Question Answering and Extraction jobs run asynchronously. After creating a job, you must poll until status is SUCCESS or FAILED. The pattern is identical for both job types — only the base path differs (/qa/jobs vs /extraction/jobs).
- Python (httpx)
- R
- SDK
import time

from pydantic import BaseModel, Field


class Job(BaseModel):
    """Minimal view of a QA/Extraction job status payload."""

    job_id: str = Field(alias="jobId")
    status: str
    # Populated by the API on failure; None otherwise.
    error_message: str | None = Field(default=None, alias="errorMessage")


# Statuses after which the job will never change again.
TERMINAL_STATUSES = {"SUCCESS", "FAILED", "ABORTED"}


def poll_job(
    base_path: str,
    job_id: str,
    poll_interval: float = 3.0,
    timeout: float | None = None,
) -> Job:
    """Poll a QA or Extraction job until it reaches a terminal status.

    Args:
        base_path: "qa" or "extraction" — the job family to poll.
        job_id: identifier returned when the job was created.
        poll_interval: seconds to sleep between status checks.
        timeout: optional overall limit in seconds; raises TimeoutError when
            exceeded. The default (None) polls forever, matching the previous
            behavior.

    Returns:
        The job in its terminal state.

    Raises:
        TimeoutError: if ``timeout`` elapses before the job settles.
        httpx.HTTPStatusError: if a status request fails.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        response = client.get(f"/{base_path}/jobs/{job_id}")
        response.raise_for_status()
        job = Job.model_validate(response.json())
        if job.status in TERMINAL_STATUSES:
            return job
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")
        time.sleep(poll_interval)


# Usage
qa_job = poll_job("qa", job_id)
if qa_job.status != "SUCCESS":
    raise RuntimeError(f"QA job failed: {qa_job.error_message}")
extraction_job = poll_job("extraction", job_id)
if extraction_job.status != "SUCCESS":
    raise RuntimeError(f"Extraction job failed: {extraction_job.error_message}")
# Statuses after which a job will never change again.
TERMINAL_STATUSES <- c("SUCCESS", "FAILED", "ABORTED")

# Poll a QA or Extraction job until it reaches a terminal status.
poll_job <- function(base_path, job_id, poll_interval = 3) {
  repeat {
    job <- base_req |>
      req_url_path_append(base_path, "jobs", job_id) |>
      req_perform() |>
      resp_body_json()
    if (job$status %in% TERMINAL_STATUSES) return(job)
    Sys.sleep(poll_interval)
  }
}

# Usage
qa_job <- poll_job("qa", qa_job_id)
if (qa_job$status != "SUCCESS") stop(paste("QA job failed:", qa_job$errorMessage))
extraction_job <- poll_job("extraction", extraction_job_id)
if (extraction_job$status != "SUCCESS") stop(paste("Extraction job failed:", extraction_job$errorMessage))
import asyncio

from biolevate import BiolevateClient, QuestionInput

# Statuses after which a job will never change again.
TERMINAL_STATUSES = {"SUCCESS", "FAILED", "ABORTED"}


async def main():
    """Create a QA job, then poll every 3 seconds until it settles."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        job = await client.qa.create_job(
            questions=[
                QuestionInput(
                    id="q1",
                    question="What is the main conclusion?",
                    answer_type={"dataType": "STRING", "multiValued": False},
                )
            ],
            file_ids=["a1b2c3d4-e5f6-7890-abcd-ef1234567890"],
        )
        while True:
            job = await client.qa.get_job(job.job_id)
            if job.status in TERMINAL_STATUSES:
                break
            await asyncio.sleep(3)
        if job.status != "SUCCESS":
            raise RuntimeError(f"QA job failed: {job.error_message}")


asyncio.run(main())
Start with a 3-second interval. For large document sets, jobs can take tens of seconds; exponential backoff (3s → 6s → 12s, capped at 30s) avoids unnecessary requests without adding significant latency.
Error Handling
The API returns standard HTTP status codes with JSON error bodies when something goes wrong.
Common Error Codes
| Status Code | Meaning | Common Causes |
|---|---|---|
| 400 | Bad Request | Missing required field, unsupported file extension, invalid parameter |
| 401 | Unauthorized | Missing or expired PAT |
| 403 | Forbidden | Insufficient permissions on the resource |
| 404 | Not Found | Resource does not exist or has been deleted |
Handling Errors in Code
- Python (httpx)
- R
- SDK
import httpx

# Fetch a file's metadata, mapping HTTP status codes to friendly messages.
try:
    response = client.get(f"/files/{file_id}")
    response.raise_for_status()
except httpx.HTTPStatusError as exc:
    status = exc.response.status_code
    if status == 401:
        print("Authentication failed. Check your PAT.")
    elif status == 403:
        print("You do not have permission to access this resource.")
    elif status == 404:
        print("Resource not found.")
    else:
        print(f"API error {status}: {exc.response.text}")
# Fetch a file's metadata, branching on the HTTP status code ourselves.
# req_error(is_error = FALSE) stops httr2 from raising on 4xx/5xx responses;
# the outer tryCatch still catches transport-level failures (DNS, connection).
tryCatch(
{
resp <- base_req |>
req_url_path_append("files", file_id) |>
req_error(is_error = \(resp) FALSE) |>
req_perform()
status <- resp_status(resp)
if (status == 200) {
file_info <- resp_body_json(resp)
cat(sprintf("File: %s\n", file_info$name))
} else if (status == 401) {
cat("Authentication failed. Check your PAT.\n")
} else if (status == 403) {
cat("You do not have permission to access this resource.\n")
} else if (status == 404) {
cat("Resource not found.\n")
} else {
cat(sprintf("API error %d\n", status))
}
},
error = function(e) {
cat(sprintf("Request failed: %s\n", e$message))
}
)
import asyncio

from biolevate import BiolevateClient, NotFoundError, AuthenticationError, APIError


async def main():
    """Fetch a file, translating SDK exceptions into friendly messages."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        try:
            file = await client.files.get("a1b2c3d4-e5f6-7890-abcd-ef1234567890")
            print(f"File: {file.name}")
        except NotFoundError:
            print("Resource not found.")
        except AuthenticationError:
            print("Authentication failed. Check your PAT.")
        except APIError as e:
            # Catch-all for other API-reported failures.
            print(f"API error {e.status_code}: {e.message}")


asyncio.run(main())
End-to-End Workflow
Here is a complete example that demonstrates the full workflow: browse a provider, upload a file, index it, add it to a collection, and run a QA job on it.
- Python (httpx)
- R
- SDK
import httpx
from pathlib import Path
from pydantic import BaseModel, Field

client = httpx.Client(
    base_url="https://<api-domain>/api/core",
    headers={"Authorization": "Bearer <your-pat>"},
)

# 1. Pick the first available provider.
providers = client.get("/providers").json()["data"]
provider_id = providers[0]["id"]["id"]
print(f"Using provider: {providers[0]['name']}")

# 2. Upload a PDF into the provider's root folder.
pdf = Path("research-paper.pdf")
with pdf.open("rb") as handle:
    upload_resp = client.post(
        f"/providers/{provider_id}/items",
        params={"key": ""},
        files={"file": (pdf.name, handle, "application/pdf")},
    )
upload_resp.raise_for_status()
print(f"Uploaded: {pdf.name}")

# 3. Register the uploaded object as an EliseFile.
file_resp = client.post(
    "/files",
    json={
        "providerId": provider_id,
        "key": pdf.name,
    },
)
file_resp.raise_for_status()
file_id = file_resp.json()["id"]["id"]
print(f"EliseFile created: {file_id}")

# 4. Create a collection and add the file to it.
coll_resp = client.post(
    "/collections",
    json={"name": "My Research", "description": "Research papers"},
)
coll_resp.raise_for_status()
collection_id = coll_resp.json()["id"]["id"]
client.post(
    f"/collections/{collection_id}/files",
    json={"fileId": file_id},
).raise_for_status()
print(f"File added to collection: {collection_id}")

# 5. Run a QA job on the indexed file
qa_resp = client.post(
    "/qa/jobs",
    json={
        "files": {"fileIds": [file_id], "collectionIds": []},
        "questions": [
            {
                "id": "q1",
                "question": "What is the main conclusion of this document?",
                "answerType": {"dataType": "STRING", "multiValued": False},
            }
        ],
    },
)
qa_resp.raise_for_status()
qa_job_id = qa_resp.json()["jobId"]

# 6. Wait for the job and print the answer.
qa_job = poll_job("qa", qa_job_id)
if qa_job.status == "SUCCESS":
    results = client.get(f"/qa/jobs/{qa_job_id}/results").json()["results"]
    print(f"Answer: {results[0]['rawValue']}")
library(httr2)

# Shared request template: base URL plus bearer-token auth.
base_req <- request("https://<api-domain>/api/core") |>
  req_auth_bearer_token("<your-pat>")

# 1. Pick the first available provider.
providers <- base_req |>
  req_url_path_append("providers") |>
  req_perform() |>
  resp_body_json()
provider_id <- providers$data[[1]]$id$id
cat(sprintf("Using provider: %s\n", providers$data[[1]]$name))

# 2. Upload a PDF into the provider's root folder.
upload_resp <- base_req |>
  req_url_path_append("providers", provider_id, "items") |>
  req_url_query(key = "") |>
  req_body_multipart(file = curl::form_file("research-paper.pdf", "application/pdf")) |>
  req_perform()
cat("Uploaded: research-paper.pdf\n")

# 3. Register the uploaded object as an EliseFile.
file_resp <- base_req |>
  req_url_path_append("files") |>
  req_body_json(list(
    providerId = provider_id,
    key = "research-paper.pdf"
  )) |>
  req_perform()
file_id <- resp_body_json(file_resp)$id$id
cat(sprintf("EliseFile created: %s\n", file_id))

# 4. Create a collection and add the file to it.
coll_resp <- base_req |>
  req_url_path_append("collections") |>
  req_body_json(list(name = "My Research", description = "Research papers")) |>
  req_perform()
collection_id <- resp_body_json(coll_resp)$id$id
base_req |>
  req_url_path_append("collections", collection_id, "files") |>
  req_body_json(list(fileId = file_id)) |>
  req_perform()
cat(sprintf("File added to collection: %s\n", collection_id))

# 5. Run a QA job on the indexed file
qa_resp <- base_req |>
  req_url_path_append("qa", "jobs") |>
  req_body_json(list(
    files = list(fileIds = list(file_id), collectionIds = list()),
    questions = list(
      list(
        id = "q1",
        question = "What is the main conclusion of this document?",
        answerType = list(dataType = "STRING", multiValued = FALSE)
      )
    )
  )) |>
  req_perform()
qa_job_id <- resp_body_json(qa_resp)$jobId

# 6. Wait for the job and print the answer.
qa_job <- poll_job("qa", qa_job_id)
if (qa_job$status == "SUCCESS") {
  results <- base_req |>
    req_url_path_append("qa", "jobs", qa_job_id, "results") |>
    req_perform() |>
    resp_body_json()
  cat(sprintf("Answer: %s\n", results$results[[1]]$rawValue))
}
import asyncio

from biolevate import BiolevateClient, QuestionInput

# Statuses after which a job will never change again.
TERMINAL_STATUSES = {"SUCCESS", "FAILED", "ABORTED"}


async def main():
    """End-to-end: browse, upload, register, collect, and question a document."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        # 1. Pick the first available provider.
        providers = await client.providers.list()
        provider_id = providers.data[0].id.id
        print(f"Using provider: {providers.data[0].name}")

        # 2. Upload the PDF into the provider's root folder.
        with open("research-paper.pdf", "rb") as handle:
            await client.items.upload(
                provider_id=provider_id,
                key="",
                file=handle,
                file_name="research-paper.pdf",
                mime_type="application/pdf",
            )
        print("Uploaded: research-paper.pdf")

        # 3. Register the uploaded object as an EliseFile.
        elise_file = await client.files.create(
            provider_id=provider_id,
            key="research-paper.pdf",
        )
        file_id = elise_file.id.id
        print(f"EliseFile created: {file_id}")

        # 4. Create a collection and add the file to it.
        collection = await client.collections.create(
            name="My Research",
            description="Research papers",
        )
        collection_id = collection.id.id
        await client.collections.add_file(collection_id, file_id)
        print(f"File added to collection: {collection_id}")

        # 5. Run a QA job and poll until it settles.
        job = await client.qa.create_job(
            questions=[
                QuestionInput(
                    id="q1",
                    question="What is the main conclusion of this document?",
                    answer_type={"dataType": "STRING", "multiValued": False},
                )
            ],
            file_ids=[file_id],
        )
        while True:
            job = await client.qa.get_job(job.job_id)
            if job.status in TERMINAL_STATUSES:
                break
            await asyncio.sleep(3)
        if job.status == "SUCCESS":
            outputs = await client.qa.get_job_outputs(job.job_id)
            print(f"Answer: {outputs.results[0].raw_value}")


asyncio.run(main())