Skip to content

API Reference: ogpu.client

Essential API reference for the ogpu.client module - the foundation for task publishing and response management on the OpenGPU network.

Task and Source Management

This module is designed for interacting with the OpenGPU network. Use ogpu.client to publish tasks, manage sources, retrieve responses, and configure blockchain connections.

Source Operations

ogpu.client.source.publish_source

Publish a source to the Nexus contract.

Parameters:

Name Type Description Default
source_info SourceInfo

SourceInfo object containing source configuration

required
private_key Optional[str]

Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.

None
nonce Optional[int]

Optional manual nonce override. If None, will be fetched automatically.

None
auto_fix_nonce bool

If True, automatically retry on nonce errors (default: True)

True
max_retries int

Maximum number of retry attempts on recoverable errors (default: 3)

3

Returns:

Type Description
str

Address of the created source contract

Raises:

Type Description
Exception

If transaction fails after all retries

Example

from ogpu.client import publish_source, SourceInfo

Normal usage with auto-retry

source_address = publish_source(source_info)

Manual nonce override (for advanced use)

source_address = publish_source(source_info, nonce=42)

Source code in ogpu/client/source.py
def publish_source(
    source_info: SourceInfo,
    private_key: Optional[str] = None,
    nonce: Optional[int] = None,
    auto_fix_nonce: bool = True,
    max_retries: int = 3,
) -> str:
    """
    Publish a source to the Nexus contract.

    The transaction is built, signed and sent up to ``max_retries`` times.
    Only errors whose message looks like a nonce conflict or a
    "replacement transaction underpriced" rejection are retried; every other
    error is re-raised unchanged on the first occurrence.

    Args:
        source_info: SourceInfo object containing source configuration
        private_key: Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.
        nonce: Optional manual nonce override. If None, will be fetched automatically.
        auto_fix_nonce: If True, automatically retry on nonce errors (default: True)
        max_retries: Maximum number of retry attempts on recoverable errors (default: 3)

    Returns:
        Address of the created source contract

    Raises:
        Exception: If the transaction fails after all retries, if the mined
            transaction reports a failed status, or if the receipt contains
            no SourcePublished event.

    Example:
        >>> from ogpu.client import publish_source, SourceInfo
        >>> # Normal usage with auto-retry
        >>> source_address = publish_source(source_info)
        >>>
        >>> # Manual nonce override (for advanced use)
        >>> source_address = publish_source(source_info, nonce=42)
    """
    if private_key is None:
        private_key = get_private_key()

    acc = Account.from_key(private_key)

    for attempt in range(max_retries):
        try:
            client_address = acc.address

            # Convert SourceInfo to SourceParams
            source_params = source_info.to_source_params(client_address)

            # Get contract instances
            nexus_contract = NexusContract()
            web3 = WEB3()

            # A caller-supplied nonce always wins; otherwise use the managed one.
            # NOTE(review): with a manual nonce every retry reuses that same
            # value, so retrying after a nonce error cannot pick a new one —
            # confirm this is intended.
            if nonce is not None:
                tx_nonce = nonce
            else:
                tx_nonce = NonceManager.get_nonce(acc.address, web3)

            tx = nexus_contract.functions.publishSource(
                source_params.to_tuple()
            ).build_transaction({"from": acc.address, "nonce": tx_nonce})

            signed = web3.eth.account.sign_transaction(tx, private_key)
            tx_hash = web3.eth.send_raw_transaction(signed.raw_transaction)
            receipt = web3.eth.wait_for_transaction_receipt(tx_hash)

            # A receipt was returned, so advance the nonce if we're managing it
            if nonce is None:
                NonceManager.increment_nonce(acc.address, web3)

            # Fail with a clear message instead of an IndexError on logs[0]
            # when the transaction did not succeed or emitted no event.
            if receipt["status"] != 1:
                raise Exception(f"Transaction failed: {tx_hash.hex()}")

            logs = nexus_contract.events.SourcePublished().process_receipt(receipt)
            if not logs:
                raise Exception(
                    f"SourcePublished event not found in transaction {tx_hash.hex()}"
                )
            return web3.to_checksum_address(logs[0]["args"]["source"])

        except Exception as e:
            error_msg = str(e).lower()

            # Check if this is a nonce-related error
            is_nonce_error = any(
                marker in error_msg
                for marker in ("nonce too low", "known transaction", "already known")
            )

            # Check if this is a replacement underpriced error
            is_underpriced = "replacement transaction underpriced" in error_msg

            if not (is_nonce_error or is_underpriced):
                # Not a recoverable error, re-raise immediately
                raise

            # Retrying is only allowed when enabled and attempts remain.
            can_retry = auto_fix_nonce and attempt < max_retries - 1

            if is_nonce_error:
                print(
                    f"⚠️  Nonce error detected on attempt {attempt + 1}/{max_retries}"
                )

                if not can_retry:
                    raise Exception(
                        f"Nonce error after {attempt + 1} attempts: {e}\n"
                        f"💡 Try calling fix_nonce() to manually resolve this issue."
                    ) from e

                print("🔧 Auto-fixing nonce...")
                # Function-scope import kept as in the original
                # (presumably to avoid a circular import — confirm).
                from .nonce_utils import fix_nonce

                fix_nonce(acc.address, private_key)
                print("🔄 Retrying transaction...")
                continue

            # Remaining case: replacement transaction underpriced
            print(
                f"⚠️  Transaction underpriced on attempt {attempt + 1}/{max_retries}"
            )

            if not can_retry:
                raise Exception(
                    f"Transaction underpriced after {attempt + 1} attempts: {e}\n"
                    f"💡 Wait a few seconds and try again, or increase gas price."
                ) from e

            print("⛽ Waiting 5 seconds for gas prices to update and retrying...")
            time.sleep(5)
            # Reset the managed nonce before the next attempt
            NonceManager.reset_nonce(acc.address, web3)
            continue

    # Reached only when the loop body never returned or raised
    # (e.g. max_retries <= 0).
    raise Exception(f"Failed to publish source after {max_retries} attempts")

