Source code for flagevalmm.evaluator.retrieval_evaluator
import json
import numpy as np
import os
from typing import Dict, Any
from flagevalmm.registry import EVALUATORS
# TODO: refactor code
[docs]
def i2t(probs: np.ndarray, return_ranks: bool = False):
    """Compute image-to-text retrieval metrics from a similarity matrix.

    Row ``index`` of ``probs`` is scored against every column; columns
    ``k * index`` .. ``k * index + k - 1`` (with
    ``k = probs.shape[1] // probs.shape[0]``) are treated as the correct
    matches for that row.  Higher scores rank first.  If the column count
    is not an exact multiple of the row count, the trailing remainder
    columns still compete in the ranking but are not ground truth for any
    row.

    Args:
        probs: 2-D score matrix with at least as many columns as rows.
        return_ranks: When true, also return the per-row rank and top-1
            arrays.

    Returns:
        ``(r1, r5, r10, medr, meanr)`` where the recalls are percentages and
        the median/mean ranks are 1-based.  With ``return_ranks`` the result
        is ``(metrics, (ranks, top1))``; ``ranks`` holds the 0-based best
        rank of a correct column per row and ``top1`` the highest-scoring
        column index per row.

    Raises:
        ValueError: If ``probs`` is not 2-D, has no rows, or has fewer
            columns than rows (no ground-truth group could be formed).
    """
    probs = np.asarray(probs)
    if probs.ndim != 2 or probs.shape[0] == 0 or probs.shape[1] < probs.shape[0]:
        raise ValueError(
            "probs must be a non-empty 2-D matrix with at least as many "
            f"columns as rows, got shape {probs.shape}"
        )

    npts = probs.shape[0]
    # captions per image
    k = probs.shape[1] // npts
    ranks = np.zeros(npts)
    top1 = np.zeros(npts)

    for index in range(npts):
        # Column indices ordered from highest to lowest score.
        order = np.argsort(probs[index])[::-1]
        first = k * index
        # The best rank among the k correct columns is the first position in
        # the ordering whose column index falls inside this row's group.
        # argmax on a boolean array returns the first True; one always
        # exists because k >= 1 and every column appears in ``order``.
        in_group = (order >= first) & (order < first + k)
        ranks[index] = np.argmax(in_group)
        top1[index] = order[0]

    # Compute metrics
    total = len(ranks)
    r1 = float(100.0 * np.count_nonzero(ranks < 1) / total)
    r5 = float(100.0 * np.count_nonzero(ranks < 5) / total)
    r10 = float(100.0 * np.count_nonzero(ranks < 10) / total)
    medr = float(np.floor(np.median(ranks)) + 1)
    meanr = float(ranks.mean() + 1)
    metrics = (r1, r5, r10, medr, meanr)

    if return_ranks:
        return metrics, (ranks, top1)
    return metrics
[docs]
def t2i(probs: np.ndarray, return_ranks: bool = False):
    """Compute text-to-image retrieval metrics from a similarity matrix.

    Each of the first ``k * probs.shape[0]`` columns of ``probs`` (with
    ``k = probs.shape[1] // probs.shape[0]``) is used as a query and scored
    against every row; the correct row for column ``c`` is ``c // k``.
    Higher scores rank first.  Trailing columns beyond ``k * rows`` are
    ignored.

    Args:
        probs: 2-D score matrix with at least as many columns as rows.
        return_ranks: When true, also return the per-query rank and top-1
            arrays.

    Returns:
        ``(r1, r5, r10, medr, meanr)`` where the recalls are percentages and
        the median/mean ranks are 1-based.  With ``return_ranks`` the result
        is ``(metrics, (ranks, top1))``; ``ranks`` holds the 0-based rank of
        the correct row per query column and ``top1`` the highest-scoring
        row index per query column.

    Raises:
        ValueError: If ``probs`` is not 2-D, has no rows, or has fewer
            columns than rows (there would be no queries to average over).
    """
    probs = np.asarray(probs)
    if probs.ndim != 2 or probs.shape[0] == 0 or probs.shape[1] < probs.shape[0]:
        raise ValueError(
            "probs must be a non-empty 2-D matrix with at least as many "
            f"columns as rows, got shape {probs.shape}"
        )

    npts = probs.shape[0]
    # captions per image
    k = probs.shape[1] // npts
    num_queries = k * npts
    ranks = np.zeros(num_queries)
    top1 = np.zeros(num_queries)

    # Transpose so that each query column becomes a row of scores over images.
    by_query = probs.T
    for query in range(num_queries):
        target = query // k  # row that this column belongs to
        order = np.argsort(by_query[query])[::-1]
        # Position of the correct row in the descending ordering.
        ranks[query] = np.argmax(order == target)
        top1[query] = order[0]

    # Compute metrics
    total = len(ranks)
    r1 = float(100.0 * np.count_nonzero(ranks < 1) / total)
    r5 = float(100.0 * np.count_nonzero(ranks < 5) / total)
    r10 = float(100.0 * np.count_nonzero(ranks < 10) / total)
    medr = float(np.floor(np.median(ranks)) + 1)
    meanr = float(ranks.mean() + 1)
    metrics = (r1, r5, r10, medr, meanr)

    if return_ranks:
        return metrics, (ranks, top1)
    return metrics
