View Javadoc
1   /*
2    * Copyright (c) 2001-2017, Zoltan Farkas All Rights Reserved.
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12   * GNU General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this program; if not, write to the Free Software
16   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
17   *
18   * Additionally licensed with:
19   *
20   * Licensed under the Apache License, Version 2.0 (the "License");
21   * you may not use this file except in compliance with the License.
22   * You may obtain a copy of the License at
23   *
24   *      http://www.apache.org/licenses/LICENSE-2.0
25   *
26   * Unless required by applicable law or agreed to in writing, software
27   * distributed under the License is distributed on an "AS IS" BASIS,
28   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29   * See the License for the specific language governing permissions and
30   * limitations under the License.
31   */
32  package org.spf4j.tsdb2;
33  
34  import com.google.common.collect.ListMultimap;
35  import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
36  import java.io.File;
37  import java.io.IOException;
38  import java.util.Arrays;
39  import java.util.concurrent.ArrayBlockingQueue;
40  import java.util.concurrent.BlockingQueue;
41  import java.util.concurrent.ExecutionException;
42  import java.util.concurrent.Future;
43  import java.util.concurrent.TimeUnit;
44  import java.util.concurrent.TimeoutException;
45  import org.junit.Assert;
46  import org.junit.Test;
47  import org.slf4j.Logger;
48  import org.slf4j.LoggerFactory;
49  import org.spf4j.base.Either;
50  import org.spf4j.tsdb2.avro.ColumnDef;
51  import org.spf4j.tsdb2.avro.DataBlock;
52  import org.spf4j.tsdb2.avro.TableDef;
53  
54  /**
55   *
56   * @author zoly
57   */
58  @SuppressFBWarnings("PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS")
59  public class TSDBReaderTest {
60  
61    private static final Logger LOG = LoggerFactory.getLogger(TSDBReaderTest.class);
62  
63    private final TableDef tableDef = TableDef.newBuilder()
64            .setName("test")
65            .setDescription("test")
66            .setSampleTime(0)
67            .setColumns(Arrays.asList(
68                    ColumnDef.newBuilder().setName("a").setDescription("atest").setUnitOfMeasurement("ms").build(),
69                    ColumnDef.newBuilder().setName("b").setDescription("btest").setUnitOfMeasurement("ms").build(),
70                    ColumnDef.newBuilder().setName("c").setDescription("ctest").setUnitOfMeasurement("ms").build()))
71            .build();
72  
73    @Test
74    @SuppressFBWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE", "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
75      "NP_LOAD_OF_KNOWN_NULL_VALUE", "CLI_CONSTANT_LIST_INDEX"})
76    // try with resources trips up findbugs sometimes.
77    @SuppressWarnings("checkstyle:EmptyBlock")
78    public void testTsdb() throws IOException {
79      File testFile = File.createTempFile("test", ".tsdb2");
80      long tableId;
81      try (TSDBWriter writer = new TSDBWriter(testFile, 4, "test", false)) {
82  
83      }
84      try (TSDBWriter writer = new TSDBWriter(testFile, 4, "test", false)) {
85        tableId = writer.writeTableDef(tableDef);
86        final long time = System.currentTimeMillis();
87        writer.writeDataRow(tableId, time, 0, 1, 2);
88        writer.writeDataRow(tableId, time + 10, 1, 1, 2);
89        writer.writeDataRow(tableId, time + 20, 2, 1, 2);
90        writer.writeDataRow(tableId, time + 30, 3, 1, 2);
91        writer.writeDataRow(tableId, time + 40, 4, 1, 2);
92      }
93      try (TSDBReader reader = new TSDBReader(testFile, 1024)) {
94        Either<TableDef, DataBlock> read;
95        while ((read = reader.read()) != null) {
96          LOG.debug("TSDB block: {}", read);
97        }
98      }
99  
100     ListMultimap<String, TableDef> allTables = TSDBQuery.getAllTables(testFile);
101     Assert.assertEquals(1, allTables.size());
102     Assert.assertTrue(allTables.containsKey(tableDef.getName()));
103     TimeSeries timeSeries = TSDBQuery.getTimeSeries(testFile, new long[]{tableId}, 0, Long.MAX_VALUE);
104     Assert.assertEquals(2L, timeSeries.getValues()[2][0]);
105 
106   }
107 
108   @Test(timeout = 5000)
109   public void testTailing() throws IOException, InterruptedException, ExecutionException, TimeoutException {
110     File testFile = File.createTempFile("test", ".tsdb2");
111     try (TSDBWriter writer = new TSDBWriter(testFile, 4, "test", true);
112             TSDBReader reader = new TSDBReader(testFile, 1024)) {
113       writer.flush();
114       final BlockingQueue<Either<TableDef, DataBlock>> queue = new ArrayBlockingQueue<>(100);
115       Future<Void> bgWatch = reader.bgWatch((Either<TableDef, DataBlock> object, long deadline) -> {
116         queue.put(object);
117       }, TSDBReader.EventSensitivity.HIGH);
118       long tableId = writer.writeTableDef(tableDef);
119       writer.flush();
120       final long time = System.currentTimeMillis();
121       writer.writeDataRow(tableId, time, 0, 1, 2);
122       writer.writeDataRow(tableId, time + 10, 1, 1, 2);
123       writer.flush();
124       Either<TableDef, DataBlock> td = queue.take();
125       Assert.assertEquals(tableDef, td.getLeft());
126       Either<TableDef, DataBlock> take = queue.take();
127       Assert.assertEquals(2, take.getRight().getValues().size());
128       writer.writeDataRow(tableId, time + 20, 2, 1, 2);
129       writer.writeDataRow(tableId, time + 30, 3, 1, 2);
130       writer.writeDataRow(tableId, time + 40, 4, 1, 2);
131       writer.flush();
132       Assert.assertEquals(3, queue.take().getRight().getValues().size());
133       reader.stopWatching();
134       bgWatch.get(10000, TimeUnit.MILLISECONDS);
135 
136     }
137 
138   }
139 
140 }