6 changes: 5 additions & 1 deletion packages/dbgpt-ext/pyproject.toml
@@ -77,7 +77,11 @@ storage_chromadb = [
"onnxruntime>=1.14.1,<=1.18.1",
"chromadb>=0.4.22"
]
storage_elasticsearch = ["elasticsearch"]
storage_elasticsearch = [
"elasticsearch==8.17.1",
"langchain==0.3.19",
"langchain-community==0.3.18",
]
storage_obvector = ["pyobvector"]

file_oss = [
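Note: the extra now pins exact versions instead of an unconstrained `elasticsearch`. A quick post-install sanity check (a minimal sketch; expected versions are taken from the pins above, and the package names are the PyPI distributions, not import names):

```python
# Verify the pinned dependencies resolved as expected (versions per the pins above).
from importlib.metadata import version

for pkg in ("elasticsearch", "langchain", "langchain-community"):
    print(pkg, version(pkg))  # expect 8.17.1, 0.3.19, 0.3.18 respectively
```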
@@ -65,6 +65,10 @@ def __init__(
            hosts=[f"http://{self._es_url}:{self._es_port}"],
        )
        self._es_index_settings = {
+            "number_of_shards": 1,
+            # Replica count: 0 avoids yellow cluster status on a standalone ES node.
+            # TODO: make this configurable via the config toml
+            "number_of_replicas": 0,
            "analysis": {"analyzer": {"default": {"type": "standard"}}},
            "similarity": {
                "custom_bm25": {
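The `number_of_replicas: 0` setting matters on a single-node cluster: with Elasticsearch's default of 1 replica, replica shards can never be assigned, so the index stays yellow. A minimal sketch to confirm the effect, assuming a local standalone dev node with security disabled and an illustrative index name:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed standalone dev node

# With "number_of_replicas": 0 the index can reach "green" on one node;
# with the default of 1 it would stay "yellow" (unassigned replica shards).
es.indices.create(index="demo_index", settings={"number_of_replicas": 0})
print(es.cluster.health(index="demo_index")["status"])  # expect "green"
```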
@@ -4,6 +4,7 @@

import logging
import os
+import traceback
from dataclasses import dataclass, field
from typing import Any, List, Optional
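The hunks below pass `exc_info=True` and also log `traceback.format_exc()`. For reference, a small illustrative sketch of what each mechanism captures; inside an `except` block either one alone is enough to record the stack trace:

```python
import logging
import traceback

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

try:
    raise RuntimeError("boom")
except Exception as e:
    logger.error(f"failed: {e}", exc_info=True)  # logging renders the traceback
    logger.error(traceback.format_exc())         # same traceback as a plain string
```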

@@ -236,12 +237,9 @@ def __init__(
f"http://{self.uri}:{self.port}",
basic_auth=(self.username, self.password),
)
# create es index
self.create_collection(collection_name=self.index_name)
else:
logger.warning("ElasticSearch not set username and password")
self.es_client_python = Elasticsearch(f"http://{self.uri}:{self.port}")
self.create_collection(collection_name=self.index_name)
except ConnectionError:
logger.error("ElasticSearch connection failed")
except Exception as e:
@@ -298,6 +296,12 @@ def load_document(
"Please install it with `pip install langchain` and "
"`pip install elasticsearch`."
)
try:
# create es index
self.create_collection(collection_name=self.index_name)
except Exception as e:
logger.error(f"Try create es index failed : {e}", exc_info=True)
logger.error(traceback.format_exc())
try:
texts = [chunk.content for chunk in chunks]
metadatas = [chunk.metadata for chunk in chunks]
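This hunk moves index creation out of `__init__` and into `load_document`, logging failures instead of aborting ingestion when the index already exists. An alternative sketch of the same idea using an explicit existence check (the helper name, client, and settings here are illustrative, not part of the PR):

```python
# Hypothetical helper: create the index only when it is missing, instead of
# relying on try/except around an unconditional create.
from elasticsearch import Elasticsearch

def ensure_index(es: Elasticsearch, index_name: str) -> None:
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name, settings={"number_of_replicas": 0})
```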
@@ -346,6 +350,7 @@ def load_document(
logger.error(f"ElasticSearch connect failed {ce}")
except Exception as e:
logger.error(f"ElasticSearch load_document failed : {e}")
logger.error(traceback.format_exc())
return []

    def delete_by_ids(self, ids):
@@ -365,7 +370,7 @@ def similar_search(
        filters: Optional[MetadataFilters] = None,
    ) -> List[Chunk]:
        """Perform a search on a query string and return results."""
-        info_docs = self._search(query=text, topk=topk, filters=filters)
+        info_docs = self._vector_search(query=text, topk=topk, filters=filters)
        return info_docs

    def similar_search_with_scores(
@@ -385,7 +390,7 @@ def similar_search_with_scores(
            List[Chunk]: Result doc and score.
        """
        query = text
-        info_docs = self._search(query=query, topk=topk, filters=filters)
+        info_docs = self._vector_search(query=query, topk=topk, filters=filters)
        docs_and_scores = [
            chunk for chunk in info_docs if chunk.score >= score_threshold
        ]
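Because `_vector_search` rescales cosine similarity into [0, 1] (see the hunk below), `score_threshold` is compared on that scale. A hypothetical call, assuming an initialized `ElasticStore` instance named `store`:

```python
# Hypothetical usage; `store` is an initialized ElasticStore.
chunks = store.similar_search_with_scores(
    text="how are vectors scored?",
    topk=4,
    score_threshold=0.6,  # keep hits with normalized cosine >= 0.6
)
for chunk in chunks:
    print(round(chunk.score, 3), chunk.content[:60])
```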
@@ -439,6 +444,67 @@ def _search(
            info_docs.append(doc_with_score)
        return info_docs

+    def _vector_search(
+        self, query: str, topk: int, filters: Optional[MetadataFilters] = None, **kwargs
+    ) -> List[Chunk]:
+        """Search similar documents.
+
+        Args:
+            query: query text
+            topk: number of docs to return. Defaults to 4.
+            filters: metadata filters.
+        Return:
+            List[Chunk]: list of chunks
+        """
+        # Convert the query text to a vector using the embedding function
+        query_vector = self.embedding.embed_query(query)
+
+        # Prepare the script_score query that computes vector similarity
+        script_score_query = {
+            "script_score": {
+                "query": {"match_all": {}},
+                "script": {
+                    "source": "(cosineSimilarity(params.query_vector, 'dense_vector')"
+                    " + 1.0)/2.0",
+                    "params": {"query_vector": query_vector},
+                },
+            }
+        }
+
+        # Prepare the body for the search query
+        body = {"query": script_score_query}
+
+        # Combine the metadata filter with the script score when filters are given
+        if filters:
+            where_filters = self.convert_metadata_filters(filters)
+            body["query"] = {
+                "bool": {
+                    "filter": [{"terms": where_filters}],
+                    "must": [script_score_query],
+                }
+            }
+
+        search_results = self.es_client_python.search(
+            index=self.index_name, body=body, size=topk
+        )
+        search_results = search_results["hits"]["hits"]
+
+        if not search_results:
+            logger.warning("No ElasticSearch results found.")
+            return []
+        info_docs = []
+        for result in search_results:
+            doc_id = result["_id"]
+            source = result["_source"]
+            context = source["context"]
+            metadata = source["metadata"]
+            score = result["_score"]
+            doc_with_score = Chunk(
+                content=context, metadata=metadata, score=score, chunk_id=doc_id
+            )
+            info_docs.append(doc_with_score)
+        return info_docs

    def vector_name_exists(self):
        """Whether vector name exists."""
        return self.es_client_python.indices.exists(index=self.index_name)
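For reference, the painless script `(cosineSimilarity(params.query_vector, 'dense_vector') + 1.0)/2.0` exists because Elasticsearch requires `script_score` scripts to return a non-negative value, while cosine similarity ranges over [-1, 1]; shifting and halving maps it onto [0, 1]. A pure-Python sketch of the same formula (illustrative only, not the script itself):

```python
import math

def normalized_cosine(u: list[float], v: list[float]) -> float:
    """Cosine similarity rescaled from [-1, 1] to [0, 1], as in the script above."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.hypot(*u) * math.hypot(*v)
    return (dot / norm + 1.0) / 2.0

print(normalized_cosine([1.0, 0.0], [1.0, 0.0]))   # identical  -> 1.0
print(normalized_cosine([1.0, 0.0], [0.0, 1.0]))   # orthogonal -> 0.5
print(normalized_cosine([1.0, 0.0], [-1.0, 0.0]))  # opposite   -> 0.0
```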