机器学习--长尾分布

姜智浩 Lv4

长尾分布(Long-Tail Distribution) 是指数据集中少数类别(头部)占据大量样本,而多数类别(尾部)只有极少样本的现象。这种分布广泛存在于现实场景(如推荐系统、图像分类、自然语言处理等),对模型训练和评估带来显著挑战。

特点

  1. 头部类别(Head Classes)

    • 数量少但样本占比极高(如20%的类别覆盖80%的数据)。
    • 模型容易过拟合这些类别,导致对尾部类别表现差。
  2. 尾部类别(Tail Classes)

    • 数量多但样本极少(如每个类别仅几个样本)。
    • 因数据不足,模型难以学习有效特征,导致欠拟合。

常见场景

  • 图像分类:大规模数据集(如ImageNet)中稀有物体类别。
  • 推荐系统:热门商品点击量巨大,冷门商品极少被交互。
  • 自然语言处理:高频词汇vs.低频长尾词汇。
  • 异常检测:异常样本通常远少于正常样本。

解决长尾分布的方法

1. 数据层面

  • 重采样(Re-sampling)

    • 过采样(Oversampling):对尾部类别重复样本或生成新样本(如SMOTE、GANs)。
    • 欠采样(Undersampling):减少头部类别的样本,可能丢失信息。
    • 混合采样:结合过采样和欠采样。
  • 数据增强(Data Augmentation)

    • 对尾部类别使用增强技术(如旋转、裁剪、Mixup、CutMix)生成多样化样本。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from torch.utils.data import WeightedRandomSampler, Dataset
import numpy as np

# `labels`是类别标签列表
labels = [0, 0, 0, 1, 1, 2, 2, 2, 2, 3] # 类别0和2样本多,1和3样本少

# 计算每个类别的样本数
class_counts = np.bincount(labels)
num_samples = len(labels)

# 方法1:过采样(对少数类样本增加权重)
class_weights = 1. / class_counts # 逆频率权重
sample_weights = class_weights[labels] # 每个样本的权重
sampler = WeightedRandomSampler(sample_weights, num_samples, replacement=True)

# 方法2:欠采样(对多数类样本降权)
# 设置每个类别的采样数量为最小类别的样本数
min_samples = min(class_counts)
sample_weights = np.array([1.0 / class_counts[label] for label in labels])
sampler = WeightedRandomSampler(sample_weights, min_samples * len(class_counts), replacement=False)

# 在DataLoader中使用sampler
from torch.utils.data import DataLoader
dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
1
2
3
4
5
6
7
8
9
10
11
12
13
from imblearn.over_sampling import SMOTE
import numpy as np

# 假设特征X和标签y
X = np.random.rand(100, 10) # 100个样本,10维特征
y = np.array([0]*80 + [1]*15 + [2]*5) # 长尾分布

# 使用SMOTE生成少数类样本
smote = SMOTE(sampling_strategy='auto')
X_resampled, y_resampled = smote.fit_resample(X, y)

print(f"Original counts: {np.bincount(y)}")
print(f"Resampled counts: {np.bincount(y_resampled)}")

2. 损失函数设计

  • 类别加权损失(Class-Weighted Loss)
    • 为不同类别分配权重(如逆类别频率),使模型更关注尾部类别。
    • 例如:weight = 1 / sqrt(class_count)
      代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  import torch
import torch.nn as nn

# 假设类别数量为4,计算每个类别的权重(逆频率)
class_counts = torch.tensor([1000, 100, 10, 1]) # 长尾分布
weights = 1.0 / class_counts
weights = weights / weights.sum() # 归一化

# 定义加权交叉熵损失
criterion = nn.CrossEntropyLoss(weight=weights)

# 示例用法
logits = torch.randn(4, 4) # 模型输出(batch_size=4, num_classes=4)
targets = torch.tensor([0, 1, 2, 3]) # 真实标签
loss = criterion(logits, targets)
  • 焦点损失(Focal Loss)
    • 降低易分类样本(通常是头部类别)的损失权重,聚焦难样本(尾部)。
    • 公式:FL(p_t) = -α_t (1 - p_t)^γ log(p_t),其中γ调节难易样本权重。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class FocalLoss(nn.Module):
def __init__(self, alpha=1.0, gamma=2.0):
super().__init__()
self.alpha = alpha # 类别平衡参数
self.gamma = gamma # 难易样本调节参数

def forward(self, inputs, targets):
ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
pt = torch.exp(-ce_loss) # 模型对真实类别的预测概率
loss = self.alpha * (1 - pt) ** self.gamma * ce_loss
return loss.mean()

# 示例用法
focal_loss = FocalLoss(alpha=[1.0, 2.0, 5.0, 10.0], gamma=2) # alpha可对不同类别加权
loss = focal_loss(logits, targets)
  • 解耦训练(Decoupling)
    • 先学习特征表示(均匀采样),再调整分类器(重采样或重加权)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 第一阶段:均匀采样学习特征
uniform_sampler = WeightedRandomSampler(
weights=torch.ones(len(dataset)), # 均匀权重
num_samples=len(dataset),
replacement=True
)
dataloader_stage1 = DataLoader(dataset, batch_size=32, sampler=uniform_sampler)

