API Reference: ogpu.service

Essential API reference for the ogpu.service module - the foundation for building custom AI services on the OpenGPU network.

Custom Source Development

This module is designed for creating custom AI sources. Use ogpu.service to build your own AI services that can be deployed as sources on the OpenGPU network.

🎯 Essential Functions

The ogpu.service module provides three core functions for custom source development, plus a built-in logger:

  • @init() - Initialize AI models and resources during service startup
  • @expose() - Make your AI functions available as network services
  • start() - Launch your service to handle OpenGPU network tasks
  • logger - Built-in logging with Sentry integration for production monitoring

ogpu.service.decorators.init()

Decorator to register a single initialization function that will be executed when the OpenGPU service starts up. Useful for downloading libraries, setting up resources, etc.

Only one init function can be registered per service. If multiple init functions are needed, they should be combined into a single function.

The decorated function should take no arguments and return None.

Source code in ogpu/service/decorators.py
def init():
    """
    Decorator to register a single initialization function that will be executed
    when the OpenGPU service starts up. Useful for downloading libraries,
    setting up resources, etc.

    Only one init function can be registered per service. If multiple init
    functions are needed, they should be combined into a single function.

    The decorated function should take no arguments and return None.
    """

    def decorator(func):
        function_name = func.__name__

        # Validate function signature
        sig = inspect.signature(func)
        parameters = list(sig.parameters.values())
        if len(parameters) != 0:
            raise TypeError(
                f"Init function `{function_name}` must take no arguments (got {len(parameters)})"
            )

        # Register the initialization function (will raise error if one already exists)
        add_init_handler(func)
        logger.info(f"Registered init function: `{function_name}`")

        return func

    return decorator

Purpose: Initialize AI models and expensive resources when your service starts, before handling any tasks.

import ogpu.service
from transformers import AutoTokenizer, AutoModel

# Global variables for loaded models
tokenizer = None
model = None

@ogpu.service.init()
def setup_ai_models():
    """Load AI models during service startup."""
    global tokenizer, model

    try:
        ogpu.service.logger.info("🔄 Loading sentiment analysis model...")

        tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
        model = AutoModel.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")

        ogpu.service.logger.info("✅ Models loaded successfully!")

    except Exception as e:
        ogpu.service.logger.error(f"❌ Model loading failed: {e}")
        raise  # Stop service startup if models fail to load

Key Points:

  • Only one @init() function per service
  • Runs once when the service starts, before handling any tasks
  • Use for loading AI models, connecting to databases, etc.
  • Always use ogpu.service.logger for startup monitoring

ogpu.service.decorators.expose(timeout: int | None = None)

Decorator to expose user functions as handlers for OpenGPU service. The function's input and output must be Pydantic BaseModel. An optional timeout can be set for background execution.

Parameters:

  • timeout (int | None, default None): Timeout duration in seconds. If set, the handler will return None if not completed within this time.
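Before registering a handler, expose validates its signature. The partial sketch below (validate_handler is an illustrative helper, not part of the ogpu API) reproduces the argument-count and return-annotation checks using only the standard library; the real decorator additionally requires both annotations to be Pydantic BaseModel subclasses.

```python
import inspect
from typing import get_type_hints

def validate_handler(func):
    """Mirror expose()'s signature checks: one typed argument and a return annotation."""
    sig = inspect.signature(func)
    params = list(sig.parameters.values())
    if len(params) != 1:
        raise TypeError(
            f"Function `{func.__name__}` must take exactly ONE input argument (got {len(params)})"
        )
    hints = get_type_hints(func)
    if "return" not in hints:
        raise TypeError(f"Function `{func.__name__}` must have a return type.")
    # Return the resolved input/output annotations, as expose() does internally
    return hints[params[0].name], hints["return"]

def echo(data: str) -> str:
    return data

print(validate_handler(echo))  # (<class 'str'>, <class 'str'>)
```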
Source code in ogpu/service/decorators.py
def expose(timeout: int | None = None):
    """
    Decorator to expose user functions as handlers for OpenGPU service.
    The function's input and output must be Pydantic BaseModel.
    An optional timeout can be set for background execution.

    Args:
        timeout (int, optional): Timeout duration in seconds. If set, the handler will return None if not completed within this time.
    """

    def decorator(func):
        function_name = func.__name__
        # Check for unique handler names
        existing_names = [f.__name__ for f, _, _ in get_handlers()]
        if function_name in existing_names:
            raise ValueError(
                f"A handler named `{function_name}` is already registered."
            )

        sig = inspect.signature(func)
        parameters = list(sig.parameters.values())
        if len(parameters) != 1:
            raise TypeError(
                f"Function `{function_name}` must take exactly ONE input argument (got {len(parameters)})"
            )

        hints = get_type_hints(func)
        if "return" not in hints:
            raise TypeError(f"Function `{function_name}` must have a return type.")

        input_model = hints[parameters[0].name]
        output_model = hints["return"]

        # Check if input and output types are subclasses of Pydantic BaseModel
        if not (inspect.isclass(input_model) and issubclass(input_model, BaseModel)):
            raise TypeError(
                f"Input to `{function_name}` must be a subclass of pydantic.BaseModel"
            )

        if not (inspect.isclass(output_model) and issubclass(output_model, BaseModel)):
            raise TypeError(
                f"Return type of `{function_name}` must be a subclass of pydantic.BaseModel"
            )

        if timeout:

            @wraps(func)
            def timed_handler(data):
                """
                Handler that runs with a timeout. Returns None if not completed in time.
                """
                result = [None]
                done = threading.Event()

                def run():
                    try:
                        result[0] = func(data)
                    except Exception as e:
                        logger.task_fail(f"Exception in `{function_name}`: {e}")  # type: ignore
                    finally:
                        done.set()

                thread = threading.Thread(target=run)
                thread.start()
                finished = done.wait(timeout)

                if not finished:
                    logger.task_timeout(  # type: ignore
                        f"`{function_name}` timed out after {timeout} seconds"
                    )
                    # Do not wait for the result, just return None immediately
                    return None

                return result[0]

            add_handler(timed_handler, input_model, output_model)
            return func

        # If no timeout, add handler directly
        add_handler(func, input_model, output_model)
        return func

    return decorator
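The timeout branch above runs the handler in a worker thread and gives up once done.wait(timeout) expires, returning None without waiting for the thread to finish. The same pattern can be exercised in isolation (run_with_timeout is an illustrative helper, not part of the ogpu API):

```python
import threading
import time

def run_with_timeout(func, data, timeout):
    """Run func(data) in a worker thread; return None if it misses the deadline."""
    result = [None]
    done = threading.Event()

    def run():
        try:
            result[0] = func(data)
        finally:
            done.set()

    threading.Thread(target=run, daemon=True).start()
    if not done.wait(timeout):
        # Deadline missed: return None immediately, as the timed handler does
        return None
    return result[0]

print(run_with_timeout(lambda x: x * 2, 21, 1.0))                 # 42
print(run_with_timeout(lambda x: time.sleep(0.5) or x, 1, 0.05))  # None
```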

Purpose: Transform your AI functions into network-accessible services that can receive tasks from the OpenGPU network.

from pydantic import BaseModel
import ogpu.service

class TextInput(BaseModel):
    text: str
    model_name: str = "default"

class SentimentOutput(BaseModel):
    sentiment: str
    confidence: float

@ogpu.service.expose()
def analyze_sentiment(data: TextInput) -> SentimentOutput:
    """Custom sentiment analysis service."""
    ogpu.service.logger.info(f"📝 Processing text: '{data.text[:30]}...'")

    try:
        # Your AI model logic here (model loaded from @init())
        result = custom_sentiment_model.predict(data.text, data.model_name)

        ogpu.service.logger.info("✅ Sentiment analysis completed")
        return SentimentOutput(
            sentiment=result.label,
            confidence=result.score
        )

    except Exception as e:
        ogpu.service.logger.error(f"❌ Inference failed: {e}", extra={
            "input_text": data.text[:100],
            "model_name": data.model_name
        })
        raise

class ImagePrompt(BaseModel):
    prompt: str

class ImageOutput(BaseModel):
    image_base64: str

@ogpu.service.expose(timeout=300)  # 5 minutes for heavy models
def generate_image(data: ImagePrompt) -> ImageOutput:
    """Custom image generation service."""
    # Heavy model inference (custom_diffusion_model loaded via @init())
    image_data = custom_diffusion_model.generate(data.prompt)
    return ImageOutput(image_base64=image_data)

Key Requirements:

  • Input and output types must be Pydantic BaseModel subclasses
  • Function names become API endpoints (e.g., analyze_sentiment → /run/analyze_sentiment/{task_address})
  • Always use ogpu.service.logger for task monitoring
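Because handlers speak Pydantic models, the JSON payload a task delivers is validated before your function runs, and the return value is serialized for the network callback. A minimal round trip with the models from the example above (assuming Pydantic v2's model_validate/model_dump API):

```python
from pydantic import BaseModel

class TextInput(BaseModel):
    text: str
    model_name: str = "default"

class SentimentOutput(BaseModel):
    sentiment: str
    confidence: float

# Incoming task payload → validated model (missing/invalid fields raise ValidationError)
data = TextInput.model_validate({"text": "great product"})
print(data.model_name)   # "default"

# Outgoing result → plain dict, as sent to the network callback
out = SentimentOutput(sentiment="positive", confidence=0.98)
print(out.model_dump())  # {'sentiment': 'positive', 'confidence': 0.98}
```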

ogpu.service.server.start()

Serves registered handler functions as HTTP endpoints using FastAPI. Creates a /run/{function}/{task_address} endpoint for each handler.

Source code in ogpu/service/server.py
def start():
    """
    Serves registered handler functions as HTTP endpoints using FastAPI.
    Creates a /run/{function}/{task_address} endpoint for each handler.
    """
    logger.info("Starting OpenGPU Service server...")

    app = FastAPI(title="OpenGPU Service", version="0.1.0", lifespan=lifespan)

    def create_endpoint(handler, input_model, function_name):
        """
        Dynamically generates an endpoint function for each handler.
        """

        async def endpoint(
            task_address: str, data: input_model, background_tasks: BackgroundTasks  # type: ignore
        ):
            """
            Runs the handler in the background when an HTTP request is received.
            """

            def runner():
                try:
                    result = handler(data)
                    if result:

                        logger.task_success(  # type: ignore
                            f"[{task_address}] Function: `{function_name}`, Result → "
                            + ", ".join(
                                [f"{k}={v}" for k, v in result.model_dump().items()]
                            )
                        )

                        send_callback(task_address, result.model_dump())

                except Exception as e:
                    logger.task_fail(  # type: ignore
                        f"[{task_address}] Error in `{function_name}`: {e}"
                    )

            background_tasks.add_task(runner)
            return {"task_address": task_address, "status": "accepted"}

        return endpoint

    # Create endpoints for all registered handlers
    for handler, input_model, _output_model in get_handlers():
        function_name = handler.__name__
        path = f"/run/{function_name}/{{task_address}}"

        endpoint = create_endpoint(handler, input_model, function_name)
        app.post(path, status_code=202)(endpoint)
        logger.info(f"Registered endpoint → /run/{function_name}/{{task_address}}")

    # Start FastAPI server
    uvicorn.run(app, host=SERVICE_HOST, port=SERVICE_PORT, log_level="warning")

Purpose: Start your custom AI service server to handle OpenGPU network tasks.

#!/usr/bin/env python3
"""Complete custom service example."""

import ogpu.service
from pydantic import BaseModel
from transformers import pipeline

class SentimentRequest(BaseModel):
    text: str

class SentimentResponse(BaseModel):
    label: str
    score: float

classifier = None

@ogpu.service.init()
def load_model():
    """Load sentiment analysis model on startup."""
    global classifier
    ogpu.service.logger.info("🔄 Starting model initialization...")

    try:
        classifier = pipeline("sentiment-analysis", 
                             model="cardiffnlp/twitter-roberta-base-sentiment-latest")
        ogpu.service.logger.info("✅ Sentiment model loaded successfully!")

    except Exception as e:
        ogpu.service.logger.error(f"❌ Failed to load model: {e}")
        raise  # Service won't start if model loading fails

@ogpu.service.expose(timeout=60)
def analyze_sentiment(data: SentimentRequest) -> SentimentResponse:
    """Analyze sentiment of input text."""
    ogpu.service.logger.info(f"📝 Analyzing text: '{data.text[:50]}{'...' if len(data.text) > 50 else ''}'")

    try:
        result = classifier(data.text)[0]

        ogpu.service.logger.info(f"✅ Analysis complete: {result['label']} (confidence: {result['score']:.3f})")

        return SentimentResponse(
            label=result['label'],
            score=result['score']
        )

    except Exception as e:
        # This error will appear in your Sentry dashboard!
        ogpu.service.logger.error(f"❌ Sentiment analysis failed: {e}", extra={
            "input_text": data.text[:100],
            "input_length": len(data.text),
            "model": "twitter-roberta-base-sentiment-latest"
        })
        raise  # Re-raise to return error to client

if __name__ == "__main__":
    ogpu.service.logger.info("🚀 Starting sentiment analysis service...")
    ogpu.service.start()

Generated Endpoints:

  • POST /run/{function_name}/{task_address} - Your exposed functions
  • GET /docs - Interactive API documentation (Swagger UI)
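Once the service is running, each handler is reachable at its generated path. A sketch of building such a request with the standard library (the host and port here are assumptions for illustration; the real values come from SERVICE_HOST and SERVICE_PORT, and the task address is hypothetical):

```python
import json

function_name = "analyze_sentiment"
task_address = "0xABCDEF"  # hypothetical task address supplied by the network
url = f"http://localhost:8000/run/{function_name}/{task_address}"
payload = json.dumps({"text": "I love this product!"})

print(url)  # http://localhost:8000/run/analyze_sentiment/0xABCDEF

# To actually send it (requires the `requests` package and a running service):
# requests.post(url, data=payload, headers={"Content-Type": "application/json"})
# The server replies immediately with {"task_address": ..., "status": "accepted"}
# and runs the handler in the background.
```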

logger - Sentry Integration

Purpose: Built-in logging with automatic Sentry integration for production monitoring and error tracking.

import ogpu.service

# Basic logging
ogpu.service.logger.info("Model loaded successfully")
ogpu.service.logger.warning("Low confidence prediction")
ogpu.service.logger.error("Model inference failed")

# Detailed logging with context
ogpu.service.logger.error(f"Failed to process input: {data}", extra={"task_id": task_id})

Sentry Integration Benefits

Why use ogpu.service.logger?

  • Error Tracking: Failed tasks automatically appear in your Sentry dashboard
  • Performance Monitoring: Track inference times and bottlenecks
  • Real-time Alerts: Get notified when your service has issues
  • Context: See exactly what input caused failures
  • Production Ready: Built-in integration with OpenGPU monitoring

Best Practice Example:

@ogpu.service.expose(timeout=60)
def analyze_sentiment(data: SentimentRequest) -> SentimentResponse:
    """Analyze sentiment with proper logging."""
    try:
        ogpu.service.logger.info(f"Analyzing text: {data.text[:50]}...")

        # Your model inference
        result = classifier(data.text)[0]

        ogpu.service.logger.info(f"Analysis complete: {result['label']} ({result['score']:.3f})")

        return SentimentResponse(label=result['label'], score=result['score'])

    except Exception as e:
        # This error will appear in Sentry dashboard!
        ogpu.service.logger.error(f"Sentiment analysis failed: {e}", extra={
            "input_text": data.text[:100],
            "model_name": "sentiment-classifier"
        })
        raise