Diversifying Recommendations
Often the task of recommendation is to find items which are similar, but not too similar. Diversifying recommendation results aims to retain relevance while avoiding items that are "too similar".
For this recipe we will apply modifications to the pattern from the similar item recommendations recipe, using the recommend endpoint.
Naive Approach
A very simple approach is to offset the search to achieve a "no more similar than 'x'" effect. The nice properties of this approach are its simplicity, predictability, and speed. However, while it can be effective at making the recommendations diverse with respect to the item you are recommending from, the recall set itself may not be diverse.
import marqo
from typing import Any, Dict


def recommend_diverse_naive(
    mq: marqo.Client,
    item_id: str,
    index_name: str,
    limit: int = 10,
    diversity: float = 0,  # 0.0 to 1.0, a term to control the diversification
    max_offset: int = 500,  # ensure you have at least 500 items in your index
) -> Dict[str, Any]:
    offset = int(
        max_offset * diversity
    )  # calculate the offset based on the diversity term
    results = mq.index(index_name).recommend(
        documents=[item_id],
        limit=limit,
        offset=offset,
    )
    return results
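As a rough usage sketch (the client URL, the index name "products", and the item ID "item_123" are placeholders for your own deployment and data), increasing diversity simply pushes the recall window further away from the source item:

mq = marqo.Client(url="http://localhost:8882")  # adjust the URL for your deployment

close_matches = recommend_diverse_naive(mq, "item_123", "products", limit=10, diversity=0.0)
more_diverse = recommend_diverse_naive(mq, "item_123", "products", limit=10, diversity=0.5)  # offset of 250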
A More Sophisticated Approach
To achieve more diversity not only with regard to the item you are recommending from but also in the recall set itself, we can extend the previous approach and sample points from across the recall set.
The aim is to get limit items from a set of up to max_limit items. The diversity term controls the diversification: a diversity of 0 returns the first limit items, while a diversity of 1 returns limit items evenly spaced across the first max_limit items. In other words, a diversity value of x returns limit items evenly spaced over roughly the first max_limit*x items.
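To make the arithmetic concrete, here is a small illustrative sketch (the values are just examples) that applies the actual_limit and step formulas from the function below:

limit, max_limit = 10, 500

for diversity in (0.0, 0.5, 1.0):
    actual_limit = int((max_limit - limit) * diversity) + limit
    step = actual_limit // limit
    positions = list(range(0, actual_limit, step))[:limit]
    print(diversity, actual_limit, step, positions[:3], "...")
# diversity=0.0 -> actual_limit=10,  step=1,  positions 0, 1, 2, ...
# diversity=0.5 -> actual_limit=255, step=25, positions 0, 25, 50, ...
# diversity=1.0 -> actual_limit=500, step=50, positions 0, 50, 100, ...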
def recommend_diverse_stepped(
    mq: marqo.Client,
    item_id: str,
    index_name: str,
    limit: int = 10,
    diversity: float = 0,  # 0.0 to 1.0, a term to control the diversification
    max_limit: int = 500,  # ensure you have at least 500 items in your index
) -> Dict[str, Any]:
    if limit > max_limit:
        raise ValueError(
            f"limit must be less than or equal to max_limit, got {limit} and {max_limit}"
        )
    # marqo has a max limit of 1000
    if max_limit > 1000:
        raise ValueError(
            f"max_limit must be less than or equal to 1000, got {max_limit}"
        )
    # we calculate an actual_limit and step based on the diversity term
    actual_limit = int((max_limit - limit) * diversity) + limit
    step = actual_limit // limit
    results = mq.index(index_name).recommend(
        documents=[item_id],
        limit=actual_limit,
    )
    # sample every step-th hit from the recall set
    sampled_results = []
    for i in range(0, len(results["hits"]), step):
        sampled_results.append(results["hits"][i])
    # update the results to only include the sampled results, capping at limit
    # in case the stride yields one extra item
    results["hits"] = sampled_results[:limit]
    return results
Using _score to Control Diversity with Random Sampling
The _score for each document in the search results can be used to control the diversity of the results. Documents with similar _score values are unlikely to be considered diverse, as they sit a similar distance from the query. We can treat the differences in scores as weights to inform sampling across the recall set. The diversity parameter acts as a temperature term that controls the importance of the rank of the results relative to the _score changes.
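The snippet below is purely illustrative (the scores are invented) and shows how the diversity term blends the rank-based weights with the score-difference weights before they are turned into sampling probabilities, mirroring the function that follows:

import numpy as np

scores = np.array([0.92, 0.91, 0.90, 0.70, 0.69, 0.50])  # made-up _score values
n = len(scores)

ranks = 1 - (np.arange(1, n + 1) / n) ** 2  # near 1 for top ranks, 0 for the last
score_diffs = np.abs(np.diff(np.concatenate(([0.0], scores))))
score_diffs = (score_diffs - score_diffs.min()) / (score_diffs.max() - score_diffs.min())
score_diffs[0] = score_diffs.max()  # sensible value for the first hit

for diversity in (0.0, 0.5, 1.0):
    weights = (1 - diversity) * ranks + diversity * score_diffs
    probs = weights / weights.sum()  # probabilities for np.random.choice
    print(diversity, np.round(probs, 2))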
import numpy as np


def recommend_diverse_sampled(
    mq: marqo.Client,
    item_id: str,
    index_name: str,
    limit: int = 10,
    diversity: float = 0,  # 0.0 to 1.0, a term to control the diversification
    marqo_limit: int = 500,  # the limit to fetch from Marqo
) -> Dict[str, Any]:
    if limit > marqo_limit:
        raise ValueError(
            f"limit must be less than or equal to marqo_limit, got {limit} and {marqo_limit}"
        )
    if marqo_limit > 1000:
        raise ValueError(
            f"marqo_limit must be less than or equal to 1000, got {marqo_limit}"
        )
    if diversity < 0 or diversity > 1:
        raise ValueError(f"diversity must be between 0 and 1, got {diversity}")
    results = mq.index(index_name).recommend(
        documents=[item_id],
        limit=marqo_limit,
    )
    ranks = (
        1 - (np.arange(1, len(results["hits"]) + 1) / len(results["hits"])) ** 2
    )  # 1 to 0 descending, quadratic decay with rank
    marqo_scores = [0] + [hit["_score"] for hit in results["hits"]]
    score_diffs = np.abs(
        np.diff(marqo_scores)
    )  # calculate the absolute score differences
    score_diffs = (score_diffs - np.min(score_diffs)) / (
        np.max(score_diffs) - np.min(score_diffs)
    )  # normalize the score diffs
    score_diffs[0] = np.max(score_diffs)  # provide a sensible value for the first score
    # calculate the weights using the diversity term as a temperature
    weights = ((1 - diversity) * ranks) + (diversity * score_diffs)
    weights = weights / np.sum(weights)  # normalize so the weights form a probability distribution
    # sample indices based on the weights
    sampled_indices = np.random.choice(
        len(results["hits"]), size=limit, replace=False, p=weights
    )
    # update the results to only include the sampled results
    sampled_results = [results["hits"][i] for i in sampled_indices]
    results["hits"] = sampled_results
    return results
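Since the selection is random, repeated calls return different mixes of items. A usage sketch (again with a placeholder client URL, index name, and item ID) that seeds NumPy for reproducibility:

np.random.seed(42)  # optional: fix the seed if you need reproducible sampling

mq = marqo.Client(url="http://localhost:8882")  # adjust the URL for your deployment
diverse_recs = recommend_diverse_sampled(
    mq, "item_123", "products", limit=10, diversity=0.7, marqo_limit=500
)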
Using Large Language Models (LLMs)
Large Language Models (LLMs) are very good at data augmentation tasks. Instead of only recommending items directly from the vector of a known item, we can use an LLM to generate a set of search terms and then use these terms to search for items. This can be a very effective way to diversify recommendations.
In this example we will use Google's Gemini model via the Generative AI API. In practice you could use an LLM of your choice (Gemini has a great cost/performance ratio).
Taking more care with how you represent your data to the LLM can also improve results. In this example we will use a JSON representation of the item to generate search terms. For multimodal indexes, using a multimodal LLM with the images can be very effective.
import marqo
import json
import os
import google.generativeai as genai
import numpy as np
from typing import Dict, Any, List

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", None)
genai.configure(api_key=GOOGLE_API_KEY)
LLM = genai.GenerativeModel("gemini-pro")


def validate_response(response: Dict[str, List[str]]) -> bool:
    # the LLM must return a JSON object with a "queries" key containing a list of strings
    return (
        "queries" in response
        and isinstance(response["queries"], list)
        and all(isinstance(term, str) for term in response["queries"])
    )


def create_prompt(item: Dict[str, Any]) -> str:
    # serialize the item as JSON and ask the LLM for complementary search terms
    item_string = json.dumps(item)
    prompt = f"""{item_string}\n\n Use the item above to generate a set of 5 to 10 search terms to find complementary items. Return json like {{"queries": [...]}} and nothing else."""
    return prompt


def get_document_vector(mq: marqo.Client, item_id: str, index_name: str) -> List[float]:
    # fetch the document with its tensor facets and average the facet embeddings
    item_with_facets = mq.index(index_name).get_document(
        document_id=item_id, expose_facets=True
    )
    vecs = []
    for facet in item_with_facets["_tensor_facets"]:
        vecs.append(facet["_embedding"])
    vec = np.mean(vecs, axis=0).tolist()
    return vec
def get_complementary_terms(item: Dict[str, Any]) -> List[str]:
    # retry until the LLM returns valid JSON with a "queries" list, up to max_attempts
    response_data = {}
    max_attempts = 5
    attempts = 0
    while not validate_response(response_data) and attempts < max_attempts:
        prompt = create_prompt(item)
        response = LLM.generate_content([prompt]).text
        attempts += 1
        try:
            response_data = json.loads(response)
        except json.JSONDecodeError:
            response_data = {}
    if not validate_response(response_data):
        raise ValueError(
            f"Failed to generate valid response after {max_attempts} attempts"
        )
    return response_data["queries"]
def recommend_diverse_llm_augmented(
    mq: marqo.Client,
    item_id: str,
    index_name: str,
    limit: int = 10,
    include_similar: bool = False,  # optionally include similar items in the recall set
) -> Dict[str, Any]:
    result_sets = []
    if include_similar:
        similar_results = mq.index(index_name).recommend(
            documents=[item_id],
            limit=limit,
        )
        result_sets.append(similar_results)
    item = mq.index(index_name).get_document(document_id=item_id)
    # pass the document itself; create_prompt handles the JSON serialization
    query_terms = get_complementary_terms(item)
    vec = get_document_vector(mq, item_id, index_name)
    for query in query_terms:
        result = mq.index(index_name).search(
            q={query: 1.0},
            limit=limit,
            context={
                "tensor": [{"vector": vec, "weight": 0.2}]
            },  # this weight will require tuning
        )
        result_sets.append(result)
    # merge the results and sort by score
    results = result_sets[0]
    for result_set in result_sets[1:]:
        results["hits"] += result_set["hits"]
    results["hits"] = sorted(results["hits"], key=lambda x: x["_score"], reverse=True)
    return results
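Finally, an end-to-end usage sketch (this assumes GOOGLE_API_KEY is set in your environment, and the index name and item ID are the same placeholders as above). Note that the merged hit list can contain more than limit items and may include duplicates across query terms, so you may wish to truncate or deduplicate it before display:

mq = marqo.Client(url="http://localhost:8882")  # adjust the URL for your deployment

results = recommend_diverse_llm_augmented(
    mq, "item_123", "products", limit=10, include_similar=True
)
for hit in results["hits"][:10]:
    print(hit["_id"], hit["_score"])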