# 第二阶段:冻结特征层,调整分类器(使用类别加权损失)
for param in model.feature_extractor.parameters():
param.requires_grad = False # 冻结特征层

criterion = nn.CrossEntropyLoss(weight=class_weights) # 使用加权损失
optimizer = torch.optim.Adam(model.classifier.parameters(), lr=0.001)

3. 模型结构改进

  • 解耦框架(Decoupling Framework)
    • Decoupling(NeurIPS 2019)分离特征学习和分类器调整。
  • 两阶段训练
    • 第一阶段:正常训练;第二阶段:冻结骨干网络,微调分类器(使用重采样或加权)。
  • 专家混合(Mixture of Experts, MoE)
    • 为不同类别分配专用子模型(专家)。

4. 迁移学习 & 自监督学习

  • 预训练 + 微调:在大规模平衡数据上预训练,再在长尾数据上微调。
  • 自监督学习:通过对比学习(如SimCLR)学习通用特征,减少对标签的依赖。

5. 评估指标优化

  • 避免单一准确率(Accuracy),采用更全面的指标:
    • 宏平均(Macro-F1):各类别F1的均值,平等对待所有类别。
1
2
3
4
5
6
7
8
9
from sklearn.metrics import f1_score

def macro_f1(y_true, y_pred):
return f1_score(y_true, y_pred, average='macro')

# 示例
y_true = [0, 1, 2, 0, 1, 2]
y_pred = [0, 1, 1, 0, 0, 2]
print(f"Macro-F1: {macro_f1(y_true, y_pred)}")
  • 平衡准确率(Balanced Accuracy):各类别召回率的均值。
  • AUC-ROC:衡量模型在不同阈值下的整体性能。

经典论文与模型

  • Decoupling(NeurIPS 2019):解耦表示学习和分类器调整。
  • BBN(CVPR 2020):双分支网络平衡重采样和原始分布。
  • Logit Adjustment(ICML 2020):通过调整logit偏移解决类别不平衡。
  • Balanced Softmax(NeurIPS 2020):修改Softmax适应长尾分布。

方向

  • 过拟合与泛化:尾部数据不足易导致过拟合,需更好的正则化或小样本学习技术。
  • 动态长尾分布:现实世界中类别分布可能随时间变化(如热门商品更替)。
  • 无监督长尾学习:探索自监督或半监督方法减少对标签的依赖。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Subset
import torch
import numpy as np

# 加载 CIFAR-10 原始训练集
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)

# 统计每类索引
class_indices = [[] for _ in range(10)]
for idx, (_, label) in enumerate(train_dataset):
class_indices[label].append(idx)

# 人为制造长尾分布,例如类0有5000个样本,类9只有10个样本
long_tail_counts = [5000, 2000, 1000, 500, 200, 100, 50, 30, 20, 10]
selected_indices = []
for i, count in enumerate(long_tail_counts):
selected_indices.extend(class_indices[i][:count])

# 构建新的训练子集
long_tail_dataset = Subset(train_dataset, selected_indices)


# 获取子集标签
labels = [train_dataset[i][1] for i in selected_indices]
class_counts = np.bincount(labels, minlength=10)
print("每类样本数量:", class_counts)

# 计算类别权重(样本越少权重越大)
weights = 1.0 / (class_counts + 1e-6)
weights = weights / weights.sum() * len(class_counts)
class_weights = torch.FloatTensor(weights)

import torch.nn as nn
import torch.nn.functional as F

# 简单CNN模型
class SimpleCNN(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(3, 32, 3)
self.conv2 = nn.Conv2d(32, 64, 3)
self.fc1 = nn.Linear(64 * 6 * 6, 128)
self.fc2 = nn.Linear(128, 10)

def forward(self, x):
x = F.relu(self.conv1(x)) # [batch, 32, 30, 30]
x = F.max_pool2d(x, 2) # [batch, 32, 15, 15]
x = F.relu(self.conv2(x)) # [batch, 64, 13, 13]
x = F.max_pool2d(x, 2) # [batch, 64, 6, 6]
x = x.view(-1, 64 * 6 * 6)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return x

# 使用权重定义加权损失函数
criterion = nn.CrossEntropyLoss(weight=class_weights)

from torch.utils.data import DataLoader
from torch import optim

model = SimpleCNN()
optimizer = optim.Adam(model.parameters(), lr=0.001)
loader = DataLoader(long_tail_dataset, batch_size=64, shuffle=True)

for epoch in range(10):
for images, labels in loader:
outputs = model(images)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f"Epoch {epoch}: loss = {loss.item():.4f}")

Epoch 0: loss = 1.9016
Epoch 1: loss = 1.7739
Epoch 2: loss = 1.4057
Epoch 3: loss = 1.8090
Epoch 4: loss = 1.6333
Epoch 5: loss = 1.5193
Epoch 6: loss = 0.8202
Epoch 7: loss = 0.5897
Epoch 8: loss = 0.7408
Epoch 9: loss = 1.2246

  • Title: 机器学习--长尾分布
  • Author: 姜智浩
  • Created at : 2025-05-26 11:45:14
  • Updated at : 2025-05-26 19:44:22
  • Link: https://super-213.github.io/zhihaojiang.github.io/2025/05/26/20250526机器学习--长尾分布/
  • License: This work is licensed under CC BY-NC-SA 4.0.