Source code for autofaiss.external.build

""" gather functions necessary to build an index """
import logging
from typing import Dict, Optional, Tuple, Union, Callable, Any

import faiss
import pandas as pd
from faiss import extract_index_ivf
from embedding_reader import EmbeddingReader

from autofaiss.external.metadata import IndexMetadata
from autofaiss.external.optimize import (
    check_if_index_needs_training,
    get_optimal_batch_size,
    get_optimal_index_keys_v2,
    get_optimal_train_size,
)
from autofaiss.indices.index_factory import index_factory
from autofaiss.utils.cast import (
    cast_bytes_to_memory_string,
    cast_memory_to_bytes,
    to_faiss_metric_type,
    to_readable_time,
)
from autofaiss.utils.decorators import Timeit
from autofaiss.indices.distributed import run

logger = logging.getLogger("autofaiss")


def estimate_memory_required_for_index_creation(
    nb_vectors: int,
    vec_dim: int,
    index_key: Optional[str] = None,
    max_index_memory_usage: Optional[str] = None,
    make_direct_map: bool = False,
    nb_indices_to_keep: int = 1,
) -> Tuple[int, str]:
    """
    Estimate the RAM necessary to create the index.

    Returns a tuple ``(estimated_bytes, index_key)`` where ``estimated_bytes``
    is the peak memory estimate in bytes and ``index_key`` is the faiss index
    key used for the estimation (either the one given, or the one selected
    from ``max_index_memory_usage``).

    Raises
    ------
    ValueError
        If neither ``index_key`` nor ``max_index_memory_usage`` is given.
    """
    # When no index key is provided, pick the best one for the memory budget;
    # without a budget there is nothing to estimate against.
    if index_key is None:
        if max_index_memory_usage is None:
            raise ValueError("you should give max_index_memory_usage value if no index_key is given")
        index_key = get_optimal_index_keys_v2(
            nb_vectors, vec_dim, max_index_memory_usage, make_direct_map=make_direct_map
        )[0]

    metadata = IndexMetadata(index_key, nb_vectors, vec_dim, make_direct_map)
    index_memory = metadata.estimated_index_size_in_bytes()
    # Headroom for the add phase: 10% of the index size, capped at 1GB.
    needed_for_adding = min(index_memory * 0.1, 10 ** 9)

    if check_if_index_needs_training(index_key):
        # Smallest number of vectors required to train the index given
        # the maximal memory constraint.
        nb_vectors_train = get_optimal_train_size(nb_vectors, index_key, "1K", vec_dim)
        memory_for_training = metadata.compute_memory_necessary_for_training(nb_vectors_train)
    else:
        memory_for_training = 0

    # With several shards kept, each index only holds a fraction of the vectors.
    index_memory_with_n_indices = index_memory / nb_indices_to_keep

    return int(max(index_memory_with_n_indices + needed_for_adding, memory_for_training)), index_key
def get_estimated_construction_time_infos(nb_vectors: int, vec_dim: int, indent: int = 0) -> str:
    """
    Give a general approximation of the construction time of the index.

    Returns a human-readable multi-line string (train / add / total), each line
    prefixed with ``indent`` tab characters.
    """
    # Raw embedding footprint, assuming float32 (4 bytes per dimension).
    byte_size = 4 * nb_vectors * vec_dim
    # Training time is roughly constant; it depends on the number of points
    # actually used for training, not on the full collection size.
    train_seconds = 1000
    # Linear approximation of the add phase (450s for 150GB in classic conditions).
    add_seconds = 450 * byte_size / (150 * 1024 ** 3)

    report_lines = [
        f"-> Train: {to_readable_time(train_seconds, rounding=True)}",
        f"-> Add: {to_readable_time(add_seconds, rounding=True)}",
        f"Total: {to_readable_time(train_seconds + add_seconds, rounding=True)}",
    ]
    tab = "\t" * indent
    return "\n".join(tab + line for line in report_lines)
