Python 60-Day Crazy Practice — Day 12

Today's practice topic: Concurrency Programming Basics

Today we will study the basics of concurrent programming in Python, covering multithreading, multiprocessing, and asynchronous programming. These techniques help you write more efficient programs.

Exercise 1: Multithreading Basics

import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Basic multithreading
def basic_threading():
    """Basic multithreading example"""
    print("=== Basic multithreading ===")
    
    def worker(thread_id, delay):
        """Worker thread function"""
        print(f"Thread {thread_id} starting, sleeping for {delay} seconds")
        time.sleep(delay)
        print(f"Thread {thread_id} finished")
        return f"Thread {thread_id} result"
    
    # Create and start the threads
    threads = []
    for i in range(3):
        thread = threading.Thread(target=worker, args=(i, i+1))
        threads.append(thread)
        thread.start()
    
    # Wait for all threads to finish
    for thread in threads:
        thread.join()
    
    print("All threads finished")

basic_threading()

print("
" + "="*50 + "
")

# Using ThreadPoolExecutor
def thread_pool_example():
    """Thread pool example"""
    print("=== ThreadPoolExecutor ===")
    
    def task(n):
        """Task function"""
        print(f"Task {n} started")
        time.sleep(1)
        print(f"Task {n} finished")
        return n * n
    
    # Use a thread pool
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit the tasks
        futures = [executor.submit(task, i) for i in range(5)]
        
        # Collect the results (in submission order)
        results = [future.result() for future in futures]
        print(f"All task results: {results}")

thread_pool_example()
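
A small supplement that is not part of the original exercise: when you do not care about submission order, concurrent.futures.as_completed hands you each future as soon as it finishes. A minimal sketch:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def square_after_delay(n):
    """Sleep briefly, then return the square of n."""
    time.sleep(0.5)
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(square_after_delay, i): i for i in range(5)}
    # as_completed yields futures in the order they finish, not the order they were submitted
    for future in as_completed(futures):
        print(f"Task {futures[future]} finished with result {future.result()}")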

Exercise 2: Thread Synchronization

import threading
import time

# Thread synchronization
def thread_synchronization():
    """Thread synchronization example"""
    print("\n=== Thread synchronization ===")
    
    # Shared resource
    counter = 0
    lock = threading.Lock()
    
    def increment():
        """Increment the counter"""
        nonlocal counter
        for _ in range(100000):
            with lock:  # protect the shared resource with the lock
                counter += 1
    
    def decrement():
        """Decrement the counter"""
        nonlocal counter
        for _ in range(100000):
            with lock:  # protect the shared resource with the lock
                counter -= 1
    
    # Create the threads
    t1 = threading.Thread(target=increment)
    t2 = threading.Thread(target=decrement)
    
    # Start the threads
    t1.start()
    t2.start()
    
    # Wait for them to finish
    t1.join()
    t2.join()
    
    print(f"Final counter value: {counter}")  # should be 0

thread_synchronization()

print("
" + "="*50 + "
")

# Using a semaphore
def semaphore_example():
    """Semaphore example"""
    print("=== Semaphore control ===")
    
    # Limit how many threads may enter at the same time
    semaphore = threading.Semaphore(2)  # allow 2 threads at a time
    
    def limited_access(thread_id):
        """Function with limited access"""
        with semaphore:
            print(f"Thread {thread_id} acquired access")
            time.sleep(2)
            print(f"Thread {thread_id} released access")
    
    # Create several threads
    threads = []
    for i in range(5):
        thread = threading.Thread(target=limited_access, args=(i,))
        threads.append(thread)
        thread.start()
    
    # Wait for all of them
    for thread in threads:
        thread.join()

semaphore_example()
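
Besides Lock and Semaphore, threading.Event is a lightweight way for one thread to signal another. A minimal sketch, separate from the exercise above:

import threading
import time

def waiter(event):
    """Block until another thread sets the event."""
    print("Waiter: waiting for the signal...")
    event.wait()               # blocks until event.set() is called
    print("Waiter: got the signal, continuing")

signal = threading.Event()
t = threading.Thread(target=waiter, args=(signal,))
t.start()

time.sleep(1)                  # simulate some work in the main thread
print("Main: sending the signal")
signal.set()                   # wakes up the waiting thread
t.join()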

Exercise 3: Multiprocessing Basics

import multiprocessing
import time
import os
from concurrent.futures import ProcessPoolExecutor

# Multiprocessing basics
def basic_multiprocessing():
    """Basic multiprocessing example"""
    print("\n=== Multiprocessing basics ===")
    
    def worker(process_id, delay):
        """Worker process function"""
        print(f"Process {process_id} (PID: {os.getpid()}) starting")
        time.sleep(delay)
        print(f"Process {process_id} finished")
        return f"Process {process_id} result"
    
    # Create and start the processes
    # Note: a locally defined target like this works with the "fork" start method (Linux);
    # on Windows/macOS ("spawn") the target must be a picklable, module-level function.
    processes = []
    for i in range(3):
        process = multiprocessing.Process(target=worker, args=(i, i+1))
        processes.append(process)
        process.start()
    
    # Wait for all processes to finish
    for process in processes:
        process.join()
    
    print("All processes finished")

basic_multiprocessing()

print("
" + "="*50 + "
")

# Using a process pool
def cpu_intensive_task(n):
    """CPU-bound task (defined at module level so the process pool can pickle it)"""
    print(f"Task {n} (PID: {os.getpid()}) starting computation")
    result = sum(i*i for i in range(1000000))
    print(f"Task {n} finished computing")
    return result

def process_pool_example():
    """Process pool example"""
    print("=== ProcessPoolExecutor ===")
    
    # Use a process pool
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(cpu_intensive_task, i) for i in range(4)]
        results = [future.result() for future in futures]
        print(f"Computation results: {results}")

process_pool_example()
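
If you just need to apply one function to many inputs, executor.map is often simpler than submit. A minimal sketch (the __main__ guard matters on Windows/macOS, where worker processes re-import the module):

from concurrent.futures import ProcessPoolExecutor

def square(n):
    """Simple CPU-bound function; it must live at module level so it can be pickled."""
    return n * n

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        # map spreads the inputs across worker processes and returns results in input order;
        # chunksize batches several inputs per task to reduce inter-process overhead
        results = list(executor.map(square, range(10), chunksize=2))
        print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]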

Exercise 4: Inter-Process Communication

import multiprocessing
import time

# Inter-process communication
def inter_process_communication():
    """Inter-process communication example"""
    print("\n=== Inter-process communication ===")
    
    # Communicate through a queue
    def producer(queue, items):
        """Producer process"""
        for item in items:
            print(f"Producer sending: {item}")
            queue.put(item)
            time.sleep(0.1)
        queue.put(None)  # sentinel marking the end
    
    def consumer(queue):
        """Consumer process"""
        while True:
            item = queue.get()
            if item is None:  # sentinel received
                break
            print(f"Consumer received: {item}")
            time.sleep(0.2)
    
    # Create the queue
    queue = multiprocessing.Queue()
    
    # Create the processes
    # (as in the previous exercise, locally defined targets rely on the "fork" start method)
    items = [f"message_{i}" for i in range(5)]
    p1 = multiprocessing.Process(target=producer, args=(queue, items))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))
    
    # Start the processes
    p1.start()
    p2.start()
    
    # Wait for them to finish
    p1.join()
    p2.join()
    
    print("Inter-process communication finished")

inter_process_communication()
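
A Queue is many-to-many; for a simple two-way channel between exactly two processes, multiprocessing.Pipe is a lighter option. A minimal sketch in the same simulated style as above:

import multiprocessing

def child(conn):
    """Receive one message from the parent, send a reply, then close the connection."""
    message = conn.recv()
    conn.send(f"child got: {message}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send("hello")
    print(parent_conn.recv())  # child got: hello
    p.join()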

print("
" + "="*50 + "
")

# Using shared memory
def shared_memory_example():
    """Shared memory example"""
    print("=== Shared memory ===")
    
    # Shared value
    shared_value = multiprocessing.Value('i', 0)  # 'i' means a signed integer
    
    def incrementer(process_id, value):
        """Increment the shared value"""
        for _ in range(1000):
            with value.get_lock():  # locking is required
                value.value += 1
        print(f"Process {process_id} finished incrementing")
    
    def decrementer(process_id, value):
        """Decrement the shared value"""
        for _ in range(1000):
            with value.get_lock():  # locking is required
                value.value -= 1
        print(f"Process {process_id} finished decrementing")
    
    # Create the processes
    processes = []
    for i in range(2):
        if i % 2 == 0:
            process = multiprocessing.Process(target=incrementer, args=(i, shared_value))
        else:
            process = multiprocessing.Process(target=decrementer, args=(i, shared_value))
        processes.append(process)
        process.start()
    
    # Wait for the processes
    for process in processes:
        process.join()
    
    print(f"Final shared value: {shared_value.value}")  # should be 0

shared_memory_example()
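
Value holds a single shared number; multiprocessing.Array works the same way for a fixed-length sequence. A minimal sketch:

import multiprocessing

def fill(shared_array, start):
    """Write values into one half of the shared array."""
    with shared_array.get_lock():          # the Array carries its own lock, just like Value
        for i in range(start, start + 5):
            shared_array[i] = i * 10

if __name__ == "__main__":
    arr = multiprocessing.Array('i', 10)   # 10 shared integers, initialised to 0
    workers = [multiprocessing.Process(target=fill, args=(arr, s)) for s in (0, 5)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(list(arr))                       # [0, 10, 20, ..., 90]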

Exercise 5: Async Programming Basics

import asyncio
import time

# Async programming basics
async def basic_async():
    """Basic async programming example"""
    print("\n=== Async programming basics ===")
    
    async def async_task(task_id, delay):
        """Async task"""
        print(f"Task {task_id} started, waiting {delay} seconds")
        await asyncio.sleep(delay)
        print(f"Task {task_id} finished")
        return f"Task {task_id} result"
    
    # Run the async tasks
    start_time = time.time()
    
    # Approach 1: drive everything with asyncio.run (see the call below)
    async def main():
        # Build the list of coroutines
        tasks = [
            async_task(1, 2),
            async_task(2, 1),
            async_task(3, 3)
        ]
        
        # Run all of them concurrently
        results = await asyncio.gather(*tasks)
        print(f"All task results: {results}")
    
    await main()
    
    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")

# Run the async example
asyncio.run(basic_async())
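
Two more pieces of the asyncio toolbox worth knowing at this point are asyncio.create_task, which starts a coroutine running immediately, and asyncio.wait_for, which cancels it if it takes too long. A minimal sketch:

import asyncio

async def slow_operation():
    """Pretend to do slow I/O."""
    await asyncio.sleep(3)
    return "done"

async def main():
    task = asyncio.create_task(slow_operation())   # starts running right away
    try:
        # give the task at most 1 second before cancelling it
        result = await asyncio.wait_for(task, timeout=1)
        print(result)
    except asyncio.TimeoutError:
        print("slow_operation timed out and was cancelled")

asyncio.run(main())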

print("
" + "="*50 + "
")

# Async I/O operations
async def async_io_example():
    """Async I/O example"""
    print("=== Async I/O operations ===")
    
    async def read_file_async(filename):
        """Read a file asynchronously (simulated)"""
        print(f"Started reading {filename}")
        await asyncio.sleep(1)  # simulate I/O wait
        print(f"Finished reading {filename}")
        return f"contents of {filename}"
    
    async def fetch_url_async(url):
        """Fetch a URL asynchronously (simulated)"""
        print(f"Started fetching {url}")
        await asyncio.sleep(2)  # simulate network wait
        print(f"Finished fetching {url}")
        return f"response from {url}"
    
    # Run the I/O operations concurrently
    start_time = time.time()
    
    results = await asyncio.gather(
        read_file_async("file1.txt"),
        read_file_async("file2.txt"),
        fetch_url_async("https://example.com"),
        fetch_url_async("https://api.example.com")
    )
    
    end_time = time.time()
    print(f"All I/O operations finished, results: {results}")
    print(f"Total execution time: {end_time - start_time:.2f} seconds")

asyncio.run(async_io_example())
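
gather waits for everything and returns results in argument order; asyncio.as_completed is the async counterpart of the thread-pool as_completed shown earlier and yields results as they arrive. A minimal sketch:

import asyncio

async def fetch(name, delay):
    """Simulated I/O operation."""
    await asyncio.sleep(delay)
    return f"{name} finished after {delay}s"

async def main():
    coros = [fetch("a", 2), fetch("b", 1), fetch("c", 3)]
    # results are printed in completion order: b, a, c
    for finished in asyncio.as_completed(coros):
        print(await finished)

asyncio.run(main())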

Exercise 6: Async vs. Sync Comparison

import asyncio
import time
import threading
from concurrent.futures import ThreadPoolExecutor  # needed by the threaded version below

# Synchronous version
def sync_io_operations():
    """Synchronous I/O operations"""
    print("\n=== Synchronous I/O operations ===")
    
    def read_file_sync(filename):
        print(f"Started sync read of {filename}")
        time.sleep(1)  # simulate I/O wait
        print(f"Finished sync read of {filename}")
        return f"contents of {filename}"
    
    def fetch_url_sync(url):
        print(f"Started sync fetch of {url}")
        time.sleep(2)  # simulate network wait
        print(f"Finished sync fetch of {url}")
        return f"response from {url}"
    
    start_time = time.time()
    
    # Execute sequentially
    result1 = read_file_sync("file1.txt")
    result2 = read_file_sync("file2.txt")
    result3 = fetch_url_sync("https://example.com")
    result4 = fetch_url_sync("https://api.example.com")
    
    results = [result1, result2, result3, result4]
    
    end_time = time.time()
    print(f"Sync results: {results}")
    print(f"Total sync execution time: {end_time - start_time:.2f} seconds")

# Multithreaded version
def threaded_io_operations():
    """Multithreaded I/O operations"""
    print("\n=== Multithreaded I/O operations ===")
    
    def read_file_thread(filename):
        print(f"Started threaded read of {filename}")
        time.sleep(1)
        print(f"Finished threaded read of {filename}")
        return f"contents of {filename}"
    
    def fetch_url_thread(url):
        print(f"Started threaded fetch of {url}")
        time.sleep(2)
        print(f"Finished threaded fetch of {url}")
        return f"response from {url}"
    
    start_time = time.time()
    
    # Use a thread pool
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(read_file_thread, "file1.txt"),
            executor.submit(read_file_thread, "file2.txt"),
            executor.submit(fetch_url_thread, "https://example.com"),
            executor.submit(fetch_url_thread, "https://api.example.com")
        ]
        results = [future.result() for future in futures]
    
    end_time = time.time()
    print(f"Threaded results: {results}")
    print(f"Total threaded execution time: {end_time - start_time:.2f} seconds")

# Run the comparison
sync_io_operations()
threaded_io_operations()

# The async version was already run in the previous exercise

Exercise 7: Concurrency Patterns in Practice

import asyncio
import concurrent.futures
import time

# Concurrent handling of a CPU-bound task
def cpu_intensive_computation(n):
    """CPU-bound computation"""
    return sum(i * i for i in range(n))

async def mixed_concurrency():
    """Mixed concurrency: asyncio + a thread pool"""
    print("\n=== Mixed concurrency ===")
    
    # I/O-bound task (plain async)
    async def io_task(task_id):
        print(f"Async I/O task {task_id} started")
        await asyncio.sleep(1)
        print(f"Async I/O task {task_id} finished")
        return f"io_result_{task_id}"
    
    # CPU-bound task (offloaded to a thread pool)
    async def cpu_task(n, executor):
        loop = asyncio.get_running_loop()
        print(f"CPU task with n={n} started")
        # Run the CPU-bound work in the thread pool so the event loop stays responsive
        result = await loop.run_in_executor(
            executor, cpu_intensive_computation, n
        )
        print(f"CPU task with n={n} finished, result: {result}")
        return result
    
    start_time = time.time()
    
    # Thread pool used for the CPU-bound tasks
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Run the I/O and CPU tasks concurrently
        io_tasks = [io_task(i) for i in range(3)]
        cpu_tasks = [cpu_task(1000000, executor) for _ in range(2)]
        
        # Wait for everything to finish
        io_results = await asyncio.gather(*io_tasks)
        cpu_results = await asyncio.gather(*cpu_tasks)
    
    end_time = time.time()
    print(f"I/O results: {io_results}")
    print(f"CPU results: {cpu_results}")
    print(f"Total time for mixed concurrency: {end_time - start_time:.2f} seconds")

asyncio.run(mixed_concurrency())
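
One caveat about the exercise above: because of the GIL, running CPU-bound work in a ThreadPoolExecutor keeps the event loop responsive but does not make the computation itself run in parallel. If you need real parallelism, run_in_executor also accepts a ProcessPoolExecutor. A minimal sketch, assuming a module-level (picklable) worker function:

import asyncio
import concurrent.futures

def heavy_sum(n):
    """CPU-bound function; defined at module level so the process pool can pickle it."""
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        # each call runs in its own worker process, so the two sums really run in parallel
        results = await asyncio.gather(
            loop.run_in_executor(pool, heavy_sum, 1_000_000),
            loop.run_in_executor(pool, heavy_sum, 1_000_000),
        )
    print(results)

if __name__ == "__main__":
    asyncio.run(main())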

Exercise 8: Concurrency Best Practices

import asyncio
import threading
import multiprocessing
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

class ConcurrentProcessor:
    """Concurrency dispatcher"""
    
    def __init__(self):
        self.thread_executor = ThreadPoolExecutor(max_workers=4)
        self.process_executor = ProcessPoolExecutor(max_workers=2)
    
    async def process_tasks(self, tasks):
        """Dispatch tasks of different kinds"""
        results = []
        
        for task in tasks:
            if task['type'] == 'io':
                # I/O-bound: handle with plain asyncio
                result = await self.process_io_task(task)
                results.append(result)
            elif task['type'] == 'cpu':
                # CPU-bound: hand off to the process pool
                result = await self.process_cpu_task(task)
                results.append(result)
            elif task['type'] == 'network':
                # Blocking network calls: hand off to the thread pool
                result = await self.process_network_task(task)
                results.append(result)
        
        return results
    
    async def process_io_task(self, task):
        """Handle an I/O-bound task"""
        print(f"Handling I/O task: {task['name']}")
        await asyncio.sleep(task.get('duration', 1))
        return f"I/O result: {task['name']}"
    
    async def process_cpu_task(self, task):
        """Handle a CPU-bound task"""
        loop = asyncio.get_running_loop()
        print(f"Handling CPU task: {task['name']}")
        n = task.get('n', 1000000)
        result = await loop.run_in_executor(
            self.process_executor, 
            self.cpu_intensive_work, 
            n
        )
        return f"CPU result: {task['name']} = {result}"
    
    async def process_network_task(self, task):
        """Handle a network-bound task"""
        loop = asyncio.get_running_loop()
        print(f"Handling network task: {task['name']}")
        result = await loop.run_in_executor(
            self.thread_executor,
            self.network_work,
            task['url']
        )
        return f"Network result: {task['name']} = {result}"
    
    @staticmethod
    def cpu_intensive_work(n):
        """CPU-bound work (a staticmethod, so the process pool only has to pickle the
        function itself rather than the whole instance with its unpicklable executors)"""
        return sum(i * i for i in range(n))
    
    def network_work(self, url):
        """Network work (simulated)"""
        time.sleep(2)  # simulate network latency
        return f"response from {url}"
    
    def close(self):
        """Release the executors"""
        self.thread_executor.shutdown()
        self.process_executor.shutdown()

async def best_practices_example():
    """Concurrency best-practices example"""
    print("\n=== Concurrency best practices ===")
    
    processor = ConcurrentProcessor()
    
    # Define tasks of each kind
    tasks = [
        {'type': 'io', 'name': 'read file 1', 'duration': 1},
        {'type': 'io', 'name': 'read file 2', 'duration': 2},
        {'type': 'cpu', 'name': 'computation 1', 'n': 500000},
        {'type': 'cpu', 'name': 'computation 2', 'n': 1000000},
        {'type': 'network', 'name': 'API request 1', 'url': 'https://api1.com'},
        {'type': 'network', 'name': 'API request 2', 'url': 'https://api2.com'}
    ]
    
    start_time = time.time()
    
    try:
        results = await processor.process_tasks(tasks)
        print("\nAll tasks finished, results:")
        for result in results:
            print(f"  - {result}")
    
    finally:
        processor.close()
    
    end_time = time.time()
    print(f"\nTotal execution time: {end_time - start_time:.2f} seconds")

asyncio.run(best_practices_example())

Today's Challenge:

Build a complete web crawler that uses several concurrency techniques to fetch pages efficiently.

# Challenge exercise: concurrent web crawler
import asyncio
import aiohttp
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
import time
from collections import deque
import async_timeout

class ConcurrentWebCrawler:
    """并发Web爬虫"""
    
    def __init__(self, max_concurrent=10, max_pages=50):
        self.max_concurrent = max_concurrent
        self.max_pages = max_pages
        self.visited = set()
        self.to_visit = deque()
        self.results = []
        self.session = None
    
    async def crawl(self, start_url):
        """Start crawling"""
        print(f"Start crawling: {start_url}")
        self.to_visit.append(start_url)
        
        async with aiohttp.ClientSession() as session:
            self.session = session
            tasks = set()
            
            while self.to_visit or tasks:
                # Create new tasks
                while self.to_visit and len(tasks) < self.max_concurrent and len(self.visited) < self.max_pages:
                    url = self.to_visit.popleft()
                    if url not in self.visited:
                        self.visited.add(url)
                        task = asyncio.create_task(self.fetch_page(url))
                        tasks.add(task)
                
                # Stop when nothing is running (e.g. the page limit has been reached),
                # otherwise the loop would spin forever on the remaining URLs
                if not tasks:
                    break
                
                # Wait for tasks to finish
                done, tasks = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED
                )
                
                for task in done:
                    try:
                        result = await task
                        if result:
                            self.results.append(result)
                    except Exception as e:
                        print(f"Crawl failed: {e}")
        
        return self.results
    
    async def fetch_page(self, url):
        """Fetch a single page"""
        try:
            async with async_timeout.timeout(10):
                async with self.session.get(url) as response:
                    if response.status == 200:
                        html = await response.text()
                        title, links = self.parse_page(html, url)
                        
                        print(f"Fetched successfully: {url} - {title}")
                        return {
                            'url': url,
                            'title': title,
                            'links_found': len(links)
                        }
                    else:
                        print(f"HTTP error: {url} - {response.status}")
        except asyncio.TimeoutError:
            print(f"Timed out: {url}")
        except Exception as e:
            print(f"Error crawling {url}: {e}")
        
        return None
    
    def parse_page(self, html, base_url):
        """Parse a page"""
        soup = BeautifulSoup(html, 'html.parser')
        title = soup.find('title')
        title_text = title.get_text() if title else "No title"
        
        # Extract the links
        links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            absolute_url = urljoin(base_url, href)
            if self.is_valid_url(absolute_url):
                links.append(absolute_url)
                if absolute_url not in self.visited and absolute_url not in self.to_visit:
                    self.to_visit.append(absolute_url)
        
        return title_text, links
    
    def is_valid_url(self, url):
        """Check whether a URL is valid"""
        parsed = urlparse(url)
        return bool(parsed.netloc) and parsed.scheme in ['http', 'https']
    
    async def close(self):
        """Release resources (the session is normally closed already by the async with block in crawl)"""
        if self.session and not self.session.closed:
            await self.session.close()

async def web_crawler_demo():
    """Web crawler demo"""
    print("=== Concurrent web crawler demo ===")
    
    # Note: when crawling for real, respect each site's robots.txt and terms of use.
    # Example sites are used here for demonstration.
    start_urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/links/10/0"
    ]
    
    crawler = ConcurrentWebCrawler(max_concurrent=5, max_pages=10)
    
    try:
        start_time = time.time()
        
        for start_url in start_urls:
            await crawler.crawl(start_url)
        results = crawler.results  # crawl() accumulates its findings on the crawler
        
        end_time = time.time()
        
        print("\nCrawl finished!")
        print(f"Pages crawled: {len(results)}")
        print(f"Total time: {end_time - start_time:.2f} seconds")
        
        print("\nCrawl results:")
        for result in results[:5]:  # show the first 5 results
            print(f"URL: {result['url']}")
            print(f"Title: {result['title']}")
            print(f"Links found: {result['links_found']}")
            print("-" * 50)
    
    finally:
        await crawler.close()

# The actual run is commented out because it needs network access; uncomment the line below to test it
# asyncio.run(web_crawler_demo())

# Alternative: a simple crawler based on a thread pool
def simple_threaded_crawler():
    """Simple multithreaded crawler"""
    print("=== Simple multithreaded crawler ===")
    
    def fetch_url(url):
        """Fetch the content of a URL"""
        try:
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                soup = BeautifulSoup(response.text, 'html.parser')
                title = soup.find('title')
                return {
                    'url': url,
                    'title': title.get_text() if title else "No title",
                    'status': 'success'
                }
            # Non-200 responses also need a result dict, otherwise the caller gets None
            return {'url': url, 'title': f"HTTP {response.status_code}", 'status': 'failed'}
        except Exception as e:
            return {'url': url, 'title': f"Error: {e}", 'status': 'failed'}
    
    urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404"
    ]
    
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(fetch_url, urls))
    
    end_time = time.time()
    
    print(f"Crawl finished in {end_time - start_time:.2f} seconds")
    for result in results:
        print(f"URL: {result['url']}, status: {result['status']}, title: {result['title']}")

# Run the simple crawler
simple_threaded_crawler()
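
A common refinement for crawlers like the ones above is to cap how many requests run at once with asyncio.Semaphore, the async analogue of the threading semaphore from Exercise 2. A minimal sketch with simulated requests (the URLs are placeholders):

import asyncio

async def fetch_with_limit(semaphore, url):
    """Simulated fetch that respects a concurrency limit."""
    async with semaphore:              # at most 3 fetches run at the same time
        print(f"fetching {url}")
        await asyncio.sleep(1)         # stand-in for a real HTTP request
        return url

async def main():
    semaphore = asyncio.Semaphore(3)
    urls = [f"https://example.com/page/{i}" for i in range(8)]
    results = await asyncio.gather(*(fetch_with_limit(semaphore, u) for u in urls))
    print(f"fetched {len(results)} pages")

asyncio.run(main())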

Learning tips:

  1. Multithreading: suited to I/O-bound work such as network requests and file operations
  2. Multiprocessing: suited to CPU-bound work such as numerical computation and image processing
  3. Async programming: suited to highly concurrent I/O, using the async/await syntax
  4. Thread safety: protect shared resources with locks to avoid race conditions
  5. Resource management: close thread pools, process pools, and sessions properly (see the sketch after this list)
  6. Error handling: handle exceptions in concurrent code carefully
  7. Performance: choose the concurrency model that matches the kind of task
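
As mentioned in tip 5, the easiest way to get resource management right is to let context managers do the closing. A minimal sketch:

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def double(n):
    """Placeholder task used only to exercise the executors."""
    return n * 2

async def main():
    # "with" guarantees shutdown() runs even if an exception is raised inside the block
    with ThreadPoolExecutor(max_workers=2) as threads, \
         ProcessPoolExecutor(max_workers=2) as processes:
        loop = asyncio.get_running_loop()
        print(await loop.run_in_executor(threads, double, 1))
        print(await loop.run_in_executor(processes, double, 2))

if __name__ == "__main__":
    asyncio.run(main())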

Tomorrow we will study network programming! Keep practicing, and your concurrent-programming skills will keep getting stronger!
