282 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			282 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
import pandas as pd
 | 
						|
import torch
 | 
						|
from datasets import Dataset
 | 
						|
from modelscope import snapshot_download, AutoTokenizer
 | 
						|
from transformers import AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq
 | 
						|
import os
 | 
						|
import swanlab
 | 
						|
from modelscope.msdatasets import MsDataset
 | 
						|
import json
 | 
						|
import random
 | 
						|
from config import Config,Default,Dir
 | 
						|
 | 
						|
class Config:
    """Experiment settings: base model, dataset source, prompt and column names."""

    # Base model identifier.
    MODEL_NAME = "Qwen/Qwen3-0.6B"

    # Dataset identifier, subject (sub-dataset) and split.
    DATASET_NAME = "krisfu/delicate_medical_r1_data"
    DATASET_SUBJECT = "default"
    DATASET_SPLIT = "train"
    # Whether previously generated dataset files may be reused.
    DATASET_USE_CACHE = True

    # Column names in the raw dataset: question, think (may be absent), answer.
    QUES_LABEL = "question"
    THINK_LABEL = "think"
    ANS_LABEL = "answer"

    # SwanLab project name.
    SWANLAB_PROJECT = "qweb3-sft-medical-10-11-1"

    # System prompt.
    PROMPT = "你是一个医学专家,你需要根据用户的问题,给出带有思考的回答。"

    # Token limit: used to truncate training samples and as max_new_tokens
    # during generation.
    DATA_MAX_LENGTH = 2048
 | 
						|
 | 
						|
 | 
						|
class Default:
    """Default root directories and file names used by the data pipeline."""

    # Root directories; each may be overridden through the environment
    # variable of the same name.
    DATASET_PATH = os.environ.get("DATASET_PATH", "./dataset")
    MODEL_DATASET_PATH = os.environ.get("MODEL_DATASET_PATH", "./model_dataset")

    # Where the fine-tuned model is stored.
    SAVE_DIR = "./saved_model"

    # File names of the raw train / validation splits.
    TRAIN_DATASET_FILE = "train.jsonl"
    TEST_DATASET_FILE = "val.jsonl"

    # File names of the converted (model-ready) train / validation splits.
    TRAIN_JSONL_NEW_FILE = "train_format.jsonl"
    TEST_JSONL_NEW_FILE = "val_format.jsonl"
 | 
						|
 | 
						|
 | 
						|
# Short names: the part of each identifier after the last "/".
dataset_short_name = Config.DATASET_NAME.rsplit("/", 1)[-1]
model_dataset_short_name = Config.MODEL_NAME.rsplit("/", 1)[-1]

# Raw dataset directory: <DATASET_PATH>/<dataset>/<subject>/<split>
dataset_dir = os.path.normpath(os.path.join(
    Default.DATASET_PATH,
    dataset_short_name,
    Config.DATASET_SUBJECT,
    Config.DATASET_SPLIT,
))

# Converted dataset directory: <MODEL_DATASET_PATH>/<model>/<subject>/<split>
model_dataset_DIR = os.path.normpath(os.path.join(
    Default.MODEL_DATASET_PATH,
    model_dataset_short_name,
    Config.DATASET_SUBJECT,
    Config.DATASET_SPLIT,
))

# Output directory: <SAVE_DIR>/<model>/<dataset>/<subject>/<split>
model_dir = os.path.normpath(os.path.join(
    Default.SAVE_DIR,
    model_dataset_short_name,
    dataset_short_name,
    Config.DATASET_SUBJECT,
    Config.DATASET_SPLIT,
))

# Make sure all three directories exist before anything reads or writes them.
os.makedirs(dataset_dir, exist_ok=True)
os.makedirs(model_dataset_DIR, exist_ok=True)
os.makedirs(model_dir, exist_ok=True)
 | 
						|
 | 
						|
 | 
						|
