237 lines
9.1 KiB
Python
237 lines
9.1 KiB
Python
import pandas as pd
|
||
import torch
|
||
from datasets import Dataset
|
||
from modelscope import snapshot_download, AutoTokenizer
|
||
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq
|
||
import os
|
||
import swanlab
|
||
from modelscope.msdatasets import MsDataset
|
||
import json
|
||
import random
|
||
from config import Config,Default,Dir
|
||
|
||
|
||
|
||
|
||
def dataset_jsonl_transfer(origin_path, new_path, instruction=None):
    """Convert a raw JSONL dataset into the format used for fine-tuning.

    Every non-blank line of ``origin_path`` is parsed as a JSON object that
    must provide the keys ``question``, ``think`` and ``answer``. For each
    record one JSON object with the keys ``instruction``, ``input`` and
    ``output`` is written to ``new_path``.

    Args:
        origin_path: Path of the raw JSONL file to read.
        new_path: Path of the converted JSONL file (overwritten if present).
        instruction: System prompt stored in every converted record.
            Defaults to ``Config.PROMPT`` when omitted.

    Raises:
        KeyError: If a record lacks ``question``, ``think`` or ``answer``.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    if instruction is None:
        instruction = Config.PROMPT

    messages = []
    # Read explicitly as UTF-8: the converted file is written as UTF-8 with
    # ensure_ascii=False, so the platform default encoding must not decide
    # how the input is decoded.
    with open(origin_path, "r", encoding="utf-8") as source:
        for line in source:
            if not line.strip():
                # Tolerate blank lines (e.g. a trailing empty line).
                continue
            record = json.loads(line)
            messages.append({
                "instruction": instruction,
                "input": f"{record['question']}",
                "output": f"<think>{record['think']}</think> \n {record['answer']}",
            })

    # The output is opened only after the input has been fully consumed, so
    # passing the same path for both arguments keeps working.
    with open(new_path, "w", encoding="utf-8") as target:
        target.writelines(
            json.dumps(message, ensure_ascii=False) + "\n" for message in messages
        )
|
||
|
||
|
||
def predict(messages, model, tokenizer):
    """Generate the model's reply for a list of chat messages.

    Args:
        messages: Chat messages (dicts with ``role`` and ``content``) that are
            rendered with ``tokenizer.apply_chat_template``.
        model: Causal language model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer used to render, encode and decode the text.

    Returns:
        The decoded text of the newly generated tokens only (the prompt is
        stripped and special tokens are skipped).
    """
    prompt_text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    # Follow the model's own placement instead of assuming CUDA: the model is
    # loaded with device_map="auto", which is not guaranteed to mean "cuda".
    model_inputs = tokenizer([prompt_text], return_tensors="pt").to(model.device)

    # Forward the attention mask together with the ids so generate() does not
    # have to guess which positions are padding.
    output_batch = model.generate(
        model_inputs.input_ids,
        attention_mask=model_inputs.attention_mask,
        max_new_tokens=Config.DATA_MAX_LENGTH,
    )
    # Each output row starts with the prompt ids; slice them off so only the
    # continuation is decoded.
    new_tokens = [
        full_ids[len(prompt_ids):]
        for prompt_ids, full_ids in zip(model_inputs.input_ids, output_batch)
    ]
    return tokenizer.batch_decode(new_tokens, skip_special_tokens=True)[0]
|
||
|
||
|
||
def load_model_and_tokenizer():
    """Download the configured model from ModelScope and load it with its tokenizer.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    print("🧼 开始加载模型...")
    # Download the model named by Config.MODEL_NAME; the returned value is the
    # local path that is handed to from_pretrained below.
    model_dir = snapshot_download(Config.MODEL_NAME)
    # Load tokenizer and weights from the same local snapshot so they cannot
    # drift apart and no second hub lookup by model id is needed.
    tokenizer = AutoTokenizer.from_pretrained(
        model_dir,
        use_fast=False,
        trust_remote_code=True,
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_dir,
        device_map="auto",
        dtype=torch.bfloat16,
    )
    # Must be called when gradient checkpointing is enabled for training.
    model.enable_input_require_grads()
    return model, tokenizer
|
||
|
||
|
||
def _dump_jsonl(records, path):
    """Write ``records`` to ``path`` as UTF-8 JSON Lines, one object per line."""
    with open(path, "w", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record, ensure_ascii=False) + "\n")


def mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file):
    """Shuffle a dataset and split it 90/10 into train and test JSONL files.

    Args:
        ds: Iterable of JSON-serializable records.
        dataset_train_json_file: Output path for the first 90% of the shuffled
            records (overwritten if present).
        dataset_test_json_file: Output path for the remaining records
            (overwritten if present).
    """
    data_list = list(ds)
    # Shuffle in place with the module-level RNG; the split is therefore
    # different on every run unless the caller seeds ``random``.
    random.shuffle(data_list)

    split_idx = int(len(data_list) * 0.9)
    train_data = data_list[:split_idx]
    val_data = data_list[split_idx:]

    _dump_jsonl(train_data, dataset_train_json_file)
    _dump_jsonl(val_data, dataset_test_json_file)

    # Report the size of each split (the test count must come from val_data).
    print(f"✅ 原始数据清洗完成 | 训练样本量: {len(train_data)} | 测试样本量: {len(val_data)}")
