CUDA Kernels User Guide

This guide covers the CUDA acceleration features in ManipulaPy, which provide GPU-accelerated functions for high-performance robotic computations including trajectory planning, dynamics computation, and potential field calculations.

What is GPU Acceleration?

GPU acceleration uses NVIDIA graphics cards to perform parallel computations much faster than traditional CPU processing. ManipulaPy leverages CUDA through:

  • Numba CUDA kernels: Low-level parallel computation

  • CuPy arrays: NumPy-compatible GPU arrays

  • Automatic fallback: Graceful degradation to CPU when GPU unavailable

  • Memory optimization: Efficient GPU memory management

Key benefits include:

  • 10-100x Performance Improvement: Significant speedup for large-scale computations

  • Real-time Capable: Suitable for real-time control applications

  • Seamless Integration: Works directly with existing ManipulaPy classes

Mathematical Foundation

CUDA Kernel Implementation

Trajectory Generation Kernel:

The GPU kernel computes smooth trajectories using parallel processing:

\[\begin{split}s(t) = \begin{cases} 3(t/T_f)^2 - 2(t/T_f)^3 & \text{cubic} \\ 10(t/T_f)^3 - 15(t/T_f)^4 + 6(t/T_f)^5 & \text{quintic} \end{cases}\end{split}\]

where each thread computes one trajectory point in parallel:

@cuda.jit
def trajectory_kernel(thetastart, thetaend, traj_pos, traj_vel, traj_acc, Tf, N, method):
    idx = cuda.grid(1)
    if idx < N:
        t = idx * (Tf / (N - 1))
        # Compute time scaling function...

Dynamics Computation:

Parallel inverse dynamics using the Newton-Euler formulation:

\[\boldsymbol\tau = M(\boldsymbol\theta)\ddot{\boldsymbol\theta} + C(\boldsymbol\theta,\dot{\boldsymbol\theta})\dot{\boldsymbol\theta} + G(\boldsymbol\theta) + J^T(\boldsymbol\theta)F_{ext}\]

Each GPU thread processes one trajectory point simultaneously.

Installation and Setup

System Requirements

Hardware: - NVIDIA GPU with Compute Capability 3.0 or higher - 4GB+ GPU memory recommended - PCIe 3.0 x16 slot

Software: - NVIDIA GPU drivers (latest recommended) - CUDA Toolkit 11.0+ or 12.0+ - Python 3.8+

Installation Options

Option 1: Full GPU Support (Recommended)

# For CUDA 11.x systems
pip install cupy-cuda11x numba ManipulaPy

# For CUDA 12.x systems
pip install cupy-cuda12x numba ManipulaPy

Option 2: CPU-Only Installation

# Basic installation (no GPU acceleration)
pip install ManipulaPy

Verification

from numba import cuda
import numpy as np

# Check CUDA availability
try:
    cuda.detect()
    print("βœ… CUDA acceleration available")
    CUDA_AVAILABLE = True
except:
    print("❌ CUDA not available - using CPU fallback")
    CUDA_AVAILABLE = False

# Check CuPy availability
try:
    import cupy as cp
    cp.cuda.Device(0).compute_capability
    print("βœ… CuPy GPU arrays available")
    CUPY_AVAILABLE = True
except:
    print("❌ CuPy not available")
    CUPY_AVAILABLE = False

Available CUDA Kernels

Trajectory Generation Kernels

trajectory_kernel

Generates smooth joint trajectories with cubic or quintic time scaling.

@cuda.jit
def trajectory_kernel(thetastart, thetaend, traj_pos, traj_vel, traj_acc, Tf, N, method):

Parameters: - thetastart: Starting joint angles - thetaend: Ending joint angles - traj_pos: Output trajectory positions - traj_vel: Output trajectory velocities - traj_acc: Output trajectory accelerations - Tf: Total trajectory time - N: Number of trajectory points - method: Time scaling method (3=cubic, 5=quintic)

cartesian_trajectory_kernel

Generates Cartesian space trajectories for end-effector motion.

@cuda.jit
def cartesian_trajectory_kernel(pstart, pend, traj_pos, traj_vel, traj_acc, Tf, N, method):

Dynamics Computation Kernels

inverse_dynamics_kernel

Computes required joint torques for given motion trajectories.

@cuda.jit
def inverse_dynamics_kernel(
    thetalist_trajectory, dthetalist_trajectory, ddthetalist_trajectory,
    gravity_vector, Ftip, Glist, Slist, M, torques_trajectory, torque_limits):

Features: - Parallel computation across trajectory points - Includes mass matrix, Coriolis, and gravity effects - Automatic torque limit enforcement

forward_dynamics_kernel

Simulates robot motion given applied torques.