class Dir:
    """Namespace exposing the module-level directory paths."""

    # Directory holding the raw dataset split files.
    DATASET_DIR = dataset_dir
    # Directory holding the converted (model-ready) dataset files.
    MODEL_DATASET_DIR = model_dataset_DIR
    # Output directory passed to the trainer.
    MODEL_DIR = model_dir
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
def dataset_jsonl_transfer(origin_path, new_path):
    """
    Convert a raw JSONL dataset into the instruction/input/output JSONL
    format used for fine-tuning.

    Each non-blank line of ``origin_path`` is parsed as a JSON object. The
    question column becomes ``input``; the think column (if configured and
    present) followed by the answer column (if configured and present)
    becomes ``output``; ``Config.PROMPT`` becomes ``instruction``.

    Args:
        origin_path: Path of the raw JSONL file to read.
        new_path: Path of the converted JSONL file to write (overwritten).

    Raises:
        KeyError: If a record lacks the ``Config.QUES_LABEL`` column.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    messages = []
    # Read explicitly as UTF-8: the raw files in this pipeline are written as
    # UTF-8 with ensure_ascii=False, so the platform default encoding would
    # corrupt or reject non-ASCII text on some systems.
    with open(origin_path, "r", encoding="utf-8") as src:
        for line in src:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            data = json.loads(line)
            question = data[Config.QUES_LABEL]

            output = ""
            if hasattr(Config, "THINK_LABEL") and Config.THINK_LABEL in data:
                think = data[Config.THINK_LABEL]
                # A null think value is treated as empty so that appending
                # the answer below cannot fail on a non-string operand.
                output = "" if think is None else str(think)
            if hasattr(Config, "ANS_LABEL") and Config.ANS_LABEL in data:
                output += f"{data[Config.ANS_LABEL]}"

            messages.append({
                "instruction": Config.PROMPT,
                "input": f"{question}",
                "output": output,
            })

    # All records are read before writing, so origin_path and new_path may
    # safely refer to the same file.
    with open(new_path, "w", encoding="utf-8") as dst:
        for message in messages:
            dst.write(json.dumps(message, ensure_ascii=False) + "\n")
 | 
						|
 | 
						|
 | 
						|
def predict(messages, model, tokenizer):
    """
    Generate a single reply for a chat-style message list.

    Args:
        messages: Chat messages accepted by ``tokenizer.apply_chat_template``.
        model: Causal language model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        The decoded text of the newly generated tokens (prompt excluded,
        special tokens skipped).
    """
    prompt_text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True,
    )
    # Place the inputs on the model's own device rather than assuming "cuda",
    # so the function also works when the model was placed elsewhere.
    model_inputs = tokenizer([prompt_text], return_tensors="pt").to(model.device)
    # Pass the whole encoding so the attention mask is forwarded together
    # with the input ids.
    generated_ids = model.generate(
        **model_inputs,
        max_new_tokens=Config.DATA_MAX_LENGTH,
    )
    # Strip the prompt tokens from each returned sequence.
    completions = [
        output_ids[len(input_ids):]
        for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    return tokenizer.batch_decode(completions, skip_special_tokens=True)[0]
 | 
						|
 | 
						|
 | 
						|
def load_model_and_tokenizer():
    """
    Download the base model and load it together with its tokenizer.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    print("🧼 开始加载模型...")
    # Download the model snapshot from ModelScope to a local directory.
    # The local name differs from the module-level ``model_dir`` (the training
    # output directory) to avoid shadowing it.
    local_model_path = snapshot_download(Config.MODEL_NAME)
    # Load tokenizer and weights from the same downloaded snapshot so both
    # come from one source.
    tokenizer = AutoTokenizer.from_pretrained(
        local_model_path,
        use_fast=False,
        trust_remote_code=True,
    )
    model = AutoModelForCausalLM.from_pretrained(
        local_model_path,
        device_map="auto",
        dtype=torch.bfloat16,
    )
    # Must be called when gradient checkpointing is enabled.
    model.enable_input_require_grads()
    return model, tokenizer
 | 
						|
 | 
						|
 | 
						|