Purpose: Publish AI services as sources on the OpenGPU network.

Example:

from web3 import Web3
from ogpu.client import publish_source, SourceInfo, ImageEnvironments, DeliveryMethod

source_info = SourceInfo(
    name="sentiment-service",
    description="AI sentiment analysis service",
    logoUrl="https://example.com/logo.png",
    imageEnvs=ImageEnvironments(
        cpu="https://raw.githubusercontent.com/user/repo/main/docker-compose.yml",
        nvidia="https://raw.githubusercontent.com/user/repo/main/docker-compose-gpu.yml"
    ),
    minPayment=Web3.to_wei(0.001, "ether"),
    minAvailableLockup=Web3.to_wei(0.01, "ether"),
    maxExpiryDuration=86400,
    deliveryMethod=DeliveryMethod.MANUAL_CONFIRMATION
)

source_address = publish_source(source_info)
print(f"Source published at: {source_address}")


Task Operations

ogpu.client.task.publish_task

Publish a task to the Controller contract.

Parameters:

Name Type Description Default
task_info TaskInfo

TaskInfo object containing task configuration

required
private_key Optional[str]

Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.

None
nonce Optional[int]

Optional manual nonce override. If None, will be fetched automatically.

None
auto_fix_nonce bool

If True, automatically retry on nonce errors (default: True)

True
max_retries int

Maximum number of retry attempts on recoverable errors (default: 3)

3

Returns:

Type Description
str

Address of the created task contract

Raises:

Type Description
Exception

If transaction fails after all retries

Example

from ogpu.client import publish_task, TaskInfo

Normal usage with auto-retry

task_address = publish_task(task_info)

Manual nonce override (for advanced use)

task_address = publish_task(task_info, nonce=42)

Disable auto-fix

task_address = publish_task(task_info, auto_fix_nonce=False)

Source code in ogpu/client/task.py
def publish_task(
    task_info: TaskInfo,
    private_key: Optional[str] = None,
    nonce: Optional[int] = None,
    auto_fix_nonce: bool = True,
    max_retries: int = 3,
) -> str:
    """
    Publish a task to the Controller contract.

    The transaction is built, signed and sent up to ``max_retries`` times.
    Only errors whose message looks like a nonce conflict or a
    "replacement transaction underpriced" rejection are retried; every other
    error is re-raised unchanged on the first occurrence.

    Args:
        task_info: TaskInfo object containing task configuration
        private_key: Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.
        nonce: Optional manual nonce override. If None, will be fetched automatically.
        auto_fix_nonce: If True, automatically retry on nonce errors (default: True)
        max_retries: Maximum number of retry attempts on recoverable errors (default: 3)

    Returns:
        Address of the created task contract

    Raises:
        Exception: If the transaction fails after all retries, if the mined
            transaction reports a failed status, or if the receipt contains
            no TaskPublished event.

    Example:
        >>> from ogpu.client import publish_task, TaskInfo
        >>> # Normal usage with auto-retry
        >>> task_address = publish_task(task_info)
        >>>
        >>> # Manual nonce override (for advanced use)
        >>> task_address = publish_task(task_info, nonce=42)
        >>>
        >>> # Disable auto-fix
        >>> task_address = publish_task(task_info, auto_fix_nonce=False)
    """
    if private_key is None:
        private_key = get_private_key()

    acc = Account.from_key(private_key)

    for attempt in range(max_retries):
        try:
            # Convert TaskInfo to TaskParams
            task_params = task_info.to_task_params()

            # Get Web3 instance
            web3 = WEB3()

            # The Controller receives the call; the Nexus contract is used
            # below to decode the TaskPublished event from the receipt.
            controller_contract = ControllerContract()
            nexus_contract = NexusContract()

            # A caller-supplied nonce always wins; otherwise use the managed one.
            # NOTE(review): with a manual nonce every retry reuses that same
            # value, so retrying after a nonce error cannot pick a new one —
            # confirm this is intended.
            if nonce is not None:
                tx_nonce = nonce
            else:
                tx_nonce = NonceManager.get_nonce(acc.address, web3)

            tx = controller_contract.functions.publishTask(
                task_params.to_tuple()
            ).build_transaction({"from": acc.address, "nonce": tx_nonce})

            signed = web3.eth.account.sign_transaction(tx, private_key)
            tx_hash = web3.eth.send_raw_transaction(signed.raw_transaction)
            receipt = web3.eth.wait_for_transaction_receipt(tx_hash)

            # A receipt was returned, so advance the nonce if we're managing it
            if nonce is None:
                NonceManager.increment_nonce(acc.address, web3)

            # Fail with a clear message instead of an IndexError on logs[0]
            # when the transaction did not succeed or emitted no event.
            if receipt["status"] != 1:
                raise Exception(f"Transaction failed: {tx_hash.hex()}")

            logs = nexus_contract.events.TaskPublished().process_receipt(receipt)
            if not logs:
                raise Exception(
                    f"TaskPublished event not found in transaction {tx_hash.hex()}"
                )
            return web3.to_checksum_address(logs[0]["args"]["task"])

        except Exception as e:
            error_msg = str(e).lower()

            # Check if this is a nonce-related error
            is_nonce_error = any(
                marker in error_msg
                for marker in ("nonce too low", "known transaction", "already known")
            )

            # Check if this is a replacement underpriced error
            is_underpriced = "replacement transaction underpriced" in error_msg

            if not (is_nonce_error or is_underpriced):
                # Not a recoverable error, re-raise immediately
                raise

            # Retrying is only allowed when enabled and attempts remain.
            can_retry = auto_fix_nonce and attempt < max_retries - 1

            if is_nonce_error:
                print(
                    f"⚠️  Nonce error detected on attempt {attempt + 1}/{max_retries}"
                )

                if not can_retry:
                    raise Exception(
                        f"Nonce error after {attempt + 1} attempts: {e}\n"
                        f"💡 Try calling fix_nonce() to manually resolve this issue."
                    ) from e

                print("🔧 Auto-fixing nonce...")
                # Import here to avoid circular dependency
                from .nonce_utils import fix_nonce

                fix_nonce(acc.address, private_key)
                print("🔄 Retrying transaction...")
                continue

            # Remaining case: replacement transaction underpriced
            print(
                f"⚠️  Transaction underpriced on attempt {attempt + 1}/{max_retries}"
            )

            if not can_retry:
                raise Exception(
                    f"Transaction underpriced after {attempt + 1} attempts: {e}\n"
                    f"💡 Wait a few seconds and try again, or increase gas price."
                ) from e

            print("⛽ Waiting 5 seconds for gas prices to update and retrying...")
            time.sleep(5)
            # Reset nonce cache to get fresh values
            NonceManager.reset_nonce(acc.address, web3)
            continue

    # Reached only when the loop body never returned or raised
    # (e.g. max_retries <= 0).
    raise Exception(f"Failed to publish task after {max_retries} attempts")