@cuda.jit
def forward_dynamics_kernel(
    thetalist, dthetalist, taumat, g, Ftipmat, dt, intRes,
    Glist, Slist, M, thetamat, dthetamat, ddthetamat, joint_limits):

Potential Field Kernels

attractive_potential_kernel

Computes attractive potential fields for path planning.

@cuda.jit
def attractive_potential_kernel(positions, goal, potential):

repulsive_potential_kernel

Computes repulsive potential fields around obstacles.

@cuda.jit
def repulsive_potential_kernel(positions, obstacles, potential, influence_distance):

gradient_kernel

Computes gradients of potential fields for navigation.

@cuda.jit
def gradient_kernel(potential, gradient):

Using GPU Acceleration

Basic Trajectory Generation

import numpy as np
from ManipulaPy.path_planning import TrajectoryPlanning
from ManipulaPy.urdf_processor import URDFToSerialManipulator

def generate_gpu_trajectory_example():
    """Example using ManipulaPy's TrajectoryPlanning with GPU acceleration."""

    # Load robot model
    urdf_path = "path/to/robot.urdf"
    urdf_processor = URDFToSerialManipulator(urdf_path)
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics

    # Create trajectory planner
    joint_limits = [(-np.pi, np.pi)] * 6  # Example 6-DOF robot
    trajectory_planner = TrajectoryPlanning(
        robot, urdf_path, dynamics, joint_limits
    )

    # Generate trajectory using GPU acceleration (automatic)
    thetastart = np.zeros(6, dtype=np.float32)
    thetaend = np.array([1.5, 0.8, -0.5, 0.3, 1.2, -0.7], dtype=np.float32)
    Tf = 2.0  # 2 seconds
    N = 1000  # 1000 points
    method = 5  # Quintic time scaling

    trajectory = trajectory_planner.joint_trajectory(
        thetastart, thetaend, Tf, N, method
    )

    print(f"Generated trajectory with {trajectory['positions'].shape[0]} points")
    print(f"Trajectory shape: {trajectory['positions'].shape}")

    return trajectory

# Example usage
trajectory = generate_gpu_trajectory_example()

TrajectoryPlanning with GPU Kernels

from ManipulaPy.path_planning import TrajectoryPlanning
from ManipulaPy.urdf_processor import URDFToSerialManipulator

# Initialize robot and dynamics
urdf_processor = URDFToSerialManipulator("robot.urdf")
robot = urdf_processor.serial_manipulator
dynamics = urdf_processor.dynamics

# Set up joint and torque limits
num_joints = len(dynamics.Glist)
joint_limits = [(-np.pi, np.pi)] * num_joints
torque_limits = [(-50, 50)] * num_joints

# Create trajectory planner (automatically uses GPU when available)
trajectory_planner = TrajectoryPlanning(
    robot, "robot.urdf", dynamics, joint_limits, torque_limits
)

# Generate trajectory - GPU acceleration happens automatically
thetastart = np.zeros(num_joints)
thetaend = np.ones(num_joints) * 0.5
trajectory = trajectory_planner.joint_trajectory(
    thetastart, thetaend, Tf=2.0, N=1000, method=3
)

# Compute inverse dynamics - also GPU accelerated
torques = trajectory_planner.inverse_dynamics_trajectory(
    trajectory["positions"],
    trajectory["velocities"],
    trajectory["accelerations"]
)

print(f"Computed torques shape: {torques.shape}")

Batch Dynamics Computation

def compute_trajectory_dynamics_example():
    """Compute dynamics for entire trajectory using GPU acceleration."""

    # Initialize robot and trajectory planner
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics

    num_joints = len(dynamics.Glist)
    joint_limits = [(-np.pi, np.pi)] * num_joints

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    # Generate trajectory first
    thetastart = np.zeros(num_joints)
    thetaend = np.ones(num_joints) * 0.5

    trajectory = trajectory_planner.joint_trajectory(
        thetastart, thetaend, Tf=2.0, N=2000, method=5
    )

    # Compute inverse dynamics (GPU accelerated automatically)
    torques = trajectory_planner.inverse_dynamics_trajectory(
        trajectory["positions"],
        trajectory["velocities"],
        trajectory["accelerations"],
        gravity_vector=[0, 0, -9.81],
        Ftip=[0, 0, 0, 0, 0, 0]
    )

    print(f"Computed torques shape: {torques.shape}")
    return torques

# Example usage
trajectory_torques = compute_trajectory_dynamics_example()

Potential Field Path Planning

from ManipulaPy.potential_field import PotentialField

