Browse Source

Created new ROS2 instructions for tutorials

pull/10/head
hello-jesus 1 year ago
parent
commit
ad355bc4c8
4 changed files with 1128 additions and 0 deletions
  1. +120
    -0
      ros2/aruco_marker_detection.md
  2. +458
    -0
      ros2/example_12.md
  3. +208
    -0
      ros2/example_5.md
  4. +342
    -0
      ros2/example_6.md

+ 120
- 0
ros2/aruco_marker_detection.md View File

@ -0,0 +1,120 @@
## ArUco Marker Detector
For this tutorial, we will go over how to detect Stretch's ArUco markers and review the files that hold the information for the tags.
### Visualize ArUco Markers in RViz
Begin by running the stretch driver launch file.
```{.bash .shell-prompt}
ros2 launch stretch_core stretch_driver.launch.py
```
To activate the RealSense camera and publish topics to be visualized, run the following launch file in a new terminal.
```{.bash .shell-prompt}
ros2 launch stretch_core d435i_high_resolution.launch
```
Next, in a new terminal, run the stretch ArUco launch file which will bring up the [detect_aruco_markers](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/stretch_core/detect_aruco_markers.py) node.
```{.bash .shell-prompt}
ros2 launch stretch_core stretch_aruco.launch.py
```
Within this tutorial package, there is an [RViz config file](https://github.com/hello-robot/stretch_tutorials/blob/iron/rviz/aruco_detector_example.rviz) with the topics for the transform frames in the Display tree. You can visualize these topics and the robot model by running the command below in a new terminal.
```{.bash .shell-prompt}
ros2 run rviz2 rviz2 -d /home/hello-robot/ament_ws/src/stretch_tutorials/rviz/aruco_detector_example.rviz
```
You are going to need to teleoperate Stretch's head to detect the ArUco marker tags. Run the following command in a new terminal and control the head to point the camera toward the markers.
```{.bash .shell-prompt}
ros2 run stretch_core keyboard_teleop
```
<p align="center">
<img src="https://raw.githubusercontent.com/hello-robot/stretch_tutorials/noetic/images/aruco_detector.gif"/>
</p>
### The ArUco Marker Dictionary
When defining the ArUco markers on Stretch, hello robot utilizes a YAML file, [stretch_marker_dict.yaml](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/config/stretch_marker_dict.yaml), that holds the information about the markers.
If [detect_aruco_markers](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/stretch_core/detect_aruco_markers.py) node doesn’t find an entry in [stretch_marker_dict.yaml](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/config/stretch_marker_dict.yaml) for a particular ArUco marker ID number, it uses the default entry. For example, most robots have shipped with the following default entry:
```yaml
'default':
'length_mm': 24
'use_rgb_only': False
'name': 'unknown'
'link': None
```
and the following entry for the ArUco marker on the top of the wrist
```yaml
'133':
'length_mm': 23.5
'use_rgb_only': False
'name': 'wrist_top'
'link': 'link_aruco_top_wrist'
```
**Dictionary Breakdown**
```yaml
'133':
```
The dictionary key for each entry is the ArUco marker’s ID number or `default`. For example, the entry shown above for the ArUco marker on the top of the wrist assumes that the marker’s ID number is `133`.
```yaml
'length_mm': 23.5
```
The `length_mm` value used by [detect_aruco_markers](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/stretch_core/detect_aruco_markers.py) is important for estimating the pose of an ArUco marker.
!!! note
If the actual width and height of the marker do not match this value, then pose estimation will be poor. Thus, carefully measure custom Aruco markers.
```yaml
'use_rgb_only': False
```
If `use_rgb_only` is `True`, [detect_aruco_markers](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/stretch_core/detect_aruco_markers.py) will ignore depth images from the [Intel RealSense D435i depth camera](https://www.intelrealsense.com/depth-camera-d435i/) when estimating the pose of the marker and will instead only use RGB images from the D435i.
```yaml
'name': 'wrist_top'
```
`name` is used for the text string of the ArUco marker’s [ROS Marker](http://docs.ros.org/en/melodic/api/visualization_msgs/html/msg/Marker.html) in the [ROS MarkerArray](http://docs.ros.org/en/melodic/api/visualization_msgs/html/msg/MarkerArray.html) Message published by the [detect_aruco_markers](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/stretch_core/detect_aruco_markers.py) ROS node.
```yaml
'link': 'link_aruco_top_wrist'
```
`link` is currently used by [stretch_calibration](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_calibration/stretch_calibration/collect_head_calibration_data.py). It is the name of the link associated with a body-mounted ArUco marker in the [robot’s URDF](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_description/urdf/stretch_aruco.xacro).
It’s good practice to add an entry to [stretch_marker_dict.yaml](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/config/stretch_marker_dict.yaml) for each ArUco marker you use.
### Create a New ArUco Marker
At Hello Robot, we’ve used the following guide when generating new ArUco markers.
We generate ArUco markers using a 6x6-bit grid (36 bits) with 250 unique codes. This corresponds with[ DICT_6X6_250 defined in OpenCV](https://docs.opencv.org/3.4/d9/d6a/group__aruco.html). We generate markers using this [online ArUco marker generator](https://chev.me/arucogen/) by setting the Dictionary entry to 6x6 and then setting the Marker ID and Marker size, mm as appropriate for the specific application. We strongly recommend measuring the actual marker by hand before adding an entry for it to [stretch_marker_dict.yaml](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/config/stretch_marker_dict.yaml).
We select marker ID numbers using the following ranges.
* 0 - 99: reserved for users
* 100 - 249: reserved for official use by Hello Robot Inc.
* 100 - 199: reserved for robots with distinct sets of body-mounted markers
* Allows different robots near each other to use distinct sets of body-mounted markers to avoid confusion. This could be valuable for various uses of body-mounted markers, including calibration, visual servoing, visual motion capture, and multi-robot tasks.
* 5 markers per robot = 2 on the mobile base + 2 on the wrist + 1 on the shoulder
* 20 distinct sets = 100 available ID numbers / 5 ID numbers per robot
* 200 - 249: reserved for official accessories
* 245 for the prototype docking station
* 246-249 for large floor markers
When coming up with this guide, we expected the following:
* Body-mounted accessories with the same ID numbers mounted to different robots could be disambiguated using the expected range of 3D locations of the ArUco markers on the calibrated body.
* Accessories in the environment with the same ID numbers could be disambiguated using a map or nearby observable features of the environment.

+ 458
- 0
ros2/example_12.md View File

@ -0,0 +1,458 @@
# Example 12
For this example, we will send follow joint trajectory commands for the head camera to search and locate an ArUco tag. In this instance, a Stretch robot will try to locate the docking station's ArUco tag.
## Modifying Stretch Marker Dictionary YAML File
When defining the ArUco markers on Stretch, hello robot utilizes a YAML file, [stretch_marker_dict.yaml](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/config/stretch_marker_dict.yaml), that holds the information about the markers. A further breakdown of the YAML file can be found in our [Aruco Marker Detection](aruco_marker_detection.md) tutorial.
Below is what needs to be included in the [stretch_marker_dict.yaml](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/config/stretch_marker_dict.yaml) file so the [detect_aruco_markers](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/stretch_core/detect_aruco_markers.py) node can find the docking station's ArUco tag.
```yaml
'245':
'length_mm': 88.0
'use_rgb_only': False
'name': 'docking_station'
'link': None
```
## Getting Started
Begin by running the stretch driver launch file.
```{.bash .shell-prompt}
ros2 launch stretch_core stretch_driver.launch.py
```
To activate the RealSense camera and publish topics to be visualized, run the following launch file in a new terminal.
```{.bash .shell-prompt}
ros2 launch stretch_core d435i_high_resolution.launch.py
```
Next, run the stretch ArUco launch file which will bring up the [detect_aruco_markers](https://github.com/hello-robot/stretch_ros2/blob/iron/stretch_core/stretch_core/detect_aruco_markers.py) node. In a new terminal, execute:
```{.bash .shell-prompt}
ros2 launch stretch_core stretch_aruco.launch.py
```
Within this tutorial package, there is an [RViz config file](https://github.com/hello-robot/stretch_tutorials/blob/iron/rviz/aruco_detector_example.rviz) with the topics for the transform frames in the Display tree. You can visualize these topics and the robot model by running the command below in a new terminal.
```{.bash .shell-prompt}
ros2 run rviz2 rviz2 -d /home/hello-robot/ament_ws/src/stretch_tutorials/rviz/aruco_detector_example.rviz
```
Then run the [aruco_tag_locator.py](https://github.com/hello-robot/stretch_tutorials/blob/iron/stretch_ros_tutorials/aruco_tag_locator.py) node. In a new terminal, execute:
```{.bash .shell-prompt}
cd ament_ws/src/stretch_tutorials/stretch_ros_tutorials/
python3 aruco_tag_locator.py
```
<p align="center">
<img src="https://raw.githubusercontent.com/hello-robot/stretch_tutorials/noetic/images/aruco_locator.gif"/>
</p>
### The Code
```python
#!/usr/bin/env python3
# Import modules
import rclpy
import time
import tf2_ros
from tf2_ros import TransformException
from rclpy.time import Time
from math import pi
# Import hello_misc script for handling trajectory goals with an action client
import hello_helpers.hello_misc as hm
# We're going to subscribe to a JointState message type, so we need to import
# the definition for it
from sensor_msgs.msg import JointState
# Import the FollowJointTrajectory from the control_msgs.action package to
# control the Stretch robot
from control_msgs.action import FollowJointTrajectory
# Import JointTrajectoryPoint from the trajectory_msgs package to define
# robot trajectories
from trajectory_msgs.msg import JointTrajectoryPoint
# Import TransformStamped from the geometry_msgs package for the publisher
from geometry_msgs.msg import TransformStamped
class LocateArUcoTag(hm.HelloNode):
"""
A class that actuates the RealSense camera to find the docking station's
ArUco tag and returns a Transform between the `base_link` and the requested tag.
"""
def __init__(self):
"""
A function that initializes the subscriber and other needed variables.
:param self: The self reference.
"""
# Initialize the inhereted hm.Hellonode class
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'aruco_tag_locator', 'aruco_tag_locator', wait_for_first_pointcloud=False)
# Initialize subscriber
self.joint_states_sub = self.create_subscription(JointState, '/stretch/joint_states', self.joint_states_callback, 1)
# Initialize publisher
self.transform_pub = self.create_publisher(TransformStamped, 'ArUco_transform', 10)
# Initialize the variable that will store the joint state positions
self.joint_state = None
# Provide the min and max joint positions for the head pan. These values
# are needed for sweeping the head to search for the ArUco tag
self.min_pan_position = -3.8
self.max_pan_position = 1.50
# Define the number of steps for the sweep, then create the step size for
# the head pan joint
self.pan_num_steps = 10
self.pan_step_size = abs(self.min_pan_position - self.max_pan_position)/self.pan_num_steps
# Define the min tilt position, number of steps, and step size
self.min_tilt_position = -0.75
self.tilt_num_steps = 3
self.tilt_step_size = pi/16
# Define the head actuation rotational velocity
self.rot_vel = 0.5 # radians per sec
def joint_states_callback(self, msg):
"""
A callback function that stores Stretch's joint states.
:param self: The self reference.
:param msg: The JointState message type.
"""
self.joint_state = msg
def send_command(self, command):
'''
Handles single joint control commands by constructing a FollowJointTrajectoryGoal
message and sending it to the trajectory_client created in hello_misc.
:param self: The self reference.
:param command: A dictionary message type.
'''
if (self.joint_state is not None) and (command is not None):
# Extract the string value from the `joint` key
joint_name = command['joint']
# Set trajectory_goal as a FollowJointTrajectory.Goal and define
# the joint name
trajectory_goal = FollowJointTrajectory.Goal()
trajectory_goal.trajectory.joint_names = [joint_name]
# Create a JointTrajectoryPoint message type
point = JointTrajectoryPoint()
# Check to see if `delta` is a key in the command dictionary
if 'delta' in command:
# Get the current position of the joint and add the delta as a
# new position value
joint_index = self.joint_state.name.index(joint_name)
joint_value = self.joint_state.position[joint_index]
delta = command['delta']
new_value = joint_value + delta
point.positions = [new_value]
# Check to see if `position` is a key in the command dictionary
elif 'position' in command:
# extract the head position value from the `position` key
point.positions = [command['position']]
# Set the rotational velocity
point.velocities = [self.rot_vel]
# Assign goal position with updated point variable
trajectory_goal.trajectory.points = [point]
# Specify the coordinate frame that we want (base_link) and set the time to be now.
trajectory_goal.trajectory.header.stamp = self.get_clock().now().to_msg()
trajectory_goal.trajectory.header.frame_id = 'base_link'
# Make the action call and send the goal. The last line of code waits
# for the result
self.trajectory_client.send_goal(trajectory_goal)
def find_tag(self, tag_name='docking_station'):
"""
A function that actuates the camera to search for a defined ArUco tag
marker. Then the function returns the pose.
:param self: The self reference.
:param tag_name: A string value of the ArUco marker name.
:returns transform: The docking station's TransformStamped message.
"""
# Create dictionaries to get the head in its initial position
pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position}
self.send_command(pan_command)
tilt_command = {'joint': 'joint_head_tilt', 'position': self.min_tilt_position}
self.send_command(tilt_command)
# Nested for loop to sweep the joint_head_pan and joint_head_tilt in increments
for i in range(self.tilt_num_steps):
for j in range(self.pan_num_steps):
# Update the joint_head_pan position by the pan_step_size
pan_command = {'joint': 'joint_head_pan', 'delta': self.pan_step_size}
self.send_command(pan_command)
# Give time for system to do a Transform lookup before next step
time.sleep(0.2)
# Use a try-except block
try:
now = Time()
# Look up transform between the base_link and requested ArUco tag
transform = self.tf_buffer.lookup_transform('base_link',
tag_name,
now)
self.get_logger().info("Found Requested Tag: \n%s", transform)
# Publish the transform
self.transform_pub.publish(transform)
# Return the transform
return transform
except TransformException as ex:
continue
# Begin sweep with new tilt angle
pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position}
self.send_command(pan_command)
tilt_command = {'joint': 'joint_head_tilt', 'delta': self.tilt_step_size}
self.send_command(tilt_command)
time.sleep(0.25)
# Notify that the requested tag was not found
self.get_logger().info("The requested tag '%s' was not found", tag_name)
def main(self):
"""
Function that initiates the issue_command function.
:param self: The self reference.
"""
# Create a StaticTranformBoradcaster Node. Also, start a Tf buffer that
# will store the tf information for a few seconds.Then set up a tf listener, which
# will subscribe to all of the relevant tf topics, and keep track of the information
self.static_broadcaster = tf2_ros.StaticTransformBroadcaster(self)
self.tf_buffer = tf2_ros.Buffer()
self.listener = tf2_ros.TransformListener(self.tf_buffer, self)
# Give the listener some time to accumulate transforms
time.sleep(1.0)
# Notify Stretch is searching for the ArUco tag with `get_logger().info()`
self.get_logger().info('Searching for docking ArUco tag')
# Search for the ArUco marker for the docking station
pose = self.find_tag("docking_station")
def main():
try:
# Instantiate the `LocateArUcoTag()` object
node = LocateArUcoTag()
# Run the `main()` method
node.main()
node.new_thread.join()
except:
node.get_logger().info('Interrupt received, so shutting down')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
```
### The Code Explained
Now let's break the code down.
```python
#!/usr/bin/env python3
```
Every Python ROS [Node](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Nodes/Understanding-ROS2-Nodes.html) will have this declaration at the top. The first line makes sure your script is executed as a Python3 script.
```python
import rclpy
import time
import tf2_ros
from tf2_ros import TransformException
from rclpy.time import Time
from math import pi
import hello_helpers.hello_misc as hm
from sensor_msgs.msg import JointState
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
from geometry_msgs.msg import TransformStamped
```
You need to import `rclpy` if you are writing a ROS [Node](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Nodes/Understanding-ROS2-Nodes.html). Import other python modules needed for this node. Import the `FollowJointTrajectory` from the [control_msgs.action](http://wiki.ros.org/control_msgs) package to control the Stretch robot. Import `JointTrajectoryPoint` from the [trajectory_msgs](https://github.com/ros2/common_interfaces/tree/iron/trajectory_msgs) package to define robot trajectories. The [hello_helpers](https://github.com/hello-robot/stretch_ros2/tree/iron/hello_helpers) package consists of a module that provides various Python scripts used across [stretch_ros](https://github.com/hello-robot/stretch_ros2). In this instance, we are importing the `hello_misc` script.
```python
def __init__(self):
# Initialize the inhereted hm.Hellonode class
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'aruco_tag_locator', 'aruco_tag_locator', wait_for_first_pointcloud=False)
# Initialize subscriber
self.joint_states_sub = self.create_subscription(JointState, '/stretch/joint_states', self.joint_states_callback, 1)
# Initialize publisher
self.transform_pub = self.create_publisher(TransformStamped, 'ArUco_transform', 10)
# Initialize the variable that will store the joint state positions
self.joint_state = None
```
The `LocateArUcoTag` class inherits the `HelloNode` class from `hm` and is instantiated.
Set up a subscriber with `self.create_subscription(JointState, '/stretch/joint_states', self.joint_states_callback, 1)`. We're going to subscribe to the topic `stretch/joint_states`, looking for `JointState` messages. When a message comes in, ROS is going to pass it to the function `joint_states_callback()` automatically.
`self.create_publisher(TransformStamped, 'ArUco_transform', 10)` declares that your node is publishing to the `ArUco_transform` topic using the message type `TransformStamped`. The `10` argument limits the amount of queued messages if any subscriber is not receiving them fast enough.
```python
self.min_pan_position = -4.10
self.max_pan_position = 1.50
self.pan_num_steps = 10
self.pan_step_size = abs(self.min_pan_position - self.max_pan_position)/self.pan_num_steps
```
Provide the minimum and maximum joint positions for the head pan. These values are needed for sweeping the head to search for the ArUco tag. We also define the number of steps for the sweep, then create the step size for the head pan joint.
```python
self.min_tilt_position = -0.75
self.tilt_num_steps = 3
self.tilt_step_size = pi/16
```
Set the minimum position of the tilt joint, the number of steps, and the size of each step.
```python
self.rot_vel = 0.5 # radians per sec
```
Define the head actuation rotational velocity.
```python
def joint_states_callback(self, msg):
self.joint_state = msg
```
The `joint_states_callback()` function stores Stretch's joint states.
```python
def send_command(self, command):
if (self.joint_state is not None) and (command is not None):
joint_name = command['joint']
trajectory_goal = FollowJointTrajectory.Goal()
trajectory_goal.trajectory.joint_names = [joint_name]
point = JointTrajectoryPoint()
```
Assign `trajectory_goal` as a `FollowJointTrajectory.Goal` message type. Then extract the string value from the `joint` key. Also, assign `point` as a `JointTrajectoryPoint` message type.
```python
if 'delta' in command:
joint_index = self.joint_state.name.index(joint_name)
joint_value = self.joint_state.position[joint_index]
delta = command['delta']
new_value = joint_value + delta
point.positions = [new_value]
```
Check to see if `delta` is a key in the command dictionary. Then get the current position of the joint and add the delta as a new position value.
```python
elif 'position' in command:
point.positions = [command['position']]
```
Check to see if `position` is a key in the command dictionary. Then extract the position value.
```python
point.velocities = [self.rot_vel]
trajectory_goal.trajectory.points = [point]
trajectory_goal.trajectory.header.stamp = self.get_clock().now().to_msg()
trajectory_goal.trajectory.header.frame_id = 'base_link'
self.trajectory_client.send_goal(trajectory_goal)
```
Then `trajectory_goal.trajectory.points` is defined by the positions set in `point`. Specify the coordinate frame that we want (*base_link*) and set the time to be now. Make the action call and send the goal.
```python
def find_tag(self, tag_name='docking_station'):
pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position}
self.send_command(pan_command)
tilt_command = {'joint': 'joint_head_tilt', 'position': self.min_tilt_position}
self.send_command(tilt_command)
```
Create a dictionary to get the head in its initial position for its search and send the commands with the `send_command()` function.
```python
for i in range(self.tilt_num_steps):
for j in range(self.pan_num_steps):
pan_command = {'joint': 'joint_head_pan', 'delta': self.pan_step_size}
self.send_command(pan_command)
time.sleep(0.5)
```
Utilize a nested for loop to sweep the pan and tilt in increments. Then update the `joint_head_pan` position by the `pan_step_size`. Use `time.sleep()` function to give time to the system to do a Transform lookup before the next step.
```python
try:
now = Time()
transform = self.tf_buffer.lookup_transform('base_link',
tag_name,
now)
self.get_logger().info("Found Requested Tag: \n%s", transform)
self.transform_pub.publish(transform)
return transform
except TransformException as ex:
continue
```
Use a try-except block to look up the transform between the *base_link* and the requested ArUco tag. Then publish and return the `TransformStamped` message.
```python
pan_command = {'joint': 'joint_head_pan', 'position': self.min_pan_position}
self.send_command(pan_command)
tilt_command = {'joint': 'joint_head_tilt', 'delta': self.tilt_step_size}
self.send_command(tilt_command)
time.sleep(.25)
```
Begin sweep with new tilt angle.
```python
def main(self):
self.static_broadcaster = tf2_ros.StaticTransformBroadcaster(self)
self.tf_buffer = tf2_ros.Buffer()
self.listener = tf2_ros.TransformListener(self.tf_buffer, self)
time.sleep(1.0)
```
Create a StaticTranformBoradcaster Node. Also, start a tf buffer that will store the tf information for a few seconds. Then set up a tf listener, which will subscribe to all of the relevant tf topics, and keep track of the information. Include `time.sleep(1.0)` to give the listener some time to accumulate transforms.
```python
self.get_logger().info('Searching for docking ArUco tag')
pose = self.find_tag("docking_station")
```
Notice Stretch is searching for the ArUco tag with a `self.get_logger().info()` function. Then search for the ArUco marker for the docking station.
```python
def main():
try:
node = LocateArUcoTag()
node.main()
node.new_thread.join()
except:
node.get_logger().info('Interrupt received, so shutting down')
node.destroy_node()
rclpy.shutdown()
```
Instantiate the `LocateArUcoTag()` object and run the `main()` method.

+ 208
- 0
ros2/example_5.md View File

@ -0,0 +1,208 @@
## Example 5
In this example, we will review a Python script that prints out the positions of a selected group of Stretch joints. This script is helpful if you need the joint positions after you teleoperated Stretch with the Xbox controller or physically moved the robot to the desired configuration after hitting the run stop button.
If you are looking for a continuous print of the joint states while Stretch is in action, then you can use the [ros2 topic command-line tool](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Topics/Understanding-ROS2-Topics.html) shown in the [Internal State of Stretch Tutorial](https://github.com/hello-robot/stretch_tutorials/blob/master/ros2/internal_state_of_stretch.md).
Begin by starting up the stretch driver launch file.
```{.bash .shell-prompt}
ros2 launch stretch_core stretch_driver.launch.py
```
You can then hit the run-stop button (you should hear a beep and the LED light in the button blink) and move the robot's joints to a desired configuration. Once you are satisfied with the configuration, hold the run-stop button until you hear a beep. Then run the following command to execute the [joint_state_printer.py](https://github.com/hello-robot/stretch_tutorials/blob/iron/stretch_ros_tutorials/joint_state_printer.py) which will print the joint positions of the lift, arm, and wrist. In a new terminal, execute:
```{.bash .shell-prompt}
cd ament_ws/src/stretch_tutorials/stretch_ros_tutorials/
python3 joint_state_printer.py
```
Your terminal will output the `position` information of the previously mentioned joints shown below.
```{.bash .no-copy}
name: ['joint_lift', 'wrist_extension', 'joint_wrist_yaw']
position: [0.6043133175850597, 0.19873586673129257, 0.017257283863713464]
```
!!! note
Stretch's arm has four prismatic joints and the sum of their positions gives the *wrist_extension* distance. The *wrist_extension* is needed when sending [joint trajectory commands](https://github.com/hello-robot/stretch_tutorials/blob/master/ros2/follow_joint_trajectory.md) to the robot. Further, you can not actuate an individual arm joint. Here is an image of the arm joints for reference:
<p align="center">
<img src="https://raw.githubusercontent.com/hello-robot/stretch_tutorials/noetic/images/joints.png"/>
</p>
### The Code
```python
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import sys
import time
# We're going to subscribe to a JointState message type, so we need to import
# the definition for it
from sensor_msgs.msg import JointState
class JointStatePublisher(Node):
"""
A class that prints the positions of desired joints in Stretch.
"""
def __init__(self):
"""
Function that initializes the subscriber.
:param self: The self reference
"""
super().__init__('stretch_joint_state')
# Set up a subscriber. We're going to subscribe to the topic "joint_states"
self.sub = self.create_subscription(JointState, 'joint_states', self.callback, 1)
def callback(self, msg):
"""
Callback function to deal with the incoming JointState messages.
:param self: The self reference.
:param msg: The JointState message.
"""
# Store the joint messages for later use
self.get_logger().info('Receiving JointState messages')
self.joint_states = msg
def print_states(self, joints):
"""
print_states function to deal with the incoming JointState messages.
:param self: The self reference.
:param joints: A list of string values of joint names.
"""
# Create an empty list that will store the positions of the requested joints
joint_positions = []
# Use of forloop to parse the names of the requested joints list.
# The index() function returns the index at the first occurrence of
# the name of the requested joint in the self.joint_states.name list
for joint in joints:
if joint == "wrist_extension":
index = self.joint_states.name.index('joint_arm_l0')
joint_positions.append(4*self.joint_states.position[index])
continue
index = self.joint_states.name.index(joint)
joint_positions.append(self.joint_states.position[index])
# Print the joint position values to the terminal
print("name: " + str(joints))
print("position: " + str(joint_positions))
# Sends a signal to rclpy to shutdown the ROS interfaces
rclpy.shutdown()
# Exit the Python interpreter
sys.exit(0)
def main(args=None):
# Initialize the node
rclpy.init(args=args)
joint_publisher = JointStatePublisher()
time.sleep(1)
rclpy.spin_once(joint_publisher)
# Create a list of the joints and name them joints. These will be an argument
# for the print_states() function
joints = ["joint_lift", "wrist_extension", "joint_wrist_yaw"]
joint_publisher.print_states(joints)
# Give control to ROS. This will allow the callback to be called whenever new
# messages come in. If we don't put this line in, then the node will not work,
# and ROS will not process any messages
rclpy.spin(joint_publisher)
if __name__ == '__main__':
main()
```
### The Code Explained
Now let's break the code down.
```python
#!/usr/bin/env python3
```
Every Python ROS [Node](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Nodes/Understanding-ROS2-Nodes.html) will have this declaration at the top. The first line makes sure your script is executed as a Python3 script.
```python
import rclpy
import sys
import time
from rclpy.node import Node
from sensor_msgs.msg import JointState
```
You need to import rclpy if you are writing a ROS 2 Node. Import `sensor_msgs.msg` so that we can subscribe to `JointState` messages.
```python
self.sub = self.create_subscription(JointState, 'joint_states', self.callback, 1)
```
Set up a subscriber. We're going to subscribe to the topic `joint_states`, looking for `JointState` messages. When a message comes in, ROS is going to pass it to the function "callback" automatically
```python
def callback(self, msg):
self.joint_states = msg
```
This is the callback function where the `JointState` messages are stored as `self.joint_states`. Further information about this message type can be found here: [JointState Message](http://docs.ros.org/en/lunar/api/sensor_msgs/html/msg/JointState.html)
```python
def print_states(self, joints):
joint_positions = []
```
This is the `print_states()` function which takes in a list of joints of interest as its argument. the is also an empty list set as `joint_positions` and this is where the positions of the requested joints will be appended.
```python
for joint in joints:
if joint == "wrist_extension":
index = self.joint_states.name.index('joint_arm_l0')
joint_positions.append(4*self.joint_states.position[index])
continue
index = self.joint_states.name.index(joint)
joint_positions.append(self.joint_states.position[index])
```
In this section of the code, a for loop is used to parse the names of the requested joints from the `self.joint_states` list. The `index()` function returns the index of the name of the requested joint and appends the respective position to the `joint_positions` list.
```python
rclpy.shutdown()
sys.exit(0)
```
The first line of code initiates a clean shutdown of ROS. The second line of code exits the Python interpreter.
```python
rclpy.init(args=args)
joint_publisher = JointStatePublisher()
time.sleep(1)
```
The next line, rclpy.init_node initializes the node. In this case, your node will take on the name 'stretch_joint_state'. **NOTE:** the name must be a base name, i.e. it cannot contain any slashes "/".
Declare object, *joint_publisher*, from the `JointStatePublisher` class.
The use of the `time.sleep()` function is to allow the *joint_publisher* class to initialize all of its features before requesting to publish joint positions of desired joints (running the `print_states()` method).
```python
joints = ["joint_lift", "wrist_extension", "joint_wrist_yaw"]
#joints = ["joint_head_pan","joint_head_tilt", joint_gripper_finger_left", "joint_gripper_finger_right"]
joint_publisher.print_states(joints)
```
Create a list of the desired joints that you want positions to be printed. Then use that list as an argument for the `print_states()` method.
```python
rclpy.spin(joint_publisher)
```
Give control to ROS. This will allow the callback to be called whenever new messages come in. If we don't put this line in, then the node will not work, and ROS will not process any messages.

+ 342
- 0
ros2/example_6.md View File

@ -0,0 +1,342 @@
## Example 6
In this example, we will review a Python script that prints and stores the effort values from a specified joint. If you are looking for a continuous print of the joint state efforts while Stretch is in action, then you can use the [rostopic command-line tool](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Topics/Understanding-ROS2-Topics.html) shown in the [Internal State of Stretch Tutorial](https://github.com/hello-robot/stretch_tutorials/blob/master/ros2/internal_state_of_stretch.md).
<p align="center">
<img src="https://raw.githubusercontent.com/hello-robot/stretch_tutorials/noetic/images/effort_sensing.gif"/>
</p>
Begin by running the following command in a terminal.
```{.bash .shell-prompt}
ros2 launch stretch_core stretch_driver.launch.py
```
There's no need to switch to the position mode in comparison with ROS1 because the default mode of the launcher is this position mode. Then run the [effort_sensing.py](https://github.com/hello-robot/stretch_tutorials/blob/iron/stretch_ros_tutorials/effort_sensing.py) node. In a new terminal, execute:
```{.bash .shell-prompt}
cd ament_ws/src/stretch_tutorials/stretch_ros_tutorials/
python3 effort_sensing.py
```
This will send a `FollowJointTrajectory` command to move Stretch's arm or head while also printing the effort of the lift.
### The Code
```python
##!/usr/bin/env python3
import rclpy
import hello_helpers.hello_misc as hm
import os
import csv
import pandas as pd
import matplotlib.pyplot as plt
import time
from matplotlib import animation
from datetime import datetime
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
class JointActuatorEffortSensor(hm.HelloNode):
def __init__(self, export_data=True, animate=True):
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'issue_command', 'issue_command', wait_for_first_pointcloud=False)
self.joints = ['joint_lift']
self.joint_effort = []
self.save_path = '/home/hello-robot/ament_ws/src/stretch_tutorials/stored_data'
self.export_data = export_data
def issue_command(self):
trajectory_goal = FollowJointTrajectory.Goal()
trajectory_goal.trajectory.joint_names = self.joints
point0 = JointTrajectoryPoint()
point0.positions = [0.9]
trajectory_goal.trajectory.points = [point0]
trajectory_goal.trajectory.header.stamp = self.get_clock().now().to_msg()
trajectory_goal.trajectory.header.frame_id = 'base_link'
self._send_goal_future = self.trajectory_client.send_goal_async(trajectory_goal, self.feedback_callback)
self.get_logger().info('Sent position goal = {0}'.format(trajectory_goal))
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Failed')
return
self.get_logger().info('Succeded')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Sent position goal = {0}'.format(result))
def feedback_callback(self, feedback_msg):
if 'wrist_extension' in self.joints:
self.joints.remove('wrist_extension')
self.joints.append('joint_arm_l0')
current_effort = []
for joint in self.joints:
index = self.joint_state.name.index(joint)
current_effort.append(self.joint_state.effort[index])
if not self.export_data:
print("name: " + str(self.joints))
print("effort: " + str(current_effort))
else:
self.joint_effort.append(current_effort)
if self.export_data:
file_name = datetime.now().strftime("%Y-%m-%d_%I-%p")
completeName = os.path.join(self.save_path, file_name)
with open(completeName, "w") as f:
writer = csv.writer(f)
writer.writerow(self.joints)
writer.writerows(self.joint_effort)
def plot_data(self, animate = True):
time.sleep(0.1)
file_name = datetime.now().strftime("%Y-%m-%d_%I-%p")
self.completeName = os.path.join(self.save_path, file_name)
self.data = pd.read_csv(self.completeName)
self.y_anim = []
self.animate = animate
for joint in self.data.columns:
# Create figure, labels, and title
fig = plt.figure()
plt.title(joint + ' Effort Sensing')
plt.ylabel('Effort')
plt.xlabel('Data Points')
# Conditional statement for animation plotting
if self.animate:
self.effort = self.data[joint]
frames = len(self.effort)-1
anim = animation.FuncAnimation(fig=fig,func=self.plot_animate, repeat=False,blit=False,frames=frames, interval =75)
plt.show()
## If you want to save a video, make sure to comment out plt.show(),
## right before this comment.
# save_path = str(self.completeName + '.mp4')
# anim.save(save_path, writer=animation.FFMpegWriter(fps=10))
# Reset y_anim for the next joint effort animation
del self.y_anim[:]
# Conditional statement for regular plotting (No animation)
else:
self.data[joint].plot(kind='line')
# save_path = str(self.completeName + '.png')
# plt.savefig(save_path, bbox_inches='tight')
plt.show()
def plot_animate(self,i):
# Append self.effort values for given joint
self.y_anim.append(self.effort.values[i])
plt.plot(self.y_anim, color='blue')
def main(self):
self.get_logger().info('issuing command')
self.issue_command()
def main():
try:
node = JointActuatorEffortSensor(export_data=True, animate=True)
node.main()
node.plot_data()
node.new_thread.join()
except KeyboardInterrupt:
node.get_logger().info('interrupt received, so shutting down')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
```
### The Code Explained
This code is similar to that of the [multipoint_command](https://github.com/hello-robot/stretch_tutorials/blob/iron/stretch_ros_tutorials/multipoint_command.py) and [joint_state_printer](https://github.com/hello-robot/stretch_tutorials/blob/iron/stretch_ros_tutorials/joint_state_printer.py) node. Therefore, this example will highlight sections that are different from those tutorials. Now let's break the code down.
```python
#!/usr/bin/env python3
```
Every Python ROS [Node](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Nodes/Understanding-ROS2-Nodes.html) will have this declaration at the top. The first line makes sure your script is executed as a Python3 script.
```python
import rclpy
import hello_helpers.hello_misc as hm
import os
import csv
import pandas as pd
import matplotlib.pyplot as plt
import time
from matplotlib import animation
from datetime import datetime
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
```
You need to import rclpy if you are writing a ROS [Node](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Understanding-ROS2-Nodes/Understanding-ROS2-Nodes.html). Import the `FollowJointTrajectory` from the `control_msgs.action` package to control the Stretch robot. Import `JointTrajectoryPoint` from the `trajectory_msgs` package to define robot trajectories. The `hello_helpers` package consists of a module that provides various Python scripts used across [stretch_ros](https://github.com/hello-robot/stretch_ros2). In this instance, we are importing the `hello_misc` script.
```Python
class JointActuatorEffortSensor(hm.HelloNode):
def __init__(self, export_data=False):
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'issue_command', 'issue_command', wait_for_first_pointcloud=False)
```
The `JointActuatorEffortSensor` class inherits the `HelloNode` class from `hm` and is initialized also the HelloNode class already have the topic joint_states, thanks to this we don't need to create a subscriber.
```python
self.joints = ['joint_lift']
```
Create a list of the desired joints you want to print.
```Python
self.joint_effort = []
self.save_path = '/home/hello-robot/ament_ws/src/stretch_tutorials/stored_data'
self.export_data = export_data
```
Create an empty list to store the joint effort values. The `self.save_path` is the directory path where the .txt file of the effort values will be stored. You can change this path to a preferred directory. The `self.export_data` is a boolean and its default value is set to `True`. If set to `False`, then the joint values will be printed in the terminal, otherwise, it will be stored in a .txt file and that's what we want to see the plot graph.
```python
self._send_goal_future = self.trajectory_client.send_goal_async(trajectory_goal, self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
```
The [ActionClient.send_goal_async()](https://docs.ros2.org/latest/api/rclpy/api/actions.html#rclpy.action.client.ActionClient.send_goal_async) method returns a future to a goal handle. Include the goal and `feedback_callback` functions in the send goal function. Also we need to register a `goal_response_callback` for when the future is complete
```python
def goal_response_callback(self,future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Failed')
return
self.get_logger().info('Succeded')
```
Looking at the `goal_response_callback` in more detail we can see if the future is complete with the messages that will appear.
```python
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
```
We need the goal_handle to request the result with the method get_result_async. With this we will get a future that will complete when the result is ready so we need a callback for this result.
```python
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Sent position goal = {0}'.format(result))
```
In the result callback we log the result of our poistion goal
```python
def feedback_callback(self,feedback_msg):
```
The feedback callback function takes in the `FollowJointTrajectoryActionFeedback` message as its argument.
```python
if 'wrist_extension' in self.joints:
self.joints.remove('wrist_extension')
self.joints.append('joint_arm_l0')
```
Use a conditional statement to replace `wrist_extenstion` with `joint_arm_l0`. This is because `joint_arm_l0` has the effort values that the `wrist_extension` is experiencing.
```python
current_effort = []
for joint in self.joints:
index = self.joint_states.name.index(joint)
current_effort.append(self.joint_states.effort[index])
```
Create an empty list to store the current effort values. Then use a for loop to parse the joint names and effort values.
```python
if not self.export_data:
print("name: " + str(self.joints))
print("effort: " + str(current_effort))
else:
self.joint_effort.append(current_effort)
```
Use a conditional statement to print effort values in the terminal or store values into a list that will be used for exporting the data in a .txt file.
```python
if self.export_data:
file_name = datetime.now().strftime("%Y-%m-%d_%I-%p")
completeName = os.path.join(self.save_path, file_name)
with open(completeName, "w") as f:
writer = csv.writer(f)
writer.writerow(self.joints)
writer.writerows(self.joint_effort)
```
A conditional statement is used to export the data to a .txt file. The file's name is set to the date and time the node was executed.
```python
def plot_data(self, animate = True):
time.sleep(0.1)
file_name = datetime.now().strftime("%Y-%m-%d_%I-%p")
self.completeName = os.path.join(self.save_path, file_name)
self.data = pd.read_csv(self.completeName)
self.y_anim = []
self.animate = animate
```
This function will help us initialize some values to plot our data, the file is going to be the one we created and we need to create an empty list for the animation
```python
for joint in self.data.columns:
# Create figure, labels, and title
fig = plt.figure()
plt.title(joint + ' Effort Sensing')
plt.ylabel('Effort')
plt.xlabel('Data Points')
```
Create a for loop to print each joint's effort writing the correct labels for x and y
```python
if self.animate:
self.effort = self.data[joint]
frames = len(self.effort)-1
anim = animation.FuncAnimation(fig=fig,func=self.plot_animate, repeat=False,blit=False,frames=frames, interval =75)
plt.show()
del self.y_anim[:]
else:
self.data[joint].plot(kind='line')
# save_path = str(self.completeName + '.png')
# plt.savefig(save_path, bbox_inches='tight')
plt.show()
```
This is a conditional statement for the animation plotting
```python
def plot_animate(self,i):
self.y_anim.append(self.effort.values[i])
plt.plot(self.y_anim, color='blue')
```
We will create another function that will plot every increment in the data frame
<p align="center">
<img src="https://raw.githubusercontent.com/hello-robot/stretch_tutorials/noetic/stored_data/2022-06-30_11:26:20-AM.png"/>
</p>

Loading…
Cancel
Save