ex_pic_to_video/main.go

240 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bufio"
"context"
"github.com/manifoldco/promptui"
"time"
"encoding/json"
"fmt"
"github.com/common-nighthawk/go-figure"
"github.com/volcengine/volcengine-go-sdk/service/arkruntime"
"github.com/volcengine/volcengine-go-sdk/service/arkruntime/model"
"github.com/volcengine/volcengine-go-sdk/volcengine"
"os"
"strings"
)
var (
reader = bufio.NewReader(os.Stdin)
)
var statusMap = map[string]string{
"queued": "排队中",
"running": "任务运行中",
"cancelled": "取消任务",
"succeeded": "任务成功",
"failed": "任务失败",
}
type Conf struct {
Key string `json:"key"`
Model string `json:"model"`
}
type Video struct {
Url string `json:"url"`
Text string `json:"text"`
}
func setVideoInfo() *Video {
var video = new(Video)
img(video)
desc(video)
return video
}
var c *Conf
func main() {
log("正在读取配置。。。")
c = getConf()
success("读取配置成功!请务必保证网络通畅")
client := arkruntime.NewClientWithApiKey(c.Key)
video := setVideoInfo()
//finish(client, video)
do(client, c, video)
}
func do(client *arkruntime.Client, c *Conf, video *Video) {
ctx := context.Background()
createReq := model.CreateContentGenerationTaskRequest{
Model: c.Model,
Content: []*model.CreateContentGenerationContentItem{
{
// 文本提示词与参数组合
Type: model.ContentGenerationContentItemTypeText,
Text: volcengine.String(video.Text),
},
{
// 图片URL
Type: model.ContentGenerationContentItemTypeImage,
ImageURL: &model.ImageURL{
URL: video.Url, //请上传可以访问的图片URL
},
},
},
}
createResponse, err := client.CreateContentGenerationTask(ctx, createReq)
if err != nil {
warning("创建视频失败: %v", err)
return
}
success("视频正在制作中,请稍后taskId: %s", createResponse.ID)
listenTask(ctx, client, createResponse.ID, video)
}
func listenTask(ctx context.Context, client *arkruntime.Client, taskId string, video *Video) {
var (
req = model.GetContentGenerationTaskRequest{taskId}
)
for {
time.Sleep(time.Second * 1)
resp, err := client.GetContentGenerationTask(ctx, req)
if err != nil {
warning("获取视频信息失败: %v", err)
continue
}
log(statusMap[resp.Status])
if resp.Status == "succeeded" {
tip("本次消耗token: %d\n", resp.Usage.TotalTokens)
success("请复制下面地址到浏览器进行下载:\n")
fmt.Println(resp.Content.VideoURL)
finish(client, video)
}
}
}
func finish(client *arkruntime.Client, video *Video) {
prompt := promptui.Select{
Label: "",
Items: []string{"退出", "重新生成", "修改设置重新生成"},
}
_, r, err := prompt.Run()
if err != nil {
warning("Prompt failed %v\n", err)
return
}
switch r {
case "退出":
prompt1 := promptui.Select{
Label: "是否退出",
Items: []string{"否", "是"},
}
_, q, _err := prompt1.Run()
if _err != nil {
warning("Prompt failed %v\n", err)
return
}
if q == "是" {
tip("最后祝您身体健康,再见")
exit()
} else {
finish(client, video)
}
case "重新生成":
do(client, c, video)
case "修改设置重新生成":
video = setVideoInfo()
do(client, c, video)
}
}
func getConf() *Conf {
wd, err := os.Getwd()
fp := fmt.Sprintf("%s/%s", wd, "ex_pic_to_video_conf")
if err != nil {
fatal("获取当前路径失败")
exit()
}
file, err := os.Open(fp)
if err != nil {
if !os.IsNotExist(err) {
os.Remove(fp)
}
return cConf(fp)
}
defer file.Close()
content, err := os.ReadFile(fp)
if err != nil {
fatal("获取配置信息失败")
exit()
}
var (
pwd string
conf = new(Conf)
)
for {
input("请输入访问密码: ")
pwd, _ = reader.ReadString('\n')
cjson, err := decrypt([]byte(padToLength(strings.TrimSpace(pwd), 16, ' ')), string(content))
if err != nil {
warning("密码错误")
continue
}
err = json.Unmarshal([]byte(cjson), &conf)
if err != nil {
warning("配置读取失败")
}
break
}
return conf
}
func cConf(fp string) *Conf {
conf := new(Conf)
log("进入初始化配置向导")
log("----------------------")
input("请设置key: ")
k, _ := reader.ReadString('\n')
conf.Key = strings.TrimSpace(k)
if conf.Key == "lsxd2025" {
conf.Key = ""
conf.Model = ""
myFigure := figure.NewFigure("LSXD", "", true)
myFigure.Print()
success("欢迎进入调试模式!")
return conf
}
input("请设置model: ")
m, _ := reader.ReadString('\n')
conf.Model = strings.TrimSpace(m)
var (
pwd string
)
for {
input("请设置访问密码(密码长度请小于16位): ")
pwd, _ = reader.ReadString('\n')
if len(strings.TrimSpace(pwd)) > 16 {
warning("密码长度请小于16位")
continue
}
break
}
cjson, _ := json.Marshal(conf)
pad := padToLength(strings.TrimSpace(pwd), 16, ' ')
en, e := encrypt([]byte(pad), string(cjson))
if e != nil {
fatal("设置失败,程序退出:%v", e)
exit()
}
if err := os.WriteFile(fp, []byte(en), 0666); err != nil {
fatal("写入失败,程序退出")
exit()
}
return conf
}
func exit() {
os.Exit(1)
}