Thursday, October 23, 2025

SYSTEMATIC DESIGN OF SOFTWARE SYSTEMS WITH AI/GENAI INTEGRATION




Introduction: The Paradigm Shift in Software Architecture


The integration of Artificial Intelligence and Generative AI capabilities into traditional software systems represents one of the most significant architectural challenges of our time. Unlike conventional software components that follow deterministic patterns, AI systems introduce probabilistic behavior, require substantial computational resources, and often depend on external services or models that may have varying response times and reliability characteristics.

When we design software systems that incorporate AI functionality, we must fundamentally rethink our approach to architecture. Traditional software engineering principles still apply, but they must be adapted to accommodate the unique characteristics of AI systems. These characteristics include non-deterministic outputs, potential latency variations, model versioning complexities, and the need for continuous learning and adaptation.

The key to successful AI integration lies in treating AI components as first-class citizens in our architecture while maintaining clear separation of concerns. This means designing systems where AI functionality is properly abstracted, easily testable, and can evolve independently of the core business logic. The architecture must be resilient enough to handle AI service failures gracefully while being flexible enough to accommodate rapid changes in AI technology.


Foundational Design Principles for AI-Integrated Systems


The foundation of any well-designed AI-integrated system rests on several core principles that extend traditional software design patterns. The principle of separation of concerns becomes even more critical when dealing with AI components because these components often have different scaling requirements, update cycles, and failure modes compared to traditional business logic.

Dependency inversion is particularly important in AI systems because it allows us to abstract away the specific implementation details of AI models or services. This abstraction enables us to switch between different AI providers, model versions, or even local versus cloud-based implementations without affecting the core application logic. The abstraction also facilitates testing by allowing us to inject mock AI services during development and testing phases.

Single responsibility principle takes on new dimensions in AI systems. Each component should have a clearly defined role, whether it's data preprocessing, model inference, result post-processing, or error handling. This clear delineation makes the system more maintainable and allows different team members to work on different aspects of the AI pipeline without interfering with each other.

The principle of fail-fast becomes crucial when dealing with AI systems because AI operations can be computationally expensive and time-consuming. It's better to validate inputs and preconditions early rather than discovering problems after expensive AI processing has already begun.


Core Architectural Patterns for AI Integration


The Adapter Pattern proves invaluable when integrating multiple AI services or models into a single system. Different AI providers often have varying APIs, data formats, and response structures. The Adapter Pattern allows us to create a unified interface that abstracts these differences, making it easier to switch between providers or use multiple providers simultaneously.

Let me illustrate this with a detailed code example. The following implementation demonstrates how to create an adapter pattern for different text generation services. The code defines a common interface that all AI text generators must implement, regardless of their underlying technology or provider. This interface includes a method for generating text and another for checking service health.


from abc import ABC, abstractmethod

from typing import Dict, Any, Optional

import asyncio


class TextGeneratorInterface(ABC):

    @abstractmethod

    async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:

        pass

    

    @abstractmethod

    async def health_check(self) -> bool:

        pass


class OpenAIAdapter(TextGeneratorInterface):

    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):

        self.api_key = api_key

        self.model = model

        self.client = None  # Initialize OpenAI client here

    

    async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:

        try:

            # Transform parameters to OpenAI format

            openai_params = self._transform_parameters(parameters)

            

            # Make API call to OpenAI

            response = await self.client.chat.completions.create(

                model=self.model,

                messages=[{"role": "user", "content": prompt}],

                **openai_params

            )

            

            return response.choices[0].message.content

        except Exception as e:

            raise AIServiceException(f"OpenAI generation failed: {str(e)}")

    

    async def health_check(self) -> bool:

        try:

            # Simple health check by making a minimal API call

            await self.generate_text("Hello", {"max_tokens": 1})

            return True

        except:

            return False

    

    def _transform_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:

        # Transform generic parameters to OpenAI-specific format

        openai_params = {}

        if "max_length" in parameters:

            openai_params["max_tokens"] = parameters["max_length"]

        if "temperature" in parameters:

            openai_params["temperature"] = parameters["temperature"]

        return openai_params


class AnthropicAdapter(TextGeneratorInterface):

    def __init__(self, api_key: str, model: str = "claude-3-sonnet"):

        self.api_key = api_key

        self.model = model

        self.client = None  # Initialize Anthropic client here

    

    async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:

        try:

            # Transform parameters to Anthropic format

            anthropic_params = self._transform_parameters(parameters)

            

            # Make API call to Anthropic

            response = await self.client.messages.create(

                model=self.model,

                messages=[{"role": "user", "content": prompt}],

                **anthropic_params

            )

            

            return response.content[0].text

        except Exception as e:

            raise AIServiceException(f"Anthropic generation failed: {str(e)}")

    

    async def health_check(self) -> bool:

        try:

            await self.generate_text("Hello", {"max_tokens": 1})

            return True

        except:

            return False

    

    def _transform_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:

        # Transform generic parameters to Anthropic-specific format

        anthropic_params = {}

        if "max_length" in parameters:

            anthropic_params["max_tokens"] = parameters["max_length"]

        if "temperature" in parameters:

            anthropic_params["temperature"] = parameters["temperature"]

        return anthropic_params


class AIServiceException(Exception):

    pass


This adapter implementation provides several important benefits. First, it allows the application to work with multiple AI providers through a single, consistent interface. The application code doesn't need to know whether it's talking to OpenAI, Anthropic, or any other provider. Second, each adapter handles the specific parameter transformations and error handling required for its respective service. Third, the health check method provides a way to monitor the availability of each service independently.

The Strategy Pattern complements the Adapter Pattern by allowing dynamic selection of AI services based on runtime conditions. This pattern is particularly useful when you want to choose different AI models based on factors such as cost, performance requirements, or current service availability.

Building on the previous example, here's how we can implement a strategy pattern that intelligently selects the appropriate AI service based on current conditions:


from enum import Enum

from typing import List, Optional

import time


class ServicePriority(Enum):

    COST_OPTIMIZED = "cost_optimized"

    PERFORMANCE_OPTIMIZED = "performance_optimized"

    RELIABILITY_OPTIMIZED = "reliability_optimized"


class AIServiceStrategy:

    def __init__(self, adapters: List[TextGeneratorInterface], priority: ServicePriority):

        self.adapters = adapters

        self.priority = priority

        self.service_metrics = {}

        self._initialize_metrics()

    

    def _initialize_metrics(self):

        for adapter in self.adapters:

            self.service_metrics[adapter.__class__.__name__] = {

                "response_times": [],

                "success_rate": 1.0,

                "last_health_check": time.time(),

                "is_healthy": True,

                "cost_per_request": self._get_cost_estimate(adapter)

            }

    

    def _get_cost_estimate(self, adapter) -> float:

        # Return estimated cost per request for different providers

        cost_mapping = {

            "OpenAIAdapter": 0.002,

            "AnthropicAdapter": 0.003,

            "LocalModelAdapter": 0.0001

        }

        return cost_mapping.get(adapter.__class__.__name__, 0.001)

    

    async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:

        selected_adapter = await self._select_optimal_adapter()

        

        if not selected_adapter:

            raise AIServiceException("No healthy AI services available")

        

        start_time = time.time()

        try:

            result = await selected_adapter.generate_text(prompt, parameters)

            self._record_success(selected_adapter, time.time() - start_time)

            return result

        except Exception as e:

            self._record_failure(selected_adapter)

            # Try fallback adapter if available

            fallback_adapter = await self._get_fallback_adapter(selected_adapter)

            if fallback_adapter:

                return await fallback_adapter.generate_text(prompt, parameters)

            raise e

    

    async def _select_optimal_adapter(self) -> Optional[TextGeneratorInterface]:

        healthy_adapters = []

        

        # Check health of all adapters

        for adapter in self.adapters:

            metrics = self.service_metrics[adapter.__class__.__name__]

            

            # Perform health check if it's been too long since last check

            if time.time() - metrics["last_health_check"] > 300:  # 5 minutes

                metrics["is_healthy"] = await adapter.health_check()

                metrics["last_health_check"] = time.time()

            

            if metrics["is_healthy"]:

                healthy_adapters.append(adapter)

        

        if not healthy_adapters:

            return None

        

        # Select based on priority strategy

        if self.priority == ServicePriority.COST_OPTIMIZED:

            return self._select_cheapest(healthy_adapters)

        elif self.priority == ServicePriority.PERFORMANCE_OPTIMIZED:

            return self._select_fastest(healthy_adapters)

        elif self.priority == ServicePriority.RELIABILITY_OPTIMIZED:

            return self._select_most_reliable(healthy_adapters)

        

        return healthy_adapters[0]  # Default fallback

    

    def _select_cheapest(self, adapters: List[TextGeneratorInterface]) -> TextGeneratorInterface:

        return min(adapters, key=lambda a: self.service_metrics[a.__class__.__name__]["cost_per_request"])

    

    def _select_fastest(self, adapters: List[TextGeneratorInterface]) -> TextGeneratorInterface:

        def average_response_time(adapter):

            metrics = self.service_metrics[adapter.__class__.__name__]

            times = metrics["response_times"]

            return sum(times) / len(times) if times else float('inf')

        

        return min(adapters, key=average_response_time)

    

    def _select_most_reliable(self, adapters: List[TextGeneratorInterface]) -> TextGeneratorInterface:

        return max(adapters, key=lambda a: self.service_metrics[a.__class__.__name__]["success_rate"])

    

    async def _get_fallback_adapter(self, failed_adapter: TextGeneratorInterface) -> Optional[TextGeneratorInterface]:

        remaining_adapters = [a for a in self.adapters if a != failed_adapter]

        if remaining_adapters:

            # Simple fallback: return the most reliable remaining adapter

            return self._select_most_reliable(remaining_adapters)

        return None

    

    def _record_success(self, adapter: TextGeneratorInterface, response_time: float):

        metrics = self.service_metrics[adapter.__class__.__name__]

        metrics["response_times"].append(response_time)

        

        # Keep only last 100 response times

        if len(metrics["response_times"]) > 100:

            metrics["response_times"] = metrics["response_times"][-100:]

        

        # Update success rate (simple moving average)

        current_rate = metrics["success_rate"]

        metrics["success_rate"] = (current_rate * 0.9) + (1.0 * 0.1)

    

    def _record_failure(self, adapter: TextGeneratorInterface):

        metrics = self.service_metrics[adapter.__class__.__name__]

        current_rate = metrics["success_rate"]

        metrics["success_rate"] = (current_rate * 0.9) + (0.0 * 0.1)

        metrics["is_healthy"] = False


This strategy implementation provides intelligent service selection based on different optimization criteria. The system continuously monitors the performance and reliability of each service, adapting its selection strategy based on real-world performance data. The metrics collection allows the system to learn from experience and make increasingly better decisions about which service to use for each request.


The AI Service Layer Pattern


The AI Service Layer Pattern represents a crucial architectural component that encapsulates all AI-related functionality behind a clean, well-defined interface. This pattern serves as a bridge between your application's business logic and the underlying AI infrastructure, providing a stable API that remains consistent even as the underlying AI implementations change.

The service layer should handle several key responsibilities. It manages the lifecycle of AI operations, including initialization, execution, and cleanup. It provides a consistent error handling mechanism that translates AI-specific errors into application-domain errors. It implements caching strategies to improve performance and reduce costs. It manages configuration and parameter validation to ensure that AI operations receive valid inputs.

Here's a comprehensive implementation of an AI service layer that demonstrates these principles:


import asyncio

from typing import Dict, Any, Optional, List

from dataclasses import dataclass

from datetime import datetime, timedelta

import hashlib

import json


@dataclass

class AIRequest:

    operation_type: str

    input_data: Dict[str, Any]

    parameters: Dict[str, Any]

    user_id: Optional[str] = None

    session_id: Optional[str] = None

    priority: int = 1

    timeout_seconds: int = 30


@dataclass

class AIResponse:

    result: Any

    confidence_score: Optional[float] = None

    processing_time: float = 0.0

    model_version: Optional[str] = None

    cached: bool = False

    metadata: Dict[str, Any] = None


class AIServiceLayer:

    def __init__(self, strategy: AIServiceStrategy, cache_manager=None, config_validator=None):

        self.strategy = strategy

        self.cache_manager = cache_manager or SimpleCacheManager()

        self.config_validator = config_validator or ConfigValidator()

        self.operation_handlers = {}

        self._register_default_handlers()

    

    def _register_default_handlers(self):

        self.operation_handlers = {

            "text_generation": self._handle_text_generation,

            "text_classification": self._handle_text_classification,

            "sentiment_analysis": self._handle_sentiment_analysis,

            "summarization": self._handle_summarization

        }

    

    async def process_request(self, request: AIRequest) -> AIResponse:

        # Validate the request

        validation_result = self.config_validator.validate_request(request)

        if not validation_result.is_valid:

            raise AIServiceException(f"Invalid request: {validation_result.error_message}")

        

        # Check cache first

        cache_key = self._generate_cache_key(request)

        cached_result = await self.cache_manager.get(cache_key)

        if cached_result:

            return AIResponse(

                result=cached_result["result"],

                confidence_score=cached_result.get("confidence_score"),

                processing_time=0.0,

                model_version=cached_result.get("model_version"),

                cached=True,

                metadata=cached_result.get("metadata", {})

            )

        

        # Process the request

        start_time = time.time()

        try:

            handler = self.operation_handlers.get(request.operation_type)

            if not handler:

                raise AIServiceException(f"Unsupported operation type: {request.operation_type}")

            

            result = await asyncio.wait_for(

                handler(request),

                timeout=request.timeout_seconds

            )

            

            processing_time = time.time() - start_time

            

            # Cache the result

            cache_data = {

                "result": result.result,

                "confidence_score": result.confidence_score,

                "model_version": result.model_version,

                "metadata": result.metadata or {}

            }

            await self.cache_manager.set(cache_key, cache_data, ttl=3600)  # 1 hour TTL

            

            result.processing_time = processing_time

            return result

            

        except asyncio.TimeoutError:

            raise AIServiceException(f"Request timeout after {request.timeout_seconds} seconds")

        except Exception as e:

            processing_time = time.time() - start_time

            await self._log_error(request, e, processing_time)

            raise AIServiceException(f"AI processing failed: {str(e)}")

    

    async def _handle_text_generation(self, request: AIRequest) -> AIResponse:

        prompt = request.input_data.get("prompt")

        if not prompt:

            raise AIServiceException("Prompt is required for text generation")

        

        # Preprocess the prompt if needed

        processed_prompt = self._preprocess_prompt(prompt, request.parameters)

        

        # Generate text using the strategy

        result = await self.strategy.generate_text(processed_prompt, request.parameters)

        

        # Post-process the result

        processed_result = self._postprocess_text_result(result, request.parameters)

        

        return AIResponse(

            result=processed_result,

            confidence_score=self._calculate_confidence(result),

            model_version=self._get_current_model_version(),

            metadata={"original_length": len(prompt), "generated_length": len(result)}

        )

    

    async def _handle_text_classification(self, request: AIRequest) -> AIResponse:

        text = request.input_data.get("text")

        categories = request.parameters.get("categories", [])

        

        if not text:

            raise AIServiceException("Text is required for classification")

        if not categories:

            raise AIServiceException("Categories are required for classification")

        

        # Create a classification prompt

        classification_prompt = self._create_classification_prompt(text, categories)

        

        # Use the text generation strategy for classification

        result = await self.strategy.generate_text(classification_prompt, {

            "temperature": 0.1,  # Low temperature for more deterministic results

            "max_length": 50

        })

        

        # Parse the classification result

        parsed_result = self._parse_classification_result(result, categories)

        

        return AIResponse(

            result=parsed_result,

            confidence_score=parsed_result.get("confidence", 0.0),

            model_version=self._get_current_model_version(),

            metadata={"categories_count": len(categories), "text_length": len(text)}

        )

    

    async def _handle_sentiment_analysis(self, request: AIRequest) -> AIResponse:

        text = request.input_data.get("text")

        if not text:

            raise AIServiceException("Text is required for sentiment analysis")

        

        sentiment_prompt = f"Analyze the sentiment of the following text and respond with only 'positive', 'negative', or 'neutral', followed by a confidence score from 0 to 1:\n\n{text}"

        

        result = await self.strategy.generate_text(sentiment_prompt, {

            "temperature": 0.1,

            "max_length": 20

        })

        

        parsed_sentiment = self._parse_sentiment_result(result)

        

        return AIResponse(

            result=parsed_sentiment,

            confidence_score=parsed_sentiment.get("confidence", 0.0),

            model_version=self._get_current_model_version(),

            metadata={"text_length": len(text)}

        )

    

    async def _handle_summarization(self, request: AIRequest) -> AIResponse:

        text = request.input_data.get("text")

        max_summary_length = request.parameters.get("max_summary_length", 150)

        

        if not text:

            raise AIServiceException("Text is required for summarization")

        

        summary_prompt = f"Summarize the following text in no more than {max_summary_length} words:\n\n{text}"

        

        result = await self.strategy.generate_text(summary_prompt, {

            "temperature": 0.3,

            "max_length": max_summary_length * 2  # Account for tokens vs words

        })

        

        return AIResponse(

            result={"summary": result.strip()},

            confidence_score=0.8,  # Summarization typically has good confidence

            model_version=self._get_current_model_version(),

            metadata={

                "original_length": len(text),

                "summary_length": len(result),

                "compression_ratio": len(result) / len(text)

            }

        )

    

    def _generate_cache_key(self, request: AIRequest) -> str:

        # Create a deterministic cache key based on request content

        key_data = {

            "operation_type": request.operation_type,

            "input_data": request.input_data,

            "parameters": request.parameters

        }

        key_string = json.dumps(key_data, sort_keys=True)

        return hashlib.md5(key_string.encode()).hexdigest()

    

    def _preprocess_prompt(self, prompt: str, parameters: Dict[str, Any]) -> str:

        # Add any necessary preprocessing logic

        if parameters.get("add_context", False):

            context = parameters.get("context", "")

            return f"Context: {context}\n\nPrompt: {prompt}"

        return prompt

    

    def _postprocess_text_result(self, result: str, parameters: Dict[str, Any]) -> str:

        # Add any necessary postprocessing logic

        if parameters.get("trim_whitespace", True):

            result = result.strip()

        if parameters.get("max_output_length"):

            max_length = parameters["max_output_length"]

            if len(result) > max_length:

                result = result[:max_length].rsplit(' ', 1)[0] + "..."

        return result

    

    def _calculate_confidence(self, result: str) -> float:

        # Simple confidence calculation based on result characteristics

        # In a real implementation, this might use model-specific confidence scores

        if len(result) < 10:

            return 0.3

        elif len(result) > 1000:

            return 0.9

        else:

            return 0.7

    

    def _get_current_model_version(self) -> str:

        # Return the current model version being used

        return "v1.0.0"  # This would be dynamic in a real implementation

    

    def _create_classification_prompt(self, text: str, categories: List[str]) -> str:

        categories_str = ", ".join(categories)

        return f"Classify the following text into one of these categories: {categories_str}\n\nText: {text}\n\nCategory:"

    

    def _parse_classification_result(self, result: str, categories: List[str]) -> Dict[str, Any]:

        # Simple parsing logic - in practice, this would be more sophisticated

        result_lower = result.lower().strip()

        for category in categories:

            if category.lower() in result_lower:

                return {

                    "category": category,

                    "confidence": 0.8,

                    "raw_result": result

                }

        return {

            "category": "unknown",

            "confidence": 0.1,

            "raw_result": result

        }

    

    def _parse_sentiment_result(self, result: str) -> Dict[str, Any]:

        # Parse sentiment analysis result

        result_lower = result.lower().strip()

        if "positive" in result_lower:

            sentiment = "positive"

        elif "negative" in result_lower:

            sentiment = "negative"

        elif "neutral" in result_lower:

            sentiment = "neutral"

        else:

            sentiment = "unknown"

        

        # Try to extract confidence score

        confidence = 0.5  # Default confidence

        import re

        confidence_match = re.search(r'(\d+\.?\d*)', result)

        if confidence_match:

            try:

                confidence = float(confidence_match.group(1))

                if confidence > 1:

                    confidence = confidence / 100  # Convert percentage to decimal

            except ValueError:

                pass

        

        return {

            "sentiment": sentiment,

            "confidence": confidence,

            "raw_result": result

        }

    

    async def _log_error(self, request: AIRequest, error: Exception, processing_time: float):

        # Log error details for monitoring and debugging

        error_data = {

            "operation_type": request.operation_type,

            "error_type": type(error).__name__,

            "error_message": str(error),

            "processing_time": processing_time,

            "user_id": request.user_id,

            "session_id": request.session_id,

            "timestamp": datetime.now().isoformat()

        }

        # In a real implementation, this would log to your monitoring system

        print(f"AI Service Error: {json.dumps(error_data, indent=2)}")


