Browse Source

Add autodocking nodes

pull/95/head
hello-chintan 1 year ago
parent
commit
8312a5cb09
4 changed files with 686 additions and 0 deletions
  1. +205
    -0
      stretch_demos/nodes/aruco_head_scan_action.py
  2. +180
    -0
      stretch_demos/nodes/autodocking_behaviours.py
  3. +119
    -0
      stretch_demos/nodes/autodocking_bt.py
  4. +182
    -0
      stretch_demos/nodes/visual_servoing_action.py

+ 205
- 0
stretch_demos/nodes/aruco_head_scan_action.py View File

@ -0,0 +1,205 @@
#!/usr/bin/env python3
import rospy
from visualization_msgs.msg import MarkerArray
from geometry_msgs.msg import Transform, TransformStamped, Pose
import ros_numpy
import numpy as np
import tf2_ros
import time
import actionlib
from stretch_core.msg import ArucoHeadScanAction, ArucoHeadScanGoal, ArucoHeadScanFeedback, ArucoHeadScanResult
from trajectory_msgs.msg import JointTrajectoryPoint
from control_msgs.msg import FollowJointTrajectoryGoal
from tf.transformations import euler_from_quaternion, quaternion_from_euler
from sensor_msgs.msg import JointState
from sensor_msgs.msg import PointCloud2
import hello_helpers.hello_misc as hm
class ArucoHeadScan(hm.HelloNode):
def __init__(self):
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'aruco_head_scan', 'aruco_head_scan', wait_for_first_pointcloud=False)
self.server = actionlib.SimpleActionServer('ArucoHeadScan', ArucoHeadScanAction, self.execute_cb, False)
self.goal = ArucoHeadScanGoal()
self.feedback = ArucoHeadScanFeedback()
self.result = ArucoHeadScanResult()
self.aruco_marker_array = rospy.Subscriber('aruco/marker_array', MarkerArray, self.aruco_callback)
self.predock_pose_pub = rospy.Publisher('/predock_pose', Pose, queue_size=10)
self.dock_pose_pub = rospy.Publisher('/dock_pose', Pose, queue_size=10)
self.server.start()
self.aruco_id = 1000 # Placeholder value, can never be true
self.aruco_found = False
self.markers = MarkerArray().markers
def execute_cb(self, goal):
self.goal = goal
self.aruco_id = self.goal.aruco_id
self.tilt_angle = self.goal.tilt_angle
self.fill_in_blindspot_with_second_scan = self.goal.fill_in_blindspot_with_second_scan
self.fast_scan = self.goal.fast_scan
self.publish_to_map = self.goal.publish_to_map
self.scan_and_detect()
def scan_and_detect(self):
self.aruco_tf = None
self.predock_tf = None
pan_angle = -3.69
markers = []
scan_point = JointTrajectoryPoint()
scan_point.time_from_start = rospy.Duration(3.0)
scan_point.positions = [self.tilt_angle, pan_angle]
trajectory_goal = FollowJointTrajectoryGoal()
trajectory_goal.trajectory.joint_names = ['joint_head_tilt', 'joint_head_pan']
trajectory_goal.trajectory.points = [scan_point]
trajectory_goal.trajectory.header.stamp = rospy.Time.now()
trajectory_goal.trajectory.header.frame_id = 'base_link'
self.trajectory_client.send_goal(trajectory_goal)
self.trajectory_client.wait_for_result()
self.feedback.pan_angle = pan_angle
self.server.publish_feedback(self.feedback)
for pan_angle in np.arange(-3.69, 1.66, 0.7):
time.sleep(3.0)
for i in range(20):
if self.markers:
markers = self.markers
break
rospy.loginfo("Markers found: {}".format(markers))
if markers != []:
for marker in markers:
if marker.id == self.aruco_id:
self.aruco_found = True
self.aruco_name = marker.text
if self.publish_to_map:
trans = self.tf2_buffer.lookup_transform('map', self.aruco_name, rospy.Time())
self.aruco_tf = self.broadcast_tf(trans.transform, self.aruco_name, 'map')
rospy.loginfo("{} pose published to tf".format(self.aruco_name))
if self.aruco_name == 'docking_station':
# Transform the docking station frame such that x-axis points out of the aruco plane and 0.5 m in the front of the dock
# This facilitates passing the goal pose as this predock frame so that the robot can back up into the dock
saved_pose = Transform()
saved_pose.translation.x = 0.0
saved_pose.translation.y = -0.45
saved_pose.translation.z = 0.47
saved_pose.rotation.x = -0.382
saved_pose.rotation.y = -0.352
saved_pose.rotation.z = -0.604
saved_pose.rotation.w = 0.604
tran = self.broadcast_tf(saved_pose, 'predock_pose', 'docking_station')
self.tf2_broadcaster.sendTransform(tran)
try:
trans = self.tf2_buffer.lookup_transform('map', 'predock_pose', rospy.Time(), rospy.Duration(1.0))
# Bring predock_frame at base_link level
angles = euler_from_quaternion([trans.transform.rotation.x,
trans.transform.rotation.y,
trans.transform.rotation.z,
trans.transform.rotation.w])
q = quaternion_from_euler(0, 0, angles[2])
trans.transform.translation.z = 0
trans.transform.rotation.x = q[0]
trans.transform.rotation.y = q[1]
trans.transform.rotation.z = q[2]
trans.transform.rotation.w = q[3]
trans.header.stamp = rospy.Time.now()
self.predock_tf = trans
rospy.loginfo("Published predock pose")
except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException):
rospy.loginfo("Could not publish pose to tf")
pass
if not self.aruco_found:
self.feedback.pan_angle = pan_angle
self.server.publish_feedback(self.feedback)
scan_point.time_from_start = rospy.Duration(3.0)
scan_point.positions = [self.tilt_angle, pan_angle]
trajectory_goal.trajectory.points = [scan_point]
trajectory_goal.trajectory.header.stamp = rospy.Time.now()
trajectory_goal.trajectory.header.frame_id = 'base_link'
self.trajectory_client.send_goal(trajectory_goal)
self.trajectory_client.wait_for_result()
else:
break
time.sleep(2.0)
self.result_cb(self.aruco_found, "after headscan")
def result_cb(self, aruco_found, str=None):
self.result.aruco_found = aruco_found
if aruco_found:
rospy.loginfo("Aruco marker found")
self.server.set_succeeded(self.result)
else:
rospy.loginfo("Could not find aruco marker {}".format(str))
self.server.set_aborted(self.result)
def broadcast_tf(self, trans, name, ref):
t = TransformStamped()
t.header.stamp = rospy.Time.now()
t.header.frame_id = ref
t.child_frame_id = name
t.transform = trans
return t
def aruco_callback(self, msg):
self.markers = msg.markers
def main(self):
self.rate = 5.0
rate = rospy.Rate(self.rate)
self.tf2_buffer = tf2_ros.Buffer()
self.listener = tf2_ros.TransformListener(self.tf2_buffer)
self.tf2_broadcaster = tf2_ros.TransformBroadcaster()
dock_pose = Pose()
predock_pose = Pose()
while not rospy.is_shutdown():
try:
self.aruco_tf.header.stamp = rospy.Time.now()
self.predock_tf.header.stamp = rospy.Time.now()
self.tf2_broadcaster.sendTransform(self.aruco_tf)
if self.aruco_name == 'docking_station':
dock_pose.position.x = self.aruco_tf.transform.translation.x
dock_pose.position.y = self.aruco_tf.transform.translation.y
dock_pose.position.z = self.aruco_tf.transform.translation.z
dock_pose.orientation.x = self.aruco_tf.transform.rotation.x
dock_pose.orientation.y = self.aruco_tf.transform.rotation.y
dock_pose.orientation.z = self.aruco_tf.transform.rotation.z
dock_pose.orientation.w = self.aruco_tf.transform.rotation.w
self.dock_pose_pub.publish(dock_pose)
self.tf2_broadcaster.sendTransform(self.predock_tf)
predock_pose.position.x = self.predock_tf.transform.translation.x
predock_pose.position.y = self.predock_tf.transform.translation.y
predock_pose.position.z = self.predock_tf.transform.translation.z
predock_pose.orientation.x = self.predock_tf.transform.rotation.x
predock_pose.orientation.y = self.predock_tf.transform.rotation.y
predock_pose.orientation.z = self.predock_tf.transform.rotation.z
predock_pose.orientation.w = self.predock_tf.transform.rotation.w
self.predock_pose_pub.publish(predock_pose)
except AttributeError:
pass
rate.sleep()
def main():
try:
node = ArucoHeadScan()
node.main()
rospy.spin()
except KeyboardInterrupt:
print('interrupt received, so shutting down')
if __name__ == '__main__':
main()

