
Implementing Message Queues in Relational Databases

At the last SQL Bits X, I held the FusionIO fireside chat during the launch party. During this presentation, I demonstrated how it is possible to build a table structure inside a relational engine that acts as a message queue and delivers nearly 100K messages/second.

My design was inspired by the LMAX Disruptor Pattern and used sequencer objects to greatly boost throughput of the queue structure.

In this blog entry, I will show you how I built this queue structure and give you enough detail to do so yourself.

But first, let me walk you through the problem.

Note: In the following, I will use the terms push and pop for adding and deleting messages in a queue, respectively. You may have learned the terminology enqueue and dequeue instead; it is the same thing.

The Problem Statement

Every now and again, relational database designers find themselves creating a table that ends up acting as a message queue. Such tables tend to fluctuate a lot in size, from zero rows to several millions. Furthermore, the tables typically need to be ordered to ensure fairness of message pushing and popping. The data flow looks something like this:

[Diagram: message queue data flow]

In the following examples, I will assume a message size of 300B, but the argument holds for larger and smaller message sizes too.

Relational Message Queues – Naïve

When faced with a table that needs to be ordered, and where the same rows are frequently added and shortly afterwards deleted, something like this is the typical design:

CREATE TABLE dbo.MyQ
(
    [message_id] INT NOT NULL
    ,[time] [datetime] NULL
    ,[message] [char](300) NULL
)

CREATE UNIQUE CLUSTERED INDEX CIX
ON dbo.MyQ (message_id)

CREATE SEQUENCE dbo.MessageSequence AS INT
START WITH 1 INCREMENT BY 1

The rows in the table are implicitly ordered by the index. Push and pop can now be implemented like this:

/* Push Naïve */

INSERT MyQ (message_id, time, message)
SELECT NEXT VALUE FOR dbo.MessageSequence, GETDATE(), 'Hello World'

/* Pop Naïve */

DELETE Q
FROM (SELECT TOP 1 * FROM MyQ ORDER BY message_id) AS Q
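As written, the pop deletes the oldest row without returning it to the caller. In SQL Server, the OUTPUT clause of DELETE can return the deleted message atomically in the same statement; a minimal sketch:

```sql
/* Pop that also returns the message, using SQL Server's OUTPUT clause */
DECLARE @Popped TABLE (message_id INT, [message] CHAR(300));

DELETE Q
OUTPUT deleted.message_id, deleted.[message] INTO @Popped
FROM (SELECT TOP 1 * FROM dbo.MyQ ORDER BY message_id) AS Q;

/* The popped message (if any) is now in @Popped */
SELECT message_id, [message] FROM @Popped;
```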

This approach has a lot of problems that will be very familiar to anyone who has been the DBA for a table like this. Among these problems are:

  • The statistics are NEVER up to date on such a table. This typically requires all plans to be forced/hinted, or statistics to be custom hacked.
  • In databases that auto-update statistics, the auto-updates constantly kick in, creating jagged CPU patterns and throughput.
  • The B-tree structure implementing the ordering of the rows is constantly being split at the root as it grows and shrinks. This causes mutex convoys (in SQL Server, these convoys are reported as latches on ACCESS_METHODS_HOBT_VIRTUAL_ROOT).
  • New pages are constantly allocated and deallocated in the buffer pool, which stresses internal allocation structures in the database.
  • There is contention on the memory structures that hold the pages at the start and end of the B-tree (reported as PAGELATCH_EX in SQL Server).
  • The writes are extremely sensitive to the latency of the transaction log drive, and this is exacerbated by the convoy effects of the higher levels of the B-tree constantly being split.
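The latch contention listed above can be observed directly while the queue is under load. A diagnostic sketch against SQL Server's wait and latch statistics DMVs:

```sql
/* Root-split convoys show up as this latch class */
SELECT latch_class, waiting_requests_count, wait_time_ms
FROM sys.dm_os_latch_stats
WHERE latch_class = 'ACCESS_METHODS_HOBT_VIRTUAL_ROOT';

/* Hot-page contention at the ends of the B-tree shows up as page latch waits */
SELECT wait_type, waiting_tasks_count, wait_time_ms
FROM sys.dm_os_wait_stats
WHERE wait_type LIKE 'PAGELATCH%'
ORDER BY wait_time_ms DESC;
```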

The contention is perhaps best illustrated with this diagram of the hot pages in the B-tree:

[Diagram: hot pages at the start and end of the B-tree]

I built this implementation on an HP DL380 G7 server with a FusionIO ioDrive Duo (transaction log latency under 50 microseconds) on a SQL Server 2012 RTM installation. I experimented with different thread counts for both push and pop to find the sweet spot. The highest throughput I could push (pun intended) through the message queue with the above implementation was around 6000 messages/sec. Very, very far from impressive, and fuel for the NoSQL fires.

    The obvious (but wrong) conclusion is that relational databases are not fit for purpose when it comes to building message queues. Of course, there IS a grain of truth to this, but can we do better than a meager 6000 messages/sec?

Relational Message Queues – LMAX’ed

It is in fact possible to do a lot better than the naïve message queue implementation. Inspired by the bright people over at LMAX and their Disruptor pattern, I decided to build a relational equivalent.

The first realisation is that INSERT/DELETE is just not going to work. People who code high-scale systems might intuit this: constant memory allocation and deallocation is expensive, and INSERT/DELETE is a form of memory alloc/dealloc. The solution is to preallocate the queue at a certain size and UPDATE a reference count on each message slot, instead of adding and removing an entire row for every message sent through the queue.

The queue table now looks like this:

CREATE TABLE dbo.MyQ
(
    [Slot] BIGINT NOT NULL
    , message_id BIGINT NULL
    ,[time] [datetime] NOT NULL
    ,[message] [char](300) NOT NULL
    ,reference_count TINYINT NOT NULL
)

/* Prefill messages */
DECLARE @QueueSize BIGINT = 1000000 /* pick a size to match your workload */
DECLARE @i BIGINT = 0
WHILE @i < @QueueSize BEGIN
  INSERT MyQ (Slot, time, message, reference_count)
  VALUES (@i, '2050-01-01', 'dummy', 0)
  SET @i = @i + 1
END

/* Create index and fill it up */
CREATE UNIQUE CLUSTERED INDEX CIX ON dbo.MyQ (Slot)
WITH (FILLFACTOR = 100)
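For large values of @QueueSize, the row-by-row loop above is slow. A set-based prefill is much faster; a sketch, assuming sys.all_objects crossed with itself yields at least @QueueSize rows:

```sql
/* Set-based prefill: one INSERT instead of @QueueSize round trips */
DECLARE @QueueSize BIGINT = 1000000;

INSERT dbo.MyQ (Slot, [time], [message], reference_count)
SELECT TOP (@QueueSize)
       ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) - 1  /* slots 0 .. @QueueSize-1 */
     , '2050-01-01'
     , 'dummy'
     , 0
FROM sys.all_objects a CROSS JOIN sys.all_objects b;
```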

Side note: if you are still wondering why UPDATE is faster than INSERT in this case, see my previous blog entry.

The above preallocates @QueueSize message slots. Pushing a message now consists of these operations:

  • Generate a new message_id
  • Find the next available slot (with reference_count = 0)
  • Update the message column
  • Add one to reference_count

And pop is:

  • Find the smallest message_id that has been "inserted"
  • Read and return the message column
  • Decrement reference_count, marking the slot as available again (variants can be used if you have multiple subscribers)

That, at least, is the pseudocode. However, for this to work, we have to find a way to quickly locate the next available slot for push, and to find the smallest message_id that has not been popped yet.

First, how do we find the next available slot for push? This turns out to be surprisingly easy: we use a sequencer object. The sequencer, for the non-database people out there, is a very high-scale data structure for generating "the next number". Think of it as a singleton object. The sequencer can be used BOTH for generating the message_id AND for finding the next available slot. Here is how:

/* Sequence to keep track of next message number and slot */

CREATE SEQUENCE dbo.PushSequence AS BIGINT
START WITH 1 INCREMENT BY 1
CACHE 100000;

/* Push LMAX'ed begin */

SET @PushSeq = NEXT VALUE FOR dbo.PushSequence

/* Find slot */
SET @Slot = @PushSeq % @QueueSize

WHILE 1 = 1 BEGIN

  UPDATE dbo.MyQ /* SQL Server users: hint WITH (ROWLOCK) here */
  SET [time] = GETDATE()
      , [message] = 'Hello World'
      , [message_id] = @PushSeq
      , reference_count = reference_count + 1
  WHERE Slot = @Slot
    AND reference_count = 0 /* Don't overwrite! */

  IF @@ROWCOUNT > 0 BREAK

  /* The slot was not available -> queue is full. Wait and retry */
  WAITFOR DELAY '00:00:00.100'

END

/* Push LMAX'ed end */

What we have now achieved is essentially a ring buffer for insert operations. To illustrate, here is an example queue table with @QueueSize = 100.

[Diagram: example queue table used as a ring buffer, @QueueSize = 100]

We can now quickly locate new slots for push operations. All that remains is to implement pop. We could choose to keep track of the smallest message_id in a variable that each pop updates. However, it is always better to remove coordination when possible. Instead of letting pop keep track of the messages, we can create ANOTHER sequencer object that trails behind PushSequence. In fact, we don't even need to know the value of PushSequence to pop correctly. Here is how to implement pop:

CREATE SEQUENCE dbo.PopSequence AS BIGINT
START WITH 1 INCREMENT BY 1
CACHE 100000;

/* Pop LMAX'ed begin */

SET @PopSeq = NEXT VALUE FOR dbo.PopSequence

/* Find slot */
SET @Slot = @PopSeq % @QueueSize

WHILE 1 = 1 BEGIN

  UPDATE dbo.MyQ /* SQL Server users: hint WITH (ROWLOCK) here */
  SET [time] = GETDATE()
      , @OutPutMessage = [message]
      , [message_id] = NULL
      , reference_count = reference_count - 1
  WHERE Slot = @Slot
    AND message_id = @PopSeq /* Make sure we didn't try to pop an empty slot */

  IF @@ROWCOUNT > 0 BREAK

  /* No message found to pop, or we are ahead of push. Wait and retry */
  WAITFOR DELAY '00:00:01'

END

/* Pop LMAX'ed end */

Very similar to push, with a few tricks added. You have to be a bit careful with the boundary conditions. First of all, because there is no coordination between PopSequence and PushSequence, it CAN happen (for example, due to thread scheduling) that the current pop message_id gets ahead of the push message_id. When that happens, we wait for the pusher to create more rows. We can now complete the above illustration:

[Diagram: ring buffer with PushSequence and PopSequence chasing each other]

To avoid pop constantly blocking on an empty queue (if messages pop faster than they are pushed), it is generally a good idea to let push get a "head start" so there is something in the queue to pop. In my example, I have chosen to let pop wait for one second before retrying after an attempt to pop from an empty queue.

I took the above implementation and ran it on the same hardware as the naïve approach. The results were VERY different: I can now drive between 90K and 100K messages/sec through the queue. A nice little 15x improvement.

Summary

It can be argued that old-school relational databases and tables are not the best structures for implementing durable storage for message queues. The many code paths needed in relational algebra to implement ACID properties, generic concurrency, serialization and block I/O can get in the way of a fast queue implementation, especially in the naïve implementation of the relational purists.

However, if we combine our knowledge of programming with database design skills, high throughput can be achieved even in the relational model and there are large, often overlooked improvements to be found.

Comments
  1. Marco Russo
    May 25, 2012 at 22:01

    When you pop in the fast code, why do you SET the message_id with @PushSeq? Shouldn't you read data instead of writing (except for decreasing reference_count)?

  2. Thomas Kejser
    May 26, 2012 at 09:34

    Well spotted, Marco. I have corrected it. Thanks!

  3. June 11, 2012 at 10:36

    But why not Service Broker queues? Is there a particular reason you implemented the queues yourself?

    • Thomas Kejser
      June 11, 2012 at 10:46

      Try comparing my benchmark with a Service Broker queue, Janis 🙂

      • June 11, 2012 at 13:20

        You know it's quite complicated to do that 🙂

        In the example, messages are up to 300 chars. My typical message is variable-length XML, so the data goes off-row; what would performance be in that case? And what if the worker process breaks (I usually have web service calls there) and the queue gets much bigger than I expected? And what about grouped messages, when I want to process 1000 messages in one transaction?

        The idea, though, is very, very interesting, and I bet there are special cases where it works much faster.

      • Thomas Kejser
        June 11, 2012 at 13:45

        Interesting questions, Janis.

        First of all, you should NOT wait for a web service roundtrip before taking the message out of the queue. This anti-pattern:

        1) Begin tran
        2) Do some work that must be scalable and lock-synchronized with other threads (i.e. pop the queue)
        3) Do some work that requires network traffic or expensive memory access
        4) Commit

        …is simply fundamentally unscalable due to the physics of roundtrip speeds. If you are doing that, you are doing it wrong. This is the right approach:

        1) Begin tran
        2) Do some work that must be scalable and lock-synchronized with other threads (i.e. pop the queue)
        3) Commit
        4) Do some work that requires network traffic or expensive memory access, in a way that is safe even when exceptions are thrown

        To translate this to the message queues: remove the message from the queue immediately and put it somewhere the web service can pick it up (step 2 above). The target should be a scalable structure, for example an auxiliary table for each web service worker thread. This also decouples the pop from any error handling in the web role. You don't want non-native .NET error handling code to stall the queue.

        With regards to message size: the pattern should scale with messages up to a full page. The preallocated queue structure will of course be larger and take more memory. Once you go above the page size, you will want to decouple the "which message is mine?" part of the pop code from the actual fetch of the message content. One way to do that would be to store only a message pointer in the queue. The message content itself can be stored in multiple tables, partitioned by some hash value you generate from the message_id (for example, the modulus of the message_id). Because the fetch of the message content is not on the critical, synchronized path, such a set of tables could even be heaps with simple INSERT/DELETE semantics, as per the naïve approach.
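        The pointer-plus-partitioned-content design described above could be sketched like this (the table names and the partition count of 4 are illustrative, not part of the original benchmark):

```sql
/* Content tables, partitioned by a hash of the message_id.
   Simple heaps with INSERT/DELETE semantics: they are off the
   synchronized critical path, so the naive approach is fine here. */
CREATE TABLE dbo.MyQContent_0 (message_id BIGINT NOT NULL, body VARBINARY(MAX) NOT NULL);
CREATE TABLE dbo.MyQContent_1 (message_id BIGINT NOT NULL, body VARBINARY(MAX) NOT NULL);
CREATE TABLE dbo.MyQContent_2 (message_id BIGINT NOT NULL, body VARBINARY(MAX) NOT NULL);
CREATE TABLE dbo.MyQContent_3 (message_id BIGINT NOT NULL, body VARBINARY(MAX) NOT NULL);

/* The queue row now carries only the message_id; the popper
   computes the hash to find the right content table. */
DECLARE @MessageId BIGINT = 1042;
DECLARE @Partition INT = @MessageId % 4;  /* fetch from dbo.MyQContent_2 */
```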

        Grouped messages are an interesting example too. If you want to do this as an atomic pop of 1000 messages, the question becomes: why did you not put the full group into a single message in the first place? If you group 1000 messages into one pop, you can of course drive MUCH more throughput, though I would consider that cheating 🙂 On the other hand, it actually becomes HARDER to scale if you want to reserve ranges of messages that have not been grouped into a single pop. To do that, you would need the sequence object to guarantee atomic range reservations. I don't think it allows this in the current version of SQL Server. Such an implementation would have to be done using the clean LMAX pattern, outside the database.

  4. Marian
    June 11, 2012 at 13:49

    Hi Thomas. What solution would you see for older SQL Server versions? I'm curious how we would implement your solution without having to work with sequences. Can we replace the sequences with some custom solution (maybe a big tally table?), or are we tied to Service Broker in this case?

    Thanks, very interesting article!

    • Thomas Kejser
      June 11, 2012 at 14:26

      Hi Marian

      Good question too. I can't, off the top of my head, think of a data structure that would support scalable "next number" semantics. IDENTITY columns just don't work, because of the hot page problem described in my earlier blogs.

      The best solution I can offer is to use an extended stored procedure or a CLR procedure to get a scalable number, similar to the sequencer. However, to do that, you need to own a memory region that persists throughout calls to the procedure. You might be able to hack that with extended procedures.

      • robvolk1
        July 2, 2012 at 20:14

        You could try using the sp_user_counter procedures (1-10) to increment push and pop sequences. You’d have to query sys.dm_os_performance_counters to get the current values. I wouldn’t say it’s scalable but it doesn’t require any CLR or other external components.

        Another option that’s also not a great idea is to add a rowversion/timestamp column to your queue table and base your sequence off @@DBTS. The downsides are that it can only track one value (which you can’t control) and it would increment on both pop and push.

      • Thomas Kejser
        July 2, 2012 at 21:20

        The user counters are a very cool idea. I have never benchmarked them, but I suspect they would be a lot faster than using a table to hold the data.

        Thanks for adding this idea to the design. It should mean the pattern can be implemented on 2005 and 2008 too.

  5. June 11, 2012 at 14:17

    Well..

    Service Broker (SB) allows grouping messages. For example, I have a table with lots of modifications from different sources that I want to audit. As I need to audit the modified data, not only the fact of modification, I can't use the "Audit" feature of SQL Server. On the other hand, data modifications happen in many small transactions. Writing it all to some log table can happen in batches, so I group many messages into one. Processing happens some time later, using set-based operations.

    About the beginning: what would be the way to do "4) Do some work that requires network traffic or expensive memory access in a way that is safe even when exceptions are thrown"?
    Well, I am using the first option, the one you called wrong (in my case I have thought a lot about this, so I would never agree it is wrong, although maybe not the best either).
    I have approximately 10 concurrent external processes that work on the SB queue, and I have message groups (one group contains many message types; typically each message type requires a web service call plus a DB update). The SB queue guarantees that I process messages one by one (for example, if I process some invoice, each message updates the same invoice, but as it happens message-by-message there is no concurrency on the same invoice) and that at no point in time do 2 workers work on 1 message group. If I had to implement some other way to make sure each call succeeds at some point in time, I guess I would just have to start living at the office to get my job done.

    But anyway, this is already off-topic. What I was thinking: SB offers a lot of infrastructure and out-of-the-box options (for free, since SQL Server 2005, in all editions; DB Mail is a great example of SB usage); you just need to learn how to use them properly.
    In some specific cases there may be better ways to get performance, and this article is a great example of what to try when it comes to that.

    • Thomas Kejser
      June 11, 2012 at 14:40

      If your solution works for your throughput requirement, Janis, great.

      What I am trying to point out here is that if you are after high scalability and throughput, the flexibility of standard solutions comes at a price that you might not be able to pay. At high speed, the only way forward is to write code that makes use of what a modern machine ACTUALLY looks like, not what the API gives you. Service Broker is a great all-round solution, but it is not a race car. All that being said, SB is pretty fast, and I have seen a highly tuned single queue deliver 10K msg/sec, which may just be enough.

      Flexibility (like SB) is expensive. It typically takes more, and harder to understand, code to get good scale. To scale, you also have to challenge standard implementations. That is why we have things like NoSQL, which compromises on some generic functionality (ACID, for example) to achieve better scale in specific use cases. This is a good thing; it keeps programmers working and makes for a more exciting job. Understanding the tradeoffs required also allows people who "get" the reality of the underlying machine architecture to charge good money for their services 🙂

    • Thomas Kejser
      June 11, 2012 at 15:09

      With regard to how to do 4) in the right case: once you know you "have" the message (i.e. you have a pointer to the data), you no longer have to coordinate with the queue to work on it. In other words, you create two smaller transactions instead of one big one: one to get the pointer, one to work on the data. Of course, you will have to "glue" them together so they appear as one. This requires you to be a bit careful about the boundary conditions.

      Your code will have to check for the condition where it crashed AFTER it got the pointer but BEFORE it finished doing work on the message. One way to do this is to have a unique ID that the process generates when it starts up; it writes this ID to disk on the client (to recover in case of a crash) along with the last message_id it successfully processed. When you pop the message, you can then stamp your unique ID into the queue to mark that you "have it". If the process crashes (or handles an exception) before it does what it needs to do with the message, it can resume from the last "checkpoint" by looking for messages that have been put aside (for example, in the auxiliary table used for large messages). All of this processing work can be done without coordinating with the queue mechanism, and hence it will be scalable (which holding the lock in step 3 of the wrong approach will NOT be). Now granted, some messages can get delayed, especially if failure is the common case; such is the nature of a highly scalable queue.
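      The checkpoint idea described above could be sketched like this (the worker id, the dbo.MyQAux table, and the @Slot/@PopSeq variables from the pop code are illustrative, not part of the benchmark schema):

```sql
/* Each worker owns a unique id, generated at startup and also persisted
   client-side so it survives a crash. */
DECLARE @WorkerId UNIQUEIDENTIFIER = NEWID();

/* Pop: move the message aside, stamped with the worker id, in one short
   transaction that touches the queue. */
BEGIN TRAN;
  INSERT dbo.MyQAux (message_id, worker_id, [message])
  SELECT message_id, @WorkerId, [message]
  FROM dbo.MyQ
  WHERE Slot = @Slot AND message_id = @PopSeq;
  /* ...the normal pop UPDATE from the article goes here... */
COMMIT;

/* Recovery after a crash: look for work that was set aside but never
   finished, without touching the queue's critical path. */
SELECT message_id, [message]
FROM dbo.MyQAux
WHERE worker_id = @WorkerId;
```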

      General coding rule for scale:

      1) Hold locks for as short an interval as possible
      2) Use the best lock coordination mechanism for the expected lock hold duration (see my blog on waits, latches, spins and interlocks here: http://blog.kejser.org/2011/11/09/thread-synchronization-in-sql-server/ )

  6. August 19, 2012 at 07:48

    Hi Thomas!
    Sorry that I am late to the party, but I hope you are still monitoring the comments.

    Anyway, a very interesting article, especially since I am at the moment implementing a generic queue solution for our databases. Since I am targeting SQL Server 2008 (and below), I cannot really implement your solution above (yet), but it definitely gives me food for thought. A couple of questions:
    1. In your article you mention that you achieve ~100,000 msgs/sec (give or take). In your benchmarking, what constitutes one message cycle? I.e. is it a push and a pop, or something else?
    2. Also, in your benchmarking above, are you using a singleton "pusher" or concurrent "pushers"?
    3. In most queue-based architectures, the pop of a message is based on some event handler (Disruptor), listener/trigger (MSMQ), or queue reader (SSB). When implementing a solution like this, what is your recommended pop solution? Would you use SSB for that (at the same time you push a message, you send an SSB message and let the queue reader take care of it), or are you just running your pop code in an infinite loop? (Re-reading the article, I assume you do the latter.)

    • Thomas Kejser
      August 23, 2012 at 18:14

      Hi Niels

      Glad you find the article useful

      To answer your questions (sorry for brevity, on a mobile device)

      1) a message is a full push/pop cycle
      2) the queue is multiple publisher and multiple subscriber
      3) I run an infinite loop. When the queue is empty, I sleep for a few ms and then try again

      With regards to signalling the popper: neither SSB nor MSMQ will be up to the task of handling 100K messages/sec. What you might do is partition the output into multiple "monitor threads" that each pop as above. You would then route the message directly to the receiver based on some very fast lookup function (a hash table). For this to scale, it would require that the aggregate of all receiver queues is up to the task (in other words: multiple SSB queues might work).

      It is likely possible to optimise the popper further by going into a busy-wait spin for a few hundred cycles when the queue is empty, retrying, and only going to sleep for a few ms after failing to pop again.

