TSDBMeasurementDatabase.java
/*
* Copyright (c) 2001, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package org.spf4j.perf.impl.mdb.tsdb;
import com.google.common.base.Charsets;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.base.Arrays;
import org.spf4j.base.DefaultScheduler;
import org.spf4j.base.Pair;
import org.spf4j.perf.EntityMeasurementsInfo;
import org.spf4j.perf.MeasurementDatabase;
import org.spf4j.perf.impl.chart.Charts;
import org.spf4j.perf.tsdb.ColumnInfo;
import org.spf4j.perf.tsdb.TimeSeriesDatabase;
import java.awt.image.BufferedImage;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.ThreadSafe;
import javax.imageio.ImageIO;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import org.jfree.chart.JFreeChart;
import org.joda.time.DateTimeConstants;
import org.joda.time.LocalDate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author zoly
*/
@ThreadSafe
public final class TSDBMeasurementDatabase
implements MeasurementDatabase, Closeable, TSDBMeasurementDatabaseMBean {
private final TimeSeriesDatabase database;
private volatile ScheduledFuture<?> future;
private static final Logger LOG = LoggerFactory.getLogger(TSDBMeasurementDatabase.class);
public TSDBMeasurementDatabase(final String databaseName) throws IOException {
this.database = new TimeSeriesDatabase(databaseName, new byte[]{});
}
private static final AtomicInteger COUNTER = new AtomicInteger(0);
public void registerJmx() throws MalformedObjectNameException, InstanceAlreadyExistsException,
MBeanRegistrationException, NotCompliantMBeanException {
ManagementFactory.getPlatformMBeanServer().registerMBean(this,
new ObjectName("SPF4J:name=TSDBMeasurementDatabase" + COUNTER.getAndIncrement()));
}
public void closeOnShutdown() {
Runtime.getRuntime().addShutdownHook(new Thread(new AbstractRunnable(false) {
@Override
public void doRun() throws Exception {
close();
}
}, "tsdb shutdown"));
}
public void flushEvery(final int intervalMillis) {
future = DefaultScheduler.INSTANCE.scheduleAtFixedRate(new AbstractRunnable(false) {
@Override
public void doRun() throws Exception {
database.flush();
}
}, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
}
@Override
public void alocateMeasurements(final EntityMeasurementsInfo measurement,
final int sampleTimeMillis) throws IOException {
String groupName = measurement.getMeasuredEntity().toString();
alocateMeasurements(groupName, measurement, sampleTimeMillis);
}
private void alocateMeasurements(final String groupName, final EntityMeasurementsInfo measurement,
final int sampleTimeMillis) throws IOException {
synchronized (database) {
if (!database.hasColumnGroup(groupName)) {
String[] measurementNames = measurement.getMeasurementNames();
byte[] uom = measurement.getUnitOfMeasurement().getBytes(Charsets.UTF_8);
byte[][] metaData = new byte[measurementNames.length][];
String [] uoms = measurement.getMeasurementUnits();
for (int i = 0; i < metaData.length; i++) {
metaData[i] = uoms[i].getBytes(Charsets.UTF_8);
}
database.addColumnGroup(groupName, uom, sampleTimeMillis, measurementNames,
metaData);
}
}
}
@Override
public void saveMeasurements(final EntityMeasurementsInfo measurementInfo,
final long[] measurements, final long timeStampMillis, final int sampleTimeMillis)
throws IOException {
String groupName = measurementInfo.getMeasuredEntity().toString();
alocateMeasurements(groupName, measurementInfo, sampleTimeMillis);
database.write(timeStampMillis, groupName, measurements);
}
@PreDestroy
@Override
public void close() throws IOException {
if (future != null) {
future.cancel(false);
}
database.close();
}
private static String fixName(final String name) {
StringBuilder result = new StringBuilder(name.length());
for (int i = 0; i < name.length(); i++) {
char c = name.charAt(i);
if (Character.isJavaIdentifierPart(c)) {
result.append(c);
}
}
return result.toString();
}
@Override
public List<String> generateCharts(final int width, final int height) throws IOException {
long startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
long endTime = System.currentTimeMillis();
return generateCharts(startTime, endTime, width, height);
}
/**
* Quantized recorders will have min, max avg charts and distribution charts
* generated. Counting recorders will have simple charts generated.
*
* @param startTimeMillis
* @param endTimeMillis
* @param width
* @param height
* @return
* @throws IOException
*/
@Override
public List<String> generateCharts(final long startTimeMillis, final long endTimeMillis,
final int width, final int height) throws IOException {
try {
database.flush();
List<String> result = new ArrayList<String>();
Collection<ColumnInfo> columnsInfo = database.getColumnsInfo();
for (ColumnInfo info : columnsInfo) {
Pair<long[], long[][]> data = database.read(info.getGroupName(), startTimeMillis, endTimeMillis);
if (data.getFirst().length > 0) {
if (canGenerateMinMaxAvgCount(info)) {
result.add(generateMinMaxAvgCountChart(info, data, width, height));
}
if (canGenerateHeatChart(info)) {
result.add(generateHeatChart(info, data, width, height));
}
}
}
Multimap<String, ColumnInfo> counters = getCounters(columnsInfo);
for (Map.Entry<String, Collection<ColumnInfo>> entry : counters.asMap().entrySet()) {
long[][] timestamps = new long[entry.getValue().size()][];
double[][] cdata = new double[entry.getValue().size()][];
double[][] cdata2 = new double[entry.getValue().size()][];
int i = 0;
String[] measurementNames = new String[cdata.length];
String[] measurementNames2 = new String[cdata2.length];
String uom1 = "count";
String uom2 = "";
for (ColumnInfo colInfo : entry.getValue()) {
Pair<long[], long[][]> data = database.read(colInfo.getGroupName(), startTimeMillis, endTimeMillis);
timestamps[i] = data.getFirst();
cdata[i] = Arrays.getColumnAsDoubles(data.getSecond(), colInfo.getColumnIndex("count"));
cdata2[i] = Arrays.getColumnAsDoubles(data.getSecond(), colInfo.getColumnIndex("total"));
measurementNames[i] = colInfo.getGroupName() + ".count";
measurementNames2[i] = colInfo.getGroupName() + ".total";
uom2 = new String(colInfo.getGroupMetaData(), Charsets.UTF_8);
i++;
}
result.add(generateCountChart(entry.getKey(), timestamps, measurementNames,
measurementNames2, uom1, uom2, cdata, cdata2, width, height));
}
LOG.info("Generated charts {}", result);
return result;
} catch (IOException ex) {
LOG.error("Error while generating charts", ex);
throw ex;
} catch (RuntimeException ex) {
LOG.error("Error while generating charts", ex);
throw ex;
}
}
private static Multimap<String, ColumnInfo> getCounters(final Collection<ColumnInfo> columnInfos) {
Multimap<String, ColumnInfo> result = HashMultimap.create();
for (ColumnInfo info : columnInfos) {
if (isCounterOnly(info)) {
String groupName = info.getGroupName();
if (groupName.startsWith("(")) {
int cidx = groupName.indexOf(',');
if (cidx > 0) {
groupName = groupName.substring(1, cidx);
}
}
result.put(groupName, info);
}
}
return result;
}
public static boolean isCounterOnly(final ColumnInfo info) {
String[] columns = info.getColumnNames();
return columns.length == 2 && columns[0].equals("count")
&& columns[1].equals("total");
}
public static boolean canGenerateMinMaxAvgCount(final ColumnInfo info) {
return ((info.getColumnIndex("min") >= 0)
&& (info.getColumnIndex("max") >= 0)
&& (info.getColumnIndex("total") >= 0)
&& (info.getColumnIndex("count") >= 0));
}
public static boolean canGenerateCount(final ColumnInfo info) {
return ((info.getColumnIndex("count") >= 0));
}
public static boolean canGenerateHeatChart(final ColumnInfo info) {
for (String mname : info.getColumnNames()) {
if (mname.startsWith("Q") && mname.contains("_")) {
return true;
}
}
return false;
}
private String generateMinMaxAvgCountChart(
final ColumnInfo info, final Pair<long[], long[][]> data,
final int width, final int height) throws IOException {
long[][] vals = data.getSecond();
double[] min = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("min"));
double[] max = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("max"));
double[] total = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("total"));
double[] count = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("count"));
for (int i = 0; i < count.length; i++) {
if (count[i] == 0) {
min[i] = 0;
max[i] = 0;
}
}
long[] timestamps = data.getFirst();
BufferedImage combined = Charts.createMinMaxAvgCountImg("Measurements for "
+ info.getGroupName() + " generated by spf4j",
timestamps, min, max, total, count, new String(info.getGroupMetaData(), Charsets.UTF_8), width, height);
File dbFile = new File(database.getDBFilePath());
File graphicFile = File.createTempFile(dbFile.getName() + "_" + fixName(info.getGroupName()), ".mmac.png",
dbFile.getParentFile());
ImageIO.write(combined, "png", graphicFile);
return graphicFile.getPath();
}
private String generateCountChart(
final String groupName, final long[][] timestamps,
final String[] measurementNames, final String[] measurementNames2,
final String uom1, final String uom2,
final double[][] measurements, final double[][] measurements2,
final int width, final int height) throws IOException {
BufferedImage count = Charts.createTimeSeriesJFreeChart("Measurements for "
+ groupName + " generated by spf4j",
timestamps, measurementNames, uom1, measurements).createBufferedImage(width, height / 2);
BufferedImage total = Charts.createTimeSeriesJFreeChart(null,
timestamps, measurementNames2, uom2, measurements2).createBufferedImage(width, height / 2);
BufferedImage combined = new BufferedImage(width, height, BufferedImage.TYPE_INT_RGB);
combined.getGraphics().drawImage(count, 0, 0, null);
combined.getGraphics().drawImage(total, 0, height / 2, null);
File dbFile = new File(database.getDBFilePath());
File graphicFile = File.createTempFile(dbFile.getName() + "_" + fixName(groupName), ".count.png",
dbFile.getParentFile());
ImageIO.write(combined, "png", graphicFile);
return graphicFile.getPath();
}
private String generateHeatChart(final ColumnInfo info, final Pair<long[], long[][]> data,
final int width, final int height) throws IOException {
JFreeChart chart = TimeSeriesDatabase.createHeatJFreeChart(data, info);
BufferedImage img = chart.createBufferedImage(width, height);
File dbFile = new File(database.getDBFilePath());
File graphicFile = File.createTempFile(dbFile.getName() + "_" + fixName(info.getGroupName()), ".dist.png",
dbFile.getParentFile());
ImageIO.write(img, "png", graphicFile);
return graphicFile.getAbsolutePath();
}
@Override
public List<String> generate(final Properties props) throws IOException {
int width = Integer.valueOf(props.getProperty("width", "1200"));
int height = Integer.valueOf(props.getProperty("height", "800"));
long startTime = Long.valueOf(props.getProperty("startTime",
Long.toString(new LocalDate().withDayOfWeek(DateTimeConstants.MONDAY).toDate().getTime())));
long endTime = Long.valueOf(props.getProperty("endTime", Long.toString(System.currentTimeMillis())));
return generateCharts(startTime, endTime, width, height);
}
@Override
public List<String> getParameters() {
return java.util.Arrays.asList("width", "height", "startTime", "endTime");
}
}