为什么需要OpenTracing

OpenTracing通过提供平台无关、厂商无关的API,使得开发人员能够方便地添加(或更换)追踪系统的实现。OpenTracing还提供了用于运营支撑系统的以及针对特定平台的辅助程序库,程序库的具体信息请参考详细的规范。

OpenTracing数据模型

  • Trace: 调用链, 其中包含了多个Span.
  • Span: 跨度, 计量的最小单位, 每个跨度都有开始时间与截止时间. Span和Span之间可以存在References(关系): ChildOf 与 FollowsFrom

Jaeger 架构

  • Jaeger Client - 为不同语言实现了符合 OpenTracing 标准的 SDK。应用程序通过 API 写入数据,client library 把 trace 信息按照应用程序指定的采样策略传递给 jaeger-agent。在 Application 中调用 Jaeger Client Library 记录 Span 的过程通常被称为埋点。
  • Agent - 它是一个监听在 UDP 端口上接收 span 数据的网络守护进程,它会将数据批量发送给 collector。它被设计成一个基础组件,部署到所有的宿主机上。Agent 将 client library 和 collector 解耦,为 client library 屏蔽了路由和发现 collector 的细节。
  • Collector - 接收 jaeger-agent 发送来的数据,然后将数据写入后端存储。Collector 被设计成无状态的组件,因此您可以同时运行任意数量的 jaeger-collector。
  • Data Store - 后端存储被设计成一个可插拔的组件,支持将数据写入 Cassandra、Elasticsearch。
  • Query - 接收查询请求,然后从后端存储系统中检索 trace 并通过 UI 进行展示。Query 是无状态的,您可以启动多个实例,把它们部署在 nginx 这样的负载均衡器后面。

安装

官方:https://www.jaegertracing.io/

docker安装:

$ docker run -d --name jaeger \
  -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
  -p 5775:5775/udp \
  -p 6831:6831/udp \
  -p 6832:6832/udp \
  -p 5778:5778 \
  -p 16686:16686 \
  -p 14268:14268 \
  -p 9411:9411 \
  jaegertracing/all-in-one:1.19

安装完成之后, 你可以通过 http://域名或ip:16686 访问 Jaeger UI。

客户端

官方提供了Go/Java/Node.js/Python/C++/C#语言的客户端库

package main
import (
    "context"
    "log"
    "time"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    jaegercfg "github.com/uber/jaeger-client-go/config"
)
// cfg is the Jaeger client configuration that test uses to initialise the
// global tracer.
var cfg = jaegercfg.Configuration{
    // Constant sampler with parameter 1.
    // NOTE(review): presumably this samples every trace — confirm against the
    // Jaeger sampler documentation.
    Sampler: &jaegercfg.SamplerConfig{
        Type:  jaeger.SamplerTypeConst,
        Param: 1,
    },
    Reporter: &jaegercfg.ReporterConfig{
        LogSpans:           true,
        LocalAgentHostPort: "zhaohaiyu.com:6831", // host:port of the jaeger-agent that receives the spans
    },
}
// test initialises the global tracer from cfg under the service name
// "hello-world", then records an outer span and one span started from the
// outer span's context. The sleeps only give the two spans a visible duration.
// If the tracer cannot be initialised the error is logged and test returns.
func test() {
    tracerCloser, initErr := cfg.InitGlobalTracer("hello-world")
    if initErr != nil {
        log.Printf("Could not initialize jaeger tracer: %s", initErr.Error())
        return
    }
    defer tracerCloser.Close()

    outer, outerCtx := opentracing.StartSpanFromContext(context.TODO(), "helloworld-1")
    time.Sleep(400 * time.Millisecond)

    // Started from outerCtx so that it is linked to the outer span.
    inner, _ := opentracing.StartSpanFromContext(outerCtx, "hellowrld-2")
    time.Sleep(500 * time.Millisecond)

    // Finish the inner span first, then the one that encloses it.
    inner.Finish()
    outer.Finish()
}
// main runs the single-process tracing demo implemented by test.
func main() {
    test()
}

点击进入第一个请求:

可以看到在服务的调用过程中各个span的时间,这个span可以是一个微服务之间的调用也可以是某个方法的调用。

点开某个span也能看到额外的log信息。

函数之间使用

