# llm_train/train.py
#
# 282 lines
# 11 KiB
# Python

import pandas as pd
import torch
from datasets import Dataset
from modelscope import snapshot_download, AutoTokenizer
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq
import os
import swanlab
from modelscope.msdatasets import MsDataset
import json
import random
from config import Config,Default,Dir
class Config:
    """Run-level settings for the fine-tuning job (model, dataset, prompt)."""

    # Base model identifier.
    MODEL_NAME: str = "Qwen/Qwen3-0.6B"

    # Dataset identifier.
    DATASET_NAME: str = "krisfu/delicate_medical_r1_data"
    # Dataset subject (sub-dataset).
    DATASET_SUBJECT: str = "default"
    # Dataset split to use.
    DATASET_SPLIT: str = "train"
    # Whether previously generated dataset files may be reused.
    DATASET_USE_CACHE: bool = True

    # SwanLab project name.
    SWANLAB_PROJECT: str = "qweb3-sft-medical-10-11-1"

    # System prompt text.
    PROMPT: str = "你是一个医学专家,你需要根据用户的问题,给出带有思考的回答。"

    # Name of the dataset's question column.
    QUES_LABEL: str = "question"
    # Name of the dataset's think column (may be absent).
    THINK_LABEL: str = "think"
    # Name of the dataset's answer column.
    ANS_LABEL: str = "answer"

    # Length limit applied to tokenized samples and to generation.
    DATA_MAX_LENGTH: int = 2048
class Default:
    """Default storage roots and file names used by the data pipeline."""

    # Root directory for raw dataset files; the DATASET_PATH environment
    # variable overrides the default.
    DATASET_PATH: str = os.environ.get("DATASET_PATH", "./dataset")
    # Root directory for model-formatted dataset files; the
    # MODEL_DATASET_PATH environment variable overrides the default.
    MODEL_DATASET_PATH: str = os.environ.get("MODEL_DATASET_PATH", "./model_dataset")
    # Where the fine-tuned model is stored.
    SAVE_DIR: str = "./saved_model"

    # File names of the raw train / validation splits.
    TRAIN_DATASET_FILE: str = "train.jsonl"
    TEST_DATASET_FILE: str = "val.jsonl"
    # File names of the reformatted train / validation splits.
    TRAIN_JSONL_NEW_FILE: str = "train_format.jsonl"
    TEST_JSONL_NEW_FILE: str = "val_format.jsonl"
# Keep only the last "/"-separated component of the dataset / model identifiers.
dataset_short_name = Config.DATASET_NAME.rsplit("/", 1)[-1]
model_dataset_short_name = Config.MODEL_NAME.rsplit("/", 1)[-1]

# Every working directory ends with the same <subject>/<split> suffix.
_subject_and_split = (Config.DATASET_SUBJECT, Config.DATASET_SPLIT)

# Raw dataset directory.
dataset_dir = os.path.normpath(
    os.path.join(Default.DATASET_PATH, dataset_short_name, *_subject_and_split)
)
# Directory for the model-formatted dataset files.
model_dataset_DIR = os.path.normpath(
    os.path.join(Default.MODEL_DATASET_PATH, model_dataset_short_name, *_subject_and_split)
)
# Directory for the fine-tuned model.
model_dir = os.path.normpath(
    os.path.join(Default.SAVE_DIR, model_dataset_short_name, dataset_short_name, *_subject_and_split)
)

# Make sure the directories exist before anything reads from or writes to them.
for _required_dir in (dataset_dir, model_dataset_DIR, model_dir):
    os.makedirs(_required_dir, exist_ok=True)
class Dir:
    """Resolved working directories, grouped under one namespace."""

    # Directory holding the raw dataset files.
    DATASET_DIR: str = dataset_dir
    # Directory holding the model-formatted dataset files.
    MODEL_DATASET_DIR: str = model_dataset_DIR
    # Directory used as the training output location.
    MODEL_DIR: str = model_dir
