May 27, 2015

AMQP + Document Database = ZebraMQ

Part 1

Imagine taking the power and simplicity of AMQP and combining it with a document store. Together, they can support a method of message routing far more flexible than a hierarchical dot-notation routing key.

Let’s say you have a sports team organization system, one where you may have different divisions and teams within a division. You have the administrators of the league, the administrators of each division, and then the coaches of each individual team. You want users who are coaches to be able to communicate with their players as well as all of their superiors. You don’t want a coach communicating with players outside of their team, and you want coaches to be able to communicate with their fellow coaches.

To have a coach message his players would involve specifying the division, dot, the team, then indicating that the recipients are players rather than coaches or parents.

Now let’s say the publish routing key could instead be specified as a query embedded in JSON, like this:

{
  "type": "user",
  "$or": [
    {
      "division": "orange",
      "role": "administrator"
    },
    {
      "teams.the-hawks.role": {
        "$in": [
          "player",
          "captain"
        ]
      }
    }
  ]
}

That query would include the orange division administrators and the players and captain of the Hawks team, excluding non-players of that team. The subscribers’ data entries might look like this:

[
  {
    "firstName": "Jon",
    "lastName": "Smith",
    "type": "user",
    "teams": {
      "the-hawks": {
        "role": "player"
      }
    }
  },
  {
    "firstName": "Tom",
    "lastName": "Klein",
    "type": "user",
    "teams": {
      "the-hawks": {
        "role": "captain"
      }
    }
  },
  {
    "firstName": "Arnold",
    "lastName": "Palmer",
    "type": "user",
    "division": "orange",
    "role": "administrator"
  }
]

This is all valid MongoDB-style querying, which I think is a very simple and readable way of expressing a query. The idea is that instead of having the application fetch the records matching the query and pass them along to the message bus, it would be far more efficient to have the two systems indexed and integrated together. Keeping the data about who receives published messages present and indexed inside the messaging system, and publishing only a query that pulls the corresponding records, would open up a whole new way of using message queues.
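
For contrast, here is a minimal sketch of the application-side approach this would replace, using pymongo and pika (the database, collection, and message body are placeholders). The application pulls the matching records itself and fans out one message per recipient, which is exactly the round trip the integrated design avoids:

import pika
from pymongo import MongoClient

# Application-side routing: query the document store ourselves...
users = MongoClient().league.users  # placeholder database/collection names
recipients = users.find({"type": "user", "division": "orange", "role": "administrator"})

# ...then fan out one message per matching record.
channel = pika.BlockingConnection(pika.ConnectionParameters("localhost")).channel()
for user in recipients:
    channel.queue_declare(queue=str(user["_id"]))
    channel.basic_publish(exchange="", routing_key=str(user["_id"]), body=b"League update")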

I see this working somewhat like the way Elasticsearch puts a document store on top of Lucene under the hood, except here we would put a document store on top of RabbitMQ, mostly as a tool for publishing messages to a complex set of users or other endpoints. Theoretically, lambdas could also be used as subscription callbacks, providing a way to subscribe to something and store how the subscriber responds, all within the message system’s context.

That would be very similar to how Hadoop runs MapReduce jobs next to the data in HDFS: the data never has to leave the system to have its callback function processed. Obviously the callback can still live in the application, but running it inside the message system would yield a tremendous performance gain, much like stored procedures in an RDBMS.
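
As a thought experiment, registering a subscription whose callback is stored and executed inside the message system might look something like this. Nothing here is a real API; it is purely a sketch of the stored-procedure idea:

# Hypothetical only: "zebra" and its subscribe() call do not exist.
# The callback travels with the subscription and runs inside the
# message system, like a stored procedure in an RDBMS.
zebra.subscribe(
    query={"type": "user", "division": "orange"},
    callback=lambda message: message.forward({"role": "administrator"}),
)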

Part 2

Messaging. Queue. Data.

My initial plan for implementing the bare minimum functionality for a prototype was to create a RabbitMQ plugin that could plug into a data store. I still think that a message bus tightly coupled with its data store will be the most performant option. The roots of AMQP are in finance, where the most important requirement was high performance. If I am going to fly the AMQP flag, I want to keep performance a priority, while allowing additional configurable options that may tip the scales: negligible variances in performance in exchange for tremendous gains in security and overall functionality.

Instead of building either a message bus, a data store, or both, I would build an abstraction that sits on top of the two (or more), essentially creating a producer/consumer bus that allows for integrating services. All “endpoints” are authenticated and identified by set credentials. Endpoints use AMQP (and optionally HTTP or STOMP on top of it) to hook in and produce/consume.

Once authenticated, endpoints may publish and subscribe to “messages”. Data stores may be queried directly by publishing a request whose message criteria specify the data store in question (just a routing queue/key parameter) and the corresponding query.
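
For example, a direct query against a data store might be published as a message body like the following. The field names are illustrative, not a fixed schema:

{
  "data-store": "league-db",
  "query": {
    "type": "user",
    "division": "orange"
  },
  "reply-to": "my-results-queue"
}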

Optionally, subscribing to a data store for specific data changes is also a possibility. Depending on the implementation, the system may poll at intervals to check, or, more likely, it will know changes are happening when it receives messages that alter data matching the subscribed criteria. This could be an extremely powerful feature. Note, though, that if the data store is modified by means outside of the application’s context, the system will be unaware of those changes!
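
A change subscription might look much the same, with standing criteria in place of a one-off query (again, the field names are only illustrative):

{
  "subscribe": true,
  "data-store": "league-db",
  "criteria": {
    "type": "user",
    "teams.the-hawks.role": "player"
  }
}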

Endpoints

Depending on the AMQP specification version, it may be possible to use a standard AMQP client (0.9 or 1.0) to communicate with Zebra. The routing query would need to be serialized; perhaps it can simply ride along in the message headers. A REST API would be the simplest to implement.
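
For instance, with a stock AMQP 0.9.1 client such as pika, the routing query could ride along as a serialized header. The “zebra” exchange and “routing-query” header name are assumptions of this sketch, not an established convention:

import json
import pika

channel = pika.BlockingConnection(pika.ConnectionParameters("localhost")).channel()
channel.basic_publish(
    exchange="zebra",  # hypothetical Zebra-facing exchange
    routing_key="",
    body=b"Practice moved to 6pm",
    properties=pika.BasicProperties(
        headers={"routing-query": json.dumps({"type": "user", "division": "orange"})}
    ),
)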

Security Options

If data requires security restrictions that prevent messages of different “types” from being mixed with different data, there are several possible methodologies to handle this.

  1. Public key / private key: a key pair may be used to encrypt all data. To send certain types of data, a separate key will be needed for each level of security. The message data is encrypted against the security signature’s unique public key. Prior to delivering the data, the application checks each recipient’s security access to see if they have access to that “security signature”; if they do, the data is decrypted and sent to the recipients over a standard SSL connection.

  2. Multi-phase queue: routing queries may be specified in multiple phases. Each phase essentially filters out messages based on given criteria and passes the matching messages on to the phase with the next set of criteria. Each phase is essentially a separate exchange and may therefore be configured with all of the standard options (durable, auto-delete, declare). A minimal sketch follows; Part 3 develops the idea further.
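
Here is that minimal sketch, using the multi-phase header format shown in Part 3. The first phase admits only messages from endpoints with sufficient clearance; the second filters on message type. The field names are hypothetical:

{
  "routing-keys": [
    { "security-clearance": { "$gte": 2 } },
    { "type": "financial-report" }
  ]
}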

Federated Data Storage and Access

Zebra may be plugged into anything that implements its interfaces. Data stores are special, but they are endpoints of Zebra as well; therefore they may subscribe to messages and publish messages. Complex data queries may be performed that merge and aggregate data from multiple data stores, accomplished by querying the stores through a set of standard tools. Beyond what is provided, you may implement “middleware” that subscribes to common data tasks and analyses, similar to a stored procedure, enabling you to create pluggable endpoints that you can query against.
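
For instance, a federated query that merges records from two data stores might be published like this. The store names and the “merge-on” field are purely illustrative:

{
  "data-stores": ["catalog-db", "listings-db"],
  "query": { "make": "Ferrari" },
  "merge-on": "vin"
}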

Part 3

Standard RPC calls through AMQP (in the RabbitMQ implementation) pass a correlation-id and a reply-to field in the message’s headers. The reply-to field indicates which queue should be used to send the corresponding response. In reality we don’t need to send that response back to its origin; if it is part of a sequence of events, we can instead direct it to the next step of the sequence.
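
For reference, a standard RPC-style publish with pika looks like this: reply_to names the response queue and correlation_id ties the eventual response back to the request.

import uuid
import pika

channel = pika.BlockingConnection(pika.ConnectionParameters("localhost")).channel()
reply = channel.queue_declare(queue="", exclusive=True)  # server-named reply queue

channel.basic_publish(
    exchange="",
    routing_key="rpc_queue",  # the responder's request queue
    body=b"request payload",
    properties=pika.BasicProperties(
        reply_to=reply.method.queue,
        correlation_id=str(uuid.uuid4()),
    ),
)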

Using reply-to and multi-phase queues, we can build a very powerful conditional workflow and map-reduce system. The notion of multi-phase queues is an interesting one I came up with: take a routing query and assign it a level of priority, allowing multiple phases to run sequentially. Phases may be nested with conditional criteria; when a phase’s criteria are found to be true, the next phase is evaluated.

If you want to winnow your data down in a clean and automated way, you could specify the broad criteria in the primary routing query, then make the next query more specific, knowing that all of the remaining results are viable.

Let’s say you have a web site that buys and sells high-value automobiles, and you utilize two separate databases. One has a listing of all automobiles ever produced, with a wealth of information and history on each make and model. The second has only the buyers’ and sellers’ automobiles, listing what is for sale or sought for purchase. Depending on traffic and inventory, there may be more buyer/seller listings than automobiles produced, or the exact opposite. The point is to first narrow down the automobiles by the actual criteria of the car’s specification, and only then deal with whether the automobile is actually available. It all depends on the question the user is trying to answer: “What is the car I want?” or “Which cars that I want are available?”

HEADERS:

{
  "routing-keys": [
    {
      "brake-horsepower": {
        "$gte": 500
      },
      "makes": {
        "$in": [
          "Ferrari",
          "Lamborghini",
          "Maserati",
          "Aston Martin"
        ]
      }
    },
    {
      "price": {
        "$lte": 250000
      },
      "condition": "new"
    },
    {
      "location": {
        "$lte": [100, "miles"]
      }
    }
  ]
}

This example deals with cars. The first set of criteria filters all messages to those with brake horsepower greater than or equal to 500 and one of the specified makes. After this, the next thing the recipient of that message will do is publish a new message with a routing query specifying that the price must be less than or equal to $250K and that only a new car is wanted.

The second message will alert all owners of cars matching the secondary criteria, letting them know that someone may be interested in purchasing their car. The third and final criterion is that the buyer would, if possible, like to see the car in person, so it must be located within 100 miles of where they live. The final result returns to the initial publisher so that they may view the final list.

As a message makes its way along its path, it may accumulate data at each phase and transformation. Beyond determining the message’s next hop, each phase may attach new data, and all the changes made at each step of the message’s journey may be associated with the message. This provides a way to gather information throughout a workflow process. There is a great deal of flexibility here, which we will explore shortly.

Let’s say I have a piece of data that I want to analyze to see if it is something that needs to be reported to the system administrator. Perhaps these analyzers pass their analysis along to be fed into the next analyzer, or perhaps it is attached to the final result.

Perhaps the first analyzer detects the application’s type; if the type falls into one of two categories, it is routed to the corresponding category. Then let’s say we want the application to be tested in a sandbox, where we examine the function calls it makes. Finally, we take the list of URLs called by the application, and if any of them appear in a supplied list, the application is returned to the user for further evaluation. The user may want to see the gathered data alongside the fact that it matched the final criteria. This is very similar to the traditional map-reduce pattern used in distributed computing systems.
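
Expressed in the multi-phase header format used above, that analysis pipeline might look roughly like this. The analyzer fields and the URL list are hypothetical:

{
  "routing-keys": [
    { "fileType": { "$in": ["PE", "ELF"] } },
    { "sandboxed": true },
    { "calledUrls": { "$in": ["http://known-bad.example.com"] } }
  ]
}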

Each link along the chain may inject its results into the “route-data” header field, and that “route-data” may itself be used in routing queries. For instance, let’s say you are looking for an application that is a PE (Portable Executable) file compiled for a 64-bit architecture. The next routing query would be:

{
  "$routeData.$0.fileType": "PE",
  "$routeData.$0.architecture": "64-bit"
}

In other words, “$routeData” can be used to check whether anything in the “route-data” field satisfied the condition supplied in the first route.

Zebra is a fresh approach to combining AMQP-style messaging with document-based databases. AMQP remains the wire transport protocol, and Zebra sits on top of the database and messaging layers as an abstraction. It translates queries into subscribers and then publishes the appropriate messages to the corresponding subscribers’ queues.