Wednesday, May 21, 2025

Building an Agentic Kubernetes Application: Multi-Agent Systems with Hardware-Optimized llama.cpp

Introduction


Agentic applications represent a paradigm shift in how we design and deploy software systems. Rather than creating monolithic applications with predefined behaviors, agentic systems consist of autonomous components that can perceive their environment, make decisions, and take actions to achieve specific goals. When deployed on Kubernetes, these systems gain powerful scaling and orchestration capabilities, enabling them to respond dynamically to changing workloads and requirements.


In this article, we'll explore how to build a sophisticated multi-agent system on Kubernetes that leverages specialized microservices and hardware-optimized llama.cpp instances as the foundation for AI capabilities. We'll cover the entire process from architecture design to deployment, with particular attention to performance optimization across different hardware platforms.


Architectural Overview


Our multi-agent Kubernetes application will follow a distributed architecture where multiple specialized agents collaborate to accomplish complex tasks. The system consists of several key components: a central agent coordinator service that manages task allocation and agent lifecycle; specialized agent microservices that handle specific domains of functionality; a shared knowledge base for persistent information; and multiple llama.cpp instances optimized for different hardware configurations.


The agents will communicate asynchronously through a message bus, allowing them to operate independently while still collaborating effectively. The system will support dynamic scaling, automatically adjusting the number of agent instances based on workload demands. Each agent will have access to a llama.cpp instance, configured and optimized for the node's hardware capabilities.


Setting Up the Kubernetes Environment


Before diving into the implementation details, we need to establish our Kubernetes environment. We'll assume familiarity with basic Kubernetes concepts, but will provide detailed explanations for more advanced configurations.


First, let's create a dedicated namespace for our agentic application. A namespace provides a scope for Kubernetes resources and helps organize components in a multi-tenant environment.


The following configuration creates a namespace called "agentic-system" together with a ResourceQuota that caps CPU usage, memory usage, and the maximum number of pods, so the system cannot consume excessive cluster resources:



apiVersion: v1

kind: Namespace

metadata:

  name: agentic-system

  labels:

    name: agentic-system

    environment: production

---

apiVersion: v1

kind: ResourceQuota

metadata:

  name: agentic-quota

  namespace: agentic-system

spec:

  hard:

    pods: "50"

    requests.cpu: "20"

    requests.memory: 40Gi

    limits.cpu: "40"

    limits.memory: 80Gi



Next, we'll need to set up persistent storage for our agents' knowledge base. The following PersistentVolumeClaim will create storage that can be shared across different agent pods:


apiVersion: v1

kind: PersistentVolumeClaim

metadata:

  name: knowledge-base-pvc

  namespace: agentic-system

spec:

  accessModes:

    - ReadWriteMany

  resources:

    requests:

      storage: 20Gi

  storageClassName: standard



This claim requests a 20Gi volume that can be mounted by multiple pods simultaneously in read-write mode (ReadWriteMany), which is essential for our shared knowledge base. Note that ReadWriteMany requires a storage backend that supports it, such as NFS or CephFS; adjust storageClassName if your cluster's default class only offers ReadWriteOnce.


Implementing the Agent Coordinator Service


The agent coordinator is the brain of our multi-agent system. It tracks all available agents, assigns tasks based on agent capabilities, and manages the lifecycle of agent instances. Let's implement this service in Go, which offers excellent performance and the official Kubernetes client libraries (client-go).


Here's how we would implement the core functionality of the agent coordinator:



package main


import (

    "context"

    "encoding/json"

    "log"

    "net/http"

    "os"

    "time"


    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "k8s.io/client-go/kubernetes"

    "k8s.io/client-go/rest"

)


// Agent represents a registered agent in the system

type Agent struct {

    ID             string            `json:"id"`

    Type           string            `json:"type"`

    Capabilities   []string          `json:"capabilities"`

    Status         string            `json:"status"`

    LastHeartbeat  time.Time         `json:"lastHeartbeat"`

    PodName        string            `json:"podName"`

    NodeName       string            `json:"nodeName"`

    HardwareAccel  string            `json:"hardwareAccel,omitempty"`

    Metadata       map[string]string `json:"metadata,omitempty"`

}


// AgentCoordinator manages the multi-agent system

type AgentCoordinator struct {

    mu            sync.Mutex // guards the agents map, which is shared with the HTTP handlers

    agents        map[string]*Agent

    clientset     *kubernetes.Clientset

    namespace     string

    taskQueue     chan Task

    agentHealthCh chan AgentHealth

}


// Task represents a unit of work to be assigned to agents

type Task struct {

    ID          string                 `json:"id"`

    Type        string                 `json:"type"`

    Priority    int                    `json:"priority"`

    RequiredCap []string               `json:"requiredCapabilities"`

    Payload     map[string]interface{} `json:"payload"`

    AssignedTo  string                 `json:"assignedTo,omitempty"`

    Status      string                 `json:"status"`

    CreatedAt   time.Time              `json:"createdAt"`

}


// AgentHealth represents a health update from an agent

type AgentHealth struct {

    AgentID   string    `json:"agentId"`

    Status    string    `json:"status"`

    Timestamp time.Time `json:"timestamp"`

}


// NewAgentCoordinator creates a new coordinator instance

func NewAgentCoordinator() (*AgentCoordinator, error) {

    // Use in-cluster config when running inside Kubernetes

    config, err := rest.InClusterConfig()

    if err != nil {

        return nil, err

    }

    

    clientset, err := kubernetes.NewForConfig(config)

    if err != nil {

        return nil, err

    }

    

    namespace := os.Getenv("NAMESPACE")

    if namespace == "" {

        namespace = "agentic-system"

    }

    

    return &AgentCoordinator{

        agents:        make(map[string]*Agent),

        clientset:     clientset,

        namespace:     namespace,

        taskQueue:     make(chan Task, 100),

        agentHealthCh: make(chan AgentHealth, 100),

    }, nil

}


// Start begins the coordinator's main loops

func (ac *AgentCoordinator) Start() {

    // Start the agent discovery process

    go ac.discoverAgents()

    

    // Start the task assignment process

    go ac.assignTasks()

    

    // Start the health monitoring process

    go ac.monitorAgentHealth()

    

    // Set up HTTP API

    http.HandleFunc("/agents", ac.handleAgents)

    http.HandleFunc("/tasks", ac.handleTasks)

    http.HandleFunc("/health", ac.handleHealth)

    

    log.Fatal(http.ListenAndServe(":8080", nil))

}


// discoverAgents periodically discovers agent pods in the cluster

func (ac *AgentCoordinator) discoverAgents() {

    ticker := time.NewTicker(30 * time.Second)

    defer ticker.Stop()

    

    for {

        <-ticker.C

        

        // List all pods with the "component=agent" label

        pods, err := ac.clientset.CoreV1().Pods(ac.namespace).List(context.Background(), metav1.ListOptions{

            LabelSelector: "component=agent",

        })

        

        if err != nil {

            log.Printf("Error discovering agents: %v", err)

            continue

        }

        

        // Update agents map based on discovered pods; lock because the HTTP handlers also read it
        ac.mu.Lock()

        for _, pod := range pods.Items {

            if pod.Status.Phase != "Running" {

                continue

            }

            

            // Prefer an explicit agent-id label; fall back to the pod name,
            // since pods created by a Deployment share the same label set
            agentID := pod.Labels["agent-id"]

            if agentID == "" {

                agentID = pod.Name

            }

            

            // Check if agent already exists in our map

            if _, exists := ac.agents[agentID]; !exists {

                // New agent discovered

                agent := &Agent{

                    ID:            agentID,

                    Type:          pod.Labels["agent-type"],

                    Capabilities:  []string{},

                    Status:        "active",

                    LastHeartbeat: time.Now(),

                    PodName:       pod.Name,

                    NodeName:      pod.Spec.NodeName,

                    HardwareAccel: pod.Labels["hardware-accel"],

                    Metadata:      make(map[string]string),

                }

                

                // Extract capabilities from pod annotations

                if capsStr, ok := pod.Annotations["capabilities"]; ok {

                    if err := json.Unmarshal([]byte(capsStr), &agent.Capabilities); err != nil {

                        log.Printf("Error parsing capabilities for agent %s: %v", agentID, err)

                    }

                }

                

                ac.agents[agentID] = agent

                log.Printf("New agent discovered: %s (type: %s)", agentID, agent.Type)

            }

        }

        ac.mu.Unlock()

    }

}


// The rest of the implementation would include task assignment logic,

// agent health monitoring, and HTTP handlers for the API endpoints.


func main() {

    coordinator, err := NewAgentCoordinator()

    if err != nil {

        log.Fatalf("Failed to create agent coordinator: %v", err)

    }

    

    coordinator.Start()

}



This implementation demonstrates the core functionality of our agent coordinator. The coordinator discovers agent pods in the Kubernetes cluster, tracks their status, and assigns tasks based on agent capabilities. The coordinator also exposes HTTP endpoints for external systems to interact with the agent platform.
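
Since the HTTP handlers themselves are omitted above, the exact request contract is an assumption; the sketch below simply mirrors the JSON tags of the Go Task struct. An in-cluster client might submit work to the coordinator roughly like this (the service name comes from the manifest in the next step):

import time
import uuid

import requests

# Hypothetical task submission; field names mirror the Task struct's JSON tags.
task = {
    "id": uuid.uuid4().hex,
    "type": "dataAnalysis",
    "priority": 1,
    "requiredCapabilities": ["dataAnalysis"],
    "payload": {"data": [12.5, 13.1, 11.8, 14.2]},
    "status": "pending",
    "createdAt": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),  # RFC 3339 for Go's time.Time
}

resp = requests.post(
    "http://agent-coordinator-svc.agentic-system.svc:8080/tasks",
    json=task,
    timeout=5,
)
resp.raise_for_status()
print(resp.json())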


Let's now create the Kubernetes Deployment for our coordinator:



apiVersion: apps/v1

kind: Deployment

metadata:

  name: agent-coordinator

  namespace: agentic-system

spec:

  replicas: 1

  selector:

    matchLabels:

      app: agent-coordinator

  template:

    metadata:

      labels:

        app: agent-coordinator

    spec:

      serviceAccountName: agent-coordinator-sa

      containers:

      - name: coordinator

        image: agentic-system/agent-coordinator:latest

        resources:

          requests:

            memory: "256Mi"

            cpu: "100m"

          limits:

            memory: "512Mi"

            cpu: "200m"

        ports:

        - containerPort: 8080

        env:

        - name: NAMESPACE

          valueFrom:

            fieldRef:

              fieldPath: metadata.namespace

---

apiVersion: v1

kind: Service

metadata:

  name: agent-coordinator-svc

  namespace: agentic-system

spec:

  selector:

    app: agent-coordinator

  ports:

  - port: 8080

    targetPort: 8080

  type: ClusterIP



The coordinator needs appropriate permissions to interact with the Kubernetes API. Let's create a service account with the necessary RBAC permissions:



apiVersion: v1

kind: ServiceAccount

metadata:

  name: agent-coordinator-sa

  namespace: agentic-system

---

apiVersion: rbac.authorization.k8s.io/v1

kind: Role

metadata:

  name: agent-coordinator-role

  namespace: agentic-system

rules:

- apiGroups: [""]

  resources: ["pods"]

  verbs: ["get", "list", "watch", "create", "update", "delete"]

- apiGroups: ["apps"]

  resources: ["deployments", "statefulsets"]

  verbs: ["get", "list", "watch", "create", "update", "patch"]

---

apiVersion: rbac.authorization.k8s.io/v1

kind: RoleBinding

metadata:

  name: agent-coordinator-rb

  namespace: agentic-system

subjects:

- kind: ServiceAccount

  name: agent-coordinator-sa

  namespace: agentic-system

roleRef:

  kind: Role

  name: agent-coordinator-role

  apiGroup: rbac.authorization.k8s.io



These RBAC rules grant the coordinator permissions to manage pods and deployments within the agentic-system namespace, which is necessary for it to monitor and dynamically scale agent instances.


Building Specialized Agent Microservices


Now, let's implement a specialized agent microservice. Each agent will have specific capabilities and will communicate with the coordinator for task assignments. We'll create a Python-based agent that specializes in data processing as an example.


Here's the implementation of our data processing agent:



import os

import json

import time

import uuid

import requests

import logging

from typing import Dict, List, Any, Optional

import numpy as np

import threading

from flask import Flask, request, jsonify

from llamacpp_client import LlamaCppClient


# Configure logging

logging.basicConfig(level=logging.INFO,

                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger('data-processing-agent')


app = Flask(__name__)


class DataProcessingAgent:

    def __init__(self):

        self.agent_id = os.environ.get('AGENT_ID', f'data-agent-{uuid.uuid4().hex[:8]}')

        self.agent_type = "data-processor"

        self.capabilities = ["dataAnalysis", "textExtraction", "dataTransformation"]

        self.coordinator_url = os.environ.get('COORDINATOR_URL', 'http://agent-coordinator-svc:8080')

        self.status = "initializing"

        self.current_task: Optional[Dict[str, Any]] = None

        self.hardware_accel = self._detect_hardware_acceleration()

        

        # Initialize LLM client

        self.llm = LlamaCppClient(

            model_path=os.environ.get('MODEL_PATH', '/models/llama-2-7b-q4_k_m.gguf'),

            n_ctx=2048,

            n_threads=int(os.environ.get('N_THREADS', '4')),

            hardware_accel=self.hardware_accel

        )

        

        # Start background threads

        self.running = True

        self.heartbeat_thread = threading.Thread(target=self._heartbeat_loop)

        self.task_thread = threading.Thread(target=self._task_polling_loop)

        self.heartbeat_thread.daemon = True

        self.task_thread.daemon = True


    def start(self):

        """Start the agent's background processes and register with the coordinator"""

        logger.info(f"Starting agent {self.agent_id} with capabilities: {self.capabilities}")

        self._register_with_coordinator()

        self.status = "active"

        self.heartbeat_thread.start()

        self.task_thread.start()

        logger.info(f"Agent {self.agent_id} started successfully")


    def _detect_hardware_acceleration(self) -> str:

        """Detect available hardware acceleration"""

        try:

            import torch

            if torch.cuda.is_available():

                return "cuda"

            if hasattr(torch.backends, "mps") and torch.backends.mps.is_available():

                return "mps"

        except ImportError:

            pass

            

        try:

            # Check for ROCm

            import subprocess

            result = subprocess.run(['rocminfo'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

            if result.returncode == 0:

                return "rocm"

        except (ImportError, FileNotFoundError):

            pass

            

        return "cpu"


    def _register_with_coordinator(self):

        """Register this agent with the coordinator service"""

        registration_data = {

            "id": self.agent_id,

            "type": self.agent_type,

            "capabilities": self.capabilities,

            "status": self.status,

            "hardwareAccel": self.hardware_accel,

            "metadata": {

                "language": "python",

                "version": "1.0.0"

            }

        }

        

        try:

            response = requests.post(

                f"{self.coordinator_url}/agents",

                json=registration_data,

                timeout=5

            )

            if response.status_code == 200:

                logger.info(f"Successfully registered with coordinator")

            else:

                logger.error(f"Failed to register with coordinator: {response.status_code} - {response.text}")

        except requests.RequestException as e:

            logger.error(f"Error connecting to coordinator: {e}")


    def _heartbeat_loop(self):

        """Send regular heartbeats to the coordinator"""

        while self.running:

            try:

                health_data = {

                    "agentId": self.agent_id,

                    "status": self.status,

                    "timestamp": time.time()

                }

                

                response = requests.post(

                    f"{self.coordinator_url}/health",

                    json=health_data,

                    timeout=5

                )

                

                if response.status_code != 200:

                    logger.warning(f"Heartbeat failed: {response.status_code}")

            except requests.RequestException as e:

                logger.warning(f"Heartbeat error: {e}")

                

            time.sleep(30)  # Send heartbeat every 30 seconds


    def _task_polling_loop(self):

        """Poll for new tasks from the coordinator"""

        while self.running:

            if self.current_task is None and self.status == "active":

                try:

                    response = requests.get(

                        f"{self.coordinator_url}/tasks?agent={self.agent_id}&status=pending",

                        timeout=5

                    )

                    

                    if response.status_code == 200:

                        tasks = response.json().get("tasks", [])

                        if tasks:

                            self.current_task = tasks[0]

                            self.status = "busy"

                            # Process task in a separate thread

                            threading.Thread(target=self._process_task, args=(self.current_task,)).start()

                except requests.RequestException as e:

                    logger.warning(f"Task polling error: {e}")

            

            time.sleep(5)  # Poll every 5 seconds


    def _process_task(self, task):

        """Process a task using LLM and data processing capabilities"""

        task_id = task["id"]

        task_type = task["type"]

        payload = task["payload"]

        

        logger.info(f"Processing task {task_id} of type {task_type}")

        

        try:

            # Update task status to "processing"

            self._update_task_status(task_id, "processing")

            

            result = None

            

            if task_type == "dataAnalysis":

                result = self._perform_data_analysis(payload)

            elif task_type == "textExtraction":

                result = self._perform_text_extraction(payload)

            elif task_type == "dataTransformation":

                result = self._perform_data_transformation(payload)

            else:

                raise ValueError(f"Unsupported task type: {task_type}")

                

            # Update task with result

            self._update_task_result(task_id, "completed", result)

            

        except Exception as e:

            logger.error(f"Error processing task {task_id}: {e}")

            self._update_task_result(task_id, "failed", {"error": str(e)})

        

        # Reset agent status

        self.current_task = None

        self.status = "active"


    def _perform_data_analysis(self, payload):

        """Analyze data using LLM and numerical processing"""

        data = payload.get("data", [])

        if not data:

            return {"error": "No data provided"}

            

        # Convert data to numpy array for analysis

        try:

            arr = np.array(data)

            

            # Basic statistical analysis

            stats = {

                "mean": float(np.mean(arr)),

                "median": float(np.median(arr)),

                "std": float(np.std(arr)),

                "min": float(np.min(arr)),

                "max": float(np.max(arr))

            }

            

            # Use LLM for insights

            prompt = f"""

            I have the following statistical data:

            Mean: {stats['mean']}

            Median: {stats['median']}

            Standard Deviation: {stats['std']}

            Min: {stats['min']}

            Max: {stats['max']}

            

            Provide 3 key insights about this data and potential implications.

            """

            

            llm_response = self.llm.generate(prompt, max_tokens=512)

            

            return {

                "statistics": stats,

                "insights": llm_response.strip(),

                "dataPoints": len(data)

            }

        except Exception as e:

            logger.error(f"Error in data analysis: {e}")

            return {"error": f"Analysis failed: {str(e)}"}


    # Additional methods would implement other capabilities and communication with the coordinator


# Flask routes for external API

@app.route('/health', methods=['GET'])

def health_check():

    return jsonify({"status": agent.status})


@app.route('/tasks', methods=['POST'])

def receive_task():

    if agent.status != "active":

        return jsonify({"error": "Agent is not available"}), 503

        

    task = request.json

    if not task:

        return jsonify({"error": "No task data provided"}), 400

        

    # Queue the task for processing

    # In a real implementation, this would validate and queue the task

    return jsonify({"accepted": True, "agentId": agent.agent_id})


if __name__ == "__main__":

    agent = DataProcessingAgent()

    agent.start()

    

    # Start the Flask server

    app.run(host='0.0.0.0', port=5000)



This agent implementation demonstrates several key aspects of our multi-agent system:


1. Hardware detection for acceleration

2. Registration with the coordinator

3. Heartbeat mechanism for health monitoring

4. Task polling and processing

5. Integration with llama.cpp for inference (a minimal client sketch follows this list)

6. Specialized data processing capabilities
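
One piece not shown above is the llamacpp_client module the agent imports. A minimal, hypothetical implementation could wrap the llama-cpp-python bindings to load the GGUF model in-process; the mapping from hardware_accel to n_gpu_layers below is our assumption rather than part of the original code:

# llamacpp_client.py -- minimal sketch built on the llama-cpp-python package
from llama_cpp import Llama


class LlamaCppClient:
    def __init__(self, model_path, n_ctx=2048, n_threads=4, hardware_accel="cpu"):
        # Offload all layers to the GPU when any accelerator was detected;
        # llama-cpp-python must have been built with the matching backend.
        n_gpu_layers = -1 if hardware_accel in ("cuda", "mps", "rocm") else 0
        self.llm = Llama(
            model_path=model_path,
            n_ctx=n_ctx,
            n_threads=n_threads,
            n_gpu_layers=n_gpu_layers,
        )

    def generate(self, prompt, max_tokens=256, temperature=0.8, top_p=0.9):
        # Return only the generated text, matching how the agent uses the client
        output = self.llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
        )
        return output["choices"][0]["text"]

Because the model is loaded in-process, each agent replica needs enough memory for the quantized weights in addition to its own footprint; the alternative is to have the client call the shared llama.cpp HTTP service described later in this article.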


Now, let's create a Kubernetes deployment for this agent type:



apiVersion: apps/v1

kind: Deployment

metadata:

  name: data-processing-agent

  namespace: agentic-system

spec:

  replicas: 3

  selector:

    matchLabels:

      app: data-processing-agent

      component: agent

  template:

    metadata:

      labels:

        app: data-processing-agent

        component: agent

        agent-type: data-processor

      annotations:

        capabilities: '["dataAnalysis", "textExtraction", "dataTransformation"]'

    spec:

      containers:

      - name: agent

        image: agentic-system/data-processing-agent:latest

        resources:

          requests:

            memory: "1Gi"

            cpu: "500m"

          limits:

            memory: "2Gi"

            cpu: "1"

        ports:

        - containerPort: 5000

        env:

        - name: COORDINATOR_URL

          value: "http://agent-coordinator-svc:8080"

        - name: MODEL_PATH

          value: "/models/llama-2-7b-q4_k_m.gguf"

        - name: N_THREADS

          value: "4"

        volumeMounts:

        - name: models-volume

          mountPath: /models

        - name: knowledge-base

          mountPath: /knowledge

      volumes:

      - name: models-volume

        persistentVolumeClaim:

          claimName: models-pvc

      - name: knowledge-base

        persistentVolumeClaim:

          claimName: knowledge-base-pvc

---

apiVersion: v1

kind: Service

metadata:

  name: data-processing-agent-svc

  namespace: agentic-system

spec:

  selector:

    app: data-processing-agent

  ports:

  - port: 80

    targetPort: 5000

  type: ClusterIP



This deployment creates three instances of our data processing agent, with appropriate resource allocations and volume mounts for the model files and knowledge base. Note that the models-pvc claim referenced here is defined later, alongside the llama.cpp service manifests.


Integrating llama.cpp as the AI Foundation


Now, let's focus on integrating llama.cpp into our agents. llama.cpp is a high-performance C/C++ inference engine for LLaMA-family models (and other models in GGUF format), which allows us to run large language models efficiently on a wide range of hardware platforms.


We'll first create a C++ service that wraps llama.cpp and exposes it via a RESTful API. The service will be optimized based on the available hardware acceleration.


Here's a simplified implementation of our llama.cpp server:



#include <iostream>

#include <string>

#include <vector>

#include <thread>

#include <mutex>

#include <condition_variable>

#include <queue>

#include <atomic>

#include <future>

#include <chrono>

#include <sstream>


#include <httplib.h>

#include <nlohmann/json.hpp>

#include "llama.h"


using json = nlohmann::json;


class LlamaServer {

public:

    LlamaServer(const std::string& model_path, size_t n_ctx = 2048, int n_threads = 4, bool use_gpu = false) 

        : running_(false), n_ctx_(n_ctx), n_threads_(n_threads), use_gpu_(use_gpu) {

        

        // Initialize the llama.cpp backend (the boolean argument toggles NUMA
        // optimizations in the 2023-era API; GPU offload is configured below)
        llama_backend_init(false);

        // Load the model, offloading all layers to the GPU when acceleration is enabled
        llama_model_params model_params = llama_model_default_params();

        model_params.n_gpu_layers = use_gpu ? 999 : 0;

        model_ = llama_load_model_from_file(model_path.c_str(), model_params);

        

        if (model_ == nullptr) {

            throw std::runtime_error("Failed to load model from " + model_path);

        }

        

        std::cout << "Model loaded successfully from " << model_path << std::endl;

        std::cout << "Using GPU acceleration: " << (use_gpu ? "Yes" : "No") << std::endl;

        

        // Start worker threads

        running_ = true;

        for (int i = 0; i < n_threads_; ++i) {

            workers_.emplace_back([this]() { worker_loop(); });

        }

    }

    

    ~LlamaServer() {

        // Stop all worker threads

        running_ = false;

        cv_.notify_all();

        

        for (auto& worker : workers_) {

            if (worker.joinable()) {

                worker.join();

            }

        }

        

        // Free llama.cpp resources

        if (model_ != nullptr) {

            llama_free_model(model_);

        }

        

        llama_backend_free();

    }

    

    void start_server(const std::string& host, int port) {

        httplib::Server server;

        

        // Define API endpoints

        server.Post("/generate", [this](const httplib::Request& req, httplib::Response& res) {

            try {

                json request_data = json::parse(req.body);

                

                // Extract request parameters

                std::string prompt = request_data["prompt"];

                int max_tokens = request_data.value("max_tokens", 256);

                float temperature = request_data.value("temperature", 0.8f);

                float top_p = request_data.value("top_p", 0.9f);

                

                // Create a new inference task; its promise will carry the generated text
                InferenceTask task{

                    prompt,

                    max_tokens,

                    temperature,

                    top_p,

                    std::promise<std::string>()

                };

                // Get the future before the task is moved into the queue,
                // so we can still wait on it afterwards
                std::future<std::string> future = task.completion.get_future();

                

                // Add task to the queue

                {

                    std::lock_guard<std::mutex> lock(queue_mutex_);

                    task_queue_.push(std::move(task));

                }

                

                // Notify a worker

                cv_.notify_one();

                

                // Wait for a worker to finish and retrieve the generated text
                std::string generated = future.get();

                // Send the response
                json response = {

                    {"text", generated},

                    {"prompt_tokens", prompt.size() / 4},  // Approximate

                    {"completion_tokens", generated.size() / 4},  // Approximate

                    {"total_tokens", (prompt.size() + generated.size()) / 4}  // Approximate

                };

                

                res.set_content(response.dump(), "application/json");

                

            } catch (const std::exception& e) {

                json error = {{"error", e.what()}};

                res.status = 400;

                res.set_content(error.dump(), "application/json");

            }

        });

        

        server.Get("/health", [this](const httplib::Request& req, httplib::Response& res) {

            json health = {

                {"status", "healthy"},

                {"model_loaded", model_ != nullptr},

                {"workers", n_threads_},

                {"queue_size", task_queue_.size()},

                {"gpu_enabled", use_gpu_}

            };

            

            res.set_content(health.dump(), "application/json");

        });

        

        std::cout << "Starting Llama.cpp server on " << host << ":" << port << std::endl;

        server.listen(host, port);

    }

    

private:

    struct InferenceTask {

        std::string prompt;

        int max_tokens;

        float temperature;

        float top_p;

        // Fulfilled with the generated text (or an error message) by a worker thread
        std::promise<std::string> completion;

    };

    

    void worker_loop() {

        while (running_) {

            std::unique_lock<std::mutex> lock(queue_mutex_);

            

            // Wait for a task or stop signal

            cv_.wait(lock, [this]() {

                return !task_queue_.empty() || !running_;

            });

            

            if (!running_) {

                break;

            }

            

            if (task_queue_.empty()) {

                continue;

            }

            

            // Get the next task

            InferenceTask task = std::move(task_queue_.front());

            task_queue_.pop();

            

            // Release the lock while processing

            lock.unlock();

            

            try {

                // Create a new context for this inference

                llama_context_params ctx_params = llama_context_default_params();

                ctx_params.n_ctx = n_ctx_;

                ctx_params.n_threads = 1;  // Each worker uses 1 thread

                

                llama_context* ctx = llama_new_context_with_model(model_, ctx_params);

                

                if (ctx == nullptr) {

                    throw std::runtime_error("Failed to create context");

                }

                

                // Tokenize the prompt

                std::vector<llama_token> tokens = llama_tokenize(ctx, task.prompt, true);

                

                if (tokens.size() >= n_ctx_) {

                    tokens.resize(n_ctx_ - 4);  // Leave some room for generated tokens

                }

                

                // Evaluate the prompt

                if (llama_eval(ctx, tokens.data(), tokens.size(), 0, 1) != 0) {

                    throw std::runtime_error("Failed to evaluate prompt");

                }

                

                // Generate response

                std::stringstream result;

                llama_token prev_token = tokens.back();

                

                for (int i = 0; i < task.max_tokens; ++i) {

                    // Get logits for the last token

                    float* logits = llama_get_logits(ctx);

                    

                    // Build the candidate list from the logits of the last position
                    const int n_vocab = llama_n_vocab(model_);

                    std::vector<llama_token_data> candidates;
                    candidates.reserve(n_vocab);

                    for (llama_token token_id = 0; token_id < n_vocab; ++token_id) {
                        candidates.push_back(llama_token_data{token_id, logits[token_id], 0.0f});
                    }

                    llama_token_data_array candidates_p = { candidates.data(), candidates.size(), false };

                    // Apply top_p and temperature, then sample the next token
                    // (sampling calls follow the 2023-era llama.cpp API)
                    llama_sample_top_p(ctx, &candidates_p, task.top_p, 1);
                    llama_sample_temperature(ctx, &candidates_p, task.temperature);
                    llama_token id = llama_sample_token(ctx, &candidates_p);

                    

                    // If we hit the end of sequence token, stop generating

                    if (id == llama_token_eos(model_)) {

                        break;

                    }

                    

                    // Convert token to text

                    const char* token_str = llama_token_to_str(ctx, id);

                    if (token_str != nullptr) {

                        result << token_str;

                    }

                    

                    // Evaluate the new token

                    if (llama_eval(ctx, &id, 1, tokens.size() + i, 1) != 0) {

                        throw std::runtime_error("Failed to evaluate token");

                    }

                    

                    prev_token = id;

                }

                

                // Free the per-request context
                llama_free(ctx);

                // Fulfil the promise with the generated text
                task.completion.set_value(result.str());

            } catch (const std::exception& e) {

                task.completion.set_value(std::string("Error: ") + e.what());

            }

        }

    }

    

    // llama.cpp resources

    llama_model* model_;

    size_t n_ctx_;

    int n_threads_;

    bool use_gpu_;

    

    // Worker thread management

    std::atomic<bool> running_;

    std::vector<std::thread> workers_;

    std::queue<InferenceTask> task_queue_;

    std::mutex queue_mutex_;

    std::condition_variable cv_;

};


int main(int argc, char** argv) {

    // Parse command line arguments

    std::string model_path = "/models/llama-2-7b-q4_k_m.gguf";

    size_t n_ctx = 2048;

    int n_threads = 4;

    bool use_gpu = false;

    std::string host = "0.0.0.0";

    int port = 8000;

    

    for (int i = 1; i < argc; ++i) {

        std::string arg = argv[i];

        

        if (arg == "--model" && i + 1 < argc) {

            model_path = argv[++i];

        } else if (arg == "--ctx" && i + 1 < argc) {

            n_ctx = std::stoi(argv[++i]);

        } else if (arg == "--threads" && i + 1 < argc) {

            n_threads = std::stoi(argv[++i]);

        } else if (arg == "--gpu") {

            use_gpu = true;

        } else if (arg == "--host" && i + 1 < argc) {

            host = argv[++i];

        } else if (arg == "--port" && i + 1 < argc) {

            port = std::stoi(argv[++i]);

        }

    }

    

    try {

        // Check for hardware acceleration

        const char* gpu_info = getenv("GPU_INFO");

        if (gpu_info != nullptr) {

            std::string gpu_type = gpu_info;

            

            if (gpu_type == "cuda") {

                std::cout << "CUDA detected, enabling GPU acceleration" << std::endl;

                use_gpu = true;

            } else if (gpu_type == "mps") {

                std::cout << "Apple MPS detected, enabling GPU acceleration" << std::endl;

                use_gpu = true;

            } else if (gpu_type == "rocm") {

                std::cout << "AMD ROCm detected, enabling GPU acceleration" << std::endl;

                use_gpu = true;

            }

        }

        

        // Create and start the server

        LlamaServer server(model_path, n_ctx, n_threads, use_gpu);

        server.start_server(host, port);

        

    } catch (const std::exception& e) {

        std::cerr << "Error: " << e.what() << std::endl;

        return 1;

    }

    

    return 0;

}



This C++ service wraps llama.cpp and provides a RESTful API for text generation. It includes multi-threaded workers for handling concurrent requests, temperature and top-p sampling to control generation diversity, and hardware acceleration selection driven by the GPU_INFO environment variable set by the deployment.
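
Before wiring the service into the agents, it helps to smoke-test the two endpoints. The snippet below assumes the ClusterIP Service named llama-service defined later in this article (port 80 forwarding to the container's port 8000); the request and response fields match the JSON handled by the server code above:

import requests

BASE_URL = "http://llama-service.agentic-system.svc"  # Service defined in the manifests below

# Check that the model loaded and inspect the worker queue
health = requests.get(f"{BASE_URL}/health", timeout=5).json()
print("model loaded:", health["model_loaded"], "| queue size:", health["queue_size"])

# Request a short completion
resp = requests.post(
    f"{BASE_URL}/generate",
    json={
        "prompt": "List two benefits of running inference close to the data.",
        "max_tokens": 64,
        "temperature": 0.7,
        "top_p": 0.9,
    },
    timeout=120,
)
resp.raise_for_status()
print(resp.json()["text"])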


Now, let's create a Dockerfile to build this service with the appropriate dependencies:



FROM ubuntu:22.04 as builder


# Install build dependencies

RUN apt-get update && apt-get install -y \

    build-essential \

    cmake \

    git \

    wget \

    libssl-dev \

    libcurl4-openssl-dev \

    python3-dev \

    python3-pip \

    pkg-config \

    && rm -rf /var/lib/apt/lists/*


# Clone and build llama.cpp

WORKDIR /build

RUN git clone https://github.com/ggerganov/llama.cpp.git

WORKDIR /build/llama.cpp


# Build with CUDA support if available

ARG CUDA_SUPPORT=OFF

ARG MPS_SUPPORT=OFF

ARG ROCM_SUPPORT=OFF


RUN if [ "$CUDA_SUPPORT" = "ON" ]; then \

        cmake -B build -DLLAMA_CUBLAS=ON && \

        cmake --build build --config Release; \

    elif [ "$MPS_SUPPORT" = "ON" ]; then \

        cmake -B build -DLLAMA_METAL=ON && \

        cmake --build build --config Release; \

    elif [ "$ROCM_SUPPORT" = "ON" ]; then \

        cmake -B build -DLLAMA_HIPBLAS=ON && \

        cmake --build build --config Release; \

    else \

        cmake -B build && \

        cmake --build build --config Release; \

    fi


# Build the server application

WORKDIR /build/server

COPY server.cpp /build/server/

COPY CMakeLists.txt /build/server/


RUN cmake -B build \

    -DLLAMA_CPP_PATH=/build/llama.cpp \

    -DCUDA_SUPPORT=$CUDA_SUPPORT \

    -DMPS_SUPPORT=$MPS_SUPPORT \

    -DROCM_SUPPORT=$ROCM_SUPPORT && \

    cmake --build build --config Release


# Create the runtime image

FROM ubuntu:22.04


# Install runtime dependencies

RUN apt-get update && apt-get install -y \

    libssl3 \

    libcurl4 \

    python3-minimal \

    && rm -rf /var/lib/apt/lists/*


# Copy the built executables

COPY --from=builder /build/server/build/llama_server /usr/local/bin/

COPY --from=builder /build/llama.cpp/build/libllama.so /usr/local/lib/


# Set library path

ENV LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH


# Create model directory

RUN mkdir -p /models


# Set the entrypoint

ENTRYPOINT ["/usr/local/bin/llama_server"]



This Dockerfile builds the llama.cpp service with optional CUDA, MPS, or ROCm support. The build arguments (CUDA_SUPPORT, MPS_SUPPORT, ROCM_SUPPORT) can be set at build time to enable the appropriate hardware acceleration. Note that the GPU variants also require a builder image that ships the corresponding toolkit (for example, an nvidia/cuda devel image for CUDA or a ROCm development image for HIP); the plain ubuntu:22.04 base shown here is only sufficient for the CPU build.


Next, let's create Kubernetes manifests for deploying the llama.cpp service with hardware-specific configurations:



# Base configuration for the llama.cpp service

apiVersion: v1

kind: ConfigMap

metadata:

  name: llama-config

  namespace: agentic-system

data:

  model-path: "/models/llama-2-7b-q4_k_m.gguf"

  context-size: "2048"

  threads: "4"

---

# Persistent volume for model storage

apiVersion: v1

kind: PersistentVolumeClaim

metadata:

  name: models-pvc

  namespace: agentic-system

spec:

  accessModes:

    - ReadOnlyMany

  resources:

    requests:

      storage: 10Gi

  storageClassName: standard

---

# Deployment for CPU nodes

apiVersion: apps/v1

kind: Deployment

metadata:

  name: llama-cpu

  namespace: agentic-system

spec:

  replicas: 2

  selector:

    matchLabels:

      app: llama-service

      hardware: cpu

  template:

    metadata:

      labels:

        app: llama-service

        hardware: cpu

    spec:

      nodeSelector:

        hardware-type: cpu

      containers:

      - name: llama

        image: agentic-system/llama-service:latest

        args:

        - "--model"

        - "$(MODEL_PATH)"

        - "--ctx"

        - "$(CONTEXT_SIZE)"

        - "--threads"

        - "$(THREADS)"

        resources:

          requests:

            memory: "2Gi"

            cpu: "2"

          limits:

            memory: "4Gi"

            cpu: "4"

        ports:

        - containerPort: 8000

        env:

        - name: MODEL_PATH

          valueFrom:

            configMapKeyRef:

              name: llama-config

              key: model-path

        - name: CONTEXT_SIZE

          valueFrom:

            configMapKeyRef:

              name: llama-config

              key: context-size

        - name: THREADS

          valueFrom:

            configMapKeyRef:

              name: llama-config

              key: threads

        volumeMounts:

        - name: models-volume

          mountPath: /models

          readOnly: true

      volumes:

      - name: models-volume

        persistentVolumeClaim:

          claimName: models-pvc

---

# Deployment for CUDA GPU nodes

apiVersion: apps/v1

kind: Deployment

metadata:

  name: llama-cuda

  namespace: agentic-system

spec:

  replicas: 1

  selector:

    matchLabels:

      app: llama-service

      hardware: cuda

  template:

    metadata:

      labels:

        app: llama-service

        hardware: cuda

    spec:

      nodeSelector:

        hardware-type: cuda

      containers:

      - name: llama

        image: agentic-system/llama-service:cuda

        args:

        - "--model"

        - "$(MODEL_PATH)"

        - "--ctx"

        - "$(CONTEXT_SIZE)"

        - "--gpu"

        resources:

          requests:

            memory: "4Gi"

            cpu: "2"

            nvidia.com/gpu: "1"

          limits:

            memory: "8Gi"

            cpu: "4"

            nvidia.com/gpu: "1"

        ports:

        - containerPort: 8000

        env:

        - name: MODEL_PATH

          valueFrom:

            configMapKeyRef:

              name: llama-config

              key: model-path

        - name: CONTEXT_SIZE

          valueFrom:

            configMapKeyRef:

              name: llama-config

              key: context-size

        - name: GPU_INFO

          value: "cuda"

        volumeMounts:

        - name: models-volume

          mountPath: /models

          readOnly: true

      volumes:

      - name: models-volume

        persistentVolumeClaim:

          claimName: models-pvc

---

# Similar deployments for MPS and ROCm would follow the same pattern

# ...

---

# Service to route requests to the llama.cpp instances

apiVersion: v1

kind: Service

metadata:

  name: llama-service

  namespace: agentic-system

spec:

  selector:

    app: llama-service

  ports:

  - port: 80

    targetPort: 8000

  type: ClusterIP



These Kubernetes manifests deploy multiple versions of the llama.cpp service, each optimized for a specific hardware platform. The service uses node selectors to ensure that pods are scheduled on nodes with the appropriate hardware. The common configuration, such as the model path and context size, is stored in a ConfigMap for easy management.
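
Note that the nodeSelector entries only match if the cluster's nodes actually carry a hardware-type label, which Kubernetes does not add automatically. You can apply it manually with kubectl label node <name> hardware-type=cuda, or programmatically, as in this hypothetical sketch using the official Kubernetes Python client (the node names are placeholders):

from kubernetes import client, config

# Map of node names to the accelerator they expose; in practice this could be
# derived from node capacity (e.g. nvidia.com/gpu) or a hardware inventory.
NODE_HARDWARE = {
    "worker-cpu-1": "cpu",
    "worker-gpu-1": "cuda",
}

config.load_kube_config()  # use load_incluster_config() when running inside the cluster
v1 = client.CoreV1Api()

for node_name, hardware in NODE_HARDWARE.items():
    # Merge-patch the node's labels with the key our nodeSelectors expect
    v1.patch_node(node_name, {"metadata": {"labels": {"hardware-type": hardware}}})
    print(f"Labeled {node_name} with hardware-type={hardware}")

In larger clusters, tools such as Node Feature Discovery can automate hardware labeling, though you would then adjust the nodeSelector keys to match the labels they produce.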


Optimizing llama.cpp for Hardware Acceleration


Now, let's look at how to optimize llama.cpp for different hardware platforms:


CUDA Optimization

For NVIDIA GPUs with CUDA support, we need to build llama.cpp with CUBLAS enabled. Here's the CMakeLists.txt file for our server application with CUDA support:



cmake_minimum_required(VERSION 3.12)

project(llama_server)


set(CMAKE_CXX_STANDARD 17)

set(CMAKE_CXX_STANDARD_REQUIRED ON)


# Check for CUDA support

option(CUDA_SUPPORT "Build with CUDA support" OFF)

if(CUDA_SUPPORT)

    find_package(CUDA REQUIRED)

    add_definitions(-DLLAMA_CUBLAS=1)

    set(CUDA_LIBS ${CUDA_LIBRARIES} ${CUDA_CUBLAS_LIBRARIES})

endif()


# Check for MPS support

option(MPS_SUPPORT "Build with MPS support" OFF)

if(MPS_SUPPORT)

    add_definitions(-DLLAMA_METAL=1)

endif()


# Check for ROCm support

option(ROCM_SUPPORT "Build with ROCm support" OFF)

if(ROCM_SUPPORT)

    find_package(hip REQUIRED)

    add_definitions(-DLLAMA_HIPBLAS=1)

    set(ROCM_LIBS hip::host hip::device)

endif()


# Find llama.cpp

set(LLAMA_CPP_PATH "" CACHE PATH "Path to llama.cpp repository")

if(NOT LLAMA_CPP_PATH)

    message(FATAL_ERROR "LLAMA_CPP_PATH not set. Please specify the path to llama.cpp repository.")

endif()


include_directories(${LLAMA_CPP_PATH})

link_directories(${LLAMA_CPP_PATH}/build)


# Find dependencies

find_package(Threads REQUIRED)

find_package(OpenSSL REQUIRED)

find_package(CURL REQUIRED)


# Add the httplib header-only library

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/deps/cpp-httplib)


# Add the nlohmann/json header-only library

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/deps/json/include)


# Add source files

set(SOURCES

    server.cpp

)


# Build the server executable

add_executable(llama_server ${SOURCES})


# Link against llama.cpp and other libraries

target_link_libraries(llama_server

    llama

    Threads::Threads

    OpenSSL::SSL

    OpenSSL::Crypto

    ${CURL_LIBRARIES}

)


# Add platform-specific libraries

if(CUDA_SUPPORT)

    target_link_libraries(llama_server ${CUDA_LIBS})

endif()


if(ROCM_SUPPORT)

    target_link_libraries(llama_server ${ROCM_LIBS})

endif()


# Install the executable

install(TARGETS llama_server DESTINATION bin)



When deploying on nodes with NVIDIA GPUs, we would build a custom Docker image with CUDA support:



docker build --build-arg CUDA_SUPPORT=ON -t agentic-system/llama-service:cuda .



Apple MPS Optimization


For Apple Silicon with Metal Performance Shaders (MPS), we need to build llama.cpp with Metal support:



docker build --build-arg MPS_SUPPORT=ON -t agentic-system/llama-service:mps .



The deployment would then use node selectors to ensure that the MPS-optimized pods are scheduled on Apple Silicon nodes:



apiVersion: apps/v1

kind: Deployment

metadata:

  name: llama-mps

  namespace: agentic-system

spec:

  replicas: 1

  selector:

    matchLabels:

      app: llama-service

      hardware: mps

  template:

    metadata:

      labels:

        app: llama-service

        hardware: mps

    spec:

      nodeSelector:

        hardware-type: mps

      containers:

      - name: llama

        image: agentic-system/llama-service:mps

        # ... similar to CUDA deployment

        env:

        - name: GPU_INFO

          value: "mps"



AMD ROCm Optimization


For AMD GPUs with ROCm support, we would build llama.cpp with HIP/HIPBLAS support:



docker build --build-arg ROCM_SUPPORT=ON -t agentic-system/llama-service:rocm .



The deployment would use node selectors for AMD GPU nodes:



apiVersion: apps/v1

kind: Deployment

metadata:

  name: llama-rocm

  namespace: agentic-system

spec:

  replicas: 1

  selector:

    matchLabels:

      app: llama-service

      hardware: rocm

  template:

    metadata:

      labels:

        app: llama-service

        hardware: rocm

    spec:

      nodeSelector:

        hardware-type: rocm

      containers:

      - name: llama

        image: agentic-system/llama-service:rocm

        # ... similar to CUDA deployment

        resources:

          requests:

            memory: "4Gi"

            cpu: "2"

            amd.com/gpu: "1"

          limits:

            memory: "8Gi"

            cpu: "4"

            amd.com/gpu: "1"

        env:

        - name: GPU_INFO

          value: "rocm"



Implementing Inter-Agent Communication


To enable effective collaboration between agents, we need a robust communication mechanism. We'll use a message bus architecture based on NATS, a lightweight, high-performance messaging system.


First, let's deploy NATS in our Kubernetes cluster:



apiVersion: apps/v1

kind: StatefulSet

metadata:

  name: nats

  namespace: agentic-system

spec:

  serviceName: nats

  replicas: 3

  selector:

    matchLabels:

      app: nats

  template:

    metadata:

      labels:

        app: nats

    spec:

      containers:

      - name: nats

        image: nats:2.9.16-alpine

        args:

        - "-c"

        - "/etc/nats/nats.conf"

        ports:

        - containerPort: 4222

          name: client

        - containerPort: 6222

          name: cluster

        - containerPort: 8222

          name: monitor

        volumeMounts:

        - name: config-volume

          mountPath: /etc/nats

      volumes:

      - name: config-volume

        configMap:

          name: nats-config

---

apiVersion: v1

kind: ConfigMap

metadata:

  name: nats-config

  namespace: agentic-system

data:

  nats.conf: |

    # NATS Server Configuration

    pid_file: "/var/run/nats/nats.pid"

    

    http: 8222

    

    cluster {

      port: 6222

      routes [

        nats://nats-0.nats:6222

        nats://nats-1.nats:6222

        nats://nats-2.nats:6222

      ]

    }

    

    jetstream {

      store_dir: "/data"

      max_mem: 1G

      max_file: 10G

    }

---

apiVersion: v1

kind: Service

metadata:

  name: nats

  namespace: agentic-system

spec:

  selector:

    app: nats

  ports:

  - name: client

    port: 4222

    targetPort: client

  - name: cluster

    port: 6222

    targetPort: cluster

  - name: monitor

    port: 8222

    targetPort: monitor

  clusterIP: None



This StatefulSet runs a three-node NATS cluster with JetStream enabled; in production you would also add a volumeClaimTemplate so that the JetStream store_dir (/data) is backed by persistent storage. Now, let's enhance our agent implementation to include NATS-based communication by adding a messaging client to our data processing agent:


import asyncio

import json

import nats

from nats.aio.client import Client as NATS

from nats.aio.errors import ErrConnectionClosed, ErrTimeout


class MessageBus:

    def __init__(self, nats_url="nats://nats:4222"):

        self.nats_url = nats_url

        self.nc = NATS()

        self.js = None

        self.connected = False

        self.subscriptions = {}

    

    async def connect(self):

        try:

            await self.nc.connect(self.nats_url)

            self.js = self.nc.jetstream()

            self.connected = True

            print(f"Connected to NATS at {self.nats_url}")

        except Exception as e:

            print(f"Error connecting to NATS: {e}")

            self.connected = False

    

    async def disconnect(self):

        if self.connected:

            await self.nc.close()

            self.connected = False

    

    async def publish(self, subject, message):

        if not self.connected:

            await self.connect()

        

        if isinstance(message, dict):

            message = json.dumps(message).encode()

        elif isinstance(message, str):

            message = message.encode()

        

        try:

            ack = await self.js.publish(subject, message)

            return ack

        except Exception as e:

            print(f"Error publishing message: {e}")

            return None

    

    async def subscribe(self, subject, queue_group, callback):

        if not self.connected:

            await self.connect()

        

        async def wrapped_callback(msg):

            try:

                payload = msg.data.decode()

                try:

                    payload = json.loads(payload)

                except json.JSONDecodeError:

                    pass  # Not JSON, use as is

                

                await callback(subject, payload, msg)

            except Exception as e:

                print(f"Error in message handler: {e}")

        

        try:

            sub = await self.js.subscribe(subject, queue=queue_group, cb=wrapped_callback)

            self.subscriptions[subject] = sub

            return sub

        except Exception as e:

            print(f"Error subscribing to {subject}: {e}")

            return None

    

    async def unsubscribe(self, subject):

        if subject in self.subscriptions:

            try:

                await self.subscriptions[subject].unsubscribe()

                del self.subscriptions[subject]

            except Exception as e:

                print(f"Error unsubscribing from {subject}: {e}")



The MessageBus class provides a clean interface for publishing and subscribing to messages. Now, let's integrate it into our agent implementation:



class DataProcessingAgent:

    def __init__(self):

        # Previous initialization code...

        

        # Initialize the message bus and run its asyncio loop in a dedicated thread,
        # so that worker threads can schedule coroutines on it safely
        self.message_bus = MessageBus(os.environ.get('NATS_URL', 'nats://nats:4222'))

        self.event_loop = asyncio.new_event_loop()

        threading.Thread(target=self.event_loop.run_forever, daemon=True).start()

        asyncio.run_coroutine_threadsafe(self.message_bus.connect(), self.event_loop).result()

        # Set up message handlers
        asyncio.run_coroutine_threadsafe(self.setup_message_handlers(), self.event_loop).result()

    

    async def setup_message_handlers(self):

        # Subscribe to agent-specific messages

        await self.message_bus.subscribe(

            f"agent.{self.agent_id}.command", 

            "data-processors",

            self.handle_command

        )

        

        # Subscribe to broadcast messages for all agents

        await self.message_bus.subscribe(

            "agent.broadcast.command",

            "data-processors",

            self.handle_broadcast

        )

        

        # Subscribe to specific capability requests

        for capability in self.capabilities:

            await self.message_bus.subscribe(

                f"capability.{capability}.request",

                "data-processors",

                self.handle_capability_request

            )

    

    async def handle_command(self, subject, message, raw_msg):

        """Handle commands directed to this specific agent"""

        logger.info(f"Received command: {message}")

        

        command = message.get("command")

        if command == "process":

            # Process the task asynchronously

            threading.Thread(target=self._process_task, args=(message.get("task"),)).start()

            

            # Acknowledge receipt

            await self.message_bus.publish(

                f"agent.{self.agent_id}.response",

                {"status": "accepted", "taskId": message.get("task", {}).get("id")}

            )

    

    async def handle_broadcast(self, subject, message, raw_msg):

        """Handle broadcast messages sent to all agents"""

        logger.info(f"Received broadcast: {message}")

        

        command = message.get("command")

        if command == "health_check":

            # Respond with health status

            await self.message_bus.publish(

                "agent.health.response",

                {

                    "agentId": self.agent_id,

                    "status": self.status,

                    "capabilities": self.capabilities,

                    "timestamp": time.time()

                }

            )

    

    async def handle_capability_request(self, subject, message, raw_msg):

        """Handle requests for specific capabilities"""

        capability = subject.split('.')[1]

        logger.info(f"Received capability request for {capability}: {message}")

        

        # Check if we're available to handle this request

        if self.status != "active":

            return

        

        # Offer to take the task if we can handle it

        await self.message_bus.publish(

            f"capability.{capability}.offer",

            {

                "agentId": self.agent_id,

                "requestId": message.get("requestId"),

                "load": 0.5,  # Example load metric

                "priority": 1  # Example priority

            }

        )

    

    async def publish_result(self, task_id, result):

        """Publish task results to the message bus"""

        await self.message_bus.publish(

            f"task.{task_id}.result",

            {

                "agentId": self.agent_id,

                "taskId": task_id,

                "result": result,

                "timestamp": time.time()

            }

        )

    

    # Modified _process_task method to publish results via the message bus

    def _process_task(self, task):

        task_id = task["id"]

        task_type = task["type"]

        payload = task["payload"]

        

        logger.info(f"Processing task {task_id} of type {task_type}")

        

        try:

            # Update task status to "processing"

            self._update_task_status(task_id, "processing")

            

            result = None

            

            if task_type == "dataAnalysis":

                result = self._perform_data_analysis(payload)

            elif task_type == "textExtraction":

                result = self._perform_text_extraction(payload)

            elif task_type == "dataTransformation":

                result = self._perform_data_transformation(payload)

            else:

                raise ValueError(f"Unsupported task type: {task_type}")

                

            # Update task with result

            self._update_task_result(task_id, "completed", result)

            

            # Publish the result to the message bus. _process_task runs in a worker
            # thread, so hand the coroutine to the bus's event loop instead of calling
            # run_until_complete on a loop that is already running (requires asyncio)
            asyncio.run_coroutine_threadsafe(
                self.publish_result(task_id, result), self.event_loop
            )

            

        except Exception as e:

            logger.error(f"Error processing task {task_id}: {e}")

            self._update_task_result(task_id, "failed", {"error": str(e)})

        

        # Reset agent status

        self.current_task = None

        self.status = "active"



This enhanced implementation allows agents to communicate through the message bus, enabling more complex interaction patterns such as:


1. Direct commands to specific agents

2. Broadcast messages to all agents

3. Capability-based task distribution (a coordinator-side sketch follows this list)

4. Asynchronous result publishing
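
The capability-based pattern leaves the selection logic to the coordinator. The sketch below shows one way the coordinator might use it, assuming the same asynchronous message bus interface the agents use (publish/subscribe with handlers receiving subject, message, and the raw message) and the subject naming shown above. The function name dispatch_by_capability and the offer_window parameter are illustrative, not part of the earlier implementation.


import asyncio
import time
import uuid

async def dispatch_by_capability(message_bus, capability, task, offer_window=1.0):
    """Ask agents advertising a capability to volunteer, then assign the task
    to the least-loaded one. A sketch; error handling is kept minimal."""
    request_id = str(uuid.uuid4())
    offers = []

    async def collect_offer(subject, message, raw_msg):
        # Only consider offers answering this particular request
        if message.get("requestId") == request_id:
            offers.append(message)

    await message_bus.subscribe(f"capability.{capability}.offer", collect_offer)

    # Broadcast the request to agents subscribed to this capability
    await message_bus.publish(
        f"capability.{capability}.request",
        {"requestId": request_id, "taskId": task["id"], "timestamp": time.time()}
    )

    # Give agents a short window to respond, then pick the lowest reported load
    await asyncio.sleep(offer_window)
    if not offers:
        raise RuntimeError(f"No agent offered capability '{capability}'")

    chosen = min(offers, key=lambda o: o.get("load", 1.0))

    # Hand the task to the selected agent via its direct-command subject
    await message_bus.publish(
        f"agent.{chosen['agentId']}.command",
        {"command": "process", "task": task}
    )
    return chosen["agentId"]


A production version would also unsubscribe the temporary offer handler and guard against stale or duplicate offers.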


Deployment and Scaling Considerations


A key advantage of running our multi-agent system on Kubernetes is the ability to scale dynamically based on workload demands. Let's implement a Horizontal Pod Autoscaler (HPA) for our agent deployments:



apiVersion: autoscaling/v2

kind: HorizontalPodAutoscaler

metadata:

  name: data-processing-agent-hpa

  namespace: agentic-system

spec:

  scaleTargetRef:

    apiVersion: apps/v1

    kind: Deployment

    name: data-processing-agent

  minReplicas: 1

  maxReplicas: 10

  metrics:

  - type: Resource

    resource:

      name: cpu

      target:

        type: Utilization

        averageUtilization: 70

  - type: Resource

    resource:

      name: memory

      target:

        type: Utilization

        averageUtilization: 80

  behavior:

    scaleUp:

      stabilizationWindowSeconds: 60

      policies:

      - type: Percent

        value: 100

        periodSeconds: 60

    scaleDown:

      stabilizationWindowSeconds: 300

      policies:

      - type: Percent

        value: 20

        periodSeconds: 120



This HPA configuration will automatically scale the number of data processing agent pods based on CPU and memory utilization. The behavior section defines how quickly the deployment should scale up and down, ensuring stability while still responding to changes in demand.
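
Note that utilization targets are computed against the containers' declared resource requests, so the agent Deployment must set CPU and memory requests for the HPA to work. Once applied, the autoscaler can be observed with standard kubectl commands; the file name below is an assumption:


# Assuming the manifest above is saved as data-processing-agent-hpa.yaml
kubectl apply -f data-processing-agent-hpa.yaml

# Watch current utilization against the targets and the replica count
kubectl get hpa data-processing-agent-hpa -n agentic-system --watch

# Inspect scaling events and any metric-collection errors
kubectl describe hpa data-processing-agent-hpa -n agentic-system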


For the llama.cpp service, we may want to implement a custom scaling solution that takes into account the specific hardware requirements. Here's a simple Kubernetes CronJob that periodically checks the queue length and adjusts the number of replicas accordingly:



apiVersion: batch/v1

kind: CronJob

metadata:

  name: llama-scaler

  namespace: agentic-system

spec:

  schedule: "*/5 * * * *"  # Run every 5 minutes

  jobTemplate:

    spec:

      template:

        spec:

          serviceAccountName: llama-scaler-sa

          containers:

          - name: kubectl

            image: bitnami/kubectl:latest

            command:

            - /bin/sh

            - -c

            - |

              #!/bin/sh
              # Note: this script needs curl and jq in addition to kubectl; use an
              # image that bundles them if bitnami/kubectl does not.

              # Query the llama service for queue length; fall back to 0 if the
              # endpoint is unreachable or the field is missing, so the numeric
              # comparisons below remain valid
              QUEUE_LENGTH=$(curl -sf http://llama-service/health | jq -r '.queue_size // 0')
              QUEUE_LENGTH=${QUEUE_LENGTH:-0}

              

              # Scale based on queue length

              if [ "$QUEUE_LENGTH" -gt 50 ]; then

                # High load, scale up

                kubectl scale deployment llama-cpu --replicas=5

              elif [ "$QUEUE_LENGTH" -gt 20 ]; then

                # Medium load

                kubectl scale deployment llama-cpu --replicas=3

              elif [ "$QUEUE_LENGTH" -gt 5 ]; then

                # Low load

                kubectl scale deployment llama-cpu --replicas=2

              else

                # Minimal load

                kubectl scale deployment llama-cpu --replicas=1

              fi

          restartPolicy: OnFailure

---

apiVersion: v1

kind: ServiceAccount

metadata:

  name: llama-scaler-sa

  namespace: agentic-system

---

apiVersion: rbac.authorization.k8s.io/v1

kind: Role

metadata:

  name: llama-scaler-role

  namespace: agentic-system

rules:

- apiGroups: ["apps"]
  resources: ["deployments", "deployments/scale"]
  verbs: ["get", "update", "patch"]

---

apiVersion: rbac.authorization.k8s.io/v1

kind: RoleBinding

metadata:

  name: llama-scaler-rb

  namespace: agentic-system

subjects:
- kind: ServiceAccount
  name: llama-scaler-sa
  namespace: agentic-system

roleRef:

  kind: Role

  name: llama-scaler-role

  apiGroup: rbac.authorization.k8s.io



This scaling solution periodically checks the queue length of the llama.cpp service and adjusts the number of replicas based on the current load. While this approach is relatively simple, it provides a good balance between responsiveness and stability.
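
Before relying on the five-minute schedule, the scaler can be exercised manually. kubectl can create a one-off Job from the CronJob template; the job name used here is arbitrary:


# Confirm the CronJob is registered
kubectl get cronjob llama-scaler -n agentic-system

# Trigger a single run without waiting for the schedule
kubectl create job --from=cronjob/llama-scaler llama-scaler-manual -n agentic-system

# Check which replica count the script selected
kubectl logs -n agentic-system job/llama-scaler-manual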


Monitoring and Observability for Multi-Agent Systems


Monitoring a distributed multi-agent system requires capturing metrics, logs, and traces from all components. Here we implement metrics collection with Prometheus and visualization with Grafana, and sketch how OpenTelemetry tracing can be layered on top.


First, let's add instrumentation to our agent code to expose Prometheus metrics:



from prometheus_client import Counter, Gauge, Histogram, start_http_server


class MetricsCollector:

    def __init__(self, port=8000):

        # Define metrics

        self.task_counter = Counter(

            'agent_tasks_total', 

            'Total number of tasks processed',

            ['agent_id', 'task_type', 'status']

        )

        

        self.active_tasks = Gauge(

            'agent_active_tasks',

            'Number of currently active tasks',

            ['agent_id']

        )

        

        self.task_duration = Histogram(

            'agent_task_duration_seconds',

            'Time spent processing tasks',

            ['agent_id', 'task_type'],

            buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0)

        )

        

        self.llm_inference_duration = Histogram(

            'agent_llm_inference_duration_seconds',

            'Time spent on LLM inference',

            ['agent_id', 'model'],

            buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0)

        )

        

        self.queue_size = Gauge(

            'agent_queue_size',

            'Number of tasks in the queue',

            ['agent_id']

        )

        

        # Start metrics server

        start_http_server(port)

        print(f"Metrics server started on port {port}")

    

    def record_task_started(self, agent_id, task_type):

        self.task_counter.labels(agent_id=agent_id, task_type=task_type, status='started').inc()

        self.active_tasks.labels(agent_id=agent_id).inc()

    

    def record_task_completed(self, agent_id, task_type, duration):

        self.task_counter.labels(agent_id=agent_id, task_type=task_type, status='completed').inc()

        self.active_tasks.labels(agent_id=agent_id).dec()

        self.task_duration.labels(agent_id=agent_id, task_type=task_type).observe(duration)

    

    def record_task_failed(self, agent_id, task_type, duration):

        self.task_counter.labels(agent_id=agent_id, task_type=task_type, status='failed').inc()

        self.active_tasks.labels(agent_id=agent_id).dec()

        self.task_duration.labels(agent_id=agent_id, task_type=task_type).observe(duration)

    

    def record_llm_inference(self, agent_id, model, duration):

        self.llm_inference_duration.labels(agent_id=agent_id, model=model).observe(duration)

    

    def set_queue_size(self, agent_id, size):

        self.queue_size.labels(agent_id=agent_id).set(size)
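
Before wiring the collector into the agent, it can be verified in isolation. The snippet below uses made-up agent and task identifiers purely to confirm that the metrics endpoint responds:


import time

# Quick local smoke test for MetricsCollector (illustrative values only)
metrics = MetricsCollector(port=8000)

start = time.time()
metrics.record_task_started("agent-local", "dataAnalysis")
time.sleep(0.2)  # simulate a small amount of work
metrics.record_task_completed("agent-local", "dataAnalysis", time.time() - start)
metrics.set_queue_size("agent-local", 3)

# Keep the process alive long enough to inspect the endpoint, e.g.:
#   curl -s localhost:8000/metrics | grep agent_
time.sleep(60)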



Now, let's integrate this metrics collector into our agent implementation:



class DataProcessingAgent:

    def __init__(self):

        # Previous initialization code...

        

        # Initialize metrics collector

        self.metrics = MetricsCollector(port=8000)

        

    def _process_task(self, task):

        task_id = task["id"]

        task_type = task["type"]

        payload = task["payload"]

        

        logger.info(f"Processing task {task_id} of type {task_type}")

        

        # Record task start

        start_time = time.time()

        self.metrics.record_task_started(self.agent_id, task_type)

        

        try:

            # Update task status to "processing"

            self._update_task_status(task_id, "processing")

            

            result = None

            

            if task_type == "dataAnalysis":

                result = self._perform_data_analysis(payload)

            elif task_type == "textExtraction":

                result = self._perform_text_extraction(payload)

            elif task_type == "dataTransformation":

                result = self._perform_data_transformation(payload)

            else:

                raise ValueError(f"Unsupported task type: {task_type}")

                

            # Update task with result

            self._update_task_result(task_id, "completed", result)

            

            # Publish the result to the message bus from this worker thread by
            # handing the coroutine to the bus's event loop
            asyncio.run_coroutine_threadsafe(
                self.publish_result(task_id, result), self.event_loop
            )

            

            # Record successful completion

            duration = time.time() - start_time

            self.metrics.record_task_completed(self.agent_id, task_type, duration)

            

        except Exception as e:

            logger.error(f"Error processing task {task_id}: {e}")

            self._update_task_result(task_id, "failed", {"error": str(e)})

            

            # Record failure

            duration = time.time() - start_time

            self.metrics.record_task_failed(self.agent_id, task_type, duration)

        

        # Reset agent status

        self.current_task = None

        self.status = "active"

    

    def _task_polling_loop(self):

        """Poll for new tasks from the coordinator"""

        while self.running:

            # Update queue size metric

            task_count = 0

            try:

                response = requests.get(

                    f"{self.coordinator_url}/tasks?agent={self.agent_id}&status=pending",

                    timeout=5

                )

                

                if response.status_code == 200:

                    tasks = response.json().get("tasks", [])

                    task_count = len(tasks)

            except requests.RequestException:
                # Coordinator unreachable; fall back to reporting zero pending tasks
                pass

            

            self.metrics.set_queue_size(self.agent_id, task_count)

            

            # Existing task polling logic...

            time.sleep(5)  # Poll every 5 seconds
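
The observability stack mentioned at the start of this section also includes OpenTelemetry for distributed tracing. The agent code above only emits Prometheus metrics; the sketch below shows one way tracing could be added, assuming the opentelemetry-sdk and opentelemetry-exporter-otlp packages are installed and an OTLP-compatible collector is reachable at otel-collector.agentic-system:4317 (deploying a collector is not covered in this article):


from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Configure a tracer that exports spans to an OTLP collector. The endpoint is
# an assumption for illustration; point it at wherever your collector runs.
provider = TracerProvider(
    resource=Resource.create({"service.name": "data-processing-agent"})
)
provider.add_span_processor(
    BatchSpanProcessor(
        OTLPSpanExporter(endpoint="otel-collector.agentic-system:4317", insecure=True)
    )
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# Inside _process_task, each task could then be wrapped in a span:
#
#     with tracer.start_as_current_span("process_task") as span:
#         span.set_attribute("task.id", task_id)
#         span.set_attribute("task.type", task_type)
#         ...  # existing processing logic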



Next, let's deploy Prometheus and Grafana to collect and visualize these metrics:



apiVersion: v1

kind: ConfigMap

metadata:

  name: prometheus-config

  namespace: agentic-system

data:

  prometheus.yml: |

    global:

      scrape_interval: 15s

      evaluation_interval: 15s

    

    scrape_configs:

      - job_name: 'kubernetes-pods'

        kubernetes_sd_configs:

          - role: pod

        relabel_configs:

          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]

            action: keep

            regex: true

          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]

            action: replace

            target_label: __metrics_path__

            regex: (.+)

          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]

            action: replace

            regex: ([^:]+)(?::\d+)?;(\d+)

            replacement: $1:$2

            target_label: __address__

          - action: labelmap

            regex: __meta_kubernetes_pod_label_(.+)

          - source_labels: [__meta_kubernetes_namespace]

            action: replace

            target_label: kubernetes_namespace

          - source_labels: [__meta_kubernetes_pod_name]

            action: replace

            target_label: kubernetes_pod_name

---

apiVersion: apps/v1

kind: Deployment

metadata:

  name: prometheus

  namespace: agentic-system

spec:

  replicas: 1

  selector:

    matchLabels:

      app: prometheus

  template:

    metadata:

      labels:

        app: prometheus

    spec:

      serviceAccountName: prometheus

      containers:

      - name: prometheus

        image: prom/prometheus:v2.45.0

        args:

        - "--config.file=/etc/prometheus/prometheus.yml"

        - "--storage.tsdb.path=/prometheus"

        - "--storage.tsdb.retention.time=15d"

        ports:

        - containerPort: 9090

        volumeMounts:

        - name: prometheus-config

          mountPath: /etc/prometheus

        - name: prometheus-storage

          mountPath: /prometheus

      volumes:

      - name: prometheus-config

        configMap:

          name: prometheus-config

      - name: prometheus-storage

        persistentVolumeClaim:

          claimName: prometheus-storage-pvc

---

apiVersion: v1

kind: PersistentVolumeClaim

metadata:

  name: prometheus-storage-pvc

  namespace: agentic-system

spec:

  accessModes:

    - ReadWriteOnce

  resources:

    requests:

      storage: 10Gi

  storageClassName: standard

---

apiVersion: v1

kind: ServiceAccount

metadata:

  name: prometheus

  namespace: agentic-system

---

apiVersion: rbac.authorization.k8s.io/v1

kind: ClusterRole

metadata:

  name: prometheus

rules:

- apiGroups: [""]

  resources: ["nodes", "nodes/proxy", "services", "endpoints", "pods"]

  verbs: ["get", "list", "watch"]

- apiGroups: [""]

  resources: ["configmaps"]

  verbs: ["get"]

- nonResourceURLs: ["/metrics"]

  verbs: ["get"]

---

apiVersion: rbac.authorization.k8s.io/v1

kind: ClusterRoleBinding

metadata:

  name: prometheus

roleRef:

  apiGroup: rbac.authorization.k8s.io

  kind: ClusterRole

  name: prometheus

subjects:

- kind: ServiceAccount

  name: prometheus

  namespace: agentic-system

---

apiVersion: v1

kind: Service

metadata:

  name: prometheus

  namespace: agentic-system

spec:

  selector:

    app: prometheus

  ports:

  - port: 9090

    targetPort: 9090

  type: ClusterIP

---

apiVersion: apps/v1

kind: Deployment

metadata:

  name: grafana

  namespace: agentic-system

spec:

  replicas: 1

  selector:

    matchLabels:

      app: grafana

  template:

    metadata:

      labels:

        app: grafana

    spec:

      containers:

      - name: grafana

        image: grafana/grafana:10.0.3

        ports:

        - containerPort: 3000

        volumeMounts:

        - name: grafana-storage

          mountPath: /var/lib/grafana

        env:

        - name: GF_SECURITY_ADMIN_PASSWORD

          valueFrom:

            secretKeyRef:

              name: grafana-admin

              key: password

        - name: GF_USERS_ALLOW_SIGN_UP

          value: "false"

      volumes:

      - name: grafana-storage

        persistentVolumeClaim:

          claimName: grafana-storage-pvc

---

apiVersion: v1

kind: PersistentVolumeClaim

metadata:

  name: grafana-storage-pvc

  namespace: agentic-system

spec:

  accessModes:

    - ReadWriteOnce

  resources:

    requests:

      storage: 5Gi

  storageClassName: standard

---

apiVersion: v1

kind: Service

metadata:

  name: grafana

  namespace: agentic-system

spec:

  selector:

    app: grafana

  ports:

  - port: 80

    targetPort: 3000

  type: ClusterIP

---

apiVersion: v1

kind: Secret

metadata:

  name: grafana-admin

  namespace: agentic-system

type: Opaque

data:

  password: YWRtaW4xMjM=  # base64 encoded 'admin123' - change in production



This deployment sets up Prometheus for metrics collection and Grafana for visualization. The Prometheus configuration includes automatic discovery of pods with the appropriate annotations, which allows us to dynamically add and remove monitoring targets as agents scale up and down.
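
For that discovery to pick up the agents, each agent Deployment's pod template must carry the matching annotations. The fragment below shows the relevant portion of the pod template, assuming the metrics endpoint on port 8000 configured in MetricsCollector:


  template:
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"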


Let's also configure an example dashboard for our multi-agent system:



apiVersion: v1

kind: ConfigMap

metadata:

  name: grafana-dashboards

  namespace: agentic-system

data:

  agent-dashboard.json: |

    {

      "annotations": {

        "list": [

          {

            "builtIn": 1,

            "datasource": "-- Grafana --",

            "enable": true,

            "hide": true,

            "iconColor": "rgba(0, 211, 255, 1)",

            "name": "Annotations & Alerts",

            "type": "dashboard"

          }

        ]

      },

      "editable": true,

      "gnetId": null,

      "graphTooltip": 0,

      "id": 1,

      "links": [],

      "panels": [

        {

          "aliasColors": {},

          "bars": false,

          "dashLength": 10,

          "dashes": false,

          "datasource": "Prometheus",

          "fieldConfig": {

            "defaults": {},

            "overrides": []

          },

          "fill": 1,

          "fillGradient": 0,

          "gridPos": {

            "h": 8,

            "w": 12,

            "x": 0,

            "y": 0

          },

          "hiddenSeries": false,

          "id": 2,

          "legend": {

            "avg": false,

            "current": false,

            "max": false,

            "min": false,

            "show": true,

            "total": false,

            "values": false

          },

          "lines": true,

          "linewidth": 1,

          "nullPointMode": "null",

          "options": {

            "alertThreshold": true

          },

          "percentage": false,

          "pluginVersion": "7.5.5",

          "pointradius": 2,

          "points": false,

          "renderer": "flot",

          "seriesOverrides": [],

          "spaceLength": 10,

          "stack": false,

          "steppedLine": false,

          "targets": [

            {

              "exemplar": true,

              "expr": "sum(agent_tasks_total) by (status)",

              "interval": "",

              "legendFormat": "{{status}}",

              "refId": "A"

            }

          ],

          "thresholds": [],

          "timeFrom": null,

          "timeRegions": [],

          "timeShift": null,

          "title": "Total Tasks by Status",

          "tooltip": {

            "shared": true,

            "sort": 0,

            "value_type": "individual"

          },

          "type": "graph",

          "xaxis": {

            "buckets": null,

            "mode": "time",

            "name": null,

            "show": true,

            "values": []

          },

          "yaxes": [

            {

              "format": "short",

              "label": null,

              "logBase": 1,

              "max": null,

              "min": null,

              "show": true

            },

            {

              "format": "short",

              "label": null,

              "logBase": 1,

              "max": null,

              "min": null,

              "show": true

            }

          ],

          "yaxis": {

            "align": false,

            "alignLevel": null

          }

        },

        {

          "aliasColors": {},

          "bars": false,

          "dashLength": 10,

          "dashes": false,

          "datasource": "Prometheus",

          "fieldConfig": {

            "defaults": {},

            "overrides": []

          },

          "fill": 1,

          "fillGradient": 0,

          "gridPos": {

            "h": 8,

            "w": 12,

            "x": 12,

            "y": 0

          },

          "hiddenSeries": false,

          "id": 4,

          "legend": {

            "avg": false,

            "current": false,

            "max": false,

            "min": false,

            "show": true,

            "total": false,

            "values": false

          },

          "lines": true,

          "linewidth": 1,

          "nullPointMode": "null",

          "options": {

            "alertThreshold": true

          },

          "percentage": false,

          "pluginVersion": "7.5.5",

          "pointradius": 2,

          "points": false,

          "renderer": "flot",

          "seriesOverrides": [],

          "spaceLength": 10,

          "stack": false,

          "steppedLine": false,

          "targets": [

            {

              "exemplar": true,

              "expr": "sum(agent_active_tasks) by (agent_id)",

              "interval": "",

              "legendFormat": "{{agent_id}}",

              "refId": "A"

            }

          ],

          "thresholds": [],

          "timeFrom": null,

          "timeRegions": [],

          "timeShift": null,

          "title": "Active Tasks by Agent",

          "tooltip": {

            "shared": true,

            "sort": 0,

            "value_type": "individual"

          },

          "type": "graph",

          "xaxis": {

            "buckets": null,

            "mode": "time",

            "name": null,

            "show": true,

            "values": []

          },

          "yaxes": [

            {

              "format": "short",

              "label": null,

              "logBase": 1,

              "max": null,

              "min": null,

              "show": true

            },

            {

              "format": "short",

              "label": null,

              "logBase": 1,

              "max": null,

              "min": null,

              "show": true

            }

          ],

          "yaxis": {

            "align": false,

            "alignLevel": null

          }

        },

        {

          "aliasColors": {},

          "bars": false,

          "dashLength": 10,

          "dashes": false,

          "datasource": "Prometheus",

          "fieldConfig": {

            "defaults": {},

            "overrides": []

          },

          "fill": 1,

          "fillGradient": 0,

          "gridPos": {

            "h": 8,

            "w": 12,

            "x": 0,

            "y": 8

          },

          "hiddenSeries": false,

          "id": 6,

          "legend": {

            "avg": false,

            "current": false,

            "max": false,

            "min": false,

            "show": true,

            "total": false,

            "values": false

          },

          "lines": true,

          "linewidth": 1,

          "nullPointMode": "null",

          "options": {

            "alertThreshold": true

          },

          "percentage": false,

          "pluginVersion": "7.5.5",

          "pointradius": 2,

          "points": false,

          "renderer": "flot",

          "seriesOverrides": [],

          "spaceLength": 10,

          "stack": false,

          "steppedLine": false,

          "targets": [

            {

              "exemplar": true,

              "expr": "histogram_quantile(0.95, sum(rate(agent_task_duration_seconds_bucket[5m])) by (task_type, le))",

              "interval": "",

              "legendFormat": "{{task_type}}",

              "refId": "A"

            }

          ],

          "thresholds": [],

          "timeFrom": null,

          "timeRegions": [],

          "timeShift": null,

          "title": "95th Percentile Task Duration by Type",

          "tooltip": {

            "shared": true,

            "sort": 0,

            "value_type": "individual"

          },

          "type": "graph",

          "xaxis": {

            "buckets": null,

            "mode": "time",

            "name": null,

            "show": true,

            "values": []

          },

          "yaxes": [

            {

              "format": "s",

              "label": null,

              "logBase": 1,

              "max": null,

              "min": null,

              "show": true

            },

            {

              "format": "short",

              "label": null,

              "logBase": 1,

              "max": null,

              "min": null,

              "show": true

            }

          ],

          "yaxis": {

            "align": false,

            "alignLevel": null

          }

        },

        {

          "aliasColors": {},

          "bars": false,

          "dashLength": 10,

          "dashes": false,

          "datasource": "Prometheus",

          "fieldConfig": {

            "defaults": {},

            "overrides": []

          },

          "fill": 1,

          "fillGradient": 0,

          "gridPos": {

            "h": 8,

            "w": 12,

            "x": 12,

            "y": 8

          },

          "hiddenSeries": false,

          "id": 8,

          "legend": {

            "avg": false,

            "current": false,

            "max": false,

            "min": false,

            "show": true,

            "total": false,

            "values": false

          },

          "lines": true,

          "linewidth": 1,

          "nullPointMode": "null",

          "options": {

            "alertThreshold": true

          },

          "percentage": false,

          "pluginVersion": "7.5.5",

          "pointradius": 2,

          "points": false,

          "renderer": "flot",

          "seriesOverrides": [],

          "spaceLength": 10,

          "stack": false,

          "steppedLine": false,

          "targets": [

            {

              "exemplar": true,

              "expr": "histogram_quantile(0.95, sum(rate(agent_llm_inference_duration_seconds_bucket[5m])) by (model, le))",

              "interval": "",

              "legendFormat": "{{model}}",

              "refId": "A"

            }

          ],

          "thresholds": [],

          "timeFrom": null,

          "timeRegions": [],

          "timeShift": null,

          "title": "95th Percentile LLM Inference Duration by Model",

          "tooltip": {

            "shared": true,

            "sort": 0,

            "value_type": "individual"

          },

          "type": "graph",

          "xaxis": {

            "buckets": null,

            "mode": "time",

            "name": null,

            "show": true,

            "values": []

          },

          "yaxes": [

            {

              "format": "s",

              "label": null,

              "logBase": 1,

              "max": null,

              "min": null,

              "show": true

            },

            {

              "format": "short",

              "label": null,

              "logBase": 1,

              "max": null,

              "min": null,

              "show": true

            }

          ],

          "yaxis": {

            "align": false,

            "alignLevel": null

          }

        }

      ],

      "refresh": "10s",

      "schemaVersion": 27,

      "style": "dark",

      "tags": [],

      "templating": {

        "list": []

      },

      "time": {

        "from": "now-6h",

        "to": "now"

      },

      "timepicker": {},

      "timezone": "",

      "title": "Agent System Dashboard",

      "uid": "agent-system",

      "version": 1

    }



This dashboard provides a comprehensive view of the multi-agent system's performance, including task counts, active tasks by agent, task duration percentiles, and LLM inference times.
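
Creating the dashboard ConfigMap by itself does not make the dashboard appear in Grafana. One way to load it automatically is Grafana's file-based provisioning: mount the grafana-dashboards ConfigMap into the Grafana container (for example at /var/lib/grafana/dashboards) along with provisioning definitions such as the sketch below. The ConfigMap name grafana-provisioning and the mount paths are assumptions; the Prometheus URL points at the Service defined earlier.


apiVersion: v1
kind: ConfigMap
metadata:
  name: grafana-provisioning
  namespace: agentic-system
data:
  # Mount at /etc/grafana/provisioning/dashboards/dashboards.yaml
  dashboards.yaml: |
    apiVersion: 1
    providers:
      - name: agentic-dashboards
        orgId: 1
        type: file
        options:
          path: /var/lib/grafana/dashboards
  # Mount at /etc/grafana/provisioning/datasources/datasources.yaml
  datasources.yaml: |
    apiVersion: 1
    datasources:
      - name: Prometheus
        type: prometheus
        access: proxy
        url: http://prometheus.agentic-system.svc.cluster.local:9090
        isDefault: true


Because the provisioned datasource is named Prometheus, the panel definitions above resolve without modification.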


Conclusion and Future Directions


In this article, we've explored how to build a sophisticated multi-agent Kubernetes application that leverages distributed pods, microservices, and hardware-optimized llama.cpp instances. The system we've designed is scalable, resilient, and capable of adapting to changing workloads and hardware environments.


Key components of our architecture include:


1. An agent coordinator service that manages task allocation and agent lifecycle

2. Specialized agent microservices with specific capabilities

3. A llama.cpp integration optimized for different hardware platforms (CUDA, MPS, ROCm)

4. A messaging system for inter-agent communication

5. Dynamic scaling based on workload demands

6. Comprehensive monitoring and observability


While this implementation provides a solid foundation, there are several areas for future enhancement:


1. Agent Learning: Implementing reinforcement learning mechanisms to allow agents to improve their decision-making over time.

2. Multi-Model Support: Extending the llama.cpp integration to support multiple models of varying sizes, optimized for different tasks.

3. Advanced Task Orchestration: Implementing more sophisticated task scheduling algorithms based on agent capabilities, load, and success rates.

4. Security Enhancements: Adding authentication, authorization, and encryption for agent communication.

5. Federated Learning: Enabling agents to collaboratively learn and share knowledge while maintaining privacy.


By building on this foundation, you can create even more sophisticated multi-agent systems capable of solving complex problems through emergent collaboration.


The approach outlined in this article demonstrates how modern cloud-native technologies can be combined with state-of-the-art AI techniques to create intelligent, distributed systems that can harness the power of specialized hardware and scale dynamically based on demand. As both Kubernetes and language models continue to evolve, the potential for these multi-agent systems will only grow.
