Metadata-Version: 2.3
Name: blossom-data
Version: 0.4.2
Summary: A simple way to synthesize LLM training data.
License: MIT
Keywords: llm,data synthesis,data processing,pyspark,ray,machine learning,nlp
Author: Azure99
Requires-Python: >=3.9,<4.0
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Requires-Dist: numpy (>=2.0,<3.0)
Requires-Dist: pillow (>=10.0,<11.0)
Requires-Dist: pyarrow (>=15.0.0)
Requires-Dist: pydantic (>=2.0,<3.0)
Requires-Dist: pyspark (>=3.0,<4.0)
Requires-Dist: pyyaml (>=6.0,<7.0)
Requires-Dist: ray[data] (>=2.0,<3.0)
Requires-Dist: requests (>=2.0,<3.0)
Project-URL: Repository, https://github.com/Azure99/BlossomData
Description-Content-Type: text/markdown

# BlossomData

![](https://img.shields.io/badge/language-Python-214870)
![GitHub License](https://img.shields.io/github/license/Azure99/BlossomData)
[![PyPI - Version](https://img.shields.io/pypi/v/blossom-data)](https://pypi.org/project/blossom-data/)
[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/Azure99/BlossomData)

BlossomData是一个专为大模型训练打造的数据处理框架，旨在提供灵活、高效的数据处理解决方案。它内置丰富的算子库，支持不同模态、不同训练阶段的数据处理与合成。能够从单机环境无缝迁移到分布式环境，允许用户灵活扩展自定义算子，从而大幅降低数据处理流程的开发和计算成本。

# 使用示例

```bash
pip install blossom-data
# 从源码安装
# pip install git+https://github.com/Azure99/BlossomData.git
```

在使用之前，请在`config.yaml`文件中配置模型服务提供商的API密钥和相关参数。（可参考`config.yaml.example`）

## 第一个数据合成任务

下面是一个非常实用的示例，仅依赖数学题目和参考答案，即可合成经过验证的长推理中文训练数据。

框架提供了大量内置算子，可以在[blossom.op](https://github.com/Azure99/BlossomData/blob/main/src/blossom/op/__init__.py)中查看。例如，当原始数据缺失答案时，可以使用[ChatDistiller](https://github.com/Azure99/BlossomData/blob/main/src/blossom/op/chat/chat_distiller.py)生成回答，然后通过[ChatMultiReasoningFilter](https://github.com/Azure99/BlossomData/blob/main/src/blossom/op/chat/chat_multi_reasoning_filter.py)基于投票方式过滤掉潜在错误样本。

```python
from blossom import *
from blossom.op import *
from blossom.schema import *

# 示例数学指令数据
data = [
    ChatSchema(
        messages=[
            user("Suppose that $wz = 12-8i$, and $|w| = \\sqrt{13}$. What is $|z|$?"),
            assistant("4"),
        ]
    ),
]

# 定义要使用的算子
ops = [
    # 利用非推理模型将指令数据翻译为中文
    ChatTranslator(model="deepseek-chat", target_language="Chinese"),
    # 使用推理模型生成问题的回答，并使用非推理模型验证回答正确性
    ChatVerifyDistiller(
        model="deepseek-reasoner",
        mode=ChatVerifyDistiller.Mode.LLM,
        validation_model="deepseek-chat",
    ),
    # 将reasoning_content合并到content中，以便用于训练
    ChatReasoningContentMerger(),
]

dataset = create_dataset(data)
result = dataset.execute(ops).collect()
print(result)
```

# 框架核心概念

## Schema

Schema是框架中的基础数据结构，用于表示和处理不同类型的数据。所有Schema类型都继承自基础Schema类，提供了统一的接口和功能。

```python
# 文本数据
text_data = TextSchema(content="这是一段文本内容")

# 对话数据
chat_data = ChatSchema(
    messages=[
        user("你好"),
        assistant("你好，请问需要什么帮助？")
    ]
)

# 结构化数据
row_data = RowSchema(data={"name": "张三", "age": 30, "score": 95})

# 自定义数据
custom_data = CustomSchema(data=1)
```

每个 Schema 实例都包含以下通用字段：
- `id`: 唯一标识符
- `type`: Schema类型标识
- `failed`: 处理失败标志
- `metadata`: dict类型的附加元数据

## DataFrame与Dataset

DataFrame是对数据的抽象表示，提供了对数据进行转换、过滤和聚合的接口。框架支持多种DataFrame实现，包括Local、Spark和Ray，使得同一套代码可以在不同的执行环境中运行。

Dataset是对DataFrame的高级封装，提供了更加便捷的接口和额外的功能，特别是对算子的支持。Dataset是用户交互的主要接口，隐藏了底层执行引擎的复杂性。

```python
# 创建数据集
dataset = create_dataset(data, engine=DatasetEngine.LOCAL)
# 或从文件加载，并指定执行引擎为Spark
dataset = load_dataset("/path/to/data.json", engine=DatasetEngine.SPARK)

# Dataset 提供丰富的数据处理接口
dataset = (
    # 对每个元素应用转换函数
    dataset.map(lambda x: TextSchema(content=x.content.upper()))
    # 过滤数据
    .filter(lambda x: len(x.content) > 10)
    # 批量处理数据
    .transform(lambda items: [item for item in items if "关键词" in item.content])
    # 排序
    .sort(lambda x: len(x.content))
    # 限制数量
    .limit(100)
    # 随机打乱
    .shuffle()
    # 重新分区
    .repartition(8)
    # 执行一系列算子
    .execute([
        TextContentFilter(min_length=100),
        TextTranslator(model="gpt-4o-mini", target_language="Chinese")
    ])
)

# 聚合操作
result = dataset.aggregate(
    Sum(lambda x: x["score"]),
    Mean(lambda x: x["score"]),
    Count()
)

# 分组操作
grouped = dataset.group_by(lambda x: x["category"])
group_stats = grouped.aggregate(Count(), Mean(lambda x: x["score"]))

# 收集结果
results = dataset.collect()
# 写入文件
dataset.write_json("/path/to/output.json")
```

## Operator

Operator（算子）是数据处理的核心单元，封装了特定的数据处理逻辑。框架提供了三种基本类型的算子：

1. **MapOperator**: 一对一映射，对每个元素单独处理
2. **FilterOperator**: 过滤操作，决定保留或删除元素
3. **TransformOperator**: 批量处理，可以对多个元素同时操作

```python
# 使用装饰器定义自定义算子
@map_operator()
def uppercase_text(item):
    if isinstance(item, TextSchema):
        return TextSchema(content=item.content.upper())
    return item

@filter_operator()
def filter_short_text(item):
    if isinstance(item, TextSchema):
        return len(item.content) > 10
    return True

@transform_operator()
def batch_process(items):
    # 批量处理逻辑
    return [item for item in items if "关键词" in item.content]

# 使用上下文调用模型的算子
@context_map_operator(parallel=4)
def translate_with_model(context, item):
    if isinstance(item, TextSchema):
        result = context.chat_completion(
            "gpt-4o-mini", 
            [user(f"Translate to Chinese: {item.content}")]
        )
        return TextSchema(content=result)
    return item
```

## Context

Context（上下文）提供了算子执行所需的环境和资源，包括配置信息和模型提供者的访问。它是算子与外部资源交互的桥梁，使得算子可以访问模型服务、配置参数等。

```python
# 创建上下文
context = Context()

# 访问配置
config = context.get_config()

# 获取模型提供者
provider = context.get_model("gpt-4o-mini")

# 使用模型生成内容
response = context.chat_completion(
    "gpt-4o-mini",
    [user("你好，请问今天天气如何？")]
)

# 生成嵌入向量
embedding = context.embedding("text-embedding-3-small", "这是一段文本")
```
