AI画像品質評価メトリクス LPIPS・SSIM実践ガイド 2025
公開: 2025年9月26日 · 読了目安: 16 分 · 著者: Unified Image Tools 編集部
画像処理の品質評価は、従来の数値的メトリクスから人間の知覚に基づくAI評価へと進化しています。本記事では、LPIPS(Learned Perceptual Image Patch Similarity)、SSIM(Structural Similarity Index Measure)をはじめとする最新の評価手法を実装レベルで詳しく解説します。
AI画像品質評価の進化
従来手法の限界
PSNR(Peak Signal-to-Noise Ratio)の問題
- ピクセル単位の差分のみを評価
- 人間の知覚との乖離が大きい
- 構造的な類似性を無視
- 圧縮アーティファクトを適切に評価できない
新しいアプローチの必要性
- 人間の視覚システムの模倣
- 深層学習による特徴抽出
- 知覚的類似度の定量化
- コンテンツ適応型評価
内部リンク: 画像品質バジェットとCIゲート 2025 — 破綻を未然に防ぐ運用, 画像圧縮 完全戦略 2025 ─ 画質を守りつつ体感速度を最適化する実践ガイド
LPIPS:学習ベース知覚メトリクス
LPIPSの理論的基盤
LPIPS(Learned Perceptual Image Patch Similarity)は、深層ニューラルネットワークの特徴表現を活用した知覚的類似度メトリクスです。
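参考として、Zhang らの原論文における LPIPS 距離は、基準画像 x と比較画像 x_0 をネットワークに通し、層 l ごとにチャンネル方向へ単位正規化した特徴 \hat{y}^l, \hat{y}^l_0 を、学習済み重み w_l でチャンネル重み付けして空間平均した二乗誤差の総和として定義されます。

d(x, x_0) = \sum_{l} \frac{1}{H_l W_l} \sum_{h,w} \left\| w_l \odot \left( \hat{y}^{l}_{hw} - \hat{y}^{l}_{0,hw} \right) \right\|_2^2

値は「距離」なので、0 に近いほど知覚的に類似していると解釈します。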
import torch
import torch.nn as nn
import lpips
from torchvision import models, transforms
class LPIPSEvaluator:
def __init__(self, net='alex', use_gpu=True):
"""
LPIPSモデルの初期化
net: 'alex', 'vgg', 'squeeze'から選択
"""
self.loss_fn = lpips.LPIPS(net=net)
self.device = torch.device('cuda' if use_gpu and torch.cuda.is_available() else 'cpu')
self.loss_fn.to(self.device)
        # 前処理パイプライン(lpips は [-1, 1] にスケールした入力を想定するため、
        # ImageNet 統計ではなく 0.5 中心の正規化を使う)
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5],
                                 std=[0.5, 0.5, 0.5])
        ])
def calculate_lpips(self, img1, img2):
"""
2つの画像間のLPIPS距離を計算
"""
# 前処理
        tensor1 = self.transform(img1).unsqueeze(0).float().to(self.device)  # float64 入力でも float32 に揃える
        tensor2 = self.transform(img2).unsqueeze(0).float().to(self.device)
# LPIPS計算
with torch.no_grad():
distance = self.loss_fn(tensor1, tensor2)
return distance.item()
def batch_evaluate(self, image_pairs):
"""
バッチ処理でLPIPS評価
"""
results = []
for img1, img2 in image_pairs:
lpips_score = self.calculate_lpips(img1, img2)
results.append({
'lpips_distance': lpips_score,
                'perceptual_similarity': max(0.0, 1 - lpips_score),  # 距離を簡易的に類似度へ変換(0 未満は切り捨て)
'quality_category': self.categorize_quality(lpips_score)
})
return results
def categorize_quality(self, lpips_score):
"""
LPIPSスコアに基づく品質カテゴリ分類
"""
if lpips_score < 0.1:
return 'excellent'
elif lpips_score < 0.2:
return 'good'
elif lpips_score < 0.4:
return 'acceptable'
else:
return 'poor'
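上記 LPIPSEvaluator の使用イメージを示します(PIL 画像を読み込む最小限のスケッチで、ファイル名は仮のもの。2 枚は同じ解像度である前提です)。

from PIL import Image

evaluator = LPIPSEvaluator(net='alex', use_gpu=True)

# 仮のファイル名: オリジナルと圧縮後の画像を比較
img_original = Image.open('original.png').convert('RGB')
img_compressed = Image.open('compressed.jpg').convert('RGB')

distance = evaluator.calculate_lpips(img_original, img_compressed)
print(f"LPIPS distance: {distance:.4f}")  # 0 に近いほど知覚的に類似
print(f"quality: {evaluator.categorize_quality(distance)}")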
カスタムLPIPSモデルの構築
class CustomLPIPSNetwork(nn.Module):
def __init__(self, backbone='resnet50'):
super().__init__()
# バックボーンネットワークの選択
if backbone == 'resnet50':
self.features = models.resnet50(pretrained=True)
self.features = nn.Sequential(*list(self.features.children())[:-2])
elif backbone == 'efficientnet':
self.features = models.efficientnet_b0(pretrained=True).features
        # 特徴抽出層(resnet50 の children に対するインデックス:
        # 2=conv1 直後の ReLU → 64ch, 4=layer1 → 256ch, 5=layer2 → 512ch。
        # バックボーンを変える場合は linear_layers のチャンネル数も合わせる)
        self.feature_layers = [2, 4, 5]
# 線形変換層
self.linear_layers = nn.ModuleList([
nn.Sequential(
nn.Conv2d(64, 1, 1, bias=False),
nn.GroupNorm(1, 1, affine=False)
),
nn.Sequential(
nn.Conv2d(256, 1, 1, bias=False),
nn.GroupNorm(1, 1, affine=False)
),
nn.Sequential(
nn.Conv2d(512, 1, 1, bias=False),
nn.GroupNorm(1, 1, affine=False)
)
])
    def extract_features(self, x):
        """
        feature_layers で指定した層の中間出力を収集する
        """
        feats = []
        for idx, layer in enumerate(self.features):
            x = layer(x)
            if idx in self.feature_layers:
                feats.append(x)
        return feats

    def forward(self, x1, x2):
# 特徴抽出
features1 = self.extract_features(x1)
features2 = self.extract_features(x2)
# 各層での距離計算
distances = []
for i, (f1, f2) in enumerate(zip(features1, features2)):
# L2正規化
f1_norm = f1 / (torch.norm(f1, dim=1, keepdim=True) + 1e-8)
f2_norm = f2 / (torch.norm(f2, dim=1, keepdim=True) + 1e-8)
# 距離計算
diff = (f1_norm - f2_norm) ** 2
# 線形変換
if i < len(self.linear_layers):
diff = self.linear_layers[i](diff)
# 空間平均
distance = torch.mean(diff, dim=[2, 3])
distances.append(distance)
# 加重平均
total_distance = sum(distances) / len(distances)
return total_distance
SSIM:構造的類似度指標
SSIM の数学的定義
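コードの前に SSIM の定義式を確認しておきます。画像パッチ x, y の平均 \mu、分散 \sigma^2、共分散 \sigma_{xy} を用いて次のように定義されます(C_1 = (k_1 L)^2, C_2 = (k_2 L)^2、L はデータレンジ)。

\mathrm{SSIM}(x, y) = \frac{(2\mu_x \mu_y + C_1)(2\sigma_{xy} + C_2)}{(\mu_x^2 + \mu_y^2 + C_1)(\sigma_x^2 + \sigma_y^2 + C_2)}

値域は概ね [-1, 1] で、1 に近いほど構造が類似しています。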
import numpy as np
from skimage.metrics import structural_similarity
from scipy.ndimage import gaussian_filter
class SSIMEvaluator:
def __init__(self, window_size=11, k1=0.01, k2=0.03, sigma=1.5):
self.window_size = window_size
self.k1 = k1
self.k2 = k2
self.sigma = sigma
def calculate_ssim(self, img1, img2, data_range=1.0):
"""
基本的なSSIM計算
"""
        return structural_similarity(
            img1, img2,
            data_range=data_range,
            channel_axis=-1,  # 旧バージョンの multichannel=True に相当
            gaussian_weights=True,
            sigma=self.sigma,
            use_sample_covariance=False,
            K1=self.k1, K2=self.k2
        )
def calculate_ms_ssim(self, img1, img2, weights=None):
"""
Multi-Scale SSIM(MS-SSIM)の実装
"""
        if weights is None:
            weights = [0.0448, 0.2856, 0.3001, 0.2363, 0.1333]
        levels = len(weights)
        mssim = 1.0
        for i in range(levels):
            # 簡略版: 各スケールで完全な SSIM を計算し、重み付きの積を取る
            # (原著の MS-SSIM は最終スケール以外で輝度項を除く点が異なる)
            ssim_val = self.calculate_ssim(img1, img2)
            mssim *= ssim_val ** weights[i]
            if i < levels - 1:
                # 次のスケールへダウンサンプリング
                img1 = self.downsample(img1)
                img2 = self.downsample(img2)
        return mssim
    def downsample(self, img):
        """
        ガウシアンフィルタリング+ダウンサンプリング(空間方向のみ平滑化)
        """
        if img.ndim == 3:
            # カラー画像ではチャンネル軸(sigma=0)を平滑化しない
            filtered = gaussian_filter(img, sigma=(1.0, 1.0, 0))
        else:
            filtered = gaussian_filter(img, sigma=1.0)
        return filtered[::2, ::2]
def ssim_map(self, img1, img2):
"""
SSIM マップの生成
"""
# グレースケール変換
if len(img1.shape) == 3:
img1_gray = np.mean(img1, axis=2)
img2_gray = np.mean(img2, axis=2)
else:
img1_gray = img1
img2_gray = img2
# 平均
mu1 = gaussian_filter(img1_gray, self.sigma)
mu2 = gaussian_filter(img2_gray, self.sigma)
mu1_sq = mu1 ** 2
mu2_sq = mu2 ** 2
mu1_mu2 = mu1 * mu2
# 分散・共分散
sigma1_sq = gaussian_filter(img1_gray ** 2, self.sigma) - mu1_sq
sigma2_sq = gaussian_filter(img2_gray ** 2, self.sigma) - mu2_sq
sigma12 = gaussian_filter(img1_gray * img2_gray, self.sigma) - mu1_mu2
# SSIM計算
c1 = (self.k1 * 1.0) ** 2
c2 = (self.k2 * 1.0) ** 2
ssim_map = ((2 * mu1_mu2 + c1) * (2 * sigma12 + c2)) / \
((mu1_sq + mu2_sq + c1) * (sigma1_sq + sigma2_sq + c2))
return ssim_map
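SSIM マップは劣化箇所の可視化に役立ちます。以下は matplotlib で表示する場合のスケッチです(img_original, img_processed は [0, 1] に正規化済みの同サイズ numpy 配列を仮定)。

import matplotlib.pyplot as plt

ssim_eval = SSIMEvaluator()

score = ssim_eval.calculate_ssim(img_original, img_processed)
smap = ssim_eval.ssim_map(img_original, img_processed)

print(f"SSIM: {score:.4f}")
plt.imshow(smap, cmap='viridis', vmin=0, vmax=1)
plt.colorbar(label='local SSIM')
plt.title('SSIM map')
plt.savefig('ssim_map.png')  # 値が低い領域ほど劣化が大きい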
先進的評価メトリクス
DISTS:Deep Image Structure and Texture Similarity
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms
class DISTSEvaluator:
def __init__(self, use_gpu=True):
self.device = torch.device('cuda' if use_gpu and torch.cuda.is_available() else 'cpu')
# VGGネットワークの特徴抽出部分を使用
vgg = models.vgg16(pretrained=True).features
        # extract_features で順に適用するため、重複しないスライスに分割する
        self.stages = nn.ModuleList([
            vgg[:4],     # ~ conv1_2
            vgg[4:9],    # ~ conv2_2
            vgg[9:16],   # ~ conv3_3
            vgg[16:23],  # ~ conv4_3
            vgg[23:30]   # ~ conv5_3
        ]).to(self.device)
for param in self.stages.parameters():
param.requires_grad = False
    def preprocess(self, img):
        """
        PIL 画像または [0, 1] の numpy 配列を ImageNet 正規化済みテンソルに変換する
        """
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        return transform(img).unsqueeze(0).float()

    def extract_features(self, x):
features = []
for stage in self.stages:
x = stage(x)
features.append(x)
return features
def calculate_dists(self, img1, img2):
"""
DISTS(Deep Image Structure and Texture Similarity)計算
"""
# 前処理
tensor1 = self.preprocess(img1).to(self.device)
tensor2 = self.preprocess(img2).to(self.device)
# 特徴抽出
feats1 = self.extract_features(tensor1)
feats2 = self.extract_features(tensor2)
structure_score = 0
texture_score = 0
for f1, f2 in zip(feats1, feats2):
# Structure similarity (mean similarity)
struct_sim = self.structure_similarity(f1, f2)
structure_score += struct_sim
# Texture similarity (covariance similarity)
texture_sim = self.texture_similarity(f1, f2)
texture_score += texture_sim
        # ステージ数で平均してから重み付きで合成(概ね 0〜1 の類似度になる)
        num_stages = len(feats1)
        structure_score /= num_stages
        texture_score /= num_stages
        alpha = 0.8  # structure weight
        beta = 0.2   # texture weight
        dists_score = alpha * structure_score + beta * texture_score
        return dists_score.item()
def structure_similarity(self, feat1, feat2):
"""
構造類似度の計算
"""
# チャンネル方向の平均
mean1 = torch.mean(feat1, dim=1, keepdim=True)
mean2 = torch.mean(feat2, dim=1, keepdim=True)
# 構造的類似度
numerator = 2 * mean1 * mean2
denominator = mean1 ** 2 + mean2 ** 2
structure_map = numerator / (denominator + 1e-8)
return torch.mean(structure_map)
def texture_similarity(self, feat1, feat2):
"""
テクスチャ類似度の計算
"""
# 特徴マップの共分散行列計算
b, c, h, w = feat1.shape
feat1_flat = feat1.view(b, c, -1)
feat2_flat = feat2.view(b, c, -1)
        # グラム行列(非中心の 2 次モーメント)でチャンネル間の相関を近似
        cov1 = torch.bmm(feat1_flat, feat1_flat.transpose(1, 2)) / (h * w - 1)
        cov2 = torch.bmm(feat2_flat, feat2_flat.transpose(1, 2)) / (h * w - 1)
# Frobenius norm による類似度
diff_norm = torch.norm(cov1 - cov2, 'fro', dim=[1, 2])
max_norm = torch.maximum(torch.norm(cov1, 'fro', dim=[1, 2]),
torch.norm(cov2, 'fro', dim=[1, 2]))
texture_sim = 1 - diff_norm / (max_norm + 1e-8)
return torch.mean(texture_sim)
FID:Fréchet Inception Distance
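FID は、実画像群と生成画像群それぞれの Inception 特徴を多変量ガウス分布 \mathcal{N}(\mu_r, \Sigma_r), \mathcal{N}(\mu_g, \Sigma_g) とみなし、その間の Fréchet 距離として定義されます。

\mathrm{FID} = \lVert \mu_r - \mu_g \rVert_2^2 + \mathrm{Tr}\left( \Sigma_r + \Sigma_g - 2 (\Sigma_r \Sigma_g)^{1/2} \right)

ペア単位ではなく分布同士の距離なので、サンプル数が少ないと推定が不安定になる点に注意してください。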
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from scipy.linalg import sqrtm
class FIDEvaluator:
def __init__(self):
# Inception v3 モデル(特徴抽出用)
self.inception = models.inception_v3(pretrained=True, transform_input=False)
self.inception.fc = nn.Identity() # 分類層を削除
self.inception.eval()
for param in self.inception.parameters():
param.requires_grad = False
def extract_features(self, images):
"""
Inception v3 による特徴抽出
"""
features = []
with torch.no_grad():
for img in images:
# 適切なサイズにリサイズ(299x299)
img_resized = F.interpolate(img.unsqueeze(0),
size=(299, 299),
mode='bilinear')
feat = self.inception(img_resized)
features.append(feat.cpu().numpy())
return np.concatenate(features, axis=0)
def calculate_fid(self, real_images, generated_images):
"""
FID(Fréchet Inception Distance)の計算
"""
# 特徴抽出
real_features = self.extract_features(real_images)
gen_features = self.extract_features(generated_images)
# 統計計算
mu_real = np.mean(real_features, axis=0)
sigma_real = np.cov(real_features, rowvar=False)
mu_gen = np.mean(gen_features, axis=0)
sigma_gen = np.cov(gen_features, rowvar=False)
# Fréchet距離計算
diff = mu_real - mu_gen
covmean = sqrtm(sigma_real.dot(sigma_gen))
# 数値誤差による虚数成分を除去
if np.iscomplexobj(covmean):
covmean = covmean.real
fid = diff.dot(diff) + np.trace(sigma_real + sigma_gen - 2 * covmean)
return fid
統合評価システムの構築
マルチメトリクス評価器
class ComprehensiveQualityEvaluator:
def __init__(self):
self.lpips_evaluator = LPIPSEvaluator()
self.ssim_evaluator = SSIMEvaluator()
self.dists_evaluator = DISTSEvaluator()
self.fid_evaluator = FIDEvaluator()
        # 重み設定(ペア単位の評価に使う 4 指標で合計 1.0 になるようにする。
        # FID は分布同士の比較なのでデータセット単位で別途算出する)
        self.weights = {
            'lpips': 0.35,
            'ssim': 0.35,
            'dists': 0.2,
            'psnr': 0.1
        }
def evaluate_single_pair(self, img1, img2):
"""
画像ペアの総合品質評価
"""
results = {}
# LPIPS
results['lpips'] = self.lpips_evaluator.calculate_lpips(img1, img2)
        # SSIM(skimage は numpy 型を返すため、JSON 化しやすいよう float へ変換)
        results['ssim'] = float(self.ssim_evaluator.calculate_ssim(img1, img2))
# DISTS
results['dists'] = self.dists_evaluator.calculate_dists(img1, img2)
# PSNR(参考値)
results['psnr'] = self.calculate_psnr(img1, img2)
# 総合スコア計算
composite_score = self.calculate_composite_score(results)
results['composite_score'] = composite_score
# 品質レベル判定
results['quality_level'] = self.determine_quality_level(composite_score)
return results
    def calculate_psnr(self, img1, img2):
        """
        PSNR計算(画素値が [0, 1] に正規化されている前提)
        """
        mse = np.mean((img1 - img2) ** 2)
        if mse == 0:
            return float('inf')
        return 20 * np.log10(1.0 / np.sqrt(mse))
def calculate_composite_score(self, metrics):
"""
複数メトリクスの統合スコア
"""
# 各メトリクスを0-1の範囲に正規化
normalized_scores = {
'lpips': 1 - min(metrics['lpips'], 1.0), # 低いほど良い
'ssim': metrics['ssim'], # 高いほど良い
'dists': metrics['dists'], # 高いほど良い
'psnr': min(metrics['psnr'] / 50, 1.0), # 正規化
}
# 重み付き合成
composite = sum(
self.weights[metric] * score
for metric, score in normalized_scores.items()
if metric in self.weights
)
return composite
def determine_quality_level(self, score):
"""
スコアに基づく品質レベル判定
"""
if score >= 0.9:
return 'excellent'
elif score >= 0.8:
return 'very_good'
elif score >= 0.7:
return 'good'
elif score >= 0.6:
return 'acceptable'
elif score >= 0.5:
return 'poor'
else:
return 'very_poor'
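統合評価器の使用イメージです([0, 1] に正規化した numpy 配列を入力する前提の最小スケッチで、ファイル名と load_normalized は説明用の仮のヘルパーです)。

import numpy as np
from PIL import Image

def load_normalized(path):
    """仮のヘルパー: 画像を [0, 1] の float32 配列として読み込む"""
    return np.asarray(Image.open(path).convert('RGB'), dtype=np.float32) / 255.0

evaluator = ComprehensiveQualityEvaluator()
result = evaluator.evaluate_single_pair(
    load_normalized('original.png'),
    load_normalized('processed.webp')
)

print(result['composite_score'], result['quality_level'])
print({k: result[k] for k in ('lpips', 'ssim', 'dists', 'psnr')})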
バッチ処理システム
import asyncio
import aiofiles
import numpy as np
from collections import Counter
from pathlib import Path
class BatchQualityEvaluator:
def __init__(self, evaluator, max_workers=4):
self.evaluator = evaluator
self.max_workers = max_workers
self.semaphore = asyncio.Semaphore(max_workers)
async def evaluate_directory(self, original_dir, processed_dir, output_file):
"""
ディレクトリ単位でのバッチ評価
"""
original_path = Path(original_dir)
processed_path = Path(processed_dir)
# 画像ファイルのペアを取得
image_pairs = self.get_image_pairs(original_path, processed_path)
# 並列処理でバッチ評価
tasks = [
self.evaluate_pair_async(orig, proc)
for orig, proc in image_pairs
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# レポート生成
report = self.generate_report(image_pairs, results)
# 結果保存
await self.save_report(report, output_file)
return report
async def evaluate_pair_async(self, original_path, processed_path):
"""
画像ペアの非同期評価
"""
async with self.semaphore:
# 画像読み込み
img1 = await self.load_image_async(original_path)
img2 = await self.load_image_async(processed_path)
# 評価実行
result = self.evaluator.evaluate_single_pair(img1, img2)
result['original_path'] = str(original_path)
result['processed_path'] = str(processed_path)
return result
async def load_image_async(self, path):
"""
画像の非同期読み込み
"""
async with aiofiles.open(path, 'rb') as f:
data = await f.read()
# PIL で画像デコード
from PIL import Image
import io
        img = Image.open(io.BytesIO(data)).convert('RGB')
        return np.asarray(img, dtype=np.float32) / 255.0
    def get_image_pairs(self, original_path, processed_path):
        """
        ファイル名が一致する (オリジナル, 処理後) のペアを列挙する
        """
        exts = {'.png', '.jpg', '.jpeg', '.webp', '.avif'}
        pairs = []
        for orig_file in sorted(original_path.glob('*')):
            if orig_file.suffix.lower() not in exts:
                continue
            candidate = processed_path / orig_file.name
            if candidate.exists():
                pairs.append((orig_file, candidate))
        return pairs

    def calculate_quality_distribution(self, results):
        """
        品質レベルごとの件数を集計する
        """
        return dict(Counter(r['quality_level'] for r in results))

    def generate_report(self, image_pairs, results):
"""
評価レポートの生成
"""
successful_results = [r for r in results if not isinstance(r, Exception)]
# 統計計算
stats = {
'total_images': len(image_pairs),
'successful_evaluations': len(successful_results),
'average_composite_score': np.mean([r['composite_score'] for r in successful_results]),
'average_lpips': np.mean([r['lpips'] for r in successful_results]),
'average_ssim': np.mean([r['ssim'] for r in successful_results]),
'quality_distribution': self.calculate_quality_distribution(successful_results)
}
report = {
'summary': stats,
'detailed_results': successful_results,
'failed_evaluations': [r for r in results if isinstance(r, Exception)]
}
return report
async def save_report(self, report, output_file):
"""
レポートのJSON保存
"""
import json
async with aiofiles.open(output_file, 'w') as f:
await f.write(json.dumps(report, indent=2, default=str))
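バッチ評価は asyncio のイベントループから起動します。ディレクトリ名と出力ファイル名は仮のものです。

import asyncio

async def main():
    evaluator = ComprehensiveQualityEvaluator()
    batch = BatchQualityEvaluator(evaluator, max_workers=4)
    report = await batch.evaluate_directory(
        original_dir='./images/original',
        processed_dir='./images/processed',
        output_file='quality_report.json'
    )
    print(report['summary'])

if __name__ == '__main__':
    asyncio.run(main())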
実時間品質モニタリング
リアルタイム品質監視
import threading
import queue
import time
import numpy as np
from collections import deque
class RealTimeQualityMonitor:
def __init__(self, evaluator, window_size=100):
self.evaluator = evaluator
self.window_size = window_size
self.quality_history = deque(maxlen=window_size)
self.alert_queue = queue.Queue()
self.is_running = False
# アラート閾値
self.thresholds = {
'composite_score': {
'warning': 0.6,
'critical': 0.4
},
'lpips': {
'warning': 0.3,
'critical': 0.5
}
}
def start_monitoring(self, input_queue):
"""
リアルタイム監視開始
"""
self.is_running = True
monitor_thread = threading.Thread(
target=self.monitor_loop,
args=(input_queue,)
)
monitor_thread.start()
return monitor_thread
def monitor_loop(self, input_queue):
"""
監視メインループ
"""
while self.is_running:
try:
# キューから画像ペアを取得
img_pair = input_queue.get(timeout=1.0)
if img_pair is None: # 終了シグナル
break
# 品質評価
result = self.evaluator.evaluate_single_pair(*img_pair)
# 履歴に追加
self.quality_history.append(result)
# アラートチェック
self.check_alerts(result)
# 統計更新
self.update_statistics()
except queue.Empty:
continue
except Exception as e:
print(f"Monitoring error: {e}")
    def check_alerts(self, result):
        """
        アラート条件チェック
        composite_score は低いほど悪化、lpips は高いほど悪化として扱う
        """
        higher_is_better = {'composite_score': True, 'lpips': False}
        for metric, thresholds in self.thresholds.items():
            if metric not in result:
                continue
            value = result[metric]
            if higher_is_better.get(metric, True):
                is_critical = value < thresholds['critical']
                is_warning = value < thresholds['warning']
            else:
                is_critical = value > thresholds['critical']
                is_warning = value > thresholds['warning']
            level = 'critical' if is_critical else 'warning' if is_warning else None
            if level is not None:
                self.alert_queue.put({
                    'level': level,
                    'metric': metric,
                    'value': value,
                    'threshold': thresholds[level],
                    'timestamp': time.time()
                })
    def update_statistics(self):
        """
        統計更新のフック(必要に応じてメトリクス送信やログ出力を実装する)
        """
        pass

    def calculate_trend(self, scores):
        """
        直近スコア系列の傾き(正なら改善傾向、負なら劣化傾向)
        """
        if len(scores) < 2:
            return 0.0
        x = np.arange(len(scores))
        return float(np.polyfit(x, scores, 1)[0])

    def get_current_statistics(self):
"""
現在の統計情報取得
"""
if not self.quality_history:
return {}
recent_scores = [r['composite_score'] for r in self.quality_history]
recent_lpips = [r['lpips'] for r in self.quality_history]
return {
'window_size': len(self.quality_history),
'average_quality': np.mean(recent_scores),
'quality_trend': self.calculate_trend(recent_scores),
'average_lpips': np.mean(recent_lpips),
'quality_stability': np.std(recent_scores)
}
品質最適化の自動化
動的パラメータ調整
class AdaptiveQualityOptimizer:
def __init__(self, evaluator, target_quality=0.8):
self.evaluator = evaluator
self.target_quality = target_quality
self.parameter_history = []
# 最適化対象パラメータ
self.parameters = {
'compression_quality': {'min': 50, 'max': 100, 'current': 85},
'resize_algorithm': {'options': ['lanczos', 'bicubic', 'bilinear'], 'current': 'lanczos'},
'sharpening_strength': {'min': 0.0, 'max': 2.0, 'current': 1.0}
}
def optimize_parameters(self, test_images, max_iterations=50):
"""
品質目標に向けたパラメータ最適化
"""
best_params = self.parameters.copy()
best_quality = 0
for iteration in range(max_iterations):
# 現在のパラメータで処理
processed_images = self.process_with_parameters(
test_images, self.parameters
)
# 品質評価
avg_quality = self.evaluate_batch_quality(
test_images, processed_images
)
print(f"Iteration {iteration + 1}: Quality = {avg_quality:.3f}")
# 最良結果の更新
if avg_quality > best_quality:
best_quality = avg_quality
best_params = self.parameters.copy()
# 目標達成チェック
if avg_quality >= self.target_quality:
print(f"Target quality {self.target_quality} achieved!")
break
# パラメータ更新
self.update_parameters(avg_quality)
# 履歴記録
self.parameter_history.append({
'iteration': iteration,
'parameters': self.parameters.copy(),
'quality': avg_quality
})
return best_params, best_quality
def update_parameters(self, current_quality):
"""
現在の品質に基づくパラメータ更新
"""
quality_gap = self.target_quality - current_quality
# 品質が低い場合はより保守的な設定に
if quality_gap > 0.1:
# 圧縮品質を上げる
self.parameters['compression_quality']['current'] = min(
100,
self.parameters['compression_quality']['current'] + 5
)
# シャープネスを下げる
self.parameters['sharpening_strength']['current'] = max(
0.0,
self.parameters['sharpening_strength']['current'] - 0.1
)
# 品質が十分高い場合は効率重視に
elif quality_gap < -0.05:
self.parameters['compression_quality']['current'] = max(
50,
self.parameters['compression_quality']['current'] - 2
)
実装とデプロイメント
Docker化された評価サービス
FROM pytorch/pytorch:1.9.0-cuda10.2-cudnn7-runtime
WORKDIR /app
# 依存関係インストール
COPY requirements.txt .
RUN pip install -r requirements.txt
# アプリケーションコード
COPY src/ ./src/
COPY models/ ./models/
# エントリーポイント
COPY entrypoint.sh .
RUN chmod +x entrypoint.sh
EXPOSE 8080
ENTRYPOINT ["./entrypoint.sh"]
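Dockerfile が参照する requirements.txt の一例です(本記事のコードが依存するパッケージを挙げたもので、バージョンは環境に合わせて固定してください。python-multipart は FastAPI のファイルアップロードに必要です)。

torch
torchvision
lpips
scikit-image
scipy
numpy
pillow
fastapi
uvicorn
aiofiles
python-multipart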
Web API の実装
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
from typing import List
from collections import Counter
from PIL import Image
import io
import numpy as np
import uvicorn

app = FastAPI(title="Image Quality Evaluation API")

# グローバル評価器
quality_evaluator = ComprehensiveQualityEvaluator()

async def load_upload_image(upload: UploadFile):
    """
    アップロード画像を読み込み、[0, 1] の float32 配列へ変換する
    """
    data = await upload.read()
    img = Image.open(io.BytesIO(data)).convert('RGB')
    return np.asarray(img, dtype=np.float32) / 255.0

def calculate_quality_distribution(results):
    """
    品質レベルごとの件数を集計する
    """
    return dict(Counter(r['quality_level'] for r in results))
@app.post("/evaluate/single")
async def evaluate_single_image(
original: UploadFile = File(...),
processed: UploadFile = File(...)
):
"""
単一画像ペアの評価
"""
try:
# 画像読み込み
original_img = await load_upload_image(original)
processed_img = await load_upload_image(processed)
# 評価実行
result = quality_evaluator.evaluate_single_pair(
original_img, processed_img
)
return JSONResponse(content=result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/evaluate/batch")
async def evaluate_batch_images(
files: List[UploadFile] = File(...)
):
"""
バッチ評価
"""
if len(files) % 2 != 0:
raise HTTPException(
status_code=400,
detail="Even number of files required (original + processed pairs)"
)
results = []
for i in range(0, len(files), 2):
original_img = await load_upload_image(files[i])
processed_img = await load_upload_image(files[i + 1])
result = quality_evaluator.evaluate_single_pair(
original_img, processed_img
)
results.append(result)
# 統計計算
summary = {
'total_pairs': len(results),
        'average_quality': float(np.mean([r['composite_score'] for r in results])),
'quality_distribution': calculate_quality_distribution(results)
}
return JSONResponse(content={
'summary': summary,
'results': results
})
@app.get("/health")
async def health_check():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)
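起動後の動作確認は、requests を使ったクライアントから行えます(ホスト名とファイル名は仮のものです)。

import requests

# ヘルスチェック
print(requests.get('http://localhost:8080/health').json())

# 単一ペアの評価: multipart/form-data でオリジナルと処理後を送信
with open('original.png', 'rb') as f1, open('processed.webp', 'rb') as f2:
    resp = requests.post(
        'http://localhost:8080/evaluate/single',
        files={'original': f1, 'processed': f2},
    )
print(resp.json())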
まとめ
AI画像品質評価メトリクスは、従来のピクセルベース指標よりも人間の知覚に近い品質判断を可能にします。本記事で紹介した手法を組み合わせることで、画像処理システムの品質管理を大きく改善できます。
主要なポイント:
- 多角的評価: LPIPS、SSIM、DISTS の組み合わせで包括的な品質評価
- 実時間監視: リアルタイム品質モニタリングによる早期問題発見
- 自動最適化: 品質目標に向けた動的パラメータ調整
- スケーラビリティ: バッチ処理とAPI化による大規模運用対応
内部リンク: 画像品質バジェットとCIゲート 2025 — 破綻を未然に防ぐ運用, 画像圧縮 完全戦略 2025 ─ 画質を守りつつ体感速度を最適化する実践ガイド, フォーマット変換の戦略 2025 — WebP/AVIF/JPEG/PNG を使い分ける指針