Purpose: Publish AI tasks to the OpenGPU network for distributed processing.

Example:

import time
from web3 import Web3
from ogpu.client import publish_task, TaskInfo, TaskInput

# Create task configuration
task_config = TaskInput(
    function_name="analyze_sentiment",
    data={"text": "I love this product!"}
)

task_info = TaskInfo(
    source="0x1234567890123456789012345678901234567890",
    config=task_config,
    expiryTime=int(time.time()) + 3600,  # 1 hour from now
    payment=Web3.to_wei(0.01, "ether")
)

task_address = publish_task(task_info)
print(f"Task published at: {task_address}")


Response Operations

ogpu.client.responses.get_task_responses

Get all responses for a specific task address.

Parameters:

Name Type Description Default
task_address str

The task contract address

required
lower int

Lower bound for response pagination (default: 0)

0
upper Optional[int]

Upper bound for response pagination (default: None, gets all)

None

Returns:

Type Description
List[Response]

List of Response objects containing response data

Raises:

Type Description
Exception

If the contract call fails or task doesn't exist

Source code in ogpu/client/responses.py
def get_task_responses(
    task_address: str, lower: int = 0, upper: Optional[int] = None
) -> List[Response]:
    """
    Get responses for a specific task address.

    Args:
        task_address: The task contract address
        lower: Lower bound for response pagination (default: 0)
        upper: Upper bound for response pagination. If None, the range
            ``lower`` to ``lower + 1000`` is requested in one call; if that
            call reverts, responses are fetched one index at a time until an
            empty result or an error is hit.

    Returns:
        List of Response objects containing response data. Responses whose
        details cannot be fetched are reported on stdout and left out.

    Raises:
        Exception: If the contract call fails or task doesn't exist
    """
    zero_address = "0x0000000000000000000000000000000000000000"

    try:
        # Load the task contract
        task_contract = load_task_contract(task_address)

        # Get response addresses from the task
        if upper is None:
            try:
                response_addresses = task_contract.functions.getResponsesOf(
                    lower, lower + 1000
                ).call()
            except ContractLogicError:
                # If the ranged call reverts, try to get responses one by one
                response_addresses = []
                i = lower
                # NOTE(review): this loop only ends on an empty result or an
                # error; if the contract answers out-of-range indices with a
                # zero address instead, it would not terminate — confirm.
                while True:
                    try:
                        response_addr = task_contract.functions.getResponsesOf(
                            i, i + 1
                        ).call()
                    except Exception:
                        # Any failure (including a revert) ends the scan
                        break
                    if not response_addr:
                        break
                    response_addresses.extend(response_addr)
                    i += 1
        else:
            response_addresses = task_contract.functions.getResponsesOf(
                lower, upper
            ).call()

        # Filter out zero addresses
        response_addresses = [
            addr for addr in response_addresses if addr != zero_address
        ]

        responses = []

        # Get detailed information for each response
        for response_addr in response_addresses:
            try:
                response_contract = load_response_contract(response_addr)

                # Positional fields: task, provider, data, payment
                response_params = response_contract.functions.getResponseParams().call()
                status = response_contract.functions.getStatus().call()
                timestamp = response_contract.functions.responseTimestamp().call()
                confirmed = response_contract.functions.confirmedFinal().call()

                responses.append(
                    Response(
                        address=response_addr,
                        task=response_params[0],
                        provider=response_params[1],
                        data=response_params[2],
                        payment=response_params[3],
                        status=status,
                        timestamp=timestamp,
                        confirmed=confirmed,
                    )
                )

            except Exception as e:
                # Best effort: report the error but continue with other responses
                print(f"Error fetching response {response_addr}: {e}")

        return responses

    except ContractLogicError as e:
        raise Exception(f"Contract call failed for task {task_address}: {e}") from e
    except Exception as e:
        raise Exception(
            f"Error fetching responses for task {task_address}: {e}"
        ) from e

Purpose: Retrieve all responses for a published task.

Example:

