Browse Source

Create and tested new ROS2 tutorial effort sensing

humble
Jesus Eduardo Rodriguez 1 year ago
committed by GitHub
parent
commit
3b993ab844
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 169 additions and 0 deletions
  1. +169
    -0
      stretch_ros_tutorials/effort_sensing.py

+ 169
- 0
stretch_ros_tutorials/effort_sensing.py View File

@ -0,0 +1,169 @@
##!/usr/bin/env python3
import rclpy
import hello_helpers.hello_misc as hm
import os
import csv
import pandas as pd
import matplotlib.pyplot as plt
import time
from matplotlib import animation
from datetime import datetime
from control_msgs.action import FollowJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
class JointActuatorEffortSensor(hm.HelloNode):
"""
A class that sends multiple joint trajectory goals to a single joint.
"""
def __init__(self, export_data=True, animate=True):
"""
Function that initializes the subscriber,and other features.
:param self: The self reference.
:param export_data: A boolean message type.
"""
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'issue_command', 'issue_command', wait_for_first_pointcloud=False)
self.joints = ['joint_lift']
self.joint_effort = []
self.save_path = '/home/hello-robot/ament_ws/src/stretch_tutorials/stored_data'
self.export_data = export_data
def issue_command(self):
"""
Function that makes an action call and sends joint trajectory goals
to a single joint.
:param self: The self reference.
"""
trajectory_goal = FollowJointTrajectory.Goal()
trajectory_goal.trajectory.joint_names = self.joints
point0 = JointTrajectoryPoint()
point0.positions = [0.9]
trajectory_goal.trajectory.points = [point0]
trajectory_goal.trajectory.header.stamp = self.get_clock().now().to_msg()
trajectory_goal.trajectory.header.frame_id = 'base_link'
self._send_goal_future = self.trajectory_client.send_goal_async(trajectory_goal, self.feedback_callback)
self.get_logger().info('Sent position goal = {0}'.format(trajectory_goal))
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Failed')
return
self.get_logger().info('Succeded')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Sent position goal = {0}'.format(result))
def feedback_callback(self, feedback_msg):
"""
The feedback_callback function deals with the incoming feedback messages
from the trajectory_client. Although, in this function, we do not use the
feedback information.
:param self: The self reference.
:param feedback: FollowJointTrajectoryActionFeedback message.
"""
if 'wrist_extension' in self.joints:
self.joints.remove('wrist_extension')
self.joints.append('joint_arm_l0')
current_effort = []
for joint in self.joints:
index = self.joint_state.name.index(joint)
current_effort.append(self.joint_state.effort[index])
if not self.export_data:
print("name: " + str(self.joints))
print("effort: " + str(current_effort))
else:
self.joint_effort.append(current_effort)
if self.export_data:
file_name = datetime.now().strftime("%Y-%m-%d_%I-%p")
completeName = os.path.join(self.save_path, file_name)
with open(completeName, "w") as f:
writer = csv.writer(f)
writer.writerow(self.joints)
writer.writerows(self.joint_effort)
def plot_data(self, animate = True):
time.sleep(0.1)
file_name = datetime.now().strftime("%Y-%m-%d_%I-%p")
self.completeName = os.path.join(self.save_path, file_name)
self.data = pd.read_csv(self.completeName)
self.y_anim = []
self.animate = animate
for joint in self.data.columns:
# Create figure, labels, and title
fig = plt.figure()
plt.title(joint + ' Effort Sensing')
plt.ylabel('Effort')
plt.xlabel('Data Points')
# Conditional statement for animation plotting
if self.animate:
self.effort = self.data[joint]
frames = len(self.effort)-1
anim = animation.FuncAnimation(fig=fig,func=self.plot_animate, repeat=False,blit=False,frames=frames, interval =75)
plt.show()
## If you want to save a video, make sure to comment out plt.show(),
## right before this comment.
# save_path = str(self.completeName + '.mp4')
# anim.save(save_path, writer=animation.FFMpegWriter(fps=10))
# Reset y_anim for the next joint effort animation
del self.y_anim[:]
# Conditional statement for regular plotting (No animation)
else:
self.data[joint].plot(kind='line')
# save_path = str(self.completeName + '.png')
# plt.savefig(save_path, bbox_inches='tight')
plt.show()
def plot_animate(self,i):
"""
Function that plots every increment of the dataframe.
:param self: The self reference.
:param i: index value.
"""
# Append self.effort values for given joint
self.y_anim.append(self.effort.values[i])
plt.plot(self.y_anim, color='blue')
def main(self):
"""
Function that initiates the issue_command function.
:param self: The self reference.
"""
self.get_logger().info('issuing command')
self.issue_command()
def main():
try:
node = JointActuatorEffortSensor(export_data=True, animate=True)
node.main()
node.plot_data()
node.new_thread.join()
except KeyboardInterrupt:
node.get_logger().info('interrupt received, so shutting down')
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

Loading…
Cancel
Save