l_rocketmq/consumer_test.go

63 lines
1.6 KiB
Go
Raw Permalink Normal View History

2025-02-21 11:07:33 +08:00
package mq
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
)
func TestConsumer_Start(t *testing.T) {
initTracer("http://192.168.6.194:14268/api/traces", 1, "go_unit_test_for_util_consumer")
manager := NewConsumerManager(nil)
ctx := context.Background()
consumerConf := &ConsumerConfig{
TopicName: "test_sdk_sync",
GroupName: "test_sdk_sync_myConsumer",
PerCoroutineCnt: 2,
}
connConf := &ConsumerConnConfig{
NameServers: "192.168.6.107:9876",
}
i := atomic.Int32{}
err := manager.Subscribe(ctx, connConf, consumerConf, func(ctx context.Context, message *ConsumerMessage) error {
cnt := i.Add(1)
fmt.Printf("%d开始消费%s body=%s \n", cnt, message.MsgId, message.Body)
fmt.Println(message.Properties)
// mock 业务耗时
time.Sleep(1 * time.Second)
fmt.Printf("%d已完成%s body=%s \n", cnt, message.MsgId, message.Body)
return nil
})
fmt.Println(err)
consumerConf = &ConsumerConfig{
TopicName: "test_sdk_async",
GroupName: "test_sdk_async_myConsumer",
PerCoroutineCnt: 2,
}
err = manager.Subscribe(ctx, connConf, consumerConf, func(ctx context.Context, message *ConsumerMessage) error {
cnt := i.Add(1)
fmt.Printf("%d开始消费%s body=%s \n", cnt, message.MsgId, message.Body)
fmt.Println(message.Properties)
// mock 业务耗时
time.Sleep(1 * time.Second)
fmt.Printf("%d已完成%s body=%s \n", cnt, message.MsgId, message.Body)
return nil
})
fmt.Println(err)
_ = manager.Start(context.Background())
time.Sleep(5 * time.Second)
// mock 退出
_ = manager.Stop(context.Background())
}