diff --git a/stretch_core/nodes/command_groups.py b/stretch_core/nodes/command_groups.py index 202d581..8d44642 100644 --- a/stretch_core/nodes/command_groups.py +++ b/stretch_core/nodes/command_groups.py @@ -7,62 +7,162 @@ from hello_helpers.gripper_conversion import GripperConversion class SimpleCommandGroup: - def __init__(self, joint_name, acceptable_joint_error=0.015, clip_ros_tolerance=0.001): + def __init__(self, joint_name, joint_range, acceptable_joint_error=0.015, clip_ros_tolerance=0.001): + """Simple command group to extend + + Attributes + ---------- + name: str + joint name + range: tuple(float) + acceptable joint bounds + active: bool + whether joint is active + index: int + index of joint's goal in point + goal: dict + components of the goal + error: float + the error between actual and desired + acceptable_joint_error: float + how close to zero the error must reach + clip_ros_tolerance: float + the clip ros tolerance + """ + self.name = joint_name + self.range = joint_range + self.active = False + self.index = None + self.goal = {"position": None} + self.error = None self.clip_ros_tolerance = clip_ros_tolerance self.acceptable_joint_error = acceptable_joint_error - self.joint_name = joint_name - - def update(self, joint_names, invalid_joints_error_func, **kwargs): - self.use_joint = False - if self.joint_name in joint_names: - self.joint_index = joint_names.index(self.joint_name) - self.use_joint = True - return True def get_num_valid_commands(self): - if self.use_joint: + """Returns number of active joints in the group + + Returns + ------- + int + the number of active joints within this group + """ + if self.active: return 1 + return 0 - def set_goal(self, point, invalid_goal_error_func, **kwargs): - self.joint_goal = False - self.goal_joint_ros = None - self.goal_joint_hello = None - if self.use_joint: - self.goal_joint_ros = point.positions[self.joint_index] - self.goal_joint_hello = self.ros_to_mechaduino(self.goal_joint_ros) - self.joint_goal = True - if self.joint_goal and (self.goal_joint_hello is None): - error_string = 'received goal point that is out of bounds. The first error that was caught is that the {0} goal is invalid ({1} = {2}).'.format(self.joint_name, self.joint_name, self.goal_joint_ros) - invalid_goal_error_func(error_string) - return False + def update(self, commanded_joint_names, invalid_joints_callback, **kwargs): + """Activates joints in the group + + Checks commanded joints to activate the command + group and validates joints used correctly. + + Parameters + ---------- + commanded_joints_names: list(str) + list of commanded joints in the trajectory + invalid_joints_callback: func + error callback for misuse of joints in trajectory + + Returns + ------- + bool + False if commanded joints invalid, else True + """ + self.active = False + self.index = None + if self.name in commanded_joint_names: + self.index = commanded_joint_names.index(self.name) + self.active = True + return True - def ros_to_mechaduino(self, joint_ros): - return joint_ros + def set_goal(self, point, invalid_goal_callback, **kwargs): + """Sets goal for the joint group + + Sets and validates the goal point for the joints + in this command group. + + Parameters + ---------- + point: trajectory_msgs.JointTrajectoryPoint + the target point for all joints + invalid_goal_callback: func + error callback for invalid goal + + Returns + ------- + bool + False if commanded goal invalid, else True + """ + # TODO: validate commanded_joint_names and all arrays in JointTrajectoryPoint (positions/velocities/etc) are same size + self.goal = {"position": None} + if self.active: + self.goal['position'] = point.positions[self.index] + if self.goal['position'] is None: + err_str = 'Received goal point that is out of bounds. The first error that was caught is that the {0} goal is invalid ({1} = {2}).'.format(self.name, self.name, self.goal['position']) + invalid_goal_callback(err_str) + return False + + return True def init_execution(self, robot, robot_status, **kwargs): - pass + """Starts execution of the point + + Uses Stretch's Python API to begin moving to the + target point. + + Parameters + ---------- + robot: stretch_body.robot.Robot + top-level interface to Python API + robot_status: dict + robot's current status + """ + raise NotImplementedError def update_execution(self, robot_status, **kwargs): - # This method needs to be implemented. It also needs to set self.joint_error. - return None + """Monitors progress of joint group + + Checks against robot's status to track progress + towards the target point. + + This method must set self.error. + + Parameters + ---------- + robot_status: dict + robot's current status + + Returns + ------- + float/None + error value if group active, else None + """ + raise NotImplementedError def goal_reached(self): - if self.joint_goal: - return (abs(self.joint_error) < self.acceptable_joint_error) - else: - return True + """Returns whether reached target point + + Returns + ------- + bool + if active, whether reached target point, else True + """ + if self.active: + return (abs(self.error) < self.acceptable_joint_error) + + return True class HeadPanCommandGroup(SimpleCommandGroup): def __init__(self, head_pan_calibrated_offset, head_pan_calibrated_looked_left_offset): - SimpleCommandGroup.__init__(self, 'joint_head_pan', acceptable_joint_error=0.15, clip_ros_tolerance=0.001) + SimpleCommandGroup.__init__(self, 'joint_head_pan', (0, 0), acceptable_joint_error=0.15) self.head_pan_calibrated_offset = head_pan_calibrated_offset self.head_pan_calibrated_looked_left_offset = head_pan_calibrated_looked_left_offset def init_execution(self, robot, robot_status, **kwargs): - if self.joint_goal: + if self.active: pan_error = self.update_execution(robot_status, backlash_state=kwargs['backlash_state']) robot.head.move_by('head_pan', pan_error) if pan_error > 0.0: @@ -71,29 +171,32 @@ class HeadPanCommandGroup(SimpleCommandGroup): kwargs['backlash_state']['head_pan_looked_left'] = False def update_execution(self, robot_status, **kwargs): + self.error = None backlash_state = kwargs['backlash_state'] - if self.joint_goal: + if self.active: if backlash_state['head_pan_looked_left']: pan_backlash_correction = self.head_pan_calibrated_looked_left_offset else: pan_backlash_correction = 0.0 - self.current_joint_hello = robot_status['head']['head_pan']['pos'] + self.head_pan_calibrated_offset + pan_backlash_correction - self.joint_error = self.goal_joint_hello - self.current_joint_hello - self.joint_target = self.goal_joint_hello - return self.joint_error - else: - return None + pan_current = robot_status['head']['head_pan']['pos'] + \ + self.head_pan_calibrated_offset + pan_backlash_correction + self.error = self.goal['position'] - pan_current + return self.error + + return None class HeadTiltCommandGroup(SimpleCommandGroup): - def __init__(self, head_tilt_calibrated_offset, head_tilt_calibrated_looking_up_offset, head_tilt_backlash_transition_angle): - SimpleCommandGroup.__init__(self, 'joint_head_tilt', acceptable_joint_error=0.52, clip_ros_tolerance=0.001) + def __init__(self, head_tilt_calibrated_offset, + head_tilt_calibrated_looking_up_offset, + head_tilt_backlash_transition_angle): + SimpleCommandGroup.__init__(self, 'joint_head_tilt', (0, 0), acceptable_joint_error=0.52) self.head_tilt_calibrated_offset = head_tilt_calibrated_offset self.head_tilt_calibrated_looking_up_offset = head_tilt_calibrated_looking_up_offset self.head_tilt_backlash_transition_angle = head_tilt_backlash_transition_angle def init_execution(self, robot, robot_status, **kwargs): - if self.joint_goal: + if self.active: tilt_error = self.update_execution(robot_status, backlash_state=kwargs['backlash_state']) robot.head.move_by('head_tilt', tilt_error) if tilt_error > (self.head_tilt_backlash_transition_angle + self.head_tilt_calibrated_offset): @@ -102,36 +205,36 @@ class HeadTiltCommandGroup(SimpleCommandGroup): kwargs['backlash_state']['head_tilt_looking_up'] = False def update_execution(self, robot_status, **kwargs): + self.error = None backlash_state = kwargs['backlash_state'] - if self.joint_goal: + if self.active: if backlash_state['head_tilt_looking_up']: tilt_backlash_correction = self.head_tilt_calibrated_looking_up_offset else: tilt_backlash_correction = 0.0 - self.current_joint_hello = robot_status['head']['head_tilt']['pos'] + self.head_tilt_calibrated_offset + tilt_backlash_correction - self.joint_error = self.goal_joint_hello - self.current_joint_hello - self.joint_target = self.goal_joint_hello - return self.joint_error - else: - return None + tilt_current = robot_status['head']['head_tilt']['pos'] + \ + self.head_tilt_calibrated_offset + tilt_backlash_correction + self.error = self.goal['position'] - tilt_current + return self.error + + return None class WristYawCommandGroup(SimpleCommandGroup): def __init__(self): - SimpleCommandGroup.__init__(self, 'joint_wrist_yaw', acceptable_joint_error=0.015, clip_ros_tolerance=0.001) + SimpleCommandGroup.__init__(self, 'joint_wrist_yaw', (0, 0)) def init_execution(self, robot, robot_status, **kwargs): - if self.joint_goal: + if self.active: robot.end_of_arm.move_by('wrist_yaw', self.update_execution(robot_status)) def update_execution(self, robot_status, **kwargs): - if self.joint_goal: - self.current_joint_hello = robot_status['end_of_arm']['wrist_yaw']['pos'] - self.joint_error = self.goal_joint_hello - self.current_joint_hello - self.joint_target = self.goal_joint_hello - return self.joint_error - else: - return None + self.error = None + if self.active: + self.error = self.goal['position'] - robot_status['end_of_arm']['wrist_yaw']['pos'] + return self.error + + return None class GripperCommandGroup: