1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.tsdb2;
33
34 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
35 import java.time.Instant;
36 import java.util.ArrayList;
37 import java.util.List;
38 import org.apache.avro.Schema;
39 import org.apache.avro.generic.GenericData;
40 import org.apache.avro.generic.GenericRecord;
41 import org.spf4j.avro.AvroCompatUtils;
42 import org.spf4j.base.LangIdEncDec;
43 import org.spf4j.perf.MeasurementsInfo;
44 import org.spf4j.perf.TimeSeriesRecord;
45 import org.spf4j.tsdb2.avro.Aggregation;
46 import org.spf4j.tsdb2.avro.ColumnDef;
47 import org.spf4j.tsdb2.avro.DataRow;
48 import org.spf4j.tsdb2.avro.MeasurementType;
49 import org.spf4j.tsdb2.avro.Observation;
50 import org.spf4j.tsdb2.avro.TableDef;
51 import org.spf4j.tsdb2.avro.Type;
52
53
54
55
56 public final class TableDefs {
57
58 private static final Schema INSTANT_SCHEMA
59 = new Schema.Parser().parse("{\"type\":\"string\",\"logicalType\":\"instant\"}");
60
61 private TableDefs() {
62 }
63
64 public static TableDef from(final MeasurementsInfo measurement, final int sampleTimeMillis,
65 final long id) {
66 int numberOfMeasurements = measurement.getNumberOfMeasurements();
67 List<ColumnDef> columns = new ArrayList<>(numberOfMeasurements);
68 for (int i = 0; i < numberOfMeasurements; i++) {
69 String mname = measurement.getMeasurementName(i);
70 String unit = measurement.getMeasurementUnit(i);
71 Aggregation aggregation = measurement.getMeasurementAggregation(i);
72 ColumnDef cd = new ColumnDef(mname, Type.LONG, unit, "", aggregation);
73 columns.add(cd);
74 }
75 return new TableDef(id,
76 measurement.getMeasuredEntity().toString(),
77 "", columns, sampleTimeMillis, measurement.getMeasurementType());
78 }
79
80 public static String sanitizeName(final String name) {
81 return LangIdEncDec.lossyEncode(name);
82 }
83
84 public static Schema createSchema(final TableDef td) {
85 String rawName = td.getName();
86 Schema recSchema = AvroCompatUtils.createRecordSchema(sanitizeName(rawName),
87 td.getDescription(), null, false, false);
88 List<ColumnDef> columns = td.getColumns();
89 List<Schema.Field> fields = new ArrayList<>(columns.size() + 1);
90 fields.add(AvroCompatUtils.createField("ts", INSTANT_SCHEMA, "Measurement time stamp", null, true, false,
91 Schema.Field.Order.IGNORE));
92 for (ColumnDef cd : columns) {
93 Type type = cd.getType();
94 String rawFieldName = cd.getName();
95 String fieldName = sanitizeName(rawFieldName);
96 switch (type) {
97 case DOUBLE:
98 Schema schema = Schema.create(Schema.Type.DOUBLE);
99 schema.addProp(TimeSeriesRecord.UNIT_TYPE_PROP, cd.getUnitOfMeasurement());
100 schema.addProp(TimeSeriesRecord.AGGREGATION_TYPE_PROP, cd.getAggregation().toString());
101 Schema.Field field = AvroCompatUtils.createField(fieldName, schema, cd.getDescription(), null, true, false,
102 Schema.Field.Order.IGNORE);
103 field.addProp(TimeSeriesRecord.RAW_NAME, rawFieldName);
104 fields.add(field);
105 break;
106 case LONG:
107 schema = Schema.create(Schema.Type.LONG);
108 schema.addProp(TimeSeriesRecord.UNIT_TYPE_PROP, cd.getUnitOfMeasurement());
109 schema.addProp(TimeSeriesRecord.AGGREGATION_TYPE_PROP, cd.getAggregation().toString());
110 field = AvroCompatUtils.createField(fieldName, schema, cd.getDescription(), null, true, false,
111 Schema.Field.Order.IGNORE);
112 field.addProp(TimeSeriesRecord.RAW_NAME, rawFieldName);
113 fields.add(field);
114 break;
115 default:
116 throw new IllegalStateException("Invalid data type " + type);
117 }
118 }
119 recSchema.setFields(fields);
120 int sampleTime = td.getSampleTime();
121 if (sampleTime > 0) {
122 recSchema.addProp(TimeSeriesRecord.FREQ_MILLIS_REC_PROP, sampleTime);
123 }
124 recSchema.addProp(TimeSeriesRecord.MEASUREMENT_TYPE_PROP, getMeasurementType(td));
125 recSchema.addProp(TimeSeriesRecord.RAW_NAME, rawName);
126 return recSchema;
127 }
128
129 public static TimeSeriesRecord toRecord(final Schema rSchema, final long baseTs, final DataRow row) {
130 GenericRecord rec = new GenericData.Record(rSchema);
131 long ts = baseTs + row.getRelTimeStamp();
132 rec.put(0, Instant.ofEpochMilli(ts));
133 List<Long> nrs = row.getData();
134 List<Schema.Field> fields = rSchema.getFields();
135 for (int i = 1, l = fields.size(), j = 0; i < l; i++, j++) {
136 Schema.Type type = fields.get(i).schema().getType();
137 switch (type) {
138 case DOUBLE:
139 rec.put(i, Double.longBitsToDouble(nrs.get(j)));
140 break;
141 case LONG:
142 rec.put(i, nrs.get(j));
143 break;
144 default:
145 throw new IllegalStateException("Unsupported data type: " + type);
146 }
147 }
148 return TimeSeriesRecord.from(rec);
149 }
150
151
152 public static TimeSeriesRecord toRecord(final Schema rSchema, final Observation row) {
153 GenericRecord rec = new GenericData.Record(rSchema);
154 rec.put(0, Instant.ofEpochMilli(row.getRelTimeStamp()));
155 List<Long> nrs = row.getData();
156 List<Schema.Field> fields = rSchema.getFields();
157 for (int i = 1, l = fields.size(), j = 0; i < l; i++, j++) {
158 Schema.Type type = fields.get(i).schema().getType();
159 switch (type) {
160 case DOUBLE:
161 rec.put(i, Double.longBitsToDouble(nrs.get(j)));
162 break;
163 case LONG:
164 rec.put(i, nrs.get(j));
165 break;
166 default:
167 throw new IllegalStateException("Unsupported data type: " + type);
168 }
169 }
170 return TimeSeriesRecord.from(rec);
171 }
172
173 @SuppressFBWarnings("STT_STRING_PARSING_A_FIELD")
174 public static MeasurementType getMeasurementType(final TableDef info) {
175 MeasurementType measurementType = info.getMeasurementType();
176 if (measurementType != MeasurementType.UNTYPED) {
177 return measurementType;
178 }
179 boolean hasCount = false;
180 for (ColumnDef colDef : info.getColumns()) {
181 String colName = colDef.getName();
182 if (colName.startsWith("Q") && colName.contains("_")) {
183 return MeasurementType.HISTOGRAM;
184 } else if ("sum".equals(colName)) {
185 return MeasurementType.SUMMARY;
186 } else if ("count".equals(colName)) {
187 hasCount = true;
188 }
189 }
190 if (hasCount) {
191 return MeasurementType.COUNTER;
192 }
193 return MeasurementType.UNTYPED;
194 }
195
196 }