TimeSeriesDatabase.java
/*
* Copyright (c) 2001, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package org.spf4j.perf.tsdb;
import com.google.common.base.Charsets;
import org.spf4j.base.Pair;
import gnu.trove.list.array.TIntArrayList;
import gnu.trove.list.array.TLongArrayList;
import java.io.Closeable;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.jfree.chart.JFreeChart;
import org.spf4j.base.Arrays;
import org.spf4j.perf.impl.chart.Charts;
import static org.spf4j.perf.impl.chart.Charts.fillGaps;
/**
* Yet another time series database. Why? because all the other ts databases had
* various constraints that restrict the functionality I can add to spf4j.
*
* Initial Features:
*
* 1. measurements can be added dynamically anytime to a database. 2. long
* measurement names. 3. the stored interval is not known from the beginning. 4.
* implementation biased towards write performance.
*
* Future thoughts:
*
*
*
* @author zoly
*/
public final class TimeSeriesDatabase implements Closeable {
public static final int VERSION = 1;
private final Map<String, ColumnInfo> groups;
private final RandomAccessFile file;
private final Header header;
private final TableOfContents toc;
private ColumnInfo lastColumnInfo;
private final Map<String, DataFragment> writeDataFragments;
private final String pathToDatabaseFile;
public TimeSeriesDatabase(final String pathToDatabaseFile, final byte[] metaData) throws IOException {
this.pathToDatabaseFile = pathToDatabaseFile;
file = new RandomAccessFile(pathToDatabaseFile, "rw");
// read or create header
if (file.length() == 0) {
this.header = new Header(VERSION, metaData);
this.header.writeTo(file);
this.toc = new TableOfContents(file.getFilePointer());
this.toc.writeTo(file);
} else {
this.header = new Header(file);
this.toc = new TableOfContents(file);
}
groups = new HashMap<String, ColumnInfo>();
if (toc.getFirstColumnInfo() > 0) {
file.seek(toc.getFirstColumnInfo());
ColumnInfo colInfo = new ColumnInfo(file);
groups.put(colInfo.getGroupName(), colInfo);
lastColumnInfo = colInfo;
while (colInfo.getNextColumnInfo() > 0) {
file.seek(colInfo.getNextColumnInfo());
colInfo = new ColumnInfo(file);
groups.put(colInfo.getGroupName(), colInfo);
lastColumnInfo = colInfo;
}
}
writeDataFragments = new HashMap<String, DataFragment>();
}
@Override
public synchronized void close() throws IOException {
try {
flush();
} finally {
file.close();
}
}
public synchronized boolean hasColumnGroup(final String groupName) {
return groups.containsKey(groupName);
}
public synchronized void addColumnGroup(final String groupName,
final byte[] groupMetaData, final int sampleTime, final String[] columnNames,
final byte[][] metaData) throws IOException {
if (groups.containsKey(groupName)) {
throw new IllegalArgumentException("group already exists " + groupName);
}
//write column information at the end of the file.
flush();
file.seek(file.length());
ColumnInfo colInfo = new ColumnInfo(groupName, groupMetaData, columnNames,
metaData, sampleTime, file.getFilePointer());
colInfo.writeTo(file);
//update refferences to this new ColumnInfo.
if (lastColumnInfo != null) {
lastColumnInfo.setNextColumnInfo(colInfo.getLocation(), file);
} else {
toc.setFirstColumnInfo(colInfo.getLocation(), file);
}
toc.setLastColumnInfo(colInfo.getLocation(), file);
lastColumnInfo = colInfo;
groups.put(groupName, colInfo);
}
public synchronized void write(final long time, final String groupName, final long[] values) throws IOException {
if (!groups.containsKey(groupName)) {
throw new IllegalArgumentException("Unknown group name" + groupName);
}
DataFragment writeDataFragment = writeDataFragments.get(groupName);
if (writeDataFragment == null) {
writeDataFragment = new DataFragment(time);
writeDataFragments.put(groupName, writeDataFragment);
}
writeDataFragment.addData(time, values);
}
public synchronized void flush() throws IOException {
for (Map.Entry<String, DataFragment> entry : writeDataFragments.entrySet()) {
DataFragment writeDataFragment = entry.getValue();
String groupName = entry.getKey();
file.seek(file.length());
writeDataFragment.setLocation(file.getFilePointer());
writeDataFragment.writeTo(file);
ColumnInfo colInfo = groups.get(groupName);
if (colInfo.getLastDataFragment() != 0) {
DataFragment.setNextDataFragment(colInfo.getLastDataFragment(), writeDataFragment.getLocation(), file);
} else {
colInfo.setFirstDataFragment(writeDataFragment.getLocation(), file);
}
colInfo.setLastDataFragment(writeDataFragment.getLocation(), file);
}
writeDataFragments.clear();
sync();
}
public synchronized String[] getColumnNames(final String groupName) {
return groups.get(groupName).getColumnNames();
}
public synchronized ColumnInfo getColumnInfo(final String groupName) {
return groups.get(groupName);
}
public synchronized Collection<ColumnInfo> getColumnsInfo() {
return groups.values();
}
public synchronized Pair<long[], long[][]> readAll(final String groupName) throws IOException {
return read(groupName, 0, Long.MAX_VALUE);
}
public synchronized Pair<long[], long[][]> read(final String groupName,
final long startTime, final long endTime) throws IOException {
TLongArrayList timeStamps = new TLongArrayList();
List<long[]> data = new ArrayList<long[]>();
ColumnInfo info = groups.get(groupName);
if (info.getFirstDataFragment() > 0) {
DataFragment frag;
long nextFragmentLocation = info.getFirstDataFragment();
do {
file.seek(nextFragmentLocation);
frag = new DataFragment(file);
long fragStartTime = frag.getStartTimeMillis();
if (fragStartTime > startTime) {
TIntArrayList fragTimestamps = frag.getTimestamps();
int nr = 0;
for (int i = 0; i < fragTimestamps.size(); i++) {
long ts = fragStartTime + fragTimestamps.get(i);
if (ts < endTime) {
timeStamps.add(ts);
nr++;
} else {
break;
}
}
List<long[]> d = frag.getData();
for (int i = 0; i < nr; i++) {
data.add(d.get(i));
}
if (fragTimestamps.size() > nr) {
break;
}
}
nextFragmentLocation = frag.getNextDataFragment();
} while (nextFragmentLocation > 0);
}
return Pair.of(timeStamps.toArray(), data.toArray(new long[data.size()][]));
}
public synchronized void sync() throws IOException {
file.getFD().sync();
}
public String getDBFilePath() {
return pathToDatabaseFile;
}
public JFreeChart createHeatJFreeChart(final String groupName) throws IOException {
ColumnInfo info = this.getColumnInfo(groupName);
Pair<long[], long[][]> data = this.readAll(groupName);
return createHeatJFreeChart(data, info);
}
public JFreeChart createMinMaxAvgJFreeChart(final String groupName) throws IOException {
ColumnInfo info = this.getColumnInfo(groupName);
Pair<long[], long[][]> data = this.readAll(groupName);
return createMinMaxAvgJFreeChart(data, info);
}
public JFreeChart createCountJFreeChart(final String groupName) throws IOException {
ColumnInfo info = this.getColumnInfo(groupName);
Pair<long[], long[][]> data = this.readAll(groupName);
return createCountJFreeChart(data, info);
}
public static JFreeChart createHeatJFreeChart(final Pair<long[], long[][]> data, final ColumnInfo info) {
Pair<long[], double[][]> mData = fillGaps(data.getFirst(), data.getSecond(),
info.getSampleTime(), info.getColumnNames().length);
JFreeChart chart = Charts.createHeatJFreeChart(info.getColumnNames(),
mData.getSecond(), data.getFirst()[0], info.getSampleTime(),
new String(info.getGroupMetaData(), Charsets.UTF_8), "Measurements distribution for "
+ info.getGroupName() + " generated by spf4j");
return chart;
}
public static JFreeChart createMinMaxAvgJFreeChart(final Pair<long[], long[][]> data, final ColumnInfo info) {
long[][] vals = data.getSecond();
double[] min = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("min"));
double[] max = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("max"));
double[] total = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("total"));
double[] count = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("count"));
for (int i = 0; i < count.length; i++) {
if (count[i] == 0) {
min[i] = 0;
max[i] = 0;
}
}
long[] timestamps = data.getFirst();
return Charts.createTimeSeriesJFreeChart("Min,Max,Avg chart for "
+ info.getGroupName() + " generated by spf4j", timestamps,
new String[]{"min", "max", "avg"}, new String(info.getGroupMetaData(), Charsets.UTF_8),
new double[][]{min, max, Arrays.divide(total, count)});
}
public static JFreeChart createCountJFreeChart(final Pair<long[], long[][]> data, final ColumnInfo info) {
long[][] vals = data.getSecond();
double[] count = Arrays.getColumnAsDoubles(vals, info.getColumnIndex("count"));
long[] timestamps = data.getFirst();
return Charts.createTimeSeriesJFreeChart("count chart for "
+ info.getGroupName() + " generated by spf4j", timestamps,
new String[]{"count"}, "count", new double[][]{count});
}
@Override
public String toString() {
return "TimeSeriesDatabase{" + "groups=" + groups + ", pathToDatabaseFile=" + pathToDatabaseFile + '}';
}
}