Introduction 

In the second part of our series on building a RAG application on a Raspberry Pi, we’ll expand on the foundation laid in the first part, where we created and tested the core pipeline. Now, we’re going to take things a step further by building a FastAPI application to serve our RAG pipeline and creating a Reflex app to give users a simple and interactive way to access it. This part will guide you through setting up the FastAPI back-end, designing the front-end with Reflex, and getting everything up and running on your Raspberry Pi. By the end, you’ll have a complete, working application that’s ready for real-world use.

Learning Objectives

  • Set up a FastAPI back-end to integrate with the existing RAG pipeline and process queries efficiently.
  • Design a user-friendly interface using Reflex to interact with the FastAPI back-end and the RAG pipeline.
  • Create and test API endpoints for querying and document ingestion, ensuring smooth operation with FastAPI.
  • Deploy and test the complete application on a Raspberry Pi, ensuring both back-end and front-end components function seamlessly.
  • Understand the integration between FastAPI and Reflex for a cohesive RAG application experience.
  • Implement and troubleshoot FastAPI and Reflex components to provide a fully operational RAG application on a Raspberry Pi.

If you missed the previous edition, be sure to check it out here: Self-Hosting RAG Applications on Edge Devices with Langchain and Ollama – Part I.


Creating Python Environment

Before we start creating the application, we need to set up the environment. Create a virtual environment and install the dependencies below:

deeplake 
boto3==1.34.144 
botocore==1.34.144 
fastapi==0.110.3 
gunicorn==22.0.0 
httpx==0.27.0 
huggingface-hub==0.23.4 
langchain==0.2.6 
langchain-community==0.2.6 
langchain-core==0.2.11 
langchain-experimental==0.0.62 
langchain-text-splitters==0.2.2 
langsmith==0.1.83 
marshmallow==3.21.3 
numpy==1.26.4 
pandas==2.2.2 
pydantic==2.8.2 
pydantic_core==2.20.1 
PyMuPDF==1.24.7 
PyMuPDFb==1.24.6 
python-dotenv==1.0.1 
pytz==2024.1 
PyYAML==6.0.1 
reflex==0.5.6 
requests==2.32.3
reflex-hosting-cli==0.1.13
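
If you are unsure how to set this up, a minimal sketch using Python's built-in venv module looks like this (the environment name and requirements file name are just examples):

python -m venv rag_env
source rag_env/bin/activate
pip install -r requirements.txt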

Once the required packages are installed, we need to have the required models present on the device. We will do this using Ollama. Follow the steps from Part 1 of this article to download both the language and embedding models. Finally, create two directories, one each for the back-end and front-end applications.
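
Assuming the same models used in Part 1 (the same names are referenced later in config.py), pulling them and creating the directories looks roughly like this:

ollama pull phi3
ollama pull nomic-embed-text
mkdir backend frontend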

Once the models are pulled using Ollama, we are ready to build the final application.

Developing the Back-End with FastAPI

In Part 1 of this article, we built the RAG pipeline with both the Ingestion and QnA modules. We tested both pipelines with some documents, and they worked perfectly. Now we need to wrap the pipeline with FastAPI to create a consumable API. This will let us integrate it with any front-end framework like Streamlit, Chainlit, Gradio, Reflex, React, Angular, etc. Let’s start by building a structure for the application. Following this structure is completely optional, but if you use a different structure, make sure to adjust the dependency imports accordingly.

Below is the tree structure we will follow:

backend
├── app.py
├── requirements.txt
└── src
    ├── config.py
    ├── doc_loader
    │   ├── base_loader.py
    │   ├── __init__.py
    │   └── pdf_loader.py
    ├── ingestion.py
    ├── __init__.py
    └── qna.py

Let’s start with config.py. This file contains all the configurable options for the application, such as the Ollama URL, the LLM name, and the embeddings model name. Below is an example:

LANGUAGE_MODEL_NAME = "phi3"
EMBEDDINGS_MODEL_NAME = "nomic-embed-text"
OLLAMA_URL = "http://localhost:11434"
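
Since python-dotenv is already in the dependency list, you can optionally read these values from a .env file instead of hard-coding them. A minimal sketch (the environment variable names are assumptions, with the article's values as defaults):

# config.py - optional variant reading values from a .env file
import os

from dotenv import load_dotenv

load_dotenv()

LANGUAGE_MODEL_NAME = os.getenv("LANGUAGE_MODEL_NAME", "phi3")
EMBEDDINGS_MODEL_NAME = os.getenv("EMBEDDINGS_MODEL_NAME", "nomic-embed-text")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")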

The base_loader.py file contains the parent document loader class that child document loaders will inherit. In this application we are only working with PDF files, so a child PDFLoader class will be created that inherits from the BaseLoader class.

Below are the contents of base_loader.py and pdf_loader.py:

# base_loader.py
from abc import ABC, abstractmethod

class BaseLoader(ABC):
    def __init__(self, file_path: str) -> None:
        self.file_path = file_path

    @abstractmethod
    async def load_document(self):
        pass


# pdf_loader.py
import os

from .base_loader import BaseLoader
from langchain.schema import Document
from langchain.document_loaders.pdf import PyMuPDFLoader
from langchain.text_splitter import CharacterTextSplitter


class PDFLoader(BaseLoader):
    def __init__(self, file_path: str) -> None:
        super().__init__(file_path)

    async def load_document(self):
        self.file_name = os.path.basename(self.file_path)
        loader = PyMuPDFLoader(file_path=self.file_path)

        text_splitter = CharacterTextSplitter(
            separator="\n",
            chunk_size=1000,
            chunk_overlap=200,
        )
        pages = await loader.aload()
        total_pages = len(pages)
        chunks = []
        for idx, page in enumerate(pages):
            chunks.append(
                Document(
                    page_content=page.page_content,
                    metadata=dict(
                        {
                            "file_name": self.file_name,
                            "page_no": str(idx + 1),
                            "total_pages": str(total_pages),
                        }
                    ),
                )
            )

        final_chunks = text_splitter.split_documents(chunks)
        return final_chunks

We discussed the working of pdf_loader in Part 1 of this article.
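
If you want to sanity-check the loader on its own before wiring it into the API, a minimal sketch looks like this (the script name and sample.pdf are placeholders; run it from the backend directory):

# check_loader.py
import asyncio

from src.doc_loader import PDFLoader


async def main():
    # Load and chunk a local PDF (replace sample.pdf with any PDF you have)
    loader = PDFLoader(file_path="sample.pdf")
    chunks = await loader.load_document()
    print(f"Created {len(chunks)} chunks")
    print(chunks[0].metadata)


asyncio.run(main())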

Next, let’s build the Ingestion class. This is the same as the one we built in Part 1 of this article.

Code for Ingestion Class

from . import config as cfg

from langchain.vectorstores.deeplake import DeepLake
from langchain.embeddings.ollama import OllamaEmbeddings
from .doc_loader import PDFLoader

class Ingestion:
    """Document Ingestion pipeline."""
    def __init__(self):
        try:
            self.embeddings = OllamaEmbeddings(
                model=cfg.EMBEDDINGS_MODEL_NAME,
                base_url=cfg.OLLAMA_URL,
                show_progress=True,
            )
            self.vector_store = DeepLake(
                dataset_path="data/text_vectorstore",
                embedding=self.embeddings,
                num_workers=4,
                verbose=False,
            )
        except Exception as e:
            raise RuntimeError(f"Failed to initialize Ingestion system. ERROR: {e}")

    async def create_and_add_embeddings(
        self,
        file: str,
    ):
        try:
            loader = PDFLoader(
                file_path=file,
            )

            chunks = await loader.load_document()
            # aadd_documents returns the IDs of the added chunks
            ids = await self.vector_store.aadd_documents(documents=chunks)
            return len(ids)
        except (ValueError, RuntimeError, KeyError, TypeError) as e:
            raise Exception(f"ERROR: {e}")

Now that we have set up the Ingestion class, we’ll move on to creating the QnA class. This, too, is the same as the one we created in Part 1 of this article.

Code for QnA Class

from . import config as cfg

from langchain.vectorstores.deeplake import DeepLake
from langchain.embeddings.ollama import OllamaEmbeddings
from langchain_community.llms.ollama import Ollama
from langchain_core.prompts import ChatPromptTemplate
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

class QnA:
    """Document Ingestion pipeline."""
    def __init__(self):
        try:
            self.embeddings = OllamaEmbeddings(
                model=cfg.EMBEDDINGS_MODEL_NAME,
                base_url=cfg.OLLAMA_URL,
                show_progress=True,
            )
            self.model = Ollama(
                model=cfg.LANGUAGE_MODEL_NAME,
                base_url=cfg.OLLAMA_URL,
                verbose=True,
                temperature=0.2,
            )
            self.vector_store = DeepLake(
                dataset_path="data/text_vectorstore",
                embedding=self.embeddings,
                num_workers=4,
                verbose=False,
            )
            self.retriever = self.vector_store.as_retriever(
                search_type="similarity",
                search_kwargs={
                    "k": 10,
                },
            )
        except Exception as e:
            raise RuntimeError(f"Failed to initialize Ingestion system. ERROR: {e}")

    def create_rag_chain(self):
        try:
            system_prompt = """\n\nContext: {context}"""
            prompt = ChatPromptTemplate.from_messages(
                [
                    ("system", system_prompt),
                    ("human", "{input}"),
                ]
            )
            question_answer_chain = create_stuff_documents_chain(self.model, prompt)
            rag_chain = create_retrieval_chain(self.retriever, question_answer_chain)

            return rag_chain
        except Exception as e:
            raise RuntimeError(f"Failed to create retrieval chain. ERROR: {e}")

With this, we have finished creating the core functionality of the RAG app. Now let’s wrap it with FastAPI.
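
One detail the directory tree doesn’t spell out: since app.py below imports QnA and Ingestion directly from src, the package __init__.py files need to re-export them. A minimal sketch, assuming the layout shown earlier:

# src/__init__.py
from .ingestion import Ingestion
from .qna import QnA

# src/doc_loader/__init__.py
from .base_loader import BaseLoader
from .pdf_loader import PDFLoader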

Code for the FastAPI Application

import sys
import os
import uvicorn

from src import QnA, Ingestion
from fastapi import FastAPI, Request, File, UploadFile
from fastapi.responses import StreamingResponse

app = FastAPI()

ingestion = Ingestion()
chatbot = QnA()
rag_chain = chatbot.create_rag_chain()


@app.get("https://www.analyticsvidhya.com/")
def hello():
    return {"message": "API Running in server 8089"}


@app.post("/query")
async def ask_query(request: Request):
    data = await request.json()
    question = data.get("question")

    async def event_generator():
        for chunk in rag_chain.pick("answer").stream({"input": question}):
            yield chunk

    return StreamingResponse(event_generator(), media_type="text/plain")


@app.post("/ingest")
async def ingest_document(file: UploadFile = File(...)):
    try:
        os.makedirs("files", exist_ok=True)
        file_location = f"files/{file.filename}"
        with open(file_location, "wb+") as file_object:
            file_object.write(file.file.read())

        size = await ingestion.create_and_add_embeddings(file=file_location)
        return {"message": f"File ingested! Document count: {size}"}
    except Exception as e:
        return {"message": f"An error occured: {e}"}


if __name__ == "__main__":
    try:
        uvicorn.run(app, host="0.0.0.0", port=8089)
    except KeyboardInterrupt as e:
        print("App stopped!")

Let’s break down the app, endpoint by endpoint:

  • First, we initialize the FastAPI app, the Ingestion object, and the QnA object. We then create a RAG chain using the create_rag_chain method of the QnA class.
  • Our first endpoint is a simple GET method. It lets us check whether the app is healthy; think of it as a ‘Hello World’ endpoint.
  • The second is the query endpoint. This is a POST method used to run the chain. It takes a request parameter, from which we extract the user’s query. We then create an asynchronous generator that wraps the chain’s stream call. This lets FastAPI handle the LLM’s streaming output and gives a ChatGPT-like experience in the chat interface. Finally, we wrap the generator with the StreamingResponse class and return it.
  • The third endpoint is the ingestion endpoint. It is also a POST method and takes the entire file as bytes as input. We store this file in a local directory and then ingest it using the create_and_add_embeddings method of the Ingestion class.

Finally, we run the app with the uvicorn package on the specified host and port. To test the app, simply run it using the following command:

python app.py

Use an API testing tool like Postman, Insomnia, or Bruno to test the application. You can also use the Thunder Client extension in VS Code to do the same.

Testing the Ingestion endpoint:

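If you prefer to test from a script instead of a GUI client, a minimal sketch using the requests library looks like this (sample.pdf is a placeholder for any local PDF; the port matches the uvicorn setting above):

import requests

# Upload a local PDF to the /ingest endpoint
with open("sample.pdf", "rb") as f:
    response = requests.post(
        "http://localhost:8089/ingest",
        files={"file": ("sample.pdf", f, "application/pdf")},
    )
print(response.json())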

Testing the query endpoint:

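And a similar sketch for the query endpoint, consuming the streamed answer chunk by chunk (the question is just an example):

import requests

# Send a question and print the streamed answer as it arrives
response = requests.post(
    "http://localhost:8089/query",
    json={"question": "What is this document about?"},
    stream=True,
)
for chunk in response.iter_content(chunk_size=512):
    if chunk:
        print(chunk.decode(), end="", flush=True)
print()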

Designing the Front-End with Reflex

We have successfully created a FastAPI app for the back-end of our RAG application. It’s time to build the front-end. You can choose any front-end library for this, but for this particular article we will build the front-end using Reflex. Reflex is a Python-only front-end library created for building web applications purely in Python. It provides templates for common applications like a calculator, image generation, and a chatbot. We will use the chatbot application template as the starting point for our user interface. Our final app will have the following structure, so let’s have it here for reference.

Frontend Directory

We will have a frontend directory for this:

frontend
├── assets
│   └── favicon.ico
├── docs
│   └── demo.gif
├── chat
│   ├── components
│   │   ├── chat.py
│   │   ├── file_upload.py
│   │   ├── __init__.py
│   │   ├── loading_icon.py
│   │   ├── modal.py
│   │   └── navbar.py
│   ├── __init__.py
│   ├── chat.py
│   └── state.py
├── requirements.txt
├── rxconfig.py
└── uploaded_files

Steps for Final App

Follow these steps to prepare the groundwork for the final app.

Step 1: Clone the chat template repository into the frontend directory

git clone https://github.com/reflex-dev/reflex-chat.git .

Step 2: Run the following command to initialize the directory as a Reflex app

reflex init

This will set up the Reflex app, ready to run and develop.
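
For reference, the rxconfig.py in the project root looks roughly like this; the app_name is an assumption and should match the name of your app package (the cloned template uses chat):

# rxconfig.py
import reflex as rx

config = rx.Config(
    app_name="chat",  # must match the package that holds chat.py and state.py
)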

Step 3: To test the app, use the following command from inside the frontend directory

reflex run

Let’s start modifying the components. First, let’s modify the components/chat.py file.

Below is the code for the same:

import reflex as rx
from chat.components import loading_icon
from chat.state import QA, State

message_style = dict(
    display="inline-block",
    padding="0 10px",
    border_radius="8px",
    max_width=["30em", "30em", "50em", "50em", "50em", "50em"],
)


def message(qa: QA) -> rx.Component:
    """A single question/answer message.

    Args:
        qa: The question/answer pair.

    Returns:
        A component displaying the question/answer pair.
    """
    return rx.box(
        rx.box(
            rx.markdown(
                qa.question,
                background_color=rx.color("mauve", 4),
                color=rx.color("mauve", 12),
                **message_style,
            ),
            text_align="right",
            margin_top="1em",
        ),
        rx.box(
            rx.markdown(
                qa.answer,
                background_color=rx.color("accent", 4),
                color=rx.color("accent", 12),
                **message_style,
            ),
            text_align="left",
            padding_top="1em",
        ),
        width="100%",
    )


def chat() -> rx.Component:
    """List all the messages in a single conversation."""
    return rx.vstack(
        rx.box(rx.foreach(State.chats[State.current_chat], message), width="100%"),
        py="8",
        flex="1",
        width="100%",
        max_width="50em",
        padding_x="4px",
        align_self="center",
        overflow="hidden",
        padding_bottom="5em",
    )


def action_bar() -> rx.Component:
    """The action bar to send a new message."""
    return rx.center(
        rx.vstack(
            rx.chakra.form(
                rx.chakra.form_control(
                    rx.hstack(
                        rx.input(
                            rx.input.slot(
                                rx.tooltip(
                                    rx.icon("info", size=18),
                                    content="Enter a question to get a response.",
                                )
                            ),
                            placeholder="Type something...",
                            id="question",
                            width=["15em", "20em", "45em", "50em", "50em", "50em"],
                        ),
                        rx.button(
                            rx.cond(
                                State.processing,
                                loading_icon(height="1em"),
                                rx.text("Send", font_family="Ubuntu"),
                            ),
                            type="submit",
                        ),
                        align_items="center",
                    ),
                    is_disabled=State.processing,
                ),
                on_submit=State.process_question,
                reset_on_submit=True,
            ),
            rx.text(
                "ReflexGPT may return factually incorrect or misleading responses. Use discretion.",
                text_align="center",
                font_size=".75em",
                color=rx.color("mauve", 10),
                font_family="Ubuntu",
            ),
            rx.logo(margin_top="-1em", margin_bottom="-1em"),
            align_items="center",
        ),
        position="sticky",
        bottom="0",
        left="0",
        padding_y="16px",
        backdrop_filter="auto",
        backdrop_blur="lg",
        border_top=f"1px solid {rx.color('mauve', 3)}",
        background_color=rx.color("mauve", 2),
        align_items="stretch",
        width="100%",
    )

The changes from the version natively present in the template are minimal.

Next, we will edit the top-level chat.py file. This is the main app module that ties the chat components together.

Code for Main Chat Component

Below is the code for it:

import reflex as rx
from chat.components import chat, navbar, upload_form
from chat.state import State


@rx.page(route="/chat", title="RAG Chatbot")
def chat_interface() -> rx.Component:
    return rx.chakra.vstack(
        navbar(),
        chat.chat(),
        chat.action_bar(),
        background_color=rx.color("mauve", 1),
        color=rx.color("mauve", 12),
        min_height="100vh",
        align_items="stretch",
        spacing="0",
    )


@rx.page(route="https://www.analyticsvidhya.com/", title="RAG Chatbot")
def index() -> rx.Component:
    return rx.chakra.vstack(
        navbar(),
        upload_form(),
        background_color=rx.color("mauve", 1),
        color=rx.color("mauve", 12),
        min_height="100vh",
        align_items="stretch",
        spacing="0",
    )


# Add state and page to the app.
app = rx.App(
    theme=rx.theme(
        appearance="dark",
        accent_color="jade",
    ),
    stylesheets=["https://fonts.googleapis.com/css2?family=Ubuntu&display=swap"],
    style={
        "font_family": "Ubuntu",
    },
)
app.add_page(index)
app.add_page(chat_interface)

This is the code for the chat interface. We have only added the font family to the app config; the rest of the code is the same.

Next, let’s edit the state.py file. This is where the front-end makes calls to the API endpoints and receives the responses.

Editing state.py File

import requests
import reflex as rx


class QA(rx.Base):
    question: str
    answer: str


DEFAULT_CHATS = {
    "Intros": [],
}


class State(rx.State):
    chats: dict[str, list[QA]] = DEFAULT_CHATS
    current_chat = "Intros"
    url: str = "http://localhost:8089/query"
    question: str
    processing: bool = False
    new_chat_name: str = ""

    def create_chat(self):
        """Create a new chat."""
        # Add the new chat to the list of chats.
        self.current_chat = self.new_chat_name
        self.chats[self.new_chat_name] = []

    def delete_chat(self):
        """Delete the current chat."""
        del self.chats[self.current_chat]
        if len(self.chats) == 0:
            self.chats = DEFAULT_CHATS
        self.current_chat = list(self.chats.keys())[0]

    def set_chat(self, chat_name: str):
        """Set the name of the current chat.

        Args:
            chat_name: The name of the chat.
        """
        self.current_chat = chat_name

    @rx.var
    def chat_titles(self) -> list[str]:
        """Get the list of chat titles.

        Returns:
            The list of chat names.
        """
        return list(self.chats.keys())

    async def process_question(self, form_data: dict[str, str]):
        # Get the question from the form
        question = form_data["question"]

        # Check if the question is empty
        if question == "":
            return

        model = self.openai_process_question

        async for value in model(question):
            yield value

    async def openai_process_question(self, question: str):
        """Get the response from the API.

        Args:
            question: The current question to send to the API.
        """
        # Add the question to the list of questions.
        qa = QA(question=question, answer="")
        self.chats[self.current_chat].append(qa)
        payload = {"question": question}

        # Clear the input and start the processing.
        self.processing = True
        yield

        response = requests.post(self.url, json=payload, stream=True)

        # Stream the results, yielding after every chunk.
        for answer_text in response.iter_content(chunk_size=512):
            # Skip empty keep-alive chunks, then decode and append to the answer.
            if answer_text:
                self.chats[self.current_chat][-1].answer += answer_text.decode()
            # Reassign to trigger a Reflex state update.
            self.chats = self.chats
            yield

        # Toggle the processing flag.
        self.processing = False

In this file, we have defined the URL for the query endpoint. We have also modified the openai_process_question method to send a POST request to the query endpoint and receive the streaming response, which is displayed in the chat interface.

Writing Contents of the file_upload.py File

Finally, let’s write the contents of the file_upload.py file. This component is displayed first and allows us to upload a file for ingestion.

import os

import reflex as rx
import requests


class UploadExample(rx.State):
    uploading: bool = False
    ingesting: bool = False
    progress: int = 0
    total_bytes: int = 0
    ingestion_url = "http://127.0.0.1:8089/ingest"

    async def handle_upload(self, files: list[rx.UploadFile]):
        self.ingesting = True
        yield
        for file in files:
            file_bytes = await file.read()
            file_name = file.filename
            # Build the multipart payload for the /ingest endpoint
            payload = {
                "file": (os.path.basename(file_name), file_bytes, "application/pdf")
            }
            response = requests.post(self.ingestion_url, files=payload)
            self.ingesting = False
            yield
            if response.status_code == 200:
                # Ingestion succeeded; the user can continue to the chat
                # interface using the "Continue to Chat" button below.
                # yield rx.redirect("/chat")
                pass

    def handle_upload_progress(self, progress: dict):
        self.uploading = True
        self.progress = round(progress["progress"] * 100)
        if self.progress >= 100:
            self.uploading = False

    def cancel_upload(self):
        self.uploading = False
        return rx.cancel_upload("upload3")


def upload_form():
    return rx.vstack(
        rx.upload(
            rx.flex(
                rx.text(
                    "Drag and drop file here or click to select file",
                    font_family="Ubuntu",
                ),
                rx.icon("upload", size=30),
                direction="column",
                align="center",
            ),
            id="upload3",
            border="1px solid rgb(233, 233,233, 0.4)",
            margin="5em 0 10px 0",
            background_color="rgb(107,99,246)",
            border_radius="8px",
            padding="1em",
        ),
        rx.vstack(rx.foreach(rx.selected_files("upload3"), rx.text)),
        rx.cond(
            ~UploadExample.ingesting,
            rx.button(
                "Upload",
                on_click=UploadExample.handle_upload(
                    rx.upload_files(
                        upload_id="upload3",
                        on_upload_progress=UploadExample.handle_upload_progress,
                    ),
                ),
            ),
            rx.flex(
                rx.spinner(size="3", loading=UploadExample.ingesting),
                rx.button(
                    "Cancel",
                    on_click=UploadExample.cancel_upload,
                ),
                align="center",
                spacing="3",
            ),
        ),
        rx.alert_dialog.root(
            rx.alert_dialog.trigger(
                rx.button("Continue to Chat", color_scheme="green"),
            ),
            rx.alert_dialog.content(
                rx.alert_dialog.title("Redirect to Chat Interface?"),
                rx.alert_dialog.description(
                    "You will be redirected to the Chat Interface.",
                    size="2",
                ),
                rx.flex(
                    rx.alert_dialog.cancel(
                        rx.button(
                            "Cancel",
                            variant="soft",
                            color_scheme="gray",
                        ),
                    ),
                    rx.alert_dialog.action(
                        rx.button(
                            "Continue",
                            color_scheme="green",
                            variant="solid",
                            on_click=rx.redirect("/chat"),
                        ),
                    ),
                    spacing="3",
                    margin_top="16px",
                    justify="end",
                ),
                style={"max_width": 450},
            ),
        ),
        align="center",
    )

This component allows us to upload a file and ingest it into the vector store. It uses the ingest endpoint of our FastAPI app to upload and ingest the file. After ingestion, the user can simply move to the chat interface to ask queries.

With this, we have completed building the front-end for our application. Now we need to test the application with some documents.

Testing and Deployment

Now let’s test the application on some manuals or documents. To use the application, we need to run both the back-end app and the Reflex app separately. Run the back-end app from its directory using the following command:

python app.py

Wait for the FastAPI app to start running. Then, in another terminal instance, run the front-end app using the following command:

reflex run

Once both apps are up and running, go to http://localhost:3000 (Reflex’s default front-end port) to access the Reflex app. Initially, we land on the File Upload page. Upload a file and press the Upload button.


The file will be uploaded and ingested. This will take a while depending on the document size and the device specs. Once it’s done, click the ‘Continue to Chat’ button to move to the chat interface. Write your query and press Send.

Conclusion

In this two part series, you’ve now built a complete and functional RAG application on a Raspberry Pi, from creating the core pipeline to wrapping it with a FastAPI back-end and developing a Reflex-based front-end. With these tools, your RAG pipeline is accessible and interactive, providing real-time query processing through a user-friendly web interface. By mastering these steps, you’ve gained valuable experience in building and deploying end-to-end applications on a compact, efficient platform. This setup opens the door to countless possibilities for deploying AI-driven applications on resource-constrained devices like the Raspberry Pi, making cutting-edge technology more accessible and practical for everyday use.

Key Takeaways

  • A detailed guide is provided on setting up the development environment, including installing necessary dependencies and models using Ollama, ensuring the application is ready for the final build.
  • The article explains how to wrap the RAG pipeline in a FastAPI application, including setting up endpoints for querying the model and ingesting documents, making the pipeline accessible via a web API.
  • The front-end of the RAG application is built using Reflex, a Python-only front-end library. The article demonstrates how to modify the chat application template to create a user-friendly interface for interacting with the RAG pipeline.
  • The article guides on integrating the FastAPI backend with the Reflex front-end and deploying the complete application on a Raspberry Pi, ensuring seamless operation and user accessibility.
  • Practical steps are provided for testing both the ingestion and query endpoints using tools like Postman or Thunder Client, along with running and testing the Reflex front-end to ensure the entire application functions as expected.

Frequently Asked Questions

Q1: How can I make the app accessible to myself from anywhere in the world without compromising security?

A. There is a platform named Tailscale that connects your devices to a private, secure network accessible only to you. You can add your Raspberry Pi and other devices to your Tailscale network and connect to the VPN to access your apps from anywhere in the world.

Q2: My application is very slow in terms of ingestion and QnA.

A. That is a constraint of the Raspberry Pi’s low hardware specifications. This article is just a heads-up tutorial on how to start building a RAG app using a Raspberry Pi and Ollama.



