// ReaderInputStream.java

/*
 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Additionally licensed with:
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spf4j.io;

import com.google.common.annotations.Beta;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkPositionIndexes;

import com.google.common.primitives.UnsignedBytes;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Arrays;

/**
 * An {@link InputStream} that converts characters from a {@link Reader} into bytes using an
 * arbitrary Charset.
 *
 * <p>This is an alternative to copying the data to an {@code OutputStream} via a {@code Writer},
 * which is necessarily blocking. By implementing an {@code InputStream} it allows consumers to
 * "pull" as much data as they can handle, which is more convenient when dealing with flow
 * controlled, async APIs.
 *
 * <p>Notes by Z: this class in its current state is not ready for prime time. Here is why:
 * <ol>
 *   <li>Since it buffers, there needs to be a way to access the unconsumed bytes/chars.
 *   <li>Buffer sizing could be smarter: the char buffer and the byte buffer are allocated with
 *       the same number of elements, which does not make them "byte size equivalent".
 * </ol>
 *
 * @author Chris Nokleberg
 */
@Beta
public final class ReaderInputStream extends InputStream {
  /** Source of the characters to encode. */
  private final Reader reader;
  /** Encoder used to turn the characters into bytes; it is reset by the constructor. */
  private final CharsetEncoder encoder;
  /** Scratch array backing the single-byte {@link #read()}. */
  private final byte[] singleByte = new byte[1];

  /**
   * charBuffer holds characters that have been read from the Reader but not encoded yet. The buffer
   * is perpetually "flipped" (unencoded characters between position and limit).
   */
  private CharBuffer charBuffer;

  /**
   * byteBuffer holds encoded characters that have not yet been sent to the caller of the input
   * stream. When encoding it is "unflipped" (encoded bytes between 0 and position) and when
   * draining it is flipped (undrained bytes between position and limit).
   */
  private ByteBuffer byteBuffer;

  /** Whether we've finished reading the reader. */
  private boolean endOfInput;
  /**
   * Whether the final {@code encode} call (the one made with {@code endOfInput == true}) has
   * completed with an underflow, meaning that the encoder may now be flushed. This is instance
   * state rather than a local of {@code read} because that final call can overflow and the read
   * can return to the caller before it is retried; the retry must then happen on the next read
   * instead of being skipped in favour of the flush.
   */
  private boolean doneEncoding;
  /** Whether we're copying encoded bytes to the caller's buffer. */
  private boolean draining;
  /** Whether we've successfully flushed the encoder. */
  private boolean doneFlushing;

