JdbcHeartBeat.java

/*
 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Additionally licensed with:
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spf4j.concurrent.jdbc;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.base.Iterables;
import org.spf4j.base.TimeSource;
import org.spf4j.base.Timing;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.concurrent.DefaultScheduler;
import org.spf4j.jdbc.JdbcTemplate;
import org.spf4j.jmx.JmxExport;
import org.spf4j.jmx.Registry;

/**
 * A class that does "heartbeats" (at an arbitrary interval) to a database table.
 * This is to detect the death of a process.
 * The process is considered dead when: currentTime - lastheartbeat > beatInterval * 2
 * When this class's mechanism detects that it cannot perform the heartbeats, it throws an Error.
 * The sensible thing for the process to do is to go down (and restart if it is a daemon).
 * This is typically done by registering a default uncaught exception handler with:
 * Thread.setDefaultUncaughtExceptionHandler
 *
 *
 * @author zoly
 */
@SuppressFBWarnings(value = {"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING",
  "PMB_POSSIBLE_MEMORY_BLOAT", "SQL_INJECTION_JDBC"}, justification = "The db object names are configurable,"
        + "we for know allow heartbeats to multiple data sources, should be one mostly")
@ParametersAreNonnullByDefault
@ThreadSafe
public final class JdbcHeartBeat implements AutoCloseable {

  /** Class logger. */
  private static final Logger LOG = LoggerFactory.getLogger(JdbcHeartBeat.class);

  /** Registry of heartbeat instances keyed by DataSource identity; accessed while synchronized on this map. */
  private static final Map<DataSource, JdbcHeartBeat> HEARTBEATS = new IdentityHashMap<>();

  /**
   * Default heartbeat interval in milliseconds, read from the system property
   * {@code spf4j.jdbc.heartBeats.defaultIntervalMillis} (10000 when the property is not set).
   */
  private static final int HEARTBEAT_INTERVAL_MILLIS =
          Integer.getInteger("spf4j.jdbc.heartBeats.defaultIntervalMillis", 10000);

  /** Set to true by the shutdown hook; once set, getHeartBeatAndSubscribe rejects further requests. */
  @GuardedBy("HEARTBEATS")
  private static boolean isShuttingdown = false;

  /** Hooks notified on heartbeat errors and on close; a CopyOnWriteArrayList assigned in the constructor. */
  private final List<LifecycleHook> lifecycleHooks;

  /** JDBC access helper; this object is also the monitor used to guard isClosed and the scheduling state. */
  private final JdbcTemplate jdbc;

  /** SQL inserting this process's heartbeat row (owner, interval, current DB timestamp). */
  private final String insertHeartbeatSql;

  /** SQL refreshing this process's heartbeat timestamp, only while the row has not yet expired. */
  private final String updateHeartbeatSql;

  /** SQL selecting the last heartbeat value stored for an owner. */
  private final String selectLastRunSql;

  /** Timeout, in seconds, applied to the JDBC operations issued by this instance. */
  private final int jdbcTimeoutSeconds;

  /** Heartbeat interval in nanoseconds (derived from intervalMillis). */
  private final long intervalNanos;

  /** Heartbeat interval in milliseconds; also stored in the heartbeat row. */
  private final long intervalMillis;

  /** Description (table name, column names, db type) of the heartbeat table. */
  private final HeartBeatTableDesc hbTableDesc;

  /** SQL deleting all expired (dead) heartbeat rows. */
  private final String deleteSql;

  /** SQL deleting the heartbeat row of a given owner. */
  private final String deleteHeartBeatSql;

  /** TimeSource nano time recorded after the most recent heartbeat. */
  private volatile long lastRunNanos;

  /** True once close() was invoked; read and written while synchronized on jdbc. */
  private boolean isClosed;

  /** Future of the currently scheduled heartbeat run, or null when none is scheduled. */
  private ListenableScheduledFuture<?> scheduledHearbeat;