class SimpleCacheManager:

    def __init__(self):

        self.cache = {}

        self.expiry_times = {}

    

    async def get(self, key: str) -> Optional[Dict[str, Any]]:

        if key in self.cache:

            if datetime.now() < self.expiry_times[key]:

                return self.cache[key]

            else:

                # Expired, remove from cache

                del self.cache[key]

                del self.expiry_times[key]

        return None

    

    async def set(self, key: str, value: Dict[str, Any], ttl: int):

        self.cache[key] = value

        self.expiry_times[key] = datetime.now() + timedelta(seconds=ttl)


class ConfigValidator:

    def validate_request(self, request: AIRequest):

        # Implement validation logic

        if not request.operation_type:

            return ValidationResult(False, "Operation type is required")

        if not request.input_data:

            return ValidationResult(False, "Input data is required")

        return ValidationResult(True, "")


@dataclass

class ValidationResult:

    is_valid: bool

    error_message: str


This AI Service Layer implementation provides a comprehensive foundation for AI integration. The service layer abstracts the complexity of AI operations behind a clean interface, handles caching to improve performance, validates inputs to prevent errors, and provides consistent error handling across all AI operations. The modular design allows for easy extension with new operation types and provides hooks for monitoring and logging.


Data Pipeline Architecture for AI Systems


The data pipeline represents the backbone of any AI-integrated system, responsible for ingesting, processing, transforming, and delivering data to AI models in the correct format and at the right time. Unlike traditional data pipelines, AI data pipelines must handle additional complexities such as feature engineering, data versioning, and real-time streaming requirements.

A well-designed AI data pipeline follows the principle of immutable data transformations, where each stage of the pipeline produces new data artifacts without modifying the original input. This approach enables reproducibility, simplifies debugging, and allows for easy rollback when issues are discovered. The pipeline should also implement proper data validation at each stage to ensure data quality and catch issues early in the process.

The following implementation demonstrates a comprehensive data pipeline architecture designed specifically for AI workloads:


import asyncio

from abc import ABC, abstractmethod

from typing import Any, Dict, List, Optional, Callable, AsyncGenerator

from dataclasses import dataclass, field

from datetime import datetime

import json

import hashlib

from enum import Enum


class DataFormat(Enum):

    JSON = "json"

    CSV = "csv"

    PARQUET = "parquet"

    TEXT = "text"

    BINARY = "binary"


class PipelineStage(Enum):

    INGESTION = "ingestion"

    VALIDATION = "validation"

    TRANSFORMATION = "transformation"

    FEATURE_ENGINEERING = "feature_engineering"

    MODEL_PREPARATION = "model_preparation"

    OUTPUT = "output"


@dataclass

class DataArtifact:

    id: str

    data: Any

    format: DataFormat

    metadata: Dict[str, Any] = field(default_factory=dict)

    created_at: datetime = field(default_factory=datetime.now)

    stage: PipelineStage = PipelineStage.INGESTION

    parent_ids: List[str] = field(default_factory=list)

    validation_results: Dict[str, Any] = field(default_factory=dict)

    

    def __post_init__(self):

        if not self.id:

            self.id = self._generate_id()

    

    def _generate_id(self) -> str:

        content_hash = hashlib.md5(str(self.data).encode()).hexdigest()[:8]

        timestamp = self.created_at.strftime("%Y%m%d_%H%M%S")

        return f"{self.stage.value}_{timestamp}_{content_hash}"