+ 180
- 0
stretch_demos/nodes/autodocking_behaviours.py View File

@ -0,0 +1,180 @@
#!/usr/bin/env python3
import rospy
import actionlib
import py_trees
from actionlib_msgs.msg import GoalStatus
from math import sqrt, pow
from geometry_msgs.msg import Pose, TransformStamped
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from autodocking.msg import VisualServoAction, VisualServoGoal, VisualServoResult, VisualServoFeedback
import tf2_ros
class MoveBaseActionClient(py_trees.behaviour.Behaviour):
def __init__(self, tf2_buffer, name="ActionClient", override_feedback_message_on_running="moving"):
super(MoveBaseActionClient, self).__init__(name)
self.action_client = None
self.sent_goal = False
self.action_spec = MoveBaseAction
self.action_goal = MoveBaseGoal()
self.action_namespace = "move_base"
self.override_feedback_message_on_running = override_feedback_message_on_running
self.tf2_buffer = tf2_buffer
def setup(self, timeout):
self.logger.debug("%s.setup()" % self.__class__.__name__)
self.action_client = actionlib.SimpleActionClient(
self.action_namespace,
self.action_spec
)
if not self.action_client.wait_for_server(rospy.Duration(timeout)):
self.logger.error("{0}.setup() could not connect to the action server at '{1}'".format(self.__class__.__name__, self.action_namespace))
self.action_client = None
return False
return True
def initialise(self):
# Update the goal data here
self.logger.debug("{0}.initialise()".format(self.__class__.__name__))
transform = self.tf2_buffer.lookup_transform('map', 'predock_pose', rospy.Time(), rospy.Duration(2.0))
goal_pose = Pose()
goal_pose.position.x = transform.transform.translation.x
goal_pose.position.y = transform.transform.translation.y
goal_pose.position.z = transform.transform.translation.z
goal_pose.orientation.x = transform.transform.rotation.x
goal_pose.orientation.y = transform.transform.rotation.y
goal_pose.orientation.z = transform.transform.rotation.z
goal_pose.orientation.w = transform.transform.rotation.w
self.action_goal.target_pose.header.frame_id = "map"
self.action_goal.target_pose.header.stamp = rospy.Time.now()
self.action_goal.target_pose.pose = goal_pose
self.sent_goal = False
def update(self):
self.logger.debug("{0}.update()".format(self.__class__.__name__))
if not self.action_client:
self.feedback_message = "no action client, did you call setup() on your tree?"
return py_trees.Status.INVALID
if not self.sent_goal:
self.action_client.send_goal(self.action_goal)
self.sent_goal = True
self.feedback_message = "sent goal to the action server"
return py_trees.Status.RUNNING
self.feedback_message = self.action_client.get_goal_status_text()
if self.action_client.get_state() in [GoalStatus.ABORTED,
GoalStatus.PREEMPTED]:
return py_trees.Status.FAILURE
result = self.action_client.get_result()
if result:
return py_trees.Status.SUCCESS
else:
self.feedback_message = self.override_feedback_message_on_running
return py_trees.Status.RUNNING
def terminate(self, new_status):
self.logger.debug("%s.terminate()" % self.__class__.__name__)
class CheckTF(py_trees.behaviour.Behaviour):
def __init__(self, tf2_buffer, name="CheckTF", source_frame="base_link", target_frame=None):
super(CheckTF, self).__init__(name)
self.source_frame = source_frame
self.target_frame = target_frame
self.tf2_buffer = tf2_buffer
def setup(self, timeout):
self.logger.debug("%s.setup()" % self.__class__.__name__)
return True
def initialise(self):
# Update the goal data here
self.logger.debug("{0}.initialise()".format(self.__class__.__name__))
transform = TransformStamped()
try:
transform = self.tf2_buffer.lookup_transform(self.source_frame, self.target_frame, rospy.Time(), rospy.Duration(1.0))
x = transform.transform.translation.x
y = transform.transform.translation.y
z = transform.transform.translation.z
self.error = sqrt(pow(x, 2) + pow(y, 2) + pow(z, 2))
except tf2_ros.ExtrapolationException:
self.error = 10.0
def update(self):
"""
Check only to see whether the underlying action server has
succeeded, is running, or has cancelled/aborted for some reason and
map these to the usual behaviour return states.
"""
self.logger.debug("{0}.update()".format(self.__class__.__name__))
if self.error < 0.05:
return py_trees.Status.SUCCESS
else:
return py_trees.Status.FAILURE
def terminate(self, new_status):
self.logger.debug("%s.terminate()" % self.__class__.__name__)
class VisualServoing(py_trees.behaviour.Behaviour):
def __init__(self, name="VisualServoing", source_frame="base_link", target_frame=None, override_feedback_message_on_running="servoing"):
super(VisualServoing, self).__init__(name)
self.action_client = None
self.sent_goal = False
self.action_spec = VisualServoAction
self.action_goal = VisualServoGoal()
self.action_namespace = "autodock_visual_servo"
self.source_frame = source_frame
self.target_frame = target_frame
self.override_feedback_message_on_running = override_feedback_message_on_running
def setup(self, timeout):
self.logger.debug("%s.setup()" % self.__class__.__name__)
self.action_client = actionlib.SimpleActionClient(
self.action_namespace,
self.action_spec
)
if not self.action_client.wait_for_server(rospy.Duration(timeout)):
self.logger.error("{0}.setup() could not connect to the action server at '{1}'".format(self.__class__.__name__, self.action_namespace))
self.action_client = None
return False
return True
def initialise(self):
# Update the goal data here
self.logger.debug("{0}.initialise()".format(self.__class__.__name__))
self.action_goal = VisualServoGoal()
self.action_goal.source_frame = self.source_frame
self.action_goal.target_frame = self.target_frame
def update(self):
"""
Check only to see whether the underlying action server has
succeeded, is running, or has cancelled/aborted for some reason and
map these to the usual behaviour return states.
"""
self.logger.debug("{0}.update()".format(self.__class__.__name__))
if not self.action_client:
self.feedback_message = "no action client, did you call setup() on your tree?"
return py_trees.Status.INVALID
if not self.sent_goal:
self.action_client.send_goal(self.action_goal)
self.sent_goal = True
self.feedback_message = "sent goal to the action server"
return py_trees.Status.RUNNING
self.feedback_message = self.action_client.get_goal_status_text()
if self.action_client.get_state() in [GoalStatus.ABORTED,
GoalStatus.PREEMPTED]:
return py_trees.Status.FAILURE
result = self.action_client.get_result()
if result:
return py_trees.Status.SUCCESS
else:
self.feedback_message = self.override_feedback_message_on_running
return py_trees.Status.RUNNING
def terminate(self, new_status):
self.logger.debug("%s.terminate()" % self.__class__.__name__)

