专栏名称: 学姐带你玩AI
这里有人工智能前沿信息、算法技术交流、机器学习/深度学习经验分享、AI大赛解析、大厂大咖算法面试分享、人工智能论文技巧、AI环境工具库教程等……学姐带你玩转AI!
目录
相关文章推荐
安徽药品监管  ·  【安全用药】缺铁性贫血口服补铁剂的注意事项 ·  2 天前  
安徽药品监管  ·  【安全用药】缺铁性贫血口服补铁剂的注意事项 ·  2 天前  
TechWeb  ·  预售价超100万,尊界S800“尊”在哪? ·  2 天前  
凤凰网科技  ·  DeepSeek让腾讯阿里重新上桌了 ·  2 天前  
融媒吴江  ·  正大量上市,两男子吃进急诊! ·  3 天前  
51好读  ›  专栏  ›  学姐带你玩AI

Cross-Encoder实现文本匹配(重排序模型)

学姐带你玩AI  · 公众号  ·  · 2024-11-14 18:32

正文

来源:投稿  作者:175
编辑:学姐

引言

前面几篇文章都是基于表示型的方法训练BERT进行文本匹配,而本文是以交互型的方法。具体来说,将待匹配的两个句子拼接成一个输入喂给BERT模型,最后让其输出一个相似性得分。

本文完整代码 文末领取

文本匹配系列文章先更新到此,目前为止都是基于监督学习Sentence Pair的方式,后续有时间继续更新对比学习三元组(anchor, positive, negative)的方式和无监督学习的方式。

架构

Cross-Encoder会利用自注意力机制不断计算这两个句子之间的交互(注意力),最后接一个分类器输出一个分数(logits)代表相似度(可以经过sigmoid变成一个概率)。

实现

实现采用类似Huggingface的形式,每个文件夹下面有一种模型。分为 modeling arguments trainer 等不同的文件。不同的架构放置在不同的文件夹内。

modeling.py :

import torch
from torch import nn
import numpy as np

from tqdm import tqdm

from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForSequenceClassification,
)
from torch.utils.data import DataLoader
from transformers.modeling_outputs import SequenceClassifierOutput
from transformers.tokenization_utils_base import BatchEncoding


import logging

logger = logging.getLogger(__name__)


class SentenceBert(nn.Module):
    def __init__(
        self,
        model_name: str,
        max_length: int = None,
        trust_remote_code: bool = True,
    ) -> None:
        super().__init__()
        self.config = AutoConfig.from_pretrained(
            model_name, trust_remote_code=trust_remote_code
        )
        self.config.num_labels = 1

        # reranker
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name, config=self.config, trust_remote_code=trust_remote_code
        )
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_name, trust_remote_code=trust_remote_code
        )

        self.max_length = max_length

        self.loss_fct = nn.BCEWithLogitsLoss()

    def batching_collate(self, batch: list[tuple[str, str]]) -> BatchEncoding:
        texts = [[] for _ in range(len(batch[0]))]

        for example in batch:
            for idx, text in enumerate(example):
                texts[idx].append(text.strip())

        tokenized = self.tokenizer(
            *texts,
            padding=True,
            truncation="longest_first",
            return_tensors="pt",
            max_length=self.max_length
        ).to(self.model.device)

        return tokenized

    def predict(
        self,
        sentences: list[tuple[str, str]],
        batch_size: int = 64,
        convert_to_tensor: bool = True,
        show_progress_bar: bool = False,
    ):
        dataloader = DataLoader(
            sentences,
            batch_size=batch_size,
            collate_fn=self.batching_collate,
            shuffle=False,
        )

        preds = []

        for batch in tqdm(
            dataloader, disable=not show_progress_bar, desc="Running Inference"
        ):
            with torch.no_grad():
                logits = self.model(**batch).logits
                logits = torch.sigmoid(logits)

                preds.extend(logits)

        if convert_to_tensor:
            preds = torch.stack(preds)
        else:
            preds = np.asarray([pred.cpu().detach().float().numpy() for pred in preds])

        return preds

    def forward(self, inputs, labels=None):

        outputs = self.model(**inputs, return_dict=True)

        if labels is not None:
            labels = labels.float()

            logits = outputs.logits
            logits = logits.view(-1)

            loss = self.loss_fct(logits, labels)

            return SequenceClassifierOutput(loss, **outputs)

        return outputs

    def save_pretrained(self, output_dir: str) -> None:
        state_dict = self.model.state_dict()
        state_dict = type(state_dict)(
            {k: v.clone().cpu().contiguous() for k, v in state_dict.items()}
        )
        self.model.save_pretrained(output_dir, state_dict=state_dict)