  /** Measured duration of the initial heartbeat row insert, never less than 10 ms (in nanos). */
  private final long beatDurationNanos;

  /** Lazily created runnable that performs the scheduled heartbeat. */
  private ScheduledHeartBeat heartbeatRunnable;

  /** Fraction of the interval by which a heartbeat may be late before it is treated as missed. */
  private final double missedHBRatio;

  /** Maximum nanos allowed between heartbeats: intervalNanos * (1 + missedHBRatio). */
  private final long maxMissedNanos;

  /** Minimum nanos since the last heartbeat before a new beat is attempted: intervalNanos / 2. */
  private final long tryBeatThresholdNanos;

  /**
   * Closes this heartbeat. Only the first invocation has any effect; it unregisters the JMX export,
   * cancels the scheduled heartbeat (if any), deletes this process's heartbeat row and finally
   * notifies every lifecycle hook via {@link LifecycleHook#onClose()}.
   *
   * All hooks are notified even when the row removal or an earlier hook fails; the first failure is
   * rethrown once all hooks ran, with any later failures attached as suppressed exceptions.
   *
   * @throws SQLException when removing the heartbeat row or a hook fails with a SQLException.
   */
  @Override
  @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
  public void close() throws SQLException {
    final ScheduledFuture<?> running;
    synchronized (jdbc) {
      if (isClosed) {
        return;
      }
      isClosed = true;
      // snapshot under the lock: the reschedule callback nulls this field while holding the same lock.
      running = scheduledHearbeat;
    }
    unregisterJmx();
    if (running != null) {
      running.cancel(true);
    }
    Exception failure = null;
    try {
      removeHeartBeatRow(jdbcTimeoutSeconds);
    } catch (SQLException | RuntimeException ex) {
      failure = ex;
    }
    // hooks must run even if the row could not be removed, otherwise listeners never learn about the close.
    for (LifecycleHook hook : lifecycleHooks) {
      try {
        hook.onClose();
      } catch (SQLException | RuntimeException ex) {
        if (failure == null) {
          failure = ex;
        } else {
          failure.addSuppressed(ex);
        }
      }
    }
    if (failure instanceof SQLException) {
      throw (SQLException) failure;
    }
    if (failure != null) {
      throw (RuntimeException) failure;
    }
  }

  /**
   * Callback interface for parties interested in the lifecycle of a heartbeat.
   */
  public interface LifecycleHook {

    /**
     * Invoked when the heartbeat fails.
     * @param error the error describing the failure.
     */
    void onError(Error error);

    /**
     * Invoked when the heartbeat is closed, and also after onError when a heartbeat failure is handled.
     * @throws SQLException if the hook's cleanup fails.
     */
    void onClose() throws SQLException;

  }

