View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.perf;
33  
34  import com.google.common.annotations.Beta;
35  import java.time.Instant;
36  import java.util.Iterator;
37  import java.util.List;
38  import org.apache.avro.Schema;
39  import org.apache.avro.generic.GenericRecord;
40  import org.spf4j.tsdb2.avro.Aggregation;
41  import org.spf4j.tsdb2.avro.Observation;
42  
43  /**
44   * @author Zoltan Farkas
45   */
46  public interface TimeSeriesRecord extends GenericRecord {
47  
48    String MEASUREMENT_TYPE_PROP = "measurementType";
49    String AGGREGATION_TYPE_PROP = "aggregation";
50    String UNIT_TYPE_PROP = "unit";
51    String FREQ_MILLIS_REC_PROP = "frequencyMillis";
52    String RAW_NAME = "rawName";
53    String IDS_PROP = "ids";
54  
55  
56    Instant getTimeStamp();
57  
58    long getLongValue(String column);
59  
60    double getDoubleValue(String column);
61  
62    static int getFrequencyMillis(final Schema schema) {
63      Number freq = (Number) schema.getObjectProp(FREQ_MILLIS_REC_PROP);
64      if (freq == null) {
65        return -1;
66      }
67      return freq.intValue();
68    }
69  
70    static String getUnit(final Schema schema) {
71      String unit = (String) schema.getObjectProp(UNIT_TYPE_PROP);
72      if (unit == null) {
73        return "N/A";
74      }
75      return unit;
76    }
77  
78    static TimeSeriesRecord from(final GenericRecord rec) {
79      return new TimeSeriesRecord() {
80        @Override
81        public Instant getTimeStamp() {
82          return (Instant) rec.get(0);
83        }
84  
85        @Override
86        public long getLongValue(final String column) {
87          return ((Number) rec.get(column)).longValue();
88        }
89  
90        @Override
91        public double getDoubleValue(final String column) {
92          return ((Number) rec.get(column)).doubleValue();
93        }
94  
95        @Override
96        public void put(final String key, final Object v) {
97          rec.put(key, v);
98        }
99  
100       @Override
101       public Object get(final String key) {
102         return rec.get(key);
103       }
104 
105       @Override
106       public void put(final int i, final Object v) {
107         rec.put(i, v);
108       }
109 
110       @Override
111       public Object get(final int i) {
112         return rec.get(i);
113       }
114 
115       @Override
116       public Schema getSchema() {
117         return rec.getSchema();
118       }
119     };
120   }
121 
122   /**
123    * Temporary, until better implementation.
124    * @param accumulator
125    * @param r2
126    */
127   @Beta
128   default void accumulate(final TimeSeriesRecord r2) {
129     Schema recSchema = getSchema();
130     Iterator<Schema.Field> it = recSchema.getFields().iterator();
131     it.next();
132     put(0, r2.get(0));
133     while (it.hasNext()) {
134       Schema.Field nf = it.next();
135       int pos = nf.pos();
136       Aggregation agg;
137       String prop = nf.schema().getProp(AGGREGATION_TYPE_PROP);
138       if (prop != null) {
139         agg = Aggregation.valueOf(prop);
140       } else {
141         agg = inferAggregationFromName(nf, recSchema);
142       }
143       switch (agg) {
144         case SUM:
145           put(pos, ((Long) get(pos)) + ((Long) r2.get(pos)));
146           break;
147         case MIN:
148           put(pos, Math.min((Long) get(pos), ((Long) r2.get(pos))));
149           break;
150         case MAX:
151           put(pos, Math.max((Long) get(pos), ((Long) r2.get(pos))));
152           break;
153         case FIRST:
154           break;
155         case LAST:
156         case UNKNOWN:
157           put(pos, ((Long) r2.get(pos)));
158           break;
159         default:
160           throw new UnsupportedOperationException("Unsupported aggregation: " + agg);
161       }
162     }
163   }
164 
165 
166   @Beta
167   static void accumulateObservations(final Schema recSchema, final Observation r1, final Observation r2) {
168     r1.setRelTimeStamp(r2.getRelTimeStamp());
169     r1.setTableDefId(-1L);
170     Iterator<Schema.Field> it = recSchema.getFields().iterator();
171     it.next();
172     List<Long> r1d = r1.getData();
173     List<Long> r2d = r2.getData();
174     while (it.hasNext()) {
175       Schema.Field nf = it.next();
176       int pos = nf.pos();
177       Aggregation agg;
178       String prop = nf.schema().getProp(AGGREGATION_TYPE_PROP);
179       if (prop != null) {
180         agg = Aggregation.valueOf(prop);
181       } else {
182         agg = inferAggregationFromName(nf, recSchema);
183       }
184       int apos = pos - 1;
185       switch (agg) {
186         case SUM:
187           r1d.set(apos, r1d.get(apos) + r2d.get(apos));
188           break;
189         case MIN:
190            r1d.set(apos, Math.min(r1d.get(apos), r2d.get(apos)));
191           break;
192         case MAX:
193            r1d.set(apos, Math.max(r1d.get(apos), r2d.get(apos)));
194           break;
195         case FIRST:
196           break;
197         case LAST:
198         case UNKNOWN:
199           r1d.set(apos, r2d.get(apos));
200           break;
201         default:
202           throw new UnsupportedOperationException("Unsupported aggregation: " + agg);
203       }
204     }
205   }
206 
207 
208   static Aggregation inferAggregationFromName(final Schema.Field nf, final Schema recSchema) {
209     Aggregation agg;
210     switch (nf.name()) {
211       case "count":
212       case "total":
213         agg = Aggregation.SUM;
214         break;
215       case "min":
216         agg = Aggregation.MIN;
217         break;
218       case "max":
219         agg = Aggregation.MAX;
220         break;
221       default:
222         String mType = recSchema.getProp(MEASUREMENT_TYPE_PROP);
223         if (mType != null) {
224           switch (mType) {
225             case "COUNTER":
226             case "SUMMARY":
227               agg = Aggregation.SUM;
228               break;
229             default:
230               agg = Aggregation.LAST;
231           }
232         } else {
233           agg = Aggregation.LAST;
234         }
235     }
236     return agg;
237   }
238 
239 
240 
241 }