Read from ByteArrayOutputStream while it's being written to

I have a class that is constantly producing data and writing it to a ByteArrayOutputStream on its own thread. I have a 2nd thread that gets a reference to this ByteArrayOutputStream. I want the 2nd thread to read any data (and empty) the ByteArrayOutputStream and then stop when it doesn't get any bytes and sleep. After the sleep, I want it to try to get more data and empty it again.

The examples I see online say to use PipedOutputStream. If my first thread is making the ByteArrayOutputStream available to the outside world from a separate reusable library, I don't see how to hook up the inputStream to it.

How would one setup the PipedInputStream to connect it to the ByteArrayOutputStream to read from it as above? Also, when reading the last block from the ByteArrayOutputStream, will I see bytesRead == -1, indicating when the outputStream is closed from the first thread?

Many thanks, Mike

Write to the PipedOutputStream directly (that is, don't use a ByteArrayOutputStream at all). They both extend OutputStream and so have the same interface.

There are connect methods in both PipedOutputStream and PipedInputStream that are used to wire two pipes together, or you can use one of the constructors to create a pair.

Writes to the PipedOutputStream will block when the buffer in the PipedInputStream fills up, and reads from the PipedInputStream will block when the buffer is empty, so both threads will sleep (block) if they get "ahead" of the other side.

Your input stream will see the EOF (bytesRead == -1) when you close the output stream in the producer thread.

import java.io.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PipeTest {
    public static void main(String[] args) throws IOException {
        PipedOutputStream out = new PipedOutputStream();
        // Wire an input stream to the output stream, and use a buffer of 2048 bytes
        PipedInputStream in = new PipedInputStream(out, 2048);

        ExecutorService executor = Executors.newCachedThreadPool();

        executor.execute(() -> {
            try {
                for (int i = 0; i < 1024; i++) {
                    out.write(0);
                }
                out.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        executor.execute(() -> {
            try {
                int b, read = 0;
                while ((b = in.read()) != -1) {
                    read++;
                }
                System.out.println("Read " + read + " bytes.");
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });

        executor.shutdown();
    }
}

0 Comment

NO COMMENTS

LEAVE A REPLY

Captcha image