package main
import (
    "context"
    "io"
    "os"
    "time"
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/log"
    "github.com/uber/jaeger-client-go"
    jaegercfg "github.com/uber/jaeger-client-go/config"
)
// initJaeger builds a Jaeger tracer for the given service name using a
// "const" sampler with parameter 1 and a reporter that logs spans and sends
// them to the agent at zhaohaiyu.com:6831.
//
// It returns the tracer together with an io.Closer that the caller is
// expected to close. It panics if the tracer cannot be created.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
    sampler := jaegercfg.SamplerConfig{Type: "const", Param: 1}
    reporter := jaegercfg.ReporterConfig{
        LogSpans:           true,
        LocalAgentHostPort: "zhaohaiyu.com:6831",
    }
    conf := &jaegercfg.Configuration{Sampler: &sampler, Reporter: &reporter}

    tracer, closer, err := conf.New(service, jaegercfg.Logger(jaeger.StdLogger))
    if err != nil {
        panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    }
    return tracer, closer
}
// formatString returns the greeting "Hello, <helloTo>!" and records the work
// in a span named "formatString" started from ctx. The formatted value is
// attached to the span as a log entry. The two-second sleep stands in for
// real work so that the span has a visible duration.
func formatString(ctx context.Context, helloTo string) string {
    sp, _ := opentracing.StartSpanFromContext(ctx, "formatString")
    defer sp.Finish()

    greeting := "Hello, " + helloTo + "!"
    sp.LogFields(
        log.String("event", "string-format"),
        log.String("value", greeting),
    )

    time.Sleep(2 * time.Second)
    return greeting
}
// printHello prints helloStr with the builtin println and records the call in
// a span named "printHello" started from ctx. The one-second sleep gives the
// span a visible duration.
func printHello(ctx context.Context, helloStr string) {
    sp, _ := opentracing.StartSpanFromContext(ctx, "printHello")
    defer sp.Finish()

    println(helloStr)
    sp.LogKV("event", "println")

    time.Sleep(1 * time.Second)
}
// main expects exactly one command-line argument (the name to greet) and
// panics otherwise. It installs a Jaeger tracer for the "function-demo"
// service as the global tracer, opens a root span "say-hello" tagged with the
// name, and passes a context carrying that span to formatString and
// printHello.
func main() {
    args := os.Args
    if len(args) != 2 {
        panic("ERROR: Expecting one argument")
    }

    tracer, closer := initJaeger("function-demo")
    // Deferred calls run in reverse order: the root span is finished before
    // the tracer is closed.
    defer closer.Close()
    opentracing.SetGlobalTracer(tracer)

    helloTo := args[1]
    root := tracer.StartSpan("say-hello")
    defer root.Finish()
    root.SetTag("hello-to", helloTo)

    ctx := opentracing.ContextWithSpan(context.Background(), root)
    greeting := formatString(ctx, helloTo)
    printHello(ctx, greeting)
}

编译执行:funcdemo.exe 赵海宇

http之间使用

server端publish

package main
import (
    "io"
    "net/http"
    "fmt"
    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    "github.com/uber/jaeger-client-go"
    jaegercfg "github.com/uber/jaeger-client-go/config"
)
// main starts the "http-demo" service: an HTTP server on :8848 whose /publish
// handler prints the "helloStr" form value and records each request in a
// "publish" span.
func main() {
    tracer, closer := initJaeger("http-demo")
    defer closer.Close()

    publish := func(w http.ResponseWriter, r *http.Request) {
        // The extraction error is deliberately ignored, as in the original
        // demo; whatever span context was extracted is handed to
        // ext.RPCServerOption as is.
        // NOTE(review): presumably a request without trace headers simply
        // starts a new root span — confirm against the opentracing-go docs.
        parentCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
        serverSpan := tracer.StartSpan("publish", ext.RPCServerOption(parentCtx))
        defer serverSpan.Finish()

        println(r.FormValue("helloStr"))
    }
    http.HandleFunc("/publish", publish)

    // ListenAndServe blocks; its error is printed once the server stops.
    serveErr := http.ListenAndServe(":8848", nil)
    fmt.Println(serveErr)
}
// initJaeger creates a Jaeger tracer for service. The configuration uses a
// "const" sampler with parameter 1 and a span-logging reporter that targets
// the agent at zhaohaiyu.com:6831.
//
// The returned io.Closer belongs to the tracer and should be closed by the
// caller. A failure to create the tracer results in a panic.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
    samplerCfg := &jaegercfg.SamplerConfig{
        Type:  "const",
        Param: 1,
    }
    reporterCfg := &jaegercfg.ReporterConfig{
        LogSpans:           true,
        LocalAgentHostPort: "zhaohaiyu.com:6831",
    }
    conf := &jaegercfg.Configuration{
        Sampler:  samplerCfg,
        Reporter: reporterCfg,
    }

    tracer, closer, err := conf.New(service, jaegercfg.Logger(jaeger.StdLogger))
    if err != nil {
        panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    }
    return tracer, closer
}

