Skip to content

ogpu.events

Async event subscriptions. Six watch_* generators for the critical Nexus events, plus typed event dataclasses. This is the only async module in the SDK — every other package is synchronous. Users who don't need event streaming never touch async.

Each watcher is an async def ... -> AsyncIterator[Event] that polls for new logs via HTTP filter subscriptions (no WebSocket provider — works against any standard Ethereum JSON-RPC endpoint). Every yielded event is a frozen dataclass with a typed payload and the usual trio of block number, transaction hash, and log index.

Start with the events guide for async patterns and composition with the sync publishing/reading APIs.

Mixing sync and async

Publish a task synchronously, then await an async generator for events on that task:

from ogpu.client import publish_task, TaskInfo
from ogpu.events import watch_attempted

task = publish_task(TaskInfo(...))   # sync

async def wait_for_attempt():
    async for event in watch_attempted(task.address):
        return event

attempt = asyncio.run(wait_for_attempt())

Watchers

ogpu.events.watchers.watch_task_published async

watch_task_published(source_address: str, *, from_block: int | None = None, poll_interval: float = 2.0) -> AsyncIterator[TaskPublishedEvent]

Stream TaskPublished events scoped to a specific source.

Async generator that yields TaskPublishedEvent instances for every new task published to source_address. Starts at from_block (or current head if None) and polls the RPC for new logs every poll_interval seconds.

Since the Nexus events have no indexed parameters, filtering happens in Python after the fetch — the watcher reads every TaskPublished event and yields only the ones whose source matches source_address.

Parameters:

Name Type Description Default
source_address str

The source to watch for new tasks on.

required
from_block int | None

Start block. None means "latest" (only new tasks).

None
poll_interval float

Seconds between RPC polls. Raise for less chatter, lower for faster reaction.

2.0

Yields:

Type Description
AsyncIterator[TaskPublishedEvent]

TaskPublishedEvent instances as tasks are published.

Example
import asyncio
from ogpu.events import watch_task_published

async def monitor(source_addr: str):
    async for event in watch_task_published(source_addr):
        print(f"New task: {event.task}")

asyncio.run(monitor("0x..."))

ogpu.events.watchers.watch_attempted async

watch_attempted(task_address: str, *, from_block: int | None = None, poll_interval: float = 2.0) -> AsyncIterator[AttemptedEvent]

Stream Attempted events scoped to a specific task.

Yields an AttemptedEvent every time a provider calls Nexus.attempt on the given task. Useful for watching the provider-side lifecycle of a freshly-published task — the first yielded event tells you who picked it up.

Parameters:

Name Type Description Default
task_address str

The task to watch attempts on.

required
from_block int | None

Start block. None means "latest".

None
poll_interval float

Seconds between RPC polls.

2.0

Yields:

Type Description
AsyncIterator[AttemptedEvent]

AttemptedEvent instances as attempts land.

Example
async for event in watch_attempted(task.address):
    print(f"Attempt from {event.provider}, "
          f"payment={event.suggested_payment}")

ogpu.events.watchers.watch_response_submitted async

watch_response_submitted(task_address: str, *, from_block: int | None = None, poll_interval: float = 2.0) -> AsyncIterator[ResponseSubmittedEvent]

Stream ResponseSubmitted events scoped to a specific task.

Yields a ResponseSubmittedEvent every time a provider submits a response for the given task. Use this to know when a response contract has been deployed — the event carries the new Response address.

Parameters:

Name Type Description Default
task_address str

The task to watch for responses on.

required
from_block int | None

Start block. None means "latest".

None
poll_interval float

Seconds between RPC polls.

2.0

Yields:

Type Description
AsyncIterator[ResponseSubmittedEvent]

ResponseSubmittedEvent instances as responses are submitted.

Example
async for event in watch_response_submitted(task.address):
    response = Response(event.response)
    payload = response.fetch_data()
    print(payload)
    break

ogpu.events.watchers.watch_response_status_changed async

watch_response_status_changed(response_address: str, *, from_block: int | None = None, poll_interval: float = 2.0) -> AsyncIterator[ResponseStatusChangedEvent]

Stream ResponseStatusChanged events scoped to a specific response.

Yields ResponseStatusChangedEvent instances every time the scoped response transitions state (typically from SUBMITTED to CONFIRMED). The status field is decoded into the typed ResponseStatus enum.

Parameters:

Name Type Description Default
response_address str

The response contract to watch.

required
from_block int | None

Start block. None means "latest".

None
poll_interval float

Seconds between RPC polls.

2.0

Yields:

Type Description
AsyncIterator[ResponseStatusChangedEvent]

ResponseStatusChangedEvent instances on every state change.

ogpu.events.watchers.watch_task_status_changed async

watch_task_status_changed(task_address: str, *, from_block: int | None = None, poll_interval: float = 2.0) -> AsyncIterator[TaskStatusChangedEvent]

