From 141682065ff17786b98135ecfdf981ba162a4f22 Mon Sep 17 00:00:00 2001 From: "qiyunfanbo126.com" <815699> Date: Thu, 16 Jan 2025 15:45:22 +0800 Subject: [PATCH] init --- .env.example | 62 + .gitignore | 4 + README.md | 22 + app/caches/bannerlistcache/banner_list.go | 34 + .../bannerlistcache/banner_list_test.go | 59 + app/caches/cache_key.go | 8 + app/console/command.go | 9 + app/console/kernel.go | 15 + app/console/test.go | 7 + app/constants/common/common.go | 14 + app/constants/errorcode/error_code.go | 37 + app/constants/logtype/log_type.go | 7 + app/handlers/event/user.go | 17 + app/handlers/im/im_handler.go | 34 + app/handlers/im/im_logic.go | 218 +++ app/handlers/mq/hanlers.go | 12 + app/handlers/mq/quenue.go | 34 + app/http/controllers/base.go | 92 ++ app/http/controllers/test.go | 113 ++ app/http/entities/order.go | 7 + app/http/entities/test.go | 43 + app/http/formatters/bannerformatter/banner.go | 34 + .../formatters/bannerformatter/banner_test.go | 45 + app/http/metric/metric.go | 77 + app/http/middlewares/metric.go | 22 + app/http/middlewares/server_recovery.go | 50 + app/http/middlewares/tracer.go | 55 + app/http/routes/route.go | 55 + app/http/tcppool/pool.go | 106 ++ app/http/tcppool/single.go | 103 ++ app/http/trace/trace.go | 42 + app/jobs/basejob/base_job.go | 66 + app/jobs/kernel.go | 56 + app/jobs/test.go | 24 + app/models/bannermodel/banner.go | 57 + app/models/bannermodel/banner_test.go | 54 + app/services/bannerservice/banner.go | 25 + app/utils/.gitkeep | 0 app/utils/httpclient/httpclient.go | 190 +++ app/utils/metric/reporter.go | 177 ++ app/utils/mq/common/imq.go | 6 + app/utils/mq/kafka.go | 95 ++ app/utils/mq/kafka_v2.go | 114 ++ app/utils/mq/mqmanager.go | 28 + app/utils/util.go | 15 + bootstrap/bootstrap.go | 63 + build/bin/.gitignore | 2 + build/shell/build.sh | 9 + config/config.go | 78 + config/option.go | 45 + docs/docs.go | 48 + go.mod | 107 ++ go.sum | 1436 +++++++++++++++++ logs/.gitignore | 2 + main.go | 146 ++ 55 files changed, 4280 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 README.md create mode 100644 app/caches/bannerlistcache/banner_list.go create mode 100644 app/caches/bannerlistcache/banner_list_test.go create mode 100644 app/caches/cache_key.go create mode 100644 app/console/command.go create mode 100644 app/console/kernel.go create mode 100644 app/console/test.go create mode 100644 app/constants/common/common.go create mode 100644 app/constants/errorcode/error_code.go create mode 100644 app/constants/logtype/log_type.go create mode 100644 app/handlers/event/user.go create mode 100644 app/handlers/im/im_handler.go create mode 100644 app/handlers/im/im_logic.go create mode 100644 app/handlers/mq/hanlers.go create mode 100644 app/handlers/mq/quenue.go create mode 100644 app/http/controllers/base.go create mode 100644 app/http/controllers/test.go create mode 100644 app/http/entities/order.go create mode 100644 app/http/entities/test.go create mode 100644 app/http/formatters/bannerformatter/banner.go create mode 100644 app/http/formatters/bannerformatter/banner_test.go create mode 100644 app/http/metric/metric.go create mode 100644 app/http/middlewares/metric.go create mode 100644 app/http/middlewares/server_recovery.go create mode 100644 app/http/middlewares/tracer.go create mode 100644 app/http/routes/route.go create mode 100644 app/http/tcppool/pool.go create mode 100644 app/http/tcppool/single.go create mode 100644 app/http/trace/trace.go create mode 100644 app/jobs/basejob/base_job.go create mode 100644 app/jobs/kernel.go create mode 100644 app/jobs/test.go create mode 100644 app/models/bannermodel/banner.go create mode 100644 app/models/bannermodel/banner_test.go create mode 100644 app/services/bannerservice/banner.go create mode 100644 app/utils/.gitkeep create mode 100644 app/utils/httpclient/httpclient.go create mode 100644 app/utils/metric/reporter.go create mode 100644 app/utils/mq/common/imq.go create mode 100644 app/utils/mq/kafka.go create mode 100644 app/utils/mq/kafka_v2.go create mode 100644 app/utils/mq/mqmanager.go create mode 100644 app/utils/util.go create mode 100644 bootstrap/bootstrap.go create mode 100644 build/bin/.gitignore create mode 100644 build/shell/build.sh create mode 100644 config/config.go create mode 100644 config/option.go create mode 100644 docs/docs.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 logs/.gitignore create mode 100644 main.go diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..8b93b8c --- /dev/null +++ b/.env.example @@ -0,0 +1,62 @@ +# toml配置文件 +# Wiki:https://github.com/toml-lang/toml +ServiceName = "snow" +Debug = true +Env = "local" # local-本地 develop-开发 beta-预发布 production-线上 +PrometheusCollectEnable = true +SkyWalkingOapServer = "127.0.0.1:11800" + +[Log] +Handler = "file" +Dir = "./logs" +Level = "info" + +[Db] +Driver = "mysql" + +[Db.Option] +MaxConns = 128 +MaxIdle = 32 +IdleTimeout = 180 # second +Charset = "utf8mb4" +ConnectTimeout = 3 # second + +[Db.Master] +Host = "127.0.0.1" +Port = 3306 +User = "root" +Password = "123456" +DBName = "test" + +[[Db.Slaves]] # 支持多个从库 +Host = "127.0.0.1" +Port = 3306 +User = "root" +Password = "123456" +DBName = "test" + +[Api] +Host = "0.0.0.0" +Port = 8080 + +[Cache] +Driver = "redis" + +[Redis.Master] +Host = "127.0.0.1" +Port = 6379 +#Password = "" +#DB = 0 + +#[Redis.Option] +#MaxIdle = 64 +#MaxConns = 256 +#IdleTimeout = 180 # second +#ConnectTimeout = 1 +#ReadTimeout = 1 +#WriteTimeout = 1 + +[AliMns] +Url = "" +AccessKeyId = "" +AccessKeySecret = "" diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c7dbf37 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/.idea +/vendor +/.env +!/.env.example diff --git a/README.md b/README.md new file mode 100644 index 0000000..82b5fa4 --- /dev/null +++ b/README.md @@ -0,0 +1,22 @@ +## Snow +Snow是一套简单易用的Go语言业务框架,整体逻辑设计简洁,支持HTTP服务、队列调度和任务调度等常用业务场景模式。 + +## Quick start + +### Build +sh build/shell/build.sh + +### Run +```shell +1. build/bin/snow -a api #启动Api服务 +2. build/bin/snow -a cron #启动Cron定时任务服务 +3. build/bin/snow -a job #启动队列调度服务 +4. build/bin/snow -a command -m test #执行名称为test的脚本任务 +``` + +## Documents + +- [项目地址](https://github.com/qit-team/snow) +- [中文文档](https://github.com/qit-team/snow/wiki) +- [changelog](https://github.com/qit-team/snow/blob/master/CHANGLOG.md) +- [xorm](http://gobook.io/read/github.com/go-xorm/manual-zh-CN/) diff --git a/app/caches/bannerlistcache/banner_list.go b/app/caches/bannerlistcache/banner_list.go new file mode 100644 index 0000000..594050a --- /dev/null +++ b/app/caches/bannerlistcache/banner_list.go @@ -0,0 +1,34 @@ +package bannerlistcache + +import ( + "sync" + + "quenue/app/caches" + + "github.com/qit-team/snow-core/cache" +) + +const ( + prefix = caches.BannerList //缓存key的前缀 +) + +var ( + instance *bannerListCache + once sync.Once +) + +type bannerListCache struct { + cache.BaseCache +} + +//单例模式 +func GetInstance() *bannerListCache { + once.Do(func() { + instance = new(bannerListCache) + instance.Prefix = prefix + //instance.DiName = redis.SingletonMain //设置缓存依赖的实例别名 + //instance.DriverType = cache.DriverTypeRedis //设置缓存驱动的类型,默认redis + //instance.SeTTL(86400) 设置默认缓存时间 默认86400 + }) + return instance +} diff --git a/app/caches/bannerlistcache/banner_list_test.go b/app/caches/bannerlistcache/banner_list_test.go new file mode 100644 index 0000000..d37b6fa --- /dev/null +++ b/app/caches/bannerlistcache/banner_list_test.go @@ -0,0 +1,59 @@ +package bannerlistcache + +import ( + "context" + "fmt" + "testing" + + "quenue/config" + + "github.com/qit-team/snow-core/cache" + _ "github.com/qit-team/snow-core/cache/rediscache" + "github.com/qit-team/snow-core/redis" +) + +func init() { + //加载配置文件 + conf, err := config.Load("../../../.env") + if err != nil { + fmt.Println(err) + } + + //注册redis类 + err = redis.Pr.Register(cache.DefaultDiName, conf.Redis) + if err != nil { + fmt.Println(err) + } +} + +func Test_GetMulti(t *testing.T) { + ctx := context.TODO() + cache := GetInstance() + res, _ := cache.Set(ctx, "1000", "a") + if res != true { + t.Errorf("set key:%s is error", "1000") + } + + keys := []string{"1000", "-2000", "9999"} + cacheList, err := cache.GetMulti(ctx, keys...) + if err != nil { + t.Errorf("getMulti error:%s", err.Error()) + } + fmt.Println(cacheList) + i := 0 + for k, v := range cacheList { + i++ + if k == "1000" { + if v != "a" { + t.Errorf("value of key:%s is error %v", k, v) + } + } else { + if v != "" { + t.Errorf("value of key:%s is error %v", k, v) + } + } + } + if i != len(keys) { + t.Errorf("count of cache key is error: %d", i) + } +} diff --git a/app/caches/cache_key.go b/app/caches/cache_key.go new file mode 100644 index 0000000..560c59e --- /dev/null +++ b/app/caches/cache_key.go @@ -0,0 +1,8 @@ +package caches + +//缓存前缀key,不同的业务使用不同的前缀,避免了业务之间的重用冲突 +const ( + Cookie = "ck:" + Copy = "cp:" + BannerList = "bl:" +) diff --git a/app/console/command.go b/app/console/command.go new file mode 100644 index 0000000..e312c14 --- /dev/null +++ b/app/console/command.go @@ -0,0 +1,9 @@ +package console + +import ( + "github.com/qit-team/snow-core/command" +) + +func RegisterCommand(c *command.Command) { + c.AddFunc("test", test) +} diff --git a/app/console/kernel.go b/app/console/kernel.go new file mode 100644 index 0000000..23f081d --- /dev/null +++ b/app/console/kernel.go @@ -0,0 +1,15 @@ +package console + +import ( + "github.com/robfig/cron" +) + +/** + * 配置执行计划 + * @wiki https://godoc.org/github.com/robfig/cron + */ +func RegisterSchedule(c *cron.Cron) { + //c.AddFunc("0 30 * * * *", test) + //c.AddFunc("@hourly", test) + c.AddFunc("@every 10s", test) +} diff --git a/app/console/test.go b/app/console/test.go new file mode 100644 index 0000000..1319e62 --- /dev/null +++ b/app/console/test.go @@ -0,0 +1,7 @@ +package console + +import "fmt" + +func test() { + fmt.Println("run test") +} diff --git a/app/constants/common/common.go b/app/constants/common/common.go new file mode 100644 index 0000000..4c0e0dd --- /dev/null +++ b/app/constants/common/common.go @@ -0,0 +1,14 @@ +package common + +const ( + + //IM + MQ_NATS = "nats" + MQ_RABBIT = "rabbitmq" + MQ_NSQ = "nsq" + MQ_KFK = "kafka" + MQ_KFK_V2 = "kafka_v2" + + //QUNUE + ORDER_RESEND_TOPICAL = "platform_all" +) diff --git a/app/constants/errorcode/error_code.go b/app/constants/errorcode/error_code.go new file mode 100644 index 0000000..b68da4a --- /dev/null +++ b/app/constants/errorcode/error_code.go @@ -0,0 +1,37 @@ +package errorcode + +const ( + //成功 + Success = 200 + + //参数错误 + ParamError = 400 + + //未经授权 + NotAuth = 401 + + //请求被禁止 + Forbidden = 403 + + //找不到页面 + NotFound = 404 + + //系统错误 + SystemError = 500 +) + +var MsgEN = map[int]string{ + Success: "success", + ParamError: "param error", + NotAuth: "not authorized", + Forbidden: "forbidden", + NotFound: "not found", + SystemError: "system error", +} + +func GetMsg(code int) string { + if msg, ok := MsgEN[code]; ok { + return msg + } + return "" +} diff --git a/app/constants/logtype/log_type.go b/app/constants/logtype/log_type.go new file mode 100644 index 0000000..02852f2 --- /dev/null +++ b/app/constants/logtype/log_type.go @@ -0,0 +1,7 @@ +package logtype + +const ( + Message = "message" + GoPanic = "go.panic" + HTTP = "http" +) diff --git a/app/handlers/event/user.go b/app/handlers/event/user.go new file mode 100644 index 0000000..adf9518 --- /dev/null +++ b/app/handlers/event/user.go @@ -0,0 +1,17 @@ +package event + +import ( + "encoding/json" + common2 "snow-im/app/constants/common" + "snow-im/app/utils" + "snow-im/app/utils/common" +) + +type RegisterCacheEventHandler struct { +} + +func (this *RegisterCacheEventHandler) Hand(param map[string]interface{}) error { + var paramData,_ = json.Marshal(param) + common.CacheHelper.Cache(utils.GetRealKey(common2.USER)+param["id"].(string),string(paramData),0,true) + return nil +} \ No newline at end of file diff --git a/app/handlers/im/im_handler.go b/app/handlers/im/im_handler.go new file mode 100644 index 0000000..6eb68d8 --- /dev/null +++ b/app/handlers/im/im_handler.go @@ -0,0 +1,34 @@ +package im + +import ( + "github.com/go-netty/go-netty" + "snow-im/app/utils" + "snow-im/app/utils/netool" +) + +var ImLogic ImHandler + +func Init() { + ImLogic = ImHandler{ + ImRouter, + } +} + +type ImHandler struct { + netool.Router +} + +//消息处理 +func (this *ImHandler) HandMessage(message []byte, connect netty.InboundContext) error { + defer func() { + if err := recover();err!=nil{ + utils.Log(nil,"hand message",err) + } + }() + var msgId, msg = utils.UnpackMessage(message) + err := this.Router.HandleMsg(msgId, msg, connect) + if err != nil{ + utils.Log(nil,"message err",err) + } + return err +} diff --git a/app/handlers/im/im_logic.go b/app/handlers/im/im_logic.go new file mode 100644 index 0000000..6b3cdfd --- /dev/null +++ b/app/handlers/im/im_logic.go @@ -0,0 +1,218 @@ +package im + +import ( + "encoding/json" + "github.com/go-netty/go-netty" + redis2 "github.com/go-redis/redis" + "github.com/qit-team/snow-core/redis" + "go.mongodb.org/mongo-driver/bson" + "snow-im/app/constants/common" + "snow-im/app/constants/errorcode" + "snow-im/app/constants/msgid" + "snow-im/app/http/controllers" + "snow-im/app/http/entities" + "snow-im/app/models" + "snow-im/app/utils" + common2 "snow-im/app/utils/common" + "snow-im/app/utils/mq" + "snow-im/app/utils/netool" + "snow-im/app/utils/serialize" + "snow-im/config" + "strconv" + "strings" + "time" +) + +var ImRouter netool.Router + +func init() { + ImRouter = netool.Router{} + ImRouter.AddRouter(msgid.SINGLE_MSG, SendMsgtoUser) + ImRouter.AddRouter(msgid.GROUP_MSG, SendMsgtoGroup) + ImRouter.AddRouter(msgid.MULT_MSG, SendMsgtoMultUser) + ImRouter.AddRouter(msgid.HEART_BEAT, HeartBeat) + ImRouter.AddRouter(msgid.AUTH, Auth) + ImRouter.AddRouter(msgid.ACK_MSG, Ack) +} + +func getRsp(err error) entities.ImRsp { + var rsp entities.ImRsp + if err == nil { + rsp = entities.ImRsp{ + Code: errorcode.Success, + Msg: errorcode.GetMsg(errorcode.Success, ""), + } + } else { + rsp = entities.ImRsp{ + Code: errorcode.SystemError, + Msg: errorcode.GetMsg(errorcode.SystemError, ""), + } + utils.Log(nil, "err", err) + } + return rsp +} + +//验证 +func Auth(msg []byte, connect netty.InboundContext) error { + var authReq entities.AuthReq + err := serialize.SerializeTool.UnSerialize(config.GetConf().Serialize,msg,&authReq) + if err == nil { + err = controllers.Validate(authReq) + if err == nil { + uid, _ := redis.GetRedis().Get(utils.GetRealKey(common.TOKEN_PRE) + authReq.AppId + ":" + authReq.Token) + if uid != "" { + netool.GetConnManagger().SaveConnection(uid, connect.Channel()) + } + } + } + return err +} + +//单聊 +func SendMsgtoUser(msg []byte, connect netty.InboundContext) error { + var singReq entities.SingTalkReq + utils.Log(nil, string(msg)) + err := serialize.SerializeTool.UnSerialize(config.GetConf().Serialize,msg,&singReq) + if err == nil { + err = controllers.Validate(singReq) + if err == nil { + singReq.Id = utils.GenUniqId(singReq.SenderId) + singReq.Timestamp = time.Now().Unix() + var uid, _ = strconv.Atoi(singReq.To) + var msgFrom = common2.CacheHelper.GetCache(utils.GetRealKey(common.USER)+singReq.SenderId,false).(map[string]interface{}) + singReq.Avatar = msgFrom["avatar"].(string) + singReq.SenderName = msgFrom["nick_name"].(string) + utils.Log(nil, "err", utils.GetUnderLineKey(common.SINGLE_TALK)+strconv.Itoa(uid%config.GetConf().Im.ImworkNum)) + err = rabbitmq.MqManager.GetMqByName(common.MQ_KFK).Produce(utils.GetUnderLineKey(common.SINGLE_TALK)+strconv.Itoa(uid%config.GetConf().Im.ImworkNum), singReq, 0) + //生成会话 + var msgData,_ = json.Marshal(singReq) + if err == nil { + var msg = models.Msg{} + msg.ParseFromParam(singReq.Msg) + msg.Conversion = utils.SortKeys([]string{singReq.SenderId,singReq.To}) + msg.SaveMsg() + if singReq.RoomId == ""{ + //优化管道操作 + var pipline = common2.PikaTool.GetPipe() + var to = redis2.Z{ + Score: float64(singReq.Timestamp), + Member: singReq.SenderId, + } + var from = redis2.Z{ + Score: float64(singReq.Timestamp), + Member: singReq.SenderId, + } + pipline.ZAdd(utils.GetRealKey(common.CONVERSION)+singReq.To, to,from) + var msgFrom = common2.CacheHelper.GetCache(utils.GetRealKey(common.USER)+singReq.To,false).(map[string]interface{}) + singReq.Avatar = msgFrom["avatar"].(string) + singReq.SenderName = msgFrom["nick_name"].(string) + msgDataFrom,_ := json.Marshal(singReq) + pipline.MSet(utils.GetRealKey(common.CONVERSION)+singReq.To+":"+singReq.SenderId, msgData,utils.GetRealKey(common.CONVERSION)+singReq.SenderId+":"+singReq.To, msgDataFrom) + pipline.Incr(utils.GetRealKey(common.CONVERSION)+singReq.To+":"+singReq.SenderId+":num") + pipline.Incr(utils.GetRealKey(common.CONVERSION)+singReq.SenderId+":"+singReq.To+":num") + pipline.Exec() + + } + + } + } + } + return err +} + +//群聊 +func SendMsgtoGroup(msg []byte, connect netty.InboundContext) error { + var singReq entities.SingTalkReq + err := serialize.SerializeTool.UnSerialize(config.GetConf().Serialize,msg,&singReq) + if err == nil { + err = controllers.Validate(singReq) + if err == nil { + singReq.Id = utils.GenUniqId(singReq.SenderId) + singReq.Timestamp = time.Now().Unix() + var userInfo map[string]interface{} + var rs = common2.CacheHelper.GetCache(utils.GetRealKey(common.USER)+singReq.SenderId,false) + userInfo = rs.(map[string]interface{}) + var room = models.Room{} + room = room.GetRoom(bson.M{"_id":singReq.RoomId}) + singReq.SenderName = userInfo["nick_name"].(string) + singReq.Avatar = userInfo["avatar"].(string) + singReq.RoomAvatar = room.Avatar + singReq.RoomName = room.Name + var uid, _ = strconv.Atoi(singReq.RoomId) + err = rabbitmq.MqManager.GetMqByName(common.MQ_KFK).Produce(utils.GetUnderLineKey(common.GROUP_TALK)+strconv.Itoa(uid%config.GetConf().Im.ImGroupNum), singReq, 0) + if err == nil { + var msg = models.Msg{} + msg.ParseFromParam(singReq.Msg) + msg.Conversion = singReq.RoomId + msg.SaveMsg() + var pipie = common2.PikaTool.GetPipe() + var roomMembers = common2.PikaTool.SetGet(utils.GetRealKey(common.ROOM) + singReq.RoomId) + for _,v := range roomMembers{ + var msgData,_ = json.Marshal(singReq) + var member =redis2.Z{ + Member:singReq.RoomId, + Score:float64(singReq.Timestamp), + } + pipie.ZAdd(utils.GetRealKey(common.CONVERSION)+v, member) + pipie.Set(utils.GetRealKey(common.CONVERSION)+v+":"+singReq.RoomId,msgData, 0) + } + _,err = pipie.Exec() + } + } + } + return err +} + +//组聊 +func SendMsgtoMultUser(msg []byte, connect netty.InboundContext) error { + var singReq entities.SingTalkReq + err := serialize.SerializeTool.UnSerialize(config.GetConf().Serialize,msg,&singReq) + if err == nil { + err = controllers.Validate(singReq) + if err == nil { + singReq.Id = utils.GenUniqId(singReq.SenderId) + var userInfo map[string]string + json.Unmarshal([]byte(common2.PikaTool.HashGet(utils.GetRealKey("user_info"), singReq.SenderId)), &userInfo) + singReq.SenderName = userInfo["nick_name"] + singReq.Avatar = userInfo["avatar"] + var uids = strings.Split(singReq.To, ",") + var pipie = common2.PikaTool.GetPipe() + for _, v := range uids { + var uid, _ = strconv.Atoi(v) + singReq.To = v + err = rabbitmq.MqManager.GetMqByName(common.MQ_KFK).Produce(utils.GetUnderLineKey(common.SINGLE_TALK)+strconv.Itoa(uid%config.GetConf().Im.ImworkNum), singReq, 0) + //生成会话 + if err == nil { + var member =redis2.Z{ + Member:singReq.RoomId, + Score:float64(singReq.Timestamp), + } + pipie.ZAdd(utils.GetRealKey(common.CONVERSION)+v, member) + pipie.Set(utils.GetRealKey(common.CONVERSION)+singReq.To+":"+singReq.RoomId, msg,0) + } + _,err = pipie.Exec() + } + } + } + return err +} + +//消息ack +func Ack(msg []byte, connect netty.InboundContext) error { + var req entities.MsgId + err := json.Unmarshal(msg, &req) + if err == nil { + + } + return err +} + +//心跳 +func HeartBeat(msg []byte, connect netty.InboundContext) error { + if connect.Channel().IsActive() { + netool.GetConnManagger().SetBeat(connect.Channel().ID()) + connect.Write(utils.PackMsg([]byte("Ping"), msgid.HEART_BEAT)) + } + + return nil +} diff --git a/app/handlers/mq/hanlers.go b/app/handlers/mq/hanlers.go new file mode 100644 index 0000000..4a06d52 --- /dev/null +++ b/app/handlers/mq/hanlers.go @@ -0,0 +1,12 @@ +package mq + +import ( + "quenue/app/http/tcppool" + "quenue/app/utils" +) + +// 调用充值 +func OrderCharge(tag uint64, ch interface{}, msg []byte) error { + utils.Log(nil, "消息", tag, ch, string(msg)) + return tcppool.TcpFactory.SendMsg(msg) +} diff --git a/app/handlers/mq/quenue.go b/app/handlers/mq/quenue.go new file mode 100644 index 0000000..99374f7 --- /dev/null +++ b/app/handlers/mq/quenue.go @@ -0,0 +1,34 @@ +package mq + +import ( + "fmt" + "quenue/app/constants/common" + mqs "quenue/app/utils/mq" + "quenue/config" + "strconv" +) + +func init() { + //初始化工作队列 + //utils.JobQueue = make(chan utils.Job, utils.MaxQueue) +} +func startQunue(name string, method interface{}, mqTp string, tp int, exhange string, i int) { + if tp == 0 { + go mqs.MqManager.GetMqByName(mqTp).Consume(name, method, i) + } else { + //go rabbitmq.DelayConsume(name, exhange, method.(func(tag uint64, ch *amqp.Channel, msg []byte))) + } + +} + +// 普通队列 +func StartQunueServer() error { + if config.GetConf().StartQunue == 1 { + for i := 0; i < 1; i++ { + fmt.Println("对列" + strconv.Itoa(i)) + startQunue(common.ORDER_RESEND_TOPICAL, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊 + } + } + select {} + return nil +} diff --git a/app/http/controllers/base.go b/app/http/controllers/base.go new file mode 100644 index 0000000..0b92500 --- /dev/null +++ b/app/http/controllers/base.go @@ -0,0 +1,92 @@ +package controllers + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + + "quenue/app/constants/errorcode" + + "github.com/gin-gonic/gin" + "github.com/qit-team/snow-core/log/logger" + "gopkg.in/go-playground/validator.v9" +) + +/** + * 成功时返回 + */ +func Success(c *gin.Context, data interface{}) { + c.JSON(http.StatusOK, gin.H{ + "code": errorcode.Success, + "message": "ok", + "request_uri": c.Request.URL.Path, + "data": data, + }) + c.Abort() +} + +/** + * 失败时返回 + */ +func Error(c *gin.Context, code int, msg ...string) { + message := "" + if len(msg) > 0 { + message = msg[0] + } else { + message = errorcode.GetMsg(code) + } + + c.JSON(http.StatusOK, gin.H{ + "code": code, + "message": message, + "request_uri": c.Request.URL.Path, + "data": make(map[string]string), + }) + c.Abort() +} + +func Error404(c *gin.Context) { + Error(c, errorcode.NotFound, "路由不存在") +} + +func Error500(c *gin.Context) { + Error(c, errorcode.SystemError) +} + +type HTTPError struct { + Code int `json:"code" example:"400"` + Message string `json:"message" example:"status bad request"` +} + +/** + * 将请求的body转换为request数据结构 + * @param c + * @param request 传入request数据结构的指针 如 new(TestRequest) + */ +func GenRequest(c *gin.Context, request interface{}) (err error) { + body, err := ReadBody(c) + if err != nil { + return + } + err = json.Unmarshal(body, request) + if (err == nil) { + validate := validator.New() + errValidate := validate.Struct(request) + if errValidate != nil { + logger.Error(c, "param_validator_exception:" + c.Request.URL.Path, errValidate) + return errValidate + } + } + return err +} + +//重复读取body +func ReadBody(c *gin.Context) (body []byte, err error) { + body, err = ioutil.ReadAll(c.Request.Body) + if err != nil { + return + } + c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body)) + return +} diff --git a/app/http/controllers/test.go b/app/http/controllers/test.go new file mode 100644 index 0000000..42868a9 --- /dev/null +++ b/app/http/controllers/test.go @@ -0,0 +1,113 @@ +package controllers + +import ( + "strconv" + "time" + + "quenue/app/constants/errorcode" + "quenue/app/http/entities" + "quenue/app/http/formatters/bannerformatter" + "quenue/app/services/bannerservice" + "quenue/app/utils/httpclient" + + "github.com/gin-gonic/gin" + "github.com/qit-team/snow-core/log/logger" +) + +// hello示例 +func HandleHello(c *gin.Context) { + logger.Debug(c, "hello", "test message") + client := httpclient.NewClient(c.Request.Context()) + resposne, err := client.R().Get("https://www.baidu.com") + if err != nil { + Error(c, errorcode.SystemError, err.Error()) + return + } + logger.Info(c, "HandleHello", resposne.String()) + Success(c, "hello world!") + return +} + +// request和response的示例 +// HandleTest godoc +// @Summary request和response的示例 +// @Description request和response的示例 +// @Tags snow +// @Accept json +// @Produce json +// @Param test body entities.TestRequest true "test request" +// @Success 200 {array} entities.TestResponse +// @Failure 400 {object} controllers.HTTPError +// @Failure 404 {object} controllers.HTTPError +// @Failure 500 {object} controllers.HTTPError +// @Router /test [post] +func HandleTest(c *gin.Context) { + request := new(entities.TestRequest) + err := GenRequest(c, request) + if err != nil { + Error(c, errorcode.ParamError) + return + } + + response := new(entities.TestResponse) + response.Name = request.Name + response.Url = request.Url + response.Id = time.Now().Unix() + Success(c, response) + return +} + +// 测试数据库服务示例 +func GetBannerList(c *gin.Context) { + pageStr := c.Query("page") + limitStr := c.DefaultQuery("limit", "20") + + page, _ := strconv.Atoi(pageStr) + if page <= 0 { + page = 1 + } + + limit, _ := strconv.Atoi(limitStr) + if limit <= 0 { + limit = 20 + } + + list, err := bannerservice.GetListByPid(1, limit, page) + if err != nil { + Error500(c) + return + } + + data := map[string]interface{}{ + "page": page, + "limit": limit, + "data": bannerformatter.FormatList(list), + } + + Success(c, data) +} + +// validator的示例 +// HandleTestValidator godoc +// @Summary HandleTestValidator的示例 +// @Description HandleTestValidator的示例 +// @Tags snow +// @Accept json +// @Produce json +// @Param testValidator body entities.TestValidatorRequest true "example of validator" +// @Success 200 {array} entities.TestValidatorRequest +// @Failure 400 {object} controllers.HTTPError +// @Failure 404 {object} controllers.HTTPError +// @Failure 500 {object} controllers.HTTPError +// @Router /test_validator [post] +func HandleTestValidator(c *gin.Context) { + request := new(entities.TestValidatorRequest) + err := GenRequest(c, request) + if err != nil { + Error(c, errorcode.ParamError) + return + } + + Success(c, request) + return +} diff --git a/app/http/entities/order.go b/app/http/entities/order.go new file mode 100644 index 0000000..dd680ff --- /dev/null +++ b/app/http/entities/order.go @@ -0,0 +1,7 @@ +package entities + +type MqMessage struct { + Body map[string]interface{} `json:"body"` + Property map[string]interface{} `json:"property"` + Key string `json:"serial_number"` +} diff --git a/app/http/entities/test.go b/app/http/entities/test.go new file mode 100644 index 0000000..dd94d86 --- /dev/null +++ b/app/http/entities/test.go @@ -0,0 +1,43 @@ +package entities + +//请求数据结构 +type TestRequest struct { + Name string `json:"name" example:"snow"` + Url string `json:"url" example:"github.com/qit-team/snow"` +} + +//返回数据结构 +type TestResponse struct { + Id int64 `json:"id" example:"1"` + Name string `json:"name" example:"snow"` + Url string `json:"url" example:"github.com/qit-team/snow"` +} + +/* + * validator.v9文档 + * 地址https://godoc.org/gopkg.in/go-playground/validator.v9 + * 列了几个大家可能会用到的,如有遗漏,请看上面文档 + */ + +//请求数据结构 +type TestValidatorRequest struct { + //tips,因为组件required不管是没传值或者传 0 or "" 都通过不了,但是如果用指针类型,那么0就是0,而nil无法通过校验 + Id *int64 `json:"id" validate:"required" example:"1"` + Age int `json:"age" validate:"required,gte=0,lte=130" example:"20"` + Name *string `json:"name" validate:"required" example:"snow"` + Email string `json:"email" validate:"required,email" example:"snow@github.com"` + Url string `json:"url" validate:"required" example:"github.com/qit-team/snow"` + Mobile string `json:"mobile" validate:"required" example:"snow"` + RangeNum int `json:"range_num" validate:"max=10,min=1" example:"3"` + TestNum *int `json:"test_num" validate:"required,oneof=5 7 9" example:"7"` + Content *string `json:"content" example:"snow"` + Addresses []*Address `json:"addresses" validate:"required,dive,required" ` +} + +// Address houses a users address information +type Address struct { + Street string `json:"street" validate:"required" example:"huandaodonglu"` + City string `json:"city" validate:"required" example:"xiamen"` + Planet string `json:"planet" validate:"required" example:"snow"` + Phone string `json:"phone" validate:"required" example:"snow"` +} \ No newline at end of file diff --git a/app/http/formatters/bannerformatter/banner.go b/app/http/formatters/bannerformatter/banner.go new file mode 100644 index 0000000..7c89d6e --- /dev/null +++ b/app/http/formatters/bannerformatter/banner.go @@ -0,0 +1,34 @@ +package bannerformatter + +import ( + "quenue/app/models/bannermodel" +) + +type BannerFormatter struct { + Id int `json:"id"` + Title string `json:"title"` + Img string `json:"image"` + Url string `json:"url"` +} + +func FormatList(bannerList []*bannermodel.Banner) (res []*BannerFormatter) { + res = make([]*BannerFormatter, len(bannerList)) + + for k, banner := range bannerList { + one := FormatOne(banner) + res[k] = one + } + + return res +} + +//单条消息的格式化, +func FormatOne(banner *bannermodel.Banner) (res *BannerFormatter) { + res = &BannerFormatter{ + Id: int(banner.Id), + Title: banner.Title, + Img: banner.ImageUrl, + Url: banner.Url, + } + return +} \ No newline at end of file diff --git a/app/http/formatters/bannerformatter/banner_test.go b/app/http/formatters/bannerformatter/banner_test.go new file mode 100644 index 0000000..507f64c --- /dev/null +++ b/app/http/formatters/bannerformatter/banner_test.go @@ -0,0 +1,45 @@ +package bannerformatter + +import ( + "testing" + + "quenue/app/models/bannermodel" +) + +func TesFormatOne(t *testing.T) { + a := &bannermodel.Banner{ + Id: 1, + Title: "test", + ImageUrl: "http://x/1.jpg", + Url: "http://x", + Status: "1", + } + b := FormatOne(a) + if b.Title != a.Title || b.Img != a.ImageUrl || b.Url != a.Url { + t.Error("FormatOne not same") + } +} + +func TesFormatList(t *testing.T) { + a := make([]*bannermodel.Banner, 2) + a[0] = &bannermodel.Banner{ + Id: 1, + Title: "test", + ImageUrl: "http://x1/1.jpg", + Url: "http://x1", + Status: "1", + } + a[1] = &bannermodel.Banner{ + Id: 2, + Title: "test2", + ImageUrl: "http://x/2.jpg", + Url: "http://x2", + Status: "2", + } + b := FormatList(a) + for k, v := range b { + if v.Title != a[k].Title || v.Img != a[k].ImageUrl || v.Url != a[k].Url { + t.Error("FormatList not same") + } + } +} diff --git a/app/http/metric/metric.go b/app/http/metric/metric.go new file mode 100644 index 0000000..588d7bd --- /dev/null +++ b/app/http/metric/metric.go @@ -0,0 +1,77 @@ +package metric + +import ( + "net/http" + + "quenue/app/utils/metric" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + HOST = "host" + PATH = "path" // 路径 + METHOD = "method" // 方法 + CODE = "code" // 错误码 + + // metric + ALL_REQ_TOTAL_COUNT = "all_req_total_count" // 所有URL总请求数 + ALL_REQ_COST_TIME = "all_req_cost_time" // 所有URL请求耗时 + + REQ_TOTAL_COUNT = "req_total_count" // 每个URL总请求数 + REQ_COST_TIME = "req_cost_time" // 每个URL请求耗时 +) + +func init() { + metric.RegisterCollector(reqTotalCounter, reqCostTimeObserver, allReqTotalCounter, allReqCostTimeObserver) +} + +var ( + reqTotalCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: REQ_TOTAL_COUNT, + }, []string{PATH, METHOD}) + + reqCostTimeObserver = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: REQ_COST_TIME, + Buckets: []float64{ + 100, + 200, + 500, + 1000, + 3000, + 5000, + }, + }, []string{PATH, METHOD}) + + allReqTotalCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: ALL_REQ_TOTAL_COUNT, + }, []string{HOST}) + + allReqCostTimeObserver = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: ALL_REQ_COST_TIME, + Buckets: []float64{ + 100, + 200, + 500, + 1000, + 3000, + 5000, + }, + }, []string{HOST}) +) + +func AddReqCount(req *http.Request) { + reqTotalCounter.WithLabelValues(req.URL.Path, req.Method).Inc() +} + +func CollectReqCostTime(req *http.Request, ms int64) { + reqCostTimeObserver.WithLabelValues(req.URL.Path, req.Method).Observe(float64(ms)) +} + +func AddAllReqCount(req *http.Request) { + allReqTotalCounter.WithLabelValues(req.Host).Inc() +} + +func CollectAllReqCostTime(req *http.Request, ms int64) { + allReqCostTimeObserver.WithLabelValues(req.Host).Observe(float64(ms)) +} diff --git a/app/http/middlewares/metric.go b/app/http/middlewares/metric.go new file mode 100644 index 0000000..2493fef --- /dev/null +++ b/app/http/middlewares/metric.go @@ -0,0 +1,22 @@ +package middlewares + +import ( + "time" + + "quenue/app/http/metric" + + "github.com/gin-gonic/gin" +) + +func CollectMetric() gin.HandlerFunc { + return func(ctx *gin.Context) { + start := time.Now() + ctx.Next() + dur := time.Now().Sub(start).Milliseconds() + + metric.AddAllReqCount(ctx.Request) + metric.CollectAllReqCostTime(ctx.Request, dur) + metric.AddReqCount(ctx.Request) + metric.CollectReqCostTime(ctx.Request, dur) + } +} diff --git a/app/http/middlewares/server_recovery.go b/app/http/middlewares/server_recovery.go new file mode 100644 index 0000000..f730c0e --- /dev/null +++ b/app/http/middlewares/server_recovery.go @@ -0,0 +1,50 @@ +package middlewares + +import ( + "encoding/json" + syslog "log" + "net/http/httputil" + "runtime/debug" + + "quenue/app/constants/logtype" + "quenue/config" + + "github.com/gin-gonic/gin" + "github.com/qit-team/snow-core/log/logger" +) + +func ServerRecovery() gin.HandlerFunc { + return func(c *gin.Context) { + + defer func() { + if err := recover(); err != nil { + httpRequest, _ := httputil.DumpRequest(c.Request, false) + msg := map[string]interface{}{ + "error": err, + "request": string(httpRequest), + "stack": string(debug.Stack()), + } + msgJson, _ := json.Marshal(msg) + logger.GetLogger().Error(string(msgJson), logtype.GoPanic, c) + + if config.IsDebug() { + //本地开发 debug 模式开启时输出错误信息到shell + syslog.Println(err) + } + + c.JSON(500, gin.H{ + "code": 500, + "msg": "system error", + "request_uri": c.Request.URL.Path, + "data": make(map[string]string), + }) + } + }() + + //before request + + c.Next() + + //after request + } +} diff --git a/app/http/middlewares/tracer.go b/app/http/middlewares/tracer.go new file mode 100644 index 0000000..35dc5ea --- /dev/null +++ b/app/http/middlewares/tracer.go @@ -0,0 +1,55 @@ +package middlewares + +import ( + "fmt" + "strconv" + "time" + + "quenue/app/http/trace" + + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky/propagation" + v3 "github.com/SkyAPM/go2sky/reporter/grpc/language-agent" + "github.com/gin-gonic/gin" + "github.com/qit-team/snow-core/log/logger" +) + +const ( + componentIDGOHttpServer = 5004 +) + +func Trace() gin.HandlerFunc { + return func(c *gin.Context) { + tracer, err := trace.Tracer() + if err != nil { + logger.Error(c, "Trace", err.Error()) + c.Next() + return + } + r := c.Request + operationName := fmt.Sprintf("/%s%s", r.Method, r.URL.Path) + span, ctx, err := tracer.CreateEntrySpan(c, operationName, func() (string, error) { + // 从http头部捞取上一层的调用链信息, 当前使用v3版本的协议 + // https://github.com/apache/skywalking/blob/master/docs/en/protocols/Skywalking-Cross-Process-Propagation-Headers-Protocol-v3.md + return r.Header.Get(propagation.Header), nil + }) + if err != nil { + logger.Error(c, "Trace", err.Error()) + c.Next() + return + } + span.SetComponent(componentIDGOHttpServer) + // 可以自定义tag + span.Tag(go2sky.TagHTTPMethod, r.Method) + span.Tag(go2sky.TagURL, fmt.Sprintf("%s%s", r.Host, r.URL.Path)) + span.SetSpanLayer(v3.SpanLayer_Http) + c.Request = c.Request.WithContext(ctx) + c.Next() + code := c.Writer.Status() + if code >= 400 { + span.Error(time.Now(), fmt.Sprintf("Error on handling request, statusCode: %d", code)) + } + span.Tag(go2sky.TagStatusCode, strconv.Itoa(code)) + span.End() + } +} diff --git a/app/http/routes/route.go b/app/http/routes/route.go new file mode 100644 index 0000000..395de56 --- /dev/null +++ b/app/http/routes/route.go @@ -0,0 +1,55 @@ +package routes + +/** + * 配置路由 + */ +import ( + "quenue/app/http/controllers" + "quenue/app/http/middlewares" + "quenue/app/http/trace" + "quenue/app/utils/metric" + "quenue/config" + + "github.com/gin-gonic/gin" + "github.com/qit-team/snow-core/http/middleware" + "github.com/qit-team/snow-core/log/logger" + "github.com/swaggo/gin-swagger" + "github.com/swaggo/gin-swagger/swaggerFiles" +) + +//api路由配置 +func RegisterRoute(router *gin.Engine) { + //middleware: 服务错误处理 => 生成请求id => access log + router.Use(middlewares.ServerRecovery(), middleware.GenRequestId, middleware.GenContextKit, middleware.AccessLog()) + + if config.GetConf().PrometheusCollectEnable && config.IsEnvEqual(config.ProdEnv) { + router.Use(middlewares.CollectMetric()) + metric.Init(metric.EnableRuntime(), metric.EnableProcess()) + metricHandler := metric.Handler() + router.GET("/metrics", func(ctx *gin.Context) { + metricHandler.ServeHTTP(ctx.Writer, ctx.Request) + }) + } + + if len(config.GetConf().SkyWalkingOapServer) > 0 && config.IsEnvEqual(config.ProdEnv) { + err := trace.InitTracer(config.GetConf().ServiceName, config.GetConf().SkyWalkingOapServer) + if err != nil { + logger.Error(nil, "InitTracer", err.Error()) + } else { + router.Use(middlewares.Trace()) + } + } + + router.NoRoute(controllers.Error404) + router.GET("/hello", controllers.HandleHello) + router.POST("/test", controllers.HandleTest) + router.POST("/test_validator", controllers.HandleTestValidator) + + //api版本 + v1 := router.Group("/v1") + { + v1.GET("/banner_list", controllers.GetBannerList) + } + + router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) +} diff --git a/app/http/tcppool/pool.go b/app/http/tcppool/pool.go new file mode 100644 index 0000000..0cc010a --- /dev/null +++ b/app/http/tcppool/pool.go @@ -0,0 +1,106 @@ +package tcppool + +import ( + "bufio" + "github.com/nange/easypool" + _ "github.com/nange/easypool" + "io" + "net" + "quenue/app/utils" + "quenue/config" + "sync" + "sync/atomic" + "time" +) + +var lock sync.Once +var TcpPoolFactory = &TcpPool{Full: new(int32)} + +type TcpPool struct { + client easypool.Pool + Full *int32 + lastTime int64 + isDie bool +} + +func (t *TcpPool) initPool(port string) *TcpPool { + lock.Do(func() { + factory := func() (net.Conn, error) { + var conn, err = net.Dial("tcp", "192.168.110.50:"+port) + return conn, err + } + config := &easypool.PoolConfig{ + InitialCap: 1, + MaxCap: 2, + MaxIdle: 1, + Idletime: 30 * time.Second, + MaxLifetime: 10 * time.Minute, + Factory: factory, + } + + pool, err := easypool.NewHeapPool(config) + if err == nil { + t.isDie = false + t.client = pool + } else { + utils.Log(nil, "tcp err", err) + time.Sleep(3 * time.Second) + } + + }) + return t +} +func (t *TcpPool) handRead(conn net.Conn) { + //defer func() { + // if err := recover(); err != nil { + // utils.Log(nil, "tcp read err", err) + // } + //}() + for { + if time.Now().Unix()-t.lastTime > 10 { + //t.isDie = true + //t.client.Close() + //t.client = nil + //return + } + reader := bufio.NewReader(conn) + var buffer [256]byte + // 持续读取数据 + n, err := reader.Read(buffer[:]) + if err == io.EOF { + continue + } + if err != nil { + t.isDie = true + if t.client != nil { + t.client.Close() + t.client = nil + } + return + utils.Log(nil, "Error reading data:", err) + } + recvStr := string(buffer[:n]) + if recvStr == "1" { + atomic.AddInt32(t.Full, 1) + } else if recvStr == "ping" { + conn.Write([]byte("pong")) + t.lastTime = time.Now().Unix() + } + } +} +func (t *TcpPool) watch(conn net.Conn) { + go t.handRead(conn) +} + +func (t *TcpPool) SendMsg(msg []byte) error { + conn, err := t.initPool(config.GetConf().OrderPort).client.Get() + if err != nil { + utils.Log(nil, "get tcp err", err) + return err + } + //var data, _ = json.Marshal(msg) + t.watch(conn) + _, err = conn.Write(msg) + conn.Close() + return err +} diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go new file mode 100644 index 0000000..cdd49a7 --- /dev/null +++ b/app/http/tcppool/single.go @@ -0,0 +1,103 @@ +package tcppool + +import ( + "bufio" + "fmt" + "io" + "net" + "quenue/app/utils" + "quenue/config" + "sync" + "sync/atomic" + "time" +) + +var ( + full int32 = 0 + TcpFactory = TcpHelper{Full: &full} + lockSingle sync.Once +) + +type TcpHelper struct { + client net.Conn + lastTime int64 + Full *int32 +} + +func (t *TcpHelper) Init(port string) *TcpHelper { + lockSingle.Do(func() { + var conn, err = net.Dial("tcp", "192.168.110.50:"+port) + if err == nil { + t.client = conn + //t.watch(t.client) + } + }) + return t +} +func (t *TcpHelper) handRead(conn net.Conn) { + //defer func() { + // if err := recover(); err != nil { + // utils.Log(nil, "tcp read err", err) + // } + //}() + for { + if time.Now().Unix()-t.lastTime > 10 { + //t.isDie = true + //t.client.Close() + //t.client = nil + //return + } + fmt.Println("read") + reader := bufio.NewReader(conn) + var buffer [256]byte + // 持续读取数据 + n, err := reader.Read(buffer[:]) + if err == io.EOF { + continue + } + if err != nil { + //if t.client != nil { + // t.client.Close() + // t.client = nil + //} + utils.Log(nil, "Error reading data:", err) + continue + + } + recvStr := string(buffer[:n]) + if recvStr == "1" { + atomic.AddInt32(t.Full, 1) + } else if recvStr == "ping" { + conn.Write([]byte("pong")) + t.lastTime = time.Now().Unix() + } + } + select {} +} +func (t *TcpHelper) SendMsg(msg []byte) error { + _, err := t.Init(config.GetConf().OrderPort).client.Write(msg) + + var buffer [256]byte + // 持续读取数据 + n, err := t.client.Read(buffer[:]) + if err == nil { + if n > 0 { + recvStr := string(buffer[:n]) + fmt.Println("结果:recvStr:", recvStr) + if recvStr == "1" { + fmt.Println("满了") + atomic.AddInt32(t.Full, 1) + } else if recvStr == "2" { + atomic.AddInt32(t.Full, -1) + } + } + + } + return err +} +func (t *TcpHelper) Close(conn net.Conn) { + t.client.Close() +} +func (t *TcpHelper) watch(conn net.Conn) { + go t.handRead(conn) +} diff --git a/app/http/trace/trace.go b/app/http/trace/trace.go new file mode 100644 index 0000000..2a986c0 --- /dev/null +++ b/app/http/trace/trace.go @@ -0,0 +1,42 @@ +package trace + +import ( + "sync" + + "quenue/config" + + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky/reporter" +) + +var ( + tracer *go2sky.Tracer + lock sync.Mutex +) + +func Tracer() (*go2sky.Tracer, error) { + if tracer == nil { + // 有err, 不适合用sync.Once做单例 + lock.Lock() + defer lock.Unlock() + if tracer == nil { + err := InitTracer(config.GetConf().ServiceName, config.GetConf().SkyWalkingOapServer) + if err != nil { + return nil, err + } + } + } + return tracer, nil +} + +func InitTracer(serviceName, skyWalkingOapServer string) error { + report, err := reporter.NewGRPCReporter(skyWalkingOapServer) + if err != nil { + return err + } + tracer, err = go2sky.NewTracer(serviceName, go2sky.WithReporter(report)) + if err != nil { + return err + } + return nil +} diff --git a/app/jobs/basejob/base_job.go b/app/jobs/basejob/base_job.go new file mode 100644 index 0000000..7024e97 --- /dev/null +++ b/app/jobs/basejob/base_job.go @@ -0,0 +1,66 @@ +package basejob + +import ( + "context" + "sync" + + "github.com/qit-team/work" +) + +var ( + jb *work.Job + register func(job *work.Job) + mu sync.RWMutex +) + +func SetJob(job *work.Job) { + if jb == nil { + jb = job + } +} + +func SetJobRegister(r func(*work.Job)) { + register = r +} + +func GetJob() *work.Job { + if jb == nil { + if register != nil { + mu.Lock() + defer mu.Unlock() + jb = work.New() + register(jb) + } else { + panic("job register is nil") + } + } + return jb +} + +/** + * 消息入队 -- 原始message + */ +func Enqueue(ctx context.Context, topic string, message string, args ...interface{}) (isOk bool, err error) { + return GetJob().Enqueue(ctx, topic, message, args...) +} + +/** + * 消息入队 -- Task数据结构 + */ +func EnqueueWithTask(ctx context.Context, topic string, task work.Task, args ...interface{}) (isOk bool, err error) { + return GetJob().EnqueueWithTask(ctx, topic, task, args...) +} + +/** + * 消息批量入队 -- 原始message + */ +func BatchEnqueue(ctx context.Context, topic string, messages []string, args ...interface{}) (isOk bool, err error) { + return GetJob().BatchEnqueue(ctx, topic, messages, args...) +} + +/** + * 消息批量入队 -- Task数据结构 + */ +func BatchEnqueueWithTask(ctx context.Context, topic string, tasks []work.Task, args ...interface{}) (isOk bool, err error) { + return GetJob().BatchEnqueueWithTask(ctx, topic, tasks, args...) +} diff --git a/app/jobs/kernel.go b/app/jobs/kernel.go new file mode 100644 index 0000000..17e8881 --- /dev/null +++ b/app/jobs/kernel.go @@ -0,0 +1,56 @@ +package jobs + +import ( + "strings" + + "quenue/app/jobs/basejob" + "quenue/config" + + "github.com/qit-team/snow-core/log/logger" + "github.com/qit-team/snow-core/queue" + "github.com/qit-team/snow-core/redis" + "github.com/qit-team/work" +) + +/** + * 配置队列任务 + */ +func RegisterWorker(job *work.Job) { + basejob.SetJob(job) + + //设置worker的任务投递回调函数 + job.AddFunc("topic-test", test) + //设置worker的任务投递回调函数,和并发数 + job.AddFunc("topic-test1", test, 2) + //使用worker结构进行注册 + job.AddWorker("topic-test2", &work.Worker{Call: work.MyWorkerFunc(test), MaxConcurrency: 1}) + + RegisterQueueDriver(job) + SetOptions(job) +} + +/** + * 给topic注册对应的队列服务 + */ +func RegisterQueueDriver(job *work.Job) { + //设置队列服务,需要实现work.Queue接口的方法 + q := queue.GetQueue(redis.SingletonMain, queue.DriverTypeRedis) + //针对topic设置相关的queue + job.AddQueue(q, "topic-test1", "topic-test2") + //设置默认的queue, 没有设置过的topic会使用默认的queue + job.AddQueue(q) +} + +/** + * 设置配置参数 + */ +func SetOptions(job *work.Job) { + //设置logger,需要实现work.Logger接口的方法 + job.SetLogger(logger.GetLogger()) + + //设置启用的topic,未设置表示启用全部注册过topic + if config.GetOptions().Queue != "" { + topics := strings.Split(config.GetOptions().Queue, ",") + job.SetEnableTopics(topics...) + } +} diff --git a/app/jobs/test.go b/app/jobs/test.go new file mode 100644 index 0000000..04eece4 --- /dev/null +++ b/app/jobs/test.go @@ -0,0 +1,24 @@ +package jobs + +import ( + "fmt" + "time" + + "github.com/qit-team/work" +) + +func test(task work.Task) (work.TaskResult) { + time.Sleep(time.Millisecond * 5) + s, err := work.JsonEncode(task) + if err != nil { + //work.StateFailed 不会进行ack确认 + //work.StateFailedWithAck 会进行actk确认 + //return work.TaskResult{Id: task.Id, State: work.StateFailed} + return work.TaskResult{Id: task.Id, State: work.StateFailedWithAck} + } else { + //work.StateSucceed 会进行ack确认 + fmt.Println("do task", s) + return work.TaskResult{Id: task.Id, State: work.StateSucceed} + } + +} diff --git a/app/models/bannermodel/banner.go b/app/models/bannermodel/banner.go new file mode 100644 index 0000000..bb0d7d2 --- /dev/null +++ b/app/models/bannermodel/banner.go @@ -0,0 +1,57 @@ +package bannermodel + +import ( + "sync" + "time" + + "github.com/qit-team/snow-core/db" +) + +var ( + once sync.Once + m *bannerModel +) +/** + * Banner实体 + */ +type Banner struct { + Id int64 `xorm:"pk autoincr"` //注:使用getOne 或者ID() 需要设置主键 + Pid int + Title string + ImageUrl string `xorm:"'img_url'"` + Url string + Status string + CreatedAt time.Time + UpdatedAt time.Time + DeletedAt time.Time `xorm:"deleted"` //此特性会激发软删除 +} + +/** + * 表名规则 + * @wiki http://gobook.io/read/github.com/go-xorm/manual-zh-CN/chapter-02/3.tags.html + */ +func (m *Banner) TableName() string { + return "banner" +} + +/** + * 私有化,防止被外部new + */ +type bannerModel struct { + db.Model //组合基础Model,集成基础Model的属性和方法 +} + +//单例模式 +func GetInstance() *bannerModel { + once.Do(func() { + m = new(bannerModel) + //m.DiName = "" //设置数据库实例连接,默认db.SingletonMain + }) + return m +} + +func (m *bannerModel) GetListByPid(pid int, limits ...int) (banners []*Banner, err error) { + banners = make([]*Banner, 0) + err = m.GetList(&banners, "pid = ?", []interface{}{pid}, limits) + return +} diff --git a/app/models/bannermodel/banner_test.go b/app/models/bannermodel/banner_test.go new file mode 100644 index 0000000..01c6af7 --- /dev/null +++ b/app/models/bannermodel/banner_test.go @@ -0,0 +1,54 @@ +package bannermodel + +import ( + "fmt" + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/qit-team/snow-core/config" + "github.com/qit-team/snow-core/db" + "github.com/qit-team/snow-core/utils" +) + +func init() { + m := config.DbBaseConfig{ + Host: "127.0.0.1", + Port: 3306, + User: "root", + Password: "123456", + DBName: "test", + } + dbConf := config.DbConfig{ + Driver: "mysql", + Master: m, + } + + err := db.Pr.Register("db", dbConf, true) + if err != nil { + fmt.Println(err) + } +} + +func TestGetOne(t *testing.T) { + bannerModel := GetInstance() + banner := new(Banner) + res, err := bannerModel.GetOne(1, banner) + if err != nil { + t.Error(err) + } else if res != true { + t.Error("missing banner data") + } else if banner.Id == 0 { + t.Error("missing banner data") + } + fmt.Println(utils.JsonEncode(banner)) +} + +func TestGetList(t *testing.T) { + bannerModel := GetInstance() + banners := make([]*Banner, 0) + err := bannerModel.GetList(&banners, "pid >= ?", []interface{}{1}, []int{10}, "status desc, id desc") + if err != nil { + t.Error(err) + } + fmt.Println(utils.JsonEncode(banners)) +} diff --git a/app/services/bannerservice/banner.go b/app/services/bannerservice/banner.go new file mode 100644 index 0000000..4522ef2 --- /dev/null +++ b/app/services/bannerservice/banner.go @@ -0,0 +1,25 @@ +package bannerservice + +import ( + "quenue/app/models/bannermodel" +) + +func GetListByPid(pid int, limit int, page int) (banners []*bannermodel.Banner, err error) { + limitStart := GetLimitStart(limit, page) + banners, err = bannermodel.GetInstance().GetListByPid(pid, limitStart...) + return +} + +func GetLimitStart(limit int, page int) (arr []int) { + arr = make([]int, 2) + if limit <= 0 { + limit = 20 + } + arr[0] = limit + if page > 0 { + arr[1] = (page - 1) * limit + } else { + arr[1] = 0 + } + return +} diff --git a/app/utils/.gitkeep b/app/utils/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/app/utils/httpclient/httpclient.go b/app/utils/httpclient/httpclient.go new file mode 100644 index 0000000..a64e216 --- /dev/null +++ b/app/utils/httpclient/httpclient.go @@ -0,0 +1,190 @@ +package httpclient + +import ( + "context" + "fmt" + "net/http" + "strconv" + "time" + + "quenue/app/http/trace" + "quenue/config" + + "github.com/SkyAPM/go2sky" + "github.com/SkyAPM/go2sky/propagation" + v3 "github.com/SkyAPM/go2sky/reporter/grpc/language-agent" + "github.com/go-resty/resty/v2" + "github.com/qit-team/snow-core/log/logger" +) + +const ( + RetryCounts = 2 + RetryInterval = 3 * time.Second +) + +const componentIDGOHttpClient = 5005 + +type ClientConfig struct { + ctx context.Context + client *resty.Client + tracer *go2sky.Tracer + extraTags map[string]string +} + +type ClientOption func(*ClientConfig) + +func WithClientTag(key string, value string) ClientOption { + return func(c *ClientConfig) { + if c.extraTags == nil { + c.extraTags = make(map[string]string) + } + c.extraTags[key] = value + } +} + +func WithClient(client *resty.Client) ClientOption { + return func(c *ClientConfig) { + c.client = client + } +} + +func WithContext(ctx context.Context) ClientOption { + return func(c *ClientConfig) { + c.ctx = ctx + } +} + +type transport struct { + *ClientConfig + delegated http.RoundTripper +} + +func (t *transport) RoundTrip(req *http.Request) (resp *http.Response, err error) { + span, err := t.tracer.CreateExitSpan(t.ctx, fmt.Sprintf("/%s%s", req.Method, req.URL.Path), req.Host, func(header string) error { + // 将本层的调用链信息写入http头部, 传入到下一层调用, 当前使用v3版本的协议 + // https://github.com/apache/skywalking/blob/master/docs/en/protocols/Skywalking-Cross-Process-Propagation-Headers-Protocol-v3.md + req.Header.Set(propagation.Header, header) + return nil + }) + if err != nil { + return t.delegated.RoundTrip(req) + } + defer span.End() + span.SetComponent(componentIDGOHttpClient) + for k, v := range t.extraTags { + span.Tag(go2sky.Tag(k), v) + } + span.Tag(go2sky.TagHTTPMethod, req.Method) + span.Tag(go2sky.TagURL, req.URL.String()) + span.SetSpanLayer(v3.SpanLayer_Http) + resp, err = t.delegated.RoundTrip(req) + if err != nil { + span.Error(time.Now(), err.Error()) + return + } + span.Tag(go2sky.TagStatusCode, strconv.Itoa(resp.StatusCode)) + if resp.StatusCode >= http.StatusBadRequest { + span.Error(time.Now(), "Errors on handling client") + } + return resp, nil +} + +func NewClient(ctx context.Context, options ...ClientOption) (client *resty.Client) { + client = resty.New() + if config.IsDebug() { + client.SetDebug(true).EnableTrace() + } + + var ( + tracer *go2sky.Tracer + err error + ) + if len(config.GetConf().SkyWalkingOapServer) > 0 && config.IsEnvEqual(config.ProdEnv) { + tracer, err = trace.Tracer() + if err != nil { + logger.Error(ctx, "NewClient:Tracer", err.Error()) + } + } + if tracer != nil { + co := &ClientConfig{ctx: ctx, tracer: tracer} + for _, option := range options { + option(co) + } + if co.client == nil { + co.client = client + } + tp := &transport{ + ClientConfig: co, + delegated: http.DefaultTransport, + } + if co.client.GetClient().Transport != nil { + tp.delegated = co.client.GetClient().Transport + } + co.client.SetTransport(tp) + } + + client.OnBeforeRequest(func(ct *resty.Client, req *resty.Request) error { + //req.SetContext(c) + logger.Info(ctx, "OnBeforeRequest", logger.NewWithField("url", req.URL)) + return nil // if its success otherwise return error + }) + // Registering Response Middleware + client.OnAfterResponse(func(ct *resty.Client, resp *resty.Response) error { + logger.Info(ctx, "OnAfterResponse", logger.NewWithField("url", resp.Request.URL), logger.NewWithField("request", resp.Request.RawRequest), logger.NewWithField("response", resp.String())) + return nil + }) + return client +} + +func NewClientWithRetry(ctx context.Context, retryCounts int, retryInterval time.Duration, options ...ClientOption) (client *resty.Client) { + client = resty.New() + if config.IsDebug() { + client.SetDebug(true).EnableTrace() + } + if retryCounts == 0 { + retryCounts = RetryCounts + } + if retryInterval.Seconds() == 0.0 { + retryInterval = RetryInterval + } + client.SetRetryCount(retryCounts).SetRetryMaxWaitTime(retryInterval) + + var ( + tracer *go2sky.Tracer + err error + ) + if len(config.GetConf().SkyWalkingOapServer) > 0 && config.IsEnvEqual(config.ProdEnv) { + tracer, err = trace.Tracer() + if err != nil { + logger.Error(ctx, "NewClient:Tracer", err.Error()) + } + } + if tracer != nil { + co := &ClientConfig{ctx: ctx, tracer: tracer} + for _, option := range options { + option(co) + } + if co.client == nil { + co.client = client + } + tp := &transport{ + ClientConfig: co, + delegated: http.DefaultTransport, + } + if co.client.GetClient().Transport != nil { + tp.delegated = co.client.GetClient().Transport + } + co.client.SetTransport(tp) + } + + client.OnBeforeRequest(func(ct *resty.Client, req *resty.Request) error { + logger.Info(ctx, "OnBeforeRequest", logger.NewWithField("url", req.URL)) + return nil // if its success otherwise return error + }) + // Registering Response Middleware + client.OnAfterResponse(func(ct *resty.Client, resp *resty.Response) error { + logger.Info(ctx, "OnAfterResponse", logger.NewWithField("url", resp.Request.URL), logger.NewWithField("request", resp.Request.RawRequest), logger.NewWithField("response", resp.String())) + return nil + }) + return client +} diff --git a/app/utils/metric/reporter.go b/app/utils/metric/reporter.go new file mode 100644 index 0000000..36f4660 --- /dev/null +++ b/app/utils/metric/reporter.go @@ -0,0 +1,177 @@ +package metric + +// prometheus metric:unique identifier: name and optional key-value pairs called labels +// 1. name regexp: [a-zA-Z_:][a-zA-Z0-9_:]* +// 2. label name regexp: [a-zA-Z_][a-zA-Z0-9_]* +// 3. Label names beginning with __ are reserved for internal use. +// 4. Label values may contain any Unicode characters. +// 5. notation: {