13 min read

How to Build a Complete RAG Agent with Mastra and AI SDK

A practical guide to building a production RAG system with internal knowledge search, external fallback, conversation history, and streaming responses. Featuring a real implementation: AI chat for my portfolio.

Tutorials

/images/blogs/build_complete_rag_agent.png

TL;DR: Build a complete RAG agent that searches your internal docs first, falls back to web search when needed, maintains conversation history across sessions, streams responses in real-time, and orchestrates multiple tools intelligently. I built this for my portfolio and it’s running on this site right now.


Who this is for

This guide is for developers who want to build complete RAG systems, not just proof-of-concepts.

You’ll see how to integrate:

  • Internal knowledge search (vector database)
  • External knowledge fallback (web search)
  • Persistent conversation threads
  • Streaming responses with refresh tolerance
  • Multi-tool orchestration

If you’re building chatbots that need to answer from your own docs while also handling general questions, this is for you.

Not for: Simple Q&A demos or single-document retrieval.


What we’re building

I built an AI chat for my portfolio website that:

  • Knows my content: Searches through my blog posts, projects, and experience
  • Handles general questions: Falls back to web search via Perplexity
  • Remembers conversations: Thread-based history, not ephemeral
  • Streams responses: Real-time typing effect with database persistence
  • Orchestrates tools: Internal search → External search → Custom tools

Try it live: Look for the chat bubble in the bottom-right corner of this website. Click it to ask questions about my projects, blog posts, or experience.

Full source code: GitHub Repository

This is a real production system, not a tutorial toy. It’s running on this site right now, handling real queries. The patterns here scale to any domain - customer support, internal knowledge bases, documentation assistants, etc.


The complete architecture

Here’s how all the pieces fit together:

RAG Agent Architecture

Tech stack:

  • Mastra 0.1.x: Agent framework with tool orchestration
  • Vercel AI SDK 5.0: Streaming and tool calling
  • PostgreSQL + PgVector: Vector store for embeddings
  • OpenRouter: Unified LLM API (Claude, GPT, etc.)
  • Perplexity Sonar Pro: External knowledge via web search
  • tRPC or Next.js API Routes: Type-safe API layer

Why this stack:

  • Mastra handles agent logic and memory
  • AI SDK handles streaming elegantly
  • PgVector gives you vector search in Postgres (no separate vector DB)
  • OpenRouter lets you switch models without code changes
  • Perplexity provides quality web search with citations
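
Before wiring these together, it helps to fail fast when a key is missing. Here is a minimal environment check, a sketch assuming a hypothetical env.ts module; the variable names match the services above:

// env.ts (hypothetical helper) - fail fast if a required key is missing
const required = [
  "DATABASE_URL", // PostgreSQL + PgVector
  "OPENROUTER_API_KEY", // LLM calls routed through OpenRouter (incl. Perplexity)
  "OPENAI_API_KEY", // embeddings for ingestion and retrieval
] as const;

for (const key of required) {
  if (!process.env[key]) {
    throw new Error(`Missing environment variable: ${key}`);
  }
}

export const env = {
  databaseUrl: process.env.DATABASE_URL!,
  openrouterApiKey: process.env.OPENROUTER_API_KEY!,
  openaiApiKey: process.env.OPENAI_API_KEY!,
};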

Setting up internal knowledge (RAG)

The database schema

First, create your vector storage table:

// schema.ts
import {
  pgTable,
  text,
  vector,
  uuid,
  jsonb,
  timestamp,
} from "drizzle-orm/pg-core";

export const knowledgeVectors = pgTable("knowledge_vectors", {
  id: uuid("id").primaryKey().defaultRandom(),
  embedding: vector("embedding", { dimensions: 1536 }), // OpenAI embeddings
  content: text("content").notNull(),
  metadata: jsonb("metadata").$type<{
    source: string; // e.g., "blog/why-i-love-mastra.md"
    category: string; // e.g., "blog" | "project" | "experience"
    title: string;
    url: string;
    pathPrefix?: string; // For multi-tenant filtering
  }>(),
  createdAt: timestamp("created_at").defaultNow(),
});

// Create the vector index
// Run this SQL migration:
// CREATE INDEX ON knowledge_vectors USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

Document ingestion

Ingest your content (blog posts, projects, etc.):

// ingest.ts
import { openai } from "@ai-sdk/openai";
import { embed } from "ai";
import { db } from "./db";
import { knowledgeVectors } from "./schema";

async function ingestDocument(
  content: string,
  metadata: { source: string; category: string; title: string; url: string }
) {
  // 1. Chunk the content
  const chunks = chunkText(content, { maxSize: 512, overlap: 50 });

  // 2. Generate embeddings
  const embeddings = await Promise.all(
    chunks.map((chunk) =>
      embed({
        model: openai.embedding("text-embedding-3-small"),
        value: chunk,
      })
    )
  );

  // 3. Store in database
  await db.insert(knowledgeVectors).values(
    chunks.map((chunk, i) => ({
      content: chunk,
      embedding: embeddings[i].embedding,
      metadata,
    }))
  );
}

function chunkText(
  text: string,
  options: { maxSize: number; overlap: number }
) {
  const chunks: string[] = [];
  const sentences = text.split(/[.!?]+/).map((s) => s.trim()).filter(Boolean);
  let currentChunk = "";

  for (const sentence of sentences) {
    if ((currentChunk + sentence).length > options.maxSize) {
      if (currentChunk) chunks.push(currentChunk.trim());
      // Carry over the tail of the previous chunk so context overlaps across boundaries
      currentChunk = currentChunk.slice(-options.overlap) + sentence + ". ";
    } else {
      currentChunk += sentence + ". ";
    }
  }

  if (currentChunk) chunks.push(currentChunk.trim());
  return chunks;
}

// Example: Ingest a blog post
await ingestDocument(blogPostContent, {
  source: "blog/why-i-love-mastra.md",
  category: "blog",
  title: "Why I Love Mastra",
  url: "/blog/why-i-love-mastra",
});

Live implementation: Ingestion script

Pro tip: I also generate synthetic content for the homepage that emphasizes availability for hire, key skills, and contact information. This ensures critical info is always findable even if it’s not in a specific blog post.
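
Here's a minimal sketch of that trick, reusing the ingestDocument helper above; the summary text and metadata values are illustrative, not my actual copy:

// ingest-synthetic.ts (sketch) - hand-written summaries go through the same pipeline
const homepageSummary = `
Full-stack developer available for freelance and contract work.
Core skills: TypeScript, Next.js, PostgreSQL, and RAG systems built with Mastra.
Contact: use the form on the homepage or the links in the footer.
`;

await ingestDocument(homepageSummary, {
  source: "synthetic/homepage.md",
  category: "experience",
  title: "About & Availability",
  url: "/",
});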

The internal search tool

Create a Mastra tool for searching your internal knowledge:

// tools/internal-search.ts
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
import { createOpenAI } from "@ai-sdk/openai";
import { embed } from "ai";
import { PgVector } from "@mastra/pg";

// Use OpenAI SDK with OpenRouter endpoint
const openrouter = createOpenAI({
  apiKey: process.env.OPENROUTER_API_KEY,
  baseURL: "https://openrouter.ai/api/v1",
});

export const internalKnowledgeSearchTool = createTool({
  id: "internal_knowledge_search",
  description:
    "Search internal knowledge base. Use for questions about projects, blog posts, experience, or skills.",
  inputSchema: z.object({
    queryText: z.string().describe("The search query"),
    topK: z
      .number()
      .optional()
      .default(10)
      .describe("Number of results to return"),
  }),
  outputSchema: z.object({
    relevantContext: z.array(
      z.object({
        text: z.string(),
        source: z.string(),
        category: z.string(),
        pathPrefix: z.string().optional(),
        score: z.number(),
      })
    ),
    sources: z.array(
      z.object({
        id: z.string(),
        score: z.number(),
        metadata: z.record(z.string(), z.any()),
      })
    ),
  }),
  execute: async ({ context, runtimeContext }) => {
    const { queryText, topK } = context;

    // Read pathPrefix from RuntimeContext for multi-tenancy
    const pathPrefix = runtimeContext?.get("pathPrefix") as string | undefined;

    // Generate the query embedding (must use the same model as ingestion: text-embedding-3-small)
    const { embedding } = await embed({
      model: openrouter.embedding("text-embedding-3-small"),
      value: queryText,
    });

    // Initialize PgVector
    const pgVector = new PgVector({
      connectionString: process.env.DATABASE_URL!,
    });

    // Build filter for multi-tenancy
    const filter = pathPrefix ? { pathPrefix } : undefined;

    // Query vector store
    const results = await pgVector.query({
      indexName: "portfolio_knowledge",
      queryVector: embedding,
      topK: topK || 10,
      filter,
    });

    // Format results
    const relevantContext = results.map((result) => ({
      text: (result.metadata?.text as string) || "",
      source: (result.metadata?.source as string) || "unknown",
      category: (result.metadata?.category as string) || "",
      pathPrefix: result.metadata?.pathPrefix as string | undefined,
      score: result.score,
    }));

    const sources = results.map((result) => ({
      id: result.id,
      score: result.score,
      metadata: result.metadata || {},
    }));

    return {
      relevantContext,
      sources,
    };
  },
});

Live implementation: Internal search tool


Adding external knowledge fallback

Your agent shouldn’t be limited to only what’s in your docs. Add Perplexity for web search:

// tools/external-search.ts
import { createTool } from "@mastra/core";
import { z } from "zod";
import { createOpenRouter } from "@openrouter/ai-sdk-provider";
import { generateText } from "ai";

const openrouter = createOpenRouter({
  apiKey: process.env.OPENROUTER_API_KEY!,
});

export const externalSearchTool = createTool({
  id: "external_search",
  description: "Search the web for information not in internal knowledge base",
  inputSchema: z.object({
    query: z.string().describe("The search query"),
  }),
  outputSchema: z.object({
    answer: z.string(),
    sources: z.array(z.string()),
  }),
  execute: async ({ context }) => {
    // Use Perplexity Sonar Pro for web search
    const result = await generateText({
      model: openrouter("perplexity/sonar-pro"),
      prompt: context.query,
    });

    // Extract citations from response
    const sources = extractUrls(result.text);

    return {
      answer: result.text,
      sources,
    };
  },
});

function extractUrls(text: string): string[] {
  const urlRegex = /https?:\/\/[^\s\)]+/g;
  return [...new Set(text.match(urlRegex) || [])];
}

The decision logic

Critical: Your agent must search internal knowledge first, only falling back to external when needed.

This prevents hallucinations and ensures your own content is prioritized.

// agent-instructions.ts
export const AGENT_INSTRUCTIONS = `
You are a portfolio assistant with access to internal knowledge and web search.

CRITICAL RULES:
1. ALWAYS use internal_knowledge_search FIRST for every query
2. ONLY use external_search if internal_knowledge_search returns NO results or insufficient results
3. NEVER use both tools for the same query
4. After calling a tool, generate a natural response in the user's language

WORKFLOW:
- User asks about projects/blog/experience → internal_knowledge_search → Answer from results
- User asks general question → internal_knowledge_search → If empty → external_search → Answer
- User asks recent/external info → internal_knowledge_search → If empty → external_search → Answer

When answering:
- Cite your sources (blog post title, project name, URL)
- If from web search, mention "According to [source]..."
- Be concise but informative
- Use the user's language (match their tone and language)
`;

Conversation memory and threads

Database schema for threads

Users expect conversations to persist. Don’t reset on every page refresh.

// schema.ts - Add these tables
// (also add `boolean` and `integer` to the drizzle-orm/pg-core import above)
export const conversationThreads = pgTable("conversation_threads", {
  id: uuid("id").primaryKey().defaultRandom(),
  userId: text("user_id").notNull(), // From your auth system
  title: text("title"), // Auto-generated from first message
  status: text("status").$type<"active" | "archived">().default("active"),
  createdAt: timestamp("created_at").defaultNow(),
  updatedAt: timestamp("updated_at").defaultNow(),
});

export const conversationMessages = pgTable("conversation_messages", {
  id: uuid("id").primaryKey().defaultRandom(),
  threadId: uuid("thread_id").references(() => conversationThreads.id),
  role: text("role").$type<"user" | "assistant" | "system">().notNull(),
  content: text("content").notNull(),
  isComplete: boolean("is_complete").default(true), // For streaming state
  toolCalls: jsonb("tool_calls").$type<
    Array<{
      toolName: string;
      args: any;
      result?: any;
    }>
  >(),
  createdAt: timestamp("created_at").defaultNow(),
});

// Optional: For refresh-tolerant streaming
export const messageChunks = pgTable("message_chunks", {
  id: uuid("id").primaryKey().defaultRandom(),
  messageId: uuid("message_id").references(() => conversationMessages.id),
  content: text("content").notNull(),
  sequenceNumber: integer("sequence_number").notNull(),
  createdAt: timestamp("created_at").defaultNow(),
});

Creating and loading threads

// api/threads.ts
import { db } from "./db";
import { conversationThreads, conversationMessages } from "./schema";
import { eq, desc, asc } from "drizzle-orm";

export async function createThread(userId: string) {
  const [thread] = await db
    .insert(conversationThreads)
    .values({ userId })
    .returning();

  return thread;
}

export async function getThreads(userId: string) {
  return db.query.conversationThreads.findMany({
    where: eq(conversationThreads.userId, userId),
    orderBy: desc(conversationThreads.updatedAt),
  });
}

export async function getMessages(threadId: string) {
  return db.query.conversationMessages.findMany({
    where: eq(conversationMessages.threadId, threadId),
    orderBy: asc(conversationMessages.createdAt),
  });
}

export async function saveMessage(
  threadId: string,
  role: "user" | "assistant" | "system",
  content: string
) {
  const [message] = await db
    .insert(conversationMessages)
    .values({ threadId, role, content })
    .returning();

  // Update thread timestamp
  await db
    .update(conversationThreads)
    .set({ updatedAt: new Date() })
    .where(eq(conversationThreads.id, threadId));

  return message;
}
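
These helpers plug into whatever API layer you prefer. Here's a minimal sketch of a Next.js App Router handler for listing and creating threads; the import paths and the getUserId helper are assumptions, so swap in your own auth:

// app/api/threads/route.ts (sketch)
import { NextResponse } from "next/server";
import { createThread, getThreads } from "@/server/threads"; // assumed path to the helpers above
import { getUserId } from "@/server/auth"; // hypothetical: resolve the current user from the request

export async function GET(request: Request) {
  const userId = await getUserId(request);
  return NextResponse.json(await getThreads(userId));
}

export async function POST(request: Request) {
  const userId = await getUserId(request);
  const thread = await createThread(userId);
  return NextResponse.json(thread, { status: 201 });
}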

Streaming responses

Why database-backed streaming?

Traditional streaming sends chunks over HTTP. If the user refreshes, the stream is lost.

Database-backed streaming persists chunks as they’re generated. Benefits:

  • User can refresh mid-stream and see partial response
  • Multiple users can watch the same stream (collaborative)
  • Stream continues even if frontend disconnects

Implementation

// api/chat.ts
import { streamText } from "ai";
import { openrouter } from "@openrouter/ai-sdk-provider";
import { asc, eq, sql } from "drizzle-orm";
import { db } from "./db";
import { conversationMessages, messageChunks } from "./schema";
import { saveMessage } from "./threads";
import { internalKnowledgeSearchTool } from "./tools/internal-search";
import { externalSearchTool } from "./tools/external-search";

export async function streamChatResponse(
  threadId: string,
  messages: Array<{ role: string; content: string }>
) {
  // Create assistant message shell
  const assistantMessage = await saveMessage(threadId, "assistant", "");
  await db
    .update(conversationMessages)
    .set({ isComplete: false })
    .where(eq(conversationMessages.id, assistantMessage.id));

  // Stream from LLM
  const result = await streamText({
    model: openrouter("anthropic/claude-3-haiku"),
    messages,
    tools: {
      internal_knowledge_search: internalKnowledgeSearchTool,
      external_search: externalSearchTool,
    },
  });

  // Buffer for chunking
  let buffer = "";
  let sequenceNumber = 0;
  const FLUSH_INTERVAL = 200; // ms
  let lastFlush = Date.now();

  // Process stream
  for await (const chunk of result.fullStream) {
    if (chunk.type === "text-delta") {
      buffer += chunk.textDelta;

      // Flush buffer periodically
      if (
        buffer.length >= 40 ||
        (buffer.length >= 20 && Date.now() - lastFlush >= FLUSH_INTERVAL)
      ) {
        await db.insert(messageChunks).values({
          messageId: assistantMessage.id,
          content: buffer,
          sequenceNumber: sequenceNumber++,
        });

        buffer = "";
        lastFlush = Date.now();
      }
    } else if (chunk.type === "tool-call") {
      // Store tool calls in message metadata
      await db
        .update(conversationMessages)
        .set({
          toolCalls: sql`COALESCE(tool_calls, '[]'::jsonb) || ${JSON.stringify({
            toolName: chunk.toolName,
            args: chunk.args,
          })}::jsonb`,
        })
        .where(eq(conversationMessages.id, assistantMessage.id));
    }
  }

  // Final flush
  if (buffer.length > 0) {
    await db.insert(messageChunks).values({
      messageId: assistantMessage.id,
      content: buffer,
      sequenceNumber: sequenceNumber++,
    });
  }

  // Mark complete
  const fullContent = await reconstructMessage(assistantMessage.id);
  await db
    .update(conversationMessages)
    .set({ content: fullContent, isComplete: true })
    .where(eq(conversationMessages.id, assistantMessage.id));

  return assistantMessage;
}

async function reconstructMessage(messageId: string) {
  const chunks = await db.query.messageChunks.findMany({
    where: eq(messageChunks.messageId, messageId),
    orderBy: asc(messageChunks.sequenceNumber),
  });

  return chunks.map((c) => c.content).join("");
}
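
The read side is what makes this refresh-tolerant: on reload, the client fetches each message plus whatever chunks exist so far and keeps polling until isComplete flips. A minimal sketch of that endpoint, with the route path and import aliases as assumptions:

// app/api/threads/[threadId]/messages/route.ts (sketch) - the read side
import { NextResponse } from "next/server";
import { asc, eq } from "drizzle-orm";
import { db } from "@/server/db";
import { conversationMessages, messageChunks } from "@/server/schema";

export async function GET(
  _request: Request,
  { params }: { params: { threadId: string } }
) {
  const messages = await db.query.conversationMessages.findMany({
    where: eq(conversationMessages.threadId, params.threadId),
    orderBy: asc(conversationMessages.createdAt),
  });

  // For messages still streaming, reconstruct the partial text from chunks
  const withPartials = await Promise.all(
    messages.map(async (message) => {
      if (message.isComplete) return message;
      const chunks = await db.query.messageChunks.findMany({
        where: eq(messageChunks.messageId, message.id),
        orderBy: asc(messageChunks.sequenceNumber),
      });
      return { ...message, content: chunks.map((c) => c.content).join("") };
    })
  );

  return NextResponse.json(withPartials);
}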

Tool orchestration

Building the agent

Now combine everything into a Mastra agent:

// agent.ts
import { Agent } from "@mastra/core";
import { PostgresStore } from "@mastra/postgres"; // backs Mastra's memory with Postgres if you enable it
import { AGENT_INSTRUCTIONS } from "./agent-instructions";
import { internalKnowledgeSearchTool } from "./tools/internal-search";
import { externalSearchTool } from "./tools/external-search";

export const knowledgeAgent = new Agent({
  name: "knowledge-agent",
  instructions: AGENT_INSTRUCTIONS,
  model: {
    provider: "openrouter",
    name: "anthropic/claude-3-haiku",
    toolChoice: "auto",
  },
  tools: {
    internal_knowledge_search: internalKnowledgeSearchTool,
    external_search: externalSearchTool,
  },
});

Live implementation: Knowledge agent
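
Before wiring the agent into an API route, a quick smoke test from a script is worth it; a minimal sketch, assuming agent.generate from Mastra's Agent class (check the return shape for your version):

// scripts/smoke-test.ts (sketch)
import { knowledgeAgent } from "../agent";

const result = await knowledgeAgent.generate(
  "What projects on this portfolio use Mastra?"
);

// The agent should have called internal_knowledge_search before answering
console.log(result.text);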

Runtime context (optional)

If you need multi-tenancy (e.g., different knowledge bases per user/organization):

// agent-with-context.ts
import { Agent } from "@mastra/core";
import { RuntimeContext } from "@mastra/core/runtime-context";
import { internalKnowledgeSearchTool } from "./tools/internal-search";
import { externalSearchTool } from "./tools/external-search";

export const knowledgeAgent = new Agent({
  name: "knowledge-agent",
  instructions: async ({ runtimeContext }) => {
    const pathPrefix = runtimeContext.get("pathPrefix");

    return `
    You are a portfolio assistant.
    ${
      pathPrefix ? `You are searching only documents from "${pathPrefix}".` : ""
    }

    CRITICAL RULES:
    1. ALWAYS use internal_knowledge_search FIRST
    2. ONLY use external_search if internal returns NO results
    ...
    `;
  },
  model: {
    provider: "openrouter",
    name: "anthropic/claude-3-haiku",
  },
  tools: {
    internal_knowledge_search: internalKnowledgeSearchTool,
    external_search: externalSearchTool,
  },
});

// Usage with runtime context
const runtimeContext = new RuntimeContext();
runtimeContext.set("pathPrefix", "portfolio");

const response = await knowledgeAgent.generate(messages, {
  runtimeContext,
});

Building the frontend

AI elements

The frontend uses AI Elements - pre-built React components from Vercel specifically designed for AI chat interfaces. These components handle common patterns like:

  • Message rendering with streaming support
  • Markdown formatting with code highlighting
  • Auto-scrolling conversation containers
  • Input components with file attachments
  • Loading states and indicators

AI Elements integrates seamlessly with the AI SDK’s useChat hook and provides a polished UI out of the box. Instead of building chat components from scratch, you get production-ready components that handle edge cases like scroll anchoring, streaming text display, and accessibility.
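
If you want to lean on AI Elements for rendering, a minimal sketch looks roughly like this; the component names and paths follow what the AI Elements CLI copies into your project, so treat them as assumptions and adjust to your setup:

// components/chat-messages.tsx (sketch) - rendering with AI Elements
"use client";

import { Conversation, ConversationContent } from "@/components/ai-elements/conversation";
import { Message, MessageContent } from "@/components/ai-elements/message";
import { Response } from "@/components/ai-elements/response";

type ChatMessage = { id: string; role: "user" | "assistant"; content: string };

export function ChatMessages({ messages }: { messages: ChatMessage[] }) {
  return (
    <Conversation>
      <ConversationContent>
        {messages.map((message) => (
          <Message from={message.role} key={message.id}>
            <MessageContent>
              {/* Response renders streamed markdown with code highlighting */}
              <Response>{message.content}</Response>
            </MessageContent>
          </Message>
        ))}
      </ConversationContent>
    </Conversation>
  );
}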

Bonus: my implementation includes custom enhancements like source extraction, contextual suggestions, and tool-aware loading states built on top of AI Elements.

Chat component

// components/chat.tsx
"use client";

import { useState } from "react";
import { useQuery, useMutation } from "@tanstack/react-query";

export function Chat({ threadId }: { threadId?: string }) {
  const [input, setInput] = useState("");

  // Load messages
  const { data: messages, refetch } = useQuery({
    queryKey: ["messages", threadId],
    queryFn: () =>
      fetch(`/api/threads/${threadId}/messages`).then((r) => r.json()),
    enabled: !!threadId,
    refetchInterval: 500, // Poll during streaming
  });

  // Send message
  const sendMessage = useMutation({
    mutationFn: async (content: string) => {
      const response = await fetch(`/api/threads/${threadId}/messages`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ content }),
      });
      return response.json();
    },
    onSuccess: () => {
      refetch();
      setInput("");
    },
  });

  const isStreaming = messages?.some((m: any) => !m.isComplete);

  return (
    <div className="flex flex-col h-screen">
      {/* Messages */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages?.map((message: any) => (
          <div
            key={message.id}
            className={`flex ${
              message.role === "user" ? "justify-end" : "justify-start"
            }`}
          >
            <div
              className={`max-w-2xl rounded-lg p-4 ${
                message.role === "user"
                  ? "bg-blue-500 text-white"
                  : "bg-gray-100 text-gray-900"
              }`}
            >
              {message.content}
              {!message.isComplete && (
                <span className="animate-pulse ml-1">▋</span>
              )}
            </div>
          </div>
        ))}
      </div>

      {/* Input */}
      <div className="border-t p-4">
        <form
          onSubmit={(e) => {
            e.preventDefault();
            if (input.trim()) sendMessage.mutate(input);
          }}
          className="flex gap-2"
        >
          <input
            type="text"
            value={input}
            onChange={(e) => setInput(e.target.value)}
            disabled={isStreaming}
            placeholder="Ask me anything..."
            className="flex-1 border rounded-lg px-4 py-2"
          />
          <button
            type="submit"
            disabled={isStreaming || !input.trim()}
            className="bg-blue-500 text-white px-6 py-2 rounded-lg disabled:opacity-50"
          >
            Send
          </button>
        </form>
      </div>
    </div>
  );
}

Thread sidebar

// components/thread-sidebar.tsx
"use client";

import { useQuery } from "@tanstack/react-query";
import { useRouter } from "next/navigation";

export function ThreadSidebar({
  currentThreadId,
}: {
  currentThreadId?: string;
}) {
  const router = useRouter();

  const { data: threads } = useQuery({
    queryKey: ["threads"],
    queryFn: () => fetch("/api/threads").then((r) => r.json()),
  });

  return (
    <aside className="w-64 border-r bg-gray-50 p-4">
      <button
        onClick={() => router.push("/chat")}
        className="w-full bg-blue-500 text-white rounded-lg py-2 mb-4"
      >
        New Chat
      </button>

      <div className="space-y-2">
        {threads?.map((thread: any) => (
          <button
            key={thread.id}
            onClick={() => router.push(`/chat?threadId=${thread.id}`)}
            className={`w-full text-left p-3 rounded-lg hover:bg-gray-200 ${
              thread.id === currentThreadId ? "bg-gray-200" : ""
            }`}
          >
            <div className="font-medium truncate">
              {thread.title || "New Chat"}
            </div>
            <div className="text-xs text-gray-500">
              {new Date(thread.updatedAt).toLocaleDateString()}
            </div>
          </button>
        ))}
      </div>
    </aside>
  );
}

What I learned

1. Internal-first prevents hallucinations

Always search your own content before hitting the web. This prevents outdated information, contradictions with your documented positions, and missed opportunities to cite your own content.

Pattern: internal_knowledge_search → check results → if empty → external_search

2. Tool orchestration requires clear decision logic

Don’t leave tool selection to chance. Explicit instructions prevent using both internal and external search for the same query (wasteful), skipping internal knowledge search (defeats RAG purpose), and infinite tool loops.

Solution: Write explicit rules in agent instructions, validate in testing.

3. Rate limiting and observability are essential

Production RAG systems need:

  • Rate limiting: 10 requests per 10 seconds per user with Upstash Redis (sketch below)
  • Cost tracking: embedding API costs add up
  • Tool analytics: which tools are called and their success rates
  • Error alerting: failed embeddings, search errors
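
For the rate limiting piece, here's a minimal sketch with @upstash/ratelimit, using a sliding window of 10 requests per 10 seconds keyed by user; call it at the top of the chat handler:

// lib/rate-limit.ts (sketch)
import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis";

const ratelimit = new Ratelimit({
  redis: Redis.fromEnv(), // expects UPSTASH_REDIS_REST_URL / UPSTASH_REDIS_REST_TOKEN
  limiter: Ratelimit.slidingWindow(10, "10 s"),
  prefix: "rag-chat",
});

export async function assertWithinRateLimit(userId: string) {
  const { success } = await ratelimit.limit(userId);
  if (!success) {
    throw new Error("Rate limit exceeded. Try again in a few seconds.");
  }
}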


Try it yourself

The chat widget is embedded on this site. Click the chat bubble in the bottom-right corner to try it live. Ask about:

  • “What projects have you built with AI?”
  • “Tell me about your blog on Mastra”
  • “What’s your experience with TypeScript?”
  • “Where have you traveled?”

Watch how it searches internal content first, shows sources, and generates contextual follow-up suggestions.


Source code

Full implementation: GitHub Repository



Questions? Try the chat widget on this site or find me on Twitter/X.
