NLP Course documentation

大数据?🤗 Datasets 应对有方!

Hugging Face's logo
Join the Hugging Face community

and get access to the augmented documentation experience

to get started

大数据?🤗 Datasets 应对有方!

Ask a Question Open In Colab Open In Studio Lab

如今,处理 GB 级别的数据集已不再罕见,特别是如果你打算从头开始预训练像 BERT 或者 GPT-2 这样的 Transormer 模型。在这种情况下,甚至 加载(load) 数据集都可能成为挑战。例如,用于预训练 GPT-2 的 WebText 语料库包含超过 800 万个文档和 40 GB 的文本 —— 将其加载到笔记本电脑的 RAM 中都可能会让人抓狂!

幸运的是,🤗 Datasets 的设计旨在克服这些限制。它通过将数据集作为 内存映射(memory-mapped) 文件来处理,解放内存管理问题;并通过 流式处理(streaming) 来摆脱硬盘限制。

在本节中,我们将使用一个庞大的 825 GB 语料库——被称为 the Pile 的数据集,来探索🤗 Datasets 的这些功能。让我们开始吧!

什么是 the Pile?

The Pile 是由 EleutherAI 创建的一个用于训练大规模语言模型的英语文本语料库。它包含各种各样的数据集,涵盖科学文章,GitHub 代码库以及过滤后的 Web 文本。训练语料库以 14 GB 的文件块 提供,并且你也可以下载几个 单独的组件 。让我们先来看看 PubMed Abstracts 部分,它是 PubMed 上的 1500 万篇生物医学出版物的摘要的语料库。数据集采用 JSON Lines格式 并使用 zstandard 库进行压缩,所以我们首先需要先安装 zstandard 库:

!pip install zstandard

接下来,我们可以使用 第二节 中所学的加载远程数据集的方法加载数据集:

from datasets import load_dataset

# 这需要几分钟才能运行,所以在你等待的时候去喝杯茶或咖啡 :)
data_files = "https://the-eye.eu/public/AI/pile_preliminary_components/PUBMED_title_abstracts_2019_baseline.jsonl.zst"
pubmed_dataset = load_dataset("json", data_files=data_files, split="train")
pubmed_dataset
Dataset({
    features: ['meta', 'text'],
    num_rows: 15518009
})

我们可以看到我们的数据集中有 15,518,009 行和 2 列 —— 如此庞大!

✏️ 默认情况下,🤗 Datasets 会自动解压加载数据集所需的文件。如果你想保留硬盘空间,你可以把 DownloadConfig(delete_extracted=True) 传递给 load_dataset()download_config 参数。更多详细信息,请参阅 文档

让我们看看数据集的第一个元素的内容:

pubmed_dataset[0]
{'meta': {'pmid': 11409574, 'language': 'eng'},
 'text': 'Epidemiology of hypoxaemia in children with acute lower respiratory infection.\nTo determine the prevalence of hypoxaemia in children aged under 5 years suffering acute lower respiratory infections (ALRI), the risk factors for hypoxaemia in children under 5 years of age with ALRI, and the association of hypoxaemia with an increased risk of dying in children of the same age ...'}

可以看到,这看起来像是医学文章的摘要。现在,让我们看看加载数据集所使用的 RAM!

内存映射的魔力

测量 Python 内存使用的简单方式是使用 psutil 库,可以通过如下方式安装:

!pip install psutil

它提供了一个 Process 类,让我们可以检查当前进程的内存使用情况,如下所示:

import psutil

# Process.memory_info 是以字节为单位的,所以转换为兆字节
print(f"使用的RAM:{psutil.Process().memory_info().rss / (1024 * 1024):.2f} MB")
RAM used: 5678.33 MB

这里的 rss 属性是指 常驻集(resident set size) 的大小,它是进程在 RAM 中占用的内存的部分。这个测量结果也包括了 Python 解释器和我们加载的库所使用的内存,所以实际上用于加载数据集的内存会更小一些。作为比较,让我们使用 dataset_size 属性看看数据集在磁盘上上的大小。由于结果像之前一样以字节为单位,我们需要手动将其转换为 GB:

