Transformers documentation

如何创建自定义流水线?

You are viewing main version, which requires installation from source. If you'd like regular pip install, checkout the latest stable version (v4.47.1).
Hugging Face's logo
Join the Hugging Face community

and get access to the augmented documentation experience

to get started

如何创建自定义流水线?

在本指南中,我们将演示如何创建一个自定义流水线并分享到 Hub,或将其添加到 🤗 Transformers 库中。

首先,你需要决定流水线将能够接受的原始条目。它可以是字符串、原始字节、字典或任何看起来最可能是期望的输入。 尽量保持输入为纯 Python 语言,因为这样可以更容易地实现兼容性(甚至通过 JSON 在其他语言之间)。 这些将是流水线 (preprocess) 的 inputs

然后定义 outputs。与 inputs 相同的策略。越简单越好。这些将是 postprocess 方法的输出。

首先继承基类 Pipeline,其中包含实现 preprocess_forwardpostprocess_sanitize_parameters 所需的 4 个方法。

from transformers import Pipeline


class MyPipeline(Pipeline):
    def _sanitize_parameters(self, **kwargs):
        preprocess_kwargs = {}
        if "maybe_arg" in kwargs:
            preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, inputs, maybe_arg=2):
        model_input = Tensor(inputs["input_ids"])
        return {"model_input": model_input}

    def _forward(self, model_inputs):
        # model_inputs == {"model_input": model_input}
        outputs = self.model(**model_inputs)
        # Maybe {"logits": Tensor(...)}
        return outputs

    def postprocess(self, model_outputs):
        best_class = model_outputs["logits"].softmax(-1)
        return best_class

这种分解的结构旨在为 CPU/GPU 提供相对无缝的支持,同时支持在不同线程上对 CPU 进行预处理/后处理。

preprocess 将接受最初定义的输入,并将其转换为可供模型输入的内容。它可能包含更多信息,通常是一个 Dict

_forward 是实现细节,不应直接调用。forward 是首选的调用方法,因为它包含保障措施,以确保一切都在预期的设备上运作。 如果任何内容与实际模型相关,它应该属于 _forward 方法,其他内容应该在 preprocess/postprocess 中。

postprocess 方法将接受 _forward 的输出,并将其转换为之前确定的最终输出。

_sanitize_parameters 存在是为了允许用户在任何时候传递任何参数,无论是在初始化时 pipeline(...., maybe_arg=4) 还是在调用时 pipe = pipeline(...); output = pipe(...., maybe_arg=4)

_sanitize_parameters 的返回值是将直接传递给 preprocess_forwardpostprocess 的 3 个关键字参数字典。 如果调用方没有使用任何额外参数调用,则不要填写任何内容。这样可以保留函数定义中的默认参数,这总是更”自然”的。

在分类任务中,一个经典的例子是在后处理中使用 top_k 参数。

>>> pipe = pipeline("my-new-task")
>>> pipe("This is a test")
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}, {"label": "3-star", "score": 0.05}
{"label": "4-star", "score": 0.025}, {"label": "5-star", "score": 0.025}]

>>> pipe("This is a test", top_k=2)
[{"label": "1-star", "score": 0.8}, {"label": "2-star", "score": 0.1}]

为了实现这一点,我们将更新我们的 postprocess 方法,将默认参数设置为 5, 并编辑 _sanitize_parameters 方法,以允许这个新参数。

def postprocess(self, model_outputs, top_k=5):
    best_class = model_outputs["logits"].softmax(-1)
    # Add logic to handle top_k
    return best_class


def _sanitize_parameters(self, **kwargs):
    preprocess_kwargs = {}
    if "maybe_arg" in kwargs:
        preprocess_kwargs["maybe_arg"] = kwargs["maybe_arg"]

    postprocess_kwargs = {}
    if "top_k" in kwargs:
        postprocess_kwargs["top_k"] = kwargs["top_k"]
    return preprocess_kwargs, {}, postprocess_kwargs

尽量保持简单输入/输出,最好是可 JSON 序列化的,因为这样可以使流水线的使用非常简单,而不需要用户了解新的对象类型。 通常也相对常见地支持许多不同类型的参数以便使用(例如音频文件,可以是文件名、URL 或纯字节)。

将其添加到支持的任务列表中

要将你的 new-task 注册到支持的任务列表中,你需要将其添加到 PIPELINE_REGISTRY 中:

from transformers.pipelines import PIPELINE_REGISTRY

PIPELINE_REGISTRY.register_pipeline(
    "new-task",
    pipeline_class=MyPipeline,
    pt_model=AutoModelForSequenceClassification,
)

如果需要,你可以指定一个默认模型,此时它应该带有一个特定的修订版本(可以是分支名称或提交哈希,这里我们使用了 "abcdef"),以及类型:

PIPELINE_REGISTRY.register_pipeline(
    "new-task",
    pipeline_class=MyPipeline,
    pt_model=AutoModelForSequenceClassification,
    default={"pt": ("user/awesome_model", "abcdef")},
    type="text",  # current support type: text, audio, image, multimodal
)

