GraphiteTcpStore.java

/*
 * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 * Additionally licensed with:
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.spf4j.perf.impl.ms.graphite;

import com.google.common.base.Throwables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import javax.net.SocketFactory;
import org.spf4j.base.HandlerNano;
import org.spf4j.base.UncheckedExecutionException;
import org.spf4j.base.UncheckedTimeoutException;
import org.spf4j.failsafe.RetryPolicy;
import org.spf4j.perf.MeasurementsInfo;
import org.spf4j.perf.MeasurementStore;
import org.spf4j.perf.MeasurementStoreQuery;
import org.spf4j.perf.impl.ms.Id2Info;
import static org.spf4j.perf.impl.ms.graphite.GraphiteUdpStore.writeMetric;
import org.spf4j.recyclable.ObjectCreationException;
import org.spf4j.recyclable.ObjectDisposeException;
import org.spf4j.recyclable.RecyclingSupplier;
import org.spf4j.recyclable.Template;
import org.spf4j.recyclable.impl.RecyclingSupplierBuilder;

/**
 *
 * @author zoly
 */
public final class GraphiteTcpStore implements MeasurementStore {

  private final RecyclingSupplier<Writer> socketWriterSupplier;

  private final InetSocketAddress address;

  @Override
  @Nullable
  public MeasurementStoreQuery query() {
    return null;
  }



  private static class WriterSupplierFactory implements RecyclingSupplier.Factory<Writer> {

    private final String hostName;
    private final int port;
    private final SocketFactory socketFactory;

    WriterSupplierFactory(final SocketFactory socketFactory, final String hostName, final int port) {
      this.hostName = hostName;
      this.port = port;
      this.socketFactory = socketFactory;
    }

    @Override
    public Writer create() throws ObjectCreationException {
      Socket socket;
      try {
        socket = socketFactory.createSocket(hostName, port);
      } catch (IOException ex) {
        throw new ObjectCreationException(ex);
      }
      try {
        return new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
      } catch (IOException ex) {
        try {
          socket.close();
        } catch (IOException ex1) {
          ex1.addSuppressed(ex);
          throw new ObjectCreationException(ex1);
        }
        throw new ObjectCreationException(ex);
      }
    }

    @Override
    public void dispose(final Writer object) throws ObjectDisposeException {
      try {
        object.close();
      } catch (IOException ex) {
        throw new ObjectDisposeException(ex);
      }
    }

    @Override
    public boolean validate(final Writer object, final Exception e) {
      return e == null || !(Throwables.getRootCause(e) instanceof IOException);
    }

  }

  public GraphiteTcpStore(final String hostPort) throws ObjectCreationException, URISyntaxException {
    this(new URI("graphiteTcp://" + hostPort));
  }

  public GraphiteTcpStore(final URI uri) throws ObjectCreationException {
    this(uri.getHost(), uri.getPort());
  }

  public GraphiteTcpStore(final String hostName, final int port) throws ObjectCreationException {
    this(hostName, port, SocketFactory.getDefault());
  }

  public GraphiteTcpStore(final String hostName, final int port, final SocketFactory socketFactory)
          throws ObjectCreationException {
    address = new InetSocketAddress(hostName, port);
    socketWriterSupplier = new RecyclingSupplierBuilder<>(1,
            new WriterSupplierFactory(socketFactory, hostName, port)).build();
  }

  @Override
  public long alocateMeasurements(final MeasurementsInfo measurement, final int sampleTimeMillis) {
    return Id2Info.getId(measurement);
  }

  @Override
  @SuppressFBWarnings("BED_BOGUS_EXCEPTION_DECLARATION") // fb nonsense
  public void saveMeasurements(final long tableId,
          final long timeStampMillis, final long... measurements) throws IOException {
    try {
      Template.doOnSupplied(new HandlerImpl(measurements, Id2Info.getInfo(tableId), timeStampMillis),
              1, TimeUnit.MINUTES,
              socketWriterSupplier, RetryPolicy.defaultPolicy(), IOException.class);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IOException("Interupted while saving measurements to " + tableId, ex);
    } catch (TimeoutException ex) {
      throw new UncheckedTimeoutException(ex);
    }
  }

  @Override
  public String toString() {
    return "GraphiteTcpStore{address=" + address + '}';
  }

  @Override
  public void close() {
    try {
      socketWriterSupplier.dispose();
    }  catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new UncheckedExecutionException(ex);
    } catch (ObjectDisposeException ex) {
      throw new UncheckedExecutionException(ex);
    }
  }

  private static class HandlerImpl implements HandlerNano<Writer, Void, IOException> {

    private final long[] measurements;
    private final MeasurementsInfo measurementInfo;
    private final long timeStampMillis;

    HandlerImpl(final long[] measurements, final MeasurementsInfo measurementInfo,
            final long timeStampMillis) {
      this.measurements = measurements;
      this.measurementInfo = measurementInfo;
      this.timeStampMillis = timeStampMillis;
    }

    @Override
    @Nullable
    public Void handle(final Writer socketWriter, final long deadline) throws IOException {
      for (int i = 0; i < measurements.length; i++) {
        writeMetric(measurementInfo, measurementInfo.getMeasurementName(i),
                measurements[i], timeStampMillis, socketWriter);
      }
      socketWriter.flush();
      return null;
    }
  }

  @Override
  public void flush() {
    // No buffering yet
  }

}