main_without_solution.py

# Welcome to the exciting world of Retrieval-Augmented Generation (RAG) systems!
# In this exercise, you'll build a powerful RAG system step by step.
# Get ready to dive into embeddings, vector databases, and AI-powered search!
import os
from dotenv import load_dotenv
from typing import List, Tuple
import sqlite3
import faiss
import numpy as np
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, Tool
from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.tools.render import format_tool_to_openai_function
from langchain.schema.runnable import RunnablePassthrough
from langchain.tools import tool
from langchain.text_splitter import TokenTextSplitter
from langchain.schema import Document
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer
from read_pdf import read_pdf
# Let's start by setting up our environment and initializing our models
load_dotenv()
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_KEY")
# Initialize SentenceTransformer and its underlying tokenizer
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
def create_sqlite_tables(db_path: str) -> None:
    """
    Create SQLite tables for storing document chunks and their embeddings.

    This function sets up the foundation of our RAG system's database. It creates
    two tables: 'chunks' for storing text chunks and their metadata, and 'embeddings'
    for storing the vector representations of these chunks.

    Args:
        db_path (str): The file path where the SQLite database will be created or accessed.

    Returns:
        None

    Fun fact: SQLite is so reliable it's used in airplanes and smartphones!
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS chunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            chunk_content TEXT,
            source_document TEXT,
            start_page INTEGER
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS embeddings (
            chunk_id INTEGER,
            embedding BLOB,
            FOREIGN KEY (chunk_id) REFERENCES chunks (id)
        )
    ''')
    conn.commit()
    conn.close()
def chunk_document(pages: List[Document], source: str) -> List[Tuple[str, str, int]]:
    """
    Chunk the document pages, handling chunks that cross page boundaries.

    This function is like a master chef slicing a long document into bite-sized pieces.
    It ensures that each chunk is just the right size for our model to digest, while
    keeping track of where each chunk came from.

    Args:
        pages (List[Document]): List of Document objects, each representing a page.
        source (str): The source document name.

    Returns:
        List[Tuple[str, str, int]]: List of (chunk_text, source, start_page).
    """
    # initialization
    text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=200)
    result = []
    previous_last_chunk = ""
    current_page = 1
    chunk_start_page = 1
    for page in pages:
        # TODO: concatenate the current page content with the last chunk of the previous page
        # TODO: chunk this concatenation
        # Hint: use the text_splitter.split_text() method
        # TODO: add all the chunks but the last one to the result
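        # A minimal sketch of one possible loop body (not the official solution). It assumes
        # each page is a LangChain Document exposing its text via page.page_content, and it
        # only approximates the start page of chunks that span a page boundary.
        combined = previous_last_chunk + page.page_content
        page_chunks = text_splitter.split_text(combined)
        for finished_chunk in page_chunks[:-1]:
            result.append((finished_chunk, source, chunk_start_page))
            # Chunks emitted after the first one (approximately) start on this page.
            chunk_start_page = current_page
        # Carry the last (possibly incomplete) chunk over to the next page.
        previous_last_chunk = page_chunks[-1] if page_chunks else ""
        current_page += 1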
    # add the last chunk of the last page to the result
    if previous_last_chunk:
        result.append((previous_last_chunk, source, chunk_start_page))
    return result
def embed_chunks(chunks: List[str], local: bool = True) -> np.ndarray:
    """
    Embed a list of text chunks using either a local SentenceTransformer model or OpenAI's embedding model.

    This function is like a translator, converting our text chunks into a language
    that our AI models can understand - the language of vectors!

    Args:
        chunks (List[str]): The list of text chunks to be embedded.
        local (bool): If True, use the local SentenceTransformer model. If False, use OpenAI's model.

    Returns:
        np.ndarray: The embedding vectors for the chunks.

    Exercise: Try implementing the OpenAI embedding method. How does it compare to the local model?
    """
    if local:
        # TODO: Implement the local SentenceTransformer embedding method here
        # Hint: You'll need to use the model.encode() method
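        # A minimal sketch of one possible implementation (not the official solution):
        # encode with the module-level SentenceTransformer model and return float32
        # vectors, which is what FAISS expects later on.
        return model.encode(chunks, convert_to_numpy=True).astype(np.float32)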
    else:
        # TODO: Implement OpenAI embedding method here
        # Hint: You'll need to use the openai.Embedding.create() method
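        # A sketch of one possible approach. Note that openai.Embedding.create() comes from
        # the pre-1.0 openai client; since langchain_openai is already a dependency here,
        # its OpenAIEmbeddings wrapper is used instead (the model name is just an example).
        from langchain_openai import OpenAIEmbeddings
        embedder = OpenAIEmbeddings(model="text-embedding-3-small")
        return np.array(embedder.embed_documents(chunks), dtype=np.float32)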
def process_and_store_chunks(chunks: List[Tuple[str, str, int]], db_path: str, local: bool = True) -> None:
    """
    Process the input chunks, embed them, and store them in the database.

    This function is like a librarian, taking our chunks of text, creating a special
    index for each (the embedding), and carefully storing both in our database.

    Args:
        chunks (List[Tuple[str, str, int]]): List of (chunk_text, source_document, start_page) tuples.
        db_path (str): Path to the SQLite database file.
        local (bool): Whether to use the local embedding model or OpenAI's.

    Returns:
        None

    Challenge: Can you modify this function to batch process chunks for better efficiency?
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    for chunk_text, source_document, start_page in chunks:
        # TODO: define the SQL query to insert the chunk into the database
        # Hint: the cursor usage is of the form cursor.execute("INSERT INTO table VALUES (?, ?, ?)", (var1, var2, var3))
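        # Example (one possible query, matching the schema created in create_sqlite_tables):
        #   query = "INSERT INTO chunks (chunk_content, source_document, start_page) VALUES (?, ?, ?)"
        #   cursor.execute(query, (chunk_text, source_document, start_page))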
query = ""
cursor.execute(
query,
()
)
chunk_id = cursor.lastrowid
# TODO: Embed the chunk using the embed_chunks function
# TODO: Store the embedding in the database
# Hint: You'll need to convert the embedding to bytes using the tobytes() method
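        # Example (a sketch; it assumes embed_chunks returns an np.ndarray of float32 vectors):
        #   embedding = embed_chunks([chunk_text], local)[0]
        #   query = "INSERT INTO embeddings (chunk_id, embedding) VALUES (?, ?)"
        #   cursor.execute(query, (chunk_id, embedding.astype(np.float32).tobytes()))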
query = ""
cursor.execute(
query,
()
)
conn.commit()
conn.close()
def create_faiss_index(db_path: str) -> faiss.Index:
    """
    Create a FAISS index from the stored embeddings in the database.

    This function is like building a high-tech library catalog. It takes all our
    stored embeddings and organizes them in a way that allows for super-fast searching!

    Args:
        db_path (str): Path to the SQLite database file.

    Returns:
        faiss.Index: The created FAISS index.

    Fun fact: FAISS can handle billions of vectors, making it perfect for large-scale search systems!
    """
    # TODO: Implement the function to create and return a FAISS index
    # Hint: You'll need to retrieve embeddings from the database and use faiss.IndexFlatL2
    # Steps:
    #   - create conn and cursor
    #   - retrieve the embeddings from the database
    #     Hint: to read an embedding stored with the tobytes() method, you can use np.frombuffer(<database result>, dtype=np.float32)
    #   - close the connection
    #   - create the index
    dimension = '?'  # TODO: get the dimension of the embeddings
    index = '?'  # TODO: create the index using faiss.IndexFlatL2
    #   - add the embeddings to the index
    #     Hint: use the add() method of the index
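    # A minimal sketch of one possible implementation (not the official solution); it
    # assumes the embeddings table was filled by process_and_store_chunks above:
    #
    #   conn = sqlite3.connect(db_path)
    #   cursor = conn.cursor()
    #   cursor.execute("SELECT embedding FROM embeddings ORDER BY chunk_id")
    #   rows = cursor.fetchall()
    #   conn.close()
    #   embeddings = np.vstack([np.frombuffer(row[0], dtype=np.float32) for row in rows])
    #   dimension = embeddings.shape[1]
    #   index = faiss.IndexFlatL2(dimension)
    #   index.add(embeddings)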
    return index
def process_pdf(file_path, db_path, local=True):
    # create a connection to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # TODO: check if document already exists in the database
    # close the connection
    conn.close()
    # read the pdf file
    pages = '?'
    source = '?'
    chunks = '?'
    # process and store the chunks
    # Hint: use the process_and_store_chunks function
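    # A sketch of one possible implementation (not the official solution). It assumes
    # read_pdf(file_path) returns the list of pages as LangChain Document objects; adapt
    # it to the actual signature in read_pdf.py.
    #
    #   pages = read_pdf(file_path)
    #   source = os.path.basename(file_path)
    #   chunks = chunk_document(pages, source)
    #   process_and_store_chunks(chunks, db_path, local)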
def search_engine(query: str, faiss_index: faiss.Index, db_path: str, k: int = 5) -> List[Tuple[str, float, str, int]]:
    """
    Search for relevant chunks using the query and FAISS index.

    This function is the heart of our RAG system. It takes a question, finds the most
    relevant information in our database, and returns it. It's like having a super-smart
    research assistant at your fingertips!

    Args:
        query (str): The search query.
        faiss_index (faiss.Index): The FAISS index for similarity search.
        db_path (str): Path to the SQLite database file.
        k (int): Number of top results to return.

    Returns:
        List[Tuple[str, float, str, int]]: List of (chunk_content, similarity_score, source_document, start_page).

    Exercise: FAISS's IndexFlatL2 returns distances, not similarities. Can you convert them into actual similarity scores?
    """
    # TODO: Implement the search functionality
    # Hint: You'll need to embed the query, use faiss_index.search(), and fetch the corresponding chunks from the database
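    # A minimal sketch of one possible implementation (not the official solution). It
    # assumes the index was built with the local embedding model and that FAISS position i
    # corresponds to SQLite chunk id i + 1 (chunks are inserted in id order). The raw FAISS
    # distance is returned as the score (smaller means more similar).
    query_embedding = embed_chunks([query], local=True).astype(np.float32).reshape(1, -1)
    distances, indices = faiss_index.search(query_embedding, k)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    results = []
    for distance, position in zip(distances[0], indices[0]):
        if position == -1:
            continue  # FAISS pads with -1 when fewer than k vectors are available
        cursor.execute(
            "SELECT chunk_content, source_document, start_page FROM chunks WHERE id = ?",
            (int(position) + 1,),
        )
        row = cursor.fetchone()
        if row:
            results.append((row[0], float(distance), row[1], row[2]))
    conn.close()
    return results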
# In the following code, you will implement the agent that uses the search engine to answer questions using LangChain.
# Some examples and help can be found here: https://python.langchain.com/docs/how_to/agent_executor/
@tool
def search_tool(query: str) -> str:
    """
    Search for relevant information using the query.
    """
    # TODO: Implement this function. You have to find a way to let the LLM know which chunk
    # comes from where, so that we can add the sources at the end.
    # Use your search_engine function and format the results
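    # A sketch of one possible implementation (not the official solution). It relies on the
    # module-level faiss_index and db_path created in the __main__ block below, and tags
    # every chunk with its database id as [[id]] so the model can cite its sources.
    results = search_engine(query, faiss_index, db_path, k=5)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    formatted = []
    for chunk_content, _score, source_document, start_page in results:
        # Recover the chunk id by matching on content; returning ids directly from
        # search_engine would be a cleaner design.
        cursor.execute("SELECT id FROM chunks WHERE chunk_content = ?", (chunk_content,))
        row = cursor.fetchone()
        chunk_id = row[0] if row else "unknown"
        formatted.append(f"[[{chunk_id}]] ({source_document}, p. {start_page}):\n{chunk_content}")
    conn.close()
    return "\n\n".join(formatted)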
tools = [] # TODO: Create the search tools list using the search_tool function and the Tool class from langchain
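# Example: the @tool decorator above already turns search_tool into a LangChain tool, so a
# minimal list is simply
#   tools = [search_tool]
# (wrapping it with the Tool class is an alternative if you want an explicit name and description).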
# TODO Use ChatOpenAI from LangChain. Choose an appropriate model and temperature.
llm = '<your code here>'
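# Example (a sketch; use any chat model you have access to):
#   llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)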
# TODO Create the prompt template in the file system_prompt.txt
# Get the directory of the current script
current_dir = os.path.dirname(os.path.abspath(__file__))
# Construct the full path to the system_prompt.txt file
system_prompt_path = os.path.join(current_dir, 'system_prompt.txt')
# Read the system prompt from the file
with open(system_prompt_path, 'r') as file:
    system_prompt = file.read().strip()
# Use ChatPromptTemplate.from_messages to create a prompt that instructs the AI
# on how to use the search tool and format its responses
prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])
# Set up the memory
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# Create the agent
# Use RunnablePassthrough, prompt, llm, and OpenAIFunctionsAgentOutputParser to create the
# agent; you can find some info here: https://github.com/langchain-ai/langchain/discussions/18591
# agent = (
#     {
#         "input":             # TODO: Implement the input format
#         "chat_history":      # TODO: Implement the chat history format
#         "agent_scratchpad":  # TODO: Implement the agent scratchpad format
#     }
#     |   # TODO: Use the prompt
#     |   # TODO: Use the language model with tools
#     |   # TODO: Use the output parser
# )
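# One possible composition (a sketch, following the classic OpenAI-functions agent pattern;
# adapt it to your own prompt and tools):
#
#   agent = (
#       {
#           "input": lambda x: x["input"],
#           "chat_history": lambda x: x["chat_history"],
#           "agent_scratchpad": lambda x: format_to_openai_function_messages(x["intermediate_steps"]),
#       }
#       | prompt
#       | llm.bind(functions=[format_tool_to_openai_function(t) for t in tools])
#       | OpenAIFunctionsAgentOutputParser()
#   )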
# 4.7: Create the agent executor
agent_executor = '<your code here>' # TODO: Use the AgentExecutor to create the agent executor
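# Example (a sketch; adjust the options as you like):
#   agent_executor = AgentExecutor(agent=agent, tools=tools, memory=memory, verbose=True)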
import re
def run_agent_conversation() -> None:
    """
    Run the LangChain agent in a console-based conversation loop.
    """
    print("Welcome to the RAG system. Type 'exit' to end the conversation.")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    while True:
        user_input = '<Your code here>'  # TODO: Get user input
        if user_input.lower() == 'exit':
            break
        response = '<Your code here>'
        # The output contains the sources in the format [[id]]; we use a regex to extract the ids and look up the sources.
        ids = '<Your code here>'
        for id in ids:
            # fetch the source and page from the database
            '<Your code here>'
            # replace the id with the source
            response["output"] = '<Your code here>'
        print("Assistant:", response["output"])
    conn.close()
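    # A sketch of how the placeholders above could be filled in (one possible approach; it
    # assumes the agent cites chunks as [[id]], as set up in search_tool):
    #
    #   user_input = input("You: ")
    #   response = agent_executor.invoke({"input": user_input})
    #   ids = re.findall(r"\[\[(\d+)\]\]", response["output"])
    #   for id in ids:
    #       cursor.execute("SELECT source_document, start_page FROM chunks WHERE id = ?", (int(id),))
    #       source, page = cursor.fetchone()
    #       response["output"] = response["output"].replace(f"[[{id}]]", f"({source}, p. {page})")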
if __name__ == "__main__":
print("Welcome to your RAG system building adventure!"
)
LOCAL = os.getenv("LOCAL", "True").lower() == "true"
QUICK_DEMO = os.getenv("QUICK_DEMO", "False").lower() == "true"
# in your .env file, set LOCAL to False if you want to use the openai embedding model
# set QUICK_DEMO to False if you want to run the code on the entirety of the data
# namely: add the following lines to your .env file
# LOCAL=False
# QUICK_DEMO=False
if LOCAL:
db_path = "rag_database.sqlite"
else:
db_path = "rag_database_with_openai_embedding.sqlite"
# Initialize the database and FAISS index
create_sqlite_tables(db_path)
# list all files in the data folder
data_folder = './data'
all_files = os.listdir(data_folder)
if QUICK_DEMO:
all_files = all_files[:2]
for file in all_files:
file_path = os.path.join(data_folder, file)
# check if file is a pdf
if file_path.endswith('.pdf'):
process_pdf(file_path, db_path)
# Create FAISS index
faiss_index = create_faiss_index(db_path)
# Run the conversation loop
run_agent_conversation()