print(f"数据集中文件的数量 : {pubmed_dataset.dataset_size}")
size_gb = pubmed_dataset.dataset_size / (1024**3)
print(f"数据集大小 (缓存文件) : {size_gb:.2f} GB")
数据集中文件的数量 : 20979437051
数据集大小 (缓存文件) : 19.54 GB

令人欣喜的是——尽管它将近 20GB 之大,我们却能用远小于此的 RAM 加载和访问数据集!

✏️ 试试看! 从 Pile 选择一个比你的笔记本电脑或台式机的 RAM 更大的 子集 ,用 🤗 Datasets 加载这个数据集,并且测量 RAM 的使用量。请注意,为了获得准确的测量结果,你需要新开一个进程执行这个操作。你可以在 the Pile paper 的表 1 中找到每个子集解压后的大小。

如果你熟悉 Pandas,这个结果可能会让人感到很惊奇。因为根据 Wes Kinney 的著名的 经验法则 ,你通常需要 5 到 10 倍于你数据集大小的 RAM。那么 🤗 Datasets 是如何解决这个内存管理问题的呢?🤗 Datasets 将每一个数据集看作一个 内存映射文件 ,它提供了 RAM 和文件系统存储之间的映射,该映射允许 Datasets 库无需将其完全加载到内存中即可访问和操作数据集的元素。

内存映射文件也一个在多个进程之间共享,这使得像 Dataset.map() 之类的方法可以在无需移动或者复制数据集的情况下实现并行化。在底层,这些功能都是由 Apache Arrow 内存格式和 pyarrow 库实现的,这使得数据加载和处理速度快如闪电。(更多有关 Apache Arrow 的详细信息以及与 Pandas 的比较,请查看 Dejan Simic的博客文章 。) 为了更清晰地看到这个过程,让我们通过遍历 PubMed 摘要数据集中的所有元素,运行一个小速度测试:

import timeit

code_snippet = """batch_size = 1000

for idx in range(0, len(pubmed_dataset), batch_size):
    _ = pubmed_dataset[idx:idx + batch_size]
"""

time = timeit.timeit(stmt=code_snippet, number=1, globals=globals())
print(
    f"在 {time:.1f}s 内遍历了 {len(pubmed_dataset)}个示例(约 {size_gb:.1f} GB),即 {size_gb/time:.3f} GB/s"
)
'在64.2s内遍历了15518009个示例(约19.5 GB),即0.304 GB/s'

这里我们使用了 Python 的 timeit 模块来测量执行 code_snippet 所耗的时间。你通常能以十分之几 GB/s 到几 GB/s 的速度遍历一个数据集。通过上述的方法就已经能够解决大多数大数据集加载的限制,但是有时候你不得不使用一个很大的数据集,它甚至都不能存储在笔记本电脑的硬盘上。例如,如果我们尝试下载整个 Pile,我们需要 825GB 的可用磁盘空间!为了处理这种情况,🤗 Datasets 提供了一个流式功能,这个功能允许我们动态下载和访问元素,并且不需要下载整个数据集。让我们来看看这个功能是如何工作的。

💡在 Jupyter 笔记中你还可以使用 %%timeit 魔术函数 为整个单元格计时。

流式数据集

要使用数据集流,你只需要将 streaming=True 参数传递给 load_dataset() 函数。接下来,让我们以流模式加载 PubMed 摘要数据集:

pubmed_dataset_streamed = load_dataset(
    "json", data_files=data_files, split="train", streaming=True
)

不同于我们在这一章其它地方遇到的熟悉的 Datasetstreaming=True 返回的对象是一个 IterableDataset 。顾名思义,要访问 IterableDataset ,我们需要迭代它。我们可以按照如下方式访问流式数据集的第一个元素:

