JdbcSemaphore.java
/*
* Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Additionally licensed with:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.spf4j.concurrent.jdbc;
import org.spf4j.concurrent.Semaphore;
import com.google.common.annotations.Beta;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spf4j.base.HandlerNano;
import org.spf4j.base.MutableHolder;
import org.spf4j.base.TimeSource;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.concurrent.LockRuntimeException;
import org.spf4j.jdbc.JdbcTemplate;
import org.spf4j.jmx.JmxExport;
import org.spf4j.jmx.Registry;
/**
* A jdbc table based distributes semaphore implementation.
* Similar with a semaphore implemented with zookeeper, we rely on
* heartbeats to detect dead members. If you have a zookeeper instance accessible you should probably use a semaphore
* implemented with it... If you are already connecting to a database, this should be a reliable and low overhead
* implementation. Using a crappy database will give you crappy results.
*
* There are 3 tables involved:
*
* SEMAPHORES - keep track of available and total permits by semaphore.
* PERMITS_BY_OWNER - keeps track of all permits by
* owner.
* HEARTBEATS - keeps heartbeats by owner to detect - dead owners.
*
* All table names and columns are customizable to adapt this implementation to different naming conventions.
*
*
* @author zoly
*/
@SuppressFBWarnings(value = {"NP_LOAD_OF_KNOWN_NULL_VALUE", "SQL_INJECTION_JDBC",
"SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING", "PREDICTABLE_RANDOM"},
justification = "Sql injection is not really possible since the parameterized values are"
+ " validated to be java ids")
@Beta
public final class JdbcSemaphore implements AutoCloseable, Semaphore {
private static final int CLEANUP_TIMEOUT_SECONDS =
Integer.getInteger("spf4j.jdbc.semaphore.cleanupTimeoutSeconds", 60);
private static final Logger LOG = LoggerFactory.getLogger(JdbcSemaphore.class);
private static final ConcurrentMap<String, Object> SYNC_OBJS = new ConcurrentHashMap<>();
private final JdbcTemplate jdbc;
private final String permitsSql;
private final String ownedPermitsSql;
private final String totalPermitsSql;
private final String reducePermitsSql;
private final String increasePermitsSql;
private final String updatePermitsSql;
private final String acquireSql;
private final String acquireByOwnerSql;
private final String releaseSql;
private final String releaseByOwnerSql;
private final String deleteDeadOwnerRecordsSql;
private final String getDeadOwnerPermitsSql;
private final String deleteDeadOwerRecordSql;
private final String insertLockRowSql;
private final String insertPermitsByOwnerSql;
private final int jdbcTimeoutSeconds;
private final String semName;
private final Object syncObj;
private final JdbcHeartBeat heartBeat;
private volatile boolean isHealthy;
private boolean isClosed;
private Error heartBeatFailure;
private final int acquirePollMillis;
private final JdbcHeartBeat.LifecycleHook failureHook;
private int ownedReservations;
/**
* @param dataSource the jdbc data source with the Semaphores table. Please be sensible, no "test on borrow" pools.
* @param semaphoreName number of initial permits, if semaphore already exists the existing nr of permits is kept.
* @param nrPermits the number of initial permits.
*/
public JdbcSemaphore(final DataSource dataSource, final String semaphoreName, final int nrPermits)
throws InterruptedException, SQLException {
this(dataSource, semaphoreName, nrPermits, false);
}
/**
* create a JDBC Semaphore. create one instance / process.
*
* @param dataSource the data source to use for sync.
* @param semaphoreName the semaphore name.
* @param nrPermits number of initial permits.
* @param strict if true, if semaphore already exists and the total permits is different that param nrPermits an
* IllegalArgumentException will be thrown.
*/
public JdbcSemaphore(final DataSource dataSource, final String semaphoreName,
final int nrPermits, final boolean strict) throws InterruptedException, SQLException {
this(dataSource, SemaphoreTablesDesc.DEFAULT, semaphoreName, nrPermits,
Integer.getInteger("spf4j.jdbc.semaphore.jdbcTimeoutSeconds", 10), strict);
}
public JdbcSemaphore(final DataSource dataSource, final SemaphoreTablesDesc semTableDesc,
final String semaphoreName, final int nrPermits, final int jdbcTimeoutSeconds,
final boolean strictReservations) throws InterruptedException, SQLException {
this(dataSource, semTableDesc, semaphoreName, nrPermits, jdbcTimeoutSeconds, strictReservations,
Integer.getInteger("spf4j.jdbc.semaphore.defaultMaxPollIntervalMillis", 1000));
}
@SuppressFBWarnings({"CBX_CUSTOM_BUILT_XML", "STT_TOSTRING_STORED_IN_FIELD"}) // no sql builder (yet)
public JdbcSemaphore(final DataSource dataSource, final SemaphoreTablesDesc semTableDesc,
final String semaphoreName, final int nrPermits, final int jdbcTimeoutSeconds,
final boolean strictReservations, final int acquirePollMillis) throws InterruptedException, SQLException {
if (nrPermits < 0) {
throw new IllegalArgumentException("Permits must be positive and not " + nrPermits);
}
this.acquirePollMillis = acquirePollMillis;
this.semName = semaphoreName;
this.syncObj = SYNC_OBJS.computeIfAbsent(semaphoreName, (key) -> new Object());
this.jdbcTimeoutSeconds = jdbcTimeoutSeconds;
this.jdbc = new JdbcTemplate(dataSource);
this.isHealthy = true;
this.ownedReservations = 0;
this.failureHook = new JdbcHeartBeat.LifecycleHook() {
@Override
public void onError(final Error error) {
heartBeatFailure = error;
isHealthy = false;
}
@Override
public void onClose() {
close();
}
};
this.heartBeat = JdbcHeartBeat.getHeartBeatAndSubscribe(dataSource,
semTableDesc.getHeartBeatTableDesc(), failureHook);
final String semaphoreTableName = semTableDesc.getSemaphoreTableName();
String availablePermitsColumn = semTableDesc.getAvailablePermitsColumn();
String lastModifiedByColumn = semTableDesc.getLastModifiedByColumn();
String lastModifiedAtColumn = semTableDesc.getLastModifiedAtColumn();
String ownerColumn = semTableDesc.getOwnerColumn();
String semaphoreNameColumn = semTableDesc.getSemNameColumn();
String totalPermitsColumn = semTableDesc.getTotalPermitsColumn();
String ownerPermitsColumn = semTableDesc.getOwnerPermitsColumn();
String permitsByOwnerTableName = semTableDesc.getPermitsByOwnerTableName();
HeartBeatTableDesc hbTableDesc = heartBeat.getHbTableDesc();
String heartBeatTableName = hbTableDesc.getTableName();
String heartBeatOwnerColumn = hbTableDesc.getOwnerColumn();
String currentTimeMillisFunc = hbTableDesc.getDbType().getCurrTSSqlFn();
this.reducePermitsSql = "UPDATE " + semaphoreTableName + " SET "
+ totalPermitsColumn + " = " + totalPermitsColumn + " - ?, "
+ availablePermitsColumn + " = " + availablePermitsColumn + " - ?, "
+ lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
+ semaphoreNameColumn + " = ? AND "
+ totalPermitsColumn + " >= ?";
this.increasePermitsSql = "UPDATE " + semaphoreTableName + " SET "
+ totalPermitsColumn + " = " + totalPermitsColumn + " + ?, "
+ availablePermitsColumn + " = " + availablePermitsColumn + " + ?, "
+ lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
+ semaphoreNameColumn + " = ? ";
this.updatePermitsSql = "UPDATE " + semaphoreTableName + " SET "
+ totalPermitsColumn + " = ?, "
+ availablePermitsColumn + " = " + availablePermitsColumn + " + ? - " + totalPermitsColumn + ','
+ lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
+ semaphoreNameColumn + " = ?";
this.acquireSql = "UPDATE " + semaphoreTableName + " SET "
+ availablePermitsColumn + " = " + availablePermitsColumn + " - ?, "
+ lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
+ semaphoreNameColumn + " = ? AND "
+ availablePermitsColumn + " >= ?";
this.acquireByOwnerSql = "UPDATE " + permitsByOwnerTableName
+ " SET " + ownerPermitsColumn + " = " + ownerPermitsColumn + " + ?, "
+ lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
+ ownerColumn + " = ? AND " + semaphoreNameColumn + " = ?";
this.releaseSql = "UPDATE " + semaphoreTableName + " SET "
+ availablePermitsColumn + " = CASE WHEN "
+ availablePermitsColumn + " + ? > " + totalPermitsColumn
+ " THEN " + totalPermitsColumn + " ELSE " + availablePermitsColumn + " + ? END, "
+ lastModifiedByColumn + " = ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
+ semaphoreNameColumn + " = ?";
this.releaseByOwnerSql = "UPDATE " + permitsByOwnerTableName
+ " SET " + ownerPermitsColumn + " = " + ownerPermitsColumn
+ " - ?, " + lastModifiedAtColumn + " = " + currentTimeMillisFunc + " WHERE "
+ ownerColumn + " = ? AND " + semaphoreNameColumn + " = ? and " + ownerPermitsColumn + " >= ?";
this.permitsSql = "SELECT " + availablePermitsColumn + ',' + totalPermitsColumn
+ " FROM " + semaphoreTableName
+ " WHERE " + semaphoreNameColumn + " = ?";
this.totalPermitsSql = "SELECT " + totalPermitsColumn + " FROM " + semaphoreTableName
+ " WHERE " + semTableDesc.getSemNameColumn() + " = ?";
this.ownedPermitsSql = "SELECT " + ownerPermitsColumn + " FROM "
+ permitsByOwnerTableName + " WHERE "
+ ownerColumn + " = ? AND " + semaphoreNameColumn + " = ?";
this.deleteDeadOwnerRecordsSql = "DELETE FROM " + permitsByOwnerTableName + " RO "
+ "WHERE RO." + semaphoreNameColumn + " = ? AND " + ownerPermitsColumn + " = 0 AND "
+ "NOT EXISTS (select H." + heartBeatOwnerColumn + " from " + heartBeatTableName
+ " H where H." + heartBeatOwnerColumn + " = RO." + ownerColumn + ')';
this.getDeadOwnerPermitsSql = "SELECT " + ownerColumn + ", " + ownerPermitsColumn
+ " FROM " + permitsByOwnerTableName + " RO "
+ "WHERE RO." + semaphoreNameColumn + " = ? AND " + ownerPermitsColumn + " > 0 AND "
+ "NOT EXISTS (select H." + heartBeatOwnerColumn + " from " + heartBeatTableName
+ " H where H." + heartBeatOwnerColumn + " = RO." + ownerColumn
+ ") ORDER BY " + ownerColumn + ',' + ownerPermitsColumn;
this.deleteDeadOwerRecordSql = "DELETE FROM " + permitsByOwnerTableName + " WHERE "
+ ownerColumn + " = ? AND " + semaphoreNameColumn + " = ? AND "
+ ownerPermitsColumn + " = ?";
this.insertLockRowSql = "insert into " + semaphoreTableName
+ " (" + semaphoreNameColumn + ',' + availablePermitsColumn + ',' + totalPermitsColumn
+ ',' + lastModifiedByColumn + ',' + lastModifiedAtColumn + ") VALUES (?, ?, ?, ?, "
+ currentTimeMillisFunc + ')';
this.insertPermitsByOwnerSql = "insert into " + permitsByOwnerTableName
+ " (" + semaphoreNameColumn + ',' + ownerColumn + ',' + ownerPermitsColumn + ','
+ lastModifiedAtColumn + ") VALUES (?, ?, ?, " + currentTimeMillisFunc + ")";
try {
createLockRowIfNotPresent(strictReservations, nrPermits);
} catch (SQLIntegrityConstraintViolationException ex) {
try {
// RACE condition while creating the row, will retry to validate if everything is OK.
createLockRowIfNotPresent(strictReservations, nrPermits);
} catch (SQLException ex1) {
ex1.addSuppressed(ex);
throw ex1;
}
}
createOwnerRowIfNotPresent();
}
public void registerJmx() {
Registry.export(JdbcSemaphore.class.getName(), semName, this);
}
public void unregisterJmx() {
Registry.unregister(JdbcSemaphore.class.getName(), semName);
}
private void validate() {
if (!isHealthy) {
throw new IllegalStateException("Heartbeats failed! semaphore broken " + this, heartBeatFailure);
}
}
private void checkClosed() {
if (isClosed) {
throw new IllegalStateException("Semaphore " + this + " is closed");
}
}
private void createLockRowIfNotPresent(final boolean strictReservations, final int nrPermits)
throws SQLException, InterruptedException {
jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
try (PreparedStatement stmt = conn.prepareStatement(permitsSql)) {
stmt.setNString(1, semName);
stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
try (ResultSet rs = stmt.executeQuery()) {
if (!rs.next()) {
try (PreparedStatement insert = conn.prepareStatement(insertLockRowSql)) {
insert.setNString(1, semName);
insert.setInt(2, nrPermits);
insert.setInt(3, nrPermits);
insert.setNString(4, org.spf4j.base.Runtime.PROCESS_ID);
insert.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
insert.executeUpdate();
}
} else if (strictReservations) { // there is a record already. for now blow up if different nr reservations.
int existingMaxReservations = rs.getInt(2);
if (existingMaxReservations != nrPermits) {
throw new IllegalArgumentException("Semaphore " + semName + " max reservations count different "
+ existingMaxReservations + " != " + nrPermits + " use different semaphore");
}
if (rs.next()) {
throw new IllegalStateException("Cannot have mutiple semaphores with the same name " + semName);
}
} else if (rs.next()) {
throw new IllegalStateException("Cannot have mutiple semaphores with the same name " + semName);
}
}
}
return null;
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
private void createOwnerRowIfNotPresent()
throws SQLException, InterruptedException {
try {
jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
try (PreparedStatement insert = conn.prepareStatement(insertPermitsByOwnerSql)) {
insert.setNString(1, this.semName);
insert.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
insert.setInt(3, 0);
insert.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
insert.executeUpdate();
}
return null;
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
} catch (SQLIntegrityConstraintViolationException ex) {
LOG.debug("Semaphore record for current process already there", ex);
}
}
@SuppressFBWarnings("UW_UNCOND_WAIT")
@CheckReturnValue
@Override
public boolean tryAcquire(final int nrPermits, final long deadlineNanos)
throws InterruptedException {
if (nrPermits < 1) {
throw new IllegalArgumentException("You should try to acquire something! not " + nrPermits);
}
synchronized (syncObj) {
boolean acquired = false;
final MutableHolder<Boolean> beat = MutableHolder.of(Boolean.FALSE);
do {
checkClosed();
validate();
try {
acquired = jdbc.transactOnConnection(new HandlerNano<Connection, Boolean, SQLException>() {
@Override
public Boolean handle(final Connection conn, final long deadlineNanos) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(acquireSql)) {
stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
jdbcTimeoutSeconds));
stmt.setInt(1, nrPermits);
stmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
stmt.setNString(3, semName);
stmt.setInt(4, nrPermits);
int rowsUpdated = stmt.executeUpdate();
Boolean acquired;
if (rowsUpdated == 1) {
try (PreparedStatement ostmt = conn.prepareStatement(acquireByOwnerSql)) {
ostmt.setInt(1, nrPermits);
ostmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
ostmt.setNString(3, semName);
ostmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
jdbcTimeoutSeconds));
int nrUpdated = ostmt.executeUpdate();
if (nrUpdated != 1) {
throw new IllegalStateException("Updated " + nrUpdated + " is incorrect for " + ostmt);
}
}
acquired = Boolean.TRUE;
} else {
if (rowsUpdated > 1) {
throw new IllegalStateException("Too many rows updated! when trying to acquire " + nrPermits);
}
acquired = Boolean.FALSE;
}
long currNanoTime = TimeSource.nanoTime();
if (deadlineNanos - currNanoTime > heartBeat.getBeatDurationNanos()) {
// do a heartbeat if have time, and if it makes sense.
beat.setValue(heartBeat.tryBeat(conn, currNanoTime, deadlineNanos));
}
return acquired;
}
}
}, deadlineNanos);
} catch (SQLTimeoutException ex) {
return false;
} catch (SQLException ex) {
throw new LockRuntimeException(ex);
}
if (beat.getValue()) { // we did a heartbeat as part of the acquisition.
heartBeat.updateLastRunNanos(TimeSource.nanoTime());
}
if (!acquired) {
long secondsLeft = JdbcTemplate.getTimeoutToDeadlineSecondsNoEx(deadlineNanos);
if (secondsLeft < 0) {
return false;
}
if (secondsLeft < CLEANUP_TIMEOUT_SECONDS) {
Future<Integer> fut = DefaultExecutor.INSTANCE.submit(
() -> removeDeadHeartBeatAndNotOwnerRows(CLEANUP_TIMEOUT_SECONDS));
try {
fut.get(secondsLeft, TimeUnit.SECONDS);
} catch (TimeoutException ex) {
//removing dead entries did not finish in time, but continues in the background.
break;
} catch (ExecutionException ex) {
throw new LockRuntimeException(ex);
}
} else {
try {
removeDeadHeartBeatAndNotOwnerRows(secondsLeft);
} catch (SQLTimeoutException ex) {
return false;
} catch (SQLException ex) {
throw new LockRuntimeException(ex);
}
}
try {
if (releaseDeadOwnerPermits(nrPermits) <= 0) { //wait of we did not find anything dead to release.
long wtimeMilis = Math.min(TimeUnit.NANOSECONDS.toMillis(deadlineNanos - TimeSource.nanoTime()),
ThreadLocalRandom.current().nextLong(acquirePollMillis));
if (wtimeMilis > 0) {
syncObj.wait(wtimeMilis);
} else {
break;
}
}
} catch (SQLException ex) {
throw new LockRuntimeException(ex);
}
}
} while (!acquired && deadlineNanos > TimeSource.nanoTime());
if (acquired) {
ownedReservations += nrPermits;
}
return acquired;
}
}
@Override
@SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_NO_CHECKED")
public void release(final int nrReservations) {
synchronized (syncObj) {
try {
checkClosed();
jdbc.transactOnConnectionNonInterrupt(new HandlerNano<Connection, Void, SQLException>() {
@Override
public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
releaseReservations(conn, deadlineNanos, nrReservations);
try (PreparedStatement ostmt = conn.prepareStatement(releaseByOwnerSql)) {
ostmt.setInt(1, nrReservations);
ostmt.setNString(2, org.spf4j.base.Runtime.PROCESS_ID);
ostmt.setNString(3, semName);
ostmt.setInt(4, nrReservations);
ostmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
jdbcTimeoutSeconds));
int nrUpdated = ostmt.executeUpdate();
if (nrUpdated != 1) {
throw new IllegalStateException("Trying to release more than you own! " + ostmt);
}
}
return null;
}
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
} catch (SQLException ex) {
throw new LockRuntimeException(ex);
}
ownedReservations -= nrReservations;
if (ownedReservations < 0) {
throw new IllegalStateException("Should not be trying to release more than you acquired!" + nrReservations);
}
syncObj.notifyAll();
}
}
public void releaseAll() {
synchronized (syncObj) {
checkClosed();
release(ownedReservations);
}
}
private void releaseReservations(final Connection conn, final long deadlineNanos, final int nrReservations)
throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(releaseSql)) {
stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
jdbcTimeoutSeconds));
stmt.setInt(1, nrReservations);
stmt.setInt(2, nrReservations);
stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
stmt.setNString(4, semName);
stmt.executeUpdate(); // Since a release might or might not update a row.
}
}
@JmxExport(description = "Get the available semaphore permits")
public int availablePermits() throws SQLException, InterruptedException {
return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
try (PreparedStatement stmt = conn.prepareStatement(permitsSql)) {
stmt.setNString(1, semName);
stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
try (ResultSet rs = stmt.executeQuery()) {
if (!rs.next()) {
throw new IllegalStateException();
} else {
int result = rs.getInt(1);
if (rs.next()) {
throw new IllegalStateException();
}
return result;
}
}
}
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
@JmxExport(description = "get the number of permits owned by this process")
public int permitsOwned() throws SQLException, InterruptedException {
return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
try (PreparedStatement stmt = conn.prepareStatement(ownedPermitsSql)) {
stmt.setNString(1, org.spf4j.base.Runtime.PROCESS_ID);
stmt.setNString(2, semName);
stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
try (ResultSet rs = stmt.executeQuery()) {
if (!rs.next()) {
throw new IllegalStateException();
} else {
int result = rs.getInt(1);
if (rs.next()) {
throw new IllegalStateException();
}
return result;
}
}
}
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
@JmxExport(description = "Get the total permits this semaphore can hand out")
public int totalPermits() throws SQLException, InterruptedException {
return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
try (PreparedStatement stmt = conn.prepareStatement(totalPermitsSql)) {
stmt.setNString(1, semName);
stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
try (ResultSet rs = stmt.executeQuery()) {
if (!rs.next()) {
throw new IllegalStateException();
} else {
return rs.getInt(1);
}
}
}
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
@JmxExport(description = "get a list of all dead owners which hold permits")
@Nonnull
public List<OwnerPermits> getDeadOwnerPermits(final int wishPermits) throws SQLException, InterruptedException {
return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
return getDeadOwnerPermits(conn, deadlineNanos, wishPermits);
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
List<OwnerPermits> getDeadOwnerPermits(final Connection conn, final long deadlineNanos, final int wishPermits)
throws SQLException {
List<OwnerPermits> result = new ArrayList<>();
try (PreparedStatement stmt = conn.prepareStatement(getDeadOwnerPermitsSql)) {
stmt.setNString(1, semName);
stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
try (ResultSet rs = stmt.executeQuery()) {
int nrPermits = 0;
while (rs.next()) {
OwnerPermits ownerPermit = new OwnerPermits(rs.getNString(1), rs.getInt(2));
result.add(ownerPermit);
nrPermits += ownerPermit.getNrPermits();
if (nrPermits >= wishPermits) {
break;
}
}
}
}
return result;
}
/**
* Attempts to release permits for this semaphore owned by dead owners.
*
* @param wishPermits - How many permits we would like to get released.
* @return - the number of permits we actually released.
* @throws SQLException - something went wrong with the db.
* @throws InterruptedException - thrown if thread is interrupted.
*/
@JmxExport(description = "release dead owner permits")
@CheckReturnValue
public int releaseDeadOwnerPermits(@JmxExport(value = "wishPermits",
description = "how many we whish to release") final int wishPermits)
throws InterruptedException, SQLException {
return jdbc.transactOnConnection((final Connection conn, final long deadlineNanos) -> {
List<OwnerPermits> deadOwnerPermits = getDeadOwnerPermits(conn, deadlineNanos, wishPermits);
int released = 0;
for (OwnerPermits permit : deadOwnerPermits) {
try (PreparedStatement stmt = conn.prepareStatement(deleteDeadOwerRecordSql)) {
String owner = permit.getOwner();
stmt.setNString(1, owner);
stmt.setNString(2, semName);
int nrPermits = permit.getNrPermits();
stmt.setInt(3, nrPermits);
stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
if (stmt.executeUpdate() == 1) { // I can release! if not somebody else is doing it.
released += nrPermits;
releaseReservations(conn, deadlineNanos, nrPermits);
LOG.warn("Released {} reservations from dead owner {}", nrPermits, owner);
}
}
}
return released;
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
@JmxExport(description = "Change the total available permits to the provided number")
public void updatePermits(final int nrPermits) throws SQLException, InterruptedException {
if (nrPermits < 0) {
throw new IllegalArgumentException("Permits must be positive and not " + nrPermits);
}
jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
@Override
public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(updatePermitsSql)) {
stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
jdbcTimeoutSeconds));
stmt.setInt(1, nrPermits);
stmt.setInt(2, nrPermits);
stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
stmt.setNString(4, semName);
int rowsUpdated = stmt.executeUpdate();
if (rowsUpdated != 1) {
throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
}
}
return null;
}
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
@JmxExport(description = "Reduce the total available permits by the provided number")
public void reducePermits(final int nrPermits) throws SQLException, InterruptedException {
jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
@Override
public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(reducePermitsSql)) {
stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
jdbcTimeoutSeconds));
stmt.setInt(1, nrPermits);
stmt.setInt(2, nrPermits);
stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
stmt.setNString(4, semName);
stmt.setInt(5, nrPermits);
int rowsUpdated = stmt.executeUpdate();
if (rowsUpdated != 1) {
throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
}
}
return null;
}
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
@JmxExport(description = "Increase the total available permits by the provided number")
public void increasePermits(final int nrPermits) throws SQLException, InterruptedException {
jdbc.transactOnConnection(new HandlerNano<Connection, Void, SQLException>() {
@Override
public Void handle(final Connection conn, final long deadlineNanos) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(increasePermitsSql)) {
stmt.setQueryTimeout(Math.min(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos),
jdbcTimeoutSeconds));
stmt.setInt(1, nrPermits);
stmt.setInt(2, nrPermits);
stmt.setNString(3, org.spf4j.base.Runtime.PROCESS_ID);
stmt.setNString(4, semName);
int rowsUpdated = stmt.executeUpdate();
if (rowsUpdated != 1) {
throw new IllegalArgumentException("Cannot reduce nr total permits by " + nrPermits);
}
}
return null;
}
}, jdbcTimeoutSeconds, TimeUnit.SECONDS);
}
public int removeDeadHeartBeatAndNotOwnerRows(final long timeoutSeconds) throws SQLException, InterruptedException {
return jdbc.transactOnConnection(new HandlerNano<Connection, Integer, SQLException>() {
@Override
public Integer handle(final Connection conn, final long deadlineNanos) throws SQLException {
return removeDeadHeartBeatAndNotOwnerRows(conn, deadlineNanos);
}
}, timeoutSeconds, TimeUnit.SECONDS);
}
private int removeDeadHeartBeatAndNotOwnerRows(final Connection conn, final long deadlineNanos) throws SQLException {
int removedDeadHeartBeatRows = this.heartBeat.removeDeadHeartBeatRows(conn, deadlineNanos);
if (removedDeadHeartBeatRows > 0) {
return removeDeadNotOwnedRowsOnly(conn, deadlineNanos);
} else {
return 0;
}
}
private int removeDeadNotOwnedRowsOnly(final Connection conn, final long deadlineNanos) throws SQLException {
try (PreparedStatement stmt = conn.prepareStatement(deleteDeadOwnerRecordsSql)) {
stmt.setNString(1, semName);
stmt.setQueryTimeout(JdbcTemplate.getTimeoutToDeadlineSeconds(deadlineNanos));
return stmt.executeUpdate();
}
}
@Override
public String toString() {
return "JdbcSemaphore{" + "jdbc=" + jdbc
+ ", jdbcTimeoutSeconds=" + jdbcTimeoutSeconds + ", semName=" + semName + '}';
}
@Override
public void close() {
synchronized (syncObj) {
if (!isClosed) {
releaseAll();
unregisterJmx();
this.heartBeat.removeLifecycleHook(failureHook);
isClosed = true;
}
}
}
@Override
protected void finalize() throws Throwable {
try (AutoCloseable c = this) {
super.finalize();
}
}
@JmxExport
public int getJdbcTimeoutSeconds() {
return jdbcTimeoutSeconds;
}
@JmxExport
public boolean isIsHealthy() {
return isHealthy;
}
}