|
||
|
||
|
||
|
||
def get_dataset_json():
    """Return the raw train/test JSONL paths, generating the files when needed.

    The files are regenerated when either one is missing or when
    ``Config.DATASET_USE_CACHE`` is falsy. The source dataset is only loaded
    through ``MsDataset.load`` in that case, so a cache hit costs no dataset
    load at all.

    Returns:
        A ``(train_path, test_path)`` tuple of the raw JSONL files.
    """
    # Make sure the cache directory exists before anything is written to it.
    os.makedirs(Dir.DATASET_DIR, exist_ok=True)

    dataset_train_json_file = Dir.DATASET_DIR + "/" + Default.TRAIN_DATASET_FILE
    dataset_test_json_file = Dir.DATASET_DIR + "/" + Default.TEST_DATASET_FILE
    print(f"🏷️ 原始训练数据文件:{dataset_train_json_file}")
    print(f"🏷️ 原始测试数据文件:{dataset_test_json_file}")

    cache_usable = (
        Config.DATASET_USE_CACHE
        and os.path.exists(dataset_train_json_file)
        and os.path.exists(dataset_test_json_file)
    )
    if cache_usable:
        return dataset_train_json_file, dataset_test_json_file

    print("🏷️ 正在生成原始数据集...")
    # Load the source dataset only now that it is actually required.
    ds = MsDataset.load(
        dataset_name=Config.DATASET_NAME,
        subset_name=Config.DATASET_SUBJECT,
        split=Config.DATASET_SPLIT,
        cache_dir=Dir.DATASET_DIR,  # always use the configured cache path
    )
    mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file)
    return dataset_train_json_file, dataset_test_json_file
|
||
|
||
def get_model_train_dataset_json_file():
    """Build the paths of the converted (model-ready) train and test JSONL files.

    Returns:
        A ``(train_path, test_path)`` tuple; both paths are
        ``Dir.MODEL_DATASET_DIR`` joined with ``"/"`` and the file names from
        ``Default``.
    """
    train_path = Dir.MODEL_DATASET_DIR + "/" + Default.TRAIN_JSONL_NEW_FILE
    test_path = Dir.MODEL_DATASET_DIR + "/" + Default.TEST_JSONL_NEW_FILE
    return (train_path, test_path)
|
||
def get_model_train_dataset_json():
    """Return the model-ready train/test JSONL paths, converting them if needed.

    The converted files are rebuilt from the raw dataset when either one is
    missing or when ``Config.DATASET_USE_CACHE`` is falsy.

    Returns:
        A ``(train_path, test_path)`` tuple of the converted JSONL files.
    """
    model_train_json_file, model_test_json_file = get_model_train_dataset_json_file()
    print(f"🏷️ 模型训练数据文件:{model_train_json_file}")
    print(f"🏷️ 模型测试数据文件:{model_test_json_file}")

    cache_usable = (
        Config.DATASET_USE_CACHE
        and os.path.exists(model_train_json_file)
        and os.path.exists(model_test_json_file)
    )
    if not cache_usable:
        print("🏷️ 未找到模型对应数据集,准备从原始数据集进行生成数据集...")
        dataset_train_json_file, dataset_test_json_file = get_dataset_json()
        print("🏷️ 原始数据集生成成功...")

        # Create the target directories up front; opening the output files
        # for writing fails if their parent directory does not exist yet.
        for target in (model_train_json_file, model_test_json_file):
            parent = os.path.dirname(target)
            if parent:
                os.makedirs(parent, exist_ok=True)

        print("🧼 开始转换模型训练数据集...")
        dataset_jsonl_transfer(dataset_train_json_file, model_train_json_file)
        print("🧼 开始转换模型验证数据集...")
        dataset_jsonl_transfer(dataset_test_json_file, model_test_json_file)
    return model_train_json_file, model_test_json_file
|
||
|
||
|
||
def load_train_and_eval_data():
    """Load the converted JSONL files and tokenize them for training.

    Both files are read with pandas, wrapped in a ``Dataset`` and mapped
    through ``process_func``; the original columns are dropped so only the
    features produced by ``process_func`` remain.

    Returns:
        A ``(train_dataset, eval_dataset)`` tuple.
    """
    print("🧼 开始获取训练数据...")
    train_path, eval_path = get_model_train_dataset_json()

    def _tokenized(jsonl_path):
        # JSONL -> DataFrame -> Dataset -> tokenized features
        frame = pd.read_json(jsonl_path, lines=True)
        raw = Dataset.from_pandas(frame)
        return raw.map(process_func, remove_columns=raw.column_names)

    # Training split first, then the evaluation split.
    train_dataset = _tokenized(train_path)
    eval_dataset = _tokenized(eval_path)
    return train_dataset, eval_dataset
