チュートリアル: ElasticsearchでCohereを使用する

このチュートリアルの指示では、推論APIを使用してCohereで埋め込みを計算し、Elasticsearchで効率的なベクトルまたはハイブリッド検索のためにそれらを保存する方法を示します。このチュートリアルでは、Python Elasticsearchクライアントを使用して操作を実行します。

あなたは以下のことを学びます:

  • Cohereサービスを使用してテキスト埋め込みのための推論エンドポイントを作成すること、
  • Elasticsearchインデックスのために必要なインデックスマッピングを作成すること、
  • 埋め込みと共にドキュメントをインデックスに取り込むための推論パイプラインを構築すること、
  • データに対してハイブリッド検索を実行すること、
  • Cohereの再ランクモデルを使用して検索結果を再ランクすること、
  • CohereのチャットAPIを使用してRAGシステムを設計すること。

このチュートリアルでは、SciFactデータセットを使用します。

異なるデータセットを使用した例については、Cohereのチュートリアルを参照してください。

このチュートリアルのColabノートブック版も確認できます。

要件

  • Cohereサービスで推論APIを使用するには、有料のCohereアカウントが必要です。Cohereの無料トライアルAPIの使用は制限されています。
  • Elastic Cloudアカウントが必要です。
  • Python 3.7以上が必要です。

必要なパッケージをインストールする

ElasticsearchとCohereをインストールします:

Py

  1. !pip install elasticsearch
  2. !pip install cohere

必要なパッケージをインポートします:

Py

  1. from elasticsearch import Elasticsearch, helpers
  2. import cohere
  3. import json
  4. import requests

Elasticsearchクライアントを作成する

Elasticsearchクライアントを作成するには、次のものが必要です:

Py

  1. ELASTICSEARCH_ENDPOINT = "elastic_endpoint"
  2. ELASTIC_API_KEY = "elastic_api_key"
  3. client = Elasticsearch(
  4. cloud_id=ELASTICSEARCH_ENDPOINT,
  5. api_key=ELASTIC_API_KEY
  6. )
  7. # クライアントが接続されていることを確認する
  8. print(client.info())

推論エンドポイントを作成する

推論エンドポイントを作成することから始めます。この例では、推論エンドポイントはCohereのembed-english-v3.0モデルを使用し、embedding_typebyteに設定されています。

Py

  1. COHERE_API_KEY = "cohere_api_key"
  2. client.inference.put_model(
  3. task_type="text_embedding",
  4. inference_id="cohere_embeddings",
  5. body={
  6. "service": "cohere",
  7. "service_settings": {
  8. "api_key": COHERE_API_KEY,
  9. "model_id": "embed-english-v3.0",
  10. "embedding_type": "byte"
  11. }
  12. },
  13. )

APIキーは、CohereダッシュボードのAPIキーセクションで見つけることができます。

インデックスマッピングを作成する

埋め込みを含むインデックスのためのインデックスマッピングを作成します。

Py

  1. client.indices.create(
  2. index="cohere-embeddings",
  3. settings={"index": {"default_pipeline": "cohere_embeddings"}},
  4. mappings={
  5. "properties": {
  6. "text_embedding": {
  7. "type": "dense_vector",
  8. "dims": 1024,
  9. "element_type": "byte",
  10. },
  11. "text": {"type": "text"},
  12. "id": {"type": "integer"},
  13. "title": {"type": "text"}
  14. }
  15. },
  16. )

推論パイプラインを作成する

これで、埋め込みを保存するための推論エンドポイントとインデックスが準備できました。次のステップは、推論エンドポイントを使用して埋め込みを作成し、それらをインデックスに保存する取り込みパイプラインを作成することです。

Py

  1. client.ingest.put_pipeline(
  2. id="cohere_embeddings",
  3. description="Ingest pipeline for Cohere inference.",
  4. processors=[
  5. {
  6. "inference": {
  7. "model_id": "cohere_embeddings",
  8. "input_output": {
  9. "input_field": "text",
  10. "output_field": "text_embedding",
  11. },
  12. }
  13. }
  14. ],
  15. )

データを準備し、ドキュメントを挿入する

この例では、HuggingFaceで見つけることができるSciFactデータセットを使用します。

Py

  1. url = 'https://huggingface.co/datasets/mteb/scifact/raw/main/corpus.jsonl'
  2. # URLからJSONLデータを取得する
  3. response = requests.get(url)
  4. response.raise_for_status() # 不正なレスポンスを確認する
  5. # 内容を改行で分割し、各行をJSONとして解析する
  6. data = [json.loads(line) for line in response.text.strip().split('\n') if line]
  7. # データは辞書のリストです
  8. # _idキーをidに変更します。_idはElasticsearchで予約されたキーです。
  9. for item in data:
  10. if '_id' in item:
  11. item['id'] = item.pop('_id')
  12. # インデックスされるドキュメントを準備する
  13. documents = []
  14. for line in data:
  15. data_dict = line
  16. documents.append({
  17. "_index": "cohere-embeddings",
  18. "_source": data_dict,
  19. }
  20. )
  21. # バルクエンドポイントを使用してインデックスする
  22. helpers.bulk(client, documents)
  23. print("データの取り込みが完了し、テキスト埋め込みが生成されました!")