server端format

package main
import (
    "fmt"
    "io"
    "net/http"
    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    "github.com/opentracing/opentracing-go/log"
    "github.com/uber/jaeger-client-go"
    jaegercfg "github.com/uber/jaeger-client-go/config"
)
// main starts the "http-formatter" service: an HTTP server on :8081 whose
// /format handler answers with "Hello, <helloTo>!" and records each request in
// a "format" span that carries the formatted value as a log entry.
func main() {
    tracer, closer := initJaeger("http-formatter")
    defer closer.Close()

    format := func(w http.ResponseWriter, r *http.Request) {
        // The extraction error is deliberately ignored, as in the original
        // demo; the extracted span context is handed to ext.RPCServerOption
        // as is.
        parentCtx, _ := tracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
        serverSpan := tracer.StartSpan("format", ext.RPCServerOption(parentCtx))
        defer serverSpan.Finish()

        greeting := fmt.Sprintf("Hello, %s!", r.FormValue("helloTo"))
        serverSpan.LogFields(
            log.String("event", "string-format"),
            log.String("value", greeting),
        )
        w.Write([]byte(greeting))
    }
    http.HandleFunc("/format", format)

    // ListenAndServe blocks; its error is printed once the server stops.
    serveErr := http.ListenAndServe(":8081", nil)
    fmt.Println(serveErr)
}
// initJaeger returns a Jaeger tracer for service along with the io.Closer
// that shuts it down. The tracer is configured with a "const" sampler
// (parameter 1) and a reporter that logs spans and sends them to the agent at
// zhaohaiyu.com:6831. It panics when the tracer cannot be created.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
    var conf jaegercfg.Configuration
    conf.Sampler = &jaegercfg.SamplerConfig{Type: "const", Param: 1}
    conf.Reporter = &jaegercfg.ReporterConfig{
        LogSpans:           true,
        LocalAgentHostPort: "zhaohaiyu.com:6831",
    }

    tracer, closer, err := conf.New(service, jaegercfg.Logger(jaeger.StdLogger))
    if err != nil {
        panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    }
    return tracer, closer
}

client端

