Skip to content

Latest commit

 

History

History
841 lines (680 loc) · 43.3 KB

scan-matching-branch-and-bound-impl.md

File metadata and controls

841 lines (680 loc) · 43.3 KB

title: 分枝限定法によるスキャンマッチング(実装編) author: SternGerlach

ホームに戻る

このページについて

このページは、慶應理工アドベントカレンダー2021の15日目の記事です。

こちらのページの続きです。 分枝限定法によるスキャンマッチングのPythonによる実装を示します。 Pythonの実装はGitHubのリポジトリに置かれています(中の人が1から作っています)。

2次元の姿勢と位置

2次元の姿勢をPose2Dクラス、2次元の位置をPoint2Dクラスとして次のように定義します。 Pose2DクラスのソースコードはこちらPoint2Dクラスのソースコードはこちらを参照してください。

class Point2D(object):
    def __init__(self) -> None:
        self.x = 0.0
        self.y = 0.0

    def __init__(self, x: float, y: float) -> None:
        self.x = x
        self.y = y

project_point_2d()関数は、ある姿勢のもとで、点の座標を変換します。 姿勢は引数pose、点の座標は引数pointにそれぞれ指定します。 例えば、poseがロボットの姿勢(地図座標系からLiDAR座標系への座標変換)、pointがLiDAR座標系におけるスキャン点の座標(直交座標)であれば、地図座標系におけるスキャン点の座標が返されます。

class Pose2D(object):
    def __init__(self) -> None:
        self.x = 0.0
        self.y = 0.0
        self.theta = 0.0

    def __init__(self, x: float, y: float, theta: float) -> None:
        self.x = x
        self.y = y
        self.theta = theta

def project_point_2d(pose: Pose2D, point: Point2D) -> Point2D:
    """Project a 2D point using a 2D pose"""
    sin_theta = math.sin(pose.theta)
    cos_theta = math.cos(pose.theta)

    x = pose.x + cos_theta * point.x - sin_theta * point.y
    y = pose.y + sin_theta * point.x + cos_theta * point.y

    return Point2D(x, y)

占有格子地図の実装

占有格子地図(Occupancy Grid Map)の実装(Grid2Dクラス)を示します。 Grid2Dクラスのソースコードはこちらを参照してください。 重要な部分だけを以下に列挙します。

  • 占有格子地図は、解像度、中心の座標、$x$と$y$方向の格子の数で定義されます。 解像度はresolution()メソッド、中心の座標はcenter_pos()メソッド、左下端($(0, 0)$番目の格子)の座標はmin_pos()メソッド、$x$と$y$方向の格子の数はnum_of_cells_x()num_of_cells_y()メソッドからそれぞれ取得できます。
  • 各格子の占有確率は、16ビットの整数(1から65535まで)に量子化したうえで、リストに格納されます。 格子は$x$と$y$の2つのインデックスで表せますが、flat_index()メソッドによって、リストにアクセスするための1次元のインデックスに変換できます。 分枝限定法によるスキャンマッチングの際は、占有確率の総和を取る必要があります。 占有確率が16ビット整数に量子化されているので、浮動小数点数の総和が整数の総和に置き換わって、多少の高速化を期待できます。
  • index_to_point()メソッドは、指定された格子に対応する座標を返します。 point_to_index()メソッドはその逆で、指定された位置に対応する格子のインデックスを返します。
  • get_index_value()get_index_probability()メソッドは、指定された格子の占有確率を、16ビットの整数、あるいは浮動小数点数で返します。
  • compute_coarse_map()関数は、以前のページにおける$\mathcal{M}\mathrm{precomp}^h$を計算します。 引数win_sizeに$2^h$、grid_mapに占有格子地図$\mathcal{M}$を指定します。 計算にあたっては、占有確率の比較が繰り返し必要です。 占有確率が16ビット整数に量子化されているので、浮動小数点数の比較が整数の比較に置き換わって、多少の高速化を期待できます。 $$ \mathcal{M}\mathrm{precomp}^h(i, j) = \max_{\substack{i \le i' < i + 2^h \ j \le j' < j + 2^h}} \mathcal{M}(i', j') $$
  • ここでは使っていませんが、地図の更新に必要な各種メソッドも定義されているので、興味があれば読んでみてください。 格子がもつ占有確率の更新には、バイナリベイズフィルタを利用します。 また、LiDARセンサの中心と各スキャン点を結ぶ直線を考え、その直線上を通過する格子のインデックスを計算するために、ブレゼンハムのアルゴリズムを利用します。
# coding: utf-8
# grid_2d.py

import math

from collections import deque
from typing import Callable, List, Tuple

from point_2d import Point2D
from pose_2d import Pose2D, project_point_2d
from util import bresenham

def probability_to_value(x: float) -> int:
    return int(x * 65536.0)

def value_to_probability(x: int) -> float:
    return float(x) / 65536.0

def probability_to_log_odds(x: float) -> float:
    return math.log(x / (1.0 - x))

def log_odds_to_probability(x: float) -> float:
    return 1.0 / (1.0 + math.exp(-x))

def probability_to_odds(x: float) -> float:
    return x / (1.0 - x)

def odds_to_probability(x: float) -> float:
    return x / (1.0 + x)

def log_odds_to_value(x: float) -> int:
    return probability_to_value(log_odds_to_probability(x))

def value_to_log_odds(x: int) -> float:
    return probability_to_log_odds(value_to_probability(x))

def probability_to_grayscale(x: float) -> int:
    return 255 - int(x * 255.0)

def value_to_grayscale(x: int) -> int:
    return 255 - (x >> 8)

