1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.tsdb2;
33
34 import com.google.common.collect.ListMultimap;
35 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
36 import java.io.File;
37 import java.io.IOException;
38 import java.util.Arrays;
39 import java.util.concurrent.ArrayBlockingQueue;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.TimeoutException;
45 import org.junit.Assert;
46 import org.junit.Test;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import org.spf4j.base.Either;
50 import org.spf4j.tsdb2.avro.ColumnDef;
51 import org.spf4j.tsdb2.avro.DataBlock;
52 import org.spf4j.tsdb2.avro.TableDef;
53
54
55
56
57
58 @SuppressFBWarnings("PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS")
59 public class TSDBReaderTest {
60
61 private static final Logger LOG = LoggerFactory.getLogger(TSDBReaderTest.class);
62
63 private final TableDef tableDef = TableDef.newBuilder()
64 .setName("test")
65 .setDescription("test")
66 .setSampleTime(0)
67 .setColumns(Arrays.asList(
68 ColumnDef.newBuilder().setName("a").setDescription("atest").setUnitOfMeasurement("ms").build(),
69 ColumnDef.newBuilder().setName("b").setDescription("btest").setUnitOfMeasurement("ms").build(),
70 ColumnDef.newBuilder().setName("c").setDescription("ctest").setUnitOfMeasurement("ms").build()))
71 .build();
72
73 @Test
74 @SuppressFBWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE", "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
75 "NP_LOAD_OF_KNOWN_NULL_VALUE", "CLI_CONSTANT_LIST_INDEX"})
76
77 @SuppressWarnings("checkstyle:EmptyBlock")
78 public void testTsdb() throws IOException {
79 File testFile = File.createTempFile("test", ".tsdb2");
80 long tableId;
81 try (TSDBWriter writer = new TSDBWriter(testFile, 4, "test", false)) {
82
83 }
84 try (TSDBWriter writer = new TSDBWriter(testFile, 4, "test", false)) {
85 tableId = writer.writeTableDef(tableDef);
86 final long time = System.currentTimeMillis();
87 writer.writeDataRow(tableId, time, 0, 1, 2);
88 writer.writeDataRow(tableId, time + 10, 1, 1, 2);
89 writer.writeDataRow(tableId, time + 20, 2, 1, 2);
90 writer.writeDataRow(tableId, time + 30, 3, 1, 2);
91 writer.writeDataRow(tableId, time + 40, 4, 1, 2);
92 }
93 try (TSDBReader reader = new TSDBReader(testFile, 1024)) {
94 Either<TableDef, DataBlock> read;
95 while ((read = reader.read()) != null) {
96 LOG.debug("TSDB block: {}", read);
97 }
98 }
99
100 ListMultimap<String, TableDef> allTables = TSDBQuery.getAllTables(testFile);
101 Assert.assertEquals(1, allTables.size());
102 Assert.assertTrue(allTables.containsKey(tableDef.getName()));
103 TimeSeries timeSeries = TSDBQuery.getTimeSeries(testFile, new long[]{tableId}, 0, Long.MAX_VALUE);
104 Assert.assertEquals(2L, timeSeries.getValues()[2][0]);
105
106 }
107
108 @Test(timeout = 5000)
109 public void testTailing() throws IOException, InterruptedException, ExecutionException, TimeoutException {
110 File testFile = File.createTempFile("test", ".tsdb2");
111 try (TSDBWriter writer = new TSDBWriter(testFile, 4, "test", true);
112 TSDBReader reader = new TSDBReader(testFile, 1024)) {
113 writer.flush();
114 final BlockingQueue<Either<TableDef, DataBlock>> queue = new ArrayBlockingQueue<>(100);
115 Future<Void> bgWatch = reader.bgWatch((Either<TableDef, DataBlock> object, long deadline) -> {
116 queue.put(object);
117 }, TSDBReader.EventSensitivity.HIGH);
118 long tableId = writer.writeTableDef(tableDef);
119 writer.flush();
120 final long time = System.currentTimeMillis();
121 writer.writeDataRow(tableId, time, 0, 1, 2);
122 writer.writeDataRow(tableId, time + 10, 1, 1, 2);
123 writer.flush();
124 Either<TableDef, DataBlock> td = queue.take();
125 Assert.assertEquals(tableDef, td.getLeft());
126 Either<TableDef, DataBlock> take = queue.take();
127 Assert.assertEquals(2, take.getRight().getValues().size());
128 writer.writeDataRow(tableId, time + 20, 2, 1, 2);
129 writer.writeDataRow(tableId, time + 30, 3, 1, 2);
130 writer.writeDataRow(tableId, time + 40, 4, 1, 2);
131 writer.flush();
132 Assert.assertEquals(3, queue.take().getRight().getValues().size());
133 reader.stopWatching();
134 bgWatch.get(10000, TimeUnit.MILLISECONDS);
135
136 }
137
138 }
139
140 }