def potential_field_example():
    """Example using potential fields with GPU acceleration."""

    # Create potential field
    potential_field = PotentialField(
        attractive_gain=1.0,
        repulsive_gain=100.0,
        influence_distance=0.5
    )

    # Define workspace points
    workspace_points = np.random.rand(10000, 3).astype(np.float32)
    goal_position = np.array([5.0, 5.0, 2.0])
    obstacles = np.array([[2.0, 2.0, 1.0], [3.0, 4.0, 1.5]])

    # Compute potential field (uses GPU kernels internally when available)
    total_potential = np.zeros(len(workspace_points))

    for i, point in enumerate(workspace_points):
        attractive = potential_field.compute_attractive_potential(point, goal_position)
        repulsive = potential_field.compute_repulsive_potential(point, obstacles)
        total_potential[i] = attractive + repulsive

    return total_potential

# Example usage
potential_values = potential_field_example()

Performance Optimization

Memory Management

def optimal_memory_usage():
    """Example of optimal GPU memory usage with ManipulaPy."""

    # Pre-allocate trajectories for efficiency
    max_points = 5000
    num_joints = 6

    # Use float32 for GPU efficiency
    thetastart = np.zeros(num_joints, dtype=np.float32)
    thetaend = np.ones(num_joints, dtype=np.float32)

    # Initialize trajectory planner once
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * num_joints

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    # Process multiple trajectories efficiently
    trajectories = []
    for i in range(10):
        # Vary end positions
        end_pos = thetaend * (i + 1) * 0.1

        trajectory = trajectory_planner.joint_trajectory(
            thetastart, end_pos, Tf=2.0, N=max_points, method=3
        )
        trajectories.append(trajectory)

    return trajectories

def cleanup_gpu_memory():
    """Clean up GPU memory after computations."""
    try:
        import cupy as cp
        mempool = cp.get_default_memory_pool()
        mempool.free_all_blocks()
        print("GPU memory cleaned up")
    except ImportError:
        pass

trajectories = optimal_memory_usage()
cleanup_gpu_memory()

GPU Configuration Check

def check_gpu_configuration():
    """Check optimal GPU configuration for kernels."""

    try:
        from numba import cuda

        if cuda.is_available():
            device = cuda.get_current_device()
            print(f"GPU: {device.name}")
            print(f"Compute Capability: {device.compute_capability}")
            print(f"Max threads per block: {device.MAX_THREADS_PER_BLOCK}")
            print(f"Max block dimensions: {device.MAX_BLOCK_DIM_X}")
            print(f"Multiprocessor count: {device.MULTIPROCESSOR_COUNT}")
        else:
            print("No CUDA device available")
    except ImportError:
        print("Numba CUDA not available")

check_gpu_configuration()

Memory Pool Management

def setup_gpu_memory_pool(max_memory_gb=4):
    """Configure GPU memory pool for optimal performance."""

    try:
        import cupy as cp
        mempool = cp.get_default_memory_pool()

        # Set memory limit to prevent OOM
        max_bytes = int(max_memory_gb * 1024**3)
        mempool.set_limit(size=max_bytes)

        print(f"GPU memory pool configured with {max_memory_gb} GB limit")

        return mempool

    except ImportError:
        print("CuPy not available - cannot configure memory pool")
        return None
    except Exception as e:
        print(f"Error configuring memory pool: {e}")
        return None

def cleanup_gpu_memory():
    """Clean up GPU memory."""

    try:
        import cupy as cp
        mempool = cp.get_default_memory_pool()
        mempool.free_all_blocks()
        print("GPU memory cleaned up")
    except ImportError:
        pass
    except Exception as e:
        print(f"Error cleaning up GPU memory: {e}")

# Example usage
mempool = setup_gpu_memory_pool(max_memory_gb=6)

# ... perform GPU computations ...

cleanup_gpu_memory()

CPU Fallback

Automatic Fallback System

The ManipulaPy modules automatically fall back to CPU computation when GPU acceleration is not available:

def test_cpu_fallback():
    """Test CPU fallback functionality."""

    # Initialize components
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    # TrajectoryPlanning automatically handles CPU fallback
    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    # This will use GPU if available, CPU otherwise
    thetastart = np.zeros(6)
    thetaend = np.ones(6)

    trajectory = trajectory_planner.joint_trajectory(
        thetastart, thetaend, Tf=1.0, N=1000, method=3
    )

    # Check if result is valid regardless of GPU/CPU
    assert trajectory["positions"].shape == (1000, 6)
    assert trajectory["velocities"].shape == (1000, 6)
    assert trajectory["accelerations"].shape == (1000, 6)

    print("Trajectory generation successful (GPU or CPU)")

    return trajectory

test_trajectory = test_cpu_fallback()

Performance Comparison