  /**
   * Builds the heartbeat SQL statements from the table description and inserts the initial
   * heartbeat row for this process.
   *
   * @param dataSource the data source the heartbeat table lives in.
   * @param hbTableDesc the heartbeat table description (table/column names, db type).
   * @param intervalMillis the heartbeat interval, must be at least 1000 ms.
   * @param jdbcTimeoutSeconds the timeout applied to the jdbc operations.
   * @param missedHBRatio the fraction of the interval a beat may be late before it counts as missed.
   * @throws InterruptedException if interrupted while inserting the heartbeat row.
   * @throws SQLException if inserting the heartbeat row fails.
   */
  private JdbcHeartBeat(final DataSource dataSource, final HeartBeatTableDesc hbTableDesc, final long intervalMillis,
          final int jdbcTimeoutSeconds, final double missedHBRatio) throws InterruptedException, SQLException {
    if (intervalMillis < 1000) {
      throw new IllegalArgumentException("The heartbeat interval should be at least 1s and not "
              + intervalMillis + " ms");
    }
    // plain configuration state.
    this.isClosed = false;
    this.hbTableDesc = hbTableDesc;
    this.jdbcTimeoutSeconds = jdbcTimeoutSeconds;
    this.missedHBRatio = missedHBRatio;
    this.lifecycleHooks = new CopyOnWriteArrayList<>();
    // timing thresholds, all derived from the interval.
    this.intervalMillis = intervalMillis;
    this.intervalNanos = TimeUnit.MILLISECONDS.toNanos(intervalMillis);
    this.tryBeatThresholdNanos = this.intervalNanos / 2;
    this.maxMissedNanos = (long) ((double) this.intervalNanos * (1 + missedHBRatio));
    this.jdbc = new JdbcTemplate(dataSource);
    // sql statements.
    final String table = hbTableDesc.getTableName();
    final String lastBeatCol = hbTableDesc.getLastHeartbeatColumn();
    final String dbNow = hbTableDesc.getDbType().getCurrTSSqlFn();
    final String intervalCol = hbTableDesc.getIntervalColumn();
    final String ownerCol = hbTableDesc.getOwnerColumn();
    // a row expires two intervals after its last heartbeat.
    final String expiryExpr = lastBeatCol + " + " + intervalCol + " * 2";
    final String ownerMatch = ownerCol + " = ?";
    this.insertHeartbeatSql = "insert into " + table + " (" + ownerCol + "," + intervalCol + ","
            + lastBeatCol + ") VALUES (?, ?, " + dbNow + ")";
    this.updateHeartbeatSql = "UPDATE " + table + " SET " + lastBeatCol + " = " + dbNow
            + " WHERE " + ownerMatch + " AND " + expiryExpr + " > " + dbNow;
    this.deleteHeartBeatSql = "DELETE FROM " + table + " WHERE " + ownerMatch;
    this.deleteSql = "DELETE FROM " + table + " WHERE " + expiryExpr + " < " + dbNow;
    this.selectLastRunSql = "select " + lastBeatCol + " FROM " + table + " where " + ownerMatch;
    // first beat: insert the row and measure how long it took.
    final long insertStartNanos = TimeSource.nanoTime();
    createHeartbeatRow();
    final long insertEndNanos = TimeSource.nanoTime();
    this.lastRunNanos = insertEndNanos;
    this.beatDurationNanos = Math.max(insertEndNanos - insertStartNanos, TimeUnit.MILLISECONDS.toNanos(10));
  }

  /**
   * @return the duration in nanoseconds measured for the initial heartbeat row insert (at least 10 ms).
   */
  public long getBeatDurationNanos() {
    return beatDurationNanos;
  }

  /**
   * Exports this heartbeat instance through the spf4j JMX registry.
   */
  public void registerJmx() {
    Registry.export(this);
  }

  /**
   * Removes this heartbeat instance from the spf4j JMX registry.
   */
  public void unregisterJmx() {
    Registry.unregister(this);
  }

  /**
   * Registers a hook that will be notified about heartbeat errors and close.
   * @param hook the hook to add.
   */
  public void addLyfecycleHook(final LifecycleHook hook) {
    lifecycleHooks.add(hook);
  }

  /**
   * Unregisters a previously added lifecycle hook.
   * @param hook the hook to remove.
   */
  public void removeLifecycleHook(final LifecycleHook hook) {
    lifecycleHooks.remove(hook);
  }


