Today's Practice Topic: Concurrency Basics
Today we cover the basics of concurrent programming in Python: multithreading, multiprocessing, and asynchronous programming. These techniques help you write programs that make better use of I/O wait time and CPU cores.
Exercise 1: Multithreading Basics
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Basic multithreading
def basic_threading():
    """Basic multithreading example"""
    print("=== Basic multithreading ===")

    def worker(thread_id, delay):
        """Worker thread function"""
        print(f"Thread {thread_id} starting, sleeping for {delay} seconds")
        time.sleep(delay)
        print(f"Thread {thread_id} finished")
        return f"Result of thread {thread_id}"

    # Create and start the threads
    threads = []
    for i in range(3):
        thread = threading.Thread(target=worker, args=(i, i + 1))
        threads.append(thread)
        thread.start()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()
    print("All threads finished")

basic_threading()

print("\n" + "=" * 50 + "\n")

# Using ThreadPoolExecutor
def thread_pool_example():
    """Thread pool example"""
    print("=== ThreadPoolExecutor ===")

    def task(n):
        """Task function"""
        print(f"Task {n} starting")
        time.sleep(1)
        print(f"Task {n} finished")
        return n * n

    # Use a thread pool
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit the tasks
        futures = [executor.submit(task, i) for i in range(5)]
        # Collect the results
        results = [future.result() for future in futures]
        print(f"All task results: {results}")

thread_pool_example()
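As a complement, here is a minimal sketch (illustration only, using a hypothetical square_task helper that is not part of the exercise above) of executor.map, a terser alternative to submit()/result() when one function is applied to many inputs and results are wanted in input order:

# Minimal sketch (illustration only): executor.map applies one function to
# many inputs and yields the results in input order.
import time
from concurrent.futures import ThreadPoolExecutor

def square_task(n):
    """Hypothetical task used only for this illustration."""
    time.sleep(1)  # simulate slow or I/O-bound work
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(square_task, range(5)))
    print(f"Results via executor.map: {results}")  # [0, 1, 4, 9, 16]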
Exercise 2: Thread Synchronization
import threading
import time

# Thread synchronization
def thread_synchronization():
    """Thread synchronization example"""
    print("\n=== Thread synchronization ===")

    # Shared resource
    counter = 0
    lock = threading.Lock()

    def increment():
        """Increment the counter"""
        nonlocal counter
        for _ in range(100000):
            with lock:  # protect the shared resource with a lock
                counter += 1

    def decrement():
        """Decrement the counter"""
        nonlocal counter
        for _ in range(100000):
            with lock:  # protect the shared resource with a lock
                counter -= 1

    # Create the threads
    t1 = threading.Thread(target=increment)
    t2 = threading.Thread(target=decrement)

    # Start the threads
    t1.start()
    t2.start()

    # Wait for them to finish
    t1.join()
    t2.join()

    print(f"Final counter value: {counter}")  # should be 0

thread_synchronization()

print("\n" + "=" * 50 + "\n")

# Using a semaphore
def semaphore_example():
    """Semaphore example"""
    print("=== Semaphore control ===")

    # Limit how many threads may run the critical section at the same time
    semaphore = threading.Semaphore(2)  # allow 2 threads at a time

    def limited_access(thread_id):
        """Function with limited concurrent access"""
        with semaphore:
            print(f"Thread {thread_id} acquired access")
            time.sleep(2)
            print(f"Thread {thread_id} releasing access")

    # Create several threads
    threads = []
    for i in range(5):
        thread = threading.Thread(target=limited_access, args=(i,))
        threads.append(thread)
        thread.start()

    # Wait for all threads
    for thread in threads:
        thread.join()

semaphore_example()
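To see why the lock in thread_synchronization matters, here is a minimal sketch (illustration only, not part of the original exercise) of the same counter without the lock. Because counter += 1 is a read-modify-write, updates from the two threads can interleave and be lost; whether you actually observe a nonzero result depends on thread scheduling, so a larger iteration count is used to make the race more likely:

# Minimal sketch (illustration only): the same counter without a lock.
import threading

def race_condition_demo():
    counter = 0

    def unsafe_increment():
        nonlocal counter
        for _ in range(1_000_000):
            counter += 1  # no lock: not atomic

    def unsafe_decrement():
        nonlocal counter
        for _ in range(1_000_000):
            counter -= 1  # no lock: not atomic

    t1 = threading.Thread(target=unsafe_increment)
    t2 = threading.Thread(target=unsafe_decrement)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print(f"Counter without a lock: {counter}")  # frequently != 0

race_condition_demo()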
Exercise 3: Multiprocessing Basics
import multiprocessing
import time
import os
from concurrent.futures import ProcessPoolExecutor

# Functions that run in child processes are defined at module level so they
# can be pickled on platforms that use the "spawn" start method (Windows,
# macOS). ProcessPoolExecutor always pickles the submitted callable.
def mp_worker(process_id, delay):
    """Worker process function"""
    print(f"Process {process_id} (PID: {os.getpid()}) starting")
    time.sleep(delay)
    print(f"Process {process_id} finished")
    return f"Result of process {process_id}"

def cpu_intensive_task(n):
    """CPU-bound task"""
    print(f"Task {n} (PID: {os.getpid()}) starting computation")
    result = sum(i * i for i in range(1000000))
    print(f"Task {n} finished computing")
    return result

# Basic multiprocessing
def basic_multiprocessing():
    """Basic multiprocessing example"""
    print("\n=== Basic multiprocessing ===")

    # Create and start the processes
    processes = []
    for i in range(3):
        process = multiprocessing.Process(target=mp_worker, args=(i, i + 1))
        processes.append(process)
        process.start()

    # Wait for all processes to finish
    for process in processes:
        process.join()
    print("All processes finished")

# Using a process pool
def process_pool_example():
    """Process pool example"""
    print("=== ProcessPoolExecutor ===")

    # Use a process pool
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(cpu_intensive_task, i) for i in range(4)]
        results = [future.result() for future in futures]
        print(f"Computation results: {results}")

# On spawn-based platforms, code that starts processes should also sit under
# an `if __name__ == "__main__":` guard so children do not re-run it on import.
basic_multiprocessing()

print("\n" + "=" * 50 + "\n")

process_pool_example()
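To connect this with the tip that processes suit CPU-bound work: a minimal sketch (illustration only, timings vary by machine; burn_cpu is a hypothetical helper) that runs the same CPU-bound function through a thread pool and a process pool. Because CPython's GIL lets only one thread execute Python bytecode at a time, the thread pool gives little or no speedup here, while the process pool can use multiple cores:

# Minimal sketch (illustration only): thread pool vs. process pool on the
# same CPU-bound function. Only the process pool scales with CPU cores.
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def burn_cpu(n):
    """Hypothetical CPU-bound helper used only for this comparison."""
    return sum(i * i for i in range(n))

def time_pool(executor_cls, label):
    start = time.time()
    with executor_cls(max_workers=4) as executor:
        list(executor.map(burn_cpu, [2_000_000] * 4))
    print(f"{label}: {time.time() - start:.2f} s")

if __name__ == "__main__":
    time_pool(ThreadPoolExecutor, "ThreadPoolExecutor (GIL-bound)")
    time_pool(ProcessPoolExecutor, "ProcessPoolExecutor (multi-core)")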
Exercise 4: Inter-Process Communication
import multiprocessing
import time

# Child-process functions are defined at module level so they can be pickled
# on spawn-based platforms (Windows, macOS).
def producer(queue, items):
    """Producer process"""
    for item in items:
        print(f"Producer sending: {item}")
        queue.put(item)
        time.sleep(0.1)
    queue.put(None)  # sentinel: signal the end of the stream

def consumer(queue):
    """Consumer process"""
    while True:
        item = queue.get()
        if item is None:  # received the end signal
            break
        print(f"Consumer received: {item}")
        time.sleep(0.2)

def incrementer(process_id, value):
    """Increment the shared value"""
    for _ in range(1000):
        with value.get_lock():  # the shared value needs its own lock
            value.value += 1
    print(f"Process {process_id} finished incrementing")

def decrementer(process_id, value):
    """Decrement the shared value"""
    for _ in range(1000):
        with value.get_lock():  # the shared value needs its own lock
            value.value -= 1
    print(f"Process {process_id} finished decrementing")

# Inter-process communication
def inter_process_communication():
    """Inter-process communication example"""
    print("\n=== Inter-process communication ===")

    # Create the queue used for communication
    queue = multiprocessing.Queue()

    # Create the processes
    items = [f"message_{i}" for i in range(5)]
    p1 = multiprocessing.Process(target=producer, args=(queue, items))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))

    # Start the processes
    p1.start()
    p2.start()

    # Wait for them to finish
    p1.join()
    p2.join()
    print("Inter-process communication finished")

# Using shared memory
def shared_memory_example():
    """Shared memory example"""
    print("=== Shared memory ===")

    # A shared value; 'i' means a C int
    shared_value = multiprocessing.Value('i', 0)

    # Create the processes
    processes = []
    for i in range(2):
        if i % 2 == 0:
            process = multiprocessing.Process(target=incrementer, args=(i, shared_value))
        else:
            process = multiprocessing.Process(target=decrementer, args=(i, shared_value))
        processes.append(process)
        process.start()

    # Wait for the processes
    for process in processes:
        process.join()

    print(f"Final shared value: {shared_value.value}")  # should be 0

inter_process_communication()

print("\n" + "=" * 50 + "\n")

shared_memory_example()
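Alongside Value, here is a minimal sketch (illustration only, not part of the original exercise; fill_slot is a hypothetical worker) of multiprocessing.Array, which shares a fixed-size array of C values between processes in the same way and, by default, carries its own lock via get_lock():

# Minimal sketch (illustration only): multiprocessing.Array shares a fixed-size
# array of C values between processes, analogous to Value for a single number.
import multiprocessing

def fill_slot(shared_array, index):
    """Hypothetical worker: each process writes one slot of the shared array."""
    with shared_array.get_lock():  # synchronized wrapper created by Array()
        shared_array[index] = index * 10

if __name__ == "__main__":
    shared_array = multiprocessing.Array('i', 4)  # four C ints, initialized to 0
    workers = [multiprocessing.Process(target=fill_slot, args=(shared_array, i))
               for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(f"Shared array contents: {shared_array[:]}")  # [0, 10, 20, 30]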
Exercise 5: Asynchronous Programming Basics
import asyncio
import time

# Asyncio basics
async def basic_async():
    """Basic asyncio example"""
    print("\n=== Asyncio basics ===")

    async def async_task(task_id, delay):
        """Asynchronous task"""
        print(f"Task {task_id} starting, waiting {delay} seconds")
        await asyncio.sleep(delay)
        print(f"Task {task_id} finished")
        return f"Result of task {task_id}"

    start_time = time.time()

    async def main():
        # Build the list of coroutines
        tasks = [
            async_task(1, 2),
            async_task(2, 1),
            async_task(3, 3)
        ]
        # Run them all concurrently
        results = await asyncio.gather(*tasks)
        print(f"All task results: {results}")

    await main()

    end_time = time.time()
    print(f"Total time: {end_time - start_time:.2f} seconds")

# Run the async example (asyncio.run drives the event loop)
asyncio.run(basic_async())

print("\n" + "=" * 50 + "\n")

# Asynchronous I/O operations
async def async_io_example():
    """Asynchronous I/O example"""
    print("=== Asynchronous I/O ===")

    async def read_file_async(filename):
        """Read a file asynchronously (simulated)"""
        print(f"Start reading {filename}")
        await asyncio.sleep(1)  # simulate I/O wait
        print(f"Finished reading {filename}")
        return f"contents of {filename}"

    async def fetch_url_async(url):
        """Fetch a URL asynchronously (simulated)"""
        print(f"Start fetching {url}")
        await asyncio.sleep(2)  # simulate network wait
        print(f"Finished fetching {url}")
        return f"response from {url}"

    # Run the I/O operations concurrently
    start_time = time.time()
    results = await asyncio.gather(
        read_file_async("file1.txt"),
        read_file_async("file2.txt"),
        fetch_url_async("https://example.com"),
        fetch_url_async("https://api.example.com")
    )
    end_time = time.time()

    print(f"All I/O operations finished, results: {results}")
    print(f"Total time: {end_time - start_time:.2f} seconds")

asyncio.run(async_io_example())
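asyncio.gather only hands back results after every coroutine has finished. Here is a minimal sketch (illustration only, using a hypothetical delayed_value helper) of asyncio.as_completed, which lets you handle each result as soon as it is ready:

# Minimal sketch (illustration only): asyncio.as_completed yields awaitables in
# completion order, so fast tasks can be handled before slow ones finish.
import asyncio

async def delayed_value(delay, value):
    """Hypothetical helper: wait, then return the given value."""
    await asyncio.sleep(delay)
    return value

async def as_completed_demo():
    coros = [delayed_value(3, "slow"), delayed_value(1, "fast"), delayed_value(2, "medium")]
    for finished in asyncio.as_completed(coros):
        result = await finished
        print(f"Got result: {result}")  # prints fast, medium, slow in that order

asyncio.run(as_completed_demo())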
Exercise 6: Async vs. Sync Comparison
import asyncio
import time
import threading
from concurrent.futures import ThreadPoolExecutor  # needed by the thread pool version below

# Synchronous version
def sync_io_operations():
    """Synchronous I/O operations"""
    print("\n=== Synchronous I/O ===")

    def read_file_sync(filename):
        print(f"Start synchronous read of {filename}")
        time.sleep(1)  # simulate I/O wait
        print(f"Finished synchronous read of {filename}")
        return f"contents of {filename}"

    def fetch_url_sync(url):
        print(f"Start synchronous fetch of {url}")
        time.sleep(2)  # simulate network wait
        print(f"Finished synchronous fetch of {url}")
        return f"response from {url}"

    start_time = time.time()

    # Run everything sequentially
    result1 = read_file_sync("file1.txt")
    result2 = read_file_sync("file2.txt")
    result3 = fetch_url_sync("https://example.com")
    result4 = fetch_url_sync("https://api.example.com")
    results = [result1, result2, result3, result4]

    end_time = time.time()
    print(f"Synchronous results: {results}")
    print(f"Synchronous total time: {end_time - start_time:.2f} seconds")

# Multithreaded version
def threaded_io_operations():
    """Multithreaded I/O operations"""
    print("\n=== Multithreaded I/O ===")

    def read_file_thread(filename):
        print(f"Start threaded read of {filename}")
        time.sleep(1)
        print(f"Finished threaded read of {filename}")
        return f"contents of {filename}"

    def fetch_url_thread(url):
        print(f"Start threaded fetch of {url}")
        time.sleep(2)
        print(f"Finished threaded fetch of {url}")
        return f"response from {url}"

    start_time = time.time()

    # Use a thread pool
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(read_file_thread, "file1.txt"),
            executor.submit(read_file_thread, "file2.txt"),
            executor.submit(fetch_url_thread, "https://example.com"),
            executor.submit(fetch_url_thread, "https://api.example.com")
        ]
        results = [future.result() for future in futures]

    end_time = time.time()
    print(f"Multithreaded results: {results}")
    print(f"Multithreaded total time: {end_time - start_time:.2f} seconds")

# Run the comparison (the async version already ran in Exercise 5)
sync_io_operations()
threaded_io_operations()
Exercise 7: Concurrency Patterns in Practice
import asyncio
import concurrent.futures
import time

# Concurrent handling of a CPU-bound task
def cpu_intensive_computation(n):
    """CPU-bound computation"""
    return sum(i * i for i in range(n))

async def mixed_concurrency():
    """Mixed concurrency: asyncio plus a thread pool"""
    print("\n=== Mixed concurrency ===")

    # I/O-bound task (pure asyncio)
    async def io_task(task_id):
        print(f"Async I/O task {task_id} starting")
        await asyncio.sleep(1)
        print(f"Async I/O task {task_id} finished")
        return f"io_result_{task_id}"

    # CPU-bound task (offloaded to the thread pool)
    async def cpu_task(n, executor):
        loop = asyncio.get_running_loop()
        print(f"CPU task {n} starting")
        # Run the CPU-bound function in the executor so it does not block the event loop
        result = await loop.run_in_executor(
            executor, cpu_intensive_computation, n
        )
        print(f"CPU task {n} finished, result: {result}")
        return result

    start_time = time.time()

    # Thread pool used for the CPU-bound tasks
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        io_tasks = [io_task(i) for i in range(3)]
        cpu_tasks = [cpu_task(1000000, executor) for _ in range(2)]

        # Await both groups together so the I/O and CPU tasks actually overlap
        io_results, cpu_results = await asyncio.gather(
            asyncio.gather(*io_tasks),
            asyncio.gather(*cpu_tasks),
        )

    end_time = time.time()
    print(f"I/O results: {io_results}")
    print(f"CPU results: {cpu_results}")
    print(f"Mixed concurrency total time: {end_time - start_time:.2f} seconds")

asyncio.run(mixed_concurrency())
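The thread pool above keeps the event loop responsive, but for CPU-bound work it is still limited by the GIL. Here is a minimal sketch (illustration only, using a hypothetical heavy_computation helper) of the same run_in_executor pattern with a ProcessPoolExecutor, which can actually use multiple cores:

# Minimal sketch (illustration only): loop.run_in_executor with a process pool.
# The function must be defined at module level so it can be pickled and sent
# to the worker processes.
import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_computation(n):
    """Hypothetical CPU-bound helper used only for this sketch."""
    return sum(i * i for i in range(n))

async def cpu_with_processes():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = await asyncio.gather(
            loop.run_in_executor(executor, heavy_computation, 2_000_000),
            loop.run_in_executor(executor, heavy_computation, 2_000_000),
        )
    print(f"Process-pool results: {results}")

if __name__ == "__main__":
    asyncio.run(cpu_with_processes())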
Exercise 8: Concurrency Best Practices
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Defined at module level so the process pool can pickle it and send it to the
# worker processes (a bound method would drag the whole processor object along).
def cpu_intensive_work(n):
    """CPU-bound work"""
    return sum(i * i for i in range(n))

class ConcurrentProcessor:
    """Concurrent task processor"""

    def __init__(self):
        self.thread_executor = ThreadPoolExecutor(max_workers=4)
        self.process_executor = ProcessPoolExecutor(max_workers=2)

    async def process_tasks(self, tasks):
        """Dispatch each task to the concurrency model that fits its type"""
        coros = []
        for task in tasks:
            if task['type'] == 'io':
                # I/O-bound: plain asyncio
                coros.append(self.process_io_task(task))
            elif task['type'] == 'cpu':
                # CPU-bound: process pool
                coros.append(self.process_cpu_task(task))
            elif task['type'] == 'network':
                # Blocking network calls: thread pool
                coros.append(self.process_network_task(task))
        # Run all of them concurrently instead of one after another
        return await asyncio.gather(*coros)

    async def process_io_task(self, task):
        """Handle an I/O-bound task"""
        print(f"Processing I/O task: {task['name']}")
        await asyncio.sleep(task.get('duration', 1))
        return f"I/O result: {task['name']}"

    async def process_cpu_task(self, task):
        """Handle a CPU-bound task"""
        loop = asyncio.get_running_loop()
        print(f"Processing CPU task: {task['name']}")
        n = task.get('n', 1000000)
        result = await loop.run_in_executor(
            self.process_executor,
            cpu_intensive_work,
            n
        )
        return f"CPU result: {task['name']} = {result}"

    async def process_network_task(self, task):
        """Handle a network-bound task"""
        loop = asyncio.get_running_loop()
        print(f"Processing network task: {task['name']}")
        result = await loop.run_in_executor(
            self.thread_executor,
            self.network_work,
            task['url']
        )
        return f"Network result: {task['name']} = {result}"

    def network_work(self, url):
        """Network work (simulated)"""
        time.sleep(2)  # simulate network latency
        return f"response from {url}"

    def close(self):
        """Release the executors"""
        self.thread_executor.shutdown()
        self.process_executor.shutdown()

async def best_practices_example():
    """Concurrency best-practices example"""
    print("\n=== Concurrency best practices ===")

    processor = ConcurrentProcessor()

    # Tasks of each type
    tasks = [
        {'type': 'io', 'name': 'read file 1', 'duration': 1},
        {'type': 'io', 'name': 'read file 2', 'duration': 2},
        {'type': 'cpu', 'name': 'computation 1', 'n': 500000},
        {'type': 'cpu', 'name': 'computation 2', 'n': 1000000},
        {'type': 'network', 'name': 'API request 1', 'url': 'https://api1.com'},
        {'type': 'network', 'name': 'API request 2', 'url': 'https://api2.com'}
    ]

    start_time = time.time()
    try:
        results = await processor.process_tasks(tasks)
        print("\nAll tasks finished, results:")
        for result in results:
            print(f" - {result}")
    finally:
        processor.close()

    end_time = time.time()
    print(f"\nTotal time: {end_time - start_time:.2f} seconds")

asyncio.run(best_practices_example())
Today's Challenge:
Build a complete web crawler that uses several concurrency techniques to fetch pages efficiently.
# Challenge exercise: a concurrent web crawler
import asyncio
import aiohttp
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import time
from collections import deque
import async_timeout
class ConcurrentWebCrawler:
    """Concurrent web crawler"""

    def __init__(self, max_concurrent=10, max_pages=50):
        self.max_concurrent = max_concurrent
        self.max_pages = max_pages
        self.visited = set()
        self.to_visit = deque()
        self.results = []
        self.session = None

    async def crawl(self, start_url):
        """Start crawling"""
        print(f"Starting crawl at: {start_url}")
        self.to_visit.append(start_url)

        async with aiohttp.ClientSession() as session:
            self.session = session
            tasks = set()

            while self.to_visit or tasks:
                # Spawn new tasks up to the concurrency and page limits
                while self.to_visit and len(tasks) < self.max_concurrent and len(self.visited) < self.max_pages:
                    url = self.to_visit.popleft()
                    if url not in self.visited:
                        self.visited.add(url)
                        task = asyncio.create_task(self.fetch_page(url))
                        tasks.add(task)

                # Wait for at least one task to finish
                if tasks:
                    done, tasks = await asyncio.wait(
                        tasks, return_when=asyncio.FIRST_COMPLETED
                    )
                    for task in done:
                        try:
                            result = await task
                            if result:
                                self.results.append(result)
                        except Exception as e:
                            print(f"Crawl failed: {e}")

        return self.results

    async def fetch_page(self, url):
        """Fetch a single page"""
        try:
            async with async_timeout.timeout(10):
                async with self.session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        title, links = self.parse_page(html, url)
                        print(f"Fetched: {url} - {title}")
                        return {
                            'url': url,
                            'title': title,
                            'links_found': len(links)
                        }
                    else:
                        print(f"HTTP error: {url} - {response.status}")
        except asyncio.TimeoutError:
            print(f"Timeout: {url}")
        except Exception as e:
            print(f"Error fetching {url}: {e}")
        return None

    def parse_page(self, html, base_url):
        """Parse a page"""
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.find('title')
        title_text = title.get_text() if title else "No title"

        # Extract the links
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            absolute_url = urljoin(base_url, href)
            if self.is_valid_url(absolute_url):
                links.append(absolute_url)
                if absolute_url not in self.visited and absolute_url not in self.to_visit:
                    self.to_visit.append(absolute_url)
        return title_text, links

    def is_valid_url(self, url):
        """Check whether a URL is usable"""
        parsed = urlparse(url)
        return bool(parsed.netloc) and parsed.scheme in ['http', 'https']

    async def close(self):
        """Close the session if it is still open. crawl() normally closes it
        via its "async with" block, so this is just a safety net."""
        if self.session and not self.session.closed:
            await self.session.close()
async def web_crawler_demo():
    """Web crawler demo"""
    print("=== Concurrent web crawler demo ===")

    # Note: in real use, respect each site's robots.txt and terms of service.
    # Example sites are used here for demonstration only.
    start_urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/links/10/0"
    ]

    crawler = ConcurrentWebCrawler(max_concurrent=5, max_pages=10)
    try:
        start_time = time.time()
        for start_url in start_urls:
            await crawler.crawl(start_url)
        results = crawler.results  # crawl() accumulates results on the crawler
        end_time = time.time()

        print("\nCrawl finished!")
        print(f"Pages crawled: {len(results)}")
        print(f"Total time: {end_time - start_time:.2f} seconds")

        print("\nResults:")
        for result in results[:5]:  # show the first 5 results
            print(f"URL: {result['url']}")
            print(f"Title: {result['title']}")
            print(f"Links found: {result['links_found']}")
            print("-" * 50)
    finally:
        await crawler.close()

# The demo makes real network requests, so the call is commented out;
# uncomment it to try the crawler.
# asyncio.run(web_crawler_demo())
# Alternative: a simple crawler built on a thread pool
def simple_threaded_crawler():
    """Simple multithreaded crawler"""
    print("=== Simple multithreaded crawler ===")

    def fetch_url(url):
        """Fetch one URL"""
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'html.parser')
                title = soup.find('title')
                return {
                    'url': url,
                    'title': title.get_text() if title else "No title",
                    'status': 'success'
                }
            # Report non-200 responses instead of silently returning None
            return {'url': url, 'title': f"HTTP {response.status_code}", 'status': 'failed'}
        except Exception as e:
            return {'url': url, 'title': f"Error: {e}", 'status': 'failed'}

    urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404"
    ]

    start_time = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(fetch_url, urls))
    end_time = time.time()

    print(f"Crawl finished in {end_time - start_time:.2f} seconds")
    for result in results:
        print(f"URL: {result['url']}, status: {result['status']}, title: {result['title']}")

# Run the simple crawler
simple_threaded_crawler()
Learning Tips:
- Multithreading: suited to I/O-bound tasks such as network requests and file operations
- Multiprocessing: suited to CPU-bound tasks such as numerical computation and image processing
- Asynchronous programming: suited to highly concurrent I/O, using the async/await syntax
- Thread safety: protect shared resources with locks to avoid race conditions
- Resource management: always shut down thread pools, process pools, and sessions properly
- Error handling: handle exceptions carefully in concurrent code (see the sketch below)
- Performance: choose the concurrency model that matches the type of task
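As a companion to the error-handling tip, here is a minimal sketch (illustration only, with hypothetical may_fail helpers) of two common patterns: asyncio.gather with return_exceptions=True, so one failing coroutine does not cancel the rest, and catching exceptions per future with concurrent.futures.as_completed:

# Minimal sketch (illustration only): handling exceptions in concurrent code.
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed

async def may_fail(task_id):
    """Hypothetical coroutine that fails for odd task ids."""
    await asyncio.sleep(0.1)
    if task_id % 2:
        raise ValueError(f"task {task_id} failed")
    return f"task {task_id} ok"

async def gather_with_errors():
    # return_exceptions=True delivers exceptions as results instead of
    # cancelling the remaining coroutines.
    results = await asyncio.gather(*(may_fail(i) for i in range(4)),
                                   return_exceptions=True)
    for r in results:
        print("error:" if isinstance(r, Exception) else "ok:", r)

asyncio.run(gather_with_errors())

def blocking_may_fail(task_id):
    """Hypothetical blocking function that fails for odd task ids."""
    if task_id % 2:
        raise ValueError(f"task {task_id} failed")
    return f"task {task_id} ok"

# With a thread pool, future.result() re-raises the worker's exception,
# so wrap it in try/except as each future completes.
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(blocking_may_fail, i) for i in range(4)]
    for future in as_completed(futures):
        try:
            print("ok:", future.result())
        except ValueError as e:
            print("error:", e)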
Tomorrow we will look at network programming. Keep practicing, and your concurrent programming skills will keep improving!