@ -9,40 +9,38 @@ import stretch_body.robot as rb
from stretch_body.hello_utils import ThreadServiceExit
import tf2_ros
import tf_conversions
import pyquaternion
import rospy
import rclpy
from rclpy.duration import Duration
from rclpy.node import Node
from geometry_msgs.msg import Quaternion
from geometry_msgs.msg import Twist
from geometry_msgs.msg import TransformStamped
import actionlib
from control_msgs.msg import FollowJointTrajectoryAction
from control_msgs.msg import FollowJointTrajectoryResult
from std_srvs.srv import Trigger , TriggerResponse
from std_srvs.srv import SetBool , SetBoolResponse
from std_srvs.srv import Trigger
from std_srvs.srv import SetBool
from nav_msgs.msg import Odometry
from sensor_msgs.msg import JointState , Imu , MagneticField
from std_msgs.msg import Header
import hello_helpers.hello_misc as hm
from hello_helpers.gripper_conversion import GripperConversion
from joint_trajectory_server import JointTrajectoryAction
from . joint_trajectory_server import JointTrajectoryAction
GRIPPER_DEBUG = False
BACKLASH_DEBUG = False
class StretchBodyNode :
class StretchBodyNode ( Node ) :
def __init__ ( self ) :
super ( ) . __init__ ( ' stretch_driver ' )
self . use_robotis_head = True
self . use_robotis_end_of_arm = True
self . default_goal_timeout_s = 10.0
self . default_goal_timeout_duration = rospy . Duration ( self . default_goal_timeout_s )
self . default_goal_timeout_duration = Duration ( seconds = self . default_goal_timeout_s )
# Initialize calibration offsets
self . head_tilt_calibrated_offset_rad = 0.0
@ -75,11 +73,11 @@ class StretchBodyNode:
self . robot_mode_rwlock . acquire_read ( )
if self . robot_mode != ' navigation ' :
error_string = ' {0} action server must be in navigation mode to receive a twist on cmd_vel. Current mode = {1}. ' . format ( self . node_name , self . robot_mode )
rospy . loger r( error_string )
self . get_logger ( ) . erro r( error_string )
return
self . linear_velocity_mps = twist . linear . x
self . angular_velocity_radps = twist . angular . z
self . last_twist_time = rospy . get_time ( )
self . last_twist_time = self . node . get_clock ( ) . now ( )
self . robot_mode_rwlock . release_read ( )
def command_mobile_base_velocity_and_publish_state ( self ) :
@ -92,7 +90,7 @@ class StretchBodyNode:
# set new mobile base velocities, if appropriate
# check on thread safety for this with callback that sets velocity command values
if self . robot_mode == ' navigation ' :
time_since_last_twist = rospy . get_time ( ) - self . last_twist_time
time_since_last_twist = self . node . get_clock ( ) . now ( ) - self . last_twist_time
if time_since_last_twist < self . timeout :
self . robot . base . set_velocity ( self . linear_velocity_mps , self . angular_velocity_radps )
self . robot . push_command ( )
@ -107,12 +105,12 @@ class StretchBodyNode:
# In the future, consider using time stamps from the robot's
# motor control boards and other boards. These would need to
# be synchronized with the rospy clock.
# be synchronized with the ros clock.
#robot_time = robot_status['timestamp_pc']
#rospy.log info('robot_time =', robot_time)
#self.get_logger(). info('robot_time =', robot_time)
#current_time = rospy.Time.from_sec(robot_time)
current_time = rospy . Time . now ( )
current_time = self . get_clock ( ) . now ( )
# obtain odometry
# assign relevant base status to variables
@ -196,7 +194,7 @@ class StretchBodyNode:
head_tilt_vel = head_tilt_status [ ' vel ' ]
head_tilt_effort = head_tilt_status [ ' effort ' ]
q = tf_conversions . transformations . quaternion_from_euler ( 0 , 0 , theta )
q = pyquaternion . Quaternion ( axis = [ 0 , 0 , 1 ] , angle = theta )
if self . broadcast_odom_tf :
# publish odometry via TF
@ -207,10 +205,10 @@ class StretchBodyNode:
t . transform . translation . x = x
t . transform . translation . y = y
t . transform . translation . z = 0.0
t . transform . rotation . x = q [ 0 ]
t . transform . rotation . y = q [ 1 ]
t . transform . rotation . z = q [ 2 ]
t . transform . rotation . w = q [ 3 ]
t . transform . rotation . x = q . x
t . transform . rotation . y = q . y
t . transform . rotation . z = q . z
t . transform . rotation . w = q . w
self . tf_broadcaster . sendTransform ( t )
# publish odometry via the odom topic
@ -220,10 +218,10 @@ class StretchBodyNode:
odom . child_frame_id = self . base_frame_id
odom . pose . pose . position . x = x
odom . pose . pose . position . y = y
odom . pose . pose . orientation . x = q [ 0 ]
odom . pose . pose . orientation . y = q [ 1 ]
odom . pose . pose . orientation . z = q [ 2 ]
odom . pose . pose . orientation . w = q [ 3 ]
odom . pose . pose . orientation . x = q . x
odom . pose . pose . orientation . y = q . y
odom . pose . pose . orientation . z = q . z
odom . pose . pose . orientation . w = q . w
odom . twist . twist . linear . x = x_vel
odom . twist . twist . angular . z = theta_vel
self . odom_pub . publish ( odom )
@ -238,21 +236,21 @@ class StretchBodyNode:
joint_state . name = [ ' wrist_extension ' , ' joint_lift ' , ' joint_arm_l3 ' , ' joint_arm_l2 ' , ' joint_arm_l1 ' , ' joint_arm_l0 ' ]
# set positions of the telescoping joints
positions = [ pos_out / 4.0 for i in x range( 4 ) ]
positions = [ pos_out / 4.0 for i in range ( 4 ) ]
# set lift position
positions . insert ( 0 , pos_up )
# set wrist_extension position
positions . insert ( 0 , pos_out )
# set velocities of the telescoping joints
velocities = [ vel_out / 4.0 for i in x range( 4 ) ]
velocities = [ vel_out / 4.0 for i in range ( 4 ) ]
# set lift velocity
velocities . insert ( 0 , vel_up )
# set wrist_extension velocity
velocities . insert ( 0 , vel_out )
# set efforts of the telescoping joints
efforts = [ eff_out for i in x range( 4 ) ]
efforts = [ eff_out for i in range ( 4 ) ]
# set lift effort
efforts . insert ( 0 , eff_up )
# set wrist_extension effort
@ -356,7 +354,7 @@ class StretchBodyNode:
self . robot_mode_rwlock . acquire_write ( )
self . robot_mode = new_mode
code_to_run ( )
rospy . log info( ' {0}: Changed to mode = {1} ' . format ( self . node_name , self . robot_mode ) )
self . get_logger ( ) . info( ' {0}: Changed to mode = {1} ' . format ( self . node_name , self . robot_mode ) )
self . robot_mode_rwlock . release_write ( )
# TODO : add a freewheel mode or something comparable for the mobile base?
@ -427,29 +425,29 @@ class StretchBodyNode:
self . robot . end_of_arm . move_by ( ' stretch_gripper ' , 0.0 )
self . robot . push_command ( )
rospy . log info( ' Received stop_the_robot service call, so commanded all actuators to stop. ' )
return TriggerResponse (
self . get_logger ( ) . info( ' Received stop_the_robot service call, so commanded all actuators to stop. ' )
return Trigger . Result (
success = True ,
message = ' Stopped the robot. '
)
def navigation_mode_service_callback ( self , request ) :
self . turn_on_navigation_mode ( )
return TriggerResponse (
return Trigger . Result (
success = True ,
message = ' Now in navigation mode. '
)
def manipulation_mode_service_callback ( self , request ) :
self . turn_on_manipulation_mode ( )
return TriggerResponse (
return Trigger . Result (
success = True ,
message = ' Now in manipulation mode. '
)
def position_mode_service_callback ( self , request ) :
self . turn_on_position_mode ( )
return TriggerResponse (
return Trigger . Result (
success = True ,
message = ' Now in position mode. '
)
@ -475,98 +473,103 @@ class StretchBodyNode:
else :
self . robot . pimu . runstop_event_reset ( )
return SetBoolResponse (
return SetBool . Result (
success = True ,
message = ' is_runstopped: {0} ' . format ( request . data )
)
########### MAIN ##### #######
def main ( self ) :
########### ROS Setup #######
def ros_setup ( self ) :
self . node_name = self . get_name ( )
rospy . init_node ( ' stretch_driver ' )
self . node_name = rospy . get_name ( )
self . get_logger ( ) . info ( " For use with S T R E T C H (TM) RESEARCH EDITION from Hello Robot Inc. " )
rospy . log info( " For use with S T R E T C H (TM) RESEARCH EDITION from Hello Robot Inc. " )
self . get_logger ( ) . info( " {0} started " . format ( self . node_name ) )
rospy . loginfo ( " {0} started " . format ( self . node_name ) )
self . declare_parameter ( ' mode ' , ' position ' )
self . declare_parameter ( ' broadcast_odom_tf ' , False )
self . declare_parameter ( ' controller_calibration_file ' , ' not_set ' )
self . declare_parameter ( ' rate ' , 15.0 )
self . declare_parameter ( ' use_fake_mechaduinos ' , False )
self . declare_parameter ( ' fail_out_of_range_goal ' , True )
# self.set_parameters_callback(self.parameter_callback)
mode = rospy . get_param ( ' ~mode ' , " position " )
rospy . loginfo ( ' mode = ' + str ( mode ) )
mode = self . get_parameter ( ' mode ' ) . value
self . get_logger ( ) . info( ' mode = ' + str ( mode ) )
self . broadcast_odom_tf = rospy . get_param ( ' ~broadcast_odom_tf ' , False )
rospy . loginfo ( ' broadcast_odom_tf = ' + str ( self . broadcast_odom_tf ) )
self . broadcast_odom_tf = self . get_parameter ( ' broadcast_odom_tf ' ) . value
self . get_logger ( ) . info( ' broadcast_odom_tf = ' + str ( self . broadcast_odom_tf ) )
if self . broadcast_odom_tf :
self . tf_broadcaster = tf2_ros . TransformBroadcaster ( )
large_ang = 45.0 * np . pi / 180.0
filename = rospy . get_param ( ' ~ controller_calibration_file' )
rospy . log info( ' Loading controller calibration parameters for the head from YAML file named {0} ' . format ( filename ) )
filename = self . get_parameter ( ' controller_calibration_file ' ) . value
self . get_logger ( ) . info( ' Loading controller calibration parameters for the head from YAML file named {0} ' . format ( filename ) )
with open ( filename , ' r ' ) as fid :
controller_parameters = yaml . safe_load ( fid )
rospy . log info( ' controller parameters loaded = {0} ' . format ( controller_parameters ) )
self . get_logger ( ) . info( ' controller parameters loaded = {0} ' . format ( controller_parameters ) )
self . head_tilt_calibrated_offset_rad = controller_parameters [ ' tilt_angle_offset ' ]
ang = self . head_tilt_calibrated_offset_rad
if ( abs ( ang ) > large_ang ) :
rospy . log info( ' WARNING: self.head_tilt_calibrated_offset_rad HAS AN UNUSUALLY LARGE MAGNITUDE ' )
rospy . log info( ' self.head_tilt_calibrated_offset_rad in degrees = {0} ' . format ( np . degrees ( self . head_tilt_calibrated_offset_rad ) ) )
self . get_logger ( ) . info( ' WARNING: self.head_tilt_calibrated_offset_rad HAS AN UNUSUALLY LARGE MAGNITUDE ' )
self . get_logger ( ) . info( ' self.head_tilt_calibrated_offset_rad in degrees = {0} ' . format ( np . degrees ( self . head_tilt_calibrated_offset_rad ) ) )
self . head_pan_calibrated_offset_rad = controller_parameters [ ' pan_angle_offset ' ]
ang = self . head_pan_calibrated_offset_rad
if ( abs ( ang ) > large_ang ) :
rospy . log info( ' WARNING: self.head_pan_calibrated_offset_rad HAS AN UNUSUALLY LARGE MAGNITUDE ' )
rospy . log info( ' self.head_pan_calibrated_offset_rad in degrees = {0} ' . format ( np . degrees ( self . head_pan_calibrated_offset_rad ) ) )
self . get_logger ( ) . info( ' WARNING: self.head_pan_calibrated_offset_rad HAS AN UNUSUALLY LARGE MAGNITUDE ' )
self . get_logger ( ) . info( ' self.head_pan_calibrated_offset_rad in degrees = {0} ' . format ( np . degrees ( self . head_pan_calibrated_offset_rad ) ) )
self . head_pan_calibrated_looked_left_offset_rad = controller_parameters [ ' pan_looked_left_offset ' ]
ang = self . head_pan_calibrated_looked_left_offset_rad
if ( abs ( ang ) > large_ang ) :
rospy . log info( ' WARNING: self.head_pan_calibrated_looked_left_offset_rad HAS AN UNUSUALLY LARGE MAGNITUDE ' )
rospy . log info( ' self.head_pan_calibrated_looked_left_offset_rad in degrees = {0} ' . format ( np . degrees ( self . head_pan_calibrated_looked_left_offset_rad ) ) )
self . get_logger ( ) . info( ' WARNING: self.head_pan_calibrated_looked_left_offset_rad HAS AN UNUSUALLY LARGE MAGNITUDE ' )
self . get_logger ( ) . info( ' self.head_pan_calibrated_looked_left_offset_rad in degrees = {0} ' . format ( np . degrees ( self . head_pan_calibrated_looked_left_offset_rad ) ) )
self . head_tilt_backlash_transition_angle_rad = controller_parameters [ ' tilt_angle_backlash_transition ' ]
rospy . log info( ' self.head_tilt_backlash_transition_angle_rad in degrees = {0} ' . format ( np . degrees ( self . head_tilt_backlash_transition_angle_rad ) ) )
self . get_logger ( ) . info( ' self.head_tilt_backlash_transition_angle_rad in degrees = {0} ' . format ( np . degrees ( self . head_tilt_backlash_transition_angle_rad ) ) )
self . head_tilt_calibrated_looking_up_offset_rad = controller_parameters [ ' tilt_looking_up_offset ' ]
ang = self . head_tilt_calibrated_looking_up_offset_rad
if ( abs ( ang ) > large_ang ) :
rospy . log info( ' WARNING: self.head_tilt_calibrated_looking_up_offset_rad HAS AN UNUSUALLY LARGE MAGNITUDE ' )
rospy . log info( ' self.head_tilt_calibrated_looking_up_offset_rad in degrees = {0} ' . format ( np . degrees ( self . head_tilt_calibrated_looking_up_offset_rad ) ) )
self . get_logger ( ) . info( ' WARNING: self.head_tilt_calibrated_looking_up_offset_rad HAS AN UNUSUALLY LARGE MAGNITUDE ' )
self . get_logger ( ) . info( ' self.head_tilt_calibrated_looking_up_offset_rad in degrees = {0} ' . format ( np . degrees ( self . head_tilt_calibrated_looking_up_offset_rad ) ) )
self . wrist_extension_calibrated_retracted_offset_m = controller_parameters [ ' arm_retracted_offset ' ]
m = self . wrist_extension_calibrated_retracted_offset_m
if ( abs ( m ) > 0.05 ) :
rospy . log info( ' WARNING: self.wrist_extension_calibrated_retracted_offset_m HAS AN UNUSUALLY LARGE MAGNITUDE ' )
rospy . log info( ' self.wrist_extension_calibrated_retracted_offset_m in meters = {0} ' . format ( self . wrist_extension_calibrated_retracted_offset_m ) )
self . get_logger ( ) . info( ' WARNING: self.wrist_extension_calibrated_retracted_offset_m HAS AN UNUSUALLY LARGE MAGNITUDE ' )
self . get_logger ( ) . info( ' self.wrist_extension_calibrated_retracted_offset_m in meters = {0} ' . format ( self . wrist_extension_calibrated_retracted_offset_m ) )
self . linear_velocity_mps = 0.0 # m/s ROS SI standard for cmd_vel (REP 103)
self . angular_velocity_radps = 0.0 # rad/s ROS SI standard for cmd_vel (REP 103)
self . max_arm_height = 1.1
self . odom_pub = rospy . Publisher ( ' odom ' , Odometry , queue_size = 1 )
self . odom_pub = self . node . create_publisher ( Odometry , ' odom ' )
self . imu_mobile_base_pub = rospy . Publisher ( ' imu_mobile_base ' , Imu , queue_size = 1 )
self . magnetometer_mobile_base_pub = rospy . Publisher ( ' magnetometer_mobile_base ' , MagneticField , queue_size = 1 )
self . imu_wrist_pub = rospy . Publisher ( ' imu_wrist ' , Imu , queue_size = 1 )
self . imu_mobile_base_pub = self . node . create_publisher ( Imu , ' imu_mobile_base ' )
self . magnetometer_mobile_base_pub = self . node . create_publisher ( MagneticField , ' magnetometer_mobile_base ' )
self . imu_wrist_pub = self . node . create_publisher ( Imu , ' imu_wrist ' )
rospy . Subscriber ( " cmd_vel " , Twist , self . set_mobile_base_velocity_callback )
self . node . create_subscription ( Twist , " cmd_vel " , self . set_mobile_base_velocity_callback )
# ~ symbol gets parameter from private namespace
self . joint_state_rate = rospy . get_param ( ' ~ rate' , 15.0 )
self . timeout = rospy . get_param ( ' ~ timeout' , 1.0 )
rospy . log info( " {0} rate = {1} Hz " . format ( self . node_name , self . joint_state_rate ) )
rospy . log info( " {0} timeout = {1} s " . format ( self . node_name , self . timeout ) )
self . joint_state_rate = self . get_parameter ( ' rate ' ) . value
self . timeout = self . get_parameter ( ' timeout ' ) . value
self . get_logger ( ) . info( " {0} rate = {1} Hz " . format ( self . node_name , self . joint_state_rate ) )
self . get_logger ( ) . info( " {0} timeout = {1} s " . format ( self . node_name , self . timeout ) )
self . use_fake_mechaduinos = rospy . get_param ( ' ~ use_fake_mechaduinos' , False )
rospy . log info( " {0} use_fake_mechaduinos = {1} " . format ( rospy . get_name ( ) , self . use_fake_mechaduinos ) )
self . use_fake_mechaduinos = self . get_parameter ( ' use_fake_mechaduinos ' ) . value
self . get_logger ( ) . info( " {0} use_fake_mechaduinos = {1} " . format ( self . node_name , self . use_fake_mechaduinos ) )
self . base_frame_id = ' base_link '
rospy . log info( " {0} base_frame_id = {1} " . format ( self . node_name , self . base_frame_id ) )
self . get_logger ( ) . info( " {0} base_frame_id = {1} " . format ( self . node_name , self . base_frame_id ) )
self . odom_frame_id = ' odom '
rospy . log info( " {0} odom_frame_id = {1} " . format ( self . node_name , self . odom_frame_id ) )
self . get_logger ( ) . info( " {0} odom_frame_id = {1} " . format ( self . node_name , self . odom_frame_id ) )
self . robot = rb . Robot ( )
self . robot . startup ( )
@ -574,13 +577,13 @@ class StretchBodyNode:
# TODO: check with the actuators to see if calibration is required
#self.calibrate()
self . joint_state_pub = rospy . Publisher ( ' joint_states ' , JointState , queue_size = 1 )
self . joint_state_pub = self . node . create_publisher ( JointState , ' joint_states ' )
command_base_velocity_and_publish_joint_state_rate = rospy . R ate( self . joint_state_rate )
self . last_twist_time = rospy . get_time ( )
self . command_base_velocity_and_publish_joint_state_rate = self . create_r ate( self . joint_state_rate )
self . last_twist_time = self . node . get_clock ( ) . now ( )
# start action server for joint trajectories
self . fail_out_of_range_goal = rospy . get_param ( ' ~ fail_out_of_range_goal' , True )
self . fail_out_of_range_goal = self . get_parameter ( ' fail_out_of_range_goal ' ) . value
self . joint_trajectory_action = JointTrajectoryAction ( self )
self . joint_trajectory_action . server . start ( )
@ -591,41 +594,50 @@ class StretchBodyNode:
elif mode == " manipulation " :
self . turn_on_manipulation_mode ( )
self . switch_to_manipulation_mode_service = rospy . Service ( ' /switch_to_manipulation_mode ' ,
Trigger ,
self . manipulation_mode_service_callback )
self . switch_to_manipulation_mode_service = self . create_service ( Trigger ,
' /switch_to_manipulation_mode ' ,
self . manipulation_mode_service_callback )
self . switch_to_navigation_mode_service = rospy . Service ( ' /switch_to_navigation_mode ' ,
Trigger ,
self . navigation_mode_service_callback )
self . switch_to_navigation_mode_service = self . create_service ( Trigger ,
' /switch_to_navigation_mode ' ,
self . navigation_mode_service_callback )
self . switch_to_position_mode_service = rospy . Service ( ' /switch_to_position_mode ' ,
Trigger ,
self . position_mode_service_callback )
self . switch_to_position_mode_service = self . create_service ( Trigger ,
' /switch_to_position_mode ' ,
self . position_mode_service_callback )
self . stop_the_robot_service = rospy . Service ( ' /stop_the_robot ' ,
Trigger ,
self . stop_the_robot_callback )
self . stop_the_robot_service = self . create_service ( Trigger ,
' /stop_the_robot ' ,
self . stop_the_robot_callback )
self . runstop_service = rospy . Service ( ' /runstop ' ,
SetBool ,
self . runstop_service_callback )
self . runstop_service = self . create_service ( SetBool ,
' /runstop ' ,
self . runstop_service_callback )
def parameter_callback ( self , parameters ) :
self . get_logger ( ) . warn ( ' Dynamic parameters not available yet ' )
########### MAIN ############
def main ( self ) :
self . ros_setup ( )
try :
# start loop to command the mobile base velocity, publish
# odometry, and publish joint states
while not rospy . is_shutdown ( ) :
while rclpy . ok ( ) :
self . command_mobile_base_velocity_and_publish_state ( )
command_base_velocity_and_publish_joint_state_rate . sleep ( )
except ( rospy . ROSInterruptException , ThreadServiceExit ) :
self . command_base_velocity_and_publish_joint_state_rate . sleep ( )
except ( KeyboardInterrupt , ThreadServiceExit ) :
self . robot . stop ( )
rospy . signal_shutdown ( " stretch_driver shutdown " )
def main ( ) :
rclpy . init ( )
node = StretchBodyNode ( )
node . main ( )
node . destroy_node ( )
rclpy . shutdown ( )
if __name__ == ' __main__ ' :
main ( )
main ( )