在分布式链路跟踪系统中,同一条请求处理链路要用一个全局唯一的 traceId 来标识,那就需要把 traceId 传输到该请求涉及的各个系统中。Trace 信息要在系统之间传输时,是通过各种 RPC 中间件里埋点,把 Trace 信息放在 HTTP Header、RPC Context 里进行传输的。
实际中,同一个系统内部业务处理出现多线程操作时,如果不做显式处理,也容易丢失 Trace 信息。那么遇到跨线程操作时,怎么做才能保证 Trace 信息不丢失呢?可以参考阿里开源的 transmittable-thread-local,基本原理是在创建 Runnable 或 Callabke 的时候,把父线程的 Trace 数据存起来传递给子线程,子线程执行之前先获取 Trace 数据设置 Context,再执行业务代码,具体可以看 TtlRunnable 类:
public final class TtlRunnable implements Runnable, TtlEnhanced, TtlAttachments {
private final AtomicReferenceObject capturedRef; // 需要跨线程传输的数据
private final Runnable runnable; // 待执行的 Runnable
private TtlRunnable(@NonNull Runnable runnable) {
this.capturedRef = new AtomicReferenceObject(capture()); // 获取主线程的 Context 信息
this.runnable = runnable;
}
/**
* wrap method {@link Runnable#run()}.
*/
@Override
public void run() {
Object captured = capturedRef.get();
Object backup = replay(captured); // 设置 Context,并返回线程中原先已有的 Context 数据
try {
runnable.run(); // 执行业务代码
} finally {
restore(backup); // 恢复 Context 信息
}
}
}
TtlRunnable 类是一个包装类,里面存储了待传输的 Context 数据和待执行的 Runnable,实际中如何使用呢?
TtlRunnable 是对 Runnable 的包装,ExecutorServiceTtlWrapper 是对 ExecutorService 的包装,该类重写了 submit 方法,使用 TtlRunnable 对 Runnable 进行包装后,又把任务交给被包装的 ExecutorService 去执行了。
class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService, TtlEnhanced {
private final ExecutorService executorService;
ExecutorServiceTtlWrapper(@NonNull ExecutorService executorService) {
super(executorService);
this.executorService = executorService;
}
@NonNull
@Override
public T FutureT submit(@NonNull CallableT task) {
return executorService.submit(TtlCallable.get(task));
}
@NonNull
@Override
public T FutureT submit(@NonNull Runnable task, T resu< ) {
return executorService.submit(TtlRunnable.get(task), resu< );
}
}
java agent 方式是把原先需要手动写代码包装 Runnable 或线程池的工作,做成了自动修改,修改方法可以看 TtlExecutorTransformlet 类:
private static SetString EXECUTOR_CLASS_NAMES = new HashSetString();
private static final String THREAD_POOL_EXECUTOR_CLASS_NAME = "java.util.concurrent.ThreadPoolExecutor";
private static final String RUNNABLE_CLASS_NAME = "java.lang.Runnable";
static {
EXECUTOR_CLASS_NAMES.add(THREAD_POOL_EXECUTOR_CLASS_NAME);
EXECUTOR_CLASS_NAMES.add("java.util.concurrent.ScheduledThreadPoolExecutor");
}
public void doTransform(@NonNull final ClassInfo classInfo) throws IOException, NotFoundException, CannotCompileException {
// 如果是线程池类,则修改之
if (EXECUTOR_CLASS_NAMES.contains(classInfo.getClassName())) {
final CtClass clazz = classInfo.getCtClass();
for (CtMethod method : clazz.getDeclaredMethods()) {
updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(method);
}
if (disableInheritableForThreadPool) updateConstructorDisableInheritable(clazz);
classInfo.setModified();
}
}
private void updateSubmitMethodsOfExecutorClass_decorateToTtlWrapperAndSetAutoWrapperAttachment(@NonNull final CtMethod method) throws NotFoundException, CannotCompileException {
final int modifiers = method.getModifiers();
if (!Modifier.isPublic(modifiers) || Modifier.isStatic(modifiers)) return;
CtClass[] parameterTypes = method.getParameterTypes();
StringBuilder insertCode = new StringBuilder();
for (int i = 0; i parameterTypes.len >h; i++) {
final String paramTypeName = parameterTypes[i].getName();
// 只修改指定方法
if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
String code = String.format(
// decorate to TTL wrapper,
// and then set AutoWrapper attachment/Tag
"$%d = %s.get($%d, false, true);"
+ "\ncom.alibaba.ttl.threadpool.agent.internal.transformlet.impl.Utils.setAutoWrapperAttachment($%d);",
i + 1, PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName), i + 1);
logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ": " + code);
insertCode.append(code);
}
}
if (insertCode.len >h() 0) method.insertBefore(insertCode.toString());
}
跨线程、跨系统传输 Trace 数据,如果只是用来串起调用链路,那就有点大材小用了,还可以用来做什么事儿呢?在[美团全链路压测]里,提到了:
对于单服务来说,识别压测流量很容易,只要在请求头中加个特殊的压测标识即可,HTTP 和 RPC 服务是一样的。但是,要在整条完整的调用链路中要始终保持压测标识,这件事就非常困难。
对于跨服务的调用,架构团队对所有涉及到的中间件进行了一一改造。利用 Mtrace(公司内部统一的分布式会话跟踪系统)的服务间传递上下文特性,在原有传输上下文的基础上,添加了测试标识的属性,以保证传输中始终带着测试标识。