# Welcome to the exciting world of Retrieval-Augmented Generation (RAG) systems!
# In this exercise, you'll build a powerful RAG system step by step.
# Get ready to dive into embeddings, vector databases, and AI-powered search!

import os
import re
import sqlite3
from typing import List, Tuple

import faiss
import numpy as np
from dotenv import load_dotenv
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.memory import ConversationBufferMemory
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, Tool
from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.tools.render import format_tool_to_openai_function
from langchain.schema.runnable import RunnablePassthrough
from langchain.tools import tool
from langchain.text_splitter import TokenTextSplitter
from langchain.schema import Document
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer

from read_pdf import read_pdf

# Sanity check: faiss is sensitive to the installed NumPy version
print(np.__version__)

# Let's start by setting up our environment and initializing our models
load_dotenv()
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_KEY")

# Initialize SentenceTransformer and its underlying tokenizer
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
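
# Aside: the HF tokenizer above lets you check how many tokens a text costs
# for the embedding model (purely illustrative, not part of the exercise):
#
#     n_tokens = len(tokenizer.encode("Retrieval-Augmented Generation is fun!"))
#     print(n_tokens)
#
# Note that all-MiniLM-L6-v2 truncates inputs beyond 256 word pieces, and that
# LangChain's TokenTextSplitter counts tokens with tiktoken by default, so the
# two counts will not match exactly.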

def create_sqlite_tables(db_path: str) -> None:
    """
    Create SQLite tables for storing document chunks and their embeddings.

    This function sets up the foundation of our RAG system's database. It creates
    two tables: 'chunks' for storing text chunks and their metadata, and 'embeddings'
    for storing the vector representations of these chunks.

    Args:
        db_path (str): The file path where the SQLite database will be created or accessed.

    Returns:
        None

    Fun fact: SQLite is so reliable it's used in airplanes and smartphones!
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS chunks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            chunk_content TEXT,
            source_document TEXT,
            start_page INTEGER
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS embeddings (
            chunk_id INTEGER,
            embedding BLOB,
            FOREIGN KEY (chunk_id) REFERENCES chunks (id)
        )
    ''')
    conn.commit()
    conn.close()
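
# The two tables are linked via embeddings.chunk_id -> chunks.id. A quick,
# purely illustrative way to eyeball what got stored (LENGTH() on the BLOB
# gives its size in bytes, i.e. 4 bytes per float32 dimension):
#
#     conn = sqlite3.connect("rag_database.sqlite")
#     for row in conn.execute(
#         "SELECT c.id, c.source_document, c.start_page, LENGTH(e.embedding) "
#         "FROM chunks c JOIN embeddings e ON e.chunk_id = c.id LIMIT 5"
#     ):
#         print(row)
#     conn.close()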

def chunk_document(pages: List[Document], source: str) -> List[Tuple[str, str, int]]:
    """
    Chunk the document pages, handling chunks that cross page boundaries.

    This function is like a master chef slicing a long document into bite-sized pieces.
    It ensures that each chunk is just the right size for our model to digest, while
    keeping track of where each chunk came from.

    Args:
        pages (List[Document]): List of Document objects, each representing a page.
        source (str): The source document name.

    Returns:
        List[Tuple[str, str, int]]: List of (chunk_text, source, start_page).
    """
    # initialization
    text_splitter = TokenTextSplitter(chunk_size=500, chunk_overlap=200)
    result = []
    previous_last_chunk = ""
    current_page = 1
    chunk_start_page = 1

    for page in pages:
        pass
        # TODO: concatenate the current page content with the last chunk of the previous page
        # TODO: chunk this concatenation
        # Hint: use the text_splitter.split_text() method
        # TODO: add all the chunks but the last one to the result

    # add the last chunk of the last page to the result
    if previous_last_chunk:
        result.append((previous_last_chunk, source, chunk_start_page))
    return result
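
# One possible way to fill in the loop of chunk_document (a sketch, not the
# official solution; it assumes each page is a Document whose text lives in
# page.page_content, and it treats the page on which a carried-over chunk
# began as that chunk's start page):
#
#     for page in pages:
#         text = previous_last_chunk + page.page_content
#         page_chunks = text_splitter.split_text(text)
#         # every chunk except the last is final; the last one may continue
#         # onto the next page, so carry it over instead of emitting it
#         for chunk in page_chunks[:-1]:
#             result.append((chunk, source, chunk_start_page))
#             chunk_start_page = current_page
#         previous_last_chunk = page_chunks[-1] if page_chunks else ""
#         current_page += 1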

def embed_chunks(chunks: List[str], local: bool = True) -> np.ndarray:
    """
    Embed a list of text chunks using either a local SentenceTransformer model or OpenAI's embedding model.

    This function is like a translator, converting our text chunks into a language
    that our AI models can understand - the language of vectors!

    Args:
        chunks (List[str]): The list of text chunks to be embedded.
        local (bool): If True, use the local SentenceTransformer model. If False, use OpenAI's model.

    Returns:
        np.ndarray: The embedding vectors for the chunks.

    Exercise: Try implementing the OpenAI embedding method. How does it compare to the local model?
    """
    if local:
        pass
        # TODO: Implement the local SentenceTransformer embedding method here
        # Hint: You'll need to use the model.encode() method
    else:
        # TODO: Implement the OpenAI embedding method here
        # Hint: You'll need to use the openai.Embedding.create() method
        pass
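
# A sketch of both branches of embed_chunks (assumptions: the local branch
# reuses the SentenceTransformer initialised at the top; the OpenAI branch
# uses the post-1.0 `openai` client with "text-embedding-3-small" -- a model
# choice of ours, and a newer API than the openai.Embedding.create() the hint
# mentions):
#
#     if local:
#         return model.encode(chunks, convert_to_numpy=True).astype(np.float32)
#     else:
#         from openai import OpenAI
#         client = OpenAI()
#         resp = client.embeddings.create(model="text-embedding-3-small", input=chunks)
#         return np.array([d.embedding for d in resp.data], dtype=np.float32)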

def process_and_store_chunks(chunks: List[Tuple[str, str, int]], db_path: str, local: bool = True) -> None:
    """
    Process the input chunks, embed them, and store them in the database.

    This function is like a librarian, taking our chunks of text, creating a special
    index for each (the embedding), and carefully storing both in our database.

    Args:
        chunks (List[Tuple[str, str, int]]): List of (chunk_text, source_document, start_page) tuples.
        db_path (str): Path to the SQLite database file.
        local (bool): Whether to use the local embedding model or OpenAI's.

    Returns:
        None

    Challenge: Can you modify this function to batch process chunks for better efficiency?
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    for chunk_text, source_document, start_page in chunks:
        # TODO: define the SQL query to insert the chunk into the database
        # Hint: the cursor usage is of the form cursor.execute("INSERT INTO table VALUES (?, ?, ?)", (var1, var2, var3))
        query = ""
        cursor.execute(query, ())
        chunk_id = cursor.lastrowid
        # TODO: Embed the chunk using the embed_chunks function
        # TODO: Store the embedding in the database
        # Hint: You'll need to convert the embedding to bytes using the tobytes() method
        query = ""
        cursor.execute(query, ())
    conn.commit()
    conn.close()
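
# A sketch of the two INSERT statements above (one possibility; it assumes
# embed_chunks returns a (1, dim) float32 array for a one-element list):
#
#     cursor.execute(
#         "INSERT INTO chunks (chunk_content, source_document, start_page) VALUES (?, ?, ?)",
#         (chunk_text, source_document, start_page),
#     )
#     chunk_id = cursor.lastrowid
#     embedding = embed_chunks([chunk_text], local)[0]
#     cursor.execute(
#         "INSERT INTO embeddings (chunk_id, embedding) VALUES (?, ?)",
#         (chunk_id, embedding.astype(np.float32).tobytes()),
#     )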

def create_faiss_index(db_path: str) -> faiss.Index:
    """
    Create a FAISS index from the stored embeddings in the database.

    This function is like building a high-tech library catalog. It takes all our
    stored embeddings and organizes them in a way that allows for super-fast searching!

    Args:
        db_path (str): Path to the SQLite database file.

    Returns:
        faiss.Index: The created FAISS index.

    Fun fact: FAISS can handle billions of vectors, making it perfect for large-scale search systems!
    """
    # TODO: Implement the function to create and return a FAISS index
    # Hint: You'll need to retrieve embeddings from the database and use faiss.IndexFlatL2
    # create conn and cursor
    # retrieve the embeddings from the database
    # Hint: to read an embedding stored using the tobytes() method, you can use np.frombuffer(<database result>, dtype=np.float32)
    # close the connection
    # create the index
    dimension = '?'  # TODO: get the dimension of the embeddings
    index = '?'  # TODO: create the index using faiss.IndexFlatL2
    # add the embeddings to the index
    # Hint: use the add() method of the index
    return index
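
# A possible end-to-end sketch for create_faiss_index (it assumes embeddings
# were stored as float32 bytes and relies on ordering by chunk_id so that
# FAISS row i corresponds to the i-th stored chunk):
#
#     conn = sqlite3.connect(db_path)
#     rows = conn.execute("SELECT embedding FROM embeddings ORDER BY chunk_id").fetchall()
#     conn.close()
#     vectors = np.vstack([np.frombuffer(r[0], dtype=np.float32) for r in rows])
#     dimension = vectors.shape[1]
#     index = faiss.IndexFlatL2(dimension)
#     index.add(vectors)
#     return index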

def process_pdf(file_path, db_path, local=True):
    """Read a PDF, chunk it, and store the chunks and their embeddings."""
    # create a connection to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # TODO: check whether the document already exists in the database
    # close the connection
    conn.close()
    # read the pdf file
    pages = '?'
    source = '?'
    chunks = '?'
    # process and store the chunks
    # Hint: use the process_and_store_chunks function
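
# A sketch of the missing steps in process_pdf (it assumes read_pdf(file_path)
# returns the list of Document pages that chunk_document expects):
#
#     cursor.execute(
#         "SELECT 1 FROM chunks WHERE source_document = ? LIMIT 1",
#         (os.path.basename(file_path),),
#     )
#     already_processed = cursor.fetchone() is not None
#     conn.close()
#     if already_processed:
#         return
#     pages = read_pdf(file_path)
#     source = os.path.basename(file_path)
#     chunks = chunk_document(pages, source)
#     process_and_store_chunks(chunks, db_path, local)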

def search_engine(query: str, faiss_index: faiss.Index, db_path: str, k: int = 5) -> List[Tuple[str, float, str, int]]:
    """
    Search for relevant chunks using the query and FAISS index.

    This function is the heart of our RAG system. It takes a question, finds the most
    relevant information in our database, and returns it. It's like having a super-smart
    research assistant at your fingertips!

    Args:
        query (str): The search query.
        faiss_index (faiss.Index): The FAISS index for similarity search.
        db_path (str): Path to the SQLite database file.
        k (int): Number of top results to return.

    Returns:
        List[Tuple[str, float, str, int]]: List of (chunk_content, similarity_score, source_document, start_page).

    Exercise: Can you modify this function to also return the actual similarity scores?
    """
    # TODO: Implement the search functionality
    # Hint: You'll need to embed the query, use faiss_index.search(), and fetch the corresponding chunks from the database
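
# One possible implementation of search_engine (a sketch; it reuses
# embed_chunks for the query and assumes FAISS row i maps to chunk id i + 1,
# which holds when ids are the unbroken autoincrement sequence 1..N and the
# index was built in chunk_id order):
#
#     query_embedding = embed_chunks([query]).astype(np.float32)
#     distances, indices = faiss_index.search(query_embedding, k)
#     conn = sqlite3.connect(db_path)
#     results = []
#     for dist, idx in zip(distances[0], indices[0]):
#         row = conn.execute(
#             "SELECT chunk_content, source_document, start_page FROM chunks WHERE id = ?",
#             (int(idx) + 1,),
#         ).fetchone()
#         if row:
#             results.append((row[0], float(dist), row[1], row[2]))
#     conn.close()
#     return results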

# In the following code, you will implement the agent that uses the search engine
# to answer questions using LangChain.
# Some examples and help can be found here: https://python.langchain.com/docs/how_to/agent_executor/


@tool
def search_tool(query: str) -> str:
    """
    Search for relevant information using the query.
    """
    # TODO: Implement this function. You have to find a way to let the LLM know
    # which chunk comes from where, so that we can add the sources at the end.
    # Use your search_engine function and format the results
    pass


tools = []
# TODO: Create the search tools list using the search_tool function and the Tool class from langchain
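
# A sketch of one way to implement search_tool (the [[id]] tags match the
# citation format run_agent_conversation parses later; looking the id up by
# chunk content is only one option -- extending search_engine to also return
# the chunk id is cleaner):
#
#     results = search_engine(query, faiss_index, db_path)
#     conn = sqlite3.connect(db_path)
#     formatted = []
#     for content, score, src, page in results:
#         row = conn.execute(
#             "SELECT id FROM chunks WHERE chunk_content = ?", (content,)
#         ).fetchone()
#         formatted.append(f"[[{row[0]}]] {content}")
#     conn.close()
#     return "\n\n".join(formatted)
#
# Since @tool already wraps search_tool in a LangChain tool object, the
# simplest tools list is just:
#
#     tools = [search_tool]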

# TODO: Use ChatOpenAI from LangChain. Choose an appropriate model and temperature.
llm = '<your code here>'
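
# One hedged choice (the model name is our assumption, not prescribed by the
# exercise; temperature 0 keeps tool calls reproducible):
#
#     llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)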

# TODO: Create the prompt template in the file system_prompt.txt

# Get the directory of the current script
current_dir = os.path.dirname(os.path.abspath(__file__))

# Construct the full path to the system_prompt.txt file
system_prompt_path = os.path.join(current_dir, 'system_prompt.txt')

# Read the system prompt from the file
with open(system_prompt_path, 'r') as file:
    system_prompt = file.read().strip()

# Use ChatPromptTemplate.from_messages to create a prompt that instructs the AI
# on how to use the search tool and format its responses
prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# Set up the memory
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)

# Create the agent
# Use the RunnablePassthrough, prompt, llm, and OpenAIFunctionsAgentOutputParser
# to create the agent. You can find some info here: https://github.com/langchain-ai/langchain/discussions/18591
# agent = (
#     {
#         "input": # TODO: Implement the input format
#         "chat_history": # TODO: Implement the chat history format
#         "agent_scratchpad": # TODO: Implement the agent scratchpad format
#     }
#     | # TODO: Use the prompt
#     | # TODO: Use the language model with tools
#     | # TODO: Use the output parser
# )

# 4.7: Create the agent executor
agent_executor = '<your code here>'
# TODO: Use the AgentExecutor to create the agent executor
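
# A sketch following the classic OpenAI-functions agent recipe that the
# imports at the top point to (the tools are bound to the llm as function
# schemas, and intermediate steps are folded into the scratchpad):
#
#     llm_with_tools = llm.bind(
#         functions=[format_tool_to_openai_function(t) for t in tools]
#     )
#     agent = (
#         {
#             "input": lambda x: x["input"],
#             "chat_history": lambda x: x["chat_history"],
#             "agent_scratchpad": lambda x: format_to_openai_function_messages(
#                 x["intermediate_steps"]
#             ),
#         }
#         | prompt
#         | llm_with_tools
#         | OpenAIFunctionsAgentOutputParser()
#     )
#     agent_executor = AgentExecutor(agent=agent, tools=tools, memory=memory, verbose=True)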

def run_agent_conversation() -> None:
    """
    Run the LangChain agent in a console-based conversation loop.
    """
    print("Welcome to the RAG system. Type 'exit' to end the conversation.")
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    while True:
        user_input = '<Your code here>'  # TODO: Get user input
        if user_input.lower() == 'exit':
            break
        response = '<Your code here>'
        # The output contains the sources in the format [[id]]. We use a regex
        # to extract the ids and fetch the corresponding sources.
        ids = '<Your code here>'
        for chunk_id in ids:
            # fetch the source and page from the database
            '<Your code here>'
            # replace the id with the source
            response["output"] = '<Your code here>'
        print("Assistant:", response["output"])
    conn.close()
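
# A possible shape for the loop body of run_agent_conversation (a sketch; it
# assumes the agent was taught to cite chunks as [[id]], as the search_tool
# formatting above suggests):
#
#     user_input = input("You: ")
#     if user_input.lower() == 'exit':
#         break
#     response = agent_executor.invoke({"input": user_input})
#     ids = re.findall(r"\[\[(\d+)\]\]", response["output"])
#     for chunk_id in ids:
#         cursor.execute(
#             "SELECT source_document, start_page FROM chunks WHERE id = ?",
#             (chunk_id,),
#         )
#         row = cursor.fetchone()
#         if row:
#             response["output"] = response["output"].replace(
#                 f"[[{chunk_id}]]", f"({row[0]}, p. {row[1]})"
#             )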

if __name__ == "__main__":
    print("Welcome to your RAG system building adventure!")
    LOCAL = os.getenv("LOCAL", "True").lower() == "true"
    QUICK_DEMO = os.getenv("QUICK_DEMO", "False").lower() == "true"
    # In your .env file, set LOCAL to False if you want to use the OpenAI embedding
    # model, and set QUICK_DEMO to False if you want to run the code on the entirety
    # of the data. Namely, add the following lines to your .env file:
    # LOCAL=False
    # QUICK_DEMO=False
    if LOCAL:
        db_path = "rag_database.sqlite"
    else:
        db_path = "rag_database_with_openai_embedding.sqlite"

    # Initialize the database and FAISS index
    create_sqlite_tables(db_path)

    # list all the files in the data folder
    data_folder = './data'
    all_files = os.listdir(data_folder)
    if QUICK_DEMO:
        all_files = all_files[:2]
    for file in all_files:
        file_path = os.path.join(data_folder, file)
        # check if the file is a pdf
        if file_path.endswith('.pdf'):
            # pass LOCAL so the chosen embedding model is actually used
            process_pdf(file_path, db_path, local=LOCAL)

    # Create the FAISS index
    faiss_index = create_faiss_index(db_path)

    # Run the conversation loop
    run_agent_conversation()