View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.tsdb2;
33  
34  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
35  import java.time.Instant;
36  import java.util.ArrayList;
37  import java.util.List;
38  import org.apache.avro.Schema;
39  import org.apache.avro.generic.GenericData;
40  import org.apache.avro.generic.GenericRecord;
41  import org.spf4j.avro.AvroCompatUtils;
42  import org.spf4j.base.LangIdEncDec;
43  import org.spf4j.perf.MeasurementsInfo;
44  import org.spf4j.perf.TimeSeriesRecord;
45  import org.spf4j.tsdb2.avro.Aggregation;
46  import org.spf4j.tsdb2.avro.ColumnDef;
47  import org.spf4j.tsdb2.avro.DataRow;
48  import org.spf4j.tsdb2.avro.MeasurementType;
49  import org.spf4j.tsdb2.avro.Observation;
50  import org.spf4j.tsdb2.avro.TableDef;
51  import org.spf4j.tsdb2.avro.Type;
52  
53  /**
54   * @author Zoltan Farkas
55   */
56  public final class TableDefs {
57  
58    private static final Schema INSTANT_SCHEMA
59            = new Schema.Parser().parse("{\"type\":\"string\",\"logicalType\":\"instant\"}");
60  
61    private TableDefs() {
62    }
63  
64    public static TableDef from(final MeasurementsInfo measurement, final int sampleTimeMillis,
65            final long id) {
66      int numberOfMeasurements = measurement.getNumberOfMeasurements();
67      List<ColumnDef> columns = new ArrayList<>(numberOfMeasurements);
68      for (int i = 0; i < numberOfMeasurements; i++) {
69        String mname = measurement.getMeasurementName(i);
70        String unit = measurement.getMeasurementUnit(i);
71        Aggregation aggregation = measurement.getMeasurementAggregation(i);
72        ColumnDef cd = new ColumnDef(mname, Type.LONG, unit, "", aggregation);
73        columns.add(cd);
74      }
75      return  new TableDef(id,
76              measurement.getMeasuredEntity().toString(),
77              "", columns, sampleTimeMillis, measurement.getMeasurementType());
78    }
79  
80    public static String sanitizeName(final String name) {
81      return LangIdEncDec.lossyEncode(name);
82    }
83  
84    public static Schema createSchema(final TableDef td) {
85      String rawName = td.getName();
86      Schema recSchema = AvroCompatUtils.createRecordSchema(sanitizeName(rawName),
87              td.getDescription(), null, false, false);
88      List<ColumnDef> columns = td.getColumns();
89      List<Schema.Field> fields = new ArrayList<>(columns.size() + 1);
90      fields.add(AvroCompatUtils.createField("ts", INSTANT_SCHEMA, "Measurement time stamp", null, true, false,
91              Schema.Field.Order.IGNORE));
92      for (ColumnDef cd : columns) {
93        Type type = cd.getType();
94        String rawFieldName = cd.getName();
95        String fieldName = sanitizeName(rawFieldName);
96        switch (type) {
97          case DOUBLE:
98            Schema schema = Schema.create(Schema.Type.DOUBLE);
99            schema.addProp(TimeSeriesRecord.UNIT_TYPE_PROP, cd.getUnitOfMeasurement());
100           schema.addProp(TimeSeriesRecord.AGGREGATION_TYPE_PROP, cd.getAggregation().toString());
101           Schema.Field field = AvroCompatUtils.createField(fieldName, schema, cd.getDescription(), null, true, false,
102                 Schema.Field.Order.IGNORE);
103           field.addProp(TimeSeriesRecord.RAW_NAME, rawFieldName);
104           fields.add(field);
105           break;
106         case LONG:
107           schema = Schema.create(Schema.Type.LONG);
108           schema.addProp(TimeSeriesRecord.UNIT_TYPE_PROP, cd.getUnitOfMeasurement());
109           schema.addProp(TimeSeriesRecord.AGGREGATION_TYPE_PROP, cd.getAggregation().toString());
110           field = AvroCompatUtils.createField(fieldName, schema, cd.getDescription(), null, true, false,
111                 Schema.Field.Order.IGNORE);
112           field.addProp(TimeSeriesRecord.RAW_NAME, rawFieldName);
113           fields.add(field);
114           break;
115         default:
116           throw new IllegalStateException("Invalid data type " + type);
117       }
118     }
119     recSchema.setFields(fields);
120     int sampleTime = td.getSampleTime();
121     if (sampleTime > 0) {
122       recSchema.addProp(TimeSeriesRecord.FREQ_MILLIS_REC_PROP, sampleTime);
123     }
124     recSchema.addProp(TimeSeriesRecord.MEASUREMENT_TYPE_PROP, getMeasurementType(td));
125     recSchema.addProp(TimeSeriesRecord.RAW_NAME, rawName);
126     return recSchema;
127   }
128 
129   public static TimeSeriesRecord toRecord(final Schema rSchema, final long baseTs, final DataRow row) {
130     GenericRecord rec = new GenericData.Record(rSchema);
131     long ts = baseTs + row.getRelTimeStamp();
132     rec.put(0, Instant.ofEpochMilli(ts));
133     List<Long> nrs = row.getData();
134     List<Schema.Field> fields = rSchema.getFields();
135     for (int i = 1, l = fields.size(), j = 0; i < l; i++, j++) {
136       Schema.Type type = fields.get(i).schema().getType();
137       switch (type) {
138         case DOUBLE:
139           rec.put(i, Double.longBitsToDouble(nrs.get(j)));
140           break;
141         case LONG:
142           rec.put(i, nrs.get(j));
143           break;
144         default:
145           throw new IllegalStateException("Unsupported data type: " + type);
146       }
147     }
148     return TimeSeriesRecord.from(rec);
149   }
150 
151 
152  public static TimeSeriesRecord toRecord(final Schema rSchema, final Observation row) {
153     GenericRecord rec = new GenericData.Record(rSchema);
154     rec.put(0, Instant.ofEpochMilli(row.getRelTimeStamp()));
155     List<Long> nrs = row.getData();
156     List<Schema.Field> fields = rSchema.getFields();
157     for (int i = 1, l = fields.size(), j = 0; i < l; i++, j++) {
158       Schema.Type type = fields.get(i).schema().getType();
159       switch (type) {
160         case DOUBLE:
161           rec.put(i, Double.longBitsToDouble(nrs.get(j)));
162           break;
163         case LONG:
164           rec.put(i, nrs.get(j));
165           break;
166         default:
167           throw new IllegalStateException("Unsupported data type: " + type);
168       }
169     }
170     return TimeSeriesRecord.from(rec);
171   }
172 
173   @SuppressFBWarnings("STT_STRING_PARSING_A_FIELD")
174   public static MeasurementType getMeasurementType(final TableDef info) {
175     MeasurementType measurementType = info.getMeasurementType();
176     if (measurementType != MeasurementType.UNTYPED) {
177       return measurementType;
178     }
179     boolean hasCount = false;
180     for (ColumnDef colDef : info.getColumns()) {
181       String colName = colDef.getName();
182       if (colName.startsWith("Q") && colName.contains("_")) {
183         return MeasurementType.HISTOGRAM;
184       } else if ("sum".equals(colName)) {
185         return MeasurementType.SUMMARY;
186       } else if ("count".equals(colName)) {
187         hasCount = true;
188       }
189     }
190     if (hasCount) {
191       return MeasurementType.COUNTER;
192     }
193     return MeasurementType.UNTYPED;
194   }
195 
196 }