Bridging Python AI Agents to ROS 2
🤖 Integrating AI with Robotics
One of the most powerful aspects of ROS 2 is its ability to seamlessly integrate AI agents with robotic systems. Using rclpy (ROS Client Library for Python), you can connect machine learning models, decision-making algorithms, and AI agents directly to robot controllers.
rclpy: The Python Bridge
rclpy is the Python client library for ROS 2. It provides a Pythonic interface to all ROS 2 features.
Basic rclpy Structure
import rclpy
from rclpy.node import Node
class AIAgent(Node):
    """Minimal ROS 2 node that hosts an AI model."""

    def __init__(self):
        super().__init__('ai_agent')
        # Bring the model up as soon as the node is constructed.
        self.setup_ai_model()

    def setup_ai_model(self):
        """Load the trained model (placeholder for a real loader)."""
        self.get_logger().info('AI model loaded')
def main(args=None):
    """Entry point: initialize ROS, spin the AIAgent, then clean up.

    Takes ``args`` (default ``None``) and forwards it to ``rclpy.init`` so
    ROS CLI remapping arguments work — consistent with the other entry
    points in this chapter, which all use ``main(args=None)``.
    """
    rclpy.init(args=args)
    agent = AIAgent()
    rclpy.spin(agent)
    agent.destroy_node()
    rclpy.shutdown()
🧠 Example: Vision-Based Navigation Agent
Let's build an AI agent that processes camera images and publishes navigation commands.
Step 1: Create the AI Vision Node
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist
from cv_bridge import CvBridge
import cv2
import numpy as np
class VisionNavigationAgent(Node):
    """Turn camera frames into velocity commands with simple vision logic."""

    def __init__(self):
        super().__init__('vision_navigation_agent')
        # Camera frames in.
        self.image_subscription = self.create_subscription(
            Image,
            '/camera/image_raw',
            self.image_callback,
            10,
        )
        # Velocity commands out.
        self.cmd_publisher = self.create_publisher(
            Twist,
            '/cmd_vel',
            10,
        )
        self.bridge = CvBridge()
        self.get_logger().info('Vision Navigation Agent initialized')

    def image_callback(self, msg):
        """Per-frame pipeline: decode the ROS image, decide, publish."""
        frame = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
        self.cmd_publisher.publish(self.process_image(frame))

    def process_image(self, image):
        """Derive a Twist command from one BGR frame.

        Bright (thresholded-white) pixels are treated as free space; the
        robot drives forward and steers toward whichever half of the frame
        contains more of them. Replace this with a real model as needed.
        """
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # Placeholder "AI": binarize, then compare free space per half.
        _, binary = cv2.threshold(grayscale, 127, 255, cv2.THRESH_BINARY)
        midpoint = binary.shape[1] // 2
        free_left = np.sum(binary[:, :midpoint] == 255)
        free_right = np.sum(binary[:, midpoint:] == 255)

        command = Twist()
        command.linear.x = 0.5  # constant forward speed
        # Positive z turns left, negative turns right.
        command.angular.z = 0.3 if free_left > free_right else -0.3
        return command
def main(args=None):
    """Spin the vision navigation node until shutdown, then clean up."""
    rclpy.init(args=args)
    node = VisionNavigationAgent()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
🎯 Integrating Deep Learning Models
Using PyTorch with ROS 2
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import String
from cv_bridge import CvBridge
import torch
import torchvision.transforms as transforms
from PIL import Image as PILImage
import io
class ObjectDetectionAgent(Node):
    """Run YOLOv5 inference on camera frames and publish detections."""

    def __init__(self):
        super().__init__('object_detection_agent')
        # Fetch the small YOLOv5 checkpoint via torch.hub (cached after
        # the first download) and switch to inference mode.
        self.model = torch.hub.load('ultralytics/yolov5', 'yolov5s')
        self.model.eval()

        # ROS 2 interfaces: frames in, detection summaries out.
        self.image_sub = self.create_subscription(
            Image,
            '/camera/image_raw',
            self.detect_objects,
            10,
        )
        self.detection_pub = self.create_publisher(
            String,
            '/detected_objects',
            10,
        )
        self.bridge = CvBridge()
        self.get_logger().info('Object Detection Agent ready')

    def detect_objects(self, msg):
        """Callback: run the detector on one frame and publish the result."""
        # YOLOv5 expects RGB input, so decode the ROS image accordingly
        # (this yields an OpenCV/numpy array, not a PIL image).
        frame = self.bridge.imgmsg_to_cv2(msg, desired_encoding='rgb8')
        results = self.model(frame)
        detections = results.pandas().xyxy[0]

        out = String()
        out.data = str(detections[['name', 'confidence']].to_dict())
        self.detection_pub.publish(out)
        self.get_logger().info(f'Detected: {len(detections)} objects')
