1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.recyclable.impl;
33
34 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
35 import org.spf4j.recyclable.ObjectBorrowException;
36 import org.spf4j.recyclable.ObjectCreationException;
37 import org.spf4j.recyclable.ObjectDisposeException;
38 import org.spf4j.recyclable.RecyclingSupplier;
39 import org.spf4j.recyclable.ObjectReturnException;
40 import java.lang.management.ManagementFactory;
41 import java.lang.management.ThreadMXBean;
42 import java.util.Map;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.ExecutorService;
45 import java.util.concurrent.Future;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import javax.annotation.concurrent.NotThreadSafe;
49 import org.junit.Assert;
50 import org.junit.Test;
51 import org.slf4j.LoggerFactory;
52 import org.slf4j.Logger;
53 import org.spf4j.base.ExecutionContext;
54 import org.spf4j.base.ExecutionContexts;
55 import org.spf4j.base.Pair;
56 import org.spf4j.base.Throwables;
57 import org.spf4j.concurrent.DefaultExecutor;
58 import org.spf4j.concurrent.DefaultScheduler;
59 import org.spf4j.concurrent.Futures;
60 import org.spf4j.concurrent.LifoThreadPoolExecutorSQP;
61 import org.spf4j.failsafe.AsyncRetryExecutor;
62 import org.spf4j.failsafe.RetryPolicy;
63 import org.spf4j.failsafe.concurrent.FailSafeExecutorImpl;
64 import org.spf4j.log.Level;
65 import org.spf4j.test.log.LogAssert;
66 import org.spf4j.test.log.TestLoggers;
67 import org.spf4j.test.matchers.LogMatchers;
68
69
70
71
72
73 @SuppressFBWarnings({"MDM_THREAD_YIELD", "SIC_INNER_SHOULD_BE_STATIC_ANON"})
74 @NotThreadSafe
75 public final class ObjectPoolBuilderTest {
76
77 private static final Logger LOG = LoggerFactory.getLogger(ObjectPoolBuilderTest.class);
78
79 private volatile boolean isDeadlock = false;
80
81
82
83
84 @Test
85 @SuppressFBWarnings("PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS")
86 public void testBuild() throws ObjectCreationException, InterruptedException,
87 ObjectBorrowException, TimeoutException, ObjectDisposeException {
88 RecyclingSupplier<ExpensiveTestObject> pool
89 = new RecyclingSupplierBuilder(10, new ExpensiveTestObjectFactory()).build();
90 LOG.debug("pool = {}", pool);
91 ExpensiveTestObject object = pool.get();
92 LOG.debug("pool = {}", pool);
93 pool.recycle(object, null);
94 LOG.debug("pool = {}", pool);
95 ExpensiveTestObject object2 = pool.get();
96 Assert.assertSame(object2, object);
97
98 pool.dispose();
99 }
100
101 @Test(expected = IllegalStateException.class)
102 @SuppressFBWarnings("PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS")
103 public void testBuildDisposeTimeout()
104 throws ObjectCreationException, ObjectBorrowException,
105 InterruptedException, TimeoutException, ObjectReturnException, ObjectDisposeException {
106 RecyclingSupplier<ExpensiveTestObject> pool
107 = new RecyclingSupplierBuilder(10, new ExpensiveTestObjectFactory()).build();
108 LOG.debug("pool = {}", pool);
109 pool.get();
110 pool.get();
111 LOG.debug("pool = {}", pool);
112 try (ExecutionContext start = ExecutionContexts.start(1, TimeUnit.SECONDS)) {
113 pool.dispose();
114 pool.get();
115 LOG.debug("pool = {}", pool);
116 }
117 }
118
119 @Test(timeout = 10000, expected = TimeoutException.class)
120 public void testGetTimeout()
121 throws ObjectCreationException, ObjectBorrowException,
122 InterruptedException, TimeoutException, ObjectReturnException, ObjectDisposeException, ExecutionException {
123 RecyclingSupplier<ExpensiveTestObject> pool
124 = new RecyclingSupplierBuilder(1, new ExpensiveTestObjectFactory()).build();
125 LOG.debug("pool = {}", pool);
126 Future<ExpensiveTestObject> fut = DefaultExecutor.instance().submit(() -> pool.get());
127 fut.get();
128 LOG.debug("pool = {}", pool);
129 try (ExecutionContext start = ExecutionContexts.start(1, TimeUnit.SECONDS)) {
130 pool.get();
131 LOG.debug("pool = {}", pool);
132 }
133 }
134
135
136 @Test
137 @SuppressFBWarnings("PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS")
138 public void testBuild2() throws ObjectCreationException, InterruptedException,
139 ObjectBorrowException, ExecutionException, TimeoutException {
140 final RecyclingSupplier<ExpensiveTestObject> pool
141 = new RecyclingSupplierBuilder(10, new ExpensiveTestObjectFactory())
142 .withMaintenance(DefaultScheduler.INSTANCE, 1L, true)
143 .build();
144 LOG.debug("pool = {}", pool);
145 final ExpensiveTestObject object = pool.get();
146 LOG.debug("pool = {}", pool);
147 Future<Void> submit = DefaultExecutor.INSTANCE.submit(() -> {
148 pool.recycle(object, null);
149 return null;
150 });
151 submit.get();
152 Thread.sleep(100);
153 final ExpensiveTestObject object2 = pool.get();
154 LOG.debug("pool = {}", pool);
155 Assert.assertNotNull(object2);
156 }
157
158 @Test
159 @SuppressFBWarnings("PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS")
160 public void testBuild3() throws ObjectCreationException, InterruptedException,
161 ObjectBorrowException, ExecutionException, TimeoutException {
162 final RecyclingSupplier<ExpensiveTestObject> pool
163 = new RecyclingSupplierBuilder(10, new ExpensiveTestObjectFactory())
164 .withMaintenance(DefaultScheduler.INSTANCE, 1L, true)
165 .build();
166 LOG.debug("pool = {}", pool);
167 final ExpensiveTestObject object = pool.get();
168 LOG.debug("pool = {}", pool);
169 Future<Void> submit = DefaultExecutor.INSTANCE.submit(() -> {
170 pool.recycle(object, null);
171 return null;
172 });
173 submit.get();
174 final ExpensiveTestObject object2 = pool.get();
175 LOG.debug("pool = {}", pool);
176 Assert.assertSame(object, object2);
177 }
178
179 @Test(timeout = 20000)
180 public void testPoolUseNoFailures()
181 throws ObjectCreationException, ObjectBorrowException, InterruptedException,
182 TimeoutException, ObjectReturnException, ObjectDisposeException, ExecutionException {
183 RecyclingSupplier<ExpensiveTestObject> pool
184 = new RecyclingSupplierBuilder(10, new ExpensiveTestObjectFactory(1000000, 1000000, 1, 5)).build();
185 runTest(pool, 0, 10000);
186 pool.dispose();
187 }
188
189 @Test(timeout = 16000)
190 public void testPoolUseNoFailuresStarvation()
191 throws ObjectCreationException, ObjectBorrowException, InterruptedException,
192 TimeoutException, ObjectReturnException, ObjectDisposeException, ExecutionException {
193 RecyclingSupplier<ExpensiveTestObject> pool
194 = new RecyclingSupplierBuilder(1, new ExpensiveTestObjectFactory(1000000, 1000000, 1, 5)).build();
195 runTest(pool, 0, 15000);
196 pool.dispose();
197 }
198
199 @Test(timeout = 20000)
200 public void testPoolUse()
201 throws ObjectCreationException, ObjectBorrowException, InterruptedException,
202 TimeoutException, ObjectReturnException, ObjectDisposeException, ExecutionException {
203 final RecyclingSupplier<ExpensiveTestObject> pool
204 = new RecyclingSupplierBuilder(10, new ExpensiveTestObjectFactory()).build();
205
206 runTest(pool, 0, 10000);
207 try {
208 ExpensiveTestObject.setFailAll(true);
209 LogAssert expect = TestLoggers.sys().expect("", Level.WARN, LogMatchers.hasFormat("Cannot dispose object {}"));
210 pool.dispose();
211 expect.assertObservation();
212 } finally {
213 ExpensiveTestObject.setFailAll(false);
214 }
215 }
216
217 @Test(timeout = 20000)
218 public void testPoolUseWithMaintenance()
219 throws ObjectCreationException, ObjectBorrowException, InterruptedException,
220 TimeoutException, ObjectReturnException, ObjectDisposeException, ExecutionException {
221 final RecyclingSupplier<ExpensiveTestObject> pool =
222 new RecyclingSupplierBuilder<>(10, new ExpensiveTestObjectFactory())
223 .withMaintenance(DefaultScheduler.INSTANCE, 10, true).build();
224 runTest(pool, 5, 20000);
225 try {
226 pool.dispose();
227 } catch (ObjectDisposeException ex) {
228 Throwables.writeTo(ex, System.err, Throwables.PackageDetail.SHORT);
229 }
230
231 }
232
233 private Thread startDeadlockMonitor(final RecyclingSupplier<ExpensiveTestObject> pool,
234 final long deadlockTimeout) {
235 isDeadlock = false;
236 Thread monitor = new Thread(() -> {
237 try {
238 Thread.sleep(deadlockTimeout);
239 ThreadMXBean threadMX = ManagementFactory.getThreadMXBean();
240 LOG.debug("Thread Info: {}", threadMX.dumpAllThreads(true, true));
241 LOG.debug("Pool = {}", pool);
242 isDeadlock = true;
243 } catch (InterruptedException ex) {
244
245 return;
246 }
247 });
248 monitor.start();
249 return monitor;
250 }
251
252 @SuppressFBWarnings("MDM_THREAD_YIELD")
253 private void runTest(final RecyclingSupplier<ExpensiveTestObject> pool,
254 final long sleepBetweenSubmit, final long deadlockTimeout) throws InterruptedException {
255 Thread monitor = startDeadlockMonitor(pool, deadlockTimeout);
256 ExecutorService execService = new LifoThreadPoolExecutorSQP("test", 10, 10,
257 5000, 1024, true);
258 FailSafeExecutorImpl exec = new FailSafeExecutorImpl(execService);
259 AsyncRetryExecutor policy = RetryPolicy.newBuilder()
260 .withDefaultThrowableRetryPredicate().buildAsync(exec);
261 int nrTests = 1000;
262 Future<Integer>[] futures = new Future[nrTests];
263 for (int i = 0; i < nrTests; i++) {
264 futures[i] = policy.submit(new TestCallable(pool, i));
265 Thread.sleep(sleepBetweenSubmit);
266 }
267 Pair<Map<Future, Object>, Exception> all = Futures.getAll(10000, futures);
268 Exception ex = all.getSecond();
269 if (ex != null) {
270 throw new RuntimeException(ex);
271 }
272 LOG.debug("Done({})", futures.length);
273 monitor.interrupt();
274 monitor.join();
275 Thread.sleep(100);
276 if (isDeadlock) {
277 Assert.fail("deadlock detected");
278 }
279 exec.close();
280 }
281
282 }