在 Hub 上分享你的流水线

要在 Hub 上分享你的自定义流水线,你只需要将 Pipeline 子类的自定义代码保存在一个 Python 文件中。 例如,假设我们想使用一个自定义流水线进行句对分类,如下所示:

import numpy as np

from transformers import Pipeline


def softmax(outputs):
    maxes = np.max(outputs, axis=-1, keepdims=True)
    shifted_exp = np.exp(outputs - maxes)
    return shifted_exp / shifted_exp.sum(axis=-1, keepdims=True)


class PairClassificationPipeline(Pipeline):
    def _sanitize_parameters(self, **kwargs):
        preprocess_kwargs = {}
        if "second_text" in kwargs:
            preprocess_kwargs["second_text"] = kwargs["second_text"]
        return preprocess_kwargs, {}, {}

    def preprocess(self, text, second_text=None):
        return self.tokenizer(text, text_pair=second_text, return_tensors=self.framework)

    def _forward(self, model_inputs):
        return self.model(**model_inputs)

    def postprocess(self, model_outputs):
        logits = model_outputs.logits[0].numpy()
        probabilities = softmax(logits)

        best_class = np.argmax(probabilities)
        label = self.model.config.id2label[best_class]
        score = probabilities[best_class].item()
        logits = logits.tolist()
        return {"label": label, "score": score, "logits": logits}

这个实现与框架无关,适用于 PyTorch 和 TensorFlow 模型。如果我们将其保存在一个名为 pair_classification.py 的文件中,然后我们可以像这样导入并注册它:

from pair_classification import PairClassificationPipeline
from transformers.pipelines import PIPELINE_REGISTRY
from transformers import AutoModelForSequenceClassification, TFAutoModelForSequenceClassification

PIPELINE_REGISTRY.register_pipeline(
    "pair-classification",
    pipeline_class=PairClassificationPipeline,
    pt_model=AutoModelForSequenceClassification,
    tf_model=TFAutoModelForSequenceClassification,
)

完成这些步骤后,我们可以将其与预训练模型一起使用。例如,sgugger/finetuned-bert-mrpc 已经在 MRPC 数据集上进行了微调,用于将句子对分类为是释义或不是释义。

from transformers import pipeline

classifier = pipeline("pair-classification", model="sgugger/finetuned-bert-mrpc")

然后,我们可以通过在 Repository 中使用 save_pretrained 方法将其分享到 Hub 上:

from huggingface_hub import Repository

repo = Repository("test-dynamic-pipeline", clone_from="{your_username}/test-dynamic-pipeline")
classifier.save_pretrained("test-dynamic-pipeline")
repo.push_to_hub()

这将会复制包含你定义的 PairClassificationPipeline 的文件到文件夹 "test-dynamic-pipeline" 中, 同时保存流水线的模型和分词器,然后将所有内容推送到仓库 {your_username}/test-dynamic-pipeline 中。 之后,只要提供选项 trust_remote_code=True,任何人都可以使用它:

from transformers import pipeline

classifier = pipeline(model="{your_username}/test-dynamic-pipeline", trust_remote_code=True)

将流水线添加到 🤗 Transformers

如果你想将你的流水线贡献给 🤗 Transformers,你需要在 pipelines 子模块中添加一个新模块, 其中包含你的流水线的代码,然后将其添加到 pipelines/__init__.py 中定义的任务列表中。

然后,你需要添加测试。创建一个新文件 tests/test_pipelines_MY_PIPELINE.py,其中包含其他测试的示例。

run_pipeline_test 函数将非常通用,并在每种可能的架构上运行小型随机模型,如 model_mappingtf_model_mapping 所定义。

这对于测试未来的兼容性非常重要,这意味着如果有人为 XXXForQuestionAnswering 添加了一个新模型, 流水线测试将尝试在其上运行。由于模型是随机的,所以不可能检查实际值,这就是为什么有一个帮助函数 ANY,它只是尝试匹配流水线的输出类型。

你还 需要 实现 2(最好是 4)个测试。

  • test_small_model_pt:为这个流水线定义一个小型模型(结果是否合理并不重要),并测试流水线的输出。 结果应该与 test_small_model_tf 的结果相同。
  • test_small_model_tf:为这个流水线定义一个小型模型(结果是否合理并不重要),并测试流水线的输出。 结果应该与 test_small_model_pt 的结果相同。
  • test_large_model_pt(可选):在一个真实的流水线上测试流水线,结果应该是有意义的。 这些测试速度较慢,应该被如此标记。这里的目标是展示流水线,并确保在未来的发布中没有漂移。
  • test_large_model_tf(可选):在一个真实的流水线上测试流水线,结果应该是有意义的。 这些测试速度较慢,应该被如此标记。这里的目标是展示流水线,并确保在未来的发布中没有漂移。
< > Update on GitHub