responses = get_task_responses(task_address)
for response in responses:
    print(f"Provider: {response.provider}")
    print(f"Status: {response.status}")
    print(f"Data: {response.data}")


ogpu.client.responses.get_confirmed_response

Get confirmed response data for a specific task address by calling the API.

Parameters:

Name Type Description Default
task_address str

The task contract address

required

Returns:

Type Description
ConfirmedResponse

ConfirmedResponse object containing the confirmed response data

Raises:

Type Description
Exception

If the API call fails or no confirmed response is found

Source code in ogpu/client/responses.py
def get_confirmed_response(
    task_address: str, timeout: float = 30.0
) -> ConfirmedResponse:
    """
    Get confirmed response data for a specific task address by calling the API.

    Args:
        task_address: The task contract address
        timeout: Seconds to wait for the management API before giving up
            (default: 30). Without a timeout the HTTP call could block
            indefinitely.

    Returns:
        ConfirmedResponse object containing the confirmed response data

    Raises:
        Exception: If the API call fails or no confirmed response is found
    """
    try:
        # Make API request to get task responses
        api_url = f"https://management-backend.opengpu.network/api/tasks/{task_address}"

        response = requests.get(api_url, timeout=timeout)
        response.raise_for_status()  # Raise an exception for HTTP errors

        api_data = response.json()

        # The payload must be an object with a "data" object that carries
        # an "address"; a null or non-object value is rejected explicitly
        # rather than failing on the membership test.
        if not isinstance(api_data, dict) or "data" not in api_data:
            raise Exception(f"Invalid API response format for task {task_address}")

        data_content = api_data["data"]
        if not isinstance(data_content, dict) or "address" not in data_content:
            raise Exception(
                f"Address not found in API response for task {task_address}"
            )

        # Create ConfirmedResponse object with the API data
        return ConfirmedResponse(
            address=data_content["address"],
            data=data_content,
        )

    except requests.RequestException as e:
        raise Exception(f"API request failed for task {task_address}: {e}") from e
    except Exception as e:
        # Also re-wraps the validation errors raised above, as before.
        raise Exception(
            f"Error fetching confirmed response for task {task_address}: {e}"
        ) from e

Purpose: Retrieve the confirmed response for a completed task.

Example:

confirmed = get_confirmed_response(task_address)
if confirmed:
    print(f"Result: {confirmed.data}")


ogpu.client.responses.confirm_response

Confirm a response using the Controller contract.

Parameters:

Name Type Description Default
response_address str

The response contract address to confirm

required
private_key Optional[str]

Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.

None
nonce Optional[int]

Optional manual nonce override. If None, will be fetched automatically.

None
auto_fix_nonce bool

If True, automatically retry on nonce errors (default: True)

True
max_retries int

Maximum number of retry attempts on recoverable errors (default: 3)

3

Returns:

Name Type Description
str str

Transaction hash of the confirmation

Raises:

Type Description
Exception

If the confirmation fails after all retries

Example

from ogpu.client import confirm_response

Normal usage with auto-retry

tx_hash = confirm_response(response_address)

Manual nonce override

tx_hash = confirm_response(response_address, nonce=42)

