"""CoLLie 中预定义的数据结构
"""
import io
import json
import mmap
import os
import random
import threading
from functools import reduce
from typing import Dict, List, Optional, Sequence, Tuple
import numpy as np
import torch
from torch.utils.data import Dataset
from torch.utils.data.dataset import Dataset
from transformers import PreTrainedTokenizer
from collie.driver.io import FileIODriver
__all__ = [
"CollieDatasetForTraining",
"CollieDatasetForGeneration",
"CollieDatasetForClassification",
]
class _ShardContainer(list):
def __init__(self, path, shuffle: bool = False, seed: int = 1024) -> None:
list.__init__([])
self.file = None
self.file_name = None
self.shuffle = shuffle
self.seed = seed
self.threadlocal = threading.local()
meta_files = []
for file in FileIODriver.list(path):
if file.endswith(".meta"):
meta_files.append(os.path.join(path, file))
self.meta = [
[
{"file": file, "offset": meta[0], "length": meta[1]}
for meta in FileIODriver.load(file, mode="rb")
]
for file in meta_files
]
self.meta = np.hstack(self.meta).tolist()
self.indices = list(range(len(self.meta)))
if self.shuffle:
random.seed(self.seed)
random.shuffle(self.indices)
def _get_mmap(self, path):
if not hasattr(self.threadlocal, "handles"):
with open(path, "rb") as f:
mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
self.threadlocal.handles = [f, mm]
if (
path.endswith(".gz")
or path.endswith(".bz")
or path.endswith(".bz2")
):
raise NotImplementedError(
"Compressed files are not supported because .seek() would require "
"rereading the entire file, making performance too slow."
)
return self.threadlocal.handles[-1]
def __len__(self):
return len(self.meta)
def __getitem__(self, index):
meta = self.meta[self.indices[index]]
file_name: str = meta["file"]
if (
self.file is None
or self.file_name != file_name.replace(".meta", "")
or (isinstance(self.file, mmap.mmap) and self.file.closed)
):
self.file_name = file_name.replace(".meta", "")
if self.file is not None:
self.file.close()
self.file = self._get_mmap(self.file_name)
self.file.seek(meta["offset"])
return json.loads(self.file.readline().decode())
def _inspect_special_tokens_length(tokenizer):
ids_with_special_tokens = tokenizer("a", add_special_tokens=True).input_ids
ids_without_special_tokens = tokenizer("a", add_special_tokens=False).input_ids
bos_length = 0
for bos_length in range(len(ids_with_special_tokens)):
if ids_with_special_tokens[bos_length] == ids_without_special_tokens[0]:
break
eos_length = (
len(ids_with_special_tokens) - len(ids_without_special_tokens) - bos_length
)
return bos_length, eos_length
[文档]class CollieDatasetForTraining(Dataset):
"""**CoLLie** 中的基本数据格式,可用于预训练、微调任务。
需提供的数据格式形似:
.. code-block::
[
{
"text": "这是prompt部分的文本",
},
...
]
或者:
.. code-block::
[
{
"input": "这是prompt部分的文本",
"output": "这是output部分的文本"
},
...
]
或者:
.. code-block::
[
{
"tokens": [token_id_1, token_id_2, ...],
"labels": [-100, token_id_2, ...] # 可选,-100 表示计算 loss 时忽略该 token
},
...
]
当使用第二种数据格式时,只有 `output` 部分的 token 会参与 loss计算。
当使用第二种数据格式时,`labels` 字段是可选的,如果不提供 `labels` 默认计算所有 token 的 loss
"""
def __init__(
self,
dataset: Sequence[Dict],
tokenizer: Optional[PreTrainedTokenizer] = None,
add_special_tokens: bool = True,
shuffle: bool = False,
seed: int = 1024,
max_length: int = -1,
):
self.dataset = dataset
self.tokenizer = tokenizer
self.add_special_tokens = add_special_tokens
self.indices = list(range(len(self.dataset)))
self.max_length = max_length
if shuffle:
random.seed(seed)
random.shuffle(self.indices)
if self.tokenizer is not None:
self.bos_length, self.eos_length = _inspect_special_tokens_length(self.tokenizer)
def __len__(self):
return len(self.dataset)
def __getitem__(self, index) -> Dict:
if isinstance(index, slice):
return self._get_slice(index)
if index > len(self):
raise IndexError("Index out of range.")
index = self.indices[index]
if self.tokenizer is None:
input_ids = self.dataset[index]["tokens"]
labels = self.dataset[index].get("labels", input_ids.detach().clone())
if "attention_mask" in self.dataset[index].keys():
attention_mask = self.dataset[index]["attention_mask"]
else:
attention_mask = torch.ones_like(torch.tensor(input_ids)).cpu().tolist()
else:
if "text" in self.dataset[0].keys():
inputs = self.tokenizer(
self.dataset[index]["text"],
add_special_tokens=self.add_special_tokens,
)
input_ids = inputs["input_ids"]
labels = torch.tensor(input_ids).cpu().tolist()
attention_mask = inputs.get(
"attention_mask",
torch.ones_like(torch.tensor(input_ids)).cpu().tolist(),
)
elif (
"input" in self.dataset[0].keys() and "output" in self.dataset[0].keys()
):
inputs = self.tokenizer(
self.dataset[index]["input"] + self.dataset[index]["output"],
add_special_tokens=self.add_special_tokens,
)
input_ids = inputs["input_ids"]
attention_mask = inputs.get(
"attention_mask",
torch.ones_like(torch.tensor(input_ids)).cpu().tolist(),
)
labels = torch.tensor(input_ids)
target_length = len(
self.tokenizer(
self.dataset[index]["output"],
add_special_tokens=self.add_special_tokens,
).input_ids
)
if self.add_special_tokens:
target_length -= self.bos_length
labels[: -target_length] = -100
labels = labels.cpu().tolist()
else:
raise ValueError("Dataset must have one or two fields.")
if self.max_length > 0:
input_ids = input_ids[: self.max_length]
attention_mask = attention_mask[: self.max_length]
labels = labels[: self.max_length]
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels,
}
def _get_slice(self, s: slice):
result = []
for idx in self.indices[s]:
result.append(self[idx])
return result
@classmethod
def from_json(
cls,
path: str,
tokenizer: Optional[PreTrainedTokenizer] = None,
shuffle: bool = False,
seed: int = 1024,
):
dataset = cls(
dataset=json.loads(FileIODriver.load(path, mode="r")),
shuffle=shuffle,
seed=seed,
tokenizer=tokenizer,
)
return dataset
@classmethod
def from_processed(cls, path: str, shuffle: bool = False, seed: int = 1024):
dataset = cls(dataset=_ShardContainer(path), shuffle=shuffle, seed=seed)
return dataset
def save_propressed(self, path: str, shard_size: int = 4):
shard = io.BytesIO()
shard_idx = 0
meta = np.empty((0, 2), int)
for i in self.indices:
data = {"tokens": self[i]["input_ids"]}
data.update(
{key: value for key, value in self[i].items() if key != "input_ids"}
)
bytes_data = json.dumps(data).encode() + "\n".encode()
offset = shard.tell()
length = len(data["tokens"])
shard.write(bytes_data)
meta = np.append(meta, np.array([[offset, length]]), axis=0)
if shard.tell() > shard_size * 1024 * 1024 or i == len(self) - 1:
file = f"collie-dataset-shard-{shard_idx}.bin"
shard.seek(0)
FileIODriver.save(meta, os.path.join(path, file + ".meta"))
FileIODriver.save(shard.read().decode(), os.path.join(path, file))
shard.close()
shard = io.BytesIO()
meta = np.empty((0, 2), int)
shard_idx += 1
class CollieDatasetForPerplexity(CollieDatasetForTraining):
...
class CollieDatasetForGeneration(CollieDatasetForTraining):
"""**CoLLie** 中的生成数据集,主要用于数据生成或者与生成相关的检验
须搭配 :class:`~collie.controller.evaluator.EvaluatorForGeneration` 使用。需提供的数据格式形似:
.. code-block::
[
{
"text": "这是prompt部分的文本",
"target": "目标文本" # 需要计算 bleu, rouge 值时需要加上这个字段
},
...
]
"""
def __getitem__(self, index) -> Dict:
if index > len(self):
raise IndexError("Index out of range.")
target = None
index = self.indices[index]
if self.tokenizer is None:
input_ids = self.dataset[index]["tokens"]
if "attention_mask" in self.dataset[index].keys():
attention_mask = self.dataset[index]["attention_mask"]
else:
attention_mask = torch.ones_like(torch.tensor(input_ids).cpu().tolist())
target = self.dataset[index].get("target", None)
else:
inputs = self.tokenizer(
self.dataset[index]["text"], add_special_tokens=self.add_special_tokens
)
input_ids = inputs["input_ids"]
attention_mask = inputs.get(
"attention_mask",
torch.ones_like(torch.tensor(input_ids)).cpu().tolist(),
)
if "target" in self.dataset[index].keys():
if isinstance(self.dataset[index]["target"], str):
target = [self.tokenizer(self.dataset[index]["target"]).input_ids]
elif isinstance(self.dataset[index]["target"], (list, tuple, set)):
target = [
self.tokenizer(x).input_ids
for x in self.dataset[index]["target"]
]
if self.max_length > 0:
input_ids = input_ids[: self.max_length]
attention_mask = attention_mask[: self.max_length]
sample = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": input_ids,
}
if target is not None:
sample["target"] = target
return sample
[文档]class CollieDatasetForClassification(CollieDatasetForTraining):
"""**CoLLie** 中的分类任务数据集
须搭配 :class:`~collie.controller.evaluator.EvaluatorForClassfication` 使用。需提供的数据格式形似:
.. code-block::
[
{
"input": "这是prompt部分的文本",
"output": ["类别1", "类别2", "类别3"],
"target": 0
},
...
]
"""
def __init__(
self,
dataset: Sequence[Dict],
tokenizer: Optional[PreTrainedTokenizer] = None,
add_special_tokens: bool = True,
shuffle: bool = False,
seed: int = 1024,
max_length: int = -1,
style: str = "harness",
):
super().__init__(
dataset=dataset,
tokenizer=tokenizer,
add_special_tokens=add_special_tokens,
shuffle=shuffle,
seed=seed,
max_length=max_length,
)
assert style.lower() in (
"harness",
"helm",
), "Style can only be one of `harness` or `helm`"
self.style = style.lower()
def __getitem__(self, index) -> Dict:
if index > len(self):
raise IndexError("Index out of range.")
index = self.indices[index]
if self.tokenizer is None:
input_ids = tuple(self.dataset[index]["tokens"])
target = self.dataset[index]["target"]
else:
if self.style == "harness":
if (
"input" in self.dataset[0].keys()
and "output" in self.dataset[0].keys()
and "target" in self.dataset[0].keys()
):
input_ids = []
attention_mask = []
labels = []
for output in self.dataset[index]["output"]:
inputs = self.tokenizer(
self.dataset[index]["input"] + output,
add_special_tokens=self.add_special_tokens,
)
input_ids.append(inputs.get("input_ids"))
attention_mask.append(
inputs.get(
"attention_mask",
torch.ones_like(torch.tensor(inputs.get("input_ids"))),
)
)
label = torch.tensor(inputs.get("input_ids"))
target_length = len(
self.tokenizer(
output,
add_special_tokens=self.add_special_tokens,
).input_ids
)
if self.add_special_tokens:
target_length -= self.bos_length
label[: -target_length] = -100
label = label.cpu().tolist()
labels.append(label)
input_ids = tuple(input_ids)
labels = tuple(labels)
attention_mask = tuple(attention_mask)
target = self.dataset[index]["target"]
else:
raise ValueError(
"CollieDatasetForClassification must have three fields (`input`, `output` and `target`)."
)
if self.max_length > 1:
input_ids = [sample[: self.max_length] for sample in input_ids]
labels = [sample[: self.max_length] for sample in labels]
attention_mask = [
sample[: self.max_length] for sample in attention_mask
]
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels,
"target": target,
}
elif self.style == "helm":
if (
"input" in self.dataset[0].keys()
and "output" in self.dataset[0].keys()
and "target" in self.dataset[0].keys()
):
inputs = self.tokenizer(
self.dataset[index]["input"],
add_special_tokens=self.add_special_tokens,
)
input_ids = inputs["input_ids"]
attention_mask = inputs.get(
"attention_mask",
torch.ones_like(torch.tensor(input_ids)).cpu().tolist(),
)
output = tuple([option for option in self.dataset[index]["output"]])
target = self.dataset[index]["target"]
else:
raise ValueError(
"CollieDatasetForClassification must have three fields (`input`, `output` and `target`)."
)
if self.max_length > 0:
input_ids = input_ids[: self.max_length]
attention_mask = attention_mask[: self.max_length]
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": input_ids,
"target": target,
"output": output,
}
else:
raise ValueError("Style can only be one of `harness` or `helm`")