Hankel Singular Value Decompositionによる時系列データの前処理
HSVD
Hankel Singular Value Decomposition(HSVD)は時系列データを低周波と高周波の2つの成分に分解する手法です.HSVDを使用して時系列データの前処理をします.
Embedding
まず最初に,個の時系列データ を以下のような窓サイズのハンケル行列H(正確にはハンケル行列は正方行列なのでちょっと違います...)に変換します.
Decomposition
得られたハンケル行列を特異値分解して低ランク近似をして低周波成分を抽出します.
低ランク近似したハンケル行列をとします.
Unembedding
低周波成分を以下のように抽出します.
]
以下の画像のように赤い要素が低周波成分になります.
高周波成分は元のデータから低周波成分を引いたものになります.
例
例としてsin波形にノイズを加えたデータをHSVDで前処理を施します.
データ
分解
低周波
高周波
まとめ
上の画像から元のデータを低周波と高周波に分解できていることが分かると思います. 元のデータ = 低周波 + 高周波 なのでそれぞれの成分を時系列予測アルゴリズムにかけるなどの応用が可能です.
実装
def moving_window_matrix(x,window_size): # Fork from https://qiita.com/bauer/items/48ef4a57ff77b45244b6 n = x.shape[0] stride = x.strides[0] return np.lib.stride_tricks.as_strided(x, shape=(n-window_size+1, window_size), strides=(stride,stride) ).copy() def hsvd(x, window, rank): m = moving_window_matrix(x, window) u, s, vh = np.linalg.svd(m) h = u[:,:rank] @ np.diag(s[:rank]) @ vh[:rank,:] c = h[0,:] c = np.append(c, h[1:,-1]) return c, x-c
実装をgithubにあげました
Usage
import numpy as np from tfilter import hsvd N = 500 x = np.sin(np.arange(N) * np.pi/50.0) x = x + np.random.normal(0, 0.3, size=N) window = 100 rank = 2 low_freq, high_freq = hsvd(x, window, rank)
参考
しょぼん(´・ω・`)基底をGPLVMで獲得する
データの作成
以下の画像から,しょぼん(´・ω・`)基底を作成します.
まず,画像を読み込みます.
import numpy as np from PIL import Image image = np.asarray(Image.open('./data/shobon.png').convert('L'))
次に,2値化を行いしょぼん(´・ω・`)基底を作成します.
# binarization a = np.where(image < 240) shobon = np.c_[a[1], np.flipud(a[0])] # shobon.shape = (896, 2)
プロットすると以下のような,しょぼん(´・ω・`)基底が得られたことが確認できます.
高次元変換
ガウス分布に従う重みを用いて,100次元の高次元にしょぼん(´・ω・`)基底を変換します. そしてノイズを加えておきます.
dim = 100 w = np.random.normal(size=2*dim).reshape(2, dim) high = shobon @ w high = high + np.random.normal(size=high.shape[0]*high.shape[1]).reshape(high.shape[0], high.shape[1]) # high.shape = (896, 100)
GPLVMによる低次元埋め込み
Gaussian Process Latent Variable Models(GPLVM)とはガウス過程を利用して潜在変数空間に変換する教師なし学習のアルゴリズムです. 応用例として以下の動画のように人間のポーズを低次元埋め込みするなどの試みがあります.
今回はGPLVMを使うためにGPyを使用します.
GPLVMを使って100次元のデータを2次元の低次元埋め込みをします.
# latent gplvm = GPLVM(high, 2) latent = gplvm.X # latent.shape = (896, 2)
結果
得られた潜在変数をプロットすると以下のようになります.
しょぼん(´・ω・`)基底と似たものが獲得できたので成功ですね!!
実装
参考
Higher-Order-SVDで画像の低ランク近似
HOSVD
SVDは行列の低ランク近似手法であり,Higher-Order-SVD(HOSVD)はSVDをテンソルに拡張した手法です. HOSVDではタッカー分解というコアテンソルとそれぞれのモードの行列に分解する方法を使用します.以下にその概要を図示しました.
方法
の3階テンソルを例にしてタッカー分解をする方法を説明します.まず最初に,,,のそれぞれの行列に分解しSVDをします.
上記の式から,分解されたユニタリ行列からコアテンソルを以下のように定義します.
元のテンソルに復元するためには,以下のように再度ユニタリ行列を使います.
上記のユニタリ行列を任意のランクに切り取ることで,低ランク近似が可能となります.
例
3階テンソルであるカラー画像を使って,画像の低ランク近似をしてみます.使用する画像は画像処理でよく使われるレナ画像を使用します.
オリジナル画像
Rank1の画像
Rank5の画像
Rank10の画像
Rank20の画像
Rank30の画像
Rank50の画像
Rank100の画像
実装コード
参考
PyTorchで最急降下法による最適化
余談
Differentiable Programming(微分可能プログラミング)という捉え方がとても素晴らしいと個人的に思います. 詳しくは以下のブログを見てください.
最急降下法による最適化
機械学習フレームワークのPyTorchを使って単純に最急降下法で関数の最適化をする. 最適化する関数は10000000次元のsphere関数を使用する.
コード
# -*- coding: utf-8 -*- import torch import torch.nn as nn import torch.optim as optim # 評価関数 def _sphere(x): return torch.pow(x, 2).sum() class Model(nn.Module): def __init__(self, dim=100, func=None): super(Model, self).__init__() if func is None: func = _sphere self.func = func # 設計変数 self.x = nn.Parameter(torch.rand(dim)) def forward(self): return self.func(self.x) def vars(self): return self.x.detach().numpy() def objective(self): with torch.no_grad(): return self.func(self.x).numpy() # 次元は10000000次元 model = Model(dim=10000000) optimizer = optim.SGD(model.parameters(), lr=0.1) N = 100 print('初期の評価値', model.objective()) print('optimization...') for i in range(N): output = model() optimizer.zero_grad() loss = output loss.backward() optimizer.step() print('\rloss:', loss.item(), end='') print() print('評価値', model.objective())
実行結果
初期の評価値 3331740.8 optimization... loss: 2.1601908358880734e-13 評価値 1.382506e-13
gist
参考
GPyTorchで多クラス分類
GPyTorchとは
PyTorch上で実装されたガウス過程(GP)のライブラリである. ハイパーパラメータをPyTorchのシステムを使用してGPUなどで最適化できるのが特徴である. GPは逆行列を計算する際に の計算量がかかり,スケーラビリティに難があるが,PyTorchのシステムを使ってそれを補おうとしている.
GPでは尤度を変更することで回帰や二値分類,多クラス分類が可能となる. 今回はアヤメ(iris)データセットを用いて3クラスの多クラス分類をGPyTorchで分類してみる.
コード
# -*- coding: utf-8 -*- import torch import gpytorch import numpy as np from gpytorch.models import AbstractVariationalGP from gpytorch.variational import CholeskyVariationalDistribution from gpytorch.mlls.variational_elbo import VariationalELBO from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split class GPMultiClassificationModel(AbstractVariationalGP): def __init__(self, num_dim=2, grid_bounds=(-10., 10.), grid_size=64): variational_distribution = gpytorch.variational.CholeskyVariationalDistribution( num_inducing_points=grid_size, batch_size=num_dim ) variational_strategy = gpytorch.variational.AdditiveGridInterpolationVariationalStrategy( self, grid_size=grid_size, grid_bounds=[grid_bounds], num_dim=num_dim, variational_distribution=variational_distribution, mixing_params=False, sum_output=False ) super().__init__(variational_strategy) self.mean_module = gpytorch.means.ConstantMean() self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel()) def forward(self, x): mean_x = self.mean_module(x) covar_x = self.covar_module(x) latent_pred = gpytorch.distributions.MultivariateNormal(mean_x, covar_x) return latent_pred device = 'cuda' if torch.cuda.is_available() else 'cpu' iris = load_iris() # 特徴量は2次元 # testデータは3割 (train_x, test_x, train_y, test_y) = train_test_split(iris.data[:, [0, 2]].astype(np.float32), iris.target, test_size=0.3, random_state=0,) # 3クラス分類 n_labels = len(np.unique(train_y)) train_x = torch.from_numpy(train_x).to(device) train_y = torch.from_numpy(train_y).to(device) test_x = torch.from_numpy(test_x).to(device) test_y = torch.from_numpy(test_y).to(device) model = GPMultiClassificationModel(num_dim=2).to(device) # 多クラス分類なのでSoftmaxを使用する likelihood = gpytorch.likelihoods.SoftmaxLikelihood(num_classes=n_labels, num_features=2).to(device) # 周辺尤度 mll = VariationalELBO(likelihood, model, train_y.numel()) optimizer = torch.optim.Adam(model.parameters(), lr=0.1) # train model.train() likelihood.train() training_iter = 10000 for i in range(training_iter): optimizer.zero_grad() output = model(train_x) loss = -mll(output, train_y) loss.backward() print('\rIter %d/%d - Loss: %.3f' % (i + 1, training_iter, loss.item()), end='') optimizer.step() print('') # test model.eval() likelihood.eval() correct = 0.0 # sample数を64とする with torch.no_grad(), gpytorch.settings.num_likelihood_samples(64): data = test_x target = test_y output = likelihood(model(data)) pred = output.probs.mean(0).argmax(-1) correct += pred.eq(target.view_as(pred)).cpu().sum() acc = correct.numpy() / float(len(test_x)) * 100.0 print('Accuracy:', acc)
出力結果
Iter 10000/10000 - Loss: 0.397 Accuracy: 86.66666666666667
gist
参考
カーネル密度推定とエントロピー
カーネル密度推定をしてエントロピーを計算する方法を忘備録として書いておく.
標準正規分布を例にとって計算してみる. ちなみに理論値は
import numpy as np import matplotlib.pyplot as plt from scipy import integrate from scipy.stats import norm from scipy.stats import gaussian_kde x = norm.rvs(size=10000) plt.hist(x, bins=50) plt.title('sampling') plt.show() k = gaussian_kde(x) X = np.linspace(-5, 5, 100) plt.plot(X, norm().pdf(X), label='norm') plt.plot(X, k(X), label='kernel estimation') plt.legend() plt.title('pdf') plt.show() def entropy(pdf): def f(x): px = pdf(x) return - px * np.log(px) return f entropy_k = entropy(k) print('entropy:') print('カーネル密度推定', integrate.quad(entropy_k, -5.0, 5.0)) print('理論値', np.log(np.sqrt(2*np.pi*np.e)))
出力結果
entropy: カーネル密度推定 (1.4426170314750473, 1.2370907729071283e-08) 理論値 1.4189385332046727
gist
Pykkaでping pong
Pythonでactor model使いたかったので忘備録として
Pykka
Pythonのactor model frameworkです.
インストール方法
pip install pykka
Actorの実装方法
標準のThreadによる実装やgeventなどがある.
ドキュメントに gevent
の方が一般的に Thread
よりも早いことが記載されている.
gevent実装のActorを使用する場合,geventのインストール方法
pip install gevent
通信方法
ask
は同期通信,tell
は非同期通信となっている.
ask
の場合は返り値を受け取れるが,tell
の場合は None
が返り値となる.
ping pong
pingというメッセージに対してpongを返すサンプルを書いてみる.
以下は threading.Thread
の実装
from pykka import ThreadingActor class Actor(ThreadingActor): def __init__(self): super().__init__() def on_receive(self, message): print('recieve:', message) return 'pong' actor = Actor.start() r = actor.tell('ping') # None print('tell:', r) r = actor.ask('ping') # pong print('ask:', r) actor.stop() ''' tell: None recieve: ping recieve: ping ask: pong '''
以下は gevent
の実装
from pykka.gevent import GeventActor class Gactor(GeventActor): def __init__(self): super().__init__() def on_receive(self, message): print('recieve:', message) return 'pong' gactor = Gactor.start() r = gactor.tell('ping') print('tell:', r) r = gactor.ask('ping') print('ask:', r) gactor.stop() ''' tell: None recieve: ping recieve: ping ask: pong '''