+ 119
- 0
stretch_demos/nodes/autodocking_bt.py View File

@ -0,0 +1,119 @@
#!/usr/bin/env python3
import py_trees
import py_trees_ros
import py_trees.console as console
import rospy
import sys
import functools
from autodocking.autodocking_behaviours import MoveBaseActionClient
from autodocking.autodocking_behaviours import CheckTF
from autodocking.autodocking_behaviours import VisualServoing
from stretch_core.msg import ArucoHeadScanAction, ArucoHeadScanGoal
from geometry_msgs.msg import Pose
from sensor_msgs.msg import BatteryState
import hello_helpers.hello_misc as hm
class AutodockingBT(hm.HelloNode):
def __init__(self):
hm.HelloNode.__init__(self)
def create_root(self):
# behaviours
autodocking_seq_root = py_trees.composites.Sequence("autodocking")
dock_found_fb = py_trees.composites.Selector("dock_found_fb")
at_predock_fb = py_trees.composites.Selector("at_predock_fb")
charging_fb = py_trees.composites.Selector("charging_fb")
predock_found_sub = py_trees_ros.subscribers.CheckData(
name="predock_found_sub?",
topic_name='/predock_pose',
expected_value=None,
topic_type=Pose,
fail_if_no_data=True,fail_if_bad_comparison=False)
is_charging_sub = py_trees_ros.subscribers.CheckData(
name="battery_charging?",
topic_name='/battery',
variable_name='present',
expected_value=True,
topic_type=BatteryState,
fail_if_no_data=True,fail_if_bad_comparison=True)
# at_predock_tf = CheckTF(self.tf2_buffer, name="at_predock_tf", target_frame='predock_pose')
aruco_goal = ArucoHeadScanGoal()
aruco_goal.aruco_id = 245
aruco_goal.tilt_angle = -0.68
aruco_goal.publish_to_map = True
aruco_goal.fill_in_blindspot_with_second_scan = False
aruco_goal.fast_scan = False
head_scan_action = py_trees_ros.actions.ActionClient( # Publishes predock pose to /predock_pose topic and tf frame called /predock_pose
name="ArucoHeadScan",
action_namespace="ArucoHeadScan",
action_spec=ArucoHeadScanAction,
action_goal=aruco_goal,
override_feedback_message_on_running="rotating"
)
predock_action = MoveBaseActionClient(
self.tf2_buffer,
name="predock_action",
override_feedback_message_on_running="moving"
)
invert_predock = py_trees.decorators.SuccessIsFailure(name='invert_predock', child=predock_action)
dock_action = VisualServoing(
name='dock_action',
source_frame='docking_station',
target_frame='charging_port',
override_feedback_message_on_running="docking"
)
# tree
autodocking_seq_root.add_children([dock_found_fb, at_predock_fb, dock_action, charging_fb])
dock_found_fb.add_children([predock_found_sub, head_scan_action])
at_predock_fb.add_children([predock_action])
charging_fb.add_children([is_charging_sub, invert_predock])
return autodocking_seq_root
def shutdown(self, behaviour_tree):
behaviour_tree.interrupt()
def main(self):
"""
Entry point for the demo script.
"""
hm.HelloNode.main(self, 'funmap', 'funmap')
root = self.create_root()
self.behaviour_tree = py_trees_ros.trees.BehaviourTree(root)
rospy.on_shutdown(functools.partial(self.shutdown, self.behaviour_tree))
if not self.behaviour_tree.setup(timeout=15):
console.logerror("failed to setup the tree, aborting.")
sys.exit(1)
def print_tree(tree):
print(py_trees.display.unicode_tree(root=tree.root, show_status=True))
try:
self.behaviour_tree.tick_tock(
500
# period_ms=500,
# number_of_iterations=py_trees.trees.CONTINUOUS_TICK_TOCK,
# pre_tick_handler=None,
# post_tick_handler=print_tree
)
except KeyboardInterrupt:
self.behaviour_tree.interrupt()
def main():
node = AutodockingBT()
node.main()
if __name__ == '__main__':
main()