[docs]
def json_save(content: Dict[str, Any], jf_nm: str) -> None:
    """Write ``content`` to the file ``jf_nm`` as JSON.

    Args:
        content: JSON-serializable mapping to store.
        jf_nm: Path of the output file; it is created or overwritten.

    Raises:
        TypeError: If ``content`` contains a value ``json`` cannot serialize.
        ValueError: If ``content`` contains a circular reference.
        OSError: If the file cannot be opened or written.
    """
    # Serialize before opening so that a serialization failure never
    # truncates or partially overwrites an existing result file.
    serialized = json.dumps(content)
    with open(jf_nm, "w", encoding="utf-8") as jf:
        jf.write(serialized)
[docs]
@EVALUATORS.register_module()
class RetrievalEvaluator:
    """Evaluator that turns a saved similarity matrix into retrieval scores.

    The matrix is read from ``<output_dir>/<dataset.name>.npy``, scored with
    ``i2t`` and ``t2i``, and the recall figures are written to
    ``<output_dir>/<dataset.name>_result.json``.
    """

    def __init__(self, **kwargs):
        """Accept arbitrary keyword arguments; none of them are used."""
        pass

    def process(self, dataset, output_dir, **kwargs):
        """Score the similarity matrix stored for ``dataset``.

        Args:
            dataset: Object exposing a ``name`` attribute, which selects the
                input ``.npy`` file and names the result file.
            output_dir: Directory holding the ``.npy`` file; the JSON result
                is written to the same directory.
            **kwargs: Ignored.
        """
        name = dataset.name

        # Load similarity matrix
        matrix_path = os.path.join(output_dir, f"{name}.npy")
        sim_matrix = np.load(matrix_path)

        # Dataset-specific shape validation: only warns, never aborts.
        if name == "f30k":
            if sim_matrix.shape != (1000, 5000):
                print(
                    f"f30k_sim.shape: {sim_matrix.shape}, please check it. If in try-run mode, ignore the message"
                )

        # Calculate retrieval metrics
        result_i2t = i2t(sim_matrix)
        result_t2i = t2i(sim_matrix)

        # Print raw results
        print(f"{result_i2t}_{result_t2i}")

        # Only the three recall values of each direction are reported; the
        # median and mean ranks at the tail of each tuple are not stored.
        i2t_recalls = result_i2t[:3]
        t2i_recalls = result_t2i[:3]
        i2t_r1, i2t_r5, i2t_r10 = i2t_recalls
        t2i_r1, t2i_r5, t2i_r10 = t2i_recalls

        # Prepare results dictionary
        content = {
            "i2t_R@1": i2t_r1,
            "i2t_R@5": i2t_r5,
            "i2t_R@10": i2t_r10,
            "t2i_R@1": t2i_r1,
            "t2i_R@5": t2i_r5,
            "t2i_R@10": t2i_r10,
            "mean_recall": (sum(i2t_recalls) + sum(t2i_recalls)) / 6.0,
        }

        # Save results
        result_path = os.path.join(output_dir, f"{name}_result.json")
        json_save(content, result_path)
        print(f"{content}")