Metadata-Version: 2.4
Name: rainbow-rb-sdk
Version: 0.0.9.dev38
Summary: Rainbow Robotics 통합 Python SDK — AMR, 매니퓰레이터, RB-Y1 휴머노이드와 Rainbow Robot Service(common) 연동
Author-email: Derek <dfd1123@rainbow-robotics.com>
Requires-Python: >=3.11
Description-Content-Type: text/markdown
Requires-Dist: rainbow-rb-utils==0.0.9.dev38
Requires-Dist: rainbow-rb-zenoh==0.0.9.dev38
Requires-Dist: rainbow-rb-flat-buffers==0.0.9.dev38
Requires-Dist: rainbow-rb-schemas==0.0.9.dev38
Requires-Dist: rainbow-rb-log==0.0.9.dev38
Requires-Dist: rby1-sdk>=0.9.1

# rb_sdk

Rainbow Robotics 통합 Python SDK. AMR · Manipulator · RB-Y1 양팔로봇 제어와 Rainbow Robot Service(common) 연동을 단일 API로 제공합니다.

## 진입점

| 클래스 | 용도 |
|---|---|
| `RBAmrSDK` | AMR(자율 주행) 로봇 제어 |
| `RBManipulateSDK` | 협동 로봇 매니퓰레이터 제어 |
| `RBRby1SDK` | RB-Y1 양팔로봇 제어 |
| `RBBaseSDK` | 공통 베이스 (직접 상속하지 않는 한 사용할 일 없음) |

내부적으로 Zenoh pub/sub/query 메시 위에서 동작하며, 같은 PC(로봇 본체)에서 실행할 때는 별도 설정 없이 즉시 사용 가능합니다.

---

## 1. 설치

```bash
pip install rb_sdk
# RB-Y1 양팔로봇 제어가 필요하면 함께 설치
pip install rby1-sdk
```

`requires-python = ">=3.12,<3.13"` — Python 3.12 전용.

---

## 2. 기본 사용 (로봇 본체에서 실행하는 경우)

`common` 서비스와 같은 PC에서 실행되는 스크립트라면 추가 설정 없이 그대로 사용 가능합니다.

```python
from rb_sdk import RBAmrSDK, RBManipulateSDK

amr = RBAmrSDK()
manipulate = RBManipulateSDK()

# 이 시점에 ZenohClient가 127.0.0.1:7447(common)에 자동 연결
manipulate.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [0, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 0},
)
```

---

## 3. 외부 PC에서 사용 (Remote Connect)

개발용 노트북 등 **로봇 본체와 다른 머신**에서 로봇과 통신하고 싶다면 `sdk.connect()`를 호출합니다.

```python
import asyncio
from rb_sdk import RBManipulateSDK
from rb_flat_buffers.IPC.State_Core import State_CoreT
from rb_zenoh.schema import SubscribeOptions

sdk = RBManipulateSDK()


async def main():
    # ⚠️  subscribe는 반드시 connect() 이후에 등록해야 한다.
    # connect() 내부에서 ZenohClient를 close() 후 재연결하므로
    # connect() 전에 등록한 subscribe는 세션과 함께 모두 사라진다.
    await sdk.connect(
        token="서비스 Token",
        host="192.168.1.100",
        # ttl=30,
        # zenoh_port=None,
        # heartbeat_interval=15.0,
    )

    @sdk.zenoh_subscribe(
        "C500920/state_core",
        flatbuffer_obj_t=State_CoreT,
        opts=SubscribeOptions(parse_dict_payload=False),
    )
    def on_state_core(*, obj_payload: State_CoreT | None, **kwargs):
        if obj_payload is None:
            return
        print("motion mode:", obj_payload.motionMode)

    try:
        # heartbeat task는 asyncio.run()이 살아있는 동안만 동작한다.
        # asyncio.run()이 반환되면 event loop가 닫히고 heartbeat가 취소되어
        # TTL 후 서버가 peer를 제거하므로 loop 안에서 대기해야 한다.
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        pass
    finally:
        await sdk.disconnect()


try:
    asyncio.run(main())
except KeyboardInterrupt:
    print("Ctrl+C pressed")
```

#### 주요 특징

