ScalableMeasurementRecorderSource.java
/*
* Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Additionally licensed with:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.spf4j.perf.impl;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import gnu.trove.map.TObjectLongMap;
import gnu.trove.map.hash.TObjectLongHashMap;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.concurrent.DefaultScheduler;
import org.spf4j.base.Pair;
import org.spf4j.io.Csv;
import org.spf4j.jmx.GenericExportedValue;
import org.spf4j.jmx.JmxExport;
import org.spf4j.jmx.DynamicMBeanBuilder;
import org.spf4j.jmx.Registry;
import org.spf4j.perf.CloseableMeasurementRecorderSource;
import org.spf4j.perf.JmxSupport;
import org.spf4j.perf.MeasurementAccumulator;
import org.spf4j.perf.MeasurementsInfo;
import org.spf4j.perf.MeasurementsSource;
import org.spf4j.perf.MeasurementStore;
import org.spf4j.perf.MeasurementRecorder;
import org.spf4j.perf.MeasurementRecorderSource;
/**
 * A {@link MeasurementRecorderSource} that keeps a separate set of accumulators for every recording thread.
 *
 * <p>Each thread obtains its own {@code Map<Object, MeasurementAccumulator>} through a {@link ThreadLocal};
 * the per thread maps are also registered in a shared map keyed by thread so that readers can aggregate
 * the values of all threads. A {@link Persister} is scheduled at construction time and periodically
 * collects, resets and saves the accumulated measurements to the supplied {@link MeasurementStore}.</p>
 */
