Skip to content

Commit

Permalink
Move ImuCalibrator node into leo_fw package (#8)
Browse files Browse the repository at this point in the history
* Move ImuCalibrator node into leo_fw package

* Rename node

* Fix lint warnings

* Add numpy to dependencies
  • Loading branch information
bjsowa authored Nov 15, 2023
1 parent ab8bc68 commit f7967a9
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 83 deletions.
3 changes: 2 additions & 1 deletion leo_fw/leo_fw/nodes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@
# THE SOFTWARE.

from .parameter_bridge import ParameterBridge
from .imu_calibrator import ImuCalibrator

__all__ = ["ParameterBridge"]
__all__ = ["ParameterBridge", "ImuCalibrator"]
83 changes: 83 additions & 0 deletions leo_fw/leo_fw/nodes/imu_calibrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Copyright 2023 Fictionlab sp. z o.o.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

#!/usr/bin/env python3

import numpy as np

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSDurabilityPolicy, QoSReliabilityPolicy
from leo_msgs.srv import SetImuCalibration
from leo_msgs.msg import Imu


class ImuCalibrator(Node):
def __init__(self, time=2.0):
super().__init__("imu_calibrator")

self.service_client = self.create_client(
SetImuCalibration, "set_imu_calibration"
)

while not self.service_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("service not available, waiting again...")

self.data = []
self.end_time = self.get_clock().now() + rclpy.duration.Duration(seconds=time)

qos = QoSProfile(
depth=1,
durability=QoSDurabilityPolicy.VOLATILE,
reliability=QoSReliabilityPolicy.BEST_EFFORT,
)

self.running = True

self.imu_sub = self.create_subscription(
Imu, "firmware/imu", self.imu_sub_callback, qos
)

self.get_logger().info(f"Collecting IMU measurements for {time} seconds...")

def imu_sub_callback(self, data: Imu):
if self.get_clock().now() >= self.end_time:
self.running = False
self.destroy_subscription(self.imu_sub)

self.data.append([data.gyro_x, data.gyro_y, data.gyro_z])

def send_bias(self):
self.get_logger().info(f"Calculating bias from {len(self.data)} samples.")

matrix = np.matrix(self.data)
bias = matrix.mean(0) * -1.0
bias = bias.tolist()[0]

self.get_logger().info(f"Calculated bias: {bias}")

req = SetImuCalibration.Request()
req.gyro_bias_x, req.gyro_bias_y, req.gyro_bias_z = bias

future = self.service_client.call_async(req)
rclpy.spin_until_future_complete(self, future)

if not future.result().success:
self.get_logger().error("Failed to set new imu calibration.")
1 change: 1 addition & 0 deletions leo_fw/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<exec_depend>ament_index_python</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>python3-dbus</exec_depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-whichcraft</exec_depend>
<exec_depend>python3-yaml</exec_depend>
<exec_depend>rclpy</exec_depend>
Expand Down
103 changes: 21 additions & 82 deletions leo_fw/scripts/calibrate_imu
Original file line number Diff line number Diff line change
Expand Up @@ -2,95 +2,34 @@

import argparse

import numpy as np
from leo_fw.nodes import ImuCalibrator

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSDurabilityPolicy, QoSReliabilityPolicy
from leo_msgs.srv import SetImuCalibration
from leo_msgs.msg import Imu

rclpy.init()

class Calibrator(Node):
def __init__(self, time=2.0):
super().__init__("calibrate_imu")
parser = argparse.ArgumentParser(
description="Calculate imu bias and send it to firmware_message_converter.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

self.service_client = self.create_client(
SetImuCalibration, "set_imu_calibration"
)
parser.add_argument(
"duration",
default=10.0,
type=float,
nargs="?",
help="The duration in seconds for which the IMU data is collected.",
)

while not self.service_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("service not available, waiting again...")
args = parser.parse_args()

self.data = []
self.end_time = self.get_clock().now() + rclpy.duration.Duration(seconds=time)
calib = ImuCalibrator(args.duration)

qos = QoSProfile(
depth=1,
durability=QoSDurabilityPolicy.VOLATILE,
reliability=QoSReliabilityPolicy.BEST_EFFORT,
)
while calib.running and rclpy.ok():
rclpy.spin_once(calib)

self.running = True
calib.send_bias()

self.imu_sub = self.create_subscription(
Imu, "firmware/imu", self.imu_sub_callback, qos
)

self.get_logger().info(f"Collecting IMU measurements for {time} seconds...")

def imu_sub_callback(self, data: Imu):
if self.get_clock().now() >= self.end_time:
self.running = False
self.destroy_subscription(self.imu_sub)

self.data.append([data.gyro_x, data.gyro_y, data.gyro_z])

def send_bias(self):
self.get_logger().info(f"Calculating bias from {len(self.data)} samples.")

matrix = np.matrix(self.data)
bias = matrix.mean(0) * -1.0
bias = bias.tolist()[0]

self.get_logger().info(f"Calculated bias: {bias}")

req = SetImuCalibration.Request()
req.gyro_bias_x, req.gyro_bias_y, req.gyro_bias_z = bias

self.future = self.service_client.call_async(req)
rclpy.spin_until_future_complete(self, self.future)

if not self.future.result().success:
self.get_logger().error(f"Failed to set new imu calibration.")


def add_arguments(parser: argparse.ArgumentParser):
parser.add_argument(
"duration",
default=10.0,
type=float,
nargs="?",
help="The duration in seconds for which the IMU data is collected.",
)


if __name__ == "__main__":
rclpy.init()

parser = argparse.ArgumentParser(
description="Calculate imu bias and send it to firmware_message_converter.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
add_arguments(parser)
args = parser.parse_args()

calib = Calibrator(args.duration)

while calib.running and rclpy.ok():
rclpy.spin_once(calib)

calib.send_bias()
calib.get_logger().info("Finishing node.")
calib.destroy_node()
rclpy.shutdown()
calib.get_logger().info("Finishing node.")
calib.destroy_node()
rclpy.shutdown()

0 comments on commit f7967a9

Please sign in to comment.