SimpleSmartObjectPool.java
/*
* Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Additionally licensed with:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.spf4j.recyclable.impl;
import com.google.common.collect.LinkedHashMultimap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import org.spf4j.base.Either;
import org.spf4j.base.Pair;
import org.spf4j.base.Throwables;
import org.spf4j.base.TimeSource;
import org.spf4j.recyclable.ObjectBorower;
import org.spf4j.recyclable.ObjectBorower.Action;
import org.spf4j.recyclable.ObjectCreationException;
import org.spf4j.recyclable.ObjectDisposeException;
import org.spf4j.recyclable.RecyclingSupplier;
import org.spf4j.recyclable.SmartRecyclingSupplier;
/**
* @author zoly
*/
final class SimpleSmartObjectPool<T> implements SmartRecyclingSupplier<T> {
private int maxSize;
private int waitingForReturn;
private final LinkedHashMultimap<ObjectBorower<T>, T> borrowedObjects;
private final List<T> availableObjects;
private final ReentrantLock lock;
private final Condition available;
private final RecyclingSupplier.Factory<T> factory;
private T sample;
SimpleSmartObjectPool(final int initialSize, final int maxSize,
final RecyclingSupplier.Factory<T> factory, final boolean fair)
throws ObjectCreationException {
if (maxSize < 1) {
throw new IllegalArgumentException("Invalid pool size: " + maxSize);
}
this.maxSize = maxSize;
this.factory = factory;
this.lock = new ReentrantLock(fair);
this.available = this.lock.newCondition();
this.borrowedObjects = LinkedHashMultimap.create();
this.availableObjects = new ArrayList<>(initialSize);
for (int i = 0; i < initialSize; i++) {
availableObjects.add(factory.create());
}
waitingForReturn = 0;
this.sample = factory.create();
}
@Override
@SuppressFBWarnings({"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE", "PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS"})
@Nullable
public T tryGet(final ObjectBorower borower, final long deadlineNanos) throws InterruptedException,
ObjectCreationException {
lock.lock();
try {
if (maxSize <= 0) {
throw new IllegalStateException("Pool closed, noting available for " + borower);
}
int nrAvailable = availableObjects.size();
// trying to be fair here, if others are already waiting, we will not get one.
if (nrAvailable - waitingForReturn > 0) {
Iterator<T> it = availableObjects.iterator();
T object = it.next();
it.remove();
if (!borrowedObjects.put(borower, object)) {
throw new IllegalStateException("Cannot borrow " + object + ", " + borrowedObjects);
}
return object;
} else if (borrowedObjects.size() + nrAvailable < maxSize) {
T object = factory.create();
if (!borrowedObjects.put(borower, object)) {
throw new IllegalStateException("Cannot borrow " + object + ", " + borrowedObjects);
}
return object;
} else {
// try to reclaim object from borrowers
for (ObjectBorower<T> b : borrowedObjects.keySet()) {
if (borower != b) {
T object = b.tryReturnObjectIfNotInUse();
if (object != null) {
if (!borrowedObjects.remove(b, object)) {
throw new IllegalStateException("Returned Object hasn't been borrowed " + object);
}
if (!borrowedObjects.put(borower, object)) {
throw new IllegalStateException("Cannot borrow " + object + ", " + borrowedObjects);
}
return object;
}
}
}
// evrything in use, try to place a return request
boolean requestMade = false;
while (!requestMade) {
boolean hasValidBorowers = false;
for (ObjectBorower<T> b : borrowedObjects.keySet()) {
if (borower != b) {
hasValidBorowers = true;
Either<ObjectBorower.Action, T> objOrPromise = b.tryRequestReturnObject();
if (objOrPromise.isRight()) {
T obj = objOrPromise.getRight();
if (!borrowedObjects.remove(b, obj)) {
throw new IllegalStateException("Returned Object " + obj
+ " hasn't been borrowed: " + borrowedObjects);
}
if (!borrowedObjects.put(borower, obj)) {
throw new IllegalStateException("Cannot boroow " + obj + ", " + borrowedObjects);
}
return obj;
} else {
requestMade = objOrPromise.getLeft() == Action.REQUEST_MADE;
if (requestMade) {
break;
}
}
}
}
if (!hasValidBorowers) {
throw new IllegalStateException("Borrower asks for more than possible " + borower);
}
if (!requestMade) {
// probably was unable to acquire the locks
do {
available.await(1, TimeUnit.MILLISECONDS);
long nanosToDeadline = deadlineNanos - TimeSource.nanoTime();
if (nanosToDeadline < 0) {
return null;
}
} while (borrowedObjects.isEmpty());
}
}
waitingForReturn++;
while (availableObjects.isEmpty()) {
long waitTime = deadlineNanos - TimeSource.nanoTime();
if (waitTime <= 0) {
return null;
}
if (!available.await(waitTime, TimeUnit.NANOSECONDS)) {
return null;
}
}
waitingForReturn--;
Iterator<T> it = availableObjects.iterator();
T objectT = it.next();
it.remove();
if (!borrowedObjects.put(borower, objectT)) {
throw new IllegalStateException("Cannot borrow " + objectT + ", " + borrowedObjects);
}
return objectT;
}
} finally {
lock.unlock();
}
}
@Override
public void recycle(final T object, final ObjectBorower borower) {
lock.lock();
try {
if (!borrowedObjects.remove(borower, object)) {
// returned somebody else's objOrPromise.
Entry<ObjectBorower<T>, T> foundEntry = null;
for (Entry<ObjectBorower<T>, T> entry : borrowedObjects.entries()) {
final ObjectBorower<T> lb = entry.getKey();
if (lb == borower) {
continue;
}
if (lb.nevermind(entry.getValue())) {
foundEntry = entry;
break;
}
}
if (foundEntry == null) {
throw new IllegalStateException("Object " + object + " has not been borrowed from this pool");
} else {
if (!borrowedObjects.remove(foundEntry.getKey(), foundEntry.getValue())) {
throw new IllegalStateException("Should have removed " + foundEntry);
}
}
}
availableObjects.add(object);
available.signalAll();
} finally {
lock.unlock();
}
}
@Override
public boolean tryDispose(final long timeoutMillis) throws ObjectDisposeException, InterruptedException {
factory.dispose(sample);
long deadlineNanos = TimeSource.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
lock.lock();
try {
maxSize = 0;
List<Pair<ObjectBorower<T>, T>> returnedObjects = new ArrayList<>();
for (Entry<ObjectBorower<T>, Collection<T>> b : borrowedObjects.asMap().entrySet()) {
ObjectBorower<T> borrower = b.getKey();
final int nrObjects = b.getValue().size();
for (int i = 0; i < nrObjects; i++) {
Either<Action, T> object = borrower.tryRequestReturnObject();
if (object.isRight()) {
returnedObjects.add(Pair.of(borrower, object.getRight()));
}
}
}
for (Pair<ObjectBorower<T>, T> objectAndBorrower : returnedObjects) {
T object = objectAndBorrower.getSecond();
if (!borrowedObjects.remove(objectAndBorrower.getFirst(), object)) {
throw new IllegalStateException("Returned Object hasn't been borrowed " + object);
}
availableObjects.add(object);
}
ObjectDisposeException exception = disposeReturnedObjects(null);
while (!borrowedObjects.isEmpty()) {
long waitTimeNanos = deadlineNanos - TimeSource.nanoTime();
if (waitTimeNanos <= 0) {
return false;
}
if (!available.await(waitTimeNanos, TimeUnit.NANOSECONDS)) {
return false;
}
exception = disposeReturnedObjects(exception);
}
if (exception != null) {
throw exception;
}
return true;
} catch (InterruptedException | RuntimeException e) {
throw e;
} finally {
lock.unlock();
}
}
@CheckReturnValue
private ObjectDisposeException disposeReturnedObjects(final ObjectDisposeException exception) {
ObjectDisposeException result = exception;
for (T obj : availableObjects) {
try {
factory.dispose(obj);
} catch (ObjectDisposeException ex) {
if (result != null) {
Throwables.suppressLimited(ex, result);
}
result = ex;
} catch (Exception ex) {
if (result == null) {
result = new ObjectDisposeException(ex);
} else {
ObjectDisposeException nex = new ObjectDisposeException(ex);
Throwables.suppressLimited(nex, result);
result = nex;
}
}
}
availableObjects.clear();
return result;
}
@SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_HAS_CHECKED")
@Override
public boolean scan(final ScanHandler<T> handler) throws Exception {
lock.lock();
Exception resEx = null;
try {
for (ObjectBorower<T> objectBorower : borrowedObjects.keySet()) {
try {
if (!objectBorower.scan(handler)) {
return false;
}
} catch (Exception e) {
if (resEx != null) {
Throwables.suppressLimited(e, resEx);
}
resEx = e;
}
Collection<T> returned = objectBorower.tryReturnObjectsIfNotNeededAnymore();
for (T ro : returned) {
if (!borrowedObjects.remove(objectBorower, ro)) {
throw new IllegalStateException("Object returned hasn't been borrowed" + ro);
}
availableObjects.add(ro);
}
}
for (T object : availableObjects) {
try {
if (!handler.handle(object)) {
return false;
}
} catch (Exception e) {
if (resEx != null) {
Throwables.suppressLimited(e, resEx);
}
resEx = e;
}
}
if (resEx != null) {
throw resEx;
}
return true;
} finally {
lock.unlock();
}
}
@SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
public void requestReturnFromBorrowersIfNotInUse() throws InterruptedException {
lock.lock();
try {
List<Pair<ObjectBorower<T>, T>> returnedObjects = new ArrayList<>();
for (ObjectBorower<T> b : borrowedObjects.keySet()) {
Collection<T> objects = b.tryReturnObjectsIfNotInUse();
for (T object : objects) {
returnedObjects.add(Pair.of(b, object));
}
}
for (Pair<ObjectBorower<T>, T> ro : returnedObjects) {
T object = ro.getSecond();
borrowedObjects.remove(ro.getFirst(), object);
availableObjects.add(object);
}
} finally {
lock.unlock();
}
}
@Override
public String toString() {
lock.lock();
try {
return "SimpleSmartObjectPool{" + "maxSize=" + maxSize + ", borrowedObjects="
+ borrowedObjects + ", availableObjects=" + availableObjects
+ ", factory=" + factory + '}';
} finally {
lock.unlock();
}
}
@Override
public T getSample() {
return sample;
}
}