やましなぶろぐ

kerasのmodel.fit_generatorを使用する

🕒 Last mod: 2020-10-23


1. model.fit_generator

keras.models.Modelはfitとは別にfit_generatorというメソッドを持っている。

ジェネレータを使用したプログラムを始めて見たときに処理の流れを把握できずに戸惑った記憶が残っている。

大量な画像などを学習するDeepLearningではメモリの有効利用の観点からジェネレータを使用を迫られることがあるので実装方法に慣れておきたい。

2. sklearn API ラッパーのKerasClassifierにfit_generatorはない

sklearnを良く使用するのでsklearnAPIラッパーを使用することを前提に考えていた。

kerasにもAPIラッパーがあるので使用しようと考えていたのだがこれらにはfit_generatorがない。

ジェネレータを使用したいときにはsklearn APIラッパーを使用できないということには認識しておこう。

3. keras.utils.Sequence

学習用データを生成するジェネレータを準備すればよいのだがkerasでは専用のユーティリティが用意されている。

keras.utils.Sequence

このユーティリティを使用しなくてもジェネレータの実装は可能だがせっかく便利なユーティリティが用意されているので利用させてもらう。

4. ジェネレータクラスの作成

下記を定義する。

  1. keras.utils.Sequenceを継承したクラスを定義する

  2. __init__を定義する

  3. __getitem__を定義する

  4. __len__を定義する

クラスのなかでイメージを都度読み込んだり前処理を実装することが可能です。

シンプルなクラスなら下記のコードのままでそのまま流用できそうです。

generator.py
from keras.utils import Sequence

class gen(Sequence):

    def __init__(self, x_set, y_set, batch_size=10):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]

        return batch_x, batch_y

5. fit_generatorを使用してみる。

実際にシミュレーションデータで実装してみます。

example.py
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.utils import Sequence

## start create X, y
X_0 = np.random.normal(
    0.1,
    0.1,
    200,
)
X_1 = np.random.normal(
    0.2,
    0.1,
    200,
)
X = np.concatenate([X_0, X_1])
X = X.reshape((400, 1))
y = np.zeros(400)
y[200:] = 1
y = to_categorical(y, 2)

X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=42
)
## end create X, y

## start create model
def create_model():
    model = Sequential()
    model.add(
        Dense(
            2,
            activation='softmax',
        )
    )
    model.compile(
        loss='categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy']
    )
    return model
## end create model


## start create gen
class gen(Sequence):

    def __init__(self, x_set, y_set, batch_size=10):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.x) / float(self.batch_size)))

    def __getitem__(self, idx):
        batch_x = self.x[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_y = self.y[idx * self.batch_size:(idx + 1) * self.batch_size]

        return batch_x, batch_y
## end create gen

## start fit
g = gen(X_train, y_train)
m = create_model()
m.fit_generator(
    g,
    epochs = 5
)
p = m.predict_proba(X_test)
score = roc_auc_score(
    y_test[:, 1],
    p[:, 1],
)
print(score)
## end fit