Today we will learn about concurrent programming in Python, covering multithreading, multiprocessing, and asynchronous programming. These techniques help us improve program throughput, especially when dealing with I/O-bound and CPU-bound workloads.
Part 1: Concurrency Fundamentals
1.1 Concurrency vs. Parallelism
- Concurrency: multiple tasks take turns executing, so they appear to run at the same time
- Parallelism: multiple tasks genuinely run at the same time, which requires multiple CPU cores
1.2 Three Ways to Write Concurrent Python
- Multithreading: suited to I/O-bound tasks
- Multiprocessing: suited to CPU-bound tasks
- Asynchronous programming: suited to highly concurrent I/O (a minimal sketch of all three entry points follows this list)
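Before diving into each model in detail, here is a minimal, self-contained sketch of the entry point for each of the three approaches. It is illustrative only and not part of the demos below; the helper functions io_task, cpu_task, and async_task are invented for this example.

import asyncio
import multiprocessing
import threading
import time

def io_task(name):
    time.sleep(0.5)  # stands in for blocking I/O, e.g. a network call
    print(f"io_task {name} done")

def cpu_task(n):
    # stands in for CPU-bound work
    return sum(i * i for i in range(n))

async def async_task(name):
    await asyncio.sleep(0.5)  # non-blocking wait inside the event loop
    print(f"async_task {name} done")

async def async_main():
    # run two coroutines concurrently on a single thread
    await asyncio.gather(async_task("a1"), async_task("a2"))

if __name__ == "__main__":
    # 1) Multithreading: good for I/O-bound work
    t = threading.Thread(target=io_task, args=("t1",))
    t.start()
    t.join()

    # 2) Multiprocessing: good for CPU-bound work (note the __main__ guard)
    p = multiprocessing.Process(target=cpu_task, args=(100_000,))
    p.start()
    p.join()

    # 3) asyncio: good for large numbers of concurrent I/O waits
    asyncio.run(async_main())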
Part 2: Multithreading
import threading
import time
import os
from queue import Queue

import requests

class ThreadingDemo:
    def __init__(self):
        self.results = []
        self.lock = threading.Lock()

    def simple_thread_demo(self):
        """A minimal multithreading demo."""
        print("=== Simple threading demo ===")

        def worker(thread_id, delay):
            print(f"Thread {thread_id} started, sleeping for {delay} seconds")
            time.sleep(delay)
            print(f"Thread {thread_id} finished")

        # Create and start the threads
        threads = []
        for i in range(5):
            t = threading.Thread(target=worker, args=(i, i + 1))
            threads.append(t)
            t.start()

        # Wait for every thread to finish
        for t in threads:
            t.join()
        print("All threads finished!")

    def thread_with_lock(self):
        """Thread-safe updates to shared state using a lock."""
        print("\n=== Lock demo ===")
        shared_counter = 0

        def increment_counter(thread_id, iterations):
            nonlocal shared_counter
            for _ in range(iterations):
                # The lock keeps the read-modify-write sequence atomic
                with self.lock:
                    current = shared_counter
                    time.sleep(0.001)  # simulate some processing time
                    shared_counter = current + 1
                    print(f"Thread {thread_id} incremented the counter to {shared_counter}")

        threads = []
        for i in range(3):
            t = threading.Thread(target=increment_counter, args=(i, 5))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        print(f"Final counter value: {shared_counter} (should be 15)")
    def download_with_threads(self, urls):
        """Download files using multiple threads."""
        print(f"\n=== Threaded download demo - {len(urls)} files ===")

        def download_file(url, filename):
            try:
                start_time = time.time()
                response = requests.get(url, stream=True, timeout=30)
                if response.status_code == 200:
                    with open(filename, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                    download_time = time.time() - start_time
                    file_size = os.path.getsize(filename)
                    with self.lock:
                        self.results.append({
                            'url': url,
                            'filename': filename,
                            'size': file_size,
                            'time': download_time
                        })
                    print(f"Downloaded: {filename} ({file_size} bytes, {download_time:.2f}s)")
                else:
                    print(f"Download failed: {url}, status code: {response.status_code}")
            except Exception as e:
                print(f"Download error for {url}: {e}")

        # Create one download thread per URL
        threads = []
        start_time = time.time()
        for i, url in enumerate(urls):
            filename = f"download_{i+1}.jpg"
            t = threading.Thread(target=download_file, args=(url, filename))
            threads.append(t)
            t.start()

        # Wait for all downloads to finish
        for t in threads:
            t.join()
        total_time = time.time() - start_time
        print(f"\nAll downloads finished! Total time: {total_time:.2f}s")

        # Summary statistics
        total_size = sum(r['size'] for r in self.results)
        print(f"Total bytes downloaded: {total_size}")
        print(f"Average download speed: {total_size / total_time / 1024:.2f} KB/s")
# Run the threading demos
def run_threading_demo():
    demo = ThreadingDemo()

    # Basic thread demo
    demo.simple_thread_demo()

    # Lock demo
    demo.thread_with_lock()

    # Threaded download demo
    urls = [
        "https://httpbin.org/image/jpeg",
        "https://httpbin.org/image/png",
        "https://httpbin.org/image/svg",
    ] * 2  # repeat the URLs to show the effect of concurrency
    demo.download_with_threads(urls)

if __name__ == "__main__":
    run_threading_demo()
Part 3: Thread Pools and Queues
from concurrent.futures import ThreadPoolExecutor, as_completed
import random

class ThreadPoolDemo:
    def __init__(self):
        self.task_queue = Queue()
        self.results = []

    def producer_consumer_demo(self):
        """Producer-consumer pattern demo."""
        print("\n=== Producer-consumer pattern ===")

        def producer(queue, num_tasks):
            """Producer: generate tasks."""
            for i in range(num_tasks):
                task = f"task-{i+1}"
                queue.put(task)
                print(f"Producer added: {task}")
                time.sleep(0.1)  # simulate production time
            # Put one sentinel per consumer so each of them can stop
            for _ in range(3):  # 3 consumers
                queue.put(None)

        def consumer(queue, consumer_id):
            """Consumer: process tasks."""
            while True:
                task = queue.get()
                if task is None:
                    print(f"Consumer {consumer_id} finished")
                    queue.task_done()
                    break
                print(f"Consumer {consumer_id} processing: {task}")
                # Simulate processing time
                time.sleep(random.uniform(0.5, 1.5))
                queue.task_done()

        # Queue and thread setup
        num_tasks = 10
        num_consumers = 3

        # Start the producer
        producer_thread = threading.Thread(target=producer, args=(self.task_queue, num_tasks))
        producer_thread.start()

        # Start the consumers
        consumer_threads = []
        for i in range(num_consumers):
            t = threading.Thread(target=consumer, args=(self.task_queue, i + 1))
            t.start()
            consumer_threads.append(t)

        # Wait until every queued item has been processed
        self.task_queue.join()
        producer_thread.join()
        for t in consumer_threads:
            t.join()
        print("All tasks processed!")
    def thread_pool_executor_demo(self):
        """Using ThreadPoolExecutor."""
        print("\n=== ThreadPoolExecutor demo ===")

        def process_task(task_id, processing_time):
            """Simulate processing a task."""
            print(f"Task {task_id} started, needs {processing_time:.2f} seconds")
            time.sleep(processing_time)
            return f"task {task_id} finished in {processing_time:.2f} seconds"

        # Use a thread pool
        with ThreadPoolExecutor(max_workers=3) as executor:
            # Submit the tasks
            futures = []
            for i in range(10):
                processing_time = random.uniform(0.5, 2.0)
                future = executor.submit(process_task, i + 1, processing_time)
                futures.append(future)

            # Collect the results as they complete
            for future in as_completed(futures):
                try:
                    result = future.result()
                    print(f"Done: {result}")
                except Exception as e:
                    print(f"Task raised an exception: {e}")
        print("All thread pool tasks finished!")
    def url_status_checker(self, urls, max_workers=5):
        """Check URL statuses with a thread pool."""
        print(f"\n=== URL status check (workers: {max_workers}) ===")

        def check_url_status(url):
            """Check a single URL."""
            try:
                start_time = time.time()
                response = requests.get(url, timeout=10)
                response_time = time.time() - start_time
                return {
                    'url': url,
                    'status_code': response.status_code,
                    'response_time': response_time,
                    'size': len(response.content),
                    'error': None
                }
            except Exception as e:
                return {
                    'url': url,
                    'status_code': None,
                    'response_time': None,
                    'size': 0,
                    'error': str(e)
                }

        start_time = time.time()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit every URL as a task
            future_to_url = {executor.submit(check_url_status, url): url for url in urls}

            # Collect the results
            results = []
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                    if result['error']:
                        print(f"❌ {url} - error: {result['error']}")
                    else:
                        print(f"✅ {url} - status: {result['status_code']}, "
                              f"response time: {result['response_time']:.2f}s")
                except Exception as e:
                    print(f"❌ {url} - exception: {e}")

        total_time = time.time() - start_time

        # Summary statistics
        successful = [r for r in results if r['error'] is None]
        failed = [r for r in results if r['error'] is not None]
        print("\nCheck finished!")
        print(f"Total URLs: {len(urls)}")
        print(f"Successful: {len(successful)}")
        print(f"Failed: {len(failed)}")
        print(f"Total time: {total_time:.2f}s")
        print(f"Average per URL: {total_time/len(urls):.2f}s")
        return results
# Run the thread pool demos
def run_thread_pool_demo():
    demo = ThreadPoolDemo()

    # Producer-consumer demo
    demo.producer_consumer_demo()

    # ThreadPoolExecutor demo
    demo.thread_pool_executor_demo()

    # URL status check
    test_urls = [
        "https://www.baidu.com",
        "https://www.google.com",
        "https://www.github.com",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/status/500",
        "https://www.python.org",
        "https://www.qq.com",
        "https://invalid-website-12345.com",  # this one is expected to fail
    ] * 2  # repeat the URLs to exercise concurrency
    demo.url_status_checker(test_urls, max_workers=5)

if __name__ == "__main__":
    run_thread_pool_demo()
Part 4: Multiprocessing
import multiprocessing
import math
import os

def worker_process(task_id, data):
    """Worker function for the process pool demo below.
    It lives at module level because a function handed to Pool.starmap
    must be picklable; a nested function is not."""
    pid = os.getpid()
    print(f"Process {pid} handling task {task_id}")
    # Simulate some processing
    result = sum(i * i for i in range(data))
    time.sleep(1)  # simulate processing time
    return f"task {task_id} result: {result} (process: {pid})"

class MultiprocessingDemo:
    def __init__(self):
        self.cpu_count = multiprocessing.cpu_count()
        print(f"CPU cores: {self.cpu_count}")

    def cpu_intensive_task(self, n):
        """CPU-bound task: find all primes up to n."""
        def is_prime(num):
            if num < 2:
                return False
            for i in range(2, int(math.sqrt(num)) + 1):
                if num % i == 0:
                    return False
            return True

        primes = []
        for i in range(2, n + 1):
            if is_prime(i):
                primes.append(i)
        return primes

    def compare_thread_vs_process(self, n=10000):
        """Compare threads and processes on a CPU-bound task."""
        print("\n=== Threads vs. processes on CPU-bound work ===")

        # Threaded version
        print("Running with threads...")
        start_time = time.time()
        threads = []
        for _ in range(self.cpu_count):
            t = threading.Thread(target=self.cpu_intensive_task, args=(n,))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        thread_time = time.time() - start_time
        print(f"Threaded time: {thread_time:.2f}s")

        # Multiprocessing version
        print("Running with processes...")
        start_time = time.time()
        with multiprocessing.Pool(processes=self.cpu_count) as pool:
            results = pool.map(self.cpu_intensive_task, [n] * self.cpu_count)
        process_time = time.time() - start_time
        print(f"Multiprocessing time: {process_time:.2f}s")
        print(f"Speedup: {thread_time/process_time:.2f}x")
    def process_pool_demo(self):
        """Process pool demo."""
        print("\n=== Process pool demo ===")

        # Prepare the task data; worker_process is the module-level function defined above
        tasks = [(i, i * 1000) for i in range(1, 11)]
        print(f"Processing {len(tasks)} tasks with a process pool")

        start_time = time.time()
        with multiprocessing.Pool(processes=4) as pool:
            # starmap unpacks each (task_id, data) tuple into the worker's arguments
            results = pool.starmap(worker_process, tasks)
        total_time = time.time() - start_time

        print("\nResults:")
        for result in results:
            print(f"  {result}")
        print(f"\nTotal processing time: {total_time:.2f}s")
        print(f"Estimated sequential time: {len(tasks)}s")
        print(f"Speedup: {len(tasks)/total_time:.2f}x")
    def shared_memory_demo(self):
        """Sharing state across processes with a Manager."""
        print("\n=== Shared state across processes ===")

        def worker_with_shared_array(process_id, shared_arr, lock):
            """Worker that mutates a shared list."""
            with lock:
                # Modify the shared list
                for i in range(len(shared_arr)):
                    shared_arr[i] += process_id * 10
                print(f"Process {process_id} array after update: {list(shared_arr)}")

        # Create a managed list and lock
        # (note: with the spawn start method used on Windows/macOS, Process targets
        # must be module-level functions; this nested target works with fork on Linux)
        with multiprocessing.Manager() as manager:
            shared_array = manager.list([1, 2, 3, 4, 5])
            lock = manager.Lock()

            processes = []
            for i in range(3):
                p = multiprocessing.Process(
                    target=worker_with_shared_array,
                    args=(i + 1, shared_array, lock)
                )
                processes.append(p)
                p.start()

            for p in processes:
                p.join()
            print(f"Final shared list: {list(shared_array)}")
    def process_communication_demo(self):
        """Inter-process communication demo."""
        print("\n=== Inter-process communication ===")

        def producer(queue, num_items):
            """Producer process."""
            for i in range(num_items):
                item = f"message-{i+1}"
                queue.put(item)
                print(f"Producer sent: {item}")
                time.sleep(0.1)
            # One end-of-stream sentinel per consumer, so both consumers can exit
            for _ in range(2):
                queue.put("END")

        def consumer(queue, consumer_id):
            """Consumer process."""
            while True:
                item = queue.get()
                if item == "END":
                    print(f"Consumer {consumer_id} received the end signal")
                    break
                print(f"Consumer {consumer_id} received: {item}")
                time.sleep(0.2)  # simulate processing time

        # Create the queue
        queue = multiprocessing.Queue()

        # Create the processes (nested targets rely on the fork start method; see above)
        producer_process = multiprocessing.Process(
            target=producer, args=(queue, 10)
        )
        consumer_processes = []
        for i in range(2):
            p = multiprocessing.Process(
                target=consumer, args=(queue, i + 1)
            )
            consumer_processes.append(p)

        # Start the processes
        producer_process.start()
        for p in consumer_processes:
            p.start()

        # Wait for them to finish
        producer_process.join()
        for p in consumer_processes:
            p.join()
        print("Inter-process communication finished!")
# Run the multiprocessing demos
def run_multiprocessing_demo():
    demo = MultiprocessingDemo()

    # Performance comparison
    demo.compare_thread_vs_process(n=5000)

    # Process pool demo
    demo.process_pool_demo()

    # Shared state demo
    demo.shared_memory_demo()

    # Inter-process communication demo
    demo.process_communication_demo()

if __name__ == "__main__":
    run_multiprocessing_demo()
Part 5: Asynchronous Programming Basics
import asyncio
import aiohttp
import async_timeout  # third-party package: pip install async-timeout

class AsyncIODemo:
    async def simple_async_demo(self):
        """A minimal asyncio demo."""
        print("=== Simple asyncio demo ===")

        async def say_hello(name, delay):
            """Greet asynchronously."""
            print(f"{name} started, waiting {delay} seconds")
            await asyncio.sleep(delay)
            print(f"Hello, {name}! (waited {delay} seconds)")
            return f"{name} done"

        # Create several coroutines
        tasks = [
            say_hello("Alice", 2),
            say_hello("Bob", 1),
            say_hello("Charlie", 3),
            say_hello("David", 1.5)
        ]

        # Run them concurrently
        results = await asyncio.gather(*tasks)
        print(f"All tasks done: {results}")
    async def async_url_checker(self, urls):
        """Asynchronous URL status check."""
        print(f"\n=== Async URL check - {len(urls)} URLs ===")

        async def check_url(session, url):
            """Check a single URL."""
            try:
                async with async_timeout.timeout(10):
                    async with session.get(url) as response:
                        return {
                            'url': url,
                            'status': response.status,
                            'content_type': response.headers.get('Content-Type', 'Unknown')
                        }
            except asyncio.TimeoutError:
                return {'url': url, 'error': 'Timeout'}
            except Exception as e:
                return {'url': url, 'error': str(e)}

        start_time = time.time()
        async with aiohttp.ClientSession() as session:
            tasks = [check_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        total_time = time.time() - start_time

        # Show the results
        success_count = 0
        for result in results:
            if 'error' in result:
                print(f"❌ {result['url']} - error: {result['error']}")
            else:
                print(f"✅ {result['url']} - status: {result['status']}")
                success_count += 1
        print("\nCheck finished!")
        print(f"Successful: {success_count}/{len(urls)}")
        print(f"Total time: {total_time:.2f}s")
        return results
    async def async_producer_consumer(self):
        """Asynchronous producer-consumer pattern."""
        print("\n=== Async producer-consumer ===")
        queue = asyncio.Queue(maxsize=5)

        async def producer(name, num_items):
            """Async producer."""
            for i in range(num_items):
                item = f"{name}-item-{i+1}"
                await queue.put(item)
                print(f"Producer {name} produced: {item}")
                await asyncio.sleep(0.5)  # simulate production time
            # Send an end-of-stream marker
            await queue.put(f"{name}-END")
            print(f"Producer {name} done")

        async def consumer(name):
            """Async consumer."""
            end_count = 0
            total_producers = 2  # we know there are 2 producers
            while end_count < total_producers:
                item = await queue.get()
                if item.endswith("-END"):
                    end_count += 1
                    print(f"Consumer {name} saw an end marker ({end_count}/{total_producers})")
                else:
                    print(f"Consumer {name} consumed: {item}")
                    await asyncio.sleep(1)  # simulate consumption time
                # Mark every retrieved item as done so queue.join() can finish
                queue.task_done()
            print(f"Consumer {name} done")

        # Start the producers and consumers
        producers = [
            asyncio.create_task(producer("P1", 3)),
            asyncio.create_task(producer("P2", 2))
        ]
        consumers = [
            asyncio.create_task(consumer("C1")),
            asyncio.create_task(consumer("C2"))
        ]

        # Wait for the producers to finish
        await asyncio.gather(*producers)
        # Wait until every queued item has been processed
        await queue.join()
        # Cancel any consumer that is still waiting for items
        for c in consumers:
            c.cancel()
        print("Async producer-consumer finished!")
    async def run_benchmark(self):
        """Benchmark: synchronous vs. asynchronous requests."""
        print("\n=== Benchmark ===")
        urls = [
            "https://httpbin.org/delay/1",  # this endpoint responds after a 1-second delay
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
        ]

        # Synchronous version (blocking calls, one after another, kept here purely for comparison)
        print("Running the synchronous version...")
        start_time = time.time()
        session = requests.Session()
        for url in urls:
            try:
                response = session.get(url, timeout=10)
                print(f"sync: {url} - status: {response.status_code}")
            except Exception as e:
                print(f"sync: {url} - error: {e}")
        sync_time = time.time() - start_time
        print(f"Synchronous total time: {sync_time:.2f}s")

        # Asynchronous version
        print("Running the asynchronous version...")
        start_time = time.time()
        async with aiohttp.ClientSession() as session:
            tasks = []
            for url in urls:
                task = asyncio.create_task(self.fetch_with_session(session, url))
                tasks.append(task)
            results = await asyncio.gather(*tasks)
            for result in results:
                if 'error' in result:
                    print(f"async: {result['url']} - error: {result['error']}")
                else:
                    print(f"async: {result['url']} - status: {result['status']}")
        async_time = time.time() - start_time
        print(f"Asynchronous total time: {async_time:.2f}s")
        print(f"Speedup: {sync_time/async_time:.2f}x")

    async def fetch_with_session(self, session, url):
        """Fetch a URL with the shared session."""
        try:
            async with async_timeout.timeout(10):
                async with session.get(url) as response:
                    return {
                        'url': url,
                        'status': response.status
                    }
        except Exception as e:
            return {
                'url': url,
                'error': str(e)
            }
# Run the asyncio demos
async def run_async_demo():
    demo = AsyncIODemo()

    # Simple asyncio demo
    await demo.simple_async_demo()

    # Async URL check
    test_urls = [
        "https://www.baidu.com",
        "https://www.python.org",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://invalid-url-test.com",
    ]
    await demo.async_url_checker(test_urls)

    # Async producer-consumer
    await demo.async_producer_consumer()

    # Benchmark
    await demo.run_benchmark()
# Run every demo
def main():
    print("=" * 60)
    print("Python concurrency demos")
    print("=" * 60)

    # Threading demos
    run_threading_demo()

    # Thread pool demos
    run_thread_pool_demo()

    # Multiprocessing demos
    run_multiprocessing_demo()

    # Asyncio demos
    print("\n" + "=" * 60)
    print("Asyncio demos")
    print("=" * 60)
    asyncio.run(run_async_demo())

if __name__ == "__main__":
    main()
Part 6: Capstone Project – A Concurrent Web Crawler
from bs4 import BeautifulSoup      # needed for HTML parsing below
from urllib.parse import urljoin   # needed to resolve relative links

class ConcurrentWebCrawler:
    def __init__(self, max_workers=10, use_async=False):
        self.max_workers = max_workers
        self.use_async = use_async
        self.visited_urls = set()
        self.lock = threading.Lock()
        self.results = []

    async def async_crawl(self, start_urls, max_pages=50):
        """Asynchronous crawler."""
        print(f"Starting the async crawler, page limit: {max_pages}")
        async with aiohttp.ClientSession() as session:
            tasks = set()

            # Seed with the initial URLs
            for url in start_urls:
                if len(self.visited_urls) < max_pages:
                    self.visited_urls.add(url)
                    task = asyncio.create_task(self.async_crawl_page(session, url, max_pages))
                    tasks.add(task)

            while tasks and len(self.visited_urls) < max_pages:
                done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    try:
                        new_urls = await task
                        for url in new_urls:
                            if url not in self.visited_urls and len(self.visited_urls) < max_pages:
                                self.visited_urls.add(url)
                                new_task = asyncio.create_task(self.async_crawl_page(session, url, max_pages))
                                tasks.add(new_task)
                    except Exception as e:
                        print(f"Crawl task failed: {e}")

            # Cancel any remaining tasks
            for task in tasks:
                task.cancel()
        print(f"Async crawler finished! Crawled {len(self.results)} pages in total")
        return self.results
    async def async_crawl_page(self, session, url, max_pages):
        """Crawl a single page asynchronously."""
        if len(self.visited_urls) >= max_pages:
            return []
        print(f"Async crawling: {url}")
        try:
            async with async_timeout.timeout(10):
                async with session.get(url) as response:
                    if response.status != 200:
                        return []
                    html = await response.text()

                    # Parse the page
                    soup = BeautifulSoup(html, 'html.parser')

                    # Extract page information
                    page_data = {
                        'url': url,
                        'title': soup.title.string.strip() if soup.title and soup.title.string else 'No title',
                        'links_found': 0,
                        'status': response.status
                    }
                    with self.lock:
                        self.results.append(page_data)

                    # Extract new links
                    new_urls = []
                    for link in soup.find_all('a', href=True):
                        href = link['href']
                        full_url = self.normalize_url(url, href)
                        if full_url and full_url not in self.visited_urls:
                            new_urls.append(full_url)
                    page_data['links_found'] = len(new_urls)
                    return new_urls[:10]  # cap the number of new links
        except Exception as e:
            print(f"Async crawl failed for {url}: {e}")
            return []

    def normalize_url(self, base_url, href):
        """Normalize a link into an absolute URL."""
        try:
            full_url = urljoin(base_url, href)
            if full_url.startswith(('http://', 'https://')):
                return full_url
        except Exception:
            pass
        return None
    def thread_crawl(self, start_urls, max_pages=50):
        """Multithreaded crawler."""
        print(f"Starting the threaded crawler, workers: {self.max_workers}")
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit the initial tasks
            futures = set()
            for url in start_urls:
                if len(self.visited_urls) < max_pages:
                    self.visited_urls.add(url)
                    future = executor.submit(self.thread_crawl_page, url, max_pages)
                    futures.add(future)

            while futures and len(self.visited_urls) < max_pages:
                # Drain the current batch of futures, then schedule the links they found
                done, futures = futures, set()
                for future in done:
                    try:
                        new_urls = future.result(timeout=30)
                        for url in new_urls:
                            if url not in self.visited_urls and len(self.visited_urls) < max_pages:
                                self.visited_urls.add(url)
                                new_future = executor.submit(self.thread_crawl_page, url, max_pages)
                                futures.add(new_future)
                    except Exception as e:
                        print(f"Crawl task failed: {e}")
        print(f"Threaded crawler finished! Crawled {len(self.results)} pages in total")
        return self.results
    def thread_crawl_page(self, url, max_pages):
        """Crawl a single page in a worker thread."""
        if len(self.visited_urls) >= max_pages:
            return []
        print(f"Thread crawling: {url}")
        try:
            response = requests.get(url, timeout=10, headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
            if response.status_code != 200:
                return []
            soup = BeautifulSoup(response.content, 'html.parser')

            # Extract page information
            page_data = {
                'url': url,
                'title': soup.title.string.strip() if soup.title and soup.title.string else 'No title',
                'links_found': 0,
                'status': response.status_code
            }
            with self.lock:
                self.results.append(page_data)

            # Extract new links
            new_urls = []
            for link in soup.find_all('a', href=True):
                href = link['href']
                full_url = self.normalize_url(url, href)
                if full_url and full_url not in self.visited_urls:
                    new_urls.append(full_url)
            page_data['links_found'] = len(new_urls)
            return new_urls[:10]  # cap the number of new links
        except Exception as e:
            print(f"Thread crawl failed for {url}: {e}")
            return []
    def compare_crawlers(self, start_urls, max_pages=20):
        """Compare the two crawler implementations."""
        print("\n" + "=" * 60)
        print("Crawler performance comparison")
        print("=" * 60)

        # Reset state
        self.visited_urls.clear()
        self.results.clear()

        # Threaded crawler
        print("\n1. Threaded crawler")
        start_time = time.time()
        # Copy the result list, because self.results is cleared again below
        thread_results = list(self.thread_crawl(start_urls, max_pages))
        thread_time = time.time() - start_time

        # Reset state
        self.visited_urls.clear()
        self.results.clear()

        # Async crawler
        print("\n2. Async crawler")
        start_time = time.time()
        async_results = asyncio.run(self.async_crawl(start_urls, max_pages))
        async_time = time.time() - start_time

        # Show the comparison
        print("\n" + "=" * 60)
        print("Comparison results")
        print("=" * 60)
        print("Threaded crawler:")
        print(f"  pages crawled: {len(thread_results)}")
        print(f"  total time: {thread_time:.2f}s")
        print(f"  pages/second: {len(thread_results)/thread_time:.2f}")
        print("Async crawler:")
        print(f"  pages crawled: {len(async_results)}")
        print(f"  total time: {async_time:.2f}s")
        print(f"  pages/second: {len(async_results)/async_time:.2f}")
        if thread_time > 0 and async_time > 0:
            ratio = thread_time / async_time
            print(f"Relative speed: the async crawler took {ratio:.2f}x less/more time than the threaded crawler" if False else f"Speed ratio (threaded time / async time): {ratio:.2f}")
# Run the crawler comparison
def run_crawler_comparison():
    crawler = ConcurrentWebCrawler(max_workers=5)

    # A few safe test URLs
    start_urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/links/10/0",
        "https://httpbin.org/links/5/0",
    ]
    crawler.compare_crawlers(start_urls, max_pages=15)

if __name__ == "__main__":
    # Run every demo
    main()
    # Run the crawler comparison
    run_crawler_comparison()
Key Takeaways for Today
- Multithreading: suited to I/O-bound tasks, using the threading module
- Thread safety: protect shared resources with locks
- Thread pools: manage threads with ThreadPoolExecutor
- Multiprocessing: suited to CPU-bound tasks, using the multiprocessing module
- Inter-process communication: Queue, Pipe, and similar tools (a small Pipe sketch follows this list)
- Asynchronous programming: handle highly concurrent I/O with asyncio
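The demos above only use multiprocessing.Queue. Since Pipe is mentioned but never shown, here is a minimal sketch of two processes exchanging messages over a Pipe; the child function name is invented for this example.

import multiprocessing

def child(conn):
    # Receive a message from the parent, send a reply, then close our end
    msg = conn.recv()
    conn.send(f"child got: {msg}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send("hello from parent")
    print(parent_conn.recv())  # -> "child got: hello from parent"
    p.join()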
Choosing an Approach
- I/O-bound: multithreading or asynchronous programming
- CPU-bound: multiprocessing
- Highly concurrent I/O: asynchronous programming (asyncio)
- Simple parallelism: thread pools / process pools (see the sketch after this list)
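For the "simple parallelism" case, concurrent.futures gives thread pools and process pools the same interface, so switching between them is a one-line change. This is a small sketch under that assumption; the cpu_bound function and the workload sizes are invented for illustration.

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def cpu_bound(n):
    # Pure computation, so in threads it is limited by the GIL
    return sum(i * i for i in range(n))

def timed(executor_cls, label):
    start = time.time()
    with executor_cls(max_workers=4) as executor:
        results = list(executor.map(cpu_bound, [500_000] * 8))
    print(f"{label}: {time.time() - start:.2f}s, {len(results)} results")

if __name__ == "__main__":
    timed(ThreadPoolExecutor, "ThreadPoolExecutor")    # typically bound by the GIL
    timed(ProcessPoolExecutor, "ProcessPoolExecutor")  # can use multiple cores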
Caveats
- GIL: CPython's Global Interpreter Lock prevents threads from executing Python bytecode in parallel, so threads do not speed up CPU-bound work (illustrated right after this list)
- Resource management: keep the number of threads/processes under control
- Deadlock prevention: avoid circular waits on locks
- Exception handling: handle exceptions raised inside concurrent tasks properly
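To see the GIL point concretely, the sketch below times the same CPU-bound work sequentially and split across two threads; on CPython the two timings are usually close, because only one thread runs Python bytecode at a time. The function name and workload size are invented for this example.

import threading
import time

def burn(n):
    # CPU-bound loop; it holds the GIL except at periodic check intervals
    total = 0
    for i in range(n):
        total += i * i
    return total

N = 2_000_000

# Sequential: two runs back to back
start = time.time()
burn(N)
burn(N)
print(f"sequential: {time.time() - start:.2f}s")

# Two threads: on CPython this usually takes about as long, sometimes longer
start = time.time()
threads = [threading.Thread(target=burn, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"two threads: {time.time() - start:.2f}s")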
Exercises
- Implement a concurrent download manager
- Build a concurrent pipeline for real-time data processing
- Implement a web server that handles requests concurrently
- Compare the different concurrency models on a concrete workload
Coming Up Tomorrow
Day 27: we will look at data analysis and scientific computing in Python, including NumPy, Pandas, and Matplotlib.
For now, run today's concurrency examples and get a feel for what each model does well and where it applies. Ask me if anything is unclear!