话题通信操作
流程:
在packages.xml
中,依赖包需要补充
1 2 3 4 5 6 7
| <build_depend>rosidl_default_generators</build_depend> <exec_depend>rosidl_default_runtime</exec_depend> <member_of_group>rosidl_interface_packages</member_of_group>
|
然后在CMakeList.txt
中写入:
1 2 3 4 5
| find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME} "msg/Student.msg" )
|
- 编译: 进入
(dpl39) tyfelix@tyfelix-B760M:~/zxz_ROS/ws01_plumbing$
工作空间,编译colcon build --packages-select base interface_demo
- 测试:编译完成之后,
. install/setup.bash
然后ros2 interface show base_interface_demo/msg/Student
。输出内容为Student.msg
的内容,那么说明编译成功。可以进行之后的操作。
下面展示了一个服务通信的例子(主要注意 1.话题名称需要保持一致性;2.接口的引入; 3. 函数不一样):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| ----发布方---- from std_msgs.msg import String import rclpy from rclpy.node import Node from base_interfaces_demo.msg import Student
class Talker(Node): def __init__(self, node_name): super().__init__(node_name=node_name) self.count = 0 self.get_logger().info("发布方创建了(python)") ''' 参数:消息类型;话题名称;QoS队列长度 返回这个发布者对象 ''' self.publisher = self.create_publisher(Student,'chatter1',10) ''' 参数: 时间间隔;回调函数(函数作为参数传入) ''' self.timer = self.create_timer(1.0,self.on_timer) def on_timer(self): stu = Student() stu.name = "Felix Jackson" stu.age = 20 stu.height = 175.0 self.publisher.publish(stu) self.count += 1 def main(): rclpy.init() node = Talker("talker_node_py3") rclpy.spin(node) rclpy.shutdown() if __name__ == '__main__': main()
----订阅方---- from std_msgs.msg import String import rclpy from rclpy.node import Node from base_interfaces_demo.msg import Student
class Listener(Node): def __init__(self, node_name): super().__init__(node_name=node_name) self.get_logger().info("订阅方创建了(python)") ''' 参数:消息类型;话题名称(比如和发布者一致);回调函数;QoS队列长度 返回这个发布者对象 ''' self.subcriber = self.create_subscription(Student,'chatter1', self.do_cb, 10) def do_cb(self, stu): self.get_logger().info(f"订阅的数据是:name:{stu.name}; age:{stu.age}; height:{stu.height}") def main(): rclpy.init() node = Listener("listener_node_py4") rclpy.spin(node) rclpy.shutdown() if __name__ == '__main__': main()
|