// Source: l_rocketmq/producer_test.go (Go, 88 lines, 2.4 KiB)

package mq
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
// nolint
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"os"
"testing"
"time"
)
// TestProducer_SendSync is an integration test against a live environment: it
// starts a producer for the address passed to NewProducer, then sends ten
// messages synchronously to "test_sdk_sync" and ten asynchronously to
// "test_sdk_async", each with the same keys, tag and property options.
//
// Failures to send (or to shut the producer down) are reported through
// t.Errorf so the test actually fails, instead of only being printed.
func TestProducer_SendSync(t *testing.T) {
	initTracer("http://192.168.6.194:14268/api/traces", 1, "go_unit_test_for_util_producer")

	// Address used below: http://192.168.6.107:9876/
	p, err := NewProducer("192.168.6.107:9876")
	// Variant with credentials:
	// p, err := NewProducer("192.168.6.107:9876", WithProducerCredentials("", "", ""))
	if err != nil {
		t.Fatal(err)
	}
	if err = p.Start(); err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		err = p.SendSync(
			context.Background(),
			"test_sdk_sync",
			[]byte("hello world"+time.Now().Format(time.DateTime)+fmt.Sprintf("%d", i)),
			WithSendKeysOption([]string{"ttt"}),
			WithSendTagOption("TagA"),
			WithSendWithPropertyOption("pk", "pv"),
		)
		if err != nil {
			// Keep iterating so every message is attempted, but mark the test failed.
			t.Errorf("同步发送失败: %v", err)
		}

		err = p.SendAsync(
			context.Background(),
			"test_sdk_async",
			[]byte("hello world"+time.Now().Format(time.DateTime)+fmt.Sprintf("%d", i)),
			// The callback may run after the test function has returned; calling
			// methods on t at that point would panic, so it only prints.
			func(err error) {
				if err != nil {
					// The asynchronous send reported an error.
					fmt.Println("异步回调错误", err)
				} else {
					fmt.Println("异步回调成功")
				}
			},
			WithSendKeysOption([]string{"ttt"}),
			WithSendTagOption("TagA"),
			WithSendWithPropertyOption("pk", "pv"),
		)
		if err != nil {
			t.Errorf("异步发起失败 %v", err)
		}
	}

	if err = p.Shutdown(); err != nil {
		t.Errorf("producer shutdown failed: %v", err)
	}
	// Give spans time to be reported before the test process exits.
	time.Sleep(1 * time.Second)
}
// initTracer builds a tracer provider that exports spans to the Jaeger
// collector endpoint at url and installs it as the global OpenTelemetry
// tracer provider.
//
// url is the collector endpoint, sampler is the ratio handed to
// TraceIDRatioBased (wrapped in a parent-based sampler), and name is recorded
// as the service name on the resource.
//
// It panics if the Jaeger exporter cannot be created; the panic value wraps
// the underlying error so the cause is not lost.
func initTracer(url string, sampler float64, name string) {
	exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
	if err != nil {
		panic(fmt.Errorf("failed to initialize jaeger exporter %s: %w", url, err))
	}

	// Best effort: the hostname only labels the resource, so a lookup error is
	// deliberately ignored and whatever os.Hostname returned is used.
	hostName, _ := os.Hostname()

	tp := trace.NewTracerProvider(
		// Parent-based sampling; the ratio for the wrapped sampler comes from
		// the sampler argument (1 means 100%).
		trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(sampler))),
		// Unit tests use a synchronous exporter rather than a batcher.
		//trace.WithBatcher(exporter),
		trace.WithSyncer(exporter),
		// Record information about this application in the resource.
		trace.WithResource(resource.NewSchemaless(
			semconv.ServiceNameKey.String(name),
			semconv.ServiceVersionKey.String("v1.0.0"),
			semconv.ServiceInstanceIDKey.String(hostName),
			// NOTE(review): this is the GCP GCE-specific hostname key; a generic
			// host name attribute may have been intended — confirm.
			semconv.GCPGceInstanceHostnameKey.String(hostName),
		)),
	)
	otel.SetTracerProvider(tp)
}