Python Crazy Practice, Day 26 of 60: Concurrent Programming in Python

Today we'll learn concurrent programming in Python: multithreading, multiprocessing, and asynchronous programming. These techniques can improve a program's throughput; broadly, threads and coroutines help with I/O-bound work, while processes help with CPU-bound work.

Part 1: Concurrency Fundamentals

1.1 Concurrency vs. Parallelism

  • Concurrency: multiple tasks interleave their execution, appearing to run at the same time
  • Parallelism: multiple tasks genuinely run at the same time, which requires multiple CPU cores

1.2 Three Approaches to Concurrency in Python

  • Multithreading: good for I/O-bound tasks
  • Multiprocessing: good for CPU-bound tasks
  • Asynchronous programming: good for highly concurrent I/O
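
Before diving into each model in detail, here is a minimal side-by-side sketch of the three idioms (standard library only; the task functions are illustrative):

import threading
import multiprocessing
import asyncio
import time

def blocking_task(name):
    time.sleep(0.1)           # stands in for blocking I/O or computation
    print(f"{name} done")

async def async_task(name):
    await asyncio.sleep(0.1)  # non-blocking wait inside the event loop
    print(f"{name} done")

if __name__ == "__main__":
    # 1. Thread: same process, shared memory; the GIL limits CPU parallelism
    t = threading.Thread(target=blocking_task, args=("thread",))
    t.start(); t.join()

    # 2. Process: separate interpreter, true CPU parallelism; arguments are pickled
    p = multiprocessing.Process(target=blocking_task, args=("process",))
    p.start(); p.join()

    # 3. Coroutine: single thread, cooperative scheduling via await
    asyncio.run(async_task("coroutine"))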

Part 2: Multithreading

import threading
import time
import requests
from queue import Queue
import os

class ThreadingDemo:
    def __init__(self):
        self.results = []
        self.lock = threading.Lock()
    
    def simple_thread_demo(self):
        """Basic multithreading demo"""
        print("=== Basic multithreading demo ===")
        
        def worker(thread_id, delay):
            print(f"Thread {thread_id} starting, sleeping {delay} s")
            time.sleep(delay)
            print(f"Thread {thread_id} finished")
        
        # Create and start the threads
        threads = []
        for i in range(5):
            t = threading.Thread(target=worker, args=(i, i+1))
            threads.append(t)
            t.start()
        
        # Wait for all threads to finish
        for t in threads:
            t.join()
        
        print("All threads finished!")
    
    def thread_with_lock(self):
        """Thread-safe updates guarded by a lock"""
        print("\n=== Lock demo ===")
        
        shared_counter = 0
        
        def increment_counter(thread_id, iterations):
            nonlocal shared_counter
            for i in range(iterations):
                # The lock makes the read-modify-write sequence atomic
                with self.lock:
                    current = shared_counter
                    time.sleep(0.001)  # simulate some processing time
                    shared_counter = current + 1
                print(f"Thread {thread_id} incremented counter to {shared_counter}")
        
        threads = []
        for i in range(3):
            t = threading.Thread(target=increment_counter, args=(i, 5))
            threads.append(t)
            t.start()
        
        for t in threads:
            t.join()
        
        print(f"Final counter value: {shared_counter} (expected 15)")
    
    def download_with_threads(self, urls):
        """Download files with one thread per URL"""
        print(f"\n=== Threaded download demo: {len(urls)} files ===")
        
        def download_file(url, filename):
            try:
                start_time = time.time()
                response = requests.get(url, stream=True, timeout=30)
                
                if response.status_code == 200:
                    with open(filename, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            f.write(chunk)
                    
                    download_time = time.time() - start_time
                    file_size = os.path.getsize(filename)
                    
                    with self.lock:
                        self.results.append({
                            'url': url,
                            'filename': filename,
                            'size': file_size,
                            'time': download_time
                        })
                    
                    print(f"Downloaded: {filename} ({file_size} bytes, {download_time:.2f} s)")
                else:
                    print(f"Download failed: {url}, status code: {response.status_code}")
                    
            except Exception as e:
                print(f"Download error {url}: {e}")
        
        # Create the download threads
        threads = []
        start_time = time.time()
        
        for i, url in enumerate(urls):
            filename = f"download_{i+1}.jpg"
            t = threading.Thread(target=download_file, args=(url, filename))
            threads.append(t)
            t.start()
        
        # Wait for all downloads to finish
        for t in threads:
            t.join()
        
        total_time = time.time() - start_time
        print(f"\nAll downloads finished! Total time: {total_time:.2f} s")
        
        # Print statistics
        total_size = sum(r['size'] for r in self.results)
        print(f"Total bytes downloaded: {total_size}")
        print(f"Average download speed: {total_size / total_time / 1024:.2f} KB/s")

# Run the threading demos
def run_threading_demo():
    demo = ThreadingDemo()
    
    # Basic threading
    demo.simple_thread_demo()
    
    # Lock demo
    demo.thread_with_lock()
    
    # Threaded downloads
    urls = [
        "https://httpbin.org/image/jpeg",
        "https://httpbin.org/image/png",
        "https://httpbin.org/image/svg",
    ] * 2  # repeat the URLs to show the effect of concurrency
    
    demo.download_with_threads(urls)

if __name__ == "__main__":
    run_threading_demo()

Part 3: Thread Pools and Queues

from concurrent.futures import ThreadPoolExecutor, as_completed
import random

class ThreadPoolDemo:
    def __init__(self):
        self.task_queue = Queue()
        self.results = []
    
    def producer_consumer_demo(self):
        """Producer-consumer pattern demo"""
        print("\n=== Producer-consumer pattern ===")
        
        def producer(queue, num_tasks):
            """Producer: generate tasks"""
            for i in range(num_tasks):
                task = f"task-{i+1}"
                queue.put(task)
                print(f"Producer added: {task}")
                time.sleep(0.1)  # simulate production time
            # One shutdown sentinel per consumer
            for _ in range(3):  # 3 consumers
                queue.put(None)
        
        def consumer(queue, consumer_id):
            """Consumer: process tasks"""
            while True:
                task = queue.get()
                if task is None:
                    print(f"Consumer {consumer_id} shutting down")
                    queue.task_done()
                    break
                
                print(f"Consumer {consumer_id} processing: {task}")
                # Simulate processing time
                time.sleep(random.uniform(0.5, 1.5))
                queue.task_done()
        
        # Create the queue and threads
        num_tasks = 10
        num_consumers = 3
        
        # Start the producer
        producer_thread = threading.Thread(target=producer, args=(self.task_queue, num_tasks))
        producer_thread.start()
        
        # Start the consumers
        consumer_threads = []
        for i in range(num_consumers):
            t = threading.Thread(target=consumer, args=(self.task_queue, i+1))
            t.start()
            consumer_threads.append(t)
        
        # Wait until every task has been processed
        self.task_queue.join()
        producer_thread.join()
        for t in consumer_threads:
            t.join()
        
        print("All tasks processed!")
    
    def thread_pool_executor_demo(self):
        """Using ThreadPoolExecutor"""
        print("\n=== ThreadPoolExecutor demo ===")
        
        def process_task(task_id, processing_time):
            """Simulate processing a task"""
            print(f"Task {task_id} starting, needs {processing_time:.2f} s")
            time.sleep(processing_time)
            result = f"task {task_id} finished in {processing_time:.2f} s"
            return result
        
        # Use a thread pool
        with ThreadPoolExecutor(max_workers=3) as executor:
            # Submit the tasks
            futures = []
            for i in range(10):
                processing_time = random.uniform(0.5, 2.0)
                future = executor.submit(process_task, i+1, processing_time)
                futures.append(future)
            
            # Collect results as they complete
            for future in as_completed(futures):
                try:
                    result = future.result()
                    print(f"Done: {result}")
                except Exception as e:
                    print(f"Task raised an exception: {e}")
        
        print("All thread-pool tasks finished!")
    
    def url_status_checker(self, urls, max_workers=5):
        """Check URL statuses with a thread pool"""
        print(f"\n=== URL status check (workers: {max_workers}) ===")
        
        def check_url_status(url):
            """Check a single URL"""
            try:
                start_time = time.time()
                response = requests.get(url, timeout=10)
                response_time = time.time() - start_time
                
                return {
                    'url': url,
                    'status_code': response.status_code,
                    'response_time': response_time,
                    'size': len(response.content),
                    'error': None
                }
            except Exception as e:
                return {
                    'url': url,
                    'status_code': None,
                    'response_time': None,
                    'size': 0,
                    'error': str(e)
                }
        
        start_time = time.time()
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all the checks
            future_to_url = {executor.submit(check_url_status, url): url for url in urls}
            
            # Collect the results
            results = []
            for future in as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    result = future.result()
                    results.append(result)
                    
                    if result['error']:
                        print(f"❌ {url} - error: {result['error']}")
                    else:
                        print(f"✅ {url} - status: {result['status_code']}, "
                              f"response time: {result['response_time']:.2f}s")
                        
                except Exception as e:
                    print(f"❌ {url} - exception: {e}")
        
        total_time = time.time() - start_time
        
        # Statistics
        successful = [r for r in results if r['error'] is None]
        failed = [r for r in results if r['error'] is not None]
        
        print(f"\nCheck finished!")
        print(f"Total URLs: {len(urls)}")
        print(f"Succeeded: {len(successful)}")
        print(f"Failed: {len(failed)}")
        print(f"Total time: {total_time:.2f} s")
        print(f"Average per URL: {total_time/len(urls):.2f} s")
        
        return results

# Run the thread-pool demos
def run_thread_pool_demo():
    demo = ThreadPoolDemo()
    
    # Producer-consumer
    demo.producer_consumer_demo()
    
    # ThreadPoolExecutor
    demo.thread_pool_executor_demo()
    
    # URL status check
    test_urls = [
        "https://www.baidu.com",
        "https://www.google.com",
        "https://www.github.com",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://httpbin.org/status/500",
        "https://www.python.org",
        "https://www.qq.com",
        "https://invalid-website-12345.com",  # this one is expected to fail
    ] * 2  # repeat the URLs to exercise concurrency
    
    demo.url_status_checker(test_urls, max_workers=5)

if __name__ == "__main__":
    run_thread_pool_demo()

Part 4: Multiprocessing

import multiprocessing
import math
import os

# Functions submitted to a multiprocessing.Pool must be picklable, so this
# worker is defined at module level rather than nested inside a method.
def worker_process(task_id, data):
    """Worker function executed in a pool process"""
    pid = os.getpid()
    print(f"Process {pid} handling task {task_id}")
    
    # Simulate some computation
    result = sum(i * i for i in range(data))
    
    time.sleep(1)  # simulate processing time
    return f"task {task_id} result: {result} (process: {pid})"

class MultiprocessingDemo:
    def __init__(self):
        self.cpu_count = multiprocessing.cpu_count()
        print(f"CPU cores: {self.cpu_count}")
    
    def cpu_intensive_task(self, n):
        """CPU-bound task: find all primes up to n"""
        def is_prime(num):
            if num < 2:
                return False
            for i in range(2, int(math.sqrt(num)) + 1):
                if num % i == 0:
                    return False
            return True
        
        primes = []
        for i in range(2, n + 1):
            if is_prime(i):
                primes.append(i)
        
        return primes
    
    def compare_thread_vs_process(self, n=10000):
        """Compare threads vs processes on a CPU-bound task"""
        print("\n=== Threads vs processes on CPU-bound work ===")
        
        # Threaded version: the GIL serializes the computation
        print("Running with threads...")
        start_time = time.time()
        
        threads = []
        for _ in range(self.cpu_count):
            t = threading.Thread(target=self.cpu_intensive_task, args=(n,))
            threads.append(t)
            t.start()
        
        for t in threads:
            t.join()
        
        thread_time = time.time() - start_time
        print(f"Threaded time: {thread_time:.2f} s")
        
        # Multiprocess version: one interpreter (and one GIL) per process
        print("Running with processes...")
        start_time = time.time()
        
        with multiprocessing.Pool(processes=self.cpu_count) as pool:
            results = pool.map(self.cpu_intensive_task, [n] * self.cpu_count)
        
        process_time = time.time() - start_time
        print(f"Multiprocess time: {process_time:.2f} s")
        
        print(f"Speedup: {thread_time/process_time:.2f}x")
    
    def process_pool_demo(self):
        """Process pool demo"""
        print("\n=== Process pool demo ===")
        
        # Prepare the task data: (task_id, data) tuples
        tasks = [(i, i * 1000) for i in range(1, 11)]
        
        print(f"Processing {len(tasks)} tasks with a process pool")
        start_time = time.time()
        
        with multiprocessing.Pool(processes=4) as pool:
            # starmap unpacks each tuple into worker_process(task_id, data);
            # worker_process is defined at module level so it can be pickled
            results = pool.starmap(worker_process, tasks)
        
        total_time = time.time() - start_time
        
        print("\nResults:")
        for result in results:
            print(f"  {result}")
        
        print(f"\nTotal processing time: {total_time:.2f} s")
        print(f"Estimated sequential time: {len(tasks)} s")
        print(f"Speedup: {len(tasks)/total_time:.2f}x")
    
    def shared_memory_demo(self):
        """Sharing state between processes via a Manager"""
        print("\n=== Shared state across processes ===")
        
        # Note: passing a nested function to Process works with the 'fork'
        # start method (Linux default); on Windows/macOS (spawn) it must be
        # defined at module level so it can be pickled.
        def worker_with_shared_array(process_id, shared_arr, lock):
            """Worker that mutates the shared list"""
            with lock:
                # Mutate the managed list under the lock
                for i in range(len(shared_arr)):
                    shared_arr[i] += process_id * 10
                print(f"Process {process_id} after update: {list(shared_arr)}")
        
        # A Manager proxies the list and lock across processes
        with multiprocessing.Manager() as manager:
            shared_array = manager.list([1, 2, 3, 4, 5])
            lock = manager.Lock()
            
            processes = []
            for i in range(3):
                p = multiprocessing.Process(
                    target=worker_with_shared_array,
                    args=(i+1, shared_array, lock)
                )
                processes.append(p)
                p.start()
            
            for p in processes:
                p.join()
            
            print(f"Final shared list: {list(shared_array)}")
    
    def process_communication_demo(self):
        """Inter-process communication via a queue"""
        print("\n=== Inter-process communication ===")
        
        # As above, nested targets require the 'fork' start method;
        # define them at module level on spawn-based platforms.
        def producer(queue, num_items):
            """Producer process"""
            for i in range(num_items):
                item = f"message-{i+1}"
                queue.put(item)
                print(f"Producer sent: {item}")
                time.sleep(0.1)
            # One sentinel per consumer, otherwise the second consumer
            # blocks forever on queue.get()
            for _ in range(2):
                queue.put("END")
        
        def consumer(queue, consumer_id):
            """Consumer process"""
            while True:
                item = queue.get()
                if item == "END":
                    print(f"Consumer {consumer_id} got the sentinel")
                    break
                print(f"Consumer {consumer_id} got: {item}")
                time.sleep(0.2)  # simulate processing time
        
        # Create the queue
        queue = multiprocessing.Queue()
        
        # Create the processes
        producer_process = multiprocessing.Process(
            target=producer, args=(queue, 10)
        )
        
        consumer_processes = []
        for i in range(2):
            p = multiprocessing.Process(
                target=consumer, args=(queue, i+1)
            )
            consumer_processes.append(p)
        
        # Start everything
        producer_process.start()
        for p in consumer_processes:
            p.start()
        
        # Wait for completion
        producer_process.join()
        for p in consumer_processes:
            p.join()
        
        print("All processes finished!")

# Run the multiprocessing demos
def run_multiprocessing_demo():
    demo = MultiprocessingDemo()
    
    # Threads vs processes
    demo.compare_thread_vs_process(n=5000)
    
    # Process pool
    demo.process_pool_demo()
    
    # Shared state
    demo.shared_memory_demo()
    
    # Inter-process communication
    demo.process_communication_demo()

if __name__ == "__main__":
    run_multiprocessing_demo()

Part 5: Asyncio Fundamentals

import asyncio
import aiohttp         # third-party: pip install aiohttp
import async_timeout   # third-party: pip install async-timeout

class AsyncIODemo:
    def __init__(self):
        pass
    
    async def simple_async_demo(self):
        """Basic async demo"""
        print("=== Basic async demo ===")
        
        async def say_hello(name, delay):
            """Greet after an async sleep"""
            print(f"{name} starting, waiting {delay} s")
            await asyncio.sleep(delay)
            print(f"Hello, {name}! (waited {delay} s)")
            return f"{name} done"
        
        # Create several coroutines
        tasks = [
            say_hello("Alice", 2),
            say_hello("Bob", 1),
            say_hello("Charlie", 3),
            say_hello("David", 1.5)
        ]
        
        # Run them all concurrently
        results = await asyncio.gather(*tasks)
        print(f"All tasks done: {results}")
    
    async def async_url_checker(self, urls):
        """Asynchronous URL status check"""
        print(f"\n=== Async URL check: {len(urls)} URLs ===")
        
        async def check_url(session, url):
            """Check a single URL"""
            try:
                async with async_timeout.timeout(10):
                    async with session.get(url) as response:
                        return {
                            'url': url,
                            'status': response.status,
                            'content_type': response.headers.get('Content-Type', 'Unknown')
                        }
            except asyncio.TimeoutError:
                return {'url': url, 'error': 'Timeout'}
            except Exception as e:
                return {'url': url, 'error': str(e)}
        
        start_time = time.time()
        results = []
        
        async with aiohttp.ClientSession() as session:
            tasks = [check_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
        
        total_time = time.time() - start_time
        
        # Show the results
        success_count = 0
        for result in results:
            if 'error' in result:
                print(f"❌ {result['url']} - error: {result['error']}")
            else:
                print(f"✅ {result['url']} - status: {result['status']}")
                success_count += 1
        
        print(f"\nCheck finished!")
        print(f"Succeeded: {success_count}/{len(urls)}")
        print(f"Total time: {total_time:.2f} s")
        
        return results
    
    async def async_producer_consumer(self):
        """Async producer-consumer pattern"""
        print("\n=== Async producer-consumer ===")
        
        queue = asyncio.Queue(maxsize=5)
        
        async def producer(name, num_items):
            """Async producer"""
            for i in range(num_items):
                item = f"{name}-item-{i+1}"
                await queue.put(item)
                print(f"Producer {name} produced: {item}")
                await asyncio.sleep(0.5)  # simulate production time
            
            # One sentinel per producer; with as many consumers as producers,
            # each consumer receives exactly one sentinel
            await queue.put(f"{name}-END")
            print(f"Producer {name} done")
        
        async def consumer(name):
            """Async consumer: stop at the first sentinel"""
            while True:
                item = await queue.get()
                
                if item.endswith("-END"):
                    print(f"Consumer {name} got a sentinel")
                    queue.task_done()
                    break
                
                print(f"Consumer {name} consumed: {item}")
                await asyncio.sleep(1)  # simulate consumption time
                queue.task_done()
            
            print(f"Consumer {name} done")
        
        # Start two producers and two consumers
        producers = [
            asyncio.create_task(producer("P1", 3)),
            asyncio.create_task(producer("P2", 2))
        ]
        
        consumers = [
            asyncio.create_task(consumer("C1")),
            asyncio.create_task(consumer("C2"))
        ]
        
        # Wait for the producers to finish
        await asyncio.gather(*producers)
        
        # Wait until every queued item has been marked done
        await queue.join()
        
        # The consumers exit on their sentinels; wait for them
        await asyncio.gather(*consumers)
        
        print("Async producer-consumer finished!")
    
    async def run_benchmark(self):
        """Benchmark: sync vs async HTTP requests"""
        print("\n=== Benchmark: sync vs async ===")
        
        urls = [
            "https://httpbin.org/delay/1",  # this endpoint delays 1 s before responding
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
        ]
        
        # Synchronous version (note: these blocking calls stall the event
        # loop; acceptable here purely for the comparison)
        print("Running synchronous version...")
        start_time = time.time()
        
        session = requests.Session()
        for url in urls:
            try:
                response = session.get(url, timeout=10)
                print(f"sync: {url} - status: {response.status_code}")
            except Exception as e:
                print(f"sync: {url} - error: {e}")
        
        sync_time = time.time() - start_time
        print(f"Synchronous total time: {sync_time:.2f} s")
        
        # Asynchronous version
        print("Running asynchronous version...")
        start_time = time.time()
        
        async with aiohttp.ClientSession() as session:
            tasks = []
            for url in urls:
                task = asyncio.create_task(self.fetch_with_session(session, url))
                tasks.append(task)
            
            results = await asyncio.gather(*tasks)
            
            for result in results:
                if 'error' in result:
                    print(f"async: {result['url']} - error: {result['error']}")
                else:
                    print(f"async: {result['url']} - status: {result['status']}")
        
        async_time = time.time() - start_time
        print(f"Asynchronous total time: {async_time:.2f} s")
        
        print(f"Speedup: {sync_time/async_time:.2f}x")
    
    async def fetch_with_session(self, session, url):
        """Fetch a URL using an existing session"""
        try:
            async with async_timeout.timeout(10):
                async with session.get(url) as response:
                    return {
                        'url': url,
                        'status': response.status
                    }
        except Exception as e:
            return {
                'url': url,
                'error': str(e)
            }

# Run the async demos
async def run_async_demo():
    demo = AsyncIODemo()
    
    # Basic async
    await demo.simple_async_demo()
    
    # Async URL check
    test_urls = [
        "https://www.baidu.com",
        "https://www.python.org",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/404",
        "https://invalid-url-test.com",
    ]
    
    await demo.async_url_checker(test_urls)
    
    # Async producer-consumer
    await demo.async_producer_consumer()
    
    # Benchmark
    await demo.run_benchmark()

# Run all the demos
def main():
    print("=" * 60)
    print("Python concurrency demos")
    print("=" * 60)
    
    # Multithreading
    run_threading_demo()
    
    # Thread pools
    run_thread_pool_demo()
    
    # Multiprocessing
    run_multiprocessing_demo()
    
    # Asyncio
    print("\n" + "=" * 60)
    print("Asyncio demos")
    print("=" * 60)
    asyncio.run(run_async_demo())

if __name__ == "__main__":
    main()

Part 6: Capstone Project: A Concurrent Web Crawler

# Third-party HTML parsing (pip install beautifulsoup4)
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from concurrent.futures import wait, FIRST_COMPLETED

class ConcurrentWebCrawler:
    def __init__(self, max_workers=10, use_async=False):
        self.max_workers = max_workers
        self.use_async = use_async
        self.visited_urls = set()
        self.lock = threading.Lock()
        self.results = []
    
    async def async_crawl(self, start_urls, max_pages=50):
        """Asynchronous crawler"""
        print(f"Starting async crawler, max pages: {max_pages}")
        
        async with aiohttp.ClientSession() as session:
            tasks = set()
            
            # Seed with the start URLs, marking them visited
            for url in start_urls:
                if len(self.visited_urls) < max_pages:
                    self.visited_urls.add(url)
                    task = asyncio.create_task(self.async_crawl_page(session, url, max_pages))
                    tasks.add(task)
            
            while tasks and len(self.visited_urls) < max_pages:
                done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                
                for task in done:
                    try:
                        new_urls = await task
                        for url in new_urls:
                            if url not in self.visited_urls and len(self.visited_urls) < max_pages:
                                self.visited_urls.add(url)
                                new_task = asyncio.create_task(self.async_crawl_page(session, url, max_pages))
                                tasks.add(new_task)
                    except Exception as e:
                        print(f"Crawl task failed: {e}")
            
            # Cancel any remaining tasks
            for task in tasks:
                task.cancel()
        
        print(f"Async crawl finished! Crawled {len(self.results)} pages")
        return self.results
    
    async def async_crawl_page(self, session, url, max_pages):
        """Crawl a single page asynchronously"""
        if len(self.visited_urls) >= max_pages:
            return []
        
        print(f"Async crawling: {url}")
        
        try:
            async with async_timeout.timeout(10):
                async with session.get(url) as response:
                    if response.status != 200:
                        return []
                    
                    html = await response.text()
                    
                    # Parse the page
                    soup = BeautifulSoup(html, 'html.parser')
                    
                    # Extract page info (guard against empty <title> tags)
                    page_data = {
                        'url': url,
                        'title': soup.title.string.strip() if soup.title and soup.title.string else 'No title',
                        'links_found': 0,
                        'status': response.status
                    }
                    
                    with self.lock:
                        self.results.append(page_data)
                    
                    # Extract new links
                    new_urls = []
                    for link in soup.find_all('a', href=True):
                        href = link['href']
                        full_url = self.normalize_url(url, href)
                        if full_url and full_url not in self.visited_urls:
                            new_urls.append(full_url)
                    
                    page_data['links_found'] = len(new_urls)
                    
                    return new_urls[:10]  # cap the number of new links
                    
        except Exception as e:
            print(f"Async crawl failed {url}: {e}")
            return []
    
    def normalize_url(self, base_url, href):
        """Normalize a link against its base URL"""
        try:
            full_url = urljoin(base_url, href)
            if full_url.startswith(('http://', 'https://')):
                return full_url
        except Exception:
            pass
        return None
    
    def thread_crawl(self, start_urls, max_pages=50):
        """Multithreaded crawler"""
        print(f"Starting threaded crawler, workers: {self.max_workers}")
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Seed with the start URLs, marking them visited
            futures = set()
            for url in start_urls:
                if len(self.visited_urls) < max_pages:
                    self.visited_urls.add(url)
                    future = executor.submit(self.thread_crawl_page, url, max_pages)
                    futures.add(future)
            
            while futures and len(self.visited_urls) < max_pages:
                # Block until at least one crawl task finishes
                done, futures = wait(futures, return_when=FIRST_COMPLETED)
                for future in done:
                    try:
                        new_urls = future.result()
                        for url in new_urls:
                            if url not in self.visited_urls and len(self.visited_urls) < max_pages:
                                self.visited_urls.add(url)
                                new_future = executor.submit(self.thread_crawl_page, url, max_pages)
                                futures.add(new_future)
                    except Exception as e:
                        print(f"Crawl task failed: {e}")
        
        print(f"Threaded crawl finished! Crawled {len(self.results)} pages")
        return self.results
    
    def thread_crawl_page(self, url, max_pages):
        """Crawl a single page in a worker thread"""
        if len(self.visited_urls) >= max_pages:
            return []
        
        print(f"Thread crawling: {url}")
        
        try:
            response = requests.get(url, timeout=10, headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            })
            
            if response.status_code != 200:
                return []
            
            soup = BeautifulSoup(response.content, 'html.parser')
            
            # Extract page info (guard against empty <title> tags)
            page_data = {
                'url': url,
                'title': soup.title.string.strip() if soup.title and soup.title.string else 'No title',
                'links_found': 0,
                'status': response.status_code
            }
            
            with self.lock:
                self.results.append(page_data)
            
            # Extract new links
            new_urls = []
            for link in soup.find_all('a', href=True):
                href = link['href']
                full_url = self.normalize_url(url, href)
                if full_url and full_url not in self.visited_urls:
                    new_urls.append(full_url)
            
            page_data['links_found'] = len(new_urls)
            
            return new_urls[:10]  # cap the number of new links
            
        except Exception as e:
            print(f"Thread crawl failed {url}: {e}")
            return []
    
    def compare_crawlers(self, start_urls, max_pages=20):
        """Compare the two crawler implementations"""
        print("\n" + "="*60)
        print("Crawler performance comparison")
        print("="*60)
        
        # Reset state
        self.visited_urls.clear()
        self.results = []
        
        # Threaded crawler
        print("\n1. Threaded crawler")
        start_time = time.time()
        thread_results = self.thread_crawl(start_urls, max_pages)
        thread_time = time.time() - start_time
        
        # Reset state: rebind results rather than clear() it, since
        # thread_crawl returned the same list object as thread_results
        self.visited_urls.clear()
        self.results = []
        
        # Async crawler
        print("\n2. Async crawler")
        start_time = time.time()
        async_results = asyncio.run(self.async_crawl(start_urls, max_pages))
        async_time = time.time() - start_time
        
        # Show the comparison
        print("\n" + "="*60)
        print("Comparison results")
        print("="*60)
        print(f"Threaded crawler:")
        print(f"  Pages crawled: {len(thread_results)}")
        print(f"  Total time: {thread_time:.2f} s")
        print(f"  Pages/s: {len(thread_results)/thread_time:.2f}")
        
        print(f"Async crawler:")
        print(f"  Pages crawled: {len(async_results)}")
        print(f"  Total time: {async_time:.2f} s")
        print(f"  Pages/s: {len(async_results)/async_time:.2f}")
        
        if thread_time > 0 and async_time > 0:
            ratio = thread_time / async_time
            print(f"Speed ratio: the async crawler was {ratio:.2f}x faster than the threaded one")

# Run the crawler comparison
def run_crawler_comparison():
    crawler = ConcurrentWebCrawler(max_workers=5)
    
    # A few safe test URLs
    start_urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/links/10/0",
        "https://httpbin.org/links/5/0",
    ]
    
    crawler.compare_crawlers(start_urls, max_pages=15)

if __name__ == "__main__":
    # Run all the demos
    main()
    
    # Run the crawler comparison
    run_crawler_comparison()

Today's Key Topics

  1. Multithreading: suited to I/O-bound tasks; use the threading module
  2. Thread safety: protect shared state with locks
  3. Thread pools: manage threads with ThreadPoolExecutor
  4. Multiprocessing: suited to CPU-bound tasks; use the multiprocessing module
  5. Inter-process communication: Queue, Pipe, and friends (see the Pipe sketch below)
  6. Asynchronous programming: handle highly concurrent I/O with asyncio
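
The demos above used Queue for inter-process messaging; Pipe was mentioned but not demonstrated. Here is a minimal sketch of a two-ended multiprocessing.Pipe (the function and message names are illustrative):

import multiprocessing

def child(conn):
    """Child process: receive a request, send back a reply."""
    request = conn.recv()              # blocks until the parent sends
    conn.send(f"reply to: {request}")
    conn.close()

if __name__ == "__main__":
    # Pipe() returns two connected endpoints; each end can send and receive
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send("ping")
    print(parent_conn.recv())  # -> "reply to: ping"
    p.join()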

Choosing an Approach

  • I/O-bound: multithreading or async
  • CPU-bound: multiprocessing
  • Highly concurrent I/O: asyncio (for calling blocking code from async, see the asyncio.to_thread sketch below)
  • Simple parallelism: thread pools / process pools
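
These models can also be combined. When async code has to call a blocking function (for example, a library with no async support), asyncio.to_thread (Python 3.9+) runs it in a worker thread without stalling the event loop. A minimal sketch, with blocking_io standing in for any blocking call:

import asyncio
import time

def blocking_io():
    """Stand-in for a blocking call, e.g. a client library without async support."""
    time.sleep(1)
    return "blocking result"

async def main():
    # Three blocking calls run concurrently in worker threads,
    # so the total is about 1 second instead of about 3
    results = await asyncio.gather(*(asyncio.to_thread(blocking_io) for _ in range(3)))
    print(results)

asyncio.run(main())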

Caveats

  1. The GIL: CPython's global interpreter lock keeps threads from executing Python bytecode in parallel, so multithreading won't speed up CPU-bound code
  2. Resource management: keep thread/process counts bounded
  3. Deadlock prevention: avoid circular waits on locks, e.g. by acquiring locks in a fixed global order (sketch below)
  4. Exception handling: handle exceptions raised inside concurrent tasks explicitly
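
On point 3, the standard remedy is a global lock order: if every thread acquires locks in the same order, no circular wait can form. A minimal sketch (the lock and function names are illustrative):

import threading

lock_a = threading.Lock()
lock_b = threading.Lock()

def transfer(thread_id):
    # Deadlock-prone code lets one thread take lock_a then lock_b while
    # another takes lock_b then lock_a. Acquiring in a fixed order is safe.
    with lock_a:       # always lock_a first, in every thread...
        with lock_b:   # ...then lock_b
            print(f"thread {thread_id} holds both locks")

threads = [threading.Thread(target=transfer, args=(i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()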

Exercises

  1. Implement a concurrent download manager
  2. Build a concurrent pipeline for real-time data processing
  3. Implement a web server that serves requests concurrently
  4. Benchmark the different concurrency models on a concrete workload

Coming Up Tomorrow

Day 27: data analysis and scientific computing in Python, including NumPy, Pandas, and Matplotlib.

For today, run these concurrency examples and get a feel for what each model does well and where it applies. Ask me anything that comes up!
