AvroMeasurementStore.java
/*
* Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Additionally licensed with:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.spf4j.perf.impl.ms.tsdb;
import com.google.common.primitives.Longs;
import org.spf4j.perf.MeasurementsInfo;
import org.spf4j.perf.MeasurementStore;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.spf4j.jmx.JmxExport;
import org.spf4j.perf.MeasurementStoreQuery;
import org.spf4j.tsdb2.TableDefs;
import org.spf4j.tsdb2.avro.Observation;
import org.spf4j.tsdb2.avro.TableDef;
/**
*
* @author zoly
*/
@ThreadSafe
public final class AvroMeasurementStore
implements MeasurementStore {
private CodecFactory codecFact;
private DataFileWriter<TableDef> infoWriter;
private DataFileWriter<Observation> dataWriter;
private Path infoFile;
private Path dataFile;
private long ids;
private final long timeRef;
private final AvroMeasurementStoreReader reader;
public enum Compressor {
SNAPPY, ZSTANDARD
}
private static Compressor getConfiguredCompressor() {
if (Boolean.parseBoolean(System.getProperty("spf4j.perf.avro.snappyEnable", "false"))) {
return Compressor.SNAPPY;
}
return Compressor.valueOf(System.getProperty("spf4j.perf.avro.compressor", "ZSTANDARD"));
}
public AvroMeasurementStore(final Path destinationPath, final String fileNameBase) throws IOException {
this(destinationPath, fileNameBase, getConfiguredCompressor());
}
public AvroMeasurementStore(final Path destinationPath, final String fileNameBase,
@Nullable final Compressor compressor)
throws IOException {
if (compressor != null) {
switch (compressor) {
case SNAPPY:
try {
Class.forName("org.xerial.snappy.Snappy");
codecFact = CodecFactory.snappyCodec();
} catch (ClassNotFoundException | UnsatisfiedLinkError ex) {
Logger.getLogger(AvroMeasurementStore.class.getName())
.info("Snappy compression not available for metrics store");
codecFact = null;
}
break;
case ZSTANDARD:
try {
Class.forName("com.github.luben.zstd.Zstd");
codecFact = CodecFactory.zstandardCodec(-2, true);
} catch (ClassNotFoundException | UnsatisfiedLinkError ex) {
Logger.getLogger(AvroMeasurementStore.class.getName())
.info("Zstandard compression not available for metrics store");
codecFact = null;
}
break;
default:
throw new UnsupportedOperationException("Unsuppored compressor " + compressor);
}
} else {
codecFact = null;
}
AvroFileInfo<TableDef> info = initWriter(fileNameBase, destinationPath, true, TableDef.class);
this.infoFile = info.getFilePath();
this.infoWriter = info.getFileWriter();
AvroFileInfo<Observation> data = initWriter(fileNameBase, destinationPath, false, Observation.class);
this.dataFile = data.getFilePath();
this.dataWriter = data.getFileWriter();
timeRef = data.getFileEpoch();
reader = new AvroMeasurementStoreReader(infoFile, dataFile);
}
private <T extends SpecificRecord>
AvroFileInfo<T> initWriter(final String fileNameBase,
final Path destinationPath,
final boolean countEntries,
final Class<T> clasz) throws IOException {
DataFileWriter<T> writer = new DataFileWriter<>(new SpecificDatumWriter<>(clasz));
if (codecFact != null) {
writer.setCodec(codecFact);
}
long epoch = System.currentTimeMillis();
writer.setMeta("timeRef", epoch);
String fileName = fileNameBase + '.' + clasz.getSimpleName().toLowerCase(Locale.US) + ".avro";
Path file = destinationPath.resolve(fileName);
long initNrRecords;
if (Files.isWritable(file)) {
try (DataFileStream<T> streamReader = new DataFileStream<>(Files.newInputStream(file),
new SpecificDatumReader<>(clasz))) {
if (countEntries) {
long count = 0L;
while (streamReader.hasNext()) {
count += streamReader.getBlockCount();
streamReader.nextBlock();
}
initNrRecords = count;
} else {
initNrRecords = -1L;
}
epoch = streamReader.getMetaLong("timeRef");
}
writer = writer.appendTo(file.toFile());
} else {
try {
writer.create(clasz.getConstructor().newInstance().getSchema(), file.toFile());
} catch (InstantiationException | IllegalAccessException
| NoSuchMethodException | InvocationTargetException ex) {
throw new RuntimeException(ex);
}
initNrRecords = 0L;
}
return new AvroFileInfo<>(file, writer, epoch, initNrRecords);
}
@Override
public long alocateMeasurements(final MeasurementsInfo measurement,
final int sampleTimeMillis) throws IOException {
synchronized (infoWriter) {
long id = ids++;
infoWriter.append(TableDefs.from(measurement, sampleTimeMillis, id));
return id;
}
}
public static <T extends SpecificRecord> long getNrRecords(final Path avroFile, final Class<T> clasz)
throws IOException {
try (DataFileStream<T> streamReader = new DataFileStream<>(Files.newInputStream(avroFile),
new SpecificDatumReader<>(clasz))) {
long count = 0L;
while (streamReader.hasNext()) {
count += streamReader.getBlockCount();
streamReader.nextBlock();
}
return count;
}
}
@Override
public void saveMeasurements(final long tableId,
final long timeStampMillis, final long... measurements)
throws IOException {
synchronized (dataWriter) {
dataWriter.append(new Observation(timeStampMillis - timeRef, tableId, Longs.asList(measurements)));
}
}
@Override
public void close() throws IOException {
synchronized (infoWriter) {
infoWriter.close();
}
synchronized (dataWriter) {
dataWriter.close();
}
}
@JmxExport(description = "flush out buffers")
@Override
public void flush() throws IOException {
synchronized (infoWriter) {
infoWriter.flush();
}
synchronized (dataWriter) {
dataWriter.flush();
}
}
public Path getInfoFile() {
return infoFile;
}
public Path getDataFile() {
return dataFile;
}
@Override
public String toString() {
return "AvroMeasurementStore{" + "codecFact=" + codecFact + ", infoWriter=" + infoWriter
+ ", dataWriter=" + dataWriter + ", infoFile=" + infoFile + ", dataFile=" + dataFile
+ ", ids=" + ids + ", timeRef=" + timeRef + '}';
}
@Override
public MeasurementStoreQuery query() {
return reader;
}
}