TSDBWriter.java

/*
 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Additionally licensed with:
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spf4j.tsdb2;

import com.google.common.io.ByteStreams;
import com.google.common.primitives.Longs;
import edu.umd.cs.findbugs.annotations.CreatesObligation;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.spf4j.base.Strings;
import org.spf4j.io.BufferedInputStream;
import org.spf4j.io.ByteArrayBuilder;
import org.spf4j.recyclable.impl.ArraySuppliers;
import org.spf4j.tsdb2.avro.DataBlock;
import org.spf4j.tsdb2.avro.DataRow;
import org.spf4j.tsdb2.avro.Header;
import org.spf4j.tsdb2.avro.TableDef;

/**
 * Writer for the second generation Time-Series database file format. The linked list structure from the first
 * generation format is dropped to reduce write overhead.
 *
 * <p>File layout, as produced by this class:</p>
 * <ol>
 * <li>the magic bytes {@code TSDB2} (UTF-8);</li>
 * <li>an 8 byte, most-significant-byte-first, end-of-data pointer: the file position up to which records have
 * been committed by {@link #flush()};</li>
 * <li>the Avro binary encoded {@link Header};</li>
 * <li>a sequence of Avro binary encoded records of type {@link #FILE_RECORD_SCHEMA}
 * (either a {@link TableDef} or a {@link DataBlock}).</li>
 * </ol>
 *
 * <p>A file lock is acquired ({@link FileChannel#lock()}) when the writer is created and the file is kept open
 * until {@link #close()} is called. All methods that write to the file are synchronized on this instance.</p>
 *
 * @author zoly
 */
public final class TSDBWriter implements Closeable, Flushable {

  /** Schema of the records that follow the header: a union of {@link TableDef} and {@link DataBlock}. */
  public static final Schema FILE_RECORD_SCHEMA
          = Schema.createUnion(Arrays.asList(TableDef.SCHEMA$, DataBlock.SCHEMA$));

  /** Magic bytes identifying the file type, located at the very beginning of the file. */
  static final byte[] MAGIC = Strings.toUtf8("TSDB2");

  /** Size in bytes of the end-of-data pointer that is stored right after {@link #MAGIC}. */
  private static final int EOF_PTR_SIZE = 8;

  private final File file;
  private final FileChannel channel;
  private final BinaryEncoder encoder;
  private final Header header;
  private final SpecificDatumWriter<Object> recordWriter = new SpecificDatumWriter<>(FILE_RECORD_SCHEMA);
  private final DataBlock writeBlock;
  private final int maxRowsPerBlock;
  private final RandomAccessFile raf;

  /** Scratch buffer every record is encoded into before being written to the file. */
  private final ByteArrayBuilder bab;

  /**
   * Opens (creating it if necessary) a TSDB2 file for writing and locks it.
   *
   * <p>If the file is empty, or {@code append} is false (the file is truncated in that case), a new header is
   * written. Otherwise the existing header is read and the writer is positioned at the end-of-data pointer
   * stored in the file.</p>
   *
   * @param file the file to write to.
   * @param maxRowsPerBlock number of buffered rows that triggers a flush on the next
   * {@link #writeDataRow(long, long, long...)} invocation.
   * @param description description stored in the header of a new file; must be null when appending to a
   * non empty file.
   * @param append false to truncate any existing content, true to continue writing an existing file.
   * @throws IOException if the file cannot be opened, locked, read or written, if it is not a TSDB2 file, or if
   * its end-of-data pointer is invalid.
   * @throws IllegalArgumentException if a description is provided when appending to a non empty file.
   */
  @CreatesObligation
  public TSDBWriter(final File file, final int maxRowsPerBlock,
          final String description, final boolean append) throws IOException {
    this.file = file;
    this.maxRowsPerBlock = maxRowsPerBlock;
    this.writeBlock = new DataBlock(System.currentTimeMillis(), new ArrayList<DataRow>(maxRowsPerBlock));
    this.bab = new ByteArrayBuilder(32768, ArraySuppliers.Bytes.JAVA_NEW);
    this.encoder = EncoderFactory.get().directBinaryEncoder(bab, null);
    this.raf = new RandomAccessFile(file, "rw");
    this.channel = raf.getChannel();
    Header hdr;
    try {
      channel.lock();
      if (!append) {
        raf.setLength(0);
        channel.force(true);
      }
      if (raf.length() <= 0) {
        // new file or overwrite, will write header;
        hdr = writeNewHeader(description);
      } else {
        if (description != null) {
          throw new IllegalArgumentException("Providing description when appending is not allowed for " + file);
        }
        hdr = readHeaderAndSeekToEndOfData();
      }
    } catch (IOException | RuntimeException | Error ex) {
      // Do not leak the open file (and the lock held through it) when initialization fails.
      try {
        raf.close();
      } catch (IOException closeEx) {
        ex.addSuppressed(closeEx);
      }
      throw ex;
    }
    this.header = hdr;
  }