def create_index(
    embedding_reader: EmbeddingReader,
    index_key: str,
    metric_type: Union[str, int],
    current_memory_available: str,
    embedding_ids_df_handler: Optional[Callable[[pd.DataFrame, int], Any]] = None,
    use_gpu: bool = False,
    make_direct_map: bool = False,
    distributed: Optional[str] = None,
    temporary_indices_folder: str = "hdfs://root/tmp/distributed_autofaiss_indices",
    nb_indices_to_keep: int = 1,
    index_optimizer: Callable = None,
) -> Tuple[Optional[faiss.Index], Dict[str, str]]:
    """
    Build a faiss index from the embeddings exposed by ``embedding_reader``.

    Instantiates the index described by ``index_key``, optionally trains it
    (on CPU or GPU), adds all vectors either locally or via a pyspark job,
    and returns the index together with the metric infos produced by
    ``index_optimizer``.

    Raises
    ------
    ValueError
        If ``distributed`` is set to anything other than None or "pyspark".
    """
    # Instanciate the index
    with Timeit(f"-> Instanciate the index {index_key}", indent=2):
        # Convert metric_type to faiss type
        metric_type = to_faiss_metric_type(metric_type)
        vec_dim = embedding_reader.dimension
        index = index_factory(vec_dim, index_key, metric_type)

    metadata = IndexMetadata(index_key, embedding_reader.count, vec_dim, make_direct_map)
    logger.info(
        f"The index size will be approximately {cast_bytes_to_memory_string(metadata.estimated_index_size_in_bytes())}"
    )

    index_needs_training = check_if_index_needs_training(index_key)
    if index_needs_training:
        # Extract training vectors
        with Timeit("-> Extract training vectors", indent=2):
            memory_available_for_training = cast_bytes_to_memory_string(
                cast_memory_to_bytes(current_memory_available)
            )
            # Number of vectors necessary to train the index under the memory budget.
            train_size = get_optimal_train_size(
                embedding_reader.count, index_key, memory_available_for_training, vec_dim
            )
            memory_needed_for_training = metadata.compute_memory_necessary_for_training(train_size)
            logger.info(
                f"Will use {train_size} vectors to train the index, "
                f"that will use {cast_bytes_to_memory_string(memory_needed_for_training)} of memory"
            )
            # Read the training slice from the start of the embedding collection.
            train_vectors, _ = next(embedding_reader(batch_size=train_size, start=0, end=train_size))

        # pylint: disable=no-member
        if use_gpu:
            # NOTE(review): if this attribute access fails, the GPU build of
            # faiss is presumably missing — confirm against the faiss install.
            assert (
                faiss.StandardGpuResources
            ), "FAISS was not compiled with GPU support, or loading _swigfaiss_gpu.so failed"
            gpu_resources = faiss.StandardGpuResources()
            device_no = 0
            # transfer to GPU (may be partial).
            index = faiss.index_cpu_to_gpu(gpu_resources, device_no, index)

        with Timeit(
            f"-> Training the index with {train_vectors.shape[0]} vectors of dim {train_vectors.shape[1]}", indent=2
        ):
            index.train(train_vectors)
        # Free the training buffer before the (memory-hungry) add phase.
        del train_vectors
    else:
        train_size = 0

    # Budget left for the add phase once one index shard is accounted for.
    size_per_index = metadata.estimated_index_size_in_bytes() / nb_indices_to_keep
    memory_available_for_adding = cast_bytes_to_memory_string(
        cast_memory_to_bytes(current_memory_available) - size_per_index
    )
    logger.info(
        f"The memory available for adding the vectors is {memory_available_for_adding}"
        "(total available - used by the index)"
    )
    logger.info("Will be using at most 1GB of ram for adding")

    # Add the vectors to the index.
    with Timeit("-> Adding the vectors to the index", indent=2):
        batch_size = get_optimal_batch_size(vec_dim, memory_available_for_adding)
        logger.info(
            f"Using a batch size of {batch_size} (memory overhead {cast_bytes_to_memory_string(batch_size*vec_dim*4)})"
        )

        if make_direct_map:
            # Retrieve the embedded index if we are in an IndexPreTransform state
            if isinstance(index, faiss.swigfaiss.IndexPreTransform):
                embedded_index = extract_index_ivf(index)
            else:
                embedded_index = index
            # Make direct map is only implemented for IndexIVF and IndexBinaryIVF,
            # see built file faiss/swigfaiss.py
            if isinstance(embedded_index, (faiss.swigfaiss.IndexIVF, faiss.swigfaiss.IndexBinaryIVF)):
                embedded_index.make_direct_map()

        if distributed is None:
            # Local add: stream batches from the reader into the index.
            for batch_id, (vec_batch, ids_batch) in enumerate(embedding_reader(batch_size=batch_size)):
                index.add(vec_batch)
                if embedding_ids_df_handler:
                    embedding_ids_df_handler(ids_batch, batch_id)
            metric_infos = index_optimizer(index, "")  # type: ignore
        elif distributed == "pyspark":
            # Distributed add: each worker builds a partial index, merged afterwards.
            index, metric_infos = run(
                faiss_index=index,
                embedding_reader=embedding_reader,
                memory_available_for_adding=memory_available_for_adding,
                embedding_ids_df_handler=embedding_ids_df_handler,
                temporary_indices_folder=temporary_indices_folder,
                nb_indices_to_keep=nb_indices_to_keep,
                index_optimizer=index_optimizer,
            )
        else:
            raise ValueError(f'Distributed by {distributed} is not supported, only "pyspark" is supported')

    # return the index.
    return index, metric_infos