Embedding Model and Vector Database Selection and Implementation in RAGFlow

An in-depth look at how the RAGFlow project selects and configures its embedding models, together with the technical details of its vector database.
Core content:
1. Interface design and configuration of the Embeddings model in RAGFlow
2. Supported Embedding models and their characteristics
3. Selection and implementation analysis of the vector database
Based on the RAGFlow source code and official documentation, this article continues the analysis of RAGFlow by examining how it selects and configures embedding models and how it selects and implements its vector database.
1. Embeddings model selection and configuration
RAGFlow supports multiple embedding models. rag/llm/embedding_model.py implements a rich set of model interfaces and configuration options.
1.1 Model Architecture Design
RAGFlow adopts the abstract base class design pattern. The Base class defines the interface that all embedding models must implement:

class Base(ABC):
    def __init__(self, key, model_name):
        pass

    def encode(self, texts: list):
        raise NotImplementedError("Please implement encode method!")

    def encode_queries(self, text: str):
        raise NotImplementedError("Please implement encode method!")

    def total_token_count(self, resp):
        try:
            return resp.usage.total_tokens
        except Exception:
            pass
        try:
            return resp["usage"]["total_tokens"]
        except Exception:
            pass
        return 0

This design allows RAGFlow to easily support and extend different Embedding models by simply implementing specific interface methods.
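To make the extension point concrete, here is a minimal sketch of a new backend plugged into this interface. ToyHashEmbed, its dim parameter, and the whitespace token count are all hypothetical and exist only for illustration; the Base class is a reduced copy of the one shown above, and the (vectors, token_count) return shape follows the encode method quoted later in this article.

```python
import hashlib
from abc import ABC


class Base(ABC):
    """Reduced copy of the interface shown above."""

    def __init__(self, key, model_name):
        pass

    def encode(self, texts: list):
        raise NotImplementedError("Please implement encode method!")

    def encode_queries(self, text: str):
        raise NotImplementedError("Please implement encode method!")


class ToyHashEmbed(Base):
    """Hypothetical backend: deterministic pseudo-embeddings from SHA-256."""

    def __init__(self, key, model_name="toy-hash", dim=8):
        self.model_name = model_name
        self.dim = dim  # must be <= 32, the SHA-256 digest length in bytes

    def _vector(self, text: str) -> list:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        # Map the first `dim` bytes of the digest to floats in [0, 1].
        return [b / 255 for b in digest[: self.dim]]

    def encode(self, texts: list):
        vectors = [self._vector(t) for t in texts]
        token_count = sum(len(t.split()) for t in texts)  # crude whitespace count
        return vectors, token_count

    def encode_queries(self, text: str):
        return self._vector(text), len(text.split())


model = ToyHashEmbed(key="unused")
vectors, tokens = model.encode(["hello world", "RAGFlow"])
```

Only the two encode methods had to be written; any code that depends on the Base interface could use the new class without changes.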
1.2 Supported Embedding Models
As can be seen from the source code, RAGFlow supports the following Embedding models:
1. DefaultEmbedding: uses the default FlagEmbedding model, based on BAAI/bge-large-zh-v1.5.

class DefaultEmbedding(Base):
    os.environ['CUDA_VISIBLE_DEVICES'] = '0'
    _model = None
    _model_name = ""
    _model_lock = threading.Lock()

    def __init__(self, key, model_name, **kwargs):
        if not settings.LIGHTEN:
            with DefaultEmbedding._model_lock:
                from FlagEmbedding import FlagModel
                import torch
                if not DefaultEmbedding._model or model_name != DefaultEmbedding._model_name:
                    try:
                        DefaultEmbedding._model = FlagModel(os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/", "", model_name)),
                                                            query_instruction_for_retrieval="Generate a representation for this sentence to retrieve related articles:",
                                                            use_fp16=torch.cuda.is_available())
                        DefaultEmbedding._model_name = model_name
                    except Exception:
                        model_dir = snapshot_download(repo_id="BAAI/bge-large-zh-v1.5",
                                                      local_dir=os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/", "", model_name)),
                                                      local_dir_use_symlinks=False)
                        DefaultEmbedding._model = FlagModel(model_dir,
                                                            query_instruction_for_retrieval="Generate a representation for this sentence to retrieve related articles:",
                                                            use_fp16=torch.cuda.is_available())
        self._model = DefaultEmbedding._model
        self._model_name = DefaultEmbedding._model_name

2. OpenAIEmbed: uses OpenAI’s embedding models.

class OpenAIEmbed(Base):
    def __init__(self, key, model_name="text-embedding-ada-002", base_url="https://api.openai.com/v1"):
        if not base_url:
            base_url = "https://api.openai.com/v1"
        self.client = OpenAI(api_key=key, base_url=base_url)
        self.model_name = model_name

3. LocalAIEmbed: supports locally deployed embedding models.

class LocalAIEmbed(Base):
    def __init__(self, key, model_name, base_url):
        if not base_url:
            raise ValueError("Local embedding model url cannot be None")
        base_url = urljoin(base_url, "v1")
        self.client = OpenAI(api_key="empty", base_url=base_url)
        self.model_name = model_name.split("___")[0]

4. AzureEmbed: Azure OpenAI’s embedding models.

class AzureEmbed(OpenAIEmbed):
    def __init__(self, key, model_name, **kwargs):
        from openai.lib.azure import AzureOpenAI
        api_key = json.loads(key).get('api_key', '')
        api_version = json.loads(key).get('api_version', '2024-02-01')
        self.client = AzureOpenAI(api_key=api_key, azure_endpoint=kwargs["base_url"], api_version=api_version)
        self.model_name = model_name

5. BaiChuanEmbed: BaiChuan AI’s embedding model.

class BaiChuanEmbed(OpenAIEmbed):
    def __init__(self, key, model_name='Baichuan-Text-Embedding', base_url='https://api.baichuan-ai.com/v1'):
        if not base_url:
            base_url = "https://api.baichuan-ai.com/v1"
        super().__init__(key, model_name, base_url)

6. QWenEmbed: the embedding model for Tongyi Qianwen.

class QWenEmbed(Base):
    def __init__(self, key, model_name="text_embedding_v2", **kwargs):
        self.key = key
        self.model_name = model_name

7. ZhipuEmbed: Zhipu AI’s embedding model.

class ZhipuEmbed(Base):
    def __init__(self, key, model_name="embedding-2", **kwargs):
        self.client = ZhipuAI(api_key=key)
        self.model_name = model_name

8. OllamaEmbed: Ollama’s embedding models.

class OllamaEmbed(Base):
    def __init__(self, key, model_name, **kwargs):
        self.client = Client(host=kwargs["base_url"]) if not key or key == "x" else \
            Client(host=kwargs["base_url"], headers={"Authorization": f"Bear {key}"})
        self.model_name = model_name

9. GoogleEmbed: Google’s embedding model.

class GoogleEmbed(Base):
    def __init__(self, key, model_name="embedding-001", **kwargs):
        genai.configure(api_key=key)
        self.model_name = model_name

1.3 Batch Processing Optimization
RAGFlow uses batch optimization in the embedding model implementation to improve processing efficiency:
def encode(self, texts: list):
    batch_size = 16
    texts = [truncate(t, 2048) for t in texts]
    token_count = 0
    for t in texts:
        token_count += num_tokens_from_string(t)
    ress = []
    for i in range(0, len(texts), batch_size):
        ress.extend(self._model.encode(texts[i:i + batch_size]).tolist())
    return np.array(ress), token_count

Batching reduces the number of model invocations (API calls, in the case of hosted models) and improves efficiency. Different models implement different batch processing strategies, for example:
• DefaultEmbedding: batch size is 16
• OpenAIEmbed: batch size is 16, and text length is limited to 8191
• QWenEmbed: batch size is 4, and text length is limited to 2048
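The batching pattern itself is independent of any particular model. The sketch below factors it into a generic helper; encode_in_batches and fake_embed are hypothetical names, and fake_embed stands in for a real model or API call.

```python
def encode_in_batches(texts, embed_batch, batch_size=16):
    """Call embed_batch once per slice of at most batch_size texts."""
    vectors = []
    calls = 0
    for start in range(0, len(texts), batch_size):
        vectors.extend(embed_batch(texts[start:start + batch_size]))
        calls += 1
    return vectors, calls


def fake_embed(batch):
    # Stand-in for a model call: a one-dimensional "embedding" per text.
    return [[float(len(t))] for t in batch]


texts = [f"chunk {i}" for i in range(40)]
vectors, calls = encode_in_batches(texts, fake_embed, batch_size=16)
```

With 40 texts and a batch size of 16, the backend is invoked three times (16 + 16 + 8) instead of 40 times.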
1.4 Text length processing
RAGFlow handles text length restrictions for different models:
# In OpenAIEmbed
texts = [truncate(t, 8191) for t in texts]

# In QWenEmbed
texts = [truncate(t, 2048) for t in texts]

# In ZhipuEmbed
MAX_LEN = -1  # no truncation unless the model is listed below
if self.model_name.lower() == "embedding-2":
    MAX_LEN = 512
if self.model_name.lower() == "embedding-3":
    MAX_LEN = 3072
if MAX_LEN > 0:
    texts = [truncate(t, MAX_LEN) for t in texts]

This process ensures that the text does not exceed the maximum length limit of the model and avoids API call errors.
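The truncate helper itself is not shown in this article. As a rough sketch of the idea, the version below cuts on whitespace-separated words, which is an assumption made only to keep the example self-contained; a real implementation would count tokens with the model's tokenizer. The names truncate_by_words, MODEL_LIMITS and prepare are hypothetical.

```python
# Per-model limits, taken from the ZhipuEmbed excerpt above.
MODEL_LIMITS = {"embedding-2": 512, "embedding-3": 3072}


def truncate_by_words(text: str, max_len: int) -> str:
    """Keep at most max_len whitespace-separated words (collapses whitespace)."""
    return " ".join(text.split()[:max_len])


def prepare(texts, model_name: str):
    """Truncate texts only when the model has a known limit."""
    max_len = MODEL_LIMITS.get(model_name.lower(), -1)
    if max_len > 0:
        return [truncate_by_words(t, max_len) for t in texts]
    return list(texts)


long_text = "word " * 600
short = prepare([long_text], "embedding-2")[0]
```

Models without an entry in the table are passed through unchanged, mirroring the MAX_LEN > 0 check in the excerpt.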
1.5 Default model selection
According to the official documentation and source code, the Docker image of RAGFlow (non-slim version) comes pre-installed with two optimized embedding models:
1. BAAI/bge-large-zh-v1.5
2. maidalun1020/bce-embedding-base_v1
These two models are optimized for Chinese and English, providing good multi-language support. In the DefaultEmbedding class, BAAI/bge-large-zh-v1.5 acts as the default: if the requested model cannot be loaded from the local cache, BAAI/bge-large-zh-v1.5 is downloaded and used instead.
2. Vector database selection and implementation
RAGFlow adopts a flexible vector database architecture and supports multiple vector databases through abstract interfaces.
2.1 Database Abstract Interface
The abstract interface of the vector database is defined in rag/utils/doc_store_conn.py:

class DocStoreConnection(ABC):
    """
    Database operations
    """

    @abstractmethod
    def dbType(self) -> str:
        """
        Return the type of the database.
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def health(self) -> dict:
        """
        Return the health status of the database.
        """
        raise NotImplementedError("Not implemented")

    """
    Table operations
    """

    @abstractmethod
    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
        """
        Create an index with given name
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def deleteIdx(self, indexName: str, knowledgebaseId: str):
        """
        Delete an index with given name
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
        """
        Check if an index with given name exists
        """
        raise NotImplementedError("Not implemented")

    """
    CRUD operations
    """

    @abstractmethod
    def search(
            self, selectFields: list[str], highlightFields: list[str],
            condition: dict, matchExprs: list[MatchExpr],
            orderBy: OrderByExpr, offset: int, limit: int,
            indexNames: str | list[str], knowledgebaseIds: list[str],
            aggFields: list[str] = [], rank_feature: dict | None = None
    ):
        """
        Search with given conjunctive equivalent filtering condition and return all fields of matched documents
        """
        raise NotImplementedError("Not implemented")

This abstract interface design allows RAGFlow to easily support different vector databases by simply implementing specific interface methods.
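As an illustration of what implementing the interface involves, the sketch below defines a reduced, hypothetical version of the contract (index operations only) and an in-memory backend for it. MiniDocStore and InMemoryStore are not part of RAGFlow; the method names are kept in the same style as the interface above.

```python
from abc import ABC, abstractmethod


class MiniDocStore(ABC):
    """Reduced, hypothetical version of the interface: index operations only."""

    @abstractmethod
    def dbType(self) -> str: ...

    @abstractmethod
    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int): ...

    @abstractmethod
    def deleteIdx(self, indexName: str, knowledgebaseId: str): ...

    @abstractmethod
    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool: ...


class InMemoryStore(MiniDocStore):
    """Toy backend that keeps index metadata in a dict."""

    def __init__(self):
        self._indexes = {}

    def dbType(self) -> str:
        return "memory"

    def createIdx(self, indexName, knowledgebaseId, vectorSize):
        if self.indexExist(indexName, knowledgebaseId):
            return True  # idempotent: creating an existing index is a no-op
        self._indexes[indexName] = {"vector_size": vectorSize, "rows": []}
        return True

    def deleteIdx(self, indexName, knowledgebaseId):
        self._indexes.pop(indexName, None)

    def indexExist(self, indexName, knowledgebaseId) -> bool:
        return indexName in self._indexes


store = InMemoryStore()
created = store.createIdx("ragflow_demo", "kb1", 1024)
```

Because callers only see the abstract methods, a backend like this could be swapped for a real database without touching the calling code.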
2.2 OpenSearch Implementation
RAGFlow uses OpenSearch as the vector database by default. The implementation is in rag/utils/opensearch_coon.py:

@singleton
class OSConnection(DocStoreConnection):
    def __init__(self):
        self.info = {}
        logger.info(f"Use OpenSearch {settings.OS['hosts']} as the doc engine.")
        for _ in range(ATTEMPT_TIME):
            try:
                self.os = OpenSearch(
                    settings.OS["hosts"].split(","),
                    http_auth=(settings.OS["username"], settings.OS["password"]) if "username" in settings.OS and "password" in settings.OS else None,
                    verify_certs=False,
                    timeout=600
                )
                if self.os:
                    self.info = self.os.info()
                    break
            except Exception as e:
                logger.warning(f"{str(e)}. Waiting OpenSearch {settings.OS['hosts']} to be healthy.")
                time.sleep(5)

OSConnection implements all the required interface methods, including index creation, deletion, and search.
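The constructor's retry loop follows a common pattern: attempt the connection a fixed number of times and wait between failures. A standalone sketch of that pattern is shown here; connect_with_retry and flaky_connect are hypothetical, and raising ConnectionError once the attempts are exhausted is a design choice of this sketch, not something shown in the excerpt above.

```python
import time


def connect_with_retry(connect, attempts=3, delay=5.0, sleep=time.sleep):
    """Call connect() until it returns a truthy client or attempts run out."""
    last_error = None
    for _ in range(attempts):
        try:
            client = connect()
            if client:
                return client
        except Exception as e:
            last_error = e
        sleep(delay)  # wait before the next attempt
    raise ConnectionError(f"gave up after {attempts} attempts") from last_error


calls = {"n": 0}


def flaky_connect():
    # Simulated backend that only becomes healthy on the third call.
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError("not ready")
    return {"status": "green"}


client = connect_with_retry(flaky_connect, attempts=5, delay=0.0)
```

Passing the sleep function as a parameter keeps the helper easy to test without real waiting.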
2.3 Index Design
RAGFlow creates a separate index for each knowledge base, using the naming convention ragflow_{uid} to ensure index uniqueness:

def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
    if self.indexExist(indexName, knowledgebaseId):
        return True
    try:
        from opensearchpy.client import IndicesClient
        return IndicesClient(self.os).create(index=indexName, body=self.mapping)
    except Exception:
        logger.exception("OSConnection.createIndex error %s" % (indexName))

The index structure is defined in the configuration file conf/os_mapping.json, which ensures correct storage and retrieval of vector data.
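The contents of conf/os_mapping.json are not reproduced in this article. Purely as an illustration, the sketch below builds an index name following the ragflow_{uid} convention and a minimal OpenSearch mapping with a knn_vector field. The field name "embedding" and the overall shape are assumptions and are not taken from RAGFlow's actual mapping file; kb_id is the filter field used by the search code in the next section.

```python
def index_name(uid: str) -> str:
    """Index name following the ragflow_{uid} convention described above."""
    return f"ragflow_{uid}"


def knn_mapping(vector_size: int, field: str = "embedding") -> dict:
    """Minimal illustrative mapping body; not the real os_mapping.json."""
    return {
        "settings": {"index": {"knn": True}},  # enable k-NN search on the index
        "mappings": {
            "properties": {
                field: {"type": "knn_vector", "dimension": vector_size},
                "kb_id": {"type": "keyword"},
            }
        },
    }


name = index_name("tenant42")
body = knn_mapping(1024)
```

The dimension in the mapping has to match the output size of the embedding model, which is why createIdx receives a vectorSize argument.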
2.4 Hybrid search strategy
RAGFlow implements a hybrid retrieval strategy that combines keyword search and vector similarity search:
def search(
        self, selectFields: list[str], highlightFields: list[str],
        condition: dict, matchExprs: list[MatchExpr],
        orderBy: OrderByExpr, offset: int, limit: int,
        indexNames: str | list[str], knowledgebaseIds: list[str],
        aggFields: list[str] = [], rank_feature: dict | None = None
):
    use_knn = False
    if isinstance(indexNames, str):
        indexNames = indexNames.split(",")
    assert isinstance(indexNames, list) and len(indexNames) > 0
    assert "_id" not in condition
    bqry = Q("bool", must=[])
    condition["kb_id"] = knowledgebaseIds
    # ... construct the query...
    s = Search()
    vector_similarity_weight = 0.5
    for m in matchExprs:
        if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
            assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], MatchDenseExpr) and isinstance(matchExprs[2], FusionExpr)
            weights = m.fusion_params["weights"]
            vector_similarity_weight = float(weights.split(",")[1])
    knn_query = {}
    for m in matchExprs:
        if isinstance(m, MatchTextExpr):
            # Keyword search
            minimum_should_match = m.extra_options.get("minimum_should_match", 0.0)
            if isinstance(minimum_should_match, float):
                minimum_should_match = str(int(minimum_should_match * 100)) + "%"
            bqry.must.append(Q("query_string", fields=m.fields, type="best_fields", query=m.matching_text, minimum_should_match=minimum_should_match, boost=1))
            bqry.boost = 1.0 - vector_similarity_weight
        elif isinstance(m, MatchDenseExpr):
            # Vector similarity search
            assert (bqry is not None)
            similarity = 0.0
            if "similarity" in m.extra_options:
                similarity = m.extra_options["similarity"]
            use_knn = True
            vector_column_name = m.vector_column_name
            knn_query[vector_column_name] = {}
            knn_query[vector_column_name]["vector"] = list(m.embedding_data)
            knn_query[vector_column_name]["k"] = m.topn
            knn_query[vector_column_name]["filter"] = bqry.to_dict()
            knn_query[vector_column_name]["boost"] = similarity

This hybrid retrieval strategy can improve retrieval quality by combining the precision of keyword search and the semantic understanding ability of vector similarity search.
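In the code above, the weighting is expressed through query boosts and evaluated by the search engine. To show the arithmetic behind a weighted-sum fusion, here is a small client-side sketch; the function weighted_sum_fusion and the example scores are hypothetical and assume both score sets are already normalized to a comparable range.

```python
def weighted_sum_fusion(keyword_scores: dict, vector_scores: dict, vector_weight: float = 0.5):
    """Combine two score maps as (1 - w) * keyword + w * vector, best first."""
    keyword_weight = 1.0 - vector_weight
    doc_ids = set(keyword_scores) | set(vector_scores)
    fused = {
        doc_id: keyword_weight * keyword_scores.get(doc_id, 0.0)
        + vector_weight * vector_scores.get(doc_id, 0.0)
        for doc_id in doc_ids
    }
    return sorted(fused.items(), key=lambda item: item[1], reverse=True)


keyword_hits = {"a": 0.8, "b": 0.2}   # found by keyword search
vector_hits = {"b": 0.9, "c": 0.6}    # found by vector search
ranked = weighted_sum_fusion(keyword_hits, vector_hits, vector_weight=0.5)
```

Document "b" ranks first because it appears in both result sets, even though it is not the top keyword hit.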
2.5 Weight Configuration
RAGFlow supports configuring the weights of keyword search and vector similarity search:
vector_similarity_weight = 0.5
for m in matchExprs:
    if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
        assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], MatchDenseExpr) and isinstance(matchExprs[2], FusionExpr)
        weights = m.fusion_params["weights"]
        vector_similarity_weight = float(weights.split(",")[1])

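If no FusionExpr supplies weights, the code falls back to a vector similarity weight of 0.5, which leaves 0.5 for keyword search because the keyword query is boosted by 1.0 - vector_similarity_weight. Callers can pass different weights through the FusionExpr to optimize the search results.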
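The weight handling can be isolated into a small function. The sketch below parses a comma-separated weights string in the same way as the loop above, taking the second entry as the vector weight, and derives the keyword weight from it. split_weights and the sample string are hypothetical.

```python
def split_weights(weights: str, default_vector_weight: float = 0.5):
    """Return (keyword_weight, vector_weight) from a 'keyword,vector' string."""
    parts = weights.split(",")
    if len(parts) < 2:
        vector_weight = default_vector_weight  # fall back when no vector weight is given
    else:
        vector_weight = float(parts[1])
    return 1.0 - vector_weight, vector_weight


keyword_weight, vector_weight = split_weights("0.7,0.3")
```

Note that only the second entry is read; the keyword weight is always computed as the complement, so the two always sum to 1.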
2.6 Batch operation optimization
RAGFlow implements batch operations to improve write efficiency:
def insert(self, rows: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]:
    """
    Update or insert a bulk of rows
    """
    if len(rows) == 0:
        return []
    actions = []
    ids = []
    for row in rows:
        if "_id" in row:
            _id = row["_id"]
            del row["_id"]
        else:
            _id = None
        action = {
            "_index": indexName,
            "_source": row
        }
        if _id:
            action["_id"] = _id
            ids.append(_id)
        actions.append(action)
    try:
        from opensearchpy.helpers import bulk
        success, failed = bulk(self.os, actions, stats_only=True)
        return ids
    except Exception:
        logger.exception("OSConnection.insert error")
        return []

This batch operation method can reduce the number of API calls and improve writing efficiency.
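The action-building step can be tested in isolation from the database. The sketch below reproduces it as a pure function; build_bulk_actions is a hypothetical name, and unlike the method above it copies each row instead of deleting "_id" from the caller's dict, which is a deliberate difference of this sketch.

```python
def build_bulk_actions(rows, index_name):
    """Turn rows into bulk actions; returns (actions, ids of rows that had an _id)."""
    actions, ids = [], []
    for row in rows:
        source = dict(row)                # shallow copy, so the input row is not mutated
        doc_id = source.pop("_id", None)  # "_id" is metadata, not part of the document body
        action = {"_index": index_name, "_source": source}
        if doc_id:
            action["_id"] = doc_id
            ids.append(doc_id)
        actions.append(action)
    return actions, ids


rows = [
    {"_id": "c1", "content": "first chunk"},
    {"content": "chunk without an explicit id"},
]
actions, ids = build_bulk_actions(rows, "ragflow_demo")
```

The resulting list is what would be handed to a bulk helper in a single request, rather than issuing one write per row.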
3. Configuration and Integration
3.1 Embedding Model Configuration
According to the official documentation, RAGFlow allows you to choose different embedding models for different knowledge bases:
An embedding model converts chunks into embeddings. It cannot be changed once the knowledge base has chunks. To switch to a different embedding model, you must delete all existing chunks in the knowledge base. The obvious reason is that we must ensure that files in a specific knowledge base are converted to embeddings using the same embedding model (ensure that they are compared in the same embedding space).
This design ensures that all documents in the same knowledge base use the same embedding model, ensuring the consistency of the vector space.
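The rule quoted above amounts to a simple guard. The sketch below shows one way such a check could look; the KnowledgeBase dataclass and change_embedding_model function are hypothetical and do not reflect RAGFlow's actual data model.

```python
from dataclasses import dataclass


@dataclass
class KnowledgeBase:
    name: str
    embedding_model: str
    chunk_count: int = 0


def change_embedding_model(kb: KnowledgeBase, new_model: str) -> None:
    """Allow a model switch only while the knowledge base has no chunks."""
    if new_model == kb.embedding_model:
        return
    if kb.chunk_count > 0:
        raise ValueError(
            f"{kb.name!r} already has {kb.chunk_count} chunks embedded with "
            f"{kb.embedding_model!r}; delete them before switching models"
        )
    kb.embedding_model = new_model


empty_kb = KnowledgeBase("drafts", "BAAI/bge-large-zh-v1.5")
change_embedding_model(empty_kb, "maidalun1020/bce-embedding-base_v1")
```

Rejecting the change outright is safer than silently mixing vectors from two models, since distances between vectors from different embedding spaces are not meaningful.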
3.2 Vector database configuration
RAGFlow sets the vector database connection parameters through the configuration file:
self.os = OpenSearch(
    settings.OS["hosts"].split(","),
    http_auth=(settings.OS["username"], settings.OS["password"]) if "username" in settings.OS and "password" in settings.OS else None,
    verify_certs=False,
    timeout=600
)

These parameters can be set via environment variables or configuration files, allowing RAGFlow to flexibly connect to different OpenSearch instances.
3.3 Retrieval parameter configuration
RAGFlow supports configuring a variety of search parameters, such as similarity threshold, vector similarity weight, etc.:
RAGFlow uses multiple recall of both full-text search and vector search in its chats. Prior to setting up an AI chat, consider adjusting the following parameters to ensure that the intended information always turns up in answers:
* Similarity threshold: Chunks with similarities below the threshold will be filtered. By default, it is set to 0.2.
* Vector similarity weight: The percentage by which vector similarity contributes to the overall score. By default, it is set to 0.3.
These parameters can be adjusted according to specific needs to optimize the search results.
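To see how the two documented defaults interact, the sketch below scores a few chunks with a vector similarity weight of 0.3 and drops those below a threshold of 0.2. Applying the threshold to the combined score is an assumption of this sketch, and filter_chunks, term_sim and vector_sim are hypothetical names.

```python
def filter_chunks(chunks, similarity_threshold=0.2, vector_similarity_weight=0.3):
    """Score each chunk as a weighted sum and keep those at or above the threshold."""
    kept = []
    for chunk in chunks:
        score = (
            (1.0 - vector_similarity_weight) * chunk["term_sim"]
            + vector_similarity_weight * chunk["vector_sim"]
        )
        if score >= similarity_threshold:
            kept.append((chunk["id"], score))
    return sorted(kept, key=lambda item: item[1], reverse=True)


chunks = [
    {"id": "a", "term_sim": 0.1, "vector_sim": 0.2},  # combined 0.13, filtered out
    {"id": "b", "term_sim": 0.5, "vector_sim": 0.9},  # combined 0.62
    {"id": "c", "term_sim": 0.6, "vector_sim": 0.2},  # combined 0.48
]
results = filter_chunks(chunks)
```

Raising the threshold trades recall for precision, while raising the vector weight favors semantically similar chunks over exact keyword matches.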
4. Conclusion
RAGFlow has the following features in terms of Embeddings model selection and configuration, as well as vector database selection and implementation:
1. Multi-model support: Supports multiple embedding models, including OpenAI, Zhipu AI, Tongyi Qianwen, etc., as well as locally deployed models.
2. Batch processing optimization: Batch encoding is implemented to improve efficiency, and long text truncation is handled automatically.
3. Consistency guarantee: All documents in the same knowledge base use the same embedding model, which keeps the vector space consistent.
4. Flexible configuration: Different embedding models can be selected for different knowledge bases, and model parameters can be adjusted through configuration files.
5. OpenSearch is used by default: As the default vector database, it provides high-performance retrieval capabilities and good scalability.
6. Hybrid retrieval strategy: Keyword search and vector similarity search are combined, improving retrieval quality through weighted fusion.
7. Batch operation optimization: Batch operations are implemented to improve writing efficiency and reduce the number of API calls.
These features enable RAGFlow to flexibly adapt to different application scenarios and provide high-quality retrieval results.