Browse Source

Move simple_command_group to hello_helpers package

feature/pluggable_end_effector
hello-binit 3 years ago
parent
commit
16c2fc8950
3 changed files with 176 additions and 175 deletions
  1. +175
    -0
      hello_helpers/src/hello_helpers/simple_command_group.py
  2. +1
    -174
      stretch_core/nodes/command_groups.py
  3. +0
    -1
      stretch_core/nodes/joint_trajectory_server.py

+ 175
- 0
hello_helpers/src/hello_helpers/simple_command_group.py View File

@ -0,0 +1,175 @@
import hello_helpers.hello_misc as hm
class SimpleCommandGroup:
def __init__(self, joint_name, joint_range, acceptable_joint_error=0.015):
"""Simple command group to extend
Attributes
----------
name: str
joint name
range: tuple(float)
acceptable joint bounds
active: bool
whether joint is active
index: int
index of joint's goal in point
goal: dict
components of the goal
error: float
the error between actual and desired
acceptable_joint_error: float
how close to zero the error must reach
"""
self.name = joint_name
self.range = joint_range
self.active = False
self.index = None
self.goal = {"position": None}
self.error = None
self.acceptable_joint_error = acceptable_joint_error
def get_num_valid_commands(self):
"""Returns number of active joints in the group
Returns
-------
int
the number of active joints within this group
"""
if self.active:
return 1
return 0
def update(self, commanded_joint_names, invalid_joints_callback, **kwargs):
"""Activates joints in the group
Checks commanded joints to activate the command
group and validates joints used correctly.
Parameters
----------
commanded_joint_names: list(str)
list of commanded joints in the trajectory
invalid_joints_callback: func
error callback for misuse of joints in trajectory
Returns
-------
bool
False if commanded joints invalid, else True
"""
self.active = False
self.index = None
if self.name in commanded_joint_names:
self.index = commanded_joint_names.index(self.name)
self.active = True
return True
def set_goal(self, point, invalid_goal_callback, fail_out_of_range_goal, **kwargs):
"""Sets goal for the joint group
Sets and validates the goal point for the joints
in this command group.
Parameters
----------
point: trajectory_msgs.JointTrajectoryPoint
the target point for all joints
invalid_goal_callback: func
error callback for invalid goal
fail_out_of_range_goal: bool
whether to bound out-of-range goals to range or fail
Returns
-------
bool
False if commanded goal invalid, else True
"""
self.goal = {"position": None, "velocity": None, "acceleration": None, "contact_threshold": None}
if self.active:
goal_pos = point.positions[self.index] if len(point.positions) > self.index else None
if goal_pos is None:
err_str = ("Received goal point with positions array length={0}. "
"This joint ({1})'s index is {2}. Length of array must cover all joints listed "
"in commanded_joint_names.").format(len(point.positions), self.name, self.index)
invalid_goal_callback(err_str)
return False
self.goal['position'] = hm.bound_ros_command(self.range, goal_pos, fail_out_of_range_goal)
self.goal['velocity'] = point.velocities[self.index] if len(point.velocities) > self.index else None
self.goal['acceleration'] = point.accelerations[self.index] if len(point.accelerations) > self.index else None
self.goal['contact_threshold'] = point.effort[self.index] if len(point.effort) > self.index else None
if self.goal['position'] is None:
err_str = ("Received {0} goal point that is out of bounds. "
"Range = {1}, but goal point = {2}.").format(self.name, self.range, goal_pos)
invalid_goal_callback(err_str)
return False
return True
def init_execution(self, robot, robot_status, **kwargs):
"""Starts execution of the point
Uses Stretch's Python API to begin moving to the
target point.
Parameters
----------
robot: stretch_body.robot.Robot
top-level interface to Python API
robot_status: dict
robot's current status
"""
raise NotImplementedError
def update_execution(self, robot_status, **kwargs):
"""Monitors progress of joint group
Checks against robot's status to track progress
towards the target point.
This method must set self.error.
Parameters
----------
robot_status: dict
robot's current status
Returns
-------
float/None
error value if group active, else None
"""
raise NotImplementedError
def joint_state(self, robot_status, **kwargs):
"""Returns joint state of the robot
Parameters
----------
robot_status: dict
robot's current status
Returns
-------
(float, float, float)
Current position, velocity, and effort
"""
raise NotImplementedError
def goal_reached(self):
"""Returns whether reached target point
Returns
-------
bool
if active, whether reached target point, else True
"""
if self.active:
return (abs(self.error) < self.acceptable_joint_error)
return True

+ 1
- 174
stretch_core/nodes/command_groups.py View File

