View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.concurrent.jdbc;
33  
34  import org.spf4j.concurrent.Semaphore;
35  import com.google.common.annotations.Beta;
36  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
37  import java.sql.Connection;
38  import java.sql.PreparedStatement;
39  import java.sql.ResultSet;
40  import java.sql.SQLException;
41  import java.sql.SQLIntegrityConstraintViolationException;
42  import java.sql.SQLTimeoutException;
43  import java.util.ArrayList;
44  import java.util.List;
45  import java.util.concurrent.ConcurrentHashMap;
46  import java.util.concurrent.ConcurrentMap;
47  import java.util.concurrent.ExecutionException;
48  import java.util.concurrent.Future;
49  import java.util.concurrent.ThreadLocalRandom;
50  import java.util.concurrent.TimeUnit;
51  import java.util.concurrent.TimeoutException;
52  import javax.annotation.CheckReturnValue;
53  import javax.annotation.Nonnull;
54  import javax.sql.DataSource;
55  import org.slf4j.Logger;
56  import org.slf4j.LoggerFactory;
57  import org.spf4j.base.HandlerNano;
58  import org.spf4j.base.MutableHolder;
59  import org.spf4j.base.TimeSource;
60  import org.spf4j.concurrent.DefaultExecutor;
61  import org.spf4j.concurrent.LockRuntimeException;
62  import org.spf4j.jdbc.JdbcTemplate;
63  import org.spf4j.jmx.JmxExport;
64  import org.spf4j.jmx.Registry;
65  
66  /**
67   * A jdbc table based distributed semaphore implementation.
68   * Similar to a semaphore implemented with zookeeper, we rely on
69   * heartbeats to detect dead members. If you have a zookeeper instance accessible you should probably use a semaphore
70   * implemented with it... If you are already connecting to a database, this should be a reliable and low overhead
71   * implementation. Using a crappy database will give you crappy results.
72   *
73   * There are 3 tables involved:
74   *
75   * SEMAPHORES - keep track of available and total permits by semaphore.
76   * PERMITS_BY_OWNER - keeps track of all permits by
77   * owner.
78   * HEARTBEATS - keeps heartbeats by owner to detect dead owners.
79   *
80   * All table names and columns are customizable to adapt this implementation to different naming conventions.
81   *
82   *
83   * @author zoly
84   */
85  @SuppressFBWarnings(value = {"NP_LOAD_OF_KNOWN_NULL_VALUE", "SQL_INJECTION_JDBC",
86    "SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "PREDICTABLE_RANDOM"},
87          justification = "Sql injection is not really possible since the parameterized values are"
88                  + "  validated to be java ids")
89  @Beta
90  public final class JdbcSemaphore implements AutoCloseable, Semaphore {
91  
  /** Cleanup timeout in seconds, read from a system property (60 when the property is not set). */
  private static final int CLEANUP_TIMEOUT_SECONDS =
          Integer.getInteger("spf4j.jdbc.semaphore.cleanupTimeoutSeconds", 60);

  private static final Logger LOG = LoggerFactory.getLogger(JdbcSemaphore.class);

  /** Monitor objects keyed by semaphore name; instances with the same name share one monitor. */
  private static final ConcurrentMap<String, Object> SYNC_OBJS = new ConcurrentHashMap<>();

  private final JdbcTemplate jdbc;

  // The SQL statements below are assembled once in the constructor from the table description.

  /** Selects available and total permits for a semaphore name. */
  private final String permitsSql;

  /** Selects the permits held by one owner for a semaphore name. */
  private final String ownedPermitsSql;

  /** Selects the total permits for a semaphore name. */
  private final String totalPermitsSql;

  /** Decrements total and available permits, guarded by total &gt;= ?. */
  private final String reducePermitsSql;

  /** Increments total and available permits. */
  private final String increasePermitsSql;

  /** Sets the total permits to a new value and shifts the available permits by the difference. */
  private final String updatePermitsSql;

  /** Decrements available permits, guarded by available &gt;= ?. */
  private final String acquireSql;

  /** Increments the permits recorded for an owner. */
  private final String acquireByOwnerSql;

  /** Increments available permits, capped at the total permits. */
  private final String releaseSql;

  /** Decrements the permits recorded for an owner, guarded by owner permits &gt;= ?. */
  private final String releaseByOwnerSql;

  /** Deletes owner rows with zero permits that have no heartbeat row. */
  private final String deleteDeadOwnerRecordsSql;

  /** Selects owner rows with permits &gt; 0 that have no heartbeat row. */
  private final String getDeadOwnerPermitsSql;

  /** Deletes one owner row matching owner, semaphore name and permit count. */
  private final String deleteDeadOwerRecordSql;

  /** Inserts the row for a semaphore into the semaphores table. */
  private final String insertLockRowSql;

  /** Inserts the row for an owner into the permits by owner table. */
  private final String insertPermitsByOwnerSql;

  private final int jdbcTimeoutSeconds;

  private final String semName;

  /** Monitor taken from SYNC_OBJS for this semaphore name. */
  private final Object syncObj;

  private final JdbcHeartBeat heartBeat;

  /** Set to false by the heartbeat failure hook. */
  private volatile boolean isHealthy;

  private boolean isClosed;

  /** The error reported by the heartbeat failure hook, if any. */
  private Error heartBeatFailure;

  /** Exclusive upper bound (millis) of the random wait between acquire attempts. */
  private final int acquirePollMillis;

  private final JdbcHeartBeat.LifecycleHook failureHook;

  /** Permits acquired and not yet released through this instance. */
  private int ownedReservations;
150 
  /**
   * Create a non strict JDBC semaphore (delegates with strict = false).
   *
   * @param dataSource  the jdbc data source with the Semaphores table. Please be sensible, no "test on borrow" pools.
   * @param semaphoreName  the semaphore name.
   * @param nrPermits  the number of initial permits, if semaphore already exists the existing nr of permits is kept.
   */
  public JdbcSemaphore(final DataSource dataSource, final String semaphoreName, final int nrPermits)
          throws InterruptedException, SQLException {
    this(dataSource, semaphoreName, nrPermits, false);
  }
160 
  /**
   * create a JDBC Semaphore. create one instance / process.
   * Uses the default table description; the jdbc timeout is read from the
   * "spf4j.jdbc.semaphore.jdbcTimeoutSeconds" system property (10 seconds when not set).
   *
   * @param dataSource  the data source to use for sync.
   * @param semaphoreName  the semaphore name.
   * @param nrPermits  number of initial permits.
   * @param strict  if true, if semaphore already exists and the total permits is different than param nrPermits an
   * IllegalArgumentException will be thrown.
   */
  public JdbcSemaphore(final DataSource dataSource, final String semaphoreName,
          final int nrPermits, final boolean strict) throws InterruptedException, SQLException {
    this(dataSource, SemaphoreTablesDesc.DEFAULT, semaphoreName, nrPermits,
            Integer.getInteger("spf4j.jdbc.semaphore.jdbcTimeoutSeconds", 10), strict);
  }
175 
  /**
   * Create a JDBC semaphore with a custom table description and jdbc timeout.
   * The poll interval bound is read from the "spf4j.jdbc.semaphore.defaultMaxPollIntervalMillis"
   * system property (1000 when not set).
   *
   * @param dataSource  the data source to use for sync.
   * @param semTableDesc  the semaphore tables description.
   * @param semaphoreName  the semaphore name.
   * @param nrPermits  number of initial permits.
   * @param jdbcTimeoutSeconds  the timeout in seconds applied to the jdbc operations.
   * @param strictReservations  passed through as the strict flag of the delegate constructor.
   */
  public JdbcSemaphore(final DataSource dataSource, final SemaphoreTablesDesc semTableDesc,
          final String semaphoreName, final int nrPermits, final int jdbcTimeoutSeconds,
          final boolean strictReservations) throws InterruptedException, SQLException {
    this(dataSource, semTableDesc, semaphoreName, nrPermits, jdbcTimeoutSeconds, strictReservations,
            Integer.getInteger("spf4j.jdbc.semaphore.defaultMaxPollIntervalMillis", 1000));
  }
182 
183 
184   @SuppressFBWarnings({"CBX_CUSTOM_BUILT_XML", "STT_TOSTRING_STORED_IN_FIELD"}) // no sql builder (yet)
185   public JdbcSemaphore(final DataSource dataSource, final SemaphoreTablesDesc semTableDesc,
186           final String semaphoreName, final int nrPermits, final int jdbcTimeoutSeconds,
187           final boolean strictReservations, final int acquirePollMillis) throws InterruptedException, SQLException {
188     if (nrPermits < 0) {
189       throw new IllegalArgumentException("Permits must be positive and not " + nrPermits);
190     }
191     this.acquirePollMillis = acquirePollMillis;
192     this.semName = semaphoreName;
193     this.syncObj = SYNC_OBJS.computeIfAbsent(semaphoreName, (key) -> new Object());
194     this.jdbcTimeoutSeconds = jdbcTimeoutSeconds;
195     this.jdbc = new JdbcTemplate(dataSource);
196     this.isHealthy = true;
197     this.ownedReservations = 0;
198     this.failureHook = new JdbcHeartBeat.LifecycleHook() {
199       @Override
200       public void onError(final Error error) {
201         heartBeatFailure = error;
202         isHealthy = false;
203       }
204 
205       @Override
206       public void onClose() {
207           close();
208       }
209     };
210     this.heartBeat = JdbcHeartBeat.getHeartBeatAndSubscribe(dataSource,
211             semTableDesc.getHeartBeatTableDesc(), failureHook);
212     final String semaphoreTableName = semTableDesc.getSemaphoreTableName();
213     String availablePermitsColumn = semTableDesc.getAvailablePermitsColumn();
214     String lastModifiedByColumn = semTableDesc.getLastModifiedByColumn();
215     String lastModifiedAtColumn = semTableDesc.getLastModifiedAtColumn();
216     String ownerColumn = semTableDesc.getOwnerColumn();
217     String semaphoreNameColumn = semTableDesc.getSemNameColumn();
218     String totalPermitsColumn = semTableDesc.getTotalPermitsColumn();
219     String ownerPermitsColumn = semTableDesc.getOwnerPermitsColumn();
220     String permitsByOwnerTableName = semTableDesc.getPermitsByOwnerTableName();
221     HeartBeatTableDesc hbTableDesc = heartBeat.getHbTableDesc();
222     String heartBeatTableName = hbTableDesc.getTableName();
223     String heartBeatOwnerColumn = hbTableDesc.getOwnerColumn();
224     String currentTimeMillisFunc = hbTableDesc.getDbType().getCurrTSSqlFn();
225 
226     this.reducePermitsSql = "UPDATE " + semaphoreTableName + " SET "
227             + totalPermitsColumn + " = " + totalPermitsColumn + " - ?, "
228             + availablePermitsColumn + " = " + availablePermitsColumn + " - ?, "
229             + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
230             + semaphoreNameColumn + " = ? AND "
231             + totalPermitsColumn + " >= ?";
232 
233     this.increasePermitsSql = "UPDATE " + semaphoreTableName + " SET "
234             + totalPermitsColumn + " = " + totalPermitsColumn + " + ?, "
235             + availablePermitsColumn + " = " + availablePermitsColumn + " + ?, "
236             + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
237             + semaphoreNameColumn + " = ? ";
238 
239     this.updatePermitsSql = "UPDATE " + semaphoreTableName + " SET "
240             + totalPermitsColumn + " =  ?, "
241             + availablePermitsColumn + " =  " + availablePermitsColumn + " + ? - " + totalPermitsColumn + ','
242             + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
243             + semaphoreNameColumn + " = ?";
244 
245     this.acquireSql = "UPDATE " + semaphoreTableName + " SET "
246             + availablePermitsColumn + " = " + availablePermitsColumn + " - ?, "
247             + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
248             + semaphoreNameColumn + " = ? AND "
249             + availablePermitsColumn + " >= ?";
250     this.acquireByOwnerSql = "UPDATE " + permitsByOwnerTableName
251             + " SET " + ownerPermitsColumn + " = " + ownerPermitsColumn + " + ?, "
252             + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
253             + ownerColumn + " = ? AND " + semaphoreNameColumn + " = ?";
254 
255     this.releaseSql = "UPDATE " + semaphoreTableName + " SET "
256             + availablePermitsColumn + " = CASE WHEN "
257             + availablePermitsColumn + " + ? > " + totalPermitsColumn
258             + " THEN " + totalPermitsColumn + " ELSE " + availablePermitsColumn + " + ? END, "
259             + lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
260             + semaphoreNameColumn + " = ?";
261 
262     this.releaseByOwnerSql = "UPDATE " + permitsByOwnerTableName
263             + " SET " + ownerPermitsColumn + " = " + ownerPermitsColumn
264             + " - ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
265             + ownerColumn + " = ? AND " + semaphoreNameColumn + " = ? and " + ownerPermitsColumn + " >= ?";
266 
267     this.permitsSql = "SELECT " + availablePermitsColumn + ',' + totalPermitsColumn
268             + " FROM " + semaphoreTableName
269             + " WHERE " + semaphoreNameColumn + " = ?";
270 
271     this.totalPermitsSql = "SELECT " + totalPermitsColumn + " FROM " + semaphoreTableName
272             + " WHERE " + semTableDesc.getSemNameColumn() + " = ?";
273 
274     this.ownedPermitsSql = "SELECT " + ownerPermitsColumn + " FROM "
275             + permitsByOwnerTableName + " WHERE "
276             + ownerColumn + " = ? AND " + semaphoreNameColumn + " = ?";
277 
278     this.deleteDeadOwnerRecordsSql = "DELETE FROM " + permitsByOwnerTableName + " RO "
279             + "WHERE RO." + semaphoreNameColumn + " = ? AND " + ownerPermitsColumn + " = 0 AND "
280             + "NOT EXISTS (select H." + heartBeatOwnerColumn + " from " + heartBeatTableName
281             + " H where H." + heartBeatOwnerColumn + " = RO." + ownerColumn + ')';
282 
283     this.getDeadOwnerPermitsSql = "SELECT " + ownerColumn + ", " + ownerPermitsColumn
284             + " FROM " + permitsByOwnerTableName + " RO "
285             + "WHERE RO." + semaphoreNameColumn + " = ? AND  " + ownerPermitsColumn + " > 0 AND "
286             + "NOT EXISTS (select H." + heartBeatOwnerColumn + " from " + heartBeatTableName
287             + " H where H." + heartBeatOwnerColumn + " = RO." + ownerColumn
288             + ") ORDER BY " + ownerColumn + ',' + ownerPermitsColumn;
289 
290     this.deleteDeadOwerRecordSql = "DELETE FROM " + permitsByOwnerTableName + " WHERE "
291             + ownerColumn + " = ? AND " + semaphoreNameColumn + " = ? AND "
292             + ownerPermitsColumn + " = ?";
293 
294     this.insertLockRowSql = "insert into " + semaphoreTableName
295                     + " (" + semaphoreNameColumn + ',' + availablePermitsColumn + ',' + totalPermitsColumn
296                     + ',' + lastModifiedByColumn + ',' + lastModifiedAtColumn + ") VALUES (?, ?, ?, ?, "
297                     + currentTimeMillisFunc + ')';
298 
299     this.insertPermitsByOwnerSql = "insert into " + permitsByOwnerTableName
300               + " (" + semaphoreNameColumn + ',' + ownerColumn + ',' + ownerPermitsColumn + ','
301               + lastModifiedAtColumn + ") VALUES (?, ?, ?, " + currentTimeMillisFunc + ")";
302 
303 
304     try {
305       createLockRowIfNotPresent(strictReservations, nrPermits);
306     } catch (SQLIntegrityConstraintViolationException ex) {
307       try {
308         // RACE condition while creating the row, will retry to validate if everything is OK.
309         createLockRowIfNotPresent(strictReservations, nrPermits);
310       } catch (SQLException ex1) {
311         ex1.addSuppressed(ex);
312         throw ex1;
313       }
314     }
315     createOwnerRowIfNotPresent();
316   }
317 
318   public void registerJmx() {
319     Registry.export(JdbcSemaphore.class.getName(), semName, this);
320   }
321 
322   public void unregisterJmx() {
323     Registry.unregister(JdbcSemaphore.class.getName(), semName);
324   }
325 
326   private void validate() {
327     if (!isHealthy) {
328       throw new IllegalStateException("Heartbeats failed! semaphore broken " + this, heartBeatFailure);
329     }
330   }
331 
332   private void checkClosed() {
333     if (isClosed) {
334       throw new IllegalStateException("Semaphore " + this + " is closed");
335     }
336   }
337 
338   private void createLockRowIfNotPresent(final boolean strictReservations, final int nrPermits)
339           throws SQLException, InterruptedException {
340     jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
341       try (PreparedStatement stmt = conn.prepareStatement(permitsSql)) {
342         stmt.setNString(1, semName);
343         stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
344         try (ResultSet rs = stmt.executeQuery()) {
345           if (!rs.next()) {
346             try (PreparedStatement insert = conn.prepareStatement(insertLockRowSql)) {
347               insert.setNString(1, semName);
348               insert.setInt(2, nrPermits);
349               insert.setInt(3, nrPermits);
350               insert.setNString(4, org.spf4j.base.Runtime.PROCESS_ID);
351               insert.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
352               insert.executeUpdate();
353             }
354           } else if (strictReservations) { // there is a record already. for now blow up if different nr reservations.
355             int existingMaxReservations = rs.getInt(2);
356             if (existingMaxReservations != nrPermits) {
357               throw new IllegalArgumentException("Semaphore " + semName + " max reservations count different "
358                       + existingMaxReservations + " != " + nrPermits + " use different semaphore");
359             }
360             if (rs.next()) {
361               throw new IllegalStateException("Cannot have mutiple semaphores with the same name " + semName);
362             }
363           } else if (rs.next()) {
364             throw new IllegalStateException("Cannot have mutiple semaphores with the same name " + semName);
365           }
366         }
367       }
368       return null;
369     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
370   }
371 
372   private void createOwnerRowIfNotPresent()
373           throws SQLException, InterruptedException {
374     try {
375       jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
376 
377         try (PreparedStatement insert = conn.prepareStatement(insertPermitsByOwnerSql)) {
378           insert.setNString(1, this.semName);
379           insert.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
380           insert.setInt(3, 0);
381           insert.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
382           insert.executeUpdate();
383         }
384         return null;
385       }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
386     } catch (SQLIntegrityConstraintViolationException ex) {
387       LOG.debug("Semaphore record for current process already there", ex);
388     }
389   }
390 
391   @SuppressFBWarnings("UW_UNCOND_WAIT")
392   @CheckReturnValue
393   @Override
394   public boolean tryAcquire(final int nrPermits, final long deadlineNanos)
395           throws InterruptedException {
396     if (nrPermits < 1) {
397       throw new IllegalArgumentException("You should try to acquire something! not " + nrPermits);
398     }
399     synchronized (syncObj) {
400       boolean acquired = false;
401       final MutableHolder<Boolean> beat = MutableHolder.of(Boolean.FALSE);
402       do {
403         checkClosed();
404         validate();
405         try {
406           acquired = jdbc.transactOnConnection(new HandlerNano<Connection, Boolean, SQLException>() {
407             @Override
408             public Boolean handle(final Connection conn, final long deadlineNanos) throws SQLException {
409               try (PreparedStatement stmt = conn.prepareStatement(acquireSql)) {
410                 stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
411                         jdbcTimeoutSeconds));
412                 stmt.setInt(1, nrPermits);
413                 stmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
414                 stmt.setNString(3, semName);
415                 stmt.setInt(4, nrPermits);
416                 int rowsUpdated = stmt.executeUpdate();
417                 Boolean acquired;
418                 if (rowsUpdated == 1) {
419                   try (PreparedStatement ostmt = conn.prepareStatement(acquireByOwnerSql)) {
420                     ostmt.setInt(1, nrPermits);
421                     ostmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
422                     ostmt.setNString(3, semName);
423                     ostmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
424                             jdbcTimeoutSeconds));
425                     int nrUpdated = ostmt.executeUpdate();
426                     if (nrUpdated != 1) {
427                       throw new IllegalStateException("Updated " + nrUpdated + " is incorrect for " + ostmt);
428                     }
429                   }
430                   acquired = Boolean.TRUE;
431                 } else {
432                   if (rowsUpdated > 1) {
433                     throw new IllegalStateException("Too many rows updated! when trying to acquire " + nrPermits);
434                   }
435                   acquired = Boolean.FALSE;
436                 }
437                 long currNanoTime = TimeSource.nanoTime();
438                 if (deadlineNanos - currNanoTime > heartBeat.getBeatDurationNanos()) {
439                   // do a heartbeat if have time, and if it makes sense.
440                   beat.setValue(heartBeat.tryBeat(conn, currNanoTime, deadlineNanos));
441                 }
442                 return acquired;
443               }
444             }
445           }, deadlineNanos);
446         } catch (SQLTimeoutException ex) {
447           return false;
448         } catch (SQLException ex) {
449           throw new LockRuntimeException(ex);
450         }
451         if (beat.getValue()) { // we did a heartbeat as part of the acquisition.
452           heartBeat.updateLastRunNanos(TimeSource.nanoTime());
453         }
454         if (!acquired) {
455           long secondsLeft = JdbcTemplate.getTimeoutToDeadlineSecondsNoEx(deadlineNanos);
456           if (secondsLeft < 0) {
457             return false;
458           }
459           if (secondsLeft < CLEANUP_TIMEOUT_SECONDS) {
460             Future<Integer> fut = DefaultExecutor.INSTANCE.submit(
461                     () -> removeDeadHeartBeatAndNotOwnerRows(CLEANUP_TIMEOUT_SECONDS));
462             try {
463               fut.get(secondsLeft, TimeUnit.SECONDS);
464             } catch (TimeoutException ex) {
465               //removing dead entries did not finish in time, but continues in the background.
466               break;
467             } catch (ExecutionException ex) {
468               throw new LockRuntimeException(ex);
469             }
470           } else {
471             try {
472               removeDeadHeartBeatAndNotOwnerRows(secondsLeft);
473             } catch (SQLTimeoutException ex) {
474               return false;
475             } catch (SQLException ex) {
476               throw new LockRuntimeException(ex);
477             }
478           }
479           try {
480             if (releaseDeadOwnerPermits(nrPermits) <= 0) { //wait of we did not find anything dead to release.
481               long wtimeMilis = Math.min(TimeUnit.NANOSECONDS.toMillis(deadlineNanos - TimeSource.nanoTime()),
482                       ThreadLocalRandom.current().nextLong(acquirePollMillis));
483               if (wtimeMilis > 0) {
484                 syncObj.wait(wtimeMilis);
485               } else {
486                 break;
487               }
488             }
489           } catch (SQLException ex) {
490             throw new LockRuntimeException(ex);
491           }
492 
493         }
494       } while (!acquired && deadlineNanos > TimeSource.nanoTime());
495       if (acquired) {
496         ownedReservations += nrPermits;
497       }
498       return acquired;
499     }
500   }
501 
502 
503   @Override
504   @SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_NO_CHECKED")
505   public void release(final int nrReservations) {
506     synchronized (syncObj) {
507       try {
508         checkClosed();
509         jdbc.transactOnConnectionNonInterrupt(new HandlerNano<Connection, Void, SQLException>() {
510           @Override
511           public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
512             releaseReservations(conn, deadlineNanos, nrReservations);
513             try (PreparedStatement ostmt = conn.prepareStatement(releaseByOwnerSql)) {
514               ostmt.setInt(1, nrReservations);
515               ostmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
516               ostmt.setNString(3, semName);
517               ostmt.setInt(4, nrReservations);
518               ostmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
519                       jdbcTimeoutSeconds));
520               int nrUpdated = ostmt.executeUpdate();
521               if (nrUpdated != 1) {
522                 throw new IllegalStateException("Trying to release more than you own! " + ostmt);
523               }
524             }
525             return null;
526           }
527         }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
528       } catch (SQLException ex) {
529         throw new LockRuntimeException(ex);
530       }
531       ownedReservations -= nrReservations;
532       if (ownedReservations < 0) {
533         throw new IllegalStateException("Should not be trying to release more than you acquired!" + nrReservations);
534       }
535       syncObj.notifyAll();
536     }
537   }
538 
539   public void releaseAll() {
540     synchronized (syncObj) {
541       checkClosed();
542       release(ownedReservations);
543     }
544   }
545 
546   private void releaseReservations(final Connection conn, final long deadlineNanos, final int nrReservations)
547           throws SQLException {
548     try (PreparedStatement stmt = conn.prepareStatement(releaseSql)) {
549       stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
550               jdbcTimeoutSeconds));
551       stmt.setInt(1, nrReservations);
552       stmt.setInt(2, nrReservations);
553       stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
554       stmt.setNString(4, semName);
555       stmt.executeUpdate(); // Since a release might or might not update a row.
556     }
557   }
558 
559   @JmxExport(description = "Get the available semaphore permits")
560   public int availablePermits() throws SQLException, InterruptedException {
561     return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
562       try (PreparedStatement stmt = conn.prepareStatement(permitsSql)) {
563         stmt.setNString(1, semName);
564         stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
565         try (ResultSet rs = stmt.executeQuery()) {
566           if (!rs.next()) {
567             throw new IllegalStateException();
568           } else {
569             int result = rs.getInt(1);
570             if (rs.next()) {
571               throw new IllegalStateException();
572             }
573             return result;
574           }
575         }
576       }
577     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
578   }
579 
580   @JmxExport(description = "get the number of permits owned by this process")
581   public int permitsOwned() throws SQLException, InterruptedException {
582     return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
583       try (PreparedStatement stmt = conn.prepareStatement(ownedPermitsSql)) {
584         stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
585         stmt.setNString(2, semName);
586         stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
587         try (ResultSet rs = stmt.executeQuery()) {
588           if (!rs.next()) {
589             throw new IllegalStateException();
590           } else {
591             int result = rs.getInt(1);
592             if (rs.next()) {
593               throw new IllegalStateException();
594             }
595             return result;
596           }
597         }
598       }
599     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
600   }
601 
602   @JmxExport(description = "Get the total permits this semaphore can hand out")
603   public int totalPermits() throws SQLException, InterruptedException {
604     return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
605       try (PreparedStatement stmt = conn.prepareStatement(totalPermitsSql)) {
606         stmt.setNString(1, semName);
607         stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
608         try (ResultSet rs = stmt.executeQuery()) {
609           if (!rs.next()) {
610             throw new IllegalStateException();
611           } else {
612             return rs.getInt(1);
613           }
614         }
615       }
616     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
617   }
618 
619   @JmxExport(description = "get a list of all dead owners which hold permits")
620   @Nonnull
621   public List<OwnerPermits> getDeadOwnerPermits(final int wishPermits) throws SQLException, InterruptedException {
622     return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
623       return getDeadOwnerPermits(conn, deadlineNanos, wishPermits);
624     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
625   }
626 
627   List<OwnerPermits> getDeadOwnerPermits(final Connection conn, final long deadlineNanos, final int wishPermits)
628           throws SQLException {
629     List<OwnerPermits> result = new ArrayList<>();
630     try (PreparedStatement stmt = conn.prepareStatement(getDeadOwnerPermitsSql)) {
631       stmt.setNString(1, semName);
632       stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
633       try (ResultSet rs = stmt.executeQuery()) {
634         int nrPermits = 0;
635         while (rs.next()) {
636           OwnerPermits ownerPermit = new OwnerPermits(rs.getNString(1), rs.getInt(2));
637           result.add(ownerPermit);
638           nrPermits += ownerPermit.getNrPermits();
639           if (nrPermits >= wishPermits) {
640             break;
641           }
642         }
643       }
644     }
645     return result;
646   }
647 
648   /**
649    * Attempts to release permits for this semaphore owned by dead owners.
650    *
651    * @param wishPermits - How many permits we would like to get released.
652    * @return - the number of permits we actually released.
653    * @throws SQLException - something went wrong with the db.
654    * @throws InterruptedException - thrown if thread is interrupted.
655    */
656   @JmxExport(description = "release dead owner permits")
657   @CheckReturnValue
658   public int releaseDeadOwnerPermits(@JmxExport(value = "wishPermits",
659           description = "how many we whish to release") final int wishPermits)
660           throws InterruptedException, SQLException {
661       return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
662         List<OwnerPermits> deadOwnerPermits = getDeadOwnerPermits(conn, deadlineNanos, wishPermits);
663         int released = 0;
664         for (OwnerPermits permit : deadOwnerPermits) {
665           try (PreparedStatement stmt = conn.prepareStatement(deleteDeadOwerRecordSql)) {
666             String owner = permit.getOwner();
667             stmt.setNString(1, owner);
668             stmt.setNString(2, semName);
669             int nrPermits = permit.getNrPermits();
670             stmt.setInt(3, nrPermits);
671             stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
672             if (stmt.executeUpdate() == 1) { // I can release! if not somebody else is doing it.
673               released += nrPermits;
674               releaseReservations(conn, deadlineNanos, nrPermits);
675               LOG.warn("Released {} reservations from dead owner {}", nrPermits, owner);
676             }
677           }
678         }
679         return released;
680       }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
681   }
682 
683   @JmxExport(description = "Change the total available permits to the provided number")
684   public void updatePermits(final int nrPermits) throws SQLException, InterruptedException {
685     if (nrPermits < 0) {
686       throw new IllegalArgumentException("Permits must be positive and not " + nrPermits);
687     }
688     jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
689       @Override
690       public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
691         try (PreparedStatement stmt = conn.prepareStatement(updatePermitsSql)) {
692           stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
693                   jdbcTimeoutSeconds));
694           stmt.setInt(1, nrPermits);
695           stmt.setInt(2, nrPermits);
696           stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
697           stmt.setNString(4, semName);
698           int rowsUpdated = stmt.executeUpdate();
699           if (rowsUpdated != 1) {
700             throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
701           }
702         }
703         return null;
704       }
705     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
706   }
707 
708   @JmxExport(description = "Reduce the total available permits by the provided number")
709   public void reducePermits(final int nrPermits) throws SQLException, InterruptedException {
710     jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
711       @Override
712       public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
713         try (PreparedStatement stmt = conn.prepareStatement(reducePermitsSql)) {
714           stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
715                   jdbcTimeoutSeconds));
716           stmt.setInt(1, nrPermits);
717           stmt.setInt(2, nrPermits);
718           stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
719           stmt.setNString(4, semName);
720           stmt.setInt(5, nrPermits);
721           int rowsUpdated = stmt.executeUpdate();
722           if (rowsUpdated != 1) {
723             throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
724           }
725         }
726         return null;
727       }
728     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
729   }
730 
731   @JmxExport(description = "Increase the total available permits by the provided number")
732   public void increasePermits(final int nrPermits) throws SQLException, InterruptedException {
733     jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
734       @Override
735       public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
736         try (PreparedStatement stmt = conn.prepareStatement(increasePermitsSql)) {
737           stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
738                   jdbcTimeoutSeconds));
739           stmt.setInt(1, nrPermits);
740           stmt.setInt(2, nrPermits);
741           stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
742           stmt.setNString(4, semName);
743           int rowsUpdated = stmt.executeUpdate();
744           if (rowsUpdated != 1) {
745             throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
746           }
747         }
748         return null;
749       }
750     }, jdbcTimeoutSeconds, TimeUnit.SECONDS);
751   }
752 
753   public int removeDeadHeartBeatAndNotOwnerRows(final long timeoutSeconds) throws SQLException, InterruptedException {
754     return jdbc.transactOnConnection(new HandlerNano<Connection, Integer, SQLException>() {
755       @Override
756       public Integer handle(final Connection conn, final long deadlineNanos) throws SQLException {
757         return removeDeadHeartBeatAndNotOwnerRows(conn, deadlineNanos);
758       }
759     }, timeoutSeconds, TimeUnit.SECONDS);
760   }
761 
762   private int removeDeadHeartBeatAndNotOwnerRows(final Connection conn, final long deadlineNanos) throws SQLException {
763     int removedDeadHeartBeatRows = this.heartBeat.removeDeadHeartBeatRows(conn, deadlineNanos);
764     if (removedDeadHeartBeatRows > 0) {
765       return removeDeadNotOwnedRowsOnly(conn, deadlineNanos);
766     } else {
767       return 0;
768     }
769   }
770 
771   private int removeDeadNotOwnedRowsOnly(final Connection conn, final long deadlineNanos) throws SQLException {
772     try (PreparedStatement stmt = conn.prepareStatement(deleteDeadOwnerRecordsSql)) {
773       stmt.setNString(1, semName);
774       stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
775       return stmt.executeUpdate();
776     }
777   }
778 
779   @Override
780   public String toString() {
781     return "JdbcSemaphore{" + "jdbc=" + jdbc
782             + ", jdbcTimeoutSeconds=" + jdbcTimeoutSeconds + ", semName=" + semName + '}';
783   }
784 
785   @Override
786   public void close() {
787     synchronized (syncObj) {
788       if (!isClosed) {
789         releaseAll();
790         unregisterJmx();
791         this.heartBeat.removeLifecycleHook(failureHook);
792         isClosed = true;
793       }
794     }
795   }
796 
797   @Override
798   protected void finalize() throws Throwable  {
799     try (AutoCloseable c = this) {
800       super.finalize();
801     }
802   }
803 
804   @JmxExport
805   public int getJdbcTimeoutSeconds() {
806     return jdbcTimeoutSeconds;
807   }
808 
809   @JmxExport
810   public boolean isIsHealthy() {
811     return isHealthy;
812   }
813 
814 
815 }