!apt -y -q install tree
!pip install wfdb==2.2.1 scikit-learn==0.20.1 imbalanced-learn==0.4.3
import os
import random
import numpy as np
import chainer
import scipy
import pandas as pd
import matplotlib
import seaborn as sn
import wfdb
import sklearn
import imblearn
chainer.print_runtime_info()
print("Scipy: ", scipy.__version__)
print("Pandas: ", pd.__version__)
print("Matplotlib: ", matplotlib.__version__)
print("Seaborn: ", sn.__version__)
print("WFDB: ", wfdb.__version__)
print("Scikit-learn: ", sklearn.__version__)
print("Imbalanced-learn: ", imblearn.__version__)
def reset_seed(seed=42):
random.seed(seed)
np.random.seed(seed)
if chainer.cuda.available:
chainer.cuda.cupy.random.seed(seed)
reset_seed(42)
dataset_root = './dataset'
download_dir = os.path.join(dataset_root, 'download')
wfdb.dl_database('mitdb', dl_dir=download_dir)
Finished downloading files

This message is shown once the download has completed. The downloaded files can then be listed:

print(sorted(os.listdir(download_dir)))
Each record in the download directory consists of three kinds of files:

.dat : the signal (binary format)
.atr : the annotations (binary format)
.hea : the header (required to read the binary files)

The BaseECGDatasetPreprocessor class defined below carries out the preprocessing and provides the following methods:

__init__() (constructor) : initializes variables, the rule for splitting records into training and test sets, and the rule for aggregating the labels to use
_load_data() : reads the signal and its annotations
_normalize_signal() : scales the signal according to the method option
_segment_data() : cuts the loaded signal and annotations into segments of fixed width (window_size)
preprocess_dataset() : creates the training and test data
_preprocess_dataset_core() : the main processing called inside preprocess_dataset()

class BaseECGDatasetPreprocessor(object):
def __init__(
self,
dataset_root,
window_size=720, # 2 seconds
):
self.dataset_root = dataset_root
self.download_dir = os.path.join(self.dataset_root, 'download')
self.window_size = window_size
self.sample_rate = 360.
# split list
self.train_record_list = [
'101', '106', '108', '109', '112', '115', '116', '118', '119', '122',
'124', '201', '203', '205', '207', '208', '209', '215', '220', '223', '230'
]
self.test_record_list = [
'100', '103', '105', '111', '113', '117', '121', '123', '200', '210',
'212', '213', '214', '219', '221', '222', '228', '231', '232', '233', '234'
]
# annotation
self.labels = ['N', 'V']
self.valid_symbols = ['N', 'L', 'R', 'e', 'j', 'V', 'E']
self.label_map = {
'N': 'N', 'L': 'N', 'R': 'N', 'e': 'N', 'j': 'N',
'V': 'V', 'E': 'V'
}
def _load_data(
self,
base_record,
channel=0 # [0, 1]
):
record_name = os.path.join(self.download_dir, str(base_record))
# read dat file
signals, fields = wfdb.rdsamp(record_name)
assert fields['fs'] == self.sample_rate
# read annotation file
annotation = wfdb.rdann(record_name, 'atr')
symbols = annotation.symbol
positions = annotation.sample
return signals[:, channel], symbols, positions
def _normalize_signal(
self,
signal,
method='std'
):
if method == 'minmax':
# Min-Max scaling
min_val = np.min(signal)
max_val = np.max(signal)
return (signal - min_val) / (max_val - min_val)
elif method == 'std':
# Zero mean and unit variance
signal = (signal - np.mean(signal)) / np.std(signal)
return signal
else:
raise ValueError("Invalid method: {}".format(method))
def _segment_data(
self,
signal,
symbols,
positions
):
X = []
y = []
sig_len = len(signal)
for i in range(len(symbols)):
start = positions[i] - self.window_size // 2
end = positions[i] + self.window_size // 2
if symbols[i] in self.valid_symbols and start >= 0 and end <= sig_len:
segment = signal[start:end]
assert len(segment) == self.window_size, "Invalid length"
X.append(segment)
y.append(self.labels.index(self.label_map[symbols[i]]))
return np.array(X), np.array(y)
def preprocess_dataset(
self,
normalize=True
):
# preprocess training dataset
self._preprocess_dataset_core(self.train_record_list, "train", normalize)
# preprocess test dataset
self._preprocess_dataset_core(self.test_record_list, "test", normalize)
def _preprocess_dataset_core(
self,
record_list,
mode="train",
normalize=True
):
Xs, ys = [], []
save_dir = os.path.join(self.dataset_root, 'preprocessed', mode)
for i in range(len(record_list)):
signal, symbols, positions = self._load_data(record_list[i])
if normalize:
signal = self._normalize_signal(signal)
X, y = self._segment_data(signal, symbols, positions)
Xs.append(X)
ys.append(y)
os.makedirs(save_dir, exist_ok=True)
np.save(os.path.join(save_dir, "X.npy"), np.vstack(Xs))
np.save(os.path.join(save_dir, "y.npy"), np.concatenate(ys))
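As a quick sanity check (not part of the original pipeline; the values below are made up), the private helpers can be exercised on a short synthetic signal:

pre = BaseECGDatasetPreprocessor(dataset_root, window_size=8)
toy_signal = pre._normalize_signal(np.sin(np.linspace(0, 4 * np.pi, 32)))  # zero mean, unit variance
X_toy, y_toy = pre._segment_data(toy_signal, ['N', 'V'], np.array([10, 20]))
print(X_toy.shape, y_toy)  # (2, 8) [0 1]: one 8-sample window centered on each valid beat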
Running preprocess_dataset() saves the preprocessed data as NumPy arrays in the designated location.

BaseECGDatasetPreprocessor(dataset_root).preprocess_dataset()
!tree ./dataset/preprocessed
./dataset/preprocessed
├── test
│   ├── X.npy
│   └── y.npy
└── train
    ├── X.npy
    └── y.npy

2 directories, 4 files
X_train = np.load(os.path.join(dataset_root, 'preprocessed', 'train', 'X.npy'))
y_train = np.load(os.path.join(dataset_root, 'preprocessed', 'train', 'y.npy'))
X_test = np.load(os.path.join(dataset_root, 'preprocessed', 'test', 'X.npy'))
y_test = np.load(os.path.join(dataset_root, 'preprocessed', 'test', 'y.npy'))
print("X_train.shape = ", X_train.shape, " \t y_train.shape = ", y_train.shape)
print("X_test.shape = ", X_test.shape, " \t y_test.shape = ", y_test.shape)
uniq_train, counts_train = np.unique(y_train, return_counts=True)
print("y_train count each labels: ", dict(zip(uniq_train, counts_train)))
uniq_test, counts_test = np.unique(y_test, return_counts=True)
print("y_test count each labels: ", dict(zip(uniq_test, counts_test)))
%matplotlib inline
import matplotlib.pyplot as plt
idx_n = np.where(y_train == 0)[0]
plt.plot(X_train[idx_n[0]])
idx_s = np.where(y_train == 1)[0]
plt.plot(X_train[idx_s[0]])
class ECGDataset(chainer.dataset.DatasetMixin):
def __init__(
self,
path
):
if os.path.isfile(os.path.join(path, 'X.npy')):
self.X = np.load(os.path.join(path, 'X.npy'))
else:
raise FileNotFoundError("{}/X.npy not found.".format(path))
if os.path.isfile(os.path.join(path, 'y.npy')):
self.y = np.load(os.path.join(path, 'y.npy'))
else:
raise FileNotFoundError("{}/y.npy not found.".format(path))
def __len__(self):
return len(self.X)
def get_example(self, i):
return self.X[None, i].astype(np.float32), self.y[i]
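As a small usage sketch (assuming the preprocessed files created above exist; the names sample_x and sample_t are introduced only for illustration), the dataset can be inspected directly:

train_path = os.path.join(dataset_root, 'preprocessed', 'train')
sample_x, sample_t = ECGDataset(train_path).get_example(0)
print(sample_x.shape, sample_x.dtype, sample_t)  # (1, 720) float32: one channel, window_size samples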
import chainer.functions as F
import chainer.links as L
from chainer import reporter
from chainer import Variable
class BaseBlock(chainer.Chain):
def __init__(
self,
channels,
stride=1,
dilate=1
):
self.stride = stride
super(BaseBlock, self).__init__()
with self.init_scope():
self.c1 = L.ConvolutionND(1, None, channels, 3, stride, dilate, dilate=dilate)
self.c2 = L.ConvolutionND(1, None, channels, 3, 1, dilate, dilate=dilate)
if stride > 1:
self.cd = L.ConvolutionND(1, None, channels, 1, stride, 0)
self.b1 = L.BatchNormalization(channels)
self.b2 = L.BatchNormalization(channels)
def __call__(self, x):
h = F.relu(self.b1(self.c1(x)))
if self.stride > 1:
res = self.cd(x)
else:
res = x
h = res + self.b2(self.c2(h))
return F.relu(h)
class ResBlock(chainer.Chain):
def __init__(
self,
channels,
n_block,
dilate=1
):
self.n_block = n_block
super(ResBlock, self).__init__()
with self.init_scope():
self.b0 = BaseBlock(channels, 2, dilate)
for i in range(1, n_block):
bx = BaseBlock(channels, 1, dilate)
setattr(self, 'b{}'.format(str(i)), bx)
def __call__(self, x):
h = self.b0(x)
for i in range(1, self.n_block):
h = getattr(self, 'b{}'.format(str(i)))(h)
return h
class ResNet34(chainer.Chain):
def __init__(self):
super(ResNet34, self).__init__()
with self.init_scope():
self.conv1 = L.ConvolutionND(1, None, 64, 7, 2, 3)
self.bn1 = L.BatchNormalization(64)
self.resblock0 = ResBlock(64, 3)
self.resblock1 = ResBlock(128, 4)
self.resblock2 = ResBlock(256, 6)
self.resblock3 = ResBlock(512, 3)
self.fc = L.Linear(None, 2)
def __call__(self, x):
h = F.relu(self.bn1(self.conv1(x)))
h = F.max_pooling_nd(h, 3, 2)
for i in range(4):
h = getattr(self, 'resblock{}'.format(str(i)))(h)
h = F.average(h, axis=2)
h = self.fc(h)
return h
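Before wrapping the network for training, a quick shape check on CPU with random input can be useful. This is only an illustrative sketch, not part of the tutorial pipeline:

dummy = np.random.randn(2, 1, 720).astype(np.float32)  # (batch, channel, length)
scores = ResNet34()(dummy)
print(scores.shape)  # (2, 2): one raw score per class for each example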
class Classifier(chainer.Chain):
def __init__(
self,
predictor,
lossfun=F.softmax_cross_entropy
):
super(Classifier, self).__init__()
with self.init_scope():
self.predictor = predictor
self.lossfun = lossfun
def __call__(self, *args):
assert len(args) >= 2
x = args[:-1]
t = args[-1]
y = self.predictor(*x)
# loss
loss = self.lossfun(y, t)
with chainer.no_backprop_mode():
# other metrics
accuracy = F.accuracy(y, t)
# reporter
reporter.report({'loss': loss}, self)
reporter.report({'accuracy': accuracy}, self)
return loss
def predict(self, x):
with chainer.function.no_backprop_mode(), chainer.using_config('train', False):
x = Variable(self.xp.asarray(x, dtype=self.xp.float32))
y = self.predictor(x)
return y
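The same kind of check works for the wrapper. The snippet below is a hedged sketch with made-up inputs, showing that __call__ returns a scalar loss while predict returns raw class scores:

clf = Classifier(ResNet34())
x = np.random.randn(4, 1, 720).astype(np.float32)
t = np.array([0, 1, 0, 1], dtype=np.int32)
print(float(clf(x, t).data))   # scalar softmax cross-entropy loss
print(clf.predict(x).shape)    # (4, 2) raw scores, computed without backprop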
Next, two helper functions are defined:

create_train_dataset() : wraps the preprocessed training data in the ECGDataset class
create_trainer() : performs the setup required for training and returns a Trainer object

from chainer import optimizers
from chainer.optimizer import WeightDecay
from chainer.iterators import MultiprocessIterator
from chainer import training
from chainer.training import extensions
from chainer.training import triggers
from chainer.backends.cuda import get_device_from_id
def create_train_dataset(root_path):
train_path = os.path.join(root_path, 'preprocessed', 'train')
train_dataset = ECGDataset(train_path)
return train_dataset
def create_trainer(
batchsize, train_dataset, nb_epoch=1,
device=0, lossfun=F.softmax_cross_entropy
):
# setup model
model = ResNet34()
train_model = Classifier(model, lossfun=lossfun)
# use Adam optimizer
optimizer = optimizers.Adam(alpha=0.001)
optimizer.setup(train_model)
optimizer.add_hook(WeightDecay(0.0001))
# setup iterator
train_iter = MultiprocessIterator(train_dataset, batchsize)
# define updater
updater = training.StandardUpdater(train_iter, optimizer, device=device)
# setup trainer
stop_trigger = (nb_epoch, 'epoch')
trainer = training.trainer.Trainer(updater, stop_trigger)
logging_attributes = [
'epoch', 'iteration',
'main/loss', 'main/accuracy'
]
trainer.extend(
extensions.LogReport(logging_attributes, trigger=(2000 // batchsize, 'iteration'))
)
trainer.extend(
extensions.PrintReport(logging_attributes)
)
trainer.extend(
extensions.ExponentialShift('alpha', 0.75, optimizer=optimizer),
trigger=(4000 // batchsize, 'iteration')
)
return trainer
train_dataset = create_train_dataset(dataset_root)
trainer = create_trainer(256, train_dataset, nb_epoch=1, device=0)
%time trainer.run()
For evaluation, the following functions are defined:

create_test_dataset() : loads the evaluation data
predict() : runs inference and returns arrays of the true and predicted labels
print_confusion_matrix() : displays a table called the confusion matrix computed from the predictions
print_scores() : prints evaluation metrics of the prediction accuracy

from chainer import cuda
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
def create_test_dataset(root_path):
test_path = os.path.join(root_path, 'preprocessed', 'test')
test_dataset = ECGDataset(test_path)
return test_dataset
def predict(trainer, test_dataset, batchsize, device=-1):
model = trainer.updater.get_optimizer('main').target
ys = []
ts = []
for i in range(len(test_dataset) // batchsize + 1):
if i == len(test_dataset) // batchsize:
X, t = zip(*test_dataset[i*batchsize: len(test_dataset)])
else:
X, t = zip(*test_dataset[i*batchsize:(i+1)*batchsize])
X = cuda.to_gpu(np.array(X), device)
y = model.predict(X)
y = cuda.to_cpu(y.data.argmax(axis=1))
ys.append(y)
ts.append(np.array(t))
return np.concatenate(ts), np.concatenate(ys)
def print_confusion_matrix(y_true, y_pred):
labels = sorted(list(set(y_true)))
target_names = ['Normal', 'VEB']
cmx = confusion_matrix(y_true, y_pred, labels=labels)
df_cmx = pd.DataFrame(cmx, index=target_names, columns=target_names)
plt.figure(figsize = (5,3))
sn.heatmap(df_cmx, annot=True, annot_kws={"size": 18}, fmt="d", cmap='Blues')
plt.show()
def print_scores(y_true, y_pred):
target_names = ['Normal', 'VEB']
print(classification_report(y_true, y_pred, target_names=target_names))
print("accuracy: ", accuracy_score(y_true, y_pred))
test_dataset = create_test_dataset(dataset_root)
%time y_true_test, y_pred_test = predict(trainer, test_dataset, 256, 0)
print_confusion_matrix(y_true_test, y_pred_test)
print_scores(y_true_test, y_pred_test)
              precision    recall  f1-score   support

      Normal       0.99      0.94      0.96     42149
         VEB       0.52      0.90      0.66      3200

   micro avg       0.94      0.94      0.94     45349
   macro avg       0.76      0.92      0.81     45349
weighted avg       0.96      0.94      0.94     45349

accuracy:  0.9351694634942336
To address the class imbalance, we define a SampledECGDataset class that resamples the training data, and prepare a create_sampled_train_dataset() function that constructs it.

from imblearn.datasets import make_imbalance
from imblearn.over_sampling import SMOTE
class SampledECGDataset(ECGDataset):
def __init__(
self,
path
):
super(SampledECGDataset, self).__init__(path)
_, counts = np.unique(self.y, return_counts=True)
self.X, self.y = make_imbalance(
self.X, self.y,
sampling_strategy={0: counts[0]//4, 1: counts[1]}
)
smote = SMOTE(random_state=42)
self.X, self.y = smote.fit_sample(self.X, self.y)
def create_sampled_train_dataset(root_path):
train_path = os.path.join(root_path, 'preprocessed', 'train')
train_dataset = SampledECGDataset(train_path)
return train_dataset
train_dataset = create_sampled_train_dataset(dataset_root)
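As an optional check (a sketch; exact numbers depend on the downloaded records), the resampled label counts can be printed to confirm that make_imbalance plus SMOTE yields a roughly balanced training set:

print(dict(zip(*np.unique(train_dataset.y, return_counts=True))))  # both classes should now have similar counts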
trainer = create_trainer(256, train_dataset, nb_epoch=2, device=0)
%time trainer.run()
%time y_true_test, y_pred_test = predict(trainer, test_dataset, 256, 0)