Handling concurrent processes is not specific to microservices, but microservices, and distributed systems in general, bring additional complexity to the table: multiple concurrent and distributed flows can run in isolation from each other. In this article I address this problem using an abstraction of the pessimistic concurrency model. The code used in this article can be found in my GitHub repository.
To better understand certain concepts, let’s take the example of an imaginary web shop and two of its processes: ordering an item and deactivating an item. There are also two business rules: one prevents inventory items that are part of an open order from being deactivated, and another prevents creating orders with deactivated items.
In traditional (non-distributed) software systems, especially those using a data-centric architecture pattern, dealing with concurrency means dealing with concurrent data access: at a given point in time only one process can update the data, ensuring that calculations always work with the latest state. For handling concurrent access in such systems there are three options: Last writer wins, Optimistic and Pessimistic concurrency strategies. Let’s see how these options can meet the above requirement and how they fit in a distributed environment.
Last writer wins
This approach allows update operations to proceed without verifying if any other application has updated the data since the application first read it.
This is basically the absence of a formal concurrency strategy, and it is usually used where multiple users are unlikely to access the same data. It is definitely not a way to implement our business requirement related to orders and item deactivation.
Optimistic concurrency
As its name suggests, Optimistic concurrency assumes the best-case scenario when handling concurrency: the execution of the flow proceeds until completion without any prior verification, on the assumption that conflicts between concurrent flows will happen infrequently. The verification typically happens at the storage level, comparing RowVersions, TimeStamps or ETags.
In our case, both flows would retrieve the item and its version when executing the above business logic and, when updating the status or the open orders of an item, verify whether the data has changed since the application last read it.
When a conflict occurs, the system detects it based on the version of the item, and the second flow has to either re-execute with refreshed data or run logic that compensates for the changes already made, which can be done using transactions, since both processes point to the same database.
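To make this concrete, here is a minimal sketch of an optimistic, version-based update; the repositories, the ItemStatus type and the UpdateIfVersionMatches method are hypothetical names for illustration, not part of the repository:

public async Task<bool> TryDeactivateItem(Guid itemId)
{
    //Read the item including the version it had at read time
    var item = await _itemRepository.GetById(itemId);
    if (await _orderRepository.HasOpenOrdersWith(itemId))
        return false; //business rule: items on open orders cannot be deactivated

    item.Status = ItemStatus.Inactive;
    //The update only succeeds if the stored version still matches the one we read;
    //otherwise another flow changed the item in the meantime and we must retry or compensate
    return await _itemRepository.UpdateIfVersionMatches(item, item.Version);
}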
This approach has some advantages: it doesn’t require locks, it can serve multiple flows at once, and it barely affects performance. What makes it less suitable for most distributed systems, however, is the fact that it requires a shared database, and the fact that it allows concurrent flows to execute until a conflict occurs. Rolling back changes in a distributed system, implementing distributed transactions or designing idempotent flows can be extremely difficult, or can have a major impact on performance and service decoupling.
Pessimistic concurrency
In opposition to optimistic concurrency, Pessimistic concurrency assumes conflicts happen often and locks data records before applying any state change. This has the disadvantage of not allowing parallel execution of certain steps within a process, and it can potentially introduce a bottleneck; however, it doesn’t require any compensation logic or idempotent flows, which in my opinion makes it the ideal candidate for microservices.
Unlike in the data-centric architecture model, in distributed systems it is not a set of data that has to be locked, but an entire business flow or a subset of it.
To achieve this (keeping in mind that flows can execute in parallel, in isolated services), the concurrency solution needs three things (a sketch of the resulting service contract follows the list):
1. A fast persistence layer shared between services, used only to store locks. In my implementations I used Redis or Azure Table Storage.
2. A lock key. This comes from the business domain and typically contains dynamic information about an entity or the process itself, built in such a way that concurrent flows will always produce the same key. In our example we have two isolated processes which might not even share the same database. To determine the lock key, it is best to start from the business requirements and extract the dynamic values from there; in our case the item id is the common element of both flows and becomes part of the key, e.g. “LockNoOrderWithInactiveItem|<ItemId>”.
3. A decision point in each flow where the lock takes place, using the lock key. This can be part of a validation at the beginning of the concurrent process, or at the beginning of a subprocess, to reduce the impact on performance.
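Based on these three ingredients, the locking abstraction boils down to a small contract. This is a minimal sketch; the interface name is my assumption, while the TryLock signature matches the implementation shown later:

public interface IConcurrencyLockService
{
    //Returns true if the lock was acquired (or is already held with the same unlockKey),
    //false if a concurrent flow holds an unexpired lock under a different unlockKey
    Task<bool> TryLock(string lockKey, string unlockKey, TimeSpan? lockDuration = null);
}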
Now let’s see what this looks like, starting with the implementation of the PurchaseItem and DeactivateItem flows:
public async Task DeactivateItem(Guid itemId)
{
    //Read open orders with the given item
    //If there are open orders, end the execution
    var lockKey = $"LockNoOrderWithInactiveItem|{itemId:N}";
    var unlockKey = $"DeactivateItem|{itemId:N}";
    var lockDuration = TimeSpan.FromSeconds(2);
    if (!await _concurrencyLockService.TryLock(lockKey, unlockKey, lockDuration))
    {
        _logger.LogWarning($"Cannot deactivate item {itemId} because of the NoOrderWithInactiveItem rule!");
        return;
    }

    //deactivate item
    _logger.LogInformation($"Item {itemId} deactivated.");
}
public async Task PurchaseItem(Guid itemId)
{
    //Read the item from DB
    //If the item is deactivated, end the execution
    var lockKey = $"LockNoOrderWithInactiveItem|{itemId:N}";
    var unlockKey = $"PurchaseItem|{itemId:N}";
    if (!await _concurrencyLockService.TryLock(lockKey, unlockKey))
    {
        _logger.LogWarning($"Cannot purchase item {itemId} because of the NoOrderWithInactiveItem rule!");
        return;
    }

    //purchase item
    _logger.LogInformation($"Item {itemId} purchased.");
}
Both methods first check the status of the item and its open orders against the database (the commented read/validation steps at the top of each method). If the state of the data meets the business rules related to inactive items and open orders, the execution continues by composing a lock key and an unlock key, and optionally specifying a lock duration. The lock duration should be kept short, but it must cover the execution of each flow (PurchaseItem/DeactivateItem) up to the point where the data is persisted in the tables used for those initial business rule checks.
Both flows must produce the same lockKey in order to prevent concurrently breaking the business rules. If the implementation ended here, the business rules would always be enforced; however, creating multiple orders with the same item in a short time would not be possible due to the locks, and that is not the desired behavior. Using an unlockKey solves the problem, as it allows flows that are not concurrent with each other to execute in parallel, as long as they produce the same unlock key.
This is why the PurchaseItem and DeactivateItem methods produce separate unlockKeys but an identical lockKey for a given item.
The keys are then passed to the concurrencyLockService via the TryLock calls, which return the status of the lock. True means the lock succeeded and the execution of the flow can continue.
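To illustrate the effect, consider this hypothetical demo (not part of the original flows), with itemId referring to an active item:

//Two purchases of the same item produce the same lockKey AND the same unlockKey,
//so they can run in parallel without blocking each other
await Task.WhenAll(PurchaseItem(itemId), PurchaseItem(itemId));

//Deactivation produces the same lockKey but a different unlockKey, so while a
//purchase lock is unexpired, TryLock returns false and the deactivation is rejected
await DeactivateItem(itemId);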
The implementation of the TryLock method of the ConcurrencyLockService, backed by Azure Table Storage, looks like this:
public async Task<bool> TryLock(string lockKey, string unlockKey, TimeSpan? lockDuration = null)
{
    lockDuration ??= DefaultLockDuration;
    var lockSucceeded = await _concurrencyLockRepository.AddLock(lockKey, unlockKey, DateTime.UtcNow + lockDuration.Value);
    if (!lockSucceeded)
        _logger.LogWarning($"Concurrency lock failed for lockKey {lockKey}!");
    return lockSucceeded;
}
And the AddLock method of the ConcurrencyLockRepository:
public async Task<bool> AddLock(string lockKey, string unlockKey, DateTime lockExpiration)
{
    //Retrieve the existing lock (if any) for the given lockKey
    var locks = await _tableClient.QueryAsync<ConcurrencyLock>(p => p.PartitionKey == "Lock" && p.RowKey == lockKey).ToListAsync();
    var concurrencyLock = locks.FirstOrDefault();

    //An unexpired lock held under a different unlockKey blocks this flow
    if (concurrencyLock?.LockExpiration > DateTime.UtcNow && concurrencyLock.UnlockKey != unlockKey)
        return false;

    try
    {
        if (concurrencyLock != null)
        {
            //Take over the expired (or same-unlockKey) lock
            concurrencyLock.LockExpiration = lockExpiration;
            concurrencyLock.UnlockKey = unlockKey;
            await _tableClient.UpdateEntityAsync(concurrencyLock, concurrencyLock.ETag);
        }
        else
        {
            concurrencyLock = new ConcurrencyLock { PartitionKey = "Lock", RowKey = lockKey, LockExpiration = lockExpiration, UnlockKey = unlockKey };
            await _tableClient.AddEntityAsync(concurrencyLock);
        }
        return true;
    }
    catch (RequestFailedException ex) when (ex.Status == 409 || ex.Status == 412)
    {
        //Another flow changed the lock in the meantime, retry with the latest state
        return await AddLock(lockKey, unlockKey, lockExpiration);
    }
}
Above, the existing locks are first retrieved from the storage table, filtered by the PartitionKey and by the lockKey as RowKey. The code then checks whether there is a lock with the same lockKey, a LockExpiration in the future and a different unlockKey; if there is, the lock will not take place and false is returned.
When adding a new lock, an exception of type RequestFailedException with status 409 is thrown if another lock has been created in the meantime.
Similarly, updating an existing lock can result in a RequestFailedException with status code 412, if an update has been made to the same record in the meantime, resulting in a different ETag.
This is actually an example of optimistic concurrency, as the execution runs up until the point where a conflict is detected at the storage level. In both cases the AddLock method is invoked again, to re-execute the locking logic using the most recent state of the ConcurrencyLock data.
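For completeness, the ConcurrencyLock entity used above could look like this minimal sketch based on the Azure.Data.Tables ITableEntity contract; this is an assumption on my part, the actual definition is in the GitHub repository:

public class ConcurrencyLock : ITableEntity
{
    public string PartitionKey { get; set; }  //always "Lock" in this design
    public string RowKey { get; set; }        //the lockKey
    public DateTimeOffset? Timestamp { get; set; }
    public ETag ETag { get; set; }            //used for the optimistic update in AddLock

    public DateTime LockExpiration { get; set; }
    public string UnlockKey { get; set; }
}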
At this point there might be some confusion about which concurrency pattern is used to solve the issue. At the beginning I mentioned that locking data before executing logic is the characteristic of the Pessimistic concurrency pattern, while in the AddLock method of the ConcurrencyLockRepository the code executes up until an exception occurs and compensating logic takes place, which describes the Optimistic concurrency approach.
Now you may ask which concurrency pattern is used to implement the business rules described at the beginning of this article. It depends on the level of abstraction at which you look at the problem.
At the process level, calling the TryLock method before executing the deactivation or purchase flow is definitely an implementation of pessimistic concurrency.
At the data access level, however, the Optimistic concurrency pattern is used, but this is due to the implementation details of the repository itself and can vary from one implementation to another.
At the system level, thanks to our abstraction of the locking mechanism, the solution is still characterized by the positive and negative aspects of Pessimistic concurrency, and that’s what matters.
In order to reduce the performance impact of locking, a good solution is to split a process into a chain of smaller sub-processes, which fits excellently with Event Driven Architecture, since each sub-process then only has to hold a lock for its own short execution, as sketched below.
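A hypothetical sketch of such a sub-process; the OrderItemAdded event and the handler are illustrative assumptions, not part of the repository:

public async Task Handle(OrderItemAdded evt)
{
    var lockKey = $"LockNoOrderWithInactiveItem|{evt.ItemId:N}";
    var unlockKey = $"ReserveInventory|{evt.ItemId:N}";

    //The lock only needs to cover this short reservation step, not the entire
    //order process, so the lock duration (and the contention window) stays small
    if (!await _concurrencyLockService.TryLock(lockKey, unlockKey, TimeSpan.FromSeconds(2)))
        return;

    //reserve inventory, then publish the next event in the chain (e.g. InventoryReserved)
}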
Another improvement to the TryLock and AddLock methods is to accept a list of lock and unlock keys and to persist them in transactions (this is supported by the Table API).
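A sketch of what such an overload could look like at the repository level (my assumption of a possible shape): it relies on the fact that all locks share the “Lock” partition, which Azure Table Storage transactions require, with a limit of 100 entities per batch:

public async Task<bool> AddLocks(IReadOnlyList<string> lockKeys, string unlockKey, DateTime lockExpiration)
{
    //One TableTransactionAction per lock; Add fails for the whole batch if any lock already exists.
    //For brevity this sketch omits the expired-lock takeover logic of AddLock.
    var actions = lockKeys
        .Select(lockKey => new ConcurrencyLock { PartitionKey = "Lock", RowKey = lockKey, LockExpiration = lockExpiration, UnlockKey = unlockKey })
        .Select(entity => new TableTransactionAction(TableTransactionActionType.Add, entity))
        .ToList();
    try
    {
        //All locks are persisted atomically: either every lock is taken or none is
        await _tableClient.SubmitTransactionAsync(actions);
        return true;
    }
    catch (RequestFailedException)
    {
        //At least one lock could not be taken, so none of them was persisted
        return false;
    }
}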
I hope that with the above example I managed to highlight both the challenges of handling concurrent processes in distributed systems such as microservices and a solution: implementing the Pessimistic concurrency strategy with a simple abstraction, based on lock and unlock key(s), that makes sure that, out of multiple concurrent processes, only one will pass a certain decision point at a time, while non-concurrent processes are still able to run in parallel.