Introduction: The Dawn of Accessible Voice AI
The landscape of artificial intelligence has dramatically shifted in recent years, with large language models becoming increasingly sophisticated and accessible. Voice assistants, once the exclusive domain of tech giants with massive resources, can now be built using entirely open source components. This democratization of AI technology opens unprecedented opportunities for developers, researchers, and organizations to create customized voice interfaces tailored to specific needs.
Building a voice assistant involves orchestrating several complex components that must work seamlessly together. The primary challenge lies not just in implementing individual components, but in creating a cohesive system where speech recognition, language understanding, response generation, and speech synthesis operate in harmony. This article will guide you through creating such a system using only open source tools and libraries.
The open source approach offers several compelling advantages over proprietary solutions. First, it provides complete control over data privacy and security, as all processing can occur locally without sending sensitive information to external services. Second, it allows for unlimited customization and fine-tuning to meet specific requirements. Third, it eliminates ongoing costs associated with cloud-based APIs, making it economically viable for long-term deployment.
Our implementation will leverage several key open source projects. HuggingFace Transformers will provide access to state-of-the-art language models, while OpenAI's Whisper will handle speech recognition. For text-to-speech synthesis, we'll use Coqui TTS, and LangChain will orchestrate the conversation flow and memory management. The entire system will be built using Python, ensuring broad compatibility and ease of deployment.
Architecture Overview: Understanding the Voice Assistant Pipeline
A voice assistant operates through a sophisticated pipeline that transforms spoken input into meaningful responses and back to speech output. Understanding this architecture is crucial for successful implementation and optimization.
The pipeline begins with audio capture, where microphones record user speech in real-time. This raw audio data requires preprocessing to remove noise, normalize volume levels, and segment speech from silence. The preprocessed audio then flows to the speech-to-text component, which converts acoustic signals into textual representations.
Once we have text, the language model processes the user's intent and generates an appropriate response. This stage involves understanding context, maintaining conversation history, and potentially accessing external knowledge sources or APIs. The language model's textual response then moves to the text-to-speech synthesizer, which converts written words back into natural-sounding speech.
Throughout this pipeline, several supporting components ensure smooth operation. A conversation manager maintains dialogue state and context across multiple exchanges. An audio manager handles real-time streaming, buffering, and playback. Error handling and fallback mechanisms ensure graceful degradation when components fail or produce unexpected results.
The modular nature of this architecture allows for independent optimization and replacement of components. For instance, you might start with a smaller, faster language model for prototyping and later upgrade to a more capable but resource-intensive model for production deployment.
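To make this flow concrete, the sketch below traces a single interaction through the pipeline. The component and method names mirror the classes developed later in this article, while the audio source and sink objects are placeholders assumed here purely for illustration.
def run_single_interaction(stt, llm, tts, audio_source, audio_sink):
    """One pass through the voice assistant pipeline (illustrative sketch)."""
    audio = audio_source.record_utterance()                # capture and segment speech (placeholder API)
    stt_result = stt.transcribe_audio(audio_array=audio)   # speech -> text
    if not stt_result['text']:
        return                                             # nothing recognized; keep listening
    reply = llm.generate_response(stt_result['text'])      # understand context and respond
    waveform = tts.synthesize_speech(reply)                # text -> speech
    audio_sink.play(waveform)                              # play the synthesized response (placeholder API)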
Hardware Acceleration Support: Multi-Platform Optimization
Modern voice assistants must efficiently utilize available hardware acceleration across different platforms. Supporting NVIDIA CUDA, AMD ROCm, and Apple Silicon MPS ensures optimal performance regardless of the deployment environment.
Hardware detection and automatic configuration enable seamless deployment across different systems: the assistant identifies whatever acceleration hardware is present and configures each component accordingly, without manual intervention.
The hardware manager component serves as the foundation for multi-platform support. It detects the current platform architecture, identifies available acceleration devices, and selects the optimal configuration for maximum performance. This approach eliminates the need for manual configuration while ensuring that each component utilizes the best available hardware resources.
The platform detection mechanism identifies the operating system, processor architecture, and Python version to ensure compatibility with different hardware acceleration frameworks. This information guides the selection of appropriate device drivers and optimization strategies.
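The rest of this section presents the hardware manager method by method. As a point of reference, the following sketch shows one way the class could be assembled, including a minimal platform detection helper built on the standard library's platform module. The attributes available_devices and optimal_device are used by the methods that follow; the platform_info attribute and helper name are assumptions made here for illustration.
import os
import platform
import torch
class HardwareManager:
    def __init__(self):
        """Detect the platform and acceleration devices, then pick a default."""
        self.platform_info = self._detect_platform()
        self.available_devices = self._detect_devices()
        self.optimal_device = self._select_optimal_device()
    def _detect_platform(self):
        """Identify operating system, processor architecture, and Python version."""
        return {
            'system': platform.system(),            # e.g. 'Linux', 'Darwin', 'Windows'
            'machine': platform.machine(),          # e.g. 'x86_64', 'arm64'
            'python_version': platform.python_version()
        }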
Device detection encompasses multiple acceleration technologies. CUDA detection verifies GPU availability and enumerates device capabilities including memory capacity and compute capability. MPS detection specifically targets Apple Silicon processors and their unified memory architecture. ROCm detection identifies AMD GPU hardware and verifies driver installation.
def _detect_devices(self):
"""Detect available acceleration devices"""
devices = ['cpu']
# Check for CUDA (NVIDIA)
if torch.cuda.is_available():
cuda_count = torch.cuda.device_count()
devices.extend([f'cuda:{i}' for i in range(cuda_count)])
print(f"CUDA devices found: {cuda_count}")
for i in range(cuda_count):
gpu_name = torch.cuda.get_device_name(i)
memory = torch.cuda.get_device_properties(i).total_memory / 1e9
print(f" GPU {i}: {gpu_name} ({memory:.1f}GB)")
# Check for MPS (Apple Silicon)
if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
devices.append('mps')
print("Apple Silicon MPS acceleration available")
# Check for ROCm (AMD)
if self._check_rocm_available():
devices.append('rocm')
print("AMD ROCm acceleration detected")
return devices
The device enumeration process provides detailed information about each available acceleration option. For CUDA devices, this includes GPU model names, memory capacity, and compute capabilities. This information enables intelligent device selection based on workload requirements and available resources.
ROCm detection requires special handling due to its installation complexity and varying support across different AMD GPU generations. The detection process checks for ROCm installation paths, environment variables, and PyTorch compilation flags to determine availability.
def _check_rocm_available(self):
"""Check if ROCm is available"""
try:
# Check for ROCm installation
rocm_paths = [
'/opt/rocm',
'/usr/local/rocm',
os.path.expanduser('~/rocm')
]
for path in rocm_paths:
if os.path.exists(path):
# Try to import torch with ROCm support
if hasattr(torch.version, 'hip') and torch.version.hip is not None:
return True
# Alternative check using environment variables
if 'ROCM_PATH' in os.environ or 'HIP_PATH' in os.environ:
return True
return False
except Exception:
return False
Device selection follows a priority hierarchy based on performance characteristics and compatibility. CUDA devices receive highest priority due to their mature ecosystem and broad software support. Apple Silicon MPS provides excellent performance for Mac users with unified memory architecture. ROCm offers competitive performance for AMD GPU users, while CPU serves as the universal fallback option.
def _select_optimal_device(self):
"""Select optimal device based on availability and performance"""
# Priority order: CUDA > MPS > ROCm > CPU
if any('cuda' in device for device in self.available_devices):
# Select CUDA device with most memory
if torch.cuda.is_available():
best_gpu = 0
max_memory = 0
for i in range(torch.cuda.device_count()):
memory = torch.cuda.get_device_properties(i).total_memory
if memory > max_memory:
max_memory = memory
best_gpu = i
return f'cuda:{best_gpu}'
elif 'mps' in self.available_devices:
return 'mps'
elif 'rocm' in self.available_devices:
return 'rocm'
else:
return 'cpu'
The device configuration process applies platform-specific optimizations to maximize performance and stability. CUDA configurations enable cuDNN benchmarking and memory management optimizations. MPS configurations set fallback options for unsupported operations. ROCm configurations specify graphics version overrides for compatibility.
def configure_torch_device(self, preferred_device=None):
"""Configure PyTorch device with proper settings"""
device = preferred_device or self.optimal_device
if device.startswith('cuda'):
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.deterministic = False
if torch.cuda.is_available():
torch.cuda.empty_cache()
elif device == 'mps':
# Configure MPS-specific settings
if hasattr(torch.backends.mps, 'is_available') and torch.backends.mps.is_available():
os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'
elif device == 'rocm':
# Configure ROCm-specific settings
os.environ['HSA_OVERRIDE_GFX_VERSION'] = os.environ.get('HSA_OVERRIDE_GFX_VERSION', '10.3.0')
return device
Memory monitoring capabilities provide insights into resource utilization across different hardware platforms. This information enables dynamic optimization and helps identify potential bottlenecks or resource constraints.
def get_memory_info(self, device=None):
"""Get memory information for specified device"""
device = device or self.optimal_device
if device.startswith('cuda') and torch.cuda.is_available():
gpu_id = int(device.split(':')[1]) if ':' in device else 0
total = torch.cuda.get_device_properties(gpu_id).total_memory / 1e9
allocated = torch.cuda.memory_allocated(gpu_id) / 1e9
cached = torch.cuda.memory_reserved(gpu_id) / 1e9
return {
'total': total,
'allocated': allocated,
'cached': cached,
'free': total - allocated
}
elif device == 'mps':
# MPS memory info is limited
return {
'total': 'Unknown',
'allocated': 'Unknown',
'cached': 'Unknown',
'free': 'Unknown'
}
else:
import psutil
memory = psutil.virtual_memory()
return {
'total': memory.total / 1e9,
'allocated': (memory.total - memory.available) / 1e9,
'cached': 0,
'free': memory.available / 1e9
}
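Assuming the class assembly sketched earlier, the hardware manager can be exercised on its own to verify detection before the other components depend on it:
# Quick sanity check of hardware detection (assumes the HardwareManager
# skeleton sketched earlier in this section).
if __name__ == "__main__":
    hw = HardwareManager()
    print("Available devices:", hw.available_devices)
    print("Optimal device:", hw.optimal_device)
    print("Memory info:", hw.get_memory_info())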
Speech Recognition: Implementing Whisper for Robust STT
OpenAI's Whisper represents a breakthrough in open source speech recognition technology. Unlike traditional ASR systems that require extensive training on domain-specific data, Whisper demonstrates remarkable robustness across languages, accents, and audio conditions due to its training on diverse internet audio.
The enhanced Whisper implementation automatically configures itself for optimal performance across different hardware platforms while maintaining consistent functionality. Device-specific optimizations ensure maximum throughput while preserving transcription accuracy.
import whisper
import torch
import numpy as np
import os
from typing import Optional, Dict, Any
import warnings
class EnhancedWhisperSTT:
def __init__(self, model_size="base", device="auto", hardware_manager=None):
"""
Initialize enhanced Whisper speech-to-text engine with multi-platform support
Args:
model_size: Size of Whisper model (tiny, base, small, medium, large)
device: Computing device (auto, cpu, cuda, mps, rocm)
hardware_manager: HardwareManager instance for device configuration
"""
self.hardware_manager = hardware_manager or HardwareManager()
if device == "auto":
self.device = self.hardware_manager.configure_torch_device()
else:
self.device = self.hardware_manager.configure_torch_device(device)
print(f"Loading Whisper {model_size} model on {self.device}")
# Configure device-specific settings
self._configure_device_settings()
# Load model with device-specific optimizations
try:
self.model = whisper.load_model(model_size, device=self._get_whisper_device())
print(f"Whisper model loaded successfully on {self.device}")
except Exception as e:
print(f"Error loading Whisper model on {self.device}: {e}")
print("Falling back to CPU...")
self.device = "cpu"
self.model = whisper.load_model(model_size, device="cpu")
# Model configuration
self.model_size = model_size
self.sample_rate = 16000
Device-specific configuration optimizes performance characteristics for each hardware platform. CUDA configurations enable cuDNN benchmarking for faster convolution operations. MPS configurations set fallback options for operations not yet supported by Apple's Metal Performance Shaders. ROCm configurations specify graphics version overrides for AMD GPU compatibility.
def _configure_device_settings(self):
"""Configure device-specific settings for optimal performance"""
if self.device.startswith('cuda'):
# CUDA-specific optimizations
torch.backends.cudnn.benchmark = True
if torch.cuda.is_available():
torch.cuda.empty_cache()
elif self.device == 'mps':
# MPS-specific optimizations
# Disable some operations that might not be supported
os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'
elif self.device == 'rocm':
# ROCm-specific optimizations
os.environ['HSA_OVERRIDE_GFX_VERSION'] = os.environ.get('HSA_OVERRIDE_GFX_VERSION', '10.3.0')
Whisper device compatibility requires careful handling due to varying support across different acceleration frameworks. While CUDA enjoys full support, MPS and ROCm may require CPU fallbacks for certain operations to ensure stability and compatibility.
def _get_whisper_device(self):
"""Get device string compatible with Whisper"""
if self.device.startswith('cuda'):
return self.device
elif self.device == 'mps':
# Whisper may not directly support MPS, use CPU as fallback
return "cpu"
elif self.device == 'rocm':
# ROCm support depends on PyTorch build
return "cpu" # Fallback to CPU for compatibility
else:
return "cpu"
The enhanced transcription method incorporates sophisticated error handling and performance optimization. Audio preprocessing ensures compatibility with Whisper's input requirements, while confidence scoring provides feedback about transcription quality.
def transcribe_audio(self, audio_path=None, audio_array=None, language=None,
temperature=0.0, best_of=5):
"""
Enhanced transcribe audio to text using Whisper with multi-platform support
Args:
audio_path: Path to audio file
audio_array: Numpy array containing audio data
language: Target language code (optional)
temperature: Sampling temperature for transcription
best_of: Number of candidates to generate
Returns:
dict: Transcription results with text and metadata
"""
try:
# Prepare transcription options
options = {
'language': language,
'temperature': temperature,
'best_of': best_of,
'fp16': self._use_fp16()
}
if audio_path:
result = self.model.transcribe(audio_path, **options)
elif audio_array is not None:
# Ensure audio is in correct format for Whisper
audio_array = self._preprocess_audio(audio_array)
result = self.model.transcribe(audio_array, **options)
else:
raise ValueError("Either audio_path or audio_array must be provided")
return {
'text': result['text'].strip(),
'language': result['language'],
'segments': result['segments'],
'confidence': self._calculate_confidence(result['segments']),
'processing_device': self.device
}
except Exception as e:
print(f"Error in speech recognition: {e}")
return {
'text': '',
'language': 'unknown',
'segments': [],
'confidence': 0.0,
'error': str(e)
}
Precision selection balances performance and accuracy based on hardware capabilities. FP16 precision provides significant speedup on modern GPUs while maintaining acceptable accuracy for most applications. Conservative fallbacks ensure stability on platforms with limited FP16 support.
def _use_fp16(self):
"""Determine if FP16 should be used based on device capabilities"""
if self.device.startswith('cuda'):
return torch.cuda.is_available()
elif self.device == 'mps':
# MPS supports FP16 but may have compatibility issues
return False # Conservative approach
elif self.device == 'rocm':
return False # Conservative approach for ROCm
else:
return False
Audio preprocessing ensures optimal input quality for Whisper transcription. Normalization prevents clipping and ensures consistent amplitude levels, while format conversion handles different input data types seamlessly.
def _preprocess_audio(self, audio_array):
"""Preprocess audio array for optimal transcription"""
# Ensure correct data type
if audio_array.dtype != np.float32:
audio_array = audio_array.astype(np.float32)
# Normalize audio to [-1, 1] range
max_val = np.max(np.abs(audio_array))
if max_val > 1.0:
audio_array = audio_array / max_val
# Ensure correct sample rate (Whisper expects 16kHz)
# Note: This is a simplified approach; proper resampling would be better
return audio_array
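When the incoming audio is not already at 16 kHz, a proper resampling step should precede transcription. A minimal helper is sketched below; it assumes SciPy is installed, which is an extra dependency beyond the imports shown for this class.
# Optional resampling helper (assumes SciPy is available; not part of the
# class as shown above).
from math import gcd
from scipy.signal import resample_poly
def resample_to_whisper_rate(audio_array, original_rate, target_rate=16000):
    """Resample a float32 waveform to Whisper's expected 16 kHz."""
    if original_rate == target_rate:
        return audio_array
    factor = gcd(int(original_rate), int(target_rate))
    return resample_poly(audio_array, target_rate // factor, original_rate // factor)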
Confidence calculation provides quantitative feedback about transcription quality. This information enables the system to request clarification when recognition confidence falls below acceptable thresholds, improving overall user experience.
def _calculate_confidence(self, segments):
"""Calculate average confidence score from segments"""
if not segments:
return 0.0
total_confidence = sum(segment.get('avg_logprob', 0) for segment in segments)
avg_logprob = total_confidence / len(segments)
# Convert log probability to confidence score (0-1)
confidence = max(0.0, min(1.0, (avg_logprob + 1) / 2))
return confidence
Device information reporting enables monitoring and debugging of speech recognition performance across different hardware platforms. This data helps identify optimization opportunities and troubleshoot platform-specific issues.
def get_device_info(self):
"""Get information about current device configuration"""
info = {
'device': self.device,
'model_size': self.model_size,
'fp16_enabled': self._use_fp16()
}
if self.hardware_manager:
memory_info = self.hardware_manager.get_memory_info(self.device)
info['memory'] = memory_info
return info
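A minimal usage sketch for the speech recognizer follows; the audio file name is illustrative.
# Example usage (the audio file path is illustrative).
stt = EnhancedWhisperSTT(model_size="base", device="auto")
result = stt.transcribe_audio(audio_path="sample_utterance.wav")
print("Transcript:", result['text'])
print("Confidence:", f"{result['confidence']:.2f}")
print("Device info:", stt.get_device_info())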
Language Model Integration: Leveraging HuggingFace Transformers
The language model serves as the brain of our voice assistant, processing user queries and generating contextually appropriate responses. HuggingFace Transformers provides access to thousands of pre-trained models, from lightweight options suitable for edge deployment to powerful models rivaling commercial offerings.
The enhanced language model implementation provides robust support across different hardware platforms while maintaining conversation quality and performance. Automatic device configuration and memory management ensure optimal resource utilization regardless of the deployment environment.
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch
from typing import List, Dict, Optional, Union
import gc
class EnhancedConversationalLLM:
def __init__(self, model_name="microsoft/DialoGPT-medium", device="auto",
max_length=512, hardware_manager=None):
"""
Initialize enhanced conversational language model with multi-platform support
Args:
model_name: HuggingFace model identifier
device: Computing device (auto, cpu, cuda, mps, rocm)
max_length: Maximum response length in tokens
hardware_manager: HardwareManager instance for device configuration
"""
self.hardware_manager = hardware_manager or HardwareManager()
if device == "auto":
self.device = self.hardware_manager.configure_torch_device()
else:
self.device = self.hardware_manager.configure_torch_device(device)
print(f"Loading language model {model_name} on {self.device}")
# Configure model loading parameters based on device
self.model_config = self._get_model_config()
try:
# Load tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(
model_name,
trust_remote_code=True
)
# Load model with device-specific optimizations
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=self.model_config['dtype'],
device_map=self.model_config['device_map'],
trust_remote_code=True,
low_cpu_mem_usage=True
)
# Move model to device if not using device_map
if self.model_config['device_map'] is None:
self.model = self.model.to(self.device)
print(f"Language model loaded successfully on {self.device}")
except Exception as e:
print(f"Error loading model on {self.device}: {e}")
print("Falling back to CPU...")
self.device = "cpu"
self.model_config = self._get_model_config()
self._load_model_cpu_fallback(model_name)
Model configuration adapts to hardware capabilities and constraints. CUDA configurations enable FP16 precision and automatic device mapping for multi-GPU systems. MPS configurations use FP32 precision for stability, while ROCm configurations balance performance and compatibility.
# Configure tokenizer
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
self.max_length = max_length
self.conversation_history = []
self.model_name = model_name
# Performance tracking
self.generation_times = []
def _get_model_config(self):
"""Get model configuration based on device capabilities"""
config = {
'dtype': torch.float32,
'device_map': None,
'use_cache': True
}
if self.device.startswith('cuda'):
config['dtype'] = torch.float16
config['device_map'] = "auto"
elif self.device == 'mps':
# MPS has limited FP16 support, use FP32 for stability
config['dtype'] = torch.float32
config['device_map'] = None
elif self.device == 'rocm':
# ROCm configuration
config['dtype'] = torch.float16
config['device_map'] = None
else: # CPU
config['dtype'] = torch.float32
config['device_map'] = None
return config
CPU fallback handling ensures system reliability when primary acceleration methods fail. The fallback process maintains full functionality while providing clear feedback about the configuration change.
def _load_model_cpu_fallback(self, model_name):
"""Load model with CPU fallback configuration"""
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
torch_dtype=torch.float32,
device_map=None,
trust_remote_code=True
)
self.model = self.model.to("cpu")
Response generation incorporates sophisticated context management and device-specific optimizations. The system maintains conversation history while applying memory management techniques to prevent resource exhaustion during extended interactions.
def generate_response(self, user_input: str, system_prompt: Optional[str] = None,
temperature: float = 0.7, max_new_tokens: int = 150) -> str:
"""
Generate response to user input with enhanced multi-platform support
Args:
user_input: User's message
system_prompt: Optional system instruction
temperature: Sampling temperature
max_new_tokens: Maximum new tokens to generate
Returns:
str: Generated response
"""
import time
start_time = time.time()
try:
# Prepare conversation context
if system_prompt and not self.conversation_history:
self.conversation_history.append(f"System: {system_prompt}")
# Add user input to history
self.conversation_history.append(f"User: {user_input}")
# Create input text with conversation context
context = self._build_context()
context += "\nAssistant:"
# Tokenize input with device-specific handling
inputs = self._tokenize_input(context)
# Generate response with device-optimized settings
response = self._generate_with_device_optimization(
inputs, temperature, max_new_tokens
)
# Clean and format response
response = self._clean_response(response, context)
# Add to conversation history
self.conversation_history.append(f"Assistant: {response}")
# Track performance
generation_time = time.time() - start_time
self.generation_times.append(generation_time)
return response
except Exception as e:
print(f"Error generating response: {e}")
return "I apologize, but I'm having trouble processing your request right now."
Context building manages conversation memory efficiently by maintaining recent exchanges while preventing unbounded memory growth. This approach ensures coherent responses while maintaining system stability during extended conversations.
def _build_context(self):
"""Build conversation context with memory management"""
# Keep last 10 exchanges to manage memory
recent_history = self.conversation_history[-20:] # 10 exchanges = 20 messages
return "\n".join(recent_history)
Input tokenization handles device-specific requirements and optimizations. The process ensures that tokenized input reaches the appropriate device while managing memory allocation efficiently.
def _tokenize_input(self, context):
"""Tokenize input with device-specific optimizations"""
inputs = self.tokenizer.encode(
context,
return_tensors="pt",
truncation=True,
max_length=self.max_length
)
# Move to appropriate device
if self.device != "cpu":
inputs = inputs.to(self.device)
return inputs
Device-optimized generation applies platform-specific acceleration techniques while maintaining consistent output quality. CUDA configurations leverage automatic mixed precision for faster inference, while other platforms use optimized settings for their respective architectures.
def _generate_with_device_optimization(self, inputs, temperature, max_new_tokens):
"""Generate response with device-specific optimizations"""
generation_kwargs = {
'max_length': inputs.shape[1] + max_new_tokens,
'num_return_sequences': 1,
'temperature': temperature,
'do_sample': True,
'top_p': 0.9,
'pad_token_id': self.tokenizer.eos_token_id,
'attention_mask': torch.ones_like(inputs)
}
# Device-specific optimizations
if self.device.startswith('cuda'):
generation_kwargs['use_cache'] = True
elif self.device == 'mps':
# MPS-specific adjustments
generation_kwargs['use_cache'] = True
elif self.device == 'rocm':
# ROCm-specific adjustments
generation_kwargs['use_cache'] = True
# Generate with memory management
with torch.no_grad():
if self.device.startswith('cuda'):
with torch.cuda.amp.autocast(enabled=self.model_config['dtype'] == torch.float16):
outputs = self.model.generate(inputs, **generation_kwargs)
else:
outputs = self.model.generate(inputs, **generation_kwargs)
# Decode response
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# Clean up GPU memory if needed
if self.device.startswith('cuda'):
torch.cuda.empty_cache()
return response
Response cleaning ensures that generated text is well-formatted and appropriate for speech synthesis. The cleaning process removes artifacts, handles incomplete sentences, and ensures proper punctuation for natural-sounding speech output.
def _clean_response(self, response: str, context: str) -> str:
"""Clean and format the generated response"""
# Extract only the new response part
response = response[len(context):].strip()
# Remove common artifacts
response = response.replace("User:", "").replace("Assistant:", "")
# Split on newlines and take first complete sentence
lines = response.split('\n')
cleaned_response = lines[0].strip()
# Ensure response ends properly
if cleaned_response and not cleaned_response.endswith(('.', '!', '?')):
# Find last complete sentence
for punct in ['.', '!', '?']:
if punct in cleaned_response:
cleaned_response = cleaned_response[:cleaned_response.rfind(punct) + 1]
break
return cleaned_response if cleaned_response else "I understand."
Memory management includes both conversation history clearing and device-specific cache management. This ensures that the system can recover from memory pressure situations and maintain optimal performance over extended periods.
def clear_history(self):
"""Clear conversation history and free memory"""
self.conversation_history = []
# Force garbage collection
gc.collect()
# Clear device cache if applicable
if self.device.startswith('cuda'):
torch.cuda.empty_cache()
Performance statistics tracking enables monitoring and optimization of language model performance across different hardware platforms. This data helps identify bottlenecks and guide system tuning decisions.
def get_performance_stats(self):
"""Get performance statistics"""
if not self.generation_times:
return {}
return {
'avg_generation_time': sum(self.generation_times) / len(self.generation_times),
'min_generation_time': min(self.generation_times),
'max_generation_time': max(self.generation_times),
'total_generations': len(self.generation_times),
'device': self.device,
'model_name': self.model_name
}
def get_memory_usage(self):
"""Get current memory usage"""
if self.hardware_manager:
return self.hardware_manager.get_memory_info(self.device)
return {}
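A short usage sketch of the conversational model follows; it assumes the model weights can be downloaded from the HuggingFace Hub on first run.
# Example usage (downloads the model weights on first run).
llm = EnhancedConversationalLLM(model_name="microsoft/DialoGPT-medium", device="auto")
reply = llm.generate_response(
    "What can you help me with?",
    system_prompt="You are a concise, friendly voice assistant."
)
print("Assistant:", reply)
print("Performance:", llm.get_performance_stats())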
Text-to-Speech Synthesis: Creating Natural Voice Output
Converting text responses back to speech requires careful attention to naturalness, clarity, and emotional expression. The enhanced TTS implementation provides consistent voice synthesis across different hardware platforms while optimizing performance for each specific environment.
Modern neural TTS systems generate human-like speech with appropriate intonation, emphasis, and emotional expression. The implementation below applies device-specific optimizations while keeping output quality consistent across platforms.
import torch
import torchaudio
import numpy as np
from typing import Optional
import warnings
import os
# Suppress TTS warnings for cleaner output
warnings.filterwarnings("ignore", category=UserWarning)
class EnhancedNeuralTTS:
def __init__(self, model_name="tts_models/en/ljspeech/tacotron2-DDC",
device="auto", hardware_manager=None):
"""
Initialize enhanced neural text-to-speech system with multi-platform support
Args:
model_name: Coqui TTS model identifier
device: Computing device (auto, cpu, cuda, mps, rocm)
hardware_manager: HardwareManager instance for device configuration
"""
self.hardware_manager = hardware_manager or HardwareManager()
if device == "auto":
self.device = self.hardware_manager.configure_torch_device()
else:
self.device = self.hardware_manager.configure_torch_device(device)
print(f"Loading TTS model {model_name} on {self.device}")
# Configure TTS with device-specific settings
self._configure_tts_device()
try:
from TTS.api import TTS
# Initialize TTS with device support
self.tts = TTS(
model_name=model_name,
progress_bar=False,
gpu=self._use_gpu_acceleration()
)
print(f"TTS model loaded successfully on {self.device}")
except Exception as e:
print(f"Error loading TTS model: {e}")
print("Falling back to basic TTS configuration...")
self._setup_fallback_tts()
TTS device configuration applies platform-specific optimizations for speech synthesis. CUDA configurations enable cuDNN optimizations for faster convolution operations. MPS and ROCm configurations set appropriate fallback options for operations that may not be fully supported.
        # Get the model's output sample rate (fall back to 22.05 kHz if unavailable)
        if self.tts is not None and hasattr(self.tts, 'synthesizer') and hasattr(self.tts.synthesizer, 'output_sample_rate'):
            self.sample_rate = self.tts.synthesizer.output_sample_rate
        else:
            self.sample_rate = 22050
self.model_name = model_name
def _configure_tts_device(self):
"""Configure device-specific settings for TTS"""
if self.device.startswith('cuda'):
# CUDA-specific optimizations
torch.backends.cudnn.benchmark = True
if torch.cuda.is_available():
torch.cuda.empty_cache()
elif self.device == 'mps':
# MPS-specific optimizations
os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'
elif self.device == 'rocm':
# ROCm-specific optimizations
os.environ['HSA_OVERRIDE_GFX_VERSION'] = os.environ.get('HSA_OVERRIDE_GFX_VERSION', '10.3.0')
GPU acceleration determination considers the capabilities and limitations of different TTS libraries across various hardware platforms. While CUDA enjoys broad support, MPS and ROCm may require CPU fallbacks for certain TTS models.
def _use_gpu_acceleration(self):
"""Determine if GPU acceleration should be used for TTS"""
if self.device.startswith('cuda'):
return torch.cuda.is_available()
elif self.device == 'mps':
# TTS library may not support MPS directly
return False
elif self.device == 'rocm':
# TTS library may not support ROCm directly
return False
else:
return False
Fallback TTS setup ensures system reliability when primary TTS models fail to load. The fallback process attempts simpler models before ultimately disabling voice output if no TTS capability can be established.
def _setup_fallback_tts(self):
"""Setup fallback TTS configuration"""
try:
from TTS.api import TTS
# Try with a simpler model
self.tts = TTS(model_name="tts_models/en/ljspeech/glow-tts", progress_bar=False, gpu=False)
except Exception:
# Ultimate fallback - this would need to be implemented with a different TTS library
print("Warning: Could not initialize any TTS model. Voice output will be disabled.")
self.tts = None
Speech synthesis incorporates advanced text preprocessing and device-specific optimizations to produce high-quality audio output. The process handles various input formats and applies normalization techniques for consistent results.
def synthesize_speech(self, text: str, output_path: Optional[str] = None,
speaker_idx: Optional[int] = None) -> np.ndarray:
"""
Convert text to speech with enhanced multi-platform support
Args:
text: Text to synthesize
output_path: Optional path to save audio file
speaker_idx: Optional speaker index for multi-speaker models
Returns:
np.ndarray: Audio waveform
"""
if self.tts is None:
print("TTS not available, returning empty audio")
return np.array([])
try:
# Preprocess text for better synthesis
processed_text = self._preprocess_text(text)
if not processed_text.strip():
return np.array([])
# Prepare synthesis arguments
synthesis_kwargs = {'text': processed_text}
if output_path:
synthesis_kwargs['file_path'] = output_path
if speaker_idx is not None:
synthesis_kwargs['speaker_idx'] = speaker_idx
# Generate speech with device-specific optimizations
wav = self._synthesize_with_device_optimization(**synthesis_kwargs)
# Process output based on device
audio_array = self._process_synthesis_output(wav, output_path)
return audio_array
except Exception as e:
print(f"Error in speech synthesis: {e}")
return np.array([])
Device-optimized synthesis applies platform-specific acceleration techniques while maintaining audio quality. CUDA synthesis can leverage automatic mixed precision where supported, while other platforms use optimized settings for their respective architectures.
def _synthesize_with_device_optimization(self, **kwargs):
"""Synthesize speech with device-specific optimizations"""
if self.device.startswith('cuda') and torch.cuda.is_available():
# CUDA optimizations
with torch.cuda.amp.autocast(enabled=False): # TTS may not support autocast
wav = self.tts.tts(**kwargs)
else:
# CPU/MPS/ROCm synthesis
wav = self.tts.tts(**kwargs)
return wav
Synthesis output processing ensures consistent audio format regardless of the underlying TTS implementation or hardware platform. The process handles various output types and applies necessary conversions for compatibility.
def _process_synthesis_output(self, wav, output_path):
"""Process synthesis output into consistent format"""
if output_path and os.path.exists(output_path):
# Load the saved file to return as array
try:
waveform, sample_rate = torchaudio.load(output_path)
return waveform.numpy().flatten()
except Exception:
# Fallback to direct wav processing
pass
# Process direct wav output
if isinstance(wav, torch.Tensor):
if self.device.startswith('cuda'):
wav = wav.cpu()
return wav.numpy().flatten()
elif isinstance(wav, np.ndarray):
return wav.flatten()
else:
return np.array(wav).flatten()
Enhanced text preprocessing improves speech synthesis quality by handling abbreviations, numbers, and special characters that might confuse TTS models. The preprocessing stage ensures that text is optimized for natural-sounding speech output.
def _preprocess_text(self, text: str) -> str:
"""Enhanced text preprocessing for better TTS output"""
# Remove or replace problematic characters
text = text.replace('\n', ' ').replace('\t', ' ')
# Handle common abbreviations
abbreviations = {
'Dr.': 'Doctor',
'Mr.': 'Mister',
'Mrs.': 'Missus',
'Ms.': 'Miss',
'Prof.': 'Professor',
'etc.': 'etcetera',
'vs.': 'versus',
'e.g.': 'for example',
'i.e.': 'that is',
'AI': 'A I',
'ML': 'M L',
'GPU': 'G P U',
'CPU': 'C P U',
'API': 'A P I'
}
for abbrev, expansion in abbreviations.items():
text = text.replace(abbrev, expansion)
# Handle numbers (enhanced implementation)
import re
# Replace simple numbers with words (0-100)
number_words = {
'0': 'zero', '1': 'one', '2': 'two', '3': 'three', '4': 'four', '5': 'five',
'6': 'six', '7': 'seven', '8': 'eight', '9': 'nine', '10': 'ten',
'11': 'eleven', '12': 'twelve', '13': 'thirteen', '14': 'fourteen',
'15': 'fifteen', '16': 'sixteen', '17': 'seventeen', '18': 'eighteen',
'19': 'nineteen', '20': 'twenty', '30': 'thirty', '40': 'forty',
'50': 'fifty', '60': 'sixty', '70': 'seventy', '80': 'eighty',
'90': 'ninety', '100': 'one hundred'
}
for num, word in number_words.items():
text = re.sub(r'\b' + num + r'\b', word, text)
# Handle URLs and email addresses
text = re.sub(r'http[s]?://\S+', 'web link', text)
text = re.sub(r'\S+@\S+\.\S+', 'email address', text)
# Clean up multiple spaces
text = re.sub(r'\s+', ' ', text).strip()
return text
Audio file saving incorporates enhanced format support and error handling. The process ensures compatibility across different platforms while providing fallback options when primary saving methods fail.
def save_audio(self, audio_array: np.ndarray, filename: str,
sample_rate: Optional[int] = None):
"""Save audio array to file with enhanced format support"""
if sample_rate is None:
sample_rate = self.sample_rate
# Ensure audio is in correct format
if audio_array.dtype != np.float32:
audio_array = audio_array.astype(np.float32)
# Normalize audio
max_val = np.max(np.abs(audio_array))
if max_val > 1.0:
audio_array = audio_array / max_val
# Save using torchaudio with device handling
try:
tensor_audio = torch.from_numpy(audio_array).unsqueeze(0)
if self.device.startswith('cuda'):
tensor_audio = tensor_audio.cpu() # Ensure CPU for saving
torchaudio.save(filename, tensor_audio, sample_rate)
except Exception as e:
print(f"Error saving audio: {e}")
# Fallback to scipy if available
try:
from scipy.io import wavfile
# Convert to int16 for scipy
audio_int16 = (audio_array * 32767).astype(np.int16)
wavfile.write(filename, sample_rate, audio_int16)
except ImportError:
print("Could not save audio file - no suitable library available")
Device information reporting provides insights into TTS configuration and performance characteristics. This information enables monitoring and optimization of speech synthesis across different hardware platforms.
def get_device_info(self):
"""Get information about current TTS device configuration"""
info = {
'device': self.device,
'model_name': self.model_name,
'sample_rate': self.sample_rate,
'gpu_acceleration': self._use_gpu_acceleration()
}
if self.hardware_manager:
memory_info = self.hardware_manager.get_memory_info(self.device)
info['memory'] = memory_info
return info
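A brief usage sketch of the synthesizer follows; the output file name is illustrative.
# Example usage (the output file name is illustrative).
tts = EnhancedNeuralTTS(device="auto")
audio = tts.synthesize_speech("Hello! Your voice assistant is ready.")
if audio.size > 0:
    tts.save_audio(audio, "greeting.wav")
    print(f"Saved greeting.wav at {tts.sample_rate} Hz")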
Orchestrating Conversations with LangChain
LangChain provides powerful abstractions for building complex conversational applications that go beyond simple question-and-answer interactions. It enables sophisticated conversation management, memory systems, and integration with external tools and knowledge sources.
The conversation manager handles dialogue state, maintains context across multiple exchanges, and provides memory management capabilities. This component ensures that conversations remain coherent and contextually relevant throughout extended interactions.
from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryMemory
from langchain.schema import BaseMessage, HumanMessage, AIMessage
from langchain.callbacks.base import BaseCallbackHandler
from typing import Any, Dict, List, Optional
import json
import datetime
class ConversationManager:
    def __init__(self, window_size: int = 10, use_summary: bool = False, summary_llm=None):
        """
        Initialize conversation management system
        Args:
            window_size: Number of recent exchanges to keep in memory
            use_summary: Whether to use conversation summarization
            summary_llm: LangChain-compatible LLM used to produce summaries
                         (required when use_summary is True)
        """
        self.window_size = window_size
        self.use_summary = use_summary
        # Initialize memory system
        if use_summary:
            # Summary memory compresses older turns into a running summary,
            # which requires a summarization model
            self.memory = ConversationSummaryMemory(
                llm=summary_llm,
                return_messages=True
            )
        else:
            self.memory = ConversationBufferWindowMemory(
                k=window_size,
                return_messages=True
            )
# Conversation metadata
self.conversation_id = self._generate_conversation_id()
self.start_time = datetime.datetime.now()
self.turn_count = 0
Memory system selection balances between detailed history retention and computational efficiency. Window-based memory maintains recent exchanges in full detail, while summary-based memory compresses longer conversations into concise summaries that preserve important context.
def add_exchange(self, user_input: str, assistant_response: str, metadata: Optional[Dict] = None):
"""
Add a conversation exchange to memory
Args:
user_input: User's message
assistant_response: Assistant's response
metadata: Optional metadata about the exchange
"""
# Add to LangChain memory
self.memory.chat_memory.add_user_message(user_input)
self.memory.chat_memory.add_ai_message(assistant_response)
# Update conversation metadata
self.turn_count += 1
# Store additional metadata if provided
if metadata:
self._store_metadata(metadata)
Context retrieval provides formatted conversation history for language model consumption. The formatting process ensures that context is presented in a consistent manner that maximizes language model comprehension and response quality.
def get_conversation_context(self, max_tokens: Optional[int] = None) -> str:
"""
Get formatted conversation context for language model
Args:
max_tokens: Maximum tokens to include in context
Returns:
str: Formatted conversation history
"""
messages = self.memory.chat_memory.messages
if not messages:
return ""
# Format messages for context
context_parts = []
token_count = 0
for message in reversed(messages):
if isinstance(message, HumanMessage):
formatted = f"User: {message.content}"
elif isinstance(message, AIMessage):
formatted = f"Assistant: {message.content}"
else:
continue
# Rough token estimation (4 chars per token)
estimated_tokens = len(formatted) // 4
if max_tokens and token_count + estimated_tokens > max_tokens:
break
context_parts.append(formatted)
token_count += estimated_tokens
# Reverse to get chronological order
context_parts.reverse()
return "\n".join(context_parts)
Recent context extraction provides access to the most recent conversation exchanges for applications that need detailed information about immediate dialogue history. This capability supports features like conversation analysis and context-aware responses.
def get_recent_context(self, num_exchanges: int = 3) -> List[Dict]:
"""
Get recent conversation exchanges
Args:
num_exchanges: Number of recent exchanges to retrieve
Returns:
List[Dict]: Recent conversation exchanges
"""
messages = self.memory.chat_memory.messages
exchanges = []
# Group messages into exchanges (user + assistant pairs)
for i in range(0, len(messages) - 1, 2):
if i + 1 < len(messages):
user_msg = messages[i]
ai_msg = messages[i + 1]
if isinstance(user_msg, HumanMessage) and isinstance(ai_msg, AIMessage):
exchanges.append({
'user': user_msg.content,
'assistant': ai_msg.content,
'timestamp': getattr(user_msg, 'timestamp', None)
})
# Return most recent exchanges
return exchanges[-num_exchanges:] if exchanges else []
Memory management includes both conversation clearing and metadata tracking. The system maintains conversation statistics and provides summary information for analytics and debugging purposes.
def clear_memory(self):
"""Clear conversation memory"""
self.memory.clear()
self.turn_count = 0
self.start_time = datetime.datetime.now()
self.conversation_id = self._generate_conversation_id()
def get_conversation_summary(self) -> Dict:
"""Get summary of current conversation"""
return {
'conversation_id': self.conversation_id,
'start_time': self.start_time.isoformat(),
'duration_minutes': (datetime.datetime.now() - self.start_time).total_seconds() / 60,
'turn_count': self.turn_count,
'message_count': len(self.memory.chat_memory.messages)
}
def _generate_conversation_id(self) -> str:
"""Generate unique conversation identifier"""
import uuid
return str(uuid.uuid4())[:8]
def _store_metadata(self, metadata: Dict):
"""Store conversation metadata (placeholder for future enhancement)"""
# This could be extended to store metadata in a database
# or file system for conversation analytics
pass
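A minimal usage sketch shows how exchanges accumulate into retrievable context:
# Example usage of the conversation manager.
conversation = ConversationManager(window_size=10)
conversation.add_exchange(
    "What's the weather like?",
    "I don't have live weather data, but I can help you find a forecast."
)
conversation.add_exchange("Thanks anyway.", "You're welcome!")
print(conversation.get_conversation_context(max_tokens=200))
print(conversation.get_conversation_summary())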
Real-Time Audio Processing: Handling Streaming Audio
Real-time audio processing presents unique challenges in voice assistant implementation. The system must handle continuous audio streams, detect speech boundaries, and process audio chunks efficiently while maintaining low latency for natural conversation flow.
The audio processor manages continuous audio capture, voice activity detection, and speech segmentation. It operates in real-time while maintaining low latency and providing reliable speech boundary detection across various acoustic conditions.
import pyaudio
import numpy as np
import threading
import queue
import time
from collections import deque
import webrtcvad
class RealTimeAudioProcessor:
def __init__(self, sample_rate=16000, chunk_size=1024, channels=1):
"""
Initialize real-time audio processing system
Args:
sample_rate: Audio sample rate in Hz
chunk_size: Audio chunk size for processing
channels: Number of audio channels (1 for mono)
"""
self.sample_rate = sample_rate
self.chunk_size = chunk_size
self.channels = channels
self.format = pyaudio.paInt16
# Audio buffers and queues
self.audio_queue = queue.Queue()
self.recording_buffer = deque(maxlen=100) # Keep last 100 chunks
# Voice activity detection
self.vad = webrtcvad.Vad(2) # Aggressiveness level 0-3
# Processing state
self.is_recording = False
self.is_processing = False
self.speech_detected = False
self.silence_threshold = 30 # Chunks of silence before stopping
self.silence_counter = 0
# Initialize PyAudio
self.audio = pyaudio.PyAudio()
# Threading
self.audio_thread = None
self.processing_thread = None
self.stop_event = threading.Event()
Starting the listener launches dedicated capture and processing threads so that audio streaming never blocks other system components. This separation balances latency and computational load while keeping speech detection responsive.
def start_listening(self):
"""Start continuous audio listening"""
if self.is_recording:
return
self.is_recording = True
self.stop_event.clear()
# Start audio capture thread
self.audio_thread = threading.Thread(target=self._audio_capture_loop)
self.audio_thread.daemon = True
self.audio_thread.start()
# Start processing thread
self.processing_thread = threading.Thread(target=self._audio_processing_loop)
self.processing_thread.daemon = True
self.processing_thread.start()
print("Started listening for audio input...")
Audio capture operates in a dedicated thread to ensure continuous operation without blocking other system components. The capture loop handles audio streaming, buffering, and initial preprocessing for downstream analysis.
def _audio_capture_loop(self):
"""Main audio capture loop"""
try:
# Open audio stream
stream = self.audio.open(
format=self.format,
channels=self.channels,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.chunk_size
)
print(f"Audio stream opened: {self.sample_rate}Hz, {self.chunk_size} samples/chunk")
while self.is_recording and not self.stop_event.is_set():
try:
# Read audio data
data = stream.read(self.chunk_size, exception_on_overflow=False)
# Convert to numpy array
audio_chunk = np.frombuffer(data, dtype=np.int16)
# Add to processing queue
if not self.audio_queue.full():
self.audio_queue.put(audio_chunk)
except Exception as e:
print(f"Error reading audio: {e}")
break
# Clean up
stream.stop_stream()
stream.close()
except Exception as e:
print(f"Error in audio capture: {e}")
Audio processing operates independently from capture to prevent blocking and ensure real-time performance. The processing loop handles voice activity detection, speech segmentation, and utterance completion detection.
def _audio_processing_loop(self):
"""Main audio processing loop"""
while self.is_recording and not self.stop_event.is_set():
try:
# Get audio chunk with timeout
audio_chunk = self.audio_queue.get(timeout=0.1)
# Process audio chunk
self._process_audio_chunk(audio_chunk)
except queue.Empty:
continue
except Exception as e:
print(f"Error processing audio: {e}")
Voice activity detection uses WebRTC VAD for robust speech detection across various acoustic conditions. The system handles different frame sizes and provides fallback detection methods for enhanced reliability.
def _process_audio_chunk(self, audio_chunk):
"""Process individual audio chunk"""
# Add to recording buffer
self.recording_buffer.append(audio_chunk)
# Voice activity detection
is_speech = self._detect_speech(audio_chunk)
if is_speech:
if not self.speech_detected:
print("Speech detected, starting recording...")
self.speech_detected = True
self.silence_counter = 0
else:
if self.speech_detected:
self.silence_counter += 1
# Check if we've had enough silence to stop recording
if self.silence_counter >= self.silence_threshold:
print("Speech ended, processing audio...")
self._process_complete_utterance()
self.speech_detected = False
self.silence_counter = 0
def _detect_speech(self, audio_chunk):
"""Detect speech in audio chunk using WebRTC VAD"""
try:
# Convert to bytes for VAD
audio_bytes = audio_chunk.tobytes()
# WebRTC VAD requires specific frame sizes
# For 16kHz: 160, 320, or 480 samples (10ms, 20ms, 30ms)
frame_size = 320 # 20ms at 16kHz
if len(audio_chunk) >= frame_size:
frame = audio_chunk[:frame_size].tobytes()
return self.vad.is_speech(frame, self.sample_rate)
return False
except Exception as e:
# Fallback to simple energy-based detection
return self._simple_speech_detection(audio_chunk)
Fallback speech detection provides reliability when WebRTC VAD encounters issues or unsupported audio formats. The energy-based approach offers basic speech detection capabilities for system resilience.
def _simple_speech_detection(self, audio_chunk):
"""Simple energy-based speech detection fallback"""
# Calculate RMS energy
rms = np.sqrt(np.mean(audio_chunk.astype(np.float32) ** 2))
# Simple threshold-based detection
return rms > 500 # Adjust threshold based on your environment
Utterance completion processing combines audio chunks into complete speech segments for transcription. The system manages buffer contents and triggers callbacks for downstream processing components.
def _process_complete_utterance(self):
"""Process complete speech utterance"""
if len(self.recording_buffer) < 5: # Too short to be meaningful
return
# Combine audio chunks
complete_audio = np.concatenate(list(self.recording_buffer))
# Clear buffer for next utterance
self.recording_buffer.clear()
# Trigger callback or add to processing queue
self._on_utterance_complete(complete_audio)
def _on_utterance_complete(self, audio_data):
"""Callback for complete utterance (override in subclass)"""
print(f"Complete utterance captured: {len(audio_data)} samples")
# This would typically trigger STT processing
Audio level monitoring provides feedback about input signal strength for user interface elements and system diagnostics. The monitoring system calculates real-time audio levels for display and debugging purposes.
def get_audio_levels(self):
"""Get current audio input levels for monitoring"""
if self.recording_buffer:
recent_audio = np.concatenate(list(self.recording_buffer)[-5:])
rms = np.sqrt(np.mean(recent_audio.astype(np.float32) ** 2))
return min(100, int(rms / 50)) # Scale to 0-100
return 0
System cleanup ensures proper resource management and graceful shutdown of audio processing components. The cleanup process stops all threads and releases audio system resources.
def stop_listening(self):
"""Stop audio listening"""
self.is_recording = False
self.stop_event.set()
if self.audio_thread:
self.audio_thread.join(timeout=1.0)
if self.processing_thread:
self.processing_thread.join(timeout=1.0)
print("Stopped listening for audio input.")
def cleanup(self):
"""Clean up audio resources"""
self.stop_listening()
if hasattr(self, 'audio'):
self.audio.terminate()
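To connect capture to transcription, the _on_utterance_complete callback can be overridden in a small subclass. The sketch below assumes the EnhancedWhisperSTT class from earlier and converts the raw int16 buffer into the float32 range Whisper expects.
# Example subclass that forwards complete utterances to Whisper
# (assumes the EnhancedWhisperSTT class defined earlier).
class TranscribingAudioProcessor(RealTimeAudioProcessor):
    def __init__(self, stt, **kwargs):
        super().__init__(**kwargs)
        self.stt = stt
    def _on_utterance_complete(self, audio_data):
        # Convert int16 PCM samples to float32 in [-1, 1] for Whisper
        audio_float = audio_data.astype(np.float32) / 32768.0
        result = self.stt.transcribe_audio(audio_array=audio_float)
        print("Heard:", result['text'])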
System Integration: Complete General Voice Assistant
The complete integration brings together all enhanced components into a cohesive general-purpose voice assistant. The system manages complex state transitions, handles errors gracefully, and provides comprehensive monitoring and debugging capabilities.
The integrated assistant supports multi-platform hardware acceleration, real-time speech processing, natural language understanding, and high-quality voice synthesis. The modular architecture enables easy customization and extension for specific use cases.
import asyncio
import threading
import time
import queue
from typing import Callable, Optional, Dict, Any
from enum import Enum
import json
class EnhancedAssistantState(Enum):
INITIALIZING = "initializing"
IDLE = "idle"
LISTENING = "listening"
PROCESSING_SPEECH = "processing_speech"
GENERATING_RESPONSE = "generating_response"
SYNTHESIZING_SPEECH = "synthesizing_speech"
SPEAKING = "speaking"
ERROR = "error"
class GeneralVoiceAssistant:
def __init__(self, config: Optional[Dict] = None):
"""
Initialize complete general voice assistant with multi-platform support
Args:
config: Configuration dictionary for customizing assistant behavior
"""
self.config = self._load_default_config()
if config:
self.config.update(config)
print("Initializing General Voice Assistant...")
print("=" * 60)
# Initialize hardware manager
self.hardware_manager = HardwareManager()
# Initialize all components with hardware optimization
self._initialize_components()
# System state
self.state = EnhancedAssistantState.INITIALIZING
self.is_running = False
self.conversation_active = False
# Callbacks
self.callbacks = {
'on_state_change': [],
'on_user_speech': [],
'on_assistant_response': [],
'on_error': [],
'on_audio_level': []
}
# Performance monitoring
self.performance_stats = {
'total_interactions': 0,
'successful_interactions': 0,
'error_count': 0,
'response_times': [],
'start_time': time.time()
}
# Audio processing queue
self.audio_processing_queue = queue.Queue(maxsize=10)
print("General Voice Assistant initialized successfully!")
print("=" * 60)
self._set_state(EnhancedAssistantState.IDLE)
Configuration management provides flexible customization of assistant behavior while maintaining sensible defaults. The configuration system supports model selection, performance tuning, and feature enabling across different deployment scenarios.
def _load_default_config(self):
"""Load default configuration"""
return {
'stt_model': 'base',
'llm_model': 'microsoft/DialoGPT-medium',
'tts_model': 'tts_models/en/ljspeech/tacotron2-DDC',
'max_conversation_length': 20,
'response_timeout': 30.0,
'audio_sample_rate': 16000,
'enable_voice_output': True,
'conversation_memory': True,
'system_prompt': """You are a helpful, friendly, and knowledgeable AI assistant.
You provide clear, concise, and accurate responses to user questions.
Keep your responses conversational and under 100 words when possible."""
}
Component initialization orchestrates the setup of all assistant subsystems with proper hardware optimization and error handling. The initialization process ensures that each component is configured for optimal performance on the available hardware.
def _initialize_components(self):
"""Initialize all assistant components with hardware optimization"""
print("Initializing components...")
# Speech-to-Text
print("Loading speech recognition...")
self.stt = EnhancedWhisperSTT(
model_size=self.config['stt_model'],
hardware_manager=self.hardware_manager
)
# Language Model
print("Loading language model...")
self.llm = EnhancedConversationalLLM(
model_name=self.config['llm_model'],
hardware_manager=self.hardware_manager
)
# Text-to-Speech
if self.config['enable_voice_output']:
print("Loading text-to-speech...")
self.tts = EnhancedNeuralTTS(
model_name=self.config['tts_model'],
hardware_manager=self.hardware_manager
)
else:
self.tts = None
print("Voice output disabled")
# Conversation Management
if self.config['conversation_memory']:
print("Initializing conversation management...")
self.conversation = ConversationManager(
window_size=self.config['max_conversation_length']
)
else:
self.conversation = None
# Audio Processing
print("Initializing audio processing...")
self.audio = RealTimeAudioProcessor(
sample_rate=self.config['audio_sample_rate']
)
# Configure audio processor callback
self.audio._on_utterance_complete = self._handle_audio_input
print("All components initialized successfully!")
System startup manages the transition from initialization to active operation. The startup process configures audio processing, initializes conversation context, and prepares the system for user interaction.
def start(self):
"""Start the general voice assistant"""
if self.is_running:
print("Assistant is already running")
return
print("\nStarting General Voice Assistant...")
print("=" * 60)
print("CAPABILITIES:")
print("- General conversation and Q&A")
print("- Multi-platform hardware acceleration")
print("- Real-time speech recognition")
print("- Natural language understanding")
print("- Voice synthesis and output")
print("- Conversation memory and context")
print("\nSay 'Hello' or ask any question to begin!")
print("Press Ctrl+C to stop")
print("=" * 60)
self.is_running = True
self.performance_stats['start_time'] = time.time()
# Add system prompt to conversation
if self.conversation and self.config['system_prompt']:
self.llm.generate_response("", self.config['system_prompt'])
# Start audio processing
self._set_state(EnhancedAssistantState.IDLE)
self.audio.start_listening()
# Start background processing thread
self.processing_thread = threading.Thread(target=self._background_processing_loop)
self.processing_thread.daemon = True
self.processing_thread.start()
print("Voice assistant is ready and listening!")
Audio input handling manages the flow from speech detection into the processing queue. The system uses asynchronous processing to maintain responsiveness while handling complex speech recognition and response generation tasks.
def _handle_audio_input(self, audio_data):
"""Handle complete audio utterance"""
if not self.is_running or self.state not in [EnhancedAssistantState.IDLE]:
return
# Add to processing queue
try:
self.audio_processing_queue.put(audio_data, block=False)
except queue.Full:
print("Audio processing queue full, dropping audio")
Background processing manages the complete voice assistant pipeline from speech recognition through response synthesis. The processing loop operates independently to maintain system responsiveness during computationally intensive operations.
def _background_processing_loop(self):
"""Background processing loop for audio input"""
while self.is_running:
try:
# Get audio data with timeout
audio_data = self.audio_processing_queue.get(timeout=1.0)
# Process the audio input
self._process_user_input(audio_data)
except queue.Empty:
continue
except Exception as e:
print(f"Error in background processing: {e}")
self._handle_error(e)
User input processing orchestrates the complete pipeline from speech recognition through response generation and synthesis. The process includes comprehensive error handling, performance monitoring, and state management.
def _process_user_input(self, audio_data):
"""Process user input through the complete pipeline"""
start_time = time.time()
self.performance_stats['total_interactions'] += 1
try:
# Step 1: Speech to Text
self._set_state(EnhancedAssistantState.PROCESSING_SPEECH)
print("\n[PROCESSING] Converting speech to text...")
stt_result = self.stt.transcribe_audio(audio_array=audio_data)
if not stt_result['text']:
print("[INFO] No speech detected or transcription failed")
self._set_state(EnhancedAssistantState.IDLE)
return
user_text = stt_result['text']
print(f"[USER] {user_text}")
print(f"[INFO] Confidence: {stt_result['confidence']:.2f}")
# Trigger callbacks
self._trigger_callbacks('on_user_speech', user_text, stt_result)
# Step 2: Generate Response
self._set_state(EnhancedAssistantState.GENERATING_RESPONSE)
print("[PROCESSING] Generating response...")
response_text = self.llm.generate_response(user_text)
if not response_text:
response_text = "I'm sorry, I didn't understand that. Could you please repeat?"
print(f"[ASSISTANT] {response_text}")
# Step 3: Add to conversation history
if self.conversation:
self.conversation.add_exchange(
user_text,
response_text,
{
'stt_confidence': stt_result['confidence'],
'processing_time': time.time() - start_time,
'device_info': self._get_device_summary()
}
)
# Step 4: Text to Speech (if enabled)
if self.config['enable_voice_output'] and self.tts:
self._set_state(EnhancedAssistantState.SYNTHESIZING_SPEECH)
print("[PROCESSING] Converting response to speech...")
audio_response = self.tts.synthesize_speech(response_text)
if len(audio_response) > 0:
self._set_state(EnhancedAssistantState.SPEAKING)
print("[PLAYING] Speaking response...")
# Play audio response
self._play_audio_response(audio_response)
# Trigger callbacks
self._trigger_callbacks('on_assistant_response', response_text, audio_response)
else:
print("[WARNING] TTS synthesis failed")
# Still trigger callback with text-only response
self._trigger_callbacks('on_assistant_response', response_text, None)
else:
# Text-only mode
self._trigger_callbacks('on_assistant_response', response_text, None)
# Record performance metrics
total_time = time.time() - start_time
self.performance_stats['response_times'].append(total_time)
self.performance_stats['successful_interactions'] += 1
print(f"[INFO] Total response time: {total_time:.2f} seconds")
print("-" * 60)
except Exception as e:
print(f"[ERROR] Error processing user input: {e}")
self.performance_stats['error_count'] += 1
self._handle_error(e)
finally:
# Return to idle state
self._set_state(EnhancedAssistantState.IDLE)
Audio playback handles voice output with enhanced error handling and fallback mechanisms. The playback system ensures consistent audio output across different platforms while providing graceful degradation when audio hardware issues occur.
def _play_audio_response(self, audio_data):
"""Play audio response to user with enhanced error handling"""
try:
import sounddevice as sd
# Ensure audio is in correct format
if audio_data.dtype != np.float32:
audio_data = audio_data.astype(np.float32)
# Normalize audio
max_val = np.max(np.abs(audio_data))
if max_val > 1.0:
audio_data = audio_data / max_val
# Play audio with device-specific settings
sample_rate = self.tts.sample_rate if self.tts else 22050
sd.play(audio_data, samplerate=sample_rate)
sd.wait() # Wait until playback is finished
except Exception as e:
print(f"[ERROR] Error playing audio: {e}")
# Fallback: save to file and notify user
try:
if self.tts:
self.tts.save_audio(audio_data, "last_response.wav")
print("[INFO] Audio response saved to last_response.wav")
except Exception:
print("[WARNING] Could not save audio response")
State management provides clear tracking of system status and enables proper coordination between different processing stages. The state system includes callback mechanisms for external monitoring and integration.
def _set_state(self, new_state: EnhancedAssistantState):
"""Update assistant state with callback triggers"""
if self.state != new_state:
old_state = self.state
self.state = new_state
state_change_msg = f"[STATE] {old_state.value} -> {new_state.value}"
if new_state in [EnhancedAssistantState.IDLE, EnhancedAssistantState.ERROR]:
print(state_change_msg)
self._trigger_callbacks('on_state_change', old_state, new_state)
Error handling includes both immediate recovery attempts and graceful degradation strategies. The error handling system attempts platform-specific recovery techniques while maintaining system stability.
def _handle_error(self, error):
"""Handle system errors with recovery attempts"""
self._set_state(EnhancedAssistantState.ERROR)
self._trigger_callbacks('on_error', error)
# Attempt recovery based on error type
if "CUDA" in str(error) or "GPU" in str(error):
print("[RECOVERY] GPU error detected, clearing cache...")
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Brief pause before returning to idle
time.sleep(1)
Callback management enables extensible event handling for logging, monitoring, and integration with external systems. The callback system provides hooks for all major system events and state transitions.
def _trigger_callbacks(self, callback_type, *args):
"""Trigger registered callbacks"""
for callback in self.callbacks.get(callback_type, []):
try:
callback(*args)
except Exception as e:
print(f"[WARNING] Callback error: {e}")
def add_callback(self, callback_type: str, callback: Callable):
"""Add callback for specific events"""
if callback_type in self.callbacks:
self.callbacks[callback_type].append(callback)
else:
print(f"[WARNING] Unknown callback type: {callback_type}")
Text input processing provides direct text interaction capabilities for testing and text-only operation modes. This functionality enables debugging and development without requiring audio hardware.
def process_text_input(self, text: str) -> str:
"""Process text input directly (for testing or text-only mode)"""
try:
print(f"[USER] {text}")
response = self.llm.generate_response(text)
if self.conversation:
self.conversation.add_exchange(text, response)
print(f"[ASSISTANT] {response}")
# Synthesize speech if enabled
if self.config['enable_voice_output'] and self.tts:
audio_response = self.tts.synthesize_speech(response)
if len(audio_response) > 0:
self._play_audio_response(audio_response)
return response
except Exception as e:
print(f"[ERROR] Error processing text input: {e}")
return "I'm sorry, I encountered an error processing your request."
System status reporting provides comprehensive information about assistant performance, hardware utilization, and operational metrics. The status system enables monitoring and optimization of system performance.
def get_system_status(self) -> Dict[str, Any]:
"""Get comprehensive system status and performance metrics"""
uptime = time.time() - self.performance_stats['start_time']
avg_response_time = (
sum(self.performance_stats['response_times'][-10:]) /
len(self.performance_stats['response_times'][-10:])
if self.performance_stats['response_times'] else 0
)
status = {
'state': self.state.value,
'is_running': self.is_running,
'uptime_seconds': uptime,
'performance': {
'total_interactions': self.performance_stats['total_interactions'],
'successful_interactions': self.performance_stats['successful_interactions'],
'error_count': self.performance_stats['error_count'],
'success_rate': (
self.performance_stats['successful_interactions'] /
max(1, self.performance_stats['total_interactions'])
),
'average_response_time': avg_response_time,
'total_responses': len(self.performance_stats['response_times'])
},
'hardware': self._get_device_summary(),
'audio_level': self.audio.get_audio_levels() if hasattr(self.audio, 'get_audio_levels') else 0
}
if self.conversation:
status['conversation'] = self.conversation.get_conversation_summary()
return status
Device information aggregation provides comprehensive hardware status across all system components. This information enables performance monitoring and troubleshooting across different hardware platforms.
def _get_device_summary(self) -> Dict[str, Any]:
"""Get summary of device information across all components"""
summary = {
'hardware_platform': self.hardware_manager.platform_info,
'optimal_device': self.hardware_manager.optimal_device
}
if hasattr(self.stt, 'get_device_info'):
summary['stt'] = self.stt.get_device_info()
if hasattr(self.llm, 'get_performance_stats'):
summary['llm'] = self.llm.get_performance_stats()
if self.tts and hasattr(self.tts, 'get_device_info'):
summary['tts'] = self.tts.get_device_info()
return summary
System shutdown manages graceful termination of all components and resources. The shutdown process ensures proper cleanup while providing session summary information for analysis and debugging.
def stop(self):
"""Stop the voice assistant"""
if not self.is_running:
return
print("\nStopping General Voice Assistant...")
self.is_running = False
# Stop audio processing
self.audio.stop_listening()
# Clean up resources
self.audio.cleanup()
# Clear model memory
if hasattr(self.llm, 'clear_history'):
self.llm.clear_history()
self._set_state(EnhancedAssistantState.IDLE)
# Print final statistics
self._print_session_summary()
print("General Voice Assistant stopped.")
def _print_session_summary(self):
"""Print session summary statistics"""
status = self.get_system_status()
print("\n" + "=" * 60)
print("SESSION SUMMARY")
print("=" * 60)
print(f"Total interactions: {status['performance']['total_interactions']}")
print(f"Successful interactions: {status['performance']['successful_interactions']}")
print(f"Success rate: {status['performance']['success_rate']:.1%}")
print(f"Average response time: {status['performance']['average_response_time']:.2f}s")
print(f"Session duration: {status['uptime_seconds']:.0f} seconds")
print(f"Hardware platform: {status['hardware']['optimal_device']}")
print("=" * 60)
Interactive testing mode provides comprehensive text-based interaction for development and debugging. The testing mode includes command handling, status reporting, and conversation management capabilities.
def test_text_mode(self):
"""Test the assistant with text input in interactive mode"""
print("\n" + "=" * 60)
print("GENERAL VOICE ASSISTANT - TEXT MODE")
print("=" * 60)
print("Type your questions or statements below.")
print("Commands:")
print(" 'quit' or 'exit' - Exit text mode")
print(" 'status' - Show system status")
print(" 'clear' - Clear conversation history")
print(" 'help' - Show available commands")
print("=" * 60)
while True:
try:
user_input = input("\nYou: ").strip()
if user_input.lower() in ['quit', 'exit', 'bye', 'goodbye']:
print("Goodbye!")
break
if not user_input:
continue
if user_input.lower() == 'status':
status = self.get_system_status()
print(f"\nSystem Status:")
print(f" State: {status['state']}")
print(f" Interactions: {status['performance']['total_interactions']}")
print(f" Success rate: {status['performance']['success_rate']:.1%}")
print(f" Device: {status['hardware']['optimal_device']}")
continue
if user_input.lower() == 'clear':
if self.conversation:
self.conversation.clear_memory()
if hasattr(self.llm, 'clear_history'):
self.llm.clear_history()
print("Conversation history cleared.")
continue
if user_input.lower() == 'help':
print("\nThis is a general AI assistant. You can:")
print("- Ask questions about any topic")
print("- Have conversations")
print("- Request explanations")
print("- Get help with various tasks")
print("- Use voice commands (in voice mode)")
continue
# Process the input
response = self.process_text_input(user_input)
except KeyboardInterrupt:
print("\nExiting text mode...")
break
except Exception as e:
print(f"Error: {e}")
Running Example: Complete General Voice Assistant
The complete running example demonstrates the integration of all components into a functional general-purpose voice assistant. This implementation showcases multi-platform hardware support, comprehensive error handling, and extensible architecture.
# Example usage and testing
if __name__ == "__main__":
# Create configuration for the assistant
config = {
'stt_model': 'base', # Whisper model size
'llm_model': 'microsoft/DialoGPT-medium', # Language model
'tts_model': 'tts_models/en/ljspeech/tacotron2-DDC', # TTS model
'enable_voice_output': True, # Enable voice synthesis
'conversation_memory': True, # Enable conversation memory
'system_prompt': """You are a helpful, friendly, and knowledgeable AI assistant.
You provide clear, accurate, and conversational responses. Keep responses
concise but informative, typically under 100 words unless more detail is requested."""
}
# Initialize the general voice assistant
print("Initializing General Voice Assistant...")
assistant = GeneralVoiceAssistant(config)
# Add some example callbacks
def on_user_speech(text, stt_result):
# Log user speech to file or database
pass
def on_assistant_response(text, audio):
# Log assistant responses
pass
def on_error(error):
# Handle errors (logging, notifications, etc.)
print(f"Assistant error logged: {error}")
# Register callbacks
assistant.add_callback('on_user_speech', on_user_speech)
assistant.add_callback('on_assistant_response', on_assistant_response)
assistant.add_callback('on_error', on_error)
# Test in text mode first
print("\nTesting in text mode...")
assistant.test_text_mode()
# Uncomment to test voice mode
# print("\nStarting voice mode...")
# try:
# assistant.start()
#
# # Keep running until interrupted
# while True:
# time.sleep(1)
#
# # Print status every 60 seconds
# if int(time.time()) % 60 == 0:
# status = assistant.get_system_status()
# print(f"\n[STATUS] Interactions: {status['performance']['total_interactions']}, "
# f"Success rate: {status['performance']['success_rate']:.1%}, "
# f"Avg time: {status['performance']['average_response_time']:.2f}s")
#
# except KeyboardInterrupt:
# print("\nShutting down...")
# assistant.stop()
Performance Optimization and Deployment Considerations
Deploying a voice assistant in production requires careful attention to performance optimization, resource management, and scalability considerations. The enhanced implementation provides multiple optimization strategies for different deployment scenarios.
Model optimization techniques include quantization, pruning, and knowledge distillation to reduce memory usage and inference time. Hardware-specific optimizations leverage platform capabilities while maintaining compatibility across different systems.
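As one illustration of these techniques, the sketch below applies PyTorch dynamic quantization to a loaded language model. This is an assumption about how you might prepare a model for a CPU-only deployment, not code from the implementation above, and the actual savings depend on the model architecture.
# Illustrative sketch: dynamic int8 quantization of a causal LM (assumes a CPU deployment target)
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "microsoft/DialoGPT-medium"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

# Quantize the linear layers to int8; weights shrink roughly 4x and CPU inference is often faster
quantized_model = torch.quantization.quantize_dynamic(
    model, {torch.nn.Linear}, dtype=torch.qint8
)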
Memory management strategies prevent resource exhaustion during extended operation. The system includes automatic garbage collection, device cache management, and conversation history pruning to maintain optimal performance over time.
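A minimal sketch of such a maintenance pass is shown below. The attributes it touches follow the class above, but the method name and the pruning thresholds are arbitrary assumptions rather than part of the implementation.
# Hypothetical periodic maintenance pass; thresholds are illustrative, not tuned values
import gc
import torch

def _run_maintenance(self):
    """Prune unbounded buffers and release cached device memory."""
    # Keep only the most recent response-time samples
    if len(self.performance_stats['response_times']) > 1000:
        self.performance_stats['response_times'] = self.performance_stats['response_times'][-100:]
    # Ask Python and the GPU allocator to release unused memory
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()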
Monitoring and analytics capabilities enable continuous optimization and troubleshooting. The system tracks performance metrics, error rates, and resource utilization to guide optimization efforts and identify potential issues.
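One lightweight way to expose those metrics is a background logger built on get_system_status(). The sketch below appends periodic JSON snapshots to a file; the function name, file path, and interval are assumptions for illustration.
# Illustrative status logger; file name and interval are arbitrary assumptions
import json
import threading
import time

def start_status_logger(assistant, path="assistant_metrics.jsonl", interval=60):
    """Append a JSON status snapshot every `interval` seconds while the assistant runs."""
    def _log_loop():
        while assistant.is_running:
            snapshot = assistant.get_system_status()
            snapshot['timestamp'] = time.time()
            with open(path, "a") as f:
                f.write(json.dumps(snapshot, default=str) + "\n")
            time.sleep(interval)
    thread = threading.Thread(target=_log_loop, daemon=True)
    thread.start()
    return thread

# Usage sketch: call after assistant.start() so is_running is True
# start_status_logger(assistant)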
Security considerations include input validation, output filtering, and resource access controls. The implementation provides hooks for security monitoring and includes safeguards against potential attacks or misuse.
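A minimal sketch of such an input guard is shown below: it caps input length and strips control characters before text reaches the language model. The length limit and the idea of calling it at the top of process_text_input are assumptions, not part of the implementation above.
# Hypothetical input guard; the length cap is an illustrative assumption
import re

MAX_INPUT_CHARS = 500

def sanitize_user_text(text: str) -> str:
    """Trim oversized input and remove non-printable control characters."""
    text = text[:MAX_INPUT_CHARS]
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f]', '', text)  # keep tabs and newlines, drop other control chars
    return text.strip()

# Example: text = sanitize_user_text(text) as the first step of process_text_input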
Scalability features enable deployment across different system configurations from edge devices to high-performance servers. The modular architecture supports horizontal scaling and load distribution for high-throughput applications.
The future of open source voice assistants continues to evolve with advances in model efficiency, multimodal capabilities, and edge computing optimization. This implementation provides a solid foundation for incorporating future developments while maintaining system stability and compatibility.
Conclusion
This comprehensive guide demonstrates how to build sophisticated voice assistants using entirely open source components with full multi-platform hardware support. The implementation showcases automatic hardware detection and optimization for NVIDIA CUDA, AMD ROCm, and Apple Silicon MPS acceleration.
The modular architecture enables continuous improvement and customization while the complete example provides a functional general-purpose assistant. With proper attention to optimization and deployment considerations, these systems can provide robust, privacy-preserving voice interfaces suitable for a wide range of applications.
The democratization of voice AI technology through open source tools opens new possibilities for innovation, customization, and deployment across diverse domains and use cases. By understanding and implementing these techniques, developers can create voice assistants that meet specific requirements while maintaining full control over functionality, privacy, and performance characteristics.