メインコンテンツへジャンプ

お客様と対話するインテリジェントなボットをビジネスに導入することを想像してみてください。チャットボットは一般的に、顧客と対話し、彼らに助けや情報を提供するために使用されます。しかし、通常のチャットボットは複雑な質問に答えるのに苦労することがあります。

RAG とは

Retrieval Augmented Generation (RAG)は、チャットボットが難しい質問を理解し、応答する能力を向上させる方法です。この生成AIデザインパターンは、大規模言語モデル(LLM)と外部知識の取得を組み合わせています。 これにより、リアルタイムデータを生成プロセス(推論時間)中にAIアプリケーションに統合することが可能になります。この文脈情報をLLMに提供することで、RAGは生成された出力の精度と品質を大幅に向上させます。

RAGを使用する利点の一部は次のとおりです:

  • AIアプリケーションの精度と品質の向上: RAGがLLMにリアルタイムデータをコンテキストとして提供することで、AIアプリケーションの精度と品質が向上します。 これは、LLMがより多くの情報にアクセスでき、それを使用してより情報的で関連性のある応答を生成できるためです。
  • さまざまなタイプのデータを処理する能力:RAGは、ドキュメントやメールのような非構造化データや、テーブルのような構造化データを含むさまざまなタイプのデータを処理できます。これにより、さまざまなアプリケーションで使用できる汎用的なツールになります。
  • ユーザーのクエリに対するよりダイナミックで柔軟なレスポンス: RAGは、ユーザーの興味やデータアクセス制御に基づいてレスポンスを制限するなど、ユーザーのクエリに対するよりダイナミックで柔軟なレスポンスを生成することができます。これにより、RAGチャットボットはセキュリティコントロールを持ちつつ、ユーザーにとってより魅力的で役立つものになります。
  • 初期費用の削減と開発の高速化:RAGは、大規模な開発作業やLLMの微調整なしに迅速かつ簡単にデプロイできます。

DatabricksとPinecone

Pineconeのベクトルデータベースは、複雑なデータ検索をピンポイントで管理するのに優れている一方、Databricksのデータインテリジェンスプラットフォームは、大量のデータセットの取り扱いと分析を効率化します。

Pineconeとの統合はシームレスで、Databricksが大規模にベクトル埋め込みを効率的に保存および取得できます。 この統合により、PineconeとDatabricksを活用した高性能ベクトル検索アプリケーションの開発が簡素化されます。

DatabricksとPineconeを一緒に使用することで、従来のチャットボットよりも正確で効率的なチャットボットを作成することができます。

ステップバイステップの実装

このブログでは、Databricksのドキュメントやホワイトペーパーを活用して、Databricksに関するあらゆる質問に答えられるチャットボットの構築方法をご紹介します。

チャットボットの構築には、4つの重要なステージがあります。最初のステージは、データのインジェストと準備です。次に、効率的な情報検索のために、データをPineconeのようなベクターデータベースに保存します。3つ目のステージでは、Pineconeを使った検索と、Llama 3.1のようなLLM(大規模言語モデル)を使用して応答を生成するRAGリトリーバーとチェーンを設定します。最後に、チャットボットをDatabricks Unity Catalogに登録し、Databricks Mosaic AI Model Servingを通じてデプロイします。このプロセスの詳細な手順については、続きをご覧ください。

Databricksでデータを準備する

ステップ1:Databricksでデータを準備する

  1. クラウドストレージにある生のファイルをDatabricks Autoloaderを使用して取り込みます。
    私たちはDatabricks autoloaderを使用しています。これは、新しいファイルがクラウドストレージに着陸すると自動的に処理するハンズオフアプローチを提供し、手動の状態管理の必要性なしに効率性とフォールトトレランスを確保します。Databricks Autoloaderは、数十億のファイルにスケールするように設計されており、ネイティブクラウドAPIを利用したファイル検出によりコストを抑えることができます。さらに、Auto Loaderはスキーマの変更に適応する組み込みのスキーマ推論と進化機能を備えたインテリジェントなものです。大量のデータを扱っている場合や、ほぼリアルタイムの取り込みが必要な場合でも、Auto Loaderはデータ取り込みプロセスを簡素化し、加速します。ストリーミングテーブルは、特にストリーミングやインクリメンタルなデータ処理を扱う際に、よりシンプルな体験を提供します。
  2. pdf / htmlファイルからテキストを抽出します。
    まず、PDFファイルのバイトコンテンツを読み取り可能なテキストに変換し、テキストから特定のセグメントを取得する必要があります。この参照実装では、Spark UDFと組み合わせてPyPdfまたはUnstructuredIOライブラリを使用し、テキスト抽出プロセスを簡素化します。 テキストを管理しやすいチャンクに分割するためのテキストスプリッターも使用します。
  3. ベクトル埋め込みを作成し、それらをDeltaテーブルに保存します。
    ベクトル埋め込みを作成するために、Databricks Mosaic AI Foundational Model API経由で利用可能なBGE埋め込みモデルを使用します。 Python UDFは、基礎となるモデルエンドポイントを使用して埋め込みを計算します。PDFから抽出したデータと埋め込みは、Deltaテーブルに保存されます。

