为什么需要OpenTracing
OpenTracing通过提供平台无关、厂商无关的API,使得开发人员能够方便地添加(或更换)追踪系统的实现。OpenTracing还提供了用于运营支撑系统以及针对特定平台的辅助程序库,程序库的具体信息请参考详细的规范。
OpenTracing数据模型
- Trace: 调用链, 其中包含了多个Span.
- Span: 跨度, 计量的最小单位, 每个跨度都有开始时间与截止时间. Span和Span之间可以存在References(关系): ChildOf 与 FollowsFrom
Jaeger 架构
- Jaeger Client - 为不同语言实现了符合 OpenTracing 标准的 SDK。应用程序通过 API 写入数据,client library 把 trace 信息按照应用程序指定的采样策略传递给 jaeger-agent。在 Application 中调用 Jaeger Client Library 记录 Span 的过程通常被称为埋点。
- Agent - 它是一个监听在 UDP 端口上接收 span 数据的网络守护进程,它会将数据批量发送给 collector。它被设计成一个基础组件,部署到所有的宿主机上。Agent 将 client library 和 collector 解耦,为 client library 屏蔽了路由和发现 collector 的细节。
- Collector - 接收 jaeger-agent 发送来的数据,然后将数据写入后端存储。Collector 被设计成无状态的组件,因此您可以同时运行任意数量的 jaeger-collector。
- Data Store - 后端存储被设计成一个可插拔的组件,支持将数据写入 Cassandra、Elasticsearch。
- Query - 接收查询请求,然后从后端存储系统中检索 trace 并通过 UI 进行展示。Query 是无状态的,您可以启动多个实例,把它们部署在 nginx 这样的负载均衡器后面。
安装
官方:https://www.jaegertracing.io/
Docker 安装:
$ docker run -d --name jaeger \
    -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
    -p 5775:5775/udp \
    -p 6831:6831/udp \
    -p 6832:6832/udp \
    -p 5778:5778 \
    -p 16686:16686 \
    -p 14268:14268 \
    -p 9411:9411 \
    jaegertracing/all-in-one:1.19
安装完成之后,你可以通过 http://域名或ip:16686 访问 Jaeger UI。
客户端
官方提供了Go/Java/Node.js/Python/C++/C#语言的客户端库
package main
import (
"context"
"log"
"time"
"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
)
// cfg is the Jaeger client configuration used by this demo. It selects the
// constant sampler type with Param 1, turns on span logging in the reporter,
// and sets the reporter's LocalAgentHostPort to zhaohaiyu.com:6831.
var cfg = jaegercfg.Configuration{
	Sampler:  &jaegercfg.SamplerConfig{Type: jaeger.SamplerTypeConst, Param: 1},
	Reporter: &jaegercfg.ReporterConfig{LogSpans: true, LocalAgentHostPort: "zhaohaiyu.com:6831"},
}
// test installs a global Jaeger tracer for the service "hello-world" using
// cfg, then records two spans: "helloworld-1" and "helloworld-2". The second
// span is started from the context returned for the first one. The sleeps
// only give both spans a visible duration.
//
// If the tracer cannot be initialized the error is logged and test returns
// without recording anything.
func test() {
	closer, err := cfg.InitGlobalTracer("hello-world")
	if err != nil {
		log.Printf("Could not initialize jaeger tracer: %s", err.Error())
		return
	}
	// Deferred calls run last-in-first-out, so the closer runs only after both
	// spans below have been finished.
	defer closer.Close()

	parent, ctx := opentracing.StartSpanFromContext(context.TODO(), "helloworld-1")
	defer parent.Finish()
	time.Sleep(time.Millisecond * 400)

	// ctx carries the first span, so the second span is started from it.
	child, _ := opentracing.StartSpanFromContext(ctx, "helloworld-2")
	defer child.Finish()
	time.Sleep(time.Millisecond * 500)
}
// main runs the tracing demo in test once and exits.
func main() {
	test()
}
点击进入第一个请求:
可以看到在服务的调用过程中各个span的时间,这个span可以是一个微服务之间的调用也可以是某个方法的调用。
点开某个span也能看到额外的log信息。
函数之间使用
package main
import (
"context"
"io"
"os"
"time"
"fmt"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
)
// initJaeger creates a Jaeger tracer for the given service name and returns
// it together with the io.Closer produced by the configuration.
//
// The configuration uses the "const" sampler type with Param 1, enables span
// logging, and sets LocalAgentHostPort to zhaohaiyu.com:6831. It panics when
// the tracer cannot be created.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
	sampler := &jaegercfg.SamplerConfig{Type: "const", Param: 1}
	reporter := &jaegercfg.ReporterConfig{
		LogSpans:           true,
		LocalAgentHostPort: "zhaohaiyu.com:6831",
	}
	conf := &jaegercfg.Configuration{Sampler: sampler, Reporter: reporter}

	tracer, closer, err := conf.New(service, jaegercfg.Logger(jaeger.StdLogger))
	if err == nil {
		return tracer, closer
	}
	panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
