Do you need a Push Notification Manager? – Redis PubSub to the rescue

Happy New Year!

And it’s indeed an awesome start to a surely prosperous year: my stunts with Redis PubSub worked! We have been looking at getting a push notification manager in place to communicate between applications. Here is what we wanted to do:

  • Applications should be able to subscribe for notifications.
  • Notification wildcards should be supported!
  • Applications should be able to publish notifications to subscribers.
  • These should be push notifications!
  • Guaranteed delivery of notifications, i.e. applications should get pending messages.
  • Sequential delivery of notifications, even for the backlog.

Well, initially I went searching for various solutions – almost all pointed to EventMachine, Websockets & Cramp. When I came across Redis pubsub, it fit like a glove but it has the following drawbacks:

  • It’s a lightweight, non-persistent PubSub.
  • Missing or unavailable subscribers don’t get notifications.

I came across this mail thread on the Redis Google Group and realized that I would have to implement this myself. First, a little background on Redis pubsub. For those not familiar with Redis, I would strongly recommend getting familiar with it :). Redis is an in-memory NoSQL datastore that persists to disk asynchronously; in short, the best of both worlds: speed and durability. After installing the redis-server, I added ‘redis’ to the Gemfile and I was on my way.

Redis Pubsub supports subscriptions through 2 commands: subscribe and psubscribe. The latter supports wildcards, which is exactly what I wanted. A subscription block is simple:

uri = URI.parse('redis://127.0.0.1:6379')
REDIS = Redis.new(:host => uri.host, :port => uri.port, :password => uri.password)

REDIS.psubscribe('my.first.event', 'basic.events.*', 'event.*' ) do |on|
 on.psubscribe do |event, total|
    Rails.logger.info("Subscribed to ##{event} (#{total} subscriptions)")
 end

 on.pmessage do |pattern, event, message|
    # Message processing here
 end

 on.punsubscribe do |event, total|
   Rails.logger.info("Unsubscribed for ##{event} (#{total} subscriptions)")
 end
end

The block parameter ‘on’ is a Subscription object, which registers the callbacks for ‘psubscribe’, ‘pmessage’ and ‘punsubscribe’. Some caveats:

  • If you use wildcards, remember not to subscribe to overlapping patterns. You will receive the event once per matching pattern. E.g. if you subscribe to ‘event.one’ and ‘event.*’, the callback will be invoked twice for ‘event.one’!
  • psubscribe is a blocking call. Remember NOT to use the REDIS connection for issuing any other commands while it is subscribed, otherwise it will throw an exception.
  • Since it’s a blocking call, it’s recommended to run this in a separate process (like DelayedJob) or at least a Thread, otherwise your Rails server will get blocked!
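To make the first caveat concrete, here is a small sketch that needs no Redis server. It uses Ruby’s File.fnmatch as a rough stand-in for Redis pattern matching (an approximation that agrees for simple `*` patterns; it is not what Redis runs internally), and `matching_patterns` is a hypothetical helper, not part of the redis gem. Each pattern it returns would trigger one pmessage callback.

```ruby
# Rough stand-in for Redis glob matching. Assumption: File.fnmatch behaves
# like the Redis matcher for the simple `*` patterns used in this post.
def matching_patterns(patterns, channel)
  patterns.select { |pattern| File.fnmatch(pattern, channel, File::FNM_DOTMATCH) }
end

patterns = ['my.first.event', 'basic.events.*', 'event.*', 'event.one']

# One pmessage callback fires per matching pattern.
puts matching_patterns(patterns, 'event.one').inspect  # two patterns match
puts matching_patterns(patterns, 'event.two').inspect  # only 'event.*' matches
```

Running a check like this over your pattern list before subscribing is one way to spot overlaps early.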

Now, my problem was that I also needed to persist the events during a publish and process pending messages. To achieve this, I created a custom Redis client that handles a backlog, shown below:

require 'redis'
require 'multi_json'

class PubSubRedis < Redis

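  def initialize(options = {})
    options = options.dup
    # Pull out our own option so it is not passed on to Redis.new.
    # nil.to_i is 0, and 0 means no backlog is needed.
    @timestamp = options.delete(:timestamp).to_i
    super(options)
  end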

  # Add each event to a Sorted Set with the timestamp as the score
  def publish(channel, message)
    timestamp = Time.now.to_i
    zadd(channel, timestamp, MultiJson.encode([channel, message]))
    super(channel, MultiJson.encode(message))
  end

  # returns the backlog of pending messages [ event, payload ] pairs
  # We do a union of sorted sets because we need to support wild-card channels.
  def backlog(*channels, &block)
    return if @timestamp == 0

    # Collect the entire set of events with wild-card support. The uniq matters:
    # overlapping patterns would list a key twice and zunionstore would then
    # sum its scores.
    events = channels.collect {|e| keys(e)}.flatten.uniq

    return if events.empty? # no events to process

    destination = "backlog-#{Time.now.to_i}"
    zunionstore(destination, events)
    # We want only events after the timestamp, so prefix it with "(" to make
    # the lower bound exclusive: an event with exactly this timestamp is not sent.
    # TODO: Several events may be recorded for the same timestamp; any of them
    # not yet processed will be missed because of the "(".
    messages = zrangebyscore(destination, "(#{@timestamp}", "+inf")

    messages.each do |message|
      event, payload = MultiJson.decode(message)
      block.call(event, payload)
    end

    # cleanup
    del(destination)

  end
end
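The TODO in backlog is easy to see with a tiny in-memory stand-in for the sorted set. This is only a sketch: `after_exclusive` is a hypothetical helper that imitates zrangebyscore with an exclusive lower bound, and the millisecond scores are an assumed variation, not what the class above stores.

```ruby
# In-memory stand-in for one sorted set: each entry is a [score, member] pair.
# Imitates ZRANGEBYSCORE key "(min" "+inf": scores strictly greater than min,
# ordered by score and then by member.
def after_exclusive(entries, min)
  entries.select { |score, _| score > min }
         .sort
         .map { |_, member| member }
end

# Second-resolution scores, as produced by Time.now.to_i.
by_second = [[100, 'a'], [101, 'b'], [101, 'c'], [102, 'd']]
# The subscriber handled 'b' during second 101 and then went away: 'c' is lost.
puts after_exclusive(by_second, 101).inspect

# Assumed variation: millisecond scores, e.g. (Time.now.to_f * 1000).to_i.
by_millisecond = [[100_000, 'a'], [101_000, 'b'], [101_250, 'c'], [102_000, 'd']]
puts after_exclusive(by_millisecond, 101_000).inspect
```

Finer-grained scores only narrow the window rather than close it, and the stored last-seen timestamp would have to use the same unit.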

The only thing left to do now is tie up loose ends. It’s necessary to store the timestamp of the events received, and what better store than Redis? I pass the timestamp as an option to the custom Redis client. We process the backlog first, before invoking psubscribe. Here is the snippet:

# config/initializers/redis.rb
LAST_TIMESTAMP='notification_timestamp_42037933' # some random key

uri = URI.parse('redis://127.0.0.1:6379')
# Publishing channel, since the REDIS_SUB will block!
REDIS_PUB = PubSubRedis.new(:host => uri.host, :port => uri.port, :password => uri.password)
REDIS_SUB = PubSubRedis.new(:host => uri.host, :port => uri.port, :password => uri.password,
               :timestamp => REDIS_PUB.get(LAST_TIMESTAMP).to_i )

# File where you are processing the backlog (DelayedJob or a separate thread)
REDIS_SUB.backlog('my.first.event', 'basic.events.*', 'event.*') do |event, payload|
  # backlog notification processing
end

# callback processing loop shown earlier.
REDIS_SUB.psubscribe('my.first.event', 'basic.events.*', 'event.*' ) do |on|
 ...
 on.pmessage do |pattern, event, message|
   REDIS_PUB.set(LAST_TIMESTAMP, Time.now.to_i)

   # callback processing
 end
 ...
end
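To see the whole flow without a Redis server, here is a rough in-memory sketch. `FakeBroker` is a hypothetical class written for this illustration (it is not part of redis-rb), timestamps are passed in by hand, and File.fnmatch again stands in for Redis pattern matching.

```ruby
# Hypothetical in-memory stand-in for the Redis pieces used in this post:
# a log that plays the role of the sorted sets, plus live subscribers.
class FakeBroker
  def initialize
    @log = []          # [timestamp, channel, message] entries
    @subscribers = []  # [pattern, callback] pairs, like psubscribe
  end

  # Store the event first, then push it to whoever is listening right now.
  def publish(channel, message, timestamp)
    @log << [timestamp, channel, message]
    @subscribers.each do |pattern, callback|
      callback.call(channel, message) if File.fnmatch(pattern, channel)
    end
  end

  def psubscribe(pattern, &callback)
    @subscribers << [pattern, callback]
  end

  def unsubscribe_all
    @subscribers.clear
  end

  # Replay stored events newer than +since+, oldest first.
  def backlog(pattern, since)
    @log.select { |ts, channel, _| ts > since && File.fnmatch(pattern, channel) }
        .sort_by(&:first)
        .each { |_, channel, message| yield channel, message }
  end
end

broker = FakeBroker.new
received = []

broker.psubscribe('event.*') { |_channel, msg| received << msg }
broker.publish('event.one', 'first', 1)
last_seen = 1                              # what LAST_TIMESTAMP would hold
broker.unsubscribe_all                     # subscriber goes offline
broker.publish('event.two', 'second', 2)   # plain pub/sub would lose these
broker.publish('event.one', 'third', 3)

# On restart: drain the backlog first, then subscribe again.
broker.backlog('event.*', last_seen) { |_channel, msg| received << msg }
broker.psubscribe('event.*') { |_channel, msg| received << msg }
broker.publish('event.one', 'fourth', 4)

puts received.inspect
```

One thing the sketch glosses over: in a real deployment, an event published between the backlog call and psubscribe could still slip through until the next restart.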

Voila! My next post is going to be about how I set this up on Heroku using Redistogo. Stay tuned.

I hope you found this article valuable. Feel free to ask questions and give feedback in the comments section of this post. Thanks!

13 thoughts on “Do you need a Push Notification Manager? – Redis PubSub to the rescue”

  1. Maybe you can clear something up for me. I’m new to redis, and trying to figure out if this pub/sub is what I need.

    Are ‘channels’ separate from the actual key-value stores? For example, say I add some new data to a set, and I want to alert a client. Do I need to create a channel for that data set in order to publish it, and subscribe to it? Or can the data structure serve as its own channel, and the client simply subscribes to it?

    Appreciate it!

  2. @Johnny,

    Storing data and pub/sub are 2 different things!

    Pub/Sub events are not stored — they are immediately dispatched to subscribers (who are available at that time). They are not persisted.

    Since I wanted to persist these events / channels, I save them in the RedisStore when they are published – not the other way round.

    ‘channels’ in my code above are keys, so when I publish an event, say ‘hey.i.am.here’ with payload {‘somekey’ : ‘somevalue’}, it is stored in a sorted set like this:

    zadd ‘hey.i.am.here’, TIMESTAMP, JSON_PAYLOAD

    So channel refers to the key for the sorted set. A subscriber will subscribe to this channel with psubscribe:

    psubscribe ‘hey.i.am.here’ or maybe using wild-cards:
    psubscribe ‘*.i.am*’

    I do plan to open the source code soon — that would help understand the flow. Do stay tuned to this space!

  3. HI Gaurav,

    Very nice write up.

    How does this Redis pub/sub differ from Amazon SNS + SQS pub/sub? Are they the same? I am a bit confused about which one I need to use for my IM. In SNS you can create up to 100 topics, and each can have multiple message queues (SQS), so that a particular service can subscribe to it for notifications and message queueing.

    In the case of Redis, I am not sure how this would be achieved.

    My assumption is to use SNS + SQS for pub/sub and a Redis store to hold the messages, rooms, etc. as key-value pairs.

    Please advise.

    Thanks & Regards,
    Srinivas

    1. SNS+SQS is a good option if you are in a “pay as you go” mode. If you are a person who wants to control all the aspects, using redis pub-sub does the trick.

      Suppose I have 10 chatrooms, called chat1 … chat10, and users called user1 … user1000. It’s possible that a user may be in multiple chatrooms, a private chat etc., or someone may want to broadcast their message.

      * Broadcast: “msg.*” , { data: “I have arrived” }
      * Contact only user23: “msg.user23”, { data: “are you there bro?” }
      * Contact all users of chatroom21: “msg.chatroom21.*”, { data: “Guys, calm down!” }
      * Contact user32 in chatroom21: “msg.chatroom21.user32”, { data: “bro, get over here” }

      Basically, using glob-style wild-cards, I can DESIGN my events to give me as much flexibility as I want. This post discusses how to store messages with persistence and chronology. SNS+SQS do it in a different way (message queues).

      So, your choice should be based on concurrency, reliability and chronology. Since it’s a chat, concurrency and chronology are important, not reliability and durability (i.e. if the chat connection breaks, it’s not a matter of life and death). So, a simple redis pubsub should do the trick.

      1. Hi Gautham,

        Thank you very much for your time and comments. I will need to do some experiments before I make the right choice for my requirements. I will share my progress with you as a write-up in the future.

        Regards,
        Srinivas

  4. I have learn several good stuff here. Certainly value bookmarking for revisiting.
    I wonder how so much attempt you put to make the sort of excellent informative
    web site.

    1. Actually no, REDIS_PUB is fine as REDIS_SUB can not set values: Redis::CommandError (ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context) thanks

  5. Could you please give me an example of psubscribe in Node.js code?
    My scenario is as follows:
    Let’s say there are two customers, ‘A’ and ‘B’. They start a chat and each generates a unique ID; say the ID of ‘A’ is ‘aaaa’ and the ID of ‘B’ is ‘bbbb’.
    If I do psubscribe h+ID+llo in the Node.js program,
    where ID is ‘aaaa’ or ‘bbbb’,
    and I now want to publish a message on ‘aaaa’ only, i.e. send a message to user ‘A’ only, how can I achieve this?
    Please help!
