Build Messaging Between Ruby/Rails Applications with ActiveMQ

The Publish/Subscribe pattern with message brokers, their core concepts, and asynchronous execution of application parts.

Kirill Shevchenko
7 min read · Jul 4, 2019

Message brokers are rarely covered in the Ruby/Rails world, probably because most of them are written in Java. Most developers are familiar with background job processing, but message brokers offer a more flexible approach to asynchronous execution. For example, one application can create a message and carry on without waiting for a response, while another application processes it.

Some benefits that you get at the architectural level:

Fault tolerance, guaranteed delivery, asynchronous communication (through the Publish/Subscribe pattern), loose coupling, etc.

One of the message brokers I have used is ActiveMQ. It provides most of these features, so I will use it as the example for building communication between applications.

ActiveMQ

Open source multi-protocol Messaging Broker.

Advantages:

Installing

ActiveMQ needs Java to run and to build: Java 7 for older releases, Java 8 or later for the 5.15.x line used in this post.

Homebrew (on macOS)

The easiest way to install:

brew install activemq
activemq start

Unix Binary Installation

Download the latest version from the ActiveMQ website and follow the documentation.

Docker

Unfortunately, ActiveMQ doesn’t have an official Docker image. One image that I have checked and can recommend is https://hub.docker.com/r/rmohr/activemq

docker pull rmohr/activemq
docker run -p 61616:61616 -p 61613:61613 -p 8161:8161 rmohr/activemq

CLI commands

Here are the three most useful commands to begin with:

activemq start — Creates and starts a broker using a configuration file.

activemq stop — Stops a running broker.

activemq restart — Restarts a running broker.

To see all the commands, just run activemq in the terminal.

Start ActiveMQ

activemq start
INFO: Loading '/usr/local/Cellar/activemq/5.15.9/libexec//bin/env'
INFO: Using java '/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/usr/local/Cellar/activemq/5.15.9/libexec//data/activemq.pid' (pid '61388')

Monitoring ActiveMQ

You can monitor ActiveMQ using the Web Console by pointing your browser at http://localhost:8161/admin. Default credentials: login admin, password admin.

Messaging Patterns

Using a message broker is simple and rests on only two concepts: Topics and Queues. Modern brokers also combine these approaches and provide additional features, such as implementations of the Publish/Subscribe pattern and failover. But first, let’s get to know the main concepts.

Queue
Queues are the basic messaging pattern. They provide direct communication between a publisher and a subscriber. The publisher creates messages, and the subscriber reads them one after another. Once a message has been read, it is gone from the queue. If the queue has multiple subscribers, only one of them will get each message.

Topic
A topic implements one-to-many communication. Unlike a queue, every subscriber receives each message sent by the publisher. The main problem is that a message cannot be recovered for a single listener that missed it (for example, if a service was disconnected from the topic when the message was sent).
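To make the difference concrete, here is a toy in-memory model of the two delivery semantics. It is only a sketch: ToyQueue and ToyTopic are hypothetical names, the round-robin choice of consumer is an assumption made for the demo, and nothing here talks to a real broker.

```ruby
# Toy in-memory model of queue vs. topic delivery; not a real broker.
class ToyQueue
  def initialize
    @consumers = []
    @pending = []
    @cursor = 0
  end

  # A new consumer first drains whatever was waiting in the queue.
  def subscribe(&handler)
    @consumers << handler
    publish(@pending.shift) until @pending.empty?
  end

  # Each message is handed to exactly one consumer (round-robin here);
  # with no consumers it waits in the queue.
  def publish(message)
    if @consumers.empty?
      @pending << message
    else
      @consumers[@cursor % @consumers.size].call(message)
      @cursor += 1
    end
  end
end

class ToyTopic
  def initialize
    @subscribers = []
  end

  def subscribe(&handler)
    @subscribers << handler
  end

  # Every current subscriber gets its own copy; nothing is kept for
  # subscribers that connect later.
  def publish(message)
    @subscribers.each { |handler| handler.call(message.dup) }
  end
end
```

Publishing to a ToyQueue before anyone subscribes keeps the message until a consumer appears, while a ToyTopic simply drops it, which is the recovery problem described above.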

Virtual Topics
Virtual topics combine both approaches. The publisher sends messages to a topic, and each subscriber receives a copy of every message on its own dedicated queue.
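As a rough sketch of how this looks over STOMP: with ActiveMQ's default virtual-topic naming convention, the publisher writes to a topic named VirtualTopic.<name>, and each consumer reads from a queue named Consumer.<consumer>.VirtualTopic.<name>. The helper names and the Orders/Billing/Shipping names below are hypothetical, and a broker with customized virtual destinations may use a different pattern.

```ruby
# Destination names under ActiveMQ's default virtual-topic convention.
# The helpers only build strings; they do not contact the broker.
def virtual_topic(name)
  "/topic/VirtualTopic.#{name}"
end

def virtual_topic_queue(consumer, name)
  "/queue/Consumer.#{consumer}.VirtualTopic.#{name}"
end

# Hypothetical usage with a STOMP client that responds to
# publish/subscribe (for example Stomp::Client from the stomp gem):
#
#   client.publish(virtual_topic('Orders'), '{"order_id":1}')
#   client.subscribe(virtual_topic_queue('Billing', 'Orders')) { |msg| ... }
#   client.subscribe(virtual_topic_queue('Shipping', 'Orders')) { |msg| ... }
```

Because each consumer reads from its own queue, messages published while one consumer is offline wait in that consumer's queue once the queue exists, and they do not disappear as they would on a plain topic.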

Protocols

ActiveMQ supports many communication protocols, such as MQTT, OpenWire, REST, RSS and Atom, STOMP, WSIF, WebSocket and XMPP.

Getting Started

The easiest way to start exploring the features is with a familiar protocol: HTTP.

REST

ActiveMQ implements a RESTful API to messaging which allows any web capable device to publish messages using a regular HTTP POST or GET.

Publish to Queue

curl -u admin:admin -d "body=order_id" "http://localhost:8161/api/message/shop?type=queue"

Publish to Topic

curl -u admin:admin -d "body=order_id" "http://localhost:8161/api/message/shop?type=topic"
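The same request can be sent from Ruby with the standard library. This is a minimal sketch that mirrors the curl calls above; the method names build_publish_request and publish_over_rest are hypothetical, and the default URL and admin/admin credentials assume a local broker with its default settings.

```ruby
require 'net/http'
require 'uri'

# Build (but do not send) a POST equivalent to the curl commands above.
def build_publish_request(destination, body, type: 'queue',
                          user: 'admin', password: 'admin',
                          base: 'http://localhost:8161')
  uri = URI("#{base}/api/message/#{destination}?type=#{type}")
  request = Net::HTTP::Post.new(uri)
  request.basic_auth(user, password)
  request.set_form_data('body' => body) # same as curl's -d "body=..."
  [uri, request]
end

# Send the request; requires a running broker.
def publish_over_rest(destination, body, **options)
  uri, request = build_publish_request(destination, body, **options)
  Net::HTTP.start(uri.host, uri.port) { |http| http.request(request) }
end

# Hypothetical usage:
#   publish_over_rest('shop', 'order_id')                 # queue
#   publish_over_rest('shop', 'order_id', type: 'topic')  # topic
```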

Integration with Ruby

The protocol I will use is STOMP (the Simple Text Oriented Messaging Protocol). STOMP provides an interoperable wire format, so a STOMP client can communicate with any STOMP message broker, which makes messaging interoperable across many languages, platforms, and brokers.

There is a great gem, stomp, for sending and receiving messages through any STOMP-compliant message broker. It includes failover logic and SSL support.

gem 'stomp'
bundle install

Initialize Connection

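require 'stomp'

# Connection settings for a local broker with default credentials.
def config_hash
  {
    hosts: [
      {
        login: 'admin',
        passcode: 'admin',
        host: 'localhost',
        port: 61613, # default STOMP port
        ssl: false
      }
    ]
  }
end

client = Stomp::Client.new(config_hash)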
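Since the gem ships with failover logic, the same hash can list several brokers and tune reconnection. The sketch below only builds the hash; failover_config is a hypothetical helper, the host names in the usage comment are placeholders, and the option values are illustrative rather than recommended settings.

```ruby
# Build a stomp-gem connection hash that can fail over between brokers.
# Host names are supplied by the caller; the defaults are assumptions.
def failover_config(hostnames, login: 'admin', passcode: 'admin', port: 61613)
  hosts = hostnames.map do |hostname|
    { login: login, passcode: passcode, host: hostname, port: port, ssl: false }
  end

  {
    hosts: hosts,
    reliable: true,                 # reconnect when the connection drops
    initial_reconnect_delay: 0.01,  # seconds before the first retry
    max_reconnect_delay: 30.0,      # upper bound for the retry delay
    use_exponential_back_off: true, # grow the delay between retries
    back_off_multiplier: 2,
    max_reconnect_attempts: 5,      # stop retrying after this many attempts
    randomize: false                # try hosts in the listed order
  }
end

# Hypothetical usage (requires the stomp gem and reachable brokers):
#   client = Stomp::Client.new(failover_config(%w[broker-a.internal broker-b.internal]))
```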

Queues

The interface to queues over STOMP is pretty simple: initialize the connection with the configuration hash, publish the message, and close the connection.

Send a Message to Queue

require 'json'

client = Stomp::Client.new(config_hash)
data = { order_id: 1, command: :paid }
client.publish('/queue/user-notifications', data.to_json)
client.close

Receive a Message from Queue

client = Stomp::Client.new(config_hash)

Thread.new do
  client.subscribe('/queue/user-notifications') do |msg|
    begin
      payload = JSON.parse(msg.body)
      # message processing...
    rescue StandardError => e
      # Report the failure instead of letting it go unnoticed
      Raven.capture_exception(e)
    end
  end
end
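If a message must not be lost when processing fails, one option is client acknowledgement: subscribe with the ack header set to client and acknowledge only after the work succeeded. The sketch below assumes the stomp gem's Stomp::Client#acknowledge; process_and_ack and subscribe_with_ack are hypothetical helper names, and what the broker does with an unacknowledged message (typically redelivery after the connection closes) depends on its configuration.

```ruby
require 'json'

# Parse the message, hand the payload to the caller's block, and
# acknowledge only if the block finished without raising.
def process_and_ack(client, msg)
  payload = JSON.parse(msg.body)
  yield payload
  client.acknowledge(msg)
  true
rescue StandardError => e
  warn "processing failed, message left unacknowledged: #{e.message}"
  false
end

# Subscribe in client-acknowledge mode; client is expected to behave
# like Stomp::Client (subscribe with headers, acknowledge).
def subscribe_with_ack(client, destination, &worker)
  client.subscribe(destination, { ack: 'client' }) do |msg|
    process_and_ack(client, msg, &worker)
  end
end

# Hypothetical usage:
#   subscribe_with_ack(client, '/queue/user-notifications') do |payload|
#     puts payload['order_id']
#   end
```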

Note: Handle exceptions inside the subscriber so that you can react to failures in time. Here, for example, I use Raven, the Sentry client for Ruby.

Topics

Topics have a similar interface to queues. Some examples are below.

Send a Message to Topic

require 'json'

client = Stomp::Client.new(config_hash)
data = { order_id: 1, command: :paid }
client.publish('/topic/user-notifications', data.to_json)
client.close

Receive a Message from Topic

client = Stomp::Client.new(config_hash)

Thread.new do
  client.subscribe('/topic/user-notifications') do |msg|
    begin
      payload = JSON.parse(msg.body)
      # message processing...
    rescue StandardError => e
      # Report the failure instead of letting it go unnoticed
      Raven.capture_exception(e)
    end
  end
end
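A plain topic subscriber misses everything published while it is offline. ActiveMQ's STOMP extensions offer durable topic subscriptions for this case: the connection carries a client-id header and the subscription carries an activemq.subscriptionName header. The sketch below only builds those headers; the helper names and the notifier-1 and user-notifications-sub values are hypothetical, and the behaviour should be verified against your broker version.

```ruby
# Connection hash that identifies this client to the broker, which a
# durable subscription needs. The client id is chosen by the application.
def durable_connection_config(client_id, host: 'localhost', port: 61613)
  {
    hosts: [
      { login: 'admin', passcode: 'admin', host: host, port: port, ssl: false }
    ],
    connect_headers: { 'client-id' => client_id }
  }
end

# SUBSCRIBE headers that name the durable subscription.
def durable_subscribe_headers(subscription_name)
  { 'activemq.subscriptionName' => subscription_name }
end

# Hypothetical usage (requires the stomp gem and a running broker):
#   client = Stomp::Client.new(durable_connection_config('notifier-1'))
#   client.subscribe('/topic/user-notifications',
#                    durable_subscribe_headers('user-notifications-sub')) do |msg|
#     # message processing...
#   end
```

The same client id and subscription name have to be reused on reconnect so that the broker can match the returning subscriber to its stored messages.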

Integration with Rails

ActiveMessaging is an attempt to bring the simplicity and elegance of Rails development to the world of messaging.

Add the gem to your Gemfile (this branch supports Rails 5+):

gem 'activemessaging', github: 'kookster/activemessaging', branch: 'feat/rails5'

And then execute:

bundle install

Initializing

After adding ActiveMessaging, run the following command to add a base class for defining listeners and a polling server:

rails g active_messaging:install
  create  app/processors/application_processor.rb
  create  script/poller
  chmod  script/poller
  create  script/threaded_poller
  chmod  script/threaded_poller
  create  lib/poller.rb
  create  config/broker.yml
  gemfile  daemons

Generate a listener

rails g active_messaging:processor RailsQueue
  create  app/processors/rails_queue_processor.rb
  create  config/messaging.rb
  invoke  rspec
  create  spec/functional/rails_queue_processor_spec.rb

Processor

Here you specify the destination to listen to with subscribes_to. When a message is published to RailsQueue, on_message is called with the message body as its first argument.

class RailsQueueProcessor < ApplicationProcessor
  subscribes_to :rails_queue

  def on_message(message)
    logger.debug 'RailsQueueProcessor received: ' + message
  end
end

Destination config

In config/messaging.rb (created by the generator), we describe the destination of the queue.

ActiveMessaging::Gateway.define do |s|
  s.destination :rails_queue, '/queue/RailsQueue'
end

Run Application

script/poller run

Now you can publish to RailsQueue, and the Rails instance will receive the messages.
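For a quick check from another Ruby process, a message can be pushed to the same destination with the stomp gem. This is a sketch: publish_to_rails_queue is a hypothetical helper, and it expects a client that behaves like Stomp::Client, such as the one built from config_hash earlier.

```ruby
require 'json'

RAILS_QUEUE = '/queue/RailsQueue' # must match the destination in config/messaging.rb

# Serialize the payload and publish it to the queue that
# RailsQueueProcessor subscribes to.
def publish_to_rails_queue(client, payload)
  client.publish(RAILS_QUEUE, JSON.generate(payload))
end

# Hypothetical usage (requires the stomp gem and a running broker):
#   client = Stomp::Client.new(config_hash)
#   publish_to_rails_queue(client, order_id: 1, command: 'paid')
#   client.close
```

With the poller running, the processor should log the JSON string it received.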

Production

First of all, you should think about deployment and maintenance. For small teams (without a DevOps/SRE/system administrator role), I suggest looking into cloud solutions.

AmazonMQ

Some features:

  • Uses Apache KahaDB as its data store. Other data stores, such as JDBC and LevelDB, aren't supported
  • Offers low latency messaging, often as low as single digit milliseconds
  • Persistence out of the box
  • Backups

There is no need for an extra API: just use STOMP (or any other supported protocol) against the endpoint that Amazon provides.
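In practice this mostly means changing the connection hash. The sketch below assumes the broker exposes a stomp+ssl endpoint on port 61614, as shown in the Amazon MQ console; amazon_mq_config is a hypothetical helper, and the endpoint host and environment variable names in the usage comment are placeholders for your own values.

```ruby
# Connection hash for a managed broker reached over STOMP with SSL.
# Credentials are passed in rather than hard-coded.
def amazon_mq_config(endpoint_host, login:, passcode:, port: 61614)
  {
    hosts: [
      { login: login, passcode: passcode, host: endpoint_host, port: port, ssl: true }
    ],
    reliable: true # reconnect if the managed broker restarts
  }
end

# Hypothetical usage (requires the stomp gem):
#   config = amazon_mq_config(ENV.fetch('MQ_HOST'),
#                             login: ENV.fetch('MQ_USER'),
#                             passcode: ENV.fetch('MQ_PASSWORD'))
#   client = Stomp::Client.new(config)
```

The publishing and subscribing code from the earlier sections stays the same.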

FIFO (First In, First Out)

To preserve the order in which messages are delivered, add total ordering to the broker configuration.

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="&gt;">
        <!--
          The strictOrderDispatchPolicy makes the broker dispatch
          messages to all consumers of a topic in the same order.
          The "&gt;" wildcard applies the policy to every topic.
        -->
        <dispatchPolicy>
          <strictOrderDispatchPolicy/>
        </dispatchPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

Reference

This post is related to my last talk at Ruby Wine #1 about Event-Driven Architecture and Messaging Patterns for Ruby Microservices.

Slides available on Speaker Deck.

Conclusion

For simple projects, queues may not be justified, since adding an architectural layer responsible for queuing messages is not an easy task. Before adopting one, weigh the pros and cons (for example, whether you are ready to spend time on support). Queuing lets you scale the application more flexibly and solve most performance problems that stem from architecture rather than from the language.
