コンピュータサイエンス系勉強ノート

計算機科学に限らず日々学んだことを色々まとめていきます

確率伝播法(Belief Propagation)を実装してみた(Python)

確率伝播法とは?

確率伝播法とは信念伝播法(Belief Propagation)ともよばれ,ベイジアンネットワークやマルコフ確率場などのグラフィカルモデル上で各ノードが持つ状態の周辺分布を効率的に求めるためのアルゴリズムです.元々はこの周辺分布を求めようとするとノード数がNで状態数がKとすると,計算量が O(K^N)となり,ノード数が多くなると有限時間で計算が出来なくなります.でもこの確率伝播法を使うと O(NK^2)となり有限時間で計算できるようになります.この周辺分布が求められると何が便利かというと,グラフの構造によっては各ノードの最適な状態が周辺分布を使って求められたり,最適解でなくともそれに近い解を求められることです.今回はPythonでこの確率伝播法を組んでみたので順を追ってそのコードの解説を行っていきたいと思います.確率伝播法に関する数学的な説明はググったらいくらでも出てくると思うのでそれに関してはそちらの方に譲りたいと思います.

使用データ

f:id:clientver2:20160812012057p:plain

今回は二値化したレナさんの画像とそれにノイズを掛けた画像を使用します.このノイズがかかった画像を確率伝播法を用いてデノイジングを行い原画像を復元します.

ソースコード解説

まずは原画像に対してノイズを掛けます.コードは以下の様になります.

def addNoise(image):
    output = np.copy(image)
    flags  = np.random.binomial(n=1, p=0.05, size=image.shape)

    for i in range(image.shape[0]):
        for j in range(image.shape[1]):
            if flags[i,j]:
                output[i,j] = not(output[i,j])

    return output

次に画像中の各画素をマルコフ確率場上のノードとみなし,マルコフ確率場を構築します.まずマルコフ確率場クラスを作成します.

class MRF:
    def __init__(self):
        self.nodes = [] #MRF上のノード
        self.id = {} #ノードのID
    
    #MRFにノードを追加する
    def addNode(self, id, node):
        self.nodes.append(node)
        self.id[id] = node
    
    #IDに応じたノードを返す
    def getNode(self, id):
        return self.id[id]
    
    #全部のノードを返す
    def getNodes(self):
        return self.nodes
    
    #確率伝播を開始する
    def beliefPropagation(self, iter=20):
        
        #各ノードについて隣接ノードからのメッセージを初期化
        for node in self.nodes:
            node.initializeMessage()
        
        #一定回数繰り返す
        for t in range(iter):
            print(t)
            
            #各ノードについて,そのノードに隣接するノードへメッセージを送信する
            for node in self.nodes:
                for neighbor in node.getNeighbor():
                    neighbor.message[node] = node.sendMessage(neighbor)
        
        #各ノードについて周辺分布を計算する
        for node in self.nodes:
            node.marginal()

このMRFクラスで特に注目すべきはbeliefPropagation関数です.画像上でのマルコフ確率場はループ構造をもったネットワークなのでメッセージ送信を複数回繰り返す必要があります.今回はそれをiter回繰り返します.送信を繰り返す前に各ノードが隣接するノードから受け取るメッセージを1で初期化しておきます.送信ループに入った後は隣接ノードへsendMessageメソッドでメッセージを送信し続け,最後に各ノードについて自分が持つ隣接ノードからのメッセージを統合し,周辺分布を計算します.これを行うのがmarginalメソッドです.


次にノードクラスを定義します.

class Node(object):
    def __init__(self, id):
        self.id = id
        self.neighbor = []
        self.message = {}
        self.prob = None
        
        #エネルギー関数用パラメータ
        self.alpha = 10.0
        self.beta = 5.0

    def addNeighbor(self, node):
        self.neighbor.append(node)

    def getNeighbor(self):
        return self.neighbor
    
    #隣接ノードからのメッセージを初期化
    def initializeMessage(self):
        for neighbor in self.neighbor:
            self.message[neighbor] = np.array([1.0, 1.0])
    
    #全てのメッセージを統合
    #probは周辺分布
    def marginal(self):
        prob = 1.0

        for message in self.message.values():
            prob *= message

        prob /= np.sum(prob)
        self.prob = prob
    
    #隣接ノードの状態を考慮した尤度を計算
    def sendMessage(self, target):
        neighbor_message = 1.0
        for neighbor in self.message.keys():
            if neighbor != target:
                neighbor_message *= self.message[neighbor]

        compatibility_0 = np.array([np.exp(-self.beta * np.abs(0.0 - 0.0)), np.exp(-self.beta * np.abs(0.0 - 1.0))])
        compatibility_1 = np.array([np.exp(-self.beta * np.abs(1.0 - 0.0)), np.exp(-self.beta * np.abs(1.0 - 1.0))])

        message = np.array([np.sum(neighbor_message * compatibility_0), np.sum(neighbor_message * compatibility_1)])
        message /= np.sum(message)

        return message
    
    #観測値から計算する尤度
    def calcLikelihood(self, value):
        likelihood = np.array([0.0, 0.0])

        if value == 0:
            likelihood[0] = np.exp(-self.alpha * 0.0)
            likelihood[1] = np.exp(-self.alpha * 1.0)
        else:
            likelihood[0] = np.exp(-self.alpha * 1.0)
            likelihood[1] = np.exp(-self.alpha * 0.0)

        self.message[self] = likelihood


ここの詳しい解説は気が向いたら載せます!!

次はマルコフ確率場を構築するための関数を作成します.

#各画素ごとにノードを作成し,隣接画素ノードとの接続を作成する
def generateBeliefNetwork(image):
    network = MRF()
    height, width = image.shape

    for i in range(height):
        for j in range(width):
            nodeID = width * i + j
            node = Node(nodeID)
            network.addNode(nodeID, node)

    dy = [-1, 0, 0, 1]
    dx = [0, -1, 1, 0]

    for i in range(height):
        for j in range(width):
            node = network.getNode(width * i + j)

            for k in range(4):
                if i + dy[k] >= 0 and i + dy[k] < height and j + dx[k] >= 0 and j + dx[k] < width:
                    neighbor = network.getNode(width * (i + dy[k]) + j + dx[k])
                    node.addNeighbor(neighbor)

    return network


最後はメイン関数です

import numpy as np
import cv2
import matplotlib.pyplot as plt
from skimage.filters import threshold_otsu

def main():
    #使用データ
    image = cv2.imread("Lenna.png", 0)
    binary = image > threshold_otsu(image).astype(np.int)
    noise = addNoise(binary)
    
    #MRF構築
    network = generateBeliefNetwork(image)
    
    #観測値(画素値)から尤度を作成
    for i in range(image.shape[0]):
        for j in range(image.shape[1]):
            node = network.getNode(image.shape[1] * i + j)
            node.calcLikelihood(noise[i,j])
    
    #確率伝播法を行う
    network.beliefPropagation()
    
    #周辺分布は[0の確率,1の確率]の順番
    #もし1の確率が大きければoutputの画素値を1に変える
    output = np.zeros(noise.shape)

    for i in range(output.shape[0]):
        for j in range(output.shape[1]):
            node = network.getNode(output.shape[1] * i + j)
            prob = node.prob
            if prob[1] > prob[0]:
                output[i,j] = 1
    
    #結果表示
    plt.gray()
    plt.subplot(121)
    plt.imshow(noise)
    plt.subplot(122)
    plt.imshow(output)
    plt.show()

実行結果

f:id:clientver2:20160812022405p:plain

確かに殆どのノイズが除去されて原画像が復元されていることが分かります.数式自体は結構難しいですが,プログラムで組んでみると結構簡単に組めます.今回は状態数が2値の離散的な場合を取り扱いましたが,トラッキングなどの連続的な状態を取り扱おうとすると連続変数版の確率伝播法を用いる必要があります.様々なものが提案されていますが,比較的簡単に組めるMean shift Belief Propagationを次回は組んでみたいと思います.

マルコフ確率場を用いたノイズ除去を組んでみた(Python)

確率伝播法を使う必要がでてきたので,まずはマルコフ確率場についての勉強をしました.今回はマルコフ確率場上で画像上のノイズを除去するサンプルプログラムを組んでみました.

ソースコード

#encoding:utf-8
import numpy as np
import cv2
import matplotlib.pyplot as plt
from skimage.filters import threshold_otsu

def MRF(image):
    iter = 30
    height, width = image.shape
    dy = [-1,-1,-1,0,0,1,1,1]
    dx = [-1,0,1,-1,1,-1,0,1]
    reference = np.copy(image)
    alpha = 2.5
    beta = 4.0

    for t in range(iter):
        print(t)

        prob0 = np.zeros(reference.shape)
        prob1 = np.zeros(reference.shape)

        for i in range(1, height - 1):
            for j in range(1, width - 1):
                for k in range(len(dy)):
                    if reference[i+dy[k],j+dx[k]] == 1:
                        prob1[i,j] -= beta
                        prob0[i,j] += beta
                    else:
                        prob1[i,j] += beta
                        prob0[i,j] -= beta

        prob1[image == 1] -= alpha
        prob0[image == 0] -= alpha

        reference[prob1 > prob0] = 0
        reference[prob1 < prob0] = 1

    return reference


def get_corrupted_input(img, corruption_level):
    corrupted = np.copy(img)
    inv = np.random.binomial(n=1, p=corruption_level, size=img.shape)
    for r in range(img.shape[0]):
        for c in range(img.shape[1]):
            if inv[r, c]:
                corrupted[r, c] = ~(corrupted[r, c].astype(bool))
    return corrupted

def main():
    image = cv2.imread("Lenna.png", 0)
    binary = image > threshold_otsu(image)
    noise = get_corrupted_input(binary.astype(np.int), 0.05)
    denoise = MRF(noise)

    plt.gray()
    plt.subplot(131)
    plt.imshow(image)
    plt.subplot(132)
    plt.imshow(noise)
    plt.subplot(133)
    plt.imshow(denoise)
    plt.show()

if __name__ == '__main__':
    main()

実行結果

f:id:clientver2:20160622005909p:plain

オプティカルフローの実装(Python)

オプティカルフローを求めるための方法の1つであるLucas Kanade法を実装してみました.

勾配法

Lucal Kanade法を実装する前にまず勾配法について学ぶ必要があります.勾配法とは,連続する2枚の画像において追跡対象の物体の移動量は微小であると仮定し,オプティカルフローを求める方法です.まず時刻tにおける画像上の輝度値をI(x,y,t)としましょう.そして時間Δtの間にその画素が(Δx,Δy)動いたとしましょう.すると,

 I(x,y,t) = I(x + Δx, y + Δy, t + Δt) (1)

そしてIの微小変化を表現するするために,(1式)をテイラー展開で一次近似します.x,y,tが変化した分だけIも変化しその増分が加えられることを意味しています

 I(x,y,t) ≒ I(x,y,t) + \frac{∂I}{∂x}Δx + \frac{∂I}{∂y}Δy + \frac{∂I}{∂t}Δt (2)

ここで(2)について両辺のI(x,y,t)はほぼ同じとみなせるので移行して打ち消せます.そしてΔtで割ると,

\frac{Δx}{Δt}\frac{∂I}{∂x}+\frac{Δy}{Δt}\frac{∂I}{∂y}+\frac{∂I}{∂t}=0 (3)

そしてΔtが限りなく0に近いと仮定すると微分の形にもっていけます.つまり,

\frac{∂x}{∂t}\frac{∂I}{∂x}+\frac{∂y}{∂t}\frac{∂I}{∂y}+\frac{∂I}{∂t}=0 (4)

\frac{∂x}{∂t}=u\frac{∂y}{∂t}=vが時間変化に対する画素の移動方向を表しており,これがオプティカルフローとなります.また,\frac{∂I}{∂t}=I_x\frac{∂I}{∂y}=I_yが時間変化に対する輝度値の変化となります.そして最終的には,


I_xu+I_yv=-I_t (5)

と表され,時間変化に対するIの変化は,画素値の勾配と画素のオプティカルフローとの内積で表されることになります.

ここでオプティカルフロー(u,v)を求めたいわけですが,未知変数2つに対して式が一つしかありません.なのでこのままでは解けません.しかし,Lucal Kanade法ではある仮定を置くことで式の数を増やして(u,v)を求めることができます.


Lucal Kanade法

ではその仮定とは何かというと,注目画素の周辺の局所領域では(u,v)の値は同じだという仮定です.


f:id:clientver2:20160601001357p:plain:w500

こっからの数式は気力がなくなったのでそのうち書きます.今のところは他のサイトを参照・・

ソースコード

汚いコードですんません.なんか間違ってる気がしますが大体こんな感じです.

from scipy import ndimage
import cv2
import numpy as np

def opticalFlow(pre, next):
    image1 = cv2.cvtColor(pre, cv2.COLOR_RGB2GRAY).astype(np.float)
    image2 = cv2.cvtColor(next, cv2.COLOR_RGB2GRAY).astype(np.float)
    
    #輝度勾配
    dx = ndimage.gaussian_filter(image1, sigma=1.0, order=(0,1))
    dy = ndimage.gaussian_filter(image1, sigma=1.0, order=(1,0))
    
    #Iの時間変化 つまり次の時間の画像と前時間の画像との差分が時間変化
    de = image2 - image1

    height, width = image1.shape

    for i in range(1, height-1, 20):
        for j in range(1, width-1, 20):
            #局所領域(3*3)
            patchDx = dx[i-1:i+1+1,j-1:j+1+1]
            patchDy = dy[i-1:i+1+1,j-1:j+1+1]
            patchDe = de[i-1:i+1+1,j-1:j+1+1]

            A = np.zeros((2,2))
            A[0,0] = np.sum(patchDx**2)
            A[0,1] = np.sum(patchDx * patchDy)
            A[1,0] = A[0,1]
            A[1,1] = np.sum(patchDy**2)

            B = np.zeros((2))
            B[0] = -np.sum(patchDx * patchDe)
            B[1] = -np.sum(patchDy * patchDe)
            
            #ランクが2じゃないと逆行列もとめられん
            if np.linalg.matrix_rank(A) == 2:
                u, v = np.linalg.solve(A, B)
                #フローを描画
                next = cv2.line(next, (j,i), (int(j+u), int(i+v)), (255,0,0), 1)

    return next


def main():
    cap = cv2.VideoCapture(0)
    cv2.namedWindow("frame")
    preFrame = None

    while(True):
        ret, frame = cap.read()

        if preFrame != None:
            flow = opticalFlow(preFrame, frame)
            cv2.imshow("frame", flow)

        if cv2.waitKey(20) & 0xFF == 27:
            break

        preFrame = frame
    cap.release()
    cv2.destroyAllWindows()

main()

平均情報量でクラスタリングの曖昧さを図ってみる(python)

混合ガウス分布クラスタリングを行う際に任意の点がどのクラスタに属するかの所属度を使って,その点がどのクラスタに属するかの曖昧さを平均情報量で求めてみました.何かに使えるかもしれないのでメモっておきます

まずサンプル点を生成する

まずクラスタリング用のサンプル点を生成します.4クラスタに分かれてます.

#encoding:utf-8

#平均
mu1 = [-2,-2]
mu2 = [2,2]
#共分散
cov = [[0.1,0.0],[0.0,0.1]]

#500はデータ数
x1,y1 = np.random.multivariate_normal(np.array([1.0,1.0]),cov,2000).T
x2,y2 = np.random.multivariate_normal(np.array([1.0,3.0]),cov,2000).T
x3,y3 = np.random.multivariate_normal(np.array([3.0,1.0]),cov,2000).T
x4,y4 = np.random.multivariate_normal(np.array([3.0,3.0]),cov,2000).T

plt.plot(x1,y1, "ro")
plt.plot(x2,y2, "go")
plt.plot(x3,y3, "bo")
plt.plot(x4,y4, "yo")
plt.show()

f:id:clientver2:20160217173106p:plain

混合ガウスモデルでサンプル点の所属度を求める

#encoding:utf-8

#平均
mu1 = [-2,-2]
mu2 = [2,2]
#共分散
cov = [[0.1,0.0],[0.0,0.1]]

#500はデータ数
x1,y1 = np.random.multivariate_normal(np.array([1.0,1.0]),cov,2000).T
x2,y2 = np.random.multivariate_normal(np.array([1.0,3.0]),cov,2000).T
x3,y3 = np.random.multivariate_normal(np.array([3.0,1.0]),cov,2000).T
x4,y4 = np.random.multivariate_normal(np.array([3.0,3.0]),cov,2000).T

Y = np.concatenate([y1,y2,y3,y4])
X = np.concatenate([x1,x2,x3,x4])
N = np.vstack([Y,X]).T

n_components = 4
gmm = sklearn.mixture.GMM(n_components, covariance_type='full')
gmm.fit(N)

Y, X = np.meshgrid(np.linspace(1,3,500), np.linspace(1,3,500))
prob = gmm.predict_proba(np.vstack([Y.reshape(-1), X.reshape(-1)]).T)
prob = np.sum(-prob * np.log2(prob), axis=1).reshape(500,500)
plt.imshow(prob)
plt.show()

f:id:clientver2:20160217173119p:plain

考察

平均情報量の値が高い部分が十字状に広がっていることが分かります.また,平均情報量の値は中心付近が最も高くなっていますね.これは中心付近がどのクラスタに属するのかが曖昧だということを示しています.どのクラスタ中心にも距離的に近いのでどのクラスタに属しているかははっきりとは分からないということです.

パーティクルフィルタ組んでみた(Python)

研究でトラッキングをする必要がでてきたので,トラッキング分野では非常によく用いられているパーティクルフィルタについて勉強しました.それについてまとめておこうと思います.

パーティクルフィルタとは?

時系列データ中の観測データがあるパラメータに従って確率的に生成されるとします.その観測値だけを元にパラメータを推定したいとします.パーティクルフィルタでは複数のパラメータ(パーティクルもしくは粒子とよばれる)を生成して,このパラメータが観測値を生成するのに相応しいか尤度を計算して有望なパラメータだけを残し,残ったパラメータの平均をとることで真のパラメータを推定します.最尤推定法などでは解析的に尤度の高いパラメータを推定しますが,パーティクルフィルタだと解析的に求められない場合でもパラメータを推定できます.

パーティクルフィルタの流れ

パーティクルフィルタは以下のような流れでパラメータの推定を行います.

  1. パーティクル初期化
  2. パーティクルの状態遷移
  3. 各パーティクルの重み(尤度)計算
  4. 重みの正規化
  5. パーティクルのリサンプリング
  6. 期待値計算
  7. 2~6を繰り返す

まず①でパーティクルを初期化します.適当に玉をばらまいておくイメージです.次に②でパーティクルの状態を変化させます.パラメータは時間経過に伴ってなんらかの規則に従って変化するかもしれません.その規則をモデル化してこのパーティクルの状態遷移に組み込みます.例えば物体のトラッキングをしているときには時間経過に従って物体は移動します.等速で適当に動くかもしれません.その場合は適当な速度を位置パラメータに加えることになります.③の尤度計算では各パーティクルの尤度を計算します.④で重み(尤度)を正規化して⑤でパーティクルのリサンプリングを行います.大抵のリサンプリングでは重みにしたがって確率的にパーティクルを選びます.選ばれなかったパーティクルは捨てられることになります.最後に⑥の期待値計算を行います.リサンプリング後のパーティクルの平均値で計算できます.その後同じように②~⑥を繰り返すことになります.

パーティクルフィルタでトラッキングしてみる

パーティクルフィルタのpythonコードは以下の様になります.汚くてすいません.

#encoding:utf-8
import numpy as np
import cv2

def tracking():
    #カメラ
    cap = cv2.VideoCapture(0)
    #パーティクルフィルタ初期化
    filter = ParticleFilter()
    filter.initialize()

    while True:
        ret, frame = cap.read()
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        y, x = filter.filtering(gray)#トラッキング
        frame = cv2.circle(frame, (int(x), int(y)), 10, (0, 0, 255), -1)
        for i in range(filter.SAMPLEMAX):
            frame = cv2.circle(frame, (int(filter.X[i]), int(filter.Y[i])), 2, (0, 0, 255), -1)
        cv2.imshow("frame", frame)
        if cv2.waitKey(20) & 0xFF == 27:
            break
    cap.release()
    cv2.destroyAllWindows()

#パーティクルフィルタクラス
class ParticleFilter:
    def __init__(self):
        self.SAMPLEMAX = 1000
        self.height, self.width = 480, 640#フレーム画像のサイズ

    #パーティクル初期化
    #画像全体にまく
    def initialize(self):
        self.Y = np.random.random(self.SAMPLEMAX) * self.height
        self.X = np.random.random(self.SAMPLEMAX) * self.width

    #パーティクルの状態の更新 物体が適当な速さで適当に動くと仮定
    def modeling(self):
        self.Y += np.random.random(self.SAMPLEMAX) * 20 - 10
        self.X += np.random.random(self.SAMPLEMAX) * 20 - 10

    #重み正規化
    def normalize(self, weight):
        return weight / np.sum(weight)

    #パーティクルリサンプリング
    #重みに従ってパーティクルを選択
    #残ったパーティクルのインデックスを返す
    def resampling(self, weight):
        index = np.arange(self.SAMPLEMAX)
        sample = []

        for i in range(self.SAMPLEMAX):
            idx = np.random.choice(index, p=weight)
            sample.append(idx)

        return sample

    #尤度計算
    #画像の外に飛んでいったパーティクルは重み0
    #白い物体を仮定
    def calcLikelihood(self, image):
        mean, std = 250.0, 10.0
        intensity = []

        for i in range(self.SAMPLEMAX):
            y, x = self.Y[i], self.X[i]
            if y >= 0 and y < self.height and x >= 0 and x < self.width:
                intensity.append(image[y,x])
            else:
                intensity.append(-1)

        weights = 1.0 / np.sqrt(2 * np.pi * std) * np.exp(-(np.array(intensity) - mean)**2 /(2 * std**2))
        weights[intensity == -1] = 0
        weights = self.normalize(weights)
        return weights

    #トラッキング開始
    #期待値を返す
    def filtering(self, image):
        self.modeling()
        weights = self.calcLikelihood(image)
        index = self.resampling(weights)
        self.Y = self.Y[index]
        self.X = self.X[index]
        
        return np.sum(self.Y) / float(len(self.Y)), np.sum(self.X) / float(len(self.X))

tracking()

実行結果

実行した際のスナップショットです.白い物体にパーティクルが集まっていることが分かります.早く動かしすぎるとパーティクルがどっかいきます.ゆっくり動かしてあげるとそのまま白い物体の上にパーティクルが残ったままになります.とりあえずトラッキングできたことにしときましょう.

f:id:clientver2:20160208195111p:plain

Iterative Closest Point を試してみた

今回はIterative Closest Point(ICP)アルゴリズムを試してみました.ICPは点群Aと点群Bが与えられた際に点群Bを回転と平行移動させて点群Aに位置合わせするためのアルゴリズムだそうです。

具体的なアルゴリズムは以下の通り。

  1. 点群Bの各点について点群Aの中から最近傍の点を見つける。
  2. 点群Bの各点とAの各点が対応しているので、各対応点に移るような変換行列を計算する。
  3. 変換行列を使ってBを写像する。
  4. 指定回数分もしくは収束するまで以上の処理を繰り返す。

f:id:clientver2:20151105132537p:plain

ソースコード

各ステップでの回転行列と平行移動行列の計算は以下のコードで計算しました.コードの詳細は以下のサイトを参照のこと.
Finding optimal rotation and translation between corresponding 3D points | Nghia Ho

def calcRigidTranformation(MatA, MatB):
    A, B = copy(MatA), copy(MatB)

    centroid_A = mean(A, axis=0)
    centroid_B = mean(B, axis=0)

    A -= centroid_A
    B -= centroid_B

    H = dot(A.T, B)
    U, S, V = linalg.svd(H)
    R = dot(V.T, U.T)
    T = dot(-R, centroid_A) + centroid_B

    return R, T

ICPのコードはこちら

from scipy.spatial import KDTree
import numpy as np
from math import sin, cos

class ICP(object):
    def __init__(self, pointsA, pointsB):
        self.pointsA = pointsA
        self.pointsB = pointsB
        self.kdtree = KDTree(self.pointsA)

    def calculate(self, iter):
        old_points = np.copy(self.pointsB)
        new_points = np.copy(self.pointsB)

        for i in range(iter):
            neighbor_idx = self.kdtree.query(old_points)[1]
            targets = self.pointsA[neighbor_idx]
            R, T = calcRigidTranformation(old_points, targets)
            new_points = np.dot(R, old_points.T).T + T

            if  np.sum(np.abs(old_points - new_points)) < 0.000000001:
                break

            old_points = np.copy(new_points)

        return new_points

def icp_test():
    Y, X = np.mgrid[0:100:5, 0:100:5]
    Z = Y ** 2 + X ** 2
    A = np.vstack([Y.reshape(-1), X.reshape(-1), Z.reshape(-1)]).T

    R = np.array([
        [cos(50), -sin(50), 0],
        [sin(50), cos(50), 0],
        [0, 0, 1]
    ])

    T = np.array([5.0, 20.0, 10.0])
    B = np.dot(R, A.T).T + T

    icp = ICP(A, B)
    points = icp.calculate(3000)

    from matplotlib import pyplot
    from mpl_toolkits.mplot3d import Axes3D

    fig = pyplot.figure()
    ax = Axes3D(fig)

    ax.set_label("x - axis")
    ax.set_label("y - axis")
    ax.set_label("z - axis")

    ax.plot(A[:,1], A[:,0], A[:,2], "o", color="#cccccc", ms=4, mew=0.5)
    ax.plot(points[:,1], points[:,0], points[:,2], "o", color="#00cccc", ms=4, mew=0.5)
    ax.plot(B[:,0], B[:,1], B[:,2], "o", color="#ff0000", ms=4, mew=0.5)

    pyplot.show()

icp_test()

出力結果

f:id:clientver2:20151127160400p:plain
f:id:clientver2:20151127160401p:plain

灰色の点が元々の点群Aで,赤いのがそれを変換した点群Bです.黄緑色の点はBをAに位置合わせしたもので,ちゃんと重なっていることが分かります.

平均値シフトでトラッキングしてみた(Python)

平均値シフトでトラッキングをしてみました.理屈はよくわかりませんが,最初にトラッキング対象を囲むようなウインドウを作成し,ウインドウ内でトラッキング対象の特徴量を計算してあげます.フレームが更新されるとウインドウ内の各画素でどれくらい特徴量に近いかの重みを計算してあげて,重みを使って重心を計算し,その重心を中心としたウインドウを作成することを繰り返してトラッキング対象を追跡するそうです.

重み計算

ウインドウの中心で一番重みが大きく,中心から離れるにつれて重みが小さくなるような関数を使用するそうです.すぐ思いつくのはガウス関数だと思います.ガウス関数だと下図のような感じで重みが広がっています.

import scipy.ndimage.filters as fi
import matplotlib.pyplot as plt

input = np.zeros((500, 500))
input[500/2, 500/2] = 1
output = fi.gaussian_filter(input, 100)

plt.imshow(output)
plt.show()

f:id:clientver2:20151122134834p:plain