Source code in ogpu/client/responses.py
def confirm_response(
    response_address: str,
    private_key: Optional[str] = None,
    nonce: Optional[int] = None,
    auto_fix_nonce: bool = True,
    max_retries: int = 3,
) -> str:
    """
    Confirm a response using the Controller contract.

    Before sending anything, the response contract is queried once: an
    already-confirmed response is rejected immediately. The confirmation
    transaction is then sent up to ``max_retries`` times; only nonce and
    "replacement transaction underpriced" errors are retried.

    Args:
        response_address: The response contract address to confirm
        private_key: Private key for signing the transaction. If None, will use CLIENT_PRIVATE_KEY environment variable.
        nonce: Optional manual nonce override. If None, will be fetched automatically.
        auto_fix_nonce: If True, automatically retry on nonce errors (default: True)
        max_retries: Maximum number of retry attempts on recoverable errors (default: 3)

    Returns:
        str: Transaction hash of the confirmation

    Raises:
        ValueError: If response_address is not a valid address
        Exception: If the response is already confirmed, the contract call
            reverts, or the confirmation fails after all retries

    Example:
        >>> from ogpu.client import confirm_response
        >>> # Normal usage with auto-retry
        >>> tx_hash = confirm_response(response_address)
        >>>
        >>> # Manual nonce override
        >>> tx_hash = confirm_response(response_address, nonce=42)
    """
    if private_key is None:
        private_key = get_private_key()

    # Validate response address format
    web3 = WEB3()
    if not web3.is_address(response_address):
        raise ValueError(f"Invalid response address format: {response_address}")

    acc = Account.from_key(private_key)

    # Validate response exists (do this once, outside retry loop)
    try:
        response_contract = load_response_contract(response_address)
        confirmed = response_contract.functions.confirmedFinal().call()
    except Exception as e:
        if "execution reverted" in str(e).lower():
            raise Exception(
                f"Response contract {response_address} may not exist or is invalid"
            ) from e
        # If it's another error, continue and let the transaction attempt provide more info
        confirmed = False

    # Checked outside the try block so this error is not swallowed by the
    # best-effort handler above.
    if confirmed:
        raise Exception(f"Response {response_address} is already confirmed")

    # Retry loop for transaction sending
    for attempt in range(max_retries):
        try:
            # Execute the confirmation transaction
            controller_contract = ControllerContract()

            # A caller-supplied nonce always wins; otherwise use the managed one
            if nonce is not None:
                tx_nonce = nonce
            else:
                tx_nonce = NonceManager.get_nonce(acc.address, web3)

            tx = controller_contract.functions.confirmResponse(
                response_address
            ).build_transaction({"from": acc.address, "nonce": tx_nonce})

            signed = web3.eth.account.sign_transaction(tx, private_key)
            tx_hash = web3.eth.send_raw_transaction(signed.raw_transaction)
            receipt = web3.eth.wait_for_transaction_receipt(tx_hash)

            # A receipt was returned, so advance the nonce if we're managing it
            if nonce is None:
                NonceManager.increment_nonce(acc.address, web3)

            if receipt["status"] != 1:
                raise Exception(f"Transaction failed: {tx_hash.hex()}")
            return tx_hash.hex()

        except Exception as e:
            error_msg = str(e).lower()

            # Skip retry logic for contract logic errors (these won't be fixed by retrying)
            if "execution reverted" in error_msg or isinstance(e, ContractLogicError):
                raise Exception(
                    f"Contract execution reverted. Possible reasons: "
                    f"1) Response {response_address} doesn't exist, "
                    f"2) Already confirmed, "
                    f"3) Caller {acc.address} doesn't have permission to confirm, "
                    f"4) Response is in invalid state for confirmation"
                ) from e

            # Check if this is a nonce-related error
            is_nonce_error = any(
                marker in error_msg
                for marker in ("nonce too low", "known transaction", "already known")
            )

            # Check if this is a replacement underpriced error
            is_underpriced = "replacement transaction underpriced" in error_msg

            if not (is_nonce_error or is_underpriced):
                # Not a recoverable error, re-raise immediately
                raise

            # Retrying is only allowed when enabled and attempts remain.
            can_retry = auto_fix_nonce and attempt < max_retries - 1

            if is_nonce_error:
                print(
                    f"⚠️  Nonce error detected on attempt {attempt + 1}/{max_retries}"
                )

                if not can_retry:
                    raise Exception(
                        f"Nonce error after {attempt + 1} attempts: {e}\n"
                        f"💡 Try calling fix_nonce() to manually resolve this issue."
                    ) from e

                print("🔧 Auto-fixing nonce...")
                # Function-scope import kept as in the original
                # (presumably to avoid a circular import — confirm).
                from .nonce_utils import fix_nonce

                fix_nonce(acc.address, private_key)
                print("🔄 Retrying transaction...")
                continue

            # Remaining case: replacement transaction underpriced
            print(
                f"⚠️  Transaction underpriced on attempt {attempt + 1}/{max_retries}"
            )

            if not can_retry:
                raise Exception(
                    f"Transaction underpriced after {attempt + 1} attempts: {e}\n"
                    f"💡 Wait a few seconds and try again, or increase gas price."
                ) from e

            print("⛽ Waiting 5 seconds for gas prices to update and retrying...")
            time.sleep(5)
            # Reset the managed nonce before the next attempt
            NonceManager.reset_nonce(acc.address, web3)
            continue

    # Reached only when the loop body never returned or raised
    # (e.g. max_retries <= 0).
    raise Exception(f"Failed to confirm response after {max_retries} attempts")

Purpose: Confirm a response to complete the task and release payment.

Example:

tx_hash = confirm_response(response_address)
print(f"Confirmation transaction: {tx_hash}")


Network Configuration

ogpu.client.chain_config.ChainId

Enum for supported blockchain networks.

Attributes:

Name Type Description
OGPU_MAINNET

Main OpenGPU network (Chain ID: 1071)

OGPU_TESTNET

Test OpenGPU network (Chain ID: 200820172034)

Purpose: Enum defining supported blockchain networks.


ogpu.client.chain_config.ChainConfig.set_chain classmethod

Set the current active chain

Source code in ogpu/client/chain_config.py
@classmethod
def set_chain(cls, chain_id: ChainId) -> None:
    """Make ``chain_id`` the active chain.

    Raises:
        ValueError: If ``chain_id`` has no entry in ``CHAIN_CONTRACTS``.
    """
    if chain_id in cls.CHAIN_CONTRACTS:
        cls._current_chain = chain_id
        return
    raise ValueError(f"Chain {chain_id} is not supported")

Purpose: Set the current active blockchain network.

Example:

from ogpu.client import ChainConfig, ChainId

# Set to testnet
ChainConfig.set_chain(ChainId.OGPU_TESTNET)

# Set to mainnet
ChainConfig.set_chain(ChainId.OGPU_MAINNET)


ogpu.client.chain_config.ChainConfig.get_current_chain classmethod

Get the current active chain

Source code in ogpu/client/chain_config.py
@classmethod
def get_current_chain(cls) -> ChainId:
    """Return the active chain.

    Raises:
        ValueError: If no chain has been selected yet.
    """
    active = cls._current_chain
    if active is not None:
        return active
    raise ValueError("No chain has been set. Call set_chain() first.")

Purpose: Get the currently active blockchain network.

Example:

current_chain = ChainConfig.get_current_chain()
print(f"Current chain: {current_chain.name}")
print(f"Chain ID: {current_chain.value}")


ogpu.client.chain_config.ChainConfig.get_contract_address classmethod

Get contract address for the current chain

Source code in ogpu/client/chain_config.py
@classmethod
def get_contract_address(cls, contract_name: str) -> str:
    """Look up the address of ``contract_name`` on the active chain.

    Raises:
        ValueError: If no chain is active, the active chain has no entry in
            ``CHAIN_CONTRACTS``, or the contract name is unknown for it.
    """
    active = cls.get_current_chain()
    registry = cls.CHAIN_CONTRACTS

    if active not in registry:
        raise ValueError(f"Chain {active} is not configured")

    addresses = registry[active]
    if contract_name in addresses:
        return addresses[contract_name]

    raise ValueError(
        f"Contract {contract_name} not found for chain {active}"
    )

