1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.concurrent.jdbc;
33
34 import com.google.common.util.concurrent.FutureCallback;
35 import com.google.common.util.concurrent.Futures;
36 import com.google.common.util.concurrent.ListenableScheduledFuture;
37 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
38 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
39 import java.sql.Connection;
40 import java.sql.PreparedStatement;
41 import java.sql.ResultSet;
42 import java.sql.SQLException;
43 import java.time.Instant;
44 import java.time.ZoneId;
45 import java.time.ZonedDateTime;
46 import java.util.IdentityHashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.concurrent.CancellationException;
50 import java.util.concurrent.CopyOnWriteArrayList;
51 import java.util.concurrent.Future;
52 import java.util.concurrent.ScheduledFuture;
53 import java.util.concurrent.TimeUnit;
54 import javax.annotation.Nullable;
55 import javax.annotation.ParametersAreNonnullByDefault;
56 import javax.annotation.concurrent.GuardedBy;
57 import javax.annotation.concurrent.ThreadSafe;
58 import javax.sql.DataSource;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61 import org.spf4j.base.AbstractRunnable;
62 import org.spf4j.base.Iterables;
63 import org.spf4j.base.TimeSource;
64 import org.spf4j.base.Timing;
65 import org.spf4j.concurrent.DefaultExecutor;
66 import org.spf4j.concurrent.DefaultScheduler;
67 import org.spf4j.jdbc.JdbcTemplate;
68 import org.spf4j.jmx.JmxExport;
69 import org.spf4j.jmx.Registry;
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @SuppressFBWarnings(value = {"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING",
84 "PMB_POSSIBLE_MEMORY_BLOAT", "SQL_INJECTION_JDBC"}, justification = "The db object names are configurable,"
85 + "we for know allow heartbeats to multiple data sources, should be one mostly")
86 @ParametersAreNonnullByDefault
87 @ThreadSafe
88 public final class JdbcHeartBeat implements AutoCloseable {
89
90 private static final Logger LOG = LoggerFactory.getLogger(JdbcHeartBeat.class);
91
92 private static final Map<DataSource, JdbcHeartBeat> HEARTBEATS = new IdentityHashMap<>();
93
94 private static final int HEARTBEAT_INTERVAL_MILLIS =
95 Integer.getInteger("spf4j.jdbc.heartBeats.defaultIntervalMillis", 10000);
96
97 @GuardedBy("HEARTBEATS")
98 private static boolean isShuttingdown = false;
99
100 private final List<LifecycleHook> lifecycleHooks;
101
102 private final JdbcTemplate jdbc;
103
104 private final String insertHeartbeatSql;
105
106 private final String updateHeartbeatSql;
107
108 private final String selectLastRunSql;
109
110 private final int jdbcTimeoutSeconds;
111
112 private final long intervalNanos;
113
114 private final long intervalMillis;
115
116 private final HeartBeatTableDesc hbTableDesc;
117
118 private final String deleteSql;
119
120 private final String deleteHeartBeatSql;
121
122 private volatile long lastRunNanos;
123
124 private boolean isClosed;
125
126 private ListenableScheduledFuture<?> scheduledHearbeat;
127
128 private final long beatDurationNanos;
129
130 private ScheduledHeartBeat heartbeatRunnable;
131
132 private final double missedHBRatio;
133
134 private final long maxMissedNanos;
135
136 private final long tryBeatThresholdNanos;
137
138 @Override
139 @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
140 public void close() throws SQLException {
141 boolean weClosed = false;
142 synchronized (jdbc) {
143 if (!isClosed) {
144 weClosed = true;
145 isClosed = true;
146 }
147 }
148 if (weClosed) {
149 unregisterJmx();
150 ScheduledFuture<?> running = scheduledHearbeat;
151 if (running != null) {
152 scheduledHearbeat.cancel(true);
153 }
154 removeHeartBeatRow(jdbcTimeoutSeconds);
155 for (LifecycleHook hook : lifecycleHooks) {
156 hook.onClose();
157 }
158 }
159 }
160
161 public interface LifecycleHook {
162
163 void onError(Error error);
164
165 void onClose() throws SQLException;
166
167 }
168
169 private JdbcHeartBeat(final DataSource dataSource, final HeartBeatTableDesc hbTableDesc, final long intervalMillis,
170 final int jdbcTimeoutSeconds, final double missedHBRatio) throws InterruptedException, SQLException {
171 if (intervalMillis < 1000) {
172 throw new IllegalArgumentException("The heartbeat interval should be at least 1s and not "
173 + intervalMillis + " ms");
174 }
175 this.missedHBRatio = missedHBRatio;
176 this.jdbc = new JdbcTemplate(dataSource);
177 this.jdbcTimeoutSeconds = jdbcTimeoutSeconds;
178 this.intervalMillis = intervalMillis;
179 this.intervalNanos = TimeUnit.MILLISECONDS.toNanos(intervalMillis);
180 this.tryBeatThresholdNanos = intervalNanos / 2;
181 this.maxMissedNanos = (long) ((double) intervalNanos * (1 + missedHBRatio));
182 this.hbTableDesc = hbTableDesc;
183 this.isClosed = false;
184 String hbTableName = hbTableDesc.getTableName();
185 String lastHeartbeatColumn = hbTableDesc.getLastHeartbeatColumn();
186 String currentTimeMillisFunc = hbTableDesc.getDbType().getCurrTSSqlFn();
187 String intervalColumn = hbTableDesc.getIntervalColumn();
188 String ownerColumn = hbTableDesc.getOwnerColumn();
189 this.insertHeartbeatSql = "insert into " + hbTableName + " (" + ownerColumn + ',' + intervalColumn + ','
190 + lastHeartbeatColumn + ") VALUES (?, ?, " + currentTimeMillisFunc + ")";
191 this.updateHeartbeatSql = "UPDATE " + hbTableName + " SET " + lastHeartbeatColumn + " = " + currentTimeMillisFunc
192 + " WHERE " + ownerColumn + " = ? AND " + lastHeartbeatColumn + " + " + intervalColumn
193 + " * 2 > " + currentTimeMillisFunc;
194 this.deleteHeartBeatSql = "DELETE FROM " + hbTableName
195 + " WHERE " + ownerColumn + " = ?";
196 this.deleteSql = "DELETE FROM " + hbTableName + " WHERE " + lastHeartbeatColumn + " + " + intervalColumn
197 + " * 2 < " + currentTimeMillisFunc;
198 this.selectLastRunSql = "select " + lastHeartbeatColumn + " FROM " + hbTableName
199 + " where " + ownerColumn + " = ?";
200 this.lifecycleHooks = new CopyOnWriteArrayList<>();
201 long startTimeNanos = TimeSource.nanoTime();
202 createHeartbeatRow();
203 long currTime = TimeSource.nanoTime();
204 this.lastRunNanos = currTime;
205 long duration = currTime - startTimeNanos;
206 this.beatDurationNanos = Math.max(duration, TimeUnit.MILLISECONDS.toNanos(10));
207 }
208
209 public long getBeatDurationNanos() {
210 return beatDurationNanos;
211 }
212
213 public void registerJmx() {
214 Registry.export(this);
215 }
216
217 public void unregisterJmx() {
218 Registry.unregister(this);
219 }
220
221 public void addLyfecycleHook(final LifecycleHook hook) {
222 lifecycleHooks.add(hook);
223 }
224
225 public void removeLifecycleHook(final LifecycleHook hook) {
226 lifecycleHooks.remove(hook);
227 }
228
229
230 private void createHeartbeatRow() throws InterruptedException, SQLException {
231 jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
232
233 try (PreparedStatement insert = conn.prepareStatement(insertHeartbeatSql)) {
234 insert.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
235 insert.setLong(2, this.intervalMillis);
236 insert.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
237 insert.executeUpdate();
238 }
239 return null;
240 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
241 LOG.debug("Start Heart Beat for {}", org.spf4j.base.Runtime.PROCESS_ID);
242 }
243
244 @JmxExport(description = "Remove all dead hearbeat rows")
245 public int removeDeadHeartBeatRows(@JmxExport("timeoutSeconds") final long timeoutSeconds)
246 throws SQLException, InterruptedException {
247 return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
248 return JdbcHeartBeat.this.removeDeadHeartBeatRows(conn, deadlineNanos);
249 }, timeoutSeconds, TimeUnit.SECONDS);
250 }
251
252 @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE")
253 int removeDeadHeartBeatRows(final Connection conn, final long deadlineNanos) throws SQLException {
254 try (PreparedStatement stmt = conn.prepareStatement(deleteSql)) {
255 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
256 return stmt.executeUpdate();
257 }
258 }
259
260 private void removeHeartBeatRow(final int timeoutSeconds)
261 throws SQLException {
262 jdbc.transactOnConnectionNonInterrupt((final Connection conn, final long deadlineNanos) -> {
263 try (PreparedStatement stmt = conn.prepareStatement(deleteHeartBeatSql)) {
264 stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
265 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
266 int nrDeleted = stmt.executeUpdate();
267 if (nrDeleted != 1) {
268 throw new IllegalStateException("Heartbeat rows deleted: " + nrDeleted
269 + " for " + org.spf4j.base.Runtime.PROCESS_ID);
270 }
271 }
272 return null;
273 }, timeoutSeconds, TimeUnit.SECONDS);
274 }
275
276 @JmxExport(value = "removeDeadHeartBeatRowsAsync", description = "Remove all dead hearbeat rows async")
277 public void removeDeadHeartBeatRowsAsyncNoReturn(@JmxExport("timeoutSeconds") final long timeoutSeconds) {
278 DefaultExecutor.INSTANCE.execute(new AbstractRunnable(true) {
279 @Override
280 public void doRun() throws SQLException, InterruptedException {
281 removeDeadHeartBeatRows(timeoutSeconds);
282 }
283 });
284 }
285
286 public Future<Integer> removeDeadHeartBeatRowsAsync(final long timeoutSeconds) {
287 return DefaultExecutor.INSTANCE.submit(() -> removeDeadHeartBeatRows(timeoutSeconds));
288 }
289
290 private ScheduledHeartBeat getHeartBeatRunnable() {
291 if (heartbeatRunnable == null) {
292 heartbeatRunnable = new ScheduledHeartBeat();
293 }
294 return heartbeatRunnable;
295 }
296
297 public void scheduleHeartbeat(final ListeningScheduledExecutorService scheduler) {
298 synchronized (jdbc) {
299 if (isClosed) {
300 throw new IllegalStateException("Heartbeater is closed " + this);
301 }
302 if (scheduledHearbeat == null) {
303 long lrn = lastRunNanos;
304 long nanosSincelLastHB = TimeSource.nanoTime() - lrn;
305 long delayNanos = intervalNanos - nanosSincelLastHB;
306 if (delayNanos < (-intervalNanos) * missedHBRatio) {
307 throw new HeartBeatError("Missed heartbeat, last one was " + nanosSincelLastHB + " ns ago");
308 }
309 if (delayNanos < 0) {
310 delayNanos = 0;
311 }
312 ListenableScheduledFuture<?> scheduleFut = scheduler.schedule(
313 getHeartBeatRunnable(), delayNanos, TimeUnit.NANOSECONDS);
314 scheduledHearbeat = scheduleFut;
315 Futures.addCallback(scheduleFut, new FutureCallback() {
316 @Override
317 public void onSuccess(final Object result) {
318 synchronized (jdbc) {
319 if (!isClosed) {
320 scheduledHearbeat = null;
321 scheduleHeartbeat(scheduler);
322 }
323 }
324 }
325
326 @Override
327 @SuppressFBWarnings("ITC_INHERITANCE_TYPE_CHECKING")
328 public void onFailure(final Throwable t) {
329 if (t instanceof Error) {
330 throw (Error) t;
331 } else if (!(t instanceof CancellationException)) {
332 throw new HeartBeatError(t);
333 }
334 }
335 }, DefaultExecutor.INSTANCE);
336 }
337 }
338 }
339
340 @JmxExport
341 public void beat() throws SQLException, InterruptedException {
342 jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
343 beat(conn, deadlineNanos);
344 return null;
345 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
346 lastRunNanos = TimeSource.nanoTime();
347 }
348
349 void beat(final Connection conn, final long deadlineNanos) {
350 synchronized (jdbc) {
351 if (isClosed) {
352 throw new HeartBeatError("Heartbeater is closed " + this);
353 }
354 }
355 try (PreparedStatement stmt = conn.prepareStatement(updateHeartbeatSql)) {
356 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
357 stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
358 int rowsUpdated = stmt.executeUpdate();
359 if (rowsUpdated != 1) {
360 throw new IllegalStateException("Broken Heartbeat for "
361 + org.spf4j.base.Runtime.PROCESS_ID + "sql : " + updateHeartbeatSql + " rows : " + rowsUpdated);
362 }
363 LOG.debug("Heart Beat for {}", org.spf4j.base.Runtime.PROCESS_ID);
364 } catch (SQLException ex) {
365 throw new HeartBeatError(ex);
366 }
367 }
368
369
370 boolean tryBeat(final Connection conn, final long currentTimeNanos, final long deadlineNanos) {
371 if (currentTimeNanos - lastRunNanos > tryBeatThresholdNanos) {
372 beat(conn, deadlineNanos);
373 return true;
374 } else {
375 return false;
376 }
377 }
378
379 void updateLastRunNanos(final long lastRunTime) {
380 lastRunNanos = lastRunTime;
381 }
382
383
384 @JmxExport(description = "The last run time recorded in the DB by this process")
385 public long getLastRunDB() throws SQLException, InterruptedException {
386 return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
387 try (PreparedStatement stmt = conn.prepareStatement(selectLastRunSql)) {
388 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
389 stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
390 try (ResultSet rs = stmt.executeQuery()) {
391 if (rs.next()) {
392 long result = rs.getLong(1);
393 if (rs.next()) {
394 throw new IllegalStateException("Multible beats for same owner " + org.spf4j.base.Runtime.PROCESS_ID);
395 }
396 return result;
397 } else {
398 return 0L;
399 }
400 }
401 }
402 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
403 }
404
405 @JmxExport(description = "The heartbeat interval in miliseconds")
406 public long getIntervalMillis() {
407 return intervalMillis;
408 }
409
410 @JmxExport(description = "The TimeSource nanos time the jdbc heartbeat run last")
411 public long getLastRunNanos() {
412 return lastRunNanos;
413 }
414
415 @JmxExport
416 public String getLastRunTimeStampString() {
417 return ZonedDateTime.ofInstant(
418 Instant.ofEpochMilli(Timing.getCurrentTiming().fromNanoTimeToEpochMillis(lastRunNanos)),
419 ZoneId.systemDefault()).toString();
420 }
421
422
423
424
425
426
427
428
429 public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
430 final HeartBeatTableDesc hbTableDesc,
431 @Nullable final LifecycleHook hook) throws InterruptedException, SQLException {
432 return getHeartBeatAndSubscribe(dataSource, hbTableDesc,
433 hook, HEARTBEAT_INTERVAL_MILLIS, HEARTBEAT_INTERVAL_MILLIS / 1000);
434 }
435
436 public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
437 final HeartBeatTableDesc hbTableDesc,
438 @Nullable final LifecycleHook hook,
439 final int heartBeatIntevalMillis, final int jdbcTimeoutSeconds)
440 throws InterruptedException, SQLException {
441 return getHeartBeatAndSubscribe(dataSource, hbTableDesc, hook, heartBeatIntevalMillis,
442 jdbcTimeoutSeconds, DefaultScheduler.listenableInstance());
443 }
444
445 public static JdbcHeartBeat getHeartBeatAndSubscribe(final DataSource dataSource,
446 final HeartBeatTableDesc hbTableDesc,
447 @Nullable final LifecycleHook hook,
448 final int heartBeatIntevalMillis, final int jdbcTimeoutSeconds,
449 final ListeningScheduledExecutorService scheduler)
450 throws InterruptedException, SQLException {
451 JdbcHeartBeat beat;
452 synchronized (HEARTBEATS) {
453 if (isShuttingdown) {
454 throw new IllegalStateException("Process is shutting down, no heartbeats are accepted for " + dataSource);
455 }
456 beat = HEARTBEATS.get(dataSource);
457 if (beat == null) {
458 beat = new JdbcHeartBeat(dataSource, hbTableDesc, heartBeatIntevalMillis, jdbcTimeoutSeconds, 0.5);
459 beat.registerJmx();
460 beat.addLyfecycleHook(new LifecycleHook() {
461 @Override
462 public void onError(final Error error) {
463
464 }
465
466 @Override
467 public void onClose() {
468 synchronized (HEARTBEATS) {
469 HEARTBEATS.remove(dataSource);
470 }
471 }
472 });
473 final JdbcHeartBeat fbeat = beat;
474 org.spf4j.base.Runtime.queueHookAtBeginning(new Runnable() {
475 @Override
476 public void run() {
477 synchronized (HEARTBEATS) {
478 isShuttingdown = true;
479 }
480 try {
481 fbeat.close();
482 } catch (SQLException | HeartBeatError ex) {
483
484 org.spf4j.base.Runtime.error("WARN: Could not clean heartbeat record,"
485 + " this error can be ignored since it is a best effort attempt, detail:", ex);
486 }
487 }
488 });
489 HEARTBEATS.put(dataSource, beat);
490 }
491 }
492 if (hook != null) {
493 beat.addLyfecycleHook(hook);
494 }
495 beat.scheduleHeartbeat(scheduler);
496 return beat;
497 }
498
499 public static void stopHeartBeats() {
500 synchronized (HEARTBEATS) {
501 Exception e = org.spf4j.base.Closeables.closeAll(HEARTBEATS.values());
502 if (e != null) {
503 throw new RuntimeException(e);
504 }
505 HEARTBEATS.clear();
506 }
507 }
508
509
510 public HeartBeatTableDesc getHbTableDesc() {
511 return hbTableDesc;
512 }
513
514 @Override
515 public String toString() {
516 return "JdbcHeartBeat{" + "jdbc=" + jdbc + ", jdbcTimeoutSeconds=" + jdbcTimeoutSeconds + ", intervalMillis="
517 + intervalMillis + ", hbTableDesc=" + hbTableDesc + ", lastRunNanos=" + lastRunNanos + '}';
518 }
519
520 private class ScheduledHeartBeat implements Runnable {
521
522 @Override
523 public void run() {
524 long lrn = lastRunNanos;
525 long currentTimeNanos = TimeSource.nanoTime();
526
527 long nanosSinceLastBeat = currentTimeNanos - lrn;
528 if (maxMissedNanos < nanosSinceLastBeat) {
529
530 HeartBeatError err = new HeartBeatError("System too busy to provide regular heartbeat, last heartbeat "
531 + nanosSinceLastBeat + " ns ago");
532
533 handleError(err);
534 }
535 if (nanosSinceLastBeat > tryBeatThresholdNanos) {
536 try {
537 beat();
538 } catch (RuntimeException | SQLException ex) {
539 HeartBeatError err = new HeartBeatError("System failed heartbeat", ex);
540 handleError(err);
541 } catch (InterruptedException ex) {
542 Thread.currentThread().interrupt();
543 }
544 }
545 }
546
547 public void handleError(final HeartBeatError err) {
548 for (LifecycleHook hook : lifecycleHooks) {
549 hook.onError(err);
550 }
551 RuntimeException ex = Iterables.forAll(lifecycleHooks, (final LifecycleHook t) -> {
552 try {
553 t.onClose();
554 } catch (SQLException e) {
555 throw new RuntimeException(e);
556 }
557 });
558 if (ex != null) {
559 err.addSuppressed(ex);
560 }
561 lifecycleHooks.clear();
562 throw err;
563 }
564 }
565
566 }