Ensar Basri Kahveci

overly distributed

Hazelcast IMDG 3.12 Introduces CP Subsystem

Posted on Feb 26, 2019

I originally published this post on the Hazelcast blog and put a copy here.

We put Java locks on steroids! Hazelcast is now the first and only In-Memory Data Grid (IMDG) to offer a linearizable and distributed implementation of the Java concurrency primitives backed by the Raft consensus algorithm. Sound interesting? Just keep calm and carry on reading…

Hazelcast IMDG has been offering a set of concurrency APIs for a long time. We have the IAtomicLong, IAtomicReference, ILock, ISemaphore, and ICountDownLatch APIs, which are basically distributed versions of Java’s fundamental concurrency APIs. Although Hazelcast is mostly used for storing large amounts of data in memory, these concurrency APIs can be used for distributed coordination in many forms. For instance, multiple instances of a service can be running to provide high availability. These instances may use an ILock to ensure that only one instance handles incoming requests. Another example of coordination is that worker nodes can use an ICountDownLatch to meet with each other or with a master node before making progress. Coordination can also be required when processes try to agree on metadata information, such as discovery, system configuration, connection settings, feature flags, etc. In the locking scenario above, the service instance that acquires the lock can use an IAtomicReference to advertise itself.
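As an illustration of the locking scenario above, here is a minimal sketch using these pre-3.12 APIs; the proxy names ("service-lock", "active-instance") and the advertised value are made up for the example:

HazelcastInstance hz = Hazelcast.newHazelcastInstance();

ILock lock = hz.getLock("service-lock");                              // shared by all service instances
IAtomicReference<String> activeInstance = hz.getAtomicReference("active-instance");

lock.lock();                                                          // only one instance gets past this point
try {
    activeInstance.set("instance-1");                                 // advertise which instance handles requests
    // handle incoming requests...
} finally {
    lock.unlock();
}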

Strong consistency is a fundamental requirement for coordination tasks. A distributed coordination tool must maintain its consistency in failure cases. However, Hazelcast IMDG is mainly designed for high availability. In this regard, Hazelcast IMDG gracefully handles server, client, and network failures to maintain availability, but its core replication mechanisms, which are basically built on top of asynchronous replication, do not guarantee strong consistency in every failure scenario. In terms of the CAP principle, Hazelcast IMDG is AP.

We have been aware that this situation creates a drawback for our concurrency APIs. We tried to improve the reliability of these APIs in previous releases and to be open about their limitations. However, we knew that we could not provide a correct distributed implementation without abandoning asynchronous replication. For this reason, we decided to work on a new implementation of our concurrency APIs. That is what we have been busy with for the last year.

CP Subsystem

With Hazelcast IMDG 3.12, we are thrilled to announce a new module, the CP Subsystem. The CP Subsystem contains new implementations of Hazelcast’s concurrency APIs on top of the Raft consensus algorithm. As the name of the module implies, these implementations are CP with respect to the CAP principle, and they live alongside the AP data structures in the same Hazelcast IMDG cluster. They maintain linearizability in all cases, including client and server failures and network partitions, and prevent split-brain situations. We also revisited and clarified the distributed execution and failure semantics of our APIs, and made a lot of improvements at the API level. Lastly, we introduce a new API, FencedLock, that extends the semantics of java.util.concurrent.locks.Lock to cover various failure models we can face in distributed environments. We have also been verifying the new CP Subsystem with an extensive Jepsen test suite. We made use of the Jepsen tool to discover many subtle bugs as we were developing the new system. We have Jepsen tests for IAtomicLong, IAtomicReference, ISemaphore, and FencedLock.

Hazelcast is the first and only IMDG that offers a linearizable and distributed implementation of the Java concurrency primitives backed by the Raft consensus algorithm.

Let’s see CP Subsystem in action:

Config config = new Config();
config.getCPSubsystemConfig().setCPMemberCount(3);

HazelcastInstance instance1 = Hazelcast.newHazelcastInstance(config);
HazelcastInstance instance2 = Hazelcast.newHazelcastInstance(config);
HazelcastInstance instance3 = Hazelcast.newHazelcastInstance(config);

IAtomicLong counter = instance1.getCPSubsystem().getAtomicLong("counter");
counter.incrementAndGet();

In this code sample, we configure the CP Subsystem to be formed from 3 Hazelcast members. We call these members CP members. Once we start the 3 members, the CP Subsystem is initialized in the background. Then, we fetch an IAtomicLong proxy from the CP Subsystem module offered by the HazelcastInstance interface.
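The other CP data structures are fetched the same way. For instance, here is a minimal sketch of the new FencedLock mentioned above; the proxy name "my-lock" is just illustrative:

FencedLock lock = instance1.getCPSubsystem().getLock("my-lock");

long fence = lock.lockAndGetFence();   // acquires the lock and returns a monotonic fencing token
try {
    // critical section; the fencing token can be passed to external services
    // so that requests from a stale lock holder can be rejected
} finally {
    lock.unlock();
}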

The current set of CP data structures (IAtomicLong, IAtomicReference, ICountDownLatch, ISemaphore, FencedLock) has relatively small memory footprints. Therefore, not all members of your Hazelcast IMDG cluster have to be CP members. It is perfectly fine to configure only 3 or 5 CP members in a 100-member Hazelcast cluster. All members of the cluster have access to the CP Subsystem APIs.

When the CP subsystem starts, it internally creates 2 consensus groups. We call these groups CP groups. Each CP group runs the Raft consensus algorithm to elect a leader node and commit operations. The first CP group is the Metadata CP group that manages CP members and CP groups created for data structures. The second CP group is the Default CP group. When you do not specify a CP group name while fetching a CP data structure proxy, the proxy internally talks to the Default CP group.
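As a side note on naming, a CP group can be specified explicitly by appending "@<group-name>" to the proxy name, as the next code sample does with "counter2@custom". Assuming the Default group is addressed by the name "default", the following two proxies refer to the same atomic long:

IAtomicLong counterA = instance1.getCPSubsystem().getAtomicLong("counter");          // Default CP group, implicitly
IAtomicLong counterB = instance1.getCPSubsystem().getAtomicLong("counter@default");  // Default CP group, explicitly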

The Default CP group should be sufficient for most use cases. However, you can create custom CP groups at run-time, for instance, if the Raft leader node of the Default group becomes a bottleneck under your workload. You can distribute CP groups among CP members as shown in the following code sample:

Config config = new Config();
config.getCPSubsystemConfig().setCPMemberCount(5);
config.getCPSubsystemConfig().setGroupSize(3);

HazelcastInstance instance1 = Hazelcast.newHazelcastInstance(config);
HazelcastInstance instance2 = Hazelcast.newHazelcastInstance(config);
HazelcastInstance instance3 = Hazelcast.newHazelcastInstance(config);
HazelcastInstance instance4 = Hazelcast.newHazelcastInstance(config);
HazelcastInstance instance5 = Hazelcast.newHazelcastInstance(config);

IAtomicLong counter1 = instance1.getCPSubsystem().getAtomicLong("counter1");
counter1.incrementAndGet();

IAtomicLong counter2 = instance1.getCPSubsystem().getAtomicLong("counter2@custom");
counter2.decrementAndGet();

Here, we configure the CP member count and CP group size parameters separately. We form the CP subsystem with 5 CP members, but CP groups are created with 3 CP members each. When a CP group is created, its members are selected randomly among all CP members available in the CP subsystem. With this method, you can improve throughput by running multiple instances of the Raft consensus algorithm via CP groups and thus having multiple Raft leaders. This approach is basically partitioning (sharding), where you decide how the partitions are created.

You can manage your CP members in the Hazelcast way. If a CP member crashes, you can remove it from the CP subsystem and promote another Hazelcast member to the CP role. Hazelcast IMDG can also remove a CP member from the CP subsystem automatically if it is absent from the cluster for a configured duration. You can manage the CP subsystem programmatically or via the REST API. This dynamic nature of the CP subsystem greatly helps to preserve its availability when failures occur.
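For instance, here is a minimal sketch of the programmatic route, assuming the CPSubsystemManagementService API; the UUID below is just a placeholder:

// start a fresh Hazelcast member and promote it to the CP role
HazelcastInstance newMember = Hazelcast.newHazelcastInstance(config);
newMember.getCPSubsystem()
         .getCPSubsystemManagementService()
         .promoteToCPMember()
         .get();                                            // blocks until the promotion is committed

// remove the crashed CP member from the CP subsystem by its UUID (placeholder value)
newMember.getCPSubsystem()
         .getCPSubsystemManagementService()
         .removeCPMember("<uuid-of-crashed-cp-member>")
         .get();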

Closing Words

We believe that the simplicity of our concurrency APIs will help developers implement coordination tasks with ease. With the new CP subsystem, we will work hard to make Hazelcast IMDG a good fit for distributed coordination use cases. If you want to give it a try, just download the latest Hazelcast IMDG 3.12 release. You can also check out our reference manual or take a look at the code samples to learn more about the CP subsystem. We are looking forward to hearing your feedback!

More posts coming soon!
