In [1]:
import os
import sys
sys.path.append(os.path.abspath('..'))
from config import *

import random
import numpy as np
import torch
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# 演習：網膜疾患診断

光干渉断層撮影（optical coherence tomography; OCT）は眼底検査に利用され、加齢黄斑変性や糖尿病網膜症などの疾患の早期発見や経過観察に役立っています。本節では、光干渉断層撮影画像を使用し、物体分類のための深層ニューラルネットワークアーキテクチャである ResNet を活用して、網膜疾患を判定するモデルの構築方法を学びます。

## 演習準備

### ライブラリ

本節で利用するライブラリを読み込みます。NumPy、Pnadas、Matplotlib、[Pillow](https://pillow.readthedocs.io/)（PIL）などのライブライは、モデルの性能や推論結果などの可視化に利用します。[scikit-learn](https://scikit-learn.org/)（sklearn）、[PyTorch](https://pytorch.org/)（torch）、torchvision は機械学習関連のライブラリであり、モデルの構築、検証や推論などに利用します。また、OpenCV（cv2）、pytorch_grad_cam（[grad-cam](https://github.com/jacobgil/pytorch-grad-cam)）などはモデルの推論根拠を可視化するために利用します。

In [None]:
# image
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import PIL.Image

# machine learning
import sklearn.metrics
import torch
import torchvision

# grad-CAM visualization
import cv2
import pytorch_grad_cam

print(f'torch v{torch.__version__}; torchvision v{torchvision.__version__}')

ライブラリの読み込み時に *ImportError* や *ModuleNotFoundError* が発生した場合は、該当するライブラリをインストールしてください。ライブラリのバージョンを揃える必要はありませんが、PyTorch（torch）および torchvision が上記のバージョンと異なる時、実行中に警告メッセージが現れたり、同じ結果にならなかったりする可能性があります。

### データセット

本節では OCT2017[^oct2017_dataset] を使用します。このデータセットは、光干渉断層撮影（optical coherence tomography; OCT）で撮影された網膜の画像を、健康（NORMAL）、加齢黄斑変性による脈絡膜新生血管（choroidal neovascularization; CNV）、糖尿病黄斑浮腫（diabetic macular edema; DME）、および網膜色素上皮の機能低下により生じるドルーゼン（DRUSEN）の 4 つのカテゴリに整理されています（{numref}`fig-oct2017_dataset_example`）。


```{figure} ../_static/oct2017_dataset.jpg
---
name: fig-oct2017_dataset_example
---
OCT2017 データセットに含まれる各カテゴリのサンプル画像。
```


OCT2017 データセットは、CC-BY 4.0 ライセンスのもと、[Mendeley Data](https://doi.org/10.17632/rscbjbr9sj.2) で公開されており、著作権表示を行うことで自由に利用できます。オリジナルデータセットは大きいため、本節では、オリジナルのデータセットから 1,200 枚の画像をランダムに抽出して作成した小規模なデータセットを使用します。本節で利用するデータセットは、Jupyter Notebook 上では、次のコマンドを実行することでダウンロードできます。


```bash
!wget https://dl.biopapyrus.jp/data/oct2017.zip
!unzip oct2017.zip
```

このデータセットは、訓練、検証、およびテストの 3 つのサブセットで構成されています。訓練サブセットには各カテゴリに 200 枚の画像が含まれています。また、検証およびテストサブセットには、それぞれ各カテゴリごとに 50 枚の画像が含まれています。

[^oct2017_dataset]: Kermany et al. (2018) Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning. Cell. DOI: [10.1016/j.cell.2018.02.010](https://doi.org/10.1016/j.cell.2018.02.010)

### 画像前処理

畳み込みニューラルネットワークは、ニューロンの数などが固定されているため、入力する画像のサイズにも制限があります。例えば、本節で使用する ResNet 18 [^resnet18_input] では、224&times;224 ピクセルの正方形画像を入力として設計されています。また、PyTorch ではすべてのデータをテンソル形式で扱う必要があります。そのため、畳み込みニューラルネットワークに画像を入力する前に、画像サイズを適切に調整し、テンソル型に変換するといった前処理を行う必要があります。以下では、この前処理の手順を定義します。

[^resnet18_input]: https://pytorch.org/vision/main/models/generated/torchvision.models.resnet18.html

In [3]:
class SquareResize:
    def __init__(self, shape=224, bg_color = (0, 0, 0)):
        self.shape = shape
        self.bg_color = tuple(bg_color)

    def __call__(self, img):
        w, h = img.size
        img_square = None

        if w == h:
            img_square = img
        elif w > h:
            img_square = PIL.Image.new(img.mode, (w, w), self.bg_color)
            img_square.paste(img, (0, (w - h) // 2))
        else:
            img_square = PIL.Image.new(img.mode, (h, h), self.bg_color)
            img_square.paste(img, ((h - w) // 2, 0))

        img_square = img_square.resize((self.shape, self.shape))
        return img_square

transform = torchvision.transforms.Compose([
    SquareResize(),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

画像データは通常、0 から 255 の範囲の整数値で表現されていますが、前処理の段階でこれを正規化します。正規化により、画像データの値は平均約 0.5、標準偏差約 0.23 の範囲に変換され、モデルの学習を効率的に進めることができます。なお、正規化の際に平均を 0.50、分散を 0.23 のような切りの良い数値にしない理由は、これから利用する torchvision.models が提供する訓練済みモデルが、特定の数値（例えば平均 0.485、標準偏差 0.229）で訓練されているためです。そのため、この訓練済みモデルに合わせて正規化を行います。

### 計算デバイス

計算を実行するデバイスを設定します。PyTorch が GPU を認識できる場合は GPU を使用し、認識できない場合は CPU を利用するようにします。Google Colab を利用している場合は、メニューから「Runtime」→「Change runtime type」を選び、「Hardware accelerator」を GPU（例: T4 GPU や A100 GPU など）に設定することで、GPU を利用できるようになります。なお、ランタイムを変更すると、Google Colab が再起動されるので、上のコードをもう一度実行する必要があります。

In [4]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

## モデル構築

torchvision.models モジュールで提供されているアーキテクチャは、飛行機や車、人など、1000 種類の一般的な物体を分類するように設計されています。これに対して、本節では、NORMAL、CNV、DME および DRUSEN の 4 カテゴリの分類問題を扱います。そのため、torchvision.models モジュールから読み込んだ ResNet 18 の出力層のユニット数を 4 に変更する必要があります。この修正作業はモデルを呼び出すたびに行う必要があり、手間がかかります。そこで、一連の処理を関数化してから利用します。

In [5]:
def resnet18(num_classes, weights=None):
    model = torchvision.models.resnet18(weights='DEFAULT')
    in_features = model.fc.in_features
    model.fc = torch.nn.Linear(in_features, num_classes)
    if weights is not None:
        model.load_state_dict(torch.load(weights))
    return model

model = resnet18(4)

## モデル訓練

モデルが学習データを効率よく学習できるようにするため、損失関数（`criterion`）、学習アルゴリズム（`optimizer`）、学習率（`lr`）、および学習率を調整するスケジューラ（`lr_scheduler`）を設定します。

In [6]:
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
criterion = torch.nn.CrossEntropyLoss()
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

次に、訓練データと検証データを読み込み、モデルが入力できる形式に整えます。

In [7]:
train_dataset = torchvision.datasets.ImageFolder('oct2017/train', transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)

valid_dataset = torchvision.datasets.ImageFolder('oct2017/valid', transform=transform)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=32, shuffle=False)

準備が整ったら、訓練を開始します。訓練プロセスでは、訓練と検証を交互に繰り返します。訓練では、訓練データを使ってモデルのパラメータを更新し、その際の損失（誤差）を記録します。検証では、検証データを使ってモデルの予測性能（正解率）を計算し、その結果を記録します。

In [None]:
model.to(device)

num_epochs = 10
metric_dict = []

for epoch in range(num_epochs):
    # training phase
    model.train()

    running_loss = 0.0
    n_correct_train = 0
    n_train_samples = 0
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)

        _, predicted_labels = torch.max(outputs.data, 1)
        n_correct_train += torch.sum(predicted_labels == labels).item()
        n_train_samples += labels.size(0)
        running_loss +=  loss.item() / len(train_loader)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    lr_scheduler.step()


    # validation phase
    model.eval()
    
    n_correct_valid = 0
    n_valid_samples = 0
    with torch.no_grad():
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            _, predicted_labels = torch.max(outputs.data, 1)
            n_correct_valid += torch.sum(predicted_labels == labels).item()
            n_valid_samples += labels.size(0)

    metric_dict.append({
        'epoch': epoch + 1,
        'train_loss': running_loss,
        'train_acc': n_correct_train / n_train_samples,
        'valid_acc': n_correct_valid / n_valid_samples
    })
  
    print(metric_dict[-1])

訓練データに対する損失と検証データに対する正解率を可視化し、訓練過程を評価します。

In [None]:
metric_dict = pd.DataFrame(metric_dict)

fig, ax = plt.subplots(1, 2)
ax[0].plot(metric_dict['epoch'], metric_dict['train_loss'])
ax[0].set_xlabel('epoch')
ax[0].set_ylabel('loss')
ax[0].set_title('Train')
ax[1].plot(metric_dict['epoch'], metric_dict['valid_acc'])
ax[1].set_ylim(0, 1)
ax[1].set_xlabel('epoch')
ax[1].set_ylabel('accuracy')
ax[1].set_title('Validation')
plt.tight_layout()
fig.show()

In [None]:
glue('valid_acc', metric_dict['valid_acc'].iloc[5:].mean(), display=False)

訓練過程を可視化した結果、エポックが進むにつれて訓練データに対する損失が減少し、5 エポック前後で収束し始める傾向が確認できました。また、検証データに対する分類性能（正解率）は、最初の数エポックで既に高い性能を示していることがわかりました。

次に、同じ手順を他の深層ニューラルネットワークアーキテクチャ（DenseNet や Inception など）に対して実施し、それぞれの検証性能を比較します。そして、このデータセットに最適なアーキテクチャを選定します。ただし、本節ではモデル（アーキテクチャ）選択を行わずに、ResNet 18 を最適なアーキテクチャとして採用し、次のステップに進みます。

次のステップでは、訓練サブセットと検証サブセットを統合し、最適なアーキテクチャで最初から訓練を行います。

In [11]:
!rm -rf oct2017/trainvalid

In [12]:
!mkdir oct2017/trainvalid
!cp -r oct2017/train/* oct2017/trainvalid
!cp -r oct2017/valid/* oct2017/trainvalid

最適なモデルを選択する段階で、数エポックの訓練だけでも十分に高い予測性能を獲得できたことがわかったので、ここでは訓練サブセットと検証サブセットを統合したデータに対して 5 エポックだけ訓練させます。

In [None]:
# model
model = resnet18(4)
model.to(device)

# training params
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# training data
train_dataset = torchvision.datasets.ImageFolder('oct2017/trainvalid', transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)

# training
num_epochs = 5
metric_dict = []

for epoch in range(num_epochs):
    model.train()

    running_loss = 0.0
    n_correct_train = 0
    n_train_samples = 0
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        loss = criterion(outputs, labels)

        _, predicted_labels = torch.max(outputs.data, 1)
        n_correct_train += torch.sum(predicted_labels == labels).item()
        n_train_samples += labels.size(0)
        running_loss +=  loss.item() / len(train_loader)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    lr_scheduler.step()

    metric_dict.append({
        'epoch': epoch + 1,
        'train_loss': running_loss,
        'train_acc': n_correct_train / n_train_samples,
    })
    print(metric_dict[-1])

訓練が完了したら、訓練済みモデルの重みをファイルに保存します。

In [14]:
model = model.to('cpu')
torch.save(model.state_dict(), 'oct2017.pth')

## モデル評価

最適なモデルが得られたら、次にテストデータを用いてモデルを詳細に評価します。正解率だけでなく、適合率、再現率、F1 スコアなどの評価指標を計算し、モデルを総合的に評価します。まず、テストデータをモデルに入力し、その予測結果を取得します。

In [15]:
model = resnet18(4, 'oct2017.pth')
model.to(device)
model.eval()

test_dataset = torchvision.datasets.ImageFolder('oct2017/test', transform=transform)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

pred_labels = []
true_labels = []
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        _, _labels = torch.max(outputs.data, 1)
        #print(_labels)
        pred_labels.extend(_labels.cpu().detach().numpy().tolist())
        true_labels.extend(labels.cpu().detach().numpy().tolist())

pred_labels = [test_dataset.classes[_] for _ in pred_labels]
true_labels = [test_dataset.classes[_] for _ in true_labels]

次に、予測結果とラベルを比較し、混同行列を作成します。これにより、間違いやすいカテゴリを特定することができます。

In [None]:
cm = sklearn.metrics.confusion_matrix(true_labels, pred_labels)
cm

In [None]:
cmp = sklearn.metrics.ConfusionMatrixDisplay(cm, display_labels=test_dataset.classes)
cmp.plot()

それぞれのクラスに対する適合率、再現率、F1 スコアなどは、scikit-learn ライブラリを利用して計算します。

In [None]:
pd.DataFrame(sklearn.metrics.classification_report(true_labels, pred_labels, output_dict=True))

## 推論


推論を行う際には、訓練や評価時と同様に、torchvision.models モジュールから ResNet 18 のアーキテクチャを読み込み、出力層のクラス数を設定します。その後、`load_state_dict` メソッドを使用して訓練済みの重みファイルをモデルにロードします。これらの処理はすでに関数化（`resnet18`）されているため、その関数を利用して簡単に実行できます。

In [None]:
labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']
model = resnet18(4, 'oct2017.pth')
model.to(device)
model.eval()

このモデルを使って推論を行います。まず、polyps の画像を 1 枚選び、訓練時と同じ前処理を施します。その後、前処理をした画像をモデルに入力し、予測結果を表示させます。

In [None]:
image_path = 'oct2017/test/CNV/CNV-198660-1.jpeg'

image = PIL.Image.open(image_path).convert('RGB')
input_tensor = transform(image).unsqueeze(0).to(device)

with torch.no_grad():
    score = model(input_tensor)[0]

output = pd.DataFrame({
    'class': labels,
    'probability': torch.softmax(score, axis=0).cpu().detach().numpy() 
})
output

もう一例を見てみましょう。DME の画像をモデルに入力し、推論を行います。

In [None]:
image_path = 'oct2017/test/DME/DME-269181-1.jpeg'

image = PIL.Image.open(image_path).convert('RGB')
input_tensor = transform(image).unsqueeze(0).to(device)

with torch.no_grad():
    score = model(input_tensor)[0]

pd.DataFrame({
    'class': labels,
    'probability': torch.softmax(score, axis=0).cpu().detach().numpy() 
})

## 分類根拠の可視化

畳み込みニューラルネットワークを用いた画像分類では、畳み込み層で抽出された特徴マップが分類に大きな影響を与えています。そのため、最後の畳み込み層で得られた特徴マップと、それに対応する重みを可視化することで、モデルがどの部分に注目して分類を行ったのか、つまり判断の根拠を明確にすることができます。

本節では、Grad-CAM（Gradient-weighted Class Activation Mapping）および Guided Grad-CAM という手法を用いて、モデルの判断根拠を可視化します。可視化には Python の grad-cam パッケージを使用します。必要に応じてインストールし、grad-cam の[チュートリアル](https://jacobgil.github.io/pytorch-gradcam-book/introduction.html)を参考にしながら、Grad-CAM および Guided Grad-CAM を計算し、可視化するための関数を定義します。


In [22]:
def viz(image_path):
    # load models
    labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']
    model = resnet18(4, 'oct2017.pth')
    model.to(device)
    model.eval()

    # load image
    image = PIL.Image.open(image_path).convert('RGB')
    input_tensor = transform(image).unsqueeze(0).to(device)

    rgb_img = cv2.imread(image_path, 1)[:, :, ::-1]
    rgb_img = np.float32(np.array(SquareResize()(image))) / 255
    
    # Grad-CAM
    with pytorch_grad_cam.GradCAM(model=model, target_layers=[model.layer4]) as cam:
        cam.batch_size = 32
        grayscale_cam = cam(input_tensor=input_tensor, targets=None,aug_smooth=True, eigen_smooth=True)
        grayscale_cam = grayscale_cam[0, :]
        cam_image = pytorch_grad_cam.utils.image.show_cam_on_image(rgb_img, grayscale_cam, use_rgb=True)
        prob = torch.softmax(cam.outputs[0], axis=0).cpu().detach().numpy()

    gb_model = pytorch_grad_cam.GuidedBackpropReLUModel(model=model, device=device)
    gb = gb_model(input_tensor, target_category=None)
    cam_mask = np.stack([grayscale_cam, grayscale_cam, grayscale_cam], axis=-1)
    cam_gb = pytorch_grad_cam.utils.image.deprocess_image(cam_mask * gb)
    gb = pytorch_grad_cam.utils.image.deprocess_image(gb)
    
    # plot
    fig, ax = plt.subplots(2, 2)
    ax[0, 0].imshow(rgb_img)
    ax[0, 0].axis('off')
    ax[0, 0].set_title('Original Image', fontsize=16)
    ax[0, 1].imshow(cam_image)
    ax[0, 1].axis('off')
    ax[0, 1].set_title('Grad-CAM', fontsize=16)
    ax[1, 0].imshow(gb)
    ax[1, 0].axis('off')
    ax[1, 0].set_title('Guided Backpropagation', fontsize=16)
    ax[1, 1].imshow(cam_gb)
    ax[1, 1].axis('off')
    ax[1, 1].set_title('Guided Grad-CAM', fontsize=16)
    print(pd.DataFrame({'class': labels, 'probability': prob}))
    fig.show()


次に、いくつかの画像をこの可視化関数に入力し、モデルの予測結果とその判断根拠を可視化します。これにより、モデルがどの部分に注目して分類を行ったのかを視覚的に確認することができます。必要に応じて、他の画像を入力し、それぞれの分類結果と判断根拠を可視化してみてください。

In [None]:
viz('oct2017/test/CNV/CNV-198660-1.jpeg')

In [None]:
viz('oct2017/test/DME/DME-269181-1.jpeg')

In [None]:
viz('oct2017/test/DRUSEN/DRUSEN-1730592-1.jpeg')

In [None]:
viz('oct2017/test/NORMAL/NORMAL-402066-5.jpeg')

In [27]:
!rm -rf oct2017/trainvalid