class Grid2D(object):
    """Grid2D class represents a simple 2D grid map. Each grid cell contains
    a quantized occupancy probability, which is incrementally updated by
    Binary Bayes Filter."""

    VALUE_MIN = 1
    VALUE_MAX = 65535
    PROBABILITY_MIN = value_to_probability(VALUE_MIN)
    PROBABILITY_MAX = value_to_probability(VALUE_MAX)
    LOG_ODDS_MIN = probability_to_log_odds(PROBABILITY_MIN)
    LOG_ODDS_MAX = probability_to_log_odds(PROBABILITY_MAX)

    def __init__(self, resolution: float, center_x: float, center_y: float,
                 num_of_cells_x: int, num_of_cells_y: int,
                 log_odds_hit: float, log_odds_miss: float) -> None:
        assert num_of_cells_x > 0
        assert num_of_cells_y > 0
        assert Grid2D.LOG_ODDS_MIN <= log_odds_miss <= Grid2D.LOG_ODDS_MAX
        assert Grid2D.LOG_ODDS_MIN <= log_odds_hit <= Grid2D.LOG_ODDS_MAX

        self.__resolution = resolution
        self.__min_x = center_x - resolution * num_of_cells_x / 2.0
        self.__min_y = center_y - resolution * num_of_cells_y / 2.0
        self.__center_x = center_x
        self.__center_y = center_y
        self.__num_of_cells_x = num_of_cells_x
        self.__num_of_cells_y = num_of_cells_y
        self.__log_odds_miss = log_odds_miss
        self.__log_odds_hit = log_odds_hit

        num_of_cells = num_of_cells_x * num_of_cells_y
        self.__values = [Grid2D.VALUE_MIN for _ in range(num_of_cells)]

    def resolution(self) -> float:
        return self.__resolution

    def num_of_cells_x(self) -> int:
        return self.__num_of_cells_x

    def num_of_cells_y(self) -> int:
        return self.__num_of_cells_y

    def log_odds_miss(self) -> float:
        return self.__log_odds_miss

    def log_odds_hit(self) -> float:
        return self.__log_odds_hit

    def shape(self) -> Tuple[int, int]:
        return self.__num_of_cells_x, self.__num_of_cells_y

    def min_pos(self) -> Tuple[float, float]:
        return self.__min_x, self.__min_y

    def center_pos(self) -> Tuple[float, float]:
        return self.__center_x, self.__center_y

    def flat_index(self, x: int, y: int) -> int:
        return y * self.__num_of_cells_x + x

    def is_index_inside(self, x: int, y: int) -> bool:
        return 0 <= x < self.__num_of_cells_x and \
               0 <= y < self.__num_of_cells_y

    def is_point_inside(self, p: Point2D) -> bool:
        return self.is_index_inside(*self.point_to_index(p))

    def index_to_point(self, x: int, y: int) -> Point2D:
        """Compute the minimum coordinates in the map frame"""
        return Point2D(self.__min_x + x * self.__resolution,
                       self.__min_y + y * self.__resolution)

    def point_to_index(self, p: Point2D) -> Tuple[int, int]:
        """Compute the index from the coordinates in the map frame"""
        idx_x = int((p.x - self.__min_x) // self.__resolution)
        idx_y = int((p.y - self.__min_y) // self.__resolution)
        return idx_x if p.x >= self.__min_x else idx_x - 1, \
               idx_y if p.y >= self.__min_y else idx_y - 1

    def point_to_index_float(self, p: Point2D) -> Tuple[float, float]:
        """Compute the index in the floating point in the map frame"""
        idx_x = (p.x - self.__min_x) / self.__resolution
        idx_y = (p.y - self.__min_y) / self.__resolution
        return idx_x, idx_y

    def get_index_value(self, x: int, y: int) -> int:
        assert self.is_index_inside(x, y)
        return self.__values[self.flat_index(x, y)]

    def get_index_probability(self, x: int, y: int) -> float:
        assert self.is_index_inside(x, y)
        return value_to_probability(self.__values[self.flat_index(x, y)])

    def get_point_value(self, p: Point2D) -> int:
        return self.get_index_value(*self.point_to_index(p))

    def get_point_probability(self, p: Point2D) -> float:
        return self.get_index_probability(*self.point_to_index(p))

    def get_neighbor_probabilities(self, p: Point2D):
        x, y = self.point_to_index_float(p)
        x0, y0 = int(x), int(y)
        x1, y1 = x0 + 1, y0 + 1
        dx, dy = x - x0, y - y0

        x0_valid = 0 <= x0 < self.__num_of_cells_x
        x1_valid = 0 <= x1 < self.__num_of_cells_x
        y0_valid = 0 <= y0 < self.__num_of_cells_y
        y1_valid = 0 <= y1 < self.__num_of_cells_y

        m00 = self.get_index_probability(x0, y0) \
              if x0_valid and y0_valid else 0.5
        m01 = self.get_index_probability(x0, y1) \
              if x0_valid and y1_valid else 0.5
        m10 = self.get_index_probability(x1, y0) \
              if x1_valid and y0_valid else 0.5
        m11 = self.get_index_probability(x1, y1) \
              if x1_valid and y1_valid else 0.5

        interpolated = dy * (dx * m11 + (1.0 - dx) * m01) + \
                       (1.0 - dy) * (dx * m10 + (1.0 - dx) * m00)

        return (x0, y0), (dx, dy), (m00, m01, m10, m11), interpolated

    def get_interpolated_probability(self, p: Point2D) -> float:
        return self.get_neighbor_probabilities(p)[3]

    def set_index_value(self, x: int, y: int, value: int) -> None:
        assert self.is_index_inside(x, y)
        self.__values[self.flat_index(x, y)] = self.__clamp_value(value)

    def set_index_probability(self, x: int, y: int, value: float) -> None:
        assert self.is_index_inside(x, y)
        self.__values[self.flat_index(x, y)] = \
             self.__clamp_value(probability_to_value(value))

    def set_point_value(self, p: Point2D, value: int) -> None:
        self.set_index_value(*self.point_to_index(p), value)

    def set_point_probability(self, p: Point2D, value: float) -> None:
        self.set_index_probability(*self.point_to_index(p), value)

    def __update_index_log_odds(self, x: int, y: int, value: float) -> None:
        assert self.is_index_inside(x, y)
        idx = self.flat_index(x, y)
        log_odds = value_to_log_odds(self.__values[idx]) + value
        self.__values[idx] = self.__clamp_value(log_odds_to_value(log_odds))

    def update_index_hit(self, x: int, y: int) -> None:
        self.__update_index_log_odds(x, y, self.__log_odds_hit)

    def update_index_miss(self, x: int, y: int) -> None:
        self.__update_index_log_odds(x, y, self.__log_odds_miss)

    def update_point_hit(self, p: Point2D) -> None:
        self.update_index_hit(*self.point_to_index(p))

    def update_point_miss(self, p: Point2D) -> None:
        self.update_index_miss(*self.point_to_index(p))

    def update_from_scans(self, pose: Pose2D, scan: List[Point2D]) -> None:
        pose_idx = self.point_to_index(Point2D(pose.x, pose.y))
        points = [project_point_2d(pose, p) for p in scan]
        scan_indices = [self.point_to_index(p) for p in points]
        ray_indices = [bresenham(*pose_idx, *idx) for idx in scan_indices]
        # hit_indices = [indices[-1] for indices in ray_indices]

        for indices in ray_indices:
            for missed_idx in indices[:-1]:
                if self.is_index_inside(*missed_idx):
                    self.update_index_miss(*missed_idx)

            if self.is_index_inside(*indices[-1]):
                self.update_index_hit(*indices[-1])

    def __clamp_value(self, x: int) -> int:
        return max(Grid2D.VALUE_MIN, min(Grid2D.VALUE_MAX, x))

    def to_bytes(self) -> bytes:
        return bytes([value_to_grayscale(x) for x in self.__values])

def sliding_window_max(in_func: Callable[[int], int],
                       out_func: Callable[[int, int], None],
                       num_of_elements: int, win_size: int) -> None:
    idx_queue, idx_in, idx_out = deque(), 0, 0

    # Process the first `win_size` elements (first window)
    while idx_in < win_size:
        # Previous smaller elements are useless so remove them from `idx_queue`
        while idx_queue and in_func(idx_in) >= in_func(idx_queue[-1]):
            idx_queue.pop()
        idx_queue.append(idx_in)
        idx_in += 1

    # `idx_queue[0]` contains index of the maximum element in the first window
    # Process the rest of the elements here
    while idx_in < num_of_elements:
        # The element pointed by the `idx_queue[0]` is the maximum element of
        # the previous window
        out_func(idx_out, in_func(idx_queue[0]))
        idx_out += 1

        # Remove the elements that are out of the current window
        while idx_queue and idx_queue[0] <= idx_in - win_size:
            idx_queue.popleft()

        # Remove all elements smaller than the current element
        while idx_queue and in_func(idx_in) >= in_func(idx_queue[-1]):
            idx_queue.pop()

        # Append the current element to `idx_queue`
        idx_queue.append(idx_in)
        idx_in += 1

    # Repeat the last elements
    while idx_out < num_of_elements:
        out_func(idx_out, in_func(idx_queue[0]))
        idx_out += 1

def compute_coarse_map_slow(grid_map: Grid2D, win_size: int) -> Grid2D:
    num_of_cells = grid_map.num_of_cells_x() * grid_map.num_of_cells_y()
    buffer = [0 for _ in range(num_of_cells)]
    coarse_map = Grid2D(grid_map.resolution(), *grid_map.center_pos(),
                        grid_map.num_of_cells_x(), grid_map.num_of_cells_y(),
                        grid_map.log_odds_hit(), grid_map.log_odds_miss())

    def get_buffer(idx_x: int, idx_y: int) -> int:
        return buffer[idx_y * grid_map.num_of_cells_x() + idx_x]

    def set_buffer(idx_x: int, idx_y: int, value: int) -> None:
        buffer[idx_y * grid_map.num_of_cells_x() + idx_x] = value

    def set_coarse_map(idx_x: int, idx_y: int, value: int) -> None:
        coarse_map.set_index_value(idx_x, idx_y, value)

    # Sliding window maximum for y-axis
    for x in range(grid_map.num_of_cells_x()):
        in_func_row = lambda y: grid_map.get_index_value(x, y)
        out_func_row = lambda y, max_value: set_buffer(x, y, max_value)
        sliding_window_max(in_func_row, out_func_row, \
                           grid_map.num_of_cells_y(), win_size)

    # Sliding window maximum for x-axis
    for y in range(grid_map.num_of_cells_y()):
        in_func_col = lambda x: get_buffer(x, y)
        out_func_col = lambda x, max_value: set_coarse_map(x, y, max_value)
        sliding_window_max(in_func_col, out_func_col, \
                           grid_map.num_of_cells_x(), win_size)

    return coarse_map

def compute_coarse_map_fast(grid_map: Grid2D, win_size: int) -> Grid2D:
    num_of_cells_x = grid_map.num_of_cells_x()
    num_of_cells_y = grid_map.num_of_cells_y()
    num_of_cells = num_of_cells_x * num_of_cells_y
    buffer = [0 for _ in range(num_of_cells)]
    coarse_map = Grid2D(grid_map.resolution(), *grid_map.center_pos(),
                        grid_map.num_of_cells_x(), grid_map.num_of_cells_y(),
                        grid_map.log_odds_hit(), grid_map.log_odds_miss())

    # Sliding window maximum for y-axis
    for x in range(num_of_cells_x):
        idx_queue, idx_in, idx_out = deque(), 0, 0

        while idx_in < win_size:
            while idx_queue and grid_map.get_index_value(x, idx_in) >= \
                grid_map.get_index_value(x, idx_queue[-1]):
                idx_queue.pop()
            idx_queue.append(idx_in)
            idx_in += 1

        while idx_in < num_of_cells_y:
            buffer[idx_out * num_of_cells_x + x] = \
                grid_map.get_index_value(x, idx_queue[0])
            idx_out += 1

            while idx_queue and idx_queue[0] <= idx_in - win_size:
                idx_queue.popleft()
            while idx_queue and grid_map.get_index_value(x, idx_in) >= \
                grid_map.get_index_value(x, idx_queue[-1]):
                idx_queue.pop()

            idx_queue.append(idx_in)
            idx_in += 1

        while idx_out < num_of_cells_y:
            buffer[idx_out * num_of_cells_x + x] = \
                grid_map.get_index_value(x, idx_queue[0])
            idx_out += 1

    # Sliding window maximum for x-axis
    for y in range(num_of_cells_y):
        idx_queue, idx_in, idx_out = deque(), 0, 0
        idx_offset = y * num_of_cells_x

        while idx_in < win_size:
            while idx_queue and buffer[idx_offset + idx_in] >= \
                buffer[idx_offset + idx_queue[-1]]:
                idx_queue.pop()
            idx_queue.append(idx_in)
            idx_in += 1

        while idx_in < num_of_cells_x:
            coarse_map.set_index_value(
                idx_out, y, buffer[idx_offset + idx_queue[0]])
            idx_out += 1

            while idx_queue and idx_queue[0] <= idx_in - win_size:
                idx_queue.popleft()
            while idx_queue and buffer[idx_offset + idx_in] >= \
                buffer[idx_offset + idx_queue[-1]]:
                idx_queue.pop()

            idx_queue.append(idx_in)
            idx_in += 1

        while idx_out < num_of_cells_x:
            coarse_map.set_index_value(
                idx_out, y, buffer[idx_offset + idx_queue[0]])
            idx_out += 1

    return coarse_map

def compute_coarse_map(grid_map: Grid2D, win_size: int) -> Grid2D:
    return compute_coarse_map_fast(grid_map, win_size)

分枝限定法によるスキャンマッチングの実装

続いて、分枝限定法によるスキャンマッチングの実装(ScanMatcherBranchBoundクラス)を示します。 ScanMatcherBranchBoundクラスのソースコードはこちらを参照してください。 アルゴリズムの説明は大変でしたが、実装自体は100行程度で済みます。

  • コンストラクタでは、アルゴリズムの各種パラメータを初期化します。 探索領域のサイズ$2W_x, 2W_y, 2W_\theta$をwindow_size_xwindow_size_ywindow_size_thetaの3つのメンバ、$\theta$方向のステップサイズの最小値$\delta_{\theta, \mathrm{min}}$をmin_step_thetaメンバ、ノードの最大の高さ$h_0$をmax_node_heightメンバ、$2^{h_0}$をmax_strideメンバに格納します。
  • compute_step()メソッドは、占有格子地図の解像度$r$と、スキャンデータ$\mathcal{S}$を基に、$x$、$y$、$\theta$方向のステップサイズ($r$および$\delta_\theta$)を計算します。 $$ \delta_\theta = \max \left( \delta_{\theta, \mathrm{min}}, \arccos \left( 1 - \frac{r^2}{2 d_\mathrm{max}^2} \right) \right) $$ $d_\mathrm{max}$は、LiDARセンサの中心から、スキャン点までの距離の最大値です。 スキャンデータ$\mathcal{S} = \left{ (r_1, \theta_1), \ldots, (r_N, \theta_N) \right}$について、$d_\mathrm{max} = \max_i r_i$となります。
  • compute_window()メソッドは、探索領域のサイズ$W_x, W_y, W_\theta$とステップサイズ$r, \delta_\theta$を基に、各軸方向の解の候補数$w_x, w_y, w_\theta$を計算します。 $$ w_x = \left\lceil \frac{W_x}{r} \right\rceil,
    w_y = \left\lceil \frac{W_y}{r} \right\rceil,
    w_\theta = \left\lceil \frac{W_\theta}{\delta_\theta} \right\rceil $$
  • compute_score()メソッドは、指定されたインデックスの格子がもつ占有確率を足し合わせて、スコアを計算します(詳細は後ほど説明します)。

match_scan()メソッドは、分枝限定法によるスキャンマッチングの本体です。 以前のページの末尾に示したアルゴリズムと照らし合わせて、ソースコードを詳しくみてみましょう。

  • 引数initial_posegrid_mapscanから、探索領域の中心に対応する姿勢$\mathbf{\xi}0 = \left[ \xi{0, x}, \xi_{0, y}, \xi_{0, \theta} \right]^\top$、占有格子地図$\mathcal{M}$(解像度$r$)、スキャンデータ$\mathcal{S}$を受け取ります。 以前のページでは、スキャンデータは極座標形式(距離$r_i$と方向$\theta_i$)で表現していましたが、この実装では直交座標($x, y$)を用いています。 あるスキャンが極座標で$\mathbf{z}_i = (r_i, \theta_i)$のように表されるとき、$(r_i \cos \theta_i, r_i \sin \theta_i)$とすれば直交座標に変換できます。 また、直交座標で$\mathbf{z}_i = (x_i, y_i)$のように表されるとき、$(\sqrt{x_i^2 + y_i^2}, \arctan \cfrac{y_i}{x_i})$とすれば極座標に変換できます。

  • 最初に、compute_step()メソッドとcompute_window()メソッドを使って、ステップサイズ$r, \delta_\theta$と探索領域のサイズ$w_x, w_y, w_\theta$を計算します。 変数step_xstep_ystep_thetaは、$r$, $r$, $\delta_\theta$に対応します。 また変数win_xwin_ywin_thetaは、$w_x$、$w_y$、$w_\theta$に対応します。 スコアの最大値$s^$と最適解$(j_x^, j_y^, j_\theta^) \in \overline{\mathcal{W}}$は、変数best_scorebest_xbest_ybest_thetaにそれぞれ保持します。 以前のページでは、次のように探索領域$\overline{\mathcal{W}}$を定義していました。 $$ \overline{\mathcal{W}} = \left{ 0, \ldots, 2w_x \right} \times \left{ 0, \ldots, 2w_y \right} \times \left{ 0, \ldots, 2w_\theta \right} $$ $$ \mathcal{W} = \left{ \mathcal{\xi}_0

    • \left[ r \left( -w_x + j_x \right), r \left( -w_y + j_y \right), \delta_\theta \left( -w_\theta + j_\theta \right) \right]^\top \mid \left( j_x, j_y, j_\theta \right) \in \overline{\mathcal{W}} \right} $$ このメソッド内では、以下の定義を採用しています(上記の$j_x, j_y, j_\theta$を、$j_x - w_x, j_y - w_y, j_\theta - w_\theta$に置き換えただけです)。 $$ \overline{\mathcal{W}} = \left{ -w_x, \ldots, w_x - 1\right} \times \left{ -w_y, \ldots, w_y - 1\right} \times \left{ -w_\theta, \ldots, w_\theta - 1\right} $$ $$ \mathcal{W} = \left{ \mathcal{\xi}_0
    • \left[ r j_x, r j_y, \delta_\theta j_\theta \right]^\top \mid \left( j_x, j_y, j_\theta \right) \in \overline{\mathcal{W}} \right} $$
  • 続いて、compute_coarse_map()メソッドを使って、与えられた地図$\mathcal{M}$から、$h_0$個の地図$\mathcal{M}\mathrm{precomp}^1, \ldots, \mathcal{M}\mathrm{precomp}^{h_0}$を計算します。 元々の地図$\mathcal{M}$と、計算した地図$\mathcal{M}\mathrm{precomp}^h$をまとめて、変数grid_mapsに格納します。 $$ \mathcal{M}\mathrm{precomp}^h(i, j) = \max_{\substack{i \le i' < i + 2^h \ j \le j' < j + 2^h}} \mathcal{M}(i', j') $$

  • その後、優先度付きキュー$\mathcal{C}$を作成します(ここでは優先度付きキューを用いましたが、単なるスタックで良いと思います)。 これは変数node_queueに対応します。 優先度付きキュー$\mathcal{C}$を、高さ$h_0$の一連のノード$\mathcal{C}0$で初期化します(探索領域の定義が少し異なるので、それに応じて$\overline{\mathcal{W}}0$と$\mathcal{C}0$の定義も変えています)。 ソースコードは変数xytの3重ループになっていますが、$\overline{\mathcal{W}}{0, x}$、$\overline{\mathcal{W}}{0, y}$、$\overline{\mathcal{W}}{0, \theta}$の各要素を順に辿っているわけです。 $$ \begin{eqnarray} \overline{\mathcal{W}}{0, x} &=& \left{ -w_x + 2^{h_0} j_x \mid j_x \in \mathbb{Z}, 0 \le 2^{h_0} j_x < 2 w_x \right} \ \overline{\mathcal{W}}{0, y} &=& \left{ -w_y + 2^{h_0} j_y \mid j_y \in \mathbb{Z}, 0 \le 2^{h_0} j_y < 2 w_y \right} \ \overline{\mathcal{W}}{0, \theta} &=& \left{ j\theta \mid j_\theta \in \mathbb{Z}, -w_\theta \le j_\theta < w_\theta \right} \ \mathcal{C}0 &=& \overline{\mathcal{W}}{0, x} \times \overline{\mathcal{W}}{0, y} \times \overline{\mathcal{W}}{0, \theta} \times \left{ h_0 \right} \end{eqnarray} $$ 各ノード$\mathbf{c} = \left( -w_x + 2^{h_0} j_x, -w_y + 2^{h_0} j_y, j_\theta, h_0 \right) \in \mathcal{C}0$について、append_node()関数を呼び出します。 引数xytは、$-w_x + 2^{h_0} j_x$、$-w_y + 2^{h_0} j_y$、$j\theta$に対応します。 また引数heightは$h_0$、引数indicesは以下で定義するインデックス${ (I_{i, x}^0, I_{i, y}^0) }$に対応します。 append_node()関数内では、スコアの上界$\overline{s}(\mathbf{c})$を次のような手順で計算し、変数scoreに格納します。 最初に、解の候補$(0, 0, j_\theta)$に対応する姿勢$\mathbf{\xi}(0, 0, j_\theta)$を計算し、変数poseに格納します。 $$ \mathbf{\xi}(0, 0, j_\theta) = \mathbf{\xi}0 + \left[ 0, 0, \delta\theta j_\theta \right]^\top = \left[ \xi_{0, x}, \xi_{0, y}, \xi_{0, \theta} + \delta_\theta j_\theta \right]^\top $$ この姿勢$\mathbf{\xi}(0, 0, j_\theta)$を基に、スキャン点$\mathbf{z}i = (r_i, \theta_i)$に対応する地図上の格子のインデックス$(I{i, x}^0, I_{i, y}^0)$を計算し、変数indicesに格納します。 $$ \begin{eqnarray} I_{i, x}^0 &=& \left\lfloor \frac{\xi_{0, x} + r_i \cos \left( \xi_{0, \theta} + \delta_\theta j_\theta

    • \theta_i \right)}{r} \right\rfloor \ I_{i, y}^0 &=& \left\lfloor \frac{\xi_{0, y} + r_i \sin \left( \xi_{0, \theta} + \delta_\theta j_\theta
    • \theta_i \right)}{r} \right\rfloor \end{eqnarray} $$ その後compute_score()メソッドを用いて、スコアの上界$\overline{s}(\mathbf{c})$を計算します。 compute_score()メソッドの引数grid_mapは$\mathcal{M}\mathrm{precomp}^{h_0}$、indicesはインデックス${ (I{i, x}^0, I_{i, y}^0) }$、またoffsetは$(-w_x + 2^{h_0} j_x, -w_y + 2^{h_0} j_y)$にそれぞれ対応しています。 $$ \overline{s}(\mathbf{c}) = \sum_{i = 1}^N \mathcal{M}\mathrm{precomp}^{h_0}(I{i, x}^0 - w_x + 2^{h_0} j_x, I_{i, y}^0 - w_y + 2^{h_0} j_y) $$ スコアの上界$\overline{s}(\mathbf{c})$が最も高いノード$\mathbf{c} \in \mathcal{C}0$が先頭に来るように、$\mathcal{C}0$に含まれる全てのノードを優先度付きキュー$\mathcal{C}$に追加していきます。 優先度付きキューには、ノード$\mathbf{c}$の4つのパラメータ$(-w_x + 2^{h_0} j_x, -w_y + 2^{h_0} j_y, j\theta, h_0)$(変数xythetaheight)のほかに、スコアの上界$\overline{s}(\mathbf{c})$(変数score)、そしてインデックス${ (I{i, x}^0, I_{i, y}^0) }$(変数indices)も追加しておきます。
  • 優先度付きキュー$\mathcal{C}$(変数node_queue)が空になるまで、以下の処理を繰り返します。

    • $\mathcal{C}$の先頭から、ノード$\mathbf{c} = (c_x, c_y, c_\theta, h) \in \overline{\mathcal{W}}$とスコアの上界$\overline{s}(\mathbf{c})$、そしてインデックス${ (I_{i, x}^0, I_{i, y}^0) }$を取得します。 $(I_{i, x}^0, I_{i, y}^0)$は、姿勢$\mathbf{\xi}(0, 0, c_\theta)$のもとで計算した、スキャン点$\mathbf{z}i = (r_i, \theta_i)$に対応する格子のインデックスです(上記の説明を確認してください)。 $c_x, c_y, c\theta, h$は変数xytheight、上界$\overline{s}(\mathbf{c})$は変数score、またインデックス${ (I_{i, x}^0, I_{i, y}^0) }$は変数indicesに格納されます。

    • スコアの上界$\overline{s}(\mathbf{c})$が、現在の最大スコア$s^*$(変数best_score)以下であれば、現在のノード$\mathbf{c}$と、その子ノードの探索は不要です(枝刈り)。

    • 現在のノード$\mathbf{c}$が葉ノードである場合(高さ$h = 0$)は、現在の最大スコア$s^$(変数best_score)を$\overline{s}(\mathbf{c})$、また最適解$(j_x^, j_y^, j_\theta^)$(変数best_xbest_ybest_theta)を$(c_x, c_y, c_\theta)$で更新します。

    • 現在のノード$\mathbf{c}$が葉ノードでなければ、4つの子ノード$\mathbf{c}1, \mathbf{c}2, \mathbf{c}3, \mathbf{c}4$に分割します。 $$ \begin{eqnarray} \mathbf{c}1 &=& (c_x, c_y, c\theta, h - 1) \ \mathbf{c}2 &=& (c_x + 2^{h - 1}, c_y, c\theta, h - 1) \ \mathbf{c}3 &=& (c_x, c_y + 2^{h - 1}, c\theta, h - 1) \ \mathbf{c}4 &=& (c_x + 2^{h - 1}, c_y + 2^{h - 1}, c\theta, h - 1) \end{eqnarray} $$ 変数new_xnew_ynew_heightsは、$c_x, c_y, h - 1, 2^{h - 1}$にそれぞれ対応します。 4つの子ノードに対してappend_node()関数が順に呼び出されます。 append_node()関数内では、スコアの上界$\overline{s}(\mathbf{c}1), \overline{s}(\mathbf{c}2), \overline{s}(\mathbf{c}3), \overline{s}(\mathbf{c}4)$を計算します。 $$ \begin{eqnarray} \overline{s}(\mathbf{c}1) &=& \sum{i = 1}^N \mathcal{M}\mathrm{precomp}^{h - 1}( I{i, x}^0 + c_x, I{i, y}^0 + c_y) \ \overline{s}(\mathbf{c}2) &=& \sum{i = 1}^N \mathcal{M}\mathrm{precomp}^{h - 1}( I{i, x}^0 + c_x + 2^{h - 1}, I{i, y}^0 + c_y) \ \overline{s}(\mathbf{c}3) &=& \sum{i = 1}^N \mathcal{M}\mathrm{precomp}^{h - 1}( I{i, x}^0 + c_x, I_{i, y}^0 + c_y + 2^{h - 1}) \ \overline{s}(\mathbf{c}4) &=& \sum{i = 1}^N \mathcal{M}\mathrm{precomp}^{h - 1}( I{i, x}^0 + c_x + 2^{h - 1}, I_{i, y}^0 + c_y + 2^{h - 1}) \ I_{i, x}^0 &=& \left\lfloor \frac{\xi_{0, x} + r_i \cos \left( \xi_{0, \theta} + \delta_\theta c_\theta

      • \theta_i \right)}{r} \right\rfloor \ I_{i, y}^0 &=& \left\lfloor \frac{\xi_{0, y} + r_i \sin \left( \xi_{0, \theta} + \delta_\theta c_\theta
      • \theta_i \right)}{r} \right\rfloor \end{eqnarray} $$ 4つの子ノードは、親ノード$\mathbf{c}$と同様の回転$c_\theta$を表しています。 従って、スコアの上界の計算時にインデックス${ (I_{i, x}^0, I_{i, y}^0) }$を再利用できます。 LiDAR座標系から地図座標系への座標変換と、地図座標から格子のインデックスへの変換の2つを省略できます。 上界の最も大きな子ノードが先頭に来るように、4つの子ノードを優先度付きキューに追加していきます(ノードの4つのパラメータに加えて、上界とインデックスも一緒に追加)。
  • 上記の手続きによって最適解$(j_x^, j_y^, j_\theta^) \in \overline{\mathcal{W}}$が得られたので、最適な姿勢$\mathbf{\xi}^ \in \mathcal{W}$を計算します(変数best_pose)。 $$ \mathbf{\xi}^* = \left[ \xi_{0, x} + r j_x^, \xi_{0, y} + r j_y^, \xi_{0, \theta} + \delta_\theta j_\theta^* \right]^\top \in \mathcal{W} $$

  • スコアの最大値$s^$、最適解$(j_x^, j_y^, j_\theta^) \in \overline{\mathcal{W}}$、最適な姿勢$\mathbf{\xi}^* \in \mathcal{W}$などをまとめて返します。

# coding: utf-8
# scan_matcher_branch_bound.py

import heapq
import math

from typing import Any, List, Tuple

from grid_2d import Grid2D, compute_coarse_map
from point_2d import Point2D
from pose_2d import Pose2D, project_point_2d, wrap_angle

class ScanMatcherBranchBound(object):
    """`ScanMatcherBranchBound` class implements branch-and-bound scan matching,
    which aligns a LiDAR scan and a 2D occupancy grid map. \n
    For more details, please refer to the following paper: \n
    Wolfgang Hess, Damon Kohler, Holger Rapp, and Daniel Andor.
    "Real-time Loop Closure in 2D LiDAR SLAM," in Proceedings of the IEEE
    International Conference on Robotics and Automation (ICRA), 2016.
    """

    class Result(object):
        def __init__(self, win: Tuple[int, int, int],
                     step: Tuple[float, float, float],
                     best: Tuple[int, int, int], best_score: float,
                     initial_pose: Pose2D, best_pose: Pose2D,
                     num_solutions: int, num_processed: int) -> None:
            self.win_x, self.win_y, self.win_theta = win
            self.step_x, self.step_y, self.step_theta = step
            self.best_x, self.best_y, self.best_theta = best
            self.best_score = best_score
            self.initial_pose = initial_pose
            self.best_pose = best_pose
            self.num_solutions = num_solutions
            self.num_processed = num_processed

        def dump(self) -> None:
            msg = "Statistics for branch-and-bound scan matcher\n" \
                  "Window size: ({}, {}, {})\n" \
                  "Step size: ({:.6f} m, {:.6f} m, {:.6f} rad)\n" \
                  "Best solution: ({}, {}, {})\n" \
                  "Best score: {}\n" \
                  "Initial pose: ({:.6f} m, {:.6f} m, {:.6f} rad)\n" \
                  "Final pose: ({:.6f} m, {:.6f} m, {:.6f} rad)\n" \
                  "# of possible solutions: {}\n" \
                  "# of solutions processed: {}\n"
            print(msg.format(self.win_x * 2, self.win_y * 2, self.win_theta * 2,
                             self.step_x, self.step_y, self.step_theta,
                             self.best_x, self.best_y, self.best_theta,
                             self.best_score,
                             self.initial_pose.x, self.initial_pose.y,
                             self.initial_pose.theta,
                             self.best_pose.x, self.best_pose.y,
                             self.best_pose.theta,
                             self.num_solutions, self.num_processed))

    def __init__(self, window_size_x: float, window_size_y: float,
                 window_size_theta: float, min_step_theta: float,
                 max_node_height: int) -> None:
        self.window_size_x = window_size_x
        self.window_size_y = window_size_y
        self.window_size_theta = window_size_theta
        self.min_step_theta = min_step_theta
        self.max_node_height = max_node_height
        self.max_stride = 1 << self.max_node_height

    def compute_step(self, grid_map: Grid2D, scan: List[Point2D]):
        ranges = [(p.x ** 2 + p.y ** 2) for p in scan]
        theta = grid_map.resolution() / max(ranges)

        step_x = grid_map.resolution()
        step_y = grid_map.resolution()
        step_theta = math.acos(1.0 - 0.5 * (theta ** 2))
        step_theta = max(step_theta, self.min_step_theta)

        return step_x, step_y, step_theta

    def compute_window(self, step_x: float, step_y: float, step_theta: float):
        return int(math.ceil(0.5 * self.window_size_x / step_x)), \
               int(math.ceil(0.5 * self.window_size_y / step_y)), \
               int(math.ceil(0.5 * self.window_size_theta / step_theta))

    def compute_score(self, grid_map: Grid2D, indices: List[Tuple[int, int]],
                      offset: Tuple[int, int]) -> int:
        score = 0
        for idx_base in indices:
            idx = (idx_base[0] + offset[0], idx_base[1] + offset[1])
            if grid_map.is_index_inside(*idx):
                score += grid_map.get_index_value(*idx)
        return score

    def match_scan(self, initial_pose: Pose2D, grid_map: Grid2D,
                   scan: List[Point2D]) -> Tuple[Pose2D, Any]:
        # Determine the search step and window size
        step = self.compute_step(grid_map, scan)
        win = self.compute_window(*step)
        step_x, step_y, step_theta = step
        win_x, win_y, win_theta = win
        best_x, best_y, best_theta, best_score = 0, 0, 0, 0

        num_solutions = (2 * win_x) * (2 * win_y) * (2 * win_theta)
        num_processed = 0

        # Compute coarse grid maps, which are analogous to image pyramids
        grid_maps = [grid_map]

        for i in range(self.max_node_height):
            grid_maps.append(compute_coarse_map(grid_map, 2 << i))

        # `node_queue` is a priority queue of nodes, where each node represents
        # a subregion in the 3D search window along x, y, and theta axes
        node_queue = []

        def append_node(x: int, y: int, theta: int,
                        height: int, indices: List[Tuple[int, int]]):
            score = self.compute_score(grid_maps[height], indices, (x, y))

            if score > best_score:
                # Store the negative score as a key, since `heapq.heappop()`
                # returns a node with the smallest key
                heapq.heappush(node_queue,
                    (-score, (x, y, theta, height, indices, score)))

        # Initialize a priority queue
        for t in range(-win_theta, win_theta):
            pose = Pose2D(initial_pose.x, initial_pose.y,
                          initial_pose.theta + t * step_theta)
            points = [project_point_2d(pose, p) for p in scan]
            indices = [grid_map.point_to_index(p) for p in points]

            for y in range(-win_y, win_y, self.max_stride):
                for x in range(-win_x, win_x, self.max_stride):
                    append_node(x, y, t, self.max_node_height, indices)

        while node_queue:
            # Get the node from the priority queue
            _, (x, y, t, height, indices, score) = heapq.heappop(node_queue)
            num_processed += 1

            if score <= best_score:
                # Skip the node if the score is below the best score so far
                continue

            if height == 0:
                # If the current node is a left, then update the solution
                best_x, best_y, best_theta, best_score = x, y, t, score
            else:
                # Otherwise, split the current node into four new nodes
                new_x, new_y, new_height = x, y, height - 1
                # Compute a new stride
                s = 1 << new_height
                append_node(new_x, new_y, t, new_height, indices)
                append_node(new_x + s, new_y, t, new_height, indices)
                append_node(new_x, new_y + s, t, new_height, indices)
                append_node(new_x + s, new_y + s, t, new_height, indices)

        best_pose = Pose2D(initial_pose.x + best_x * step_x,
                           initial_pose.y + best_y * step_y,
                           initial_pose.theta + best_theta * step_theta)
        best_pose.theta = wrap_angle(best_pose.theta)

        result = ScanMatcherBranchBound.Result(
            win, step, (best_x, best_y, best_theta), best_score,
            initial_pose, best_pose, num_solutions, num_processed)

        return best_pose, result

分枝限定法によるスキャンマッチングの動作例

その他の実装はGitHubのリポジトリを参照してください。 ここでは幾つかのデータセットを使った動作例を示します。 青色で示したスキャンは、姿勢の初期値を使って地図上に投影したものです(match_scan()メソッドの引数initial_pose)。 また赤色で示したスキャンは、姿勢の解を使って描画しています(match_scan()メソッドの変数best_pose)。

初期値と最適解がかなり大きく離れていますが、正しく動作していることが分かります。 ガウス・ニュートン法やレーベンバーグ・マーカート法などの、勾配を基にした逐次的なマッチング手法では、最適解への収束は期待できないでしょう。

  • データセット7

  • データセット8

  • データセット18

  • データセット25

  • データセット32

データセット18の例では、探索範囲のサイズを$25 \mathrm{m} \times 25 \mathrm{m} \times 0.2 \mathrm{rad}$、ステップサイズを$0.05 \mathrm{m}, 0.0025 \mathrm{rad}$に設定しているので、20,000,000個もの解の候補が存在します。 しかし、分枝限定法で調べたノード数は11,252だったので(ノードの高さは最大6に設定)、解の候補のうちたったの0.056%を探索するだけで済んでいます。 言い換えると、全ての解の候補をしらみつぶしに調べる場合と比べて、1,777倍高速化されているということです。

これらを試すには、リポジトリ内のmain.pyを実行してください。 main()関数の先頭に書かれている変数data_idxは、データセットの番号(7、8、18、25、32のいずれか)を表します。 また変数perturb_xperturb_yperturb_thetaは、初期値を最適解からどれだけ移動させるかを決定します。