  /**
   * Creates a new input stream that will encode the characters from {@code reader} into bytes using
   * the given character set. Malformed input and unmappable characters will be replaced.
   *
   * @param reader input source
   * @param charset character set used for encoding chars to bytes
   * @param bufferSize size of internal input and output buffers
   * @throws IllegalArgumentException if bufferSize is non-positive
   */
  public ReaderInputStream(final Reader reader, final Charset charset, final int bufferSize) {
    this(
        reader,
        charset
            .newEncoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE),
        bufferSize);
  }

  /**
   * Creates a new input stream that will encode the characters from {@code reader} into bytes using
   * the given character set encoder. The encoder is reset by this constructor and is used
   * directly (it is not copied).
   *
   * @param reader input source
   * @param encoder character set encoder used for encoding chars to bytes
   * @param bufferSize size of internal input and output buffers
   * @throws IllegalArgumentException if bufferSize is non-positive
   */
  @SuppressFBWarnings("EI_EXPOSE_REP2")
  public ReaderInputStream(final Reader reader, final CharsetEncoder encoder, final int bufferSize) {
    this.reader = checkNotNull(reader);
    this.encoder = checkNotNull(encoder);
    checkArgument(bufferSize > 0, "bufferSize must be positive: %s", bufferSize);
    encoder.reset();

    // Keep the (empty) char buffer in its "flipped" state: nothing between position and limit.
    charBuffer = CharBuffer.allocate(bufferSize);
    charBuffer.flip();

    byteBuffer = ByteBuffer.allocate(bufferSize);
  }

  /**
   * Closes the underlying reader.
   *
   * @throws IOException if closing the reader fails
   */
  @Override
  public void close() throws IOException {
    reader.close();
  }

  /**
   * Reads a single encoded byte.
   *
   * @return the next byte as a value in the range 0-255, or -1 at the end of the stream
   * @throws IOException if reading from the reader or encoding fails
   */
  @Override
  public int read() throws IOException {
    return (read(singleByte) == 1) ? UnsignedBytes.toInt(singleByte[0]) : -1;
  }

  /**
   * Reads up to {@code len} encoded bytes into {@code b}, pulling and encoding more characters
   * from the reader as needed.
   *
   * @param b destination array
   * @param off index in {@code b} at which the first byte is stored
   * @param len maximum number of bytes to read
   * @return the number of bytes stored, 0 if {@code len} is 0, or -1 if the end of the stream was
   *     reached before any byte could be stored
   * @throws IOException if reading from the reader fails, or if the encoder reports an error
   *     (only possible with an encoder that does not use REPLACE/IGNORE actions)
   */
  // TODO(chrisn): Consider trying to encode/flush directly to the argument byte
  // buffer when possible.
  @Override
  public int read(final byte[] b, final int off, final int len) throws IOException {
    // Obey InputStream contract.
    checkPositionIndexes(off, off + len, b.length);
    if (len == 0) {
      return 0;
    }

    // The rest of this method implements the process described by the CharsetEncoder javadoc:
    // encode(..., false)* -> encode(..., true) until underflow -> flush until underflow.
    int totalBytesRead = 0;

    DRAINING:
    while (true) {
      // We stay in draining mode until there are no bytes left in the output buffer. Then we go
      // back to encoding/flushing.
      if (draining) {
        totalBytesRead += drain(b, off + totalBytesRead, len - totalBytesRead);
        if (totalBytesRead == len || doneFlushing) {
          return (totalBytesRead > 0) ? totalBytesRead : -1;
        }
        draining = false;
        byteBuffer.clear();
      }

      while (true) {
        // We call encode until there is no more input. The last call to encode will have endOfInput
        // == true. Then there is a final call to flush.
        CoderResult result;
        if (doneFlushing) {
          result = CoderResult.UNDERFLOW;
        } else if (doneEncoding) {
          result = encoder.flush(byteBuffer);
        } else {
          result = encoder.encode(charBuffer, byteBuffer, endOfInput);
        }

        if (result.isOverflow()) {
          // Not enough room in output buffer--drain it, creating a bigger buffer if necessary.
          // doneEncoding is deliberately left untouched: if this was the final encode call it
          // has not completed yet and must be retried, possibly by a later read.
          startDraining(true);
          continue DRAINING;
        } else if (result.isUnderflow()) {
          // If encoder underflows, it means either:
          // a) the final flush() succeeded; next drain (then done)
          // b) we encoded all of the input; next flush
          // c) we ran out of input to encode; next read more input
          if (doneEncoding) { // (a)
            doneFlushing = true;
            startDraining(false);
            continue DRAINING;
          } else if (endOfInput) { // (b)
            doneEncoding = true;
          } else { // (c)
            readMoreChars();
          }
        } else if (result.isError()) {
          // Only reach here if a CharsetEncoder with non-REPLACE settings is used.
          result.throwException();
          return 0; // Not called.
        }
      }
    }
  }

  /** Returns a new CharBuffer identical to buf, except twice the capacity. */
  private static CharBuffer grow(final CharBuffer buf) {
    char[] copy = Arrays.copyOf(buf.array(), buf.capacity() * 2);
    CharBuffer bigger = CharBuffer.wrap(copy);
    // wrap() yields position 0 and limit == capacity; restore the original window.
    bigger.position(buf.position());
    bigger.limit(buf.limit());
    return bigger;
  }

  /**
   * Handle the case of underflow caused by needing more input characters. Sets
   * {@link #endOfInput} when the reader reports end of stream.
   */
  private void readMoreChars() throws IOException {
    // Possibilities:
    // 1) array has space available on right hand side (between limit and capacity)
    // 2) array has space available on left hand side (before position)
    // 3) array has no space available
    //
    // In case 2 we shift the existing chars to the left, and in case 3 we create a bigger
    // array, then they both become case 1.

    if (availableCapacity(charBuffer) == 0) {
      if (charBuffer.position() > 0) {
        // (2) There is room in the buffer. Move existing chars to the beginning.
        charBuffer.compact().flip();
      } else {
        // (3) Entire buffer is full, need bigger buffer.
        charBuffer = grow(charBuffer);
      }
    }

    // (1) Read more characters into free space at end of array.
    int limit = charBuffer.limit();
    int numChars = reader.read(charBuffer.array(), limit, availableCapacity(charBuffer));
    if (numChars == -1) {
      endOfInput = true;
    } else {
      charBuffer.limit(limit + numChars);
    }
  }

  /** Returns the number of elements between the limit and capacity. */
  private static int availableCapacity(final Buffer buffer) {
    return buffer.capacity() - buffer.limit();
  }

  /**
   * Flips the output buffer so we can start reading bytes from it. If we are starting to drain
   * because there was overflow, and there aren't actually any bytes to drain, then the overflow
   * must be due to a small output buffer; in that case a buffer of twice the capacity is
   * allocated instead and draining mode is not entered.
   *
   * @param overflow whether draining was triggered by an encoder overflow
   */
  private void startDraining(final boolean overflow) {
    byteBuffer.flip();
    if (overflow && byteBuffer.remaining() == 0) {
      byteBuffer = ByteBuffer.allocate(byteBuffer.capacity() * 2);
    } else {
      draining = true;
    }
  }

  /**
   * Copy as much of the byte buffer into the output array as possible, returning the
   * (non-negative) number of bytes copied.
   */
  private int drain(final byte[] b, final int off, final int len) {
    int remaining = Math.min(len, byteBuffer.remaining());
    byteBuffer.get(b, off, remaining);
    return remaining;
  }

  @Override
  public String toString() {
    return "ReaderInputStream{" + "reader=" + reader + ", encoder=" + encoder + ", charBuffer=" + charBuffer
            + ", endOfInput=" + endOfInput + ", draining=" + draining + ", doneFlushing=" + doneFlushing + '}';
  }


}