|
||
|
||
|
||
def set_swanlab():
    """Configure SwanLab experiment tracking for this run.

    Sets the ``SWANLAB_PROJECT`` environment variable from
    ``Config.SWANLAB_PROJECT`` and records the run's key settings in
    ``swanlab.config``.
    """
    os.environ["SWANLAB_PROJECT"] = Config.SWANLAB_PROJECT
    swanlab.config.update({
        # Record the model actually being fine-tuned rather than an empty
        # placeholder, so runs can be told apart in the dashboard.
        "model": Config.MODEL_NAME,
        "prompt": Config.PROMPT,
        "data_max_length": Config.DATA_MAX_LENGTH,
    })
|
||
|
||
|
||
def process_func(example):
    """Tokenize one record into training features.

    The prompt (system text from ``Config.PROMPT`` plus the record's
    ``input``) and the record's ``output`` are tokenized separately with the
    module-level ``tokenizer`` and concatenated, followed by one
    ``tokenizer.pad_token_id``. Prompt positions get the label ``-100``; the
    response and the trailing pad id keep their token ids as labels.

    Args:
        example: Mapping with the keys ``input`` and ``output``.

    Returns:
        A dict with ``input_ids``, ``attention_mask`` and ``labels``, each cut
        to at most ``Config.DATA_MAX_LENGTH`` entries.
    """
    prompt_enc = tokenizer(
        f"<|im_start|>system\n{Config.PROMPT}<|im_end|>\n<|im_start|>user\n{example['input']}<|im_end|>\n<|im_start|>assistant\n",
        add_special_tokens=False,
    )
    answer_enc = tokenizer(f"{example['output']}", add_special_tokens=False)

    prompt_ids = prompt_enc["input_ids"]
    answer_ids = answer_enc["input_ids"]

    features = {
        "input_ids": prompt_ids + answer_ids + [tokenizer.pad_token_id],
        "attention_mask": prompt_enc["attention_mask"] + answer_enc["attention_mask"] + [1],
        # -100 marks the prompt positions; only the response (and the closing
        # pad id) carries real label ids.
        "labels": [-100] * len(prompt_ids) + answer_ids + [tokenizer.pad_token_id],
    }

    # Truncate over-long samples to the configured maximum length.
    if len(features["input_ids"]) > Config.DATA_MAX_LENGTH:
        features = {
            name: values[:Config.DATA_MAX_LENGTH]
            for name, values in features.items()
        }
    return features
|
||
|
||
|
||
def set_trainer(train_dataset):
    """Build the ``Trainer`` used for fine-tuning.

    Besides ``train_dataset`` this function reads the module-level names
    ``model``, ``tokenizer`` and ``eval_dataset``, which the ``__main__``
    section assigns before calling it.

    Args:
        train_dataset: Tokenized training dataset.

    Returns:
        A configured ``Trainer`` instance.
    """
    print("🧼 正在生成训练对象...")
    print(f"🧼 {Dir.MODEL_DIR}")

    hyperparams = {
        "output_dir": Dir.MODEL_DIR,
        "per_device_train_batch_size": 1,
        "per_device_eval_batch_size": 1,
        "gradient_accumulation_steps": 4,
        "eval_strategy": "steps",
        "eval_steps": 100,
        "logging_steps": 10,
        "num_train_epochs": 2,
        "save_steps": 400,
        "learning_rate": 1e-4,
        "save_on_each_node": True,
        "gradient_checkpointing": True,
        "report_to": "swanlab",
        "run_name": Config.MODEL_NAME,
    }
    training_args = TrainingArguments(**hyperparams)

    # Pads each batch to a common length at collation time.
    collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True)

    return Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=collator,
    )
|
||
|
||
|
||
if __name__ == "__main__":
    # Configure SwanLab tracking.
    set_swanlab()

    # Load the model and tokenizer. These module-level names are also read by
    # process_func and set_trainer.
    model, tokenizer = load_model_and_tokenizer()

    # Load the tokenized training and evaluation datasets.
    train_dataset, eval_dataset = load_train_and_eval_data()
    model_train_json_file, model_test_json_file = get_model_train_dataset_json_file()

    # Build the trainer and run the fine-tuning.
    trainer = set_trainer(train_dataset)
    print("🚬🚬🚬 开始模型训练...")
    trainer.train()
    print(" 🎇🎇🎇模型训练完成,...")

    # Spot-check the model on the first 3 records of the TEST file. Reading
    # the training file here would only show samples the model has been
    # trained on.
    test_df = pd.read_json(model_test_json_file, lines=True)[:3]
    test_text_list = []
    for _, row in test_df.iterrows():
        instruction = row['instruction']
        input_value = row['input']
        messages = [
            {"role": "system", "content": f"{instruction}"},
            {"role": "user", "content": f"{input_value}"},
        ]
        response = predict(messages, model, tokenizer)
        response_text = f"""
        Question: {input_value}
        LLM:{response}
        """
        test_text_list.append(swanlab.Text(response_text))
        print(response_text)

    # Upload the sample predictions and close the SwanLab run.
    swanlab.log({"Prediction": test_text_list})
    swanlab.finish()