Interface | Description |
---|---|
`CloseableStream<T>` | A `CloseableStream` is a `Stream` that can be closed. |
`Collector<T,R>` | A reduction operation that supports folding input elements into a cumulative result. |
`DoubleStream` | A sequence of primitive double elements supporting sequential and parallel bulk operations. |
`IntStream` | A sequence of primitive integer elements supporting sequential and parallel bulk operations. |
`LongStream` | A sequence of primitive long elements supporting sequential and parallel bulk operations. |
`Stream<T>` | A sequence of elements supporting sequential and parallel bulk operations. |
`StreamBuilder<T>` | A mutable builder for a `Stream`. |
`StreamBuilder.OfDouble` | A mutable builder for a `DoubleStream`. |
`StreamBuilder.OfInt` | A mutable builder for an `IntStream`. |
`StreamBuilder.OfLong` | A mutable builder for a `LongStream`. |
Class | Description |
---|---|
`Collectors` | Implementations of `Collector` that implement various useful reduction operations, such as accumulating elements into collections, summarizing elements according to various criteria, etc. |
`DelegatingStream<T>` | A `Stream` implementation that delegates operations to another `Stream`. |
`StreamSupport` | Low-level utility methods for creating and manipulating streams. |
Enum | Description |
---|---|
`Collector.Characteristics` | Characteristics indicating properties of a `Collector`, which can be used to optimize reduction implementations. |
```java
int sumOfWeights = blocks.stream().filter(b -> b.getColor() == RED)
                                  .mapToInt(b -> b.getWeight())
                                  .sum();
```
Here we use `blocks`, which might be a `Collection`, as a source for a stream, and then perform a filter-map-reduce on the stream (`sum()` is an example of a reduction operation) to obtain the sum of the weights of the red blocks.

The key abstraction used in this approach is `Stream`, as well as its primitive specializations `IntStream`, `LongStream`, and `DoubleStream`. Streams differ from Collections in several ways:
- They are *functional in nature*: an operation on a `Stream` produces a new `Stream`, rather than removing elements from the underlying source.
- They are *laziness-seeking*: many stream operations can be implemented lazily, so a query such as "find the first `String` matching a pattern" need not examine all the input strings. Stream operations are divided into intermediate (`Stream`-producing) operations and terminal (value-producing) operations; all intermediate operations are lazy.
- They are *possibly unbounded*: short-circuiting operations such as `limit(n)` or `findFirst()` can allow computations on infinite streams to complete in finite time.

Streams are used to create pipelines of operations. A complete stream pipeline has several components: a source (which may be a `Collection`, an array, a generator function, or an IO channel); zero or more intermediate operations such as `Stream.filter` or `Stream.map`; and a terminal operation such as `Stream.forEach` or `Stream.reduce`. Stream operations may take as parameters function values (which are often lambda expressions, but could be method references or other objects) which parameterize the behavior of the operation, such as a `Predicate` passed to the `Stream.filter` method.
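To make the pipeline anatomy concrete, here is a minimal sketch (using the final Java 8 API names and an illustrative word list, neither of which appears in the text above) with the source, intermediate, and terminal stages labeled:

```java
import java.util.Arrays;
import java.util.List;

public class PipelineAnatomy {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("alpha", "beta", "gamma", "delta");

        long count = words.stream()                    // source: a Collection
                          .filter(w -> w.length() > 4) // intermediate: Stream-producing
                          .map(String::toUpperCase)    // intermediate: Stream-producing
                          .count();                    // terminal: value-producing

        System.out.println(count); // prints 3 ("alpha", "gamma", "delta")
    }
}
```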
Intermediate operations return a new `Stream`. They are lazy; executing an intermediate operation such as `Stream.filter` does not actually perform any filtering, instead creating a new `Stream` that, when traversed, contains the elements of the initial `Stream` that match the given `Predicate`. Consuming elements from the stream source does not begin until the terminal operation is executed.
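The deferred execution can be observed directly. In this sketch (an illustrative example, not from the text above), a counter records how many elements the filter has actually examined:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazinessDemo {
    public static void main(String[] args) {
        AtomicInteger inspected = new AtomicInteger();
        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5);

        // Building the pipeline performs no work...
        Stream<Integer> pipeline =
            nums.stream().filter(n -> { inspected.incrementAndGet(); return n % 2 == 0; });
        System.out.println(inspected.get()); // prints 0 -- nothing examined yet

        // ...work happens only when the terminal operation runs.
        long evens = pipeline.count();
        System.out.println(inspected.get()); // prints 5 -- every element examined
        System.out.println(evens);           // prints 2
    }
}
```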
Terminal operations consume the `Stream` and produce a result or a side-effect. After a terminal operation is performed, the stream can no longer be used and you must return to the data source, or select a new data source, to get a new stream. For example, obtaining the sum of weights of all red blocks, and then of all blue blocks, requires a filter-map-reduce on two different streams:

```java
int sumOfRedWeights  = blocks.stream().filter(b -> b.getColor() == RED)
                                      .mapToInt(b -> b.getWeight())
                                      .sum();
int sumOfBlueWeights = blocks.stream().filter(b -> b.getColor() == BLUE)
                                      .mapToInt(b -> b.getWeight())
                                      .sum();
```

However, there are other techniques that allow you to obtain both results in a single pass if multiple traversal is impractical or inefficient.
Intermediate stream operations (such as `filter` or `sorted`) always produce a new `Stream`, and are always *lazy*. Executing a lazy operation does not trigger processing of the stream contents; all processing is deferred until the terminal operation commences. Processing streams lazily allows for significant efficiencies; in a pipeline such as the filter-map-sum example above, filtering, mapping, and addition can be fused into a single pass with minimal intermediate state. Laziness also enables us to avoid examining all the data when it is not necessary; for operations such as "find the first string longer than 1000 characters", one need not examine all the input strings, just enough to find one that has the desired characteristics. (This behavior becomes even more important when the input stream is infinite and not merely large.)
Intermediate operations are further divided into *stateless* and *stateful* operations. Stateless operations retain no state from previously seen values when processing a new value; examples of stateless intermediate operations include `filter` and `map`. Stateful operations may incorporate state from previously seen elements in processing new values; examples of stateful intermediate operations include `distinct` and `sorted`. Stateful operations may need to process the entire input before producing a result; for example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations have to be executed in multiple passes. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel.
Further, some operations are deemed short-circuiting operations. An intermediate
operation is short-circuiting if, when presented with infinite input, it may produce a
finite stream as a result. A terminal operation is short-circuiting if, when presented with
infinite input, it may terminate in finite time. (Having a short-circuiting operation is a
necessary, but not sufficient, condition for the processing of an infinite stream to
terminate normally in finite time.)
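For instance (an illustrative sketch), `Stream.iterate` produces an infinite stream, yet the short-circuiting `findFirst()` terminal operation still terminates:

```java
import java.util.Optional;
import java.util.stream.Stream;

public class ShortCircuit {
    public static void main(String[] args) {
        // An infinite stream of powers of two: 1, 2, 4, 8, ...
        Stream<Long> powers = Stream.iterate(1L, x -> x * 2);

        // findFirst() is short-circuiting, so this completes in finite time.
        Optional<Long> firstOver1000 = powers.filter(x -> x > 1000).findFirst();

        System.out.println(firstOver1000.get()); // prints 1024
    }
}
```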
Terminal operations (such as `forEach` or `findFirst`) are always eager (they execute completely before returning), and produce a non-`Stream` result, such as a primitive value or a `Collection`, or have side-effects.
By recasting aggregate operations as a pipeline of operations on a stream of values, many aggregate operations can be more easily parallelized. A `Stream` can execute either in serial or in parallel. When streams are created, they are created as either sequential or parallel streams; the parallel mode of a stream can also be switched by the `sequential()` and `parallel()` operations. The `Stream` implementations in the JDK create serial streams unless parallelism is explicitly requested. For example, `Collection` has methods `Collection.stream()` and `Collection.parallelStream()`, which produce sequential and parallel streams respectively; other stream-bearing methods such as `Streams.intRange(int, int)` produce sequential streams, but these can be efficiently parallelized by calling `parallel()` on the result. The set of operations on serial and parallel streams is identical. To execute the "sum of weights of blocks" query in parallel, we would do:
```java
int sumOfWeights = blocks.parallelStream().filter(b -> b.getColor() == RED)
                                          .mapToInt(b -> b.getWeight())
                                          .sum();
```

The only difference between the serial and parallel versions of this example is the creation of the initial `Stream`. Whether a `Stream` will execute in serial or parallel can be determined by the `Stream.isParallel()` method. When the terminal operation is initiated, the entire stream pipeline is executed either sequentially or in parallel, determined by the last operation that affected the stream's serial-parallel orientation (which could be the stream source, or the `sequential()` or `parallel()` methods).
In order for the results of parallel operations to be deterministic and consistent with their serial equivalent, the function values passed into the various stream operations should be stateless.
Streams may or may not have an *encounter order*. An encounter order specifies the order in which elements are provided by the stream to the operations pipeline. Whether or not there is an encounter order depends on the source, the intermediate operations, and the terminal operation. Certain stream sources (such as `List` or arrays) are intrinsically ordered, whereas others (such as `HashSet`) are not. Some intermediate operations, such as `Stream.sorted()`, may impose an encounter order on an otherwise unordered stream, and others may render an ordered stream unordered (such as `BaseStream.unordered()`). Some terminal operations may ignore encounter order, such as `Stream.forEach`.

If a stream is ordered, most operations are constrained to operate on the elements in their encounter order; if the source of a stream is a `List` containing `[1, 2, 3]`, then the result of executing `map(x -> x*2)` must be `[2, 4, 6]`. However, if the source has no defined encounter order, then any of the six permutations of the values `[2, 4, 6]` would be a valid result. Many operations can still be efficiently parallelized even under ordering constraints.
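A quick sketch of the ordering constraint: because a `List` source is ordered, the mapped results must come back in encounter order even under parallel execution:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class EncounterOrder {
    public static void main(String[] args) {
        // An ordered source constrains map to preserve encounter order,
        // even when the pipeline executes in parallel.
        List<Integer> doubled = Arrays.asList(1, 2, 3).parallelStream()
                                      .map(x -> x * 2)
                                      .collect(Collectors.toList());

        System.out.println(doubled); // prints [2, 4, 6], never a permutation
    }
}
```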
For sequential streams, ordering is only relevant to the determinism of operations performed repeatedly on the same source. (An `ArrayList` is constrained to iterate elements in order; a `HashSet` is not, and repeated iteration might produce a different order.)

For parallel streams, relaxing the ordering constraint can enable optimized implementation for some operations. For example, duplicate filtration on an ordered stream must completely process the first partition before it can return any elements from a subsequent partition, even if those elements are available earlier. On the other hand, without the constraint of ordering, duplicate filtration can be done more efficiently, for example by using a set backed by a shared `ConcurrentHashMap`. There will be cases where the stream is structurally ordered (the source is ordered and the intermediate operations are order-preserving), but the user does not particularly care about the encounter order. In such cases, explicitly de-ordering the stream with the `BaseStream.unordered()` method may result in improved parallel performance for some stateful or terminal operations.
The `java.util.stream` package enables you to execute possibly-parallel bulk-data operations over a variety of data sources, including even non-thread-safe collections such as `ArrayList`. This is possible only if we can prevent *interference* with the data source during the execution of a stream pipeline. (Execution begins when the terminal operation is invoked, and ends when the terminal operation completes.) For most data sources, preventing interference means ensuring that the data source is not modified at all during the execution of the stream pipeline. (Some data sources, such as concurrent collections, are specifically designed to handle concurrent modification.)
Accordingly, lambda expressions (or other objects implementing the appropriate functional interface) passed to stream methods should never modify the stream's data source. An implementation is said to interfere with the data source if it modifies, or causes to be modified, the stream's data source. The need for non-interference applies to all pipelines, not just parallel ones. Unless the stream source is concurrent, modifying a stream's data source during execution of a stream pipeline can cause exceptions, incorrect answers, or nonconformant results.
Further, results may be nondeterministic or incorrect if the lambda expressions passed to stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline. An example of a stateful lambda is:
```java
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
```
Here, if the mapping operation is performed in parallel, the results for the same input
could vary from run to run, due to thread scheduling differences, whereas, with a stateless
lambda expression the results would always be the same.
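By contrast, when the per-element state is managed by the library rather than by the lambda, the results are deterministic. This sketch (an illustrative example) uses the stateful but library-managed `distinct()` operation instead of a hand-rolled `seen` set:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StatelessAlternative {
    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 2, 3, 3, 3);

        // distinct() encapsulates its state inside the library, so on an
        // ordered source the result is the same for sequential and parallel runs.
        List<Integer> unique = input.parallelStream()
                                    .distinct()
                                    .collect(Collectors.toList());

        System.out.println(unique); // prints [1, 2, 3]
    }
}
```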
Of course, such operations can be readily implemented as simple sequential loops, as in:

```java
int sum = 0;
for (int x : numbers) {
    sum += x;
}
```
However, there may be a significant advantage to preferring a reduce operation over a mutative accumulation such as the above: a properly constructed reduce operation is inherently parallelizable so long as the reduction operator has the right characteristics. Specifically, the operator must be *associative*. For example, given a stream of numbers for which we want to find the sum, we can write:

```java
int sum = numbers.reduce(0, (x, y) -> x + y);
```

or more succinctly:

```java
int sum = numbers.reduce(0, Integer::sum);
```
(The primitive specializations of `Stream`, such as `IntStream`, even have convenience methods for common reductions, such as `sum` and `max`, which are implemented as simple wrappers around `reduce`.)

Reduction parallelizes well since the implementation of `reduce` can operate on subsets of the stream in parallel, and then combine the intermediate results to get the final correct answer. Even if you were to use a parallelizable form of the `forEach()` method in place of the original for-each loop above, you would still have to provide thread-safe updates to the shared accumulating variable `sum`, and the required synchronization would likely eliminate any performance gain from parallelism. Using a `reduce` method instead removes all of the burden of parallelizing the reduction operation, and the library can provide an efficient parallel implementation with no additional synchronization needed.
The "blocks" example shown earlier shows how reduction combines with other operations to replace for-loops with bulk operations. If `blocks` is a collection of `Block` objects, which have a `getWeight` method, we can find the heaviest block with:

```java
OptionalInt heaviest = blocks.stream()
                             .mapToInt(Block::getWeight)
                             .reduce(Integer::max);
```
In its more general form, a `reduce` operation on elements of type `<T>` yielding a result of type `<U>` requires three parameters:

```java
<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator,
             BinaryOperator<U> combiner);
```

Here, the identity element is both an initial seed for the reduction and a default result if there are no elements. The accumulator function takes a partial result and the next element, and produces a new partial result. The combiner function combines the partial results of two accumulators to produce a new partial result, and eventually the final result.
This form is a generalization of the two-argument form, and is also a generalization of the map-reduce construct illustrated above. If we wanted to re-cast the simple sum example using the more general form, `0` would be the identity element, while `Integer::sum` would be both the accumulator and the combiner. For the sum-of-weights example, this could be re-cast as:

```java
int sumOfWeights = blocks.stream().reduce(0,
                                          (sum, b) -> sum + b.getWeight(),
                                          Integer::sum);
```

though the map-reduce form is more readable and generally preferable. The generalized form is provided for cases where significant work can be optimized away by combining mapping and reducing into a single function.
More formally, the `identity` value must be an identity for the combiner function. This means that for all `u`, `combiner.apply(identity, u)` is equal to `u`. Additionally, the `combiner` function must be associative and must be compatible with the `accumulator` function; for all `u` and `t`, the following must hold:

```java
combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
```
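These conditions can be checked against a concrete reduction. In the following sketch (the word list is illustrative), `0` is the identity, the accumulator folds one string's length into a partial sum, and `Integer::sum` combines partial sums:

```java
import java.util.Arrays;
import java.util.List;

public class GeneralReduce {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("stream", "reduce", "ok");

        int totalLength = words.stream().reduce(
            0,                            // identity: result for an empty stream
            (sum, w) -> sum + w.length(), // accumulator: fold in one element
            Integer::sum);                // combiner: merge partial results

        System.out.println(totalLength); // prints 14 (6 + 6 + 2)

        // Compatibility condition, instantiated for this reduction:
        // combiner.apply(u, accumulator.apply(0, t)) == u + (0 + t.length())
        //                                            == accumulator.apply(u, t)
    }
}
```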
A *mutable reduction* operation accumulates input elements into a mutable result container, such as a `Collection` or `StringBuilder`, as it processes the elements in the stream.
For example, if we wanted to take a stream of strings and concatenate them into a single long string, we could achieve this with ordinary reduction:

```java
String concatenated = strings.reduce("", String::concat);
```

We would get the desired result, and it would even work in parallel. However, we might not be happy about the performance! Such an implementation would do a great deal of string copying, and the run time would be O(n²) in the number of elements. A more performant approach would be to accumulate the results into a `StringBuilder`, which is a mutable container for accumulating strings. We can use the same technique to parallelize mutable reduction as we do with ordinary reduction.
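The `StringBuilder` alternative might be sketched as follows (an illustrative comparison on a small input, using the three-argument `collect` described next):

```java
import java.util.Arrays;
import java.util.List;

public class StringConcat {
    public static void main(String[] args) {
        List<String> strings = Arrays.asList("a", "b", "c");

        // O(n^2) copying: each concat copies the entire partial result.
        String slow = strings.stream().reduce("", String::concat);

        // Linear accumulation into a mutable StringBuilder container.
        String fast = strings.stream()
                             .collect(StringBuilder::new,
                                      StringBuilder::append,   // accumulator
                                      StringBuilder::append)   // combiner
                             .toString();

        System.out.println(slow.equals(fast)); // true (both are "abc")
    }
}
```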
The mutable reduction operation is called `collect()`, as it collects together the desired results into a result container such as `StringBuilder`. A `collect` operation requires three things: a factory function which will construct new instances of the result container, an accumulating function that will update a result container by incorporating a new element, and a combining function that can take two result containers and merge their contents. The form of this is very similar to the general form of ordinary reduction:

```java
<R> R collect(Supplier<R> resultFactory,
              BiConsumer<R, ? super T> accumulator,
              BiConsumer<R, R> combiner);
```
As with `reduce()`, the benefit of expressing `collect` in this abstract way is that it is directly amenable to parallelization: we can accumulate partial results in parallel and then combine them. For example, to collect the String representations of the elements in a stream into an `ArrayList`, we could write the obvious sequential for-each form:

```java
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
    strings.add(element.toString());
}
```

Or we could use a parallelizable collect form:

```java
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
                                           (c, e) -> c.add(e.toString()),
                                           (c1, c2) -> c1.addAll(c2));
```

or, noting that we have buried a mapping operation inside the accumulator function, more succinctly as:

```java
ArrayList<String> strings = stream.map(Object::toString)
                                  .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
```

Here, our supplier is just the `ArrayList` constructor, the accumulator adds the stringified element to an `ArrayList`, and the combiner simply uses `addAll` to copy the strings from one container into the other.
As with the regular reduction operation, the ability to parallelize only comes if an associativity condition is met. The `combiner` is associative if, for result containers `r1`, `r2`, and `r3`:

```java
combiner.accept(r1, r2);
combiner.accept(r1, r3);
```

is equivalent to:

```java
combiner.accept(r2, r3);
combiner.accept(r1, r2);
```

where equivalence means that `r1` is left in the same state (according to the meaning of `equals` for the element types). Similarly, the `resultFactory` must act as an identity with respect to the `combiner`, so that for any result container `r`:

```java
combiner.accept(r, resultFactory.get());
```

does not modify the state of `r` (again according to the meaning of `equals`). Finally, the `accumulator` and `combiner` must be compatible, such that for a result container `r` and element `t`:

```java
r2 = resultFactory.get();
accumulator.accept(r2, t);
combiner.accept(r, r2);
```

is equivalent to:

```java
accumulator.accept(r, t);
```

where equivalence means that `r` is left in the same state (again according to the meaning of `equals`).
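These laws can be spot-checked mechanically for the `ArrayList` collector used above. The following sketch verifies the identity and compatibility conditions on small inputs (the values are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public class CollectLaws {
    public static void main(String[] args) {
        Supplier<ArrayList<String>> resultFactory = ArrayList::new;
        BiConsumer<ArrayList<String>, String> accumulator = ArrayList::add;
        BiConsumer<ArrayList<String>, ArrayList<String>> combiner = ArrayList::addAll;

        // Identity law: combining with a freshly created container leaves r unchanged.
        ArrayList<String> r = new ArrayList<>(Arrays.asList("a", "b"));
        combiner.accept(r, resultFactory.get());
        System.out.println(r); // prints [a, b]

        // Compatibility law: accumulate-into-fresh-then-combine is
        // equivalent to accumulating directly.
        ArrayList<String> r1 = new ArrayList<>(Arrays.asList("a"));
        ArrayList<String> r2 = resultFactory.get();
        accumulator.accept(r2, "t");
        combiner.accept(r1, r2);

        ArrayList<String> direct = new ArrayList<>(Arrays.asList("a"));
        accumulator.accept(direct, "t");
        System.out.println(r1.equals(direct)); // prints true
    }
}
```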
The three aspects of `collect` (supplier, accumulator, and combiner) are often very tightly coupled, and it is convenient to introduce the notion of a `Collector` as being an object that embodies all three aspects. There is a `collect` method that simply takes a `Collector` and returns the resulting container. The above example for collecting strings into a `List` can be rewritten using a standard `Collector` as:

```java
List<String> strings = stream.map(Object::toString)
                             .collect(Collectors.toList());
```
When the result container of a parallel mutable reduction is a `Map`, such as:

```java
Map<Buyer, List<Transaction>> salesByBuyer
    = txns.parallelStream()
          .collect(Collectors.groupingBy(Transaction::getBuyer));
```

(where `Collectors.groupingBy` is a utility function that returns a `Collector` for grouping sets of elements based on some key), it may actually be counterproductive to perform the operation in parallel. This is because the combining step (merging one `Map` into another by key) can be expensive for some `Map` implementations.
Suppose, however, that the result container used in this reduction was a concurrently modifiable collection, such as a `ConcurrentHashMap`. In that case, the parallel invocations of the accumulator could actually deposit their results concurrently into the same shared result container, eliminating the need for the combiner to merge distinct result containers. This potentially provides a boost to the parallel execution performance. We call this a *concurrent reduction*. A `Collector` that supports concurrent reduction is marked with the `Collector.Characteristics.CONCURRENT` characteristic.
Having a concurrent collector is a necessary condition for performing a
concurrent reduction, but that alone is not sufficient. If you imagine multiple
accumulators depositing results into a shared container, the order in which
results are deposited is non-deterministic. Consequently, a concurrent reduction
is only possible if ordering is not important for the stream being processed.
The `Stream.collect(Collector)` implementation will only perform a concurrent reduction if:

- the stream is parallel;
- the collector has the `Collector.Characteristics.CONCURRENT` characteristic; and
- either the stream is unordered, or the collector has the `Collector.Characteristics.UNORDERED` characteristic.

For example:

```java
Map<Buyer, List<Transaction>> salesByBuyer
    = txns.parallelStream()
          .unordered()
          .collect(groupingByConcurrent(Transaction::getBuyer));
```

(where `Collectors.groupingByConcurrent` is the concurrent companion to `groupingBy`).
Note that if it is important that the elements for a given key appear in the order they appear in the source, then we cannot use a concurrent reduction, as ordering is one of the casualties of concurrent insertion. We would then be constrained to implement either a sequential reduction or a merge-based parallel reduction.
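As a runnable sketch of a concurrent reduction (with an illustrative word list standing in for the transactions), note that only the size of each group is asserted, since ordering within a key is a casualty of concurrent insertion:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import static java.util.stream.Collectors.groupingByConcurrent;

public class ConcurrentGrouping {
    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "avocado", "banana", "blueberry");

        // Group by first letter; the shared ConcurrentMap container
        // removes the need for a merge step between partial results.
        ConcurrentMap<Character, List<String>> byInitial =
            words.parallelStream()
                 .unordered()
                 .collect(groupingByConcurrent(w -> w.charAt(0)));

        System.out.println(byInitial.get('a').size()); // prints 2
        System.out.println(byInitial.get('b').size()); // prints 2
    }
}
```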
An operator or function `op` is *associative* if the following holds:

```java
(a op b) op c == a op (b op c)
```

The importance of this to parallel evaluation can be seen if we expand this to four terms:

```java
a op b op c op d == (a op b) op (c op d)
```

So we can evaluate `(a op b)` in parallel with `(c op d)` and then invoke `op` on the results.
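String concatenation is an example of an associative (though not commutative) operation, so a parallel reduction can split and recombine it safely. A minimal sketch:

```java
import java.util.stream.Stream;

public class AssocDemo {
    public static void main(String[] args) {
        // Concatenation is associative: however the runtime partitions the
        // stream, recombining the partial results yields the same string.
        String s = Stream.of("a", "b", "c", "d")
                         .parallel()
                         .reduce("", String::concat);

        System.out.println(s); // prints "abcd" regardless of how the stream is split
    }
}
```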
A pipeline is initially constructed from a spliterator (see `Spliterator`) supplied by a stream source. The spliterator covers elements of the source and provides element traversal operations for a possibly-parallel computation. See the methods on `Streams` for construction of pipelines using spliterators.

A source may directly supply a spliterator. If so, the spliterator is traversed, split, or queried for estimated size after, and never before, the terminal operation commences. It is strongly recommended that the spliterator report a characteristic of `IMMUTABLE` or `CONCURRENT`, or be *late-binding* and not bind to the elements it covers until traversed, split, or queried for estimated size.

If a source cannot directly supply a recommended spliterator then it may indirectly supply a spliterator using a `Supplier`. The spliterator is obtained from the supplier after, and never before, the terminal operation of the stream pipeline commences.
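A small sketch of the spliterator plumbing, using the final Java 8 API (`StreamSupport.stream`; the text above refers to construction methods on `Streams` instead):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.StreamSupport;

public class SpliteratorDemo {
    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(1, 2, 3, 4);

        // A List supplies a spliterator covering its elements...
        Spliterator<Integer> sp = nums.spliterator();
        System.out.println(sp.estimateSize()); // prints 4

        // ...and StreamSupport builds a (here sequential) pipeline from it.
        int sum = StreamSupport.stream(sp, false)
                               .mapToInt(Integer::intValue)
                               .sum();
        System.out.println(sum); // prints 10
    }
}
```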
Such requirements significantly reduce the scope of potential interference to the interval starting with the commencement of the terminal operation and ending with the production of a result or side-effect. See Non-Interference for more details.
A source can be modified before the terminal operation commences, and those modifications will be reflected in the covered elements. Afterwards, and depending on the properties of the source, further modifications might not be reflected, and a `ConcurrentModificationException` may be thrown. For example, consider the following code:

```java
List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(toStringJoiner(" ")).toString();
```

First a list is created consisting of two strings: "one" and "two". Then a stream is created from that list. Next the list is modified by adding a third string: "three". Finally the elements of the stream are collected and joined together. Since the list was modified before the terminal `collect` operation commenced, the result will be the string "one two three". However, if the list is modified after the terminal operation commences, as in:

```java
List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
String s = sl.peek(t -> l.add("BAD LAMBDA")).collect(toStringJoiner(" ")).toString();
```

then a `ConcurrentModificationException` will be thrown, since the `peek` operation will attempt to add the string "BAD LAMBDA" to the list after the terminal operation has commenced.
For further API reference and developer documentation, see Java SE Documentation. That documentation contains more detailed, developer-targeted descriptions, with conceptual overviews, definitions of terms, workarounds, and working code examples.
Copyright © 1993, 2013, Oracle and/or its affiliates. All rights reserved.
DRAFT ea-b92