- **로컬 IP 자동 감지**: `host` 방향의 라우팅 인터페이스 IP를 자동으로 찾아 로봇에 알림
- **포트 자동 배정**: 별도 지정 없이 OS가 빈 포트를 잡아주므로 포트 충돌 없음
- **자동 heartbeat**: 주기적으로 TTL 연장. 스크립트 종료 / 크래시 시 TTL 만료로 서버 측 정리
- **같은 PC 자동 감지**: `host`가 localhost거나 자기 IP인 경우 `connect()`는 아무것도 하지 않고 `None` 반환 (이미 메시 안에 있음)

#### connect() 인자

| 인자 | 타입 | 기본 | 설명 |
|---|---|---|---|
| `token` | `str` | (필수) | common 서비스 JWT |
| `host` | `str` | (필수) | common이 실행 중인 로봇 IP |
| `zenoh_port` | `int \| None` | `None` | 로컬 Zenoh listen 포트. `None`이면 자동 |
| `ttl` | `int` | `30` | 서버 측 세션 TTL(초) |
| `heartbeat_interval` | `float \| None` | `None` | heartbeat 주기. `None`이면 `ttl/2` |
| `mesh_timeout` | `float \| None` | `None` | mesh 형성 대기 최대 시간(초). `None`이면 무제한 대기 |
| `mesh_probe_timeout` | `float` | `1.0` | probe query 1회 timeout(초) |
| `mesh_probe_interval` | `float` | `0.5` | probe 재시도 간격(초) |

---

## 4. use_directly 모드

`RBBaseSDK` 생성자에 `use_directly=True`를 전달하면, 메서드 호출 시 `{name}_directly` 메서드가 존재하면 자동으로 그쪽으로 라우팅됩니다. 없으면 원래 메서드가 그대로 호출됩니다.

```python
class MySDK(RBManipulateSDK):
    def call_move_j_directly(self, *, robot_model, target, **kwargs):
        # Zenoh를 거치지 않고 직접 하드웨어 제어하는 구현
        ...

# use_directly=True → call_move_j 호출 시 call_move_j_directly로 라우팅
sdk = MySDK(use_directly=True)
sdk.move.call_move_j(robot_model="C500920", target={...})  # → call_move_j_directly 호출

# use_directly=False (기본) 또는 _directly 메서드 없으면 원래 메서드 사용
sdk2 = MySDK(use_directly=False)
sdk2.move.call_move_j(...)  # → call_move_j 호출
```

싱글톤 인스턴스가 이미 존재해도 `use_directly`는 매 생성자 호출마다 갱신됩니다.

---

## 5. AMR SDK (`RBAmrSDK`)

```python
from rb_sdk import RBAmrSDK

sdk = RBAmrSDK()
```

### 하위 모듈

| 속성 | 클래스 | 주요 기능 |
|---|---|---|
| `sdk.move` | `AmrMoveSDK` | 목표 지점 이동, jog, 정지/일시정지/재개, 직선·원호·회전 이동 |
| `sdk.control` | `AmrControlSDK` | 안전 필드/플래그/IO 설정, 도킹/언도킹, LED/모터/jog 제어, 장애물 박스 |
| `sdk.localization` | `AmrLocalizationSDK` | 위치 초기화, 랜덤/자동/반자동 init, 위치 추정 시작/정지 |
| `sdk.map` | `AmrMapSDK` | 맵 목록/로드/삭제, 토폴로지 get/set, 매핑 시작/정지/저장 |
| `sdk.setting` | `AmrSettingSDK` | 로봇 타입, 설정 카테고리/파라미터, PDU/드라이브/센서 설정 |
| `sdk.status` | `AmrStatusSDK` | 상태·이동 상태·LiDAR·경로 조회 |
| `sdk.file` | `AmrFileSDK` | 맵·리소스 파일 관리 |
| `sdk.capability` | `AmrCapabilitySDK` | 로봇 능력 조회 |
| `sdk.accessory` | `AmrAccessorySDK` | 액세서리 제어 |
| `sdk.program` | `AmrProgramSDK` | AMR 프로그램 실행/제어 |

### 이동 예시

```python
# 목표 노드로 이동
sdk.move.send_move_goal(
    robot_model="AMR001",
    goal_type=0,
    goal_node_name="node_A",
)

# 이동 결과 대기 (async)
result = await sdk.move.get_move_result(robot_model="AMR001")

# jog 이동
sdk.move.send_move_jog(robot_model="AMR001", vx=0.3, vy=0.0, wz=0.0)

# 멀티 경로
sdk.move.send_multi_path(robot_model="AMR001", path=["node_A", "node_B", "node_C"])
```

