1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 package org.spf4j.base;
33
34 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
35 import java.util.ArrayDeque;
36 import java.util.Deque;
37 import java.util.Set;
38 import java.util.function.BiConsumer;
39 import java.util.function.BinaryOperator;
40 import java.util.function.Function;
41 import java.util.function.Predicate;
42 import java.util.function.Supplier;
43 import java.util.stream.Collector;
44
45
46
47
48 public final class XCollectors {
49
50 private XCollectors() {
51 }
52
53 public static <T> Collector<T, ?, ArrayDeque<T>> last(final int limit) {
54 return Collector.of(
55 ArrayDeque<T>::new,
56 (l, e) -> {
57 if (l.size() >= limit) {
58 l.removeFirst();
59 }
60 l.addLast(e);
61 },
62 (l1, l2) -> {
63 throw new UnsupportedOperationException("Limiting collectors do not support combining");
64 }
65 );
66 }
67
68 public static <T, X extends T> Collector<T, ArrayDeque<T>,
69 ArrayDeque<T>> last(final int limit, final X addIfLimited) {
70 return last(() -> new ArrayDeque<T>(), limit, addIfLimited);
71 }
72
73 public static <T, X extends T, D extends Deque<T>> Collector<T, D, D> last(final Supplier<D> dqSupp,
74 final int limit, final X addIfLimited) {
75 return Collector.of(dqSupp,
76 (l, e) -> {
77 l.addLast(e);
78 limitDequeue(l, limit, addIfLimited);
79 }, new BinaryOperator<D>() {
80 @Override
81 @SuppressFBWarnings("CFS_CONFUSING_FUNCTION_SEMANTICS")
82
83 public D apply(final D l1, final D l2) {
84 l1.addAll(l2);
85 limitDequeue(l1, limit, addIfLimited);
86 return l1;
87 }
88 }, Collector.Characteristics.IDENTITY_FINISH);
89 }
90
91 public static <T> void limitDequeue(final Deque<T> l1, final int limit, final T addIfLimited) {
92 int extra = l1.size() - limit;
93 if (extra > 0) {
94 for (int i = 0, m = extra + 1; i < m; i++) {
95 l1.removeFirst();
96 }
97 l1.addFirst(addIfLimited);
98 }
99 }
100
101
102
103
104 public static <T, A, R>
105 Collector<T, ?, R> filtering(final Predicate<? super T> predicate,
106 final Collector<? super T, A, R> downstream) {
107 BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
108 return new CollectorImpl<>(downstream.supplier(),
109 (r, t) -> {
110 if (predicate.test(t)) {
111 downstreamAccumulator.accept(r, t);
112 }
113 },
114 downstream.combiner(), downstream.finisher(),
115 downstream.characteristics());
116 }
117
118 @SuppressWarnings("unchecked")
119 private static <I, R> Function<I, R> castingIdentity() {
120 return (Function<I, R>) Function.identity();
121 }
122
123 static final class CollectorImpl<T, A, R> implements Collector<T, A, R> {
124
125 private final Supplier<A> supplier;
126 private final BiConsumer<A, T> accumulator;
127 private final BinaryOperator<A> combiner;
128 private final Function<A, R> finisher;
129 private final Set<Collector.Characteristics> characteristics;
130
131 CollectorImpl(final Supplier<A> supplier,
132 final BiConsumer<A, T> accumulator,
133 final BinaryOperator<A> combiner,
134 final Function<A, R> finisher,
135 final Set<Collector.Characteristics> characteristics) {
136 this.supplier = supplier;
137 this.accumulator = accumulator;
138 this.combiner = combiner;
139 this.finisher = finisher;
140 this.characteristics = characteristics;
141 }
142
143 CollectorImpl(final Supplier<A> supplier,
144 final BiConsumer<A, T> accumulator,
145 final BinaryOperator<A> combiner,
146 final Set<Collector.Characteristics> characteristics) {
147 this(supplier, accumulator, combiner, castingIdentity(), characteristics);
148 }
149
150 @Override
151 public BiConsumer<A, T> accumulator() {
152 return accumulator;
153 }
154
155 @Override
156 public Supplier<A> supplier() {
157 return supplier;
158 }
159
160 @Override
161 public BinaryOperator<A> combiner() {
162 return combiner;
163 }
164
165 @Override
166 public Function<A, R> finisher() {
167 return finisher;
168 }
169
170 @Override
171 public Set<Collector.Characteristics> characteristics() {
172 return characteristics;
173 }
174 }
175
176 }