Subscribing to RethinkDB Record Changes in Go

RethinkDB is a document storage database with excellent clustering capabilities. However, it can also notify you in real time when records are updated, and this article explores how to do that using Go.

RethinkDB is a document storage database with real-time record pub/sub capabilities.

What is RethinkDB?

RethinkDB is a document storage database with a lot of really nice modern features, like multiple nodes joining a cluster and automatically rebalancing and re-sharding data among themselves when that data changes. In this article though, we’ll be exploring the changes feature, so we can get automatic updates when something in the data for any given row, or even a whole table, gets changed.

The RethinkDB Data Explorer/Dashboard

A few details about RethinkDB at a glance:

  • Written in C++
  • Default ports:
    • 28015 for client drivers (command execution)
    • 29015 for inter-cluster communication
    • 8080 for admin dashboard
  • Config file location
    • MacOS: $BREW_HOME/etc/rethinkdb.conf
    • Linux (Ubuntu, from RethinkDB’s apt): /etc/rethinkdb/instances.d/<HOSTNAME>.conf
    • Windows: I have no idea, good luck with that 🙂

Didn’t RethinkDB die years ago?

You might have heard that the RethinkDB project shut down back in 2016. That’s true: it did. The company behind it essentially ran out of money. Then something miraculous happened: the open source community picked it up and carried it forward. The project joined the Linux Foundation in 2017 and, as of this writing (May 2020), has multiple well-known sponsors. So the idea that RethinkDB is a “dead” project is just “dead” wrong.

RethinkDB Joined the Linux Foundation in 2017 and presently has users including NASA along with sponsors like Digital Ocean, Atlassian and Netlify.

Install RethinkDB

$ brew install rethinkdb

On other platforms, make use of the installation directions at rethinkdb.com.

Configure RethinkDB

Now let’s edit the default configuration. On MacOS, you’ll find this under your brew directory, in etc/rethinkdb.conf. In the example below, I’ve run grep against my own configuration to show every line that isn’t commented out; you can see the values for the corresponding configuration directives here.

$ grep "^[^#]" /Users/jah/.brew/etc/rethinkdb.conf
directory=/Users/jah/.brew/var/rethinkdb
log-file=/Users/jah/.rethinkdb.log
bind=all
canonical-address=nova.local
server-name=nova.local

You can set the directory and log file to wherever you want as long as you have write access to that location on disk. bind=all is highly recommended so you can access RethinkDB at either localhost or 127.0.0.1 or your network IP address. Finally, my machine is specified in multicast DNS as nova.local, so put your hostname here as well so you can have other clients connect by hostname instead of internal IP address.

Next, on MacOS, use brew services to start and monitor RethinkDB:

$ brew services start rethinkdb
$ brew services list
rethinkdb         started jah  /Users/jah/Library/LaunchAgents/homebrew.mxcl.rethinkdb.plist

If you installed RethinkDB on Ubuntu using their apt repository, it should already be running and monitored. Try running systemctl status rethinkdb to view its status.

If you’re on another platform, try the installation instructions at the RethinkDB website.

Let’s Write Some Code

To use RethinkDB with Go, we’re going to use the rethinkdb-go database driver (source code available on GitHub). You can see its full API documentation here.

Side note: You might want to use Visual Studio Code with the Go extension installed. The extension is excellent, and provides “intellisense” while writing Go code so you can see what types a function returns, how many arguments it returns and what they are, etc. Highly recommended.

Create the tv_shows Table

Let’s get started. Create a file: go.mod. We’re going to populate it with the following to get started:

module github.com/jahio/rethinkdb-go

Yep, just one line. This is our module definition that tells Go where our code will eventually live and what it’ll be called. (Feel free to rename it to whatever GitHub repository you’ll store yours in.)

Now, add another file: main.go:

package main

import (
	"log"

	r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

func main() {
	log.SetFlags(0)

	rdbOpts := r.ConnectOpts{
		Address: "localhost:28015",
	}

	rconn, err := r.Connect(rdbOpts)
	checkError(err)

	err = r.DB("test").TableCreate("tv_shows").Exec(rconn)
	checkError(err)
}

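// checkError logs the error and stops the program; carrying on with a
// failed connection or query would only cause a confusing crash later.
func checkError(err error) {
	if err != nil {
		log.Fatalln(err)
	}
}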

In this file, we’re importing gopkg.in/rethinkdb/rethinkdb-go.v6 as our RethinkDB database driver and aliasing it to r in our code for convenience’s sake, which you can see in use starting around line 12.

The first thing we do is set our connection options for RethinkDB. In this case, we create a new variable, rdbOpts, based on the rethinkdb package’s ConnectOpts struct with an Address property set to localhost:28015. This is the default location and port; if you changed either one of those (maybe you’re running RethinkDB on another machine), update this accordingly.

Next, we connect to RethinkDB with that options object and check for an error if the connection failed. After that, we run a query:

err = r.DB("test").TableCreate("tv_shows").Exec(rconn)

Here we’re method chaining a ReQL-style query to RethinkDB (ReQL is RethinkDB’s query language). We start with r, the RethinkDB driver package, then tell it what database to use with DB("test").

Note that every new RethinkDB installation comes with a database called “test” which you can use to test out queries and just generally experiment with. Never store permanent data in this database.

Next, we call TableCreate("tv_shows"). TableCreate is the Go equivalent of tableCreate in the JavaScript flavor of ReQL, or table_create in Ruby. They all do the same thing: create a table with the string passed in. In our case, this is tv_shows. We’ll use some good shows here (shows I happen to like) to demonstrate how this works.

Finally, we call Exec(rconn) here to tell RethinkDB to execute the command over the rconn connection. Passing the connection to the Run() or Exec() functions is a necessary step every time, and your code won’t compile if you forget to do it (and trust me, you’ll forget here and there).

Now let’s run the code. In a terminal, cd into the directory you have this code in and run go run main.go. This will compile the program and run it, thus creating your table.

To verify you’ve created the table successfully, check it out in the RethinkDB Console:

Access the RethinkDB Console at http://localhost:8080 and look for the tv_shows table.

View the code at this point in the tutorial on GitHub.

Add some tv_shows

Now that the table has been created, let’s add a few shows to it. First, download this JSON file and save it in the same directory as main.go, calling it shows.json. Next, modify the code above to look like this:

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"

	r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

type show struct {
	Name     string    `json:"name" gorethink:"name"`
	Genre    string    `json:"genre" gorethink:"genre"`
	Website  string    `json:"website" gorethink:"website"`
	Episodes []episode `json:"episodes" gorethink:"episodes"`
}

type episode struct {
	Name    string `json:"name" gorethink:"name"`
	Summary string `json:"summary" gorethink:"summary"`
}

func main() {
	log.SetFlags(0)

	rdbOpts := r.ConnectOpts{
		Address: "localhost:28015",
	}

	rconn, err := r.Connect(rdbOpts)
	checkError(err)

	// Make sure you have shows.json in the same directory as this file.
	file, err := ioutil.ReadFile("shows.json")
	checkError(err)

	var shows []show
	err = json.Unmarshal(file, &shows)
	checkError(err)

	result, err := r.Table("tv_shows").Insert(shows).RunWrite(rconn)
	checkError(err)
	printObj(result)

}

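// checkError logs the error and stops the program; carrying on with a
// failed connection, file read, or query would only cause a crash later.
func checkError(err error) {
	if err != nil {
		log.Fatalln(err)
	}
}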

func printObj(v interface{}) {
	vBytes, err := json.Marshal(v)
	checkError(err)
	fmt.Println(string(vBytes))
}

Now, run the code again in a terminal with go run main.go.

To walk through the changes here, first we’ve created two new types, show and episode, both of which are structs. This is so we can deserialize the data in the JSON file into Go values and insert it into the database later.
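To see how the json struct tags map JSON keys onto those structs, here is a minimal standalone sketch that uses only the standard library. The sampleJSON constant and the parseShows helper are made up for illustration (the real shows.json isn’t reproduced in this article), and the driver tags are left out to keep the example small.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

type episode struct {
	Name    string `json:"name"`
	Summary string `json:"summary"`
}

type show struct {
	Name     string    `json:"name"`
	Genre    string    `json:"genre"`
	Website  string    `json:"website"`
	Episodes []episode `json:"episodes"`
}

// sampleJSON is an invented stand-in for shows.json: a JSON array of
// show objects, each with a nested array of episodes.
const sampleJSON = `[
  {"name": "Example Show", "genre": "Drama", "website": "https://example.com",
   "episodes": [{"name": "Pilot", "summary": "Where it all begins."}]}
]`

// parseShows (a hypothetical helper) decodes a JSON array into show structs.
func parseShows(data []byte) ([]show, error) {
	var shows []show
	if err := json.Unmarshal(data, &shows); err != nil {
		return nil, err
	}
	return shows, nil
}

func main() {
	shows, err := parseShows([]byte(sampleJSON))
	if err != nil {
		log.Fatalln(err)
	}
	for _, s := range shows {
		fmt.Printf("%s (%s): %d episode(s)\n", s.Name, s.Genre, len(s.Episodes))
	}
}
```

Keys in the JSON that have no matching tag are silently ignored, so a typo in a tag shows up as an empty field rather than an error.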

Next, we use ioutil to read the contents of shows.json, then we use the json package to Unmarshal (convert from JSON to objects) that payload of data into instances of those structs we mentioned earlier. Then we insert those structs into RethinkDB by passing that slice directly to the driver:

result, err := r.Table("tv_shows").Insert(shows).RunWrite(rconn)
checkError(err)
printObj(result)

Note: Using the rethinkdb-go driver, you use RunWrite to write information to the database, Run to read information from the database, and Exec to simply run a command that you need no output for, like creating a table or an index.

Then we’re using the new printObj() function to print the result of that insertion operation as JSON out on the console. What’s that look like, you might ask?

{
  "Errors":0, "Inserted":2, "Updated":0, "Unchanged":0, 
  "Replaced":0, "Renamed":0, "Skipped":0, "Deleted":0, 
  "Created":0, "DBsCreated":0, "TablesCreated":0, "Dropped":0, 
  "DBsDropped":0, "TablesDropped":0, 
  "GeneratedKeys":
  [
    "c9461b23-14af-4c47-8aaa-bff62957486f",
    "10e81779-3b5b-413d-9e27-dbc1af9d4089"
  ], 
  "FirstError":"", "ConfigChanges":null, "Changes":null
}

From the returned data, we can get a lot of information. First of all, we see that two records were inserted, and nothing was deleted, created, or threw errors (all of which are good signs). Perhaps more importantly, we see the unique IDs (UUIDs) the database assigned to our new objects in the GeneratedKeys field.
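If you want your program to act on that result rather than just print it, one option is to check the error count and pull out the generated keys. The sketch below works on the printed JSON using only the standard library; writeSummary and insertedKeys are hypothetical names, and the struct only mirrors a few of the fields shown in the output above, not the driver’s own type.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// writeSummary is a hypothetical local type that mirrors a few of the
// fields printed in the output above; it is not the driver's own type.
type writeSummary struct {
	Errors        int
	Inserted      int
	GeneratedKeys []string
	FirstError    string
}

// insertedKeys decodes a printed write result and returns the generated
// keys, or an error if the write reported problems or an unexpected count.
func insertedKeys(raw []byte, want int) ([]string, error) {
	var s writeSummary
	if err := json.Unmarshal(raw, &s); err != nil {
		return nil, err
	}
	if s.Errors > 0 {
		return nil, fmt.Errorf("%d write error(s), first: %s", s.Errors, s.FirstError)
	}
	if s.Inserted != want {
		return nil, fmt.Errorf("inserted %d record(s), expected %d", s.Inserted, want)
	}
	return s.GeneratedKeys, nil
}

// sampleResult is a trimmed copy of the output shown above.
const sampleResult = `{"Errors":0,"Inserted":2,"GeneratedKeys":["c9461b23-14af-4c47-8aaa-bff62957486f","10e81779-3b5b-413d-9e27-dbc1af9d4089"],"FirstError":""}`

func main() {
	keys, err := insertedKeys([]byte(sampleResult), 2)
	if err != nil {
		log.Fatalln(err)
	}
	for _, k := range keys {
		fmt.Println("new record id:", k)
	}
}
```

In the tutorial program itself you would presumably read the same fields straight off the result value instead of round-tripping through JSON.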

You’ll probably note that we didn’t specify these IDs. RethinkDB generates those for us automatically when a record has no id field. You can also supply an id of your own, in which case RethinkDB only complains if a record with that id already exists.

Subscribe to all changes on the tv_shows table

Now we’re going to have our Go program subscribe to changes on the tv_shows table and report those changes to us in the console. To make those changes, you’ll insert another show manually through the console. To start, modify the code as follows:

package main

import (
	"encoding/json"
	"fmt"
	"log"

	r "gopkg.in/rethinkdb/rethinkdb-go.v6"
)

type show struct {
	Name     string    `json:"name" gorethink:"name"`
	Genre    string    `json:"genre" gorethink:"genre"`
	Website  string    `json:"website" gorethink:"website"`
	Episodes []episode `json:"episodes" gorethink:"episodes"`
}

type episode struct {
	Name    string `json:"name" gorethink:"name"`
	Summary string `json:"summary" gorethink:"summary"`
}

func main() {
	log.SetFlags(0)

	rdbOpts := r.ConnectOpts{
		Address: "localhost:28015",
	}

	rconn, err := r.Connect(rdbOpts)
	checkError(err)

	for {
		result, err := r.Table("tv_shows").Changes().Run(rconn)
		checkError(err)

		var response interface{}
		for result.Next(&response) {
			printObj(response)
		}
		checkError(result.Err())
	}

}

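// checkError logs the error and stops the program; without this, a failed
// connection or query would leave result nil and crash on result.Next.
func checkError(err error) {
	if err != nil {
		log.Fatalln(err)
	}
}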

func printObj(v interface{}) {
	vBytes, err := json.Marshal(v)
	checkError(err)
	fmt.Println(string(vBytes))
}

See the code at this point in the tutorial on GitHub

Now, run that code again in your terminal with go run main.go. You’ll see a new line and just blank output, but don’t worry, it’s running and looking for changes.

Next, copy and paste the below code into the “Data Explorer” section of your RethinkDB Console:

r.db('test').table('tv_shows').insert({name: "Breaking Bad", website:"https://www.amc.com/shows/breaking-bad", genre: "Crime, Drama", episodes: [{name: "Pilot", summary: "Chemistry teacher (Bryan Cranston) with cancer teams up with a former student (Aaron Paul) to cook meth on Breaking Bad"}, { name: "Live Free or Die", summary: "As Walt deals with the aftermath of the Casa Tranquila explosion, Hank works to wrap up his investigation of Gus' empire."}]})

Insert this (copy and paste is fine) into your RethinkDB Console

You should see, in your browser, that one record was created:

After creating a new show in the database using the RethinkDB Administration Console

And in your terminal, you should see the following:

{"new_val":{"episodes":[{"name":"Pilot","summary":"Chemistry teacher (Bryan Cranston) with cancer teams up with a former student (Aaron Paul) to cook meth on Breaking Bad"},{"name":"Live Free or Die","summary":"As Walt deals with the aftermath of the Casa Tranquila explosion, Hank works to wrap up his investigation of Gus' empire."}],"genre":"Crime, Drama","id":"6645b41d-9ff8-4e49-9e81-b2dea2f98794","name":"Breaking Bad","website":"https://www.amc.com/shows/breaking-bad"},"old_val":null}
Sample console output from the Go program
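The output above carries two keys, new_val and old_val. For an insert, old_val is null; for a delete, new_val is null; for an update, both are present. Here is a small standard-library sketch that tells the three apart. The change type and classify function are illustrative names, and the sample payloads in main are shortened, made-up records rather than real output.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
)

// change mirrors the two keys seen in the console output above.
// A JSON null decodes to a nil map, which is what classify checks for.
type change struct {
	NewVal map[string]interface{} `json:"new_val"`
	OldVal map[string]interface{} `json:"old_val"`
}

// classify (a hypothetical helper) reports whether a change
// notification describes an insert, an update, or a delete.
func classify(raw []byte) (string, error) {
	var c change
	if err := json.Unmarshal(raw, &c); err != nil {
		return "", err
	}
	switch {
	case c.OldVal == nil && c.NewVal != nil:
		return "insert", nil
	case c.OldVal != nil && c.NewVal == nil:
		return "delete", nil
	case c.OldVal != nil && c.NewVal != nil:
		return "update", nil
	default:
		return "", errors.New("change has neither old_val nor new_val")
	}
}

func main() {
	// Shortened, invented payloads in the same shape as the real output.
	samples := []string{
		`{"new_val":{"id":"1","name":"Breaking Bad"},"old_val":null}`,
		`{"new_val":{"id":"1","name":"Breaking Bad (AMC)"},"old_val":{"id":"1","name":"Breaking Bad"}}`,
		`{"new_val":null,"old_val":{"id":"1","name":"Breaking Bad (AMC)"}}`,
	}
	for _, s := range samples {
		kind, err := classify([]byte(s))
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Println(kind)
	}
}
```

In the tutorial program, the same check could be applied to each response inside the result.Next loop instead of printing it as-is.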

And there you have it!

The key thing to remember is that changes() isn’t limited to whole tables: you can call it on narrower queries too, such as a single record or the records matched through a secondary index, and subscribe to changes on just that group of things instead of an entire table as in this example (which was shown just for brevity and simplicity). While not trivial, it’s very possible to build an efficient real-time WebSocket streaming system that pushes new data to clients every time the results of a selected query change.

Go is a great language with lots of fantastic features, like goroutines and channels, that lend themselves well to building a subscribe system around RethinkDB.
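As a rough illustration of that idea, the sketch below fans a stream of change events out to several subscribers using goroutines and channels. It uses only the standard library and simulates the events with plain strings; fanOut and broadcast are hypothetical names, and in a real program the source channel would presumably be fed by the loop that reads from the RethinkDB cursor.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut copies every event from src to each subscriber channel, then
// closes the subscriber channels once src has been closed.
func fanOut(src <-chan string, subs []chan string) {
	for ev := range src {
		for _, sub := range subs {
			sub <- ev
		}
	}
	for _, sub := range subs {
		close(sub)
	}
}

// broadcast pushes the given events through fanOut to n subscribers and
// returns what each subscriber received, in order.
func broadcast(events []string, n int) [][]string {
	src := make(chan string)
	subs := make([]chan string, n)
	for i := range subs {
		subs[i] = make(chan string)
	}
	go fanOut(src, subs)

	received := make([][]string, n)
	var wg sync.WaitGroup
	for i, sub := range subs {
		wg.Add(1)
		// Each subscriber goroutine writes only to its own slot.
		go func(i int, sub <-chan string) {
			defer wg.Done()
			for ev := range sub {
				received[i] = append(received[i], ev)
			}
		}(i, sub)
	}

	for _, ev := range events {
		src <- ev
	}
	close(src)
	wg.Wait()
	return received
}

func main() {
	// Simulated change events; real ones would come from the database.
	events := []string{"show inserted", "show updated"}
	for i, got := range broadcast(events, 3) {
		fmt.Printf("subscriber %d received %v\n", i, got)
	}
}
```

Because the channels are unbuffered, one slow subscriber holds up the others; a production system would likely want buffering or a way to drop lagging subscribers.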
