foreach-ui logo
Languages
DSA

Quick Actions

Random Quiz (locked)
Progress (locked)
  • 1
  • 2
  • 3
  • 4
  • quiz
Python
  • Build robust API client libraries with proper error handling
  • Implement authentication and session management
  • Handle rate limiting and retries automatically
  • Create reusable API client patterns

Building API Clients and Error Handling

Introduction

Building robust API clients is both an art and a science. A good API client library should handle network failures gracefully, manage authentication seamlessly, respect rate limits, and provide clear error messages. It should be easy to use for simple cases but powerful enough for complex scenarios.

In this final lesson, we'll build production-ready API client libraries that can handle real-world challenges. You'll learn patterns for creating maintainable, reliable API clients that your applications can depend on.


Designing API Client Architecture

Base Client Class

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
import logging

class APIClient:
    """Base API client with common functionality.

    Wraps a ``requests.Session`` that is configured with transport-level
    retries and JSON default headers, and converts HTTP and transport
    failures into the ``APIError`` hierarchy defined in this module.
    """

    # Remaining-quota value below which a warning is logged.
    RATE_LIMIT_WARNING_THRESHOLD = 10

    def __init__(self, base_url, api_key=None, timeout=30, max_retries=3):
        """Create the session and install retry/adaptor/header defaults.

        Args:
            base_url: Root URL of the API; a trailing slash is stripped.
            api_key: Optional token sent as ``Authorization: Bearer <key>``.
            timeout: Default per-request timeout in seconds.
            max_retries: Total retries performed by the transport adapter.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout

        # Configure logging
        self.logger = logging.getLogger(self.__class__.__name__)

        # Create session with retry strategy
        self.session = requests.Session()

        retry_strategy = Retry(
            total=max_retries,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=1,
            # Hand the final response back instead of raising inside urllib3,
            # so _handle_response can map it to a specific exception.
            raise_on_status=False
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        # Set default headers
        self.session.headers.update({
            'User-Agent': f'{self.__class__.__name__}/1.0',
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })

        if api_key:
            self.session.headers.update({
                'Authorization': f'Bearer {api_key}'
            })

    def _build_url(self, endpoint):
        """Build full URL from endpoint.

        Absolute ``http://`` / ``https://`` URLs are returned unchanged;
        anything else is joined onto ``base_url`` with exactly one slash.
        """
        # Require a real scheme prefix: a relative endpoint that merely
        # starts with the letters "http" (e.g. "httpbin/get") must still be
        # resolved against base_url.
        if endpoint.lower().startswith(('http://', 'https://')):
            return endpoint
        return f"{self.base_url}/{endpoint.lstrip('/')}"

    def _warn_if_rate_limit_low(self, response):
        """Log a warning when the remaining-quota header drops below the threshold."""
        raw_remaining = response.headers.get('X-RateLimit-Remaining')
        if raw_remaining is None:
            return
        try:
            remaining = int(raw_remaining)
        except (TypeError, ValueError):
            # A malformed informational header must not fail the request.
            return
        if remaining < self.RATE_LIMIT_WARNING_THRESHOLD:
            self.logger.warning(f"Rate limit remaining: {remaining}")

    def _handle_response(self, response):
        """Handle API response and raise appropriate exceptions.

        Returns:
            The decoded JSON body, or ``None`` when the body is empty.

        Raises:
            APIError: (or a subclass) for HTTP error statuses and for bodies
                that are not valid JSON.
        """
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            return self._handle_http_error(response)

        self._warn_if_rate_limit_low(response)

        if not response.content:
            return None
        try:
            return response.json()
        except ValueError as e:
            raise APIError(f"Invalid JSON response: {e}") from e

    def _handle_http_error(self, response):
        """Handle specific HTTP errors by raising the matching exception."""
        try:
            error_data = response.json()
        except ValueError:
            error_data = None

        # Error bodies are not guaranteed to be JSON objects; fall back to the
        # raw text for lists, strings, or undecodable payloads.
        if isinstance(error_data, dict):
            error_message = error_data.get('message', error_data.get('error', 'Unknown error'))
        else:
            error_message = response.text or 'Unknown error'

        if response.status_code == 401:
            raise AuthenticationError("Invalid or missing authentication")
        elif response.status_code == 403:
            raise PermissionError("Insufficient permissions")
        elif response.status_code == 404:
            raise NotFoundError(f"Resource not found: {response.url}")
        elif response.status_code == 429:
            retry_after = response.headers.get('Retry-After', '60')
            raise RateLimitError(f"Rate limit exceeded. Retry after {retry_after} seconds")
        elif response.status_code >= 500:
            raise ServerError(f"Server error: {error_message}")
        else:
            raise APIError(f"HTTP {response.status_code}: {error_message}")

    def request(self, method, endpoint, **kwargs):
        """Make HTTP request with error handling.

        Args:
            method: HTTP verb passed to ``Session.request``.
            endpoint: Relative path or absolute URL.
            **kwargs: Forwarded to ``Session.request``; ``timeout`` defaults
                to the client's configured timeout.

        Returns:
            The decoded JSON body, or ``None`` for an empty body.

        Raises:
            TimeoutError, ConnectionError, APIError: this module's exception
                types (not the builtins of the same name).
        """
        url = self._build_url(endpoint)
        kwargs.setdefault('timeout', self.timeout)

        self.logger.debug(f"{method} {url}")

        try:
            response = self.session.request(method, url, **kwargs)
            return self._handle_response(response)
        except requests.exceptions.Timeout as e:
            raise TimeoutError("Request timed out") from e
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError("Network connection failed") from e
        except requests.exceptions.RequestException as e:
            raise APIError(f"Request failed: {e}") from e

# Custom exceptions
class APIError(Exception):
    """Root of the client's exception hierarchy; catch this to handle any API failure."""

class AuthenticationError(APIError):
    """Credentials were rejected or missing."""

# NOTE: this name is also a Python builtin; inside this module it refers to
# the APIError subclass below.
class PermissionError(APIError):
    """The caller is authenticated but not allowed to perform the action."""

class NotFoundError(APIError):
    """The requested resource does not exist."""

class RateLimitError(APIError):
    """The API refused the call because the rate limit was exceeded."""

class ServerError(APIError):
    """The API reported a server-side failure."""

# NOTE: this name is also a Python builtin; inside this module it refers to
# the APIError subclass below.
class TimeoutError(APIError):
    """The request did not complete within the timeout."""

# NOTE: this name is also a Python builtin; inside this module it refers to
# the APIError subclass below.
class ConnectionError(APIError):
    """The network connection could not be established or was lost."""

Rate Limiting with Token Bucket

import time
from collections import deque

class RateLimiter:
    """Token bucket rate limiter.

    Tokens accrue at ``requests_per_second`` up to a burst capacity of ten
    times that rate; each ``acquire`` call spends one token and sleeps when
    none is available.
    """

    def __init__(self, requests_per_second=1):
        """Start with ``requests_per_second`` tokens in the bucket.

        Raises:
            ValueError: if ``requests_per_second`` is not positive (it is
                used as a divisor when computing wait times).
        """
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be greater than 0")

        self.requests_per_second = requests_per_second
        self.tokens = requests_per_second
        # Monotonic clock: refill math must not be skewed by wall-clock
        # adjustments between calls.
        self.last_update = time.monotonic()
        self.max_tokens = requests_per_second * 10  # Burst capacity

    def acquire(self):
        """Acquire a token, blocking if necessary.

        Returns:
            Always ``True`` once a token has been spent.
        """
        now = time.monotonic()
        time_passed = now - self.last_update

        # Add tokens based on time passed, capped at the burst capacity
        self.tokens = min(self.max_tokens,
                          self.tokens + time_passed * self.requests_per_second)
        self.last_update = now

        if self.tokens >= 1:
            self.tokens -= 1
            return True

        # Sleep exactly long enough for the missing fraction of a token to
        # accrue, then spend it.
        wait_time = (1 - self.tokens) / self.requests_per_second
        time.sleep(wait_time)
        self.tokens = 0
        self.last_update = time.monotonic()
        return True

class RateLimitedClient(APIClient):
    """APIClient variant that passes every call through a RateLimiter first."""

    def __init__(self, *args, requests_per_second=1, **kwargs):
        """Build the base client, then attach a limiter for the given rate."""
        super().__init__(*args, **kwargs)
        limiter = RateLimiter(requests_per_second)
        self.rate_limiter = limiter

    def request(self, method, endpoint, **kwargs):
        """Spend one limiter token (waiting if needed), then send the request."""
        self.rate_limiter.acquire()
        payload = super().request(method, endpoint, **kwargs)
        return payload

Authentication Patterns

Multiple Authentication Methods

class AuthenticatedClient(APIClient):
    """APIClient that installs credentials according to an ``auth_config`` dict.

    The ``'type'`` key selects the scheme: ``'bearer'``, ``'basic'``,
    ``'api_key'`` or ``'oauth2'``. Any other value leaves the session as the
    base class configured it.
    """

    def __init__(self, base_url, auth_config=None, **kwargs):
        """Initialise the base client, store the config, and apply it."""
        super().__init__(base_url, **kwargs)
        self.auth_config = auth_config or {}
        self._setup_authentication()

    def _setup_authentication(self):
        """Apply the credentials described by ``self.auth_config`` to the session."""
        config = self.auth_config
        scheme = config.get('type')

        if scheme == 'bearer':
            token = config.get('token')
            if token:
                self.session.headers['Authorization'] = f'Bearer {token}'
            return

        if scheme == 'basic':
            username = config.get('username')
            password = config.get('password')
            # Both parts must be non-empty, otherwise nothing is installed.
            if username and password:
                self.session.auth = (username, password)
            return

        if scheme == 'api_key':
            # Header name is configurable; the value is mandatory.
            header_name = config.get('key_name', 'X-API-Key')
            header_value = config.get('key_value')
            if header_value:
                self.session.headers[header_name] = header_value
            return

        if scheme == 'oauth2':
            self._setup_oauth2()

    def _setup_oauth2(self):
        """Install an already-issued OAuth2 access token as a Bearer header."""
        # A real client would obtain tokens from an OAuth provider; this
        # simplified version expects one to be present in the config.
        access_token = self.auth_config.get('access_token')
        if not access_token:
            return
        self.session.headers['Authorization'] = f'Bearer {access_token}'

    def refresh_token(self):
        """Placeholder for OAuth2 token refresh; currently performs no work."""
        if not self.auth_config.get('refresh_token'):
            return
        # Provider-specific steps would go here:
        #   1. make the token refresh request
        #   2. update self.auth_config with the new tokens
        #   3. update the session headers
        return

# Usage examples: one client per supported credential scheme
clients = dict(
    bearer=AuthenticatedClient(
        'https://api.example.com',
        {'type': 'bearer', 'token': 'your_jwt_token'},
    ),
    basic=AuthenticatedClient(
        'https://api.example.com',
        {'type': 'basic', 'username': 'user', 'password': 'pass'},
    ),
    api_key=AuthenticatedClient(
        'https://api.example.com',
        {'type': 'api_key', 'key_value': 'your_api_key'},
    ),
)

Token Management

import json
import os
from datetime import datetime, timedelta

class TokenManager:
    """Manage API tokens with automatic refresh.

    Tokens are kept in memory as ``{service_name: {access_token,
    refresh_token, expires_at}}`` and mirrored to a JSON file on every change.
    """

    def __init__(self, token_file='tokens.json'):
        """Load any previously saved tokens from ``token_file``."""
        self.token_file = token_file
        # _refresh_token reports failures through this logger, so it must
        # exist before any token lookup happens.
        self.logger = logging.getLogger(self.__class__.__name__)
        self.tokens = self._load_tokens()

    def _load_tokens(self):
        """Load tokens from file; return an empty dict when the file is absent."""
        if os.path.exists(self.token_file):
            with open(self.token_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def _save_tokens(self):
        """Save tokens to file."""
        with open(self.token_file, 'w', encoding='utf-8') as f:
            json.dump(self.tokens, f, indent=2)

    def get_token(self, service_name):
        """Get valid token for service.

        Returns:
            The stored access token, a refreshed one if the stored token has
            expired, or ``None`` when the service is unknown or the refresh
            is not possible.
        """
        if service_name not in self.tokens:
            return None

        token_data = self.tokens[service_name]
        expires_at = datetime.fromisoformat(token_data['expires_at'])

        if datetime.now() >= expires_at:
            # Token expired, try to refresh
            return self._refresh_token(service_name)

        return token_data['access_token']

    def _refresh_token(self, service_name):
        """Refresh expired token.

        Returns:
            The new access token, or ``None`` when no refresh token is stored
            or the refresh fails (the failure is logged).
        """
        token_data = self.tokens[service_name]
        refresh_token = token_data.get('refresh_token')

        if not refresh_token:
            return None

        # This would make actual refresh request to the service
        # For demonstration, we'll simulate it
        try:
            # refresh_response = requests.post(f'https://auth.{service_name}.com/refresh',
            #                                json={'refresh_token': refresh_token})
            # new_tokens = refresh_response.json()

            # Simulate successful refresh
            new_tokens = {
                'access_token': 'new_access_token_' + str(datetime.now().timestamp()),
                'refresh_token': refresh_token,
                'expires_at': (datetime.now() + timedelta(hours=1)).isoformat()
            }

            self.tokens[service_name] = new_tokens
            self._save_tokens()

            return new_tokens['access_token']

        except Exception as e:
            # Deliberate best-effort: a failed refresh is reported as "no token".
            self.logger.error(f"Token refresh failed for {service_name}: {e}")
            return None

    def set_token(self, service_name, access_token, refresh_token=None, expires_in=3600):
        """Set token for service and persist it.

        Args:
            service_name: Key under which the token is stored.
            access_token: The token value.
            refresh_token: Optional token used by ``_refresh_token``.
            expires_in: Lifetime in seconds from now.
        """
        expires_at = datetime.now() + timedelta(seconds=expires_in)

        self.tokens[service_name] = {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'expires_at': expires_at.isoformat()
        }

        self._save_tokens()

class TokenAuthenticatedClient(AuthenticatedClient):
    """Bearer-authenticated client whose token comes from a TokenManager."""

    def __init__(self, base_url, service_name, token_manager=None, **kwargs):
        """Look up the service's token and configure bearer auth with it."""
        self.service_name = service_name
        self.token_manager = token_manager or TokenManager()

        # The parent constructor installs this token as the Authorization header.
        initial_token = self.token_manager.get_token(service_name)
        super().__init__(
            base_url,
            auth_config={'type': 'bearer', 'token': initial_token},
            **kwargs
        )

    def request(self, method, endpoint, **kwargs):
        """Send the request; on an auth failure retry once with a new token.

        The original AuthenticationError is re-raised when the token manager
        yields no token or the same token that was just rejected.
        """
        try:
            return super().request(method, endpoint, **kwargs)
        except AuthenticationError:
            candidate = self.token_manager.get_token(self.service_name)
            if not candidate or candidate == self.auth_config.get('token'):
                raise

            # Swap in the new token everywhere it is recorded, then retry.
            self.auth_config['token'] = candidate
            self.session.headers['Authorization'] = f'Bearer {candidate}'
            return super().request(method, endpoint, **kwargs)

Configuration Management

Environment-Based Configuration

import os
from pathlib import Path

class APIConfig:
    """Configuration manager for API clients.

    Values come from an optional JSON file, then from environment variables
    whose names start with ``env_prefix``. An environment variable
    ``<PREFIX>FOO_BAR`` is stored under the flat top-level key ``foo_bar``;
    it does not modify keys nested inside a service section of the file.
    """

    def __init__(self, config_file=None, env_prefix='API_'):
        """Locate and load the configuration.

        Args:
            config_file: Path to a JSON file, as ``str`` or ``Path``. When
                omitted, the standard locations are searched.
            env_prefix: Prefix identifying relevant environment variables.
        """
        # Normalise to Path so a plain string path works with .exists().
        self.config_file = Path(config_file) if config_file else self._find_config_file()
        self.env_prefix = env_prefix
        self.config = self._load_config()

    def _find_config_file(self):
        """Find configuration file in standard locations, or return ``None``."""
        search_paths = [
            Path.cwd() / 'api_config.json',
            Path.home() / '.api_config.json',
            Path.home() / '.config' / 'api_config.json'
        ]

        for path in search_paths:
            if path.exists():
                return path

        return None

    def _load_config(self):
        """Load configuration from file and environment."""
        config = {}

        # Load from file
        if self.config_file is not None and self.config_file.exists():
            with open(self.config_file, 'r', encoding='utf-8') as f:
                config = json.load(f)

        # Environment variables win over same-named top-level file keys
        prefix_length = len(self.env_prefix)
        for key, value in os.environ.items():
            if key.startswith(self.env_prefix):
                config[key[prefix_length:].lower()] = value

        return config

    def get(self, key, default=None):
        """Get configuration value."""
        return self.config.get(key, default)

    def get_service_config(self, service_name):
        """Get configuration for specific service (empty dict when absent)."""
        return self.config.get(service_name, {})

class ConfigurableClient(APIClient):
    """APIClient whose connection settings are read from an APIConfig."""

    def __init__(self, service_name, config=None, **kwargs):
        """Resolve the service's settings and initialise the base client.

        Raises:
            ValueError: when the service section has no ``base_url``.
        """
        self.service_name = service_name
        self.api_config = config or APIConfig()

        settings = self.api_config.get_service_config(service_name)
        base_url = settings.get('base_url')

        # A client without a target URL is unusable, so fail early.
        if not base_url:
            raise ValueError(f"No base_url configured for service {service_name}")

        super().__init__(
            base_url,
            api_key=settings.get('api_key'),
            timeout=settings.get('timeout', 30),
            **kwargs
        )

# Configuration file example: api_config.json
"""
{
  "github": {
    "base_url": "https://api.github.com",
    "timeout": 10
  },
  "twitter": {
    "base_url": "https://api.twitter.com/2",
    "api_key": "your_twitter_bearer_token",
    "timeout": 15
  }
}
"""

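# Environment variables
# NOTE: APIConfig stores these as flat top-level keys ("github_timeout",
# "twitter_api_key"); they do not replace the values nested inside the
# "github" / "twitter" sections returned by get_service_config().
# export API_GITHUB_TIMEOUT=20
# export API_TWITTER_API_KEY=env_override_token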

Comprehensive Error Handling

Error Recovery Strategies

class ResilientClient(APIClient):
    """API client with comprehensive error recovery."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.retry_delays = [1, 2, 4, 8, 16]  # Exponential backoff

    def request_with_recovery(self, method, endpoint, max_attempts=3, **kwargs):
        """Make request with error recovery.

        Timeouts, connection failures and server errors are retried with the
        delays in ``self.retry_delays``. A rate-limit error is retried once,
        and only when it occurs on the first attempt. Every other error
        (including authentication failures) propagates immediately.

        Args:
            method: HTTP verb.
            endpoint: Relative path or absolute URL.
            max_attempts: Maximum number of tries; must be at least 1.
            **kwargs: Forwarded to ``request``.

        Returns:
            Whatever ``request`` returns for the first successful attempt.

        Raises:
            ValueError: if ``max_attempts`` is less than 1.
            APIError: the last failure once no attempt remains.
        """
        if max_attempts < 1:
            # Without at least one attempt the loop would fall through and
            # return None as if the call had succeeded.
            raise ValueError("max_attempts must be at least 1")

        for attempt in range(max_attempts):
            can_retry = attempt < max_attempts - 1
            try:
                return self.request(method, endpoint, **kwargs)
            except (TimeoutError, ConnectionError, ServerError) as e:
                if not can_retry:
                    self.logger.error(f"All {max_attempts} attempts failed")
                    raise

                delay = self.retry_delays[min(attempt, len(self.retry_delays) - 1)]
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)
            except RateLimitError as e:
                # Only retry once for rate limits, and never wait when no
                # further attempt would follow the wait.
                if attempt != 0 or not can_retry:
                    raise

                retry_after = self._parse_retry_after(e)
                self.logger.warning(f"Rate limited. Waiting {retry_after}s...")
                time.sleep(retry_after)

    def _parse_retry_after(self, rate_limit_error):
        """Parse retry-after header from rate limit error."""
        # This would extract retry-after from the error or response
        # For simplicity, return a default
        return 60

    def batch_request(self, requests_list, max_concurrent=3):
        """Make multiple requests with concurrency control.

        Args:
            requests_list: Iterable of ``(method, endpoint, kwargs)`` tuples.
            max_concurrent: Size of the worker pool.

        Returns:
            ``(results, errors)`` dicts keyed by endpoint. Entries that share
            an endpoint overwrite each other.
        """
        import concurrent.futures

        results = {}
        errors = {}

        def make_request(req):
            method, endpoint, kwargs = req
            try:
                results[endpoint] = self.request(method, endpoint, **kwargs)
            except Exception as e:
                # Collected rather than raised so one failure does not abort
                # the rest of the batch.
                errors[endpoint] = e

        # The pool size already bounds concurrency; leaving the ``with``
        # block waits for every submitted call to finish.
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent) as executor:
            futures = [executor.submit(make_request, req) for req in requests_list]
            concurrent.futures.wait(futures)

        return results, errors

# Usage
client = ResilientClient('https://jsonplaceholder.typicode.com')

# One call, retried on transient failures
try:
    posts = client.request_with_recovery('GET', '/posts', max_attempts=3)
except APIError as e:
    print(f"Request failed after retries: {e}")
else:
    print(f"Retrieved {len(posts)} posts")

# Several calls issued through the worker pool
requests_list = [
    ('GET', '/posts/1', {}),
    ('GET', '/posts/2', {}),
    ('GET', '/users/1', {}),
]
results, errors = client.batch_request(requests_list, max_concurrent=2)

print(f"Successful requests: {len(results)}")
print(f"Failed requests: {len(errors)}")

Logging and Monitoring

Structured Logging

import logging
import json
from datetime import datetime

class LoggedClient(APIClient):
    """APIClient that records each request, response and failure in the log."""

    def __init__(self, *args, log_requests=True, log_responses=True, **kwargs):
        """Initialise the base client and a logger named after the base URL."""
        super().__init__(*args, **kwargs)
        self.log_requests = log_requests
        self.log_responses = log_responses

        # Replace the base-class logger with one scoped to this base URL
        logger_name = f'{self.__class__.__name__}.{self.base_url}'
        self.logger = logging.getLogger(logger_name)

        # JSON-shaped layout for structured log lines
        layout = json.dumps({
            'timestamp': '%(asctime)s',
            'level': '%(levelname)s',
            'service': '%(name)s',
            'message': '%(message)s',
            'request_id': '%(request_id)s'
        })
        formatter = logging.Formatter(layout)

        # The formatter is not attached here; a real application would wire it up:
        # handler = logging.StreamHandler()
        # handler.setFormatter(formatter)
        # self.logger.addHandler(handler)
        # self.logger.setLevel(logging.INFO)

    def request(self, method, endpoint, **kwargs):
        """Send the request, logging it, its outcome and its duration.

        Failures are logged and then re-raised unchanged.
        """
        request_id = f"{datetime.now().timestamp()}_{id(self)}"

        if self.log_requests:
            request_fields = {
                'request_id': request_id,
                'method': method,
                'endpoint': endpoint,
                'params': kwargs.get('params'),
                'data_size': len(json.dumps(kwargs.get('json', {})))
            }
            self.logger.info("API Request", extra=request_fields)

        started = time.time()

        try:
            result = super().request(method, endpoint, **kwargs)
            elapsed = time.time() - started

            if self.log_responses:
                response_fields = {
                    'request_id': request_id,
                    'status': 'success',
                    'duration': f"{elapsed:.3f}s",
                    'response_size': len(json.dumps(result)) if result else 0
                }
                self.logger.info("API Response", extra=response_fields)

            return result

        except Exception as e:
            elapsed = time.time() - started
            failure_fields = {
                'request_id': request_id,
                'status': 'error',
                'duration': f"{elapsed:.3f}s",
                'error_type': type(e).__name__,
                'error_message': str(e)
            }
            self.logger.error("API Error", extra=failure_fields)
            raise

# Usage with monitoring
client = LoggedClient('https://jsonplaceholder.typicode.com')

# Each call below emits structured log records
try:
    posts = client.request('GET', '/posts', params={'_limit': 5})
except APIError as e:
    print(f"Request failed: {e}")
else:
    print(f"Retrieved {len(posts)} posts")

Testing API Clients

Mocking for Tests

import unittest
from unittest.mock import Mock, patch

class TestAPIClient(unittest.TestCase):
    """Test cases for API client."""

    def setUp(self):
        self.client = APIClient('https://api.example.com')

    @patch('requests.Session.request')
    def test_successful_request(self, mock_request):
        """Test successful API request."""
        # Mock the response. A plain Mock does not implement ``in``, and the
        # client checks ``'X-RateLimit-Remaining' in response.headers``, so
        # headers must be a real mapping; content must be truthy for the
        # body to be decoded.
        mock_response = Mock()
        mock_response.raise_for_status.return_value = None
        mock_response.headers = {}
        mock_response.content = b'{"data": "test"}'
        mock_response.json.return_value = {'data': 'test'}
        mock_request.return_value = mock_response

        result = self.client.request('GET', '/test')

        self.assertEqual(result, {'data': 'test'})
        mock_request.assert_called_once_with(
            'GET', 'https://api.example.com/test', timeout=30
        )

    @patch('requests.Session.request')
    def test_http_error_handling(self, mock_request):
        """Test HTTP error handling."""
        from requests.exceptions import HTTPError

        mock_response = Mock()
        mock_response.raise_for_status.side_effect = HTTPError("404 Not Found")
        mock_response.status_code = 404
        mock_response.headers = {}
        mock_response.url = 'https://api.example.com/missing'
        mock_response.json.return_value = {'error': 'Not found'}
        mock_request.return_value = mock_response

        with self.assertRaises(NotFoundError):
            self.client.request('GET', '/missing')

    def test_url_building(self):
        """Test URL building."""
        self.assertEqual(
            self.client._build_url('/users'),
            'https://api.example.com/users'
        )

        self.assertEqual(
            self.client._build_url('https://other.com/api'),
            'https://other.com/api'
        )

if __name__ == '__main__':
    unittest.main()

Production Deployment Considerations

Health Checks

class HealthCheckClient(APIClient):
    """APIClient that can probe the API's ``/health`` endpoint."""

    def health_check(self):
        """Probe ``/health`` once and summarise the outcome as a dict.

        Returns:
            ``status``, ``response_time`` and ``status_code`` when a response
            was received; ``status`` and ``error`` when the probe raised.
        """
        try:
            # Goes through the session directly with a fixed 5 second timeout
            probe = self.session.get(f"{self.base_url}/health", timeout=5)
            state = 'healthy' if probe.status_code == 200 else 'unhealthy'
            return {
                'status': state,
                'response_time': probe.elapsed.total_seconds(),
                'status_code': probe.status_code
            }
        except Exception as e:
            # Any failure to probe is reported as an unhealthy result
            return {'status': 'unhealthy', 'error': str(e)}

    def wait_for_healthy(self, timeout=60, interval=5):
        """Poll ``health_check`` until it reports healthy or time runs out.

        Returns:
            ``True`` as soon as a probe is healthy, ``False`` once ``timeout``
            seconds have elapsed without one.
        """
        import time

        began = time.time()
        while time.time() - began < timeout:
            if self.health_check()['status'] == 'healthy':
                return True
            time.sleep(interval)
        else:
            return False

# Usage in deployment
client = HealthCheckClient('https://api.example.com')

if client.wait_for_healthy(timeout=120):
    print("API is healthy, proceeding with application startup")
else:
    print("API failed health check, aborting startup")
    # Raise SystemExit directly: the bare exit() helper is injected by the
    # site module for interactive use and is not guaranteed to exist.
    raise SystemExit(1)

Metrics and Monitoring

class MonitoredClient(APIClient):
    """API client with metrics collection.

    Counts total, successful and failed requests, records the duration of
    each successful request, and tallies failures by exception class name.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = self._empty_metrics()

    @staticmethod
    def _empty_metrics():
        """Return a fresh metrics dict with every counter zeroed."""
        return {
            'requests_total': 0,
            'requests_success': 0,
            'requests_error': 0,
            'response_times': [],
            'errors_by_type': {}
        }

    def request(self, method, endpoint, **kwargs):
        """Make request with metrics collection; failures are re-raised."""
        self.metrics['requests_total'] += 1

        start_time = time.time()
        try:
            result = super().request(method, endpoint, **kwargs)
            self.metrics['requests_success'] += 1

            response_time = time.time() - start_time
            self.metrics['response_times'].append(response_time)

            return result

        except Exception as e:
            self.metrics['requests_error'] += 1

            error_type = type(e).__name__
            self.metrics['errors_by_type'][error_type] = \
                self.metrics['errors_by_type'].get(error_type, 0) + 1

            raise

    def get_metrics(self):
        """Get current metrics.

        Returns:
            A snapshot dict. When at least one response time has been
            recorded it also contains ``avg_response_time``,
            ``max_response_time`` and ``min_response_time``.
        """
        metrics = dict(self.metrics)
        # Copy the nested containers too, so a caller mutating the snapshot
        # cannot alter the live counters.
        response_times = list(self.metrics['response_times'])
        metrics['response_times'] = response_times
        metrics['errors_by_type'] = dict(self.metrics['errors_by_type'])

        if response_times:
            metrics['avg_response_time'] = sum(response_times) / len(response_times)
            metrics['max_response_time'] = max(response_times)
            metrics['min_response_time'] = min(response_times)

        return metrics

    def reset_metrics(self):
        """Reset metrics counters."""
        self.metrics = self._empty_metrics()

# Usage: build a metrics-collecting client, send two requests, then print
# a summary of what was recorded.
client = MonitoredClient('https://jsonplaceholder.typicode.com')

# Make some requests (a request that raises stops the script here, since
# MonitoredClient.request re-raises after counting the failure)
client.request('GET', '/posts/1')
client.request('GET', '/users/1')

# Check metrics
metrics = client.get_metrics()
print("API Metrics:")
print(f"Total requests: {metrics['requests_total']}")
print(f"Success rate: {metrics['requests_success']/metrics['requests_total']:.1%}")
# The average is only present once at least one response time was recorded
if 'avg_response_time' in metrics:
    print(f"Average response time: {metrics['avg_response_time']:.3f}s")

Key Takeaways

  • Base client classes provide common functionality and error handling
  • Authentication patterns support multiple auth methods (Bearer, Basic, API keys, OAuth)
  • Rate limiting prevents API quota exhaustion with token buckets or header checking
  • Retry strategies handle transient failures with exponential backoff
  • Custom exceptions provide specific error types for different failure scenarios
  • Configuration management enables easy environment switching
  • Logging and monitoring track API usage and performance
  • Testing ensures reliability with mocked responses and error scenarios
  • Health checks verify API availability before application startup

Congratulations! You've now built comprehensive API client libraries that can handle production workloads. These patterns and techniques will serve you well when integrating with any web API, from simple REST services to complex enterprise systems. Remember to always handle errors gracefully, respect rate limits, and monitor your API usage for optimal performance and reliability.

© 2026 forEach. All rights reserved.

Privacy Policy•Terms of Service