- Apply priority queues to Dijkstra's shortest path with O((V+E) log V)
- Implement K-way merge in O(n log k) for external sorting
- Build Huffman coding with always merging two lowest-frequency nodes
Priority Queue Applications
Priority queues aren't just an academic concept. They solve real problems in systems, algorithms, and data processing.
CPU Task Scheduling
Operating systems schedule competing tasks; higher-priority tasks run first.
import heapq
from dataclasses import dataclass
from typing import List
@dataclass
class Task:
    """A unit of work with a scheduling priority (lower value = more urgent)."""

    priority: int  # Lower number = higher priority
    name: str

    def __lt__(self, other):
        # Heap ordering is by priority alone; names never break ties.
        return other.priority > self.priority


class CPUScheduler:
    """Min-heap scheduler: always runs the highest-priority ready task."""

    def __init__(self):
        # Heap of Task objects, ordered by Task.__lt__.
        self.ready_queue = []

    def add_task(self, priority, name):
        """Enqueue a task; O(log n)."""
        heapq.heappush(self.ready_queue, Task(priority, name))

    def run_next(self):
        """Pop and return the name of the most urgent task, or None if idle."""
        if self.ready_queue:
            return heapq.heappop(self.ready_queue).name
        return None
# Demo: urgent work runs before background chores, regardless of insert order.
scheduler = CPUScheduler()
for task_priority, task_name in [(2, "Background sync"), (1, "User input"), (3, "Log rotation")]:
    scheduler.add_task(task_priority, task_name)
print(scheduler.run_next())  # "User input"
print(scheduler.run_next())  # "Background sync"
User input (priority 1) runs before background tasks.
Dijkstra's Shortest Path
Find shortest path in weighted graph. Always explore node with smallest distance.
import heapq
def dijkstra(graph, start):
    """Single-source shortest paths over a weighted adjacency-list graph.

    graph maps node -> list of (neighbor, weight) pairs.
    Returns a dict mapping every node to its shortest distance from start
    (float('inf') for unreachable nodes).
    """
    distances = dict.fromkeys(graph, float('inf'))
    distances[start] = 0
    frontier = [(0, start)]  # (distance, node) min-heap
    settled = set()
    while frontier:
        dist, node = heapq.heappop(frontier)
        # Lazy deletion: a node may be pushed several times; only the first
        # (smallest-distance) pop settles it, the rest are stale.
        if node in settled:
            continue
        settled.add(node)
        for neighbor, weight in graph[node]:
            candidate = dist + weight
            if candidate < distances[neighbor]:
                distances[neighbor] = candidate
                heapq.heappush(frontier, (candidate, neighbor))
    return distances
# Demo graph: adjacency lists of (neighbor, edge_weight) pairs.
graph = {
    'A': [('B', 1), ('C', 4)],
    'B': [('C', 2), ('D', 5)],
    'C': [('D', 1)],
    'D': [],
}
print(dijkstra(graph, 'A'))
# {'A': 0, 'B': 1, 'C': 3, 'D': 4}
Priority queue ensures we always explore closest unvisited node. O((V + E) log V).
Event-Driven Simulation
Simulate events happening at different times. Process earliest event first.
import heapq
class Event:
    """A timestamped occurrence; ordered by time for min-heap scheduling."""

    def __init__(self, time, description):
        self.time = time
        self.description = description

    def __lt__(self, other):
        # Earlier events sort first.
        return other.time > self.time


class Simulator:
    """Discrete-event simulator: processes events in chronological order."""

    def __init__(self):
        self.events = []       # min-heap of Event, keyed on Event.time
        self.current_time = 0  # time of the most recently processed event

    def schedule(self, time, description):
        """Register an event to fire at the given simulated time."""
        heapq.heappush(self.events, Event(time, description))

    def run(self):
        """Drain the event queue, advancing the clock event by event."""
        while self.events:
            nxt = heapq.heappop(self.events)
            self.current_time = nxt.time
            print(f"Time {self.current_time}: {nxt.description}")
# Demo: events fire in time order, no matter when they were scheduled.
sim = Simulator()
for when, what in [(5, "Customer arrives"), (2, "Server starts"), (8, "Customer leaves")]:
    sim.schedule(when, what)
