183 lines
4.1 KiB
Go
183 lines
4.1 KiB
Go
package repoimpl
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"github.com/redis/go-redis/v9"
|
|
"gorm.io/gorm"
|
|
"time"
|
|
err2 "voucher/api/err"
|
|
"voucher/internal/biz/bo"
|
|
"voucher/internal/biz/do"
|
|
"voucher/internal/biz/repo"
|
|
"voucher/internal/biz/vo"
|
|
"voucher/internal/data"
|
|
"voucher/internal/data/model"
|
|
"voucher/internal/pkg/lock"
|
|
)
|
|
|
|
// ProductRepoImpl .
|
|
type ProductRepoImpl struct {
|
|
Base[model.Product, bo.ProductBo]
|
|
db *data.Db
|
|
rdb *data.Rdb
|
|
}
|
|
|
|
// NewProductRepoImpl .
|
|
func NewProductRepoImpl(db *data.Db, rdb *data.Rdb) repo.ProductRepo {
|
|
return &ProductRepoImpl{db: db, rdb: rdb}
|
|
}
|
|
|
|
func (r *ProductRepoImpl) FindWarningBudget(ctx context.Context, fun func(ctx context.Context, rows []*bo.ProductBo) error) error {
|
|
|
|
var results = make([]*model.Product, 0)
|
|
|
|
nowTime := time.Now().Format(time.DateTime)
|
|
|
|
result := r.db.DB(ctx).
|
|
Where("start_time <= ?", nowTime).
|
|
Where("start_time <= ?", nowTime).
|
|
Where("warning_budget > 0").
|
|
Where("warning_person IS NOT NULL").
|
|
FindInBatches(&results, 5, func(tx *gorm.DB, batch int) error {
|
|
return fun(ctx, r.ToBos(results))
|
|
})
|
|
|
|
if result.Error != nil {
|
|
return result.Error
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ProductRepoImpl) UpdateWarningBudget(ctx context.Context, id int32, req *do.WarningBudget) error {
|
|
|
|
now := time.Now()
|
|
|
|
u := model.Product{
|
|
AllBudget: req.AllBudget,
|
|
RemainingBudget: req.RemainingBudget,
|
|
UpdateTime: &now,
|
|
}
|
|
|
|
if req.StartTime != nil {
|
|
u.StartTime = req.StartTime
|
|
}
|
|
if req.EndTime != nil {
|
|
u.EndTime = req.EndTime
|
|
}
|
|
|
|
tx := r.db.DB(ctx).
|
|
Where(model.Product{
|
|
ID: id,
|
|
}).
|
|
Updates(u)
|
|
|
|
if tx.Error != nil {
|
|
return fmt.Errorf("db fail %w", tx.Error)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ProductRepoImpl) GetByBatchNo(ctx context.Context, batchNo string) (*bo.ProductBo, error) {
|
|
|
|
var item *model.Product
|
|
|
|
db := r.db.DB(ctx).Model(model.Product{})
|
|
tx := db.Where(model.Product{BatchNo: batchNo}).First(&item)
|
|
|
|
if tx.Error != nil {
|
|
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
|
|
return nil, err2.ErrorDbNotFound("商品数据不存在")
|
|
}
|
|
return nil, fmt.Errorf("product db fail %w", tx.Error)
|
|
}
|
|
|
|
if tx.RowsAffected == 0 {
|
|
return nil, err2.ErrorDbNotFound("商品数据不存在")
|
|
}
|
|
|
|
return r.ToBo(item), nil
|
|
}
|
|
|
|
func (r *ProductRepoImpl) GetByProductNo(ctx context.Context, productNo string) (*bo.ProductBo, error) {
|
|
|
|
c := vo.ProductQueryKey.BuildCache([]string{productNo})
|
|
|
|
cacheValue, err := r.rdb.Rdb.Get(context.Background(), c.Key).Result()
|
|
|
|
if err != nil && err != redis.Nil {
|
|
return nil, fmt.Errorf(fmt.Sprintf("获取商品缓存异常,%s:%v", c.Key, err))
|
|
}
|
|
|
|
var item *model.Product
|
|
|
|
if len(cacheValue) > 0 {
|
|
if err = json.Unmarshal([]byte(cacheValue), &item); err != nil {
|
|
return nil, err
|
|
}
|
|
return r.ToBo(item), nil
|
|
}
|
|
|
|
cl := vo.ProductQueryLockKey.BuildCache([]string{productNo})
|
|
|
|
err = lock.NewMutex(r.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error {
|
|
|
|
cacheValue, err = r.rdb.Rdb.Get(ctx, c.Key).Result()
|
|
|
|
if err != nil && err != redis.Nil {
|
|
return fmt.Errorf(fmt.Sprintf("二次获取商品缓存异常,%s:%v", c.Key, err))
|
|
}
|
|
|
|
if len(cacheValue) > 0 {
|
|
return json.Unmarshal([]byte(cacheValue), &item)
|
|
}
|
|
|
|
item, err = r.getByProductNo(ctx, item, productNo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b, err3 := json.Marshal(item)
|
|
if err3 != nil {
|
|
return err3
|
|
}
|
|
|
|
return r.rdb.Rdb.Set(ctx, c.Key, string(b), c.TTL).Err()
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r.ToBo(item), err
|
|
}
|
|
|
|
func (r *ProductRepoImpl) getByProductNo(ctx context.Context, item *model.Product, productNo string) (*model.Product, error) {
|
|
|
|
db := r.db.DB(ctx).Model(model.Product{})
|
|
tx := db.Where(model.Product{ProductNo: productNo}).First(&item)
|
|
|
|
if tx.Error != nil {
|
|
|
|
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
|
|
return nil, err2.ErrorDbNotFound("商品数据不存在")
|
|
}
|
|
|
|
sqlDB, _ := db.DB()
|
|
log.Warnf("product当前打开连接数:%d,空闲连接数:%d", sqlDB.Stats().OpenConnections, sqlDB.Stats().Idle)
|
|
|
|
return nil, fmt.Errorf("product db fail %w", tx.Error)
|
|
}
|
|
|
|
if tx.RowsAffected == 0 {
|
|
return nil, err2.ErrorDbNotFound("商品数据不存在")
|
|
}
|
|
|
|
return item, nil
|
|
}
|