“스트림이 이미 작동되었거나 닫혔습니다”를 방지하기 위해 스트림을 복사합니다. 번 처리

두 번 처리 할 수 ​​있도록 Java 8 스트림을 복제하고 싶습니다. 나는 collect목록으로 할 수 있고 그로부터 새로운 스트림을 얻을 수 있습니다 .

// doSomething() returns a stream
List<A> thing = doSomething().collect(toList());
thing.stream()... // do stuff
thing.stream()... // do other stuff

하지만 좀 더 효율적이고 우아한 방법이 있어야한다고 생각합니다.

컬렉션으로 변환하지 않고 스트림을 복사하는 방법이 있습니까?

나는 실제로 Eithers 스트림으로 작업하고 있으므로 오른쪽 투영으로 이동하고 다른 방식으로 처리하기 전에 왼쪽 투영을 한 방향으로 처리하고 싶습니다. 이런 종류의 (지금까지는 toList트릭 을 사용해야합니다 ).

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList());

Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left());
failures.forEach(failure -> ... );

Stream<A> successes = results.stream().flatMap(either -> either.right());
successes.forEach(success -> ... );



답변

효율성에 대한 귀하의 가정은 다소 거꾸로 생각됩니다. 데이터를 저장할 필요가 없기 때문에 데이터를 한 번만 사용하려는 경우 이러한 엄청난 효율성을 얻을 수 있으며, 스트림은 파이프 라인을 통해 전체 데이터를 효율적으로 흐르게하는 강력한 “루프 융합”최적화를 제공합니다.

동일한 데이터를 재사용하려면 정의에 따라 두 번 (결정적) 생성하거나 저장해야합니다. 이미 컬렉션에있는 경우 좋습니다. 두 번 반복하면 저렴합니다.

우리는 “포크 스트림”으로 디자인을 실험했습니다. 우리가 발견 한 것은이를 지원하는 데 실제 비용이 든다는 것입니다. 그것은 흔하지 않은 경우를 희생하여 일반적인 경우 (한 번 사용)를 부담했습니다. 큰 문제는 “두 파이프 라인이 동일한 속도로 데이터를 소비하지 않을 때 발생하는 일”을 처리하는 것이 었습니다. 이제 어쨌든 버퍼링으로 돌아갑니다. 이것은 분명히 그 무게를 지니지 않은 기능이었습니다.

동일한 데이터에 대해 반복적으로 작업하려면 데이터를 저장하거나 작업을 소비자로 구성하고 다음을 수행합니다.

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });

처리 모델이 이러한 종류의 “스트림 포크”에 더 적합하기 때문에 RxJava 라이브러리를 살펴볼 수도 있습니다.


답변

와 함께 로컬 변수를 사용 Supplier하여 스트림 파이프 라인의 공통 부분을 설정할 수 있습니다.

에서 http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :

스트림 재사용

Java 8 스트림은 재사용 할 수 없습니다. 터미널 작업을 호출하자마자 스트림이 닫힙니다.

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

Calling `noneMatch` after `anyMatch` on the same stream results in the following exception:
java.lang.IllegalStateException: stream has already been operated upon or closed
at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at
java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
at com.winterbe.java8.Streams5.test7(Streams5.java:38)
at com.winterbe.java8.Streams5.main(Streams5.java:28)

이 한계를 극복하려면 실행하려는 모든 터미널 작업에 대해 새 스트림 체인을 만들어야합니다. 예를 들어 모든 중간 작업이 이미 설정된 새 스트림을 생성하는 스트림 공급자를 만들 수 있습니다.

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

에 대한 각 호출 get()은 원하는 터미널 작업을 호출하기 위해 저장되는 새 스트림 을 생성합니다.


답변

를 사용하여 Supplier각 종료 작업에 대한 스트림을 생성합니다.

Supplier<Stream<Integer>> streamSupplier = () -> list.stream();

