RAGFlow architecture analysis and performance optimization methods

Deepen your understanding of the RagFlow architecture and improve data processing and retrieval performance.
Core content:
1. Analysis of the functions and core processes of the RagFlow data processing module
2. Introduction to key classes and methods of DocumentService
3. Performance optimization strategies and secondary development suggestions
There are many easy-to-use RAG systems, and RAGFlow is one of the most representative. If you want to use RAGFlow in your own production system, what are the basics you need to understand? Over the weekend I revisited some notes I wrote a while ago and am sharing my understanding here. Because RAGFlow is a large codebase, I focus on the design and implementation of its data parsing/processing, Retriever, and Embedding modules. Readers interested in performance optimization or secondary development can build on this.
Data processing module
Module functions and processes: The data processing module of RAGFlow is responsible for loading raw data (documents, images, audio, etc.) and preprocessing it into text "slices" suitable for retrieval. Its core steps are: data loading (reading file contents), parsing and segmentation (extracting text and related information according to file type and splitting it into segments according to a strategy), vectorized embedding (generating a semantic vector for each segment), and persistent storage (writing segments and vectors into a vector index or database). This process is driven by the DocumentService service class, which works together with the parsers for the various file types.
Key classes and methods: DocumentService (api/db/services/document_service.py) is the core scheduler for data processing. It uses a parser factory to select the appropriate parser for each document type and processes uploaded documents in parallel. DocumentService maintains a parser mapping table (FACTORY) that maps a file's parser_id to a concrete parser module: Word/PDF documents go through the general text parser, PPT files use the presentation parser, images use the picture parser, audio uses the audio parser, and so on.
FACTORY = {
    ParserType.PRESENTATION.value: presentation,
    ParserType.PICTURE.value: picture,
    ParserType.AUDIO.value: audio,
    ParserType.EMAIL.value: email
}
parser_config = {"chunk_token_num": 4096, "delimiter": "\n!?;.;!?", "layout_recognize": "Plain Text"}
exe = ThreadPoolExecutor(max_workers=12)
threads = []
...
for d, blob in files:
    kwargs = {
        "callback": dummy,
        "parser_config": parser_config,
        "from_page": 0,
        "to_page": 100000,
        "tenant_id": kb.tenant_id,
        "lang": kb.language
    }
    threads.append(exe.submit(FACTORY.get(d["parser_id"], naive).chunk, d["name"], blob, **kwargs))
Each parser implements a unified chunk interface that parses the input file into a number of text blocks with metadata. For example, the PPT parser extracts the text and a thumbnail of each slide, while the PDF parser splits text by page number and extracts scanned images in combination with OCR.
if re.search(r"\.pptx?$", filename, re.IGNORECASE):
    ppt_parser = Ppt()
    for pn, (txt, img) in enumerate(ppt_parser(
            filename if not binary else binary, from_page, 1000000, callback)):
        d = copy.deepcopy(doc)
        pn += from_page
        d["image"] = img
        d["page_num_int"] = [pn + 1]
        d["top_int"] = [0]
        d["position_int"] = [(pn + 1, 0, img.size[0], 0, img.size[1])]
        tokenize(d, txt, eng)
        res.append(d)
    return res
elif re.search(r"\.pdf$", filename, re.IGNORECASE):
    pdf_parser = Pdf()
    if kwargs.get("layout_recognize", "DeepDOC") == "Plain Text":
        pdf_parser = PlainParser()
    for pn, (txt, img) in enumerate(pdf_parser(filename, binary,
                                               from_page=from_page, to_page=to_page, callback=callback)):
        d = copy.deepcopy(doc)
        pn += from_page
        if img:
            d["image"] = img
        d["page_num_int"] = [pn + 1]
        d["top_int"] = [0]
        d["position_int"] = [(pn + 1, 0, img.size[0] if img else 0, 0, img.size[1] if img else 0)]
        tokenize(d, txt, eng)
        res.append(d)
    return res
In the processing flow, DocumentService starts a thread pool (12 worker threads by default) to parse multiple documents concurrently. Each thread calls the corresponding parser's chunk method, which splits the document into a list of fragments and returns dictionaries containing the text content (plus weights, images, etc. where applicable). For a text document, for example, the parser splits it into paragraphs or pages and calls rag_tokenizer.tokenize to perform fine-grained Chinese word segmentation and tagging. The parsing results also carry metadata such as the document ID and knowledge-base ID for later indexing. Parsed images, such as PPT page screenshots or PDF scans, are stored in the backend selected by STORAGE_IMPL (local or cloud storage; MinIO by default), and only a reference (img_id) is kept in the fragment data to reduce the load on the vector store.
for (docinfo, _), th in zip(files, threads):
    docs = []
    doc = {
        "doc_id": docinfo["id"],
        "kb_id": [kb.id]
    }
    for ck in th.result():
        d = deepcopy(doc)
        d.update(ck)
        d["id"] = xxhash.xxh64((ck["content_with_weight"] + str(d["doc_id"])).encode("utf-8")).hexdigest()
        d["create_time"] = str(datetime.now()).replace("T", " ")[:19]
        d["create_timestamp_flt"] = datetime.now().timestamp()
        if not d.get("image"):
            docs.append(d)
            continue
        output_buffer = BytesIO()
        if isinstance(d["image"], bytes):
            output_buffer = BytesIO(d["image"])
        else:
            d["image"].save(output_buffer, format='JPEG')
        STORAGE_IMPL.put(kb.id, d["id"], output_buffer.getvalue())
        d["img_id"] = "{}-{}".format(kb.id, d["id"])
        d.pop("image", None)
        docs.append(d)
class StorageFactory:
    storage_mapping = {
        Storage.MINIO: RAGFlowMinio,
        Storage.AZURE_SPN: RAGFlowAzureSpnBlob,
        Storage.AZURE_SAS: RAGFlowAzureSasBlob,
        Storage.AWS_S3: RAGFlowS3,
        Storage.OSS: RAGFlowOSS,
    }

    @classmethod
    def create(cls, storage: Storage):
        return cls.storage_mapping[storage]()

STORAGE_IMPL_TYPE = os.getenv('STORAGE_IMPL', 'MINIO')
STORAGE_IMPL = StorageFactory.create(Storage[STORAGE_IMPL_TYPE])
Potential bottlenecks: Although the current pipeline parses multiple documents in parallel through a thread pool, there are still performance bottlenecks and room for improvement:
High processing overhead for large files: For very large files (such as PDFs with thousands of pages), a single parsing thread may be occupied for a long time, and a large amount of fragment data is generated and held in memory at once. The code waits for parsing to finish before batching, embedding, and storing, which can lead to high peak memory usage. Optimization suggestions: introduce streaming or segmented loading, parsing and processing documents in batches. For example, parse very long documents by chapter or page, and embed and write fragments as soon as a certain number have been produced, to reduce the memory footprint of a single batch. You can also summarize text or filter out irrelevant information during the parsing stage to reduce the amount of subsequent processing.
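A minimal sketch of this segmented approach, reusing the chunk, encode, and docStoreConn.insert conventions shown in the excerpts above. The flush_every knob and the _flush helper are illustrative and not part of RAGFlow:
def stream_index_document(doc, blob, parser, embd_mdl, doc_store, idxnm, kb_id, flush_every=128):
    # Embed and persist fragments in small batches instead of holding an
    # entire large document's chunks in memory at once.
    buffer = []
    for ck in parser.chunk(doc["name"], blob, from_page=0, to_page=100000):
        buffer.append(ck)
        if len(buffer) >= flush_every:
            _flush(buffer, doc, embd_mdl, doc_store, idxnm, kb_id)
            buffer.clear()          # caps peak memory at roughly flush_every fragments
    if buffer:
        _flush(buffer, doc, embd_mdl, doc_store, idxnm, kb_id)

def _flush(chunks, doc, embd_mdl, doc_store, idxnm, kb_id):
    texts = [c["content_with_weight"] for c in chunks]
    vects, _ = embd_mdl.encode(texts)               # embed only this batch
    for c, v in zip(chunks, vects):
        c["doc_id"] = doc["id"]
        c["q_%d_vec" % len(v)] = list(v)
    doc_store.insert(chunks, idxnm, kb_id)          # persist immediately, then discard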
Lack of parsing cache and incremental updates: The entire document is re-parsed every time it is added or modified. DocumentService currently implements updates by deleting the original fragments before storing the new ones; there is no fine-grained incremental update. Optimization suggestions: skip repeated parsing and embedding for unchanged content. You can maintain a hash or fingerprint of the file content and reuse the existing vectors if a document is uploaded again without changes. For partially updated documents, consider diffing the new and old content so that only the changed parts are parsed and the corresponding fragments updated.
@classmethod
@DB.connection_context()
def remove_document(cls, doc, tenant_id):
    cls.clear_chunk_num(doc.id)
    try:
        settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)
        graph_source = settings.docStoreConn.getFields(
            settings.docStoreConn.search(["source_id"], [], {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]},
                                         [], OrderByExpr(), 0, 1, search.index_name(tenant_id), [doc.kb_id]),
            ["source_id"]
        )
        if len(graph_source) > 0 and doc.id in list(graph_source.values())[0]["source_id"]:
            settings.docStoreConn.update({"kb_id": doc.kb_id,
                                          "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"],
                                          "source_id": doc.id},
                                         {"remove": {"source_id": doc.id}},
                                         search.index_name(tenant_id), doc.kb_id)
            settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]},
                                         {"removed_kwd": "Y"},
                                         search.index_name(tenant_id), doc.kb_id)
            settings.docStoreConn.delete({"kb_id": doc.kb_id,
                                          "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"],
                                          "must_not": {"exists": "source_id"}},
                                         search.index_name(tenant_id), doc.kb_id)
    except Exception:
        pass
    return cls.delete_by_id(doc.id)
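A hedged sketch of the fingerprinting idea: before re-parsing an uploaded file, compare a hash of its raw bytes against what was recorded at the last ingest. The in-memory fingerprint table below is hypothetical; in practice it could be a column on the document record in the relational database. Since chunk ids are already derived from xxh64(content + doc_id), the same idea extends naturally to per-chunk diffing: only chunks whose ids are new need embedding and insertion.
import xxhash

# Hypothetical fingerprint store keyed by (kb_id, doc_name).
_fingerprints: dict[tuple, str] = {}

def needs_reprocessing(kb_id: str, doc_name: str, blob: bytes) -> bool:
    """Return False when the uploaded bytes are identical to the last ingest,
    so parsing and embedding can be skipped and the existing vectors reused."""
    fp = xxhash.xxh64(blob).hexdigest()
    key = (kb_id, doc_name)
    if _fingerprints.get(key) == fp:
        return False
    _fingerprints[key] = fp
    return True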
Parallelism and asynchronous processing: The code uses a thread pool for parallel parsing, but embedding is still performed synchronously on the main thread: after the thread pool has collected all document fragments, the embedding model is called for each document in turn. If parsing is much faster than embedding, the CPU may sit idle waiting for the GPU or an external API to respond. Optimization suggestions: introduce asynchronous task queues or multi-threaded/multi-process concurrent embedding. For example, submit the embedding task as soon as a parsing thread finishes slicing a document instead of waiting for all parsing to end; or use multiple GPUs/processes to generate vectors for different documents in parallel, improving overall throughput. You can also consider asynchronous I/O for batch embedding, such as using asyncio to issue batched calls to OpenAI and similar services, accelerating remote embedding API calls.
def embedding(doc_id, cnts, batch_size=16):
    nonlocal embd_mdl, chunk_counts, token_counts
    vects = []
    for i in range(0, len(cnts), batch_size):
        vts, c = embd_mdl.encode(cnts[i: i + batch_size])
        vects.extend(vts.tolist())
        chunk_counts[doc_id] += len(cnts[i:i + batch_size])
        token_counts[doc_id] += c
    return vects
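For remote embedding APIs, the batches could be issued concurrently instead of one after another. A minimal sketch using asyncio with the openai>=1.0 Python SDK; the model name, batch size, and concurrency limit are assumptions, not RAGFlow defaults:
import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set in the environment

async def embed_all(texts, model="text-embedding-ada-002", batch_size=16, max_concurrency=8):
    """Issue embedding batches concurrently; the order of results is preserved."""
    sem = asyncio.Semaphore(max_concurrency)   # avoid tripping API rate limits

    async def one_batch(batch):
        async with sem:
            resp = await client.embeddings.create(input=batch, model=model)
            return [d.embedding for d in resp.data]

    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    results = await asyncio.gather(*(one_batch(b) for b in batches))
    return [v for batch in results for v in batch]

# vectors = asyncio.run(embed_all(list_of_chunk_texts))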
Parser performance and customization: RAGFlow supports a wide range of formats, but parsing depends on third-party libraries (such as Aspose for PPT, PyPDF for PDF, and OCR libraries for images), which can become a bottleneck; for example, OCR on large PDFs takes a long time. Optimization suggestions: introduce more efficient parsing tools or parallel OCR for common formats; expose parsing parameters (whether to extract images, OCR accuracy, etc.) so performance and accuracy can be balanced. For enterprise scenarios, users can be allowed to convert some data formats in advance to reduce the online parsing workload. High-precision scenarios can be offered additional methods and model options, such as MinerU.
After parsing and embedding, DocumentService persists fragment data to the vector store through the docStoreConn interface. It first checks for or creates the index structure, then inserts fragments in batches (64 records per batch by default). Each fragment contains text, metadata, and the generated vector (stored in a field named q_<dim>_vec). At the same time, DocumentService updates the document record in the relational database (total number of chunks, total number of tokens, etc.) for monitoring. The storage flow enforces the order parsing -> embedding -> storage, and rolls back or logs exceptions when errors occur.
for i, d in enumerate(cks):
    v = vects[i]
    d["q_%d_vec" % len(v)] = v

es_bulk_size = 64
for b in range(0, len(cks), es_bulk_size):
    if try_create_idx:
        if not settings.docStoreConn.indexExist(idxnm, kb_id):
            settings.docStoreConn.createIdx(idxnm, kb_id, len(vects[0]))
        try_create_idx = False
    settings.docStoreConn.insert(cks[b:b + es_bulk_size], idxnm, kb_id)
Retriever Module
Structural design: The retrieval module of RAGFlow adopts an interface-oriented design that hides the implementation differences between vector stores. The core abstractions are DocStoreConnection (rag/utils/doc_store_conn.py) and the search.Dealer retriever (located in rag/nlp/search.py). At initialization, the system creates the DocStoreConnection instance corresponding to the configured backend engine type (DOC_ENGINE) and assigns it to the global settings.docStoreConn.
global DOC_ENGINE, docStoreConn, retrievaler, kg_retrievaler
DOC_ENGINE = os.environ.get("DOC_ENGINE", "elasticsearch")
lower_case_doc_engine = DOC_ENGINE.lower()
if lower_case_doc_engine == "elasticsearch":
    docStoreConn = rag.utils.es_conn.ESConnection()
elif lower_case_doc_engine == "infinity":
    docStoreConn = rag.utils.infinity_conn.InfinityConnection()
elif lower_case_doc_engine == "opensearch":
    docStoreConn = rag.utils.opensearch_coon.OSConnection()
else:
    raise Exception(f"Not supported doc engine: {DOC_ENGINE}")

retrievaler = search.Dealer(docStoreConn)
kg_retrievaler = kg_search.KGSearch(docStoreConn)
Supported engines include Elasticsearch (using its built-in dense-vector index), OpenSearch (vector support on a traditional search engine), and the "Infinity" engine developed by the InfiniFlow team. Each has an implementation class under the rag/utils directory (ESConnection, OSConnection, InfinityConnection), and they all expose the same methods: createIdx to create an index, insert to insert documents, search to run a query, getTotal to get the total number of hits, and so on. The code selects the engine according to configuration and otherwise raises a "Not supported doc engine" exception. Thanks to this design, swapping the underlying vector store (say, from Elasticsearch to Milvus or FAISS) has little impact on the upper-level retrieval flow: you only need to implement a new DocStoreConnection subclass and enable it via configuration.
Adding new vector store types: Although vector databases such as FAISS, Weaviate, and Milvus exist, the current source code directly supports only the three backends above. The RAGFlow architecture is extensible, however: developers can follow the existing DocStoreConnection interface to wrap new backends (for example, building indexes in local memory with FAISS, or sending queries through the Python clients of Weaviate/Milvus). Introducing these vector stores is expected to bring advantages in pure vector-retrieval performance and distributed scalability.
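The skeleton of such an extension might look like the following. The method names mirror those listed above (createIdx, indexExist, insert, search), but the signatures are simplified and the FAISS-backed internals are purely illustrative, not part of the current codebase:
import numpy as np
import faiss  # hypothetical new backend; not bundled with RAGFlow today

class FaissConnection:
    """Illustrative DocStoreConnection-style adapter backed by in-memory FAISS.
    A real implementation would follow rag/utils/doc_store_conn.py exactly."""

    def __init__(self):
        self.indexes = {}   # idx_name -> (faiss index, list of chunk dicts)

    def createIdx(self, idx_name, kb_id, vector_dim):
        self.indexes[idx_name] = (faiss.IndexFlatIP(vector_dim), [])

    def indexExist(self, idx_name, kb_id):
        return idx_name in self.indexes

    def insert(self, chunks, idx_name, kb_id):
        index, store = self.indexes[idx_name]
        vec_field = next(k for k in chunks[0] if k.endswith("_vec"))
        vecs = np.array([c[vec_field] for c in chunks], dtype="float32")
        faiss.normalize_L2(vecs)          # cosine similarity via normalized inner product
        index.add(vecs)
        store.extend(chunks)

    def search(self, query_vec, idx_name, kb_id, topk=10):
        index, store = self.indexes[idx_name]
        q = np.array([query_vec], dtype="float32")
        faiss.normalize_L2(q)
        scores, ids = index.search(q, topk)
        return [(store[i], float(s)) for i, s in zip(ids[0], scores[0]) if i != -1]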
Retrieval strategy: The Retriever module of RAGFlow adopts a multi-way recall fusion strategy: it performs dense vector retrieval and sparse keyword retrieval at the same time and fuses the results for re-ranking. This logic is clearly visible in the search.Dealer.search method. First, the user query is turned into both a semantic vector query and a text keyword query. The semantic vector query encodes the user question into a vector (emb_mdl.encode_queries returns the query vector):
def get_vector(self, txt, emb_mdl, topk=10, similarity=0.1):
    qv, _ = emb_mdl.encode_queries(txt)
    shape = np.array(qv).shape
    if len(shape) > 1:
        raise Exception(
            f"Dealer.get_vector returned array's shape {shape} doesn't match expectation(exact one dimension).")
    embedding_data = [get_float(v) for v in qv]
    vector_column_name = f"q_{len(embedding_data)}_vec"
    return MatchDenseExpr(vector_column_name, embedding_data, 'float', 'cosine', topk, {"similarity": similarity})
Keyword search uses FulltextQueryer to convert the question into a search expression (extracting keywords and setting the match degree). Dealer then builds a fused query that combines dense vector matching (MatchDenseExpr) and sparse text matching (MatchTextExpr), with weighted fusion expressed through a FusionExpr, and finally submits it to the underlying engine through the docStoreConn.search interface. When the backend is Elasticsearch, for example, it runs both a match text query and a vector nearest-neighbor query on the index and then merges the scores according to the preset similarity weights. When the initial results are empty or insufficient, the system lowers the text matching threshold (min_match) and issues the combination of keyword matching (matchText) + vector matching (matchDense) + fusion expression (fusionExpr) again to maximize recall of relevant fragments.
matchText, keywords = self.qryr.question(qst, min_match=0.3)
if emb_mdl is None:
    matchExprs = [matchText]
    res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy,
                                offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
    total = self.dataStore.getTotal(res)
    logging.debug("Dealer.search TOTAL: {}".format(total))
else:
    matchDense = self.get_vector(qst, emb_mdl, topk, req.get("similarity", 0.1))
    q_vec = matchDense.embedding_data
    src.append(f"q_{len(q_vec)}_vec")
    fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05, 0.95"})
    matchExprs = [matchText, matchDense, fusionExpr]
    res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy,
                                offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
    total = self.dataStore.getTotal(res)
    logging.debug("Dealer.search TOTAL: {}".format(total))

    # If result is empty, try again with lower min_match
    if total == 0:
        if filters.get("doc_id"):
            res = self.dataStore.search(src, [], filters, [], orderBy, offset, limit, idx_names, kb_ids)
            total = self.dataStore.getTotal(res)
        else:
            matchText, _ = self.qryr.question(qst, min_match=0.1)
            filters.pop("doc_id", None)
            matchDense.extra_options["similarity"] = 0.17
            res = self.dataStore.search(src, highlightFields, filters, [matchText, matchDense, fusionExpr],
                                        orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)
            total = self.dataStore.getTotal(res)
        logging.debug("Dealer.search 2 TOTAL: {}".format(total))
In addition to dense/sparse dual retrieval, RAGFlow also supports knowledge-graph retrieval and other multimodal scenarios: KGSearch(docStoreConn) indicates that a knowledge-graph retriever is used for structured knowledge. For plain text QA, though, the core is the complementarity of dense and sparse signals. The advantages of this design are a higher recall rate and interpretable results: dense vectors ensure semantic relevance but may introduce fuzzy matches, while sparse keywords ensure exact matching but may miss semantic paraphrases; fusion combines the strengths of both. When returning results, Dealer also provides highlights, aggregations, and other information for result presentation.
class SearchResult:
    total: int
    ids: list[str]
    query_vector: list[float] | None = None
    field: dict | None = None
    highlight: dict | None = None
    aggregation: list | dict | None = None
    keywords: list[str] | None = None
    group_docs: list[list] | None = None
After the search results are returned, RAGFlow can also perform a rerank step. The RERANK_MDL configuration and the rag/llm/rerank_model.py module load cross-encoder re-ranking models such as BCE and BGE. During question answering, once the Retriever has fetched N candidate segments, it calls similarity(query, texts) to score each fragment against the original question and re-orders them to improve relevance. This two-stage retrieval is called "fusion re-ranking" in the documentation and can noticeably improve the accuracy of the evidence behind the final answer.
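A minimal sketch of that second stage, assuming a rerank model object whose similarity(query, texts) returns one score per candidate plus a token count, following the conventions of the model wrappers shown here; the candidate structure and keep_top value are simplifications:
def rerank(query, candidates, rerank_mdl, keep_top=8):
    """Re-score retrieval candidates with a cross-encoder and keep the best ones.
    `candidates` is a list of chunk dicts carrying 'content_with_weight'."""
    texts = [c["content_with_weight"] for c in candidates]
    scores, _ = rerank_mdl.similarity(query, texts)   # one relevance score per text
    ranked = sorted(zip(candidates, scores), key=lambda p: p[1], reverse=True)
    return [c for c, _ in ranked[:keep_top]]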
Potential issues:
Unstable recall rate: Dense retrieval depends on vector quality; if the embedding model does not cover the domain, relevant content can be missed, while sparse retrieval is limited to keyword matching. Fusion generally beats either path alone, but the parameters still need tuning (the relative weight of vector vs. text matching, the min_match threshold, etc.). Some parameters, such as the similarity lower bound, are currently hard-coded (for example, the matchDense threshold is set to 0.17 in the fallback) and may need adjusting for real data. Improvement suggestions: introduce a data-driven, trainable fusion strategy, for example a small model that judges whether the vector match is reliable and dynamically adjusts the dense vs. sparse weights. Query-intent detection could also help: when the user's question contains specific entities/keywords, boost the BM25 weight; when the question is vague, rely more on vector recall.
Slow retrieval speed: When Elasticsearch/OpenSearch is used for large-scale vector retrieval with high dimensionality and large data volumes, performance may lag behind dedicated vector databases (such as Milvus and FAISS). The Infinity engine uses HNSW to speed up dense retrieval, but for knowledge bases in the tens of millions of chunks, single-machine HNSW memory use and query latency grow. Optimization plan: introduce higher-performance vector indexes, for example adding Faiss as a backend and accelerating approximate nearest-neighbor search with GPUs, or integrating Milvus and similar systems whose distributed sharding lets queries scale horizontally. The existing HNSW parameters (such as M and ef) can also be tuned, or compression enabled to reduce memory usage. For sparse retrieval, keyword filtering or index simplification (such as removing stop words) can be applied ahead of time to improve matching efficiency.
Insufficient multi-GPU/distributed support: The current retrieval process is mainly completed on a single instance, and multi-GPU parallelism is not fully utilized. If vector libraries such as Infinity are deployed on multiple nodes, it is necessary to coordinate the merging of retrieval results from multiple servers, which is not yet reflected in this part of the framework. In addition, when calling Faiss locally, you can use multiple GPUs for IndexShard or data parallel search, but this is not covered by the current architecture. Suggestions for improvement: Provide support for distributed retrieval mode, such as calling a remote vector engine cluster through gRPC; perform sharded parallel retrieval on large indexes at the application layer, and then fuse the results. In addition, as mentioned above, allowing the embedding stage to use multiple GPUs to generate query vectors can also slightly reduce the pressure on a single card. It is worth noting that multiple GPUs are more often used in the embedding stage, and the retrieval stage is more of a CPU/memory bottleneck, so it is more important to improve retrieval concurrency and index optimization.
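The application-layer sharding suggested above could be sketched as follows. The shard connections, the simplified search signature, and the caller-supplied extract_hits helper are all assumptions rather than existing RAGFlow behavior:
from concurrent.futures import ThreadPoolExecutor

def sharded_search(shard_conns, query_exprs, idx_names, kb_ids, extract_hits, topk=10):
    """Query several DocStoreConnection-like shards in parallel and merge by score.
    `extract_hits(raw_response)` is a caller-supplied helper that yields
    (chunk_dict, score) pairs from the engine-specific response format."""
    def query_one(conn):
        # Signature simplified relative to the real docStoreConn.search interface.
        res = conn.search(["content_with_weight"], [], {}, query_exprs, None, 0, topk, idx_names, kb_ids)
        return list(extract_hits(res))

    with ThreadPoolExecutor(max_workers=len(shard_conns)) as pool:
        partial_results = list(pool.map(query_one, shard_conns))

    merged = [hit for part in partial_results for hit in part]
    merged.sort(key=lambda h: h[1], reverse=True)   # global re-sort by fused score
    return merged[:topk]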
The Retriever module supports different vector storage through interface encapsulation , uses dense + sparse fusion to improve recall and accuracy, and uses an optional re-ranking model to further optimize the result sorting. To make the retrieval module more robust, we can: expand support for richer backends (Faiss/Milvus, etc.) to improve performance and deployment flexibility; introduce agent-based retrieval tuning (such as dynamically adjusting queries or multi-hop retrieval); and strengthen parallel and distributed capabilities to ensure that results can be returned quickly and stably even in large-scale knowledge bases.
Embedding Module
Supported embedding models: The embedding module of ragflow is designed to be very flexible and integrates embedding models from multiple sources, including local and cloud models. For example:
Local vector models: By default, an open-source general-purpose text embedding model is used. DefaultEmbedding loads the named model weights through the FlagEmbedding library (BAAI/bge-large-zh-v1.5 by default) and caches them locally. There is also a FastEmbed implementation, which uses lightweight models (such as bge-small-en-v1.5) and an optimized inference library to speed up embedding. Another local option is YoudaoEmbed, which integrates NetEase Youdao's bilingual Chinese/English embedding model. If these local models are not cached at initialization, the weights are downloaded automatically from the HuggingFace Hub.
OpenAI and other cloud embedding services: The module provides OpenAIEmbed, which wraps OpenAI's text embedding API (default model text-embedding-ada-002), and AzureEmbed for Azure OpenAI. Domestic cloud services are covered as well: BaiChuanEmbed connects to Baichuan Intelligence's embedding API; QWenEmbed obtains Alibaba's embedding models (such as Tongyi Qianwen embeddings) through the DashScope interface; ZhipuEmbed connects to Zhipu AI's embedding interface; JinaEmbed calls the vector service provided by Jina AI. These cloud classes usually wrap HTTP requests and parse the returned embedding vectors; JinaEmbed, for example, sends a JSON request with requests.post and reads the result.
Locally deployed embedding services: The module also supports model services hosted by the user: LocalAIEmbed connects to a local open-source embedding API (OpenAI-compatible interface), OllamaEmbed calls a local Ollama engine to generate embeddings (supporting some local LLMs), XinferenceEmbed connects to an Xinference service, and so on. These classes typically reuse the OpenAI interface logic but point base_url at the local server, so vectors are generated with local compute.
Through these subclasses, the Embedding module covers the mainstream embedding options: it supports both offline inference with open-source models and the online services of major providers. The EMBEDDING_MDL setting selects the default embedding model; for example, the built-in default list includes BGE-large and Youdao BCE. Depending on the deployment mode (the LIGHTEN configuration), the system decides whether to load large model weights or call lightweight services.
BUILTIN_EMBEDDING_MODELS = ["BAAI/bge-large-zh-v1.5@BAAI", "maidalun1020/bce-embedding-base_v1@Youdao"]
Token limits and batching strategy: Since different models restrict input length, the code truncates long text to stay within the number of tokens a model supports. For example, OpenAI's ada-002 model supports at most 8191 tokens, so OpenAIEmbed.encode truncates each text to 8191; the Zhipu embedding-3 model has a 3072-token cap, and the code truncates accordingly. The local FlagEmbedding model also cuts text to around 2048 characters to control the amount of computation. Truncation guarantees that any long passage can be encoded safely, but some information may be lost. As an optimization, very long segments could be embedded in pieces (for example, splitting a long paragraph further and averaging the vectors), or a long-context embedding model could be introduced.
class ZhipuEmbed(Base):
    def __init__(self, key, model_name="embedding-2", **kwargs):
        self.client = ZhipuAI(api_key=key)
        self.model_name = model_name

    def encode(self, texts: list):
        arr = []
        tks_num = 0
        MAX_LEN = -1
        if self.model_name.lower() == "embedding-2":
            MAX_LEN = 512
        if self.model_name.lower() == "embedding-3":
            MAX_LEN = 3072
        if MAX_LEN > 0:
            texts = [truncate(t, MAX_LEN) for t in texts]
        for txt in texts:
            res = self.client.embeddings.create(input=txt,
                                                model=self.model_name)
            arr.append(res.data[0].embedding)
            tks_num += self.total_token_count(res)
        return np.array(arr), tks_num

    def encode_queries(self, text):
        res = self.client.embeddings.create(input=text,
                                            model=self.model_name)
        return np.array(res.data[0].embedding), self.total_token_count(res)
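One way to avoid losing information to truncation, as hinted at above, is to split an over-long passage into windows, embed each window, and average the vectors. A hedged sketch, assuming an embd_mdl whose encode(list_of_texts) returns (vectors, token_count) as in the classes above; window sizes are in characters rather than tokens for simplicity:
import numpy as np

def encode_long_text(text, embd_mdl, max_len=512, overlap=64):
    """Embed a passage longer than the model limit by windowing and mean-pooling."""
    if len(text) <= max_len:
        return embd_mdl.encode([text])[0][0]
    windows, start = [], 0
    while start < len(text):
        windows.append(text[start:start + max_len])
        start += max_len - overlap          # overlapping windows keep boundary context
    vects, _ = embd_mdl.encode(windows)
    v = np.mean(np.asarray(vects), axis=0)  # mean-pool the window vectors
    return v / (np.linalg.norm(v) + 1e-9)   # re-normalize for cosine similarity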
In terms of batching, most implementations split requests into batches of 16 to balance efficiency and API limits. OpenAI, for example, is called with at most 16 inputs at a time, and batch_size=16 is set explicitly in the code; local models also usually process 16 inputs per batch to exploit matrix parallelism. There are exceptions, such as Qwen's DashScope interface, whose weaker batch support limits it to batch_size=4. The batching logic is usually a simple for loop that accumulates results: a list of texts is fed to the underlying model in groups of 16, and the embeddings are collected into a numpy array and returned. After each batch, some implementations (such as OpenAI's) accumulate the number of tokens processed for later accounting. Some classes also provide encode_queries to handle the single-query case specially, while others simply call encode([text]). Overall, the current batching strategy is fairly conservative: batch sizes are not adjusted dynamically and batch requests are not issued in parallel.
class LocalAIEmbed(Base):
    def __init__(self, key, model_name, base_url):
        if not base_url:
            raise ValueError("Local embedding model url cannot be None")
        if base_url.split("/")[-1] != "v1":
            base_url = os.path.join(base_url, "v1")
        self.client = OpenAI(api_key="empty", base_url=base_url)
        self.model_name = model_name.split("___")[0]

    def encode(self, texts: list):
        batch_size = 16
        ress = []
        for i in range(0, len(texts), batch_size):
            res = self.client.embeddings.create(input=texts[i: i + batch_size], model=self.model_name)
            ress.extend([d.embedding for d in res.data])
        # local embedding for LmStudio does not count tokens
        return np.array(ress), 1024

    def encode_queries(self, text):
        embds, cnt = self.encode([text])
        return np.array(embds[0]), cnt
Multithreading/concurrency: Within the Embedding module, each model's encode implementation is mostly synchronous and does not use threads explicitly. RAGFlow instead relies on the upper layer to process the embeddings of different documents concurrently: DocumentService parallelizes over documents, so vector generation for multiple documents can proceed at the same time via the thread pool (bounded by GPU capacity). For a single large document, however, all chunk vectors are still computed sequentially in one thread, and there is currently no mechanism for spreading one batch of embeddings across multiple GPUs. When the DefaultEmbedding class initializes, it forces the CUDA_VISIBLE_DEVICES environment variable to 0, locking usage to the first GPU, which can leave other cards idle on multi-GPU machines. Improvement suggestions: introduce multi-threaded or asynchronous encoding. For remote APIs (OpenAI and the like), asyncio.gather can issue several request batches at once to make full use of network bandwidth; for local models such as PyTorch embedding models, multi-threaded batch inference or OpenMP parallelism can help when the bottleneck is on the CPU. Users could also be allowed to configure which GPU devices to use, or the system could detect multiple GPUs at initialization and split the model and data for multi-card parallel embedding (for example, instantiating several copies of the FlagEmbedding model, one per card, and distributing the batches). These measures would significantly improve embedding throughput when large document sets are ingested.
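A sketch of the multi-card idea: one worker process per GPU, each pinned to its card via CUDA_VISIBLE_DEVICES (the same mechanism DefaultEmbedding uses) and loading its own FlagEmbedding model to handle one contiguous shard of the texts. The overall process layout and shard sizing are assumptions, not existing RAGFlow code:
import os
from multiprocessing import get_context

def _worker(shard_idx, gpu_id, texts, model_name, out_q):
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)   # pin before any CUDA initialization
    if not texts:
        out_q.put((shard_idx, []))
        return
    from FlagEmbedding import FlagModel                # import after pinning the GPU
    model = FlagModel(model_name, use_fp16=True)
    out_q.put((shard_idx, model.encode(texts)))

def encode_multi_gpu(texts, gpu_ids, model_name="BAAI/bge-large-zh-v1.5"):
    """Split texts into contiguous shards, one worker process per GPU."""
    ctx = get_context("spawn")                         # spawn avoids CUDA fork issues
    out_q = ctx.Queue()
    size = (len(texts) + len(gpu_ids) - 1) // len(gpu_ids)
    shards = [texts[i * size:(i + 1) * size] for i in range(len(gpu_ids))]
    procs = [ctx.Process(target=_worker, args=(i, g, s, model_name, out_q))
             for i, (g, s) in enumerate(zip(gpu_ids, shards))]
    for p in procs:
        p.start()
    parts = dict(out_q.get() for _ in procs)           # shard_idx -> vectors
    for p in procs:
        p.join()
    return [v for i in range(len(shards)) for v in parts[i]]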
Efficiency and maintainability: The Embedding module code is long but clearly structured: an abstract base class Base defines the interface, and each provider/model gets its own subclass. Adding support for a new model means touching the code and adding a class, but it also gives a high degree of control. In terms of maintainability, the subclasses are essentially independent, so adding other platforms (TogetherAI, PerfX, etc., some of which already have prototypes in the code) is straightforward. If the number of model types keeps growing, a plug-in mechanism or configuration-driven approach would be more flexible: the available embedding backends and their parameters could be listed in a configuration file and the corresponding implementation loaded by reflection at runtime, avoiding continual growth of the main code.
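One possible shape for such a registry; the config table, constructor kwargs, and module paths are hypothetical and only illustrate the idea of resolving backends by configuration instead of hard-coded branches:
import importlib

# Hypothetical configuration: provider name -> (module, class name).
EMBEDDING_BACKENDS = {
    "openai":  ("rag.llm.embedding_model", "OpenAIEmbed"),
    "zhipu":   ("rag.llm.embedding_model", "ZhipuEmbed"),
    "localai": ("rag.llm.embedding_model", "LocalAIEmbed"),
}

def load_embedding_backend(name, **kwargs):
    """Resolve and instantiate an embedding class from configuration; adding a
    new provider then only requires a new config entry plus its class."""
    module_name, class_name = EMBEDDING_BACKENDS[name]
    cls = getattr(importlib.import_module(module_name), class_name)
    return cls(**kwargs)

# embd = load_embedding_backend("zhipu", key=api_key, model_name="embedding-3")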
For large-scale document embedding, the current structure can expose efficiency bottlenecks: all of a document's embeddings run sequentially on a single thread, which makes it hard to use multiple cores fully; the local large model (BGE-large) consumes a lot of GPU memory and is relatively slow at inference, so the GPU becomes the bottleneck during batch processing. To improve throughput, consider the following:
Batch parallelism : For embedding tens of thousands of texts, you can divide them into multiple processes/threads for separate processing. For example, start multiple EmbeddingWorker processes, each of which loads a model to process a portion of the data, and then merges the results. This is particularly effective in multi-core CPU or multi-GPU scenarios. You can also use vectorization tools (such as NumPy parallelism and PyTorch DataLoader) to speed up the CPU calculation part.
Model compression and acceleration: To address the cost of large embedding models, a distilled or smaller model (such as the small version of BGE or another lightweight model) can be used where throughput matters most, or inference can be accelerated with OpenVINO, TensorRT, and similar tools. The FastEmbed class is an example of using an optimized library: it calls fastembed.TextEmbedding, which may use C++ internally to speed up computation. Further optimization could quantize the embedding model's weights, using FP16 or even INT8 in exchange for higher speed and lower memory usage.
Pipeline parallelism: Decouple the three steps of document parsing -> embedding -> storage into a pipeline so they no longer wait on each other serially (a sketch follows this list). As soon as a document yields a partial set of chunks, they are handed to an embedding thread while the parsing thread continues with the next part, so parsing and embedding overlap and hardware utilization improves. Task scheduling and buffer queues are needed in the architecture to prevent one slow step from dragging down the whole pipeline.
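A producer/consumer sketch of that pipeline, using a bounded queue as the buffer between the parsing and embedding stages. The queue size, worker count, and the parse_fn/embed_and_store_fn callables are illustrative assumptions:
import queue
import threading

def run_pipeline(docs, parse_fn, embed_and_store_fn, buffer_size=256, n_embed_workers=2):
    """Overlap parsing and embedding: the parser pushes chunks into a bounded
    queue while embedding workers drain it and write to storage."""
    chunk_q = queue.Queue(maxsize=buffer_size)   # bounded: applies backpressure if embedding lags
    _DONE = object()

    def producer():
        for doc in docs:
            for chunk in parse_fn(doc):          # parse_fn yields chunk dicts
                chunk_q.put(chunk)
        for _ in range(n_embed_workers):
            chunk_q.put(_DONE)                   # one sentinel per consumer

    def consumer():
        batch = []
        while True:
            item = chunk_q.get()
            if item is _DONE:
                break
            batch.append(item)
            if len(batch) >= 16:                 # embed and persist in small batches
                embed_and_store_fn(batch)
                batch = []
        if batch:
            embed_and_store_fn(batch)

    workers = [threading.Thread(target=consumer) for _ in range(n_embed_workers)]
    for w in workers:
        w.start()
    producer()
    for w in workers:
        w.join()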
The Embedding module of RAGFlow is flexible and easy to extend. In future work, configuration- and plug-in-based registration can reduce the code changes needed for new models, and parallelization plus model-acceleration techniques can make large-scale document embedding more efficient. As new embedding models appear in the open-source community (including multimodal embeddings that encode images and audio into the same vector space), RAGFlow can plug them in through the same architecture and strengthen the system's multimodal retrieval capability.
Overview of module dependencies: The data processing module converts raw data into structures the vector store can use; it relies on internal parsers (such as the DeepDoc library) to extract text and calls the Embedding module to generate vectors. The Embedding module connects to external models or services to turn text into semantic vectors. The resulting text fragments and vectors are persisted to the search engine through the DocStore interface. At query time, the Retriever module calls the Embedding module to convert the user query into a vector and retrieves relevant fragments through the DocStore interface, combined with keyword matching. The results returned by the Retriever can be passed to a re-ranking model (a second call to the Embedding module or a dedicated rerank model) to refine the ordering before being handed to the upper-level question-answering flow. This closed loop of data processing -> vector storage <- search query ensures that knowledge-base content and user questions are matched in the same vector space.
Each module is clearly decoupled: the parsing and embedding modules are connected by configuration and interface, and the retrieval module adapts to different storages through a unified interface, achieving good modularity and scalability. In the future, secondary development and performance optimization on this architecture only require local improvements to the bottleneck module without affecting the overall process. As described above, the various improvement plans will help ragflow process massive heterogeneous data more efficiently and serve large-scale question-and-answer applications.