  /**
   * Writes magic, end-of-data pointer and a newly built header at the current file position and forces them to
   * disk.
   */
  private Header writeNewHeader(final String description) throws IOException {
    bab.write(MAGIC);
    // placeholder for the end-of-data pointer, patched below once the header size is known.
    toOutputStream(0, bab);
    Header result = Header.newBuilder()
            .setContentSchema(FILE_RECORD_SCHEMA.toString())
            .setDescription(description)
            .build();
    SpecificDatumWriter<Header> headerWriter = new SpecificDatumWriter<>(Header.SCHEMA$);
    headerWriter.write(result, encoder);
    encoder.flush();
    byte[] buffer = bab.getBuffer();
    final int size = bab.size();
    // end of data is right after the header.
    toByteArray(size, buffer, MAGIC.length);
    raf.write(buffer, 0, size);
    channel.force(true);
    return result;
  }

  /**
   * Validates the magic, reads the header of the existing file and positions the writer at the stored
   * end-of-data pointer.
   */
  private Header readHeaderAndSeekToEndOfData() throws IOException {
    try (BufferedInputStream bis = new BufferedInputStream(Files.newInputStream(file.toPath()));
            DataInputStream dis = new DataInputStream(bis)) {
      validateType(dis);
      long endOfData = dis.readLong();
      SpecificDatumReader<Header> reader = new SpecificDatumReader<>(Header.getClassSchema());
      BinaryDecoder directBinaryDecoder = DecoderFactory.get().directBinaryDecoder(dis, null);
      Header result = reader.read(null, directBinaryDecoder);
      long fileLength = raf.length();
      // A pointer inside the magic/pointer area or beyond the end of the file means the file is corrupted;
      // writing there would overwrite the header or leave a hole in the file.
      if (endOfData < MAGIC.length + EOF_PTR_SIZE || endOfData > fileLength) {
        throw new IOException("Invalid end of data pointer " + endOfData + " in " + file
                + ", file length is " + fileLength);
      }
      raf.seek(endOfData);
      return result;
    }
  }

  /**
   * Checks that the stream starts with the TSDB2 magic bytes. Consumes {@code MAGIC.length} bytes.
   *
   * @param dis the stream to read from.
   * @throws IOException if the bytes read are not the expected magic, or they cannot be read.
   */
  static void validateType(final InputStream dis) throws IOException {
    byte[] readMagic = new byte[MAGIC.length];
    ByteStreams.readFully(dis, readMagic);
    if (!Arrays.equals(MAGIC, readMagic)) {
      throw new IOException("wrong file type, magic is " + Arrays.toString(readMagic));
    }
  }

  /**
   * Writes a table definition at the current file position. The id of the table definition is set to that
   * position.
   *
   * <p>This method does not advance the end-of-data pointer; the pointer is advanced by {@link #flush()} when
   * there are buffered rows to write.</p>
   *
   * @param tableDef the table definition; its id is overwritten.
   * @return the file position the definition was written at, which is also its id.
   * @throws IOException if writing fails.
   */
  public synchronized long writeTableDef(final TableDef tableDef) throws IOException {
    final long position = raf.getFilePointer();
    bab.reset();
    tableDef.setId(position);
    recordWriter.write(tableDef, encoder);
    encoder.flush();
    raf.write(bab.getBuffer(), 0, bab.size());
    return position;
  }