class PipelineProcessor(ABC):

    def __init__(self, name: str, stage: PipelineStage):

        self.name = name

        self.stage = stage

        self.metrics = {

            "processed_count": 0,

            "error_count": 0,

            "total_processing_time": 0.0

        }

    

    @abstractmethod

    async def process(self, artifact: DataArtifact) -> DataArtifact:

        pass

    

    async def execute(self, artifact: DataArtifact) -> DataArtifact:

        start_time = time.time()

        try:

            result = await self.process(artifact)

            result.stage = self.stage

            result.parent_ids.append(artifact.id)

            

            processing_time = time.time() - start_time

            self.metrics["processed_count"] += 1

            self.metrics["total_processing_time"] += processing_time

            

            await self._log_success(artifact, result, processing_time)

            return result

            

        except Exception as e:

            self.metrics["error_count"] += 1

            await self._log_error(artifact, e, time.time() - start_time)

            raise PipelineException(f"Processor {self.name} failed: {str(e)}")

    

    async def _log_success(self, input_artifact: DataArtifact, output_artifact: DataArtifact, processing_time: float):

        log_data = {

            "processor": self.name,

            "stage": self.stage.value,

            "input_id": input_artifact.id,

            "output_id": output_artifact.id,

            "processing_time": processing_time,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Pipeline Success: {json.dumps(log_data)}")

    

    async def _log_error(self, artifact: DataArtifact, error: Exception, processing_time: float):

        log_data = {

            "processor": self.name,

            "stage": self.stage.value,

            "artifact_id": artifact.id,

            "error_type": type(error).__name__,

            "error_message": str(error),

            "processing_time": processing_time,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Pipeline Error: {json.dumps(log_data)}")


class DataIngestionProcessor(PipelineProcessor):

    def __init__(self, source_config: Dict[str, Any]):

        super().__init__("DataIngestion", PipelineStage.INGESTION)

        self.source_config = source_config

    

    async def process(self, artifact: DataArtifact) -> DataArtifact:

        # Simulate data ingestion from various sources

        source_type = self.source_config.get("type", "unknown")

        

        if source_type == "api":

            return await self._ingest_from_api(artifact)

        elif source_type == "database":

            return await self._ingest_from_database(artifact)

        elif source_type == "file":

            return await self._ingest_from_file(artifact)

        else:

            # Pass through for already ingested data

            return artifact

    

    async def _ingest_from_api(self, artifact: DataArtifact) -> DataArtifact:

        # Simulate API data ingestion

        api_url = self.source_config.get("url")

        headers = self.source_config.get("headers", {})

        

        # In a real implementation, this would make actual HTTP requests

        simulated_data = {

            "source": "api",

            "url": api_url,

            "data": artifact.data,

            "ingested_at": datetime.now().isoformat()

        }

        

        return DataArtifact(

            id="",

            data=simulated_data,

            format=DataFormat.JSON,

            metadata={

                "source_type": "api",

                "source_url": api_url,

                "ingestion_method": "http_request"

            },

            stage=PipelineStage.INGESTION

        )

    

    async def _ingest_from_database(self, artifact: DataArtifact) -> DataArtifact:

        # Simulate database data ingestion

        connection_string = self.source_config.get("connection_string")

        query = self.source_config.get("query")

        

        simulated_data = {

            "source": "database",

            "query": query,

            "data": artifact.data,

            "ingested_at": datetime.now().isoformat()

        }

        

        return DataArtifact(

            id="",

            data=simulated_data,

            format=DataFormat.JSON,

            metadata={

                "source_type": "database",

                "query": query,

                "ingestion_method": "sql_query"

            },

            stage=PipelineStage.INGESTION

        )

    

    async def _ingest_from_file(self, artifact: DataArtifact) -> DataArtifact:

        # Simulate file data ingestion

        file_path = self.source_config.get("path")

        file_format = self.source_config.get("format", "json")

        

        simulated_data = {

            "source": "file",

            "path": file_path,

            "data": artifact.data,

            "ingested_at": datetime.now().isoformat()

        }

        

        return DataArtifact(

            id="",

            data=simulated_data,

            format=DataFormat(file_format),

            metadata={

                "source_type": "file",

                "file_path": file_path,

                "file_format": file_format,

                "ingestion_method": "file_read"

            },

            stage=PipelineStage.INGESTION

        )


class DataValidationProcessor(PipelineProcessor):

    def __init__(self, validation_rules: List[Dict[str, Any]]):

        super().__init__("DataValidation", PipelineStage.VALIDATION)

        self.validation_rules = validation_rules

    

    async def process(self, artifact: DataArtifact) -> DataArtifact:

        validation_results = {}

        

        for rule in self.validation_rules:

            rule_name = rule["name"]

            rule_type = rule["type"]

            rule_config = rule.get("config", {})

            

            try:

                if rule_type == "schema_validation":

                    result = await self._validate_schema(artifact, rule_config)

                elif rule_type == "data_quality":

                    result = await self._validate_data_quality(artifact, rule_config)

                elif rule_type == "business_rules":

                    result = await self._validate_business_rules(artifact, rule_config)

                else:

                    result = {"valid": False, "error": f"Unknown rule type: {rule_type}"}

                

                validation_results[rule_name] = result

                

            except Exception as e:

                validation_results[rule_name] = {

                    "valid": False,

                    "error": str(e),

                    "exception_type": type(e).__name__

                }

        

        # Check if all validations passed

        all_valid = all(result.get("valid", False) for result in validation_results.values())

        

        if not all_valid:

            failed_rules = [name for name, result in validation_results.items() if not result.get("valid", False)]

            raise PipelineException(f"Data validation failed for rules: {failed_rules}")

        

        # Create new artifact with validation results

        validated_artifact = DataArtifact(

            id="",

            data=artifact.data,

            format=artifact.format,

            metadata={**artifact.metadata, "validation_passed": True},

            stage=PipelineStage.VALIDATION,

            validation_results=validation_results

        )

        

        return validated_artifact

    

    async def _validate_schema(self, artifact: DataArtifact, config: Dict[str, Any]) -> Dict[str, Any]:

        required_fields = config.get("required_fields", [])

        field_types = config.get("field_types", {})

        

        if not isinstance(artifact.data, dict):

            return {"valid": False, "error": "Data must be a dictionary for schema validation"}

        

        # Check required fields

        missing_fields = [field for field in required_fields if field not in artifact.data]

        if missing_fields:

            return {"valid": False, "error": f"Missing required fields: {missing_fields}"}

        

        # Check field types

        type_errors = []

        for field, expected_type in field_types.items():

            if field in artifact.data:

                actual_type = type(artifact.data[field]).__name__

                if actual_type != expected_type:

                    type_errors.append(f"{field}: expected {expected_type}, got {actual_type}")

        

        if type_errors:

            return {"valid": False, "error": f"Type validation errors: {type_errors}"}

        

        return {"valid": True, "message": "Schema validation passed"}

    

    async def _validate_data_quality(self, artifact: DataArtifact, config: Dict[str, Any]) -> Dict[str, Any]:

        min_records = config.get("min_records", 0)

        max_null_percentage = config.get("max_null_percentage", 100)

        

        if isinstance(artifact.data, list):

            record_count = len(artifact.data)

            if record_count < min_records:

                return {"valid": False, "error": f"Insufficient records: {record_count} < {min_records}"}

        

        # Additional quality checks would go here

        return {"valid": True, "message": "Data quality validation passed"}

    

    async def _validate_business_rules(self, artifact: DataArtifact, config: Dict[str, Any]) -> Dict[str, Any]:

        # Implement custom business rule validation

        rules = config.get("rules", [])

        

        for rule in rules:

            rule_expression = rule.get("expression")

            rule_description = rule.get("description", "Unknown rule")

            

            # Simple rule evaluation (in practice, this would be more sophisticated)

            if rule_expression and not self._evaluate_rule(artifact.data, rule_expression):

                return {"valid": False, "error": f"Business rule failed: {rule_description}"}

        

        return {"valid": True, "message": "Business rules validation passed"}

    

    def _evaluate_rule(self, data: Any, expression: str) -> bool:

        # Simple rule evaluation - in practice, this would use a proper expression evaluator

        # For demonstration purposes, we'll just return True

        return True


class FeatureEngineeringProcessor(PipelineProcessor):

    def __init__(self, feature_config: Dict[str, Any]):

        super().__init__("FeatureEngineering", PipelineStage.FEATURE_ENGINEERING)

        self.feature_config = feature_config

    

    async def process(self, artifact: DataArtifact) -> DataArtifact:

        if not isinstance(artifact.data, dict):

            raise PipelineException("Feature engineering requires dictionary data")

        

        engineered_features = {}

        original_data = artifact.data.copy()

        

        # Apply feature transformations

        for feature_name, feature_def in self.feature_config.items():

            try:

                feature_value = await self._create_feature(original_data, feature_def)

                engineered_features[feature_name] = feature_value

            except Exception as e:

                raise PipelineException(f"Failed to create feature {feature_name}: {str(e)}")

        

        # Combine original data with engineered features

        enhanced_data = {**original_data, **engineered_features}

        

        return DataArtifact(

            id="",

            data=enhanced_data,

            format=artifact.format,

            metadata={

                **artifact.metadata,

                "feature_count": len(engineered_features),

                "feature_names": list(engineered_features.keys())

            },

            stage=PipelineStage.FEATURE_ENGINEERING

        )

    

    async def _create_feature(self, data: Dict[str, Any], feature_def: Dict[str, Any]) -> Any:

        feature_type = feature_def.get("type")

        

        if feature_type == "text_length":

            field_name = feature_def["source_field"]

            text_value = data.get(field_name, "")

            return len(str(text_value))

        

        elif feature_type == "word_count":

            field_name = feature_def["source_field"]

            text_value = data.get(field_name, "")

            return len(str(text_value).split())

        

        elif feature_type == "categorical_encoding":

            field_name = feature_def["source_field"]

            mapping = feature_def["mapping"]

            value = data.get(field_name)

            return mapping.get(value, mapping.get("default", 0))

        

        elif feature_type == "numerical_binning":

            field_name = feature_def["source_field"]

            bins = feature_def["bins"]

            value = data.get(field_name, 0)

            

            for i, bin_threshold in enumerate(bins):

                if value <= bin_threshold:

                    return i

            return len(bins)

        

        elif feature_type == "text_sentiment_score":

            field_name = feature_def["source_field"]

            text_value = data.get(field_name, "")

            # Simplified sentiment scoring

            positive_words = ["good", "great", "excellent", "amazing", "wonderful"]

            negative_words = ["bad", "terrible", "awful", "horrible", "disappointing"]

            

            text_lower = str(text_value).lower()

            positive_count = sum(1 for word in positive_words if word in text_lower)

            negative_count = sum(1 for word in negative_words if word in text_lower)

            

            return positive_count - negative_count

        

        else:

            raise PipelineException(f"Unknown feature type: {feature_type}")


class AIPipelineOrchestrator:

    def __init__(self):

        self.processors: List[PipelineProcessor] = []

        self.pipeline_metadata = {

            "created_at": datetime.now(),

            "version": "1.0.0",

            "total_executions": 0,

            "successful_executions": 0,

            "failed_executions": 0

        }

    

    def add_processor(self, processor: PipelineProcessor):

        self.processors.append(processor)

    

    async def execute_pipeline(self, initial_data: Any, data_format: DataFormat = DataFormat.JSON) -> DataArtifact:

        # Create initial artifact

        current_artifact = DataArtifact(

            id="",

            data=initial_data,

            format=data_format,

            metadata={"pipeline_version": self.pipeline_metadata["version"]},

            stage=PipelineStage.INGESTION

        )

        

        self.pipeline_metadata["total_executions"] += 1

        

        try:

            # Execute each processor in sequence

            for processor in self.processors:

                current_artifact = await processor.execute(current_artifact)

                await self._checkpoint_artifact(current_artifact)

            

            self.pipeline_metadata["successful_executions"] += 1

            await self._log_pipeline_success(current_artifact)

            return current_artifact

            

        except Exception as e:

            self.pipeline_metadata["failed_executions"] += 1

            await self._log_pipeline_failure(current_artifact, e)

            raise PipelineException(f"Pipeline execution failed: {str(e)}")

    

    async def execute_pipeline_stream(self, data_stream: AsyncGenerator[Any, None], data_format: DataFormat = DataFormat.JSON) -> AsyncGenerator[DataArtifact, None]:

        async for data_item in data_stream:

            try:

                result = await self.execute_pipeline(data_item, data_format)

                yield result

            except Exception as e:

                # Log error but continue processing other items

                await self._log_stream_error(data_item, e)

                continue

    

    async def _checkpoint_artifact(self, artifact: DataArtifact):

        # In a real implementation, this would save the artifact to persistent storage

        checkpoint_data = {

            "artifact_id": artifact.id,

            "stage": artifact.stage.value,

            "timestamp": datetime.now().isoformat(),

            "metadata": artifact.metadata

        }

        print(f"Checkpoint: {json.dumps(checkpoint_data)}")

    

    async def _log_pipeline_success(self, final_artifact: DataArtifact):

        log_data = {

            "pipeline_status": "success",

            "final_artifact_id": final_artifact.id,

            "final_stage": final_artifact.stage.value,

            "execution_count": self.pipeline_metadata["total_executions"],

            "timestamp": datetime.now().isoformat()

        }

        print(f"Pipeline Success: {json.dumps(log_data)}")

    

    async def _log_pipeline_failure(self, artifact: DataArtifact, error: Exception):

        log_data = {

            "pipeline_status": "failure",

            "failed_artifact_id": artifact.id,

            "failed_stage": artifact.stage.value,

            "error_type": type(error).__name__,

            "error_message": str(error),

            "execution_count": self.pipeline_metadata["total_executions"],

            "timestamp": datetime.now().isoformat()

        }

        print(f"Pipeline Failure: {json.dumps(log_data)}")

    

    async def _log_stream_error(self, data_item: Any, error: Exception):

        log_data = {

            "stream_error": True,

            "error_type": type(error).__name__,

            "error_message": str(error),

            "timestamp": datetime.now().isoformat()

        }

        print(f"Stream Error: {json.dumps(log_data)}")

    

    def get_pipeline_metrics(self) -> Dict[str, Any]:

        processor_metrics = {}

        for processor in self.processors:

            processor_metrics[processor.name] = processor.metrics

        

        return {

            "pipeline_metadata": self.pipeline_metadata,

            "processor_metrics": processor_metrics

        }


class PipelineException(Exception):

    pass


This data pipeline architecture provides a robust foundation for AI data processing. The pipeline supports multiple data formats, implements comprehensive validation and feature engineering capabilities, and provides detailed logging and metrics collection. The modular design allows for easy extension with new processors, and the checkpoint system enables recovery from failures. The streaming support allows for real-time data processing, which is essential for many AI applications.


Error Handling and Resilience Patterns

Error handling in AI-integrated systems requires a sophisticated approach that goes beyond traditional software error handling. AI systems introduce unique failure modes such as model inference timeouts, unexpected model outputs, rate limiting from external AI services, and gradual model performance degradation. A resilient AI system must anticipate these failures and implement appropriate recovery strategies.

The Circuit Breaker Pattern is particularly valuable in AI systems because it prevents cascading failures when AI services become unavailable or start responding slowly. When an AI service begins failing, the circuit breaker opens and prevents further requests from reaching the failing service, instead returning cached results or fallback responses. This approach protects the overall system stability while allowing the failing service time to recover.

Here's a comprehensive implementation of error handling and resilience patterns specifically designed for AI systems:


import asyncio

import time

from enum import Enum

from typing import Dict, Any, Optional, Callable, List

from dataclasses import dataclass, field

from datetime import datetime, timedelta

import random

import json


class CircuitBreakerState(Enum):

    CLOSED = "closed"

    OPEN = "open"

    HALF_OPEN = "half_open"


class ErrorType(Enum):

    TIMEOUT = "timeout"

    RATE_LIMIT = "rate_limit"

    MODEL_ERROR = "model_error"

    NETWORK_ERROR = "network_error"

    AUTHENTICATION_ERROR = "authentication_error"

    QUOTA_EXCEEDED = "quota_exceeded"

    UNKNOWN = "unknown"


@dataclass

class ErrorContext:

    error_type: ErrorType

    message: str

    timestamp: datetime

    request_id: Optional[str] = None

    user_id: Optional[str] = None

    retry_count: int = 0

    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass

class RetryPolicy:

    max_attempts: int = 3

    base_delay: float = 1.0

    max_delay: float = 60.0

    exponential_base: float = 2.0

    jitter: bool = True

    retryable_errors: List[ErrorType] = field(default_factory=lambda: [

        ErrorType.TIMEOUT,

        ErrorType.NETWORK_ERROR,

        ErrorType.RATE_LIMIT

    ])


class AICircuitBreaker:

    def __init__(self, 

                 failure_threshold: int = 5,

                 recovery_timeout: int = 60,

                 expected_exception: type = Exception,

                 name: str = "AICircuitBreaker"):

        self.failure_threshold = failure_threshold

        self.recovery_timeout = recovery_timeout

        self.expected_exception = expected_exception

        self.name = name

        

        self.failure_count = 0

        self.last_failure_time = None

        self.state = CircuitBreakerState.CLOSED

        self.success_count = 0

        self.total_requests = 0

        

        # Metrics for monitoring

        self.metrics = {

            "total_requests": 0,

            "successful_requests": 0,

            "failed_requests": 0,

            "circuit_opened_count": 0,

            "circuit_closed_count": 0,

            "current_state": self.state.value,

            "last_state_change": datetime.now().isoformat()

        }

    

    async def call(self, func: Callable, *args, **kwargs):

        self.total_requests += 1

        self.metrics["total_requests"] += 1

        

        if self.state == CircuitBreakerState.OPEN:

            if self._should_attempt_reset():

                self._move_to_half_open()

            else:

                raise CircuitBreakerOpenException(f"Circuit breaker {self.name} is OPEN")

        

        try:

            result = await func(*args, **kwargs)

            self._on_success()

            return result

            

        except self.expected_exception as e:

            self._on_failure()

            raise e

    

    def _should_attempt_reset(self) -> bool:

        return (self.last_failure_time and 

                time.time() - self.last_failure_time >= self.recovery_timeout)

    

    def _move_to_half_open(self):

        self.state = CircuitBreakerState.HALF_OPEN

        self.success_count = 0

        self.metrics["current_state"] = self.state.value

        self.metrics["last_state_change"] = datetime.now().isoformat()

        print(f"Circuit breaker {self.name} moved to HALF_OPEN state")

    

    def _on_success(self):

        self.failure_count = 0

        self.metrics["successful_requests"] += 1

        

        if self.state == CircuitBreakerState.HALF_OPEN:

            self.success_count += 1

            if self.success_count >= 3:  # Require 3 successes to close

                self._move_to_closed()

    

    def _on_failure(self):

        self.failure_count += 1

        self.last_failure_time = time.time()

        self.metrics["failed_requests"] += 1

        

        if self.state == CircuitBreakerState.HALF_OPEN:

            self._move_to_open()

        elif self.failure_count >= self.failure_threshold:

            self._move_to_open()

    

    def _move_to_open(self):

        self.state = CircuitBreakerState.OPEN

        self.metrics["circuit_opened_count"] += 1

        self.metrics["current_state"] = self.state.value

        self.metrics["last_state_change"] = datetime.now().isoformat()

        print(f"Circuit breaker {self.name} moved to OPEN state after {self.failure_count} failures")

    

    def _move_to_closed(self):

        self.state = CircuitBreakerState.CLOSED

        self.failure_count = 0

        self.metrics["circuit_closed_count"] += 1

        self.metrics["current_state"] = self.state.value

        self.metrics["last_state_change"] = datetime.now().isoformat()

        print(f"Circuit breaker {self.name} moved to CLOSED state")

    

    def get_metrics(self) -> Dict[str, Any]:

        return {

            **self.metrics,

            "failure_rate": self.metrics["failed_requests"] / max(self.metrics["total_requests"], 1),

            "current_failure_count": self.failure_count,

            "time_since_last_failure": time.time() - (self.last_failure_time or time.time())

        }


class RetryHandler:

    def __init__(self, policy: RetryPolicy):

        self.policy = policy

    

    async def execute_with_retry(self, 

                                func: Callable, 

                                error_context: ErrorContext,

                                *args, **kwargs) -> Any:

        last_exception = None

        

        for attempt in range(self.policy.max_attempts):

            try:

                if attempt > 0:

                    delay = self._calculate_delay(attempt)

                    await asyncio.sleep(delay)

                

                result = await func(*args, **kwargs)

                

                if attempt > 0:

                    await self._log_retry_success(error_context, attempt)

                

                return result

                

            except Exception as e:

                last_exception = e

                error_type = self._classify_error(e)

                error_context.error_type = error_type

                error_context.retry_count = attempt + 1

                

                if not self._should_retry(error_type, attempt):

                    await self._log_retry_exhausted(error_context, e)

                    break

                

                await self._log_retry_attempt(error_context, e, attempt)

        

        raise last_exception

    

    def _calculate_delay(self, attempt: int) -> float:

        delay = self.policy.base_delay * (self.policy.exponential_base ** (attempt - 1))

        delay = min(delay, self.policy.max_delay)

        

        if self.policy.jitter:

            jitter = random.uniform(0, 0.1) * delay

            delay += jitter

        

        return delay

    

    def _classify_error(self, error: Exception) -> ErrorType:

        error_message = str(error).lower()

        

        if "timeout" in error_message:

            return ErrorType.TIMEOUT

        elif "rate limit" in error_message or "too many requests" in error_message:

            return ErrorType.RATE_LIMIT

        elif "network" in error_message or "connection" in error_message:

            return ErrorType.NETWORK_ERROR

        elif "authentication" in error_message or "unauthorized" in error_message:

            return ErrorType.AUTHENTICATION_ERROR

        elif "quota" in error_message or "limit exceeded" in error_message:

            return ErrorType.QUOTA_EXCEEDED

        elif "model" in error_message:

            return ErrorType.MODEL_ERROR

        else:

            return ErrorType.UNKNOWN

    

    def _should_retry(self, error_type: ErrorType, attempt: int) -> bool:

        if attempt >= self.policy.max_attempts - 1:

            return False

        

        return error_type in self.policy.retryable_errors

    

    async def _log_retry_attempt(self, context: ErrorContext, error: Exception, attempt: int):

        log_data = {

            "event": "retry_attempt",

            "attempt": attempt + 1,

            "max_attempts": self.policy.max_attempts,

            "error_type": context.error_type.value,

            "error_message": str(error),

            "request_id": context.request_id,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Retry Attempt: {json.dumps(log_data)}")

    

    async def _log_retry_success(self, context: ErrorContext, final_attempt: int):

        log_data = {

            "event": "retry_success",

            "final_attempt": final_attempt + 1,

            "request_id": context.request_id,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Retry Success: {json.dumps(log_data)}")

    

    async def _log_retry_exhausted(self, context: ErrorContext, final_error: Exception):

        log_data = {

            "event": "retry_exhausted",

            "total_attempts": self.policy.max_attempts,

            "final_error_type": context.error_type.value,

            "final_error_message": str(final_error),

            "request_id": context.request_id,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Retry Exhausted: {json.dumps(log_data)}")


class FallbackStrategy:

    def __init__(self):

        self.fallback_handlers = {}

        self.metrics = {

            "fallback_invocations": 0,

            "fallback_successes": 0,

            "fallback_failures": 0

        }

    

    def register_fallback(self, error_type: ErrorType, handler: Callable):

        self.fallback_handlers[error_type] = handler

    

    async def execute_fallback(self, 

                              error_context: ErrorContext, 

                              original_args: tuple, 

                              original_kwargs: dict) -> Any:

        self.metrics["fallback_invocations"] += 1

        

        handler = self.fallback_handlers.get(error_context.error_type)

        if not handler:

            handler = self.fallback_handlers.get(ErrorType.UNKNOWN)

        

        if not handler:

            self.metrics["fallback_failures"] += 1

            raise FallbackException(f"No fallback handler for error type: {error_context.error_type}")

        

        try:

            result = await handler(error_context, *original_args, **original_kwargs)

            self.metrics["fallback_successes"] += 1

            await self._log_fallback_success(error_context)

            return result

            

        except Exception as e:

            self.metrics["fallback_failures"] += 1

            await self._log_fallback_failure(error_context, e)

            raise FallbackException(f"Fallback handler failed: {str(e)}")

    

    async def _log_fallback_success(self, context: ErrorContext):

        log_data = {

            "event": "fallback_success",

            "original_error_type": context.error_type.value,

            "request_id": context.request_id,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Fallback Success: {json.dumps(log_data)}")

    

    async def _log_fallback_failure(self, context: ErrorContext, error: Exception):

        log_data = {

            "event": "fallback_failure",

            "original_error_type": context.error_type.value,

            "fallback_error": str(error),

            "request_id": context.request_id,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Fallback Failure: {json.dumps(log_data)}")


class ResilientAIService:

    def __init__(self, 

                 ai_service: Any,

                 circuit_breaker: AICircuitBreaker,

                 retry_handler: RetryHandler,

                 fallback_strategy: FallbackStrategy):

        self.ai_service = ai_service

        self.circuit_breaker = circuit_breaker

        self.retry_handler = retry_handler

        self.fallback_strategy = fallback_strategy

        

        # Register default fallback handlers

        self._register_default_fallbacks()

    

    def _register_default_fallbacks(self):

        self.fallback_strategy.register_fallback(

            ErrorType.TIMEOUT, 

            self._timeout_fallback

        )

        self.fallback_strategy.register_fallback(

            ErrorType.RATE_LIMIT, 

            self._rate_limit_fallback

        )

        self.fallback_strategy.register_fallback(

            ErrorType.MODEL_ERROR, 

            self._model_error_fallback

        )

        self.fallback_strategy.register_fallback(

            ErrorType.UNKNOWN, 

            self._generic_fallback

        )

    

    async def generate_text(self, prompt: str, parameters: Dict[str, Any], request_id: str = None) -> str:

        error_context = ErrorContext(

            error_type=ErrorType.UNKNOWN,

            message="",

            timestamp=datetime.now(),

            request_id=request_id or self._generate_request_id()

        )

        

        try:

            # First, try with circuit breaker protection

            result = await self.circuit_breaker.call(

                self._execute_with_retry,

                prompt,

                parameters,

                error_context

            )

            return result

            

        except (CircuitBreakerOpenException, Exception) as e:

            # If circuit breaker is open or all retries failed, try fallback

            error_context.message = str(e)

            error_context.error_type = self.retry_handler._classify_error(e)

            

            try:

                fallback_result = await self.fallback_strategy.execute_fallback(

                    error_context,

                    (prompt, parameters),

                    {}

                )

                return fallback_result

                

            except FallbackException:

                # If fallback also fails, raise the original error

                raise AIServiceException(f"All resilience strategies failed: {str(e)}")

    

    async def _execute_with_retry(self, 

                                 prompt: str, 

                                 parameters: Dict[str, Any], 

                                 error_context: ErrorContext) -> str:

        return await self.retry_handler.execute_with_retry(

            self.ai_service.generate_text,

            error_context,

            prompt,

            parameters

        )

    

    async def _timeout_fallback(self, 

                               error_context: ErrorContext, 

                               prompt: str, 

                               parameters: Dict[str, Any]) -> str:

        # Return a simplified response for timeout scenarios

        return f"[Response generated with reduced complexity due to timeout] Based on your request: {prompt[:100]}..."

    

    async def _rate_limit_fallback(self, 

                                  error_context: ErrorContext, 

                                  prompt: str, 

                                  parameters: Dict[str, Any]) -> str:

        # Return a cached or pre-generated response for rate limiting

        return "[Cached response due to rate limiting] I'm currently experiencing high demand. Please try again in a few moments."

    

    async def _model_error_fallback(self, 

                                   error_context: ErrorContext, 

                                   prompt: str, 

                                   parameters: Dict[str, Any]) -> str:

        # Return a safe response when the model encounters an error

        return "[Safe response due to model error] I encountered an issue processing your request. Please rephrase and try again."

    

    async def _generic_fallback(self, 

                               error_context: ErrorContext, 

                               prompt: str, 

                               parameters: Dict[str, Any]) -> str:

        # Generic fallback for unknown errors

        return "[Generic response due to service error] I'm temporarily unable to process your request. Please try again later."

    

    def _generate_request_id(self) -> str:

        import uuid

        return str(uuid.uuid4())

    

    def get_health_status(self) -> Dict[str, Any]:

        return {

            "circuit_breaker": self.circuit_breaker.get_metrics(),

            "fallback_strategy": self.fallback_strategy.metrics,

            "service_status": "operational" if self.circuit_breaker.state == CircuitBreakerState.CLOSED else "degraded"

        }


# Custom exceptions

class CircuitBreakerOpenException(Exception):

    pass


class FallbackException(Exception):

    pass


class AIServiceException(Exception):

    pass


# Example usage and testing

async def example_usage():

    # Create a mock AI service for demonstration

    class MockAIService:

        def __init__(self):

            self.call_count = 0

            self.failure_rate = 0.3  # 30% failure rate for testing

        

        async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:

            self.call_count += 1

            

            # Simulate various types of failures

            if random.random() < self.failure_rate:

                failure_type = random.choice([

                    "timeout",

                    "rate_limit",

                    "model_error",

                    "network_error"

                ])

                

                if failure_type == "timeout":

                    raise Exception("Request timeout after 30 seconds")

                elif failure_type == "rate_limit":

                    raise Exception("Rate limit exceeded - too many requests")

                elif failure_type == "model_error":

                    raise Exception("Model inference error - invalid input")

                else:

                    raise Exception("Network connection error")

            

            # Simulate successful response

            await asyncio.sleep(0.1)  # Simulate processing time

            return f"Generated response for: {prompt[:50]}..."

    

    # Set up resilient AI service

    mock_service = MockAIService()

    circuit_breaker = AICircuitBreaker(failure_threshold=3, recovery_timeout=10)

    retry_policy = RetryPolicy(max_attempts=3, base_delay=0.5)

    retry_handler = RetryHandler(retry_policy)

    fallback_strategy = FallbackStrategy()

    

    resilient_service = ResilientAIService(

        mock_service,

        circuit_breaker,

        retry_handler,

        fallback_strategy

    )

    

    # Test the resilient service

    for i in range(10):

        try:

            result = await resilient_service.generate_text(

                f"Test prompt {i}",

                {"temperature": 0.7},

                f"request_{i}"

            )

            print(f"Request {i}: Success - {result[:100]}")

            

        except Exception as e:

            print(f"Request {i}: Failed - {str(e)}")

        

        await asyncio.sleep(0.5)

    

    # Print final health status

    health_status = resilient_service.get_health_status()

    print(f"\nFinal Health Status: {json.dumps(health_status, indent=2)}")


# Uncomment to run the example

# asyncio.run(example_usage())


This resilience implementation provides comprehensive error handling for AI systems. The circuit breaker prevents cascade failures by temporarily blocking requests to failing services. The retry handler implements intelligent retry logic with exponential backoff and jitter. The fallback strategy provides graceful degradation when primary AI services are unavailable. Together, these patterns ensure that AI-integrated systems remain operational even when individual AI components experience failures.


Security and Privacy Considerations

Security in AI-integrated systems encompasses traditional application security concerns as well as AI-specific vulnerabilities such as prompt injection attacks, data poisoning, model extraction, and privacy leakage through model outputs. A comprehensive security strategy must address these unique challenges while maintaining the functionality and performance of the AI system.

Data privacy is particularly critical in AI systems because models can inadvertently memorize and later reveal sensitive information from their training data. Additionally, user inputs to AI systems often contain personal or confidential information that must be protected throughout the processing pipeline. The following implementation demonstrates a comprehensive security and privacy framework for AI systems:


import hashlib

import hmac

import secrets

import re

from typing import Dict, Any, List, Optional, Set

from dataclasses import dataclass, field

from datetime import datetime, timedelta

from enum import Enum

import json

import asyncio


class SecurityLevel(Enum):

    PUBLIC = "public"

    INTERNAL = "internal"

    CONFIDENTIAL = "confidential"

    RESTRICTED = "restricted"


class PIIType(Enum):

    EMAIL = "email"

    PHONE = "phone"

    SSN = "ssn"

    CREDIT_CARD = "credit_card"

    NAME = "name"

    ADDRESS = "address"

    IP_ADDRESS = "ip_address"

    CUSTOM = "custom"


@dataclass

class SecurityContext:

    user_id: str

    session_id: str

    security_level: SecurityLevel

    permissions: Set[str] = field(default_factory=set)

    ip_address: Optional[str] = None

    user_agent: Optional[str] = None

    timestamp: datetime = field(default_factory=datetime.now)

    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass

class PIIDetectionResult:

    found_pii: bool

    pii_types: List[PIIType]

    masked_content: str

    confidence_scores: Dict[PIIType, float]

    locations: Dict[PIIType, List[tuple]]  # (start, end) positions


class InputSanitizer:

    def __init__(self):

        self.dangerous_patterns = [

            # Prompt injection patterns

            r"ignore\s+previous\s+instructions",

            r"forget\s+everything\s+above",

            r"act\s+as\s+if\s+you\s+are",

            r"pretend\s+to\s+be",

            r"role\s*:\s*system",

            r"<\s*system\s*>",

            r"```\s*system",

            

            # Code injection patterns

            r"<script[^>]*>",

            r"javascript:",

            r"eval\s*\(",

            r"exec\s*\(",

            r"import\s+os",

            r"__import__",

            

            # SQL injection patterns

            r"union\s+select",

            r"drop\s+table",

            r"delete\s+from",

            r"insert\s+into",

            

            # Command injection patterns

            r";\s*rm\s+",

            r";\s*cat\s+",

            r";\s*ls\s+",

            r"\|\s*nc\s+",

        ]

        

        self.compiled_patterns = [re.compile(pattern, re.IGNORECASE) for pattern in self.dangerous_patterns]

        

        # Maximum input lengths to prevent resource exhaustion

        self.max_input_length = 10000

        self.max_parameter_count = 50

    

    async def sanitize_input(self, content: str, security_context: SecurityContext) -> str:

        if not content:

            return content

        

        # Check input length

        if len(content) > self.max_input_length:

            raise SecurityException(f"Input exceeds maximum length of {self.max_input_length} characters")

        

        # Detect and block dangerous patterns

        for pattern in self.compiled_patterns:

            if pattern.search(content):

                await self._log_security_violation(

                    "dangerous_pattern_detected",

                    pattern.pattern,

                    security_context

                )

                raise SecurityException(f"Potentially dangerous input pattern detected")

        

        # Normalize and sanitize content

        sanitized = self._normalize_content(content)

        sanitized = self._remove_control_characters(sanitized)

        

        return sanitized

    

    async def sanitize_parameters(self, parameters: Dict[str, Any], security_context: SecurityContext) -> Dict[str, Any]:

        if len(parameters) > self.max_parameter_count:

            raise SecurityException(f"Too many parameters: {len(parameters)} > {self.max_parameter_count}")

        

        sanitized_params = {}

        

        for key, value in parameters.items():

            # Sanitize parameter keys

            if not self._is_safe_parameter_name(key):

                await self._log_security_violation(

                    "unsafe_parameter_name",

                    key,

                    security_context

                )

                continue

            

            # Sanitize parameter values

            if isinstance(value, str):

                sanitized_value = await self.sanitize_input(value, security_context)

            elif isinstance(value, (int, float, bool)):

                sanitized_value = value

            elif isinstance(value, dict):

                sanitized_value = await self.sanitize_parameters(value, security_context)

            elif isinstance(value, list):

                sanitized_value = [await self.sanitize_input(str(item), security_context) 

                                 if isinstance(item, str) else item for item in value]

            else:

                # Convert unknown types to string and sanitize

                sanitized_value = await self.sanitize_input(str(value), security_context)

            

            sanitized_params[key] = sanitized_value

        

        return sanitized_params

    

    def _normalize_content(self, content: str) -> str:

        # Normalize unicode characters

        import unicodedata

        normalized = unicodedata.normalize('NFKC', content)

        

        # Remove excessive whitespace

        normalized = re.sub(r'\s+', ' ', normalized)

        

        return normalized.strip()

    

    def _remove_control_characters(self, content: str) -> str:

        # Remove control characters except common ones like newlines and tabs

        allowed_control = {'\n', '\t', '\r'}

        return ''.join(char for char in content 

                      if char in allowed_control or not char.iscntrl())

    

    def _is_safe_parameter_name(self, name: str) -> bool:

        # Only allow alphanumeric characters and underscores

        return re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name) is not None

    

    async def _log_security_violation(self, violation_type: str, details: str, context: SecurityContext):

        log_data = {

            "event": "security_violation",

            "violation_type": violation_type,

            "details": details,

            "user_id": context.user_id,

            "session_id": context.session_id,

            "ip_address": context.ip_address,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Security Violation: {json.dumps(log_data)}")


class PIIDetector:

    def __init__(self):

        self.pii_patterns = {

            PIIType.EMAIL: re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),

            PIIType.PHONE: re.compile(r'\b(?:\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})\b'),

            PIIType.SSN: re.compile(r'\b\d{3}-?\d{2}-?\d{4}\b'),

            PIIType.CREDIT_CARD: re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'),

            PIIType.IP_ADDRESS: re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'),

        }

        

        # Name detection patterns (simple approach)

        self.name_patterns = [

            re.compile(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b'),  # First Last

            re.compile(r'\b[A-Z][a-z]+ [A-Z]\. [A-Z][a-z]+\b'),  # First M. Last

        ]

        

        # Address patterns (simplified)

        self.address_patterns = [

            re.compile(r'\b\d+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd)\b', re.IGNORECASE),

        ]

    

    async def detect_pii(self, content: str) -> PIIDetectionResult:

        found_pii = False

        pii_types = []

        confidence_scores = {}

        locations = {}

        masked_content = content

        

        # Check each PII type

        for pii_type, pattern in self.pii_patterns.items():

            matches = list(pattern.finditer(content))

            if matches:

                found_pii = True

                pii_types.append(pii_type)

                confidence_scores[pii_type] = self._calculate_confidence(pii_type, matches)

                locations[pii_type] = [(match.start(), match.end()) for match in matches]

                

                # Mask the PII in the content

                masked_content = pattern.sub(self._get_mask_string(pii_type), masked_content)

        

        # Check for names

        name_matches = []

        for pattern in self.name_patterns:

            name_matches.extend(pattern.finditer(content))

        

        if name_matches:

            found_pii = True

            pii_types.append(PIIType.NAME)

            confidence_scores[PIIType.NAME] = 0.7  # Medium confidence for name detection

            locations[PIIType.NAME] = [(match.start(), match.end()) for match in name_matches]

            

            # Mask names

            for pattern in self.name_patterns:

                masked_content = pattern.sub("[NAME]", masked_content)

        

        # Check for addresses

        address_matches = []

        for pattern in self.address_patterns:

            address_matches.extend(pattern.finditer(content))

        

        if address_matches:

            found_pii = True

            pii_types.append(PIIType.ADDRESS)

            confidence_scores[PIIType.ADDRESS] = 0.6  # Lower confidence for address detection

            locations[PIIType.ADDRESS] = [(match.start(), match.end()) for match in address_matches]

            

            # Mask addresses

            for pattern in self.address_patterns:

                masked_content = pattern.sub("[ADDRESS]", masked_content)

        

        return PIIDetectionResult(

            found_pii=found_pii,

            pii_types=pii_types,

            masked_content=masked_content,

            confidence_scores=confidence_scores,

            locations=locations

        )

    

    def _calculate_confidence(self, pii_type: PIIType, matches: List) -> float:

        # Simple confidence calculation based on pattern strength

        confidence_map = {

            PIIType.EMAIL: 0.95,

            PIIType.PHONE: 0.85,

            PIIType.SSN: 0.98,

            PIIType.CREDIT_CARD: 0.90,

            PIIType.IP_ADDRESS: 0.80,

        }

        return confidence_map.get(pii_type, 0.5)

    

    def _get_mask_string(self, pii_type: PIIType) -> str:

        mask_map = {

            PIIType.EMAIL: "[EMAIL]",

            PIIType.PHONE: "[PHONE]",

            PIIType.SSN: "[SSN]",

            PIIType.CREDIT_CARD: "[CREDIT_CARD]",

            PIIType.IP_ADDRESS: "[IP_ADDRESS]",

        }

        return mask_map.get(pii_type, "[PII]")


class AccessController:

    def __init__(self):

        self.permissions_cache = {}

        self.rate_limits = {}

        self.blocked_users = set()

        

        # Define permission requirements for different operations

        self.operation_permissions = {

            "text_generation": {"ai_text_generation"},

            "text_classification": {"ai_classification"},

            "sentiment_analysis": {"ai_sentiment"},

            "summarization": {"ai_summarization"},

            "admin_operations": {"admin", "system_admin"}

        }

    

    async def check_access(self, operation: str, security_context: SecurityContext) -> bool:

        # Check if user is blocked

        if security_context.user_id in self.blocked_users:

            await self._log_access_denied("user_blocked", operation, security_context)

            return False

        

        # Check rate limits

        if not await self._check_rate_limit(security_context):

            await self._log_access_denied("rate_limit_exceeded", operation, security_context)

            return False

        

        # Check permissions

        required_permissions = self.operation_permissions.get(operation, set())

        if required_permissions and not required_permissions.issubset(security_context.permissions):

            await self._log_access_denied("insufficient_permissions", operation, security_context)

            return False

        

        # Check security level requirements

        if not await self._check_security_level(operation, security_context):

            await self._log_access_denied("insufficient_security_level", operation, security_context)

            return False

        

        await self._log_access_granted(operation, security_context)

        return True

    

    async def _check_rate_limit(self, security_context: SecurityContext) -> bool:

        user_id = security_context.user_id

        current_time = datetime.now()

        

        # Initialize rate limit tracking for new users

        if user_id not in self.rate_limits:

            self.rate_limits[user_id] = {

                "requests": [],

                "window_start": current_time

            }

        

        user_limits = self.rate_limits[user_id]

        

        # Clean old requests (sliding window of 1 hour)

        window_duration = timedelta(hours=1)

        cutoff_time = current_time - window_duration

        user_limits["requests"] = [req_time for req_time in user_limits["requests"] 

                                  if req_time > cutoff_time]

        

        # Check if user has exceeded rate limit (100 requests per hour)

        max_requests = 100

        if len(user_limits["requests"]) >= max_requests:

            return False

        

        # Add current request

        user_limits["requests"].append(current_time)

        return True

    

    async def _check_security_level(self, operation: str, security_context: SecurityContext) -> bool:

        # Define minimum security levels for operations

        min_security_levels = {

            "admin_operations": SecurityLevel.RESTRICTED,

            "text_generation": SecurityLevel.INTERNAL,

            "text_classification": SecurityLevel.PUBLIC,

            "sentiment_analysis": SecurityLevel.PUBLIC,

            "summarization": SecurityLevel.INTERNAL

        }

        

        required_level = min_security_levels.get(operation, SecurityLevel.PUBLIC)

        

        # Define security level hierarchy

        level_hierarchy = {

            SecurityLevel.PUBLIC: 0,

            SecurityLevel.INTERNAL: 1,

            SecurityLevel.CONFIDENTIAL: 2,

            SecurityLevel.RESTRICTED: 3

        }

        

        user_level_value = level_hierarchy.get(security_context.security_level, 0)

        required_level_value = level_hierarchy.get(required_level, 0)

        

        return user_level_value >= required_level_value

    

    async def _log_access_granted(self, operation: str, context: SecurityContext):

        log_data = {

            "event": "access_granted",

            "operation": operation,

            "user_id": context.user_id,

            "session_id": context.session_id,

            "security_level": context.security_level.value,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Access Granted: {json.dumps(log_data)}")

    

    async def _log_access_denied(self, reason: str, operation: str, context: SecurityContext):

        log_data = {

            "event": "access_denied",

            "reason": reason,

            "operation": operation,

            "user_id": context.user_id,

            "session_id": context.session_id,

            "ip_address": context.ip_address,

            "timestamp": datetime.now().isoformat()

        }

        print(f"Access Denied: {json.dumps(log_data)}")


class OutputSanitizer:

    def __init__(self):

        self.sensitive_patterns = [

            # API keys and tokens

            re.compile(r'[A-Za-z0-9]{32,}'),  # Generic long alphanumeric strings

            re.compile(r'sk-[A-Za-z0-9]{48}'),  # OpenAI API keys

            re.compile(r'Bearer\s+[A-Za-z0-9._-]+', re.IGNORECASE),

            

            # Database connection strings

            re.compile(r'mongodb://[^/\s]+', re.IGNORECASE),

            re.compile(r'postgresql://[^/\s]+', re.IGNORECASE),

            re.compile(r'mysql://[^/\s]+', re.IGNORECASE),

            

            # File paths that might contain sensitive info

            re.compile(r'/home/[^/\s]+/[^\s]*'),

            re.compile(r'C:\\Users\\[^\\]+\\[^\s]*'),

        ]

        

        self.max_output_length = 5000

    

    async def sanitize_output(self, content: str, security_context: SecurityContext) -> str:

        if not content:

            return content

        

        # Truncate if too long

        if len(content) > self.max_output_length:

            content = content[:self.max_output_length] + "... [truncated]"

        

        # Remove sensitive patterns

        sanitized = content

        for pattern in self.sensitive_patterns:

            sanitized = pattern.sub("[REDACTED]", sanitized)

        

        # Additional sanitization based on security level

        if security_context.security_level == SecurityLevel.PUBLIC:

            sanitized = await self._apply_public_sanitization(sanitized)

        

        return sanitized

    

    async def _apply_public_sanitization(self, content: str) -> str:

        # More aggressive sanitization for public-facing content

        

        # Remove any remaining potential PII

        pii_detector = PIIDetector()

        pii_result = await pii_detector.detect_pii(content)

        

        if pii_result.found_pii:

            return pii_result.masked_content

        

        return content


class SecureAIService:

    def __init__(self, 

                 ai_service: Any,

                 input_sanitizer: InputSanitizer,

                 pii_detector: PIIDetector,

                 access_controller: AccessController,

                 output_sanitizer: OutputSanitizer):

        self.ai_service = ai_service

        self.input_sanitizer = input_sanitizer

        self.pii_detector = pii_detector

        self.access_controller = access_controller

        self.output_sanitizer = output_sanitizer

        

        # Audit trail

        self.audit_log = []

    

    async def secure_generate_text(self, 

                                  prompt: str, 

                                  parameters: Dict[str, Any],

                                  security_context: SecurityContext) -> str:

        

        audit_entry = {

            "operation": "text_generation",

            "user_id": security_context.user_id,

            "timestamp": datetime.now().isoformat(),

            "steps": []

        }

        

        try:

            # Step 1: Access control

            if not await self.access_controller.check_access("text_generation", security_context):

                audit_entry["steps"].append("access_denied")

                raise SecurityException("Access denied for text generation")

            audit_entry["steps"].append("access_granted")

            

            # Step 2: Input sanitization

            sanitized_prompt = await self.input_sanitizer.sanitize_input(prompt, security_context)

            sanitized_parameters = await self.input_sanitizer.sanitize_parameters(parameters, security_context)

            audit_entry["steps"].append("input_sanitized")

            

            # Step 3: PII detection and handling

            pii_result = await self.pii_detector.detect_pii(sanitized_prompt)

            if pii_result.found_pii:

                if security_context.security_level == SecurityLevel.PUBLIC:

                    # For public users, reject requests with PII

                    audit_entry["steps"].append("pii_detected_rejected")

                    raise SecurityException("PII detected in input - request rejected")

                else:

                    # For internal users, use masked version

                    sanitized_prompt = pii_result.masked_content

                    audit_entry["steps"].append("pii_detected_masked")

            else:

                audit_entry["steps"].append("no_pii_detected")

            

            # Step 4: AI processing

            result = await self.ai_service.generate_text(sanitized_prompt, sanitized_parameters)

            audit_entry["steps"].append("ai_processing_completed")

            

            # Step 5: Output sanitization

            sanitized_result = await self.output_sanitizer.sanitize_output(result, security_context)

            audit_entry["steps"].append("output_sanitized")

            

            # Step 6: Final PII check on output

            output_pii_result = await self.pii_detector.detect_pii(sanitized_result)

            if output_pii_result.found_pii:

                sanitized_result = output_pii_result.masked_content

                audit_entry["steps"].append("output_pii_masked")

            

            audit_entry["status"] = "success"

            return sanitized_result

            

        except Exception as e:

            audit_entry["status"] = "error"

            audit_entry["error"] = str(e)

            raise e

        

        finally:

            self.audit_log.append(audit_entry)

            await self._log_audit_entry(audit_entry)

    

    async def _log_audit_entry(self, audit_entry: Dict[str, Any]):

        print(f"Security Audit: {json.dumps(audit_entry)}")

    

    def get_security_metrics(self) -> Dict[str, Any]:

        total_requests = len(self.audit_log)

        successful_requests = sum(1 for entry in self.audit_log if entry["status"] == "success")

        

        return {

            "total_requests": total_requests,

            "successful_requests": successful_requests,

            "error_rate": (total_requests - successful_requests) / max(total_requests, 1),

            "recent_audit_entries": self.audit_log[-10:] if self.audit_log else []

        }


class SecurityException(Exception):

    pass


# Example usage

async def example_secure_usage():

    # Mock AI service

    class MockAIService:

        async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:

            return f"AI Response: {prompt[:100]}... [Generated with parameters: {parameters}]"

    

    # Set up security components

    ai_service = MockAIService()

    input_sanitizer = InputSanitizer()

    pii_detector = PIIDetector()

    access_controller = AccessController()

    output_sanitizer = OutputSanitizer()

    

    secure_service = SecureAIService(

        ai_service,

        input_sanitizer,

        pii_detector,

        access_controller,

        output_sanitizer

    )

    

    # Create security context

    security_context = SecurityContext(

        user_id="user123",

        session_id="session456",

        security_level=SecurityLevel.INTERNAL,

        permissions={"ai_text_generation"},

        ip_address="192.168.1.100"

    )

    

    # Test with normal input

    try:

        result = await secure_service.secure_generate_text(

            "What is the capital of France?",

            {"temperature": 0.7},

            security_context

        )

        print(f"Normal request result: {result}")

    except SecurityException as e:

        print(f"Security error: {e}")

    

    # Test with PII input

    try:

        result = await secure_service.secure_generate_text(

            "My email is john.doe@example.com and my phone is 555-123-4567",

            {"temperature": 0.7},

            security_context

        )

        print(f"PII request result: {result}")

    except SecurityException as e:

        print(f"Security error: {e}")

    

    # Print security metrics

    metrics = secure_service.get_security_metrics()

    print(f"Security metrics: {json.dumps(metrics, indent=2)}")


# Uncomment to run the example

# asyncio.run(example_secure_usage())


This security implementation provides comprehensive protection for AI systems. The input sanitizer prevents injection attacks and validates all inputs. The PII detector identifies and masks sensitive information. The access controller enforces permissions and rate limiting. The output sanitizer ensures that AI responses don't leak sensitive information. Together, these components create a robust security framework that protects both the AI system and user data.


Testing Strategies for AI-Integrated Systems


Testing AI-integrated systems presents unique challenges that go beyond traditional software testing. AI components introduce non-deterministic behavior, making it difficult to create repeatable tests with predictable outcomes. Additionally, AI systems often depend on external services, large datasets, and complex model behaviors that are difficult to mock or simulate accurately.

A comprehensive testing strategy for AI systems must address multiple layers of functionality, from unit tests for individual components to integration tests that verify the entire AI pipeline. The testing approach must also account for the probabilistic nature of AI outputs, performance characteristics under various loads, and the system's behavior when AI services are unavailable or performing poorly.

Here's a detailed implementation of testing strategies specifically designed for AI-integrated systems:


import asyncio

import pytest

import unittest

from unittest.mock import Mock, AsyncMock, patch

from typing import Dict, Any, List, Optional, Callable

from dataclasses import dataclass

import json

import time

import random

from datetime import datetime

import statistics


@dataclass

class TestResult:

    test_name: str

    passed: bool

    execution_time: float

    error_message: Optional[str] = None

    metadata: Dict[str, Any] = None


@dataclass

class AITestCase:

    name: str

    input_data: Dict[str, Any]

    expected_output_pattern: Optional[str] = None

    expected_output_length_range: Optional[tuple] = None

    expected_confidence_threshold: Optional[float] = None

    max_execution_time: Optional[float] = None

    should_fail: bool = False

    failure_type: Optional[str] = None


class AIServiceMock:

    def __init__(self, response_patterns: Dict[str, Any]):

        self.response_patterns = response_patterns

        self.call_count = 0

        self.call_history = []

        self.latency_simulation = 0.1

        self.failure_rate = 0.0

        self.responses = []

    

    async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:

        self.call_count += 1

        call_info = {

            "prompt": prompt,

            "parameters": parameters,

            "timestamp": datetime.now().isoformat()

        }

        self.call_history.append(call_info)

        

        # Simulate latency

        await asyncio.sleep(self.latency_simulation)

        

        # Simulate failures

        if random.random() < self.failure_rate:

            raise Exception("Simulated AI service failure")

        

        # Generate response based on patterns

        response = self._generate_response(prompt, parameters)

        self.responses.append(response)

        return response

    

    def _generate_response(self, prompt: str, parameters: Dict[str, Any]) -> str:

        # Simple pattern matching for predictable test responses

        prompt_lower = prompt.lower()

        

        if "capital" in prompt_lower and "france" in prompt_lower:

            return "The capital of France is Paris."

        elif "sentiment" in prompt_lower:

            return "positive"

        elif "summarize" in prompt_lower:

            return "This is a summary of the provided text."

        elif "classify" in prompt_lower:

            return "technology"

        else:

            return f"Generated response for: {prompt[:50]}..."

    

    def reset(self):

        self.call_count = 0

        self.call_history = []

        self.responses = []

    

    def set_failure_rate(self, rate: float):

        self.failure_rate = rate

    

    def set_latency(self, latency: float):

        self.latency_simulation = latency


class AIUnitTester:

    def __init__(self):

        self.test_results = []

        self.mock_service = AIServiceMock({})

    

    async def test_ai_service_layer(self, ai_service_layer):

        """Test the AI service layer with various inputs and scenarios"""

        test_cases = [

            AITestCase(

                name="basic_text_generation",

                input_data={

                    "operation_type": "text_generation",

                    "input_data": {"prompt": "What is the capital of France?"},

                    "parameters": {"temperature": 0.7}

                },

                expected_output_pattern="Paris",

                max_execution_time=5.0

            ),

            AITestCase(

                name="text_classification",

                input_data={

                    "operation_type": "text_classification",

                    "input_data": {"text": "This is about artificial intelligence"},

                    "parameters": {"categories": ["technology", "sports", "politics"]}

                },

                expected_output_pattern="technology",

                max_execution_time=5.0

            ),

            AITestCase(

                name="sentiment_analysis",

                input_data={

                    "operation_type": "sentiment_analysis",

                    "input_data": {"text": "I love this product!"},

                    "parameters": {}

                },

                expected_output_pattern="positive",

                max_execution_time=5.0

            ),

            AITestCase(

                name="invalid_operation",

                input_data={

                    "operation_type": "invalid_operation",

                    "input_data": {"text": "test"},

                    "parameters": {}

                },

                should_fail=True,

                failure_type="AIServiceException"

            ),

            AITestCase(

                name="empty_input",

                input_data={

                    "operation_type": "text_generation",

                    "input_data": {"prompt": ""},

                    "parameters": {}

                },

                should_fail=True,

                failure_type="AIServiceException"

            )

        ]

        

        for test_case in test_cases:

            result = await self._execute_test_case(ai_service_layer, test_case)

            self.test_results.append(result)

    

    async def _execute_test_case(self, ai_service_layer, test_case: AITestCase) -> TestResult:

        start_time = time.time()

        

        try:

            # Create AI request from test case

            from your_ai_module import AIRequest  # Import your actual AIRequest class

            request = AIRequest(

                operation_type=test_case.input_data["operation_type"],

                input_data=test_case.input_data["input_data"],

                parameters=test_case.input_data["parameters"]

            )

            

            # Execute the request

            response = await ai_service_layer.process_request(request)

            execution_time = time.time() - start_time

            

            # Check if test should have failed

            if test_case.should_fail:

                return TestResult(

                    test_name=test_case.name,

                    passed=False,

                    execution_time=execution_time,

                    error_message="Expected failure but test passed"

                )

            

            # Validate response

            validation_result = self._validate_response(response, test_case)

            

            return TestResult(

                test_name=test_case.name,

                passed=validation_result["passed"],

                execution_time=execution_time,

                error_message=validation_result.get("error_message"),

                metadata={

                    "response_length": len(str(response.result)),

                    "confidence_score": response.confidence_score,

                    "cached": response.cached

                }

            )

            

        except Exception as e:

            execution_time = time.time() - start_time

            

            # Check if this was an expected failure

            if test_case.should_fail:

                expected_error = test_case.failure_type

                actual_error = type(e).__name__

                

                if expected_error and expected_error in actual_error:

                    return TestResult(

                        test_name=test_case.name,

                        passed=True,

                        execution_time=execution_time,

                        metadata={"expected_failure": True}

                    )

            

            return TestResult(

                test_name=test_case.name,

                passed=False,

                execution_time=execution_time,

                error_message=str(e)

            )

    

    def _validate_response(self, response, test_case: AITestCase) -> Dict[str, Any]:

        """Validate AI response against test case expectations"""

        

        # Check execution time

        if test_case.max_execution_time and response.processing_time > test_case.max_execution_time:

            return {

                "passed": False,

                "error_message": f"Execution time {response.processing_time}s exceeded maximum {test_case.max_execution_time}s"

            }

        

        # Check output pattern

        if test_case.expected_output_pattern:

            result_str = str(response.result).lower()

            pattern = test_case.expected_output_pattern.lower()

            

            if pattern not in result_str:

                return {

                    "passed": False,

                    "error_message": f"Expected pattern '{pattern}' not found in result '{result_str}'"

                }

        

        # Check output length

        if test_case.expected_output_length_range:

            result_length = len(str(response.result))

            min_length, max_length = test_case.expected_output_length_range

            

            if not (min_length <= result_length <= max_length):

                return {

                    "passed": False,

                    "error_message": f"Result length {result_length} not in expected range [{min_length}, {max_length}]"

                }

        

        # Check confidence threshold

        if test_case.expected_confidence_threshold and response.confidence_score:

            if response.confidence_score < test_case.expected_confidence_threshold:

                return {

                    "passed": False,

                    "error_message": f"Confidence score {response.confidence_score} below threshold {test_case.expected_confidence_threshold}"

                }

        

        return {"passed": True}


class AIIntegrationTester:

    def __init__(self):

        self.test_results = []

    

    async def test_end_to_end_pipeline(self, pipeline_orchestrator, test_data: List[Dict[str, Any]]):

        """Test the complete AI pipeline from input to output"""

        

        for i, data_item in enumerate(test_data):

            test_name = f"e2e_pipeline_test_{i}"

            start_time = time.time()

            

            try:

                result = await pipeline_orchestrator.execute_pipeline(data_item)

                execution_time = time.time() - start_time

                

                # Validate pipeline result

                validation_passed = self._validate_pipeline_result(result, data_item)

                

                self.test_results.append(TestResult(

                    test_name=test_name,

                    passed=validation_passed,

                    execution_time=execution_time,

                    metadata={

                        "final_stage": result.stage.value,

                        "artifact_id": result.id,

                        "data_format": result.format.value

                    }

                ))

                

            except Exception as e:

                execution_time = time.time() - start_time

                self.test_results.append(TestResult(

                    test_name=test_name,

                    passed=False,

                    execution_time=execution_time,

                    error_message=str(e)

                ))

    

    async def test_concurrent_requests(self, ai_service, num_concurrent: int = 10):

        """Test AI service under concurrent load"""

        

        async def single_request(request_id: int):

            try:

                start_time = time.time()

                result = await ai_service.generate_text(

                    f"Test request {request_id}",

                    {"temperature": 0.7}

                )

                execution_time = time.time() - start_time

                

                return TestResult(

                    test_name=f"concurrent_request_{request_id}",

                    passed=True,

                    execution_time=execution_time,

                    metadata={"result_length": len(result)}

                )

                

            except Exception as e:

                return TestResult(

                    test_name=f"concurrent_request_{request_id}",

                    passed=False,

                    execution_time=time.time() - start_time,

                    error_message=str(e)

                )

        

        # Execute concurrent requests

        tasks = [single_request(i) for i in range(num_concurrent)]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        

        # Process results

        for result in results:

            if isinstance(result, TestResult):

                self.test_results.append(result)

            else:

                self.test_results.append(TestResult(

                    test_name="concurrent_request_exception",

                    passed=False,

                    execution_time=0.0,

                    error_message=str(result)

                ))

    

    def _validate_pipeline_result(self, result, original_data) -> bool:

        """Validate that pipeline result meets basic requirements"""

        

        # Check that result has required attributes

        required_attributes = ['id', 'data', 'format', 'stage', 'metadata']

        for attr in required_attributes:

            if not hasattr(result, attr):

                return False

        

        # Check that data was processed (not identical to input)

        if result.data == original_data:

            return False

        

        # Check that metadata contains expected information

        if not result.metadata or 'validation_passed' not in result.metadata:

            return False

        

        return True


class AIPerformanceTester:

    def __init__(self):

        self.performance_metrics = {}

    

    async def benchmark_ai_operations(self, ai_service, test_scenarios: List[Dict[str, Any]]):

        """Benchmark AI operations under various conditions"""

        

        for scenario in test_scenarios:

            scenario_name = scenario["name"]

            operation_count = scenario.get("operation_count", 10)

            concurrent_users = scenario.get("concurrent_users", 1)

            

            print(f"Running benchmark scenario: {scenario_name}")

            

            # Collect performance metrics

            response_times = []

            success_count = 0

            error_count = 0

            

            async def benchmark_operation():

                try:

                    start_time = time.time()

                    await ai_service.generate_text(

                        scenario["prompt"],

                        scenario.get("parameters", {})

                    )

                    response_time = time.time() - start_time

                    response_times.append(response_time)

                    return True

                except Exception:

                    return False

            

            # Run concurrent operations

            for batch in range(0, operation_count, concurrent_users):

                batch_size = min(concurrent_users, operation_count - batch)

                tasks = [benchmark_operation() for _ in range(batch_size)]

                results = await asyncio.gather(*tasks, return_exceptions=True)

                

                for result in results:

                    if result is True:

                        success_count += 1

                    else:

                        error_count += 1

            

            # Calculate metrics

            if response_times:

                self.performance_metrics[scenario_name] = {

                    "total_operations": operation_count,

                    "successful_operations": success_count,

                    "failed_operations": error_count,

                    "success_rate": success_count / operation_count,

                    "average_response_time": statistics.mean(response_times),

                    "median_response_time": statistics.median(response_times),

                    "min_response_time": min(response_times),

                    "max_response_time": max(response_times),

                    "p95_response_time": self._calculate_percentile(response_times, 95),

                    "p99_response_time": self._calculate_percentile(response_times, 99),

                    "throughput_ops_per_second": success_count / sum(response_times) if response_times else 0

                }

    

    def _calculate_percentile(self, data: List[float], percentile: int) -> float:

        """Calculate the specified percentile of the data"""

        if not data:

            return 0.0

        

        sorted_data = sorted(data)

        index = (percentile / 100.0) * (len(sorted_data) - 1)

        

        if index.is_integer():

            return sorted_data[int(index)]

        else:

            lower_index = int(index)

            upper_index = lower_index + 1

            weight = index - lower_index

            

            return sorted_data[lower_index] * (1 - weight) + sorted_data[upper_index] * weight


class AIRobustnessTester:

    def __init__(self):

        self.robustness_results = {}

    

    async def test_error_handling(self, resilient_ai_service):

        """Test system behavior under various error conditions"""

        

        error_scenarios = [

            {

                "name": "timeout_errors",

                "error_type": "timeout",

                "error_rate": 0.5,

                "expected_behavior": "fallback_or_retry"

            },

            {

                "name": "rate_limit_errors",

                "error_type": "rate_limit",

                "error_rate": 0.3,

                "expected_behavior": "backoff_and_retry"

            },

            {

                "name": "service_unavailable",

                "error_type": "service_unavailable",

                "error_rate": 1.0,

                "expected_behavior": "circuit_breaker_open"

            }

        ]

        

        for scenario in error_scenarios:

            scenario_name = scenario["name"]

            print(f"Testing error scenario: {scenario_name}")

            

            # Configure mock to simulate errors

            if hasattr(resilient_ai_service, 'ai_service'):

                if hasattr(resilient_ai_service.ai_service, 'set_failure_rate'):

                    resilient_ai_service.ai_service.set_failure_rate(scenario["error_rate"])

            

            # Test multiple requests to observe behavior

            results = []

            for i in range(10):

                try:

                    start_time = time.time()

                    result = await resilient_ai_service.generate_text(

                        f"Test prompt {i}",

                        {"temperature": 0.7},

                        f"request_{i}"

                    )

                    execution_time = time.time() - start_time

                    results.append({

                        "success": True,

                        "execution_time": execution_time,

                        "result_length": len(result)

                    })

                except Exception as e:

                    execution_time = time.time() - start_time

                    results.append({

                        "success": False,

                        "execution_time": execution_time,

                        "error_type": type(e).__name__,

                        "error_message": str(e)

                    })

            

            # Analyze results

            success_count = sum(1 for r in results if r["success"])

            avg_execution_time = statistics.mean([r["execution_time"] for r in results])

            

            self.robustness_results[scenario_name] = {

                "total_requests": len(results),

                "successful_requests": success_count,

                "success_rate": success_count / len(results),

                "average_execution_time": avg_execution_time,

                "expected_behavior": scenario["expected_behavior"],

                "behavior_observed": self._analyze_behavior(results, scenario)

            }

    

    def _analyze_behavior(self, results: List[Dict[str, Any]], scenario: Dict[str, Any]) -> str:

        """Analyze the observed behavior pattern"""

        

        success_count = sum(1 for r in results if r["success"])

        

        if scenario["expected_behavior"] == "fallback_or_retry":

            if success_count > 0:

                return "fallback_working"

            else:

                return "fallback_failed"

        

        elif scenario["expected_behavior"] == "backoff_and_retry":

            # Check if execution times increase (indicating backoff)

            execution_times = [r["execution_time"] for r in results]

            if len(execution_times) > 1:

                increasing_trend = all(execution_times[i] <= execution_times[i+1] 

                                     for i in range(len(execution_times)-1))

                if increasing_trend:

                    return "backoff_observed"

            return "backoff_not_observed"

        

        elif scenario["expected_behavior"] == "circuit_breaker_open":

            # Check if later requests fail faster (circuit breaker open)

            if len(results) > 5:

                early_times = [r["execution_time"] for r in results[:3]]

                later_times = [r["execution_time"] for r in results[-3:]]

                

                if statistics.mean(later_times) < statistics.mean(early_times):

                    return "circuit_breaker_working"

            return "circuit_breaker_not_working"

        

        return "unknown_behavior"


class AITestSuite:

    def __init__(self):

        self.unit_tester = AIUnitTester()

        self.integration_tester = AIIntegrationTester()

        self.performance_tester = AIPerformanceTester()

        self.robustness_tester = AIRobustnessTester()

        

        self.overall_results = {}

    

    async def run_comprehensive_tests(self, 

                                    ai_service_layer,

                                    pipeline_orchestrator,

                                    resilient_ai_service):

        """Run all test suites and generate comprehensive report"""

        

        print("Starting comprehensive AI system tests...")

        

        # Unit tests

        print("Running unit tests...")

        await self.unit_tester.test_ai_service_layer(ai_service_layer)

        

        # Integration tests

        print("Running integration tests...")

        test_data = [

            {"text": "Sample text for processing", "category": "test"},

            {"text": "Another test case", "category": "validation"},

            {"text": "Final test item", "category": "verification"}

        ]

        await self.integration_tester.test_end_to_end_pipeline(pipeline_orchestrator, test_data)

        await self.integration_tester.test_concurrent_requests(ai_service_layer)

        

        # Performance tests

        print("Running performance tests...")

        performance_scenarios = [

            {

                "name": "basic_text_generation",

                "prompt": "Generate a short story about AI",

                "parameters": {"temperature": 0.7, "max_length": 100},

                "operation_count": 20,

                "concurrent_users": 5

            },

            {

                "name": "high_concurrency",

                "prompt": "What is machine learning?",

                "parameters": {"temperature": 0.5},

                "operation_count": 50,

                "concurrent_users": 10

            }

        ]

        await self.performance_tester.benchmark_ai_operations(ai_service_layer, performance_scenarios)

        

        # Robustness tests

        print("Running robustness tests...")

        await self.robustness_tester.test_error_handling(resilient_ai_service)

        

        # Compile overall results

        self._compile_results()

        

        print("All tests completed!")

        return self.overall_results

    

    def _compile_results(self):

        """Compile results from all test suites"""

        

        # Unit test results

        unit_tests = self.unit_tester.test_results

        unit_passed = sum(1 for test in unit_tests if test.passed)

        

        # Integration test results

        integration_tests = self.integration_tester.test_results

        integration_passed = sum(1 for test in integration_tests if test.passed)

        

        # Performance results

        performance_results = self.performance_tester.performance_metrics

        

        # Robustness results

        robustness_results = self.robustness_tester.robustness_results

        

        self.overall_results = {

            "test_summary": {

                "unit_tests": {

                    "total": len(unit_tests),

                    "passed": unit_passed,

                    "failed": len(unit_tests) - unit_passed,

                    "pass_rate": unit_passed / len(unit_tests) if unit_tests else 0

                },

                "integration_tests": {

                    "total": len(integration_tests),

                    "passed": integration_passed,

                    "failed": len(integration_tests) - integration_passed,

                    "pass_rate": integration_passed / len(integration_tests) if integration_tests else 0

                }

            },

            "performance_metrics": performance_results,

            "robustness_metrics": robustness_results,

            "detailed_results": {

                "unit_test_details": [

                    {

                        "name": test.test_name,

                        "passed": test.passed,

                        "execution_time": test.execution_time,

                        "error": test.error_message

                    }

                    for test in unit_tests

                ],

                "integration_test_details": [

                    {

                        "name": test.test_name,

                        "passed": test.passed,

                        "execution_time": test.execution_time,

                        "error": test.error_message

                    }

                    for test in integration_tests

                ]

            }

        }

    

    def generate_test_report(self) -> str:

        """Generate a comprehensive test report"""

        

        if not self.overall_results:

            return "No test results available. Run tests first."

        

        report = []

        report.append("AI SYSTEM TEST REPORT")

        report.append("=" * 50)

        report.append("")

        

        # Test summary

        summary = self.overall_results["test_summary"]

        report.append("TEST SUMMARY:")

        report.append(f"Unit Tests: {summary['unit_tests']['passed']}/{summary['unit_tests']['total']} passed ({summary['unit_tests']['pass_rate']:.2%})")

        report.append(f"Integration Tests: {summary['integration_tests']['passed']}/{summary['integration_tests']['total']} passed ({summary['integration_tests']['pass_rate']:.2%})")

        report.append("")

        

        # Performance metrics

        if self.overall_results["performance_metrics"]:

            report.append("PERFORMANCE METRICS:")

            for scenario, metrics in self.overall_results["performance_metrics"].items():

                report.append(f"  {scenario}:")

                report.append(f"    Success Rate: {metrics['success_rate']:.2%}")

                report.append(f"    Average Response Time: {metrics['average_response_time']:.3f}s")

                report.append(f"    P95 Response Time: {metrics['p95_response_time']:.3f}s")

                report.append(f"    Throughput: {metrics['throughput_ops_per_second']:.2f} ops/sec")

            report.append("")

        

        # Robustness metrics

        if self.overall_results["robustness_metrics"]:

            report.append("ROBUSTNESS METRICS:")

            for scenario, metrics in self.overall_results["robustness_metrics"].items():

                report.append(f"  {scenario}:")

                report.append(f"    Success Rate: {metrics['success_rate']:.2%}")

                report.append(f"    Expected Behavior: {metrics['expected_behavior']}")

                report.append(f"    Observed Behavior: {metrics['behavior_observed']}")

            report.append("")

        

        # Failed tests

        failed_tests = []

        for test in self.overall_results["detailed_results"]["unit_test_details"]:

            if not test["passed"]:

                failed_tests.append(f"Unit Test - {test['name']}: {test['error']}")

        

        for test in self.overall_results["detailed_results"]["integration_test_details"]:

            if not test["passed"]:

                failed_tests.append(f"Integration Test - {test['name']}: {test['error']}")

        

        if failed_tests:

            report.append("FAILED TESTS:")

            for failure in failed_tests:

                report.append(f"  - {failure}")

        else:

            report.append("ALL TESTS PASSED!")

        

        return "\n".join(report)


# Example usage

async def example_test_execution():

    """Example of how to use the AI testing framework"""

    

    # This would be your actual AI components

    # For demonstration, we'll use mocks

    

    class MockAIServiceLayer:

        async def process_request(self, request):

            # Mock implementation

            from types import SimpleNamespace

            return SimpleNamespace(

                result="Mock AI response",

                confidence_score=0.8,

                processing_time=0.5,

                cached=False

            )

    

    class MockPipelineOrchestrator:

        async def execute_pipeline(self, data):

            from types import SimpleNamespace

            return SimpleNamespace(

                id="test_artifact_123",

                data={"processed": True, "original": data},

                format=SimpleNamespace(value="json"),

                stage=SimpleNamespace(value="output"),

                metadata={"validation_passed": True}

            )

    

    class MockResilientAIService:

        async def generate_text(self, prompt, parameters, request_id):

            return f"Resilient response for: {prompt}"

    

    # Create test suite

    test_suite = AITestSuite()

    

    # Create mock components

    ai_service_layer = MockAIServiceLayer()

    pipeline_orchestrator = MockPipelineOrchestrator()

    resilient_ai_service = MockResilientAIService()

    

    # Run comprehensive tests

    results = await test_suite.run_comprehensive_tests(

        ai_service_layer,

        pipeline_orchestrator,

        resilient_ai_service

    )

    

    # Generate and print report

    report = test_suite.generate_test_report()

    print(report)

    

    return results


# Uncomment to run the example

# asyncio.run(example_test_execution())


This comprehensive testing framework addresses the unique challenges of testing AI-integrated systems. The unit tester validates individual AI components with predictable test cases. The integration tester verifies end-to-end functionality and concurrent behavior. The performance tester measures system performance under various loads. The robustness tester validates error handling and resilience patterns. Together, these testing strategies ensure that AI-integrated systems are reliable, performant, and robust in production environments.


Monitoring and Observability


Monitoring AI-integrated systems requires specialized approaches that go beyond traditional application monitoring. AI systems exhibit unique characteristics such as variable response times, probabilistic outputs, model drift over time, and complex dependencies on external AI services. Effective monitoring must track not only system health and performance but also AI-specific metrics such as model accuracy, confidence scores, and output quality.

Observability in AI systems encompasses three key pillars: metrics collection for quantitative analysis, logging for detailed event tracking, and tracing for understanding request flows through complex AI pipelines. The monitoring system must be capable of detecting subtle degradations in AI performance that might not trigger traditional alerting thresholds but could significantly impact user experience.

Here's a comprehensive implementation of monitoring and observability specifically designed for AI-integrated systems:


import asyncio

import time

import json

from typing import Dict, Any, List, Optional, Callable

from dataclasses import dataclass, field

from datetime import datetime, timedelta

from enum import Enum

import statistics

import threading

from collections import defaultdict, deque

import uuid


class MetricType(Enum):

    COUNTER = "counter"

    GAUGE = "gauge"

    HISTOGRAM = "histogram"

    TIMER = "timer"


class AlertSeverity(Enum):

    LOW = "low"

    MEDIUM = "medium"

    HIGH = "high"

    CRITICAL = "critical"


@dataclass

class Metric:

    name: str

    value: float

    metric_type: MetricType

    timestamp: datetime

    labels: Dict[str, str] = field(default_factory=dict)

    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass

class LogEntry:

    level: str

    message: str

    timestamp: datetime

    component: str

    request_id: Optional[str] = None

    user_id: Optional[str] = None

    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass

class TraceSpan:

    span_id: str

    trace_id: str

    operation_name: str

    start_time: datetime

    end_time: Optional[datetime] = None

    parent_span_id: Optional[str] = None

    tags: Dict[str, Any] = field(default_factory=dict)

    logs: List[Dict[str, Any]] = field(default_factory=list)


@dataclass

class Alert:

    id: str

    name: str

    severity: AlertSeverity

    message: str

    timestamp: datetime

    metric_name: str

    threshold_value: float

    actual_value: float

    metadata: Dict[str, Any] = field(default_factory=dict)


class AIMetricsCollector:

    def __init__(self):

        self.metrics = defaultdict(list)

        self.metric_windows = defaultdict(lambda: deque(maxlen=1000))

        self.lock = threading.Lock()

        

        # AI-specific metric definitions

        self.ai_metrics = {

            "ai_request_duration": MetricType.HISTOGRAM,

            "ai_request_count": MetricType.COUNTER,

            "ai_error_count": MetricType.COUNTER,

            "ai_confidence_score": MetricType.HISTOGRAM,

            "ai_output_length": MetricType.HISTOGRAM,

            "ai_cache_hit_rate": MetricType.GAUGE,

            "ai_model_inference_time": MetricType.HISTOGRAM,

            "ai_queue_depth": MetricType.GAUGE,

            "ai_token_usage": MetricType.COUNTER,

            "ai_cost_per_request": MetricType.HISTOGRAM

        }

    

    def record_metric(self, name: str, value: float, labels: Dict[str, str] = None, metadata: Dict[str, Any] = None):

        """Record a metric value"""

        with self.lock:

            metric_type = self.ai_metrics.get(name, MetricType.GAUGE)

            

            metric = Metric(

                name=name,

                value=value,

                metric_type=metric_type,

                timestamp=datetime.now(),

                labels=labels or {},

                metadata=metadata or {}

            )

            

            self.metrics[name].append(metric)

            self.metric_windows[name].append(metric)

    

    def record_ai_request(self, 

                         operation_type: str,

                         duration: float,

                         success: bool,

                         confidence_score: Optional[float] = None,

                         output_length: Optional[int] = None,

                         token_usage: Optional[int] = None,

                         cost: Optional[float] = None,

                         model_name: Optional[str] = None):

        """Record metrics for an AI request"""

        

        labels = {

            "operation_type": operation_type,

            "model_name": model_name or "unknown",

            "success": str(success)

        }

        

        # Record basic metrics

        self.record_metric("ai_request_duration", duration, labels)

        self.record_metric("ai_request_count", 1, labels)

        

        if not success:

            self.record_metric("ai_error_count", 1, labels)

        

        # Record AI-specific metrics

        if confidence_score is not None:

            self.record_metric("ai_confidence_score", confidence_score, labels)

        

        if output_length is not None:

            self.record_metric("ai_output_length", output_length, labels)

        

        if token_usage is not None:

            self.record_metric("ai_token_usage", token_usage, labels)

        

        if cost is not None:

            self.record_metric("ai_cost_per_request", cost, labels)

    

    def get_metric_summary(self, metric_name: str, time_window_minutes: int = 60) -> Dict[str, Any]:

        """Get summary statistics for a metric within a time window"""

        

        cutoff_time = datetime.now() - timedelta(minutes=time_window_minutes)

        

        with self.lock:

            recent_metrics = [

                m for m in self.metrics[metric_name]

                if m.timestamp > cutoff_time

            ]

        

        if not recent_metrics:

            return {"count": 0, "summary": "No data available"}

        

        values = [m.value for m in recent_metrics]

        

        summary = {

            "count": len(values),

            "min": min(values),

            "max": max(values),

            "mean": statistics.mean(values),

            "median": statistics.median(values),

            "std_dev": statistics.stdev(values) if len(values) > 1 else 0,

            "p95": self._calculate_percentile(values, 95),

            "p99": self._calculate_percentile(values, 99)

        }

        

        return summary

    

    def get_ai_health_metrics(self) -> Dict[str, Any]:

        """Get overall AI system health metrics"""

        

        # Calculate success rate

        success_metrics = [m for m in self.metrics["ai_request_count"] if m.labels.get("success") == "true"]

        total_metrics = self.metrics["ai_request_count"]

        

        success_rate = len(success_metrics) / len(total_metrics) if total_metrics else 0

        

        # Calculate average confidence score

        confidence_metrics = self.metrics["ai_confidence_score"]

        avg_confidence = statistics.mean([m.value for m in confidence_metrics]) if confidence_metrics else 0

        

        # Calculate average response time

        duration_metrics = self.metrics["ai_request_duration"]

        avg_duration = statistics.mean([m.value for m in duration_metrics]) if duration_metrics else 0

        

        # Calculate error rate

        error_count = len(self.metrics["ai_error_count"])

        total_requests = len(total_metrics)

        error_rate = error_count / total_requests if total_requests else 0

        

        return {

            "success_rate": success_rate,

            "error_rate": error_rate,

            "average_confidence_score": avg_confidence,

            "average_response_time": avg_duration,

            "total_requests": total_requests,

            "total_errors": error_count,

            "health_score": self._calculate_health_score(success_rate, avg_confidence, avg_duration, error_rate)

        }

    

    def _calculate_percentile(self, values: List[float], percentile: int) -> float:

        """Calculate percentile value"""

        if not values:

            return 0.0

        

        sorted_values = sorted(values)

        index = (percentile / 100.0) * (len(sorted_values) - 1)

        

        if index.is_integer():

            return sorted_values[int(index)]

        else:

            lower = int(index)

            upper = lower + 1

            weight = index - lower

            return sorted_values[lower] * (1 - weight) + sorted_values[upper] * weight

    

    def _calculate_health_score(self, success_rate: float, avg_confidence: float, avg_duration: float, error_rate: float) -> float:

        """Calculate overall health score (0-100)"""

        

        # Normalize metrics to 0-1 scale

        success_score = success_rate

        confidence_score = avg_confidence if avg_confidence <= 1.0 else avg_confidence / 100.0

        

        # Duration score (inverse relationship - lower is better)

        duration_score = max(0, 1 - (avg_duration / 10.0))  # Assume 10s is very poor

        

        # Error score (inverse relationship)

        error_score = max(0, 1 - error_rate)

        

        # Weighted average

        health_score = (

            success_score * 0.3 +

            confidence_score * 0.25 +

            duration_score * 0.25 +

            error_score * 0.2

        ) * 100

        

        return min(100, max(0, health_score))


class AILogger:

    def __init__(self, component_name: str):

        self.component_name = component_name

        self.logs = deque(maxlen=10000)

        self.lock = threading.Lock()

    

    def log(self, level: str, message: str, request_id: str = None, user_id: str = None, **metadata):

        """Log a message with AI-specific context"""

        

        log_entry = LogEntry(

            level=level,

            message=message,

            timestamp=datetime.now(),

            component=self.component_name,

            request_id=request_id,

            user_id=user_id,

            metadata=metadata

        )

        

        with self.lock:

            self.logs.append(log_entry)

        

        # Print to console (in production, this would go to your logging system)

        self._print_log(log_entry)

    

    def log_ai_request_start(self, request_id: str, operation_type: str, user_id: str = None, **metadata):

        """Log the start of an AI request"""

        self.log(

            "INFO",

            f"AI request started: {operation_type}",

            request_id=request_id,

            user_id=user_id,

            operation_type=operation_type,

            **metadata

        )

    

    def log_ai_request_complete(self, request_id: str, operation_type: str, duration: float, success: bool, **metadata):

        """Log the completion of an AI request"""

        level = "INFO" if success else "ERROR"

        status = "completed successfully" if success else "failed"

        

        self.log(

            level,

            f"AI request {status}: {operation_type} (duration: {duration:.3f}s)",

            request_id=request_id,

            operation_type=operation_type,

            duration=duration,

            success=success,

            **metadata

        )

    

    def log_ai_error(self, request_id: str, operation_type: str, error: Exception, **metadata):

        """Log an AI-related error"""

        self.log(

            "ERROR",

            f"AI error in {operation_type}: {str(error)}",

            request_id=request_id,

            operation_type=operation_type,

            error_type=type(error).__name__,

            error_message=str(error),

            **metadata

        )

    

    def log_model_performance(self, model_name: str, confidence_score: float, processing_time: float, **metadata):

        """Log model performance metrics"""

        self.log(

            "INFO",

            f"Model performance: {model_name} (confidence: {confidence_score:.3f}, time: {processing_time:.3f}s)",

            model_name=model_name,

            confidence_score=confidence_score,

            processing_time=processing_time,

            **metadata

        )

    

    def get_recent_logs(self, count: int = 100, level_filter: str = None) -> List[LogEntry]:

        """Get recent log entries"""

        with self.lock:

            logs = list(self.logs)

        

        if level_filter:

            logs = [log for log in logs if log.level == level_filter]

        

        return logs[-count:]

    

    def _print_log(self, log_entry: LogEntry):

        """Print log entry to console"""

        timestamp = log_entry.timestamp.strftime("%Y-%m-%d %H:%M:%S")

        request_info = f" [{log_entry.request_id}]" if log_entry.request_id else ""

        

        print(f"[{timestamp}] {log_entry.level} {log_entry.component}{request_info}: {log_entry.message}")


class AITracer:

    def __init__(self):

        self.active_spans = {}

        self.completed_traces = deque(maxlen=1000)

        self.lock = threading.Lock()

    

    def start_span(self, operation_name: str, parent_span_id: str = None, trace_id: str = None) -> str:

        """Start a new trace span"""

        

        span_id = str(uuid.uuid4())

        if not trace_id:

            trace_id = str(uuid.uuid4())

        

        span = TraceSpan(

            span_id=span_id,

            trace_id=trace_id,

            operation_name=operation_name,

            start_time=datetime.now(),

            parent_span_id=parent_span_id

        )

        

        with self.lock:

            self.active_spans[span_id] = span

        

        return span_id

    

    def finish_span(self, span_id: str, tags: Dict[str, Any] = None, error: Exception = None):

        """Finish a trace span"""

        

        with self.lock:

            span = self.active_spans.pop(span_id, None)

        

        if not span:

            return

        

        span.end_time = datetime.now()

        

        if tags:

            span.tags.update(tags)

        

        if error:

            span.tags["error"] = True

            span.tags["error_type"] = type(error).__name__

            span.tags["error_message"] = str(error)

        

        # Add to completed traces

        with self.lock:

            self.completed_traces.append(span)

    

    def add_span_log(self, span_id: str, message: str, **fields):

        """Add a log entry to a span"""

        

        with self.lock:

            span = self.active_spans.get(span_id)

        

        if span:

            log_entry = {

                "timestamp": datetime.now().isoformat(),

                "message": message,

                **fields

            }

            span.logs.append(log_entry)

    

    def get_trace(self, trace_id: str) -> List[TraceSpan]:

        """Get all spans for a trace"""

        

        with self.lock:

            spans = [span for span in self.completed_traces if span.trace_id == trace_id]

        

        return sorted(spans, key=lambda s: s.start_time)

    

    def get_trace_duration(self, trace_id: str) -> Optional[float]:

        """Get total duration of a trace"""

        

        spans = self.get_trace(trace_id)

        if not spans:

            return None

        

        start_time = min(span.start_time for span in spans)

        end_time = max(span.end_time for span in spans if span.end_time)

        

        if end_time:

            return (end_time - start_time).total_seconds()

        

        return None


class AIAlertManager:

    def __init__(self, metrics_collector: AIMetricsCollector):

        self.metrics_collector = metrics_collector

        self.alert_rules = {}

        self.active_alerts = {}

        self.alert_history = deque(maxlen=1000)

        self.lock = threading.Lock()

        

        # Set up default alert rules

        self._setup_default_alerts()

    

    def _setup_default_alerts(self):

        """Set up default alerting rules for AI systems"""

        

        self.add_alert_rule(

            "high_error_rate",

            metric_name="ai_error_count",

            threshold=0.1,  # 10% error rate

            comparison="greater_than",

            severity=AlertSeverity.HIGH,

            description="AI error rate is too high"

        )

        

        self.add_alert_rule(

            "low_confidence_score",

            metric_name="ai_confidence_score",

            threshold=0.5,

            comparison="less_than",

            severity=AlertSeverity.MEDIUM,

            description="AI confidence scores are low"

        )

        

        self.add_alert_rule(

            "high_response_time",

            metric_name="ai_request_duration",

            threshold=10.0,  # 10 seconds

            comparison="greater_than",

            severity=AlertSeverity.HIGH,

            description="AI response times are too high"

        )

        

        self.add_alert_rule(

            "low_success_rate",

            metric_name="ai_request_count",

            threshold=0.95,  # 95% success rate

            comparison="less_than",

            severity=AlertSeverity.CRITICAL,

            description="AI success rate is below threshold"

        )

    

    def add_alert_rule(self, 

                      rule_name: str,

                      metric_name: str,

                      threshold: float,

                      comparison: str,

                      severity: AlertSeverity,

                      description: str):

        """Add a new alert rule"""

        

        self.alert_rules[rule_name] = {

            "metric_name": metric_name,

            "threshold": threshold,

            "comparison": comparison,

            "severity": severity,

            "description": description,

            "enabled": True

        }

    

    async def check_alerts(self):

        """Check all alert rules and trigger alerts if necessary"""

        

        for rule_name, rule in self.alert_rules.items():

            if not rule["enabled"]:

                continue

            

            try:

                await self._check_single_rule(rule_name, rule)

            except Exception as e:

                print(f"Error checking alert rule {rule_name}: {e}")

    

    async def _check_single_rule(self, rule_name: str, rule: Dict[str, Any]):

        """Check a single alert rule"""

        

        metric_name = rule["metric_name"]

        threshold = rule["threshold"]

        comparison = rule["comparison"]

        

        # Get current metric value

        if metric_name == "ai_error_count":

            current_value = self._calculate_error_rate()

        elif metric_name == "ai_request_count":

            current_value = self._calculate_success_rate()

        else:

            summary = self.metrics_collector.get_metric_summary(metric_name, time_window_minutes=5)

            if summary["count"] == 0:

                return

            current_value = summary["mean"]

        

        # Check if alert should trigger

        should_alert = False

        

        if comparison == "greater_than" and current_value > threshold:

            should_alert = True

        elif comparison == "less_than" and current_value < threshold:

            should_alert = True

        elif comparison == "equals" and abs(current_value - threshold) < 0.001:

            should_alert = True

        

        # Handle alert state

        if should_alert:

            if rule_name not in self.active_alerts:

                await self._trigger_alert(rule_name, rule, current_value)

        else:

            if rule_name in self.active_alerts:

                await self._resolve_alert(rule_name)

    

    async def _trigger_alert(self, rule_name: str, rule: Dict[str, Any], current_value: float):

        """Trigger a new alert"""

        

        alert = Alert(

            id=str(uuid.uuid4()),

            name=rule_name,

            severity=rule["severity"],

            message=f"{rule['description']} (current: {current_value:.3f}, threshold: {rule['threshold']:.3f})",

            timestamp=datetime.now(),

            metric_name=rule["metric_name"],

            threshold_value=rule["threshold"],

            actual_value=current_value

        )

        

        with self.lock:

            self.active_alerts[rule_name] = alert

            self.alert_history.append(alert)

        

        # Send alert notification (in production, this would integrate with your alerting system)

        await self._send_alert_notification(alert)

    

    async def _resolve_alert(self, rule_name: str):

        """Resolve an active alert"""

        

        with self.lock:

            alert = self.active_alerts.pop(rule_name, None)

        

        if alert:

            print(f"RESOLVED: Alert {rule_name} has been resolved")

    

    async def _send_alert_notification(self, alert: Alert):

        """Send alert notification"""

        

        severity_emoji = {

            AlertSeverity.LOW: "ℹ️",

            AlertSeverity.MEDIUM: "⚠️",

            AlertSeverity.HIGH: "🚨",

            AlertSeverity.CRITICAL: "🔥"

        }

        

        emoji = severity_emoji.get(alert.severity, "❓")

        

        print(f"{emoji} ALERT [{alert.severity.value.upper()}]: {alert.message}")

        print(f"   Alert ID: {alert.id}")

        print(f"   Timestamp: {alert.timestamp}")

        print(f"   Metric: {alert.metric_name}")

    

    def _calculate_error_rate(self) -> float:

        """Calculate current error rate"""

        

        total_requests = len(self.metrics_collector.metrics["ai_request_count"])

        error_requests = len(self.metrics_collector.metrics["ai_error_count"])

        

        return error_requests / total_requests if total_requests > 0 else 0

    

    def _calculate_success_rate(self) -> float:

        """Calculate current success rate"""

        

        return 1.0 - self._calculate_error_rate()

    

    def get_active_alerts(self) -> List[Alert]:

        """Get all active alerts"""

        

        with self.lock:

            return list(self.active_alerts.values())


class AIMonitoringDashboard:

    def __init__(self, 

                 metrics_collector: AIMetricsCollector,

                 logger: AILogger,

                 tracer: AITracer,

                 alert_manager: AIAlertManager):

        self.metrics_collector = metrics_collector

        self.logger = logger

        self.tracer = tracer

        self.alert_manager = alert_manager

    

    def get_dashboard_data(self) -> Dict[str, Any]:

        """Get comprehensive dashboard data"""

        

        # Get health metrics

        health_metrics = self.metrics_collector.get_ai_health_metrics()

        

        # Get recent metrics summaries

        metric_summaries = {}

        for metric_name in ["ai_request_duration", "ai_confidence_score", "ai_output_length"]:

            metric_summaries[metric_name] = self.metrics_collector.get_metric_summary(metric_name)

        

        # Get recent logs

        recent_logs = self.logger.get_recent_logs(50)

        error_logs = self.logger.get_recent_logs(20, "ERROR")

        

        # Get active alerts

        active_alerts = self.alert_manager.get_active_alerts()

        

        # Get recent traces

        recent_traces = list(self.tracer.completed_traces)[-10:]

        

        return {

            "health_metrics": health_metrics,

            "metric_summaries": metric_summaries,

            "recent_logs": [

                {

                    "timestamp": log.timestamp.isoformat(),

                    "level": log.level,

                    "message": log.message,

                    "component": log.component,

                    "request_id": log.request_id

                }

                for log in recent_logs

            ],

            "error_logs": [

                {

                    "timestamp": log.timestamp.isoformat(),

                    "message": log.message,

                    "request_id": log.request_id,

                    "metadata": log.metadata

                }

                for log in error_logs

            ],

            "active_alerts": [

                {

                    "id": alert.id,

                    "name": alert.name,

                    "severity": alert.severity.value,

                    "message": alert.message,

                    "timestamp": alert.timestamp.isoformat()

                }

                for alert in active_alerts

            ],

            "recent_traces": [

                {

                    "trace_id": trace.trace_id,

                    "operation": trace.operation_name,

                    "duration": (trace.end_time - trace.start_time).total_seconds() if trace.end_time else None,

                    "tags": trace.tags

                }

                for trace in recent_traces

            ]

        }

    

    def print_dashboard(self):

        """Print a text-based dashboard"""

        

        data = self.get_dashboard_data()

        

        print("=" * 80)

        print("AI SYSTEM MONITORING DASHBOARD")

        print("=" * 80)

        print()

        

        # Health metrics

        health = data["health_metrics"]

        print(f"SYSTEM HEALTH SCORE: {health['health_score']:.1f}/100")

        print(f"Success Rate: {health['success_rate']:.2%}")

        print(f"Error Rate: {health['error_rate']:.2%}")

        print(f"Avg Confidence: {health['average_confidence_score']:.3f}")

        print(f"Avg Response Time: {health['average_response_time']:.3f}s")

        print(f"Total Requests: {health['total_requests']}")

        print()

        

        # Active alerts

        if data["active_alerts"]:

            print("ACTIVE ALERTS:")

            for alert in data["active_alerts"]:

                print(f"  ðŸš¨ [{alert['severity'].upper()}] {alert['name']}: {alert['message']}")

            print()

        

        # Recent errors

        if data["error_logs"]:

            print("RECENT ERRORS:")

            for error in data["error_logs"][-5:]:

                print(f"  ❌ {error['timestamp']}: {error['message']}")

            print()

        

        # Performance metrics

        print("PERFORMANCE METRICS (last hour):")

        for metric_name, summary in data["metric_summaries"].items():

            if summary["count"] > 0:

                print(f"  {metric_name}:")

                print(f"    Mean: {summary['mean']:.3f}, P95: {summary['p95']:.3f}, P99: {summary['p99']:.3f}")

        print()


# Example usage and integration

class MonitoredAIService:

    def __init__(self):

        self.metrics_collector = AIMetricsCollector()

        self.logger = AILogger("AIService")

        self.tracer = AITracer()

        self.alert_manager = AIAlertManager(self.metrics_collector)

        self.dashboard = AIMonitoringDashboard(

            self.metrics_collector,

            self.logger,

            self.tracer,

            self.alert_manager

        )

        

        # Start background alert checking

        self._start_alert_monitoring()

    

    async def generate_text_with_monitoring(self, prompt: str, parameters: Dict[str, Any], user_id: str = None) -> str:

        """Generate text with comprehensive monitoring"""

        

        request_id = str(uuid.uuid4())

        operation_type = "text_generation"

        

        # Start tracing

        trace_id = str(uuid.uuid4())

        span_id = self.tracer.start_span("ai_text_generation", trace_id=trace_id)

        

        # Log request start

        self.logger.log_ai_request_start(

            request_id=request_id,

            operation_type=operation_type,

            user_id=user_id,

            prompt_length=len(prompt),

            parameters=parameters

        )

        

        start_time = time.time()

        

        try:

            # Add span log

            self.tracer.add_span_log(span_id, "Starting AI text generation", prompt_length=len(prompt))

            

            # Simulate AI processing (replace with actual AI service call)

            await asyncio.sleep(random.uniform(0.5, 2.0))  # Simulate variable processing time

            

            # Simulate occasional failures

            if random.random() < 0.05:  # 5% failure rate

                raise Exception("Simulated AI service error")

            

            # Generate mock response

            response = f"Generated response for: {prompt[:50]}..."

            confidence_score = random.uniform(0.7, 0.95)

            token_usage = len(prompt.split()) + len(response.split())

            cost = token_usage * 0.0001  # Mock cost calculation

            

            duration = time.time() - start_time

            

            # Record metrics

            self.metrics_collector.record_ai_request(

                operation_type=operation_type,

                duration=duration,

                success=True,

                confidence_score=confidence_score,

                output_length=len(response),

                token_usage=token_usage,

                cost=cost,

                model_name="mock-model-v1"

            )

            

            # Log completion

            self.logger.log_ai_request_complete(

                request_id=request_id,

                operation_type=operation_type,

                duration=duration,

                success=True,

                confidence_score=confidence_score,

                output_length=len(response)

            )

            

            # Log model performance

            self.logger.log_model_performance(

                model_name="mock-model-v1",

                confidence_score=confidence_score,

                processing_time=duration

            )

            

            # Finish span

            self.tracer.finish_span(span_id, tags={

                "operation_type": operation_type,

                "success": True,

                "confidence_score": confidence_score,

                "output_length": len(response)

            })

            

            return response

            

        except Exception as e:

            duration = time.time() - start_time

            

            # Record error metrics

            self.metrics_collector.record_ai_request(

                operation_type=operation_type,

                duration=duration,

                success=False,

                model_name="mock-model-v1"

            )

            

            # Log error

            self.logger.log_ai_error(

                request_id=request_id,

                operation_type=operation_type,

                error=e

            )

            

            # Finish span with error

            self.tracer.finish_span(span_id, error=e)

            

            raise e

    

    def _start_alert_monitoring(self):

        """Start background alert monitoring"""

        

        async def alert_loop():

            while True:

                try:

                    await self.alert_manager.check_alerts()

                    await asyncio.sleep(30)  # Check every 30 seconds

                except Exception as e:

                    print(f"Error in alert monitoring: {e}")

                    await asyncio.sleep(60)

        

        # Start the alert monitoring loop

        asyncio.create_task(alert_loop())

    

    def get_monitoring_dashboard(self):

        """Get monitoring dashboard"""

        return self.dashboard.get_dashboard_data()

    

    def print_dashboard(self):

        """Print monitoring dashboard"""

        self.dashboard.print_dashboard()


# Example usage

async def example_monitoring_usage():

    """Example of how to use the AI monitoring system"""

    

    # Create monitored AI service

    ai_service = MonitoredAIService()

    

    # Simulate multiple requests

    print("Simulating AI requests with monitoring...")

    

    for i in range(20):

        try:

            result = await ai_service.generate_text_with_monitoring(

                f"Test prompt {i}",

                {"temperature": 0.7},

                user_id=f"user_{i % 5}"

            )

            print(f"Request {i}: Success")

            

        except Exception as e:

            print(f"Request {i}: Failed - {e}")

        

        # Brief pause between requests

        await asyncio.sleep(0.1)

    

    # Wait a bit for metrics to accumulate

    await asyncio.sleep(2)

    

    # Print dashboard

    print("\n" + "="*80)

    ai_service.print_dashboard()

    

    return ai_service


# Uncomment to run the example

# asyncio.run(example_monitoring_usage())

```


This comprehensive monitoring and observability implementation provides deep insights into AI system behavior. The metrics collector tracks AI-specific performance indicators and health metrics. The logger captures detailed events with proper context. The tracer provides request flow visibility through complex AI pipelines. The alert manager proactively identifies issues before they impact users. Together, these components enable effective monitoring and troubleshooting of AI-integrated systems in production environments.


Conclusion and Best Practices Summary


The systematic design of software systems that integrate AI and GenAI functionality requires a fundamental shift in how we approach software architecture. Throughout this article, we have explored the essential patterns, principles, and practices that enable the creation of robust, scalable, and maintainable AI-integrated systems.

The foundation of successful AI integration lies in treating AI components as first-class citizens in our architecture while maintaining clear separation of concerns. This approach allows us to build systems that can evolve with the rapidly changing AI landscape while preserving the stability and reliability that users expect from production software systems.

The architectural patterns we have discussed provide proven solutions to common challenges in AI integration. The Adapter Pattern enables seamless integration with multiple AI providers, allowing systems to switch between different models and services without affecting core business logic. The Strategy Pattern provides intelligent service selection based on runtime conditions, optimizing for cost, performance, or reliability as needed. The AI Service Layer Pattern creates a stable abstraction that shields application logic from the complexities of AI operations.

Data pipeline architecture represents a critical component of AI-integrated systems, requiring specialized approaches to handle the unique characteristics of AI workloads. The pipeline must support immutable data transformations, comprehensive validation, and feature engineering while maintaining auditability and reproducibility. The implementation we explored demonstrates how to build pipelines that can handle both batch and streaming data processing, with proper error handling and recovery mechanisms.

Error handling and resilience patterns become even more critical in AI systems due to the probabilistic nature of AI operations and the potential for various failure modes. The Circuit Breaker Pattern prevents cascading failures when AI services become unavailable, while intelligent retry strategies handle transient errors appropriately. Fallback mechanisms ensure graceful degradation when primary AI services are unavailable, maintaining system functionality even under adverse conditions.

Security and privacy considerations in AI systems extend beyond traditional application security to address AI-specific vulnerabilities such as prompt injection attacks and data leakage through model outputs. The comprehensive security framework we implemented demonstrates how to protect both the AI system and user data through input sanitization, PII detection, access control, and output validation. These security measures must be integrated throughout the AI pipeline rather than treated as an afterthought.

Testing strategies for AI-integrated systems must account for the non-deterministic nature of AI outputs while ensuring system reliability and performance. The testing framework we developed addresses multiple layers of functionality, from unit tests for individual components to integration tests that verify the entire AI pipeline. Performance testing and robustness testing ensure that systems can handle production loads and various failure scenarios.

Monitoring and observability in AI systems require specialized approaches that track not only traditional system metrics but also AI-specific indicators such as model performance, confidence scores, and output quality. The comprehensive monitoring implementation provides the visibility needed to detect issues early and maintain system health in production environments.

Several key best practices emerge from our exploration of AI system design. 

First, embrace the probabilistic nature of AI systems rather than fighting against it. Design your architecture to handle variability in AI outputs and response times. 

Second, implement comprehensive error handling and fallback mechanisms from the beginning rather than adding them as an afterthought. AI systems will fail in ways that traditional systems do not, and your architecture must be prepared for these failures.

Third, invest heavily in monitoring and observability. AI systems can degrade gradually in ways that are not immediately apparent through traditional monitoring. Implement metrics that track AI-specific performance indicators and set up alerting that can detect subtle changes in system behavior. 

Fourth, design for flexibility and evolution. The AI landscape changes rapidly, and your architecture must be able to accommodate new models, providers, and techniques without requiring fundamental restructuring.

Fifth, prioritize security and privacy throughout the system design. AI systems often handle sensitive data and can inadvertently leak information through their outputs. Implement comprehensive security measures that address both traditional security concerns and AI-specific vulnerabilities. Sixth, maintain clear separation between AI logic and business logic. This separation enables independent evolution of each component and makes the system easier to test and maintain.

Seventh, implement comprehensive data validation and quality checks throughout your AI pipeline. Poor data quality leads to poor AI performance, and issues can compound as data flows through the system. Eighth, design for cost optimization from the beginning. AI operations can be expensive, and costs can quickly spiral out of control without proper monitoring and optimization.

The future of AI-integrated systems will likely see even greater complexity as AI capabilities continue to advance. Multi-modal AI systems that combine text, image, and audio processing will require even more sophisticated architectural approaches. Edge AI deployment will introduce new challenges around resource constraints and offline operation. Federated learning systems will require new patterns for distributed AI training and inference.

As AI systems become more prevalent and critical to business operations, the importance of sound architectural principles becomes even greater. The patterns and practices we have explored provide a solid foundation for building AI-integrated systems that can scale, evolve, and operate reliably in production environments. However, the field continues to evolve rapidly, and practitioners must remain adaptable and continue learning as new challenges and opportunities emerge.

The key to success in AI system design lies not in any single pattern or technique, but in the thoughtful application of sound engineering principles adapted to the unique characteristics of AI workloads. By treating AI integration as a first-class architectural concern and applying the patterns and practices we have discussed, software engineers can build systems that harness the power of AI while maintaining the reliability, security, and maintainability that production systems require.

The journey of integrating AI into software systems is complex and challenging, but the potential benefits are enormous. With careful planning, thoughtful architecture, and adherence to proven patterns and practices, we can build AI-integrated systems that not only meet today's requirements but are also prepared for the exciting developments that lie ahead in the field of artificial intelligence.

No comments: