In [1]:
import os
import sys
sys.path.append(os.path.abspath('..'))
from config import *

import random
import numpy as np
import torch
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# 演習：内視鏡画像ポリープ領域分割

内視鏡画像からポリープのある部位を正常な組織から明確に分離し、正確に特定することで、ポリープ除去手術や治療の精度を向上させることができます。このような領域の特定にはセグメンテーション技術が欠かせません。本節では、セマンティックセグメンテーション手法の一つである DeepLab V3 を用いて、ポリープ領域を抽出する方法を学びます。

## 準備

### ライブラリ

本節で必要なライブラリを読み込みます。NumPy、Pandas、Matplotlib、Pillow（PIL）、OpenCV（cv2）は訓練過程の可視化や推論結果の表示に利用します。scikit-image（skimage）はマスクから輪郭線を計算する際に使用します。さらに、torch、torchvision、torchmetrics はインスタンスセグメンテーションモデルの訓練、検証、推論に利用します。

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import PIL

import sklearn.metrics
import torch
import torchvision
import torchvision.transforms.v2

print(f'torch v{torch.__version__}; torchvision v{torchvision.__version__}')

ライブラリの読み込み時に *ImportError* や *ModuleNotFoundError* が発生した場合は、該当するライブラリをインストールしてください。ライブラリのバージョンを揃える必要はありませんが、PyTorch（torch）および torchvision が上記のバージョンと異なる時、実行中に警告メッセージが現れたり、同じ結果にならなかったりする可能性があります。

### データセット

