Tuesday, 15 October 2013

Java Memory Model and Happens-Before Internals

Recently I jumped into some concurrency code that takes advantage of the happens-before guarantee to ensure a specific configuration was loaded in an executing thread. I was also aware that a good number of Java concurrency classes, especially AbstractQueuedSynchronizer, make use of the same synchronization piggybacking technique.

Although the Java Memory Model is well documented, I was still wondering which assembler instructions are used to provide the happens-before guarantees at the multi-core CPU level.
I managed to get some free time to play with Java at a low level and tested three synchronization techniques. I also found good documentation explaining Synchronized Memory Access details.
According to Microsoft's x86 architecture documentation, the following actions happen for lock-prefixed instructions.
  1. Before issuing the instruction, the CPU will flush all pending memory operations to ensure coherency. All data pre-fetches are abandoned.
  2. While issuing the instruction, the CPU will have exclusive access to the bus. This ensures the atomicity of the load/modify/store operation.
Below, I will detail how Java concurrency utilities are translated into low-level assembler instructions. Details of assembly printing can be found on the PrintAssembly wiki page.


The first technique uses the synchronized keyword to lock access to the variable properly. Since it synchronizes the whole method block, there are two lock cmpxchg instructions in the generated assembly code.
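The original snippet is not reproduced here; a minimal counter of this kind might look like the following (class and method names are illustrative):

```java
// Hypothetical counter guarded by the synchronized keyword; the whole
// method body runs under the object's monitor lock, so the monitor
// acquire and release each contribute a lock'd instruction in the
// JIT-compiled code.
public class SynchronizedCounter {
    private int counter;

    public synchronized int inc() {
        return ++counter;
    }

    public synchronized int get() {
        return counter;
    }
}
```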


The code piece below uses the volatile keyword to set the counter value.

The volatile keyword causes a lock instruction to be inserted, which provides the happens-before guarantee and flushes pending changes so that they become visible to other threads.
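Again, the original snippet is missing; a sketch of the volatile variant could look like this (names are illustrative, and the exact instruction HotSpot emits for the volatile store, typically a lock addl on x86, can vary by JVM version):

```java
// Hypothetical counter using a volatile field; every assignment to
// the field is a volatile store, which HotSpot backs with a lock'd
// instruction on x86 to drain the store buffer and publish the value.
public class VolatileCounter {
    private volatile int counter;

    public void set(int value) {
        counter = value; // volatile store: visible to other threads
    }

    public int get() {
        return counter;  // volatile load
    }
}
```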


The code below shows a simple AtomicInteger being used as a counter.
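The original listing is not preserved here; given the Counter.inc name referenced in the PrintAssembly command below, it plausibly looked something like this sketch:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simple AtomicInteger-backed counter; incrementAndGet compiles down
// to a lock'd compare-and-swap (or lock xadd) on x86.
public class Counter {
    private final AtomicInteger counter = new AtomicInteger();

    public int inc() {
        return counter.incrementAndGet();
    }
}
```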

When the above code is executed with -XX:+UnlockDiagnosticVMOptions -XX:CompileCommand=print,Counter.inc, similar assembly instructions can be seen in the assembly dump log.

Tuesday, 1 October 2013

Time Series Map

Recently, I needed to store time-series data where ticker information had to be recorded every millisecond. My initial thought was to create a map with the millisecond as the key and the tick data as the value.

Map<Long, Tick> tickMap;

I soon realized I also needed to remove old entries from the map as time goes on, to keep memory consumption under control. Again, I thought I could evict expiring entries after a put operation, or use Guava's cache builder. Although that would work, it would create and remove a lot of objects, risking frequent garbage collector runs.
I remembered using the Disruptor from my Betfair days: the magic ring buffer solution that eliminates GC runs by reusing objects initialized up front. A ring buffer is basically an array with a predefined capacity. Instead of adding and removing entries, existing entries are updated to reflect the last added object. An entry's position can be determined by a bitwise AND of the index mask and the entry's sequence value.
Disruptor/Ring Buffer

The sequence number is basically an always-increasing pointer yielding the next long value, and the index mask is simply the size of the buffer minus one. With that information in hand, I can build a ring buffer to store 24 hours of volatile data without driving the GC crazy. All I need is a ring buffer whose size is a power of 2 above 60 x 60 x 24 x 1000 milliseconds. The power-of-2 requirement is for performance reasons: computers like binary systems, and locating an index with a bitwise operation is cheaper than a modulus operation. Java's HashMap internally uses the same logic for determining the size of its internal buckets.
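A minimal, hand-rolled sketch of the idea (not the actual Disruptor API; class and field names are made up):

```java
// Minimal ring-buffer sketch. Capacity must be a power of two so that
// sequence & (capacity - 1) can replace the slower sequence % capacity.
public class TickRingBuffer {
    private final Tick[] entries;
    private final int indexMask;
    private long sequence = -1; // always-increasing pointer

    public TickRingBuffer(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of 2");
        }
        this.entries = new Tick[capacity];
        this.indexMask = capacity - 1;
        for (int i = 0; i < capacity; i++) {
            entries[i] = new Tick(); // pre-allocate once, reuse forever
        }
    }

    // Overwrite the next slot in place instead of allocating a new entry.
    public Tick claimNext() {
        sequence++;
        return entries[(int) (sequence & indexMask)];
    }

    public Tick get(long seq) {
        return entries[(int) (seq & indexMask)];
    }

    public static class Tick {
        public long timestamp;
        public double price;
    }
}
```

Pre-allocating every slot up front is what keeps the GC quiet: after construction, no new objects are created on the hot path, old data is simply overwritten as the sequence wraps around.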

Sunday, 8 September 2013

Scalability Strategies

I want to highlight some strategies that help build scalable applications without much pain. I will try to shed some light on data decomposition, the single writer principle, and false sharing. These are principles that affect application performance from top to bottom.

Data decomposition

Data decomposition is a top-down technique for breaking work into small tasks that can be parallelized. The strategy allows creating data chunks which can be processed in parallel, and it is widely applied in database systems; for example, both Cassandra and MongoDB use it to scale out. Both databases have a partitioning strategy that calculates a hash value and then uses that value to locate a node in the cluster.
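The hash-then-locate step can be sketched in a few lines (a toy illustration in the spirit of hash partitioning, not either database's actual algorithm):

```java
// Toy hash partitioner: hash the partition key, then map the hash
// onto one of the cluster nodes.
public class HashPartitioner {
    private final String[] nodes;

    public HashPartitioner(String... nodes) {
        this.nodes = nodes;
    }

    public String locate(String partitionKey) {
        int hash = partitionKey.hashCode();
        // Mask off the sign bit so the index is always non-negative.
        int index = (hash & 0x7fffffff) % nodes.length;
        return nodes[index];
    }
}
```

The same key always hashes to the same node, which is exactly what lets each node own an independent chunk of the data.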
There are also real-life examples showing that the strategy works very well and that not applying it causes a lot of trouble.
For example, I use Tottenham Court Rd tube station in my daily commute and observe how painful life can be when decomposition strategies are not followed. The station has notoriously bad access for Northern line users: they have to go to the Central line platform and then cross to the opposite side to reach the Northern line. Moreover, passenger traffic on the platform flows both ways, which causes hundreds of people to bump into each other during peak hours, even though they are interested in different routes.
Blocking platform entries

Single Writer

This strategy states that a single writer should be used to change the state of a frequently used resource, helping to eliminate the memory contention seen on today's multi-core processors. Synchronizing data among different cores carries a big performance penalty because of bus traffic, cache coherency, locking cost, and cache misses. With a single writer, a CPU can exploit modern hardware optimizations: knowing there are no other writers, it doesn't need to force a sync with RAM and can make full use of its L1 cache. Moreover, with a single writer there is no lock acquire/release phase, which is unavoidable with multiple writers.
A single picture explains the single writer principle better than hundreds of lines. Let's assume there are four relay runners who never get tired, and there is no requirement to exchange the baton at the exchange points. Instead of multiple runners, a single runner, also never tiring, can carry the baton alone and never needs to pass it at an exchange point. We can draw an analogy between multiple runners and multiple threads: in both cases there is synchronization overhead at each exchange. Similarly, we can have only one thread modifying a shared resource, eliminating the synchronization work.
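A minimal sketch of the principle (illustrative names, not a production design): many threads may submit updates, but only one dedicated thread ever mutates the state, so the state itself needs no lock.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Single-writer sketch: producers enqueue deltas, one writer thread
// applies them. The state is never touched by more than one writer.
public class SingleWriterCounter {
    private final BlockingQueue<Integer> updates = new ArrayBlockingQueue<>(1024);
    // volatile only for reader visibility; solely the writer thread stores to it.
    private volatile long total;
    private final Thread writer;

    public SingleWriterCounter() {
        writer = new Thread(() -> {
            try {
                while (true) {
                    int delta = updates.take();
                    if (delta == Integer.MIN_VALUE) return; // poison pill
                    total += delta; // no CAS, no lock: single writer
                }
            } catch (InterruptedException ignored) {
            }
        });
        writer.start();
    }

    public void submit(int delta) {
        try {
            updates.put(delta);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public long shutdownAndGet() {
        submit(Integer.MIN_VALUE);
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total;
    }
}
```

Because only one thread ever writes `total`, there is no CAS loop and no lock around the state; the queue is the single, well-defined hand-off point.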

False Sharing

False sharing is notoriously known as the silent performance killer. It arises in parallel applications when two threads operate on independent data in the same memory region, stored on the same cache line. The cache coherency protocol may invalidate the whole cache line on each write, leading to memory stalls and traffic on the bus or interconnect.
Detecting this issue is quite a daunting task, involving inspecting the memory layout of the classes and analysing the CPU cache lines to establish that the application indeed suffers from false sharing.
Let's assume we have a message-id resource which is incremented by a single writer thread every time it is accessed. Following the single writer principle, one can expect some performance improvement for message-id generation, as there are no lock release/acquire steps, which would be needed with multiple threads.
However, modern processors have multiple cores with their own caches, whose cache lines generally store 64 bytes, so the CPU can place a number of variables on the same cache line regardless of how frequently each is accessed. Each time a cache line is updated, it has to be invalidated, flushed, and then synced with the other cores' cache lines.
Unfortunately, up to JDK 7 there isn't any native way to avoid sharing a cache line with other variables; this has been addressed in JDK 8 with the @Contended annotation.
However, there are still ways to ensure that a variable does not share a cache line with others. One of them is to make the variable's container big enough to occupy a whole cache line so that no other variable can fit into it. That is generally achieved by creating a 64-byte array, i.e. a long[8] in Java, and storing the shared variable in one of the array positions.
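A sketch of that padding trick (class name is made up; this is a pre-@Contended workaround, not a guaranteed layout):

```java
// Padding sketch: keep the hot counter inside its own long[8].
// A long is 8 bytes, so the 8-slot array data spans a full 64-byte
// cache line, leaving no room for an unrelated hot variable.
public class PaddedCounter {
    private final long[] padded = new long[8];
    private static final int VALUE = 0; // slot holding the real value

    public void increment() {
        padded[VALUE]++;
    }

    public long get() {
        return padded[VALUE];
    }
}
```

Note that real implementations, such as the Disruptor's padded sequence, pad on both sides of the hot field, since the JVM makes no hard layout guarantee about what lands next to the array object itself.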
Unordered items
Cache line alignment
Here is a fruit basket containing a mixture of fruits: imagine how hard it is to load or unload just the apples from that mixture, compared to a rack where each fruit has its own compartment.
Variables sharing the same cache line are just like that. The more control we have over how data is decomposed, the easier it is to improve application performance.

Monday, 12 August 2013

Integration Tests, Spring and Mocking

Some time ago, I emphasised how much integration tests are needed. This time I will show some tricks for building nice integration tests. This use case is specific to the Spring Framework, but it can be applied to similar scenarios.
The trick is to use the Spring Framework and Mockito together to build integration tests and mock the behaviour of dependencies. The mocks here are dependency mocks rather than data mocks: data mocking generates and injects data for a mock on the wire, while dependency mocking replaces library dependencies.
Today's software is getting more interconnected, communicating with a number of services and systems. For example, a service could use a messaging server to receive and send messages to interested parties through a messaging exchange. That strategy enables building abstracted systems. Similarly, applications can be built from abstracted components so that an individual component can be tested in isolation while its dependencies are mocked with different strategies. Testing individual components helps reduce some heavyweight functional tests and ensures components are reliable to some degree.
The code samples below demonstrate building integration tests with Spring and Mockito; notice the line mocking a Spring bean.

Spring Factory & Mockito

This nice Spring feature is a kind of hidden gem, maybe because of its somewhat confusing "constructor-arg" naming. The parameter sounds like a class constructor argument, but in reality it can be used for any factory method argument.
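The bean definition itself is not reproduced above; the trick usually looks something like this (the bean id and the mocked interface are made-up names):

```xml
<!-- Expose Mockito.mock(MessageSender.class) as a Spring bean.
     "constructor-arg" here feeds the static factory method's
     parameter, not a class constructor. -->
<bean id="messageSender" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="com.example.MessageSender"/>
</bean>
```

The resulting mock is wired into the context like any other bean, so the test can stub its behaviour with the usual Mockito API.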

Spring Context

And here is the Spring test context, including the main application context and the abstracted test dependency. Separating the Spring context into multiple chunks helps build abstracted systems and allows plugging a mocked dependency into an integration test.

Monday, 5 August 2013

Scalable Services

Previously, I wrote about micro service architecture and its features. This time I will look at scaling services and try to explain the design strategies that affect their scalability.
Nowadays every business wants easily scalable services. Today's business environment is more competitive and customers are more demanding: no one wants to wait for a slow web page to load or to use a sluggish mobile application. Scalability also matters for customer retention; today's customers have more choices than in the past, so scalable, high-performing services are paramount to any business's success.
By scalability, I mean scaling the whole technology stack a service uses. A truly scalable service architecture requires avoiding any kind of bottleneck, whether in the application or the database layer.
I will focus on scaling applications by partitioning service consumers. The easiest way to scale services is to make them data agnostic and have load balancers distribute traffic randomly between service instances, so that each instance serves a share of the received traffic. We can see this strategy applied in Cassandra's random partitioner and MongoDB's hash-based sharding.

Scalable service

While that strategy works most of the time, there are occasions when a service needs to treat received traffic in a smarter way, which may require examining the traffic data before processing the request. For example, a payment card processing system can distribute received traffic amongst service instances according to card BIN numbers, which ensures a customer's bank data goes to the same instance every time. That strategy is an application of the reactor pattern at the architecture level: a front controller or load balancer examines the initial piece of the traffic data, then delegates the work to a specific instance. Another example is service features that depend on the customer's geo-location for tax or jurisdiction purposes.
The reactor pattern, applied at the architectural level, also helps abstract specific traffic types and build specialized versions of service instances. For example, once jurisdiction is abstracted, a service can have either a UK or an AUS jurisdiction instance supplied during service provisioning. That strategy enables treating service traffic selectively rather than randomly, and hence yields control over the received traffic.
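The BIN-based routing idea can be sketched in a few lines (hypothetical class and instance names; a real front controller would of course validate the card number first):

```java
// Hypothetical front-controller routing: the first six digits of a
// card number (the BIN) pick the service instance, so one bank's
// traffic always lands on the same instance.
public class BinRouter {
    private final String[] instances;

    public BinRouter(String... instances) {
        this.instances = instances;
    }

    public String route(String cardNumber) {
        String bin = cardNumber.substring(0, 6);
        // Mask off the sign bit so the index is always non-negative.
        int index = (bin.hashCode() & 0x7fffffff) % instances.length;
        return instances[index];
    }
}
```

Because the mapping is a pure function of the BIN, two requests carrying cards from the same bank are always delegated to the same instance.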

Micro Service Architecture

Micro service architecture is a paradigm that aims to build systems by decomposing business features into lightweight services. It is like applying the SOLID principles at the architectural level.

Micro Services
Micro service architecture enhances classic SOA by emphasising a single responsibility. Conceptually micro services aren't particularly difficult to deliver, but in practice some questions arise.

Features of micro services

By definition a micro service is very small and has a small footprint; a CI build including unit and integration tests takes only a few minutes to complete. Micro services can be short lived (disposed of once the business no longer needs them) within a long-living software ecosystem. Micro services also tend not to like heavy frameworks and application servers, which could be a negative in some cases.
Another nice thing about these services is that they can be language agnostic, so a heterogeneous development team can build services in different languages, provided service communication is abstracted into some form. Moreover, micro services can have multiple versions running in production (which may bring additional work for DevOps).

Service communication

There isn't a silver bullet for this one. It probably depends on the role of the services; most likely there are different communication paths amongst them. Say, public network services could use HTTP, REST and JSON for data exchange, while internal services can leverage more efficient protocols like Protobuf and may interact via a message bus.

Service delivery

Micro services are relatively easy to develop and deliver to production, given that the development focus is on one feature set. Moreover, it is possible to run multi-instance, multi-versioned services in production. These services can have multiple instances to handle high-volume traffic, although multi-instance deployment may require some design changes; for example, instances can be made data agnostic, or each instance can handle specific traffic (e.g. segmented by customer or location).
Having multiple instances and versions is not cost free. First of all, it requires exposing versioned service interaction points; moreover, deployment and provisioning will have some overhead. However, it is entirely achievable to reduce the DevOps overhead with change and configuration management systems: infrastructure can be virtualized, and service configuration can be managed with tools like Chef and Puppet.

Sunday, 9 June 2013

Event loop and multi-threaded ordered execution

I wanted to write about a real use case of some less known design patterns and how they can help achieve a simplified design and high performance.
In message-oriented business use cases, there is sometimes a need to process messages in the order they were received, so that the system stays consistent over time.
For example, if a trader buys 10 lots of a share at 11:00:00 and then wants to buy another 15 lots at 11:00:01, the order management system has to process these orders in the received order, to make sure the trader's account is updated correctly and the system checks remaining funds and purchased shares the right way.
Another similar use case is processing the events of a football match. A match has a number of events: goals, yellow cards, red cards, stoppage time, free kicks, etc. In a betting portal all of these events can be used as betting markets; a market operator may create a market for the number of free kicks, or for when and how many yellow or red cards will be shown.
This simple scenario gets complicated when the volume of trade orders or betting market events is high and the business demands that these activities be settled instantly.
A naive approach to processing orders from a message queue is to create a multi-threaded executor which receives messages and dispatches them to the underlying threads. By default, a thread executor doesn't have any logic for dispatching work items to particular worker threads.
Let's consider a trader who places two trade orders, M1 and M4, and expects the system to process them in the received order, while another trader places orders M2 and M5.
How can we ensure trades are processed in the received order in a multi-threaded environment?
With the naive approach, unfortunately, there is no guarantee that thread 2 will execute after thread 1, which processes M1.
The following test case and console output show unordered message processing.
Test case

Unordered message processing
Not so good solution:
To solve this problem, we need a way to mark related messages as belonging together. For example, adding a trader id field to the message as a correlation id helps identify which messages belong to a particular trader.
One approach is to maintain a map of correlation id to thread id, so that a customized thread executor can figure out how to distribute messages amongst the worker threads.
The following code piece demonstrates that approach.
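The original snippet is not preserved here; a sketch of the map-based idea, with made-up class names, could look like this:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the map-based approach: remember which worker a
// correlation id was first assigned to, and keep routing it there.
public class MappedDispatcher {
    private final ExecutorService[] workers;
    private final Map<Long, Integer> assignments = new ConcurrentHashMap<>();
    private int nextWorker;

    public MappedDispatcher(int workerCount) {
        workers = new ExecutorService[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    public synchronized void dispatch(long correlationId, Runnable task) {
        Integer index = assignments.get(correlationId);
        if (index == null) {
            index = nextWorker++ % workers.length; // first sighting: round-robin
            assignments.put(correlationId, index);
        }
        workers[index].execute(task);
    }

    public void shutdown() {
        for (ExecutorService w : workers) {
            w.shutdown();
            try {
                w.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Each worker is single-threaded, so messages routed to the same worker run in submission order; the weakness, as noted next, is that the assignments map grows without bound unless entries expire.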

The map implementation can be a self-expiring map, like a cache, to ensure the map doesn't grow over time.
However, this solution introduces a complicated expiring map and the question of how to determine the expiration of its items.

A better solution:
The following picture illustrates a better solution: distribute messages having the same correlation id to the same underlying thread. Once messages are assigned to the same thread, that thread processes them in the received order.
It is easier to use the hash value of the correlation id and the size of the executor pool to determine which executor should pick up a message. This solution assumes the correlation id is immutable once assigned.
As the hash value of a correlation id is fixed, the same thread/executor is assigned to all messages having that correlation id.
The following code piece shows how the hash value is used to find the index of a thread/executor.
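The original listing isn't shown here; the index calculation could be sketched like this (names are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Better-solution sketch: the correlation id's hash deterministically
// selects a single-threaded executor, so all messages sharing a
// correlation id are processed in order by the same thread.
public class OrderedDispatcher {
    private final ExecutorService[] executors;

    public OrderedDispatcher(int size) {
        executors = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            executors[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Same correlation id -> same index -> same thread, every time.
    public int indexFor(Object correlationId) {
        return (correlationId.hashCode() & 0x7fffffff) % executors.length;
    }

    public void dispatch(Object correlationId, Runnable task) {
        executors[indexFor(correlationId)].execute(task);
    }

    public void shutdown() {
        for (ExecutorService e : executors) {
            e.shutdown();
        }
    }
}
```

No map is needed at all: the routing is a pure function of the correlation id, so there is nothing to expire.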

The whole project can be found on GitHub.