---

## 6. Manipulate SDK (`RBManipulateSDK`)

```python
from rb_sdk import RBManipulateSDK

sdk = RBManipulateSDK()
```

### 하위 모듈

| 속성 | 클래스 | 주요 기능 |
|---|---|---|
| `sdk.move` | `ManipulateMoveSDK` | 관절/직선/원호 이동, jog, 서보/온라인, 어드미턴스, 위빙, 컨베이어 |
| `sdk.program` | `ManipulateProgramSDK` | 프로그램 실행·정지·재개, 핀포인트, 툴 출력 |
| `sdk.config` | `ManipulateConfigSDK` | 툴·유저프레임·충돌 파라미터, 속도/impedance/freedrive 설정 |
| `sdk.io` | `ManipulateIOSDK` | side/flange/ext DIO·AIO 출력, 펄스, 비트 조합, IO 함수 설정 |
| `sdk.get_data` | `ManipulateGetDataSDK` | 변수 조회, 상대/절대 좌표 계산 |
| `sdk.point` | `ManipulatePointSDK` | 포인트·좌표 등록 |
| `sdk.service` | `ManipulateServiceSDK` | 미니 탭 기능 |
| `sdk.maintenance` | `ManipulateMaintenanceSDK` | 관리자 설정 |
| `sdk.state` | `ManipulateStateSDK` | whoami, 전원·서보·레퍼런스 제어, 실시간 상태 구독 |

### move 모듈 — 주요 메서드

| 메서드 | 설명 |
|---|---|
| `call_move_j` | 관절 공간(Joint) 이동 |
| `call_move_l` | 직선(Linear) 이동 |
| `call_move_cir_axis` | 축 기반 원호 이동 |
| `call_move_cir_threepoint` | 3점 기반 원호 이동 |
| `call_move_home` | Home 위치로 이동 |
| `call_move_jb_clr/add/run` | Joint 블렌딩 경로 등록·실행 |
| `call_move_lb_clr/add/run` | Linear 블렌딩 경로 등록·실행 |
| `call_move_xb_clr/add/run` | X 블렌딩 경로 등록·실행 |
| `call_smoothjog_j/l` | 스무스 jog (관절/직선) |
| `call_tickjog_j/l` | 틱 jog (관절/직선) |
| `call_approach_j/l` | 어프로치 이동 |
| `call_servo_j/l` | 서보 모드 관절/직선 이동 |
| `call_online_j/l` | 온라인 궤적 관절/직선 |
| `call_admittance_force_on/off/change` | 어드미턴스 포스 제어 |
| `call_arc_sensing_on/off` | 아크 센싱 제어 |
| `call_tcp_weaving_on/off` | TCP 위빙 제어 |
| `call_base_conveyor_on/off/speed` | 베이스 컨베이어 제어 |

### 이동 예시

```python
# 관절 이동
sdk.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [0, 0, 90, 0, 90, 0], "tar_frame": 0, "tar_unit": 1},
    speed={"spd_mode": 1, "spd_vel_para": 60, "spd_acc_para": 120},
)

# 직선 이동
sdk.move.call_move_l(
    robot_model="C500920",
    target={"tar_values": [400, 0, 500, 0, 180, 0], "tar_frame": 1, "tar_unit": 0},
)

# 기준 위치로부터 상대 이동
sdk.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [10, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 1},
    reference_value=[0, 0, 90, 0, 90, 0],  # 현재 기준값
)

# 전원·서보 제어
sdk.state.call_powercontrol(robot_model="C500920", control=1)
sdk.state.call_servocontrol(robot_model="C500920", control=1)
```

---

## 7. RB-Y1 SDK (`RBRby1SDK`)

공식 `rby1-sdk` 래퍼. 자세한 설명은 [rby1_sdk/README.md](src/rb_sdk/rby1_sdk/README.md) 참조.

```python
from rb_sdk import RBRby1SDK

sdk = RBRby1SDK(endpoint="192.168.30.1:50051", model=None, auto_connect=True)

print(sdk.connected)
print(sdk.whoami())

await sdk.control.power_on(robot_model="a")
await sdk.control.servo_on(robot_model="a")

state = await sdk.state.get_state(robot_model="a")
```

