Introduction
Welcome, fellow architects and developers, to an exploration of a truly transformative approach to software design: the Capability-Centric Architecture (CCA). In our ever-evolving technological landscape, we often find ourselves wrestling with the inherent tension between the robust, resource-efficient demands of embedded systems and the flexible, scalable needs of enterprise applications. CCA, as presented in our foundational document, offers a harmonious resolution to this age-old dilemma, providing a unified conceptual framework that elegantly manages complexity, dependencies, and change across the entire spectrum, from the tiniest microcontroller to the sprawling cloud.
Imagine a world where your software components are not just isolated modules but cohesive, self-contained units of value, each possessing a clear purpose, a defined interface, and a strategy for its own evolution. This is the promise of CCA. It extends the wisdom of Domain-Driven Design, Hexagonal Architecture, and Clean Architecture, offering a blueprint for systems that are not only resilient and maintainable but also inherently adaptable to future demands.
To truly grasp the power of CCA, we will embark on an exciting journey to construct a sophisticated, AI-powered research collective: the "Cognito Research Agency." This agency will comprise various intelligent agents, such as researchers, summarizers, and critics, each acting as a distinct capability. These agents will leverage large language models (LLMs), both local and cloud-based, and utilize external tools like web search, all orchestrated within a Kubernetes environment. Our goal is to demonstrate how CCA principles can guide us in building a production-ready, highly scalable, and future-proof agentic AI application.
Let us begin our architectural odyssey.
The Heart of the Matter: The Capability Nucleus
At the very core of every capability lies its Nucleus, the internal structure that defines its essence and interaction with the world. The Nucleus is elegantly divided into three distinct layers: the Essence, the Realization, and the Adaptation. This layered approach ensures a clear separation of concerns, promoting testability, maintainability, and flexibility.
The Essence layer represents the pure domain logic or the algorithmic core of the capability. It is entirely independent of any infrastructure concerns, making it highly testable and reusable. Think of it as the "brain" of your capability, focused solely on what it does, not how it does it.
The Realization layer implements the technical mechanisms required to make the Essence functional in the real world. This is where infrastructure details come into play, such as interacting with databases, message queues, hardware, or, in our case, large language models and external tools. The Realization layer bridges the gap between the abstract Essence and the concrete operational environment.
The Adaptation layer provides the interfaces through which the capability interacts with other capabilities or external systems. It acts as a protective shell, translating external requests into the internal language of the Essence and Realization, and vice-versa. This layer ensures that the capability's internal workings remain insulated from external changes, promoting loose coupling.
Let us consider a simple ResearcherAgent capability within our Cognito Research Agency. This agent's primary function is to conduct research based on a given query.
Here is how its Capability Nucleus might be structured in Python:
# researcher_agent/capability_nucleus.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List
# -----------------------------------------------------------------------------
# 1. Essence Layer: Pure Domain Logic
# This defines the core research logic, independent of how LLMs or tools are used.
# -----------------------------------------------------------------------------
class ResearchEssence(ABC):
"""
Abstract base class for the core research logic.
Defines the fundamental operations of a researcher.
"""
@abstractmethod
def formulate_query(self, topic: str, context: Dict[str, Any]) -> str:
"""
Formulates a precise query based on the research topic and context.
"""
pass
@abstractmethod
def synthesize_findings(self, raw_data: List[str], initial_query: str) -> str:
"""
Synthesizes raw research data into coherent findings.
"""
pass
@abstractmethod
def evaluate_relevance(self, finding: str, original_topic: str) -> bool:
"""
Evaluates if a specific finding is relevant to the original research topic.
"""
pass
class BasicResearchEssence(ResearchEssence):
"""
A concrete implementation of the research essence.
In a real scenario, this would involve more sophisticated NLP or logic.
"""
def formulate_query(self, topic: str, context: Dict[str, Any]) -> str:
print(f"Essence: Formulating query for topic '{topic}' with context: {context}")
# Simple query formulation for demonstration
return f"Research topic: {topic}. Key aspects: {context.get('keywords', [])}. Provide comprehensive overview."
def synthesize_findings(self, raw_data: List[str], initial_query: str) -> str:
print(f"Essence: Synthesizing findings from {len(raw_data)} raw data points.")
# Simple concatenation for demonstration
return " ".join(raw_data) + f"\n\nSynthesized from query: {initial_query}"
def evaluate_relevance(self, finding: str, original_topic: str) -> bool:
print(f"Essence: Evaluating relevance of finding for topic '{original_topic}'.")
# Simple keyword check for demonstration
return original_topic.lower() in finding.lower()
# -----------------------------------------------------------------------------
# 2. Realization Layer: Infrastructure and External Interactions (LLMs, Tools)
# This layer connects the Essence to the external world, like an LLM service.
# -----------------------------------------------------------------------------
class LLMService(ABC):
"""Abstract interface for interacting with a Large Language Model."""
@abstractmethod
def generate_text(self, prompt: str, model_config: Dict[str, Any]) -> str:
"""Generates text based on a prompt using the configured LLM."""
pass
class LocalOllamaLLMService(LLMService):
"""
Realization for interacting with a local Ollama LLM instance.
This would typically involve HTTP requests to the Ollama API.
"""
def __init__(self, ollama_url: str = "http://localhost:11434"):
self.ollama_url = ollama_url
print(f"Realization: Initialized Ollama LLM service at {self.ollama_url}")
def generate_text(self, prompt: str, model_config: Dict[str, Any]) -> str:
print(f"Realization: Sending prompt to Ollama LLM: '{prompt[:50]}...'")
# In a real system, this would be an HTTP call to Ollama.
# Example using requests library (not included for brevity, but implied)
# import requests
# response = requests.post(
# f"{self.ollama_url}/api/generate",
# json={"model": model_config.get("model", "llama2"), "prompt": prompt}
# )
# response.raise_for_status()
# return response.json()["response"]
return f"LLM generated response for: '{prompt[:100]}...'" # Mock response for demonstration
class WebSearchTool(ABC):
"""Abstract interface for a web search tool."""
@abstractmethod
def search(self, query: str) -> List[str]:
"""Performs a web search and returns relevant snippets."""
pass
class MCPWebSearchTool(WebSearchTool):
"""
Realization for interacting with a Web Search tool provided by MCP servers.
This would involve calling an external API.
"""
def __init__(self, mcp_api_url: str = "http://mcp-search-service:8080/search"):
self.mcp_api_url = mcp_api_url
print(f"Realization: Initialized MCP Web Search tool at {self.mcp_api_url}")
def search(self, query: str) -> List[str]:
print(f"Realization: Performing web search for query: '{query[:50]}...'")
# In a real system, this would be an HTTP call to the MCP search service.
# import requests
# response = requests.get(f"{self.mcp_api_url}?q={query}")
# response.raise_for_status()
# return response.json()["results"]
return [
f"Web search result 1 for '{query[:50]}...': Found relevant document about {query.split(' ')[0]}...",
f"Web search result 2 for '{query[:50]}...': Another source on {query.split(' ')[0]} related topics."
] # Mock results for demonstration
class ResearcherRealization:
"""
Realization layer for the ResearcherAgent, orchestrating LLM and Web Search.
It takes the Essence and provides the concrete means to execute its logic.
"""
def __init__(self, essence: ResearchEssence, llm_service: LLMService, web_search_tool: WebSearchTool):
self._essence = essence
self._llm_service = llm_service
self._web_search_tool = web_search_tool
print("Realization: ResearcherAgent Realization initialized.")
def conduct_research(self, topic: str, context: Dict[str, Any]) -> str:
"""
Orchestrates the research process using the Essence and external tools.
"""
# 1. Essence: Formulate query
initial_query = self._essence.formulate_query(topic, context)
# 2. Realization: Use Web Search tool
search_results = self._web_search_tool.search(initial_query)
print(f"Realization: Received {len(search_results)} search results.")
# 3. Realization: Use LLM to process search results and refine.
# This could be a more complex interaction, e.g., prompt engineering.
llm_prompt = f"Given the following search results: {search_results}\n\n" \
f"And the initial query: {initial_query}\n\n" \
f"Synthesize a concise summary of the key findings."
llm_refined_data = self._llm_service.generate_text(llm_prompt, {"model": "llama2"})
print(f"Realization: LLM refined data received.")
# 4. Essence: Synthesize final findings from LLM-refined data
final_findings = self._essence.synthesize_findings([llm_refined_data], initial_query)
# 5. Essence: Evaluate relevance (can be done here or by another agent)
if not self._essence.evaluate_relevance(final_findings, topic):
print("Realization: Warning: Final findings might not be fully relevant.")
return final_findings
# -----------------------------------------------------------------------------
# 3. Adaptation Layer: External Interface (e.g., REST API)
# This layer exposes the capability's functionality to other systems.
# -----------------------------------------------------------------------------
class ResearcherAgentAdaptation:
"""
Adaptation layer for the ResearcherAgent.
Provides a simple interface for external clients to interact with the agent.
"""
def __init__(self, realization: ResearcherRealization):
self._realization = realization
print("Adaptation: ResearcherAgent Adaptation initialized.")
def research_endpoint(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""
A simulated API endpoint that receives research requests.
"""
topic = request_data.get("topic")
context = request_data.get("context", {})
if not topic:
return {"error": "Topic is required for research."}
print(f"Adaptation: Received research request for topic: '{topic}'")
try:
findings = self._realization.conduct_research(topic, context)
return {"status": "success", "findings": findings}
except Exception as e:
print(f"Adaptation: Error during research: {e}")
return {"status": "error", "message": str(e)}
# -----------------------------------------------------------------------------
# Example Usage: Assembling the ResearcherAgent Capability
# -----------------------------------------------------------------------------
if __name__ == "__main__":
print("\n--- Assembling ResearcherAgent Capability ---")
# Instantiate Essence
research_essence = BasicResearchEssence()
# Instantiate Realizations for external services
ollama_llm = LocalOllamaLLMService()
mcp_search = MCPWebSearchTool()
# Instantiate the Capability's Realization layer, injecting dependencies
researcher_realization = ResearcherRealization(
essence=research_essence,
llm_service=ollama_llm,
web_search_tool=mcp_search
)
# Instantiate the Capability's Adaptation layer
researcher_agent = ResearcherAgentAdaptation(realization=researcher_realization)
print("\n--- Simulating an external research request ---")
request = {
"topic": "Capability-Centric Architecture",
"context": {"keywords": ["CCA", "architecture patterns", "embedded-enterprise"]}
}
response = researcher_agent.research_endpoint(request)
print("\n--- Research Request Response ---")
import json
print(json.dumps(response, indent=2))
print("\n--- Simulating another request ---")
request_2 = {
"topic": "Quantum Computing Impact on AI",
"context": {"keywords": ["quantum machine learning", "QML", "AI acceleration"]}
}
response_2 = researcher_agent.research_endpoint(request_2)
print("\n--- Research Request Response 2 ---")
print(json.dumps(response_2, indent=2))
The code above demonstrates the Capability Nucleus in action. The BasicResearchEssence focuses purely on the logical steps of research. The LocalOllamaLLMService and MCPWebSearchTool are concrete Realizations for interacting with external systems. The ResearcherRealization then orchestrates these, injecting the Essence and the external Realizations. Finally, the ResearcherAgentAdaptation provides a simple research_endpoint that acts as the external interface. This clear separation makes each part independently testable and swappable. For instance, we could easily replace LocalOllamaLLMService with a CloudOpenAILLMService without altering the Essence or Adaptation layers, as long as the new service adheres to the LLMService interface.
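To make that swap concrete, here is a minimal sketch of a cloud-backed LLMService implementation. The CloudOpenAILLMService class, its module path, and the mocked response are illustrative assumptions; notice that only the Realization wiring changes, while the Essence and Adaptation layers remain untouched.
# researcher_agent/cloud_llm_service.py (hypothetical module, for illustration only)
from typing import Dict, Any
from researcher_agent.capability_nucleus import (
    LLMService,
    BasicResearchEssence,
    MCPWebSearchTool,
    ResearcherRealization,
    ResearcherAgentAdaptation,
)
class CloudOpenAILLMService(LLMService):
    """A hypothetical cloud-backed Realization of the LLMService interface."""
    def __init__(self, api_key: str):
        self.api_key = api_key  # In practice, load this from a secret store
    def generate_text(self, prompt: str, model_config: Dict[str, Any]) -> str:
        # A real implementation would call the cloud provider's API here; mocked for the sketch.
        print(f"Realization: Cloud LLM handling prompt: '{prompt[:50]}...'")
        return f"Cloud LLM generated response for: '{prompt[:100]}...'"
if __name__ == "__main__":
    # Only the injected LLMService changes; Essence and Adaptation stay exactly as before.
    realization = ResearcherRealization(
        essence=BasicResearchEssence(),
        llm_service=CloudOpenAILLMService(api_key="sk-placeholder"),
        web_search_tool=MCPWebSearchTool(),
    )
    agent = ResearcherAgentAdaptation(realization=realization)
    print(agent.research_endpoint({"topic": "Capability-Centric Architecture"}))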
To visualize this layered structure, imagine concentric circles:
+-------------------------------------------------+
| Adaptation Layer (API/Interface) |
| +-------------------------------------------+ |
| | Realization Layer (LLM/Tools) | |
| | +-------------------------------------+ | |
| | | Essence Layer | | |
| | | (Core Research Logic) | | |
| | +-------------------------------------+ | |
| +-------------------------------------------+ |
+-------------------------------------------------+
Capability Nucleus for ResearcherAgent
The Glue That Binds: Capability Contracts
In a system composed of numerous capabilities, clear communication and predictable interactions are paramount. This is where Capability Contracts shine. A contract formally defines what a capability provides (its Provisions), what it requires from others (its Requirements), and the interaction patterns and quality attributes that govern these exchanges (its Protocols). Contracts enable capabilities to evolve independently while ensuring compatibility and predictable behavior across the system.
A Provision describes the services, data, or events that a capability offers to others. It is essentially the capability's public API. A Requirement specifies the services, data, or events that a capability needs from other capabilities to perform its function. The Protocol defines the rules of engagement, including data formats, communication mechanisms (e.g., REST, gRPC, message queues), security policies, and crucial quality attributes like expected latency, throughput, or error rates.
Let us define contracts for our ResearcherAgent and the WebSearchTool it requires.
# common/capability_contracts.py
from typing import Dict, Any, List, Literal, TypedDict
# -----------------------------------------------------------------------------
# 1. WebSearchTool Capability Contract
# Defines what the WebSearchTool provides.
# -----------------------------------------------------------------------------
class WebSearchResult(TypedDict):
"""Represents a single web search result snippet."""
title: str
url: str
snippet: str
class WebSearchProvision(TypedDict):
"""Defines the provision of the WebSearchTool capability."""
operation: Literal["search"]
input_schema: Dict[str, Any] # JSON Schema for search query
output_schema: Dict[str, Any] # JSON Schema for search results
protocol: Dict[str, Any] # Details like HTTP method, endpoint, expected latency
WEB_SEARCH_TOOL_CONTRACT = {
"capability_id": "web-search-tool-v1",
"description": "Provides web search functionality to retrieve relevant information.",
"provisions": [
WebSearchProvision(
operation="search",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query string."}
},
"required": ["query"]
},
output_schema={
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"url": {"type": "string", "format": "uri"},
"snippet": {"type": "string"}
},
"required": ["title", "url", "snippet"]
}
},
protocol={
"type": "REST",
"method": "GET",
"endpoint_path": "/search",
"expected_latency_ms": 2000, # Expected maximum latency
"max_rps": 100 # Maximum requests per second
}
)
]
}
# -----------------------------------------------------------------------------
# 2. ResearcherAgent Capability Contract
# Defines what the ResearcherAgent provides and what it requires.
# -----------------------------------------------------------------------------
class ResearchRequest(TypedDict):
"""Input schema for a research request."""
topic: str
context: Dict[str, Any]
class ResearchResponse(TypedDict):
"""Output schema for a research response."""
status: Literal["success", "error"]
findings: str
message: str # For error messages
class ResearcherProvision(TypedDict):
"""Defines the provision of the ResearcherAgent capability."""
operation: Literal["conduct_research"]
input_schema: Dict[str, Any] # JSON Schema for research request
output_schema: Dict[str, Any] # JSON Schema for research response
protocol: Dict[str, Any] # Details like HTTP method, endpoint, expected latency
class ResearcherRequirement(TypedDict):
"""Defines a requirement for the ResearcherAgent capability."""
capability_id: str # ID of the required capability
provision_operation: str # Specific operation required from that capability
min_version: str # Minimum compatible version
RESEARCHER_AGENT_CONTRACT = {
"capability_id": "researcher-agent-v1",
"description": "An agent capable of conducting comprehensive research on a given topic.",
"provisions": [
ResearcherProvision(
operation="conduct_research",
input_schema={
"type": "object",
"properties": {
"topic": {"type": "string", "description": "The main topic to research."},
"context": {"type": "object", "description": "Additional context or keywords."}
},
"required": ["topic"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "error"]},
"findings": {"type": "string"},
"message": {"type": "string"}
},
"required": ["status"]
},
protocol={
"type": "REST",
"method": "POST",
"endpoint_path": "/research",
"expected_latency_ms": 10000, # Research can take longer
"max_rps": 10 # Research is resource intensive
}
)
],
"requirements": [
ResearcherRequirement(
capability_id="web-search-tool-v1",
provision_operation="search",
min_version="v1"
)
# In a real system, there would also be a requirement for an LLM service
# e.g., {"capability_id": "llm-inference-service-v1", "provision_operation": "generate_text", "min_version": "v1"}
]
}
# Example of how to access contract information
if __name__ == "__main__":
print("--- WebSearchTool Contract ---")
print(f"Capability ID: {WEB_SEARCH_TOOL_CONTRACT['capability_id']}")
print(f"Description: {WEB_SEARCH_TOOL_CONTRACT['description']}")
print(f"Search Provision Input Schema: {WEB_SEARCH_TOOL_CONTRACT['provisions'][0]['input_schema']}")
print(f"Search Provision Protocol: {WEB_SEARCH_TOOL_CONTRACT['provisions'][0]['protocol']}")
print("\n--- ResearcherAgent Contract ---")
print(f"Capability ID: {RESEARCHER_AGENT_CONTRACT['capability_id']}")
print(f"Description: {RESEARCHER_AGENT_CONTRACT['description']}")
print(f"Research Provision Output Schema: {RESEARCHER_AGENT_CONTRACT['provisions'][0]['output_schema']}")
print(f"Required Capabilities: {RESEARCHER_AGENT_CONTRACT['requirements']}")
The WEB_SEARCH_TOOL_CONTRACT and RESEARCHER_AGENT_CONTRACT define the precise expectations for interaction. They use TypedDict for clarity and JSON Schema for robust data validation, which is crucial for interoperability. The protocol section explicitly states quality attributes like expected_latency_ms and max_rps, which are vital for system design and monitoring. The ResearcherRequirement clearly states that the ResearcherAgent needs the web-search-tool-v1 capability. This explicit declaration of dependencies is a cornerstone of CCA, enabling robust dependency management and preventing unexpected failures.
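Because the contracts embed JSON Schema, payloads can be validated mechanically on both sides of an interaction. The sketch below, which assumes the third-party jsonschema package is available (any JSON Schema validator would do), checks a research request against the ResearcherAgent contract's input_schema; the helper name and module path are our own.
# common/contract_validation.py (hypothetical helper, for illustration only)
from typing import Any, Dict
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from common.capability_contracts import RESEARCHER_AGENT_CONTRACT
def validate_research_request(request: Dict[str, Any]) -> bool:
    """Returns True if the request satisfies the ResearcherAgent input schema."""
    schema = RESEARCHER_AGENT_CONTRACT["provisions"][0]["input_schema"]
    try:
        validate(instance=request, schema=schema)
        return True
    except ValidationError as exc:
        print(f"Contract violation: {exc.message}")
        return False
if __name__ == "__main__":
    print(validate_research_request({"topic": "CCA"}))        # True
    print(validate_research_request({"context": {"k": []}}))  # False: 'topic' is required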
The Path to Performance: Capability Efficiency Gradients
Not all tasks within a system demand the same level of performance, resource consumption, or abstraction. CCA introduces the concept of Efficiency Gradients, allowing architects to design different "paths" or "realizations" within a capability to optimize for specific concerns. This means you can implement critical paths with minimal overhead for strict real-time requirements, while less critical paths can leverage higher abstractions for greater flexibility, maintainability, or cost-effectiveness.
Consider our LLMInferenceCapability. Some requests might be simple, requiring a quick, low-latency response, perhaps from a smaller, locally deployed model. Other requests might be complex, involving extensive context, tool use, or requiring a larger, more powerful model, potentially hosted in the cloud, which might incur higher latency but deliver superior quality.
Here is how an LLMInferenceService could implement efficiency gradients by offering different Realizations based on the complexity of the request or desired performance characteristics:
# llm_inference_service/capability_nucleus.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Literal
# -----------------------------------------------------------------------------
# Essence Layer: Core LLM Inference Logic (abstracted)
# -----------------------------------------------------------------------------
class LLMInferenceEssence(ABC):
"""
Abstract base class for the core LLM inference logic.
Defines the fundamental operation of generating text.
"""
@abstractmethod
def process_prompt(self, prompt: str, config: Dict[str, Any]) -> str:
"""
Processes a prompt and generates a response based on configuration.
"""
pass
class BasicLLMInferenceEssence(LLMInferenceEssence):
"""
A concrete implementation of the LLM inference essence.
This layer focuses on the *logic* of inference, not the *mechanism*.
"""
def process_prompt(self, prompt: str, config: Dict[str, Any]) -> str:
print(f"Essence: Processing prompt with configuration: {config.get('model', 'default')}")
# In a real scenario, this might involve prompt re-writing, safety checks, etc.
return prompt # The actual generation is handled by Realization
# -----------------------------------------------------------------------------
# Realization Layer: Different LLM Backends (Efficiency Gradients)
# -----------------------------------------------------------------------------
class LLMBackend(ABC):
"""Abstract interface for different LLM backend implementations."""
@abstractmethod
def infer(self, processed_prompt: str, model_name: str, options: Dict[str, Any]) -> str:
"""Performs inference using a specific LLM model and options."""
pass
class LocalOllamaBackend(LLMBackend):
"""
Realization for a local Ollama instance, suitable for fast, local inference.
Supports various GPU architectures via Ollama's underlying capabilities.
"""
def __init__(self, ollama_url: str = "http://localhost:11434", gpu_backend: str = "cuda"):
self.ollama_url = ollama_url
self.gpu_backend = gpu_backend # e.g., "cuda", "rocm", "mps", "cpu"
print(f"Realization: Initialized Local Ollama Backend ({gpu_backend}) at {ollama_url}")
def infer(self, processed_prompt: str, model_name: str, options: Dict[str, Any]) -> str:
print(f"Realization: Local Ollama ({self.gpu_backend}) inferring for '{model_name}' with prompt: '{processed_prompt[:50]}...'")
# Simulate API call to Ollama.
# The actual GPU backend is configured in Ollama itself, and its API handles it.
# The 'model_name' would correspond to a model pulled into Ollama.
# Example: requests.post(f"{self.ollama_url}/api/generate", json={"model": model_name, "prompt": processed_prompt, **options})
return f"Local Ollama ({self.gpu_backend}) generated: '{processed_prompt[:100]}...'"
class CloudOpenAIBackend(LLMBackend):
"""
Realization for a cloud-based OpenAI service, suitable for high-quality, scalable inference.
"""
def __init__(self, api_key: str):
self.api_key = api_key # In a real app, this would be loaded securely
print("Realization: Initialized Cloud OpenAI Backend.")
def infer(self, processed_prompt: str, model_name: str, options: Dict[str, Any]) -> str:
print(f"Realization: Cloud OpenAI inferring for '{model_name}' with prompt: '{processed_prompt[:50]}...'")
# Simulate API call to OpenAI.
# Example: openai.ChatCompletion.create(model=model_name, messages=[{"role": "user", "content": processed_prompt}], **options)
return f"Cloud OpenAI generated: '{processed_prompt[:100]}...'"
class LLMInferenceRealization:
"""
Realization layer for the LLMInferenceCapability, selecting backend based on gradient.
"""
def __init__(self, essence: LLMInferenceEssence,
local_backend: LocalOllamaBackend,
cloud_backend: CloudOpenAIBackend):
self._essence = essence
self._local_backend = local_backend
self._cloud_backend = cloud_backend
print("Realization: LLMInferenceRealization initialized with multiple backends.")
def generate_text(self, prompt: str,
model_config: Dict[str, Any],
gradient_preference: Literal["fast_local", "high_quality_cloud"] = "fast_local") -> str:
"""
Generates text, choosing the backend based on gradient preference.
"""
# 1. Essence: Process the prompt (e.g., safety, re-writing)
processed_prompt = self._essence.process_prompt(prompt, model_config)
# 2. Realization: Select the appropriate backend based on gradient preference
if gradient_preference == "fast_local":
print("Realization: Choosing 'fast_local' gradient (Ollama).")
return self._local_backend.infer(processed_prompt, model_config.get("model", "llama2"), model_config)
elif gradient_preference == "high_quality_cloud":
print("Realization: Choosing 'high_quality_cloud' gradient (OpenAI).")
return self._cloud_backend.infer(processed_prompt, model_config.get("model", "gpt-4"), model_config)
else:
raise ValueError(f"Unknown gradient preference: {gradient_preference}")
# -----------------------------------------------------------------------------
# Adaptation Layer: External Interface for LLM Inference
# -----------------------------------------------------------------------------
class LLMInferenceAdaptation:
"""
Adaptation layer for the LLMInferenceCapability.
Exposes a unified interface for LLM inference, abstracting backend choice.
"""
def __init__(self, realization: LLMInferenceRealization):
self._realization = realization
print("Adaptation: LLMInferenceAdaptation initialized.")
def inference_endpoint(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
"""
A simulated API endpoint for LLM inference requests.
"""
prompt = request_data.get("prompt")
model_config = request_data.get("model_config", {})
gradient = request_data.get("gradient", "fast_local")
if not prompt:
return {"error": "Prompt is required for inference."}
print(f"Adaptation: Received inference request with gradient: '{gradient}'")
try:
generated_text = self._realization.generate_text(prompt, model_config, gradient)
return {"status": "success", "generated_text": generated_text}
except Exception as e:
print(f"Adaptation: Error during LLM inference: {e}")
return {"status": "error", "message": str(e)}
# -----------------------------------------------------------------------------
# Example Usage: Assembling the LLMInference Capability with Gradients
# -----------------------------------------------------------------------------
if __name__ == "__main__":
print("\n--- Assembling LLMInference Capability ---")
llm_essence = BasicLLMInferenceEssence()
# Instantiate different LLM backends (Realizations)
# Note: GPU backend selection for Ollama is typically configured at Ollama server level,
# but here we can represent the *intent* of using a specific hardware configuration.
local_ollama_cuda = LocalOllamaBackend(gpu_backend="cuda")
local_ollama_rocm = LocalOllamaBackend(gpu_backend="rocm") # Example for AMD ROCm
local_ollama_mps = LocalOllamaBackend(gpu_backend="mps") # Example for Apple MPS
local_ollama_intel = LocalOllamaBackend(gpu_backend="intel") # Example for Intel GPUs
cloud_openai = CloudOpenAIBackend(api_key="sk-your-openai-key") # Placeholder
# The LLMInferenceRealization would typically select *one* local backend based on environment
# For demonstration, we'll just pass one or choose dynamically.
# Here, we'll just use the CUDA one for the local path.
llm_realization = LLMInferenceRealization(
essence=llm_essence,
local_backend=local_ollama_cuda, # Or dynamically select based on available hardware
cloud_backend=cloud_openai
)
llm_inference_service = LLMInferenceAdaptation(realization=llm_realization)
print("\n--- Simulating a 'fast_local' inference request ---")
request_fast = {
"prompt": "What is the capital of France?",
"model_config": {"model": "llama2:7b"},
"gradient": "fast_local"
}
response_fast = llm_inference_service.inference_endpoint(request_fast)
print("\n--- Fast Inference Response ---")
import json
print(json.dumps(response_fast, indent=2))
print("\n--- Simulating a 'high_quality_cloud' inference request ---")
request_cloud = {
"prompt": "Explain the intricate details of quantum entanglement in layman's terms.",
"model_config": {"model": "gpt-4-turbo"},
"gradient": "high_quality_cloud"
}
response_cloud = llm_inference_service.inference_endpoint(request_cloud)
print("\n--- Cloud Inference Response ---")
print(json.dumps(response_cloud, indent=2))
In this example, the LLMInferenceRealization acts as a dispatcher, choosing between LocalOllamaBackend (for "fast_local" gradient) and CloudOpenAIBackend (for "high_quality_cloud" gradient). This selection is based on the gradient_preference provided in the request. The LocalOllamaBackend itself can be configured to leverage various GPU architectures (Nvidia CUDA, AMD ROCm, Apple MPS, Intel GPUs) through Ollama's underlying capabilities, abstracting the specific hardware details from the LLMInferenceRealization. This allows the system to dynamically adapt its resource usage and performance characteristics based on the immediate needs, representing a powerful application of efficiency gradients.
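As a sketch of how that hardware abstraction might be wired up at assembly time, the helper below chooses a gpu_backend for LocalOllamaBackend from its environment. The CCA_GPU_BACKEND variable and the fallback heuristic are assumptions made for illustration; a real deployment would rely on node labels, device plugins, or Ollama's own hardware detection.
# llm_inference_service/backend_selection.py (hypothetical helper, for illustration only)
import os
import platform
from llm_inference_service.capability_nucleus import LocalOllamaBackend
def build_local_backend() -> LocalOllamaBackend:
    """Picks a GPU backend from the environment, falling back to a simple heuristic."""
    backend = os.environ.get("CCA_GPU_BACKEND")
    if backend is None:
        # Crude fallback: Apple Silicon hosts use MPS; everything else defaults to CPU.
        is_apple_silicon = platform.system() == "Darwin" and platform.machine() == "arm64"
        backend = "mps" if is_apple_silicon else "cpu"
    return LocalOllamaBackend(gpu_backend=backend)
if __name__ == "__main__":
    local_backend = build_local_backend()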
Adapting to Change: Capability Evolution Envelopes
Software systems are never static; they are living entities that must adapt to new requirements, technologies, and insights. The Capability Evolution Envelope provides a structured approach to managing change within a capability, encompassing versioning, deprecation policies, and migration paths. This ensures that capabilities can evolve independently without causing widespread disruption across the system.
An Evolution Envelope typically includes:
- Versioning information: Clearly defining the current version of the capability and its contract.
- Deprecation policies: Stating when older versions will no longer be supported.
- Migration paths: Providing guidance or tools to help consumers transition from an older version to a newer one.
Let us imagine our ResearcherAgent needs an update. Perhaps v1 only supported basic keyword searches, but v2 introduces semantic search capabilities and integrates with a new knowledge graph.
# researcher_agent/evolution_envelopes.py
from typing import Dict, Any, List, Literal, TypedDict
from common.capability_contracts import RESEARCHER_AGENT_CONTRACT  # Import the v1 contract for the contract history
# -----------------------------------------------------------------------------
# Define the new contract for ResearcherAgent v2
# -----------------------------------------------------------------------------
class SemanticSearchProvision(TypedDict):
"""A new provision for semantic search, replacing or augmenting basic search."""
operation: Literal["semantic_search"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
class ResearcherProvisionV2(TypedDict):
"""Updated provision for the ResearcherAgent capability."""
operation: Literal["conduct_advanced_research"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
class ResearcherRequirementV2(TypedDict):
"""Updated requirements for the ResearcherAgent capability, potentially for a new semantic search tool."""
capability_id: str
provision_operation: str
min_version: str
RESEARCHER_AGENT_CONTRACT_V2 = {
"capability_id": "researcher-agent-v2",
"description": "An advanced agent capable of semantic research and knowledge graph integration.",
"provisions": [
ResearcherProvisionV2(
operation="conduct_advanced_research",
input_schema={
"type": "object",
"properties": {
"topic": {"type": "string", "description": "The main topic for advanced research."},
"context": {"type": "object", "description": "Extended context, including ontologies or specific entities."}
},
"required": ["topic"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "error"]},
"findings": {"type": "string"},
"knowledge_graph_links": {"type": "array", "items": {"type": "string", "format": "uri"}},
"message": {"type": "string"}
},
"required": ["status"]
},
protocol={
"type": "REST",
"method": "POST",
"endpoint_path": "/advanced_research",
"expected_latency_ms": 15000, # More complex, higher latency
"max_rps": 5 # More resource intensive
}
)
],
"requirements": [
ResearcherRequirementV2(
capability_id="semantic-search-tool-v1", # New requirement for a semantic search tool
provision_operation="semantic_search",
min_version="v1"
),
ResearcherRequirementV2(
capability_id="knowledge-graph-service-v1", # New requirement for a knowledge graph
provision_operation="query_graph",
min_version="v1"
)
]
}
# -----------------------------------------------------------------------------
# Capability Evolution Envelope Definition
# -----------------------------------------------------------------------------
class CapabilityEvolutionEnvelope(TypedDict):
"""Defines the evolution strategy for a capability."""
capability_id: str
current_version: str
supported_versions: List[str]
deprecation_policy: Dict[str, Any] # e.g., {"v1": {"deprecated_on": "2024-12-31", "end_of_life": "2025-06-30"}}
migration_guide_url: str # URL to documentation for migration
contract_history: Dict[str, Any] # A map of version to its contract
RESEARCHER_AGENT_EVOLUTION_ENVELOPE: CapabilityEvolutionEnvelope = {
"capability_id": "researcher-agent", # Base ID without version
"current_version": "v2",
"supported_versions": ["v1", "v2"],
"deprecation_policy": {
"v1": {
"deprecated_on": "2024-12-01",
"end_of_life": "2025-06-01",
"notes": "Consumers should migrate to 'conduct_advanced_research' provision in v2."
}
},
"migration_guide_url": "https://cognito-docs.agency/researcher-agent/v1-to-v2-migration",
"contract_history": {
"v1": RESEARCHER_AGENT_CONTRACT,
"v2": RESEARCHER_AGENT_CONTRACT_V2
}
}
# Example of how to access evolution envelope information
if __name__ == "__main__":
print("--- ResearcherAgent Evolution Envelope ---")
print(f"Capability Base ID: {RESEARCHER_AGENT_EVOLUTION_ENVELOPE['capability_id']}")
print(f"Current Version: {RESEARCHER_AGENT_EVOLUTION_ENVELOPE['current_version']}")
print(f"Supported Versions: {RESEARCHER_AGENT_EVOLUTION_ENVELOPE['supported_versions']}")
print(f"v1 Deprecation Policy: {RESEARCHER_AGENT_EVOLUTION_ENVELOPE['deprecation_policy']['v1']}")
print(f"Migration Guide: {RESEARCHER_AGENT_EVOLUTION_ENVELOPE['migration_guide_url']}")
print("\n--- Accessing v1 Contract from History ---")
v1_contract = RESEARCHER_AGENT_EVOLUTION_ENVELOPE['contract_history']['v1']
print(f"v1 Provision Operation: {v1_contract['provisions'][0]['operation']}")
The CapabilityEvolutionEnvelope provides a centralized and structured way to manage the lifecycle of our ResearcherAgent. It explicitly states that v1 is deprecated and provides a clear end-of-life date, along with a URL to a migration guide. This allows other capabilities that consume ResearcherAgent to plan their upgrades effectively, avoiding sudden breaking changes. The contract_history ensures that past contract definitions are retained for reference and compatibility checks. This proactive approach to evolution is critical for maintaining a stable yet adaptable system.
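As a small illustration of how a consumer might act on this envelope, the sketch below reports whether a given version is unsupported, deprecated, or still active. The helper and its date handling are our own assumptions layered on the envelope structure defined above.
# researcher_agent/envelope_checks.py (hypothetical helper, for illustration only)
from datetime import date
from typing import Any, Dict, Optional
from researcher_agent.evolution_envelopes import RESEARCHER_AGENT_EVOLUTION_ENVELOPE
def check_version_status(envelope: Dict[str, Any], version: str, today: Optional[date] = None) -> str:
    """Returns 'unsupported', 'active', or a deprecation notice for the given version."""
    today = today or date.today()
    if version not in envelope["supported_versions"]:
        return "unsupported"
    policy = envelope["deprecation_policy"].get(version)
    if policy and date.fromisoformat(policy["deprecated_on"]) <= today:
        return (f"deprecated (end of life {policy['end_of_life']}, "
                f"see {envelope['migration_guide_url']})")
    return "active"
if __name__ == "__main__":
    print(check_version_status(RESEARCHER_AGENT_EVOLUTION_ENVELOPE, "v1"))
    print(check_version_status(RESEARCHER_AGENT_EVOLUTION_ENVELOPE, "v2"))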
Orchestrating the Collective: Capability Registration and Lifecycle Management
In a complex system like the Cognito Research Agency, where numerous agents and tools operate as independent capabilities, a central mechanism is needed to discover, manage, and coordinate them. This is achieved through Capability Registration and Lifecycle Management.
A. The Grand Registry: The Capability Registry serves as the authoritative source of truth for all capabilities within the system. It stores their contracts, evolution envelopes, and deployment information. Crucially, the registry is responsible for validating dependencies between capabilities and detecting circular dependencies, which can lead to deadlocks or unresolvable startup sequences.
A conceptual CapabilityRegistry service could be implemented as a dedicated microservice or, in a Kubernetes context, through Custom Resource Definitions (CRDs) where capability definitions are stored as Kubernetes objects. For our example, we will illustrate a Python-based registry that can perform dependency checks.
# capability_registry/registry_service.py
from typing import Dict, Any, List, Set
from common.capability_contracts import RESEARCHER_AGENT_CONTRACT, WEB_SEARCH_TOOL_CONTRACT
from researcher_agent.evolution_envelopes import RESEARCHER_AGENT_CONTRACT_V2, RESEARCHER_AGENT_EVOLUTION_ENVELOPE
class CapabilityRegistry:
"""
The central registry for all capabilities, managing their contracts and dependencies.
It detects circular dependencies to ensure system stability.
"""
def __init__(self):
self._capabilities: Dict[str, Dict[str, Any]] = {} # Stores capability_id -> full_contract
self._evolution_envelopes: Dict[str, Dict[str, Any]] = {} # Stores base_id -> evolution_envelope
print("CapabilityRegistry initialized.")
def register_capability(self, capability_contract: Dict[str, Any], evolution_envelope: Dict[str, Any] = None):
"""
Registers a new capability and its contract.
If an evolution envelope is provided, it's also stored.
"""
capability_id = capability_contract["capability_id"]
base_id = capability_id.rsplit('-', 1)[0] # e.g., "researcher-agent-v1" -> "researcher-agent"
if capability_id in self._capabilities:
print(f"Warning: Capability '{capability_id}' already registered. Overwriting.")
self._capabilities[capability_id] = capability_contract
print(f"Capability '{capability_id}' registered successfully.")
if evolution_envelope:
self._evolution_envelopes[base_id] = evolution_envelope
print(f"Evolution envelope for '{base_id}' registered.")
def get_capability_contract(self, capability_id: str) -> Dict[str, Any]:
"""Retrieves a capability's contract by its ID."""
if capability_id not in self._capabilities:
raise ValueError(f"Capability '{capability_id}' not found in registry.")
return self._capabilities[capability_id]
def get_evolution_envelope(self, base_capability_id: str) -> Dict[str, Any]:
"""Retrieves a capability's evolution envelope by its base ID."""
if base_capability_id not in self._evolution_envelopes:
raise ValueError(f"Evolution envelope for '{base_capability_id}' not found.")
return self._evolution_envelopes[base_capability_id]
def _build_dependency_graph(self) -> Dict[str, List[str]]:
"""
Internal helper to build a graph where nodes are capabilities
and edges represent 'requires' relationships.
"""
graph: Dict[str, List[str]] = {}
for cap_id, contract in self._capabilities.items():
graph[cap_id] = []
requirements = contract.get("requirements", [])
for req in requirements:
required_cap_id = req["capability_id"]
# We assume that a required capability must be registered.
# In a real system, we might also check min_version compatibility.
if required_cap_id in self._capabilities:
graph[cap_id].append(required_cap_id)
else:
print(f"Warning: Capability '{cap_id}' requires '{required_cap_id}' which is not registered.")
return graph
def detect_cycles(self) -> List[List[str]]:
"""
Detects circular dependencies in the capability graph using Depth First Search (DFS).
Returns a list of cycles found.
"""
graph = self._build_dependency_graph()
visited: Set[str] = set()
recursion_stack: Set[str] = set()
cycles: List[List[str]] = []
def dfs(node: str, path: List[str]):
visited.add(node)
recursion_stack.add(node)
path.append(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
dfs(neighbor, path)
elif neighbor in recursion_stack:
# Cycle detected!
cycle_start_index = path.index(neighbor)
cycles.append(path[cycle_start_index:])
recursion_stack.remove(node)
path.pop() # Backtrack
for node in graph:
if node not in visited:
dfs(node, [])
return cycles
# Example Usage
if __name__ == "__main__":
registry = CapabilityRegistry()
# Register capabilities
registry.register_capability(WEB_SEARCH_TOOL_CONTRACT)
registry.register_capability(RESEARCHER_AGENT_CONTRACT, RESEARCHER_AGENT_EVOLUTION_ENVELOPE)
registry.register_capability(RESEARCHER_AGENT_CONTRACT_V2, RESEARCHER_AGENT_EVOLUTION_ENVELOPE) # Register v2 as well
# Simulate a circular dependency for testing
# Let's create a dummy capability 'critic-agent-v1' that requires 'researcher-agent-v1'
# and then modify 'researcher-agent-v1' to require 'critic-agent-v1' (conceptually)
CRITIC_AGENT_CONTRACT = {
"capability_id": "critic-agent-v1",
"description": "Evaluates research findings.",
"provisions": [],
"requirements": [
{"capability_id": "researcher-agent-v1", "provision_operation": "conduct_research", "min_version": "v1"}
]
}
registry.register_capability(CRITIC_AGENT_CONTRACT)
# Now, let's *conceptually* modify researcher-agent-v1 to require critic-agent-v1
# For this example, we'll directly add it to the already registered contract for demonstration.
# In a real system, you'd register a new contract version or update via a proper API.
researcher_v1_contract_modified = registry.get_capability_contract("researcher-agent-v1")
researcher_v1_contract_modified["requirements"].append(
{"capability_id": "critic-agent-v1", "provision_operation": "evaluate_findings", "min_version": "v1"}
)
# Re-registering to update the internal state for dependency graph
registry.register_capability(researcher_v1_contract_modified)
print("\n--- Attempting to detect cycles after introducing one ---")
cycles = registry.detect_cycles()
if cycles:
print("\n!!! Circular dependencies detected !!!")
for cycle in cycles:
print(f" Cycle: {' -> '.join(cycle)}")
else:
print("\nNo circular dependencies detected.")
# Reset for next run, remove the artificial cycle
# (In a real system, you'd manage this via proper versioning/updates)
registry = CapabilityRegistry()
registry.register_capability(WEB_SEARCH_TOOL_CONTRACT)
registry.register_capability(RESEARCHER_AGENT_CONTRACT, RESEARCHER_AGENT_EVOLUTION_ENVELOPE)
registry.register_capability(RESEARCHER_AGENT_CONTRACT_V2, RESEARCHER_AGENT_EVOLUTION_ENVELOPE)
registry.register_capability(CRITIC_AGENT_CONTRACT) # Register without the artificial cycle
print("\n--- Detecting cycles after removing artificial one ---")
cycles_no_cycle = registry.detect_cycles()
if cycles_no_cycle:
print("\n!!! Circular dependencies detected !!!")
else:
print("\nNo circular dependencies detected.")
The CapabilityRegistry class is a crucial component. It not only stores contracts and evolution envelopes but also implements a detect_cycles method. This method builds a dependency graph from the registered capabilities' requirements and then uses a Depth First Search (DFS) algorithm to identify any circular dependencies. Detecting these cycles early in the development or deployment process is vital, as they can lead to systems that are impossible to initialize or prone to deadlocks.
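A cycle-free dependency graph also yields a safe startup order. The sketch below is a hypothetical helper layered on top of the registry: it applies Kahn's algorithm to the same graph that detect_cycles builds, so each capability is started only after its requirements.
# capability_registry/startup_order.py (hypothetical helper, for illustration only)
from collections import deque
from typing import Dict, List
def startup_order(graph: Dict[str, List[str]]) -> List[str]:
    """Orders capabilities so that each one starts after everything it requires.
    'graph' maps a capability to the capabilities it requires, exactly as produced
    by CapabilityRegistry._build_dependency_graph(). Raises if a cycle remains.
    """
    unmet = {cap: len(deps) for cap, deps in graph.items()}  # unmet requirement counts
    dependents: Dict[str, List[str]] = {cap: [] for cap in graph}
    for cap, deps in graph.items():
        for dep in deps:
            dependents[dep].append(cap)  # dep must start before cap
    ready = deque(cap for cap, count in unmet.items() if count == 0)
    order: List[str] = []
    while ready:
        cap = ready.popleft()
        order.append(cap)
        for dependent in dependents[cap]:
            unmet[dependent] -= 1
            if unmet[dependent] == 0:
                ready.append(dependent)
    if len(order) != len(graph):
        raise ValueError("Cycle detected: no valid startup order exists.")
    return order
# Example (no cycles registered): web-search-tool-v1 is ordered before researcher-agent-v1.
# print(startup_order(registry._build_dependency_graph()))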
B. Life in Kubernetes: For an agentic AI application like the Cognito Research Agency, Kubernetes is the ideal platform for lifecycle management. It provides robust mechanisms for deploying, scaling, updating, and retiring capabilities as microservices. Each agent (Researcher, Summarizer, Critic) and core service (LLM Inference, Web Search Tool) will be deployed as a separate Kubernetes Deployment, exposed via a Service.
Here is a simplified Kubernetes YAML example for deploying our ResearcherAgent as a capability:
# kubernetes/researcher-agent-v1-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: researcher-agent-v1
labels:
app: researcher-agent
version: v1
capability-id: researcher-agent-v1
spec:
replicas: 2 # Scale the research agent for higher availability and throughput
selector:
matchLabels:
app: researcher-agent
version: v1
template:
metadata:
labels:
app: researcher-agent
version: v1
capability-id: researcher-agent-v1
spec:
containers:
- name: researcher-agent-container
image: cognito-registry/researcher-agent:v1.0.0 # Image for ResearcherAgent v1
ports:
- containerPort: 8000 # The port where the agent's API listens
env:
- name: LLM_SERVICE_URL # Environment variable for LLM service dependency
value: "http://llm-inference-service:8001"
- name: WEB_SEARCH_TOOL_URL # Environment variable for Web Search Tool dependency
value: "http://web-search-tool:8002"
resources: # Capability Resource Management (discussed next)
requests:
cpu: "500m" # Request 0.5 CPU core
memory: "1Gi" # Request 1 Gigabyte of memory
limits:
cpu: "1000m" # Limit to 1 CPU core
memory: "2Gi" # Limit to 2 Gigabytes of memory
readinessProbe: # Health check to ensure the agent is ready to serve requests
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
livenessProbe: # Health check to ensure the agent is still running
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: researcher-agent-v1
labels:
app: researcher-agent
version: v1
capability-id: researcher-agent-v1
spec:
selector:
app: researcher-agent
version: v1
ports:
- protocol: TCP
port: 8000
targetPort: 8000
type: ClusterIP # Internal service, accessible within the cluster
This Kubernetes Deployment manifest defines how researcher-agent-v1 is deployed. It specifies the Docker image, the number of replicas for scaling, environment variables to inject dependencies (like the URLs of the LLM and Web Search capabilities), and resource requests/limits. The Service object provides a stable network endpoint for other capabilities to interact with researcher-agent-v1, abstracting away the individual pods. When a new version, researcher-agent-v2, is ready, a new Deployment and Service can be created, allowing for blue/green deployments or canary releases, managed by Kubernetes. The CapabilityRegistry would then be updated to reflect the availability of v2 and the deprecation of v1, guiding consumers to the new version.
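To connect this manifest back to the code, here is a sketch of how the agent's entrypoint could consume the injected environment variables to wire its Realization. It reuses the classes from the Capability Nucleus example for brevity; in the deployed system LLM_SERVICE_URL would point at the LLMInferenceService capability rather than at Ollama directly, so treat the class choices as illustrative stand-ins.
# researcher_agent/entrypoint.py (hypothetical module, for illustration only)
import os
from researcher_agent.capability_nucleus import (
    BasicResearchEssence,
    LocalOllamaLLMService,
    MCPWebSearchTool,
    ResearcherRealization,
    ResearcherAgentAdaptation,
)
def build_agent_from_env() -> ResearcherAgentAdaptation:
    """Wires the ResearcherAgent capability using the URLs Kubernetes injected."""
    llm_url = os.environ.get("LLM_SERVICE_URL", "http://localhost:11434")
    search_url = os.environ.get("WEB_SEARCH_TOOL_URL", "http://localhost:8002/search")
    realization = ResearcherRealization(
        essence=BasicResearchEssence(),
        llm_service=LocalOllamaLLMService(ollama_url=llm_url),
        web_search_tool=MCPWebSearchTool(mcp_api_url=search_url),
    )
    return ResearcherAgentAdaptation(realization=realization)
if __name__ == "__main__":
    agent = build_agent_from_env()  # an HTTP framework would then expose agent.research_endpoint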
Fueling the Brains: Capability Resource Management
For an LLM-based agentic AI application, efficient resource management is paramount, especially when dealing with computationally intensive models and diverse hardware. Capability Resource Management, within the CCA framework, focuses on how capabilities declare and consume resources, ensuring optimal performance and cost-effectiveness. In a Kubernetes environment, this translates to defining CPU, memory, and crucially, GPU resources.
Our LLMInferenceService capability, which provides the underlying LLM processing, is a prime candidate for sophisticated resource management. It needs to support various GPU architectures (Nvidia CUDA, AMD ROCm, Apple MPS, Intel GPUs) to maximize hardware utilization and flexibility. The Realization layer of this capability is responsible for abstracting these hardware specifics.
Here is how Kubernetes resource management, particularly for GPUs, would be applied:
# kubernetes/llm-inference-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-inference-service
labels:
app: llm-inference-service
capability-id: llm-inference-service-v1
spec:
replicas: 1 # We might start with one replica, scaling based on GPU availability
selector:
matchLabels:
app: llm-inference-service
template:
metadata:
labels:
app: llm-inference-service
capability-id: llm-inference-service-v1
spec:
# Node affinity/selector can be used to target specific nodes with GPUs
nodeSelector:
gpu-vendor: nvidia # Example: Target nodes with Nvidia GPUs
tolerations: # Allow scheduling on tainted nodes (e.g., GPU nodes often have taints)
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
containers:
- name: llm-inference-container
image: cognito-registry/ollama-llm-service:v1.0.0 # Image with Ollama and models
ports:
- containerPort: 8001
env:
- name: OLLAMA_HOST
value: "0.0.0.0" # Expose Ollama within the pod
- name: OLLAMA_NUM_GPU # Number of GPUs to use for Ollama (if multiple available)
value: "1"
- name: OLLAMA_GPU_ARCH # Specify target GPU architecture for Ollama (e.g., cuda, rocm, mps)
value: "cuda" # This would be dynamically set or configured per deployment
resources:
requests:
cpu: "2000m" # Request 2 CPU cores
memory: "16Gi" # Request 16 Gigabytes of memory (LLMs are memory hungry)
nvidia.com/gpu: 1 # Request 1 Nvidia GPU (requires Nvidia device plugin)
# For AMD ROCm: amd.com/gpu: 1
# For Intel GPUs: intel.com/gpu: 1 (requires Intel device plugin)
# Apple MPS is typically managed by the host OS/Ollama directly, not k8s device plugin for external GPUs.
limits:
cpu: "4000m"
memory: "32Gi"
nvidia.com/gpu: 1
volumeMounts: # Mount a volume for Ollama models
- name: ollama-models-storage
mountPath: /root/.ollama
volumes:
- name: ollama-models-storage
persistentVolumeClaim:
claimName: ollama-pvc # Persistent volume claim for model storage
---
apiVersion: v1
kind: Service
metadata:
name: llm-inference-service
labels:
app: llm-inference-service
capability-id: llm-inference-service-v1
spec:
selector:
app: llm-inference-service
ports:
- protocol: TCP
port: 8001
targetPort: 8001
type: ClusterIP
In this Kubernetes manifest, we see several key aspects of resource management. The resources section explicitly defines requests and limits for CPU and memory. Crucially, it requests nvidia.com/gpu: 1, indicating that this pod requires one Nvidia GPU. For other architectures, this would change to amd.com/gpu: 1 or intel.com/gpu: 1, assuming the respective Kubernetes device plugins are installed on the cluster. The nodeSelector and tolerations ensure that this LLM inference service is scheduled only on nodes that possess the required GPU hardware.
Within the LLMInferenceService's Realization layer, an environment variable such as OLLAMA_GPU_ARCH (an illustrative name interpreted by the service's own startup logic, not a standard Ollama setting) could indicate which backend to prioritize. In practice, Ollama and llama.cpp select their GPU backend based on how they were built and the hardware they detect at runtime: for local LLMs using llama.cpp directly, the build itself would be tailored for CUDA, ROCm, or Metal (for Apple MPS). The LLMInferenceRealization would then abstract this, providing a unified generate_text interface regardless of the underlying GPU architecture. This allows the Cognito Research Agency to deploy LLM capabilities efficiently across a heterogeneous cluster of GPU-enabled nodes.
The Cognito Research Agency: A Running Example Addendum
Now, let us bring all these concepts together and present a more holistic view of our Cognito Research Agency. This addendum provides a fuller illustrative implementation of the core components, demonstrating how Capability-Centric Architecture principles manifest in a real-world agentic AI application. While a complete production deployment and the intricacies of LLM model management are beyond the scope of a single text output, the code provides a robust architectural foundation.
A. Overall Architecture
The Cognito Research Agency operates as a distributed collective of specialized AI agents. A central Orchestrator receives research requests, which are then delegated to various ResearcherAgents. These agents leverage an LLMInferenceService for language understanding and generation, and a WebSearchTool for information retrieval. SummarizerAgents condense findings, and CriticAgents evaluate the quality and relevance of the research. All capabilities are registered with a CapabilityRegistry and deployed on Kubernetes.
Here is a simplified ASCII diagram illustrating the interaction flow:
+-------------------------------------------------+
| External Client |
+--------------------+----------------------------+
|
| Research Request
V
+-------------------------------------------------+
| Orchestrator Agent |
| (Routes requests, manages workflow) |
+--------------------+----------------------------+
|
| Delegate Research Task
V
+-------------------------------------------------+
| Researcher Agent (v2) |
| (Formulates queries, synthesizes findings) |
| +-------------------------------------------+ |
| | Realization Layer | |
| | +---------+ +---------+ +---------+ | |
| | | LLM Inf.| | WebSearch | | Knowledge|| |
| | | Service |<->| Tool |<-| Graph || |
| | +---------+ +---------+ +---------+ | |
| +-------------------------------------------+ |
+--------------------+----------------------------+
|
| Findings
V
+-------------------------------------------------+
| Summarizer Agent |
| (Condenses research findings) |
+--------------------+----------------------------+
|
| Summarized Findings
V
+-------------------------------------------------+
| Critic Agent |
| (Evaluates quality & relevance) |
+--------------------+----------------------------+
|
| Evaluation Report
V
+-------------------------------------------------+
| Orchestrator Agent |
| (Aggregates, sends response) |
+--------------------+----------------------------+
|
| Final Research Report
V
+-------------------------------------------------+
| External Client |
+-------------------------------------------------+
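To ground the diagram, here is a minimal sketch of the Orchestrator's workflow. The researcher endpoint follows the contract defined earlier; the summarizer and critic service names, endpoints, and payload shapes are assumptions, since their contracts are not shown here, and the sketch relies on the third-party requests package.
# cognito_agency/orchestrator/workflow.py (hypothetical module, for illustration only)
from typing import Any, Dict
import requests
RESEARCHER_URL = "http://researcher-agent-v1:8000/research"
SUMMARIZER_URL = "http://summarizer-agent:8003/summarize"  # hypothetical endpoint
CRITIC_URL = "http://critic-agent:8004/evaluate"           # hypothetical endpoint
def run_research_workflow(topic: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Delegates to the researcher, summarizer, and critic capabilities in sequence."""
    findings = requests.post(
        RESEARCHER_URL, json={"topic": topic, "context": context}, timeout=30
    ).json()
    summary = requests.post(
        SUMMARIZER_URL, json={"text": findings.get("findings", "")}, timeout=30
    ).json()
    review = requests.post(
        CRITIC_URL, json={"summary": summary.get("summary", ""), "topic": topic}, timeout=30
    ).json()
    return {"topic": topic, "summary": summary, "review": review}
if __name__ == "__main__":
    report = run_research_workflow(
        "Capability-Centric Architecture", {"keywords": ["CCA", "agents"]}
    )
    print(report)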
B. Core Components:
The following Python code represents the core components of the Cognito Research Agency. Each component is designed as a distinct capability, adhering to the Nucleus structure, Contracts, and considering Evolution Envelopes and Efficiency Gradients.
File: cognito_agency/common/contracts.py
# cognito_agency/common/contracts.py
# This file defines all Capability Contracts for the Cognito Research Agency.
from typing import Dict, Any, List, Literal, TypedDict
# --- WebSearchTool Contract ---
class WebSearchResult(TypedDict):
"""Represents a single web search result snippet."""
title: str
url: str
snippet: str
class WebSearchProvision(TypedDict):
"""Defines the provision of the WebSearchTool capability."""
operation: Literal["search"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
WEB_SEARCH_TOOL_CONTRACT = {
"capability_id": "web-search-tool-v1",
"description": "Provides web search functionality to retrieve relevant information.",
"provisions": [
WebSearchProvision(
operation="search",
input_schema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query string."}
},
"required": ["query"]
},
output_schema={
"type": "array",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"url": {"type": "string", "format": "uri"},
"snippet": {"type": "string"}
},
"required": ["title", "url", "snippet"]
}
},
protocol={
"type": "REST",
"method": "GET",
"endpoint_path": "/search",
"expected_latency_ms": 2000,
"max_rps": 100
}
)
]
}
# --- LLMInferenceService Contract ---
class LLMInferenceRequest(TypedDict):
"""Input schema for LLM inference request."""
prompt: str
model_config: Dict[str, Any]
gradient: Literal["fast_local", "high_quality_cloud"]
class LLMInferenceResponse(TypedDict):
"""Output schema for LLM inference response."""
status: Literal["success", "error"]
generated_text: str
message: str
class LLMInferenceProvision(TypedDict):
"""Defines the provision of the LLMInferenceService capability."""
operation: Literal["generate_text"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
LLM_INFERENCE_SERVICE_CONTRACT = {
"capability_id": "llm-inference-service-v1",
"description": "Provides text generation capabilities using various LLM backends.",
"provisions": [
LLMInferenceProvision(
operation="generate_text",
input_schema={
"type": "object",
"properties": {
"prompt": {"type": "string"},
"model_config": {"type": "object"},
"gradient": {"type": "string", "enum": ["fast_local", "high_quality_cloud"], "default": "fast_local"}
},
"required": ["prompt"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "error"]},
"generated_text": {"type": "string"},
"message": {"type": "string"}
},
"required": ["status"]
},
protocol={
"type": "REST",
"method": "POST",
"endpoint_path": "/infer",
"expected_latency_ms": 5000,
"max_rps": 50
}
)
]
}
# --- ResearcherAgent Contract (v1 and v2) ---
class ResearchRequestV1(TypedDict):
"""Input schema for research request v1."""
topic: str
context: Dict[str, Any]
class ResearchResponseV1(TypedDict):
"""Output schema for research response v1."""
status: Literal["success", "error"]
findings: str
message: str
class ResearcherProvisionV1(TypedDict):
"""Provision for ResearcherAgent v1."""
operation: Literal["conduct_research"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
class ResearcherRequirement(TypedDict):
"""Defines a generic requirement for a capability."""
capability_id: str
provision_operation: str
min_version: str
RESEARCHER_AGENT_CONTRACT_V1 = {
"capability_id": "researcher-agent-v1",
"description": "An agent capable of conducting basic research on a given topic.",
"provisions": [
ResearcherProvisionV1(
operation="conduct_research",
input_schema={
"type": "object",
"properties": {
"topic": {"type": "string", "description": "The main topic to research."},
"context": {"type": "object", "description": "Additional context or keywords."}
},
"required": ["topic"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "error"]},
"findings": {"type": "string"},
"message": {"type": "string"}
},
"required": ["status"]
},
protocol={
"type": "REST",
"method": "POST",
"endpoint_path": "/research",
"expected_latency_ms": 10000,
"max_rps": 10
}
)
],
"requirements": [
ResearcherRequirement(
capability_id="web-search-tool-v1",
provision_operation="search",
min_version="v1"
),
ResearcherRequirement(
capability_id="llm-inference-service-v1",
provision_operation="generate_text",
min_version="v1"
)
]
}
class ResearchRequestV2(TypedDict):
"""Input schema for research request v2."""
topic: str
context: Dict[str, Any]
depth: Literal["shallow", "deep"]
class ResearchResponseV2(TypedDict):
"""Output schema for research response v2."""
status: Literal["success", "error"]
findings: str
knowledge_graph_links: List[str]
message: str
class ResearcherProvisionV2(TypedDict):
"""Provision for ResearcherAgent v2."""
operation: Literal["conduct_advanced_research"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
RESEARCHER_AGENT_CONTRACT_V2 = {
"capability_id": "researcher-agent-v2",
"description": "An advanced agent capable of semantic research and knowledge graph integration.",
"provisions": [
ResearcherProvisionV2(
operation="conduct_advanced_research",
input_schema={
"type": "object",
"properties": {
"topic": {"type": "string", "description": "The main topic for advanced research."},
"context": {"type": "object", "description": "Extended context, including ontologies or specific entities."},
"depth": {"type": "string", "enum": ["shallow", "deep"], "default": "shallow"}
},
"required": ["topic"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "error"]},
"findings": {"type": "string"},
"knowledge_graph_links": {"type": "array", "items": {"type": "string", "format": "uri"}},
"message": {"type": "string"}
},
"required": ["status"]
},
protocol={
"type": "REST",
"method": "POST",
"endpoint_path": "/advanced_research",
"expected_latency_ms": 15000,
"max_rps": 5
}
)
],
"requirements": [
ResearcherRequirement(
capability_id="web-search-tool-v1",
provision_operation="search",
min_version="v1"
),
ResearcherRequirement(
capability_id="llm-inference-service-v1",
provision_operation="generate_text",
min_version="v1"
),
# New requirements for v2
ResearcherRequirement(
capability_id="semantic-search-tool-v1",
provision_operation="semantic_search",
min_version="v1"
),
ResearcherRequirement(
capability_id="knowledge-graph-service-v1",
provision_operation="query_graph",
min_version="v1"
)
]
}
# --- SummarizerAgent Contract ---
class SummarizeRequest(TypedDict):
"""Input schema for summarization request."""
text_to_summarize: str
length_preference: Literal["short", "medium", "long"]
class SummarizeResponse(TypedDict):
"""Output schema for summarization response."""
status: Literal["success", "error"]
summary: str
message: str
class SummarizerProvision(TypedDict):
"""Provision for SummarizerAgent."""
operation: Literal["summarize"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
SUMMARIZER_AGENT_CONTRACT = {
"capability_id": "summarizer-agent-v1",
"description": "An agent capable of summarizing large texts.",
"provisions": [
SummarizerProvision(
operation="summarize",
input_schema={
"type": "object",
"properties": {
"text_to_summarize": {"type": "string"},
"length_preference": {"type": "string", "enum": ["short", "medium", "long"], "default": "medium"}
},
"required": ["text_to_summarize"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "error"]},
"summary": {"type": "string"},
"message": {"type": "string"}
},
"required": ["status"]
},
protocol={
"type": "REST",
"method": "POST",
"endpoint_path": "/summarize",
"expected_latency_ms": 7000,
"max_rps": 20
}
)
],
"requirements": [
ResearcherRequirement( # Reusing ResearcherRequirement type
capability_id="llm-inference-service-v1",
provision_operation="generate_text",
min_version="v1"
)
]
}
# --- CriticAgent Contract ---
class EvaluateRequest(TypedDict):
"""Input schema for evaluation request."""
content_to_evaluate: str
criteria: List[str]
source_context: List[str] # Original sources for cross-referencing
class EvaluateResponse(TypedDict):
"""Output schema for evaluation response."""
status: Literal["success", "error"]
evaluation_report: str
score: float
message: str
class CriticProvision(TypedDict):
"""Provision for CriticAgent."""
operation: Literal["evaluate"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
CRITIC_AGENT_CONTRACT = {
"capability_id": "critic-agent-v1",
"description": "An agent capable of critically evaluating content based on criteria and sources.",
"provisions": [
CriticProvision(
operation="evaluate",
input_schema={
"type": "object",
"properties": {
"content_to_evaluate": {"type": "string"},
"criteria": {"type": "array", "items": {"type": "string"}},
"source_context": {"type": "array", "items": {"type": "string"}}
},
"required": ["content_to_evaluate", "criteria"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "error"]},
"evaluation_report": {"type": "string"},
"score": {"type": "number"},
"message": {"type": "string"}
},
"required": ["status", "score"]
},
protocol={
"type": "REST",
"method": "POST",
"endpoint_path": "/evaluate",
"expected_latency_ms": 8000,
"max_rps": 15
}
)
],
"requirements": [
ResearcherRequirement( # Reusing ResearcherRequirement type
capability_id="llm-inference-service-v1",
provision_operation="generate_text",
min_version="v1"
)
]
}
# --- OrchestratorAgent Contract ---
class ResearchTaskRequest(TypedDict):
"""Input schema for an orchestrator research task."""
main_topic: str
initial_context: Dict[str, Any]
desired_output_format: Literal["brief", "detailed", "critical_analysis"]
class ResearchTaskResponse(TypedDict):
"""Output schema for an orchestrator research task response."""
status: Literal["success", "error"]
final_report: str
message: str
class OrchestratorProvision(TypedDict):
"""Provision for OrchestratorAgent."""
operation: Literal["orchestrate_research_task"]
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
protocol: Dict[str, Any]
ORCHESTRATOR_AGENT_CONTRACT = {
"capability_id": "orchestrator-agent-v1",
"description": "Orchestrates complex research tasks by coordinating multiple agents.",
"provisions": [
OrchestratorProvision(
operation="orchestrate_research_task",
input_schema={
"type": "object",
"properties": {
"main_topic": {"type": "string"},
"initial_context": {"type": "object"},
"desired_output_format": {"type": "string", "enum": ["brief", "detailed", "critical_analysis"]}
},
"required": ["main_topic"]
},
output_schema={
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["success", "error"]},
"final_report": {"type": "string"},
"message": {"type": "string"}
},
"required": ["status"]
},
protocol={
"type": "REST",
"method": "POST",
"endpoint_path": "/orchestrate_research",
"expected_latency_ms": 30000, # Long-running task
"max_rps": 1
}
)
],
"requirements": [
ResearcherRequirement(
capability_id="researcher-agent-v2", # Orchestrator uses latest researcher
provision_operation="conduct_advanced_research",
min_version="v2"
),
ResearcherRequirement(
capability_id="summarizer-agent-v1",
provision_operation="summarize",
min_version="v1"
),
ResearcherRequirement(
capability_id="critic-agent-v1",
provision_operation="evaluate",
min_version="v1"
)
]
}
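Because every provision publishes JSON-Schema-style input and output schemas, a consumer or a contract test can validate payloads before they ever cross a capability boundary. The following sketch is illustrative only; it assumes the third-party jsonschema package and is not part of the agency codebase:
# Illustrative contract check (assumes the third-party 'jsonschema' package is installed).
from jsonschema import validate, ValidationError

from cognito_agency.common.contracts import RESEARCHER_AGENT_CONTRACT_V1

provision = RESEARCHER_AGENT_CONTRACT_V1["provisions"][0]
payload = {"topic": "capability-centric architecture", "context": {"keywords": ["CCA"]}}
try:
    # Reject malformed requests before they reach the Adaptation layer.
    validate(instance=payload, schema=provision["input_schema"])
    print("Payload conforms to the 'conduct_research' input schema.")
except ValidationError as exc:
    print(f"Contract violation: {exc.message}")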
File: cognito_agency/common/evolution_envelopes.py
# cognito_agency/common/evolution_envelopes.py
# This file defines all Capability Evolution Envelopes.
from typing import Dict, Any, List, TypedDict
from cognito_agency.common.contracts import (
RESEARCHER_AGENT_CONTRACT_V1, RESEARCHER_AGENT_CONTRACT_V2,
WEB_SEARCH_TOOL_CONTRACT, LLM_INFERENCE_SERVICE_CONTRACT,
SUMMARIZER_AGENT_CONTRACT, CRITIC_AGENT_CONTRACT, ORCHESTRATOR_AGENT_CONTRACT
)
class CapabilityEvolutionEnvelope(TypedDict):
"""Defines the evolution strategy for a capability."""
capability_id: str # Base ID without version
current_version: str
supported_versions: List[str]
deprecation_policy: Dict[str, Any]
migration_guide_url: str
contract_history: Dict[str, Any]
RESEARCHER_AGENT_EVOLUTION_ENVELOPE: CapabilityEvolutionEnvelope = {
"capability_id": "researcher-agent",
"current_version": "v2",
"supported_versions": ["v1", "v2"],
"deprecation_policy": {
"v1": {
"deprecated_on": "2024-12-01",
"end_of_life": "2025-06-01",
"notes": "Consumers should migrate to 'conduct_advanced_research' provision in v2. "
"V1 only supports basic keyword search, v2 offers semantic capabilities."
}
},
"migration_guide_url": "https://cognito-docs.agency/researcher-agent/v1-to-v2-migration",
"contract_history": {
"v1": RESEARCHER_AGENT_CONTRACT_V1,
"v2": RESEARCHER_AGENT_CONTRACT_V2
}
}
WEB_SEARCH_TOOL_EVOLUTION_ENVELOPE: CapabilityEvolutionEnvelope = {
"capability_id": "web-search-tool",
"current_version": "v1",
"supported_versions": ["v1"],
"deprecation_policy": {}, # No deprecation yet
"migration_guide_url": "",
"contract_history": {
"v1": WEB_SEARCH_TOOL_CONTRACT
}
}
LLM_INFERENCE_SERVICE_EVOLUTION_ENVELOPE: CapabilityEvolutionEnvelope = {
"capability_id": "llm-inference-service",
"current_version": "v1",
"supported_versions": ["v1"],
"deprecation_policy": {},
"migration_guide_url": "",
"contract_history": {
"v1": LLM_INFERENCE_SERVICE_CONTRACT
}
}
SUMMARIZER_AGENT_EVOLUTION_ENVELOPE: CapabilityEvolutionEnvelope = {
"capability_id": "summarizer-agent",
"current_version": "v1",
"supported_versions": ["v1"],
"deprecation_policy": {},
"migration_guide_url": "",
"contract_history": {
"v1": SUMMARIZER_AGENT_CONTRACT
}
}
CRITIC_AGENT_EVOLUTION_ENVELOPE: CapabilityEvolutionEnvelope = {
"capability_id": "critic-agent",
"current_version": "v1",
"supported_versions": ["v1"],
"deprecation_policy": {},
"migration_guide_url": "",
"contract_history": {
"v1": CRITIC_AGENT_CONTRACT
}
}
ORCHESTRATOR_AGENT_EVOLUTION_ENVELOPE: CapabilityEvolutionEnvelope = {
"capability_id": "orchestrator-agent",
"current_version": "v1",
"supported_versions": ["v1"],
"deprecation_policy": {},
"migration_guide_url": "",
"contract_history": {
"v1": ORCHESTRATOR_AGENT_CONTRACT
}
}
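An Evolution Envelope only pays off if consumers actually consult it. The helper below is a minimal sketch of how a consumer might check, at startup, whether the version it depends on is deprecated or past end-of-life; the function is an illustrative assumption, not part of the agency code:
# Illustrative startup check against an Evolution Envelope (assumed helper, not agency code).
from datetime import date

from cognito_agency.common.evolution_envelopes import RESEARCHER_AGENT_EVOLUTION_ENVELOPE

def warn_if_deprecated(envelope: dict, version: str) -> None:
    """Print a warning if the given capability version is unsupported or deprecated."""
    if version not in envelope["supported_versions"]:
        print(f"{envelope['capability_id']} {version} is no longer supported.")
        return
    policy = envelope["deprecation_policy"].get(version)
    if policy:
        eol = date.fromisoformat(policy["end_of_life"])
        state = "past end-of-life" if date.today() >= eol else f"deprecated (end-of-life {eol})"
        print(f"{envelope['capability_id']} {version} is {state}: {policy['notes']}")
        print(f"Migration guide: {envelope['migration_guide_url']}")

warn_if_deprecated(RESEARCHER_AGENT_EVOLUTION_ENVELOPE, "v1")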
File: cognito_agency/capability_registry/registry_service.py
# cognito_agency/capability_registry/registry_service.py
# The central Capability Registry for the Cognito Research Agency.
from typing import Dict, Any, List, Set, Tuple
import json
from cognito_agency.common.contracts import (
WEB_SEARCH_TOOL_CONTRACT, LLM_INFERENCE_SERVICE_CONTRACT,
RESEARCHER_AGENT_CONTRACT_V1, RESEARCHER_AGENT_CONTRACT_V2,
SUMMARIZER_AGENT_CONTRACT, CRITIC_AGENT_CONTRACT, ORCHESTRATOR_AGENT_CONTRACT
)
from cognito_agency.common.evolution_envelopes import (
WEB_SEARCH_TOOL_EVOLUTION_ENVELOPE, LLM_INFERENCE_SERVICE_EVOLUTION_ENVELOPE,
RESEARCHER_AGENT_EVOLUTION_ENVELOPE, SUMMARIZER_AGENT_EVOLUTION_ENVELOPE,
CRITIC_AGENT_EVOLUTION_ENVELOPE, ORCHESTRATOR_AGENT_EVOLUTION_ENVELOPE
)
class CapabilityRegistry:
"""
The central registry for all capabilities in the Cognito Research Agency.
Manages contracts, evolution envelopes, and detects circular dependencies.
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(CapabilityRegistry, cls).__new__(cls)
cls._instance._capabilities: Dict[str, Dict[str, Any]] = {} # capability_id -> full_contract
cls._instance._evolution_envelopes: Dict[str, Dict[str, Any]] = {} # base_id -> evolution_envelope
cls._instance._init_default_capabilities()
print("CapabilityRegistry: Singleton instance initialized.")
return cls._instance
def _init_default_capabilities(self):
"""Registers all predefined capabilities and their evolution envelopes."""
self.register_capability(WEB_SEARCH_TOOL_CONTRACT, WEB_SEARCH_TOOL_EVOLUTION_ENVELOPE)
self.register_capability(LLM_INFERENCE_SERVICE_CONTRACT, LLM_INFERENCE_SERVICE_EVOLUTION_ENVELOPE)
self.register_capability(RESEARCHER_AGENT_CONTRACT_V1, RESEARCHER_AGENT_EVOLUTION_ENVELOPE)
self.register_capability(RESEARCHER_AGENT_CONTRACT_V2, RESEARCHER_AGENT_EVOLUTION_ENVELOPE)
self.register_capability(SUMMARIZER_AGENT_CONTRACT, SUMMARIZER_AGENT_EVOLUTION_ENVELOPE)
self.register_capability(CRITIC_AGENT_CONTRACT, CRITIC_AGENT_EVOLUTION_ENVELOPE)
self.register_capability(ORCHESTRATOR_AGENT_CONTRACT, ORCHESTRATOR_AGENT_EVOLUTION_ENVELOPE)
print("CapabilityRegistry: Default capabilities registered.")
def register_capability(self, capability_contract: Dict[str, Any], evolution_envelope: Dict[str, Any] = None):
"""
Registers a new capability and its contract.
If an evolution envelope is provided, it's also stored.
"""
capability_id = capability_contract["capability_id"]
base_id = capability_id.rsplit('-', 1)[0] # e.g., "researcher-agent-v1" -> "researcher-agent"
if capability_id in self._capabilities:
print(f"CapabilityRegistry: Warning: Capability '{capability_id}' already registered. Overwriting.")
self._capabilities[capability_id] = capability_contract
print(f"CapabilityRegistry: Capability '{capability_id}' registered successfully.")
if evolution_envelope:
self._evolution_envelopes[base_id] = evolution_envelope
print(f"CapabilityRegistry: Evolution envelope for '{base_id}' registered.")
def get_capability_contract(self, capability_id: str) -> Dict[str, Any]:
"""Retrieves a capability's contract by its ID."""
if capability_id not in self._capabilities:
raise ValueError(f"CapabilityRegistry: Capability '{capability_id}' not found in registry.")
return self._capabilities[capability_id]
def get_evolution_envelope(self, base_capability_id: str) -> Dict[str, Any]:
"""Retrieves a capability's evolution envelope by its base ID."""
if base_capability_id not in self._evolution_envelopes:
raise ValueError(f"CapabilityRegistry: Evolution envelope for '{base_capability_id}' not found.")
return self._evolution_envelopes[base_capability_id]
def _build_dependency_graph(self) -> Dict[str, List[str]]:
"""
Internal helper to build a graph where nodes are capabilities
and edges represent 'requires' relationships.
"""
graph: Dict[str, List[str]] = {}
for cap_id, contract in self._capabilities.items():
graph[cap_id] = []
requirements = contract.get("requirements", [])
for req in requirements:
required_cap_id = req["capability_id"]
# Only add to graph if the required capability is actually registered.
# This prevents dangling edges for non-existent capabilities.
if required_cap_id in self._capabilities:
graph[cap_id].append(required_cap_id)
else:
print(f"CapabilityRegistry: Warning: Capability '{cap_id}' requires '{required_cap_id}' which is not registered. Skipping dependency for cycle detection.")
return graph
def detect_cycles(self) -> List[List[str]]:
"""
Detects circular dependencies in the capability graph using Depth First Search (DFS).
Returns a list of cycles found.
"""
graph = self._build_dependency_graph()
visited: Set[str] = set()
recursion_stack: Set[str] = set()
cycles: List[List[str]] = []
def dfs(node: str, path: List[str]):
visited.add(node)
recursion_stack.add(node)
path.append(node)
for neighbor in graph.get(node, []):
if neighbor not in visited:
dfs(neighbor, path)
elif neighbor in recursion_stack:
# Cycle detected!
cycle_start_index = path.index(neighbor)
cycles.append(path[cycle_start_index:])
recursion_stack.remove(node)
path.pop() # Backtrack
for node in graph:
if node not in visited:
dfs(node, [])
return cycles
def get_all_capabilities(self) -> Dict[str, Any]:
"""Returns all registered capabilities."""
return self._capabilities
def get_all_evolution_envelopes(self) -> Dict[str, Any]:
"""Returns all registered evolution envelopes."""
return self._evolution_envelopes
# Example of using the registry (can be run standalone for testing)
if __name__ == "__main__":
registry = CapabilityRegistry()
print("\n--- Initial Cycle Detection ---")
cycles = registry.detect_cycles()
if cycles:
print("\n!!! Circular dependencies detected during initial setup !!!")
for cycle in cycles:
print(f" Cycle: {' -> '.join(cycle)}")
else:
print("\nNo circular dependencies detected during initial setup. System is stable.")
print("\n--- Fetching a capability contract ---")
try:
researcher_v2_contract = registry.get_capability_contract("researcher-agent-v2")
print(f"Researcher Agent v2 contract: {json.dumps(researcher_v2_contract['provisions'][0], indent=2)}")
except ValueError as e:
print(e)
print("\n--- Fetching an evolution envelope ---")
try:
researcher_envelope = registry.get_evolution_envelope("researcher-agent")
print(f"Researcher Agent current version: {researcher_envelope['current_version']}")
print(f"Researcher Agent v1 deprecation policy: {researcher_envelope['deprecation_policy']['v1']}")
except ValueError as e:
print(e)
# Simulate adding a problematic capability that creates a cycle
print("\n--- Simulating a problematic capability to create a cycle ---")
PROBLEM_CAPABILITY_CONTRACT = {
"capability_id": "problematic-agent-v1",
"description": "An agent with a circular dependency for demonstration.",
"provisions": [],
"requirements": [
{"capability_id": "orchestrator-agent-v1", "provision_operation": "orchestrate_research_task", "min_version": "v1"}
]
}
registry.register_capability(PROBLEM_CAPABILITY_CONTRACT)
# Now, let's *conceptually* modify orchestrator-agent-v1 to require problematic-agent-v1
# This is for demonstration. In a real system, this would be a new version of orchestrator.
orchestrator_contract_modified = registry.get_capability_contract("orchestrator-agent-v1")
orchestrator_contract_modified["requirements"].append(
{"capability_id": "problematic-agent-v1", "provision_operation": "do_something", "min_version": "v1"}
)
registry.register_capability(orchestrator_contract_modified) # Overwrite existing
print("\n--- Cycle Detection after introducing a problematic capability ---")
cycles_after_problem = registry.detect_cycles()
if cycles_after_problem:
print("\n!!! Circular dependencies detected !!!")
for cycle in cycles_after_problem:
print(f" Cycle: {' -> '.join(cycle)}")
else:
print("\nNo circular dependencies detected.")
File: cognito_agency/services/llm_inference_service.py
# cognito_agency/services/llm_inference_service.py
# LLM Inference Service Capability
from abc import ABC, abstractmethod
from typing import Dict, Any, Literal, List
import os
import requests # Assuming 'requests' is installed for HTTP calls
# -----------------------------------------------------------------------------
# 1. Essence Layer: Core LLM Inference Logic (abstracted)
# -----------------------------------------------------------------------------
class LLMInferenceEssence(ABC):
"""
Abstract base class for the core LLM inference logic.
Defines the fundamental operation of generating text.
"""
@abstractmethod
def process_prompt(self, prompt: str, config: Dict[str, Any]) -> str:
"""
Processes a prompt and generates a response based on configuration.
This might involve prompt re-writing, safety checks, context management.
"""
pass
class BasicLLMInferenceEssence(LLMInferenceEssence):
"""
A concrete implementation of the LLM inference essence.
This layer focuses on the *logic* of inference, not the *mechanism*.
"""
def process_prompt(self, prompt: str, config: Dict[str, Any]) -> str:
print(f"LLM Essence: Pre-processing prompt with config: {config.get('model', 'default')}")
# In a real scenario, this might involve:
# - Adding system instructions
# - Truncating prompt if too long
# - Checking for sensitive information
return prompt # The actual generation is handled by Realization
# -----------------------------------------------------------------------------
# 2. Realization Layer: Different LLM Backends (Efficiency Gradients)
# -----------------------------------------------------------------------------
class LLMBackend(ABC):
"""Abstract interface for different LLM backend implementations."""
@abstractmethod
def infer(self, processed_prompt: str, model_name: str, options: Dict[str, Any]) -> str:
"""Performs inference using a specific LLM model and options."""
pass
class LocalOllamaBackend(LLMBackend):
"""
Realization for a local Ollama instance, suitable for fast, local inference.
Supports various GPU architectures via Ollama's underlying capabilities.
"""
def __init__(self, ollama_url: str = "http://localhost:11434", gpu_backend: str = "cuda"):
self.ollama_url = os.getenv("OLLAMA_SERVICE_URL", ollama_url)
self.gpu_backend = os.getenv("OLLAMA_GPU_ARCH", gpu_backend)
print(f"LLM Realization: Initialized Local Ollama Backend ({self.gpu_backend}) at {self.ollama_url}")
# Basic check for Ollama availability
try:
requests.get(f"{self.ollama_url}/api/tags", timeout=2)
print("LLM Realization: Ollama server is reachable.")
except requests.exceptions.ConnectionError:
print("LLM Realization: Warning: Ollama server not reachable. Using mock responses.")
self.ollama_url = None # Disable actual calls
def infer(self, processed_prompt: str, model_name: str, options: Dict[str, Any]) -> str:
print(f"LLM Realization: Local Ollama ({self.gpu_backend}) inferring for '{model_name}' with prompt: '{processed_prompt[:50]}...'")
if not self.ollama_url:
return f"MOCK: Local Ollama ({self.gpu_backend}) generated for: '{processed_prompt[:100]}...'"
try:
response = requests.post(
f"{self.ollama_url}/api/generate",
json={"model": model_name, "prompt": processed_prompt, **options},
timeout=60 # Allow longer inference times
)
response.raise_for_status()
return response.json()["response"]
except requests.exceptions.RequestException as e:
print(f"LLM Realization: Error calling Ollama: {e}. Falling back to mock.")
return f"ERROR_MOCK: Local Ollama ({self.gpu_backend}) failed for: '{processed_prompt[:100]}...'"
class CloudOpenAIBackend(LLMBackend):
"""
Realization for a cloud-based OpenAI service, suitable for high-quality, scalable inference.
"""
def __init__(self, api_key: str = None):
self.api_key = os.getenv("OPENAI_API_KEY", api_key)
if not self.api_key:
print("LLM Realization: Warning: OPENAI_API_KEY not set for Cloud OpenAI Backend. Using mock responses.")
print("LLM Realization: Initialized Cloud OpenAI Backend.")
def infer(self, processed_prompt: str, model_name: str, options: Dict[str, Any]) -> str:
print(f"LLM Realization: Cloud OpenAI inferring for '{model_name}' with prompt: '{processed_prompt[:50]}...'")
if not self.api_key:
return f"MOCK: Cloud OpenAI generated for: '{processed_prompt[:100]}...'"
# This would use the official OpenAI Python client
# import openai
# openai.api_key = self.api_key
# try:
# response = openai.ChatCompletion.create(
# model=model_name,
# messages=[{"role": "user", "content": processed_prompt}],
# **options
# )
# return response.choices[0].message.content
# except Exception as e:
# print(f"LLM Realization: Error calling OpenAI: {e}. Falling back to mock.")
# return f"ERROR_MOCK: Cloud OpenAI failed for: '{processed_prompt[:100]}...'"
return f"MOCK: Cloud OpenAI generated for: '{processed_prompt[:100]}...'"
class LLMInferenceRealization:
"""
Realization layer for the LLMInferenceCapability, selecting backend based on gradient.
"""
def __init__(self, essence: LLMInferenceEssence,
local_backend: LocalOllamaBackend,
cloud_backend: CloudOpenAIBackend):
self._essence = essence
self._local_backend = local_backend
self._cloud_backend = cloud_backend
print("LLM Realization: LLMInferenceRealization initialized with multiple backends.")
def generate_text(self, prompt: str,
model_config: Dict[str, Any],
gradient_preference: Literal["fast_local", "high_quality_cloud"] = "fast_local") -> str:
"""
Generates text, choosing the backend based on gradient preference.
"""
processed_prompt = self._essence.process_prompt(prompt, model_config)
if gradient_preference == "fast_local":
print("LLM Realization: Choosing 'fast_local' gradient (Ollama).")
return self._local_backend.infer(processed_prompt, model_config.get("model", "llama2"), model_config)
elif gradient_preference == "high_quality_cloud":
print("LLM Realization: Choosing 'high_quality_cloud' gradient (OpenAI).")
return self._cloud_backend.infer(processed_prompt, model_config.get("model", "gpt-4"), model_config)
else:
raise ValueError(f"LLM Realization: Unknown gradient preference: {gradient_preference}")
# -----------------------------------------------------------------------------
# 3. Adaptation Layer: External Interface for LLM Inference
# -----------------------------------------------------------------------------
from flask import Flask, request, jsonify # Using Flask for API (install with pip install Flask)
class LLMInferenceAdaptation:
"""
Adaptation layer for the LLMInferenceCapability.
Exposes a unified interface for LLM inference, abstracting backend choice.
"""
def __init__(self, realization: LLMInferenceRealization):
self._realization = realization
self.app = Flask(__name__)
self._setup_routes()
print("LLM Adaptation: LLMInferenceAdaptation initialized with Flask app.")
def _setup_routes(self):
@self.app.route("/infer", methods=["POST"])
def inference_endpoint():
request_data = request.get_json()
prompt = request_data.get("prompt")
model_config = request_data.get("model_config", {})
gradient = request_data.get("gradient", "fast_local")
if not prompt:
return jsonify({"error": "Prompt is required for inference."}), 400
print(f"LLM Adaptation: Received inference request with gradient: '{gradient}'")
try:
generated_text = self._realization.generate_text(prompt, model_config, gradient)
return jsonify({"status": "success", "generated_text": generated_text}), 200
except ValueError as e:
print(f"LLM Adaptation: Error during LLM inference: {e}")
return jsonify({"status": "error", "message": str(e)}), 400
except Exception as e:
print(f"LLM Adaptation: Unexpected error during LLM inference: {e}")
return jsonify({"status": "error", "message": "Internal Server Error"}), 500
@self.app.route("/health", methods=["GET"])
def health_check():
return jsonify({"status": "healthy", "service": "llm-inference-service"}), 200
def run(self, host: str = "0.0.0.0", port: int = 8001):
print(f"LLM Adaptation: Starting LLM Inference Service on {host}:{port}")
self.app.run(host=host, port=port)
# Main entry point for the LLM Inference Service
if __name__ == "__main__":
llm_essence = BasicLLMInferenceEssence()
# The actual GPU backend for Ollama is configured via its image/runtime.
# Here, we instantiate a backend that *represents* a specific configuration.
# In a real Kubernetes deployment, the `OLLAMA_GPU_ARCH` env var would guide Ollama.
local_ollama_backend = LocalOllamaBackend(
ollama_url=os.getenv("OLLAMA_SERVICE_URL", "http://localhost:11434"),
gpu_backend=os.getenv("OLLAMA_GPU_ARCH", "cuda") # Default to CUDA
)
cloud_openai_backend = CloudOpenAIBackend(api_key=os.getenv("OPENAI_API_KEY"))
llm_realization = LLMInferenceRealization(
essence=llm_essence,
local_backend=local_ollama_backend,
cloud_backend=cloud_openai_backend
)
llm_inference_app = LLMInferenceAdaptation(realization=llm_realization)
llm_inference_app.run()
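With the service running locally on its default port, any consumer can exercise the contract with a plain HTTP call; no knowledge of the Ollama or OpenAI backends is required. A minimal usage sketch (the prompt and model name are illustrative):
# Minimal usage sketch: calling the LLM Inference Service's /infer endpoint.
import requests

response = requests.post(
    "http://localhost:8001/infer",
    json={
        "prompt": "Explain Capability-Centric Architecture in one paragraph.",
        "model_config": {"model": "llama2"},
        "gradient": "fast_local",  # switch to "high_quality_cloud" to route to the OpenAI backend
    },
    timeout=60,
)
response.raise_for_status()
print(response.json()["generated_text"])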
File: cognito_agency/services/web_search_tool.py
# cognito_agency/services/web_search_tool.py
# Web Search Tool Capability (simulating MCP server interaction)
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import os
import requests # Assuming 'requests' is installed for HTTP calls
from flask import Flask, request, jsonify # Using Flask for API
# -----------------------------------------------------------------------------
# 1. Essence Layer: Core Search Logic (abstracted)
# -----------------------------------------------------------------------------
class WebSearchEssence(ABC):
"""
Abstract base class for the core web search logic.
Defines the fundamental operation of formulating and refining search queries.
"""
@abstractmethod
def refine_query(self, raw_query: str) -> str:
"""Refines a raw query for optimal search results."""
pass
@abstractmethod
def filter_results(self, raw_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Filters and ranks raw search results for relevance."""
pass
class BasicWebSearchEssence(WebSearchEssence):
"""
A concrete implementation of the web search essence.
"""
def refine_query(self, raw_query: str) -> str:
print(f"Web Search Essence: Refining query: '{raw_query[:50]}...'")
# Simple refinement: ensure keywords are present, add quotes if needed.
return f'"{raw_query}" site:wikipedia.org OR site:arxiv.org'
def filter_results(self, raw_results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
print(f"Web Search Essence: Filtering {len(raw_results)} raw results.")
# Simple filtering: return top 2 results
return raw_results[:2]
# -----------------------------------------------------------------------------
# 2. Realization Layer: External Web Search API Integration
# -----------------------------------------------------------------------------
class ExternalSearchAPI(ABC):
"""Abstract interface for an external web search API."""
@abstractmethod
def perform_search(self, query: str) -> List[Dict[str, Any]]:
"""Calls the external search API and returns raw results."""
pass
class MCPWebSearchAPI(ExternalSearchAPI):
"""
Realization for interacting with a Web Search tool provided by MCP servers.
This simulates an external API call.
"""
def __init__(self, mcp_api_url: str = "http://mcp-search-service:8080/search"):
self.mcp_api_url = os.getenv("MCP_SEARCH_SERVICE_URL", mcp_api_url)
print(f"Web Search Realization: Initialized MCP Web Search API at {self.mcp_api_url}")
# In a real system, you'd check API connectivity.
def perform_search(self, query: str) -> List[Dict[str, Any]]:
print(f"Web Search Realization: Calling MCP search for query: '{query[:50]}...'")
# Simulate an HTTP GET request to the MCP search service.
# In a real system, this would use requests.get()
# response = requests.get(f"{self.mcp_api_url}?q={query}")
# response.raise_for_status()
# return response.json()["results"]
# Mock results for demonstration
return [
{"title": f"Result 1 for {query}", "url": f"http://example.com/r1?q={query}", "snippet": f"First snippet for {query} content..."},
{"title": f"Result 2 for {query}", "url": f"http://example.com/r2?q={query}", "snippet": f"Second snippet for {query} content..."},
{"title": f"Result 3 for {query}", "url": f"http://example.com/r3?q={query}", "snippet": f"Third snippet for {query} content..."}
]
class WebSearchRealization:
"""
Realization layer for the WebSearchTool, orchestrating Essence and External API.
"""
def __init__(self, essence: WebSearchEssence, external_api: ExternalSearchAPI):
self._essence = essence
self._external_api = external_api
print("Web Search Realization: WebSearchRealization initialized.")
def search(self, query: str) -> List[Dict[str, Any]]:
"""
Conducts a web search using the Essence for query refinement and result filtering.
"""
refined_query = self._essence.refine_query(query)
raw_results = self._external_api.perform_search(refined_query)
filtered_results = self._essence.filter_results(raw_results)
print(f"Web Search Realization: Returning {len(filtered_results)} filtered results.")
return filtered_results
# -----------------------------------------------------------------------------
# 3. Adaptation Layer: External Interface (REST API)
# -----------------------------------------------------------------------------
class WebSearchAdaptation:
"""
Adaptation layer for the WebSearchTool.
Provides a simple REST API endpoint for external clients to interact.
"""
def __init__(self, realization: WebSearchRealization):
self._realization = realization
self.app = Flask(__name__)
self._setup_routes()
print("Web Search Adaptation: WebSearchAdaptation initialized with Flask app.")
def _setup_routes(self):
@self.app.route("/search", methods=["GET"])
def search_endpoint():
query = request.args.get("query")
if not query:
return jsonify({"error": "Query parameter is required."}), 400
print(f"Web Search Adaptation: Received search request for query: '{query}'")
try:
results = self._realization.search(query)
return jsonify({"status": "success", "results": results}), 200
except Exception as e:
print(f"Web Search Adaptation: Error during web search: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
@self.app.route("/health", methods=["GET"])
def health_check():
return jsonify({"status": "healthy", "service": "web-search-tool"}), 200
def run(self, host: str = "0.0.0.0", port: int = 8002):
print(f"Web Search Adaptation: Starting Web Search Tool on {host}:{port}")
self.app.run(host=host, port=port)
# Main entry point for the Web Search Tool Service
if __name__ == "__main__":
web_search_essence = BasicWebSearchEssence()
mcp_api = MCPWebSearchAPI(mcp_api_url=os.getenv("MCP_SEARCH_SERVICE_URL", "http://localhost:8080/search"))
web_search_realization = WebSearchRealization(
essence=web_search_essence,
external_api=mcp_api
)
web_search_app = WebSearchAdaptation(realization=web_search_realization)
web_search_app.run()
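Because the Essence layer carries no infrastructure dependencies, it can be unit-tested without a network connection or a running Flask server. A brief pytest-style sketch (the test module itself is assumed, not part of the listing above):
# Illustrative unit tests for the Web Search Essence (pytest-style, assumed test module).
from cognito_agency.services.web_search_tool import BasicWebSearchEssence

def test_refine_query_adds_site_filters():
    essence = BasicWebSearchEssence()
    refined = essence.refine_query("capability-centric architecture")
    assert '"capability-centric architecture"' in refined
    assert "site:wikipedia.org" in refined

def test_filter_results_keeps_top_two():
    essence = BasicWebSearchEssence()
    raw = [{"title": f"r{i}", "url": f"http://example.com/{i}", "snippet": "..."} for i in range(5)]
    assert len(essence.filter_results(raw)) == 2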
File: cognito_agency/agents/researcher_agent.py
# cognito_agency/agents/researcher_agent.py
# Researcher Agent Capability (v2)
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Literal
import os
import requests
from flask import Flask, request, jsonify
# -----------------------------------------------------------------------------
# Helper for making HTTP calls to other capabilities
# -----------------------------------------------------------------------------
class CapabilityClient:
"""A simple client to interact with other capabilities via HTTP."""
def __init__(self, base_url: str):
self.base_url = base_url
print(f"CapabilityClient: Initialized for base URL: {base_url}")
def post(self, path: str, data: Dict[str, Any]) -> Dict[str, Any]:
try:
response = requests.post(f"{self.base_url}{path}", json=data, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"CapabilityClient: Error POSTing to {self.base_url}{path}: {e}")
raise
def get(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
try:
response = requests.get(f"{self.base_url}{path}", params=params, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"CapabilityClient: Error GETting from {self.base_url}{path}: {e}")
raise
# -----------------------------------------------------------------------------
# 1. Essence Layer: Core Research Logic (for v2)
# -----------------------------------------------------------------------------
class ResearchEssenceV2(ABC):
"""
Abstract base class for the core research logic (v2).
Includes advanced query formulation and synthesis.
"""
@abstractmethod
def formulate_advanced_query(self, topic: str, context: Dict[str, Any], depth: str) -> str:
"""
Formulates an advanced query, potentially for semantic search or knowledge graphs.
"""
pass
@abstractmethod
def synthesize_advanced_findings(self, raw_data: List[str], knowledge_graph_info: List[str], initial_query: str) -> str:
"""
Synthesizes raw research data and knowledge graph information into coherent findings.
"""
pass
class AdvancedResearchEssence(ResearchEssenceV2):
"""
A concrete implementation of the advanced research essence.
"""
def formulate_advanced_query(self, topic: str, context: Dict[str, Any], depth: str) -> str:
print(f"Researcher Essence: Formulating advanced query for topic '{topic}' (depth: {depth}) with context: {context}")
# More sophisticated query formulation for semantic search
keywords = context.get('keywords', [])
entities = context.get('entities', [])
return f"Semantic research on '{topic}'. Keywords: {', '.join(keywords)}. Entities: {', '.join(entities)}. Depth: {depth}."
def synthesize_advanced_findings(self, raw_data: List[str], knowledge_graph_info: List[str], initial_query: str) -> str:
print(f"Researcher Essence: Synthesizing findings from {len(raw_data)} data points and {len(knowledge_graph_info)} KG links.")
        combined_data = " ".join(raw_data) + " " + " ".join(knowledge_graph_info)
# In a real scenario, this would use LLM for synthesis or complex NLP.
return f"Advanced synthesis for '{initial_query}': {combined_data[:500]}..."
# -----------------------------------------------------------------------------
# 2. Realization Layer: Orchestrating LLM, Web Search, Semantic Search, KG
# -----------------------------------------------------------------------------
class ResearcherRealizationV2:
"""
Realization layer for ResearcherAgent v2, orchestrating multiple tools.
"""
def __init__(self, essence: ResearchEssenceV2,
llm_client: CapabilityClient,
web_search_client: CapabilityClient,
semantic_search_client: CapabilityClient, # New for v2
knowledge_graph_client: CapabilityClient): # New for v2
self._essence = essence
self._llm_client = llm_client
self._web_search_client = web_search_client
self._semantic_search_client = semantic_search_client
self._knowledge_graph_client = knowledge_graph_client
print("Researcher Realization: ResearcherAgentV2 Realization initialized.")
def conduct_advanced_research(self, topic: str, context: Dict[str, Any], depth: str) -> Dict[str, Any]:
"""
Orchestrates the advanced research process.
"""
initial_query = self._essence.formulate_advanced_query(topic, context, depth)
all_raw_data = []
knowledge_graph_links = []
try:
# Use semantic search if depth is 'deep' or context suggests it
if depth == "deep" or "semantic_keywords" in context:
print("Researcher Realization: Performing semantic search.")
semantic_results = self._semantic_search_client.post(
"/semantic_search",
{"query": initial_query, "semantic_context": context.get("semantic_keywords", [])}
)
if semantic_results.get("status") == "success":
all_raw_data.extend([res["content"] for res in semantic_results["results"]])
print(f"Researcher Realization: Retrieved {len(semantic_results['results'])} semantic results.")
print("Researcher Realization: Querying knowledge graph.")
kg_query = {"entities": context.get("entities", []), "relations": context.get("relations", [])}
kg_response = self._knowledge_graph_client.post("/query_graph", kg_query)
if kg_response.get("status") == "success":
knowledge_graph_links.extend(kg_response["links"])
all_raw_data.append(kg_response["summary"]) # Add KG summary to raw data
print(f"Researcher Realization: Retrieved {len(kg_response['links'])} KG links.")
# Always perform some web search for broader context
print("Researcher Realization: Performing web search.")
web_search_results = self._web_search_client.get("/search", {"query": initial_query})
if web_search_results.get("status") == "success":
all_raw_data.extend([res["snippet"] for res in web_search_results["results"]])
print(f"Researcher Realization: Retrieved {len(web_search_results['results'])} web search results.")
# Use LLM to refine and synthesize all collected raw data
llm_synthesis_prompt = f"Given the following information:\n\n" \
f"Raw data: {all_raw_data}\n" \
f"Knowledge Graph Links: {knowledge_graph_links}\n\n" \
f"Synthesize a comprehensive report on '{topic}' based on the initial query: '{initial_query}'."
llm_response = self._llm_client.post(
"/infer",
{"prompt": llm_synthesis_prompt, "model_config": {"model": "llama2"}, "gradient": "high_quality_cloud"}
)
if llm_response.get("status") != "success":
raise Exception(f"LLM inference failed: {llm_response.get('message', 'Unknown error')}")
final_synthesis = llm_response["generated_text"]
final_findings = self._essence.synthesize_advanced_findings(
[final_synthesis], knowledge_graph_links, initial_query
)
return {
"status": "success",
"findings": final_findings,
"knowledge_graph_links": knowledge_graph_links
}
except Exception as e:
print(f"Researcher Realization: Error during advanced research: {e}")
return {"status": "error", "message": str(e)}
# -----------------------------------------------------------------------------
# 3. Adaptation Layer: External Interface (REST API)
# -----------------------------------------------------------------------------
class ResearcherAgentAdaptationV2:
"""
Adaptation layer for the ResearcherAgent v2.
Provides a REST API endpoint for advanced research requests.
"""
def __init__(self, realization: ResearcherRealizationV2):
self._realization = realization
self.app = Flask(__name__)
self._setup_routes()
print("Researcher Adaptation: ResearcherAgentV2 Adaptation initialized with Flask app.")
def _setup_routes(self):
@self.app.route("/advanced_research", methods=["POST"])
def advanced_research_endpoint():
request_data = request.get_json()
topic = request_data.get("topic")
context = request_data.get("context", {})
depth = request_data.get("depth", "shallow")
if not topic:
return jsonify({"error": "Topic is required for advanced research."}), 400
print(f"Researcher Adaptation: Received advanced research request for topic: '{topic}' (depth: {depth})")
try:
response_data = self._realization.conduct_advanced_research(topic, context, depth)
if response_data["status"] == "success":
return jsonify(response_data), 200
else:
return jsonify(response_data), 500
except Exception as e:
print(f"Researcher Adaptation: Unexpected error during advanced research: {e}")
return jsonify({"status": "error", "message": "Internal Server Error"}), 500
@self.app.route("/health", methods=["GET"])
def health_check():
return jsonify({"status": "healthy", "service": "researcher-agent-v2"}), 200
def run(self, host: str = "0.0.0.0", port: int = 8000):
print(f"Researcher Adaptation: Starting Researcher Agent v2 on {host}:{port}")
self.app.run(host=host, port=port)
# Main entry point for Researcher Agent v2
if __name__ == "__main__":
research_essence_v2 = AdvancedResearchEssence()
# Initialize clients for required capabilities
llm_client = CapabilityClient(base_url=os.getenv("LLM_SERVICE_URL", "http://localhost:8001"))
web_search_client = CapabilityClient(base_url=os.getenv("WEB_SEARCH_TOOL_URL", "http://localhost:8002"))
# Semantic Search and Knowledge Graph are new for v2, assume they exist
semantic_search_client = CapabilityClient(base_url=os.getenv("SEMANTIC_SEARCH_TOOL_URL", "http://localhost:8003"))
knowledge_graph_client = CapabilityClient(base_url=os.getenv("KNOWLEDGE_GRAPH_SERVICE_URL", "http://localhost:8004"))
researcher_realization_v2 = ResearcherRealizationV2(
essence=research_essence_v2,
llm_client=llm_client,
web_search_client=web_search_client,
semantic_search_client=semantic_search_client,
knowledge_graph_client=knowledge_graph_client
)
researcher_agent_v2_app = ResearcherAgentAdaptationV2(realization=researcher_realization_v2)
researcher_agent_v2_app.run()
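From the outside, a consumer of the Researcher Agent sees only the /advanced_research endpoint declared in its contract; the LLM, web search, semantic search, and knowledge graph dependencies stay hidden behind the Adaptation layer. A minimal call against the default local port (topic and context values are illustrative):
# Minimal usage sketch: requesting advanced research from the Researcher Agent v2.
import requests

response = requests.post(
    "http://localhost:8000/advanced_research",
    json={
        "topic": "Capability-Centric Architecture for agentic AI",
        "context": {"keywords": ["CCA", "agents"], "entities": ["Kubernetes"]},
        "depth": "deep",
    },
    timeout=120,
)
response.raise_for_status()
report = response.json()
print(report["findings"])
print(report.get("knowledge_graph_links", []))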
File: cognito_agency/agents/summarizer_agent.py
# cognito_agency/agents/summarizer_agent.py
# Summarizer Agent Capability
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Literal
import os
import requests
from flask import Flask, request, jsonify
# Reusing CapabilityClient from researcher_agent
from cognito_agency.agents.researcher_agent import CapabilityClient
# -----------------------------------------------------------------------------
# 1. Essence Layer: Core Summarization Logic
# -----------------------------------------------------------------------------
class SummarizationEssence(ABC):
"""
Abstract base class for the core summarization logic.
"""
@abstractmethod
def generate_summary_prompt(self, text: str, length_preference: str) -> str:
"""Generates an LLM prompt for summarization based on preferences."""
pass
@abstractmethod
def post_process_summary(self, raw_summary: str) -> str:
"""Performs post-processing on the LLM-generated summary."""
pass
class BasicSummarizationEssence(SummarizationEssence):
"""
A concrete implementation of the summarization essence.
"""
def generate_summary_prompt(self, text: str, length_preference: str) -> str:
print(f"Summarizer Essence: Generating prompt for summary (length: {length_preference}).")
base_prompt = f"Summarize the following text: {text}"
if length_preference == "short":
return f"{base_prompt} Provide a very concise, 1-2 sentence summary."
elif length_preference == "medium":
return f"{base_prompt} Provide a summary of about 3-5 sentences."
elif length_preference == "long":
return f"{base_prompt} Provide a detailed summary, covering all key points."
return base_prompt
def post_process_summary(self, raw_summary: str) -> str:
print("Summarizer Essence: Post-processing raw summary.")
# Example: Remove leading/trailing whitespace, ensure proper punctuation.
        return raw_summary.strip().replace("..", ".").replace("  ", " ")
# -----------------------------------------------------------------------------
# 2. Realization Layer: LLM Interaction for Summarization
# -----------------------------------------------------------------------------
class SummarizerRealization:
"""
Realization layer for SummarizerAgent, primarily interacting with LLM.
"""
def __init__(self, essence: SummarizationEssence, llm_client: CapabilityClient):
self._essence = essence
self._llm_client = llm_client
print("Summarizer Realization: SummarizerRealization initialized.")
def summarize(self, text_to_summarize: str, length_preference: str) -> Dict[str, Any]:
"""
Orchestrates the summarization process.
"""
try:
llm_prompt = self._essence.generate_summary_prompt(text_to_summarize, length_preference)
# Use 'high_quality_cloud' gradient for summarization for better quality
llm_response = self._llm_client.post(
"/infer",
{"prompt": llm_prompt, "model_config": {"model": "gpt-4"}, "gradient": "high_quality_cloud"}
)
if llm_response.get("status") != "success":
raise Exception(f"LLM inference failed: {llm_response.get('message', 'Unknown error')}")
raw_summary = llm_response["generated_text"]
final_summary = self._essence.post_process_summary(raw_summary)
return {"status": "success", "summary": final_summary}
except Exception as e:
print(f"Summarizer Realization: Error during summarization: {e}")
return {"status": "error", "message": str(e)}
# -----------------------------------------------------------------------------
# 3. Adaptation Layer: External Interface (REST API)
# -----------------------------------------------------------------------------
class SummarizerAgentAdaptation:
"""
Adaptation layer for the SummarizerAgent.
Provides a REST API endpoint for summarization requests.
"""
def __init__(self, realization: SummarizerRealization):
self._realization = realization
self.app = Flask(__name__)
self._setup_routes()
print("Summarizer Adaptation: SummarizerAgentAdaptation initialized with Flask app.")
def _setup_routes(self):
@self.app.route("/summarize", methods=["POST"])
def summarize_endpoint():
request_data = request.get_json()
text_to_summarize = request_data.get("text_to_summarize")
length_preference = request_data.get("length_preference", "medium")
if not text_to_summarize:
return jsonify({"error": "Text to summarize is required."}), 400
print(f"Summarizer Adaptation: Received summarization request (length: {length_preference}).")
try:
response_data = self._realization.summarize(text_to_summarize, length_preference)
                if response_data["status"] == "success":
return jsonify(response_data), 200
else:
return jsonify(response_data), 500
except Exception as e:
print(f"Summarizer Adaptation: Unexpected error during summarization: {e}")
return jsonify({"status": "error", "message": "Internal Server Error"}), 500
@self.app.route("/health", methods=["GET"])
def health_check():
return jsonify({"status": "healthy", "service": "summarizer-agent"}), 200
def run(self, host: str = "0.0.0.0", port: int = 8005):
print(f"Summarizer Adaptation: Starting Summarizer Agent on {host}:{port}")
self.app.run(host=host, port=port)
# Main entry point for Summarizer Agent
if __name__ == "__main__":
summarization_essence = BasicSummarizationEssence()
llm_client = CapabilityClient(base_url=os.getenv("LLM_SERVICE_URL", "http://localhost:8001"))
summarizer_realization = SummarizerRealization(
essence=summarization_essence,
llm_client=llm_client
)
summarizer_agent_app = SummarizerAgentAdaptation(realization=summarizer_realization)
summarizer_agent_app.run()
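As with the other capabilities, the Summarizer's Essence can be exercised in isolation. One short pytest-style check (assumed, not part of the listing):
# Illustrative unit test for the Summarization Essence (pytest-style, assumed test module).
from cognito_agency.agents.summarizer_agent import BasicSummarizationEssence

def test_short_preference_requests_concise_summary():
    essence = BasicSummarizationEssence()
    prompt = essence.generate_summary_prompt("Some long source text.", "short")
    assert prompt.startswith("Summarize the following text:")
    assert "1-2 sentence" in prompt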
File: cognito_agency/agents/critic_agent.py
# cognito_agency/agents/critic_agent.py
# Critic Agent Capability
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import os
import requests
from flask import Flask, request, jsonify
# Reusing CapabilityClient from researcher_agent
from cognito_agency.agents.researcher_agent import CapabilityClient
# -----------------------------------------------------------------------------
# 1. Essence Layer: Core Evaluation Logic
# -----------------------------------------------------------------------------
class EvaluationEssence(ABC):
"""
Abstract base class for the core evaluation logic.
"""
@abstractmethod
def formulate_evaluation_prompt(self, content: str, criteria: List[str], context: List[str]) -> str:
"""Generates an LLM prompt for content evaluation."""
pass
@abstractmethod
def parse_evaluation_response(self, llm_response: str) -> Dict[str, Any]:
"""Parses LLM response into structured evaluation report and score."""
pass
class BasicEvaluationEssence(EvaluationEssence):
"""
A concrete implementation of the evaluation essence.
"""
def formulate_evaluation_prompt(self, content: str, criteria: List[str], context: List[str]) -> str:
print(f"Critic Essence: Formulating evaluation prompt for content (criteria: {criteria}).")
criteria_str = ", ".join(criteria)
context_str = "\n".join([f"- {s}" for s in context]) if context else "None provided."
return f"Evaluate the following content based on the criteria: {criteria_str}\n\n" \
f"Content: {content}\n\n" \
f"For context, consider these sources: {context_str}\n\n" \
f"Provide a detailed report and a numerical score out of 10 for overall quality. " \
f"Format: Report: [Your detailed report]. Score: [0-10]"
def parse_evaluation_response(self, llm_response: str) -> Dict[str, Any]:
print("Critic Essence: Parsing LLM evaluation response.")
report_prefix = "Report: "
score_prefix = "Score: "
report_start = llm_response.find(report_prefix)
score_start = llm_response.find(score_prefix)
report = "Could not parse report."
score = 0.0
if report_start != -1 and score_start != -1:
report_content = llm_response[report_start + len(report_prefix):score_start].strip()
score_content = llm_response[score_start + len(score_prefix):].strip()
report = report_content
try:
score = float(score_content.split(' ')[0]) # Take first number
except ValueError:
pass
elif report_start != -1: # Only report found
report = llm_response[report_start + len(report_prefix):].strip()
elif score_start != -1: # Only score found
score_content = llm_response[score_start + len(score_prefix):].strip()
try:
score = float(score_content.split(' ')[0])
except ValueError:
pass
else: # No specific format found, return raw response as report
report = llm_response
return {"evaluation_report": report, "score": score}
# -----------------------------------------------------------------------------
# 2. Realization Layer: LLM Interaction for Evaluation
# -----------------------------------------------------------------------------
class CriticRealization:
"""
Realization layer for CriticAgent, primarily interacting with LLM.
"""
def __init__(self, essence: EvaluationEssence, llm_client: CapabilityClient):
self._essence = essence
self._llm_client = llm_client
print("Critic Realization: CriticRealization initialized.")
def evaluate(self, content_to_evaluate: str, criteria: List[str], source_context: List[str]) -> Dict[str, Any]:
"""
Orchestrates the content evaluation process.
"""
try:
llm_prompt = self._essence.formulate_evaluation_prompt(content_to_evaluate, criteria, source_context)
# Use 'high_quality_cloud' gradient for critical evaluation
llm_response_raw = self._llm_client.post(
"/infer",
{"prompt": llm_prompt, "model_config": {"model": "gpt-4"}, "gradient": "high_quality_cloud"}
)
if llm_response_raw.get("status") != "success":
raise Exception(f"LLM inference failed: {llm_response_raw.get('message', 'Unknown error')}")
parsed_evaluation = self._essence.parse_evaluation_response(llm_response_raw["generated_text"])
return {
"status": "success",
"evaluation_report": parsed_evaluation["evaluation_report"],
"score": parsed_evaluation["score"]
}
except Exception as e:
print(f"Critic Realization: Error during evaluation: {e}")
return {"status": "error", "message": str(e)}
# -----------------------------------------------------------------------------
# 3. Adaptation Layer: External Interface (REST API)
# -----------------------------------------------------------------------------
class CriticAgentAdaptation:
"""
Adaptation layer for the CriticAgent.
Provides a REST API endpoint for evaluation requests.
"""
def __init__(self, realization: CriticRealization):
self._realization = realization
self.app = Flask(__name__)
self._setup_routes()
print("Critic Adaptation: CriticAgentAdaptation initialized with Flask app.")
def _setup_routes(self):
@self.app.route("/evaluate", methods=["POST"])
def evaluate_endpoint():
request_data = request.get_json()
content_to_evaluate = request_data.get("content_to_evaluate")
criteria = request_data.get("criteria", [])
source_context = request_data.get("source_context", [])
if not content_to_evaluate or not criteria:
return jsonify({"error": "Content to evaluate and criteria are required."}), 400
print(f"Critic Adaptation: Received evaluation request (criteria: {criteria}).")
try:
response_data = self._realization.evaluate(content_to_evaluate, criteria, source_context)
                if response_data["status"] == "success":
return jsonify(response_data), 200
else:
return jsonify(response_data), 500
except Exception as e:
print(f"Critic Adaptation: Unexpected error during evaluation: {e}")
return jsonify({"status": "error", "message": "Internal Server Error"}), 500
@self.app.route("/health", methods=["GET"])
def health_check():
return jsonify({"status": "healthy", "service": "critic-agent"}), 200
def run(self, host: str = "0.0.0.0", port: int = 8006):
print(f"Critic Adaptation: Starting Critic Agent on {host}:{port}")
self.app.run(host=host, port=port)
# Main entry point for Critic Agent
if __name__ == "__main__":
evaluation_essence = BasicEvaluationEssence()
llm_client = CapabilityClient(base_url=os.getenv("LLM_SERVICE_URL", "http://localhost:8001"))
critic_realization = CriticRealization(
essence=evaluation_essence,
llm_client=llm_client
)
critic_agent_app = CriticAgentAdaptation(realization=critic_realization)
critic_agent_app.run()
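The parsing logic in BasicEvaluationEssence is exactly the kind of pure, infrastructure-free code the Essence layer is meant to isolate, so it is easy to cover with unit tests. A short pytest-style sketch (the test module is assumed):
# Illustrative unit tests for the Critic Essence's response parser (pytest-style, assumed).
from cognito_agency.agents.critic_agent import BasicEvaluationEssence

def test_parse_well_formed_response():
    essence = BasicEvaluationEssence()
    parsed = essence.parse_evaluation_response("Report: Solid and well sourced. Score: 8")
    assert parsed["evaluation_report"] == "Solid and well sourced."
    assert parsed["score"] == 8.0

def test_free_form_response_falls_back_to_raw_text():
    essence = BasicEvaluationEssence()
    parsed = essence.parse_evaluation_response("The content looks fine overall.")
    assert parsed["evaluation_report"] == "The content looks fine overall."
    assert parsed["score"] == 0.0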
File: cognito_agency/agents/orchestrator_agent.py
# cognito_agency/agents/orchestrator_agent.py
# Orchestrator Agent Capability
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Literal
import os
import requests
from flask import Flask, request, jsonify
# Reusing CapabilityClient from researcher_agent
from cognito_agency.agents.researcher_agent import CapabilityClient
from cognito_agency.capability_registry.registry_service import CapabilityRegistry
# -----------------------------------------------------------------------------
# 1. Essence Layer: Core Orchestration Logic
# -----------------------------------------------------------------------------
class OrchestrationEssence(ABC):
"""
Abstract base class for the core orchestration logic.
Defines the workflow for complex research tasks.
"""
@abstractmethod
def plan_research_workflow(self, topic: str, context: Dict[str, Any], output_format: str) -> List[Dict[str, Any]]:
"""
Plans a sequence of agent interactions (workflow) based on the research task.
"""
pass
@abstractmethod
def aggregate_results(self, agent_outputs: List[Dict[str, Any]], final_format: str) -> str:
"""
Aggregates and formats the results from various agents into a final report.
"""
pass
class BasicOrchestrationEssence(OrchestrationEssence):
"""
A concrete implementation of the orchestration essence.
"""
def plan_research_workflow(self, topic: str, context: Dict[str, Any], output_format: str) -> List[Dict[str, Any]]:
print(f"Orchestrator Essence: Planning workflow for topic '{topic}', format: '{output_format}'.")
workflow = []
# Step 1: Research
workflow.append({
"agent": "researcher-agent-v2",
"operation": "conduct_advanced_research",
"input": {"topic": topic, "context": context, "depth": "deep" if output_format == "detailed" else "shallow"},
"output_key": "research_findings"
})
# Step 2: Summarize (if not brief)
if output_format != "brief":
workflow.append({
"agent": "summarizer-agent-v1",
"operation": "summarize",
"input_from_previous": "research_findings.findings", # Use findings from researcher
"input_key": "text_to_summarize",
"length_preference": "long" if output_format == "detailed" else "medium",
"output_key": "summary"
})
# Step 3: Critic (if critical analysis)
if output_format == "critical_analysis":
workflow.append({
"agent": "critic-agent-v1",
"operation": "evaluate",
"input_from_previous": "summary.summary", # Evaluate the summary
"input_key": "content_to_evaluate",
"criteria": ["accuracy", "completeness", "objectivity"],
"source_context_from_previous": "research_findings.knowledge_graph_links", # Pass KG links as context
"source_context_key": "source_context",
"output_key": "evaluation_report"
})
return workflow
def aggregate_results(self, agent_outputs: List[Dict[str, Any]], final_format: str) -> str:
print(f"Orchestrator Essence: Aggregating results for final format: '{final_format}'.")
final_report_parts = []
research_findings = next((o["output"] for o in agent_outputs if o["output_key"] == "research_findings"), None)
summary = next((o["output"] for o in agent_outputs if o["output_key"] == "summary"), None)
evaluation_report = next((o["output"] for o in agent_outputs if o["output_key"] == "evaluation_report"), None)
if research_findings and research_findings["status"] == "success":
final_report_parts.append(f"--- Research Findings ---\n{research_findings['findings']}")
if research_findings.get("knowledge_graph_links"):
final_report_parts.append(f"\nKnowledge Graph Links: {', '.join(research_findings['knowledge_graph_links'])}")
if summary and summary["status"] == "success":
final_report_parts.append(f"\n--- Summary ---\n{summary['summary']}")
if evaluation_report and evaluation_report["status"] == "success":
final_report_parts.append(f"\n--- Critical Evaluation ---\nReport: {evaluation_report['evaluation_report']}\nScore: {evaluation_report['score']}/10")
if not final_report_parts:
return "No meaningful results could be aggregated."
return "\n\n".join(final_report_parts)
# -----------------------------------------------------------------------------
# 2. Realization Layer: Inter-Agent Communication and Workflow Execution
# -----------------------------------------------------------------------------
class OrchestratorRealization:
"""
Realization layer for OrchestratorAgent, managing inter-agent communication.
"""
def __init__(self, essence: OrchestrationEssence, capability_registry: CapabilityRegistry):
self._essence = essence
self._registry = capability_registry
self._clients: Dict[str, CapabilityClient] = {} # Cache clients for reuse
print("Orchestrator Realization: OrchestratorRealization initialized.")
def _get_capability_client(self, capability_id: str) -> CapabilityClient:
"""Retrieves or creates a client for a given capability."""
if capability_id not in self._clients:
# In a real system, resolve service URL from Kubernetes DNS or config
# For this example, we'll use environment variables for simplicity
# e.g., RESEARCHER_AGENT_V2_URL, SUMMARIZER_AGENT_V1_URL
env_var_name = capability_id.replace('-', '_').upper() + "_URL"
base_url = os.getenv(env_var_name, f"http://localhost:{self._get_default_port(capability_id)}")
self._clients[capability_id] = CapabilityClient(base_url)
return self._clients[capability_id]
def _get_default_port(self, capability_id: str) -> int:
"""Helper to map capability IDs to default local ports for testing."""
if "llm-inference-service" in capability_id: return 8001
if "web-search-tool" in capability_id: return 8002
if "semantic-search-tool" in capability_id: return 8003 # Placeholder
if "knowledge-graph-service" in capability_id: return 8004 # Placeholder
if "summarizer-agent" in capability_id: return 8005
if "critic-agent" in capability_id: return 8006
if "researcher-agent-v1" in capability_id: return 8000 # Researcher v1
if "researcher-agent-v2" in capability_id: return 8000 # Researcher v2 (same port, different deployment)
return 9000 # Default for others
def orchestrate_research_task(self, main_topic: str, initial_context: Dict[str, Any], desired_output_format: str) -> Dict[str, Any]:
"""
Executes the planned research workflow by calling other agents.
"""
workflow = self._essence.plan_research_workflow(main_topic, initial_context, desired_output_format)
agent_outputs: List[Dict[str, Any]] = []
current_context: Dict[str, Any] = {} # Context to pass between agents
for step in workflow:
agent_id = step["agent"]
operation = step["operation"]
input_data = step["input"] if "input" in step else {}
# Resolve inputs from previous steps
if "input_from_previous" in step:
path = step["input_from_previous"].split('.')
val = current_context
for p in path:
val = val.get(p, {})
input_data[step["input_key"]] = val
if "source_context_from_previous" in step:
path = step["source_context_from_previous"].split('.')
val = current_context
for p in path:
val = val.get(p, {})
input_data[step["source_context_key"]] = val
try:
client = self._get_capability_client(agent_id)
print(f"Orchestrator Realization: Calling {agent_id} for operation '{operation}' with input: {input_data}")
# Retrieve contract for operation details (e.g., method, path)
contract = self._registry.get_capability_contract(agent_id)
provision = next(
(p for p in contract["provisions"] if p["operation"] == operation),
None
)
if not provision:
raise ValueError(f"Operation '{operation}' not found in contract for '{agent_id}'.")
method = provision["protocol"]["method"]
endpoint_path = provision["protocol"]["endpoint_path"]
if method == "POST":
response = client.post(endpoint_path, input_data)
elif method == "GET":
response = client.get(endpoint_path, input_data) # GET with params
else:
raise ValueError(f"Unsupported method: {method} for {agent_id}:{operation}")
if response.get("status") == "success":
print(f"Orchestrator Realization: {agent_id} completed successfully.")
agent_outputs.append({"output_key": step["output_key"], "output": response})
current_context[step["output_key"]] = response # Update context for subsequent steps
else:
print(f"Orchestrator Realization: {agent_id} failed: {response.get('message', 'Unknown error')}")
# Depending on policy, might retry, skip, or fail whole workflow
return {"status": "error", "message": f"Step '{agent_id}:{operation}' failed: {response.get('message', 'Unknown error')}"}
except Exception as e:
print(f"Orchestrator Realization: Error executing step {agent_id}:{operation}: {e}")
return {"status": "error", "message": f"Workflow failed at step '{agent_id}:{operation}': {str(e)}"}
final_report = self._essence.aggregate_results(agent_outputs, desired_output_format)
return {"status": "success", "final_report": final_report}
# -----------------------------------------------------------------------------
# 3. Adaptation Layer: External Interface (REST API)
# -----------------------------------------------------------------------------
class OrchestratorAgentAdaptation:
"""
Adaptation layer for the OrchestratorAgent.
Provides a REST API endpoint for initiating complex research tasks.
"""
def __init__(self, realization: OrchestratorRealization):
self._realization = realization
self.app = Flask(__name__)
self._setup_routes()
print("Orchestrator Adaptation: OrchestratorAgentAdaptation initialized with Flask app.")
def _setup_routes(self):
@self.app.route("/orchestrate_research", methods=["POST"])
def orchestrate_research_endpoint():
request_data = request.get_json()
main_topic = request_data.get("main_topic")
initial_context = request_data.get("initial_context", {})
desired_output_format = request_data.get("desired_output_format", "brief")
if not main_topic:
return jsonify({"error": "Main topic is required for research orchestration."}), 400
print(f"Orchestrator Adaptation: Received research orchestration request for topic: '{main_topic}' (format: {desired_output_format}).")
try:
response_data = self._realization.orchestrate_research_task(
main_topic, initial_context, desired_output_format
)
            if response_data["status"] == "success":
return jsonify(response_data), 200
else:
return jsonify(response_data), 500
except Exception as e:
print(f"Orchestrator Adaptation: Unexpected error during research orchestration: {e}")
return jsonify({"status": "error", "message": "Internal Server Error"}), 500
@self.app.route("/health", methods=["GET"])
def health_check():
return jsonify({"status": "healthy", "service": "orchestrator-agent"}), 200
def run(self, host: str = "0.0.0.0", port: int = 8007):
print(f"Orchestrator Adaptation: Starting Orchestrator Agent on {host}:{port}")
self.app.run(host=host, port=port)
# Main entry point for Orchestrator Agent
if __name__ == "__main__":
orchestration_essence = BasicOrchestrationEssence()
registry = CapabilityRegistry() # Get the singleton registry instance
orchestrator_realization = OrchestratorRealization(
essence=orchestration_essence,
capability_registry=registry
)
orchestrator_agent_app = OrchestratorAgentAdaptation(realization=orchestrator_realization)
orchestrator_agent_app.run()
C. Kubernetes Manifests:
To deploy the Cognito Research Agency, we would use Kubernetes manifests for each capability. These manifests would define Deployments, Services, and optionally Ingresses for external access. We would also need PersistentVolumeClaims for LLM models and device plugins for GPU allocation.
Here are illustrative Kubernetes YAMLs for some of the capabilities. Note that semantic-search-tool and knowledge-graph-service are assumed external services for which only a placeholder client exists in the Python code.
File: kubernetes/llm-inference-service-deployment.yaml (as shown previously)
# kubernetes/llm-inference-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: llm-inference-service
labels:
app: llm-inference-service
capability-id: llm-inference-service-v1
spec:
replicas: 1
selector:
matchLabels:
app: llm-inference-service
template:
metadata:
labels:
app: llm-inference-service
capability-id: llm-inference-service-v1
spec:
nodeSelector:
gpu-vendor: nvidia # Or amd, intel, etc.
tolerations:
- key: "nvidia.com/gpu"
operator: "Exists"
effect: "NoSchedule"
containers:
- name: llm-inference-container
image: cognito-registry/ollama-llm-service:v1.0.0 # Your Ollama image with models
ports:
- containerPort: 8001
env:
- name: OLLAMA_SERVICE_URL
value: "http://localhost:11434" # Ollama's internal URL within the container
- name: OLLAMA_HOST
value: "0.0.0.0" # Expose Ollama within the pod
- name: OLLAMA_NUM_GPU
value: "1"
- name: OLLAMA_GPU_ARCH
value: "cuda" # Configured based on target node
- name: OPENAI_API_KEY # For cloud backend
valueFrom:
secretKeyRef:
name: openai-api-key
key: api_key
resources:
requests:
cpu: "2000m"
memory: "16Gi"
nvidia.com/gpu: 1 # Request 1 Nvidia GPU
limits:
cpu: "4000m"
memory: "32Gi"
nvidia.com/gpu: 1
volumeMounts:
- name: ollama-models-storage
mountPath: /root/.ollama
volumes:
- name: ollama-models-storage
persistentVolumeClaim:
claimName: ollama-pvc
---
apiVersion: v1
kind: Service
metadata:
name: llm-inference-service
labels:
app: llm-inference-service
capability-id: llm-inference-service-v1
spec:
selector:
app: llm-inference-service
ports:
- protocol: TCP
port: 8001
targetPort: 8001
type: ClusterIP
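The llm-inference-service Deployment above mounts a PersistentVolumeClaim named ollama-pvc for model storage, but that claim is not defined in the manifests shown here. A minimal sketch of such a PVC follows; the storage class and requested capacity are assumptions and should be adjusted to your cluster's provisioner and the size of the models you pull.
# kubernetes/ollama-pvc.yaml (illustrative sketch; storage class and size are assumptions)
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ollama-pvc
  labels:
    app: llm-inference-service
spec:
  accessModes:
    - ReadWriteOnce            # A single LLM pod mounts the model cache
  storageClassName: standard   # Assumption: replace with your cluster's storage class
  resources:
    requests:
      storage: 100Gi           # Assumption: sized to fit the Ollama models you pull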
File: kubernetes/web-search-tool-deployment.yaml
# kubernetes/web-search-tool-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: web-search-tool
labels:
app: web-search-tool
capability-id: web-search-tool-v1
spec:
replicas: 1
selector:
matchLabels:
app: web-search-tool
template:
metadata:
labels:
app: web-search-tool
capability-id: web-search-tool-v1
spec:
containers:
- name: web-search-tool-container
image: cognito-registry/web-search-tool:v1.0.0
ports:
- containerPort: 8002
env:
- name: MCP_SEARCH_SERVICE_URL # URL for the actual external MCP search service
value: "http://external-mcp-provider.com/api/search"
resources:
requests:
cpu: "200m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
---
apiVersion: v1
kind: Service
metadata:
name: web-search-tool
labels:
app: web-search-tool
capability-id: web-search-tool-v1
spec:
selector:
app: web-search-tool
ports:
- protocol: TCP
port: 8002
targetPort: 8002
type: ClusterIP
File: kubernetes/researcher-agent-v2-deployment.yaml
# kubernetes/researcher-agent-v2-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: researcher-agent-v2
labels:
app: researcher-agent
version: v2
capability-id: researcher-agent-v2
spec:
replicas: 2
selector:
matchLabels:
app: researcher-agent
version: v2
template:
metadata:
labels:
app: researcher-agent
version: v2
capability-id: researcher-agent-v2
spec:
containers:
- name: researcher-agent-container
image: cognito-registry/researcher-agent:v2.0.0
ports:
- containerPort: 8000
env:
- name: LLM_SERVICE_URL
value: "http://llm-inference-service:8001"
- name: WEB_SEARCH_TOOL_URL
value: "http://web-search-tool:8002"
- name: SEMANTIC_SEARCH_TOOL_URL # New dependency for v2
value: "http://semantic-search-tool:8003"
- name: KNOWLEDGE_GRAPH_SERVICE_URL # New dependency for v2
value: "http://knowledge-graph-service:8004"
resources:
requests:
cpu: "750m"
memory: "1.5Gi"
limits:
cpu: "1500m"
memory: "3Gi"
---
apiVersion: v1
kind: Service
metadata:
name: researcher-agent-v2
labels:
app: researcher-agent
version: v2
capability-id: researcher-agent-v2
spec:
selector:
app: researcher-agent
version: v2
ports:
- protocol: TCP
port: 8000
targetPort: 8000
type: ClusterIP
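The researcher-agent-v2 Deployment above points SEMANTIC_SEARCH_TOOL_URL and KNOWLEDGE_GRAPH_SERVICE_URL at in-cluster DNS names, yet, as noted earlier, those capabilities are assumed to be external services with only placeholder clients in the Python code. One way to keep those in-cluster URLs stable while delegating to an external provider is an ExternalName Service; the sketch below is illustrative, and the external hostname is an assumption. The knowledge-graph-service would follow the same pattern.
# kubernetes/semantic-search-tool-externalname.yaml (illustrative sketch; external hostname is an assumption)
apiVersion: v1
kind: Service
metadata:
  name: semantic-search-tool
  labels:
    capability-id: semantic-search-tool-v1
spec:
  type: ExternalName
  # Requests to http://semantic-search-tool:8003 resolve to this external host
  externalName: semantic-search.example-provider.com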
File: kubernetes/summarizer-agent-deployment.yaml
# kubernetes/summarizer-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: summarizer-agent
labels:
app: summarizer-agent
capability-id: summarizer-agent-v1
spec:
replicas: 1
selector:
matchLabels:
app: summarizer-agent
template:
metadata:
labels:
app: summarizer-agent
capability-id: summarizer-agent-v1
spec:
containers:
- name: summarizer-agent-container
image: cognito-registry/summarizer-agent:v1.0.0
ports:
- containerPort: 8005
env:
- name: LLM_SERVICE_URL
value: "http://llm-inference-service:8001"
resources:
requests:
cpu: "300m"
memory: "512Mi"
limits:
cpu: "700m"
memory: "1Gi"
---
apiVersion: v1
kind: Service
metadata:
name: summarizer-agent
labels:
app: summarizer-agent
capability-id: summarizer-agent-v1
spec:
selector:
app: summarizer-agent
ports:
- protocol: TCP
port: 8005
targetPort: 8005
type: ClusterIP
File: kubernetes/critic-agent-deployment.yaml
# kubernetes/critic-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: critic-agent
labels:
app: critic-agent
capability-id: critic-agent-v1
spec:
replicas: 1
selector:
matchLabels:
app: critic-agent
template:
metadata:
labels:
app: critic-agent
capability-id: critic-agent-v1
spec:
containers:
- name: critic-agent-container
image: cognito-registry/critic-agent:v1.0.0
ports:
- containerPort: 8006
env:
- name: LLM_SERVICE_URL
value: "http://llm-inference-service:8001"
resources:
requests:
cpu: "400m"
memory: "768Mi"
limits:
cpu: "900m"
memory: "1.5Gi"
---
apiVersion: v1
kind: Service
metadata:
name: critic-agent
labels:
app: critic-agent
capability-id: critic-agent-v1
spec:
selector:
app: critic-agent
ports:
- protocol: TCP
port: 8006
targetPort: 8006
type: ClusterIP
File: kubernetes/orchestrator-agent-deployment.yaml
# kubernetes/orchestrator-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: orchestrator-agent
labels:
app: orchestrator-agent
capability-id: orchestrator-agent-v1
spec:
replicas: 1 # Orchestrator might be a singleton or scaled carefully
selector:
matchLabels:
app: orchestrator-agent
template:
metadata:
labels:
app: orchestrator-agent
capability-id: orchestrator-agent-v1
spec:
containers:
- name: orchestrator-agent-container
image: cognito-registry/orchestrator-agent:v1.0.0
ports:
- containerPort: 8007
env:
- name: RESEARCHER_AGENT_V2_URL # Orchestrator uses v2
value: "http://researcher-agent-v2:8000"
- name: SUMMARIZER_AGENT_V1_URL
value: "http://summarizer-agent:8005"
- name: CRITIC_AGENT_V1_URL
value: "http://critic-agent:8006"
# Other dependencies like LLM_SERVICE_URL are handled by the agents themselves,
# but could be passed here if the orchestrator needed direct access.
resources:
requests:
cpu: "200m"
memory: "256Mi"
limits:
cpu: "500m"
memory: "512Mi"
---
apiVersion: v1
kind: Service
metadata:
name: orchestrator-agent
labels:
app: orchestrator-agent
capability-id: orchestrator-agent-v1
spec:
selector:
app: orchestrator-agent
ports:
- protocol: TCP
port: 8007
targetPort: 8007
type: ClusterIP
D. Interaction Flow:
To initiate a research task, an external client would send a request to the OrchestratorAgent's /orchestrate_research endpoint. For example, using curl:
# Example curl command to interact with the Orchestrator Agent
# (Assuming the Orchestrator is exposed via an Ingress or NodePort at localhost:8007 for testing)
curl -X POST http://localhost:8007/orchestrate_research \
-H "Content-Type: application/json" \
-d '{
"main_topic": "The Future of Generative AI in Scientific Discovery",
"initial_context": {"keywords": ["large language models", "scientific research", "automation"]},
"desired_output_format": "critical_analysis"
}'
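The curl command above assumes the OrchestratorAgent is reachable from outside the cluster, for example through an Ingress or a NodePort. A minimal Ingress sketch is shown below; the hostname, ingress class, and path are assumptions, and the ClusterIP Services defined earlier remain unchanged.
# kubernetes/orchestrator-agent-ingress.yaml (illustrative sketch; host and ingress class are assumptions)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: orchestrator-agent
  labels:
    capability-id: orchestrator-agent-v1
spec:
  ingressClassName: nginx          # Assumption: an NGINX ingress controller is installed
  rules:
    - host: cognito.example.com    # Assumption: replace with your own domain
      http:
        paths:
          - path: /orchestrate_research
            pathType: Prefix
            backend:
              service:
                name: orchestrator-agent
                port:
                  number: 8007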
Upon receiving this request, the OrchestratorAgent would:
- Plan the workflow using its OrchestrationEssence, determining the sequence of ResearcherAgent, SummarizerAgent, and CriticAgent calls.
- Call the ResearcherAgent-v2 to conduct advanced research on "The Future of Generative AI in Scientific Discovery." The ResearcherAgent would, in turn, use the LLMInferenceService (potentially with the high_quality_cloud gradient for complex query formulation and synthesis) and the WebSearchTool (and conceptually, the SemanticSearchTool and KnowledgeGraphService).
- Receive findings from the ResearcherAgent.
- Call the SummarizerAgent to condense the comprehensive findings into a manageable summary. The SummarizerAgent would also leverage the LLMInferenceService.
- Receive the summary from the SummarizerAgent.
- Call the CriticAgent to critically evaluate the summarized findings against criteria like accuracy and completeness, using the original research context (e.g., knowledge graph links) for cross-referencing. The CriticAgent would again use the LLMInferenceService.
- Receive the evaluation report and score from the CriticAgent.
- Aggregate all results from the agents into a final, polished report using its OrchestrationEssence.
- Return the final research report to the external client.
This intricate dance of capabilities, each performing its specialized role and interacting via well-defined contracts, exemplifies the power and flexibility of the Capability-Centric Architecture.
Conclusion: The Future is Capable!
We have journeyed through the foundational principles of the Capability-Centric Architecture, from the granular structure of the Capability Nucleus to the strategic management of Evolution Envelopes and the robust orchestration provided by the Capability Registry and Kubernetes. Through the lens of the Cognito Research Agency, an LLM-based agentic AI application, we have seen how these concepts translate into practical, production-ready architectural patterns.
CCA empowers us to build systems that are inherently modular, testable, and resilient. By explicitly defining contracts, managing dependencies, and planning for evolution, we create software that can adapt to changing requirements and technological advancements with grace. The ability to implement efficiency gradients allows for fine-tuned performance, while comprehensive resource management ensures optimal utilization of underlying infrastructure, even across diverse GPU architectures.
The future of software development demands architectures that are not just functional but truly capable – capable of evolving, capable of scaling, and capable of delivering sustained value. The Capability-Centric Architecture offers this capability, guiding us toward systems that are robust, flexible, and ready for whatever tomorrow brings. Embrace the capability, and build the future!