Reference: pipeline_usage.rs
Context: Quipu is a high-performance semantic indexing system designed for extremely parallel/concurrent file processing and queries that support both metadata and vector search operations.
First things first, I need to address the goal and why we need this to be parallel in the first place. The simple answer is that we want to maximize "throughput".
We want to process as many files as possible, as quickly as possible. This means keeping the CPU busy with useful work at all times and not letting it sit idle waiting for slow operations like reading from a disk. (though we don't want to overload the CPU with tasks, but this helps us find the optimal ratios between # of workers for each stage).
The context of this project is to process embeddings for file contents and store them into a vector database (e.g. pdf, txt, docx content), and properly chunk them as efficiently as possible.
What are we actually doing?
The pipeline I thought of has four main stages:
- Files I/O: This is a classic I/O-bound task, tokio has a builtin
tokio::fs
wrapper for it. - Preprocessing: In actual production, this would be processing PDF and DOCX files into text. But for the purpose of the simple pipeline architecture, the program will only normalize line endings and trimming whitespace. It's technically CPU work, but it's so trivial it's negligible. We can treat it as part of the I/O-bound pipeline. However, if this stage involved more heavy CPU work (e.g., converting a PDF/DOCX file to text, FFmpeg conversion, etc), it would become a CPU-bound task.
- Chunking: This involves a sentence embedding model's tokenizer. Tokenizing text, especially long text, is a series of complex lookups and computations. This is a CPU-bound task.
- Embedding: This is the heaviest stage. It runs the embeddings model to turn text chunks into vectors. This is a very heavy CPU-bound task.
Conclusion: We have two distinct "types" of work (CPU-bound and I/O bound, basically categorized by computational complexity). Mixing them carelessly is the source of all performance problems.
Choose the Right Tool for Each Job
Tokio
gives us two primary tools for running tasks:
tokio::spawn(async { ... })
: For async tasks. This runs the task on a shared pool of worker threads (the "core" runtime). Anasync
task is special because it canyield
control back to the scheduler whenever it hits an.await
point (e.g., waiting for a file read or a channel message). This allows a single thread to juggle thousands of waiting tasks, making it incredibly efficient for I/O-bound work.
Side note: tokio::spawn
is an alias of tokio::task::spawn
.
tokio::task::spawn_blocking(|| { ... })
: For blocking tasks. This moves the task to a separate, dedicated thread pool designed for blocking, CPU-intensive work. This is crucial because a task here can run for a long time, hogging 100% of a CPU core, without stalling the main async runtime. The async runtime can continue juggling its thousands of I/O (or small computation) tasks while the blocking threads crunch numbers.
TLDR;
The general rule of thumb: a function should be async
if it primarily waits (e.g., for I/O, network responses, channel messages), and should use tokio::spawn
. A function should be run via spawn_blocking
if it primarily works (performs heavy computation) and has no .await
points. Any pure CPU task taking more than ~100µs is a good candidate.
So in the example pipeline, we designed it like this:
file_reader_worker
->async fn
, run withtokio::spawn
preprocessing_worker
->async fn
, run withtokio::spawn
(for now)chunking_worker
-> regularfn
, run withtokio::task::spawn_blocking
embedding_worker
-> regularfn
, run withtokio::task::spawn_blocking
How do workers talk to each other?
We use tokio::sync::mpsc::channel
to create asynchronous, multi-producer, single-consumer channels. These are basically conveyor belts between our pipeline stages.
We have a sender (tx
) can be cloned and given to many producer tasks.
and a receiver (rx
) can only be used by one consumer task at a time. (but this is fine, and I'll explain more later)
When multiple consumers need to pull tasks from a single shared channel receiver (e.g., multiple file readers taking paths from input_paths_rx
), the Arc<Mutex<Receiver>>
pattern is used.
Arc<...>
allows multiple worker tasks to safely share ownership of the receiver.tokio::sync::Mutex
is a lock that ensures only one task can access the data inside it at a time. Thetokio
version isasync
-aware..lock().await
is the key part for async workers. When an async worker calls this, it tries to acquire the lock. If the lock is held by another worker, this task does not block its thread. Instead, it yields control to thetokio
scheduler and registers itself to be woken up when the lock is released. The scheduler then runs other ready tasks. This ensures that workers don't waste CPU time spinning or blocking while waiting for a shared resource.
let maybe_path = {
let mut guard = rx.lock().await; // Yields if lock is busy (i.e. another worker is using it rn)
guard.recv().await // Yields if channel is empty (when channel is not empty, it finishes and returns the result)
};
// The `guard` (MutexGuard) is dropped here, releasing the lock so other workers can continue.
A common question is whether holding the lock across an .await
can cause problems. For example, if worker1
calls let mut guard = rx.lock().await; guard.recv().await;
, it holds the lock while waiting for a message. What happens to worker2
?
-
If
worker2
(of the same type) tries to acquire the lock viarx.lock().await
, its task will pause and yield control to the Tokio scheduler. The underlying thread is not blocked and is free to run other, unrelated tasks (like spawning chunkers or embedders, monitoring, etc). -
This is not a performance bottleneck. If the channel is empty, there's no work for any of these workers to do anyway. The worker holding the lock simply becomes the designated waiter for the next message. Once a message arrives and is processed, the lock is released, and the next worker in the queue can acquire it to wait for its turn.
-
rx.blocking_lock().blocking_recv()
: For blocking workers (those spawned withtokio::task::spawn_blocking
), they cannot.await
. Instead, they use blocking calls.
loop {
let maybe_job = rx.blocking_lock().blocking_recv(); // Blocks the current thread (the one that is spawned)
let Some(job) = maybe_job else { break };
// Process job...
}
// The lock is released when `blocking_lock()` returns its guard,
// or when the guard is dropped at the end of the statement.
- When
blocking_lock().blocking_recv()
is called by a blocking worker (e.g., an embedder):- "Current thread" refers to the specific, dedicated OS thread that this worker (e.g., one embedder instance) is running on, provided by
tokio::task::spawn_blocking
's internal thread pool. Each CPU-bound worker of a given type (e.g., each of theEMBEDDERS
instances) runs on its own dedicated blocking thread. - The
rx.blocking_lock()
call returns a temporaryMutexGuard
. Theblocking_recv()
method is then immediately called on this temporary guard. In Rust, temporary values are dropped at the end of the full statement. Therefore, the lock is held only for the brief duration of theblocking_recv()
call. As soon as a message is received andblocking_recv()
returns, theMutexGuard
is dropped, releasing the lock. - The call to
blocking_lock().blocking_recv()
does block the specific blocking thread that executes it until a message is received and the lock is released. If multiple workers of the same type (e.g., several embedders) are simultaneously trying to acquire the lock on that specific sharedReceiver
, they will block their own respective dedicated threads until the lock becomes available. Once a worker successfully receives an item and releases the lock, another waiting worker can then acquire it. So only one worker can pull an item from the shared channel at any given instant. However, because these are separate dedicated blocking threads, multiple embedders can be running concurrently, each processing a different item after they have successfully acquired the lock and received their work. The blocking is localized to the act of acquiring the lock and receiving the item, not to the entire processing duration.
- "Current thread" refers to the specific, dedicated OS thread that this worker (e.g., one embedder instance) is running on, provided by
How tokio::fs
works under the hood (File I/O)
Tokio's asynchronous file system operations, such as tokio::fs::read_to_string
, are implemented using tokio::task::spawn_blocking
internally.
// This async call sends the actual file reading work to Tokio's dedicated
// blocking thread pool, freeing up the current async worker thread.
match tokio::fs::read_to_string(&path).await { /* ... */ }
Other things I considered
Alternative concurrency implementations/runtimes
While tokio::sync::mpsc
channels are used here since I have to use the Tokio runtime (better support in general for other open source libraries), other high-performance channel libraries exist, like crossbeam-channel
or flume
, and also offer good throughput and latency characteristics and can be used in both synchronous and asynchronous contexts (often requiring tokio::task::spawn_blocking
or adapters for async usage). The choice often depends on the specific project's dependencies and whether a fully asynchronous channel is strictly necessary or if a blocking channel integrated with blocking tasks is more appropriate.
Sharing sentence embedding model instances to reduce memory
For the Chunker
and Embedder
stages, I also considered sharing a single SentenceEmbeddingsModelType::AllMiniLmL12V2
instance across multiple workers, rather than having each worker load its own copy.
However, this is currently not straightforward (or not possible without deep cloning) within the rust-bert
library due to underlying tch
(LibTorch/PyTorch) tensor types not consistently implementing Rust's Send
and Sync
traits for safe sharing and mutation across multiple threads. Specifically, tch::Tensor
does not implement Sync
, which prevents multiple threads from safely holding shared references to a single tensor instance for concurrent inference.
Though, I think it is possible to share the model weights in memory and only allocate for inference. Maybe another day I'll try doing this.