|
|
@ -2,168 +2,10 @@ |
|
|
|
|
|
|
|
import numpy as np |
|
|
|
import hello_helpers.hello_misc as hm |
|
|
|
from hello_helpers.simple_command_group import SimpleCommandGroup |
|
|
|
from hello_helpers.gripper_conversion import GripperConversion |
|
|
|
|
|
|
|
|
|
|
|
class SimpleCommandGroup: |
|
|
|
def __init__(self, joint_name, joint_range, acceptable_joint_error=0.015): |
|
|
|
"""Simple command group to extend |
|
|
|
|
|
|
|
Attributes |
|
|
|
---------- |
|
|
|
name: str |
|
|
|
joint name |
|
|
|
range: tuple(float) |
|
|
|
acceptable joint bounds |
|
|
|
active: bool |
|
|
|
whether joint is active |
|
|
|
index: int |
|
|
|
index of joint's goal in point |
|
|
|
goal: dict |
|
|
|
components of the goal |
|
|
|
error: float |
|
|
|
the error between actual and desired |
|
|
|
acceptable_joint_error: float |
|
|
|
how close to zero the error must reach |
|
|
|
""" |
|
|
|
self.name = joint_name |
|
|
|
self.range = joint_range |
|
|
|
self.active = False |
|
|
|
self.index = None |
|
|
|
self.goal = {"position": None} |
|
|
|
self.error = None |
|
|
|
self.acceptable_joint_error = acceptable_joint_error |
|
|
|
|
|
|
|
def get_num_valid_commands(self): |
|
|
|
"""Returns number of active joints in the group |
|
|
|
|
|
|
|
Returns |
|
|
|
------- |
|
|
|
int |
|
|
|
the number of active joints within this group |
|
|
|
""" |
|
|
|
if self.active: |
|
|
|
return 1 |
|
|
|
|
|
|
|
return 0 |
|
|
|
|
|
|
|
def update(self, commanded_joint_names, invalid_joints_callback, **kwargs): |
|
|
|
"""Activates joints in the group |
|
|
|
|
|
|
|
Checks commanded joints to activate the command |
|
|
|
group and validates joints used correctly. |
|
|
|
|
|
|
|
Parameters |
|
|
|
---------- |
|
|
|
commanded_joint_names: list(str) |
|
|
|
list of commanded joints in the trajectory |
|
|
|
invalid_joints_callback: func |
|
|
|
error callback for misuse of joints in trajectory |
|
|
|
|
|
|
|
Returns |
|
|
|
------- |
|
|
|
bool |
|
|
|
False if commanded joints invalid, else True |
|
|
|
""" |
|
|
|
self.active = False |
|
|
|
self.index = None |
|
|
|
if self.name in commanded_joint_names: |
|
|
|
self.index = commanded_joint_names.index(self.name) |
|
|
|
self.active = True |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
def set_goal(self, point, invalid_goal_callback, fail_out_of_range_goal, **kwargs): |
|
|
|
"""Sets goal for the joint group |
|
|
|
|
|
|
|
Sets and validates the goal point for the joints |
|
|
|
in this command group. |
|
|
|
|
|
|
|
Parameters |
|
|
|
---------- |
|
|
|
point: trajectory_msgs.JointTrajectoryPoint |
|
|
|
the target point for all joints |
|
|
|
invalid_goal_callback: func |
|
|
|
error callback for invalid goal |
|
|
|
fail_out_of_range_goal: bool |
|
|
|
whether to bound out-of-range goals to range or fail |
|
|
|
|
|
|
|
Returns |
|
|
|
------- |
|
|
|
bool |
|
|
|
False if commanded goal invalid, else True |
|
|
|
""" |
|
|
|
self.goal = {"position": None, "velocity": None, "acceleration": None, "contact_threshold": None} |
|
|
|
if self.active: |
|
|
|
goal_pos = point.positions[self.index] if len(point.positions) > self.index else None |
|
|
|
if goal_pos is None: |
|
|
|
err_str = ("Received goal point with positions array length={0}. " |
|
|
|
"This joint ({1})'s index is {2}. Length of array must cover all joints listed " |
|
|
|
"in commanded_joint_names.").format(len(point.positions), self.name, self.index) |
|
|
|
invalid_goal_callback(err_str) |
|
|
|
return False |
|
|
|
|
|
|
|
self.goal['position'] = hm.bound_ros_command(self.range, goal_pos, fail_out_of_range_goal) |
|
|
|
self.goal['velocity'] = point.velocities[self.index] if len(point.velocities) > self.index else None |
|
|
|
self.goal['acceleration'] = point.accelerations[self.index] if len(point.accelerations) > self.index else None |
|
|
|
self.goal['contact_threshold'] = point.effort[self.index] if len(point.effort) > self.index else None |
|
|
|
if self.goal['position'] is None: |
|
|
|
err_str = ("Received {0} goal point that is out of bounds. " |
|
|
|
"Range = {1}, but goal point = {2}.").format(self.name, self.range, goal_pos) |
|
|
|
invalid_goal_callback(err_str) |
|
|
|
return False |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
def init_execution(self, robot, robot_status, **kwargs): |
|
|
|
"""Starts execution of the point |
|
|
|
|
|
|
|
Uses Stretch's Python API to begin moving to the |
|
|
|
target point. |
|
|
|
|
|
|
|
Parameters |
|
|
|
---------- |
|
|
|
robot: stretch_body.robot.Robot |
|
|
|
top-level interface to Python API |
|
|
|
robot_status: dict |
|
|
|
robot's current status |
|
|
|
""" |
|
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
def update_execution(self, robot_status, **kwargs): |
|
|
|
"""Monitors progress of joint group |
|
|
|
|
|
|
|
Checks against robot's status to track progress |
|
|
|
towards the target point. |
|
|
|
|
|
|
|
This method must set self.error. |
|
|
|
|
|
|
|
Parameters |
|
|
|
---------- |
|
|
|
robot_status: dict |
|
|
|
robot's current status |
|
|
|
|
|
|
|
Returns |
|
|
|
------- |
|
|
|
float/None |
|
|
|
error value if group active, else None |
|
|
|
""" |
|
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
def goal_reached(self): |
|
|
|
"""Returns whether reached target point |
|
|
|
|
|
|
|
Returns |
|
|
|
------- |
|
|
|
bool |
|
|
|
if active, whether reached target point, else True |
|
|
|
""" |
|
|
|
if self.active: |
|
|
|
return (abs(self.error) < self.acceptable_joint_error) |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
class HeadPanCommandGroup(SimpleCommandGroup): |
|
|
|
def __init__(self, range_rad, head_pan_calibrated_offset, head_pan_calibrated_looked_left_offset): |
|
|
|
SimpleCommandGroup.__init__(self, 'joint_head_pan', range_rad, acceptable_joint_error=0.15) |
|
|
|