Moving from EventMachine to Async

In The Beginning

A little over 13 years ago, I had been working as a freelance developer for over five years, primarily writing and hosting Ruby based web sites and applications for a variety of commercial interests, from mutual funds to construction to education.

I had designed and built a reliable system architecture for deploying and managing dozens of disparate sites and applications. However, I frequently encountered an annoying logistical problem when I was diagnosing and debugging client issues. For both load handling and for redundancy purposes, many of these sites ran on two or more application server instances. This necessarily meant that there were two or more sets of logs being generated on different instances/machines.

If there were commercial log aggregator offerings at that time, I wasn’t aware of them, and for various reasons I didn’t like the idea of trying to use syslog to fill the gap, so I chose to write my own solution. I did everything with Ruby, and I wasn’t afraid of the challenge of writing a Ruby based system that was both fast enough and reliable enough for this purpose, so I dove in and I hacked something out.

Analogger was born.

The Backstory

Analogger was written using Ruby 1.8.6, and for high performance network IO, I used EventMachine, which provides an event reactor implemented with C++, with an interface to Ruby, that lets one write high speed IO/network handling code in Ruby. As part of the package, I also wrote a logging client, which used pure Ruby to send the messages, so that whatever was using the client (such as a Rails app) did not have to be running an EventMachine reactor to use it.

I didn’t do much with it after that. It just worked. I announced the Ruby 1.8.6 End-Of-Life in 2011, but Analogger didn’t need much attention to adapt to later Rubies. Analogger just worked.

In my usage, at it’s peak, it handled millions of records a day, aggregating those logs into log files on a single dedicated logging instance. I fixed a few bugs here and there. I eventually made it available on Github and as a gem. I added a feature a few years ago so that one can avoid losing logging data if the server goes down. But mostly, I ignored it and just let it do it’s job. There was little that had to be done for it to work with newer Rubies as they were released, though the dependence on EventMachine meant that alternative Ruby implementations, such as JRuby, might not work so well (there has been a Java implementation of EventMachine for a long time, but it’s feature parity with the Ruby version often lagged). However, like so many other people do so often with software that is running in production without any problems, I mostly took the approach of, “If it isn’t broken, don’t fix it.”

Fast forward to the present. I’m running Ruby 2.7.1. Rubygems is a first class citizen of the Ruby ecosystem. EventMachine is still around, and it’s still fast, but Ruby is faster now, too, and there are other ways to do things. There are also other interesting languages around like Crystal, Golang, and Rust, and a compelling argument could be made that low level, performance sensitive software like Analogger might better be written in a language like that.

So, a few weeks ago when I was looking through the Analogger code with that in mind, and it occurred to me that before I embark on a complete rewrite, it would be prudent to just renovate Analogger for the Ruby 2.7.1 world. The biggest part of that is that I wanted to convert it from dependence on EventMachine to the use of the excellent Async library.

The Journey

EventMachine is great. It joined the Ruby ecosystem a long time ago, and it still sees regular use. That alone should be testimony to it’s utility and its performance. However, Ruby itself now has a much richer ecosystem surrounding it, which provides us with more options, and more choices.

Analogger is an asynchronous logger written around an event reactor. There is no reason that reactor can not itself be implemented primarily in Ruby.

Back at the end of 2011, Tony Acieri started writing nio4r. It was modeled after Java’s NIO (Nonblocking IO). Taken from the current github page for the project:

New I/O for Ruby (nio4r): cross-platform asynchronous I/O
primitives for scalable network clients and servers. Modeled
after the Java NIO API, but simplified for ease-of-use.

nio4r provides an abstract, cross-platform stateful I/O
selector API for Ruby. I/O selectors are the heart of
"reactor"-based event loops, and monitor multiple I/O objects
for various types of readiness, e.g. ready for reading or writing.

About three years ago Samuel Williams started working on Async, which is an event reactor implementation in Ruby, leveraging Nio4r. It is intentionally simple, with several companion projects that provide other capabilities such as IO, HTTP handling, process management, websockets, etc.

For the purposes of the renovation of Analogger, async-io seems like the state of the art in Ruby event reactors, so let’s refactor the codebase to use it.

With EventMachine, the core of the usage is this:

EventMachine.run {
  EventMachine.add_shutdown_hook do
    write_queue
    flush_queue
    cleanup
  end
  @server = EventMachine.start_server @config[-"host"], @config[-"port"], protocol
  EventMachine.add_periodic_timer(1) {Analogger.update_now}
  EventMachine.add_periodic_timer(@config[-"interval"]) {write_queue}
  EventMachine.add_periodic_timer(@config[-"syncinterval"]) {flush_queue}
}