ステップ2:データをPineconeベクトルデータベースに保存します

  1. Pineconeクライアント設定の初期化。

    Pineconeにベクトル埋め込みをアップサートするとき、最初にインデックスを作成します。 インデックスは、次元数が同じ埋め込みのグループで、通常は同種のユースケースの基礎となるデータセットを表します。 Pineconeにログインして、Pinecone APIキーを作成します。

    Databricks Secretsは、Databricksのノートブック、ジョブ、データパイプライン内で使用する可能性のあるパスワード、APIキー、その他の資格情報などの機密情報を安全に管理および保存します。 Pinecone APIキーなどの必要な認証情報など、機密情報を保存するためにDatabricksのシークレットを使用します。

    以下に、Databricksのシークレットを使用して、Pinecone APIキーなどの機密情報を取得する方法を示します。次に、Pinecone APIキーと環境を使用して、Pineconeへのクライアント接続を初期化します。
    # Pineconeの初期化
    
    import pinecone
    from pinecone import Pinecone
    
     
    pinecone_api_key = dbutils.secrets.get("your_secrets_scope", "PINECONE_API_KEY")
    
    project_name = "Starter"    # your-pinecone-project-name
    
    index_name = "dbdemo-index" # your-pinecone-index-name
    
     
    
    # connect to pinecone index
    
    pc = Pinecone(api_key=api_key)
    
    index = pc.Index(index_name)
  2. 次に、Pinecone UIまたはAPIを使用してPineconeインデックスを作成します。
    pc.create_index(
    
    name= index_name, # pineconeのインデックス名
    
    dimension=1536,
    
    metric="cosine",
    
    spec=ServerlessSpec(
    
       cloud="aws", 
    
       region="us-east-1"
    
    )
    
    )
  3. Pineconeに必要なスキーマにデータを変換します。

    Pineconeでは、インデックス内のベクトルにメタデータのキーと値のペアを付加することができます。これを使用して、元のドキュメントとそのメタデータを保存したり、追加のフィルター表現を指定したりできます。Pineconeに書き込む前に、データを変換し、デルタテーブルにメタデータ列を追加して、元のドキュメントの内容やスニペット、ドキュメントのソースやIDなどの追加メタデータをPineconeのスキーマ要件に従ってキャプチャします。
    from pyspark.sql.functions import col, lit, struct, to_json
    
    from pyspark.sql.functions import encode
    
     
    
    df = spark.table('databricks_pdf_documentation')\
    
               .withColumn("metadata", to_json(struct(col("content"), col("url"), col("id"))))\
    
               .withColumn("namespace", lit("dbdemo-namespace")) \
    
               .withColumn("values", col("embedding")) \
    
               .withColumn("sparse_values", lit(None)) \
    
               .select("id", "values", "namespace", "metadata", "sparse_values")
  4. Pineconeのインデックスに書き込みます。

    ドキュメンテーションに記載されているように、Pineconeのスパークコネクタをインストールします。Pineconeのスパークコネクタを使用して、エンベッディングをPineconeのインデックスに書き込みます。モード "append" を使用すると、新しいデータをインデックスに追加することもできます。
    #write to pinecone
    
    (
    
       df.write
    
       .option("pinecone.apiKey", api_key)
    
       .option("pinecone.indexName", index_name)
    
       .format("io.pinecone.spark.pinecone.Pinecone")
    
       .mode("append")
    
       .save()
    
    )

ステップ3:Pineconeベクトルデータベースのクエリ