def compare_cpu_gpu_performance():
    """Compare CPU vs GPU performance across different problem sizes."""

    try:
        from numba import cuda
        cuda_available = cuda.is_available()
    except:
        cuda_available = False

    # Initialize trajectory planner
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    problem_sizes = [100, 500, 1000, 2000]
    results = []

    for N in problem_sizes:
        thetastart = np.zeros(6)
        thetaend = np.ones(6)

        # Time the trajectory generation
        start_time = time.perf_counter()
        trajectory = trajectory_planner.joint_trajectory(
            thetastart, thetaend, Tf=2.0, N=N, method=3
        )
        elapsed_time = time.perf_counter() - start_time

        results.append({
            'problem_size': N,
            'time_ms': elapsed_time * 1000,
            'cuda_available': cuda_available
        })

        status = "GPU" if cuda_available else "CPU"
        print(f"N={N:4d}: {elapsed_time*1000:6.2f}ms ({status})")

    return results

# Run performance comparison
import time
perf_results = compare_cpu_gpu_performance()

Troubleshooting

Common Issues and Solutions

CUDA Installation Issues

Problem: ImportError: No module named 'numba.cuda'

Solutions:

# Check CUDA installation
nvidia-smi

# Install appropriate CuPy version
pip install cupy-cuda11x  # For CUDA 11.x
pip install cupy-cuda12x  # For CUDA 12.x

# Install Numba with CUDA support
pip install numba

# Verify installation
python -c "from numba import cuda; print('CUDA available:', cuda.is_available())"

GPU Memory Errors

Problem: CudaAPIError: CUDA_ERROR_OUT_OF_MEMORY

Solutions:

# Reduce trajectory size
N = 1000  # Instead of 10000

# Process in smaller chunks
def process_large_trajectory(thetastart, thetaend, total_points=10000):
    """Process large trajectory in chunks."""
    chunk_size = 1000
    all_positions = []
    all_velocities = []
    all_accelerations = []

    # Initialize trajectory planner
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    for i in range(0, total_points, chunk_size):
        chunk_N = min(chunk_size, total_points - i)
        chunk_Tf = 2.0 * chunk_N / total_points

        chunk_traj = trajectory_planner.joint_trajectory(
            thetastart, thetaend, chunk_Tf, chunk_N, method=3
        )

        all_positions.append(chunk_traj["positions"])
        all_velocities.append(chunk_traj["velocities"])
        all_accelerations.append(chunk_traj["accelerations"])

    return {
        "positions": np.vstack(all_positions),
        "velocities": np.vstack(all_velocities),
        "accelerations": np.vstack(all_accelerations)
    }

Performance Issues

Problem: GPU slower than expected

Debugging:

import time

def benchmark_trajectory_generation():
    """Benchmark trajectory generation performance."""

    # Initialize trajectory planner
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    problem_sizes = [100, 500, 1000, 2000, 5000]

    for N in problem_sizes:
        thetastart = np.zeros(6)
        thetaend = np.ones(6)

        # Time the trajectory generation
        start_time = time.perf_counter()

        trajectory = trajectory_planner.joint_trajectory(
            thetastart, thetaend, Tf=2.0, N=N, method=3
        )

        end_time = time.perf_counter()
        elapsed_ms = (end_time - start_time) * 1000

        print(f"N={N:4d}: {elapsed_ms:6.2f} ms")

benchmark_trajectory_generation()

Best Practices

Data Types

# βœ… Good: Use float32 for GPU efficiency
thetastart = np.zeros(6, dtype=np.float32)
thetaend = np.ones(6, dtype=np.float32)

# ❌ Bad: Using float64 (unnecessary precision, slower)
thetastart = np.zeros(6, dtype=np.float64)

Problem Sizing

# βœ… Good: Use appropriate problem sizes for GPU acceleration
def optimal_problem_sizing():
    """Choose problem sizes that benefit from GPU acceleration."""

    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    # Check if CUDA is available
    try:
        from numba import cuda
        cuda_available = cuda.is_available()
    except:
        cuda_available = False

    # Adjust problem size based on available acceleration
    if cuda_available:
        N = 5000  # Large problem size for GPU
        print("Using GPU acceleration with large problem size")
    else:
        N = 1000  # Smaller problem size for CPU
        print("Using CPU with moderate problem size")

    trajectory = trajectory_planner.joint_trajectory(
        np.zeros(6), np.ones(6), Tf=2.0, N=N, method=3
    )

    return trajectory

Error Handling

# βœ… Good: Comprehensive error handling
def safe_trajectory_operation(trajectory_params):
    """Safe trajectory operation with comprehensive error handling."""

    try:
        # Initialize components
        urdf_processor = URDFToSerialManipulator("robot.urdf")
        robot = urdf_processor.serial_manipulator
        dynamics = urdf_processor.dynamics
        joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

        trajectory_planner = TrajectoryPlanning(
            robot, "robot.urdf", dynamics, joint_limits
        )

        # Perform trajectory generation
        result = trajectory_planner.joint_trajectory(**trajectory_params)

        # Validate result
        if result is None or result["positions"].shape[0] == 0:
            print("Invalid result, using fallback")
            return create_fallback_trajectory(**trajectory_params)

        return result

    except FileNotFoundError:
        print("URDF file not found, using default robot model")
        return create_default_trajectory(**trajectory_params)
    except MemoryError:
        print("Insufficient memory, reducing problem size")
        reduced_params = trajectory_params.copy()
        reduced_params['N'] = min(reduced_params.get('N', 1000), 500)
        return safe_trajectory_operation(reduced_params)
    except Exception as e:
        print(f"Unexpected error: {e}. Using basic fallback.")
        return create_fallback_trajectory(**trajectory_params)

