ACID Transactions #
ACID transactions were added to the platform based on a design document. This document describes the current implementation.
Setting up transaction support #
In order to use ACID transactions, a few modifications to the configuration file passed to the Repository are needed.
Marking an attribute as ACID #
```
entities {
  gateway: {
    attributes: {
      status: {
        scheme: bytes
        transactional: all
        manager: gateway-transaction-commit-log
      }
      ...
    }
  }
}

attributeFamilies {
  gateway-transaction-commit-log {
    entity: _transaction
    attributes: [ "request.*", "response.*", "state" ]
    storage: "kafka://FIXME/topic"
    type: primary
    access: [ commit-log, state-commit-log, cached-view ]
  }
  transactions-commit {
    entity: _transaction
    attributes: [ "commit" ]
    storage: "kafka://FIXME/transactions-commit"
    type: primary
    access: commit-log
  }
}
```
An attribute needs to get a `transactional` field, which currently should be set to `all`. This might be extended in the future; for now it implies global ACID consistency of the attribute. The other option, `manager`, defines the configuration of the transaction manager, namely up to three attribute families for the attributes `request.*`, `response.*` and `state` of the entity `_transaction` (defined in `gateway-transaction-commit-log` above). The attribute family for `_transaction.state` MUST have `access` set to `[ commit-log, state-commit-log, cached-view ]`. There also MUST be a single attribute family (a commit-log) for the attribute `commit` of `_transaction`; this is the family `transactions-commit` above.
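To sanity-check such a configuration, it can be loaded into a `Repository` and the resulting attribute descriptor inspected. Below is a minimal sketch, assuming the configuration above is stored in a resource named `transactions.conf` (illustrative name) and that the descriptor exposes its transaction mode via a `getTransactionMode()` accessor; package names may differ between Proxima versions:

```java
import com.typesafe.config.ConfigFactory;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;

public class VerifyTransactionalAttribute {

  public static void main(String[] args) {
    // load the configuration sketched above (resource name is illustrative)
    Repository repo =
        Repository.ofTest(ConfigFactory.parseResources("transactions.conf").resolve());
    EntityDescriptor gateway = repo.getEntity("gateway");
    AttributeDescriptor<?> status = gateway.getAttribute("status");
    // 'transactional: all' should be reflected in the descriptor; the exact
    // accessor is an assumption based on the configuration semantics
    System.out.println("transaction mode of status: " + status.getTransactionMode());
  }
}
```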
A more scalable definition will define a separate family for each purpose, e.g.:
```
entities {
  gateway {
    attributes {
      "user.*": {
        scheme: bytes
        transactional: all
        manager: [ all-transaction-commit-log-request, all-transaction-commit-log-response, all-transaction-commit-log-state ]
      }
      ...
    }
  }
}

attributeFamilies {
  all-transaction-commit-log-request {
    entity: _transaction
    attributes: [ "request.*" ]
    storage: "kafka://BROKERS/request-topic"
  }
  all-transaction-commit-log-response {
    entity: _transaction
    attributes: [ "response.*" ]
    storage: "kafka://BROKERS/response-topic"
  }
  all-transaction-commit-log-state {
    entity: _transaction
    attributes: [ "state" ]
    storage: "inmem://BROKERS/state-topic"
    # access will be derived automatically
  }
  transactions-commit {
    entity: _transaction
    attributes: [ "commit" ]
    storage: "kafka://BROKERS/transactions-commit"
    type: primary
    access: commit-log
  }
}
```
Once this definition is in place, Proxima will use the workflow described below to ensure ACID guarantees on reads and writes of the affected attributes.
Using the transaction API #
Currently, ACID transactions are supported only when using the `DirectDataOperator` (either directly, or via wrappers, e.g. in `BeamDataOperator` with IO provided by direct).
The API is based on a `Transaction` object, which provides all the necessary communication between the transaction manager and other parts of the system. Let's look at a simple piece of code that executes a transaction:
```java
// create Repository from test resource
Repository repo =
    Repository.ofTest(ConfigFactory.parseResources("test-transactions.conf").resolve());
// create DirectDataOperator
DirectDataOperator direct = repo.getOrCreateOperator(DirectDataOperator.class);
// retrieve some entity and some attributes
// this would be done via compiled model in production environment
EntityDescriptor gateway = repo.getEntity("gateway");
// retrieve two fields
Regular<Integer> intField = Regular.of(gateway, gateway.getAttribute("intField"));
Wildcard<?> device = Wildcard.of(gateway, gateway.getAttribute("device.*"));
Optional<OnlineAttributeWriter> maybeWriter = direct.getWriter(intField);
Optional<RandomAccessReader> maybeReader = direct.getRandomAccess(device);
// sanity check
Preconditions.checkArgument(maybeWriter.isPresent());
Preconditions.checkArgument(maybeReader.isPresent());
// get reader and writer
OnlineAttributeWriter writer = maybeWriter.get();
RandomAccessReader reader = maybeReader.get();
// we use transactions, so we get transactional writer
Preconditions.checkState(writer.isTransactional());
String gatewayId = "gw1";
// need to loop, so we can restart the transaction in case
// of detected inconsistency
while (true) {
  // create transaction
  try (Transaction transaction = writer.transactional().begin()) {
    // read devices associated with gateway and store them to list
    List<KeyValue<?>> kvs = new ArrayList<>();
    reader.scanWildcard(gatewayId, device, kvs::add);
    // notify the transaction manager of what we've read
    transaction.update(
        KeyAttributes.ofWildcardQueryElements(gateway, gatewayId, device, kvs));
    // write number of devices to the 'intField'
    StreamElement upsert =
        intField.upsert(
            gatewayId, 1L /* will be replaced by the transaction coordinator */, kvs.size());
    // commit and wait for confirmation
    BlockingQueue<Pair<Boolean, Throwable>> result = new ArrayBlockingQueue<>(1);
    transaction.commitWrite(
        Collections.singletonList(upsert),
        (succ, exc) -> result.offer(Pair.of(succ, exc)));
    // synchronous waiting; try to avoid this in production and process results
    // asynchronously
    Pair<Boolean, Throwable> taken = ExceptionUtils.uncheckedFactory(result::take);
    if (!taken.getFirst()) {
      // some error occurred
      if (taken.getSecond() instanceof TransactionRejectedException) {
        // transaction was rejected, need to restart it
        continue;
      }
      // some other error
      throw new IllegalStateException(taken.getSecond());
    }
    // success
    break;
  }
}
```
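The example above blocks on an `ArrayBlockingQueue` for brevity. In production the confirmation is better processed asynchronously; one possible sketch bridges the `commitWrite` callback from the example above to a plain JDK `java.util.concurrent.CompletableFuture`:

```java
// bridge the asynchronous commit confirmation to a CompletableFuture
CompletableFuture<Void> commitResult = new CompletableFuture<>();
transaction.commitWrite(
    Collections.singletonList(upsert),
    (succ, exc) -> {
      if (succ) {
        commitResult.complete(null);
      } else {
        commitResult.completeExceptionally(exc);
      }
    });
// react to the outcome without blocking the calling thread
commitResult.whenComplete(
    (ignored, exc) -> {
      if (exc == null) {
        // committed successfully
      } else if (exc instanceof TransactionRejectedException) {
        // rejected: schedule a restart of the whole transaction
      } else {
        // unrecoverable error: log and escalate
      }
    });
```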
Transaction workflow #
The current architecture is illustrated in the following figure:
The transaction support needs the definition of four commit logs:
- `request`
- `response`
- `commit`
- `state`
Proxima provides a serializable isolation level for transactions by assigning each transaction an incremental sequential ID. This ID is assigned automatically to all writes in the same transaction and is subsequently used to check whether a concurrently executed transaction has conflicts (and needs to be rejected) or not. For example, if transaction T2 reads data that a concurrent transaction T1 modifies and commits before T2 itself commits, T2 is rejected and must be restarted.
The `TransactionalOnlineAttributeWriter` creates a transaction after the first call to `#update`. This results in sending a `Request` to the transaction manager, notifying it about a new transaction with a set of inputs. The manager updates the state of the transaction and returns a response.
To ensure eventual consistency of the process (which is essential for keeping the ACID guarantees even in case of failures), the manager must return responses via the replication controller. This ensures consistency between the returned response and the state of the transaction.
After the client reads all the data (calling the `#update` method for each read or set of reads), it computes outputs (according to its business logic) and calls `#commitWrite`.
The write is (asynchronously) confirmed either as successful, or with a `Throwable` instance that was caught during processing. The important one is `TransactionRejectedException`, which signals that the transaction ran on data that violated the ACID properties and needs to be re-executed.
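A common client-side pattern is therefore a bounded retry loop around the whole transaction body, instead of the potentially unbounded `while (true)` used in the example above. A minimal sketch reusing only the API shown earlier (`outputs` stands for the `List<StreamElement>` computed by the business logic, and the retry limit is illustrative):

```java
// restart the transaction on rejection, but give up after a few attempts
int maxAttempts = 5; // illustrative limit
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
  try (Transaction transaction = writer.transactional().begin()) {
    // ... reads, transaction.update(...), computation of 'outputs' ...
    BlockingQueue<Pair<Boolean, Throwable>> result = new ArrayBlockingQueue<>(1);
    transaction.commitWrite(outputs, (succ, exc) -> result.offer(Pair.of(succ, exc)));
    Pair<Boolean, Throwable> taken = ExceptionUtils.uncheckedFactory(result::take);
    if (taken.getFirst()) {
      break; // committed successfully
    }
    if (!(taken.getSecond() instanceof TransactionRejectedException)) {
      // unrecoverable error
      throw new IllegalStateException(taken.getSecond());
    }
    // rejected: fall through and restart with a fresh transaction
  }
}
```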
Runtime components #
As mentioned above, the process of ensuring ACID transactions consists of two runtime components:
- transaction manager, which is generally a streaming data pipeline transforming `Request`s into `Commit`s written to the `_transaction.commit` attribute
- replication controller, which accepts a special transformation converting each `Commit` into outputs, responses and state updates in an eventually consistent manner
Currently, both of these parts have implementations in the direct module: the former in `TransactionManagerServer` and the latter in `ReplicationController`.
The direct version of the transaction manager (`TransactionManagerServer`) works by using a `CommitLogReader` to read `Request`s and, by the nature of the `DirectDataOperator`, is a single process. It is possible to have multiple instances of the server, but at any single moment only one is active and all others are spare instances waiting for failover. In the future, more scalable versions of the manager should be implemented (e.g. using the `BeamDataOperator`).
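For completeness, a sketch of how such a standalone manager process might be bootstrapped is below. Only the class name `TransactionManagerServer` comes from the text above; its package, factory method and lifecycle calls are assumptions and should be checked against the direct module of the Proxima version in use:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class TransactionManagerLauncher {

  public static void main(String[] args) {
    // load the same configuration that declares the _transaction families
    Config config = ConfigFactory.load().resolve();
    // hypothetical entry point; consult TransactionManagerServer in the
    // direct module for the actual factory method and run/stop lifecycle:
    // TransactionManagerServer server = TransactionManagerServer.of(config);
    // server.run();
  }
}
```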