INTRODUCTION TO THE OLLAMA API
The Ollama API represents a powerful gateway to local large language model inference, enabling developers to integrate sophisticated AI capabilities directly into their applications without relying on cloud services or external APIs. This approach offers significant advantages in terms of privacy, cost control, and latency reduction. When you run Ollama on your local machine or server, you gain complete control over your AI infrastructure while maintaining the flexibility to build applications that leverage state-of-the-art language models.
Understanding the Ollama API begins with recognizing its fundamental design philosophy. Unlike many AI services that require complex authentication schemes or impose strict rate limits, Ollama provides a straightforward REST API that runs locally on your machine. This simplicity does not come at the expense of capability. The API supports streaming responses, multi-turn conversations, and fine-grained control over model behavior through various parameters. Whether you are building a chatbot, a code assistant, a content generation tool, or any other AI-powered application, the Ollama API provides the foundation you need.
The architecture of Ollama follows a client-server model where the Ollama service runs as a background process, typically listening on port 11434. Your application acts as a client, sending HTTP requests to this local server and receiving responses. This design makes the API language-agnostic, meaning you can interact with it from Python, JavaScript, Go, Rust, or any other language that supports HTTP communication. The API uses JSON for request and response payloads, making it easy to work with in modern programming environments.
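As a minimal sketch of that request and response cycle, the following uses only Python's standard library to build a JSON POST request for the generate endpoint. The helper names build_generate_request and send_request are illustrative rather than part of Ollama, the model name 'llama2' is a placeholder, and nothing is sent over the network until send_request is called against a running server.

```python
import json
import urllib.request

OLLAMA_URL = "http://localhost:11434"  # default local address; adjust if yours differs


def build_generate_request(
    model: str,
    prompt: str,
    base_url: str = OLLAMA_URL,
) -> urllib.request.Request:
    """Build (but do not send) a non-streaming request to /api/generate."""
    payload = {"model": model, "prompt": prompt, "stream": False}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/generate",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_request(request: urllib.request.Request, timeout: float = 300.0) -> dict:
    """Send the request and decode the JSON body. Requires a running server."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Building the request is side-effect free, so it can be inspected safely.
request = build_generate_request("llama2", "Why is the sky blue?")
print(request.get_method(), request.full_url)
```

Any language with an HTTP client and a JSON encoder can follow the same two steps: serialize a payload, then POST it to the local server.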
UNDERSTANDING THE API ARCHITECTURE
Before diving into code, you need to understand how the Ollama API structures its endpoints and what each one accomplishes. The API provides several key endpoints that serve different purposes in your application workflow. The primary endpoints include model generation, chat completion, model management, and system information retrieval. Each endpoint accepts specific parameters and returns structured responses that your application can process.
The generation endpoint forms the core of most Ollama-based applications. This endpoint accepts a prompt and returns generated text based on that prompt. The endpoint supports both streaming and non-streaming modes. In non-streaming mode, the API waits until the entire response is generated before returning it to your application. This approach works well for shorter responses or when you need the complete output before proceeding. In streaming mode, the API sends response chunks as they are generated, allowing your application to display partial results in real-time. This creates a more responsive user experience, particularly for longer generations.
The chat endpoint provides a higher-level interface specifically designed for conversational applications. Instead of working with raw prompts, the chat endpoint accepts a list of messages representing the conversation history. Each message includes a role indicating whether it came from the system, user, or assistant. This structure allows the model to maintain context across multiple turns of conversation, creating more coherent and contextually appropriate responses. The chat endpoint automatically handles the formatting and context management that you would otherwise need to implement manually when using the generation endpoint.
Model management endpoints allow your application to interact with the models installed on your system. You can list available models, pull new models from the Ollama registry, delete models you no longer need, and retrieve detailed information about specific models. These endpoints prove particularly useful when building applications that need to work with multiple models or allow users to select which model to use for different tasks.
MAKING YOUR FIRST API REQUEST
Let us begin with a concrete example of interacting with the Ollama API. We will build a simple but complete application that demonstrates the fundamental concepts. Our example will create a command-line tool that allows users to have conversations with a local language model. This tool will showcase both streaming and non-streaming responses, proper error handling, and conversation history management.
To interact with the Ollama API, we need to make HTTP requests. Python provides excellent libraries for this purpose, with the requests library being the most popular choice for synchronous operations. For our running example, we will use requests throughout: plain calls for non-streaming operations, and its stream option for real-time responses. Here is how we begin:
import requests
import json
import sys
from typing import List, Dict, Optional, Generator


class OllamaClient:
    """
    A client for interacting with the Ollama API.

    This client provides methods for both streaming and non-streaming
    text generation, chat completions, and model management. It includes
    error handling for common failures such as timeouts and connection
    errors.
    """

    def __init__(self, base_url: str = "http://localhost:11434"):
        """
        Initialize the Ollama client.

        Args:
            base_url: The base URL where Ollama is running. Defaults to
                the standard local installation endpoint.
        """
        self.base_url = base_url.rstrip('/')
        self.api_version = "api"

    def _build_url(self, endpoint: str) -> str:
        """
        Constructs the full URL for an API endpoint.

        Args:
            endpoint: The API endpoint path (e.g., 'generate', 'chat')

        Returns:
            The complete URL for the API request
        """
        return f"{self.base_url}/{self.api_version}/{endpoint}"
This initial code establishes the foundation of our client. The OllamaClient class encapsulates all interactions with the Ollama API, providing a clean interface that hides the HTTP details from the rest of our application. The constructor accepts a base URL parameter, allowing the client to work with Ollama installations running on different hosts or ports. The default value points to the standard local installation, which is what most developers will use during development.
The private method _build_url demonstrates a clean code principle: encapsulating URL construction logic in one place. This makes it easy to modify how URLs are built if needed, and it ensures consistency across all API calls. Notice that we strip trailing slashes from the base URL to prevent double slashes in the final URL, which could cause request failures.
IMPLEMENTING TEXT GENERATION
Now we will implement the core functionality for text generation. The generate method needs to handle both streaming and non-streaming modes, manage request parameters, and provide robust error handling. Here is the implementation:
    def generate(
        self,
        model: str,
        prompt: str,
        stream: bool = False,
        options: Optional[Dict] = None,
        system: Optional[str] = None,
        template: Optional[str] = None,
        context: Optional[List[int]] = None,
        raw: bool = False
    ) -> Dict:
        """
        Generate text using the specified model and prompt.

        Args:
            model: Name of the model to use (e.g., 'llama2', 'mistral')
            prompt: The prompt text to generate from
            stream: Whether to stream the response
            options: Model-specific options (temperature, top_p, etc.)
            system: System message to set context
            template: Custom prompt template
            context: Context from previous generation for continuity
            raw: Whether to use raw mode (no formatting)

        Returns:
            Dictionary containing the response and metadata, or a
            generator of chunk dictionaries when stream is True

        Raises:
            TimeoutError: If the request exceeds the timeout
            ConnectionError: If the Ollama server cannot be reached
            RuntimeError: If the server returns an HTTP error status
        """
        url = self._build_url("generate")
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": stream
        }
        # Only send optional fields the caller actually provided.
        if options:
            payload["options"] = options
        if system:
            payload["system"] = system
        if template:
            payload["template"] = template
        if context:
            payload["context"] = context
        if raw:
            payload["raw"] = raw
        try:
            response = requests.post(
                url,
                json=payload,
                stream=stream,
                timeout=300
            )
            response.raise_for_status()
            if stream:
                return self._handle_streaming_response(response)
            else:
                return response.json()
        except requests.exceptions.Timeout:
            raise TimeoutError(
                f"Request to {url} timed out after 300 seconds"
            )
        except requests.exceptions.ConnectionError:
            raise ConnectionError(
                f"Could not connect to Ollama at {self.base_url}. "
                "Ensure Ollama is running."
            )
        except requests.exceptions.HTTPError as e:
            raise RuntimeError(
                f"HTTP error occurred: {e.response.status_code} - "
                f"{e.response.text}"
            )
This generate method demonstrates several important concepts. First, it builds the request payload incrementally, only including optional parameters if they are provided. This keeps the request clean and avoids sending unnecessary data. The method covers the most commonly used parameters of the Ollama generate endpoint, giving your application fine-grained control over the generation process.
The options parameter deserves special attention. This dictionary allows you to pass model-specific parameters that control the generation behavior. Common options include temperature, which controls randomness in the output, top_p for nucleus sampling, and top_k for limiting the vocabulary considered at each step. By exposing these options through the API, you can fine-tune the model's behavior for different use cases without changing the underlying model.
The error handling in this method follows best practices by catching specific exception types and providing meaningful error messages. When a timeout occurs, the user receives a clear message indicating that the request took too long. Connection errors produce a message that reminds the user to check if Ollama is running. HTTP errors include both the status code and the response text, which often contains useful debugging information from the Ollama server.
HANDLING STREAMING RESPONSES
Streaming responses require special handling because they arrive as a sequence of JSON objects rather than a single complete response. Each chunk contains a portion of the generated text along with metadata. Here is how we handle streaming:
    def _handle_streaming_response(
        self,
        response: requests.Response
    ) -> Generator[Dict, None, None]:
        """
        Process a streaming response from the Ollama API.

        Args:
            response: The streaming HTTP response object

        Yields:
            Dictionary objects representing each chunk of the response

        Raises:
            ValueError: If a chunk cannot be parsed as JSON
        """
        for line in response.iter_lines():
            if line:
                try:
                    chunk = json.loads(line)
                    yield chunk
                except json.JSONDecodeError as e:
                    raise ValueError(
                        f"Failed to parse streaming response chunk: {line}"
                    ) from e
This method transforms the raw streaming response into a Python generator that yields parsed JSON objects. Using a generator is crucial here because it allows your application to process chunks as they arrive without waiting for the entire response. The method iterates over lines in the response, parses each line as JSON, and yields the resulting dictionary. If a line cannot be parsed, the method raises a descriptive error that includes the problematic line content.
When you use streaming in your application, you typically iterate over the generator and process each chunk. Each chunk contains a 'response' field with the generated text fragment and a 'done' field indicating whether generation is complete. The final chunk also includes timing metadata such as total_duration, eval_count, and eval_duration, with durations reported in nanoseconds. Tokens per second is not reported directly, but you can derive it from eval_count and eval_duration.
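To make the chunk structure concrete, here is a small sketch that joins the text fragments from a stream and derives throughput from the final chunk. The sample lines are hand-written stand-ins for what response.iter_lines() would yield, and the helper names collect_stream and tokens_per_second are our own, not part of Ollama.

```python
import json
from typing import Iterable, Tuple


def collect_stream(lines: Iterable[bytes]) -> Tuple[str, dict]:
    """Join the 'response' fragments and return them with the final chunk."""
    text_parts = []
    final_chunk: dict = {}
    for line in lines:
        if not line:
            continue  # skip blank keep-alive lines
        chunk = json.loads(line)
        text_parts.append(chunk.get("response", ""))
        if chunk.get("done"):
            final_chunk = chunk
    return "".join(text_parts), final_chunk


def tokens_per_second(final_chunk: dict) -> float:
    """Derive throughput from eval_count and eval_duration (nanoseconds)."""
    duration_ns = final_chunk.get("eval_duration", 0)
    if not duration_ns:
        return 0.0
    return final_chunk.get("eval_count", 0) * 1e9 / duration_ns


# Hand-written sample data: three chunks, the last one marked done.
sample = [
    b'{"response": "Hello", "done": false}',
    b'{"response": " world", "done": false}',
    b'{"response": "", "done": true, "eval_count": 50, "eval_duration": 2000000000}',
]
text, final = collect_stream(sample)
print(text)
print(tokens_per_second(final))
```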
Here is how you might use the streaming functionality:
    def generate_streaming(
        self,
        model: str,
        prompt: str,
        options: Optional[Dict] = None
    ) -> Generator[str, None, Dict]:
        """
        Generate text with streaming, yielding text fragments as they arrive.

        Args:
            model: Name of the model to use
            prompt: The prompt text
            options: Optional generation parameters

        Yields:
            Text fragments as they are generated

        Returns:
            Final metadata dictionary after generation completes
        """
        full_response = ""
        final_metadata = {}
        for chunk in self.generate(
            model=model,
            prompt=prompt,
            stream=True,
            options=options
        ):
            if 'response' in chunk:
                text_fragment = chunk['response']
                full_response += text_fragment
                yield text_fragment
            if chunk.get('done', False):
                final_metadata = {
                    'total_duration': chunk.get('total_duration'),
                    'load_duration': chunk.get('load_duration'),
                    'prompt_eval_count': chunk.get('prompt_eval_count'),
                    'prompt_eval_duration': chunk.get('prompt_eval_duration'),
                    'eval_count': chunk.get('eval_count'),
                    'eval_duration': chunk.get('eval_duration'),
                    'context': chunk.get('context', [])
                }
        return final_metadata
This wrapper method provides a cleaner interface for streaming generation. It yields only the text fragments, making it easy to display them in real-time, and returns the metadata as a final value. The method also accumulates the full response and extracts important performance metrics from the final chunk. These metrics help you understand how long different phases of generation took, which is valuable for performance optimization.
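Retrieving a generator's return value takes a little care, because a plain for loop silently discards it. The sketch below shows one way to capture it through StopIteration.value. The fake_stream function is a hypothetical stand-in for generate_streaming so the example runs without a server, and drain is an illustrative helper name.

```python
from typing import Dict, Generator, List, Tuple


def fake_stream() -> Generator[str, None, Dict]:
    """Stand-in for generate_streaming: yields text, then returns metadata."""
    yield "Hello"
    yield " world"
    return {"eval_count": 2}


def drain(stream: Generator[str, None, Dict]) -> Tuple[List[str], Dict]:
    """Consume a generator and capture the value of its return statement."""
    fragments: List[str] = []
    while True:
        try:
            fragments.append(next(stream))
        except StopIteration as stop:
            # The generator's return value travels on StopIteration.value.
            return fragments, stop.value


fragments, metadata = drain(fake_stream())
print("".join(fragments))
print(metadata)
```

Inside another generator, `metadata = yield from stream` achieves the same thing while forwarding the fragments to the caller.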
IMPLEMENTING CHAT FUNCTIONALITY
The chat endpoint provides a more structured way to handle conversations. Instead of managing prompt formatting manually, you work with a list of messages. Here is the implementation:
    def chat(
        self,
        model: str,
        messages: List[Dict[str, str]],
        stream: bool = False,
        options: Optional[Dict] = None,
        format: Optional[str] = None
    ) -> Dict:
        """
        Conduct a chat conversation with the model.

        Args:
            model: Name of the model to use
            messages: List of message dictionaries with 'role' and 'content'
            stream: Whether to stream the response
            options: Model-specific generation options
            format: Response format ('json' for structured output)

        Returns:
            Dictionary containing the assistant's response and metadata,
            or a generator of chunk dictionaries when stream is True

        Raises:
            ValueError: If messages format is invalid
            RuntimeError: If the API request fails
        """
        url = self._build_url("chat")
        # Validate message format
        for msg in messages:
            if 'role' not in msg or 'content' not in msg:
                raise ValueError(
                    "Each message must have 'role' and 'content' fields"
                )
            if msg['role'] not in ['system', 'user', 'assistant']:
                raise ValueError(
                    f"Invalid role: {msg['role']}. Must be 'system', "
                    "'user', or 'assistant'"
                )
        payload = {
            "model": model,
            "messages": messages,
            "stream": stream
        }
        if options:
            payload["options"] = options
        if format:
            payload["format"] = format
        try:
            response = requests.post(
                url,
                json=payload,
                stream=stream,
                timeout=300
            )
            response.raise_for_status()
            if stream:
                return self._handle_streaming_response(response)
            else:
                return response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Chat request failed: {str(e)}"
            ) from e
The chat method validates the message format before making the API request. This validation catches common errors early, providing clear feedback about what went wrong. Each message must include both a role and content field, and the role must be one of the three valid values. This validation prevents confusing errors from the API and makes debugging easier.
The format parameter is particularly useful. When set to 'json', it instructs the model to return structured JSON output. This helps when building applications that need to extract structured information from text or generate data in a specific format. You should still describe the JSON structure you want in your prompt, and because the output is model-generated, validate it before relying on it.
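Since JSON-mode output is still model-generated text, it is worth decoding it defensively. The following sketch assumes the non-streaming chat response shape used in this article, a dictionary with a 'message' entry holding 'content'. The helper name parse_json_reply and the sample response are illustrative only.

```python
import json
from typing import Any, Optional


def parse_json_reply(response: dict) -> Optional[Any]:
    """Return the decoded JSON from a chat response, or None if invalid."""
    content = response.get("message", {}).get("content", "")
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        return None


# Hand-written sample standing in for a real response from the chat endpoint.
sample_response = {
    "message": {
        "role": "assistant",
        "content": '{"city": "Paris", "country": "France"}',
    },
    "done": True,
}

data = parse_json_reply(sample_response)
print(data)
```

Returning None on failure lets the caller decide whether to retry the request, fall back to plain text, or report an error.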
MANAGING CONVERSATION HISTORY
For a complete chat application, you need to manage conversation history effectively. Here is a conversation manager that integrates with our Ollama client:
class ConversationManager:
    """
    Manages conversation history and provides methods for multi-turn chat.

    This class maintains the message history and provides utilities for
    adding messages, retrieving context, and managing conversation state.
    """

    def __init__(
        self,
        system_message: Optional[str] = None,
        max_history: Optional[int] = None
    ):
        """
        Initialize the conversation manager.

        Args:
            system_message: Optional system message to set context
            max_history: Maximum number of messages to retain (None for unlimited)
        """
        self.messages: List[Dict[str, str]] = []
        self.max_history = max_history
        if system_message:
            self.add_system_message(system_message)

    def add_system_message(self, content: str) -> None:
        """
        Add a system message to set conversation context.

        Args:
            content: The system message content
        """
        self.messages.append({
            "role": "system",
            "content": content
        })

    def add_user_message(self, content: str) -> None:
        """
        Add a user message to the conversation.

        Args:
            content: The user's message content
        """
        self.messages.append({
            "role": "user",
            "content": content
        })
        self._trim_history()

    def add_assistant_message(self, content: str) -> None:
        """
        Add an assistant message to the conversation.

        Args:
            content: The assistant's message content
        """
        self.messages.append({
            "role": "assistant",
            "content": content
        })
        self._trim_history()

    def _trim_history(self) -> None:
        """
        Trim conversation history to max_history length if specified.
        System messages are always preserved.
        """
        if self.max_history is None:
            return
        system_messages = [
            msg for msg in self.messages if msg['role'] == 'system'
        ]
        other_messages = [
            msg for msg in self.messages if msg['role'] != 'system'
        ]
        if len(other_messages) > self.max_history:
            # Slice from an explicit start index: a [-0:] slice would keep
            # the whole list when max_history is 0.
            start = len(other_messages) - self.max_history
            other_messages = other_messages[start:]
        self.messages = system_messages + other_messages

    def get_messages(self) -> List[Dict[str, str]]:
        """
        Get the current conversation history.

        Returns:
            List of message dictionaries
        """
        return self.messages.copy()

    def clear(self, keep_system: bool = True) -> None:
        """
        Clear the conversation history.

        Args:
            keep_system: Whether to preserve system messages
        """
        if keep_system:
            self.messages = [
                msg for msg in self.messages if msg['role'] == 'system'
            ]
        else:
            self.messages = []
This ConversationManager class provides a clean abstraction for managing chat history. It handles the details of message formatting and history management, allowing your application code to focus on the conversation logic. The max_history parameter prevents the conversation from growing unbounded, which is important because longer conversations consume more tokens and increase processing time.
The _trim_history method implements a smart trimming strategy. It always preserves system messages because they set important context for the conversation. Only user and assistant messages are subject to the history limit. When trimming is necessary, the method keeps the most recent messages, maintaining the immediate context while discarding older exchanges.
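The trimming strategy can be seen in isolation with a standalone function. This is a sketch of the same idea rather than a drop-in replacement for the class method: trim_history is an illustrative name, and the sample history is made up for the demonstration.

```python
from typing import Dict, List, Optional

Message = Dict[str, str]


def trim_history(
    messages: List[Message],
    max_history: Optional[int],
) -> List[Message]:
    """Keep every system message plus the most recent max_history others."""
    if max_history is None:
        return list(messages)
    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]
    # Use an explicit start index so that max_history == 0 keeps nothing.
    start = max(len(others) - max_history, 0)
    return system + others[start:]


history = [
    {"role": "system", "content": "Be brief."},
    {"role": "user", "content": "Q1"},
    {"role": "assistant", "content": "A1"},
    {"role": "user", "content": "Q2"},
    {"role": "assistant", "content": "A2"},
]
trimmed = trim_history(history, max_history=2)
print([m["content"] for m in trimmed])  # ['Be brief.', 'Q2', 'A2']
```

Note that a limit counted in messages can cut between a question and its answer. Choosing an even max_history keeps user and assistant messages paired in a strictly alternating conversation.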
WORKING WITH MODEL MANAGEMENT
Your application might need to work with multiple models or allow users to select which model to use. The model management endpoints make this possible. Here are the implementations:
    def list_models(self) -> List[Dict]:
        """
        List all models available on the Ollama instance.

        Returns:
            List of dictionaries containing model information

        Raises:
            RuntimeError: If the API request fails
        """
        url = self._build_url("tags")
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            result = response.json()
            return result.get('models', [])
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Failed to list models: {str(e)}"
            ) from e

    def show_model_info(self, model: str) -> Dict:
        """
        Get detailed information about a specific model.

        Args:
            model: Name of the model

        Returns:
            Dictionary containing model details

        Raises:
            RuntimeError: If the API request fails
        """
        url = self._build_url("show")
        payload = {"name": model}
        try:
            response = requests.post(url, json=payload, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Failed to get model info: {str(e)}"
            ) from e

    def pull_model(
        self,
        model: str,
        stream: bool = True
    ) -> Generator[Dict, None, None]:
        """
        Pull a model from the Ollama registry.

        Args:
            model: Name of the model to pull
            stream: Whether to stream download progress

        Yields:
            Progress updates as dictionaries

        Raises:
            RuntimeError: If the API request fails
        """
        url = self._build_url("pull")
        payload = {
            "name": model,
            "stream": stream
        }
        try:
            # No timeout: large model downloads can run for a long time.
            response = requests.post(
                url,
                json=payload,
                stream=stream,
                timeout=None
            )
            response.raise_for_status()
            if stream:
                for chunk in self._handle_streaming_response(response):
                    yield chunk
            else:
                yield response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Failed to pull model: {str(e)}"
            ) from e

    def delete_model(self, model: str) -> Dict:
        """
        Delete a model from the local system.

        Args:
            model: Name of the model to delete

        Returns:
            Dictionary containing the deletion result

        Raises:
            RuntimeError: If the API request fails
        """
        url = self._build_url("delete")
        payload = {"name": model}
        try:
            response = requests.delete(url, json=payload, timeout=30)
            response.raise_for_status()
            return {"status": "success"}
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Failed to delete model: {str(e)}"
            ) from e
These model management methods provide complete control over the models available to your application. The list_models method retrieves information about all installed models, including their names, sizes, and modification dates. This information helps users understand what models are available and how much disk space they consume.
The pull_model method downloads models from the Ollama registry. This operation can take considerable time for large models, so the method supports streaming to provide progress updates. Each update carries a status string, and updates emitted while a layer is downloading also report the layer digest, its total size in bytes, and the number of bytes completed so far. Your application can use these values to display a progress bar or other feedback to the user. The API does not report an estimated completion time, so compute one yourself if you need it.
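As a sketch of turning progress updates into user feedback, the function below renders a simple text progress bar. It assumes each update is a dictionary with a 'status' string and, during downloads, 'total' and 'completed' byte counts. The function name format_progress and the sample updates are our own illustration.

```python
def format_progress(update: dict, width: int = 20) -> str:
    """Render one pull update as a status line with an optional bar."""
    status = update.get("status", "")
    total = update.get("total")
    completed = update.get("completed", 0)
    if not total:
        # Updates without byte counts (e.g. manifest steps) show status only.
        return status
    fraction = min(completed / total, 1.0)
    filled = int(fraction * width)
    bar = "#" * filled + "-" * (width - filled)
    return f"{status} [{bar}] {fraction:.0%}"


# Hand-written sample updates standing in for the stream from pull_model.
updates = [
    {"status": "pulling manifest"},
    {"status": "downloading", "total": 200, "completed": 50},
    {"status": "downloading", "total": 200, "completed": 200},
    {"status": "success"},
]
for update in updates:
    print(format_progress(update, width=8))
```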
IMPLEMENTING ADVANCED FEATURES
Beyond basic text generation and chat, the Ollama API supports several advanced features that can enhance your applications. One important feature is the ability to control generation parameters dynamically. Here is a utility class that helps manage these parameters:
class GenerationOptions:
    """
    Builder class for constructing generation options dictionaries.

    This class provides a fluent interface for setting generation parameters
    with validation and sensible defaults.
    """

    def __init__(self):
        """Initialize with default options."""
        self.options = {}

    def temperature(self, value: float) -> 'GenerationOptions':
        """
        Set the temperature for generation.

        Args:
            value: Temperature value (0.0 to 2.0, default 0.8)
                Lower values make output more focused and deterministic.
                Higher values increase randomness and creativity.

        Returns:
            Self for method chaining

        Raises:
            ValueError: If value is outside valid range
        """
        if not 0.0 <= value <= 2.0:
            raise ValueError("Temperature must be between 0.0 and 2.0")
        self.options['temperature'] = value
        return self

    def top_p(self, value: float) -> 'GenerationOptions':
        """
        Set nucleus sampling threshold.

        Args:
            value: Top-p value (0.0 to 1.0, default 0.9)
                Controls diversity via nucleus sampling.

        Returns:
            Self for method chaining

        Raises:
            ValueError: If value is outside valid range
        """
        if not 0.0 <= value <= 1.0:
            raise ValueError("Top-p must be between 0.0 and 1.0")
        self.options['top_p'] = value
        return self

    def top_k(self, value: int) -> 'GenerationOptions':
        """
        Set top-k sampling parameter.

        Args:
            value: Top-k value (positive integer, default 40)
                Limits vocabulary to top k most likely tokens.

        Returns:
            Self for method chaining

        Raises:
            ValueError: If value is not positive
        """
        if value < 1:
            raise ValueError("Top-k must be a positive integer")
        self.options['top_k'] = value
        return self

    def repeat_penalty(self, value: float) -> 'GenerationOptions':
        """
        Set repetition penalty.

        Args:
            value: Penalty value (1.0 or higher, default 1.1)
                Penalizes repetition in generated text.

        Returns:
            Self for method chaining

        Raises:
            ValueError: If value is less than 1.0
        """
        if value < 1.0:
            raise ValueError("Repeat penalty must be 1.0 or higher")
        self.options['repeat_penalty'] = value
        return self

    def num_predict(self, value: int) -> 'GenerationOptions':
        """
        Set maximum number of tokens to generate.

        Args:
            value: Maximum tokens (positive integer)

        Returns:
            Self for method chaining

        Raises:
            ValueError: If value is not positive
        """
        if value < 1:
            raise ValueError("num_predict must be a positive integer")
        self.options['num_predict'] = value
        return self

    def stop_sequences(self, sequences: List[str]) -> 'GenerationOptions':
        """
        Set stop sequences that halt generation.

        Args:
            sequences: List of strings that stop generation when encountered

        Returns:
            Self for method chaining
        """
        self.options['stop'] = sequences
        return self

    def build(self) -> Dict:
        """
        Build and return the options dictionary.

        Returns:
            Dictionary of generation options
        """
        return self.options.copy()
This GenerationOptions class demonstrates the builder pattern, which provides a clean, self-documenting way to construct parameter dictionaries. Each method validates its input and returns self, allowing you to chain method calls. This approach rejects out-of-range values early and makes the code more readable. Here is how you would use it:
options = (GenerationOptions()
    .temperature(0.7)
    .top_p(0.9)
    .repeat_penalty(1.2)
    .num_predict(500)
    .build())
This fluent interface is much clearer than manually constructing a dictionary, and it catches parameter errors at the point where you set them rather than when the API request fails.
IMPLEMENTING ERROR RECOVERY AND RETRY LOGIC
Production applications need robust error handling and retry logic. Network issues and temporary service unavailability can both cause requests to fail. Here is a retry mechanism that handles these scenarios gracefully:
import time
from functools import wraps


def retry_on_failure(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple = (requests.exceptions.RequestException,)
):
    """
    Decorator that retries a function on failure with exponential backoff.

    Args:
        max_attempts: Maximum number of attempts
        delay: Initial delay between retries in seconds
        backoff: Multiplier for delay after each attempt
        exceptions: Tuple of exception types to catch and retry

    Returns:
        Decorated function with retry logic
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        time.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        raise last_exception
        return wrapper
    return decorator
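This retry decorator can be applied to any method that makes API calls. It implements exponential backoff, which means the delay between retries increases with each attempt. This approach is more respectful of the server and more likely to succeed when dealing with temporary issues. Keep in mind that the decorator only retries the exception types you pass in. Because our generate method converts requests errors into the built-in ConnectionError and TimeoutError, those are the types to list here; the default of requests.exceptions.RequestException would never match. You would use it like this:
    @retry_on_failure(
        max_attempts=3,
        delay=1.0,
        backoff=2.0,
        exceptions=(ConnectionError, TimeoutError)
    )
    def generate_with_retry(self, model: str, prompt: str) -> Dict:
        return self.generate(model=model, prompt=prompt)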
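To see what exponential backoff means in numbers, this small sketch computes the waits that a retry loop of this shape would sleep between attempts. The function name backoff_delays is illustrative, and its parameters mirror those of the decorator.

```python
from typing import List


def backoff_delays(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
) -> List[float]:
    """Return the sleep durations between attempts (one fewer than attempts)."""
    delays: List[float] = []
    current = delay
    # No sleep follows the final attempt, hence max_attempts - 1 entries.
    for _ in range(max(max_attempts - 1, 0)):
        delays.append(current)
        current *= backoff
    return delays


print(backoff_delays())             # [1.0, 2.0]
print(backoff_delays(5, 0.5, 2.0))  # [0.5, 1.0, 2.0, 4.0]
```

With the defaults, a request that fails three times spends three seconds sleeping in total before the final error is raised, in addition to the time each attempt takes.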
BUILDING A COMPLETE APPLICATION
Now let us integrate all these components into a complete, production-ready application. This application will provide a command-line interface for chatting with local models, with support for streaming responses, conversation history, and model selection.
import argparse
from typing import Optional

try:
    # Enables line editing and history for input(); not available on
    # every platform (for example, stock Python on Windows).
    import readline
except ImportError:
    readline = None


class OllamaChatApplication:
    """
    Complete chat application using the Ollama API.

    This application provides a full-featured command-line interface for
    conversing with local language models through Ollama.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        default_model: str = "llama2"
    ):
        """
        Initialize the chat application.

        Args:
            base_url: URL where Ollama is running
            default_model: Default model to use for chat
        """
        self.client = OllamaClient(base_url)
        self.default_model = default_model
        self.conversation = ConversationManager(max_history=20)

    def select_model(self) -> str:
        """
        Allow user to select from available models.

        Returns:
            Name of the selected model
        """
        try:
            models = self.client.list_models()
            if not models:
                print("No models found. Please pull a model first.")
                print("Example: ollama pull llama2")
                sys.exit(1)
            print("\nAvailable models:")
            for idx, model in enumerate(models, 1):
                name = model.get('name', 'Unknown')
                size = model.get('size', 0)
                size_gb = size / (1024 ** 3)
                print(f"{idx}. {name} ({size_gb:.2f} GB)")
            while True:
                try:
                    choice = input(
                        f"\nSelect model (1-{len(models)}) "
                        f"or press Enter for {self.default_model}: "
                    ).strip()
                    if not choice:
                        return self.default_model
                    idx = int(choice) - 1
                    if 0 <= idx < len(models):
                        return models[idx]['name']
                    else:
                        print("Invalid selection. Please try again.")
                except ValueError:
                    print("Please enter a number.")
        except Exception as e:
            print(f"Error listing models: {e}")
            print(f"Using default model: {self.default_model}")
            return self.default_model

    def set_system_message(self) -> None:
        """
        Allow user to set a system message for the conversation.
        """
        print("\nEnter system message (or press Enter to skip):")
        system_msg = input("> ").strip()
        if system_msg:
            self.conversation.add_system_message(system_msg)
            print("System message set.")

    def configure_generation_options(self) -> Dict:
        """
        Allow user to configure generation parameters.

        Returns:
            Dictionary of generation options
        """
        print("\nConfigure generation options (press Enter for defaults):")
        options = GenerationOptions()
        temp_input = input("Temperature (0.0-2.0, default 0.8): ").strip()
        if temp_input:
            try:
                options.temperature(float(temp_input))
            except ValueError as e:
                print(f"Invalid temperature: {e}. Using default.")
        top_p_input = input("Top-p (0.0-1.0, default 0.9): ").strip()
        if top_p_input:
            try:
                options.top_p(float(top_p_input))
            except ValueError as e:
                print(f"Invalid top-p: {e}. Using default.")
        max_tokens_input = input(
            "Max tokens (default unlimited): "
        ).strip()
        if max_tokens_input:
            try:
                options.num_predict(int(max_tokens_input))
            except ValueError as e:
                print(f"Invalid max tokens: {e}. Using default.")
        return options.build()

    def chat_loop(
        self,
        model: str,
        options: Dict,
        stream: bool = True
    ) -> None:
        """
        Main chat interaction loop.

        Args:
            model: Name of the model to use
            options: Generation options
            stream: Whether to use streaming responses
        """
        print(f"\nChatting with {model}")
        print("Type 'exit' or 'quit' to end the conversation")
        print("Type 'clear' to clear conversation history")
        print("Type 'history' to view conversation history")
        print("-" * 60)
        while True:
            try:
                user_input = input("\nYou: ").strip()
                if not user_input:
                    continue
                if user_input.lower() in ['exit', 'quit']:
                    print("Goodbye!")
                    break
                if user_input.lower() == 'clear':
                    self.conversation.clear(keep_system=True)
                    print("Conversation history cleared.")
                    continue
                if user_input.lower() == 'history':
                    self._display_history()
                    continue
                self.conversation.add_user_message(user_input)
                print("\nAssistant: ", end='', flush=True)
                if stream:
                    response_text = self._handle_streaming_chat(
                        model, options
                    )
                else:
                    response_text = self._handle_non_streaming_chat(
                        model, options
                    )
                self.conversation.add_assistant_message(response_text)
                print()
            except EOFError:
                # End of input (Ctrl-D or a closed pipe): leave the loop
                # instead of retrying input() forever.
                print("\nGoodbye!")
                break
            except KeyboardInterrupt:
                self._drop_unanswered_user_message()
                print("\n\nInterrupted. Type 'exit' to quit.")
            except Exception as e:
                self._drop_unanswered_user_message()
                print(f"\n\nError: {e}")
                print("Please try again.")

    def _drop_unanswered_user_message(self) -> None:
        """Remove a trailing user message that never received a reply."""
        history = self.conversation.messages
        if history and history[-1]['role'] == 'user':
            history.pop()

    def _handle_streaming_chat(
        self,
        model: str,
        options: Dict
    ) -> str:
        """
        Handle a streaming chat response.

        Args:
            model: Name of the model
            options: Generation options

        Returns:
            Complete response text
        """
        response_text = ""
        try:
            for chunk in self.client.chat(
                model=model,
                messages=self.conversation.get_messages(),
                stream=True,
                options=options
            ):
                if 'message' in chunk:
                    content = chunk['message'].get('content', '')
                    response_text += content
                    print(content, end='', flush=True)
        except Exception as e:
            print(f"\nStreaming error: {e}")
            raise
        return response_text

    def _handle_non_streaming_chat(
        self,
        model: str,
        options: Dict
    ) -> str:
        """
        Handle a non-streaming chat response.

        Args:
            model: Name of the model
            options: Generation options

        Returns:
            Complete response text
        """
        try:
            response = self.client.chat(
                model=model,
                messages=self.conversation.get_messages(),
                stream=False,
                options=options
            )
            response_text = response['message']['content']
            print(response_text)
            return response_text
        except Exception as e:
            print(f"\nChat error: {e}")
            raise

def _display_history(self) -> None:
"""Display the conversation history."""
messages = self.conversation.get_messages()
if not messages:
print("No conversation history.")
return
print("\nConversation History:")
print("-" * 60)
for msg in messages:
role = msg['role'].capitalize()
content = msg['content']
print(f"\n{role}: {content}")
print("-" * 60)
def run(self) -> None:
"""
Run the chat application.
This is the main entry point that orchestrates the entire application.
"""
print("=" * 60)
print("Ollama Chat Application")
print("=" * 60)
try:
model = self.select_model()
self.set_system_message()
options = self.configure_generation_options()
stream_input = input(
"\nUse streaming responses? (Y/n): "
).strip().lower()
stream = stream_input != 'n'
self.chat_loop(model, options, stream)
except Exception as e:
print(f"\nFatal error: {e}")
sys.exit(1)
def main():
"""
Main entry point for the application.
"""
parser = argparse.ArgumentParser(
description="Chat with local language models via Ollama"
)
parser.add_argument(
'--url',
default='http://localhost:11434',
help='Ollama API base URL (default: http://localhost:11434)'
)
parser.add_argument(
'--model',
default='llama2',
help='Default model to use (default: llama2)'
)
args = parser.parse_args()
app = OllamaChatApplication(
base_url=args.url,
default_model=args.model
)
app.run()
if __name__ == '__main__':
main()
This complete application ties together the concepts discussed so far. It offers model selection, system message configuration, generation parameter tuning, and a chat loop with exit, clear, and history commands. It reports errors without ending the session, supports both streaming and non-streaming modes, and keeps conversation history available for multi-turn context.
UNDERSTANDING PERFORMANCE CONSIDERATIONS
When building applications with the Ollama API, performance becomes a critical concern. Several factors affect the speed and efficiency of your application, and understanding these factors helps you make informed design decisions.
Model size directly impacts inference speed. Larger models like Llama 2 70B produce higher quality outputs but take significantly longer to generate responses compared to smaller models like Llama 2 7B. For interactive applications where response time matters, you might choose a smaller model even if it means sacrificing some output quality. For batch processing or offline tasks where quality is paramount, larger models become more attractive.
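One way to ground the size-versus-speed decision is to measure throughput on your own hardware instead of guessing. The sketch below assumes a non-streaming response dictionary that carries the eval_count field (tokens generated) and the eval_duration field (generation time in nanoseconds). The helper name tokens_per_second and the sample values are illustrative only, not part of any client shown in this article.

```python
from typing import Dict, Optional


def tokens_per_second(response: Dict) -> Optional[float]:
    """Return generation throughput, or None if timing data is missing."""
    eval_count = response.get("eval_count")
    eval_duration_ns = response.get("eval_duration")  # reported in nanoseconds
    if not eval_count or not eval_duration_ns:
        return None
    return eval_count / (eval_duration_ns / 1e9)


# Made-up metadata for illustration; real values come from a generate call.
sample = {"eval_count": 120, "eval_duration": 4_000_000_000}
rate = tokens_per_second(sample)
print(f"{rate:.1f} tokens/s")
```

Running the same prompt through two candidate models and comparing this figure gives a concrete basis for choosing between them.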
The context length affects both memory usage and processing time. Each token in the conversation history must be processed by the model, so longer conversations take more time to generate responses. This is why the ConversationManager includes a max_history parameter. By limiting history length, you keep response times reasonable while still maintaining enough context for coherent conversations.
Streaming versus non-streaming mode presents a tradeoff between perceived responsiveness and implementation complexity. Streaming mode allows users to see output as it generates, creating a more responsive feel even if the total generation time is the same. However, streaming requires more complex code to handle the chunked responses. For applications where the complete response is needed before proceeding, non-streaming mode simplifies the implementation without sacrificing functionality.
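The extra code that streaming requires is modest. As a rough sketch, the function below joins chat chunks into the full reply and records how long the first piece of text took to arrive, which is the number that drives perceived responsiveness. It assumes each chunk is a dictionary with a message entry and a done flag; the name consume_stream and the fake chunks are stand-ins so the example runs without a server.

```python
import time
from typing import Dict, Iterable, Optional, Tuple


def consume_stream(chunks: Iterable[Dict]) -> Tuple[str, Optional[float]]:
    """Join streamed chat chunks and time the arrival of the first text."""
    start = time.perf_counter()
    first_text_delay: Optional[float] = None
    parts = []
    for chunk in chunks:
        content = chunk.get("message", {}).get("content", "")
        if content and first_text_delay is None:
            first_text_delay = time.perf_counter() - start
        parts.append(content)
        if chunk.get("done"):
            break
    return "".join(parts), first_text_delay


# Stand-in for client.chat(..., stream=True); these chunks are made up.
fake_chunks = [
    {"message": {"role": "assistant", "content": "Hel"}, "done": False},
    {"message": {"role": "assistant", "content": "lo"}, "done": False},
    {"message": {"role": "assistant", "content": ""}, "done": True},
]
text, delay = consume_stream(fake_chunks)
print(text)
```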
Quantization affects both model size and inference speed. Models in the Ollama library are published at several quantization levels, from 16-bit floating point down to 4-bit and lower. Lower-precision models use less memory and usually run faster, but may produce somewhat lower-quality output. For many applications, 4-bit or 8-bit quantization offers a good balance between speed and quality.
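A back-of-the-envelope calculation shows why quantization matters for memory. The sketch below multiplies the parameter count by the nominal bits per weight. It is only a lower bound under simplifying assumptions: real model files are somewhat larger because some tensors are kept at higher precision, and the context cache needs memory on top of the weights. The function name is hypothetical.

```python
def estimate_weight_bytes(num_parameters: float, bits_per_weight: float) -> float:
    """Approximate memory for the weights alone, ignoring all overhead."""
    return num_parameters * bits_per_weight / 8


for bits in (16, 8, 4):
    size_gb = estimate_weight_bytes(7e9, bits) / 1e9
    print(f"7B parameters at {bits}-bit: about {size_gb:.1f} GB of weights")
```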
IMPLEMENTING ADVANCED USE CASES
Beyond simple chat applications, the Ollama API enables many sophisticated use cases. Let us explore a few examples that demonstrate the API's versatility.
Document summarization represents a common use case where you want to condense large amounts of text into concise summaries. Here is how you might implement this:
class DocumentSummarizer:
    """
    Utility for summarizing documents using Ollama models.
    """

    def __init__(self, client: OllamaClient, model: str = "llama2"):
        """
        Initialize the summarizer.

        Args:
            client: OllamaClient instance
            model: Model to use for summarization
        """
        self.client = client
        self.model = model

    def summarize(
        self,
        text: str,
        max_length: Optional[int] = None,
        style: str = "concise"
    ) -> str:
        """
        Summarize the given text.

        Args:
            text: Text to summarize
            max_length: Maximum length of summary in words
            style: Summary style ('concise', 'detailed', 'bullet_points')

        Returns:
            Summarized text
        """
        style_prompts = {
            'concise': 'Provide a concise summary in 2-3 sentences.',
            'detailed': 'Provide a detailed summary covering all main points.',
            'bullet_points': 'Summarize using bullet points for key information.'
        }
        # Unknown styles fall back to the concise instruction
        style_instruction = style_prompts.get(
            style,
            style_prompts['concise']
        )
        length_instruction = ""
        if max_length:
            length_instruction = f" Keep the summary under {max_length} words."

        # The prompt body is flush-left so no indentation leaks into it
        prompt = f"""Please summarize the following text. {style_instruction}{length_instruction}
Text to summarize:
{text}
Summary:"""

        # Low temperature keeps summaries focused and repeatable
        options = (GenerationOptions()
                   .temperature(0.3)
                   .top_p(0.9)
                   .build())
        response = self.client.generate(
            model=self.model,
            prompt=prompt,
            options=options
        )
        return response['response'].strip()

    def summarize_in_chunks(
        self,
        text: str,
        chunk_size: int = 4000,
        final_summary_length: Optional[int] = None
    ) -> str:
        """
        Summarize very long documents by processing in chunks.

        Args:
            text: Text to summarize
            chunk_size: Size of each chunk in characters
            final_summary_length: Target length for final summary

        Returns:
            Final summarized text
        """
        # Split on whitespace and group words until a chunk is full
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0
        for word in words:
            current_chunk.append(word)
            current_length += len(word) + 1
            if current_length >= chunk_size:
                chunks.append(' '.join(current_chunk))
                current_chunk = []
                current_length = 0
        if current_chunk:
            chunks.append(' '.join(current_chunk))

        # Summarize each chunk on its own
        chunk_summaries = []
        for i, chunk in enumerate(chunks):
            print(f"Summarizing chunk {i+1}/{len(chunks)}...")
            summary = self.summarize(chunk, style='detailed')
            chunk_summaries.append(summary)

        if len(chunk_summaries) == 1:
            return chunk_summaries[0]

        # Combine the partial summaries into one final summary
        combined = '\n\n'.join(chunk_summaries)
        print("Creating final summary...")
        return self.summarize(
            combined,
            max_length=final_summary_length,
            style='concise'
        )

This summarizer demonstrates how to handle long documents that exceed the model's context window. The summarize_in_chunks method breaks the document into manageable pieces, summarizes each piece, and then creates a final summary from the chunk summaries. This hierarchical approach works well for very long documents while staying within token limits.
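The hierarchical pattern can be separated from the model call, which makes it easy to test. The sketch below is a variation on the approach just described, not the DocumentSummarizer code itself: it splits on paragraph boundaries instead of individual words, and it takes the summarizing function as a parameter. The names split_on_paragraphs, map_reduce_summary, and first_words are hypothetical, and first_words is a stub that stands in for a real model call.

```python
from typing import Callable, List


def split_on_paragraphs(text: str, chunk_size: int) -> List[str]:
    """Group paragraphs into chunks of roughly chunk_size characters."""
    chunks: List[str] = []
    current: List[str] = []
    current_length = 0
    for paragraph in text.split("\n\n"):
        paragraph = paragraph.strip()
        if not paragraph:
            continue
        # Start a new chunk if adding this paragraph would overshoot
        if current and current_length + len(paragraph) > chunk_size:
            chunks.append("\n\n".join(current))
            current, current_length = [], 0
        current.append(paragraph)
        current_length += len(paragraph) + 2
    if current:
        chunks.append("\n\n".join(current))
    return chunks


def map_reduce_summary(
    text: str,
    chunk_size: int,
    summarize: Callable[[str], str]
) -> str:
    """Summarize each chunk, then summarize the joined partial summaries."""
    chunks = split_on_paragraphs(text, chunk_size)
    if not chunks:
        return ""
    partials = [summarize(chunk) for chunk in chunks]
    if len(partials) == 1:
        return partials[0]
    return summarize("\n\n".join(partials))


def first_words(text: str) -> str:
    """Stub summarizer: keeps the first five words. Replace with a model call."""
    return " ".join(text.split()[:5])


doc = "alpha beta gamma\n\ndelta epsilon zeta\n\neta theta iota"
print(map_reduce_summary(doc, chunk_size=20, summarize=first_words))
```

Splitting on paragraphs keeps related sentences together, at the cost of chunks that vary more in size than word-based chunks do.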
Another powerful use case is structured data extraction. You can use the format parameter to request JSON output and extract specific information from unstructured text:
class InformationExtractor:
    """
    Extract structured information from unstructured text.
    """

    def __init__(self, client: OllamaClient, model: str = "llama2"):
        """
        Initialize the extractor.

        Args:
            client: OllamaClient instance
            model: Model to use for extraction
        """
        self.client = client
        self.model = model

    def extract_entities(
        self,
        text: str,
        entity_types: List[str]
    ) -> Dict:
        """
        Extract named entities from text.

        Args:
            text: Text to analyze
            entity_types: Types of entities to extract
                (e.g., ['person', 'organization', 'location'])

        Returns:
            Dictionary mapping entity types to lists of found entities
        """
        entity_list = ', '.join(entity_types)
        # Build the example structure from the requested types so the
        # prompt never advertises types the caller did not ask for.
        example_structure = json.dumps(
            {entity_type: ["example 1", "example 2"]
             for entity_type in entity_types},
            indent=4
        )
        prompt = f"""Extract the following types of entities from the text: {entity_list}
Return the results as JSON with this structure:
{example_structure}
Only include entity types that are found. If no entities of a type are found, omit that key.
Text:
{text}
JSON output:"""

        # Near-zero temperature favors consistent, literal extraction
        options = (GenerationOptions()
                   .temperature(0.1)
                   .build())
        response = self.client.generate(
            model=self.model,
            prompt=prompt,
            options=options,
            format='json'
        )
        try:
            result = json.loads(response['response'])
        except json.JSONDecodeError:
            return {}
        # Valid JSON is not necessarily an object; reject anything else
        return result if isinstance(result, dict) else {}

    def extract_key_value_pairs(
        self,
        text: str,
        keys: List[str]
    ) -> Dict:
        """
        Extract specific key-value pairs from text.

        Args:
            text: Text to analyze
            keys: List of keys to extract values for

        Returns:
            Dictionary mapping keys to extracted values
        """
        key_list = ', '.join(keys)
        # Show the model the exact keys it is expected to return
        example_structure = json.dumps(
            {key: "value" for key in keys},
            indent=4
        )
        prompt = f"""Extract the following information from the text: {key_list}
Return the results as JSON with this structure:
{example_structure}
If a value cannot be found, use null.
Text:
{text}
JSON output:"""

        options = (GenerationOptions()
                   .temperature(0.1)
                   .build())
        response = self.client.generate(
            model=self.model,
            prompt=prompt,
            options=options,
            format='json'
        )
        try:
            result = json.loads(response['response'])
        except json.JSONDecodeError:
            return {}
        return result if isinstance(result, dict) else {}

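This InformationExtractor shows how to leverage the model's understanding of language to extract structured information. The format='json' parameter steers the model toward valid JSON, making the results easier to parse programmatically. Ollama's documentation also advises asking for JSON in the prompt itself, as both prompts here do. Output can still be cut off before the JSON is complete, which is why both methods catch json.JSONDecodeError and fall back to an empty dictionary.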
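Even syntactically valid JSON may not have the shape you asked for: a model can return a bare string where a list was expected, or add keys nobody requested. A defensive post-processing step is sketched below. The function normalize_entities is a hypothetical helper rather than part of the extractor, and the sample output is invented for illustration.

```python
import json
from typing import Dict, List


def normalize_entities(raw_json: str, entity_types: List[str]) -> Dict[str, List[str]]:
    """Parse model output and keep only well-formed, requested entity lists."""
    try:
        data = json.loads(raw_json)
    except json.JSONDecodeError:
        return {}
    if not isinstance(data, dict):
        return {}
    cleaned: Dict[str, List[str]] = {}
    for entity_type in entity_types:
        values = data.get(entity_type)
        if isinstance(values, str):
            values = [values]  # tolerate a bare string instead of a list
        if not isinstance(values, list):
            continue
        names = [v.strip() for v in values if isinstance(v, str) and v.strip()]
        if names:
            cleaned[entity_type] = names
    return cleaned


# Invented model output: one empty name, one bare string, one unrequested key
raw = '{"person": ["Ada Lovelace", ""], "location": "London", "mood": ["happy"]}'
print(normalize_entities(raw, ["person", "organization", "location"]))
```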
HANDLING EDGE CASES AND ERROR SCENARIOS
Production applications must handle various edge cases and error scenarios gracefully. Let us examine some common issues and how to address them.
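Context window overflow occurs when the conversation history plus the new prompt exceeds the model's context length. Context lengths vary by model, commonly from 2,048 to 8,192 tokens and considerably more for some newer models, and the window Ollama actually uses is controlled by the num_ctx option. When a request exceeds this limit, Ollama typically does not return an error; it truncates the input to fit, so the earliest part of the conversation can disappear without any signal to your application. The ConversationManager's max_history parameter helps prevent this, but you might also want to implement dynamic trimming based on token count: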
class TokenAwareConversationManager(ConversationManager):
    """
    Conversation manager that tracks token usage.
    """

    def __init__(
        self,
        system_message: Optional[str] = None,
        max_tokens: int = 2048
    ):
        """
        Initialize with token limit instead of message count.

        Args:
            system_message: Optional system message
            max_tokens: Maximum tokens to keep in history
        """
        super().__init__(system_message=system_message)
        self.max_tokens = max_tokens

    def estimate_tokens(self, text: str) -> int:
        """
        Estimate token count for text.

        This is a rough estimate. For exact counts, you would need
        the model's tokenizer.

        Args:
            text: Text to estimate

        Returns:
            Estimated token count
        """
        # Roughly 1.3 tokens per whitespace-separated word; convert to
        # int so the result matches the declared return type.
        return int(len(text.split()) * 1.3)

    def _trim_history(self) -> None:
        """
        Trim history based on token count.
        """
        system_messages = [
            msg for msg in self.messages if msg['role'] == 'system'
        ]
        other_messages = [
            msg for msg in self.messages if msg['role'] != 'system'
        ]

        # System messages are always kept, so count them first
        total_tokens = sum(
            self.estimate_tokens(msg['content'])
            for msg in system_messages
        )

        # Walk backwards from the newest message, keeping as many
        # recent messages as fit within the remaining budget
        trimmed_messages = []
        for msg in reversed(other_messages):
            msg_tokens = self.estimate_tokens(msg['content'])
            if total_tokens + msg_tokens <= self.max_tokens:
                trimmed_messages.insert(0, msg)
                total_tokens += msg_tokens
            else:
                break

        self.messages = system_messages + trimmed_messages

This enhanced conversation manager tracks estimated token usage and trims history to stay within limits. While the token estimation is approximate, it provides a reasonable safeguard against context overflow.
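The fixed words-to-tokens ratio can be refined with feedback from the server. The sketch below assumes responses include the prompt_eval_count field, which reports how many prompt tokens were evaluated, and uses it to keep a running tokens-per-word ratio. TokenRatioCalibrator is a hypothetical helper, and the count covers the whole templated prompt and may be influenced by prompt caching, so treat the calibrated ratio as an approximation too.

```python
import math


class TokenRatioCalibrator:
    """Tracks an observed tokens-per-word ratio across requests."""

    def __init__(self, initial_ratio: float = 1.3):
        self.ratio = initial_ratio
        self._words_seen = 0
        self._tokens_seen = 0

    def observe(self, prompt_text: str, prompt_eval_count: int) -> None:
        """Record one request's prompt text and its reported token count."""
        words = len(prompt_text.split())
        if words == 0 or prompt_eval_count <= 0:
            return
        self._words_seen += words
        self._tokens_seen += prompt_eval_count
        self.ratio = self._tokens_seen / self._words_seen

    def estimate(self, text: str) -> int:
        """Estimate tokens for text using the current ratio, rounded up."""
        return math.ceil(len(text.split()) * self.ratio)


calibrator = TokenRatioCalibrator()
# Made-up observation: a 4-word prompt that the server counted as 6 tokens
calibrator.observe("summarize this short text", 6)
print(calibrator.ratio, calibrator.estimate("two words"))
```

Rounding up errs on the side of trimming slightly early, which is the safer direction when the goal is to avoid overflow.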
Network timeouts can occur when generating very long responses or when the system is under heavy load. The timeout parameter in our API calls provides a basic defense, but you might want more sophisticated timeout handling:
class TimeoutHandler:
    """
    Manages timeouts with progressive retry logic.
    """

    def __init__(
        self,
        initial_timeout: float = 60.0,
        max_timeout: float = 300.0,
        timeout_multiplier: float = 1.5
    ):
        """
        Initialize timeout handler.

        Args:
            initial_timeout: Starting timeout in seconds
            max_timeout: Maximum timeout to use
            timeout_multiplier: Factor to increase timeout on retry
        """
        if timeout_multiplier <= 1:
            # A multiplier of 1 or less would never reach max_timeout
            raise ValueError("timeout_multiplier must be greater than 1")
        self.initial_timeout = initial_timeout
        self.max_timeout = max_timeout
        self.timeout_multiplier = timeout_multiplier

    def execute_with_progressive_timeout(
        self,
        func,
        *args,
        **kwargs
    ):
        """
        Execute function with progressively increasing timeout.

        Args:
            func: Function to execute; it must accept a 'timeout'
                keyword argument
            *args: Positional arguments for function
            **kwargs: Keyword arguments for function

        Returns:
            Function result

        Raises:
            TimeoutError: If all attempts timeout
        """
        current_timeout = self.initial_timeout
        last_error = None
        while True:
            try:
                kwargs['timeout'] = current_timeout
                return func(*args, **kwargs)
            except requests.exceptions.Timeout as e:
                last_error = e
                if current_timeout >= self.max_timeout:
                    # The longest allowed timeout has been tried
                    break
                # Grow the timeout, but never beyond the maximum
                current_timeout = min(
                    current_timeout * self.timeout_multiplier,
                    self.max_timeout
                )
                print(
                    f"Request timed out. Retrying with "
                    f"{current_timeout:.1f}s timeout..."
                )
        raise TimeoutError(
            f"Request failed after trying timeouts up to "
            f"{self.max_timeout}s"
        ) from last_error

This timeout handler starts with a reasonable timeout and increases it on each retry. This approach works well when you do not know in advance how long a request might take.
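It helps to see the sequence of timeouts a given configuration produces before relying on it. The generator below is an illustrative sketch of a capped geometric schedule of this kind; timeout_schedule is a hypothetical name, and this variant always ends with one attempt at the cap itself.

```python
from typing import Iterator


def timeout_schedule(
    initial: float,
    maximum: float,
    multiplier: float
) -> Iterator[float]:
    """Yield increasing timeouts, finishing with one attempt at the cap."""
    if initial <= 0 or maximum < initial or multiplier <= 1:
        raise ValueError("need 0 < initial <= maximum and multiplier > 1")
    current = initial
    while current < maximum:
        yield current
        current *= multiplier
    yield maximum


print(list(timeout_schedule(60.0, 300.0, 1.5)))
# → [60.0, 90.0, 135.0, 202.5, 300.0]
```

With these defaults, a request that never succeeds keeps the caller waiting for the sum of all five timeouts, which is worth weighing against a single long timeout.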
OPTIMIZING FOR PRODUCTION DEPLOYMENT
When deploying applications that use the Ollama API in production, several optimizations can improve performance and reliability. Connection pooling reduces the overhead of establishing new HTTP connections for each request:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class OptimizedOllamaClient(OllamaClient):
    """
    Ollama client optimized for production use.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        pool_connections: int = 10,
        pool_maxsize: int = 10,
        max_retries: int = 3
    ):
        """
        Initialize optimized client.

        Args:
            base_url: Ollama API base URL
            pool_connections: Number of connection pools to cache
            pool_maxsize: Maximum connections per pool
            max_retries: Maximum retry attempts
        """
        super().__init__(base_url)
        self.session = requests.Session()

        # Retry transient server-side failures with exponential backoff
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "DELETE"]
        )

        # One adapter provides both pooling and retries
        adapter = HTTPAdapter(
            pool_connections=pool_connections,
            pool_maxsize=pool_maxsize,
            max_retries=retry_strategy
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def _make_request(
        self,
        method: str,
        url: str,
        **kwargs
    ) -> requests.Response:
        """
        Make HTTP request using the session.

        Args:
            method: HTTP method (GET, POST, DELETE)
            url: Request URL
            **kwargs: Additional arguments for request

        Returns:
            Response object
        """
        return self.session.request(method, url, **kwargs)

This optimized client uses a session with connection pooling and automatic retry logic. The retry strategy handles transient failures automatically, making your application more resilient.
Response caching can significantly improve performance for repeated queries:
import hashlib
import json


class CachingOllamaClient(OptimizedOllamaClient):
    """
    Ollama client with response caching.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        cache_size: int = 128
    ):
        """
        Initialize caching client.

        Args:
            base_url: Ollama API base URL
            cache_size: Maximum number of cached responses
        """
        super().__init__(base_url)
        self.cache = {}
        self.cache_size = cache_size

    def _cache_key(
        self,
        model: str,
        prompt: str,
        options: Dict
    ) -> str:
        """
        Generate cache key for request.

        Args:
            model: Model name
            prompt: Prompt text
            options: Generation options

        Returns:
            Cache key string
        """
        # sort_keys makes the key independent of option ordering
        key_data = f"{model}:{prompt}:{json.dumps(options, sort_keys=True)}"
        return hashlib.sha256(key_data.encode()).hexdigest()

    def generate_cached(
        self,
        model: str,
        prompt: str,
        options: Optional[Dict] = None
    ) -> Dict:
        """
        Generate with caching support.

        Args:
            model: Model name
            prompt: Prompt text
            options: Generation options

        Returns:
            Response dictionary
        """
        if options is None:
            options = {}
        cache_key = self._cache_key(model, prompt, options)
        if cache_key in self.cache:
            return self.cache[cache_key]

        response = self.generate(
            model=model,
            prompt=prompt,
            options=options
        )

        # Dictionaries preserve insertion order, so the first key is
        # the oldest entry
        if len(self.cache) >= self.cache_size:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        self.cache[cache_key] = response
        return response

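This caching client stores responses for identical requests, eliminating redundant API calls. The cache uses a simple FIFO eviction policy when it reaches capacity. Keep in mind that sampling at a nonzero temperature is non-deterministic, so a cached entry freezes one particular sample; caching fits best where repeated prompts should return the same answer.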
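When some prompts recur far more often than others, evicting by recency of use tends to keep the popular entries longer than evicting by insertion order. The sketch below swaps in a least-recently-used (LRU) policy built on collections.OrderedDict. It is an alternative to the FIFO cache described here, not the article's implementation, and LRUResponseCache is a hypothetical name.

```python
from collections import OrderedDict
from typing import Any, Hashable, Optional


class LRUResponseCache:
    """Small LRU cache: reading an entry marks it as recently used."""

    def __init__(self, max_size: int = 128):
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.max_size = max_size
        self._entries: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable) -> Optional[Any]:
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key: Hashable, value: Any) -> None:
        self._entries[key] = value
        self._entries.move_to_end(key)
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # drop least recently used

    def __len__(self) -> int:
        return len(self._entries)


cache = LRUResponseCache(max_size=2)
cache.put("a", {"response": "A"})
cache.put("b", {"response": "B"})
cache.get("a")                      # "a" becomes the most recently used
cache.put("c", {"response": "C"})   # evicts "b", not "a"
print(cache.get("b"), cache.get("a"))
```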
MONITORING AND LOGGING
Production applications need comprehensive monitoring and logging to diagnose issues and track performance. Here is a logging wrapper that tracks API usage:
import logging
from datetime import datetime


class MonitoredOllamaClient(OptimizedOllamaClient):
    """
    Ollama client with comprehensive monitoring and logging.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        log_file: Optional[str] = None
    ):
        """
        Initialize monitored client.

        Args:
            base_url: Ollama API base URL
            log_file: Optional file path for detailed logs
        """
        super().__init__(base_url)
        self.logger = logging.getLogger('OllamaClient')
        self.logger.setLevel(logging.INFO)

        # getLogger returns the same logger for every instance, so attach
        # handlers only once to avoid duplicated log lines.
        if not self.logger.handlers:
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)
            if log_file:
                file_handler = logging.FileHandler(log_file)
                file_handler.setFormatter(formatter)
                self.logger.addHandler(file_handler)

        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_tokens_generated': 0,
            'total_duration_ms': 0
        }

    def generate(
        self,
        model: str,
        prompt: str,
        **kwargs
    ) -> Dict:
        """
        Generate with monitoring.

        Args:
            model: Model name
            prompt: Prompt text
            **kwargs: Additional generation parameters

        Returns:
            Response dictionary
        """
        start_time = datetime.now()
        self.metrics['total_requests'] += 1
        self.logger.info(
            f"Starting generation with model {model}, "
            f"prompt length: {len(prompt)} chars"
        )
        try:
            response = super().generate(
                model=model,
                prompt=prompt,
                **kwargs
            )
            self.metrics['successful_requests'] += 1
            duration = (datetime.now() - start_time).total_seconds() * 1000
            self.metrics['total_duration_ms'] += duration

            # With stream=True the client returns a generator. Testing
            # membership on it would consume the stream, so only
            # dictionary responses are inspected for token counts.
            token_count = None
            if isinstance(response, dict):
                token_count = response.get('eval_count')
                if token_count is not None:
                    self.metrics['total_tokens_generated'] += token_count

            token_label = token_count if token_count is not None else 'unknown'
            self.logger.info(
                f"Generation completed in {duration:.2f}ms, "
                f"tokens: {token_label}"
            )
            return response
        except Exception as e:
            self.metrics['failed_requests'] += 1
            self.logger.error(f"Generation failed: {str(e)}")
            raise

    def get_metrics(self) -> Dict:
        """
        Get current metrics.

        Returns:
            Dictionary of metrics
        """
        metrics = self.metrics.copy()
        if metrics['successful_requests'] > 0:
            metrics['avg_duration_ms'] = (
                metrics['total_duration_ms'] /
                metrics['successful_requests']
            )
            metrics['avg_tokens_per_request'] = (
                metrics['total_tokens_generated'] /
                metrics['successful_requests']
            )
        else:
            metrics['avg_duration_ms'] = 0
            metrics['avg_tokens_per_request'] = 0
        return metrics

    def print_metrics(self) -> None:
        """Print current metrics to console."""
        metrics = self.get_metrics()
        print("\nOllama Client Metrics:")
        print("-" * 60)
        print(f"Total requests: {metrics['total_requests']}")
        print(f"Successful: {metrics['successful_requests']}")
        print(f"Failed: {metrics['failed_requests']}")
        print(f"Total tokens generated: {metrics['total_tokens_generated']}")
        print(
            f"Average duration: {metrics['avg_duration_ms']:.2f}ms"
        )
        print(
            f"Average tokens per request: "
            f"{metrics['avg_tokens_per_request']:.2f}"
        )
        print("-" * 60)

This monitored client tracks key metrics like request counts, token generation, and timing information. The metrics help you understand usage patterns and identify performance bottlenecks.
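Averages can hide the slow requests that users actually notice, for example the first request after a model has to be loaded. If you keep individual durations instead of only a running total, percentiles become available. The following sketch uses the nearest-rank method; the percentile function and the sample durations are illustrative and not part of the monitored client.

```python
import math
from typing import List


def percentile(values: List[float], pct: float) -> float:
    """Nearest-rank percentile: smallest value with pct percent at or below it."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in the range (0, 100]")
    ordered = sorted(values)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


# Made-up request durations in milliseconds, including one slow outlier
durations_ms = [820.0, 790.0, 805.0, 812.0, 798.0,
                801.0, 795.0, 809.0, 4300.0, 815.0]
mean_ms = sum(durations_ms) / len(durations_ms)
print(f"mean={mean_ms}  p50={percentile(durations_ms, 50)}  "
      f"p95={percentile(durations_ms, 95)}")
```

In this sample the mean sits well above the median because a single outlier drags it up, which is exactly the pattern a percentile view makes visible.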
CONCLUSION AND BEST PRACTICES
Working with the Ollama API provides powerful capabilities for building AI-powered applications. The key to success lies in understanding the API's design, implementing robust error handling, and optimizing for your specific use case.
Always validate user input before sending it to the API. This prevents unnecessary API calls and provides better error messages to users. Use appropriate timeouts based on your expected response times, and implement retry logic for transient failures. Monitor your application's API usage to understand patterns and identify optimization opportunities.
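As a minimal sketch of the validation step, the function below rejects empty or oversized prompts and strips control characters before anything is sent to the API. The name validate_prompt and the 8000-character default are assumptions chosen for illustration; pick a limit that matches your model's context window.

```python
def validate_prompt(text: str, max_chars: int = 8000) -> str:
    """Return a cleaned prompt, or raise if it should not be sent."""
    if not isinstance(text, str):
        raise TypeError("Prompt must be a string")
    # Drop control characters, but keep newlines and tabs
    cleaned = "".join(
        ch for ch in text if ch in "\n\t" or ch.isprintable()
    ).strip()
    if not cleaned:
        raise ValueError("Prompt is empty")
    if len(cleaned) > max_chars:
        raise ValueError(
            f"Prompt is {len(cleaned)} characters; the limit is {max_chars}"
        )
    return cleaned


print(validate_prompt("  hello\x00 world \n"))
```

Raising specific exceptions lets the calling code show a precise message to the user instead of a generic API failure.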
Choose the right model for your task. Smaller models provide faster responses and lower resource usage, while larger models offer better quality for complex tasks. Use streaming when building interactive applications to improve perceived responsiveness. Implement conversation history management to prevent context window overflow while maintaining sufficient context for coherent responses.
Cache responses when appropriate to reduce redundant API calls. This is particularly effective for applications with repeated queries or common prompts. Use connection pooling and session management to reduce HTTP overhead. Implement comprehensive logging and monitoring to diagnose issues quickly in production.
The Ollama API's simplicity and power make it an excellent choice for local AI applications. By following the patterns and practices demonstrated in this article, you can build robust, efficient, and maintainable applications that leverage the full capabilities of local language models.
COMPLETE RUNNING EXAMPLE
Here is the complete, production-ready code that integrates all concepts discussed in this article. This code is ready to run and demonstrates a full-featured chat application with all advanced features.
#!/usr/bin/env python3
"""
Complete Ollama API Client Application
This module provides a production-ready implementation of an Ollama API client with comprehensive features including streaming, chat, model management, error handling, caching, monitoring, and a full command-line interface.
Usage:
python ollama_client.py --url http://localhost:11434 --model llama2
"""
import requests
import json
import sys
import argparse
import logging
import time
import hashlib
from typing import List, Dict, Optional, Generator
from datetime import datetime
from functools import wraps
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class OllamaClient:
    """
    Production-ready client for interacting with the Ollama API.

    This client provides methods for both streaming and non-streaming
    text generation, chat completions, and model management. It includes
    comprehensive error handling and supports all Ollama API features.
    """

    def __init__(self, base_url: str = "http://localhost:11434"):
        """
        Initialize the Ollama client.

        Args:
            base_url: The base URL where Ollama is running
        """
        self.base_url = base_url.rstrip('/')
        self.api_version = "api"
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """
        Create HTTP session with connection pooling and retry logic.

        Returns:
            Configured requests Session object
        """
        session = requests.Session()
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "DELETE"]
        )
        adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=10,
            max_retries=retry_strategy
        )
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _build_url(self, endpoint: str) -> str:
        """
        Construct the full URL for an API endpoint.

        Args:
            endpoint: The API endpoint path

        Returns:
            Complete URL for the API request
        """
        return f"{self.base_url}/{self.api_version}/{endpoint}"

    def generate(
        self,
        model: str,
        prompt: str,
        stream: bool = False,
        options: Optional[Dict] = None,
        system: Optional[str] = None,
        template: Optional[str] = None,
        context: Optional[List[int]] = None,
        raw: bool = False,
        format: Optional[str] = None
    ) -> Dict:
        """
        Generate text using the specified model and prompt.

        Args:
            model: Name of the model to use
            prompt: The prompt text to generate from
            stream: Whether to stream the response
            options: Model-specific options
            system: System message to set context
            template: Custom prompt template
            context: Context from previous generation
            raw: Whether to use raw mode
            format: Response format (for example 'json')

        Returns:
            Dictionary containing the response and metadata, or a
            generator of chunk dictionaries when stream is True

        Raises:
            Various exceptions for different failure modes
        """
        url = self._build_url("generate")
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": stream
        }
        # Only send optional fields the caller actually supplied
        if options:
            payload["options"] = options
        if system:
            payload["system"] = system
        if template:
            payload["template"] = template
        if context:
            payload["context"] = context
        if raw:
            payload["raw"] = raw
        if format:
            payload["format"] = format

        try:
            response = self.session.post(
                url,
                json=payload,
                stream=stream,
                timeout=300
            )
            response.raise_for_status()
            if stream:
                return self._handle_streaming_response(response)
            else:
                return response.json()
        except requests.exceptions.Timeout:
            raise TimeoutError(
                f"Request to {url} timed out after 300 seconds"
            )
        except requests.exceptions.ConnectionError:
            raise ConnectionError(
                f"Could not connect to Ollama at {self.base_url}. "
                "Ensure Ollama is running."
            )
        except requests.exceptions.HTTPError as e:
            raise RuntimeError(
                f"HTTP error occurred: {e.response.status_code} - "
                f"{e.response.text}"
            )

    def _handle_streaming_response(
        self,
        response: requests.Response
    ) -> Generator[Dict, None, None]:
        """
        Process a streaming response from the Ollama API.

        Args:
            response: The streaming HTTP response object

        Yields:
            Dictionary objects representing each chunk

        Raises:
            ValueError: If a chunk cannot be parsed
        """
        # Each non-empty line of the stream is one JSON object
        for line in response.iter_lines():
            if line:
                try:
                    chunk = json.loads(line)
                    yield chunk
                except json.JSONDecodeError as e:
                    raise ValueError(
                        f"Failed to parse streaming response: {line}"
                    ) from e

    def chat(
        self,
        model: str,
        messages: List[Dict[str, str]],
        stream: bool = False,
        options: Optional[Dict] = None,
        format: Optional[str] = None
    ) -> Dict:
        """
        Conduct a chat conversation with the model.

        Args:
            model: Name of the model to use
            messages: List of message dictionaries
            stream: Whether to stream the response
            options: Model-specific generation options
            format: Response format

        Returns:
            Dictionary containing the response and metadata

        Raises:
            ValueError: If messages format is invalid
        """
        url = self._build_url("chat")

        # Validate messages before sending anything to the server
        for msg in messages:
            if 'role' not in msg or 'content' not in msg:
                raise ValueError(
                    "Each message must have 'role' and 'content' fields"
                )
            if msg['role'] not in ['system', 'user', 'assistant']:
                raise ValueError(
                    f"Invalid role: {msg['role']}"
                )

        payload = {
            "model": model,
            "messages": messages,
            "stream": stream
        }
        if options:
            payload["options"] = options
        if format:
            payload["format"] = format

        try:
            response = self.session.post(
                url,
                json=payload,
                stream=stream,
                timeout=300
            )
            response.raise_for_status()
            if stream:
                return self._handle_streaming_response(response)
            else:
                return response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Chat request failed: {str(e)}"
            ) from e

    def list_models(self) -> List[Dict]:
        """
        List all models available on the Ollama instance.

        Returns:
            List of dictionaries containing model information
        """
        url = self._build_url("tags")
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            result = response.json()
            return result.get('models', [])
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Failed to list models: {str(e)}"
            ) from e

    def show_model_info(self, model: str) -> Dict:
        """
        Get detailed information about a specific model.

        Args:
            model: Name of the model

        Returns:
            Dictionary containing model details
        """
        url = self._build_url("show")
        payload = {"name": model}
        try:
            response = self.session.post(url, json=payload, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Failed to get model info: {str(e)}"
            ) from e

    def pull_model(
        self,
        model: str,
        stream: bool = True
    ) -> Generator[Dict, None, None]:
        """
        Pull a model from the Ollama registry.

        Args:
            model: Name of the model to pull
            stream: Whether to stream download progress

        Yields:
            Progress updates as dictionaries
        """
        url = self._build_url("pull")
        payload = {"name": model, "stream": stream}
        try:
            # Downloads can take a long time, so no timeout is applied
            response = self.session.post(
                url,
                json=payload,
                stream=stream,
                timeout=None
            )
            response.raise_for_status()
            if stream:
                for chunk in self._handle_streaming_response(response):
                    yield chunk
            else:
                yield response.json()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Failed to pull model: {str(e)}"
            ) from e

    def delete_model(self, model: str) -> Dict:
        """
        Delete a model from the local system.

        Args:
            model: Name of the model to delete

        Returns:
            Dictionary containing the deletion result
        """
        url = self._build_url("delete")
        payload = {"name": model}
        try:
            response = self.session.delete(url, json=payload, timeout=30)
            response.raise_for_status()
            return {"status": "success"}
        except requests.exceptions.RequestException as e:
            raise RuntimeError(
                f"Failed to delete model: {str(e)}"
            ) from e


class GenerationOptions:
    """
    Builder class for constructing generation options dictionaries.
    """

    def __init__(self):
        """Initialize with default options."""
        self.options = {}

    def temperature(self, value: float) -> 'GenerationOptions':
        """
        Set the temperature for generation.

        Args:
            value: Temperature value (0.0 to 2.0)

        Returns:
            Self for method chaining
        """
        if not 0.0 <= value <= 2.0:
            raise ValueError("Temperature must be between 0.0 and 2.0")
        self.options['temperature'] = value
        return self

    def top_p(self, value: float) -> 'GenerationOptions':
        """
        Set nucleus sampling threshold.

        Args:
            value: Top-p value (0.0 to 1.0)

        Returns:
            Self for method chaining
        """
        if not 0.0 <= value <= 1.0:
            raise ValueError("Top-p must be between 0.0 and 1.0")
        self.options['top_p'] = value
        return self

    def top_k(self, value: int) -> 'GenerationOptions':
        """
        Set top-k sampling parameter.

        Args:
            value: Top-k value (positive integer)

        Returns:
            Self for method chaining
        """
        if value < 1:
            raise ValueError("Top-k must be a positive integer")
        self.options['top_k'] = value
        return self

    def repeat_penalty(self, value: float) -> 'GenerationOptions':
        """
        Set repetition penalty.

        Args:
            value: Penalty value (1.0 or higher)

        Returns:
            Self for method chaining
        """
        if value < 1.0:
            raise ValueError("Repeat penalty must be 1.0 or higher")
        self.options['repeat_penalty'] = value
        return self

    def num_predict(self, value: int) -> 'GenerationOptions':
        """
        Set maximum number of tokens to generate.

        Args:
            value: Maximum tokens (positive integer)

        Returns:
            Self for method chaining
        """
        if value < 1:
            raise ValueError("num_predict must be a positive integer")
        self.options['num_predict'] = value
        return self

    def stop_sequences(self, sequences: List[str]) -> 'GenerationOptions':
        """
        Set stop sequences that halt generation.

        Args:
            sequences: List of strings that stop generation

        Returns:
            Self for method chaining
        """
        self.options['stop'] = sequences
        return self

    def build(self) -> Dict:
        """
        Build and return the options dictionary.

        Returns:
            Dictionary of generation options
        """
        return self.options.copy()


class ConversationManager:
    """
    Manages conversation history for multi-turn chat.
    """

    def __init__(
        self,
        system_message: Optional[str] = None,
        max_history: Optional[int] = None
    ):
        """
        Initialize the conversation manager.

        Args:
            system_message: Optional system message
            max_history: Maximum number of messages to retain
        """
        self.messages: List[Dict[str, str]] = []
        self.max_history = max_history
        if system_message:
            self.add_system_message(system_message)

    def add_system_message(self, content: str) -> None:
        """
        Add a system message to set conversation context.

        Args:
            content: The system message content
        """
        self.messages.append({
            "role": "system",
            "content": content
        })

    def add_user_message(self, content: str) -> None:
        """
        Add a user message to the conversation.

        Args:
            content: The user's message content
        """
        self.messages.append({
            "role": "user",
            "content": content
        })
        self._trim_history()

    def add_assistant_message(self, content: str) -> None:
        """
        Add an assistant message to the conversation.

        Args:
            content: The assistant's message content
        """
        self.messages.append({
            "role": "assistant",
            "content": content
        })
        self._trim_history()

    def _trim_history(self) -> None:
        """
        Trim conversation history to max_history length.

        System messages are always preserved.
        """
        if self.max_history is None:
            return
        system_messages = [
            msg for msg in self.messages if msg['role'] == 'system'
        ]
        other_messages = [
            msg for msg in self.messages if msg['role'] != 'system'
        ]
        # Keep only the most recent non-system messages
        if len(other_messages) > self.max_history:
            other_messages = other_messages[-self.max_history:]
        self.messages = system_messages + other_messages

    def get_messages(self) -> List[Dict[str, str]]:
        """
        Get the current conversation history.

        Returns:
            List of message dictionaries
        """
        return self.messages.copy()

    def clear(self, keep_system: bool = True) -> None:
        """
        Clear the conversation history.

        Args:
            keep_system: Whether to preserve system messages
        """
        if keep_system:
            self.messages = [
                msg for msg in self.messages if msg['role'] == 'system'
            ]
        else:
            self.messages = []


class OllamaChatApplication:
    """
    Complete chat application using the Ollama API.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        default_model: str = "llama2"
    ):
        """
        Initialize the chat application.

        Args:
            base_url: URL where Ollama is running
            default_model: Default model to use for chat
        """
        self.client = OllamaClient(base_url)
        self.default_model = default_model
        self.conversation = ConversationManager(max_history=20)
        self.logger = self._setup_logger()

    def _setup_logger(self) -> logging.Logger:
        """
        Set up application logger.

        Returns:
            Configured logger instance
        """
        logger = logging.getLogger('OllamaChat')
        logger.setLevel(logging.INFO)
        # logging.getLogger returns the same logger object on every call, so
        # attach the console handler only once. Otherwise a second application
        # instance would print every log line twice.
        if not logger.handlers:
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)
        return logger
    def select_model(self) -> str:
        """
        Allow user to select from available models.

        Returns:
            Name of the selected model
        """
        try:
            models = self.client.list_models()
            if not models:
                print("No models found. Please pull a model first.")
                print("Example: ollama pull llama2")
                sys.exit(1)
            print("\nAvailable models:")
            for idx, model in enumerate(models, 1):
                name = model.get('name', 'Unknown')
                size = model.get('size', 0)
                size_gb = size / (1024 ** 3)
                print(f"{idx}. {name} ({size_gb:.2f} GB)")
            while True:
                try:
                    choice = input(
                        f"\nSelect model (1-{len(models)}) "
                        f"or press Enter for {self.default_model}: "
                    ).strip()
                    if not choice:
                        return self.default_model
                    idx = int(choice) - 1
                    if 0 <= idx < len(models):
                        return models[idx]['name']
                    else:
                        print("Invalid selection. Please try again.")
                except ValueError:
                    print("Please enter a number.")
        except Exception as e:
            self.logger.error(f"Error listing models: {e}")
            print(f"Using default model: {self.default_model}")
            return self.default_model

    def set_system_message(self) -> None:
        """
        Allow user to set a system message for the conversation.
        """
        print("\nEnter system message (or press Enter to skip):")
        system_msg = input("> ").strip()
        if system_msg:
            self.conversation.add_system_message(system_msg)
            print("System message set.")

    def configure_generation_options(self) -> Dict:
        """
        Allow user to configure generation parameters.

        Returns:
            Dictionary of generation options
        """
        print("\nConfigure generation options (press Enter for defaults):")
        options = GenerationOptions()
        temp_input = input("Temperature (0.0-2.0, default 0.8): ").strip()
        if temp_input:
            try:
                options.temperature(float(temp_input))
            except ValueError as e:
                print(f"Invalid temperature: {e}. Using default.")
        top_p_input = input("Top-p (0.0-1.0, default 0.9): ").strip()
        if top_p_input:
            try:
                options.top_p(float(top_p_input))
            except ValueError as e:
                print(f"Invalid top-p: {e}. Using default.")
        max_tokens_input = input("Max tokens (default unlimited): ").strip()
        if max_tokens_input:
            try:
                options.num_predict(int(max_tokens_input))
            except ValueError as e:
                print(f"Invalid max tokens: {e}. Using default.")
        return options.build()
    def chat_loop(
        self,
        model: str,
        options: Dict,
        stream: bool = True
    ) -> None:
        """
        Main chat interaction loop.

        Args:
            model: Name of the model to use
            options: Generation options
            stream: Whether to use streaming responses
        """
        print(f"\nChatting with {model}")
        print("Commands:")
        print(" exit/quit - End the conversation")
        print(" clear - Clear conversation history")
        print(" history - View conversation history")
        print("-" * 60)
        while True:
            try:
                user_input = input("\nYou: ").strip()
                if not user_input:
                    continue
                if user_input.lower() in ['exit', 'quit']:
                    print("Goodbye!")
                    break
                if user_input.lower() == 'clear':
                    self.conversation.clear(keep_system=True)
                    print("Conversation history cleared.")
                    continue
                if user_input.lower() == 'history':
                    self._display_history()
                    continue
                self.conversation.add_user_message(user_input)
                print("\nAssistant: ", end='', flush=True)
                if stream:
                    response_text = self._handle_streaming_chat(
                        model, options
                    )
                else:
                    response_text = self._handle_non_streaming_chat(
                        model, options
                    )
                self.conversation.add_assistant_message(response_text)
                print()
            except KeyboardInterrupt:
                print("\n\nInterrupted. Type 'exit' to quit.")
            except EOFError:
                # input() raises EOFError once stdin is closed (for example,
                # when piped input ends). Exit here; otherwise the generic
                # handler below would catch it on every pass and loop forever.
                print("\nGoodbye!")
                break
            except Exception as e:
                self.logger.error(f"Error in chat loop: {e}")
                print(f"\n\nError: {e}")
                print("Please try again.")
    def _handle_streaming_chat(
        self,
        model: str,
        options: Dict
    ) -> str:
        """
        Handle a streaming chat response.

        Args:
            model: Name of the model
            options: Generation options

        Returns:
            Complete response text
        """
        response_text = ""
        try:
            for chunk in self.client.chat(
                model=model,
                messages=self.conversation.get_messages(),
                stream=True,
                options=options
            ):
                if 'message' in chunk:
                    content = chunk['message'].get('content', '')
                    response_text += content
                    print(content, end='', flush=True)
        except Exception as e:
            self.logger.error(f"Streaming error: {e}")
            raise
        return response_text

    def _handle_non_streaming_chat(
        self,
        model: str,
        options: Dict
    ) -> str:
        """
        Handle a non-streaming chat response.

        Args:
            model: Name of the model
            options: Generation options

        Returns:
            Complete response text
        """
        try:
            response = self.client.chat(
                model=model,
                messages=self.conversation.get_messages(),
                stream=False,
                options=options
            )
            response_text = response['message']['content']
            print(response_text)
            return response_text
        except Exception as e:
            self.logger.error(f"Chat error: {e}")
            raise

    def _display_history(self) -> None:
        """Display the conversation history."""
        messages = self.conversation.get_messages()
        if not messages:
            print("No conversation history.")
            return
        print("\nConversation History:")
        print("-" * 60)
        for msg in messages:
            role = msg['role'].capitalize()
            content = msg['content']
            print(f"\n{role}: {content}")
        print("-" * 60)

    def run(self) -> None:
        """
        Run the chat application.
        """
        print("=" * 60)
        print("Ollama Chat Application")
        print("=" * 60)
        try:
            model = self.select_model()
            self.set_system_message()
            options = self.configure_generation_options()
            stream_input = input(
                "\nUse streaming responses? (Y/n): "
            ).strip().lower()
            stream = stream_input != 'n'
            self.chat_loop(model, options, stream)
        except Exception as e:
            self.logger.error(f"Fatal error: {e}")
            sys.exit(1)
def main():
    """
    Main entry point for the application.
    """
    parser = argparse.ArgumentParser(
        description="Chat with local language models via Ollama"
    )
    parser.add_argument(
        '--url',
        default='http://localhost:11434',
        help='Ollama API base URL'
    )
    parser.add_argument(
        '--model',
        default='llama2',
        help='Default model to use'
    )
    args = parser.parse_args()
    app = OllamaChatApplication(
        base_url=args.url,
        default_model=args.model
    )
    app.run()


if __name__ == '__main__':
    main()
This complete example brings together the concepts covered in this article in one runnable command-line application. The code includes error handling, logging, configurable generation options, and an interactive prompt with a few built-in commands. You can run it directly to start chatting with your local Ollama models, and you can reuse the individual classes as building blocks for your own applications.
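As one illustration of reusing these pieces outside the interactive loop, the following minimal sketch trims a message history and assembles the JSON body for a non-streaming chat request. It is a standalone approximation, not part of the application above: `trim_history` and `build_chat_payload` are hypothetical helper names introduced here, the trimming follows the same rule as `_trim_history` (system messages are always kept), and the payload fields mirror the arguments the article passes to the client's `chat` method. No request is sent.

```python
import json
from typing import Dict, List, Optional


def trim_history(
    messages: List[Dict[str, str]],
    max_history: Optional[int]
) -> List[Dict[str, str]]:
    """Keep every system message plus the last max_history other messages.

    Hypothetical helper that mirrors the trimming rule used by
    ConversationManager._trim_history in the listing above.
    """
    if max_history is None:
        return list(messages)
    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]
    if len(others) > max_history:
        # Slice from an explicit start index so max_history == 0 yields [].
        others = others[len(others) - max_history:]
    return system + others


def build_chat_payload(
    model: str,
    messages: List[Dict[str, str]],
    options: Optional[Dict] = None
) -> Dict:
    """Assemble a request body for a non-streaming chat call (assumed shape)."""
    payload: Dict = {"model": model, "messages": messages, "stream": False}
    if options:
        payload["options"] = dict(options)
    return payload


history = [
    {"role": "system", "content": "You are concise."},
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello!"},
    {"role": "user", "content": "What is Ollama?"},
]

trimmed = trim_history(history, max_history=2)
payload = build_chat_payload("llama2", trimmed, {"temperature": 0.2})
print(json.dumps(payload, indent=2))
```

The resulting dictionary could then be serialized and sent as a POST to the chat endpoint of the local server with whichever HTTP client your application already uses.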