GraphiteUdpStore.java

/*
 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Additionally licensed with:
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spf4j.perf.impl.ms.graphite;

import com.google.common.base.Throwables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.spf4j.base.HandlerNano;
import org.spf4j.base.Strings;
import org.spf4j.base.UncheckedExecutionException;
import org.spf4j.base.UncheckedTimeoutException;
import org.spf4j.failsafe.RetryPolicy;
import org.spf4j.io.ByteArrayBuilder;
import org.spf4j.perf.MeasurementsInfo;
import org.spf4j.perf.MeasurementStore;
import org.spf4j.perf.MeasurementStoreQuery;
import org.spf4j.perf.impl.ms.Id2Info;
import org.spf4j.recyclable.ObjectCreationException;
import org.spf4j.recyclable.ObjectDisposeException;
import org.spf4j.recyclable.RecyclingSupplier;
import org.spf4j.recyclable.Template;
import org.spf4j.recyclable.impl.RecyclingSupplierBuilder;

/**
 *
 * @author zoly
 */
public final class GraphiteUdpStore implements MeasurementStore {

  public static final int MAX_UDP_MSG_SIZE = 512;

  private final RecyclingSupplier<DatagramChannel> datagramChannelSupplier;

  private final InetSocketAddress address;

  @Override
  @Nullable
  public MeasurementStoreQuery query() {
    return null;
  }


  private static class DatagramChannelSupplierFactory implements RecyclingSupplier.Factory<DatagramChannel> {

    private final InetSocketAddress address;

    DatagramChannelSupplierFactory(final InetSocketAddress address) {
      this.address = address;
    }

    @Override
    public DatagramChannel create() throws ObjectCreationException {
      DatagramChannel datagramChannel;
      try {
        datagramChannel = DatagramChannel.open();
      } catch (IOException ex) {
        throw new ObjectCreationException(ex);
      }
      try {
        datagramChannel.connect(address);
        return datagramChannel;
      } catch (IOException ex) {
        try {
          datagramChannel.close();
        } catch (IOException ex1) {
          ex1.addSuppressed(ex);
          throw new ObjectCreationException(ex1);
        }
        throw new ObjectCreationException(ex);
      }
    }

    @Override
    public void dispose(final DatagramChannel object) throws ObjectDisposeException {
      try {
        object.close();
      } catch (IOException ex) {
        throw new ObjectDisposeException(ex);
      }
    }

    @Override
    public boolean validate(final DatagramChannel object, final Exception e) throws Exception {
      return e == null || !(Throwables.getRootCause(e) instanceof IOException);
    }

  }

  public GraphiteUdpStore(final String hostPort) throws ObjectCreationException, URISyntaxException {
    this(new URI("graphiteUdp://" + hostPort));
  }

  public GraphiteUdpStore(final URI uri) throws ObjectCreationException {
    this(uri.getHost(), uri.getPort());
  }

  public GraphiteUdpStore(final String hostName, final int port) throws ObjectCreationException {
    address = new InetSocketAddress(hostName, port);
    datagramChannelSupplier = new RecyclingSupplierBuilder<>(1,
            new DatagramChannelSupplierFactory(address)).build();
  }

  @Override
  public void flush() {
    // No buffering yet
  }

  @Override
  public long alocateMeasurements(final MeasurementsInfo measurement, final int sampleTimeMillis) {
    return Id2Info.getId(measurement);
  }

  @Override
  @SuppressFBWarnings("BED_BOGUS_EXCEPTION_DECLARATION") // fb nonsense
  public void saveMeasurements(final long tableId,
          final long timeStampMillis, final long... measurements) throws IOException {

    try {
      Template.doOnSupplied(new HandlerImpl(measurements, Id2Info.getInfo(tableId), timeStampMillis),
              1, TimeUnit.MINUTES,
              datagramChannelSupplier, RetryPolicy.defaultPolicy(), IOException.class);
    } catch (TimeoutException ex) {
      throw new UncheckedTimeoutException(ex);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

  }

  /**
   * Write with the plaintext protocol: https://graphite.readthedocs.io/en/0.9.10/feeding-carbon.html
   *
   * @param measurementInfo measuremrnt info
   * @param measurementName measurement name
   * @param measurement measurement value
   * @param timeStampMillis timestamp millis since epoch.
   * @param os the output writer to write to.
   * @throws IOException
   */
  public static void writeMetric(final MeasurementsInfo measurementInfo, final String measurementName,
          final long measurement, final long timeStampMillis, final Writer os)
          throws IOException {
    Strings.writeReplaceWhitespaces(measurementInfo.getMeasuredEntity().toString(), '-', os);
    os.append('/');
    Strings.writeReplaceWhitespaces(measurementName, '-', os);
    os.append(' ');
    os.append(Long.toString(measurement));
    os.append(' ');
    os.append(Long.toString(timeStampMillis));
    os.append('\n');
  }

  @Override
  public String toString() {
    return "GraphiteUdpStore{address=" + address + '}';
  }

  @Override
  public void close() {
    try {
      datagramChannelSupplier.dispose();
    } catch (ObjectDisposeException | InterruptedException ex) {
      throw new UncheckedExecutionException(ex);
    }
  }

  private static class HandlerImpl implements HandlerNano<DatagramChannel, Void, IOException> {

    private final long[] measurements;
    private final MeasurementsInfo measurementInfo;
    private final long timeStampMillis;

    HandlerImpl(final long[] measurements, final MeasurementsInfo measurementInfo,
            final long timeStampMillis) {
      this.measurements = measurements;
      this.measurementInfo = measurementInfo;
      this.timeStampMillis = timeStampMillis;
    }

    @Override
    @Nullable
    public Void handle(final DatagramChannel datagramChannel, final long deadline) throws IOException {
      try (ByteArrayBuilder bos = new ByteArrayBuilder();
              OutputStreamWriter os = new OutputStreamWriter(bos, StandardCharsets.UTF_8)) {

        int msgStart = 0;
        int msgEnd = 0;
        int prevEnd = 0;

        for (int i = 0; i < measurements.length; i++) {
          writeMetric(measurementInfo, measurementInfo.getMeasurementName(i),
                  measurements[i], timeStampMillis, os);
          os.flush();
          msgEnd = bos.size();
          int length = msgEnd - msgStart;
          if (length > MAX_UDP_MSG_SIZE) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(bos.getBuffer(), msgStart, prevEnd - msgStart);
            datagramChannel.write(byteBuffer);
            msgStart = prevEnd;
          }
          prevEnd = msgEnd;
        }
        if (msgEnd > msgStart) {
          ByteBuffer byteBuffer = ByteBuffer.wrap(bos.getBuffer(), msgStart, msgEnd - msgStart);
          datagramChannel.write(byteBuffer);
        }
      }
      return null;
    }
  }

}