sim.run()
# Time 2: Server starts
# Time 5: Customer arrives
# Time 8: Customer leaves
Events processed in chronological order, regardless of insertion order.
K-Way Merge
Merge K sorted lists into one sorted list.
import heapq
def merge_k_sorted_lists(lists):
    """Merge K individually sorted lists into one sorted list.

    A heap of at most K entries tracks the next unconsumed element of each
    list, so every element is pushed and popped once: O(n log k) total.
    """
    # Seed the heap with each non-empty list's head: (value, list_idx, elem_idx).
    heap = [(seq[0], idx, 0) for idx, seq in enumerate(lists) if seq]
    heapq.heapify(heap)
    merged = []
    while heap:
        value, src, pos = heapq.heappop(heap)
        merged.append(value)
        # Replace the consumed element with its successor from the same list.
        pos += 1
        if pos < len(lists[src]):
            heapq.heappush(heap, (lists[src][pos], src, pos))
    return merged
# Demo: three sorted runs interleave into one fully sorted list.
lists = [
    [1, 4, 7],
    [2, 5, 8],
    [3, 6, 9],
]
print(merge_k_sorted_lists(lists))
# [1, 2, 3, 4, 5, 6, 7, 8, 9]
Priority queue of size K. Each element pushed/popped once. O(n log k) where n is total elements.
Used in external sorting (merge sorted chunks from disk).
Huffman Coding
Build optimal prefix-free encoding. Characters with higher frequency get shorter codes.
import heapq
from collections import Counter
class HuffmanNode:
    """Node in a Huffman tree: leaves carry a character, internal nodes don't."""

    def __init__(self, char, freq):
        self.char = char  # None for internal (merged) nodes
        self.freq = freq  # total frequency of all leaves below this node
        self.left = None
        self.right = None

    def __lt__(self, other):
        # Heap ordering: lowest frequency merges first.
        return self.freq < other.freq


def build_huffman_tree(text):
    """Build a Huffman tree for *text* and return its root node.

    Repeatedly merges the two lowest-frequency nodes, so rarer characters
    end up deeper in the tree (longer codes). O(n log n) for n distinct
    characters.

    Raises:
        ValueError: if text is empty (no symbols to encode).
    """
    if not text:
        # pq[0] below would raise an opaque IndexError; fail loudly instead.
        raise ValueError("cannot build a Huffman tree from empty text")
    freq = Counter(text)
    # Priority queue of nodes by frequency
    pq = [HuffmanNode(char, f) for char, f in freq.items()]
    heapq.heapify(pq)
    while len(pq) > 1:
        left = heapq.heappop(pq)
        right = heapq.heappop(pq)
        merged = HuffmanNode(None, left.freq + right.freq)
        merged.left = left
        merged.right = right
        heapq.heappush(pq, merged)
    return pq[0]


def get_codes(node, prefix="", codes=None):
    """Walk the tree, appending '0' for left edges and '1' for right edges.

    Returns a {char: bit_string} dict mapping each character to its code.
    """
    if codes is None:
        codes = {}
    if node.char is not None:  # Leaf
        # "or '0'" handles a one-character alphabet: the root is itself a
        # leaf, and an empty code would make the encoding undecodable.
        codes[node.char] = prefix or "0"
    else:
        get_codes(node.left, prefix + "0", codes)
        get_codes(node.right, prefix + "1", codes)
    return codes
# Demo: frequent characters ('l' appears three times) get the shortest codes.
text = "hello world"
codes = get_codes(build_huffman_tree(text))
print(codes)
# e.g. {'l': '00', 'o': '01', ' ': '100', 'h': '1010', ...}
Always merge the two lowest-frequency nodes. With a priority queue, building the tree takes O(n log n) for n distinct symbols.
Load Balancing
Distribute tasks to servers. Assign to server with least load.
import heapq
class Server:
    """A backend server tracked by its current load; heap-ordered by load."""

    def __init__(self, id):
        self.id = id
        self.load = 0  # sum of task loads assigned so far

    def __lt__(self, other):
        return other.load > self.load


class LoadBalancer:
    """Least-load balancer backed by a min-heap of servers."""

    def __init__(self, num_servers):
        self.servers = [Server(i) for i in range(num_servers)]
        heapq.heapify(self.servers)

    def assign_task(self, task_load):
        """Give the task to the least-loaded server, then reheapify it."""
        # Get server with minimum load
        target = heapq.heappop(self.servers)
        print(f"Assigning to Server {target.id} (load: {target.load})")
        target.load += task_load
        # Re-insert with updated load
        heapq.heappush(self.servers, target)
# Demo: each task lands on whichever server currently carries the least load.
lb = LoadBalancer(3)
for work in (5, 3, 7, 2):
    lb.assign_task(work)
