reader-py is the official Python SDK for the Reader API. It wraps the HTTP contract, parses responses into Pydantic models, polls async jobs to completion, raises typed exceptions, and retries transient failures.

Installation

pip install reader-py
Current version: 0.2.0. Requires Python 3.9+.

Quick start

import os
from reader_py import ReaderClient

client = ReaderClient(api_key=os.environ["READER_KEY"])

result = client.read(url="https://example.com")
if result.kind == "scrape":
    print(result.data.markdown)
    print(f"scraped in {result.data.metadata.duration}ms")

Async client

import asyncio
import os
from reader_py import AsyncReaderClient

async def main():
    async with AsyncReaderClient(api_key=os.environ["READER_KEY"]) as client:
        result = await client.read(url="https://example.com")
        if result.kind == "scrape":
            print(result.data.markdown)

asyncio.run(main())
Every method on ReaderClient has an awaitable equivalent on AsyncReaderClient.
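
Because every method is awaitable, you can fan out many single-URL reads concurrently. A minimal sketch of a bounded fan-out; `gather_bounded` is a hypothetical helper built only on asyncio, not part of the SDK:

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")

async def gather_bounded(
    factories: Iterable[Callable[[], Awaitable[T]]],
    limit: int = 5,
) -> List[T]:
    """Run awaitables with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def run(factory: Callable[[], Awaitable[T]]) -> T:
        async with sem:
            return await factory()

    # gather preserves input order in its result list
    return await asyncio.gather(*(run(f) for f in factories))
```

With AsyncReaderClient you would pass one factory per URL, e.g. `lambda u=u: client.read(url=u)`, keeping concurrency below your plan's limit.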

Configuration

client = ReaderClient(
    api_key="rdr_your_key",              # required
    base_url="https://api.reader.dev",   # optional, override for self-hosted
    timeout=60,                          # per-request timeout in seconds (default 60)
)

Scraping

Single URL, synchronous

A single-URL request runs synchronously and returns ReadResult(kind="scrape", data=ScrapeResult).
result = client.read(
    url="https://example.com",
    formats=["markdown"],
    only_main_content=True,
)

if result.kind == "scrape":
    print(result.data.url)                         # canonical URL after redirects
    print(result.data.markdown)                    # clean markdown
    print(result.data.metadata.title)              # page title
    print(result.data.metadata.status_code)        # 200
    print(result.data.metadata.duration)           # ms
    print(result.data.metadata.cached)             # True if served from cache
    print(result.data.metadata.proxy_mode)         # "standard" | "stealth"
    print(result.data.metadata.proxy_escalated)    # True only if auto escalated

Multiple URLs (batch)

Passing urls creates an async job. The SDK auto-polls until the job terminates and returns ReadResult(kind="job", data=Job) with all results collected across pagination.
result = client.read(
    urls=[
        "https://example.com/page-1",
        "https://example.com/page-2",
        "https://example.com/page-3",
    ],
)

if result.kind == "job":
    print(f"completed {result.data.completed} / {result.data.total}")
    for page in result.data.results:
        if page.error:
            print(f"{page.url}: {page.error}")
        else:
            print(page.url, len(page.markdown or ""), "chars")
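
A small helper for splitting a finished job's pages into successes and failures; it assumes only the `error` attribute shown above:

```python
from typing import Iterable, List, Tuple

def partition_results(pages: Iterable) -> Tuple[List, List]:
    """Split job pages into (succeeded, failed) based on the error field."""
    ok, failed = [], []
    for page in pages:
        (failed if page.error else ok).append(page)
    return ok, failed
```

This pairs naturally with retry_job (below): inspect the failed list first, then decide whether a retry is worth the credits.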

Crawl

Same shape as batch, but with max_depth or max_pages:
result = client.read(
    url="https://docs.example.com",
    max_depth=3,
    max_pages=100,
)

Proxy mode

Control how aggressively Reader bypasses bot walls with proxy_mode:
# Default: auto. Starts standard, escalates to stealth on block
client.read(url=url)

# Explicit: force the cheaper tier (error if blocked)
client.read(url=url, proxy_mode="standard")

# Explicit: force the bypass tier (3x credits but works on hostile sites)
client.read(url=url, proxy_mode="stealth")
See Proxy modes for the full picture.
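
If you want the escalation step visible in your own code (for logging or budget checks) rather than implicit in auto mode, one pattern is to force the cheap tier and retry on a block. A generic sketch, with the blocked-exception type injected so it is not tied to the SDK:

```python
from typing import Callable, Type

def read_with_fallback(
    read_fn: Callable[..., object],
    url: str,
    blocked_exc: Type[Exception],
):
    """Try the standard proxy tier first; escalate to stealth only if blocked."""
    try:
        return read_fn(url=url, proxy_mode="standard")
    except blocked_exc:
        # Blocked on the cheap tier: spend 3x credits on the stealth tier.
        return read_fn(url=url, proxy_mode="stealth")
```

With the real client this would be called as `read_with_fallback(client.read, url, UrlBlockedError)`; functionally it mirrors the default auto mode.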

Job management

The SDK’s read() method auto-polls batches and crawls, so most callers never need to touch job APIs directly. When you do:
# Fetch a single page of a job's results
job, has_more = client.get_job(job_id, skip=0, limit=20)

# Collect every page automatically
all_pages = client.get_all_job_results(job_id)

# Poll a job by ID until it terminates (collects all results on completion)
job = client.wait_for_job(job_id, poll_interval=2, timeout=300)

# Cancel a queued or processing job
client.cancel_job(job_id)

# Retry failed URLs in a completed job
retry_info = client.retry_job(job_id)
print(f"retrying {retry_info['retrying']} failed URLs")
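
get_all_job_results already collects every page for you; if you need custom paging (say, flushing each page of results to a database as it arrives), the loop looks roughly like this. It is written against a generic `fetch(skip, limit) -> (items, has_more)` callable so it is not tied to the exact get_job return shape:

```python
from typing import Callable, List, Tuple

def collect_paginated(
    fetch: Callable[[int, int], Tuple[List, bool]],
    page_size: int = 20,
) -> List:
    """Accumulate items from a skip/limit-paginated endpoint."""
    items: List = []
    skip = 0
    while True:
        batch, has_more = fetch(skip, page_size)
        items.extend(batch)
        if not has_more:
            return items
        skip += page_size
```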

Streaming

For real-time progress updates on a job, use client.stream(job_id), a generator that yields StreamEvent instances as the job makes progress.
for event in client.stream(job_id):
    if event.type == "progress":
        print(f"{event.completed} / {event.total}")
    elif event.type == "page":
        print("page done:", event.data.url)
    elif event.type == "error":
        print("page failed:", event.url, event.error)
    elif event.type == "done":
        print("job finished:", event.status)
        break
AsyncReaderClient.stream() returns an async generator. Use async for with it.

Credits

credits = client.get_credits()
print(f"{credits.balance} / {credits.limit}, tier: {credits.tier}")
print(f"resets at: {credits.reset_at}")

if credits.balance < 100:
    # Warn, pause workers, upgrade tier, etc.
    pass
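
Combined with the 3x stealth multiplier mentioned above, a pre-flight check before a large batch might look like the sketch below. The 1-credit-per-standard-scrape cost is an assumption for illustration; check the pricing docs for real numbers:

```python
def batch_fits_budget(balance: int, n_urls: int, proxy_mode: str = "standard") -> bool:
    """Rough pre-flight: can `balance` cover a batch of `n_urls`?

    Assumes 1 credit per standard scrape and a 3x stealth multiplier;
    real costs may differ (caching, retries, auto escalation).
    """
    per_url = 3 if proxy_mode == "stealth" else 1
    return balance >= n_urls * per_url
```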

Error handling

Every error response from the API is parsed into a specific ReaderApiError subclass. Catch the specific class rather than checking HTTP status codes.
from reader_py import (
    ReaderApiError,
    InvalidRequestError,
    UnauthenticatedError,
    InsufficientCreditsError,
    UrlBlockedError,
    NotFoundError,
    ConflictError,
    RateLimitedError,
    ConcurrencyLimitedError,
    InternalServerError,
    UpstreamUnavailableError,
    ScrapeTimeoutError,
)

try:
    result = client.read(url=url)
except InsufficientCreditsError as err:
    print(f"Need {err.required} credits, have {err.available}")
    print(f"Resets at {err.reset_at}")
except RateLimitedError as err:
    print(f"Rate limited. Retry after {err.retry_after_seconds}s")
except UrlBlockedError as err:
    print(f"URL blocked: {err.reason}")
except ScrapeTimeoutError as err:
    print(f"Scrape exceeded {err.timeout_ms}ms")
except ReaderApiError as err:
    # Catch-all for known API errors
    print(f"[{err.code}] {err}")
    print(f"Docs: {err.docs_url}")
    print(f"Request ID: {err.request_id}")
Every error has:
  • code: one of 11 stable codes (e.g. "insufficient_credits", "rate_limited")
  • http_status: the HTTP status code
  • details: dict with error-specific fields
  • docs_url: deep link to the error’s documentation
  • request_id: the x-request-id header from the response, for support tickets
The full catalog is at Errors.
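
Because every exception carries these same fields, a reporting helper needs no per-class logic. A sketch that reads only the attributes listed above, tolerating their absence:

```python
def error_report(err) -> dict:
    """Flatten a ReaderApiError-shaped exception into a loggable dict."""
    return {
        "code": getattr(err, "code", None),
        "http_status": getattr(err, "http_status", None),
        "request_id": getattr(err, "request_id", None),
        "docs_url": getattr(err, "docs_url", None),
        "message": str(err),
    }
```

Logging the request_id on every failure makes support tickets much faster to resolve.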

Backwards compatibility

ReaderError is re-exported as an alias for ReaderApiError so code written against the 0.1 SDK continues to work. New code should use ReaderApiError directly.

Automatic retries

The SDK retries these codes automatically with exponential backoff before raising: rate_limited (honors Retry-After), concurrency_limited, internal_error, upstream_unavailable, scrape_timeout. All other codes raise immediately.
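
The SDK's exact schedule is internal, but capped exponential backoff generally has this shape (base and cap here are illustrative values, not the SDK's):

```python
def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Capped exponential backoff: 0.5s, 1s, 2s, 4s, ... up to `cap`."""
    return min(cap, base * (2 ** attempt))
```

For rate_limited, the server-provided Retry-After value takes precedence over any computed delay.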

Webhooks per request

Every read() call can include an inline webhook config that fires on job lifecycle events, useful for fire-and-forget batches.
client.read(
    urls=many_urls,
    webhook={
        "url": "https://your-app.example.com/hooks/reader",
        "events": ["job.completed", "job.failed"],
        "secret": os.environ["READER_WEBHOOK_SECRET"],
    },
)
See Webhooks for the full delivery contract and signature verification.
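
The actual delivery contract is defined in the Webhooks docs. Purely as an illustration, shared-secret schemes like this one are typically verified with a constant-time HMAC-SHA256 check over the raw request body; the hex encoding here is an assumption, not Reader's documented format:

```python
import hashlib
import hmac

def verify_signature(secret: str, raw_body: bytes, signature_hex: str) -> bool:
    """Constant-time check of an HMAC-SHA256 hex signature (assumed scheme)."""
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking the match position via timing
    return hmac.compare_digest(expected, signature_hex)
```

Always verify against the raw bytes of the body, before any JSON parsing or re-serialization.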

Types

All public types are re-exported from the package root:
from reader_py import (
    ReadParams,
    ReadResult,      # kind: "scrape" | "job", data: ScrapeResult | Job
    ScrapeResult,
    ScrapeMetadata,
    Job,
    Page,
    Credits,
    UsageEntry,
    Pagination,
)
All models are Pydantic BaseModel subclasses with snake_case field names. The SDK internally translates to/from the API’s camelCase.
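
You normally never see the camelCase form, but if you log raw payloads or construct requests by hand, the translation is mechanical. A sketch of the snake-to-camel direction (a hypothetical helper, not exported by the SDK):

```python
def snake_to_camel(name: str) -> str:
    """e.g. only_main_content -> onlyMainContent"""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)
```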
