두 번 처리 할 수 있도록 Java 8 스트림을 복제하고 싶습니다. 나는 collect
목록으로 할 수 있고 그로부터 새로운 스트림을 얻을 수 있습니다 .
// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff
하지만 좀 더 효율적이고 우아한 방법이 있어야한다고 생각합니다.
컬렉션으로 변환하지 않고 스트림을 복사하는 방법이 있습니까?
나는 실제로 Either
s 스트림으로 작업하고 있으므로 오른쪽 투영으로 이동하고 다른 방식으로 처리하기 전에 왼쪽 투영을 한 방향으로 처리하고 싶습니다. 이런 종류의 (지금까지는 toList
트릭 을 사용해야합니다 ).
List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());
Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );
Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );
답변
효율성에 대한 귀하의 가정은 다소 거꾸로 생각됩니다. 데이터를 저장할 필요가 없기 때문에 데이터를 한 번만 사용하려는 경우 이러한 엄청난 효율성을 얻을 수 있으며, 스트림은 파이프 라인을 통해 전체 데이터를 효율적으로 흐르게하는 강력한 “루프 융합”최적화를 제공합니다.
동일한 데이터를 재사용하려면 정의에 따라 두 번 (결정적) 생성하거나 저장해야합니다. 이미 컬렉션에있는 경우 좋습니다. 두 번 반복하면 저렴합니다.
우리는 “포크 스트림”으로 디자인을 실험했습니다. 우리가 발견 한 것은이를 지원하는 데 실제 비용이 든다는 것입니다. 그것은 흔하지 않은 경우를 희생하여 일반적인 경우 (한 번 사용)를 부담했습니다. 큰 문제는 “두 파이프 라인이 동일한 속도로 데이터를 소비하지 않을 때 발생하는 일”을 처리하는 것이 었습니다. 이제 어쨌든 버퍼링으로 돌아갑니다. 이것은 분명히 그 무게를 지니지 않은 기능이었습니다.
동일한 데이터에 대해 반복적으로 작업하려면 데이터를 저장하거나 작업을 소비자로 구성하고 다음을 수행합니다.
stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });
처리 모델이 이러한 종류의 “스트림 포크”에 더 적합하기 때문에 RxJava 라이브러리를 살펴볼 수도 있습니다.
답변
와 함께 로컬 변수를 사용 Supplier
하여 스트림 파이프 라인의 공통 부분을 설정할 수 있습니다.
에서 http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :
스트림 재사용
Java 8 스트림은 재사용 할 수 없습니다. 터미널 작업을 호출하자마자 스트림이 닫힙니다.
Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception Calling `noneMatch` after `anyMatch` on the same stream results in the following exception: java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28)
이 한계를 극복하려면 실행하려는 모든 터미널 작업에 대해 새 스트림 체인을 만들어야합니다. 예를 들어 모든 중간 작업이 이미 설정된 새 스트림을 생성하는 스트림 공급자를 만들 수 있습니다.
Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok
에 대한 각 호출
get()
은 원하는 터미널 작업을 호출하기 위해 저장되는 새 스트림 을 생성합니다.
답변
를 사용하여 Supplier
각 종료 작업에 대한 스트림을 생성합니다.
Supplier<Stream<Integer>> streamSupplier = () -> list.stream();
해당 컬렉션의 스트림이 필요할 때마다을 사용 streamSupplier.get()
하여 새 스트림을 가져옵니다.
예 :
streamSupplier.get().anyMatch(predicate);
streamSupplier.get().allMatch(predicate2);
답변
우리는 구현했습니다 duplicate()
에서 스트림을위한 방법을 jOOλ , 우리가에 대한 통합 테스트를 개선하기 위해 만든 오픈 소스 라이브러리 jOOQ을 . 기본적으로 다음과 같이 작성할 수 있습니다.
Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();
내부적으로는 한 스트림에서 소비되었지만 다른 스트림에서는 소비되지 않은 모든 값을 저장하는 버퍼가 있습니다. 두 스트림이 거의 동일한 속도로 소비되고 thread-safety 부족으로 살 수 있다면 그것은 아마도 효율적일 것입니다 .
알고리즘 작동 방식은 다음과 같습니다.
static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
final List<T> gap = new LinkedList<>();
final Iterator<T> it = stream.iterator();
@SuppressWarnings("unchecked")
final Iterator<T>[] ahead = new Iterator[] { null };
class Duplicate implements Iterator<T> {
@Override
public boolean hasNext() {
if (ahead[0] == null || ahead[0] == this)
return it.hasNext();
return !gap.isEmpty();
}
@Override
public T next() {
if (ahead[0] == null)
ahead[0] = this;
if (ahead[0] == this) {
T value = it.next();
gap.offer(value);
return value;
}
return gap.poll();
}
}
return tuple(seq(new Duplicate()), seq(new Duplicate()));
}
Tuple2
아마 당신처럼 Pair
반면, 유형 Seq
입니다 Stream
몇 가지 향상된 기능.
답변
실행 가능한 스트림을 만들 수 있습니다 (예 :).
results.stream()
.flatMap(either -> Stream.<Runnable> of(
() -> failure(either.left()),
() -> success(either.right())))
.forEach(Runnable::run);
적용 할 작업은 어디에 failure
있고 있습니까 success
? 그러나 이것은 꽤 많은 임시 객체를 생성하고 컬렉션에서 시작하여 두 번 스트리밍 / 반복하는 것보다 더 효율적이지 않을 수 있습니다.
답변
요소를 여러 번 처리하는 또 다른 방법은 Stream.peek (Consumer) 를 사용하는 것입니다 .
doSomething().stream()
.peek(either -> handleFailure(either.left()))
.foreach(either -> handleSuccess(either.right()));
peek(Consumer)
필요한만큼 여러 번 연결할 수 있습니다.
doSomething().stream()
.peek(element -> handleFoo(element.foo()))
.peek(element -> handleBar(element.bar()))
.peek(element -> handleBaz(element.baz()))
.foreach(element-> handleQux(element.qux()));
답변
내가 기여한 라이브러리 인 cyclops-react 에는 Stream을 복제 할 수있는 정적 메서드가 있습니다 (그리고 jOOλ Tuple of Streams 반환).
Stream<Integer> stream = Stream.of(1,2,3);
Tuple2<Stream<Integer>,Stream<Integer>> streams = StreamUtils.duplicate(stream);
주석을 참조하십시오. 기존 스트림에서 중복을 사용할 때 발생하는 성능 저하가 있습니다. 더 성능이 좋은 대안은 Streamable을 사용하는 것입니다.
Stream, Iterable 또는 Array에서 구성하고 여러 번 재생할 수있는 (lazy) Streamable 클래스도 있습니다.
Streamable<Integer> streamable = Streamable.of(1,2,3);
streamable.stream().forEach(System.out::println);
streamable.stream().forEach(System.out::println);
AsStreamable.synchronizedFromStream (stream)-스레드간에 공유 할 수있는 방식으로 백업 컬렉션을 느리게 채울 Streamable을 만드는 데 사용할 수 있습니다. Streamable.fromStream (stream)은 동기화 오버 헤드를 발생시키지 않습니다.