为什么需要OpenTracing
OpenTracing通过提供平台无关、厂商无关的API,使得开发人员能够方便地添加(或更换)追踪系统的实现。OpenTracing还提供了用于运营支撑系统的辅助程序库,以及针对特定平台的辅助程序库。程序库的具体信息请参考详细的规范。
OpenTracing数据模型
- Trace: 调用链, 其中包含了多个Span.
- Span: 跨度, 计量的最小单位, 每个跨度都有开始时间与截止时间. Span和Span之间可以存在References(关系): ChildOf 与 FollowsFrom
Jaeger 架构
- Jaeger Client - 为不同语言实现了符合 OpenTracing 标准的 SDK。应用程序通过 API 写入数据,client library 把 trace 信息按照应用程序指定的采样策略传递给 jaeger-agent。在 Application 中调用 Jaeger Client Library 记录 Span 的过程通常被称为埋点。
- Agent - 它是一个监听在 UDP 端口上接收 span 数据的网络守护进程,它会将数据批量发送给 collector。它被设计成一个基础组件,部署到所有的宿主机上。Agent 将 client library 和 collector 解耦,为 client library 屏蔽了路由和发现 collector 的细节。
- Collector - 接收 jaeger-agent 发送来的数据,然后将数据写入后端存储。Collector 被设计成无状态的组件,因此您可以同时运行任意数量的 jaeger-collector。
- Data Store - 后端存储被设计成一个可插拔的组件,支持将数据写入 Cassandra、Elasticsearch。
- Query - 接收查询请求,然后从后端存储系统中检索 trace 并通过 UI 进行展示。Query 是无状态的,您可以启动多个实例,把它们部署在 nginx 这样的负载均衡器后面。
安装
官方:https://www.jaegertracing.io/
Docker 安装:
$ docker run -d --name jaeger \
    -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
    -p 5775:5775/udp \
    -p 6831:6831/udp \
    -p 6832:6832/udp \
    -p 5778:5778 \
    -p 16686:16686 \
    -p 14268:14268 \
    -p 9411:9411 \
    jaegertracing/all-in-one:1.19
安装完成之后, 你可以通过 http://域名或ip:16686 访问 Jaeger UI。
客户端
官方提供了Go/Java/Node.js/Python/C++/C#语言的客户端库
package main

import (
	"context"
	"log"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// cfg is the tracer configuration shared by this demo: a const sampler with
// Param 1, and a reporter that logs spans and sends them to the agent at
// zhaohaiyu.com:6831.
var cfg = jaegercfg.Configuration{
	Sampler: &jaegercfg.SamplerConfig{
		Type:  jaeger.SamplerTypeConst,
		Param: 1,
	},
	Reporter: &jaegercfg.ReporterConfig{
		LogSpans:           true,
		LocalAgentHostPort: "zhaohaiyu.com:6831",
	},
}

// test installs a global tracer for the "hello-world" service and records a
// parent span ("helloworld-1") with one nested child span ("helloworld-2").
// If the tracer cannot be initialized, the error is logged and test returns
// without recording anything.
func test() {
	closer, err := cfg.InitGlobalTracer("hello-world")
	if err != nil {
		log.Printf("Could not initialize jaeger tracer: %s", err.Error())
		return
	}
	// Deferred first so that it runs last, after both spans have finished.
	defer closer.Close()

	parent, ctx := opentracing.StartSpanFromContext(context.TODO(), "helloworld-1")
	defer parent.Finish()
	time.Sleep(400 * time.Millisecond)

	// The child is started from ctx, which carries the parent span.
	child, _ := opentracing.StartSpanFromContext(ctx, "helloworld-2")
	defer child.Finish()
	time.Sleep(500 * time.Millisecond)
}

func main() {
	test()
}
点击进入第一个请求:
可以看到在服务的调用过程中各个span的时间,这个span可以是一个微服务之间的调用也可以是某个方法的调用。
点开某个span也能看到额外的log信息。
函数之间使用
package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"time"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/log"
	"github.com/uber/jaeger-client-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// initJaeger builds a tracer for the given service name using a "const"
// sampler with Param 1 and a reporter pointed at zhaohaiyu.com:6831.
// It panics if the tracer cannot be created. The returned io.Closer is the
// one handed back by the Jaeger configuration.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
	sampler := &jaegercfg.SamplerConfig{
		Type:  "const",
		Param: 1,
	}
	reporter := &jaegercfg.ReporterConfig{
		LogSpans:           true,
		LocalAgentHostPort: "zhaohaiyu.com:6831",
	}
	conf := &jaegercfg.Configuration{
		Sampler:  sampler,
		Reporter: reporter,
	}

	tr, cl, err := conf.New(service, jaegercfg.Logger(jaeger.StdLogger))
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	return tr, cl
}

