From 12715a9f2a0eed625cf32a315e88ab5b4e8b8c39 Mon Sep 17 00:00:00 2001 From: iQuxLE Date: Wed, 14 Aug 2024 06:07:29 -0700 Subject: [PATCH 1/3] batch embedding function call, using tokenizer --- src/curate_gpt/store/duckdb_adapter.py | 154 ++++++++++++++++++++----- 1 file changed, 125 insertions(+), 29 deletions(-) diff --git a/src/curate_gpt/store/duckdb_adapter.py b/src/curate_gpt/store/duckdb_adapter.py index e8cd4f8..b3595f2 100644 --- a/src/curate_gpt/store/duckdb_adapter.py +++ b/src/curate_gpt/store/duckdb_adapter.py @@ -10,7 +10,7 @@ import time from dataclasses import dataclass, field from typing import Any, Callable, ClassVar, Dict, Iterable, Iterator, List, Mapping, Optional, Union - +from langchain_openai import OpenAIEmbeddings import duckdb import numpy as np import openai @@ -174,7 +174,37 @@ def create_index(self, collection: str): """ self.conn.execute(create_index_sql) - def _embedding_function(self, texts: Union[str, List[str]], model: str = None) -> list: + def _embedding_function_langchain(self, texts: Union[str, List[str], List[List[str]]], model: str = None) -> list: + """ + Get the embeddings for the given texts using the specified model + :param texts: A single text or a list of texts or list of list of texts to embed + :param model: Model to use for embedding, defaults to "text-embedding-ada-002" if none provided + :return: A list of embeddings or a list of list of embeddings depending on input format + """ + if model is None: + model = "text-embedding-ada-002" # Default model + + # Flatten the input if it's a list of lists + flatten = False + if any(isinstance(i, list) for i in texts): + original_structure = [len(sublist) for sublist in texts if isinstance(sublist, list)] + texts = [item for sublist in texts for item in sublist] # Flatten the list + flatten = True + + embeddings = embeddings.embed_documents(texts, model) + + # If the input was a list of lists, reconstruct the nested list structure + if flatten: + new_embeddings = [] + 
index = 0 + for size in original_structure: + new_embeddings.append(embeddings[index:index + size]) + index += size + embeddings = new_embeddings + + return embeddings + + def _embedding_function(self, texts: Union[str, List[str], List[List[str]]], model: str = None) -> list: """ Get the embeddings for the given texts using the specified model :param texts: A single text or a list of texts to embed @@ -320,33 +350,99 @@ def _process_objects( cumulative_len = 0 sql_command = self._generate_sql_command(collection, method) sql_command = sql_command.format(collection=collection) - for next_objs in chunk(objs, batch_size): - next_objs = list(next_objs) - logger.info("Processing batch of objects in DuckDB process_objects ...") - docs = [self._text(o, text_field) for o in next_objs] - docs_len = sum([len(d) for d in docs]) - cumulative_len += docs_len - if self._is_openai(collection) and cumulative_len > 3000000: - logger.warning(f"Cumulative length = {cumulative_len}, pausing ...") - time.sleep(60) - cumulative_len = 0 - metadatas = [self._dict(o) for o in next_objs] - ids = [self._id(o, id_field) for o in next_objs] - embeddings = self._embedding_function(docs, cm.model) - try: - self.conn.execute("BEGIN TRANSACTION;") - self.conn.executemany( - sql_command, list(zip(ids, metadatas, embeddings, docs, strict=False)) - ) - self.conn.execute("COMMIT;") - except Exception as e: - self.conn.execute("ROLLBACK;") - logger.error( - f"Transaction failed: {e}, default model: {self.default_model}, model used: {model}, len(embeddings): {len(embeddings[0])}" - ) - raise - finally: - self.create_index(collection) + if not self._is_openai(collection): + for next_objs in chunk(objs, batch_size): + next_objs = list(next_objs) + docs = [self._text(o, text_field) for o in next_objs] + docs_len = sum([len(d) for d in docs]) + metadatas = [self._dict(o) for o in next_objs] + ids = [self._id(o, id_field) for o in next_objs] + embeddings = self._embedding_function(docs, cm.model) + try: + 
self.conn.execute("BEGIN TRANSACTION;") + self.conn.executemany( + sql_command, list(zip(ids, metadatas, embeddings, docs, strict=False)) + ) + self.conn.execute("COMMIT;") + except Exception as e: + self.conn.execute("ROLLBACK;") + logger.error(f"Transaction failed: {e}, default model: {self.default_model}, model used: {model}, len(embeddings): {len(embeddings[0])}") + raise + finally: + self.create_index(collection) + else: + if model.startswith("openai:"): + openai_model = model.split(":", 1)[1] + if openai_model == "" or openai_model not in MODELS: + logger.info(f"The model {openai_model} is not " + f"one of {MODELS}. Defaulting to {MODELS[0]}") + openai_model = MODELS[0] #ada 002 + else: + logger.error(f"Something went wonky ## model: {model}") + from transformers import GPT2Tokenizer + tokenizer = GPT2Tokenizer.from_pretrained("gpt2") + for next_objs in chunk(objs, batch_size): # Existing chunking + next_objs = list(next_objs) + docs = [self._text(o, text_field) for o in next_objs] + docs_len = sum([len(d) for d in docs]) + metadatas = [self._dict(o) for o in next_objs] + ids = [self._id(o, id_field) for o in next_objs] + + tokenized_docs = [tokenizer.encode(doc) for doc in docs] + current_batch = [] + current_token_count = 0 + batch_embeddings = [] + + i = 0 + while i < len(tokenized_docs): + doc_tokens = tokenized_docs[i] + # peek + if current_token_count + len(doc_tokens) <= 8192: + current_batch.append(doc_tokens) + current_token_count += len(doc_tokens) + i += 1 + else: + if current_batch: + logger.info(f"Curent token count to embed: {current_token_count}") + texts = [tokenizer.decode(tokens) for tokens in current_batch] + embeddings = OpenAIEmbeddings(model=openai_model, tiktoken_model_name=model).embed_documents(texts, + openai_model) + logger.info(f"len embeddings: {len(embeddings)}") + batch_embeddings.extend(embeddings) + + if len(doc_tokens) > 8192: + logger.warning( + f"Document with ID {ids[i]} exceeds the token limit alone and will be skipped.") 
+ # try: + # embeddings = OpenAIEmbeddings(model=model, tiktoken_model_name=model).embed_query(texts, + # model) + # batch_embeddings.extend(embeddings) + # skipping + i += 1 + continue + else: + current_batch = [] + current_token_count = 0 + + if current_batch: + texts = [tokenizer.decode(tokens) for tokens in current_batch] + embeddings = OpenAIEmbeddings(model=openai_model, tiktoken_model_name=openai_model).embed_documents(texts, + openai_model) + batch_embeddings.extend(embeddings) + logger.info(f"Trying to insert: {len(ids)} IDS, {len(metadatas)} METADATAS, {len(batch_embeddings)} EMBEDDINGS") + try: + self.conn.execute("BEGIN TRANSACTION;") + self.conn.executemany( + sql_command, list(zip(ids, metadatas, batch_embeddings, docs, strict=False)) + ) + self.conn.execute("COMMIT;") + except Exception as e: + self.conn.execute("ROLLBACK;") + logger.error( + f"Transaction failed: {e}, default model: {self.default_model}, model used: {model}, len(embeddings): {len(embeddings[0])}") + raise + finally: + self.create_index(collection) def remove_collection(self, collection: str = None, exists_ok=False, **kwargs): """ From 5af5e0e7d56ff85158d9c0fb5982dbe5fc2e85fc Mon Sep 17 00:00:00 2001 From: Carlo Kroll Date: Tue, 20 Aug 2024 12:13:12 -0700 Subject: [PATCH 2/3] delete unused function --- src/curate_gpt/store/duckdb_adapter.py | 30 -------------------------- 1 file changed, 30 deletions(-) diff --git a/src/curate_gpt/store/duckdb_adapter.py b/src/curate_gpt/store/duckdb_adapter.py index b3595f2..4d1cb2e 100644 --- a/src/curate_gpt/store/duckdb_adapter.py +++ b/src/curate_gpt/store/duckdb_adapter.py @@ -174,36 +174,6 @@ def create_index(self, collection: str): """ self.conn.execute(create_index_sql) - def _embedding_function_langchain(self, texts: Union[str, List[str], List[List[str]]], model: str = None) -> list: - """ - Get the embeddings for the given texts using the specified model - :param texts: A single text or a list of texts or list of list of texts to embed - 
:param model: Model to use for embedding, defaults to "text-embedding-ada-002" if none provided - :return: A list of embeddings or a list of list of embeddings depending on input format - """ - if model is None: - model = "text-embedding-ada-002" # Default model - - # Flatten the input if it's a list of lists - flatten = False - if any(isinstance(i, list) for i in texts): - original_structure = [len(sublist) for sublist in texts if isinstance(sublist, list)] - texts = [item for sublist in texts for item in sublist] # Flatten the list - flatten = True - - embeddings = embeddings.embed_documents(texts, model) - - # If the input was a list of lists, reconstruct the nested list structure - if flatten: - new_embeddings = [] - index = 0 - for size in original_structure: - new_embeddings.append(embeddings[index:index + size]) - index += size - embeddings = new_embeddings - - return embeddings - def _embedding_function(self, texts: Union[str, List[str], List[List[str]]], model: str = None) -> list: """ Get the embeddings for the given texts using the specified model From 5a61ac6fa2d4b4ac166caa8256f10644bce57164 Mon Sep 17 00:00:00 2001 From: Carlo Kroll Date: Tue, 20 Aug 2024 15:12:09 -0700 Subject: [PATCH 3/3] use llm instead of langchain --- src/curate_gpt/store/duckdb_adapter.py | 33 +++++++++++++++----------- src/curate_gpt/store/vocab.py | 11 +++++++++ 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/curate_gpt/store/duckdb_adapter.py b/src/curate_gpt/store/duckdb_adapter.py index 4d1cb2e..5260616 100644 --- a/src/curate_gpt/store/duckdb_adapter.py +++ b/src/curate_gpt/store/duckdb_adapter.py @@ -10,7 +10,7 @@ import time from dataclasses import dataclass, field from typing import Any, Callable, ClassVar, Dict, Iterable, Iterator, List, Mapping, Optional, Union -from langchain_openai import OpenAIEmbeddings +import llm import duckdb import numpy as np import openai @@ -33,6 +33,8 @@ IDS, METADATAS, MODEL_DIMENSIONS, + MODEL_MAP, + DEFAULT_MODEL, 
MODELS, OBJECT, OPENAI_MODEL_DIMENSIONS, @@ -192,12 +194,12 @@ def _embedding_function(self, texts: Union[str, List[str], List[List[str]]], mod if model.startswith("openai:"): self._initialize_openai_client() openai_model = model.split(":", 1)[1] - if openai_model == "" or openai_model not in MODELS: + if openai_model == "" or openai_model not in MODEL_MAP.keys(): logger.info( f"The model {openai_model} is not " - f"one of {MODELS}. Defaulting to {MODELS[1]}" + f"one of {[MODEL_MAP.keys()]}. Defaulting to {DEFAULT_MODEL}" ) - openai_model = MODELS[1] + openai_model = DEFAULT_MODEL responses = [ self.openai_client.embeddings.create(input=text, model=openai_model) @@ -343,10 +345,10 @@ def _process_objects( else: if model.startswith("openai:"): openai_model = model.split(":", 1)[1] - if openai_model == "" or openai_model not in MODELS: + if openai_model == "" or openai_model not in MODEL_MAP.keys(): logger.info(f"The model {openai_model} is not " - f"one of {MODELS}. Defaulting to {MODELS[0]}") - openai_model = MODELS[0] #ada 002 + f"one of {MODEL_MAP.keys()}. 
Defaulting to {DEFAULT_MODEL}") + openai_model = DEFAULT_MODEL #ada 002 else: logger.error(f"Something went wonky ## model: {model}") from transformers import GPT2Tokenizer @@ -373,11 +375,12 @@ def _process_objects( i += 1 else: if current_batch: - logger.info(f"Curent token count to embed: {current_token_count}") + logger.info(f"Tokens: {current_token_count}") texts = [tokenizer.decode(tokens) for tokens in current_batch] - embeddings = OpenAIEmbeddings(model=openai_model, tiktoken_model_name=model).embed_documents(texts, - openai_model) - logger.info(f"len embeddings: {len(embeddings)}") + short_name, _ = MODEL_MAP[openai_model] + embedding_model = llm.get_embedding_model(short_name) + embeddings = list(embedding_model.embed_multi(texts)) + logger.info(f"Number of Documents in batch: {len(embeddings)}") batch_embeddings.extend(embeddings) if len(doc_tokens) > 8192: @@ -385,7 +388,7 @@ def _process_objects( f"Document with ID {ids[i]} exceeds the token limit alone and will be skipped.") # try: # embeddings = OpenAIEmbeddings(model=model, tiktoken_model_name=model).embed_query(texts, - # model) + # embeddings.average model) # batch_embeddings.extend(embeddings) # skipping i += 1 @@ -395,9 +398,11 @@ def _process_objects( current_token_count = 0 if current_batch: + logger.info(f"Last batch, token count: {current_token_count}") texts = [tokenizer.decode(tokens) for tokens in current_batch] - embeddings = OpenAIEmbeddings(model=openai_model, tiktoken_model_name=openai_model).embed_documents(texts, - openai_model) + short_name, _ = MODEL_MAP[openai_model] + embedding_model = llm.get_embedding_model(short_name) + embeddings = list(embedding_model.embed_multi(texts)) batch_embeddings.extend(embeddings) logger.info(f"Trying to insert: {len(ids)} IDS, {len(metadatas)} METADATAS, {len(batch_embeddings)} EMBEDDINGS") try: diff --git a/src/curate_gpt/store/vocab.py b/src/curate_gpt/store/vocab.py index 9e0d665..51a7554 100644 --- a/src/curate_gpt/store/vocab.py +++ 
b/src/curate_gpt/store/vocab.py @@ -26,3 +26,14 @@ "text-embedding-3-large": 3072, } MODELS = ["text-embedding-ada-002", "text-embedding-3-small", "text-embedding-3-large"] + +MODEL_MAP = { + "text-embedding-ada-002": ("ada-002", 1536), + "text-embedding-3-small": ("3-small", 1536), + "text-embedding-3-large": ("3-large", 3072), + "text-embedding-3-small-512": ("3-small-512", 512), + "text-embedding-3-large-256": ("3-large-256", 256), + "text-embedding-3-large-1024": ("3-large-1024", 1024) + } + +DEFAULT_MODEL = "text-embedding-ada-002" \ No newline at end of file