Always pick server with minimum load. Priority queue maintains order.
Real-Time Data Stream Median
Track median as numbers stream in.
import heapq
class MedianFinder:
    """Streaming median via two balanced heaps.

    self.small holds the lower half as a max-heap (values stored negated);
    self.large holds the upper half as a min-heap. After every add,
    len(small) == len(large) or len(small) == len(large) + 1, and every
    element of small is <= every element of large.
    """

    def __init__(self):
        self.small = []  # Max heap (negate values)
        self.large = []  # Min heap

    def add_num(self, num):
        """Insert a number and restore both invariants; O(log n)."""
        heapq.heappush(self.small, -num)
        # Order invariant: max(small) must not exceed min(large).
        if self.small and self.large and -self.small[0] > self.large[0]:
            heapq.heappush(self.large, -heapq.heappop(self.small))
        # Size invariant: small may hold at most one extra element.
        if len(self.small) > len(self.large) + 1:
            heapq.heappush(self.large, -heapq.heappop(self.small))
        elif len(self.large) > len(self.small):
            heapq.heappush(self.small, -heapq.heappop(self.large))

    def find_median(self):
        """Return the median of everything added so far; O(1)."""
        if len(self.small) > len(self.large):
            # Odd count: the extra element in small is the median.
            return -self.small[0]
        # Even count: average the two middle elements (always a float).
        return (-self.small[0] + self.large[0]) / 2.0
# Demo: the running median updates after every insertion.
mf = MedianFinder()
for value in (5, 15, 1, 3):
    mf.add_num(value)
    print(f"Added {value}, median: {mf.find_median()}")
# Added 5, median: 5
# Added 15, median: 10.0
# Added 1, median: 5
# Added 3, median: 4.0
Two heaps balanced. Add in O(log n), find median in O(1).
Job Queue with Deadlines
Process jobs by deadline (earliest deadline first).
import heapq
from datetime import datetime, timedelta
class Job:
    """A unit of work ordered by deadline (earliest deadline first)."""

    def __init__(self, id, deadline, duration):
        self.id = id
        self.deadline = deadline  # datetime; the job is "missed" if started after this
        self.duration = duration  # processing time in minutes

    def __lt__(self, other):
        return other.deadline > self.deadline


class JobScheduler:
    """Earliest-deadline-first scheduler over a min-heap of jobs."""

    def __init__(self):
        self.jobs = []

    def add_job(self, id, deadline, duration):
        """Queue a job; the heap keeps the earliest deadline on top."""
        heapq.heappush(self.jobs, Job(id, deadline, duration))

    def process_jobs(self):
        """Run jobs in deadline order, flagging any started past its deadline.

        A missed job is skipped and consumes no processing time.
        """
        clock = datetime.now()
        while self.jobs:
            job = heapq.heappop(self.jobs)
            if clock > job.deadline:
                print(f"Job {job.id} missed deadline!")
            else:
                print(f"Processing Job {job.id}")
                clock += timedelta(minutes=job.duration)
# Demo: jobs inserted as 1, 2, 3 — but processed by deadline, not insertion order.
scheduler = JobScheduler()
now = datetime.now()
for job_id, hours, minutes in [(1, 2, 30), (2, 1, 20), (3, 3, 40)]:
    scheduler.add_job(job_id, now + timedelta(hours=hours), minutes)
scheduler.process_jobs()
# Processing Job 2 (earliest deadline)
# Processing Job 1
# Processing Job 3
Earliest deadline first prevents missing critical deadlines.
When to Use Priority Queues
Need frequent min/max access: Process highest/lowest priority repeatedly.
Event ordering: Simulation, scheduling by time.
Graph algorithms: Dijkstra, Prim, A*.
Data stream processing: Running statistics, top K.
Resource allocation: Load balancing, job scheduling.
When NOT to Use
All elements equally important: Regular queue (FIFO) is simpler.
Need all elements sorted: Just sort the array.
Frequent updates to priorities: The decrease-key operation exists but is awkward to implement on a binary heap. Consider alternatives such as lazy deletion (push a new entry and skip stale ones on pop, as Dijkstra does above).
The Power
Priority queues excel at:
- Scheduling by importance
- Always accessing min/max efficiently
- Graph algorithms requiring "next best" choice
- Streaming data with priorities
Understanding applications means recognizing when priority ordering is the key to efficiency.
Master priority queues, and you'll handle scheduling, simulation, and algorithm optimization problems.
