Understanding the Intersection of Graph Theory and Software Architecture
Software systems grow in complexity over time, and understanding their internal structure becomes increasingly challenging. When we examine a large codebase, we often face questions about how components relate to each other, which modules are most critical, and whether architectural principles are being violated. Graph theory provides us with powerful mathematical tools to answer these questions systematically.
A dependency graph represents your codebase as a network where nodes represent code entities such as classes, modules, or packages, and edges represent dependencies between them. When class A imports or uses class B, we draw a directed edge from A to B. This simple representation unlocks a wealth of analytical possibilities through graph algorithms that have been refined over decades of mathematical research.
The beauty of applying graph algorithms to software architecture lies in their objectivity. Unlike manual code reviews, which can be subjective and incomplete, graph algorithms examine every relationship in your codebase systematically. They reveal patterns that might be invisible to human reviewers, especially in large systems where no single developer understands the entire architecture.
Representing Code Dependencies as Graphs
Before we can apply graph algorithms, we need to understand how to represent code dependencies mathematically. In graph theory terms, we work with a directed graph G = (V, E) where V is the set of vertices representing code entities and E is the set of directed edges representing dependencies.
The choice of what constitutes a node depends on the granularity of analysis you need. For high-level architectural analysis, nodes might represent entire packages or modules. For detailed design analysis, nodes might represent individual classes or even functions. The edge direction is crucial because dependencies are not symmetric. If module A depends on module B, this does not imply that B depends on A.
Let us begin with a practical example using NetworkX, a powerful Python library for graph analysis. NetworkX provides an intuitive API and implements dozens of graph algorithms efficiently. We start by creating a simple dependency graph representing a hypothetical e-commerce system.
import networkx as nx
import matplotlib.pyplot as plt
def create_sample_dependency_graph():
"""
Creates a directed graph representing dependencies in an e-commerce system.
Each node represents a module, and each edge represents a dependency
relationship where the source depends on the target.
Returns:
nx.DiGraph: A directed graph of module dependencies
"""
# Initialize an empty directed graph
dependency_graph = nx.DiGraph()
# Add nodes representing different modules in the system
modules = [
'UserInterface',
'OrderService',
'PaymentService',
'InventoryService',
'NotificationService',
'Database',
'Logger',
'ConfigManager'
]
dependency_graph.add_nodes_from(modules)
# Define dependencies as (dependent, dependency) pairs
# For example, ('UserInterface', 'OrderService') means
# UserInterface depends on OrderService
dependencies = [
('UserInterface', 'OrderService'),
('UserInterface', 'Logger'),
('OrderService', 'PaymentService'),
('OrderService', 'InventoryService'),
('OrderService', 'NotificationService'),
('OrderService', 'Database'),
('OrderService', 'Logger'),
('PaymentService', 'Database'),
('PaymentService', 'Logger'),
('InventoryService', 'Database'),
('NotificationService', 'ConfigManager'),
('NotificationService', 'Logger')
]
dependency_graph.add_edges_from(dependencies)
return dependency_graph
This function constructs a directed graph that mirrors a typical layered architecture. The UserInterface sits at the top, depending on services below it. The services themselves depend on infrastructure components like Database and Logger. This hierarchical structure is common in well-designed systems, but as we will see, graph algorithms can help us verify whether our actual codebase maintains this structure or has degraded over time.
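To make the later examples easy to follow, the short driver below (a sketch that only uses the sample graph defined above, not a real codebase) builds the graph and prints each module's direct dependencies.

graph = create_sample_dependency_graph()
print(f"Modules: {graph.number_of_nodes()}, dependencies: {graph.number_of_edges()}")
for module in sorted(graph.nodes()):
    targets = sorted(graph.successors(module))
    # Modules with no outgoing edges are pure providers (e.g. Logger, Database)
    print(f"  {module} -> {', '.join(targets) if targets else '(no dependencies)'}")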
Detecting Circular Dependencies Through Cycle Detection
One of the most common architectural problems in software systems is circular dependencies, where module A depends on module B, which depends on module C, which eventually depends back on A. These cycles create tight coupling, make testing difficult, and can lead to initialization problems. Graph theory provides us with efficient algorithms to detect all cycles in a dependency graph.
The presence of cycles in a directed graph can be detected using depth-first search. NetworkX provides several functions for cycle detection, each suited to different scenarios. The simplest check determines whether any cycles exist at all, while more sophisticated algorithms enumerate all cycles or find the shortest cycles.
def analyze_circular_dependencies(graph):
"""
Analyzes a dependency graph for circular dependencies and reports
detailed information about any cycles found.
Args:
graph (nx.DiGraph): The dependency graph to analyze
Returns:
dict: Analysis results including cycle presence and details
"""
analysis_results = {
'has_cycles': False,
'cycle_count': 0,
'cycles': [],
'strongly_connected_components': []
}
# Check if the graph contains any cycles
# A directed acyclic graph (DAG) has no cycles
analysis_results['has_cycles'] = not nx.is_directed_acyclic_graph(graph)
    if analysis_results['has_cycles']:
        # Enumerate all simple cycles in the graph.
        # A simple cycle visits each node at most once.
        # Note: nx.simple_cycles never raises NetworkXNoCycle; it simply
        # yields nothing for an acyclic graph, so no exception handling
        # is needed here.
        cycles = list(nx.simple_cycles(graph))
        analysis_results['cycles'] = cycles
        analysis_results['cycle_count'] = len(cycles)
        # Find strongly connected components: maximal sets of nodes where
        # every node can reach every other node in the set
        sccs = list(nx.strongly_connected_components(graph))
        # Filter out trivial components containing a single node
        non_trivial_sccs = [scc for scc in sccs if len(scc) > 1]
        analysis_results['strongly_connected_components'] = non_trivial_sccs
return analysis_results
def report_circular_dependencies(analysis_results):
"""
Generates a human-readable report of circular dependency analysis.
Args:
analysis_results (dict): Results from analyze_circular_dependencies
"""
if not analysis_results['has_cycles']:
print("EXCELLENT: No circular dependencies detected.")
print("The dependency graph is a directed acyclic graph (DAG).")
return
print("WARNING: Circular dependencies detected!")
print(f"Total number of cycles found: {analysis_results['cycle_count']}")
print()
print("Individual cycles:")
for idx, cycle in enumerate(analysis_results['cycles'], 1):
# Format the cycle as A -> B -> C -> A
cycle_path = ' -> '.join(cycle + [cycle[0]])
print(f" Cycle {idx}: {cycle_path}")
print()
print("Strongly connected components:")
for idx, scc in enumerate(analysis_results['strongly_connected_components'], 1):
print(f" Component {idx}: {', '.join(sorted(scc))}")
The distinction between individual cycles and strongly connected components is important for architectural analysis. A strongly connected component is a maximal set of modules where every module can reach every other module through some path. If you have a strongly connected component with ten modules, there might be hundreds of individual cycles within it. For architectural remediation, it is often more practical to focus on breaking apart strongly connected components rather than eliminating individual cycles one by one.
When you discover circular dependencies, the next question is how to break them. Graph algorithms can help here too by identifying the minimum set of edges whose removal would eliminate all cycles. This is known as the feedback arc set problem, and while finding the absolute minimum is computationally hard, good approximations exist.
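As a rough illustration of that idea, the sketch below implements a simple greedy heuristic, not a true minimum feedback arc set: it repeatedly asks NetworkX for one cycle via nx.find_cycle and removes an edge from it until the graph becomes acyclic. The choice of which edge to drop is an assumption made for illustration (the edge whose source has the fewest dependents); in practice you would weigh edges by how painful they are to break.

def break_cycles_greedily(graph):
    """
    Greedy cycle-breaking heuristic (an approximation, not a minimum
    feedback arc set). Returns the list of removed edges, which are
    candidate dependencies to invert or eliminate.
    """
    working_copy = graph.copy()
    removed_edges = []
    while not nx.is_directed_acyclic_graph(working_copy):
        # find_cycle returns the edges of one cycle found by DFS
        cycle_edges = nx.find_cycle(working_copy)
        # Crude heuristic: drop the edge whose source has the fewest
        # incoming dependencies, as a stand-in for "cheapest to break"
        edge = min(cycle_edges, key=lambda e: working_copy.in_degree(e[0]))
        working_copy.remove_edge(edge[0], edge[1])
        removed_edges.append((edge[0], edge[1]))
    return removed_edges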
Discovering Architectural Layers Through Topological Sorting
A well-designed software system typically exhibits a layered architecture where higher-level modules depend on lower-level modules, but not vice versa. This creates a clear hierarchy that makes the system easier to understand, test, and modify. Topological sorting is the graph algorithm that reveals this layering structure.
A topological sort of a directed acyclic graph produces a linear ordering of nodes such that for every edge from node A to node B, A appears before B in the ordering. In architectural terms, this means modules appear in an order where each module only depends on modules that come later in the sequence. If your graph has cycles, topological sorting is impossible, which is why cycle detection is a prerequisite for layering analysis.
def identify_architectural_layers(graph):
"""
Identifies architectural layers in a dependency graph using
topological sorting and level assignment.
Args:
graph (nx.DiGraph): The dependency graph to analyze
Returns:
dict: Layer assignments and related metrics
"""
# First verify the graph is a DAG
if not nx.is_directed_acyclic_graph(graph):
raise ValueError(
"Cannot identify layers in a graph with circular dependencies. "
"Please resolve cycles first."
)
# Compute the longest path from each node to any leaf node
# Nodes with no outgoing edges (dependencies) are at layer 0
# Nodes depending on layer 0 nodes are at layer 1, and so on
layers = {}
# Get a topological ordering of nodes
topo_order = list(nx.topological_sort(graph))
# Process nodes in reverse topological order
# This ensures we process dependencies before dependents
for node in reversed(topo_order):
# Get all nodes this node depends on
dependencies = list(graph.successors(node))
if not dependencies:
# No dependencies means this is a leaf node (layer 0)
layers[node] = 0
else:
# Layer is one more than the maximum layer of dependencies
max_dependency_layer = max(layers[dep] for dep in dependencies)
layers[node] = max_dependency_layer + 1
# Organize nodes by layer for easier reporting
layers_by_level = {}
for node, layer in layers.items():
if layer not in layers_by_level:
layers_by_level[layer] = []
layers_by_level[layer].append(node)
return {
'node_layers': layers,
'layers_by_level': layers_by_level,
'total_layers': max(layers.values()) + 1 if layers else 0,
'topological_order': topo_order
}
def report_architectural_layers(layer_info):
"""
Generates a report showing the architectural layering of the system.
Args:
layer_info (dict): Results from identify_architectural_layers
"""
print("ARCHITECTURAL LAYER ANALYSIS")
print("=" * 60)
print(f"Total number of layers: {layer_info['total_layers']}")
print()
# Report layers from highest (most dependent) to lowest (most foundational)
max_layer = layer_info['total_layers'] - 1
for layer_num in range(max_layer, -1, -1):
nodes = layer_info['layers_by_level'].get(layer_num, [])
print(f"Layer {layer_num} ({len(nodes)} modules):")
for node in sorted(nodes):
print(f" - {node}")
print()
print("Interpretation:")
print(" Layer 0: Foundation modules with no dependencies")
print(" Higher layers: Modules that depend on lower layers")
print(" A well-layered architecture shows clear separation")
The layer assignment algorithm works by computing the longest path from each node to a leaf node. Leaf nodes, which have no dependencies, form the foundation layer. Nodes that depend only on foundation layer nodes form the next layer up, and so on. This creates a natural stratification of your architecture.
When you run this analysis on a real codebase, you might discover that your system has more layers than you expected, or that certain modules are at unexpected layers. For example, if a supposedly high-level business logic module appears in a low layer, it might indicate that the module has too few dependencies and is not actually integrating the system as intended. Conversely, if an infrastructure module appears in a high layer, it might have inappropriate dependencies on business logic.
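Running the layer analysis on the sample e-commerce graph illustrates the output; the layer assignments noted in the comments follow directly from the dependencies defined earlier.

graph = create_sample_dependency_graph()
layer_info = identify_architectural_layers(graph)
report_architectural_layers(layer_info)
# For the sample graph this yields four layers:
#   Layer 0: ConfigManager, Database, Logger (no dependencies)
#   Layer 1: InventoryService, NotificationService, PaymentService
#   Layer 2: OrderService
#   Layer 3: UserInterface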
Identifying Critical Components With Centrality Measures
Not all modules in a system are equally important. Some modules are central to the architecture, with many other modules depending on them. Others are peripheral, with few connections. Graph centrality measures quantify this importance in various ways, each capturing a different aspect of what makes a module critical.
Degree centrality is the simplest measure, counting how many other modules connect to a given module. In a dependency graph, we typically care about in-degree, which counts how many modules depend on a given module. High in-degree indicates a module that many others rely on, making it a critical dependency.
Betweenness centrality measures how often a module appears on the shortest path between other modules. A module with high betweenness acts as a bridge in the architecture, and its failure or modification could disrupt communication between many parts of the system.
PageRank, originally developed for ranking web pages, can be applied to dependency graphs to identify influential modules. It considers not just how many modules depend on a given module, but also the importance of those dependent modules. A module depended upon by other important modules receives a higher PageRank score.
def analyze_component_criticality(graph):
"""
Analyzes the criticality of components using multiple centrality measures.
Args:
graph (nx.DiGraph): The dependency graph to analyze
Returns:
dict: Centrality scores for each node under different metrics
"""
criticality_metrics = {}
# In-degree centrality: how many modules depend on this one
# Higher values indicate modules that are widely depended upon
in_degree_cent = nx.in_degree_centrality(graph)
criticality_metrics['in_degree'] = in_degree_cent
# Out-degree centrality: how many modules this one depends on
# Higher values indicate modules with many dependencies
out_degree_cent = nx.out_degree_centrality(graph)
criticality_metrics['out_degree'] = out_degree_cent
# Betweenness centrality: how often this module appears on
# shortest paths between other modules
betweenness_cent = nx.betweenness_centrality(graph)
criticality_metrics['betweenness'] = betweenness_cent
# PageRank: importance based on the importance of dependents
pagerank_scores = nx.pagerank(graph)
criticality_metrics['pagerank'] = pagerank_scores
    # Closeness centrality: how close this module is to all others
    # Note: for directed graphs, NetworkX uses inward distances; apply the
    # function to graph.reverse() if outward distances are wanted
    try:
        closeness_cent = nx.closeness_centrality(graph)
        criticality_metrics['closeness'] = closeness_cent
    except nx.NetworkXError:
        # NetworkX handles disconnected graphs by considering only reachable
        # nodes, so this guard only covers unexpected graph errors
        criticality_metrics['closeness'] = {}
return criticality_metrics
def identify_critical_components(criticality_metrics, top_n=5):
"""
Identifies the most critical components based on centrality metrics.
Args:
criticality_metrics (dict): Results from analyze_component_criticality
top_n (int): Number of top components to report for each metric
Returns:
dict: Top components for each centrality measure
"""
critical_components = {}
for metric_name, scores in criticality_metrics.items():
# Sort nodes by score in descending order
sorted_nodes = sorted(
scores.items(),
key=lambda x: x[1],
reverse=True
)
# Take the top N
critical_components[metric_name] = sorted_nodes[:top_n]
return critical_components
def report_critical_components(critical_components):
"""
Generates a detailed report of critical components.
Args:
critical_components (dict): Results from identify_critical_components
"""
print("CRITICAL COMPONENT ANALYSIS")
print("=" * 60)
print()
metric_descriptions = {
'in_degree': 'Most Depended Upon (High In-Degree)',
'out_degree': 'Most Dependent (High Out-Degree)',
'betweenness': 'Most Central Bridges (High Betweenness)',
'pagerank': 'Most Influential (High PageRank)',
'closeness': 'Most Accessible (High Closeness)'
}
for metric, description in metric_descriptions.items():
if metric not in critical_components:
continue
print(f"{description}:")
components = critical_components[metric]
for rank, (node, score) in enumerate(components, 1):
print(f" {rank}. {node}: {score:.4f}")
print()
print("Interpretation:")
print(" High in-degree: Core infrastructure components")
print(" High out-degree: Complex integrators, potential refactoring targets")
print(" High betweenness: Architectural bottlenecks or key mediators")
print(" High PageRank: Foundational components with important dependents")
When you analyze centrality in a real codebase, you often discover surprising results. A module you thought was central might have low centrality scores, indicating it is actually peripheral to the architecture. Conversely, a seemingly minor utility module might have very high in-degree centrality because it is used throughout the system.
High betweenness centrality deserves special attention because it indicates architectural bottlenecks. If a module has high betweenness, many paths through the dependency graph pass through it. This makes it a single point of failure and a potential performance bottleneck. In a well-designed architecture, you might want to reduce betweenness by creating alternative paths or splitting the module into smaller, more focused components.
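A short driver shows how the centrality analysis fits together; the observations in the comments refer only to the sample graph built earlier.

graph = create_sample_dependency_graph()
metrics = analyze_component_criticality(graph)
top_components = identify_critical_components(metrics, top_n=3)
report_critical_components(top_components)
# In the sample graph, Logger (four dependents) and Database (three) lead the
# in-degree ranking, while OrderService has the highest out-degree because it
# orchestrates the other services.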
Detecting Modularity Issues Through Community Detection
A modular architecture groups related components together while minimizing dependencies between groups. Graph theory provides community detection algorithms that can automatically identify these natural groupings in your dependency graph. By comparing the detected communities with your intended module structure, you can identify modularity violations.
Community detection algorithms seek to partition the graph into groups where nodes within a group are densely connected to each other but sparsely connected to nodes in other groups. The Louvain method is one of the most popular algorithms for this purpose, optimizing a measure called modularity that quantifies the quality of a partition.
import community as community_louvain # python-louvain package
def detect_architectural_communities(graph):
"""
Detects natural communities in the dependency graph using
the Louvain method for community detection.
Args:
graph (nx.DiGraph): The dependency graph to analyze
Returns:
dict: Community assignments and modularity metrics
"""
# The Louvain algorithm works on undirected graphs,
# so we convert our directed graph to undirected
# This treats dependencies as bidirectional relationships
undirected_graph = graph.to_undirected()
# Apply the Louvain community detection algorithm
# This returns a dictionary mapping nodes to community IDs
partition = community_louvain.best_partition(undirected_graph)
# Calculate the modularity score of this partition
# Modularity ranges from -1 to 1, with higher values indicating
# stronger community structure
modularity_score = community_louvain.modularity(
partition,
undirected_graph
)
# Organize nodes by community for easier analysis
communities = {}
for node, community_id in partition.items():
if community_id not in communities:
communities[community_id] = []
communities[community_id].append(node)
# Calculate inter-community and intra-community edge counts
inter_community_edges = 0
intra_community_edges = 0
for source, target in graph.edges():
if partition[source] == partition[target]:
intra_community_edges += 1
else:
inter_community_edges += 1
return {
'partition': partition,
'communities': communities,
'num_communities': len(communities),
'modularity_score': modularity_score,
'inter_community_edges': inter_community_edges,
'intra_community_edges': intra_community_edges
}
def compare_with_intended_architecture(detected_communities, intended_modules):
"""
Compares detected communities with the intended module structure
to identify architectural violations.
Args:
detected_communities (dict): Results from detect_architectural_communities
intended_modules (dict): Mapping of nodes to intended module names
Returns:
dict: Analysis of alignment between detected and intended structure
"""
partition = detected_communities['partition']
# For each intended module, see which communities its members belong to
module_community_distribution = {}
for node, intended_module in intended_modules.items():
if node not in partition:
continue
detected_community = partition[node]
if intended_module not in module_community_distribution:
module_community_distribution[intended_module] = {}
if detected_community not in module_community_distribution[intended_module]:
module_community_distribution[intended_module][detected_community] = 0
module_community_distribution[intended_module][detected_community] += 1
# Identify modules that are split across multiple communities
split_modules = {}
for module, community_dist in module_community_distribution.items():
if len(community_dist) > 1:
split_modules[module] = community_dist
# Identify communities that contain members from multiple intended modules
community_module_distribution = {}
for node, intended_module in intended_modules.items():
if node not in partition:
continue
detected_community = partition[node]
if detected_community not in community_module_distribution:
community_module_distribution[detected_community] = {}
if intended_module not in community_module_distribution[detected_community]:
community_module_distribution[detected_community][intended_module] = 0
community_module_distribution[detected_community][intended_module] += 1
mixed_communities = {}
for community, module_dist in community_module_distribution.items():
if len(module_dist) > 1:
mixed_communities[community] = module_dist
return {
'split_modules': split_modules,
'mixed_communities': mixed_communities,
'alignment_score': calculate_alignment_score(
module_community_distribution,
community_module_distribution
)
}
def calculate_alignment_score(module_community_dist, community_module_dist):
"""
Calculates a score representing how well detected communities
align with intended modules.
Returns:
float: Score between 0 and 1, where 1 is perfect alignment
"""
# A simple alignment score based on purity
total_nodes = sum(
sum(counts.values())
for counts in module_community_dist.values()
)
if total_nodes == 0:
return 0.0
# Count nodes that are in the majority community for their module
aligned_nodes = 0
for module, community_dist in module_community_dist.items():
if community_dist:
aligned_nodes += max(community_dist.values())
return aligned_nodes / total_nodes
Community detection reveals the natural clustering in your dependency graph based purely on the structure of dependencies. When this natural clustering differs significantly from your intended module structure, it indicates architectural drift. Perhaps a module that was designed to be cohesive has become fragmented, with its components more strongly connected to other modules than to each other. Or perhaps components from different modules have become so intertwined that they form a single community in practice.
The modularity score itself is informative. A score close to zero suggests that your dependency graph has no clear community structure, which might indicate either a highly integrated system or a chaotic architecture. A high modularity score indicates strong community structure, which is generally desirable but should align with your intended architecture.
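The snippet below ties community detection to an intended architecture. The intended_modules mapping is purely illustrative, a hypothetical grouping of the sample system into presentation, business, and infrastructure layers; on a real codebase it would come from your package structure or architecture documentation.

graph = create_sample_dependency_graph()
communities = detect_architectural_communities(graph)
# Hypothetical intended grouping, used only for illustration
intended_modules = {
    'UserInterface': 'presentation',
    'OrderService': 'business',
    'PaymentService': 'business',
    'InventoryService': 'business',
    'NotificationService': 'business',
    'Database': 'infrastructure',
    'Logger': 'infrastructure',
    'ConfigManager': 'infrastructure'
}
alignment = compare_with_intended_architecture(communities, intended_modules)
print(f"Detected communities: {communities['num_communities']}")
print(f"Modularity: {communities['modularity_score']:.3f}")
print(f"Alignment with intended architecture: {alignment['alignment_score']:.2%}")
for module, spread in alignment['split_modules'].items():
    print(f"  {module} is split across {len(spread)} communities")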
Finding Architectural Smells Through Pattern Matching
Beyond general graph metrics, we can search for specific patterns that indicate architectural problems. These patterns, often called architectural smells, include god classes that depend on or are depended upon by too many other components, unstable interfaces that change frequently and affect many dependents, and feature envy where a component depends more heavily on another module than on its own.
Graph pattern matching allows us to detect these smells systematically. We can search for subgraphs that match specific patterns, such as hub-and-spoke structures indicating god classes, or long dependency chains that might indicate unnecessary indirection.
def detect_god_components(graph, threshold_ratio=0.3):
"""
Detects potential god components that have connections to
an unusually large proportion of the system.
Args:
graph (nx.DiGraph): The dependency graph to analyze
threshold_ratio (float): Minimum ratio of connections to total nodes
Returns:
list: Components identified as potential god components
"""
total_nodes = graph.number_of_nodes()
god_components = []
for node in graph.nodes():
# Count total connections (both in and out)
in_degree = graph.in_degree(node)
out_degree = graph.out_degree(node)
total_connections = in_degree + out_degree
# Calculate ratio of connections to total possible connections
connection_ratio = total_connections / (total_nodes - 1)
if connection_ratio >= threshold_ratio:
god_components.append({
'node': node,
'in_degree': in_degree,
'out_degree': out_degree,
'total_connections': total_connections,
'connection_ratio': connection_ratio
})
return sorted(
god_components,
key=lambda x: x['connection_ratio'],
reverse=True
)
def detect_unstable_dependencies(graph, stability_threshold=0.7):
"""
Detects components with unstable dependency structures using
the instability metric I = Ce / (Ca + Ce) where:
- Ce (efferent coupling) is the number of outgoing dependencies
- Ca (afferent coupling) is the number of incoming dependencies
High instability (close to 1) means the component depends on many
others but few depend on it, making it potentially unstable.
Args:
graph (nx.DiGraph): The dependency graph to analyze
stability_threshold (float): Minimum instability to report
Returns:
list: Components with high instability
"""
unstable_components = []
for node in graph.nodes():
ca = graph.in_degree(node) # Afferent coupling
ce = graph.out_degree(node) # Efferent coupling
# Avoid division by zero
if ca + ce == 0:
instability = 0
else:
instability = ce / (ca + ce)
if instability >= stability_threshold:
unstable_components.append({
'node': node,
'afferent_coupling': ca,
'efferent_coupling': ce,
'instability': instability
})
return sorted(
unstable_components,
key=lambda x: x['instability'],
reverse=True
)
def detect_dependency_chains(graph, min_chain_length=4):
"""
Detects long dependency chains that might indicate unnecessary
indirection or deep inheritance hierarchies.
Args:
graph (nx.DiGraph): The dependency graph to analyze
min_chain_length (int): Minimum length to report
Returns:
list: Long dependency chains found
"""
long_chains = []
# For each node, find the longest simple path starting from it
for source in graph.nodes():
# Find all simple paths from this source
# We limit the search to avoid exponential explosion
for target in graph.nodes():
if source == target:
continue
            # Enumerate simple paths between source and target.
            # all_simple_paths yields nothing when no path exists (it does
            # not raise NetworkXNoPath), so no exception handling is needed.
            # The cutoff bounds the search depth in edges to keep the
            # enumeration tractable.
            paths = nx.all_simple_paths(
                graph,
                source,
                target,
                cutoff=min_chain_length + 2
            )
            for path in paths:
                # len(path) counts nodes, so a chain of N nodes spans N - 1 edges
                if len(path) >= min_chain_length:
                    long_chains.append({
                        'path': path,
                        'length': len(path),
                        'source': source,
                        'target': target
                    })
# Remove duplicate chains and sort by length
unique_chains = []
seen_paths = set()
for chain in long_chains:
path_tuple = tuple(chain['path'])
if path_tuple not in seen_paths:
seen_paths.add(path_tuple)
unique_chains.append(chain)
return sorted(
unique_chains,
key=lambda x: x['length'],
reverse=True
)
def report_architectural_smells(graph):
"""
Comprehensive report of architectural smells detected in the graph.
Args:
graph (nx.DiGraph): The dependency graph to analyze
"""
print("ARCHITECTURAL SMELL DETECTION")
print("=" * 60)
print()
# Detect god components
god_components = detect_god_components(graph)
if god_components:
print("GOD COMPONENTS DETECTED:")
print("These components have connections to an unusually large")
print("proportion of the system, indicating potential violations")
print("of the Single Responsibility Principle.")
print()
for gc in god_components:
print(f" {gc['node']}:")
print(f" Incoming dependencies: {gc['in_degree']}")
print(f" Outgoing dependencies: {gc['out_degree']}")
print(f" Connection ratio: {gc['connection_ratio']:.2%}")
print()
else:
print("No god components detected.")
print()
# Detect unstable dependencies
unstable = detect_unstable_dependencies(graph)
if unstable:
print("UNSTABLE COMPONENTS DETECTED:")
print("These components have high instability metrics, meaning")
print("they depend on many others but few depend on them.")
print()
for uc in unstable:
print(f" {uc['node']}:")
print(f" Instability: {uc['instability']:.2f}")
print(f" Depends on: {uc['efferent_coupling']} components")
print(f" Depended upon by: {uc['afferent_coupling']} components")
print()
else:
print("No highly unstable components detected.")
print()
# Detect long dependency chains
chains = detect_dependency_chains(graph)
if chains:
print("LONG DEPENDENCY CHAINS DETECTED:")
print("These chains might indicate unnecessary indirection")
print("or overly complex dependency structures.")
print()
for idx, chain in enumerate(chains[:5], 1): # Show top 5
path_str = ' -> '.join(chain['path'])
print(f" Chain {idx} (length {chain['length']}):")
print(f" {path_str}")
print()
else:
print("No excessively long dependency chains detected.")
print()
These pattern-based analyses complement the more general graph metrics. While centrality measures tell you which components are important, smell detection tells you which components might be problematic. A component with high centrality is not necessarily bad, but a god component with high centrality is a serious architectural concern that should be addressed.
The instability metric deserves special explanation because it captures an important architectural principle. Components that depend on many others but are depended upon by few are unstable in two senses: changes to their many dependencies can force them to change, and because few components rely on them, changing them is cheap. Conversely, stable components are depended upon by many but depend on few, so they are both costly to change and unlikely to be forced to change. In a well-designed architecture, unstable components should depend on stable ones, not the other way around. This is known as the Stable Dependencies Principle.
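The Stable Dependencies Principle can be checked edge by edge: for a dependency from source to target, the source's instability should be at least as high as the target's. The sketch below (a simple check built on the same instability formula, not an established library routine) flags edges that point from a more stable component toward a less stable one.

def find_sdp_violations(graph):
    """
    Flags dependencies that violate the Stable Dependencies Principle,
    i.e. edges source -> target where the source is more stable
    (lower instability) than the target it depends on.
    """
    def instability(node):
        ca = graph.in_degree(node)   # afferent coupling
        ce = graph.out_degree(node)  # efferent coupling
        return ce / (ca + ce) if (ca + ce) > 0 else 0.0

    violations = []
    for source, target in graph.edges():
        i_source, i_target = instability(source), instability(target)
        if i_source < i_target:
            violations.append({
                'edge': (source, target),
                'source_instability': i_source,
                'target_instability': i_target,
                'gap': i_target - i_source
            })
    # The largest gaps are the most serious violations
    return sorted(violations, key=lambda v: v['gap'], reverse=True)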
Performance Considerations for Large Codebases
When analyzing real-world codebases, you may encounter graphs with thousands or tens of thousands of nodes. Some graph algorithms have polynomial or even exponential time complexity, making them impractical for large graphs. Understanding the computational complexity of different algorithms helps you choose the right tools for your analysis.
Cycle detection using depth-first search runs in linear time in the number of nodes and edges, making it practical even for very large graphs. Topological sorting also runs in linear time. Centrality measures vary in complexity: degree centrality is trivial to compute, while exact betweenness centrality is cubic in the number of nodes with the naive algorithm; Brandes' algorithm, which NetworkX uses, brings this down to roughly O(VE) for unweighted graphs, and sampling-based approximations reduce the cost further.
Community detection algorithms like Louvain are designed to scale to large networks and typically run in near-linear time. However, finding all simple cycles or all simple paths can be exponential in the worst case, so these operations should be used carefully on large graphs.
import time
from functools import wraps
def measure_performance(func):
"""
Decorator to measure the execution time of graph analysis functions.
Args:
func: The function to measure
Returns:
Wrapped function that prints execution time
"""
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
execution_time = end_time - start_time
print(f"Function {func.__name__} executed in {execution_time:.4f} seconds")
return result
return wrapper
@measure_performance
def scalable_cycle_detection(graph):
"""
Efficient cycle detection suitable for large graphs.
Uses DFS-based approach with linear time complexity.
Args:
graph (nx.DiGraph): The dependency graph to analyze
Returns:
bool: True if cycles exist, False otherwise
"""
# This is much faster than enumerating all cycles
return not nx.is_directed_acyclic_graph(graph)
@measure_performance
def scalable_centrality_analysis(graph, sample_size=None):
"""
Computes centrality measures with options for approximation
on large graphs.
Args:
graph (nx.DiGraph): The dependency graph to analyze
sample_size (int): If provided, use sampling for betweenness
Returns:
dict: Centrality measures
"""
centrality = {}
# Degree centrality is always fast
centrality['in_degree'] = nx.in_degree_centrality(graph)
centrality['out_degree'] = nx.out_degree_centrality(graph)
# PageRank is reasonably fast even for large graphs
centrality['pagerank'] = nx.pagerank(graph, max_iter=100)
# Betweenness can be slow, so we might approximate
if sample_size and graph.number_of_nodes() > sample_size:
# Use approximate betweenness based on sampling
centrality['betweenness'] = nx.betweenness_centrality(
graph,
k=sample_size # Sample k nodes for path calculations
)
else:
centrality['betweenness'] = nx.betweenness_centrality(graph)
return centrality
def create_graph_summary(graph):
"""
Creates a high-level summary of graph properties without
expensive computations.
Args:
graph (nx.DiGraph): The dependency graph to analyze
Returns:
dict: Summary statistics
"""
summary = {
'num_nodes': graph.number_of_nodes(),
'num_edges': graph.number_of_edges(),
'density': nx.density(graph),
'is_dag': nx.is_directed_acyclic_graph(graph)
}
# Calculate average degree
if summary['num_nodes'] > 0:
total_in_degree = sum(d for n, d in graph.in_degree())
total_out_degree = sum(d for n, d in graph.out_degree())
summary['avg_in_degree'] = total_in_degree / summary['num_nodes']
summary['avg_out_degree'] = total_out_degree / summary['num_nodes']
# Check if graph is weakly connected
summary['is_weakly_connected'] = nx.is_weakly_connected(graph)
if summary['is_weakly_connected']:
summary['num_components'] = 1
else:
summary['num_components'] = nx.number_weakly_connected_components(graph)
return summary
For very large codebases, you might need to analyze the graph at different levels of granularity. Start with a high-level view where nodes represent packages or modules rather than individual classes. This gives you a smaller graph that you can analyze completely. Then, for areas of concern identified in the high-level analysis, you can drill down to a more detailed class-level or function-level graph.
Another strategy for large graphs is to focus on subgraphs. If you are concerned about a particular module, extract the subgraph containing that module and its immediate neighbors. This local analysis can reveal problems that might be obscured in the full graph.
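One convenient way to perform this local analysis is nx.ego_graph, which extracts a node together with its neighborhood. The module name below is just OrderService from the sample graph and stands in for whatever component you are investigating.

graph = create_sample_dependency_graph()
# radius=1 keeps the module plus its direct neighbors; undirected=True pulls in
# both the modules it depends on and the modules that depend on it
local_view = nx.ego_graph(graph, 'OrderService', radius=1, undirected=True)
print(f"Local subgraph: {local_view.number_of_nodes()} nodes, "
      f"{local_view.number_of_edges()} edges")
local_summary = create_graph_summary(local_view)
print(f"Local density: {local_summary['density']:.3f}")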
Integrating Graph Analysis Into Development Workflows
The true value of graph-based architectural analysis comes from integrating it into your regular development workflow. Rather than performing analysis as a one-time exercise, you should track architectural metrics over time and set up automated checks to prevent architectural degradation.
You can incorporate graph analysis into your continuous integration pipeline. After each commit, extract the dependency graph from your codebase and compute key metrics. If metrics exceed certain thresholds, such as the number of circular dependencies increasing or the modularity score decreasing significantly, the build can fail or generate warnings.
import json
import os
from datetime import datetime
class ArchitecturalMetricsTracker:
"""
Tracks architectural metrics over time and detects degradation.
"""
def __init__(self, metrics_file='architectural_metrics.json'):
"""
Initialize the tracker with a file to store historical metrics.
Args:
metrics_file (str): Path to JSON file storing metrics history
"""
self.metrics_file = metrics_file
self.history = self.load_history()
def load_history(self):
"""
Loads historical metrics from file.
Returns:
list: Historical metrics records
"""
if os.path.exists(self.metrics_file):
with open(self.metrics_file, 'r') as f:
return json.load(f)
return []
def save_history(self):
"""
Saves metrics history to file.
"""
with open(self.metrics_file, 'w') as f:
json.dump(self.history, f, indent=2)
def record_metrics(self, graph, commit_hash=None):
"""
Records current architectural metrics.
Args:
graph (nx.DiGraph): Current dependency graph
commit_hash (str): Optional commit identifier
Returns:
dict: Recorded metrics
"""
# Compute comprehensive metrics
metrics = {
'timestamp': datetime.now().isoformat(),
'commit_hash': commit_hash,
'num_nodes': graph.number_of_nodes(),
'num_edges': graph.number_of_edges(),
'density': nx.density(graph),
'is_dag': nx.is_directed_acyclic_graph(graph)
}
# Add cycle information if cycles exist
if not metrics['is_dag']:
try:
cycles = list(nx.simple_cycles(graph))
metrics['num_cycles'] = len(cycles)
sccs = [scc for scc in nx.strongly_connected_components(graph)
if len(scc) > 1]
metrics['num_scc'] = len(sccs)
except:
metrics['num_cycles'] = -1
metrics['num_scc'] = -1
else:
metrics['num_cycles'] = 0
metrics['num_scc'] = 0
        # Compute layering metrics only for DAGs, since
        # identify_architectural_layers raises ValueError on cyclic graphs
        if metrics['is_dag']:
            layer_info = identify_architectural_layers(graph)
            metrics['num_layers'] = layer_info['total_layers']
        else:
            metrics['num_layers'] = None
# Compute modularity
try:
undirected = graph.to_undirected()
partition = community_louvain.best_partition(undirected)
metrics['modularity'] = community_louvain.modularity(
partition,
undirected
)
metrics['num_communities'] = len(set(partition.values()))
except:
metrics['modularity'] = None
metrics['num_communities'] = None
# Add to history
self.history.append(metrics)
self.save_history()
return metrics
def detect_degradation(self, current_metrics, threshold=0.1):
"""
Detects architectural degradation by comparing current metrics
with historical trends.
Args:
current_metrics (dict): Current metrics
threshold (float): Threshold for significant change
Returns:
dict: Degradation warnings
"""
if len(self.history) < 2:
return {'warnings': [], 'status': 'insufficient_history'}
warnings = []
previous_metrics = self.history[-2]
# Check for new cycles
if current_metrics['num_cycles'] > previous_metrics['num_cycles']:
warnings.append({
'type': 'new_cycles',
'message': f"Cycle count increased from {previous_metrics['num_cycles']} "
f"to {current_metrics['num_cycles']}",
'severity': 'high'
})
# Check for modularity degradation
if (current_metrics['modularity'] is not None and
previous_metrics['modularity'] is not None):
modularity_change = (current_metrics['modularity'] -
previous_metrics['modularity'])
if modularity_change < -threshold:
warnings.append({
'type': 'modularity_degradation',
'message': f"Modularity decreased by {abs(modularity_change):.3f}",
'severity': 'medium'
})
# Check for increasing density
density_change = current_metrics['density'] - previous_metrics['density']
if density_change > threshold:
warnings.append({
'type': 'increasing_coupling',
'message': f"Graph density increased by {density_change:.3f}",
'severity': 'medium'
})
return {
'warnings': warnings,
'status': 'degradation_detected' if warnings else 'healthy'
}
def generate_trend_report(self):
"""
Generates a report showing trends in architectural metrics.
Returns:
str: Formatted trend report
"""
if len(self.history) < 2:
return "Insufficient history for trend analysis"
report_lines = ["ARCHITECTURAL METRICS TREND REPORT"]
report_lines.append("=" * 60)
report_lines.append("")
# Get first and last metrics
first = self.history[0]
last = self.history[-1]
report_lines.append(f"Analysis period: {first['timestamp']} to {last['timestamp']}")
report_lines.append(f"Number of snapshots: {len(self.history)}")
report_lines.append("")
# Calculate trends
report_lines.append("METRIC TRENDS:")
metrics_to_track = [
('num_nodes', 'Number of nodes'),
('num_edges', 'Number of edges'),
('density', 'Graph density'),
('num_cycles', 'Number of cycles'),
('modularity', 'Modularity score')
]
for metric_key, metric_name in metrics_to_track:
if metric_key in first and metric_key in last:
first_val = first[metric_key]
last_val = last[metric_key]
if first_val is not None and last_val is not None:
if isinstance(first_val, (int, float)):
change = last_val - first_val
if first_val != 0:
pct_change = (change / first_val) * 100
report_lines.append(
f" {metric_name}: {first_val:.3f} -> {last_val:.3f} "
f"({pct_change:+.1f}%)"
)
else:
report_lines.append(
f" {metric_name}: {first_val:.3f} -> {last_val:.3f}"
)
return '\n'.join(report_lines)
This tracking system allows you to monitor architectural health over time. You can see whether your architecture is improving or degrading with each change. If you notice a sudden increase in cycles or a drop in modularity, you can investigate which recent changes caused the degradation and address them before they become entrenched.
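In a CI job, the tracker might be driven by a few lines like the following sketch; the commit hash is a placeholder, and in practice the graph would be extracted from the current checkout rather than from the sample function.

graph = create_sample_dependency_graph()  # in CI: extract from the checkout
tracker = ArchitecturalMetricsTracker(metrics_file='architectural_metrics.json')
snapshot = tracker.record_metrics(graph, commit_hash='abc1234')  # placeholder hash
degradation = tracker.detect_degradation(snapshot)
if degradation['status'] == 'degradation_detected':
    for warning in degradation['warnings']:
        print(f"[{warning['severity'].upper()}] {warning['message']}")
    raise SystemExit(1)  # fail the build on architectural degradation
print(tracker.generate_trend_report())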
Practical Example: Complete Analysis Pipeline
Let us bring together all the concepts we have discussed into a complete analysis pipeline that you might run on a real codebase. This pipeline performs multiple analyses and generates a comprehensive architectural health report.
def comprehensive_architectural_analysis(graph, intended_modules=None):
"""
Performs a complete architectural analysis of a dependency graph.
Args:
graph (nx.DiGraph): The dependency graph to analyze
intended_modules (dict): Optional mapping of nodes to intended modules
Returns:
dict: Complete analysis results
"""
print("COMPREHENSIVE ARCHITECTURAL ANALYSIS")
print("=" * 70)
print()
# Step 1: Basic graph statistics
print("STEP 1: BASIC GRAPH STATISTICS")
print("-" * 70)
summary = create_graph_summary(graph)
print(f"Total nodes: {summary['num_nodes']}")
print(f"Total edges: {summary['num_edges']}")
print(f"Graph density: {summary['density']:.4f}")
print(f"Is DAG: {summary['is_dag']}")
print(f"Number of components: {summary['num_components']}")
print()
# Step 2: Circular dependency analysis
print("STEP 2: CIRCULAR DEPENDENCY ANALYSIS")
print("-" * 70)
cycle_analysis = analyze_circular_dependencies(graph)
report_circular_dependencies(cycle_analysis)
print()
# Step 3: Layer analysis (if DAG)
if summary['is_dag']:
print("STEP 3: ARCHITECTURAL LAYER ANALYSIS")
print("-" * 70)
layer_info = identify_architectural_layers(graph)
report_architectural_layers(layer_info)
print()
else:
print("STEP 3: LAYER ANALYSIS SKIPPED (graph contains cycles)")
print()
# Step 4: Critical component analysis
print("STEP 4: CRITICAL COMPONENT ANALYSIS")
print("-" * 70)
centrality = analyze_component_criticality(graph)
critical = identify_critical_components(centrality, top_n=5)
report_critical_components(critical)
print()
# Step 5: Community detection
print("STEP 5: COMMUNITY DETECTION")
print("-" * 70)
communities = detect_architectural_communities(graph)
print(f"Detected {communities['num_communities']} communities")
print(f"Modularity score: {communities['modularity_score']:.4f}")
print(f"Intra-community edges: {communities['intra_community_edges']}")
print(f"Inter-community edges: {communities['inter_community_edges']}")
print()
if intended_modules:
alignment = compare_with_intended_architecture(
communities,
intended_modules
)
print(f"Alignment with intended architecture: {alignment['alignment_score']:.2%}")
if alignment['split_modules']:
print()
print("WARNING: Some intended modules are split across communities:")
for module, distribution in alignment['split_modules'].items():
print(f" {module}: spread across {len(distribution)} communities")
print()
# Step 6: Architectural smell detection
print("STEP 6: ARCHITECTURAL SMELL DETECTION")
print("-" * 70)
report_architectural_smells(graph)
# Generate overall health score
health_score = calculate_architectural_health_score(
summary,
cycle_analysis,
communities
)
print("OVERALL ARCHITECTURAL HEALTH SCORE")
print("=" * 70)
print(f"Score: {health_score['score']:.1f}/100")
print()
print("Score breakdown:")
for component, score in health_score['components'].items():
print(f" {component}: {score:.1f}/100")
print()
return {
'summary': summary,
'cycles': cycle_analysis,
'centrality': centrality,
'communities': communities,
'health_score': health_score
}
def calculate_architectural_health_score(summary, cycle_analysis, communities):
"""
Calculates an overall architectural health score based on multiple factors.
Args:
summary (dict): Basic graph statistics
cycle_analysis (dict): Cycle detection results
communities (dict): Community detection results
Returns:
dict: Health score and breakdown
"""
components = {}
# Component 1: Acyclicity (30 points)
if cycle_analysis['has_cycles']:
# Penalize based on number of cycles
cycle_penalty = min(cycle_analysis['cycle_count'] * 5, 30)
components['acyclicity'] = max(0, 30 - cycle_penalty)
else:
components['acyclicity'] = 30
# Component 2: Modularity (30 points)
if communities['modularity_score'] is not None:
# Modularity ranges from -1 to 1, we map it to 0-30
components['modularity'] = (communities['modularity_score'] + 1) * 15
else:
components['modularity'] = 0
# Component 3: Coupling (20 points)
# Lower density is better for large systems
if summary['num_nodes'] > 10:
# Penalize high density
density_score = max(0, 20 - (summary['density'] * 100))
components['coupling'] = density_score
else:
components['coupling'] = 20
# Component 4: Connectivity (20 points)
# Prefer weakly connected graphs (not too fragmented)
if summary['is_weakly_connected']:
components['connectivity'] = 20
else:
# Penalize based on number of components
penalty = (summary['num_components'] - 1) * 5
components['connectivity'] = max(0, 20 - penalty)
total_score = sum(components.values())
return {
'score': total_score,
'components': components,
'grade': get_grade(total_score)
}
def get_grade(score):
"""
Converts numerical score to letter grade.
Args:
score (float): Score from 0 to 100
Returns:
str: Letter grade
"""
if score >= 90:
return 'A (Excellent)'
elif score >= 80:
return 'B (Good)'
elif score >= 70:
return 'C (Fair)'
elif score >= 60:
return 'D (Poor)'
else:
return 'F (Critical)'
This comprehensive analysis pipeline gives you a complete picture of your architectural health. By running it regularly and tracking the results over time, you can ensure that your architecture remains clean and maintainable as your codebase evolves.
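Putting the pipeline to work on the sample system takes only a small driver. The intended_modules grouping is the same illustrative presentation/business/infrastructure split used in the community-detection example.

if __name__ == '__main__':
    graph = create_sample_dependency_graph()
    intended_modules = {
        'UserInterface': 'presentation', 'OrderService': 'business',
        'PaymentService': 'business', 'InventoryService': 'business',
        'NotificationService': 'business', 'Database': 'infrastructure',
        'Logger': 'infrastructure', 'ConfigManager': 'infrastructure'
    }
    results = comprehensive_architectural_analysis(graph, intended_modules)
    print(f"Overall grade: {results['health_score']['grade']}")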
Advanced Techniques for Deep Architectural Insight
The foundational graph algorithms we have explored provide essential insights into software architecture, but the field of graph theory offers much more sophisticated techniques that can reveal subtle architectural patterns and predict future problems. In this section, we delve into advanced algorithms that go beyond basic metrics to provide actionable intelligence about code quality, architectural evolution, and potential refactoring opportunities.
Advanced graph algorithms allow us to answer questions that simple metrics cannot address. For instance, how do we identify components that should be split because they serve multiple unrelated purposes? How can we predict which parts of the codebase are most likely to require changes when a new feature is added? Which refactoring operations would have the greatest positive impact on overall architecture? These questions require more sophisticated analytical techniques.
Minimum Spanning Trees and Architectural Backbones
A minimum spanning tree is a subset of edges in a graph that connects all nodes with the minimum total edge weight. In architectural analysis, we can use weighted graphs where edge weights represent coupling strength, measured by factors such as the number of method calls between classes, the frequency of changes that affect both components, or the semantic similarity of the code.
Because our edge weights represent coupling strength, the tree we actually want is the maximum spanning tree, which is equivalent to a minimum spanning tree computed over negated or inverted weights. This spanning tree reveals the architectural backbone of your system: the essential, strongly coupled dependencies that hold the architecture together. Edges outside the backbone represent redundant or more weakly coupled dependencies that might be candidates for removal to simplify the architecture.
def calculate_coupling_strength(graph, call_counts=None, change_coupling=None):
"""
Calculates coupling strength between components based on multiple factors.
Higher weights indicate stronger coupling.
Args:
graph (nx.DiGraph): The dependency graph
call_counts (dict): Dictionary mapping (source, target) to call count
change_coupling (dict): Dictionary mapping (source, target) to
co-change frequency
Returns:
nx.Graph: Undirected weighted graph with coupling strengths
"""
weighted_graph = nx.Graph()
# Add all nodes from the original graph
weighted_graph.add_nodes_from(graph.nodes())
# Calculate weights for each edge
for source, target in graph.edges():
weight = 0.0
# Factor 1: Direct dependency exists (base weight)
weight += 1.0
# Factor 2: Number of calls or references
if call_counts and (source, target) in call_counts:
# Normalize call count to a reasonable range
call_weight = min(call_counts[(source, target)] / 10.0, 5.0)
weight += call_weight
# Factor 3: Change coupling (how often they change together)
if change_coupling and (source, target) in change_coupling:
# Change coupling ranges from 0 to 1
weight += change_coupling[(source, target)] * 3.0
# Add edge with calculated weight
weighted_graph.add_edge(source, target, weight=weight)
return weighted_graph
def identify_architectural_backbone(weighted_graph):
"""
    Identifies the architectural backbone using a maximum spanning tree
    over coupling-strength weights.
Args:
weighted_graph (nx.Graph): Weighted undirected graph of dependencies
Returns:
dict: Analysis of backbone structure
"""
if weighted_graph.number_of_nodes() == 0:
return {
'backbone_edges': [],
'redundant_edges': [],
'backbone_weight': 0,
'total_weight': 0
}
    # Compute the spanning-tree backbone.
    # Because weights represent coupling strength, we take the maximum
    # spanning tree directly (no weight negation needed) so the backbone
    # keeps the strongest connections.
    mst = nx.maximum_spanning_tree(weighted_graph, weight='weight')
backbone_edges = list(mst.edges(data=True))
all_edges = list(weighted_graph.edges(data=True))
# Identify redundant edges (not in backbone)
backbone_edge_set = set((min(u, v), max(u, v)) for u, v, _ in backbone_edges)
redundant_edges = [
(u, v, data) for u, v, data in all_edges
if (min(u, v), max(u, v)) not in backbone_edge_set
]
# Calculate total weights
backbone_weight = sum(data['weight'] for _, _, data in backbone_edges)
total_weight = sum(data['weight'] for _, _, data in all_edges)
return {
'backbone_edges': backbone_edges,
'redundant_edges': redundant_edges,
'backbone_weight': backbone_weight,
'total_weight': total_weight,
'backbone_graph': mst,
'redundancy_ratio': len(redundant_edges) / len(all_edges) if all_edges else 0
}
def analyze_redundant_dependencies(backbone_analysis):
"""
Analyzes redundant dependencies to identify refactoring opportunities.
Args:
backbone_analysis (dict): Results from identify_architectural_backbone
Returns:
list: Prioritized list of dependencies to consider removing
"""
redundant = backbone_analysis['redundant_edges']
# Sort redundant edges by weight (ascending)
# Lower weight edges are better candidates for removal
sorted_redundant = sorted(redundant, key=lambda x: x[2]['weight'])
recommendations = []
for source, target, data in sorted_redundant:
recommendations.append({
'source': source,
'target': target,
'coupling_strength': data['weight'],
'recommendation': 'Consider removing or refactoring this dependency',
'priority': 'high' if data['weight'] < 2.0 else 'medium'
})
return recommendations
def report_backbone_analysis(backbone_analysis, recommendations):
"""
Generates a comprehensive report on architectural backbone analysis.
Args:
backbone_analysis (dict): Backbone analysis results
recommendations (list): Refactoring recommendations
"""
print("ARCHITECTURAL BACKBONE ANALYSIS")
print("=" * 70)
print()
total_edges = (len(backbone_analysis['backbone_edges']) +
len(backbone_analysis['redundant_edges']))
print(f"Total dependencies: {total_edges}")
print(f"Backbone dependencies: {len(backbone_analysis['backbone_edges'])}")
print(f"Redundant dependencies: {len(backbone_analysis['redundant_edges'])}")
print(f"Redundancy ratio: {backbone_analysis['redundancy_ratio']:.2%}")
print()
print(f"Backbone coupling strength: {backbone_analysis['backbone_weight']:.2f}")
print(f"Total coupling strength: {backbone_analysis['total_weight']:.2f}")
print()
if recommendations:
print("REFACTORING RECOMMENDATIONS (Top 10):")
print("-" * 70)
for idx, rec in enumerate(recommendations[:10], 1):
print(f"{idx}. {rec['source']} -> {rec['target']}")
print(f" Coupling strength: {rec['coupling_strength']:.2f}")
print(f" Priority: {rec['priority']}")
print(f" {rec['recommendation']}")
print()
The backbone analysis reveals which dependencies are truly essential to your architecture. Redundant dependencies with low coupling strength are prime candidates for removal. By eliminating these weak dependencies, you can simplify your architecture without sacrificing functionality. This is particularly valuable when dealing with legacy systems where dependencies have accumulated over time without clear architectural oversight.
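The snippet below shows the backbone pipeline end to end. The call counts and co-change frequencies are invented numbers used only to exercise the weighting; in a real analysis they would come from static call-graph extraction and version-control history.

graph = create_sample_dependency_graph()
# Invented figures purely for illustration
call_counts = {
    ('OrderService', 'Database'): 42,
    ('UserInterface', 'OrderService'): 18
}
change_coupling = {
    ('OrderService', 'PaymentService'): 0.8
}
weighted = calculate_coupling_strength(graph, call_counts, change_coupling)
backbone = identify_architectural_backbone(weighted)
recommendations = analyze_redundant_dependencies(backbone)
report_backbone_analysis(backbone, recommendations)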
Graph Motif Detection for Design Pattern Recognition
Graph motifs are small subgraph patterns that appear frequently in a graph. In software architecture, certain motifs correspond to well-known design patterns or anti-patterns. By detecting these motifs systematically, we can identify where design patterns are being used correctly and where anti-patterns have emerged.
For example, the Adapter pattern creates a specific motif where an adapter class depends on both an interface and a concrete implementation. The Singleton pattern creates a motif where a class has a self-reference. The God Class anti-pattern creates a star-shaped motif with one central node connected to many others.
def detect_adapter_pattern(graph):
"""
Detects potential Adapter pattern instances in the dependency graph.
An adapter typically depends on both an interface and an adaptee,
while implementing the interface.
Args:
graph (nx.DiGraph): The dependency graph
Returns:
list: Potential adapter pattern instances
"""
adapters = []
for node in graph.nodes():
# Get what this node depends on
dependencies = list(graph.successors(node))
# An adapter typically has at least 2 dependencies
if len(dependencies) < 2:
continue
# Check for the pattern: node depends on multiple components
# where at least one might be an interface and one an implementation
# We use heuristics based on naming and structure
potential_interfaces = []
potential_implementations = []
for dep in dependencies:
            # Heuristic: interfaces often carry an 'Interface' marker or the
            # 'IName' prefix convention (capital I followed by another capital),
            # or have high in-degree (many implementations or users)
            looks_like_interface = (
                'Interface' in dep or
                (dep.startswith('I') and len(dep) > 1 and dep[1].isupper()) or
                graph.in_degree(dep) > 3
            )
            if looks_like_interface:
potential_interfaces.append(dep)
else:
potential_implementations.append(dep)
# If we found both interfaces and implementations
if potential_interfaces and potential_implementations:
adapters.append({
'adapter': node,
'interfaces': potential_interfaces,
'adaptees': potential_implementations,
'confidence': calculate_adapter_confidence(
graph, node, potential_interfaces, potential_implementations
)
})
return sorted(adapters, key=lambda x: x['confidence'], reverse=True)
def calculate_adapter_confidence(graph, adapter, interfaces, adaptees):
"""
Calculates confidence score for adapter pattern detection.
Args:
graph (nx.DiGraph): The dependency graph
adapter (str): The potential adapter node
interfaces (list): Potential interface dependencies
adaptees (list): Potential adaptee dependencies
Returns:
float: Confidence score between 0 and 1
"""
score = 0.0
# Factor 1: Has both interface and adaptee dependencies
if interfaces and adaptees:
score += 0.4
# Factor 2: Adapter name contains 'Adapter'
if 'Adapter' in adapter:
score += 0.3
# Factor 3: Interfaces have high in-degree
avg_interface_in_degree = sum(
graph.in_degree(i) for i in interfaces
) / len(interfaces) if interfaces else 0
if avg_interface_in_degree > 2:
score += 0.2
# Factor 4: Adapter has low in-degree (not widely used)
if graph.in_degree(adapter) <= 3:
score += 0.1
return min(score, 1.0)
def detect_facade_pattern(graph):
"""
Detects potential Facade pattern instances.
A facade provides a simplified interface to a complex subsystem.
Args:
graph (nx.DiGraph): The dependency graph
Returns:
list: Potential facade pattern instances
"""
facades = []
for node in graph.nodes():
# A facade typically depends on many components in a subsystem
dependencies = list(graph.successors(node))
if len(dependencies) < 3:
continue
# Check if dependencies form a cohesive subsystem
# by examining their interconnections
subsystem_graph = graph.subgraph(dependencies)
subsystem_edges = subsystem_graph.number_of_edges()
# A true subsystem should have internal connections
if subsystem_edges > 0:
# Calculate subsystem cohesion
max_possible_edges = len(dependencies) * (len(dependencies) - 1)
cohesion = subsystem_edges / max_possible_edges if max_possible_edges > 0 else 0
# Check if the facade is the primary entry point
external_dependencies = sum(
1 for dep in dependencies
if any(pred not in dependencies and pred != node
for pred in graph.predecessors(dep))
)
is_primary_entry = external_dependencies < len(dependencies) * 0.3
if cohesion > 0.1 and is_primary_entry:
facades.append({
'facade': node,
'subsystem': dependencies,
'subsystem_size': len(dependencies),
'cohesion': cohesion,
'confidence': cohesion * 0.7 + (0.3 if 'Facade' in node else 0)
})
return sorted(facades, key=lambda x: x['confidence'], reverse=True)
def detect_circular_dependency_motifs(graph):
"""
Detects specific types of circular dependency motifs that indicate
different architectural problems.
Args:
graph (nx.DiGraph): The dependency graph
Returns:
dict: Categorized circular dependency motifs
"""
motifs = {
'mutual_dependencies': [], # A -> B, B -> A
'triangular_cycles': [], # A -> B -> C -> A
'longer_cycles': [] # Cycles with 4+ nodes
}
# Find all simple cycles
try:
all_cycles = list(nx.simple_cycles(graph))
except nx.NetworkXException:
return motifs
for cycle in all_cycles:
cycle_length = len(cycle)
if cycle_length == 2:
# Mutual dependency
motifs['mutual_dependencies'].append({
'nodes': cycle,
'severity': 'high',
'description': f"Mutual dependency between {cycle[0]} and {cycle[1]}"
})
elif cycle_length == 3:
# Triangular cycle
motifs['triangular_cycles'].append({
'nodes': cycle,
'severity': 'high',
'description': f"Triangular dependency: {' -> '.join(cycle + [cycle[0]])}"
})
else:
# Longer cycle
motifs['longer_cycles'].append({
'nodes': cycle,
'length': cycle_length,
'severity': 'medium',
'description': f"Cycle of length {cycle_length}: {' -> '.join(cycle[:3])} ..."
})
return motifs
def report_design_patterns(adapters, facades, circular_motifs):
"""
Generates a comprehensive report on detected design patterns and anti-patterns.
Args:
adapters (list): Detected adapter patterns
facades (list): Detected facade patterns
circular_motifs (dict): Detected circular dependency motifs
"""
print("DESIGN PATTERN AND MOTIF ANALYSIS")
print("=" * 70)
print()
# Report Adapter patterns
if adapters:
print(f"ADAPTER PATTERN INSTANCES DETECTED: {len(adapters)}")
print("-" * 70)
for idx, adapter in enumerate(adapters[:5], 1):
print(f"{idx}. {adapter['adapter']}")
print(f" Confidence: {adapter['confidence']:.2f}")
print(f" Interfaces: {', '.join(adapter['interfaces'])}")
print(f" Adaptees: {', '.join(adapter['adaptees'])}")
print()
else:
print("No Adapter pattern instances detected.")
print()
# Report Facade patterns
if facades:
print(f"FACADE PATTERN INSTANCES DETECTED: {len(facades)}")
print("-" * 70)
for idx, facade in enumerate(facades[:5], 1):
print(f"{idx}. {facade['facade']}")
print(f" Confidence: {facade['confidence']:.2f}")
print(f" Subsystem size: {facade['subsystem_size']} components")
print(f" Subsystem cohesion: {facade['cohesion']:.2f}")
print()
else:
print("No Facade pattern instances detected.")
print()
# Report circular dependency motifs
print("CIRCULAR DEPENDENCY MOTIFS")
print("-" * 70)
mutual = circular_motifs['mutual_dependencies']
triangular = circular_motifs['triangular_cycles']
longer = circular_motifs['longer_cycles']
print(f"Mutual dependencies: {len(mutual)}")
print(f"Triangular cycles: {len(triangular)}")
print(f"Longer cycles: {len(longer)}")
print()
if mutual:
print("Critical mutual dependencies (showing first 5):")
for motif in mutual[:5]:
print(f" - {motif['description']}")
print()
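Tying the detectors together, a brief usage sketch; dependency_graph is assumed to be the directed dependency graph built earlier in the article.
adapters = detect_adapter_pattern(dependency_graph)
facades = detect_facade_pattern(dependency_graph)
circular_motifs = detect_circular_dependency_motifs(dependency_graph)
report_design_patterns(adapters, facades, circular_motifs)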
Motif detection helps you understand not just what dependencies exist, but what patterns those dependencies form. Recognizing design patterns automatically can validate that your architecture follows intended design principles. Detecting anti-pattern motifs helps you identify code smells that might not be obvious from simple metrics alone.
Spectral Graph Analysis for Architectural Clustering
Spectral graph theory uses the eigenvalues and eigenvectors of graph matrices to reveal deep structural properties. The graph Laplacian matrix, in particular, encodes information about the connectivity and clustering structure of a graph. By analyzing the spectrum of the Laplacian, we can identify natural clusters in the architecture that might not be apparent from other methods.
Spectral clustering is particularly powerful because it can detect clusters of arbitrary shape and is less sensitive to noise than simpler clustering methods. In architectural analysis, this helps identify cohesive modules even when the dependency structure is complex.
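To make the idea concrete before the full clustering pipeline, the sketch below computes the Laplacian spectrum of a small undirected graph and uses the sign of the Fiedler vector (the eigenvector of the second-smallest eigenvalue) to produce a two-way split. The example graph is invented for illustration.
import numpy as np
import networkx as nx

# Hypothetical example: two loosely coupled groups of modules joined by one edge
g = nx.Graph()
g.add_edges_from([
    ('OrderService', 'PaymentService'), ('OrderService', 'InventoryService'),
    ('PaymentService', 'InventoryService'),
    ('Logger', 'ConfigManager'), ('Logger', 'NotificationService'),
    ('ConfigManager', 'NotificationService'),
    ('OrderService', 'Logger')  # single bridge between the groups
])

# Unnormalized graph Laplacian L = D - A
laplacian = nx.laplacian_matrix(g).toarray().astype(float)
eigenvalues, eigenvectors = np.linalg.eigh(laplacian)

# The number of (near-)zero eigenvalues equals the number of connected components;
# the eigenvector of the second-smallest eigenvalue is the Fiedler vector.
fiedler = eigenvectors[:, 1]
split = {node: ('A' if value >= 0 else 'B') for node, value in zip(g.nodes(), fiedler)}
print(np.round(eigenvalues, 3))
print(split)
In the full pipeline below, SpectralClustering generalizes this two-way split to k clusters by combining several of the smallest eigenvectors with k-means.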
import numpy as np
from sklearn.cluster import SpectralClustering
def perform_spectral_clustering(graph, num_clusters=None):
"""
Performs spectral clustering on the dependency graph to identify
natural architectural clusters.
Args:
graph (nx.DiGraph): The dependency graph
num_clusters (int): Number of clusters to identify. If None,
automatically determined.
Returns:
dict: Clustering results and analysis
"""
if graph.number_of_nodes() < 2:
return {
'clusters': {},
'num_clusters': 0,
'silhouette_score': 0
}
# Convert to undirected for spectral analysis
undirected = graph.to_undirected()
# Get adjacency matrix
adjacency_matrix = nx.to_numpy_array(undirected)
# Determine optimal number of clusters if not specified
if num_clusters is None:
num_clusters = estimate_optimal_clusters(adjacency_matrix, undirected)
# Ensure num_clusters is valid
num_clusters = min(num_clusters, graph.number_of_nodes())
num_clusters = max(num_clusters, 2)
# Perform spectral clustering
clustering = SpectralClustering(
n_clusters=num_clusters,
affinity='precomputed',
assign_labels='kmeans',
random_state=42
)
labels = clustering.fit_predict(adjacency_matrix)
# Organize nodes by cluster
clusters = {}
node_list = list(undirected.nodes())
for idx, label in enumerate(labels):
if label not in clusters:
clusters[label] = []
clusters[label].append(node_list[idx])
# Calculate cluster quality metrics
silhouette = calculate_clustering_quality(adjacency_matrix, labels)
# Analyze inter-cluster and intra-cluster connections
cluster_stats = analyze_cluster_connections(graph, clusters)
return {
'clusters': clusters,
'num_clusters': num_clusters,
'labels': dict(zip(node_list, labels)),
'silhouette_score': silhouette,
'cluster_stats': cluster_stats
}
def estimate_optimal_clusters(adjacency_matrix, graph):
"""
Estimates the optimal number of clusters using the eigengap heuristic.
Args:
adjacency_matrix (np.array): Graph adjacency matrix
graph (nx.Graph): The graph
Returns:
int: Estimated optimal number of clusters
"""
# Compute Laplacian matrix
degree_matrix = np.diag(adjacency_matrix.sum(axis=1))
laplacian = degree_matrix - adjacency_matrix
# Compute eigenvalues
eigenvalues = np.linalg.eigvalsh(laplacian)
eigenvalues = np.sort(eigenvalues)
# Find the largest eigengap
# The eigengap heuristic suggests that the number of clusters
# corresponds to the number of small eigenvalues before a large gap
eigengaps = np.diff(eigenvalues)
# Look for the largest gap in the first half of eigenvalues
max_clusters = min(10, len(eigenvalues) // 2)
if max_clusters < 2:
return 2
largest_gap_idx = np.argmax(eigengaps[:max_clusters])
# If the largest gap follows the k-th smallest eigenvalue (0-indexed gap k-1),
# the heuristic suggests k clusters
optimal_clusters = largest_gap_idx + 1
return max(2, min(optimal_clusters, 10))
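# Worked example of the eigengap heuristic above, on made-up eigenvalues:
# eigenvalues = [0.0, 0.05, 0.08, 1.9, 2.3, 2.7]
# eigengaps = [0.05, 0.03, 1.82, 0.4, 0.4]
# The largest gap has index 2, so the heuristic suggests 2 + 1 = 3 clusters,
# matching the three near-zero eigenvalues.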
def calculate_clustering_quality(adjacency_matrix, labels):
"""
Calculates silhouette score to measure clustering quality.
Args:
adjacency_matrix (np.array): Graph adjacency matrix
labels (np.array): Cluster labels
Returns:
float: Silhouette score (-1 to 1, higher is better)
"""
from sklearn.metrics import silhouette_score
try:
# Convert adjacency to distance matrix
# Distance = 1 - similarity
max_val = adjacency_matrix.max()
if max_val > 0:
similarity = adjacency_matrix / max_val
distance_matrix = 1 - similarity
else:
distance_matrix = np.ones_like(adjacency_matrix)
score = silhouette_score(distance_matrix, labels, metric='precomputed')
return score
except ValueError:
return 0.0
def analyze_cluster_connections(graph, clusters):
"""
Analyzes the connection patterns within and between clusters.
Args:
graph (nx.DiGraph): The dependency graph
clusters (dict): Cluster assignments
Returns:
dict: Statistics about cluster connections
"""
stats = {
'intra_cluster_edges': {},
'inter_cluster_edges': {},
'cluster_cohesion': {},
'cluster_coupling': {}
}
# Count edges within each cluster
for cluster_id, nodes in clusters.items():
subgraph = graph.subgraph(nodes)
intra_edges = subgraph.number_of_edges()
stats['intra_cluster_edges'][cluster_id] = intra_edges
# Calculate cohesion (actual edges / possible edges)
num_nodes = len(nodes)
max_edges = num_nodes * (num_nodes - 1)
cohesion = intra_edges / max_edges if max_edges > 0 else 0
stats['cluster_cohesion'][cluster_id] = cohesion
# Count edges between clusters
for cluster_id1, nodes1 in clusters.items():
for cluster_id2, nodes2 in clusters.items():
if cluster_id1 >= cluster_id2:
continue
inter_edges = 0
for node1 in nodes1:
for node2 in nodes2:
if graph.has_edge(node1, node2) or graph.has_edge(node2, node1):
inter_edges += 1
if inter_edges > 0:
key = (cluster_id1, cluster_id2)
stats['inter_cluster_edges'][key] = inter_edges
# Calculate coupling between clusters
for (c1, c2), edge_count in stats['inter_cluster_edges'].items():
size1 = len(clusters[c1])
size2 = len(clusters[c2])
max_possible = size1 * size2 # each unordered pair is counted at most once above
coupling = edge_count / max_possible if max_possible > 0 else 0
stats['cluster_coupling'][(c1, c2)] = coupling
return stats
def report_spectral_clustering(clustering_results):
"""
Generates a detailed report on spectral clustering results.
Args:
clustering_results (dict): Results from perform_spectral_clustering
"""
print("SPECTRAL CLUSTERING ANALYSIS")
print("=" * 70)
print()
clusters = clustering_results['clusters']
stats = clustering_results['cluster_stats']
print(f"Number of clusters identified: {clustering_results['num_clusters']}")
print(f"Clustering quality (silhouette score): {clustering_results['silhouette_score']:.3f}")
print()
print("CLUSTER COMPOSITION:")
print("-" * 70)
for cluster_id, nodes in sorted(clusters.items()):
cohesion = stats['cluster_cohesion'].get(cluster_id, 0)
intra_edges = stats['intra_cluster_edges'].get(cluster_id, 0)
print(f"Cluster {cluster_id}: {len(nodes)} components")
print(f" Cohesion: {cohesion:.3f}")
print(f" Internal edges: {intra_edges}")
print(f" Components: {', '.join(sorted(nodes)[:5])}")
if len(nodes) > 5:
print(f" ... and {len(nodes) - 5} more")
print()
print("INTER-CLUSTER COUPLING:")
print("-" * 70)
if stats['cluster_coupling']:
sorted_coupling = sorted(
stats['cluster_coupling'].items(),
key=lambda x: x[1],
reverse=True
)
for (c1, c2), coupling in sorted_coupling[:10]:
edge_count = stats['inter_cluster_edges'][(c1, c2)]
print(f"Cluster {c1} <-> Cluster {c2}:")
print(f" Coupling: {coupling:.3f}")
print(f" Edges: {edge_count}")
print()
else:
print("No significant inter-cluster coupling detected.")
print()
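A minimal usage sketch; dependency_graph is again assumed to be the directed dependency graph built earlier.
# Let the eigengap heuristic pick the number of clusters
results = perform_spectral_clustering(dependency_graph)
report_spectral_clustering(results)

# Or force a cluster count to compare against your intended module boundaries
results_three = perform_spectral_clustering(dependency_graph, num_clusters=3)
report_spectral_clustering(results_three)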
Spectral clustering provides a mathematically rigorous way to partition your architecture. Unlike community detection methods that optimize modularity, spectral clustering uses the fundamental structure of the graph encoded in its eigenvalues. This can reveal architectural divisions that align more closely with the actual flow of dependencies through the system.
Temporal Graph Analysis for Architectural Evolution
Software architecture does not remain static. Understanding how your dependency graph evolves over time provides insights into architectural drift, technical debt accumulation, and the effectiveness of refactoring efforts. Temporal graph analysis examines sequences of graph snapshots to identify trends and predict future problems.
By tracking how individual edges and nodes appear, disappear, and change over time, we can identify architectural hot spots where change is concentrated, predict which components are likely to become problematic, and measure the impact of architectural decisions.
from datetime import datetime
import numpy as np
class TemporalGraphAnalyzer:
"""
Analyzes the evolution of dependency graphs over time.
"""
def __init__(self):
"""
Initializes the temporal analyzer with empty history.
"""
self.snapshots = []
self.edge_history = {}
self.node_history = {}
def add_snapshot(self, graph, timestamp=None, metadata=None):
"""
Adds a graph snapshot to the temporal history.
Args:
graph (nx.DiGraph): The dependency graph at this point in time
timestamp (datetime): When this snapshot was taken
metadata (dict): Additional information about this snapshot
"""
if timestamp is None:
timestamp = datetime.now()
snapshot = {
'timestamp': timestamp,
'graph': graph.copy(),
'metadata': metadata or {},
'num_nodes': graph.number_of_nodes(),
'num_edges': graph.number_of_edges()
}
self.snapshots.append(snapshot)
self._update_histories(graph, timestamp)
def _update_histories(self, graph, timestamp):
"""
Updates edge and node histories with new snapshot data.
Args:
graph (nx.DiGraph): The current graph
timestamp (datetime): Current timestamp
"""
# Update node history
for node in graph.nodes():
if node not in self.node_history:
self.node_history[node] = {
'first_seen': timestamp,
'last_seen': timestamp,
'appearances': 1,
'in_degree_history': [],
'out_degree_history': []
}
else:
self.node_history[node]['last_seen'] = timestamp
self.node_history[node]['appearances'] += 1
self.node_history[node]['in_degree_history'].append(
(timestamp, graph.in_degree(node))
)
self.node_history[node]['out_degree_history'].append(
(timestamp, graph.out_degree(node))
)
# Update edge history
for source, target in graph.edges():
edge_key = (source, target)
if edge_key not in self.edge_history:
self.edge_history[edge_key] = {
'first_seen': timestamp,
'last_seen': timestamp,
'appearances': 1,
'stable': True
}
else:
self.edge_history[edge_key]['last_seen'] = timestamp
self.edge_history[edge_key]['appearances'] += 1
def detect_architectural_drift(self):
"""
Detects architectural drift by analyzing changes over time.
Returns:
dict: Drift analysis results
"""
if len(self.snapshots) < 2:
return {
'drift_detected': False,
'message': 'Insufficient snapshots for drift analysis'
}
first_snapshot = self.snapshots[0]
last_snapshot = self.snapshots[-1]
# Calculate various drift metrics
node_churn = self._calculate_node_churn()
edge_churn = self._calculate_edge_churn()
structural_changes = self._calculate_structural_changes()
# Determine if significant drift has occurred
drift_score = (
node_churn['churn_rate'] * 0.3 +
edge_churn['churn_rate'] * 0.4 +
structural_changes['change_rate'] * 0.3
)
return {
'drift_detected': drift_score > 0.3,
'drift_score': drift_score,
'node_churn': node_churn,
'edge_churn': edge_churn,
'structural_changes': structural_changes,
'time_span': (last_snapshot['timestamp'] -
first_snapshot['timestamp']).days
}
def _calculate_node_churn(self):
"""
Calculates node churn (additions and removals).
Returns:
dict: Node churn statistics
"""
if len(self.snapshots) < 2:
return {'churn_rate': 0, 'added': 0, 'removed': 0}
first_nodes = set(self.snapshots[0]['graph'].nodes())
last_nodes = set(self.snapshots[-1]['graph'].nodes())
added = last_nodes - first_nodes
removed = first_nodes - last_nodes
total_unique = len(first_nodes | last_nodes)
churn_rate = (len(added) + len(removed)) / total_unique if total_unique > 0 else 0
return {
'churn_rate': churn_rate,
'added': len(added),
'removed': len(removed),
'added_nodes': list(added),
'removed_nodes': list(removed)
}
def _calculate_edge_churn(self):
"""
Calculates edge churn (dependency additions and removals).
Returns:
dict: Edge churn statistics
"""
if len(self.snapshots) < 2:
return {'churn_rate': 0, 'added': 0, 'removed': 0}
first_edges = set(self.snapshots[0]['graph'].edges())
last_edges = set(self.snapshots[-1]['graph'].edges())
added = last_edges - first_edges
removed = first_edges - last_edges
total_unique = len(first_edges | last_edges)
churn_rate = (len(added) + len(removed)) / total_unique if total_unique > 0 else 0
return {
'churn_rate': churn_rate,
'added': len(added),
'removed': len(removed),
'added_edges': list(added)[:10], # Sample for reporting
'removed_edges': list(removed)[:10]
}
def _calculate_structural_changes(self):
"""
Calculates changes in overall graph structure.
Returns:
dict: Structural change metrics
"""
if len(self.snapshots) < 2:
return {'change_rate': 0}
first = self.snapshots[0]
last = self.snapshots[-1]
# Calculate relative changes in key metrics
node_change = abs(last['num_nodes'] - first['num_nodes']) / max(first['num_nodes'], 1)
edge_change = abs(last['num_edges'] - first['num_edges']) / max(first['num_edges'], 1)
# Calculate density change
first_density = nx.density(first['graph'])
last_density = nx.density(last['graph'])
density_change = abs(last_density - first_density)
change_rate = (node_change + edge_change + density_change) / 3
return {
'change_rate': change_rate,
'node_growth': (last['num_nodes'] - first['num_nodes']) / max(first['num_nodes'], 1),
'edge_growth': (last['num_edges'] - first['num_edges']) / max(first['num_edges'], 1),
'density_change': last_density - first_density
}
def identify_architectural_hotspots(self):
"""
Identifies components that change frequently (architectural hotspots).
Returns:
list: Hotspot components with change metrics
"""
if len(self.snapshots) < 3:
return []
hotspots = []
for node, history in self.node_history.items():
# Calculate degree volatility
in_degrees = [deg for _, deg in history['in_degree_history']]
out_degrees = [deg for _, deg in history['out_degree_history']]
in_volatility = np.std(in_degrees) if len(in_degrees) > 1 else 0
out_volatility = np.std(out_degrees) if len(out_degrees) > 1 else 0
total_volatility = in_volatility + out_volatility
# Calculate appearance ratio
appearance_ratio = history['appearances'] / len(self.snapshots)
# A hotspot is a component that appears frequently and has high volatility
if appearance_ratio > 0.5 and total_volatility > 1.0:
hotspots.append({
'component': node,
'volatility': total_volatility,
'appearance_ratio': appearance_ratio,
'in_degree_volatility': in_volatility,
'out_degree_volatility': out_volatility,
'avg_in_degree': np.mean(in_degrees),
'avg_out_degree': np.mean(out_degrees)
})
return sorted(hotspots, key=lambda x: x['volatility'], reverse=True)
def predict_future_dependencies(self, horizon_days=30):
"""
Predicts which new dependencies are likely to appear based on trends.
Args:
horizon_days (int): How many days into the future to predict
Returns:
list: Predicted new dependencies with confidence scores
"""
if len(self.snapshots) < 3:
return []
predictions = []
# Simple heuristic: if a component has been adding outgoing dependencies
# at an increasing rate recently, predict that it will continue to do so
last_graph = self.snapshots[-1]['graph']
for node in last_graph.nodes():
# Get recent out-degree growth
out_degrees = [deg for _, deg in
self.node_history.get(node, {}).get('out_degree_history', [])]
if len(out_degrees) >= 3:
recent_growth = out_degrees[-1] - out_degrees[-3]
if recent_growth > 0:
# Predict new dependencies for this node
# Look at what it doesn't depend on yet
current_deps = set(last_graph.successors(node))
potential_deps = set(last_graph.nodes()) - current_deps - {node}
for potential_dep in list(potential_deps)[:3]:
confidence = min(recent_growth / 5.0, 0.8)
predictions.append({
'source': node,
'target': potential_dep,
'confidence': confidence,
'reason': f'Recent dependency growth trend ({recent_growth} new deps)'
})
return sorted(predictions, key=lambda x: x['confidence'], reverse=True)[:10]
def report_temporal_analysis(temporal_analyzer):
"""
Generates a comprehensive temporal analysis report.
Args:
temporal_analyzer (TemporalGraphAnalyzer): The analyzer with historical data
"""
print("TEMPORAL ARCHITECTURAL ANALYSIS")
print("=" * 70)
print()
if len(temporal_analyzer.snapshots) < 2:
print("Insufficient historical data for temporal analysis.")
print("At least 2 snapshots are required.")
return
# Report drift analysis
drift = temporal_analyzer.detect_architectural_drift()
print("ARCHITECTURAL DRIFT ANALYSIS")
print("-" * 70)
print(f"Time span: {drift['time_span']} days")
print(f"Drift score: {drift['drift_score']:.3f}")
print(f"Drift detected: {'YES' if drift['drift_detected'] else 'NO'}")
print()
print("Node churn:")
print(f" Added: {drift['node_churn']['added']} components")
print(f" Removed: {drift['node_churn']['removed']} components")
print(f" Churn rate: {drift['node_churn']['churn_rate']:.2%}")
print()
print("Edge churn:")
print(f" Added: {drift['edge_churn']['added']} dependencies")
print(f" Removed: {drift['edge_churn']['removed']} dependencies")
print(f" Churn rate: {drift['edge_churn']['churn_rate']:.2%}")
print()
# Report hotspots
hotspots = temporal_analyzer.identify_architectural_hotspots()
print("ARCHITECTURAL HOTSPOTS")
print("-" * 70)
if hotspots:
print(f"Identified {len(hotspots)} hotspot components")
print()
for idx, hotspot in enumerate(hotspots[:5], 1):
print(f"{idx}. {hotspot['component']}")
print(f" Volatility: {hotspot['volatility']:.2f}")
print(f" Average in-degree: {hotspot['avg_in_degree']:.1f}")
print(f" Average out-degree: {hotspot['avg_out_degree']:.1f}")
print()
else:
print("No significant hotspots detected.")
print()
# Report predictions
predictions = temporal_analyzer.predict_future_dependencies()
print("PREDICTED FUTURE DEPENDENCIES")
print("-" * 70)
if predictions:
print("Based on recent trends, these dependencies may emerge:")
print()
for idx, pred in enumerate(predictions[:5], 1):
print(f"{idx}. {pred['source']} -> {pred['target']}")
print(f" Confidence: {pred['confidence']:.2f}")
print(f" Reason: {pred['reason']}")
print()
else:
print("No significant trends detected for prediction.")
print()
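A minimal usage sketch with synthetic snapshots; in practice each snapshot graph would be extracted from a successive revision of the codebase (for example, one per release tag), which is an assumption about tooling rather than anything the class enforces.
import networkx as nx
from datetime import datetime

analyzer = TemporalGraphAnalyzer()

# Three synthetic snapshots in which dependencies are gradually added
snapshot_specs = [
    (datetime(2024, 1, 1), [('UserInterface', 'OrderService'),
                            ('OrderService', 'Database')]),
    (datetime(2024, 2, 1), [('UserInterface', 'OrderService'),
                            ('OrderService', 'Database'),
                            ('OrderService', 'Logger')]),
    (datetime(2024, 3, 1), [('UserInterface', 'OrderService'),
                            ('OrderService', 'Database'),
                            ('OrderService', 'Logger'),
                            ('UserInterface', 'Logger')]),
]
for timestamp, edges in snapshot_specs:
    snapshot_graph = nx.DiGraph()
    snapshot_graph.add_edges_from(edges)
    analyzer.add_snapshot(snapshot_graph, timestamp=timestamp)

report_temporal_analysis(analyzer)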
Temporal analysis transforms architectural analysis from a static snapshot into a dynamic understanding of how your system evolves. By tracking changes over time, you can identify which parts of your architecture are stable and which are in flux. This helps you make informed decisions about where to invest refactoring effort and where to be cautious about making changes.
Detecting Hidden Dependencies Through Transitive Reduction
When we examine a dependency graph extracted from source code, we often see both direct and indirect dependencies represented as edges. If component A depends on B, and B depends on C, then A indirectly depends on C. In many cases, A's source code explicitly imports both B and C, even though, from a reachability standpoint, the edge from A to C is redundant given the path through B. This clutters the dependency graph and obscures the underlying architectural structure.
Transitive reduction is a graph algorithm that removes redundant edges while preserving reachability. After transitive reduction, an edge from A to C exists only if there is no alternative path from A to C through other nodes. This reveals the minimal set of dependencies that fully describes the system's structure.
def compute_transitive_reduction(graph):
"""
Computes the transitive reduction of a directed graph, removing
redundant edges that can be inferred through transitivity.
The transitive reduction preserves all reachability relationships
while minimizing the number of edges. This reveals the essential
dependency structure without redundant connections.
Args:
graph (nx.DiGraph): The dependency graph to reduce
Returns:
nx.DiGraph: The transitively reduced graph
"""
# NetworkX provides a built-in transitive reduction function
# but we implement our own to understand the algorithm
reduced_graph = graph.copy()
# For each edge (u, v), check whether an alternative path from u to v
# exists without the direct edge; if so, the edge is redundant.
# For a DAG the transitive reduction is unique, so removing redundant
# edges one at a time does not affect later reachability checks.
for u, v in graph.edges():
# Temporarily remove the edge
reduced_graph.remove_edge(u, v)
# Check if v is still reachable from u
if not nx.has_path(reduced_graph, u, v):
# The edge is essential, restore it
reduced_graph.add_edge(u, v)
return reduced_graph
def analyze_dependency_redundancy(original_graph, reduced_graph):
"""
Analyzes the difference between original and reduced graphs to
identify redundant dependencies.
Args:
original_graph (nx.DiGraph): Original dependency graph
reduced_graph (nx.DiGraph): Transitively reduced graph
Returns:
dict: Analysis of redundant dependencies
"""
original_edges = set(original_graph.edges())
reduced_edges = set(reduced_graph.edges())
redundant_edges = original_edges - reduced_edges
# Group redundant edges by source node
redundancy_by_node = {}
for source, target in redundant_edges:
if source not in redundancy_by_node:
redundancy_by_node[source] = []
redundancy_by_node[source].append(target)
# Calculate redundancy metrics
total_edges = len(original_edges)
essential_edges = len(reduced_edges)
redundant_count = len(redundant_edges)
redundancy_ratio = redundant_count / total_edges if total_edges > 0 else 0
return {
'total_edges': total_edges,
'essential_edges': essential_edges,
'redundant_edges': redundant_count,
'redundancy_ratio': redundancy_ratio,
'redundancy_by_node': redundancy_by_node,
'most_redundant_nodes': sorted(
redundancy_by_node.items(),
key=lambda x: len(x[1]),
reverse=True
)[:10]
}
def report_dependency_redundancy(redundancy_analysis):
"""
Generates a detailed report on dependency redundancy.
Args:
redundancy_analysis (dict): Results from analyze_dependency_redundancy
"""
print("DEPENDENCY REDUNDANCY ANALYSIS")
print("=" * 70)
print()
print(f"Total dependencies: {redundancy_analysis['total_edges']}")
print(f"Essential dependencies: {redundancy_analysis['essential_edges']}")
print(f"Redundant dependencies: {redundancy_analysis['redundant_edges']}")
print(f"Redundancy ratio: {redundancy_analysis['redundancy_ratio']:.2%}")
print()
if redundancy_analysis['most_redundant_nodes']:
print("Components with most redundant dependencies:")
for node, redundant_deps in redundancy_analysis['most_redundant_nodes']:
print(f" {node}: {len(redundant_deps)} redundant dependencies")
for dep in redundant_deps[:5]: # Show first 5
print(f" - {dep}")
if len(redundant_deps) > 5:
print(f" ... and {len(redundant_deps) - 5} more")
print()
print("Interpretation:")
print(" High redundancy may indicate over-specification of dependencies")
print(" or opportunities to simplify import statements and reduce coupling.")
Transitive reduction is particularly valuable when reviewing architectural diagrams. The full dependency graph of a large system can be overwhelming, with thousands of edges creating a dense tangle. The transitively reduced graph shows only the essential dependencies, making it much easier to understand the core architectural structure. This helps architects identify the fundamental relationships that define the system's organization.
Conclusion and Future Directions
The graph algorithms we have explored represent a comprehensive toolkit for understanding software architecture. From basic cycle detection and topological sorting to advanced techniques like spectral clustering and temporal analysis, these algorithms provide objective, quantitative insights into architectural quality.
The key to success with these methods is understanding when to apply each technique. Cycle detection is essential for any architectural analysis. Topological sorting reveals layering structure in well-designed systems. Centrality measures identify critical components. Community detection and spectral clustering reveal natural module boundaries. Temporal analysis tracks architectural evolution. Transitive reduction simplifies complex dependency graphs. Together, these techniques provide a complete picture of architectural health.
As software systems continue to grow in complexity, the importance of automated architectural analysis will only increase. Manual code reviews and architectural assessments cannot keep pace with the rate of change in modern development. Graph-based analysis provides the scalability and objectivity needed to maintain architectural integrity in large, rapidly evolving codebases.
The techniques presented here are not just academic exercises. They are practical tools that can be applied to real codebases today. By integrating them into your development workflow, you transform architectural analysis from an occasional manual review into a continuous, automated process that keeps your codebase healthy as it grows and evolves. Whether you use NetworkX for its simplicity and rich algorithm library, or Neo4j for its scalability and query capabilities, the fundamental principles remain the same. Model your dependencies as a graph, apply appropriate algorithms to extract insights, and use those insights to guide architectural decisions. The mathematics of graph theory, developed over centuries, is ready to help you build better software.