// formatString builds the greeting "Hello, <helloTo>!" inside a span named
// "formatString" that is started from ctx. The greeting is logged on the span
// under the "string-format" event, the function sleeps for two seconds, and
// the greeting is returned.
func formatString(ctx context.Context, helloTo string) string {
	span, _ := opentracing.StartSpanFromContext(ctx, "formatString")
	defer span.Finish()

	greeting := fmt.Sprintf("Hello, %s!", helloTo)
	fields := []log.Field{
		log.String("event", "string-format"),
		log.String("value", greeting),
	}
	span.LogFields(fields...)

	// The pause only makes this span clearly visible on the timeline.
	time.Sleep(2 * time.Second)
	return greeting
}
// printHello prints helloStr with the builtin println inside a span named
// "printHello" started from ctx, records a "println" event on the span, and
// then sleeps for one second before the span is finished.
func printHello(ctx context.Context, helloStr string) {
	sp, _ := opentracing.StartSpanFromContext(ctx, "printHello")
	defer sp.Finish()

	println(helloStr)
	sp.LogKV("event", "println")

	time.Sleep(1 * time.Second)
}
func main() {
if len(os.Args) != 2 {
panic("ERROR: Expecting one argument")
}
tracer, closer := initJaeger("function-demo")
defer closer.Close()
opentracing.SetGlobalTracer(tracer)
helloTo := os.Args[1]
span := tracer.StartSpan("say-hello")
span.SetTag("hello-to", helloTo)
defer span.Finish()
ctx := opentracing.ContextWithSpan(context.Background(), span)
helloStr := formatString(ctx, helloTo)
printHello(ctx, helloStr)
}编译执行:funcdemo.exe 赵海宇
http之间使用
server端publish
package main
import (
"io"
"net/http"
"fmt"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
)
// main runs the "http-demo" publish service on :8848. Every request to
// /publish is wrapped in a "publish" server span whose parent is extracted
// from the request headers, and the "helloStr" form value is printed with the
// builtin println. The value returned by ListenAndServe is printed when the
// server stops.
func main() {
	tracer, closer := initJaeger("http-demo")
	defer closer.Close()

	publish := func(w http.ResponseWriter, r *http.Request) {
		carrier := opentracing.HTTPHeadersCarrier(r.Header)
		// The extraction error is discarded here.
		// NOTE(review): confirm ext.RPCServerOption tolerates a nil span context.
		parent, _ := tracer.Extract(opentracing.HTTPHeaders, carrier)
		span := tracer.StartSpan("publish", ext.RPCServerOption(parent))
		defer span.Finish()

		println(r.FormValue("helloStr"))
	}
	http.HandleFunc("/publish", publish)

	fmt.Println(http.ListenAndServe(":8848", nil))
}
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
cfg := &jaegercfg.Configuration{
Sampler: &jaegercfg.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
LocalAgentHostPort:"zhaohaiyu.com:6831",
},
}
tracer, closer, err := cfg.New(service, jaegercfg.Logger(jaeger.StdLogger))
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
return tracer, closer
}server端format
package main
import (
"fmt"
"io"
"net/http"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
)
// main runs the "http-formatter" service on :8081. Every request to /format
// is wrapped in a "format" server span whose parent is extracted from the
// request headers. The handler formats "Hello, <helloTo>!" from the "helloTo"
// form value, logs it on the span, and writes it as the response body. The
// value returned by ListenAndServe is printed when the server stops.
func main() {
	tracer, closer := initJaeger("http-formatter")
	defer closer.Close()

	format := func(w http.ResponseWriter, r *http.Request) {
		carrier := opentracing.HTTPHeadersCarrier(r.Header)
		// The extraction error is discarded here.
		// NOTE(review): confirm ext.RPCServerOption tolerates a nil span context.
		parent, _ := tracer.Extract(opentracing.HTTPHeaders, carrier)
		span := tracer.StartSpan("format", ext.RPCServerOption(parent))
		defer span.Finish()

		greeting := fmt.Sprintf("Hello, %s!", r.FormValue("helloTo"))
		span.LogFields(
			log.String("event", "string-format"),
			log.String("value", greeting),
		)
		w.Write([]byte(greeting))
	}
	http.HandleFunc("/format", format)

	fmt.Println(http.ListenAndServe(":8081", nil))
}
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
cfg := &jaegercfg.Configuration{
Sampler: &jaegercfg.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &jaegercfg.ReporterConfig{
LogSpans: true,
LocalAgentHostPort:"zhaohaiyu.com:6831",
},
}
tracer, closer, err := cfg.New(service, jaegercfg.Logger(jaeger.StdLogger))
if err != nil {
panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
}
return tracer, closer
}client端
package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/uber/jaeger-client-go"
jaegercfg "github.com/uber/jaeger-client-go/config"
)
// main is the HTTP demo client. It expects exactly one command-line argument
// (the name to greet) and panics otherwise. It creates the "http-demo-client"
// tracer, installs it globally, opens a root span "say-hello" tagged with the
// name, calls formatString and printHello with a context carrying that span,
// and finally prints "exit".
func main() {
	args := os.Args
	if len(args) != 2 {
		panic("ERROR: Expecting one argument")
	}
	helloTo := args[1]

	tracer, closer := initJaeger("http-demo-client")
	defer closer.Close()
	opentracing.SetGlobalTracer(tracer)

	root := tracer.StartSpan("say-hello")
	// Deferred after closer.Close, so the root span finishes before the closer runs.
	defer root.Finish()
	root.SetTag("hello-to", helloTo)

	ctx := opentracing.ContextWithSpan(context.Background(), root)
	greeting := formatString(ctx, helloTo)
	printHello(ctx, greeting)

	fmt.Println("exit")
}
// initJaeger builds the client-side Jaeger tracer for service. It returns the
// tracer and the io.Closer that comes with it, and panics when construction
// fails. The sampler type is "const" with Param 1; the reporter logs spans
// and has LocalAgentHostPort set to zhaohaiyu.com:6831.
func initJaeger(service string) (opentracing.Tracer, io.Closer) {
	conf := &jaegercfg.Configuration{
		Sampler:  &jaegercfg.SamplerConfig{Type: "const", Param: 1},
		Reporter: &jaegercfg.ReporterConfig{LogSpans: true, LocalAgentHostPort: "zhaohaiyu.com:6831"},
	}

	opts := []jaegercfg.Option{jaegercfg.Logger(jaeger.StdLogger)}
	tracer, closer, err := conf.New(service, opts...)
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	return tracer, closer
}
// formatString asks the formatter service at localhost:8081 to build the
// greeting for helloTo and returns the response body as a string.
//
// The call runs inside a "formatString" span started from ctx. The span is
// tagged as an RPC client span with the request URL and method, and its
// context is injected into the outgoing request headers. The returned greeting
// is logged on the span. The function panics if the request cannot be built
// or if httpDo reports an error.
func formatString(ctx context.Context, helloTo string) string {
	span, _ := opentracing.StartSpanFromContext(ctx, "formatString")
	defer span.Finish()

	query := url.Values{"helloTo": []string{helloTo}}
	endpoint := "http://localhost:8081/format?" + query.Encode()

	req, err := http.NewRequest("GET", endpoint, nil)
	if err != nil {
		panic(err.Error())
	}

	ext.SpanKindRPCClient.Set(span)
	ext.HTTPUrl.Set(span, endpoint)
	ext.HTTPMethod.Set(span, "GET")

	// Write the span context into the request headers; the Inject error is
	// not checked.
	carrier := opentracing.HTTPHeadersCarrier(req.Header)
	span.Tracer().Inject(span.Context(), opentracing.HTTPHeaders, carrier)

	body, err := httpDo(req)
	if err != nil {
		panic(err.Error())
	}

	greeting := string(body)
	span.LogFields(
		log.String("event", "string-format"),
		log.String("value", greeting),
	)
	return greeting
}
// printHello sends helloStr to the publish service at localhost:8848.
//
// The call runs inside a "printHello" span started from ctx. The span is
// tagged as an RPC client span with the request URL and method, and its
// context is injected into the outgoing request headers. The function panics
// if the request cannot be built or if httpDo reports an error; the response
// body is ignored.
func printHello(ctx context.Context, helloStr string) {
	span, _ := opentracing.StartSpanFromContext(ctx, "printHello")
	defer span.Finish()

	query := url.Values{"helloStr": []string{helloStr}}
	endpoint := "http://localhost:8848/publish?" + query.Encode()

	req, err := http.NewRequest("GET", endpoint, nil)
	if err != nil {
		panic(err.Error())
	}

	ext.SpanKindRPCClient.Set(span)
	ext.HTTPUrl.Set(span, endpoint)
	ext.HTTPMethod.Set(span, "GET")

	// Write the span context into the request headers; the Inject error is
	// not checked.
	carrier := opentracing.HTTPHeadersCarrier(req.Header)
	span.Tracer().Inject(span.Context(), opentracing.HTTPHeaders, carrier)

	_, err = httpDo(req)
	if err != nil {
		panic(err.Error())
	}
}
func httpDo(req *http.Request) ([]byte, error) {
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("StatusCode: %d, Body: %s", resp.StatusCode, body)
}
return body, nil
}编译执行:
数据消失
由于客户端和 Jaeger-Agent 之间是通过 UDP 协议传输的,所以如果测试服务器与 Jaeger-Agent 服务之间是外网网络环境,则可能会导致丢包,通常包越大越容易丢包。
解决办法是将 Agent 部署到本机;不过在开发环境中,为了方便也可以将客户端配置为直接使用 Jaeger-Collector,这时会使用 HTTP 协议发送 Spans。
基于gRPC的OpenTracing中间件
初始化:
func InitTrace(serviceName, reportAddr, sampleType string, rate float64) (err error) {
transport := transport.NewHTTPTransport(
reportAddr,
transport.HTTPBatchSize(16),
)
cfg := &config.Configuration{
Sampler: &config.SamplerConfig{
Type: sampleType,
Param: rate,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
},
}
r := jaeger.NewRemoteReporter(transport)
tracer, closer, err := cfg.New(serviceName,
config.Logger(jaeger.StdLogger),
config.Reporter(r))
if err != nil {
fmt.Printf("ERROR: cannot init Jaeger: %v\n", err)
return
}
_ = closer
opentracing.SetGlobalTracer(tracer)
return
}middleware
// TraceServerMiddleware wraps next so that every handled request runs inside
// a server-side span created with the global OpenTracing tracer.
//
// The parent span context is extracted from the incoming gRPC metadata. An
// extraction failure other than ErrSpanContextNotFound is logged as a warning
// and the request continues. The span is named after the server method from
// meta.GetServerMeta, tagged with the trace id from logs.GetTraceId, and
// stored in the context passed to next. If next returns an error, the span is
// marked as failed and the error message is logged on it.
//
// The span is finished via defer, so it is also finished when next panics.
func TraceServerMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
		// Read the gRPC metadata from ctx; fall back to an empty set when the
		// context carries none.
		md, ok := metadata.FromIncomingContext(ctx)
		if !ok {
			md = metadata.Pairs()
		}

		tracer := opentracing.GlobalTracer()
		// metadataTextMap is defined outside this view; presumably it adapts
		// metadata.MD to an OpenTracing carrier — verify against its definition.
		parentSpanContext, extractErr := tracer.Extract(opentracing.HTTPHeaders, metadataTextMap(md))
		if extractErr != nil && extractErr != opentracing.ErrSpanContextNotFound {
			logs.Warn(ctx, "trace extract failed, parsing trace information: %v", extractErr)
		}

		serverMeta := meta.GetServerMeta(ctx)
		// Start tracing this method.
		serverSpan := tracer.StartSpan(
			serverMeta.Method,
			ext.RPCServerOption(parentSpanContext),
			ext.SpanKindRPCServer,
		)
		defer serverSpan.Finish()

		serverSpan.SetTag(util.TraceID, logs.GetTraceId(ctx))
		ctx = opentracing.ContextWithSpan(ctx, serverSpan)

		resp, err = next(ctx, req)
		// Record the handler error on the span.
		if err != nil {
			ext.Error.Set(serverSpan, true)
			serverSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
		}
		return resp, err
	}
}
// TraceRpcMiddleware wraps next so that every outgoing RPC runs inside a
// client-side span created with the global OpenTracing tracer.
//
// The span is named after rpcMeta.ServiceName, is a child of the span found in
// ctx (if any), and is tagged with the component "koala_rpc" and the trace id
// from logs.GetTraceId. Its context is injected into the outgoing gRPC
// metadata, the trace id is appended to that metadata, and the span is stored
// in the context passed to next. If next returns an error, the span is marked
// as failed and the error message is logged on it.
//
// The outgoing metadata is copied before injection so the MD already attached
// to ctx is never mutated. The span is finished via defer, so it is also
// finished when next panics.
func TraceRpcMiddleware(next MiddlewareFunc) MiddlewareFunc {
	return func(ctx context.Context, req interface{}) (resp interface{}, err error) {
		tracer := opentracing.GlobalTracer()

		var parentSpanCtx opentracing.SpanContext
		if parent := opentracing.SpanFromContext(ctx); parent != nil {
			parentSpanCtx = parent.Context()
		}

		opts := []opentracing.StartSpanOption{
			opentracing.ChildOf(parentSpanCtx),
			ext.SpanKindRPCClient,
			opentracing.Tag{Key: string(ext.Component), Value: "koala_rpc"},
			opentracing.Tag{Key: util.TraceID, Value: logs.GetTraceId(ctx)},
		}

		rpcMeta := meta.GetRpcMeta(ctx)
		clientSpan := tracer.StartSpan(rpcMeta.ServiceName, opts...)
		defer clientSpan.Finish()

		md, ok := metadata.FromOutgoingContext(ctx)
		if ok {
			// Inject writes into md; work on a private copy so the metadata
			// shared through ctx is left untouched.
			md = md.Copy()
		} else {
			md = metadata.Pairs()
		}

		// metadataTextMap is defined outside this view; presumably it adapts
		// metadata.MD to an OpenTracing carrier — verify against its definition.
		if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
			logs.Debug(ctx, "grpc_opentracing: failed serializing trace information: %v", err)
		}

		ctx = metadata.NewOutgoingContext(ctx, md)
		ctx = metadata.AppendToOutgoingContext(ctx, util.TraceID, logs.GetTraceId(ctx))
		ctx = opentracing.ContextWithSpan(ctx, clientSpan)

		resp, err = next(ctx, req)
		// Record the call error on the span.
		if err != nil {
			ext.Error.Set(clientSpan, true)
			clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
		}
		return resp, err
	}
}
京公网安备 11010502036488号