Metadata-Version: 2.1
Name: maas-task-backoff-framework
Version: 1.0.13
Summary: 基于Redis的任务退避重试、调度框架，支持多种退避策略
Home-page: https://github.com/xiaofucoding/maas-task-backoff-framework.git/
Author: xinzf
Author-email: 515361725@qq.com
License: MIT
Project-URL: Source, https://github.com/yourusername/task-backoff-scheduler
Project-URL: Documentation, https://github.com/yourusername/task-backoff-scheduler#readme
Project-URL: Bug Reports, https://github.com/yourusername/task-backoff-scheduler/issues
Keywords: task retry backoff redis queue
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.7
Description-Content-Type: text/markdown
Requires-Dist: redis (>=4.0.0)
Requires-Dist: PyYAML (>=6.0)
Requires-Dist: dataclasses (>=0.6) ; python_version < "3.7"
Provides-Extra: dev
Requires-Dist: pytest (>=6.0.0) ; extra == 'dev'
Requires-Dist: pytest-cov (>=2.0.0) ; extra == 'dev'
Requires-Dist: black (>=21.0.0) ; extra == 'dev'
Requires-Dist: flake8 (>=3.8.0) ; extra == 'dev'
Requires-Dist: mypy (>=0.800) ; extra == 'dev'

# 任务退避重试、调度框架

