feat(blocks): Pinecone blocks (#8535)

* update pinecone

* update blocks

* fix linting

* update test

* update requests

* mock funciton
pull/8593/head^2
Aarushi 2024-11-08 11:13:55 -06:00 committed by GitHub
parent c960bd870c
commit f719c7e70e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 109 additions and 12 deletions

View File

@ -1,4 +1,5 @@
from typing import Literal
import uuid
from typing import Any, Literal
from autogpt_libs.supabase_integration_credentials_store import APIKeyCredentials
from pinecone import Pinecone, ServerlessSpec
@ -98,10 +99,14 @@ class PineconeQueryBlock(Block):
include_metadata: bool = SchemaField(
description="Whether to include metadata in the response", default=True
)
host: str = SchemaField(description="Host for pinecone")
host: str = SchemaField(description="Host for pinecone", default="")
idx_name: str = SchemaField(description="Index name for pinecone")
class Output(BlockSchema):
results: dict = SchemaField(description="Query results from Pinecone")
results: Any = SchemaField(description="Query results from Pinecone")
combined_results: Any = SchemaField(
description="Combined results from Pinecone"
)
def __init__(self):
super().__init__(
@ -119,13 +124,105 @@ class PineconeQueryBlock(Block):
credentials: APIKeyCredentials,
**kwargs,
) -> BlockOutput:
pc = Pinecone(api_key=credentials.api_key.get_secret_value())
idx = pc.Index(host=input_data.host)
results = idx.query(
namespace=input_data.namespace,
vector=input_data.query_vector,
top_k=input_data.top_k,
include_values=input_data.include_values,
include_metadata=input_data.include_metadata,
try:
# Create a new client instance
pc = Pinecone(api_key=credentials.api_key.get_secret_value())
# Get the index
idx = pc.Index(input_data.idx_name)
# Ensure query_vector is in correct format
query_vector = input_data.query_vector
if isinstance(query_vector, list) and len(query_vector) > 0:
if isinstance(query_vector[0], list):
query_vector = query_vector[0]
results = idx.query(
namespace=input_data.namespace,
vector=query_vector,
top_k=input_data.top_k,
include_values=input_data.include_values,
include_metadata=input_data.include_metadata,
).to_dict()
combined_text = ""
if results["matches"]:
texts = [
match["metadata"]["text"]
for match in results["matches"]
if match.get("metadata", {}).get("text")
]
combined_text = "\n\n".join(texts)
# Return both the raw matches and combined text
yield "results", {
"matches": results["matches"],
"combined_text": combined_text,
}
yield "combined_results", combined_text
except Exception as e:
error_msg = f"Error querying Pinecone: {str(e)}"
raise RuntimeError(error_msg) from e
class PineconeInsertBlock(Block):
class Input(BlockSchema):
credentials: PineconeCredentialsInput = PineconeCredentialsField()
index: str = SchemaField(description="Initialized Pinecone index")
chunks: list = SchemaField(description="List of text chunks to ingest")
embeddings: list = SchemaField(
description="List of embeddings corresponding to the chunks"
)
yield "results", results
namespace: str = SchemaField(
description="Namespace to use in Pinecone", default=""
)
metadata: dict = SchemaField(
description="Additional metadata to store with each vector", default={}
)
class Output(BlockSchema):
upsert_response: str = SchemaField(
description="Response from Pinecone upsert operation"
)
def __init__(self):
super().__init__(
id="477f2168-cd91-475a-8146-9499a5982434",
description="Upload data to a Pinecone index",
categories={BlockCategory.LOGIC},
input_schema=PineconeInsertBlock.Input,
output_schema=PineconeInsertBlock.Output,
)
def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
**kwargs,
) -> BlockOutput:
try:
# Create a new client instance
pc = Pinecone(api_key=credentials.api_key.get_secret_value())
# Get the index
idx = pc.Index(input_data.index)
vectors = []
for chunk, embedding in zip(input_data.chunks, input_data.embeddings):
vector_metadata = input_data.metadata.copy()
vector_metadata["text"] = chunk
vectors.append(
{
"id": str(uuid.uuid4()),
"values": embedding,
"metadata": vector_metadata,
}
)
idx.upsert(vectors=vectors, namespace=input_data.namespace)
yield "upsert_response", "successfully upserted"
except Exception as e:
error_msg = f"Error uploading to Pinecone: {str(e)}"
raise RuntimeError(error_msg) from e