Phorm.ai
run-llama/llama_index

    can i use postgres as docstore? to use the document management functionallity ( upserts, refresh, etc)

    Yes, you can use PostgreSQL as a document store to utilize document management functionality such as upserts, refresh, etc., in the context of LlamaIndex. The PostgresDocumentStore class provides an interface for interacting with a PostgreSQL database as a document store. This class is built on top of the PostgresKVStore class, which is a key-value store implementation using PostgreSQL. The PostgresDocumentStore supports operations like inserting, updating (upserts), and deleting documents, among others.

    To use PostgreSQL as a document store, you can initialize a PostgresDocumentStore instance by providing connection details to your PostgreSQL database. You can do this either by using the from_uri method with a PostgreSQL URI or the from_params method with individual connection parameters such as host, port, database, user, and password.

    Here's an example of how to initialize a PostgresDocumentStore using the from_uri method:

    from llama_index.storage.docstore.postgres import PostgresDocumentStore # Initialize PostgresDocumentStore from a URI docstore = PostgresDocumentStore.from_uri( uri="postgresql://user:password@localhost:5432/mydatabase", table_name="docstore", schema_name="public", perform_setup=True, debug=False, use_jsonb=True )

    And here's an example using the from_params method:

    from llama_index.storage.docstore.postgres import PostgresDocumentStore # Initialize PostgresDocumentStore from connection parameters docstore = PostgresDocumentStore.from_params( host="localhost", port="5432", database="mydatabase", user="user", password="password", table_name="docstore", schema_name="public", perform_setup=True, debug=False, use_jsonb=True )

    Once initialized, you can use the PostgresDocumentStore instance to perform document management operations such as inserting, updating, and deleting documents in your PostgreSQL database.

    For more details on the PostgresDocumentStore and its methods, refer to the PostgresDocumentStore.from_uri and PostgresDocumentStore.from_params documentation.

    Search results
    [run-llama/llama_index] docs/examples/vector_stores/postgres.ipynb
    {
     "cells": [
      {
       "attachments": {},
       "cell_type": "markdown",
       "id": "bccd47fc",
       "metadata": {},
       "source": [
        "<a href=\"https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/vector_stores/postgres.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "db0855d0",
       "metadata": {},
       "source": [
        "# Postgres Vector Store\n",
        "In this notebook we are going to show how to use [Postgresql](https://www.postgresql.org) and  [pgvector](https://github.com/pgvector/pgvector)  to perform vector searches in LlamaIndex"
       ]
      },
      {
       "attachments": {},
       "cell_type": "markdown",
       "id": "e4f33fc9",
       "metadata": {},
       "source": [
        "If you're opening this Notebook on colab, you will probably need to install LlamaIndex 🦙."
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "d2fc9c18",
       "metadata": {},
       "outputs": [],
       "source": [
        "%pip install llama-index-vector-stores-postgres"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "712daea5",
       "metadata": {},
       "outputs": [],
       "source": [
        "!pip install llama-index"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "c2d1c538",
       "metadata": {},
       "outputs": [],
       "source": [
        "# import logging\n",
        "# import sys\n",
        "\n",
        "# Uncomment to see debug logs\n",
        "# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)\n",
        "# logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))\n",
        "\n",
        "from llama_index.core import SimpleDirectoryReader, StorageContext\n",
        "from llama_index.core import VectorStoreIndex\n",
        "from llama_index.vector_stores.postgres import PGVectorStore\n",
        "import textwrap\n",
        "import openai"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "26c71b6d",
       "metadata": {},
       "source": [
        "### Setup OpenAI\n",
        "The first step is to configure the openai key. It will be used to created embeddings for the documents loaded into the index"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "67b86621",
       "metadata": {},
       "outputs": [],
       "source": [
        "import os\n",
        "\n",
        "os.environ[\"OPENAI_API_KEY\"] = \"<your key>\"\n",
        "openai.api_key = os.environ[\"OPENAI_API_KEY\"]"
       ]
      },
      {
       "attachments": {},
       "cell_type": "markdown",
       "id": "eecf4bd5",
       "metadata": {},
       "source": [
        "Download Data"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "6df9fa89",
       "metadata": {},
       "outputs": [],
       "source": [
        "!mkdir -p 'data/paul_graham/'\n",
        "!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt' -O 'data/paul_graham/paul_graham_essay.txt'"
       ]
      },
      {
       "attachments": {},
       "cell_type": "markdown",
       "id": "f7010b1d-d1bb-4f08-9309-a328bb4ea396",
       "metadata": {},
       "source": [
        "### Loading documents\n",
        "Load the documents stored in the `data/paul_graham/` using the SimpleDirectoryReader"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "c154dd4b",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "Document ID: 88efac05-2277-4eda-a94c-c9247c9aca1c\n"
         ]
        }
       ],
       "source": [
        "documents = SimpleDirectoryReader(\"./data/paul_graham\").load_data()\n",
        "print(\"Document ID:\", documents[0].doc_id)"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "7bd24f0a",
       "metadata": {},
       "source": [
        "### Create the Database\n",
        "Using an existing postgres running at localhost, create the database we'll be using."
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "e6d61e73",
       "metadata": {},
       "outputs": [],
       "source": [
        "import psycopg2\n",
        "\n",
        "connection_string = \"postgresql://postgres:password@localhost:5432\"\n",
        "db_name = \"vector_db\"\n",
        "conn = psycopg2.connect(connection_string)\n",
        "conn.autocommit = True\n",
        "\n",
        "with conn.cursor() as c:\n",
        "    c.execute(f\"DROP DATABASE IF EXISTS {db_name}\")\n",
        "    c.execute(f\"CREATE DATABASE {db_name}\")"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "c0232fd1",
       "metadata": {},
       "source": [
        "### Create the index\n",
        "Here we create an index backed by Postgres using the documents loaded previously. PGVectorStore takes a few arguments."
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "8731da62",
       "metadata": {},
       "outputs": [
        {
         "data": {
          "application/vnd.jupyter.widget-view+json": {
           "model_id": "40b4c3becfc64c5184360b8b8e81ca9a",
           "version_major": 2,
           "version_minor": 0
          },
          "text/plain": [
           "Parsing nodes:   0%|          | 0/1 [00:00<?, ?it/s]"
          ]
         },
         "metadata": {},
         "output_type": "display_data"
        },
        {
         "data": {
          "application/vnd.jupyter.widget-view+json": {
           "model_id": "37763ad7b17f4481a7e67df379304d31",
           "version_major": 2,
           "version_minor": 0
          },
          "text/plain": [
           "Generating embeddings:   0%|          | 0/22 [00:00<?, ?it/s]"
          ]
         },
         "metadata": {},
         "output_type": "display_data"
        }
       ],
       "source": [
        "from sqlalchemy import make_url\n",
        "\n",
        "url = make_url(connection_string)\n",
        "vector_store = PGVectorStore.from_params(\n",
        "    database=db_name,\n",
        "    host=url.host,\n",
        "    password=url.password,\n",
        "    port=url.port,\n",
        "    user=url.username,\n",
        "    table_name=\"paul_graham_essay\",\n",
        "    embed_dim=1536,  # openai embedding dimension\n",
        ")\n",
        "\n",
        "storage_context = StorageContext.from_defaults(vector_store=vector_store)\n",
        "index = VectorStoreIndex.from_documents(\n",
        "    documents, storage_context=storage_context, show_progress=True\n",
        ")\n",
        "query_engine = index.as_query_engine()"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "8ee4473a-094f-4d0a-a825-e1213db07240",
       "metadata": {},
       "source": [
        "### Query the index\n",
        "We can now ask questions using our index."
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "0a2bcc07",
       "metadata": {},
       "outputs": [],
       "source": [
        "response = query_engine.query(\"What did the author do?\")"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "8cf55bf7",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "The author worked on writing and programming before college. Initially, the author wrote short\n",
          "stories and later started programming on an IBM 1401 using an early version of Fortran. The author\n",
          "then transitioned to working with microcomputers, building a computer kit and eventually getting a\n",
          "TRS-80 to further explore programming. In college, the author initially planned to study philosophy\n",
          "but switched to studying AI due to a lack of interest in philosophy courses. The author was inspired\n",
          "to work on AI after encountering works like Heinlein's novel \"The Moon is a Harsh Mistress\" and\n",
          "seeing Terry Winograd using SHRDLU in a PBS documentary.\n"
         ]
        }
       ],
       "source": [
        "print(textwrap.fill(str(response), 100))"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "68cbd239-880e-41a3-98d8-dbb3fab55431",
       "metadata": {},
       "outputs": [],
       "source": [
        "response = query_engine.query(\"What happened in the mid 1980s?\")"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "fdf5287f",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "In the mid-1980s, the author spent a significant amount of time working on a book called \"On Lisp\"\n",
          "and had obtained a contract to publish it. They were paid large amounts of money for their work,\n",
          "which allowed them to save enough to go back to RISD (Rhode Island School of Design) and pay off\n",
          "their college loans. They also learned valuable lessons during this time, such as the importance of\n",
          "having technology companies run by product people rather than sales people, the drawbacks of editing\n",
          "code by too many people, and the significance of being the \"entry level\" option in a competitive\n",
          "market.\n"
         ]
        }
       ],
       "source": [
        "print(textwrap.fill(str(response), 100))"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "b3bed9e1",
       "metadata": {},
       "source": [
        "### Querying existing index"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "e6b2634b",
       "metadata": {},
       "outputs": [],
       "source": [
        "vector_store = PGVectorStore.from_params(\n",
        "    database=\"vector_db\",\n",
        "    host=\"localhost\",\n",
        "    password=\"password\",\n",
        "    port=5432,\n",
        "    user=\"postgres\",\n",
        "    table_name=\"paul_graham_essay\",\n",
        "    embed_dim=1536,  # openai embedding dimension\n",
        ")\n",
        "\n",
        "index = VectorStoreIndex.from_vector_store(vector_store=vector_store)\n",
        "query_engine = index.as_query_engine()"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "e7075af3-156e-4bde-8f76-6d9dee86861f",
       "metadata": {},
       "outputs": [],
       "source": [
        "response = query_engine.query(\"What did the author do?\")"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "b088c090",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "The author worked on writing and programming before college. They wrote short stories and tried\n",
          "writing programs on an IBM 1401 computer. They also built a microcomputer and started programming on\n",
          "it, writing simple games and a word processor. In college, the author initially planned to study\n",
          "philosophy but switched to AI due to their interest in intelligent computers. They taught themselves\n",
          "AI by learning Lisp.\n"
         ]
        }
       ],
       "source": [
        "print(textwrap.fill(str(response), 100))"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "55745895-8f01-4275-abaa-b2ebef2cb4c7",
       "metadata": {},
       "source": [
        "### Hybrid Search  "
       ]
      },
      {
       "cell_type": "markdown",
       "id": "91cae40f-3cd4-4403-8af4-aca2705e96a2",
       "metadata": {},
       "source": [
        "To enable hybrid search, you need to:\n",
        "1. pass in `hybrid_search=True` when constructing the `PGVectorStore` (and optionally configure `text_search_config` with the desired language)\n",
        "2. pass in `vector_store_query_mode=\"hybrid\"` when constructing the query engine (this config is passed to the retriever under the hood). You can also optionally set the `sparse_top_k` to configure how many results we should obtain from sparse text search (default is using the same value as `similarity_top_k`). "
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "65a7e133-39da-40c5-b2c5-7af2c0a3a792",
       "metadata": {},
       "outputs": [],
       "source": [
        "from sqlalchemy import make_url\n",
        "\n",
        "url = make_url(connection_string)\n",
        "hybrid_vector_store = PGVectorStore.from_params(\n",
        "    database=db_name,\n",
        "    host=url.host,\n",
        "    password=url.password,\n",
        "    port=url.port,\n",
        "    user=url.username,\n",
        "    table_name=\"paul_graham_essay_hybrid_search\",\n",
        "    embed_dim=1536,  # openai embedding dimension\n",
        "    hybrid_search=True,\n",
        "    text_search_config=\"english\",\n",
        ")\n",
        "\n",
        "storage_context = StorageContext.from_defaults(\n",
        "    vector_store=hybrid_vector_store\n",
        ")\n",
        "hybrid_index = VectorStoreIndex.from_documents(\n",
        "    documents, storage_context=storage_context\n",
        ")"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "6f8edee4-6c19-4d99-b602-110bdc5708e5",
       "metadata": {},
       "outputs": [],
       "source": [
        "hybrid_query_engine = hybrid_index.as_query_engine(\n",
        "    vector_store_query_mode=\"hybrid\", sparse_top_k=2\n",
        ")\n",
        "hybrid_response = hybrid_query_engine.query(\n",
        "    \"Who does Paul Graham think of with the word schtick\"\n",
        ")"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "bd454b25-b66c-4733-8ff4-24fb2ee84cec",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "Roy Lichtenstein\n"
         ]
        }
       ],
       "source": [
        "print(hybrid_response)"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "2e5e8083",
       "metadata": {},
       "source": [
        "### Metadata filters\n",
        "\n",
        "PGVectorStore supports storing metadata in nodes, and filtering based on that metadata during the retrieval step."
       ]
      },
      {
       "cell_type": "markdown",
       "id": "2d0ad3fc",
       "metadata": {},
       "source": [
        "#### Download git commits dataset"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "63e90a89",
       "metadata": {},
       "outputs": [],
       "source": [
        "!mkdir -p 'data/git_commits/'\n",
        "!wget 'https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/csv/commit_history.csv' -O 'data/git_commits/commit_history.csv'"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "fef41f44",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "{'commit': '44e41c12ab25e36c202f58e068ced262eadc8d16', 'author': 'Lakshmi Narayanan Sreethar<lakshmi@timescale.com>', 'date': 'Tue Sep 5 21:03:21 2023 +0530', 'change summary': 'Fix segfault in set_integer_now_func', 'change details': 'When an invalid function oid is passed to set_integer_now_func, it finds out that the function oid is invalid but before throwing the error, it calls ReleaseSysCache on an invalid tuple causing a segfault. Fixed that by removing the invalid call to ReleaseSysCache.  Fixes #6037 '}\n",
          "4167\n"
         ]
        }
       ],
       "source": [
        "import csv\n",
        "\n",
        "with open(\"data/git_commits/commit_history.csv\", \"r\") as f:\n",
        "    commits = list(csv.DictReader(f))\n",
        "\n",
        "print(commits[0])\n",
        "print(len(commits))"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "3b0d9f47",
       "metadata": {},
       "source": [
        "#### Add nodes with custom metadata"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "3920109b",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "Node ID: e084ffbd-24e0-4bd9-b7c8-287fe1abd85d\n",
          "Text: Fix segfault in set_integer_now_func  When an invalid function\n",
          "oid is passed to set_integer_now_func, it finds out that the function\n",
          "oid is invalid but before throwing the error, it calls ReleaseSysCache\n",
          "on an invalid tuple causing a segfault. Fixed that by removing the\n",
          "invalid call to ReleaseSysCache.  Fixes #6037\n",
          "2023-03-22 to 2023-09-05\n",
          "{'36882414+akuzm@users.noreply.github.com', 'erik@timescale.com', 'konstantina@timescale.com', 'mats@timescale.com', 'nikhil@timescale.com', 'dmitry@timescale.com', 'jguthrie@timescale.com', 'rafia.sabih@gmail.com', 'engel@sero-systems.de', 'satish.8483@gmail.com', 'me@noctarius.com', 'sven@timescale.com', 'jan@timescale.com', 'lakshmi@timescale.com', 'fabriziomello@gmail.com'}\n"
         ]
        }
       ],
       "source": [
        "# Create TextNode for each of the first 100 commits\n",
        "from llama_index.core.schema import TextNode\n",
        "from datetime import datetime\n",
        "\n",
        "nodes = []\n",
        "dates = set()\n",
        "authors = set()\n",
        "for commit in commits[:100]:\n",
        "    author_email = commit[\"author\"].split(\"<\")[1][:-1]\n",
        "    commit_date = datetime.strptime(\n",
        "        commit[\"date\"], \"%a %b %d %H:%M:%S %Y %z\"\n",
        "    ).strftime(\"%Y-%m-%d\")\n",
        "    commit_text = commit[\"change summary\"]\n",
        "    if commit[\"change details\"]:\n",
        "        commit_text += \"\\n\\n\" + commit[\"change details\"]\n",
        "    nodes.append(\n",
        "        TextNode(\n",
        "            text=commit_text,\n",
        "            metadata={\n",
        "                \"commit_date\": commit_date,\n",
        "                \"author\": author_email,\n",
        "            },\n",
        "        )\n",
        "    )\n",
        "    dates.add(commit_date)\n",
        "    authors.add(author_email)\n",
        "\n",
        "print(nodes[0])\n",
        "print(min(dates), \"to\", max(dates))\n",
        "print(authors)"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "a638f76a",
       "metadata": {},
       "outputs": [],
       "source": [
        "vector_store = PGVectorStore.from_params(\n",
        "    database=db_name,\n",
        "    host=url.host,\n",
        "    password=url.password,\n",
        "    port=url.port,\n",
        "    user=url.username,\n",
        "    table_name=\"metadata_filter_demo3\",\n",
        "    embed_dim=1536,  # openai embedding dimension\n",
        ")\n",
        "\n",
        "index = VectorStoreIndex.from_vector_store(vector_store=vector_store)\n",
        "index.insert_nodes(nodes)"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "15f7cf45",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "Lakshmi fixed the segfault by removing the invalid call to ReleaseSysCache that was causing the issue.\n"
         ]
        }
       ],
       "source": [
        "print(index.as_query_engine().query(\"How did Lakshmi fix the segfault?\"))"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "7ab03ed4",
       "metadata": {},
       "source": [
        "#### Apply metadata filters\n",
        "\n",
        "Now we can filter by commit author or by date when retrieving nodes."
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "aa6212e7",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "{'commit_date': '2023-08-07', 'author': 'mats@timescale.com'}\n",
          "{'commit_date': '2023-08-07', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-15', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-23', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-07-13', 'author': 'mats@timescale.com'}\n",
          "{'commit_date': '2023-08-27', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-21', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-30', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-10', 'author': 'mats@timescale.com'}\n",
          "{'commit_date': '2023-08-20', 'author': 'sven@timescale.com'}\n"
         ]
        }
       ],
       "source": [
        "from llama_index.core.vector_stores.types import (\n",
        "    MetadataFilter,\n",
        "    MetadataFilters,\n",
        ")\n",
        "\n",
        "filters = MetadataFilters(\n",
        "    filters=[\n",
        "        MetadataFilter(key=\"author\", value=\"mats@timescale.com\"),\n",
        "        MetadataFilter(key=\"author\", value=\"sven@timescale.com\"),\n",
        "    ],\n",
        "    condition=\"or\",\n",
        ")\n",
        "\n",
        "retriever = index.as_retriever(\n",
        "    similarity_top_k=10,\n",
        "    filters=filters,\n",
        ")\n",
        "\n",
        "retrieved_nodes = retriever.retrieve(\"What is this software project about?\")\n",
        "\n",
        "for node in retrieved_nodes:\n",
        "    print(node.node.metadata)"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "67c19ec6",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "{'commit_date': '2023-08-23', 'author': 'erik@timescale.com'}\n",
          "{'commit_date': '2023-08-15', 'author': '36882414+akuzm@users.noreply.github.com'}\n",
          "{'commit_date': '2023-08-17', 'author': 'konstantina@timescale.com'}\n",
          "{'commit_date': '2023-08-15', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-23', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-15', 'author': '36882414+akuzm@users.noreply.github.com'}\n",
          "{'commit_date': '2023-08-21', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-24', 'author': 'lakshmi@timescale.com'}\n",
          "{'commit_date': '2023-08-16', 'author': '36882414+akuzm@users.noreply.github.com'}\n",
          "{'commit_date': '2023-08-20', 'author': 'sven@timescale.com'}\n"
         ]
        }
       ],
       "source": [
        "filters = MetadataFilters(\n",
        "    filters=[\n",
        "        MetadataFilter(key=\"commit_date\", value=\"2023-08-15\", operator=\">=\"),\n",
        "        MetadataFilter(key=\"commit_date\", value=\"2023-08-25\", operator=\"<=\"),\n",
        "    ],\n",
        "    condition=\"and\",\n",
        ")\n",
        "\n",
        "retriever = index.as_retriever(\n",
        "    similarity_top_k=10,\n",
        "    filters=filters,\n",
        ")\n",
        "\n",
        "retrieved_nodes = retriever.retrieve(\"What is this software project about?\")\n",
        "\n",
        "for node in retrieved_nodes:\n",
        "    print(node.node.metadata)"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "4f6e9cdf",
       "metadata": {},
       "source": [
        "#### Apply nested filters\n",
        "\n",
        "In the above examples, we combined multiple filters using AND or OR. We can also combine multiple sets of filters.\n",
        "\n",
        "e.g. in SQL:\n",
        "```sql\n",
        "WHERE (commit_date >= '2023-08-01' AND commit_date <= '2023-08-15') AND (author = 'mats@timescale.com' OR author = 'sven@timescale.com')\n",
        "```"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "94f20be7",
       "metadata": {},
       "outputs": [
        {
         "name": "stdout",
         "output_type": "stream",
         "text": [
          "{'commit_date': '2023-08-07', 'author': 'mats@timescale.com'}\n",
          "{'commit_date': '2023-08-07', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-15', 'author': 'sven@timescale.com'}\n",
          "{'commit_date': '2023-08-10', 'author': 'mats@timescale.com'}\n"
         ]
        }
       ],
       "source": [
        "filters = MetadataFilters(\n",
        "    filters=[\n",
        "        MetadataFilters(\n",
        "            filters=[\n",
        "                MetadataFilter(\n",
        "                    key=\"commit_date\", value=\"2023-08-01\", operator=\">=\"\n",
        "                ),\n",
        "                MetadataFilter(\n",
        "                    key=\"commit_date\", value=\"2023-08-15\", operator=\"<=\"\n",
        "                ),\n",
        "            ],\n",
        "            condition=\"and\",\n",
        "        ),\n",
        "        MetadataFilters(\n",
        "            filters=[\n",
        "                MetadataFilter(key=\"author\", value=\"mats@timescale.com\"),\n",
        "                MetadataFilter(key=\"author\", value=\"sven@timescale.com\"),\n",
        "            ],\n",
        "            condition=\"or\",\n",
        "        ),\n",
        "    ],\n",
        "    condition=\"and\",\n",
        ")\n",
        "\n",
        "retriever = index.as_retriever(\n",
        "    similarity_top_k=10,\n",
        "    filters=filters,\n",
        ")\n",
        "\n",
        "retrieved_nodes = retriever.retrieve(\"What is this software project about?\")\n",
        "\n",
        "for node in retrieved_nodes:\n",
        "    print(node.node.metadata)"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "2b274ecb",
       "metadata": {},
       "source": [
        "### PgVector Query Options"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "a490a0fa",
       "metadata": {},
       "source": [
        "#### IVFFlat Probes\n",
        "\n",
        "Specify the number of [IVFFlat probes](https://github.com/pgvector/pgvector?tab=readme-ov-file#query-options) (1 by default)\n",
        "\n",
        "When retrieving from the index, you can specify an appropriate number of IVFFlat probes (higher is better for recall, lower is better for speed)"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "111a3682",
       "metadata": {},
       "outputs": [],
       "source": [
        "retriever = index.as_retriever(\n",
        "    vector_store_query_mode=query_mode,\n",
        "    similarity_top_k=top_k,\n",
        "    vector_store_kwargs={\"ivfflat_probes\": 10},\n",
        ")"
       ]
      },
      {
       "cell_type": "markdown",
       "id": "6104ef8d",
       "metadata": {},
       "source": [
        "#### HNSW EF Search\n",
        "\n",
        "Specify the size of the dynamic [candidate list](https://github.com/pgvector/pgvector?tab=readme-ov-file#query-options-1) for search (40 by default)"
       ]
      },
      {
       "cell_type": "code",
       "execution_count": null,
       "id": "f3a44758",
       "metadata": {},
       "outputs": [],
       "source": [
        "retriever = index.as_retriever(\n",
        "    vector_store_query_mode=query_mode,\n",
        "    similarity_top_k=top_k,\n",
        "    vector_store_kwargs={\"hnsw_ef_search\": 300},\n",
        ")"
       ]
      }
     ],
     "metadata": {
      "kernelspec": {
       "display_name": "Python 3 (ipykernel)",
       "language": "python",
       "name": "python3"
      },
      "language_info": {
       "codemirror_mode": {
        "name": "ipython",
        "version": 3
       },
       "file_extension": ".py",
       "mimetype": "text/x-python",
       "name": "python",
       "nbconvert_exporter": "python",
       "pygments_lexer": "ipython3"
      }
     },
     "nbformat": 4,
     "nbformat_minor": 5
    }
    
    
    [run-llama/llama_index] llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/llama_index/vector_stores/postgres/base.py
    def _to_postgres_operator(self, operator: FilterOperator) -> str: if operator == FilterOperator.EQ: return "=" elif operator == FilterOperator.GT: return ">" elif operator == FilterOperator.LT: return "<" elif operator == FilterOperator.NE: return "!=" elif operator == FilterOperator.GTE: return ">=" elif operator == FilterOperator.LTE: return "<=" elif operator == FilterOperator.IN: return "@>" else: _logger.warning(f"Unknown operator: {operator}, fallback to '='") return "="
    [run-llama/llama_index] docs/module_guides/loading/ingestion_pipeline/root.md

    Document Management

    Attaching a docstore to the ingestion pipeline will enable document management.

    Using the document.doc_id or node.ref_doc_id as a grounding point, the ingestion pipeline will actively look for duplicate documents.

    It works by:

    • Storing a map of doc_id -> document_hash
    • If a vector store is attached:
      • If a duplicate doc_id is detected, and the hash has changed, the document will be re-processed and upserted
      • If a duplicate doc_id is detected and the hash is unchanged, the node is skipped
    • If only a vector store is not attached:
      • Checks all existing hashes for each node
      • If a duplicate is found, the node is skipped
      • Otherwise, the node is processed

    NOTE: If we do not attach a vector store, we can only check for and remove duplicate inputs.

    from llama_index.core.ingestion import IngestionPipeline from llama_index.core.storage.docstore import SimpleDocumentStore pipeline = IngestionPipeline( transformations=[...], docstore=SimpleDocumentStore() )

    A full walkthrough is found in our demo notebook.

    Also check out another guide using Redis as our entire ingestion stack.

    [run-llama/llama_index] llama-index-integrations/storage/docstore/llama-index-storage-docstore-postgres/llama_index/storage/docstore/postgres/base.py
    def from_uri( cls, uri: str, namespace: Optional[str] = None, table_name: str = "docstore", schema_name: str = "public", perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> "PostgresDocumentStore": """Load a PostgresDocumentStore from a Postgres URI.""" postgres_kvstore = PostgresKVStore.from_uri( uri=uri, table_name=table_name, schema_name=schema_name, perform_setup=perform_setup, debug=debug, use_jsonb=use_jsonb, ) return cls(postgres_kvstore, namespace)
    [run-llama/llama_index] llama-index-core/llama_index/core/storage/docstore/postgres_docstore.py
    def from_uri( cls, uri: str, namespace: Optional[str] = None, table_name: str = "docstore", schema_name: str = "public", perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> "PostgresDocumentStore": """Load a PostgresDocumentStore from a Postgres URI.""" postgres_kvstore = PostgresKVStore.from_uri( uri=uri, table_name=table_name, schema_name=schema_name, perform_setup=perform_setup, debug=debug, use_jsonb=use_jsonb, ) return cls(postgres_kvstore, namespace)
    [run-llama/llama_index] llama-index-legacy/llama_index/legacy/storage/docstore/postgres_docstore.py
    def from_uri( cls, uri: str, namespace: Optional[str] = None, table_name: str = "docstore", schema_name: str = "public", perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> "PostgresDocumentStore": """Load a PostgresDocumentStore from a Postgres URI.""" postgres_kvstore = PostgresKVStore.from_uri( uri=uri, table_name=table_name, schema_name=schema_name, perform_setup=perform_setup, debug=debug, use_jsonb=use_jsonb, ) return cls(postgres_kvstore, namespace)
    [run-llama/llama_index] llama-index-core/llama_index/core/storage/docstore/postgres_docstore.py
    def from_params( cls, host: Optional[str] = None, port: Optional[str] = None, database: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, namespace: Optional[str] = None, table_name: str = "docstore", schema_name: str = "public", perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> "PostgresDocumentStore": """Load a PostgresDocumentStore from a Postgres host and port.""" postgres_kvstore = PostgresKVStore.from_params( host=host, port=port, database=database, user=user, password=password, table_name=table_name, schema_name=schema_name, perform_setup=perform_setup, debug=debug, use_jsonb=use_jsonb, ) return cls(postgres_kvstore, namespace)
    [run-llama/llama_index] llama-index-integrations/storage/docstore/llama-index-storage-docstore-postgres/llama_index/storage/docstore/postgres/base.py
    def from_params( cls, host: Optional[str] = None, port: Optional[str] = None, database: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, namespace: Optional[str] = None, table_name: str = "docstore", schema_name: str = "public", perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> "PostgresDocumentStore": """Load a PostgresDocumentStore from a Postgres host and port.""" postgres_kvstore = PostgresKVStore.from_params( host=host, port=port, database=database, user=user, password=password, table_name=table_name, schema_name=schema_name, perform_setup=perform_setup, debug=debug, use_jsonb=use_jsonb, ) return cls(postgres_kvstore, namespace)
    [run-llama/llama_index] llama-index-legacy/llama_index/legacy/storage/docstore/postgres_docstore.py
    def from_params( cls, host: Optional[str] = None, port: Optional[str] = None, database: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, namespace: Optional[str] = None, table_name: str = "docstore", schema_name: str = "public", perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> "PostgresDocumentStore": """Load a PostgresDocumentStore from a Postgres host and port.""" postgres_kvstore = PostgresKVStore.from_params( host=host, port=port, database=database, user=user, password=password, table_name=table_name, schema_name=schema_name, perform_setup=perform_setup, debug=debug, use_jsonb=use_jsonb, ) return cls(postgres_kvstore, namespace)
    [run-llama/llama_index] llama-index-legacy/llama_index/legacy/vector_stores/postgres.py
    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None: import sqlalchemy self._initialize() with self._session() as session, session.begin(): stmt = sqlalchemy.text( f"DELETE FROM {self.schema_name}.data_{self.table_name} where " f"(metadata_->>'doc_id')::text = '{ref_doc_id}' " ) session.execute(stmt) session.commit()
    [run-llama/llama_index] llama-index-legacy/llama_index/legacy/storage/kvstore/postgres_kvstore.py
    class PostgresKVStore(BaseKVStore): """Postgres Key-Value store. Args: mongo_client (Any): MongoDB client uri (Optional[str]): MongoDB URI host (Optional[str]): MongoDB host port (Optional[int]): MongoDB port db_name (Optional[str]): MongoDB database name """ connection_string: str async_connection_string: str table_name: str schema_name: str perform_setup: bool debug: bool use_jsonb: bool def __init__( self, connection_string: str, async_connection_string: str, table_name: str, schema_name: str = "public", perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> None: try: import asyncpg # noqa import psycopg2 # noqa import sqlalchemy import sqlalchemy.ext.asyncio # noqa except ImportError: raise ImportError( "`sqlalchemy[asyncio]`, `psycopg2-binary` and `asyncpg` " "packages should be pre installed" ) table_name = table_name.lower() schema_name = schema_name.lower() self.connection_string = connection_string self.async_connection_string = async_connection_string self.table_name = table_name self.schema_name = schema_name self.perform_setup = perform_setup self.debug = debug self.use_jsonb = use_jsonb self._is_initialized = False from sqlalchemy.orm import declarative_base # sqlalchemy model self._base = declarative_base() self._table_class = get_data_model( self._base, table_name, schema_name, use_jsonb=use_jsonb, ) @classmethod def from_params( cls, host: Optional[str] = None, port: Optional[str] = None, database: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, table_name: str = "kvstore", schema_name: str = "public", connection_string: Optional[str] = None, async_connection_string: Optional[str] = None, perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> "PostgresKVStore": """Return connection string from database parameters.""" conn_str = ( connection_string or f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}" ) async_conn_str = async_connection_string or ( f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{database}" ) return cls( connection_string=conn_str, async_connection_string=async_conn_str, table_name=table_name, schema_name=schema_name, perform_setup=perform_setup, debug=debug, use_jsonb=use_jsonb, ) @classmethod def from_uri( cls, uri: str, table_name: str = "kvstore", schema_name: str = "public", perform_setup: bool = True, debug: bool = False, use_jsonb: bool = False, ) -> "PostgresKVStore": """Return connection string from database parameters.""" params = params_from_uri(uri) return cls.from_params( **params, table_name=table_name, schema_name=schema_name, perform_setup=perform_setup, debug=debug, use_jsonb=use_jsonb, ) def _connect(self) -> Any: from sqlalchemy import create_engine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker self._engine = create_engine(self.connection_string, echo=self.debug) self._session = sessionmaker(self._engine) self._async_engine = create_async_engine(self.async_connection_string) self._async_session = sessionmaker(self._async_engine, class_=AsyncSession) def _create_schema_if_not_exists(self) -> None: with self._session() as session, session.begin(): from sqlalchemy import text # Check if the specified schema exists with "CREATE" statement check_schema_statement = text( f"SELECT schema_name FROM information_schema.schemata WHERE schema_name = '{self.schema_name}'" ) result = session.execute(check_schema_statement).fetchone() # If the schema does not exist, then create it if not result: create_schema_statement = text( f"CREATE SCHEMA IF NOT EXISTS {self.schema_name}" ) session.execute(create_schema_statement) session.commit() def _create_tables_if_not_exists(self) -> None: with self._session() as session, session.begin(): self._base.metadata.create_all(session.connection()) def _initialize(self) -> None: if not self._is_initialized: self._connect() if self.perform_setup: self._create_schema_if_not_exists() self._create_tables_if_not_exists() self._is_initialized = True def put( self, key: str, val: dict, collection: str = DEFAULT_COLLECTION, ) -> None: """Put a key-value pair into the store. Args: key (str): key val (dict): value collection (str): collection name """ self.put_all([(key, val)], collection=collection) async def aput( self, key: str, val: dict, collection: str = DEFAULT_COLLECTION, ) -> None: """Put a key-value pair into the store. Args: key (str): key val (dict): value collection (str): collection name """ await self.aput_all([(key, val)], collection=collection) def put_all( self, kv_pairs: List[Tuple[str, dict]], collection: str = DEFAULT_COLLECTION, batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: from sqlalchemy import text self._initialize() with self._session() as session: for i in range(0, len(kv_pairs), batch_size): batch = kv_pairs[i : i + batch_size] # Prepare the VALUES part of the SQL statement values_clause = ", ".join( f"(:key_{i}, :namespace_{i}, :value_{i})" for i, _ in enumerate(batch) ) # Prepare the raw SQL for bulk upsert # Note: This SQL is PostgreSQL-specific. Adjust for other databases. stmt = text( f""" INSERT INTO {self.schema_name}.{self._table_class.__tablename__} (key, namespace, value) VALUES {values_clause} ON CONFLICT (key, namespace) DO UPDATE SET value = EXCLUDED.value; """ ) # Flatten the list of tuples for execute parameters params = {} for i, (key, value) in enumerate(batch): params[f"key_{i}"] = key params[f"namespace_{i}"] = collection params[f"value_{i}"] = json.dumps(value) # Execute the bulk upsert session.execute(stmt, params) session.commit() async def aput_all( self, kv_pairs: List[Tuple[str, dict]], collection: str = DEFAULT_COLLECTION, batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: from sqlalchemy import text self._initialize() async with self._async_session() as session: for i in range(0, len(kv_pairs), batch_size): batch = kv_pairs[i : i + batch_size] # Prepare the VALUES part of the SQL statement values_clause = ", ".join( f"(:key_{i}, :namespace_{i}, :value_{i})" for i, _ in enumerate(batch) ) # Prepare the raw SQL for bulk upsert # Note: This SQL is PostgreSQL-specific. Adjust for other databases. stmt = text( f""" INSERT INTO {self.schema_name}.{self._table_class.__tablename__} (key, namespace, value) VALUES {values_clause} ON CONFLICT (key, namespace) DO UPDATE SET value = EXCLUDED.value; """ ) # Flatten the list of tuples for execute parameters params = {} for i, (key, value) in enumerate(batch): params[f"key_{i}"] = key params[f"namespace_{i}"] = collection params[f"value_{i}"] = json.dumps(value) # Execute the bulk upsert await session.execute(stmt, params) await session.commit() def get(self, key: str, collection: str = DEFAULT_COLLECTION) -> Optional[dict]: """Get a value from the store. Args: key (str): key collection (str): collection name """ from sqlalchemy import select self._initialize() with self._session() as session: result = session.execute( select(self._table_class) .filter_by(key=key) .filter_by(namespace=collection) ) result = result.scalars().first() if result: return result.value return None async def aget( self, key: str, collection: str = DEFAULT_COLLECTION ) -> Optional[dict]: """Get a value from the store. Args: key (str): key collection (str): collection name """ from sqlalchemy import select self._initialize() async with self._async_session() as session: result = await session.execute( select(self._table_class) .filter_by(key=key) .filter_by(namespace=collection) ) result = result.scalars().first() if result: return result.value return None def get_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]: """Get all values from the store. Args: collection (str): collection name """ from sqlalchemy import select self._initialize() with self._session() as session: results = session.execute( select(self._table_class).filter_by(namespace=collection) ) results = results.scalars().all() return {result.key: result.value for result in results} if results else {} async def aget_all(self, collection: str = DEFAULT_COLLECTION) -> Dict[str, dict]: """Get all values from the store. Args: collection (str): collection name """ from sqlalchemy import select self._initialize() async with self._async_session() as session: results = await session.execute( select(self._table_class).filter_by(namespace=collection) ) results = results.scalars().all() return {result.key: result.value for result in results} if results else {} def delete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool: """Delete a value from the store. Args: key (str): key collection (str): collection name """ from sqlalchemy import delete self._initialize() with self._session() as session: result = session.execute( delete(self._table_class) .filter_by(namespace=collection) .filter_by(key=key) ) session.commit() return result.rowcount > 0 async def adelete(self, key: str, collection: str = DEFAULT_COLLECTION) -> bool: """Delete a value from the store. Args: key (str): key collection (str): collection name """ from sqlalchemy import delete self._initialize() async with self._async_session() as session: async with session.begin(): result = await session.execute( delete(self._table_class) .filter_by(namespace=collection) .filter_by(key=key) ) return result.rowcount > 0
    [run-llama/llama_index] llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-postgres/llama_index/storage/kvstore/postgres/base.py
    def put( self, key: str, val: dict, collection: str = DEFAULT_COLLECTION, ) -> None: """Put a key-value pair into the store. Args: key (str): key val (dict): value collection (str): collection name """ self.put_all([(key, val)], collection=collection)
    [run-llama/llama_index] llama-index-legacy/llama_index/legacy/storage/kvstore/postgres_kvstore.py
    def put_all( self, kv_pairs: List[Tuple[str, dict]], collection: str = DEFAULT_COLLECTION, batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: from sqlalchemy import text self._initialize() with self._session() as session: for i in range(0, len(kv_pairs), batch_size): batch = kv_pairs[i : i + batch_size] # Prepare the VALUES part of the SQL statement values_clause = ", ".join( f"(:key_{i}, :namespace_{i}, :value_{i})" for i, _ in enumerate(batch) ) # Prepare the raw SQL for bulk upsert # Note: This SQL is PostgreSQL-specific. Adjust for other databases. stmt = text( f""" INSERT INTO {self.schema_name}.{self._table_class.__tablename__} (key, namespace, value) VALUES {values_clause} ON CONFLICT (key, namespace) DO UPDATE SET value = EXCLUDED.value; """ ) # Flatten the list of tuples for execute parameters params = {} for i, (key, value) in enumerate(batch): params[f"key_{i}"] = key params[f"namespace_{i}"] = collection params[f"value_{i}"] = json.dumps(value) # Execute the bulk upsert session.execute(stmt, params) session.commit()
    [run-llama/llama_index] llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-postgres/llama_index/storage/kvstore/postgres/base.py
    async def aput_all( self, kv_pairs: List[Tuple[str, dict]], collection: str = DEFAULT_COLLECTION, batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: from sqlalchemy import text self._initialize() async with self._async_session() as session: for i in range(0, len(kv_pairs), batch_size): batch = kv_pairs[i : i + batch_size] # Prepare the VALUES part of the SQL statement values_clause = ", ".join( f"(:key_{i}, :namespace_{i}, :value_{i})" for i, _ in enumerate(batch) ) # Prepare the raw SQL for bulk upsert # Note: This SQL is PostgreSQL-specific. Adjust for other databases. stmt = text( f""" INSERT INTO {self.schema_name}.{self._table_class.__tablename__} (key, namespace, value) VALUES {values_clause} ON CONFLICT (key, namespace) DO UPDATE SET value = EXCLUDED.value; """ ) # Flatten the list of tuples for execute parameters params = {} for i, (key, value) in enumerate(batch): params[f"key_{i}"] = key params[f"namespace_{i}"] = collection params[f"value_{i}"] = json.dumps(value) # Execute the bulk upsert await session.execute(stmt, params) await session.commit()
    [run-llama/llama_index] llama-index-integrations/vector_stores/llama-index-vector-stores-postgres/llama_index/vector_stores/postgres/base.py
    def delete(self, ref_doc_id: str, **delete_kwargs: Any) -> None: import sqlalchemy self._initialize() with self._session() as session, session.begin(): stmt = sqlalchemy.text( f"DELETE FROM {self.schema_name}.data_{self.table_name} where " f"(metadata_->>'doc_id')::text = '{ref_doc_id}' " ) session.execute(stmt) session.commit()
    [run-llama/llama_index] llama-index-legacy/llama_index/legacy/ingestion/pipeline.py
    class DocstoreStrategy(str, Enum): """Document de-duplication strategy.""" UPSERTS = "upserts" DUPLICATES_ONLY = "duplicates_only" UPSERTS_AND_DELETE = "upserts_and_delete"
    [run-llama/llama_index] llama-index-integrations/storage/kvstore/llama-index-storage-kvstore-postgres/llama_index/storage/kvstore/postgres/base.py
    def put_all( self, kv_pairs: List[Tuple[str, dict]], collection: str = DEFAULT_COLLECTION, batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: from sqlalchemy import text self._initialize() with self._session() as session: for i in range(0, len(kv_pairs), batch_size): batch = kv_pairs[i : i + batch_size] # Prepare the VALUES part of the SQL statement values_clause = ", ".join( f"(:key_{i}, :namespace_{i}, :value_{i})" for i, _ in enumerate(batch) ) # Prepare the raw SQL for bulk upsert # Note: This SQL is PostgreSQL-specific. Adjust for other databases. stmt = text( f""" INSERT INTO {self.schema_name}.{self._table_class.__tablename__} (key, namespace, value) VALUES {values_clause} ON CONFLICT (key, namespace) DO UPDATE SET value = EXCLUDED.value; """ ) # Flatten the list of tuples for execute parameters params = {} for i, (key, value) in enumerate(batch): params[f"key_{i}"] = key params[f"namespace_{i}"] = collection params[f"value_{i}"] = json.dumps(value) # Execute the bulk upsert session.execute(stmt, params) session.commit()
    [run-llama/llama_index] llama-index-core/llama_index/core/ingestion/pipeline.py
    class DocstoreStrategy(str, Enum): """Document de-duplication strategy.""" UPSERTS = "upserts" DUPLICATES_ONLY = "duplicates_only" UPSERTS_AND_DELETE = "upserts_and_delete"
    [run-llama/llama_index] llama-index-legacy/llama_index/legacy/indices/base.py
    def docstore(self) -> BaseDocumentStore: """Get the docstore corresponding to the index.""" return self._docstore
run-llama/llama_index