Beam data operator #
BeamDataOperator is a data operator that exposes data from the data model via Apache Beam.
Instantiate the operator #
The operator can be instantiated by adding the proxima-beam-core dependency and using:
Repository repo = Repository.of(ConfigFactory.load("test-readme.conf").resolve());
BeamDataOperator beam = repo.getOrCreateOperator(BeamDataOperator.class);
If we also compile our data model, we can create the wrapping class via:
Model model = Model.wrap(repo);
Once the operator is instantiated, we can use it for both reading and writing data to the platform.
Writing Pipelines using BeamDataOperator #
Apache Beam is a data processing API built around the concept of a Pipeline. BeamDataOperator acts on this Pipeline to create PCollections:
Pipeline p = Pipeline.create();
PCollection<StreamElement> events =
    beam.getStream(
        p,
        Position.OLDEST,
        false, /* stopAtCurrent */
        true, /* useEventTime */
        model.getEvent().getDataDescriptor());
PCollection<BaseEvent> parsed =
    events.apply(
        FlatMapElements.into(TypeDescriptor.of(BaseEvent.class))
            .via(
                e ->
                    model.getEvent().getDataDescriptor().valueOf(e).stream()
                        .collect(Collectors.toList())));
// further processing
// ...
// run the Pipeline
p.run().waitUntilFinish();
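The FlatMapElements step above emits zero or one parsed value per input element: judging from the call chain, valueOf appears to return an Optional, so elements whose payload cannot be parsed simply produce an empty list. The plain-Java sketch below illustrates that pattern without Beam or Proxima on the classpath. The class OptionalFlatMapSketch and its parse method are hypothetical stand-ins for illustration and are not part of either API.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class OptionalFlatMapSketch {

  // Hypothetical stand-in for valueOf(...): empty when the payload cannot be parsed.
  static Optional<Integer> parse(String raw) {
    try {
      return Optional.of(Integer.parseInt(raw.trim()));
    } catch (NumberFormatException ex) {
      return Optional.empty();
    }
  }

  // Mirrors the lambda passed to FlatMapElements: zero or one output per input.
  static List<Integer> toOutputs(String raw) {
    return parse(raw).stream().collect(Collectors.toList());
  }

  // Applies the per-element function to the whole input, as a flat-map would.
  static List<Integer> parseAll(List<String> inputs) {
    return inputs.stream()
        .flatMap(raw -> toOutputs(raw).stream())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // The unparsable element is dropped instead of failing the whole run.
    System.out.println(parseAll(List.of("1", "oops", " 3 "))); // prints [1, 3]
  }
}
```

Dropping unparsable elements keeps the pipeline running on malformed input, at the cost of hiding it; counting or logging the empty results is one way to keep such input visible.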
ProximaIO for data sink #
When writing data to the platform, we first need to convert the data to a PCollection<StreamElement> and then apply ProximaIO to that PCollection to store the data.
Pipeline p = Pipeline.create();
PCollection<StreamElement> elements =
    beam.getStream(
        "InputEvents",
        p,
        Position.CURRENT,
        /* stopAtCurrent */ false,
        /* useEventTime */ true,
        model.getEvent().getDataDescriptor());
// implement actual transformation logic here
// convert the outputs to StreamElements
PCollection<StreamElement> outputs =
    elements
        .apply(
            FlatMapElements.into(TypeDescriptor.of(BaseEvent.class))
                .via(
                    e ->
                        model.getEvent().getDataDescriptor().valueOf(e).stream()
                            .collect(Collectors.toList())))
        .apply(Filter.by(e -> e.getAction().equals(Action.BUY)))
        .apply(new ProcessUserBuys());
// store the result
outputs.apply(ProximaIO.write(repo.asFactory()));
// run the Pipeline
p.run().waitUntilFinish();
The class ProcessUserBuys is a PTransform that processes the input events and produces the outputs. It carries the user logic.
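What ProcessUserBuys actually computes is not shown here. As a purely illustrative assumption, suppose it counts buy events per user. The filter-then-aggregate logic could then be prototyped in plain Java before being wrapped in a PTransform. Everything in the sketch below (the class UserBuysSketch, the Event type, the VIEW action and the counting logic) is hypothetical and not taken from the Proxima or Beam APIs.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class UserBuysSketch {

  // BUY appears in the pipeline above; VIEW is a made-up second action.
  enum Action { VIEW, BUY }

  // Hypothetical event type; the real BaseEvent is generated from the data model.
  static final class Event {
    final String user;
    final Action action;

    Event(String user, Action action) {
      this.user = user;
      this.action = action;
    }
  }

  // Keeps only BUY events and counts them per user (assumed logic, illustration only).
  static Map<String, Long> countBuysPerUser(List<Event> events) {
    return events.stream()
        .filter(e -> e.action == Action.BUY)
        .collect(Collectors.groupingBy(e -> e.user, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    List<Event> events = List.of(
        new Event("user1", Action.BUY),
        new Event("user2", Action.VIEW),
        new Event("user1", Action.BUY));
    System.out.println(countBuysPerUser(events)); // prints {user1=2}
  }
}
```

Keeping the user logic in a small pure function like this makes it unit-testable without a runner; the PTransform then only has to adapt it to PCollections and convert the results back to StreamElements.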
Usage of DirectDataOperator in BeamDataOperator #
BeamDataOperator has two options for reading data from the platform:
- implement its own IO module (e.g. proxima-beam-io-pubsub), or
- use DirectDataOperator for the actual data access
The latter option is more common, though the former might perform slightly better. There are not many native IOs for Proxima, though, so most Pipelines use the proxima-direct-core module to do the actual reading. This approach also guarantees the same reading semantics in both operators, because one makes use of the other.
End-to-end testing of Pipelines #
When using an in-process runner (e.g. DirectRunner), one can test a Pipeline from end to end using the following approach:
Pipeline p = Pipeline.create();
// 'direct' is assumed to be a DirectDataOperator created from the same Repository
OnlineAttributeWriter eventWriter =
    Optionals.get(direct.getWriter(model.getEvent().getDataDescriptor()));
long now = System.currentTimeMillis();
// writeEvent is a helper (not shown) that writes one event for the given user and timestamp
writeEvent("user1", now, eventWriter);
writeEvent("user2", now, eventWriter);
writeEvent("user1", now + 1, eventWriter);
PCollection<StreamElement> inputs =
    beam.getStream(
        p,
        Position.OLDEST,
        /* stopAtCurrent */ true,
        /* useEventTime */ true,
        model.getEvent().getDataDescriptor());
// apply transformation of inputs to output StreamElements
PCollection<StreamElement> outputs = inputs.apply(new Transformation(model));
// store results
outputs.apply(ProximaIO.write(repo.asFactory()));
// run and wait
p.run().waitUntilFinish();
// when the Pipeline finishes, we should have writes in 'user1' and 'user2'
// verify results
RandomAccessReader reader =
    Optionals.get(direct.getRandomAccess(model.getUser().getPreferencesDescriptor()));
List<StreamElement> read = new ArrayList<>();
Optional<KeyValue<UserPreferences>> user1Preferences =
    reader.get("user1", model.getUser().getPreferencesDescriptor());
Optional<KeyValue<UserPreferences>> user2Preferences =
    reader.get("user2", model.getUser().getPreferencesDescriptor());
// we stored both the preferences
assertTrue(user1Preferences.isPresent());
assertTrue(user2Preferences.isPresent());