Metadata-Version: 2.4
Name: puyuan-robot-sdk
Version: 1.0.0
Summary: 警小安特种安防机器人服务端集成 SDK
Home-page: https://github.com/puyuan/puyuan-robot-sdk
Author: PuYuan
Author-email: PuYuan <dev@puyuan.com>
License: MIT
Project-URL: Homepage, https://github.com/puyuan/puyuan-robot-sdk
Project-URL: Documentation, https://github.com/puyuan/puyuan-robot-sdk#readme
Project-URL: Repository, https://github.com/puyuan/puyuan-robot-sdk.git
Project-URL: Issues, https://github.com/puyuan/puyuan-robot-sdk/issues
Keywords: robot,mqtt,iot,security-robot,puyuan
Classifier: Development Status :: 5 - Production/Stable
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Operating System :: OS Independent
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Hardware :: Hardware Drivers
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: paho-mqtt>=1.6.0
Requires-Dist: httpx>=0.24.0
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: build>=1.0; extra == "dev"
Requires-Dist: twine>=5.0; extra == "dev"
Requires-Dist: pytest>=7.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21; extra == "dev"
Dynamic: author
Dynamic: home-page
Dynamic: requires-python

# 警小安管控 SDK (Python)

`puyuan-robot-sdk` 是警小安特种安防机器人的 Python 服务端集成 SDK，基于 `asyncio`、`paho-mqtt`、`httpx`、`pydantic` V2 构建。

**架构亮点：**

- **单连接多路复用** — 全局仅占用一条 TCP MQTT 长链路，即可支撑大规模机器人集群的高并发控制与状态订阅。
- **双线程异步桥接** — paho-mqtt 守护线程通过 `call_soon_threadsafe` 桥接到 asyncio 事件循环，上层业务代码全程享受 `async/await` 编程体验。
- **`asyncio.Future` + `msgId` 回执引擎** — 指令下发后协程挂起，设备 ACK 到达后自动唤醒，支持常规 5 秒与 OTA 级 600 秒双超时策略。
- **Pydantic V2 严格类型校验** — 所有数据模型（电机状态、底盘工况、抓拍回执、告警事件等）均通过 Pydantic V2 构建，编译级类型安全与运行时参数合法性校验双保险。

---

## 目录

