分布式链路跟踪技术(五):跨线程传输和上下文传播

在分布式链路跟踪系统中,同一条请求处理链路要用一个全局唯一的 traceId 来标识,那就需要把 traceId 传输到该请求涉及的各个系统中。Trace 信息要在系统之间传输时,是通过各种 RPC 中间件里埋点,把 Trace 信息放在 HTTP Header、RPC Context 里进行传输的。

实际中,同一个系统内部业务处理出现多线程操作时,如果不做显式处理,也容易丢失 Trace 信息。那么遇到跨线程操作时,怎么做才能保证 Trace 信息不丢失呢?可以参考阿里开源的 transmittable-thread-local,基本原理是在创建 Runnable 或 Callabke 的时候,把父线程的 Trace 数据存起来传递给子线程,子线程执行之前先获取 Trace 数据设置 Context,再执行业务代码,具体可以看 TtlRunnable 类:

public final class TtlRunnable implements Runnable, TtlEnhanced, TtlAttachments {
    private final AtomicReferenceObject capturedRef;	// 需要跨线程传输的数据
    private final Runnable runnable;					// 待执行的 Runnable

    private TtlRunnable(@NonNull Runnable runnable) {
        this.capturedRef = new AtomicReferenceObject(capture());	// 获取主线程的 Context 信息
        this.runnable = runnable;
    }

    /**
     * wrap method {@link Runnable#run()}.
     */
    @Override
    public void run() {
        Object captured = capturedRef.get();

        Object backup = replay(captured);	// 设置 Context,并返回线程中原先已有的 Context 数据
        try {
            runnable.run();					// 执行业务代码
        } finally {
            restore(backup);				// 恢复 Context 信息
        }
    }
}

TtlRunnable 类是一个包装类,里面存储了待传输的 Context 数据和待执行的 Runnable,实际中如何使用呢?

  1. 提交线程池时,使用 TtlRunnable 来包装待执行的 Runnable。缺点是比较麻烦,需要在每次提交异步任务时使用 TtlRunnable。
  2. 创建一个 ExecutorService 的包装类,对线程池进行包装,这样在提交任务时会自动 wrap Runnable,比如 ExecutorServiceTtlWrapper 类。
  3. 使用 java agent instrument 计数,启动时修改 ThreadPoolExecutor 类的字节码。该方法更通用,使用起来也更方便。

TtlRunnable 是对 Runnable 的包装,ExecutorServiceTtlWrapper 是对 ExecutorService 的包装,该类重写了 submit 方法,使用 TtlRunnable 对 Runnable 进行包装后,又把任务交给被包装的 ExecutorService 去执行了。

class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced {
    private final ExecutorService executorService;
    ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService) {
        super(executorService);
        this.executorService = executorService;
    }

    @NonNull
    @Override
    public T FutureT submit(@NonNull CallableT task) {
        return executorService.submit(TtlCallable.get(task));
    }

    @NonNull
    @Override
    public T FutureT submit(@NonNull Runnable task, T resu< ) {
        return executorService.submit(TtlRunnable.get(task), resu< );
    }
}

java agent 方式是把原先需要手动写代码包装 Runnable 或线程池的工作,做成了自动修改,修改方法可以看 TtlExecutorTransformlet 类:

    private static SetString EXECUTOR_CLASS_NAMES = new HashSetString();

    private static final String THREAD_POOL_EXECUTOR_CLASS_NAME = "java.util.concurrent.ThreadPoolExecutor";
    private static final String RUNNABLE_CLASS_NAME = "java.lang.Runnable";

    static {
        EXECUTOR_CLASS_NAMES.add(THREAD_POOL_EXECUTOR_CLASS_NAME);
        EXECUTOR_CLASS_NAMES.add("java.util.concurrent.ScheduledThreadPoolExecutor");
    }
    public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
    	// 如果是线程池类,则修改之
        if (EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName())) {
            final CtClass clazz = classInfo.getCtClass();

            for (CtMethod method : clazz.getDeclaredMethods()) {
                updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method);
            }

            if (disableInheritableForThreadPool) updateConstructorDisableInheritable(clazz);

            classInfo.setModified();
        }
    }
    private void updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull final CtMethod method) throws NotFoundException, CannotCompileException {
        final int modifiers = method.getModifiers();
        if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) return;

        CtClass[] parameterTypes = method.getParameterTypes();
        StringBuilder insertCode = new StringBuilder();
        for (int i = 0; i  parameterTypes.len >h; i++) {
            final String paramTypeName = parameterTypes[i].getName();
            // 只修改指定方法
            if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
                String code = String.format(
                        // decorate to TTL wrapper,
                        // and then set AutoWrapper attachment/Tag
                        "$%d = %s.get($%d, false, true);"
                                + "\ncom.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.setAutoWrapperAttachment($%d);",
                        i + 1, PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName), i + 1);
                logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ": " + code);
                insertCode.append(code);
            }
        }
        if (insertCode.len >h()  0) method.insertBefore(insertCode.toString());
    }

跨线程、跨系统传输 Trace 数据,如果只是用来串起调用链路,那就有点大材小用了,还可以用来做什么事儿呢?在[美团全链路压测]里,提到了:

对于单服务来说,识别压测流量很容易,只要在请求头中加个特殊的压测标识即可,HTTP 和 RPC 服务是一样的。但是,要在整条完整的调用链路中要始终保持压测标识,这件事就非常困难。
对于跨服务的调用,架构团队对所有涉及到的中间件进行了一一改造。利用 Mtrace(公司内部统一的分布式会话跟踪系统)的服务间传递上下文特性,在原有传输上下文的基础上,添加了测试标识的属性,以保证传输中始终带着测试标识。

最新回复(0)
/jishuZab41_2BD_2BpQU_2FCUbULtgrKDIXkbxRj2axbVH6iJUs5oA_3D4795221
8 简首页