Purpose: Get contract address for a specific contract on the current chain.

Example:

# Get contract addresses for current chain
nexus_address = ChainConfig.get_contract_address("NEXUS")
controller_address = ChainConfig.get_contract_address("CONTROLLER")
terminal_address = ChainConfig.get_contract_address("TERMINAL")

print(f"NEXUS: {nexus_address}")
print(f"CONTROLLER: {controller_address}")
print(f"TERMINAL: {terminal_address}")


ogpu.client.chain_config.ChainConfig.get_all_supported_chains classmethod

Get list of all supported chains

Source code in ogpu/client/chain_config.py
@classmethod
def get_all_supported_chains(cls) -> list[ChainId]:
    """Return every chain that has an entry in ``CHAIN_CONTRACTS``, as a new list."""
    return [*cls.CHAIN_CONTRACTS.keys()]

Purpose: Get list of all supported blockchain networks.

Example:

supported_chains = ChainConfig.get_all_supported_chains()
for chain in supported_chains:
    print(f"Chain: {chain.name} (ID: {chain.value})")


Nonce Management (v0.2.0.14+)

New in v0.2.0.14

Automatic nonce management with error detection and recovery.

ogpu.client.nonce_utils.fix_nonce

Fix stuck nonce issues by canceling pending transactions.

This function will:

1. Detect pending transactions (transactions stuck in mempool)
2. Cancel them by sending 0 ETH self-transfers with higher gas price
3. Clear SDK's internal nonce cache
4. Return the next available nonce

Parameters:

Name Type Description Default
address Optional[str]

Ethereum address to fix (optional if private_key provided)

None
private_key Optional[str]

Private key for signing cancellation transactions If None, will use CLIENT_PRIVATE_KEY environment variable

None

Returns:

Type Description
int

Next available nonce after fixing

Raises:

Type Description
ValueError

If neither address nor private_key is provided

Example

from ogpu.client import fix_nonce

Fix nonce for current account

next_nonce = fix_nonce()
print(f"Ready to send transaction with nonce: {next_nonce}")

Source code in ogpu/client/nonce_utils.py
def fix_nonce(address: Optional[str] = None, private_key: Optional[str] = None) -> int:
    """
    Clear stuck pending transactions for an account and report its next nonce.

    Steps, in order:
    1. Read the "latest" and "pending" transaction counts; a higher pending
       count means transactions were submitted but not yet mined.
    2. Try to cancel every nonce in that gap through
       ``_cancel_transaction_with_nonce`` (documented as a 0 ETH
       self-transfer with a higher gas price). A failed cancellation is
       printed and skipped, it does not abort the run.
    3. Reset the SDK's cached nonce for the address.
    4. Read the "pending" transaction count again and return it.

    Progress is reported on stdout via ``print``.

    Args:
        address: Address to repair. When None, the address of the account
            derived from the private key is used.
        private_key: Key handed to the cancellation helper. When None it is
            taken from ``get_private_key()`` (documented as reading the
            CLIENT_PRIVATE_KEY environment variable).

    Returns:
        The "pending" transaction count read after the cleanup.

    Raises:
        ValueError: Documented for the case where neither an address nor a
            private key is available. No such check is visible in this
            function; presumably it originates in ``get_private_key()`` --
            TODO confirm.

    Example:
        >>> from ogpu.client import fix_nonce
        >>> next_nonce = fix_nonce()
        >>> print(f"Ready to send transaction with nonce: {next_nonce}")
    """
    signing_key = get_private_key() if private_key is None else private_key

    # The account is built from the key unconditionally, even when an
    # explicit address was supplied.
    account = Account.from_key(signing_key)
    raw_address = account.address if address is None else address

    web3 = WEB3()
    target = web3.to_checksum_address(raw_address)

    print(f"🔧 Fixing nonce for {target}...")

    # Snapshot of the account's nonce state before any cancellation.
    mined = web3.eth.get_transaction_count(target, "latest")
    pending = web3.eth.get_transaction_count(target, "pending")

    print(f"   📊 Mined nonce: {mined}")
    print(f"   📊 Pending nonce: {pending}")

    if pending <= mined:
        print("   ✅ No pending transactions found")
    else:
        backlog = pending - mined
        print(f"   ⚠️  {backlog} pending transaction(s) detected!")
        print("   🗑️  Attempting to cancel stuck transactions...")

        # Walk every nonce in the gap; each one is attempted independently.
        cancelled = 0
        for stuck_nonce in range(mined, pending):
            try:
                cancel_hash = _cancel_transaction_with_nonce(
                    target, stuck_nonce, signing_key, web3
                )
                print(
                    f"      ✅ Cancelled nonce {stuck_nonce} (tx: {cancel_hash[:10]}...)"
                )
                cancelled += 1
                time.sleep(0.5)  # brief pause before the next cancellation
            except Exception as exc:
                # Best effort: report the failure and move on to the next nonce.
                print(f"      ⚠️  Could not cancel nonce {stuck_nonce}: {exc}")

        if cancelled:
            print(f"   ✅ Successfully cancelled {cancelled} transaction(s)")
            print("   ⏳ Waiting 3 seconds for cancellations to propagate...")
            time.sleep(3)
        else:
            print("   ⚠️  Could not cancel any transactions automatically")
            print("   💡 They may resolve naturally or you may need to wait")

    # Drop the SDK-side cached nonce so the next lookup starts fresh.
    NonceManager.reset_nonce(target, web3)
    print("   🧹 SDK nonce cache cleared")

    # Re-read after the cleanup; this is the value handed back to the caller.
    next_nonce = web3.eth.get_transaction_count(target, "pending")
    print(f"   ✅ Fixed! Next available nonce: {next_nonce}")

    return next_nonce

