Introduction
Agentic applications represent a paradigm shift in how we design and deploy software systems. Rather than creating monolithic applications with predefined behaviors, agentic systems consist of autonomous components that can perceive their environment, make decisions, and take actions to achieve specific goals. When deployed on Kubernetes, these systems gain powerful scaling and orchestration capabilities, enabling them to respond dynamically to changing workloads and requirements.
In this article, we'll explore how to build a sophisticated multi-agent system on Kubernetes that leverages specialized microservices and hardware-optimized llama.cpp instances as the foundation for AI capabilities. We'll cover the entire process from architecture design to deployment, with particular attention to performance optimization across different hardware platforms.
Architectural Overview
Our multi-agent Kubernetes application will follow a distributed architecture where multiple specialized agents collaborate to accomplish complex tasks. The system consists of several key components: a central agent coordinator service that manages task allocation and agent lifecycle; specialized agent microservices that handle specific domains of functionality; a shared knowledge base for persistent information; and multiple llama.cpp instances optimized for different hardware configurations.
The agents will communicate asynchronously through a message bus, allowing them to operate independently while still collaborating effectively. The system will support dynamic scaling, automatically adjusting the number of agent instances based on workload demands. Each agent will have access to a llama.cpp instance, configured and optimized for the node's hardware capabilities.
Setting Up the Kubernetes Environment
Before diving into the implementation details, we need to establish our Kubernetes environment. We'll assume familiarity with basic Kubernetes concepts, but will provide detailed explanations for more advanced configurations.
First, let's create a dedicated namespace for our agentic application. A namespace provides a scope for Kubernetes resources and helps organize components in a multi-tenant environment.
The following configuration creates a namespace called "agentic-system" together with a ResourceQuota that caps CPU, memory, and the maximum number of pods, preventing the system from consuming excessive cluster resources:
apiVersion: v1
kind: Namespace
metadata:
name: agentic-system
labels:
name: agentic-system
environment: production
---
apiVersion: v1
kind: ResourceQuota
metadata:
name: agentic-quota
namespace: agentic-system
spec:
hard:
pods: "50"
requests.cpu: "20"
requests.memory: 40Gi
limits.cpu: "40"
limits.memory: 80Gi
Next, we'll need to set up persistent storage for our agents' knowledge base. The following PersistentVolumeClaim will create storage that can be shared across different agent pods:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: knowledge-base-pvc
namespace: agentic-system
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 20Gi
storageClassName: standard
This claim requests 20Gi of storage that can be mounted by multiple pods simultaneously in ReadWriteMany mode, which is essential for our shared knowledge base component.
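Because the claim uses ReadWriteMany, several agent pods can read and write the same files concurrently. As a minimal sketch of how an agent might use it, assuming the volume is mounted at /knowledge (the mount path used by the agent deployment later in this article) and that simple JSON-lines files are an acceptable format, the knowledge base could be as basic as:

import json
import os
import time
import uuid

KNOWLEDGE_DIR = "/knowledge"  # mount path of the shared knowledge-base PVC

def append_knowledge(agent_id: str, topic: str, content: dict) -> str:
    """Append one JSON record to a per-topic file on the shared volume."""
    record = {
        "id": uuid.uuid4().hex,
        "agentId": agent_id,
        "topic": topic,
        "content": content,
        "timestamp": time.time(),
    }
    path = os.path.join(KNOWLEDGE_DIR, f"{topic}.jsonl")
    # For heavier write traffic, add file locking or move to a real datastore
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
    return record["id"]

def read_knowledge(topic: str) -> list:
    """Read every record any agent has written for a topic."""
    path = os.path.join(KNOWLEDGE_DIR, f"{topic}.jsonl")
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

A shared volume keeps the example self-contained; in production you would typically put a database or vector store behind the same interface.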
Implementing the Agent Coordinator Service
The agent coordinator is the brain of our multi-agent system. It tracks all available agents, assigns tasks based on agent capabilities, and manages the lifecycle of agent instances. Let's implement this service using Go, which offers excellent performance and native Kubernetes client libraries.
Here's how we would implement the core functionality of the agent coordinator:
package main
import (
"context"
"encoding/json"
"log"
"net/http"
"os"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// Agent represents a registered agent in the system
type Agent struct {
ID string `json:"id"`
Type string `json:"type"`
Capabilities []string `json:"capabilities"`
Status string `json:"status"`
LastHeartbeat time.Time `json:"lastHeartbeat"`
PodName string `json:"podName"`
NodeName string `json:"nodeName"`
HardwareAccel string `json:"hardwareAccel,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
}
// AgentCoordinator manages the multi-agent system
type AgentCoordinator struct {
mu sync.RWMutex // protects the agents map, which is shared with the HTTP handlers
agents map[string]*Agent
clientset *kubernetes.Clientset
namespace string
taskQueue chan Task
agentHealthCh chan AgentHealth
}
// Task represents a unit of work to be assigned to agents
type Task struct {
ID string `json:"id"`
Type string `json:"type"`
Priority int `json:"priority"`
RequiredCap []string `json:"requiredCapabilities"`
Payload map[string]interface{} `json:"payload"`
AssignedTo string `json:"assignedTo,omitempty"`
Status string `json:"status"`
CreatedAt time.Time `json:"createdAt"`
}
// AgentHealth represents a health update from an agent
type AgentHealth struct {
AgentID string `json:"agentId"`
Status string `json:"status"`
Timestamp time.Time `json:"timestamp"`
}
// NewAgentCoordinator creates a new coordinator instance
func NewAgentCoordinator() (*AgentCoordinator, error) {
// Use in-cluster config when running inside Kubernetes
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
namespace := os.Getenv("NAMESPACE")
if namespace == "" {
namespace = "agentic-system"
}
return &AgentCoordinator{
agents: make(map[string]*Agent),
clientset: clientset,
namespace: namespace,
taskQueue: make(chan Task, 100),
agentHealthCh: make(chan AgentHealth, 100),
}, nil
}
// Start begins the coordinator's main loops
func (ac *AgentCoordinator) Start() {
// Start the agent discovery process
go ac.discoverAgents()
// Start the task assignment process
go ac.assignTasks()
// Start the health monitoring process
go ac.monitorAgentHealth()
// Set up HTTP API
http.HandleFunc("/agents", ac.handleAgents)
http.HandleFunc("/tasks", ac.handleTasks)
http.HandleFunc("/health", ac.handleHealth)
log.Fatal(http.ListenAndServe(":8080", nil))
}
// discoverAgents periodically discovers agent pods in the cluster
func (ac *AgentCoordinator) discoverAgents() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
<-ticker.C
// List all pods with the "component=agent" label
pods, err := ac.clientset.CoreV1().Pods(ac.namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: "component=agent",
})
if err != nil {
log.Printf("Error discovering agents: %v", err)
continue
}
// Update agents map based on discovered pods.
// The map is also read by the HTTP handlers, so guard it with the mutex.
ac.mu.Lock()
for _, pod := range pods.Items {
if pod.Status.Phase != "Running" {
continue
}
agentID := pod.Labels["agent-id"]
if agentID == "" {
continue
}
// Check if agent already exists in our map
if _, exists := ac.agents[agentID]; !exists {
// New agent discovered
agent := &Agent{
ID: agentID,
Type: pod.Labels["agent-type"],
Capabilities: []string{},
Status: "active",
LastHeartbeat: time.Now(),
PodName: pod.Name,
NodeName: pod.Spec.NodeName,
HardwareAccel: pod.Labels["hardware-accel"],
Metadata: make(map[string]string),
}
// Extract capabilities from pod annotations
if capsStr, ok := pod.Annotations["capabilities"]; ok {
if err := json.Unmarshal([]byte(capsStr), &agent.Capabilities); err != nil {
log.Printf("Error parsing capabilities for agent %s: %v", agentID, err)
}
}
ac.agents[agentID] = agent
log.Printf("New agent discovered: %s (type: %s)", agentID, agent.Type)
}
}
ac.mu.Unlock()
}
}
// The rest of the implementation would include task assignment logic,
// agent health monitoring, and HTTP handlers for the API endpoints.
func main() {
coordinator, err := NewAgentCoordinator()
if err != nil {
log.Fatalf("Failed to create agent coordinator: %v", err)
}
coordinator.Start()
}
This implementation demonstrates the core functionality of our agent coordinator. The coordinator discovers agent pods in the Kubernetes cluster, tracks their status, and assigns tasks based on agent capabilities. The coordinator also exposes HTTP endpoints for external systems to interact with the agent platform.
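To make that HTTP surface concrete, here is a small client-side sketch for external systems. The /agents and /tasks handlers are not shown above, so treat the exact request and response shapes as assumptions; the task fields mirror the JSON tags on the coordinator's Task struct.

import json
import time
import uuid
import requests

COORDINATOR_URL = "http://agent-coordinator-svc.agentic-system:8080"

def list_agents() -> list:
    """Fetch the agents the coordinator currently knows about."""
    resp = requests.get(f"{COORDINATOR_URL}/agents", timeout=5)
    resp.raise_for_status()
    return resp.json()

def submit_task(task_type: str, payload: dict, required_caps: list) -> dict:
    """Submit a task shaped like the coordinator's Task struct."""
    task = {
        "id": uuid.uuid4().hex,
        "type": task_type,
        "priority": 1,
        "requiredCapabilities": required_caps,
        "payload": payload,
        "status": "pending",
        # RFC3339 so Go's time.Time can unmarshal it
        "createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    resp = requests.post(f"{COORDINATOR_URL}/tasks", json=task, timeout=5)
    resp.raise_for_status()
    return resp.json()

if __name__ == "__main__":
    print(json.dumps(list_agents(), indent=2))
    print(submit_task("dataAnalysis", {"data": [1, 2, 3, 5, 8]}, ["dataAnalysis"]))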
Let's now create the Kubernetes Deployment for our coordinator:
apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-coordinator
namespace: agentic-system
spec:
replicas: 1
selector:
matchLabels:
app: agent-coordinator
template:
metadata:
labels:
app: agent-coordinator
spec:
serviceAccountName: agent-coordinator-sa
containers:
- name: coordinator
image: agentic-system/agent-coordinator:latest
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "200m"
ports:
- containerPort: 8080
env:
- name: NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
---
apiVersion: v1
kind: Service
metadata:
name: agent-coordinator-svc
namespace: agentic-system
spec:
selector:
app: agent-coordinator
ports:
- port: 8080
targetPort: 8080
type: ClusterIP
The coordinator needs appropriate permissions to interact with the Kubernetes API. Let's create a service account with the necessary RBAC permissions:
apiVersion: v1
kind: ServiceAccount
metadata:
name: agent-coordinator-sa
namespace: agentic-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: agent-coordinator-role
namespace: agentic-system
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch", "create", "update", "delete"]
- apiGroups: ["apps"]
resources: ["deployments", "statefulsets"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: agent-coordinator-rb
namespace: agentic-system
subjects:
- kind: ServiceAccount
name: agent-coordinator-sa
namespace: agentic-system
roleRef:
kind: Role
name: agent-coordinator-role
apiGroup: rbac.authorization.k8s.io
These RBAC rules grant the coordinator permissions to manage pods and deployments within the agentic-system namespace, which is necessary for it to monitor and dynamically scale agent instances.
Building Specialized Agent Microservices
Now, let's implement a specialized agent microservice. Each agent will have specific capabilities and will communicate with the coordinator for task assignments. We'll create a Python-based agent that specializes in data processing as an example.
Here's the implementation of our data processing agent:
import os
import json
import time
import uuid
import requests
import logging
from typing import Dict, List, Any, Optional
import numpy as np
import threading
from flask import Flask, request, jsonify
from llamacpp_client import LlamaCppClient
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('data-processing-agent')
app = Flask(__name__)
class DataProcessingAgent:
def __init__(self):
self.agent_id = os.environ.get('AGENT_ID', f'data-agent-{uuid.uuid4().hex[:8]}')
self.agent_type = "data-processor"
self.capabilities = ["dataAnalysis", "textExtraction", "dataTransformation"]
self.coordinator_url = os.environ.get('COORDINATOR_URL', 'http://agent-coordinator-svc:8080')
self.status = "initializing"
self.current_task: Optional[Dict[str, Any]] = None
self.hardware_accel = self._detect_hardware_acceleration()
# Initialize LLM client
self.llm = LlamaCppClient(
model_path=os.environ.get('MODEL_PATH', '/models/llama-2-7b-q4_k_m.gguf'),
n_ctx=2048,
n_threads=int(os.environ.get('N_THREADS', '4')),
hardware_accel=self.hardware_accel
)
# Start background threads
self.running = True
self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop)
self.task_thread = threading.Thread(target=self._task_polling_loop)
self.heartbeat_thread.daemon = True
self.task_thread.daemon = True
def start(self):
"""Start the agent's background processes and register with the coordinator"""
logger.info(f"Starting agent {self.agent_id} with capabilities: {self.capabilities}")
self._register_with_coordinator()
self.status = "active"
self.heartbeat_thread.start()
self.task_thread.start()
logger.info(f"Agent {self.agent_id} started successfully")
def _detect_hardware_acceleration(self) -> str:
"""Detect available hardware acceleration"""
try:
import torch
if torch.cuda.is_available():
return "cuda"
if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
return "mps"
except ImportError:
pass
try:
# Check for ROCm
import subprocess
result = subprocess.run(['rocminfo'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode == 0:
return "rocm"
except (ImportError, FileNotFoundError):
pass
return "cpu"
def _register_with_coordinator(self):
"""Register this agent with the coordinator service"""
registration_data = {
"id": self.agent_id,
"type": self.agent_type,
"capabilities": self.capabilities,
"status": self.status,
"hardwareAccel": self.hardware_accel,
"metadata": {
"language": "python",
"version": "1.0.0"
}
}
try:
response = requests.post(
f"{self.coordinator_url}/agents",
json=registration_data,
timeout=5
)
if response.status_code == 200:
logger.info(f"Successfully registered with coordinator")
else:
logger.error(f"Failed to register with coordinator: {response.status_code} - {response.text}")
except requests.RequestException as e:
logger.error(f"Error connecting to coordinator: {e}")
def _heartbeat_loop(self):
"""Send regular heartbeats to the coordinator"""
while self.running:
try:
health_data = {
"agentId": self.agent_id,
"status": self.status,
"timestamp": time.time()
}
response = requests.post(
f"{self.coordinator_url}/health",
json=health_data,
timeout=5
)
if response.status_code != 200:
logger.warning(f"Heartbeat failed: {response.status_code}")
except requests.RequestException as e:
logger.warning(f"Heartbeat error: {e}")
time.sleep(30) # Send heartbeat every 30 seconds
def _task_polling_loop(self):
"""Poll for new tasks from the coordinator"""
while self.running:
if self.current_task is None and self.status == "active":
try:
response = requests.get(
f"{self.coordinator_url}/tasks?agent={self.agent_id}&status=pending",
timeout=5
)
if response.status_code == 200:
tasks = response.json().get("tasks", [])
if tasks:
self.current_task = tasks[0]
self.status = "busy"
# Process task in a separate thread
threading.Thread(target=self._process_task, args=(self.current_task,)).start()
except requests.RequestException as e:
logger.warning(f"Task polling error: {e}")
time.sleep(5) # Poll every 5 seconds
def _process_task(self, task):
"""Process a task using LLM and data processing capabilities"""
task_id = task["id"]
task_type = task["type"]
payload = task["payload"]
logger.info(f"Processing task {task_id} of type {task_type}")
try:
# Update task status to "processing"
self._update_task_status(task_id, "processing")
result = None
if task_type == "dataAnalysis":
result = self._perform_data_analysis(payload)
elif task_type == "textExtraction":
result = self._perform_text_extraction(payload)
elif task_type == "dataTransformation":
result = self._perform_data_transformation(payload)
else:
raise ValueError(f"Unsupported task type: {task_type}")
# Update task with result
self._update_task_result(task_id, "completed", result)
except Exception as e:
logger.error(f"Error processing task {task_id}: {e}")
self._update_task_result(task_id, "failed", {"error": str(e)})
# Reset agent status
self.current_task = None
self.status = "active"
def _perform_data_analysis(self, payload):
"""Analyze data using LLM and numerical processing"""
data = payload.get("data", [])
if not data:
return {"error": "No data provided"}
# Convert data to numpy array for analysis
try:
arr = np.array(data)
# Basic statistical analysis
stats = {
"mean": float(np.mean(arr)),
"median": float(np.median(arr)),
"std": float(np.std(arr)),
"min": float(np.min(arr)),
"max": float(np.max(arr))
}
# Use LLM for insights
prompt = f"""
I have the following statistical data:
Mean: {stats['mean']}
Median: {stats['median']}
Standard Deviation: {stats['std']}
Min: {stats['min']}
Max: {stats['max']}
Provide 3 key insights about this data and potential implications.
"""
llm_response = self.llm.generate(prompt, max_tokens=512)
return {
"statistics": stats,
"insights": llm_response.strip(),
"dataPoints": len(data)
}
except Exception as e:
logger.error(f"Error in data analysis: {e}")
return {"error": f"Analysis failed: {str(e)}"}
# Additional methods would implement other capabilities and communication with the coordinator
# Flask routes for external API
@app.route('/health', methods=['GET'])
def health_check():
return jsonify({"status": agent.status})
@app.route('/tasks', methods=['POST'])
def receive_task():
if agent.status != "active":
return jsonify({"error": "Agent is not available"}), 503
task = request.json
if not task:
return jsonify({"error": "No task data provided"}), 400
# Queue the task for processing
# In a real implementation, this would validate and queue the task
return jsonify({"accepted": True, "agentId": agent.agent_id})
if __name__ == "__main__":
agent = DataProcessingAgent()
agent.start()
# Start the Flask server
app.run(host='0.0.0.0', port=5000)
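The agent above imports LlamaCppClient from a llamacpp_client module that is not shown. One plausible sketch is a thin wrapper that delegates to the llama.cpp HTTP service described later in this article; under that assumption the model_path, n_ctx, n_threads and hardware_accel arguments are kept only for logging or a future in-process fallback, and the LLAMA_SERVICE_URL variable and its default are likewise assumptions:

import os
import requests

class LlamaCppClient:
    """Minimal client for the shared llama.cpp inference service."""

    def __init__(self, model_path: str, n_ctx: int = 2048, n_threads: int = 4,
                 hardware_accel: str = "cpu", base_url: str = None):
        # Mirrors the constructor call in the agent; these values are
        # informational when inference is delegated to the shared service.
        self.model_path = model_path
        self.n_ctx = n_ctx
        self.n_threads = n_threads
        self.hardware_accel = hardware_accel
        self.base_url = base_url or os.environ.get(
            "LLAMA_SERVICE_URL", "http://llama-service")

    def generate(self, prompt: str, max_tokens: int = 256,
                 temperature: float = 0.8, top_p: float = 0.9) -> str:
        """Call the service's /generate endpoint and return the generated text."""
        resp = requests.post(
            f"{self.base_url}/generate",
            json={"prompt": prompt, "max_tokens": max_tokens,
                  "temperature": temperature, "top_p": top_p},
            timeout=120,
        )
        resp.raise_for_status()
        return resp.json().get("text", "")

Delegating to the shared service keeps model weights and GPU memory off the agent pods; running the model in-process through Python bindings is the main alternative.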
This agent implementation demonstrates several key aspects of our multi-agent system:
1. Hardware detection for acceleration
2. Registration with the coordinator
3. Heartbeat mechanism for health monitoring
4. Task polling and processing (the status-update helpers this relies on are sketched after this list)
5. Integration with llama.cpp for inference
6. Specialized data processing capabilities
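The _process_task method also calls _update_task_status and _update_task_result, which the listing references but never defines. A minimal sketch, reusing the requests library and logger already imported in the agent module, might look like this; the /tasks/{id} endpoint is an assumption, since the coordinator's task API is not fully specified above:

class DataProcessingAgent:  # continuation of the class above
    def _update_task_status(self, task_id: str, status: str) -> None:
        """Report a task status change to the coordinator (assumed endpoint)."""
        try:
            requests.put(
                f"{self.coordinator_url}/tasks/{task_id}",
                json={"status": status, "agentId": self.agent_id},
                timeout=5,
            )
        except requests.RequestException as e:
            logger.warning(f"Failed to update status for task {task_id}: {e}")

    def _update_task_result(self, task_id: str, status: str, result: dict) -> None:
        """Send the final status and result payload back to the coordinator."""
        try:
            requests.put(
                f"{self.coordinator_url}/tasks/{task_id}",
                json={"status": status, "agentId": self.agent_id, "result": result},
                timeout=10,
            )
        except requests.RequestException as e:
            logger.warning(f"Failed to report result for task {task_id}: {e}")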
Now, let's create a Kubernetes deployment for this agent type:
apiVersion: apps/v1
kind: Deployment
metadata:
name: data-processing-agent
namespace: agentic-system
spec:
replicas: 3
selector:
matchLabels:
app: data-processing-agent
component: agent
template:
metadata:
labels:
app: data-processing-agent
component: agent
agent-type: data-processor
annotations:
capabilities: '["dataAnalysis", "textExtraction", "dataTransformation"]'
spec:
containers:
- name: agent
image: agentic-system/data-processing-agent:latest
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "1"
ports:
- containerPort: 5000
env:
- name: COORDINATOR_URL
value: "http://agent-coordinator-svc:8080"
- name: MODEL_PATH
value: "/models/llama-2-7b-q4_k_m.gguf"
- name: N_THREADS
value: "4"
volumeMounts:
- name: models-volume
mountPath: /models
- name: knowledge-base
mountPath: /knowledge
volumes:
- name: models-volume
persistentVolumeClaim:
claimName: models-pvc
- name: knowledge-base
persistentVolumeClaim:
claimName: knowledge-base-pvc
---
apiVersion: v1
kind: Service
metadata:
name: data-processing-agent-svc
namespace: agentic-system
spec:
selector:
app: data-processing-agent
ports:
- port: 80
targetPort: 5000
type: ClusterIP
This deployment creates three instances of our data processing agent, with appropriate resource allocations and volume mounts for the model files and knowledge base. Note that the coordinator's discovery loop looks for an agent-id label on each pod, which this template does not set; these agents instead announce themselves through the coordinator's /agents endpoint at startup, as implemented in the registration code above.
Integrating llama.cpp as the AI Foundation
Now, let's focus on integrating llama.cpp into our agents. llama.cpp is a high-performance C/C++ implementation of LLaMA-family model inference, which allows us to run large language models efficiently on a wide range of hardware platforms.
We'll first create a C++ service that wraps llama.cpp and exposes it via a RESTful API. The service will be optimized based on the available hardware acceleration.
Here's a simplified implementation of our llama.cpp server:
#include <iostream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <future>
#include <chrono>
#include <sstream>
#include <httplib.h>
#include <nlohmann/json.hpp>
#include "llama.h"
using json = nlohmann::json;
class LlamaServer {
public:
LlamaServer(const std::string& model_path, size_t n_ctx = 2048, int n_threads = 4, bool use_gpu = false)
: running_(false), n_ctx_(n_ctx), n_threads_(n_threads), use_gpu_(use_gpu) {
// Initialize llama.cpp
llama_backend_init(use_gpu);
// Load the model
llama_model_params model_params = llama_model_default_params();
model_ = llama_load_model_from_file(model_path.c_str(), model_params);
if (model_ == nullptr) {
throw std::runtime_error("Failed to load model from " + model_path);
}
std::cout << "Model loaded successfully from " << model_path << std::endl;
std::cout << "Using GPU acceleration: " << (use_gpu ? "Yes" : "No") << std::endl;
// Start worker threads
running_ = true;
for (int i = 0; i < n_threads_; ++i) {
workers_.emplace_back([this]() { worker_loop(); });
}
}
~LlamaServer() {
// Stop all worker threads
running_ = false;
cv_.notify_all();
for (auto& worker : workers_) {
if (worker.joinable()) {
worker.join();
}
}
// Free llama.cpp resources
if (model_ != nullptr) {
llama_free_model(model_);
}
llama_backend_free();
}
void start_server(const std::string& host, int port) {
httplib::Server server;
// Define API endpoints
server.Post("/generate", [this](const httplib::Request& req, httplib::Response& res) {
try {
json request_data = json::parse(req.body);
// Extract request parameters
std::string prompt = request_data["prompt"];
int max_tokens = request_data.value("max_tokens", 256);
float temperature = request_data.value("temperature", 0.8f);
float top_p = request_data.value("top_p", 0.9f);
// Create a new inference task; the promise delivers the generated text
// back to this handler even though the task itself is moved into the queue
InferenceTask task{
prompt,
max_tokens,
temperature,
top_p,
std::promise<std::string>()
};
// Get the future from the promise before moving the task
std::future<std::string> future = task.completion.get_future();
// Add task to the queue
{
std::lock_guard<std::mutex> lock(queue_mutex_);
task_queue_.push(std::move(task));
}
// Notify a worker
cv_.notify_one();
// Wait for the worker to finish and collect the generated text
std::string generated = future.get();
// Send the response (token counts are rough approximations)
json response = {
{"text", generated},
{"prompt_tokens", prompt.size() / 4},
{"completion_tokens", generated.size() / 4},
{"total_tokens", (prompt.size() + generated.size()) / 4}
};
res.set_content(response.dump(), "application/json");
} catch (const std::exception& e) {
json error = {{"error", e.what()}};
res.status = 400;
res.set_content(error.dump(), "application/json");
}
});
server.Get("/health", [this](const httplib::Request& req, httplib::Response& res) {
json health = {
{"status", "healthy"},
{"model_loaded", model_ != nullptr},
{"workers", n_threads_},
{"queue_size", task_queue_.size()},
{"gpu_enabled", use_gpu_}
};
res.set_content(health.dump(), "application/json");
});
std::cout << "Starting Llama.cpp server on " << host << ":" << port << std::endl;
server.listen(host, port);
}
private:
struct InferenceTask {
std::string prompt;
int max_tokens;
float temperature;
float top_p;
// The generated text (or an error message) is delivered through this
// promise, so the HTTP handler can wait on the corresponding future
// even though the task itself is moved into the worker queue
std::promise<std::string> completion;
};
void worker_loop() {
while (running_) {
std::unique_lock<std::mutex> lock(queue_mutex_);
// Wait for a task or stop signal
cv_.wait(lock, [this]() {
return !task_queue_.empty() || !running_;
});
if (!running_) {
break;
}
if (task_queue_.empty()) {
continue;
}
// Get the next task
InferenceTask task = std::move(task_queue_.front());
task_queue_.pop();
// Release the lock while processing
lock.unlock();
try {
// Create a new context for this inference
llama_context_params ctx_params = llama_context_default_params();
ctx_params.n_ctx = n_ctx_;
ctx_params.n_threads = 1; // Each worker uses 1 thread
llama_context* ctx = llama_new_context_with_model(model_, ctx_params);
if (ctx == nullptr) {
throw std::runtime_error("Failed to create context");
}
// Tokenize the prompt
std::vector<llama_token> tokens = llama_tokenize(ctx, task.prompt, true);
if (tokens.size() >= n_ctx_) {
tokens.resize(n_ctx_ - 4); // Leave some room for generated tokens
}
// Evaluate the prompt
if (llama_eval(ctx, tokens.data(), tokens.size(), 0, 1) != 0) {
throw std::runtime_error("Failed to evaluate prompt");
}
// Generate response
std::stringstream result;
llama_token prev_token = tokens.back();
for (int i = 0; i < task.max_tokens; ++i) {
// Get logits for the last token
float* logits = llama_get_logits(ctx);
// Sample next token
llama_token_data_array candidates = llama_token_get_candidates(ctx, logits, llama_n_vocab(model_));
llama_token_data* candidates_p = candidates.data;
// Apply temperature and top_p sampling
llama_sample_top_p(ctx, &candidates_p, candidates.size, task.top_p);
llama_token id = llama_sample_token(ctx, candidates_p, candidates.size, task.temperature);
// If we hit the end of sequence token, stop generating
if (id == llama_token_eos(model_)) {
break;
}
// Convert token to text
const char* token_str = llama_token_to_str(ctx, id);
if (token_str != nullptr) {
result << token_str;
}
// Evaluate the new token
if (llama_eval(ctx, &id, 1, tokens.size() + i, 1) != 0) {
throw std::runtime_error("Failed to evaluate token");
}
prev_token = id;
}
// Clean up the per-request context
llama_free_context(ctx);
// Deliver the generated text to the waiting HTTP handler
task.completion.set_value(result.str());
} catch (const std::exception& e) {
// Report the error through the promise rather than a shared result field
task.completion.set_value(std::string("Error: ") + e.what());
}
}
}
// llama.cpp resources
llama_model* model_;
size_t n_ctx_;
int n_threads_;
bool use_gpu_;
// Worker thread management
std::atomic<bool> running_;
std::vector<std::thread> workers_;
std::queue<InferenceTask> task_queue_;
std::mutex queue_mutex_;
std::condition_variable cv_;
};
int main(int argc, char** argv) {
// Parse command line arguments
std::string model_path = "/models/llama-2-7b-q4_k_m.gguf";
size_t n_ctx = 2048;
int n_threads = 4;
bool use_gpu = false;
std::string host = "0.0.0.0";
int port = 8000;
for (int i = 1; i < argc; ++i) {
std::string arg = argv[i];
if (arg == "--model" && i + 1 < argc) {
model_path = argv[++i];
} else if (arg == "--ctx" && i + 1 < argc) {
n_ctx = std::stoi(argv[++i]);
} else if (arg == "--threads" && i + 1 < argc) {
n_threads = std::stoi(argv[++i]);
} else if (arg == "--gpu") {
use_gpu = true;
} else if (arg == "--host" && i + 1 < argc) {
host = argv[++i];
} else if (arg == "--port" && i + 1 < argc) {
port = std::stoi(argv[++i]);
}
}
try {
// Check for hardware acceleration
const char* gpu_info = getenv("GPU_INFO");
if (gpu_info != nullptr) {
std::string gpu_type = gpu_info;
if (gpu_type == "cuda") {
std::cout << "CUDA detected, enabling GPU acceleration" << std::endl;
use_gpu = true;
} else if (gpu_type == "mps") {
std::cout << "Apple MPS detected, enabling GPU acceleration" << std::endl;
use_gpu = true;
} else if (gpu_type == "rocm") {
std::cout << "AMD ROCm detected, enabling GPU acceleration" << std::endl;
use_gpu = true;
}
}
// Create and start the server
LlamaServer server(model_path, n_ctx, n_threads, use_gpu);
server.start_server(host, port);
} catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
This C++ service wraps llama.cpp and provides a RESTful API for text generation. It includes features such as multi-threading for handling concurrent requests, temperature and top-p sampling for controlling the generation diversity, and automatic hardware acceleration detection.
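Since the server's main job is absorbing concurrent requests through its worker pool, a quick way to exercise it is to fire several /generate calls in parallel and then inspect /health. The request and response fields below are taken from the handlers above; the llama-service hostname assumes the Kubernetes Service defined later in this article (use a port-forward when testing locally):

import concurrent.futures
import time
import requests

LLAMA_URL = "http://llama-service"

def one_request(i: int) -> float:
    """Send a single /generate request and return its latency in seconds."""
    start = time.time()
    resp = requests.post(
        f"{LLAMA_URL}/generate",
        json={"prompt": f"Write one sentence about request {i}.",
              "max_tokens": 32, "temperature": 0.8, "top_p": 0.9},
        timeout=300,
    )
    resp.raise_for_status()
    return time.time() - start

if __name__ == "__main__":
    # Eight concurrent requests against the default four workers
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
        latencies = list(pool.map(one_request, range(8)))
    print("latencies:", [round(l, 2) for l in latencies])
    print("health:", requests.get(f"{LLAMA_URL}/health", timeout=5).json())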
Now, let's create a Dockerfile to build this service with the appropriate dependencies:
FROM ubuntu:22.04 as builder
# Install build dependencies
RUN apt-get update && apt-get install -y \
build-essential \
cmake \
git \
wget \
libssl-dev \
libcurl4-openssl-dev \
python3-dev \
python3-pip \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
# Clone and build llama.cpp
WORKDIR /build
RUN git clone https://github.com/ggerganov/llama.cpp.git
WORKDIR /build/llama.cpp
# Build with CUDA support if available
ARG CUDA_SUPPORT=OFF
ARG MPS_SUPPORT=OFF
ARG ROCM_SUPPORT=OFF
RUN if [ "$CUDA_SUPPORT" = "ON" ]; then \
cmake -B build -DLLAMA_CUBLAS=ON && \
cmake --build build --config Release; \
elif [ "$MPS_SUPPORT" = "ON" ]; then \
cmake -B build -DLLAMA_METAL=ON && \
cmake --build build --config Release; \
elif [ "$ROCM_SUPPORT" = "ON" ]; then \
cmake -B build -DLLAMA_HIPBLAS=ON && \
cmake --build build --config Release; \
else \
cmake -B build && \
cmake --build build --config Release; \
fi
# Build the server application
WORKDIR /build/server
COPY server.cpp /build/server/
COPY CMakeLists.txt /build/server/
RUN cmake -B build \
-DLLAMA_CPP_PATH=/build/llama.cpp \
-DCUDA_SUPPORT=$CUDA_SUPPORT \
-DMPS_SUPPORT=$MPS_SUPPORT \
-DROCM_SUPPORT=$ROCM_SUPPORT && \
cmake --build build --config Release
# Create the runtime image
FROM ubuntu:22.04
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
libssl3 \
libcurl4 \
python3-minimal \
&& rm -rf /var/lib/apt/lists/*
# Copy the built executables
COPY --from=builder /build/server/build/llama_server /usr/local/bin/
COPY --from=builder /build/llama.cpp/build/libllama.so /usr/local/lib/
# Set library path
ENV LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
# Create model directory
RUN mkdir -p /models
# Set the entrypoint
ENTRYPOINT ["/usr/local/bin/llama_server"]
This Dockerfile is designed to build the llama.cpp service with optional CUDA, MPS, or ROCm support. The build arguments (CUDA_SUPPORT, MPS_SUPPORT, ROCM_SUPPORT) can be set during the build process to enable the appropriate hardware acceleration.
Next, let's create Kubernetes manifests for deploying the llama.cpp service with hardware-specific configurations:
# Base configuration for the llama.cpp service
apiVersion: v1
kind: ConfigMap
metadata:
name: llama-config
namespace: agentic-system
data:
model-path: "/models/llama-2-7b-q4_k_m.gguf"
context-size: "2048"
threads: "4"
---
# Persistent volume for model storage
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: models-pvc
namespace: agentic-system
spec:
accessModes:
- ReadOnlyMany
resources:
requests:
storage: 10Gi
storageClassName: standard
---
# Deployment for CPU nodes
apiVersion: apps/v1
kind: Deployment
metadata:
name: llama-cpu
namespace: agentic-system
spec:
replicas: 2
selector:
matchLabels:
app: llama-service
hardware: cpu
template:
metadata:
labels:
app: llama-service
hardware: cpu
spec:
nodeSelector:
hardware-type: cpu
containers:
- name: llama
image: agentic-system/llama-service:latest
args:
- "--model"
- "$(MODEL_PATH)"
- "--ctx"
- "$(CONTEXT_SIZE)"
- "--threads"
- "$(THREADS)"
resources:
requests:
memory: "2Gi"
cpu: "2"
limits:
memory: "4Gi"
cpu: "4"
ports:
- containerPort: 8000
env:
- name: MODEL_PATH
valueFrom:
configMapKeyRef:
name: llama-config
key: model-path
- name: CONTEXT_SIZE
valueFrom:
configMapKeyRef:
name: llama-config
key: context-size
- name: THREADS
valueFrom:
configMapKeyRef:
name: llama-config
key: threads
volumeMounts:
- name: models-volume
mountPath: /models
readOnly: true
volumes:
- name: models-volume
persistentVolumeClaim:
claimName: models-pvc
---
# Deployment for CUDA GPU nodes
apiVersion: apps/v1
kind: Deployment
metadata:
name: llama-cuda
namespace: agentic-system
spec:
replicas: 1
selector:
matchLabels:
app: llama-service
hardware: cuda
template:
metadata:
labels:
app: llama-service
hardware: cuda
spec:
nodeSelector:
hardware-type: cuda
containers:
- name: llama
image: agentic-system/llama-service:cuda
args:
- "--model"
- "$(MODEL_PATH)"
- "--ctx"
- "$(CONTEXT_SIZE)"
- "--gpu"
resources:
requests:
memory: "4Gi"
cpu: "2"
nvidia.com/gpu: "1"
limits:
memory: "8Gi"
cpu: "4"
nvidia.com/gpu: "1"
ports:
- containerPort: 8000
env:
- name: MODEL_PATH
valueFrom:
configMapKeyRef:
name: llama-config
key: model-path
- name: CONTEXT_SIZE
valueFrom:
configMapKeyRef:
name: llama-config
key: context-size
- name: GPU_INFO
value: "cuda"
volumeMounts:
- name: models-volume
mountPath: /models
readOnly: true
volumes:
- name: models-volume
persistentVolumeClaim:
claimName: models-pvc
---
# Similar deployments for MPS and ROCm would follow the same pattern
# ...
---
# Service to route requests to the llama.cpp instances
apiVersion: v1
kind: Service
metadata:
name: llama-service
namespace: agentic-system
spec:
selector:
app: llama-service
ports:
- port: 80
targetPort: 8000
type: ClusterIP
These Kubernetes manifests deploy multiple versions of the llama.cpp service, each optimized for a specific hardware platform. The service uses node selectors to ensure that pods are scheduled on nodes with the appropriate hardware; this assumes the cluster's nodes have already been labeled with a hardware-type label (for example, hardware-type=cuda on NVIDIA GPU nodes). The common configuration, such as the model path and context size, is stored in a ConfigMap for easy management.
Optimizing llama.cpp for Hardware Acceleration
Now, let's look at how to optimize llama.cpp for different hardware platforms:
CUDA Optimization
For NVIDIA GPUs with CUDA support, we need to build llama.cpp with CUBLAS enabled. Here's the CMakeLists.txt file for our server application with CUDA support:
cmake_minimum_required(VERSION 3.12)
project(llama_server)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Check for CUDA support
option(CUDA_SUPPORT "Build with CUDA support" OFF)
if(CUDA_SUPPORT)
find_package(CUDA REQUIRED)
add_definitions(-DLLAMA_CUBLAS=1)
set(CUDA_LIBS ${CUDA_LIBRARIES} ${CUDA_CUBLAS_LIBRARIES})
endif()
# Check for MPS support
option(MPS_SUPPORT "Build with MPS support" OFF)
if(MPS_SUPPORT)
add_definitions(-DLLAMA_METAL=1)
endif()
# Check for ROCm support
option(ROCM_SUPPORT "Build with ROCm support" OFF)
if(ROCM_SUPPORT)
find_package(hip REQUIRED)
add_definitions(-DLLAMA_HIPBLAS=1)
set(ROCM_LIBS hip::host hip::device)
endif()
# Find llama.cpp
set(LLAMA_CPP_PATH "" CACHE PATH "Path to llama.cpp repository")
if(NOT LLAMA_CPP_PATH)
message(FATAL_ERROR "LLAMA_CPP_PATH not set. Please specify the path to llama.cpp repository.")
endif()
include_directories(${LLAMA_CPP_PATH})
link_directories(${LLAMA_CPP_PATH}/build)
# Find dependencies
find_package(Threads REQUIRED)
find_package(OpenSSL REQUIRED)
find_package(CURL REQUIRED)
# Add the httplib header-only library
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/deps/cpp-httplib)
# Add the nlohmann/json header-only library
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/deps/json/include)
# Add source files
set(SOURCES
server.cpp
)
# Build the server executable
add_executable(llama_server ${SOURCES})
# Link against llama.cpp and other libraries
target_link_libraries(llama_server
llama
Threads::Threads
OpenSSL::SSL
OpenSSL::Crypto
${CURL_LIBRARIES}
)
# Add platform-specific libraries
if(CUDA_SUPPORT)
target_link_libraries(llama_server ${CUDA_LIBS})
endif()
if(ROCM_SUPPORT)
target_link_libraries(llama_server ${ROCM_LIBS})
endif()
# Install the executable
install(TARGETS llama_server DESTINATION bin)
When deploying on nodes with NVIDIA GPUs, we would build a custom Docker image with CUDA support:
docker build --build-arg CUDA_SUPPORT=ON -t agentic-system/llama-service:cuda .
Apple MPS Optimization
For Apple Silicon with Metal Performance Shaders (MPS), we need to build llama.cpp with Metal support.
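Assuming the same Dockerfile and build arguments as the CUDA variant, the Metal-enabled image would be built with:
docker build --build-arg MPS_SUPPORT=ON -t agentic-system/llama-service:mps .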
The deployment would then use node selectors to ensure that the MPS-optimized pods are scheduled on Apple Silicon nodes:
apiVersion: apps/v1
kind: Deployment
metadata:
name: llama-mps
namespace: agentic-system
spec:
replicas: 1
selector:
matchLabels:
app: llama-service
hardware: mps
template:
metadata:
labels:
app: llama-service
hardware: mps
spec:
nodeSelector:
hardware-type: mps
containers:
- name: llama
image: agentic-system/llama-service:mps
# ... similar to CUDA deployment
env:
- name: GPU_INFO
value: "mps"
AMD ROCm Optimization
For AMD GPUs with ROCm support, we would build llama.cpp with HIP/HIPBLAS support:
docker build --build-arg ROCM_SUPPORT=ON -t agentic-system/llama-service:rocm .
The deployment would use node selectors for AMD GPU nodes:
apiVersion: apps/v1
kind: Deployment
metadata:
name: llama-rocm
namespace: agentic-system
spec:
replicas: 1
selector:
matchLabels:
app: llama-service
hardware: rocm
template:
metadata:
labels:
app: llama-service
hardware: rocm
spec:
nodeSelector:
hardware-type: rocm
containers:
- name: llama
image: agentic-system/llama-service:rocm
# ... similar to CUDA deployment
resources:
requests:
memory: "4Gi"
cpu: "2"
amd.com/gpu: "1"
limits:
memory: "8Gi"
cpu: "4"
amd.com/gpu: "1"
env:
- name: GPU_INFO
value: "rocm"
Implementing Inter-Agent Communication
To enable effective collaboration between agents, we need a robust communication mechanism. We'll use a message bus architecture based on NATS, a lightweight, high-performance messaging system.
First, let's deploy NATS in our Kubernetes cluster:
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: nats
namespace: agentic-system
spec:
serviceName: nats
replicas: 3
selector:
matchLabels:
app: nats
template:
metadata:
labels:
app: nats
spec:
containers:
- name: nats
image: nats:2.9.16-alpine
args:
- "-c"
- "/etc/nats/nats.conf"
ports:
- containerPort: 4222
name: client
- containerPort: 6222
name: cluster
- containerPort: 8222
name: monitor
volumeMounts:
- name: config-volume
mountPath: /etc/nats
volumes:
- name: config-volume
configMap:
name: nats-config
---
apiVersion: v1
kind: ConfigMap
metadata:
name: nats-config
namespace: agentic-system
data:
nats.conf: |
# NATS Server Configuration
pid_file: "/var/run/nats/nats.pid"
http: 8222
cluster {
port: 6222
routes [
nats://nats-0.nats:6222
nats://nats-1.nats:6222
nats://nats-2.nats:6222
]
}
jetstream {
store_dir: "/data"
max_mem: 1G
max_file: 10G
}
---
apiVersion: v1
kind: Service
metadata:
name: nats
namespace: agentic-system
spec:
selector:
app: nats
ports:
- name: client
port: 4222
targetPort: client
- name: cluster
port: 6222
targetPort: cluster
- name: monitor
port: 8222
targetPort: monitor
clusterIP: None
Now, let's enhance our agent implementation to include NATS-based communication. We'll add a messaging client to our data processing agent:
import asyncio
import nats
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout
class MessageBus:
def __init__(self, nats_url="nats://nats:4222"):
self.nats_url = nats_url
self.nc = NATS()
self.js = None
self.connected = False
self.subscriptions = {}
async def connect(self):
try:
await self.nc.connect(self.nats_url)
self.js = self.nc.jetstream()
self.connected = True
print(f"Connected to NATS at {self.nats_url}")
except Exception as e:
print(f"Error connecting to NATS: {e}")
self.connected = False
async def disconnect(self):
if self.connected:
await self.nc.close()
self.connected = False
async def publish(self, subject, message):
if not self.connected:
await self.connect()
if isinstance(message, dict):
message = json.dumps(message).encode()
elif isinstance(message, str):
message = message.encode()
try:
ack = await self.js.publish(subject, message)
return ack
except Exception as e:
print(f"Error publishing message: {e}")
return None
async def subscribe(self, subject, queue_group, callback):
if not self.connected:
await self.connect()
async def wrapped_callback(msg):
try:
payload = msg.data.decode()
try:
payload = json.loads(payload)
except:
pass # Not JSON, use as is
await callback(subject, payload, msg)
except Exception as e:
print(f"Error in message handler: {e}")
try:
sub = await self.js.subscribe(subject, queue=queue_group, cb=wrapped_callback)
self.subscriptions[subject] = sub
return sub
except Exception as e:
print(f"Error subscribing to {subject}: {e}")
return None
async def unsubscribe(self, subject):
if subject in self.subscriptions:
try:
await self.subscriptions[subject].unsubscribe()
del self.subscriptions[subject]
except Exception as e:
print(f"Error unsubscribing from {subject}: {e}")
The MessageBus class provides a clean interface for publishing and subscribing to messages. Now, let's integrate it into our agent implementation:
class DataProcessingAgent:
def __init__(self):
# Previous initialization code...
# Initialize message bus
self.message_bus = MessageBus(os.environ.get('NATS_URL', 'nats://nats:4222'))
self.event_loop = asyncio.get_event_loop()
self.event_loop.run_until_complete(self.message_bus.connect())
# Set up message handlers
self.event_loop.run_until_complete(self.setup_message_handlers())
async def setup_message_handlers(self):
# Subscribe to agent-specific messages
await self.message_bus.subscribe(
f"agent.{self.agent_id}.command",
"data-processors",
self.handle_command
)
# Subscribe to broadcast messages for all agents
await self.message_bus.subscribe(
"agent.broadcast.command",
"data-processors",
self.handle_broadcast
)
# Subscribe to specific capability requests
for capability in self.capabilities:
await self.message_bus.subscribe(
f"capability.{capability}.request",
"data-processors",
self.handle_capability_request
)
async def handle_command(self, subject, message, raw_msg):
"""Handle commands directed to this specific agent"""
logger.info(f"Received command: {message}")
command = message.get("command")
if command == "process":
# Process the task asynchronously
threading.Thread(target=self._process_task, args=(message.get("task"),)).start()
# Acknowledge receipt
await self.message_bus.publish(
f"agent.{self.agent_id}.response",
{"status": "accepted", "taskId": message.get("task", {}).get("id")}
)
async def handle_broadcast(self, subject, message, raw_msg):
"""Handle broadcast messages sent to all agents"""
logger.info(f"Received broadcast: {message}")
command = message.get("command")
if command == "health_check":
# Respond with health status
await self.message_bus.publish(
"agent.health.response",
{
"agentId": self.agent_id,
"status": self.status,
"capabilities": self.capabilities,
"timestamp": time.time()
}
)
async def handle_capability_request(self, subject, message, raw_msg):
"""Handle requests for specific capabilities"""
capability = subject.split('.')[1]
logger.info(f"Received capability request for {capability}: {message}")
# Check if we're available to handle this request
if self.status != "active":
return
# Offer to take the task if we can handle it
await self.message_bus.publish(
f"capability.{capability}.offer",
{
"agentId": self.agent_id,
"requestId": message.get("requestId"),
"load": 0.5, # Example load metric
"priority": 1 # Example priority
}
)
async def publish_result(self, task_id, result):
"""Publish task results to the message bus"""
await self.message_bus.publish(
f"task.{task_id}.result",
{
"agentId": self.agent_id,
"taskId": task_id,
"result": result,
"timestamp": time.time()
}
)
# Modified _process_task method to publish results via the message bus
def _process_task(self, task):
task_id = task["id"]
task_type = task["type"]
payload = task["payload"]
logger.info(f"Processing task {task_id} of type {task_type}")
try:
# Update task status to "processing"
self._update_task_status(task_id, "processing")
result = None
if task_type == "dataAnalysis":
result = self._perform_data_analysis(payload)
elif task_type == "textExtraction":
result = self._perform_text_extraction(payload)
elif task_type == "dataTransformation":
result = self._perform_data_transformation(payload)
else:
raise ValueError(f"Unsupported task type: {task_type}")
# Update task with result
self._update_task_result(task_id, "completed", result)
# Publish result to message bus
self.event_loop.run_until_complete(self.publish_result(task_id, result))
except Exception as e:
logger.error(f"Error processing task {task_id}: {e}")
self._update_task_result(task_id, "failed", {"error": str(e)})
# Reset agent status
self.current_task = None
self.status = "active"
This enhanced implementation allows agents to communicate through the message bus, enabling more complex interaction patterns such as:
1. Direct commands to specific agents
2. Broadcast messages to all agents
3. Capability-based task distribution (a requester-side sketch follows this list)
4. Asynchronous result publishing
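To see the capability-based flow from the requester's side, here is a sketch built on the same MessageBus class: it broadcasts a request on capability.<name>.request, collects offers for a short window, then sends a process command to the chosen agent. The two-second collection window and the least-loaded selection policy are illustrative choices.

import asyncio
import uuid

async def dispatch_by_capability(bus: MessageBus, capability: str, task: dict) -> str:
    """Pick an agent offering `capability` and hand it a task over the bus."""
    request_id = uuid.uuid4().hex
    offers = []

    async def collect_offer(subject, payload, raw_msg):
        # Agents answer on capability.<name>.offer with agentId, requestId and load
        if isinstance(payload, dict) and payload.get("requestId") == request_id:
            offers.append(payload)

    # Listen for offers before broadcasting the request
    await bus.subscribe(f"capability.{capability}.offer", "dispatchers", collect_offer)
    await bus.publish(f"capability.{capability}.request",
                      {"requestId": request_id, "taskId": task["id"]})

    await asyncio.sleep(2)  # collection window for offers
    await bus.unsubscribe(f"capability.{capability}.offer")

    if not offers:
        raise RuntimeError(f"No agent offered capability {capability}")

    # Prefer the least-loaded agent among those that responded
    chosen = min(offers, key=lambda o: o.get("load", 1.0))["agentId"]
    await bus.publish(f"agent.{chosen}.command", {"command": "process", "task": task})
    return chosen

The coordinator, or any other service with access to NATS, could use a helper like this whenever a task specifies a required capability rather than a target agent.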
Deployment and Scaling Considerations
A key advantage of running our multi-agent system on Kubernetes is the ability to scale dynamically based on workload demands. Let's implement a Horizontal Pod Autoscaler (HPA) for our agent deployments:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: data-processing-agent-hpa
namespace: agentic-system
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: data-processing-agent
minReplicas: 1
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 60
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 20
periodSeconds: 120
This HPA configuration will automatically scale the number of data processing agent pods based on CPU and memory utilization. The behavior section defines how quickly the deployment should scale up and down, ensuring stability while still responding to changes in demand.
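To make the arithmetic concrete: the HPA computes the desired replica count as ceil(currentReplicas × currentUtilization / targetUtilization). If the three agent replicas from the earlier deployment average 90% CPU against the 70% target, the controller proposes ceil(3 × 90 / 70) = 4 replicas; the scale-up policy above permits at most a 100% increase per 60-second window (3 to 6), so the step to 4 is applied immediately, while scale-down is smoothed over the 300-second stabilization window.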
For the llama.cpp service, we may want to implement a custom scaling solution that takes into account the specific hardware requirements. Here's a simple Kubernetes CronJob that periodically checks the queue length and adjusts the number of replicas accordingly:
apiVersion: batch/v1
kind: CronJob
metadata:
name: llama-scaler
namespace: agentic-system
spec:
schedule: "*/5 * * * *" # Run every 5 minutes
jobTemplate:
spec:
template:
spec:
serviceAccountName: llama-scaler-sa
containers:
- name: kubectl
image: bitnami/kubectl:latest
command:
- /bin/sh
- -c
- |
#!/bin/sh
# Query the llama service for queue length
QUEUE_LENGTH=$(curl -s http://llama-service/health | jq '.queue_size')
# Scale based on queue length
if [ "$QUEUE_LENGTH" -gt 50 ]; then
# High load, scale up
kubectl scale deployment llama-cpu --replicas=5
elif [ "$QUEUE_LENGTH" -gt 20 ]; then
# Medium load
kubectl scale deployment llama-cpu --replicas=3
elif [ "$QUEUE_LENGTH" -gt 5 ]; then
# Low load
kubectl scale deployment llama-cpu --replicas=2
else
# Minimal load
kubectl scale deployment llama-cpu --replicas=1
fi
restartPolicy: OnFailure
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: llama-scaler-sa
namespace: agentic-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: llama-scaler-role
namespace: agentic-system
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: llama-scaler-rb
namespace: agentic-system
subjects:
- kind: ServiceAccount
  name: llama-scaler-sa
  namespace: agentic-system
roleRef:
kind: Role
name: llama-scaler-role
apiGroup: rbac.authorization.k8s.io
This scaling solution periodically checks the queue length of the llama.cpp service and adjusts the number of replicas based on the current load. Note that the script assumes curl and jq are available in the scaler image; depending on the image you choose, you may need to install them or bake a small custom image. While this approach is relatively simple, it provides a good balance between responsiveness and stability.
Monitoring and Observability for Multi-Agent Systems
Monitoring a distributed multi-agent system requires capturing metrics, logs, and traces from all components. Here we'll focus on the metrics side with Prometheus and Grafana; OpenTelemetry is a natural extension for distributed tracing.
First, let's add instrumentation to our agent code to expose Prometheus metrics:
from prometheus_client import Counter, Gauge, Histogram, start_http_server
class MetricsCollector:
def __init__(self, port=8000):
# Define metrics
self.task_counter = Counter(
'agent_tasks_total',
'Total number of tasks processed',
['agent_id', 'task_type', 'status']
)
self.active_tasks = Gauge(
'agent_active_tasks',
'Number of currently active tasks',
['agent_id']
)
self.task_duration = Histogram(
'agent_task_duration_seconds',
'Time spent processing tasks',
['agent_id', 'task_type'],
buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0)
)
self.llm_inference_duration = Histogram(
'agent_llm_inference_duration_seconds',
'Time spent on LLM inference',
['agent_id', 'model'],
buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0)
)
self.queue_size = Gauge(
'agent_queue_size',
'Number of tasks in the queue',
['agent_id']
)
# Start metrics server
start_http_server(port)
print(f"Metrics server started on port {port}")
def record_task_started(self, agent_id, task_type):
self.task_counter.labels(agent_id=agent_id, task_type=task_type, status='started').inc()
self.active_tasks.labels(agent_id=agent_id).inc()
def record_task_completed(self, agent_id, task_type, duration):
self.task_counter.labels(agent_id=agent_id, task_type=task_type, status='completed').inc()
self.active_tasks.labels(agent_id=agent_id).dec()
self.task_duration.labels(agent_id=agent_id, task_type=task_type).observe(duration)
def record_task_failed(self, agent_id, task_type, duration):
self.task_counter.labels(agent_id=agent_id, task_type=task_type, status='failed').inc()
self.active_tasks.labels(agent_id=agent_id).dec()
self.task_duration.labels(agent_id=agent_id, task_type=task_type).observe(duration)
def record_llm_inference(self, agent_id, model, duration):
self.llm_inference_duration.labels(agent_id=agent_id, model=model).observe(duration)
def set_queue_size(self, agent_id, size):
self.queue_size.labels(agent_id=agent_id).set(size)
Now, let's integrate this metrics collector into our agent implementation:
class DataProcessingAgent:
def __init__(self):
# Previous initialization code...
# Initialize metrics collector
self.metrics = MetricsCollector(port=8000)
def _process_task(self, task):
task_id = task["id"]
task_type = task["type"]
payload = task["payload"]
logger.info(f"Processing task {task_id} of type {task_type}")
# Record task start
start_time = time.time()
self.metrics.record_task_started(self.agent_id, task_type)
try:
# Update task status to "processing"
self._update_task_status(task_id, "processing")
result = None
if task_type == "dataAnalysis":
result = self._perform_data_analysis(payload)
elif task_type == "textExtraction":
result = self._perform_text_extraction(payload)
elif task_type == "dataTransformation":
result = self._perform_data_transformation(payload)
else:
raise ValueError(f"Unsupported task type: {task_type}")
# Update task with result
self._update_task_result(task_id, "completed", result)
# Publish result to message bus
self.event_loop.run_until_complete(self.publish_result(task_id, result))
# Record successful completion
duration = time.time() - start_time
self.metrics.record_task_completed(self.agent_id, task_type, duration)
except Exception as e:
logger.error(f"Error processing task {task_id}: {e}")
self._update_task_result(task_id, "failed", {"error": str(e)})
# Record failure
duration = time.time() - start_time
self.metrics.record_task_failed(self.agent_id, task_type, duration)
# Reset agent status
self.current_task = None
self.status = "active"
def _task_polling_loop(self):
"""Poll for new tasks from the coordinator"""
while self.running:
# Update queue size metric
task_count = 0
try:
response = requests.get(
f"{self.coordinator_url}/tasks?agent={self.agent_id}&status=pending",
timeout=5
)
if response.status_code == 200:
tasks = response.json().get("tasks", [])
task_count = len(tasks)
except:
pass
self.metrics.set_queue_size(self.agent_id, task_count)
# Existing task polling logic...
time.sleep(5) # Poll every 5 seconds
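One metric defined above, agent_llm_inference_duration_seconds, is never populated in this integration. A small wrapper around the LLM call would cover it; deriving the model label from the MODEL_PATH environment variable is an assumption:

class DataProcessingAgent:  # continuation of the class above
    def _timed_generate(self, prompt: str, max_tokens: int = 512) -> str:
        """Call the LLM and record how long the inference took."""
        model_name = os.path.basename(os.environ.get('MODEL_PATH', 'unknown'))
        start = time.time()
        try:
            return self.llm.generate(prompt, max_tokens=max_tokens)
        finally:
            self.metrics.record_llm_inference(
                self.agent_id, model_name, time.time() - start)

Methods such as _perform_data_analysis would then call self._timed_generate(prompt) instead of calling self.llm.generate directly.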
Next, let's deploy Prometheus and Grafana to collect and visualize these metrics:
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
namespace: agentic-system
data:
prometheus.yml: |
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'kubernetes-pods'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
action: replace
target_label: __metrics_path__
regex: (.+)
- source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
action: replace
regex: ([^:]+)(?::\d+)?;(\d+)
replacement: $1:$2
target_label: __address__
- action: labelmap
regex: __meta_kubernetes_pod_label_(.+)
- source_labels: [__meta_kubernetes_namespace]
action: replace
target_label: kubernetes_namespace
- source_labels: [__meta_kubernetes_pod_name]
action: replace
target_label: kubernetes_pod_name
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: prometheus
namespace: agentic-system
spec:
replicas: 1
selector:
matchLabels:
app: prometheus
template:
metadata:
labels:
app: prometheus
spec:
serviceAccountName: prometheus
containers:
- name: prometheus
image: prom/prometheus:v2.45.0
args:
- "--config.file=/etc/prometheus/prometheus.yml"
- "--storage.tsdb.path=/prometheus"
- "--storage.tsdb.retention.time=15d"
ports:
- containerPort: 9090
volumeMounts:
- name: prometheus-config
mountPath: /etc/prometheus
- name: prometheus-storage
mountPath: /prometheus
volumes:
- name: prometheus-config
configMap:
name: prometheus-config
- name: prometheus-storage
persistentVolumeClaim:
claimName: prometheus-storage-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: prometheus-storage-pvc
namespace: agentic-system
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
storageClassName: standard
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: prometheus
namespace: agentic-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: prometheus
rules:
- apiGroups: [""]
resources: ["nodes", "nodes/proxy", "services", "endpoints", "pods"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get"]
- nonResourceURLs: ["/metrics"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: prometheus
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: prometheus
subjects:
- kind: ServiceAccount
name: prometheus
namespace: agentic-system
---
apiVersion: v1
kind: Service
metadata:
name: prometheus
namespace: agentic-system
spec:
selector:
app: prometheus
ports:
- port: 9090
targetPort: 9090
type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: grafana
namespace: agentic-system
spec:
replicas: 1
selector:
matchLabels:
app: grafana
template:
metadata:
labels:
app: grafana
spec:
containers:
- name: grafana
image: grafana/grafana:10.0.3
ports:
- containerPort: 3000
volumeMounts:
- name: grafana-storage
mountPath: /var/lib/grafana
env:
- name: GF_SECURITY_ADMIN_PASSWORD
valueFrom:
secretKeyRef:
name: grafana-admin
key: password
- name: GF_USERS_ALLOW_SIGN_UP
value: "false"
volumes:
- name: grafana-storage
persistentVolumeClaim:
claimName: grafana-storage-pvc
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: grafana-storage-pvc
namespace: agentic-system
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
storageClassName: standard
---
apiVersion: v1
kind: Service
metadata:
name: grafana
namespace: agentic-system
spec:
selector:
app: grafana
ports:
- port: 80
targetPort: 3000
type: ClusterIP
---
apiVersion: v1
kind: Secret
metadata:
name: grafana-admin
namespace: agentic-system
type: Opaque
data:
password: YWRtaW4xMjM= # base64 encoded 'admin123' - change in production
This deployment sets up Prometheus for metrics collection and Grafana for visualization. The Prometheus configuration includes automatic discovery of pods with the appropriate annotations, which allows us to dynamically add and remove monitoring targets as agents scale up and down. For the agent pods to be discovered, their pod templates must carry the prometheus.io/scrape: "true" and prometheus.io/port: "8000" annotations referenced by the relabeling rules, and the metrics port should be exposed alongside the existing container port.
Let's also configure an example dashboard for our multi-agent system:
apiVersion: v1
kind: ConfigMap
metadata:
name: grafana-dashboards
namespace: agentic-system
data:
agent-dashboard.json: |
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": 1,
"links": [],
"panels": [
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "Prometheus",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"hiddenSeries": false,
"id": 2,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(agent_tasks_total) by (status)",
"interval": "",
"legendFormat": "{{status}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Total Tasks by Status",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "Prometheus",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"hiddenSeries": false,
"id": 4,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(agent_active_tasks) by (agent_id)",
"interval": "",
"legendFormat": "{{agent_id}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Active Tasks by Agent",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "Prometheus",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"hiddenSeries": false,
"id": 6,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "histogram_quantile(0.95, sum(rate(agent_task_duration_seconds_bucket[5m])) by (task_type, le))",
"interval": "",
"legendFormat": "{{task_type}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "95th Percentile Task Duration by Type",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "Prometheus",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"hiddenSeries": false,
"id": 8,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.5",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "histogram_quantile(0.95, sum(rate(agent_llm_inference_duration_seconds_bucket[5m])) by (model, le))",
"interval": "",
"legendFormat": "{{model}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "95th Percentile LLM Inference Duration by Model",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"refresh": "10s",
"schemaVersion": 27,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-6h",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "Agent System Dashboard",
"uid": "agent-system",
"version": 1
}
This dashboard provides a comprehensive view of the multi-agent system's performance, including task counts, active tasks by agent, task duration percentiles, and LLM inference times.
Conclusion and Future Directions
In this article, we've explored how to build a sophisticated multi-agent Kubernetes application that leverages distributed pods, microservices, and hardware-optimized llama.cpp instances. The system we've designed is scalable, resilient, and capable of adapting to changing workloads and hardware environments.
Key components of our architecture include:
1. An agent coordinator service that manages task allocation and agent lifecycle
2. Specialized agent microservices with specific capabilities
3. A llama.cpp integration optimized for different hardware platforms (CUDA, MPS, ROCm)
4. A messaging system for inter-agent communication
5. Dynamic scaling based on workload demands
6. Comprehensive monitoring and observability
While this implementation provides a solid foundation, there are several areas for future enhancement:
1. Agent Learning: Implementing reinforcement learning mechanisms to allow agents to improve their decision-making over time.
2. Multi-Model Support: Extending the llama.cpp integration to support multiple models of varying sizes, optimized for different tasks.
3. Advanced Task Orchestration: Implementing more sophisticated task scheduling algorithms based on agent capabilities, load, and success rates.
4. Security Enhancements: Adding authentication, authorization, and encryption for agent communication.
5. Federated Learning: Enabling agents to collaboratively learn and share knowledge while maintaining privacy.
By building on this foundation, you can create even more sophisticated multi-agent systems capable of solving complex problems through emergent collaboration.
The approach outlined in this article demonstrates how modern cloud-native technologies can be combined with state-of-the-art AI techniques to create intelligent, distributed systems that can harness the power of specialized hardware and scale dynamically based on demand. As both Kubernetes and language models continue to evolve, the potential for these multi-agent systems will only grow.