←back to Blog

Building a GPU-Accelerated Ollama LangChain Workflow with RAG Agents, Multi-Session Chat Performance Monitoring

«`html

Building a GPU-Accelerated Ollama LangChain Workflow with RAG Agents and Multi-Session Chat Performance Monitoring

This tutorial outlines the process of creating a GPU-capable local LLM stack that integrates Ollama and LangChain. The workflow includes installing necessary libraries, launching the Ollama server, pulling a model, and wrapping it in a custom LangChain LLM. This setup allows for control over parameters such as temperature, token limits, and context. Additionally, we introduce a Retrieval-Augmented Generation (RAG) layer that processes PDFs or text, chunks them, embeds them using Sentence-Transformers, and provides grounded answers. We also manage multi-session chat memory, register tools (web search and RAG query), and deploy an agent that determines when to utilize these tools.

Target Audience Analysis

The target audience for this tutorial includes:

  • Data scientists and AI engineers interested in implementing advanced AI workflows.
  • Business managers looking to leverage AI for enhanced decision-making and operational efficiency.
  • Developers seeking to integrate AI capabilities into existing applications.

Pain Points:

  • Difficulty in managing and deploying AI models efficiently.
  • Challenges in integrating multiple AI components into a cohesive workflow.
  • Need for real-time performance monitoring and optimization of AI systems.

Goals:

  • To create a robust AI system that can handle complex queries and provide accurate responses.
  • To streamline the process of integrating various AI tools and libraries.
  • To enhance user experience through efficient chat management and retrieval systems.

Interests:

  • Latest advancements in AI and machine learning technologies.
  • Practical applications of AI in business and technology.
  • Tools and frameworks that facilitate AI development.

Communication Preferences:

  • Technical documentation with clear instructions and examples.
  • Visual aids such as diagrams and flowcharts to illustrate complex concepts.
  • Interactive tutorials that allow for hands-on experimentation.

Installation and Setup

We begin by installing the required packages for the Colab environment:

import os
import sys
import subprocess
import time
import threading
import queue
import json
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from contextlib import contextmanager
import asyncio
from concurrent.futures import ThreadPoolExecutor

def install_packages():
    """Install required packages for Colab environment"""
    packages = [
        "langchain",
        "langchain-community",
        "langchain-core",
        "chromadb",
        "sentence-transformers",
        "faiss-cpu",
        "pypdf",
        "python-docx",
        "requests",
        "psutil",
        "pyngrok",
        "gradio"
    ]
   
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

install_packages()

Next, we define the configuration for the Ollama setup:

@dataclass
class OllamaConfig:
    """Configuration for Ollama setup"""
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    max_tokens: int = 2048
    temperature: float = 0.7
    gpu_layers: int = -1  
    context_window: int = 4096
    batch_size: int = 512
    threads: int = 4

This configuration allows us to manage runtime settings effectively, including model name, API endpoint, and generation behavior.

Ollama Manager

The OllamaManager class is responsible for installing, starting, and managing the Ollama server:

class OllamaManager:
    """Advanced Ollama manager for Colab environment"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.process = None
        self.is_running = False
        self.models_cache = {}
        self.performance_monitor = PerformanceMonitor()
       
    def install_ollama(self):
        """Install Ollama in Colab environment"""
        try:
            subprocess.run([
                "curl", "-fsSL", "https://ollama.com/install.sh", "-o", "/tmp/install.sh"
            ], check=True)
           
            subprocess.run(["bash", "/tmp/install.sh"], check=True)
            print("Ollama installed successfully")
           
        except subprocess.CalledProcessError as e:
            print(f"Failed to install Ollama: {e}")
            raise
   
    def start_server(self):
        """Start Ollama server with GPU support"""
        if self.is_running:
            print("Ollama server is already running")
            return
           
        try:
            env = os.environ.copy()
            env["OLLAMA_NUM_PARALLEL"] = str(self.config.threads)
            env["OLLAMA_MAX_LOADED_MODELS"] = "3"
           
            self.process = subprocess.Popen(
                ["ollama", "serve"],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
           
            time.sleep(5)
           
            if self.health_check():
                self.is_running = True
                print("Ollama server started successfully")
                self.performance_monitor.start()
            else:
                raise Exception("Server failed to start properly")
               
        except Exception as e:
            print(f"Failed to start Ollama server: {e}")
            raise

This class also includes methods for health checks, pulling models, and listing available local models.

Performance Monitoring

The PerformanceMonitor class tracks CPU, memory, and inference times while the Ollama server is running:

class PerformanceMonitor:
    """Monitor system performance and resource usage"""
   
    def __init__(self):
        self.monitoring = False
        self.stats = {
            "cpu_usage": [],
            "memory_usage": [],
            "gpu_usage": [],
            "inference_times": []
        }
        self.monitor_thread = None
   
    def start(self):
        """Start performance monitoring"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
   
    def stop(self):
        """Stop performance monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
   
    def _monitor_loop(self):
        """Main monitoring loop"""
        while self.monitoring:
            try:
                cpu_percent = psutil.cpu_percent(interval=1)
                memory = psutil.virtual_memory()
               
                self.stats["cpu_usage"].append(cpu_percent)
                self.stats["memory_usage"].append(memory.percent)
               
                for key in ["cpu_usage", "memory_usage"]:
                    if len(self.stats[key]) > 100:
                        self.stats[key] = self.stats[key][-100:]
               
                time.sleep(5)
               
            except Exception as e:
                print(f"Monitoring error: {e}")
   
    def get_stats(self) -> Dict[str, Any]:
        """Get current performance statistics"""
        return {
            "avg_cpu": sum(self.stats["cpu_usage"][-10:]) / max(len(self.stats["cpu_usage"][-10:]), 1),
            "avg_memory": sum(self.stats["memory_usage"][-10:]) / max(len(self.stats["memory_usage"][-10:]), 1),
            "total_inferences": len(self.stats["inference_times"]),
            "avg_inference_time": sum(self.stats["inference_times"]) / max(len(self.stats["inference_times"]), 1)
        }

This monitoring system allows for real-time tracking of resource usage, which is crucial for optimizing performance during model inference.

Retrieval-Augmented Generation System

The RAGSystem class integrates the LLM with a retrieval mechanism:

class RAGSystem:
    """Retrieval-Augmented Generation system"""
   
    def __init__(self, llm: OllamaLLM, embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.llm = llm
        self.embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        self.vector_store = None
        self.qa_chain = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )
   
    def add_documents(self, file_paths: List[str]):
        """Add documents to the vector store"""
        documents = []
       
        for file_path in file_paths:
            try:
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                else:
                    loader = TextLoader(file_path)
               
                docs = loader.load()
                documents.extend(docs)
               
            except Exception as e:
                print(f"Error loading {file_path}: {e}")
       
        if documents:
            splits = self.text_splitter.split_documents(documents)
           
            if self.vector_store is None:
                self.vector_store = FAISS.from_documents(splits, self.embeddings)
            else:
                self.vector_store.add_documents(splits)
           
            self.qa_chain = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=self.vector_store.as_retriever(search_kwargs={"k": 3}),
                return_source_documents=True
            )
           
            print(f"Added {len(splits)} document chunks to vector store")

This class allows for the addition of documents to the vector store and enables querying of those documents using RAG.

Conversation Management

The ConversationManager class manages chat sessions with memory capabilities:

class ConversationManager:
    """Manage conversation history and memory"""
   
    def __init__(self, llm: OllamaLLM, memory_type: str = "buffer"):
        self.llm = llm
        self.conversations = {}
        self.memory_type = memory_type
       
    def get_conversation(self, session_id: str) -> ConversationChain:
        """Get or create conversation for session"""
        if session_id not in self.conversations:
            if self.memory_type == "buffer":
                memory = ConversationBufferWindowMemory(k=10)
            elif self.memory_type == "summary":
                memory = ConversationSummaryBufferMemory(
                    llm=self.llm,
                    max_token_limit=1000
                )
            else:
                memory = ConversationBufferWindowMemory(k=10)
           
            self.conversations[session_id] = ConversationChain(
                llm=self.llm,
                memory=memory,
                verbose=True
            )
       
        return self.conversations[session_id]
   
    def chat(self, session_id: str, message: str) -> str:
        """Chat with specific session"""
        conversation = self.get_conversation(session_id)
        return conversation.predict(input=message)
   
    def clear_session(self, session_id: str):
        """Clear conversation history for session"""
        if session_id in self.conversations:
            del self.conversations[session_id]

This class allows for the management of multiple chat sessions, each with its own memory type, enabling a more personalized user experience.

Conclusion

In summary, this tutorial provides a comprehensive guide to building a GPU-accelerated workflow using Ollama and LangChain. The integration of RAG agents and multi-session chat performance monitoring enhances the capabilities of AI systems, making them more efficient and user-friendly. This modular approach allows for easy adaptation and extension of the system to meet various business needs.

For further exploration, download the complete code and experiment with the setup in your environment.

All credit for this research goes to the researchers of this project. Subscribe now to our AI Newsletter for more insights.

«`