Skip to content

Recipes

Short snippets for common patterns. Each one is standalone — copy it into your code and adapt.

Publish and wait for finalization

import asyncio, time
from web3 import Web3
from ogpu import ChainConfig, ChainId
from ogpu.client import publish_task, TaskInfo, TaskInput
from ogpu.events import watch_task_status_changed
from ogpu.types import TaskStatus

ChainConfig.set_chain(ChainId.OGPU_TESTNET)

task = publish_task(TaskInfo(
    source="0x...",
    config=TaskInput(function_name="predict", data={"x": 1}),
    expiryTime=int(time.time()) + 3600,
    payment=Web3.to_wei(0.01, "ether"),
))

async def wait_until_finalized(addr: str):
    async for event in watch_task_status_changed(addr):
        if event.status == TaskStatus.FINALIZED:
            return event

asyncio.run(wait_until_finalized(task.address))
print(f"done: {task.get_winning_provider()}")

Batch-fetch state for N tasks

from ogpu.protocol import Task

task_addresses = ["0x...", "0x...", "0x..."]
snapshots = [Task(addr).snapshot() for addr in task_addresses]

for snap in snapshots:
    print(f"{snap.address}  {snap.status}  {snap.attempt_count} attempts")

snapshot() is the efficient way — each instance issues one logical batch of RPCs per call, rather than four or five separate getter calls.

Filter responses by provider

task = Task.load("0x...")
my_provider = "0xMY_PROVIDER"

my_response = task.get_response_of(my_provider)
if my_response and my_response.is_confirmed():
    payload = my_response.fetch_data()

Only confirm responses above a quality threshold

task = Task.load("0x...")

for r in task.get_responses():
    payload = r.fetch_data()
    if payload.get("confidence", 0) > 0.95:
        r.confirm(signer=CLIENT_KEY)
        break

Check provider health before dispatching

from ogpu.protocol import Provider

def is_healthy(addr: str) -> bool:
    p = Provider(addr)
    snap = p.snapshot()
    return (
        snap.is_provider
        and snap.is_eligible
        and snap.lockup > 10**18          # at least 1 OGPU locked
        and not snap.default_agent_disabled
    )

for addr in source.get_registrants():
    if is_healthy(addr):
        # dispatch a task to this provider
        ...

Dashboard line for a source

from ogpu.protocol import Source

def format_source(addr: str) -> str:
    s = Source.load(addr)
    snap = s.snapshot()
    return (
        f"{snap.address[:10]} "
        f"{snap.status.name:8s} "
        f"tasks={snap.task_count:4d} "
        f"providers={snap.registrant_count:3d} "
        f"min_payment={snap.params.minPayment:>20d} wei"
    )

for addr in my_sources:
    print(format_source(addr))

Retry on nonce collision

TxExecutor already does this automatically — you shouldn't need to write it manually. But if you want manual control:

from ogpu import fix_nonce
from ogpu.types import NonceError

try:
    result = client.publish_task(info)
except NonceError:
    fix_nonce()            # cancel stuck pending txs
    result = client.publish_task(info)   # retry once

Cancel every task for a source

source = Source.load("0xSOURCE")
for task in source.get_tasks():
    if task.get_status().name == "NEW":
        task.cancel(signer=CLIENT_KEY)

Load a Provider and check earnings

from ogpu.protocol import Provider

provider = Provider.load("0xPROVIDER")
snap = provider.snapshot()

print(f"Master:         {snap.master}")
print(f"Balance:        {snap.balance}")
print(f"Lockup:         {snap.lockup}")
print(f"Total earnings: {snap.total_earnings}")
print(f"Eligible:       {snap.is_eligible}")

Publish once, read fields many times

source = publish_source(source_info)

# Multiple fields, multiple RPCs — the stateless way
print(source.get_client())
print(source.get_status())
print(source.get_task_count())

# Or: one batch capture, many local reads
snap = source.snapshot()
print(snap.client, snap.status, snap.task_count)

Use whichever is cleaner. snapshot() wins when you need more than three fields at once.

Next