View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.concurrent.jdbc;
33  
34  import com.google.common.util.concurrent.FutureCallback;
35  import com.google.common.util.concurrent.Futures;
36  import com.google.common.util.concurrent.ListenableScheduledFuture;
37  import com.google.common.util.concurrent.ListeningScheduledExecutorService;
38  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
39  import java.sql.Connection;
40  import java.sql.PreparedStatement;
41  import java.sql.ResultSet;
42  import java.sql.SQLException;
43  import java.time.Instant;
44  import java.time.ZoneId;
45  import java.time.ZonedDateTime;
46  import java.util.IdentityHashMap;
47  import java.util.List;
48  import java.util.Map;
49  import java.util.concurrent.CancellationException;
50  import java.util.concurrent.CopyOnWriteArrayList;
51  import java.util.concurrent.Future;
52  import java.util.concurrent.ScheduledFuture;
53  import java.util.concurrent.TimeUnit;
54  import javax.annotation.Nullable;
55  import javax.annotation.ParametersAreNonnullByDefault;
56  import javax.annotation.concurrent.GuardedBy;
57  import javax.annotation.concurrent.ThreadSafe;
58  import javax.sql.DataSource;
59  import org.slf4j.Logger;
60  import org.slf4j.LoggerFactory;
61  import org.spf4j.base.AbstractRunnable;
62  import org.spf4j.base.Iterables;
63  import org.spf4j.base.TimeSource;
64  import org.spf4j.base.Timing;
65  import org.spf4j.concurrent.DefaultExecutor;
66  import org.spf4j.concurrent.DefaultScheduler;
67  import org.spf4j.jdbc.JdbcTemplate;
68  import org.spf4j.jmx.JmxExport;
69  import org.spf4j.jmx.Registry;
70  
71  /**
72   * A class that does "heartbeats" (at a arbitrary inteval) to a database table.
73   * This is to detect the death of a process.
74   * The process is considered dead when: currentTime - lastheartbeat > beatInterval * 2
75   * When this class mechanism detects that it cannot perform the heartbeats it throws a Error.
76   * The sensible this for the process is to go down (and restart if it is a daemon).
77   * This is typically done by registering a default uncaught exception handler with:
78   * Thread.setDefaultUncaughtExceptionHandler
79   *
80   *
81   * @author zoly
82   */
83  @SuppressFBWarnings(value = {"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING",
84    "PMB_POSSIBLE_MEMORY_BLOAT", "SQL_INJECTION_JDBC"}, justification = "The db object names are configurable,"
85          + "we for know allow heartbeats to multiple data sources, should be one mostly")
86  @ParametersAreNonnullByDefault
87  @ThreadSafe
88  public final class JdbcHeartBeat implements AutoCloseable {
89  
90    private static final Logger LOG = LoggerFactory.getLogger(JdbcHeartBeat.class);
91  
92    private static final Map<DataSource, JdbcHeartBeat> HEARTBEATS = new IdentityHashMap<>();
93  
94    private static final int HEARTBEAT_INTERVAL_MILLIS =
95            Integer.getInteger("spf4j.jdbc.heartBeats.defaultIntervalMillis", 10000);
96  
97    @GuardedBy("HEARTBEATS")
98    private static boolean isShuttingdown = false;
99  
100   private final List<LifecycleHook> lifecycleHooks;
101 
102   private final JdbcTemplate jdbc;
103 
104   private final String insertHeartbeatSql;
105 
106   private final String updateHeartbeatSql;
107 
108   private final String selectLastRunSql;
109 
110   private final int jdbcTimeoutSeconds;
111 
112   private final long intervalNanos;
113 
114   private final long intervalMillis;
115 
116   private final HeartBeatTableDesc hbTableDesc;
117 
118   private final String deleteSql;
119 
120   private final String deleteHeartBeatSql;
121 
122   private volatile long lastRunNanos;
123 
124   private boolean isClosed;
125 
126   private ListenableScheduledFuture<?> scheduledHearbeat;
127 
128   private final long beatDurationNanos;
129 
130   private ScheduledHeartBeat heartbeatRunnable;
131 
132   private final double missedHBRatio;
133 
134   private final long maxMissedNanos;
135 
136   private final long tryBeatThresholdNanos;
137 
138   @Override
139   @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
140   public void close() throws SQLException {
141     boolean weClosed = false;
142     synchronized (jdbc) {
143       if (!isClosed) {
144         weClosed = true;
145         isClosed = true;
146       }
147     }
148     if (weClosed) {
149         unregisterJmx();
150         ScheduledFuture<?> running = scheduledHearbeat;
151         if (running != null) {
152           scheduledHearbeat.cancel(true);
153         }
154         removeHeartBeatRow(jdbcTimeoutSeconds);
155         for (LifecycleHook hook : lifecycleHooks) {
156           hook.onClose();
157         }
158     }
159   }
160 
161   public interface LifecycleHook {
162 
163     void onError(Error error);
164 
165     void onClose() throws SQLException;
166 
167   }
168 
169   private JdbcHeartBeat(final DataSource dataSource, final HeartBeatTableDesc hbTableDesc, final long intervalMillis,
170           final int jdbcTimeoutSeconds, final double missedHBRatio) throws InterruptedException, SQLException {
171     if (intervalMillis < 1000) {
172       throw new IllegalArgumentException("The heartbeat interval should be at least 1s and not "
173               + intervalMillis + " ms");
174     }
175     this.missedHBRatio = missedHBRatio;
176     this.jdbc = new JdbcTemplate(dataSource);
177     this.jdbcTimeoutSeconds = jdbcTimeoutSeconds;
178     this.intervalMillis = intervalMillis;
179     this.intervalNanos = TimeUnit.MILLISECONDS.toNanos(intervalMillis);
180     this.tryBeatThresholdNanos = intervalNanos / 2;
181     this.maxMissedNanos = (long) ((double) intervalNanos * (1 + missedHBRatio));
182     this.hbTableDesc = hbTableDesc;
183     this.isClosed = false;
184     String hbTableName = hbTableDesc.getTableName();
185     String lastHeartbeatColumn = hbTableDesc.getLastHeartbeatColumn();
186     String currentTimeMillisFunc = hbTableDesc.getDbType().getCurrTSSqlFn();
187     String intervalColumn = hbTableDesc.getIntervalColumn();
188     String ownerColumn = hbTableDesc.getOwnerColumn();
189     this.insertHeartbeatSql = "insert into " + hbTableName + " (" + ownerColumn + ',' + intervalColumn + ','
190                 + lastHeartbeatColumn + ") VALUES (?, ?, " + currentTimeMillisFunc + ")";
191     this.updateHeartbeatSql = "UPDATE " + hbTableName + " SET " + lastHeartbeatColumn + " = " + currentTimeMillisFunc
192             + " WHERE " + ownerColumn + " = ? AND " + lastHeartbeatColumn + " + " + intervalColumn
193             + " * 2 > " + currentTimeMillisFunc;
194     this.deleteHeartBeatSql = "DELETE FROM " + hbTableName
195             + " WHERE " + ownerColumn + " = ?";
196     this.deleteSql = "DELETE FROM " + hbTableName + " WHERE " + lastHeartbeatColumn + " + " + intervalColumn
197             + " * 2 < " + currentTimeMillisFunc;
198     this.selectLastRunSql = "select " + lastHeartbeatColumn + " FROM " + hbTableName
199             + " where " + ownerColumn + " = ?";
200     this.lifecycleHooks = new CopyOnWriteArrayList<>();
201     long startTimeNanos =  TimeSource.nanoTime();
202     createHeartbeatRow();
203     long currTime = TimeSource.nanoTime();
204     this.lastRunNanos = currTime;
205     long duration = currTime - startTimeNanos;
206     this.beatDurationNanos = Math.max(duration, TimeUnit.MILLISECONDS.toNanos(10));
207   }
208 
209   public long getBeatDurationNanos() {
210     return beatDurationNanos;
211   }
212 
213   public void registerJmx() {
214     Registry.export(this);
215   }
216 
217   public void unregisterJmx() {
218     Registry.unregister(this);
219   }
220 
221   public void addLyfecycleHook(final LifecycleHook hook) {
222     lifecycleHooks.add(hook);
223   }
224 
225   public void removeLifecycleHook(final LifecycleHook hook) {
226     lifecycleHooks.remove(hook);
227   }
228 
229 
230   private void createHeartbeatRow() throws InterruptedException, SQLException {
231       jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
232 
233         try (PreparedStatement insert = conn.prepareStatement(insertHeartbeatSql)) {
234           insert.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
235           insert.setLong(2, this.intervalMillis);
236           insert.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
237           insert.executeUpdate();
238         }
239         return null;
240       }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
241       LOG.debug("Start Heart Beat for {}", org.spf4j.base.Runtime.PROCESS_ID);
242   }
243 
244   @JmxExport(description = "Remove all dead hearbeat rows")
245   public int removeDeadHeartBeatRows(@JmxExport("timeoutSeconds") final long timeoutSeconds)
246           throws SQLException, InterruptedException {
247     return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
248       return JdbcHeartBeat.this.removeDeadHeartBeatRows(conn, deadlineNanos);
249     }, timeoutSeconds, TimeUnit.SECONDS);
250   }
251 
252   @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE")
253   int removeDeadHeartBeatRows(final Connection conn, final long deadlineNanos) throws SQLException {
254     try (PreparedStatement stmt = conn.prepareStatement(deleteSql)) {
255       stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
256       return stmt.executeUpdate();
257     }
258   }
259 
260   private void removeHeartBeatRow(final int timeoutSeconds)
261           throws SQLException {
262     jdbc.transactOnConnectionNonInterrupt((final Connection conn, final long deadlineNanos) -> {
263       try (PreparedStatement stmt = conn.prepareStatement(deleteHeartBeatSql)) {
264         stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
265         stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
266         int nrDeleted = stmt.executeUpdate();
267         if (nrDeleted != 1) {
268           throw new IllegalStateException("Heartbeat rows deleted: " + nrDeleted
269                   + " for " + org.spf4j.base.Runtime.PROCESS_ID);
270         }
271       }
272       return null;
273     }, timeoutSeconds, TimeUnit.SECONDS);
274   }
275 
276   @JmxExport(value = "removeDeadHeartBeatRowsAsync", description = "Remove all dead hearbeat rows async")
277   public void removeDeadHeartBeatRowsAsyncNoReturn(@JmxExport("timeoutSeconds") final long timeoutSeconds) {
278     DefaultExecutor.INSTANCE.execute(new AbstractRunnable(true) {
279       @Override
280       public void doRun() throws SQLException, InterruptedException {
281         removeDeadHeartBeatRows(timeoutSeconds);
282       }
283     });
284   }
285 
286   public Future<Integer> removeDeadHeartBeatRowsAsync(final long timeoutSeconds) {
287     return DefaultExecutor.INSTANCE.submit(() -> removeDeadHeartBeatRows(timeoutSeconds));
288   }
289 
290   private ScheduledHeartBeat getHeartBeatRunnable() {
291     if (heartbeatRunnable == null) {
292       heartbeatRunnable = new ScheduledHeartBeat();
293     }
294     return heartbeatRunnable;
295   }
296 
297   public void scheduleHeartbeat(final ListeningScheduledExecutorService scheduler) {
298     synchronized (jdbc) {
299       if (isClosed) {
300         throw new IllegalStateException("Heartbeater is closed " + this);
301       }
302       if (scheduledHearbeat == null) {
303         long lrn = lastRunNanos;
304         long nanosSincelLastHB = TimeSource.nanoTime() - lrn;
305         long delayNanos = intervalNanos - nanosSincelLastHB;
306         if (delayNanos < (-intervalNanos) * missedHBRatio) {
307           throw new HeartBeatError("Missed heartbeat, last one was " + nanosSincelLastHB + " ns ago");
308         }
309         if (delayNanos < 0) {
310           delayNanos = 0;
311         }
312         ListenableScheduledFuture<?> scheduleFut = scheduler.schedule(
313                 getHeartBeatRunnable(), delayNanos, TimeUnit.NANOSECONDS);
314         scheduledHearbeat = scheduleFut;
315         Futures.addCallback(scheduleFut, new FutureCallback() {
316           @Override
317           public void onSuccess(final Object result) {
318             synchronized (jdbc) {
319               if (!isClosed) {
320                 scheduledHearbeat = null;
321                 scheduleHeartbeat(scheduler);
322               }
323             }
324           }
325 
326           @Override
327           @SuppressFBWarnings("ITC_INHERITANCE_TYPE_CHECKING")
328           public void onFailure(final Throwable t) {
329             if (t instanceof Error) {
330               throw (Error) t;
331             } else if (!(t instanceof CancellationException)) {
332               throw new HeartBeatError(t);
333             }
334           }
335         }, DefaultExecutor.INSTANCE);
336       }
337     }
338   }
339 
340   @JmxExport
341   public void beat() throws SQLException, InterruptedException {
342     jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
343       beat(conn, deadlineNanos);
344       return null;
345     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
346     lastRunNanos = TimeSource.nanoTime();
347   }
348 
349   void beat(final Connection conn, final long deadlineNanos) {
350     synchronized (jdbc) {
351       if (isClosed) {
352         throw new HeartBeatError("Heartbeater is closed " + this);
353       }
354     }
355     try (PreparedStatement stmt = conn.prepareStatement(updateHeartbeatSql)) {
356       stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
357       stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
358       int rowsUpdated = stmt.executeUpdate();
359       if (rowsUpdated != 1) {
360         throw new IllegalStateException("Broken Heartbeat for "
361                 + org.spf4j.base.Runtime.PROCESS_ID + "sql : " + updateHeartbeatSql + " rows : " + rowsUpdated);
362       }
363       LOG.debug("Heart Beat for {}", org.spf4j.base.Runtime.PROCESS_ID);
364     } catch (SQLException ex) {
365       throw new HeartBeatError(ex);
366     }
367   }
368 
369 
370   boolean tryBeat(final Connection conn, final long currentTimeNanos, final long deadlineNanos) {
371     if (currentTimeNanos - lastRunNanos > tryBeatThresholdNanos) {
372       beat(conn, deadlineNanos);
373       return true;
374     } else {
375       return false;
376     }
377   }
378 
379   void updateLastRunNanos(final long lastRunTime) {
380     lastRunNanos = lastRunTime;
381   }
382 
383 
384   @JmxExport(description = "The last run time recorded in the DB by this process")
385   public long getLastRunDB() throws SQLException, InterruptedException {
386     return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
387       try (PreparedStatement stmt = conn.prepareStatement(selectLastRunSql)) {
388         stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
389         stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
390         try (ResultSet rs = stmt.executeQuery()) {
391           if (rs.next()) {
392             long result = rs.getLong(1);
393             if (rs.next()) {
394               throw new IllegalStateException("Multible beats for same owner " + org.spf4j.base.Runtime.PROCESS_ID);
395             }
396             return result;
397           } else {
398             return 0L;
399           }
400         }
401       }
402     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
403   }
404 
405   @JmxExport(description = "The heartbeat interval in miliseconds")
406   public long getIntervalMillis() {
407     return intervalMillis;
408   }
409 
410   @JmxExport(description =  "The TimeSource nanos time  the jdbc heartbeat run last")
411   public long getLastRunNanos() {
412     return lastRunNanos;
413   }
414 
415   @JmxExport
416   public String getLastRunTimeStampString() {
417     return ZonedDateTime.ofInstant(
418             Instant.ofEpochMilli(Timing.getCurrentTiming().fromNanoTimeToEpochMillis(lastRunNanos)),
419             ZoneId.systemDefault()).toString();
420   }
421 
422   /**
423    * Get a reference to the hearbeat instance.
424    * @param dataSource  the datasource the hearbeat goes against.
425    * @param hbTableDesc - heartbeat table description.
426    * @param hook  a hook to notify when heartbeat fails.
427    * @return the heartbeat instance.
428    */
429   public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
430           final HeartBeatTableDesc hbTableDesc,
431           @Nullable final LifecycleHook hook) throws InterruptedException, SQLException {
432     return getHeartBeatAndSubscribe(dataSource, hbTableDesc,
433             hook, HEARTBEAT_INTERVAL_MILLIS, HEARTBEAT_INTERVAL_MILLIS / 1000);
434   }
435 
436   public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
437           final HeartBeatTableDesc hbTableDesc,
438           @Nullable final LifecycleHook hook,
439           final int heartBeatIntevalMillis, final int jdbcTimeoutSeconds)
440           throws InterruptedException, SQLException {
441     return getHeartBeatAndSubscribe(dataSource, hbTableDesc, hook, heartBeatIntevalMillis,
442             jdbcTimeoutSeconds, DefaultScheduler.listenableInstance());
443   }
444 
445   public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
446           final HeartBeatTableDesc hbTableDesc,
447           @Nullable final LifecycleHook hook,
448           final int heartBeatIntevalMillis, final int jdbcTimeoutSeconds,
449           final ListeningScheduledExecutorService scheduler)
450           throws InterruptedException, SQLException {
451     JdbcHeartBeat beat;
452     synchronized (HEARTBEATS) {
453       if (isShuttingdown) {
454         throw new IllegalStateException("Process is shutting down, no heartbeats are accepted for " + dataSource);
455       }
456       beat = HEARTBEATS.get(dataSource);
457       if (beat == null) {
458         beat = new JdbcHeartBeat(dataSource, hbTableDesc, heartBeatIntevalMillis, jdbcTimeoutSeconds, 0.5);
459         beat.registerJmx();
460         beat.addLyfecycleHook(new LifecycleHook() {
461           @Override
462           public void onError(final Error error) {
463             // this hook is only to remove the heartbeat from the registry.
464           }
465 
466           @Override
467           public void onClose() {
468             synchronized (HEARTBEATS) {
469               HEARTBEATS.remove(dataSource);
470             }
471           }
472         });
473         final JdbcHeartBeat fbeat = beat;
474         org.spf4j.base.Runtime.queueHookAtBeginning(new Runnable() {
475           @Override
476           public void run() {
477             synchronized (HEARTBEATS) {
478               isShuttingdown = true;
479             }
480             try {
481               fbeat.close();
482             } catch (SQLException | HeartBeatError ex) {
483               // logging in shutdownhooks is not reliable.
484               org.spf4j.base.Runtime.error("WARN: Could not clean heartbeat record,"
485                       + " this error can be ignored since it is a best effort attempt, detail:", ex);
486             }
487           }
488         });
489         HEARTBEATS.put(dataSource, beat);
490       }
491     }
492     if (hook != null) {
493       beat.addLyfecycleHook(hook);
494     }
495     beat.scheduleHeartbeat(scheduler);
496     return beat;
497   }
498 
499   public static void stopHeartBeats() {
500    synchronized (HEARTBEATS) {
501      Exception e = org.spf4j.base.Closeables.closeAll(HEARTBEATS.values());
502      if (e != null) {
503        throw new RuntimeException(e);
504      }
505      HEARTBEATS.clear();
506    }
507   }
508 
509 
510   public HeartBeatTableDesc getHbTableDesc() {
511     return hbTableDesc;
512   }
513 
514   @Override
515   public String toString() {
516     return "JdbcHeartBeat{" + "jdbc=" + jdbc + ", jdbcTimeoutSeconds=" + jdbcTimeoutSeconds + ", intervalMillis="
517             + intervalMillis + ", hbTableDesc=" + hbTableDesc + ", lastRunNanos=" + lastRunNanos + '}';
518   }
519 
520   private class ScheduledHeartBeat implements Runnable {
521 
522     @Override
523     public void run() {
524         long lrn = lastRunNanos;
525         long currentTimeNanos = TimeSource.nanoTime();
526         // not first beat.
527         long nanosSinceLastBeat = currentTimeNanos - lrn;
528         if (maxMissedNanos < nanosSinceLastBeat) {
529           // Unable to beat at inteval!
530           HeartBeatError err = new HeartBeatError("System too busy to provide regular heartbeat, last heartbeat "
531                   + nanosSinceLastBeat + " ns ago");
532 
533           handleError(err);
534         }
535         if (nanosSinceLastBeat > tryBeatThresholdNanos) {
536           try {
537             beat();
538           } catch (RuntimeException | SQLException ex) {
539             HeartBeatError err = new HeartBeatError("System failed heartbeat", ex);
540             handleError(err);
541           } catch (InterruptedException ex) {
542             Thread.currentThread().interrupt();
543           }
544         }
545     }
546 
547     public void handleError(final HeartBeatError err) {
548       for (LifecycleHook hook : lifecycleHooks) {
549         hook.onError(err);
550       }
551       RuntimeException ex = Iterables.forAll(lifecycleHooks, (final LifecycleHook t) -> {
552         try {
553           t.onClose();
554         } catch (SQLException e) {
555           throw new RuntimeException(e);
556         }
557       });
558       if (ex != null) {
559         err.addSuppressed(ex);
560       }
561       lifecycleHooks.clear();
562       throw err;
563     }
564   }
565 
566 }