[![Python](https://img.shields.io/badge/Python-3-blue.svg)](https://www.python.org/)
[![License](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE)

一个基于Redis的分布式任务退避重试、调度框架，支持多种退避策略和并发执行。

## ✨ 核心特性

- 🔄 **多种退避策略**: 固定间隔、指数退避、线性退避
- 🚀 **高并发执行**: 支持线程池/进程池并发处理
- 📊 **任务优先级**: 支持任务优先级管理
- ⏱️ **超时控制**: 任务执行超时和失败处理
- 📈 **状态监控**: 完整的任务状态管理和监控
- 🎯 **灵活调度**: 可配置的定时调度间隔
- 🔒 **分布式锁**: 基于Redis的分布式任务队列
- 💾 **存储方式**: 目前只支持 Redis，后续会增加MySQL、达梦数据库等支持

## 📚设计思路


### 业务架构设计

![](http://fire-blog.oss-cn-beijing.aliyuncs.com//images/20250909113017.png)

### 退避工具内部组件设计


![](http://fire-blog.oss-cn-beijing.aliyuncs.com//images/20250909113043.png)

### 设计类图

![](http://fire-blog.oss-cn-beijing.aliyuncs.com//images/20250909113102.png)

### 调用时序图

用户提交task

![](http://fire-blog.oss-cn-beijing.aliyuncs.com//images/20250909113117.png)

调度执行task

![](http://fire-blog.oss-cn-beijing.aliyuncs.com//images/20250909113543.png)

## 📦 安装

```bash
pip install maas-task-backoff-framework
```

## 🚀 快速开始

### 使用示例

```python
from backoff.scheduler.backoff_scheduler import (
    TaskBackoffScheduler,
    TaskBackoffConfig,
    TaskEntity,
    TaskConfig,
    StorageConfig,
    ThreadPoolConfig,
    SchedulerConfig,
    ResultEntity,
)

# 1. 创建配置
config = TaskBackoffConfig()

# Redis存储配置
config.storage = StorageConfig(
    type="redis", 
    host="localhost", 
    port=6379, 
    database=0, 
    password=""
)

# 任务配置
config.task = TaskConfig(
    biz_prefix="my_service",
    batch_size=10,
    max_retry_count=3,
    backoff_strategy="exponential",
    backoff_interval=30,
    backoff_multiplier=2.0,
    min_gpu_memory_gb=0.5,
    min_gpu_utilization=10
)

# 线程池配置
config.threadpool = ThreadPoolConfig(
    concurrency=5, 
    exec_timeout=300, 
    proc_mode="process"
)

# 调度器配置
config.scheduler = SchedulerConfig(interval=10)

# 2. 创建调度器
scheduler = TaskBackoffScheduler(config)

# 3. 定义任务处理器
def task_handler(task: TaskEntity):
    # 您的业务逻辑
    print(f"处理任务: {task.task_id}")
    return ResultEntity.ok(
        result={"status": "success", "data": "处理完成"},
        task_id=task.task_id,
    )

def exception_handler(task: TaskEntity):
    # 异常处理逻辑
    return ResultEntity.fail(
        code=-1,
        message="任务执行失败",
        task_id=task.task_id,
    )

# 4. 注册处理器
scheduler.set_custom_task_handler(task_handler)

scheduler.set_custom_task_exception_handler(exception_handler)

# 5. 启动调度器（如果不需要启动调度器，请忽略此步骤，只提供基础的Task API操作）
scheduler.start()
```

## 💻 Task任务数据结构

```python
{
    "task_id": "task_5775520148",
    "process": 0,
    "status": "pending",
    "result": "",
    "param": "{\"index\": 111, \"data\": \"test_data\"}",
    "retry_count": 0,
    "next_execution_time": 0,
    "create_time": 1756695076,
    "update_time": 1756695076,
    "max_retry_count": 5,
    "backoff_strategy": "fixed",
    "backoff_interval": 60,
    "backoff_multiplier": 2.0,
    "min_gpu_memory_gb": 0,
    "min_gpu_utilization": 0.0,
    "biz_prefix": "custom_scheduling_example"
}
```

结果字段说明

|字段     | 类型   |  说明           |
|----------|--------|----------------|
| task_id     | string |  任务唯一id     |
| biz_prefix     | string |  任务的业务前缀    |
| process     | int |  任务进度  |
| status     | string    |  任务状态      |
| result | string    | 任务的执行结果    |
| param | string |   任务的参数      |
| retry_count | int |   重试次数      |
| next_execution_time | int |   下次执行时间 |
| create_time | int |   创建时间      |
| update_time | int |   更新时间      |
| max_retry_count | int |   最大重试次数      |
| backoff_strategy | string |   退避类型      |
| backoff_interval | int |   退避间隔      |
| backoff_multiplier | int |      退避倍数   |
| min_gpu_memory_gb | int |      需要的最小GPU内存   |
| min_gpu_utilization | int |      需要的最小GPU使用率   |


## 📚 API 参考

### 任务管理

#### 1.创建任务

创建任务可选权重值 `priority` , 不传默认会以为当前时间为权重值先进先出；`priority` 越小越先执行

```python
scheduler.create_task(
    task_params={"key": "value"},
    task_id="unique_task_id",
    priority=100
) -> Tuple[bool, message: str]
```

参数字段说明
| 字段     | 类型 | 必填   |  说明           |
|----------|--------|--------|----------------|
| task_params     |是| dict |  任务的业务参数     |
| task_id     | 是 |string |  自定义的任务id    |
| priority     |否| int |  权重值（不传默认会以为当前时间为权重值先进先出；越小越先执行）  |


#### 2.查询任务

```python
scheduler.get_task(task_id) -> TaskEntity
```

#### 3.撤销任务

- 线程模式下撤销任务仅会变更任务状态为 `canceled`，等待任务执行完成

- 进程模式下会直接取消正在运行中的任务并变更任务状态

```python
scheduler.cancel_task(task_id) -> Tuple[bool, str]:
```

#### 4.获取队列统计

```python
scheduler.get_queue_statistics() -> Dict[str, int]:
```

队列统计结构返回结果示例

```python
{
    "system_status": {
        "update_time": "2025-09-01 15:30:13"
    },
    "tasks": {
        "PENDING": {
            "total": 34,
            "queue": [
                "task_2728059290",
                "task_2829088805"
            ]
        },
        "DOING": {
            "total": 0,
            "queue": []
        },
        "DONE": {
            "total": 0,
            "queue": []
        },
        "FAILED": {
            "total": 0,
            "queue": []
        }
    }
}
```

字段说明
| 字段     | 类型   |  说明           |
|----------|--------|----------------|
| update_time     | string |  请求时间    |
| tasks     | object |  任务队列统计      |
| PENDING     | object |  等待队列      |
| DOING     | object |  处理中队列      |
| DONE     | object |  已完成队列      |
| FAILED     | object |  失败队列      |
| total     | int  |  任务队列成员总数      | 
| queue     | list |  任务成员列表      |


### 5.更任务状态

```python
# 标记任务完成
scheduler.mark_task_completed(task_id, "执行结果") -> bool

# 标记任务失败
scheduler.mark_task_failed(task_id, "失败原因") -> bool

# 标记任务处理中
scheduler.mark_task_processing(task_id) -> bool
```

### 6.更新任务进度

```python
# 更新任务进度
scheduler.update_task_progress(task_id, 75) -> bool # 75%
```

### 7.调度器控制

如果不显示执行 `scheduler.start()` 则不会启动定时调度，只提供 Task API 的基础操作

```python
# 启动调度器
scheduler.start()

# 关闭调度器
scheduler.shutdown()
```

### 8.处理器管理

- 任务处理器 ( `set_custom_task_handler` ): 可选设置，处理正常的业务逻辑

- 异常处理器 ( `set_custom_task_exception_handler` ): 可选设置，处理任务执行过程中的异常情况

```python
# 注册任务处理器（可选）
scheduler.set_custom_task_handler(task_handler)

# 注册异常处理器（可选）
scheduler.set_custom_task_exception_handler(exception_handler)
```

**处理器函数签名**

处理器参数必须为 `TaskEntity` ，响应类型必须为 `ResultEntity`

```python
from backoff.scheduler.backoff_scheduler import (
    TaskEntity,
    ResultEntity,
)
def task_handler(task: TaskEntity) -> ResultEntity:
    # 您的业务逻辑
    pass

def exception_handler(task: TaskEntity) -> ResultEntity:
    # 异常处理逻辑
    pass
```

#### 返回值示例

**成功处理:**

```python
return ResultEntity.ok(
    result={"status": "success", "data": "处理结果"},
    task_id=task.task_id,
)
```

**失败处理:**
```python
return ResultEntity.fail(
    code=-1,
    message="任务执行失败",
    result={"error": "具体错误信息"},
    task_id=task.task_id,
)
```


## ⚙️ 配置详解

### StorageConfig (存储配置)

| 参数     | 类型   | 必填 | 默认值    | 说明           |
|----------|--------|------|-----------|----------------|
| type     | string | 是   | redis     | 存储类型，目前仅支持Redis      |
| host     | string | 是   |  localhost| Redis主机地址  |
| port     | int    | 是   | 6379      | Redis端口      |
| database | int    | 否   | 0         | Redis数据库    |
| password | string | 否   | ""        | Redis密码      |

**type 存储类型:**

- `redis` : 默认
- `mysql` : 暂未支持
- `达梦` : 暂未支持

### TaskConfig (任务配置)

| 参数                | 类型   | 必填 | 默认值        | 说明                    |
|---------------------|--------|------|---------------|-------------------------|
| biz_prefix          | string | **是**   | -         | 业务唯一标识               |
| max_retry_count     | int    | 否   | 3             | 最大重试次数            |
| backoff_strategy    | string | 否   | exponential   | 退避策略                |
| backoff_interval    | int    | 否   | 30            | 初始退避间隔(秒)        |
| backoff_multiplier  | float  | 否   | 2.0           | 退避倍数                |
| batch_size          | int    | 否   | 100           | 批次大小                |
| min_gpu_memory_gb   | float  | 否   | 0             | 最小GPU内存(GB)         |
| min_gpu_utilization | int    | 否   | 0             | 最小GPU利用率(%)        |

**backoff_strategy 退避策略说明:**

- `exponential` : 指数退避 (默认)
- `fixed` : 固定间隔重试
- `linear` : 线性退避

### ThreadPoolConfig (线程池配置)

| 参数          | 类型   | 必填 | 默认值 | 说明                    |
|---------------|--------|------|--------|-------------------------|
| concurrency   | int    | 否   | 10     | 并发线程/进程数         |
| proc_mode     | string | 否   | process | 执行模式                |
| exec_timeout  | int    | 否   | 300    | 任务超时时间(秒)        |

**proc_mode 执行模式说明:**

- `thread` : 线程池模式
- `process` : 进程池模式 (默认)

线程池模式，一旦线程启动后，线程将会持续运行，直到执行完毕，线程将被回收。

进程池模式，如果进程内业务异常或者超时等待，进程将被收回。

### SchedulerConfig (调度器配置)

| 参数     | 类型 | 必填 | 默认值 | 说明           |
|----------|------|------|--------|----------------|
| interval | int  | 否   | 10     | 轮询间隔(秒)   |





