2013-04-22

Rolling your own message/job queue in MySQL (Part 2: Message Queue Initial Spec)

Forgive me if this is dense and/or has errors, I'm doing this as a sort of stream of consciousness dump with limited revision, not a polished blog post.

The first step to implementing a message queue in SQL is to define a table.  If we want a sane implementation, we're going to have to take a look at what sort of usage we expect and plan our schema accordingly.

Let's revisit the AMQP protocol capabilities to spec this out:

  1. some standard outcomes for transfers, through which receivers of messages can for example accept or reject messages
  2. a mechanism for indicating or requesting one of the two basic distribution patterns, competing- and non-competing- consumers, through the distribution modes move and copy respectively
  3. the ability to create nodes on-demand, e.g. for temporary response queues
  4. the ability to refine the set of message of interest to a receiver through filters
For our initial implementation, let's focus on #1.  The other items are either of limited interest to us initially, or we already get for free (filtering is inherent in SQL).

One of the items I glossed over in the prior article was the performance considerations inherent in the design.  In addition to that, we need to decide whether we want auditing of past messages.  If we don't require a log of all prior messages, then we can just delete them from the table, which is useful because this will keep our table from growing indefinitely.  That's good, because a large table will cause our queries to slow over time, and eventually the system could fall over under it's own historical weight.  Imagine a system that processing 100,000 messages a day, usually with no more than 1,000 active at a time.  By the end of the first day, every scan of the table for active messages will have to scan 99,000 past entries (in a naive worst case example) to see the active entries.  This can be made much better with good indices, but in the end you are still indexing entries you don't care about, or at least don't care about for normal operation.

A better solution is for us to archive past entries to a separate table if we require auditing entries.  This allows us the ability to use past messages for statistical queries, while not impacting the active message queue table. This allows us to make the first rule for how our schema will act:
  • For each queue that requires history, there will be a secondary table that contains archived messages that have been accepted by a client.  Upon acceptance of a message, the message will be inserted into the corresponding archive queue table and deleted from the main queue table.
Now that we've dealt with that, we can look into the actual mechanisms used to submit and receive messages.  Submission is easy enough, to submit a message we insert a now into the queue table.  This row should have a unique id and a payload.

The message itself has no state that the receiver needs to be aware of, if it is available, it can be processed.  Unfortunately SQL is not inherently suited for parceling out a single row to an individual client while preventing access by other clients.  SQL by itself does not support a method to both retrieve and update a row atomically.  This means multiple clients could conceivably SELECT the same data before updating/deleting it to signal acceptance by that client.  To achieve this we'll have to combine a few other SQL features together to simulate an atomic UPDATE and SELECT action.  The core idea of this is that we use another field to denote the retrieval status of a message.  Using this field, we can attempt to update a number of rows that are not already in the process of being handed off to a client to set them to a unique state, and then select the rows that match that state.  It looks something like this:

# Assume client_unique is empty on newly inserted messages
SET @WANTED_ROWS=10;
SET @UUID=UUID();
UPDATE queue_table
   SET transaction_unique=@UUID
 WHERE transaction_unique IS NULL
 LIMIT $WANTED_ROWS

SELECT id, payload
  FROM queue_table
 WHERE transaction_unique=@UUID

We now have a set of rows that we have designated for a single client, and we designated them in an atomic way so we don't have to worry about another client getting the same rows.  This also allows us to achieve capability #1 of the AMQP protocol, which is to allow accepting or rejecting messages at the client level.  All we have to do to accept the message is to delete it and optionally recreate it in the archive table.  No other client will be able to access it as long as they use the same access mechanism we've defined above.  to reject a message, we just set the transaction_unique back to null, and other clients will be able to pick it up.

At this point you may have noticed a problem with this algorithm.  What happens if the client crashes at some point after designating some messages as received but before accepting or rejecting them all?  To account for this we probably need a reaper of some sort that looks for and resets messages that have been in the received but not accepted state longer than a timeout period.  In our initial, naive implementation, we'll do this on client shutdown, and examine alternate solutions when we look for ways to enhance performance later.  Unfortunately, to track time since a message was received, we then need to add a time field to each row, but at least it will also track closely the accepted date of the message (we would need to add yet another time field if we want to track the creation date, which may be worth doing).

At this point we've outlined how to submit messages (plain insert), receive messages (update to unique identifier with a limit, and select for that identifier), and archive messages if we require that.  Next, we can attempt to implement this, and find out where our assumptions and ideas fall apart.  I'll be doing that using Perl, and I don't imagine anyone with programming experience will have trouble following along, but I'll try to explain any areas I think may be confusing explicitly or with comments in the code, whichever seems more appropriate.

No comments:

Post a Comment