Solving monitoring state storage problems using Redis

Redis is an in-memory key-value data store that provides a small number of primitives well suited to the task of building monitoring systems. As a lot of us are hacking in this space I thought I’d write a blog post summarizing where I’ve been using it in a little Sensu-like monitoring system I have been working on, on and off.

There are some monitoring-related events coming up like MonitoringLove in Antwerp and Monitorama in Boston – I will be attending both and I hope a few members of the community will create similar background posts on various interesting areas before these events.

I’ve only recently started looking at Redis but really like it. It’s a very lightweight daemon written in C with fantastic documentation detailing things like each command’s performance characteristics, and most documentation pages are live in that they have a REPL right on the page like the SET page – note you can type into the code sample and see your changes in real time. It is sponsored by VMware and released under the 3-clause BSD license.

Redis Data Types


Redis provides a few common data structures:

  • Normal key-value storage where every key has just one string value
  • Hashes where every key contains a hash of key-value string pairs
  • Lists of strings – basically just plain old Arrays kept in insertion order that allow duplicate values
  • Sets are a bit like Lists but with the addition that a given value can only appear in a set once
  • Sorted Sets are Sets where every value also has a weight associated with it; the set is indexed by weight

All keys support things like expiry based on time and TTL calculation. Additionally Redis supports PubSub.
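To get a feel for these, here is a quick illustrative redis-cli session exercising each type – the key names are just examples:

% redis-cli set monitor:version 1
OK
% redis-cli hset status:dev2.devco.net:load exitcode 0
(integer) 1
% redis-cli lpush events "something happened"
(integer) 1
% redis-cli sadd hosts dev2.devco.net
(integer) 1
% redis-cli zadd host:last_seen 1357494721 dev2.devco.net
(integer) 1
% redis-cli expire monitor:version 300
(integer) 1
% redis-cli ttl monitor:version
(integer) 300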

At first it can be hard to imagine how you’d use a data store with only these few data types, capable of only storing strings, for monitoring – but with a bit of creativity it can be really very useful.

The full reference about all the types can be found in the Redis Docs: Data Types

Monitoring Needs


Monitoring systems generally need a number of different types of storage. These are configuration, event archiving and status and alert tracking. There are more but these are the big ticket items; of the 3 I am only going to focus on the last one – Status and Alert Tracking – here.

Status tracking is essentially transient data. If you lose your status view it’s not really a big deal – it will be recreated quite quickly as new check results come in. Worst case you’ll get some alerts again that you recently got. This fits well with Redis which doesn’t commit data to disk as soon as it receives it – it flushes from memory to disk roughly every second.

Redis does not provide much by way of SSL or strong authentication so I tend to consider it a single node IPC system rather than, say, a generic PubSub system. I feed data into a node using a system like ActiveMQ and then use Redis for comms and state tracking on that single node.

I’ll show how it can be used to solve the following monitoring related storage/messaging problems:

  • Check Status – a check like load on every node
  • Staleness Tracking – you need to know when a node is not receiving check results so you can do alive checks
  • Event Notification – your core monitoring system will likely feed into alerters like Opsgenie and metric storage like Graphite
  • Alert Tracking – you need to know when you last sent an alert and when you can alert again based on an interval like every 2 hours

Check Status


The check is generally the main item of monitoring systems. Something configures a check like load and then every node gets check results for this item, and the monitoring system has to track the status of the checks on a per-node basis.

In my example a check result looks more or less like this:

{"lastcheck"        => "1357490521", 
 "count"            => "1143", 
 "exitcode"         => "0", 
 "output"           => "OK - load average: 0.23, 0.10, 0.02", 
 "last_state_change"=> "1357412507",
 "perfdata"         => '{"load15":0.02,"load5":0.1,"load1":0.23}',
 "check"            => "load",
 "host"             => "dev2.devco.net"}

This is standard stuff and the most boring part – you might guess this goes into a Hash and you’d be right. Note the count item there – Redis has special handling for counters and I’ll show that in a minute.

By convention Redis keys are namespaced with a : so I’d store the check status for a specific node + check combination in a key like status:example.net:load

Updating or creating a new hash is really easy – just write to it:

def save_check(check)
  key = "status:%s:%s" % [check.host, check.check]
 
  check.last_state_change = @redis.hget(key, "last_state_change")
  check.previous_exitcode = @redis.hget(key, "exitcode")
 
  @redis.multi do
    @redis.hset(key, "host", check.host)
    @redis.hset(key, "check", check.check)
    @redis.hset(key, "exitcode", check.exitcode)
    @redis.hset(key, "lastcheck", check.last_check)
    @redis.hset(key, "last_state_change", check.last_state_change)
    @redis.hset(key, "output", check.output)
    @redis.hset(key, "perfdata", check.perfdata)
 
    unless check.changed_state?
      @redis.hincrby(key, "count", 1)
    else
      @redis.hset(key, "count", 1)
    end
  end
 
  check.count = @redis.hget(key, "count")
end

Here I assume we have an object that represents a check result called check and we’re more or less just fetching/updating data in it. I first retrieve the previously saved state of the exitcode and last state change time and save those into the object. The object will do some internal state management to determine if the current check result represents a changed state – OK to WARNING etc – based on this information.

The @redis.multi starts a transaction, everything inside the block will be written in an atomic way by the Redis server thus ensuring we do not have any half-baked state while other parts of the system might be reading the status of this check.

As I said the check object determines if the current result is a state change when I set the previous exitcode, so the unless block at the end will either just increment the count if nothing changed or reset it to 1 on a state change. We use the internal Redis counter handling via HINCRBY to avoid having to first fetch the count, update it and save it again – this saves a round trip to the database.

You can now just retrieve the whole hash with the HGETALL command, even on the command line:

% redis-cli hgetall status:dev2.devco.net:load
 1) "check"
 2) "load"
 3) "host"
 4) "dev2.devco.net"
 5) "output"
 6) "OK - load average: 0.00, 0.00, 0.00"
 7) "lastcheck"
 8) "1357494721"
 9) "exitcode"
10) "0"
11) "perfdata"
12) "{\"load15\":0.0,\"load5\":0.0,\"load1\":0.0}"
13) "last_state_change"
14) "1357412507"
15) "count"
16) "1178"

References: Redis Hashes, MULTI, HSET, HINCRBY, HGET, HGETALL

Staleness Tracking


Staleness Tracking here means we want to know when last we saw any data about a node; if the node is not providing information we need to go and see what happened to it. Maybe it’s up but the data sender died, or maybe it’s crashed.

This is where we really start using some of the Redis features to save us time. We need to track when last we saw a specific node and then we have to be able to quickly find all nodes not seen within a certain amount of time, like 120 seconds.

We could retrieve all the check results and check their last updated time and so figure it out but that’s not optimal.

This is what Sorted Sets are for. Remember Sorted Sets associate a weight with every value and order the set by that weight; if we use the timestamp at which we last received data for a host as the weight, we can very quickly fetch a list of stale hosts.

def update_host_last_seen(host, time)
  @redis.zadd("host:last_seen", time, host)
end

When we call this code like update_host_last_seen("dev2.devco.net", Time.now.utc.to_i) the host will either be added to or updated in the Sorted Set based on the current UTC time. We do this every time we save a new result set with the code in the previous section.
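For completeness, a sketch of how the two tie together – process_result here is just a hypothetical wrapper around the methods shown above:

def process_result(check)
  save_check(check)
  update_host_last_seen(check.host, Time.now.utc.to_i)
end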

Getting a list of hosts that we have not seen in the last 120 seconds is really easy now:

def get_stale_hosts(age)
  @redis.zrangebyscore("host:last_seen", 0, (Time.now.utc.to_i - age))
end

If we call this with an age like 120 we’ll get an array of nodes that have not had any data within the last 120 seconds.
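A consumer of this could be a small loop that polls for stale hosts and requests alive checks for them – a sketch, where schedule_alive_check is a hypothetical helper:

loop do
  get_stale_hosts(120).each do |host|
    puts "No data from %s in the last 120 seconds, requesting an alive check" % host
    # schedule_alive_check(host)   # hypothetical: ping the node, raise an alert, etc.
  end

  sleep 30
end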

You can do the same check on the CLI, this shows all the machines not seen in the last 60 seconds:

% redis-cli zrangebyscore host:last_seen 0 $(expr $(date +%s) - 60)
 1) "dev1.devco.net"

Reference: Sorted Sets, ZADD, ZRANGEBYSCORE

Event Notification


When a check result enters the system that’s either a state change, a problem or has metrics associated with it, we’d want to send it on to other pieces of code.

We don’t know or care who those interested parties are, we only care that there might be some – it might be something writing to Graphite or OpenTSDB, or both at the same time, or something alerting to Opsgenie or PagerDuty. This is a classic use case for PubSub and Redis has a good PubSub subsystem that we’ll use for this.

I am only going to show the metrics publishing – problem and state changes are very similar:

def publish_metrics(check)
  if check.has_perfdata?
    msg = {"metrics" => check.perfdata, "type" => "metrics", "time" => check.last_check, "host" => check.host, "check" => check.check}.to_json
    publish(["metrics", check.host, check.check], msg)
  end
end
 
def publish(type, message)
  target = ["overwatch", Array(type).join(":")].join(":")
  @redis.publish(target, message)
end

This is pretty simple stuff, we’re just publishing some JSON to a named destination like overwatch:metrics:dev1.devco.net:load. We can now write small standalone single function tools that consume this stream of metrics and send it wherever we like – like Graphite or OpenTSDB.
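As an illustration, here is a minimal sketch of such a consumer that forwards the perfdata to Graphite’s plaintext port – the Graphite host name is an assumption, the overwatch prefix matches the publisher above:

require 'rubygems'
require 'redis'
require 'json'
require 'socket'

redis = Redis.new
graphite = TCPSocket.new("graphite.example.net", 2003)

# receive every metrics event for every host and check
redis.psubscribe("overwatch:metrics:*") do |on|
  on.pmessage do |pattern, channel, message|
    event = JSON.parse(message)

    # perfdata is itself a JSON hash like {"load1" => 0.23, ...}
    JSON.parse(event["metrics"]).each do |metric, value|
      name = "%s.%s.%s" % [event["host"].gsub(".", "_"), event["check"], metric]
      graphite.puts("%s %s %s" % [name, value, event["time"]])
    end
  end
end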

We publish similar events for any incoming check result that is not OK and also for any state transition like CRITICAL to OK, these would be consumed by alerter handlers that might feed pagers or SMS.

We’re publishing these alerts to destinations that include the host and specific check – this way we can very easily create individual host views of activity by doing pattern based subscribes.

Reference: PubSub, PUBLISH

Alert Tracking


Alert Tracking means keeping track of which alerts we’ve already sent and when we’ll need to send them again – for example only after 2 hours of the same problem, not on every check result which might come in every minute.

Leading on from the previous section we’d just consume the problem and state change PubSub channels and react on messages from those:

A possible consumer of this might look like this:

@redis.psubscribe("overwatch:state_change:*", "overwatch:issues:*") do |on|
  on.pmessage do |channel, message|
    event = JSON.parse(message)
 
    case event["type"]
      when "issue"
        sender.notify_issue(event["issue"]["exitcode"], event["host"], event["check"], event["issue"]["output"])
      when "state_change"
        if event["state_change"]["exitcode"] == 0
          sender.notify_recovery(event["host"], event["check"], event["state_change"]["output"])
        end
    end
  end
end

This subscribes to the 2 channels and passes the incoming events to a notifier. Note we’re using the patterns here to catch all alerts and changes for all hosts.

The problem here is that without any special handling this is going to fire off alerts every minute, assuming we check the load every minute. This is where Redis expiry of keys comes in.

We’ll need to track which messages we have sent and when, and on any state change clear the tracking, thus restarting the counters.

So we’ll just add keys called "alert:dev2.devco.net:load:3" to indicate an UNKNOWN state alert for load on dev2.devco.net:

def record_alert(host, check, status, expire=7200)
  key = "alert:%s:%s:%d" % [host, check, status]
  @redis.set(key, 1)
  @redis.expire(key, expire)
end

This takes an expire time which defaults to 2 hours and tells Redis to just remove the key when its time is up.

With this we need a way to figure out if we can send again:

def alert_ttl(host, check, status)
  key = "alert:%s:%s:%d" % [host, check, status]
  @redis.ttl(key)
end

This will return the number of seconds till the next alert and -1 if we are ready to send again.

And finally on every state change we need to purge all the tracking for a given node + check combo. The reason for this is that if we notified on CRITICAL a minute ago, then the service recovers to OK but soon goes to CRITICAL again, that most recent CRITICAL alert would otherwise be suppressed as part of the previous cycle of alerts.

def clear_alert_ttls(host, check)
  @redis.del(@redis.keys.grep(/^alert:#{host}:#{check}:\d/))
end

So now I can show the two methods that will actually publish the alerts:

The first notifies of issues but only every @interval seconds and it uses the alert_ttl helper above to determine if it should or shouldn’t send:

def notify_issue(exitcode, host, check, output)
  if (ttl = @storage.alert_ttl(host, check, exitcode)) == -1
    subject = "%s %s#%s" % [status_for_code(exitcode), host, check]
    message = "%s: %s" % [subject, output]
 
    send(message, subject, @recipients)
 
    @redis.record_alert(host, check, exitcode, @alert_interval)
  else
    Log.info("Not alerting %s#%s due to interval restrictions, next alert in %d seconds" % [host, check, ttl])
  end
end

The second publishes recovery notices – we’d always want those and they will not repeat. Here we clear all the previous alert tracking to avoid incorrect alert suppressions:

def notify_recovery(host, check, output)
  subject = "RECOVERY %s#%s" % [host, check]
  message = "%s: %s" % [subject, output]
 
  send_alert(message, subject, @recipients)
 
  @redis.clear_alert_ttls(host, check)
end

References: SET, EXPIRE, SUBSCRIBE, TTL, DEL

Conclusion


This covered a few Redis basics but it’s a very rich system that can be used in many areas so if you are interested spend some quality time with its docs.

Using its facilities saved me a ton of effort while working on a small monitoring system. It is fast and lightweight and enables cross-language collaboration that I’d have found hard to replicate in a performant manner without it.

Scaling Nagios NRPE checks

Most Nagios systems do a lot of forking, especially those built around something like NRPE where each check is a connection to be made to a remote system. On one hand I like NRPE in that it puts the check logic on the nodes using a standard plugin format and provides a fairly re-usable configuration file but on the other hand the fact that the Nagios machine has to do all this forking has never been good for me.

In the past I’ve shown one way to scale checks by aggregating all results for a specific check into one result but this is not always a good fit as pointed out in the post. I’ve now built a system that uses the same underlying MCollective infrastructure as in the previous post but without the aggregation.

I have a pair of Nagios nodes – one in the UK and one in France – and they are on quite low spec VMs doing around 400 checks each. The problems I have are:

  • The machines are constantly loaded under all the forking; one would sit at a 1.5 load average almost all the time
  • They use a lot of RAM and it’s quite spikey – especially if something is wrong I’d have a lot of checks running concurrently, so the machines have to be bigger than I want them to be
  • The check frequency is quite low in the usual Nagios manner, sometimes 10 minutes can go by without a check
  • The check results do not represent a point in time, I have no idea how the check results of node1 relate to those on node2 as they can be taken anywhere in the last 10 minutes

These are standard Nagios complaints though and there are many more, but these ones specifically are what I wanted to address right now with the system I am showing here.

Probably not a surprise but the solution is built on MCollective. It uses the existing MCollective NRPE agent and the existing queueing infrastructure to push the forking to each individual node – they would do this anyway for every NRPE check – and reads the results off a queue, spooling them into the Nagios command file as passive results. Internally it splits the traditional MCollective request-response system into an async processing system using the technique I blogged about before.

The system is made up of a few components:

  • The Scheduler takes care of publishing requests for checks
  • MCollective and the middleware provides AAA and transport
  • The nodes all run the MCollective NRPE agent and put their replies on the Queue
  • The Receiver reads the results from the Queue and writes them to the Nagios command file

The Scheduler

The scheduler daemon is written using the excellent Rufus Scheduler gem – if you do not know it you totally should check it out, it solves many many problems. Rufus allows me to create simple checks on intervals like 60s and I combine these checks with MCollective filters to create a simple check configuration as below:

nrpe 'check_bacula_main', '6h', 'bacula::node monitored_by=monitor1'
nrpe 'check_disks', '60s', 'monitored_by=monitor1'
nrpe 'check_greylistd', '60s', 'greylistd monitored_by=monitor1'
nrpe 'check_load', '60s', 'monitored_by=monitor1'
nrpe 'check_mailq', '60s', 'monitored_by=monitor1'
nrpe 'check_mongodb', '60s', 'mongodb monitored_by=monitor1'
nrpe 'check_mysql', '60s', 'mysql::server monitored_by=monitor1'
nrpe 'check_pki', '60m', 'monitored_by=monitor1'
nrpe 'check_swap', '60s', 'monitored_by=monitor1'
nrpe 'check_totalprocs', '60s', 'monitored_by=monitor1'
nrpe 'check_zombieprocs', '60s', 'monitored_by=monitor1'

Taking the first line it says: Run the check_bacula_main NRPE check every 6 hours on machines with the bacula::node Puppet Class and with the fact monitored_by=monitor1. I had the monitored_by fact already to assist in building my Nagios configs using a simple search based approach in Puppet.

When the scheduler starts it will log:

W, [2012-12-31T22:10:12.186789 #32043]  WARN -- : activemq.rb:96:in `on_connecting' TCP Connection attempt 0 to stomp://nagios@stomp.example.net:6163
W, [2012-12-31T22:10:12.193405 #32043]  WARN -- : activemq.rb:101:in `on_connected' Conncted to stomp://nagios@stomp.example.net:6163
I, [2012-12-31T22:10:12.196387 #32043]  INFO -- : scheduler.rb:23:in `nrpe' Adding a job for check_bacula_main every 6h matching 'bacula::node monitored_by=monitor1', first in 19709s
I, [2012-12-31T22:10:12.196632 #32043]  INFO -- : scheduler.rb:23:in `nrpe' Adding a job for check_disks every 60s matching 'monitored_by=monitor1', first in 57s
I, [2012-12-31T22:10:12.197173 #32043]  INFO -- : scheduler.rb:23:in `nrpe' Adding a job for check_load every 60s matching 'monitored_by=monitor1', first in 23s
I, [2012-12-31T22:10:35.326301 #32043]  INFO -- : scheduler.rb:26:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'

You can see it reads the file and schedules the first check at a random point between now and the check interval, which spreads out the checks.

The Receiver


The receiver has almost no config, it just needs to know which queue to read and where your Nagios command file lives. It logs:

I, [2013-01-01T11:49:38.295661 #23628]  INFO -- : mnrpes.rb:35:in `daemonize' Starting in the background
W, [2013-01-01T11:49:38.302045 #23631]  WARN -- : activemq.rb:96:in `on_connecting' TCP Connection attempt 0 to stomp://nagios@stomp.example.net:6163
W, [2013-01-01T11:49:38.310853 #23631]  WARN -- : activemq.rb:101:in `on_connected' Conncted to stomp://nagios@stomp.example.net:6163
I, [2013-01-01T11:49:38.310980 #23631]  INFO -- : receiver.rb:16:in `subscribe' Subscribing to /queue/mcollective.nagios_passive_results_monitor1
I, [2013-01-01T11:49:41.572362 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357040981] PROCESS_SERVICE_CHECK_RESULT;node1.example.net;mongodb;0;OK: connected, databases admin local my_db puppet mcollective
I, [2013-01-01T11:49:42.509061 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357040982] PROCESS_SERVICE_CHECK_RESULT;node2.example.net;zombieprocs;0;PROCS OK: 0 processes with STATE = Z
I, [2013-01-01T11:49:42.510574 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357040982] PROCESS_SERVICE_CHECK_RESULT;node3.example.net;zombieprocs;0;PROCS OK: 1 process with STATE = Z

As the results get pushed to Nagios I see the following in its logs:

[1357042122] EXTERNAL COMMAND: PROCESS_SERVICE_CHECK_RESULT;node1.example.net;zombieprocs;0;PROCS OK: 0 processes with STATE = Z
[1357042124] PASSIVE SERVICE CHECK: node1.example.net;zombieprocs;0;PROCS OK: 0 processes with STATE = Z

Did it solve my problems?


I listed the set of problems I wanted to solve so it’s worth evaluating if I did solve them properly.

Less load and RAM use on the Nagios nodes

My Nagios nodes have gone from load averages of 1.5 to 0.1 or 0.0, they are doing nothing, they use a lot less RAM and I have removed some of the RAM from one of them and given it to my Jenkins VM instead – it was a huge win. The scheduler and receiver are quite light on resources as you can see below:

USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
nagios    9757  0.4  1.8 130132 36060 ?        S     2012   3:41 ruby /usr/bin/mnrpes-receiver --pid=/var/run/mnrpes/mnrpes-receiver.pid --config=/etc/mnrpes/mnrpes-receiver.cfg
nagios    9902  0.3  1.4 120056 27612 ?        Sl    2012   2:22 ruby /usr/bin/mnrpes-scheduler --pid=/var/run/mnrpes/mnrpes-scheduler.pid --config=/etc/mnrpes/mnrpes-scheduler.cfg

On the RAM side I now never get a pile up of many checks. I do have the stale detection enabled on my Nagios template so if something breaks in the scheduler/receiver/broker triplet Nagios will still try to do a traditional check to see what’s going on but that’s bearable.

Check frequency too low

With this system I could do my checks every 10 seconds without any problems, I settled on 60 seconds as that’s perfect for me. Rufus scheduler does a great job of managing that and the requests from the scheduler are effectively fire and forget as long as the broker is up.

Results are spread over 10 minutes

The problem with the results for load on node1 and node2 having no temporal correlation is gone now too; because I use MCollective’s parallel nature all the load checks happen at the same time:

Here is the publisher:

I, [2013-01-01T12:00:14.296455 #20661]  INFO -- : scheduler.rb:26:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'

…and the receiver:

I, [2013-01-01T12:00:14.380981 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node1.example.net;load;0;OK - load average: 0.92, 0.54, 0.42|load1=0.920;9.000;10.000;0; load5=0.540;8.000;9.000;0; load15=0.420;7.000;8.000;0; 
I, [2013-01-01T12:00:14.383875 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node2.example.net;load;0;OK - load average: 0.00, 0.00, 0.00|load1=0.000;1.500;2.000;0; load5=0.000;1.500;2.000;0; load15=0.000;1.500;2.000;0; 
I, [2013-01-01T12:00:14.387427 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node3.example.net;load;0;OK - load average: 0.02, 0.07, 0.07|load1=0.020;1.500;2.000;0; load5=0.070;1.500;2.000;0; load15=0.070;1.500;2.000;0; 
I, [2013-01-01T12:00:14.388754 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node4.example.net;load;0;OK - load average: 0.07, 0.02, 0.00|load1=0.070;1.500;2.000;0; load5=0.020;1.500;2.000;0; load15=0.000;1.500;2.000;0; 
I, [2013-01-01T12:00:14.404650 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node5.example.net;load;0;OK - load average: 0.03, 0.09, 0.04|load1=0.030;1.500;2.000;0; load5=0.090;1.500;2.000;0; load15=0.040;1.500;2.000;0; 
I, [2013-01-01T12:00:14.405689 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node6.example.net;load;0;OK - load average: 0.06, 0.06, 0.07|load1=0.060;3.000;4.000;0; load5=0.060;3.000;4.000;0; load15=0.070;3.000;4.000;0; 
I, [2013-01-01T12:00:14.489590 #23631]  INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node7.example.net;load;0;OK - load average: 0.06, 0.14, 0.14|load1=0.060;1.500;2.000;0; load5=0.140;1.500;2.000;0; load15=0.140;1.500;2.000;0;

All the results are from the same second, win.

Conclusion

So my scaling issues on my small site are solved and I think the way this is built will work for many people. The code is on GitHub and requires MCollective 2.2.0 or newer.

Having reused the MCollective and Rufus libraries for all the legwork including logging, daemonizing, broker connectivity, addressing and security I was able to build this in a very short time – the total code base is only 237 lines excluding packaging etc., which is a really low number of lines for what it does.

Simple Puppet Module Structure Redux

Back in September 2009 I wrote a blog post titled “Simple Puppet Module Structure” which introduced a simple approach to writing Puppet Modules. This post has been hugely popular in the community – but much has changed in Puppet since then so it is time for an updated version of that post.

As before I will show a simple module for a common scenario. Rather than considering this module a blueprint for every module out there you should instead study its design and use it as a starting point when writing your own modules. You can build on it and adapt it but the basic approach should translate well to more complex modules.

I should note that while I work for Puppet Labs I do not know if this reflects any kind of standard suggested approach by Puppet Labs – this is what I do when managing my own machines and no more.

The most important deliverables


When writing a module I have a few things I keep in mind – these are all centered around downstream users of my module and future-me trying to figure out what is going on:

  • A module should have a single entry point where someone reviewing it can get an overview of its behaviour
  • Modules that have configuration should be configurable in a single way and single place
  • Modules should be made up of several single-responsibility classes. As far as possible these classes should be private details hidden from the user
  • For the common use cases, users should not need to know individual resource names
  • For the most common use case, users should not need to provide any parameters, defaults should be used
  • Modules I write should have a consistent design and behaviour

The module layout I will present below is designed so that someone who is curious about the behaviour of the module only has to look at the init.pp to see:

  • All the parameters and their defaults used to configure the behaviour of the module
  • Overview of the internal structure of the module by way of descriptive class names
  • Relationships and notifications that exist inside the module and what classes they can notify

This design will never remove the need for documenting your modules but a clear design will guide your users in discovering the internals of your module and how they interact with it.

More important than what a module does is how accessible it is to you and others, how easy is it to understand, debug and extend.

Thinking about your module


For this post I will write a very simple module to manage NTP – it really is very simple, you should check the Forge for more complete ones.

To go from nowhere to having NTP on your machine you would have to do:

  • Install the packages and any dependencies
  • Write out appropriate configuration files with some environment specific values
  • Start the service or services you need once the configuration files are written. Restart them if the config files change later.

There is a clear implied dependency chain here and this basic pattern applies to most pieces of software.

These 3 points basically translate to distinct groups of actions and sticking with the above principle of single-function classes I will create a class for each group.

To keep things clear and obvious I will call these classes install, config and service. The names don’t matter as long as they are descriptive – but you really should pick something and stick with it in all your modules.

Writing the module


I’ll show the 3 classes that do the heavy lifting here and discuss parts of them afterwards:

class ntp::install {
   package{'ntpd':
      ensure => $ntp::version
   }
}
 
class ntp::config {
   $ntpservers = $ntp::ntpservers
 
   File{
      owner   => root,
      group   => root,
      mode    => 644,
   }
 
   file{'/etc/ntp.conf':
         content => template('ntp/ntp.conf.erb');
 
        '/etc/ntp/step-tickers':
         content => template('ntp/step-tickers.erb');
    }
}
 
class ntp::service {
   $ensure = $ntp::start ? {true => running, default => stopped}
 
   service{"ntp":
      ensure  => $ensure,
      enable  => $ntp::enable,
   }
}

Here I have 3 classes that serve a single purpose each and do not have any details like relationships, ordering or notifications in them. They roughly just do the one thing they are supposed to do.

Take a look at each class and you will see they use variables like $ntp::version, $ntp::ntpservers etc. These are variables from the main ntp class, let’s take a quick look at that class:

# == Class: ntp
#
# A basic module to manage NTP
#
# === Parameters
# [*version*]
#   The package version to install
#
# [*ntpservers*]
#   An array of NTP servers to use on this node
#
# [*enable*]
#   Should the service be enabled during boot time?
#
# [*start*]
#   Should the service be started by Puppet
class ntp(
   $version = "present",
   $ntpservers = ["1.pool.ntp.org", "2.pool.ntp.org"],
   $enable = true,
   $start = true
) {
   class{'ntp::install': } ->
   class{'ntp::config': } ~>
   class{'ntp::service': } ->
   Class["ntp"]
}

This is the main entry point into the module that was mentioned earlier. All the variables the module uses are documented in a single place, the basic design and parts of the module are clear and you can see that the service class can be notified as well as the relationships between the parts.

I use the new chaining features to inject the dependencies and relationships here which surfaces these important interactions between the various classes back up to the main entry class for users to see easily.

All this information is immediately available in the obvious place without looking at any additional files or by being bogged down with implementation details.

The final Class["ntp"] in the chain requires some extra explanation – it ensures that all the NTP member classes are applied before this main NTP class, so that when someone says require => Class["ntp"] elsewhere they can be sure the associated tasks are completed. This is a lightweight version of the Anchor Pattern.

Using the module

Let’s look at how you might use this module from knowing nothing.

Ideally simply including the main entry point on a node should be enough:

include ntp

This does what you’d generally expect – installs, configures and starts the NTP service.

After looking at the init.pp you can now supply some new values for some of the parameters to tune it for your needs:

class{"ntp": ntpservers => ["ntp1.example.com", "ntp2.example.com"]}

Or you can use the new data bindings in Puppet 3 and supply new data in Hiera to override these variables by supplying data for the keys like ntp::ntpservers.
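With the YAML Hiera backend that could be as simple as the following sketch – the server names are just examples:

---
ntp::ntpservers:
  - ntp1.example.com
  - ntp2.example.com
ntp::start: true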

Finally if for some or other related reason you need to restart the service you know from looking at the ntp class that you can notify the ntp::service class to achieve that.
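For example, a resource elsewhere could trigger a restart by notifying the class – a sketch, the file path and content here are just illustrative:

file{'/etc/sysconfig/ntpd':
   content => "OPTIONS=\"-g\"\n",
   notify  => Class['ntp::service'],
}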

Using classes for relationships

There’s a huge thing to note here in the main ntp class. I specify all relationships and notifies on the classes and not the resources themselves.

As personal style I only mention resources by name inside a class that contains that resource – if I ever have to access a resource outside of the class that it is contained in I access the class.

I would not write:

class ntp::service {
   service{"ntp": require => File["/etc/ntp.conf"]}
}

There are many issues with this approach that mostly come down to maintenance headaches. Here I require the ntp config file, but what if a service has more than one file? Do you then list all the files? Do you later edit every class that references these when another file gets managed?

These issues quickly multiply in a large code base. By always acting on class names and by creating many small single purpose classes as here I effectively contain these by grouping names and not individual resource names. This way any future refactoring of individual classes would not have an impact on other classes.

So the above snippet would rather be something like this:

class ntp::service {
   service{"ntp": require => Class["ntp::config"]}
}

Here I require the containing class and not the resource. This has the effect of requiring all resources inside that class, which isolates changes to that class and avoids a situation where users have to worry about the internal implementation details of the other class. Along the same lines you can also notify a class – and all resources inside that class get notified.

I only include other classes at the top ntp level and never have include statements in my classes like ntp::config and so forth – this means when I require the class ntp::config or notify ntp::service I get just what I want and no more.

If you create big complex classes you run the risk of having refreshonly execs that relate to configuration or installation associated with services in the same class, which would have disastrous consequences if you notify the wrong thing or if a user does not study your code before using it.

A consistent style of small, descriptively named, single purpose classes avoids these and other problems.

What we learned and further links

There is a lot to learn here and much of it is about soft issues like the value of consistency and clarity of design and thinking about your users – and your future self.

On the technical side you should learn about the effects of relationships and notifications based on containing classes rather than naming resources directly.

And we came across a number of recently added Puppet features:

Parameterized Classes are used to provide multiple convenient methods for supplying data to your module – defaults in the module, specifically in code, using Hiera and (not shown here) an ENC.

Chaining Arrows are used in the main class to inject the dependencies and notifications in a way that is visible without having to study each individual class.

These are important new additions to Puppet. Some new features like Parameterised classes are not quite ready for prime time imho but in Puppet 3 when combined with the data bindings a lot of the pain points have been removed.

Finally there are a number of useful things I did not mention here. Specifically you should study the Puppet Style Guide and use the Puppet Lint tool to validate that your modules comply. You should consider writing tests for your modules using rspec-puppet and finally share them on the Puppet Forge.

And perhaps most importantly – do not reinvent the wheel, check the Forge first.

Using MongoDB as Publish Subscribe middleware

Yesterday I mentioned on Twitter that I was playing with the MongoDB pub/sub features and that it worked quite well for my needs.

What I didn’t mention was that the documentation and blog posts were a bit all over the show and the Ruby examples I saw didn’t actually do what they said they did, so in this post I’ll show working code and some basic approaches I took to deal with per-consumer destinations etc.

Why?


So why would anyone want to use MongoDB as a queue or indeed MongoDB at all since everyone knows it’s unusable and does not save any data ever and kills kittens?

Actually MongoDB is a really nice database but like most NoSQL databases the thing to know about it is what shortcuts it takes with your data to do its magic. Knowing this you have to evaluate its suitability to your specific problem and if it’s not suitable, don’t use it.

It’s fast and has a flexible query system to search over arbitrarily structured JSON data. Yes it has some interesting ideas about data durability but this is well known by now and if your needs match its features it’s not bad.

For shops with needs well suited to MongoDB who might want to add some queueing ability it can be daunting to bring in new tech like RabbitMQ or ActiveMQ because it brings new unknowns and requires an investment in more monitoring, training and learning by making mistakes. If you already have a Mongo instance and know its quirks, using it for a queue might not be such a terrible thing.

Additionally MongoDB is really easy to get going and generally I find for my work loads it just works with little maintenance required.

So my interest in its queueing abilities lies in providing a simpler ‘getting started’ for MCollective. New MCollective has pluggable discovery which works really well when discovering against a MongoDB cache of registration data so it would be nice if a simple starter edition setup could include both the queue and discovery data in one simple bit of software.

There are other options of course like Redis and I’ll evaluate them but of the various options MongoDB is the only one that comes with both pubsub and searching/querying capabilities that does what I need, isn’t written in Java and has OS packages for most distros easily available.

Background


In MongoDB when you do a find on a collection the returned result set is a Cursor. Cursors can have a number of modes or flags associated with them. Further it has something called Capped Collections that are fixed size and rotate old data out when they fill up.

The combination of some of these Cursor flags and Capped Collections enables a kind of tail -f behavior that works like a queue.

When you do a find on a collection the cursor usually returns nil once you reach the end of your results, as can be seen here:

>> coll = db.collection('commands')
=> #<Mongo::DB:0x7fa1ae005f58 ....>
>> cursor = coll.find()
=> #<Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.skip(cursor.count)
=> #<Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.next_document
=> nil

Here we opened a collection and did a find. We moved to the end of the results and fetched the next result which immediately returned a nil, indicating there’s nothing new.

Let’s see how we can change the behaviour of this cursor so that instead of returning immediately it will block for a while waiting for a new document and then return a nil after a timeout if nothing new was found:

>> cursor = coll.find()
=> #<Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.skip(cursor.count)
=> #<Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
=> 2
>> cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)
=> 34
>> loop { puts "#{Time.now}> Tailing...."; p cursor.next_document }
Fri Aug 31 13:40:19 +0100 2012> Tailing....
nil
Fri Aug 31 13:40:21 +0100 2012> Tailing....
nil
Fri Aug 31 13:40:23 +0100 2012> Tailing....
nil

Now instead of immediately returning a nil it will wait 2 to 3 seconds at the end of the collection in case new data comes.

So this is your consumer of the queue called commands here; anyone who saves data into the collection is a producer. It’s quite light on resources on both the client and the MongoDB server – on a fairly low spec VM I was easily able to run 50+ consumers, a MongoDB instance and some producers.

MongoDB calls this feature Tailable Cursors and the thing the Ruby docs don’t tell you – and that the Ruby library does not do for you – is set the Mongo::Cursor::OP_QUERY_AWAIT_DATA option as above. Without this option it will still return nil immediately and the example code has a sleep to combat an infinite high resource usage loop. That sleeping workaround makes it pointless as a high performance queue, but the Mongo::Cursor::OP_QUERY_AWAIT_DATA option sorts that out.

A simple message structure


In my use case I have to be able to send messages to all consumers or sometimes just to a specific consumer. In other middleware you do this with different queue names or perhaps headers and then do selective subscribes to the queue picking off just the messages you are interested in.

I chose to use a single capped collection and use a structure similar to middleware headers to identify message targets:

{"headers" : {"target" : "all"},
 "payload" : "data"}

{"headers" : {"target" : "some.consumer"},
 "payload" : "data"}

The 2 examples show different target headers: in the first I am targeting everyone consuming the queue and in the second just a specific consumer. The payload can be anything – text, hashes, whatever your needs are.
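The producer side is just a normal insert into the capped collection – a sketch using the same old Ruby driver API, database, collection and consumer identity as the consumer below; it assumes the capped collection already exists:

require 'rubygems'
require 'mongo'

coll = Mongo::Connection.new.db("queue").collection("commands")

# target one specific consumer
coll.insert({"headers" => {"target" => "example.com"}, "payload" => "run backups"})

# target every consumer
coll.insert({"headers" => {"target" => "all"}, "payload" => "ping"})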

Let’s look at a consumer that has a consumer name and that’s interested in messages directed at it or at all consumers:

@consumer_identity = "example.com"
@database = "queue"
@collection = "commands"
 
def get_collection
  @db ||= Mongo::Connection.new().db(@database)
 
  until @db.connection.active?
     puts ">>> Retrying database connection..."
     @db.connection.close
     @db.connection.connect
 
     sleep 0.5 unless @db.connection.active?
  end
 
  unless @db.collections.include?(@collection)
    coll = @db.create_collection(@collection, :capped => true, :size => 10240)
  else
    coll = @db.collection(@collection)
  end
 
  coll
end
 
loop do
  begin
    cursor = get_collection.find({"headers.target" => {"$in" => [@consumer_identity, "all"]}})
 
    # ignore old stuff
    cursor.skip(cursor.count)
 
    # blocking tail reads
    cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
    cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)
 
    begin
      # fetch all the docs forever
      loop do
        if doc = cursor.next_document
          p doc["payload"]
        end
      end
    rescue Mongo::OperationFailure => e
      puts ">>> Cursor closed: %s (%s)" % [e.to_s, e.class]
    end
  rescue Mongo::ConnectionFailure
    puts ">>> DB connection failed"
  end
end

The find call sets up a Cursor for all messages matching "all" and our identity. You can now simply publish data with the correct headers to target specific consumers or all consumers. The 2 loops will forever attempt to reconnect to any failed database and forever read whatever new messages arrive after connection.

Using this method it’s really easy to come up with all kinds of addressing modes for your queue. For example you can give work being done a job name and combine it with the target header to create sets of named consumers that will all receive commands that match just the work they’re able to do.

Results


As I initially said I did all this to test a MCollective connector that uses MongoDB as a middleware. It worked surprisingly well and I have both broadcast and directed modes working:

$ mco ping
.
.
---- ping statistics ----
15 replies max: 57.94 min: 48.56 avg: 54.72

I’ll try out some other options for a small site or starter edition middleware and post follow up blog posts.

I’ll say I’ve been very surprised by how well this worked though. The connector is a bit complex and awkward because of how thread safety is handled in the MongoDB Ruby drivers but it’s not been a big problem overall to solve a pretty complex use case with this.

Specifically I noted performance didn’t degrade hugely with 50 nodes connected or with larger payloads which is very nice.

MCollective Async Result Handling

This is a post in a series of posts I am doing about MCollective 2.0 and later.

Overview


The kind of application I tend to show with MCollective is very request-response orientated. You request some info from nodes and it shows you the data as they reply. This is not the typical thing people tend to do with middleware; instead what they do is create receivers for event streams, processing those into databases or using the middleware as a job queue.

The MCollective libraries can be used to build similar applications and today I’ll show a basic use case for this. It’s generally really easy creating a consumer for a job queue using middleware as covered in my recent series of blog posts. It’s much harder when you want to support multiple middleware brokers, pluggable payload encryption and different serializers; add some Authentication, Authorization and Auditing into the mix and soon it becomes a huge undertaking.

MCollective already has a rich set of plugins for all of this so it would be great if you could reuse these to save yourself some time.

Request, but reply elsewhere


One of the features we added in 2.0.0 is bringing more awareness of the classical reply-to behaviour common to middleware brokers into the core MCollective libraries. Now every request specifies a reply-to target and the nodes will send their replies there; this is how we get replies back from nodes, and if the broker supports it this is typically done using temporary private queues.

But it’s not restricted to this, let’s see how you can use this feature from the command line. First we’ll set up a listener on a specific queue using my stomp-irb application.

% stomp-irb -s stomp -p 6163
Interactive Ruby shell for STOMP
 
info> Attempting to connect to stomp://rip@stomp:6163
info> Connected to stomp://rip@stomp:6163
 
Type 'help' for usage instructions
 
>> subscribe :queue, "mcollective.nagios_passive_results"
Current Subscriptions:
        /queue/mcollective.nagios_passive_results
 
=> nil
>>

We’re now receiving all messages on /queue/mcollective.nagios_passive_results, let’s see how we get all our machines to send some data there:

% mco rpc nrpe runcommand command=check_load --reply-to=/queue/mcollective.nagios_passive_results
Request sent with id: 61dcd7c8c4a354198289606fb55d5480 replies to /queue/mcollective.nagios_passive_results

Note this client recognised that you’re never going to get replies so it just publishes the request(s) and shows you the outcome. It’s real quick and doesn’t wait or care for the results.

And over in our stomp-irb we should see many messages like this one:

<<stomp>> BAh7CzoJYm9keSIB1QQIewg6CWRhdGF7CToNZXhpdGNvZGVpADoMY29tbWFu
ZCIPY2hlY2tfbG9hZDoLb3V0cHV0IihPSyAtIGxvYWQgYXZlcmFnZTogMC44
MiwgMC43NSwgMC43MToNcGVyZmRhdGEiV2xvYWQxPTAuODIwOzEuNTAwOzIu
MDAwOzA7IGxvYWQ1PTAuNzUwOzEuNTAwOzIuMDAwOzA7IGxvYWQxNT0wLjcx
MDsxLjUwMDsyLjAwMDswOyA6D3N0YXR1c2NvZGVpADoOc3RhdHVzbXNnIgdP
SzoOcmVxdWVzdGlkIiU2MWRjZDdjOGM0YTM1NDE5ODI4OTYwNmZiNTVkNTQ4
MDoMbXNndGltZWwrBwjRMFA6DXNlbmRlcmlkIgl0d3AxOgloYXNoIgGvbVdV
V0RXaTd6a04xRWYrM0RRUWQzUldsYjJINTltMUdWYkRBdWhVamJFaEhrOGJl
Ykd1Q1daMnRaZ3VBCmx3MW5DeXhtT2xWK3RpbzlCNFBMbnhoTStvV3Z6OEo4
SVNiYTA4a2lzK3BVTVZ0cGxiL0ZPRVlMVWFPRQp5K2QvRGY3N2I2TTdGaGtJ
RUxtR2hONHdnZTMxdU4rL3hlVHpRenE0M0lJNE5CVkpRTTg9CjoQc2VuZGVy
YWdlbnQiCW5ycGU=

What you’re looking at is a base64 encoded serialized MCollective reply message. This reply message is in this case signed using an SSL key for authenticity and has the whole MCollective reply in it.

MCollective to Nagios Passive Check bridge


So as you might have guessed from the use of the NRPE plugin and the queue name I chose the next step is to connect the MCollective NRPE results to Nagios using its passive check interface:

require 'mcollective'
require 'pp'
 
# where the nagios command socket is
NAGIOSCMD = "/var/log/nagios/rw/nagios.cmd"
 
# to mcollective this is a client, load the client config and
# inform the security system we are a client
MCollective::Applications.load_config
MCollective::PluginManager["security_plugin"].initiated_by = :client
 
# connect to the middleware and subscribe
connector = MCollective::PluginManager["connector_plugin"]
connector.connect
connector.connection.subscribe("/queue/mcollective.nagios_passive_results")
 
# consume all the things...
loop do
  # get a mcollective Message object and configure it as a reply
  work = connector.receive
  work.type = :reply
 
  # decode it, this will go via the MCollective security system
  # and validate SSL etcetc
  work.decode!
 
  # Now we have the NRPE result, just save it to nagios
  result = work.payload
  data = result[:body][:data]
 
  unless data[:perfdata] == ""
    output = "%s|%s" % [data[:output], data[:perfdata]]
  else
    output = data[:output]
  end
 
  passive_check = "[%d] PROCESS_SERVICE_CHECK_RESULT;%s;%s;%d;%s" % [result[:msgtime], result[:senderid], data[:command].gsub("check_", ""), data[:exitcode], output]
 
  begin
    File.open(NAGIOSCMD, "w") {|nagios| nagios.puts passive_check }
  rescue => e
    puts "Could not write to #{NAGIOSCMD}: %s: %s" % [e.class, e.to_s]
  end
end

This code connects to the middleware using the MCollective Connector Plugin, subscribes to the specified queue and consumes the messages.

You’ll note there is very little being done here that’s actually middleware related – we’re just using the MCollective libraries. The beauty of this code is that if we later wish to employ a different middleware, a different security system or configure our middleware connections to use TLS to ActiveMQ, nothing has to change here. All the hard stuff is done in the MCollective config and libraries.

In this specific case I am using the SSL plugin for MCollective so the message is signed and no-one can edit the results in a MITM attack on the monitoring system. This came for free – I didn’t have to write any code here to get this ability, just use MCollective.

Scheduling Nagios Checks and scaling them with MCollective


Now that we have a way to receive check results from the network let’s look at how we can initiate checks. I’ll use the very awesome Rufus Scheduler gem for this.

I want to create something simple that reads a simple config file of checks and repeatedly request my nodes – possibly matching mcollective filters – to do NRPE checks. Here’s a sample checks file:

nrpe "check_load", "1m", "monitored_by=monitor1"
nrpe "check_swap", "1m", "monitored_by=monitor1"
nrpe "check_disks", "1m", "monitored_by=monitor1"
nrpe "check_bacula_main", "6h", "bacula::node monitored_by=monitor1"

This will check load, swap and disks on all machines monitored by this monitoring box and do a bacula backup check on machines that have the bacula::node class included via Puppet.

Here’s a simple bit of code that takes the above file and schedules the checks:

require 'rubygems'
require 'mcollective'
require 'rufus/scheduler'
 
# (ab)use mcollective logger...
Log = MCollective::Log
 
class Scheduler
  include MCollective::RPC
 
  def initialize(destination, checks)
    @destination = destination
    @jobs = []
 
    @scheduler = Rufus::Scheduler.start_new
    @nrpe = rpcclient("nrpe")
 
    # this is where the magic happens, send all the results to the receiver...
    @nrpe.reply_to = destination
 
    instance_eval(File.read(checks))
  end
 
  # helper to schedule checks, this will create rufus jobs that does NRPE requests
  def nrpe(command, interval, filter=nil)
    options = {:first_in => "%ss" % rand(Rufus.parse_time_string(interval)),
               :blocking => true}
 
    Log.info("Adding a job for %s every %s matching '%s', first in %s" % [command, interval, filter, options[:first_in]])
 
    @jobs << @scheduler.every(interval.to_s, options) do
      Log.info("Publishing request for %s with filter '%s'" % [command, filter])
 
      @nrpe.reset_filter
      @nrpe.filter = parse_filter(filter)
      @nrpe.runcommand(:command => command.to_s)
    end
  end
 
  def parse_filter(filter)
    new_filter = MCollective::Util.empty_filter
 
    return new_filter unless filter
 
    filter.split(" ").each do |filter|
      begin
        fact_parsed = MCollective::Util.parse_fact_string(filter)
        new_filter["fact"] << fact_parsed
      rescue
        new_filter["cf_class"] << filter
      end
    end
 
    new_filter
  end
 
  def join
    @scheduler.join
  end
end
 
s = Scheduler.new("/queue/mcollective.nagios_passive_results", "checks.txt")
s.join

When I run it I get:

% ruby schedule.rb
info 2012/08/19 13:06:46: activemq.rb:96:in `on_connecting' TCP Connection attempt 0 to stomp://nagios@stomp:6163
info 2012/08/19 13:06:46: activemq.rb:101:in `on_connected' Conncted to stomp://nagios@stomp:6163
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_load every 1m matching 'monitored_by=monitor1', first in 36s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_swap every 1m matching 'monitored_by=monitor1', first in 44s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_disks every 1m matching 'monitored_by=monitor1', first in 43s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_bacula_main every 6h matching 'bacula::node monitored_by=monitor1', first in 496s
info 2012/08/19 13:07:22: schedule.rb:28:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'
info 2012/08/19 13:07:29: schedule.rb:28:in `nrpe' Publishing request for check_disks with filter 'monitored_by=monitor1'
info 2012/08/19 13:07:30: schedule.rb:28:in `nrpe' Publishing request for check_swap with filter 'monitored_by=monitor1'
info 2012/08/19 13:08:22: schedule.rb:28:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'

All the checks are loaded, they are splayed a bit so they don’t cause a thundering herd and you can see the schedule is honoured. In my nagios logs I can see the passive results being submitted by the receiver.

MCollective NRPE Scaler


So taking these ideas I’ve knocked up a project that does this with some better code than above, it’s still in progress and I’ll blog later about it. For now you can check out the code on GitHub – it includes all of the above but integrated better and should serve as a more complete example than I can realistically post in a blog post.

There are many advantages to this method that come specifically from combining MCollective and Nagios. The Nagios scheduler visits hosts one by one, meaning you get a moving view of status over a 5 minute resolution. Using MCollective to request the check on all your hosts means you get a 1 second resolution – all the load averages Nagios sees are from the same narrow time period. Receiving results on a queue has scaling benefits and the MCollective libraries are already multi-broker aware and support failover to standby brokers, which means this isn’t a single point of failure.

Conclusion


So we’ve seen that we can reuse much of the MCollective internals and plugin system to set up a dedicated receiver of MCollective produced data and I’ve shown a simple use case where we’re requesting data from our managed nodes.

Today what I showed kept the request-response model but split the traditional MCollective client into two. One part scheduling requests and another part processing results. These parts could even be on different machines.

We can take this further and simply connect 2 bits of code together and flow arbitrary data between them while securing the communications using the MCollective protocol. A follow up blog post will look at that.

MCollective Batched Requests

This is a post in a series of posts I am doing about MCollective 2.0 and later.

We’ve discussed Direct Addressing Mode before and today I’ll show one of the new features this mode enables.

Overview


MCollective is very fast, which is usually great. Sometimes though, for example when you're restarting webservers, the speed and concurrency can be a problem. Restarting all your webservers at the same time is generally a bad idea.

In the past the general way to work around this was to use a fact like cluster=a to cut your server estate into named groups and then only address them based on that. This worked OK but was clearly not the best possible outcome.

Apart from this the concurrency also meant that once a request is sent you cannot ^C out of it. Any mistake made is final and processing cannot be interrupted.

Since MCollective 2.0 can address nodes directly without broadcasting, it has become much easier to come up with a good solution to these problems. You can now construct RPC requests targeted at 100s of nodes but ask MCollective to communicate with them in smaller batches, with a configurable sleep between batches. You can ^C at any time and only batches that have already received requests will be affected.

Using on the CLI


Using this feature on the CLI is pretty simple, all RPC clients have some new CLI options:

% mco service restart httpd --batch 10 --batch-sleep 2
Discovering hosts using the mongo method .... 26
 
 * [============================================================> ] 26 / 26
 
.
.
.
 
Finished processing 26 / 26 hosts in 6897.66 ms

What you will see when running it on the CLI is that the progress bar will progress in groups of 10, pause 2 seconds and then do the next 10. In this case you could ^C at any time and only the machines in earlier batches and the 10 in the current batch will have restarted; future nodes would not yet be affected in any way.

Under the hood MCollective detects that you want to do batching, forces the system into Direct Addressing Mode and makes batches of requests. The requestid stays the same throughout, auditing works, results work exactly as before and display behaviour does not change apart from progressing in steps.

Using in code


Naturally you can also use this from your own code; here's a simple script that does the same thing as above.

#!/usr/bin/ruby
 
require 'mcollective'
include MCollective::RPC
 
svcs = rpcclient("service")
 
svcs.batch_size = 10
svcs.batch_sleep_time = 2
 
printrpc svcs.restart(:service => "httpd")

The key lines are the batch_size and batch_sleep_time settings, which have the same behaviour as --batch and --batch-sleep.
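Batched requests return results and statistics just like any other RPC call, so you can still check which nodes never replied once the run finishes. A small sketch continuing the script above – noresponsefrom lists any nodes that did not reply:

unless svcs.stats.noresponsefrom.empty?
  STDERR.puts "WARNING: No responses from hosts: %s" % [svcs.stats.noresponsefrom.join(", ")]
end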

MCollective Pluggable Discovery

This is a post in a series of posts I am doing about MCollective 2.0 and later.

In my previous post I detailed how you can extend the scope of the information MCollective has available to it about a node using Data Plugins. Those were node side plugins; today we'll look at ones that run on the client.

Background


Using the network as your source of truth works for a certain style of application, but as I pointed out in an earlier post there are kinds of application where that is not appropriate. If you want to build a deployer that rolls out the next version of your software you probably want to provide it with a list of nodes rather than have it discover against the network; this way you know a deploy failed because a node is down rather than it simply not being discovered.

These plugins give you the freedom of choice to discover against anything that can give back a list of nodes with mcollective identities. Examples are databases, CMDBs, something like Noah or Zookeeper etc.

To get this to work requires Direct Addressing, I’ll recap an example from the linked post:

c = rpcclient("service")
 
c.discover :nodes => File.readlines("hosts.txt").map {|i| i.chomp}
 
printrpc c.restart(:service => "httpd")

In this example MCollective reads hosts.txt, uses that as the source of truth and attempts to communicate only with the hosts discovered against that file. This, as was covered in the previous post, is in stark contrast with MCollective 1.x which had no choice but to use the network as the source of truth.

Building on this we've built a plugin system that abstracts this away into plugins you can use on the CLI, the web and so on – once activated, MCollective usage on the CLI and any existing code can use these plugins without code changes.

Using Discovery Plugins


Using these plugins is the same as you'd always do discovery; in fact as of version 2.1.0, if you use MCollective you're already using a discovery plugin. Let's see:

% mco rpc rpcutil ping
Discovering hosts using the mc method for 2 second(s) .... 26
 
 * [============================================================> ] 26 / 26
.
.
---- rpcutil#ping call stats ----
           Nodes: 26 / 26
     Pass / Fail: 26 / 0
      Start Time: Fri Jul 06 09:47:06 +0100 2012
  Discovery Time: 2002.07ms
      Agent Time: 311.14ms
      Total Time: 2313.21ms

Notice the discovery message says it is using the “mc” method; this is the traditional broadcast mode as before. It is the default mode and will remain the default mode.

Let's look at the generic usage of the hosts.txt above:

% mco rpc rpcutil ping --nodes hosts.txt -v
Discovering hosts using the flatfile method .... 9
 
 * [============================================================> ] 9 / 9
.
.
---- rpcutil#ping call stats ----
           Nodes: 9 / 9
     Pass / Fail: 9 / 0
      Start Time: Fri Jul 06 09:48:15 +0100 2012
  Discovery Time: 0.40ms
      Agent Time: 34.62ms
      Total Time: 35.01ms

Note the change in the discovery message: it is now using the flatfile discovery method and doesn't have a timeout. Take a look at the Discovery Time statistic – the flatfile example took well under a millisecond vs the usual 2 seconds spent discovering.

There’s a longer form of the above command:

% mco rpc rpcutil ping --disc-method flatfile --disc-option hosts.txt
Discovering hosts using the flatfile method .... 9
.
.

So you can pick a discovery method and they can take options. You can figure out what plugins you have available to you using the plugin application:

% mco plugin doc
Please specify a plugin. Available plugins are:
.
.
Discovery Methods:
  flatfile        Flatfile based discovery for node identities
  mc              MCollective Broadcast based discovery
  mongo           MongoDB based discovery for databases built using registration
  puppetdb        PuppetDB based discovery

And more information about a plugin can be seen:

% mco plugin doc mc
MCollective Broadcast based discovery
 
      Author: R.I.Pienaar <rip@devco.net>
     Version: 0.1
     License: ASL 2.0
     Timeout: 2
   Home Page: http://marionette-collective.org/
 
DISCOVERY METHOD CAPABILITIES:
      Filter based on configuration management classes
      Filter based on system facts
      Filter based on mcollective identity
      Filter based on mcollective agents
      Compound filters combining classes and facts

The discovery methods have capabilities that declare what they can do. The flatfile one for example has no idea about classes, facts etc so its capabilities would only be identity filters.

If you decide to always use a different plugin than mc as your discovery source you can set it in client.cfg:

default_discovery_method = mongo

The RPC API can of course also choose a discovery method and supply options; the code below forces the flatfile mode:

c = rpcclient("service")
 
c.discovery_method = "flatfile"
c.discovery_options << "hosts.txt"
 
printrpc c.restart(:service => "httpd")

This has the same effect as mco rpc service restart service=httpd --dm=flatfile --do=hosts.txt

Writing a Plugin


We'll look at the simplest plugin which is the flatfile one; this plugin ships with MCollective but it's a good example.

This plugin will let you issue commands like:

% mco service restart httpd
% mco service restart httpd -I some.host
% mco service restart httpd -I /domain/ -I /otherdomain/

So your basic identity filters with regular expression support or just all hosts.

module MCollective
  class Discovery
    class Flatfile
      def self.discover(filter, timeout, limit=0, client=nil)
        unless client.options[:discovery_options].empty?
          file = client.options[:discovery_options].first
        else
          raise "The flatfile discovery method needs a path to a text file"
        end
 
        raise "Cannot read the file %s specified as discovery source" % file unless File.readable?(file)
 
        discovered = []
        hosts = File.readlines(file).map{|l| l.chomp}
 
        unless filter["identity"].empty?
          filter["identity"].each do |identity|
            identity = Regexp.new(identity.gsub("\/", "")) if identity.match("^/")
 
            if identity.is_a?(Regexp)
              discovered = hosts.grep(identity)
            elsif hosts.include?(identity)
              discovered << identity
            end
          end
        else
          discovered = hosts
        end
 
        discovered
      end
    end
  end
end

Past the basic boilerplate in lines 5 to 11 we deal with the discovery options. You'll notice the discovery options are an array, so users can pass --disc-option many times and each call just gets appended to this array. We'll just take one flat file and raise if you didn't pass a file or if the file can't be read.

Lines 13 and 14 set up an empty array that the selected nodes will go into and read all the hosts found in the file.

Lines 16 and 17 check if we got anything in the identity filter; if not, we set the discovered list to all hosts in the file in line 27. The filters are arrays, so if multiple -I options were passed you will have multiple entries here and line 17 loops over all of them. You do not need to worry about someone accidentally setting a Class filter as MCollective will know from the DDL that your plugin is incapable of doing class filters and will just not call it with those.

The body of the loop in lines 18 to 25 just does regular expression matching or exact matching over the list and if anything is found it gets added to the discovered list.

In the end we just return the list of discovered nodes, you do not need to worry about duplicates in the list or sorting it or anything.
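To show how little of this is specific to flat files, here's a minimal sketch of a hypothetical JSON backed discovery plugin following the same interface – the Jsonfile class name, the inventory.json default and the {"nodes": [...]} file format are all made up for this example and do not ship with MCollective:

module MCollective
  class Discovery
    # hypothetical plugin reading identities from a JSON file like {"nodes": ["web1", "web2"]}
    class Jsonfile
      def self.discover(filter, timeout, limit=0, client=nil)
        require 'json'

        file = client.options[:discovery_options].first || "inventory.json"
        hosts = JSON.parse(File.read(file))["nodes"]

        # honour identity filters the same way the flatfile plugin does
        return hosts if filter["identity"].empty?

        filter["identity"].inject([]) do |discovered, identity|
          identity = Regexp.new(identity.gsub("/", "")) if identity.start_with?("/")

          if identity.is_a?(Regexp)
            discovered | hosts.grep(identity)
          elsif hosts.include?(identity)
            discovered << identity
          else
            discovered
          end
        end
      end
    end
  end
end

Like the flatfile plugin it would also need a DDL, which is what we'll look at next.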

As automatic documentation generation and input validation are done for you, you need to create a DDL file that describes the plugin and the data it can accept and return. Here's the DDL for this plugin:

metadata    :name        => "flatfile",
            :description => "Flatfile based discovery for node identities",
            :author      => "R.I.Pienaar <rip@devco.net>",
            :license     => "ASL 2.0",
            :version     => "0.1",
            :url         => "http://marionette-collective.org/",
            :timeout     => 0
 
discovery do
    capabilities :identity
end

The metadata block is familiar – set timeout to 0 if there's no timeout and then MCollective will not inform the user about a timeout in the discovery message. Lines 9 to 11 declare the capabilities; possible capabilities are :classes, :facts, :identity, :agents and :compound. Technically :compound isn't usable by your plugins as MCollective will force the mc plugin when you use any -S filters, since those might contain references to data plugins that have to be evaluated using the nodes as the source of truth.
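A plugin backed by a richer source like a CMDB would declare more capabilities. As a rough sketch – the list below is illustrative only, check the DDL documentation for your version – such a plugin's discovery block might look like:

discovery do
    # a hypothetical plugin that can answer class, fact and identity queries
    capabilities [:classes, :facts, :identity]
end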

Finally store this in a directory laid out like below and you can package it into an RPM or a Deb:

% tree flatfile
flatfile
└── discovery
    ├── flatfile.ddl
    └── flatfile.rb
% cd flatfile
% mco plugin package
Created package mcollective-flatfile-discovery
% ls -l *rpm
-rw-rw-r-- 1 rip rip 2893 Jul  6 10:20 mcollective-flatfile-discovery-0.1-1.noarch.rpm

Install this plugin on all your clients and it will be available to use; if you do not want to use the packages just dump the files in $libdir/discovery/.
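For example, assuming a RedHat style layout where the plugins live under /usr/libexec/mcollective/mcollective – adjust the path to match the libdir setting in your own config, this is just an illustration – that would be something like:

% cp flatfile/discovery/flatfile.rb flatfile/discovery/flatfile.ddl /usr/libexec/mcollective/mcollective/discovery/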

Available Plugins


There are a few plugins available now, you saw the mc and flatfile ones here.

If you use the MongoDB based discovery system there is a fully capable discovery plugin that can work against a local MongoDB instance. This plugin has all the capabilities possible with full regular expression support and full sub collective support. I use this as my default discovery method now.

We’re also working on a PuppetDB one, it is not quite ready to publish as I am waiting for PuppetDB to get wildcard support. And finally there is a community plugin that discovers using Elastic Search.

Conclusion


These plugins conclude the big rework done on MCollective discovery. You can now mix and match any source of truth you like, even ones we as MCollective developers are not aware of, since you can write your own plugins.

Use the network when appropriate, use databases or flat files when appropriate and you can switch freely between modes during the life of a single application.

Using these plugins is fun as they can be extremely fast. The short 1 minute video embedded below (click here if it's not shown) shows the mc, puppetdb and mongodb plugins in action.

Version 2.1.0 made these plugins available, we’re looking to bump the Production branch to support these soon.

MCollective 2.1 – Data Plugins for Discovery

This is a post in a series of posts I am doing about MCollective 2.0 and later.

In my previous post I covered a new syntax for composing discovery queries and right at the end touched on a data plugin system; today I'll cover those plugins in detail and show you how to write and use one.

Usage and Overview


These plugins allow you to query any data available on your nodes. Examples might be stat() information for a file, sysctl settings, Augeas matches – really anything that exists on your managed nodes and that you could potentially interact with from Ruby can be used as discovery data. You can write your own and distribute it, and we ship a few with MCollective.

I’ll jump right in with an example of using these plugins:

$ mco service restart httpd -S "/apache/ and fstat('/etc/rsyslog.conf').md5 = /51b08b8/"

Here we're using the -S discovery statement so we have full boolean matching. We match machines with the apache class applied and then do a regular expression match over the MD5 string of the /etc/rsyslog.conf file; any machines meeting both conditions are discovered and apache is restarted.

The fstat plugin ships ready to use with MCollective 2.1.0 and newer; we can have a look at our available plugins:

$ mco plugin doc
.
.
Data Queries:
  agent           Meta data about installed MColletive Agents
  augeas_match    Augeas match lookups
  fstat           Retrieve file stat data for a given file
  resource        Information about Puppet managed resources
  sysctl          Retrieve values for a given sysctl

And we can get information about one of these plugins, lets look at the agent one:

$ mco plugin doc agent
Agent
=====
 
Meta data about installed MColletive Agents
 
      Author: R.I.Pienaar <rip@devco.net>
     Version: 1.0
     License: ASL 2.0
     Timeout: 1
   Home Page: http://marionette-collective.org/
 
QUERY FUNCTION INPUT:
 
              Description: Valid agent name
                   Prompt: Agent Name
                     Type: string
               Validation: (?-mix:^[\w\_]+$)
                   Length: 20
 
QUERY FUNCTION OUTPUT:
 
           author:
              Description: Agent author
               Display As: Author
 
           description:
              Description: Agent description
               Display As: Description
 
           license:
              Description: Agent license
               Display As: License
 
           timeout:
              Description: Agent timeout
               Display As: Timeout
 
           url:
              Description: Agent url
               Display As: Url
 
           version:
              Description: Agent version
               Display As: Version

This shows what query the plugin expects and what data it returns, so we can use it to discover all machines with version 1.6 of a specific MCollective agent:

$ mco find -S "agent('puppetd').version = 1.6"

And if you’re curious what exactly a plugin would return you can quickly find out using the rpcutil agent:

% mco rpc rpcutil get_data query=puppetd source=agent
 
devco.net                                
         agent: puppetd
        author: R.I.Pienaar
   description: Run puppet agent, get its status, and enable/disable it
       license: Apache License 2.0
       timeout: 20
           url: https://github.com/puppetlabs/mcollective-plugins
       version: 1.6

Writing your own plugin


Let's look at writing a plugin. We're going to write one that can query a Linux sysctl value and let you discover against that. We'll want this plugin to activate only on machines where /sbin/sysctl exists.

When we’re done we want to be able to do discovery like:

% mco service restart iptables -S "sysctl('net.ipv4.conf.all.forwarding').value=1"

To restart iptables on all machines with that specific sysctl enabled. Additionally we’d be able to use this plugin in any of our agents:

action "query" do
   reply[:value] = Data.sysctl(request[:sysctl_name]).value
end

So these plugins really are nicely contained reusable bits of data retrieval logic shareable between discovery, agents and clients.

This is the code for our plugin:

module MCollective; module Data
  class Sysctl_data<Base
    activate_when { File.exist?("/sbin/sysctl") }
 
    query do |sysctl|
      out = %x{/sbin/sysctl #{sysctl}}
 
      if $?.exitstatus == 0
        value = out.chomp.split(/\s*=\s*/)[1]
 
        if value
          value = Integer(value) if value =~ /^\d+$/
          value = Float(value) if value =~ /^\d+\.\d+$/
 
          result[:value] = value
        end
      end
    end
  end
end;end

These plugins have to be called Something_data and they go in the libdir as data/something_data.rb.

On line 3 we use the activate_when helper to ensure we don't enable this plugin on machines without sysctl. This is the same confinement system you might have seen in Agents.

In lines 5 to 18 we run the sysctl command and do some quick and dirty parsing of the result, ensuring we return Integers and Floats so that numeric comparison works fine on the CLI.

You'd think we need to do some input validation here to avoid bogus data or shell injection but below you will see that the DDL defines validation and MCollective will validate the input for you prior to invoking your code. This validation happens on both the server and the client. DDL files also help us generate the documentation you saw above, native OS packages and in some cases command line completion and web UI generation.

The DDL for this plugin would be:

metadata    :name        => "Sysctl values",
            :description => "Retrieve values for a given sysctl",
            :author      => "R.I.Pienaar <rip@devco.net>",
            :license     => "ASL 2.0",
            :version     => "1.0",
            :url         => "http://marionette-collective.org/",
            :timeout     => 1
 
dataquery :description => "Sysctl values" do
    input :query,
          :prompt      => "Variable Name",
          :description => "Valid Variable Name",
          :type        => :string,
          :validation  => /\A[\w\-\.]+\z/,
          :maxlength   => 120
 
    output :value,
           :description => "Kernel Parameter Value",
           :display_as  => "Value"
end

This stuff is pretty normal – anyone who has written MCollective agents would have seen these before, and the input, output and metadata formats are identical. The timeout is quite important: if your plugin is doing something like talking to Augeas, set the timeout to a longer period. When doing discovery the client will wait an appropriate period of time based on these timeouts.

With the DDL deployed to both the server and the client you can be sure people won't be sending you nasty shell injection attacks, and if someone accidentally tries to access a non-existent return value they'd get an error before sending traffic over the network.

You're now ready to package up this plugin; we support creating RPMs and Debs of MCollective plugins:

% ls data
sysctl_data.ddl  sysctl_data.rb
% mco plugin package
Created package mcollective-sysctl-values-data
% ls -l
-rw-rw-r-- 1 rip rip 2705 Jun 30 10:05 mcollective-sysctl-values-data-1.0-1.noarch.rpm
% rpm -qip mcollective-sysctl-values-data-1.0-1.noarch.rpm
Name        : mcollective-sysctl-values-data  Relocations: (not relocatable)
Version     : 1.0                               Vendor: Puppet Labs
Release     : 1                             Build Date: Sat 30 Jun 2012 10:05:24 AM BST
Install Date: (not installed)               Build Host: devco.net
Group       : System Tools                  Source RPM: mcollective-sysctl-values-data-1.0-1.src.rpm
Size        : 1234                             License: ASL 2.0
Signature   : (none)
Packager    : R.I.Pienaar <rip@devco.net>
URL         : http://marionette-collective.org/
Summary     : Retrieve values for a given sysctl
Description :
Retrieve values for a given sysctl

Install this RPM on all your machines and you're ready to use your plugin. The version and metadata like author and license in the RPM come from the DDL file.

Conclusion


This is the second of a trio of new discovery features that massively revamped the capabilities of MCollective discovery.

Discovery used to be limited to only CM classes, facts and identities; now the possibilities are endless as far as data residing on the nodes goes. This is only available in the current development series – 2.1.x – but I hope this series will be short and we'll get these features into the production supported code base soon.

In the next post I'll cover discovering against arbitrary client side data - this was arbitrary server side data.

MCollective 2.0 – Complex Discovery Statements

This is a post in a series of posts I am doing about MCollective 2.0 and later.

In the past discovery was reasonably functional – certainly at the time I first demoed it around 2009 it was unique. Now other discovery frameworks exist that do all sorts of interesting things, so we made 3 huge improvements to discovery in MCollective that again set it apart from the rest. These are:

  • Complex discovery language with full boolean support etc
  • Plugins that lets you query any node data as discovery sources
  • Plugins that lets you use any data available to the client as discovery sources

I’ll focus on the first one today. A quick example will be best.

$ mco service restart httpd -S "((customer=acme and environment=staging) or environment=development) and /apache/"

Here we are a hypothetical hosting company and we want to restart all the apache services for development. One of the customers though uses their staging environment as development so it's a bit more complex. This discovery query will find the acme customer's staging environment and development for everyone else, and then select the apache machines out of those.

You can also do excludes and some other bits; these 2 statements are identical:

$ mco find -S "environment=development and !customer=acme"
$ mco find -S "environment=development and not customer=acme"

This basic form of the language can be described with the EBNF below:

compound = ["("] expression [")"] {["("] expression [")"]}
expression = [!|not] statement ["and"|"or"] [!|not] statement
char = A-Z | a-z | < | > | => | =< | _ | - | * | / { A-Z | a-z | < | > | => | =< | _ | - | * | / }
int = 0|1|2|3|4|5|6|7|8|9 {0|1|2|3|4|5|6|7|8|9}

It’s been extended since but more on that below and in a future post.

It's very easy to use this filter in your code; here's a Ruby script that sets the same compound filter and restarts apache:

#!/usr/bin/ruby
 
require "mcollective"
 
include MCollective::RPC
 
c = rpcclient("service")
c.compound_filter '((customer=acme and environment=staging) or environment=development) and /apache/'
 
printrpc c.restart(:service => "httpd")

These filters are combined with other filters so you’re welcome to mix in Identity filters etc using the other filter types and they will be evaluated additively.
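For example, here is a short sketch that mixes the compound filter with an identity filter – both have to match before a node is selected:

#!/usr/bin/ruby

require "mcollective"

include MCollective::RPC

c = rpcclient("service")

# compound and identity filters are evaluated together, both have to match
c.compound_filter '((customer=acme and environment=staging) or environment=development) and /apache/'
c.identity_filter /^web\d+/

printrpc c.restart(:service => "httpd")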

These filters also support querying node data; a simple example of such a query can be seen here:

$ mco service restart httpd -S "fstat('/etc/httpd/conf/httpd.conf').md5 = /51b08b8/"

This will match all machines with a certain MD5 hash for the apache config file and restart them. More on these plugins the next post where I’ll show you how to write your own and use them.

MCollective Direct Addressing Mode

As mentioned in my first post in this series I will be covering new MCollective features that were introduced with version 2.0.0 or later. Today I’ll talk about the biggest new feature called Direct Addressing.

The Past – Broadcast Only Mode


In the past MCollective only had one mode of communication. It would send a broadcast message to all nodes with the target agent in a named group (subcollective) and this message would have a filter attached that nodes validate to determine if they should run the action. Basically if I send a message with a filter “country=uk” all machines will get it and validate this filter; the ones that match will act on the message.

This mode is the first problem I set out to solve – a way to have a broadcast based zero config RPC system that can address many machines in parallel with a new style of addressing. I wanted to get the broadcast model right first and I wanted to get the RPC structures right as well before looking at other possibilities.

There were many 1:1 RPC systems before and it’s not a very hard problem to solve if you have a queue – but it was not the problem I set out to solve as my first target. MCollective 1.2.1 and older did not have a 1:1 mode.

The parallel mode works fine in many scenarios. Specifically, it is the only real way to build a central coordinator that degrades well in split brain scenarios, since addressing is done by discovery and only discovered nodes are expected to reply. It's a new paradigm – one that's better suited to distributed applications: since failure is inevitable you may as well code your applications to always work in that environment.

I think MCollective solved that problem well in the past, but the very nature of that mode of communication means it is not suitable for all use cases. Unsuitable uses include some of the points below, but there are of course others:

  • If you want to deploy to a pre-determined set of hosts you really want to be sure they get your request and get warnings if they don't
  • The broadcast mode is very fast and parallel; sometimes you want slower rolling restarts instead
  • The broadcast mode only operates with a concept of now; you might know a machine is being provisioned and, as soon as it's up, you want it to run the command you sent 10 minutes ago
  • Your discovery needs might not map onto what MCollective supports, like when you present users with a host list they can pick arbitrary hosts from

There are many similar problems that were awkward to fit into the MCollective model in the past, all related to either trying to live outside its idea of addressing or to slowing it down to a pace suitable for rolling changes.

Messaging Rewrite


As of 2.0.0 we have a fairly large rewrite of the messaging subsystem to be more generic and extendable, but it also introduces a new mode of addressing that allows you to provide the host list in any way you want. Rather than doing a broadcast for these requests it will communicate only with the specified nodes.

The history of MCollective is that it used to be an in-house unpublished project that was made pluggable and open sourced. The initial release did an OK job of it, but the messaging had a bunch of badly coupled decisions all over the code base that were a legacy leftover. In 2.0.0 we've rewritten all of this and abstracted all the actual communication with the middleware away from MCollective core. This made it much easier to change how we approach messaging.

Armed with the 2nd mode of communication we were able to apply the very same existing RPC system to a second, more traditional style of addressing, and we're able to mix and match freely between these modes when appropriate. In 2.0.0 this is all kind of under the covers and accessible to the API only, but in the new development series – 2.1.x – a bunch of new user facing features have already been added thanks to this new mode.

Best is to show some code. Here's a traditional discovery based approach to running an action against some machines; the example will just restart apache on some of them:

c = rpcclient("service")
 
c.fact_filter "country=uk"
 
printrpc c.restart(:service => "httpd")

This code sets the fact_filter, which it will resolve via discovery, and then communicates with those hosts. Which machines are affected is basically at the mercy of the network and the current status of those machines.

But what if you had a list of hosts that you know you wanted to target like you would if you’re doing a specific deployment task? You’d have had to do something like:

c.identity_filter /^host1|host2|host3|host4$/

Instead of the fact filter – not ideal! It would still be doing a discovery, and should host4 not be around it will not really tell you it can't talk to host4. All it knows is there's a regex to match.

Now since MCollective 2.0.0 the situation is hugely improved, here’s the client again this time supplying custom discovery data:

c = rpcclient("service")
 
c.discover :nodes => File.readlines("hosts.txt").map {|i| i.chomp}
 
printrpc c.restart(:service => "httpd")
 
unless c.stats.noresponsefrom.empty?
   STDERR.puts "WARNING: No responses from hosts: %s" % [c.stats.noresponsefrom.join(", ")]
end

In this example I am reading a text file called hosts.txt that should have 1 hostname per line and passing that into the discover method. This switches the MCollective client into Direct Addressing mode and it will attempt to communicate with just the hosts you provided in the host list.

Communication is still via the Message Broker even in direct mode but under the covers this is built using queues.

Now if any of those hosts aren’t responding in time you will get an actual useful error message that you could handle in your code in whatever way you wish.

Also note that we were not compelled to construct a filter that would match every host like in the past; just giving the list of identities was enough.

This is a lot more suitable for the purpose of building deployment tools or web applications where you might have arbitrary hosts. This also demonstrates that you are effectively doing discovery against a text file and can easily be adapted to communicate with a database or any data you might have on the client side.
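As a rough sketch of that idea, here's what discovering against a small SQLite inventory could look like – the inventory.db file, the nodes table and its columns are invented for the example:

#!/usr/bin/ruby

require 'mcollective'
require 'sqlite3'

include MCollective::RPC

# made up schema: a nodes table with identity and datacenter columns
db = SQLite3::Database.new("inventory.db")
nodes = db.execute("SELECT identity FROM nodes WHERE datacenter = ?", ["eu1"]).flatten

c = rpcclient("service")
c.discover :nodes => nodes

printrpc c.restart(:service => "httpd")

unless c.stats.noresponsefrom.empty?
  STDERR.puts "WARNING: No responses from hosts: %s" % [c.stats.noresponsefrom.join(", ")]
end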

Other Possibilities


This mode opens up a whole bunch of possibilities and I’ll run through a few here – and there will be follow up posts covering some of these in more detail:

Command Chaining


You can now chain RPC requests via Unix pipes:

% mco rpc package status package=foo -j | jgrep data.version=1.2.3 | mco rpc puppetd runonce

This fetches the package version of the foo package, filters out only the nodes where the version is 1.2.3 and then does a Puppet run on those nodes. The Puppet run is using the filtered result set from the first command as a source of discovery information so you do not need to supply any filters or anything like that.

Batched Requests


To avoid affecting all discovered nodes at the same time you can now do things in smaller batches, carving up the total discovered nodes into smaller chunks:

% mco rpc service restart service=httpd --batch=2 --batch-sleep=60

This will look exactly the same as before on the surface, progress bar and all, but it will progress in groups of 2 and sleep a minute between each group. It will still use traditional discovery (unless you use -I), the results will look the same, everything will be the same except it will just affect 2 machines at a time.

You can ^C at any time to bail out and only the batches up to that point will be affected.

Traffic Optimization


If you have 1000 nodes and you often communicate with only a small number of them – say 10 – the broadcast mode is not very efficient; the middleware will shunt messages to all 1000 all the time.

Since 2.0.0 the client will switch to Direct Addressing mode if it determines you are communicating with <= 10 hosts, you can configure this threshold to your own needs.

This has the effect that discovery gets done old style unless you supply a list as above and then it will only communicate with the 10 nodes or less directly thus saving a lot of effort for the middleware layer.
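As far as I recall the relevant client.cfg settings are the ones below, but do check the configuration reference for your version as I am quoting the names from memory:

direct_addressing = 1
direct_addressing_threshold = 10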

No discovery for identity only filters


If you are only using the -I option and not supplying regular expressions MCollective will now switch to direct addressing mode and just assume you know what you’re doing.

% mco rpc rpcutil ping -I devco.net
 
 * [============================================================> ] 1 / 1
 
devco.net
   Timestamp: 1340117924
 
Finished processing 1 / 1 hosts in 62.17 ms

Note there's no indication that it's doing any discovery – that's because it completely bypassed that step. You can specify multiple -I arguments to go out to many machines.

The total runtime here will be very close to 70ms vs the old 2 seconds for discovery and 70ms for the main request.
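The same shortcut applies in code; here's a small sketch where only an exact identity is supplied so no discovery round trip is needed:

#!/usr/bin/ruby

require 'mcollective'

include MCollective::RPC

c = rpcclient("rpcutil")

# an exact identity with no regular expression, so direct addressing kicks in
c.identity_filter "devco.net"

printrpc c.ping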

Pluggable Discovery


Now that we can get host lists from anywhere we've made discovery completely pluggable, allowing you to decide which source of truth suits your needs best.

Here are some examples:

Query a flatfile:

% mco rpc service restart service=httpd --nodes=hosts.txt

Discover using data kept in PuppetDB:

% mco rpc service restart service=httpd -W country=uk --dm=puppetdb

There are plugins for flatfiles, PuppetDB, MongoDB databases built using registration and the old style broadcast mode (the default), and more will be written – a community member recently wrote one that queries Elastic Search. Imagine discovering against Zookeeper, Chef Server, Noah or any other data source you can imagine. Direct Addressing makes all of that possible.

This is an MCollective 2.1.x feature only at the moment, so it is still maturing in the development series.

Message TTLs


Directly addressed messages are implemented using queues, which means they can linger on the network when no-one is there to consume them. Using this feature we can send RPC requests to nodes that do not exist yet – the request is time banded and, should the node become active during the TTL, it will act on that request:

% mco rpc service restart service=httpd --ttl 1000 -I some.node

This request will go sit in the queue for some.node and if that machine boots up in the next 1000 seconds it will perform the request. The TTLs are low by default and this does mean your clocks need to be synced. RPC to hosts that do not exist yet opens up quite interesting bootstrapping possibilities.

Entirely new styles of application built on top of MCollective


Now that the MCollective messaging has been abstracted out of core it would be fairly easy to apply MCollective to non request/response style systems. We can use MCollective libraries to just transport arbitrary data between 2 processes. This will be done using the security, serialization and connector plugins meaning that you can write generic code and just reuse my libraries to have pluggable security and network capabilities.

The system now supports sending requests and reading the replies elsewhere. For a web based system this would allow a 100% async model. You could send your request from the web page and have the replies spool into temporary storage like a NoSQL database where you show the result sets using paging and traditional web approaches. This, combined with arbitrary discovery sources, means an entirely new kind of web application can be built using MCollective RPC that's very fast, responsive and feature rich.

Conclusion


That’s a quick intro to the new messaging rewrite covering just a few areas it’s already improved. I’ll follow up with more in depth blog posts about some of the items mentioned above.

Having this mode doesn't deprecate or invalidate the older broadcast mode. I still believe that is the right approach for zero config systems, I still believe its method of degrading is the right way to build a certain kind of application, and MCollective will remain suitable to those kinds of application. It will remain the default mode for a freshly installed MCollective client. The new mode enhances the existing capabilities.

A side effect of all of this rewriting is that the connectivity plugin is now in full control of how this is implemented paving the way for wider middleware support in the future. At the moment the only viable way to use this feature is to use ActiveMQ but we’ll add more plugins in the future.

Given the above statement Direct Addressing isn’t yet enabled by default but expect that to change in the next major release.