1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.concurrent.jdbc;
33
34 import org.spf4j.concurrent.Semaphore;
35 import com.google.common.annotations.Beta;
36 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
37 import java.sql.Connection;
38 import java.sql.PreparedStatement;
39 import java.sql.ResultSet;
40 import java.sql.SQLException;
41 import java.sql.SQLIntegrityConstraintViolationException;
42 import java.sql.SQLTimeoutException;
43 import java.util.ArrayList;
44 import java.util.List;
45 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentMap;
47 import java.util.concurrent.ExecutionException;
48 import java.util.concurrent.Future;
49 import java.util.concurrent.ThreadLocalRandom;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import javax.annotation.CheckReturnValue;
53 import javax.annotation.Nonnull;
54 import javax.sql.DataSource;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.spf4j.base.HandlerNano;
58 import org.spf4j.base.MutableHolder;
59 import org.spf4j.base.TimeSource;
60 import org.spf4j.concurrent.DefaultExecutor;
61 import org.spf4j.concurrent.LockRuntimeException;
62 import org.spf4j.jdbc.JdbcTemplate;
63 import org.spf4j.jmx.JmxExport;
64 import org.spf4j.jmx.Registry;
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 @SuppressFBWarnings(value = {"NP_LOAD_OF_KNOWN_NULL_VALUE", "SQL_INJECTION_JDBC",
86 "SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "PREDICTABLE_RANDOM"},
87 justification = "Sql injection is not really possible since the parameterized values are"
88 + " validated to be java ids")
89 @Beta
90 public final class JdbcSemaphore implements AutoCloseable, Semaphore {
91
92 private static final int CLEANUP_TIMEOUT_SECONDS =
93 Integer.getInteger("spf4j.jdbc.semaphore.cleanupTimeoutSeconds", 60);
94
95 private static final Logger LOG = LoggerFactory.getLogger(JdbcSemaphore.class);
96
97 private static final ConcurrentMap<String, Object> SYNC_OBJS = new ConcurrentHashMap<>();
98
99 private final JdbcTemplate jdbc;
100
101 private final String permitsSql;
102
103 private final String ownedPermitsSql;
104
105 private final String totalPermitsSql;
106
107 private final String reducePermitsSql;
108
109 private final String increasePermitsSql;
110
111 private final String updatePermitsSql;
112
113 private final String acquireSql;
114
115 private final String acquireByOwnerSql;
116
117 private final String releaseSql;
118
119 private final String releaseByOwnerSql;
120
121 private final String deleteDeadOwnerRecordsSql;
122
123 private final String getDeadOwnerPermitsSql;
124
125 private final String deleteDeadOwerRecordSql;
126
127 private final String insertLockRowSql;
128
129 private final String insertPermitsByOwnerSql;
130
131 private final int jdbcTimeoutSeconds;
132
133 private final String semName;
134
135 private final Object syncObj;
136
137 private final JdbcHeartBeat heartBeat;
138
139 private volatile boolean isHealthy;
140
141 private boolean isClosed;
142
143 private Error heartBeatFailure;
144
145 private final int acquirePollMillis;
146
147 private final JdbcHeartBeat.LifecycleHook failureHook;
148
149 private int ownedReservations;
150
151
152
153
154
155
/**
 * Creates a semaphore using the default table description and non-strict permit validation.
 * Simply forwards to the 4 argument constructor with {@code strict = false}.
 *
 * @param dataSource the data source all semaphore statements are executed against.
 * @param semaphoreName the name identifying the semaphore.
 * @param nrPermits the number of permits used when the semaphore row needs to be created.
 * @throws InterruptedException propagated from the delegated constructor.
 * @throws SQLException propagated from the delegated constructor.
 */
public JdbcSemaphore(final DataSource dataSource, final String semaphoreName, final int nrPermits)
    throws InterruptedException, SQLException {
  this(dataSource, semaphoreName, nrPermits, false);
}
160
161
162
163
164
165
166
167
168
169
/**
 * Creates a semaphore on the default tables ({@code SemaphoreTablesDesc.DEFAULT}).
 * The JDBC timeout is read from the {@code spf4j.jdbc.semaphore.jdbcTimeoutSeconds}
 * system property and defaults to 10 seconds.
 *
 * @param dataSource the data source all semaphore statements are executed against.
 * @param semaphoreName the name identifying the semaphore.
 * @param nrPermits the number of permits used when the semaphore row needs to be created.
 * @param strict when true, an existing semaphore row whose total permits differ from
 * {@code nrPermits} is rejected.
 * @throws InterruptedException propagated from the delegated constructor.
 * @throws SQLException propagated from the delegated constructor.
 */
public JdbcSemaphore(final DataSource dataSource, final String semaphoreName,
    final int nrPermits, final boolean strict)
    throws InterruptedException, SQLException {
  this(dataSource,
      SemaphoreTablesDesc.DEFAULT,
      semaphoreName,
      nrPermits,
      Integer.getInteger("spf4j.jdbc.semaphore.jdbcTimeoutSeconds", 10),
      strict);
}
175
/**
 * Creates a semaphore on the supplied tables. The maximum poll interval used while waiting
 * for permits is read from the {@code spf4j.jdbc.semaphore.defaultMaxPollIntervalMillis}
 * system property and defaults to 1000 milliseconds.
 *
 * @param dataSource the data source all semaphore statements are executed against.
 * @param semTableDesc the description of the semaphore tables and columns.
 * @param semaphoreName the name identifying the semaphore.
 * @param nrPermits the number of permits used when the semaphore row needs to be created.
 * @param jdbcTimeoutSeconds the timeout in seconds applied to the JDBC operations.
 * @param strictReservations when true, an existing semaphore row whose total permits differ
 * from {@code nrPermits} is rejected.
 * @throws InterruptedException propagated from the delegated constructor.
 * @throws SQLException propagated from the delegated constructor.
 */
public JdbcSemaphore(final DataSource dataSource, final SemaphoreTablesDesc semTableDesc,
    final String semaphoreName, final int nrPermits, final int jdbcTimeoutSeconds,
    final boolean strictReservations)
    throws InterruptedException, SQLException {
  this(dataSource,
      semTableDesc,
      semaphoreName,
      nrPermits,
      jdbcTimeoutSeconds,
      strictReservations,
      Integer.getInteger("spf4j.jdbc.semaphore.defaultMaxPollIntervalMillis", 1000));
}
182
183
/**
 * Creates the semaphore: subscribes to the shared heartbeat, builds all SQL statements from
 * the table description, then makes sure the semaphore row and the row of this process
 * (owner) exist in the database.
 *
 * @param dataSource the data source all semaphore statements are executed against.
 * @param semTableDesc the description of the semaphore tables and columns.
 * @param semaphoreName the name identifying the semaphore.
 * @param nrPermits the number of permits used when the semaphore row needs to be created.
 * @param jdbcTimeoutSeconds the timeout in seconds applied to the JDBC operations.
 * @param strictReservations when true, an existing semaphore row whose total permits differ
 * from {@code nrPermits} is rejected.
 * @param acquirePollMillis upper bound (millis) of the randomized wait between acquire attempts.
 * @throws IllegalArgumentException if {@code nrPermits} is negative, or in strict mode when the
 * existing row has a different total.
 * @throws InterruptedException propagated from the database operations.
 * @throws SQLException if the semaphore or owner rows cannot be created/validated.
 */
@SuppressFBWarnings({"CBX_CUSTOM_BUILT_XML", "STT_TOSTRING_STORED_IN_FIELD"})
public JdbcSemaphore(final DataSource dataSource, final SemaphoreTablesDesc semTableDesc,
    final String semaphoreName, final int nrPermits, final int jdbcTimeoutSeconds,
    final boolean strictReservations, final int acquirePollMillis) throws InterruptedException, SQLException {
  if (nrPermits < 0) {
    throw new IllegalArgumentException("Permits must be positive and not " + nrPermits);
  }
  this.acquirePollMillis = acquirePollMillis;
  this.semName = semaphoreName;
  // One monitor per semaphore name, shared by all instances with that name in this JVM.
  this.syncObj = SYNC_OBJS.computeIfAbsent(semaphoreName, (key) -> new Object());
  this.jdbcTimeoutSeconds = jdbcTimeoutSeconds;
  this.jdbc = new JdbcTemplate(dataSource);
  this.isHealthy = true;
  this.ownedReservations = 0;
  this.failureHook = new JdbcHeartBeat.LifecycleHook() {
    @Override
    public void onError(final Error error) {
      // heartBeatFailure is written before the volatile isHealthy so readers of isHealthy see it.
      heartBeatFailure = error;
      isHealthy = false;
    }

    @Override
    public void onClose() {
      close();
    }
  };
  this.heartBeat = JdbcHeartBeat.getHeartBeatAndSubscribe(dataSource,
      semTableDesc.getHeartBeatTableDesc(), failureHook);
  final String semaphoreTableName = semTableDesc.getSemaphoreTableName();
  String availablePermitsColumn = semTableDesc.getAvailablePermitsColumn();
  String lastModifiedByColumn = semTableDesc.getLastModifiedByColumn();
  String lastModifiedAtColumn = semTableDesc.getLastModifiedAtColumn();
  String ownerColumn = semTableDesc.getOwnerColumn();
  String semaphoreNameColumn = semTableDesc.getSemNameColumn();
  String totalPermitsColumn = semTableDesc.getTotalPermitsColumn();
  String ownerPermitsColumn = semTableDesc.getOwnerPermitsColumn();
  String permitsByOwnerTableName = semTableDesc.getPermitsByOwnerTableName();
  HeartBeatTableDesc hbTableDesc = heartBeat.getHbTableDesc();
  String heartBeatTableName = hbTableDesc.getTableName();
  String heartBeatOwnerColumn = hbTableDesc.getOwnerColumn();
  String currentTimeMillisFunc = hbTableDesc.getDbType().getCurrTSSqlFn();

  this.reducePermitsSql = "UPDATE " + semaphoreTableName + " SET "
      + totalPermitsColumn + " = " + totalPermitsColumn + " - ?, "
      + availablePermitsColumn + " = " + availablePermitsColumn + " - ?, "
      + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
      + semaphoreNameColumn + " = ? AND "
      + totalPermitsColumn + " >= ?";

  this.increasePermitsSql = "UPDATE " + semaphoreTableName + " SET "
      + totalPermitsColumn + " = " + totalPermitsColumn + " + ?, "
      + availablePermitsColumn + " = " + availablePermitsColumn + " + ?, "
      + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
      + semaphoreNameColumn + " = ? ";

  this.updatePermitsSql = "UPDATE " + semaphoreTableName + " SET "
      + totalPermitsColumn + " = ?, "
      + availablePermitsColumn + " = " + availablePermitsColumn + " + ? - " + totalPermitsColumn + ','
      + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
      + semaphoreNameColumn + " = ?";

  // Only succeeds (1 row updated) when enough permits are available.
  this.acquireSql = "UPDATE " + semaphoreTableName + " SET "
      + availablePermitsColumn + " = " + availablePermitsColumn + " - ?, "
      + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
      + semaphoreNameColumn + " = ? AND "
      + availablePermitsColumn + " >= ?";
  this.acquireByOwnerSql = "UPDATE " + permitsByOwnerTableName
      + " SET " + ownerPermitsColumn + " = " + ownerPermitsColumn + " + ?, "
      + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
      + ownerColumn + " = ? AND " + semaphoreNameColumn + " = ?";

  // Available permits are capped at the total permits on release.
  this.releaseSql = "UPDATE " + semaphoreTableName + " SET "
      + availablePermitsColumn + " = CASE WHEN "
      + availablePermitsColumn + " + ? > " + totalPermitsColumn
      + " THEN " + totalPermitsColumn + " ELSE " + availablePermitsColumn + " + ? END, "
      + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
      + semaphoreNameColumn + " = ?";

  this.releaseByOwnerSql = "UPDATE " + permitsByOwnerTableName
      + " SET " + ownerPermitsColumn + " = " + ownerPermitsColumn
      + " - ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
      + ownerColumn + " = ? AND " + semaphoreNameColumn + " = ? and " + ownerPermitsColumn + " >= ?";

  this.permitsSql = "SELECT " + availablePermitsColumn + ',' + totalPermitsColumn
      + " FROM " + semaphoreTableName
      + " WHERE " + semaphoreNameColumn + " = ?";

  this.totalPermitsSql = "SELECT " + totalPermitsColumn + " FROM " + semaphoreTableName
      + " WHERE " + semTableDesc.getSemNameColumn() + " = ?";

  this.ownedPermitsSql = "SELECT " + ownerPermitsColumn + " FROM "
      + permitsByOwnerTableName + " WHERE "
      + ownerColumn + " = ? AND " + semaphoreNameColumn + " = ?";

  // Owner rows holding 0 permits whose owner has no heartbeat row.
  this.deleteDeadOwnerRecordsSql = "DELETE FROM " + permitsByOwnerTableName + " RO "
      + "WHERE RO." + semaphoreNameColumn + " = ? AND " + ownerPermitsColumn + " = 0 AND "
      + "NOT EXISTS (select H." + heartBeatOwnerColumn + " from " + heartBeatTableName
      + " H where H." + heartBeatOwnerColumn + " = RO." + ownerColumn + ')';

  // Owner rows still holding permits whose owner has no heartbeat row.
  this.getDeadOwnerPermitsSql = "SELECT " + ownerColumn + ", " + ownerPermitsColumn
      + " FROM " + permitsByOwnerTableName + " RO "
      + "WHERE RO." + semaphoreNameColumn + " = ? AND " + ownerPermitsColumn + " > 0 AND "
      + "NOT EXISTS (select H." + heartBeatOwnerColumn + " from " + heartBeatTableName
      + " H where H." + heartBeatOwnerColumn + " = RO." + ownerColumn
      + ") ORDER BY " + ownerColumn + ',' + ownerPermitsColumn;

  this.deleteDeadOwerRecordSql = "DELETE FROM " + permitsByOwnerTableName + " WHERE "
      + ownerColumn + " = ? AND " + semaphoreNameColumn + " = ? AND "
      + ownerPermitsColumn + " = ?";

  this.insertLockRowSql = "insert into " + semaphoreTableName
      + " (" + semaphoreNameColumn + ',' + availablePermitsColumn + ',' + totalPermitsColumn
      + ',' + lastModifiedByColumn + ',' + lastModifiedAtColumn + ") VALUES (?, ?, ?, ?, "
      + currentTimeMillisFunc + ')';

  this.insertPermitsByOwnerSql = "insert into " + permitsByOwnerTableName
      + " (" + semaphoreNameColumn + ',' + ownerColumn + ',' + ownerPermitsColumn + ','
      + lastModifiedAtColumn + ") VALUES (?, ?, ?, " + currentTimeMillisFunc + ")";

  boolean initialized = false;
  try {
    try {
      createLockRowIfNotPresent(strictReservations, nrPermits);
    } catch (SQLIntegrityConstraintViolationException ex) {
      try {
        // Another party inserted the row concurrently; run again to validate the existing row.
        createLockRowIfNotPresent(strictReservations, nrPermits);
      } catch (SQLException ex1) {
        ex1.addSuppressed(ex);
        throw ex1;
      }
    }
    createOwnerRowIfNotPresent();
    initialized = true;
  } finally {
    if (!initialized) {
      // Construction failed: unsubscribe, otherwise the heartbeat keeps referencing
      // (and later calling back into) an instance nobody can ever use or close.
      this.heartBeat.removeLifecycleHook(this.failureHook);
    }
  }
}
317
318 public void registerJmx() {
319 Registry.export(JdbcSemaphore.class.getName(), semName, this);
320 }
321
322 public void unregisterJmx() {
323 Registry.unregister(JdbcSemaphore.class.getName(), semName);
324 }
325
326 private void validate() {
327 if (!isHealthy) {
328 throw new IllegalStateException("Heartbeats failed! semaphore broken " + this, heartBeatFailure);
329 }
330 }
331
332 private void checkClosed() {
333 if (isClosed) {
334 throw new IllegalStateException("Semaphore " + this + " is closed");
335 }
336 }
337
338 private void createLockRowIfNotPresent(final boolean strictReservations, final int nrPermits)
339 throws SQLException, InterruptedException {
340 jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
341 try (PreparedStatement stmt = conn.prepareStatement(permitsSql)) {
342 stmt.setNString(1, semName);
343 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
344 try (ResultSet rs = stmt.executeQuery()) {
345 if (!rs.next()) {
346 try (PreparedStatement insert = conn.prepareStatement(insertLockRowSql)) {
347 insert.setNString(1, semName);
348 insert.setInt(2, nrPermits);
349 insert.setInt(3, nrPermits);
350 insert.setNString(4, org.spf4j.base.Runtime.PROCESS_ID);
351 insert.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
352 insert.executeUpdate();
353 }
354 } else if (strictReservations) {
355 int existingMaxReservations = rs.getInt(2);
356 if (existingMaxReservations != nrPermits) {
357 throw new IllegalArgumentException("Semaphore " + semName + " max reservations count different "
358 + existingMaxReservations + " != " + nrPermits + " use different semaphore");
359 }
360 if (rs.next()) {
361 throw new IllegalStateException("Cannot have mutiple semaphores with the same name " + semName);
362 }
363 } else if (rs.next()) {
364 throw new IllegalStateException("Cannot have mutiple semaphores with the same name " + semName);
365 }
366 }
367 }
368 return null;
369 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
370 }
371
372 private void createOwnerRowIfNotPresent()
373 throws SQLException, InterruptedException {
374 try {
375 jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
376
377 try (PreparedStatement insert = conn.prepareStatement(insertPermitsByOwnerSql)) {
378 insert.setNString(1, this.semName);
379 insert.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
380 insert.setInt(3, 0);
381 insert.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
382 insert.executeUpdate();
383 }
384 return null;
385 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
386 } catch (SQLIntegrityConstraintViolationException ex) {
387 LOG.debug("Semaphore record for current process already there", ex);
388 }
389 }
390
391 @SuppressFBWarnings("UW_UNCOND_WAIT")
392 @CheckReturnValue
393 @Override
394 public boolean tryAcquire(final int nrPermits, final long deadlineNanos)
395 throws InterruptedException {
396 if (nrPermits < 1) {
397 throw new IllegalArgumentException("You should try to acquire something! not " + nrPermits);
398 }
399 synchronized (syncObj) {
400 boolean acquired = false;
401 final MutableHolder<Boolean> beat = MutableHolder.of(Boolean.FALSE);
402 do {
403 checkClosed();
404 validate();
405 try {
406 acquired = jdbc.transactOnConnection(new HandlerNano<Connection, Boolean, SQLException>() {
407 @Override
408 public Boolean handle(final Connection conn, final long deadlineNanos) throws SQLException {
409 try (PreparedStatement stmt = conn.prepareStatement(acquireSql)) {
410 stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
411 jdbcTimeoutSeconds));
412 stmt.setInt(1, nrPermits);
413 stmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
414 stmt.setNString(3, semName);
415 stmt.setInt(4, nrPermits);
416 int rowsUpdated = stmt.executeUpdate();
417 Boolean acquired;
418 if (rowsUpdated == 1) {
419 try (PreparedStatement ostmt = conn.prepareStatement(acquireByOwnerSql)) {
420 ostmt.setInt(1, nrPermits);
421 ostmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
422 ostmt.setNString(3, semName);
423 ostmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
424 jdbcTimeoutSeconds));
425 int nrUpdated = ostmt.executeUpdate();
426 if (nrUpdated != 1) {
427 throw new IllegalStateException("Updated " + nrUpdated + " is incorrect for " + ostmt);
428 }
429 }
430 acquired = Boolean.TRUE;
431 } else {
432 if (rowsUpdated > 1) {
433 throw new IllegalStateException("Too many rows updated! when trying to acquire " + nrPermits);
434 }
435 acquired = Boolean.FALSE;
436 }
437 long currNanoTime = TimeSource.nanoTime();
438 if (deadlineNanos - currNanoTime > heartBeat.getBeatDurationNanos()) {
439
440 beat.setValue(heartBeat.tryBeat(conn, currNanoTime, deadlineNanos));
441 }
442 return acquired;
443 }
444 }
445 }, deadlineNanos);
446 } catch (SQLTimeoutException ex) {
447 return false;
448 } catch (SQLException ex) {
449 throw new LockRuntimeException(ex);
450 }
451 if (beat.getValue()) {
452 heartBeat.updateLastRunNanos(TimeSource.nanoTime());
453 }
454 if (!acquired) {
455 long secondsLeft = JdbcTemplate.getTimeoutToDeadlineSecondsNoEx(deadlineNanos);
456 if (secondsLeft < 0) {
457 return false;
458 }
459 if (secondsLeft < CLEANUP_TIMEOUT_SECONDS) {
460 Future<Integer> fut = DefaultExecutor.INSTANCE.submit(
461 () -> removeDeadHeartBeatAndNotOwnerRows(CLEANUP_TIMEOUT_SECONDS));
462 try {
463 fut.get(secondsLeft, TimeUnit.SECONDS);
464 } catch (TimeoutException ex) {
465
466 break;
467 } catch (ExecutionException ex) {
468 throw new LockRuntimeException(ex);
469 }
470 } else {
471 try {
472 removeDeadHeartBeatAndNotOwnerRows(secondsLeft);
473 } catch (SQLTimeoutException ex) {
474 return false;
475 } catch (SQLException ex) {
476 throw new LockRuntimeException(ex);
477 }
478 }
479 try {
480 if (releaseDeadOwnerPermits(nrPermits) <= 0) {
481 long wtimeMilis = Math.min(TimeUnit.NANOSECONDS.toMillis(deadlineNanos - TimeSource.nanoTime()),
482 ThreadLocalRandom.current().nextLong(acquirePollMillis));
483 if (wtimeMilis > 0) {
484 syncObj.wait(wtimeMilis);
485 } else {
486 break;
487 }
488 }
489 } catch (SQLException ex) {
490 throw new LockRuntimeException(ex);
491 }
492
493 }
494 } while (!acquired && deadlineNanos > TimeSource.nanoTime());
495 if (acquired) {
496 ownedReservations += nrPermits;
497 }
498 return acquired;
499 }
500 }
501
502
503 @Override
504 @SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_NO_CHECKED")
505 public void release(final int nrReservations) {
506 synchronized (syncObj) {
507 try {
508 checkClosed();
509 jdbc.transactOnConnectionNonInterrupt(new HandlerNano<Connection, Void, SQLException>() {
510 @Override
511 public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
512 releaseReservations(conn, deadlineNanos, nrReservations);
513 try (PreparedStatement ostmt = conn.prepareStatement(releaseByOwnerSql)) {
514 ostmt.setInt(1, nrReservations);
515 ostmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
516 ostmt.setNString(3, semName);
517 ostmt.setInt(4, nrReservations);
518 ostmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
519 jdbcTimeoutSeconds));
520 int nrUpdated = ostmt.executeUpdate();
521 if (nrUpdated != 1) {
522 throw new IllegalStateException("Trying to release more than you own! " + ostmt);
523 }
524 }
525 return null;
526 }
527 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
528 } catch (SQLException ex) {
529 throw new LockRuntimeException(ex);
530 }
531 ownedReservations -= nrReservations;
532 if (ownedReservations < 0) {
533 throw new IllegalStateException("Should not be trying to release more than you acquired!" + nrReservations);
534 }
535 syncObj.notifyAll();
536 }
537 }
538
539 public void releaseAll() {
540 synchronized (syncObj) {
541 checkClosed();
542 release(ownedReservations);
543 }
544 }
545
546 private void releaseReservations(final Connection conn, final long deadlineNanos, final int nrReservations)
547 throws SQLException {
548 try (PreparedStatement stmt = conn.prepareStatement(releaseSql)) {
549 stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
550 jdbcTimeoutSeconds));
551 stmt.setInt(1, nrReservations);
552 stmt.setInt(2, nrReservations);
553 stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
554 stmt.setNString(4, semName);
555 stmt.executeUpdate();
556 }
557 }
558
559 @JmxExport(description = "Get the available semaphore permits")
560 public int availablePermits() throws SQLException, InterruptedException {
561 return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
562 try (PreparedStatement stmt = conn.prepareStatement(permitsSql)) {
563 stmt.setNString(1, semName);
564 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
565 try (ResultSet rs = stmt.executeQuery()) {
566 if (!rs.next()) {
567 throw new IllegalStateException();
568 } else {
569 int result = rs.getInt(1);
570 if (rs.next()) {
571 throw new IllegalStateException();
572 }
573 return result;
574 }
575 }
576 }
577 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
578 }
579
580 @JmxExport(description = "get the number of permits owned by this process")
581 public int permitsOwned() throws SQLException, InterruptedException {
582 return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
583 try (PreparedStatement stmt = conn.prepareStatement(ownedPermitsSql)) {
584 stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
585 stmt.setNString(2, semName);
586 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
587 try (ResultSet rs = stmt.executeQuery()) {
588 if (!rs.next()) {
589 throw new IllegalStateException();
590 } else {
591 int result = rs.getInt(1);
592 if (rs.next()) {
593 throw new IllegalStateException();
594 }
595 return result;
596 }
597 }
598 }
599 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
600 }
601
602 @JmxExport(description = "Get the total permits this semaphore can hand out")
603 public int totalPermits() throws SQLException, InterruptedException {
604 return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
605 try (PreparedStatement stmt = conn.prepareStatement(totalPermitsSql)) {
606 stmt.setNString(1, semName);
607 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
608 try (ResultSet rs = stmt.executeQuery()) {
609 if (!rs.next()) {
610 throw new IllegalStateException();
611 } else {
612 return rs.getInt(1);
613 }
614 }
615 }
616 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
617 }
618
619 @JmxExport(description = "get a list of all dead owners which hold permits")
620 @Nonnull
621 public List<OwnerPermits> getDeadOwnerPermits(final int wishPermits) throws SQLException, InterruptedException {
622 return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
623 return getDeadOwnerPermits(conn, deadlineNanos, wishPermits);
624 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
625 }
626
627 List<OwnerPermits> getDeadOwnerPermits(final Connection conn, final long deadlineNanos, final int wishPermits)
628 throws SQLException {
629 List<OwnerPermits> result = new ArrayList<>();
630 try (PreparedStatement stmt = conn.prepareStatement(getDeadOwnerPermitsSql)) {
631 stmt.setNString(1, semName);
632 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
633 try (ResultSet rs = stmt.executeQuery()) {
634 int nrPermits = 0;
635 while (rs.next()) {
636 OwnerPermits ownerPermit = new OwnerPermits(rs.getNString(1), rs.getInt(2));
637 result.add(ownerPermit);
638 nrPermits += ownerPermit.getNrPermits();
639 if (nrPermits >= wishPermits) {
640 break;
641 }
642 }
643 }
644 }
645 return result;
646 }
647
648
649
650
651
652
653
654
655
656 @JmxExport(description = "release dead owner permits")
657 @CheckReturnValue
658 public int releaseDeadOwnerPermits(@JmxExport(value = "wishPermits",
659 description = "how many we whish to release") final int wishPermits)
660 throws InterruptedException, SQLException {
661 return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
662 List<OwnerPermits> deadOwnerPermits = getDeadOwnerPermits(conn, deadlineNanos, wishPermits);
663 int released = 0;
664 for (OwnerPermits permit : deadOwnerPermits) {
665 try (PreparedStatement stmt = conn.prepareStatement(deleteDeadOwerRecordSql)) {
666 String owner = permit.getOwner();
667 stmt.setNString(1, owner);
668 stmt.setNString(2, semName);
669 int nrPermits = permit.getNrPermits();
670 stmt.setInt(3, nrPermits);
671 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
672 if (stmt.executeUpdate() == 1) {
673 released += nrPermits;
674 releaseReservations(conn, deadlineNanos, nrPermits);
675 LOG.warn("Released {} reservations from dead owner {}", nrPermits, owner);
676 }
677 }
678 }
679 return released;
680 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
681 }
682
683 @JmxExport(description = "Change the total available permits to the provided number")
684 public void updatePermits(final int nrPermits) throws SQLException, InterruptedException {
685 if (nrPermits < 0) {
686 throw new IllegalArgumentException("Permits must be positive and not " + nrPermits);
687 }
688 jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
689 @Override
690 public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
691 try (PreparedStatement stmt = conn.prepareStatement(updatePermitsSql)) {
692 stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
693 jdbcTimeoutSeconds));
694 stmt.setInt(1, nrPermits);
695 stmt.setInt(2, nrPermits);
696 stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
697 stmt.setNString(4, semName);
698 int rowsUpdated = stmt.executeUpdate();
699 if (rowsUpdated != 1) {
700 throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
701 }
702 }
703 return null;
704 }
705 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
706 }
707
708 @JmxExport(description = "Reduce the total available permits by the provided number")
709 public void reducePermits(final int nrPermits) throws SQLException, InterruptedException {
710 jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
711 @Override
712 public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
713 try (PreparedStatement stmt = conn.prepareStatement(reducePermitsSql)) {
714 stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
715 jdbcTimeoutSeconds));
716 stmt.setInt(1, nrPermits);
717 stmt.setInt(2, nrPermits);
718 stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
719 stmt.setNString(4, semName);
720 stmt.setInt(5, nrPermits);
721 int rowsUpdated = stmt.executeUpdate();
722 if (rowsUpdated != 1) {
723 throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
724 }
725 }
726 return null;
727 }
728 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
729 }
730
731 @JmxExport(description = "Increase the total available permits by the provided number")
732 public void increasePermits(final int nrPermits) throws SQLException, InterruptedException {
733 jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
734 @Override
735 public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
736 try (PreparedStatement stmt = conn.prepareStatement(increasePermitsSql)) {
737 stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
738 jdbcTimeoutSeconds));
739 stmt.setInt(1, nrPermits);
740 stmt.setInt(2, nrPermits);
741 stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
742 stmt.setNString(4, semName);
743 int rowsUpdated = stmt.executeUpdate();
744 if (rowsUpdated != 1) {
745 throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
746 }
747 }
748 return null;
749 }
750 }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
751 }
752
753 public int removeDeadHeartBeatAndNotOwnerRows(final long timeoutSeconds) throws SQLException, InterruptedException {
754 return jdbc.transactOnConnection(new HandlerNano<Connection, Integer, SQLException>() {
755 @Override
756 public Integer handle(final Connection conn, final long deadlineNanos) throws SQLException {
757 return removeDeadHeartBeatAndNotOwnerRows(conn, deadlineNanos);
758 }
759 }, timeoutSeconds, TimeUnit.SECONDS);
760 }
761
762 private int removeDeadHeartBeatAndNotOwnerRows(final Connection conn, final long deadlineNanos) throws SQLException {
763 int removedDeadHeartBeatRows = this.heartBeat.removeDeadHeartBeatRows(conn, deadlineNanos);
764 if (removedDeadHeartBeatRows > 0) {
765 return removeDeadNotOwnedRowsOnly(conn, deadlineNanos);
766 } else {
767 return 0;
768 }
769 }
770
771 private int removeDeadNotOwnedRowsOnly(final Connection conn, final long deadlineNanos) throws SQLException {
772 try (PreparedStatement stmt = conn.prepareStatement(deleteDeadOwnerRecordsSql)) {
773 stmt.setNString(1, semName);
774 stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
775 return stmt.executeUpdate();
776 }
777 }
778
779 @Override
780 public String toString() {
781 return "JdbcSemaphore{" + "jdbc=" + jdbc
782 + ", jdbcTimeoutSeconds=" + jdbcTimeoutSeconds + ", semName=" + semName + '}';
783 }
784
785 @Override
786 public void close() {
787 synchronized (syncObj) {
788 if (!isClosed) {
789 releaseAll();
790 unregisterJmx();
791 this.heartBeat.removeLifecycleHook(failureHook);
792 isClosed = true;
793 }
794 }
795 }
796
797 @Override
798 protected void finalize() throws Throwable {
799 try (AutoCloseable c = this) {
800 super.finalize();
801 }
802 }
803
804 @JmxExport
805 public int getJdbcTimeoutSeconds() {
806 return jdbcTimeoutSeconds;
807 }
808
809 @JmxExport
810 public boolean isIsHealthy() {
811 return isHealthy;
812 }
813
814
815 }