Browse Source

Included comments and fixed logic in the feedback_callback function.

pull/14/head
Alan G. Sanchez 2 years ago
parent
commit
f90cf699df
1 changed files with 40 additions and 22 deletions
  1. +40
    -22
      src/effort_sensing.py

+ 40
- 22
src/effort_sensing.py View File

@ -7,8 +7,6 @@ import actionlib
import os import os
import csv import csv
from datetime import datetime
# Import the FollowJointTrajectoryGoal from the control_msgs.msg package to # Import the FollowJointTrajectoryGoal from the control_msgs.msg package to
# control the Stretch robot. # control the Stretch robot.
from control_msgs.msg import FollowJointTrajectoryGoal from control_msgs.msg import FollowJointTrajectoryGoal
@ -24,33 +22,40 @@ from sensor_msgs.msg import JointState
# Import hello_misc script for handling trajecotry goals with an action client. # Import hello_misc script for handling trajecotry goals with an action client.
import hello_helpers.hello_misc as hm import hello_helpers.hello_misc as hm
class SingleJointActuator(hm.HelloNode):
# Import datetime for naming the exported data files
from datetime import datetime
class JointActuatorEffortSensor(hm.HelloNode):
""" """
A class that sends multiple joint trajectory goals to a single joint. A class that sends multiple joint trajectory goals to a single joint.
""" """
# Initialize the inhereted hm.Hellonode class. # Initialize the inhereted hm.Hellonode class.
def __init__(self): def __init__(self):
"""
Function that initializes the subscriber,and other features.
:param self: The self reference.
"""
hm.HelloNode.__init__(self) hm.HelloNode.__init__(self)
# Set up a subscriber. We're going to subscribe to the topic "joint_states" # Set up a subscriber. We're going to subscribe to the topic "joint_states"
self.sub = rospy.Subscriber('joint_states', JointState, self.callback) self.sub = rospy.Subscriber('joint_states', JointState, self.callback)
# Create a list of joints
self.joints = ['joint_lift'] self.joints = ['joint_lift']
self.joint_positions = []
# Create an empty list for later storage.
self.joint_effort = [] self.joint_effort = []
# Create path to save effort and position values # Create path to save effort and position values
self.save_path = '/home/hello-robot/catkin_ws/src/stretch_ros_tutorials/stored_data' self.save_path = '/home/hello-robot/catkin_ws/src/stretch_ros_tutorials/stored_data'
# Create boolean data type for conditional statements if you want to export effort data.
self.export_data = False self.export_data = False
def callback(self, msg): def callback(self, msg):
""" """
Callback function to deal with the incoming JointState messages.
Callback function to update and store JointState messages.
:param self: The self reference. :param self: The self reference.
:param msg: The JointState message. :param msg: The JointState message.
""" """
@ -71,37 +76,39 @@ class SingleJointActuator(hm.HelloNode):
trajectory_goal.trajectory.joint_names = self.joints trajectory_goal.trajectory.joint_names = self.joints
# Provide desired positions for joint name. # Provide desired positions for joint name.
# Set positions for the following 5 trajectory points.
point0 = JointTrajectoryPoint() point0 = JointTrajectoryPoint()
point0.positions = [0.9] point0.positions = [0.9]
# Then trajectory_goal.trajectory.points is set as a list of the joint
# trajectory points
# Set a list to the trajectory_goal.trajectory.points
trajectory_goal.trajectory.points = [point0] trajectory_goal.trajectory.points = [point0]
# Specify the coordinate frame that we want (base_link) and set the time to be now. # Specify the coordinate frame that we want (base_link) and set the time to be now.
trajectory_goal.trajectory.header.stamp = rospy.Time(0.0) trajectory_goal.trajectory.header.stamp = rospy.Time(0.0)
trajectory_goal.trajectory.header.frame_id = 'base_link' trajectory_goal.trajectory.header.frame_id = 'base_link'
# Make the action call and send the goal. The last line of code waits
# for the result before it exits the python script.
# Make the action call and send the goal. Also define the feedback and
# done callback function.
self.trajectory_client.send_goal(trajectory_goal, feedback_cb=self.feedback_callback, done_cb=self.done_callback) self.trajectory_client.send_goal(trajectory_goal, feedback_cb=self.feedback_callback, done_cb=self.done_callback)
rospy.loginfo('Sent stow goal = {0}'.format(trajectory_goal)) rospy.loginfo('Sent stow goal = {0}'.format(trajectory_goal))
self.trajectory_client.wait_for_result() self.trajectory_client.wait_for_result()
def feedback_callback(self,feedback): def feedback_callback(self,feedback):
""" """
print_states function to deal with the incoming JointState messages.
The feedback_callback function deals with the incoming feedback messages
from the trajectory_client. Although, in this function, we do not use the
feedback information.
:param self: The self reference. :param self: The self reference.
:param joints: A list of joint names.
:param feedback: FollowJointTrajectoryActilnFeedback message.
""" """
# Conditional statement for replacement of joint names if wrist_extension
# is in the self.joint_names list.
if 'wrist_extension' in self.joints: if 'wrist_extension' in self.joints:
self.joints.remove('wrist_extension') self.joints.remove('wrist_extension')
self.joints.append('joint_arm_l0') self.joints.append('joint_arm_l0')
# create an empty list of current effort values that will be appended # create an empty list of current effort values that will be appended
# to the overall joint_effort
# to the overall joint_effort list.
current_effort = [] current_effort = []
# Use of forloop to parse the names of the requested joints list. # Use of forloop to parse the names of the requested joints list.
@ -111,27 +118,38 @@ class SingleJointActuator(hm.HelloNode):
index = self.joint_states.name.index(joint) index = self.joint_states.name.index(joint)
current_effort.append(self.joint_states.effort[index]) current_effort.append(self.joint_states.effort[index])
# If the self.export_data boolean is false, then print the names and efforts
# of the joints in the terminal.
if not self.export_data: if not self.export_data:
# Print the joint position values to the terminal # Print the joint position values to the terminal
print("name: " + str(self.joints)) print("name: " + str(self.joints))
print("effort: " + str(current_effort)) print("effort: " + str(current_effort))
# Else, append the current effort to the joint_effort list.
else: else:
self.joint_effort.append(current_effort) self.joint_effort.append(current_effort)
def done_callback(self,status, result):
def done_callback(self, status, result):
"""
The done_callback function will be called when the joint action is complete.
Within this function we export the data to a .txt file in the /stored_data directory.
:param self: The self reference.
:param feedback: FollowJointTrajectoryActionFeedback message.
"""
# Conditional statemets to notify whether the action succeeded or failed.
if status == actionlib.GoalStatus.SUCCEEDED: if status == actionlib.GoalStatus.SUCCEEDED:
rospy.loginfo('Suceeded') rospy.loginfo('Suceeded')
else: else:
rospy.loginfo('Failed') rospy.loginfo('Failed')
# Conditional statement for exporting data.
if self.export_data: if self.export_data:
# File name is the date and time the action was complete
file_name = datetime.now().strftime("%Y-%m-%d_%I:%M:%S-%p") file_name = datetime.now().strftime("%Y-%m-%d_%I:%M:%S-%p")
completeName = os.path.join(self.save_path, file_name) completeName = os.path.join(self.save_path, file_name)
rows = zip(self.joint_positions, self.joint_effort)
# Write the joint names and efforts to a .txt file.
with open(completeName, "w") as f: with open(completeName, "w") as f:
writer = csv.writer(f) writer = csv.writer(f)
writer.writerow(self.joints) writer.writerow(self.joints)
@ -153,9 +171,9 @@ class SingleJointActuator(hm.HelloNode):
if __name__ == '__main__': if __name__ == '__main__':
try: try:
# Initialize the SingleJointActuator() class and set it to node and run the
# main() function.
node = SingleJointActuator()
# Initialize the SingleJointActuator() class and set it to node and run the
# main() function.
node = JointActuatorEffortSensor()
node.main() node.main()
except KeyboardInterrupt: except KeyboardInterrupt:
rospy.loginfo('interrupt received, so shutting down') rospy.loginfo('interrupt received, so shutting down')

Loading…
Cancel
Save