Introduction
The vision of autonomous software development has long captivated researchers and practitioners in artificial intelligence and software engineering. TeamAI represents a sophisticated implementation of this vision through a multiagent architecture where specialized AI agents collaborate to design, implement, test, and deliver complete software systems. Unlike monolithic AI assistants that attempt to handle all aspects of development in isolation, TeamAI mirrors the structure of professional software development teams with dedicated roles for coordination, requirements analysis, architecture design, domain modeling, implementation, testing, and quality assurance.
The fundamental premise underlying TeamAI is that complex software development benefits from specialization and collaboration. Just as human development teams assign different responsibilities to product managers, architects, domain experts, developers, and quality assurance engineers, TeamAI distributes these responsibilities across purpose-built agents. Each agent possesses deep expertise in its domain while maintaining the ability to communicate and coordinate with other agents to achieve the overarching goal of delivering production-ready software systems.
This article explores the architecture, implementation, and operational dynamics of TeamAI. We examine how large language models power each agent, how agents communicate through structured message passing, how the system maintains architectural coherence across distributed development efforts, and how the entire system adapts to evolving requirements through an agile feedback mechanism.
Throughout this exploration, we develop a concrete running example: a task management system that demonstrates TeamAI's capabilities in handling realistic software development scenarios.
Architectural Overview
TeamAI employs a hierarchical multiagent architecture organized around specialized roles and clear lines of communication. At the apex sits the Coordinator Agent, which serves as the primary interface between human users and the AI development team. Below the coordinator, specialized agents handle distinct phases and aspects of software development: requirements analysis, domain modeling, architecture design, implementation, testing, and quality review.
The communication structure follows a parent-child pattern where agents can send messages to their immediate parent for clarification and feedback. This bidirectional communication enables the agile adaptation that distinguishes TeamAI from rigid, waterfall-style automation. When a developer agent encounters ambiguity in a specification, it can request clarification from the architecture agent, which may in turn consult the requirements agent or even escalate to the coordinator for user input.
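The parent-child escalation path can be sketched as a simple chain of agents, each deferring to its parent when it cannot answer. The Agent class below is a hypothetical illustration of the pattern, not TeamAI's actual implementation:

```python
class Agent:
    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent  # None for the coordinator at the apex

    def handle(self, question):
        """Answer locally if possible, otherwise escalate to the parent."""
        answer = self.try_answer(question)
        if answer is not None:
            return answer
        if self.parent is not None:
            return self.parent.handle(question)
        # The coordinator has no parent; unresolved questions go to the user.
        return f"{self.name}: needs user input for {question!r}"

    def try_answer(self, question):
        return None  # specialized agents would override this


coordinator = Agent("coordinator")
architect = Agent("architect", parent=coordinator)
developer = Agent("developer", parent=architect)
```

Here a question the developer and architect cannot answer climbs the chain until the coordinator surfaces it to the user.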
The system architecture supports both local and remote LLM deployment, accommodating a range of GPU stacks including Nvidia CUDA, AMD ROCm, Apple Metal Performance Shaders, and Intel integrated and discrete graphics. This flexibility ensures that TeamAI can operate in diverse computational environments, from developer workstations to cloud infrastructure, without being tied to specific hardware.
Agent Roles and Responsibilities
The Coordinator Agent
The Coordinator Agent serves as the primary interface between users and the development team. When a user initiates a project, they describe their vision, business goals, and fundamental requirements to the coordinator. The coordinator's first responsibility is comprehension: it must extract the essential intent from potentially vague or incomplete descriptions and identify gaps that require clarification.
The coordinator employs sophisticated question generation to elicit missing information. Rather than bombarding users with generic questions, it analyzes the specification to identify architecturally significant decisions that remain unresolved. For instance, if a user requests a task management system without specifying deployment constraints, the coordinator might ask whether the system should support offline operation, what scale of concurrent users is anticipated, or whether integration with existing enterprise systems is required.
Once the coordinator determines that sufficient information has been gathered, it synthesizes a comprehensive specification and forwards it to the Requirements Agent. Throughout the development process, the coordinator remains available to handle evolving requirements. Users can request modifications, additions, or refinements, and the coordinator manages the propagation of these changes through the development team.
The Requirements Agent
The Requirements Agent transforms the coordinator's specification into a detailed requirements document that guides all subsequent development activities. This transformation involves identifying functional requirements, quality attributes, constraints, and assumptions. The requirements agent distinguishes between architecturally significant requirements that shape fundamental design decisions and implementation details that can be deferred to later stages.
The requirements specification produced by this agent includes end-to-end scenarios that illustrate how users will interact with the system to accomplish their goals. These scenarios provide context for developers and testers, ensuring that implementation efforts align with actual usage patterns. The agent also identifies quality attributes such as performance targets, security requirements, scalability expectations, and maintainability considerations.
For our running example of a task management system, the requirements agent might produce a specification that includes scenarios like "A project manager creates a new project, adds team members, and assigns tasks with deadlines and priorities" or "A team member receives notifications when assigned a task and updates task status as work progresses." Quality attributes might specify that the system must support at least one thousand concurrent users, respond to user actions within two hundred milliseconds, and maintain data consistency across distributed deployments.
The Domain Agent
The Domain Agent specializes in understanding and modeling the business domain within which the software system operates. It employs Domain-Driven Design principles to identify bounded contexts, entities, value objects, aggregates, and domain events. This domain model serves as the foundation for the architecture and guides implementation decisions across all developer agents.
Domain-Driven Design recognizes that complex business domains cannot be adequately represented by simplistic data models. Instead, the domain agent identifies the core concepts, their relationships, and the business rules that govern their behavior. It distinguishes between core domains that provide competitive advantage and generic subdomains that can leverage standard solutions.
In our task management example, the domain agent identifies bounded contexts such as Project Management, Task Tracking, User Management, and Notification Services. Within the Task Tracking context, it recognizes entities like Task and Project, value objects like TaskStatus and Priority, and domain events like TaskAssigned and TaskCompleted. The domain model captures business rules such as "A task cannot be marked complete if it has incomplete dependencies" or "Only project members can be assigned tasks within that project."
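These domain concepts can be sketched directly in code. The following is an illustrative model of the Task Tracking context under the assumptions above; the class shapes are hypothetical, though the names mirror the article:

```python
from dataclasses import dataclass, field
from enum import Enum

class TaskStatus(Enum):      # value object: describes state, has no identity
    OPEN = "open"
    COMPLETED = "completed"

@dataclass(frozen=True)
class TaskCompleted:         # domain event emitted when a task completes
    task_id: str

@dataclass
class Task:                  # entity and aggregate root, identified by task_id
    task_id: str
    title: str
    status: TaskStatus = TaskStatus.OPEN
    dependencies: list = field(default_factory=list)  # prerequisite Task objects

    def complete(self):
        """Business rule: a task with incomplete dependencies cannot complete."""
        if any(dep.status is not TaskStatus.COMPLETED for dep in self.dependencies):
            raise ValueError("task has incomplete dependencies")
        self.status = TaskStatus.COMPLETED
        return TaskCompleted(self.task_id)
```

Note how the aggregate root enforces the dependency rule itself, so no caller can put a Task into an invalid state.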
The Architecture Agent
The Architecture Agent holds the most complex and multifaceted role in TeamAI. It receives the requirements specification and domain model, then synthesizes an architecture baseline that addresses all identified requirements while maintaining conceptual integrity. The architecture agent must balance competing concerns: modularity versus integration, performance versus maintainability, flexibility versus simplicity.
The architecture agent begins by identifying major subsystems aligned with bounded contexts from the domain model. It defines interfaces between subsystems, selects architectural patterns appropriate to the requirements, and establishes coding conventions and design guidelines. As development progresses, the architecture agent coordinates developer agents, assigns implementation tasks, and integrates tactical design decisions into the evolving architecture specification.
One of the architecture agent's critical responsibilities is prioritization. It analyzes requirements to determine implementation order based on dependencies, risk, and business value. High-risk architectural assumptions might be validated early through proof-of-concept implementations. Core functionality that other features depend upon receives priority over peripheral capabilities.
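Dependency-aware ordering of this kind amounts to a topological sort with tie-breaking. The sketch below (a hypothetical helper, assuming Python 3.9+ for the standard-library graphlib module) orders tasks so prerequisites come first, preferring higher risk and business value among tasks that are simultaneously ready:

```python
from graphlib import TopologicalSorter

def implementation_order(depends_on, risk, value):
    """depends_on: task -> set of prerequisite tasks; risk/value: task -> score."""
    ts = TopologicalSorter(depends_on)
    ts.prepare()
    order = []
    while ts.is_active():
        # Among tasks whose prerequisites are done, tackle risky, valuable ones first.
        ready = sorted(ts.get_ready(), key=lambda t: (risk[t], value[t]), reverse=True)
        order.extend(ready)
        ts.done(*ready)
    return order
```

For the task management example, user management would precede task tracking, which in turn would precede notifications.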
The architecture agent also collaborates with the test agent to develop a risk-based testing strategy. This strategy identifies the areas of highest risk, such as complex business logic, external integrations, or performance-critical paths, and allocates testing resources accordingly. The architecture agent ensures that integration and system tests validate the architecture's ability to satisfy quality attributes.
Throughout development, the architecture agent maintains Architecture Decision Records that document significant design choices, the context in which they were made, alternatives considered, and rationale for the selected approach. These records provide invaluable context for future maintenance and evolution.
For our task management system, the architecture agent might establish a microservices architecture with separate services for project management, task tracking, user management, and notifications. It would define RESTful APIs for inter-service communication, select a message queue for asynchronous event processing, and specify a database-per-service pattern to maintain bounded context isolation. Architecture Decision Records would document choices such as selecting PostgreSQL for transactional data storage, Redis for caching, and RabbitMQ for message queuing.
Developer Agents
Developer Agents receive implementation tasks from the architecture agent and produce working code that satisfies specified requirements while adhering to architectural guidelines. Each developer agent typically owns a subsystem corresponding to a bounded context from the domain model. The agent implements domain entities, business logic, data access, and API endpoints within its assigned subsystem.
Developer agents refine the architecture through tactical design decisions. While the architecture agent establishes strategic direction and subsystem boundaries, developer agents determine class structures, design patterns, and implementation techniques within their subsystems. These tactical decisions are documented and communicated back to the architecture agent for incorporation into the architecture specification.
Coordination between developer agents occurs when subsystems must interact. If the task tracking developer agent needs to verify that a user has permission to modify a task, it must coordinate with the user management developer agent to understand the authentication and authorization interface. This coordination ensures that integration points are well-defined and consistently implemented.
Each developer agent implements comprehensive unit tests for its code. These tests verify that individual components behave correctly in isolation and provide regression protection as the codebase evolves. The developer agent employs test-driven development practices, writing tests before implementation to clarify expected behavior and ensure testability.
In our running example, a developer agent assigned to the task tracking subsystem would implement classes representing tasks and projects, business logic for task assignment and status transitions, a repository for data persistence, and a REST API for external access. Unit tests would verify that business rules are enforced, such as preventing task completion when dependencies remain incomplete.
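A test-first sketch of one such business rule follows. The Project and Task shapes here are hypothetical stand-ins, not TeamAI output; the point is the style of unit test a developer agent would write before implementing, covering the rule that only project members may be assigned tasks:

```python
import unittest

class Project:
    def __init__(self, name):
        self.name = name
        self.members = set()

    def add_member(self, user_id):
        self.members.add(user_id)

    def assign_task(self, task, user_id):
        # Business rule: only project members may be assigned tasks.
        if user_id not in self.members:
            raise PermissionError(f"{user_id} is not a member of {self.name}")
        task["assignee"] = user_id

class AssignmentRuleTest(unittest.TestCase):
    def test_non_member_cannot_be_assigned(self):
        project = Project("website")
        with self.assertRaises(PermissionError):
            project.assign_task({"title": "fix login"}, "mallory")

    def test_member_assignment_succeeds(self):
        project = Project("website")
        project.add_member("alice")
        task = {"title": "fix login"}
        project.assign_task(task, "alice")
        self.assertEqual(task["assignee"], "alice")
```

Writing the failing test first forces the agent to pin down the expected behavior before committing to an implementation.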
Test Agents
Test Agents collaborate with the architecture agent to implement integration and system tests that validate the system as a whole. While developer agents verify individual components through unit tests, test agents ensure that subsystems interact correctly and that the complete system satisfies end-to-end requirements.
The test agent works with the architecture agent to develop a risk-based testing strategy that focuses effort on areas of highest risk. This strategy considers factors such as architectural complexity, requirement criticality, and potential failure impact. High-risk areas receive more extensive testing, while low-risk areas may rely primarily on unit tests and basic integration validation.
Integration tests verify that subsystems communicate correctly through their defined interfaces. For our task management system, integration tests might verify that when a task is assigned through the task tracking API, the notification service receives the appropriate event and delivers a notification to the assigned user. System tests validate complete user scenarios, such as a project manager creating a project, adding team members, creating tasks, and assigning them to team members.
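That notification scenario can be sketched as an integration-style test. The in-memory event bus and service classes below are hypothetical stand-ins for the real message queue and microservices, but they exercise the same contract: assigning a task must produce a notification for the assignee:

```python
class EventBus:
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event_type, handler):
        self.subscribers.setdefault(event_type, []).append(handler)

    def publish(self, event_type, payload):
        for handler in self.subscribers.get(event_type, []):
            handler(payload)

class NotificationService:
    def __init__(self, bus):
        self.delivered = []
        bus.subscribe("TaskAssigned", self.on_task_assigned)

    def on_task_assigned(self, payload):
        self.delivered.append(f"notify {payload['assignee']}: {payload['task']}")

class TaskTrackingService:
    def __init__(self, bus):
        self.bus = bus

    def assign_task(self, task, assignee):
        # Publishing the event is the integration point under test.
        self.bus.publish("TaskAssigned", {"task": task, "assignee": assignee})


bus = EventBus()
notifications = NotificationService(bus)
TaskTrackingService(bus).assign_task("write docs", "alice")
assert notifications.delivered == ["notify alice: write docs"]
```

Swapping the in-memory bus for the production message queue turns this into a true end-to-end check.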
The test agent also implements performance tests to validate that the system meets specified quality attributes. Load tests verify that the system can handle the expected number of concurrent users, stress tests identify breaking points, and endurance tests ensure stability over extended operation.
Review Agents
Review Agents provide quality assurance through code and architecture reviews. Developer agents invoke review agents to examine their implementations, while the architecture agent requests reviews of the overall architecture. Review agents analyze code for adherence to coding standards and for potential bugs, security vulnerabilities, performance issues, and maintainability concerns.
Architecture reviews assess whether the architecture adequately addresses requirements, whether subsystem boundaries are well-defined, whether architectural patterns are appropriately applied, and whether the design exhibits desirable qualities such as modularity, cohesion, and loose coupling. Review agents provide findings to the requesting agent, which then refines the artifacts based on the feedback.
The review process creates a feedback loop that continuously improves code and architecture quality. Rather than relegating quality assurance to a final validation phase, TeamAI integrates review throughout development, catching issues early when they are less expensive to address.
Communication and Coordination Mechanisms
Effective multiagent collaboration requires robust communication mechanisms that enable agents to exchange information, request clarification, and provide feedback. TeamAI implements a structured message-passing system where messages contain typed content, metadata about the sender and recipient, and context about the conversation thread.
Messages follow a hierarchical routing pattern. When a developer agent needs clarification about a requirement, it sends a message to its parent, the architecture agent. The architecture agent may be able to resolve the question directly, or it may need to escalate to the requirements agent or even the coordinator. This escalation ensures that questions reach the agent with the appropriate knowledge and authority to answer.
The message structure includes a message type field; types include specification, clarification request, feedback, and status update. Typed messages enable agents to route and process communications appropriately. A clarification request triggers different processing than a status update, and the message type makes this distinction explicit.
Context preservation is critical for maintaining coherent conversations across multiple message exchanges. Each message includes a thread identifier that links it to previous messages in the conversation. This threading enables agents to understand the full context of a question or feedback, rather than treating each message in isolation.
Here is a simplified representation of the message structure used in TeamAI:
import time
from datetime import datetime

class Message:
    def __init__(self, sender, recipient, message_type, content, thread_id=None):
        self.sender = sender
        self.recipient = recipient
        self.message_type = message_type
        self.content = content
        self.thread_id = thread_id if thread_id else self.generate_thread_id()
        self.timestamp = datetime.now()

    def generate_thread_id(self):
        return f"{self.sender}_{self.recipient}_{int(time.time())}"
The content field contains the substantive information being communicated. For a specification message, content includes the requirements document. For a clarification request, content describes the ambiguity and the information needed to resolve it. For feedback, content provides observations, concerns, or suggestions.
Agents maintain conversation histories that enable them to reference previous exchanges. When the architecture agent receives a clarification request from a developer agent, it can review the original specification it provided, understand what aspects remain unclear, and formulate a response that addresses the specific confusion.
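A per-thread history of this kind is straightforward to sketch. The class below is a minimal hypothetical illustration, assuming messages are dict-like records carrying a thread_id as in the message structure shown earlier:

```python
from collections import defaultdict

class ConversationHistory:
    def __init__(self):
        self.threads = defaultdict(list)

    def record(self, message):
        """Append a message to its thread, preserving arrival order."""
        self.threads[message["thread_id"]].append(message)

    def thread(self, thread_id):
        """All messages in a thread, oldest first, for context reconstruction."""
        return list(self.threads[thread_id])


history = ConversationHistory()
history.record({"thread_id": "arch_dev_1", "content": "Which auth scheme applies here?"})
history.record({"thread_id": "arch_dev_1", "content": "Use OAuth 2.0 per the baseline."})
```

When a clarification request arrives, the receiving agent replays the thread to recover the full context before answering.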
LLM Integration and Inference
At the core of each agent lies a large language model that provides the intelligence necessary to understand specifications, generate code, identify issues, and communicate effectively. TeamAI supports both local and remote LLM deployment, enabling operation in diverse environments with varying computational resources and privacy requirements.
Local LLM deployment runs models directly on the host system, leveraging available GPU acceleration. This approach provides maximum privacy since no data leaves the local environment, enables operation without internet connectivity, and avoids per-request costs associated with cloud API services. However, local deployment requires sufficient computational resources and may limit the size and capability of models that can be effectively utilized.
Remote LLM deployment accesses models through cloud APIs such as OpenAI, Anthropic, or other providers. This approach enables access to the most capable models without requiring local computational resources, simplifies deployment, and ensures access to the latest model versions. However, remote deployment introduces latency, ongoing costs, potential privacy concerns, and dependency on network connectivity.
TeamAI abstracts LLM access through a unified interface that enables seamless switching between local and remote providers. Each agent receives an LLM client configured for the appropriate provider, but the agent code remains independent of the specific implementation.
class LLMClient:
    def __init__(self, provider, model_name, device=None):
        self.provider = provider
        self.model_name = model_name
        self.device = device
        self.initialize_provider()

    def initialize_provider(self):
        if self.provider == "local":
            self.client = LocalLLMProvider(self.model_name, self.device)
        elif self.provider == "openai":
            self.client = OpenAIProvider(self.model_name)
        elif self.provider == "anthropic":
            self.client = AnthropicProvider(self.model_name)
        else:
            raise ValueError(f"Unsupported provider: {self.provider}")

    def generate(self, prompt, max_tokens=2000, temperature=0.7):
        return self.client.generate(prompt, max_tokens, temperature)
For local deployment, TeamAI supports multiple GPU architectures through appropriate backend selection. Nvidia CUDA provides the most mature and performant option for Nvidia GPUs. AMD ROCm enables utilization of AMD GPUs. Apple Metal Performance Shaders leverages the unified memory architecture of Apple Silicon. Intel oneAPI supports Intel integrated and discrete GPUs.
The local LLM provider detects available hardware and selects the appropriate backend automatically:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

class LocalLLMProvider:
    def __init__(self, model_name, device=None):
        self.model_name = model_name
        self.device = device if device else self.detect_device()
        self.load_model()

    def detect_device(self):
        if torch.cuda.is_available():
            return "cuda"
        elif torch.backends.mps.is_available():
            return "mps"
        elif hasattr(torch, 'xpu') and torch.xpu.is_available():
            return "xpu"
        else:
            return "cpu"

    def load_model(self):
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            device_map=self.device,
            torch_dtype=torch.float16 if self.device != "cpu" else torch.float32
        )
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)

    def generate(self, prompt, max_tokens=2000, temperature=0.7):
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        outputs = self.model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=temperature,
            do_sample=True
        )
        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
Each agent constructs prompts tailored to its specific role and the task at hand. The coordinator agent's prompts focus on extracting user intent and identifying missing information. The requirements agent's prompts emphasize structured analysis and quality attribute identification. Developer agent prompts include architectural guidelines, coding conventions, and the specific implementation task.
Prompt engineering plays a critical role in eliciting high-quality outputs from LLMs. Effective prompts provide clear context, specify the desired output format, include relevant examples, and constrain the response to avoid hallucination or irrelevant content. TeamAI employs prompt templates that encapsulate best practices for each agent role:
class PromptTemplate:
    def __init__(self, role, task_type):
        self.role = role
        self.task_type = task_type
        self.template = self.load_template()

    def load_template(self):
        templates = {
            "coordinator": {
                "initial_analysis": """You are a software development coordinator. A user has described a system they want built:
{user_description}
Your task is to:
1. Extract the core business goals and requirements
2. Identify any ambiguities or missing information
3. Generate clarifying questions to fill gaps
Provide your analysis in the following format:
GOALS: [list of business goals]
REQUIREMENTS: [list of identified requirements]
QUESTIONS: [list of clarifying questions]""",
            },
            "developer": {
                "implementation": """You are a software developer implementing the {subsystem} subsystem.
ARCHITECTURE GUIDELINES:
{architecture_guidelines}
CODING CONVENTIONS:
{coding_conventions}
TASK:
{task_description}
Implement the required functionality following all guidelines and conventions. Include comprehensive unit tests. Provide your implementation with clear comments explaining design decisions.""",
            }
        }
        return templates[self.role][self.task_type]

    def format(self, **kwargs):
        return self.template.format(**kwargs)
The Development Workflow
The development workflow in TeamAI follows a structured yet adaptive process that mirrors agile software development practices. The process begins with user interaction and proceeds through requirements analysis, domain modeling, architecture design, iterative implementation, testing, and review. At each stage, feedback loops enable refinement and adaptation.
Initial Specification Phase
The workflow begins when a user describes their desired system to the coordinator agent. This initial description may be high-level and incomplete, focusing on business goals rather than technical details. The coordinator analyzes the description to extract essential information and identify gaps.
For our task management system example, a user might provide this initial description: "I need a system for managing projects and tasks across my development team. Team members should be able to see what they are assigned, update status, and receive notifications. Project managers need visibility into progress and the ability to reassign work."
The coordinator processes this description and identifies several areas requiring clarification. What is the expected team size? Should the system support multiple concurrent projects? Are there integration requirements with existing tools? Does the system need to support time tracking or just task status? Should notifications be real-time or periodic summaries?
The coordinator generates targeted questions and presents them to the user. The user's responses provide additional context that refines the specification. This iterative clarification continues until the coordinator determines that sufficient information exists to proceed.
Requirements Analysis Phase
Once the coordinator has gathered adequate information, it forwards the specification to the requirements agent. The requirements agent performs a systematic analysis to produce a detailed requirements document. This document structures the information into functional requirements, quality attributes, constraints, and assumptions.
Functional requirements describe what the system must do. For the task management system, functional requirements include capabilities such as creating projects, adding team members, creating and assigning tasks, updating task status, and delivering notifications. Each requirement is stated clearly and unambiguously.
Quality attributes specify how well the system must perform its functions. These include performance requirements such as response time and throughput, scalability requirements such as concurrent user support, availability requirements such as uptime targets, security requirements such as authentication and authorization, and maintainability requirements such as code quality and documentation standards.
Constraints identify limitations within which the system must operate. These might include technology constraints such as required programming languages or frameworks, deployment constraints such as cloud platform requirements, or regulatory constraints such as data privacy regulations.
The requirements agent also develops end-to-end scenarios that illustrate how users will interact with the system. These scenarios provide concrete examples that guide implementation and testing. A scenario for the task management system might describe a project manager creating a new project, adding team members with specific roles, creating a hierarchy of tasks with dependencies, and assigning tasks to team members based on their skills and availability.
Domain Modeling Phase
The architecture agent requests domain analysis from the domain agent. The domain agent applies Domain-Driven Design principles to identify bounded contexts, entities, value objects, aggregates, and domain events within the business domain.
Bounded contexts represent distinct areas of the domain with their own models and terminology. For task management, bounded contexts might include Project Management, Task Tracking, User Management, Notification Services, and Reporting. Each bounded context maintains its own model that may represent similar concepts differently based on the context's specific concerns.
Within each bounded context, the domain agent identifies entities that have unique identity and lifecycle. In the Task Tracking context, Task and Project are entities because each has a unique identifier and persists over time with changing state. The domain agent also identifies value objects that represent descriptive aspects without unique identity, such as TaskStatus or Priority.
Aggregates group related entities and value objects into consistency boundaries. The domain agent might define a Task aggregate that includes the task entity along with associated value objects and ensures that business rules are enforced consistently. Aggregate roots serve as the entry point for all operations on the aggregate.
Domain events represent significant occurrences within the domain. The domain agent identifies events such as ProjectCreated, TaskAssigned, TaskStatusChanged, and TaskCompleted. These events enable loose coupling between bounded contexts through event-driven integration.
The domain model produced by the domain agent serves as the foundation for architecture and implementation decisions. It ensures that the software structure aligns with the business domain and that ubiquitous language is maintained throughout the codebase.
Architecture Design Phase
With requirements and domain model in hand, the architecture agent synthesizes an architecture baseline. This baseline defines the major subsystems, their responsibilities, and their interactions. The architecture agent selects architectural patterns appropriate to the requirements and establishes design principles that guide implementation.
For the task management system, the architecture agent might select a microservices architecture that aligns subsystems with bounded contexts. Each microservice owns its data and exposes a well-defined API. Services communicate through synchronous REST APIs for request-response interactions and asynchronous message queues for event-driven integration.
The architecture agent defines the technology stack, selecting programming languages, frameworks, databases, and infrastructure components. For our example, the architecture might specify Python with FastAPI for service implementation, PostgreSQL for relational data storage, Redis for caching and session management, and RabbitMQ for message queuing.
The architecture agent establishes coding conventions that ensure consistency across developer agents. These conventions cover naming standards, code organization, error handling, logging, and documentation. The architecture agent also defines design guidelines such as preferring composition over inheritance, maintaining single responsibility, and minimizing coupling.
Architecture Decision Records document significant design choices. An ADR for the task management system might document the decision to use microservices rather than a monolithic architecture, explaining that the microservices approach provides better scalability, enables independent deployment of services, and aligns with the bounded context structure. The ADR would also note alternatives considered, such as a modular monolith, and explain why microservices were preferred given the requirements.
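Captured as data, such a record is simple. The structure and field names below are an assumption for illustration, not a TeamAI API; they follow the common ADR convention of title, context, decision, alternatives, and rationale:

```python
from dataclasses import dataclass, field

@dataclass
class ArchitectureDecisionRecord:
    title: str
    context: str
    decision: str
    alternatives: list = field(default_factory=list)
    rationale: str = ""
    status: str = "accepted"

adr = ArchitectureDecisionRecord(
    title="Use microservices aligned with bounded contexts",
    context="Task management spans several bounded contexts with "
            "independent scaling and deployment needs.",
    decision="One service per bounded context, with a database-per-service pattern.",
    alternatives=["Modular monolith", "Single shared database"],
    rationale="Independent deployment and scaling outweigh the added "
              "operational complexity for the anticipated load.",
)
```

Keeping ADRs as structured records rather than free text lets the architecture agent query and cross-reference them as the specification evolves.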
Iterative Implementation Phase
The architecture agent prioritizes requirements and assigns implementation tasks to developer agents. Prioritization considers dependencies, risk, and business value. Core functionality that other features depend upon receives early implementation. High-risk architectural assumptions are validated through proof-of-concept implementations.
Each developer agent receives a task specification that includes the requirement to implement, relevant portions of the architecture and domain model, coding conventions, and acceptance criteria. The developer agent generates an implementation plan, identifies the classes and modules to create, and begins coding.
As the developer agent works, it may encounter ambiguities or discover that the specification is incomplete or inconsistent. Rather than making assumptions, the developer agent sends a clarification request to the architecture agent. The architecture agent analyzes the question, consults the requirements or domain model if necessary, and provides clarification. If the architecture agent cannot resolve the question, it escalates to the requirements agent or coordinator.
The developer agent implements comprehensive unit tests alongside the production code. These tests verify that individual components behave correctly and provide regression protection. The developer agent follows test-driven development practices, writing tests before implementation to clarify expected behavior.
When the developer agent completes a task, it invokes a review agent to examine the code. The review agent analyzes the implementation for adherence to coding conventions, potential bugs, security vulnerabilities, and maintainability issues. The review agent provides findings, and the developer agent refines the code based on the feedback.
The developer agent documents tactical design decisions and sends this documentation to the architecture agent. The architecture agent integrates these tactical decisions into the evolving architecture specification, maintaining a comprehensive view of the system design.
This iterative process continues as the architecture agent assigns subsequent tasks. Developer agents coordinate with each other when implementing integration points. The architecture agent monitors progress, addresses issues, and ensures that the implementation remains aligned with the architecture.
Integration and Testing Phase
As developer agents complete their subsystems, the test agent begins integration testing. Integration tests verify that subsystems interact correctly through their defined interfaces. For the task management system, integration tests might verify that when a task is created through the Task Tracking service, the Notification service receives the TaskCreated event and delivers notifications to relevant users.
The test agent implements system tests that validate end-to-end scenarios from the requirements specification. These tests exercise the complete system, verifying that it satisfies functional requirements and quality attributes. System tests for our example would validate scenarios such as a project manager creating a project, adding team members, creating tasks, assigning tasks, and team members receiving notifications and updating task status.
Performance testing validates that the system meets specified quality attributes. Load tests verify that the system can handle the expected number of concurrent users with acceptable response times. Stress tests identify breaking points and ensure graceful degradation under extreme load. Endurance tests validate stability over extended operation.
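A load test of this kind can be reduced to a harness that fires concurrent requests and reports latency percentiles. The sketch below uses only the standard library; the simulated handler stands in for an HTTP call to the running system, and the names and numbers are illustrative, not TeamAI's actual test suite:

```python
# Minimal load-test harness sketch using only the standard library.
# create_task_request is a stand-in for a real HTTP call to the Task service.
import statistics
import time
from concurrent.futures import ThreadPoolExecutor

def create_task_request() -> float:
    """Simulated request; returns observed latency in seconds."""
    start = time.perf_counter()
    time.sleep(0.001)  # stand-in for network and processing latency
    return time.perf_counter() - start

def run_load_test(concurrency: int, total_requests: int) -> dict:
    # Fire total_requests calls across a pool of `concurrency` workers
    # and summarize the latency distribution.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        latencies = list(pool.map(lambda _: create_task_request(), range(total_requests)))
    latencies.sort()
    return {
        "p50": statistics.median(latencies),
        "p95": latencies[int(0.95 * len(latencies)) - 1],
        "max": latencies[-1],
    }

results = run_load_test(concurrency=8, total_requests=100)
```

A real run would assert these percentiles against the response-time targets from the quality-attribute requirements.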
The test agent collaborates with the architecture agent to implement the risk-based testing strategy. High-risk areas such as complex business logic, external integrations, and performance-critical paths receive more extensive testing. Test coverage metrics ensure that critical code paths are thoroughly validated.
Review and Refinement Phase
Throughout development, review agents provide quality assurance through code and architecture reviews. These reviews create feedback loops that continuously improve quality. Rather than relegating quality assurance to a final validation phase, TeamAI integrates review throughout development.
Architecture reviews assess whether the architecture adequately addresses requirements, whether subsystem boundaries are well-defined, and whether the design exhibits desirable qualities such as modularity and loose coupling. The review agent provides findings to the architecture agent, which refines the architecture based on the feedback.
Code reviews examine implementations for adherence to coding standards, potential bugs, security vulnerabilities, and maintainability concerns. Review agents provide findings to developer agents, which refine their code accordingly.
This continuous review and refinement ensures that quality is built in rather than inspected in. Issues are caught early when they are less expensive to address, and the cumulative effect is a higher-quality final product.
Delivery Phase
When all requirements have been implemented and validated, TeamAI delivers the complete system to the user. The delivery includes the fully designed software architecture with complete documentation, the runnable implementation with all source code, unit tests, integration and system tests, the risk-based testing strategy, and Architecture Decision Records documenting significant design choices.

The architecture documentation provides a comprehensive view of the system structure, including subsystem responsibilities, interfaces, data models, and design patterns. It includes both strategic architecture decisions made by the architecture agent and tactical design decisions made by developer agents.
The implementation is production-ready code that adheres to coding conventions and design guidelines. It includes comprehensive error handling, logging, and monitoring. Configuration is externalized to enable deployment in different environments.
The test suite provides confidence that the system behaves correctly and continues to do so as it evolves. Unit tests validate individual components, integration tests verify subsystem interactions, and system tests validate end-to-end scenarios. The risk-based testing strategy documents how testing resources were allocated based on risk assessment.
Architecture Decision Records provide invaluable context for future maintenance and evolution. They document why specific design choices were made, what alternatives were considered, and what trade-offs were accepted. This context enables future developers to understand the rationale behind the architecture and make informed decisions about modifications.
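One such record might look like the following. The number and content are invented for this example, though they mirror a decision the implementation below actually embodies:

```
ADR-007: Use RabbitMQ topic exchanges for inter-service events

Status: Accepted

Context: Services must react to task and project changes without
tight coupling. Alternatives considered: direct HTTP callbacks,
a shared database, Kafka.

Decision: Publish domain events to RabbitMQ topic exchanges with
routing keys such as "task.created".

Consequences: Services stay decoupled and new consumers can bind
without changes to publishers. The trade-offs accepted are eventual
consistency and the operational cost of running a broker.
```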
Running Example: Complete Implementation
To illustrate the concepts discussed throughout this article, we now present a complete implementation of the task management system developed by TeamAI. This implementation demonstrates how the various agents collaborate to produce a working system that satisfies the specified requirements.
The implementation follows a microservices architecture with separate services for project management, task tracking, user management, and notifications. Each service is implemented in Python using the FastAPI framework, with PostgreSQL for data persistence, Redis for caching, and RabbitMQ for event-driven integration.
Project Structure
The complete system is organized into the following directory structure:
teamai-task-management/
  services/
    project-service/
      app/
        models/
        repositories/
        services/
        api/
        events/
      tests/
      main.py
      requirements.txt
    task-service/
      app/
        models/
        repositories/
        services/
        api/
        events/
      tests/
      main.py
      requirements.txt
    user-service/
      app/
        models/
        repositories/
        services/
        api/
      tests/
      main.py
      requirements.txt
    notification-service/
      app/
        models/
        services/
        consumers/
      tests/
      main.py
      requirements.txt
  shared/
    events/
    messaging/
    database/
  infrastructure/
    docker-compose.yml
    kubernetes/
  docs/
    architecture/
    adr/
Shared Infrastructure Components
Before implementing individual services, we establish shared infrastructure components that all services utilize. These include database utilities, message queue integration, and event definitions.
Database Configuration
# shared/database/config.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os

DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://teamai:teamai@localhost:5432/teamai"
)

engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

def get_db():
    """
    Dependency function that provides database sessions to API endpoints.
    Ensures proper session lifecycle management with automatic cleanup.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
Message Queue Integration
# shared/messaging/rabbitmq.py
import pika
import json
import os
from typing import Callable
import logging

logger = logging.getLogger(__name__)

class MessageQueue:
    """
    Provides abstraction over RabbitMQ for publishing and consuming events.
    Supports both direct exchanges for point-to-point messaging and topic
    exchanges for publish-subscribe patterns.
    """

    def __init__(self):
        self.host = os.getenv("RABBITMQ_HOST", "localhost")
        self.port = int(os.getenv("RABBITMQ_PORT", "5672"))
        self.username = os.getenv("RABBITMQ_USER", "guest")
        self.password = os.getenv("RABBITMQ_PASS", "guest")
        self.connection = None
        self.channel = None

    def connect(self):
        """
        Establishes connection to RabbitMQ server with credentials.
        Creates a channel for subsequent operations.
        """
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

    def declare_exchange(self, exchange_name: str, exchange_type: str = "topic"):
        """
        Declares an exchange for routing messages. Topic exchanges enable
        flexible routing based on routing keys with wildcard matching.
        """
        if not self.channel:
            self.connect()
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=exchange_type,
            durable=True
        )

    def publish(self, exchange: str, routing_key: str, message: dict):
        """
        Publishes a message to the specified exchange with a routing key.
        Messages are serialized to JSON and marked as persistent.
        """
        if not self.channel:
            self.connect()
        self.channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # Make message persistent
                content_type="application/json"
            )
        )
        logger.info(f"Published message to {exchange}/{routing_key}")

    def consume(self, queue_name: str, callback: Callable):
        """
        Consumes messages from a queue, invoking the callback for each message.
        Automatically acknowledges messages after successful processing.
        """
        if not self.channel:
            self.connect()

        def wrapped_callback(ch, method, properties, body):
            try:
                message = json.loads(body)
                callback(message)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=wrapped_callback
        )
        logger.info(f"Starting consumption from queue {queue_name}")
        self.channel.start_consuming()

    def declare_queue(self, queue_name: str, exchange: str, routing_key: str):
        """
        Declares a queue and binds it to an exchange with a routing key.
        Enables consumers to receive messages matching the routing pattern.
        """
        if not self.channel:
            self.connect()
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.queue_bind(
            exchange=exchange,
            queue=queue_name,
            routing_key=routing_key
        )

    def close(self):
        """
        Closes the channel and connection to RabbitMQ.
        Should be called during application shutdown.
        """
        if self.channel:
            self.channel.close()
        if self.connection:
            self.connection.close()
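The wildcard matching that topic exchanges perform happens inside the broker, not in this class, but it is worth seeing how it works. In RabbitMQ's scheme, a routing key is a dot-separated word list, `*` in a binding pattern matches exactly one word, and `#` matches zero or more. A stdlib sketch of that matching logic, for illustration only:

```python
# Stdlib sketch of RabbitMQ-style topic matching, to illustrate how bindings
# such as "task.*" or "task.#" select messages by routing key. The real
# matching happens inside the broker; this mimics it for explanation.
def topic_matches(pattern: str, routing_key: str) -> bool:
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(pi: int, ki: int) -> bool:
        if pi == len(p_words):
            return ki == len(k_words)
        if p_words[pi] == "#":
            # '#' matches zero or more words
            return any(match(pi + 1, kj) for kj in range(ki, len(k_words) + 1))
        if ki == len(k_words):
            return False
        if p_words[pi] == "*" or p_words[pi] == k_words[ki]:
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)
```

So a queue bound with `task.*` receives `task.created` but not `task.status.changed`, while a binding of `task.#` receives both.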
Event Definitions
# shared/events/definitions.py
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel

class DomainEvent(BaseModel):
    """
    Base class for all domain events. Includes metadata common to all events
    such as event identifier, timestamp, and event type.
    """
    event_id: str
    event_type: str
    timestamp: datetime

    class Config:
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

class ProjectCreated(DomainEvent):
    """
    Published when a new project is created. Contains project details
    and the list of initial team members.
    """
    project_id: str
    name: str
    description: Optional[str]
    owner_id: str
    member_ids: List[str]

class ProjectMemberAdded(DomainEvent):
    """
    Published when a member is added to a project. Enables other services
    to update their views of project membership.
    """
    project_id: str
    member_id: str
    role: str

class TaskCreated(DomainEvent):
    """
    Published when a new task is created. Contains task details including
    assignment, priority, and deadline information.
    """
    task_id: str
    project_id: str
    title: str
    description: Optional[str]
    assigned_to: Optional[str]
    priority: str
    status: str
    due_date: Optional[datetime]

class TaskAssigned(DomainEvent):
    """
    Published when a task is assigned to a user. Triggers notification
    delivery to inform the assignee.
    """
    task_id: str
    project_id: str
    assigned_to: str
    assigned_by: str
    task_title: str

class TaskStatusChanged(DomainEvent):
    """
    Published when task status changes. Enables tracking of task progress
    and triggering of workflow automation.
    """
    task_id: str
    project_id: str
    old_status: str
    new_status: str
    changed_by: str

class TaskCompleted(DomainEvent):
    """
    Published when a task is marked complete. May trigger dependent task
    activation or project completion checks.
    """
    task_id: str
    project_id: str
    completed_by: str
    completion_date: datetime
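On the wire, these models become plain JSON objects, with datetimes rendered as ISO-8601 strings by the `json_encoders` configuration above. A stdlib sketch of the resulting payload for a TaskCreated event (the field values are invented):

```python
# Stdlib sketch of the JSON wire format a serialized TaskCreated event takes.
# The pydantic model handles this serialization in the real services; the
# values here are invented for illustration.
import json
from datetime import datetime, timezone

event = {
    "event_id": "0b6f2c1e-0000-4000-8000-000000000001",
    "event_type": "TaskCreated",
    "timestamp": datetime(2024, 1, 15, 9, 30, tzinfo=timezone.utc).isoformat(),
    "task_id": "task-42",
    "project_id": "project-7",
    "title": "Implement login endpoint",
    "description": None,
    "assigned_to": "user-3",
    "priority": "high",
    "status": "todo",
    "due_date": None,
}
wire = json.dumps(event)      # what gets published to the exchange
decoded = json.loads(wire)    # what a consumer sees
```

Keeping optional fields explicitly `null` rather than omitting them lets consumers parse every event of a given type with one schema.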
User Service Implementation
The User Service manages user accounts, authentication, and authorization. It provides APIs for user registration, login, and profile management.
User Models
# services/user-service/app/models/user.py
from sqlalchemy import Column, String, DateTime, Boolean
from sqlalchemy.sql import func
from shared.database.config import Base
import uuid

class User(Base):
    """
    Represents a user account in the system. Stores authentication credentials
    and profile information. User IDs are UUIDs to ensure global uniqueness.
    """
    __tablename__ = "users"

    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    email = Column(String, unique=True, nullable=False, index=True)
    username = Column(String, unique=True, nullable=False, index=True)
    hashed_password = Column(String, nullable=False)
    full_name = Column(String)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    def __repr__(self):
        return f"<User(id={self.id}, username={self.username}, email={self.email})>"
User Repository
# services/user-service/app/repositories/user_repository.py
from sqlalchemy.orm import Session
from app.models.user import User
from typing import Optional

class UserRepository:
    """
    Provides data access operations for User entities. Encapsulates
    database queries and ensures consistent data access patterns.
    """

    def __init__(self, db: Session):
        self.db = db

    def create(self, user: User) -> User:
        """
        Persists a new user to the database. Returns the created user
        with database-generated fields populated.
        """
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)
        return user

    def get_by_id(self, user_id: str) -> Optional[User]:
        """
        Retrieves a user by their unique identifier. Returns None if
        no user exists with the specified ID.
        """
        return self.db.query(User).filter(User.id == user_id).first()

    def get_by_email(self, email: str) -> Optional[User]:
        """
        Retrieves a user by their email address. Used during authentication
        to locate the user account.
        """
        return self.db.query(User).filter(User.email == email).first()

    def get_by_username(self, username: str) -> Optional[User]:
        """
        Retrieves a user by their username. Supports alternative login
        methods beyond email-based authentication.
        """
        return self.db.query(User).filter(User.username == username).first()

    def update(self, user: User) -> User:
        """
        Updates an existing user in the database. Commits changes and
        refreshes the user object with updated values.
        """
        self.db.commit()
        self.db.refresh(user)
        return user

    def delete(self, user_id: str) -> bool:
        """
        Deletes a user from the database. Returns True if a user was
        deleted, False if no user existed with the specified ID.
        """
        user = self.get_by_id(user_id)
        if user:
            self.db.delete(user)
            self.db.commit()
            return True
        return False
Authentication Service
# services/user-service/app/services/auth_service.py
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta
from typing import Optional
import os

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class AuthService:
    """
    Provides authentication and authorization services including password
    hashing, token generation, and token validation.
    """

    @staticmethod
    def hash_password(password: str) -> str:
        """
        Hashes a plaintext password using bcrypt. The resulting hash
        is safe to store in the database.
        """
        return pwd_context.hash(password)

    @staticmethod
    def verify_password(plain_password: str, hashed_password: str) -> bool:
        """
        Verifies that a plaintext password matches a hashed password.
        Used during login to validate user credentials.
        """
        return pwd_context.verify(plain_password, hashed_password)

    @staticmethod
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        """
        Creates a JWT access token containing the specified data. The token
        expires after the specified duration or a default of 30 minutes.
        """
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
        return encoded_jwt

    @staticmethod
    def decode_access_token(token: str) -> Optional[dict]:
        """
        Decodes and validates a JWT access token. Returns the token payload
        if valid, None if the token is invalid or expired.
        """
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            return payload
        except JWTError:
            return None
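The tokens that python-jose produces here have a simple structure worth understanding: base64url-encoded header and payload joined with dots, signed with HMAC-SHA256 for the HS256 algorithm. A stdlib illustration of that structure (for explanation only; the service itself should keep using the library, which also handles expiry validation and constant-time comparison):

```python
# Stdlib illustration of the HS256 token structure jwt.encode produces:
# base64url(header) + "." + base64url(payload) + "." + base64url(signature).
# Explanatory sketch only; real code should use python-jose as above.
import base64
import hashlib
import hmac
import json

def b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()

def b64url_decode(s: str) -> bytes:
    return base64.urlsafe_b64decode(s + "=" * (-len(s) % 4))

def hs256_token(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        b64url(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + b64url(json.dumps(payload, separators=(",", ":")).encode())
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + b64url(signature)

token = hs256_token({"sub": "user-3", "exp": 1700000000}, "dev-secret")
claims = json.loads(b64url_decode(token.split(".")[1]))
```

Note that the payload is only encoded, not encrypted, which is why the token must never carry secrets and why `SECRET_KEY` must be overridden outside development.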
User API
# services/user-service/app/api/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel, EmailStr
from app.models.user import User
from app.repositories.user_repository import UserRepository
from app.services.auth_service import AuthService
from shared.database.config import get_db
from typing import Optional

router = APIRouter(prefix="/users", tags=["users"])

class UserCreate(BaseModel):
    """Request model for user registration."""
    email: EmailStr
    username: str
    password: str
    full_name: Optional[str] = None

class UserLogin(BaseModel):
    """Request model for user login."""
    email: EmailStr
    password: str

class UserResponse(BaseModel):
    """Response model for user data."""
    id: str
    email: str
    username: str
    full_name: Optional[str]
    is_active: bool

    class Config:
        orm_mode = True

class TokenResponse(BaseModel):
    """Response model for authentication tokens."""
    access_token: str
    token_type: str

@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def register_user(user_data: UserCreate, db: Session = Depends(get_db)):
    """
    Registers a new user account. Validates that the email and username
    are not already in use, hashes the password, and creates the user.
    """
    repo = UserRepository(db)
    if repo.get_by_email(user_data.email):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )
    if repo.get_by_username(user_data.username):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already taken"
        )
    user = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=AuthService.hash_password(user_data.password),
        full_name=user_data.full_name
    )
    created_user = repo.create(user)
    return created_user

@router.post("/login", response_model=TokenResponse)
def login(credentials: UserLogin, db: Session = Depends(get_db)):
    """
    Authenticates a user and returns an access token. Validates credentials
    and generates a JWT token for subsequent authenticated requests.
    """
    repo = UserRepository(db)
    user = repo.get_by_email(credentials.email)
    if not user or not AuthService.verify_password(credentials.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password"
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is inactive"
        )
    access_token = AuthService.create_access_token(
        data={"sub": user.id, "email": user.email}
    )
    return {"access_token": access_token, "token_type": "bearer"}

@router.get("/me", response_model=UserResponse)
def get_current_user(token: str, db: Session = Depends(get_db)):
    """
    Retrieves the currently authenticated user based on the provided token.
    Used by clients to fetch user profile information.
    """
    payload = AuthService.decode_access_token(token)
    if not payload:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token"
        )
    user_id = payload.get("sub")
    repo = UserRepository(db)
    user = repo.get_by_id(user_id)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return user
Project Service Implementation
The Project Service manages projects and project membership. It provides APIs for creating projects, adding members, and querying project information.
Project Models
# services/project-service/app/models/project.py
from sqlalchemy import Column, String, DateTime, Table, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from shared.database.config import Base
import uuid

project_members = Table(
    'project_members',
    Base.metadata,
    Column('project_id', String, ForeignKey('projects.id'), primary_key=True),
    Column('user_id', String, primary_key=True),
    Column('role', String, nullable=False),
    Column('added_at', DateTime(timezone=True), server_default=func.now())
)

class Project(Base):
    """
    Represents a project that contains tasks and has team members.
    Projects serve as the organizational unit for task management.
    """
    __tablename__ = "projects"

    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    name = Column(String, nullable=False)
    description = Column(String)
    owner_id = Column(String, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    def __repr__(self):
        return f"<Project(id={self.id}, name={self.name}, owner_id={self.owner_id})>"
Project Repository
# services/project-service/app/repositories/project_repository.py
from sqlalchemy.orm import Session
from app.models.project import Project, project_members
from typing import List, Optional

class ProjectRepository:
    """
    Provides data access operations for Project entities and project
    membership relationships.
    """

    def __init__(self, db: Session):
        self.db = db

    def create(self, project: Project) -> Project:
        """
        Persists a new project to the database. Returns the created
        project with database-generated fields populated.
        """
        self.db.add(project)
        self.db.commit()
        self.db.refresh(project)
        return project

    def get_by_id(self, project_id: str) -> Optional[Project]:
        """
        Retrieves a project by its unique identifier. Returns None if
        no project exists with the specified ID.
        """
        return self.db.query(Project).filter(Project.id == project_id).first()

    def get_by_owner(self, owner_id: str) -> List[Project]:
        """
        Retrieves all projects owned by a specific user. Used to display
        a user's projects in the UI.
        """
        return self.db.query(Project).filter(Project.owner_id == owner_id).all()

    def add_member(self, project_id: str, user_id: str, role: str) -> bool:
        """
        Adds a user to a project with the specified role. Returns True
        if successful, False if the user is already a member.
        """
        existing = self.db.execute(
            project_members.select().where(
                project_members.c.project_id == project_id,
                project_members.c.user_id == user_id
            )
        ).first()
        if existing:
            return False
        self.db.execute(
            project_members.insert().values(
                project_id=project_id,
                user_id=user_id,
                role=role
            )
        )
        self.db.commit()
        return True

    def get_members(self, project_id: str) -> List[dict]:
        """
        Retrieves all members of a project along with their roles.
        Returns a list of dictionaries containing user IDs and roles.
        """
        results = self.db.execute(
            project_members.select().where(
                project_members.c.project_id == project_id
            )
        ).fetchall()
        return [
            {"user_id": row.user_id, "role": row.role}
            for row in results
        ]

    def is_member(self, project_id: str, user_id: str) -> bool:
        """
        Checks whether a user is a member of a project. Used for
        authorization checks before allowing project operations.
        """
        result = self.db.execute(
            project_members.select().where(
                project_members.c.project_id == project_id,
                project_members.c.user_id == user_id
            )
        ).first()
        return result is not None
Project API
# services/project-service/app/api/projects.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.models.project import Project
from app.repositories.project_repository import ProjectRepository
from shared.database.config import get_db
from shared.messaging.rabbitmq import MessageQueue
from shared.events.definitions import ProjectCreated, ProjectMemberAdded
from typing import Optional, List
from datetime import datetime
import uuid

router = APIRouter(prefix="/projects", tags=["projects"])

class ProjectCreate(BaseModel):
    """Request model for project creation."""
    name: str
    description: Optional[str] = None
    member_ids: List[str] = []

class ProjectResponse(BaseModel):
    """Response model for project data."""
    id: str
    name: str
    description: Optional[str]
    owner_id: str
    created_at: datetime

    class Config:
        orm_mode = True

class MemberAdd(BaseModel):
    """Request model for adding a member to a project."""
    user_id: str
    role: str = "member"

@router.post("/", response_model=ProjectResponse, status_code=status.HTTP_201_CREATED)
def create_project(
    project_data: ProjectCreate,
    owner_id: str,
    db: Session = Depends(get_db)
):
    """
    Creates a new project owned by the authenticated user. Automatically
    adds the owner as a member with admin role and publishes a ProjectCreated
    event to notify other services.
    """
    repo = ProjectRepository(db)
    project = Project(
        name=project_data.name,
        description=project_data.description,
        owner_id=owner_id
    )
    created_project = repo.create(project)
    repo.add_member(created_project.id, owner_id, "admin")
    for member_id in project_data.member_ids:
        repo.add_member(created_project.id, member_id, "member")
    mq = MessageQueue()
    mq.connect()
    mq.declare_exchange("projects", "topic")
    event = ProjectCreated(
        event_id=str(uuid.uuid4()),
        event_type="ProjectCreated",
        timestamp=datetime.utcnow(),
        project_id=created_project.id,
        name=created_project.name,
        description=created_project.description,
        owner_id=owner_id,
        member_ids=[owner_id] + project_data.member_ids
    )
    mq.publish("projects", "project.created", event.dict())
    mq.close()
    return created_project

@router.post("/{project_id}/members", status_code=status.HTTP_201_CREATED)
def add_member(
    project_id: str,
    member_data: MemberAdd,
    current_user_id: str,
    db: Session = Depends(get_db)
):
    """
    Adds a member to a project. Only project owners and admins can add
    members. Publishes a ProjectMemberAdded event to notify other services.
    """
    repo = ProjectRepository(db)
    project = repo.get_by_id(project_id)
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )
    if project.owner_id != current_user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only project owner can add members"
        )
    success = repo.add_member(project_id, member_data.user_id, member_data.role)
    if not success:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="User is already a project member"
        )
    mq = MessageQueue()
    mq.connect()
    mq.declare_exchange("projects", "topic")
    event = ProjectMemberAdded(
        event_id=str(uuid.uuid4()),
        event_type="ProjectMemberAdded",
        timestamp=datetime.utcnow(),
        project_id=project_id,
        member_id=member_data.user_id,
        role=member_data.role
    )
    mq.publish("projects", "project.member.added", event.dict())
    mq.close()
    return {"message": "Member added successfully"}

@router.get("/{project_id}", response_model=ProjectResponse)
def get_project(
    project_id: str,
    current_user_id: str,
    db: Session = Depends(get_db)
):
    """
    Retrieves project details. Only project members can view project
    information to maintain privacy and access control.
    """
    repo = ProjectRepository(db)
    project = repo.get_by_id(project_id)
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Project not found"
        )
    if not repo.is_member(project_id, current_user_id):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied"
        )
    return project
Task Service Implementation
The Task Service manages tasks within projects. It provides APIs for creating tasks, assigning tasks, updating task status, and querying task information.
Task Models
# services/task-service/app/models/task.py
from sqlalchemy import Column, String, DateTime, Enum as SQLEnum
from sqlalchemy.sql import func
from shared.database.config import Base
import uuid
import enum

class TaskStatus(str, enum.Enum):
    """Enumeration of possible task statuses."""
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    BLOCKED = "blocked"
    COMPLETED = "completed"

class TaskPriority(str, enum.Enum):
    """Enumeration of task priority levels."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class Task(Base):
    """
    Represents a task within a project. Tasks have status, priority,
    assignment, and deadline information.
    """
    __tablename__ = "tasks"

    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    project_id = Column(String, nullable=False, index=True)
    title = Column(String, nullable=False)
    description = Column(String)
    status = Column(SQLEnum(TaskStatus), default=TaskStatus.TODO, nullable=False)
    priority = Column(SQLEnum(TaskPriority), default=TaskPriority.MEDIUM, nullable=False)
    assigned_to = Column(String, index=True)
    created_by = Column(String, nullable=False)
    due_date = Column(DateTime(timezone=True))
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
    completed_at = Column(DateTime(timezone=True))

    def __repr__(self):
        return f"<Task(id={self.id}, title={self.title}, status={self.status})>"
Task Repository
# services/task-service/app/repositories/task_repository.py
from sqlalchemy.orm import Session
from app.models.task import Task, TaskStatus
from typing import List, Optional

class TaskRepository:
    """
    Provides data access operations for Task entities. Supports querying
    tasks by various criteria such as project, assignee, and status.
    """

    def __init__(self, db: Session):
        self.db = db

    def create(self, task: Task) -> Task:
        """
        Persists a new task to the database. Returns the created task
        with database-generated fields populated.
        """
        self.db.add(task)
        self.db.commit()
        self.db.refresh(task)
        return task

    def get_by_id(self, task_id: str) -> Optional[Task]:
        """
        Retrieves a task by its unique identifier. Returns None if
        no task exists with the specified ID.
        """
        return self.db.query(Task).filter(Task.id == task_id).first()

    def get_by_project(self, project_id: str) -> List[Task]:
        """
        Retrieves all tasks belonging to a specific project. Used to
        display project task lists in the UI.
        """
        return self.db.query(Task).filter(Task.project_id == project_id).all()

    def get_by_assignee(self, user_id: str) -> List[Task]:
        """
        Retrieves all tasks assigned to a specific user. Used to display
        a user's task list across all projects.
        """
        return self.db.query(Task).filter(Task.assigned_to == user_id).all()

    def update(self, task: Task) -> Task:
        """
        Updates an existing task in the database. Commits changes and
        refreshes the task object with updated values.
        """
        self.db.commit()
        self.db.refresh(task)
        return task

    def delete(self, task_id: str) -> bool:
        """
        Deletes a task from the database. Returns True if a task was
        deleted, False if no task existed with the specified ID.
        """
        task = self.get_by_id(task_id)
        if task:
            self.db.delete(task)
            self.db.commit()
            return True
        return False
Task Service Business Logic
# services/task-service/app/services/task_service.py
from app.models.task import Task, TaskStatus
from app.repositories.task_repository import TaskRepository
from shared.messaging.rabbitmq import MessageQueue
from shared.events.definitions import TaskCreated, TaskAssigned, TaskStatusChanged, TaskCompleted
from datetime import datetime
import uuid

class TaskService:
    """
    Implements business logic for task operations. Enforces business rules
    and publishes domain events to notify other services of task changes.
    """
    def __init__(self, repository: TaskRepository):
        self.repository = repository
        self.message_queue = MessageQueue()

    def create_task(self, task: Task) -> Task:
        """
        Creates a new task and publishes a TaskCreated event. If the task
        is assigned to a user, also publishes a TaskAssigned event.
        """
        created_task = self.repository.create(task)
        self.message_queue.connect()
        self.message_queue.declare_exchange("tasks", "topic")
        event = TaskCreated(
            event_id=str(uuid.uuid4()),
            event_type="TaskCreated",
            timestamp=datetime.utcnow(),
            task_id=created_task.id,
            project_id=created_task.project_id,
            title=created_task.title,
            description=created_task.description,
            assigned_to=created_task.assigned_to,
            priority=created_task.priority.value,
            status=created_task.status.value,
            due_date=created_task.due_date
        )
        self.message_queue.publish("tasks", "task.created", event.dict())
        if created_task.assigned_to:
            assign_event = TaskAssigned(
                event_id=str(uuid.uuid4()),
                event_type="TaskAssigned",
                timestamp=datetime.utcnow(),
                task_id=created_task.id,
                project_id=created_task.project_id,
                assigned_to=created_task.assigned_to,
                assigned_by=created_task.created_by,
                task_title=created_task.title
            )
            self.message_queue.publish("tasks", "task.assigned", assign_event.dict())
        self.message_queue.close()
        return created_task

    def update_status(self, task_id: str, new_status: TaskStatus, user_id: str) -> Task:
        """
        Updates task status and publishes appropriate events. If the new
        status is completed, also publishes a TaskCompleted event.
        """
        task = self.repository.get_by_id(task_id)
        if not task:
            raise ValueError("Task not found")
        old_status = task.status
        task.status = new_status
        if new_status == TaskStatus.COMPLETED:
            task.completed_at = datetime.utcnow()
        updated_task = self.repository.update(task)
        self.message_queue.connect()
        self.message_queue.declare_exchange("tasks", "topic")
        status_event = TaskStatusChanged(
            event_id=str(uuid.uuid4()),
            event_type="TaskStatusChanged",
            timestamp=datetime.utcnow(),
            task_id=task_id,
            project_id=task.project_id,
            old_status=old_status.value,
            new_status=new_status.value,
            changed_by=user_id
        )
        self.message_queue.publish("tasks", "task.status.changed", status_event.dict())
        if new_status == TaskStatus.COMPLETED:
            complete_event = TaskCompleted(
                event_id=str(uuid.uuid4()),
                event_type="TaskCompleted",
                timestamp=datetime.utcnow(),
                task_id=task_id,
                project_id=task.project_id,
                completed_by=user_id,
                completion_date=task.completed_at
            )
            self.message_queue.publish("tasks", "task.completed", complete_event.dict())
        self.message_queue.close()
        return updated_task
Task API
# services/task-service/app/api/tasks.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from pydantic import BaseModel
from app.models.task import Task, TaskStatus, TaskPriority
from app.repositories.task_repository import TaskRepository
from app.services.task_service import TaskService
from shared.database.config import get_db
from typing import Optional, List
from datetime import datetime

router = APIRouter(prefix="/tasks", tags=["tasks"])

class TaskCreate(BaseModel):
    """Request model for task creation."""
    project_id: str
    title: str
    description: Optional[str] = None
    assigned_to: Optional[str] = None
    priority: TaskPriority = TaskPriority.MEDIUM
    due_date: Optional[datetime] = None

class TaskUpdate(BaseModel):
    """Request model for task updates."""
    title: Optional[str] = None
    description: Optional[str] = None
    status: Optional[TaskStatus] = None
    priority: Optional[TaskPriority] = None
    assigned_to: Optional[str] = None
    due_date: Optional[datetime] = None

class TaskResponse(BaseModel):
    """Response model for task data."""
    id: str
    project_id: str
    title: str
    description: Optional[str]
    status: TaskStatus
    priority: TaskPriority
    assigned_to: Optional[str]
    created_by: str
    due_date: Optional[datetime]
    created_at: datetime
    completed_at: Optional[datetime]

    class Config:
        orm_mode = True

@router.post("/", response_model=TaskResponse, status_code=status.HTTP_201_CREATED)
def create_task(
    task_data: TaskCreate,
    current_user_id: str,
    db: Session = Depends(get_db)
):
    """
    Creates a new task in a project. The authenticated user becomes the
    task creator. Publishes events to notify other services.
    """
    task = Task(
        project_id=task_data.project_id,
        title=task_data.title,
        description=task_data.description,
        assigned_to=task_data.assigned_to,
        priority=task_data.priority,
        created_by=current_user_id,
        due_date=task_data.due_date
    )
    repo = TaskRepository(db)
    service = TaskService(repo)
    created_task = service.create_task(task)
    return created_task

@router.get("/{task_id}", response_model=TaskResponse)
def get_task(
    task_id: str,
    db: Session = Depends(get_db)
):
    """
    Retrieves task details by task ID. Returns 404 if the task does not exist.
    """
    repo = TaskRepository(db)
    task = repo.get_by_id(task_id)
    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Task not found"
        )
    return task

@router.get("/project/{project_id}", response_model=List[TaskResponse])
def get_project_tasks(
    project_id: str,
    db: Session = Depends(get_db)
):
    """
    Retrieves all tasks for a specific project. Used to display project
    task lists in the UI.
    """
    repo = TaskRepository(db)
    tasks = repo.get_by_project(project_id)
    return tasks

@router.get("/assigned/{user_id}", response_model=List[TaskResponse])
def get_user_tasks(
    user_id: str,
    db: Session = Depends(get_db)
):
    """
    Retrieves all tasks assigned to a specific user across all projects.
    Used to display a user's personal task list.
    """
    repo = TaskRepository(db)
    tasks = repo.get_by_assignee(user_id)
    return tasks

@router.patch("/{task_id}/status")
def update_task_status(
    task_id: str,
    new_status: TaskStatus,
    current_user_id: str,
    db: Session = Depends(get_db)
):
    """
    Updates the status of a task. Publishes events to notify other services
    of the status change and task completion if applicable.
    """
    repo = TaskRepository(db)
    service = TaskService(repo)
    try:
        updated_task = service.update_status(task_id, new_status, current_user_id)
        return {"message": "Task status updated", "task": TaskResponse.from_orm(updated_task)}
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e)
        )
Notification Service Implementation
The Notification Service consumes events from other services and delivers notifications to users. It listens for task assignment and status change events.
Notification Consumer
# services/notification-service/app/consumers/event_consumer.py
from shared.messaging.rabbitmq import MessageQueue
from app.services.notification_service import NotificationService
import logging

logger = logging.getLogger(__name__)

class EventConsumer:
    """
    Consumes domain events from the message queue and triggers appropriate
    notification delivery based on event type.
    """
    def __init__(self):
        self.message_queue = MessageQueue()
        self.notification_service = NotificationService()

    def start(self):
        """
        Starts consuming events from the task exchange. Declares the queue
        and binds it to relevant routing keys for task events.
        """
        self.message_queue.connect()
        self.message_queue.declare_exchange("tasks", "topic")
        self.message_queue.declare_queue(
            "notification_queue",
            "tasks",
            "task.#"
        )
        logger.info("Starting event consumption for notifications")
        self.message_queue.consume("notification_queue", self.handle_event)

    def handle_event(self, event: dict):
        """
        Routes events to appropriate handlers based on event type.
        Processes task assignment and status change events.
        """
        event_type = event.get("event_type")
        logger.info(f"Received event: {event_type}")
        if event_type == "TaskAssigned":
            self.handle_task_assigned(event)
        elif event_type == "TaskStatusChanged":
            self.handle_task_status_changed(event)
        elif event_type == "TaskCompleted":
            self.handle_task_completed(event)

    def handle_task_assigned(self, event: dict):
        """
        Handles TaskAssigned events by sending a notification to the
        assigned user informing them of the new task.
        """
        user_id = event.get("assigned_to")
        task_title = event.get("task_title")
        assigned_by = event.get("assigned_by")
        message = f"You have been assigned a new task: {task_title}"
        self.notification_service.send_notification(
            user_id=user_id,
            message=message,
            notification_type="task_assigned"
        )

    def handle_task_status_changed(self, event: dict):
        """
        Handles TaskStatusChanged events. Could notify task creator or
        project manager of status updates.
        """
        task_id = event.get("task_id")
        new_status = event.get("new_status")
        changed_by = event.get("changed_by")
        logger.info(f"Task {task_id} status changed to {new_status} by {changed_by}")

    def handle_task_completed(self, event: dict):
        """
        Handles TaskCompleted events by notifying relevant stakeholders
        that a task has been completed.
        """
        task_id = event.get("task_id")
        completed_by = event.get("completed_by")
        logger.info(f"Task {task_id} completed by {completed_by}")
Notification Service
# services/notification-service/app/services/notification_service.py
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class NotificationService:
    """
    Handles delivery of notifications to users. In a production system,
    this would integrate with email services, push notification services,
    or in-app notification systems.
    """
    def send_notification(self, user_id: str, message: str, notification_type: str):
        """
        Sends a notification to a user. This implementation logs the
        notification. A production system would deliver via email, SMS,
        push notification, or in-app notification.
        """
        logger.info(
            f"[NOTIFICATION] Type: {notification_type}, "
            f"User: {user_id}, Message: {message}, "
            f"Time: {datetime.utcnow().isoformat()}"
        )
        # In production, integrate with notification delivery services:
        # - Email: SendGrid, Amazon SES, Mailgun
        # - Push: Firebase Cloud Messaging, Apple Push Notification Service
        # - SMS: Twilio, Amazon SNS
        # - In-app: WebSocket connections, Server-Sent Events
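One way to keep NotificationService independent of any particular provider listed above is to route notifications through a small channel abstraction. The sketch below is illustrative only; the `NotificationChannel` protocol, `LogChannel`, and `ChannelRouter` names are assumptions, not part of the TeamAI codebase.

```python
import logging
from typing import Protocol

logger = logging.getLogger(__name__)

class NotificationChannel(Protocol):
    """Anything that can deliver a notification message to a user."""
    def deliver(self, user_id: str, message: str) -> None: ...

class LogChannel:
    """Development channel: writes notifications to the application log."""
    def deliver(self, user_id: str, message: str) -> None:
        logger.info(f"[NOTIFICATION] User: {user_id}, Message: {message}")

class ChannelRouter:
    """Routes each notification type to its configured channels."""
    def __init__(self):
        self.routes: dict[str, list[NotificationChannel]] = {}

    def register(self, notification_type: str, channel: NotificationChannel) -> None:
        self.routes.setdefault(notification_type, []).append(channel)

    def send(self, user_id: str, message: str, notification_type: str) -> int:
        # Deliver through every channel registered for this type;
        # return how many channels received the message.
        channels = self.routes.get(notification_type, [])
        for channel in channels:
            channel.deliver(user_id, message)
        return len(channels)
```

A production deployment would register an email or push channel alongside (or instead of) `LogChannel` for each notification type, leaving `send_notification` callers unchanged.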
Main Application Entry Points
Each service requires a main application file that initializes FastAPI, registers routes, and handles startup and shutdown.
User Service Main
# services/user-service/main.py
from fastapi import FastAPI
from app.api import users
from shared.database.config import Base, engine
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="User Service", version="1.0.0")

@app.on_event("startup")
def startup():
    """
    Creates database tables on application startup. In production,
    use proper database migration tools like Alembic.
    """
    logger.info("Creating database tables")
    Base.metadata.create_all(bind=engine)
    logger.info("User Service started")

@app.on_event("shutdown")
def shutdown():
    """
    Cleanup on application shutdown.
    """
    logger.info("User Service shutting down")

app.include_router(users.router)

@app.get("/health")
def health_check():
    """Health check endpoint for monitoring and load balancers."""
    return {"status": "healthy", "service": "user-service"}
Project Service Main
# services/project-service/main.py
from fastapi import FastAPI
from app.api import projects
from shared.database.config import Base, engine
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Project Service", version="1.0.0")

@app.on_event("startup")
def startup():
    """
    Creates database tables on application startup.
    """
    logger.info("Creating database tables")
    Base.metadata.create_all(bind=engine)
    logger.info("Project Service started")

@app.on_event("shutdown")
def shutdown():
    """
    Cleanup on application shutdown.
    """
    logger.info("Project Service shutting down")

app.include_router(projects.router)

@app.get("/health")
def health_check():
    """Health check endpoint for monitoring and load balancers."""
    return {"status": "healthy", "service": "project-service"}
Task Service Main
# services/task-service/main.py
from fastapi import FastAPI
from app.api import tasks
from shared.database.config import Base, engine
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Task Service", version="1.0.0")

@app.on_event("startup")
def startup():
    """
    Creates database tables on application startup.
    """
    logger.info("Creating database tables")
    Base.metadata.create_all(bind=engine)
    logger.info("Task Service started")

@app.on_event("shutdown")
def shutdown():
    """
    Cleanup on application shutdown.
    """
    logger.info("Task Service shutting down")

app.include_router(tasks.router)

@app.get("/health")
def health_check():
    """Health check endpoint for monitoring and load balancers."""
    return {"status": "healthy", "service": "task-service"}
Notification Service Main
# services/notification-service/main.py
from app.consumers.event_consumer import EventConsumer
import logging
import signal
import sys

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

consumer = None

def signal_handler(sig, frame):
    """
    Handles shutdown signals gracefully, closing message queue connections.
    """
    logger.info("Shutdown signal received")
    if consumer:
        consumer.message_queue.close()
    sys.exit(0)

def main():
    """
    Main entry point for the notification service. Starts consuming events
    from the message queue.
    """
    global consumer
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    logger.info("Starting Notification Service")
    consumer = EventConsumer()
    consumer.start()

if __name__ == "__main__":
    main()
Infrastructure Configuration
The system requires infrastructure for databases, message queues, and service orchestration. Docker Compose provides a convenient way to run all components locally.
# infrastructure/docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:14
    environment:
      POSTGRES_USER: teamai
      POSTGRES_PASSWORD: teamai
      POSTGRES_DB: teamai
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  rabbitmq:
    image: rabbitmq:3-management
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

  user-service:
    build: ../services/user-service
    environment:
      DATABASE_URL: postgresql://teamai:teamai@postgres:5432/teamai
      JWT_SECRET_KEY: your-secret-key-change-in-production
    ports:
      - "8001:8000"
    depends_on:
      - postgres

  project-service:
    build: ../services/project-service
    environment:
      DATABASE_URL: postgresql://teamai:teamai@postgres:5432/teamai
      RABBITMQ_HOST: rabbitmq
    ports:
      - "8002:8000"
    depends_on:
      - postgres
      - rabbitmq

  task-service:
    build: ../services/task-service
    environment:
      DATABASE_URL: postgresql://teamai:teamai@postgres:5432/teamai
      RABBITMQ_HOST: rabbitmq
    ports:
      - "8003:8000"
    depends_on:
      - postgres
      - rabbitmq

  notification-service:
    build: ../services/notification-service
    environment:
      RABBITMQ_HOST: rabbitmq
    depends_on:
      - rabbitmq

volumes:
  postgres_data:
  redis_data:
  rabbitmq_data:
Testing Strategy and Implementation
TeamAI implements a comprehensive testing strategy that includes unit tests, integration tests, and system tests. The test agent collaborates with the architecture agent on a risk-based approach that concentrates testing effort where failures would be most likely or most costly.
Unit Testing
Each developer agent implements unit tests for the code it produces. Unit tests verify that individual components behave correctly in isolation. For the task management system, unit tests verify business logic such as task status transitions, validation rules, and data transformations.
# services/task-service/tests/test_task_service.py
import pytest
from unittest.mock import Mock, MagicMock
from app.models.task import Task, TaskStatus, TaskPriority
from app.repositories.task_repository import TaskRepository
from app.services.task_service import TaskService
from datetime import datetime

class TestTaskService:
    """
    Unit tests for TaskService business logic. Tests verify that business
    rules are enforced and events are published correctly.
    """
    def setup_method(self):
        """
        Sets up test fixtures before each test method. Creates mock
        repository and service instance.
        """
        self.mock_repo = Mock(spec=TaskRepository)
        self.service = TaskService(self.mock_repo)
        self.service.message_queue = Mock()

    def test_create_task_publishes_created_event(self):
        """
        Verifies that creating a task publishes a TaskCreated event
        to the message queue.
        """
        task = Task(
            project_id="project-1",
            title="Test Task",
            created_by="user-1",
            priority=TaskPriority.HIGH
        )
        self.mock_repo.create.return_value = task
        created_task = self.service.create_task(task)
        assert created_task == task
        self.mock_repo.create.assert_called_once_with(task)
        self.service.message_queue.publish.assert_called()

    def test_create_assigned_task_publishes_assignment_event(self):
        """
        Verifies that creating a task with an assignee publishes both
        TaskCreated and TaskAssigned events.
        """
        task = Task(
            project_id="project-1",
            title="Test Task",
            created_by="user-1",
            assigned_to="user-2",
            priority=TaskPriority.HIGH
        )
        self.mock_repo.create.return_value = task
        created_task = self.service.create_task(task)
        assert self.service.message_queue.publish.call_count == 2

    def test_update_status_to_completed_sets_completion_date(self):
        """
        Verifies that updating task status to completed sets the
        completion timestamp.
        """
        task = Task(
            id="task-1",
            project_id="project-1",
            title="Test Task",
            created_by="user-1",
            status=TaskStatus.IN_PROGRESS
        )
        self.mock_repo.get_by_id.return_value = task
        self.mock_repo.update.return_value = task
        updated_task = self.service.update_status("task-1", TaskStatus.COMPLETED, "user-1")
        assert updated_task.status == TaskStatus.COMPLETED
        assert updated_task.completed_at is not None

    def test_update_status_publishes_status_changed_event(self):
        """
        Verifies that updating task status publishes a TaskStatusChanged
        event to the message queue.
        """
        task = Task(
            id="task-1",
            project_id="project-1",
            title="Test Task",
            created_by="user-1",
            status=TaskStatus.TODO
        )
        self.mock_repo.get_by_id.return_value = task
        self.mock_repo.update.return_value = task
        self.service.update_status("task-1", TaskStatus.IN_PROGRESS, "user-1")
        self.service.message_queue.publish.assert_called()
Integration Testing
Integration tests verify that services interact correctly through their defined interfaces. For the task management system, integration tests verify that events published by one service are correctly consumed by other services.
# tests/integration/test_task_notification_integration.py
import pytest
import time
from services.task_service.app.models.task import Task, TaskPriority
from services.task_service.app.repositories.task_repository import TaskRepository
from services.task_service.app.services.task_service import TaskService
from services.notification_service.app.consumers.event_consumer import EventConsumer
from shared.database.config import SessionLocal

class TestTaskNotificationIntegration:
    """
    Integration tests verifying that task events are correctly consumed
    by the notification service and result in notification delivery.
    """
    def setup_method(self):
        """
        Sets up test environment with real database and message queue
        connections. Starts notification consumer in background.
        """
        self.db = SessionLocal()
        self.repo = TaskRepository(self.db)
        self.service = TaskService(self.repo)

    def teardown_method(self):
        """
        Cleans up test environment, closing database connections.
        """
        self.db.close()

    def test_task_assignment_triggers_notification(self):
        """
        Verifies that assigning a task results in a notification being
        sent to the assigned user.
        """
        task = Task(
            project_id="project-1",
            title="Integration Test Task",
            created_by="user-1",
            assigned_to="user-2",
            priority=TaskPriority.HIGH
        )
        created_task = self.service.create_task(task)
        time.sleep(1)
        # In a real test, verify notification was logged or delivered
        # This would require access to notification service logs or database
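The end-to-end check above needs a running database and broker. A lighter alternative is to substitute an in-memory test double for the shared `MessageQueue` and assert on captured events directly. The `FakeMessageQueue` below is a hypothetical sketch of such a double, not part of the shared messaging library; it mirrors only the methods `TaskService` calls.

```python
class FakeMessageQueue:
    """In-memory stand-in for the RabbitMQ-backed MessageQueue.

    Records published events so tests can assert on them without a broker.
    """
    def __init__(self):
        self.published = []  # list of (exchange, routing_key, event) tuples

    def connect(self):
        pass  # no broker connection needed

    def declare_exchange(self, name, exchange_type):
        pass  # exchanges are not modeled in memory

    def publish(self, exchange, routing_key, event):
        self.published.append((exchange, routing_key, event))

    def close(self):
        pass

    def events_of_type(self, event_type):
        """Returns all captured event payloads with the given event_type."""
        return [e for _, _, e in self.published if e.get("event_type") == event_type]
```

A test would assign `service.message_queue = FakeMessageQueue()` before calling `create_task`, then assert that `events_of_type("TaskAssigned")` contains the expected payload.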
System Testing
System tests validate end-to-end scenarios from the requirements specification. These tests exercise the complete system through its external APIs, verifying that functional requirements and quality attributes are satisfied.
# tests/system/test_project_workflow.py
import pytest
import requests
from datetime import datetime, timedelta

class TestProjectWorkflow:
    """
    System tests validating complete user workflows through the system.
    Tests exercise multiple services and verify end-to-end functionality.
    """
    def setup_method(self):
        """
        Sets up test environment with base URLs for all services.
        """
        self.user_service_url = "http://localhost:8001"
        self.project_service_url = "http://localhost:8002"
        self.task_service_url = "http://localhost:8003"

    def test_complete_project_creation_and_task_assignment_workflow(self):
        """
        Tests the complete workflow of a project manager creating a project,
        adding team members, creating tasks, and assigning them to members.
        """
        # Register project manager
        manager_data = {
            "email": "manager@example.com",
            "username": "manager",
            "password": "password123",
            "full_name": "Project Manager"
        }
        response = requests.post(
            f"{self.user_service_url}/users/register",
            json=manager_data
        )
        assert response.status_code == 201
        manager = response.json()

        # Login as manager
        login_data = {
            "email": "manager@example.com",
            "password": "password123"
        }
        response = requests.post(
            f"{self.user_service_url}/users/login",
            json=login_data
        )
        assert response.status_code == 200
        token = response.json()["access_token"]

        # Register team member
        member_data = {
            "email": "developer@example.com",
            "username": "developer",
            "password": "password123",
            "full_name": "Developer"
        }
        response = requests.post(
            f"{self.user_service_url}/users/register",
            json=member_data
        )
        assert response.status_code == 201
        member = response.json()

        # Create project
        project_data = {
            "name": "Test Project",
            "description": "A test project for system testing",
            "member_ids": [member["id"]]
        }
        response = requests.post(
            f"{self.project_service_url}/projects/",
            json=project_data,
            params={"owner_id": manager["id"]}
        )
        assert response.status_code == 201
        project = response.json()

        # Create task
        task_data = {
            "project_id": project["id"],
            "title": "Implement feature X",
            "description": "Detailed description of feature X",
            "assigned_to": member["id"],
            "priority": "high",
            "due_date": (datetime.now() + timedelta(days=7)).isoformat()
        }
        response = requests.post(
            f"{self.task_service_url}/tasks/",
            json=task_data,
            params={"current_user_id": manager["id"]}
        )
        assert response.status_code == 201
        task = response.json()

        # Verify task was created correctly
        assert task["title"] == "Implement feature X"
        assert task["assigned_to"] == member["id"]
        assert task["status"] == "todo"

        # Update task status
        response = requests.patch(
            f"{self.task_service_url}/tasks/{task['id']}/status",
            params={
                "new_status": "in_progress",
                "current_user_id": member["id"]
            }
        )
        assert response.status_code == 200

        # Verify task status was updated
        response = requests.get(f"{self.task_service_url}/tasks/{task['id']}")
        assert response.status_code == 200
        updated_task = response.json()
        assert updated_task["status"] == "in_progress"
Architecture Decision Records
Architecture Decision Records document significant design choices made during the development of TeamAI and the task management system. These records provide context for future maintenance and evolution.
ADR 001: Microservices Architecture
Context: The task management system requires modularity, independent scalability of components, and alignment with bounded contexts from the domain model.
Decision: Implement a microservices architecture with separate services for user management, project management, task tracking, and notifications.
Rationale: Microservices provide several advantages for this system. Each service can be developed, deployed, and scaled independently. Service boundaries align with bounded contexts from Domain-Driven Design, maintaining conceptual integrity. Different services can use different data storage technologies optimized for their specific needs. The architecture supports team autonomy, allowing different developer agents to work on different services without coordination overhead.
Alternatives Considered: A modular monolith was considered as an alternative. While simpler to deploy and operate, a monolith would not provide the independent scalability and deployment flexibility required for the system. A monolith would also make it more difficult to maintain bounded context isolation.
Consequences: Microservices introduce operational complexity including service discovery, inter-service communication, distributed transaction management, and monitoring across multiple services. The system requires infrastructure for message queuing and API gateways. However, these costs are justified by the benefits of modularity, scalability, and alignment with the domain model.
ADR 002: Event-Driven Integration
Context: Services need to communicate and maintain consistency without tight coupling. Changes in one service should trigger appropriate responses in other services.
Decision: Use event-driven integration through a message queue for asynchronous communication between services. Services publish domain events when significant state changes occur, and other services consume these events to update their own state.
Rationale: Event-driven integration provides loose coupling between services. Services do not need to know about their consumers, only about the events they publish. This approach supports eventual consistency, which is acceptable for most operations in the task management domain. Events provide an audit trail of system activity and enable future functionality such as analytics and workflow automation.
Alternatives Considered: Synchronous REST API calls between services were considered. While simpler to implement initially, synchronous calls create tight coupling and make services dependent on each other's availability. Synchronous integration also makes it difficult to add new consumers of events without modifying existing services.
Consequences: Event-driven integration introduces eventual consistency, meaning that different services may have slightly different views of system state at any given moment. This requires careful design of business processes to handle eventual consistency appropriately. The system requires infrastructure for message queuing and event schema management.
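One concrete approach to the event schema management mentioned above is to carry an explicit schema version in every event and upcast older payloads when consuming. The sketch below is an assumption about how TeamAI's event definitions could evolve, not their actual implementation; the hypothetical version 2 of TaskAssigned adds an assigned_by field that version 1 lacked.

```python
from dataclasses import dataclass

SCHEMA_VERSION = 2  # current version emitted by producers

@dataclass
class TaskAssignedV2:
    """Hypothetical version 2 of the TaskAssigned event payload."""
    task_id: str
    assigned_to: str
    assigned_by: str = "unknown"  # documented default for upgraded v1 events
    schema_version: int = SCHEMA_VERSION

def parse_task_assigned(payload: dict) -> TaskAssignedV2:
    """Upcasts older event payloads to the current schema.

    Version 1 events carry no schema_version and no assigned_by;
    the parser supplies the documented default so consumers only
    ever handle the latest shape.
    """
    version = payload.get("schema_version", 1)
    data = {
        "task_id": payload["task_id"],
        "assigned_to": payload["assigned_to"],
    }
    if version >= 2:
        data["assigned_by"] = payload["assigned_by"]
    return TaskAssignedV2(**data)
```

With this pattern, producers can be upgraded before consumers (or vice versa) without breaking in-flight events, which is what makes independent service deployment safe.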
ADR 003: Database Per Service
Context: Each microservice needs to persist data, and services should be independently deployable and scalable.
Decision: Each service owns its database and no other service directly accesses that database. Services communicate only through APIs and events.
Rationale: Database per service maintains bounded context isolation and prevents tight coupling through shared databases. Each service can choose the database technology best suited to its needs. Services can be scaled independently based on their specific data access patterns. Database schema changes in one service do not affect other services.
Alternatives Considered: A shared database across all services was considered. While simpler to implement and easier to maintain consistency, a shared database creates tight coupling and makes independent deployment and scaling difficult. Shared databases also make it harder to maintain bounded context boundaries.
Consequences: Database per service requires careful design of inter-service communication and consistency management. Queries that span multiple services require either API composition or eventual consistency through events. Distributed transactions are avoided in favor of eventual consistency and compensating transactions.
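A query such as "show each task with its assignee's display name" spans the task and user services, so a SQL join is impossible under database per service. With the API-composition approach mentioned above, a caller fetches from both services and joins in memory; the join step reduces to a function like the hypothetical sketch below (the HTTP calls that would fetch the two lists are elided).

```python
from typing import Dict, List

def compose_tasks_with_assignees(tasks: List[dict], users: List[dict]) -> List[dict]:
    """Joins task records with user records fetched from separate services.

    Tasks reference users only by ID, so the composition layer resolves
    display names in memory. Unassigned tasks and unknown user IDs fall
    back to None rather than failing the whole query.
    """
    users_by_id: Dict[str, dict] = {u["id"]: u for u in users}
    composed = []
    for task in tasks:
        assignee = users_by_id.get(task.get("assigned_to"))
        composed.append({
            **task,
            "assignee_name": assignee["full_name"] if assignee else None,
        })
    return composed
```

The trade-off is visible here: the composition layer sees whatever each service returned at fetch time, so the joined view is only eventually consistent with the underlying services.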
Conclusion
TeamAI demonstrates how multiagent AI systems can collaborate to develop complete software systems. By distributing responsibilities across specialized agents that mirror professional development teams, TeamAI achieves both specialization and coordination. The coordinator agent interfaces with users, the requirements agent analyzes specifications, the domain agent models the business domain, the architecture agent designs the system structure, developer agents implement subsystems, test agents validate functionality, and review agents ensure quality.
The running example of a task management system illustrates how these agents collaborate to produce a working microservices architecture with user management, project management, task tracking, and notification services. The implementation demonstrates event-driven integration, domain-driven design, comprehensive testing, and production-ready code quality.
The architecture supports both local and remote LLM deployment, enabling operation in diverse computational environments. Support for multiple GPU architectures including Intel, AMD ROCm, Apple MPS, and Nvidia CUDA ensures that TeamAI can leverage available hardware acceleration.
The agile feedback mechanisms built into TeamAI enable continuous refinement as agents request clarification, provide feedback, and adapt to evolving requirements. This adaptability distinguishes TeamAI from rigid automation and enables it to handle the complexity and ambiguity inherent in real-world software development.
Architecture Decision Records document the rationale behind significant design choices, providing invaluable context for future maintenance and evolution. The comprehensive testing strategy ensures that the system behaves correctly and continues to do so as it evolves.
TeamAI represents a significant step toward autonomous software development, demonstrating that AI agents can collaborate effectively to design, implement, test, and deliver production-ready software systems. As large language models continue to improve in capability, systems like TeamAI will become increasingly powerful tools for software development, augmenting human developers and enabling new levels of productivity and innovation.