あなたのインデックスは、SciFactデータとテキストフィールドのテキスト埋め込みで満たされています。

ハイブリッド検索

インデックスをクエリし始めましょう!

以下のコードはハイブリッド検索を実行します。kNNクエリは、text_embeddingフィールドを使用してベクトルの類似性に基づいて検索結果の関連性を計算し、レキシカル検索クエリはtitleおよびtextフィールドでキーワードの類似性を計算するためにBM25検索を使用します。

Py

  1. query = "バイオ類似性とは何ですか?"
  2. response = client.search(
  3. index="cohere-embeddings",
  4. size=100,
  5. knn={
  6. "field": "text_embedding",
  7. "query_vector_builder": {
  8. "text_embedding": {
  9. "model_id": "cohere_embeddings",
  10. "model_text": query,
  11. }
  12. },
  13. "k": 10,
  14. "num_candidates": 50,
  15. },
  16. query={
  17. "multi_match": {
  18. "query": query,
  19. "fields": ["text", "title"]
  20. }
  21. }
  22. )
  23. raw_documents = response["hits"]["hits"]
  24. # 最初の10件の結果を表示する
  25. for document in raw_documents[0:10]:
  26. print(f'タイトル: {document["_source"]["title"]}\nテキスト: {document["_source"]["text"]}\n')
  27. # ランキングのためにドキュメントをフォーマットする
  28. documents = []
  29. for hit in response["hits"]["hits"]:
  30. documents.append(hit["_source"]["text"])

検索結果を再ランクする

結果をより効果的に組み合わせるために、推論APIを通じてCohereのRerank v3モデルを使用して、結果のより正確な意味的再ランクを提供します。

Cohere APIキーと使用するモデル名をmodel_id(この例ではrerank-english-v3.0)として推論エンドポイントを作成します。

Py

  1. client.inference.put_model(
  2. task_type="rerank",
  3. inference_id="cohere_rerank",
  4. body={
  5. "service": "cohere",
  6. "service_settings":{
  7. "api_key": COHERE_API_KEY,
  8. "model_id": "rerank-english-v3.0"
  9. },
  10. "task_settings": {
  11. "top_n": 10,
  12. },
  13. }
  14. )

新しい推論エンドポイントを使用して結果を再ランクします。

Py

  1. # クエリと検索結果をサービスに渡す
  2. response = client.inference.inference(
  3. inference_id="cohere_rerank",
  4. body={
  5. "query": query,
  6. "input": documents,
  7. "task_settings": {
  8. "return_documents": False
  9. }
  10. }
  11. )
  12. # 再ランク応答で提供されたインデックスに基づいて入力ドキュメントを再構築する
  13. ranked_documents = []
  14. for document in response.body["rerank"]:
  15. ranked_documents.append({
  16. "title": raw_documents[int(document["index"])]["_source"]["title"],
  17. "text": raw_documents[int(document["index"])]["_source"]["text"]
  18. })
  19. # 上位10件の結果を印刷する
  20. for document in ranked_documents[0:10]:
  21. print(f"タイトル: {document['title']}\nテキスト: {document['text']}\n")

応答は、関連性の降順でドキュメントのリストです。各ドキュメントには、推論エンドポイントに送信されたときのドキュメントの順序を反映するインデックスが対応しています。

CohereとElasticsearchを使用した情報検索強化生成 (RAG)

RAGは、外部データソースから取得した追加情報を使用してテキストを生成する方法です。ランク付けされた結果を使用して、CohereのチャットAPIを使用して以前に作成したものの上にRAGシステムを構築できます。

取得したドキュメントとクエリを渡して、Cohereの最新の生成モデルCommand R+を使用して基盤となる応答を受け取ります。

次に、クエリとドキュメントをチャットAPIに渡し、応答を印刷します。

Py

  1. response = co.chat(message=query, documents=ranked_documents, model='command-r-plus')
  2. source_documents = []
  3. for citation in response.citations:
  4. for document_id in citation.document_ids:
  5. if document_id not in source_documents:
  6. source_documents.append(document_id)
  7. print(f"Query: {query}")
  8. print(f"Response: {response.text}")
  9. print("Sources:")
  10. for document in response.documents:
  11. if document['id'] in source_documents:
  12. print(f"{document['title']}: {document['text']}")

応答はこのようになります:

コンソール結果

  1. Query: What is biosimilarity?
  2. Response: Biosimilarity is based on the comparability concept, which has been used successfully for several decades to ensure close similarity of a biological product before and after a manufacturing change. Over the last 10 years, experience with biosimilars has shown that even complex biotechnology-derived proteins can be copied successfully.
  3. Sources:
  4. Interchangeability of Biosimilars: A European Perspective: (...)