next(iter(pubmed_dataset_streamed))
{'meta': {'pmid': 11409574, 'language': 'eng'},
 'text': 'Epidemiology of hypoxaemia in children with acute lower respiratory infection.\nTo determine the prevalence of hypoxaemia in children aged under 5 years suffering acute lower respiratory infections (ALRI), the risk factors for hypoxaemia in children under 5 years of age with ALRI, and the association of hypoxaemia with an increased risk of dying in children of the same age ...'}

如果你需要在训练期间对流式数据集中的元素 tokenize,可以使用 IterableDataset.map() 进行在线处理,而不需要等待数据集全部加载完毕。该过程与我们在 第三章 中对数据集 tokenize 的过程完全相同,唯一的区别是输出是逐个返回的:

from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
tokenized_dataset = pubmed_dataset_streamed.map(lambda x: tokenizer(x["text"]))
next(iter(tokenized_dataset))
{'input_ids': [101, 4958, 5178, 4328, 6779, ...], 'attention_mask': [1, 1, 1, 1, 1, ...]}

💡 为了加速流式的 tokenize,你可以传递 batched=True ,就像我们在上一节看到的那样。它会批量处理示例;默认的批大小是 1000,可以通过 batch_size 参数指定批量大小。

你还可以使用 IterableDataset.shuffle() 打乱流式数据集,但与 Dataset.shuffle() 不同的是这只会打乱预定义 buffer_size 中的元素:

shuffled_dataset = pubmed_dataset_streamed.shuffle(buffer_size=10_000, seed=42)
next(iter(shuffled_dataset))
{'meta': {'pmid': 11410799, 'language': 'eng'},
 'text': 'Randomized study of dose or schedule modification of granulocyte colony-stimulating factor in platinum-based chemotherapy for elderly patients with lung cancer ...'}

在这个例子中,我们从缓冲区的前 10,000 个示例中随机选择了一个示例。一旦访问了一个示例,它在缓冲区中的位置就会被语料库中的下一个示例填充 (即,上述案例中的第 10,001 个示例)。你还可以使用 IterableDataset.take()IterableDataset.skip() 函数从流式数据集中选择元素,它的作用类似于 Dataset.select() 。例如,要选择 PubMed Abstracts 数据集的前 5 个示例,我们可以执行以下代码:

dataset_head = pubmed_dataset_streamed.take(5)
list(dataset_head)
[{'meta': {'pmid': 11409574, 'language': 'eng'},
  'text': 'Epidemiology of hypoxaemia in children with acute lower respiratory infection ...'},
 {'meta': {'pmid': 11409575, 'language': 'eng'},
  'text': 'Clinical signs of hypoxaemia in children with acute lower respiratory infection: indicators of oxygen therapy ...'},
 {'meta': {'pmid': 11409576, 'language': 'eng'},
  'text': "Hypoxaemia in children with severe pneumonia in Papua New Guinea ..."},
 {'meta': {'pmid': 11409577, 'language': 'eng'},
  'text': 'Oxygen concentrators and cylinders ...'},
 {'meta': {'pmid': 11409578, 'language': 'eng'},
  'text': 'Oxygen supply in rural africa: a personal experience ...'}]

同样,你可以使用 IterableDataset.skip() 函数从打乱的数据集中创建训练集和验证集,如下所示:

# 跳过前 1,000 个示例 ,将其余部分创建为训练集
train_dataset = shuffled_dataset.skip(1000)
# 将前 1,000 个示例用于验证集
validation_dataset = shuffled_dataset.take(1000)

让我们用一个常见的任务来进行我们对数据集流的最后探索:将多个数据集组合在一起创建一个新的语料库。🤗 Datasets 提供了一个 interleave_datasets() 函数,它将一个 IterableDataset 对象列表组合为单个的 IterableDataset ,其中新数据集的元素是交替抽取列表中的数据集获得的。当你试图组合大型数据集时,这个函数特别有用,让我们通过下面这个例子来试着组合 Pile 的 FreeLaw 数据集,这是一个包含美国法院法律意见的 51 GB 数据集:

