- Build robust API client libraries with proper error handling
- Implement authentication and session management
- Handle rate limiting and retries automatically
- Create reusable API client patterns
Building API Clients and Error Handling
Introduction
Building robust API clients is both an art and a science. A good API client library should handle network failures gracefully, manage authentication seamlessly, respect rate limits, and provide clear error messages. It should be easy to use for simple cases but powerful enough for complex scenarios.
In this final lesson, we'll build production-ready API client libraries that can handle real-world challenges. You'll learn patterns for creating maintainable, reliable API clients that your applications can depend on.
Designing API Client Architecture
Base Client Class
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
import logging
class APIClient:
"""Base API client with common functionality."""
def __init__(self, base_url, api_key=None, timeout=30, max_retries=3):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.timeout = timeout
# Configure logging
self.logger = logging.getLogger(self.__class__.__name__)
# Create session with retry strategy
self.session = requests.Session()
retry_strategy = Retry(
total=max_retries,
status_forcelist=[429, 500, 502, 503, 504],
backoff_factor=1,
raise_on_status=False
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# Set default headers
self.session.headers.update({
'User-Agent': f'{self.__class__.__name__}/1.0',
'Accept': 'application/json',
'Content-Type': 'application/json'
})
if api_key:
self.session.headers.update({
'Authorization': f'Bearer {api_key}'
})
def _build_url(self, endpoint):
"""Build full URL from endpoint."""
if endpoint.startswith('http'):
return endpoint
return f"{self.base_url}/{endpoint.lstrip('/')}"
def _handle_response(self, response):
"""Handle API response and raise appropriate exceptions."""
try:
response.raise_for_status()
# Check rate limit headers
if 'X-RateLimit-Remaining' in response.headers:
remaining = int(response.headers['X-RateLimit-Remaining'])
if remaining < 10: # Warning threshold
self.logger.warning(f"Rate limit remaining: {remaining}")
return response.json() if response.content else None
except requests.exceptions.HTTPError:
self._handle_http_error(response)
except ValueError as e:
raise APIError(f"Invalid JSON response: {e}") from e
def _handle_http_error(self, response):
"""Handle specific HTTP errors."""
try:
error_data = response.json()
error_message = error_data.get('message', error_data.get('error', 'Unknown error'))
except ValueError:
error_message = response.text or 'Unknown error'
if response.status_code == 401:
raise AuthenticationError("Invalid or missing authentication")
elif response.status_code == 403:
raise PermissionError("Insufficient permissions")
elif response.status_code == 404:
raise NotFoundError(f"Resource not found: {response.url}")
elif response.status_code == 429:
retry_after = response.headers.get('Retry-After', '60')
raise RateLimitError(f"Rate limit exceeded. Retry after {retry_after} seconds")
elif response.status_code >= 500:
raise ServerError(f"Server error: {error_message}")
else:
raise APIError(f"HTTP {response.status_code}: {error_message}")
def request(self, method, endpoint, **kwargs):
"""Make HTTP request with error handling."""
url = self._build_url(endpoint)
kwargs.setdefault('timeout', self.timeout)
self.logger.debug(f"{method} {url}")
try:
response = self.session.request(method, url, **kwargs)
return self._handle_response(response)
except requests.exceptions.Timeout:
raise TimeoutError("Request timed out")
except requests.exceptions.ConnectionError:
raise ConnectionError("Network connection failed")
except requests.exceptions.RequestException as e:
raise APIError(f"Request failed: {e}") from e
# Custom exceptions
class APIError(Exception):
"""Base API exception."""
pass
class AuthenticationError(APIError):
"""Authentication failed."""
pass
class PermissionError(APIError):
"""Insufficient permissions."""
pass
class NotFoundError(APIError):
"""Resource not found."""
pass
class RateLimitError(APIError):
"""Rate limit exceeded."""
pass
class ServerError(APIError):
"""Server-side error."""
pass
class TimeoutError(APIError):
"""Request timed out."""
pass
class ConnectionError(APIError):
"""Network connection error."""
pass
Rate Limiting with Token Bucket
import time
from collections import deque
class RateLimiter:
"""Token bucket rate limiter."""
def __init__(self, requests_per_second=1):
self.requests_per_second = requests_per_second
self.tokens = requests_per_second
self.last_update = time.time()
self.max_tokens = requests_per_second * 10 # Burst capacity
def acquire(self):
"""Acquire a token, blocking if necessary."""
now = time.time()
time_passed = now - self.last_update
# Add tokens based on time passed
self.tokens = min(self.max_tokens,
self.tokens + time_passed * self.requests_per_second)
self.last_update = now
if self.tokens >= 1:
self.tokens -= 1
return True
else:
# Calculate wait time
wait_time = (1 - self.tokens) / self.requests_per_second
time.sleep(wait_time)
self.tokens = 0
self.last_update = time.time()
return True
class RateLimitedClient(APIClient):
"""API client with rate limiting."""
def __init__(self, *args, requests_per_second=1, **kwargs):
super().__init__(*args, **kwargs)
self.rate_limiter = RateLimiter(requests_per_second)
def request(self, method, endpoint, **kwargs):
"""Make rate-limited request."""
self.rate_limiter.acquire()
return super().request(method, endpoint, **kwargs)
Authentication Patterns
Multiple Authentication Methods
class AuthenticatedClient(APIClient):
"""API client with multiple authentication methods."""
def __init__(self, base_url, auth_config=None, **kwargs):
super().__init__(base_url, **kwargs)
self.auth_config = auth_config or {}
self._setup_authentication()
def _setup_authentication(self):
"""Setup authentication based on configuration."""
auth_type = self.auth_config.get('type')
if auth_type == 'bearer':
token = self.auth_config.get('token')
if token:
self.session.headers['Authorization'] = f'Bearer {token}'
elif auth_type == 'basic':
username = self.auth_config.get('username')
password = self.auth_config.get('password')
if username and password:
self.session.auth = (username, password)
elif auth_type == 'api_key':
key_name = self.auth_config.get('key_name', 'X-API-Key')
key_value = self.auth_config.get('key_value')
if key_value:
self.session.headers[key_name] = key_value
elif auth_type == 'oauth2':
self._setup_oauth2()
def _setup_oauth2(self):
"""Setup OAuth2 authentication."""
# This would typically involve getting tokens from an OAuth provider
# For simplicity, we'll assume we already have tokens
access_token = self.auth_config.get('access_token')
if access_token:
self.session.headers['Authorization'] = f'Bearer {access_token}'
def refresh_token(self):
"""Refresh OAuth2 token if needed."""
# Implementation would depend on the OAuth provider
# This is a simplified example
refresh_token = self.auth_config.get('refresh_token')
if refresh_token:
# Make token refresh request
# Update self.auth_config with new tokens
# Update session headers
pass
# Usage examples
clients = {
'bearer': AuthenticatedClient('https://api.example.com', {
'type': 'bearer',
'token': 'your_jwt_token'
}),
'basic': AuthenticatedClient('https://api.example.com', {
'type': 'basic',
'username': 'user',
'password': 'pass'
}),
'api_key': AuthenticatedClient('https://api.example.com', {
'type': 'api_key',
'key_value': 'your_api_key'
})
}
Token Management
import json
import os
from datetime import datetime, timedelta
class TokenManager:
"""Manage API tokens with automatic refresh."""
def __init__(self, token_file='tokens.json'):
self.token_file = token_file
self.tokens = self._load_tokens()
def _load_tokens(self):
"""Load tokens from file."""
if os.path.exists(self.token_file):
with open(self.token_file, 'r') as f:
return json.load(f)
return {}
def _save_tokens(self):
"""Save tokens to file."""
with open(self.token_file, 'w') as f:
json.dump(self.tokens, f, indent=2)
def get_token(self, service_name):
"""Get valid token for service."""
if service_name not in self.tokens:
return None
token_data = self.tokens[service_name]
expires_at = datetime.fromisoformat(token_data['expires_at'])
if datetime.now() >= expires_at:
# Token expired, try to refresh
return self._refresh_token(service_name)
return token_data['access_token']
def _refresh_token(self, service_name):
"""Refresh expired token."""
token_data = self.tokens[service_name]
refresh_token = token_data.get('refresh_token')
if not refresh_token:
return None
# This would make actual refresh request to the service
# For demonstration, we'll simulate it
try:
# refresh_response = requests.post(f'https://auth.{service_name}.com/refresh',
# json={'refresh_token': refresh_token})
# new_tokens = refresh_response.json()
# Simulate successful refresh
new_tokens = {
'access_token': 'new_access_token_' + str(datetime.now().timestamp()),
'refresh_token': refresh_token,
'expires_at': (datetime.now() + timedelta(hours=1)).isoformat()
}
self.tokens[service_name] = new_tokens
self._save_tokens()
return new_tokens['access_token']
except Exception as e:
self.logger.error(f"Token refresh failed for {service_name}: {e}")
return None
def set_token(self, service_name, access_token, refresh_token=None, expires_in=3600):
"""Set token for service."""
expires_at = datetime.now() + timedelta(seconds=expires_in)
self.tokens[service_name] = {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_at': expires_at.isoformat()
}
self._save_tokens()
class TokenAuthenticatedClient(AuthenticatedClient):
"""API client with automatic token management."""
def __init__(self, base_url, service_name, token_manager=None, **kwargs):
self.service_name = service_name
self.token_manager = token_manager or TokenManager()
# Get initial token
token = self.token_manager.get_token(service_name)
auth_config = {
'type': 'bearer',
'token': token
}
super().__init__(base_url, auth_config=auth_config, **kwargs)
def request(self, method, endpoint, **kwargs):
"""Make request with automatic token refresh."""
try:
return super().request(method, endpoint, **kwargs)
except AuthenticationError:
# Try to refresh token
new_token = self.token_manager.get_token(self.service_name)
if new_token and new_token != self.auth_config.get('token'):
self.auth_config['token'] = new_token
self.session.headers['Authorization'] = f'Bearer {new_token}'
# Retry the request
return super().request(method, endpoint, **kwargs)
else:
raise
Configuration Management
Environment-Based Configuration
import os
from pathlib import Path
class APIConfig:
"""Configuration manager for API clients."""
def __init__(self, config_file=None, env_prefix='API_'):
self.config_file = config_file or self._find_config_file()
self.env_prefix = env_prefix
self.config = self._load_config()
def _find_config_file(self):
"""Find configuration file in standard locations."""
search_paths = [
Path.cwd() / 'api_config.json',
Path.home() / '.api_config.json',
Path.home() / '.config' / 'api_config.json'
]
for path in search_paths:
if path.exists():
return path
return None
def _load_config(self):
"""Load configuration from file and environment."""
config = {}
# Load from file
if self.config_file and self.config_file.exists():
with open(self.config_file, 'r') as f:
config = json.load(f)
# Override with environment variables
for key, value in os.environ.items():
if key.startswith(self.env_prefix):
config_key = key[len(self.env_prefix):].lower()
config[config_key] = value
return config
def get(self, key, default=None):
"""Get configuration value."""
return self.config.get(key, default)
def get_service_config(self, service_name):
"""Get configuration for specific service."""
return self.config.get(service_name, {})
class ConfigurableClient(APIClient):
"""API client that uses configuration management."""
def __init__(self, service_name, config=None, **kwargs):
self.service_name = service_name
self.api_config = config or APIConfig()
service_config = self.api_config.get_service_config(service_name)
base_url = service_config.get('base_url')
api_key = service_config.get('api_key')
timeout = service_config.get('timeout', 30)
if not base_url:
raise ValueError(f"No base_url configured for service {service_name}")
super().__init__(base_url, api_key=api_key, timeout=timeout, **kwargs)
# Configuration file example: api_config.json
"""
{
"github": {
"base_url": "https://api.github.com",
"timeout": 10
},
"twitter": {
"base_url": "https://api.twitter.com/2",
"api_key": "your_twitter_bearer_token",
"timeout": 15
}
}
"""
# Environment variables
# export API_GITHUB_TIMEOUT=20
# export API_TWITTER_API_KEY=env_override_token
Comprehensive Error Handling
Error Recovery Strategies
class ResilientClient(APIClient):
"""API client with comprehensive error recovery."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.retry_delays = [1, 2, 4, 8, 16] # Exponential backoff
def request_with_recovery(self, method, endpoint, max_attempts=3, **kwargs):
"""Make request with error recovery."""
last_exception = None
for attempt in range(max_attempts):
try:
return self.request(method, endpoint, **kwargs)
except (TimeoutError, ConnectionError, ServerError) as e:
last_exception = e
if attempt < max_attempts - 1:
delay = self.retry_delays[min(attempt, len(self.retry_delays) - 1)]
self.logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
time.sleep(delay)
else:
self.logger.error(f"All {max_attempts} attempts failed")
raise last_exception
except RateLimitError as e:
# Handle rate limiting specifically
retry_after = self._parse_retry_after(e)
self.logger.warning(f"Rate limited. Waiting {retry_after}s...")
time.sleep(retry_after)
# Retry once after waiting
if attempt == 0: # Only retry once for rate limits
continue
else:
raise e
except AuthenticationError as e:
# Don't retry auth errors
raise e
def _parse_retry_after(self, rate_limit_error):
"""Parse retry-after header from rate limit error."""
# This would extract retry-after from the error or response
# For simplicity, return a default
return 60
def batch_request(self, requests_list, max_concurrent=3):
"""Make multiple requests with concurrency control."""
import concurrent.futures
import threading
results = {}
errors = {}
semaphore = threading.Semaphore(max_concurrent)
def make_request(req):
with semaphore:
method, endpoint, kwargs = req
try:
result = self.request(method, endpoint, **kwargs)
results[endpoint] = result
except Exception as e:
errors[endpoint] = e
with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
futures = [executor.submit(make_request, req) for req in requests_list]
concurrent.futures.wait(futures)
return results, errors
# Usage
client = ResilientClient('https://jsonplaceholder.typicode.com')
# Single request with recovery
try:
posts = client.request_with_recovery('GET', '/posts', max_attempts=3)
print(f"Retrieved {len(posts)} posts")
except APIError as e:
print(f"Request failed after retries: {e}")
# Batch requests
requests_list = [
('GET', '/posts/1', {}),
('GET', '/posts/2', {}),
('GET', '/users/1', {}),
]
results, errors = client.batch_request(requests_list, max_concurrent=2)
print(f"Successful requests: {len(results)}")
print(f"Failed requests: {len(errors)}")
Logging and Monitoring
Structured Logging
import logging
import json
from datetime import datetime
class LoggedClient(APIClient):
"""API client with structured logging."""
def __init__(self, *args, log_requests=True, log_responses=True, **kwargs):
super().__init__(*args, **kwargs)
self.log_requests = log_requests
self.log_responses = log_responses
# Setup structured logging
self.logger = logging.getLogger(f'{self.__class__.__name__}.{self.base_url}')
# Add custom formatter for structured logs
formatter = logging.Formatter(
json.dumps({
'timestamp': '%(asctime)s',
'level': '%(levelname)s',
'service': '%(name)s',
'message': '%(message)s',
'request_id': '%(request_id)s'
})
)
# This would be configured in a real application
# handler = logging.StreamHandler()
# handler.setFormatter(formatter)
# self.logger.addHandler(handler)
# self.logger.setLevel(logging.INFO)
def request(self, method, endpoint, **kwargs):
"""Make request with logging."""
request_id = f"{datetime.now().timestamp()}_{id(self)}"
if self.log_requests:
self.logger.info("API Request", extra={
'request_id': request_id,
'method': method,
'endpoint': endpoint,
'params': kwargs.get('params'),
'data_size': len(json.dumps(kwargs.get('json', {})))
})
start_time = time.time()
try:
result = super().request(method, endpoint, **kwargs)
duration = time.time() - start_time
if self.log_responses:
self.logger.info("API Response", extra={
'request_id': request_id,
'status': 'success',
'duration': f"{duration:.3f}s",
'response_size': len(json.dumps(result)) if result else 0
})
return result
except Exception as e:
duration = time.time() - start_time
self.logger.error("API Error", extra={
'request_id': request_id,
'status': 'error',
'duration': f"{duration:.3f}s",
'error_type': type(e).__name__,
'error_message': str(e)
})
raise
# Usage with monitoring
client = LoggedClient('https://jsonplaceholder.typicode.com')
# This will generate structured logs
try:
posts = client.request('GET', '/posts', params={'_limit': 5})
print(f"Retrieved {len(posts)} posts")
except APIError as e:
print(f"Request failed: {e}")
Testing API Clients
Mocking for Tests
import unittest
from unittest.mock import Mock, patch
class TestAPIClient(unittest.TestCase):
"""Test cases for API client."""
def setUp(self):
self.client = APIClient('https://api.example.com')
@patch('requests.Session.request')
def test_successful_request(self, mock_request):
"""Test successful API request."""
# Mock the response
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {'data': 'test'}
mock_request.return_value = mock_response
result = self.client.request('GET', '/test')
self.assertEqual(result, {'data': 'test'})
mock_request.assert_called_once()
@patch('requests.Session.request')
def test_http_error_handling(self, mock_request):
"""Test HTTP error handling."""
from requests.exceptions import HTTPError
mock_response = Mock()
mock_response.raise_for_status.side_effect = HTTPError("404 Not Found")
mock_response.status_code = 404
mock_response.json.return_value = {'error': 'Not found'}
mock_request.return_value = mock_response
with self.assertRaises(NotFoundError):
self.client.request('GET', '/missing')
def test_url_building(self):
"""Test URL building."""
self.assertEqual(
self.client._build_url('/users'),
'https://api.example.com/users'
)
self.assertEqual(
self.client._build_url('https://other.com/api'),
'https://other.com/api'
)
if __name__ == '__main__':
unittest.main()
Production Deployment Considerations
Health Checks
class HealthCheckClient(APIClient):
"""API client with health check capabilities."""
def health_check(self):
"""Perform health check on the API."""
try:
# Try a simple endpoint
response = self.session.get(f"{self.base_url}/health", timeout=5)
return {
'status': 'healthy' if response.status_code == 200 else 'unhealthy',
'response_time': response.elapsed.total_seconds(),
'status_code': response.status_code
}
except Exception as e:
return {
'status': 'unhealthy',
'error': str(e)
}
def wait_for_healthy(self, timeout=60, interval=5):
"""Wait for API to become healthy."""
import time
start_time = time.time()
while time.time() - start_time < timeout:
health = self.health_check()
if health['status'] == 'healthy':
return True
time.sleep(interval)
return False
# Usage in deployment
client = HealthCheckClient('https://api.example.com')
if client.wait_for_healthy(timeout=120):
print("API is healthy, proceeding with application startup")
else:
print("API failed health check, aborting startup")
exit(1)
Metrics and Monitoring
class MonitoredClient(APIClient):
"""API client with metrics collection."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metrics = {
'requests_total': 0,
'requests_success': 0,
'requests_error': 0,
'response_times': [],
'errors_by_type': {}
}
def request(self, method, endpoint, **kwargs):
"""Make request with metrics collection."""
self.metrics['requests_total'] += 1
start_time = time.time()
try:
result = super().request(method, endpoint, **kwargs)
self.metrics['requests_success'] += 1
response_time = time.time() - start_time
self.metrics['response_times'].append(response_time)
return result
except Exception as e:
self.metrics['requests_error'] += 1
error_type = type(e).__name__
self.metrics['errors_by_type'][error_type] = \
self.metrics['errors_by_type'].get(error_type, 0) + 1
raise
def get_metrics(self):
"""Get current metrics."""
metrics = self.metrics.copy()
if metrics['response_times']:
metrics['avg_response_time'] = sum(metrics['response_times']) / len(metrics['response_times'])
metrics['max_response_time'] = max(metrics['response_times'])
metrics['min_response_time'] = min(metrics['response_times'])
return metrics
def reset_metrics(self):
"""Reset metrics counters."""
self.metrics = {
'requests_total': 0,
'requests_success': 0,
'requests_error': 0,
'response_times': [],
'errors_by_type': {}
}
# Usage
client = MonitoredClient('https://jsonplaceholder.typicode.com')
# Make some requests
client.request('GET', '/posts/1')
client.request('GET', '/users/1')
# Check metrics
metrics = client.get_metrics()
print("API Metrics:")
print(f"Total requests: {metrics['requests_total']}")
print(f"Success rate: {metrics['requests_success']/metrics['requests_total']:.1%}")
if 'avg_response_time' in metrics:
print(f"Average response time: {metrics['avg_response_time']:.3f}s")
Key Takeaways
- Base client classes provide common functionality and error handling
- Authentication patterns support multiple auth methods (Bearer, Basic, API keys, OAuth)
- Rate limiting prevents API quota exhaustion with token buckets or header checking
- Retry strategies handle transient failures with exponential backoff
- Custom exceptions provide specific error types for different failure scenarios
- Configuration management enables easy environment switching
- Logging and monitoring track API usage and performance
- Testing ensures reliability with mocked responses and error scenarios
- Health checks verify API availability before application startup
Congratulations! You've now built comprehensive API client libraries that can handle production workloads. These patterns and techniques will serve you well when integrating with any web API, from simple REST services to complex enterprise systems. Remember to always handle errors gracefully, respect rate limits, and monitor your API usage for optimal performance and reliability.
