|
|
@ -383,184 +383,165 @@ class LiftCommandGroup(SimpleCommandGroup): |
|
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
class MobileBaseCommandGroup: |
|
|
|
class MobileBaseCommandGroup(SimpleCommandGroup): |
|
|
|
def __init__(self): |
|
|
|
self.mobile_base_virtual_joint_ros_range = [-0.5, 0.5] |
|
|
|
SimpleCommandGroup.__init__(self, 'joint_mobile_base_translation', (-0.5, 0.5), acceptable_joint_error=0.005) |
|
|
|
self.incrementing_joint_names = ['translate_mobile_base', 'rotate_mobile_base'] |
|
|
|
self.active_translate_mobile_base = False |
|
|
|
self.active_rotate_mobile_base = False |
|
|
|
self.acceptable_mobile_base_error_m = 0.005 |
|
|
|
self.excellent_mobile_base_error_m = 0.005 |
|
|
|
self.acceptable_mobile_base_error_rad = (np.pi/180.0) * 6.0 |
|
|
|
self.excellent_mobile_base_error_rad = (np.pi/180.0) * 0.6 |
|
|
|
self.min_m_per_s = 0.002 |
|
|
|
self.min_rad_per_s = np.radians(1.0) |
|
|
|
|
|
|
|
def update(self, joint_names, invalid_joints_callback, **kwargs): |
|
|
|
def get_num_valid_commands(self): |
|
|
|
if self.active: |
|
|
|
num_inc = self.active_translate_mobile_base + self.active_rotate_mobile_base |
|
|
|
return num_inc if num_inc > 0 else 1 |
|
|
|
|
|
|
|
return 0 |
|
|
|
|
|
|
|
def update(self, commanded_joint_names, invalid_joints_callback, **kwargs): |
|
|
|
robot_mode = kwargs['robot_mode'] |
|
|
|
self.use_mobile_base_inc_rot = ('rotate_mobile_base' in joint_names) |
|
|
|
self.use_mobile_base_inc_trans = ('translate_mobile_base' in joint_names) |
|
|
|
self.use_mobile_base_virtual_joint = ('joint_mobile_base_translation' in joint_names) |
|
|
|
|
|
|
|
if (self.use_mobile_base_inc_trans or self.use_mobile_base_inc_rot) \ |
|
|
|
and self.use_mobile_base_virtual_joint: |
|
|
|
# Commands that attempt to control the mobile base's |
|
|
|
# virtual joint together with mobile base incremental |
|
|
|
# commands are invalid. |
|
|
|
command_string = '' |
|
|
|
if self.use_mobile_base_inc_rot: |
|
|
|
command_string += ' rotate_mobile_base ' |
|
|
|
if self.use_mobile_base_inc_trans: |
|
|
|
command_string += ' translate_mobile_base ' |
|
|
|
err_str = 'Received a command for the mobile base virtual joint (joint_mobile_base_translation) \ |
|
|
|
and mobile base incremental motions ({0}). These are mutually exclusive options. \ |
|
|
|
The joint names in the received command = {1}'.format(command_string, joint_names) |
|
|
|
invalid_joints_callback(err_str) |
|
|
|
return False |
|
|
|
self.active = False |
|
|
|
self.active_translate_mobile_base = False |
|
|
|
self.active_rotate_mobile_base = False |
|
|
|
self.index = None |
|
|
|
self.index_translate_mobile_base = None |
|
|
|
self.index_rotate_mobile_base = None |
|
|
|
active_incrementing_joint_names = list(set(commanded_joint_names) & set(self.incrementing_joint_names)) |
|
|
|
|
|
|
|
if self.use_mobile_base_virtual_joint: |
|
|
|
if robot_mode != 'manipulation': |
|
|
|
if self.name in commanded_joint_names: |
|
|
|
if robot_mode == 'manipulation': |
|
|
|
if len(active_incrementing_joint_names) == 0: |
|
|
|
self.active = True |
|
|
|
self.index = commanded_joint_names.index(self.name) |
|
|
|
else: |
|
|
|
err_str = 'Received a command for the mobile base virtual joint ({0}}) and mobile base \ |
|
|
|
incremental motions ({1}). These are mutually exclusive options. \ |
|
|
|
The joint names in the received command = {2}'.format(self.name, |
|
|
|
active_incrementing_joint_names, commanded_joint_names) |
|
|
|
invalid_joints_callback(err_str) |
|
|
|
return False |
|
|
|
else: |
|
|
|
err_str = 'Must be in manipulation mode to receive a command for the \ |
|
|
|
joint_mobile_base_translation joint. Current mode = {0}.'.format(robot_mode) |
|
|
|
invalid_joints_callback(err_str) |
|
|
|
return False |
|
|
|
self.virtual_joint_mobile_base_index = joint_names.index('joint_mobile_base_translation') |
|
|
|
|
|
|
|
if self.use_mobile_base_inc_rot: |
|
|
|
if robot_mode != 'position': |
|
|
|
err_str = 'Must be in position mode to receive a rotate_mobile_base command. \ |
|
|
|
Current mode = {0}.'.format(robot_mode) |
|
|
|
{0} joint. Current mode = {1}.'.format(self.name, robot_mode) |
|
|
|
invalid_joints_callback(err_str) |
|
|
|
return False |
|
|
|
self.rotate_mobile_base_index = joint_names.index('rotate_mobile_base') |
|
|
|
|
|
|
|
if self.use_mobile_base_inc_trans: |
|
|
|
if robot_mode != 'position': |
|
|
|
err_str = 'Must be in position mode to receive a translate_mobile_base command. \ |
|
|
|
Current mode = {0}.'.format(robot_mode) |
|
|
|
elif len(active_incrementing_joint_names) != 0: |
|
|
|
if robot_mode == 'position': |
|
|
|
self.active = True |
|
|
|
if 'translate_mobile_base' in active_incrementing_joint_names: |
|
|
|
self.active_translate_mobile_base = True |
|
|
|
self.index_translate_mobile_base = commanded_joint_names.index('translate_mobile_base') |
|
|
|
if 'rotate_mobile_base' in active_incrementing_joint_names: |
|
|
|
self.active_rotate_mobile_base = True |
|
|
|
self.index_rotate_mobile_base = commanded_joint_names.index('rotate_mobile_base') |
|
|
|
else: |
|
|
|
err_str = 'Must be in position mode to receive a command for the {0} joint(s). \ |
|
|
|
Current mode = {1}.'.format(active_positioning_joint_names, robot_mode) |
|
|
|
invalid_joints_callback(err_str) |
|
|
|
return False |
|
|
|
self.translate_mobile_base_index = joint_names.index('translate_mobile_base') |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
def get_num_valid_commands(self): |
|
|
|
number_of_valid_joints = 0 |
|
|
|
if self.use_mobile_base_virtual_joint: |
|
|
|
number_of_valid_joints += 1 |
|
|
|
if self.use_mobile_base_inc_rot: |
|
|
|
number_of_valid_joints += 1 |
|
|
|
if self.use_mobile_base_inc_trans: |
|
|
|
number_of_valid_joints += 1 |
|
|
|
return number_of_valid_joints |
|
|
|
|
|
|
|
def set_goal(self, point, invalid_goal_callback, **kwargs): |
|
|
|
self.rotate_mobile_base_goal = False |
|
|
|
self.translate_mobile_base_goal = False |
|
|
|
self.virtual_joint_mobile_base_goal = False |
|
|
|
|
|
|
|
if self.use_mobile_base_virtual_joint: |
|
|
|
self.goal_mobile_base_virtual_joint_ros = point.positions[self.virtual_joint_mobile_base_index] |
|
|
|
self.goal_mobile_base_virtual_joint_mecha = self.ros_to_mechaduino(self.goal_mobile_base_virtual_joint_ros, kwargs['manipulation_origin']) |
|
|
|
self.virtual_joint_mobile_base_goal = True |
|
|
|
|
|
|
|
if self.use_mobile_base_inc_rot: |
|
|
|
self.goal_rotate_mobile_base_mecha = point.positions[self.rotate_mobile_base_index] |
|
|
|
if not np.isclose(self.goal_rotate_mobile_base_mecha, 0.0, rtol=1e-5, atol=1e-8, equal_nan=False): |
|
|
|
self.rotate_mobile_base_goal = True |
|
|
|
|
|
|
|
if self.use_mobile_base_inc_trans: |
|
|
|
self.goal_translate_mobile_base_mecha = point.positions[self.translate_mobile_base_index] |
|
|
|
if not np.isclose(self.goal_translate_mobile_base_mecha, 0.0, rtol=1e-5, atol=1e-8, equal_nan=False): |
|
|
|
self.translate_mobile_base_goal = True |
|
|
|
|
|
|
|
if self.rotate_mobile_base_goal and self.translate_mobile_base_goal: |
|
|
|
err_str = 'Received a goal point with simultaneous rotation and translation mobile base goals. \ |
|
|
|
This is not allowed. Only one is allowed to be sent for a given goal point. \ |
|
|
|
rotate_mobile_base = {0} and translate_mobile_base = \ |
|
|
|
{1}'.format(self.goal_rotate_mobile_base_mecha, self.goal_translate_mobile_base_mecha) |
|
|
|
invalid_goal_callback(err_str) |
|
|
|
return False |
|
|
|
self.goal = {"position": None} |
|
|
|
self.goal_translate_mobile_base = {"position": None} |
|
|
|
self.goal_rotate_mobile_base = {"position": None} |
|
|
|
if self.active: |
|
|
|
if self.active_translate_mobile_base or self.active_rotate_mobile_base: |
|
|
|
if self.active_translate_mobile_base and \ |
|
|
|
not np.isclose(point.positions[self.index_translate_mobile_base], 0.0, rtol=1e-5, atol=1e-8, equal_nan=False): |
|
|
|
self.goal_translate_mobile_base['position'] = point.positions[self.index_translate_mobile_base] |
|
|
|
|
|
|
|
if self.active_rotate_mobile_base and \ |
|
|
|
not np.isclose(point.positions[self.index_rotate_mobile_base], 0.0, rtol=1e-5, atol=1e-8, equal_nan=False): |
|
|
|
self.goal_rotate_mobile_base['position'] = point.positions[self.index_rotate_mobile_base] |
|
|
|
|
|
|
|
if (self.goal_translate_mobile_base['position'] is not None) and \ |
|
|
|
(self.goal_rotate_mobile_base['position'] is not None): |
|
|
|
err_str = 'Received a goal point with simultaneous translation and rotation mobile base goals. \ |
|
|
|
This is not allowed. Only one is allowed to be sent for a given goal point. \ |
|
|
|
translate_mobile_base = {0} and rotate_mobile_base = {1}'.format(self.goal_translate_mobile_base['position'], |
|
|
|
self.goal_rotate_mobile_base['position']) |
|
|
|
invalid_goal_callback(err_str) |
|
|
|
return False |
|
|
|
else: |
|
|
|
self.goal['position'] = self.ros_to_mechaduino(point.positions[self.index], kwargs['manipulation_origin']) |
|
|
|
if self.goal['position'] is None: |
|
|
|
err_str = 'Received {0} goal point that is out of bounds. \ |
|
|
|
Range = {1}, but goal point = {2}.'.format(self.name, self.range, self.goal['position']) |
|
|
|
invalid_goal_callback(err_str) |
|
|
|
return False |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
def bound_ros_command(self, bounds, ros_pos, clip_ros_tolerance=0.001): |
|
|
|
# Clip the command with clip_ros_tolerance, instead of |
|
|
|
# invalidating it, if it is close enough to the valid ranges. |
|
|
|
if ros_pos < bounds[0]: |
|
|
|
# Command is lower than the minimum value. |
|
|
|
if (bounds[0] - ros_pos) < clip_ros_tolerance: |
|
|
|
return bounds[0] |
|
|
|
else: |
|
|
|
return None |
|
|
|
if ros_pos > bounds[1]: |
|
|
|
# Command is greater than the maximum value. |
|
|
|
if (ros_pos - bounds[1]) < clip_ros_tolerance: |
|
|
|
return bounds[1] |
|
|
|
else: |
|
|
|
return None |
|
|
|
return ros_pos |
|
|
|
|
|
|
|
def ros_to_mechaduino(self, ros_ros, manipulation_origin): |
|
|
|
bounds = self.mobile_base_virtual_joint_ros_range |
|
|
|
ros_pos = self.bound_ros_command(bounds, ros_ros) |
|
|
|
if ros_pos is None: |
|
|
|
return None |
|
|
|
else: |
|
|
|
return (manipulation_origin['x'] + ros_pos) |
|
|
|
ros_pos = hm.bound_ros_command(self.range, ros_ros) |
|
|
|
return (manipulation_origin['x'] + ros_pos) if ros_pos is not None else None |
|
|
|
|
|
|
|
def init_execution(self, robot, robot_status, **kwargs): |
|
|
|
self.initial_mobile_base_translation_mecha_x = robot_status['base']['x'] |
|
|
|
self.initial_mobile_base_translation_mecha_y = robot_status['base']['y'] |
|
|
|
self.initial_mobile_base_rotation_mecha = robot_status['base']['theta'] |
|
|
|
b = robot_status['base'] |
|
|
|
self.startx = robot_status['base']['x'] |
|
|
|
self.starty = robot_status['base']['y'] |
|
|
|
self.starttheta = robot_status['base']['theta'] |
|
|
|
self.base_status = robot_status['base'] |
|
|
|
|
|
|
|
mobile_base_error_m, mobile_base_error_rad = self.update_execution(robot_status) |
|
|
|
if mobile_base_error_m is not None: |
|
|
|
robot.base.translate_by(mobile_base_error_m) |
|
|
|
if mobile_base_error_rad is not None: |
|
|
|
robot.base.rotate_by(mobile_base_error_rad) |
|
|
|
if self.active: |
|
|
|
if self.active_translate_mobile_base or self.active_rotate_mobile_base: |
|
|
|
mobile_base_error_m, mobile_base_error_rad = self.update_execution(robot_status) |
|
|
|
if mobile_base_error_m is not None: |
|
|
|
robot.base.translate_by(mobile_base_error_m) |
|
|
|
elif mobile_base_error_rad is not None: |
|
|
|
robot.base.rotate_by(mobile_base_error_rad) |
|
|
|
else: |
|
|
|
robot.base.translate_by(self.update_execution(robot_status)) |
|
|
|
|
|
|
|
def update_execution(self, robot_status, **kwargs): |
|
|
|
current_mobile_base_translation_mecha_x = robot_status['base']['x'] |
|
|
|
current_mobile_base_translation_mecha_y = robot_status['base']['y'] |
|
|
|
current_mobile_base_rotation_mecha = robot_status['base']['theta'] |
|
|
|
b = robot_status['base'] |
|
|
|
|
|
|
|
self.mobile_base_error_m = None |
|
|
|
self.mobile_base_error_rad = None |
|
|
|
self.mobile_base_goal_reached = True |
|
|
|
if self.virtual_joint_mobile_base_goal: |
|
|
|
self.mobile_base_error_m = self.goal_mobile_base_virtual_joint_mecha - current_mobile_base_translation_mecha_x |
|
|
|
self.mobile_base_goal_reached = (abs(self.mobile_base_error_m) < self.acceptable_mobile_base_error_m) |
|
|
|
elif self.translate_mobile_base_goal: |
|
|
|
# incremental motion relative to the initial position |
|
|
|
x0 = self.initial_mobile_base_translation_mecha_x |
|
|
|
y0 = self.initial_mobile_base_translation_mecha_y |
|
|
|
x1 = current_mobile_base_translation_mecha_x |
|
|
|
y1 = current_mobile_base_translation_mecha_y |
|
|
|
distance_traveled = np.sqrt(np.square(x1 - x0) + np.square(y1 - y0)) |
|
|
|
self.mobile_base_error_m = abs(self.goal_translate_mobile_base_mecha) - distance_traveled |
|
|
|
self.mobile_base_error_m = np.sign(self.goal_translate_mobile_base_mecha) * self.mobile_base_error_m |
|
|
|
self.mobile_base_goal_reached = (abs(self.mobile_base_error_m) < self.acceptable_mobile_base_error_m) |
|
|
|
if (abs(self.mobile_base_error_m) < self.excellent_mobile_base_error_m): |
|
|
|
self.mobile_base_goal_reached = True |
|
|
|
else: |
|
|
|
# Use velocity to help decide when the low-level command |
|
|
|
# has been finished. |
|
|
|
min_m_per_s = 0.002 |
|
|
|
b = robot_status['base'] |
|
|
|
speed = np.sqrt(np.square(b['x_vel']) + np.square(b['y_vel'])) |
|
|
|
self.mobile_base_goal_reached = self.mobile_base_goal_reached and (speed < min_m_per_s) |
|
|
|
|
|
|
|
elif self.rotate_mobile_base_goal: |
|
|
|
incremental_rad = hm.angle_diff_rad(current_mobile_base_rotation_mecha, self.initial_mobile_base_rotation_mecha) |
|
|
|
self.mobile_base_error_rad = hm.angle_diff_rad(self.goal_rotate_mobile_base_mecha, incremental_rad) |
|
|
|
self.mobile_base_goal_reached = (abs(self.mobile_base_error_rad) < self.acceptable_mobile_base_error_rad) |
|
|
|
if (abs(self.mobile_base_error_rad) < self.excellent_mobile_base_error_rad): |
|
|
|
self.mobile_base_goal_reached = True |
|
|
|
currx = robot_status['base']['x'] |
|
|
|
curry = robot_status['base']['y'] |
|
|
|
currtheta = robot_status['base']['theta'] |
|
|
|
self.base_status = robot_status['base'] |
|
|
|
|
|
|
|
self.error = None |
|
|
|
self.error_translate_mobile_base_m = None |
|
|
|
self.error_rotate_mobile_base_rad = None |
|
|
|
if self.active: |
|
|
|
if self.active_translate_mobile_base or self.active_rotate_mobile_base: |
|
|
|
if self.goal_translate_mobile_base['position'] is not None: |
|
|
|
dist = np.sqrt(np.square(currx - self.startx) + np.square(curry - self.starty)) |
|
|
|
self.error_translate_mobile_base_m = self.goal_translate_mobile_base['position'] - (dist * np.sign(self.goal_translate_mobile_base['position'])) |
|
|
|
return (self.error_translate_mobile_base_m, self.error_rotate_mobile_base_rad) |
|
|
|
elif self.goal_rotate_mobile_base['position'] is not None: |
|
|
|
rot = hm.angle_diff_rad(currtheta, self.starttheta) |
|
|
|
self.error_rotate_mobile_base_rad = hm.angle_diff_rad(self.goal_rotate_mobile_base['position'], rot) |
|
|
|
return (self.error_translate_mobile_base_m, self.error_rotate_mobile_base_rad) |
|
|
|
else: |
|
|
|
# Use velocity to help decide when the low-level command |
|
|
|
# has been finished. |
|
|
|
min_deg_per_s = 1.0 |
|
|
|
min_rad_per_s = min_deg_per_s * (np.pi/180.0) |
|
|
|
self.mobile_base_goal_reached = self.mobile_base_goal_reached and (abs(robot_status['base']['theta_vel']) < min_rad_per_s) |
|
|
|
return self.mobile_base_error_m, self.mobile_base_error_rad |
|
|
|
self.error = self.goal['position'] - currx |
|
|
|
return self.error |
|
|
|
|
|
|
|
return None |
|
|
|
|
|
|
|
def goal_reached(self): |
|
|
|
return self.mobile_base_goal_reached |
|
|
|
if self.active: |
|
|
|
if self.active_translate_mobile_base or self.active_rotate_mobile_base: |
|
|
|
if self.active_translate_mobile_base: |
|
|
|
reached = (abs(self.error_translate_mobile_base_m) < self.acceptable_mobile_base_error_m) |
|
|
|
if not (abs(self.error_translate_mobile_base_m) < self.excellent_mobile_base_error_m): |
|
|
|
# Use velocity to help decide when the low-level command has been finished |
|
|
|
speed = np.sqrt(np.square(self.base_status['x_vel']) + np.square(self.base_status['y_vel'])) |
|
|
|
reached = reached and (speed < self.min_m_per_s) |
|
|
|
return reached |
|
|
|
elif self.active_rotate_mobile_base: |
|
|
|
reached = (abs(self.error_rotate_mobile_base_rad) < self.acceptable_mobile_base_error_rad) |
|
|
|
if not (abs(self.error_rotate_mobile_base_rad) < self.excellent_mobile_base_error_rad): |
|
|
|
# Use velocity to help decide when the low-level command has been finished |
|
|
|
speed = self.base_status['theta_vel'] |
|
|
|
reached = reached and (abs(speed) < self.min_rad_per_s) |
|
|
|
return reached |
|
|
|
else: |
|
|
|
return (abs(self.error) < self.acceptable_joint_error) |
|
|
|
|
|
|
|
return True |