In this example, we will review a Python script that prints and stores the effort values from a specified joint. If you are looking for a continuous print of the joint state efforts while Stretch is in action, then you can use the ros2 topic command-line tool shown in the Internal State of Stretch Tutorial.
Begin by running the following command in a terminal.
ros2 launch stretch_core stretch_driver.launch.py
There's no need to switch to the position mode in comparison with ROS1 because the default mode of the launcher is this position mode. Then run the effort_sensing.py node. In a new terminal, execute:
cd ament_ws/src/stretch_tutorials/stretch_ros_tutorials/
python3 effort_sensing.py
This will send a FollowJointTrajectory
command to move Stretch's arm or head while also printing the effort of the lift.
##!/usr/bin/env python3
import rclpy
import hello_helpers.hello_misc as hm
import os
import csv
import time
import pandas as pd
import matplotlib
matplotlib.use('tkagg')
import matplotlib.pyplot as plt
from matplotlib import animation
from datetime import datetime
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
class JointActuatorEffortSensor(hm.HelloNode):
def __init__(self, export_data=True, animate=True):
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'issue_command', 'issue_command', wait_for_first_pointcloud=False)
self.joints = ['joint_lift']
self.joint_effort = []
self.save_path = '/home/hello-robot/ament_ws/src/stretch_tutorials/stored_data'
self.export_data = export_data
self.result = False
self.file_name = datetime.now().strftime("effort_sensing_tutorial_%Y%m%d%I")
def issue_command(self):
trajectory_goal = FollowJointTrajectory.Goal()
trajectory_goal.trajectory.joint_names = self.joints
point0 = JointTrajectoryPoint()
point0.positions = [0.9]
trajectory_goal.trajectory.points = [point0]
trajectory_goal.trajectory.header.stamp = self.get_clock().now().to_msg()
trajectory_goal.trajectory.header.frame_id = 'base_link'
while self.joint_state is None:
time.sleep(0.1)
self._send_goal_future = self.trajectory_client.send_goal_async(trajectory_goal, self.feedback_callback)
self.get_logger().info('Sent position goal = {0}'.format(trajectory_goal))
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Failed')
return
self.get_logger().info('Succeded')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
self.result = future.result().result
self.get_logger().info('Sent position goal = {0}'.format(self.result))
def feedback_callback(self, feedback_msg):
if 'wrist_extension' in self.joints:
self.joints.remove('wrist_extension')
self.joints.append('joint_arm_l0')
current_effort = []
for joint in self.joints:
index = self.joint_state.name.index(joint)
current_effort.append(self.joint_state.effort[index])
if not self.export_data:
print("name: " + str(self.joints))
print("effort: " + str(current_effort))
else:
self.joint_effort.append(current_effort)
if self.export_data:
file_name = self.file_name
completeName = os.path.join(self.save_path, file_name)
with open(completeName, "w") as f:
writer = csv.writer(f)
writer.writerow(self.joints)
writer.writerows(self.joint_effort)
def plot_data(self, animate = True):
while not self.result:
time.sleep(0.1)
file_name = self.file_name
self.completeName = os.path.join(self.save_path, file_name)
self.data = pd.read_csv(self.completeName)
self.y_anim = []
self.animate = animate
for joint in self.data.columns:
# Create figure, labels, and title
fig = plt.figure()
plt.title(joint + ' Effort Sensing')
plt.ylabel('Effort')
plt.xlabel('Data Points')
# Conditional statement for animation plotting
if self.animate:
self.effort = self.data[joint]
frames = len(self.effort)-1
anim = animation.FuncAnimation(fig=fig,func=self.plot_animate, repeat=False,blit=False,frames=frames, interval =75)
plt.show()
## If you want to save a video, make sure to comment out plt.show(),
## right before this comment.
# save_path = str(self.completeName + '.mp4')
# anim.save(save_path, writer=animation.FFMpegWriter(fps=10))
# Reset y_anim for the next joint effort animation
del self.y_anim[:]
# Conditional statement for regular plotting (No animation)
else:
self.data[joint].plot(kind='line')
# save_path = str(self.completeName + '.png')
# plt.savefig(save_path, bbox_inches='tight')
plt.show()
def plot_animate(self,i):
# Append self.effort values for given joint
self.y_anim.append(self.effort.values[i])
plt.plot(self.y_anim, color='blue')
def main(self):
self.get_logger().info('issuing command')
self.issue_command()
def main():
try:
node = JointActuatorEffortSensor(export_data=True, animate=True)
node.main()
node.plot_data()
node.new_thread.join()
except KeyboardInterrupt:
node.get_logger().info('interrupt received, so shutting down')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
This code is similar to that of the multipoint_command and joint_state_printer node. Therefore, this example will highlight sections that are different from those tutorials. Now let's break the code down.
#!/usr/bin/env python3
Every Python ROS Node will have this declaration at the top. The first line makes sure your script is executed as a Python3 script.
import rclpy
import hello_helpers.hello_misc as hm
import os
import csv
import time
import pandas as pd
import matplotlib
matplotlib.use('tkagg')
import matplotlib.pyplot as plt
from matplotlib import animation
from datetime import datetime
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
You need to import rclpy if you are writing a ROS Node. Import the FollowJointTrajectory
from the control_msgs.action
package to control the Stretch robot. Import JointTrajectoryPoint
from the trajectory_msgs
package to define robot trajectories. The hello_helpers
package consists of a module that provides various Python scripts used across stretch_ros. In this instance, we are importing the hello_misc
script.
class JointActuatorEffortSensor(hm.HelloNode):
def __init__(self, export_data=False):
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'issue_command', 'issue_command', wait_for_first_pointcloud=False)
The JointActuatorEffortSensor
class inherits the HelloNode
class from hm
and is initialized also the HelloNode class already have the topic joint_states, thanks to this we don't need to create a subscriber.
self.joints = ['joint_lift']
Create a list of the desired joints you want to print.
self.joint_effort = []
self.save_path = '/home/hello-robot/ament_ws/src/stretch_tutorials/stored_data'
self.export_data = export_data
self.result = False
self.file_name = datetime.now().strftime("effort_sensing_tutorial_%Y%m%d%I")
Create an empty list to store the joint effort values. The self.save_path
is the directory path where the .txt file of the effort values will be stored. You can change this path to a preferred directory. The self.export_data
is a boolean and its default value is set to True
. If set to False
, then the joint values will be printed in the terminal, otherwise, it will be stored in a .txt file and that's what we want to see the plot graph. Also we want to give our text file a name since the beginning and we use the self.file_name
to access this later.
self._send_goal_future = self.trajectory_client.send_goal_async(trajectory_goal, self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
The ActionClient.send_goal_async() method returns a future to a goal handle. Include the goal and feedback_callback
functions in the send goal function. Also we need to register a goal_response_callback
for when the future is complete
def goal_response_callback(self,future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Failed')
return
self.get_logger().info('Succeded')
Looking at the goal_response_callback
in more detail we can see if the future is complete with the messages that will appear.
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
We need the goal_handle to request the result with the method get_result_async. With this we will get a future that will complete when the result is ready so we need a callback for this result.
def get_result_callback(self, future):
self.result = future.result().result
self.get_logger().info('Sent position goal = {0}'.format(result))
In the result callback we log the result of our poistion goal
def feedback_callback(self,feedback_msg):
The feedback callback function takes in the FollowJointTrajectoryActionFeedback
message as its argument.
if 'wrist_extension' in self.joints:
self.joints.remove('wrist_extension')
self.joints.append('joint_arm_l0')
Use a conditional statement to replace wrist_extenstion
with joint_arm_l0
. This is because joint_arm_l0
has the effort values that the wrist_extension
is experiencing.
current_effort = []
for joint in self.joints:
index = self.joint_states.name.index(joint)
current_effort.append(self.joint_states.effort[index])
Create an empty list to store the current effort values. Then use a for loop to parse the joint names and effort values.
if not self.export_data:
print("name: " + str(self.joints))
print("effort: " + str(current_effort))
else:
self.joint_effort.append(current_effort)
Use a conditional statement to print effort values in the terminal or store values into a list that will be used for exporting the data in a .txt file.
if self.export_data:
file_name = self.file_name
completeName = os.path.join(self.save_path, file_name)
with open(completeName, "w") as f:
writer = csv.writer(f)
writer.writerow(self.joints)
writer.writerows(self.joint_effort)
A conditional statement is used to export the data to a .txt file. The file's name is set to the one we created at the beginning.
def plot_data(self, animate = True):
while not self.result:
time.sleep(0.1)
file_name = self.file_name
self.completeName = os.path.join(self.save_path, file_name)
self.data = pd.read_csv(self.completeName)
self.y_anim = []
self.animate = animate
This function will help us initialize some values to plot our data, we need to wait until we get the results to start plotting, because the file could still be storing values and we want to plot every point also we need to create an empty list for the animation.
for joint in self.data.columns:
# Create figure, labels, and title
fig = plt.figure()
plt.title(joint + ' Effort Sensing')
plt.ylabel('Effort')
plt.xlabel('Data Points')
Create a for loop to print each joint's effort writing the correct labels for x and y
if self.animate:
self.effort = self.data[joint]
frames = len(self.effort)-1
anim = animation.FuncAnimation(fig=fig,func=self.plot_animate, repeat=False,blit=False,frames=frames, interval =75)
plt.show()
del self.y_anim[:]
else:
self.data[joint].plot(kind='line')
# save_path = str(self.completeName + '.png')
# plt.savefig(save_path, bbox_inches='tight')
plt.show()
This is a conditional statement for the animation plotting
def plot_animate(self,i):
self.y_anim.append(self.effort.values[i])
plt.plot(self.y_anim, color='blue')
We will create another function that will plot every increment in the data frame