The aim of example 9 is to combine the ReSpeaker Microphone Array and Follow Joint Trajectory tutorials to voice teleoperate the mobile base of the Stretch robot.
Begin by running the following command in a new terminal.
# Terminal 1
roslaunch stretch_core stretch_driver.launch
Switch the mode to position mode using a rosservice call. Then run the respeaker.launch
file.
# Terminal 2
rosservice call /switch_to_position_mode
roslaunch stretch_core respeaker.launch
Then run the voice_teleoperation_base.py node in a new terminal.
# Terminal 3
cd catkin_ws/src/stretch_tutorials/src/
python voice_teleoperation_base.py
In terminal 3, a menu of voice commands is printed. You can reference this menu layout below.
------------ VOICE TELEOP MENU ------------
VOICE COMMANDS
"forward": BASE FORWARD
"back" : BASE BACK
"left" : BASE ROTATE LEFT
"right" : BASE ROTATE RIGHT
"stretch": BASE ROTATES TOWARDS SOUND
STEP SIZE
"big" : BIG
"medium" : MEDIUM
"small" : SMALL
"quit" : QUIT AND CLOSE NODE
-------------------------------------------
To stop the node from sending twist messages, type Ctrl + c or say "quit".
#!/usr/bin/env python
import math
import rospy
import sys
from sensor_msgs.msg import JointState
from std_msgs.msg import Int32
from control_msgs.msg import FollowJointTrajectoryGoal
from trajectory_msgs.msg import JointTrajectoryPoint
import hello_helpers.hello_misc as hm
from speech_recognition_msgs.msg import SpeechRecognitionCandidates
class GetVoiceCommands:
"""
A class that subscribes to the speech to text recognition messages, prints
a voice command menu, and defines step size for translational and rotational
mobile base motion.
"""
def __init__(self):
"""
A function that initializes subscribers and defines the three different
step sizes.
:param self: The self reference.
"""
self.step_size = 'medium'
self.rad_per_deg = math.pi/180.0
self.small_deg = 5.0
self.small_rad = self.rad_per_deg * self.small_deg
self.small_translate = 0.025
self.medium_deg = 10.0
self.medium_rad = self.rad_per_deg * self.medium_deg
self.medium_translate = 0.05
self.big_deg = 20.0
self.big_rad = self.rad_per_deg * self.big_deg
self.big_translate = 0.1
self.voice_command = None
self.sound_direction = 0
self.speech_to_text_sub = rospy.Subscriber("/speech_to_text", SpeechRecognitionCandidates, self.callback_speech)
self.sound_direction_sub = rospy.Subscriber("/sound_direction", Int32, self.callback_direction)
def callback_direction(self, msg):
"""
A callback function that converts the incoming message, sound direction,
from degrees to radians.
:param self: The self reference.
:param msg: The Int32 message type that represents the sound direction.
"""
self.sound_direction = msg.data * -self.rad_per_deg
def callback_speech(self,msg):
"""
A callback function takes the incoming message, a list of the speech to
text, and joins all items in that iterable list into a single string.
:param self: The self reference.
:param msg: The SpeechRecognitionCandidates message type.
"""
self.voice_command = ' '.join(map(str,msg.transcript))
def get_inc(self):
"""
A function that sets the increment size for translational and rotational
base motion.
:param self:The self reference.
:returns inc: A dictionary type the contains the increment size.
"""
if self.step_size == 'small':
inc = {'rad': self.small_rad, 'translate': self.small_translate}
if self.step_size == 'medium':
inc = {'rad': self.medium_rad, 'translate': self.medium_translate}
if self.step_size == 'big':
inc = {'rad': self.big_rad, 'translate': self.big_translate}
return inc
def print_commands(self):
"""
A function that prints the voice teleoperation menu.
:param self: The self reference.
"""
print(' ')
print('------------ VOICE TELEOP MENU ------------')
print(' ')
print(' VOICE COMMANDS ')
print(' "forward": BASE FORWARD ')
print(' "back" : BASE BACK ')
print(' "left" : BASE ROTATE LEFT ')
print(' "right" : BASE ROTATE RIGHT ')
print(' "stretch": BASE ROTATES TOWARDS SOUND ')
print(' ')
print(' STEP SIZE ')
print(' "big" : BIG ')
print(' "medium" : MEDIUM ')
print(' "small" : SMALL ')
print(' ')
print(' ')
print(' "quit" : QUIT AND CLOSE NODE ')
print(' ')
print('-------------------------------------------')
def get_command(self):
"""
A function that defines the teleoperation command based on the voice command.
:param self: The self reference.
:returns command: A dictionary type that contains the type of base motion.
"""
command = None
if self.voice_command == 'forward':
command = {'joint': 'translate_mobile_base', 'inc': self.get_inc()['translate']}
if self.voice_command == 'back':
command = {'joint': 'translate_mobile_base', 'inc': -self.get_inc()['translate']}
if self.voice_command == 'left':
command = {'joint': 'rotate_mobile_base', 'inc': self.get_inc()['rad']}
if self.voice_command == 'right':
command = {'joint': 'rotate_mobile_base', 'inc': -self.get_inc()['rad']}
if self.voice_command == 'stretch':
command = {'joint': 'rotate_mobile_base', 'inc': self.sound_direction}
if (self.voice_command == "small") or (self.voice_command == "medium") or (self.voice_command == "big"):
self.step_size = self.voice_command
rospy.loginfo('Step size = {0}'.format(self.step_size))
if self.voice_command == 'quit':
rospy.signal_shutdown("done")
sys.exit(0)
self.voice_command = None
return command
class VoiceTeleopNode(hm.HelloNode):
"""
A class that inherits the HelloNode class from hm and sends joint trajectory
commands.
"""
def __init__(self):
"""
A function that declares object from the GetVoiceCommands class, instantiates
the HelloNode class, and set the publishing rate.
:param self: The self reference.
"""
hm.HelloNode.__init__(self)
self.rate = 10.0
self.joint_state = None
self.speech = GetVoiceCommands()
def joint_states_callback(self, msg):
"""
A callback function that stores Stretch's joint states.
:param self: The self reference.
:param msg: The JointState message type.
"""
self.joint_state = msg
def send_command(self, command):
"""
Function that makes an action call and sends base joint trajectory goals
:param self: The self reference.
:param command: A dictionary type.
"""
joint_state = self.joint_state
if (joint_state is not None) and (command is not None):
point = JointTrajectoryPoint()
point.time_from_start = rospy.Duration(0.0)
trajectory_goal = FollowJointTrajectoryGoal()
trajectory_goal.goal_time_tolerance = rospy.Time(1.0)
joint_name = command['joint']
trajectory_goal.trajectory.joint_names = [joint_name]
inc = command['inc']
rospy.loginfo('inc = {0}'.format(inc))
new_value = inc
point.positions = [new_value]
trajectory_goal.trajectory.points = [point]
trajectory_goal.trajectory.header.stamp = rospy.Time.now()
rospy.loginfo('joint_name = {0}, trajectory_goal = {1}'.format(joint_name, trajectory_goal))
self.trajectory_client.send_goal(trajectory_goal)
rospy.loginfo('Done sending command.')
self.speech.print_commands()
def main(self):
"""
The main function that instantiates the HelloNode class, initializes the subscriber,
and call other methods in both the VoiceTeleopNode and GetVoiceCommands classes.
:param self: The self reference.
"""
hm.HelloNode.main(self, 'voice_teleop', 'voice_teleop', wait_for_first_pointcloud=False)
rospy.Subscriber('/stretch/joint_states', JointState, self.joint_states_callback)
rate = rospy.Rate(self.rate)
self.speech.print_commands()
while not rospy.is_shutdown():
command = self.speech.get_command()
self.send_command(command)
rate.sleep()
if __name__ == '__main__':
try:
node = VoiceTeleopNode()
node.main()
except KeyboardInterrupt:
rospy.loginfo('interrupt received, so shutting down')
This code is similar to that of the multipoint_command and joint_state_printer node. Therefore, this example will highlight sections that are different from those tutorials. Now let's break the code down.
#!/usr/bin/env python
Every Python ROS Node will have this declaration at the top. The first line makes sure your script is executed as a Python script.
import math
import rospy
import sys
from sensor_msgs.msg import JointState
from std_msgs.msg import Int32
from control_msgs.msg import FollowJointTrajectoryGoal
from trajectory_msgs.msg import JointTrajectoryPoint
import hello_helpers.hello_misc as hm
from speech_recognition_msgs.msg import SpeechRecognitionCandidates
You need to import rospy if you are writing a ROS Node. Import the FollowJointTrajectoryGoal
from the control_msgs.msg
package to control the Stretch robot. Import JointTrajectoryPoint
from the trajectory_msgs
package to define robot trajectories. The hello_helpers
package consists of a module that provides various Python scripts used across stretch_ros. In this instance, we are importing the hello_misc
script. Import sensor_msgs.msg
so that we can subscribe to JointState messages.
class GetVoiceCommands:
Create a class that subscribes to the speech to text recognition messages, prints a voice command menu, and defines step size for translational and rotational mobile base motion.
self.step_size = 'medium'
self.rad_per_deg = math.pi/180.0
Set the default step size as medium and create a float value, self.rad_per_deg, to convert degrees to radians.
self.small_deg = 5.0
self.small_rad = self.rad_per_deg * self.small_deg
self.small_translate = 0.025
self.medium_deg = 10.0
self.medium_rad = self.rad_per_deg * self.medium_deg
self.medium_translate = 0.05
self.big_deg = 20.0
self.big_rad = self.rad_per_deg * self.big_deg
self.big_translate = 0.1
Define the three rotation and translation step sizes.
self.voice_command = None
self.sound_direction = 0
self.speech_to_text_sub = rospy.Subscriber("/speech_to_text", SpeechRecognitionCandidates, self.callback_speech)
self.sound_direction_sub = rospy.Subscriber("/sound_direction", Int32, self.callback_direction)
Initialize the voice command and sound direction to values that will not result in moving the base.
Set up two subscribers. The first one subscribes to the topic /speech_to_text, looking for SpeechRecognitionCandidates
messages. When a message comes in, ROS is going to pass it to the function callback_speech
automatically. The second subscribes to /sound_direction message and passes it to the callback_direction
function.
self.sound_direction = msg.data * -self.rad_per_deg
The callback_direction
function converts the sound_direction topic from degrees to radians.
if self.step_size == 'small':
inc = {'rad': self.small_rad, 'translate': self.small_translate}
if self.step_size == 'medium':
inc = {'rad': self.medium_rad, 'translate': self.medium_translate}
if self.step_size == 'big':
inc = {'rad': self.big_rad, 'translate': self.big_translate}
return inc
The callback_speech
stores the increment size for translational and rotational base motion to inc. The increment size is contingent on the self.step_size string value.
command = None
if self.voice_command == 'forward':
command = {'joint': 'translate_mobile_base', 'inc': self.get_inc()['translate']}
if self.voice_command == 'back':
command = {'joint': 'translate_mobile_base', 'inc': -self.get_inc()['translate']}
if self.voice_command == 'left':
command = {'joint': 'rotate_mobile_base', 'inc': self.get_inc()['rad']}
if self.voice_command == 'right':
command = {'joint': 'rotate_mobile_base', 'inc': -self.get_inc()['rad']}
if self.voice_command == 'stretch':
command = {'joint': 'rotate_mobile_base', 'inc': self.sound_direction}
In the get_command()
function, the command is initialized as None, or set as a dictionary where the joint and inc values are stored. The command message type is dependent on the self.voice_command string value.
if (self.voice_command == "small") or (self.voice_command == "medium") or (self.voice_command == "big"):
self.step_size = self.voice_command
rospy.loginfo('Step size = {0}'.format(self.step_size))
Based on the self.voice_command value, set the step size for the increments.
if self.voice_command == 'quit':
rospy.signal_shutdown("done")
sys.exit(0)
If the self.voice_command is equal to "quit", then initiate a clean shutdown of ROS and exit the Python interpreter.
class VoiceTeleopNode(hm.HelloNode):
"""
A class that inherits the HelloNode class from hm and sends joint trajectory
commands.
"""
def __init__(self):
"""
A function that declares object from the GetVoiceCommands class, instantiates
the HelloNode class, and set the publishing rate.
:param self: The self reference.
"""
hm.HelloNode.__init__(self)
self.rate = 10.0
self.joint_state = None
self.speech = GetVoiceCommands()
A class that inherits the HelloNode
class from hm
, declares object from the GetVoiceCommands
class, and sends joint trajectory commands.
def send_command(self, command):
"""
Function that makes an action call and sends base joint trajectory goals
:param self: The self reference.
:param command: A dictionary type.
"""
joint_state = self.joint_state
if (joint_state is not None) and (command is not None):
point = JointTrajectoryPoint()
point.time_from_start = rospy.Duration(0.0)
The send_command
function stores the joint state message and uses a conditional statement to send joint trajectory goals. Also, assign point as a JointTrajectoryPoint
message type.
trajectory_goal = FollowJointTrajectoryGoal()
trajectory_goal.goal_time_tolerance = rospy.Time(1.0)
Assign trajectory_goal as a FollowJointTrajectoryGoal
message type.
joint_name = command['joint']
trajectory_goal.trajectory.joint_names = [joint_name]
Extract the joint name from the command dictionary.
inc = command['inc']
rospy.loginfo('inc = {0}'.format(inc))
new_value = inc
Extract the increment type from the command dictionary.
point.positions = [new_value]
trajectory_goal.trajectory.points = [point]
Assign the new value position to the trajectory goal message type.
self.trajectory_client.send_goal(trajectory_goal)
rospy.loginfo('Done sending command.')
Make the action call and send goal of the new joint position.
self.speech.print_commands()
Reprint the voice command menu after the trajectory goal is sent.
def main(self):
"""
The main function that instantiates the HelloNode class, initializes the subscriber,
and call other methods in both the VoiceTeleopNode and GetVoiceCommands classes.
:param self: The self reference.
"""
hm.HelloNode.main(self, 'voice_teleop', 'voice_teleop', wait_for_first_pointcloud=False)
rospy.Subscriber('/stretch/joint_states', JointState, self.joint_states_callback)
rate = rospy.Rate(self.rate)
self.speech.print_commands()
The main function instantiates the HelloNode
class, initializes the subscriber, and call other methods in both the VoiceTeleopNode
and GetVoiceCommands
classes.
while not rospy.is_shutdown():
command = self.speech.get_command()
self.send_command(command)
rate.sleep()
Run a while loop to continuously check speech commands and send those commands to execute an action.
try:
node = VoiceTeleopNode()
node.main()
except KeyboardInterrupt:
rospy.loginfo('interrupt received, so shutting down')
Declare a VoiceTeleopNode
object. Then execute the main()
method.