해당 컬렉션의 스트림이 필요할 때마다을 사용 streamSupplier.get()하여 새 스트림을 가져옵니다.

예 :

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);

답변

우리는 구현했습니다 duplicate()에서 스트림을위한 방법을 jOOλ , 우리가에 대한 통합 테스트를 개선하기 위해 만든 오픈 소스 라이브러리 jOOQ을 . 기본적으로 다음과 같이 작성할 수 있습니다.

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

내부적으로는 한 스트림에서 소비되었지만 다른 스트림에서는 소비되지 않은 모든 값을 저장하는 버퍼가 있습니다. 두 스트림이 거의 동일한 속도로 소비되고 thread-safety 부족으로 살 수 있다면 그것은 아마도 효율적일 것입니다 .

알고리즘 작동 방식은 다음과 같습니다.

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) {
    final List<T> gap = new LinkedList<>();
    final Iterator<T> it = stream.iterator();

    @SuppressWarnings("unchecked")
    final Iterator<T>[] ahead = new Iterator[] { null };

    class Duplicate implements Iterator<T> {
        @Override
        public boolean hasNext() {
            if (ahead[0] == null || ahead[0] == this)
                return it.hasNext();

            return !gap.isEmpty();
        }

        @Override
        public T next() {
            if (ahead[0] == null)
                ahead[0] = this;

            if (ahead[0] == this) {
                T value = it.next();
                gap.offer(value);
                return value;
            }

            return gap.poll();
        }
    }

    return tuple(seq(new Duplicate()), seq(new Duplicate()));
}

여기에 더 많은 소스 코드

Tuple2아마 당신처럼 Pair반면, 유형 Seq입니다 Stream몇 가지 향상된 기능.


답변

실행 가능한 스트림을 만들 수 있습니다 (예 :).

results.stream()
    .flatMap(either -> Stream.<Runnable> of(
            () -> failure(either.left()),
            () -> success(either.right())))
    .forEach(Runnable::run);

적용 할 작업은 어디에 failure있고 있습니까 success? 그러나 이것은 꽤 많은 임시 객체를 생성하고 컬렉션에서 시작하여 두 번 스트리밍 / 반복하는 것보다 더 효율적이지 않을 수 있습니다.


답변

요소를 여러 번 처리하는 또 다른 방법은 Stream.peek (Consumer) 를 사용하는 것입니다 .

doSomething().stream()
.peek(either -> handleFailure(either.left()))
.foreach(either -> handleSuccess(either.right()));

peek(Consumer) 필요한만큼 여러 번 연결할 수 있습니다.

doSomething().stream()
.peek(element -> handleFoo(element.foo()))
.peek(element -> handleBar(element.bar()))
.peek(element -> handleBaz(element.baz()))
.foreach(element-> handleQux(element.qux()));


답변

내가 기여한 라이브러리 인 cyclops-react 에는 Stream을 복제 할 수있는 정적 메서드가 있습니다 (그리고 jOOλ Tuple of Streams 반환).

    Stream<Integer> stream = Stream.of(1,2,3);
    Tuple2<Stream<Integer>,Stream<Integer>> streams =  StreamUtils.duplicate(stream);

주석을 참조하십시오. 기존 스트림에서 중복을 사용할 때 발생하는 성능 저하가 있습니다. 더 성능이 좋은 대안은 Streamable을 사용하는 것입니다.

Stream, Iterable 또는 Array에서 구성하고 여러 번 재생할 수있는 (lazy) Streamable 클래스도 있습니다.

    Streamable<Integer> streamable = Streamable.of(1,2,3);
    streamable.stream().forEach(System.out::println);
    streamable.stream().forEach(System.out::println);

AsStreamable.synchronizedFromStream (stream)-스레드간에 공유 할 수있는 방식으로 백업 컬렉션을 느리게 채울 Streamable을 만드는 데 사용할 수 있습니다. Streamable.fromStream (stream)은 동기화 오버 헤드를 발생시키지 않습니다.