Distributed Coordination with Zookeeper

Zookeeper is a system for coordinating distributed applications. It provides a framework for solving several problems that arise when you build applications that must be highly available, distributed, and tolerant of network partitions and node failures:

  1. Data update notifications. Imagine you have a few processes running to process some data. Whenever one process finishes, it needs to let the others know the data is ready for the next process to pick up. A rudimentary way to accomplish this is for all the processes to periodically read some file or database record to see if it is their turn. That works, but everything has to poll, and polling can lead to contention over the file or database table. With Zookeeper, you have a tree of nodes, like a mini file system, and you can query for some data and set a watch on its path. If the data on that path is added, deleted, or otherwise changed, your watch is notified immediately. This is a very helpful tool for passing status information or other data between local or distributed processes, which you will do quite a lot as you scale out.
  2. Distributed lock (semaphore). If you’ve written any multithreaded code, you have likely had to lock access to some shared resource. If you’re working with distributed resources and want to lock access to those, a locking mechanism in your own process, or even in the OS, isn’t sufficient – you need a distributed lock. To accomplish this in Zookeeper, create a node that represents the resource you are locking; anything that wishes to use the resource must first succeed in creating the node. If the node already exists, another application holds the lock, so you set a watch to learn when it is released (the node is deleted). Because applications crash and network partitions occur, you will want a lock to be released even if the application holding it disconnects abnormally, which you get by creating an “ephemeral” node for the lock. When the session associated with an ephemeral node ends, the node is automatically deleted, and for your distributed application, that means the lock is released. (A minimal hand-rolled version of this pattern is sketched after the first code example below.)
  3. Leadership election. Occasionally, your clustered application will need an active/passive capability. An election must occur to determine the active member, and if that member dies, another election must occur to choose a new one. This can be accomplished in Zookeeper by having all cluster members attempt to create a node at a specific path. The first one to create the node becomes the active member, and it deletes the node when it shuts down. The rest watch for the node to be deleted, at which point they all try to create it again, and the winner becomes the new active member. Because applications crash, the node can be created as “ephemeral” so that if the connection closes and the session ends, the ephemeral nodes created by that session are removed automatically. So if the active member crashes, Zookeeper removes the node and the passive members hold a new election.

Lots of nice use cases, so how do we use it? First you need a Zookeeper instance, or an “ensemble” if you want high availability of Zookeeper itself. There are plenty of tutorials for that; one of the quickest ways to get a simple instance up and running is to install the “zookeeperd” package:

yum install -y zookeeperd

or

apt-get install -y zookeeperd

After installing, you may need to start Zookeeper (sudo service zookeeper start); by default it will then listen on port 2181.

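To confirm the server is listening, you can use Zookeeper’s four-letter-word ruok command (depending on your version it may need to be whitelisted); a healthy server answers imok:

echo ruok | nc localhost 2181
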
This isn’t a tutorial on administering Zookeeper itself, and I suggest you read the documentation on clustering, backup, purging the transaction logs, and so on. On to actually using it from .NET.

The authors of Zookeeper didn’t do a great job of documenting the protocol, nor did they create a .NET client. A few projects have reverse engineered the Java client, and the best library currently is ZooKeeperNetEx. It is well maintained by developers who are responsive to reported issues and keep up with the latest .NET technologies. Big features of this library:

  • Fully supports Windows .NET, Mono, and CoreCLR
  • Asynchronous through and through, so you will not find yourself blocking threads waiting on the Zookeeper protocol.
  • Comes with a companion package, ZooKeeperNetEx.Recipes, that implements common Zookeeper patterns.

Here’s a first example: connect to Zookeeper, wait until the connection succeeds, create a node, and disconnect.
open System.Threading.Tasks
open org.apache.zookeeper
open org.apache.utils

// A Zookeeper Watcher that watches for the connection to be made successfully.
type WaitToConnect() =
    inherit Watcher()
    // Need to return a task that only completes once connected to a Zookeeper node.
    let connectedCompletionSource = TaskCompletionSource<bool>()
    // WaitAsync returns a task that completes once connected.
    member cw.WaitAsync () = connectedCompletionSource.Task
    // The process method is called by the framework when the connection state changes.
    override cw.``process``(evt:WatchedEvent) =
        match evt.getState() with
        | Watcher.Event.KeeperState.SyncConnected -> connectedCompletionSource.TrySetResult(true) |> ignore
        | _ -> ()
        Task.FromResult(true) :> Task

// A function to asynchronously connect to Zookeeper, wait until successfully connected, create a node, and disconnect.
let connectCreateNodeAndDisconnect () =
    async {
        let zkUrl = "localhost:2181"
        let sessionTimeout = 10000
        let newPath = "/mynode"
        let watcher = WaitToConnect ()
        let zk = ZooKeeper(zkUrl, sessionTimeout, watcher)
        let! connected = watcher.WaitAsync () |> Async.AwaitTask
        let! newNode = zk.createAsync (newPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) |> Async.AwaitTask
        do! zk.closeAsync() |> Async.AwaitTask
    }

// Call this function, in this case synchronously
connectCreateNodeAndDisconnect () |> Async.RunSynchronously

That’s nice – we created a node. For fun, run it with your Zookeeper node stopped (sudo service zookeeper stop), and the code will sit and wait. It’s an async wait: just a pending TPL task, with no thread blocked on I/O to Zookeeper, although Async.RunSynchronously does block the main thread so the whole application doesn’t just exit. Start Zookeeper (sudo service zookeeper start) and the client connects, the task returned by WaitAsync() completes, and the code moves forward.

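The same pieces – ephemeral nodes plus watches – give you the distributed lock described earlier. Here is a minimal, hand-rolled sketch (the /mylock path, the NodeWatch helper, and the function names are illustrative, not part of the library): try to create an ephemeral node, and if it already exists, watch it and retry when it changes.

// A watcher whose task completes when any event fires on the watched path.
type NodeWatch() =
    inherit Watcher()
    let fired = TaskCompletionSource<bool>()
    member w.WaitAsync () = fired.Task
    override w.``process``(_evt:WatchedEvent) =
        fired.TrySetResult(true) |> ignore
        Task.FromResult(true) :> Task

// Try to take the lock; loop until our create of the lock node succeeds.
let rec acquireLockAsync (zk:ZooKeeper) (lockPath:string) = async {
    // Leave a watch on the lock path while checking for an existing holder.
    let watch = NodeWatch()
    let! existing = zk.existsAsync(lockPath, watch) |> Async.AwaitTask
    match existing with
    | null ->
        // Nobody appears to hold the lock; race to create it. EPHEMERAL means the
        // node (and therefore the lock) disappears if our session dies.
        try
            do! zk.createAsync(lockPath, [||], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
                |> Async.AwaitTask |> Async.Ignore
        with _ ->
            // Another client beat us to it; go around again.
            return! acquireLockAsync zk lockPath
    | _ ->
        // Someone holds the lock: wait for the watch to fire (the node changed
        // or was deleted), then try again.
        do! watch.WaitAsync() |> Async.AwaitTask |> Async.Ignore
        return! acquireLockAsync zk lockPath
}

// Releasing the lock is just deleting the node (version -1 means "any version").
let releaseLockAsync (zk:ZooKeeper) (lockPath:string) = async {
    do! zk.deleteAsync(lockPath, -1) |> Async.AwaitTask
}

Note that this naive create-and-retry approach wakes every waiter whenever the lock changes hands (the “herd effect”), and it retries on any create failure; a production implementation would match KeeperException.NodeExistsException specifically and use ephemeral sequential nodes so each waiter watches only its predecessor.
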
The lock sketch above is just one watcher pattern; you can build similar ones for whatever coordination you need, such as waiting for a node to be created or deleted so you can perform the leader election mentioned above. For the common scenarios you don’t have to roll your own: ZooKeeperNetEx has a companion package, ZooKeeperNetEx.Recipes, which contains several common patterns, like distributed locking and leader election. An example of leader election from these recipes is shown below.

open org.apache.zookeeper
open org.apache.utils
open org.apache.zookeeper.recipes.leader

let ElectLeader (zkUrl:string) =
    let electionNode = "/election" // This is the parent node where this election will occur.
    let sessionTimeout = 10000     // Session timeout in milliseconds, as in the first example.

    // Connect to Zookeeper and create the node for this election.
    let connectAndSetupElectionAsync () = async {
        let w = WaitToConnect()
        let zk = ZooKeeper(zkUrl, sessionTimeout, w)
        let! connected = w.WaitAsync() |> Async.AwaitTask
        let! electionNodeExists = zk.existsAsync(electionNode) |> Async.AwaitTask
        match electionNodeExists with
        | null -> // When a node doesn't exist, you get a null
            let! electionNodeCreated = zk.createAsync(electionNode, [||], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) |> Async.AwaitTask
            printfn "Created election node %" electionNodeCreated
        | _ -> ()
        return zk
    }

    // When we are done, we will need to close.
    let closeAsync (zk:ZooKeeper) = async {
        do! zk.closeAsync() |> Async.AwaitTask
    }

    // Define an async workflow for starting the election.  Each candidate will do this.
    let startElectionAsync (zk:ZooKeeper) hostname = async {
        let election = LeaderElectionSupport(zk, electionNode, hostname)
        do! election.start() |> Async.AwaitTask
        return election
    }

    // Who won the election?
    let getLeaderAsync (election:LeaderElectionSupport) = async {
        return! election.getLeaderHostName() |> Async.AwaitTask
    }

    // We're all done, we can stop the election.
    let stopElectionAsync (election:LeaderElectionSupport) = async {
        do! election.stop() |> Async.AwaitTask
    }

    // Let's hold an election.
    async {
        let! zk = connectAndSetupElectionAsync ()
        let announceWinner candidate = printfn "%s won!" candidate
        // Create 3 candidates; they all run in parallel, and they all agree on the winner.
        let candidacy hostname = async {
            let! election = startElectionAsync zk hostname
            let! leader = getLeaderAsync election
            leader |> announceWinner
            do! Async.Sleep(10000) // Pretend to do some work before resigning.
            do! election |> stopElectionAsync
        }
        let! _ = ["candidate1"; "candidate2"; "candidate3"] |> List.map candidacy |> Async.Parallel
        do! zk |> closeAsync
    } |> Async.RunSynchronously

If you run the code multiple times, you’ll see that a different candidate can win each time. There is no reason this code needs to run on a single machine – typically it would run on each member of a cluster, and each member would check whether it is the leader before performing any leadership duties. If a member wants to drop out, even the current leader, it calls stopElectionAsync, and one of the remaining candidates is chosen.

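That leadership check is the typical shape on a real cluster member: join the election at startup, then ask who the leader is before doing leader-only work. A small sketch along those lines (myHostname and doLeaderWork are placeholder names for your member identity and work function, not part of the library):

// Run the given work only if this member currently holds leadership.
let runIfLeaderAsync (election:LeaderElectionSupport) (myHostname:string)
                     (doLeaderWork:unit -> Async<unit>) = async {
    let! leader = election.getLeaderHostName() |> Async.AwaitTask
    if leader = myHostname then
        do! doLeaderWork ()
    // Otherwise stay passive; if the leader drops out, a later check may find us elected.
}
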
Gotchas

  • Logs – Zookeeper keeps a transaction log and periodic snapshots for synchronization between nodes, and you need to clean these up or they will fill your disks. This is not hard to automate (see the configuration sketch after this list), just don’t forget to do it.
  • Connection limit – by default, Zookeeper caps the number of connections from a single IP address (the maxClientCnxns setting, 60 in recent versions). You probably want to share a single connection across your entire application, or at least create some sort of pool.
  • Heavyweight connections – establishing a connection takes time, because a session has to be negotiated; it’s not a stateless protocol like HTTP. Avoid scenarios where you might create a connection per HTTP request, or anything else you cannot really control, because constantly connecting and disconnecting can really hinder your application’s performance.

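For the first two gotchas, the relevant knobs live in Zookeeper’s zoo.cfg. A sketch with illustrative values (check the documentation for your version):

# Keep only the 3 most recent snapshots, and purge older snapshots and
# transaction logs every 24 hours (Zookeeper 3.4+; 0 disables purging).
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
# Per-IP connection limit (0 = unlimited).
maxClientCnxns=100
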
.NET in the Linux Ecosystem

Running .NET applications on Linux provides access to APIs and libraries that augment the capabilities of horizontally scaled applications, but more importantly, it puts you inside a rich ecosystem of tools for distributed application development and management.

You may be a seasoned .NET developer, excited that .NET Core and Mono let you take your code to Linux, where you don’t have to pay for an OS license and you could run code on your toaster. That is exciting, but Linux is great for software development because of all the software available to scale out your distributed application. I’ll discuss the following benefits and show the tools, libraries, and code I’m using for each.

  1. Cluster Coordination with Zookeeper
    a. Distributed semaphore
    b. Leader election
    c. Emerging alternatives – Consul + .NET client
  2. Mesos – a layer above the (V)M
    a. Lifetime and HA for clustered applications
    b. Mono & CoreCLR
    c. Docker
    d. Service discovery & mesos-dns
    e. Windows mesos-slave – work in progress
  3. Storm
    a. Scaling processing horizontally
    b. F# on Storm

All code samples are in F#, as it provides a relatively succinct language for demonstrating the functionality and also has rich async primitives, which are used quite frequently in distributed applications.