def dataset_jsonl_transfer(origin_path, new_path, *, prompt=None,
                           ques_label=None, think_label=None, ans_label=None):
    """Convert a raw JSONL dataset into the instruction/input/output format.

    Each non-blank line of ``origin_path`` is parsed as a JSON object and
    rewritten as ``{"instruction", "input", "output"}``, where ``output`` is
    the think text (if present) followed by the answer text (if present).

    Args:
        origin_path: Path of the raw JSONL file to read (UTF-8).
        new_path: Path of the converted JSONL file to write (UTF-8).
        prompt: Text stored under "instruction"; defaults to ``Config.PROMPT``.
        ques_label: Question column name; defaults to ``Config.QUES_LABEL``.
        think_label: Think column name; defaults to ``Config.THINK_LABEL``
            when that attribute exists.
        ans_label: Answer column name; defaults to ``Config.ANS_LABEL``
            when that attribute exists.

    Raises:
        KeyError: If a record has no question column.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Config is only consulted for values the caller did not supply.
    if prompt is None:
        prompt = Config.PROMPT
    if ques_label is None:
        ques_label = Config.QUES_LABEL
    if think_label is None:
        think_label = getattr(Config, "THINK_LABEL", None)
    if ans_label is None:
        ans_label = getattr(Config, "ANS_LABEL", None)
    output_labels = [label for label in (think_label, ans_label) if label is not None]

    messages = []
    # Read with an explicit encoding: the output is written as UTF-8, so the
    # input must not depend on the platform's default locale.
    with open(origin_path, "r", encoding="utf-8") as src:
        for line in src:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            data = json.loads(line)
            question = data[ques_label]
            # Skip missing / null columns and stringify the rest so that
            # concatenation cannot fail on None or numeric values.
            output = "".join(
                str(data[label])
                for label in output_labels
                if data.get(label) is not None
            )
            messages.append({
                "instruction": prompt,
                "input": f"{question}",
                "output": output,
            })

    # Write only after reading has finished, so origin_path is fully consumed first.
    with open(new_path, "w", encoding="utf-8") as dst:
        dst.writelines(
            json.dumps(message, ensure_ascii=False) + "\n" for message in messages
        )
def predict(messages, model, tokenizer):
    """Generate a reply for a chat conversation and return it as text.

    Args:
        messages: Chat messages passed to ``tokenizer.apply_chat_template``.
        model: Model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer used for templating, encoding and decoding.

    Returns:
        The decoded text of the newly generated tokens only (the prompt
        tokens are stripped), with special tokens removed.
    """
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    # Place the inputs on the model's own device rather than assuming "cuda",
    # so inference also works when the model lives on CPU or another device.
    model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
    # Forward the whole encoding so the attention mask reaches generate().
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=Config.DATA_MAX_LENGTH,
    )
    # generate() returns prompt + continuation; keep only the continuation.
    generated_ids = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    return tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
def load_model_and_tokenizer():
    """Download the base model and load it together with its tokenizer.

    Returns:
        A ``(model, tokenizer)`` tuple, both loaded from the same local
        snapshot directory.
    """
    print("🧼 开始加载模型...")
    # Download the model from ModelScope into a local directory.
    local_model_dir = snapshot_download(Config.MODEL_NAME)
    # Load tokenizer and weights from the SAME downloaded snapshot, so both
    # come from one consistent set of files and the model id is not resolved
    # a second time.
    tokenizer = AutoTokenizer.from_pretrained(
        local_model_dir,
        use_fast=False,
        trust_remote_code=True,
    )
    model = AutoModelForCausalLM.from_pretrained(
        local_model_dir,
        device_map="auto",
        dtype=torch.bfloat16,
    )
    # Must be called when gradient checkpointing is enabled.
    model.enable_input_require_grads()
    return model, tokenizer
def _write_jsonl(path, records):
    """Write each record of ``records`` to ``path`` as one UTF-8 JSON line."""
    with open(path, "w", encoding="utf-8") as f:
        for item in records:
            json.dump(item, f, ensure_ascii=False)
            f.write("\n")


def mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file):
    """Shuffle a dataset and write a 90/10 train/validation split as JSONL.

    Args:
        ds: Iterable of JSON-serializable records.
        dataset_train_json_file: Output path for the training split.
        dataset_test_json_file: Output path for the validation split.
    """
    data_list = list(ds)  # copy, so the caller's data is not shuffled in place
    random.shuffle(data_list)
    split_idx = int(len(data_list) * 0.9)
    train_data = data_list[:split_idx]
    val_data = data_list[split_idx:]

    _write_jsonl(dataset_train_json_file, train_data)
    _write_jsonl(dataset_test_json_file, val_data)

    # Report the validation count from val_data (previously the training
    # count was printed twice).
    print(f"✅ 原始数据清洗完成 | 训练样本量: {len(train_data)} | 测试样本量: {len(val_data)}")
def get_dataset_json():
    """Return the raw train/validation JSONL paths, generating them if needed.

    The split files are regenerated when either one is missing or when
    ``Config.DATASET_USE_CACHE`` is false; otherwise the existing files are
    reused and the dataset is not loaded at all.

    Returns:
        A ``(train_path, validation_path)`` tuple.
    """
    # Make sure the cache directory exists.
    os.makedirs(Dir.DATASET_DIR, exist_ok=True)

    dataset_train_json_file = os.path.join(Dir.DATASET_DIR, Default.TRAIN_DATASET_FILE)
    dataset_test_json_file = os.path.join(Dir.DATASET_DIR, Default.TEST_DATASET_FILE)
    print(f"🏷️ 原始训练数据文件:{dataset_train_json_file}")
    print(f"🏷️ 原始测试数据文件:{dataset_test_json_file}")

    cache_usable = (
        Config.DATASET_USE_CACHE
        and os.path.exists(dataset_train_json_file)
        and os.path.exists(dataset_test_json_file)
    )
    if not cache_usable:
        print("🏷️ 正在生成原始数据集...")
        # Load the dataset only when the split files actually have to be
        # rebuilt; loading it unconditionally wastes time when the cached
        # files are reused.
        ds = MsDataset.load(
            dataset_name=Config.DATASET_NAME,
            subset_name=Config.DATASET_SUBJECT,
            split=Config.DATASET_SPLIT,
            cache_dir=Dir.DATASET_DIR,  # always use the configured path
        )
        mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file)
    return dataset_train_json_file, dataset_test_json_file
def get_model_train_dataset_json_file():
    """Return the (train, validation) paths of the model-formatted JSONL files."""
    base_dir = Dir.MODEL_DATASET_DIR
    train_path = "/".join((base_dir, Default.TRAIN_JSONL_NEW_FILE))
    test_path = "/".join((base_dir, Default.TEST_JSONL_NEW_FILE))
    return train_path, test_path
def get_model_train_dataset_json():
    """Return the model-formatted train/validation JSONL paths, building them if needed.

    The files are rebuilt from the raw dataset when either one is missing or
    when ``Config.DATASET_USE_CACHE`` is false.

    Returns:
        A ``(train_path, validation_path)`` tuple.
    """
    train_path, test_path = get_model_train_dataset_json_file()
    print(f"🏷️ 模型训练数据文件:{train_path}")
    print(f"🏷️ 模型测试数据文件:{test_path}")

    # Fast path: both files already exist and caching is allowed.
    reuse_cached = (
        os.path.exists(train_path)
        and os.path.exists(test_path)
        and Config.DATASET_USE_CACHE
    )
    if reuse_cached:
        return train_path, test_path

    print("🏷️ 未找到模型对应数据集,准备从原始数据集进行生成数据集...")
    raw_train_path, raw_test_path = get_dataset_json()
    print("🏷️ 原始数据集生成成功...")

    # Convert the raw splits into the format used for fine-tuning.
    print("🧼 开始转换模型训练数据集...")
    dataset_jsonl_transfer(raw_train_path, train_path)
    print("🧼 开始转换模型验证数据集...")
    dataset_jsonl_transfer(raw_test_path, test_path)
    return train_path, test_path
def load_train_and_eval_data():
    """Load the formatted JSONL files and tokenize them with ``process_func``.

    Returns:
        A ``(train_dataset, eval_dataset)`` tuple of mapped datasets whose
        original columns have been removed.
    """
    # Resolve (and, if necessary, generate) the formatted dataset files.
    print("🧼 开始获取训练数据...")
    train_path, eval_path = get_model_train_dataset_json()

    def _load_and_tokenize(jsonl_path):
        # JSONL -> DataFrame -> Dataset -> tokenized Dataset.
        frame = pd.read_json(jsonl_path, lines=True)
        raw_ds = Dataset.from_pandas(frame)
        return raw_ds.map(process_func, remove_columns=raw_ds.column_names)

    # Training set first, then validation set.
    train_dataset = _load_and_tokenize(train_path)
    eval_dataset = _load_and_tokenize(eval_path)
    return train_dataset, eval_dataset
def set_swanlab():
    """Set the SwanLab project name and record the run configuration."""
    os.environ["SWANLAB_PROJECT"] = Config.SWANLAB_PROJECT
    # NOTE(review): "model" is recorded as an empty string — confirm whether
    # the model name was meant to be logged here.
    run_config = dict(
        model="",
        prompt=Config.PROMPT,
        data_max_length=Config.DATA_MAX_LENGTH,
    )
    swanlab.config.update(run_config)
def process_func(example):
    """Tokenize one formatted record into input ids, attention mask and labels.

    Uses the module-level ``tokenizer``. The prompt part (system + user turns
    and the assistant header) is masked out of the labels with -100; the
    response tokens and one trailing pad token are kept as targets. All three
    sequences are cut to ``Config.DATA_MAX_LENGTH``.

    Args:
        example: Mapping with "input" and "output" entries.

    Returns:
        A dict with "input_ids", "attention_mask" and "labels".
    """
    prompt_enc = tokenizer(
        f"<|im_start|>system\n{Config.PROMPT}<|im_end|>\n<|im_start|>user\n{example['input']}<|im_end|>\n<|im_start|>assistant\n",
        add_special_tokens=False,
    )
    answer_enc = tokenizer(f"{example['output']}", add_special_tokens=False)

    # A single pad token is appended after the response.
    tail = [tokenizer.pad_token_id]
    limit = Config.DATA_MAX_LENGTH

    # Slicing to the limit is a no-op for sequences that are already short enough.
    input_ids = (prompt_enc["input_ids"] + answer_enc["input_ids"] + tail)[:limit]
    attention_mask = (
        prompt_enc["attention_mask"] + answer_enc["attention_mask"] + [1]
    )[:limit]
    # -100 on the prompt tokens excludes them from the labels used as targets.
    labels = (
        [-100] * len(prompt_enc["input_ids"]) + answer_enc["input_ids"] + tail
    )[:limit]

    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}
def set_trainer():
    """Build the ``Trainer`` from the module-level model, tokenizer and datasets.

    Returns:
        A configured ``Trainer`` that writes to ``Dir.MODEL_DIR``.
    """
    print("🧼 正在生成训练对象...")
    print(f"🧼 {Dir.MODEL_DIR}")

    training_options = dict(
        output_dir=Dir.MODEL_DIR,
        # Batching.
        per_device_train_batch_size=1,
        per_device_eval_batch_size=1,
        gradient_accumulation_steps=4,
        # Evaluation, logging and checkpoint cadence (in steps).
        eval_strategy="steps",
        eval_steps=100,
        logging_steps=10,
        num_train_epochs=2,
        save_steps=400,
        # Optimization.
        learning_rate=1e-4,
        save_on_each_node=True,
        gradient_checkpointing=True,
        # Experiment tracking.
        report_to="swanlab",
        run_name=Config.MODEL_NAME,
        load_best_model_at_end=True,
    )
    args = TrainingArguments(**training_options)

    collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True)
    return Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=collator,
    )
if __name__ == "__main__":
    # NOTE: model, tokenizer, train_dataset and eval_dataset must stay
    # module-level names — process_func and set_trainer read them as globals.

    # Release cached GPU memory.
    torch.cuda.empty_cache()
    # Configure SwanLab.
    set_swanlab()
    # Load the model and tokenizer.
    model, tokenizer = load_model_and_tokenizer()
    # Load the training and validation datasets.
    train_dataset, eval_dataset = load_train_and_eval_data()
    # Build the trainer.
    trainer = set_trainer()
    print("🚬🚬🚬 开始模型训练...")
    trainer.train()
    # Persist the resulting model and its tokenizer. Without this only the
    # periodic step checkpoints exist, and the model held in memory at the
    # end of training is lost when the process exits.
    trainer.save_model(Dir.MODEL_DIR)
    tokenizer.save_pretrained(Dir.MODEL_DIR)
    print(" 🎇🎇🎇模型训练完成,...")