Netty Asynchronous Logging Architecture

When Netty has to handle large volumes of logs, optimization is needed at several levels so that logging itself does not become the system's bottleneck.

1. Asynchronous Logging Architecture

1.1 Asynchronous Log Handler

java

@Slf4j
@Component
@ChannelHandler.Sharable  // singleton handler, safe to share across pipelines
public class AsyncLogHandler extends SimpleChannelInboundHandler<LogMessage> {
    
    // High-performance lock-free queue (LMAX Disruptor).
    // Not final: both are created in init(), after construction.
    private Disruptor<LogEvent> disruptor;
    private RingBuffer<LogEvent> ringBuffer;
    
    // Batch-processing configuration
    private static final int BATCH_SIZE = 1000;
    private static final int BUFFER_SIZE = 65536;
    private final List<LogMessage> batchBuffer = new ArrayList<>(BATCH_SIZE);
    private final ScheduledExecutorService flushScheduler = 
        Executors.newScheduledThreadPool(1);
    
    @PostConstruct
    public void init() {
        // Initialize the Disruptor
        disruptor = new Disruptor<>(
            LogEvent::new,
            BUFFER_SIZE,
            Executors.defaultThreadFactory(),
            ProducerType.MULTI,
            new BlockingWaitStrategy()
        );
        
        disruptor.handleEventsWith(new LogEventHandler());
        disruptor.start();
        ringBuffer = disruptor.getRingBuffer();
        
        // Periodic flush so small batches don't sit in the buffer forever
        flushScheduler.scheduleAtFixedRate(this::flushBatch, 100, 100, TimeUnit.MILLISECONDS);
    }
    
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LogMessage logMessage) {
        // Publish to the ring buffer asynchronously; never block the I/O thread
        long sequence = ringBuffer.next();
        try {
            LogEvent event = ringBuffer.get(sequence);
            event.setLogMessage(logMessage);
            event.setReceiveTime(System.currentTimeMillis());
        } finally {
            ringBuffer.publish(sequence);
        }
    }
    
    // Disruptor event handler (runs on a single consumer thread)
    private class LogEventHandler implements EventHandler<LogEvent> {
        @Override
        public void onEvent(LogEvent event, long sequence, boolean endOfBatch) {
            processLogEvent(event);
            
            // Flush when the Disruptor signals the end of the current batch
            if (endOfBatch) {
                flushBatch();
            }
        }
    }
    
    // synchronized: batchBuffer is touched by both the Disruptor consumer
    // thread and the flush-scheduler thread
    private synchronized void processLogEvent(LogEvent event) {
        batchBuffer.add(event.getLogMessage());
        
        if (batchBuffer.size() >= BATCH_SIZE) {
            flushBatch();
        }
    }
    
    private synchronized void flushBatch() {
        if (batchBuffer.isEmpty()) {
            return;
        }
        
        List<LogMessage> logsToProcess = new ArrayList<>(batchBuffer);
        batchBuffer.clear();
        
        // Hand the batch to another pool so the consumer thread keeps draining;
        // processBatchLogs(...) is the downstream write, e.g. the tiered storage in section 2
        CompletableFuture.runAsync(() -> processBatchLogs(logsToProcess))
            .exceptionally(ex -> {
                log.error("Batch log processing failed", ex);
                return null;
            });
    }
}
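
For context, here is a minimal sketch of how this handler could be wired into a Netty pipeline. LogMessageDecoder, bossGroup, and workerGroup are assumed names, not part of the code above:

java

// Hypothetical bootstrap wiring; LogMessageDecoder is an assumed
// ByteToMessageDecoder that turns framed bytes into LogMessage objects.
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline()
              // frame messages by a 4-byte length prefix, then strip it
              .addLast(new LengthFieldBasedFrameDecoder(1 << 20, 0, 4, 0, 4))
              .addLast(new LogMessageDecoder())
              // the @Sharable singleton above can be reused per channel
              .addLast(asyncLogHandler);
        }
    });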

1.2 Level-Based Processing Strategies

java

@Component
public class LogLevelProcessor {
    
    // Collaborating services used by the strategies below (types illustrative)
    @Autowired
    private CriticalLogStorage criticalStorage;
    @Autowired
    private AlertService alertService;
    @Autowired
    private LogStorage logStorage;
    
    // Each log level gets its own processing strategy
    private final Map<LogLevel, LogStrategy> strategyMap = new EnumMap<>(LogLevel.class);
    
    @PostConstruct
    public void init() {
        // ERROR - processed in real time
        strategyMap.put(LogLevel.ERROR, new RealTimeLogStrategy());
        
        // WARN - fast path
        strategyMap.put(LogLevel.WARN, new FastLogStrategy());
        
        // INFO - batched
        strategyMap.put(LogLevel.INFO, new BatchLogStrategy());
        
        // DEBUG - sampled at 10%
        strategyMap.put(LogLevel.DEBUG, new SamplingLogStrategy(0.1));
    }
    
    public void processLog(LogMessage log) {
        LogStrategy strategy = strategyMap.get(log.getLevel());
        if (strategy != null) {
            strategy.process(log);
        }
    }
    
    // Strategy interface
    public interface LogStrategy {
        void process(LogMessage log);
    }
    
    // Real-time strategy
    public class RealTimeLogStrategy implements LogStrategy {
        @Override
        public void process(LogMessage log) {
            // Write to critical storage immediately
            criticalStorage.save(log);
            // Trigger alerting
            alertService.checkAndAlert(log);
        }
    }
    
    // Sampling strategy
    public class SamplingLogStrategy implements LogStrategy {
        private final double samplingRate;
        
        public SamplingLogStrategy(double samplingRate) {
            this.samplingRate = samplingRate;
        }
        
        @Override
        public void process(LogMessage log) {
            // ThreadLocalRandom avoids contention when called from many threads
            if (ThreadLocalRandom.current().nextDouble() < samplingRate) {
                // Only sampled logs are persisted
                logStorage.save(log);
            }
        }
    }
}
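
FastLogStrategy and BatchLogStrategy are referenced above but not shown. Here is a minimal sketch of what BatchLogStrategy might look like, assuming logStorage exposes a saveBatch method (an assumption, not part of the code above):

java

// A possible BatchLogStrategy: accumulate INFO logs and flush by size.
// A periodic flush (as in section 1.1) would bound batch age as well.
public class BatchLogStrategy implements LogStrategy {
    private static final int MAX_BATCH = 500;
    private final List<LogMessage> buffer = new ArrayList<>(MAX_BATCH);

    @Override
    public synchronized void process(LogMessage log) {
        buffer.add(log);
        if (buffer.size() >= MAX_BATCH) {
            List<LogMessage> toFlush = new ArrayList<>(buffer);
            buffer.clear();
            // saveBatch is an assumed batch API on the storage service
            logStorage.saveBatch(toFlush);
        }
    }
}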

2. Storage Optimization

2.1 Tiered Storage Architecture

java

@Slf4j
@Component
public class TieredLogStorage {
    
    @Autowired
    private ElasticsearchService esService;     // hot tier - recent data
    @Autowired  
    private HBaseService hbaseService;          // warm tier - historical data
    @Autowired
    private S3Service s3Service;                // cold tier - archived data
    
    // Time-based tiering thresholds
    private static final Duration HOT_STORAGE_DURATION = Duration.ofDays(7);
    private static final Duration WARM_STORAGE_DURATION = Duration.ofDays(30);
    
    /**
     * Pick a storage tier based on log age.
     */
    public void storeLog(LogMessage log) {
        long logTime = log.getTimestamp();
        long currentTime = System.currentTimeMillis();
        
        if (currentTime - logTime < HOT_STORAGE_DURATION.toMillis()) {
            // Hot data -> Elasticsearch
            esService.indexLog(log);
        } else if (currentTime - logTime < WARM_STORAGE_DURATION.toMillis()) {
            // Warm data -> HBase
            hbaseService.storeLog(log);
        } else {
            // Cold data -> object storage
            s3Service.archiveLog(log);
        }
    }
    
    /**
     * Batched storage.
     */
    public void storeLogsBatch(List<LogMessage> logs) {
        // Partition by age
        Map<StorageTier, List<LogMessage>> partitionedLogs = logs.stream()
            .collect(Collectors.groupingBy(this::classifyStorageTier));
        
        // Write the tiers in parallel; getOrDefault guards against
        // a missing key (groupingBy creates no entry for empty tiers)
        CompletableFuture.allOf(
            CompletableFuture.runAsync(() -> 
                esService.bulkIndex(partitionedLogs.getOrDefault(StorageTier.HOT, Collections.emptyList()))),
            CompletableFuture.runAsync(() -> 
                hbaseService.batchStore(partitionedLogs.getOrDefault(StorageTier.WARM, Collections.emptyList()))),
            CompletableFuture.runAsync(() -> 
                s3Service.batchArchive(partitionedLogs.getOrDefault(StorageTier.COLD, Collections.emptyList())))
        ).exceptionally(ex -> {
            log.error("Batch log storage failed", ex);
            return null;
        });
    }
    
    private StorageTier classifyStorageTier(LogMessage log) {
        long age = System.currentTimeMillis() - log.getTimestamp();
        
        if (age < HOT_STORAGE_DURATION.toMillis()) {
            return StorageTier.HOT;
        } else if (age < WARM_STORAGE_DURATION.toMillis()) {
            return StorageTier.WARM;
        } else {
            return StorageTier.COLD;
        }
    }
    
    enum StorageTier {
        HOT, WARM, COLD
    }
}
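
As a concrete shape for the hot tier, here is one way esService.bulkIndex could be written against the Elasticsearch RestHighLevelClient (deprecated in newer client generations but still widely deployed); the client field and LogMessage.toJson() are assumptions:

java

// Sketch only: daily indices (e.g. logs-2024.01.15) keep shard sizes bounded
// and make tier migration a matter of moving whole indices.
public void bulkIndex(List<LogMessage> logs) throws IOException {
    if (logs == null || logs.isEmpty()) {
        return;
    }
    BulkRequest bulk = new BulkRequest();
    DateTimeFormatter daily = DateTimeFormatter.ofPattern("yyyy.MM.dd");
    for (LogMessage log : logs) {
        String index = "logs-" + daily.format(
            Instant.ofEpochMilli(log.getTimestamp()).atZone(ZoneOffset.UTC));
        // toJson() is an assumed serializer on LogMessage
        bulk.add(new IndexRequest(index).source(log.toJson(), XContentType.JSON));
    }
    client.bulk(bulk, RequestOptions.DEFAULT);
}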

2.2 Log Compaction and Cleanup

java

@Slf4j
@Component
public class LogCompactionService {
    
    @Autowired
    private ScheduledExecutorService compactionScheduler;
    
    @PostConstruct
    public void startCompaction() {
        // Run every 24 hours, starting one hour after boot. Note a fixed
        // rate does not pin the run to off-peak hours; use a cron trigger
        // (e.g. Spring's @Scheduled(cron = ...)) for that.
        compactionScheduler.scheduleAtFixedRate(
            this::compactOldLogs, 
            1, 24, TimeUnit.HOURS
        );
    }
    
    /**
     * Compaction - merge small files and delete expired data.
     */
    public void compactOldLogs() {
        try {
            // 1. Find log files that need compaction
            List<LogFile> filesToCompact = findFilesToCompact();
            
            // 2. Compact them
            compactFiles(filesToCompact);
            
            // 3. Delete expired logs
            deleteExpiredLogs();
            
        } catch (Exception e) {
            log.error("Log compaction task failed", e);
        }
    }
    
    /**
     * Trim log records in the hot path.
     */
    public LogMessage compressLog(LogMessage originalLog) {
        return LogMessage.builder()
            .timestamp(originalLog.getTimestamp())
            .level(originalLog.getLevel())
            .service(originalLog.getService())
            .traceId(originalLog.getTraceId())
            .message(compressMessage(originalLog.getMessage())) // trim the message body
            .essentialFields(extractEssentialFields(originalLog)) // keep key fields only
            .build();
    }
    
    private String compressMessage(String message) {
        // Not real compression - lossy truncation of oversized messages
        if (message.length() > 1024) {
            return message.substring(0, 1000) + "...[truncated]";
        }
        return message;
    }
}
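
Note that compressMessage above is lossy truncation rather than compression. If large message bodies need reversible compression before storage, a JDK-only GZIP sketch:

java

// Reversible alternative to truncation, using only java.util.zip.
// The byte[] result would be stored in a binary column or object store.
private byte[] gzip(String message) {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
        gz.write(message.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    // safe to read here: the try-with-resources has closed (flushed) the stream
    return bos.toByteArray();
}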

3. Flow Control and Degradation

3.1 Adaptive Flow Control

java

@Slf4j
@Component
public class AdaptiveFlowController {
    
    private final AtomicLong currentRate = new AtomicLong(10_000); // initial rate, logs/s
    private final AtomicLong processedCount = new AtomicLong(0);
    private final RateLimiter rateLimiter = RateLimiter.create(10_000); // Guava
    
    // System metrics monitoring (constructor-injected so the final field is initialized)
    private final SystemMetricsCollector metricsCollector;
    
    public AdaptiveFlowController(SystemMetricsCollector metricsCollector) {
        this.metricsCollector = metricsCollector;
    }
    
    @Scheduled(fixedRate = 5000)
    public void adjustRate() {
        // Adjust the processing rate based on current system load
        SystemMetrics metrics = metricsCollector.getCurrentMetrics();
        
        long newRate = calculateOptimalRate(metrics);
        currentRate.set(newRate);
        rateLimiter.setRate(newRate);
        
        log.info("Adjusted log processing rate: {} logs/s", newRate);
    }
    
    private long calculateOptimalRate(SystemMetrics metrics) {
        double cpuUsage = metrics.getCpuUsage();
        double memoryUsage = metrics.getMemoryUsage();
        long rate = this.currentRate.get();
        
        if (cpuUsage > 0.8 || memoryUsage > 0.8) {
            // High load: back off
            return (long) (rate * 0.7);
        } else if (cpuUsage < 0.5 && memoryUsage < 0.6) {
            // Low load: ramp up
            return (long) (rate * 1.2);
        }
        
        return rate;
    }
    
    public boolean tryAcquire(LogMessage log) {
        // Weight permits by log level so important logs get through first.
        // Guava permits are whole numbers, so fractional costs round up
        // (a plain (int) cast of 0.5 would yield 0 and throw).
        int permits = (int) Math.ceil(getAcquireCost(log.getLevel()));
        return rateLimiter.tryAcquire(permits);
    }
    
    private double getAcquireCost(LogLevel level) {
        switch (level) {
            case ERROR: return 0.5;   // high priority (rounds to 1 permit)
            case WARN:  return 0.8;   // medium priority (1 permit)
            case INFO:  return 1.0;   // standard priority (1 permit)
            case DEBUG: return 2.0;   // low priority (2 permits)
            default:    return 1.0;
        }
    }
}
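
To show where the controller sits, here is a hedged sketch of calling tryAcquire from the inbound handler. flowController, monitor, and publishToRingBuffer are assumed wiring, not definitions from above:

java

// Sketch: load-shedding in front of the async pipeline from section 1.1.
@Override
protected void channelRead0(ChannelHandlerContext ctx, LogMessage logMessage) {
    if (!flowController.tryAcquire(logMessage)) {
        monitor.recordDropped();      // see LogProcessingMonitor in section 5
        return;                       // shed load instead of blocking the I/O thread
    }
    monitor.recordReceived();
    publishToRingBuffer(logMessage);  // hypothetical: the publish logic from section 1.1
}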

3.2 Degradation Strategies

java

@Slf4j
@Component
public class LogDegradationService {
    
    @Autowired
    private SystemMonitor systemMonitor;  // supplies the metrics read below
    
    private volatile DegradationLevel currentLevel = DegradationLevel.NORMAL;
    
    /**
     * Degrade automatically based on system state.
     */
    @Scheduled(fixedRate = 3000)
    public void autoDegrade() {
        SystemMetrics metrics = systemMonitor.getMetrics();
        
        DegradationLevel newLevel = calculateDegradationLevel(metrics);
        if (newLevel != currentLevel) {
            // Log before reassigning currentLevel, or the old level is lost
            log.warn("Log degradation level changed: {} -> {}", currentLevel, newLevel);
            applyDegradationLevel(newLevel);
            currentLevel = newLevel;
        }
    }
    
    private DegradationLevel calculateDegradationLevel(SystemMetrics metrics) {
        if (metrics.getCpuUsage() > 0.9 || metrics.getMemoryUsage() > 0.9) {
            return DegradationLevel.EMERGENCY;
        } else if (metrics.getCpuUsage() > 0.8 || metrics.getMemoryUsage() > 0.85) {
            return DegradationLevel.HIGH;
        } else if (metrics.getCpuUsage() > 0.7 || metrics.getMemoryUsage() > 0.75) {
            return DegradationLevel.MEDIUM;
        } else {
            return DegradationLevel.NORMAL;
        }
    }
    
    private void applyDegradationLevel(DegradationLevel level) {
        switch (level) {
            case NORMAL:
                // Process everything
                setSamplingRate(1.0);
                setBatchSize(1000);
                setStorageTier(StorageTier.HOT);
                break;
                
            case MEDIUM:
                // Moderate: sample DEBUG logs at 50%
                setSamplingRate(0.5);
                setBatchSize(500);
                break;
                
            case HIGH:
                // Heavy: sample DEBUG at 10%, INFO at 50%
                setSamplingRate(0.1);
                setBatchSize(100);
                setStorageTier(StorageTier.WARM);
                break;
                
            case EMERGENCY:
                // Emergency: drop DEBUG/INFO entirely; only ERROR and WARN pass
                setSamplingRate(0.0);
                setBatchSize(50);
                setStorageTier(StorageTier.COLD);
                break;
        }
    }
    
    enum DegradationLevel {
        NORMAL, MEDIUM, HIGH, EMERGENCY
    }
}
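
setSamplingRate and its siblings are left abstract above. One plausible wiring, as a sketch, is a shared configuration object with volatile fields that the sampling strategies read on every call, so a level change takes effect immediately (all names here are assumptions):

java

// Assumed glue code: the degradation service writes this config, and
// SamplingLogStrategy reads it instead of a fixed constructor rate.
@Component
public class DynamicSamplingConfig {

    private volatile double debugSamplingRate = 0.1;
    private volatile double infoSamplingRate = 1.0;

    public void setDebugSamplingRate(double rate) { this.debugSamplingRate = rate; }
    public double getDebugSamplingRate()          { return debugSamplingRate; }

    public void setInfoSamplingRate(double rate)  { this.infoSamplingRate = rate; }
    public double getInfoSamplingRate()           { return infoSamplingRate; }
}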

4. Message Queue Buffering

4.1 Kafka Log Buffer Layer

java

@Component
public class KafkaLogBuffer {
    
    @Autowired
    private KafkaTemplate<String, LogMessage> kafkaTemplate;
    
    @Autowired
    private TieredLogStorage tieredLogStorage;  // from section 2.1
    
    // One topic per shard, so no single topic takes the full write load
    private static final String LOG_TOPIC_PREFIX = "logs-";
    
    /**
     * Asynchronous write into the Kafka buffer.
     * (Spring Kafka 2.x returns ListenableFuture as shown; in 3.x,
     * send() already returns a CompletableFuture, so completable()
     * is unnecessary there.)
     */
    public CompletableFuture<SendResult<String, LogMessage>> bufferLog(LogMessage log) {
        String topic = getTopicForService(log.getService());
        
        ListenableFuture<SendResult<String, LogMessage>> future = 
            kafkaTemplate.send(topic, log.getService(), log);
        
        return future.completable()
            .exceptionally(ex -> {
                // Fallback when the Kafka write fails (e.g. spill to local disk)
                handleKafkaFailure(log, ex);
                return null;
            });
    }
    
    private String getTopicForService(String service) {
        // Hash the service name onto one of 16 topics;
        // floorMod avoids the Integer.MIN_VALUE pitfall of Math.abs
        int topicIndex = Math.floorMod(service.hashCode(), 16);
        return LOG_TOPIC_PREFIX + topicIndex;
    }
    
    /**
     * Kafka consumer - drains the buffer. Each instance subscribes to one
     * shard topic via server.id (expected to be in 0..15 to match the
     * producer hashing above); the List signature requires a batch listener.
     */
    @KafkaListener(topics = "logs-${server.id}")
    public void consumeLogs(List<ConsumerRecord<String, LogMessage>> records) {
        List<LogMessage> logs = records.stream()
            .map(ConsumerRecord::value)
            .collect(Collectors.toList());
            
        // Store the whole batch at once
        tieredLogStorage.storeLogsBatch(logs);
    }
}
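
Producer batching dominates throughput on the buffer layer. A plausible Spring Boot configuration is sketched below; the values are illustrative, and listener.type is the setting that enables the List&lt;ConsumerRecord&gt; batch signature used above:

yaml

spring:
  kafka:
    producer:
      acks: "1"                 # trade a little durability for latency on log traffic
      batch-size: 65536         # bytes per partition batch
      compression-type: lz4     # cheap CPU, large wire savings for text logs
      properties:
        linger.ms: 20           # wait up to 20 ms to fill a batch
    listener:
      type: batch               # deliver records to listeners in batches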

5. Monitoring and Alerting

5.1 Log Processing Monitoring

java

@Component
public class LogProcessingMonitor {
    
    private final MeterRegistry meterRegistry;
    private final Counter receivedCounter;
    private final Counter processedCounter;
    private final Counter droppedCounter;
    
    @Autowired
    private AlertService alertService;
    
    // Snapshots from the previous reporting window, used to derive rates
    private double lastReceived;
    private double lastProcessed;
    private double lastDropped;
    
    public LogProcessingMonitor(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.receivedCounter = Counter.builder("logs.received")
            .register(meterRegistry);
        this.processedCounter = Counter.builder("logs.processed")
            .register(meterRegistry);
        this.droppedCounter = Counter.builder("logs.dropped")
            .register(meterRegistry);
    }
    
    public void recordReceived()  { receivedCounter.increment(); }
    public void recordProcessed() { processedCounter.increment(); }
    public void recordDropped()   { droppedCounter.increment(); }
    
    @Scheduled(fixedRate = 60000)
    public void reportMetrics() {
        // Per-minute rates, derived from counter deltas
        double receiveRate = receivedCounter.count() - lastReceived;
        double processRate = processedCounter.count() - lastProcessed;
        double dropRate    = droppedCounter.count() - lastDropped;
        lastReceived  = receivedCounter.count();
        lastProcessed = processedCounter.count();
        lastDropped   = droppedCounter.count();
        
        // Backlog growth: how far processing fell behind in this window
        double processingLag = receiveRate - processRate;
        
        if (processingLag > 1000) {
            alertService.alert("Log processing backlog growing too fast: " + processingLag);
        }
        
        if (dropRate > receiveRate * 0.1) {
            // Alert when more than 10% of logs are dropped
            alertService.alert("Log drop rate too high: " + dropRate);
        }
    }
}
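
Counters capture totals; for an earlier warning that consumers are falling behind, a gauge on the Disruptor's free capacity works well. A sketch, assuming the RingBuffer from section 1.1 is reachable here:

java

// remainingCapacity() is a real Disruptor API: the number of free slots.
// A steadily shrinking value means consumers are falling behind producers.
Gauge.builder("logs.ringbuffer.remaining", ringBuffer, RingBuffer::remainingCapacity)
    .description("Free slots in the log ring buffer")
    .register(meterRegistry);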

6. Configuration Optimization

6.1 Configurable Processing Strategies

yaml

# application.yml
logging:
  processing:
    enabled: true
    async: true
    batch-size: 1000
    flush-interval: 100ms
    buffer-size: 65536
    
  storage:
    hot-tier-duration: 7d
    warm-tier-duration: 30d
    compression-enabled: true
    
  degradation:
    enabled: true
    cpu-threshold: 0.8
    memory-threshold: 0.85
    sampling-rates:
      error: 1.0
      warn: 1.0  
      info: 0.5
      debug: 0.1
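
A sketch of binding this YAML with Spring Boot follows; the class and field names simply mirror the properties above, and Spring converts values like 100ms to a Duration automatically:

java

// Binding sketch for the logging.processing block above.
// Getters and setters are required for binding; omitted here for brevity.
@Component
@ConfigurationProperties(prefix = "logging.processing")
public class LogProcessingProperties {
    private boolean enabled = true;
    private boolean async = true;
    private int batchSize = 1000;                             // binds batch-size
    private Duration flushInterval = Duration.ofMillis(100);  // binds flush-interval
    private int bufferSize = 65536;                           // binds buffer-size

    // getters and setters ...
}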

Summary

The core strategies for handling high log volume in Netty:

Asynchrony: use lock-free queues such as the Disruptor so logging never blocks the I/O threads

Level-based processing: apply a different strategy to each log level

Batching: merge small operations to cut the number of I/O round trips

Flow control: adaptive rate limiting protects system stability

Tiered storage: keep hot, warm, and cold data in different stores

Degradation: shed load automatically when the system is under pressure

Monitoring and alerting: track processing health in real time

Together, these techniques can absorb log traffic beyond 100,000 entries per second while keeping the system stable.

The core strategies for persisting massive log volumes to a database:

Sharding: partition by time and by business dimension to spread the write load

Data compression: compress large text fields before storing them

Batch writes: use JDBC batch inserts to raise throughput (see the sketch after this list)

Read/write splitting: route queries to replicas and writes to the primary

Tiered storage: apply different storage strategies to hot, warm, and cold data

Lifecycle management: archive and purge expired data automatically

Monitoring: watch database performance in real time
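
The batch-write point deserves a concrete shape. A minimal JDBC sketch, with an illustrative time-sharded table name (dataSource and the schema are assumptions):

java

// Illustrative only: log_2024_01 stands in for a time-sharded table.
String sql = "INSERT INTO log_2024_01 (ts, level, service, trace_id, message) "
           + "VALUES (?, ?, ?, ?, ?)";
try (Connection conn = dataSource.getConnection();
     PreparedStatement ps = conn.prepareStatement(sql)) {
    conn.setAutoCommit(false);
    for (LogMessage log : logs) {
        ps.setLong(1, log.getTimestamp());
        ps.setString(2, log.getLevel().name());
        ps.setString(3, log.getService());
        ps.setString(4, log.getTraceId());
        ps.setString(5, log.getMessage());
        ps.addBatch();
    }
    ps.executeBatch();  // one round trip per batch instead of per row
    conn.commit();      // on MySQL, add rewriteBatchedStatements=true for real gains
}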