// formatString builds the greeting for helloTo inside a "formatString" span
// started from ctx, logs the result on that span, sleeps two seconds and
// returns the greeting.
func formatString(ctx context.Context, helloTo string) string {
	sp, _ := opentracing.StartSpanFromContext(ctx, "formatString")
	defer sp.Finish()

	greeting := fmt.Sprintf("Hello, %s!", helloTo)
	sp.LogFields(
		log.String("event", "string-format"),
		log.String("value", greeting),
	)

	time.Sleep(2 * time.Second)
	return greeting
}

// printHello prints helloStr with the builtin println inside a "printHello"
// span started from ctx, records a "println" event and sleeps one second.
func printHello(ctx context.Context, helloStr string) {
	sp, _ := opentracing.StartSpanFromContext(ctx, "printHello")
	defer sp.Finish()

	println(helloStr)
	sp.LogKV("event", "println")
	time.Sleep(1 * time.Second)
}

// main expects exactly one command-line argument (the name to greet), opens a
// root "say-hello" span and runs formatString and printHello under it.
func main() {
	if len(os.Args) != 2 {
		panic("ERROR: Expecting one argument")
	}
	name := os.Args[1]

	tracer, closer := initJaeger("function-demo")
	defer closer.Close()
	opentracing.SetGlobalTracer(tracer)

	root := tracer.StartSpan("say-hello")
	root.SetTag("hello-to", name)
	defer root.Finish()

	ctx := opentracing.ContextWithSpan(context.Background(), root)
	printHello(ctx, formatString(ctx, name))
}
编译执行:funcdemo.exe 赵海宇
http之间使用
server端publish
package main

import (
	"fmt"
	"io"
	"net/http"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/uber/jaeger-client-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// initJaeger builds a tracer for the given service name using a "const"
// sampler with Param 1 and a reporter pointed at zhaohaiyu.com:6831.
// It panics if the tracer cannot be created.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
	conf := &jaegercfg.Configuration{
		Sampler: &jaegercfg.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans:           true,
			LocalAgentHostPort: "zhaohaiyu.com:6831",
		},
	}

	tr, cl, err := conf.New(service, jaegercfg.Logger(jaeger.StdLogger))
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	return tr, cl
}

// publishHandler returns the handler for /publish. It extracts a span context
// from the request headers, starts a "publish" server span from it, and
// prints the "helloStr" form value with the builtin println.
func publishHandler(tracer opentracing.Tracer) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		// The Extract error is discarded; the span is started with whatever
		// context Extract returned.
		parent, _ := tracer.Extract(
			opentracing.HTTPHeaders,
			opentracing.HTTPHeadersCarrier(r.Header),
		)
		sp := tracer.StartSpan("publish", ext.RPCServerOption(parent))
		defer sp.Finish()

		println(r.FormValue("helloStr"))
	}
}

// main serves /publish on :8848 and prints the error returned by
// ListenAndServe once the server stops.
func main() {
	tracer, closer := initJaeger("http-demo")
	defer closer.Close()

	http.HandleFunc("/publish", publishHandler(tracer))

	serveErr := http.ListenAndServe(":8848", nil)
	fmt.Println(serveErr)
}
server端format
package main

import (
	"fmt"
	"io"
	"net/http"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/opentracing/opentracing-go/log"
	"github.com/uber/jaeger-client-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// initJaeger builds a tracer for the given service name using a "const"
// sampler with Param 1 and a reporter pointed at zhaohaiyu.com:6831.
// It panics if the tracer cannot be created.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
	conf := &jaegercfg.Configuration{
		Sampler: &jaegercfg.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans:           true,
			LocalAgentHostPort: "zhaohaiyu.com:6831",
		},
	}

	tr, cl, err := conf.New(service, jaegercfg.Logger(jaeger.StdLogger))
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	return tr, cl
}

