专栏名称: ImportNew
伯乐在线旗下账号,专注Java技术分享,包括Java基础技术、进阶技能、架构设计和Java技术领域动态等。
目录
相关文章推荐
江西文旅发布  ·  梅亦会见中石化江西分公司党委书记仲伟、总经理 ... ·  10 小时前  
江西文旅发布  ·  梅亦会见中石化江西分公司党委书记仲伟、总经理 ... ·  10 小时前  
国家外汇管理局  ·  李强主持召开国务院第七次全体会议 ... ·  11 小时前  
上饶新闻  ·  上饶又又又下雪了!! ·  2 天前  
上饶新闻  ·  上饶又又又下雪了!! ·  2 天前  
江西宣传  ·  因为这几句诗,这位冠军来江西了! ·  2 天前  
江西宣传  ·  因为这几句诗,这位冠军来江西了! ·  2 天前  
51好读  ›  专栏  ›  ImportNew

使用 MDC 实现日志链路跟踪

ImportNew  · 公众号  ·  · 2024-05-25 11:30

正文

(给 ImportNew加星标,提高Java技能)


1、背景简述


在技术运维过程中,很难从某服务庞杂的日志中,单独找寻出某次 API 调用的全部日志。

为提高排查问题的效率,在多个系统及应用内根据 统一的 TraceId 查找同一次请求链路上的日志,根据日志快速定位问题,同时需对业务代码无侵入,特别是在高频请求下,也可以方便的搜索此次请求的日志内容。

本此分享一个使用 MDC 实现日志链路跟踪,在微服务环境中,我们经常使用 Skywalking、Spring Cloud Sleut 等去实现整体请求链路的追踪,但是这个整体运维成本高,架构复杂,本次我们来使用 MDC 通过 Log 来实现一个轻量级的会话事务跟踪功能,需要的朋友可以参考一下。


1.1 应用效果图


我们知道了 MDC 的好处后,其实在用户从第一时间调用请求时候,我们其实可以将请求增加 traceid 一并返回,这样用户反馈时候,我们直接用 traceid 就可以全链路追踪到所有请求的情况了,做到信息的闭环。

请求效果图:







LOGBOOK 效果图:





2、关键思路


2.1 MDC


日志追踪目标是每次请求级别的,也就是说同一个接口的每次请求,都应该有不同的 traceId。每次接口请求,都是一个单独的线程,所以自然我们很容易考虑到通过 ThreadLocal 实现上述需求。考虑到 log4j 本身已经提供了类似的功能 MDC,所以直接使用 MDC 进行实现。


关于 MDC 的简述


MDC(Mapped Diagnostic Context)是一个映射,用于存储运行上下文的特定线程的上下文数据。因此,如果使用 log4j 进行日志记录,则每个线程都可以拥有自己的 MDC,该 MDC 对整个线程是全局的。属于该线程的任何代码都可以轻松访问线程的 MDC 中存在的值。


API 说明


  • clear() => 移除所有 MDC
  • get (String key) => 获取当前线程 MDC 中指定 key 的值
  • getContext() => 获取当前线程 MDC 的 MDC
  • put(String key, Object o) => 往当前线程的 MDC 中存入指定的键值对
  • remove(String key) => 删除当前线程 MDC 中指定的键值对


3、目标


  1. 需要一个全服务唯一的 id,即 traceId,如何保证?
  2. traceId 如何在服务内部传递?
  3. traceId 如何在服务间传递?
  4. traceId 如何在多线程中传递?


4、实现方式


4.1 需要一个全服务唯一的 id,即 traceId,如何保证?


使用最简单的 uuid 即可。复杂的话可以配置 Redis、雪花算法等方式。本次分享选最简单 uuid 生成 traceId 的方式。


4.2 traceId 如何在服务间传递?


1)在 XML 的日志格式中添加 %X{traceId} 配置。



"CONSOLE" class="org.apache.log4j.ConsoleAppender">    "org.apache.log4j.PatternLayout">        "ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} [%X{traceId}] [%p] %l[%t]%n%m%n" />    </layout>>

2)新增拦截器,拦截所有请求,从 header 中获取 traceId 然后放到 MDC 中,如果没有获取到,则直接用 UUID 生成一个。



@Slf4j@Componentpublic class LogInterceptor implements HandlerInterceptor {    private static final String TRACE_ID = "traceId";
@Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception arg3) throws Exception { }
@Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView arg3) throws Exception { }
@Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String traceId = request.getHeader(TRACE_ID); if (StringUtils.isEmpty(traceId)) { MDC.put(TRACE_ID, UUID.randomUUID().toString()); } else { MDC.put(TRACE_ID, traceId); } return true; }}



3)配置拦截器



@Configurationpublic class WebConfig implements WebMvcConfigurer {    @Resource    private LogInterceptor logInterceptor;
@Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(logInterceptor).addPathPatterns("/**"); }}


4.3 traceId 如何在服务间传递


封装 HTTP 工具类,把 traceId 加入头中,带到下一个服务。



@Slf4j




    
public class HttpUtils {    public static String get(String url) throws URISyntaxException {        RestTemplate restTemplate = new RestTemplate();        MultiValueMap<String, String> headers = new HttpHeaders();        headers.add("traceId", MDC.get("traceId"));        URI uri = new URI(url);        RequestEntity> requestEntity = new RequestEntity<>(headers, HttpMethod.GET, uri);        ResponseEntity exchange = restTemplate.exchange(requestEntity, String.class);        if (exchange.getStatusCode().equals(HttpStatus.OK)) {            log.info("send http request success");        }        return exchange.getBody();    }}


