Introduction: The Paradigm Shift in Software Architecture
The integration of Artificial Intelligence and Generative AI capabilities into traditional software systems represents one of the most significant architectural challenges of our time. Unlike conventional software components that follow deterministic patterns, AI systems introduce probabilistic behavior, require substantial computational resources, and often depend on external services or models that may have varying response times and reliability characteristics.
When we design software systems that incorporate AI functionality, we must fundamentally rethink our approach to architecture. Traditional software engineering principles still apply, but they must be adapted to accommodate the unique characteristics of AI systems. These characteristics include non-deterministic outputs, potential latency variations, model versioning complexities, and the need for continuous learning and adaptation.
The key to successful AI integration lies in treating AI components as first-class citizens in our architecture while maintaining clear separation of concerns. This means designing systems where AI functionality is properly abstracted, easily testable, and can evolve independently of the core business logic. The architecture must be resilient enough to handle AI service failures gracefully while being flexible enough to accommodate rapid changes in AI technology.
Foundational Design Principles for AI-Integrated Systems
The foundation of any well-designed AI-integrated system rests on several core principles that extend traditional software design principles. The principle of separation of concerns becomes even more critical when dealing with AI components because these components often have different scaling requirements, update cycles, and failure modes compared to traditional business logic.
Dependency inversion is particularly important in AI systems because it allows us to abstract away the specific implementation details of AI models or services. This abstraction enables us to switch between different AI providers, model versions, or even local versus cloud-based implementations without affecting the core application logic. The abstraction also facilitates testing by allowing us to inject mock AI services during development and testing phases.
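The sketch below illustrates the idea with hypothetical names: the business logic depends only on an abstract SentimentModel, so a deterministic stub can be injected during tests and a provider-backed implementation can be supplied in production without changing the caller.
from abc import ABC, abstractmethod

# Hypothetical abstraction: the application depends on this interface,
# never on a concrete AI provider.
class SentimentModel(ABC):
    @abstractmethod
    def score(self, text: str) -> float:
        """Return a sentiment score in [-1.0, 1.0]."""

class ReviewTriage:
    # The model is injected through the constructor (dependency inversion),
    # so this class never references a specific provider or SDK.
    def __init__(self, model: SentimentModel):
        self.model = model

    def needs_follow_up(self, review: str) -> bool:
        return self.model.score(review) < -0.5

# In tests, a deterministic stub stands in for the real model.
class StubSentimentModel(SentimentModel):
    def __init__(self, fixed_score: float):
        self.fixed_score = fixed_score

    def score(self, text: str) -> float:
        return self.fixed_score

if __name__ == "__main__":
    triage = ReviewTriage(StubSentimentModel(-0.9))
    assert triage.needs_follow_up("The product broke after one day")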
The single responsibility principle takes on new dimensions in AI systems. Each component should have a clearly defined role, whether it's data preprocessing, model inference, result post-processing, or error handling. This clear delineation makes the system more maintainable and allows different team members to work on different aspects of the AI pipeline without interfering with each other.
The principle of fail-fast becomes crucial when dealing with AI systems because AI operations can be computationally expensive and time-consuming. It is better to validate inputs and preconditions up front than to discover problems after expensive AI processing has already begun.
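A minimal sketch of fail-fast validation, with illustrative limits only: the checks run before any model call, so malformed requests are rejected in microseconds rather than after seconds of inference.
MAX_PROMPT_CHARS = 8000  # assumed limit for illustration

def validate_generation_request(prompt: str, temperature: float) -> None:
    # Fail fast: raise before any expensive model call is made.
    if not prompt or not prompt.strip():
        raise ValueError("Prompt must be a non-empty string")
    if len(prompt) > MAX_PROMPT_CHARS:
        raise ValueError(f"Prompt exceeds {MAX_PROMPT_CHARS} characters")
    if not 0.0 <= temperature <= 2.0:
        raise ValueError("Temperature must be between 0.0 and 2.0")

# Validation runs first, so bad inputs never reach the model.
validate_generation_request("Summarize this document ...", temperature=0.7)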
Core Architectural Patterns for AI Integration
The Adapter Pattern proves invaluable when integrating multiple AI services or models into a single system. Different AI providers often have varying APIs, data formats, and response structures. The Adapter Pattern allows us to create a unified interface that abstracts these differences, making it easier to switch between providers or use multiple providers simultaneously.
Let me illustrate this with a detailed code example. The following implementation demonstrates how to create an adapter pattern for different text generation services. The code defines a common interface that all AI text generators must implement, regardless of their underlying technology or provider. This interface includes a method for generating text and another for checking service health.
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import asyncio
class TextGeneratorInterface(ABC):
@abstractmethod
async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:
pass
@abstractmethod
async def health_check(self) -> bool:
pass
class OpenAIAdapter(TextGeneratorInterface):
def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
self.api_key = api_key
self.model = model
self.client = None  # e.g., openai.AsyncOpenAI(api_key=api_key); left as None in this sketch
async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:
try:
# Transform parameters to OpenAI format
openai_params = self._transform_parameters(parameters)
# Make API call to OpenAI
response = await self.client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
**openai_params
)
return response.choices[0].message.content
except Exception as e:
raise AIServiceException(f"OpenAI generation failed: {str(e)}")
async def health_check(self) -> bool:
try:
# Simple health check by making a minimal API call
await self.generate_text("Hello", {"max_tokens": 1})
return True
except Exception:
return False
def _transform_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
# Transform generic parameters to OpenAI-specific format
openai_params = {}
if "max_length" in parameters:
openai_params["max_tokens"] = parameters["max_length"]
if "temperature" in parameters:
openai_params["temperature"] = parameters["temperature"]
return openai_params
class AnthropicAdapter(TextGeneratorInterface):
def __init__(self, api_key: str, model: str = "claude-3-sonnet"):
self.api_key = api_key
self.model = model
self.client = None  # e.g., anthropic.AsyncAnthropic(api_key=api_key); left as None in this sketch
async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:
try:
# Transform parameters to Anthropic format
anthropic_params = self._transform_parameters(parameters)
# Make API call to Anthropic
response = await self.client.messages.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
**anthropic_params
)
return response.content[0].text
except Exception as e:
raise AIServiceException(f"Anthropic generation failed: {str(e)}")
async def health_check(self) -> bool:
try:
await self.generate_text("Hello", {"max_tokens": 1})
return True
except Exception:
return False
def _transform_parameters(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
# Transform generic parameters to Anthropic-specific format
anthropic_params = {}
if "max_length" in parameters:
anthropic_params["max_tokens"] = parameters["max_length"]
if "temperature" in parameters:
anthropic_params["temperature"] = parameters["temperature"]
return anthropic_params
class AIServiceException(Exception):
pass
This adapter implementation provides several important benefits. First, it allows the application to work with multiple AI providers through a single, consistent interface. The application code doesn't need to know whether it's talking to OpenAI, Anthropic, or any other provider. Second, each adapter handles the specific parameter transformations and error handling required for its respective service. Third, the health check method provides a way to monitor the availability of each service independently.
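To make the benefit concrete, the sketch below shows calling code that depends only on TextGeneratorInterface. The API key is a placeholder, and actually running it assumes the adapter has a real client wired in rather than the None placeholder used above.
async def summarize(generator: TextGeneratorInterface, text: str) -> str:
    # The caller neither knows nor cares which provider sits behind the interface.
    return await generator.generate_text(
        f"Summarize in one sentence:\n\n{text}",
        {"max_length": 60, "temperature": 0.3},
    )

async def main():
    # Swapping providers is a one-line change: use AnthropicAdapter(...) instead.
    generator: TextGeneratorInterface = OpenAIAdapter(api_key="sk-...")
    summary = await summarize(generator, "Long article text ...")
    print(summary)

# asyncio.run(main())  # requires a real API key and an initialized client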
The Strategy Pattern complements the Adapter Pattern by allowing dynamic selection of AI services based on runtime conditions. This pattern is particularly useful when you want to choose different AI models based on factors such as cost, performance requirements, or current service availability.
Building on the previous example, here's how we can implement a strategy pattern that intelligently selects the appropriate AI service based on current conditions:
from enum import Enum
from typing import Dict, Any, List, Optional
import time
class ServicePriority(Enum):
COST_OPTIMIZED = "cost_optimized"
PERFORMANCE_OPTIMIZED = "performance_optimized"
RELIABILITY_OPTIMIZED = "reliability_optimized"
class AIServiceStrategy:
def __init__(self, adapters: List[TextGeneratorInterface], priority: ServicePriority):
self.adapters = adapters
self.priority = priority
self.service_metrics = {}
self._initialize_metrics()
def _initialize_metrics(self):
for adapter in self.adapters:
self.service_metrics[adapter.__class__.__name__] = {
"response_times": [],
"success_rate": 1.0,
"last_health_check": time.time(),
"is_healthy": True,
"cost_per_request": self._get_cost_estimate(adapter)
}
def _get_cost_estimate(self, adapter) -> float:
# Return estimated cost per request for different providers
cost_mapping = {
"OpenAIAdapter": 0.002,
"AnthropicAdapter": 0.003,
"LocalModelAdapter": 0.0001
}
return cost_mapping.get(adapter.__class__.__name__, 0.001)
async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:
selected_adapter = await self._select_optimal_adapter()
if not selected_adapter:
raise AIServiceException("No healthy AI services available")
start_time = time.time()
try:
result = await selected_adapter.generate_text(prompt, parameters)
self._record_success(selected_adapter, time.time() - start_time)
return result
except Exception as e:
self._record_failure(selected_adapter)
# Try fallback adapter if available
fallback_adapter = await self._get_fallback_adapter(selected_adapter)
if fallback_adapter:
return await fallback_adapter.generate_text(prompt, parameters)
raise e
async def _select_optimal_adapter(self) -> Optional[TextGeneratorInterface]:
healthy_adapters = []
# Check health of all adapters
for adapter in self.adapters:
metrics = self.service_metrics[adapter.__class__.__name__]
# Perform health check if it's been too long since last check
if time.time() - metrics["last_health_check"] > 300: # 5 minutes
metrics["is_healthy"] = await adapter.health_check()
metrics["last_health_check"] = time.time()
if metrics["is_healthy"]:
healthy_adapters.append(adapter)
if not healthy_adapters:
return None
# Select based on priority strategy
if self.priority == ServicePriority.COST_OPTIMIZED:
return self._select_cheapest(healthy_adapters)
elif self.priority == ServicePriority.PERFORMANCE_OPTIMIZED:
return self._select_fastest(healthy_adapters)
elif self.priority == ServicePriority.RELIABILITY_OPTIMIZED:
return self._select_most_reliable(healthy_adapters)
return healthy_adapters[0] # Default fallback
def _select_cheapest(self, adapters: List[TextGeneratorInterface]) -> TextGeneratorInterface:
return min(adapters, key=lambda a: self.service_metrics[a.__class__.__name__]["cost_per_request"])
def _select_fastest(self, adapters: List[TextGeneratorInterface]) -> TextGeneratorInterface:
def average_response_time(adapter):
metrics = self.service_metrics[adapter.__class__.__name__]
times = metrics["response_times"]
return sum(times) / len(times) if times else float('inf')
return min(adapters, key=average_response_time)
def _select_most_reliable(self, adapters: List[TextGeneratorInterface]) -> TextGeneratorInterface:
return max(adapters, key=lambda a: self.service_metrics[a.__class__.__name__]["success_rate"])
async def _get_fallback_adapter(self, failed_adapter: TextGeneratorInterface) -> Optional[TextGeneratorInterface]:
remaining_adapters = [a for a in self.adapters if a != failed_adapter]
if remaining_adapters:
# Simple fallback: return the most reliable remaining adapter
return self._select_most_reliable(remaining_adapters)
return None
def _record_success(self, adapter: TextGeneratorInterface, response_time: float):
metrics = self.service_metrics[adapter.__class__.__name__]
metrics["response_times"].append(response_time)
# Keep only last 100 response times
if len(metrics["response_times"]) > 100:
metrics["response_times"] = metrics["response_times"][-100:]
# Update success rate (simple moving average)
current_rate = metrics["success_rate"]
metrics["success_rate"] = (current_rate * 0.9) + (1.0 * 0.1)
def _record_failure(self, adapter: TextGeneratorInterface):
metrics = self.service_metrics[adapter.__class__.__name__]
current_rate = metrics["success_rate"]
metrics["success_rate"] = (current_rate * 0.9) + (0.0 * 0.1)
metrics["is_healthy"] = False
This strategy implementation provides intelligent service selection based on different optimization criteria. The system continuously monitors the performance and reliability of each service, adapting its selection strategy based on real-world performance data. The metrics collection allows the system to learn from experience and make better decisions over time about which service to use for each request.
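Wiring the strategy together might look like the following sketch; the adapters are the ones defined earlier, the API keys are placeholders, and the commented-out call assumes real clients have been initialized inside the adapters.
async def run_with_strategy():
    adapters = [
        OpenAIAdapter(api_key="sk-..."),         # placeholder key
        AnthropicAdapter(api_key="sk-ant-..."),  # placeholder key
    ]
    strategy = AIServiceStrategy(adapters, ServicePriority.COST_OPTIMIZED)

    # One call: adapter selection, health checks, and fallback all happen inside the strategy.
    result = await strategy.generate_text(
        "Explain the adapter pattern in two sentences.",
        {"max_length": 120, "temperature": 0.5},
    )
    print(result)

# asyncio.run(run_with_strategy())  # requires real credentials and initialized clients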
The AI Service Layer Pattern
The AI Service Layer Pattern represents a crucial architectural component that encapsulates all AI-related functionality behind a clean, well-defined interface. This pattern serves as a bridge between your application's business logic and the underlying AI infrastructure, providing a stable API that remains consistent even as the underlying AI implementations change.
The service layer handles several key responsibilities. It manages the lifecycle of AI operations, including initialization, execution, and cleanup; translates AI-specific errors into application-domain errors through a consistent error handling mechanism; implements caching strategies to improve performance and reduce costs; and validates configuration and parameters so that AI operations receive valid inputs.
Here's a comprehensive implementation of an AI service layer that demonstrates these principles:
import asyncio
import time
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from datetime import datetime, timedelta
import hashlib
import json
@dataclass
class AIRequest:
operation_type: str
input_data: Dict[str, Any]
parameters: Dict[str, Any]
user_id: Optional[str] = None
session_id: Optional[str] = None
priority: int = 1
timeout_seconds: int = 30
@dataclass
class AIResponse:
result: Any
confidence_score: Optional[float] = None
processing_time: float = 0.0
model_version: Optional[str] = None
cached: bool = False
metadata: Optional[Dict[str, Any]] = None
class AIServiceLayer:
def __init__(self, strategy: AIServiceStrategy, cache_manager=None, config_validator=None):
self.strategy = strategy
self.cache_manager = cache_manager or SimpleCacheManager()
self.config_validator = config_validator or ConfigValidator()
self.operation_handlers = {}
self._register_default_handlers()
def _register_default_handlers(self):
self.operation_handlers = {
"text_generation": self._handle_text_generation,
"text_classification": self._handle_text_classification,
"sentiment_analysis": self._handle_sentiment_analysis,
"summarization": self._handle_summarization
}
async def process_request(self, request: AIRequest) -> AIResponse:
# Validate the request
validation_result = self.config_validator.validate_request(request)
if not validation_result.is_valid:
raise AIServiceException(f"Invalid request: {validation_result.error_message}")
# Check cache first
cache_key = self._generate_cache_key(request)
cached_result = await self.cache_manager.get(cache_key)
if cached_result:
return AIResponse(
result=cached_result["result"],
confidence_score=cached_result.get("confidence_score"),
processing_time=0.0,
model_version=cached_result.get("model_version"),
cached=True,
metadata=cached_result.get("metadata", {})
)
# Process the request
start_time = time.time()
try:
handler = self.operation_handlers.get(request.operation_type)
if not handler:
raise AIServiceException(f"Unsupported operation type: {request.operation_type}")
result = await asyncio.wait_for(
handler(request),
timeout=request.timeout_seconds
)
processing_time = time.time() - start_time
# Cache the result
cache_data = {
"result": result.result,
"confidence_score": result.confidence_score,
"model_version": result.model_version,
"metadata": result.metadata or {}
}
await self.cache_manager.set(cache_key, cache_data, ttl=3600) # 1 hour TTL
result.processing_time = processing_time
return result
except asyncio.TimeoutError:
raise AIServiceException(f"Request timeout after {request.timeout_seconds} seconds")
except Exception as e:
processing_time = time.time() - start_time
await self._log_error(request, e, processing_time)
raise AIServiceException(f"AI processing failed: {str(e)}")
async def _handle_text_generation(self, request: AIRequest) -> AIResponse:
prompt = request.input_data.get("prompt")
if not prompt:
raise AIServiceException("Prompt is required for text generation")
# Preprocess the prompt if needed
processed_prompt = self._preprocess_prompt(prompt, request.parameters)
# Generate text using the strategy
result = await self.strategy.generate_text(processed_prompt, request.parameters)
# Post-process the result
processed_result = self._postprocess_text_result(result, request.parameters)
return AIResponse(
result=processed_result,
confidence_score=self._calculate_confidence(result),
model_version=self._get_current_model_version(),
metadata={"original_length": len(prompt), "generated_length": len(result)}
)
async def _handle_text_classification(self, request: AIRequest) -> AIResponse:
text = request.input_data.get("text")
categories = request.parameters.get("categories", [])
if not text:
raise AIServiceException("Text is required for classification")
if not categories:
raise AIServiceException("Categories are required for classification")
# Create a classification prompt
classification_prompt = self._create_classification_prompt(text, categories)
# Use the text generation strategy for classification
result = await self.strategy.generate_text(classification_prompt, {
"temperature": 0.1, # Low temperature for more deterministic results
"max_length": 50
})
# Parse the classification result
parsed_result = self._parse_classification_result(result, categories)
return AIResponse(
result=parsed_result,
confidence_score=parsed_result.get("confidence", 0.0),
model_version=self._get_current_model_version(),
metadata={"categories_count": len(categories), "text_length": len(text)}
)
async def _handle_sentiment_analysis(self, request: AIRequest) -> AIResponse:
text = request.input_data.get("text")
if not text:
raise AIServiceException("Text is required for sentiment analysis")
sentiment_prompt = f"Analyze the sentiment of the following text and respond with only 'positive', 'negative', or 'neutral', followed by a confidence score from 0 to 1:\n\n{text}"
result = await self.strategy.generate_text(sentiment_prompt, {
"temperature": 0.1,
"max_length": 20
})
parsed_sentiment = self._parse_sentiment_result(result)
return AIResponse(
result=parsed_sentiment,
confidence_score=parsed_sentiment.get("confidence", 0.0),
model_version=self._get_current_model_version(),
metadata={"text_length": len(text)}
)
async def _handle_summarization(self, request: AIRequest) -> AIResponse:
text = request.input_data.get("text")
max_summary_length = request.parameters.get("max_summary_length", 150)
if not text:
raise AIServiceException("Text is required for summarization")
summary_prompt = f"Summarize the following text in no more than {max_summary_length} words:\n\n{text}"
result = await self.strategy.generate_text(summary_prompt, {
"temperature": 0.3,
"max_length": max_summary_length * 2 # Account for tokens vs words
})
return AIResponse(
result={"summary": result.strip()},
confidence_score=0.8, # Summarization typically has good confidence
model_version=self._get_current_model_version(),
metadata={
"original_length": len(text),
"summary_length": len(result),
"compression_ratio": len(result) / len(text)
}
)
def _generate_cache_key(self, request: AIRequest) -> str:
# Create a deterministic cache key based on request content
key_data = {
"operation_type": request.operation_type,
"input_data": request.input_data,
"parameters": request.parameters
}
key_string = json.dumps(key_data, sort_keys=True)
return hashlib.md5(key_string.encode()).hexdigest()
def _preprocess_prompt(self, prompt: str, parameters: Dict[str, Any]) -> str:
# Add any necessary preprocessing logic
if parameters.get("add_context", False):
context = parameters.get("context", "")
return f"Context: {context}\n\nPrompt: {prompt}"
return prompt
def _postprocess_text_result(self, result: str, parameters: Dict[str, Any]) -> str:
# Add any necessary postprocessing logic
if parameters.get("trim_whitespace", True):
result = result.strip()
if parameters.get("max_output_length"):
max_length = parameters["max_output_length"]
if len(result) > max_length:
result = result[:max_length].rsplit(' ', 1)[0] + "..."
return result
def _calculate_confidence(self, result: str) -> float:
# Simple confidence calculation based on result characteristics
# In a real implementation, this might use model-specific confidence scores
if len(result) < 10:
return 0.3
elif len(result) > 1000:
return 0.9
else:
return 0.7
def _get_current_model_version(self) -> str:
# Return the current model version being used
return "v1.0.0" # This would be dynamic in a real implementation
def _create_classification_prompt(self, text: str, categories: List[str]) -> str:
categories_str = ", ".join(categories)
return f"Classify the following text into one of these categories: {categories_str}\n\nText: {text}\n\nCategory:"
def _parse_classification_result(self, result: str, categories: List[str]) -> Dict[str, Any]:
# Simple parsing logic - in practice, this would be more sophisticated
result_lower = result.lower().strip()
for category in categories:
if category.lower() in result_lower:
return {
"category": category,
"confidence": 0.8,
"raw_result": result
}
return {
"category": "unknown",
"confidence": 0.1,
"raw_result": result
}
def _parse_sentiment_result(self, result: str) -> Dict[str, Any]:
# Parse sentiment analysis result
result_lower = result.lower().strip()
if "positive" in result_lower:
sentiment = "positive"
elif "negative" in result_lower:
sentiment = "negative"
elif "neutral" in result_lower:
sentiment = "neutral"
else:
sentiment = "unknown"
# Try to extract confidence score
confidence = 0.5 # Default confidence
import re
confidence_match = re.search(r'(\d+\.?\d*)', result)
if confidence_match:
try:
confidence = float(confidence_match.group(1))
if confidence > 1:
confidence = confidence / 100 # Convert percentage to decimal
except ValueError:
pass
return {
"sentiment": sentiment,
"confidence": confidence,
"raw_result": result
}
async def _log_error(self, request: AIRequest, error: Exception, processing_time: float):
# Log error details for monitoring and debugging
error_data = {
"operation_type": request.operation_type,
"error_type": type(error).__name__,
"error_message": str(error),
"processing_time": processing_time,
"user_id": request.user_id,
"session_id": request.session_id,
"timestamp": datetime.now().isoformat()
}
# In a real implementation, this would log to your monitoring system
print(f"AI Service Error: {json.dumps(error_data, indent=2)}")
class SimpleCacheManager:
def __init__(self):
self.cache = {}
self.expiry_times = {}
async def get(self, key: str) -> Optional[Dict[str, Any]]:
if key in self.cache:
if datetime.now() < self.expiry_times[key]:
return self.cache[key]
else:
# Expired, remove from cache
del self.cache[key]
del self.expiry_times[key]
return None
async def set(self, key: str, value: Dict[str, Any], ttl: int):
self.cache[key] = value
self.expiry_times[key] = datetime.now() + timedelta(seconds=ttl)
class ConfigValidator:
def validate_request(self, request: AIRequest):
# Implement validation logic
if not request.operation_type:
return ValidationResult(False, "Operation type is required")
if not request.input_data:
return ValidationResult(False, "Input data is required")
return ValidationResult(True, "")
@dataclass
class ValidationResult:
is_valid: bool
error_message: str
This AI Service Layer implementation provides a comprehensive foundation for AI integration. The service layer abstracts the complexity of AI operations behind a clean interface, handles caching to improve performance, validates inputs to prevent errors, and provides consistent error handling across all AI operations. The modular design allows for easy extension with new operation types and provides hooks for monitoring and logging.
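As a usage sketch, a caller only ever constructs an AIRequest and receives an AIResponse; the strategy object is the one assembled in the previous section, and the identifiers below are purely illustrative.
async def classify_ticket(service: AIServiceLayer, ticket_text: str) -> AIResponse:
    request = AIRequest(
        operation_type="text_classification",
        input_data={"text": ticket_text},
        parameters={"categories": ["billing", "technical", "general"]},
        user_id="user-123",  # illustrative identifier
        timeout_seconds=15,
    )
    return await service.process_request(request)

# service = AIServiceLayer(strategy)  # strategy as constructed earlier
# response = asyncio.run(classify_ticket(service, "I was charged twice this month"))
# print(response.result, response.cached)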
Data Pipeline Architecture for AI Systems
The data pipeline represents the backbone of any AI-integrated system, responsible for ingesting, processing, transforming, and delivering data to AI models in the correct format and at the right time. Unlike traditional data pipelines, AI data pipelines must handle additional complexities such as feature engineering, data versioning, and real-time streaming requirements.
A well-designed AI data pipeline follows the principle of immutable data transformations, where each stage of the pipeline produces new data artifacts without modifying the original input. This approach enables reproducibility, simplifies debugging, and allows for easy rollback when issues are discovered. The pipeline should also implement proper data validation at each stage to ensure data quality and catch issues early in the process.
The following implementation demonstrates a comprehensive data pipeline architecture designed specifically for AI workloads:
import asyncio
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Callable, AsyncGenerator
from dataclasses import dataclass, field
from datetime import datetime
import json
import hashlib
from enum import Enum
class DataFormat(Enum):
JSON = "json"
CSV = "csv"
PARQUET = "parquet"
TEXT = "text"
BINARY = "binary"
class PipelineStage(Enum):
INGESTION = "ingestion"
VALIDATION = "validation"
TRANSFORMATION = "transformation"
FEATURE_ENGINEERING = "feature_engineering"
MODEL_PREPARATION = "model_preparation"
OUTPUT = "output"
@dataclass
class DataArtifact:
id: str
data: Any
format: DataFormat
metadata: Dict[str, Any] = field(default_factory=dict)
created_at: datetime = field(default_factory=datetime.now)
stage: PipelineStage = PipelineStage.INGESTION
parent_ids: List[str] = field(default_factory=list)
validation_results: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
if not self.id:
self.id = self._generate_id()
def _generate_id(self) -> str:
content_hash = hashlib.md5(str(self.data).encode()).hexdigest()[:8]
timestamp = self.created_at.strftime("%Y%m%d_%H%M%S")
return f"{self.stage.value}_{timestamp}_{content_hash}"
class PipelineProcessor(ABC):
def __init__(self, name: str, stage: PipelineStage):
self.name = name
self.stage = stage
self.metrics = {
"processed_count": 0,
"error_count": 0,
"total_processing_time": 0.0
}
@abstractmethod
async def process(self, artifact: DataArtifact) -> DataArtifact:
pass
async def execute(self, artifact: DataArtifact) -> DataArtifact:
start_time = time.time()
try:
result = await self.process(artifact)
result.stage = self.stage
result.parent_ids.append(artifact.id)
processing_time = time.time() - start_time
self.metrics["processed_count"] += 1
self.metrics["total_processing_time"] += processing_time
await self._log_success(artifact, result, processing_time)
return result
except Exception as e:
self.metrics["error_count"] += 1
await self._log_error(artifact, e, time.time() - start_time)
raise PipelineException(f"Processor {self.name} failed: {str(e)}")
async def _log_success(self, input_artifact: DataArtifact, output_artifact: DataArtifact, processing_time: float):
log_data = {
"processor": self.name,
"stage": self.stage.value,
"input_id": input_artifact.id,
"output_id": output_artifact.id,
"processing_time": processing_time,
"timestamp": datetime.now().isoformat()
}
print(f"Pipeline Success: {json.dumps(log_data)}")
async def _log_error(self, artifact: DataArtifact, error: Exception, processing_time: float):
log_data = {
"processor": self.name,
"stage": self.stage.value,
"artifact_id": artifact.id,
"error_type": type(error).__name__,
"error_message": str(error),
"processing_time": processing_time,
"timestamp": datetime.now().isoformat()
}
print(f"Pipeline Error: {json.dumps(log_data)}")
class DataIngestionProcessor(PipelineProcessor):
def __init__(self, source_config: Dict[str, Any]):
super().__init__("DataIngestion", PipelineStage.INGESTION)
self.source_config = source_config
async def process(self, artifact: DataArtifact) -> DataArtifact:
# Simulate data ingestion from various sources
source_type = self.source_config.get("type", "unknown")
if source_type == "api":
return await self._ingest_from_api(artifact)
elif source_type == "database":
return await self._ingest_from_database(artifact)
elif source_type == "file":
return await self._ingest_from_file(artifact)
else:
# Pass through for already ingested data
return artifact
async def _ingest_from_api(self, artifact: DataArtifact) -> DataArtifact:
# Simulate API data ingestion
api_url = self.source_config.get("url")
headers = self.source_config.get("headers", {})
# In a real implementation, this would make actual HTTP requests
simulated_data = {
"source": "api",
"url": api_url,
"data": artifact.data,
"ingested_at": datetime.now().isoformat()
}
return DataArtifact(
id="",
data=simulated_data,
format=DataFormat.JSON,
metadata={
"source_type": "api",
"source_url": api_url,
"ingestion_method": "http_request"
},
stage=PipelineStage.INGESTION
)
async def _ingest_from_database(self, artifact: DataArtifact) -> DataArtifact:
# Simulate database data ingestion
connection_string = self.source_config.get("connection_string")
query = self.source_config.get("query")
simulated_data = {
"source": "database",
"query": query,
"data": artifact.data,
"ingested_at": datetime.now().isoformat()
}
return DataArtifact(
id="",
data=simulated_data,
format=DataFormat.JSON,
metadata={
"source_type": "database",
"query": query,
"ingestion_method": "sql_query"
},
stage=PipelineStage.INGESTION
)
async def _ingest_from_file(self, artifact: DataArtifact) -> DataArtifact:
# Simulate file data ingestion
file_path = self.source_config.get("path")
file_format = self.source_config.get("format", "json")
simulated_data = {
"source": "file",
"path": file_path,
"data": artifact.data,
"ingested_at": datetime.now().isoformat()
}
return DataArtifact(
id="",
data=simulated_data,
format=DataFormat(file_format),
metadata={
"source_type": "file",
"file_path": file_path,
"file_format": file_format,
"ingestion_method": "file_read"
},
stage=PipelineStage.INGESTION
)
class DataValidationProcessor(PipelineProcessor):
def __init__(self, validation_rules: List[Dict[str, Any]]):
super().__init__("DataValidation", PipelineStage.VALIDATION)
self.validation_rules = validation_rules
async def process(self, artifact: DataArtifact) -> DataArtifact:
validation_results = {}
for rule in self.validation_rules:
rule_name = rule["name"]
rule_type = rule["type"]
rule_config = rule.get("config", {})
try:
if rule_type == "schema_validation":
result = await self._validate_schema(artifact, rule_config)
elif rule_type == "data_quality":
result = await self._validate_data_quality(artifact, rule_config)
elif rule_type == "business_rules":
result = await self._validate_business_rules(artifact, rule_config)
else:
result = {"valid": False, "error": f"Unknown rule type: {rule_type}"}
validation_results[rule_name] = result
except Exception as e:
validation_results[rule_name] = {
"valid": False,
"error": str(e),
"exception_type": type(e).__name__
}
# Check if all validations passed
all_valid = all(result.get("valid", False) for result in validation_results.values())
if not all_valid:
failed_rules = [name for name, result in validation_results.items() if not result.get("valid", False)]
raise PipelineException(f"Data validation failed for rules: {failed_rules}")
# Create new artifact with validation results
validated_artifact = DataArtifact(
id="",
data=artifact.data,
format=artifact.format,
metadata={**artifact.metadata, "validation_passed": True},
stage=PipelineStage.VALIDATION,
validation_results=validation_results
)
return validated_artifact
async def _validate_schema(self, artifact: DataArtifact, config: Dict[str, Any]) -> Dict[str, Any]:
required_fields = config.get("required_fields", [])
field_types = config.get("field_types", {})
if not isinstance(artifact.data, dict):
return {"valid": False, "error": "Data must be a dictionary for schema validation"}
# Check required fields
missing_fields = [field for field in required_fields if field not in artifact.data]
if missing_fields:
return {"valid": False, "error": f"Missing required fields: {missing_fields}"}
# Check field types
type_errors = []
for field, expected_type in field_types.items():
if field in artifact.data:
actual_type = type(artifact.data[field]).__name__
if actual_type != expected_type:
type_errors.append(f"{field}: expected {expected_type}, got {actual_type}")
if type_errors:
return {"valid": False, "error": f"Type validation errors: {type_errors}"}
return {"valid": True, "message": "Schema validation passed"}
async def _validate_data_quality(self, artifact: DataArtifact, config: Dict[str, Any]) -> Dict[str, Any]:
min_records = config.get("min_records", 0)
max_null_percentage = config.get("max_null_percentage", 100)
if isinstance(artifact.data, list):
record_count = len(artifact.data)
if record_count < min_records:
return {"valid": False, "error": f"Insufficient records: {record_count} < {min_records}"}
# Additional quality checks would go here
return {"valid": True, "message": "Data quality validation passed"}
async def _validate_business_rules(self, artifact: DataArtifact, config: Dict[str, Any]) -> Dict[str, Any]:
# Implement custom business rule validation
rules = config.get("rules", [])
for rule in rules:
rule_expression = rule.get("expression")
rule_description = rule.get("description", "Unknown rule")
# Simple rule evaluation (in practice, this would be more sophisticated)
if rule_expression and not self._evaluate_rule(artifact.data, rule_expression):
return {"valid": False, "error": f"Business rule failed: {rule_description}"}
return {"valid": True, "message": "Business rules validation passed"}
def _evaluate_rule(self, data: Any, expression: str) -> bool:
# Simple rule evaluation - in practice, this would use a proper expression evaluator
# For demonstration purposes, we'll just return True
return True
class FeatureEngineeringProcessor(PipelineProcessor):
def __init__(self, feature_config: Dict[str, Any]):
super().__init__("FeatureEngineering", PipelineStage.FEATURE_ENGINEERING)
self.feature_config = feature_config
async def process(self, artifact: DataArtifact) -> DataArtifact:
if not isinstance(artifact.data, dict):
raise PipelineException("Feature engineering requires dictionary data")
engineered_features = {}
original_data = artifact.data.copy()
# Apply feature transformations
for feature_name, feature_def in self.feature_config.items():
try:
feature_value = await self._create_feature(original_data, feature_def)
engineered_features[feature_name] = feature_value
except Exception as e:
raise PipelineException(f"Failed to create feature {feature_name}: {str(e)}")
# Combine original data with engineered features
enhanced_data = {**original_data, **engineered_features}
return DataArtifact(
id="",
data=enhanced_data,
format=artifact.format,
metadata={
**artifact.metadata,
"feature_count": len(engineered_features),
"feature_names": list(engineered_features.keys())
},
stage=PipelineStage.FEATURE_ENGINEERING
)
async def _create_feature(self, data: Dict[str, Any], feature_def: Dict[str, Any]) -> Any:
feature_type = feature_def.get("type")
if feature_type == "text_length":
field_name = feature_def["source_field"]
text_value = data.get(field_name, "")
return len(str(text_value))
elif feature_type == "word_count":
field_name = feature_def["source_field"]
text_value = data.get(field_name, "")
return len(str(text_value).split())
elif feature_type == "categorical_encoding":
field_name = feature_def["source_field"]
mapping = feature_def["mapping"]
value = data.get(field_name)
return mapping.get(value, mapping.get("default", 0))
elif feature_type == "numerical_binning":
field_name = feature_def["source_field"]
bins = feature_def["bins"]
value = data.get(field_name, 0)
for i, bin_threshold in enumerate(bins):
if value <= bin_threshold:
return i
return len(bins)
elif feature_type == "text_sentiment_score":
field_name = feature_def["source_field"]
text_value = data.get(field_name, "")
# Simplified sentiment scoring
positive_words = ["good", "great", "excellent", "amazing", "wonderful"]
negative_words = ["bad", "terrible", "awful", "horrible", "disappointing"]
text_lower = str(text_value).lower()
positive_count = sum(1 for word in positive_words if word in text_lower)
negative_count = sum(1 for word in negative_words if word in text_lower)
return positive_count - negative_count
else:
raise PipelineException(f"Unknown feature type: {feature_type}")
class AIPipelineOrchestrator:
def __init__(self):
self.processors: List[PipelineProcessor] = []
self.pipeline_metadata = {
"created_at": datetime.now(),
"version": "1.0.0",
"total_executions": 0,
"successful_executions": 0,
"failed_executions": 0
}
def add_processor(self, processor: PipelineProcessor):
self.processors.append(processor)
async def execute_pipeline(self, initial_data: Any, data_format: DataFormat = DataFormat.JSON) -> DataArtifact:
# Create initial artifact
current_artifact = DataArtifact(
id="",
data=initial_data,
format=data_format,
metadata={"pipeline_version": self.pipeline_metadata["version"]},
stage=PipelineStage.INGESTION
)
self.pipeline_metadata["total_executions"] += 1
try:
# Execute each processor in sequence
for processor in self.processors:
current_artifact = await processor.execute(current_artifact)
await self._checkpoint_artifact(current_artifact)
self.pipeline_metadata["successful_executions"] += 1
await self._log_pipeline_success(current_artifact)
return current_artifact
except Exception as e:
self.pipeline_metadata["failed_executions"] += 1
await self._log_pipeline_failure(current_artifact, e)
raise PipelineException(f"Pipeline execution failed: {str(e)}")
async def execute_pipeline_stream(self, data_stream: AsyncGenerator[Any, None], data_format: DataFormat = DataFormat.JSON) -> AsyncGenerator[DataArtifact, None]:
async for data_item in data_stream:
try:
result = await self.execute_pipeline(data_item, data_format)
yield result
except Exception as e:
# Log error but continue processing other items
await self._log_stream_error(data_item, e)
continue
async def _checkpoint_artifact(self, artifact: DataArtifact):
# In a real implementation, this would save the artifact to persistent storage
checkpoint_data = {
"artifact_id": artifact.id,
"stage": artifact.stage.value,
"timestamp": datetime.now().isoformat(),
"metadata": artifact.metadata
}
print(f"Checkpoint: {json.dumps(checkpoint_data)}")
async def _log_pipeline_success(self, final_artifact: DataArtifact):
log_data = {
"pipeline_status": "success",
"final_artifact_id": final_artifact.id,
"final_stage": final_artifact.stage.value,
"execution_count": self.pipeline_metadata["total_executions"],
"timestamp": datetime.now().isoformat()
}
print(f"Pipeline Success: {json.dumps(log_data)}")
async def _log_pipeline_failure(self, artifact: DataArtifact, error: Exception):
log_data = {
"pipeline_status": "failure",
"failed_artifact_id": artifact.id,
"failed_stage": artifact.stage.value,
"error_type": type(error).__name__,
"error_message": str(error),
"execution_count": self.pipeline_metadata["total_executions"],
"timestamp": datetime.now().isoformat()
}
print(f"Pipeline Failure: {json.dumps(log_data)}")
async def _log_stream_error(self, data_item: Any, error: Exception):
log_data = {
"stream_error": True,
"error_type": type(error).__name__,
"error_message": str(error),
"timestamp": datetime.now().isoformat()
}
print(f"Stream Error: {json.dumps(log_data)}")
def get_pipeline_metrics(self) -> Dict[str, Any]:
processor_metrics = {}
for processor in self.processors:
processor_metrics[processor.name] = processor.metrics
return {
"pipeline_metadata": self.pipeline_metadata,
"processor_metrics": processor_metrics
}
class PipelineException(Exception):
pass
This data pipeline architecture provides a robust foundation for AI data processing. The pipeline supports multiple data formats, implements comprehensive validation and feature engineering capabilities, and provides detailed logging and metrics collection. The modular design allows for easy extension with new processors, and the checkpoint system enables recovery from failures. The streaming support allows for real-time data processing, which is essential for many AI applications.
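To show how the pieces fit together, the following sketch assembles a three-stage pipeline and runs it on a single in-memory record; all configuration values are illustrative.
async def run_example_pipeline():
    orchestrator = AIPipelineOrchestrator()
    # The record is already in memory, so the ingestion processor simply passes it through.
    orchestrator.add_processor(DataIngestionProcessor({"type": "passthrough"}))
    orchestrator.add_processor(DataValidationProcessor([
        {"name": "schema", "type": "schema_validation",
         "config": {"required_fields": ["data"], "field_types": {"data": "str"}}},
    ]))
    orchestrator.add_processor(FeatureEngineeringProcessor({
        "review_length": {"type": "text_length", "source_field": "data"},
        "review_words": {"type": "word_count", "source_field": "data"},
    }))

    artifact = await orchestrator.execute_pipeline(
        {"data": "The service was great and support responded quickly"}
    )
    print(artifact.data)  # original record plus engineered features
    print(orchestrator.get_pipeline_metrics())

# asyncio.run(run_example_pipeline())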
Error Handling and Resilience Patterns
Error handling in AI-integrated systems requires a sophisticated approach that goes beyond traditional software error handling. AI systems introduce unique failure modes such as model inference timeouts, unexpected model outputs, rate limiting from external AI services, and gradual model performance degradation. A resilient AI system must anticipate these failures and implement appropriate recovery strategies.
The Circuit Breaker Pattern is particularly valuable in AI systems because it prevents cascading failures when AI services become unavailable or start responding slowly. When an AI service begins failing, the circuit breaker opens and prevents further requests from reaching the failing service, instead returning cached results or fallback responses. This approach protects the overall system stability while allowing the failing service time to recover.
Here's a comprehensive implementation of error handling and resilience patterns specifically designed for AI systems:
import asyncio
import time
from enum import Enum
from typing import Dict, Any, Optional, Callable, List
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import random
import json
class CircuitBreakerState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class ErrorType(Enum):
TIMEOUT = "timeout"
RATE_LIMIT = "rate_limit"
MODEL_ERROR = "model_error"
NETWORK_ERROR = "network_error"
AUTHENTICATION_ERROR = "authentication_error"
QUOTA_EXCEEDED = "quota_exceeded"
UNKNOWN = "unknown"
@dataclass
class ErrorContext:
error_type: ErrorType
message: str
timestamp: datetime
request_id: Optional[str] = None
user_id: Optional[str] = None
retry_count: int = 0
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RetryPolicy:
max_attempts: int = 3
base_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
jitter: bool = True
retryable_errors: List[ErrorType] = field(default_factory=lambda: [
ErrorType.TIMEOUT,
ErrorType.NETWORK_ERROR,
ErrorType.RATE_LIMIT
])
class AICircuitBreaker:
def __init__(self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
expected_exception: type = Exception,
name: str = "AICircuitBreaker"):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.name = name
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitBreakerState.CLOSED
self.success_count = 0
self.total_requests = 0
# Metrics for monitoring
self.metrics = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"circuit_opened_count": 0,
"circuit_closed_count": 0,
"current_state": self.state.value,
"last_state_change": datetime.now().isoformat()
}
async def call(self, func: Callable, *args, **kwargs):
self.total_requests += 1
self.metrics["total_requests"] += 1
if self.state == CircuitBreakerState.OPEN:
if self._should_attempt_reset():
self._move_to_half_open()
else:
raise CircuitBreakerOpenException(f"Circuit breaker {self.name} is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self) -> bool:
return (self.last_failure_time and
time.time() - self.last_failure_time >= self.recovery_timeout)
def _move_to_half_open(self):
self.state = CircuitBreakerState.HALF_OPEN
self.success_count = 0
self.metrics["current_state"] = self.state.value
self.metrics["last_state_change"] = datetime.now().isoformat()
print(f"Circuit breaker {self.name} moved to HALF_OPEN state")
def _on_success(self):
self.failure_count = 0
self.metrics["successful_requests"] += 1
if self.state == CircuitBreakerState.HALF_OPEN:
self.success_count += 1
if self.success_count >= 3: # Require 3 successes to close
self._move_to_closed()
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
self.metrics["failed_requests"] += 1
if self.state == CircuitBreakerState.HALF_OPEN:
self._move_to_open()
elif self.failure_count >= self.failure_threshold:
self._move_to_open()
def _move_to_open(self):
self.state = CircuitBreakerState.OPEN
self.metrics["circuit_opened_count"] += 1
self.metrics["current_state"] = self.state.value
self.metrics["last_state_change"] = datetime.now().isoformat()
print(f"Circuit breaker {self.name} moved to OPEN state after {self.failure_count} failures")
def _move_to_closed(self):
self.state = CircuitBreakerState.CLOSED
self.failure_count = 0
self.metrics["circuit_closed_count"] += 1
self.metrics["current_state"] = self.state.value
self.metrics["last_state_change"] = datetime.now().isoformat()
print(f"Circuit breaker {self.name} moved to CLOSED state")
def get_metrics(self) -> Dict[str, Any]:
return {
**self.metrics,
"failure_rate": self.metrics["failed_requests"] / max(self.metrics["total_requests"], 1),
"current_failure_count": self.failure_count,
"time_since_last_failure": time.time() - (self.last_failure_time or time.time())
}
class RetryHandler:
def __init__(self, policy: RetryPolicy):
self.policy = policy
async def execute_with_retry(self,
func: Callable,
error_context: ErrorContext,
*args, **kwargs) -> Any:
last_exception = None
for attempt in range(self.policy.max_attempts):
try:
if attempt > 0:
delay = self._calculate_delay(attempt)
await asyncio.sleep(delay)
result = await func(*args, **kwargs)
if attempt > 0:
await self._log_retry_success(error_context, attempt)
return result
except Exception as e:
last_exception = e
error_type = self._classify_error(e)
error_context.error_type = error_type
error_context.retry_count = attempt + 1
if not self._should_retry(error_type, attempt):
await self._log_retry_exhausted(error_context, e)
break
await self._log_retry_attempt(error_context, e, attempt)
raise last_exception
def _calculate_delay(self, attempt: int) -> float:
delay = self.policy.base_delay * (self.policy.exponential_base ** (attempt - 1))
delay = min(delay, self.policy.max_delay)
if self.policy.jitter:
jitter = random.uniform(0, 0.1) * delay
delay += jitter
return delay
def _classify_error(self, error: Exception) -> ErrorType:
error_message = str(error).lower()
if "timeout" in error_message:
return ErrorType.TIMEOUT
elif "rate limit" in error_message or "too many requests" in error_message:
return ErrorType.RATE_LIMIT
elif "network" in error_message or "connection" in error_message:
return ErrorType.NETWORK_ERROR
elif "authentication" in error_message or "unauthorized" in error_message:
return ErrorType.AUTHENTICATION_ERROR
elif "quota" in error_message or "limit exceeded" in error_message:
return ErrorType.QUOTA_EXCEEDED
elif "model" in error_message:
return ErrorType.MODEL_ERROR
else:
return ErrorType.UNKNOWN
def _should_retry(self, error_type: ErrorType, attempt: int) -> bool:
if attempt >= self.policy.max_attempts - 1:
return False
return error_type in self.policy.retryable_errors
async def _log_retry_attempt(self, context: ErrorContext, error: Exception, attempt: int):
log_data = {
"event": "retry_attempt",
"attempt": attempt + 1,
"max_attempts": self.policy.max_attempts,
"error_type": context.error_type.value,
"error_message": str(error),
"request_id": context.request_id,
"timestamp": datetime.now().isoformat()
}
print(f"Retry Attempt: {json.dumps(log_data)}")
async def _log_retry_success(self, context: ErrorContext, final_attempt: int):
log_data = {
"event": "retry_success",
"final_attempt": final_attempt + 1,
"request_id": context.request_id,
"timestamp": datetime.now().isoformat()
}
print(f"Retry Success: {json.dumps(log_data)}")
async def _log_retry_exhausted(self, context: ErrorContext, final_error: Exception):
log_data = {
"event": "retry_exhausted",
"total_attempts": self.policy.max_attempts,
"final_error_type": context.error_type.value,
"final_error_message": str(final_error),
"request_id": context.request_id,
"timestamp": datetime.now().isoformat()
}
print(f"Retry Exhausted: {json.dumps(log_data)}")
class FallbackStrategy:
def __init__(self):
self.fallback_handlers = {}
self.metrics = {
"fallback_invocations": 0,
"fallback_successes": 0,
"fallback_failures": 0
}
def register_fallback(self, error_type: ErrorType, handler: Callable):
self.fallback_handlers[error_type] = handler
async def execute_fallback(self,
error_context: ErrorContext,
original_args: tuple,
original_kwargs: dict) -> Any:
self.metrics["fallback_invocations"] += 1
handler = self.fallback_handlers.get(error_context.error_type)
if not handler:
handler = self.fallback_handlers.get(ErrorType.UNKNOWN)
if not handler:
self.metrics["fallback_failures"] += 1
raise FallbackException(f"No fallback handler for error type: {error_context.error_type}")
try:
result = await handler(error_context, *original_args, **original_kwargs)
self.metrics["fallback_successes"] += 1
await self._log_fallback_success(error_context)
return result
except Exception as e:
self.metrics["fallback_failures"] += 1
await self._log_fallback_failure(error_context, e)
raise FallbackException(f"Fallback handler failed: {str(e)}")
async def _log_fallback_success(self, context: ErrorContext):
log_data = {
"event": "fallback_success",
"original_error_type": context.error_type.value,
"request_id": context.request_id,
"timestamp": datetime.now().isoformat()
}
print(f"Fallback Success: {json.dumps(log_data)}")
async def _log_fallback_failure(self, context: ErrorContext, error: Exception):
log_data = {
"event": "fallback_failure",
"original_error_type": context.error_type.value,
"fallback_error": str(error),
"request_id": context.request_id,
"timestamp": datetime.now().isoformat()
}
print(f"Fallback Failure: {json.dumps(log_data)}")
class ResilientAIService:
def __init__(self,
ai_service: Any,
circuit_breaker: AICircuitBreaker,
retry_handler: RetryHandler,
fallback_strategy: FallbackStrategy):
self.ai_service = ai_service
self.circuit_breaker = circuit_breaker
self.retry_handler = retry_handler
self.fallback_strategy = fallback_strategy
# Register default fallback handlers
self._register_default_fallbacks()
def _register_default_fallbacks(self):
self.fallback_strategy.register_fallback(
ErrorType.TIMEOUT,
self._timeout_fallback
)
self.fallback_strategy.register_fallback(
ErrorType.RATE_LIMIT,
self._rate_limit_fallback
)
self.fallback_strategy.register_fallback(
ErrorType.MODEL_ERROR,
self._model_error_fallback
)
self.fallback_strategy.register_fallback(
ErrorType.UNKNOWN,
self._generic_fallback
)
async def generate_text(self, prompt: str, parameters: Dict[str, Any], request_id: str = None) -> str:
error_context = ErrorContext(
error_type=ErrorType.UNKNOWN,
message="",
timestamp=datetime.now(),
request_id=request_id or self._generate_request_id()
)
try:
# First, try with circuit breaker protection
result = await self.circuit_breaker.call(
self._execute_with_retry,
prompt,
parameters,
error_context
)
return result
except Exception as e:  # includes CircuitBreakerOpenException
# If circuit breaker is open or all retries failed, try fallback
error_context.message = str(e)
error_context.error_type = self.retry_handler._classify_error(e)
try:
fallback_result = await self.fallback_strategy.execute_fallback(
error_context,
(prompt, parameters),
{}
)
return fallback_result
except FallbackException:
# If fallback also fails, raise the original error
raise AIServiceException(f"All resilience strategies failed: {str(e)}")
async def _execute_with_retry(self,
prompt: str,
parameters: Dict[str, Any],
error_context: ErrorContext) -> str:
return await self.retry_handler.execute_with_retry(
self.ai_service.generate_text,
error_context,
prompt,
parameters
)
async def _timeout_fallback(self,
error_context: ErrorContext,
prompt: str,
parameters: Dict[str, Any]) -> str:
# Return a simplified response for timeout scenarios
return f"[Response generated with reduced complexity due to timeout] Based on your request: {prompt[:100]}..."
async def _rate_limit_fallback(self,
error_context: ErrorContext,
prompt: str,
parameters: Dict[str, Any]) -> str:
# Return a cached or pre-generated response for rate limiting
return "[Cached response due to rate limiting] I'm currently experiencing high demand. Please try again in a few moments."
async def _model_error_fallback(self,
error_context: ErrorContext,
prompt: str,
parameters: Dict[str, Any]) -> str:
# Return a safe response when the model encounters an error
return "[Safe response due to model error] I encountered an issue processing your request. Please rephrase and try again."
async def _generic_fallback(self,
error_context: ErrorContext,
prompt: str,
parameters: Dict[str, Any]) -> str:
# Generic fallback for unknown errors
return "[Generic response due to service error] I'm temporarily unable to process your request. Please try again later."
def _generate_request_id(self) -> str:
import uuid
return str(uuid.uuid4())
def get_health_status(self) -> Dict[str, Any]:
return {
"circuit_breaker": self.circuit_breaker.get_metrics(),
"fallback_strategy": self.fallback_strategy.metrics,
"service_status": "operational" if self.circuit_breaker.state == CircuitBreakerState.CLOSED else "degraded"
}
# Custom exceptions
class CircuitBreakerOpenException(Exception):
pass
class FallbackException(Exception):
pass
class AIServiceException(Exception):
pass
# Example usage and testing
async def example_usage():
# Create a mock AI service for demonstration
class MockAIService:
def __init__(self):
self.call_count = 0
self.failure_rate = 0.3 # 30% failure rate for testing
async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:
self.call_count += 1
# Simulate various types of failures
if random.random() < self.failure_rate:
failure_type = random.choice([
"timeout",
"rate_limit",
"model_error",
"network_error"
])
if failure_type == "timeout":
raise Exception("Request timeout after 30 seconds")
elif failure_type == "rate_limit":
raise Exception("Rate limit exceeded - too many requests")
elif failure_type == "model_error":
raise Exception("Model inference error - invalid input")
else:
raise Exception("Network connection error")
# Simulate successful response
await asyncio.sleep(0.1) # Simulate processing time
return f"Generated response for: {prompt[:50]}..."
# Set up resilient AI service
mock_service = MockAIService()
circuit_breaker = AICircuitBreaker(failure_threshold=3, recovery_timeout=10)
retry_policy = RetryPolicy(max_attempts=3, base_delay=0.5)
retry_handler = RetryHandler(retry_policy)
fallback_strategy = FallbackStrategy()
resilient_service = ResilientAIService(
mock_service,
circuit_breaker,
retry_handler,
fallback_strategy
)
# Test the resilient service
for i in range(10):
try:
result = await resilient_service.generate_text(
f"Test prompt {i}",
{"temperature": 0.7},
f"request_{i}"
)
print(f"Request {i}: Success - {result[:100]}")
except Exception as e:
print(f"Request {i}: Failed - {str(e)}")
await asyncio.sleep(0.5)
# Print final health status
health_status = resilient_service.get_health_status()
print(f"\nFinal Health Status: {json.dumps(health_status, indent=2)}")
# Uncomment to run the example
# asyncio.run(example_usage())
This resilience implementation provides comprehensive error handling for AI systems. The circuit breaker prevents cascade failures by temporarily blocking requests to failing services. The retry handler implements intelligent retry logic with exponential backoff and jitter. The fallback strategy provides graceful degradation when primary AI services are unavailable. Together, these patterns ensure that AI-integrated systems remain operational even when individual AI components experience failures.
Security and Privacy Considerations
Security in AI-integrated systems encompasses traditional application security concerns as well as AI-specific vulnerabilities such as prompt injection attacks, data poisoning, model extraction, and privacy leakage through model outputs. A comprehensive security strategy must address these unique challenges while maintaining the functionality and performance of the AI system.
Data privacy is particularly critical in AI systems because models can inadvertently memorize and later reveal sensitive information from their training data. Additionally, user inputs to AI systems often contain personal or confidential information that must be protected throughout the processing pipeline. The following implementation demonstrates a comprehensive security and privacy framework for AI systems:
import hashlib
import hmac
import secrets
import re
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import asyncio
class SecurityLevel(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
class PIIType(Enum):
EMAIL = "email"
PHONE = "phone"
SSN = "ssn"
CREDIT_CARD = "credit_card"
NAME = "name"
ADDRESS = "address"
IP_ADDRESS = "ip_address"
CUSTOM = "custom"
@dataclass
class SecurityContext:
user_id: str
session_id: str
security_level: SecurityLevel
permissions: Set[str] = field(default_factory=set)
ip_address: Optional[str] = None
user_agent: Optional[str] = None
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class PIIDetectionResult:
found_pii: bool
pii_types: List[PIIType]
masked_content: str
confidence_scores: Dict[PIIType, float]
locations: Dict[PIIType, List[tuple]] # (start, end) positions
class InputSanitizer:
def __init__(self):
self.dangerous_patterns = [
# Prompt injection patterns
r"ignore\s+previous\s+instructions",
r"forget\s+everything\s+above",
r"act\s+as\s+if\s+you\s+are",
r"pretend\s+to\s+be",
r"role\s*:\s*system",
r"<\s*system\s*>",
r"```\s*system",
# Code injection patterns
r"<script[^>]*>",
r"javascript:",
r"eval\s*\(",
r"exec\s*\(",
r"import\s+os",
r"__import__",
# SQL injection patterns
r"union\s+select",
r"drop\s+table",
r"delete\s+from",
r"insert\s+into",
# Command injection patterns
r";\s*rm\s+",
r";\s*cat\s+",
r";\s*ls\s+",
r"\|\s*nc\s+",
]
self.compiled_patterns = [re.compile(pattern, re.IGNORECASE) for pattern in self.dangerous_patterns]
# Maximum input lengths to prevent resource exhaustion
self.max_input_length = 10000
self.max_parameter_count = 50
async def sanitize_input(self, content: str, security_context: SecurityContext) -> str:
if not content:
return content
# Check input length
if len(content) > self.max_input_length:
raise SecurityException(f"Input exceeds maximum length of {self.max_input_length} characters")
# Detect and block dangerous patterns
for pattern in self.compiled_patterns:
if pattern.search(content):
await self._log_security_violation(
"dangerous_pattern_detected",
pattern.pattern,
security_context
)
raise SecurityException(f"Potentially dangerous input pattern detected")
# Normalize and sanitize content
sanitized = self._normalize_content(content)
sanitized = self._remove_control_characters(sanitized)
return sanitized
async def sanitize_parameters(self, parameters: Dict[str, Any], security_context: SecurityContext) -> Dict[str, Any]:
if len(parameters) > self.max_parameter_count:
raise SecurityException(f"Too many parameters: {len(parameters)} > {self.max_parameter_count}")
sanitized_params = {}
for key, value in parameters.items():
# Sanitize parameter keys
if not self._is_safe_parameter_name(key):
await self._log_security_violation(
"unsafe_parameter_name",
key,
security_context
)
continue
# Sanitize parameter values
if isinstance(value, str):
sanitized_value = await self.sanitize_input(value, security_context)
elif isinstance(value, (int, float, bool)):
sanitized_value = value
elif isinstance(value, dict):
sanitized_value = await self.sanitize_parameters(value, security_context)
elif isinstance(value, list):
sanitized_value = [await self.sanitize_input(str(item), security_context)
if isinstance(item, str) else item for item in value]
else:
# Convert unknown types to string and sanitize
sanitized_value = await self.sanitize_input(str(value), security_context)
sanitized_params[key] = sanitized_value
return sanitized_params
def _normalize_content(self, content: str) -> str:
# Normalize unicode characters
import unicodedata
normalized = unicodedata.normalize('NFKC', content)
# Remove excessive whitespace
normalized = re.sub(r'\s+', ' ', normalized)
return normalized.strip()
def _remove_control_characters(self, content: str) -> str:
# Remove control characters except common ones like newlines and tabs
allowed_control = {'\n', '\t', '\r'}
return ''.join(char for char in content
if char in allowed_control or not char.iscntrl())
def _is_safe_parameter_name(self, name: str) -> bool:
# Only allow alphanumeric characters and underscores
return re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', name) is not None
async def _log_security_violation(self, violation_type: str, details: str, context: SecurityContext):
log_data = {
"event": "security_violation",
"violation_type": violation_type,
"details": details,
"user_id": context.user_id,
"session_id": context.session_id,
"ip_address": context.ip_address,
"timestamp": datetime.now().isoformat()
}
print(f"Security Violation: {json.dumps(log_data)}")
class PIIDetector:
def __init__(self):
self.pii_patterns = {
PIIType.EMAIL: re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),  # character class is [A-Za-z]; a literal '|' would otherwise be matched
PIIType.PHONE: re.compile(r'\b(?:\+?1[-.\s]?)?\(?([0-9]{3})\)?[-.\s]?([0-9]{3})[-.\s]?([0-9]{4})\b'),
PIIType.SSN: re.compile(r'\b\d{3}-?\d{2}-?\d{4}\b'),
PIIType.CREDIT_CARD: re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'),
PIIType.IP_ADDRESS: re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'),
}
# Name detection patterns (simple approach)
self.name_patterns = [
re.compile(r'\b[A-Z][a-z]+ [A-Z][a-z]+\b'), # First Last
re.compile(r'\b[A-Z][a-z]+ [A-Z]\. [A-Z][a-z]+\b'), # First M. Last
]
# Address patterns (simplified)
self.address_patterns = [
re.compile(r'\b\d+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd)\b', re.IGNORECASE),
]
async def detect_pii(self, content: str) -> PIIDetectionResult:
found_pii = False
pii_types = []
confidence_scores = {}
locations = {}
masked_content = content
# Check each PII type
for pii_type, pattern in self.pii_patterns.items():
matches = list(pattern.finditer(content))
if matches:
found_pii = True
pii_types.append(pii_type)
confidence_scores[pii_type] = self._calculate_confidence(pii_type, matches)
locations[pii_type] = [(match.start(), match.end()) for match in matches]
# Mask the PII in the content
masked_content = pattern.sub(self._get_mask_string(pii_type), masked_content)
# Check for names
name_matches = []
for pattern in self.name_patterns:
name_matches.extend(pattern.finditer(content))
if name_matches:
found_pii = True
pii_types.append(PIIType.NAME)
confidence_scores[PIIType.NAME] = 0.7 # Medium confidence for name detection
locations[PIIType.NAME] = [(match.start(), match.end()) for match in name_matches]
# Mask names
for pattern in self.name_patterns:
masked_content = pattern.sub("[NAME]", masked_content)
# Check for addresses
address_matches = []
for pattern in self.address_patterns:
address_matches.extend(pattern.finditer(content))
if address_matches:
found_pii = True
pii_types.append(PIIType.ADDRESS)
confidence_scores[PIIType.ADDRESS] = 0.6 # Lower confidence for address detection
locations[PIIType.ADDRESS] = [(match.start(), match.end()) for match in address_matches]
# Mask addresses
for pattern in self.address_patterns:
masked_content = pattern.sub("[ADDRESS]", masked_content)
return PIIDetectionResult(
found_pii=found_pii,
pii_types=pii_types,
masked_content=masked_content,
confidence_scores=confidence_scores,
locations=locations
)
def _calculate_confidence(self, pii_type: PIIType, matches: List) -> float:
# Simple confidence calculation based on pattern strength
confidence_map = {
PIIType.EMAIL: 0.95,
PIIType.PHONE: 0.85,
PIIType.SSN: 0.98,
PIIType.CREDIT_CARD: 0.90,
PIIType.IP_ADDRESS: 0.80,
}
return confidence_map.get(pii_type, 0.5)
def _get_mask_string(self, pii_type: PIIType) -> str:
mask_map = {
PIIType.EMAIL: "[EMAIL]",
PIIType.PHONE: "[PHONE]",
PIIType.SSN: "[SSN]",
PIIType.CREDIT_CARD: "[CREDIT_CARD]",
PIIType.IP_ADDRESS: "[IP_ADDRESS]",
}
return mask_map.get(pii_type, "[PII]")
class AccessController:
def __init__(self):
self.permissions_cache = {}
self.rate_limits = {}
self.blocked_users = set()
# Define permission requirements for different operations
self.operation_permissions = {
"text_generation": {"ai_text_generation"},
"text_classification": {"ai_classification"},
"sentiment_analysis": {"ai_sentiment"},
"summarization": {"ai_summarization"},
"admin_operations": {"admin", "system_admin"}
}
async def check_access(self, operation: str, security_context: SecurityContext) -> bool:
# Check if user is blocked
if security_context.user_id in self.blocked_users:
await self._log_access_denied("user_blocked", operation, security_context)
return False
# Check rate limits
if not await self._check_rate_limit(security_context):
await self._log_access_denied("rate_limit_exceeded", operation, security_context)
return False
# Check permissions
required_permissions = self.operation_permissions.get(operation, set())
if required_permissions and not required_permissions.issubset(security_context.permissions):
await self._log_access_denied("insufficient_permissions", operation, security_context)
return False
# Check security level requirements
if not await self._check_security_level(operation, security_context):
await self._log_access_denied("insufficient_security_level", operation, security_context)
return False
await self._log_access_granted(operation, security_context)
return True
async def _check_rate_limit(self, security_context: SecurityContext) -> bool:
user_id = security_context.user_id
current_time = datetime.now()
# Initialize rate limit tracking for new users
if user_id not in self.rate_limits:
self.rate_limits[user_id] = {
"requests": [],
"window_start": current_time
}
user_limits = self.rate_limits[user_id]
# Clean old requests (sliding window of 1 hour)
window_duration = timedelta(hours=1)
cutoff_time = current_time - window_duration
user_limits["requests"] = [req_time for req_time in user_limits["requests"]
if req_time > cutoff_time]
# Check if user has exceeded rate limit (100 requests per hour)
max_requests = 100
if len(user_limits["requests"]) >= max_requests:
return False
# Add current request
user_limits["requests"].append(current_time)
return True
async def _check_security_level(self, operation: str, security_context: SecurityContext) -> bool:
# Define minimum security levels for operations
min_security_levels = {
"admin_operations": SecurityLevel.RESTRICTED,
"text_generation": SecurityLevel.INTERNAL,
"text_classification": SecurityLevel.PUBLIC,
"sentiment_analysis": SecurityLevel.PUBLIC,
"summarization": SecurityLevel.INTERNAL
}
required_level = min_security_levels.get(operation, SecurityLevel.PUBLIC)
# Define security level hierarchy
level_hierarchy = {
SecurityLevel.PUBLIC: 0,
SecurityLevel.INTERNAL: 1,
SecurityLevel.CONFIDENTIAL: 2,
SecurityLevel.RESTRICTED: 3
}
user_level_value = level_hierarchy.get(security_context.security_level, 0)
required_level_value = level_hierarchy.get(required_level, 0)
return user_level_value >= required_level_value
async def _log_access_granted(self, operation: str, context: SecurityContext):
log_data = {
"event": "access_granted",
"operation": operation,
"user_id": context.user_id,
"session_id": context.session_id,
"security_level": context.security_level.value,
"timestamp": datetime.now().isoformat()
}
print(f"Access Granted: {json.dumps(log_data)}")
async def _log_access_denied(self, reason: str, operation: str, context: SecurityContext):
log_data = {
"event": "access_denied",
"reason": reason,
"operation": operation,
"user_id": context.user_id,
"session_id": context.session_id,
"ip_address": context.ip_address,
"timestamp": datetime.now().isoformat()
}
print(f"Access Denied: {json.dumps(log_data)}")
class OutputSanitizer:
def __init__(self):
self.sensitive_patterns = [
# API keys and tokens
re.compile(r'[A-Za-z0-9]{32,}'), # Generic long alphanumeric strings
re.compile(r'sk-[A-Za-z0-9]{48}'), # OpenAI API keys
re.compile(r'Bearer\s+[A-Za-z0-9._-]+', re.IGNORECASE),
# Database connection strings
re.compile(r'mongodb://[^/\s]+', re.IGNORECASE),
re.compile(r'postgresql://[^/\s]+', re.IGNORECASE),
re.compile(r'mysql://[^/\s]+', re.IGNORECASE),
# File paths that might contain sensitive info
re.compile(r'/home/[^/\s]+/[^\s]*'),
re.compile(r'C:\\Users\\[^\\]+\\[^\s]*'),
]
self.max_output_length = 5000
async def sanitize_output(self, content: str, security_context: SecurityContext) -> str:
if not content:
return content
# Truncate if too long
if len(content) > self.max_output_length:
content = content[:self.max_output_length] + "... [truncated]"
# Remove sensitive patterns
sanitized = content
for pattern in self.sensitive_patterns:
sanitized = pattern.sub("[REDACTED]", sanitized)
# Additional sanitization based on security level
if security_context.security_level == SecurityLevel.PUBLIC:
sanitized = await self._apply_public_sanitization(sanitized)
return sanitized
async def _apply_public_sanitization(self, content: str) -> str:
# More aggressive sanitization for public-facing content
# Remove any remaining potential PII
pii_detector = PIIDetector()
pii_result = await pii_detector.detect_pii(content)
if pii_result.found_pii:
return pii_result.masked_content
return content
class SecureAIService:
def __init__(self,
ai_service: Any,
input_sanitizer: InputSanitizer,
pii_detector: PIIDetector,
access_controller: AccessController,
output_sanitizer: OutputSanitizer):
self.ai_service = ai_service
self.input_sanitizer = input_sanitizer
self.pii_detector = pii_detector
self.access_controller = access_controller
self.output_sanitizer = output_sanitizer
# Audit trail
self.audit_log = []
async def secure_generate_text(self,
prompt: str,
parameters: Dict[str, Any],
security_context: SecurityContext) -> str:
audit_entry = {
"operation": "text_generation",
"user_id": security_context.user_id,
"timestamp": datetime.now().isoformat(),
"steps": []
}
try:
# Step 1: Access control
if not await self.access_controller.check_access("text_generation", security_context):
audit_entry["steps"].append("access_denied")
raise SecurityException("Access denied for text generation")
audit_entry["steps"].append("access_granted")
# Step 2: Input sanitization
sanitized_prompt = await self.input_sanitizer.sanitize_input(prompt, security_context)
sanitized_parameters = await self.input_sanitizer.sanitize_parameters(parameters, security_context)
audit_entry["steps"].append("input_sanitized")
# Step 3: PII detection and handling
pii_result = await self.pii_detector.detect_pii(sanitized_prompt)
if pii_result.found_pii:
if security_context.security_level == SecurityLevel.PUBLIC:
# For public users, reject requests with PII
audit_entry["steps"].append("pii_detected_rejected")
raise SecurityException("PII detected in input - request rejected")
else:
# For internal users, use masked version
sanitized_prompt = pii_result.masked_content
audit_entry["steps"].append("pii_detected_masked")
else:
audit_entry["steps"].append("no_pii_detected")
# Step 4: AI processing
result = await self.ai_service.generate_text(sanitized_prompt, sanitized_parameters)
audit_entry["steps"].append("ai_processing_completed")
# Step 5: Output sanitization
sanitized_result = await self.output_sanitizer.sanitize_output(result, security_context)
audit_entry["steps"].append("output_sanitized")
# Step 6: Final PII check on output
output_pii_result = await self.pii_detector.detect_pii(sanitized_result)
if output_pii_result.found_pii:
sanitized_result = output_pii_result.masked_content
audit_entry["steps"].append("output_pii_masked")
audit_entry["status"] = "success"
return sanitized_result
except Exception as e:
audit_entry["status"] = "error"
audit_entry["error"] = str(e)
raise
finally:
self.audit_log.append(audit_entry)
await self._log_audit_entry(audit_entry)
async def _log_audit_entry(self, audit_entry: Dict[str, Any]):
print(f"Security Audit: {json.dumps(audit_entry)}")
def get_security_metrics(self) -> Dict[str, Any]:
total_requests = len(self.audit_log)
successful_requests = sum(1 for entry in self.audit_log if entry["status"] == "success")
return {
"total_requests": total_requests,
"successful_requests": successful_requests,
"error_rate": (total_requests - successful_requests) / max(total_requests, 1),
"recent_audit_entries": self.audit_log[-10:] if self.audit_log else []
}
class SecurityException(Exception):
pass
# Example usage
async def example_secure_usage():
# Mock AI service
class MockAIService:
async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:
return f"AI Response: {prompt[:100]}... [Generated with parameters: {parameters}]"
# Set up security components
ai_service = MockAIService()
input_sanitizer = InputSanitizer()
pii_detector = PIIDetector()
access_controller = AccessController()
output_sanitizer = OutputSanitizer()
secure_service = SecureAIService(
ai_service,
input_sanitizer,
pii_detector,
access_controller,
output_sanitizer
)
# Create security context
security_context = SecurityContext(
user_id="user123",
session_id="session456",
security_level=SecurityLevel.INTERNAL,
permissions={"ai_text_generation"},
ip_address="192.168.1.100"
)
# Test with normal input
try:
result = await secure_service.secure_generate_text(
"What is the capital of France?",
{"temperature": 0.7},
security_context
)
print(f"Normal request result: {result}")
except SecurityException as e:
print(f"Security error: {e}")
# Test with PII input
try:
result = await secure_service.secure_generate_text(
"My email is john.doe@example.com and my phone is 555-123-4567",
{"temperature": 0.7},
security_context
)
print(f"PII request result: {result}")
except SecurityException as e:
print(f"Security error: {e}")
# Print security metrics
metrics = secure_service.get_security_metrics()
print(f"Security metrics: {json.dumps(metrics, indent=2)}")
# Uncomment to run the example
# asyncio.run(example_secure_usage())
This security implementation layers several protections around the AI service. The input sanitizer blocks injection attempts and enforces input and parameter limits. The PII detector identifies and masks sensitive information on both input and output. The access controller enforces permissions, security levels, and rate limits. The output sanitizer redacts secrets, connection strings, and other sensitive data before responses leave the system. Together, these components form a defense-in-depth pipeline that protects both the AI system and user data.
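In practice, the security wrapper and the resilience wrapper from the previous section can be stacked. Their generate_text signatures differ (the resilient service also expects a request_id), so a small adapter is needed; the ResilientServiceAdapter below is hypothetical glue, not part of the earlier code, and it simply generates a request ID per call.
import uuid

class ResilientServiceAdapter:
    # Hypothetical adapter: exposes the two-argument generate_text(prompt, parameters)
    # that SecureAIService calls, while delegating to a ResilientAIService that also
    # expects a request_id.
    def __init__(self, resilient_service):
        self.resilient_service = resilient_service

    async def generate_text(self, prompt, parameters):
        request_id = str(uuid.uuid4())  # generated here; swap in your own request IDs if you have them
        return await self.resilient_service.generate_text(prompt, parameters, request_id)

# secure_service = SecureAIService(
#     ResilientServiceAdapter(resilient_service),  # resilience inside, security outside
#     InputSanitizer(), PIIDetector(), AccessController(), OutputSanitizer()
# )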
Testing Strategies for AI-Integrated Systems
Testing AI-integrated systems presents unique challenges that go beyond traditional software testing. AI components introduce non-deterministic behavior, making it difficult to create repeatable tests with predictable outcomes. Additionally, AI systems often depend on external services, large datasets, and complex model behaviors that are difficult to mock or simulate accurately.
A comprehensive testing strategy for AI systems must address multiple layers of functionality, from unit tests for individual components to integration tests that verify the entire AI pipeline. The testing approach must also account for the probabilistic nature of AI outputs, performance characteristics under various loads, and the system's behavior when AI services are unavailable or performing poorly.
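One practical answer to non-deterministic outputs is to assert on properties of the response (length bounds, required keywords, stability across repeated samples) rather than on exact strings. A minimal sketch, assuming an AI service that exposes the async generate_text(prompt, parameters) interface used throughout this chapter; the bounds and sample count are illustrative:
import asyncio

async def assert_output_properties(ai_service, prompt: str, required_keyword: str,
                                   min_length: int = 10, max_length: int = 2000,
                                   samples: int = 3) -> None:
    # Sample the generator several times and check properties that should hold for
    # every sample, instead of comparing against one exact expected string.
    outputs = [await ai_service.generate_text(prompt, {"temperature": 0.7})
               for _ in range(samples)]
    for output in outputs:
        assert min_length <= len(output) <= max_length, f"unexpected length: {len(output)}"
        assert required_keyword.lower() in output.lower(), f"keyword missing in: {output[:80]}"

# Usage with a test double such as the AIServiceMock defined below:
# asyncio.run(assert_output_properties(mock, "What is the capital of France?", "Paris"))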
Here's a detailed implementation of testing strategies specifically designed for AI-integrated systems:
import asyncio
import pytest
import unittest
from unittest.mock import Mock, AsyncMock, patch
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass
import json
import time
import random
from datetime import datetime
import statistics
@dataclass
class TestResult:
test_name: str
passed: bool
execution_time: float
error_message: Optional[str] = None
metadata: Dict[str, Any] = None
@dataclass
class AITestCase:
name: str
input_data: Dict[str, Any]
expected_output_pattern: Optional[str] = None
expected_output_length_range: Optional[tuple] = None
expected_confidence_threshold: Optional[float] = None
max_execution_time: Optional[float] = None
should_fail: bool = False
failure_type: Optional[str] = None
class AIServiceMock:
def __init__(self, response_patterns: Dict[str, Any]):
self.response_patterns = response_patterns
self.call_count = 0
self.call_history = []
self.latency_simulation = 0.1
self.failure_rate = 0.0
self.responses = []
async def generate_text(self, prompt: str, parameters: Dict[str, Any]) -> str:
self.call_count += 1
call_info = {
"prompt": prompt,
"parameters": parameters,
"timestamp": datetime.now().isoformat()
}
self.call_history.append(call_info)
# Simulate latency
await asyncio.sleep(self.latency_simulation)
# Simulate failures
if random.random() < self.failure_rate:
raise Exception("Simulated AI service failure")
# Generate response based on patterns
response = self._generate_response(prompt, parameters)
self.responses.append(response)
return response
def _generate_response(self, prompt: str, parameters: Dict[str, Any]) -> str:
# Simple pattern matching for predictable test responses
prompt_lower = prompt.lower()
if "capital" in prompt_lower and "france" in prompt_lower:
return "The capital of France is Paris."
elif "sentiment" in prompt_lower:
return "positive"
elif "summarize" in prompt_lower:
return "This is a summary of the provided text."
elif "classify" in prompt_lower:
return "technology"
else:
return f"Generated response for: {prompt[:50]}..."
def reset(self):
self.call_count = 0
self.call_history = []
self.responses = []
def set_failure_rate(self, rate: float):
self.failure_rate = rate
def set_latency(self, latency: float):
self.latency_simulation = latency
class AIUnitTester:
def __init__(self):
self.test_results = []
self.mock_service = AIServiceMock({})
async def test_ai_service_layer(self, ai_service_layer):
"""Test the AI service layer with various inputs and scenarios"""
test_cases = [
AITestCase(
name="basic_text_generation",
input_data={
"operation_type": "text_generation",
"input_data": {"prompt": "What is the capital of France?"},
"parameters": {"temperature": 0.7}
},
expected_output_pattern="Paris",
max_execution_time=5.0
),
AITestCase(
name="text_classification",
input_data={
"operation_type": "text_classification",
"input_data": {"text": "This is about artificial intelligence"},
"parameters": {"categories": ["technology", "sports", "politics"]}
},
expected_output_pattern="technology",
max_execution_time=5.0
),
AITestCase(
name="sentiment_analysis",
input_data={
"operation_type": "sentiment_analysis",
"input_data": {"text": "I love this product!"},
"parameters": {}
},
expected_output_pattern="positive",
max_execution_time=5.0
),
AITestCase(
name="invalid_operation",
input_data={
"operation_type": "invalid_operation",
"input_data": {"text": "test"},
"parameters": {}
},
should_fail=True,
failure_type="AIServiceException"
),
AITestCase(
name="empty_input",
input_data={
"operation_type": "text_generation",
"input_data": {"prompt": ""},
"parameters": {}
},
should_fail=True,
failure_type="AIServiceException"
)
]
for test_case in test_cases:
result = await self._execute_test_case(ai_service_layer, test_case)
self.test_results.append(result)
async def _execute_test_case(self, ai_service_layer, test_case: AITestCase) -> TestResult:
start_time = time.time()
try:
# Create AI request from test case
from your_ai_module import AIRequest # Import your actual AIRequest class
request = AIRequest(
operation_type=test_case.input_data["operation_type"],
input_data=test_case.input_data["input_data"],
parameters=test_case.input_data["parameters"]
)
# Execute the request
response = await ai_service_layer.process_request(request)
execution_time = time.time() - start_time
# Check if test should have failed
if test_case.should_fail:
return TestResult(
test_name=test_case.name,
passed=False,
execution_time=execution_time,
error_message="Expected failure but test passed"
)
# Validate response
validation_result = self._validate_response(response, test_case)
return TestResult(
test_name=test_case.name,
passed=validation_result["passed"],
execution_time=execution_time,
error_message=validation_result.get("error_message"),
metadata={
"response_length": len(str(response.result)),
"confidence_score": response.confidence_score,
"cached": response.cached
}
)
except Exception as e:
execution_time = time.time() - start_time
# Check if this was an expected failure
if test_case.should_fail:
expected_error = test_case.failure_type
actual_error = type(e).__name__
if expected_error and expected_error in actual_error:
return TestResult(
test_name=test_case.name,
passed=True,
execution_time=execution_time,
metadata={"expected_failure": True}
)
return TestResult(
test_name=test_case.name,
passed=False,
execution_time=execution_time,
error_message=str(e)
)
def _validate_response(self, response, test_case: AITestCase) -> Dict[str, Any]:
"""Validate AI response against test case expectations"""
# Check execution time
if test_case.max_execution_time and response.processing_time > test_case.max_execution_time:
return {
"passed": False,
"error_message": f"Execution time {response.processing_time}s exceeded maximum {test_case.max_execution_time}s"
}
# Check output pattern
if test_case.expected_output_pattern:
result_str = str(response.result).lower()
pattern = test_case.expected_output_pattern.lower()
if pattern not in result_str:
return {
"passed": False,
"error_message": f"Expected pattern '{pattern}' not found in result '{result_str}'"
}
# Check output length
if test_case.expected_output_length_range:
result_length = len(str(response.result))
min_length, max_length = test_case.expected_output_length_range
if not (min_length <= result_length <= max_length):
return {
"passed": False,
"error_message": f"Result length {result_length} not in expected range [{min_length}, {max_length}]"
}
# Check confidence threshold
if test_case.expected_confidence_threshold and response.confidence_score:
if response.confidence_score < test_case.expected_confidence_threshold:
return {
"passed": False,
"error_message": f"Confidence score {response.confidence_score} below threshold {test_case.expected_confidence_threshold}"
}
return {"passed": True}
class AIIntegrationTester:
def __init__(self):
self.test_results = []
async def test_end_to_end_pipeline(self, pipeline_orchestrator, test_data: List[Dict[str, Any]]):
"""Test the complete AI pipeline from input to output"""
for i, data_item in enumerate(test_data):
test_name = f"e2e_pipeline_test_{i}"
start_time = time.time()
try:
result = await pipeline_orchestrator.execute_pipeline(data_item)
execution_time = time.time() - start_time
# Validate pipeline result
validation_passed = self._validate_pipeline_result(result, data_item)
self.test_results.append(TestResult(
test_name=test_name,
passed=validation_passed,
execution_time=execution_time,
metadata={
"final_stage": result.stage.value,
"artifact_id": result.id,
"data_format": result.format.value
}
))
except Exception as e:
execution_time = time.time() - start_time
self.test_results.append(TestResult(
test_name=test_name,
passed=False,
execution_time=execution_time,
error_message=str(e)
))
async def test_concurrent_requests(self, ai_service, num_concurrent: int = 10):
"""Test AI service under concurrent load"""
async def single_request(request_id: int):
try:
start_time = time.time()
result = await ai_service.generate_text(
f"Test request {request_id}",
{"temperature": 0.7}
)
execution_time = time.time() - start_time
return TestResult(
test_name=f"concurrent_request_{request_id}",
passed=True,
execution_time=execution_time,
metadata={"result_length": len(result)}
)
except Exception as e:
return TestResult(
test_name=f"concurrent_request_{request_id}",
passed=False,
execution_time=time.time() - start_time,
error_message=str(e)
)
# Execute concurrent requests
tasks = [single_request(i) for i in range(num_concurrent)]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for result in results:
if isinstance(result, TestResult):
self.test_results.append(result)
else:
self.test_results.append(TestResult(
test_name="concurrent_request_exception",
passed=False,
execution_time=0.0,
error_message=str(result)
))
def _validate_pipeline_result(self, result, original_data) -> bool:
"""Validate that pipeline result meets basic requirements"""
# Check that result has required attributes
required_attributes = ['id', 'data', 'format', 'stage', 'metadata']
for attr in required_attributes:
if not hasattr(result, attr):
return False
# Check that data was processed (not identical to input)
if result.data == original_data:
return False
# Check that metadata contains expected information
if not result.metadata or 'validation_passed' not in result.metadata:
return False
return True
class AIPerformanceTester:
def __init__(self):
self.performance_metrics = {}
async def benchmark_ai_operations(self, ai_service, test_scenarios: List[Dict[str, Any]]):
"""Benchmark AI operations under various conditions"""
for scenario in test_scenarios:
scenario_name = scenario["name"]
operation_count = scenario.get("operation_count", 10)
concurrent_users = scenario.get("concurrent_users", 1)
print(f"Running benchmark scenario: {scenario_name}")
# Collect performance metrics
response_times = []
success_count = 0
error_count = 0
async def benchmark_operation():
try:
start_time = time.time()
await ai_service.generate_text(
scenario["prompt"],
scenario.get("parameters", {})
)
response_time = time.time() - start_time
response_times.append(response_time)
return True
except Exception:
return False
# Run concurrent operations
for batch in range(0, operation_count, concurrent_users):
batch_size = min(concurrent_users, operation_count - batch)
tasks = [benchmark_operation() for _ in range(batch_size)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if result is True:
success_count += 1
else:
error_count += 1
# Calculate metrics
if response_times:
self.performance_metrics[scenario_name] = {
"total_operations": operation_count,
"successful_operations": success_count,
"failed_operations": error_count,
"success_rate": success_count / operation_count,
"average_response_time": statistics.mean(response_times),
"median_response_time": statistics.median(response_times),
"min_response_time": min(response_times),
"max_response_time": max(response_times),
"p95_response_time": self._calculate_percentile(response_times, 95),
"p99_response_time": self._calculate_percentile(response_times, 99),
"throughput_ops_per_second": success_count / sum(response_times) if response_times else 0
}
def _calculate_percentile(self, data: List[float], percentile: int) -> float:
"""Calculate the specified percentile of the data"""
if not data:
return 0.0
sorted_data = sorted(data)
index = (percentile / 100.0) * (len(sorted_data) - 1)
if index.is_integer():
return sorted_data[int(index)]
else:
lower_index = int(index)
upper_index = lower_index + 1
weight = index - lower_index
return sorted_data[lower_index] * (1 - weight) + sorted_data[upper_index] * weight
class AIRobustnessTester:
def __init__(self):
self.robustness_results = {}
async def test_error_handling(self, resilient_ai_service):
"""Test system behavior under various error conditions"""
error_scenarios = [
{
"name": "timeout_errors",
"error_type": "timeout",
"error_rate": 0.5,
"expected_behavior": "fallback_or_retry"
},
{
"name": "rate_limit_errors",
"error_type": "rate_limit",
"error_rate": 0.3,
"expected_behavior": "backoff_and_retry"
},
{
"name": "service_unavailable",
"error_type": "service_unavailable",
"error_rate": 1.0,
"expected_behavior": "circuit_breaker_open"
}
]
for scenario in error_scenarios:
scenario_name = scenario["name"]
print(f"Testing error scenario: {scenario_name}")
# Configure mock to simulate errors
if hasattr(resilient_ai_service, 'ai_service'):
if hasattr(resilient_ai_service.ai_service, 'set_failure_rate'):
resilient_ai_service.ai_service.set_failure_rate(scenario["error_rate"])
# Test multiple requests to observe behavior
results = []
for i in range(10):
try:
start_time = time.time()
result = await resilient_ai_service.generate_text(
f"Test prompt {i}",
{"temperature": 0.7},
f"request_{i}"
)
execution_time = time.time() - start_time
results.append({
"success": True,
"execution_time": execution_time,
"result_length": len(result)
})
except Exception as e:
execution_time = time.time() - start_time
results.append({
"success": False,
"execution_time": execution_time,
"error_type": type(e).__name__,
"error_message": str(e)
})
# Analyze results
success_count = sum(1 for r in results if r["success"])
avg_execution_time = statistics.mean([r["execution_time"] for r in results])
self.robustness_results[scenario_name] = {
"total_requests": len(results),
"successful_requests": success_count,
"success_rate": success_count / len(results),
"average_execution_time": avg_execution_time,
"expected_behavior": scenario["expected_behavior"],
"behavior_observed": self._analyze_behavior(results, scenario)
}
def _analyze_behavior(self, results: List[Dict[str, Any]], scenario: Dict[str, Any]) -> str:
"""Analyze the observed behavior pattern"""
success_count = sum(1 for r in results if r["success"])
if scenario["expected_behavior"] == "fallback_or_retry":
if success_count > 0:
return "fallback_working"
else:
return "fallback_failed"
elif scenario["expected_behavior"] == "backoff_and_retry":
# Check if execution times increase (indicating backoff)
execution_times = [r["execution_time"] for r in results]
if len(execution_times) > 1:
increasing_trend = all(execution_times[i] <= execution_times[i+1]
for i in range(len(execution_times)-1))
if increasing_trend:
return "backoff_observed"
return "backoff_not_observed"
elif scenario["expected_behavior"] == "circuit_breaker_open":
# Check if later requests fail faster (circuit breaker open)
if len(results) > 5:
early_times = [r["execution_time"] for r in results[:3]]
later_times = [r["execution_time"] for r in results[-3:]]
if statistics.mean(later_times) < statistics.mean(early_times):
return "circuit_breaker_working"
return "circuit_breaker_not_working"
return "unknown_behavior"
class AITestSuite:
def __init__(self):
self.unit_tester = AIUnitTester()
self.integration_tester = AIIntegrationTester()
self.performance_tester = AIPerformanceTester()
self.robustness_tester = AIRobustnessTester()
self.overall_results = {}
async def run_comprehensive_tests(self,
ai_service_layer,
pipeline_orchestrator,
resilient_ai_service):
"""Run all test suites and generate comprehensive report"""
print("Starting comprehensive AI system tests...")
# Unit tests
print("Running unit tests...")
await self.unit_tester.test_ai_service_layer(ai_service_layer)
# Integration tests
print("Running integration tests...")
test_data = [
{"text": "Sample text for processing", "category": "test"},
{"text": "Another test case", "category": "validation"},
{"text": "Final test item", "category": "verification"}
]
await self.integration_tester.test_end_to_end_pipeline(pipeline_orchestrator, test_data)
await self.integration_tester.test_concurrent_requests(ai_service_layer)
# Performance tests
print("Running performance tests...")
performance_scenarios = [
{
"name": "basic_text_generation",
"prompt": "Generate a short story about AI",
"parameters": {"temperature": 0.7, "max_length": 100},
"operation_count": 20,
"concurrent_users": 5
},
{
"name": "high_concurrency",
"prompt": "What is machine learning?",
"parameters": {"temperature": 0.5},
"operation_count": 50,
"concurrent_users": 10
}
]
await self.performance_tester.benchmark_ai_operations(ai_service_layer, performance_scenarios)
# Robustness tests
print("Running robustness tests...")
await self.robustness_tester.test_error_handling(resilient_ai_service)
# Compile overall results
self._compile_results()
print("All tests completed!")
return self.overall_results
def _compile_results(self):
"""Compile results from all test suites"""
# Unit test results
unit_tests = self.unit_tester.test_results
unit_passed = sum(1 for test in unit_tests if test.passed)
# Integration test results
integration_tests = self.integration_tester.test_results
integration_passed = sum(1 for test in integration_tests if test.passed)
# Performance results
performance_results = self.performance_tester.performance_metrics
# Robustness results
robustness_results = self.robustness_tester.robustness_results
self.overall_results = {
"test_summary": {
"unit_tests": {
"total": len(unit_tests),
"passed": unit_passed,
"failed": len(unit_tests) - unit_passed,
"pass_rate": unit_passed / len(unit_tests) if unit_tests else 0
},
"integration_tests": {
"total": len(integration_tests),
"passed": integration_passed,
"failed": len(integration_tests) - integration_passed,
"pass_rate": integration_passed / len(integration_tests) if integration_tests else 0
}
},
"performance_metrics": performance_results,
"robustness_metrics": robustness_results,
"detailed_results": {
"unit_test_details": [
{
"name": test.test_name,
"passed": test.passed,
"execution_time": test.execution_time,
"error": test.error_message
}
for test in unit_tests
],
"integration_test_details": [
{
"name": test.test_name,
"passed": test.passed,
"execution_time": test.execution_time,
"error": test.error_message
}
for test in integration_tests
]
}
}
def generate_test_report(self) -> str:
"""Generate a comprehensive test report"""
if not self.overall_results:
return "No test results available. Run tests first."
report = []
report.append("AI SYSTEM TEST REPORT")
report.append("=" * 50)
report.append("")
# Test summary
summary = self.overall_results["test_summary"]
report.append("TEST SUMMARY:")
report.append(f"Unit Tests: {summary['unit_tests']['passed']}/{summary['unit_tests']['total']} passed ({summary['unit_tests']['pass_rate']:.2%})")
report.append(f"Integration Tests: {summary['integration_tests']['passed']}/{summary['integration_tests']['total']} passed ({summary['integration_tests']['pass_rate']:.2%})")
report.append("")
# Performance metrics
if self.overall_results["performance_metrics"]:
report.append("PERFORMANCE METRICS:")
for scenario, metrics in self.overall_results["performance_metrics"].items():
report.append(f" {scenario}:")
report.append(f" Success Rate: {metrics['success_rate']:.2%}")
report.append(f" Average Response Time: {metrics['average_response_time']:.3f}s")
report.append(f" P95 Response Time: {metrics['p95_response_time']:.3f}s")
report.append(f" Throughput: {metrics['throughput_ops_per_second']:.2f} ops/sec")
report.append("")
# Robustness metrics
if self.overall_results["robustness_metrics"]:
report.append("ROBUSTNESS METRICS:")
for scenario, metrics in self.overall_results["robustness_metrics"].items():
report.append(f" {scenario}:")
report.append(f" Success Rate: {metrics['success_rate']:.2%}")
report.append(f" Expected Behavior: {metrics['expected_behavior']}")
report.append(f" Observed Behavior: {metrics['behavior_observed']}")
report.append("")
# Failed tests
failed_tests = []
for test in self.overall_results["detailed_results"]["unit_test_details"]:
if not test["passed"]:
failed_tests.append(f"Unit Test - {test['name']}: {test['error']}")
for test in self.overall_results["detailed_results"]["integration_test_details"]:
if not test["passed"]:
failed_tests.append(f"Integration Test - {test['name']}: {test['error']}")
if failed_tests:
report.append("FAILED TESTS:")
for failure in failed_tests:
report.append(f" - {failure}")
else:
report.append("ALL TESTS PASSED!")
return "\n".join(report)
# Example usage
async def example_test_execution():
"""Example of how to use the AI testing framework"""
# This would be your actual AI components
# For demonstration, we'll use mocks
class MockAIServiceLayer:
async def process_request(self, request):
# Mock implementation
from types import SimpleNamespace
return SimpleNamespace(
result="Mock AI response",
confidence_score=0.8,
processing_time=0.5,
cached=False
)
class MockPipelineOrchestrator:
async def execute_pipeline(self, data):
from types import SimpleNamespace
return SimpleNamespace(
id="test_artifact_123",
data={"processed": True, "original": data},
format=SimpleNamespace(value="json"),
stage=SimpleNamespace(value="output"),
metadata={"validation_passed": True}
)
class MockResilientAIService:
async def generate_text(self, prompt, parameters, request_id):
return f"Resilient response for: {prompt}"
# Create test suite
test_suite = AITestSuite()
# Create mock components
ai_service_layer = MockAIServiceLayer()
pipeline_orchestrator = MockPipelineOrchestrator()
resilient_ai_service = MockResilientAIService()
# Run comprehensive tests
results = await test_suite.run_comprehensive_tests(
ai_service_layer,
pipeline_orchestrator,
resilient_ai_service
)
# Generate and print report
report = test_suite.generate_test_report()
print(report)
return results
# Uncomment to run the example
# asyncio.run(example_test_execution())
This testing framework addresses the distinct challenges of AI-integrated systems at four levels. The unit tester validates individual AI components against predictable test cases. The integration tester exercises the end-to-end pipeline and concurrent behavior. The performance tester measures latency and throughput under varying load. The robustness tester verifies that error handling, retries, and circuit breaking behave as intended. Together, these layers give a realistic picture of how the system will behave in production.
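As a usage note, the suite can be driven from pytest without any plugin by running the event loop inside an ordinary test function and asserting on aggregate pass rates rather than on individual, potentially flaky cases. The sketch below assumes the mock components from example_test_execution() are available at module scope and that the thresholds suit your project:
import asyncio

def test_ai_system_pass_rates():
    suite = AITestSuite()
    results = asyncio.run(suite.run_comprehensive_tests(
        MockAIServiceLayer(),        # placeholders; replace with your real components
        MockPipelineOrchestrator(),
        MockResilientAIService()
    ))
    summary = results["test_summary"]
    # Gate the build on aggregate pass rates so a single non-deterministic case
    # does not flip the whole run.
    assert summary["unit_tests"]["pass_rate"] >= 0.9
    assert summary["integration_tests"]["pass_rate"] >= 0.9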
Monitoring and Observability
Monitoring AI-integrated systems requires specialized approaches that go beyond traditional application monitoring. AI systems exhibit unique characteristics such as variable response times, probabilistic outputs, model drift over time, and complex dependencies on external AI services. Effective monitoring must track not only system health and performance but also AI-specific metrics such as model accuracy, confidence scores, and output quality.
Observability in AI systems encompasses three key pillars: metrics collection for quantitative analysis, logging for detailed event tracking, and tracing for understanding request flows through complex AI pipelines. The monitoring system must be capable of detecting subtle degradations in AI performance that might not trigger traditional alerting thresholds but could significantly impact user experience.
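Fixed alert thresholds (used by the alert manager below) are often too blunt for gradual degradation, so it is common to also compare a recent window of a metric against a longer baseline. Here is a minimal sketch of such a drift check on confidence scores; the 10% relative-drop threshold is an arbitrary illustration:
import statistics
from typing import List

def confidence_drifted(baseline_scores: List[float], recent_scores: List[float],
                       max_relative_drop: float = 0.10) -> bool:
    # Flag drift when the mean confidence of the recent window (e.g. last hour) has
    # fallen more than max_relative_drop below the baseline window (e.g. last week).
    if not baseline_scores or not recent_scores:
        return False  # not enough data to judge
    baseline = statistics.mean(baseline_scores)
    if baseline == 0:
        return False
    recent = statistics.mean(recent_scores)
    return (baseline - recent) / baseline > max_relative_drop

# Example: baseline mean 0.82 vs. recent mean 0.70 is a ~15% relative drop -> True
# print(confidence_drifted([0.80, 0.84, 0.82], [0.70, 0.69, 0.71]))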
Here's a comprehensive implementation of monitoring and observability specifically designed for AI-integrated systems:
import asyncio
import time
import json
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import statistics
import threading
from collections import defaultdict, deque
import uuid
class MetricType(Enum):
COUNTER = "counter"
GAUGE = "gauge"
HISTOGRAM = "histogram"
TIMER = "timer"
class AlertSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class Metric:
name: str
value: float
metric_type: MetricType
timestamp: datetime
labels: Dict[str, str] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LogEntry:
level: str
message: str
timestamp: datetime
component: str
request_id: Optional[str] = None
user_id: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class TraceSpan:
span_id: str
trace_id: str
operation_name: str
start_time: datetime
end_time: Optional[datetime] = None
parent_span_id: Optional[str] = None
tags: Dict[str, Any] = field(default_factory=dict)
logs: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class Alert:
id: str
name: str
severity: AlertSeverity
message: str
timestamp: datetime
metric_name: str
threshold_value: float
actual_value: float
metadata: Dict[str, Any] = field(default_factory=dict)
class AIMetricsCollector:
def __init__(self):
self.metrics = defaultdict(list)
self.metric_windows = defaultdict(lambda: deque(maxlen=1000))
self.lock = threading.Lock()
# AI-specific metric definitions
self.ai_metrics = {
"ai_request_duration": MetricType.HISTOGRAM,
"ai_request_count": MetricType.COUNTER,
"ai_error_count": MetricType.COUNTER,
"ai_confidence_score": MetricType.HISTOGRAM,
"ai_output_length": MetricType.HISTOGRAM,
"ai_cache_hit_rate": MetricType.GAUGE,
"ai_model_inference_time": MetricType.HISTOGRAM,
"ai_queue_depth": MetricType.GAUGE,
"ai_token_usage": MetricType.COUNTER,
"ai_cost_per_request": MetricType.HISTOGRAM
}
def record_metric(self, name: str, value: float, labels: Dict[str, str] = None, metadata: Dict[str, Any] = None):
"""Record a metric value"""
with self.lock:
metric_type = self.ai_metrics.get(name, MetricType.GAUGE)
metric = Metric(
name=name,
value=value,
metric_type=metric_type,
timestamp=datetime.now(),
labels=labels or {},
metadata=metadata or {}
)
self.metrics[name].append(metric)
self.metric_windows[name].append(metric)
def record_ai_request(self,
operation_type: str,
duration: float,
success: bool,
confidence_score: Optional[float] = None,
output_length: Optional[int] = None,
token_usage: Optional[int] = None,
cost: Optional[float] = None,
model_name: Optional[str] = None):
"""Record metrics for an AI request"""
labels = {
"operation_type": operation_type,
"model_name": model_name or "unknown",
"success": str(success)
}
# Record basic metrics
self.record_metric("ai_request_duration", duration, labels)
self.record_metric("ai_request_count", 1, labels)
if not success:
self.record_metric("ai_error_count", 1, labels)
# Record AI-specific metrics
if confidence_score is not None:
self.record_metric("ai_confidence_score", confidence_score, labels)
if output_length is not None:
self.record_metric("ai_output_length", output_length, labels)
if token_usage is not None:
self.record_metric("ai_token_usage", token_usage, labels)
if cost is not None:
self.record_metric("ai_cost_per_request", cost, labels)
def get_metric_summary(self, metric_name: str, time_window_minutes: int = 60) -> Dict[str, Any]:
"""Get summary statistics for a metric within a time window"""
cutoff_time = datetime.now() - timedelta(minutes=time_window_minutes)
with self.lock:
recent_metrics = [
m for m in self.metrics[metric_name]
if m.timestamp > cutoff_time
]
if not recent_metrics:
return {"count": 0, "summary": "No data available"}
values = [m.value for m in recent_metrics]
summary = {
"count": len(values),
"min": min(values),
"max": max(values),
"mean": statistics.mean(values),
"median": statistics.median(values),
"std_dev": statistics.stdev(values) if len(values) > 1 else 0,
"p95": self._calculate_percentile(values, 95),
"p99": self._calculate_percentile(values, 99)
}
return summary
def get_ai_health_metrics(self) -> Dict[str, Any]:
"""Get overall AI system health metrics"""
# Calculate success rate
success_metrics = [m for m in self.metrics["ai_request_count"] if m.labels.get("success") == "true"]
total_metrics = self.metrics["ai_request_count"]
success_rate = len(success_metrics) / len(total_metrics) if total_metrics else 0
# Calculate average confidence score
confidence_metrics = self.metrics["ai_confidence_score"]
avg_confidence = statistics.mean([m.value for m in confidence_metrics]) if confidence_metrics else 0
# Calculate average response time
duration_metrics = self.metrics["ai_request_duration"]
avg_duration = statistics.mean([m.value for m in duration_metrics]) if duration_metrics else 0
# Calculate error rate
error_count = len(self.metrics["ai_error_count"])
total_requests = len(total_metrics)
error_rate = error_count / total_requests if total_requests else 0
return {
"success_rate": success_rate,
"error_rate": error_rate,
"average_confidence_score": avg_confidence,
"average_response_time": avg_duration,
"total_requests": total_requests,
"total_errors": error_count,
"health_score": self._calculate_health_score(success_rate, avg_confidence, avg_duration, error_rate)
}
def _calculate_percentile(self, values: List[float], percentile: int) -> float:
"""Calculate percentile value"""
if not values:
return 0.0
sorted_values = sorted(values)
index = (percentile / 100.0) * (len(sorted_values) - 1)
if index.is_integer():
return sorted_values[int(index)]
else:
lower = int(index)
upper = lower + 1
weight = index - lower
return sorted_values[lower] * (1 - weight) + sorted_values[upper] * weight
def _calculate_health_score(self, success_rate: float, avg_confidence: float, avg_duration: float, error_rate: float) -> float:
"""Calculate overall health score (0-100)"""
# Normalize metrics to 0-1 scale
success_score = success_rate
confidence_score = avg_confidence if avg_confidence <= 1.0 else avg_confidence / 100.0
# Duration score (inverse relationship - lower is better)
duration_score = max(0, 1 - (avg_duration / 10.0)) # Assume 10s is very poor
# Error score (inverse relationship)
error_score = max(0, 1 - error_rate)
# Weighted average
health_score = (
success_score * 0.3 +
confidence_score * 0.25 +
duration_score * 0.25 +
error_score * 0.2
) * 100
return min(100, max(0, health_score))
class AILogger:
def __init__(self, component_name: str):
self.component_name = component_name
self.logs = deque(maxlen=10000)
self.lock = threading.Lock()
def log(self, level: str, message: str, request_id: str = None, user_id: str = None, **metadata):
"""Log a message with AI-specific context"""
log_entry = LogEntry(
level=level,
message=message,
timestamp=datetime.now(),
component=self.component_name,
request_id=request_id,
user_id=user_id,
metadata=metadata
)
with self.lock:
self.logs.append(log_entry)
# Print to console (in production, this would go to your logging system)
self._print_log(log_entry)
def log_ai_request_start(self, request_id: str, operation_type: str, user_id: str = None, **metadata):
"""Log the start of an AI request"""
self.log(
"INFO",
f"AI request started: {operation_type}",
request_id=request_id,
user_id=user_id,
operation_type=operation_type,
**metadata
)
def log_ai_request_complete(self, request_id: str, operation_type: str, duration: float, success: bool, **metadata):
"""Log the completion of an AI request"""
level = "INFO" if success else "ERROR"
status = "completed successfully" if success else "failed"
self.log(
level,
f"AI request {status}: {operation_type} (duration: {duration:.3f}s)",
request_id=request_id,
operation_type=operation_type,
duration=duration,
success=success,
**metadata
)
def log_ai_error(self, request_id: str, operation_type: str, error: Exception, **metadata):
"""Log an AI-related error"""
self.log(
"ERROR",
f"AI error in {operation_type}: {str(error)}",
request_id=request_id,
operation_type=operation_type,
error_type=type(error).__name__,
error_message=str(error),
**metadata
)
def log_model_performance(self, model_name: str, confidence_score: float, processing_time: float, **metadata):
"""Log model performance metrics"""
self.log(
"INFO",
f"Model performance: {model_name} (confidence: {confidence_score:.3f}, time: {processing_time:.3f}s)",
model_name=model_name,
confidence_score=confidence_score,
processing_time=processing_time,
**metadata
)
def get_recent_logs(self, count: int = 100, level_filter: str = None) -> List[LogEntry]:
"""Get recent log entries"""
with self.lock:
logs = list(self.logs)
if level_filter:
logs = [log for log in logs if log.level == level_filter]
return logs[-count:]
def _print_log(self, log_entry: LogEntry):
"""Print log entry to console"""
timestamp = log_entry.timestamp.strftime("%Y-%m-%d %H:%M:%S")
request_info = f" [{log_entry.request_id}]" if log_entry.request_id else ""
print(f"[{timestamp}] {log_entry.level} {log_entry.component}{request_info}: {log_entry.message}")
class AITracer:
def __init__(self):
self.active_spans = {}
self.completed_traces = deque(maxlen=1000)
self.lock = threading.Lock()
def start_span(self, operation_name: str, parent_span_id: str = None, trace_id: str = None) -> str:
"""Start a new trace span"""
span_id = str(uuid.uuid4())
if not trace_id:
trace_id = str(uuid.uuid4())
span = TraceSpan(
span_id=span_id,
trace_id=trace_id,
operation_name=operation_name,
start_time=datetime.now(),
parent_span_id=parent_span_id
)
with self.lock:
self.active_spans[span_id] = span
return span_id
def finish_span(self, span_id: str, tags: Dict[str, Any] = None, error: Exception = None):
"""Finish a trace span"""
with self.lock:
span = self.active_spans.pop(span_id, None)
if not span:
return
span.end_time = datetime.now()
if tags:
span.tags.update(tags)
if error:
span.tags["error"] = True
span.tags["error_type"] = type(error).__name__
span.tags["error_message"] = str(error)
# Add to completed traces
with self.lock:
self.completed_traces.append(span)
def add_span_log(self, span_id: str, message: str, **fields):
"""Add a log entry to a span"""
with self.lock:
span = self.active_spans.get(span_id)
if span:
log_entry = {
"timestamp": datetime.now().isoformat(),
"message": message,
**fields
}
span.logs.append(log_entry)
def get_trace(self, trace_id: str) -> List[TraceSpan]:
"""Get all spans for a trace"""
with self.lock:
spans = [span for span in self.completed_traces if span.trace_id == trace_id]
return sorted(spans, key=lambda s: s.start_time)
def get_trace_duration(self, trace_id: str) -> Optional[float]:
"""Get total duration of a trace"""
spans = self.get_trace(trace_id)
if not spans:
return None
start_time = min(span.start_time for span in spans)
end_time = max((span.end_time for span in spans if span.end_time), default=None)
if end_time:
return (end_time - start_time).total_seconds()
return None
class AIAlertManager:
def __init__(self, metrics_collector: AIMetricsCollector):
self.metrics_collector = metrics_collector
self.alert_rules = {}
self.active_alerts = {}
self.alert_history = deque(maxlen=1000)
self.lock = threading.Lock()
# Set up default alert rules
self._setup_default_alerts()
def _setup_default_alerts(self):
"""Set up default alerting rules for AI systems"""
self.add_alert_rule(
"high_error_rate",
metric_name="ai_error_count",
threshold=0.1, # 10% error rate
comparison="greater_than",
severity=AlertSeverity.HIGH,
description="AI error rate is too high"
)
self.add_alert_rule(
"low_confidence_score",
metric_name="ai_confidence_score",
threshold=0.5,
comparison="less_than",
severity=AlertSeverity.MEDIUM,
description="AI confidence scores are low"
)
self.add_alert_rule(
"high_response_time",
metric_name="ai_request_duration",
threshold=10.0, # 10 seconds
comparison="greater_than",
severity=AlertSeverity.HIGH,
description="AI response times are too high"
)
self.add_alert_rule(
"low_success_rate",
metric_name="ai_request_count",
threshold=0.95, # 95% success rate
comparison="less_than",
severity=AlertSeverity.CRITICAL,
description="AI success rate is below threshold"
)
def add_alert_rule(self,
rule_name: str,
metric_name: str,
threshold: float,
comparison: str,
severity: AlertSeverity,
description: str):
"""Add a new alert rule"""
self.alert_rules[rule_name] = {
"metric_name": metric_name,
"threshold": threshold,
"comparison": comparison,
"severity": severity,
"description": description,
"enabled": True
}
async def check_alerts(self):
"""Check all alert rules and trigger alerts if necessary"""
for rule_name, rule in self.alert_rules.items():
if not rule["enabled"]:
continue
try:
await self._check_single_rule(rule_name, rule)
except Exception as e:
print(f"Error checking alert rule {rule_name}: {e}")
async def _check_single_rule(self, rule_name: str, rule: Dict[str, Any]):
"""Check a single alert rule"""
metric_name = rule["metric_name"]
threshold = rule["threshold"]
comparison = rule["comparison"]
# Get current metric value
if metric_name == "ai_error_count":
current_value = self._calculate_error_rate()
elif metric_name == "ai_request_count":
current_value = self._calculate_success_rate()
else:
summary = self.metrics_collector.get_metric_summary(metric_name, time_window_minutes=5)
if summary["count"] == 0:
return
current_value = summary["mean"]
# Check if alert should trigger
should_alert = False
if comparison == "greater_than" and current_value > threshold:
should_alert = True
elif comparison == "less_than" and current_value < threshold:
should_alert = True
elif comparison == "equals" and abs(current_value - threshold) < 0.001:
should_alert = True
# Handle alert state
if should_alert:
if rule_name not in self.active_alerts:
await self._trigger_alert(rule_name, rule, current_value)
else:
if rule_name in self.active_alerts:
await self._resolve_alert(rule_name)
async def _trigger_alert(self, rule_name: str, rule: Dict[str, Any], current_value: float):
"""Trigger a new alert"""
alert = Alert(
id=str(uuid.uuid4()),
name=rule_name,
severity=rule["severity"],
message=f"{rule['description']} (current: {current_value:.3f}, threshold: {rule['threshold']:.3f})",
timestamp=datetime.now(),
metric_name=rule["metric_name"],
threshold_value=rule["threshold"],
actual_value=current_value
)
with self.lock:
self.active_alerts[rule_name] = alert
self.alert_history.append(alert)
# Send alert notification (in production, this would integrate with your alerting system)
await self._send_alert_notification(alert)
async def _resolve_alert(self, rule_name: str):
"""Resolve an active alert"""
with self.lock:
alert = self.active_alerts.pop(rule_name, None)
if alert:
print(f"RESOLVED: Alert {rule_name} has been resolved")
async def _send_alert_notification(self, alert: Alert):
"""Send alert notification"""
severity_emoji = {
AlertSeverity.LOW: "ℹ️",
AlertSeverity.MEDIUM: "⚠️",
AlertSeverity.HIGH: "🚨",
AlertSeverity.CRITICAL: "🔥"
}
emoji = severity_emoji.get(alert.severity, "❓")
print(f"{emoji} ALERT [{alert.severity.value.upper()}]: {alert.message}")
print(f" Alert ID: {alert.id}")
print(f" Timestamp: {alert.timestamp}")
print(f" Metric: {alert.metric_name}")
def _calculate_error_rate(self) -> float:
"""Calculate current error rate"""
total_requests = len(self.metrics_collector.metrics["ai_request_count"])
error_requests = len(self.metrics_collector.metrics["ai_error_count"])
return error_requests / total_requests if total_requests > 0 else 0
def _calculate_success_rate(self) -> float:
"""Calculate current success rate"""
return 1.0 - self._calculate_error_rate()
def get_active_alerts(self) -> List[Alert]:
"""Get all active alerts"""
with self.lock:
return list(self.active_alerts.values())
class AIMonitoringDashboard:
def __init__(self,
metrics_collector: AIMetricsCollector,
logger: AILogger,
tracer: AITracer,
alert_manager: AIAlertManager):
self.metrics_collector = metrics_collector
self.logger = logger
self.tracer = tracer
self.alert_manager = alert_manager
def get_dashboard_data(self) -> Dict[str, Any]:
"""Get comprehensive dashboard data"""
# Get health metrics
health_metrics = self.metrics_collector.get_ai_health_metrics()
# Get recent metrics summaries
metric_summaries = {}
for metric_name in ["ai_request_duration", "ai_confidence_score", "ai_output_length"]:
metric_summaries[metric_name] = self.metrics_collector.get_metric_summary(metric_name)
# Get recent logs
recent_logs = self.logger.get_recent_logs(50)
error_logs = self.logger.get_recent_logs(20, "ERROR")
# Get active alerts
active_alerts = self.alert_manager.get_active_alerts()
# Get recent traces
recent_traces = list(self.tracer.completed_traces)[-10:]
return {
"health_metrics": health_metrics,
"metric_summaries": metric_summaries,
"recent_logs": [
{
"timestamp": log.timestamp.isoformat(),
"level": log.level,
"message": log.message,
"component": log.component,
"request_id": log.request_id
}
for log in recent_logs
],
"error_logs": [
{
"timestamp": log.timestamp.isoformat(),
"message": log.message,
"request_id": log.request_id,
"metadata": log.metadata
}
for log in error_logs
],
"active_alerts": [
{
"id": alert.id,
"name": alert.name,
"severity": alert.severity.value,
"message": alert.message,
"timestamp": alert.timestamp.isoformat()
}
for alert in active_alerts
],
"recent_traces": [
{
"trace_id": trace.trace_id,
"operation": trace.operation_name,
"duration": (trace.end_time - trace.start_time).total_seconds() if trace.end_time else None,
"tags": trace.tags
}
for trace in recent_traces
]
}
def print_dashboard(self):
"""Print a text-based dashboard"""
data = self.get_dashboard_data()
print("=" * 80)
print("AI SYSTEM MONITORING DASHBOARD")
print("=" * 80)
print()
# Health metrics
health = data["health_metrics"]
print(f"SYSTEM HEALTH SCORE: {health['health_score']:.1f}/100")
print(f"Success Rate: {health['success_rate']:.2%}")
print(f"Error Rate: {health['error_rate']:.2%}")
print(f"Avg Confidence: {health['average_confidence_score']:.3f}")
print(f"Avg Response Time: {health['average_response_time']:.3f}s")
print(f"Total Requests: {health['total_requests']}")
print()
# Active alerts
if data["active_alerts"]:
print("ACTIVE ALERTS:")
for alert in data["active_alerts"]:
print(f" 🚨 [{alert['severity'].upper()}] {alert['name']}: {alert['message']}")
print()
# Recent errors
if data["error_logs"]:
print("RECENT ERRORS:")
for error in data["error_logs"][-5:]:
print(f" ❌ {error['timestamp']}: {error['message']}")
print()
# Performance metrics
print("PERFORMANCE METRICS (last hour):")
for metric_name, summary in data["metric_summaries"].items():
if summary["count"] > 0:
print(f" {metric_name}:")
print(f" Mean: {summary['mean']:.3f}, P95: {summary['p95']:.3f}, P99: {summary['p99']:.3f}")
print()
# Example usage and integration
class MonitoredAIService:
def __init__(self):
self.metrics_collector = AIMetricsCollector()
self.logger = AILogger("AIService")
self.tracer = AITracer()
self.alert_manager = AIAlertManager(self.metrics_collector)
self.dashboard = AIMonitoringDashboard(
self.metrics_collector,
self.logger,
self.tracer,
self.alert_manager
)
# Start background alert checking
self._start_alert_monitoring()
async def generate_text_with_monitoring(self, prompt: str, parameters: Dict[str, Any], user_id: str = None) -> str:
"""Generate text with comprehensive monitoring"""
request_id = str(uuid.uuid4())
operation_type = "text_generation"
# Start tracing
trace_id = str(uuid.uuid4())
span_id = self.tracer.start_span("ai_text_generation", trace_id=trace_id)
# Log request start
self.logger.log_ai_request_start(
request_id=request_id,
operation_type=operation_type,
user_id=user_id,
prompt_length=len(prompt),
parameters=parameters
)
start_time = time.time()
try:
# Add span log
self.tracer.add_span_log(span_id, "Starting AI text generation", prompt_length=len(prompt))
# Simulate AI processing (replace with actual AI service call)
await asyncio.sleep(random.uniform(0.5, 2.0)) # Simulate variable processing time
# Simulate occasional failures
if random.random() < 0.05: # 5% failure rate
raise Exception("Simulated AI service error")
# Generate mock response
response = f"Generated response for: {prompt[:50]}..."
confidence_score = random.uniform(0.7, 0.95)
token_usage = len(prompt.split()) + len(response.split())
cost = token_usage * 0.0001 # Mock cost calculation
duration = time.time() - start_time
# Record metrics
self.metrics_collector.record_ai_request(
operation_type=operation_type,
duration=duration,
success=True,
confidence_score=confidence_score,
output_length=len(response),
token_usage=token_usage,
cost=cost,
model_name="mock-model-v1"
)
# Log completion
self.logger.log_ai_request_complete(
request_id=request_id,
operation_type=operation_type,
duration=duration,
success=True,
confidence_score=confidence_score,
output_length=len(response)
)
# Log model performance
self.logger.log_model_performance(
model_name="mock-model-v1",
confidence_score=confidence_score,
processing_time=duration
)
# Finish span
self.tracer.finish_span(span_id, tags={
"operation_type": operation_type,
"success": True,
"confidence_score": confidence_score,
"output_length": len(response)
})
return response
except Exception as e:
duration = time.time() - start_time
# Record error metrics
self.metrics_collector.record_ai_request(
operation_type=operation_type,
duration=duration,
success=False,
model_name="mock-model-v1"
)
# Log error
self.logger.log_ai_error(
request_id=request_id,
operation_type=operation_type,
error=e
)
# Finish span with error
self.tracer.finish_span(span_id, error=e)
raise e
def _start_alert_monitoring(self):
"""Start background alert monitoring"""
async def alert_loop():
while True:
try:
await self.alert_manager.check_alerts()
await asyncio.sleep(30) # Check every 30 seconds
except Exception as e:
print(f"Error in alert monitoring: {e}")
await asyncio.sleep(60)
# Start the alert monitoring loop
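# NOTE: asyncio.create_task requires an already-running event loop, so
# MonitoredAIService must be constructed from within async code
# (as it is in example_monitoring_usage below); otherwise this raises RuntimeError.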
asyncio.create_task(alert_loop())
def get_monitoring_dashboard(self):
"""Get monitoring dashboard"""
return self.dashboard.get_dashboard_data()
def print_dashboard(self):
"""Print monitoring dashboard"""
self.dashboard.print_dashboard()
# Example usage
async def example_monitoring_usage():
"""Example of how to use the AI monitoring system"""
# Create monitored AI service
ai_service = MonitoredAIService()
# Simulate multiple requests
print("Simulating AI requests with monitoring...")
for i in range(20):
try:
result = await ai_service.generate_text_with_monitoring(
f"Test prompt {i}",
{"temperature": 0.7},
user_id=f"user_{i % 5}"
)
print(f"Request {i}: Success")
except Exception as e:
print(f"Request {i}: Failed - {e}")
# Brief pause between requests
await asyncio.sleep(0.1)
# Wait a bit for metrics to accumulate
await asyncio.sleep(2)
# Print dashboard
print("\n" + "="*80)
ai_service.print_dashboard()
return ai_service
# Uncomment to run the example
# asyncio.run(example_monitoring_usage())
```
This comprehensive monitoring and observability implementation provides deep insights into AI system behavior. The metrics collector tracks AI-specific performance indicators and health metrics. The logger captures detailed events with proper context. The tracer provides request flow visibility through complex AI pipelines. The alert manager proactively identifies issues before they impact users. Together, these components enable effective monitoring and troubleshooting of AI-integrated systems in production environments.
Conclusion and Best Practices Summary
The systematic design of software systems that integrate AI and GenAI functionality requires a fundamental shift in how we approach software architecture. Throughout this article, we have explored the essential patterns, principles, and practices that enable the creation of robust, scalable, and maintainable AI-integrated systems.
The foundation of successful AI integration lies in treating AI components as first-class citizens in our architecture while maintaining clear separation of concerns. This approach allows us to build systems that can evolve with the rapidly changing AI landscape while preserving the stability and reliability that users expect from production software systems.
The architectural patterns we have discussed provide proven solutions to common challenges in AI integration. The Adapter Pattern enables seamless integration with multiple AI providers, allowing systems to switch between different models and services without affecting core business logic. The Strategy Pattern provides intelligent service selection based on runtime conditions, optimizing for cost, performance, or reliability as needed. The AI Service Layer Pattern creates a stable abstraction that shields application logic from the complexities of AI operations.
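To make the Strategy Pattern concrete one more time, the following minimal sketch shows how a router can swap cost-driven and latency-driven provider selection without touching calling code. The provider protocol, attribute names, and selection functions here are illustrative assumptions, not the full implementations developed earlier in the article.
```
# Minimal Strategy Pattern sketch for runtime provider selection.
# TextGenerator and its attributes are hypothetical stand-ins for the
# adapters built earlier; only the selection mechanism matters here.
from typing import Callable, List, Protocol


class TextGenerator(Protocol):
    name: str
    cost_per_1k_tokens: float
    average_latency_s: float

    def generate(self, prompt: str) -> str: ...


def cheapest(candidates: List[TextGenerator]) -> TextGenerator:
    """Selection strategy: minimize cost per 1k tokens."""
    return min(candidates, key=lambda g: g.cost_per_1k_tokens)


def fastest(candidates: List[TextGenerator]) -> TextGenerator:
    """Selection strategy: minimize observed average latency."""
    return min(candidates, key=lambda g: g.average_latency_s)


class GenerationRouter:
    """Chooses a provider at request time using an injected selection strategy."""

    def __init__(self, candidates: List[TextGenerator],
                 strategy: Callable[[List[TextGenerator]], TextGenerator]):
        self.candidates = candidates
        self.strategy = strategy

    def generate(self, prompt: str) -> str:
        provider = self.strategy(self.candidates)
        return provider.generate(prompt)
```
Switching from cost-optimized to latency-optimized behavior is a one-argument change at composition time (for example, `GenerationRouter(providers, strategy=fastest)`), which is exactly the flexibility the pattern is meant to provide.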
Data pipeline architecture represents a critical component of AI-integrated systems, requiring specialized approaches to handle the unique characteristics of AI workloads. The pipeline must support immutable data transformations, comprehensive validation, and feature engineering while maintaining auditability and reproducibility. The implementation we explored demonstrates how to build pipelines that can handle both batch and streaming data processing, with proper error handling and recovery mechanisms.
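As a compact illustration of the immutability and auditability goals, the sketch below chains pure transformation stages over a frozen record; the stage names, payload shape, and validation rule are placeholders for this example rather than the article's full pipeline implementation.
```
# Minimal immutable-pipeline sketch: each stage returns a new Record,
# and the lineage tuple records which stages produced it.
from dataclasses import dataclass, replace
from typing import Any, Callable, Dict, Tuple


@dataclass(frozen=True)
class Record:
    payload: Dict[str, Any]
    lineage: Tuple[str, ...] = ()  # audit trail of applied stages


Stage = Callable[[Record], Record]


def normalize_text(record: Record) -> Record:
    payload = {**record.payload, "text": record.payload["text"].strip().lower()}
    return replace(record, payload=payload,
                   lineage=record.lineage + ("normalize_text",))


def validate_non_empty(record: Record) -> Record:
    # Fail fast before any expensive downstream AI processing.
    if not record.payload.get("text"):
        raise ValueError("empty text after preprocessing")
    return replace(record, lineage=record.lineage + ("validate_non_empty",))


def run_pipeline(record: Record, stages: Tuple[Stage, ...]) -> Record:
    for stage in stages:
        record = stage(record)
    return record


result = run_pipeline(Record({"text": "  Hello AI  "}),
                      (normalize_text, validate_non_empty))
```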
Error handling and resilience patterns become even more critical in AI systems due to the probabilistic nature of AI operations and the potential for various failure modes. The Circuit Breaker Pattern prevents cascading failures when AI services become unavailable, while intelligent retry strategies handle transient errors appropriately. Fallback mechanisms ensure graceful degradation when primary AI services are unavailable, maintaining system functionality even under adverse conditions.
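The essence of the resilience discussion fits in a few lines. The following sketch combines a simplified circuit breaker with a fallback callable; the thresholds, the half-open behavior, and the fallback signature are deliberately minimal assumptions of this example, not the complete implementation presented earlier.
```
# Minimal circuit-breaker-with-fallback sketch for async AI calls.
import time
from typing import Awaitable, Callable, Optional


class SimpleCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.failure_count = 0
        self.opened_at: Optional[float] = None

    def _is_open(self) -> bool:
        if self.opened_at is None:
            return False
        # After the reset timeout, go half-open and let one trial call through.
        return time.monotonic() - self.opened_at < self.reset_timeout_s

    async def call(self,
                   primary: Callable[[], Awaitable[str]],
                   fallback: Callable[[], Awaitable[str]]) -> str:
        if self._is_open():
            return await fallback()           # shed load while the breaker is open
        try:
            result = await primary()
            self.failure_count = 0            # a success closes the breaker again
            self.opened_at = None
            return result
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.opened_at = time.monotonic()
            return await fallback()           # degrade gracefully instead of failing
```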
Security and privacy considerations in AI systems extend beyond traditional application security to address AI-specific vulnerabilities such as prompt injection attacks and data leakage through model outputs. The comprehensive security framework we implemented demonstrates how to protect both the AI system and user data through input sanitization, PII detection, access control, and output validation. These security measures must be integrated throughout the AI pipeline rather than treated as an afterthought.
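A small sketch illustrates the spirit of input-side protection. The regular expressions and injection phrase list below are deliberately naive placeholders; a production system would rely on dedicated PII detection and policy tooling rather than a handful of patterns.
```
# Minimal prompt-sanitization sketch: redact two obvious PII shapes and
# reject prompts containing a naive list of injection phrases.
import re
from typing import List

EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
SSN_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
INJECTION_PHRASES: List[str] = [
    "ignore previous instructions",
    "disregard the system prompt",
]


def sanitize_prompt(prompt: str) -> str:
    """Redact obvious PII and reject obvious injection attempts before inference."""
    lowered = prompt.lower()
    for phrase in INJECTION_PHRASES:
        if phrase in lowered:
            raise ValueError("prompt rejected: possible injection attempt")
    redacted = EMAIL_RE.sub("[REDACTED_EMAIL]", prompt)
    redacted = SSN_RE.sub("[REDACTED_SSN]", redacted)
    return redacted
```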
Testing strategies for AI-integrated systems must account for the non-deterministic nature of AI outputs while ensuring system reliability and performance. The testing framework we developed addresses multiple layers of functionality, from unit tests for individual components to integration tests that verify the entire AI pipeline. Performance testing and robustness testing ensure that systems can handle production loads and various failure scenarios.
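One practical consequence is that tests should assert properties of AI outputs rather than exact strings. The sketch below assumes pytest with the pytest-asyncio plugin and a hypothetical `ai_service` fixture exposing a `generate_summary` method; it is a pattern illustration, not part of the testing framework built earlier.
```
# Minimal property-based-style test for non-deterministic output:
# assert invariants (length contract, no refusal) across repeated samples.
import pytest  # assumes pytest plus the pytest-asyncio plugin


@pytest.mark.asyncio
async def test_summary_respects_length_contract(ai_service):
    article = "Long source text about distributed systems. " * 50
    for _ in range(5):  # sample the output distribution several times
        summary = await ai_service.generate_summary(article, max_words=100)
        words = summary.split()
        assert 0 < len(words) <= 100                 # length contract holds every time
        assert not summary.lower().startswith("i'm sorry")  # no refusal-style output
```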
Monitoring and observability in AI systems require specialized approaches that track not only traditional system metrics but also AI-specific indicators such as model performance, confidence scores, and output quality. The comprehensive monitoring implementation provides the visibility needed to detect issues early and maintain system health in production environments.
Several key best practices emerge from our exploration of AI system design.
First, embrace the probabilistic nature of AI systems rather than fighting against it. Design your architecture to handle variability in AI outputs and response times.
Second, implement comprehensive error handling and fallback mechanisms from the beginning rather than adding them as an afterthought. AI systems will fail in ways that traditional systems do not, and your architecture must be prepared for these failures.
Third, invest heavily in monitoring and observability. AI systems can degrade gradually in ways that are not immediately apparent through traditional monitoring. Implement metrics that track AI-specific performance indicators and set up alerting that can detect subtle changes in system behavior.
Fourth, design for flexibility and evolution. The AI landscape changes rapidly, and your architecture must be able to accommodate new models, providers, and techniques without requiring fundamental restructuring.
Fifth, prioritize security and privacy throughout the system design. AI systems often handle sensitive data and can inadvertently leak information through their outputs. Implement comprehensive security measures that address both traditional security concerns and AI-specific vulnerabilities.
Sixth, maintain clear separation between AI logic and business logic. This separation enables independent evolution of each component and makes the system easier to test and maintain.
Seventh, implement comprehensive data validation and quality checks throughout your AI pipeline. Poor data quality leads to poor AI performance, and issues can compound as data flows through the system.
Eighth, design for cost optimization from the beginning. AI operations can be expensive, and costs can quickly spiral out of control without proper monitoring and optimization.
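For the cost point, even a very small budget guard helps. The sketch below uses a placeholder price per thousand tokens and an in-memory daily budget; a real system would pull current pricing from the provider and persist spend across processes.
```
# Minimal daily cost-budget sketch; pricing and budget values are placeholders.
from datetime import date


class DailyCostBudget:
    def __init__(self, daily_budget_usd: float, usd_per_1k_tokens: float = 0.002):
        self.daily_budget_usd = daily_budget_usd
        self.usd_per_1k_tokens = usd_per_1k_tokens
        self.spent_today = 0.0
        self.today = date.today()

    def record_usage(self, tokens: int) -> None:
        if date.today() != self.today:        # reset the counter at midnight
            self.today, self.spent_today = date.today(), 0.0
        self.spent_today += tokens / 1000 * self.usd_per_1k_tokens

    def can_afford(self, estimated_tokens: int) -> bool:
        # Check before issuing the request, so over-budget calls are skipped early.
        projected = self.spent_today + estimated_tokens / 1000 * self.usd_per_1k_tokens
        return projected <= self.daily_budget_usd
```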
The future of AI-integrated systems will likely see even greater complexity as AI capabilities continue to advance. Multi-modal AI systems that combine text, image, and audio processing will require even more sophisticated architectural approaches. Edge AI deployment will introduce new challenges around resource constraints and offline operation. Federated learning systems will require new patterns for distributed AI training and inference.
As AI systems become more prevalent and critical to business operations, the importance of sound architectural principles becomes even greater. The patterns and practices we have explored provide a solid foundation for building AI-integrated systems that can scale, evolve, and operate reliably in production environments. However, the field continues to evolve rapidly, and practitioners must remain adaptable and continue learning as new challenges and opportunities emerge.
The key to success in AI system design lies not in any single pattern or technique, but in the thoughtful application of sound engineering principles adapted to the unique characteristics of AI workloads. By treating AI integration as a first-class architectural concern and applying the patterns and practices we have discussed, software engineers can build systems that harness the power of AI while maintaining the reliability, security, and maintainability that production systems require.
The journey of integrating AI into software systems is complex and challenging, but the potential benefits are enormous. With careful planning, thoughtful architecture, and adherence to proven patterns and practices, we can build AI-integrated systems that not only meet today's requirements but are also prepared for the exciting developments that lie ahead in the field of artificial intelligence.