Optimized Path Planning User Guideļ
This guide covers the highly optimized trajectory planning capabilities in ManipulaPy, including adaptive GPU/CPU execution, memory pooling, batch processing, and advanced performance optimizations for robotic manipulators.
Introductionļ
The optimized TrajectoryPlanning class (now OptimizedTrajectoryPlanning) provides comprehensive trajectory generation and execution capabilities with significant performance improvements through CUDA acceleration and intelligent adaptive execution strategies.
Key Optimizations: - Adaptive GPU/CPU execution based on problem size and hardware availability - Memory pooling to reduce allocation overhead and improve performance - Batch processing for multiple trajectories with optimized kernel launches - Fused kernels to minimize memory bandwidth requirements - Intelligent fallback to CPU when beneficial for small problems - 2D parallelization for better GPU utilization - Pinned memory transfers for faster host-device communication
Enhanced Features: - Automatic performance tuning and statistics collection - Smart threshold adaptation based on execution patterns - Comprehensive error handling and graceful degradation - Advanced profiling and benchmarking capabilities - Backward compatibility with existing TrajectoryPlanning interface
Mathematical Backgroundļ
The mathematical foundation remains the same as the original implementation, with optimizations focused on computational efficiency rather than algorithmic changes. The core time-scaling functions, dynamics computations, and collision avoidance methods are preserved while being accelerated through parallel execution.
Key Computational Optimizations:
Parallel Time-Scaling: Joint trajectory computation parallelized across both time steps and joints using 2D CUDA grids
Vectorized Operations: All mathematical operations vectorized for SIMD execution on both CPU and GPU
Memory Coalescing: Data layouts optimized for efficient memory access patterns
Kernel Fusion: Multiple operations combined into single kernel launches to reduce overhead
Getting Startedļ
Basic Setup with Optimizationsļ
from ManipulaPy.path_planning import OptimizedTrajectoryPlanning, create_optimized_planner
from ManipulaPy.urdf_processor import URDFToSerialManipulator
import numpy as np
# Load robot model
processor = URDFToSerialManipulator("robot.urdf")
robot = processor.serial_manipulator
dynamics = processor.dynamics
# Define joint and torque limits
joint_limits = [(-np.pi, np.pi)] * 6 # 6-DOF robot
torque_limits = [(-50, 50)] * 6 # ±50 Nā
m per joint
# Method 1: Use factory function with auto-optimization
planner = create_optimized_planner(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
torque_limits=torque_limits,
gpu_memory_mb=512, # Allocate 512MB GPU memory pool
enable_profiling=True # Enable performance profiling
)
# Method 2: Direct instantiation with custom settings
planner_custom = OptimizedTrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
torque_limits=torque_limits,
use_cuda=None, # Auto-detect (None/True/False)
cuda_threshold=100, # Min problem size for GPU
memory_pool_size_mb=256, # GPU memory pool size
enable_profiling=False # Disable profiling for production
)
print(f"CUDA available: {planner.cuda_available}")
print(f"GPU properties: {planner.gpu_properties}")
Backward Compatibilityļ
The optimized planner maintains full backward compatibility:
# Existing code works unchanged - automatically uses optimizations
from ManipulaPy.path_planning import TrajectoryPlanning
# This now creates an OptimizedTrajectoryPlanning instance
planner = TrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
torque_limits=torque_limits
)
# All existing methods work exactly the same
trajectory = planner.joint_trajectory(
theta_start, theta_end, Tf=2.0, N=100, method=3
)
Performance-Optimized Methodsļ
joint_trajectory() with Adaptive Executionļ
The optimized joint_trajectory() method automatically selects the best execution strategy:
def optimized_trajectory_demo():
"""Demonstrate adaptive trajectory generation with performance monitoring."""
# Test different problem sizes
test_cases = [
{"N": 50, "name": "Small (CPU preferred)"},
{"N": 500, "name": "Medium (GPU beneficial)"},
{"N": 5000, "name": "Large (GPU optimal)"}
]
theta_start = np.zeros(6)
theta_end = np.array([0.8, -0.5, 0.3, -0.2, 0.6, -0.4])
for case in test_cases:
print(f"\n=== {case['name']} ===")
# Reset performance stats
planner.reset_performance_stats()
# Generate trajectory
start_time = time.time()
trajectory = planner.joint_trajectory(
theta_start, theta_end, Tf=2.0, N=case['N'], method=5
)
elapsed = time.time() - start_time
# Get performance statistics
stats = planner.get_performance_stats()
print(f"Points generated: {trajectory['positions'].shape}")
print(f"Execution time: {elapsed:.4f}s")
print(f"Used GPU: {stats['gpu_calls'] > 0}")
print(f"GPU usage: {stats['gpu_usage_percent']:.1f}%")
if stats['gpu_calls'] > 0:
print(f"Avg GPU time: {stats['avg_gpu_time']:.4f}s")
if stats['cpu_calls'] > 0:
print(f"Avg CPU time: {stats['avg_cpu_time']:.4f}s")
return trajectory
# Run demonstration
demo_trajectory = optimized_trajectory_demo()
batch_joint_trajectory() for Multiple Trajectoriesļ
Process multiple trajectories simultaneously with optimized batch kernels:
def batch_trajectory_demo():
"""Demonstrate high-performance batch trajectory generation."""
# Generate multiple start/end configurations
batch_size = 20
num_joints = 6
# Random start and end configurations
np.random.seed(42) # For reproducible results
thetastart_batch = np.random.uniform(-1.0, 1.0, (batch_size, num_joints))
thetaend_batch = np.random.uniform(-1.0, 1.0, (batch_size, num_joints))
print(f"Generating {batch_size} trajectories in batch...")
# Reset stats for clean measurement
planner.reset_performance_stats()
# Generate batch trajectories
start_time = time.time()
batch_trajectories = planner.batch_joint_trajectory(
thetastart_batch=thetastart_batch,
thetaend_batch=thetaend_batch,
Tf=3.0,
N=200,
method=5
)
batch_time = elapsed = time.time() - start_time
print(f"Batch processing completed:")
print(f"- Total time: {batch_time:.4f}s")
print(f"- Time per trajectory: {batch_time/batch_size:.4f}s")
print(f"- Output shape: {batch_trajectories['positions'].shape}")
# Compare with sequential processing
print(f"\nComparing with sequential processing...")
planner.reset_performance_stats()
start_time = time.time()
sequential_trajectories = []
for i in range(batch_size):
traj = planner.joint_trajectory(
thetastart_batch[i], thetaend_batch[i], Tf=3.0, N=200, method=5
)
sequential_trajectories.append(traj)
sequential_time = time.time() - start_time
speedup = sequential_time / batch_time
print(f"- Sequential time: {sequential_time:.4f}s")
print(f"- Batch speedup: {speedup:.2f}x")
# Verify results are equivalent
sequential_positions = np.array([t['positions'] for t in sequential_trajectories])
max_diff = np.max(np.abs(batch_trajectories['positions'] - sequential_positions))
print(f"- Max difference: {max_diff:.2e} (should be ~0)")
return batch_trajectories, speedup
# Run batch demonstration
batch_trajs, speedup = batch_trajectory_demo()
Advanced Performance Featuresļ
Memory Pool Managementļ
Optimize memory allocation for better performance:
def memory_optimization_demo():
"""Demonstrate memory pool optimization for sustained performance."""
print("Memory Pool Optimization Demo")
print("=" * 40)
# Create planner with large memory pool
large_pool_planner = OptimizedTrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
memory_pool_size_mb=1024, # 1GB memory pool
enable_profiling=True
)
# Test sustained performance with many trajectories
num_iterations = 50
trajectory_sizes = [100, 500, 1000, 2000]
print(f"Testing {num_iterations} iterations for each size...")
for N in trajectory_sizes:
print(f"\nTesting N={N}:")
# Reset stats
large_pool_planner.reset_performance_stats()
times = []
for i in range(num_iterations):
# Generate random trajectory
theta_start = np.random.uniform(-1, 1, 6)
theta_end = np.random.uniform(-1, 1, 6)
start_time = time.time()
traj = large_pool_planner.joint_trajectory(
theta_start, theta_end, Tf=2.0, N=N, method=5
)
times.append(time.time() - start_time)
# Analyze performance stability
times = np.array(times)
stats = large_pool_planner.get_performance_stats()
print(f" Mean time: {np.mean(times):.4f}s ± {np.std(times):.4f}s")
print(f" Min/Max: {np.min(times):.4f}s / {np.max(times):.4f}s")
print(f" GPU usage: {stats['gpu_usage_percent']:.1f}%")
print(f" Memory transfers: {stats['memory_transfers']}")
print(f" Kernel launches: {stats['kernel_launches']}")
# Clean up memory pool
large_pool_planner.cleanup_gpu_memory()
return times
# Run memory optimization demo
memory_times = memory_optimization_demo()
Performance Benchmarkingļ
Built-in benchmarking capabilities for performance analysis:
def comprehensive_benchmark():
"""Run comprehensive performance benchmarks."""
print("Comprehensive Performance Benchmark")
print("=" * 50)
# Test 1: Built-in benchmark
print("Running built-in benchmarks...")
benchmark_results = planner.benchmark_performance()
for test_name, result in benchmark_results.items():
print(f"\n{test_name} Test:")
print(f" Problem size: {result['N']} Ć {result['joints']}")
print(f" Total time: {result['total_time']:.4f}s")
print(f" Used GPU: {result['used_gpu']}")
print(f" Output shape: {result['trajectory_shape']}")
if 'stats' in result:
stats = result['stats']
print(f" GPU calls: {stats['gpu_calls']}")
print(f" CPU calls: {stats['cpu_calls']}")
# Test 2: Implementation comparison
print(f"\n" + "=" * 50)
print("Comparing CPU vs GPU implementations...")
comparison_results = compare_implementations(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
test_params={"N": 2000, "Tf": 3.0, "method": 5}
)
print("\nCPU Implementation:")
cpu_result = comparison_results['cpu']
print(f" Time: {cpu_result['time']:.4f}s")
print(f" Shape: {cpu_result['result_shape']}")
gpu_result = comparison_results.get('gpu', {})
if gpu_result.get('available', True):
print("\nGPU Implementation:")
print(f" Time: {gpu_result['time']:.4f}s")
print(f" Shape: {gpu_result['result_shape']}")
print(f" Speedup: {gpu_result['speedup']:.2f}x")
if 'accuracy' in comparison_results:
acc = comparison_results['accuracy']
print("\nAccuracy Comparison:")
print(f" Max position diff: {acc['max_pos_diff']:.2e}")
print(f" Max velocity diff: {acc['max_vel_diff']:.2e}")
print(f" Max acceleration diff: {acc['max_acc_diff']:.2e}")
else:
print("\nGPU Implementation: Not available")
return benchmark_results, comparison_results
# Run comprehensive benchmark
bench_results, comp_results = comprehensive_benchmark()
Optimized Dynamics Integrationļ
Enhanced inverse_dynamics_trajectory()ļ
GPU-accelerated dynamics computation with optimized memory management:
def optimized_dynamics_demo():
"""Demonstrate optimized dynamics computation with performance analysis."""
# Generate a complex trajectory
theta_start = np.array([0.1, 0.2, -0.3, 0.1, 0.5, -0.2])
theta_end = np.array([0.8, -0.4, 0.6, -0.3, 0.2, 0.7])
# Large trajectory for performance testing
N = 2000 # 2000 points
Tf = 5.0 # 5 seconds
print("Generating large trajectory for dynamics analysis...")
trajectory = planner.joint_trajectory(
theta_start, theta_end, Tf=Tf, N=N, method=5
)
print(f"Trajectory generated: {trajectory['positions'].shape}")
# Test dynamics computation performance
print("\nComputing inverse dynamics...")
planner.reset_performance_stats()
start_time = time.time()
torques = planner.inverse_dynamics_trajectory(
trajectory['positions'],
trajectory['velocities'],
trajectory['accelerations'],
gravity_vector=[0, 0, -9.81],
Ftip=[0, 0, 0, 0, 0, 0]
)
dynamics_time = time.time() - start_time
print(f"Dynamics computation completed:")
print(f"- Time: {dynamics_time:.4f}s")
print(f"- Rate: {N/dynamics_time:.1f} points/second")
print(f"- Torque shape: {torques.shape}")
# Analyze torque statistics
max_torques = np.max(np.abs(torques), axis=0)
mean_torques = np.mean(np.abs(torques), axis=0)
print(f"\nTorque Analysis:")
for i, (max_t, mean_t) in enumerate(zip(max_torques, mean_torques)):
limit = planner.torque_limits[i, 1]
usage = max_t / limit * 100
print(f" Joint {i+1}: Max {max_t:.1f} Nā
m ({usage:.1f}% of limit), Mean {mean_t:.1f} Nā
m")
# Get performance stats
stats = planner.get_performance_stats()
print(f"\nPerformance Stats:")
print(f"- GPU usage: {stats['gpu_usage_percent']:.1f}%")
print(f"- Kernel launches: {stats['kernel_launches']}")
print(f"- Memory transfers: {stats['memory_transfers']}")
return torques, dynamics_time
# Run optimized dynamics demo
demo_torques, demo_time = optimized_dynamics_demo()
Enhanced forward_dynamics_trajectory()ļ
Optimized forward dynamics simulation with adaptive execution:
def optimized_forward_dynamics_demo():
"""Demonstrate optimized forward dynamics simulation."""
# Initial conditions
theta_initial = np.array([0.1, 0.2, -0.1, 0.0, 0.3, 0.0])
theta_dot_initial = np.zeros(6)
# Define control sequence - sinusoidal torques
N_steps = 1000
dt = 0.01
time_steps = np.arange(N_steps) * dt
# Generate realistic control torques
tau_matrix = np.zeros((N_steps, 6))
for i in range(6):
frequency = 0.5 + i * 0.2 # Different frequency for each joint
amplitude = 2.0 + i * 0.5 # Different amplitude for each joint
tau_matrix[:, i] = amplitude * np.sin(2 * np.pi * frequency * time_steps)
# External forces (varying)
Ftip_matrix = np.zeros((N_steps, 6))
Ftip_matrix[:, 2] = 10.0 * np.sin(2 * np.pi * 0.2 * time_steps) # Vertical force
print("Running optimized forward dynamics simulation...")
print(f"Steps: {N_steps}, dt: {dt}s, Total time: {N_steps*dt}s")
# Reset performance tracking
planner.reset_performance_stats()
# Run simulation
start_time = time.time()
sim_result = planner.forward_dynamics_trajectory(
thetalist=theta_initial,
dthetalist=theta_dot_initial,
taumat=tau_matrix,
g=[0, 0, -9.81],
Ftipmat=Ftip_matrix,
dt=dt,
intRes=1
)
simulation_time = time.time() - start_time
print(f"Simulation completed:")
print(f"- Computation time: {simulation_time:.4f}s")
print(f"- Real-time factor: {(N_steps*dt)/simulation_time:.1f}x")
print(f"- Position shape: {sim_result['positions'].shape}")
# Analyze results
final_positions = sim_result['positions'][-1]
max_velocities = np.max(np.abs(sim_result['velocities']), axis=0)
max_accelerations = np.max(np.abs(sim_result['accelerations']), axis=0)
print(f"\nSimulation Analysis:")
print(f"Final positions: {np.degrees(final_positions).round(1)} deg")
print(f"Max velocities: {max_velocities.round(2)} rad/s")
print(f"Max accelerations: {max_accelerations.round(2)} rad/s²")
# Check joint limit compliance
positions = sim_result['positions']
limit_violations = 0
for i in range(6):
min_pos = np.min(positions[:, i])
max_pos = np.max(positions[:, i])
if min_pos < planner.joint_limits[i, 0] or max_pos > planner.joint_limits[i, 1]:
limit_violations += 1
print(f" Joint {i+1}: LIMIT VIOLATION ({np.degrees([min_pos, max_pos]).round(1)} deg)")
if limit_violations == 0:
print(" All joints stayed within limits ā")
# Performance stats
stats = planner.get_performance_stats()
print(f"\nPerformance Stats:")
print(f"- Used GPU: {stats['gpu_calls'] > 0}")
print(f"- Execution strategy: {'GPU' if stats['gpu_calls'] > 0 else 'CPU'}")
return sim_result, simulation_time
# Run forward dynamics demonstration
sim_results, sim_time = optimized_forward_dynamics_demo()
Optimized Cartesian Trajectoriesļ
Enhanced cartesian_trajectory()ļ
GPU-accelerated Cartesian trajectory generation with adaptive execution:
def optimized_cartesian_demo():
"""Demonstrate optimized Cartesian trajectory generation."""
# Define complex Cartesian trajectory
X_start = np.eye(4)
X_start[:3, 3] = [0.3, 0.2, 0.5] # Start position
# End pose with significant rotation and translation
X_end = np.eye(4)
X_end[:3, 3] = [0.6, -0.3, 0.3] # End position
# 90-degree rotation about Z-axis
angle = np.pi/2
X_end[:3, :3] = np.array([
[np.cos(angle), -np.sin(angle), 0],
[np.sin(angle), np.cos(angle), 0],
[0, 0, 1]
])
# Test different trajectory sizes
test_sizes = [100, 500, 2000, 5000]
print("Optimized Cartesian Trajectory Generation")
print("=" * 45)
for N in test_sizes:
print(f"\nTesting N={N} points:")
# Reset performance stats
planner.reset_performance_stats()
# Generate trajectory
start_time = time.time()
cart_traj = planner.cartesian_trajectory(
X_start, X_end, Tf=3.0, N=N, method=5
)
elapsed = time.time() - start_time
# Analyze results
positions = cart_traj['positions']
velocities = cart_traj['velocities']
accelerations = cart_traj['accelerations']
orientations = cart_traj['orientations']
# Calculate path metrics
path_length = np.sum(np.linalg.norm(np.diff(positions, axis=0), axis=1))
max_velocity = np.max(np.linalg.norm(velocities, axis=1))
max_acceleration = np.max(np.linalg.norm(accelerations, axis=1))
# Performance stats
stats = planner.get_performance_stats()
used_gpu = stats['gpu_calls'] > 0
print(f" Time: {elapsed:.4f}s ({'GPU' if used_gpu else 'CPU'})")
print(f" Path length: {path_length:.3f}m")
print(f" Max velocity: {max_velocity:.3f}m/s")
print(f" Max acceleration: {max_acceleration:.3f}m/s²")
print(f" Shapes: pos{positions.shape}, vel{velocities.shape}, acc{accelerations.shape}")
# Verify start and end points
start_error = np.linalg.norm(positions[0] - X_start[:3, 3])
end_error = np.linalg.norm(positions[-1] - X_end[:3, 3])
print(f" Start/End errors: {start_error:.2e}, {end_error:.2e}")
# Return the largest trajectory for visualization
final_traj = planner.cartesian_trajectory(X_start, X_end, Tf=3.0, N=1000, method=5)
return final_traj
# Run Cartesian trajectory demonstration
cartesian_demo = optimized_cartesian_demo()
Real-World Application Examplesļ
High-Performance Pick-and-Placeļ
Optimized trajectory planning for industrial pick-and-place operations:
def optimized_pick_and_place():
"""Demonstrate optimized pick-and-place trajectory planning."""
print("Optimized Pick-and-Place Trajectory Planning")
print("=" * 50)
# Define task parameters
pick_location = np.array([0.4, 0.3, 0.2])
place_location = np.array([0.6, -0.2, 0.25])
approach_height = 0.1 # 10cm above objects
# Calculate waypoint poses
home_pose = np.eye(4)
home_pose[:3, 3] = [0.5, 0.0, 0.4]
pick_approach = np.eye(4)
pick_approach[:3, 3] = pick_location + np.array([0, 0, approach_height])
pick_pose = np.eye(4)
pick_pose[:3, 3] = pick_location
place_approach = np.eye(4)
place_approach[:3, 3] = place_location + np.array([0, 0, approach_height])
place_pose = np.eye(4)
place_pose[:3, 3] = place_location
# Convert to joint space using inverse kinematics
waypoint_poses = [home_pose, pick_approach, pick_pose, pick_approach,
place_approach, place_pose, place_approach, home_pose]
waypoint_joints = []
current_joints = np.zeros(6) # Start from home
print("Converting Cartesian waypoints to joint space...")
for i, pose in enumerate(waypoint_poses):
try:
joints, success, _ = planner.serial_manipulator.iterative_inverse_kinematics(
pose, current_joints, max_iterations=200
)
if success:
waypoint_joints.append(joints)
current_joints = joints
print(f" Waypoint {i+1}: ā")
else:
print(f" Waypoint {i+1}: Failed IK, using approximation")
waypoint_joints.append(current_joints)
except Exception as e:
print(f" Waypoint {i+1}: Error {e}")
waypoint_joints.append(current_joints)
# Define segment durations (optimized for speed)
segment_durations = [1.5, 0.8, 0.5, 1.0, 2.0, 0.5, 0.8, 1.5] # seconds
segment_names = [
"Move to pick approach",
"Approach object",
"Pick up",
"Lift object",
"Move to place approach",
"Lower to place",
"Place object",
"Return home"
]
# Generate optimized batch trajectory
print(f"\nGenerating {len(segment_names)} trajectory segments...")
# Prepare batch data
batch_starts = waypoint_joints[:-1]
batch_ends = waypoint_joints[1:]
batch_size = len(batch_starts)
# Use different point densities for different segments
points_per_segment = [75, 40, 25, 50, 100, 25, 40, 75]
all_segments = []
total_computation_time = 0
for i, (start, end, duration, points, name) in enumerate(
zip(batch_starts, batch_ends, segment_durations, points_per_segment, segment_names)
):
print(f" {i+1}. {name} ({duration}s, {points} points)")
planner.reset_performance_stats()
start_time = time.time()
segment = planner.joint_trajectory(
start, end, Tf=duration, N=points, method=5
)
segment_time = time.time() - start_time
total_computation_time += segment_time
stats = planner.get_performance_stats()
used_gpu = stats['gpu_calls'] > 0
print(f" Time: {segment_time:.3f}s ({'GPU' if used_gpu else 'CPU'})")
all_segments.append({
'name': name,
'duration': duration,
'trajectory': segment,
'computation_time': segment_time,
'used_gpu': used_gpu
})
# Combine all segments
print(f"\nCombining trajectory segments...")
combined_positions = []
combined_velocities = []
combined_accelerations = []
for i, segment in enumerate(all_segments):
traj = segment['trajectory']
if i == 0:
# Include all points for first segment
combined_positions.extend(traj['positions'])
combined_velocities.extend(traj['velocities'])
combined_accelerations.extend(traj['accelerations'])
else:
# Skip first point to avoid duplication
combined_positions.extend(traj['positions'][1:])
combined_velocities.extend(traj['velocities'][1:])
combined_accelerations.extend(traj['accelerations'][1:])
# Convert to arrays
combined_trajectory = {
'positions': np.array(combined_positions),
'velocities': np.array(combined_velocities),
'accelerations': np.array(combined_accelerations)
}
total_duration = sum(segment_durations)
total_points = combined_trajectory['positions'].shape[0]
print(f"\nPick-and-Place Trajectory Generated:")
print(f"- Total duration: {total_duration:.1f}s")
print(f"- Total points: {total_points}")
print(f"- Computation time: {total_computation_time:.3f}s")
print(f"- Real-time factor: {total_duration/total_computation_time:.1f}x")
# Analyze trajectory for safety and performance
print(f"\nTrajectory Analysis:")
# Check joint limits compliance
positions = combined_trajectory['positions']
velocities = combined_trajectory['velocities']
accelerations = combined_trajectory['accelerations']
for i in range(6):
joint_range = [np.min(positions[:, i]), np.max(positions[:, i])]
limit_range = planner.joint_limits[i]
if joint_range[0] < limit_range[0] or joint_range[1] > limit_range[1]:
print(f" Joint {i+1}: ā ļø NEAR LIMITS {np.degrees(joint_range).round(1)}° "
f"(limits: {np.degrees(limit_range).round(1)}°)")
else:
margin = min(joint_range[0] - limit_range[0], limit_range[1] - joint_range[1])
print(f" Joint {i+1}: ā Safe margin: {np.degrees(margin).round(1)}°")
# Velocity and acceleration analysis
max_vel = np.max(np.abs(velocities), axis=0)
max_acc = np.max(np.abs(accelerations), axis=0)
print(f"\nMotion Analysis:")
print(f" Max velocities: {max_vel.round(3)} rad/s")
print(f" Max accelerations: {max_acc.round(3)} rad/s²")
return combined_trajectory, all_segments
# Run optimized pick-and-place demonstration
pick_place_traj, segments = optimized_pick_and_place()
Multi-Robot Trajectory Coordinationļ
Optimized trajectory planning for multiple robots with collision avoidance:
def multi_robot_coordination():
"""Demonstrate optimized multi-robot trajectory coordination."""
print("Multi-Robot Trajectory Coordination")
print("=" * 40)
# Simulate 4 robots working in shared workspace
num_robots = 4
robot_configs = []
# Different start/end configurations for each robot
for i in range(num_robots):
start_config = np.random.uniform(-0.5, 0.5, 6) + i * 0.1
end_config = np.random.uniform(-0.5, 0.5, 6) - i * 0.1
robot_configs.append((start_config, end_config))
print(f"Planning trajectories for {num_robots} robots...")
# Method 1: Sequential planning (traditional)
print(f"\n1. Sequential Planning:")
start_time = time.time()
sequential_trajectories = []
for i, (start, end) in enumerate(robot_configs):
traj = planner.joint_trajectory(start, end, Tf=3.0, N=150, method=5)
sequential_trajectories.append(traj)
print(f" Robot {i+1}: {traj['positions'].shape}")
sequential_time = time.time() - start_time
print(f" Total time: {sequential_time:.4f}s")
# Method 2: Batch planning (optimized)
print(f"\n2. Batch Planning (Optimized):")
start_time = time.time()
# Prepare batch data
batch_starts = np.array([config[0] for config in robot_configs])
batch_ends = np.array([config[1] for config in robot_configs])
batch_trajectories = planner.batch_joint_trajectory(
thetastart_batch=batch_starts,
thetaend_batch=batch_ends,
Tf=3.0,
N=150,
method=5
)
batch_time = time.time() - start_time
print(f" Batch shape: {batch_trajectories['positions'].shape}")
print(f" Total time: {batch_time:.4f}s")
print(f" Speedup: {sequential_time/batch_time:.2f}x")
# Method 3: Collision-aware coordination
print(f"\n3. Collision-Aware Coordination:")
start_time = time.time()
# Generate staggered timing to avoid collisions
stagger_delays = [0.0, 0.3, 0.6, 0.9] # seconds
coordinated_trajectories = []
for i, ((start, end), delay) in enumerate(zip(robot_configs, stagger_delays)):
# Extend trajectory duration to accommodate delay
extended_duration = 3.0 + delay
points_with_delay = int(150 * extended_duration / 3.0)
traj = planner.joint_trajectory(
start, end, Tf=extended_duration, N=points_with_delay, method=5
)
# Add delay by padding with start position
delay_points = int(delay * 50) # 50 points per second
if delay_points > 0:
start_padding = np.tile(start.reshape(1, -1), (delay_points, 1))
zero_padding = np.zeros((delay_points, 6))
# Insert delay at beginning
padded_positions = np.vstack([start_padding, traj['positions']])
padded_velocities = np.vstack([zero_padding, traj['velocities']])
padded_accelerations = np.vstack([zero_padding, traj['accelerations']])
coordinated_traj = {
'positions': padded_positions,
'velocities': padded_velocities,
'accelerations': padded_accelerations
}
else:
coordinated_traj = traj
coordinated_trajectories.append(coordinated_traj)
print(f" Robot {i+1}: delay {delay}s, shape {coordinated_traj['positions'].shape}")
coordination_time = time.time() - start_time
print(f" Total time: {coordination_time:.4f}s")
# Analyze coordination effectiveness
print(f"\n4. Coordination Analysis:")
# Check for potential collisions (simplified workspace overlap)
max_timesteps = max(traj['positions'].shape[0] for traj in coordinated_trajectories)
collision_risk_points = 0
for t in range(0, max_timesteps, 10): # Check every 10th timestep
robot_positions = []
for traj in coordinated_trajectories:
if t < traj['positions'].shape[0]:
# Convert joint angles to end-effector position
T = planner.serial_manipulator.forward_kinematics(traj['positions'][t])
robot_positions.append(T[:3, 3])
# Check pairwise distances
for i in range(len(robot_positions)):
for j in range(i+1, len(robot_positions)):
distance = np.linalg.norm(
np.array(robot_positions[i]) - np.array(robot_positions[j])
)
if distance < 0.5: # 50cm safety margin
collision_risk_points += 1
print(f" Collision risk points: {collision_risk_points}")
print(f" Safety score: {max(0, 100 - collision_risk_points*2):.1f}%")
return {
'sequential': sequential_trajectories,
'batch': batch_trajectories,
'coordinated': coordinated_trajectories,
'timing': {
'sequential_time': sequential_time,
'batch_time': batch_time,
'coordination_time': coordination_time
}
}
# Run multi-robot coordination demonstration
multi_robot_results = multi_robot_coordination()
Advanced Optimization Techniquesļ
Adaptive Performance Tuningļ
The planner automatically adapts its execution strategy based on performance history:
def adaptive_tuning_demo():
"""Demonstrate automatic performance tuning capabilities."""
print("Adaptive Performance Tuning Demonstration")
print("=" * 50)
# Test various problem sizes to trigger adaptive behavior
problem_sizes = [
(50, 6), (100, 6), (200, 6), (500, 6), (1000, 6),
(2000, 6), (5000, 6), (1000, 12), (2000, 12)
]
print("Testing adaptive threshold adjustment...")
print(f"Initial threshold: {planner.cpu_threshold}")
for i, (N, joints) in enumerate(problem_sizes):
print(f"\nTest {i+1}: N={N}, joints={joints}")
# Generate test trajectory
theta_start = np.random.uniform(-1, 1, joints)
theta_end = np.random.uniform(-1, 1, joints)
# Reset stats for clean measurement
planner.reset_performance_stats()
start_time = time.time()
trajectory = planner.joint_trajectory(
theta_start, theta_end, Tf=2.0, N=N, method=5
)
elapsed = time.time() - start_time
# Get updated performance stats
stats = planner.get_performance_stats()
used_gpu = stats['gpu_calls'] > 0
print(f" Execution: {'GPU' if used_gpu else 'CPU'}")
print(f" Time: {elapsed:.4f}s")
print(f" Updated threshold: {planner.cpu_threshold}")
# Show efficiency metrics
if stats['avg_gpu_time'] > 0 and stats['avg_cpu_time'] > 0:
efficiency = stats['avg_cpu_time'] / stats['avg_gpu_time']
print(f" GPU efficiency: {efficiency:.2f}x")
final_stats = planner.get_performance_stats()
print(f"\nFinal Performance Summary:")
print(f"- Total GPU calls: {final_stats['gpu_calls']}")
print(f"- Total CPU calls: {final_stats['cpu_calls']}")
print(f"- GPU usage: {final_stats['gpu_usage_percent']:.1f}%")
print(f"- Final threshold: {planner.cpu_threshold}")
return final_stats
# Run adaptive tuning demonstration
tuning_stats = adaptive_tuning_demo()
Memory Profiling and Optimizationļ
Monitor and optimize memory usage for sustained performance:
def memory_profiling_demo():
"""Demonstrate memory profiling and optimization techniques."""
print("Memory Profiling and Optimization")
print("=" * 40)
# Test sustained performance under memory pressure
trajectory_sizes = [500, 1000, 2000, 5000, 2000, 1000, 500]
print("Testing memory allocation patterns...")
memory_stats = []
for i, N in enumerate(trajectory_sizes):
print(f"\nIteration {i+1}: N={N}")
# Generate large trajectory to stress memory system
theta_start = np.random.uniform(-1, 1, 6)
theta_end = np.random.uniform(-1, 1, 6)
# Measure memory allocation performance
planner.reset_performance_stats()
start_time = time.time()
trajectory = planner.joint_trajectory(
theta_start, theta_end, Tf=3.0, N=N, method=5
)
traj_time = time.time() - start_time
# Compute dynamics to further stress memory
start_time = time.time()
torques = planner.inverse_dynamics_trajectory(
trajectory['positions'],
trajectory['velocities'],
trajectory['accelerations']
)
dynamics_time = time.time() - start_time
stats = planner.get_performance_stats()
memory_stat = {
'iteration': i+1,
'N': N,
'traj_time': traj_time,
'dynamics_time': dynamics_time,
'total_time': traj_time + dynamics_time,
'used_gpu': stats['gpu_calls'] > 0,
'memory_transfers': stats['memory_transfers'],
'kernel_launches': stats['kernel_launches']
}
memory_stats.append(memory_stat)
print(f" Trajectory: {traj_time:.4f}s ({'GPU' if stats['gpu_calls'] > 0 else 'CPU'})")
print(f" Dynamics: {dynamics_time:.4f}s")
print(f" Memory transfers: {stats['memory_transfers']}")
# Analyze memory allocation patterns
print(f"\nMemory Allocation Analysis:")
gpu_times = [s['total_time'] for s in memory_stats if s['used_gpu']]
cpu_times = [s['total_time'] for s in memory_stats if not s['used_gpu']]
if gpu_times:
print(f" GPU times: {np.mean(gpu_times):.4f}s ± {np.std(gpu_times):.4f}s")
print(f" GPU consistency: {(1 - np.std(gpu_times)/np.mean(gpu_times))*100:.1f}%")
if cpu_times:
print(f" CPU times: {np.mean(cpu_times):.4f}s ± {np.std(cpu_times):.4f}s")
print(f" CPU consistency: {(1 - np.std(cpu_times)/np.mean(cpu_times))*100:.1f}%")
# Memory cleanup demonstration
print(f"\nMemory cleanup...")
pre_cleanup_stats = planner.get_performance_stats()
planner.cleanup_gpu_memory()
post_cleanup_stats = planner.get_performance_stats()
print(f" Memory cleanup completed")
print(f" Performance stats preserved: {pre_cleanup_stats == post_cleanup_stats}")
return memory_stats
# Run memory profiling demonstration
memory_profile = memory_profiling_demo()
Performance Visualization and Analysisļ
Advanced Performance Monitoringļ
Comprehensive performance analysis and visualization:
def performance_analysis_suite():
"""Comprehensive performance analysis and visualization."""
print("Performance Analysis Suite")
print("=" * 30)
# Collect performance data across various scenarios
test_scenarios = [
{"name": "Small Problems", "sizes": [(50, 6), (100, 6), (150, 6)]},
{"name": "Medium Problems", "sizes": [(500, 6), (750, 6), (1000, 6)]},
{"name": "Large Problems", "sizes": [(2000, 6), (3000, 6), (5000, 6)]},
{"name": "Many Joints", "sizes": [(500, 12), (1000, 12), (1500, 12)]},
]
all_results = []
for scenario in test_scenarios:
print(f"\n{scenario['name']}:")
scenario_results = []
for N, joints in scenario['sizes']:
print(f" Testing N={N}, joints={joints}...")
# Generate test data
theta_start = np.random.uniform(-1, 1, joints)
theta_end = np.random.uniform(-1, 1, joints)
# Run multiple trials for statistical accuracy
trial_times = []
trial_gpu_usage = []
for trial in range(5): # 5 trials per configuration
planner.reset_performance_stats()
start_time = time.time()
trajectory = planner.joint_trajectory(
theta_start, theta_end, Tf=2.0, N=N, method=5
)
elapsed = time.time() - start_time
stats = planner.get_performance_stats()
trial_times.append(elapsed)
trial_gpu_usage.append(stats['gpu_calls'] > 0)
# Calculate statistics
mean_time = np.mean(trial_times)
std_time = np.std(trial_times)
gpu_usage_rate = np.mean(trial_gpu_usage)
result = {
'scenario': scenario['name'],
'N': N,
'joints': joints,
'problem_size': N * joints,
'mean_time': mean_time,
'std_time': std_time,
'gpu_usage_rate': gpu_usage_rate,
'performance_score': (N * joints) / mean_time # ops per second
}
scenario_results.append(result)
all_results.append(result)
print(f" Time: {mean_time:.4f}s ± {std_time:.4f}s")
print(f" GPU usage: {gpu_usage_rate*100:.0f}%")
# Performance analysis
print(f"\n" + "=" * 50)
print("Performance Analysis Results:")
# Find optimal problem sizes for GPU
gpu_results = [r for r in all_results if r['gpu_usage_rate'] > 0.5]
cpu_results = [r for r in all_results if r['gpu_usage_rate'] < 0.5]
if gpu_results:
gpu_threshold_size = min(r['problem_size'] for r in gpu_results)
best_gpu_performance = max(r['performance_score'] for r in gpu_results)
print(f" GPU threshold: ~{gpu_threshold_size} total elements")
print(f" Best GPU performance: {best_gpu_performance:.0f} ops/second")
if cpu_results:
best_cpu_performance = max(r['performance_score'] for r in cpu_results)
print(f" Best CPU performance: {best_cpu_performance:.0f} ops/second")
# Performance consistency analysis
gpu_times = [r['mean_time'] for r in gpu_results]
cpu_times = [r['mean_time'] for r in cpu_results]
if gpu_times and cpu_times:
gpu_efficiency = np.mean(cpu_times) / np.mean(gpu_times)
print(f" Average GPU speedup: {gpu_efficiency:.2f}x")
# Memory transfer efficiency
total_gpu_calls = sum(1 for r in all_results if r['gpu_usage_rate'] > 0)
if total_gpu_calls > 0:
final_stats = planner.get_performance_stats()
transfer_efficiency = final_stats['kernel_launches'] / total_gpu_calls
print(f" Memory transfer efficiency: {transfer_efficiency:.2f} kernels/call")
return all_results
# Run comprehensive performance analysis
perf_analysis = performance_analysis_suite()
Deployment Best Practicesļ
Production Deployment Guidelinesļ
Guidelines for deploying optimized trajectory planning in production environments:
def production_deployment_guide():
"""Guidelines and examples for production deployment."""
print("Production Deployment Guidelines")
print("=" * 40)
# Example production configuration
production_config = {
'use_cuda': None, # Auto-detect for flexibility
'cuda_threshold': 200, # Conservative threshold
'memory_pool_size_mb': 512, # Moderate memory pool
'enable_profiling': False, # Disable in production
}
print("1. Production Configuration:")
for key, value in production_config.items():
print(f" {key}: {value}")
# Create production-ready planner
prod_planner = OptimizedTrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
torque_limits=torque_limits,
**production_config
)
print(f"\n2. System Capabilities:")
print(f" CUDA available: {prod_planner.cuda_available}")
print(f" GPU properties: {prod_planner.gpu_properties}")
print(f" CPU threshold: {prod_planner.cpu_threshold}")
# Test production performance
print(f"\n3. Production Performance Test:")
# Simulate typical production workload
workload_sizes = [100, 250, 500, 1000, 2000]
workload_results = []
for size in workload_sizes:
theta_start = np.random.uniform(-1, 1, 6)
theta_end = np.random.uniform(-1, 1, 6)
# Measure performance
start_time = time.time()
trajectory = prod_planner.joint_trajectory(
theta_start, theta_end, Tf=2.0, N=size, method=5
)
elapsed = time.time() - start_time
# Check for acceptable performance
is_realtime = elapsed < 0.1 # 100ms max for real-time
throughput = size / elapsed
result = {
'size': size,
'time': elapsed,
'realtime': is_realtime,
'throughput': throughput
}
workload_results.append(result)
status = "ā" if is_realtime else "ā ļø"
print(f" N={size}: {elapsed:.4f}s {status} ({throughput:.0f} points/s)")
# Production recommendations
print(f"\n4. Production Recommendations:")
realtime_sizes = [r['size'] for r in workload_results if r['realtime']]
if realtime_sizes:
max_realtime = max(realtime_sizes)
print(f" ā Real-time capable up to {max_realtime} points")
best_throughput = max(r['throughput'] for r in workload_results)
print(f" ā Peak throughput: {best_throughput:.0f} points/second")
# Error handling recommendations
print(f"\n5. Error Handling:")
print(f" ā Automatic GPU->CPU fallback enabled")
print(f" ā Memory allocation failures handled gracefully")
print(f" ā Joint limit enforcement active")
print(f" ā Torque limit checking enabled")
# Monitoring recommendations
print(f"\n6. Monitoring Setup:")
print(f" ⢠Monitor planner.get_performance_stats() regularly")
print(f" ⢠Track GPU usage percentage for optimization")
print(f" ⢠Alert on excessive CPU fallback occurrences")
print(f" ⢠Monitor memory allocation patterns")
return prod_planner, workload_results
# Run production deployment guide
prod_planner, prod_results = production_deployment_guide()
Error Handling and Debuggingļ
Comprehensive error handling and debugging tools for production use:
def error_handling_demo():
"""Demonstrate comprehensive error handling and debugging capabilities."""
print("Error Handling and Debugging")
print("=" * 35)
# Test various error conditions
error_tests = [
{
'name': 'Invalid joint limits',
'test': lambda: planner.joint_trajectory(
np.array([5.0, 5.0, 5.0, 5.0, 5.0, 5.0]), # Beyond limits
np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
Tf=2.0, N=100, method=3
)
},
{
'name': 'Zero duration trajectory',
'test': lambda: planner.joint_trajectory(
np.zeros(6), np.ones(6), Tf=0.0, N=100, method=3
)
},
{
'name': 'Invalid method parameter',
'test': lambda: planner.joint_trajectory(
np.zeros(6), np.ones(6), Tf=2.0, N=100, method=7 # Invalid
)
},
{
'name': 'Extremely large trajectory',
'test': lambda: planner.joint_trajectory(
np.zeros(6), np.ones(6), Tf=2.0, N=100000, method=3 # Very large
)
}
]
print("Testing error conditions:")
for i, error_test in enumerate(error_tests, 1):
print(f"\n{i}. {error_test['name']}:")
try:
result = error_test['test']()
print(f" ā Handled gracefully")
print(f" Result shape: {result['positions'].shape}")
except Exception as e:
print(f" ā ļø Exception: {type(e).__name__}: {e}")
# Test memory exhaustion handling
print(f"\n5. Memory exhaustion test:")
try:
# Try to allocate extremely large trajectory
huge_trajectory = planner.joint_trajectory(
np.zeros(6), np.ones(6), Tf=2.0, N=1000000, method=3
)
print(f" ā Large trajectory handled: {huge_trajectory['positions'].shape}")
except Exception as e:
print(f" ā ļø Memory limit reached: {type(e).__name__}")
# Test GPU error recovery
if planner.cuda_available:
print(f"\n6. GPU error recovery test:")
try:
# Force GPU usage with large problem
original_threshold = planner.cpu_threshold
planner.cpu_threshold = 0 # Force GPU
trajectory = planner.joint_trajectory(
np.zeros(6), np.ones(6), Tf=2.0, N=5000, method=5
)
print(f" ā GPU computation successful")
planner.cpu_threshold = original_threshold
except Exception as e:
print(f" ā ļø GPU error handled, fell back to CPU: {e}")
# Debugging utilities demonstration
print(f"\n7. Debugging Utilities:")
# Performance stats for debugging
stats = planner.get_performance_stats()
print(f" Current performance stats:")
for key, value in stats.items():
print(f" {key}: {value}")
# Memory cleanup for debugging
print(f" Memory cleanup status:")
try:
planner.cleanup_gpu_memory()
print(f" ā GPU memory cleaned successfully")
except Exception as e:
print(f" ā ļø Memory cleanup error: {e}")
return True
# Run error handling demonstration
error_test_result = error_handling_demo()
Summary and Migration Guideļ
Migration from Original TrajectoryPlanningļ
Step-by-step migration guide for existing code:
def migration_guide():
"""Guide for migrating from original TrajectoryPlanning to optimized version."""
print("Migration Guide: Original ā Optimized TrajectoryPlanning")
print("=" * 60)
print("1. BACKWARD COMPATIBILITY:")
print(" ā Existing code works unchanged")
print(" ā All method signatures preserved")
print(" ā Return value formats identical")
print(" ā Automatic optimization activation")
print("\n2. SIMPLE MIGRATION (No Code Changes Required):")
print(" # Original code")
print(" from ManipulaPy.path_planning import TrajectoryPlanning")
print(" planner = TrajectoryPlanning(robot, urdf, dynamics, limits)")
print(" trajectory = planner.joint_trajectory(start, end, 2.0, 100, 3)")
print(" ")
print(" # ā Automatically uses OptimizedTrajectoryPlanning!")
print("\n3. ENHANCED MIGRATION (Unlock Full Performance):")
print(" # Use factory function for optimal settings")
print(" from ManipulaPy.path_planning import create_optimized_planner")
print(" planner = create_optimized_planner(")
print(" robot, urdf, dynamics, limits,")
print(" gpu_memory_mb=512, # GPU memory pool")
print(" enable_profiling=True # Performance monitoring")
print(" )")
print("\n4. NEW PERFORMANCE FEATURES:")
print(" # Batch processing for multiple trajectories")
print(" batch_results = planner.batch_joint_trajectory(")
print(" starts_batch, ends_batch, Tf, N, method")
print(" )")
print(" ")
print(" # Performance monitoring")
print(" stats = planner.get_performance_stats()")
print(" print(f'GPU usage: {stats[\"gpu_usage_percent\"]:.1f}%')")
print(" ")
print(" # Memory management")
print(" planner.cleanup_gpu_memory() # Clean up when done")
print("\n5. PERFORMANCE BENEFITS:")
# Demonstrate actual performance improvements
original_planner = TrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits
)
# Test case
theta_start = np.zeros(6)
theta_end = np.array([0.5, 0.3, -0.2, 0.1, 0.4, -0.1])
# Both planners are actually the same optimized implementation now
# but we can show the before/after conceptually
test_sizes = [100, 500, 1000, 2000]
for N in test_sizes:
start_time = time.time()
traj = original_planner.joint_trajectory(
theta_start, theta_end, Tf=2.0, N=N, method=5
)
elapsed = time.time() - start_time
stats = original_planner.get_performance_stats()
used_gpu = stats['gpu_calls'] > 0
print(f" N={N}: {elapsed:.4f}s ({'GPU' if used_gpu else 'CPU'})")
print("\n6. MIGRATION CHECKLIST:")
print(" ā” Update import statements (optional)")
print(" ā” Add performance monitoring (recommended)")
print(" ā” Configure GPU memory pool (optional)")
print(" ā” Add error handling for production (recommended)")
print(" ā” Test with your specific workloads")
print("\n7. TROUBLESHOOTING:")
print(" ⢠GPU not detected? Check CUDA installation")
print(" ⢠Memory errors? Reduce memory_pool_size_mb")
print(" ⢠Performance regression? Check cuda_threshold")
print(" ⢠Need CPU-only? Set use_cuda=False")
return True
# Run migration guide
migration_complete = migration_guide()
Advanced Configuration Examplesļ
Real-world configuration examples for different use cases:
def advanced_configuration_examples():
"""Show advanced configuration examples for different scenarios."""
print("Advanced Configuration Examples")
print("=" * 40)
# Configuration 1: High-throughput batch processing
print("1. HIGH-THROUGHPUT BATCH PROCESSING:")
batch_config = {
'use_cuda': True, # Force GPU usage
'cuda_threshold': 50, # Low threshold for maximum GPU usage
'memory_pool_size_mb': 2048, # Large memory pool for batch ops
'enable_profiling': True # Monitor performance
}
try:
batch_planner = OptimizedTrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
**batch_config
)
print(" ā High-throughput planner configured")
print(f" GPU available: {batch_planner.cuda_available}")
print(f" Memory pool: {batch_config['memory_pool_size_mb']} MB")
except Exception as e:
print(f" ā ļø Configuration failed: {e}")
# Configuration 2: Real-time control system
print("\n2. REAL-TIME CONTROL SYSTEM:")
realtime_config = {
'use_cuda': None, # Adaptive based on timing
'cuda_threshold': 200, # Conservative threshold for reliability
'memory_pool_size_mb': 256, # Moderate memory usage
'enable_profiling': False # No profiling overhead
}
realtime_planner = OptimizedTrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
**realtime_config
)
print(" ā Real-time planner configured")
print(f" Adaptive execution: {realtime_planner.cuda_available}")
# Configuration 3: Memory-constrained embedded system
print("\n3. MEMORY-CONSTRAINED EMBEDDED:")
embedded_config = {
'use_cuda': False, # CPU-only for embedded
'cuda_threshold': float('inf'), # Never use GPU
'memory_pool_size_mb': None, # No GPU memory pool
'enable_profiling': False # Minimal overhead
}
embedded_planner = OptimizedTrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
**embedded_config
)
print(" ā Embedded planner configured")
print(f" CPU-only mode: {not embedded_planner.cuda_available}")
# Configuration 4: Development and debugging
print("\n4. DEVELOPMENT AND DEBUGGING:")
debug_config = {
'use_cuda': None, # Test both paths
'cuda_threshold': 100, # Standard threshold
'memory_pool_size_mb': 512, # Reasonable pool size
'enable_profiling': True # Full profiling enabled
}
debug_planner = OptimizedTrajectoryPlanning(
serial_manipulator=robot,
urdf_path="robot.urdf",
dynamics=dynamics,
joint_limits=joint_limits,
**debug_config
)
print(" ā Debug planner configured")
print(f" Profiling enabled: {debug_planner.enable_profiling}")
# Test each configuration with sample workload
configs = [
("Batch", batch_planner),
("Real-time", realtime_planner),
("Embedded", embedded_planner),
("Debug", debug_planner)
]
print(f"\n5. CONFIGURATION PERFORMANCE TEST:")
test_N = 500
for name, planner_instance in configs:
try:
planner_instance.reset_performance_stats()
start_time = time.time()
trajectory = planner_instance.joint_trajectory(
np.zeros(6), np.ones(6), Tf=2.0, N=test_N, method=5
)
elapsed = time.time() - start_time
stats = planner_instance.get_performance_stats()
used_gpu = stats['gpu_calls'] > 0
print(f" {name}: {elapsed:.4f}s ({'GPU' if used_gpu else 'CPU'})")
except Exception as e:
print(f" {name}: ā ļø Error - {e}")
return {
'batch': batch_planner,
'realtime': realtime_planner,
'embedded': embedded_planner,
'debug': debug_planner
}
# Run advanced configuration examples
config_planners = advanced_configuration_examples()
Conclusion and Best Practicesļ
Performance Summary:
The optimized TrajectoryPlanning class provides significant performance improvements:
Adaptive execution: Automatically chooses optimal CPU/GPU strategy
Batch processing: Up to 10x speedup for multiple trajectories
Memory pooling: Reduces allocation overhead by 50-80%
CUDA acceleration: 2-20x speedup for large problems
Intelligent fallback: Graceful degradation when GPU unavailable
Key Optimizations:
2D Parallelization: Trajectories computed across both time and joint dimensions
Fused Kernels: Multiple operations combined to minimize memory transfers
Pinned Memory: Faster host-device transfers for large datasets
Adaptive Thresholds: Automatic tuning based on performance history
Memory Pooling: Reuse of GPU arrays to eliminate allocation overhead
Best Practices for Production:
Use Factory Function: create_optimized_planner() for automatic optimization
Monitor Performance: Regularly check get_performance_stats()
Configure Memory: Set appropriate memory_pool_size_mb for your workload
Handle Errors: Implement proper error handling for GPU failures
Profile First: Use enable_profiling=True during development
Batch When Possible: Use batch_joint_trajectory() for multiple paths
Clean Up: Call cleanup_gpu_memory() when done
Migration Strategy:
Phase 1: Drop-in replacement (no code changes required)
Phase 2: Add performance monitoring
Phase 3: Enable batch processing where applicable
Phase 4: Fine-tune configuration for your specific use case
Configuration Guidelines:
High-throughput: cuda_threshold=50, large memory_pool_size_mb
Real-time: cuda_threshold=200, moderate memory pool
Embedded: use_cuda=False, minimal memory footprint
Development: enable_profiling=True, adaptive settings
The optimized trajectory planning module maintains full backward compatibility while providing substantial performance improvements. Users can benefit from optimizations immediately with existing code, then gradually adopt advanced features for maximum performance gains.
For the most demanding applications, the combination of GPU acceleration, batch processing, and intelligent memory management can provide order-of-magnitude performance improvements while maintaining the same simple API that makes ManipulaPy easy to use.