View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.io;
33  
34  import com.google.common.annotations.Beta;
35  import static com.google.common.base.Preconditions.checkArgument;
36  import static com.google.common.base.Preconditions.checkNotNull;
37  import static com.google.common.base.Preconditions.checkPositionIndexes;
38  
39  import com.google.common.primitives.UnsignedBytes;
40  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
41  
42  import java.io.IOException;
43  import java.io.InputStream;
44  import java.io.Reader;
45  import java.nio.Buffer;
46  import java.nio.ByteBuffer;
47  import java.nio.CharBuffer;
48  import java.nio.charset.Charset;
49  import java.nio.charset.CharsetEncoder;
50  import java.nio.charset.CoderResult;
51  import java.nio.charset.CodingErrorAction;
52  import java.util.Arrays;
53  
54  /**
55   * An {@link InputStream} that converts characters from a {@link Reader} into bytes using an
56   * arbitrary Charset.
57   *
58   * <p>This is an alternative to copying the data to an {@code OutputStream} via a {@code Writer},
59   * which is necessarily blocking. By implementing an {@code InputStream} it allows consumers to
60   * "pull" as much data as they can handle, which is more convenient when dealing with flow
61   * controlled, async APIs.
62   *
63   * @author Chris Nokleberg
64   *
65   * Notes by Z: THis class in its current state is not ready for prime time. Here is why:
66   * 1) Since it buffers, there needs to be capability to access the unconsumed bytes/chars.
67   * 2) Buffer sizing could be smarter? char buffer and byte buffers are not "byte size equivalent"(just made this up :-))
68   */
69  @Beta
70  public final class ReaderInputStream extends InputStream {
71    private final Reader reader;
72    private final CharsetEncoder encoder;
73    private final byte[] singleByte = new byte[1];
74  
75    /**
76     * charBuffer holds characters that have been read from the Reader but not encoded yet. The buffer
77     * is perpetually "flipped" (unencoded characters between position and limit).
78     */
79    private CharBuffer charBuffer;
80  
81    /**
82     * byteBuffer holds encoded characters that have not yet been sent to the caller of the input
83     * stream. When encoding it is "unflipped" (encoded bytes between 0 and position) and when
84     * draining it is flipped (undrained bytes between position and limit).
85     */
86    private ByteBuffer byteBuffer;
87  
88    /** Whether we've finished reading the reader. */
89    private boolean endOfInput;
90    /** Whether we're copying encoded bytes to the caller's buffer. */
91    private boolean draining;
92    /** Whether we've successfully flushed the encoder. */
93    private boolean doneFlushing;
94  
95    /**
96     * Creates a new input stream that will encode the characters from {@code reader} into bytes using
97     * the given character set. Malformed input and unmappable characters will be replaced.
98     *
99     * @param reader input source
100    * @param charset character set used for encoding chars to bytes
101    * @param bufferSize size of internal input and output buffers
102    * @throws IllegalArgumentException if bufferSize is non-positive
103    */
104   public ReaderInputStream(final Reader reader, final Charset charset, final int bufferSize) {
105     this(
106         reader,
107         charset
108             .newEncoder()
109             .onMalformedInput(CodingErrorAction.REPLACE)
110             .onUnmappableCharacter(CodingErrorAction.REPLACE),
111         bufferSize);
112   }
113 
114   /**
115    * Creates a new input stream that will encode the characters from {@code reader} into bytes using
116    * the given character set encoder.
117    *
118    * @param reader input source
119    * @param encoder character set encoder used for encoding chars to bytes
120    * @param bufferSize size of internal input and output buffers
121    * @throws IllegalArgumentException if bufferSize is non-positive
122    */
123   @SuppressFBWarnings("EI_EXPOSE_REP2")
124   public ReaderInputStream(final Reader reader, final CharsetEncoder encoder, final int bufferSize) {
125     this.reader = checkNotNull(reader);
126     this.encoder = checkNotNull(encoder);
127     checkArgument(bufferSize > 0, "bufferSize must be positive: %s", bufferSize);
128     encoder.reset();
129 
130     charBuffer = CharBuffer.allocate(bufferSize);
131     charBuffer.flip();
132 
133     byteBuffer = ByteBuffer.allocate(bufferSize);
134   }
135 
136   @Override
137   public void close() throws IOException {
138     reader.close();
139   }
140 
141   @Override
142   public int read() throws IOException {
143     return (read(singleByte) == 1) ? UnsignedBytes.toInt(singleByte[0]) : -1;
144   }
145 
146   // TODO(chrisn): Consider trying to encode/flush directly to the argument byte
147   // buffer when possible.
148   @Override
149   public int read(final byte[] b, final int off, final int len) throws IOException {
150     // Obey InputStream contract.
151     checkPositionIndexes(off, off + len, b.length);
152     if (len == 0) {
153       return 0;
154     }
155 
156     // The rest of this method implements the process described by the CharsetEncoder javadoc.
157     int totalBytesRead = 0;
158     boolean doneEncoding = endOfInput;
159 
160     DRAINING:
161     while (true) {
162       // We stay in draining mode until there are no bytes left in the output buffer. Then we go
163       // back to encoding/flushing.
164       if (draining) {
165         totalBytesRead += drain(b, off + totalBytesRead, len - totalBytesRead);
166         if (totalBytesRead == len || doneFlushing) {
167           return (totalBytesRead > 0) ? totalBytesRead : -1;
168         }
169         draining = false;
170         byteBuffer.clear();
171       }
172 
173       while (true) {
174         // We call encode until there is no more input. The last call to encode will have endOfInput
175         // == true. Then there is a final call to flush.
176         CoderResult result;
177         if (doneFlushing) {
178           result = CoderResult.UNDERFLOW;
179         } else if (doneEncoding) {
180           result = encoder.flush(byteBuffer);
181         } else {
182           result = encoder.encode(charBuffer, byteBuffer, endOfInput);
183         }
184 
185         if (result.isOverflow()) {
186           // Not enough room in output buffer--drain it, creating a bigger buffer if necessary.
187           startDraining(true);
188           continue DRAINING;
189         } else if (result.isUnderflow()) {
190           // If encoder underflows, it means either:
191           // a) the final flush() succeeded; next drain (then done)
192           // b) we encoded all of the input; next flush
193           // c) we ran of out input to encode; next read more input
194           if (doneEncoding) { // (a)
195             doneFlushing = true;
196             startDraining(false);
197             continue DRAINING;
198           } else if (endOfInput) { // (b)
199             doneEncoding = true;
200           } else { // (c)
201             readMoreChars();
202           }
203         } else if (result.isError()) {
204           // Only reach here if a CharsetEncoder with non-REPLACE settings is used.
205           result.throwException();
206           return 0; // Not called.
207         }
208       }
209     }
210   }
211 
212   /** Returns a new CharBuffer identical to buf, except twice the capacity. */
213   private static CharBuffer grow(final CharBuffer buf) {
214     char[] copy = Arrays.copyOf(buf.array(), buf.capacity() * 2);
215     CharBuffer bigger = CharBuffer.wrap(copy);
216     bigger.position(buf.position());
217     bigger.limit(buf.limit());
218     return bigger;
219   }
220 
221   /** Handle the case of underflow caused by needing more input characters. */
222   private void readMoreChars() throws IOException {
223     // Possibilities:
224     // 1) array has space available on right hand side (between limit and capacity)
225     // 2) array has space available on left hand side (before position)
226     // 3) array has no space available
227     //
228     // In case 2 we shift the existing chars to the left, and in case 3 we create a bigger
229     // array, then they both become case 1.
230 
231     if (availableCapacity(charBuffer) == 0) {
232       if (charBuffer.position() > 0) {
233         // (2) There is room in the buffer. Move existing bytes to the beginning.
234         charBuffer.compact().flip();
235       } else {
236         // (3) Entire buffer is full, need bigger buffer.
237         charBuffer = grow(charBuffer);
238       }
239     }
240 
241     // (1) Read more characters into free space at end of array.
242     int limit = charBuffer.limit();
243     int numChars = reader.read(charBuffer.array(), limit, availableCapacity(charBuffer));
244     if (numChars == -1) {
245       endOfInput = true;
246     } else {
247       charBuffer.limit(limit + numChars);
248     }
249   }
250 
251   /** Returns the number of elements between the limit and capacity. */
252   private static int availableCapacity(final Buffer buffer) {
253     return buffer.capacity() - buffer.limit();
254   }
255 
256   /**
257    * Flips the buffer output buffer so we can start reading bytes from it. If we are starting to
258    * drain because there was overflow, and there aren't actually any characters to drain, then the
259    * overflow must be due to a small output buffer.
260    */
261   private void startDraining(final boolean overflow) {
262     byteBuffer.flip();
263     if (overflow && byteBuffer.remaining() == 0) {
264       byteBuffer = ByteBuffer.allocate(byteBuffer.capacity() * 2);
265     } else {
266       draining = true;
267     }
268   }
269 
270   /**
271    * Copy as much of the byte buffer into the output array as possible, returning the (positive)
272    * number of characters copied.
273    */
274   private int drain(final byte[] b, final int off, final int len) {
275     int remaining = Math.min(len, byteBuffer.remaining());
276     byteBuffer.get(b, off, remaining);
277     return remaining;
278   }
279 
280   @Override
281   public String toString() {
282     return "ReaderInputStream{" + "reader=" + reader + ", encoder=" + encoder + ", charBuffer=" + charBuffer
283             + ", endOfInput=" + endOfInput + ", draining=" + draining + ", doneFlushing=" + doneFlushing + '}';
284   }
285 
286 
287 }