1 /* 2 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved. 3 * 4 * This library is free software; you can redistribute it and/or 5 * modify it under the terms of the GNU Lesser General Public 6 * License as published by the Free Software Foundation; either 7 * version 2.1 of the License, or (at your option) any later version. 8 * 9 * This library is distributed in the hope that it will be useful, 10 * but WITHOUT ANY WARRANTY; without even the implied warranty of 11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 * GNU General Public License for more details. 13 * 14 * You should have received a copy of the GNU Lesser General Public 15 * License along with this program; if not, write to the Free Software 16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 17 * 18 * Additionally licensed with: 19 * 20 * Licensed under the Apache License, Version 2.0 (the "License"); 21 * you may not use this file except in compliance with the License. 22 * You may obtain a copy of the License at 23 * 24 * http://www.apache.org/licenses/LICENSE-2.0 25 * 26 * Unless required by applicable law or agreed to in writing, software 27 * distributed under the License is distributed on an "AS IS" BASIS, 28 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 29 * See the License for the specific language governing permissions and 30 * limitations under the License. 31 */ 32 package org.spf4j.io; 33 34 import com.google.common.annotations.Beta; 35 import static com.google.common.base.Preconditions.checkArgument; 36 import static com.google.common.base.Preconditions.checkNotNull; 37 import static com.google.common.base.Preconditions.checkPositionIndexes; 38 39 import com.google.common.primitives.UnsignedBytes; 40 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; 41 42 import java.io.IOException; 43 import java.io.InputStream; 44 import java.io.Reader; 45 import java.nio.Buffer; 46 import java.nio.ByteBuffer; 47 import java.nio.CharBuffer; 48 import java.nio.charset.Charset; 49 import java.nio.charset.CharsetEncoder; 50 import java.nio.charset.CoderResult; 51 import java.nio.charset.CodingErrorAction; 52 import java.util.Arrays; 53 54 /** 55 * An {@link InputStream} that converts characters from a {@link Reader} into bytes using an 56 * arbitrary Charset. 57 * 58 * <p>This is an alternative to copying the data to an {@code OutputStream} via a {@code Writer}, 59 * which is necessarily blocking. By implementing an {@code InputStream} it allows consumers to 60 * "pull" as much data as they can handle, which is more convenient when dealing with flow 61 * controlled, async APIs. 62 * 63 * @author Chris Nokleberg 64 * 65 * Notes by Z: THis class in its current state is not ready for prime time. Here is why: 66 * 1) Since it buffers, there needs to be capability to access the unconsumed bytes/chars. 67 * 2) Buffer sizing could be smarter? char buffer and byte buffers are not "byte size equivalent"(just made this up :-)) 68 */ 69 @Beta 70 public final class ReaderInputStream extends InputStream { 71 private final Reader reader; 72 private final CharsetEncoder encoder; 73 private final byte[] singleByte = new byte[1]; 74 75 /** 76 * charBuffer holds characters that have been read from the Reader but not encoded yet. The buffer 77 * is perpetually "flipped" (unencoded characters between position and limit). 78 */ 79 private CharBuffer charBuffer; 80 81 /** 82 * byteBuffer holds encoded characters that have not yet been sent to the caller of the input 83 * stream. When encoding it is "unflipped" (encoded bytes between 0 and position) and when 84 * draining it is flipped (undrained bytes between position and limit). 85 */ 86 private ByteBuffer byteBuffer; 87 88 /** Whether we've finished reading the reader. */ 89 private boolean endOfInput; 90 /** Whether we're copying encoded bytes to the caller's buffer. */ 91 private boolean draining; 92 /** Whether we've successfully flushed the encoder. */ 93 private boolean doneFlushing; 94 95 /** 96 * Creates a new input stream that will encode the characters from {@code reader} into bytes using 97 * the given character set. Malformed input and unmappable characters will be replaced. 98 * 99 * @param reader input source 100 * @param charset character set used for encoding chars to bytes 101 * @param bufferSize size of internal input and output buffers 102 * @throws IllegalArgumentException if bufferSize is non-positive 103 */ 104 public ReaderInputStream(final Reader reader, final Charset charset, final int bufferSize) { 105 this( 106 reader, 107 charset 108 .newEncoder() 109 .onMalformedInput(CodingErrorAction.REPLACE) 110 .onUnmappableCharacter(CodingErrorAction.REPLACE), 111 bufferSize); 112 } 113 114 /** 115 * Creates a new input stream that will encode the characters from {@code reader} into bytes using 116 * the given character set encoder. 117 * 118 * @param reader input source 119 * @param encoder character set encoder used for encoding chars to bytes 120 * @param bufferSize size of internal input and output buffers 121 * @throws IllegalArgumentException if bufferSize is non-positive 122 */ 123 @SuppressFBWarnings("EI_EXPOSE_REP2") 124 public ReaderInputStream(final Reader reader, final CharsetEncoder encoder, final int bufferSize) { 125 this.reader = checkNotNull(reader); 126 this.encoder = checkNotNull(encoder); 127 checkArgument(bufferSize > 0, "bufferSize must be positive: %s", bufferSize); 128 encoder.reset(); 129 130 charBuffer = CharBuffer.allocate(bufferSize); 131 charBuffer.flip(); 132 133 byteBuffer = ByteBuffer.allocate(bufferSize); 134 } 135 136 @Override 137 public void close() throws IOException { 138 reader.close(); 139 } 140 141 @Override 142 public int read() throws IOException { 143 return (read(singleByte) == 1) ? UnsignedBytes.toInt(singleByte[0]) : -1; 144 } 145 146 // TODO(chrisn): Consider trying to encode/flush directly to the argument byte 147 // buffer when possible. 148 @Override 149 public int read(final byte[] b, final int off, final int len) throws IOException { 150 // Obey InputStream contract. 151 checkPositionIndexes(off, off + len, b.length); 152 if (len == 0) { 153 return 0; 154 } 155 156 // The rest of this method implements the process described by the CharsetEncoder javadoc. 157 int totalBytesRead = 0; 158 boolean doneEncoding = endOfInput; 159 160 DRAINING: 161 while (true) { 162 // We stay in draining mode until there are no bytes left in the output buffer. Then we go 163 // back to encoding/flushing. 164 if (draining) { 165 totalBytesRead += drain(b, off + totalBytesRead, len - totalBytesRead); 166 if (totalBytesRead == len || doneFlushing) { 167 return (totalBytesRead > 0) ? totalBytesRead : -1; 168 } 169 draining = false; 170 byteBuffer.clear(); 171 } 172 173 while (true) { 174 // We call encode until there is no more input. The last call to encode will have endOfInput 175 // == true. Then there is a final call to flush. 176 CoderResult result; 177 if (doneFlushing) { 178 result = CoderResult.UNDERFLOW; 179 } else if (doneEncoding) { 180 result = encoder.flush(byteBuffer); 181 } else { 182 result = encoder.encode(charBuffer, byteBuffer, endOfInput); 183 } 184 185 if (result.isOverflow()) { 186 // Not enough room in output buffer--drain it, creating a bigger buffer if necessary. 187 startDraining(true); 188 continue DRAINING; 189 } else if (result.isUnderflow()) { 190 // If encoder underflows, it means either: 191 // a) the final flush() succeeded; next drain (then done) 192 // b) we encoded all of the input; next flush 193 // c) we ran of out input to encode; next read more input 194 if (doneEncoding) { // (a) 195 doneFlushing = true; 196 startDraining(false); 197 continue DRAINING; 198 } else if (endOfInput) { // (b) 199 doneEncoding = true; 200 } else { // (c) 201 readMoreChars(); 202 } 203 } else if (result.isError()) { 204 // Only reach here if a CharsetEncoder with non-REPLACE settings is used. 205 result.throwException(); 206 return 0; // Not called. 207 } 208 } 209 } 210 } 211 212 /** Returns a new CharBuffer identical to buf, except twice the capacity. */ 213 private static CharBuffer grow(final CharBuffer buf) { 214 char[] copy = Arrays.copyOf(buf.array(), buf.capacity() * 2); 215 CharBuffer bigger = CharBuffer.wrap(copy); 216 bigger.position(buf.position()); 217 bigger.limit(buf.limit()); 218 return bigger; 219 } 220 221 /** Handle the case of underflow caused by needing more input characters. */ 222 private void readMoreChars() throws IOException { 223 // Possibilities: 224 // 1) array has space available on right hand side (between limit and capacity) 225 // 2) array has space available on left hand side (before position) 226 // 3) array has no space available 227 // 228 // In case 2 we shift the existing chars to the left, and in case 3 we create a bigger 229 // array, then they both become case 1. 230 231 if (availableCapacity(charBuffer) == 0) { 232 if (charBuffer.position() > 0) { 233 // (2) There is room in the buffer. Move existing bytes to the beginning. 234 charBuffer.compact().flip(); 235 } else { 236 // (3) Entire buffer is full, need bigger buffer. 237 charBuffer = grow(charBuffer); 238 } 239 } 240 241 // (1) Read more characters into free space at end of array. 242 int limit = charBuffer.limit(); 243 int numChars = reader.read(charBuffer.array(), limit, availableCapacity(charBuffer)); 244 if (numChars == -1) { 245 endOfInput = true; 246 } else { 247 charBuffer.limit(limit + numChars); 248 } 249 } 250 251 /** Returns the number of elements between the limit and capacity. */ 252 private static int availableCapacity(final Buffer buffer) { 253 return buffer.capacity() - buffer.limit(); 254 } 255 256 /** 257 * Flips the buffer output buffer so we can start reading bytes from it. If we are starting to 258 * drain because there was overflow, and there aren't actually any characters to drain, then the 259 * overflow must be due to a small output buffer. 260 */ 261 private void startDraining(final boolean overflow) { 262 byteBuffer.flip(); 263 if (overflow && byteBuffer.remaining() == 0) { 264 byteBuffer = ByteBuffer.allocate(byteBuffer.capacity() * 2); 265 } else { 266 draining = true; 267 } 268 } 269 270 /** 271 * Copy as much of the byte buffer into the output array as possible, returning the (positive) 272 * number of characters copied. 273 */ 274 private int drain(final byte[] b, final int off, final int len) { 275 int remaining = Math.min(len, byteBuffer.remaining()); 276 byteBuffer.get(b, off, remaining); 277 return remaining; 278 } 279 280 @Override 281 public String toString() { 282 return "ReaderInputStream{" + "reader=" + reader + ", encoder=" + encoder + ", charBuffer=" + charBuffer 283 + ", endOfInput=" + endOfInput + ", draining=" + draining + ", doneFlushing=" + doneFlushing + '}'; 284 } 285 286 287 }