こういった重み関数をカーネル関数とよぶらしく,収束を保証するためのものらしいです.さらにこの重みにトラッキング対象の特徴量を表す重みを掛けてあげる必要があります.今回は初期ウインドウ内で正規化ヒストグラムを使って,どの画素値をもっていればトラッキング対象らしいかを表す確率のようなものを計算してあげます.画像処理の世界ではバックプロジェクションとよぶそうです.

最終的に重心計算の際には,ウインドウ内の各画素を x_i (i=0...N)カーネル関数の重みを g_i,特徴量の重みを w_iとすると重心 Pは,

 P = \frac{\sum_{i=0}^{N}x_iw_ig_i}{\sum_{i=0}^{N}w_ig_i}
みたいな感じになるそうです.求めた重心を中心にまたウインドウを作成し,また重心を求めるを繰り返して物体を追跡します.

プログラムを組む

プログラムは以下の様になります.opencvのカメラ処理のところはどうでもいいです.

import numpy as np
import cv2
import scipy.stats as st

class CameraGui(object):
    def __init__(self):
        self.pos1, self.pos2 = None, None
        self.image = None
        self.patch_height, self.patch_width = None, None
        self.kernel = None

    def start(self):
        cap = cv2.VideoCapture(0)
        cv2.namedWindow("frame")
        cv2.setMouseCallback("frame", self.mouseFunc)

        while(True):
            ret, frame = cap.read()
            self.image = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            draw = self.tracking(self.image)
            if draw != None:
                cv2.imshow("frame", draw)
            else:
                cv2.imshow("frame", self.image)
            if cv2.waitKey(20) & 0xFF == 27:
                break
        self.cap.release()
        cv2.destroyAllWindows()

    def mouseFunc(self, event, x, y, flags, param):
        if event == cv2.EVENT_LBUTTONDOWN:
            if self.pos1 == None:
                self.pos1 = [y, x]
            elif self.pos2 == None:
                self.pos2 = [y, x]
                self.hist = cv2.calcHist([self.image[self.pos1[0]:self.pos2[0], self.pos1[1]:self.pos2[1]]], [0], None, [256], [0, 256]).astype(np.float)
                self.hist /= np.sum(self.hist)
                self.patch_height = self.pos2[0] - self.pos1[0]
                self.patch_width = self.pos2[1] - self.pos1[1]
                self.kernel = self.calc_gkern(self.patch_height, self.patch_width)

    def tracking(self, image):
        if self.pos1 != None and self.pos2 != None:
            center = self.calcWeight(image)
            draw = cv2.cvtColor(np.copy(image), cv2.COLOR_GRAY2RGB)
            pt1 = (center[0] - self.patch_height / 2, center[1] - self.patch_width / 2)
            pt2 = (center[0] + self.patch_height / 2, center[1] + self.patch_width / 2)
            draw = cv2.circle(draw, (center[1], center[0]), 50, (0, 0, 255), -1)
            self.pos1 = pt1
            self.pos2 = pt2
            return draw
        return None

    def calcWeight(self, image):
        patch = image[self.pos1[0]:self.pos1[0]+self.patch_height,self.pos1[1]:self.pos1[1]+self.patch_width]
        height, width = patch.shape
        weight = np.copy(self.kernel)

        #back projection
        for i in range(height):
            for j in range(width):
                weight[i, j] *= self.hist[patch[i, j]]
    
    #calculate center of mass
        Y, X = np.mgrid[:height, :width]
        Y, X = Y * weight, X * weight
        W = np.sum(weight)
        centerY, centerX = np.sum(Y) / W, np.sum(X) / W

        return [int(self.pos1[0] + centerY), int(self.pos1[1] + centerX)]

    def calc_gkern(self, height, width):
        import scipy.ndimage.filters as fi
        inp = np.zeros((height, width))
        inp[height//2, width//2] = 1
        return fi.gaussian_filter(inp, 50)

def main():
    gui = CameraGui()
    gui.start()

main()

結果

一応トラッキングはできました.物体を早く動かすとダメでした.適当に組んでるので改良の余地は大いにあります.気が向いたらちゃんと組みます.