API Reference: ogpu.client

Essential API reference for the ogpu.client module - the foundation for task publishing and response management on the OpenGPU network.

Task and Source Management

This module is designed for interacting with the OpenGPU network. Use ogpu.client to publish tasks, manage sources, retrieve responses, and configure blockchain connections.

Source Operations

ogpu.client.source.publish_source

Publish a source to the Nexus contract.

Parameters:

source_info (SourceInfo, required): SourceInfo object containing the source configuration.
private_key (str | None, default None): Private key for signing the transaction. If None, the CLIENT_PRIVATE_KEY environment variable is used.

Returns:

str: Address of the created source contract.

Source code in ogpu/client/source.py
def publish_source(
    source_info: SourceInfo,
    private_key: str | None = None,
) -> str:
    """
    Publish a source to the Nexus contract.

    Args:
        source_info: SourceInfo object containing source configuration
        private_key: Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.

    Returns:
        Address of the created source contract
    """
    if private_key is None:
        private_key = get_private_key()

    acc = Account.from_key(private_key)
    client_address = acc.address

    # Convert SourceInfo to SourceParams
    source_params = source_info.to_source_params(client_address)

    # Get contract instances
    nexus_contract = NexusContract()
    web3 = WEB3()

    tx = nexus_contract.functions.publishSource(
        source_params.to_tuple()
    ).build_transaction(
        {
            "from": acc.address,
            "nonce": web3.eth.get_transaction_count(acc.address),
        }
    )

    signed = web3.eth.account.sign_transaction(tx, private_key)
    tx_hash = web3.eth.send_raw_transaction(signed.raw_transaction)
    receipt = web3.eth.wait_for_transaction_receipt(tx_hash)

    logs = nexus_contract.events.SourcePublished().process_receipt(receipt)
    return web3.to_checksum_address(logs[0]["args"]["source"])

Purpose: Publish AI services as sources on the OpenGPU network.

Example:

from web3 import Web3
from ogpu.client import publish_source, SourceInfo, ImageEnvironments, DeliveryMethod

source_info = SourceInfo(
    name="sentiment-service",
    description="AI sentiment analysis service",
    logoUrl="https://example.com/logo.png",
    imageEnvs=ImageEnvironments(
        cpu="https://raw.githubusercontent.com/user/repo/main/docker-compose.yml",
        nvidia="https://raw.githubusercontent.com/user/repo/main/docker-compose-gpu.yml"
    ),
    minPayment=Web3.to_wei(0.001, "ether"),
    minAvailableLockup=Web3.to_wei(0.01, "ether"),
    maxExpiryDuration=86400,
    deliveryMethod=DeliveryMethod.MANUAL_CONFIRMATION
)

source_address = publish_source(source_info)
print(f"Source published at: {source_address}")
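The private_key fallback described above can be sketched without the library. This is a minimal illustration, assuming the key is read from the CLIENT_PRIVATE_KEY environment variable; the helper name resolve_private_key is hypothetical, and how the library's own get_private_key() behaves when the variable is missing is not shown in this reference.

```python
import os
from typing import Optional


def resolve_private_key(private_key: Optional[str] = None) -> str:
    """Return the explicit key if given, else fall back to CLIENT_PRIVATE_KEY.

    Hypothetical helper: it mirrors the documented fallback, not the
    library's actual get_private_key() implementation.
    """
    if private_key is not None:
        return private_key
    key = os.environ.get("CLIENT_PRIVATE_KEY")
    if not key:
        # Assumption: failing early is preferable to signing with an empty key.
        raise RuntimeError("Set CLIENT_PRIVATE_KEY or pass private_key explicitly")
    return key
```

Calling a check like this before publish_source(source_info) surfaces a missing key before any transaction is built.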


Task Operations

ogpu.client.task.publish_task

Publish a task to the Controller contract.

Parameters:

task_info (TaskInfo, required): TaskInfo object containing the task configuration.
private_key (str | None, default None): Private key for signing the transaction. If None, the CLIENT_PRIVATE_KEY environment variable is used.

Returns:

str: Address of the created task contract.

Source code in ogpu/client/task.py
def publish_task(
    task_info: TaskInfo,
    private_key: str | None = None,
) -> str:
    """
    Publish a task to the Controller contract.

    Args:
        task_info: TaskInfo object containing task configuration
        private_key: Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.

    Returns:
        Address of the created task contract
    """
    if private_key is None:
        private_key = get_private_key()

    acc = Account.from_key(private_key)
    # Convert TaskInfo to TaskParams
    task_params = task_info.to_task_params()

    # Get Web3 instance
    web3 = WEB3()

    # Get contract instances
    controller_contract = ControllerContract()
    nexus_contract = NexusContract()

    tx = controller_contract.functions.publishTask(
        task_params.to_tuple()
    ).build_transaction(
        {"from": acc.address, "nonce": web3.eth.get_transaction_count(acc.address)}
    )

    signed = web3.eth.account.sign_transaction(tx, private_key)
    tx_hash = web3.eth.send_raw_transaction(signed.raw_transaction)
    receipt = web3.eth.wait_for_transaction_receipt(tx_hash)

    logs = nexus_contract.events.TaskPublished().process_receipt(receipt)
    return web3.to_checksum_address(logs[0]["args"]["task"])

Purpose: Publish AI tasks to the OpenGPU network for distributed processing.

Example:

import time
from web3 import Web3
from ogpu.client import publish_task, TaskInfo, TaskInput

# Create task configuration
task_config = TaskInput(
    function_name="analyze_sentiment",
    data={"text": "I love this product!"}
)

task_info = TaskInfo(
    source="0x1234567890123456789012345678901234567890",
    config=task_config,
    expiryTime=int(time.time()) + 3600,  # 1 hour from now
    payment=Web3.to_wei(0.01, "ether")
)

task_address = publish_task(task_info)
print(f"Task published at: {task_address}")
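Before publishing, it can help to check a task's payment and expiry against the target source's limits on the client side. The sketch below assumes that a task is expected to respect the source's minPayment and maxExpiryDuration; this reference does not state whether the contracts enforce that, and both function names are hypothetical.

```python
import time
from decimal import Decimal
from typing import List, Optional

WEI_PER_ETHER = 10**18  # same ratio that Web3.to_wei(x, "ether") applies


def ether_to_wei(amount: str) -> int:
    """Convert a decimal string such as "0.01" to wei without float rounding."""
    return int(Decimal(amount) * WEI_PER_ETHER)


def check_task_limits(
    payment: int,
    expiry_time: int,
    min_payment: int,
    max_expiry_duration: int,
    now: Optional[int] = None,
) -> List[str]:
    """Return a list of human-readable problems; an empty list means no issue found."""
    now = int(time.time()) if now is None else now
    problems = []
    if payment < min_payment:
        problems.append("payment is below the source's minPayment")
    if expiry_time <= now:
        problems.append("expiryTime is not in the future")
    elif expiry_time - now > max_expiry_duration:
        problems.append("expiryTime exceeds the source's maxExpiryDuration")
    return problems
```

For the example above, check_task_limits(ether_to_wei("0.01"), int(time.time()) + 3600, ether_to_wei("0.001"), 86400) would report no problems.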


Response Operations

ogpu.client.responses.get_task_responses

Get all responses for a specific task address.

Parameters:

task_address (str, required): The task contract address.
lower (int, default 0): Lower bound for response pagination.
upper (Optional[int], default None): Upper bound for response pagination. If None, the function requests the range from lower to lower + 1000 and, if that call reverts, falls back to fetching responses one at a time.

Returns:

List[Response]: List of Response objects containing response data. Zero addresses are filtered out, and a response whose details cannot be fetched is skipped after its error is printed.

Raises:

Exception: If the contract call fails or the task doesn't exist.

Source code in ogpu/client/responses.py
def get_task_responses(
    task_address: str, lower: int = 0, upper: Optional[int] = None
) -> List[Response]:
    """
    Get all responses for a specific task address.

    Args:
        task_address: The task contract address
        lower: Lower bound for response pagination (default: 0)
        upper: Upper bound for response pagination (default: None, gets all)

    Returns:
        List of TaskResponse objects containing response data

    Raises:
        Exception: If the contract call fails or task doesn't exist
    """
    try:
        # Load the task contract
        task_contract = load_task_contract(task_address)

        # Get response addresses from the task
        if upper is None:
            # Get all responses by setting upper to a large number
            # We'll handle the actual limit based on what's available
            try:
                response_addresses = task_contract.functions.getResponsesOf(
                    lower, lower + 1000
                ).call()
            except ContractLogicError:
                # If pagination fails, try to get responses one by one
                response_addresses = []
                i = lower
                while True:
                    try:
                        response_addr = task_contract.functions.getResponsesOf(
                            i, i + 1
                        ).call()
                        if not response_addr:
                            break
                        response_addresses.extend(response_addr)
                        i += 1
                    except (ContractLogicError, Exception):
                        break
        else:
            response_addresses = task_contract.functions.getResponsesOf(
                lower, upper
            ).call()

        # Filter out zero addresses
        response_addresses = [
            addr
            for addr in response_addresses
            if addr != "0x0000000000000000000000000000000000000000"
        ]

        responses = []

        # Get detailed information for each response
        for response_addr in response_addresses:
            try:
                response_contract = load_response_contract(response_addr)

                # Get response parameters
                response_params = response_contract.functions.getResponseParams().call()

                # Get response status
                status = response_contract.functions.getStatus().call()

                # Get response timestamp
                timestamp = response_contract.functions.responseTimestamp().call()

                # Get confirmation status
                confirmed = response_contract.functions.confirmedFinal().call()

                response_data = Response(
                    address=response_addr,
                    task=response_params[0],
                    provider=response_params[1],
                    data=response_params[2],
                    payment=response_params[3],
                    status=status,
                    timestamp=timestamp,
                    confirmed=confirmed,
                )

                responses.append(response_data)

            except Exception as e:
                # Log the error but continue with other responses
                print(f"Error fetching response {response_addr}: {e}")
                continue

        return responses

    except ContractLogicError as e:
        raise Exception(f"Contract call failed for task {task_address}: {e}")
    except Exception as e:
        raise Exception(f"Error fetching responses for task {task_address}: {e}")

Purpose: Retrieve all responses for a published task.

Example:

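from ogpu.client.responses import get_task_responses

# task_address is the address returned by publish_task()
responses = get_task_responses(task_address)
for response in responses:
    print(f"Provider: {response.provider}")
    print(f"Status: {response.status}")
    print(f"Data: {response.data}")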
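The lower and upper parameters allow responses to be fetched in pages. The sketch below is one way to do that, assuming the bounds behave as a half-open range (the source listing's getResponsesOf(i, i + 1) calls suggest this, but it is not stated) and that a range past the end yields an empty list rather than an error. The fetch argument is a stand-in for a call such as lambda lo, hi: get_task_responses(task_address, lo, hi).

```python
from typing import Callable, Iterator, List


def iter_response_pages(
    fetch: Callable[[int, int], List], page_size: int = 100
) -> Iterator[List]:
    """Yield successive pages until a page comes back empty.

    Only an empty page ends the loop: a short page is not treated as the
    end, because filtered or skipped entries can shorten a page.
    """
    lower = 0
    while True:
        page = fetch(lower, lower + page_size)
        if not page:
            return
        yield page
        lower += page_size
```

If the contract reverts on an out-of-range upper bound, the fetch callable would need to catch that error and return an empty list.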


ogpu.client.responses.get_confirmed_response

Get confirmed response data for a specific task address by calling the API.

Parameters:

task_address (str, required): The task contract address.

Returns:

ConfirmedResponse: ConfirmedResponse object containing the confirmed response data.

Raises:

Exception: If the API call fails or no confirmed response is found.

Source code in ogpu/client/responses.py
def get_confirmed_response(task_address: str) -> ConfirmedResponse:
    """
    Get confirmed response data for a specific task address by calling the API.

    Args:
        task_address: The task contract address

    Returns:
        ConfirmedResponse object containing the confirmed response data

    Raises:
        Exception: If the API call fails or no confirmed response is found
    """
    try:
        # Make API request to get task responses
        api_url = f"https://management-backend.opengpu.network/api/tasks/{task_address}"

        response = requests.get(api_url)
        response.raise_for_status()  # Raise an exception for HTTP errors

        api_data = response.json()

        # Extract data and address from the API response
        if "data" not in api_data:
            raise Exception(f"Invalid API response format for task {task_address}")

        data_content = api_data["data"]
        if "address" not in data_content:
            raise Exception(
                f"Address not found in API response for task {task_address}"
            )

        # Create ConfirmedResponse object with the API data
        confirmed_response = ConfirmedResponse(
            address=data_content["address"],
            data=data_content,
        )

        return confirmed_response

    except requests.RequestException as e:
        raise Exception(f"API request failed for task {task_address}: {e}")
    except Exception as e:
        raise Exception(
            f"Error fetching confirmed response for task {task_address}: {e}"
        )

Purpose: Retrieve the confirmed response for a completed task.

Example:

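try:
    confirmed = get_confirmed_response(task_address)
    print(f"Result: {confirmed.data}")
except Exception as e:
    # Raised if the API call fails or no confirmed response exists yet
    print(f"No confirmed response: {e}")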
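Because get_confirmed_response raises when no confirmed response is available, a client that publishes a task and then waits for the result needs some form of retry. The following is a minimal polling sketch; wait_for_result, its parameters, and the chosen defaults are hypothetical and not part of ogpu.client.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def wait_for_result(
    fetch: Callable[[], T],
    attempts: int = 10,
    delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch() until it stops raising, waiting `delay` seconds between tries."""
    last_error = None
    for attempt in range(attempts):
        try:
            return fetch()
        except Exception as exc:  # the documented function raises plain Exception
            last_error = exc
            if attempt < attempts - 1:
                sleep(delay)
    raise TimeoutError(f"No result after {attempts} attempts") from last_error
```

Usage would look like wait_for_result(lambda: get_confirmed_response(task_address)). The sleep parameter is injectable so the loop can be tested without real delays.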


ogpu.client.responses.confirm_response

Confirm a response using the Controller contract.

Parameters:

response_address (str, required): The response contract address to confirm.
private_key (str | None, default None): Private key for signing the transaction. If None, the CLIENT_PRIVATE_KEY environment variable is used.

Returns:

str: Transaction hash of the confirmation.

Raises:

ValueError: If response_address is not a valid address.
Exception: If the confirmation fails, for example because the transaction reverts.

Source code in ogpu/client/responses.py
def confirm_response(
    response_address: str,
    private_key: str | None = None,
) -> str:
    """
    Confirm a response using the Controller contract.

    Args:
        response_address: The response contract address to confirm
        private_key: Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.

    Returns:
        str: Transaction hash of the confirmation

    Raises:
        Exception: If the confirmation fails
    """
    if private_key is None:
        private_key = get_private_key()

    # Validate response address format
    if not WEB3.is_address(response_address):
        raise ValueError(f"Invalid response address format: {response_address}")

    acc = Account.from_key(private_key)

    try:
        # First, try to validate the response exists and can be confirmed
        response_contract = load_response_contract(response_address)

        # Check if response is already confirmed
        try:
            confirmed = response_contract.functions.confirmedFinal().call()
            if confirmed:
                raise Exception(f"Response {response_address} is already confirmed")
        except Exception as e:
            if "execution reverted" in str(e).lower():
                raise Exception(
                    f"Response contract {response_address} may not exist or is invalid"
                )
            # If it's another error, continue and let the transaction attempt provide more info

        # Execute the confirmation transaction
        controller_contract = ControllerContract()
        web3 = WEB3()

        tx = controller_contract.functions.confirmResponse(
            response_address
        ).build_transaction(
            {
                "from": acc.address,
                "nonce": web3.eth.get_transaction_count(acc.address),
            }
        )

        signed = web3.eth.account.sign_transaction(tx, private_key)
        tx_hash = web3.eth.send_raw_transaction(signed.raw_transaction)
        receipt = web3.eth.wait_for_transaction_receipt(tx_hash)

        if receipt["status"] != 1:
            raise Exception(f"Transaction failed: {tx_hash.hex()}")
        return tx_hash.hex()

    except ContractLogicError as e:
        if "execution reverted" in str(e):
            raise Exception(
                f"Contract execution reverted. Possible reasons: "
                f"1) Response {response_address} doesn't exist, "
                f"2) Already confirmed, "
                f"3) Caller {acc.address} doesn't have permission to confirm, "
                f"4) Response is in invalid state for confirmation"
            )
        raise Exception(f"Contract logic error: {e}")
    except Exception as e:
        if "execution reverted" in str(e).lower():
            raise Exception(
                f"Transaction reverted for response {response_address}. "
                f"Check if the response exists and you have permission to confirm it."
            )
        raise Exception(f"Error confirming response {response_address}: {e}")

Purpose: Confirm a response to complete the task and release payment.

Example:

tx_hash = confirm_response(response_address)
print(f"Confirmation transaction: {tx_hash}")
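With manual confirmation, the client has to decide which response to confirm. The sketch below picks the earliest unconfirmed response; that selection policy is purely illustrative, and ResponseView is a hypothetical stand-in carrying only the address, timestamp, and confirmed fields documented for Response.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class ResponseView:
    """Stand-in with a subset of the documented Response fields."""
    address: str
    timestamp: int
    confirmed: bool


def pick_response_to_confirm(responses: Iterable[ResponseView]) -> Optional[ResponseView]:
    """Return the earliest response that is not yet confirmed, or None."""
    pending = [r for r in responses if not r.confirmed]
    return min(pending, key=lambda r: r.timestamp, default=None)
```

The address of the chosen response would then be passed to confirm_response. Objects returned by get_task_responses expose the same three attributes, so they could be passed in directly.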


Network Configuration

ogpu.client.chain_config.ChainId

Enum for supported blockchain networks.

Attributes:

OGPU_MAINNET: Main OpenGPU network (Chain ID: 1071).
OGPU_TESTNET: Test OpenGPU network (Chain ID: 200820172034).

Purpose: Enum defining supported blockchain networks.
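When a chain is chosen from a numeric ID (for example, one read from configuration), an enum lookup gives a clear error for unsupported values. The ChainId class below is a local stand-in that mirrors the documented members; it assumes each member's value is its numeric chain ID, and chain_from_id is a hypothetical helper.

```python
from enum import Enum


class ChainId(Enum):
    """Stand-in mirroring the documented members, not the library's class."""
    OGPU_MAINNET = 1071
    OGPU_TESTNET = 200820172034


def chain_from_id(numeric_id: int) -> ChainId:
    """Map a numeric chain ID to its enum member, with a descriptive error."""
    try:
        return ChainId(numeric_id)
    except ValueError:
        supported = ", ".join(f"{c.name}={c.value}" for c in ChainId)
        raise ValueError(
            f"Unsupported chain ID {numeric_id}; expected one of: {supported}"
        ) from None
```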


ogpu.client.chain_config.ChainConfig.set_chain classmethod

Set the current active chain

Source code in ogpu/client/chain_config.py
@classmethod
def set_chain(cls, chain_id: ChainId) -> None:
    """Set the current active chain"""
    if chain_id not in cls.CHAIN_CONTRACTS:
        raise ValueError(f"Chain {chain_id} is not supported")
    cls._current_chain = chain_id

Purpose: Set the active blockchain network. Raises ValueError if the chain is not in CHAIN_CONTRACTS.

Example:

from ogpu.client import ChainConfig, ChainId

# Set to testnet
ChainConfig.set_chain(ChainId.OGPU_TESTNET)

# Set to mainnet
ChainConfig.set_chain(ChainId.OGPU_MAINNET)


ogpu.client.chain_config.ChainConfig.get_current_chain classmethod

Get the current active chain

Source code in ogpu/client/chain_config.py
@classmethod
def get_current_chain(cls) -> ChainId:
    """Get the current active chain"""
    if cls._current_chain is None:
        raise ValueError("No chain has been set. Call set_chain() first.")
    return cls._current_chain

Purpose: Get the currently active blockchain network. Raises ValueError if no chain is set.

Example:

current_chain = ChainConfig.get_current_chain()
print(f"Current chain: {current_chain.name}")
print(f"Chain ID: {current_chain.value}")
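Since the active chain is class-level state, code that needs another network for a short time should restore the previous one afterwards. The context manager below is a sketch of that pattern built only on set_chain and get_current_chain. The names use_chain and DemoConfig are hypothetical, and DemoConfig is a stub standing in for ChainConfig so the example runs on its own.

```python
from contextlib import contextmanager


class DemoConfig:
    """Stub with the same two classmethods as the documented ChainConfig."""
    _current = None

    @classmethod
    def set_chain(cls, chain_id):
        cls._current = chain_id

    @classmethod
    def get_current_chain(cls):
        if cls._current is None:
            raise ValueError("No chain has been set. Call set_chain() first.")
        return cls._current


@contextmanager
def use_chain(config, chain_id):
    """Switch to chain_id inside the block, then restore the previous chain."""
    try:
        previous = config.get_current_chain()
    except ValueError:
        previous = None  # nothing was set, so there is nothing to restore
    config.set_chain(chain_id)
    try:
        yield
    finally:
        if previous is not None:
            config.set_chain(previous)
```

With the real classes this would read: with use_chain(ChainConfig, ChainId.OGPU_TESTNET): followed by the calls that should run on testnet.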


ogpu.client.chain_config.ChainConfig.get_contract_address classmethod

Get contract address for the current chain

Source code in ogpu/client/chain_config.py
@classmethod
def get_contract_address(cls, contract_name: str) -> str:
    """Get contract address for the current chain"""
    current_chain = cls.get_current_chain()

    if current_chain not in cls.CHAIN_CONTRACTS:
        raise ValueError(f"Chain {current_chain} is not configured")

    chain_contracts = cls.CHAIN_CONTRACTS[current_chain]
    if contract_name not in chain_contracts:
        raise ValueError(
            f"Contract {contract_name} not found for chain {current_chain}"
        )

    return chain_contracts[contract_name]

Purpose: Get the address of a named contract on the current chain. Raises ValueError if the chain is not configured or the contract name is unknown.

Example:

# Get contract addresses for current chain
nexus_address = ChainConfig.get_contract_address("NEXUS")
controller_address = ChainConfig.get_contract_address("CONTROLLER")
terminal_address = ChainConfig.get_contract_address("TERMINAL")

print(f"NEXUS: {nexus_address}")
print(f"CONTROLLER: {controller_address}")
print(f"TERMINAL: {terminal_address}")


ogpu.client.chain_config.ChainConfig.get_all_supported_chains classmethod

Get list of all supported chains

Source code in ogpu/client/chain_config.py
@classmethod
def get_all_supported_chains(cls) -> list[ChainId]:
    """Get list of all supported chains"""
    return list(cls.CHAIN_CONTRACTS.keys())

Purpose: Get list of all supported blockchain networks.

Example:

supported_chains = ChainConfig.get_all_supported_chains()
for chain in supported_chains:
    print(f"Chain: {chain.name} (ID: {chain.value})")


Data Types

ogpu.client.types.ImageEnvironments dataclass

Docker compose file paths for different environments.

Attributes:

cpu (str): URL to the CPU-only docker-compose.yml file.
nvidia (str): URL to the NVIDIA GPU docker-compose.yml file.
amd (str): URL to the AMD GPU docker-compose.yml file.

Purpose: Docker compose file URLs for different hardware environments.

Example:

image_envs = ImageEnvironments(
    cpu="https://raw.githubusercontent.com/user/repo/main/docker-compose.yml",
    nvidia="https://raw.githubusercontent.com/user/repo/main/docker-compose-gpu.yml",
    amd="https://raw.githubusercontent.com/user/repo/main/docker-compose-amd.yml"
)
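Because each field is expected to be a URL rather than a local path, a quick client-side check can catch mistakes such as passing a bare file name. This is a small sketch using only the standard library; invalid_compose_urls is a hypothetical helper, and the library may apply its own validation that is not shown here.

```python
from typing import List
from urllib.parse import urlparse


def invalid_compose_urls(**urls: str) -> List[str]:
    """Return the names of fields whose value is not an http(s) URL."""
    bad = []
    for name, url in urls.items():
        parts = urlparse(url)
        if parts.scheme not in ("http", "https") or not parts.netloc:
            bad.append(name)
    return bad
```

For example, invalid_compose_urls(cpu="https://example.com/docker-compose.yml", nvidia="docker-compose-gpu.yml") flags only the nvidia field.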


ogpu.client.types.DeliveryMethod

Enum for delivery method options.

Attributes:

MANUAL_CONFIRMATION: Client manually confirms the response.
FIRST_RESPONSE: First provider to submit a response wins.

Purpose: Enum defining how task responses are delivered and confirmed.


ogpu.client.types.SourceInfo dataclass

User-friendly source information structure.

Attributes:

name (str): Human-readable name for the source.
description (str): Description of the AI service.
logoUrl (str): URL to the source logo image.
imageEnvs (ImageEnvironments): Docker environment configurations.
minPayment (int): Minimum payment required, in wei.
minAvailableLockup (int): Minimum lockup amount, in wei.
maxExpiryDuration (int): Maximum task duration, in seconds.
deliveryMethod (DeliveryMethod): How responses are delivered.

Purpose: Configuration data for publishing AI services as sources.

Example:

from web3 import Web3
from ogpu.client import SourceInfo, ImageEnvironments, DeliveryMethod

source_info = SourceInfo(
    name="sentiment-service",
    description="AI sentiment analysis service",
    logoUrl="https://example.com/logo.png",
    imageEnvs=ImageEnvironments(
        cpu="https://raw.githubusercontent.com/user/repo/main/docker-compose.yml",
        nvidia="https://raw.githubusercontent.com/user/repo/main/docker-compose-gpu.yml"
    ),
    minPayment=Web3.to_wei(0.001, "ether"),
    minAvailableLockup=Web3.to_wei(0.01, "ether"),
    maxExpiryDuration=86400,
    deliveryMethod=DeliveryMethod.MANUAL_CONFIRMATION
)


ogpu.client.types.TaskInput dataclass

Configuration structure for tasks.

Attributes:

function_name (str): Name of the function to call on the source.
data (BaseModel | dict[str, Any]): Input data for the function (Pydantic model or dictionary).

Purpose: Configuration for task input data and function specification.

Example:

# Using a dictionary
task_input = TaskInput(
    function_name="inference",
    data={"inputs": "Translate to French: Hello"}
)

# Using a Pydantic model
from pydantic import BaseModel

class InferenceData(BaseModel):
    inputs: str
    parameters: dict = {}

task_input = TaskInput(
    function_name="inference", 
    data=InferenceData(inputs="Hello world")
)
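Since data may be either a dictionary or a Pydantic model, code that inspects or logs task input benefits from normalizing it to a plain dict first. The helper below is a sketch that relies on duck typing, so it does not import Pydantic. data_to_dict is a hypothetical name, and how TaskInput serializes data internally is not shown in this reference.

```python
from typing import Any, Dict


def data_to_dict(data: Any) -> Dict[str, Any]:
    """Return task input as a plain dict, whichever supported form it has."""
    if isinstance(data, dict):
        return data
    if hasattr(data, "model_dump"):  # Pydantic v2 models
        return data.model_dump()
    if hasattr(data, "dict"):  # Pydantic v1 models
        return data.dict()
    raise TypeError(f"Unsupported task input type: {type(data).__name__}")
```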


ogpu.client.types.TaskInfo dataclass

User-friendly task information structure.

Attributes:

source (str): Address of the source to run the task on.
config (TaskInput): Task input configuration and function call.
expiryTime (int): Unix timestamp when the task expires.
payment (int): Payment amount for the task, in wei.

Purpose: Configuration data for publishing tasks to the network.

Example:

import time

task_info = TaskInfo(
    source="0x1234...",
    config=TaskInput(
        function_name="inference",
        data={"inputs": "What is AI?"}
    ),
    expiryTime=int(time.time()) + 3600,  # 1 hour from now
    payment=1000000000000000000  # 1 OGPU in wei
)


ogpu.client.types.Response dataclass

Response data structure for task responses.

Attributes:

address (str): Blockchain address of the response.
task (str): Address of the task this responds to.
provider (str): Address of the provider who submitted the response.
data (str): Response data from the AI service.
payment (int): Payment amount, in wei.
status (int): Response status code.
timestamp (int): Unix timestamp when the response was submitted.
confirmed (bool): Whether the response has been confirmed.

Purpose: Response data structure from completed tasks.

Example:

# Response object from get_task_responses()
response = Response(
    address="0xabcd...",
    task="0x1234...",
    provider="0x5678...",
    data='{"result": "positive sentiment"}',
    payment=1000000000000000000,
    status=1,
    timestamp=1640995200,
    confirmed=False
)
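The data field is typed as a string, and in the example above it holds JSON. The sketch below decodes it when possible and otherwise returns the raw value. It assumes providers often, but not always, return JSON; nothing in this reference guarantees a format, and parse_response_data is a hypothetical helper.

```python
import json
from typing import Any


def parse_response_data(raw: Any) -> Any:
    """Decode JSON response data, falling back to the raw value unchanged."""
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        # TypeError: raw is not str/bytes; ValueError covers json.JSONDecodeError
        return raw
```

Applied to a Response, this would be called as parse_response_data(response.data).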


ogpu.client.types.ConfirmedResponse dataclass

Simplified confirmed response data structure.

Attributes:

address (str): Blockchain address of the confirmed response.
data (str): The confirmed response data. Note that get_confirmed_response passes the parsed "data" object from the API response in this field.

Purpose: Simplified confirmed response data structure.

Example:

# Confirmed response from get_confirmed_response()
confirmed = ConfirmedResponse(
    address="0xabcd...",
    data='{"result": "positive sentiment", "confidence": 0.95}'
)