diff --git a/configs/config.yaml b/configs/config.yaml index 19f82fc..fcb932a 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -45,8 +45,6 @@ wechatNotifyMQ: topic: "notify" groupId: "GID_market_pro" tag: "voucher_notify_dev" - tags: - - voucher_notify_dev isOpenConsumer: false #是否启动消费 true/false registerTagUrl: "https://wpcallbacks.api.1688sup.com/wechatPay/register_tag" noticeStartDays: 7 diff --git a/configs/config_test.yaml b/configs/config_test.yaml index a53a813..7126656 100644 --- a/configs/config_test.yaml +++ b/configs/config_test.yaml @@ -45,8 +45,6 @@ wechatNotifyMQ: topic: "notify" groupId: "GID_market_pro" tag: "voucher_notify_dev" - tags: - - voucher_notify_dev isOpenConsumer: false #是否启动消费 true/false registerTagUrl: "https://wpcallbacks.api.1688sup.com/wechatPay/register_tag" noticeStartDays: 7 diff --git a/internal/conf/conf.pb.go b/internal/conf/conf.pb.go index d075922..f400113 100644 --- a/internal/conf/conf.pb.go +++ b/internal/conf/conf.pb.go @@ -587,17 +587,16 @@ type WechatNotifyMQ struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - AccessKeyId string `protobuf:"bytes,1,opt,name=accessKeyId,proto3" json:"accessKeyId,omitempty"` - AccessKeySecret string `protobuf:"bytes,2,opt,name=accessKeySecret,proto3" json:"accessKeySecret,omitempty"` - EndPoint string `protobuf:"bytes,3,opt,name=endPoint,proto3" json:"endPoint,omitempty"` - RegionId string `protobuf:"bytes,4,opt,name=regionId,proto3" json:"regionId,omitempty"` - InstanceId string `protobuf:"bytes,5,opt,name=instanceId,proto3" json:"instanceId,omitempty"` - Topic string `protobuf:"bytes,6,opt,name=topic,proto3" json:"topic,omitempty"` - Tag string `protobuf:"bytes,7,opt,name=tag,proto3" json:"tag,omitempty"` - GroupId string `protobuf:"bytes,8,opt,name=groupId,proto3" json:"groupId,omitempty"` - Tags []string `protobuf:"bytes,9,rep,name=tags,proto3" json:"tags,omitempty"` - RegisterTagUrl string `protobuf:"bytes,10,opt,name=registerTagUrl,proto3" json:"registerTagUrl,omitempty"` - IsOpenConsumer bool `protobuf:"varint,11,opt,name=isOpenConsumer,proto3" json:"isOpenConsumer,omitempty"` + AccessKeyId string `protobuf:"bytes,1,opt,name=accessKeyId,proto3" json:"accessKeyId,omitempty"` + AccessKeySecret string `protobuf:"bytes,2,opt,name=accessKeySecret,proto3" json:"accessKeySecret,omitempty"` + EndPoint string `protobuf:"bytes,3,opt,name=endPoint,proto3" json:"endPoint,omitempty"` + RegionId string `protobuf:"bytes,4,opt,name=regionId,proto3" json:"regionId,omitempty"` + InstanceId string `protobuf:"bytes,5,opt,name=instanceId,proto3" json:"instanceId,omitempty"` + Topic string `protobuf:"bytes,6,opt,name=topic,proto3" json:"topic,omitempty"` + Tag string `protobuf:"bytes,7,opt,name=tag,proto3" json:"tag,omitempty"` + GroupId string `protobuf:"bytes,8,opt,name=groupId,proto3" json:"groupId,omitempty"` + RegisterTagUrl string `protobuf:"bytes,10,opt,name=registerTagUrl,proto3" json:"registerTagUrl,omitempty"` + IsOpenConsumer bool `protobuf:"varint,11,opt,name=isOpenConsumer,proto3" json:"isOpenConsumer,omitempty"` } func (x *WechatNotifyMQ) Reset() { @@ -688,13 +687,6 @@ func (x *WechatNotifyMQ) GetGroupId() string { return "" } -func (x *WechatNotifyMQ) GetTags() []string { - if x != nil { - return x.Tags - } - return nil -} - func (x *WechatNotifyMQ) GetRegisterTagUrl() string { if x != nil { return x.RegisterTagUrl @@ -1371,7 +1363,7 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0f, 0x6e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x44, 0x61, 0x79, 0x73, 0x12, 0x24, 0x0a, 0x0d, 0x6e, 0x6f, 0x74, 0x69, 0x63, 0x65, 0x45, 0x6e, 0x64, 0x44, 0x61, 0x79, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x6e, - 0x6f, 0x74, 0x69, 0x63, 0x65, 0x45, 0x6e, 0x64, 0x44, 0x61, 0x79, 0x73, 0x22, 0xda, 0x02, 0x0a, + 0x6f, 0x74, 0x69, 0x63, 0x65, 0x45, 0x6e, 0x64, 0x44, 0x61, 0x79, 0x73, 0x22, 0xc6, 0x02, 0x0a, 0x0e, 0x57, 0x65, 0x63, 0x68, 0x61, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4d, 0x51, 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, @@ -1387,43 +1379,41 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x61, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x61, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x67, 0x72, - 0x6f, 0x75, 0x70, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x09, 0x20, - 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x12, 0x26, 0x0a, 0x0e, 0x72, 0x65, 0x67, - 0x69, 0x73, 0x74, 0x65, 0x72, 0x54, 0x61, 0x67, 0x55, 0x72, 0x6c, 0x18, 0x0a, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x54, 0x61, 0x67, 0x55, 0x72, - 0x6c, 0x12, 0x26, 0x0a, 0x0e, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x43, 0x6f, 0x6e, 0x73, 0x75, - 0x6d, 0x65, 0x72, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x69, 0x73, 0x4f, 0x70, 0x65, - 0x6e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x22, 0x73, 0x0a, 0x05, 0x41, 0x6c, 0x61, - 0x72, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x77, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x55, 0x52, 0x4c, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x55, - 0x52, 0x4c, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x74, - 0x41, 0x6c, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x61, 0x74, 0x41, 0x6c, 0x6c, - 0x12, 0x1c, 0x0a, 0x09, 0x61, 0x74, 0x4d, 0x6f, 0x62, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x04, 0x20, - 0x03, 0x28, 0x09, 0x52, 0x09, 0x61, 0x74, 0x4d, 0x6f, 0x62, 0x69, 0x6c, 0x65, 0x73, 0x22, 0x84, - 0x02, 0x0a, 0x04, 0x43, 0x72, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, - 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, - 0x44, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x18, 0x02, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x61, - 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x1a, 0x3e, 0x0a, 0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x4d, 0x61, 0x70, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x63, - 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x1a, 0x5e, 0x0a, 0x0f, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x4d, 0x61, 0x70, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, - 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, - 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, - 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, - 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, - 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, - 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x6f, 0x75, 0x70, 0x49, 0x64, 0x12, 0x26, 0x0a, 0x0e, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x54, 0x61, 0x67, 0x55, 0x72, 0x6c, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x72, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x54, 0x61, 0x67, 0x55, 0x72, 0x6c, 0x12, 0x26, 0x0a, + 0x0e, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x43, 0x6f, 0x6e, 0x73, 0x75, 0x6d, 0x65, 0x72, 0x18, + 0x0b, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x43, 0x6f, 0x6e, + 0x73, 0x75, 0x6d, 0x65, 0x72, 0x22, 0x73, 0x0a, 0x05, 0x41, 0x6c, 0x61, 0x72, 0x6d, 0x12, 0x1e, + 0x0a, 0x0a, 0x77, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x55, 0x52, 0x4c, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0a, 0x77, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x55, 0x52, 0x4c, 0x12, 0x16, + 0x0a, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x74, 0x41, 0x6c, 0x6c, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x61, 0x74, 0x41, 0x6c, 0x6c, 0x12, 0x1c, 0x0a, 0x09, + 0x61, 0x74, 0x4d, 0x6f, 0x62, 0x69, 0x6c, 0x65, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x09, 0x61, 0x74, 0x4d, 0x6f, 0x62, 0x69, 0x6c, 0x65, 0x73, 0x22, 0x84, 0x02, 0x0a, 0x04, 0x43, + 0x72, 0x6f, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x44, 0x0a, 0x0a, 0x63, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x24, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, + 0x70, 0x1a, 0x3e, 0x0a, 0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x12, + 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x1a, 0x5e, 0x0a, 0x0f, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, + 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, + 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, + 0x01, 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, + 0x69, 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, + 0x69, 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, + 0x15, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, + 0x66, 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/internal/conf/conf.proto b/internal/conf/conf.proto index 7ad41fa..707713b 100644 --- a/internal/conf/conf.proto +++ b/internal/conf/conf.proto @@ -98,7 +98,6 @@ message WechatNotifyMQ { string topic = 6; string tag = 7; string groupId = 8; - repeated string tags = 9; string registerTagUrl = 10; bool isOpenConsumer = 11; } diff --git a/internal/pkg/uid/generator_test.go b/internal/pkg/uid/generator_test.go index fd0375f..35c54a2 100644 --- a/internal/pkg/uid/generator_test.go +++ b/internal/pkg/uid/generator_test.go @@ -1,12 +1,10 @@ package uid import ( - "context" "fmt" "github.com/bwmarrin/snowflake" "hash/fnv" "math" - "os" "sync" "testing" @@ -67,22 +65,23 @@ func Test_GenerateNo(t *testing.T) { //t.Log(len(no)) //t.Log(no) - p := os.Getpid() - - t.Log(p) - t.Log(p % 1000) + //p := os.Getpid() + // + //t.Log(p) + //t.Log(p % 1000) //uid := uuid.New().String() //t.Log(len(uid)) //t.Log(uid) - ctx := context.Background() - serverIdStr := GetAppId(ctx) - + //serverIdStr, _ := os.Hostname() + serverIdStr := "lsxddeMac-mini1111111dddddddddddddwffdsf" t.Log(serverIdStr) - t.Log(hashMod(serverIdStr)) - node, err := snowflake.NewNode(1023) + id := hashMod(serverIdStr) + t.Log(id) + + node, err := snowflake.NewNode(int64(id)) if err != nil { t.Error(err) return @@ -90,8 +89,8 @@ func Test_GenerateNo(t *testing.T) { nid := node.Generate().String() - t.Log(len(nid)) t.Log(nid) + t.Log(len(nid)) } func hashMod(hashStr string) int { diff --git a/internal/server/cron.go b/internal/server/cron.go index 7b7ec90..6bf2637 100644 --- a/internal/server/cron.go +++ b/internal/server/cron.go @@ -2,6 +2,7 @@ package server import ( "context" + "fmt" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport" "github.com/robfig/cron" @@ -49,7 +50,7 @@ func (cs *CronServer) Stop(_ context.Context) error { if !cs.conf.Cron.IsOpen { return nil } - log.Info("cron 关闭中...") + fmt.Printf("cron 关闭中...") cs.cron.Stop() diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_notify_consumer.go index 0f2fe40..0f1669a 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_notify_consumer.go @@ -6,7 +6,6 @@ import ( mqhttpsdk "github.com/aliyunmq/mq-http-go-sdk" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport" - "github.com/gogap/errors" "strings" "time" "voucher/internal/conf" @@ -57,89 +56,69 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { client := mqhttpsdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "") - // 为每个 tag 启动一个消费协程 - for _, tag := range w.conf.WechatNotifyMQ.Tags { - mqConsumer := client.GetConsumer(instanceId, topic, groupId, tag) - go w.consumeMessages(ctx, mqConsumer, tag) - } + mqConsumer := client.GetConsumer(instanceId, topic, groupId, w.conf.WechatNotifyMQ.Tag) + w.consumeMessages(mqConsumer) return nil } -// consumeMessages 消费消息的具体逻辑 -func (w *WechatNotifyConsumer) consumeMessages2(ctx context.Context, mqConsumer mqhttpsdk.MQConsumer, tag string) { +func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mqhttpsdk.MQConsumer) { for { - endChan := make(chan int) respChan := make(chan mqhttpsdk.ConsumeMessageResponse) errChan := make(chan error) + go func() { select { case resp := <-respChan: - { - // 处理业务逻辑。 - var handles []string - for _, v := range resp.Messages { - handles = append(handles, v.ReceiptHandle) + var handles []string - log.Warnf("微信回调消费接收消息成功 messageTag:%s, message: %s", v.MessageTag, v.MessageBody) + for _, v := range resp.Messages { + handles = append(handles, v.ReceiptHandle) - if err := w.voucherService.WechatUseNotifyConsumer(ctx, v.MessageTag, v.MessageBody); err != nil { - log.Errorf("微信回调消费处理失败:%+v", err) - } + // 模拟业务逻辑处理 + if err := w.processMessage(v); err != nil { + log.Errorf("Failed to process message %s: %v", v.MessageId, err) + continue } - // NextConsumeTime前若不确认消息消费成功,则消息会被重复消费。 - // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 - - if len(handles) > 0 { - go func(hs []string) { - ackerr := mqConsumer.AckMessage(hs) - if ackerr != nil { - log.Errorf("消息确认失败: %+v", ackerr) - // 记录失败句柄,后续处理 - if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mqhttpsdk.ErrAckItem); ok { - for _, errAckItem := range errAckItems { - log.Errorf("失败句柄: %s, 错误码: %s, 错误信息: %s", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg) - } - } - } else { - log.Warnf("成功确认消息: %d条", len(hs)) - } - }(handles) + // 确认消息消费成功 + if err := mqConsumer.AckMessage([]string{v.ReceiptHandle}); err != nil { + log.Errorf("Ack message %s failed: %v", v.MessageId, err) } - - endChan <- 1 } + case err := <-errChan: - { - // Topic中没有消息可消费。 - if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") { - //fmt.Println("\nNo new message, continue!") - } else { - log.Errorf("\tTopic中没有消息可消费判定报错:%v\n", err) - time.Sleep(time.Duration(3) * time.Second) - } - endChan <- 1 + if strings.Contains(err.Error(), "MessageNotExist") { + fmt.Println("No new messages available.") + } else { + log.Errorf("Error occurred: %v", err) } + case <-time.After(35 * time.Second): - { - fmt.Println("Timeout of consumer message ??") - log.Errorf("消息处理超时,需要续期可见性") - endChan <- 1 - } + log.Errorf("Timeout of consumer message.") } }() - // 长轮询消费消息,网络超时时间默认为35s。 - // 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回响应。 - mqConsumer.ConsumeMessage(respChan, errChan, - 3, // 一次最多消费3条(最多可设置为16条)。 - 10, // 长轮询时间3s(最多可设置为30s)。 - ) - <-endChan + // 长轮询消费消息,每次最多拉取 3 条消息,超时时间为 30 秒 + mqConsumer.ConsumeMessage(respChan, errChan, 3, 30) + + // 避免频繁轮询,增加适当的间隔 + time.Sleep(2 * time.Second) } } +// 模拟业务逻辑处理 +func (w *WechatNotifyConsumer) processMessage(msg mqhttpsdk.ConsumeMessageEntry) error { + log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody) + + //ctx := context.Background() + //if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil { + // log.Errorf("微信回调消费处理失败:%+v", err) + //} + + return nil +} + // Stop 停止消息消费 func (w *WechatNotifyConsumer) Stop(_ context.Context) error { fmt.Println("关闭 wechat consumer 中...")