package main
import (
    "context"
    "fmt"
    "io"
    "io/ioutil"
    "net/http"
    "net/url"
    "os"
    "github.com/opentracing/opentracing-go"
    "github.com/opentracing/opentracing-go/ext"
    "github.com/opentracing/opentracing-go/log"
    "github.com/uber/jaeger-client-go"
    jaegercfg "github.com/uber/jaeger-client-go/config"
)
// main is the HTTP demo client. It expects exactly one command-line argument
// (the name to greet) and panics otherwise. It installs a Jaeger tracer for
// "http-demo-client" as the global tracer, opens a root span "say-hello"
// tagged with the name, calls formatString and printHello with a context
// carrying that span, and finally prints "exit".
func main() {
    args := os.Args
    if len(args) != 2 {
        panic("ERROR: Expecting one argument")
    }

    tracer, closer := initJaeger("http-demo-client")
    // Deferred calls run in reverse order: the root span is finished before
    // the tracer is closed.
    defer closer.Close()
    opentracing.SetGlobalTracer(tracer)

    helloTo := args[1]
    root := tracer.StartSpan("say-hello")
    defer root.Finish()
    root.SetTag("hello-to", helloTo)

    ctx := opentracing.ContextWithSpan(context.Background(), root)
    greeting := formatString(ctx, helloTo)
    printHello(ctx, greeting)

    fmt.Println("exit")
}
// initJaeger constructs the Jaeger tracer used by the client. It is
// configured with a "const" sampler (parameter 1) and a reporter that logs
// spans and sends them to the agent at zhaohaiyu.com:6831.
//
// The second return value closes the tracer and should be closed by the
// caller. It panics if the tracer cannot be created.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
    conf := jaegercfg.Configuration{
        Sampler: &jaegercfg.SamplerConfig{Type: "const", Param: 1},
        Reporter: &jaegercfg.ReporterConfig{
            LogSpans:           true,
            LocalAgentHostPort: "zhaohaiyu.com:6831",
        },
    }

    tracer, closer, err := conf.New(service, jaegercfg.Logger(jaeger.StdLogger))
    if err != nil {
        panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
    }
    return tracer, closer
}
// formatString asks the formatter service (GET http://localhost:8081/format)
// to format a greeting for helloTo and returns the response body as a string.
//
// The call is recorded in a "formatString" span started from ctx. The span is
// tagged as an RPC client call with the request URL and method, and its
// context is injected into the request headers. The result is also attached
// to the span as a log entry.
//
// It panics if the request cannot be built or if httpDo returns an error.
func formatString(ctx context.Context, helloTo string) string {
    clientSpan, _ := opentracing.StartSpanFromContext(ctx, "formatString")
    defer clientSpan.Finish()

    query := url.Values{"helloTo": []string{helloTo}}
    target := "http://localhost:8081/format?" + query.Encode()
    req, err := http.NewRequest("GET", target, nil)
    if err != nil {
        panic(err.Error())
    }

    ext.SpanKindRPCClient.Set(clientSpan)
    ext.HTTPUrl.Set(clientSpan, target)
    ext.HTTPMethod.Set(clientSpan, "GET")
    // The Inject error is ignored here, as in the original demo.
    clientSpan.Tracer().Inject(
        clientSpan.Context(),
        opentracing.HTTPHeaders,
        opentracing.HTTPHeadersCarrier(req.Header),
    )

    body, err := httpDo(req)
    if err != nil {
        panic(err.Error())
    }

    helloStr := string(body)
    clientSpan.LogFields(
        log.String("event", "string-format"),
        log.String("value", helloStr),
    )
    return helloStr
}
// printHello sends helloStr to the publisher service
// (GET http://localhost:8848/publish) and discards the response body.
//
// The call is recorded in a "printHello" span started from ctx. The span is
// tagged as an RPC client call with the request URL and method, and its
// context is injected into the request headers.
//
// It panics if the request cannot be built or if httpDo returns an error.
func printHello(ctx context.Context, helloStr string) {
    clientSpan, _ := opentracing.StartSpanFromContext(ctx, "printHello")
    defer clientSpan.Finish()

    query := url.Values{"helloStr": []string{helloStr}}
    target := "http://localhost:8848/publish?" + query.Encode()
    req, err := http.NewRequest("GET", target, nil)
    if err != nil {
        panic(err.Error())
    }

    ext.SpanKindRPCClient.Set(clientSpan)
    ext.HTTPUrl.Set(clientSpan, target)
    ext.HTTPMethod.Set(clientSpan, "GET")
    // The Inject error is ignored here, as in the original demo.
    clientSpan.Tracer().Inject(
        clientSpan.Context(),
        opentracing.HTTPHeaders,
        opentracing.HTTPHeadersCarrier(req.Header),
    )

    _, err = httpDo(req)
    if err != nil {
        panic(err.Error())
    }
}
// httpDo sends req with http.DefaultClient and returns the full response
// body.
//
// It returns an error when the request fails, when the body cannot be read,
// or when the response status is not 200; in the last case the error message
// contains the status code and the body.
func httpDo(req *http.Request) ([]byte, error) {
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    // The body is read before the status check so that it can be included in
    // the error message for non-200 responses.
    payload, readErr := ioutil.ReadAll(resp.Body)
    switch {
    case readErr != nil:
        return nil, readErr
    case resp.StatusCode != http.StatusOK:
        return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, payload)
    }
    return payload, nil
}

编译执行:

数据消失

由于客户端和Jaeger-Agent之间是通过UDP协议传输的, 所以如果测试服务器与Jaeger-Agent服务之间是外网网络环境, 则可能会导致丢包, 通常包越大越容易丢包.

解决办法是将Agent部署到本机, 不过在开发环境为了方便也可以将客户端配置使用Jaeger-Collector, 这时会使用HTTP协议发送Spans.

基于gRPC的OpenTracing中间件

初始化:

// InitTrace creates a Jaeger tracer for serviceName and installs it as the
// OpenTracing global tracer.
//
// Spans are reported over an HTTP transport pointed at reportAddr with a
// batch size of 16. sampleType and rate are passed through as the sampler
// type and parameter.
//
// On failure the error is printed to stdout and returned, and the global
// tracer is left untouched.
//
// NOTE(review): the closer returned by the tracer constructor is discarded,
// so nothing in this function ever closes the tracer.
// NOTE(review): a reporter is supplied explicitly through config.Reporter, so
// Reporter.LogSpans in the configuration presumably has no effect — confirm
// against the jaeger-client-go config documentation.
func InitTrace(serviceName, reportAddr, sampleType string, rate float64) (err error) {
    // Named "sender" so that it does not shadow the transport package.
    sender := transport.NewHTTPTransport(reportAddr, transport.HTTPBatchSize(16))
    reporter := jaeger.NewRemoteReporter(sender)

    cfg := &config.Configuration{
        Sampler:  &config.SamplerConfig{Type: sampleType, Param: rate},
        Reporter: &config.ReporterConfig{LogSpans: true},
    }

    tracer, closer, err := cfg.New(
        serviceName,
        config.Logger(jaeger.StdLogger),
        config.Reporter(reporter),
    )
    if err != nil {
        fmt.Printf("ERROR: cannot init Jaeger: %v\n", err)
        return err
    }
    _ = closer

    opentracing.SetGlobalTracer(tracer)
    return nil
}