次に、クエリAPIを使用してPineconeのベクトルインデックスをクエリすることができます。このAPIは、質問のエンベッディングを入力として受け取ります。

# UDF for embedding

from pyspark.sql.types import *

def get_embedding_for_string(text):

   response = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": text})

   e = response.data

   return e[0]['embedding']

#udfとして登録

get_embedding_for_string_udf = udf(get_embedding_for_string, ArrayType(FloatType()))



# Pineconeベクトルデータベースのクエリ

 

question = "How can I track billing usage on my workspaces?"

 

# クエリ埋め込みを作成

xq = get_embedding_for_string(question)

 

# Pineconeに最も類似した上位5つの結果をクエリ

query_response = index.query(

   namespace='dbdemo-namespace',

   top_k=5,

   include_values=True,

   include_metadata=True,

   vector=xq

)

 

#print(query_response)

 

query_response_docs = []

for match in query_response['matches']:

   query_response_docs.append([match['metadata']['url'],match['metadata']['content'],match['score']])

 

print(query_response_docs)

APIを通じてPineconeを直接クエリすることで、PineconeとDatabricksを任意のコードに統合できます。

PineconeとDatabricksを任意のコードに統合する

次のセクションでは、人気のあるLangChainフレームワークを使用してこのワークフローを簡素化する方法を示します。

ステップ4:LangChainを使用してPineconeベクトルデータベースをクエリします

Langchainは、LLM(大規模言語モデル)によるアプリケーション構築を簡素化するフレームワークです。Databricksの埋め込みは、埋め込みモデルとの対話を簡素化し、Pineconeとの統合は簡素化されたクエリインターフェースを提供します。

Langchainのラッパーは、すべての基礎的なロジックとAPI呼び出しを処理することで、簡単にします。下記のLangChainコードは、クエリテキストをベクトルに明示的に変換する必要性を抽象化します。

import pinecone

from langchain_community.embeddings import DatabricksEmbeddings

from langchain.chains import RetrievalQA

from pinecone import Pinecone

from langchain_pinecone import PineconeVectorStore

 
import os 

 
#Creating the input question embeddings (with Databricks `bge-large-en`)

embedding_model = DatabricksEmbeddings(endpoint="databricks-bge-large-en")


# connect to pinecone index

pc = Pinecone(api_key=pinecone_api_key)

index_pc = pc.Index(pinecone_index_name)

 

vectorstore = PineconeVectorStore(

     index=index_pc,

     namespace=pinecone_namespace,

     embedding=embedding_model,

     text_key="content"

 )

 

#Calling the Pinecone vector database to find similar documents

query = "What is Apache Spark?"

docs = vectorstore.similarity_search(

     クエリ、  # 検索クエリ

     k=3  # 最も関連性の高いドキュメントを3つ返す

 )

pprint(docs[0])

ステップ5:PineconeとLangChainのためのリトリーバーを作成します

上記では、Pineconeベクトルインデックスでの類似性検索の方法を示しました。RAGチャットボットを作成するために、LangChain Retrieverインターフェースを使用してインデックスをラップします。

まず、APIキーと環境を設定するためにPineconeを初期化します。次に、正しいネームスペースとキーを持つ既存のPineconeインデックスからVectorStoreインスタンスを作成します。

from langchain_community.embeddings import DatabricksEmbeddings

from langchain.chains import RetrievalQA

from pinecone import Pinecone

from langchain_pinecone import PineconeVectorStore

 

embedding_model = DatabricksEmbeddings(endpoint="databricks-bge-large-en")

 

def get_retriever(persist_dir: str = None):

   # initialize pinecone and connect to pinecone index

   pc = Pinecone(api_key=pinecone_api_key)

   index_pc = pc.Index(pinecone_index_name)

 

   vectorstore = PineconeVectorStore(

       index=index_pc,

       namespace=pinecone_namespace,

       embedding=embedding_model,

       text_key="content"

   )

 

   return vectorstore.as_retriever()

 

retriever = get_retriever()

ステップ6: チャットボットチェーンの組み立て

これで、チャットボットを定義するチェーンにリトリーバーを入れることができます!

retrieve_document_chain = (

   itemgetter("messages")

   | RunnableLambda(extract_question)

   | retriever

)

 

#test the retriever chain

print(retrieve_document_chain.invoke({"messages": [{"role": "user", "content": "Apache Sparkとは何ですか?"}]}))

