1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.io.tcp.proxy;
33
34 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
35 import java.io.IOException;
36 import java.nio.channels.ClosedChannelException;
37 import java.nio.channels.SelectionKey;
38 import java.nio.channels.Selector;
39 import java.nio.channels.SocketChannel;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ExecutorService;
42 import javax.annotation.Nullable;
43 import javax.annotation.ParametersAreNonnullByDefault;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.spf4j.base.AbstractRunnable;
47 import org.spf4j.ds.UpdateablePriorityQueue;
48 import org.spf4j.io.tcp.SelectorEventHandler;
49
50
51
52
53
54 @SuppressFBWarnings("HES_EXECUTOR_NEVER_SHUTDOWN")
55 @ParametersAreNonnullByDefault
56 public final class ProxyBufferTransferHandler extends SelectorEventHandler {
57
58 private static final Logger LOG = LoggerFactory.getLogger(ProxyBufferTransferHandler.class);
59
60 private final SocketChannel channel;
61
62 private final Selector selector;
63
64 private final UpdateablePriorityQueue.ElementRef deadlineActionRef;
65
66 private volatile boolean connected;
67
68 private final ExecutorService exec;
69
70 private final Runnable readRun;
71 private final Runnable writeRun;
72
73 private final SnifferFactory snifferFactory;
74
75 private final TransferBuffer in;
76
77 private final TransferBuffer out;
78
79 private final BlockingQueue<Runnable> tasksToRunBySelector;
80
81 @SuppressFBWarnings("EI_EXPOSE_REP2")
82 public ProxyBufferTransferHandler(final TransferBuffer in, final TransferBuffer out,
83 @Nullable final SnifferFactory snifferFactory,
84 final SocketChannel channel, final Selector selector, final ExecutorService exec,
85 final BlockingQueue<Runnable> tasksToRunBySelector,
86 final UpdateablePriorityQueue.ElementRef deadlineActionRef) {
87 this.in = in;
88 this.out = out;
89 this.exec = exec;
90 this.channel = channel;
91 this.selector = selector;
92 this.deadlineActionRef = deadlineActionRef;
93 this.connected = channel.isConnected();
94 this.snifferFactory = snifferFactory;
95 this.tasksToRunBySelector = tasksToRunBySelector;
96 readRun = new ReadFromChannel(in, channel);
97 writeRun = new WriteToChannel(out, channel);
98 }
99
100 @Override
101 public SelectionKey initialInterestRegistration() throws ClosedChannelException {
102 SelectionKey tkey = channel.register(selector, SelectionKey.OP_READ
103 | SelectionKey.OP_CONNECT, this);
104 final ReadInterest readInterest = new ReadInterest(tkey);
105 final WriteInterest writeInterest = new WriteInterest(tkey);
106 out.setIsDataInBufferHook(new DataAvailableToWriteHook(tasksToRunBySelector, writeInterest, selector));
107 in.setIsRoomInBufferHook(new RoomToReadHook(tasksToRunBySelector, readInterest, selector));
108 return tkey;
109 }
110
111 @Override
112 public boolean canRunAsync() {
113 return true;
114 }
115
116 @Override
117 public synchronized void runAsync(final SelectionKey sKey) throws IOException {
118 if (!connected && sKey.isConnectable()) {
119 sKey.interestOps(sKey.interestOps() & (~SelectionKey.OP_CONNECT));
120 connected = channel.finishConnect();
121 if (connected) {
122 LOG.debug("Connected to {}", channel);
123 deadlineActionRef.remove();
124 if (snifferFactory != null) {
125 in.setIncomingSniffer(snifferFactory.get(channel));
126 }
127 }
128 }
129 if (connected) {
130 if (sKey.isReadable()) {
131 exec.execute(readRun);
132 sKey.interestOps(sKey.interestOps() & (~SelectionKey.OP_READ));
133 }
134 if (sKey.isWritable()) {
135 exec.execute(writeRun);
136 sKey.interestOps(sKey.interestOps() & (~SelectionKey.OP_WRITE));
137 }
138 }
139 }
140
141 @Override
142 public void run(final SelectionKey skey) {
143 throw new UnsupportedOperationException();
144 }
145
146 private static class ReadInterest implements Runnable {
147
148 private final SelectionKey tKey;
149
150 ReadInterest(final SelectionKey key) {
151 tKey = key;
152 }
153
154 @Override
155 public void run() {
156 tKey.interestOps(tKey.interestOps() | SelectionKey.OP_READ);
157 }
158 }
159
160 private static class WriteInterest implements Runnable {
161
162 private final SelectionKey tKey;
163
164 WriteInterest(final SelectionKey key) {
165 this.tKey = key;
166 }
167
168 @Override
169 public void run() {
170 tKey.interestOps(tKey.interestOps() | SelectionKey.OP_WRITE);
171 }
172 }
173
174 private static class DataAvailableToWriteHook extends AbstractRunnable {
175
176 private final BlockingQueue<Runnable> tasksToRunBySelector;
177 private final WriteInterest writeInterest;
178 private final Selector selector;
179
180 DataAvailableToWriteHook(final BlockingQueue<Runnable> tasksToRunBySelector,
181 final WriteInterest writeInterest, final Selector selector) {
182 super(false);
183 this.tasksToRunBySelector = tasksToRunBySelector;
184 this.writeInterest = writeInterest;
185 this.selector = selector;
186 }
187
188 @Override
189 public void doRun() throws InterruptedException {
190 tasksToRunBySelector.put(writeInterest);
191 selector.wakeup();
192 }
193 }
194
195 private static class RoomToReadHook extends AbstractRunnable {
196
197 private final BlockingQueue<Runnable> tasksToRunBySelector;
198 private final ReadInterest readInterest;
199 private final Selector selector;
200
201 RoomToReadHook(final BlockingQueue<Runnable> tasksToRunBySelector,
202 final ReadInterest readInterest, final Selector selector) {
203 super(false);
204 this.tasksToRunBySelector = tasksToRunBySelector;
205 this.readInterest = readInterest;
206 this.selector = selector;
207 }
208
209 @Override
210 public void doRun() throws InterruptedException {
211 tasksToRunBySelector.put(readInterest);
212 selector.wakeup();
213 }
214 }
215
216 private static class ReadFromChannel extends AbstractRunnable {
217
218 private final TransferBuffer in;
219 private final SocketChannel channel;
220
221 ReadFromChannel(final TransferBuffer in, final SocketChannel channel) {
222 super(true);
223 this.in = in;
224 this.channel = channel;
225 }
226
227 @Override
228 public void doRun() {
229 int read = in.read(channel);
230 LOG.debug("Read {} bytes from {}", read, channel);
231 }
232 }
233
234 private static class WriteToChannel extends AbstractRunnable {
235
236 private final TransferBuffer out;
237 private final SocketChannel channel;
238
239 WriteToChannel(final TransferBuffer out, final SocketChannel channel) {
240 super(true);
241 this.out = out;
242 this.channel = channel;
243 }
244
245 @Override
246 public void doRun() {
247 int written = out.write(channel);
248 LOG.debug("Written {} bytes to {}", written, channel);
249 }
250 }
251
252 @Override
253 public String toString() {
254 return "ProxyBufferTransferHandler{" + "channel=" + channel + ", selector=" + selector
255 + ", connected=" + connected + ", in=" + in + ", out=" + out + '}';
256 }
257
258 }