Skip to content

Diversifying Recommendations

Often the task of recommendation is to find items that are similar but not too similar. Diversification of recommendation results aims to retain relevancy without showing items that are "too similar".

For this recipe we will apply modifications to the pattern provided in the similar item recommendations recipe using the recommend endpoint.

Naive Approach

A very simple approach is to offset the search to achieve a "no more similar than 'x'" effect. The nice properties of this approach are its simplicity, predictability, and speed. However, while this can be effective at making the recommendations diverse with regard to the item you are recommending from, the recall set itself may not be diverse.

def recommend_diverse_naive(
    mq: marqo.Client,
    item_id: str,
    index_name: str,
    limit: int = 10,
    diversity: float = 0,  # 0.0 to 1.0, a term to control the diversification
    max_offset: int = 500,  # ensure you have at least 500 items in your index
) -> Dict[str, Any]:
    """Recommend items for ``item_id`` while skipping the most similar results.

    ``diversity`` is scaled by ``max_offset`` to produce an offset into the
    ranked recommendations, so larger values skip more of the closest items.

    Args:
        mq: Marqo client used to query the index.
        item_id: ID of the document to recommend from.
        index_name: Name of the index to query.
        limit: Number of recommendations to request.
        diversity: Value in ``[0, 1]``; 0 applies no offset and 1 applies the
            full ``max_offset``.
        max_offset: Offset applied when ``diversity`` is 1.

    Returns:
        The response of the ``recommend`` call, unmodified.

    Raises:
        ValueError: If ``diversity`` is outside ``[0, 1]`` or ``max_offset``
            is negative.
    """
    # Validate up front: an out-of-range value would otherwise turn into a
    # negative (or unexpectedly large) offset.
    if diversity < 0 or diversity > 1:
        raise ValueError(f"diversity must be between 0 and 1, got {diversity}")

    if max_offset < 0:
        raise ValueError(f"max_offset must be non-negative, got {max_offset}")

    # int() truncates, so the offset only reaches max_offset at diversity == 1
    offset = int(max_offset * diversity)

    return mq.index(index_name).recommend(
        documents=[item_id],
        limit=limit,
        offset=offset,
    )

A more Sophisticated Approach

To achieve more diversity, not only with regard to the item you are recommending from but also within the recall set itself, we can extend the previous approach and sample points from across the recall set.

The aim is to get limit items from a set of up to max_limit items. The diversity term controls the diversification, with a diversity of 0 returning the first limit items and a diversity of 1 returning limit items evenly spaced across the first max_limit items. i.e. a diversity value of x would return limit items evenly spaced over the first max_limit*x items.