def main(args=None):
    """Spin the object detection node until shutdown, then clean up."""
    rclpy.init(args=args)
    node = ObjectDetectionAgent()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()
Real-Time Decision Making
State Machine for Humanoid Behavior
from enum import Enum, auto

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Twist
class RobotState(Enum):
    """Discrete behavior states for the fetch-and-return task.

    Values are assigned with ``enum.auto()`` (the idiomatic way to say
    "the numbers don't matter") and remain 1..5 in declaration order,
    so any caller comparing against the old explicit values still works.
    """

    IDLE = auto()         # waiting with no tracked object
    SEARCHING = auto()    # rotating in place looking for a detection
    APPROACHING = auto()  # driving toward a detected object
    GRASPING = auto()     # executing the grasp
    RETURNING = auto()    # driving back to the home position
class BehaviorAgent(Node):
    """Finite-state-machine controller for a simple fetch behavior.

    Listens for object detections on /detected_objects, drives /cmd_vel,
    and reports its current state on /robot_status at 10 Hz.

    Fixes over the original: ``target_object`` is initialized BEFORE the
    timer that reads it is created (the timer callback dereferences it),
    and the mis-encoded arrow character in the transition log message is
    replaced with '->'.
    """

    def __init__(self):
        super().__init__('behavior_agent')
        self.state = RobotState.IDLE
        # Latest detection payload (raw string from the vision agent);
        # must exist before the state-machine timer starts firing.
        self.target_object = None

        # Subscriptions
        self.object_sub = self.create_subscription(
            String,
            '/detected_objects',
            self.object_callback,
            10,
        )
        # Publishers
        self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.status_pub = self.create_publisher(String, '/robot_status', 10)

        # Timer drives the state machine at 10 Hz.
        self.timer = self.create_timer(0.1, self.state_machine_update)
        self.get_logger().info('Behavior Agent initialized')

    def object_callback(self, msg):
        """Remember the most recent detection result."""
        self.target_object = msg.data

    def state_machine_update(self):
        """Execute one tick: dispatch to the current state's handler,
        then publish the current state name."""
        handlers = {
            RobotState.IDLE: self.handle_idle_state,
            RobotState.SEARCHING: self.handle_searching_state,
            RobotState.APPROACHING: self.handle_approaching_state,
            RobotState.GRASPING: self.handle_grasping_state,
            RobotState.RETURNING: self.handle_returning_state,
        }
        handlers[self.state]()

        # Publish current state
        status_msg = String()
        status_msg.data = f'State: {self.state.name}'
        self.status_pub.publish(status_msg)

    def handle_idle_state(self):
        """With nothing tracked yet, go look for an object."""
        if self.target_object is None:
            self.transition_to(RobotState.SEARCHING)

    def handle_searching_state(self):
        """Rotate in place until a detection arrives."""
        cmd = Twist()
        cmd.angular.z = 0.5
        self.cmd_pub.publish(cmd)
        if self.target_object:
            self.transition_to(RobotState.APPROACHING)

    def handle_approaching_state(self):
        """Drive forward toward the detected object."""
        cmd = Twist()
        cmd.linear.x = 0.3
        self.cmd_pub.publish(cmd)
        # Transition when close enough (simplified: immediately).
        self.transition_to(RobotState.GRASPING)

    def handle_grasping_state(self):
        """Execute the grasping behavior (placeholder), then head home."""
        self.get_logger().info('Grasping object')
        self.transition_to(RobotState.RETURNING)

    def handle_returning_state(self):
        """Back up toward the home position."""
        cmd = Twist()
        cmd.linear.x = -0.3
        self.cmd_pub.publish(cmd)

    def transition_to(self, new_state):
        """Log and apply a state change."""
        self.get_logger().info(
            f'Transitioning: {self.state.name} -> {new_state.name}')
        self.state = new_state
