Package cz.o2.proxima.tools.groovy
Interface Stream<T>
-
- All Known Subinterfaces:
WindowedStream<T>
@CompileStatic public interface Stream<T>
A stream abstraction with fluent style methods.
-
-
Method Summary
default Stream<T> assignEventTime(groovy.lang.Closure<java.lang.Long> assigner)
Assign event time to elements.

Stream<T> assignEventTime(java.lang.String name, groovy.lang.Closure<java.lang.Long> assigner)
Assign event time to elements.

<V> Stream<StreamElement> asStreamElements(RepositoryProvider repoProvider, EntityDescriptor entity, groovy.lang.Closure<java.lang.CharSequence> keyExtractor, groovy.lang.Closure<java.lang.CharSequence> attributeExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<java.lang.Long> timeExtractor)
Convert elements to StreamElements.

default Stream<T> asUnbounded()
Process this stream as if it were an unbounded stream.

WindowedStream<T> calendarWindow(java.lang.String window, int count, java.util.TimeZone timeZone)
Create a calendar-based windowed stream.

java.util.List<T> collect()
Collect the stream as a list.

default Stream<T> filter(groovy.lang.Closure<java.lang.Boolean> predicate)
Filter the stream based on a predicate.

Stream<T> filter(java.lang.String name, groovy.lang.Closure<java.lang.Boolean> predicate)
Filter the stream based on a predicate.

default <X> Stream<X> flatMap(groovy.lang.Closure<java.lang.Iterable<X>> mapper)
Remap the stream.

<X> Stream<X> flatMap(java.lang.String name, groovy.lang.Closure<java.lang.Iterable<X>> mapper)
Remap the stream.

default <K,V> Stream<Pair<K,V>> integratePerKey(groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<V> initialValue, groovy.lang.Closure<V> combiner)
Transform this stream into another stream by applying a combining transform in the global window, emitting a result after each element is added.

<K,V> Stream<Pair<K,V>> integratePerKey(java.lang.String name, groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<V> initialValue, groovy.lang.Closure<V> combiner)
Transform this stream into another stream by applying a combining transform in the global window, emitting a result after each element is added.

boolean isBounded()
Test if this is a bounded stream.

default <X> Stream<X> map(groovy.lang.Closure<X> mapper)
Remap the stream.

<X> Stream<X> map(java.lang.String name, groovy.lang.Closure<X> mapper)
Remap the stream.

<V> void persist(RepositoryProvider repoProvider, EntityDescriptor entity, groovy.lang.Closure<java.lang.CharSequence> keyExtractor, groovy.lang.Closure<java.lang.CharSequence> attributeExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<java.lang.Long> timeExtractor)
Persist this stream as an attribute of an entity.

default void persistIntoTargetFamily(RepositoryProvider repoProvider, java.lang.String targetFamilyname)
Persist this stream to a specific family.

void persistIntoTargetFamily(RepositoryProvider repoProvider, java.lang.String targetFamilyname, int parallelism)
Persist this stream to a specific family.

void persistIntoTargetReplica(RepositoryProvider repoProvider, java.lang.String replicationName, java.lang.String target)
Persist this stream to replication.

void print()
Print all elements to console.

default <K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKey(groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate)
Transform this stream using stateful processing.

default <K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKey(java.lang.String name, groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate)
Transform this stream using stateful processing.

<K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKey(java.lang.String name, groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate, boolean sorted)
Transform this stream using stateful processing.

default <K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKeyUnsorted(groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate)
Transform this stream using stateful processing without time-sorting.

default <K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKeyUnsorted(java.lang.String name, groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate)
Transform this stream using stateful processing without time-sorting.

default Stream<T> reshuffle()
Reshuffle the stream via a random key.

Stream<T> reshuffle(java.lang.String name)
Reshuffle the stream via a random key.

<K> WindowedStream<Pair<K,T>> sessionWindow(groovy.lang.Closure<K> keyExtractor, long gapDuration)
Create a session windowed stream.

WindowedStream<T> timeSlidingWindow(long millis, long slide)
Create a sliding time windowed stream.

WindowedStream<T> timeWindow(long millis)
Create a time windowed stream.

default Stream<T> union(Stream<T> other)
Merge two streams together.

default Stream<T> union(java.lang.String name, Stream<T> other)
Merge two streams together.

Stream<T> union(java.lang.String name, java.util.List<Stream<T>> streams)
Merge multiple streams together.

default Stream<T> union(java.util.List<Stream<T>> streams)
Merge multiple streams together.

WindowedStream<T> windowAll()
Group all elements into a single window.

default Stream<Pair<T,java.lang.Long>> withTimestamp()
Add a timestamp to each element in the stream.

Stream<Pair<T,java.lang.Long>> withTimestamp(java.lang.String name)
Add a timestamp to each element in the stream.

default Stream<Pair<java.lang.Object,T>> withWindow()
Add the window to each element in the stream.

Stream<Pair<java.lang.Object,T>> withWindow(java.lang.String name)
Add the window to each element in the stream.

void write(RepositoryProvider repoProvider)
Directly write this stream to the repository.
-
-
-
Method Detail
-
flatMap
default <X> Stream<X> flatMap(groovy.lang.Closure<java.lang.Iterable<X>> mapper)
Remap the stream by mapping each element to zero or more output elements.
Type Parameters:
X - type of the output elements
Parameters:
mapper - mapper returning an iterable of values to be flattened into the output
Returns:
the remapped stream
-
flatMap
<X> Stream<X> flatMap(@Nullable java.lang.String name, groovy.lang.Closure<java.lang.Iterable<X>> mapper)
Remap the stream by mapping each element to zero or more output elements.
Type Parameters:
X - type of the output elements
Parameters:
name - name of the operation
mapper - mapper returning an iterable of values to be flattened into the output
Returns:
the remapped stream
-
map
default <X> Stream<X> map(groovy.lang.Closure<X> mapper)
Remap the stream by applying the mapping closure to each element.
Type Parameters:
X - type of the output elements
Parameters:
mapper - the mapping closure
Returns:
the remapped stream
-
map
<X> Stream<X> map(@Nullable java.lang.String name, groovy.lang.Closure<X> mapper)
Remap the stream by applying the mapping closure to each element.
Type Parameters:
X - type of the output elements
Parameters:
name - stable name of the mapping operator
mapper - the mapping closure
Returns:
the remapped stream
-
filter
default Stream<T> filter(groovy.lang.Closure<java.lang.Boolean> predicate)
Filter the stream based on a predicate, keeping the elements for which it returns true.
Parameters:
predicate - the predicate to filter on
Returns:
the filtered stream
-
filter
Stream<T> filter(@Nullable java.lang.String name, groovy.lang.Closure<java.lang.Boolean> predicate)
Filter the stream based on a predicate, keeping the elements for which it returns true.
Parameters:
name - name of the filter operator
predicate - the predicate to filter on
Returns:
the filtered stream
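The element-wise operations above (map, flatMap, filter) follow the usual dataflow shapes. As a rough analogy only, the sketch below reproduces those shapes on an in-memory list with java.util.stream; it does not use the Proxima Groovy API, and the class ElementWiseSketch and its helper words are hypothetical. map yields exactly one output per input, flatMap yields zero or more outputs that are flattened, and filter keeps the elements whose predicate is true.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical in-memory analogy of map / flatMap / filter.
// java.util.stream stands in for the underlying runner; this is NOT the Proxima API.
final class ElementWiseSketch {

  // map: exactly one output element per input element
  static <T, X> List<X> map(List<T> input, Function<T, X> mapper) {
    return input.stream().map(mapper).collect(Collectors.toList());
  }

  // flatMap: the mapper returns an Iterable whose items are flattened into the output
  static <T, X> List<X> flatMap(List<T> input, Function<T, Iterable<X>> mapper) {
    return input.stream()
        .flatMap(e -> StreamSupport.stream(mapper.apply(e).spliterator(), false))
        .collect(Collectors.toList());
  }

  // filter: keep only the elements for which the predicate returns true
  static <T> List<T> filter(List<T> input, Predicate<T> predicate) {
    return input.stream().filter(predicate).collect(Collectors.toList());
  }

  // example flatMap mapper: split a line into words, yielding nothing for an empty line
  static List<String> words(String line) {
    List<String> out = new ArrayList<>();
    for (String w : line.split(" ")) {
      if (!w.isEmpty()) {
        out.add(w);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("a b", "", "c");
    System.out.println(map(lines, String::length));               // [3, 0, 1]
    System.out.println(flatMap(lines, ElementWiseSketch::words)); // [a, b, c]
    System.out.println(filter(lines, l -> !l.isEmpty()));         // [a b, c]
  }
}
```

Note how the empty line produces one output under map (length 0) but none under flatMap, which is the practical difference between the two.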
-
assignEventTime
default Stream<T> assignEventTime(groovy.lang.Closure<java.lang.Long> assigner)
Assign event time to elements.
Parameters:
assigner - assigner of event time
Returns:
stream with elements assigned event time
-
assignEventTime
Stream<T> assignEventTime(@Nullable java.lang.String name, groovy.lang.Closure<java.lang.Long> assigner)
Assign event time to elements.
Parameters:
name - name of the assign event time operator
assigner - assigner of event time
Returns:
stream with elements assigned event time
-
withWindow
default Stream<Pair<java.lang.Object,T>> withWindow()
Add the window to each element in the stream.
Returns:
stream of pairs (window, element)
-
withWindow
Stream<Pair<java.lang.Object,T>> withWindow(@Nullable java.lang.String name)
Add the window to each element in the stream.
Parameters:
name - stable name of the mapping operator
Returns:
stream of pairs (window, element)
-
withTimestamp
default Stream<Pair<T,java.lang.Long>> withTimestamp()
Add a timestamp to each element in the stream.
Returns:
stream of pairs (element, timestamp)
-
withTimestamp
Stream<Pair<T,java.lang.Long>> withTimestamp(@Nullable java.lang.String name)
Add a timestamp to each element in the stream.
Parameters:
name - stable name of the mapping operator
Returns:
stream of pairs (element, timestamp)
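To make the data shapes concrete: assignEventTime attaches an event time computed by the assigner closure, and withTimestamp exposes it as the second member of a Pair<T, Long>. The sketch below is a hypothetical in-memory model of those two steps; the classes TimestampSketch and Timestamped are illustrative only, and AbstractMap.SimpleImmutableEntry stands in for Proxima's Pair.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical model: each element carries an event-time timestamp.
final class TimestampSketch {

  static final class Timestamped<T> {
    final T value;
    final long timestamp;

    Timestamped(T value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // Like assignEventTime: the assigner computes the event time from each element
  static <T> List<Timestamped<T>> assignEventTime(List<T> input, ToLongFunction<T> assigner) {
    List<Timestamped<T>> out = new ArrayList<>();
    for (T e : input) {
      out.add(new Timestamped<>(e, assigner.applyAsLong(e)));
    }
    return out;
  }

  // Like withTimestamp: pair each element with its timestamp as (element, timestamp)
  static <T> List<Map.Entry<T, Long>> withTimestamp(List<Timestamped<T>> input) {
    List<Map.Entry<T, Long>> out = new ArrayList<>();
    for (Timestamped<T> e : input) {
      out.add(new SimpleImmutableEntry<>(e.value, e.timestamp));
    }
    return out;
  }
}
```

For example, with an assigner that parses the number after `@`, the element `"click@100"` ends up paired with `100L`.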
-
print
void print()
Print all elements to console.
-
collect
java.util.List<T> collect()
Collect the stream as a list. Note that this will result in an OutOfMemoryError if this is an unbounded stream.
Returns:
the stream collected as a list
-
isBounded
boolean isBounded()
Test if this is a bounded stream.
Returns:
true if this is a bounded stream, false otherwise
-
asUnbounded
default Stream<T> asUnbounded()
Process this stream as if it were an unbounded stream. This is a no-op if isBounded() returns false; otherwise it turns the stream into being processed as unbounded, even though it is bounded. This is an optional operation and might be ignored if not supported by the underlying implementation.
Returns:
the stream viewed as an unbounded stream, if supported
-
asStreamElements
<V> Stream<StreamElement> asStreamElements(RepositoryProvider repoProvider, EntityDescriptor entity, groovy.lang.Closure<java.lang.CharSequence> keyExtractor, groovy.lang.Closure<java.lang.CharSequence> attributeExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<java.lang.Long> timeExtractor)
Convert elements to StreamElements.
Type Parameters:
V - type of value
Parameters:
repoProvider - provider of Repository
entity - the entity of elements
keyExtractor - extractor of keys
attributeExtractor - extractor of attributes
valueExtractor - extractor of values
timeExtractor - extractor of time
Returns:
stream with StreamElements inside
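The four extractor closures of asStreamElements each pull one part of the resulting element out of the input. The sketch below is a hypothetical in-memory model of that step only: ElementRecord is an illustrative stand-in, not Proxima's StreamElement, and the RepositoryProvider and EntityDescriptor arguments (which resolve the entity and attribute in the real repository) are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical model of applying key/attribute/value/time extractors to each element.
final class ExtractorSketch {

  // Illustrative stand-in for StreamElement: just the four extracted parts
  static final class ElementRecord<V> {
    final String key;
    final String attribute;
    final V value;
    final long stamp;

    ElementRecord(String key, String attribute, V value, long stamp) {
      this.key = key;
      this.attribute = attribute;
      this.value = value;
      this.stamp = stamp;
    }
  }

  static <T, V> List<ElementRecord<V>> asRecords(
      List<T> input,
      Function<T, CharSequence> keyExtractor,
      Function<T, CharSequence> attributeExtractor,
      Function<T, V> valueExtractor,
      ToLongFunction<T> timeExtractor) {
    List<ElementRecord<V>> out = new ArrayList<>();
    for (T e : input) {
      // every extractor is applied to the same input element
      out.add(new ElementRecord<>(
          keyExtractor.apply(e).toString(),
          attributeExtractor.apply(e).toString(),
          valueExtractor.apply(e),
          timeExtractor.applyAsLong(e)));
    }
    return out;
  }
}
```

With CSV-like input such as `"user1,name,Alice,1000"`, each extractor picks one column, producing key `user1`, attribute `name`, value `Alice` and time `1000`.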
-
persistIntoTargetReplica
void persistIntoTargetReplica(RepositoryProvider repoProvider, java.lang.String replicationName, java.lang.String target)
Persist this stream to replication.
Parameters:
repoProvider - provider of Repository
replicationName - name of the replication to persist the stream to
target - target of the replication
-
persistIntoTargetFamily
default void persistIntoTargetFamily(RepositoryProvider repoProvider, java.lang.String targetFamilyname)
Persist this stream to a specific family. Note that the stream must already contain StreamElements to be persisted to the specified family, and the family has to accept the AttributeDescriptor of each StreamElement.
Parameters:
repoProvider - provider of Repository
targetFamilyname - name of the target family to persist the stream into
-
persistIntoTargetFamily
void persistIntoTargetFamily(RepositoryProvider repoProvider, java.lang.String targetFamilyname, int parallelism)
Persist this stream to a specific family. Note that the stream must already contain StreamElements to be persisted to the specified family, and the family has to accept the AttributeDescriptor of each StreamElement.
Parameters:
repoProvider - provider of Repository
targetFamilyname - name of the target family to persist the stream into
parallelism - parallelism to use when the target family is a bulk attribute family
-
persist
<V> void persist(RepositoryProvider repoProvider, EntityDescriptor entity, groovy.lang.Closure<java.lang.CharSequence> keyExtractor, groovy.lang.Closure<java.lang.CharSequence> attributeExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<java.lang.Long> timeExtractor)
Persist this stream as an attribute of an entity.
Type Parameters:
V - type of the extracted value
Parameters:
repoProvider - provider of repository
entity - the entity to store the stream to
keyExtractor - extractor of key for elements
attributeExtractor - extractor of attribute for elements
valueExtractor - extractor of values for elements
timeExtractor - extractor of event time
-
write
void write(RepositoryProvider repoProvider)
Directly write this stream to the repository. Note that the stream has to contain StreamElements (e.g. created by asStreamElements(cz.o2.proxima.tools.groovy.RepositoryProvider, cz.o2.proxima.core.repository.EntityDescriptor, groovy.lang.Closure<java.lang.CharSequence>, groovy.lang.Closure<java.lang.CharSequence>, groovy.lang.Closure<V>, groovy.lang.Closure<java.lang.Long>)).
Parameters:
repoProvider - provider of repository
-
timeWindow
WindowedStream<T> timeWindow(long millis)
Create a time windowed stream using tumbling (fixed, non-overlapping) windows.
Parameters:
millis - duration of the tumbling window, in milliseconds
Returns:
time windowed stream
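Each element of a tumbling window stream belongs to exactly one window of length millis. The sketch below computes that window for a given timestamp, assuming windows are aligned to the epoch (timestamp 0), as in Apache Beam's FixedWindows without an offset; whether the Proxima runner uses the same alignment is an assumption, and TumblingWindowSketch is a hypothetical helper.

```java
// Hypothetical helper: the tumbling window [start, end) of length `millis` containing `ts`.
// Assumes windows are aligned to the epoch; not part of the Proxima API.
final class TumblingWindowSketch {

  static long windowStart(long ts, long millis) {
    // floorMod keeps negative timestamps in the correct (earlier) window
    return ts - Math.floorMod(ts, millis);
  }

  static long windowEnd(long ts, long millis) {
    return windowStart(ts, millis) + millis; // exclusive end
  }
}
```

For instance, with one-minute windows (`millis = 60_000`) an element at `125_000` falls into `[120_000, 180_000)`.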
-
timeSlidingWindow
WindowedStream<T> timeSlidingWindow(long millis, long slide)
Create a sliding time windowed stream.
Parameters:
millis - duration of the window, in milliseconds
slide - duration of the slide, in milliseconds
Returns:
sliding time windowed stream
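Unlike tumbling windows, sliding windows overlap, so one element can belong to several windows (millis / slide of them when the slide divides the window length). The hypothetical sketch below lists the start of every window containing a timestamp, assuming window starts are aligned to multiples of slide from the epoch, which mirrors Apache Beam's SlidingWindows but is not confirmed for this API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: starts of all windows [start, start + millis) that contain `ts`,
// with starts aligned to multiples of `slide` (assumed epoch alignment).
final class SlidingWindowSketch {

  static List<Long> windowStarts(long ts, long millis, long slide) {
    List<Long> starts = new ArrayList<>();
    // latest window start that is not after ts
    long lastStart = ts - Math.floorMod(ts, slide);
    // walk back one slide at a time while the window still covers ts
    for (long start = lastStart; start > ts - millis; start -= slide) {
      starts.add(start);
    }
    return starts;
  }
}
```

With a one-minute window sliding every 30 seconds, an element at `125_000` lands in the windows starting at `120_000` and `90_000`.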
-
sessionWindow
<K> WindowedStream<Pair<K,T>> sessionWindow(groovy.lang.Closure<K> keyExtractor, long gapDuration)
Create a session windowed stream.
Type Parameters:
K - type of key
Parameters:
keyExtractor - extractor of key
gapDuration - duration of the gap between elements per key
Returns:
session windowed stream
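A session window groups a key's elements until no new element arrives within gapDuration. The sketch below models this the way Apache Beam's Sessions do (each element opens [ts, ts + gap) and overlapping intervals merge); treating the Proxima runner as behaving identically is an assumption, and SessionWindowSketch is a hypothetical helper working on in-memory lists.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical model of per-key session windows; not the Proxima API.
final class SessionWindowSketch {

  // Merge one key's timestamps into sessions, each returned as {start, endExclusive}.
  static List<long[]> sessions(List<Long> timestamps, long gapDuration) {
    List<Long> sorted = new ArrayList<>(timestamps);
    Collections.sort(sorted);
    List<long[]> out = new ArrayList<>();
    long[] current = null;
    for (long ts : sorted) {
      if (current != null && ts < current[1]) {
        // ts falls inside the open session: extend it (input is sorted, so it only grows)
        current[1] = ts + gapDuration;
      } else {
        // first element or gap exceeded: open a new session [ts, ts + gap)
        current = new long[] {ts, ts + gapDuration};
        out.add(current);
      }
    }
    return out;
  }

  // Group elements by key, then compute sessions independently per key.
  static <T, K> Map<K, List<long[]>> sessionsByKey(
      List<T> input, Function<T, K> keyExtractor, ToLongFunction<T> timeOf, long gapDuration) {
    Map<K, List<Long>> perKey = new HashMap<>();
    for (T element : input) {
      perKey.computeIfAbsent(keyExtractor.apply(element), k -> new ArrayList<>())
          .add(timeOf.applyAsLong(element));
    }
    Map<K, List<long[]>> out = new HashMap<>();
    for (Map.Entry<K, List<Long>> entry : perKey.entrySet()) {
      out.put(entry.getKey(), sessions(entry.getValue(), gapDuration));
    }
    return out;
  }
}
```

With a gap of 30, key `a` with elements at 0, 20 and 100 forms two sessions, `[0, 50)` and `[100, 130)`, while key `b` is sessionized independently. This per-key grouping is why sessionWindow returns a stream of Pair<K,T>.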
-
calendarWindow
WindowedStream<T> calendarWindow(java.lang.String window, int count, java.util.TimeZone timeZone)
Create a calendar-based windowed stream.
Parameters:
window - the resolution of the calendar window ("days", "weeks", "months", "years")
count - number of days, weeks, months or years
timeZone - time zone of the calculation
Returns:
calendar windowed stream
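Calendar windows follow local calendar boundaries, so the time zone decides which day, week, month or year an instant falls into. The hypothetical sketch below computes the start of the single-unit window (count = 1) with java.time. Two assumptions are made: weeks start on Monday (ISO-8601), and alignment of multi-unit windows (count > 1) is out of scope because the API text does not specify it. CalendarWindowSketch is illustrative only.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAdjusters;
import java.util.TimeZone;

// Hypothetical helper: start of the count = 1 calendar window containing `instant`.
final class CalendarWindowSketch {

  static ZonedDateTime windowStart(Instant instant, String window, TimeZone timeZone) {
    // local midnight of the instant's day in the requested time zone
    ZonedDateTime day = instant.atZone(timeZone.toZoneId()).truncatedTo(ChronoUnit.DAYS);
    switch (window) {
      case "days":
        return day;
      case "weeks":
        // assumption: weeks start on Monday (ISO-8601)
        return day.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
      case "months":
        return day.withDayOfMonth(1);
      case "years":
        return day.withDayOfYear(1);
      default:
        throw new IllegalArgumentException("Unsupported window resolution: " + window);
    }
  }
}
```

For example, the instant `2024-03-15T23:30:00Z` is already 16 March in `Europe/Prague` (UTC+1 at that date), so its daily window starts at local midnight of 16 March, not 15 March as it would in UTC.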
-
windowAll
WindowedStream<T> windowAll()
Group all elements into a single window.
Returns:
globally windowed stream
-
union
default Stream<T> union(Stream<T> other)
Merge two streams together.
Parameters:
other - the other stream
Returns:
merged stream
-
union
default Stream<T> union(@Nullable java.lang.String name, Stream<T> other)
Merge two streams together.
Parameters:
name - name of the union operator
other - the other stream
Returns:
merged stream
-
union
default Stream<T> union(java.util.List<Stream<T>> streams)
Merge multiple streams together.
Parameters:
streams - other streams
Returns:
merged stream
-
union
Stream<T> union(@Nullable java.lang.String name, java.util.List<Stream<T>> streams)
Merge multiple streams together.
Parameters:
name - name of the union operator
streams - other streams
Returns:
merged stream
-
reduceValueStateByKey
default <K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKey(groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate)
Transform this stream using stateful processing.
Type Parameters:
K - type of key
S - type of value state
V - type of intermediate value
O - type of output value
Parameters:
keyExtractor - extractor of key
valueExtractor - extractor of value
initialState - closure providing initial state value for key
outputFn - function for outputting values (when the function returns null, the output is discarded)
stateUpdate - update (accumulation) function for the state
Returns:
the statefully reduced stream
-
reduceValueStateByKeyUnsorted
default <K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKeyUnsorted(groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate)
Transform this stream using stateful processing without time-sorting.
Type Parameters:
K - type of key
S - type of value state
V - type of intermediate value
O - type of output value
Parameters:
keyExtractor - extractor of key
valueExtractor - extractor of value
initialState - closure providing initial state value for key
outputFn - function for outputting values (when the function returns null, the output is discarded)
stateUpdate - update (accumulation) function for the state
Returns:
the statefully reduced stream
-
reduceValueStateByKey
default <K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKey(@Nullable java.lang.String name, groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate)
Transform this stream using stateful processing.
Type Parameters:
K - type of key
S - type of value state
V - type of intermediate value
O - type of output value
Parameters:
name - optional name of the stateful operation
keyExtractor - extractor of key
valueExtractor - extractor of value
initialState - closure providing initial state value for key
outputFn - function for outputting values (when the function returns null, the output is discarded)
stateUpdate - update (accumulation) function for the state
Returns:
the statefully reduced stream
-
reduceValueStateByKeyUnsorted
default <K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKeyUnsorted(@Nullable java.lang.String name, groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate)
Transform this stream using stateful processing without time-sorting.
Type Parameters:
K - type of key
S - type of value state
V - type of intermediate value
O - type of output value
Parameters:
name - optional name of the stateful operation
keyExtractor - extractor of key
valueExtractor - extractor of value
initialState - closure providing initial state value for key
outputFn - function for outputting values (when the function returns null, the output is discarded)
stateUpdate - update (accumulation) function for the state
Returns:
the statefully reduced stream
-
reduceValueStateByKey
<K,S,V,O> Stream<Pair<K,O>> reduceValueStateByKey(@Nullable java.lang.String name, groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<S> initialState, groovy.lang.Closure<O> outputFn, groovy.lang.Closure<S> stateUpdate, boolean sorted)
Transform this stream using stateful processing.
Type Parameters:
K - type of key
S - type of value state
V - type of intermediate value
O - type of output value
Parameters:
name - optional name of the stateful operation
keyExtractor - extractor of key
valueExtractor - extractor of value
initialState - closure providing initial state value for key
outputFn - function for outputting values (when the function returns null, the output is discarded)
stateUpdate - update (accumulation) function for the state
sorted - true if the input to the state update function should be time-sorted
Returns:
the statefully reduced stream
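The reduceValueStateByKey variants keep one state value per key, optionally feeding elements in event-time order. The single-machine sketch below illustrates that idea and why the sorted flag matters. Its argument shapes are assumptions, not taken from the Proxima source: initialState receives the key, outputFn sees the state before the update together with the current value, and stateUpdate receives (state, value). ValueStateSketch is hypothetical, and java.util.function types stand in for Groovy closures.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical in-memory model of reduceValueStateByKey; closure shapes are assumed.
final class ValueStateSketch {

  static <T, K, S, V, O> List<Map.Entry<K, O>> reduceValueStateByKey(
      List<T> input,
      ToLongFunction<T> timestampOf,
      Function<T, K> keyExtractor,
      Function<T, V> valueExtractor,
      Function<K, S> initialState,
      BiFunction<S, V, O> outputFn,
      BiFunction<S, V, S> stateUpdate,
      boolean sorted) {
    List<T> ordered = new ArrayList<>(input);
    if (sorted) {
      // time-sorted processing: a stable sort by event time (per-key order is what matters)
      ordered.sort(Comparator.comparingLong(timestampOf));
    }
    Map<K, S> states = new HashMap<>();
    List<Map.Entry<K, O>> out = new ArrayList<>();
    for (T element : ordered) {
      K key = keyExtractor.apply(element);
      V value = valueExtractor.apply(element);
      S state = states.containsKey(key) ? states.get(key) : initialState.apply(key);
      O output = outputFn.apply(state, value); // assumed: output computed from pre-update state
      states.put(key, stateUpdate.apply(state, value));
      if (output != null) { // a null output is discarded
        out.add(new SimpleImmutableEntry<>(key, output));
      }
    }
    return out;
  }
}
```

As a usage example, a change detector (emit the value only when it differs from the stored state) over `a:3:x, a:1:x, a:2:y, b:1:z` (key:time:value) emits four results when sorted, because key `a` goes x, y, x in time order, but only three when unsorted, because the late-arriving `a:1:x` looks like a repeat of `x`.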
-
integratePerKey
default <K,V> Stream<Pair<K,V>> integratePerKey(groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<V> initialValue, groovy.lang.Closure<V> combiner)
Transform this stream into another stream by applying a combining transform in the global window, emitting a result after each element is added. This means that the following holds:
* the new stream will have exactly the same number of elements as the original stream, minus late elements that are dropped
* streaming semantics need to define allowed lateness, which will incur a real-time processing delay
* batch semantics use sort per key
Type Parameters:
K - key type
V - value type
Parameters:
keyExtractor - extractor of key
valueExtractor - extractor of value
initialValue - closure providing initial value of state for key
combiner - combiner of values to final value
Returns:
the integrated stream
-
integratePerKey
<K,V> Stream<Pair<K,V>> integratePerKey(@Nullable java.lang.String name, groovy.lang.Closure<K> keyExtractor, groovy.lang.Closure<V> valueExtractor, groovy.lang.Closure<V> initialValue, groovy.lang.Closure<V> combiner)
Transform this stream into another stream by applying a combining transform in the global window, emitting a result after each element is added. This means that the following holds:
* the new stream will have exactly the same number of elements as the original stream, minus late elements that are dropped
* streaming semantics need to define allowed lateness, which will incur a real-time processing delay
* batch semantics use sort per key
Type Parameters:
K - key type
V - value type
Parameters:
name - optional name of the transform
keyExtractor - extractor of key
valueExtractor - extractor of value
initialValue - closure providing initial value of state for key
combiner - combiner of values to final value
Returns:
the integrated stream
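The batch semantics described for integratePerKey (sort per key, then emit the running combination after every element) can be modeled in memory. The sketch below assumes that initialValue receives the key and that combiner folds the accumulated value with the next value; both shapes, and the class IntegratePerKeySketch, are hypothetical. Late data and allowed lateness, which only matter in streaming mode, are not modeled.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical batch-mode model of integratePerKey; closure shapes are assumed.
final class IntegratePerKeySketch {

  static <T, K, V> List<Map.Entry<K, V>> integratePerKey(
      List<T> input,
      ToLongFunction<T> timestampOf,
      Function<T, K> keyExtractor,
      Function<T, V> valueExtractor,
      Function<K, V> initialValue,
      BinaryOperator<V> combiner) {
    List<T> ordered = new ArrayList<>(input);
    // batch semantics: each key's elements are processed in event-time order
    ordered.sort(Comparator.comparingLong(timestampOf));
    Map<K, V> accumulated = new HashMap<>();
    List<Map.Entry<K, V>> out = new ArrayList<>(ordered.size());
    for (T element : ordered) {
      K key = keyExtractor.apply(element);
      V current = accumulated.containsKey(key) ? accumulated.get(key) : initialValue.apply(key);
      V combined = combiner.apply(current, valueExtractor.apply(element));
      accumulated.put(key, combined);
      out.add(new SimpleImmutableEntry<>(key, combined)); // one output per input element
    }
    return out;
  }
}
```

Used as a running sum over `a:2:5, a:1:10, b:1:1` (key:time:value) with initial value 0 and `Integer::sum` as combiner, it emits `(a, 10)`, `(b, 1)`, `(a, 15)`: three inputs, three outputs, matching the "same number of elements" property for data without late elements.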
-
-