def create_fallback_trajectory(**params):
    """Create simple linear trajectory as fallback."""
    thetastart = params.get('thetastart', np.zeros(6))
    thetaend = params.get('thetaend', np.ones(6))
    N = params.get('N', 100)

    positions = np.linspace(thetastart, thetaend, N)
    velocities = np.zeros_like(positions)
    accelerations = np.zeros_like(positions)

    return {
        'positions': positions,
        'velocities': velocities,
        'accelerations': accelerations
    }

Memory Management

# βœ… Good: Clean up GPU memory after large computations
def efficient_trajectory_processing():
    """Process trajectories efficiently with memory management."""

    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    results = []

    try:
        for i in range(10):  # Process multiple trajectories
            thetastart = np.zeros(6)
            thetaend = np.random.uniform(-1, 1, 6)

            trajectory = trajectory_planner.joint_trajectory(
                thetastart, thetaend, Tf=2.0, N=1000, method=3
            )
            results.append(trajectory)

        return results

    finally:
        # Clean up GPU memory
        try:
            import cupy as cp
            mempool = cp.get_default_memory_pool()
            mempool.free_all_blocks()
        except ImportError:
            pass

Performance Optimization

# βœ… Good: Minimize data transfers and reuse components
class EfficientTrajectoryProcessor:
    def __init__(self, urdf_path):
        """Initialize processor with reusable components."""

        # Initialize once and reuse
        self.urdf_processor = URDFToSerialManipulator(urdf_path)
        self.robot = self.urdf_processor.serial_manipulator
        self.dynamics = self.urdf_processor.dynamics
        self.joint_limits = [(-np.pi, np.pi)] * len(self.dynamics.Glist)

        self.trajectory_planner = TrajectoryPlanning(
            self.robot, urdf_path, self.dynamics, self.joint_limits
        )

    def process_trajectory(self, thetastart, thetaend, Tf=2.0, N=1000, method=3):
        """Process single trajectory efficiently."""

        return self.trajectory_planner.joint_trajectory(
            thetastart, thetaend, Tf, N, method
        )

    def process_batch(self, trajectory_configs):
        """Process multiple trajectories efficiently."""

        results = []
        for config in trajectory_configs:
            result = self.trajectory_planner.joint_trajectory(**config)
            results.append(result)

        # Clean up after batch
        try:
            import cupy as cp
            mempool = cp.get_default_memory_pool()
            mempool.free_all_blocks()
        except ImportError:
            pass

        return results

# Usage example
processor = EfficientTrajectoryProcessor("robot.urdf")

configs = [
    {"thetastart": np.zeros(6), "thetaend": np.ones(6)*0.5, "Tf": 2.0, "N": 1000, "method": 3},
    {"thetastart": np.ones(6)*0.5, "thetaend": np.zeros(6), "Tf": 1.5, "N": 800, "method": 5},
]

results = processor.process_batch(configs)

Integration with ManipulaPy

Integration with Simulation

from ManipulaPy.sim import Simulation

def gpu_accelerated_simulation():
    """Example of GPU-accelerated simulation."""

    # Create simulation
    joint_limits = [(-np.pi, np.pi)] * 6
    torque_limits = [(-50, 50)] * 6

    sim = Simulation(
        urdf_file_path="robot.urdf",
        joint_limits=joint_limits,
        torque_limits=torque_limits
    )

    # Initialize robot and planner
    sim.initialize_robot()
    sim.initialize_planner_and_controller()

    # Generate GPU-accelerated trajectory
    thetastart = np.zeros(6)
    thetaend = np.array([0.5, -0.3, 0.8, -0.2, 0.4, -0.6])

    trajectory = sim.trajectory_planner.joint_trajectory(
        thetastart, thetaend, Tf=3.0, N=3000, method=5
    )

    # Run simulation with GPU-generated trajectory
    end_pos = sim.run_trajectory(trajectory["positions"])

    return end_pos

final_position = gpu_accelerated_simulation()

Integration with Control

from ManipulaPy.control import ManipulatorController
import cupy as cp

