View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.io.tcp.proxy;
33  
34  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
35  import java.io.IOException;
36  import java.nio.channels.ClosedChannelException;
37  import java.nio.channels.SelectionKey;
38  import java.nio.channels.Selector;
39  import java.nio.channels.SocketChannel;
40  import java.util.concurrent.BlockingQueue;
41  import java.util.concurrent.ExecutorService;
42  import javax.annotation.Nullable;
43  import javax.annotation.ParametersAreNonnullByDefault;
44  import org.slf4j.Logger;
45  import org.slf4j.LoggerFactory;
46  import org.spf4j.base.AbstractRunnable;
47  import org.spf4j.ds.UpdateablePriorityQueue;
48  import org.spf4j.io.tcp.SelectorEventHandler;
49  
50  /**
51   *
52   * @author zoly
53   */
54  @SuppressFBWarnings("HES_EXECUTOR_NEVER_SHUTDOWN")
55  @ParametersAreNonnullByDefault
56  public final class ProxyBufferTransferHandler extends SelectorEventHandler {
57  
58      private static final Logger LOG = LoggerFactory.getLogger(ProxyBufferTransferHandler.class);
59  
60      private final SocketChannel channel;
61  
62      private final Selector selector;
63  
64      private final UpdateablePriorityQueue.ElementRef deadlineActionRef;
65  
66      private volatile boolean connected;
67  
68      private final ExecutorService exec;
69  
70      private final Runnable readRun;
71      private final Runnable writeRun;
72  
73      private final SnifferFactory snifferFactory;
74  
75      private final TransferBuffer in;
76  
77      private final TransferBuffer out;
78  
79      private final BlockingQueue<Runnable> tasksToRunBySelector;
80  
81      @SuppressFBWarnings("EI_EXPOSE_REP2")
82      public ProxyBufferTransferHandler(final TransferBuffer in, final TransferBuffer out,
83              @Nullable final SnifferFactory snifferFactory,
84              final SocketChannel channel, final Selector selector, final ExecutorService exec,
85              final BlockingQueue<Runnable> tasksToRunBySelector,
86              final UpdateablePriorityQueue.ElementRef deadlineActionRef) {
87          this.in = in;
88          this.out = out;
89          this.exec = exec;
90          this.channel = channel;
91          this.selector = selector;
92          this.deadlineActionRef = deadlineActionRef;
93          this.connected = channel.isConnected();
94          this.snifferFactory = snifferFactory;
95          this.tasksToRunBySelector = tasksToRunBySelector;
96          readRun = new ReadFromChannel(in, channel);
97          writeRun = new WriteToChannel(out, channel);
98      }
99  
100     @Override
101     public SelectionKey initialInterestRegistration() throws ClosedChannelException {
102         SelectionKey tkey = channel.register(selector, SelectionKey.OP_READ
103                 | SelectionKey.OP_CONNECT, this);
104         final ReadInterest readInterest = new ReadInterest(tkey);
105         final WriteInterest writeInterest = new WriteInterest(tkey);
106         out.setIsDataInBufferHook(new DataAvailableToWriteHook(tasksToRunBySelector, writeInterest, selector));
107         in.setIsRoomInBufferHook(new RoomToReadHook(tasksToRunBySelector, readInterest, selector));
108         return tkey;
109     }
110 
111     @Override
112     public boolean canRunAsync() {
113         return true;
114     }
115 
116     @Override
117     public synchronized void runAsync(final SelectionKey sKey) throws IOException {
118         if (!connected && sKey.isConnectable()) {
119             sKey.interestOps(sKey.interestOps() & (~SelectionKey.OP_CONNECT));
120             connected = channel.finishConnect();
121             if (connected) {
122                 LOG.debug("Connected to {}", channel);
123                 deadlineActionRef.remove();
124                 if (snifferFactory != null) {
125                     in.setIncomingSniffer(snifferFactory.get(channel));
126                 }
127             }
128         }
129         if (connected) {
130             if (sKey.isReadable()) {
131                 exec.execute(readRun);
132                 sKey.interestOps(sKey.interestOps() & (~SelectionKey.OP_READ));
133             }
134             if (sKey.isWritable()) {
135                 exec.execute(writeRun);
136                 sKey.interestOps(sKey.interestOps() & (~SelectionKey.OP_WRITE));
137             }
138         }
139     }
140 
141     @Override
142     public void run(final SelectionKey skey) {
143         throw new UnsupportedOperationException();
144     }
145 
146     private static class ReadInterest implements Runnable {
147 
148         private final SelectionKey tKey;
149 
150         ReadInterest(final SelectionKey key) {
151             tKey = key;
152         }
153 
154         @Override
155         public void run() {
156             tKey.interestOps(tKey.interestOps() | SelectionKey.OP_READ);
157         }
158     }
159 
160     private static class WriteInterest implements Runnable {
161 
162         private final SelectionKey tKey;
163 
164         WriteInterest(final SelectionKey key) {
165             this.tKey = key;
166         }
167 
168         @Override
169         public void run() {
170             tKey.interestOps(tKey.interestOps() | SelectionKey.OP_WRITE);
171         }
172     }
173 
174     private static class DataAvailableToWriteHook extends AbstractRunnable {
175 
176         private final BlockingQueue<Runnable> tasksToRunBySelector;
177         private final WriteInterest writeInterest;
178         private final Selector selector;
179 
180         DataAvailableToWriteHook(final BlockingQueue<Runnable> tasksToRunBySelector,
181                 final WriteInterest writeInterest, final Selector selector) {
182             super(false);
183             this.tasksToRunBySelector = tasksToRunBySelector;
184             this.writeInterest = writeInterest;
185             this.selector = selector;
186         }
187 
188         @Override
189         public void doRun() throws InterruptedException  {
190             tasksToRunBySelector.put(writeInterest);
191             selector.wakeup();
192         }
193     }
194 
195     private static class RoomToReadHook extends AbstractRunnable {
196 
197         private final BlockingQueue<Runnable> tasksToRunBySelector;
198         private final ReadInterest readInterest;
199         private final Selector selector;
200 
201         RoomToReadHook(final BlockingQueue<Runnable> tasksToRunBySelector,
202                 final ReadInterest readInterest, final Selector selector) {
203             super(false);
204             this.tasksToRunBySelector = tasksToRunBySelector;
205             this.readInterest = readInterest;
206             this.selector = selector;
207         }
208 
209         @Override
210         public void doRun() throws InterruptedException {
211             tasksToRunBySelector.put(readInterest);
212             selector.wakeup();
213         }
214     }
215 
216     private static class ReadFromChannel extends AbstractRunnable {
217 
218         private final TransferBuffer in;
219         private final SocketChannel channel;
220 
221         ReadFromChannel(final TransferBuffer in, final SocketChannel channel) {
222             super(true);
223             this.in = in;
224             this.channel = channel;
225         }
226 
227       @Override
228       public void doRun() {
229         int read = in.read(channel);
230         LOG.debug("Read {} bytes from {}", read, channel);
231       }
232     }
233 
234     private static class WriteToChannel extends AbstractRunnable {
235 
236         private final TransferBuffer out;
237         private final SocketChannel channel;
238 
239         WriteToChannel(final TransferBuffer out, final SocketChannel channel) {
240             super(true);
241             this.out = out;
242             this.channel = channel;
243         }
244 
245         @Override
246         public void doRun() {
247                 int written = out.write(channel);
248                 LOG.debug("Written {} bytes to {}", written, channel);
249         }
250     }
251 
252     @Override
253     public String toString() {
254         return "ProxyBufferTransferHandler{" + "channel=" + channel + ", selector=" + selector
255                 + ", connected=" + connected + ", in=" + in + ", out=" + out + '}';
256     }
257 
258 }