+ 182
- 0
stretch_demos/nodes/visual_servoing_action.py View File

@ -0,0 +1,182 @@
#!/usr/bin/env python3
import rospy
from visualization_msgs.msg import MarkerArray
from geometry_msgs.msg import Transform, TransformStamped, Pose
import ros_numpy
import numpy as np
import tf2_ros
import time
import actionlib
from math import sqrt, pow, acos, degrees
from geometry_msgs.msg import Twist
from trajectory_msgs.msg import JointTrajectoryPoint
from control_msgs.msg import FollowJointTrajectoryGoal
from tf.transformations import euler_from_quaternion, quaternion_from_euler
from autodocking.msg import VisualServoAction, VisualServoGoal, VisualServoResult, VisualServoFeedback
from sensor_msgs.msg import JointState
from sensor_msgs.msg import PointCloud2
import hello_helpers.hello_misc as hm
class VisualServoing(hm.HelloNode):
def __init__(self):
hm.HelloNode.__init__(self)
hm.HelloNode.main(self, 'dock_visual_servo', 'dock_visual_servo', wait_for_first_pointcloud=False)
self.cmd_vel_pub = rospy.Publisher('/stretch/cmd_vel', Twist, queue_size=10)
self.server = actionlib.SimpleActionServer('autodock_visual_servo', VisualServoAction, self.execute_cb, False)
self.goal = VisualServoGoal()
self.feedback = VisualServoFeedback()
self.result = VisualServoResult()
self.cmd_vel = Twist()
self.cmd_vel.linear.x = 0.0
self.cmd_vel.linear.y = 0.0
self.cmd_vel.linear.z = 0.0
self.cmd_vel.angular.x = 0.0
self.cmd_vel.angular.y = 0.0
self.cmd_vel.angular.z = 0.0
self.server.start()
self.distance = 0.0
self.angle = 0.0
self.theta = 0.0 # angle between baselink x-axis and predock_pose x-axis
self.phi = 0.0 # angle between line connecting docking station and predock_pose and line connecting docking station with base_link
def execute_cb(self, goal):
self.goal = goal
scan_point = JointTrajectoryPoint()
scan_point.time_from_start = rospy.Duration(2.0)
scan_point.positions = [-1.051, -2.914] # Look back and down at the docking station
trajectory_goal = FollowJointTrajectoryGoal()
trajectory_goal.trajectory.joint_names = ['joint_head_tilt', 'joint_head_pan']
trajectory_goal.trajectory.points = [scan_point]
trajectory_goal.trajectory.header.stamp = rospy.Time.now()
trajectory_goal.trajectory.header.frame_id = 'base_link'
self.trajectory_client.send_goal(trajectory_goal)
self.trajectory_client.wait_for_result()
while self.distance > 0.1 and self.distance < 0.5:
try:
predock_trans = self.tf2_buffer.lookup_transform('docking_station', 'predock_pose', rospy.Time(), rospy.Duration(1.0))
base_trans = self.tf2_buffer.lookup_transform('docking_station', 'base_link', rospy.Time(), rospy.Duration(1.0))
base_predock_trans = self.tf2_buffer.lookup_transform('predock_pose', 'base_link', rospy.Time(), rospy.Duration(1.0))
x = predock_trans.transform.translation.x
y = predock_trans.transform.translation.y
z = predock_trans.transform.translation.z
side_a = sqrt(pow(x, 2) + pow(y, 2) + pow(z, 2))
x = base_trans.transform.translation.x
y = base_trans.transform.translation.y
z = base_trans.transform.translation.z
side_b = sqrt(pow(x, 2) + pow(y, 2) + pow(z, 2))
x = base_predock_trans.transform.translation.x
y = base_predock_trans.transform.translation.y
z = base_predock_trans.transform.translation.z
side_c = sqrt(pow(x, 2) + pow(y, 2) + pow(z, 2))
if y < 0:
self.phi = -acos((pow(side_a, 2) + pow(side_b, 2) - pow(side_c, 2))/(2.0 * side_a * side_b))
else:
self.phi = acos((pow(side_a, 2) + pow(side_b, 2) - pow(side_c, 2))/(2.0 * side_a * side_b))
angles = euler_from_quaternion([base_predock_trans.transform.rotation.x,
base_predock_trans.transform.rotation.y,
base_predock_trans.transform.rotation.z,
base_predock_trans.transform.rotation.w])
self.theta = -angles[2]
self.angle = self.phi + self.theta
rospy.loginfo("Angle obtained: {}".format(self.angle))
# if self.angle > 0.05 or self.angle < -0.05:
if self.angle > (self.theta + 0.05) or self.angle < (self.theta - 0.05):
self.cmd_vel.angular.z = self.angle*0.5
else:
self.cmd_vel.angular.z = 0.0
self.cmd_vel.linear.x = -(self.distance*0.25)
self.cmd_vel_pub.publish(self.cmd_vel)
self.feedback.angle_error = self.angle
self.feedback.distance_error = self.distance
self.server.publish_feedback(self.feedback)
except:
self.cmd_vel.linear.x = 0.0
self.cmd_vel.angular.z = 0.0
self.cmd_vel_pub.publish(self.cmd_vel)
if self.distance > 0.5:
self.cmd_vel.linear.x = 0.0
self.cmd_vel.angular.z = 0.0
self.cmd_vel_pub.publish(self.cmd_vel)
self.result.result = False
self.result_cb(self.result.result)
else:
self.cmd_vel.linear.x = 0.0
self.cmd_vel.angular.z = 0.0
self.cmd_vel_pub.publish(self.cmd_vel)
self.result.result = True
self.result_cb(self.result.result)
def result_cb(self, result):
if result:
rospy.loginfo("Docking complete")
self.server.set_succeeded(self.result)
else:
rospy.loginfo("Failed to dock")
self.server.set_aborted(self.result)
def aruco_callback(self, msg):
self.markers = msg.markers
def main(self):
self.rate = 10.0
rate = rospy.Rate(self.rate)
self.tf2_buffer = tf2_ros.Buffer()
self.listener = tf2_ros.TransformListener(self.tf2_buffer)
self.tf2_broadcaster = tf2_ros.TransformBroadcaster()
charging_port = TransformStamped()
charging_port.transform.translation.x = -0.3
charging_port.transform.translation.y = 0.0
charging_port.transform.translation.z = 0.1
charging_port.transform.rotation.x = 0.0
charging_port.transform.rotation.y = 0.0
charging_port.transform.rotation.z = 1.0
charging_port.transform.rotation.w = 0.0
charging_port.header.frame_id = 'base_link'
charging_port.child_frame_id = 'charging_port'
time.sleep(1.0)
while not rospy.is_shutdown():
try:
charging_port.header.stamp = rospy.Time.now()
self.tf2_broadcaster.sendTransform(charging_port)
transform = self.tf2_buffer.lookup_transform('charging_port', 'docking_station', rospy.Time(), rospy.Duration(1.0))
x = transform.transform.translation.x
y = transform.transform.translation.y
z = transform.transform.translation.z
self.distance = sqrt(pow(x, 2) + pow(y, 2) + pow(z, 2))
# rospy.loginfo("distance to docking_station {}".format(self.distance))
except (AttributeError, tf2_ros.LookupException, tf2_ros.ExtrapolationException):
pass
rate.sleep()
def main():
try:
node = VisualServoing()
node.main()
rospy.spin()
except KeyboardInterrupt:
print('interrupt received, so shutting down')
if __name__ == '__main__':
main()

Loading…
Cancel
Save