Personal Rank
目录
基于图的推荐 Personal Rank
将用户对物品的行为用二分图(二部图)表示,通过随机游走来计算用户与物品之间的相关度,相关度通过以下三个方面体现:
- 两个顶点之间的连通路径数多
- 两个顶点之间的连通路径长度短
- 两个顶点之间的路径通过的顶点的出度不大
公式:

上述公式算法的时间复杂度很高,所以可以通过矩阵求解进行优化,根据状态转移矩阵,经过一次矩阵运算就可以直接得到系统的稳态

其中
只有一个位置为1,其余位置为0,即固定顶点。
变形之后就变成了求解线性方程组:

复杂度很高的循环迭代法:
import os
# 构建二分图
# 返回{user:{item1:1,item2:1},item:{usera:1,userb:1}}
def get_graph_from_data(input_file):
if not os.path.exists(input_file):
return{}
graph = {}
score_thr = 4.5
with open(input_file,encoding='utf-8') as f:
next(f)
for line in f.readlines():
item = line.strip().split(",")
if len(item) < 3:
continue
# 加入前缀item_ 为了区分user与item
userid,itemid,rating = item[0],"item_"+item[1],item[2]
if float(rating) < score_thr:
continue
if userid not in graph:
graph[userid] = {}
graph[userid][itemid] = 1
if itemid not in graph:
graph[itemid] = {}
graph[itemid][userid] = 1
return graph
# 提取电影信息 {id:[name,genre]}
def get_item_info(input_file):
if not os.path.exists(input_file):
return {}
item_info = {}
with open(input_file,encoding='utf-8') as f:
next(f) #跳过第一行标题
for line in f:
item = line.strip().split(",")
if len(item) < 3:
continue
elif len(item) == 3:
itemid,title,genre = item
else:
itemid = item[0]
# 对于名字间有逗号的电影 American President, The (1995) 特别处理
title = ','.join(item[1:-1]).strip('"')
genre = item[-1]
item_info[itemid] = [title,genre]
return item_info
# personal rank算法
# 二分图 被推荐用户 随机游走概率 迭代次数 推荐数量
# 返回字典 {itemid:pr}
def personal_rank(graph,root,alpha,iter_num,recom_num=10):
recom_result = {}
rank = {} # 存储其他顶点对root顶点的pr值
rank = {point:0 for point in graph}
rank[root] = 1
for iter_index in range(iter_num):
print("迭代"+str(iter_index+1)+"次,共"+str(iter_num)+"次")
tmp_rank = {}
tmp_rank = {point:0 for point in graph}
for out_point,out_dict in graph.items():
for inner_point,value in graph[out_point].items():
tmp_rank[inner_point] += round(alpha*rank[out_point]/len(out_dict),4)
if inner_point == root:
tmp_rank[inner_point] += round(1-alpha,4)
if tmp_rank == rank:
break
rank = tmp_rank
cnt = 0
# 过滤掉用户顶点和root(即用户)顶点已经行为过的item
for item in sorted(rank.items(),key=lambda x:x[1], reverse=True):
point,pr_score = item
# 过滤用户顶点
if len(point.split("_")) < 2:
continue
if point in graph[root]:
continue
recom_result[point] = pr_score
cnt += 1
if cnt >= recom_num:
break
return recom_result
if __name__ == "__main__":
graph = get_graph_from_data("D:/document/Python/ml-latest-small/ratings.csv")
recom_result = personal_rank(graph,'1',0.7,50)
print(recom_result)
具有较高效率的矩阵化方法:
import os
import numpy as np
from scipy.sparse import coo_matrix # 存放稀疏矩阵,方式为存数据的列索引、行索引和数值
from scipy.sparse.linalg import gmres
# 构建二分图
# 返回{user:{item1:1,item2:1},item:{usera:1,userb:1}}
def get_graph_from_data(input_file):
if not os.path.exists(input_file):
return{}
graph = {}
score_thr = 4.5
with open(input_file,encoding='utf-8') as f:
next(f)
for line in f.readlines():
item = line.strip().split(",")
if len(item) < 3:
continue
# 加入前缀item_ 为了区分user与item
userid,itemid,rating = item[0],"item_"+item[1],item[2]
if float(rating) < score_thr:
continue
if userid not in graph:
graph[userid] = {}
graph[userid][itemid] = 1
if itemid not in graph:
graph[itemid] = {}
graph[itemid][userid] = 1
return graph
# 提取电影信息 {id:[name,genre]}
def get_item_info(input_file):
if not os.path.exists(input_file):
return {}
item_info = {}
with open(input_file,encoding='utf-8') as f:
next(f) #跳过第一行标题
for line in f:
item = line.strip().split(",")
if len(item) < 3:
continue
elif len(item) == 3:
itemid,title,genre = item
else:
itemid = item[0]
# 对于名字间有逗号的电影 American President, The (1995) 特别处理
title = ','.join(item[1:-1]).strip('"')
genre = item[-1]
item_info[itemid] = [title,genre]
return item_info
# 将二部图转换为稀疏矩阵
# 返回稀疏矩阵m,顶点vertex和顶点位置address_dict
def graph_to_matrix(graph):
vertex = list(graph.keys())
address_dict = {} # 记录顶点位置
matrix_len = len(graph)
for index in range(len(vertex)):
address_dict[vertex[index]] = index
# 定义行、列和数值数组,来通过coo_matrix模块存放稀疏矩阵
row = []
col = []
data = []
for element_i in graph:
weight = round(1/len(graph[element_i]),3)
row_index = address_dict[element_i]
for element_j in graph[element_i]:
col_index = address_dict[element_j]
row.append(row_index)
col.append(col_index)
data.append(weight)
row = np.array(row)
col = np.array(col)
data = np.array(data)
m = coo_matrix((data,(row,col)),shape=(matrix_len,matrix_len))
return m,vertex,address_dict
# 求E-αM^T
# 输入:矩阵M,顶点,随机游走概率α
def mat_all_point(m_mat,vertex,alpha):
matrix_len = len(vertex)
# row col data为存放单位矩阵E做准备
row = []
col = []
data = []
for index in range(matrix_len):
row.append(index)
col.append(index)
data.append(1)
# 存放单位矩阵E
row = np.array(row)
col = np.array(col)
data = np.array(data)
eye_t = coo_matrix((data,(row,col)),shape=(matrix_len,matrix_len))
# 转为csr格式会加快运算
return eye_t.tocsr() - alpha*m_mat.tocsr().transpose()
# 返回字典{itemid,pr}
def personal_rank_mat(graph,root,alpha,num=10):
m,vertex,address_dict = graph_to_matrix(graph)
if root not in address_dict:
return {}
mat_all = mat_all_point(m,vertex,alpha)
# 求root顶点的位置,得到r0矩阵
index = address_dict[root]
initial_list = [[0] for row in range(len(vertex))]
initial_list[index] = [1]
#计算线性代数方程,定义好误差
r0 = np.array(initial_list)
res = gmres(mat_all,r0,tol=1e-8)[0]
score_dict = {}
recom_dict = {}
for index in range(len(res)):
# 遍历每个顶点
point = vertex[index]
# 若是用户顶点则剔除
if len(point.strip().split("_")) < 2:
continue
# 若是用户已经购买过的物品,则剔除
if point in graph[root]:
continue
score_dict[point] = round(res[index],3)
for item in sorted(score_dict.items(),key=lambda x:x[1],reverse=True):
point,score = item
recom_dict[point] = score
return recom_dict
if __name__ == "__main__":
graph = get_graph_from_data("D:/document/Python/ml-latest-small/ratings.csv")
# m,vertex,address_dict = graph_to_matrix(graph)
# print(m.todense()) #答应出稠密矩阵
recom_result = personal_rank_mat(graph,'1',0.8,10)
print(recom_result)