整个模型的实现放到 modeling.py 文件中。

这里首先设置类别数为 1 num_labels = 1 ;然后通过 AutoModelForSequenceClassification 增加一个序列分类器头,该分类器头核心代码为:

class BertForSequenceClassification(BertPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config
  # 实例化BERT模型
        self.bert = BertModel(config)
  
  # 增加一个线性层,从hidden_size映射为num_labels维度,这里是1
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
       
  # 先得到bert模型的输出
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
  # 实际上是cls 标记对应的表示
        pooled_output = outputs[1]
  # 得到一个一维的logits
        logits = self.classifier(pooled_output)

BERT模型中所谓的 pooled_output 实际上是:

class BertPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        # 从hidden_size空间映射到另一个hidden_size空间
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        # 经过tanh激活函数
        self.activation = nn.Tanh()

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        # 取最后一层隐藏状态第一个token: [cls]
        first_token_tensor = hidden_states[:, 0]
        pooled_output = self.dense(first_token_tensor)
        pooled_output = self.activation(pooled_output)
        return pooled_output

回到我们的 modeling.py ,训练时利用 forward 方法;推理时利用 predict 方法,支持批处理。输入是表示语句对的元组。

arguments.py :

from dataclasses import dataclass, field
from typing import Optional

import os


@dataclass
class ModelArguments:
    model_name_or_path: str = field(
        metadata={
            "help""Path to pretrained model or model identifier from huggingface"
        }
    )
    config_name: Optional[str] = field(
        default=None,
        metadata={
            "help""Pretrained config name or path if not the same as model_name"
        },
    )
    tokenizer_name: Optional[str] = field(
        default=None,
        metadata={
            "help""Pretrained tokenizer name or path if not the same as model_name"
        },
    )


@dataclass
class DataArguments:
    train_data_path: str = field(
        default=None, metadata={"help""Path to train corpus"}
    )
    eval_data_path: str = field(default=None, metadata={"help""Path to eval corpus"})
    max_length: int = field(
        default=512,
        metadata={
            "help""The maximum total input sequence length after tokenization for input text."
        },
    )

    def __post_init__(self):
        if not os.path.exists(self.train_data_path):
            raise FileNotFoundError(
                f"cannot find file: {self.train_data_path}, please set a true path"
            )

        if not os.path.exists(self.eval_data_path):
            raise FileNotFoundError(
                f"cannot find file: {self.eval_data_path}, please set a true path"
            )

定义了模型和数据相关参数。

dataset.py :

from torch.utils.data import Dataset
from transformers import PreTrainedTokenizer, DataCollatorWithPadding

from datasets import Dataset as dt

from typing import Any

from utils import build_dataframe_from_csv


class PairDataset(Dataset):
    def __init__(
        self, data_path: str, tokenizer: PreTrainedTokenizer, max_len: int
    ) -> None:

        df = build_dataframe_from_csv(data_path)
        self.dataset = dt.from_pandas(df, split="train")

        self.total_len = len(self.dataset)
        self.tokenizer = tokenizer

        self.max_len = max_len

    def __len__(self):
        return self.total_len

    def __getitem__(self, index) -> dict[str, Any]:
        query1 = self.dataset[index]["query1"]
        query2 = self.dataset[index]["query2"]
        label = self.dataset[index]["label"]

        encoding = self.tokenizer.encode_plus(
            query1,
            query2,
            truncation="only_second",
            max_length=self.max_len,
            padding=False,
        )

        encoding["label"] = label

        return encoding

数据集类考虑了LCQMC数据集的格式,即成对的语句和一个数值标签。类似:

Hello. Hi. 1
Nice to see you. Nice 0

这里数据集的处理和之前的有所不同,主要是调用 encode_plus 将文本对拼接在一起,并且仅阶段第二个文本。

这里没有进行padding,交给 DataCollatorWithPadding 来做。

trainer.py :

import torch
from transformers.trainer import Trainer

from typing import Optional
import os
import logging

TRAINING_ARGS_NAME = "training_args.bin"


from modeling import SentenceBert

logger = logging.getLogger(__name__)


class CrossTrainer(Trainer):

    def compute_loss(self, model: SentenceBert, inputs, return_outputs=False):
        labels = inputs.pop("labels")

        return model(inputs, labels)["loss"]

    def _save(self, output_dir: Optional[str] = None, state_dict=None):
        # If we are executing this function, we are the process zero, so we don't check for that.
        output_dir = output_dir if output_dir is not None else self.args.output_dir
        os.makedirs(output_dir, exist_ok=True)
        logger.info(f"Saving model checkpoint to {output_dir}")

        self.model.save_pretrained(output_dir)

        if self.tokenizer is not None:
            self.tokenizer.save_pretrained(output_dir)

        # Good practice: save your training arguments together with the trained model
        torch.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME))

