Devean Medium
Distributed Tracing
February 9, 2024

Distributed Tracing

Posted on February 9, 2024  •  3 minutes  • 558 words
Table of contents

What is Distributed Tracing?

Distributed tracing, also known as distributed request tracking, is a methodology for analyzing and monitoring distributed service profiles, especially for failures that occur for unknown and degraded performance reasons.

Core concepts

Why Distributed Tracking is Needed?

With the growth of business volume, monolithic services have been unable to meet the demand, so SOA servitization and microservices, and multiple instances of each service are deployed, resulting in increased difficulty in troubleshooting and performance issues.

What can we do with distributed tracing?

The implementation principle of distributed tracing

Cross thread:Thread Local transmits information such as the trace Id cross process:Pass traceId and other information through headers encapsulating RPC, HTTP, and MQ protocols

Implementation methodology:

Industry middleware SDK packaging, manual processing where needed Compiler bytecode instrumentation

GlobalTracing

public class GlobalTracing {

  private static final ThreadLocal<String> TRACE_ID_LOCAL = new ThreadLocal<>();
  public static final String TRACE_ID = "trace_id";


  private GlobalTracing() {
  }


  public static void setTraceId(String traceId) {
      TRACE_ID_LOCAL.set(traceId);
      LogUtils.setTraceId(traceId);
  }

  public static String getTraceId() {
      return TRACE_ID_LOCAL.get();
  }

  public static void remove() {
      TRACE_ID_LOCAL.remove();
  }
}

BaseProcessor

 @Override
   public Response execute0(RstMsgType rmt, GeneratedMessageV3 req) {
       try {
           GlobalTracing.setTraceId(getMessage().getTraceId());
           return this.execute((T) req);
       }finally {
           GlobalTracing.remove();
       }
   }

Grpc Client

public class TracingClientInterceptor implements ClientInterceptor {
   private static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of(GlobalTracing.TRACE_ID, Metadata.ASCII_STRING_MARSHALLER);

   @Override
   public <ReqT, RespT> ForwardingClientCall.SimpleForwardingClientCall interceptCall(MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {

       return new ForwardingClientCall.SimpleForwardingClientCall(channel.newCall(methodDescriptor, callOptions)) {
           @Override
           protected ClientCall delegate() {
               return super.delegate();
           }

           @Override
           public void start(Listener responseListener, Metadata headers) {
               String traceId = GlobalTracing.getTraceId();
               if (traceId != null) {
                   headers.put(TRACE_ID_KEY, traceId);
               }
               super.start(responseListener, headers);
           }
      }
}

Grpc Server


public class TracingServerInterceptor implements ServerInterceptor {
    private static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of(GlobalTracing.TRACE_ID, Metadata.ASCII_STRING_MARSHALLER);

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata header, ServerCallHandler<ReqT, RespT> serverCallHandler) {
        Set<String> keys = header.keys();
        for (String key : keys) {
            if (GlobalTracing.TRACE_ID.equals(key)) {
                String traceId = header.get(TRACE_ID_KEY);
                header.removeAll(TRACE_ID_KEY);
                GlobalTracing.setTraceId(traceId);
                break;
            }
        }
        return new TracingForwardingServerCallListener(serverCallHandler.startCall(serverCall, header), serverCall.getMethodDescriptor());
    }
}

Feign

public class FeignTraceInterceptor implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate template) {
        String traceId = TraceContext.getTraceId();
        if (traceId != null) {
            MDC.put(TraceContext.LOG_PATTER_TRACE_ID, traceId);
            template.header(TraceContext.TRACE_ID_KEY, traceId);
        }
    }
}

public class TraceHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(TraceHystrixConcurrencyStrategy.class);


    public TraceHystrixConcurrencyStrategy() {
        LOG.info("TraceHystrixConcurrencyStrategy init!");
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return new CallableDecorator<>(callable);
    }
}

In-process cross-thread

Runnnable Interface proxy encapsulation

    public class RunnableWrapper implements Runnable {
    final Runnable runnable;
    final String traceId;


    public RunnableWrapper(Runnable runnable) {
        this.traceId = GlobalTracing.getTraceId();
        this.runnable = runnable;
    }

    @Override
    public void run() {
        if (this.traceId != null) {
            GlobalTracing.setTraceId(this.traceId);
            this.runnable.run();
        } else {
            this.runnable.run();
        }
    }
}
ExcutorService Proxy encapsulation


public class TracingExecutorServiceImpl implements ExecutorService {

    private ExecutorService executorService;


    public TracingExecutorServiceImpl(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return executorService.submit(new CallableWrapper<>(task));
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return executorService.submit(new RunnableWrapper(task), result);
    }
}
代理ExecutorService

public final class ConcurrentUtils {

    public static final ExecutorService EXECUTOR = new TracingExecutorServiceImpl(Executors.newFixedThreadPool(96, new BasicThreadFactory.Builder()
            .namingPattern("utils-execute-%d")
            .uncaughtExceptionHandler((t, e) -> log.error("execute runnable failed: ", e))
            .build()));
    
Spring的TaskDecorator的实现
public class TraceTaskDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
        return new RunnableWrapper(runnable);
    }
}

reference

Dapper, a Large-Scale Distributed Systems Tracing Infrastructure