  /**
   * Buffers a data row. Buffered rows are written to the file by {@link #flush()}, which is invoked from here
   * when the buffer already holds {@code maxRowsPerBlock} rows.
   *
   * <p>Rows store their timestamp as an int offset from the base timestamp of the block. When the offset does
   * not fit in an int, the buffered rows are flushed and the block is re-based on {@code timestamp}, instead of
   * silently storing a truncated offset.</p>
   *
   * <p>The data array is wrapped, not copied, so it should not be modified by the caller until the row has been
   * flushed.</p>
   *
   * @param tableId id of the table the row belongs to, as returned by {@link #writeTableDef(TableDef)}.
   * @param timestamp timestamp of the row, in the same unit as {@link System#currentTimeMillis()}.
   * @param data the row values.
   * @throws IOException if a flush is needed and it fails.
   */
  public synchronized void writeDataRow(final long tableId, final long timestamp, final long... data)
          throws IOException {
    List<DataRow> blockValues = this.writeBlock.getValues();
    if (blockValues.size() >= this.maxRowsPerBlock) {
      flush();
    }
    long relativeTs = timestamp - writeBlock.getBaseTimestamp();
    if (relativeTs < Integer.MIN_VALUE || relativeTs > Integer.MAX_VALUE) {
      // rows already buffered are relative to the current base, they must be written before re-basing.
      flush();
      writeBlock.setBaseTimestamp(timestamp);
      relativeTs = 0;
    }
    blockValues.add(new DataRow((int) relativeTs, tableId, Longs.asList(data)));
  }

  /**
   * Flushes buffered rows and closes the file. Invoking this method on an already closed writer has no effect.
   *
   * @throws IOException if flushing or closing fails; the file is closed even if flushing fails.
   */
  @Override
  public synchronized void close() throws IOException {
    if (!channel.isOpen()) {
      return;
    }
    try (RandomAccessFile f = raf) {
      flush();
    }
  }

  /**
   * @return the file this writer writes to.
   */
  public File getFile() {
    return file;
  }

  /**
   * Stores a long as 8 bytes, most significant byte first, into {@code bytes[idx] .. bytes[idx + 7]}.
   *
   * @param pvalue the value to store.
   * @param bytes the destination array; must have at least 8 bytes available starting at idx.
   * @param idx index of the first (most significant) byte.
   */
  public static void toByteArray(final long pvalue, final byte[] bytes, final int idx) {
    long value = pvalue;
    for (int i = idx + 7; i >= idx; i--) {
      bytes[i] = (byte) (value & 0xffL);
      value >>= 8;
    }
  }

  /**
   * Writes a long as 8 bytes, most significant byte first. This is the same byte order
   * {@link #toByteArray(long, byte[], int)} uses and {@link DataInputStream#readLong()} expects.
   *
   * @param pvalue the value to write.
   * @param os the stream to write to.
   * @throws IOException if writing to the stream fails.
   */
  public static void toOutputStream(final long pvalue, final OutputStream os) throws IOException {
    for (int shift = 56; shift >= 0; shift -= 8) {
      os.write((int) (pvalue >>> shift) & 0xff);
    }
  }

  /**
   * Commits the data to disk. If there are buffered rows, they are written as one block and forced to disk
   * before the end-of-data pointer is updated, so the pointer never references data that was not written.
   *
   * @throws IOException if writing or forcing the data to disk fails.
   */
  @Override
  public synchronized void flush() throws IOException {
    List<DataRow> blockValues = writeBlock.getValues();
    if (!blockValues.isEmpty()) {
      bab.reset();
      this.recordWriter.write(writeBlock, this.encoder);
      encoder.flush();
      raf.write(bab.getBuffer(), 0, bab.size());
      // data must be on disk before the pointer that makes it visible is updated.
      channel.force(true);
      updateEOFPtrPointer();
      blockValues.clear();
    }
    channel.force(true);
  }

  /**
   * Stores the current file position as the end-of-data pointer (right after the magic bytes) and restores the
   * file position afterwards.
   */
  private void updateEOFPtrPointer() throws IOException {
    long filePointer = raf.getFilePointer();
    raf.seek(MAGIC.length);
    raf.writeLong(filePointer);
    raf.seek(filePointer);
  }

  /**
   * @return the header of the file, either the one written at creation or the one read when appending.
   */
  @SuppressFBWarnings("EI_EXPOSE_REP")
  public Header getHeader() {
    return header;
  }

  @Override
  public String toString() {
    return "TSDBWriter{" + "file=" + file + ", raf=" + raf + '}';
  }

}