TSDBReader.java
/*
* Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* Additionally licensed with:
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.spf4j.tsdb2;
import org.spf4j.io.CountingInputStream;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Longs;
import com.sun.nio.file.SensitivityWatchEventModifier;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.spf4j.base.Either;
import org.spf4j.base.ExecutionContexts;
import org.spf4j.base.Handler;
import org.spf4j.base.TimeSource;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.io.MemorizingBufferedInputStream;
import org.spf4j.tsdb2.avro.DataBlock;
import org.spf4j.tsdb2.avro.Header;
import org.spf4j.tsdb2.avro.TableDef;
/**
 * Reader for tsdb2 files.
 *
 * On construction the file type marker, the 8 byte size field and the avro encoded {@link Header}
 * are read. After that {@link #read()} returns the records stored in the file one at a time, each being
 * either a {@link TableDef} or a {@link DataBlock}. A file that is still being written can be tailed
 * with the {@code watch} / {@code bgWatch} methods.
 *
 * Methods that touch the underlying stream are synchronized on the reader instance.
 *
 * @author zoly
 */
@SuppressFBWarnings("IICU_INCORRECT_INTERNAL_CLASS_USE")
public final class TSDBReader implements Closeable {

  /**
   * Value of the {@code spf4j.tsdb2.lenientRead} system property. When true, a record that fails to decode
   * makes {@link #read()} return null instead of throwing.
   */
  private static final boolean CORRUPTION_LENIENT = Boolean.getBoolean("spf4j.tsdb2.lenientRead");

  /** Reader schema: a record is either a table definition or a data block. */
  private static final Schema R_SCHEMA = Schema.createUnion(Arrays.asList(TableDef.SCHEMA$, DataBlock.SCHEMA$));

  /** Longest single wait on the watch service; when it elapses without events the size is re-read anyway. */
  private static final long MAX_POLL_NANOS = TimeUnit.SECONDS.toNanos(1);

  /** Stream the records are decoded from; its count is used as the current file position. */
  private CountingInputStream bis;

  private final Header header;

  /** Last value read from the size field of the file; {@link #read()} stops at this position. */
  private long size;

  private BinaryDecoder decoder;

  private final SpecificDatumReader<Object> recordReader;

  /** Lazily opened by {@link #reReadSize()}, used only to re-read the size field. */
  private RandomAccessFile raf;

  private final File file;

  private final Path filePath;

  /** True while a watch loop is running; cleared by {@link #stopWatching()} to request its exit. */
  private volatile boolean watch;

  private final int bufferSize;

  private final SeekableByteChannel byteChannel;

  /**
   * Opens a tsdb2 file for reading from its first record.
   *
   * @param file the file to read.
   * @param bufferSize size passed to the buffered input stream wrapping the file channel.
   * @throws IOException if the file cannot be opened or its header cannot be read.
   */
  public TSDBReader(final File file, final int bufferSize) throws IOException {
    this(file, bufferSize, 0L);
  }

  /**
   * Opens a tsdb2 file, reads its header and positions the reader at the given offset.
   *
   * @param file the file to read.
   * @param bufferSize size passed to the buffered input stream wrapping the file channel.
   * @param from file position to continue reading from after the header was read; values &lt;= 0 mean
   * reading continues right after the header.
   * @throws IOException if the file cannot be opened or its header cannot be read.
   */
  public TSDBReader(final File file, final int bufferSize, final long from) throws IOException {
    this.file = file;
    this.filePath = file.toPath();
    this.bufferSize = bufferSize;
    this.byteChannel = Files.newByteChannel(filePath);
    try {
      resetStream(0);
      SpecificDatumReader<Header> reader = new SpecificDatumReader<>(Header.getClassSchema());
      TSDBWriter.validateType(bis);
      byte[] buff = new byte[8];
      ByteStreams.readFully(bis, buff);
      size = Longs.fromByteArray(buff);
      header = reader.read(null, decoder);
      // the schema stored in the header is the writer schema, R_SCHEMA is what this reader expects.
      recordReader = new SpecificDatumReader<>(
              new Schema.Parser().parse(header.getContentSchema()), R_SCHEMA);
      if (from > 0L) {
        resetStream(from);
      }
    } catch (IOException | RuntimeException ex) {
      // no instance is handed to the caller, so nobody else can release the channel opened above.
      try {
        byteChannel.close();
      } catch (IOException ex2) {
        ex.addSuppressed(ex2);
      }
      throw ex;
    }
  }

