1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.io.tcp.proxy;
33
34 import java.io.IOException;
35 import java.io.UncheckedIOException;
36 import java.nio.ByteBuffer;
37 import java.nio.channels.ClosedChannelException;
38 import java.nio.channels.SocketChannel;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42
43
44
45
46 public final class TransferBuffer {
47
48 private static final Logger LOG = LoggerFactory.getLogger(TransferBuffer.class);
49
50 public enum Operation {
51 READ, WRITE
52 };
53
54 private final ByteBuffer buffer;
55
56 private Operation lastOperation;
57
58 private boolean isEof;
59
60 private Runnable isDataInBufferHook;
61
62 private Runnable isRoomInBufferHook;
63
64 private Sniffer incomingSniffer;
65
66 private IOException readException;
67
68 private IOException writeException;
69
70 public TransferBuffer(final int bufferSize) {
71 buffer = ByteBuffer.allocateDirect(bufferSize);
72 lastOperation = Operation.READ;
73 isEof = false;
74 this.isDataInBufferHook = null;
75 this.isRoomInBufferHook = null;
76 this.readException = null;
77 this.writeException = null;
78 }
79
80 public synchronized int read(final SocketChannel channel) {
81 if (lastOperation == Operation.WRITE) {
82 buffer.compact();
83 lastOperation = Operation.READ;
84 }
85 int nrRead;
86 IOException oex = null;
87 try {
88 nrRead = channel.read(buffer);
89 if (incomingSniffer != null && (nrRead != 0)) {
90 nrRead = incomingSniffer.received(buffer, nrRead);
91 }
92 } catch (IOException ex) {
93 oex = ex;
94 if (incomingSniffer == null) {
95 readException = ex;
96 } else {
97 readException = incomingSniffer.received(ex);
98 }
99 if (readException != null) {
100 LOG.debug("Exception while reading from {}", channel, ex);
101 nrRead = -1;
102 } else {
103 nrRead = 0;
104 }
105 try {
106 channel.close();
107 } catch (IOException ex1) {
108 if (readException != null) {
109 readException.addSuppressed(ex1);
110 }
111 }
112 }
113 if (nrRead < 0) {
114 isEof = true;
115 try {
116 channel.socket().shutdownInput();
117 } catch (ClosedChannelException cex) {
118
119 } catch (IOException ex) {
120 throw new UncheckedIOException(ex);
121 }
122 } else if (oex == null && buffer.hasRemaining()) {
123 isRoomInBufferHook.run();
124 }
125 if (buffer.position() > 0 || isEof) {
126 isDataInBufferHook.run();
127 }
128 return nrRead;
129 }
130
131 public synchronized int write(final SocketChannel channel) {
132 if (lastOperation == Operation.READ) {
133 buffer.flip();
134 lastOperation = Operation.WRITE;
135 }
136 int nrWritten;
137 try {
138 nrWritten = channel.write(buffer);
139 } catch (IOException ex) {
140 try {
141 channel.close();
142 } catch (IOException ex1) {
143 ex.addSuppressed(ex1);
144 }
145 LOG.debug("Exception while writing to {}", channel, ex);
146 writeException = ex;
147 nrWritten = 0;
148 }
149 final boolean hasRemaining = buffer.hasRemaining();
150 if (!hasRemaining) {
151 if (isEof) {
152 try {
153 channel.socket().shutdownOutput();
154 } catch (ClosedChannelException closed) {
155
156 } catch (IOException ex) {
157 throw new UncheckedIOException(ex);
158 }
159 return nrWritten;
160 } else if (readException != null) {
161 try {
162 channel.socket().shutdownOutput();
163 } catch (ClosedChannelException closed) {
164
165 } catch (IOException ex) {
166 readException.addSuppressed(ex);
167 }
168 LOG.debug("Closed channel {} due to read exception", channel, readException);
169 return nrWritten;
170 }
171 }
172 if (!isEof && buffer.position() > 0) {
173 isRoomInBufferHook.run();
174 }
175 if (hasRemaining && writeException != null) {
176 isDataInBufferHook.run();
177 }
178 return nrWritten;
179 }
180
181 public static int transfer(final SocketChannel in, final SocketChannel out, final ByteBuffer buffer)
182 throws IOException {
183 int read = 0;
184
185 while ((in.finishConnect() && ((read = in.read(buffer)) > 0)) || (buffer.position() > 0)) {
186
187 buffer.flip();
188 if (!out.finishConnect()) {
189 break;
190 }
191 int writen = out.write(buffer);
192 buffer.compact();
193 if (writen <= 0) {
194 break;
195 }
196 }
197 return read;
198 }
199
200 public synchronized void setIsDataInBufferHook(final Runnable isDataInBufferHook) {
201 this.isDataInBufferHook = isDataInBufferHook;
202 }
203
204 public synchronized void setIsRoomInBufferHook(final Runnable isRoomInBufferHook) {
205 this.isRoomInBufferHook = isRoomInBufferHook;
206 }
207
208 public synchronized void setIncomingSniffer(final Sniffer incomingSniffer) {
209 this.incomingSniffer = incomingSniffer;
210 }
211
212 @Override
213 public String toString() {
214 return "TransferBuffer{" + "buffer=" + buffer + ", lastOperation=" + lastOperation
215 + ", isEof=" + isEof + ", isDataInBufferHook=" + isDataInBufferHook
216 + ", isRoomInBufferHook=" + isRoomInBufferHook + '}';
217 }
218
219 }