Happy New Year!
And it's indeed an awesome start to a surely prosperous year: my stunts with Redis PubSub worked! We have been looking at getting a push notification manager in place to communicate between applications. Here is what we wanted to do:
- Applications should be able to subscribe for notifications.
- Notification wildcards should be supported!
- Applications should be able to publish notifications to subscribers.
- These should be push notifications!
- Guaranteed delivery of notifications, i.e. applications should get pending messages.
- Sequential delivery of notifications — even for the backlog.
Well, initially I went searching for various solutions; almost all pointed to EventMachine, WebSockets & Cramp. When I came across Redis pubsub, it fit like a glove, but it has the following drawbacks:
- It's a lightweight, non-persistent PubSub.
- Missing or unavailable subscribers don’t get notifications.
I came across this mail thread on the Redis Google Group and realized that I would have to implement this myself. First, a little background on Redis pubsub. For those not familiar with Redis, I would strongly recommend getting familiar with it :). Redis is an in-memory NoSQL datastore that supports asynchronous persistence to disk; in short, the best of both worlds: speed and persistence. After installing redis-server, I added ‘redis’ to the Gemfile and I was on my way.
Redis Pubsub supports subscriptions through 2 commands: subscribe and psubscribe. The latter supports wildcards, which is exactly what I wanted. A subscription block is simple:
    require 'uri'
    require 'redis'

    uri = URI.parse('redis://127.0.0.1:6379')
    REDIS = Redis.new(:host => uri.host, :port => uri.port, :password => uri.password)

    REDIS.psubscribe('my.first.event', 'basic.events.*', 'event.*') do |on|
      on.psubscribe do |event, total|
        Rails.logger.info("Subscribed to ##{event} (#{total} subscriptions)")
      end

      on.pmessage do |pattern, event, message|
        # Message processing here
      end

      on.punsubscribe do |event, total|
        Rails.logger.info("Unsubscribed for ##{event} (#{total} subscriptions)")
      end
    end
The block parameter ‘on’ is a Subscription object, which registers the callbacks for ‘psubscribe’, ‘pmessage’ and ‘punsubscribe’. Some caveats:
- If you use wildcards, remember not to subscribe to the same event more than once, or you will receive the event twice. E.g. if you subscribe to ‘event.one’ and ‘event.*’, the callback will be invoked twice!
- psubscribe is a blocking call. Remember NOT to use the REDIS connection for issuing any other commands while it is subscribed, otherwise it will throw an exception.
- Since it's a blocking call, it's recommended to run this in a separate process (like DelayedJob) or at least a Thread, otherwise your Rails server will get blocked!
Now, my problem was that I also needed to persist the events during a publish and process pending messages. To achieve this, I created a custom Redis client that handles a backlog, shown below:
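The duplicate-delivery caveat can be checked without a Redis server. What follows is only a minimal sketch: it approximates Redis pattern matching with Ruby's `File.fnmatch?` (an assumption for illustration, since Redis uses its own glob matcher), and `matching_patterns` is a hypothetical helper, not part of the redis gem. It lists the subscribed patterns that match a published channel, which is how many times the pmessage callback would fire.

```ruby
# Hypothetical helper: returns the subscribed patterns that match a channel.
# File.fnmatch? stands in for Redis glob matching (an approximation).
def matching_patterns(patterns, channel)
  patterns.select { |pattern| File.fnmatch?(pattern, channel, File::FNM_DOTMATCH) }
end

patterns = ['event.one', 'event.*']

# Both patterns match, so the callback would be invoked twice.
p matching_patterns(patterns, 'event.one')  # => ["event.one", "event.*"]

# Only the wildcard matches here, so the callback fires once.
p matching_patterns(patterns, 'event.two')  # => ["event.*"]
```

Running a check like this over your pattern list before subscribing is one way to spot overlapping subscriptions early.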
    require 'redis'
    require 'multi_json'

    class PubSubRedis < Redis
      def initialize(options = {})
        # nil.to_i is 0, and 0 means no backlog is needed
        @timestamp = options[:timestamp].to_i
        super
      end

      # Add each event to a Sorted Set with the timestamp as the score
      def publish(channel, message)
        timestamp = Time.now.to_i
        zadd(channel, timestamp, MultiJson.encode([channel, message]))
        super(channel, MultiJson.encode(message))
      end

      # Yields the backlog of pending messages as [event, payload] pairs.
      # We do a union of sorted sets because we need to support wild-card channels.
      def backlog(*channels, &block)
        return if @timestamp == 0

        # Collect the entire set of events with wild-card support.
        events = channels.collect { |e| keys(e) }.flatten
        return if events.empty? # no events to process

        destination = "backlog-#{Time.now.to_i}"
        zunionstore(destination, events)

        # We want events only after the timestamp so add the (. This ensures that
        # an event with this timestamp will not be sent.
        # TODO: We may have a condition where multiple events for the same timestamp
        # may be recorded but will be missed out because of the (.
        messages = zrangebyscore(destination, "(#{@timestamp}", "+inf")
        messages.each do |message|
          event, payload = MultiJson.decode(message)
          block.call(event, payload)
        end

        # cleanup
        del(destination)
      end
    end
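To see what the backlog query returns, here is a rough in-memory sketch of the same idea that runs without Redis. Everything in it is a stand-in chosen for illustration: `InMemoryBacklog` is a hypothetical class, a Hash of arrays plays the role of the sorted sets, `File.fnmatch?` approximates the `keys` pattern lookup, and the stdlib `json` replaces multi_json.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the per-channel sorted sets.
class InMemoryBacklog
  def initialize
    @sets = Hash.new { |hash, key| hash[key] = [] }
  end

  # Mirrors zadd: store [score, member] under the channel key.
  def record(channel, message, timestamp)
    @sets[channel] << [timestamp, JSON.generate([channel, message])]
  end

  # Mirrors keys + zunionstore + zrangebyscore with an exclusive lower bound.
  def pending(patterns, last_seen)
    keys = @sets.keys.select do |key|
      patterns.any? { |pattern| File.fnmatch?(pattern, key, File::FNM_DOTMATCH) }
    end
    keys.flat_map { |key| @sets[key] }
        .select { |score, _member| score > last_seen } # same as "(last_seen"
        .sort                                          # by score, then member
        .map { |_score, member| JSON.parse(member) }
  end
end

log = InMemoryBacklog.new
log.record('event.one', { 'id' => 1 }, 100)
log.record('event.two', { 'id' => 2 }, 101)
log.record('basic.events.x', { 'id' => 3 }, 101)
log.record('other', { 'id' => 4 }, 102)

# The event stamped 100 is excluded by the exclusive bound; 'other' matches no pattern.
p log.pending(['event.*', 'basic.events.*'], 100)
# => [["basic.events.x", {"id"=>3}], ["event.two", {"id"=>2}]]
```

The excluded event at timestamp 100 is exactly the case the TODO in the class warns about: with one-second resolution, a second event recorded in the same second as the last processed one would be skipped.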
The only thing left to do now is tie up loose ends. It's necessary to store the timestamp of the events received, and what better store than Redis? I passed the timestamp as an option to the custom Redis client. We process the backlog first before invoking subscribe. Here is the snippet:
    # config/initializers/redis.rb
    LAST_TIMESTAMP = 'notification_timestamp_42037933' # some random key
    uri = URI.parse('redis://127.0.0.1:6379')

    # Publishing channel, since the REDIS_SUB will block!
    REDIS_PUB = PubSubRedis.new(:host => uri.host, :port => uri.port, :password => uri.password)
    REDIS_SUB = PubSubRedis.new(:host => uri.host, :port => uri.port, :password => uri.password,
                                :timestamp => REDIS_PUB.get(LAST_TIMESTAMP).to_i)

    # File where you are processing the backlog (DelayedJob or a separate thread)
    REDIS_SUB.backlog('my.first.event', 'basic.events.*', 'event.*') do |event, payload|
      # backlog notification processing
    end

    # callback processing loop shown earlier.
    REDIS_SUB.psubscribe('my.first.event', 'basic.events.*', 'event.*') do |on|
      ...
      on.pmessage do |pattern, event, message|
        REDIS_PUB.set(LAST_TIMESTAMP, Time.now.to_i)
        # callback processing
      end
      ...
    end
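The ordering in the snippet above (drain the backlog, then enter the blocking loop, all off the main thread) can be sketched in plain Ruby. This is an illustration under stated assumptions, not the code from this setup: `start_subscriber` is a hypothetical helper, a `Queue` stands in for the blocking psubscribe loop, and a `nil` item is used as a made-up shutdown signal.

```ruby
require 'thread'

# Hypothetical helper: runs the subscriber in its own thread so the caller
# (e.g. a Rails server) is not blocked.
def start_subscriber(backlog, live, processed)
  Thread.new do
    # 1. Process everything that was missed while offline.
    backlog.each { |event, payload| processed << [event, payload] }

    # 2. Block waiting for live events; nil ends the loop in this sketch.
    while (item = live.pop)
      processed << item
    end
  end
end

backlog   = [['event.one', 'missed 1'], ['event.two', 'missed 2']]
live      = Queue.new
processed = []

subscriber = start_subscriber(backlog, live, processed)

# The main thread stays free to publish while the subscriber blocks.
live << ['event.three', 'live 1']
live << nil
subscriber.join

p processed
# => [["event.one", "missed 1"], ["event.two", "missed 2"], ["event.three", "live 1"]]
```

Only the subscriber thread appends to `processed`, so the backlog entries always come out ahead of the live ones.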
Voila! My next post is going to be about how I set this up on Heroku using Redistogo. Stay tuned.
I hope you found this article valuable. Feel free to ask questions and give feedback in the comments section of this post. Thanks!
Maybe you can clear something up for me. I’m new to Redis, and trying to figure out if this pub/sub is what I need.
Are ‘channels’ separate from the actual key-value stores? For example, say I add some new data to a set, and I want to alert a client. Do I need to create a channel for that data set in order to publish it, and subscribe to it? Or can the data structure serve as its own channel, and the client simply subscribes to it?
Appreciate it!
@Johnny,
Storing data and pub/sub are 2 different things!
Pub/Sub events are not stored — they are immediately dispatched to subscribers (who are available at that time). They are not persisted.
Since I wanted to persist these events / channels, I save them in the RedisStore when they are published – not the other way round.
‘channels’ in my code above are keys, so when I publish an event, say ‘hey.i.am.here’ with payload {‘somekey’ : ‘somevalue’}, it is stored in a sorted set like this:
zadd ‘hey.i.am.here’, <timestamp>, <JSON-encoded [channel, payload]>
So channel refers to the key for the sorted set. A subscriber will subscribe to this channel with Psubscribe:
psubscribe ‘hey.i.am.here’
or maybe using wild-cards:
psubscribe ‘*.i.am*’
I do plan to open the source code soon — that would help understand the flow. Do stay tuned to this space!
Hi
Great article !!! Did you ever write a follow-up post on getting this all working on Heroku using Redistogo ?
Thanks
Dave
Hi Dave,
I remember drafting it, but I cannot find it. LoL. Thanks for the “push”
Hi Gaurav,
Very nice write up.
How does this Redis pub/sub differ from Amazon SNS + SQS pub/sub? Are they the same? I am a bit confused about which one I need to use for my IM. In SNS you can create up to 100 topics, and each can have multiple message queues (SQS), so that a particular service can subscribe to it for the notification and message queue.
In the case of Redis, I am not sure how this would be achieved.
My assumption is to use SNS + SQS for pub/sub and Redis to store the messages, rooms, etc. as key-value pairs.
Please advise.
Thanks & Regards,
Srinivas
SNS+SQS is a good option if you are on a “pay as you go” mode. If you are a person who wants to control all the aspects, using Redis pub/sub does the trick.
Suppose I have 10 chatrooms, called chat1 … chat10, and users called user1 … user1000. It’s possible that a user may be in multiple chatrooms, private chats, etc., or someone may want to broadcast their message.
* Broadcast: “msg.*” , { data: “I have arrived” }
* Contact only user23: “msg.user23”, { data: “are you there bro?” }
* Contact all users of chatroom21: “msg.chatroom21.*”, { data: “Guys, calm down!” }
* Contact user32 in chatroom21: “msg.chatroom21.user32”, { data: “bro, get over here” }
Basically, using wild-card (glob-style) patterns, I can DESIGN my events to give me as much flexibility as I want. This post discusses how to store messages with persistence and chronology. SNS+SQS do it in a different way (message queues).
So, your choice should be based on concurrency, reliability and chronology. Since it’s a chat, concurrency and chronology are important, not reliability and durability (i.e. if the chat connection breaks, it's not a matter of life and death). So, a simple Redis pubsub should do the trick.
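As a rough illustration of how such channel names could route messages, the sketch below works out who would receive a message published on a concrete channel, given each client's subscribed patterns. It is only one possible arrangement and rests on assumptions: the `subscriptions` table, the `monitor` client and the `recipients` helper are hypothetical, and `File.fnmatch?` approximates Redis glob matching.

```ruby
# Hypothetical helper: names of the clients whose patterns match the channel.
def recipients(subscriptions, channel)
  subscriptions.select do |_client, patterns|
    patterns.any? { |pattern| File.fnmatch?(pattern, channel, File::FNM_DOTMATCH) }
  end.keys
end

# Hypothetical subscription table: client => patterns passed to psubscribe.
subscriptions = {
  'user23'  => ['msg.user23'],
  'user32'  => ['msg.user32', 'msg.chatroom21.*'],
  'monitor' => ['msg.*'] # sees every message under msg.
}

p recipients(subscriptions, 'msg.user23')            # => ["user23", "monitor"]
p recipients(subscriptions, 'msg.chatroom21.user32') # => ["user32", "monitor"]
```

Note that in this sketch the wildcard lives on the subscriber side, and the publisher always names one concrete channel.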
Hi Gautham,
Thank you very much for your time and comments. I will need to do some experiments before I make the right choice for my requirements. I will share my progress with you as a write-up in the future.
Regards,
Srinivas
Very informative post, thanks.
Should it be REDIS_SUB.set(LAST_TIMESTAMP, Time.now.to_i), and not REDIS_PUB.set..?
Actually no, REDIS_PUB is fine as REDIS_SUB can not set values: Redis::CommandError (ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context) thanks
Could you please give me an example of psubscribe in Node.js code?
My scenario is as follows:
Let's say there are two customers, ‘A’ and ‘B’. They start a chat and each generates a unique ID: say the ID of ‘A’ is ‘aaaa’ and the ID of ‘B’ is ‘bbbb’.
If I do psubscribe h+ID+llo in a Node.js program,
where ID is ‘aaaa’ or ‘bbbb’,
now I want to publish a message on ‘aaaa’ only, i.e. send a message to user ‘A’ only. How can I achieve this?
Please, please help!
This is logic on the server side. I don’t write a lot of node, but you can easily port this code and add your logic!