Tuesday, July 08, 2025

Integrating Large Language Models in Applications


Introduction


Large Language Model integration has become a fundamental requirement for modern applications seeking to provide intelligent, conversational, and content-generation capabilities. When we talk about LLM integration, we are referring to the process of connecting your application to powerful language models through APIs, enabling your software to leverage natural language processing capabilities without hosting the models locally. This integration allows developers to build applications that can understand, generate, and manipulate human-like text with remarkable sophistication.


The importance of proper LLM integration cannot be overstated in today’s software landscape. Applications across industries are incorporating these capabilities to provide better user experiences, automate content creation, and enable natural language interfaces. However, integrating an LLM is not simply about making API calls. It involves understanding the nuances of prompt engineering, managing conversational context, handling asynchronous responses, implementing robust error handling, and optimizing for both performance and cost.


Understanding LLM APIs and Service Providers


Before diving into implementation details, it is crucial to understand the landscape of LLM service providers and their API architectures. The major providers include OpenAI with their GPT models, Anthropic with Claude, Google with their PaLM and Gemini models, and various other providers offering both commercial and open-source alternatives. Each provider has its own API design, authentication methods, rate limits, and pricing structures.


Most LLM APIs follow a RESTful architecture where you send HTTP POST requests containing your prompt and configuration parameters, and receive responses containing the generated text. However, the specific request and response formats can vary significantly between providers. Understanding these differences is essential for building flexible integrations that can potentially work with multiple providers or switch between them based on requirements.


The typical LLM API request contains several key components: the actual prompt or message content, model selection parameters, generation configuration such as temperature and maximum tokens, and often system messages that provide context or instructions to the model. The response usually includes the generated text, metadata about the generation process, and usage statistics for billing purposes.
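
To make this concrete, the following sketch illustrates the rough shape of a chat-style request payload and response body. The field names loosely follow the OpenAI chat completions format and are illustrative only; other providers expose different but structurally similar schemas.


# Illustrative request/response shapes for a chat-style LLM API.
# Field names loosely follow the OpenAI chat completions format.

example_request = {
    "model": "gpt-3.5-turbo",                      # model selection
    "messages": [                                   # prompt / conversation content
        {"role": "system", "content": "You are a concise assistant."},
        {"role": "user", "content": "Summarize HTTP in one sentence."},
    ],
    "temperature": 0.7,                             # generation configuration
    "max_tokens": 256,
}

example_response = {
    "model": "gpt-3.5-turbo",
    "choices": [                                    # generated text
        {
            "message": {"role": "assistant", "content": "HTTP is ..."},
            "finish_reason": "stop",
        }
    ],
    "usage": {                                      # usage statistics for billing
        "prompt_tokens": 28,
        "completion_tokens": 12,
        "total_tokens": 40,
    },
}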


Authentication and API Key Management


Proper authentication is the foundation of any LLM integration. Most providers use API key-based authentication, where you include your API key in the request headers. The management of these API keys requires careful consideration of security practices, especially in production environments.


Here is a Python example demonstrating secure API key management using environment variables:


This code example shows how to properly handle API key authentication in a production environment. The get_api_key function first attempts to retrieve the API key from environment variables, which is the recommended approach for production deployments. If no environment variable is found, it falls back to reading from a local configuration file, which might be useful during development. The function includes proper error handling to ensure that missing API keys are caught early in the application lifecycle rather than causing runtime failures when making API calls.



import os

import json

from typing import Optional


def get_api_key(provider: str) -> str:

    """

    Retrieve API key for the specified LLM provider from environment variables

    or configuration files with proper error handling.

    """

    env_var_name = f"{provider.upper()}_API_KEY"

    api_key = os.getenv(env_var_name)

    

    if not api_key:

        # Fallback to config file for development environments

        try:

            with open('config.json', 'r') as f:

                config = json.load(f)

                api_key = config.get('api_keys', {}).get(provider)

        except FileNotFoundError:

            pass

    

    if not api_key:

        raise ValueError(f"API key for {provider} not found in environment or config")

    

    return api_key


class APIKeyManager:

    """

    Centralized API key management with rotation support

    """

    def __init__(self):

        self._keys = {}

        self._load_keys()

    

    def _load_keys(self):

        """Load API keys from secure storage"""

        providers = ['openai', 'anthropic', 'google']

        for provider in providers:

            try:

                self._keys[provider] = get_api_key(provider)

            except ValueError:

                # Log warning but continue - not all providers may be configured

                print(f"Warning: No API key configured for {provider}")

    

    def get_key(self, provider: str) -> Optional[str]:

        """Get API key for specified provider"""

        return self._keys.get(provider)

    

    def rotate_key(self, provider: str, new_key: str):

        """Rotate API key for specified provider"""

        self._keys[provider] = new_key



The APIKeyManager class provides a centralized approach to managing multiple API keys across different LLM providers. This design pattern becomes especially important in applications that need to work with multiple LLM services or implement fallback mechanisms. The class includes a key rotation method, which is essential for production environments where API keys need to be rotated periodically for security purposes.
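
As a brief illustration, the manager defined above might be used as follows. This is a sketch only; the provider name and key value are placeholders.


# Hypothetical usage of the APIKeyManager defined above.
key_manager = APIKeyManager()

openai_key = key_manager.get_key("openai")
if openai_key is None:
    raise RuntimeError("OpenAI is not configured for this deployment")

# Later, for example after an operator issues a replacement key:
key_manager.rotate_key("openai", "sk-replacement-key-value")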


Basic Integration Patterns and Architecture


When designing LLM integrations, there are several architectural patterns to consider. The most straightforward approach is direct API calls from your application code, but more sophisticated applications often benefit from implementing an abstraction layer that can work with multiple LLM providers.


The following code example demonstrates a comprehensive LLM client architecture that abstracts away provider-specific details:


This implementation showcases a provider-agnostic architecture that allows your application to work with different LLM services through a consistent interface. The LLMClient base class defines the contract that all provider implementations must follow, ensuring consistency across different services. The OpenAIClient implementation shows how to handle provider-specific request formatting, authentication, and response parsing while maintaining the common interface.



import httpx

import asyncio

import json

from abc import ABC, abstractmethod

from typing import Dict, Any, Optional, AsyncGenerator

from dataclasses import dataclass


@dataclass

class LLMRequest:

    """Standardized request format for LLM interactions"""

    prompt: str

    model: str

    max_tokens: Optional[int] = 1000

    temperature: float = 0.7

    system_message: Optional[str] = None

    conversation_id: Optional[str] = None


@dataclass

class LLMResponse:

    """Standardized response format from LLM providers"""

    content: str

    model: str

    usage: Dict[str, int]

    finish_reason: str

    conversation_id: Optional[str] = None


class LLMClient(ABC):

    """Abstract base class for LLM provider clients"""

    

    def __init__(self, api_key: str, base_url: str):

        self.api_key = api_key

        self.base_url = base_url

        self.client = httpx.AsyncClient(timeout=30.0)

    

    @abstractmethod

    async def generate(self, request: LLMRequest) -> LLMResponse:

        """Generate response from LLM"""

        pass

    

    @abstractmethod

    async def stream_generate(self, request: LLMRequest) -> AsyncGenerator[str, None]:

        """Generate streaming response from LLM"""

        pass

    

    async def close(self):

        """Clean up HTTP client resources"""

        await self.client.aclose()


class OpenAIClient(LLMClient):

    """OpenAI-specific implementation of LLM client"""

    

    def __init__(self, api_key: str):

        super().__init__(api_key, "https://api.openai.com/v1")

    

    def _build_headers(self) -> Dict[str, str]:

        """Build authentication headers for OpenAI API"""

        return {

            "Authorization": f"Bearer {self.api_key}",

            "Content-Type": "application/json"

        }

    

    def _build_payload(self, request: LLMRequest) -> Dict[str, Any]:

        """Convert standardized request to OpenAI API format"""

        messages = []

        

        if request.system_message:

            messages.append({"role": "system", "content": request.system_message})

        

        messages.append({"role": "user", "content": request.prompt})

        

        payload = {

            "model": request.model,

            "messages": messages,

            "max_tokens": request.max_tokens,

            "temperature": request.temperature

        }

        

        return payload

    

    async def generate(self, request: LLMRequest) -> LLMResponse:

        """Generate non-streaming response from OpenAI"""

        headers = self._build_headers()

        payload = self._build_payload(request)

        

        response = await self.client.post(

            f"{self.base_url}/chat/completions",

            headers=headers,

            json=payload

        )

        

        if response.status_code != 200:

            raise Exception(f"API request failed: {response.status_code} - {response.text}")

        

        data = response.json()

        choice = data["choices"][0]

        

        return LLMResponse(

            content=choice["message"]["content"],

            model=data["model"],

            usage=data["usage"],

            finish_reason=choice["finish_reason"],

            conversation_id=request.conversation_id

        )

    

    async def stream_generate(self, request: LLMRequest) -> AsyncGenerator[str, None]:

        """Generate streaming response from OpenAI"""

        headers = self._build_headers()

        payload = self._build_payload(request)

        payload["stream"] = True

        

        async with self.client.stream(

            "POST",

            f"{self.base_url}/chat/completions",

            headers=headers,

            json=payload

        ) as response:

            async for line in response.aiter_lines():

                if line.startswith("data: "):

                    data_str = line[6:]

                    if data_str == "[DONE]":

                        break

                    try:

                        data = json.loads(data_str)

                        if "choices" in data and data["choices"]:

                            delta = data["choices"][0].get("delta", {})

                            if "content" in delta:

                                yield delta["content"]

                    except json.JSONDecodeError:

                        continue
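
To tie the pieces together, here is a minimal usage sketch. It assumes a valid key is available through the get_api_key helper shown earlier and that the model name is appropriate for your account; both are illustrative.


async def example_generate():
    """Sketch: issue a single completion through the abstraction layer."""
    client = OpenAIClient(api_key=get_api_key("openai"))
    try:
        request = LLMRequest(
            prompt="Give me one sentence about HTTP/2.",
            model="gpt-3.5-turbo",
            system_message="You are a concise technical assistant.",
        )
        response = await client.generate(request)
        print(response.content)
        print(response.usage)
    finally:
        await client.close()

# asyncio.run(example_generate())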



HTTP Client Configuration and Request Handling


Proper HTTP client configuration is crucial for reliable LLM integrations. LLM APIs often have specific requirements regarding timeouts, retry behavior, and connection pooling. The requests to LLM APIs can take several seconds or even minutes for complex generations, so your HTTP client must be configured to handle these longer response times appropriately.


Connection pooling becomes particularly important when your application makes frequent LLM API calls. Reusing HTTP connections reduces the overhead of establishing new connections for each request, which can significantly improve performance. Additionally, implementing proper timeout handling ensures that your application does not hang indefinitely waiting for responses from LLM services.


Here is an example of a robust HTTP client configuration specifically designed for LLM API interactions:


This configuration example demonstrates several important considerations for LLM API clients. The timeout configuration uses different values for connection and read timeouts, acknowledging that LLM responses can take significantly longer than typical API responses. The retry configuration implements exponential backoff to handle temporary service issues gracefully. The connection limits ensure efficient resource usage while preventing overwhelming the LLM service with too many concurrent requests.



import httpx

import asyncio

from typing import Optional

import time

import random


class LLMHTTPClient:

    """

    Specialized HTTP client for LLM API interactions with proper

    timeout, retry, and connection management

    """

    

    def __init__(self, max_connections: int = 10, max_keepalive: int = 5):

        # Configure timeouts specifically for LLM APIs

        # Connection timeout: time to establish connection

        # Read timeout: time to receive response (LLMs can be slow)

        timeout_config = httpx.Timeout(

            connect=10.0,  # Connection establishment

            read=120.0,    # Reading response (LLMs need time)

            write=10.0,    # Writing request

            pool=120.0     # Pool acquisition

        )

        

        # Configure connection limits

        limits = httpx.Limits(

            max_connections=max_connections,

            max_keepalive_connections=max_keepalive

        )

        

        self.client = httpx.AsyncClient(

            timeout=timeout_config,

            limits=limits,

            http2=True  # Enable HTTP/2 (requires the optional httpx[http2] extra)

        )

        

        # Retry configuration

        self.max_retries = 3

        self.retry_delay_base = 1.0

        self.retry_status_codes = {429, 502, 503, 504}

    

    async def request_with_retry(

        self,

        method: str,

        url: str,

        headers: dict,

        json_data: dict,

        retry_count: int = 0

    ) -> httpx.Response:

        """

        Make HTTP request with exponential backoff retry logic

        """

        try:

            response = await self.client.request(

                method=method,

                url=url,

                headers=headers,

                json=json_data

            )

            

            # Check if we should retry based on status code

            if (response.status_code in self.retry_status_codes and 

                retry_count < self.max_retries):

                

                # Calculate delay with exponential backoff and jitter

                delay = self.retry_delay_base * (2 ** retry_count)

                jitter = random.uniform(0, 0.1 * delay)

                total_delay = delay + jitter

                

                print(f"Retrying request after {total_delay:.2f}s (attempt {retry_count + 1})")

                await asyncio.sleep(total_delay)

                

                return await self.request_with_retry(

                    method, url, headers, json_data, retry_count + 1

                )

            

            return response

            

        except httpx.TimeoutException as e:

            if retry_count < self.max_retries:

                delay = self.retry_delay_base * (2 ** retry_count)

                await asyncio.sleep(delay)

                return await self.request_with_retry(

                    method, url, headers, json_data, retry_count + 1

                )

            raise e

        

        except httpx.ConnectError as e:

            if retry_count < self.max_retries:

                delay = self.retry_delay_base * (2 ** retry_count)

                await asyncio.sleep(delay)

                return await self.request_with_retry(

                    method, url, headers, json_data, retry_count + 1

                )

            raise e

    

    async def close(self):

        """Clean up client resources"""

        await self.client.aclose()



Response Processing and Data Parsing


Processing responses from LLM APIs requires careful attention to the various data formats and potential edge cases that can occur. Different providers return data in different structures, and even within a single provider, the response format can vary based on the type of request made. Proper response processing involves not only parsing the JSON data but also handling cases where the response might be incomplete, contain errors, or include unexpected fields.


The following example demonstrates comprehensive response processing that handles various scenarios:


This response processing implementation shows how to handle the complexity of LLM API responses robustly. The ResponseProcessor class includes validation methods that check for common issues like missing required fields, unexpected data types, and incomplete responses. The error detection logic identifies various types of API errors and content filtering issues that can occur with LLM services. The content extraction method demonstrates how to safely access nested data structures while providing meaningful error messages when problems occur.



import json

import httpx

from typing import Dict, Any, Optional, Union, List

from dataclasses import dataclass

from enum import Enum


class ResponseStatus(Enum):

    SUCCESS = "success"

    PARTIAL = "partial"

    ERROR = "error"

    RATE_LIMITED = "rate_limited"

    CONTENT_FILTERED = "content_filtered"


@dataclass

class ProcessedResponse:

    """Processed and validated LLM response"""

    status: ResponseStatus

    content: Optional[str]

    usage: Optional[Dict[str, int]]

    model: Optional[str]

    finish_reason: Optional[str]

    error_message: Optional[str] = None

    raw_response: Optional[Dict[str, Any]] = None


class ResponseProcessor:

    """

    Handles parsing and validation of LLM API responses

    across different providers with comprehensive error handling

    """

    

    def __init__(self, provider: str):

        self.provider = provider

        self.required_fields = self._get_required_fields(provider)

    

    def _get_required_fields(self, provider: str) -> List[str]:

        """Define required fields for each provider"""

        field_mapping = {

            "openai": ["choices", "usage", "model"],

            "anthropic": ["content", "usage", "model"],

            "google": ["candidates", "usageMetadata"]

        }

        return field_mapping.get(provider, [])

    

    def process_response(self, response: httpx.Response) -> ProcessedResponse:

        """

        Process raw HTTP response into standardized format

        """

        # First, handle HTTP-level errors

        if response.status_code == 429:

            return ProcessedResponse(

                status=ResponseStatus.RATE_LIMITED,

                content=None,

                usage=None,

                model=None,

                finish_reason=None,

                error_message="Rate limit exceeded"

            )

        

        if response.status_code >= 400:

            error_msg = f"HTTP {response.status_code}: {response.text}"

            return ProcessedResponse(

                status=ResponseStatus.ERROR,

                content=None,

                usage=None,

                model=None,

                finish_reason=None,

                error_message=error_msg

            )

        

        # Parse JSON response

        try:

            data = response.json()

        except json.JSONDecodeError as e:

            return ProcessedResponse(

                status=ResponseStatus.ERROR,

                content=None,

                usage=None,

                model=None,

                finish_reason=None,

                error_message=f"Invalid JSON response: {str(e)}"

            )

        

        # Validate response structure

        validation_result = self._validate_response_structure(data)

        if not validation_result.is_valid:

            return ProcessedResponse(

                status=ResponseStatus.ERROR,

                content=None,

                usage=None,

                model=None,

                finish_reason=None,

                error_message=validation_result.error_message,

                raw_response=data

            )

        

        # Check for API-level errors

        error_check = self._check_for_errors(data)

        if error_check.has_error:

            return ProcessedResponse(

                status=ResponseStatus.ERROR,

                content=None,

                usage=None,

                model=None,

                finish_reason=None,

                error_message=error_check.error_message,

                raw_response=data

            )

        

        # Extract content based on provider format

        content_result = self._extract_content(data)

        

        # Determine final status

        status = self._determine_status(data, content_result)

        

        return ProcessedResponse(

            status=status,

            content=content_result.content,

            usage=self._extract_usage(data),

            model=data.get("model"),

            finish_reason=content_result.finish_reason,

            raw_response=data

        )

    

    def _validate_response_structure(self, data: Dict[str, Any]):

        """Validate that response contains required fields"""

        class ValidationResult:

            def __init__(self, is_valid: bool, error_message: str = None):

                self.is_valid = is_valid

                self.error_message = error_message

        

        if not isinstance(data, dict):

            return ValidationResult(False, "Response is not a valid JSON object")

        

        missing_fields = []

        for field in self.required_fields:

            if field not in data:

                missing_fields.append(field)

        

        if missing_fields:

            error_msg = f"Missing required fields: {', '.join(missing_fields)}"

            return ValidationResult(False, error_msg)

        

        return ValidationResult(True)

    

    def _check_for_errors(self, data: Dict[str, Any]):

        """Check for API-level errors in response"""

        class ErrorCheck:

            def __init__(self, has_error: bool, error_message: str = None):

                self.has_error = has_error

                self.error_message = error_message

        

        # Check for explicit error field

        if "error" in data:

            error_info = data["error"]

            if isinstance(error_info, dict):

                message = error_info.get("message", "Unknown API error")

                error_type = error_info.get("type", "unknown")

                return ErrorCheck(True, f"{error_type}: {message}")

            else:

                return ErrorCheck(True, str(error_info))

        

        # Check for content filtering (provider-specific)

        if self.provider == "openai":

            choices = data.get("choices", [])

            if choices and choices[0].get("finish_reason") == "content_filter":

                return ErrorCheck(True, "Content was filtered by safety systems")

        

        return ErrorCheck(False)

    

    def _extract_content(self, data: Dict[str, Any]):

        """Extract generated content based on provider format"""

        class ContentResult:

            def __init__(self, content: str = None, finish_reason: str = None):

                self.content = content

                self.finish_reason = finish_reason

        

        if self.provider == "openai":

            choices = data.get("choices", [])

            if not choices:

                return ContentResult()

            

            choice = choices[0]

            message = choice.get("message", {})

            content = message.get("content", "")

            finish_reason = choice.get("finish_reason")

            

            return ContentResult(content, finish_reason)

        

        elif self.provider == "anthropic":

            content = data.get("content", [])

            if not content:

                return ContentResult()

            

            # Anthropic returns content as array of objects

            text_content = ""

            for item in content:

                if item.get("type") == "text":

                    text_content += item.get("text", "")

            

            return ContentResult(text_content, data.get("stop_reason"))

        

        # Add other provider implementations as needed

        return ContentResult()

    

    def _extract_usage(self, data: Dict[str, Any]) -> Optional[Dict[str, int]]:

        """Extract token usage information"""

        if self.provider == "openai":

            return data.get("usage", {})

        elif self.provider == "anthropic":

            return data.get("usage", {})

        return None

    

    def _determine_status(self, data: Dict[str, Any], content_result) -> ResponseStatus:

        """Determine the overall status of the response"""

        if not content_result.content:

            return ResponseStatus.ERROR

        

        # Check for partial/incomplete responses

        if content_result.finish_reason in ["length", "max_tokens"]:

            return ResponseStatus.PARTIAL

        

        if content_result.finish_reason == "content_filter":

            return ResponseStatus.CONTENT_FILTERED

        

        return ResponseStatus.SUCCESS
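
In practice the processor sits directly behind the HTTP layer. The following sketch wires it to the retry-aware LLMHTTPClient from the previous section; the URL, headers, and payload are assumed to be prepared by the caller.


async def example_processed_call(
    client: LLMHTTPClient,
    headers: Dict[str, str],
    payload: Dict[str, Any],
) -> ProcessedResponse:
    """Sketch: combine the retry-aware HTTP client with response processing."""
    processor = ResponseProcessor(provider="openai")
    raw = await client.request_with_retry(
        method="POST",
        url="https://api.openai.com/v1/chat/completions",
        headers=headers,
        json_data=payload,
    )
    processed = processor.process_response(raw)
    if processed.status is not ResponseStatus.SUCCESS:
        raise RuntimeError(processed.error_message or "LLM call failed")
    return processed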



Error Handling and Retry Mechanisms


Robust error handling is essential for production LLM integrations because these services can experience various types of failures. Network issues, rate limiting, temporary service outages, and model overload can all cause requests to fail. A well-designed error handling system should distinguish between different types of errors and respond appropriately to each type.


Transient errors such as network timeouts or temporary service unavailability should trigger automatic retries with exponential backoff. Rate limiting errors require a different approach, often involving longer delays or queue-based request management. Permanent errors like authentication failures or invalid requests should not be retried but should be logged and reported immediately.


The following implementation demonstrates a comprehensive error handling and retry system:


This error handling implementation provides a robust foundation for dealing with the various failure modes of LLM APIs. The ErrorHandler class categorizes different types of errors and applies appropriate retry strategies for each category. The exponential backoff algorithm includes jitter to prevent thundering herd problems when multiple clients retry simultaneously. The circuit breaker pattern helps protect your application from cascading failures when an LLM service is experiencing extended outages.



import asyncio

import httpx

import time

import logging

from typing import Optional, Callable, Any

from enum import Enum

from dataclasses import dataclass

import random


class ErrorType(Enum):

    TRANSIENT = "transient"

    RATE_LIMIT = "rate_limit"

    AUTHENTICATION = "authentication"

    VALIDATION = "validation"

    SERVICE_UNAVAILABLE = "service_unavailable"

    PERMANENT = "permanent"


@dataclass

class ErrorContext:

    """Context information for error handling decisions"""

    error_type: ErrorType

    status_code: Optional[int]

    error_message: str

    retry_after: Optional[int] = None

    attempt_count: int = 0


class CircuitBreaker:

    """

    Circuit breaker pattern implementation for LLM API calls

    to prevent cascading failures during service outages

    """

    

    def __init__(self, failure_threshold: int = 5, timeout: int = 60):

        self.failure_threshold = failure_threshold

        self.timeout = timeout

        self.failure_count = 0

        self.last_failure_time = None

        self.state = "closed"  # closed, open, half-open

    

    def can_execute(self) -> bool:

        """Check if requests can be executed based on circuit state"""

        if self.state == "closed":

            return True

        

        if self.state == "open":

            if time.time() - self.last_failure_time > self.timeout:

                self.state = "half-open"

                return True

            return False

        

        # half-open state - allow single request to test service

        return True

    

    def record_success(self):

        """Record successful request"""

        self.failure_count = 0

        self.state = "closed"

    

    def record_failure(self):

        """Record failed request"""

        self.failure_count += 1

        self.last_failure_time = time.time()

        

        if self.failure_count >= self.failure_threshold:

            self.state = "open"


class ErrorHandler:

    """

    Comprehensive error handling for LLM API interactions

    with retry logic and circuit breaker protection

    """

    

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):

        self.max_retries = max_retries

        self.base_delay = base_delay

        self.circuit_breaker = CircuitBreaker()

        self.logger = logging.getLogger(__name__)

    

    def classify_error(self, response: httpx.Response, exception: Exception = None) -> ErrorContext:

        """

        Classify error type based on response or exception

        to determine appropriate handling strategy

        """

        if exception:

            if isinstance(exception, httpx.TimeoutException):

                return ErrorContext(

                    error_type=ErrorType.TRANSIENT,

                    status_code=None,

                    error_message=f"Request timeout: {str(exception)}"

                )

            elif isinstance(exception, httpx.ConnectError):

                return ErrorContext(

                    error_type=ErrorType.SERVICE_UNAVAILABLE,

                    status_code=None,

                    error_message=f"Connection error: {str(exception)}"

                )

            else:

                return ErrorContext(

                    error_type=ErrorType.PERMANENT,

                    status_code=None,

                    error_message=f"Unexpected error: {str(exception)}"

                )

        

        # Classify based on HTTP status code

        status_code = response.status_code

        

        if status_code == 401:

            return ErrorContext(

                error_type=ErrorType.AUTHENTICATION,

                status_code=status_code,

                error_message="Authentication failed - check API key"

            )

        

        elif status_code == 429:

            retry_after = None

            retry_header = response.headers.get("retry-after")

            if retry_header:

                try:

                    retry_after = int(retry_header)

                except ValueError:

                    pass

            

            return ErrorContext(

                error_type=ErrorType.RATE_LIMIT,

                status_code=status_code,

                error_message="Rate limit exceeded",

                retry_after=retry_after

            )

        

        elif status_code in [400, 422]:

            return ErrorContext(

                error_type=ErrorType.VALIDATION,

                status_code=status_code,

                error_message="Invalid request parameters"

            )

        

        elif status_code in [502, 503, 504]:

            return ErrorContext(

                error_type=ErrorType.SERVICE_UNAVAILABLE,

                status_code=status_code,

                error_message="Service temporarily unavailable"

            )

        

        elif status_code >= 500:

            return ErrorContext(

                error_type=ErrorType.TRANSIENT,

                status_code=status_code,

                error_message="Server error - may be transient"

            )

        

        else:

            return ErrorContext(

                error_type=ErrorType.PERMANENT,

                status_code=status_code,

                error_message=f"Unexpected status code: {status_code}"

            )

    

    def should_retry(self, error_context: ErrorContext) -> bool:

        """

        Determine if request should be retried based on error type

        and current attempt count

        """

        if error_context.attempt_count >= self.max_retries:

            return False

        

        # Never retry certain error types

        if error_context.error_type in [ErrorType.AUTHENTICATION, ErrorType.VALIDATION]:

            return False

        

        # Always retry transient errors and service unavailability

        if error_context.error_type in [ErrorType.TRANSIENT, ErrorType.SERVICE_UNAVAILABLE]:

            return True

        

        # Retry rate limits with caution

        if error_context.error_type == ErrorType.RATE_LIMIT:

            return True

        

        return False

    

    def calculate_retry_delay(self, error_context: ErrorContext) -> float:

        """

        Calculate delay before retry using exponential backoff

        with jitter to prevent thundering herd

        """

        if error_context.error_type == ErrorType.RATE_LIMIT and error_context.retry_after:

            # Respect rate limit headers when provided

            return float(error_context.retry_after)

        

        # Exponential backoff with jitter

        base_delay = self.base_delay * (2 ** error_context.attempt_count)

        jitter = random.uniform(0, 0.1 * base_delay)

        

        # Cap maximum delay at 60 seconds

        return min(base_delay + jitter, 60.0)

    

    async def execute_with_retry(self, request_func: Callable, *args, **kwargs) -> Any:

        """

        Execute request function with comprehensive error handling and retry logic

        """

        if not self.circuit_breaker.can_execute():

            raise Exception("Circuit breaker is open - service unavailable")

        

        last_error_context = None

        

        for attempt in range(self.max_retries + 1):

            try:

                result = await request_func(*args, **kwargs)

                self.circuit_breaker.record_success()

                return result

                

            except Exception as e:

                # Handle httpx.Response if available in exception

                response = getattr(e, 'response', None)

                error_context = self.classify_error(response, e)

                error_context.attempt_count = attempt

                last_error_context = error_context

                

                self.logger.warning(

                    f"Request failed (attempt {attempt + 1}): {error_context.error_message}"

                )

                

                # Record failure for circuit breaker

                self.circuit_breaker.record_failure()

                

                # Check if we should retry

                if not self.should_retry(error_context):

                    self.logger.error(f"Not retrying due to error type: {error_context.error_type}")

                    raise e

                

                if attempt < self.max_retries:

                    delay = self.calculate_retry_delay(error_context)

                    self.logger.info(f"Retrying in {delay:.2f} seconds...")

                    await asyncio.sleep(delay)

                else:

                    self.logger.error("Max retries exceeded")

                    raise e

        

        # This should never be reached, but just in case

        if last_error_context:

            raise Exception(f"Request failed after {self.max_retries} retries: {last_error_context.error_message}")



Implementing Streaming Responses


Streaming responses are crucial for creating responsive user experiences when working with LLMs. Instead of waiting for the entire response to be generated before displaying anything to the user, streaming allows you to show partial results as they become available. This approach significantly improves perceived performance and enables real-time interaction with the LLM.


Implementing streaming requires handling Server-Sent Events (SSE) or similar streaming protocols. Each LLM provider has its own streaming format, but they generally send partial responses as separate events that need to be parsed and accumulated. Proper streaming implementation must handle connection interruptions, partial messages, and graceful cleanup of resources.


Here is a comprehensive streaming implementation that handles the complexities of real-time LLM responses:


This streaming implementation demonstrates the intricacies of handling real-time LLM responses. The StreamingClient class manages the SSE connection and provides a clean async generator interface for consuming streaming data. The implementation includes proper connection management, event parsing, error handling during streaming, and resource cleanup. The example usage shows how applications can easily integrate streaming responses while maintaining proper error handling and user experience considerations.



import asyncio

import json

import httpx

from typing import AsyncGenerator, Optional, Dict, Any

from dataclasses import dataclass


@dataclass

class StreamingChunk:

    """Represents a chunk of streaming response"""

    content: str

    is_complete: bool = False

    error: Optional[str] = None

    metadata: Optional[Dict[str, Any]] = None


class StreamingClient:

    """

    Handles streaming responses from LLM APIs with proper

    connection management and error handling

    """

    

    def __init__(self, api_key: str, base_url: str):

        self.api_key = api_key

        self.base_url = base_url

    

    async def stream_openai_response(

        self, 

        messages: list, 

        model: str = "gpt-3.5-turbo",

        **kwargs

    ) -> AsyncGenerator[StreamingChunk, None]:

        """

        Stream responses from OpenAI API using Server-Sent Events

        """

        headers = {

            "Authorization": f"Bearer {self.api_key}",

            "Content-Type": "application/json"

        }

        

        payload = {

            "model": model,

            "messages": messages,

            "stream": True,

            **kwargs

        }

        

        # Use a longer timeout for streaming connections

        timeout = httpx.Timeout(connect=10.0, read=120.0, write=10.0, pool=120.0)

        

        async with httpx.AsyncClient(timeout=timeout) as client:

            try:

                async with client.stream(

                    "POST",

                    f"{self.base_url}/chat/completions",

                    headers=headers,

                    json=payload

                ) as response:

                    

                    if response.status_code != 200:

                        error_text = await response.aread()

                        yield StreamingChunk(

                            content="",

                            error=f"HTTP {response.status_code}: {error_text.decode()}"

                        )

                        return

                    

                    # Process Server-Sent Events

                    buffer = ""

                    async for chunk in response.aiter_bytes():

                        buffer += chunk.decode('utf-8')

                        

                        # Process complete lines from buffer

                        while '\n' in buffer:

                            line, buffer = buffer.split('\n', 1)

                            

                            processed_chunk = await self._process_sse_line(line)

                            if processed_chunk:

                                yield processed_chunk

                                

                                # Check if stream is complete

                                if processed_chunk.is_complete:

                                    return

                

            except httpx.TimeoutException:

                yield StreamingChunk(

                    content="",

                    error="Streaming request timed out"

                )

            except Exception as e:

                yield StreamingChunk(

                    content="",

                    error=f"Streaming error: {str(e)}"

                )

    

    async def _process_sse_line(self, line: str) -> Optional[StreamingChunk]:

        """

        Process a single Server-Sent Event line from OpenAI streaming response

        """

        line = line.strip()

        

        # Skip empty lines and comments

        if not line or line.startswith(':'):

            return None

        

        # Parse SSE format: "data: {json}"

        if line.startswith('data: '):

            data_part = line[6:]  # Remove "data: " prefix

            

            # Check for stream end marker

            if data_part == '[DONE]':

                return StreamingChunk(content="", is_complete=True)

            

            try:

                event_data = json.loads(data_part)

                

                # Extract content from OpenAI streaming format

                choices = event_data.get('choices', [])

                if not choices:

                    return None

                

                choice = choices[0]

                delta = choice.get('delta', {})

                content = delta.get('content', '')

                finish_reason = choice.get('finish_reason')

                

                # Check if this is the final chunk

                is_complete = finish_reason is not None

                

                return StreamingChunk(

                    content=content,

                    is_complete=is_complete,

                    metadata={

                        'finish_reason': finish_reason,

                        'model': event_data.get('model'),

                        'id': event_data.get('id')

                    }

                )

                

            except json.JSONDecodeError as e:

                return StreamingChunk(

                    content="",

                    error=f"Failed to parse streaming response: {str(e)}"

                )

        

        return None


class StreamingResponseHandler:

    """

    High-level handler for streaming LLM responses with

    content accumulation and user-friendly interface

    """

    

    def __init__(self, streaming_client: StreamingClient):

        self.client = streaming_client

    

    async def generate_streaming_response(

        self,

        prompt: str,

        system_message: Optional[str] = None,

        model: str = "gpt-3.5-turbo",

        on_chunk: Optional[callable] = None,

        on_complete: Optional[callable] = None,

        on_error: Optional[callable] = None

    ) -> str:

        """

        Generate streaming response with callback support for real-time updates

        """

        messages = []

        if system_message:

            messages.append({"role": "system", "content": system_message})

        messages.append({"role": "user", "content": prompt})

        

        accumulated_content = ""

        

        try:

            async for chunk in self.client.stream_openai_response(messages, model):

                if chunk.error:

                    if on_error:

                        await on_error(chunk.error)

                    raise Exception(chunk.error)

                

                if chunk.content:

                    accumulated_content += chunk.content

                    

                    # Call chunk callback if provided

                    if on_chunk:

                        await on_chunk(chunk.content, accumulated_content)

                

                if chunk.is_complete:

                    if on_complete:

                        await on_complete(accumulated_content, chunk.metadata)

                    break

            

            return accumulated_content

            

        except Exception as e:

            if on_error:

                await on_error(str(e))

            raise


# Example usage demonstrating streaming integration

async def example_streaming_usage():

    """

    Example demonstrating how to integrate streaming responses

    in a real application with proper event handling

    """

    

    # Initialize streaming components

    streaming_client = StreamingClient(

        api_key="your-openai-api-key",

        base_url="https://api.openai.com/v1"

    )

    

    handler = StreamingResponseHandler(streaming_client)

    

    # Define callback functions for different events

    async def on_chunk_received(chunk_content: str, full_content: str):

        """Called for each chunk of streaming content"""

        print(f"Received chunk: {chunk_content}", end='', flush=True)

        

        # Here you might update a UI component, send to websocket, etc.

        # await websocket.send_text(chunk_content)

        # await update_ui_component(full_content)

    

    async def on_response_complete(full_content: str, metadata: dict):

        """Called when streaming response is complete"""

        print(f"\nResponse complete. Total length: {len(full_content)}")

        print(f"Finish reason: {metadata.get('finish_reason')}")

        

        # Perform final processing

        # await save_response_to_database(full_content)

        # await send_completion_notification()

    

    async def on_streaming_error(error_message: str):

        """Called when streaming encounters an error"""

        print(f"Streaming error occurred: {error_message}")

        

        # Handle error appropriately

        # await log_error(error_message)

        # await show_error_to_user(error_message)

    

    # Generate streaming response with callbacks

    try:

        final_response = await handler.generate_streaming_response(

            prompt="Explain the concept of machine learning in simple terms.",

            system_message="You are a helpful AI assistant that explains complex topics clearly.",

            model="gpt-3.5-turbo",

            on_chunk=on_chunk_received,

            on_complete=on_response_complete,

            on_error=on_streaming_error

        )

        

        print(f"Final response: {final_response}")

        

    except Exception as e:

        print(f"Failed to generate streaming response: {e}")



Context Management and Conversation State


Managing conversational context is one of the most challenging aspects of LLM integration. Unlike stateless API calls, conversations require maintaining history, managing token limits, and ensuring that context remains relevant and coherent across multiple exchanges. Effective context management involves deciding what information to retain, how to summarize or truncate old context when approaching token limits, and how to maintain conversation flow across sessions.


The challenge becomes more complex when dealing with long conversations that exceed the model’s context window. You need strategies for context compression, selective history retention, and semantic summarization. Additionally, different types of applications require different context management approaches. A chatbot might need to maintain personality and conversation flow, while a document analysis tool might need to retain specific facts and references.


Here is a comprehensive context management system that handles these complexities:


This context management implementation provides a sophisticated approach to handling conversational state in LLM applications. The ConversationManager class maintains structured conversation history while implementing intelligent context compression when token limits are approached. The sliding window approach ensures that recent context is preserved while older, less relevant information is summarized or removed. The context compression logic demonstrates how to maintain conversation coherence even when dealing with very long interactions that exceed model token limits.



import json

import time

from typing import List, Dict, Any, Optional, Union

from dataclasses import dataclass, field

from enum import Enum

import hashlib


class MessageRole(Enum):

    SYSTEM = "system"

    USER = "user"

    ASSISTANT = "assistant"

    FUNCTION = "function"


@dataclass

class ConversationMessage:

    """Structured representation of a conversation message"""

    role: MessageRole

    content: str

    timestamp: float

    metadata: Optional[Dict[str, Any]] = None

    token_count: Optional[int] = None

    

    def to_dict(self) -> Dict[str, Any]:

        """Convert message to dictionary format for API calls"""

        return {

            "role": self.role.value,

            "content": self.content

        }


@dataclass

class ConversationContext:

    """Complete conversation context with metadata"""

    conversation_id: str

    messages: List[ConversationMessage]

    system_prompt: Optional[str] = None

    total_tokens: int = 0

    created_at: Optional[float] = None

    last_updated: Optional[float] = None

    

    def __post_init__(self):

        if self.created_at is None:

            self.created_at = time.time()

        self.last_updated = time.time()


class TokenEstimator:

    """

    Estimates token count for text content to help with

    context window management

    """

    

    def __init__(self):

        # Rough estimation: ~4 characters per token for English text

        self.chars_per_token = 4

    

    def estimate_tokens(self, text: str) -> int:

        """

        Estimate token count for given text

        This is a simplified estimation - production code should use

        proper tokenization libraries like tiktoken for OpenAI models

        """

        if not text:

            return 0

        

        # Basic estimation based on character count

        base_estimate = len(text) // self.chars_per_token

        

        # Add some buffer for special tokens and formatting

        return int(base_estimate * 1.1)

    

    def estimate_message_tokens(self, message: ConversationMessage) -> int:

        """Estimate tokens for a complete message including role overhead"""

        content_tokens = self.estimate_tokens(message.content)

        

        # Add overhead for role and formatting

        role_overhead = 4  # Approximate overhead for role formatting

        

        return content_tokens + role_overhead


class ConversationManager:

    """

    Manages conversation context with intelligent truncation

    and context window optimization

    """

    

    def __init__(self, max_context_tokens: int = 4000):

        self.max_context_tokens = max_context_tokens

        self.token_estimator = TokenEstimator()

        self.conversations: Dict[str, ConversationContext] = {}

    

    def create_conversation(

        self, 

        conversation_id: Optional[str] = None,

        system_prompt: Optional[str] = None

    ) -> str:

        """Create a new conversation with optional system prompt"""

        if conversation_id is None:

            conversation_id = self._generate_conversation_id()

        

        context = ConversationContext(

            conversation_id=conversation_id,

            messages=[],

            system_prompt=system_prompt

        )

        

        # Add system message if provided

        if system_prompt:

            system_message = ConversationMessage(

                role=MessageRole.SYSTEM,

                content=system_prompt,

                timestamp=time.time()

            )

            system_message.token_count = self.token_estimator.estimate_message_tokens(system_message)

            context.messages.append(system_message)

            context.total_tokens = system_message.token_count

        

        self.conversations[conversation_id] = context

        return conversation_id

    

    def add_message(

        self, 

        conversation_id: str, 

        role: MessageRole, 

        content: str,

        metadata: Optional[Dict[str, Any]] = None

    ) -> ConversationMessage:

        """Add a new message to the conversation with automatic context management"""

        if conversation_id not in self.conversations:

            raise ValueError(f"Conversation {conversation_id} not found")

        

        context = self.conversations[conversation_id]

        

        # Create new message

        message = ConversationMessage(

            role=role,

            content=content,

            timestamp=time.time(),

            metadata=metadata

        )

        

        # Estimate token count

        message.token_count = self.token_estimator.estimate_message_tokens(message)

        

        # Add message to context

        context.messages.append(message)

        context.total_tokens += message.token_count

        context.last_updated = time.time()

        

        # Check if context needs compression

        if context.total_tokens > self.max_context_tokens:

            self._compress_context(context)

        

        return message

    

    def get_context_for_api(self, conversation_id: str) -> List[Dict[str, Any]]:

        """

        Get conversation context formatted for LLM API calls

        with proper token management

        """

        if conversation_id not in self.conversations:

            raise ValueError(f"Conversation {conversation_id} not found")

        

        context = self.conversations[conversation_id]

        

        # Convert messages to API format

        api_messages = []

        for message in context.messages:

            api_messages.append(message.to_dict())

        

        return api_messages

    

    def _compress_context(self, context: ConversationContext):

        """

        Compress conversation context when approaching token limits

        using sliding window with intelligent preservation

        """

        if not context.messages:

            return

        

        # Always preserve system message if present

        system_messages = [msg for msg in context.messages if msg.role == MessageRole.SYSTEM]

        non_system_messages = [msg for msg in context.messages if msg.role != MessageRole.SYSTEM]

        

        # Calculate tokens for system messages

        system_tokens = sum(msg.token_count or 0 for msg in system_messages)

        available_tokens = self.max_context_tokens - system_tokens

        

        # Keep recent messages that fit within token limit

        compressed_messages = system_messages.copy()

        current_tokens = system_tokens

        

        # Work backwards from most recent messages

        for message in reversed(non_system_messages):

            message_tokens = message.token_count or 0

            

            if current_tokens + message_tokens <= available_tokens:

                # Insert just after the preserved system messages so the kept
                # messages remain in chronological order
                compressed_messages.insert(len(system_messages), message)

                current_tokens += message_tokens

            else:

                # If we can't fit the message, create a summary of older context

                older_messages = non_system_messages[:non_system_messages.index(message)]

                if older_messages:

                    summary = self._create_context_summary(older_messages)

                    summary_message = ConversationMessage(

                        role=MessageRole.SYSTEM,

                        content=f"Previous conversation summary: {summary}",

                        timestamp=time.time()

                    )

                    summary_message.token_count = self.token_estimator.estimate_message_tokens(summary_message)

                    compressed_messages.insert(1, summary_message)  # Insert after main system message

                

                break

        

        # Update context with compressed messages

        context.messages = compressed_messages

        context.total_tokens = sum(msg.token_count or 0 for msg in compressed_messages)

    

    def _create_context_summary(self, messages: List[ConversationMessage]) -> str:

        """

        Create a summary of conversation messages for context compression

        This is a simplified implementation - production systems might use

        LLM-based summarization for better quality

        """

        if not messages:

            return ""

        

        # Group messages by speaker and create basic summary

        user_messages = [msg.content for msg in messages if msg.role == MessageRole.USER]

        assistant_messages = [msg.content for msg in messages if msg.role == MessageRole.ASSISTANT]

        

        summary_parts = []

        

        if user_messages:

            # Take key themes from user messages

            user_topics = self._extract_key_topics(user_messages)

            summary_parts.append(f"User discussed: {', '.join(user_topics)}")

        

        if assistant_messages:

            # Take key points from assistant responses

            assistant_points = self._extract_key_topics(assistant_messages)

            summary_parts.append(f"Assistant covered: {', '.join(assistant_points)}")

        

        return ". ".join(summary_parts)

    

    def _extract_key_topics(self, messages: List[str]) -> List[str]:

        """

        Extract key topics from messages for summarization

        This is a simplified keyword-based approach

        """

        # Combine all messages

        combined_text = " ".join(messages).lower()

        

        # Simple keyword extraction (in production, use NLP libraries)

        # Remove common words and extract meaningful terms

        words = combined_text.split()

        word_freq = {}

        

        # Skip common words

        skip_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should', 'may', 'might', 'can', 'you', 'i', 'we', 'they', 'he', 'she', 'it', 'this', 'that', 'these', 'those'}

        

        for word in words:

            # Clean word

            clean_word = ''.join(c for c in word if c.isalnum())

            if len(clean_word) > 3 and clean_word not in skip_words:

                word_freq[clean_word] = word_freq.get(clean_word, 0) + 1

        

        # Return top keywords

        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)

        return [word for word, freq in sorted_words[:5]]

    

    def _generate_conversation_id(self) -> str:

        """Generate unique conversation ID"""

        timestamp = str(time.time())

        return hashlib.md5(timestamp.encode()).hexdigest()[:16]

    

    def get_conversation_stats(self, conversation_id: str) -> Dict[str, Any]:

        """Get statistics about conversation context"""

        if conversation_id not in self.conversations:

            raise ValueError(f"Conversation {conversation_id} not found")

        

        context = self.conversations[conversation_id]

        

        return {

            "conversation_id": conversation_id,

            "message_count": len(context.messages),

            "total_tokens": context.total_tokens,

            "token_utilization": context.total_tokens / self.max_context_tokens,

            "created_at": context.created_at,

            "last_updated": context.last_updated,

            "duration_minutes": (context.last_updated - context.created_at) / 60

        }

    

    def save_conversation(self, conversation_id: str, filepath: str):

        """Save conversation to file for persistence"""

        if conversation_id not in self.conversations:

            raise ValueError(f"Conversation {conversation_id} not found")

        

        context = self.conversations[conversation_id]

        

        # Convert to serializable format

        serializable_data = {

            "conversation_id": context.conversation_id,

            "system_prompt": context.system_prompt,

            "total_tokens": context.total_tokens,

            "created_at": context.created_at,

            "last_updated": context.last_updated,

            "messages": [

                {

                    "role": msg.role.value,

                    "content": msg.content,

                    "timestamp": msg.timestamp,

                    "metadata": msg.metadata,

                    "token_count": msg.token_count

                }

                for msg in context.messages

            ]

        }

        

        with open(filepath, 'w') as f:

            json.dump(serializable_data, f, indent=2)

    

    def load_conversation(self, filepath: str) -> str:

        """Load conversation from file"""

        with open(filepath, 'r') as f:

            data = json.load(f)

        

        # Reconstruct conversation context

        messages = []

        for msg_data in data["messages"]:

            message = ConversationMessage(

                role=MessageRole(msg_data["role"]),

                content=msg_data["content"],

                timestamp=msg_data["timestamp"],

                metadata=msg_data.get("metadata"),

                token_count=msg_data.get("token_count")

            )

            messages.append(message)

        

        context = ConversationContext(

            conversation_id=data["conversation_id"],

            messages=messages,

            system_prompt=data.get("system_prompt"),

            total_tokens=data["total_tokens"],

            created_at=data["created_at"],

            last_updated=data["last_updated"]

        )

        

        self.conversations[context.conversation_id] = context

        return context.conversation_id
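
As a brief illustration of the persistence and inspection methods above, consider the following sketch. It assumes that `manager` is an already-configured ConversationContextManager and that `conversation_id` refers to a conversation created with the helper methods shown earlier in the class; the file path is arbitrary.

# Illustrative only: `manager` and `conversation_id` are assumed to exist already
stats = manager.get_conversation_stats(conversation_id)
print(f"{stats['message_count']} messages, "
      f"{stats['token_utilization']:.0%} of the context window used")

# Persist the conversation, then restore it (for example after a process restart)
manager.save_conversation(conversation_id, "conversation_backup.json")
restored_id = manager.load_conversation("conversation_backup.json")

# The restored context can be sent straight to the LLM API
api_messages = manager.get_context_for_api(restored_id)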



Prompt Engineering in Application Code


Effective prompt engineering within application code requires a systematic approach to crafting, testing, and maintaining prompts. Unlike ad-hoc prompt creation, production applications need structured prompt management that supports versioning, A/B testing, localization, and dynamic prompt composition based on context and user requirements.


The key to successful prompt engineering in code is treating prompts as first-class citizens in your application architecture. This means creating reusable prompt templates, implementing validation and testing frameworks for prompts, and building systems that can adapt prompts based on user behavior and application state. Additionally, you need to consider how prompts interact with context management, token limits, and different model capabilities.


Here is a comprehensive prompt engineering framework designed for production applications:


This prompt engineering framework provides a production-ready approach to managing prompts in LLM applications. The PromptTemplate class supports dynamic variable substitution with validation and formatting options. The PromptLibrary manages collections of prompts with versioning support, enabling A/B testing and gradual rollout of prompt improvements. The framework includes validation mechanisms to ensure prompts meet quality standards and token limits, while the example usage demonstrates how to build sophisticated prompt composition systems that adapt to different contexts and requirements.



import re

import json

import time

from typing import Dict, Any, List, Optional, Union, Callable

from dataclasses import dataclass, field

from enum import Enum

import hashlib


class PromptType(Enum):

    SYSTEM = "system"

    USER = "user"

    ASSISTANT = "assistant"

    FUNCTION = "function"

    INSTRUCTION = "instruction"


@dataclass

class PromptTemplate:

    """

    Structured prompt template with variable substitution

    and validation capabilities

    """

    name: str

    template: str

    prompt_type: PromptType

    variables: List[str] = field(default_factory=list)

    description: Optional[str] = None

    version: str = "1.0"

    metadata: Dict[str, Any] = field(default_factory=dict)

    

    def __post_init__(self):

        """Extract variables from template automatically"""

        if not self.variables:

            self.variables = self._extract_variables()

    

    def _extract_variables(self) -> List[str]:

        """Extract variable names from template using regex"""

        # Find variables in format {variable_name}

        pattern = r'\{([^}]+)\}'

        variables = re.findall(pattern, self.template)

        return list(set(variables))  # Remove duplicates

    

    def format(self, **kwargs) -> str:

        """

        Format template with provided variables and validation

        """

        # Validate required variables

        missing_vars = set(self.variables) - set(kwargs.keys())

        if missing_vars:

            raise ValueError(f"Missing required variables: {missing_vars}")

        

        # Validate extra variables

        extra_vars = set(kwargs.keys()) - set(self.variables)

        if extra_vars:

            raise ValueError(f"Unexpected variables provided: {extra_vars}")

        

        # Apply formatting with error handling

        try:

            formatted_prompt = self.template.format(**kwargs)

            

            # Post-processing validation

            self._validate_formatted_prompt(formatted_prompt)

            

            return formatted_prompt

            

        except KeyError as e:

            raise ValueError(f"Template formatting error: {e}")

    

    def _validate_formatted_prompt(self, prompt: str):

        """Validate the formatted prompt meets quality standards"""

        # Check for common issues

        if not prompt.strip():

            raise ValueError("Formatted prompt is empty")

        

        # Check for unresolved variables

        if '{' in prompt and '}' in prompt:

            unresolved = re.findall(r'\{[^}]+\}', prompt)

            if unresolved:

                raise ValueError(f"Unresolved variables in prompt: {unresolved}")

        

        # Check for excessively long prompts (basic token estimation)

        estimated_tokens = len(prompt) // 4  # Rough estimate

        if estimated_tokens > 8000:  # Configurable limit

            raise ValueError(f"Prompt may be too long: ~{estimated_tokens} tokens")

    

    def get_hash(self) -> str:

        """Generate hash for template versioning and caching"""

        content = f"{self.template}{self.version}{json.dumps(self.metadata, sort_keys=True)}"

        return hashlib.md5(content.encode()).hexdigest()[:16]


class PromptLibrary:

    """

    Centralized management of prompt templates with versioning

    and A/B testing capabilities

    """

    

    def __init__(self):

        self.templates: Dict[str, Dict[str, PromptTemplate]] = {}

        self.default_versions: Dict[str, str] = {}

        self.ab_tests: Dict[str, Dict[str, Any]] = {}

    

    def register_template(self, template: PromptTemplate, set_as_default: bool = True):

        """Register a new prompt template"""

        if template.name not in self.templates:

            self.templates[template.name] = {}

        

        self.templates[template.name][template.version] = template

        

        if set_as_default:

            self.default_versions[template.name] = template.version

    

    def get_template(

        self, 

        name: str, 

        version: Optional[str] = None,

        user_id: Optional[str] = None

    ) -> PromptTemplate:

        """

        Get prompt template with optional A/B testing support

        """

        if name not in self.templates:

            raise ValueError(f"Template '{name}' not found")

        

        # Check for A/B test configuration

        if user_id and name in self.ab_tests:

            version = self._get_ab_test_version(name, user_id)

        

        # Use specified version or default

        if version is None:

            version = self.default_versions.get(name)

            if version is None:

                # Use latest version if no default set

                versions = list(self.templates[name].keys())

                # Pick the highest version by comparing dotted numeric components, so "1.10" sorts above "1.9"
                version = max(versions, key=lambda v: tuple(int(part) for part in v.split(".") if part.isdigit()))

        

        if version not in self.templates[name]:

            available = list(self.templates[name].keys())

            raise ValueError(f"Version '{version}' not found for template '{name}'. Available: {available}")

        

        return self.templates[name][version]

    

    def create_ab_test(

        self, 

        template_name: str, 

        version_a: str, 

        version_b: str,

        traffic_split: float = 0.5

    ):

        """Create A/B test between two template versions"""

        if template_name not in self.templates:

            raise ValueError(f"Template '{template_name}' not found")

        

        if version_a not in self.templates[template_name]:

            raise ValueError(f"Version '{version_a}' not found")

        

        if version_b not in self.templates[template_name]:

            raise ValueError(f"Version '{version_b}' not found")

        

        self.ab_tests[template_name] = {

            "version_a": version_a,

            "version_b": version_b,

            "traffic_split": traffic_split,

            "created_at": time.time()

        }

    

    def _get_ab_test_version(self, template_name: str, user_id: str) -> str:

        """Determine which version to use based on A/B test configuration"""

        test_config = self.ab_tests[template_name]

        

        # Use user_id hash to consistently assign users to test groups

        user_hash = hashlib.md5(f"{template_name}:{user_id}".encode()).hexdigest()

        hash_value = int(user_hash[:8], 16) / 0xFFFFFFFF  # Normalize to 0-1

        

        if hash_value < test_config["traffic_split"]:

            return test_config["version_a"]

        else:

            return test_config["version_b"]

    

    def list_templates(self) -> Dict[str, List[str]]:

        """List all templates and their versions"""

        return {name: list(versions.keys()) for name, versions in self.templates.items()}


class PromptComposer:

    """

    Advanced prompt composition with dynamic content generation

    and context-aware prompt building

    """

    

    def __init__(self, prompt_library: PromptLibrary):

        self.library = prompt_library

        self.composition_rules: Dict[str, Callable] = {}

    

    def register_composition_rule(self, name: str, rule_function: Callable):

        """Register a custom composition rule"""

        self.composition_rules[name] = rule_function

    

    def compose_prompt(

        self, 

        base_template: str,

        context: Dict[str, Any],

        user_id: Optional[str] = None,

        additional_instructions: Optional[List[str]] = None

    ) -> str:

        """

        Compose a complex prompt from multiple templates and context

        """

        # Get base template

        template = self.library.get_template(base_template, user_id=user_id)

        

        # Enhance context with dynamic content

        enhanced_context = self._enhance_context(context)

        

        # Add additional instructions if provided

        if additional_instructions:

            enhanced_context["additional_instructions"] = "\n".join(additional_instructions)

        

        # Apply composition rules

        for rule_name, rule_function in self.composition_rules.items():

            enhanced_context = rule_function(enhanced_context)

        

        # Format the final prompt, passing only the variables this template
        # declares (the enhanced context also carries helper keys such as
        # current_time and context_size, which strict validation would reject)
        formatted_prompt = template.format(**{
            key: value for key, value in enhanced_context.items()
            if key in template.variables
        })

        

        return formatted_prompt

    

    def _enhance_context(self, context: Dict[str, Any]) -> Dict[str, Any]:

        """Enhance context with computed values and formatting"""

        enhanced = context.copy()

        

        # Add current timestamp

        enhanced["current_time"] = time.strftime("%Y-%m-%d %H:%M:%S")

        

        # Format lists as bullet points if present (iterate over a snapshot so the
        # *_formatted keys can be added without mutating the dict mid-iteration)
        for key, value in list(enhanced.items()):
            if isinstance(value, list) and all(isinstance(item, str) for item in value):
                enhanced[f"{key}_formatted"] = "\n".join(f"- {item}" for item in value)

        

        # Add context metadata

        enhanced["context_size"] = len(str(context))

        

        return enhanced
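
To show how these pieces fit together, here is a brief usage sketch. The template name, versions, variables, and user ID are illustrative assumptions rather than part of the framework itself; substitute the prompts and identifiers your application actually uses.

# Illustrative usage of the prompt engineering framework (names and values are assumptions)
library = PromptLibrary()

# Register two versions of a hypothetical support-assistant system prompt
library.register_template(PromptTemplate(
    name="support_system",
    template=(
        "You are a helpful support assistant for {product_name}.\n"
        "Current time: {current_time}\n"
        "Known issues:\n{known_issues_formatted}\n"
        "{additional_instructions}"
    ),
    prompt_type=PromptType.SYSTEM,
    version="1.0"
))
library.register_template(PromptTemplate(
    name="support_system",
    template=(
        "You are a concise, friendly support assistant for {product_name}.\n"
        "Current time: {current_time}\n"
        "Known issues:\n{known_issues_formatted}\n"
        "{additional_instructions}"
    ),
    prompt_type=PromptType.SYSTEM,
    version="1.1"
), set_as_default=False)

# Split traffic between the two versions for comparison
library.create_ab_test("support_system", version_a="1.0", version_b="1.1", traffic_split=0.5)

# Compose a final system prompt for a specific (hypothetical) user
composer = PromptComposer(library)
system_prompt = composer.compose_prompt(
    base_template="support_system",
    context={
        "product_name": "ExampleApp",
        "known_issues": ["Login emails are delayed", "Dark mode flickers on Android"]
    },
    user_id="user-1234",
    additional_instructions=["Never promise an exact date for a fix."]
)
print(system_prompt)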



Performance Optimization Techniques


Performance optimization for LLM integrations spans multiple layers, from network-level improvements to application-level caching and request batching. The inherent latency of LLM APIs means that every optimization technique can have a significant impact on user experience. Effective optimization requires understanding the characteristics of your specific use case and implementing appropriate strategies for caching, connection pooling, request batching, and asynchronous processing.


One of the most impactful optimizations is implementing intelligent caching strategies that can serve previously generated content for similar requests. However, caching LLM responses requires careful consideration of when cached responses are appropriate and how to handle the inherent variability in LLM outputs. Additionally, connection pooling, request batching, and parallel processing can significantly reduce overall response times when handling multiple requests.


Here is a comprehensive performance optimization framework for LLM integrations:


This performance optimization framework demonstrates multiple strategies for improving LLM integration performance. The caching layer implements exact-match response caching keyed on the prompt, model, and generation parameters; semantic similarity caching can be layered on top of it to reuse responses for near-duplicate requests. The connection pooling and request batching components optimize network utilization and reduce overall latency, while the parallel processing system shows how to handle multiple requests efficiently while respecting rate limits and resource constraints.



import asyncio

import time

import hashlib

import pickle

from typing import Dict, Any, List, Optional, Union, Tuple

from dataclasses import dataclass, field

from concurrent.futures import ThreadPoolExecutor

import httpx

from collections import OrderedDict, defaultdict

import json


@dataclass

class CacheEntry:

    """Represents a cached LLM response with metadata"""

    response: str

    timestamp: float

    prompt_hash: str

    model: str

    parameters: Dict[str, Any]

    hit_count: int = 0

    

    def is_expired(self, ttl_seconds: int) -> bool:

        """Check if cache entry has expired"""

        return time.time() - self.timestamp > ttl_seconds

    

    def update_hit_count(self):

        """Update hit count for LRU tracking"""

        self.hit_count += 1


class LRUCache:

    """

    Least Recently Used cache with TTL support

    for LLM response caching

    """

    

    def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):

        self.max_size = max_size

        self.ttl_seconds = ttl_seconds

        self.cache: OrderedDict[str, CacheEntry] = OrderedDict()

        self.access_times: Dict[str, float] = {}

    

    def _generate_cache_key(

        self, 

        prompt: str, 

        model: str, 

        parameters: Dict[str, Any]

    ) -> str:

        """Generate deterministic cache key from request parameters"""

        # Normalize parameters for consistent hashing

        normalized_params = json.dumps(parameters, sort_keys=True)

        content = f"{prompt}|{model}|{normalized_params}"

        return hashlib.sha256(content.encode()).hexdigest()[:32]

    

    def get(

        self, 

        prompt: str, 

        model: str, 

        parameters: Dict[str, Any]

    ) -> Optional[str]:

        """Retrieve cached response if available and valid"""

        cache_key = self._generate_cache_key(prompt, model, parameters)

        

        if cache_key not in self.cache:

            return None

        

        entry = self.cache[cache_key]

        

        # Check if entry has expired

        if entry.is_expired(self.ttl_seconds):

            del self.cache[cache_key]

            del self.access_times[cache_key]

            return None

        

        # Update access time and hit count

        entry.update_hit_count()

        self.access_times[cache_key] = time.time()

        

        # Move to end (most recently used)

        self.cache.move_to_end(cache_key)

        

        return entry.response

    

    def put(

        self, 

        prompt: str, 

        model: str, 

        parameters: Dict[str, Any], 

        response: str

    ):

        """Store response in cache with LRU eviction"""

        cache_key = self._generate_cache_key(prompt, model, parameters)

        

        # Remove existing entry if present

        if cache_key in self.cache:

            del self.cache[cache_key]

        

        # Create new cache entry

        entry = CacheEntry(

            response=response,

            timestamp=time.time(),

            prompt_hash=cache_key,

            model=model,

            parameters=parameters.copy()

        )

        

        # Add to cache

        self.cache[cache_key] = entry

        self.access_times[cache_key] = time.time()

        

        # Enforce size limit

        while len(self.cache) > self.max_size:

            # Remove least recently used item

            oldest_key = next(iter(self.cache))

            del self.cache[oldest_key]

            del self.access_times[oldest_key]

    

    def clear_expired(self):

        """Manually clear expired entries"""

        current_time = time.time()

        expired_keys = [

            key for key, entry in self.cache.items()

            if current_time - entry.timestamp > self.ttl_seconds

        ]

        

        for key in expired_keys:

            del self.cache[key]

            del self.access_times[key]

    

    def get_stats(self) -> Dict[str, Any]:

        """Get cache statistics"""

        total_hits = sum(entry.hit_count for entry in self.cache.values())

        return {

            "size": len(self.cache),

            "max_size": self.max_size,

            "total_hits": total_hits,

            "utilization": len(self.cache) / self.max_size,

            "expired_entries": sum(

                1 for entry in self.cache.values()

                if entry.is_expired(self.ttl_seconds)

            )

        }
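
# Sketch (not part of the original framework): a semantic similarity cache that
# reuses responses for near-duplicate prompts, as mentioned in the section
# introduction. The embedding function is supplied by the caller (for example a
# sentence-embedding model or a provider's embedding endpoint), and the
# similarity threshold below is illustrative.
import math  # Needed for the cosine-similarity computation
from typing import Callable  # (in a real module these imports would live at the top of the file)


class SemanticResponseCache:
    """Reuse cached responses when a new prompt is semantically close enough"""

    def __init__(
        self,
        embed_fn: Callable[[str], List[float]],
        similarity_threshold: float = 0.92,
        max_entries: int = 1000
    ):
        self.embed_fn = embed_fn
        self.similarity_threshold = similarity_threshold
        self.max_entries = max_entries
        # Each entry stores (prompt embedding, model name, cached response)
        self.entries: List[Tuple[List[float], str, str]] = []

    @staticmethod
    def _cosine_similarity(a: List[float], b: List[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        norm_a = math.sqrt(sum(x * x for x in a))
        norm_b = math.sqrt(sum(x * x for x in b))
        if norm_a == 0.0 or norm_b == 0.0:
            return 0.0
        return dot / (norm_a * norm_b)

    def get(self, prompt: str, model: str) -> Optional[str]:
        """Return the best cached response above the similarity threshold, if any"""
        query_embedding = self.embed_fn(prompt)
        best_score = 0.0
        best_response: Optional[str] = None

        for embedding, cached_model, response in self.entries:
            if cached_model != model:
                continue
            score = self._cosine_similarity(query_embedding, embedding)
            if score > best_score:
                best_score, best_response = score, response

        return best_response if best_score >= self.similarity_threshold else None

    def put(self, prompt: str, model: str, response: str):
        """Store a response keyed by its prompt embedding"""
        if len(self.entries) >= self.max_entries:
            self.entries.pop(0)  # Simple FIFO eviction for the sketch
        self.entries.append((self.embed_fn(prompt), model, response))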


@dataclass

class BatchRequest:

    """Represents a single request in a batch"""

    request_id: str

    prompt: str

    model: str

    parameters: Dict[str, Any]

    callback: Optional[callable] = None


class RequestBatcher:

    """

    Batches multiple LLM requests to optimize API usage

    and reduce overall latency

    """

    

    def __init__(

        self, 

        max_batch_size: int = 10,

        batch_timeout: float = 1.0,

        max_concurrent_batches: int = 3

    ):

        self.max_batch_size = max_batch_size

        self.batch_timeout = batch_timeout

        self.max_concurrent_batches = max_concurrent_batches

        

        self.pending_requests: List[BatchRequest] = []

        self.batch_semaphore = asyncio.Semaphore(max_concurrent_batches)

        self.batch_task: Optional[asyncio.Task] = None

        self.request_futures: Dict[str, asyncio.Future] = {}

    

    async def add_request(

        self, 

        prompt: str, 

        model: str, 

        parameters: Dict[str, Any],

        request_id: Optional[str] = None

    ) -> str:

        """Add request to batch and return response when available"""

        if request_id is None:

            request_id = f"req_{int(time.time() * 1000)}_{len(self.pending_requests)}"

        

        # Create future for this request

        future = asyncio.Future()

        self.request_futures[request_id] = future

        

        # Add to pending requests

        batch_request = BatchRequest(

            request_id=request_id,

            prompt=prompt,

            model=model,

            parameters=parameters

        )

        self.pending_requests.append(batch_request)

        

        # Start batch processing if needed

        if self.batch_task is None or self.batch_task.done():

            self.batch_task = asyncio.create_task(self._process_batches())

        

        # Wait for response

        return await future

    

    async def _process_batches(self):

        """Process pending requests in batches"""

        while self.pending_requests:

            # Determine batch size

            batch_size = min(len(self.pending_requests), self.max_batch_size)

            

            # Extract batch

            batch = self.pending_requests[:batch_size]

            self.pending_requests = self.pending_requests[batch_size:]

            

            # Process batch

            asyncio.create_task(self._process_single_batch(batch))

            

            # Wait for timeout or until we have enough requests for another batch

            if self.pending_requests:

                await asyncio.sleep(0.1)  # Small delay to allow more requests to accumulate

            else:

                # Wait for timeout in case more requests come in

                try:

                    await asyncio.wait_for(

                        self._wait_for_requests(), 

                        timeout=self.batch_timeout

                    )

                except asyncio.TimeoutError:

                    break  # Timeout reached, exit batch processing

    

    async def _wait_for_requests(self):

        """Wait for new requests to arrive"""

        while not self.pending_requests:

            await asyncio.sleep(0.01)

    

    async def _process_single_batch(self, batch: List[BatchRequest]):

        """Process a single batch of requests"""

        async with self.batch_semaphore:

            try:

                # Group requests by model and similar parameters

                grouped_requests = self._group_requests(batch)

                

                # Process each group

                for group in grouped_requests:

                    await self._process_request_group(group)

                    

            except Exception as e:

                # Handle batch processing error

                for request in batch:

                    future = self.request_futures.get(request.request_id)

                    if future and not future.done():

                        future.set_exception(e)

    

    def _group_requests(self, batch: List[BatchRequest]) -> List[List[BatchRequest]]:

        """Group requests by model and compatible parameters"""

        groups = defaultdict(list)

        

        for request in batch:

            # Create group key based on model and key parameters

            group_key = f"{request.model}_{request.parameters.get('temperature', 0.7)}"

            groups[group_key].append(request)

        

        return list(groups.values())

    

    async def _process_request_group(self, group: List[BatchRequest]):

        """Process a group of similar requests"""

        # For this example, we'll process requests in parallel

        # In practice, you might use actual batch APIs where available

        

        tasks = []

        for request in group:

            task = asyncio.create_task(

                self._execute_single_request(request)

            )

            tasks.append(task)

        

        # Wait for all requests in group to complete

        await asyncio.gather(*tasks, return_exceptions=True)

    

    async def _execute_single_request(self, request: BatchRequest):

        """Execute a single request and resolve its future"""

        try:

            # Simulate API call (replace with actual LLM API call)

            response = await self._mock_llm_api_call(

                request.prompt, 

                request.model, 

                request.parameters

            )

            

            # Resolve future with response

            future = self.request_futures.get(request.request_id)

            if future and not future.done():

                future.set_result(response)

                

        except Exception as e:

            # Resolve future with exception

            future = self.request_futures.get(request.request_id)

            if future and not future.done():

                future.set_exception(e)

        finally:

            # Clean up future reference

            self.request_futures.pop(request.request_id, None)

    

    async def _mock_llm_api_call(

        self, 

        prompt: str, 

        model: str, 

        parameters: Dict[str, Any]

    ) -> str:

        """Mock LLM API call for testing"""

        # Simulate API latency

        await asyncio.sleep(0.5)

        return f"Response to: {prompt[:50]}..."


class ConnectionPoolManager:

    """

    Manages HTTP connection pools for optimal performance

    with different LLM providers

    """

    

    def __init__(self):

        self.pools: Dict[str, httpx.AsyncClient] = {}

        self.pool_configs = {

            "openai": {

                "limits": httpx.Limits(max_connections=20, max_keepalive_connections=10),

                "timeout": httpx.Timeout(connect=10.0, read=120.0, write=10.0, pool=120.0)

            },

            "anthropic": {

                "limits": httpx.Limits(max_connections=15, max_keepalive_connections=8),

                "timeout": httpx.Timeout(connect=10.0, read=150.0, write=10.0, pool=150.0)

            },

            "default": {

                "limits": httpx.Limits(max_connections=10, max_keepalive_connections=5),

                "timeout": httpx.Timeout(connect=10.0, read=120.0, write=10.0, pool=120.0)

            }

        }

    

    def get_client(self, provider: str) -> httpx.AsyncClient:

        """Get or create HTTP client for provider"""

        if provider not in self.pools:

            config = self.pool_configs.get(provider, self.pool_configs["default"])

            

            self.pools[provider] = httpx.AsyncClient(

                limits=config["limits"],

                timeout=config["timeout"],

                http2=True  # Enable HTTP/2 multiplexing (requires the optional 'h2' dependency: pip install httpx[http2])

            )

        

        return self.pools[provider]

    

    async def close_all(self):

        """Close all connection pools"""

        for client in self.pools.values():

            await client.aclose()

        self.pools.clear()


class ParallelRequestProcessor:

    """

    Processes multiple LLM requests in parallel with

    rate limiting and resource management

    """

    

    def __init__(

        self, 

        max_concurrent_requests: int = 10,

        rate_limit_per_second: float = 5.0

    ):

        self.max_concurrent_requests = max_concurrent_requests

        self.rate_limit_per_second = rate_limit_per_second

        

        self.semaphore = asyncio.Semaphore(max_concurrent_requests)
        self._rate_lock = asyncio.Lock()  # Serializes rate-limit bookkeeping across concurrent tasks
        self.last_request_time = 0.0

    

    async def process_requests(

        self, 

        requests: List[Dict[str, Any]],

        request_function: callable

    ) -> List[Union[str, Exception]]:

        """

        Process multiple requests in parallel with rate limiting

        """

        # Create tasks for all requests

        tasks = []

        for i, request in enumerate(requests):

            task = asyncio.create_task(

                self._process_single_request(

                    request_function, 

                    request, 

                    request_id=f"parallel_{i}"

                )

            )

            tasks.append(task)

        

        # Wait for all tasks to complete

        results = await asyncio.gather(*tasks, return_exceptions=True)

        

        return results

    

    async def _process_single_request(

        self, 

        request_function: callable, 

        request: Dict[str, Any],

        request_id: str

    ) -> str:

        """Process a single request with concurrency and rate limiting"""

        # Apply rate limiting

        await self._apply_rate_limit()

        

        # Apply concurrency limiting

        async with self.semaphore:

            try:

                result = await request_function(**request)

                return result

            except Exception as e:

                # Log error but don't re-raise to allow other requests to continue

                print(f"Request {request_id} failed: {e}")

                return e

    

    async def _apply_rate_limit(self):
        """Apply rate limiting to prevent exceeding API limits"""
        min_interval = 1.0 / self.rate_limit_per_second

        # Serialize the check-and-update so concurrent tasks cannot all read the
        # same last_request_time and burst past the configured rate
        async with self._rate_lock:
            time_since_last = time.time() - self.last_request_time

            if time_since_last < min_interval:
                await asyncio.sleep(min_interval - time_since_last)

            self.last_request_time = time.time()


# Example usage demonstrating performance optimization

async def example_performance_optimization():

    """

    Example demonstrating comprehensive performance optimization

    techniques for LLM integrations

    """

    

    # Initialize performance optimization components

    cache = LRUCache(max_size=500, ttl_seconds=1800)  # 30 minute TTL

    batcher = RequestBatcher(max_batch_size=5, batch_timeout=2.0)

    pool_manager = ConnectionPoolManager()

    parallel_processor = ParallelRequestProcessor(max_concurrent_requests=8)

    

    print("Performance Optimization Example")

    print("=" * 50)

    

    # Example 1: Cache usage

    print("\n1. Cache Usage:")

    

    # Simulate cache misses and hits

    test_prompt = "Explain machine learning"

    test_model = "gpt-3.5-turbo"

    test_params = {"temperature": 0.7, "max_tokens": 100}

    

    # First request - cache miss

    cached_response = cache.get(test_prompt, test_model, test_params)

    print(f"Cache miss: {cached_response is None}")

    

    # Simulate response and cache it

    mock_response = "Machine learning is a subset of artificial intelligence..."

    cache.put(test_prompt, test_model, test_params, mock_response)

    

    # Second request - cache hit

    cached_response = cache.get(test_prompt, test_model, test_params)

    print(f"Cache hit: {cached_response is not None}")

    print(f"Cache stats: {cache.get_stats()}")

    

    # Example 2: Request batching

    print("\n2. Request Batching:")

    

    # Simulate multiple similar requests

    batch_requests = [

        "What is Python?",

        "What is JavaScript?",

        "What is machine learning?",

        "What is web development?",

        "What is data science?"

    ]

    

    start_time = time.time()

    # Submit all requests concurrently so the batcher can actually group them;
    # awaiting each request one at a time would reduce every batch to a single request
    batch_results = await asyncio.gather(*[
        batcher.add_request(
            prompt=prompt,
            model="gpt-3.5-turbo",
            parameters={"temperature": 0.7, "max_tokens": 100}
        )
        for prompt in batch_requests
    ])

    

    batch_time = time.time() - start_time

    print(f"Batched {len(batch_requests)} requests in {batch_time:.2f}s")

    print(f"Average time per request: {batch_time/len(batch_requests):.2f}s")

    

    # Example 3: Parallel processing

    print("\n3. Parallel Processing:")

    

    parallel_requests = [

        {"prompt": "Explain AI", "model": "gpt-3.5-turbo", "max_tokens": 50},

        {"prompt": "What is ML?", "model": "gpt-3.5-turbo", "max_tokens": 50},

        {"prompt": "Define NLP", "model": "gpt-3.5-turbo", "max_tokens": 50},

        {"prompt": "What is computer vision?", "model": "gpt-3.5-turbo", "max_tokens": 50}

    ]

    

    async def mock_llm_request(**kwargs):

        """Mock LLM request for demonstration"""

        await asyncio.sleep(0.5)  # Simulate API latency

        return f"Response to: {kwargs['prompt']}"

    

    start_time = time.time()

    parallel_results = await parallel_processor.process_requests(

        parallel_requests,

        mock_llm_request

    )

    

    parallel_time = time.time() - start_time

    successful_requests = len([r for r in parallel_results if not isinstance(r, Exception)])

    

    print(f"Processed {len(parallel_requests)} requests in parallel: {parallel_time:.2f}s")

    print(f"Successful requests: {successful_requests}/{len(parallel_requests)}")

    print(f"Effective throughput: {successful_requests/parallel_time:.2f} req/s")

    

    # Example 4: Connection pool efficiency

    print("\n4. Connection Pool Management:")

    

    openai_client = pool_manager.get_client("openai")

    anthropic_client = pool_manager.get_client("anthropic")

    

    print("Connection pools initialized for multiple providers")

    print("Reusing connections for better performance")

    

    # Example 5: Performance comparison

    print("\n5. Performance Comparison:")

    

    # Simulate requests with and without optimizations

    print("Without optimizations (sequential requests):")

    start_time = time.time()

    

    for i in range(3):

        await asyncio.sleep(0.5)  # Simulate individual request latency

    

    sequential_time = time.time() - start_time

    print(f"  Sequential time: {sequential_time:.2f}s")

    

    print("With optimizations (cached + batched):")

    start_time = time.time()

    

    # Simulate cached response (instant)

    cached_response = cache.get("cached_prompt", "gpt-3.5-turbo", {"temperature": 0.7})

    if not cached_response:

        # Simulate batch processing

        await asyncio.sleep(0.8)  # Simulate batch request

        cache.put("cached_prompt", "gpt-3.5-turbo", {"temperature": 0.7}, "Cached response")

    

    optimized_time = time.time() - start_time

    print(f"  Optimized time: {optimized_time:.2f}s")

    print(f"  Performance improvement: {((sequential_time - optimized_time) / sequential_time * 100):.1f}%")

    

    # Clean up

    await pool_manager.close_all()

    

    print("\nPerformance optimization demonstration complete")


# Run the performance optimization example

if __name__ == "__main__":

    asyncio.run(example_performance_optimization())



Conclusion


Large Language Model integration represents a transformative capability for modern applications, but successful implementation requires careful attention to architecture, security, performance, and cost management. Throughout this guide, we have explored the comprehensive requirements for building production-ready LLM integrations that are robust, scalable, and maintainable.


The key principles for successful LLM integration include implementing proper abstraction layers that can work across multiple providers, building comprehensive error handling and retry mechanisms, establishing robust security measures to protect against prompt injection and other attacks, implementing intelligent caching and performance optimization strategies, and maintaining thorough monitoring and cost management systems.


As the LLM landscape continues to evolve rapidly, several future considerations will become increasingly important. Multi-modal capabilities are expanding beyond text to include images, audio, and video, requiring new integration patterns and processing pipelines. Edge computing and local model deployment options are improving, potentially reducing reliance on cloud APIs for certain use cases. Fine-tuning and customization capabilities are becoming more accessible, allowing organizations to create specialized models for their specific domains and requirements.


The regulatory landscape around AI and LLM usage is also evolving, with new compliance requirements emerging that will impact how organizations deploy and monitor these systems. Privacy regulations, algorithmic transparency requirements, and ethical AI guidelines will necessitate enhanced logging, audit trails, and explainability features in LLM integrations.


Furthermore, the economics of LLM usage will continue to shift as competition increases among providers and new pricing models emerge. Organizations should build flexibility into their cost management and provider selection strategies to adapt to these changes while optimizing for both performance and cost-effectiveness.


Successful LLM integration is not just about making API calls - it requires a holistic approach that considers technical architecture, operational requirements, security implications, and business objectives. By following the patterns and practices outlined in this guide, developers can build LLM integrations that provide reliable, secure, and cost-effective AI capabilities that scale with their applications and organizations.