def _write_jsonl(path, records):
    """Write ``records`` to ``path`` as UTF-8, one JSON object per line."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            json.dump(record, f, ensure_ascii=False)
            f.write("\n")


def mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file):
    """
    Shuffle a dataset and write a 90/10 train/validation split as JSONL.

    Args:
        ds: Iterable of JSON-serializable records.
        dataset_train_json_file: Output path for the first 90% of the
            shuffled records.
        dataset_test_json_file: Output path for the remaining records.
    """
    # Copy into a list so the caller's object is not reordered by the shuffle.
    data_list = list(ds)
    random.shuffle(data_list)
    split_idx = int(len(data_list) * 0.9)
    train_data = data_list[:split_idx]
    val_data = data_list[split_idx:]

    _write_jsonl(dataset_train_json_file, train_data)
    _write_jsonl(dataset_test_json_file, val_data)

    # Report the validation count from val_data (it was previously printed
    # from train_data by mistake).
    print(f"✅ 原始数据清洗完成 | 训练样本量: {len(train_data)} | 测试样本量: {len(val_data)}")
 | 
						|
 | 
						|
 | 
						|
 | 
						|
def get_dataset_json():
    """
    Ensure the raw train/validation JSONL files exist and return their paths.

    The files are regenerated when either one is missing or when
    ``Config.DATASET_USE_CACHE`` is false; otherwise the existing files are
    reused as they are.

    Returns:
        A ``(train_file, test_file)`` tuple of paths.
    """
    # Make sure the cache directory exists.
    os.makedirs(Dir.DATASET_DIR, exist_ok=True)

    dataset_train_json_file = os.path.join(Dir.DATASET_DIR, Default.TRAIN_DATASET_FILE)
    dataset_test_json_file = os.path.join(Dir.DATASET_DIR, Default.TEST_DATASET_FILE)
    print(f"🏷️ 原始训练数据文件:{dataset_train_json_file}")
    print(f"🏷️ 原始测试数据文件:{dataset_test_json_file}")

    cache_usable = (
        Config.DATASET_USE_CACHE
        and os.path.exists(dataset_train_json_file)
        and os.path.exists(dataset_test_json_file)
    )
    if not cache_usable:
        print("🏷️ 正在生成原始数据集...")
        # Load the dataset only when it is actually needed, so a cache hit
        # does not trigger a dataset load at all.
        ds = MsDataset.load(
            dataset_name=Config.DATASET_NAME,
            subset_name=Config.DATASET_SUBJECT,
            split=Config.DATASET_SPLIT,
            cache_dir=Dir.DATASET_DIR,  # always use the configured path
        )
        mk_new_dataset_json(ds, dataset_train_json_file, dataset_test_json_file)
    return dataset_train_json_file, dataset_test_json_file
 | 
						|
 | 
						|
def get_model_train_dataset_json_file():
    """
    Return the paths of the converted train and validation JSONL files.

    Returns:
        A ``(train_file, test_file)`` tuple located under
        ``Dir.MODEL_DATASET_DIR``.
    """
    base_dir = Dir.MODEL_DATASET_DIR
    return (
        f"{base_dir}/{Default.TRAIN_JSONL_NEW_FILE}",
        f"{base_dir}/{Default.TEST_JSONL_NEW_FILE}",
    )
 | 
						|
def get_model_train_dataset_json():
    """
    Ensure the converted train/validation JSONL files exist and return
    their paths.

    When both files exist and ``Config.DATASET_USE_CACHE`` is true they are
    reused; otherwise they are rebuilt from the raw dataset files.

    Returns:
        A ``(train_file, test_file)`` tuple of paths.
    """
    train_path, test_path = get_model_train_dataset_json_file()
    print(f"🏷️ 模型训练数据文件:{train_path}")
    print(f"🏷️ 模型测试数据文件:{test_path}")

    reuse_existing = (
        os.path.exists(train_path)
        and os.path.exists(test_path)
        and Config.DATASET_USE_CACHE
    )
    if reuse_existing:
        return train_path, test_path

    print("🏷️ 未找到模型对应数据集,准备从原始数据集进行生成数据集...")
    raw_train_path, raw_test_path = get_dataset_json()
    print("🏷️ 原始数据集生成成功...")

    # Convert the raw splits into the instruction/input/output format.
    print("🧼 开始转换模型训练数据集...")
    dataset_jsonl_transfer(raw_train_path, train_path)
    print("🧼 开始转换模型验证数据集...")
    dataset_jsonl_transfer(raw_test_path, test_path)
    return train_path, test_path
 | 
						|
 | 
						|
 | 
						|
def load_train_and_eval_data():
    """
    Load the converted JSONL files and tokenize them with ``process_func``.

    Returns:
        A ``(train_dataset, eval_dataset)`` tuple of mapped datasets whose
        original columns have been removed.
    """
    print("🧼 开始获取训练数据...")
    train_path, eval_path = get_model_train_dataset_json()

    def _load_and_tokenize(jsonl_path):
        # JSONL -> pandas -> datasets.Dataset -> tokenized features.
        frame = pd.read_json(jsonl_path, lines=True)
        raw = Dataset.from_pandas(frame)
        return raw.map(process_func, remove_columns=raw.column_names)

    train_dataset = _load_and_tokenize(train_path)
    eval_dataset = _load_and_tokenize(eval_path)
    return train_dataset, eval_dataset
 | 
						|
 | 
						|
 | 
						|
def set_swanlab():
    """
    Configure SwanLab: set the project name through the environment and
    record the run's basic settings in the SwanLab config.
    """
    os.environ["SWANLAB_PROJECT"] = Config.SWANLAB_PROJECT
    swanlab.config.update({
        # Record the actual base model name instead of an empty string.
        "model": Config.MODEL_NAME,
        "prompt": Config.PROMPT,
        "data_max_length": Config.DATA_MAX_LENGTH,
    })
 | 
						|
 | 
						|
 | 
						|
def process_func(example):
    """
    Tokenize one sample into ``input_ids`` / ``attention_mask`` / ``labels``.

    The prompt (system + user turns and the assistant header) is masked out
    of the loss with -100 labels; the response tokens and one trailing
    ``pad_token_id`` are kept as labels. All three sequences are cut to
    ``Config.DATA_MAX_LENGTH`` entries.

    Relies on the module-level ``tokenizer``.
    """
    prompt_enc = tokenizer(
        f"<|im_start|>system\n{Config.PROMPT}<|im_end|>\n<|im_start|>user\n{example['input']}<|im_end|>\n<|im_start|>assistant\n",
        add_special_tokens=False,
    )
    answer_enc = tokenizer(f"{example['output']}", add_special_tokens=False)

    prompt_ids = prompt_enc["input_ids"]
    answer_ids = answer_enc["input_ids"]
    # The pad token id is appended as the closing token of every sample.
    closing = [tokenizer.pad_token_id]

    input_ids = prompt_ids + answer_ids + closing
    attention_mask = prompt_enc["attention_mask"] + answer_enc["attention_mask"] + [1]
    labels = [-100] * len(prompt_ids) + answer_ids + closing

    # Truncate; slicing is a no-op for sequences already within the limit.
    limit = Config.DATA_MAX_LENGTH
    return {
        "input_ids": input_ids[:limit],
        "attention_mask": attention_mask[:limit],
        "labels": labels[:limit],
    }
 | 
						|
 | 
						|
 | 
						|
def set_trainer():
    """
    Build the ``Trainer`` from the module-level ``model``, ``tokenizer``,
    ``train_dataset`` and ``eval_dataset``.

    Returns:
        The configured ``Trainer`` instance.
    """
    print("🧼 正在生成训练对象...")
    print(f"🧼 {Dir.MODEL_DIR}")

    training_args = TrainingArguments(
        output_dir=Dir.MODEL_DIR,
        run_name=Config.MODEL_NAME,
        report_to="swanlab",
        # Batch sizes and optimisation settings.
        per_device_train_batch_size=1,
        per_device_eval_batch_size=1,
        gradient_accumulation_steps=4,
        gradient_checkpointing=True,
        learning_rate=1e-4,
        num_train_epochs=2,
        # Evaluation, logging and checkpoint cadence.
        eval_strategy="steps",
        eval_steps=100,
        logging_steps=10,
        save_steps=400,
        save_on_each_node=True,
        load_best_model_at_end=True,
    )
    collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True)

    return Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=collator,
    )
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
    # Script entry point. The names assigned here stay module-level globals
    # because process_func and set_trainer read model, tokenizer,
    # train_dataset and eval_dataset from module scope.

    # Release cached GPU memory.
    torch.cuda.empty_cache()
    # Configure SwanLab.
    set_swanlab()
    # Load the model and tokenizer.
    model, tokenizer = load_model_and_tokenizer()
    # Load the training and evaluation datasets.
    train_dataset, eval_dataset = load_train_and_eval_data()
    # Build the trainer.
    trainer = set_trainer()
    print("🚬🚬🚬 开始模型训练...")
    trainer.train()
    print(" 🎇🎇🎇模型训练完成,...")