Callables.java

/*
 * Copyright (c) 2001, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 */

package org.spf4j.base;

import com.google.common.base.Predicate;
import java.util.concurrent.Callable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility class for executing stuff with retry logic.
 * @author zoly
 */
@ParametersAreNonnullByDefault
public final class Callables {

    private Callables() { }

    private static final Logger LOG = LoggerFactory.getLogger(Callables.class);

    public static final Predicate<Object> NORETRY_FOR_RESULT = new Predicate<Object>() {
        @Override
        public boolean apply(final Object input) {
            return false;
        }
    };

    public static final Predicate<?> RETRY_FOR_NULL_RESULT = new Predicate<Object>() {
        @Override
        public boolean apply(final Object input) {
            return (input != null);
        }
    };

   public static final Predicate<Exception> RETRY_FOR_ANY_EXCEPTION = new Predicate<Exception>() {
        @Override
        public boolean apply(final Exception input) {
            LOG.debug("Exception encountered, retrying...", input);
            return true;
        }
    };

    public static final class RetryPause implements Callable<Boolean> {

        private final int nrImmediateRetries;
        private final int nrTotalRetries;
        private final int waitMillis;
        private int count;

        public RetryPause(final int nrImmediateRetries, final int nrTotalRetries, final int retryWaitMillis)
        {
            assert (nrImmediateRetries < nrTotalRetries);
            this.nrImmediateRetries = nrImmediateRetries;
            this.nrTotalRetries = nrTotalRetries;
            this.waitMillis = retryWaitMillis;
        }
           
        @Override
        public Boolean call() throws Exception {
           if (count > nrTotalRetries) {
               return false;
           }
           if (count > nrImmediateRetries) {
               Thread.sleep(waitMillis);
           }
           count++;
           return true;
        }
        
    }
    
    /**
     * Probably the more used retry use case.
     * specifying a number of immediate retries
     * and delayed retries.
     * 
     * @param <T>
     * @param what
     * @param nrImmediateRetries
     * @param nrTotalRetries
     * @param retryWaitMillis
     * @return
     * @throws InterruptedException
     */
    
    public static <T> T executeWithRetry(final Callable<T> what, final int nrImmediateRetries,
            final int nrTotalRetries, final int retryWaitMillis)
            throws InterruptedException {
       return executeWithRetry(what, new RetryPause(nrImmediateRetries, nrTotalRetries, retryWaitMillis),
               NORETRY_FOR_RESULT, RETRY_FOR_ANY_EXCEPTION);
    }
    
    public static <T> T executeWithRetry(final Callable<T> what, final int nrImmediateRetries,
            final int nrTotalRetries, final int retryWaitMillis , final Predicate<Exception> retryOnException)
            throws InterruptedException {
       return executeWithRetry(what, new RetryPause(nrImmediateRetries, nrTotalRetries, retryWaitMillis),
               NORETRY_FOR_RESULT, retryOnException);
    }
     
    /**
     * Naive implementation of execution with retry logic.
     * a callable will be executed and retry attempted in current thread if the result and exception predicates.
     * before retry, a callable can be executed that can abort the retry
     * and finish the function with the previous result.
     * 
     * @param what
     * @param doBeforeRetry
     * @param retryOnReturnVal
     * @param retryOnException
     * @return
     * @throws InterruptedException
     */
    public static <T> T executeWithRetry(final Callable<T> what, final Callable<Boolean> doBeforeRetry,
            final Predicate<? super T> retryOnReturnVal, final Predicate<Exception> retryOnException)
            throws InterruptedException {
        T result = null;
        Exception ex = null;
        try {
            result = what.call();
        } catch (InterruptedException ex1) {
               throw ex1;
        } catch (Exception e) {
            ex = e;
        }
        Exception prevEx = ex;
        while ((ex != null && retryOnException.apply(ex)) || retryOnReturnVal.apply(result)) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            try {
                boolean retry = doBeforeRetry.call();
                if (!retry) {
                    break;
                }
            } catch (InterruptedException ex1) {
               throw ex1;
            }  catch (Exception ex1) {
                throw new RuntimeException(ex1);
            }
            ex = null;
            result = null;
            try {
                result = what.call();
            } catch (InterruptedException ex1) {
               throw ex1;
            } catch (Exception e) {
                if (prevEx != null) {
                    e = Throwables.suppress(e, prevEx);
                    prevEx = e;
                }
                ex = e;
            }
        }
        if (ex != null) {
            throw new RuntimeException(ex);
        }
        return result;
    }
    
}