Purpose: Fix stuck nonce issues by cancelling pending transactions.

Example:

from ogpu.client import fix_nonce

# Fix all stuck transactions
next_nonce = fix_nonce()
print(f"Ready to send with nonce: {next_nonce}")


ogpu.client.nonce_utils.get_nonce_info

Get detailed nonce information for an address.

Parameters:

Name Type Description Default
address Optional[str]

Ethereum address (optional if private_key provided)

None
private_key Optional[str]

Private key to derive the address from. If None, will use the CLIENT_PRIVATE_KEY environment variable.

None

Returns:

Type Description
dict

Dictionary containing:

  • address: The checksummed address
  • mined_nonce: Number of mined transactions
  • pending_nonce: Number of mined + pending transactions
  • cached_nonce: SDK's cached nonce (None if not cached)
  • has_pending: Whether there are pending transactions
  • pending_count: Number of pending transactions (pending_nonce - mined_nonce)
Example

from ogpu.client import get_nonce_info

info = get_nonce_info()
print(f"Pending transactions: {info['pending_nonce'] - info['mined_nonce']}")

Source code in ogpu/client/nonce_utils.py
def get_nonce_info(
    address: Optional[str] = None, private_key: Optional[str] = None
) -> dict:
    """
    Collect the on-chain and SDK-cached nonce state for an address.

    Args:
        address: Address to inspect. When None, the address of the account
            derived from the private key is used.
        private_key: Key used to derive the account. When None it is taken
            from ``get_private_key()`` (documented as reading the
            CLIENT_PRIVATE_KEY environment variable).

    Returns:
        A dictionary with these keys:
        - address: The checksummed address that was queried
        - mined_nonce: Transaction count at the "latest" block
        - pending_nonce: Transaction count including "pending" transactions
        - cached_nonce: Value from ``NonceManager.get_cached_nonce``
          (documented as None when nothing is cached)
        - has_pending: True when pending_nonce exceeds mined_nonce
        - pending_count: pending_nonce minus mined_nonce

    Example:
        >>> from ogpu.client import get_nonce_info
        >>> info = get_nonce_info()
        >>> print(f"Pending transactions: {info['pending_count']}")
    """
    key = get_private_key() if private_key is None else private_key

    # The account is built from the key unconditionally, even when an
    # explicit address was supplied.
    account = Account.from_key(key)
    raw_address = account.address if address is None else address

    web3 = WEB3()
    checksummed = web3.to_checksum_address(raw_address)

    mined = web3.eth.get_transaction_count(checksummed, "latest")
    pending = web3.eth.get_transaction_count(checksummed, "pending")
    cached = NonceManager.get_cached_nonce(checksummed, web3)

    report = {
        "address": checksummed,
        "mined_nonce": mined,
        "pending_nonce": pending,
        "cached_nonce": cached,
        "has_pending": pending > mined,
        "pending_count": pending - mined,
    }
    return report

Purpose: Get detailed nonce information for an address.

Example:

from ogpu.client import get_nonce_info

info = get_nonce_info()
if info['has_pending']:
    print(f"Warning: {info['pending_count']} pending transactions")


ogpu.client.nonce_utils.reset_nonce_cache

Reset the SDK's internal nonce cache without canceling transactions.

This is useful when you want to force the SDK to fetch a fresh nonce from the blockchain without canceling any pending transactions.

Parameters:

Name Type Description Default
address Optional[str]

Ethereum address to reset (optional if private_key provided)

None
private_key Optional[str]

Private key to derive the address from. If None, will use the CLIENT_PRIVATE_KEY environment variable.

None
Example

from ogpu.client import reset_nonce_cache

reset_nonce_cache()
print("Nonce cache cleared")

Source code in ogpu/client/nonce_utils.py
def reset_nonce_cache(
    address: Optional[str] = None, private_key: Optional[str] = None
) -> None:
    """
    Reset the SDK's internal nonce cache without canceling transactions.

    This is useful when you want to force the SDK to fetch a fresh nonce
    from the blockchain without canceling any pending transactions.

    The address is converted to checksum form before the cache is reset,
    which is the same form ``fix_nonce`` and ``get_nonce_info`` hand to
    ``NonceManager``. Without that step a lowercase or otherwise
    differently-cased address could refer to a different cache entry than
    the one those functions use (presumably the cache is keyed by the
    address string -- confirm against ``NonceManager``).

    Args:
        address: Ethereum address to reset (optional if private_key provided).
            When None, the address of the account derived from the private
            key is used.
        private_key: Private key to derive address from.
                    If None, it is taken from ``get_private_key()``
                    (documented as reading the CLIENT_PRIVATE_KEY
                    environment variable).

    Example:
        >>> from ogpu.client import reset_nonce_cache
        >>> reset_nonce_cache()
        >>> print("Nonce cache cleared")
    """
    if private_key is None:
        private_key = get_private_key()

    # The account is built from the key unconditionally, even when an
    # explicit address was supplied.
    acc = Account.from_key(private_key)
    if address is None:
        address = acc.address

    web3 = WEB3()
    # Normalise so the reset targets the same address form the sibling
    # nonce helpers use when they talk to NonceManager.
    address = web3.to_checksum_address(address)
    NonceManager.reset_nonce(address, web3)
    print(f"✅ Nonce cache cleared for {address}")

Purpose: Clear SDK nonce cache.

Example:

from ogpu.client import reset_nonce_cache

reset_nonce_cache()
# Next transaction fetches fresh nonce


ogpu.client.nonce_utils.clear_all_nonce_caches