  /**
   * Inserts the heartbeat row of this process (owner = process id, interval = intervalMillis).
   *
   * @throws InterruptedException if interrupted while executing the insert.
   * @throws SQLException if the insert fails.
   */
  private void createHeartbeatRow() throws InterruptedException, SQLException {
    jdbc.transactOnConnection((final Connection connection, final long deadlineNanos) -> {
      try (PreparedStatement stmt = connection.prepareStatement(insertHeartbeatSql)) {
        // 1 = owner, 2 = heartbeat interval; the timestamp is computed by the database.
        stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
        stmt.setLong(2, this.intervalMillis);
        stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
        stmt.executeUpdate();
      }
      return null;
    }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
    LOG.debug("Start Heart Beat for {}", org.spf4j.base.Runtime.PROCESS_ID);
  }

  /**
   * Deletes all expired heartbeat rows within a transaction of its own.
   *
   * @param timeoutSeconds the time allowed for the operation, in seconds.
   * @return the number of rows deleted.
   * @throws SQLException if the delete fails.
   * @throws InterruptedException if interrupted while executing.
   */
  @JmxExport(description = "Remove all dead hearbeat rows")
  public int removeDeadHeartBeatRows(@JmxExport("timeoutSeconds") final long timeoutSeconds)
          throws SQLException, InterruptedException {
    final int removed = jdbc.transactOnConnection((final Connection connection, final long deadlineNanos) -> {
      return removeDeadHeartBeatRows(connection, deadlineNanos);
    }, timeoutSeconds, TimeUnit.SECONDS);
    return removed;
  }

  /**
   * Deletes all expired heartbeat rows using the supplied connection.
   *
   * @param conn the connection to execute the delete on.
   * @param deadlineNanos the deadline used to derive the statement's query timeout.
   * @return the number of rows deleted.
   * @throws SQLException if the delete fails.
   */
  @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE")
  int removeDeadHeartBeatRows(final Connection conn, final long deadlineNanos) throws SQLException {
    try (PreparedStatement delete = conn.prepareStatement(deleteSql)) {
      final int queryTimeoutSeconds = JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos);
      delete.setQueryTimeout(queryTimeoutSeconds);
      final int deletedRows = delete.executeUpdate();
      return deletedRows;
    }
  }

  /**
   * Deletes the heartbeat row owned by this process.
   *
   * @param timeoutSeconds the time allowed for the operation, in seconds.
   * @throws SQLException if the delete fails.
   * @throws IllegalStateException if the delete did not remove exactly one row.
   */
  private void removeHeartBeatRow(final int timeoutSeconds)
          throws SQLException {
    jdbc.transactOnConnectionNonInterrupt((final Connection connection, final long deadlineNanos) -> {
      try (PreparedStatement delete = connection.prepareStatement(deleteHeartBeatSql)) {
        delete.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
        delete.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
        final int deletedRows = delete.executeUpdate();
        // exactly one row per process is expected.
        if (deletedRows != 1) {
          throw new IllegalStateException("Heartbeat rows deleted: " + deletedRows
                  + " for " + org.spf4j.base.Runtime.PROCESS_ID);
        }
        return null;
      }
    }, timeoutSeconds, TimeUnit.SECONDS);
  }

  /**
   * Submits the removal of all expired heartbeat rows to the default executor, without exposing a result.
   * Exported to JMX under the name "removeDeadHeartBeatRowsAsync".
   *
   * @param timeoutSeconds the time allowed for the removal, in seconds.
   */
  @JmxExport(value = "removeDeadHeartBeatRowsAsync", description = "Remove all dead hearbeat rows async")
  public void removeDeadHeartBeatRowsAsyncNoReturn(@JmxExport("timeoutSeconds") final long timeoutSeconds) {
    // NOTE(review): the meaning of the 'true' constructor argument is defined by AbstractRunnable - confirm there.
    DefaultExecutor.INSTANCE.execute(new AbstractRunnable(true) {
      @Override
      public void doRun() throws SQLException, InterruptedException {
        removeDeadHeartBeatRows(timeoutSeconds);
      }
    });
  }

  /**
   * Submits the removal of all expired heartbeat rows to the default executor.
   *
   * @param timeoutSeconds the time allowed for the removal, in seconds.
   * @return a future yielding the number of rows deleted.
   */
  public Future<Integer> removeDeadHeartBeatRowsAsync(final long timeoutSeconds) {
    return DefaultExecutor.INSTANCE.submit(() -> removeDeadHeartBeatRows(timeoutSeconds));
  }