middleware

// TraceServerMiddleware wraps next so that every handled request is recorded
// as a server-side span on the global tracer.
//
// The parent span context is extracted from the incoming gRPC metadata. The
// span is named after the server method found in ctx, tagged with the trace
// ID from logs, and stored in the context passed to next. If next returns an
// error, the span is marked as failed and the error message is logged on it.
//
// The span is finished through defer, so it is also finished when next
// panics.
func TraceServerMiddleware(next MiddlewareFunc) MiddlewareFunc {
    return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
        // Read the gRPC metadata from ctx.
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            // None present: start from an empty set.
            md = metadata.Pairs()
        }

        tracer := opentracing.GlobalTracer()
        // A dedicated variable keeps the extraction error out of the named
        // result err, which must only carry the outcome of next.
        parentSpanContext, extractErr := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
        if extractErr != nil && extractErr != opentracing.ErrSpanContextNotFound {
            logs.Warn(ctx, "trace extract failed, parsing trace information: %v", extractErr)
        }

        serverMeta := meta.GetServerMeta(ctx)
        // Start tracing this method.
        serverSpan := tracer.StartSpan(
            serverMeta.Method,
            ext.RPCServerOption(parentSpanContext),
            ext.SpanKindRPCServer,
        )
        defer serverSpan.Finish()

        serverSpan.SetTag(util.TraceID, logs.GetTraceId(ctx))
        ctx = opentracing.ContextWithSpan(ctx, serverSpan)

        resp, err = next(ctx, req)
        // Record the error on the span.
        if err != nil {
            ext.Error.Set(serverSpan, true)
            serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
        }
        return resp, err
    }
}
// TraceRpcMiddleware wraps next so that every outgoing RPC is recorded as a
// client-side span on the global tracer.
//
// The span is a child of the span found in ctx (if any) and is named after
// the target service from the RPC meta. Its context is injected into the
// outgoing gRPC metadata together with the trace ID, and the span itself is
// stored in the context passed to next. If next returns an error, the span is
// marked as failed and the error message is logged on it.
//
// The span is finished through defer, so it is also finished when next
// panics.
func TraceRpcMiddleware(next MiddlewareFunc) MiddlewareFunc {
    return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
        tracer := opentracing.GlobalTracer()

        // parentSpanCtx stays nil when ctx carries no span.
        var parentSpanCtx opentracing.SpanContext
        if parent := opentracing.SpanFromContext(ctx); parent != nil {
            parentSpanCtx = parent.Context()
        }

        opts := []opentracing.StartSpanOption{
            opentracing.ChildOf(parentSpanCtx),
            ext.SpanKindRPCClient,
            opentracing.Tag{Key: string(ext.Component), Value: "koala_rpc"},
            opentracing.Tag{Key: util.TraceID, Value: logs.GetTraceId(ctx)},
        }
        rpcMeta := meta.GetRpcMeta(ctx)
        clientSpan := tracer.StartSpan(rpcMeta.ServiceName, opts...)
        defer clientSpan.Finish()

        // Read the outgoing gRPC metadata; start from an empty set if none.
        md, ok := metadata.FromOutgoingContext(ctx)
        if !ok {
            md = metadata.Pairs()
        }
        // A failed injection is only logged; the call itself still goes ahead.
        if injectErr := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); injectErr != nil {
            logs.Debug(ctx, "grpc_opentracing: failed serializing trace information: %v", injectErr)
        }

        ctx = metadata.NewOutgoingContext(ctx, md)
        ctx = metadata.AppendToOutgoingContext(ctx, util.TraceID, logs.GetTraceId(ctx))
        ctx = opentracing.ContextWithSpan(ctx, clientSpan)

        resp, err = next(ctx, req)
        // Record the error on the span.
        if err != nil {
            ext.Error.Set(clientSpan, true)
            clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
        }
        return resp, err
    }
}