From 78e295b9227452c41841c4ec785b7bc2290d037c Mon Sep 17 00:00:00 2001 From: Jesus Eduardo Rodriguez <141784078+hello-jesus@users.noreply.github.com> Date: Thu, 7 Sep 2023 15:23:21 -0700 Subject: [PATCH] Created and tested new ROS2 tutorial for the ArUco tag locator --- stretch_ros_tutorials/aruco_tag_locator.py | 212 +++++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 stretch_ros_tutorials/aruco_tag_locator.py diff --git a/stretch_ros_tutorials/aruco_tag_locator.py b/stretch_ros_tutorials/aruco_tag_locator.py new file mode 100644 index 0000000..6ddb983 --- /dev/null +++ b/stretch_ros_tutorials/aruco_tag_locator.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 + +# Import modules +import rclpy +import time +import tf2_ros +from tf2_ros import TransformException +from rclpy.time import Time +from math import pi + +# Import hello_misc script for handling trajectory goals with an action client +import hello_helpers.hello_misc as hm + +# We're going to subscribe to a JointState message type, so we need to import +# the definition for it +from sensor_msgs.msg import JointState + +# Import the FollowJointTrajectory from the control_msgs.action package to +# control the Stretch robot +from control_msgs.action import FollowJointTrajectory + +# Import JointTrajectoryPoint from the trajectory_msgs package to define +# robot trajectories +from trajectory_msgs.msg import JointTrajectoryPoint + +# Import TransformStamped from the geometry_msgs package for the publisher +from geometry_msgs.msg import TransformStamped + +class LocateArUcoTag(hm.HelloNode): + """ + A class that actuates the RealSense camera to find the docking station's + ArUco tag and returns a Transform between the `base_link` and the requested tag. + """ + def __init__(self): + """ + A function that initializes the subscriber and other needed variables. + :param self: The self reference. + """ + # Initialize the inhereted hm.Hellonode class + hm.HelloNode.__init__(self) + hm.HelloNode.main(self, 'aruco_tag_locator', 'aruco_tag_locator', wait_for_first_pointcloud=False) + # Initialize subscriber + self.joint_states_sub = self.create_subscription(JointState, '/stretch/joint_states', self.joint_states_callback, 1) + # Initialize publisher + self.transform_pub = self.create_publisher(TransformStamped, 'ArUco_transform', 10) + + # Initialize the variable that will store the joint state positions + self.joint_state = None + + # Provide the min and max joint positions for the head pan. These values + # are needed for sweeping the head to search for the ArUco tag + self.min_pan_position = -3.8 + self.max_pan_position = 1.50 + + # Define the number of steps for the sweep, then create the step size for + # the head pan joint + self.pan_num_steps = 10 + self.pan_step_size = abs(self.min_pan_position - self.max_pan_position)/self.pan_num_steps + + # Define the min tilt position, number of steps, and step size + self.min_tilt_position = -0.75 + self.tilt_num_steps = 3 + self.tilt_step_size = pi/16 + + # Define the head actuation rotational velocity + self.rot_vel = 0.5 # radians per sec + + def joint_states_callback(self, msg): + """ + A callback function that stores Stretch's joint states. + :param self: The self reference. + :param msg: The JointState message type. + """ + self.joint_state = msg + + def send_command(self, command): + ''' + Handles single joint control commands by constructing a FollowJointTrajectoryGoal + message and sending it to the trajectory_client created in hello_misc. + :param self: The self reference. + :param command: A dictionary message type. + ''' + if (self.joint_state is not None) and (command is not None): + + # Extract the string value from the `joint` key + joint_name = command['joint'] + + # Set trajectory_goal as a FollowJointTrajectory.Goal and define + # the joint name + trajectory_goal = FollowJointTrajectory.Goal() + trajectory_goal.trajectory.joint_names = [joint_name] + + # Create a JointTrajectoryPoint message type + point = JointTrajectoryPoint() + + # Check to see if `delta` is a key in the command dictionary + if 'delta' in command: + # Get the current position of the joint and add the delta as a + # new position value + joint_index = self.joint_state.name.index(joint_name) + joint_value = self.joint_state.position[joint_index] + delta = command['delta'] + new_value = joint_value + delta + point.positions = [new_value] + + # Check to see if `position` is a key in the command dictionary + elif 'position' in command: + # extract the head position value from the `position` key + point.positions = [command['position']] + + # Set the rotational velocity + point.velocities = [self.rot_vel] + + # Assign goal position with updated point variable + trajectory_goal.trajectory.points = [point] + + # Specify the coordinate frame that we want (base_link) and set the time to be now. + trajectory_goal.trajectory.header.stamp = self.get_clock().now().to_msg() + trajectory_goal.trajectory.header.frame_id = 'base_link' + + # Make the action call and send the goal. The last line of code waits + # for the result + self.trajectory_client.send_goal(trajectory_goal) + + def find_tag(self, tag_name='docking_station'): + """ + A function that actuates the camera to search for a defined ArUco tag + marker. Then the function returns the pose. + :param self: The self reference. + :param tag_name: A string value of the ArUco marker name. + + :returns transform: The docking station's TransformStamped message. + """ + # Create dictionaries to get the head in its initial position + pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position} + self.send_command(pan_command) + tilt_command = {'joint': 'joint_head_tilt', 'position': self.min_tilt_position} + self.send_command(tilt_command) + + # Nested for loop to sweep the joint_head_pan and joint_head_tilt in increments + for i in range(self.tilt_num_steps): + for j in range(self.pan_num_steps): + # Update the joint_head_pan position by the pan_step_size + pan_command = {'joint': 'joint_head_pan', 'delta': self.pan_step_size} + self.send_command(pan_command) + + # Give time for system to do a Transform lookup before next step + time.sleep(0.2) + + # Use a try-except block + try: + now = Time() + # Look up transform between the base_link and requested ArUco tag + transform = self.tf_buffer.lookup_transform('base_link', + tag_name, + now) + self.get_logger().info("Found Requested Tag: \n%s", transform) + + # Publish the transform + self.transform_pub.publish(transform) + + # Return the transform + return transform + except TransformException as ex: + continue + + # Begin sweep with new tilt angle + pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position} + self.send_command(pan_command) + tilt_command = {'joint': 'joint_head_tilt', 'delta': self.tilt_step_size} + self.send_command(tilt_command) + time.sleep(0.25) + + # Notify that the requested tag was not found + self.get_logger().info("The requested tag '%s' was not found", tag_name) + + def main(self): + """ + Function that initiates the issue_command function. + :param self: The self reference. + """ + # Create a StaticTranformBoradcaster Node. Also, start a Tf buffer that + # will store the tf information for a few seconds.Then set up a tf listener, which + # will subscribe to all of the relevant tf topics, and keep track of the information + self.static_broadcaster = tf2_ros.StaticTransformBroadcaster(self) + self.tf_buffer = tf2_ros.Buffer() + self.listener = tf2_ros.TransformListener(self.tf_buffer, self) + + # Give the listener some time to accumulate transforms + time.sleep(1.0) + + # Notify Stretch is searching for the ArUco tag with `get_logger().info()` + self.get_logger().info('Searching for docking ArUco tag') + + # Search for the ArUco marker for the docking station + pose = self.find_tag("docking_station") + +def main(): + try: + # Instantiate the `LocateArUcoTag()` object + node = LocateArUcoTag() + # Run the `main()` method + node.main() + node.new_thread.join() + except: + node.get_logger().info('Interrupt received, so shutting down') + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main()