Metadata-Version: 2.3
Name: blossom-data
Version: 0.3.0
Summary: A simple way to synthesize LLM training data.
Author: Azure99
Requires-Python: >=3.9,<4.0
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: numpy (>=2.0,<3.0)
Requires-Dist: pillow (>=10.0,<11.0)
Requires-Dist: pyarrow (>=15.0.0)
Requires-Dist: pydantic (>=2.0,<3.0)
Requires-Dist: pyspark (>=3.0,<4.0)
Requires-Dist: pyyaml (>=6.0,<7.0)
Requires-Dist: ray[data] (>=2.0,<3.0)
Requires-Dist: requests (>=2.0,<3.0)
Project-URL: Repository, https://github.com/Azure99/BlossomData
Description-Content-Type: text/markdown

# BlossomData

BlossomData是一个专为大模型训练打造的数据处理框架，旨在提供灵活、高效的数据处理解决方案。它内置丰富的算子库，支持不同模态、不同训练阶段的数据处理与合成。能够从单机环境无缝迁移到分布式环境，允许用户灵活扩展自定义算子，从而大幅降低数据处理流程的开发和计算成本。

⚠注意：该项目仍处于原型阶段，API正在快速迭代，建议在实验环境中测试使用。

# 使用示例

使用pip安装。

```
pip3 install git+https://github.com/Azure99/BlossomData.git
```

在使用之前，请在`config.yaml`文件中配置模型服务提供商的API密钥和相关参数。（可参考`config.yaml.example`）

## 灵活处理数据

下面是一个非常实用的示例，仅依赖数学题目和参考答案，即可合成经过验证的长推理中文训练数据。

框架提供了大量内置算子，可以在[blossom.op](https://github.com/Azure99/BlossomData/blob/main/src/blossom/op/__init__.py)中查看。例如，当原始数据缺失答案时，可以使用[ChatDistiller](https://github.com/Azure99/BlossomData/blob/main/src/blossom/op/chat/chat_distiller.py)生成回答，然后通过[ChatMultiReasoningFilter](https://github.com/Azure99/BlossomData/blob/main/src/blossom/op/chat/chat_multi_reasoning_filter.py)基于投票方式过滤掉潜在错误样本。

```python
# 示例数学指令数据
data = [
    ChatSchema(
        messages=[
            user("Suppose that $wz = 12-8i$, and $|w| = \\sqrt{13}$. What is $|z|$?"),
            assistant("4"),
        ]
    ),
]

# 定义要使用的算子
ops = [
    # 利用非推理模型将指令数据翻译为中文
    ChatTranslator(model="deepseek-chat", target_language="Chinese"),
    # 使用推理模型生成问题的回答，并使用非推理模型验证回答正确性
    ChatVerifyDistiller(
        model="deepseek-reasoner",
        mode=ChatVerifyDistiller.Mode.LLM,
        validation_model="deepseek-chat"
    ),
    # 将reasoning_content合并到content中，以便用于训练
    ChatReasoningContentMerger()
]

dataset = create_dataset(data)
result = dataset.execute(ops).collect()
print(result)
```

## 分布式数据处理

框架支持多种执行引擎(Local/Spark/Ray)，以便适应各种规模的数据和任务负载。算子仅需关注最核心的处理逻辑，无需感知实际的执行引擎。

```python
# 基于已有数据创建Dataset，仅适合少量数据
# dataset = create_dataset(data, type=DatasetEngine.SPARK)
# 从本地文件创建Dataset
dataset = load_dataset("/path/to/data.json", type=DatasetEngine.SPARK)
(
    # 可以使用map、filter、transform等底层操作
    dataset.filter(lambda x: x.metadata["language"] == "en")
    # 随机打乱数据并取前10条
    .shuffle()
    .limit(10)
    .execute([ChatTranslator(model="gpt-4o-mini", target_language="Chinese")])
    # 将数据写入本地文件
    .write_json("/path/to/output")
)
```

## 自定义数据加载

对于已经存在的数据，我们可能有特定的格式要求，因此可以通过`DataHandler`来实现自定义的数据加载/保存逻辑。

`from_dict`将自定义数据字典转换为框架内部的Schema，而`to_dict`反向将框架内部的Schema转换为自定义数据字典。

```python
# 自定义加载/保存逻辑
class CustomDataHandler(DataHandler):
    def from_dict(self, data):
        return TextSchema(content=data["your_text_field"])

    def to_dict(self, schema):
        return {"your_text_field": schema.content}


ops = [TextContentFilter(contents="bad_word")]

dataset = load_dataset(path="your_data.json", data_handler=CustomDataHandler())
dataset.execute(ops).write_json(
    "filtered_data.json",
    data_handler=CustomDataHandler(),
)
```

## 自定义算子

定义自己的算子，更灵活地处理和生成训练数据。下面的示例中，首先翻译英文文档为中文，然后从中抽取问答对作为训练数据。

```python
# 自定义Map算子，进行一对一映射
@context_map_operator(parallel=4)
def self_qa_op(context, item):
    self_qa_prompt = (
        "基于给定的文本，随意生成一个问题以及对应的长答案。\n"
        "你的输出应该是一个json，包含question、answer两个字符串字段，不需要输出任何其他的无关解释。\n"
        f"给定的文本：{item.content}"
    )
    raw_result = context.chat_completion("gpt-4o-mini", [user(self_qa_prompt)])
    result = loads_markdown_first_json(raw_result)
    return ChatSchema(
        messages=[
            user(result["question"]),
            assistant(result["answer"]),
        ]
    )


# 纯文本英文数据
data = [
    TextSchema(
        content="""Tomato scrambled eggs is a common dish in Eastern cuisine. 
        Because its ingredients are easy to obtain and the cooking steps are relatively simple, 
        it is also loved by beginners in the kitchen."""
    ),
]

dataset = create_dataset(data)

result = dataset.execute([
    # 翻译英文文本
    TextTranslator(
        model="gpt-4o-mini",
        target_language="Chinese",
    ),
    # 基于翻译后的文本，生成问题和答案
    self_qa_op,
]).collect()
print(result)
```

你可能会得到这样的输出：

```
# User: 为什么厨房新手喜欢做番茄炒蛋？
# Assistant: 厨房新手喜欢做番茄炒蛋有几个主要原因。
# 首先，番茄和鸡蛋这两种食材非常容易获得，几乎在所有的超市和市场都可以买到。
# 其次，番茄炒蛋的烹饪步骤也比较简单，没有复杂的技巧要求，非常适合新手尝试。
# 步骤通常包括切番茄、打鸡蛋、热锅上油、炒熟等，整个过程比较直观。
# 再者，番茄炒蛋作为一道家常菜，口味鲜美，营养丰富，成品容易让人满意，能给新手带来成就感。
# 此外，这道菜还可以根据个人口味进行简易的调味调整，无需严格遵循复杂的配方。
# 这些因素使得番茄炒蛋成为新手下厨时的首选之一。
```
