Publishing sources and tasks¶
Publish a source¶
A source is a deployed AI service that providers can register to and run tasks for. To publish one you need:
- A Docker image (or set of images, one per hardware environment) with
your task handler wrapped via
ogpu.serviceand exposed over HTTP - Docker compose files hosted somewhere HTTP-fetchable (IPFS, raw GitHub content, S3, your own server)
from web3 import Web3
from ogpu.client import (
publish_source, SourceInfo, ImageEnvironments, DeliveryMethod,
)
source = publish_source(SourceInfo(
name="sentiment-analyzer",
description="DistilBERT-based sentiment classifier",
logoUrl="https://example.com/logo.png",
imageEnvs=ImageEnvironments(
cpu="https://raw.githubusercontent.com/you/repo/main/docker-compose.yml",
nvidia="https://raw.githubusercontent.com/you/repo/main/docker-compose.gpu.yml",
),
minPayment=Web3.to_wei(0.01, "ether"),
minAvailableLockup=Web3.to_wei(0.5, "ether"),
maxExpiryDuration=86400,
deliveryMethod=DeliveryMethod.FIRST_RESPONSE,
))
print(source.address)
publish_source returns a live Source instance bound to the new
contract address. You can read state back immediately:
source.get_status() # SourceStatus.ACTIVE
source.get_client() # your address
source.get_params().minPayment # 10000000000000000
source.get_metadata() # dict — follows IPFS URL
What the fields mean¶
| Field | Type | Description |
|---|---|---|
name |
str |
Human-readable name shown in dashboards |
description |
str |
Short description of what the source does |
logoUrl |
str |
URL to a logo image (any HTTP-fetchable location) |
imageEnvs |
ImageEnvironments |
One or more docker-compose URLs, keyed by hardware (cpu, nvidia, amd) |
minPayment |
int |
Minimum payment per task, in wei |
minAvailableLockup |
int |
Minimum vault lockup a provider must hold to register to this source |
maxExpiryDuration |
int |
Maximum seconds a task can live before expiring |
deliveryMethod |
DeliveryMethod |
FIRST_RESPONSE (auto-finalize on first submit) or MANUAL_CONFIRMATION (client explicitly confirms) |
Environment support¶
ImageEnvironments is a simple dataclass with three optional fields:
from ogpu.client import ImageEnvironments
# CPU only
ImageEnvironments(cpu="...compose.yml")
# Multi-environment
ImageEnvironments(
cpu="...compose.cpu.yml",
nvidia="...compose.gpu.yml",
amd="...compose.amd.yml",
)
The SDK converts this into a bitmask matching the on-chain
Environment enum (CPU=1, NVIDIA=2, AMD=4). Providers register
with their preferred environment and only get dispatched matching tasks.
Delivery methods¶
| Method | Behavior |
|---|---|
FIRST_RESPONSE |
Task finalizes as soon as one provider submits a response. Fast, cheap, no manual action needed. Good for public-result tasks. |
MANUAL_CONFIRMATION |
Task sits at RESPONDED until the client explicitly calls confirm_response. Multiple providers can attempt and submit. Client picks a winner. Slower but you review quality. |
Publish a task¶
import time
from ogpu.client import publish_task, TaskInfo, TaskInput
task = publish_task(TaskInfo(
source="0xYOUR_SOURCE_ADDRESS",
config=TaskInput(
function_name="predict",
data={"prompt": "is this review positive or negative?"},
),
expiryTime=int(time.time()) + 3600,
payment=Web3.to_wei(0.01, "ether"),
))
TaskInput is the payload routed to the source's @exposed function.
The function_name field is mandatory — it tells the source which
handler to invoke. Everything else in data is passed through unchanged.
# Use a dict for arbitrary JSON
TaskInput(function_name="predict", data={"text": "hello", "top_k": 5})
# Or a pydantic model for stronger typing
from pydantic import BaseModel
class Req(BaseModel):
text: str
top_k: int = 1
TaskInput(function_name="predict", data=Req(text="hello", top_k=5))
Extra keyword arguments become top-level fields in the serialized config
alongside function_name and data:
TaskInput(
function_name="predict",
data={"text": "hi"},
priority="high", # extra — appears at the top level
sensitivity="low", # extra — same
)
Task fields¶
| Field | Type | Description |
|---|---|---|
source |
str |
Source contract address |
config |
TaskInput |
Function name + input data |
expiryTime |
int |
Unix timestamp after which the task expires and cannot be attempted |
payment |
int |
Payment in wei, held in the vault until the task is confirmed |
Cancel a task¶
Before any provider attempts, the client can cancel:
After attempts start landing, cancel reverts.
Update a source¶
Change minPayment, minAvailableLockup, or any other parameter:
from ogpu.client import update_source, SourceInfo, ImageEnvironments
update_source(
source.address,
SourceInfo(
name=source.get_params().client, # same fields except what you're changing
description="updated description",
logoUrl="...",
imageEnvs=ImageEnvironments(cpu="..."),
minPayment=Web3.to_wei(0.05, "ether"), # ← the change
minAvailableLockup=0,
maxExpiryDuration=86400,
),
)
The new params go through Nexus so the SourceUpdated event fires.
Inactivate a source¶
When you're done accepting new tasks to this source:
from ogpu.client import inactivate_source
inactivate_source(source.address)
assert source.get_status().name == "INACTIVE"
Publishing new tasks against an inactive source reverts with
SourceInactiveError.
What happens under the hood¶
SourceInfo/TaskInfoare pure dataclasses — no side effects.client.publish_sourceresolves the signer viaCLIENT_PRIVATE_KEYenv var (or theprivate_key=kwarg).- It builds a
SourceMetadatadict and uploads it to IPFS viapublish_to_ipfs— the resulting URL goes intoSourceParams.imageMetadataUrl. Nexus.publishSource(params)is called viaTxExecutor, which handles nonce, gas, retry, and typed revert decoding.- The
SourcePublishedevent log is parsed to extract the new source address. - A
Sourceinstance is constructed around that address and returned.
publish_task is the same pattern but with taskConfig.json uploaded
to IPFS and Controller.publishTask.
Next¶
- Reading state — instance classes and snapshots
- Responses — fetching and confirming response payloads
- Events — watch
TaskPublished,Attempted, etc. live - IPFS — if you want to upload custom metadata yourself