  /**
   * Moves the file channel to the given position and re-creates the stream and decoder on top of it.
   *
   * @param position file position to read from next; also the initial count of the counting stream.
   * @throws IOException if the channel cannot be positioned.
   */
  private void resetStream(final long position) throws IOException {
    byteChannel.position(position);
    bis = new CountingInputStream(new MemorizingBufferedInputStream(Channels.newInputStream(byteChannel),
            bufferSize), position);
    decoder = DecoderFactory.get().directBinaryDecoder(bis, decoder);
  }

  /**
   * Re-reads the size field of the file. Useful when implementing tailing.
   * If the value changed, the read stream is re-created at the current read position.
   *
   * @return true if size changed.
   * @throws IOException if the size field cannot be read.
   */
  public synchronized boolean reReadSize() throws IOException {
    if (raf == null) {
      raf = new RandomAccessFile(file, "r");
    }
    raf.seek(TSDBWriter.MAGIC.length);
    long old = size;
    size = raf.readLong();
    if (size != old) {
      resetStream(bis.getCount());
      return true;
    } else {
      return false;
    }
  }

  /**
   * Reads the next record.
   *
   * @return the next table definition (left) or data block (right); null when the current position reached
   * the known size, or when the record cannot be decoded and lenient reading is enabled.
   * @throws IOException if the record cannot be decoded (and lenient reading is off), or if a table
   * definition has an id different from the file position it was read from.
   */
  @Nullable
  public synchronized Either<TableDef, DataBlock> read() throws IOException {
    final long position = bis.getCount();
    if (position >= size) {
      return null;
    }
    Object result;
    try {
      result = recordReader.read(null, decoder);
    } catch (IOException | RuntimeException ex) {
      if (CORRUPTION_LENIENT) {
        return null;
      } else {
        throw new IOException("Error reading tsdb file at " + position + ", this= " + this, ex);
      }
    }
    if (result instanceof TableDef) {
      final TableDef td = (TableDef) result;
      long tdId = td.getId();
      if (position != tdId) {
        throw new IOException("Table Id should be equal with file position " + position + ", " + tdId);
      }
      return Either.left(td);
    } else {
      return Either.right((DataBlock) result);
    }
  }

  /**
   * Closes the read stream, the file channel and, if it was opened, the file used to re-read the size.
   *
   * @throws IOException if closing any of the resources fails.
   */
  @Override
  public synchronized void close() throws IOException {
    // resources are closed in reverse order: stream first, then the channel.
    // closing an already closed channel has no effect, so this is safe if the stream closed it already.
    try (SeekableByteChannel channel = byteChannel; InputStream is = bis) {
      if (raf != null) {
        raf.close();
      }
    }
  }

  /**
   * @return the size value as last read from the file.
   */
  public synchronized long getSize() {
    return size;
  }

  /**
   * @return the header decoded when the reader was created (the instance is not copied).
   */
  @SuppressFBWarnings("EI_EXPOSE_REP")
  public Header getHeader() {
    return header;
  }

  /**
   * Requests a running watch loop to exit. The loop checks the flag at the end of each iteration.
   */
  public void stopWatching() {
    watch = false;
  }

  /**
   * Starts watching the file on the default executor, with the deadline of the current execution context.
   *
   * @param <E> type of exception the handler can throw.
   * @param handler invoked with every record read.
   * @param es sensitivity of the file system watch.
   * @return future that completes when watching ends.
   */
  public synchronized <E extends Exception> Future<Void> bgWatch(
          final Handler<Either<TableDef, DataBlock>, E> handler,
          final EventSensitivity es) {
    return bgWatch(handler, es, ExecutionContexts.getContextDeadlineNanos());
  }