@ -3,183 +3,10 @@ from __future__ import print_function
import numpy as np
import hello_helpers.hello_misc as hm
from hello_helpers.simple_command_group import SimpleCommandGroup
from hello_helpers.gripper_conversion import GripperConversion
class SimpleCommandGroup:
def __init__(self, joint_name, joint_range, acceptable_joint_error=0.015):
"""Simple command group to extend
Attributes
----------
name: str
joint name
range: tuple(float)
acceptable joint bounds
active: bool
whether joint is active
index: int
index of joint's goal in point
goal: dict
components of the goal
error: float
the error between actual and desired
acceptable_joint_error: float
how close to zero the error must reach
"""
self.name = joint_name
self.range = joint_range
self.active = False
self.index = None
self.goal = {"position": None}
self.error = None
self.acceptable_joint_error = acceptable_joint_error
def get_num_valid_commands(self):
"""Returns number of active joints in the group
Returns
-------
int
the number of active joints within this group
"""
if self.active:
return 1
return 0
def update(self, commanded_joint_names, invalid_joints_callback, **kwargs):
"""Activates joints in the group
Checks commanded joints to activate the command
group and validates joints used correctly.
Parameters
----------
commanded_joint_names: list(str)
list of commanded joints in the trajectory
invalid_joints_callback: func
error callback for misuse of joints in trajectory
Returns
-------
bool
False if commanded joints invalid, else True
"""
self.active = False
self.index = None
if self.name in commanded_joint_names:
self.index = commanded_joint_names.index(self.name)
self.active = True
return True
def set_goal(self, point, invalid_goal_callback, fail_out_of_range_goal, **kwargs):
"""Sets goal for the joint group
Sets and validates the goal point for the joints
in this command group.
Parameters
----------
point: trajectory_msgs.JointTrajectoryPoint
the target point for all joints
invalid_goal_callback: func
error callback for invalid goal
fail_out_of_range_goal: bool
whether to bound out-of-range goals to range or fail
Returns
-------
bool
False if commanded goal invalid, else True
"""
self.goal = {"position": None, "velocity": None, "acceleration": None, "contact_threshold": None}
if self.active:
goal_pos = point.positions[self.index] if len(point.positions) > self.index else None
if goal_pos is None:
err_str = ("Received goal point with positions array length={0}. "
"This joint ({1})'s index is {2}. Length of array must cover all joints listed "
"in commanded_joint_names.").format(len(point.positions), self.name, self.index)
invalid_goal_callback(err_str)
return False
self.goal['position'] = hm.bound_ros_command(self.range, goal_pos, fail_out_of_range_goal)
self.goal['velocity'] = point.velocities[self.index] if len(point.velocities) > self.index else None
self.goal['acceleration'] = point.accelerations[self.index] if len(point.accelerations) > self.index else None
self.goal['contact_threshold'] = point.effort[self.index] if len(point.effort) > self.index else None
if self.goal['position'] is None:
err_str = ("Received {0} goal point that is out of bounds. "
"Range = {1}, but goal point = {2}.").format(self.name, self.range, goal_pos)
invalid_goal_callback(err_str)
return False
return True
def init_execution(self, robot, robot_status, **kwargs):
"""Starts execution of the point
Uses Stretch's Python API to begin moving to the
target point.
Parameters
----------
robot: stretch_body.robot.Robot
top-level interface to Python API
robot_status: dict
robot's current status
"""
raise NotImplementedError
def update_execution(self, robot_status, **kwargs):
"""Monitors progress of joint group
Checks against robot's status to track progress
towards the target point.
This method must set self.error.
Parameters
----------
robot_status: dict
robot's current status
Returns
-------
float/None
error value if group active, else None
"""
raise NotImplementedError
def joint_state(self, robot_status, **kwargs):
"""Returns joint state of the robot
Parameters
----------
robot_status: dict
robot's current status
Returns
-------
(float, float, float)
Current position, velocity, and effort
"""
raise NotImplementedError
def goal_reached(self):
"""Returns whether reached target point
Returns
-------
bool
if active, whether reached target point, else True
"""
if self.active:
return (abs(self.error) < self.acceptable_joint_error)
return True
class HeadPanCommandGroup(SimpleCommandGroup):
def __init__(self, range_rad, head_pan_calibrated_offset_rad, head_pan_calibrated_looked_left_offset_rad, robot=None):
if range_rad is None and robot is not None and robot.head is not None:

+ 0
- 1
stretch_core/nodes/joint_trajectory_server.py View File

@ -66,7 +66,6 @@ class JointTrajectoryAction:
else:
module_name = self.node.robot.end_of_arm.params['devices'][joint].get('ros_py_module_name')
class_name = self.node.robot.end_of_arm.params['devices'][joint].get('ros_py_class_name')
print(class_name)
if module_name and class_name:
endofarm_device = getattr(importlib.import_module(module_name), class_name)(None, self.node.robot)
self.command_groups.append(endofarm_device)

Loading…
Cancel
Save