Overcoming the Performance Cost of Streaming Transactions

19 minutes

In This NoSQL Presentation

For a long time distributed transactions have been known to make systems slow and unavailable. However, recent academic and industry advancements such as RAMP, Occult, Calvin and TAPIR have changed the landscape and made transactions remarkably fast. In this talk I'll explain how the Redpanda streaming data platform utilizes modern transactional approaches and pushes the envelope further by adjusting these concepts to the streaming workload. I'll share benchmarks and explain what makes Redpanda transactions so fast.

Denis Rystsov

Denis Rystsov, Staff Engineer, Redpanda

Denis Rystsov is a staff engineer working on Redpanda, a Kafka-compatible distributed event streaming platform. His area of expertise lies in distributed transactions, consensus algorithms, and testing consistency models. He likes to keep an eye on academic computer science research and use it as leverage in the industry. Denis volunteers as a member of the Hydra conference program committee and as a member of the editorial board of the Journal of Systems Research (JSys). He received his MSc in Applied Mathematics and Computer Science from SPbSU. Before joining Vectorized he worked at Yandex, Amazon, and Microsoft. Apart from work he likes surfing and exploring the waters of the PNW.

Video Transcript

Hello, ScyllaDB Summit. Today, we’re going to talk about transactions, but first, a couple of words about myself. My name is Denis Rystsov. At Redpanda, I designed Redpanda transactions, and before I joined the company, I was at Microsoft testing the consistency of CosmosDB. As kind of a side thing, I invented CASPaxos, and when I don’t stare at screens, I love surfing in the cold waters of the Pacific Northwest. And today, we’re going to talk about transactions in Redpanda.

But first of all, what is Redpanda? As you already understand, Redpanda is the name of the company, but it’s also the name of the product. What is this product about? Redpanda is a modern streaming platform supporting the Kafka protocol, but unlike Kafka, it isn’t JVM-based. It’s written in C++, and it’s native. It uses a thread-per-core model, and just like ScyllaDB, it’s based on the Seastar framework, which, by the way, I really like: it makes it so easy to write asynchronous code. Thank you, folks, for creating it.

Okay, that’s Redpanda, but it only gives information if you’re already familiar with Kafka, so let’s look at Redpanda from the storage perspective. Redpanda is a specialized distributed, replicated, transactional storage which is optimized for append-only writes, sequential reads, persistent cursors and primitives for consumer cooperation. Well, that describes what it is, but how do people use it? There are multiple use cases for Redpanda. A major one is to use Redpanda for data transfer. For example, customers may use ScyllaDB, connect it with tools like Debezium to Redpanda, and then use Redpanda to shift large amounts of data between an OLTP database and an OLAP database such as ClickHouse to do the analytics. Another use case is to have Redpanda as a backbone for an event-driven architecture, where we have a lot of microservices connected with Redpanda. So that’s it.

And now the interesting part: transactions. We probably all know what transactions are. It’s basically a mechanism to update multiple objects at once with all-or-nothing semantics. For databases, the objects are usually rows, and for Redpanda, when we talk about updating multiple objects, it’s topics, so with transactions, customers may write to multiple topics at once. Another use for transactions in Redpanda is the consume-transform-produce loop. Remember, I mentioned the persistent cursors.
They’re called consumer groups, and what the customer wants is to read data from a cursor, do some transformation and then write this data to the output topic, and they want to do this operation atomically, so we don’t process the same piece of information twice, and we don’t see duplicates in the output topic.

Usually when people talk about transactions, they talk about the banking example: moving money between different accounts. It’s easy to see the importance of this problem: we don’t want to lose money, so we want all-or-nothing semantics. Let’s see how to map the banking example to Redpanda. We may have this architecture for our banking service: a list of accounts, accounts grouped together and mapped to microservices, and each microservice is backed by ScyllaDB and Redpanda. ScyllaDB holds the current balance of the account, and Redpanda acts as a ledger. Just like a blockchain, just like Bitcoin, each instance of Redpanda contains a list of all transactions touching this particular account. You might think that this contradicts the microservice architecture, because we have different microservices backed by the same storage, ScyllaDB and Redpanda, but actually it’s fine here, because we may map each microservice to its own table and topic, so nothing is violated in terms of microservice architecture. Now let’s look at the operations which are possible in this banking example. When a user wants to check their balance or transfer money between their own accounts, we don’t need to touch Redpanda. All of these operations happen between the REST interface, the microservice and ScyllaDB. But when a user wants to perform a cross-account operation, we need to involve Redpanda. How does it happen? Each microservice has its own ledger, and it’s constantly pulling events from this ledger, updating the balance and saving a snapshot to ScyllaDB.
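The consume-transform-produce loop described above can be sketched in miniature. This is not the Kafka client API; the `Log` and `Transaction` classes below are hypothetical in-memory stand-ins that only illustrate the all-or-nothing property: the output records and the cursor advance commit together, so re-running the loop produces no duplicates.

```python
class Log:
    """Stand-in for a topic: an append-only list of records."""
    def __init__(self):
        self.records = []

class Transaction:
    """Stages output writes and one cursor update; applies all of them or none."""
    def __init__(self):
        self.staged = []           # (log, record) pairs to append on commit
        self.cursor_update = None  # (cursors, group, new_offset)

    def send(self, log, record):
        self.staged.append((log, record))

    def send_offset(self, cursors, group, offset):
        self.cursor_update = (cursors, group, offset)

    def commit(self):
        # All-or-nothing: the output records and the cursor move together.
        for log, record in self.staged:
            log.records.append(record)
        cursors, group, offset = self.cursor_update
        cursors[group] = offset

def consume_transform_produce(inp, out, cursors, group):
    start = cursors.get(group, 0)
    tx = Transaction()
    for i in range(start, len(inp.records)):
        tx.send(out, inp.records[i] * 2)   # the "transform" step
    tx.send_offset(cursors, group, len(inp.records))
    tx.commit()

inp, out, cursors = Log(), Log(), {}
inp.records = [1, 2, 3]
consume_transform_produce(inp, out, cursors, "g1")
consume_transform_produce(inp, out, cursors, "g1")  # re-run: cursor already advanced
print(out.records)  # → [2, 4, 6]
```

Because the cursor advances in the same commit as the output writes, the second run finds nothing new to process: no record is handled twice and no duplicate reaches the output.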
When a user wants to transfer money between accounts, they contact their own microservice. This microservice checks that the user has enough money in the account, and then it executes a Redpanda transaction to put this logical transaction into multiple ledgers. Remember, each microservice has its own ledger, so if we transfer money between two accounts, we need to insert into two ledgers, and we want this operation to be atomic. So this explains how to build transactions on top of Redpanda and how to use it in the banking example, but just because we may build a banking example on top of Redpanda, it doesn’t mean that we should. So the next question is: why use Redpanda at all for transactional operations? And the answer is latency. Imagine two data centers, one in Seattle and another in New York. The Seattle data center is responsible for operations of the users on the West Coast, and the New York data center is responsible for operations of the people on the East Coast. The problem with databases is that for each operation, they require coordination between both data centers. It happens because of the isolation levels. Usually when people use a database, they pick one of two isolation levels. One is serializability, and serializability means that all of the transactions are ordered. It means that we can’t get away from coordination. Another option that people may choose is snapshot isolation. With snapshot isolation, it’s possible to update multiple records concurrently at the same time, but it requires that all of the reads observe a monotonically increasing view of the database, so again it’s an order, and it will require coordination. With streaming, it’s not true. With streaming, for example, Kafka and Redpanda implement non-monotonic snapshot isolation, and this isolation level is like a person who is tired of being in control. Do you want out-of-order updates? Okay, go for it.
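The cross-account transfer just described can be sketched the same way. The `transfer` and `balance_of` helpers are illustrative names, not Redpanda APIs; in the real system the two appends would be one transactional produce to two topics, and each microservice would fold its own ledger into the balance snapshot it stores in ScyllaDB.

```python
def transfer(ledgers, src, dst, amount):
    # In Redpanda this would be a single transaction writing to two topics;
    # here we just append the debit and credit events together.
    events = [(src, -amount), (dst, +amount)]
    for account, delta in events:
        ledgers[account].append(delta)

def balance_of(ledger):
    # Each microservice replays its ledger to compute the current balance,
    # then saves that snapshot to ScyllaDB.
    return sum(ledger)

ledgers = {"alice": [100], "bob": [50]}
transfer(ledgers, "alice", "bob", 30)
print(balance_of(ledgers["alice"]), balance_of(ledgers["bob"]))  # → 70 80
```

The all-or-nothing requirement is exactly that both appends land or neither does; if only the debit landed, money would vanish from the system.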
You want non-monotonic reads? Yeah, you’re allowed to do this. The only restriction is to avoid conflicting records. So if people want to execute transactions in the Seattle data center, as long as the operations don’t conflict with the updates in the New York data center, it all may happen concurrently, so there’s no need for coordination when there are no cross-data-center transactions, and because of this, streaming may be way faster than databases. But just because we use technology that allows us to be fast, it doesn’t mean that we actually achieve this performance. So I asked on Twitter, just to see people’s opinion: what’s the expected performance of a replicated distributed database when we’re talking about the banking example of transferring money between two accounts? And we see that the majority of answers is 6,000 transactions per second. And you know what? This is the data for Redpanda. At somewhere around 60 concurrent clients, we achieve this volume. So when we talk about performance, we can compare it with expectations, which we did. But another thing that we may do is compare it with an existing system, so let’s look at Kafka. This is the data for Kafka. We can see that its throughput is six times lower than Redpanda’s. Of course, if I were you, I wouldn’t trust this data, because I’m biased, and I may have just messed up the configuration. My goal is to convince you, and convince myself, that this data is actually valid. So what I did: I compared the baseline between Kafka and Redpanda, and then I measured the ratio between the baseline and the transactions. If this ratio is the same for Redpanda and Kafka, okay, the difference is possible to explain by a choice of wrong parameters. But if the ratio is different, it will tell us about the differences in the replication protocol. So this is the baseline. We can see that Redpanda
has roughly three times the throughput of Kafka, but for transactions, it was six times, so it hints that Redpanda does something different in the transaction department. Let’s find out.

This is an example of banking transactions in Kafka and in Redpanda. Let’s see what is happening when we try to execute this transaction. beginTransaction is a client-only method call, but send already sends an RPC to the server, and send consists of two parts. First is adding a topic to the transaction, and second is sending data to the topic. This is how we add a topic to a transaction: the client makes a register call. This call contains data about all of the partitions participating in the transaction. The coordinator saves this data to disk, replicates it, and then answers the client. Then the client starts to execute send requests, sending data to a partition. If the client doesn’t wait for the result of the send operation, it may send data to a different partition too. The partition writes the data to disk and replicates it, and then it acknowledges the request. The final part is the commit. The client sends a commit transaction request to the transaction coordinator. The coordinator writes it to disk, replicates it and immediately acknowledges the request. Then, in the background, it contacts all of the partitions and writes the transactional markers. Each marker is written to disk and replicated, and then it’s acknowledged, and then the current transaction is marked as finalized. So that’s the Kafka protocol. Now let’s look at the Redpanda optimizations. First of all, we allow send to be eventually consistent. It may lead to inconsistencies, but we have it covered: at the commit phase, we check that all previously sent data is fully replicated, so we eliminate this possible inconsistent state. Also, we use the parallel commits optimization, and we use RAM over disk in the cases when it’s possible.
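To make the ordering in the Kafka-style commit explicit, here is a toy model of the message sequence, assuming the steps described above. All names are illustrative; the point is only that the coordinator persists the commit decision, acknowledges the client, and writes the partition markers afterward in the background.

```python
def kafka_style_commit(partitions, log):
    """Record the Kafka-style commit sequence into `log` (a list of steps)."""
    # 1. The coordinator persists and replicates the commit decision.
    log.append("coordinator: write+replicate commit record")
    # 2. The client is unblocked here, before any markers are written.
    log.append("coordinator: ack commit to client")
    # 3. In the background, each partition gets a transactional marker.
    for p in partitions:
        log.append(f"{p}: write+replicate txn marker")
    # 4. Finally, the transaction is marked finalized on the coordinator.
    log.append("coordinator: mark transaction finalized")
    return log

steps = kafka_style_commit(["topicA/0", "topicB/0"], [])
print(steps[1])  # → coordinator: ack commit to client
```

Every numbered step above is a disk write plus a replication round, which is why the optimizations that follow focus on taking work off this path.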
Let’s see the flow. This is adding a topic to a transaction. The client sends the RPC to the coordinator. Then the coordinator contacts all of the partitions in the transaction and asks for their terms. A term is something like a version of the current ledger, and later we use this information to validate that all of the events replicated since then are actually fully replicated. We keep this information in RAM instead of on disk, and we immediately acknowledge to the client. During the send operation, we don’t replicate the data. We just write it to disk and immediately answer the client, and then we replicate this data in the background, so not on the hot path. And the last part, the commit operation: the client calls the commit RPC. Then the coordinator immediately starts writing and replicating its preparing status, and it also contacts all the partitions participating in the transaction to set a barrier. Basically, it asks: if you’re still the same leader I talked to before, please validate that all of the ongoing send operations are fully replicated. Then, once it gets confirmation from the partitions and from the disk writes, we acknowledge the commit to the client. This part is called the parallel commits optimization, first introduced by [Indistinct]. After we acknowledge to the client, we write the prepared state and replicate it, and then we contact the partitions again and ask them to write the transactional markers. Then we mark this transaction as committed and keep this information in memory. If it is wiped out by a restart, we just retry the commit RPC, and it’s idempotent, so it’s no big deal, and nothing can go wrong in this case. So these are the Redpanda optimizations which lead us to this throughput. That’s it. Let’s stay in touch.
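As a recap of the commit path just described, the parallel-commits idea can be sketched with asyncio standing in for the real RPCs: replicating the coordinator's preparing record and setting the barrier on each partition run concurrently rather than one after the other, and the client is acknowledged as soon as both legs finish, before the markers are written. This is a sketch of the concept, not Redpanda's implementation; all names are illustrative.

```python
import asyncio

async def replicate_preparing(log):
    await asyncio.sleep(0)  # stands in for a disk write + replication round
    log.append("preparing replicated")

async def set_barrier(partition, log):
    # Stands in for: "if you're still the leader I talked to, confirm that
    # all ongoing send operations are fully replicated."
    await asyncio.sleep(0)
    log.append(f"barrier set on {partition}")

async def parallel_commit(partitions, log):
    # Both legs run concurrently instead of sequentially; that overlap is
    # the parallel-commits optimization.
    await asyncio.gather(
        replicate_preparing(log),
        *(set_barrier(p, log) for p in partitions),
    )
    # The client unblocks here; markers are written afterward in the background.
    log.append("ack commit to client")

log = []
asyncio.run(parallel_commit(["topicA/0", "topicB/0"], log))
print(log[-1])  # → ack commit to client
```

The acknowledgement waits only for the slower of the two concurrent legs, so the client-visible commit latency is one round instead of two.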
I’ve posted my contacts, and we also have a community Slack at Redpanda. We have open positions too, so if you’d like to work on complicated distributed stuff, please apply. Thank you.