チャットボットがチャットメッセージから質問を正しく抽出し、Pineconeから関連するコンテキストを取得できるかどうかを見てみましょう。

Pineconeからの関連コンテキスト

ステップ7:チャットボットをモデルとしてデプロイする

チャットボットを反復処理するにつれて、モデルオブジェクト、モデルバージョン、メタデータを追跡し、アクセス制御を管理することが求められます。そのためには、Unity Catalogと統合されたMLflowのModel Registryを使用します。

Unityカタログを使用して、チャットボットチェーンをmlflow.langchain.log_modelを使用してモデルとして登録できます。モデルのシグネチャは、mlflowのinfer_signatureを使用して推測することができます。 pinecone-clientを依存関係に入れることを忘れないでください。チェーンを定義したノートブックで"mlflow.models.set_model(model=full_chain)"を設定します。新しいドライバーノートブックで、チャットボットを登録し、モデルサービングにチャットボットをデプロイします。

from mlflow.models import infer_signature

import mlflow

import langchain

import pandas as pd

 

mlflow.set_registry_uri("databricks-uc")

model_name = f"{catalog_name}.{schema_name}.rag_with_pinecone_model" #catalog_name, schema_name and model name

 

# Specify the full path to the chain notebook

chain_notebook_file = "2.1 - Advanced-Chatbot-Chain - Using Pinecone" # the name of the notebook that has the chain definition

chain_notebook_path = os.path.join(os.getcwd(), chain_notebook_file)

 

mlflow.start_run()で:

   signature = infer_signature(input_example, output_example)

   logged_chain_info = mlflow.langchain.log_model(

       lc_model=chain_notebook_path,

       artifact_path="chain",

       registered_model_name=model_name,

       input_example=input_example,

       signature=signature,

       example_no_conversion=True, # スキーマを動作させるために必要

       extra_pip_requirements=[

         "mlflow==" + mlflow.__version__,

         "langchain==" + langchain.__version__,

         "pinecone-client==3.2.2",

         "langchain-pinecone==0.1.1",

         "langchain-community"
       ]

   )

モデルはDatabricksのUnity Catalogに登録されており、すべてのデータとAI資産のアクセス制御、監査、系統、発見を一元化します。

データとAIの資産

ステップ8:チャットボットをDatabricksモデル提供にデプロイする

さあ、チャットボットチェーンモードをモデルサービングエンドポイントとしてデプロイしましょう。以下では、PINECONE_API_KEYDATABRICKS_TOKENを環境変数に入れます。これは、サービングエンドポイントがPineconeとDatabricks Foundation Modelsと通信するために使用します。これにより、コードやユーザーにこれらの機密情報を明かすことなく、サーブされたモデルへのアクセスを許可することができます。

# サービングエンドポイントの作成または更新

from databricks.sdk import WorkspaceClient

from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedModelInput, ServedModelInputWorkloadSize

import requests

 

# 最新のモデルバージョンをチェック

def get_latest_model_version(model_name):

   from mlflow import MlflowClient

   mlflow_client = MlflowClient()

   latest_version = 1

   for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):

       version_int = int(mv.version)

       if version_int > latest_version:

           latest_version = version_int

   return latest_version

 

# これでサービングエンドポイントを作成または更新します

 

serving_endpoint_name = "pinecone_rag_chain"

latest_model_version = get_latest_model_version(model_name)

databricks_api_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

 

 

w = WorkspaceClient()

endpoint_config = EndpointCoreConfigInput(

   name=serving_endpoint_name,

   served_models=[

       ServedModelInput(

           model_name=model_name,

           model_version=latest_model_version,

           workload_size=ServedModelInputWorkloadSize.SMALL,

           scale_to_zero_enabled=True,

           environment_vars={

               "PINECONE_API_KEY": "{{secrets/prasad_kona/PINECONE_API_KEY}}",

               "DATABRICKS_TOKEN": "{{secrets/dbdemos/rag_sp_token}}",

           }

       )

   ]

)

 

existing_endpoint = next(

   (e for e in w.serving_endpoints.list() if e.name == serving_endpoint_name), None

)

serving_endpoint_url = f"{host}/ml/endpoints/{serving_endpoint_name}"

