当Netty处理大量日志时,需要从多个层面进行优化,避免日志本身成为系统瓶颈。
1. 异步日志架构
1.1 异步日志处理器
java
@Component
public class AsyncLogHandler extends SimpleChannelInboundHandler<LogMessage> {
// 高性能无锁队列
private final Disruptor<LogEvent> disruptor;
private final RingBuffer<LogEvent> ringBuffer;
// 批量处理配置
private static final int BATCH_SIZE = 1000;
private static final int BUFFER_SIZE = 65536;
private final List<LogMessage> batchBuffer = new ArrayList<>(BATCH_SIZE);
private final ScheduledExecutorService flushScheduler =
Executors.newScheduledThreadPool(1);
@PostConstruct
public void init() {
// 初始化Disruptor
disruptor = new Disruptor<>(
LogEvent::new,
BUFFER_SIZE,
Executors.defaultThreadFactory(),
ProducerType.MULTI,
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(new LogEventHandler());
disruptor.start();
ringBuffer = disruptor.getRingBuffer();
// 启动定时刷新
flushScheduler.scheduleAtFixedRate(this::flushBatch, 100, 100, TimeUnit.MILLISECONDS);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LogMessage logMessage) {
// 异步写入队列,不阻塞IO线程
long sequence = ringBuffer.next();
try {
LogEvent event = ringBuffer.get(sequence);
event.setLogMessage(logMessage);
event.setReceiveTime(System.currentTimeMillis());
} finally {
ringBuffer.publish(sequence);
}
}
// Disruptor事件处理器
private class LogEventHandler implements EventHandler<LogEvent> {
@Override
public void onEvent(LogEvent event, long sequence, boolean endOfBatch) {
processLogEvent(event);
// 批量处理完成时刷新
if (endOfBatch) {
flushBatch();
}
}
}
private void processLogEvent(LogEvent event) {
batchBuffer.add(event.getLogMessage());
if (batchBuffer.size() >= BATCH_SIZE) {
flushBatch();
}
}
private void flushBatch() {
if (batchBuffer.isEmpty()) {
return;
}
List<LogMessage> logsToProcess = new ArrayList<>(batchBuffer);
batchBuffer.clear();
// 异步处理批量日志
CompletableFuture.runAsync(() -> processBatchLogs(logsToProcess))
.exceptionally(ex -> {
log.error("批量处理日志异常", ex);
return null;
});
}
}
1.2 日志分级处理策略
java
@Component
public class LogLevelProcessor {
// 不同级别日志采用不同处理策略
private final Map<LogLevel, LogStrategy> strategyMap = new EnumMap<>(LogLevel.class);
@PostConstruct
public void init() {
// ERROR级别 - 实时处理
strategyMap.put(LogLevel.ERROR, new RealTimeLogStrategy());
// WARN级别 - 快速处理
strategyMap.put(LogLevel.WARN, new FastLogStrategy());
// INFO级别 - 批量处理
strategyMap.put(LogLevel.INFO, new BatchLogStrategy());
// DEBUG级别 - 采样处理
strategyMap.put(LogLevel.DEBUG, new SamplingLogStrategy(0.1)); // 10%采样
}
public void processLog(LogMessage log) {
LogStrategy strategy = strategyMap.get(log.getLevel());
if (strategy != null) {
strategy.process(log);
}
}
// 策略接口
public interface LogStrategy {
void process(LogMessage log);
}
// 实时处理策略
public class RealTimeLogStrategy implements LogStrategy {
@Override
public void process(LogMessage log) {
// 立即写入重要存储
criticalStorage.save(log);
// 触发告警
alertService.checkAndAlert(log);
}
}
// 采样处理策略
public class SamplingLogStrategy implements LogStrategy {
private final double samplingRate;
private final Random random = new Random();
public SamplingLogStrategy(double samplingRate) {
this.samplingRate = samplingRate;
}
@Override
public void process(LogMessage log) {
if (random.nextDouble() < samplingRate) {
// 只处理采样到的日志
logStorage.save(log);
}
}
}
}
2. 存储优化方案
2.1 多级存储架构
java
@Component
public class TieredLogStorage {
@Autowired
private ElasticsearchService esService; // 热存储 - 近期数据
@Autowired
private HBaseService hbaseService; // 温存储 - 历史数据
@Autowired
private S3Service s3Service; // 冷存储 - 归档数据
// 时间分区策略
private static final Duration HOT_STORAGE_DURATION = Duration.ofDays(7);
private static final Duration WARM_STORAGE_DURATION = Duration.ofDays(30);
/**
* 根据时间选择存储层级
*/
public void storeLog(LogMessage log) {
long logTime = log.getTimestamp();
long currentTime = System.currentTimeMillis();
if (currentTime - logTime < HOT_STORAGE_DURATION.toMillis()) {
// 热数据 -> Elasticsearch
esService.indexLog(log);
} else if (currentTime - logTime < WARM_STORAGE_DURATION.toMillis()) {
// 温数据 -> HBase
hbaseService.storeLog(log);
} else {
// 冷数据 -> 对象存储
s3Service.archiveLog(log);
}
}
/**
* 批量存储优化
*/
public void storeLogsBatch(List<LogMessage> logs) {
// 按时间分区
Map<StorageTier, List<LogMessage>> partitionedLogs = logs.stream()
.collect(Collectors.groupingBy(this::classifyStorageTier));
// 并行存储到不同层级
CompletableFuture.allOf(
CompletableFuture.runAsync(() ->
esService.bulkIndex(partitionedLogs.get(StorageTier.HOT))),
CompletableFuture.runAsync(() ->
hbaseService.batchStore(partitionedLogs.get(StorageTier.WARM))),
CompletableFuture.runAsync(() ->
s3Service.batchArchive(partitionedLogs.get(StorageTier.COLD)))
).exceptionally(ex -> {
log.error("批量存储日志失败", ex);
return null;
});
}
private StorageTier classifyStorageTier(LogMessage log) {
long logTime = log.getTimestamp();
long currentTime = System.currentTimeMillis();
long age = currentTime - logTime;
if (age < HOT_STORAGE_DURATION.toMillis()) {
return StorageTier.HOT;
} else if (age < WARM_STORAGE_DURATION.toMillis()) {
return StorageTier.WARM;
} else {
return StorageTier.COLD;
}
}
enum StorageTier {
HOT, WARM, COLD
}
}
2.2 日志压缩和清理
java
@Component
public class LogCompactionService {
@Autowired
private ScheduledExecutorService compactionScheduler;
@PostConstruct
public void startCompaction() {
// 每天凌晨执行压缩任务
compactionScheduler.scheduleAtFixedRate(
this::compactOldLogs,
1, 24, TimeUnit.HOURS
);
}
/**
* 日志压缩 - 合并小文件,删除过期数据
*/
public void compactOldLogs() {
try {
// 1. 查找需要压缩的日志文件
List<LogFile> filesToCompact = findFilesToCompact();
// 2. 执行压缩
compactFiles(filesToCompact);
// 3. 删除过期日志
deleteExpiredLogs();
} catch (Exception e) {
log.error("日志压缩任务执行失败", e);
}
}
/**
* 实时日志数据裁剪
*/
public LogMessage compressLog(LogMessage originalLog) {
return LogMessage.builder()
.timestamp(originalLog.getTimestamp())
.level(originalLog.getLevel())
.service(originalLog.getService())
.traceId(originalLog.getTraceId())
.message(compressMessage(originalLog.getMessage())) // 压缩消息内容
.essentialFields(extractEssentialFields(originalLog)) // 提取关键字段
.build();
}
private String compressMessage(String message) {
// 简单的消息压缩逻辑
if (message.length() > 1024) {
return message.substring(0, 1000) + "...[compressed]";
}
return message;
}
}
3. 流量控制和降级
3.1 自适应流量控制
java
@Component
public class AdaptiveFlowController {
private final AtomicLong currentRate = new AtomicLong(10000); // 初始速率
private final AtomicLong processedCount = new AtomicLong(0);
private final RateLimiter rateLimiter = RateLimiter.create(10000);
// 系统指标监控
private final SystemMetricsCollector metricsCollector;
@Scheduled(fixedRate = 5000)
public void adjustRate() {
// 根据系统负载动态调整处理速率
SystemMetrics metrics = metricsCollector.getCurrentMetrics();
long newRate = calculateOptimalRate(metrics);
currentRate.set(newRate);
rateLimiter.setRate(newRate);
log.info("调整日志处理速率: {} logs/s", newRate);
}
private long calculateOptimalRate(SystemMetrics metrics) {
double cpuUsage = metrics.getCpuUsage();
double memoryUsage = metrics.getMemoryUsage();
long currentRate = this.currentRate.get();
if (cpuUsage > 0.8 || memoryUsage > 0.8) {
// 系统负载高,降低处理速率
return (long) (currentRate * 0.7);
} else if (cpuUsage < 0.5 && memoryUsage < 0.6) {
// 系统负载低,提高处理速率
return (long) (currentRate * 1.2);
}
return currentRate;
}
public boolean tryAcquire(LogMessage log) {
// 根据日志级别调整优先级
double cost = getAcquireCost(log.getLevel());
return rateLimiter.tryAcquire((int) cost);
}
private double getAcquireCost(LogLevel level) {
switch (level) {
case ERROR: return 0.5; // 错误日志高优先级
case WARN: return 0.8; // 警告日志中优先级
case INFO: return 1.0; // 信息日志标准优先级
case DEBUG: return 2.0; // 调试日志低优先级
default: return 1.0;
}
}
}
3.2 降级策略
java
@Component
public class LogDegradationService {
private volatile DegradationLevel currentLevel = DegradationLevel.NORMAL;
/**
* 根据系统状态自动降级
*/
@Scheduled(fixedRate = 3000)
public void autoDegrade() {
SystemMetrics metrics = systemMonitor.getMetrics();
DegradationLevel newLevel = calculateDegradationLevel(metrics);
if (newLevel != currentLevel) {
applyDegradationLevel(newLevel);
currentLevel = newLevel;
log.warn("日志处理降级级别变更: {} -> {}", currentLevel, newLevel);
}
}
private DegradationLevel calculateDegradationLevel(SystemMetrics metrics) {
if (metrics.getCpuUsage() > 0.9 || metrics.getMemoryUsage() > 0.9) {
return DegradationLevel.EMERGENCY;
} else if (metrics.getCpuUsage() > 0.8 || metrics.getMemoryUsage() > 0.85) {
return DegradationLevel.HIGH;
} else if (metrics.getCpuUsage() > 0.7 || metrics.getMemoryUsage() > 0.75) {
return DegradationLevel.MEDIUM;
} else {
return DegradationLevel.NORMAL;
}
}
private void applyDegradationLevel(DegradationLevel level) {
switch (level) {
case NORMAL:
// 正常处理所有日志
setSamplingRate(1.0);
setBatchSize(1000);
setStorageTier(StorageTier.HOT);
break;
case MEDIUM:
// 中等降级:DEBUG日志采样50%
setSamplingRate(0.5);
setBatchSize(500);
break;
case HIGH:
// 高度降级:DEBUG日志采样10%,INFO日志采样50%
setSamplingRate(0.1);
setBatchSize(100);
setStorageTier(StorageTier.WARM);
break;
case EMERGENCY:
// 紧急降级:只处理ERROR和WARN日志
setSamplingRate(0.0);
setBatchSize(50);
setStorageTier(StorageTier.COLD);
break;
}
}
enum DegradationLevel {
NORMAL, MEDIUM, HIGH, EMERGENCY
}
}
4. 消息队列缓冲
4.1 Kafka日志缓冲层
java
@Component
public class KafkaLogBuffer {
@Autowired
private KafkaTemplate<String, LogMessage> kafkaTemplate;
// 按服务分Topic,避免单Topic压力过大
private static final String LOG_TOPIC_PREFIX = "logs-";
/**
* 异步写入Kafka缓冲
*/
public CompletableFuture<SendResult<String, LogMessage>> bufferLog(LogMessage log) {
String topic = getTopicForService(log.getService());
ListenableFuture<SendResult<String, LogMessage>> future =
kafkaTemplate.send(topic, log.getService(), log);
return future.completable()
.exceptionally(ex -> {
// Kafka写入失败时的降级处理
handleKafkaFailure(log, ex);
return null;
});
}
private String getTopicForService(String service) {
// 根据服务名称哈希到不同的Topic
int topicIndex = Math.abs(service.hashCode()) % 16;
return LOG_TOPIC_PREFIX + topicIndex;
}
/**
* Kafka消费者 - 从缓冲层消费日志
*/
@KafkaListener(topics = "logs-${server.id}")
public void consumeLogs(List<ConsumerRecord<String, LogMessage>> records) {
List<LogMessage> logs = records.stream()
.map(ConsumerRecord::value)
.collect(Collectors.toList());
// 批量处理日志
tieredLogStorage.storeLogsBatch(logs);
}
}
5. 监控和告警
5.1 日志处理监控
java
@Component
public class LogProcessingMonitor {
private final MeterRegistry meterRegistry;
private final Counter receivedCounter;
private final Counter processedCounter;
private final Counter droppedCounter;
public LogProcessingMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.receivedCounter = Counter.builder("logs.received")
.register(meterRegistry);
this.processedCounter = Counter.builder("logs.processed")
.register(meterRegistry);
this.droppedCounter = Counter.builder("logs.dropped")
.register(meterRegistry);
}
public void recordReceived() {
receivedCounter.increment();
}
public void recordProcessed() {
processedCounter.increment();
}
public void recordDropped() {
droppedCounter.increment();
}
@Scheduled(fixedRate = 60000)
public void reportMetrics() {
double receiveRate = getReceiveRate();
double processRate = getProcessRate();
double dropRate = getDropRate();
// 计算处理延迟
double processingLag = receiveRate - processRate;
if (processingLag > 1000) {
// 触发告警
alertService.alert("日志处理延迟过高: " + processingLag);
}
if (dropRate > receiveRate * 0.1) {
// 丢弃率超过10%告警
alertService.alert("日志丢弃率过高: " + dropRate);
}
}
}
6. 配置优化
6.1 可配置化处理策略
yaml
# application.yml
logging:
processing:
enabled: true
async: true
batch-size: 1000
flush-interval: 100ms
buffer-size: 65536
storage:
hot-tier-duration: 7d
warm-tier-duration: 30d
compression-enabled: true
degradation:
enabled: true
cpu-threshold: 0.8
memory-threshold: 0.85
sampling-rates:
error: 1.0
warn: 1.0
info: 0.5
debug: 0.1
总结
处理Netty大量日志的核心策略:
异步化: 使用Disruptor等无锁队列,避免阻塞IO线程
分级处理: 不同级别日志采用不同处理策略
批量操作: 合并小操作,减少IO次数
流量控制: 自适应限流,保护系统稳定性
多级存储: 热温冷数据分层存储
降级策略: 系统压力大时自动降级
监控告警: 实时监控处理状态
通过这些方案,可以有效处理10万+ TPS的日志流量,同时保证系统稳定性。
处理海量日志存储到数据库的核心策略:
分库分表: 按时间和业务分片,分散压力
数据压缩: 对大文本字段进行压缩存储
批量写入: 使用JDBC批量插入,提高吞吐量
读写分离: 查询走从库,写入走主库
分层存储: 热温冷数据不同存储策略
生命周期管理: 自动归档和清理过期数据
监控优化: 实时监控数据库性能
© 版权声明
文章版权归作者所有,未经允许请勿转载。如内容涉嫌侵权,请在本页底部进入<联系我们>进行举报投诉!
THE END

















暂无评论内容