Skip to main content

Extraction

The extraction resource submits and manages AI metadata extraction jobs. It is available via client.extraction.

import asyncio
from biolevate import BiolevateClient, MetaInput


async def main():
    """Submit a minimal extraction job for one field on one file."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        job = await client.extraction.create_job(
            metas=[
                MetaInput(
                    meta="document_title",
                    answer_type={"dataType": "STRING", "multiValued": False},
                    description="The full title of the document",
                )
            ],
            file_ids=["a1b2c3d4-e5f6-7890-abcd-ef1234567890"],
        )
        print(f"Job created: {job.job_id} (status: {job.status})")


asyncio.run(main())

Methods

create_job()

Submit a new extraction job. Define the metadata fields to extract and target documents by file_ids, collection_ids, or both.

from biolevate import MetaInput

job = await client.extraction.create_job(
metas=[
MetaInput(
meta="document_title",
answer_type={"dataType": "STRING", "multiValued": False},
description="The full title of the document",
),
MetaInput(
meta="study_year",
answer_type={"dataType": "INT", "multiValued": False},
description="Year the study was conducted or published",
),
MetaInput(
meta="risk_level",
answer_type={
"dataType": "ENUM",
"multiValued": False,
"enumValues": ["LOW", "MEDIUM", "HIGH"],
},
description="Assessed risk level",
),
],
file_ids=["a1b2c3d4-e5f6-7890-abcd-ef1234567890"],
)
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `metas` | `list[MetaInput]` | required | Metadata fields to extract |
| `file_ids` | `list[str] \| None` | `None` | UUIDs of individual EliseFiles to process |
| `collection_ids` | `list[str] \| None` | `None` | UUIDs of Collections to process |

Returns Job

Raises AuthenticationError, APIError


get_job()

Get the current status of an extraction job. Poll until status is SUCCESS or FAILED.

# Fetch the current state of the job by its UUID; poll until terminal.
job = await client.extraction.get_job(job_id)
print(f"Status: {job.status}")
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | UUID of the extraction job |

Returns Job

Raises NotFoundError, AuthenticationError, APIError


list_jobs()

List all extraction jobs for the current user.

page = await client.extraction.list_jobs(page=0, page_size=20)
for job in page.data:
print(f"{job.job_id}: {job.status}")
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `page` | `int` | `0` | Page number (0-based) |
| `page_size` | `int` | `20` | Number of jobs per page |
| `sort_property` | `str \| None` | `None` | Field to sort by |
| `sort_order` | `str \| None` | `None` | `"ASC"` or `"DESC"` |

Returns JobPage

Raises AuthenticationError, APIError


get_job_outputs()

Retrieve the extracted values from a completed extraction job.

outputs = await client.extraction.get_job_outputs(job_id)
for result in outputs.results:
print(f"{result.meta}: {result.raw_value}")
if result.explanation:
print(f" {result.explanation}")
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | UUID of the extraction job |

Returns ExtractJobOutputs

Raises NotFoundError, AuthenticationError, APIError


get_job_inputs()

Retrieve the original field definitions and file targets submitted to the job.

# Recover the original field definitions submitted with the job.
inputs = await client.extraction.get_job_inputs(job_id)
print(f"Fields: {[m.meta for m in inputs.metas]}")
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | UUID of the extraction job |

Returns ExtractJobInputs

Raises NotFoundError, AuthenticationError, APIError


get_job_annotations()

Retrieve source annotations for all results — document passages the AI used as evidence for each extracted value.

# Index annotations by their id for lookup from result reference_ids.
annotations = await client.extraction.get_job_annotations(job_id)
annotation_map = {a.id.id: a for a in annotations}
| Parameter | Type | Description |
| --- | --- | --- |
| `job_id` | `str` | UUID of the extraction job |

Returns list[Annotation]

Raises NotFoundError, AuthenticationError, APIError

Full Example — Create, Poll, and Retrieve Results

import asyncio
from biolevate import BiolevateClient, MetaInput

# Statuses at which polling can stop.
TERMINAL_STATUSES = {"SUCCESS", "FAILED", "ABORTED"}


async def main():
    """Create an extraction job, poll to completion, then print results with sources."""
    async with BiolevateClient(
        base_url="https://<api-domain>",
        token="<your-pat>",
    ) as client:
        job = await client.extraction.create_job(
            metas=[
                MetaInput(
                    meta="document_title",
                    answer_type={"dataType": "STRING", "multiValued": False},
                    description="The full title of the document",
                ),
                MetaInput(
                    meta="study_year",
                    answer_type={"dataType": "INT", "multiValued": False},
                    description="Year of publication",
                ),
            ],
            file_ids=["a1b2c3d4-e5f6-7890-abcd-ef1234567890"],
        )
        print(f"Job created: {job.job_id}")

        # Poll until the job reaches a terminal status.
        while True:
            job = await client.extraction.get_job(job.job_id)
            print(f"Status: {job.status}")
            if job.status in TERMINAL_STATUSES:
                break
            await asyncio.sleep(3)

        if job.status == "SUCCESS":
            outputs = await client.extraction.get_job_outputs(job.job_id)
            for result in outputs.results:
                print(f"{result.meta}: {result.raw_value}")

            # Map annotation ids to annotations so each result's
            # reference_ids can be resolved to source passages.
            annotations = await client.extraction.get_job_annotations(job.job_id)
            annotation_map = {a.id.id: a for a in annotations}
            for result in outputs.results:
                for ref in result.reference_ids:
                    ann = annotation_map.get(ref.id)
                    # ann.data may be missing; guard before dereferencing.
                    if ann and ann.data:
                        print(f" Source: [{ann.data.document_name}] {ann.data.content[:80]}")


asyncio.run(main())

Next Steps