  /**
   * Starts watching the file on the default executor for at most the given amount of time.
   *
   * @param <E> type of exception the handler can throw.
   * @param handler invoked with every record read.
   * @param es sensitivity of the file system watch.
   * @param timeout how long to watch.
   * @param unit unit of the timeout.
   * @return future that completes when watching ends.
   */
  public synchronized <E extends Exception> Future<Void> bgWatch(
          final Handler<Either<TableDef, DataBlock>, E> handler,
          final EventSensitivity es, final long timeout, final TimeUnit unit) {
    return bgWatch(handler, es, TimeSource.nanoTime() + unit.toNanos(timeout));
  }

  /**
   * Starts watching the file on the default executor until the given deadline.
   *
   * @param <E> type of exception the handler can throw.
   * @param handler invoked with every record read.
   * @param es sensitivity of the file system watch.
   * @param deadlineNanos deadline, compared against {@link TimeSource#nanoTime()}.
   * @return future that completes when watching ends.
   */
  //CHECKSTYLE:OFF
  public synchronized <E extends Exception> Future<Void> bgWatch(
          final Handler<Either<TableDef, DataBlock>, E> handler,
          final EventSensitivity es, final long deadlineNanos) {
    //CHECKSTYLE:ON
    return DefaultExecutor.INSTANCE.submit(new Callable<Void>() {

      @Override
      public Void call() throws Exception {
        // the deadline must be handed over explicitly, otherwise the timeout overloads have no effect.
        watch(handler, es, deadlineNanos);
        return null;
      }
    });
  }

  /**
   * Sensitivity of the file system watch, mapped to the {@link SensitivityWatchEventModifier}
   * constant with the same name.
   */
  public enum EventSensitivity {
    HIGH, MEDIUM, LOW
  }

  /**
   * Watches the file in the calling thread, with the deadline of the current execution context.
   *
   * @param <E> type of exception the handler can throw.
   * @param handler invoked with every record read.
   * @param es sensitivity of the file system watch.
   * @throws IOException if reading the file or watching its directory fails.
   * @throws InterruptedException if the thread is interrupted while waiting for changes.
   * @throws E if the handler throws it.
   */
  public <E extends Exception> void watch(final Handler<Either<TableDef, DataBlock>, E> handler,
          final EventSensitivity es) throws IOException, InterruptedException, E {
    watch(handler, es, ExecutionContexts.getContextDeadlineNanos());
  }

  /**
   * Watches the file in the calling thread for at most the given amount of time.
   *
   * @param <E> type of exception the handler can throw.
   * @param handler invoked with every record read.
   * @param es sensitivity of the file system watch.
   * @param timeout how long to watch.
   * @param unit unit of the timeout.
   * @throws IOException if reading the file or watching its directory fails.
   * @throws InterruptedException if the thread is interrupted while waiting for changes.
   * @throws E if the handler throws it.
   */
  public <E extends Exception> void watch(final Handler<Either<TableDef, DataBlock>, E> handler,
          final EventSensitivity es, final long timeout, final TimeUnit unit)
          throws IOException, InterruptedException, E {
    watch(handler, es, TimeSource.nanoTime() + unit.toNanos(timeout));
  }

