The thought process behind Quipu's parallelism design

September 23, 2025

Reference: pipeline_usage.rs

Context: Quipu is a high-performance semantic indexing system designed for highly parallel, concurrent file processing, with queries that support both metadata and vector search operations.

First things first, I need to address the goal and why we need this to be parallel in the first place. The simple answer is that we want to maximize "throughput".

We want to process as many files as possible, as quickly as possible. This means keeping the CPU busy with useful work at all times rather than letting it sit idle waiting on slow operations like reading from disk. (We also don't want to oversubscribe the CPU with tasks; part of the exercise is finding the right ratio of workers for each stage.)

The concrete job of this project is to read file contents (e.g. PDF, TXT, DOCX), chunk them properly, compute embeddings for the chunks, and store the vectors in a vector database, all as efficiently as possible.


What are we actually doing?

The pipeline I thought of has four main stages:

  1. File I/O: Reading files from disk is a classic I/O-bound task; Tokio has a built-in tokio::fs wrapper for it.
  2. Preprocessing: In production, this would convert PDF and DOCX files into text. For the purpose of this simple pipeline architecture, the program only normalizes line endings and trims whitespace (see the sketch just after this list). It's technically CPU work, but it's so trivial it's negligible, so we can treat it as part of the I/O-bound pipeline. However, if this stage involved heavier CPU work (e.g., converting a PDF/DOCX file to text, an FFmpeg conversion, etc.), it would become a CPU-bound task.
  3. Chunking: This involves a sentence embedding model's tokenizer. Tokenizing text, especially long text, is a series of complex lookups and computations. This is a CPU-bound task.
  4. Embedding: This is the heaviest stage: it runs the embedding model to turn text chunks into vectors, and it is a very heavy CPU-bound task.
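For reference, here's roughly what that trivial preprocessing looks like (a minimal sketch; the function name is mine, not necessarily the one in pipeline_usage.rs):

// Normalize line endings and trim surrounding whitespace. Cheap enough
// to run inline with the async I/O stage.
fn preprocess(raw: &str) -> String {
	raw.replace("\r\n", "\n").trim().to_string()
}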

Conclusion: We have two distinct "types" of work, CPU-bound and I/O-bound, categorized essentially by whether a task spends its time computing or waiting. Mixing them carelessly is the source of most performance problems.


Choose the Right Tool for Each Job

Tokio gives us two primary tools for running tasks:

  1. tokio::spawn(async { ... }): For async tasks. This runs the task on a shared pool of worker threads (the "core" runtime). An async task is special because it can yield control back to the scheduler whenever it hits an .await point (e.g., waiting for a file read or a channel message). This allows a single thread to juggle thousands of waiting tasks, making it incredibly efficient for I/O-bound work.

Side note: tokio::spawn is an alias of tokio::task::spawn.

  2. tokio::task::spawn_blocking(|| { ... }): For blocking tasks. This moves the task to a separate, dedicated thread pool designed for blocking, CPU-intensive work. This is crucial because a task here can run for a long time, hogging 100% of a CPU core, without stalling the main async runtime. The async runtime can continue juggling its thousands of I/O (or small computation) tasks while the blocking threads crunch numbers.
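To make the distinction concrete, here's a minimal side-by-side sketch (the file name and the busy-work computation are placeholders, not pipeline code):

#[tokio::main]
async fn main() {
	// I/O-bound: the task yields at every .await, so one runtime thread
	// can juggle thousands of these while they wait on the disk.
	let io_task = tokio::spawn(async {
		tokio::fs::read_to_string("notes.txt").await
	});

	// CPU-bound: runs on the dedicated blocking pool, so it can hog a
	// core without stalling the async runtime.
	let cpu_task = tokio::task::spawn_blocking(|| {
		(0..50_000_000u64).map(|x| x.wrapping_mul(x)).sum::<u64>()
	});

	let (_contents, _checksum) = (io_task.await, cpu_task.await);
}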

TL;DR

The general rule of thumb: a function should be async if it primarily waits (e.g., for I/O, network responses, channel messages), and should use tokio::spawn. A function should be run via spawn_blocking if it primarily works (performs heavy computation) and has no .await points. Any pure CPU task taking more than ~100µs is a good candidate.

So in the example pipeline, stages 1 and 2 (file I/O and preprocessing) run as async tasks via tokio::spawn, while stages 3 and 4 (chunking and embedding) run on the blocking pool via tokio::task::spawn_blocking.
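Here is a condensed, hypothetical sketch of that wiring. The file names, buffer sizes, and the toy word-splitting "tokenizer" are placeholders, not the actual code in pipeline_usage.rs:

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};

#[tokio::main]
async fn main() {
	// Conveyor belts between stages (bounded, which gives us backpressure).
	let (text_tx, text_rx) = mpsc::channel::<String>(64);
	let (chunk_tx, mut chunk_rx) = mpsc::channel::<Vec<String>>(64);

	// Stages 1–2: async file I/O + trivial preprocessing on the core runtime.
	tokio::spawn(async move {
		for path in ["a.txt", "b.txt"] {
			if let Ok(raw) = tokio::fs::read_to_string(path).await {
				let text = raw.replace("\r\n", "\n").trim().to_string();
				let _ = text_tx.send(text).await;
			}
		}
	});

	// Stage 3: CPU-bound chunking on the blocking pool. Several workers can
	// share the receiver through Arc<Mutex<...>> (see the next section).
	let text_rx = Arc::new(Mutex::new(text_rx));
	for _ in 0..2 {
		let rx = Arc::clone(&text_rx);
		let tx = chunk_tx.clone();
		tokio::task::spawn_blocking(move || loop {
			let maybe_text = rx.blocking_lock().blocking_recv();
			let Some(text) = maybe_text else { break };
			// Placeholder "tokenizer": fixed-size word chunks.
			let chunks: Vec<String> = text
				.split_whitespace()
				.collect::<Vec<_>>()
				.chunks(64)
				.map(|words| words.join(" "))
				.collect();
			let _ = tx.blocking_send(chunks);
		});
	}
	drop(chunk_tx); // so chunk_rx closes once all chunker workers finish

	// Stage 4 (embedding) would consume chunk_rx the same way via
	// spawn_blocking; here we just drain it.
	while let Some(chunks) = chunk_rx.recv().await {
		println!("received {} chunks", chunks.len());
	}
}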


How do workers talk to each other?

We use tokio::sync::mpsc::channel to create asynchronous, multi-producer, single-consumer channels. These are basically conveyor belts between our pipeline stages.

We have a sender (tx) that can be cloned and given to many producer tasks, and a receiver (rx) that can only be used by one consumer task at a time (this is fine, and I'll explain why below).
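A minimal illustration of those semantics, with toy values rather than the pipeline's types:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
	let (tx, mut rx) = mpsc::channel::<u32>(8);

	// Many producers: each gets its own clone of the sender.
	for id in 0..3u32 {
		let tx = tx.clone();
		tokio::spawn(async move {
			let _ = tx.send(id).await;
		});
	}
	drop(tx); // the channel closes once every sender clone is dropped

	// One consumer: recv() yields None after the channel closes.
	while let Some(id) = rx.recv().await {
		println!("got a message from producer {id}");
	}
}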

When multiple consumers need to pull tasks from a single shared channel receiver (e.g., multiple file readers taking paths from input_paths_rx), the Arc<Mutex<Receiver>> pattern is used.

let maybe_path = {
	let mut guard = rx.lock().await; // Yields if lock is busy (i.e. another worker is using it rn)
	guard.recv().await // Yields if channel is empty (when channel is not empty, it finishes and returns the result)
};
// The `guard` (MutexGuard) is dropped here, releasing the lock so other workers can continue.

A common question is whether holding the lock across an .await can cause problems. For example, if worker1 calls let mut guard = rx.lock().await; guard.recv().await;, it holds the lock while waiting for a message. What happens to worker2? Nothing bad: worker2's rx.lock().await simply yields until worker1 receives a message and drops its guard, at which point worker2 is woken and takes its turn. Unlike std::sync::Mutex, tokio::sync::Mutex is designed to be held across .await points, so this doesn't block a runtime thread; it only serializes the receive calls, which is exactly what we want when handing out jobs one at a time. Inside the CPU-bound spawn_blocking workers, the same pattern uses the blocking variants of these calls:

loop {
	let maybe_job = rx.blocking_lock().blocking_recv(); // Blocks this blocking-pool thread until a job arrives (or the channel closes)
	let Some(job) = maybe_job else { break };
	// Process job...
}
// `blocking_lock()` acquires the lock and returns a temporary guard, which is
// dropped at the end of that statement, releasing the lock for other workers.

How tokio::fs works under the hood (File I/O)

Tokio's asynchronous file system operations, such as tokio::fs::read_to_string, are implemented using tokio::task::spawn_blocking internally.

// This async call sends the actual file reading work to Tokio's dedicated
// blocking thread pool, freeing up the current async worker thread.
match tokio::fs::read_to_string(&path).await { /* ... */ }
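Conceptually, the call above behaves roughly as if we had written the spawn_blocking ourselves. This is a simplified sketch, not Tokio's literal implementation, and the helper name is made up:

use std::path::PathBuf;

// Roughly what tokio::fs::read_to_string does for us: ship the blocking
// std::fs call to the blocking pool and await its JoinHandle.
async fn read_to_string_by_hand(path: PathBuf) -> std::io::Result<String> {
	tokio::task::spawn_blocking(move || std::fs::read_to_string(path))
		.await
		.expect("blocking file-read task panicked")
}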

Other things I considered

Alternative concurrency implementations/runtimes

While I use tokio::sync::mpsc channels here because the project is already on the Tokio runtime (which generally has the best support among other open-source libraries), other high-performance channel libraries exist, such as crossbeam-channel and flume. They also offer good throughput and latency characteristics and can be used in both synchronous and asynchronous contexts (often requiring tokio::task::spawn_blocking or adapters for async usage). The choice usually depends on the project's dependencies and on whether a fully asynchronous channel is strictly necessary, or whether a blocking channel integrated with blocking tasks is more appropriate.
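As an illustration of the blocking-channel route (a hypothetical variant, not what the pipeline currently does): a crossbeam-channel receiver is itself cloneable, since the channel is MPMC, so blocking workers could share it directly instead of going through Arc<Mutex<Receiver>>.

// Hypothetical variant: distribute jobs to blocking workers over a
// crossbeam channel instead of tokio::sync::mpsc.
fn spawn_crossbeam_workers(rx: crossbeam_channel::Receiver<String>) {
	for _ in 0..4 {
		let rx = rx.clone(); // crossbeam receivers are cloneable (MPMC)
		tokio::task::spawn_blocking(move || {
			// recv() blocks this pool thread; Err means the channel is closed and drained.
			while let Ok(job) = rx.recv() {
				let _ = job.len(); // placeholder for real CPU work
			}
		});
	}
}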

Sharing sentence embedding model instances to reduce memory

For the Chunker and Embedder stages, I also considered sharing a single SentenceEmbeddingsModelType::AllMiniLmL12V2 instance across multiple workers, rather than having each worker load its own copy.

However, this is currently not straightforward (or not possible without deep cloning) within the rust-bert library due to underlying tch (LibTorch/PyTorch) tensor types not consistently implementing Rust's Send and Sync traits for safe sharing and mutation across multiple threads. Specifically, tch::Tensor does not implement Sync, which prevents multiple threads from safely holding shared references to a single tensor instance for concurrent inference.
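So in practice each blocking worker constructs its own model instance and keeps it for the lifetime of its loop. Roughly like this, a sketch using rust-bert's sentence-embeddings builder, with error handling and the channel plumbing elided:

use rust_bert::pipelines::sentence_embeddings::{
	SentenceEmbeddingsBuilder, SentenceEmbeddingsModelType,
};

// Each embedder worker loads its own copy of the model, since the model
// (and the tch tensors inside it) cannot be shared across threads via Sync.
fn spawn_embedder_worker() -> tokio::task::JoinHandle<()> {
	tokio::task::spawn_blocking(|| {
		let model = SentenceEmbeddingsBuilder::remote(
			SentenceEmbeddingsModelType::AllMiniLmL12V2,
		)
		.create_model()
		.expect("failed to load the embedding model");

		// ...then receive chunks from the channel in a loop (the
		// blocking_lock/blocking_recv pattern above) and call:
		let _vectors = model
			.encode(&["example sentence"])
			.expect("inference failed");
	})
}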

That said, I think it should be possible to share the model weights in memory and allocate only the per-inference state separately. Maybe I'll try that another day.