ai_scheduler/internal/domain/workflow/runtime/registry.go

96 lines
1.6 KiB
Go

package runtime
import (
"ai_scheduler/internal/config"
toolManager "ai_scheduler/internal/domain/tools"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/pkg/utils_ollama"
"context"
"errors"
"sync"
)
type Workflow interface {
ID() string
// Schema() map[string]any
Invoke(ctx context.Context, requireData *entitys.Recognize) (map[string]any, error)
}
type Deps struct {
Conf *config.Config
LLM *utils_ollama.Client
ToolManager *toolManager.Manager
}
type Factory func(deps *Deps) (Workflow, error)
var (
regMu sync.RWMutex
factories = map[string]Factory{}
deps *Deps
defaultReg *Registry
)
func Register(id string, f Factory) {
regMu.Lock()
factories[id] = f
regMu.Unlock()
}
func SetDeps(d *Deps) {
regMu.Lock()
deps = d
regMu.Unlock()
}
type Registry struct {
mu sync.RWMutex
instances map[string]Workflow
}
func NewRegistry() *Registry {
return &Registry{instances: make(map[string]Workflow)}
}
func SetDefault(r *Registry) {
regMu.Lock()
defaultReg = r
regMu.Unlock()
}
func Default() *Registry {
regMu.RLock()
r := defaultReg
regMu.RUnlock()
return r
}
func (r *Registry) Invoke(ctx context.Context, id string, rec *entitys.Recognize) (map[string]any, error) {
regMu.RLock()
f, ok := factories[id]
regMu.RUnlock()
if !ok {
return nil, errors.New("workflow not found: " + id)
}
r.mu.RLock()
w, exists := r.instances[id]
r.mu.RUnlock()
if !exists {
if deps == nil {
return nil, errors.New("deps not set")
}
nw, err := f(deps)
if err != nil {
return nil, err
}
r.mu.Lock()
r.instances[id] = nw
w = nw
r.mu.Unlock()
}
return w.Invoke(ctx, rec)
}