  /**
   * Returns the runnable performing the scheduled heartbeat, creating it on first use.
   *
   * @return the (cached) heartbeat runnable.
   */
  private ScheduledHeartBeat getHeartBeatRunnable() {
    ScheduledHeartBeat runnable = heartbeatRunnable;
    if (runnable == null) {
      runnable = new ScheduledHeartBeat();
      heartbeatRunnable = runnable;
    }
    return runnable;
  }

  /**
   * Schedules the next heartbeat on the given scheduler, unless one is already scheduled.
   * The beat is delayed so that it runs one interval after the last recorded heartbeat
   * (immediately, if that moment has already passed). After each successful run the next
   * beat is rescheduled from a completion callback executed on the default executor.
   *
   * A callback failure that is an Error is rethrown as is; any other failure, except a
   * cancellation, is rethrown wrapped in a HeartBeatError.
   *
   * @param scheduler the scheduler to run the heartbeats with.
   * @throws IllegalStateException if this heartbeat is closed.
   * @throws HeartBeatError if the last heartbeat is older than the tolerated lateness.
   */
  public void scheduleHeartbeat(final ListeningScheduledExecutorService scheduler) {
    synchronized (jdbc) {
      if (isClosed) {
        throw new IllegalStateException("Heartbeater is closed " + this);
      }
      if (scheduledHearbeat == null) {
        long lrn = lastRunNanos;
        long nanosSincelLastHB = TimeSource.nanoTime() - lrn;
        long delayNanos = intervalNanos - nanosSincelLastHB;
        // a negative delay means we are late; more than missedHBRatio of an interval late is a missed beat.
        if (delayNanos < (-intervalNanos) * missedHBRatio) {
          throw new HeartBeatError("Missed heartbeat, last one was " + nanosSincelLastHB + " ns ago");
        }
        if (delayNanos < 0) {
          delayNanos = 0;
        }
        ListenableScheduledFuture<?> scheduleFut = scheduler.schedule(
                getHeartBeatRunnable(), delayNanos, TimeUnit.NANOSECONDS);
        scheduledHearbeat = scheduleFut;
        Futures.addCallback(scheduleFut, new FutureCallback<Object>() {
          @Override
          public void onSuccess(final Object result) {
            synchronized (jdbc) {
              if (!isClosed) {
                // clear the completed future so the nested call schedules the next beat.
                scheduledHearbeat = null;
                scheduleHeartbeat(scheduler);
              }
            }
          }

          @Override
          @SuppressFBWarnings("ITC_INHERITANCE_TYPE_CHECKING")
          public void onFailure(final Throwable t) {
            if (t instanceof Error) {
              throw (Error) t;
            } else if (!(t instanceof CancellationException)) {
              throw new HeartBeatError(t);
            }
          }
        }, DefaultExecutor.INSTANCE);
      }
    }
  }