- [安装](#安装)
- [快速开始](#快速开始)
- [配置](#配置)
- [枚举字典](#枚举字典)
- [数据模型](#数据模型)
- [舰队管理器](#舰队管理器)
- [设备操作句柄](#设备操作句柄)
  - [车辆运控](#车辆运控)
  - [任务与巡逻](#任务与巡逻)
  - [云台与感知](#云台与感知)
  - [媒体与外设](#媒体与外设)
  - [系统与运维](#系统与运维)
- [告警事件](#告警事件)
- [HTTP 客户端](#http-客户端)
- [异常体系与错误处理](#异常体系与错误处理)
- [鉴权算法](#鉴权算法)
- [发布与打包](#发布与打包)

---

## 安装

### 从 PyPI 安装（推荐）

```bash
pip install puyuan-robot-sdk
```

### 从 Git 仓库安装

```bash
pip install git+https://github.com/puyuan/puyuan-robot-sdk.git
```

### 离线安装（私有部署）

```bash
# 1. 下载依赖 wheel 包
./download_deps.sh

# 2. 安装依赖
pip install --no-index --find-links wheels/ paho-mqtt httpx "pydantic>=2.0.0"

# 3. 安装 SDK
pip install -e .
```

### 依赖

| 依赖 | 最低版本 | 用途 |
|------|----------|------|
| `paho-mqtt` | 1.6.0 | MQTT 长连接通信 |
| `httpx` | 0.24.0 | 异步 HTTP API 请求 |
| `pydantic` | 2.0.0 | 类型校验与数据模型 |

---

## 快速开始

```python
import asyncio
from puyuan_robot_sdk import (
    PuYuanFleetManager,
    RobotConfig,
    Direction,
    CameraPos,
    LampPos,
    DeviceBizError,
    TimeoutError,
)


async def main():
    # 1. 初始化舰队管理器
    config = RobotConfig(
        http_url="https://api.puyuan.com",
        mqtt_url="tcp://mqtt.puyuan.com:1883",
        app_key="YOUR_APP_KEY",
        app_secret="YOUR_APP_SECRET",
    )
    fleet = PuYuanFleetManager(config)

    # 2. 建立双擎链路（MQTT + HTTP）
    await fleet.connect()

    try:
        # 3. 获取设备操作句柄
        robot = fleet.get_device("111222")

        # 4. 启动设备，夺取控制权
        await robot.vehicle.launch()
        await robot.vehicle.activate_control()

        # 5. 控制底盘移动
        await robot.vehicle.move(Direction.FRONT)

        # 6. 转动云台并抓拍
        await robot.camera.turn(CameraPos.HEAD, horizontal=45, vertical=-15)
        capture = await robot.camera.capture()
        print(f"抓拍图片: {capture.url}")

        # 7. 开启警示灯
        await robot.media.lamp(LampPos.WARNING, True)

        # 8. 释放控制权
        await robot.vehicle.close_control()

    except TimeoutError as e:
        print(f"指令超时: {e.msg_id}")
    except DeviceBizError as e:
        print(f"设备执行失败: [{e.code}] {e.message}")
    finally:
        # 9. 断开连接
        await fleet.disconnect()


asyncio.run(main())
```

---

## 配置

```python
from puyuan_robot_sdk import RobotConfig

config = RobotConfig(
    http_url="https://api.puyuan.com",          # HTTP API 基地址
    mqtt_url="tcp://mqtt.puyuan.com:1883",      # MQTT Broker 地址（支持 tcp:// / mqtt://）
    app_key="YOUR_APP_KEY",                      # 平台分配的应用标识
    app_secret="YOUR_APP_SECRET",                # 平台分配的应用密钥
    default_timeout_sec=5.0,                     # 全局默认超时（秒），默认 5.0
)
```

---

## 枚举字典

```python
from puyuan_robot_sdk import Direction, CameraPos, LampPos, ArmAction

# 移动方向（4 项）
Direction.FRONT   # "front" — 前进
Direction.AFTER   # "after" — 后退
Direction.LEFT    # "left"  — 左转
Direction.RIGHT   # "right" — 右转

# 摄像头方位（6 项）
CameraPos.FRONT   # "front" — 前视
CameraPos.AFTER   # "after" — 后视
CameraPos.LEFT    # "left"  — 左视
CameraPos.RIGHT   # "right" — 右视
CameraPos.HEAD    # "head"  — 头部云台
CameraPos.MIXED   # "mixed" — 混合多视角

# 灯光位置（4 项）
LampPos.RUNNING      # "running"    — 行车灯
LampPos.SIDE_MARKER  # "sideMarker" — 示廓灯
LampPos.WARNING      # "warning"    — 警示灯
LampPos.GLARE        # "glare"      — 强光灯

# 机械臂动作（3 项）
ArmAction.SALUTE    # "salute"    — 敬礼
ArmAction.HANDSHAKE # "handshake" — 握手
ArmAction.DEFENSE   # "defense"   — 防御姿态
```

---

## 数据模型

### MotorState — 电机节点快照

```python
from puyuan_robot_sdk import MotorState

motor = MotorState(
    id=1,       # 电机节点 ID
    fault=0,    # 故障码（0 为正常）
    temp=35.5,  # 实时温度（℃）
    curr=2.1,   # 实时电流（A）
)
```

### HardwareStatus — 硬件工况快照

```python
from puyuan_robot_sdk import HardwareStatus

hw = HardwareStatus(
    env_temp=28.0,     # 舱内环境温度（℃）
    latency_ms=12,     # 总线延迟（ms）
    motors=[motor],    # 电机矩阵
)
```

### StatusData — 设备全景运行状态

```python
from puyuan_robot_sdk import StatusData

status = StatusData(
    power=85,              # 电量百分比（0-100，Pydantic Field 校验）
    status="patrol",       # 状态机: patrol / charge / maintenance / alarm
    speed=1.2,             # 实时速度（m/s）
    emergency_stop=0,      # 急停: 1=触发 0=正常
    opened_lamps=[],       # 当前亮灯列表
    hardware_status=hw,    # 硬件底况
)
```

### CaptureResult — 抓拍回执

```python
from puyuan_robot_sdk import CaptureResult

capture = CaptureResult(
    url="https://oss.puyuan.com/img/abc.jpg",  # 云端图片地址
    time=1717425600000,                          # 抓拍时间戳（ms）
)
```

### MediaAttachment — 媒体附件

```python
from puyuan_robot_sdk import MediaAttachment

video = MediaAttachment(
    url="https://oss.puyuan.com/video/xyz.mp4",
    duration=30,   # 时长（秒），图片为 0
)
```

### AlarmEvent — AI 告警事件

```python
from puyuan_robot_sdk import AlarmEvent

alarm = AlarmEvent(
    event_id="ALM-20240603-001",          # 告警流水号
    event_type="crowd_gather",             # 事件类型
    address="西区 3 号楼一层大厅",          # 事发地
    videos=[video],                        # 取证视频
    images=[capture],                      # 抓拍图片
)
```

---

## 舰队管理器

`PuYuanFleetManager` 是 SDK 的全局入口，管理 MQTT 长连接与 HTTP 连接池的生命周期。

```python
from puyuan_robot_sdk import PuYuanFleetManager, RobotConfig

config = RobotConfig(
    http_url="https://api.puyuan.com",
    mqtt_url="tcp://mqtt.puyuan.com:1883",
    app_key="YOUR_APP_KEY",
    app_secret="YOUR_APP_SECRET",
)
fleet = PuYuanFleetManager(config)

# 建立双擎链路
await fleet.connect()

# 获取设备操作句柄（自动缓存，同一 SN 只创建一个）
robot = fleet.get_device("ROBOT-SN-001")

# 监控指标
print(f"连接状态: {fleet.metrics.is_connected}")
print(f"重连次数: {fleet.metrics.reconnect_count}")

# 断开所有连接
await fleet.disconnect()
```

### 多租户隔离

对接多个客户平台时，实例化多个 `PuYuanFleetManager` 即可实现连接、配置完全隔离：

```python
customer_a = PuYuanFleetManager(config_a)
customer_b = PuYuanFleetManager(config_b)

await customer_a.connect()
await customer_b.connect()
```

---

## 设备操作句柄

`DeviceHandle` 通过 `fleet.get_device(sn)` 获取，聚合五大业务域：

```python
robot = fleet.get_device("111222")

robot.vehicle  # 车辆运控
robot.task     # 任务与巡逻
robot.camera   # 云台与感知
robot.media    # 媒体与外设
robot.system   # 系统与运维
```

### 车辆运控

```python
# 移动控制
await robot.vehicle.move(Direction.FRONT)
await robot.vehicle.move(Direction.AFTER)

# 紧急制动（最高优先级，中断所有机动）
await robot.vehicle.emergency_stop()

# 启动 / 唤醒
await robot.vehicle.launch()

# 控制权锁（夺取最高控制权，暂停自主任务）
await robot.vehicle.activate_control()

# 释放控制权（务必在 finally 块中调用）
await robot.vehicle.close_control()

# 底盘升降（0-100 百分比，越界抛 ValueError）
await robot.vehicle.set_chassis_height(50)
```

### 任务与巡逻

```python
# 自主回桩充电
await robot.task.return_to_pile(
    pile_sn="PILE-01",
    x=120.345,
    y=30.678,
    toward=0.0,      # 入桩绝对朝向，0=正北
)

# 下发巡逻计划
await robot.task.start_patrol(plan_id="PLAN-WEST-01")

# 定点布控
await robot.task.execute_deployment(task_id="DEPLOY-20240603")
```

### 云台与感知

```python
from puyuan_robot_sdk import CameraPos

# 云台绝对角度定位
await robot.camera.turn(
    CameraPos.HEAD,
    horizontal=45,   # -180 ~ 180
    vertical=-15,    # -90 ~ 90
)

# 光学变焦
await robot.camera.set_zoom(CameraPos.HEAD, rate=2.5)

# 瞬时抓拍（挂起直至图片上传完毕）
capture = await robot.camera.capture()
print(f"图片地址: {capture.url}")

# AI 告警事件闭环处置
await robot.camera.handle_ai_event(
    event_id="ALM-001",
    scheme_json='{"action":"dispatch_guard","priority":"high"}',
)
```

### 媒体与外设

```python
from puyuan_robot_sdk import LampPos

# 灯光控制
await robot.media.lamp(LampPos.WARNING, True)   # 开启警示灯
await robot.media.lamp(LampPos.RUNNING, False)   # 关闭行车灯

# TTS 大喇叭喊话
await robot.media.shout("警报：发现可疑人员，请立即离开！")

# 多通道音量调节
await robot.media.set_volumes(shout=80, intercom=60, broadcast=50)
```

### 系统与运维

```python
# OTA 固件升级（内部自动使用 600 秒长超时）
await robot.system.update_ota(
    record_id=8848,
    device_type="main_controller",
    version="v2.3.1",
    url="https://cdn.puyuan.com/firmware/main_v2.3.1.bin",
)
```

---

## 告警事件

SDK 支持通过回调订阅设备主动上报的告警事件。回调在 asyncio 事件循环中执行，严禁阻塞。

```python
from puyuan_robot_sdk import AlarmEvent


async def handle_alarm(sn: str, event: AlarmEvent):
    """告警回调 — 内部处理逻辑需通过 asyncio.create_task() 异步化。"""
    print(f"[{sn}] 告警: {event.event_type} @ {event.address}")
    for img in event.images:
        print(f"  → 抓拍: {img.url}")
    for vid in event.videos:
        print(f"  → 视频: {vid.url}")


# 注册回调
robot.on_alarm(handle_alarm)

# 取消回调
robot.off_alarm()
```

---

## HTTP 客户端

SDK 内置 HTTP 客户端，每次请求自动注入签名 Header。一般通过业务域方法间接使用，也可直接调用：

```python
# POST
resp = await fleet._http_client.post(
    sn="111222",
    uri="/car-api/device-status",
    data={"fields": ["power", "speed"]},
)

# GET
resp = await fleet._http_client.get(
    sn="111222",
    uri="/car-api/patrol-record",
    params={"date": "2024-06-03"},
)

# 文件上传（签名中 file 参数取字节大小）
resp = await fleet._http_client.upload_file(
    sn="111222",
    uri="/car-api/upload-file",
    file_path="/path/to/video.mp4",
    file_size=20971520,
    extra_data={"filename": "video.mp4"},
)
```

---

## 异常体系与错误处理

```python
from puyuan_robot_sdk import DeviceBizError, TimeoutError, AuthError

try:
    await robot.vehicle.move(Direction.FRONT)
except TimeoutError as e:
    # 链路超时：设备在超时窗口内未返回 ACK
    print(f"超时 msgId: {e.msg_id}")
except DeviceBizError as e:
    # 业务失败：设备返回 code != 0
    print(f"设备错误 [{e.code}]: {e.message}")
except AuthError as e:
    # 鉴权失败：app_key/app_secret 未配置
    print(f"鉴权异常: {e.reason}")
```

**异常继承关系：** 所有异常直接继承自 `Exception`，互不嵌套，方便业务侧按需捕获。

---

## 鉴权算法

SDK 内置签名引擎，无需手动计算。

### HTTP 请求签名

字典排序 MD5 算法：

```
MD5(sn + "\n" + timestamp_ms + "\n" + method + "\n" + URI + "\n" + k1=v1&k2=v2)
```

```python
from puyuan_robot_sdk.auth import generate_http_sign

sign = generate_http_sign(
    sn="111222",
    timestamp="1742457802838",
    method="POST",
    uri="/car-api/upload-file",
    params={"filename": "test.mp4", "file": "20971520"},
)
# 输出: 05d753e0e0952df698b0f9e613644a5b
```

### MQTT 连接密码

16 轮循环 MD5 算法：

```python
from puyuan_robot_sdk.auth import generate_mqtt_password

password = generate_mqtt_password("account123", "salt456")
# 输出: 7d96d21b62aa854a883dd6af6d8eaf3c
```

---

## 发布与打包

### 构建 Wheel 分发

```bash
# 安装构建工具
pip install build

# 构建
python -m build

# 产物:
# dist/puyuan-robot-sdk-1.0.0.tar.gz
# dist/puyuan_robot_sdk-1.0.0-py3-none-any.whl
```

### 发布到 PyPI

```bash
# 安装上传工具
pip install twine

# 上传到 PyPI
twine upload dist/*

# 或上传到私有仓库
twine upload --repository-url https://your-private-pypi.com/ dist/*
```

### 本地验证 Wheel

```bash
# 创建虚拟环境
python -m venv /tmp/test-sdk
source /tmp/test-sdk/bin/activate

# 从本地 wheel 安装
pip install dist/puyuan_robot_sdk-1.0.0-py3-none-any.whl

# 验证
python -c "
from puyuan_robot_sdk import (
    PuYuanFleetManager, RobotConfig,
    Direction, CameraPos, LampPos, ArmAction,
    StatusData, CaptureResult, AlarmEvent,
    DeviceBizError, TimeoutError, AuthError,
)
print('SDK 安装成功，全部 16 个模块可用')
"
```

### 通过 Git Tag 发布

```bash
git tag v1.0.0
git push origin v1.0.0
```

第三方可通过以下方式安装：

```bash
pip install git+https://github.com/puyuan/puyuan-robot-sdk.git@v1.0.0
```

---

## 项目结构

```
puyuan-robot-sdk/
├── puyuan_robot_sdk/
│   ├── __init__.py          # 公共 API 导出
│   ├── enums.py             # 枚举字典（Direction, CameraPos, LampPos, ArmAction）
│   ├── exceptions.py        # 异常体系（DeviceBizError, TimeoutError, AuthError）
│   ├── config.py            # 全局配置（RobotConfig）
│   ├── models.py            # 数据模型（6 个 Pydantic V2 BaseModel）
│   ├── auth.py              # 鉴权引擎（HTTP Sign + MQTT Password）
│   ├── http_client.py       # HTTP 异步客户端（httpx 封装）
│   ├── mqtt_client.py       # MQTT 通信核心（双线程 + Future 回执引擎）
│   ├── fleet_manager.py     # 舰队管理器（全局入口）
│   ├── device.py            # 设备操作句柄（五大域聚合）
│   └── domains/
│       ├── __init__.py
│       ├── vehicle.py       # 车辆运控域
│       ├── task.py          # 任务与巡逻域
│       ├── camera.py        # 云台与感知域
│       ├── media.py         # 媒体与外设域
│       └── system.py        # 系统与运维域
├── setup.py                 # 传统打包配置
├── pyproject.toml           # 现代打包配置
├── requirements.txt         # 运行时依赖
├── download_deps.sh         # 离线依赖下载脚本
└── README.md
```

---

## License

MIT
