设计一个分布式环境下的日志系统?你如何做?
在分布式系统的面试中,日志系统的设计问题频繁出现,这并非偶然。在生产环境中,当系统出现问题时,日志往往是我们定位和排查问题的唯一线索。
传统单体架构的简单时代
在单体架构时代,整个应用程序运行在一个进程中,所有的业务逻辑、数据访问、用户界面都打包在一起部署到单台服务器上。那时候的日志系统设计相对简单直接。
想象一个典型的电商系统,用户登录、浏览商品、下单支付,所有这些操作都在同一个应用中完成。日志文件也集中存放在服务器的某个目录下,比如 /var/log/application/。
那时候的日志配置非常简单,只需要在项目中引入 Log4j 或 Logback,然后配置一个基本的日志文件:
<!-- 单体架构时代的简单日志配置 -->
<configuration>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>/var/log/application/app.log</file>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss} %-5level - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="FILE" />
<appender-ref ref="CONSOLE" />
</root>
</configuration>在代码中使用也很直观:
@Service
public class OrderService {
private static final Logger logger = LoggerFactory.getLogger(OrderService.class);
public Order createOrder(OrderRequest request) {
logger.info("开始创建订单,用户ID: {}", request.getUserId());
try {
// 验证库存
logger.debug("检查商品库存");
checkInventory(request.getProductId(), request.getQuantity());
// 创建订单
Order order = new Order();
order.setUserId(request.getUserId());
order.setProductId(request.getProductId());
orderRepository.save(order);
logger.info("订单创建成功,订单号: {}", order.getOrderNo());
return order;
} catch (Exception e) {
logger.error("创建订单失败,用户ID: " + request.getUserId(), e);
throw new BusinessException("订单创建失败");
}
}
}当出现问题时,运维人员只需要登录到服务器,使用简单的 Linux 命令就能快速定位问题:
# 查看最新的错误日志
tail -f /var/log/application/app.log | grep ERROR
# 查找某个用户的操作记录
grep "用户ID: 12345" /var/log/application/app.log
# 统计今天的错误数量
grep "$(date +%Y-%m-%d)" /var/log/application/app.log | grep ERROR | wc -l
# 查看某个时间段的日志
sed -n '/2024-01-01 10:00/,/2024-01-01 11:00/p' /var/log/application/app.log这种模式下,日志管理的核心关注点主要是:
日志级别的合理使用。通过 DEBUG、INFO、WARN、ERROR 来区分日志的重要程度,开发环境可以打印详细的 DEBUG 日志,生产环境则只保留 INFO 以上级别,既能保证关键信息不遗漏,又能控制日志文件大小。
日志滚动策略。随着系统运行,日志文件会不断增大。我们需要配置日志滚动策略,比如按天分割、按大小分割,或者保留最近 30 天的日志,自动删除过期文件:
<appender name="ROLLING-FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/var/log/application/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天生成一个新文件 -->
<fileNamePattern>/var/log/application/app-%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 保留30天 -->
<maxHistory>30</maxHistory>
<!-- 总大小不超过10GB -->
<totalSizeCap>10GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>日志内容的规范。为了便于后续查询和分析,日志消息需要包含足够的上下文信息,比如用户 ID、订单号、商品 ID 等关键业务参数。同时避免打印敏感信息如密码、银行卡号等。
单体架构时代的日志系统虽然简单,但足够实用。开发人员可以直接在 IDE 的控制台看到日志输出,运维人员可以通过 SSH 登录服务器查看日志文件,一切都在掌控之中。问题的排查路径也很清晰:发现问题 → 查看日志 → 定位代码 → 修复部署。
但是,当系统演进到分布式架构,特别是微服务架构时,这种简单的模式就不够用了。
分布式场景下的挑战
想象一下这样的场景:你的系统由数十个甚至上百个微服务节点组成,一个用户请求可能会跨越多个服务。当出现问题时,你需要登录多台服务器,在不同的日志文件中搜索相关信息,然后手动拼凑出完整的调用链路。这个过程不仅费时费力,还容易遗漏关键信息。
在上图所示的调用链中,如果支付服务出现问题,我们需要查看网关、订单、支付三个服务的日志才能完整还原问题现场。这就是分布式日志系统需要解决的核心问题:如何将分散在各个节点的日志收集起来,并在统一的地方进行汇总和查询。
日志收集:第一道关卡
日志收集是整个系统的基础,目前主要有两种策略。
第一种是基于磁盘日志变化的收集方式。应用程序将日志写入本地磁盘,收集工具监控日志文件的变化,实时读取新增的日志内容并传输到中央存储。这种方式的优点是对应用程序无侵入,即使收集工具出现故障,也不会影响应用的正常运行。
让我们看一个简单的 Logback 配置示例,它会将日志输出到本地文件:
<!-- logback-spring.xml -->
<configuration>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>/var/log/app/application.log</file>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"service":"order-service","env":"production"}</customFields>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/var/log/app/application-%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
</rollingPolicy>
</appender>
<root level="INFO">
<appender-ref ref="FILE" />
</root>
</configuration>对应的 Filebeat 配置文件:
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/app/*.log
fields:
service: order-service
env: production
json.keys_under_root: true
json.add_error_key: true
output.elasticsearch:
hosts: ["elasticsearch:9200"]
index: "logs-%{+yyyy.MM.dd}"第二种是通过消息队列同步日志。应用程序在记录日志的同时,将日志消息发送到 Kafka 等消息队列,然后由专门的消费者程序将日志持久化到存储系统。
下面是通过 Kafka 发送日志的实现:
@Component
public class KafkaLogAppender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String LOG_TOPIC = "application-logs";
public void sendLog(LogEvent event) {
try {
Map<String, Object> logData = new HashMap<>();
logData.put("timestamp", System.currentTimeMillis());
logData.put("level", event.getLevel());
logData.put("service", "order-service");
logData.put("message", event.getMessage());
logData.put("traceId", MDC.get("traceId"));
logData.put("thread", event.getThreadName());
if (event.getThrowable() != null) {
logData.put("exception", getStackTrace(event.getThrowable()));
}
String jsonLog = new ObjectMapper().writeValueAsString(logData);
kafkaTemplate.send(LOG_TOPIC, jsonLog);
} catch (Exception e) {
// 避免日志发送失败影响主业务
System.err.println("Failed to send log to Kafka: " + e.getMessage());
}
}
private String getStackTrace(Throwable throwable) {
StringWriter sw = new StringWriter();
throwable.printStackTrace(new PrintWriter(sw));
return sw.toString();
}
}在工具选择上,Logstash 和 Filebeat 是两个常见的选择。Logstash 功能强大,可以同时从多个源头采集数据,并且支持数据清洗功能。你可以通过配置过滤器,将日志转换成所需的格式,或者剔除不需要的日志内容。
除了这些通用工具,我们还可以通过 AOP 技术实现更精细的日志收集:
@Aspect
@Component
public class LogAspect {
@Autowired
private KafkaLogAppender logAppender;
@Around("@annotation(com.example.annotation.LogRecord)")
public Object logAround(ProceedingJoinPoint joinPoint) throws Throwable {
String traceId = MDC.get("traceId");
if (traceId == null) {
traceId = UUID.randomUUID().toString();
MDC.put("traceId", traceId);
}
long startTime = System.currentTimeMillis();
String methodName = joinPoint.getSignature().getName();
String className = joinPoint.getTarget().getClass().getSimpleName();
try {
// 记录方法入参
LogEvent enterEvent = LogEvent.builder()
.level("INFO")
.message(String.format("Enter method: %s.%s", className, methodName))
.args(joinPoint.getArgs())
.build();
logAppender.sendLog(enterEvent);
Object result = joinPoint.proceed();
// 记录方法出参和耗时
long duration = System.currentTimeMillis() - startTime;
LogEvent exitEvent = LogEvent.builder()
.level("INFO")
.message(String.format("Exit method: %s.%s, duration: %dms",
className, methodName, duration))
.result(result)
.build();
logAppender.sendLog(exitEvent);
return result;
} catch (Throwable e) {
// 记录异常
LogEvent errorEvent = LogEvent.builder()
.level("ERROR")
.message(String.format("Exception in method: %s.%s", className, methodName))
.throwable(e)
.build();
logAppender.sendLog(errorEvent);
throw e;
} finally {
MDC.clear();
}
}
}使用时只需要在方法上添加注解:
@Service
public class OrderService {
@LogRecord
public Order createOrder(OrderRequest request) {
// 业务逻辑
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setAmount(request.getAmount());
// ...
return order;
}
}日志存储:选择合适的数据库
收集到日志之后,下一个问题是如何存储。日志数据有几个特点:数据量大、写入频繁、查询模式相对固定。基于这些特点,我们需要选择合适的存储方案。
Elasticsearch 是 ELK 技术栈中的核心组件,也是最常见的选择。下面是一个简单的日志写入示例:
@Service
public class LogStorageService {
@Autowired
private RestHighLevelClient esClient;
public void saveLog(LogData logData) throws IOException {
IndexRequest request = new IndexRequest("logs-" +
LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd")));
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("timestamp", logData.getTimestamp());
jsonMap.put("level", logData.getLevel());
jsonMap.put("service", logData.getService());
jsonMap.put("message", logData.getMessage());
jsonMap.put("traceId", logData.getTraceId());
jsonMap.put("host", logData.getHost());
request.source(jsonMap);
esClient.index(request, RequestOptions.DEFAULT);
}
public List<LogData> searchLogs(String traceId, String startTime, String endTime)
throws IOException {
SearchRequest searchRequest = new SearchRequest("logs-*");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (traceId != null) {
boolQuery.must(QueryBuilders.termQuery("traceId", traceId));
}
if (startTime != null && endTime != null) {
boolQuery.must(QueryBuilders.rangeQuery("timestamp")
.gte(startTime)
.lte(endTime));
}
sourceBuilder.query(boolQuery);
sourceBuilder.sort("timestamp", SortOrder.ASC);
sourceBuilder.size(1000);
searchRequest.source(sourceBuilder);
SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT);
List<LogData> logs = new ArrayList<>();
for (SearchHit hit : response.getHits().getHits()) {
LogData log = new ObjectMapper().convertValue(
hit.getSourceAsMap(), LogData.class);
logs.add(log);
}
return logs;
}
}如果选择 MongoDB 作为存储方案,实现会更加简单:
@Service
public class MongoLogService {
@Autowired
private MongoTemplate mongoTemplate;
public void saveLog(LogData logData) {
Document doc = new Document()
.append("timestamp", logData.getTimestamp())
.append("level", logData.getLevel())
.append("service", logData.getService())
.append("message", logData.getMessage())
.append("traceId", logData.getTraceId())
.append("host", logData.getHost());
mongoTemplate.insert(doc, "application_logs");
}
public List<LogData> findByTraceId(String traceId) {
Query query = new Query(Criteria.where("traceId").is(traceId));
query.with(Sort.by(Sort.Direction.ASC, "timestamp"));
return mongoTemplate.find(query, LogData.class, "application_logs");
}
}在实际应用中,我们还需要考虑日志的生命周期管理。下面是一个自动清理过期日志的实现:
@Component
public class LogCleanupTask {
@Autowired
private RestHighLevelClient esClient;
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
public void cleanupOldLogs() {
try {
LocalDate cutoffDate = LocalDate.now().minusDays(30);
String indexPattern = "logs-" +
cutoffDate.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"));
DeleteIndexRequest request = new DeleteIndexRequest(indexPattern);
esClient.indices().delete(request, RequestOptions.DEFAULT);
log.info("Successfully deleted old index: {}", indexPattern);
} catch (Exception e) {
log.error("Failed to cleanup old logs", e);
}
}
}日志查询:让数据发挥价值
有了完善的收集和存储机制,最后一步是提供便捷的查询能力。让我们实现一个简单的日志查询 API:
@RestController
@RequestMapping("/api/logs")
public class LogQueryController {
@Autowired
private LogStorageService logStorageService;
@GetMapping("/search")
public ResponseEntity<LogQueryResponse> searchLogs(
@RequestParam(required = false) String traceId,
@RequestParam(required = false) String keyword,
@RequestParam(required = false) String level,
@RequestParam(required = false) String service,
@RequestParam String startTime,
@RequestParam String endTime,
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "100") int size) {
try {
LogQueryRequest request = LogQueryRequest.builder()
.traceId(traceId)
.keyword(keyword)
.level(level)
.service(service)
.startTime(startTime)
.endTime(endTime)
.page(page)
.size(size)
.build();
List<LogData> logs = logStorageService.searchLogs(request);
long total = logStorageService.countLogs(request);
LogQueryResponse response = LogQueryResponse.builder()
.logs(logs)
.total(total)
.page(page)
.size(size)
.build();
return ResponseEntity.ok(response);
} catch (Exception e) {
log.error("Failed to search logs", e);
return ResponseEntity.status(500).build();
}
}
@GetMapping("/trace/{traceId}")
public ResponseEntity<List<LogData>> getLogsByTrace(@PathVariable String traceId) {
try {
List<LogData> logs = logStorageService.searchLogs(traceId, null, null);
return ResponseEntity.ok(logs);
} catch (Exception e) {
log.error("Failed to get logs by traceId: {}", traceId, e);
return ResponseEntity.status(500).build();
}
}
@GetMapping("/stats")
public ResponseEntity<LogStats> getLogStats(
@RequestParam String startTime,
@RequestParam String endTime) {
try {
LogStats stats = logStorageService.getLogStatistics(startTime, endTime);
return ResponseEntity.ok(stats);
} catch (Exception e) {
log.error("Failed to get log statistics", e);
return ResponseEntity.status(500).build();
}
}
}对于日志统计分析,我们可以使用 Elasticsearch 的聚合功能:
public LogStats getLogStatistics(String startTime, String endTime) throws IOException {
SearchRequest searchRequest = new SearchRequest("logs-*");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
// 时间范围过滤
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("timestamp")
.gte(startTime)
.lte(endTime);
sourceBuilder.query(rangeQuery);
// 按日志级别聚合
TermsAggregationBuilder levelAgg = AggregationBuilders
.terms("level_count")
.field("level.keyword");
// 按服务聚合
TermsAggregationBuilder serviceAgg = AggregationBuilders
.terms("service_count")
.field("service.keyword");
// 按时间聚合(每小时)
DateHistogramAggregationBuilder timeAgg = AggregationBuilders
.dateHistogram("time_distribution")
.field("timestamp")
.calendarInterval(DateHistogramInterval.HOUR);
sourceBuilder.aggregation(levelAgg);
sourceBuilder.aggregation(serviceAgg);
sourceBuilder.aggregation(timeAgg);
sourceBuilder.size(0); // 只要聚合结果,不要具体文档
searchRequest.source(sourceBuilder);
SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT);
// 解析聚合结果
LogStats stats = new LogStats();
Terms levelTerms = response.getAggregations().get("level_count");
Map<String, Long> levelCounts = new HashMap<>();
for (Terms.Bucket bucket : levelTerms.getBuckets()) {
levelCounts.put(bucket.getKeyAsString(), bucket.getDocCount());
}
stats.setLevelCounts(levelCounts);
Terms serviceTerms = response.getAggregations().get("service_count");
Map<String, Long> serviceCounts = new HashMap<>();
for (Terms.Bucket bucket : serviceTerms.getBuckets()) {
serviceCounts.put(bucket.getKeyAsString(), bucket.getDocCount());
}
stats.setServiceCounts(serviceCounts);
return stats;
}链路追踪:更进一步
在分布式日志系统的基础上,我们可以引入链路追踪技术。下面是一个简单的 TraceID 传递实现:
@Component
public class TraceIdFilter implements Filter {
private static final String TRACE_ID_HEADER = "X-Trace-Id";
@Override
public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
// 从请求头获取 TraceID,如果没有则生成新的
String traceId = httpRequest.getHeader(TRACE_ID_HEADER);
if (traceId == null || traceId.isEmpty()) {
traceId = UUID.randomUUID().toString().replace("-", "");
}
// 将 TraceID 放入 MDC,后续日志会自动包含
MDC.put("traceId", traceId);
try {
chain.doFilter(request, response);
} finally {
MDC.clear();
}
}
}在服务间调用时传递 TraceID:
@Component
public class TraceIdInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body,
ClientHttpRequestExecution execution)
throws IOException {
String traceId = MDC.get("traceId");
if (traceId != null) {
request.getHeaders().add("X-Trace-Id", traceId);
}
return execution.execute(request, body);
}
}
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate();
restTemplate.getInterceptors().add(new TraceIdInterceptor());
return restTemplate;
}
}这样,当我们需要排查问题时,只需要在日志系统中搜索这个 TraceID,就能看到请求在整个系统中的完整流转过程。
总结与展望
一个完善的分布式日志系统需要解决三个核心问题:如何高效地收集分散在各个节点的日志,如何可靠地存储海量的日志数据,以及如何快速地查询和分析日志信息。
通过 Filebeat 或 Logstash 进行日志收集,使用 Elasticsearch 或 MongoDB 进行存储,配合 Kibana 或自建系统进行查询,我们可以构建一个功能完善的日志系统。在此基础上,结合链路追踪技术,可以进一步提升系统的可观测性。
随着云原生技术的发展,日志系统也在不断演进。容器化环境下的日志收集、基于 eBPF 的零侵入式监控、AI 驱动的日志分析等新技术正在涌现。但无论技术如何变化,日志系统的核心价值始终不变:帮助我们更好地理解系统的运行状态,快速定位和解决问题。
