SharingObjectPool.java

/*
 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Additionally licensed with:
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spf4j.recyclable.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Comparator;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.ds.UpdateablePriorityQueue;
import org.spf4j.recyclable.ObjectBorrowException;
import org.spf4j.recyclable.ObjectCreationException;
import org.spf4j.recyclable.ObjectDisposeException;
import org.spf4j.recyclable.RecyclingSupplier;

/**
 * a object sharing pool. this pool allows for non exclusive object sharing.
 * will always return the least shared object.
 * TODO: synchronization is too coarse, can be
 * improved.
 *
 * @author zoly
 */
public final class SharingObjectPool<T> implements RecyclingSupplier<T> {

  private static final Logger LOG = LoggerFactory.getLogger(SharingObjectPool.class);

  private static final Comparator<SharedObject<?>> SH_COMP = new Comparator<SharedObject<?>>() {

    @Override
    public int compare(final SharedObject<?> o1, final SharedObject<?> o2) {
      return o1.nrTimesShared - o2.nrTimesShared;
    }

  };

  private final Factory<T> factory;

  private final UpdateablePriorityQueue<SharedObject<T>> pooledObjects;
  private final Map<T, UpdateablePriorityQueue<SharedObject<T>>.ElementRef> o2QueueRefMap;

  private int nrObjects;
  private final int maxSize;
  private boolean closed;
  private final boolean asyncValidate;

  public static final class SharedObject<T> {

    private int nrTimesShared;

    private final T object;

    public SharedObject(final T object) {
      this(object, 0);
    }

    public SharedObject(final T object, final int nrTimeShared) {
      this.object = object;
      this.nrTimesShared = nrTimeShared;
    }

    public T getObject() {
      return object;
    }

    private void inc() {
      nrTimesShared++;
    }

    private void dec() {
      nrTimesShared--;
    }

    public int getNrTimesShared() {
      return nrTimesShared;
    }

    @Override
    public String toString() {
      return "SharedObject{" + "nrTimesShared=" + nrTimesShared + ", object=" + object + '}';
    }

  }

  public SharingObjectPool(final Factory<T> factory, final int coreSize, final int maxSize)
          throws ObjectCreationException {
    this(factory, coreSize, maxSize, false);
  }

  public SharingObjectPool(final Factory<T> factory, final int coreSize, final int maxSize,
          final boolean asyncValidate)
          throws ObjectCreationException {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("max size must be greater than zero and not " + maxSize);
    }
    if (maxSize < coreSize) {
      throw new IllegalArgumentException("max size must be greater than core size  and not "
              + maxSize + " < " + coreSize);
    }
    this.factory = factory;
    this.pooledObjects = new UpdateablePriorityQueue<>(maxSize, SH_COMP);
    this.nrObjects = 0;
    this.closed = false;
    this.asyncValidate = asyncValidate;
    o2QueueRefMap = new IdentityHashMap<>(maxSize);
    for (int i = 0; i < coreSize; i++) {
      createObject(0);
    }
    this.maxSize = maxSize;
  }

  @Override
  @Nullable
  public synchronized T tryGet(final long deadlineNanos) throws ObjectBorrowException, ObjectCreationException {
    if (closed) {
      throw new ObjectBorrowException("Reclycler is closed " + this);
    }
    UpdateablePriorityQueue<SharedObject<T>>.ElementRef peekEntry = pooledObjects.peekEntry();
    if (peekEntry != null) {
      final SharedObject<T> elem = peekEntry.getElem();
      if (elem.getNrTimesShared() == 0) {
        elem.inc();
        peekEntry.elementMutated();
        return elem.getObject();
      } else if (nrObjects < maxSize) {
        return createObject(1);
      } else {
        return elem.getObject();
      }
    } else {
      return createObject(1);
    }
  }

  private synchronized T createObject(final int nrTimesShared) throws ObjectCreationException {
    T obj = factory.create();
    o2QueueRefMap.put(obj, pooledObjects.add(new SharedObject<>(obj, nrTimesShared)));
    nrObjects++;
    return obj;
  }

  @Override
  public void recycle(final T object, final Exception e) {
    if (e != null) {
      if (asyncValidate) {
        DefaultExecutor.INSTANCE.execute(new AbstractRunnable(true) {

          @Override
          public void doRun() {
            validate(object, e);
          }
        });
      } else {
        validate(object, e);
      }
    } else {
      returnToQueue(object);
    }
  }

  @SuppressFBWarnings("REC_CATCH_EXCEPTION")
  private synchronized void validate(final T object, final Exception e) {
    if (o2QueueRefMap.containsKey(object)) {
      // element still in queue
      boolean isValid;
      try {
        isValid = factory.validate(object, e); // validate
      } catch (Exception ex) {
        ex.addSuppressed(e);
        LOG.warn("Validation failed for {}", object, ex);
        isValid = false;
      }
      if (!isValid) { // remove from pool
        UpdateablePriorityQueue.ElementRef qref = o2QueueRefMap.remove(object);
        nrObjects--;
        qref.remove();
      } else {
        returnToQueue(object);
      }
    }
  }

  private synchronized void returnToQueue(final T object) {
    // object is valid
    UpdateablePriorityQueue<SharedObject<T>>.ElementRef ref = o2QueueRefMap.get(object);
    if (ref != null) { // elem can already be removed if it fails validation.
      final SharedObject<T> elem = ref.getElem();
      elem.dec();
      this.notifyAll();
      ref.elementMutated();
    }
  }

  @Override
  public void recycle(final T object) {
    recycle(object, null);
  }

  @Override
  public synchronized boolean tryDispose(final long timeoutMillis)
          throws ObjectDisposeException, InterruptedException {
    if (!closed) {
      long deadline = System.currentTimeMillis() + timeoutMillis;
      closed = true;
      ObjectDisposeException exres = null;
      Iterator<SharedObject<T>> iterator = pooledObjects.iterator();
      while (iterator.hasNext()) {
        SharedObject<T> so = iterator.next();
        try {
          while (so.getNrTimesShared() > 0) {
            long waitFor = deadline - System.currentTimeMillis();
            if (waitFor > 0) {
              this.wait(waitFor);
            } else {
              return false;
            }
          }
          T o = so.getObject();
          o2QueueRefMap.remove(o);
          iterator.remove();
          nrObjects--;
          factory.dispose(o);
        } catch (ObjectDisposeException ex) {
          if (exres == null) {
            exres = ex;
          } else {
            ex.addSuppressed(exres);
            exres = ex;
          }
        }
      }
      if (exres != null) {
        throw exres;
      }
    }
    return true;
  }

  @Override
  public synchronized String toString() {
    return "SharingObjectPool{" + "factory=" + factory + ", pooledObjects=" + pooledObjects + '}';
  }

}