Browse Source

Created and tested new ROS2 tutorial for the ArUco tag locator

humble
Jesus Eduardo Rodriguez 1 year ago
committed by GitHub
parent
commit
78e295b922
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 212 additions and 0 deletions
  1. +212
    -0
      stretch_ros_tutorials/aruco_tag_locator.py

+ 212
- 0
stretch_ros_tutorials/aruco_tag_locator.py View File

@ -0,0 +1,212 @@
#!/usr/bin/env python3
# Import modules
import rclpy
import time
import tf2_ros
from tf2_ros import TransformException
from rclpy.time import Time
from math import pi
# Import hello_misc script for handling trajectory goals with an action client
import hello_helpers.hello_misc as hm
# We're going to subscribe to a JointState message type, so we need to import
# the definition for it
from sensor_msgs.msg import JointState
# Import the FollowJointTrajectory from the control_msgs.action package to
# control the Stretch robot
from control_msgs.action import FollowJointTrajectory
# Import JointTrajectoryPoint from the trajectory_msgs package to define
# robot trajectories
from trajectory_msgs.msg import JointTrajectoryPoint
# Import TransformStamped from the geometry_msgs package for the publisher
from geometry_msgs.msg import TransformStamped
class LocateArUcoTag(hm.HelloNode):
"""
A class that actuates the RealSense camera to find the docking station's
ArUco tag and returns a Transform between the `base_link` and the requested tag.
"""
def __init__(self):
"""
A function that initializes the subscriber and other needed variables.
:param self: The self reference.
"""
# Initialize the inhereted hm.Hellonode class
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'aruco_tag_locator', 'aruco_tag_locator', wait_for_first_pointcloud=False)
# Initialize subscriber
self.joint_states_sub = self.create_subscription(JointState, '/stretch/joint_states', self.joint_states_callback, 1)
# Initialize publisher
self.transform_pub = self.create_publisher(TransformStamped, 'ArUco_transform', 10)
# Initialize the variable that will store the joint state positions
self.joint_state = None
# Provide the min and max joint positions for the head pan. These values
# are needed for sweeping the head to search for the ArUco tag
self.min_pan_position = -3.8
self.max_pan_position = 1.50
# Define the number of steps for the sweep, then create the step size for
# the head pan joint
self.pan_num_steps = 10
self.pan_step_size = abs(self.min_pan_position - self.max_pan_position)/self.pan_num_steps
# Define the min tilt position, number of steps, and step size
self.min_tilt_position = -0.75
self.tilt_num_steps = 3
self.tilt_step_size = pi/16
# Define the head actuation rotational velocity
self.rot_vel = 0.5 # radians per sec
def joint_states_callback(self, msg):
"""
A callback function that stores Stretch's joint states.
:param self: The self reference.
:param msg: The JointState message type.
"""
self.joint_state = msg
def send_command(self, command):
'''
Handles single joint control commands by constructing a FollowJointTrajectoryGoal
message and sending it to the trajectory_client created in hello_misc.
:param self: The self reference.
:param command: A dictionary message type.
'''
if (self.joint_state is not None) and (command is not None):
# Extract the string value from the `joint` key
joint_name = command['joint']
# Set trajectory_goal as a FollowJointTrajectory.Goal and define
# the joint name
trajectory_goal = FollowJointTrajectory.Goal()
trajectory_goal.trajectory.joint_names = [joint_name]
# Create a JointTrajectoryPoint message type
point = JointTrajectoryPoint()
# Check to see if `delta` is a key in the command dictionary
if 'delta' in command:
# Get the current position of the joint and add the delta as a
# new position value
joint_index = self.joint_state.name.index(joint_name)
joint_value = self.joint_state.position[joint_index]
delta = command['delta']
new_value = joint_value + delta
point.positions = [new_value]
# Check to see if `position` is a key in the command dictionary
elif 'position' in command:
# extract the head position value from the `position` key
point.positions = [command['position']]
# Set the rotational velocity
point.velocities = [self.rot_vel]
# Assign goal position with updated point variable
trajectory_goal.trajectory.points = [point]
# Specify the coordinate frame that we want (base_link) and set the time to be now.
trajectory_goal.trajectory.header.stamp = self.get_clock().now().to_msg()
trajectory_goal.trajectory.header.frame_id = 'base_link'
# Make the action call and send the goal. The last line of code waits
# for the result
self.trajectory_client.send_goal(trajectory_goal)
def find_tag(self, tag_name='docking_station'):
"""
A function that actuates the camera to search for a defined ArUco tag
marker. Then the function returns the pose.
:param self: The self reference.
:param tag_name: A string value of the ArUco marker name.
:returns transform: The docking station's TransformStamped message.
"""
# Create dictionaries to get the head in its initial position
pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position}
self.send_command(pan_command)
tilt_command = {'joint': 'joint_head_tilt', 'position': self.min_tilt_position}
self.send_command(tilt_command)
# Nested for loop to sweep the joint_head_pan and joint_head_tilt in increments
for i in range(self.tilt_num_steps):
for j in range(self.pan_num_steps):
# Update the joint_head_pan position by the pan_step_size
pan_command = {'joint': 'joint_head_pan', 'delta': self.pan_step_size}
self.send_command(pan_command)
# Give time for system to do a Transform lookup before next step
time.sleep(0.2)
# Use a try-except block
try:
now = Time()
# Look up transform between the base_link and requested ArUco tag
transform = self.tf_buffer.lookup_transform('base_link',
tag_name,
now)
self.get_logger().info("Found Requested Tag: \n%s", transform)
# Publish the transform
self.transform_pub.publish(transform)
# Return the transform
return transform
except TransformException as ex:
continue
# Begin sweep with new tilt angle
pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position}
self.send_command(pan_command)
tilt_command = {'joint': 'joint_head_tilt', 'delta': self.tilt_step_size}
self.send_command(tilt_command)
time.sleep(0.25)
# Notify that the requested tag was not found
self.get_logger().info("The requested tag '%s' was not found", tag_name)
def main(self):
"""
Function that initiates the issue_command function.
:param self: The self reference.
"""
# Create a StaticTranformBoradcaster Node. Also, start a Tf buffer that
# will store the tf information for a few seconds.Then set up a tf listener, which
# will subscribe to all of the relevant tf topics, and keep track of the information
self.static_broadcaster = tf2_ros.StaticTransformBroadcaster(self)
self.tf_buffer = tf2_ros.Buffer()
self.listener = tf2_ros.TransformListener(self.tf_buffer, self)
# Give the listener some time to accumulate transforms
time.sleep(1.0)
# Notify Stretch is searching for the ArUco tag with `get_logger().info()`
self.get_logger().info('Searching for docking ArUco tag')
# Search for the ArUco marker for the docking station
pose = self.find_tag("docking_station")
def main():
try:
# Instantiate the `LocateArUcoTag()` object
node = LocateArUcoTag()
# Run the `main()` method
node.main()
node.new_thread.join()
except:
node.get_logger().info('Interrupt received, so shutting down')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

Loading…
Cancel
Save