@ThreadSafe
// a recorder instance is typically alive for the entire life of the process
@SuppressFBWarnings("PMB_INSTANCE_BASED_THREAD_LOCAL")
public final class ScalableMeasurementRecorderSource implements
        MeasurementRecorderSource, MeasurementsSource, CloseableMeasurementRecorderSource, JmxSupport {

  /** Accumulators of every thread that recorded through this source; guarded by its own monitor. */
  private final Map<Thread, Map<Object, MeasurementAccumulator>> measurementProcessorMap;

  /** Gives each thread its own accumulator map and registers that map in {@link #measurementProcessorMap}. */
  private final ThreadLocal<Map<Object, MeasurementAccumulator>> threadLocalMeasurementProcessorMap;

  /** Handle of the scheduled periodic persist task, cancelled by {@link #close()}. */
  private final ScheduledFuture<?> samplingFuture;

  /** Accumulator used as a template for the accumulators created per measured entity. */
  private final MeasurementAccumulator processorTemplate;

  /** Table ids allocated in the store per measurement info; 0 is treated as "not allocated yet". */
  private final TObjectLongMap<MeasurementsInfo> tableIds;

  /** Periodic task that saves the measurements; its monitor also serializes {@link #close()}. */
  private final Persister persister;

  /** The queued shutdown hook, or null when closing on shutdown was not requested. */
  private final Runnable shutdownHook;

  /**
   * Creates the source and schedules the periodic persisting of its measurements.
   *
   * @param processor the accumulator used as template for the per entity accumulators.
   * @param sampleTimeMillis the persist interval in milliseconds, must be at least 1000.
   * @param database the store the measurements are saved to.
   * @param closeOnShutdown when true a hook is queued that closes this source at shutdown.
   * @throws IllegalArgumentException if sampleTimeMillis is smaller than 1000.
   */
  ScalableMeasurementRecorderSource(final MeasurementAccumulator processor,
          final int sampleTimeMillis, final MeasurementStore database, final boolean closeOnShutdown) {
    if (sampleTimeMillis < 1000) {
      throw new IllegalArgumentException("sample time needs to be at least 1000 and not " + sampleTimeMillis);
    }
    this.processorTemplate = processor;
    measurementProcessorMap = new HashMap<>();
    threadLocalMeasurementProcessorMap = new ThreadLocal<Map<Object, MeasurementAccumulator>>() {

      @Override
      protected Map<Object, MeasurementAccumulator> initialValue() {
        Map<Object, MeasurementAccumulator> result = new HashMap<>();
        // publish the thread's map so that the aggregating readers can see it.
        synchronized (measurementProcessorMap) {
          measurementProcessorMap.put(Thread.currentThread(), result);
        }
        return result;
      }

    };
    tableIds = new TObjectLongHashMap<>();
    persister = new Persister(database, sampleTimeMillis, processor);
    samplingFuture = DefaultScheduler.scheduleAllignedAtFixedRateMillis(persister, sampleTimeMillis);
    if (closeOnShutdown) {
      shutdownHook = closeOnShutdown();
    } else {
      shutdownHook = null;
    }
  }

  /**
   * Queues a shutdown hook that closes this source.
   *
   * @return the queued hook, needed later to remove it from the queue.
   */
  private Runnable closeOnShutdown() {
    final AbstractRunnable runnable = new AbstractRunnable(true) {

      @Override
      public void doRun() {
        close();
      }
    };
    org.spf4j.base.Runtime.queueHook(0, runnable);
    return runnable;
  }

  /**
   * Returns the calling thread's recorder for the given entity, creating it from the template on first use.
   *
   * @param forWhat the entity the measurements are recorded for.
   * @return the accumulator of the current thread for forWhat.
   */
  @Override
  public MeasurementRecorder getRecorder(final Object forWhat) {
    Map<Object, MeasurementAccumulator> recorders = threadLocalMeasurementProcessorMap.get();
    // the map is thread local for writers, but readers of other threads iterate it under the same monitor.
    synchronized (recorders) {
      MeasurementAccumulator result = recorders.get(forWhat);
      if (result == null) {
        result = (MeasurementAccumulator) processorTemplate.createLike(
                Pair.of(processorTemplate.getInfo().getMeasuredEntity(), forWhat));
        recorders.put(forWhat, result);
      }
      return result;
    }
  }

  /**
   * Aggregates clones of the accumulators of all threads, without resetting them.
   *
   * @return a new map from measured entity to the aggregated accumulator.
   */
  @Override
  public Map<Object, MeasurementAccumulator> getEntitiesMeasurements() {
    Map<Object, MeasurementAccumulator> result = new HashMap<>();
    synchronized (measurementProcessorMap) {
      for (Map<Object, MeasurementAccumulator> measurements : measurementProcessorMap.values()) {
        synchronized (measurements) {
          for (Map.Entry<Object, MeasurementAccumulator> lentry : measurements.entrySet()) {
            Object what = lentry.getKey();
            MeasurementAccumulator snapshot = lentry.getValue().createClone();
            MeasurementAccumulator existing = result.get(what);
            result.put(what, existing == null ? snapshot : existing.aggregate(snapshot));
          }
        }
      }
    }
    return result;
  }

  /**
   * Resets the accumulators of all threads and aggregates the values they held.
   *
   * <p>Maps of threads that are no longer alive are unregistered (their last values are still collected),
   * and accumulators whose reset yields null are dropped from their thread's map.</p>
   *
   * @return a new map from measured entity to the aggregated values collected by the reset.
   */
  @Override
  @Nonnull
  public Map<Object, MeasurementAccumulator> getEntitiesMeasurementsAndReset() {
    Map<Object, MeasurementAccumulator> result = new HashMap<>();
    synchronized (measurementProcessorMap) {
      Iterator<Map.Entry<Thread, Map<Object, MeasurementAccumulator>>> iterator
              = measurementProcessorMap.entrySet().iterator();
      while (iterator.hasNext()) {
        Map.Entry<Thread, Map<Object, MeasurementAccumulator>> entry = iterator.next();
        // read the value before a possible removal, Map.Entry behavior is undefined
        // once the backing map was modified after the entry was returned.
        Map<Object, MeasurementAccumulator> measurements = entry.getValue();
        if (!entry.getKey().isAlive()) {
          iterator.remove();
        }
        synchronized (measurements) {
          Iterator<Map.Entry<Object, MeasurementAccumulator>> iterator1 = measurements.entrySet().iterator();
          while (iterator1.hasNext()) {
            Map.Entry<Object, MeasurementAccumulator> lentry = iterator1.next();
            final MeasurementAccumulator vals = lentry.getValue().reset();
            if (vals == null) {
              // reset returned nothing for this accumulator, stop tracking it.
              iterator1.remove();
              continue;
            }
            Object what = lentry.getKey();
            MeasurementAccumulator existing = result.get(what);
            result.put(what, existing == null ? vals : existing.aggregate(vals));
          }
        }
      }
    }
    return result;
  }

  /**
   * Registers this source with JMX under the "org.spf4j.perf.recorders" domain, named after the measured
   * entity of the template, exposing the measurements as a tabular "measurements" attribute.
   *
   * @throws RuntimeException if the tabular type for the measurements cannot be created.
   */
  @SuppressWarnings("unchecked")
  @SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_NO_CHECKED")
  public void registerJmx() {
    MeasurementsInfo info = this.processorTemplate.getInfo();
    CompositeType targetType = addNameDescription(info.toCompositeType());
    try {
      String description = info.getDescription();
      if (description.isEmpty()) {
        description = "Dynamic measurements";
      }
      new DynamicMBeanBuilder().withJmxExportObject(this)
              .withAttribute(new GenericExportedValue<>("measurements", description,
                      this::getMeasurements, null, new TabularType(info.getMeasuredEntity().toString(),
                              description, targetType, new String[]{"name"})))
              .register("org.spf4j.perf.recorders", info.getMeasuredEntity().toString());
    } catch (OpenDataException ex) {
      throw new RuntimeException("Cannot create tabular type for " + targetType, ex);
    }
  }

  /**
   * Stops the periodic persisting, persists the remaining measurements one last time and unregisters
   * this source from JMX. Calls made after the sampling task was cancelled do nothing.
   *
   * @throws UncheckedIOException if the final persist fails; the JMX unregistration is still attempted.
   */
  @Override
  @SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_NO_CHECKED")
  public void close() {
    synchronized (persister) {
      if (!samplingFuture.isCancelled()) {
        if (shutdownHook != null) {
          org.spf4j.base.Runtime.removeQueuedShutdownHook(shutdownHook);
        }
        samplingFuture.cancel(false);
        try {
          persister.persist(false);
        } catch (IOException ex) {
          throw new UncheckedIOException(ex);
        } finally {
          // the future is cancelled at this point, so a later close() would be a no-op;
          // unregister here even when the final persist failed, otherwise the MBean would never be removed.
          Registry.unregister("org.spf4j.perf.recorders",
                  this.processorTemplate.getInfo().getMeasuredEntity().toString());
        }
      }
    }
  }

  /**
   * Renders the current (not reset) measurements as csv: a row with the measurement names, a row with
   * the measurement units, then one row per measured entity.
   *
   * @return the csv text.
   * @throws UncheckedIOException if writing to the in memory buffer fails.
   */
  @JmxExport(description = "measurements as csv")
  public String getMeasurementsAsString() {
    StringWriter sw = new StringWriter(128);
    Map<Object, MeasurementAccumulator> entitiesMeasurements = getEntitiesMeasurements();
    MeasurementsInfo info = this.processorTemplate.getInfo();
    try {
      Csv.writeCsvRow2(sw, "Measured", (Object[]) info.getMeasurementNames());
      Csv.writeCsvRow2(sw, "string", (Object[]) info.getMeasurementUnits());
      for (Map.Entry<Object, MeasurementAccumulator> entry : entitiesMeasurements.entrySet()) {
        Csv.writeCsvElement(entry.getKey().toString(), sw);
        sw.write(',');
        final long[] measurements = entry.getValue().get();
        if (measurements != null) {
          Csv.writeCsvRow(sw, measurements);
        } else {
          // NOTE(review): assumes Csv.writeCsvRow ends the row with a line break - confirm against Csv.
          // Without values the row is terminated here so that the next entity starts on its own line.
          sw.write('\n');
        }
      }
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
    return sw.toString();
  }

  /**
   * Builds a composite type that has the items of initType preceded by string items "name" and "description".
   *
   * @param initType the type to extend.
   * @return a new composite type with the type name and description of initType.
   * @throws IllegalArgumentException if the extended type cannot be constructed.
   */
  static CompositeType addNameDescription(final CompositeType initType) {
    Set<String> keys = initType.keySet();
    int colNr = keys.size() + 2;
    String[] names = new String[colNr];
    String[] descrs = new String[colNr];
    OpenType<?>[] types = new OpenType<?>[colNr];
    names[0] = "name";
    names[1] = "description";
    descrs[0] = "metric name";
    descrs[1] = "metric description";
    types[0] = javax.management.openmbean.SimpleType.STRING;
    types[1] = javax.management.openmbean.SimpleType.STRING;
    int i = 2;
    for (String key : keys) {
      names[i] = key;
      descrs[i] = initType.getDescription(key);
      types[i++] = initType.getType(key);
    }
    try {
      return new CompositeType(initType.getTypeName(), initType.getDescription(),
              names, descrs, types);
    } catch (OpenDataException ex) {
      throw new IllegalArgumentException("Invalid type contructed from " + initType, ex);
    }
  }

  /**
   * Builds composite data of the target type from the items of data plus the given name and description.
   *
   * @param targetType the type of the result, expected to declare "name", "description" and the items of data.
   * @param data the data to copy the items from.
   * @param name the value of the "name" item.
   * @param description the value of the "description" item.
   * @return the extended composite data.
   * @throws IllegalArgumentException if the values do not form valid data of the target type.
   */
  static CompositeData addNameDescription(final CompositeType targetType, final CompositeData data,
          final String name, final String description) {
    Map<String, Object> vals
            = com.google.common.collect.Maps.newLinkedHashMapWithExpectedSize(targetType.keySet().size());
    vals.put("name", name);
    vals.put("description", description);
    for (String key : data.getCompositeType().keySet()) {
      vals.put(key, data.get(key));
    }
    try {
      return new CompositeDataSupport(targetType, vals);
    } catch (OpenDataException ex) {
      throw new IllegalArgumentException("Invalid open data contructed from " + data, ex);
    }
  }

  /**
   * Returns the current (not reset) measurements as tabular data indexed by "name", one row per entity.
   *
   * @return the tabular data.
   * @throws RuntimeException if the tabular type cannot be constructed.
   */
  public TabularDataSupport getMeasurements() {
    Map<Object, MeasurementAccumulator> entitiesMeasurements = getEntitiesMeasurements();
    MeasurementsInfo info = this.processorTemplate.getInfo();
    CompositeType targetType = addNameDescription(info.toCompositeType());
    TabularDataSupport result;
    try {
      String name = info.getMeasuredEntity().toString();
      String description = info.getDescription();
      if (description.isEmpty()) {
        // an empty description falls back to the entity name.
        description = name;
      }
      result = new TabularDataSupport(new TabularType(name, description, targetType, new String[]{"name"}));
    } catch (OpenDataException ex) {
      throw new RuntimeException("Unable to construct tabular data " + entitiesMeasurements, ex);
    }
    for (Map.Entry<Object, MeasurementAccumulator> entry : entitiesMeasurements.entrySet()) {
      MeasurementAccumulator acc = entry.getValue();
      MeasurementsInfo eInfo = acc.getInfo();
      String cattrName = eInfo.getMeasuredEntity().toString();
      String cattrDesc = eInfo.getDescription();
      if (cattrDesc.isEmpty()) {
        cattrDesc = cattrName;
      }
      result.put(addNameDescription(targetType, acc.getCompositeData(), cattrName, cattrDesc));
    }
    return result;
  }

  /**
   * Resets all accumulators, discarding the values collected by the reset.
   */
  @JmxExport
  public void clear() {
    getEntitiesMeasurementsAndReset();
  }

  /**
   * The periodic task that collects, resets and saves the measurements of the enclosing source.
   */
  private class Persister extends AbstractRunnable {

    /** The store the measurements are saved to. */
    private final MeasurementStore database;

    /** The sampling interval, passed to the store when a measurements table is allocated. */
    private final int sampleTimeMillis;

    /** The template accumulator, only used to identify this source in the warning message. */
    private final MeasurementAccumulator processor;

    /** Wall clock time in milliseconds of the last persist that did save, 0 before the first one. */
    private volatile long lastRun = 0;

    Persister(final MeasurementStore database, final int sampleTimeMillis,
            final MeasurementAccumulator processor) {
      super(true);
      this.database = database;
      this.sampleTimeMillis = sampleTimeMillis;
      this.processor = processor;
    }

    /**
     * Scheduled entry point, persists and warns if the clock did not advance since the last run.
     *
     * @throws IOException if saving the measurements fails.
     */
    @Override
    public void doRun() throws IOException {
      persist(true);
    }

    /**
     * Collects and resets the measurements of the enclosing source and saves them to the store.
     * Nothing is saved when the current time is not after the time of the last run.
     *
     * @param warn when true, a warning is logged if the current time is not after the last run.
     * @throws IOException if allocating the table or saving the measurements fails.
     */
    public void persist(final boolean warn) throws IOException {
      long currentTime = System.currentTimeMillis();
      if (currentTime > lastRun) {
        lastRun = currentTime;
        for (MeasurementAccumulator m
                : ScalableMeasurementRecorderSource.this.getEntitiesMeasurementsAndReset().values()) {
          final MeasurementsInfo info = m.getInfo();
          long tableId;
          synchronized (tableIds) {
            tableId = tableIds.get(info);
            if (tableId == 0) {
              // no id cached for this info yet, allocate the table and remember its id.
              tableId = database.alocateMeasurements(info, sampleTimeMillis);
              tableIds.put(info, tableId);
            }
          }
          final long[] data = m.getThenReset();
          if (data != null) {
            database.saveMeasurements(tableId, currentTime, data);
          }
        }
      } else if (warn) {
        Logger.getLogger(ScalableMeasurementRecorderSource.class.getName())
                .log(Level.WARNING,
                        "Last measurement recording for {0} was at {1} current run is {2}, something is wrong",
                        new Object[] {processor.getInfo(), lastRun, currentTime});
      }
    }
  }

  @Override
  public String toString() {
    return "ScalableMeasurementRecorderSource{" + "measurementProcessorMap=" + measurementProcessorMap
            + ", threadLocalMeasurementProcessorMap=" + threadLocalMeasurementProcessorMap
            + ", samplingFuture=" + samplingFuture + ", processorTemplate=" + processorTemplate
            + ", tableIds=" + tableIds + ", persister=" + persister + ", shutdownHook=" + shutdownHook + '}';
  }

}