def gpu_accelerated_control():
    """Example of using GPU acceleration with control."""

    # Initialize robot components
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    # Create controller
    controller = ManipulatorController(dynamics)

    # Generate reference trajectory using GPU
    thetastart = np.zeros(6)
    thetaend = np.ones(6) * 0.5

    trajectory = trajectory_planner.joint_trajectory(
        thetastart, thetaend, Tf=2.0, N=2000, method=5
    )

    # Use CuPy for control computations if available
    try:
        import cupy as cp

        # Convert to CuPy arrays for GPU computation
        thetalistd = cp.asarray(trajectory["positions"][0])
        dthetalistd = cp.asarray(trajectory["velocities"][0])
        ddthetalistd = cp.asarray(trajectory["accelerations"][0])

        thetalist = cp.zeros(6)
        dthetalist = cp.zeros(6)
        g = cp.array([0, 0, -9.81])

        # Control gains
        Kp = cp.array([10.0] * 6)
        Ki = cp.array([0.1] * 6)
        Kd = cp.array([1.0] * 6)

        # Compute control signal
        tau = controller.computed_torque_control(
            thetalistd, dthetalistd, ddthetalistd,
            thetalist, dthetalist, g, dt=0.01,
            Kp=Kp, Ki=Ki, Kd=Kd
        )

        return cp.asnumpy(tau)

    except ImportError:
        # Fall back to NumPy
        return controller.computed_torque_control(
            trajectory["positions"][0],
            trajectory["velocities"][0],
            trajectory["accelerations"][0],
            np.zeros(6), np.zeros(6),
            np.array([0, 0, -9.81]),
            dt=0.01,
            Kp=np.array([10.0] * 6),
            Ki=np.array([0.1] * 6),
            Kd=np.array([1.0] * 6)
        )

control_torque = gpu_accelerated_control()

Working Examples

Complete GPU Workflow Example

def complete_gpu_workflow_example():
    """Complete example showing GPU acceleration workflow."""

    print("=== Complete GPU Acceleration Example ===")

    # 1. Check GPU availability
    try:
        from numba import cuda
        cuda_available = cuda.is_available()
        print(f"1. CUDA Available: {cuda_available}")
    except:
        cuda_available = False
        print("1. CUDA Not Available - using CPU fallback")

    try:
        import cupy as cp
        cupy_available = True
        print("   CuPy Available: True")
    except:
        cupy_available = False
        print("   CuPy Available: False")

    # 2. Initialize robot and trajectory planner
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    print(f"2. Initialized {len(dynamics.Glist)}-DOF robot")

    # 3. Generate trajectory (GPU accelerated if available)
    thetastart = np.zeros(len(dynamics.Glist))
    thetaend = np.ones(len(dynamics.Glist)) * 0.5
    N = 5000 if cuda_available else 1000  # Larger problem for GPU

    start_time = time.perf_counter()
    trajectory = trajectory_planner.joint_trajectory(
        thetastart, thetaend, Tf=2.0, N=N, method=5
    )
    trajectory_time = time.perf_counter() - start_time

    acceleration_type = "GPU" if cuda_available else "CPU"
    print(f"3. Generated {N}-point trajectory using {acceleration_type}")
    print(f"   Time: {trajectory_time*1000:.2f} ms")

    # 4. Compute dynamics (also GPU accelerated)
    start_time = time.perf_counter()
    torques = trajectory_planner.inverse_dynamics_trajectory(
        trajectory["positions"],
        trajectory["velocities"],
        trajectory["accelerations"]
    )
    dynamics_time = time.perf_counter() - start_time

    print(f"4. Computed inverse dynamics")
    print(f"   Time: {dynamics_time*1000:.2f} ms")
    print(f"   Torques shape: {torques.shape}")

    # 5. Memory cleanup
    if cupy_available:
        try:
            import cupy as cp
            mempool = cp.get_default_memory_pool()
            mempool.free_all_blocks()
            print("5. GPU memory cleaned up")
        except:
            print("5. Memory cleanup skipped")
    else:
        print("5. No GPU memory to clean up")

    # 6. Performance summary
    total_time = trajectory_time + dynamics_time
    print(f"\n=== Performance Summary ===")
    print(f"Total computation time: {total_time*1000:.2f} ms")
    print(f"Trajectory generation: {trajectory_time*1000:.2f} ms")
    print(f"Dynamics computation: {dynamics_time*1000:.2f} ms")
    print(f"Acceleration: {acceleration_type}")

    return {
        'trajectory': trajectory,
        'torques': torques,
        'performance': {
            'total_time_ms': total_time * 1000,
            'trajectory_time_ms': trajectory_time * 1000,
            'dynamics_time_ms': dynamics_time * 1000,
            'acceleration_type': acceleration_type,
            'problem_size': N
        }
    }

# Run the complete example
import time
results = complete_gpu_workflow_example()

Batch Processing Example

