[noetic] ROS Diagnostics and RQT Dashboardfeature/testing_updates
@ -0,0 +1,35 @@ | |||
type: diagnostic_aggregator/AnalyzerGroup | |||
path: Stretch | |||
analyzers: | |||
hardware: | |||
path: Hardware | |||
type: diagnostic_aggregator/AnalyzerGroup | |||
analyzers: | |||
Lift: | |||
type: diagnostic_aggregator/GenericAnalyzer | |||
find_and_remove_prefix: ['stretch_driver: Lift'] | |||
path: Lift | |||
Arm: | |||
type: diagnostic_aggregator/GenericAnalyzer | |||
find_and_remove_prefix: ['stretch_driver: Arm'] | |||
path: Arm | |||
Base: | |||
type: diagnostic_aggregator/GenericAnalyzer | |||
find_and_remove_prefix: ['stretch_driver: Base'] | |||
path: Base | |||
EndOfArm: | |||
type: diagnostic_aggregator/GenericAnalyzer | |||
find_and_remove_prefix: ['stretch_driver: EndOfArm'] | |||
path: EndOfArm | |||
Head: | |||
type: diagnostic_aggregator/GenericAnalyzer | |||
find_and_remove_prefix: ['stretch_driver: Head'] | |||
path: Head | |||
Pimu: | |||
type: diagnostic_aggregator/GenericAnalyzer | |||
find_and_remove_prefix: ['stretch_driver: Pimu'] | |||
path: Pimu | |||
Wacc: | |||
type: diagnostic_aggregator/GenericAnalyzer | |||
find_and_remove_prefix: ['stretch_driver: Wacc'] | |||
path: Wacc |
@ -0,0 +1,127 @@ | |||
import diagnostic_msgs | |||
import diagnostic_updater | |||
import stretch_body.hello_utils as hello_utils # For get_fleet_id | |||
# Shortcut constants | |||
ERROR = diagnostic_msgs.msg.DiagnosticStatus.ERROR | |||
WARN = diagnostic_msgs.msg.DiagnosticStatus.WARN | |||
OK = diagnostic_msgs.msg.DiagnosticStatus.OK | |||
def add_keyvalues_to_status(status, d, skip_keys=None): | |||
"""Add all the key value pairs in dictionary d (except those keys in skip_keys) to the diagnostic status.""" | |||
for k, v in d.items(): | |||
if skip_keys and k in skip_keys: | |||
continue | |||
status.add(k, str(v)) | |||
def report_stats(status, d, skip_keys=None): | |||
"""Diagnostic update function that marks the summary as OK and just reports the stats from d.""" | |||
status.summary(OK, 'OK') | |||
add_keyvalues_to_status(status, d, skip_keys) | |||
return status | |||
def analyze_stepper(status, device): | |||
"""Diagnostic update function for device with stepper motor.""" | |||
if not device.motor.hw_valid: | |||
status.summary(ERROR, 'Hardware not present') | |||
elif not device.motor.status['pos_calibrated']: | |||
status.summary(WARN, 'Hardware not calibrated') | |||
elif device.motor.status['runstop_on']: | |||
status.summary(WARN, 'Runstop on') | |||
else: | |||
status.summary(OK, 'Hardware present and calibrated') | |||
add_keyvalues_to_status(status, device.status, ['motor']) | |||
return status | |||
def analyze_dynamixel(status, device, joint_name): | |||
"""Diagnostic update function for device with dynamixel motor.""" | |||
motor = device.motors[joint_name] | |||
if not motor.hw_valid: | |||
status.summary(ERROR, 'Hardware not present') | |||
elif not motor.is_calibrated: | |||
status.summary(WARN, 'Hardware not calibrated') | |||
else: | |||
status.summary(OK, 'Hardware present and calibrated') | |||
add_keyvalues_to_status(status, motor.status) | |||
return status | |||
def analyze_base(status, base): | |||
"""Diagnostic update function for the base.""" | |||
for wheel in [base.left_wheel, base.right_wheel]: | |||
if not wheel.hw_valid: | |||
status.summary(ERROR, 'Hardware not present') | |||
break | |||
else: | |||
status.summary(OK, 'Hardware present') | |||
add_keyvalues_to_status(status, base.status, ['left_wheel', 'right_wheel']) | |||
return status | |||
def check_range(status, device, key, lower_bound, upper_bound, out_of_bounds_status=ERROR): | |||
"""Diagnostic update function that checks if value(s) exceeds the specified bounds.""" | |||
value = device.status[key] | |||
if isinstance(value, list): | |||
values = value | |||
for i, value in enumerate(values): | |||
status.add(f'{key}[{i}]', str(value)) | |||
else: | |||
status.add(key, str(value)) | |||
values = [value] | |||
for value in values: | |||
if value < lower_bound or value > upper_bound: | |||
status.summary(out_of_bounds_status, f'{key}={value} | out of range [{lower_bound},{upper_bound}]') | |||
return status | |||
status.summary(OK, 'OK') | |||
return status | |||
class StretchDiagnostics: | |||
def __init__(self, node, robot): | |||
self.node = node | |||
self.robot = robot | |||
self.updater = diagnostic_updater.Updater(node) | |||
self.updater.setHardwareID(hello_utils.get_fleet_id()) | |||
# Configure the different update functions | |||
self.updater.add('Pimu/Device', | |||
lambda status: report_stats(status, self.robot.pimu.status, ['imu', 'transport'])) | |||
# TODO: Add IMU | |||
self.updater.add('Pimu/Voltage', | |||
lambda status: check_range(status, self.robot.pimu, 'voltage', | |||
self.robot.pimu.config['low_voltage_alert'], 14.5)) | |||
self.updater.add('Pimu/Current', | |||
lambda status: check_range(status, self.robot.pimu, 'current', | |||
0.5, self.robot.pimu.config['high_current_alert'])) | |||
self.updater.add('Pimu/Temperature', lambda status: check_range(status, self.robot.pimu, 'temp', 10, 40)) | |||
self.updater.add('Pimu/Cliff', | |||
lambda status: check_range(status, self.robot.pimu, 'cliff_range', | |||
self.robot.pimu.config['cliff_thresh'], 20)) | |||
self.updater.add('Lift/Device', lambda status: analyze_stepper(status, self.robot.lift)) | |||
self.updater.add('Lift/Motor', lambda status: report_stats(status, self.robot.lift.motor.status, ['transport'])) | |||
self.updater.add('Arm/Device', lambda status: analyze_stepper(status, self.robot.arm)) | |||
self.updater.add('Arm/Motor', lambda status: report_stats(status, self.robot.arm.motor.status, ['transport'])) | |||
self.updater.add('Base/Device', lambda status: analyze_base(status, self.robot.base)) | |||
self.updater.add('Base/LeftWheel', | |||
lambda status: report_stats(status, self.robot.base.left_wheel.status, ['transport'])) | |||
self.updater.add('Base/RightWheel', | |||
lambda status: report_stats(status, self.robot.base.right_wheel.status, ['transport'])) | |||
self.updater.add('Wacc/Device', lambda status: check_range(status, self.robot.wacc, 'ax', 8.0, 11.0)) | |||
for name, device in [('Head', self.robot.head), ('EndOfArm', self.robot.end_of_arm)]: | |||
for jname in device.joints: | |||
self.updater.add(f'{name}/{jname}', lambda status: analyze_dynamixel(status, device, jname)) |
@ -0,0 +1,14 @@ | |||
cmake_minimum_required(VERSION 3.0.2) | |||
project(stretch_dashboard) | |||
find_package(catkin REQUIRED) | |||
catkin_python_setup() | |||
catkin_package() | |||
install(FILES plugin.xml | |||
DESTINATION ${CATKIN_PACKAGE_SHARE_DESTINATION} | |||
) | |||
catkin_install_python(PROGRAMS scripts/dashboard | |||
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} | |||
) |
@ -0,0 +1,13 @@ | |||
![Stretch ROS banner](../images/banner.png) | |||
# Stretch Dashboard | |||
![screenshot](Screenshot.png) | |||
* Gear icon used for calibration | |||
* Next button used for changing modes | |||
* Battery shows voltage/current as a tooltip | |||
## To Run | |||
rosrun stretch_dashboard dashboard | |||
*Note*: The first time you run it you may need to add the `--force-discover` arg to the end of the above command. |
@ -0,0 +1,23 @@ | |||
<?xml version="1.0"?> | |||
<package format="2"> | |||
<name>stretch_dashboard</name> | |||
<version>0.2.0</version> | |||
<description> | |||
A Python GUI plugin for displaying a dashboard that displays and interacts with the Stretch robot. | |||
</description> | |||
<maintainer email="davidvlu@gmail.com">David V. Lu!!</maintainer> | |||
<license>Apache License 2.0</license> | |||
<buildtool_depend>catkin</buildtool_depend> | |||
<exec_depend>rospy</exec_depend> | |||
<exec_depend>rqt_gui</exec_depend> | |||
<exec_depend>rqt_robot_dashboard</exec_depend> | |||
<exec_depend>sensor_msgs</exec_depend> | |||
<exec_depend>std_msgs</exec_depend> | |||
<exec_depend>std_srvs</exec_depend> | |||
<export> | |||
<rqt_gui plugin="${prefix}/plugin.xml"/> | |||
</export> | |||
</package> |
@ -0,0 +1,20 @@ | |||
<library path="src"> | |||
<class name="StretchDashboard" type="stretch_dashboard.stretch_dashboard.StretchDashboard" | |||
base_class_type="rqt_gui_py::Plugin"> | |||
<description> | |||
A Python GUI plugin for displaying a dashboard that displays and interacts with the Stretch robot. | |||
</description> | |||
<qtgui> | |||
<group> | |||
<label>Robot</label> | |||
<icon type="theme">folder</icon> | |||
<statustip>Plugins related to specific robots.</statustip> | |||
</group> | |||
<label>Stretch Dashboard</label> | |||
<icon type="theme">emblem-important</icon> | |||
<statustip> | |||
A Python GUI plugin for displaying a dashboard that displays and interacts with the Stretch robot. | |||
</statustip> | |||
</qtgui> | |||
</class> | |||
</library> |
@ -0,0 +1,12 @@ | |||
#!/usr/bin/env python3 | |||
import sys | |||
from rqt_gui.main import Main | |||
def main(): | |||
sys.exit(Main().main(sys.argv, standalone='stretch_dashboard.stretch_dashboard.StretchDashboard')) | |||
if __name__ == '__main__': | |||
main() |
@ -0,0 +1,11 @@ | |||
#!/usr/bin/env python | |||
from distutils.core import setup | |||
from catkin_pkg.python_setup import generate_distutils_setup | |||
package_info = generate_distutils_setup( | |||
packages=['stretch_dashboard'], | |||
package_dir={'': 'src'} | |||
) | |||
setup(**package_info) |
@ -0,0 +1,90 @@ | |||
import math | |||
import rospy | |||
from rqt_robot_dashboard.dashboard import Dashboard | |||
from rqt_robot_dashboard.menu_dash_widget import MenuDashWidget | |||
from rqt_robot_dashboard.widgets import BatteryDashWidget, ConsoleDashWidget, MonitorDashWidget | |||
from sensor_msgs.msg import BatteryState | |||
from std_msgs.msg import Bool, String | |||
from std_srvs.srv import Trigger | |||
class CalibrateWidget(MenuDashWidget): | |||
def __init__(self): | |||
MenuDashWidget.__init__(self, 'Calibration', icons=[ | |||
['bg-grey.svg', 'ic-motors.svg'], # State 0: Unknown | |||
['bg-green.svg', 'ic-motors.svg'], # State 1: Calibrated | |||
['bg-yellow.svg', 'ic-motors.svg']]) # State 2: Not Calibrated | |||
self.update_state(0) | |||
self.setToolTip('Calibration') | |||
self.client = rospy.ServiceProxy('/calibrate_the_robot', Trigger) | |||
self.status_sub = rospy.Subscriber('is_calibrated', Bool, self.status_cb, queue_size=1) | |||
self.add_action('Calibrate!', lambda: self.client.call()) | |||
def status_cb(self, msg): | |||
if msg.data: | |||
self.update_state(1) | |||
else: | |||
self.update_state(2) | |||
class ModeWidget(MenuDashWidget): | |||
def __init__(self): | |||
# TODO: These could probably use custom icons | |||
MenuDashWidget.__init__(self, 'Robot Mode', icons=[ | |||
['bg-grey.svg'], # State 0: Unknown | |||
['bg-green.svg', 'ic-steering-wheel.svg'], # State 1: Navigation | |||
['bg-green.svg', 'ic-wrench.svg'], # State 2: Manipulation | |||
['bg-green.svg', 'ic-runstop-off.svg']]) # State 3: Position | |||
self.update_state(0) | |||
self.setToolTip('Unknown Mode') | |||
self.mode_map = { | |||
'navigation': 1, | |||
'manipulation': 2, | |||
'position': 3 | |||
} | |||
self.clients = {} | |||
for mode in self.mode_map: | |||
self.clients[mode] = rospy.ServiceProxy(f'/switch_to_{mode}_mode', Trigger) | |||
self.add_action(f'Switch to {mode} mode', | |||
lambda mode_arg=mode: self.clients[mode_arg].call()) | |||
self.status_sub = rospy.Subscriber('mode', String, self.status_cb, queue_size=1) | |||
def status_cb(self, msg): | |||
if msg.data in self.mode_map: | |||
self.update_state(self.mode_map[msg.data]) | |||
self.setToolTip(msg.data.title() + ' Mode') | |||
else: | |||
self.update_state(0) | |||
self.setToolTip('Unknown Mode') | |||
class StretchDashboard(Dashboard): | |||
def __init__(self, context): | |||
super(StretchDashboard, self).__init__(context) | |||
self.power_subscriber = rospy.Subscriber('/battery', BatteryState, self.battery_cb, 10) | |||
def get_widgets(self): | |||
self.monitor = MonitorDashWidget(self.context) | |||
self.console = ConsoleDashWidget(self.context) | |||
self.calibrate = CalibrateWidget() | |||
self.mode_w = ModeWidget() | |||
self.battery = BatteryDashWidget(self.context) | |||
return [[self.monitor, self.console, self.calibrate, self.mode_w], [self.battery]] | |||
def battery_cb(self, msg): | |||
# TODO: Incorporate logic if robot is plugged in | |||
if not math.isnan(msg.percentage): | |||
self.battery.update_perc(msg.percentage) | |||
self.battery.setToolTip('{:.2f} V\n{:.2f} A'.format(msg.voltage, msg.current)) |