모든 public 메서드는 `async def` + 내부 `asyncio.to_thread`입니다 (공식 SDK가 blocking C++ pybind11이므로).
sync 컨텍스트(PyFM Step 등)에서는 `asyncio.run(...)` 또는 `RBRby1BaseSDK.run_sync(...)` 사용.

---

## 8. FlowManagerArgs 통합 (PyFM)

PyFM Step에서 SDK 메서드를 호출할 때 `flow_manager_args`를 전달하면 done 콜백 처리, 이동 완료 대기, 변수 바인딩 등이 자동으로 처리됩니다.

```python
# PyFM Step 내부
def execute(self, flow_manager_args):
    sdk.move.call_move_j(
        robot_model="C500920",
        target={"tar_values": [0, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 0},
        flow_manager_args=flow_manager_args,  # done() 콜백 자동 호출
    )
```

`flow_manager_args`를 생략하면 블로킹 호출로 동작합니다.

---

## 9. 아키텍처 메모

### 프로세스 단위 싱글톤

`RBBaseSDK`(`RBAmrSDK`, `RBManipulateSDK`)는 `(pid, cls)` 기준 싱글톤입니다. 같은 프로세스에서 여러 번 생성자를 호출해도 동일 인스턴스가 반환됩니다. ZenohClient도 PID당 하나를 공유합니다.

```python
a = RBManipulateSDK()
b = RBManipulateSDK()
assert a is b  # True
```

### 자동 에러 래핑

`RBBaseSDK`를 상속한 클래스의 모든 public instance method는 `__init_subclass__`에 의해 자동으로 공통 try/except로 래핑됩니다. `ZenohNoReply` / `ZenohTransportError` 발생 시 자동 재연결 후 1회 재시도합니다.

### SDK 종료

```python
sdk.close()                         # 현재 인스턴스 정리
RBBaseSDK.close_all_for_pid()       # 현재 프로세스의 모든 SDK 인스턴스 정리
```

---

## 10. 주의사항

- `RBRby1SDK`는 최초 `endpoint/model` 기준으로 세션이 공유됩니다.
- 연결 대상(zenoh, rby1 endpoint, Muscat HTTP)이 준비되지 않으면 런타임 예외가 발생합니다.
- 외부 PC에서 SDK 사용 시 **방화벽이 7447/TCP(zenoh)와 8000/TCP을 양방향으로 허용**해야 합니다.
- `use_directly=True`는 싱글톤 재진입 시에도 반영됩니다. 단, 하위 SDK(`sdk.move` 등)에는 전파되지 않으므로 `_directly` 메서드는 최상위 SDK에 정의해야 합니다.

---

## 11. Subscribe / Queryable

SDK를 상속하거나 직접 사용해 **Zenoh 이벤트를 수신(subscribe)** 하거나 **RPC 응답자(queryable)** 를 등록할 수 있습니다.

### 11-1. `zenoh_subscribe` — 토픽 구독 (데코레이터)

```python
from rb_sdk import RBManipulateSDK
from rb_zenoh.schema import SubscribeOptions, OverflowPolicy

sdk = RBManipulateSDK()

# ── 패턴 1: dict 페이로드 수신 (JSON 토픽) ──
@sdk.zenoh_subscribe("amr/AMR001/status")
def on_status(*, dict_payload: dict | None, **kwargs):
    print("status:", dict_payload)

# ── 패턴 2: Object API (T 클래스) — 권장 ──
# flatbuffer_obj_t 지정 시 InitFromPackedBuf로 역직렬화된 T 객체가 obj_payload로 전달됨
# T 클래스 필드는 snake_case 속성으로 직접 접근 가능
from rb_flat_buffers.IPC.State_Core import State_CoreT

@sdk.zenoh_subscribe(
    "C500920/state_core",
    flatbuffer_obj_t=State_CoreT,
    opts=SubscribeOptions(parse_dict_payload=False),
)
def on_state_core(*, obj_payload: State_CoreT | None, **kwargs):
    if obj_payload is None:
        return
    print("motion mode:", obj_payload.motionMode)  # snake_case 속성

# ── 패턴 3: Table API (mv + GetRootAs) — lazy access, 성능 우선 시 ──
# flatbuffer_obj_t 미지정 → mv(memoryview)를 직접 받아 수동 파싱
# GetRootAs는 필요한 필드만 lazy 접근하므로 T 클래스보다 빠름
from rb_flat_buffers.IPC.State_Core import State_Core

@sdk.zenoh_subscribe(
    "C500920/state_core",
    opts=SubscribeOptions(parse_dict_payload=False),
)
def on_state_core_mv(*, mv=None, **kwargs):
    if mv is None:
        return
    obj = State_Core.GetRootAs(mv, 0)
    print("motion mode:", obj.MotionMode())  # 메서드 호출
```