  /**
   * Performs a heartbeat in a transaction of its own and records the time of the beat.
   *
   * @throws SQLException if the transaction fails.
   * @throws InterruptedException if interrupted while executing.
   */
  @JmxExport
  public void beat() throws SQLException, InterruptedException {
    jdbc.transactOnConnection((final Connection connection, final long deadlineNanos) -> {
      beat(connection, deadlineNanos);
      return null;
    }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
    // only reached when the transaction completed without throwing.
    updateLastRunNanos(TimeSource.nanoTime());
  }

  /**
   * Refreshes the heartbeat timestamp of this process using the supplied connection.
   * Does not update the locally recorded last run time.
   *
   * @param conn the connection to execute the update on.
   * @param deadlineNanos the deadline used to derive the statement's query timeout.
   * @throws HeartBeatError if this heartbeat is closed, or the update fails with a SQLException.
   * @throws IllegalStateException if the update did not affect exactly one row.
   */
  void beat(final Connection conn, final long deadlineNanos) {
    synchronized (jdbc) {
      if (isClosed) {
        throw new HeartBeatError("Heartbeater is closed " + this);
      }
    }
    try (PreparedStatement stmt = conn.prepareStatement(updateHeartbeatSql)) {
      stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
      stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
      int rowsUpdated = stmt.executeUpdate();
      // the update only matches a row of this owner that has not expired yet.
      if (rowsUpdated != 1) {
        throw new IllegalStateException("Broken Heartbeat for "
                + org.spf4j.base.Runtime.PROCESS_ID + " sql : " + updateHeartbeatSql + " rows : " + rowsUpdated);
      }
      LOG.debug("Heart Beat for {}", org.spf4j.base.Runtime.PROCESS_ID);
    } catch (SQLException ex) {
      throw new HeartBeatError(ex);
    }
  }


  /**
   * Performs a heartbeat on the supplied connection, but only when more than half an interval
   * has passed since the last recorded heartbeat.
   *
   * @param conn the connection to beat on.
   * @param currentTimeNanos the current TimeSource nano time.
   * @param deadlineNanos the deadline for the jdbc operation.
   * @return true if a heartbeat was performed, false otherwise.
   */
  boolean tryBeat(final Connection conn, final long currentTimeNanos, final long deadlineNanos) {
    final boolean beatDue = currentTimeNanos - lastRunNanos > tryBeatThresholdNanos;
    if (beatDue) {
      beat(conn, deadlineNanos);
    }
    return beatDue;
  }

  /**
   * Records the time of the last heartbeat.
   * @param lastRunTime the nano time of the last heartbeat.
   */
  void updateLastRunNanos(final long lastRunTime) {
    lastRunNanos = lastRunTime;
  }


  /**
   * Reads the last heartbeat value stored in the database for this process.
   *
   * @return the stored last heartbeat value, or 0 if this process has no heartbeat row.
   * @throws SQLException if the query fails.
   * @throws InterruptedException if interrupted while executing.
   * @throws IllegalStateException if more than one row exists for this process.
   */
  @JmxExport(description = "The last run time recorded in the DB by this process")
  public long getLastRunDB() throws SQLException, InterruptedException {
    return jdbc.transactOnConnection((final Connection connection, final long deadlineNanos) -> {
      try (PreparedStatement query = connection.prepareStatement(selectLastRunSql)) {
        query.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
        query.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
        try (ResultSet rows = query.executeQuery()) {
          if (!rows.next()) {
            // no heartbeat row for this process.
            return 0L;
          }
          final long lastBeat = rows.getLong(1);
          if (rows.next()) {
            throw new IllegalStateException("Multible beats for same owner " + org.spf4j.base.Runtime.PROCESS_ID);
          }
          return lastBeat;
        }
      }
    }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
  }

  /**
   * @return the heartbeat interval in milliseconds.
   */
  @JmxExport(description = "The heartbeat interval in miliseconds")
  public long getIntervalMillis() {
    return intervalMillis;
  }

  /**
   * @return the TimeSource nano time recorded for the last heartbeat.
   */
  @JmxExport(description =  "The TimeSource nanos time  the jdbc heartbeat run last")
  public long getLastRunNanos() {
    return lastRunNanos;
  }

  /**
   * @return the time of the last heartbeat, rendered as a ZonedDateTime string in the system default zone.
   */
  @JmxExport
  public String getLastRunTimeStampString() {
    final long lastRunEpochMillis = Timing.getCurrentTiming().fromNanoTimeToEpochMillis(lastRunNanos);
    final Instant lastRun = Instant.ofEpochMilli(lastRunEpochMillis);
    final ZonedDateTime zonedLastRun = ZonedDateTime.ofInstant(lastRun, ZoneId.systemDefault());
    return zonedLastRun.toString();
  }

  /**
   * Get a reference to the heartbeat instance, using the default heartbeat interval
   * and a jdbc timeout of (default interval millis / 1000) seconds.
   * @param dataSource  the datasource the heartbeat goes against.
   * @param hbTableDesc - heartbeat table description.
   * @param hook  a hook to notify when the heartbeat fails or is closed, may be null.
   * @return the heartbeat instance.
   * @throws InterruptedException if interrupted while creating the heartbeat row.
   * @throws SQLException if creating the heartbeat row fails.
   */
  public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
          final HeartBeatTableDesc hbTableDesc,
          @Nullable final LifecycleHook hook) throws InterruptedException, SQLException {
    return getHeartBeatAndSubscribe(dataSource, hbTableDesc,
            hook, HEARTBEAT_INTERVAL_MILLIS, HEARTBEAT_INTERVAL_MILLIS / 1000);
  }

  /**
   * Get a reference to the heartbeat instance, scheduling the heartbeats with the default scheduler.
   * @param dataSource  the datasource the heartbeat goes against.
   * @param hbTableDesc - heartbeat table description.
   * @param hook  a hook to notify when the heartbeat fails or is closed, may be null.
   * @param heartBeatIntevalMillis the heartbeat interval in milliseconds.
   * @param jdbcTimeoutSeconds the timeout for the jdbc operations, in seconds.
   * @return the heartbeat instance.
   * @throws InterruptedException if interrupted while creating the heartbeat row.
   * @throws SQLException if creating the heartbeat row fails.
   */
  public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
          final HeartBeatTableDesc hbTableDesc,
          @Nullable final LifecycleHook hook,
          final int heartBeatIntevalMillis, final int jdbcTimeoutSeconds)
          throws InterruptedException, SQLException {
    return getHeartBeatAndSubscribe(dataSource, hbTableDesc, hook, heartBeatIntevalMillis,
            jdbcTimeoutSeconds, DefaultScheduler.listenableInstance());
  }

