llm_train/train.py

import pandas as pd
import torch
from datasets import Dataset
from modelscope import snapshot_download, AutoTokenizer
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq
import os
import swanlab
from modelscope.msdatasets import MsDataset
import json
import random
from config import Config, Default, Dir
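
# Config, Default and Dir come from the local config.py. A minimal sketch of the
# attributes this script relies on (attribute names taken from their usage below;
# the example values are assumptions, not the project's real settings):
#
#   class Config:
#       MODEL_NAME = "Qwen/Qwen3-0.6B"    # hypothetical model id
#       PROMPT = "..."                    # system prompt used for every sample
#       DATA_MAX_LENGTH = 2048
#       DATASET_NAME = "..."              # ModelScope dataset id
#       DATASET_SUBJECT = "..."           # subset name
#       DATASET_SPLIT = "train"
#       DATASET_USE_CACHE = True
#       SWANLAB_PROJECT = "..."           # SwanLab project name
#
#   class Default:
#       TRAIN_DATASET_FILE = "train.jsonl"
#       TEST_DATASET_FILE = "test.jsonl"
#       TRAIN_JSONL_NEW_FILE = "train_format.jsonl"
#       TEST_JSONL_NEW_FILE = "test_format.jsonl"
#
#   class Dir:
#       DATASET_DIR = "./dataset"
#       MODEL_DATASET_DIR = "./model_dataset"
#       MODEL_DIR = "./output"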


def dataset_jsonl_transfer(origin_path, new_path):
    """
    Convert the raw dataset into the format required for LLM fine-tuning.
    """
    messages = []
    # Read the original JSONL file
    with open(origin_path, "r", encoding="utf-8") as file:
        for line in file:
            # Parse the JSON data on each line
            data = json.loads(line)
            question = data["question"]
            output = f"<think>{data['think']}</think> \n {data['answer']}"
            message = {
                "instruction": Config.PROMPT,
                "input": f"{question}",
                "output": output,
            }
            messages.append(message)
    # Write the restructured JSONL file
    with open(new_path, "w", encoding="utf-8") as file:
        for message in messages:
            file.write(json.dumps(message, ensure_ascii=False) + "\n")
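
# Illustration of the transformation performed by dataset_jsonl_transfer above
# (field names come from the code; the values are made up):
#
#   raw line:  {"question": "1+1=?", "think": "Adding one and one gives two.", "answer": "2"}
#   becomes:   {"instruction": Config.PROMPT,
#               "input": "1+1=?",
#               "output": "<think>Adding one and one gives two.</think> \n 2"}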


def predict(messages, model, tokenizer):
    device = "cuda"
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(device)
    generated_ids = model.generate(
        model_inputs.input_ids,
        max_new_tokens=Config.DATA_MAX_LENGTH,
    )
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return response
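
# Minimal usage sketch for predict() (the message layout mirrors the __main__ block
# at the bottom of this file; the question is illustrative):
#
#   messages = [
#       {"role": "system", "content": Config.PROMPT},
#       {"role": "user", "content": "Why is the sky blue?"},
#   ]
#   answer = predict(messages, model, tokenizer)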


def load_model_and_tokenizer():
    print("🧼 Loading model...")
    # Download the Qwen model from ModelScope into a local directory
    model_dir = snapshot_download(Config.MODEL_NAME)
    # Load the tokenizer and model weights with Transformers from the local directory
    tokenizer = AutoTokenizer.from_pretrained(model_dir, use_fast=False,
                                              trust_remote_code=True)
    model = AutoModelForCausalLM.from_pretrained(model_dir, device_map="auto",
                                                 torch_dtype=torch.bfloat16)
    model.enable_input_require_grads()  # Required when gradient checkpointing is enabled
    return model, tokenizer
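
# Note: this script fine-tunes the full model (no LoRA/PEFT adapters are attached);
# enable_input_require_grads() is called because gradient_checkpointing=True is set
# in the TrainingArguments below.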


def mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file):
    data_list = list(ds)
    random.shuffle(data_list)
    # 90/10 train/validation split
    split_idx = int(len(data_list) * 0.9)
    train_data = data_list[:split_idx]
    val_data = data_list[split_idx:]
    with open(dataset_train_json_file, 'w', encoding='utf-8') as f:
        for item in train_data:
            json.dump(item, f, ensure_ascii=False)
            f.write('\n')
    with open(dataset_test_json_file, 'w', encoding='utf-8') as f:
        for item in val_data:
            json.dump(item, f, ensure_ascii=False)
            f.write('\n')
    print(f"✅ Raw data prepared | training samples: {len(train_data)} | test samples: {len(val_data)}")


def get_dataset_json():
    # Make sure the cache directory exists
    os.makedirs(Dir.DATASET_DIR, exist_ok=True)
    ds = MsDataset.load(
        dataset_name=Config.DATASET_NAME,
        subset_name=Config.DATASET_SUBJECT,
        split=Config.DATASET_SPLIT,
        cache_dir=Dir.DATASET_DIR  # Use the configured path consistently
    )
    dataset_train_json_file = Dir.DATASET_DIR + "/" + Default.TRAIN_DATASET_FILE
    dataset_test_json_file = Dir.DATASET_DIR + "/" + Default.TEST_DATASET_FILE
    print(f"🏷️ Raw training data file: {dataset_train_json_file}")
    print(f"🏷️ Raw test data file: {dataset_test_json_file}")
    if (not os.path.exists(dataset_train_json_file)
            or not os.path.exists(dataset_test_json_file)
            or not Config.DATASET_USE_CACHE):
        print("🏷️ Generating the raw dataset files...")
        mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file)
    return dataset_train_json_file, dataset_test_json_file


def get_model_train_dataset_json_file():
    model_train_json_file = Dir.MODEL_DATASET_DIR + "/" + Default.TRAIN_JSONL_NEW_FILE
    model_test_json_file = Dir.MODEL_DATASET_DIR + "/" + Default.TEST_JSONL_NEW_FILE
    return model_train_json_file, model_test_json_file


def get_model_train_dataset_json():
    model_train_json_file, model_test_json_file = get_model_train_dataset_json_file()
    print(f"🏷️ Model training data file: {model_train_json_file}")
    print(f"🏷️ Model test data file: {model_test_json_file}")
    if not os.path.exists(model_train_json_file) or not os.path.exists(
            model_test_json_file) or not Config.DATASET_USE_CACHE:
        print("🏷️ Model-format dataset not found, generating it from the raw dataset...")
        dataset_train_json_file, dataset_test_json_file = get_dataset_json()
        print("🏷️ Raw dataset generated successfully...")
        print("🧼 Converting the model training dataset...")
        dataset_jsonl_transfer(dataset_train_json_file, model_train_json_file)
        print("🧼 Converting the model validation dataset...")
        dataset_jsonl_transfer(dataset_test_json_file, model_test_json_file)
    return model_train_json_file, model_test_json_file


def load_train_and_eval_data():
    # Get the model-format dataset JSON files
    print("🧼 Loading training data...")
    model_train_json_file, model_test_json_file = get_model_train_dataset_json()
    # Build the training set
    train_df = pd.read_json(model_train_json_file, lines=True)
    train_ds = Dataset.from_pandas(train_df)
    train_dataset = train_ds.map(process_func, remove_columns=train_ds.column_names)
    # Build the validation set
    eval_df = pd.read_json(model_test_json_file, lines=True)
    eval_ds = Dataset.from_pandas(eval_df)
    eval_dataset = eval_ds.map(process_func, remove_columns=eval_ds.column_names)
    return train_dataset, eval_dataset


def set_swanlab():
    os.environ["SWANLAB_PROJECT"] = Config.SWANLAB_PROJECT
    swanlab.config.update({
        "model": Config.MODEL_NAME,
        "prompt": Config.PROMPT,
        "data_max_length": Config.DATA_MAX_LENGTH,
    })


def process_func(example):
    """
    Preprocess a single example into input_ids / attention_mask / labels.
    """
    # Note: uses the module-level `tokenizer` created in the __main__ block below.
    input_ids, attention_mask, labels = [], [], []
    instruction = tokenizer(
        f"<|im_start|>system\n{Config.PROMPT}<|im_end|>\n<|im_start|>user\n{example['input']}<|im_end|>\n<|im_start|>assistant\n",
        add_special_tokens=False,
    )
    response = tokenizer(f"{example['output']}", add_special_tokens=False)
    input_ids = instruction["input_ids"] + response["input_ids"] + [tokenizer.pad_token_id]
    attention_mask = (
        instruction["attention_mask"] + response["attention_mask"] + [1]
    )
    labels = [-100] * len(instruction["input_ids"]) + response["input_ids"] + [tokenizer.pad_token_id]
    if len(input_ids) > Config.DATA_MAX_LENGTH:  # Truncate to the maximum length
        input_ids = input_ids[:Config.DATA_MAX_LENGTH]
        attention_mask = attention_mask[:Config.DATA_MAX_LENGTH]
        labels = labels[:Config.DATA_MAX_LENGTH]
    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}


def set_trainer(train_dataset):
    print("🧼 Building the trainer...")
    print(f"🧼 {Dir.MODEL_DIR}")
    args = TrainingArguments(
        output_dir=Dir.MODEL_DIR,
        per_device_train_batch_size=1,
        per_device_eval_batch_size=1,
        gradient_accumulation_steps=4,
        eval_strategy="steps",
        eval_steps=100,
        logging_steps=10,
        num_train_epochs=2,
        save_steps=400,
        learning_rate=1e-4,
        save_on_each_node=True,
        gradient_checkpointing=True,
        report_to="swanlab",
        run_name=Config.MODEL_NAME,
    )
    # Note: `model`, `eval_dataset` and `tokenizer` are module-level globals set in __main__.
    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True),
    )
    return trainer
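
# Note on the training setup above: with per_device_train_batch_size=1 and
# gradient_accumulation_steps=4 the effective batch size is 4 per device; evaluation
# runs every 100 optimizer steps, checkpoints are written to Dir.MODEL_DIR every
# 400 steps, and metrics are reported to SwanLab.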


if __name__ == "__main__":
    # Configure SwanLab
    set_swanlab()
    # Load the model and tokenizer
    model, tokenizer = load_model_and_tokenizer()
    # Load the training and validation sets
    train_dataset, eval_dataset = load_train_and_eval_data()
    model_train_json_file, model_test_json_file = get_model_train_dataset_json_file()
    # Build the trainer
    trainer = set_trainer(train_dataset)
    print("🚬🚬🚬 Starting model training...")
    trainer.train()
    print("🎇🎇🎇 Model training finished...")
    # Eyeball the model on the first 3 samples of the test set
    test_df = pd.read_json(model_test_json_file, lines=True)[:3]
    test_text_list = []
    for index, row in test_df.iterrows():
        instruction = row['instruction']
        input_value = row['input']
        messages = [
            {"role": "system", "content": f"{instruction}"},
            {"role": "user", "content": f"{input_value}"}
        ]
        response = predict(messages, model, tokenizer)
        response_text = f"""
        Question: {input_value}
        LLM: {response}
        """
        test_text_list.append(swanlab.Text(response_text))
        print(response_text)
    swanlab.log({"Prediction": test_text_list})
    swanlab.finish()