4.4 traceId 如何在多线程中传递?


Spring 项目也使用到了很多线程池,比如 @Async 异步调用,Zookeeper 线程池、 Kafka 线程池等。不管是哪种线程池都大都支持传入指定的线程池实现,@Async 举例:


原理为:


MDC 底层使用 ThreadLocal 来实现,那根据 ThreadLocal 的特点,它是可以让我们在同一个线程中共享数据的,但是往往我们在业务方法中,会开启多线程来执行程序,这样的话 MDC 就无法传递到其他子线程了。这时,我们需要使用额外的方法来传递存在 ThreadLocal 里的值。


MDC 提供了一个叫 getCopyOfContextMap 的方法,很显然,该方法就是把当前线程 ThreadLocal 绑定的Map获取出来,之后就是把该 Map 绑定到子线程中的ThreadLocal 中了。


改造 Spring 的异步线程池,包装提交的任务。



@Slf4j@Componentpublic class TraceAsyncConfigurer implements AsyncConfigurer {    @Override    public Executor getAsyncExecutor() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        executor.setCorePoolSize(8);        executor.setMaxPoolSize(16);        executor.setQueueCapacity(100);        executor.setThreadNamePrefix("async-pool-");        executor.setTaskDecorator(new MdcTaskDecorator());        executor.setWaitForTasksToCompleteOnShutdown(true);        executor.initialize();        return executor;    }
@Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (throwable, method, params) -> log.error("asyc execute error, method={}, params={}", method.getName(), Arrays.toString(params)); }
public static class MdcTaskDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { Map contextMap = MDC.getCopyOfContextMap(); return () -> { if (contextMap != null) { MDC.setContextMap(contextMap); } try { runnable.run(); } finally { MDC.clear(); } }; } }}
public class MDCLogThreadPoolExecutor extends ThreadPoolExecutor { public MDCLogThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); }
@Override public void execute(Runnable command) { super.execute(MDCLogThreadPoolExecutor.executeRunable(command, MDC.getCopyOfContextMap())); }
@Override public Future> submit(Runnable task) { return super.submit(MDCLogThreadPoolExecutor.executeRunable(task, MDC.getCopyOfContextMap())); }
@Override public Future submit(Callable callable) { return super.submit(MDCLogThreadPoolExecutor.submitCallable(callable, MDC.getCopyOfContextMap())); }
public static Runnable executeRunable(Runnable runnable, Map mdcContext) { return new Runnable() { @Override public void run() { if (mdcContext == null) { MDC.clear(); } else { MDC.setContextMap(mdcContext); } try { runnable.run(); } finally { MDC.clear(); } } }; }
private static Callable submitCallable(Callable callable, Map context) { return () -> { if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } try { return callable.call(); } finally { MDC.clear(); } }; }}

接下来需要对 ThreadPoolTaskExecutor 的方法进行重写:



package com.example.demo.common.threadpool;
import com.example.demo.common.constant.Constants;import lombok.extern.slf4j.Slf4j;import org.slf4j.MDC;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.Map;import java.util.UUID;import java.util.concurrent.Callable;import java.util.concurrent.Future;
/** * MDC线程池 * 实现内容传递 * @author wangbo * @date 2021/5/13 */@Slf4jpublicclass MdcTaskExecutor extends ThreadPoolTaskExecutor { @Override public Future submit(Callable task) { log.info("mdc thread pool task executor submit"); Map context = MDC.getCopyOfContextMap();
return super.submit(() -> { T result; if (context != null) { // 将父线程的MDC内容传给子线程 MDC.setContextMap(context); } else { // 直接给子线程设置MDC MDC.put(Constants.LOG_MDC_ID, UUID.randomUUID().toString().replace("-", "")); } try { // 执行任务 result = task.call(); } finally { try { MDC.clear(); } catch (Exception e) { log.warn("MDC clear exception", e); } } return result; }); }
@Override public void execute(Runnable task) { log.info("mdc thread pool task executor execute"); Map context = MDC.getCopyOfContextMap(); super.execute(() -> { if (context != null) { // 将父线程的MDC内容传给子线程 MDC.setContextMap(context); } else { // 直接给子线程设置MDC MDC.put(Constants.LOG_MDC_ID, UUID.randomUUID().toString().replace("-", "")); } try { // 执行任务 task.run(); } finally { try { MDC.clear(); } catch (Exception e) { log.warn("MDC clear exception", e); } } }); }}



然后使用自定义的重写子类 MdcTaskExecutor 来实现线程池配置:

/** * 线程池配置 *  * @author wangbo * @date 2021/5/13 */@Slf4j@Configurationpublicclass ThreadPoolConfig {    /** * 异步任务线程池 * 用于执行普通的异步请求,带有请求链路的MDC标志 */    @Bean    public Executor commonThreadPool() {        log.info("start init common thread pool"); // ThreadPoolTaskExecutor        executor = new ThreadPoolTaskExecutor();        MdcTaskExecutor executor = new MdcTaskExecutor();        // 配置核心线程数        executor.setCorePoolSize(10);        // 配置最大线程数        executor.setMaxPoolSize(20);        // 配置队列大小        executor.setQueueCapacity(3000);        // 配置空闲线程存活时间        executor.setKeepAliveSeconds(120);        // 配置线程池中的线程的名称前缀        executor.setThreadNamePrefix("common-thread-pool-");        // 当达到最大线程池的时候丢弃最老的任务        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());        // 执行初始化        executor.initialize();        return executor;    }
/**





请到「今天看啥」查看全文