继承🤗 Transformers的 Trainer 类,重写了 compute_loss _save 方法。

这样我们就可以利用🤗 Transformers来训练我们的模型了。

utils.py :

import torch
import pandas as pd
from scipy.stats import pearsonr, spearmanr
from typing import Tuple


def build_dataframe_from_csv(dataset_csv: str) -> pd.DataFrame:
    df = pd.read_csv(
        dataset_csv,
        sep="\t",
        header=None,
        names=["query1""query2""label"],
    )

    return df


def compute_spearmanr(x, y):
    return spearmanr(x, y).correlation


def compute_pearsonr(x, y):
    return pearsonr(x, y)[0]


def find_best_acc_and_threshold(scores, labels, high_score_more_similar: bool):
    """Copied from https://github.com/UKPLab/sentence-transformers/tree/master"""
    assert len(scores) == len(labels)
    rows = list(zip(scores, labels))

    rows = sorted(rows, key=lambda x: x[0], reverse=high_score_more_similar)

    max_acc = 0
    best_threshold = -1
    # positive examples number so far
    positive_so_far = 0
    # remain negative examples
    remaining_negatives = sum(labels == 0)

    for i in range(len(rows) - 1):
        score, label = rows[i]
        if label == 1:
            positive_so_far += 1
        else:
            remaining_negatives -= 1

        acc = (positive_so_far + remaining_negatives) / len(labels)
        if acc > max_acc:
            max_acc = acc
            best_threshold = (rows[i][0] + rows[i + 1][0]) / 2

    return max_acc, best_threshold


def metrics(y: torch.Tensor, y_pred: torch.Tensor) -> Tuple[floatfloatfloatfloat]:
    TP = ((y_pred == 1) & (y == 1)).sum().float()  # True Positive
    TN = ((y_pred == 0) & (y == 0)).sum().float()  # True Negative
    FN = ((y_pred == 0) & (y == 1)).sum().float()  # False Negatvie
    FP = ((y_pred == 1) & (y == 0)).sum().float()  # False Positive
    p = TP / (TP + FP).clamp(min=1e-8)  # Precision
    r = TP / (TP + FN).clamp(min=1e-8)  # Recall
    F1 = 2 * r * p / (r + p).clamp(min=1e-8)  # F1 score
    acc = (TP + TN) / (TP + TN + FP + FN).clamp(min=1e-8)  # Accurary
    return acc, p, r, F1