Stream TaskStatusChanged events scoped to a specific task.

Yields TaskStatusChangedEvent instances on every state transition of the given task. The status field is decoded into the typed TaskStatus enum. Useful for waiting on the full task lifecycle — the stream includes the transitions NEW → ATTEMPTED → RESPONDED → FINALIZED (or CANCELED / EXPIRED).

Parameters:

Name Type Description Default
task_address str

The task to watch.

required
from_block int | None

Start block. None means "latest".

None
poll_interval float

Seconds between RPC polls.

2.0

Yields:

Type Description
AsyncIterator[TaskStatusChangedEvent]

TaskStatusChangedEvent instances on every state change.

Example
async for event in watch_task_status_changed(task.address):
    if event.status == TaskStatus.FINALIZED:
        print("task done!")
        break

ogpu.events.watchers.watch_registered async

watch_registered(source_address: str, *, from_block: int | None = None, poll_interval: float = 2.0) -> AsyncIterator[RegisteredEvent]

Stream Registered events scoped to a specific source.

Yields a RegisteredEvent every time a provider calls Nexus.register on the given source. Useful for watching the growth of a source's registrant list in real time — e.g. a source owner's dashboard that reacts as new providers come online.

Parameters:

Name Type Description Default
source_address str

The source to watch for new registrations.

required
from_block int | None

Start block. None means "latest".

None
poll_interval float

Seconds between RPC polls.

2.0

Yields:

Type Description
AsyncIterator[RegisteredEvent]

RegisteredEvent instances on every registration.

Event dataclasses

ogpu.events.types.TaskPublishedEvent dataclass

TaskPublishedEvent(task: str, source: str, block_number: int, transaction_hash: str, log_index: int)

Yielded by watch_task_published when a new task is published to a source.

Attributes:

Name Type Description
task str

New Task contract address.

source str

Source the task was published to (matches the watcher's scoping address).

block_number int

Block the event was emitted in.

transaction_hash str

Tx hash (hex string) that emitted the event.

log_index int

Log index within the transaction receipt.

ogpu.events.types.AttemptedEvent dataclass

AttemptedEvent(task: str, provider: str, suggested_payment: int, block_number: int, transaction_hash: str, log_index: int)

Yielded by watch_attempted when a provider attempts a task.

Attributes:

Name Type Description
task str

Task being attempted (matches the watcher's scoping address).

provider str

Provider making the attempt.

suggested_payment int

Advisory payment amount in wei the provider expects.

block_number int

Block the event was emitted in.

transaction_hash str

Tx hash (hex string).

log_index int

Log index within the receipt.

ogpu.events.types.ResponseSubmittedEvent dataclass

ResponseSubmittedEvent(response: str, task: str, block_number: int, transaction_hash: str, log_index: int)

Yielded by watch_response_submitted when a provider submits a response.

Attributes:

Name Type Description
response str

Address of the newly-deployed Response contract.

task str

Task the response is for (matches the watcher's scoping address).

block_number int

Block the event was emitted in.

transaction_hash str

Tx hash (hex string).

log_index int

Log index within the receipt.

ogpu.events.types.ResponseStatusChangedEvent dataclass

ResponseStatusChangedEvent(response: str, status: ResponseStatus, block_number: int, transaction_hash: str, log_index: int)

Yielded by watch_response_status_changed when a response transitions state.

Attributes:

Name Type Description
response str

Response contract (matches the watcher's scoping address).

status ResponseStatus

Typed ResponseStatus (SUBMITTED or CONFIRMED) — decoded from the raw uint8 log field.

block_number int

Block the event was emitted in.

transaction_hash str

Tx hash (hex string).

log_index int

Log index within the receipt.

ogpu.events.types.TaskStatusChangedEvent dataclass

TaskStatusChangedEvent(task: str, status: TaskStatus, block_number: int, transaction_hash: str, log_index: int)

Yielded by watch_task_status_changed when a task transitions state.

Attributes:

Name Type Description
task str

Task contract (matches the watcher's scoping address).

status TaskStatus

Typed TaskStatus (NEW, ATTEMPTED, RESPONDED, CANCELED, EXPIRED, FINALIZED) — decoded from the raw uint8 log field.

block_number int

Block the event was emitted in.

transaction_hash str

Tx hash (hex string).

log_index int

Log index within the receipt.

ogpu.events.types.RegisteredEvent dataclass

RegisteredEvent(provider: str, registrant_id: int, source: str, block_number: int, transaction_hash: str, log_index: int)

Yielded by watch_registered when a provider registers to a source.

Attributes:

Name Type Description
provider str

Provider being registered.

registrant_id int

Slot index assigned to this registration.

source str

Source being registered to (matches the watcher's scoping address).

block_number int

Block the event was emitted in.

transaction_hash str

Tx hash (hex string).

log_index int

Log index within the receipt.