  /**
   * Watches the file in the calling thread until the deadline passes, {@link #stopWatching()} is called
   * or the watch key becomes invalid. Everything readable is passed to the handler first; after that the
   * size field is re-read whenever the parent directory reports events or the poll wait elapses, and
   * newly available records are passed to the handler.
   *
   * @param <E> type of exception the handler can throw.
   * @param handler invoked with every record read.
   * @param es sensitivity of the file system watch.
   * @param deadlineNanos deadline, compared against {@link TimeSource#nanoTime()}.
   * @throws IOException if reading the file or watching its directory fails.
   * @throws InterruptedException if the thread is interrupted while waiting for changes.
   * @throws E if the handler throws it.
   * @throws IllegalStateException if this reader is already being watched.
   */
  //CHECKSTYLE:OFF
  @SuppressFBWarnings("NOS_NON_OWNED_SYNCHRONIZATION")
  public <E extends Exception> void watch(final Handler<Either<TableDef, DataBlock>, E> handler,
          final EventSensitivity es, final long deadlineNanos)
          throws IOException, InterruptedException, E {
    //CHECKSTYLE:ON
    synchronized (this) {
      if (watch) {
        throw new IllegalStateException("File is already watched " + file);
      }
      watch = true;
    }
    // from here on any exit, including a failure during setup, must release the watch flag.
    try {
      final SensitivityWatchEventModifier sensitivity = toModifier(es);
      // resolve against the working directory first: a bare relative file name has no parent otherwise.
      final Path dir = filePath.toAbsolutePath().getParent();
      if (dir == null) {
        throw new IOException("Cannot determine the parent directory of " + file);
      }
      try (WatchService watchService = dir.getFileSystem().newWatchService()) {
        dir.register(watchService, new WatchEvent.Kind<?>[]{StandardWatchEventKinds.ENTRY_MODIFY,
          StandardWatchEventKinds.OVERFLOW
        }, sensitivity);
        readAll(handler, deadlineNanos);
        do {
          long tNanos = deadlineNanos - TimeSource.nanoTime();
          if (tNanos <= 0) {
            break;
          }
          // never wait past the deadline.
          WatchKey key = watchService.poll(Math.min(tNanos, MAX_POLL_NANOS), TimeUnit.NANOSECONDS);
          if (key == null) {
            // no event arrived, check the size anyway.
            if (reReadSize()) {
              readAll(handler, deadlineNanos);
            }
            continue;
          }
          if (!key.isValid()) {
            key.cancel();
            break;
          }
          if (!key.pollEvents().isEmpty() && reReadSize()) {
            readAll(handler, deadlineNanos);
          }
          if (!key.reset()) {
            key.cancel();
            break;
          }
        } while (watch);
      }
    } finally {
      watch = false;
    }
  }

  /**
   * Maps the public sensitivity enum to the watch event modifier with the same name.
   */
  private static SensitivityWatchEventModifier toModifier(final EventSensitivity es) {
    switch (es) {
      case LOW:
        return SensitivityWatchEventModifier.LOW;
      case MEDIUM:
        return SensitivityWatchEventModifier.MEDIUM;
      case HIGH:
        return SensitivityWatchEventModifier.HIGH;
      default:
        throw new UnsupportedOperationException("Unsupported sensitivity " + es);
    }
  }

  /**
   * Reads records until {@link #read()} returns null, passing each one to the handler.
   *
   * @param <E> type of exception the handler can throw.
   * @param handler invoked with every record and the deadline.
   * @param deadlineNanos deadline handed to the handler with every record.
   * @throws IOException if reading a record fails.
   * @throws E if the handler throws it.
   */
  //CHECKSTYLE:OFF
  public synchronized <E extends Exception> void readAll(final Handler<Either<TableDef, DataBlock>, E> handler,
          final long deadlineNanos)
          throws IOException, E {
    //CHECKSTYLE:ON
    Either<TableDef, DataBlock> data;
    while ((data = read()) != null) {
      handler.handle(data, deadlineNanos);
    }
  }

  /**
   * Reads records until {@link #read()} returns null, passing each one to the consumer.
   *
   * @param consumer invoked with every record.
   * @throws IOException if reading a record fails.
   */
  public synchronized void readAll(final Consumer<Either<TableDef, DataBlock>> consumer)
          throws IOException {
    Either<TableDef, DataBlock> data;
    while ((data = read()) != null) {
      consumer.accept(data);
    }
  }

  @Override
  public String toString() {
    return "TSDBReader{" + "size=" + size + ", raf=" + raf + ", file=" + file + '}';
  }

}