Clear all nonce caches for all addresses.

This is useful for testing or when you want to completely reset the SDK's nonce state.

Example

from ogpu.client import clear_all_nonce_caches

clear_all_nonce_caches()
print("All nonce caches cleared")

Source code in ogpu/client/nonce_utils.py
def clear_all_nonce_caches() -> None:
    """
    Wipe the SDK's nonce cache for every address.

    Delegates to ``NonceManager.clear_all()`` and then prints a
    confirmation line. Intended for tests, or for starting over with a
    completely fresh SDK nonce state.

    Example:
        >>> from ogpu.client import clear_all_nonce_caches
        >>> clear_all_nonce_caches()
        >>> print("All nonce caches cleared")
    """
    NonceManager.clear_all()
    confirmation = "✅ All nonce caches cleared"
    print(confirmation)

Purpose: Clear all nonce caches for all addresses.

Example:

from ogpu.client import clear_all_nonce_caches

clear_all_nonce_caches()

Learn More

See the Nonce Management Guide for comprehensive documentation and best practices.


Data Types

ogpu.client.types.ImageEnvironments dataclass

Docker compose file paths for different environments.

Attributes:

Name Type Description
cpu str

URL to CPU-only docker-compose.yml file

nvidia str

URL to NVIDIA GPU docker-compose.yml file

amd str

URL to AMD GPU docker-compose.yml file

Purpose: Docker compose file URLs for different hardware environments.

Example:

image_envs = ImageEnvironments(
    cpu="https://raw.githubusercontent.com/user/repo/main/docker-compose.yml",
    nvidia="https://raw.githubusercontent.com/user/repo/main/docker-compose-gpu.yml",
    amd="https://raw.githubusercontent.com/user/repo/main/docker-compose-amd.yml"
)


ogpu.client.types.DeliveryMethod

Enum for delivery method options.

Attributes:

Name Type Description
MANUAL_CONFIRMATION

Client manually confirms the response

FIRST_RESPONSE

First provider to submit a response wins

Purpose: Enum defining how task responses are delivered and confirmed.


ogpu.client.types.SourceInfo dataclass

User-friendly source information structure.

Attributes:

Name Type Description
name str

Human-readable name for the source

description str

Description of the AI service

logoUrl str

URL to the source logo image

imageEnvs ImageEnvironments

Docker environment configurations

minPayment int

Minimum payment required in wei

minAvailableLockup int

Minimum lockup amount in wei

maxExpiryDuration int

Maximum task duration in seconds

deliveryMethod DeliveryMethod

How responses are delivered

Purpose: Configuration data for publishing AI services as sources.

Example:

source_info = SourceInfo(
    name="sentiment-service",
    description="AI sentiment analysis service",
    logoUrl="https://example.com/logo.png",
    imageEnvs=ImageEnvironments(
        cpu="https://raw.githubusercontent.com/user/repo/main/docker-compose.yml",
        nvidia="https://raw.githubusercontent.com/user/repo/main/docker-compose-gpu.yml"
    ),
    minPayment=Web3.to_wei(0.001, "ether"),
    minAvailableLockup=Web3.to_wei(0.01, "ether"),
    maxExpiryDuration=86400,
    deliveryMethod=DeliveryMethod.MANUAL_CONFIRMATION
)


ogpu.client.types.TaskInput dataclass

Configuration structure for tasks.

Attributes:

Name Type Description
function_name str

Name of the function to call on the source

data BaseModel | dict[str, Any]

Input data for the function (Pydantic model or dictionary)

Purpose: Configuration for task input data and function specification.

Example:

# Using a dictionary
task_input = TaskInput(
    function_name="inference",
    data={"inputs": "Translate to French: Hello"}
)

# Using a Pydantic model
from pydantic import BaseModel

class InferenceData(BaseModel):
    inputs: str
    parameters: dict = {}

task_input = TaskInput(
    function_name="inference", 
    data=InferenceData(inputs="Hello world")
)


ogpu.client.types.TaskInfo dataclass

User-friendly task information structure.

Attributes:

Name Type Description
source str

Address of the source to run the task on

config TaskInput

Task input configuration and function call

expiryTime int

Unix timestamp when task expires

payment int

Payment amount for the task in wei

Purpose: Configuration data for publishing tasks to the network.

Example:

task_info = TaskInfo(
    source="0x1234...",
    config=TaskInput(
        function_name="inference",
        data={"inputs": "What is AI?"}
    ),
    expiryTime=1640995200,
    payment=1000000000000000000  # 1 OGPU in wei
)


ogpu.client.types.Response dataclass

Response data structure for task responses.

Attributes:

Name Type Description
address str

Blockchain address of the response

task str

Address of the task this responds to

provider str

Address of the provider who submitted the response

data str

Response data from the AI service

payment int

Payment amount in wei

status int

Response status code

timestamp int

Unix timestamp when response was submitted

confirmed bool

Whether the response has been confirmed

Purpose: Response data structure from completed tasks.

Example:

# Response object from get_task_responses()
response = Response(
    address="0xabcd...",
    task="0x1234...",
    provider="0x5678...",
    data='{"result": "positive sentiment"}',
    payment=1000000000000000000,
    status=1,
    timestamp=1640995200,
    confirmed=False
)


ogpu.client.types.ConfirmedResponse dataclass

Simplified confirmed response data structure.

Attributes:

Name Type Description
address str

Blockchain address of the confirmed response

data str

The confirmed response data

Purpose: Simplified confirmed response data structure.

Example:

# Confirmed response from get_confirmed_response()
confirmed = ConfirmedResponse(
    address="0xabcd...",
    data='{"result": "positive sentiment", "confidence": 0.95}'
)