Densenet ArcFace
캐글 Shopee - Price Match Guarantee 대회 코드 리뷰
공개된 노트북 : https://www.kaggle.com/underwearfitting/pytorch-densenet-arcface-validation-training
간단한 대회 소개
Shopee 라는 e-commerce platform (전자상거래 플랫폼)에서 개최한 대회로 등록된 아이템들 중에 어떤 아이템들이 같은 제품인지 판별하는 대회이다 -> 평가 방식은 mean F1 score로 하나의 행마다 예측된 값에 대한 F1 score를 계산해서 평균을 내는 방식이다. 각 포스팅 아이디 별로 매칭되는 (같은 제품)인 포스팅 아이디들이 예측값이고, 제출시 test set은 70,000장으로 하나의 포스팅 아이디 마다 같은?비슷한? 제품 50개를 찾는다.
현재까지 사용된 techniques -
www.kaggle.com/c/shopee-product-matching/discussion/228537
=> 각 포스팅 마다 이미지 사진이 제공되는데 image retrieval 분야의 기법들을 적용해서 비슷한 사진을 가지는 포스팅을 찾는 방식
* image retrieval : 간단하게 생각하면 이미지를 넣어 유사한 이미지를 반환해주는 검색, 더 구체적으로는 이미지 내 오브젝트와 가장 유사한 오브젝트를 찾아서 반환해주는 시스템
=> 각 포스팅 마다 포스팅 제목이 제공되는데 text embedding를 통해서 (bert, tfidf 등) 비슷한 포스팅을 찾는 방식
등등 여러 기법들을 사용해 앙상블 하는 방법을 쓰는 것 같다.
코드 분석 / 이해
1. Make Folds
GroupKFold : 그룹 KFold는 k-폴드의 변형으로, 그룹 정보를 고려해서 동일한 그룹에 속해있는 데이터가 train set과 test set 에 동시에 들어있지 않도록 함
예를 들어 : <<얼굴 사진에서 표정을 인식하는 시스템을 만들기 위해 100명의 사진을 모았다고 가정>>
한 사람을 찍은 여러 장의 사진이 각기 다른 표정을 담고 있음 ==>
이 데이터셋에 없는 사람의 표정을 정확히 구분할 수 있는 분류기를 만드는 것이 목표
같은 사람의 사진이 훈련 세트와 테스트 세트에 모두 나타날 수 있으므로 그룹별 교차겸증을 하는것이 타당함
새 얼굴에 대한 일반화 성능을 더 정확하게 평가하려면 훈련 세트와 테스트 세트에 서로 다른 사람의 사진이 들어가도록 해야함
출처 : https://woolulu.tistory.com/71
사용방법 (예시) :
>>> from sklearn.model_selection import GroupKFold
>>> X = [0.1, 0.2, 2.2, 2.4, 2.3, 4.55, 5.8, 8.8, 9, 10]
>>> y = ["a", "b", "b", "b", "c", "c", "c", "d", "d", "d"]
>>> groups = [1, 1, 1, 2, 2, 2, 3, 3, 3, 3]
>>> gkf = GroupKFold(n_splits=3)
>>> for train, test in gkf.split(X, y, groups=groups):
... print("%s %s" % (train, test))
[0 1 2 3 4 5] [6 7 8 9]
[0 1 2 6 7 8 9] [3 4 5]
[3 4 5 6 7 8 9] [0 1 2]
코드 :
for fold, (train_idx, valid_idx) in enumerate(gkf.split(df_train, None, df_train.label_group)):
# train["label_group"] 정보를 기준으로 Train/Val 나눔
df_train.loc[valid_idx, 'fold'] = fold
# train["label_group"]를 고려해서 전체 데이터 셋을 크게 5개의 그룹(fold)으로 나누고 train["fold"] col에 fold 번호 저장
2. Label Encoding 라벨인코딩
le = LabelEncoder()
df_train.label_group = le.fit_transform(df_train.label_group)
# train["label_group"] => 카테고리형 데이터를 수치형으로 변환
3. Transforms (image augmentation library - albumentations)
# p – probability of applying the transform. Default: 0.5.
transforms_train = albumentations.Compose([
# resize to 512*512
albumentations.Resize(image_size, image_size),
# y축을 기준으로 가로로 뒤집음
albumentations.HorizontalFlip(p=0.5),
# 밝기와 대비 변경
albumentations.RandomBrightnessContrast(p=0.5, brightness_limit=(-0.2, 0.2), contrast_limit=(-0.2, 0.2)),
# 색조, 채도 변경
albumentations.HueSaturationValue(p=0.5, hue_shift_limit=0.2, sat_shift_limit=0.2, val_shift_limit=0.2),
# 입력 변환, 크기 조정 및 회전
albumentations.ShiftScaleRotate(p=0.5, shift_limit=0.0625, scale_limit=0.2, rotate_limit=20),
# 직사각형 영역 제거
albumentations.CoarseDropout(p=0.5),
albumentations.Normalize()
])
transforms_valid = albumentations.Compose([
albumentations.Resize(image_size, image_size),
# resize to 512*512
albumentations.Normalize()
])
*albumentations : 다른 image augmentation 관련 library들과 비교해서 가장 큰 특징은 빠르다는 점 ⇒ numpy, OpenCV, imgaug 등 여러 library(OpenCV 가 메인)들을 기반으로 optimization을 하였기 때문에 다른 library들보다 빠름 (https://hoya012.github.io/blog/albumentation_tutorial/)
4. Pytorch Dataset 정의
# 추상클래스인 Dataset은 최소한 __getitem__, __len__ 함수 구현을 요구
class SHOPEEDataset(Dataset):
def __init__(self, df, mode, transform=None): # 파라미터 인자를 받아 변수에 할당
self.df = df.reset_index(drop=True)
self.mode = mode # train or test (생성될 객체의 용도가 test 인가)
self.transform = transform # Augmentation 인자
def __len__(self): # 전체 데이터의 길이를 계산함
return len(self.df)
def __getitem__(self, index): # 로드한 data를 차례차례 돌려줌, index는 데이터의 인덱스
row = self.df.loc[index]
img = cv2.imread(row.file_path)
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # pytorch pretrained model use RGB
# cv2는 BGR로 읽기에 RGB로 변환 필수
if self.transform is not None:
res = self.transform(image=img)
img = res['image']
img = img.astype(np.float32)
img = img.transpose(2,0,1) # swap color axis
# numpy image: H x W x C
# torch image: C X H X W
if self.mode == 'test':
# 모델에서 weight들이 float형태이기 때문에 .float()을 이용해서 float형으로 변환
return torch.tensor(img).float()
else:
return torch.tensor(img).float(),
torch.tensor(row.label_group).float()
dataset = SHOPEEDataset(df_train, 'train', transform = transforms_train)
rcParams['figure.figsize'] = 15,5 # figsize 처럼 이미지 크기 바꾸는거
# very useful when you plot inline => subplots들 포함 전체 이미지 크기
for i in range(2):
f, axarr = plt.subplots(1,5)
for p in range(5):
idx = i*5 + p
img, label = dataset[idx]
axarr[p].imshow(img.transpose(0,1).transpose(1,2).squeeze())
axarr[p].set_title(label.item())
5. Modeling
Additive Angular Margin Loss
- softmax 에서 출발
- large-margin softmax loss : 임베딩 벡터와 클래스 중심 간의 거리에 따른 손실값 계산 시 공평하게 거리를 계산하는 것이 아니라, 정답 클래스 중심까지의 거리를 계산할 때 margin값을 더해주어 더 혹독하게 모델을 훈련시키는 방법 ⇒ 이를 통해 feature vector 가 메트릭 러닝이 필요로 하는 “동일 클래스 내에서는 더 잘 모여있고, 다른 클래스와는 더 멀리 떨어지는 곳으로” 임베딩
- SphereFace: L-softmax + normalized - Softmax 함수에 Angular Margin 을 넣어보자
- ArcFace: https://soobarkbar.tistory.com/60
코드
"""
in_features: size of each input sample
out_features: size of each output sample
s: norm of input feature
m: margin
cos(theta + m)
"""
class ArcModule(nn.Module):
def __init__(self, in_features, out_features, s = 10, m = margin):
super().__init__()
self.in_features = in_features
self.out_features = out_features
self.s = s
self.m = m
self.weight = nn.Parameter(torch.FloatTensor(out_features, in_features))
nn.init.xavier_normal_(self.weight)
self.cos_m = math.cos(m)
self.sin_m = math.sin(m)
self.th = torch.tensor(math.cos(math.pi - m))
self.mm = torch.tensor(math.sin(math.pi - m) * m)
def forward(self, inputs, labels):
# Regularization
# cos
cos_th = F.linear(inputs, F.normalize(self.weight))
# prevent zero division when backward
cos_th = cos_th.clamp(-1, 1)
# sin
sin_th = torch.sqrt(1.0 - torch.pow(cos_th, 2))
# phi = cos(theta + m) Cosine formula
cos_th_m = cos_th * self.cos_m - sin_th * self.sin_m
cos_th_m = torch.where(cos_th > self.th, cos_th_m, cos_th - self.mm)
cond_v = cos_th - self.th
cond = cond_v <= 0
cos_th_m[cond] = (cos_th - self.mm)[cond]
# convert label to one-hot
if labels.dim() == 1:
labels = labels.unsqueeze(-1)
onehot = torch.zeros(cos_th.size()).cuda()
labels = labels.type(torch.LongTensor).cuda()
# Map the label of the sample to one hot form. For example, N labels are mapped to (N, num_classes).
onehot.scatter_(1, labels, 1.0)
# For the correct category (1*phi) is the cos(theta + m) in the formula, for the wrong category (1*cosine) ie the cos(theta) in the formula
# Thus for each sample, such as [0,0,0,1,0,0] belongs to the fourth category, the final result is [cosine, cosine, cosine, phi, cosine, cosine]
outputs = onehot * cos_th_m + (1.0 - onehot) * cos_th
# Multiply by radius
outputs = outputs * self.s
return outputs
# densenet121 모델
class SHOPEEDenseNet(nn.Module):
def __init__(self, channel_size, out_feature, dropout=0.5, backbone='densenet121', pretrained=True):
super(SHOPEEDenseNet, self).__init__()
self.backbone = timm.create_model(backbone, pretrained=pretrained)
self.channel_size = channel_size
self.out_feature = out_feature
self.in_features = self.backbone.classifier.in_features
# arcface loss
self.margin = ArcModule(in_features=self.channel_size, out_features = self.out_feature)
self.bn1 = nn.BatchNorm2d(self.in_features)
self.dropout = nn.Dropout2d(dropout, inplace=True)
self.fc1 = nn.Linear(self.in_features * 16 * 16 , self.channel_size)
self.bn2 = nn.BatchNorm1d(self.channel_size)
def forward(self, x, labels=None):
features = self.backbone.features(x)
features = self.bn1(features)
features = self.dropout(features)
features = features.view(features.size(0), -1) # flatten?
features = self.fc1(features)
features = self.bn2(features)
features = F.normalize(features)
if labels is not None:
return self.margin(features, labels)
return features
model = SHOPEEDenseNet(512, df_train.label_group.nunique())
model.to(device);
6. Utils
Automatic Mixed Precision
- 처리 속도를 높이기 위한 FP16(16bit floating point)연산과 정확도 유지를 위한 FP32 연산을 섞어 학습하는 방법
- Tensor Core를 활용한 FP16연산을 이용하면 FP32연산 대비 절반의 메모리 사용량과 8배의 연산 처리량 & 2배의 메모리 처리량 효과가 있다
- ⇒ 최적화가 되어 배치를 늘릴 수 있기 때문에 학습 속도가 빨라지지만 배치 뿐만 아니라 모델 최적화도 이루어지기 때문에 속도가 증가함
- ⇒ 파이토치 1.5.0 버전부터 amp 모듈이 내부 라이브러리에 들어오기 시작
- amp 설명 : https://bo-10000.tistory.com/32
- amp code : https://github.com/hoya012/automatic-mixed-precision-tutorials-pytorch
코드
def train_func(train_loader):
model.train()
bar = tqdm(train_loader)
if use_amp:
# define loss scaler for automatic mixed precision
scaler = torch.cuda.amp.GradScaler()
losses = []
for batch_idx, (images, targets) in enumerate(bar):
images, targets = images.to(device), targets.to(device).long()
if debug and batch_idx == 100:
print('Debug Mode. Only train on first 100 batches.')
break
if use_amp:
with torch.cuda.amp.autocast():
logits = model(images, targets)
loss = criterion(logits, targets)
# Scales the loss, and calls backward()
# to create scaled gradients
scaler.scale(loss).backward()
if ((batch_idx + 1) % accumulation_step == 0) or ((batch_idx + 1) == len(train_loader)):
# Unscales gradients and calls
# or skips optimizer.step()
scaler.step(optimizer)
# Updates the scale for next iteration
scaler.update()
optimizer.zero_grad()
else:
logits = model(images, targets)
loss = criterion(logits, targets)
loss.backward()
optimizer.step()
optimizer.zero_grad()
losses.append(loss.item())
smooth_loss = np.mean(losses[-30:])
bar.set_description(f'loss: {loss.item():.5f}, smth: {smooth_loss:.5f}')
loss_train = np.mean(losses)
return loss_train
def valid_func(valid_loader):
model.eval()
bar = tqdm(valid_loader)
PROB = []
TARGETS = []
losses = []
PREDS = []
with torch.no_grad():
for batch_idx, (images, targets) in enumerate(bar):
images, targets = images.to(device), targets.to(device).long()
logits = model(images, targets)
PREDS += [torch.argmax(logits, 1).detach().cpu()]
TARGETS += [targets.detach().cpu()]
loss = criterion(logits, targets)
losses.append(loss.item())
bar.set_description(f'loss: {loss.item():.5f}')
PREDS = torch.cat(PREDS).cpu().numpy()
TARGETS = torch.cat(TARGETS).cpu().numpy()
accuracy = (PREDS==TARGETS).mean()
loss_valid = np.mean(losses)
return loss_valid, accuracy
def generate_test_features(test_loader):
model.eval()
bar = tqdm(test_loader)
FEAS = []
TARGETS = []
with torch.no_grad():
for batch_idx, (images) in enumerate(bar):
images = images.to(device)
features = model(images)
FEAS += [features.detach().cpu()]
FEAS = torch.cat(FEAS).cpu().numpy()
return FEAS
'''
Submissions will be evaluated based on their mean F1 score.
The mean is calculated in a sample-wise fashion, meaning that an F1 score is calculated for every predicted row, then averaged.
'''
def row_wise_f1_score(labels, preds):
scores = []
for label, pred in zip(labels, preds):
n = len(np.intersect1d(label, pred))
score = 2 * n / (len(label)+len(pred))
scores.append(score)
return scores, np.mean(scores)
def find_threshold(df, lower_count_thresh, upper_count_thresh, search_space):
'''
Compute the optimal threshold for the given count threshold.
'''
score_by_threshold = []
best_score = 0
best_threshold = -1
for i in tqdm(search_space):
sim_thresh = i/100
selection = ((FEAS@FEAS.T) > sim_thresh).cpu().numpy()
matches = []
oof = []
for row in selection:
oof.append(df.iloc[row].posting_id.tolist())
matches.append(' '.join(df.iloc[row].posting_id.tolist()))
tmp = df.groupby('label_group').posting_id.agg('unique').to_dict()
df['target'] = df.label_group.map(tmp)
scores, score = row_wise_f1_score(df.target, oof)
df['score'] = scores
df['oof'] = oof
selected_score = df.query(f'count > {lower_count_thresh} and count < {upper_count_thresh}').score.mean()
score_by_threshold.append(selected_score)
if selected_score > best_score:
best_score = selected_score
best_threshold = i
plt.title(f'Threshold Finder for count in [{lower_count_thresh},{upper_count_thresh}].')
plt.plot(score_by_threshold)
plt.axis('off')
plt.show()
print(f'Best score is {best_score} and best threshold is {best_threshold/100}')
7. Train
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = init_lr)
# learing rate가 cos함수를 따라서 eat_min까지 떨어졌다 다시 초기 learning rate까지 올라옴
# eta_min: 최소로 떨어질 수있는 learning rate default=0
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, n_epochs)
df_train_this = df_train[df_train['fold'] != fold_id]
df_valid_this = df_train[df_train['fold'] == fold_id]
df_valid_this['count'] = df_valid_this.label_group.map(df_valid_this.label_group.value_counts().to_dict())
dataset_train = SHOPEEDataset(df_train_this, 'train', transform = transforms_train)
dataset_valid = SHOPEEDataset(df_valid_this, 'test', transform = transforms_valid)
train_loader = torch.utils.data.DataLoader(dataset_train, batch_size=batch_size, shuffle=True, num_workers = n_worker)
valid_loader = torch.utils.data.DataLoader(dataset_valid, batch_size=batch_size, shuffle=False, num_workers = n_worker)
for epoch in range(n_epochs):
scheduler.step()
loss_train = train_func(train_loader)
if epoch % valid_every == 0:
print('Now generating features for the validation set to simulate the submission.')
FEAS = generate_test_features(valid_loader)
FEAS = torch.tensor(FEAS).cuda()
print('Finding Best Threshold in the given search space.')
find_threshold(df = df_valid_this,
lower_count_thresh = 0,
upper_count_thresh = 999,
search_space = search_space)
if epoch >= save_after:
torch.save(model.state_dict(), f'{model_dir}{kernel_type}_fold{fold_id}_densenet_{image_size}_epoch{epoch}.pth')
현재 코드는 몇개의 배치만 사용해서 최적의 Threshold 값을 찾는 (test simulation) 코드이므로 추후 Train 코드 작성 필요!