View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.io.tcp.proxy;
33  
34  import java.io.IOException;
35  import java.io.UncheckedIOException;
36  import java.nio.ByteBuffer;
37  import java.nio.channels.ClosedChannelException;
38  import java.nio.channels.SocketChannel;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  /**
43   *
44   * @author zoly
45   */
46  public final class TransferBuffer {
47  
48    private static final Logger LOG = LoggerFactory.getLogger(TransferBuffer.class);
49  
50    public enum Operation {
51      READ, WRITE
52    };
53  
54    private final ByteBuffer buffer;
55  
56    private Operation lastOperation;
57  
58    private boolean isEof;
59  
60    private Runnable isDataInBufferHook;
61  
62    private Runnable isRoomInBufferHook;
63  
64    private Sniffer incomingSniffer;
65  
66    private IOException readException;
67  
68    private IOException writeException;
69  
70    public TransferBuffer(final int bufferSize) {
71      buffer = ByteBuffer.allocateDirect(bufferSize);
72      lastOperation = Operation.READ;
73      isEof = false;
74      this.isDataInBufferHook = null;
75      this.isRoomInBufferHook = null;
76      this.readException = null;
77      this.writeException = null;
78    }
79  
80    public synchronized int read(final SocketChannel channel) {
81      if (lastOperation == Operation.WRITE) {
82        buffer.compact();
83        lastOperation = Operation.READ;
84      }
85      int nrRead;
86      IOException oex = null;
87      try {
88        nrRead = channel.read(buffer);
89        if (incomingSniffer != null && (nrRead != 0)) {
90          nrRead = incomingSniffer.received(buffer, nrRead);
91        }
92      } catch (IOException ex) {
93        oex = ex;
94        if (incomingSniffer == null) {
95          readException = ex;
96        } else {
97          readException = incomingSniffer.received(ex);
98        }
99        if (readException != null) {
100         LOG.debug("Exception while reading from {}", channel, ex);
101         nrRead = -1;
102       } else {
103         nrRead = 0; // sniffer filters exception, behave a s no data received.
104       }
105       try {
106         channel.close();
107       } catch (IOException ex1) {
108         if (readException != null) {
109           readException.addSuppressed(ex1);
110         }
111       }
112     }
113     if (nrRead < 0) {
114       isEof = true;
115       try {
116         channel.socket().shutdownInput(); // ? is this really necessary?
117       } catch (ClosedChannelException cex) {
118         // channel is closed.
119       } catch (IOException ex) {
120         throw new UncheckedIOException(ex);
121       }
122     } else if (oex == null && buffer.hasRemaining()) {
123       isRoomInBufferHook.run();
124     }
125     if (buffer.position() > 0 || isEof) {
126       isDataInBufferHook.run();
127     }
128     return nrRead;
129   }
130 
131   public synchronized int write(final SocketChannel channel) {
132     if (lastOperation == Operation.READ) {
133       buffer.flip();
134       lastOperation = Operation.WRITE;
135     }
136     int nrWritten;
137     try {
138       nrWritten = channel.write(buffer);
139     } catch (IOException ex) {
140       try {
141         channel.close();
142       } catch (IOException ex1) {
143         ex.addSuppressed(ex1);
144       }
145       LOG.debug("Exception while writing to {}", channel, ex);
146       writeException = ex;
147       nrWritten = 0;
148     }
149     final boolean hasRemaining = buffer.hasRemaining();
150     if (!hasRemaining) {
151       if (isEof) {
152         try {
153           channel.socket().shutdownOutput();
154         } catch (ClosedChannelException closed) {
155           //channel is closed already
156         } catch (IOException ex) {
157           throw new UncheckedIOException(ex);
158         }
159         return nrWritten;
160       } else if (readException != null) {
161         try {
162           channel.socket().shutdownOutput();
163         } catch (ClosedChannelException closed) {
164           //channel is closed already
165         }  catch (IOException ex) {
166           readException.addSuppressed(ex);
167         }
168         LOG.debug("Closed channel {} due to read exception", channel, readException);
169         return nrWritten;
170       }
171     }
172     if (!isEof && buffer.position() > 0) {
173       isRoomInBufferHook.run();
174     }
175     if (hasRemaining && writeException != null) {
176       isDataInBufferHook.run();
177     }
178     return nrWritten;
179   }
180 
181   public static int transfer(final SocketChannel in, final SocketChannel out, final ByteBuffer buffer)
182           throws IOException {
183     int read = 0;
184     //CHECKSTYLE:OFF
185     while ((in.finishConnect() && ((read = in.read(buffer)) > 0)) || (buffer.position() > 0)) {
186       //CHECKSTYLE:ON
187       buffer.flip();
188       if (!out.finishConnect()) {
189         break;
190       }
191       int writen = out.write(buffer);
192       buffer.compact();
193       if (writen <= 0) {
194         break;
195       }
196     }
197     return read;
198   }
199 
200   public synchronized void setIsDataInBufferHook(final Runnable isDataInBufferHook) {
201     this.isDataInBufferHook = isDataInBufferHook;
202   }
203 
204   public synchronized void setIsRoomInBufferHook(final Runnable isRoomInBufferHook) {
205     this.isRoomInBufferHook = isRoomInBufferHook;
206   }
207 
208   public synchronized void setIncomingSniffer(final Sniffer incomingSniffer) {
209     this.incomingSniffer = incomingSniffer;
210   }
211 
212   @Override
213   public String toString() {
214     return "TransferBuffer{" + "buffer=" + buffer + ", lastOperation=" + lastOperation
215             + ", isEof=" + isEof + ", isDataInBufferHook=" + isDataInBufferHook
216             + ", isRoomInBufferHook=" + isRoomInBufferHook + '}';
217   }
218 
219 }