In this code, protocol is a class that implements the network protocol, using EventMachine methods.

To convert the code to using Async (specifically, Async-IO), we remove EventMachine from the gemspec and add Async:

spec.add_runtime_dependency 'async-io', '~> 1.29'

Async offers some signal handling tools that EventMachine did not offer. In the original EventMachine based code, the following helper method was implemented:

def safe_trap(siglist, &operation)
  (Signal.list.keys & siglist).each {|sig| trap(sig, &operation)}
end

It’s usage was something like this:

EXIT_SIGNALS = %w[INT TERM]
safe_trap(EXIT_SIGNALS) {handle_pending_and_exit}

Async’s signal handling is aware of the reactor event loop, and is written to interact with it. In order to keep the code within the Async event loop looking clean, helper methods for each of the signal handlers needed to be written. They follow this pattern:

def install_hup_trap(task: nil)
  hup_trap = Async::IO::Trap.new(:HUP)
  hup_trap.install!
  task.async do |_handler_task|
    loop do
      hup_trap.wait
      cleanup_and_reopen
    end
  end
end

This handler, for the HUP signal, instructs Analogger to sync any data that may still be in buffers out to its logfiles, and then to close and reopen all of them. With Async, since everything is being handled from inside of a reactor event loop, there are two interesting things that happen here.

  hup_trap = Async::IO::Trap.new(:HUP)
  hup_trap.install!

This creates a signal handler, and then installs it.

  task.async do |_handler_task|
    loop do
      hup_trap.wait
      cleanup_and_reopen
    end
  end

This creates a task. Async leverages Ruby fibers to encapsulate discrete units of execution, and the Task essentially wraps that Fiber with a state management framework.

This code looks like it would just run in a tight loop, calling cleanup_and_reopen over and over. However, the call to hup_trap.wait changes that. The hup_trap.wait causes that fiber to cease executing and passes control back to the reactor. The signal handler encapsulated in the instance of Async::IO::Trap that was created for the HUP signal, if it is triggered by catching such a signal, will tell the reactor that this fiber should be scheduled again. As soon as the reactor is free to do so, it will resume the fiber’s execution at that point, allowing the loop to continue. When the loop restarts, the fiber will yield it’s execution once again. In this way the signal handler remains active and available to handle any number of HUP signals.

Similar methods were created for handling the USR2 signal, which Analogger responds to by completely restarting itself, and the INT and TERM signals, which both cause Analogger to shut down gracefully, except in those methods, because the respective signals will only be handled a single time, they do not sit within a loop.

def install_int_term_trap(task: nil, server: nil)
  int_trap = Async::IO::Trap.new(:INT)
  term_trap = Async::IO::Trap.new(:TERM)
  [int_trap, term_trap].each do |trap|
    trap.install!
    task.async do |_handler_task|
      trap.wait

      write_queue if any_in_queue?
      flush_queue
      cleanup
      trap.default!
      Async.logger.info(server) do |buffer|
        buffer.puts "Caught #{trap}"
        buffer.puts 'Stopping all tasks...'
        task.print_hierarchy(buffer)
        buffer.puts '', 'Reactor Hierarchy'
        task.reactor.print_hierarchy(buffer)
      end
      task.stop
    end
  end
end

Because this is renovation, the handler also takes advantage of some nice introspection capabilities that exist in Async to output a snapshot of the Reactor internals when it is stopped. This feature doesn’t exist with EventMachine, and it isn’t a functional feature, in that it doesn’t alter the core capabilities of the software in any way, but it is nice because it lets one see what tasks were currently in the system when it was stopped.

Analogger uses several timers, which are blocks of code that get executed either once, or repeatedly, after a given interval, to trigger actions that should occur at periodic intervals (such as ensuring that all logs that may exist in memory buffers are flushed to disk). EventMachine offers some nice syntactic sugar for implementing timers.

Async doesn’t offer any built in sugar for doing this, but timers are simple to implement within a reactor.

async do |subtask|
  subtask.sleep(interval)
  # DO STUFF
end

This simple code here creates a task. That task sleeps, which yields it’s execution back to the reactor. After the specified sleep interval (in seconds) has passed, the reactor will wake the fiber and allow it to continue executing. At that point, it yields execution to the block that it was given, and the rest of the code after the sleep runs. If one wants this to be a periodic timer, running repeatedly, one puts that code inside of a loop do/end block.

This can be abstracted to a general case, for handling both one shot and periodic timers, by monkey patching Async with a little helper method:

class Task
  def add_timer(interval: 0, periodic: true)
    async do |subtask|
      loop do
        subtask.sleep(interval)
        yield
        break unless periodic
      end
    end
  end
end

With these little changes done, the main reactor loop can now be rewritten using Async instead of EventMachine:

endpoint = Async::IO::Endpoint.tcp(@config[-'host'], @config[-'port'])

Async do
  endpoint.bind do |server, task|
    install_int_term_trap(task: task, server: server)
    install_hup_trap(task: task)
    install_usr2_trap(task: task, server: server)

    task.add_timer(interval: 1) { Analogger.update_now }
    task.add_timer(interval: @config[-'interval']) { write_queue }
    task.add_timer(interval: @config[-'syncinterval']) { flush_queue }

    server.listen(128)

    server.accept_each do |peer|
      stream = Async::IO::Stream.new(peer)
      handler = protocol.new(stream: stream, peer: peer)
      handler.receive until stream.closed?
    end
  end
end

It is slightly more verbose than the EventMachine version, but the EventMaching version didn’t handle the signals from inside of the event loop, and this code is still pretty terse and easy to understand when reading it.

At this point, the lion’s share of the core renovations were done. There was just one more really important thing to handle before it would run again.

EventMachine has a very specific protocol cycle that is executed in response to IO activity. An EventMachine protocol is a class that implements methods to handle the various lifecycle touchpoints, such as sending data or receiving data.

So, for the EventMachine based version, the AnaloggerProtocol had a receive_data method that EventMachine would call every time the socket to which the protocol was attached received some data.

Async is a little more hands off than that, which is where this code comes in:

    server.accept_each do |peer|
      stream = Async::IO::Stream.new(peer)
      handler = protocol.new(stream: stream, peer: peer)
      handler.receive until stream.closed?
    end

Specifically, that code runs in a tight loop. It blocks until the reactor receives a connection to the server’s endpoint. When it does, it runs the code in that block.

It attaches an IO stream to the socket, and then creates an instance of the AnaloggerProtocol class, which is identical to the one used with EventMachine, with a single addition.

That addition is the receive method, which is implemented this way:

def receive
  while ( chunk = @stream.read_partial(8192) )
    receive_data chunk
  end
end

The receive method sits in a tight loop, reading data from the input stream, in chunks of up to 8k in size. At this point it hooks into the exact same logic that existed under EventMachine, and it just passes those data chunks to the receive_data method.

One level up, in the block that is calling receive, that executes in another tight until loop, doing nothing but receiving data on the stream until the stream is closed.

The only other changes to the EventMachine based protocol class are to change its base class:

  class AnaloggerProtocol < Async::IO::Protocol::Generic

And to tell the class initializer that it needs to receive both stream objects and peer objects. The peer object is used in some error messaging if Analogger receives bad data, so that it can report information about the client connection that sent the bad data.

And finally, one last small change. With EventMachine, the protocol superclass automatically had access to the data stream from the socket, so one could invoke send_data without doing anything special, and the data would get sent on the socket. With Async, though, we need to provide a simple method to do that:

def send_data data
  @stream.write data
end

And that’s it. If the tests still work, then we are probably in good shape.

6 runs, 51 assertions, 0 failures, 0 errors, 0 skips

The End Result

According to the (admittedly naive) benchmarks that the test suite runs, it is now faster than the EventMachine based version, at least on this system (Ubuntu 20.04 running under WSL 1 on Windows 10):

Async:

Analogger Speedtest -- short messages
Testing 100000 messages of 10 bytes each.

Message rate: 221083.49483482633/second (0.4523178)

Analogger Speedtest -- larger messages
Testing 100000 messages of 100 bytes each.

Message rate: 156525.31335193792/second (0.6388743)

EventMachine:

Analogger Speedtest -- short messages
Testing 100000 messages of 10 bytes each.

Message rate: 171825.14386060167/second (0.581987)

Analogger Speedtest -- larger messages
Testing 100000 messages of 100 bytes each.

Message rate: 105641.61062467258/second (0.9465967)

Final Thoughts

The conversion from EventMachine to Async was relatively painless. EventMachine provides a more structured, opinionated framework for implementing a system that runs within an event reactor than Async does, but with the understanding and adoption of a few simple conventions, the code was readily converted to run with Async.

The end result is a codebase that is a little easier to read. It is also a little more portable, and it seems to perform better on identical hardware. The EventMachine version of Analogger had no problem managing hundreds of concurrent log streams seamlessly, and the version implemented with Async will manage at least as well.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.