def recommend_diverse_stepped(
    mq: marqo.Client,
    item_id: str,
    index_name: str,
    limit: int = 10,
    diversity: float = 0,  # 0.0 to 1.0, a term to control the diversification
    max_limit: int = 500,  # ensure you have at least 500 items in your index
) -> Dict[str, Any]:
    """Recommend ``limit`` items sampled at a regular stride from a wider recall set.

    The number of items fetched grows linearly from ``limit`` (diversity 0) to
    ``max_limit`` (diversity 1); the fetched hits are then sampled at a fixed
    stride so the returned items are spread across that window.

    Args:
        mq: Marqo client used to query the index.
        item_id: ID of the document to recommend from.
        index_name: Name of the index to query.
        limit: Maximum number of hits to return.
        diversity: Value in ``[0, 1]`` controlling how wide the window is.
        max_limit: Largest number of hits to fetch (at most 1000).

    Returns:
        The ``recommend`` response with ``"hits"`` replaced by the sampled
        hits (at most ``limit`` of them).

    Raises:
        ValueError: If ``limit`` is below 1 or above ``max_limit``, if
            ``max_limit`` exceeds 1000, or if ``diversity`` is outside
            ``[0, 1]``.
    """
    # limit is used as a divisor below, so it has to be positive
    if limit < 1:
        raise ValueError(f"limit must be at least 1, got {limit}")

    if limit > max_limit:
        raise ValueError(
            f"limit must be less than or equal to max_limit, got {limit} and {max_limit}"
        )

    # marqo has a max limit of 1000
    if max_limit > 1000:
        raise ValueError(
            f"max_limit must be less than or equal to 1000, got {max_limit}"
        )

    # a negative value would produce a zero stride, and a value above 1 would
    # push the fetch size past max_limit
    if diversity < 0 or diversity > 1:
        raise ValueError(f"diversity must be between 0 and 1, got {diversity}")

    # widen the fetch window from `limit` up to `max_limit` based on the diversity term
    actual_limit = int((max_limit - limit) * diversity) + limit

    results = mq.index(index_name).recommend(
        documents=[item_id],
        limit=actual_limit,
    )

    hits = results["hits"]

    # Derive the stride from what actually came back so a small index still
    # yields up to `limit` items; it equals actual_limit // limit when the
    # full window was returned.
    step = max(1, len(hits) // limit)

    # The stride rarely divides the window exactly, so cap the sample at
    # `limit` instead of returning the extra trailing hits.
    results["hits"] = hits[::step][:limit]
    return results

Using _score to Control Diversity with Random Sampling

The _score for each document in the search results can be used to control the diversity of the results. We know that documents with similar _score values are unlikely to be considered diverse, as they are a similar distance from the query. We can treat the differences in scores as weights to inform sampling across the recall set. The diversity parameter acts as a temperature term to control the importance of the rank of the results compared to the _score changes.

def recommend_diverse_sampled(
    mq: marqo.Client,
    item_id: str,
    index_name: str,
    limit: int = 10,
    diversity: float = 0,  # 0.0 to 1.0, a term to control the diversification
    marqo_limit: int = 500,  # the limit to fetch from Marqo
) -> Dict[str, Any]:
    """Recommend items by weighted random sampling over a larger recall set.

    Each fetched hit gets a weight that blends its rank (favouring the top of
    the list) with the size of the ``_score`` gap to the hit before it
    (favouring hits that sit after a jump in score). ``diversity`` moves the
    blend from rank-only (0) to gap-only (1). Hits are then drawn without
    replacement according to those weights.

    Args:
        mq: Marqo client used to query the index.
        item_id: ID of the document to recommend from.
        index_name: Name of the index to query.
        limit: Maximum number of hits to return.
        diversity: Value in ``[0, 1]`` blending rank and score-gap weights.
        marqo_limit: Number of hits to fetch before sampling (at most 1000).

    Returns:
        The ``recommend`` response with ``"hits"`` replaced by the sampled
        hits, in the order they were drawn. Fewer than ``limit`` hits are
        returned when fewer were fetched.

    Raises:
        ValueError: If ``limit`` is below 1 or above ``marqo_limit``, if
            ``marqo_limit`` exceeds 1000, or if ``diversity`` is outside
            ``[0, 1]``.
    """
    if limit < 1:
        raise ValueError(f"limit must be at least 1, got {limit}")

    if limit > marqo_limit:
        raise ValueError(
            f"limit must be less than or equal to marqo_limit, got {limit} and {marqo_limit}"
        )

    if marqo_limit > 1000:
        raise ValueError(
            f"marqo_limit must be less than or equal to 1000, got {marqo_limit}"
        )

    if diversity < 0 or diversity > 1:
        raise ValueError(f"diversity must be between 0 and 1, got {diversity}")

    results = mq.index(index_name).recommend(
        documents=[item_id],
        limit=marqo_limit,
    )

    hits = results["hits"]
    n_hits = len(hits)
    if n_hits == 0:
        # nothing to sample from
        return results

    # cannot draw more unique items than were returned
    sample_size = min(limit, n_hits)

    # rank weights fall from just under 1 to exactly 0 (quadratic decay)
    ranks = 1 - (np.arange(1, n_hits + 1) / n_hits) ** 2

    # absolute score gaps between consecutive hits (n_hits - 1 values)
    scores = np.asarray([hit["_score"] for hit in hits], dtype=float)
    gaps = np.abs(np.diff(scores))

    if gaps.size and gaps.max() > gaps.min():
        # min-max normalise to [0, 1]
        gaps = (gaps - gaps.min()) / (gaps.max() - gaps.min())
    else:
        # all gaps equal (or a single hit): scores carry no signal, so weight
        # uniformly instead of dividing by zero
        gaps = np.ones_like(gaps)

    # The first hit has no predecessor, so it is given the maximum normalised
    # gap. It is added after normalising so that an artificial "gap from 0"
    # cannot dominate the scale and flatten every real gap towards zero.
    score_diffs = np.concatenate(([1.0], gaps))

    # calculate the weights using the diversity term as a temperature
    weights = ((1 - diversity) * ranks) + (diversity * score_diffs)

    # Sampling without replacement needs at least `sample_size` non-zero
    # probabilities; the lowest rank weight and smallest gap are exactly 0.
    if np.count_nonzero(weights) < sample_size:
        weights = weights + 1e-12

    # np.random.choice requires probabilities that sum to 1
    probabilities = weights / weights.sum()

    # sample indices based on the weights
    sampled_indices = np.random.choice(
        n_hits, size=sample_size, replace=False, p=probabilities
    )

    # update the results to only include the sampled results
    results["hits"] = [hits[i] for i in sampled_indices]
    return results

Using Large Language Models (LLMs)

Large Language Models (LLMs) are very good for data augmentation tasks. Instead of only recommending items directly from the vector of a known item we can use an LLM to generate a set of search terms and then use these terms to search for items. This can be a very effective way to diversify recommendations.

In this example we will use Google's Gemini model with the generative AI API. In practice you could use an LLM of your choice (Gemini has a great cost/performance ratio).

Taking more care with how you represent your data to the LLM can also improve results. In this example we will use a JSON representation of the item to generate search terms. For multimodal indexes, using a multimodal LLM with the images can be very effective.

import marqo
import json
import os
import google.generativeai as genai
import numpy as np
from typing import Dict, Any, List

# Read the Gemini API key from the environment; the value is None when the
# GOOGLE_API_KEY variable is not set.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", None)
# Configure the google.generativeai package with that key before the model is created.
genai.configure(api_key=GOOGLE_API_KEY)
# Module-level model handle; get_complementary_terms() calls it to generate search terms.
LLM = genai.GenerativeModel("gemini-pro")


def validate_response(response: Dict[str, List[str]]) -> bool:
    """Return True when ``response`` has the shape ``{"queries": [str, ...]}``.

    The argument is typically parsed LLM output, so it may be any JSON value.
    Anything that is not a dict (``None``, a number, a list, a string) is
    rejected rather than raising or being mis-validated.

    Args:
        response: The decoded JSON value to check.

    Returns:
        True if ``response`` is a dict whose ``"queries"`` entry is a list
        containing only strings, otherwise False.
    """
    # Guard first: `"queries" in response` raises TypeError on None/numbers
    # and does substring/element matching on strings and lists.
    if not isinstance(response, dict):
        return False

    queries = response.get("queries")
    return isinstance(queries, list) and all(
        isinstance(term, str) for term in queries
    )


def create_prompt(item: Dict[str, Any]) -> str:
    """Build the LLM prompt: the item serialised as JSON, a blank line, then the task.

    Args:
        item: The item to describe to the model; must be JSON-serialisable.

    Returns:
        The prompt text asking for 5 to 10 complementary search terms as JSON.
    """
    # Plain string literals (not an f-string), so the braces in the example
    # JSON need no escaping.
    instructions = (
        " Use the item above to generate a set of 5 to 10 search terms"
        " to find complementary items."
        ' Return json like {"queries": [...]} and nothing else.'
    )
    return "\n\n".join([json.dumps(item), instructions])


def get_document_vector(mq: marqo.Client, item_id: str, index_name: str) -> List[float]:
    """Return a single vector for a document by averaging its facet embeddings.

    The document is fetched with ``expose_facets=True`` and the
    ``"_embedding"`` of every entry in ``"_tensor_facets"`` is averaged
    element-wise.

    Args:
        mq: Marqo client used to query the index.
        item_id: ID of the document to fetch.
        index_name: Name of the index holding the document.

    Returns:
        The mean embedding as a plain list of floats.

    Raises:
        ValueError: If the document exposes no tensor facets, since there is
            nothing to average.
    """
    item_with_facets = mq.index(index_name).get_document(
        document_id=item_id, expose_facets=True
    )

    embeddings = [
        facet["_embedding"] for facet in item_with_facets.get("_tensor_facets", [])
    ]

    # np.mean over an empty list yields NaN (not a vector), which would only
    # fail later and less clearly when used as a search context.
    if not embeddings:
        raise ValueError(
            f"Document {item_id!r} in index {index_name!r} has no tensor facets to average"
        )

    return np.mean(embeddings, axis=0).tolist()


def get_complementary_terms(item: Dict[str, Any]) -> List[str]:
    """Ask the LLM for search terms that would find items complementary to ``item``.

    The model is prompted up to ``max_attempts`` times; the first reply that
    parses as JSON and passes ``validate_response`` is used.

    Args:
        item: The item to generate complementary search terms for.

    Returns:
        The list of query strings from the model's ``"queries"`` field.

    Raises:
        ValueError: If no valid response is produced within ``max_attempts``
            attempts.
    """
    max_attempts = 5

    # The prompt does not change between retries, so build it once.
    prompt = create_prompt(item)

    # A for-loop counts each attempt exactly once, so all max_attempts
    # attempts named in the error message are actually made.
    for _ in range(max_attempts):
        response = LLM.generate_content([prompt]).text
        try:
            response_data = json.loads(response)
        except json.JSONDecodeError:
            # not JSON at all; try again
            continue

        # JSON can decode to non-dict values (null, numbers, lists); treat
        # those as invalid replies too.
        if isinstance(response_data, dict) and validate_response(response_data):
            return response_data["queries"]

    raise ValueError(
        f"Failed to generate valid response after {max_attempts} attempts"
    )


def recommend_diverse_llm_augmented(
    mq: marqo.Client,
    item_id: str,
    index_name: str,
    limit: int = 10,
    include_similar: bool = False,  # optionally include similar items in the recall set
) -> Dict[str, Any]:
    """Recommend items by searching for LLM-generated complementary queries.

    The item is shown to the LLM to obtain a set of search terms. Each term is
    searched with the item's own vector mixed in as context, and the hits of
    all searches are merged into one list ordered by ``_score``.

    Args:
        mq: Marqo client used to query the index.
        item_id: ID of the document to recommend from.
        index_name: Name of the index to query.
        limit: Number of hits requested from each individual query. The merged
            result can therefore contain more than ``limit`` hits.
        include_similar: When True, the plain ``recommend`` results for the
            item are merged in as well.

    Returns:
        The response of the first query executed, with ``"hits"`` replaced by
        the merged, de-duplicated hits sorted by descending ``_score``. If no
        query was executed, ``{"hits": []}`` is returned.
    """
    index = mq.index(index_name)
    result_sets = []

    if include_similar:
        similar_results = index.recommend(
            documents=[item_id],
            limit=limit,
        )
        result_sets.append(similar_results)

    item = index.get_document(document_id=item_id)
    # Pass the document itself: create_prompt already serialises it, so
    # dumping it here as well would put a double-encoded string in the prompt.
    query_terms = get_complementary_terms(item)

    vec = get_document_vector(mq, item_id, index_name)

    for query in query_terms:
        result = index.search(
            q={query: 1.0},
            limit=limit,
            context={
                "tensor": [{"vector": vec, "weight": 0.2}]
            },  # this weight will require tuning
        )
        result_sets.append(result)

    # no generated terms and no similar items requested: nothing to merge
    if not result_sets:
        return {"hits": []}

    # merge the results and sort by score
    results = result_sets[0]
    ranked_hits = sorted(
        (hit for result_set in result_sets for hit in result_set["hits"]),
        key=lambda hit: hit["_score"],
        reverse=True,
    )

    # Several queries can return the same document, and a search can return
    # the source item itself. Keep only the best-scoring occurrence of each
    # document and never recommend the item to itself.
    seen_ids = {item_id}
    unique_hits = []
    for hit in ranked_hits:
        doc_id = hit.get("_id")
        if doc_id is not None:
            if doc_id in seen_ids:
                continue
            seen_ids.add(doc_id)
        unique_hits.append(hit)

    results["hits"] = unique_hits
    return results