
You've built your first RAG system using embeddings stored in a simple CSV file, and it worked great for your proof-of-concept. But now you're facing 100,000 documents, sub-second query requirements, and a production environment that demands reliability. Your trusty pandas DataFrame isn't going to cut it anymore.
The explosion of AI applications has created a new category of specialized databases designed specifically for vector data. Unlike traditional databases that excel at exact matches and range queries, vector databases optimize for similarity searches across high-dimensional spaces. They're the backbone of modern RAG systems, recommendation engines, and semantic search applications.
Choosing the right vector database can make or break your AI project. Pick wrong, and you'll face scaling bottlenecks, vendor lock-in, or infrastructure complexity that derails your timeline. Pick right, and you'll have a foundation that scales seamlessly from prototype to production.
What you'll learn:
- How to stand up Pinecone, Weaviate, and pgvector with working document-search code
- How each platform handles schemas, metadata filtering, and relationships
- How to benchmark query performance and compare features across the three
- Common pitfalls (metadata limits, over-vectorization, missing indexes) and how to avoid them
You should be comfortable with:
- Python and pip-installable libraries
- Embedding and similarity-search basics (you've built a simple RAG system before)
- Docker and docker-compose for running local services
- Basic SQL (for the pgvector section)
Vector databases solve a fundamentally different problem than traditional databases. While SQL databases excel at structured queries ("find all customers in California"), vector databases excel at similarity queries ("find documents most similar to this concept"). This requires specialized data structures, indexing algorithms, and query optimization techniques.
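To make the difference concrete, here's a minimal sketch of the naive approach: a brute-force cosine-similarity scan in NumPy. It works, but it touches every vector on every query, which is exactly the linear cost that index structures like HNSW are built to avoid:

import numpy as np

def brute_force_search(query_vec: np.ndarray, corpus: np.ndarray, top_k: int = 5):
    """O(n) similarity scan -- the full pass that ANN indexes let you skip"""
    # Normalize so dot products equal cosine similarities
    q = query_vec / np.linalg.norm(query_vec)
    c = corpus / np.linalg.norm(corpus, axis=1, keepdims=True)
    scores = c @ q
    top = np.argsort(-scores)[:top_k]
    return [(int(i), float(scores[i])) for i in top]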
The three platforms we'll examine represent different architectural philosophies:
- Pinecone: a fully managed, serverless vector database — zero infrastructure, pay for what you use
- Weaviate: an open-source, GraphQL-native database with rich schemas and relationships, self-hosted or managed
- pgvector: a PostgreSQL extension that adds vector search to the database you may already run
Let's dive deep into each, starting with hands-on implementations.
Pinecone takes the "serverless" approach to vector databases. You don't manage infrastructure, tune parameters, or worry about scaling—you just store vectors and query them. This simplicity comes with tradeoffs in flexibility and cost, but for many teams, it's exactly what they need.
First, let's build a realistic document search system using product documentation from a SaaS company:
import pinecone
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import List, Dict
import time

# Initialize Pinecone (get API key from pinecone.io)
# Note: this uses the classic pinecone-client (v2) API
pinecone.init(
    api_key="your-pinecone-api-key",
    environment="us-west1-gcp-free"  # Use your actual environment
)

# Create index with cosine similarity
index_name = "product-docs"
dimension = 384  # Matches the all-MiniLM-L6-v2 model's output size

if index_name not in pinecone.list_indexes():
    pinecone.create_index(
        name=index_name,
        dimension=dimension,
        metric="cosine",
        pods=1,
        replicas=1,
        pod_type="p1.x1"
    )

# Connect to index
index = pinecone.Index(index_name)

# Initialize embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')
Pinecone's strength lies in its indexing algorithms. It uses a combination of graph-based approaches (similar to HNSW) optimized for cloud deployment. Here's how to structure your data for optimal performance:
class DocumentProcessor:
    def __init__(self, model, chunk_size=500, overlap=50):
        self.model = model
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_document(self, text: str, doc_id: str) -> List[Dict]:
        """Split document into overlapping chunks for better retrieval"""
        words = text.split()
        chunks = []
        step = self.chunk_size - self.overlap
        for i in range(0, len(words), step):
            chunk_words = words[i:i + self.chunk_size]
            chunk_text = " ".join(chunk_words)
            chunk_id = f"{doc_id}_chunk_{i // step}"
            chunks.append({
                'id': chunk_id,
                'text': chunk_text,
                'doc_id': doc_id,
                'chunk_index': i // step,
                'word_count': len(chunk_words)
            })
        return chunks

    def process_documents(self, documents: List[Dict]) -> List[Dict]:
        """Process multiple documents into vector-ready chunks"""
        all_chunks = []
        for doc in documents:
            chunks = self.chunk_document(doc['content'], doc['id'])
            for chunk in chunks:
                # Add document metadata to each chunk
                chunk.update({
                    'title': doc['title'],
                    'category': doc['category'],
                    'last_modified': doc['last_modified']
                })
                all_chunks.append(chunk)
        return all_chunks
# Sample product documentation
sample_docs = [
    {
        'id': 'api-auth',
        'title': 'API Authentication Guide',
        'category': 'API',
        'last_modified': '2024-01-15',
        'content': """
        Our API uses Bearer token authentication. To authenticate, include
        your API key in the Authorization header. Generate API keys from
        your dashboard under Settings > API Keys. Each key has configurable
        permissions and expiration dates. For production use, rotate keys
        monthly and use environment variables to store them securely.

        Rate limiting applies at 1000 requests per minute per key. Exceeded
        limits return 429 status codes with retry-after headers. Use
        exponential backoff for retries.
        """
    },
    {
        'id': 'webhooks-setup',
        'title': 'Webhook Configuration',
        'category': 'Integration',
        'last_modified': '2024-01-10',
        'content': """
        Webhooks deliver real-time notifications when events occur in your
        account. Configure webhook endpoints in your dashboard under
        Settings > Webhooks. We support HTTPS endpoints only and require
        SSL certificate validation.

        Webhook payloads include event type, timestamp, and relevant data.
        Verify webhook authenticity using HMAC signatures in the
        X-Signature header. Failed deliveries retry with exponential backoff
        up to 24 hours.
        """
    }
]

# Process documents
processor = DocumentProcessor(model)
chunks = processor.process_documents(sample_docs)
print(f"Created {len(chunks)} chunks from {len(sample_docs)} documents")
Pinecone stores arbitrary metadata alongside each vector, which is the foundation for multi-tenant applications and complex filtering. Attach it at upsert time:
def upsert_chunks_to_pinecone(chunks: List[Dict], batch_size: int = 100):
    """Upload chunks to Pinecone with proper batching"""
    # Generate embeddings for all chunks
    texts = [chunk['text'] for chunk in chunks]
    embeddings = model.encode(texts, show_progress_bar=True)

    # Prepare vectors for upsert
    vectors = []
    for i, chunk in enumerate(chunks):
        vector_data = {
            'id': chunk['id'],
            'values': embeddings[i].tolist(),
            'metadata': {
                'text': chunk['text'][:1000],  # Pinecone has metadata limits
                'doc_id': chunk['doc_id'],
                'title': chunk['title'],
                'category': chunk['category'],
                'chunk_index': chunk['chunk_index'],
                'word_count': chunk['word_count'],
                'last_modified': chunk['last_modified']
            }
        }
        vectors.append(vector_data)

    # Batch upsert
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.upsert(vectors=batch)
        print(f"Uploaded batch {i//batch_size + 1}/{(len(vectors)-1)//batch_size + 1}")
        time.sleep(0.1)  # Rate limiting courtesy

    return len(vectors)

# Upload data
total_vectors = upsert_chunks_to_pinecone(chunks)
print(f"Successfully uploaded {total_vectors} vectors to Pinecone")

# Wait for index to be ready
time.sleep(10)
index_stats = index.describe_index_stats()
print(f"Index contains {index_stats['total_vector_count']} vectors")
Pinecone's metadata filtering is where it really shines. You can combine semantic similarity with precise filtering:
def search_documents(query: str, category_filter: str = None, top_k: int = 5):
    """Search with optional category filtering"""
    # Generate query embedding
    query_embedding = model.encode([query])[0].tolist()

    # Build filter
    filter_dict = {}
    if category_filter:
        filter_dict['category'] = {'$eq': category_filter}

    # Query Pinecone
    results = index.query(
        vector=query_embedding,
        top_k=top_k,
        include_metadata=True,
        filter=filter_dict if filter_dict else None
    )

    # Format results
    formatted_results = []
    for match in results['matches']:
        result = {
            'text': match['metadata']['text'],
            'title': match['metadata']['title'],
            'category': match['metadata']['category'],
            'score': match['score'],
            'doc_id': match['metadata']['doc_id']
        }
        formatted_results.append(result)

    return formatted_results

# Test searches
print("=== General Search ===")
results = search_documents("How do I authenticate with the API?")
for r in results:
    print(f"Score: {r['score']:.3f} | {r['title']}")
    print(f"Text: {r['text'][:150]}...")
    print()

print("=== Category Filtered Search ===")
results = search_documents("real-time notifications", category_filter="Integration")
for r in results:
    print(f"Score: {r['score']:.3f} | {r['title']} ({r['category']})")
    print(f"Text: {r['text'][:150]}...")
    print()
Production Tip: Pinecone's metadata filtering happens at query time, not during indexing. This means you can add complex filters without rebuilding your index, but it also means filters can impact query performance if you have millions of vectors.
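For instance, here's a sketch of a compound filter using the metadata fields stored above (word_count is numeric, so range operators apply); Pinecone treats multiple filter keys as an implicit AND:

# Sketch: combine an exact category match with a numeric range filter
results = index.query(
    vector=model.encode(["how to rotate API keys"])[0].tolist(),
    top_k=5,
    include_metadata=True,
    filter={
        "category": {"$eq": "API"},
        "word_count": {"$gte": 20}
    }
)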
Weaviate takes a fundamentally different approach. It's built around GraphQL, treats objects as first-class citizens with relationships, and provides more control over the underlying infrastructure. This flexibility comes with additional complexity, but it's powerful for applications that need rich data models.
We'll use Docker Compose to run Weaviate locally with the OpenAI vectorizer module:
# docker-compose.yml
version: '3.4'
services:
  weaviate:
    command:
      - --host
      - 0.0.0.0
      - --port
      - '8080'
      - --scheme
      - http
    image: semitechnologies/weaviate:1.22.4
    ports:
      - "8080:8080"
    restart: on-failure:0
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-transformers'
      ENABLE_MODULES: 'text2vec-transformers'
      TRANSFORMERS_INFERENCE_API: 'http://t2v-transformers:8080'
      CLUSTER_HOSTNAME: 'node1'
    volumes:
      - weaviate_data:/var/lib/weaviate
  t2v-transformers:
    image: semitechnologies/transformers-inference:sentence-transformers-all-MiniLM-L6-v2
    environment:
      ENABLE_CUDA: '0'
volumes:
  weaviate_data:
Start Weaviate with docker-compose up -d, then connect with Python:
import weaviate
from typing import List, Dict, Optional

# Connect to Weaviate (weaviate-client v3 API)
client = weaviate.Client("http://localhost:8080")

# Check if Weaviate is ready
print(f"Weaviate is ready: {client.is_ready()}")
print(f"Weaviate version: {client.get_meta()['version']}")
Weaviate's power lies in its schema system. Unlike Pinecone's key-value approach, Weaviate lets you define rich object types with properties and relationships:
def create_documentation_schema():
    """Create schema for technical documentation with relationships"""
    # Delete existing classes if present (chunks first, since they reference documents)
    existing = {cls['class'] for cls in client.schema.get().get('classes', [])}
    for class_name in ("DocumentChunk", "Document", "Category"):
        if class_name in existing:
            client.schema.delete_class(class_name)

    # Category class
    category_schema = {
        "class": "Category",
        "description": "Documentation category",
        "properties": [
            {
                "name": "name",
                "dataType": ["text"],
                "description": "Category name"
            },
            {
                "name": "description",
                "dataType": ["text"],
                "description": "Category description"
            }
        ]
    }

    # Document class
    document_schema = {
        "class": "Document",
        "description": "A technical documentation document",
        "vectorizer": "text2vec-transformers",
        "moduleConfig": {
            "text2vec-transformers": {
                "poolingStrategy": "masked_mean",
                "vectorizeClassName": True
            }
        },
        "properties": [
            {
                "name": "title",
                "dataType": ["text"],
                "description": "Document title",
                "moduleConfig": {
                    "text2vec-transformers": {
                        "skip": False,
                        "vectorizePropertyName": False
                    }
                }
            },
            {
                "name": "content",
                "dataType": ["text"],
                "description": "Full document content",
                "moduleConfig": {
                    "text2vec-transformers": {
                        "skip": False,
                        "vectorizePropertyName": False
                    }
                }
            },
            {
                "name": "lastModified",
                "dataType": ["date"],
                "description": "Last modification date"
            },
            {
                "name": "belongsToCategory",
                "dataType": ["Category"],
                "description": "Document category relationship"
            }
        ]
    }

    # DocumentChunk class for RAG
    chunk_schema = {
        "class": "DocumentChunk",
        "description": "A chunk of a document for RAG retrieval",
        "vectorizer": "text2vec-transformers",
        "moduleConfig": {
            "text2vec-transformers": {
                "poolingStrategy": "masked_mean"
            }
        },
        "properties": [
            {
                "name": "content",
                "dataType": ["text"],
                "description": "Chunk content"
            },
            {
                "name": "chunkIndex",
                "dataType": ["int"],
                "description": "Position within parent document"
            },
            {
                "name": "wordCount",
                "dataType": ["int"],
                "description": "Number of words in chunk"
            },
            {
                "name": "fromDocument",
                "dataType": ["Document"],
                "description": "Parent document relationship"
            }
        ]
    }

    # Create classes (referenced classes first)
    client.schema.create_class(category_schema)
    client.schema.create_class(document_schema)
    client.schema.create_class(chunk_schema)
    print("Schema created successfully")

create_documentation_schema()

# Verify schema
schema = client.schema.get()
print(f"Classes in schema: {[cls['class'] for cls in schema['classes']]}")
Weaviate's batch import system is designed for high-throughput data loading with automatic vectorization:
def import_documentation_data():
    """Import sample documentation with proper relationships"""
    # First, create categories
    categories_data = [
        {"name": "API", "description": "API documentation and guides"},
        {"name": "Integration", "description": "Third-party integration guides"},
        {"name": "Security", "description": "Security best practices"}
    ]

    category_uuids = {}
    client.batch.configure(batch_size=100, dynamic=True)
    with client.batch as batch:
        for cat_data in categories_data:
            uuid = batch.add_data_object(cat_data, "Category")
            category_uuids[cat_data["name"]] = uuid
    print(f"Created {len(category_uuids)} categories")

    # Create documents with category relationships
    documents_data = [
        {
            "title": "API Authentication Guide",
            "content": """Our API uses Bearer token authentication. To authenticate,
            include your API key in the Authorization header. Generate API keys
            from your dashboard under Settings > API Keys...""",
            "lastModified": "2024-01-15T10:00:00Z",
            "category": "API"
        },
        {
            "title": "Webhook Configuration",
            "content": """Webhooks deliver real-time notifications when events occur
            in your account. Configure webhook endpoints in your dashboard
            under Settings > Webhooks...""",
            "lastModified": "2024-01-10T15:30:00Z",
            "category": "Integration"
        }
    ]

    document_uuids = {}
    with client.batch as batch:
        for doc_data in documents_data:
            category_name = doc_data.pop("category")
            # Add reference to category
            doc_data["belongsToCategory"] = [{
                "beacon": f"weaviate://localhost/Category/{category_uuids[category_name]}"
            }]
            uuid = batch.add_data_object(doc_data, "Document")
            document_uuids[doc_data["title"]] = uuid
    print(f"Created {len(document_uuids)} documents")

    # Create document chunks with relationships
    chunks_data = []
    chunk_size = 200  # words

    for title, doc_uuid in document_uuids.items():
        # Fetch the document back so we can chunk its content
        doc = client.data_object.get_by_id(doc_uuid, class_name="Document")
        content = doc['properties']['content']
        words = content.split()

        for i in range(0, len(words), chunk_size):
            chunk_words = words[i:i + chunk_size]
            chunk_content = " ".join(chunk_words)
            chunk_data = {
                "content": chunk_content,
                "chunkIndex": i // chunk_size,
                "wordCount": len(chunk_words),
                "fromDocument": [{
                    "beacon": f"weaviate://localhost/Document/{doc_uuid}"
                }]
            }
            chunks_data.append(chunk_data)

    with client.batch as batch:
        for chunk_data in chunks_data:
            batch.add_data_object(chunk_data, "DocumentChunk")
    print(f"Created {len(chunks_data)} document chunks")

    return category_uuids, document_uuids

category_uuids, document_uuids = import_documentation_data()
Weaviate's GraphQL interface provides powerful querying capabilities that combine semantic search with relationship traversal:
def search_with_relationships(query: str, limit: int = 5):
    """Search chunks and pull in parent-document context via the fromDocument reference"""
    # Reference properties can be requested inline, so one query returns
    # both the chunk and its parent document
    result = client.query.get(
        "DocumentChunk",
        ["content", "chunkIndex", "wordCount",
         "fromDocument { ... on Document { title lastModified } }"]
    ) \
        .with_near_text({"concepts": [query]}) \
        .with_additional(["certainty", "id"]) \
        .with_where({
            "path": ["wordCount"],
            "operator": "GreaterThan",
            "valueInt": 10
        }) \
        .with_limit(limit) \
        .do()

    if 'errors' in result:
        print(f"Query error: {result['errors']}")
        return []

    chunks = result['data']['Get']['DocumentChunk']

    # Flatten each chunk plus its parent document into one result dict
    enriched_results = []
    for chunk in chunks:
        parent = (chunk.get('fromDocument') or [{}])[0]
        enriched_results.append({
            "content": chunk["content"],
            "chunk_index": chunk["chunkIndex"],
            "certainty": chunk['_additional']['certainty'],
            "word_count": chunk["wordCount"],
            "document_title": parent.get("title")
        })

    return enriched_results
# Alternative: Direct GraphQL query for more control
def advanced_graphql_search(query: str):
    """Use raw GraphQL for complex queries"""
    # Note: string interpolation is fine for a demo, but sanitize user input
    # (or use the query-builder API) in production to avoid query injection
    graphql_query = """
    {
      Get {
        DocumentChunk(
          nearText: {
            concepts: ["%s"]
          }
          limit: 5
        ) {
          content
          chunkIndex
          wordCount
          _additional {
            certainty
            id
          }
          fromDocument {
            ... on Document {
              title
              lastModified
              belongsToCategory {
                ... on Category {
                  name
                  description
                }
              }
            }
          }
        }
      }
    }
    """ % query

    result = client.query.raw(graphql_query)
    return result
# Test searches
print("=== Basic Semantic Search ===")
results = search_with_relationships("API authentication tokens")
for r in results:
    print(f"Certainty: {r['certainty']:.3f}")
    print(f"Content: {r['content'][:200]}...")
    print()

print("=== Advanced GraphQL Search ===")
advanced_results = advanced_graphql_search("webhook configuration")
chunks = advanced_results['data']['Get']['DocumentChunk']
for chunk in chunks:
    print(f"Certainty: {chunk['_additional']['certainty']:.3f}")
    if chunk['fromDocument']:
        doc = chunk['fromDocument'][0]
        print(f"From: {doc['title']}")
        if doc['belongsToCategory']:
            cat = doc['belongsToCategory'][0]
            print(f"Category: {cat['name']}")
    print(f"Content: {chunk['content'][:150]}...")
    print()
Weaviate Advantage: The relationship traversal capabilities make Weaviate excellent for knowledge graphs and complex data models where you need to understand connections between concepts, not just find similar content.
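As a sketch of how those relationships can evolve after import, the v3 client can attach a cross-reference between two existing objects; the chunk UUID below is a placeholder for an ID from your own data:

# Hypothetical UUIDs -- substitute real ones from your import step
chunk_uuid = "..."
doc_uuid = document_uuids["API Authentication Guide"]

# Link an existing chunk to a document via the fromDocument reference
client.data_object.reference.add(
    from_uuid=chunk_uuid,
    from_property_name="fromDocument",
    to_uuid=doc_uuid,
    from_class_name="DocumentChunk",
    to_class_name="Document"
)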
pgvector brings vector capabilities to PostgreSQL, the world's most popular open-source database. This approach leverages existing database expertise, tooling, and infrastructure while adding vector search capabilities.
Install pgvector on PostgreSQL (this example uses Docker):
# docker-compose-postgres.yml
version: '3.8'
services:
  postgres:
    image: pgvector/pgvector:pg15
    environment:
      POSTGRES_DB: vectordb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
volumes:
  pgdata:
import psycopg2
import numpy as np
from sentence_transformers import SentenceTransformer
from pgvector.psycopg2 import register_vector
from typing import List, Dict, Optional

# Database connection
conn_params = {
    'host': 'localhost',
    'database': 'vectordb',
    'user': 'postgres',
    'password': 'password',
    'port': 5432
}

conn = psycopg2.connect(**conn_params)
conn.autocommit = True
cursor = conn.cursor()

# Enable pgvector extension
cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")
print("pgvector extension enabled")

# Register the vector type so numpy arrays can be bound as query parameters
register_vector(conn)

# Initialize embedding model
model = SentenceTransformer('all-MiniLM-L6-v2')
embedding_dim = 384
pgvector's strength is leveraging SQL's full power for complex queries and joins:
def create_production_schema():
    """Create a production-ready schema with proper indexing"""
    # Drop existing tables
    tables = ['document_chunks', 'documents', 'categories']
    for table in tables:
        cursor.execute(f"DROP TABLE IF EXISTS {table} CASCADE;")

    # Categories table
    cursor.execute("""
        CREATE TABLE categories (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100) UNIQUE NOT NULL,
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """)

    # Documents table with full-text search
    cursor.execute("""
        CREATE TABLE documents (
            id SERIAL PRIMARY KEY,
            title VARCHAR(500) NOT NULL,
            content TEXT NOT NULL,
            category_id INTEGER REFERENCES categories(id),
            last_modified TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            content_vector vector(384),  -- Full document embedding
            word_count INTEGER,
            -- Full-text search
            content_tsvector tsvector
        );
    """)

    # Document chunks table optimized for RAG
    cursor.execute(f"""
        CREATE TABLE document_chunks (
            id SERIAL PRIMARY KEY,
            document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
            content TEXT NOT NULL,
            chunk_index INTEGER NOT NULL,
            word_count INTEGER,
            embedding vector({embedding_dim}),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            -- Composite unique constraint
            UNIQUE(document_id, chunk_index)
        );
    """)

    # Create indexes for performance
    indexes = [
        "CREATE INDEX ON documents USING gin(content_tsvector);",
        "CREATE INDEX ON documents(category_id);",
        "CREATE INDEX ON documents(last_modified);",
        "CREATE INDEX ON document_chunks(document_id);",
        "CREATE INDEX ON document_chunks(word_count);",
        # HNSW index for vector similarity (approximate)
        "CREATE INDEX ON document_chunks USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64);",
        # IVFFlat index alternative (can be better for smaller datasets)
        # "CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);"
    ]

    for index_sql in indexes:
        cursor.execute(index_sql)

    # Create trigger for automatic tsvector updates
    cursor.execute("""
        CREATE OR REPLACE FUNCTION update_content_tsvector()
        RETURNS trigger AS $$
        BEGIN
            NEW.content_tsvector := to_tsvector('english', NEW.content);
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql;

        CREATE TRIGGER update_documents_tsvector
            BEFORE INSERT OR UPDATE ON documents
            FOR EACH ROW EXECUTE FUNCTION update_content_tsvector();
    """)

    print("Production schema created with indexes")

create_production_schema()
pgvector ingests data efficiently in batches. executemany is fine at this scale; for large loads, PostgreSQL's COPY protocol is much faster (see the sketch after the verification step below):
def insert_sample_data():
    """Insert sample data with proper batch processing"""
    # Insert categories
    categories = [
        ('API', 'API documentation and reference'),
        ('Integration', 'Third-party integration guides'),
        ('Security', 'Security and authentication guides')
    ]

    cursor.executemany(
        "INSERT INTO categories (name, description) VALUES (%s, %s);",
        categories
    )

    # Get category IDs
    cursor.execute("SELECT id, name FROM categories;")
    category_map = {name: id for id, name in cursor.fetchall()}

    # Sample documents
    docs = [
        {
            'title': 'Complete API Authentication Guide',
            'content': '''
            Our REST API uses Bearer token authentication for secure access.
            To authenticate your requests, include your API key in the Authorization
            header using the Bearer scheme: Authorization: Bearer your-api-key-here.

            Generate API keys from your account dashboard under Settings > API Keys.
            Each key can be configured with specific permissions and expiration dates.

            For production environments, we strongly recommend:
            1. Rotating keys monthly
            2. Using environment variables to store keys securely
            3. Implementing proper error handling for 401/403 responses
            4. Setting up monitoring for failed authentication attempts

            Rate limiting is enforced at 1000 requests per minute per API key.
            When limits are exceeded, the API returns HTTP 429 status codes with
            Retry-After headers indicating when to retry the request.
            ''',
            'category': 'API',
            'last_modified': '2024-01-15 10:00:00'
        },
        {
            'title': 'Webhook Integration Setup',
            'content': '''
            Webhooks provide real-time notifications when events occur in your account.
            This guide covers complete webhook setup and best practices for reliable
            event processing.

            Configure webhook endpoints in your dashboard under Settings > Webhooks.
            All endpoints must use HTTPS with valid SSL certificates. We validate
            certificates and reject self-signed certificates in production.

            Webhook payload structure:
            - event_type: The type of event that occurred
            - timestamp: ISO 8601 timestamp of the event
            - data: Event-specific payload
            - signature: HMAC-SHA256 signature for verification

            Verify webhook authenticity by computing HMAC signatures using your
            webhook secret. The signature is included in the X-Signature header.

            Our delivery system implements exponential backoff for failed deliveries,
            retrying for up to 24 hours. Monitor webhook delivery status in your
            dashboard to identify and resolve delivery issues quickly.
            ''',
            'category': 'Integration',
            'last_modified': '2024-01-12 14:30:00'
        }
    ]

    # Process and insert documents
    for doc in docs:
        # Generate full document embedding
        doc_embedding = model.encode([doc['content']])[0]
        word_count = len(doc['content'].split())

        cursor.execute("""
            INSERT INTO documents (title, content, category_id, last_modified,
                                   content_vector, word_count)
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING id;
        """, (
            doc['title'],
            doc['content'],
            category_map[doc['category']],
            doc['last_modified'],
            doc_embedding,  # numpy array, adapted via register_vector
            word_count
        ))

        doc_id = cursor.fetchone()[0]

        # Create chunks
        chunks = create_chunks(doc['content'], chunk_size=300, overlap=50)
        chunk_data = []

        for i, chunk_text in enumerate(chunks):
            chunk_embedding = model.encode([chunk_text])[0]
            chunk_data.append((
                doc_id,
                chunk_text,
                i,
                len(chunk_text.split()),
                chunk_embedding
            ))

        # Batch insert chunks
        cursor.executemany("""
            INSERT INTO document_chunks (document_id, content, chunk_index,
                                         word_count, embedding)
            VALUES (%s, %s, %s, %s, %s);
        """, chunk_data)

        print(f"Inserted document '{doc['title']}' with {len(chunks)} chunks")

def create_chunks(text: str, chunk_size: int, overlap: int) -> List[str]:
    """Create overlapping text chunks"""
    words = text.split()
    chunks = []
    for i in range(0, len(words), chunk_size - overlap):
        chunk_words = words[i:i + chunk_size]
        chunks.append(" ".join(chunk_words))
    return chunks

insert_sample_data()

# Verify data
cursor.execute("""
    SELECT d.title, c.name as category, COUNT(dc.id) as chunk_count
    FROM documents d
    JOIN categories c ON d.category_id = c.id
    JOIN document_chunks dc ON d.id = dc.document_id
    GROUP BY d.id, d.title, c.name;
""")

for row in cursor.fetchall():
    print(f"Document: {row[0]} ({row[1]}) - {row[2]} chunks")
pgvector's real power emerges when combining vector similarity with SQL's full querying capabilities:
def hybrid_search(query: str, category_filter: Optional[str] = None,
                  limit: int = 5, similarity_threshold: float = 0.5):
    """Combine vector similarity with full-text search and filtering"""
    # Generate query embedding
    query_embedding = model.encode([query])[0]

    # Score in a subquery so the aliases can be reused in WHERE and ORDER BY
    # (SQL doesn't allow select-list aliases inside WHERE expressions)
    base_query = """
        SELECT * FROM (
            SELECT
                dc.content,
                dc.chunk_index,
                d.title,
                c.name AS category,
                d.last_modified,
                1 - (dc.embedding <=> %s) AS similarity,
                ts_rank_cd(d.content_tsvector,
                           plainto_tsquery('english', %s)) AS text_rank
            FROM document_chunks dc
            JOIN documents d ON dc.document_id = d.id
            JOIN categories c ON d.category_id = c.id
        ) scored
        WHERE similarity > %s
    """
    params = [query_embedding, query, similarity_threshold]

    if category_filter:
        base_query += " AND category = %s"
        params.append(category_filter)

    # Require a full-text match unless vector similarity is very high
    base_query += """
        AND (text_rank > 0 OR similarity > 0.7)
        ORDER BY
            (similarity * 0.7 + LEAST(text_rank, 1.0) * 0.3) DESC,
            similarity DESC
        LIMIT %s;
    """
    params.append(limit)

    cursor.execute(base_query, params)
    results = cursor.fetchall()

    # Format results
    formatted_results = []
    for row in results:
        formatted_results.append({
            'content': row[0],
            'chunk_index': row[1],
            'title': row[2],
            'category': row[3],
            'last_modified': row[4],
            'similarity': row[5],
            'text_rank': row[6],
            'combined_score': row[5] * 0.7 + min(row[6], 1.0) * 0.3
        })

    return formatted_results
def analytics_query():
    """Demonstrate SQL analytics capabilities"""
    cursor.execute("""
        SELECT
            c.name as category,
            COUNT(DISTINCT d.id) as document_count,
            COUNT(dc.id) as chunk_count,
            COALESCE(AVG(dc.word_count), 0) as avg_chunk_words,
            COALESCE(AVG(d.word_count), 0) as avg_doc_words
        FROM categories c
        LEFT JOIN documents d ON c.id = d.category_id
        LEFT JOIN document_chunks dc ON d.id = dc.document_id
        GROUP BY c.id, c.name
        ORDER BY document_count DESC;
    """)

    print("\n=== Content Analytics ===")
    for row in cursor.fetchall():
        print(f"Category: {row[0]}")
        print(f"  Documents: {row[1]}, Chunks: {row[2]}")
        print(f"  Avg words per chunk: {row[3]:.1f}")
        print(f"  Avg words per document: {row[4]:.1f}")
        print()

# Test searches
print("=== Hybrid Search Results ===")
results = hybrid_search("API key authentication best practices")
for r in results:
    print(f"Combined Score: {r['combined_score']:.3f} "
          f"(Similarity: {r['similarity']:.3f}, Text: {r['text_rank']:.3f})")
    print(f"Title: {r['title']} ({r['category']})")
    print(f"Content: {r['content'][:200]}...")
    print()

analytics_query()
# Performance testing
import time

def benchmark_search(query: str, iterations: int = 10):
    """Benchmark search performance"""
    query_embedding = model.encode([query])[0]

    start_time = time.time()
    for _ in range(iterations):
        cursor.execute("""
            SELECT content, 1 - (embedding <=> %s) as similarity
            FROM document_chunks
            WHERE 1 - (embedding <=> %s) > 0.5
            ORDER BY similarity DESC
            LIMIT 5;
        """, (query_embedding, query_embedding))
        cursor.fetchall()

    avg_time = (time.time() - start_time) / iterations
    print(f"Average query time: {avg_time*1000:.2f}ms")

benchmark_search("authentication")
pgvector Performance: The HNSW index provides excellent query performance for most applications. For datasets under 1M vectors, the performance is comparable to specialized vector databases, with the added benefit of SQL's full querying power.
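One knob worth knowing: hnsw.ef_search controls how many candidates the HNSW index considers per query (pgvector's default is 40). Raising it trades latency for recall, and you can set it per session:

# Trade a little latency for better recall, then re-run the benchmark
cursor.execute("SET hnsw.ef_search = 100;")
benchmark_search("authentication")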
Let's build a realistic comparison system that helps you evaluate which database works best for your use case:
import time
import statistics
class VectorDatabaseBenchmark:
    def __init__(self):
        self.results = {
            'pinecone': [],
            'weaviate': [],
            'pgvector': []
        }

    def benchmark_pinecone(self, queries: List[str]):
        """Benchmark Pinecone performance"""
        times = []
        for query in queries:
            start_time = time.time()
            # Generate embedding
            query_embedding = model.encode([query])[0].tolist()
            # Query Pinecone
            results = index.query(
                vector=query_embedding,
                top_k=5,
                include_metadata=True
            )
            query_time = time.time() - start_time
            times.append(query_time)

        return {
            'avg_time': statistics.mean(times),
            'median_time': statistics.median(times),
            'min_time': min(times),
            'max_time': max(times),
            'std_dev': statistics.stdev(times) if len(times) > 1 else 0
        }

    def benchmark_weaviate(self, queries: List[str]):
        """Benchmark Weaviate performance"""
        times = []
        for query in queries:
            start_time = time.time()
            # Query Weaviate (vectorization happens server-side)
            result = client.query.get("DocumentChunk", ["content"]) \
                .with_near_text({"concepts": [query]}) \
                .with_limit(5) \
                .do()
            query_time = time.time() - start_time
            times.append(query_time)

        return {
            'avg_time': statistics.mean(times),
            'median_time': statistics.median(times),
            'min_time': min(times),
            'max_time': max(times),
            'std_dev': statistics.stdev(times) if len(times) > 1 else 0
        }

    def benchmark_pgvector(self, queries: List[str]):
        """Benchmark pgvector performance"""
        times = []
        for query in queries:
            start_time = time.time()
            # Generate embedding and query
            query_embedding = model.encode([query])[0]
            cursor.execute("""
                SELECT content, 1 - (embedding <=> %s) as similarity
                FROM document_chunks
                WHERE 1 - (embedding <=> %s) > 0.5
                ORDER BY similarity DESC
                LIMIT 5;
            """, (query_embedding, query_embedding))
            cursor.fetchall()
            query_time = time.time() - start_time
            times.append(query_time)

        return {
            'avg_time': statistics.mean(times),
            'median_time': statistics.median(times),
            'min_time': min(times),
            'max_time': max(times),
            'std_dev': statistics.stdev(times) if len(times) > 1 else 0
        }
    def run_comprehensive_benchmark(self):
        """Run benchmarks across all databases"""
        test_queries = [
            "How do I authenticate with the API?",
            "webhook configuration and setup",
            "rate limiting and error handling",
            "security best practices authentication",
            "real-time notifications delivery",
            "API key management and rotation",
            "HTTPS SSL certificate requirements",
            "exponential backoff retry logic",
            "monitoring webhook delivery status",
            "HMAC signature verification process"
        ]

        print("Running comprehensive benchmark...")
        print(f"Testing with {len(test_queries)} queries\n")

        # Benchmark each database
        pinecone_results = self.benchmark_pinecone(test_queries)
        weaviate_results = self.benchmark_weaviate(test_queries)
        pgvector_results = self.benchmark_pgvector(test_queries)

        # Display results
        databases = ['Pinecone', 'Weaviate', 'pgvector']
        results = [pinecone_results, weaviate_results, pgvector_results]

        print("=== Performance Comparison ===")
        print(f"{'Database':<12} {'Avg (ms)':<10} {'Median (ms)':<12} {'Min (ms)':<10} {'Max (ms)':<10}")
        print("-" * 60)

        for db, result in zip(databases, results):
            print(f"{db:<12} {result['avg_time']*1000:<10.2f} "
                  f"{result['median_time']*1000:<12.2f} "
                  f"{result['min_time']*1000:<10.2f} "
                  f"{result['max_time']*1000:<10.2f}")

        return {
            'pinecone': pinecone_results,
            'weaviate': weaviate_results,
            'pgvector': pgvector_results
        }

# Run benchmark
benchmark = VectorDatabaseBenchmark()
benchmark_results = benchmark.run_comprehensive_benchmark()
def create_feature_comparison():
    """Create a comprehensive feature comparison"""
    features = {
        'Setup Complexity': {
            'Pinecone': '⭐⭐⭐⭐⭐ (Managed service)',
            'Weaviate': '⭐⭐⭐ (Docker/K8s setup required)',
            'pgvector': '⭐⭐⭐⭐ (Extension install only)'
        },
        'Query Performance': {
            'Pinecone': '⭐⭐⭐⭐⭐ (Optimized for scale)',
            'Weaviate': '⭐⭐⭐⭐ (Good with GraphQL overhead)',
            'pgvector': '⭐⭐⭐⭐ (Excellent with proper indexing)'
        },
        'Scalability': {
            'Pinecone': '⭐⭐⭐⭐⭐ (Auto-scaling)',
            'Weaviate': '⭐⭐⭐⭐ (Horizontal scaling)',
            'pgvector': '⭐⭐⭐ (Postgres scaling limits)'
        },
        'Metadata Filtering': {
            'Pinecone': '⭐⭐⭐⭐ (Good filtering)',
            'Weaviate': '⭐⭐⭐⭐⭐ (GraphQL + relationships)',
            'pgvector': '⭐⭐⭐⭐⭐ (Full SQL power)'
        },
        'Cost': {
            'Pinecone': '⭐⭐ (Can be expensive at scale)',
            'Weaviate': '⭐⭐⭐⭐ (Open source + hosting costs)',
            'pgvector': '⭐⭐⭐⭐⭐ (Postgres hosting only)'
        },
        'Ecosystem Integration': {
            'Pinecone': '⭐⭐⭐ (Vector-specific)',
            'Weaviate': '⭐⭐⭐⭐ (GraphQL ecosystem)',
            'pgvector': '⭐⭐⭐⭐⭐ (Entire Postgres ecosystem)'
        },
        'Learning Curve': {
            'Pinecone': '⭐⭐⭐⭐⭐ (Simple API)',
            'Weaviate': '⭐⭐⭐ (GraphQL + schema design)',
            'pgvector': '⭐⭐⭐⭐ (SQL knowledge required)'
        }
    }

    print("=== Feature Comparison Matrix ===\n")
    for feature, ratings in features.items():
        print(f"**{feature}:**")
        for db, rating in ratings.items():
            print(f"  {db}: {rating}")
        print()

create_feature_comparison()
Metadata Size Limits: Pinecone limits metadata to 40KB per vector. For document retrieval, this often means truncating text or storing only references:
# Bad: Storing full text in metadata
metadata = {
    'text': full_document_text,  # Could exceed 40KB
    'doc_id': 'doc123'
}

# Good: Store references and truncated text
metadata = {
    'text': full_document_text[:1000],  # Truncate for context
    'doc_id': 'doc123',
    'title': document_title,
    'chunk_index': chunk_num
}
Index Configuration: Wrong pod types or replica settings can dramatically impact cost and performance:
# Expensive: Over-provisioned for development
pinecone.create_index(
    name="dev-index",
    dimension=1536,
    pods=4,            # Overkill for development
    replicas=3,        # Unnecessary redundancy
    pod_type="p2.x4"   # Premium instance
)

# Cost-effective: Right-sized for workload
pinecone.create_index(
    name="prod-index",
    dimension=1536,
    pods=1,            # Start small, scale up
    replicas=1,
    pod_type="p1.x1"   # Basic instance
)
Over-Vectorization: Weaviate can vectorize every property by default, creating unnecessary embeddings:
# Bad: Everything gets vectorized
schema = {
    "class": "Document",
    "properties": [
        {"name": "title", "dataType": ["text"]},    # Will be vectorized
        {"name": "id", "dataType": ["text"]},       # Shouldn't be vectorized
        {"name": "date", "dataType": ["text"]},     # Shouldn't be vectorized
        {"name": "content", "dataType": ["text"]}   # Should be vectorized
    ]
}

# Good: Explicit vectorization control
schema = {
    "class": "Document",
    "properties": [
        {
            "name": "title",
            "dataType": ["text"],
            "moduleConfig": {"text2vec-transformers": {"skip": False}}
        },
        {
            "name": "id",
            "dataType": ["text"],
            "moduleConfig": {"text2vec-transformers": {"skip": True}}
        },
        {
            "name": "date",
            "dataType": ["date"],  # Use proper data type
            "moduleConfig": {"text2vec-transformers": {"skip": True}}
        }
    ]
}
Missing Indexes: The most common performance killer is inadequate indexing:
-- Bad: No vector index
CREATE TABLE embeddings (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding vector(384)
);

-- Good: Proper indexing strategy
CREATE TABLE embeddings (
    id SERIAL PRIMARY KEY,
    content TEXT,
    embedding vector(384),
    created_at TIMESTAMP DEFAULT NOW(),
    category_id INTEGER
);

-- Add appropriate indexes
CREATE INDEX ON embeddings USING hnsw (embedding vector_cosine_ops);
CREATE INDEX ON embeddings(category_id);
CREATE INDEX ON embeddings(created_at);
Index Parameter Tuning: HNSW indexes need proper configuration for your dataset size:
-- Small dataset (< 100K vectors)
CREATE INDEX ON embeddings USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Large dataset (> 1M vectors)
CREATE INDEX ON embeddings USING hnsw (embedding vector_cosine_ops)
WITH (m = 32, ef_construction = 128);
Embedding Consistency: Always verify embeddings are generated consistently:
def test_embedding_consistency():
    """Test that embeddings are reproducible"""
    test_text = "This is a test document for embedding consistency"

    # Generate multiple embeddings
    embeddings = []
    for i in range(5):
        emb = model.encode([test_text])[0]
        embeddings.append(emb)

    # Check consistency (should be identical)
    base_embedding = embeddings[0]
    for i, emb in enumerate(embeddings[1:], 1):
        cosine_sim = np.dot(base_embedding, emb) / (np.linalg.norm(base_embedding) * np.linalg.norm(emb))
        print(f"Embedding {i} similarity to base: {cosine_sim:.6f}")
        assert cosine_sim > 0.999, f"Embedding {i} not consistent!"

test_embedding_consistency()
Query Result Validation: Always validate that your queries return sensible results:
def validate_search_results(query: str, results: List[Dict], min_similarity: float = 0.3):
    """Validate search results make sense"""
    if not results:
        print(f"WARNING: No results for query '{query}'")
        return False

    # Check similarity scores
    for i, result in enumerate(results):
        similarity = result.get('similarity', result.get('score', 0))
        if similarity < min_similarity:
            print(f"WARNING: Result {i} has low similarity ({similarity:.3f}) for query '{query}'")

    # Check for duplicate results
    seen_content = set()
    for result in results:
        content_hash = hash(result.get('content', result.get('text', '')))
        if content_hash in seen_content:
            print(f"WARNING: Duplicate results found for query '{query}'")
        seen_content.add(content_hash)

    return True
Vector databases represent a fundamental shift in how we store and query data for AI applications. Each platform we've examined offers distinct advantages:
Choose Pinecone when:
- You want a fully managed, serverless service with zero infrastructure to run
- Time-to-production and auto-scaling matter more than cost at scale
- Your filtering needs are straightforward metadata predicates
Choose Weaviate when:
- Your data model has rich objects and relationships worth traversing
- You want open source with the choice to self-host or use managed hosting
- GraphQL fits your stack and module-based vectorization appeals to you
Choose pgvector when:
- You already run PostgreSQL and want vectors next to your relational data
- You need full SQL power: joins, transactions, and hybrid full-text + vector search
- Cost control matters and your scale fits a well-indexed Postgres instance
The vector database landscape continues evolving rapidly. Keep these principles in mind as you evaluate new options:
- Benchmark with your own data and query patterns, not vendor numbers
- Price out your projected scale, not your prototype
- Prefer portable data models and clear export paths to limit lock-in
- Re-run the evaluation periodically; today's tradeoffs won't hold forever
The foundation you build with vector databases will determine how quickly you can innovate with AI. Choose thoughtfully, measure continuously, and remain flexible as the ecosystem evolves.