law_dataset_streamed = load_dataset(
    "json",
    data_files="https://the-eye.eu/public/AI/pile_preliminary_components/FreeLaw_Opinions.jsonl.zst",
    split="train",
    streaming=True,
)
next(iter(law_dataset_streamed))
{'meta': {'case_ID': '110921.json',
  'case_jurisdiction': 'scotus.tar.gz',
  'date_created': '2010-04-28T17:12:49Z'},
 'text': '\n461 U.S. 238 (1983)\nOLIM ET AL.\nv.\nWAKINEKONA\nNo. 81-1581.\nSupreme Court of United States.\nArgued January 19, 1983.\nDecided April 26, 1983.\nCERTIORARI TO THE UNITED STATES COURT OF APPEALS FOR THE NINTH CIRCUIT\n*239 Michael A. Lilly, First Deputy Attorney General of Hawaii, argued the cause for petitioners. With him on the brief was James H. Dannenberg, Deputy Attorney General...'}

这个数据集足够大,可以对大多数笔记本电脑的 RAM 有足够的压力,但是我们已经能够毫不费力地加载和访问它!现在我们使用 interleave_datasets() 函数将 FreeLaw 和 PubMed Abstracts 数据集的样本整合在一起:

from itertools import islice
from datasets import interleave_datasets

combined_dataset = interleave_datasets([pubmed_dataset_streamed, law_dataset_streamed])
list(islice(combined_dataset, 2))
[{'meta': {'pmid': 11409574, 'language': 'eng'},
  'text': 'Epidemiology of hypoxaemia in children with acute lower respiratory infection ...'},
 {'meta': {'case_ID': '110921.json',
   'case_jurisdiction': 'scotus.tar.gz',
   'date_created': '2010-04-28T17:12:49Z'},
  'text': '\n461 U.S. 238 (1983)\nOLIM ET AL.\nv.\nWAKINEKONA\nNo. 81-1581.\nSupreme Court of United States.\nArgued January 19, 1983.\nDecided April 26, 1983.\nCERTIORARI TO THE UNITED STATES COURT OF APPEALS FOR THE NINTH CIRCUIT\n*239 Michael A. Lilly, First Deputy Attorney General of Hawaii, argued the cause for petitioners. With him on the brief was James H. Dannenberg, Deputy Attorney General...'}]

这里我们使用了来自 Python 的 itertools 模块的 islice() 函数从合并的数据集中选择前两个示例,并且我们可以看到它们实际上就是两个源数据集中的前两个示例拼在一起形成的:

最后,如果你想流式传输整个 825GB 的 Pile,你可以按照如下方式获取所有的预处理文件:

base_url = "https://the-eye.eu/public/AI/pile/"
data_files = {
    "train": [base_url + "train/" + f"{idx:02d}.jsonl.zst" for idx in range(30)],
    "validation": base_url + "val.jsonl.zst",
    "test": base_url + "test.jsonl.zst",
}
pile_dataset = load_dataset("json", data_files=data_files, streaming=True)
next(iter(pile_dataset["train"]))
{'meta': {'pile_set_name': 'Pile-CC'},
 'text': 'It is done, and submitted. You can play “Survival of the Tastiest” on Android, and on the web...'}

✏️ 试试看! 使用像 mc4 或者 oscar 这样的大型 Common Crawl 语料库来创建一个流式多语言数据集,该数据集代表你选择的国家/地区语言的口语比例。例如,瑞士的四种民族语言分别是德语、法语、意大利语和罗曼什语,因此你可以尝试根据根据口语比例对 Oscar 子集进行抽样来创建一个瑞士语料库。

你现在拥有加载和处理各种类型和大小的数据集的所需的所有工具 —— 但是除非你非常幸运,否则在你的 NLP 之旅中会有一个难题,你将不得不亲自创建一个数据集来解决手头的问题。这就是我们接下来要讨论的主题!

< > Update on GitHub