import os
import psycopg2
from dotenv import load_dotenv, find_dotenv

_ = load_dotenv(find_dotenv(), override=True)
service_url = os.environ['TIMESCALE_SERVICE_URL']

PgVectorizer
Vectorize

Vectorize (service_url:str, table_name:str, schema_name:str='public',
           id_column_name:str='id', work_queue_table_name:str=None,
           trigger_name:str='track_changes_for_embedding',
           trigger_name_fn:str=None)

Initialize self. See help(type(self)) for accurate signature.
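
A vectorizer can be pointed at any table; the optional arguments override the schema, primary-key column, and generated trigger/queue names. A minimal sketch with hypothetical names:

# A minimal sketch, assuming a hypothetical `documents` table in the
# `content` schema keyed by `doc_id`; the remaining arguments keep the
# defaults shown in the signature above.
vectorizer = Vectorize(
    service_url,
    'documents',
    schema_name='content',
    id_column_name='doc_id',
)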
# Create the source `blog` table and seed it with one row.
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS blog (
                id SERIAL PRIMARY KEY NOT NULL,
                title TEXT NOT NULL,
                author TEXT NOT NULL,
                contents TEXT NOT NULL,
                category TEXT NOT NULL,
                published_time TIMESTAMPTZ NULL -- NULL if not yet published
            );
        ''')
        cursor.execute('''
            INSERT INTO blog (title, author, contents, category, published_time)
            VALUES ('first', 'mat', 'first_post', 'personal', '2021-01-01');
        ''')
vectorizer = Vectorize(service_url, 'blog')
vectorizer.register()
# register() should be idempotent, so a second call is a no-op.
vectorizer.register()

from langchain.docstore.document import Document
from langchain.text_splitter import CharacterTextSplitter
from timescale_vector import client
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores.timescalevector import TimescaleVector
from datetime import timedelta

def get_document(blog):
    # Split the blog contents into overlapping chunks.
    text_splitter = CharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    docs = []
    for chunk in text_splitter.split_text(blog['contents']):
        # Prepend author/title context so each chunk is self-describing.
        content = f"Author {blog['author']}, title: {blog['title']}, contents:{chunk}"
        metadata = {
            # A time-based UUID lets the vector store time-partition by publish date.
            "id": str(client.uuid_from_time(blog['published_time'])),
            "blog_id": blog['id'],
            "author": blog['author'],
            "category": blog['category'],
            "published_time": blog['published_time'].isoformat(),
        }
        docs.append(Document(page_content=content, metadata=metadata))
    return docs
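
For illustration, here is what get_document produces for a single row. The sample dict below is hypothetical but shaped like the rows the vectorizer hands to the callback; contents this short yield a single chunk:

from datetime import datetime, timezone

sample_blog = {
    'id': 1,
    'title': 'first',
    'author': 'mat',
    'contents': 'first_post',
    'category': 'personal',
    'published_time': datetime(2021, 1, 1, tzinfo=timezone.utc),
}
docs = get_document(sample_blog)
print(docs[0].page_content)          # Author mat, title: first, contents:first_post
print(docs[0].metadata['blog_id'])   # 1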
def embed_and_write(blog_instances, vectorizer):
    TABLE_NAME = vectorizer.table_name_unquoted + "_embedding"
    embedding = OpenAIEmbeddings()
    vector_store = TimescaleVector(
        collection_name=TABLE_NAME,
        service_url=service_url,
        embedding=embedding,
        time_partition_interval=timedelta(days=30),
    )

    # Delete old embeddings for all ids in the work queue.
    metadata_for_delete = [{"blog_id": blog['locked_id']} for blog in blog_instances]
    vector_store.delete_by_metadata(metadata_for_delete)

    documents = []
    for blog in blog_instances:
        # Skip blogs that are not yet published or were deleted
        # (those fields are None because of the LEFT JOIN against the source table).
        if blog['published_time'] is not None:
            documents.extend(get_document(blog))

    if len(documents) == 0:
        return

    texts = [d.page_content for d in documents]
    metadatas = [d.metadata for d in documents]
    ids = [d.metadata["id"] for d in documents]
    vector_store.add_texts(texts, metadatas, ids)
vectorizer = Vectorize(service_url, 'blog')
# process() returns the number of work-queue items it handled: the one
# pending insert on the first call, nothing on the second.
assert vectorizer.process(embed_and_write) == 1
assert vectorizer.process(embed_and_write) == 0
TABLE_NAME = "blog_embedding"
embedding = OpenAIEmbeddings()
vector_store = TimescaleVector(
collection_name=TABLE_NAME,
service_url=service_url,
embedding=embedding,
time_partition_interval=timedelta(days=30),
)
res = vector_store.similarity_search_with_score("first", 10)
assert len(res) == 1
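
Because each chunk carries the metadata built in get_document, a search can also be narrowed by metadata. A hedged sketch, assuming the langchain TimescaleVector store accepts the usual langchain filter dict:

# `filter` is assumed to follow the common langchain vector-store interface;
# here it restricts results to chunks whose metadata has author == 'mat'.
res = vector_store.similarity_search_with_score("first", 10, filter={"author": "mat"})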
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            INSERT INTO blog (title, author, contents, category, published_time)
            VALUES ('2', 'mat', 'second_post', 'personal', '2021-01-01');
            INSERT INTO blog (title, author, contents, category, published_time)
            VALUES ('3', 'mat', 'third_post', 'personal', '2021-01-01');
        ''')

# Two new rows means two work items to process.
assert vectorizer.process(embed_and_write) == 2
assert vectorizer.process(embed_and_write) == 0
res = vector_store.similarity_search_with_score("first", 10)
assert len(res) == 3
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            DELETE FROM blog WHERE title = '3';
        ''')

# The delete is tracked as one work item; processing it removes the
# deleted row's embeddings from the vector store.
assert vectorizer.process(embed_and_write) == 1
assert vectorizer.process(embed_and_write) == 0
res = vector_store.similarity_search_with_score("first", 10)
assert len(res) == 2
res = vector_store.similarity_search_with_score("second", 10)
assert len(res) == 2
content = res[0][0].page_content
assert "new version" not in content
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            UPDATE blog SET contents = 'second post new version' WHERE title = '2';
        ''')

# The update is tracked as one work item; processing it re-embeds the row.
assert vectorizer.process(embed_and_write) == 1
assert vectorizer.process(embed_and_write) == 0
res = vector_store.similarity_search_with_score("second", 10)
assert len(res) == 2
content = res[0][0].page_content
assert "new version" in content
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            CREATE SCHEMA IF NOT EXISTS test; -- make sure the target schema exists
            CREATE TABLE IF NOT EXISTS test.blog_table_name_that_is_really_really_long_and_i_mean_long (
                id SERIAL PRIMARY KEY NOT NULL,
                title TEXT NOT NULL,
                author TEXT NOT NULL,
                contents TEXT NOT NULL,
                category TEXT NOT NULL,
                published_time TIMESTAMPTZ NULL -- NULL if not yet published
            );
        ''')
        cursor.execute('''
            INSERT INTO test.blog_table_name_that_is_really_really_long_and_i_mean_long
                (title, author, contents, category, published_time)
            VALUES ('first', 'mat', 'first_post', 'personal', '2021-01-01');
        ''')

# Long table names and non-default schemas are handled as well.
vectorizer = Vectorize(service_url, 'blog_table_name_that_is_really_really_long_and_i_mean_long', schema_name='test')
assert vectorizer.process(embed_and_write) == 1
assert vectorizer.process(embed_and_write) == 0
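
In a real deployment you would not call process() by hand after each change. A minimal polling worker is one option; this sketch assumes only that process() can be called repeatedly and returns the number of items handled (the loop shape and sleep interval are not part of the library):

import time

def run_vectorizer_worker(vectorizer, poll_interval_s=60):
    # Drain the work queue, then back off when there is nothing to do.
    while True:
        if vectorizer.process(embed_and_write) == 0:
            time.sleep(poll_interval_s)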