import os

import psycopg2
from dotenv import find_dotenv, load_dotenv
from timescale_vector.pgvectorizer import Vectorize

_ = load_dotenv(find_dotenv(), override=True)
service_url = os.environ['TIMESCALE_SERVICE_URL']
PgVectorizer

Vectorize

Vectorize (service_url:str, table_name:str, schema_name:str='public',
           id_column_name:str='id', work_queue_table_name:str=None,
           trigger_name:str='track_changes_for_embedding',
           trigger_name_fn:str=None)

Keeps embeddings in sync with a source table: register() installs a trigger and a work-queue table that record inserts, updates, and deletes, and process(callback) hands batches of changed rows to the callback so it can (re)create their embeddings.
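Before the full walkthrough below, here is the lifecycle in miniature: construct, call register() once (it is idempotent), then call process() with a callback whenever queued changes should be embedded. A minimal sketch assuming only the behavior the tests below demonstrate; handle_batch is a hypothetical callback name:

# Illustrative lifecycle sketch; the real callback is defined further down.
vectorizer = Vectorize(service_url, 'blog')
vectorizer.register()  # idempotent: installs the trigger and work queue

def handle_batch(rows, vectorizer):
    # process() passes the batch of changed rows plus the vectorizer;
    # embed `rows` here and write them to your vector store.
    ...

processed = vectorizer.process(handle_batch)  # returns rows handled; 0 when the queue is empty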
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS blog (
                id              SERIAL PRIMARY KEY NOT NULL,
                title           TEXT NOT NULL,
                author          TEXT NOT NULL,
                contents        TEXT NOT NULL,
                category        TEXT NOT NULL,
                published_time  TIMESTAMPTZ NULL -- NULL if not yet published
            );
        ''')
        cursor.execute('''
            insert into blog (title, author, contents, category, published_time)
            VALUES ('first', 'mat', 'first_post', 'personal', '2021-01-01');
        ''')
vectorizer = Vectorize(service_url, 'blog')
vectorizer.register()
# register() should be idempotent
vectorizer.register()
from langchain.docstore.document import Document
from langchain.text_splitter import CharacterTextSplitter
from timescale_vector import client
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores.timescalevector import TimescaleVector
from datetime import timedelta
def get_document(blog):
    text_splitter = CharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    docs = []
    for chunk in text_splitter.split_text(blog['contents']):
        content = f"Author {blog['author']}, title: {blog['title']}, contents:{chunk}"
        metadata = {
            "id": str(client.uuid_from_time(blog['published_time'])),
            "blog_id": blog['id'],
            "author": blog['author'],
            "category": blog['category'],
            "published_time": blog['published_time'].isoformat(),
        }
        docs.append(Document(page_content=content, metadata=metadata))
    return docs
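For intuition, this is what get_document produces for one row. The row dict below is hypothetical, but it is shaped like the rows process() hands to the callback:

from datetime import datetime, timezone

row = {
    'id': 1,
    'title': 'first',
    'author': 'mat',
    'contents': 'first_post',
    'category': 'personal',
    'published_time': datetime(2021, 1, 1, tzinfo=timezone.utc),
}
docs = get_document(row)
# One Document per text chunk; metadata["id"] is a time-based UUID derived
# from published_time, which lets the vector store place it in the right
# time partition.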
def embed_and_write(blog_instances, vectorizer):
    TABLE_NAME = vectorizer.table_name_unquoted + "_embedding"
    embedding = OpenAIEmbeddings()
    vector_store = TimescaleVector(
        collection_name=TABLE_NAME,
        service_url=service_url,
        embedding=embedding,
        time_partition_interval=timedelta(days=30),
    )

    # delete old embeddings for all ids in the work queue
    metadata_for_delete = [{"blog_id": blog['locked_id']} for blog in blog_instances]
    vector_store.delete_by_metadata(metadata_for_delete)

    documents = []
    for blog in blog_instances:
        # skip blogs that are not published yet, or are deleted
        # (those rows come through as None because of the left join)
        if blog['published_time'] is not None:
            documents.extend(get_document(blog))

    if len(documents) == 0:
        return

    texts = [d.page_content for d in documents]
    metadatas = [d.metadata for d in documents]
    ids = [d.metadata["id"] for d in documents]
    vector_store.add_texts(texts, metadatas, ids)
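Two things worth noting about embed_and_write: it deletes old embeddings by blog_id before inserting new ones, which is what makes the update and delete tests below pass, and process() returns the number of rows handled, so a background worker can drain the queue and then sleep. A minimal polling sketch using only the calls shown above (run_worker and poll_seconds are illustrative names):

import time

def run_worker(poll_seconds=60):
    vectorizer = Vectorize(service_url, 'blog')
    vectorizer.register()
    while True:
        # Keep processing while work remains, then wait for new changes.
        while vectorizer.process(embed_and_write) > 0:
            pass
        time.sleep(poll_seconds)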
vectorizer = Vectorize(service_url, 'blog')
assert vectorizer.process(embed_and_write) == 1
assert vectorizer.process(embed_and_write) == 0
= "blog_embedding"
TABLE_NAME = OpenAIEmbeddings()
embedding = TimescaleVector(
vector_store =TABLE_NAME,
collection_name=service_url,
service_url=embedding,
embedding=timedelta(days=30),
time_partition_interval
)
= vector_store.similarity_search_with_score("first", 10)
res assert len(res) == 1
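Because the store is time-partitioned and each embedding id is a time-based UUID, searches can also be restricted to a time window. The start_date/end_date keyword arguments below come from the LangChain TimescaleVector integration; treat them as an assumption if your version differs:

from datetime import datetime, timezone

# Assumed kwargs from the LangChain TimescaleVector integration.
res_in_window = vector_store.similarity_search_with_score(
    "first",
    10,
    start_date=datetime(2020, 12, 1, tzinfo=timezone.utc),
    end_date=datetime(2021, 2, 1, tzinfo=timezone.utc),
)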
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            insert into blog (title, author, contents, category, published_time)
            VALUES ('2', 'mat', 'second_post', 'personal', '2021-01-01');
            insert into blog (title, author, contents, category, published_time)
            VALUES ('3', 'mat', 'third_post', 'personal', '2021-01-01');
        ''')

assert vectorizer.process(embed_and_write) == 2
assert vectorizer.process(embed_and_write) == 0

res = vector_store.similarity_search_with_score("first", 10)
assert len(res) == 3
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            DELETE FROM blog WHERE title = '3';
        ''')

assert vectorizer.process(embed_and_write) == 1
assert vectorizer.process(embed_and_write) == 0

res = vector_store.similarity_search_with_score("first", 10)
assert len(res) == 2

res = vector_store.similarity_search_with_score("second", 10)
assert len(res) == 2
content = res[0][0].page_content
assert "new version" not in content
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            update blog set contents = 'second post new version' WHERE title = '2';
        ''')

assert vectorizer.process(embed_and_write) == 1
assert vectorizer.process(embed_and_write) == 0

res = vector_store.similarity_search_with_score("second", 10)
assert len(res) == 2
content = res[0][0].page_content
assert "new version" in content
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('CREATE SCHEMA IF NOT EXISTS test;')  # needed for the table below
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS test.blog_table_name_that_is_really_really_long_and_i_mean_long (
                id              SERIAL PRIMARY KEY NOT NULL,
                title           TEXT NOT NULL,
                author          TEXT NOT NULL,
                contents        TEXT NOT NULL,
                category        TEXT NOT NULL,
                published_time  TIMESTAMPTZ NULL -- NULL if not yet published
            );
        ''')
        cursor.execute('''
            insert into test.blog_table_name_that_is_really_really_long_and_i_mean_long (title, author, contents, category, published_time)
            VALUES ('first', 'mat', 'first_post', 'personal', '2021-01-01');
        ''')
vectorizer = Vectorize(service_url, 'blog_table_name_that_is_really_really_long_and_i_mean_long', schema_name='test')
assert vectorizer.process(embed_and_write) == 1
assert vectorizer.process(embed_and_write) == 0
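This last test matters because PostgreSQL truncates identifiers at 63 bytes, so the names derived for the work-queue table and trigger must stay valid and distinct even for very long source tables. If you want explicit control instead, the constructor parameters in the signature above let you name the derived objects yourself; the values here are illustrative:

vectorizer = Vectorize(
    service_url,
    'blog_table_name_that_is_really_really_long_and_i_mean_long',
    schema_name='test',
    work_queue_table_name='blog_long_work_queue',  # illustrative name
    trigger_name='track_blog_long_changes',        # illustrative name
)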