First, why implement this algorithm in PyTorch: the crude CPU-only version I tried over the summer ran far too slowly, and PyTorch provides a ready-made embedding module and makes gradient descent easy.
Now for the core algorithm. FM adds a pairwise-interaction term to a linear model; evaluated naively, that term costs O(kn^2), but it can be simplified to O(kn) as shown below.
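Writing the model with $n$ features and latent dimension $k$ (this is the standard simplification from Rendle's FM paper, which the code implements):

$$\hat{y}(x) = w_0 + \sum_{i=1}^{n} w_i x_i + \sum_{i=1}^{n}\sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle\, x_i x_j$$

The pairwise term is half of the full double sum minus its diagonal, which factorizes per latent dimension:

$$\sum_{i=1}^{n}\sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle\, x_i x_j = \frac{1}{2}\sum_{f=1}^{k}\left[\left(\sum_{i=1}^{n} v_{i,f}\, x_i\right)^{2} - \sum_{i=1}^{n} v_{i,f}^{2}\, x_i^{2}\right]$$

Each bracket is a single O(n) pass per latent dimension $f$, giving O(kn) overall. Since every field is one-hot encoded, each active $x_i$ equals 1, so an embedding lookup directly yields $\mathbf{v}_i x_i$; the two inner terms correspond to `square_sum` (square of the sum) and `sum_square` (sum of the squares) in the code below.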
Concrete implementation:
class FactorizationMachine(nn.Module):

    def __init__(self, field_dims, embed_dim=4):
        super(FactorizationMachine, self).__init__()
        self.embed1 = FeaturesEmbedding(field_dims, 1)
        self.embed2 = FeaturesEmbedding(field_dims, embed_dim)
        self.bias = nn.Parameter(torch.zeros((1,)))

    def forward(self, x):
        # x shape: (batch_size, num_fields)
        # embed(x) shape: (batch_size, num_fields, embed_dim)
        square_sum = self.embed2(x).sum(dim=1).pow(2).sum(dim=1)
        sum_square = self.embed2(x).pow(2).sum(dim=1).sum(dim=1)
        output = self.embed1(x).squeeze(-1).sum(dim=1) + self.bias + (square_sum - sum_square) / 2
        output = torch.sigmoid(output).unsqueeze(-1)
        return output
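A minimal shape check, using two hypothetical fields with cardinalities 3 and 5 (not the actual MovieLens dimensions):

fm = FactorizationMachine(field_dims=[3, 5], embed_dim=4)
x = torch.tensor([[0, 2], [1, 4]])   # (batch_size=2, num_fields=2), raw per-field indices
print(fm(x).shape)                   # torch.Size([2, 1]): one probability per sample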
Finally, the full code:
FM.py
from utils import create_dataset, Trainer
from layer import Embedding, FeaturesEmbedding, EmbeddingsInteraction, MultiLayerPerceptron
import torch
import torch.nn as nn
import torch.optim as optim
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('Training on [{}].'.format(device))
dataset = create_dataset(sample_num=100000, device=device)
field_dims, (train_X, train_y), (test_X, test_y) = dataset.train_valid_test_split()
class FactorizationMachine(nn.Module):

    def __init__(self, field_dims, embed_dim=4):
        super(FactorizationMachine, self).__init__()
        self.embed1 = FeaturesEmbedding(field_dims, 1)
        self.embed2 = FeaturesEmbedding(field_dims, embed_dim)
        self.bias = nn.Parameter(torch.zeros((1,)))

    def forward(self, x):
        # x shape: (batch_size, num_fields)
        # embed(x) shape: (batch_size, num_fields, embed_dim)
        square_sum = self.embed2(x).sum(dim=1).pow(2).sum(dim=1)
        sum_square = self.embed2(x).pow(2).sum(dim=1).sum(dim=1)
        output = self.embed1(x).squeeze(-1).sum(dim=1) + self.bias + (square_sum - sum_square) / 2
        output = torch.sigmoid(output).unsqueeze(-1)
        return output
EMBEDDING_DIM = 8
LEARNING_RATE = 1e-4
REGULARIZATION = 1e-6
BATCH_SIZE = 4096
EPOCH = 600
# TRIAL = 100
fm = FactorizationMachine(field_dims, EMBEDDING_DIM).to(device)
optimizer = optim.Adam(fm.parameters(), lr=LEARNING_RATE, weight_decay=REGULARIZATION)
criterion = nn.BCELoss()  # binary cross-entropy
trainer = Trainer(fm, optimizer, criterion, BATCH_SIZE)
trainer.train(train_X, train_y, epoch=EPOCH)
test_loss, test_mae, test_acc, test_recall = trainer.test(test_X, test_y)
print('test_loss: {:.5f} | test_mean_absolute_error: {:.5f} | test_accuracy: {:.5f} | test_recall: {:.5f}'.format(test_loss, test_mae, test_acc, test_recall))
layer.py (only part of it is used):
import numpy as np
import torch
import torch.nn as nn


# Faster than nn.Embedding on CPU, but far slower than it for sequence models on GPU
class CpuEmbedding(nn.Module):

    def __init__(self, num_embeddings, embed_dim):
        super(CpuEmbedding, self).__init__()
        self.weight = nn.Parameter(torch.zeros((num_embeddings, embed_dim)))
        nn.init.xavier_uniform_(self.weight.data)

    def forward(self, x):
        """
        :param x: shape (batch_size, num_fields)
        :return: shape (batch_size, num_fields, embedding_dim)
        """
        return self.weight[x]
class Embedding:

    def __new__(cls, num_embeddings, embed_dim):
        if torch.cuda.is_available():
            embedding = nn.Embedding(num_embeddings, embed_dim)
            nn.init.xavier_uniform_(embedding.weight.data)
            return embedding
        else:
            return CpuEmbedding(num_embeddings, embed_dim)
class FeaturesEmbedding(nn.Module):

    def __init__(self, field_dims, embed_dim):
        super(FeaturesEmbedding, self).__init__()
        self.embedding = Embedding(sum(field_dims), embed_dim)
        # e.g. field_dims = [2, 3, 4, 5], offsets = [0, 2, 5, 9]
        # np.int64 instead of the removed np.long alias, for current NumPy versions
        self.offsets = np.array((0, *np.cumsum(field_dims)[:-1]), dtype=np.int64)

    def forward(self, x):
        """
        :param x: shape (batch_size, num_fields)
        :return: shape (batch_size, num_fields, embedding_dim)
        """
        x = x + x.new_tensor(self.offsets)
        return self.embedding(x)
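To make the offset trick concrete, a small sketch with hypothetical field_dims = [2, 3] (so offsets = [0, 2] and one shared table of 5 rows):

fe = FeaturesEmbedding(field_dims=[2, 3], embed_dim=4)
x = torch.tensor([[1, 2]])   # field 0 takes local index 1, field 1 takes local index 2
# internally x becomes [[1, 4]]: rows 1 and 2+2=4 of the shared table
print(fe(x).shape)           # torch.Size([1, 2, 4])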
class EmbeddingsInteraction(nn.Module):

    def __init__(self):
        super(EmbeddingsInteraction, self).__init__()

    def forward(self, x):
        """
        :param x: shape (batch_size, num_fields, embedding_dim)
        :return: shape (batch_size, num_fields*(num_fields-1)//2, embedding_dim)
        """
        num_fields = x.shape[1]
        i1, i2 = [], []
        for i in range(num_fields):
            for j in range(i + 1, num_fields):
                i1.append(i)
                i2.append(j)
        interaction = torch.mul(x[:, i1], x[:, i2])
        return interaction
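For example, with num_fields = 3 the index lists come out as i1 = [0, 0, 1] and i2 = [1, 2, 2], i.e. the pairs (0,1), (0,2), (1,2); a quick sketch with random input:

x = torch.randn(2, 3, 4)                 # (batch_size=2, num_fields=3, embedding_dim=4)
print(EmbeddingsInteraction()(x).shape)  # torch.Size([2, 3, 4]) -- 3*(3-1)//2 = 3 pairs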
class MultiLayerPerceptron(nn.Module):

    def __init__(self, layer, batch_norm=True):
        super(MultiLayerPerceptron, self).__init__()
        layers = []
        input_size = layer[0]
        for output_size in layer[1: -1]:
            layers.append(nn.Linear(input_size, output_size))
            if batch_norm:
                layers.append(nn.BatchNorm1d(output_size))
            layers.append(nn.ReLU())
            input_size = output_size
        layers.append(nn.Linear(input_size, layer[-1]))
        self.mlp = nn.Sequential(*layers)

    def forward(self, x):
        return self.mlp(x)
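MultiLayerPerceptron is not used by FM itself, but for completeness: a hypothetical layer spec [16, 8, 1] builds Linear(16, 8) -> BatchNorm1d -> ReLU -> Linear(8, 1).

mlp = MultiLayerPerceptron([16, 8, 1])
print(mlp(torch.randn(32, 16)).shape)   # torch.Size([32, 1])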
utils.py:
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, OrdinalEncoder, KBinsDiscretizer
from sklearn.model_selection import train_test_split
from sklearn import metrics
import torch
class Dataset:

    def __init__(self):
        self.device = torch.device('cpu')

    def to(self, device):
        self.device = device
        return self

    def train_valid_test_split(self, train_size=0.8, test_size=0.2):
        field_dims = (self.data.max(axis=0).astype(int) + 1).tolist()[:-1]
        train, test = train_test_split(self.data, train_size=train_size, random_state=2021)
        device = self.device
        train_X = torch.tensor(train[:, :-1], dtype=torch.long).to(device)
        test_X = torch.tensor(test[:, :-1], dtype=torch.long).to(device)
        train_y = torch.tensor(train[:, -1], dtype=torch.float).unsqueeze(1).to(device)
        test_y = torch.tensor(test[:, -1], dtype=torch.float).unsqueeze(1).to(device)
        return field_dims, (train_X, train_y), (test_X, test_y)
class MovieLensDataset(Dataset):

    def __init__(self, file, read_part=False, sample_num=1000000, task='classification'):
        super(MovieLensDataset, self).__init__()
        dtype = {
            'userId': np.int32,
            'movieId': np.int32,
            'rating': np.float16,
        }
        ratings_title = ['userID', 'movieID', 'ratings', 'timestamps']
        if read_part:
            data_df = pd.read_csv(file, sep='\t', header=None, names=ratings_title, dtype=dtype, nrows=sample_num, engine='python')
        else:
            data_df = pd.read_csv(file, sep='\t', header=None, names=ratings_title, dtype=dtype, engine='python')
        data_df = data_df.filter(regex='userID|movieID|ratings')
        if task == 'classification':
            data_df['ratings'] = data_df.apply(lambda x: 1 if x['ratings'] > 3 else 0, axis=1).astype(np.int8)
        self.data = data_df.values
def create_dataset(read_part=True, sample_num=100000, task='classification', device=torch.device('cpu')):
    return MovieLensDataset('ml-100k/u.data', read_part=read_part, sample_num=sample_num, task=task).to(device)
    # return MovieLensDataset('ml-latest-small/ratings.csv', read_part=read_part, sample_num=sample_num, task=task).to(device)
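As a quick sanity check (this assumes ml-100k/u.data is present in the working directory; each line of u.data is tab-separated as user id, item id, rating 1-5, timestamp):

ds = create_dataset(sample_num=1000)
print(ds.data.shape)   # (1000, 3): userID, movieID, binarized label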
class BatchLoader:

    def __init__(self, X, y, batch_size=4096, shuffle=True):
        assert len(X) == len(y)
        self.batch_size = batch_size
        if shuffle:
            seq = list(range(len(X)))
            np.random.shuffle(seq)
            self.X = X[seq]
            self.y = y[seq]
        else:
            self.X = X
            self.y = y

    def __iter__(self):
        def iteration(X, y, batch_size):
            start = 0
            end = batch_size
            while start < len(X):
                yield X[start: end], y[start: end]
                start = end
                if end + batch_size < len(X):
                    end += batch_size
                else:
                    end = len(X)
        return iteration(self.X, self.y, self.batch_size)
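A small illustration of the batching logic with hypothetical tensors (shuffle disabled so the order is predictable):

X = torch.arange(10).unsqueeze(1)            # 10 samples, 1 field each
y = torch.zeros(10, 1)
for b_x, b_y in BatchLoader(X, y, batch_size=4, shuffle=False):
    print(len(b_x))                          # prints 4, 4, 2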
class Trainer:

    def __init__(self, model, optimizer, criterion, batch_size=None, task='classification'):
        assert task in ['classification', 'regression']
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion
        self.batch_size = batch_size
        self.task = task

    def train(self, train_X, train_y, epoch=100):
        if self.batch_size:
            train_loader = BatchLoader(train_X, train_y, self.batch_size)
        else:
            # wrapped so that `for b_x, b_y in train_loader` works in both cases
            train_loader = [[train_X, train_y]]
        for e in tqdm(range(epoch)):
            # train part
            self.model.train()
            for b_x, b_y in train_loader:
                self.optimizer.zero_grad()
                pred_y = self.model(b_x)
                train_loss = self.criterion(pred_y, b_y)
                train_loss.backward()
                self.optimizer.step()
        print('train_loss: {:.5f} | train_mean_absolute_error: {:.5f} | train_accuracy: {:.5f} | train_recall: {:.5f}'.format(*self.test(train_X, train_y)))

    def test(self, test_X, test_y):
        self.model.eval()
        with torch.no_grad():
            pred_y = self.model(test_X)
            test_loss = self.criterion(pred_y, test_y).detach()
            if self.task == 'classification':
                # round the sigmoid output to a hard 0/1 prediction for the metrics
                # test_metric = metrics.mean_absolute_error(test_y.cpu(), np.around(pred_y.cpu()))
                test_mae = abs(test_y.cpu() - np.around(pred_y.cpu())).sum() / len(test_y.cpu())
                test_acc = metrics.accuracy_score(test_y.cpu(), np.around(pred_y.cpu()))
                test_recall = metrics.recall_score(test_y.cpu(), np.around(pred_y.cpu()))
                return test_loss, test_mae, test_acc, test_recall
            elif self.task == 'regression':
                # regression is allowed by the assert above but never used in this post
                return test_loss, -test_loss
The formatting may be a bit off. The source code draws on several excellent implementations on GitHub. My main takeaway: because rating prediction was turned into a binary classification problem (a rating above 3 becomes 1, otherwise 0), evaluating with MAE and RMSE may not be very discriminative.
Since there seem to be very few write-ups of PyTorch implementations of FM online, I'm posting this here, and I hope it will be useful for my own future research.
References (the Bilibili uploader explains the theory especially thoroughly):
机器学习算法(3)——FM(Factorization Machine)算法(推导与实现)_ChaucerG的博客-CSDN博客_机器学习fm
FM 代码实现细节(Embedding 和数学实现) - 知乎
【推荐算法】特征交叉 之 FM模型 —— 隐向量特征交叉_哔哩哔哩_bilibili