Distributed Tracing
Posted on February 9, 2024 • 3 minutes • 558 words
What is Distributed Tracing?
Distributed tracing, also known as distributed request tracing, is a method for profiling and monitoring applications built from distributed services. It is especially useful for pinpointing where failures occur and what causes degraded performance.
Core concepts
traceId
: A unique ID generated for a single user request
spanId
: Identifies the position of this call in the call chain
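To make the two identifiers concrete, here is a minimal sketch. The class name `SpanContext`, the 32-character hex trace ID, and the dotted span numbering ("0", "0.1", "0.2.1") are all assumptions made for illustration; the post does not specify how either ID is encoded.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder for the two identifiers; not part of the original code.
final class SpanContext {
    final String traceId; // shared by every call made on behalf of one user request
    final String spanId;  // position of this call in the call chain, e.g. "0.2.1"
    private final AtomicInteger childCount = new AtomicInteger();

    private SpanContext(String traceId, String spanId) {
        this.traceId = traceId;
        this.spanId = spanId;
    }

    // Called once, at the entry point of a new user request.
    static SpanContext newRoot() {
        String traceId = UUID.randomUUID().toString().replace("-", "");
        return new SpanContext(traceId, "0");
    }

    // Called before each downstream call: same traceId, next position under this span.
    SpanContext newChild() {
        return new SpanContext(traceId, spanId + "." + childCount.incrementAndGet());
    }

    public static void main(String[] args) {
        SpanContext root = newRoot();
        SpanContext first = root.newChild();
        SpanContext second = root.newChild();
        SpanContext nested = second.newChild();
        System.out.println(root.spanId + " " + first.spanId + " " + second.spanId + " " + nested.spanId);
        System.out.println(nested.traceId.equals(root.traceId));
    }
}
```

Every span in the chain keeps the root's traceId, so filtering logs by that one value brings back the whole request, while the spanId shows where each call sits in the chain.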
Why is Distributed Tracing Needed?
As business volume grows, monolithic services can no longer meet demand, so systems are split into SOA services and microservices, with multiple instances of each service deployed. A single request then passes through many processes, which makes troubleshooting failures and performance issues much harder.
What can we do with distributed tracing?
- Fault Localization
- Log aggregation
- Performance analysis
- Service Dependency Topology View
How distributed tracing is implemented
- Cross-thread: a ThreadLocal carries information such as the traceId.
- Cross-process: the traceId and other information are passed in the headers of the RPC, HTTP, and MQ protocols.
Implementation approaches:
- Wrap the SDKs of widely used middleware, handling the remaining cases manually where needed
- Compile-time bytecode instrumentation
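The cross-process half of this can be sketched without any framework. In the sketch below a plain `Map` stands in for the protocol headers. The class `HeaderPropagationSketch`, its method names, and the header name are illustrative assumptions, not the post's actual code.

```java
import java.util.HashMap;
import java.util.Map;

final class HeaderPropagationSketch {
    // Hypothetical header name used for this illustration.
    static final String TRACE_ID_HEADER = "trace_id";
    // Stand-in for a thread-bound trace context.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Caller side: copy the current thread's trace ID into the outgoing headers.
    static void inject(Map<String, String> headers) {
        String traceId = CURRENT.get();
        if (traceId != null) {
            headers.put(TRACE_ID_HEADER, traceId);
        }
    }

    // Callee side: bind the received trace ID to the handling thread, run the handler, then clear it.
    static void handle(Map<String, String> headers, Runnable handler) {
        String traceId = headers.get(TRACE_ID_HEADER);
        if (traceId != null) {
            CURRENT.set(traceId);
        }
        try {
            handler.run();
        } finally {
            CURRENT.remove();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CURRENT.set("abc123");
        Map<String, String> headers = new HashMap<>();
        inject(headers);

        // A separate thread plays the role of the remote process.
        Thread callee = new Thread(() ->
                handle(headers, () -> System.out.println("callee sees " + CURRENT.get())));
        callee.start();
        callee.join();
        CURRENT.remove();
    }
}
```

The gRPC and Feign interceptors later in this post follow the same inject and extract pattern, with the framework's metadata or request template in place of the map.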
GlobalTracing
public class GlobalTracing {
    // Holds the trace ID of the request currently handled by this thread.
    private static final ThreadLocal<String> TRACE_ID_LOCAL = new ThreadLocal<>();
    // Header / metadata key used when the trace ID crosses a process boundary.
    public static final String TRACE_ID = "trace_id";

    private GlobalTracing() {
    }

    public static void setTraceId(String traceId) {
        TRACE_ID_LOCAL.set(traceId);
        LogUtils.setTraceId(traceId);
    }

    public static String getTraceId() {
        return TRACE_ID_LOCAL.get();
    }

    public static void remove() {
        TRACE_ID_LOCAL.remove();
    }
}
BaseProcessor
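@Override
public Response execute0(RstMsgType rmt, GeneratedMessageV3 req) {
    try {
        // Bind the trace ID carried by the incoming message to the handling thread.
        GlobalTracing.setTraceId(getMessage().getTraceId());
        return this.execute((T) req);
    } finally {
        // Always clear it so a pooled thread does not leak the ID into the next request.
        GlobalTracing.remove();
    }
}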
gRPC Client
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;

public class TracingClientInterceptor implements ClientInterceptor {
    private static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of(GlobalTracing.TRACE_ID, Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
        return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                // Copy the caller's trace ID into the outgoing gRPC metadata.
                String traceId = GlobalTracing.getTraceId();
                if (traceId != null) {
                    headers.put(TRACE_ID_KEY, traceId);
                }
                super.start(responseListener, headers);
            }
        };
    }
}
gRPC Server
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

public class TracingServerInterceptor implements ServerInterceptor {
    private static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of(GlobalTracing.TRACE_ID, Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata header, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        // Metadata.get returns null when the caller sent no trace ID.
        String traceId = header.get(TRACE_ID_KEY);
        if (traceId != null) {
            header.removeAll(TRACE_ID_KEY);
            GlobalTracing.setTraceId(traceId);
        }
        return new TracingForwardingServerCallListener(serverCallHandler.startCall(serverCall, header), serverCall.getMethodDescriptor());
    }
}
Feign
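import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.slf4j.MDC;

public class FeignTraceInterceptor implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate template) {
        String traceId = TraceContext.getTraceId();
        if (traceId != null) {
            // Expose the trace ID to the log pattern and to the outgoing HTTP request.
            MDC.put(TraceContext.LOG_PATTER_TRACE_ID, traceId);
            template.header(TraceContext.TRACE_ID_KEY, traceId);
        }
    }
}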
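import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TraceHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(TraceHystrixConcurrencyStrategy.class);

    public TraceHystrixConcurrencyStrategy() {
        LOG.info("TraceHystrixConcurrencyStrategy init!");
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        // Hystrix may run the command on its own thread pool, so the callable is decorated.
        return new CallableDecorator<>(callable);
    }
}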
In-process cross-thread
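Runnable interface wrapper
public class RunnableWrapper implements Runnable {
    final Runnable runnable;
    final String traceId;

    public RunnableWrapper(Runnable runnable) {
        // Captured on the submitting thread, at construction time.
        this.traceId = GlobalTracing.getTraceId();
        this.runnable = runnable;
    }

    @Override
    public void run() {
        // Runs on the worker thread: restore the captured trace ID, if any.
        if (this.traceId != null) {
            GlobalTracing.setTraceId(this.traceId);
        }
        try {
            this.runnable.run();
        } finally {
            // Clear the ID so a pooled thread does not carry it into an unrelated task.
            GlobalTracing.remove();
        }
    }
}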
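The reason a wrapper is needed at all is that a ThreadLocal value set on the submitting thread is not visible on a pool's worker thread. The following self-contained sketch shows both cases. `CrossThreadSketch` and its `TRACE_ID` field are stand-ins invented for this example rather than the post's `GlobalTracing`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class CrossThreadSketch {
    // Stand-in for the thread-bound trace ID.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Capture the trace ID on the calling thread, restore it on the thread that runs the task.
    static Runnable wrap(Runnable task) {
        String captured = TRACE_ID.get();
        return () -> {
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                TRACE_ID.remove();
            }
        };
    }

    // Runs a task on a worker thread and returns the trace ID that the task observed.
    static String observedOnWorker(boolean wrapped) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        Runnable task = () -> seen.set(TRACE_ID.get());
        try {
            pool.submit(wrapped ? wrap(task) : task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        TRACE_ID.set("trace-42");
        System.out.println(observedOnWorker(false)); // prints "null"
        System.out.println(observedOnWorker(true));  // prints "trace-42"
    }
}
```

The important detail is that the ID is read when the wrapper is created, on the submitting thread, and not when the task runs.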
ExecutorService proxy wrapper
public class TracingExecutorServiceImpl implements ExecutorService {
    private final ExecutorService executorService;

    public TracingExecutorServiceImpl(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // Wrap the task so the submitter's trace ID travels to the worker thread.
        return executorService.submit(new CallableWrapper<>(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return executorService.submit(new RunnableWrapper(task), result);
    }

    // The remaining ExecutorService methods are not shown here.
}
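`CallableWrapper` is referenced above but not listed in the post. A plausible shape, mirroring `RunnableWrapper`, is sketched below. This is an assumption about how it might look, not the author's implementation, and it uses a local `TRACE_ID` ThreadLocal in place of `GlobalTracing` so that it runs on its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CallableWrapperSketch {
    // Stand-in for GlobalTracing's thread-bound trace ID.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Hypothetical counterpart of RunnableWrapper for tasks that return a value.
    static final class CallableWrapper<T> implements Callable<T> {
        private final Callable<T> callable;
        private final String traceId;

        CallableWrapper(Callable<T> callable) {
            this.traceId = TRACE_ID.get(); // captured on the submitting thread
            this.callable = callable;
        }

        @Override
        public T call() throws Exception {
            if (traceId != null) {
                TRACE_ID.set(traceId); // restored on the worker thread
            }
            try {
                return callable.call();
            } finally {
                TRACE_ID.remove(); // do not leak into the next pooled task
            }
        }
    }

    // Submits a wrapped task and returns the trace ID that the worker thread observed.
    static String traceSeenByWorker(String submitterTraceId) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            TRACE_ID.set(submitterTraceId);
            Callable<String> task = () -> TRACE_ID.get();
            return pool.submit(new CallableWrapper<>(task)).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceSeenByWorker("trace-7")); // prints "trace-7"
    }
}
```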
Proxying the ExecutorService
public final class ConcurrentUtils {
    // Shared pool: every task submitted through it carries the submitter's trace ID.
    public static final ExecutorService EXECUTOR = new TracingExecutorServiceImpl(
            Executors.newFixedThreadPool(96, new BasicThreadFactory.Builder()
                    .namingPattern("utils-execute-%d")
                    .uncaughtExceptionHandler((t, e) -> log.error("execute runnable failed: ", e))
                    .build()));
}
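Spring TaskDecorator implementation
import org.springframework.core.task.TaskDecorator;

public class TraceTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        // Called on the submitting thread, so the wrapper captures that thread's trace ID.
        return new RunnableWrapper(runnable);
    }
}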
Reference
Dapper, a Large-Scale Distributed Systems Tracing Infrastructure