if existing_endpoint == None:

   print(f"エンドポイント{serving_endpoint_url}を作成しています、これにはパッケージ化とデプロイに数分かかります...")

   w.serving_endpoints.create_and_wait(name=serving_endpoint_name, config=endpoint_config)

else:

   print(f"エンドポイント{serving_endpoint_url}をバージョン{latest_model_version}に更新します。これには、エンドポイントのパッケージ化とデプロイに数分かかります...")

   w.serving_endpoints.update_config_and_wait(served_models=endpoint_config.served_models, name=serving_endpoint_name)

 

displayHTML(f'あなたのモデルエンドポイントの提供が利用可能になりました。詳細は<a href="/ml/endpoints/{serving_endpoint_name}">モデルサービングエンドポイントページ</a>をご覧ください。')

モデルサービングUIは、提供されているモデルのヘルスに関するリアルタイム情報を提供します。

モデル提供UI

ステップ9:チャットボットをテストする

チャットボットをデプロイした後、REST APIまたはDatabricks SDKでテストすることができます。

from databricks.sdk.service.serving import DataframeSplitInput

 

test_dialog = DataframeSplitInput(

   columns=["messages"],

   data=[

     

           {

               "messages": [

                   {"role": "user", "content": "Apache Sparkとは何ですか?"},

                   {

                       "role": "assistant",

                       "content": "Apache Sparkは、ビッグデータ分析で広く使用されているオープンソースのデータ処理エンジンです。",

                   },

                   {"role": "user", "content": "ストリーミングはサポートしていますか?"},

               ]

           }

     

   ],

)

answer = w.serving_endpoints.query(serving_endpoint_name, dataframe_split=test_dialog)

print(answer.predictions[0])

モデル提供の一部として利用可能なQuery UIを使用してテストすることもできます。

Query UIモデルの提供

次のステップ

DatabricksとPineconeを活用して、最先端のRAGチャットボットを導入し、顧客サービスを向上させましょう。従来のボットとは異なり、これらの高度なチャットボットはDatabricksのデータインテリジェンスプラットフォームとPineconeのベクターデータベースを活用して、正確かつタイムリーな回答を提供します。膨大なデータを素早く検索し、必要な情報を即座に見つけ、数秒でお客様に正確な答えを提供します。これにより、顧客体験が向上するだけでなく、デジタルエンゲージメントの新しい基準が確立されます。

ビジネスリーダーにとって、このテクノロジーの導入は単なるアップグレードではなく、顧客サービスの革新をリードするための戦略的な一手です。データ駆動型のインテリジェントなソリューションを採用することで、ビジネスを顧客対応の最前線に置き、優れたサービスへのコミットメントを示し、顧客の共感を得ることができます。

Databricksでの生成AIについてもっと学べる無料トレーニングをぜひチェックしてください。また、PineconeおよびDatabricksの追加ドキュメントもこちらからご覧いただけます。このブログのサンプルノートブックもちらからアクセスできます。

Databricks 無料トライアル

関連記事

本番運用 - Databricksを使用した高品質の RAG アプリケーション

12 月に、Databricks は Retrieval Augmented Generation (RAG) を使用して AI アプリケーションを本番運用するための新しいツール を発表しました 。それ以来、 Databricks Data Intelligence Platform 上で何千もの顧客によって構築される RAG アプリケーションが爆発的に増加しています 。 本日、 DatabricksVector Searchの一般提供やモデルサービングのメジャーアップデートなど、...

Databricksで高品質のRAGアプリケーションを作成する

RAG(Retrieval-Augmented-Generation )は、独自のリアルタイムデータを LLM(Large Language Model) アプリケーションに組み込む強力な方法として、急速に台頭してきた。 本日Databricksユーザーが企業データを使用して高品質な本番LLMアプリケーションを構築するためのRAGツール群を発表できることを嬉しく思う。 LLMは、新しいアプリケーションを迅速にプロトタイプ化する能力において、大きなブレークスルーをもたらした。 しかし、RAGアプリケーションを構築している何千もの企業と仕事をした結果、彼らの最大の課題は、これらのアプリケーションを 本番で用いることができる品質にすること であることがわかった。 顧客向けアプリケーションに要求される品質基準を満たすためには、AIの出力は正確で、最新で、そして企業のコンテキストを認識し、安全でなければならない。 高品質なRAGアプリケーションを構築するためには、開発者はデータとモデル出力の品質を理解するための豊富なツール
プラットフォームブログ一覧へ