def batch_processing_example():
    """Example of efficient batch processing with GPU acceleration."""

    print("=== Batch Processing Example ===")

    # Initialize trajectory processor
    processor = EfficientTrajectoryProcessor("robot.urdf")

    # Define multiple trajectory configurations
    trajectory_configs = [
        {
            "thetastart": np.zeros(6),
            "thetaend": np.array([0.5, -0.3, 0.2, -0.4, 0.6, -0.1]),
            "Tf": 2.0, "N": 1000, "method": 3
        },
        {
            "thetastart": np.array([0.1, 0.2, -0.1, 0.3, -0.2, 0.4]),
            "thetaend": np.zeros(6),
            "Tf": 1.5, "N": 800, "method": 5
        },
        {
            "thetastart": np.ones(6) * 0.2,
            "thetaend": np.ones(6) * -0.3,
            "Tf": 3.0, "N": 1500, "method": 3
        },
        {
            "thetastart": np.random.uniform(-0.5, 0.5, 6),
            "thetaend": np.random.uniform(-0.5, 0.5, 6),
            "Tf": 2.5, "N": 1200, "method": 5
        }
    ]

    # Process batch
    start_time = time.perf_counter()
    results = processor.process_batch(trajectory_configs)
    batch_time = time.perf_counter() - start_time

    # Analyze results
    total_points = sum(result["positions"].shape[0] for result in results)

    print(f"Processed {len(trajectory_configs)} trajectories")
    print(f"Total trajectory points: {total_points}")
    print(f"Batch processing time: {batch_time*1000:.2f} ms")
    print(f"Average time per trajectory: {batch_time*1000/len(trajectory_configs):.2f} ms")

    # Validate results
    for i, result in enumerate(results):
        config = trajectory_configs[i]
        expected_shape = (config["N"], 6)
        actual_shape = result["positions"].shape

        print(f"Trajectory {i+1}: {actual_shape} ({'βœ…' if actual_shape == expected_shape else '❌'})")

    return results

batch_results = batch_processing_example()

Performance Profiling Example

def performance_profiling_example():
    """Comprehensive performance profiling example."""

    print("=== Performance Profiling Example ===")

    # Check available acceleration
    try:
        from numba import cuda
        cuda_available = cuda.is_available()
    except:
        cuda_available = False

    # Initialize trajectory planner
    urdf_processor = URDFToSerialManipulator("robot.urdf")
    robot = urdf_processor.serial_manipulator
    dynamics = urdf_processor.dynamics
    joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)

    trajectory_planner = TrajectoryPlanning(
        robot, "robot.urdf", dynamics, joint_limits
    )

    # Test different problem sizes
    problem_sizes = [100, 500, 1000, 2000, 5000]

    print(f"Testing trajectory generation performance:")
    print(f"Acceleration: {'GPU (CUDA)' if cuda_available else 'CPU'}")
    print("Problem Size | Time (ms) | Points/sec")
    print("-" * 40)

    results = []

    for N in problem_sizes:
        thetastart = np.zeros(len(dynamics.Glist))
        thetaend = np.random.uniform(-1, 1, len(dynamics.Glist))

        # Warm up (first run may be slower due to initialization)
        trajectory_planner.joint_trajectory(
            thetastart, thetaend, Tf=2.0, N=min(N, 100), method=3
        )

        # Measure actual performance
        start_time = time.perf_counter()
        trajectory = trajectory_planner.joint_trajectory(
            thetastart, thetaend, Tf=2.0, N=N, method=3
        )
        elapsed_time = time.perf_counter() - start_time

        points_per_sec = N / elapsed_time

        print(f"{N:11d} | {elapsed_time*1000:8.2f} | {points_per_sec:9.0f}")

        results.append({
            'problem_size': N,
            'time_ms': elapsed_time * 1000,
            'points_per_sec': points_per_sec,
            'cuda_available': cuda_available
        })

        # Clean up GPU memory between tests
        try:
            import cupy as cp
            mempool = cp.get_default_memory_pool()
            mempool.free_all_blocks()
        except ImportError:
            pass

    # Performance analysis
    if len(results) >= 2:
        small_size_time = results[0]['time_ms']
        large_size_time = results[-1]['time_ms']

        print(f"\n=== Performance Analysis ===")
        print(f"Smallest problem: {results[0]['problem_size']} points in {small_size_time:.2f} ms")
        print(f"Largest problem: {results[-1]['problem_size']} points in {large_size_time:.2f} ms")

        if large_size_time > 0:
            efficiency = (results[-1]['problem_size'] / results[0]['problem_size']) / (large_size_time / small_size_time)
            print(f"Scaling efficiency: {efficiency:.2f} (1.0 = linear scaling)")

    return results

profile_results = performance_profiling_example()

When to Use GPU Acceleration

