// TransferBuffer.java
/*
* Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Additionally licensed with:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.spf4j.io.tcp.proxy;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A direct {@link ByteBuffer} that is filled from one {@link SocketChannel} via {@link #read(SocketChannel)}
 * and drained into another via {@link #write(SocketChannel)}.
 *
 * <p>The buffer remembers which operation was performed last and flips/compacts itself when the direction
 * changes. After every operation it invokes the "data in buffer" and/or "room in buffer" hooks so the owner
 * can react (presumably by re-registering selector interest; the hooks are opaque to this class).</p>
 *
 * <p>{@code read}, {@code write} and the setters are {@code synchronized} on this instance. Both hooks must be
 * set before {@code read}/{@code write} are used: they are invoked unconditionally and a missing hook results
 * in a {@link NullPointerException}.</p>
 *
 * @author zoly
 */
public final class TransferBuffer {

  private static final Logger LOG = LoggerFactory.getLogger(TransferBuffer.class);

  /** The direction in which the buffer was last used. */
  public enum Operation {
    READ, WRITE
  }

  /** Holds the bytes in transit between the two channels. */
  private final ByteBuffer buffer;

  /** Last operation performed; decides whether a flip/compact is needed before the next one. */
  private Operation lastOperation;

  /** Set once a read reported end of stream (or failed with an unfiltered exception). */
  private boolean isEof;

  /** Invoked when there is something for the writer to act on (pending bytes or EOF). */
  private Runnable isDataInBufferHook;

  /** Invoked when the reader may put more bytes into the buffer. */
  private Runnable isRoomInBufferHook;

  /** Optional observer of incoming data and read failures; may be {@code null}. */
  private Sniffer incomingSniffer;

  /** Last read failure that was not filtered out by the sniffer, or {@code null}. */
  private IOException readException;

  /** Last write failure, or {@code null}. */
  private IOException writeException;

  /**
   * Creates a transfer buffer backed by a direct byte buffer.
   *
   * @param bufferSize capacity of the buffer, in bytes.
   */
  public TransferBuffer(final int bufferSize) {
    buffer = ByteBuffer.allocateDirect(bufferSize);
    lastOperation = Operation.READ;
    isEof = false;
    this.isDataInBufferHook = null;
    this.isRoomInBufferHook = null;
    this.readException = null;
    this.writeException = null;
  }

  /**
   * Reads bytes from the channel into the buffer and notifies the hooks.
   *
   * <p>On an {@link IOException} the channel is closed. The exception is passed to the sniffer (if any), which
   * may replace it or filter it out by returning {@code null}; an unfiltered exception is remembered and the
   * read is treated as end of stream.</p>
   *
   * @param channel the channel to read from.
   * @return the byte count reported by the channel (as adjusted by the sniffer, if one is set),
   *         {@code 0} when the sniffer filtered out a read exception, or {@code -1} when the read failed.
   * @throws UncheckedIOException if shutting down the input side of the channel fails for a reason other
   *         than the channel being closed.
   */
  public synchronized int read(final SocketChannel channel) {
    if (lastOperation == Operation.WRITE) {
      // Switch from drain mode back to fill mode, keeping the bytes not written yet.
      buffer.compact();
      lastOperation = Operation.READ;
    }
    int nrRead;
    boolean readFailed = false;
    try {
      nrRead = channel.read(buffer);
      if (incomingSniffer != null && nrRead != 0) {
        nrRead = incomingSniffer.received(buffer, nrRead);
      }
    } catch (IOException ex) {
      readFailed = true;
      if (incomingSniffer == null) {
        readException = ex;
      } else {
        readException = incomingSniffer.received(ex);
      }
      if (readException != null) {
        LOG.debug("Exception while reading from {}", channel, ex);
        nrRead = -1;
      } else {
        nrRead = 0; // sniffer filters exception, behave as if no data was received.
      }
      try {
        channel.close();
      } catch (IOException ex1) {
        if (readException != null) {
          readException.addSuppressed(ex1);
        }
      }
    }
    if (nrRead < 0) {
      isEof = true;
      try {
        // Call the channel directly: the java.net.Socket adaptor reports a closed channel as a
        // SocketException, which would bypass the ClosedChannelException handler below and abort
        // this method before the data hook is notified about the end of stream.
        channel.shutdownInput(); // ? is this really necessary?
      } catch (ClosedChannelException cex) {
        // channel is closed.
      } catch (IOException ex) {
        throw new UncheckedIOException(ex);
      }
    } else if (!readFailed && buffer.hasRemaining()) {
      isRoomInBufferHook.run();
    }
    if (buffer.position() > 0 || isEof) {
      isDataInBufferHook.run();
    }
    return nrRead;
  }