#### `zenoh_subscribe` 인자

| 인자 | 타입 | 기본 | 설명 |
|---|---|---|---|
| `topic` | `str` | (필수) | Zenoh key expression (예: `"amr/{model}/status"`) |
| `flatbuffer_obj_t` | FlatBuffer T 클래스 \| `None` | `None` | FlatBuffer로 역직렬화할 때 지정. `None`이면 콜백에 `mv: memoryview` 전달 |
| `opts` | `SubscribeOptions \| None` | `None` | 디스패치 / 오버플로우 옵션 |

#### `SubscribeOptions` 필드

| 필드 | 기본 | 설명 |
|---|---|---|
| `dispatch` | `"immediate"` | `"immediate"`: 수신 스레드에서 바로 콜백. `"queue"`: C++ bounded queue + 전용 dispatch 스레드 |
| `parse_dict_payload` | `True` | FlatBuffer 수신 시 `False`로 지정 |
| `overflow` | `DROP_OLDEST` | `DROP_OLDEST` / `DROP_NEW` / `LATEST_ONLY` |
| `maxsize` | `50` | `"queue"` 모드의 C++ 큐 최대 크기 |
| `rate_limit_per_sec` | `None` | 초당 최대 콜백 호출 수 |
| `sample_every` | `1` | N개 중 1개만 전달 (1 = 전부) |
| `max_async_inflight` | `1` | async 콜백 동시 실행 허용 수 |

#### 직접 subscribe (handle 보관이 필요한 경우)

데코레이터 대신 `zenoh_client.subscribe()`를 직접 호출하면 handle을 직접 관리할 수 있습니다.

```python
sub_handle = sdk.zenoh_client.subscribe(
    "amr/AMR001/moveStatus",
    callback,
    flatbuffer_obj_t=MoveStatusT,
    options=SubscribeOptions(dispatch="queue", overflow=OverflowPolicy.LATEST_ONLY),
)

# 구독 해제
sub_handle.close()
```

---

### 11-2. `zenoh_queryable` — RPC 응답자 (데코레이터)

원격에서 `get(keyexpr)` 요청이 오면 등록된 handler가 호출되어 응답을 반환합니다.

```python
from rb_sdk import RBManipulateSDK

sdk = RBManipulateSDK()

# dict 요청 (payload 없음) → dict 반환
# params는 URL query string (예: ?model=C500920)을 파싱한 dict[str, str]
@sdk.zenoh_queryable("my_service/robot_status")
def handle_status_query(*, params: dict[str, str]) -> dict:
    return {"status": "ok", "model": params.get("model")}

# FlatBuffer 요청 수신 → FlatBuffer 응답
from rb_flat_buffers.manipulate.RB_Functions import RB_FunctionsT

@sdk.zenoh_queryable(
    "my_service/move_j",
    flatbuffer_req_T_class=RB_FunctionsT,
    flatbuffer_res_buf_size=256,
)
def handle_move_j(*, req: RB_FunctionsT, params: dict[str, str]) -> RB_FunctionsT:
    res = RB_FunctionsT()
    res.returnValue = 0
    return res
```

#### `zenoh_queryable` 인자

| 인자 | 타입 | 기본 | 설명 |
|---|---|---|---|
| `keyexpr` | `str` | (필수) | Zenoh key expression (예: `"my_service/status"`) |
| `flatbuffer_req_T_class` | FlatBuffer T 클래스 \| `None` | `None` | 요청 페이로드를 FlatBuffer로 역직렬화할 때 지정 |
| `flatbuffer_res_buf_size` | `int \| None` | `None` | FlatBuffer 응답 직렬화 버퍼 크기(바이트). FlatBuffer 응답 시 필수 |

#### 중복 등록 방지

같은 `keyexpr`로 이미 등록된 queryable이 있으면 경고 메시지를 출력하고 기존 handler를 유지합니다. SDK `close()` 시 자동 해제됩니다.

---

## 12. 라이선스 / 문의

© Rainbow Robotics. 내부 이슈/문의: [GitHub Issues](https://github.com/rainbow-mobile/rainbow-release-apps/issues)