  /**
   * Get a reference to the heartbeat instance for the given data source, creating and registering
   * it when none exists yet. A newly created instance is exported to JMX, removes itself from the
   * registry when closed, and is closed by a shutdown hook. The optional hook is subscribed and the
   * heartbeat is (re)scheduled on every invocation.
   *
   * @param dataSource  the datasource the heartbeat goes against.
   * @param hbTableDesc - heartbeat table description.
   * @param hook  a hook to notify when the heartbeat fails or is closed, may be null.
   * @param heartBeatIntevalMillis the heartbeat interval in milliseconds (used only on creation).
   * @param jdbcTimeoutSeconds the timeout for the jdbc operations, in seconds (used only on creation).
   * @param scheduler the scheduler to run the heartbeats with.
   * @return the heartbeat instance.
   * @throws IllegalStateException if the process is shutting down.
   * @throws InterruptedException if interrupted while creating the heartbeat row.
   * @throws SQLException if creating the heartbeat row fails.
   */
  public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
          final HeartBeatTableDesc hbTableDesc,
          @Nullable final LifecycleHook hook,
          final int heartBeatIntevalMillis, final int jdbcTimeoutSeconds,
          final ListeningScheduledExecutorService scheduler)
          throws InterruptedException, SQLException {
    final JdbcHeartBeat heartBeat;
    synchronized (HEARTBEATS) {
      if (isShuttingdown) {
        throw new IllegalStateException("Process is shutting down, no heartbeats are accepted for " + dataSource);
      }
      final JdbcHeartBeat existing = HEARTBEATS.get(dataSource);
      if (existing != null) {
        heartBeat = existing;
      } else {
        final JdbcHeartBeat created
                = new JdbcHeartBeat(dataSource, hbTableDesc, heartBeatIntevalMillis, jdbcTimeoutSeconds, 0.5);
        created.registerJmx();
        created.addLyfecycleHook(new LifecycleHook() {
          @Override
          public void onError(final Error error) {
            // this hook is only to remove the heartbeat from the registry.
          }

          @Override
          public void onClose() {
            synchronized (HEARTBEATS) {
              HEARTBEATS.remove(dataSource);
            }
          }
        });
        org.spf4j.base.Runtime.queueHookAtBeginning(new Runnable() {
          @Override
          public void run() {
            // refuse new heartbeats from here on, then clean up the heartbeat row.
            synchronized (HEARTBEATS) {
              isShuttingdown = true;
            }
            try {
              created.close();
            } catch (SQLException | HeartBeatError ex) {
              // logging in shutdownhooks is not reliable.
              org.spf4j.base.Runtime.error("WARN: Could not clean heartbeat record,"
                      + " this error can be ignored since it is a best effort attempt, detail:", ex);
            }
          }
        });
        HEARTBEATS.put(dataSource, created);
        heartBeat = created;
      }
    }
    if (hook != null) {
      heartBeat.addLyfecycleHook(hook);
    }
    heartBeat.scheduleHeartbeat(scheduler);
    return heartBeat;
  }

  /**
   * Closes all registered heartbeats and empties the registry.
   *
   * @throws RuntimeException wrapping the exception reported by closeAll when closing fails;
   * in that case the registry is not cleared.
   */
  public static void stopHeartBeats() {
    synchronized (HEARTBEATS) {
      // Closing a heartbeat invokes its registry hook, which removes the entry from HEARTBEATS.
      // Iterate over a snapshot so those removals cannot disturb an iteration over the live map view.
      List<JdbcHeartBeat> snapshot = new java.util.ArrayList<>(HEARTBEATS.values());
      Exception e = org.spf4j.base.Closeables.closeAll(snapshot);
      if (e != null) {
        throw new RuntimeException(e);
      }
      HEARTBEATS.clear();
    }
  }


  /**
   * @return the description of the heartbeat table this instance beats against.
   */
  public HeartBeatTableDesc getHbTableDesc() {
    return hbTableDesc;
  }

  /**
   * @return a textual description of this heartbeat: jdbc template, timeout, interval,
   * table description and last run time.
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("JdbcHeartBeat{");
    text.append("jdbc=").append(jdbc);
    text.append(", jdbcTimeoutSeconds=").append(jdbcTimeoutSeconds);
    text.append(", intervalMillis=").append(intervalMillis);
    text.append(", hbTableDesc=").append(hbTableDesc);
    text.append(", lastRunNanos=").append(lastRunNanos);
    return text.append('}').toString();
  }

  /**
   * The task executed by the scheduler: verifies the heartbeat is not overdue and performs the beat.
   */
  private class ScheduledHeartBeat implements Runnable {

    /**
     * Fails (via handleError) when the last heartbeat is older than maxMissedNanos; otherwise performs
     * a beat when more than tryBeatThresholdNanos passed since the last one. A failing beat is also
     * routed through handleError; an interruption only restores the thread's interrupt flag.
     */
    @Override
    public void run() {
      final long previousBeatNanos = lastRunNanos;
      final long elapsedNanos = TimeSource.nanoTime() - previousBeatNanos;
      if (elapsedNanos > maxMissedNanos) {
        // Unable to beat at interval!
        handleError(new HeartBeatError("System too busy to provide regular heartbeat, last heartbeat "
                + elapsedNanos + " ns ago"));
      }
      if (elapsedNanos <= tryBeatThresholdNanos) {
        // the last beat is recent enough, nothing to do.
        return;
      }
      try {
        beat();
      } catch (RuntimeException | SQLException ex) {
        handleError(new HeartBeatError("System failed heartbeat", ex));
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Notifies all lifecycle hooks of the error, then invokes onClose on all of them (a failure
     * reported by that pass is attached to the error as suppressed), removes all hooks and
     * throws the error.
     *
     * @param err the heartbeat error; always thrown by this method.
     */
    public void handleError(final HeartBeatError err) {
      lifecycleHooks.forEach((final LifecycleHook listener) -> listener.onError(err));
      final RuntimeException closeFailure = Iterables.forAll(lifecycleHooks, (final LifecycleHook listener) -> {
        try {
          listener.onClose();
        } catch (SQLException sqlEx) {
          throw new RuntimeException(sqlEx);
        }
      });
      if (closeFailure != null) {
        err.addSuppressed(closeFailure);
      }
      lifecycleHooks.clear();
      throw err;
    }
  }

}