A modified salting technique that cuts P99 write latency 22x for large blobs
Storing huge blobs in any database has always been, and still is, very challenging. The large allocations required for storing, reading, compacting, and repairing such cells create significant pressure on the memory allocation subsystem. In addition, receiving a write request or sending a read response with a huge payload on a shared connection causes head-of-line blocking, which increases the latency of other requests on that connection.
This is true for every database!
The good news is that by splitting a blob into smaller chunks and processing them in parallel, we can achieve latencies comparable to those of a single-chunk read/write operation.
Naturally, when all your data consists of huge blobs, you are probably not going to store them in a CQL or SQL database. You will use S3-like object storage for the blobs and a CQL/SQL database to store references to them.
However, if most of your data is reasonably small but a small fraction of it consists of huge blobs, you may want to serve both small and large blobs from the same database.
While working with ScyllaDB, we found that a modified salting technique can address the latency impact of storing large blobs. In this post, we present that salting technique, then explain when/how to apply it.
Background: Large Blobs in ScyllaDB
For a better idea of how storing large blobs impacts performance, let’s look at an example.
In our testing of ScyllaDB version 2026.1.1 with the cassandra-stress tool, we observed that writing key-value rows with a 60MB blob cell results in an average latency of about 568ms and a P99 latency of 1.4s. In contrast, writing 1MB key-value rows yields an average latency of 2.2ms, with a P99 of approximately 4.5ms.
When writing 60MB cells, ScyllaDB could not go any faster because its memory management system was totally saturated.
Below are the results of the 60MB cell test (with a single i8g.4xlarge node):
```
Results:
Op rate : 18 op/s [WRITE: 18 op/s]
Partition rate : 18 pk/s [WRITE: 18 pk/s]
Row rate : 18 row/s [WRITE: 18 row/s]
Latency mean : 567.6 ms [WRITE: 567.6 ms]
Latency median : 497.0 ms [WRITE: 497.0 ms]
Latency 95th percentile : 1087.4 ms [WRITE: 1,087.4 ms]
Latency 99th percentile : 1436.5 ms [WRITE: 1,436.5 ms]
Latency 99.9th percentile : 1874.9 ms [WRITE: 1,874.9 ms]
Latency max : 1995.4 ms [WRITE: 1,995.4 ms]
Total partitions : 1,000 [WRITE: 1,000]
Total errors : 0 [WRITE: 0]
Total GC count : 0
Total GC memory : 0.000 KiB
Total GC time : 0.0 seconds
Avg GC time : NaN ms
StdDev GC time : 0.0 ms
Total operation time : 00:00:55
```
And here are the results of writing 1MB cells at the same byte rate as the 60MB run above (about 1,080 op/s × 1MB versus 18 op/s × 60MB):
```
Op rate : 1,061 op/s [WRITE: 1,080 op/s]
Partition rate : 1,061 pk/s [WRITE: 1,080 pk/s]
Row rate : 1,061 row/s [WRITE: 1,080 row/s]
Latency mean : 2.2 ms [WRITE: 2.2 ms]
Latency median : 2.0 ms [WRITE: 2.0 ms]
Latency 95th percentile : 2.8 ms [WRITE: 2.8 ms]
Latency 99th percentile : 4.5 ms [WRITE: 4.5 ms]
Latency 99.9th percentile : 15.0 ms [WRITE: 15.0 ms]
Latency max : 41.6 ms [WRITE: 41.6 ms]
Total partitions : 60,000 [WRITE: 60,000]
Total errors : 0 [WRITE: 0]
Total GC count : 0
Total GC memory : 0.000 KiB
Total GC time : 0.0 seconds
Avg GC time : NaN ms
StdDev GC time : 0.0 ms
Total operation time : 00:00:56
```
The 60MB blob results are suboptimal for high-performance requirements.
However, the 1MB results suggest that if we split the blob into smaller chunks and write/read them in parallel, we can achieve latencies close to those of a single-chunk read/write operation. Perhaps salting can help us achieve this?
Classic Salting Technique
The classic “salting” technique, used to break down large partitions consisting of too many rows, adds a “salt” column to the partition key. Every new row is stored with a salt value picked at random from a known range (e.g., an integer between 0 and 99).
This distributes what was once a single large partition for a key KEY1 across 100 smaller partitions with partition keys (KEY1, 0), (KEY1, 1), …, (KEY1, 99), each about 1/100 the size of the original one.
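To make the classic scheme concrete, here is a minimal in-memory sketch. A Python dict stands in for a table whose partition key is (key, salt); the names classic_salted_write and classic_salted_read are illustrative only and are not part of any driver API.

```python
import random
from collections import defaultdict
from typing import DefaultDict, List, Tuple

SALT_CARDINALITY = 100  # salts 0..99, as in the example above

# In-memory stand-in for a table with PRIMARY KEY ((key, salt), ...)
table: DefaultDict[Tuple[str, int], List[str]] = defaultdict(list)


def classic_salted_write(key: str, row: str) -> None:
    # Every row gets a random salt: the writer cannot know in advance
    # whether this key will grow into a large partition
    salt = random.randrange(SALT_CARDINALITY)
    table[(key, salt)].append(row)


def classic_salted_read(key: str) -> Tuple[List[str], int]:
    # The reader must query all SALT_CARDINALITY partitions of the key
    rows: List[str] = []
    partitions_queried = 0
    for salt in range(SALT_CARDINALITY):
        partitions_queried += 1
        # .get() avoids creating empty entries in the defaultdict
        rows.extend(table.get((key, salt), []))
    return rows, partitions_queried


for i in range(1000):
    classic_salted_write("KEY1", f"row-{i}")
classic_salted_write("KEY2", "only-row")
```

Reading KEY2 returns its single row but still costs 100 partition queries, which is exactly the waste discussed next.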
The primary drawback of this technique for large partitions is that salting must be applied to every row, because the system does not know in advance whether a row belongs to a large partition. Consequently, reading data for any original key KEYn requires reading all 100 partitions (KEYn, k), where k = 0, 1, …, 99. This can be very wasteful, because large partitions normally represent a very small part of the total partition population. Similarly, large blobs typically represent only a small fraction of the total blob population.
Another “weak spot” of classic salting is that you can’t reduce the salt cardinality; you can only increase it. Since rows were assigned random salts from the full range, readers must keep querying that entire range, or rows already written with a larger salt would become unreachable. This means that even if your large partitions got smaller, you would still need to use the same salt cardinality as before.
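The cardinality cannot shrink because readers only look at salts in the current range. In this minimal sketch (hypothetical data, with a dict standing in for the table), rows were written while the cardinality was 100; reading with a reduced cardinality of 10 silently misses every row whose random salt was 10 or higher.

```python
from typing import Dict, List, Tuple

# Rows written earlier with salt cardinality 100: (key, salt) -> rows.
# Salts were random, so 57 or 83 are as likely as 3.
table: Dict[Tuple[str, int], List[str]] = {
    ("KEY1", 3): ["a"],
    ("KEY1", 57): ["b"],
    ("KEY1", 83): ["c"],
}


def read_all(key: str, salt_cardinality: int) -> List[str]:
    # Classic salting reads every salt in [0, salt_cardinality)
    rows: List[str] = []
    for salt in range(salt_cardinality):
        rows.extend(table.get((key, salt), []))
    return rows


print(read_all("KEY1", 100))  # ['a', 'b', 'c']
print(read_all("KEY1", 10))   # ['a']: rows under salts 57 and 83 are unreachable
```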
Modified Salting Technique for Storing Blobs
We found that modifying the classic salting algorithm for the blob use case eliminates both of those drawbacks. Let’s look at how we modified it.
Schema
Let’s assume that the original table schema is as follows:
```sql
CREATE TABLE keyspace1.standard1 (
    key blob,
    value blob,
    PRIMARY KEY (key)
)
```
For our algorithm, we modify it to:
```sql
CREATE TABLE keyspace1.standard1 (
    key blob,
    salt int,
    chunk_id int,
    chunk blob,
    total_chunks int,
    salt_cardinality int,
    PRIMARY KEY ((key, salt), chunk_id)
)
```
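To see how a single blob maps onto this schema, the minimal sketch below turns a blob into the rows that would be inserted, without touching a database. blob_to_rows and INSERT_CQL are illustrative assumptions (the `?` bind markers are what a prepared statement in a CQL driver would use). Chunk i is placed in salt i % max_salt, matching the write algorithm described next.

```python
from typing import Dict, List, Union

# Illustrative prepared-statement text for the modified schema
INSERT_CQL = (
    "INSERT INTO keyspace1.standard1 "
    "(key, salt, chunk_id, chunk, total_chunks, salt_cardinality) "
    "VALUES (?, ?, ?, ?, ?, ?)"
)

Row = Dict[str, Union[bytes, int]]


def blob_to_rows(key: bytes, blob: bytes, max_salt: int, max_chunk_size: int) -> List[Row]:
    # Slice the blob; the last chunk may be shorter than max_chunk_size
    chunks = [blob[i:i + max_chunk_size] for i in range(0, len(blob), max_chunk_size)]
    rows: List[Row] = []
    for chunk_id, chunk in enumerate(chunks):
        rows.append({
            "key": key,
            "salt": chunk_id % max_salt,  # second part of the partition key
            "chunk_id": chunk_id,         # clustering key
            "chunk": chunk,
            "total_chunks": len(chunks),  # duplicated in every row for simplicity
            "salt_cardinality": max_salt,
        })
    return rows


rows = blob_to_rows(b"k1", b"0123456789", max_salt=2, max_chunk_size=4)
for r in rows:
    print(r["salt"], r["chunk_id"], r["chunk"])
# 0 0 b'0123'
# 1 1 b'4567'
# 0 2 b'89'
```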
Algorithm
Write
On the write path, for simplicity, we store the max_salt value used (salt_cardinality) and the total number of chunks (total_chunks) in every row, alongside the chunk-specific data. If you want to optimize storage to the bitter end, you can store salt_cardinality and total_chunks only in the “metadata row” (see below).
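```python
from typing import List, Tuple


def split_blob(blob: bytes, max_chunk_size: int) -> List[bytes]:
    # Slice the blob into max_chunk_size pieces; the last chunk may be smaller
    return [blob[i:i + max_chunk_size] for i in range(0, len(blob), max_chunk_size)]


def write_key_blob(key: bytes, blob: bytes, max_salt: int = 100, max_chunk_size: int = 4096) -> None:
    split_blob_chunks: List[bytes] = split_blob(blob, max_chunk_size)
    num_chunks = len(split_blob_chunks)
    # One independent list per salted partition ([[]] * n would alias a single list)
    salted_partition_chunks: List[List[Tuple[int, bytes]]] = [
        [] for _ in range(min(num_chunks, max_salt))
    ]
    for chunk_id, chunk in enumerate(split_blob_chunks):
        # Chunk 0 (the "metadata row") always lands in salt 0
        salted_partition_chunks[chunk_id % max_salt].append((chunk_id, chunk))
    futures = []
    for salt, chunks in enumerate(salted_partition_chunks):
        # Inserts salted partition in one or a few UNLOGGED BATCHes;
        # insert_async_batch is an application helper assumed to return a future
        futures.append(insert_async_batch(
            key=key,
            salt=salt,
            chunks=chunks,
            total_chunks=num_chunks,
            salt_cardinality=max_salt,
        ))
    # The blob is fully written only once every salted partition is acknowledged
    for fut in futures:
        fut.result()
```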
Complexity
- Memory: O(sizeof(blob))
- CPU: O(num_chunks)
- DB: O(num_salted_partitions), where num_salted_partitions = min(num_chunks, max_salt)
Latency
- Roughly ceil(num_salted_partitions / max_batch_concurrency) times the latency of a single batch, where max_batch_concurrency is the maximum number of batches in flight at once.
- If all batches can be sent out in parallel, the whole write takes about as long as writing a single salted partition's data.
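The latency estimate above depends only on how many salted partitions a blob produces and how many batches can be in flight at once. The hypothetical helper salted_layout below computes that layout under the chunk_id % max_salt placement used by write_key_blob, plus the number of sequential "rounds" of batches. It assumes each round costs roughly one single-batch latency, which is a simplification since real batches vary in size.

```python
from typing import Tuple


def salted_layout(blob_size: int, max_chunk_size: int, max_salt: int,
                  max_batch_concurrency: int) -> Tuple[int, int, int, int]:
    # Ceiling division without floats
    num_chunks = (blob_size + max_chunk_size - 1) // max_chunk_size
    num_salted_partitions = min(num_chunks, max_salt)
    # chunk_id % max_salt spreads chunks round-robin, so partition sizes
    # differ by at most one chunk
    rows_per_partition = [0] * num_salted_partitions
    for chunk_id in range(num_chunks):
        rows_per_partition[chunk_id % max_salt] += 1
    largest_partition = max(rows_per_partition, default=0)
    # Batches beyond the concurrency limit wait for a free slot
    rounds = (num_salted_partitions + max_batch_concurrency - 1) // max_batch_concurrency
    return num_chunks, num_salted_partitions, largest_partition, rounds


MiB, KiB = 1024 * 1024, 1024
print(salted_layout(60 * MiB, 64 * KiB, 100, 100))  # (960, 100, 10, 1)
print(salted_layout(60 * MiB, 64 * KiB, 100, 15))   # (960, 100, 10, 7)
print(salted_layout(60 * MiB, 1 * MiB, 60, 60))     # (60, 60, 1, 1)
```

With only 15 batches in flight, the 100 salted partitions of a 60MB blob would need about 7 rounds, so roughly 7x the single-batch latency; with 100 in flight, a single round suffices.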
Read
On the read path, we start by reading total_chunks and salt_cardinality from the “metadata row” of a specific Key: the row with primary key (key=Key, salt=0, chunk_id=0). If we have stored any data for Key, this row must exist, because chunk 0 is always written to salt 0.
Once we have total_chunks and salt_cardinality values, we can calculate primary key values for every chunk of the original blob we stored before, and read them all in parallel.
Below is pseudo-code implementing this idea.
```python
from typing import List, Optional


def read_key_blob(key: bytes) -> Optional[List[bytes]]:
    # SELECT total_chunks, salt_cardinality FROM keyspace1.standard1
    # WHERE key=? AND salt=0 AND chunk_id=0
    # (get_num_chunks is an application helper; assumed to return (None, None)
    # when the row does not exist)
    total_chunks, max_salt = get_num_chunks(key=key)
    if not total_chunks:
        return None  # No data for this key
    # One full-partition read per salted partition, all issued in parallel;
    # async_read is an application helper assumed to return a future of rows
    salted_results_futures = [
        async_read(key=key, salt=i)
        for i in range(min(total_chunks, max_salt))
    ]
    # Wait for every read; result() blocks until that future completes
    # (async callbacks would avoid blocking this thread)
    salted_partition_data = [fut.result() for fut in salted_results_futures]
    # Reassemble blob in correct order using chunk_id
    chunks: List[Optional[bytes]] = [None] * total_chunks
    for partition_data in salted_partition_data:
        for row in partition_data:
            chunks[row['chunk_id']] = row['chunk']
    if any(chunk is None for chunk in chunks):
        raise RuntimeError("missing chunks; retry the read")
    # Return the chunks without copying; b"".join(chunks) gives one contiguous blob
    return chunks
```
Complexity
- Memory: O(sizeof(original blob))
- CPU: O(num_chunks)
- DB: O(num_salted_partitions), where num_salted_partitions = min(num_chunks, max_salt)
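The write and read paths can be exercised end to end without a cluster. The minimal sketch below replaces ScyllaDB with a dict keyed by (key, salt) whose values map chunk_id to a row; the class InMemorySaltedStore is hypothetical and is not the SaltedBlobStore class from the benchmark repository. It shows the two properties the read path relies on: chunk 0 (the metadata row) always lands in salt 0, and a reader touches only min(total_chunks, salt_cardinality) partitions.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[bytes, int, int]  # (chunk, total_chunks, salt_cardinality)


class InMemorySaltedStore:
    """Toy model of the modified-salting table: (key, salt) -> {chunk_id: Row}."""

    def __init__(self) -> None:
        self.partitions: Dict[Tuple[bytes, int], Dict[int, Row]] = {}
        self.partition_reads = 0  # counts full-partition reads on the read path

    def write(self, key: bytes, blob: bytes, max_salt: int, max_chunk_size: int) -> None:
        chunks = [blob[i:i + max_chunk_size] for i in range(0, len(blob), max_chunk_size)]
        for chunk_id, chunk in enumerate(chunks):
            # Same placement rule as write_key_blob: chunk 0 lands in salt 0
            part = self.partitions.setdefault((key, chunk_id % max_salt), {})
            part[chunk_id] = (chunk, len(chunks), max_salt)

    def read(self, key: bytes) -> Optional[bytes]:
        # Metadata row: (key, salt=0, chunk_id=0)
        meta = self.partitions.get((key, 0), {}).get(0)
        if meta is None:
            return None  # no data for this key
        _, total_chunks, max_salt = meta
        chunks: List[Optional[bytes]] = [None] * total_chunks
        # Only min(total_chunks, max_salt) partitions can hold chunks of this blob
        for salt in range(min(total_chunks, max_salt)):
            self.partition_reads += 1
            for chunk_id, (chunk, _, _) in self.partitions.get((key, salt), {}).items():
                # Rows past total_chunks are leftovers of an older, longer blob
                if chunk_id < total_chunks:
                    chunks[chunk_id] = chunk
        present = [c for c in chunks if c is not None]
        if len(present) != total_chunks:
            raise RuntimeError("missing chunks; retry the read")
        return b"".join(present)
```

For example, a 10,240-byte blob written with max_chunk_size=1024 and max_salt=100 is read back with 10 partition reads rather than 100, and a 2-byte blob needs just one.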
Solving Different Blobs’ Version Problem
As with regular large partition salting, there are some challenges:
- How to ensure the chunks you read belong to the same version of the blob?
- How to ensure concurrent writers of different blob versions to the same Key don’t leave the database’s data in an inconsistent state?
A rather common approach to solving the first issue is to add a ‘version’ non-key column:
- Every time writers write a new version of a blob, they must assign the same cluster-unique version identifier to every chunk of that version, so that all of its chunks share the same identifier.
- A reader always verifies that the versions of all chunks (rows) it reads for a specific Key match; if they don't, it retries the read.
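The reader-side check can be sketched as a pure function over the rows returned by the partition reads. This assumes a hypothetical version column present in every row (for example, a timeuuid or any other cluster-unique identifier) and a total_chunks value taken from the metadata row; returning None tells the caller to retry the read.

```python
from typing import Dict, List, Optional, Sequence


def assemble_if_consistent(rows: Sequence[Dict], total_chunks: int) -> Optional[List[bytes]]:
    """Return the chunks in order, or None if the caller should retry the read."""
    # Rows past total_chunks are leftovers of an older, longer blob; ignore them
    current = [row for row in rows if row["chunk_id"] < total_chunks]
    # Every chunk of one blob version carries the same version identifier
    if len({row["version"] for row in current}) != 1:
        return None  # mixed versions: a concurrent write is in progress
    chunks: List[Optional[bytes]] = [None] * total_chunks
    for row in current:
        chunks[row["chunk_id"]] = row["chunk"]
    present = [c for c in chunks if c is not None]
    if len(present) != total_chunks:
        return None  # some chunks of this version are not visible yet
    return present
```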
Solving the second issue on the DB level is not recommended. It would require using atomic transactions like CQL LWT, which would introduce a performance overhead of their own.
A better approach is to ensure the atomicity of writes on the application level by ensuring that there is always a single writer to the same (original) Key at any given point in time. One way to implement this is to have writer Agents manage specific Shard Key ranges. Each Agent acts as a consumer for an MPSC queue and is responsible for writing new versions of blobs belonging to its assigned keys.
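A minimal single-process sketch of the single-writer idea: keys are routed to agents by a stable hash, and each agent drains its own queue on one thread. Here queue.Queue (thread-safe, accepts many producers) with exactly one consumer per queue plays the role of the MPSC queue; the agent count, the crc32-based routing, and the write_blob callback are illustrative assumptions rather than a prescribed design.

```python
import queue
import threading
import zlib
from typing import Callable, List, Tuple

WriteFn = Callable[[bytes, bytes], None]


def agent_for_key(key: bytes, num_agents: int) -> int:
    # Stable routing: crc32 maps a key to the same agent across processes,
    # unlike the built-in hash(), which is randomized per process for bytes
    return zlib.crc32(key) % num_agents


def _run_agent(q: queue.Queue, write_blob: WriteFn) -> None:
    # Single consumer: this thread is the only writer for the keys routed here
    while True:
        item = q.get()
        if item is None:  # shutdown sentinel
            return
        key, blob = item
        write_blob(key, blob)


def start_agents(write_blob: WriteFn, num_agents: int = 4) -> Tuple[List[queue.Queue], List[threading.Thread]]:
    queues: List[queue.Queue] = [queue.Queue() for _ in range(num_agents)]
    threads = [threading.Thread(target=_run_agent, args=(q, write_blob), daemon=True)
               for q in queues]
    for t in threads:
        t.start()
    return queues, threads


def submit(queues: List[queue.Queue], key: bytes, blob: bytes) -> None:
    # Any number of producers may call this concurrently
    queues[agent_for_key(key, len(queues))].put((key, blob))


def stop_agents(queues: List[queue.Queue], threads: List[threading.Thread]) -> None:
    for q in queues:
        q.put(None)
    for t in threads:
        t.join()
```

Because each key always maps to one queue with one consumer, writes of successive versions of the same key are applied one at a time and in submission order.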
In general, solving these problems is outside the scope of this blog.
Benefits Compared to Classic Salting
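- One can choose any blob chunk size (MAX_CHUNK_SIZE) and any salting cardinality (MAX_SALT) for every key without impacting writes or reads of other keys. Because each key records its own salt_cardinality, MAX_SALT can also be lowered when a blob is rewritten, since readers never look beyond the recorded value.
- Unnecessary reads of empty partitions on the read path are eliminated at the price of one additional small read of 8 bytes (the total_chunks and salt_cardinality ints from the metadata row).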
Examples of Approaches When Choosing MAX_CHUNK_SIZE and MAX_SALT
| Approach | How to configure | Pros | Cons |
|---|---|---|---|
| Fixed maximum chunk size | Always use the same MAX_CHUNK_SIZE for all blobs. Choose different MAX_SALT values per key depending on the blob size to control the size and the number of salted partitions. | Creates a predictable load on the internal memory allocation system. | The number or the size of salted partitions may grow large for big blobs. |
| Fixed maximum number of salted partitions per original key | Always use the same MAX_SALT for each key. You may pick a different MAX_CHUNK_SIZE per key to control the number of rows in each salted partition. | Same CPU complexity for read and write operations. | Some partitions or cells can get big for big blobs. |
| Keep the share of single-partition/single-shard blobs above a particular portion of the total population | Use MAX_SALT=1 for blobs below a certain size, e.g., the P99 blob size in the data population. | Limits the amount of data loss in case of losing a quorum. | If the threshold is set too high, it may create huge partitions, which in turn create bottlenecks on the corresponding shards (CPUs). |
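The first two approaches in the table boil down to fixing one knob and deriving the other from the blob size. The hypothetical helpers below show one way to do that; target_rows_per_partition is an assumed tuning parameter, not something the table prescribes.

```python
def ceil_div(a: int, b: int) -> int:
    return -(-a // b)


def max_salt_for_fixed_chunk_size(blob_size: int, max_chunk_size: int,
                                  target_rows_per_partition: int) -> int:
    # Fixed MAX_CHUNK_SIZE: pick MAX_SALT so that each salted partition
    # holds about target_rows_per_partition chunks
    num_chunks = ceil_div(blob_size, max_chunk_size)
    return max(1, ceil_div(num_chunks, target_rows_per_partition))


def chunk_size_for_fixed_salt(blob_size: int, max_salt: int,
                              target_rows_per_partition: int) -> int:
    # Fixed MAX_SALT: pick MAX_CHUNK_SIZE so that the blob splits into about
    # max_salt * target_rows_per_partition chunks
    return max(1, ceil_div(blob_size, max_salt * target_rows_per_partition))
```

For a 60MiB blob with 64KiB chunks and about 10 rows per partition, the first helper suggests MAX_SALT=96; with MAX_SALT fixed at 100, the second suggests chunks of 62,915 bytes.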
Clarifications About the Last Policy
One of the reasons that we want to salt large partitions (in this particular case, we are effectively salting a “large partition that has all the chunks of our original blob”) is to avoid creating a bottleneck on a single shard. By salting, we are distributing its data among many shards. That not only allows reading and writing its smaller parts in parallel, but also distributes the corresponding overhead among multiple shards of the ScyllaDB database.
However, this same distribution is going to become our nemesis when we try to estimate the “blast radius” of data consistency loss when we lose a quorum.
Let’s do a quick estimation. Assume the following configuration:
- Cluster: 3 racks (A, B, and C), each rack having 2 nodes A1, A2, B1, B2, C1, C2 correspondingly.
- Keyspace: NetworkTopologyStrategy with RF=3 in the current DC.
- Write consistency: LOCAL_QUORUM (this is a common consistency setting that, when paired with a LOCAL_QUORUM read, ensures immediate visibility of all writes)
When we write with LOCAL_QUORUM, the write is always sent to all 3 replicas; however, the request is reported as successful as soon as 2 of the 3 replicas acknowledge it.
Therefore, when estimating potential consistency loss, we should assume the worst-case scenario, in which every write reached only 2 of the 3 replicas.
Let’s now assume that nodes A1 and B1 are lost, and so is all their data.
If blobs are stored as-is (no chunking) as a single key-value row/partition, then we lose guaranteed consistency for about 25% of our data set: A1 holds replicas of ~50% of the keys, and there is a ~50% chance that a key replicated on A1 is also replicated on B1. For such keys, the only surviving replica is on rack C, which in the worst case is the one that missed the write.
To reduce this number, one should provision more nodes per rack: with n nodes per rack, the fraction drops to about 1/n².
| Number of nodes per rack | Possible data loss amount when losing 1 node in each of 2 racks |
|---|---|
| 3 | ~11% |
| 4 | ~6.25% |
| 5 | ~4% |
| … | … |
If blobs are chunked and salted, each with a MAX_SALT at least as large as the number of nodes in a single rack, then statistically each node in the cluster is likely to hold some chunks of each blob. For the above scenario, we would have to assume that we lost consistency for every key: effectively 100% potential data loss.
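The numbers in the table and the near-100% figure for salted blobs follow from a simple probability model. The sketch below assumes one replica per rack, replica placement uniform among a rack's n nodes, and the worst case in which a write is lost whenever its replicas in both affected racks are gone. Treating the placements of a blob's salted partitions as independent is a simplifying assumption.

```python
def loss_fraction_unsalted(nodes_per_rack: int) -> float:
    # A key is at risk if its rack-A replica is on the lost node A1 (1/n)
    # and its rack-B replica is on the lost node B1 (1/n)
    return 1.0 / nodes_per_rack ** 2


def loss_fraction_salted(nodes_per_rack: int, salted_partitions: int) -> float:
    # A blob is at risk if ANY of its salted partitions is at risk,
    # treating partition placements as independent
    p = loss_fraction_unsalted(nodes_per_rack)
    return 1.0 - (1.0 - p) ** salted_partitions


for n in (2, 3, 4, 5):
    print(f"{n} nodes/rack: unsalted {loss_fraction_unsalted(n):.2%}, "
          f"MAX_SALT=100 {loss_fraction_salted(n, 100):.2%}")
```

With 2 nodes per rack this gives 25% for unsalted blobs and essentially 100% for blobs spread over 100 salted partitions; even with 5 nodes per rack, the salted figure stays above 98%.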
Total data consistency loss is a critical scenario that database administrators strive to avoid.
So, how can this risk be reduced? One option is to use a hybrid salting strategy, as presented above.
- If all your blobs are large or blob sizes are uniformly distributed, you may want to chunk them and store all of a blob’s chunks in a single partition: always use MAX_SALT=1.
- If your blob size distribution has a long tail (e.g., P99 is 10MB while the average blob size is 300 bytes), you can add at most about 1 percentage point to the values in the table above, because only the top 1% of blobs are spread across all nodes. To do this, use MAX_SALT=1 for all blobs below 10MB and a larger MAX_SALT (e.g., 100) for all blobs of 10MB or larger. This hybrid approach:
  - allows for effective management of the data loss blast radius;
  - enables the distribution of the largest blobs across multiple shards, fulfilling the primary goal of chunking.
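The hybrid policy can be expressed as a small per-key decision made at write time. The sketch below assumes the example values from the text (a 10MB threshold, taken here as 10MiB, and MAX_SALT=100 above it) and estimates the resulting blast radius with the same simplified model as the table: unsalted blobs are at risk with probability 1/n², and salted blobs are pessimistically treated as all at risk.

```python
LARGE_BLOB_THRESHOLD = 10 * 1024 * 1024  # e.g. the P99 blob size (10MB) from the text
LARGE_MAX_SALT = 100                      # e.g. the large MAX_SALT from the text


def choose_max_salt(blob_size: int) -> int:
    # Blobs below the threshold keep all their chunks in one partition
    # (one replica set); only the tail is spread across shards
    return 1 if blob_size < LARGE_BLOB_THRESHOLD else LARGE_MAX_SALT


def hybrid_loss_fraction(nodes_per_rack: int, salted_share: float) -> float:
    # Unsalted blobs: at risk with probability 1/n^2, as in the table.
    # Salted blobs: pessimistically assume every one of them is at risk.
    p = 1.0 / nodes_per_rack ** 2
    return (1.0 - salted_share) * p + salted_share
```

With 2 nodes per rack and 1% of blobs salted, the estimate rises from 25% to 25.75%, i.e., by less than 1 percentage point.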
Demo
Here is a small demonstration of the idea described above. We wanted to show that the latencies of reading and writing chunked 60MB blobs are comparable to the latencies of small 1MB or 64KB blobs. The small-blob write and read steps ran with a fixed concurrency of 15 to make sure we were not hitting any bottlenecks.
We implemented a write API that receives a blob and salting parameters and stores the blob in chunked form, as described above. We also implemented a corresponding read API that reads back a blob previously stored by the write API and returns it as a vector of chunks.
We measure the latency of the API calls above:
- For writes: the time it takes for all chunks of a given blob to be written to the DB.
- For reads: the time it takes for all chunks to be read from the DB and the corresponding vector of chunks to be returned to the caller.
We issue the blob-chunking API calls with a concurrency of 1 to avoid any queuing and get clean latency measurements.
You can find the API for managing salted blobs within the SaltedBlobStore class in this repository, with implementations available in both Python and C++.
The following results were obtained using the C++ API.
The benchmark tool has 4 steps:
- Write a given number of blobs of a given size with one of the write APIs mentioned above.
- Read the blobs written in step 1 using one of the read APIs mentioned above.
- Write the same amount of data as in step 1 using single-chunk writes, each the same size as the chunks used in step 1.
- Read the data written in step 3 back.
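The per-call latency measurement described above could look roughly like the sketch below. It is not the benchmark from the repository: op stands in for any write or read API call, calls are issued sequentially (matching the concurrency-1 setup for the blob APIs), and percentiles use the nearest-rank method, which is an assumption about how such numbers might be computed.

```python
import math
import time
from typing import Callable, Dict, List


def measure(op: Callable[[int], object], count: int) -> List[float]:
    # Time each call with a monotonic high-resolution clock, in milliseconds
    latencies_ms: List[float] = []
    for i in range(count):
        start = time.perf_counter()
        op(i)
        latencies_ms.append((time.perf_counter() - start) * 1000.0)
    return latencies_ms


def percentile(samples: List[float], pct: float) -> float:
    # Nearest-rank percentile: smallest value with at least pct% of samples <= it
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100.0))
    return ordered[rank - 1]


def summarize(samples: List[float]) -> Dict[str, float]:
    return {
        "min": min(samples),
        "p50": percentile(samples, 50),
        "p99": percentile(samples, 99),
        "max": max(samples),
    }
```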
Our setup is:
- ScyllaDB: a single node with 15 shards: i8g.4xlarge AWS VM.
- Loader: a single c5.12xlarge AWS VM.
- Compactions are disabled so that steps 1 and 3, and steps 2 and 4, which run back-to-back, are comparable.
We write 1,000 blobs of 60MB each in the demo.
In the first iteration, we use 1MB chunks and max_salt=60 since there will be exactly 60 chunks.
In the second iteration, we use 64KB chunks and max_salt=100.
Then we compare the API-level latencies between these two iterations.
Benchmark Results
Iteration 1
Total amount of data written/read:
```
Large blobs : 1,000 × 60 MiB = 58.59 GiB total
Small blobs : 60,000 × 1024 KiB ≈ 58.59 GiB total
Chunk size : 1 MB max_salt=60
small blobs concurrency=15
large blobs batch write/partitions read concurrency = 60 (all partitions are read and written in parallel)
```
| Metric | Large Write (60MB) | Large Read (60MB) | Small Write (1MB) | Small Read (1MB) |
|---|---|---|---|---|
| Effective Throughput | 682.1 MiB/s | 758.3 MiB/s | 1420.1 MiB/s | 1238.1 MiB/s |
| Execution Duration | 1m 28s | 1m 19s | 42.3 s | 48.5 s |
| Operation Count | 1,000 | 1,000 | 60,000 | 60,000 |
| Latency Metric | Large Write (60MB) | Large Read (60MB) | Small Write (1MB) | Small Read (1MB) |
|---|---|---|---|---|
| Minimum Latency | 85.7 ms | 64.0 ms | 2.5 ms | 1.2 ms |
| Median (p50) | 87.7 ms | 74.9 ms | 7.3 ms | 10.5 ms |
| Tail Latency (p99) | 92.5 ms | 87.1 ms | 38.6 ms | 39.4 ms |
| Maximum Latency | 98.1 ms | 91.2 ms | 59.7 ms | 80.0 ms |
Iteration 2
Total amount of data written/read:
```
Large blobs : 1,000 × 60 MiB = 58.59 GiB total
Small blobs : 960,000 × 64 KiB ≈ 58.59 GiB total
Chunk size : 64 KB max_salt=100
small blobs concurrency=15
large blobs batch write/partitions read concurrency = 100 (all partitions are read and written in parallel)
```
| Metric | Large Write (60MB) | Large Read (60MB) | Small Write (64KB) | Small Read (64KB) |
|---|---|---|---|---|
| Effective Throughput | 998.0 MiB/s | 1022.9 MiB/s | 1124.5 MiB/s | 438.8 MiB/s |
| Execution Duration | 1m 0s | 58.7 s | 53.4 s | 2m 17s |
| Operation Count | 1,000 | 1,000 | 960,000 | 960,000 |
Per-Operation Latency Characteristics
| Latency Metric | Large Write (60MB) | Large Read (60MB) | Small Write (64KB) | Small Read (64KB) |
|---|---|---|---|---|
| Minimum Latency | 58.8 ms | 52.3 ms | 0.6 ms | 0.6 ms |
| Median (p50) | 59.8 ms | 57.8 ms | 0.8 ms | 0.9 ms |
| Tail Latency (p99) | 64.2 ms | 69.6 ms | 1.1 ms | 1.2 ms |
| Maximum Latency | 91.9 ms | 76.0 ms | 2.0 ms | 23.8 ms |
These results validate the efficiency of the salting strategy for massive objects.
While writing at virtually the same throughput as the cassandra-stress run at the beginning of the article, 64KB chunking delivers roughly 10x lower typical write latency for the same 60MB of data (a 59.8ms median versus a 567.6ms mean) and 22x lower P99 write latency (64.2ms versus 1,436.5ms).
1MB chunking results in roughly 45% higher write latency and 25 to 30% higher read latency than 64KB chunking, at both the median and P99. This is not very surprising, because 1MB chunks are fairly large blobs themselves and trigger the same issues as larger blobs.
Overall, these performance metrics are highly favorable compared to the raw 60MB blob write latencies we saw with cassandra-stress in the original test.
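The headline ratios can be recomputed directly from the numbers reported above. This is plain arithmetic on the published results; the dictionary values are copied from the cassandra-stress output and the benchmark tables.

```python
# Latencies in ms, copied from the cassandra-stress output and the tables above
raw_60mb_write = {"mean": 567.6, "p99": 1436.5}
chunked_64kb = {"write_p50": 59.8, "write_p99": 64.2, "read_p50": 57.8, "read_p99": 69.6}
chunked_1mb = {"write_p50": 87.7, "write_p99": 92.5, "read_p50": 74.9, "read_p99": 87.1}

p99_speedup = raw_60mb_write["p99"] / chunked_64kb["write_p99"]
typical_speedup = raw_60mb_write["mean"] / chunked_64kb["write_p50"]
print(f"P99 write: {p99_speedup:.1f}x lower")          # 22.4x
print(f"typical write: {typical_speedup:.1f}x lower")  # 9.5x (mean vs. median)

# How much slower 1MB chunking is than 64KB chunking, per metric
for metric in chunked_64kb:
    penalty = chunked_1mb[metric] / chunked_64kb[metric] - 1
    print(f"{metric}: +{penalty:.0%}")
# write_p50: +47%, write_p99: +44%, read_p50: +30%, read_p99: +25%
```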
Conclusion: High Performance, Controlled Risk
The challenge of storing large blobs in ScyllaDB is fundamentally about managing memory pressure and latency. Our experiments confirmed that a 60MB blob written as a single key-value row results in average and P99 write latencies of about 568ms and 1,436ms, respectively.
The Modified Salting Technique solves this bottleneck by transparently fragmenting the large blob and allowing its parts to be processed in parallel across multiple shards. This approach reduces the write latency of 60MB blobs to about 60ms at the median and 64ms at P99, with a very tight tail, much closer to small key-value operations. Plus, there is good potential to improve this even further by increasing the write/read concurrency.
This technique offers flexibility not found in classic salting: most notably, the ability to configure the salting cardinality (MAX_SALT) on a per-key basis. This flexibility is the key to managing a delicate trade-off:
- For optimal performance and shard distribution, a large MAX_SALT is preferred.
- For critical data where minimizing the data loss blast radius during a quorum failure is paramount, a low MAX_SALT (e.g., MAX_SALT=1) can be used to isolate the data to fewer nodes.
By implementing a hybrid approach — using low salting for small to medium blobs, and high salting for the largest ones — administrators can achieve high throughput and low latency for their entire data set while retaining control over data loss risk. This modified salting technique can help users squeeze better performance from ScyllaDB when dealing with mixed-size datasets and large object storage.
If you’re interested and want to give this chunked blob technique a try, you can find working code samples and the benchmark used above at https://github.com/scylladb/scylla-code-samples/tree/master/chunking-large-cells/.