  /**
   * Writes the buffered bytes to the channel and notifies the hooks.
   *
   * <p>On an {@link IOException} the channel is closed, the exception is remembered and {@code 0} is returned.
   * Once the buffer is fully drained and the reading side has reached end of stream (or failed), the output
   * side of the channel is shut down and no hook is invoked.</p>
   *
   * @param channel the channel to write to.
   * @return the number of bytes written, {@code 0} if the write failed.
   * @throws UncheckedIOException if shutting down the output side after end of stream fails for a reason
   *         other than the channel being closed.
   */
  public synchronized int write(final SocketChannel channel) {
    if (lastOperation == Operation.READ) {
      // Switch from fill mode to drain mode.
      buffer.flip();
      lastOperation = Operation.WRITE;
    }
    int nrWritten;
    try {
      nrWritten = channel.write(buffer);
    } catch (IOException ex) {
      try {
        channel.close();
      } catch (IOException ex1) {
        ex.addSuppressed(ex1);
      }
      LOG.debug("Exception while writing to {}", channel, ex);
      writeException = ex;
      nrWritten = 0;
    }
    final boolean hasRemaining = buffer.hasRemaining();
    if (!hasRemaining) {
      if (isEof) {
        try {
          // Direct channel call so that a closed channel surfaces as ClosedChannelException.
          channel.shutdownOutput();
        } catch (ClosedChannelException closed) {
          //channel is closed already
        } catch (IOException ex) {
          throw new UncheckedIOException(ex);
        }
        return nrWritten;
      } else if (readException != null) {
        try {
          channel.shutdownOutput();
        } catch (ClosedChannelException closed) {
          //channel is closed already
        } catch (IOException ex) {
          readException.addSuppressed(ex);
        }
        LOG.debug("Closed channel {} due to read exception", channel, readException);
        return nrWritten;
      }
    }
    if (!isEof && buffer.position() > 0) {
      // Some bytes were consumed, so a subsequent read (after compaction) has room to fill.
      isRoomInBufferHook.run();
    }
    if (hasRemaining && writeException == null) {
      // Partial write on a healthy channel: bytes are still pending, ask to be written again.
      // Nothing is requested after a failed write because the channel has been closed above.
      isDataInBufferHook.run();
    }
    return nrWritten;
  }

  /**
   * Copies bytes from {@code in} to {@code out} through the supplied buffer for as long as progress is made.
   *
   * <p>The buffer is expected to be in fill mode (ready for {@code in.read}) on entry and is left in fill
   * mode on exit; bytes that could not be written stay in the buffer for a later call.</p>
   *
   * @param in the channel to read from.
   * @param out the channel to write to.
   * @param buffer the buffer used for the copy; may hold pending bytes from a previous call.
   * @return the result of the last read attempted by this call, {@code 0} if no read was attempted.
   * @throws IOException if connecting, reading or writing fails.
   */
  public static int transfer(final SocketChannel in, final SocketChannel out, final ByteBuffer buffer)
          throws IOException {
    int read = 0;
    //CHECKSTYLE:OFF
    while ((in.finishConnect() && ((read = in.read(buffer)) > 0)) || (buffer.position() > 0)) {
      //CHECKSTYLE:ON
      if (!out.finishConnect()) {
        // Bail out before flipping so the pending bytes are not overwritten by the next read.
        break;
      }
      buffer.flip();
      int written = out.write(buffer);
      buffer.compact();
      if (written <= 0) {
        break;
      }
    }
    return read;
  }

  /**
   * Sets the hook invoked when the buffer has something for the writer (pending bytes or end of stream).
   *
   * @param isDataInBufferHook the hook to run.
   */
  public synchronized void setIsDataInBufferHook(final Runnable isDataInBufferHook) {
    this.isDataInBufferHook = isDataInBufferHook;
  }

  /**
   * Sets the hook invoked when the buffer can accept more bytes from the reader.
   *
   * @param isRoomInBufferHook the hook to run.
   */
  public synchronized void setIsRoomInBufferHook(final Runnable isRoomInBufferHook) {
    this.isRoomInBufferHook = isRoomInBufferHook;
  }

  /**
   * Sets the sniffer that observes incoming data and read failures.
   *
   * @param incomingSniffer the sniffer, or {@code null} for none.
   */
  public synchronized void setIncomingSniffer(final Sniffer incomingSniffer) {
    this.incomingSniffer = incomingSniffer;
  }

  @Override
  public String toString() {
    return "TransferBuffer{" + "buffer=" + buffer + ", lastOperation=" + lastOperation
            + ", isEof=" + isEof + ", isDataInBufferHook=" + isDataInBufferHook
            + ", isRoomInBufferHook=" + isRoomInBufferHook + '}';
  }
}