// formatHandler returns the handler for /format. It extracts a span context
// from the request headers, starts a "format" server span from it, builds the
// greeting from the "helloTo" form value, logs it on the span and writes it
// to the response.
func formatHandler(tracer opentracing.Tracer) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		// The Extract error is discarded; the span is started with whatever
		// context Extract returned.
		parent, _ := tracer.Extract(
			opentracing.HTTPHeaders,
			opentracing.HTTPHeadersCarrier(r.Header),
		)
		sp := tracer.StartSpan("format", ext.RPCServerOption(parent))
		defer sp.Finish()

		greeting := fmt.Sprintf("Hello, %s!", r.FormValue("helloTo"))
		sp.LogFields(
			log.String("event", "string-format"),
			log.String("value", greeting),
		)

		w.Write([]byte(greeting))
	}
}

// main serves /format on :8081 and prints the error returned by
// ListenAndServe once the server stops.
func main() {
	tracer, closer := initJaeger("http-formatter")
	defer closer.Close()

	http.HandleFunc("/format", formatHandler(tracer))

	serveErr := http.ListenAndServe(":8081", nil)
	fmt.Println(serveErr)
}
client端
package main

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"

	"github.com/opentracing/opentracing-go"
	"github.com/opentracing/opentracing-go/ext"
	"github.com/opentracing/opentracing-go/log"
	"github.com/uber/jaeger-client-go"
	jaegercfg "github.com/uber/jaeger-client-go/config"
)

// main expects exactly one command-line argument (the name to greet), opens a
// root "say-hello" span, asks the format service for the greeting and sends
// the result to the publish service.
func main() {
	if len(os.Args) != 2 {
		panic("ERROR: Expecting one argument")
	}

	tracer, closer := initJaeger("http-demo-client")
	defer closer.Close()
	opentracing.SetGlobalTracer(tracer)

	helloTo := os.Args[1]

	span := tracer.StartSpan("say-hello")
	span.SetTag("hello-to", helloTo)
	defer span.Finish()

	ctx := opentracing.ContextWithSpan(context.Background(), span)

	helloStr := formatString(ctx, helloTo)
	printHello(ctx, helloStr)
	fmt.Println("exit")
}

// initJaeger builds a tracer for the given service name using a "const"
// sampler with Param 1 and a reporter pointed at zhaohaiyu.com:6831.
// It panics if the tracer cannot be created.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
	cfg := &jaegercfg.Configuration{
		Sampler: &jaegercfg.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
		Reporter: &jaegercfg.ReporterConfig{
			LogSpans:           true,
			LocalAgentHostPort: "zhaohaiyu.com:6831",
		},
	}
	tracer, closer, err := cfg.New(service, jaegercfg.Logger(jaeger.StdLogger))
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	return tracer, closer
}

// tracedGet issues a GET request to target on behalf of span: it tags the
// span as an RPC client with the URL and method, injects the span context
// into the request headers and returns the response body from httpDo.
func tracedGet(span opentracing.Span, target string) ([]byte, error) {
	req, err := http.NewRequest("GET", target, nil)
	if err != nil {
		return nil, err
	}

	ext.SpanKindRPCClient.Set(span)
	ext.HTTPUrl.Set(span, target)
	ext.HTTPMethod.Set(span, "GET")

	// A failed injection only loses trace propagation, so the request is
	// still sent; the failure is recorded on the span instead of dropped.
	if injectErr := span.Tracer().Inject(
		span.Context(),
		opentracing.HTTPHeaders,
		opentracing.HTTPHeadersCarrier(req.Header),
	); injectErr != nil {
		span.LogFields(
			log.String("event", "inject-failed"),
			log.String("message", injectErr.Error()),
		)
	}

	return httpDo(req)
}

// formatString asks the format service (localhost:8081) for the greeting for
// helloTo inside a "formatString" span started from ctx, logs the result on
// the span and returns it. It panics if the request fails.
func formatString(ctx context.Context, helloTo string) string {
	span, _ := opentracing.StartSpanFromContext(ctx, "formatString")
	defer span.Finish()

	v := url.Values{}
	v.Set("helloTo", helloTo)
	// Named target rather than url so the net/url package is not shadowed.
	target := "http://localhost:8081/format?" + v.Encode()

	resp, err := tracedGet(span, target)
	if err != nil {
		panic(err.Error())
	}

	helloStr := string(resp)
	span.LogFields(
		log.String("event", "string-format"),
		log.String("value", helloStr),
	)
	return helloStr
}

