2013-04-23

Rolling your own message/job queue in MySQL (Part 3: Message Queue Implementation)

Note: These blog posts are written in a stream-of-consciousness style with limited revision so I can make rapid progress without worrying about polishing each post. If you notice a mistake, something missing, or even just a confusing passage, let me know and I'll attempt to revise that portion.

The specification

Okay, in part 2 we created a rough specification for what we need to implement a message queue in MySQL. I'll summarize it here:

  1. A unique id for each message.
  2. A field for holding a unique transaction id (initially null) to prevent multiple dequeuing clients from colliding over the same message.
  3. One or more payload fields.
  4. Queuing a message is a row insertion.
  5. Dequeuing a message is a matter of an UPDATE with a LIMIT setting a unique transaction identifier on entries that do not have one, and then a subsequent SELECT for rows matching that unique transaction identifier.
  6. Accepting a message is a matter of deleting that row from the table.  If we need a history of messages, we will move the row to an archival table.
  7. Rejecting a message is achieved by resetting that message's unique transaction identifier back to null.
That's fairly straightforward, and quite easy to implement. For our implementation we'll use MySQL's AUTO_INCREMENT capability on the primary key to get a unique id for each message, and, contrary to the example in part 2, we'll use a simple integer for the unique transaction id, as that should be sufficient for our needs.
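To make the protocol concrete before a database gets involved, here's a minimal pure-Perl sketch of the seven rules above, with a hash standing in for the table. All of the names in it (%table, queue_msg, dequeue_msgs, accept_msg, reject_msg, count_available) are hypothetical and not part of the real implementation. In MySQL it's the atomicity of the UPDATE that keeps two concurrent dequeuers from claiming the same row; a single-process hash can only imitate that.

```perl
use strict;
use warnings;

# Hypothetical in-memory stand-in for the queue_test table (not the MySQL
# implementation): id => { transaction_unique => undef or int, payload => ... }
my %table;
my $next_id = 1;    # plays the role of AUTO_INCREMENT

# Queuing is a row insertion with no transaction id yet
sub queue_msg {
    my ($payload) = @_;
    my $id = $next_id++;
    $table{$id} = { transaction_unique => undef, payload => $payload };
    return $id;
}

# Dequeuing: stamp up to $n unclaimed rows (lowest id first) with a random
# transaction id, then select every row carrying that id
sub dequeue_msgs {
    my $n    = shift || 1;
    my $uniq = int(rand(2**24));
    my @free = grep { !defined $table{$_}{transaction_unique} }
               sort { $a <=> $b } keys %table;
    splice(@free, $n) if @free > $n;                      # the LIMIT
    $table{$_}{transaction_unique} = $uniq for @free;     # the UPDATE
    return grep {                                         # the SELECT
        my $t = $table{$_}{transaction_unique};
        defined $t && $t == $uniq;
    } sort { $a <=> $b } keys %table;
}

# Accepting deletes the row; rejecting clears its transaction id
sub accept_msg { my ($id) = @_; delete $table{$id}; return }
sub reject_msg { my ($id) = @_; $table{$id}{transaction_unique} = undef; return }

# Messages that no dequeuer currently holds
sub count_available {
    return scalar grep { !defined $table{$_}{transaction_unique} } keys %table;
}

queue_msg("message $_") for 1 .. 4;
my @got = dequeue_msgs(1);      # claims id 1
accept_msg($_) for @got;        # 3 available
@got = dequeue_msgs(2);         # claims ids 2 and 3
reject_msg($_) for @got;        # back to 3 available
@got = dequeue_msgs(2);         # claims ids 2 and 3 again
accept_msg($_) for @got;        # 1 available
@got = dequeue_msgs(2);         # only id 4 is left to claim
accept_msg($_) for @got;        # 0 available
printf "%d available\n", count_available();
```

The sequence at the bottom walks through the same accept/reject steps as the test driver at the end of this post.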

The implementation

With the specification taken care of, our schema is extremely simple:

CREATE TABLE `queue_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `transaction_unique` mediumint(8) unsigned DEFAULT NULL,
  `payload` text,
  PRIMARY KEY (`id`),
  KEY `transaction_unique` (`transaction_unique`)
) ENGINE=InnoDB;

Now, it's perfectly valid to write SQL functions (stored routines) for the queue and dequeue actions, and for the accept and reject actions, but I'm not a fan of moving too much of my model code into the database.

While that does enforce an extra level of consistency on the data, it leaves me feeling constrained architecturally, and I suspect it won't scale as easily, since in my experience the database is generally much harder to scale than front-end servers. Truthfully, there's probably a trade-off point where there are real benefits to reducing round trips to the database by coding small, quick routines as SQL functions, and our dequeue operation (an UPDATE followed by a SELECT) probably falls squarely into that category. I'm not going to implement that here, though.

The code


Now, let's work up an implementation of this in an actual programming language. I'm going to use Perl and DBIx::Class, because that's what I'm comfortable in and I believe it's a fairly compact but readable syntax. If you have problems with that, feel free to translate it into your favorite language. Actually, please do, as that's the best way to get a feel for this. For the following examples, please assume they all reside in the same file, even though I'll be presenting them in discrete chunks.

use 5.014; # implies strict
use warnings;
use utf8;

This is a fairly standard preamble in modern Perl. We require version 5.14 (needed for the package NAME { ... } block syntax used below; requesting it also implies "use strict;" and enables say), turn on warnings, and tell perl that the script's source code may contain UTF-8 (a sane default).

package Queue::Message {
    use base 'DBIx::Class::Core';
    use JSON qw(decode_json);   # message() below calls decode_json in this package
    __PACKAGE__->table( 'queue_test' );
    __PACKAGE__->add_columns(qw( id transaction_unique payload ));
    __PACKAGE__->set_primary_key( 'id' );
    __PACKAGE__->resultset_class( 'Queue::MessageBroker' );

    # To "accept" a message, we delete that row. This leaves the ORM object alone
    sub message_accept {
        my $self = shift;
        return $self->delete;
    }
    # To "reject" a message, we mark it as no longer part of a transaction
    sub message_reject {
        my $self = shift;
        return $self->update({ transaction_unique => undef });
    }

    # Convenience method to automatically convert the JSON payload
    sub message {
        my $self = shift;
        return decode_json( $self->payload );
    }

    1; # Class returns true
}

Here we have our Message class, which both defines the metadata for our table in DBIx::Class (in a shorthand syntax) and implements the accept and reject methods from our specification. It also provides a convenience method, message, that decodes the payload; messages are serialized to JSON so they can be stored in a single text column.
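The message method is just one half of a JSON round trip. This sketch uses JSON::PP, which has shipped with Perl since 5.14 and exports the same encode_json and decode_json functions as the JSON module used in this post. encode_json returns UTF-8 encoded bytes and decode_json expects UTF-8 bytes back, so the pair round-trips non-ASCII text as long as the stored bytes come back unchanged. The note field is purely illustrative.

```perl
use strict;
use warnings;
use utf8;                                  # the literal below contains "é"
use JSON::PP qw(encode_json decode_json);
use Time::HiRes qw(time);

# The kind of message the test driver queues, plus an illustrative non-ASCII field
my $message = { time => time(), note => 'café' };

# Roughly what message_queue stores in the payload column: UTF-8 encoded JSON
my $payload = encode_json($message);

# Roughly what the message() convenience method hands back: a new Perl hashref
my $decoded = decode_json($payload);
printf "payload is %d bytes; time round-tripped as %s\n",
    length($payload), $decoded->{time};
```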

package Queue::MessageBroker {
    use base 'DBIx::Class::ResultSet';
    use JSON;
    __PACKAGE__->load_components(
        qw(Helper::ResultSet Helper::ResultSet::Shortcut)
    );

    sub message_queue {
        my ($self,$message) = @_;
        return $self->create({ payload => encode_json( $message ) });
    }
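    # Claim up to $wanted_msgs unclaimed messages (oldest first), then fetch them
    sub message_dequeue {
        my $self        = shift;
        my $wanted_msgs = shift || 1;
        # Random id in 0 .. 2**24-1, the full range of an unsigned MEDIUMINT
        my $uniq        = int(rand(2**24));
        # Mark up to $wanted_msgs unclaimed messages as part of this transaction
        $self->order_by('id')
             ->rows($wanted_msgs)
             ->search({ transaction_unique => undef })
             ->update({ transaction_unique => $uniq });
        # Return marked messages
        return $self->search({ transaction_unique => $uniq })->all;
    }
    # Count messages that are not currently claimed by a dequeuer
    sub message_count { return shift->search({ transaction_unique => undef })->count }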

    1; # Class returns true
}

This is the definition of our Broker class, which, as a DBIx::Class::ResultSet subclass, handles the actual query operations. Beyond some simple DBIx::Class setup, including loading the DBIx::Class::Helpers components that provide the chainable order_by and rows shortcut methods, our Broker is primarily responsible for inserting (queuing) messages and for claiming and retrieving (dequeuing) them, so those methods make up the majority of the implementation. Deleting accepted messages is left to the Message class above.
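The broker's int(rand(2**24)) draws from exactly the 2**24 values an unsigned MEDIUMINT can hold (0 to 16,777,215). Because nothing guarantees uniqueness, two dequeuers that happen to hold the same id at the same time would each get the other's rows back from the SELECT. Here's a rough birthday-problem estimate of how that risk grows with the number of transactions outstanding at once. It assumes ids are drawn uniformly and independently, and collision_probability is a hypothetical helper, not part of the queue code.

```perl
use strict;
use warnings;

# int(rand(2**24)) yields 0 .. 16_777_215, exactly the range of MySQL's
# MEDIUMINT UNSIGNED column, so there are 2**24 possible transaction ids
my $id_space = 2**24;

# Birthday-problem estimate: the chance that at least two of $k
# simultaneously outstanding transactions drew the same random id
sub collision_probability {
    my ($k, $n) = @_;
    my $p_all_distinct = 1;
    $p_all_distinct *= ($n - $_) / $n for 0 .. $k - 1;
    return 1 - $p_all_distinct;
}

printf "%5d outstanding transactions: %.6f\n",
    $_, collision_probability($_, $id_space)
    for 10, 100, 1_000, 10_000;
```

Only transactions that are still outstanding matter here, since accepting deletes a row and rejecting clears its id; rows claimed by a client that died without doing either would keep their id indefinitely, though.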

package Queue {
    use base 'DBIx::Class::Schema';
    __PACKAGE__->load_classes( 'Message' );
    1; # Class returns true
}

Finally, we have our schema class. It comes last because it expects to read the schema information from the Queue::Message class we defined above, and needs to know about any specifics we set in the ResultSet (MessageBroker) class. Normally it can find and load everything by itself from the separate files the classes are defined in, but since we are using a single file we have to be careful about the order in which we define classes.

That's it. That's the entire definition of our initial, somewhat naive implementation of a message queue on top of MySQL. The only thing left is some code to drive the implementation and test that it works, so we might as well add that.

package main;
use JSON;
use Time::HiRes qw(time);

# Connect to DB with DBIx::Class schema object
my $schema = Queue->connect(
        "DBI:mysql:database=test;host=localhost;mysql_socket=/var/lib/mysql/mysql.sock",
        'queuetest_user', 'queuetest_pass', { PrintError => 0, RaiseError => 1, AutoCommit => 1 },
);
# Get our broker (the DBIx::Class::ResultSet subclass for the Message class)
my $broker = $schema->resultset('Message');

# Queue 4 messages with a high-resolution timestamp for data
$broker->message_queue({ time => time() }) for 1..4;
my @messages;

say 'Get one message, 4 -> 3';
say $broker->message_count, ' messages available';
@messages = $broker->message_dequeue;
say scalar(@messages) . " received";
printf("message: %d, time: %s\n", $_->id, $_->message->{time}) for @messages;
$_->message_accept for @messages;
print "\n";

say 'Get two messages, but reject them, 3 -> 3';
say $broker->message_count, ' messages available';
@messages = $broker->message_dequeue(2);
say scalar(@messages) . " received";
printf("message: %d, time: %s\n", $_->id, $_->message->{time}) for @messages;
$_->message_reject for @messages;
print "\n";

say 'Get two messages, 3 -> 1';
say $broker->message_count, ' messages available';
@messages = $broker->message_dequeue(2);
say scalar(@messages) . " received";
printf("message: %d, time: %s\n", $_->id, $_->message->{time}) for @messages;
$_->message_accept for @messages;
print "\n";

say 'Try to get two messages when 1 available, 1 -> 0';
say $broker->message_count, ' messages available';
@messages = $broker->message_dequeue(2);
say scalar(@messages) . " received";
printf("message: %d, time: %s\n", $_->id, $_->message->{time}) for @messages;
$_->message_accept for @messages;
print "\n";

Next time we'll look at performance, and see how quickly we can get messages in and out of this system.
