1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.perf;
33
34 import com.google.common.annotations.Beta;
35 import java.time.Instant;
36 import java.util.Iterator;
37 import java.util.List;
38 import org.apache.avro.Schema;
39 import org.apache.avro.generic.GenericRecord;
40 import org.spf4j.tsdb2.avro.Aggregation;
41 import org.spf4j.tsdb2.avro.Observation;
42
43
44
45
46 public interface TimeSeriesRecord extends GenericRecord {
47
48 String MEASUREMENT_TYPE_PROP = "measurementType";
49 String AGGREGATION_TYPE_PROP = "aggregation";
50 String UNIT_TYPE_PROP = "unit";
51 String FREQ_MILLIS_REC_PROP = "frequencyMillis";
52 String RAW_NAME = "rawName";
53 String IDS_PROP = "ids";
54
55
56 Instant getTimeStamp();
57
58 long getLongValue(String column);
59
60 double getDoubleValue(String column);
61
62 static int getFrequencyMillis(final Schema schema) {
63 Number freq = (Number) schema.getObjectProp(FREQ_MILLIS_REC_PROP);
64 if (freq == null) {
65 return -1;
66 }
67 return freq.intValue();
68 }
69
70 static String getUnit(final Schema schema) {
71 String unit = (String) schema.getObjectProp(UNIT_TYPE_PROP);
72 if (unit == null) {
73 return "N/A";
74 }
75 return unit;
76 }
77
78 static TimeSeriesRecord from(final GenericRecord rec) {
79 return new TimeSeriesRecord() {
80 @Override
81 public Instant getTimeStamp() {
82 return (Instant) rec.get(0);
83 }
84
85 @Override
86 public long getLongValue(final String column) {
87 return ((Number) rec.get(column)).longValue();
88 }
89
90 @Override
91 public double getDoubleValue(final String column) {
92 return ((Number) rec.get(column)).doubleValue();
93 }
94
95 @Override
96 public void put(final String key, final Object v) {
97 rec.put(key, v);
98 }
99
100 @Override
101 public Object get(final String key) {
102 return rec.get(key);
103 }
104
105 @Override
106 public void put(final int i, final Object v) {
107 rec.put(i, v);
108 }
109
110 @Override
111 public Object get(final int i) {
112 return rec.get(i);
113 }
114
115 @Override
116 public Schema getSchema() {
117 return rec.getSchema();
118 }
119 };
120 }
121
122
123
124
125
126
127 @Beta
128 default void accumulate(final TimeSeriesRecord r2) {
129 Schema recSchema = getSchema();
130 Iterator<Schema.Field> it = recSchema.getFields().iterator();
131 it.next();
132 put(0, r2.get(0));
133 while (it.hasNext()) {
134 Schema.Field nf = it.next();
135 int pos = nf.pos();
136 Aggregation agg;
137 String prop = nf.schema().getProp(AGGREGATION_TYPE_PROP);
138 if (prop != null) {
139 agg = Aggregation.valueOf(prop);
140 } else {
141 agg = inferAggregationFromName(nf, recSchema);
142 }
143 switch (agg) {
144 case SUM:
145 put(pos, ((Long) get(pos)) + ((Long) r2.get(pos)));
146 break;
147 case MIN:
148 put(pos, Math.min((Long) get(pos), ((Long) r2.get(pos))));
149 break;
150 case MAX:
151 put(pos, Math.max((Long) get(pos), ((Long) r2.get(pos))));
152 break;
153 case FIRST:
154 break;
155 case LAST:
156 case UNKNOWN:
157 put(pos, ((Long) r2.get(pos)));
158 break;
159 default:
160 throw new UnsupportedOperationException("Unsupported aggregation: " + agg);
161 }
162 }
163 }
164
165
166 @Beta
167 static void accumulateObservations(final Schema recSchema, final Observation r1, final Observation r2) {
168 r1.setRelTimeStamp(r2.getRelTimeStamp());
169 r1.setTableDefId(-1L);
170 Iterator<Schema.Field> it = recSchema.getFields().iterator();
171 it.next();
172 List<Long> r1d = r1.getData();
173 List<Long> r2d = r2.getData();
174 while (it.hasNext()) {
175 Schema.Field nf = it.next();
176 int pos = nf.pos();
177 Aggregation agg;
178 String prop = nf.schema().getProp(AGGREGATION_TYPE_PROP);
179 if (prop != null) {
180 agg = Aggregation.valueOf(prop);
181 } else {
182 agg = inferAggregationFromName(nf, recSchema);
183 }
184 int apos = pos - 1;
185 switch (agg) {
186 case SUM:
187 r1d.set(apos, r1d.get(apos) + r2d.get(apos));
188 break;
189 case MIN:
190 r1d.set(apos, Math.min(r1d.get(apos), r2d.get(apos)));
191 break;
192 case MAX:
193 r1d.set(apos, Math.max(r1d.get(apos), r2d.get(apos)));
194 break;
195 case FIRST:
196 break;
197 case LAST:
198 case UNKNOWN:
199 r1d.set(apos, r2d.get(apos));
200 break;
201 default:
202 throw new UnsupportedOperationException("Unsupported aggregation: " + agg);
203 }
204 }
205 }
206
207
208 static Aggregation inferAggregationFromName(final Schema.Field nf, final Schema recSchema) {
209 Aggregation agg;
210 switch (nf.name()) {
211 case "count":
212 case "total":
213 agg = Aggregation.SUM;
214 break;
215 case "min":
216 agg = Aggregation.MIN;
217 break;
218 case "max":
219 agg = Aggregation.MAX;
220 break;
221 default:
222 String mType = recSchema.getProp(MEASUREMENT_TYPE_PROP);
223 if (mType != null) {
224 switch (mType) {
225 case "COUNTER":
226 case "SUMMARY":
227 agg = Aggregation.SUM;
228 break;
229 default:
230 agg = Aggregation.LAST;
231 }
232 } else {
233 agg = Aggregation.LAST;
234 }
235 }
236 return agg;
237 }
238
239
240
241 }