MemorizingBufferedInputStream.java
/*
* Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Additionally licensed with:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.spf4j.io;
import com.google.common.io.BaseEncoding;
import edu.umd.cs.findbugs.annotations.CleanupObligation;
import edu.umd.cs.findbugs.annotations.DischargesObligation;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CodingErrorAction;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.spf4j.base.Arrays;
import org.spf4j.recyclable.SizedRecyclingSupplier;
import org.spf4j.recyclable.impl.ArraySuppliers;
/**
* Why another buffered input stream?
* Main use case if for troubleshooting.
* Allows you to get more detail on where your stream processing failed.
*
* Implementation is a circular byte buffer, where you have 2 sizes to control the behavior:
*
* buffer size - the total sie of the buffer.
* read size - the maximum number of read bytes kept in the buffer.
*
*
* @author zoly
*/
@ParametersAreNonnullByDefault
@CleanupObligation
public final class MemorizingBufferedInputStream extends FilterInputStream {
private byte[] memory;
private final SizedRecyclingSupplier<byte[]> bufferProvider;
private final int readSize;
private final Charset charset;
private int memIdx;
private int startIdx;
private int endIdx;
private boolean isEof;
private boolean isClosed;
private long readBytes;
public MemorizingBufferedInputStream(final InputStream in) {
this(in, 16384, 8192, ArraySuppliers.Bytes.GL_SUPPLIER, null);
}
public MemorizingBufferedInputStream(final InputStream in, final int size) {
this(in, size, size / 2, ArraySuppliers.Bytes.GL_SUPPLIER, null);
}
public MemorizingBufferedInputStream(final InputStream in, final Charset charset) {
this(in, 16384, 8192, ArraySuppliers.Bytes.GL_SUPPLIER, charset);
}
public MemorizingBufferedInputStream(final InputStream in, final Charset charset, final int size) {
this(in, size, size / 2, ArraySuppliers.Bytes.GL_SUPPLIER, charset);
}
@SuppressFBWarnings("EI_EXPOSE_REP2")
public MemorizingBufferedInputStream(final InputStream in,
final int size, final int readSize,
final SizedRecyclingSupplier<byte[]> bufferProvider,
@Nullable final Charset charset) {
super(in);
if (readSize > size) {
throw new IllegalArgumentException("Read size " + readSize + " cannot be greater than " + size);
}
if (size < 2) {
throw new IllegalArgumentException("Buffer size " + size + " cannot be smaler than 2");
}
memory = bufferProvider.get(size);
this.bufferProvider = bufferProvider;
this.charset = charset;
this.startIdx = 0;
this.endIdx = 0;
this.readSize = readSize;
this.isEof = false;
this.isClosed = false;
this.readBytes = 0;
}
@Override
@DischargesObligation
public synchronized void close() throws IOException {
if (!isClosed) {
isClosed = true;
bufferProvider.recycle(memory);
memory = null;
super.close();
}
}
private int availableToWrite() {
if (memIdx <= endIdx) {
return memIdx + memory.length - endIdx - 1;
} else {
return memIdx - endIdx - 1;
}
}
private int availableToRead() {
if (startIdx <= endIdx) {
return endIdx - startIdx;
} else {
return memory.length - startIdx + endIdx;
}
}
private int availableInMemory() {
if (memIdx <= startIdx) {
return startIdx - memIdx;
} else {
return memory.length - memIdx + startIdx;
}
}
public synchronized byte[] getReadBytesFromBuffer() {
final int availableInMemory = availableInMemory();
if (availableInMemory == 0) {
return Arrays.EMPTY_BYTE_ARRAY;
}
byte[] result = new byte[availableInMemory];
if (memIdx < startIdx) {
System.arraycopy(memory, memIdx, result, 0, result.length);
} else {
final int toEnd = memory.length - memIdx;
System.arraycopy(memory, memIdx, result, 0, toEnd);
System.arraycopy(memory, 0, result, toEnd, startIdx);
}
return result;
}
public synchronized byte[] getUnreadBytesFromBuffer() {
final int availableToRead = availableToRead();
if (availableToRead == 0) {
return Arrays.EMPTY_BYTE_ARRAY;
}
byte[] result = new byte[availableToRead];
if (startIdx < endIdx) {
System.arraycopy(memory, startIdx, result, 0, result.length);
} else {
final int toEnd = memory.length - startIdx;
System.arraycopy(memory, startIdx, result, 0, toEnd);
System.arraycopy(memory, 0, result, toEnd, endIdx);
}
return result;
}
private int tryCleanup(final int size) {
int canWrite = availableToWrite();
if (canWrite < size) {
int toFree = size - canWrite;
int availableToFree = availableInMemory();
if (toFree > availableToFree) {
toFree = availableToFree;
}
memIdx = memIdx + toFree;
if (memIdx >= memory.length) {
memIdx = memIdx - memory.length;
}
return toFree + canWrite;
} else {
return size;
}
}
private void fill() throws IOException {
int size = tryCleanup(readSize);
if (size < readSize) {
throw new IllegalStateException("Illegal state " + this);
}
int canWriteInBulk = Math.min(size, memory.length - endIdx);
int read = super.read(memory, endIdx, canWriteInBulk);
if (read < 0) {
isEof = true;
return;
}
endIdx += read;
if (read < canWriteInBulk) {
return;
}
int wrapArround = size - canWriteInBulk;
if (wrapArround > 0) {
read = super.read(memory, 0, wrapArround);
if (read < 0) {
if (endIdx >= memory.length) {
endIdx = 0;
}
isEof = true;
return;
}
endIdx = read;
} else if (endIdx >= memory.length) {
endIdx = 0;
}
}
@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
if (isClosed) {
throw new IOException("Stream is closed " + this);
}
if (len < 0 || off + len > b.length) {
throw new ArrayIndexOutOfBoundsException("Offset " + off + " or len " + len);
}
int availableToRead = availableToRead();
if (availableToRead <= 0) {
fill();
availableToRead = availableToRead();
}
if (availableToRead == 0) {
if (isEof) {
return -1;
} else {
throw new IllegalStateException("State=" + this);
}
}
int toRead = Math.min(availableToRead, len);
int readToEnd = Math.min(toRead, memory.length - startIdx);
System.arraycopy(memory, startIdx, b, off, readToEnd);
startIdx += readToEnd;
int wrapArround = toRead - readToEnd;
if (wrapArround > 0) {
System.arraycopy(memory, 0, b, off + readToEnd, wrapArround);
startIdx = wrapArround;
} else if (startIdx >= memory.length) {
startIdx = 0;
}
this.readBytes += toRead;
return toRead;
}
@Override
public synchronized int read() throws IOException {
if (isClosed) {
throw new IOException("Stream is closed " + this);
}
int availableToRead = availableToRead();
if (availableToRead <= 0) {
fill();
availableToRead = availableToRead();
}
if (availableToRead == 0) {
if (isEof) {
return -1;
} else {
throw new IllegalStateException("State=" + this);
}
}
int result = memory[startIdx++] & 0xff;
if (startIdx >= memory.length) {
startIdx = 0;
}
this.readBytes++;
return result;
}
@Override
public synchronized int available() throws IOException {
if (isClosed) {
throw new IOException("Stream is closed " + this);
}
return availableToRead();
}
@Override
public synchronized String toString() {
StringBuilder result;
if (isClosed) {
result = new StringBuilder(64);
} else {
result = new StringBuilder((availableToRead() + availableInMemory()) * 2 + 128);
}
result.append("MemorizingBufferedInputStream{\n");
if (isClosed) {
result.append("closed=true\n");
} else if (charset == null) {
final BaseEncoding base64 = BaseEncoding.base64();
result.append("readBytes=\"").append(base64.encode(getReadBytesFromBuffer())).append("\",\n");
result.append("unreadBytes=\"").append(base64.encode(getUnreadBytesFromBuffer())).append("\"\n");
} else {
try {
result.append("readStr=\"").append(
charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE).replaceWith("?")
.decode(ByteBuffer.wrap(getReadBytesFromBuffer()))).append("\",\n");
result.append("unreadStr=\"").append(
charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE).replaceWith("?")
.decode(ByteBuffer.wrap(getUnreadBytesFromBuffer()))).append("\"\n");
} catch (CharacterCodingException ex) {
throw new UncheckedIOException(ex);
}
}
result.append("memIdx=").append(this.memIdx).append("\"\n");
result.append("startIdx=").append(this.startIdx).append("\"\n");
result.append("endIdx=").append(this.endIdx).append("\"\n");
result.append('}');
return result.toString();
}
public synchronized long getReadBytes() {
return readBytes;
}
@Override
public synchronized long skip(final long n) throws IOException {
long nrSkipped = 0;
for (int i = 0; i < n; i++) {
int read = read();
if (read < 0) {
break;
} else {
nrSkipped++;
}
}
return nrSkipped;
}
@Override
public boolean markSupported() {
return false;
}
@Override
public synchronized void reset() {
throw new UnsupportedOperationException();
}
@Override
public synchronized void mark(final int readlimit) {
throw new UnsupportedOperationException();
}
}