def compute_metrics(predicts, labels):
    return metrics(labels, predicts)

定义了一些帮助函数,从sentence-transformers库中拷贝了寻找最佳准确率阈值的实现 find_best_acc_and_threshold

除了准确率,还计算了句嵌入的余弦相似度与真实标签之间的斯皮尔曼等级相关系数指标。

最后定义训练和测试脚本。

train.py :

from transformers import (
    set_seed,
    HfArgumentParser,
    TrainingArguments,
    DataCollatorWithPadding,
)


import logging
import os
from pathlib import Path

from datetime import datetime

from modeling import SentenceBert
from trainer import CrossTrainer
from arguments import DataArguments, ModelArguments
from dataset import PairDataset

logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,
)


def main():
    parser = HfArgumentParser((TrainingArguments, DataArguments, ModelArguments))
    training_args, data_args, model_args = parser.parse_args_into_dataclasses()

    output_dir = f"{training_args.output_dir}/{model_args.model_name_or_path.replace('/', '-')}-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}"
    training_args.output_dir = output_dir

    logger.info(f"Training parameters {training_args}")
    logger.info(f"Data parameters {data_args}")
    logger.info(f"Model parameters {model_args}")

    set_seed(training_args.seed)

    model = SentenceBert(
        model_args.model_name_or_path,
        max_length=data_args.max_length,
        trust_remote_code=True,
    )

    tokenizer = model.tokenizer

    train_dataset = PairDataset(
        data_args.train_data_path,
        tokenizer,
        data_args.max_length,
    )

    eval_dataset = PairDataset(
        data_args.eval_data_path,
        tokenizer,
        data_args.max_length,
    )

    trainer = CrossTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        data_collator=DataCollatorWithPadding(tokenizer),
        tokenizer=tokenizer,
    )
    Path(training_args.output_dir).mkdir(parents=True, exist_ok=True)

    trainer.train()
    trainer.save_model()


if __name__ == "__main__":
    main()

训练

基于 train.py 定义了 train.sh 传入相关参数:

timestamp=$(date +%Y%m%d%H%M)
logfile="train_${timestamp}.log"

# change CUDA_VISIBLE_DEVICES
CUDA_VISIBLE_DEVICES=1 nohup python train.py \
    --model_name_or_path=hfl/chinese-macbert-large \
    --output_dir=output \
    --train_data_path=data/train.txt \
    --eval_data_path=data/dev.txt \
    --num_train_epochs=3 \
    --save_total_limit=5 \
    --learning_rate=2e-5 \
    --weight_decay=0.01 \
    --warmup_ratio=0.01 \
    --bf16=True \
    --save_strategy=epoch \
    --per_device_train_batch_size=64 \
    --report_to="none" \
    --remove_unused_columns=False \
    --max_length=128 \
    > "$logfile" 2>&1 &

以上参数根据个人环境修改,这里使用的是哈工大的 chinese-macbert-large 预训练模型。

注意:

  • 通过bf16=True可以加速训练同时不影响效果,不支持可以尝试fp16;
  • 其他参数可以自己调整。
100%|██████████| 18655/18655 [1:15:47<00:00,  5.06it/s]
100%|██████████| 18655/18655 [1:15:47<00:00,  4.10it/s]
{'loss': 0.0464, 'grad_norm': 4.171152591705322, 'learning_rate': 1.6785791639592811e-07, 'epoch': 4.96}
{'train_runtime': 4547.2543, 'train_samples_per_second': 262.539, 'train_steps_per_second': 4.102, 'train_loss': 0.11396670312096753, 'epoch': 5.0}

这里训练了5轮,为了测试效果,但发现实际上3轮的结果还好一些,因此最终拿它来测试。







请到「今天看啥」查看全文