def main(args=None):
    """Spin the behavior agent until shutdown, then clean up."""
    rclpy.init(args=args)
    node = BehaviorAgent()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()
Multi-Agent Coordination
Coordinating Multiple AI Agents
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import json
class CoordinatorAgent(Node):
    """Aggregate status from the vision and planning agents and broadcast
    coordination commands once both have reported in.

    Bug fix: the original serialized ``self.get_clock().now().to_msg()``
    with ``json.dumps``, which raises ``TypeError`` because a
    ``builtin_interfaces/Time`` message is not JSON serializable. The
    integer nanosecond timestamp is used instead.
    """

    def __init__(self):
        super().__init__('coordinator_agent')
        # Subscribe to agent statuses.
        self.vision_sub = self.create_subscription(
            String, '/vision_agent/status', self.vision_callback, 10
        )
        self.planning_sub = self.create_subscription(
            String, '/planning_agent/status', self.planning_callback, 10
        )
        # Publish coordination commands.
        self.coord_pub = self.create_publisher(String, '/coordination', 10)

        # Latest status per agent, keyed by agent name.
        self.agent_states = {}
        self.timer = self.create_timer(1.0, self.coordinate)

    def vision_callback(self, msg):
        """Record the vision agent's latest status."""
        self.agent_states['vision'] = msg.data

    def planning_callback(self, msg):
        """Record the planning agent's latest status."""
        self.agent_states['planning'] = msg.data

    def coordinate(self):
        """Coordinate multiple agents.

        Runs once per second; does nothing until every agent has reported
        at least once, then publishes a JSON coordination message.
        """
        if len(self.agent_states) < 2:
            return

        # Decision logic
        coordination_msg = String()
        coordination_msg.data = json.dumps({
            # Integer nanoseconds since the clock epoch — JSON serializable,
            # unlike the Time message that .to_msg() returns.
            'timestamp': self.get_clock().now().nanoseconds,
            'agents': self.agent_states,
            'command': 'proceed'
        })
        self.coord_pub.publish(coordination_msg)
        self.get_logger().info('Coordinating agents')
def main(args=None):
    """Spin the coordinator node until shutdown, then clean up."""
    rclpy.init(args=args)
    node = CoordinatorAgent()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()
Performance Optimization
Tips for Real-Time AI in ROS 2
1. Use Callbacks Efficiently
   - Keep callbacks short and fast
   - Offload heavy computation to separate threads
2. Manage Message Queues
   - Set appropriate QoS policies
   - Use smaller queue sizes for real-time data
3. Optimize AI Models
   - Use quantized models for faster inference
   - Consider edge deployment (TensorRT, ONNX)
4. Leverage ROS 2 Executors
from rclpy.executors import MultiThreadedExecutor

# One executor can service callbacks from several nodes on a thread pool,
# so a slow callback in one node does not starve the others.
# NOTE(review): vision_agent / planning_agent are assumed to be nodes
# constructed earlier (as in the examples above) — snippet is illustrative.
executor = MultiThreadedExecutor()
executor.add_node(vision_agent)
executor.add_node(planning_agent)
executor.spin()  # blocks, dispatching callbacks until shutdown
🎯 Key Takeaways
- rclpy provides seamless Python integration with ROS 2
- AI models can be directly integrated into ROS 2 nodes
- State machines enable complex robotic behaviors
- Multi-agent systems can be coordinated through ROS 2 topics
- Performance optimization is crucial for real-time AI applications
🧪 Practice Exercise
Build a complete AI agent that:
- Subscribes to a camera topic
- Runs object detection using a pre-trained model
- Publishes detection results
- Controls robot movement based on detections
- Implements a state machine for behavior control
Next: URDF for Humanoids →