本章では、[Kvasir データセット](https://datasets.simula.no/kvasir/)[^kvasir_dataset]を使用します。このデータセットは内視鏡画像を集めた医療用データセットで、[Simula Research Laboratory](https://datasets.simula.no/) にて公開されています。Kvasir データセットは研究および教育目的に限り利用可能で、それ以外の用途での使用は許可されていません[^kvasir_termsofuse]。データセットを扱う際は、利用規約を必ず遵守してください。

オリジナルの Kvasir データセットでは、セグメンテーション用の画像にはポリープを含むもののみが収録されています。しかし、実際の医療現場では、ポリープが存在しない健常者の画像も含まれることが一般的です。そこで、本演習では、オリジナルの Kvasir セグメンテーション用データから一部を抽出し、そこに Kvasir 分類用データから健常者の内視鏡画像を追加して、新しいデータセットを作成しました。

新しいデータセットは、訓練データ 120 枚、検証データ 30 枚、テストデータ 30 枚で構成されています。訓練データは 100 枚の画像がポリープを持つ画像であり、残りの 20 枚は健全な画像です。また、検証データとテストデータはそれぞれ 20 枚のポリープ画像と 10 枚の健全画像が含まれています。

```{figure} ../_static/kvasir_detection_dataset.jpg
---
name: kvasir_detection_dataset_example
---
Kvasir データセットに含まれる各カテゴリのサンプル画像。
```

Jupyter Notebook 上では、次のコマンドを実行することでダウンロードできます。

```bash
!wget https://dl.biopapyrus.jp/data/kvasirdet.zip
!unzip kvasirdet.zip
```


[^kvasir_dataset]: Pogorelov et al. (2017) KVASIR: A Multi-Class Image Dataset for Computer Aided Gastrointestinal Disease Detection. *Proceedings of the 8th ACM on Multimedia Systems Conference*, [10.1145/3083187.3083212](https://doi.org/10.1145/3083187.3083212)

[^kvasir_termsofuse]: "The use of the Kvasir dataset is restricted for research and educational purposes only." [simula Kvasir](https://datasets.simula.no/kvasir/)

### 前処理

セマンティックセグメンテーションモデルを学習させるには、画像だけでなく、画像内のどこにどのようなオブジェクトが存在するかを示すマスクも同時に入力する必要があります。本節で使用する Kvasir データセットでは、アノテーションが COCO フォーマットで提供されています。そのため、この COCO フォーマットのアノテーションを適切に変換し、PyTorch が扱える形式であるテンソルに変換する必要があります。

In [3]:
class CocoDataset(torchvision.datasets.CocoDetection):
    def __init__(self, root, annFile, image_size=(512, 512)):
        super(CocoDataset, self).__init__(root, annFile)
        self.image_size = image_size
        self.transforms = torchvision.transforms.v2.Compose([
            torchvision.transforms.v2.ToDtype(torch.float, scale=True),
            torchvision.transforms.v2.ToPureTensor()
        ])

    def __getitem__(self, idx):
        image, target = super(CocoDataset, self).__getitem__(idx)

        labels = []
        boxes = []
        masks = []
        for obj in target:
            bbox = obj['bbox']
            bbox = [bbox[0], bbox[1], bbox[0] + bbox[2], bbox[1] + bbox[3]]
            mask = PIL.Image.new('L', image.size, 0)
            for poly in obj['segmentation']:
                if isinstance(poly, list):
                    PIL.ImageDraw.Draw(mask).polygon(poly, outline=1, fill=1)
                else:
                    rle_mask = Image.fromarray((pycocotools.mask.decode(rle) * 255).astype(np.uint8))                    
                    mask = PIL.Image.composite(rle_mask, mask, rle_mask)
            labels.append(obj['category_id'])
            boxes.append(bbox)
            masks.append(torch.tensor(np.array(mask), dtype=torch.float32))

        if len(boxes) == 0: # dumy mask for non-polyp image
            boxes.append([0, 0, 1, 1])
            labels.append(0)
            masks.append(torch.tensor(np.array(PIL.Image.new('L', image.size, 0)), dtype=torch.float32))
        
        
        image = torchvision.transforms.Resize(self.image_size)(torchvision.transforms.functional.to_tensor(image))
        # convert masks to a single mask
        masks = [torchvision.transforms.Resize(self.image_size)(mask.unsqueeze(0)) for mask in masks] 
        masks = torch.stack(masks).squeeze(1)
        mask_combined = torch.zeros(self.image_size, dtype=torch.long)
        for i, mask in enumerate(masks):
            mask_combined[mask > 0] = labels[i]

        target = {
            'boxes': torch.as_tensor(boxes, dtype=torch.float32),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
            'masks': mask_combined,
        }

        return image, target

            


def calculate_iou(pred_mask, true_mask, num_classes=2):
    ious = []
    for i in range(num_classes):
        pred = (pred_mask == i).cpu().numpy().flatten()
        true = (true_mask == i).cpu().numpy().flatten()
        ious.append(sklearn.metrics.jaccard_score(true, pred))
    return np.mean(ious)



### 計算デバイス

計算デバイスを設定します。PyTorch が GPU を認識できる場合は GPU を利用し、認識できない場合は CPU を利用します。

In [4]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

## モデル構築

DeepLab V3 のアーキテクチャは、torchvision.models.segmentation モジュールで提供されています。しかし、このモジュールに含まれるアーキテクチャは、車や人など90種類の一般的なオブジェクトを対象としているため、そのままではポリープのセマンティックセグメンテーションには適用できません。この問題に対処するには、DeepLab V3 の出力層のユニット数を変更する必要があります。ただし、この修正をモデルを呼び出すたびに行うのは手間がかかります。そこで、指定されたクラス数に応じたアーキテクチャを生成し、必要に応じて出力層を修正する処理を関数として定義します。

なお、セマンティックセグメンテーションでは、物体検出と同様に背景も 1 つのクラスとして扱います。そのため、出力層のユニット数を変更する際には、セグメンテーション対象のクラス数に 1 を加えた数に設定する必要があります。

In [None]:
def deeplabv3(num_classes, weights=None):
    num_classes = num_classes + 1
    model = torchvision.models.segmentation.deeplabv3_resnet50(weights='DEFAULT')
    model.classifier[4] = torch.nn.Conv2d(256, num_classes, kernel_size=(1, 1), stride=(1, 1))
    if weights is not None:
        model.load_state_dict(torch.load(weights))
    return model

model = deeplabv3(1)
model.to(device)

## 訓練

訓練を開始する前に、モデルのパラメータを最適化するためのアルゴリズム（`optimizer`）と、学習率（`lr`）を動的に調整するスケジューラ（`lr_scheduler`）を設定します。セマンティックセグメンテーションでは、各ピクセルごとにクラス分類を行うため、損失関数（`criterion`）として多クラス分類で一般的に使用される交差エントロピー関数を採用します。

In [6]:
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
criterion = torch.nn.CrossEntropyLoss()

次に、訓練サブセットおよび検証サブセットを読み込み、モデルに入力できる形式に整えます。

In [None]:
train_loader = torch.utils.data.DataLoader(
                    CocoDataset('kvasirdet/train', 'kvasirdet/train/segm.json'),
                    batch_size=4, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))
valid_loader = torch.utils.data.DataLoader(
                    CocoDataset('kvasirdet/valid', 'kvasirdet/valid/segm.json'),
                    batch_size=4, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))

準備が整ったら、訓練を開始します。訓練プロセスでは、訓練と検証を繰り返します。訓練では、訓練データを使用してモデルのパラメータを更新し、訓練データにおける損失（誤差）を記録します。また、検証では検証データを利用してモデルの予測性能（IoU）を計算し記録します。

In [None]:
num_epochs= 10
metric_dict = []

for epoch in range(num_epochs):
    metric_dict_ = {'epoch': epoch + 1, 'train_loss': 0.0, 'valid_iou': 0.0}
    
    # training
    model.train()
    n_trains = 0
    for images, targets in train_loader:
        images = torch.stack([img.to(device) for img in images])
        masks = torch.stack([t['masks'].long().to(device) for t in targets])
        
        outputs = model(images)
        batch_loss = criterion(outputs['out'], masks)
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()
        
        metric_dict_['train_loss'] += batch_loss.item()
        n_trains += len(targets)

    # validation
    model.eval()
    n_valids = 0
    with torch.no_grad():
        for images, targets in valid_loader:
            images = torch.stack([img.to(device) for img in images])
            masks =  torch.stack([t['masks'].long().to(device) for t in targets])
            
            outputs = model(images)
            preds = torch.argmax(outputs['out'], dim=1)
            
            for i in range(len(preds)):
                metric_dict_['valid_iou'] +=  calculate_iou(preds[i], masks[i])
            n_valids += len(targets)

    # compute avg. loss/IoU per sample
    metric_dict_['train_loss'] /= n_trains
    metric_dict_['valid_iou'] /= n_valids

    metric_dict.append(metric_dict_)
    print(metric_dict_)


訓練データに対する損失と検証データに対する予測性能（IoU）を可視化し、訓練過程を評価します。

In [None]:
metric_dict = pd.DataFrame(metric_dict)

fig, ax = plt.subplots(1, 2)
ax[0].plot(metric_dict['epoch'], metric_dict['train_loss'])
ax[0].set_xlabel('epoch')
ax[0].set_ylabel('loss')
ax[0].set_title('Train')
ax[1].plot(metric_dict['epoch'], metric_dict['valid_iou'])
ax[1].set_ylim(0, 1)
ax[1].set_xlabel('epoch')
ax[1].set_ylabel('IoU')
ax[1].set_title('Validation')
plt.tight_layout()
fig.show()

可視化の結果から、エポック数が増加するにつれて訓練データに対する損失が継続的に減少していることが確認できます。特に、10 エポック目においても訓練損失が収束する兆候は見られません。一方で、検証データに対する予測性能（IoU）は、数エポックで最大値に達し、その後は収束しているように見受けられます。

次に、この手順を U-Net など他の深層ニューラルネットワークのアーキテクチャに適用し、それぞれの検証性能を比較します。この比較によって、データセットに最も適したアーキテクチャを選定することが可能です。ただし、本節ではこの比較を省略し、DeepLab V3 を最適なアーキテクチャとして採用し、次のステップに進みます。

次のステップでは、訓練サブセットと検証サブセットを統合し、選定した最適なアーキテクチャを用いてモデルを初めから再訓練します。その準備として、まず訓練サブセットと検証サブセットを結合します。

In [10]:
!rm -rf kvasirdet/trainvalid/images

In [11]:
!mkdir kvasirdet/trainvalid/images
!cp kvasirdet/train/images/* kvasirdet/trainvalid/images
!cp kvasirdet/valid/images/* kvasirdet/trainvalid/images

次に、モデルの構築を行います。先ほど可視化した検証性能の推移グラフを確認した結果、数エポックの訓練で十分に高い予測性能を達成できることがわかりました。そこで、ここでは訓練サブセットと検証サブセットを統合したデータを用いて、5 エポックのみ訓練を行います。

In [None]:
model = deeplabv3(1)
model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-4)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)
criterion = torch.nn.CrossEntropyLoss()

train_loader = torch.utils.data.DataLoader(
                    CocoDataset('kvasirdet/trainvalid', 'kvasirdet/trainvalid/segm.json'),
                    batch_size=4, shuffle=True, collate_fn=lambda x: tuple(zip(*x)))


num_epochs= 5
metric_dict = []

for epoch in range(num_epochs):
    metric_dict_ = {'epoch': epoch + 1, 'train_loss': 0.0}
    
    # training
    model.train()
    n_trains = 0
    for images, targets in train_loader:
        images = torch.stack([img.to(device) for img in images])
        masks = torch.stack([t['masks'].long().to(device) for t in targets])
        
        outputs = model(images)
        batch_loss = criterion(outputs['out'], masks)
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()
        
        metric_dict_['train_loss'] += batch_loss.item()
        n_trains += len(targets)

    # compute metrics per sample
    metric_dict_['train_loss'] /= n_trains
    metric_dict.append(metric_dict_)
    print(metric_dict_)


訓練が完了したら、訓練済みモデルの重みをファイルに保存します。

In [13]:
model.to('cpu')
torch.save(model.state_dict(), 'kvasirsegm.pth')

## 推論

推論時にも、訓練時と同じように torchvision モジュールからアーキテクチャを呼び出し、出力層のクラス数を設定します。その後、`load_state_dict` メソッドを使って、訓練済みの重みファイルをモデルにロードします。これらの操作はすべて `deeplabv3` 関数で定義されているので、その関数を利用します。

In [None]:
model = deeplabv3(1, 'kvasirsegm.pth')
model.to(device)
model.eval()

このモデルを利用して推論を行います。まず、1 枚の画像を指定し、PIL モジュールを用いて画像を開きます。その後、画像をテンソル形式に変換してモデルに入力します。モデルは予測結果として、分類対象のカテゴリ数と同じ数のマスクを生成します。これらのマスクについて、各画素位置で値を比較し、最も大きな値を持つマスクが何番目かを調べることで、その画素がどのクラスに属しているかを判定します。

すべての画素について判定した結果を1枚のマスク（`pred_mask`）として保存します。このマスク（`pred_mask`）は入力画像と同じ解像度を持ち、要素は0、1、2、...といった整数で構成されます。ここで、0 の領域は背景を表し、1 の領域は ID が 1 のオブジェクトを、2 の領域は ID が 2 のオブジェクトを示すようになっています。

In [15]:
threshold = 0.5
image_path = 'kvasirdet/test/images/cju2lz8vqktne0993fuym6drw.jpg'

image = PIL.Image.open(image_path).convert('RGB')
input_tensor = torchvision.transforms.functional.to_tensor(image).unsqueeze(0).to(device)

with torch.no_grad():
    predictions = model(input_tensor)['out']

pred_mask = torch.argmax(predictions, dim=1).squeeze(0).cpu().numpy()


本節では、ポリープの検出のみを行っているため、生成されたマスクは 0（背景）または 1（ポリープ）となっています。このため、1 の領域を緑色に染めて可視化してみましょう。

In [None]:
pred_mask = (pred_mask == 1).astype(np.uint8)
image = np.array(image)

mask_overlay = np.zeros_like(image)
mask_overlay[pred_mask == 1] = [0, 255, 0]

image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
mask_overlay = cv2.cvtColor(mask_overlay, cv2.COLOR_RGB2BGR)

overlayed_image = cv2.addWeighted(image, 0.7, mask_overlay, 0.3, 0)
overlayed_image = cv2.cvtColor(overlayed_image, cv2.COLOR_BGR2RGB)

plt.figure(figsize=(10, 10))
plt.imshow(overlayed_image)
plt.axis('off')
plt.show()