// printHello sends helloStr to the publish service (localhost:8848) inside a
// "printHello" span started from ctx. It panics if the request fails.
func printHello(ctx context.Context, helloStr string) {
	span, _ := opentracing.StartSpanFromContext(ctx, "printHello")
	defer span.Finish()

	v := url.Values{}
	v.Set("helloStr", helloStr)
	target := "http://localhost:8848/publish?" + v.Encode()

	if _, err := tracedGet(span, target); err != nil {
		panic(err.Error())
	}
}

// httpDo sends req with http.DefaultClient and returns the response body.
// Any status code other than 200 is reported as an error that includes the
// status code and the body.
func httpDo(req *http.Request) ([]byte, error) {
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != 200 {
		return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, body)
	}
	return body, nil
}
编译执行:
数据消失
由于客户端和Jaeger-Agent之间是通过UDP协议传输的, 所以如果测试服务器与Jaeger-Agent服务之间是外网网络环境, 则可能会导致丢包, 通常包越大越容易丢包.
解决办法是将Agent部署到本机, 不过在开发环境为了方便也可以将客户端配置使用Jaeger-Collector, 这时会使用HTTP协议发送Spans.
基于gRPC的OpenTracing中间件
初始化:
func InitTrace(serviceName, reportAddr, sampleType string, rate float64) (err error) { transport := transport.NewHTTPTransport( reportAddr, transport.HTTPBatchSize(16), ) cfg := &config.Configuration{ Sampler: &config.SamplerConfig{ Type: sampleType, Param: rate, }, Reporter: &config.ReporterConfig{ LogSpans: true, }, } r := jaeger.NewRemoteReporter(transport) tracer, closer, err := cfg.New(serviceName, config.Logger(jaeger.StdLogger), config.Reporter(r)) if err != nil { fmt.Printf("ERROR: cannot init Jaeger: %v\n", err) return } _ = closer opentracing.SetGlobalTracer(tracer) return }
middleware
func TraceServerMiddleware(next MiddlewareFunc) MiddlewareFunc { return func(ctx context.Context, req interface{}) (resp interface{}, err error) { //从ctx获取grpc的metadata md, ok := metadata.FromIncomingContext(ctx) if !ok { //没有的话,新建一个 md = metadata.Pairs() } tracer := opentracing.GlobalTracer() parentSpanContext, err := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md)) if err != nil && err != opentracing.ErrSpanContextNotFound { logs.Warn(ctx, "trace extract failed, parsing trace information: %v", err) } serverMeta := meta.GetServerMeta(ctx) //开始追踪该方法 serverSpan := tracer.StartSpan( serverMeta.Method, ext.RPCServerOption(parentSpanContext), ext.SpanKindRPCServer, ) serverSpan.SetTag(util.TraceID, logs.GetTraceId(ctx)) ctx = opentracing.ContextWithSpan(ctx, serverSpan) resp, err = next(ctx, req) //记录错误 if err != nil { ext.Error.Set(serverSpan, true) serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) } serverSpan.Finish() return } } func TraceRpcMiddleware(next MiddlewareFunc) MiddlewareFunc { return func(ctx context.Context, req interface{}) (resp interface{}, err error) { tracer := opentracing.GlobalTracer() var parentSpanCtx opentracing.SpanContext if parent := opentracing.SpanFromContext(ctx); parent != nil { parentSpanCtx = parent.Context() } opts := []opentracing.StartSpanOption{ opentracing.ChildOf(parentSpanCtx), ext.SpanKindRPCClient, opentracing.Tag{Key: string(ext.Component), Value: "koala_rpc"}, opentracing.Tag{Key: util.TraceID, Value: logs.GetTraceId(ctx)}, } rpcMeta := meta.GetRpcMeta(ctx) clientSpan := tracer.StartSpan(rpcMeta.ServiceName, opts...) 
md, ok := metadata.FromOutgoingContext(ctx) if !ok { md = metadata.Pairs() } if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil { logs.Debug(ctx, "grpc_opentracing: failed serializing trace information: %v", err) } ctx = metadata.NewOutgoingContext(ctx, md) ctx = metadata.AppendToOutgoingContext(ctx, util.TraceID, logs.GetTraceId(ctx)) ctx = opentracing.ContextWithSpan(ctx, clientSpan) resp, err = next(ctx, req) //记录错误 if err != nil { ext.Error.Set(clientSpan, true) clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error())) } clientSpan.Finish() return } }