Recommended for:

  • Large trajectory generation (N > 1000 points)

  • Batch dynamics computation for multiple trajectories

  • Dense potential field calculations with many sample points

  • Real-time path planning with many obstacles

  • Iterative optimization algorithms with repeated computations

Not recommended for:

  • Small computations (N < 100 points)

  • One-off calculations or prototype development

  • Memory-constrained systems with limited GPU memory

  • Development/debugging phases where CPU debugging is easier

Performance Guidelines

Problem Size Recommendations

Problem Size

Trajectory Points

Recommended Hardware

Expected Speedup

Small

< 100

CPU sufficient

No benefit

Medium

100-1000

GPU optional

2-5x speedup

Large

1000-10000

GPU recommended

10-50x speedup

Very Large

> 10000

GPU required

50-100x speedup

Getting Started Checklist

Installation and Setup

  1. βœ… Install CUDA support: pip install cupy-cuda11x numba ManipulaPy

  2. βœ… Verify installation: Check cuda.is_available()

  3. βœ… Start with TrajectoryPlanning: Use existing ManipulaPy classes

  4. βœ… Test performance: Compare GPU vs CPU for your use case

  5. βœ… Optimize gradually: Apply memory management best practices

  6. βœ… Monitor system: Watch for memory usage and thermal limits

Quick Start Guide

# Quick start: GPU-accelerated trajectory generation

# 1. Import required modules
from ManipulaPy.urdf_processor import URDFToSerialManipulator
from ManipulaPy.path_planning import TrajectoryPlanning
import numpy as np

# 2. Load robot
urdf_processor = URDFToSerialManipulator("your_robot.urdf")
robot = urdf_processor.serial_manipulator
dynamics = urdf_processor.dynamics

# 3. Create trajectory planner (GPU acceleration automatic)
joint_limits = [(-np.pi, np.pi)] * len(dynamics.Glist)
planner = TrajectoryPlanning(robot, "your_robot.urdf", dynamics, joint_limits)

# 4. Generate trajectory
start = np.zeros(len(dynamics.Glist))
end = np.ones(len(dynamics.Glist)) * 0.5

trajectory = planner.joint_trajectory(
    start, end, Tf=2.0, N=2000, method=5  # Large N benefits from GPU
)

# 5. Compute dynamics
torques = planner.inverse_dynamics_trajectory(
    trajectory["positions"],
    trajectory["velocities"],
    trajectory["accelerations"]
)

print(f"Generated {trajectory['positions'].shape[0]} trajectory points")
print(f"Computed {torques.shape[0]} torque values")

Common Patterns

Pattern 1: Batch Processing

# Process multiple trajectories efficiently
configs = [
    {"thetastart": start1, "thetaend": end1, "Tf": 2.0, "N": 1000, "method": 3},
    {"thetastart": start2, "thetaend": end2, "Tf": 1.5, "N": 800, "method": 5},
    # ... more configurations
]

results = []
for config in configs:
    trajectory = planner.joint_trajectory(**config)
    results.append(trajectory)

Pattern 2: Progressive Problem Sizing

# Start small, scale up based on performance
base_N = 500

# Test performance with base size
start_time = time.perf_counter()
test_trajectory = planner.joint_trajectory(start, end, Tf=2.0, N=base_N, method=3)
test_time = time.perf_counter() - start_time

# Scale up if performance is good
if test_time < 0.1:  # Less than 100ms
    N = base_N * 10  # Scale up 10x
else:
    N = base_N

# Generate final trajectory
trajectory = planner.joint_trajectory(start, end, Tf=2.0, N=N, method=3)

Pattern 3: Memory-Aware Processing

# Process with automatic memory cleanup
try:
    # Large computation
    trajectory = planner.joint_trajectory(start, end, Tf=2.0, N=10000, method=5)
    torques = planner.inverse_dynamics_trajectory(
        trajectory["positions"], trajectory["velocities"], trajectory["accelerations"]
    )

finally:
    # Always clean up
    try:
        import cupy as cp
        cp.get_default_memory_pool().free_all_blocks()
    except ImportError:
        pass

Conclusion

The ManipulaPy CUDA Kernels module provides significant performance improvements for robotics applications through:

  • High-Performance Computing: 10-100x speedup for suitable workloads

  • Seamless Integration: Works directly with TrajectoryPlanning and other ManipulaPy modules

  • Automatic Fallback: Graceful degradation to CPU when GPU unavailable

  • Memory Efficiency: Optimized GPU memory management

The GPU acceleration is built into ManipulaPy’s core modules like TrajectoryPlanning, so you can benefit from it without changing your existing code - just install the CUDA dependencies and ManipulaPy will automatically use GPU acceleration when available.

For additional support and advanced usage patterns, refer to the ManipulaPy documentation and GitHub repository.

API Reference

For complete function documentation: CUDA Kernels API Reference