1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.commons.functor.aggregator;
18
19 import java.util.Collections;
20 import java.util.List;
21 import java.util.Timer;
22 import java.util.TimerTask;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.locks.ReadWriteLock;
25 import java.util.concurrent.locks.ReentrantReadWriteLock;
26
27 /**
28 * An aggregator which automatically resets the aggregated data at regular
29 * intervals and sends a notification when it is about to do so, so listeners
30 * can decide to gather the information before it is being reset (and log it
31 * etc). This allows for smaller memory footprints for instance in the case of
32 * List-based aggregators, as regularly the list is emptied. Also it allows for
33 * the call to <code>evaluate</code> to represent an "aggregated" value over a
34 * certain period of time. Note that you can still have a regular aggregator
35 * extending this class by specifying an interval less than or equal to zero.
36 * The regular flush/reset will be triggered from a timer which will always be
37 * started as a daemon thread (so it will stop when there are no more non-daemon
38 * threads in the JVM); this class allows 2 types of timers:
39 * <ul>
40 * <li>(default) per instance <code>Timer</code> -- each instance of this class
41 * will create a new <code>Timer</code> and this <code>Timer</code> will have a
42 * single <code>TimerTask</code> scheduled, which is the one that resets this
43 * <code>Aggregator</code> regularly and sends notifications. This way, when the
44 * <code>Aggregator</code> instance is destroyed, the <code>Timer</code> goes as
45 * well.</li>
46 * <li>shared <code>Timer</code> instance -- this class will create a static
47 * instance of <code>Timer</code> which can be shared by other instances of this
48 * class. While this is a bit more effective from a memory and thread management
49 * point of view, it has the downside that if the <code>TimerTask</code>'s are
50 * not managed properly this can create memory leaks. So if you decide to take
51 * this route make sure when you are finished with this instance, to always stop
52 * the timer at the end.</li>
53 * </ul>
54 * <p>
55 * <b>Synchronization</b>: This class provides a thread safe framework so when
56 * {@link #doAdd(Object)}, {@link #reset()} and {@link #evaluate()} is called,
57 * access is synchronized via a read-write lock. {@link #evaluate()} is
58 * considered a read operation and {@link #doAdd(Object)} and {@link #reset()}
59 * are considered write operations.
60 * </p>
61 *
62 * @param <T>
63 * type of data to aggregate
64 */
65 public abstract class AbstractTimedAggregator<T> implements Aggregator<T> {
66 /**
67 * Stores a list to all objects which are listening for time events
68 * generated by this object. If there is no timer programmed (e.g.
69 * {@link #interval} is set to 0) this list will be <code>null</code>. Under
70 * the cover, this will use a <code>CopyOnWriteArrayList</code> since there
71 * aren't too many updates expected to this list.
72 *
73 * @see #interval
74 * @see #timer
75 * @see TimedAggregatorListener
76 */
77 private List<TimedAggregatorListener<T>> timerListeners;
78
79 /**
80 * As per {@link #timer} javadoc, if the interval specified is zero or less
81 * there will be no <code>Timer</code> created/assigned to this instance.
82 * This constant is defined to make it easier to read code which creates
83 * instances of this class and doesn't assign them a timer.
84 */
85 public static final long NO_TIMER = 0L;
86
87 /**
88 * Name of the shared timer which will run all the TimerTasks resulted from
89 * creating instances of this class which are set to used the shared timer.
90 * This is useful when looking at thread dumps. For instances which use
91 * their own timer task, the name will be
92 * <code>TIMER_NAME + hashCode()</code>.
93 */
94 public static final String TIMER_NAME = "TimedSummarizerMainTimer";
95
96 /**
97 * The main shared timer which will execute all the <code>TimerTasks</code>
98 * resulted from instances of this class which chose to use the shared
99 * timer. Note that this <code>Timer</code> is started as a daemon thread so
100 * it will stop when there are no more non-daemon threads.
101 *
102 * @see #timer
103 */
104 private static final Timer MAIN_TIMER = new Timer(TIMER_NAME, true);
105
106 /**
107 * The timer instance for this instance. Can point to {@link #MAIN_TIMER} if
108 * shared timer was chosen in constructor or a newly created instance of
109 * <code>Timer</code> which is private to this instance only.
110 *
111 * @see #MAIN_TIMER
112 */
113 private Timer timer;
114
115 /**
116 * Interval in milliseconds we flush the result of the "summary". This will
117 * be used to set up our <code>TimerTask</code> and schedule it with the
118 * <code>Timer</code>. Every time the timer kicks in after this interval, it
119 * will call {@link #timer()}. If this is set to a value of zero or less, no
120 * timer will be created.
121 */
122 private long interval;
123
124 /**
125 * This is the task that is created when a new instance of this class is
126 * created. Once created this task will be scheduled with the {@link #timer}
127 * . Calling {@link #stop()} cancels this task and also will set it to null
128 * (so it can be recycled by the garbage collection), otherwise, until that
129 * point this will store a reference to a valid <code>TimerTask</code>
130 * instance.
131 */
132 private TimerTask task;
133
134 /**
135 * Lock used internally to synchronize access to {@link #add(Object)},
136 * {@link #reset()} and {@link #evaluate()}. Locks for writing when
137 * {@link #add(Object)} and {@link #reset()} is called and for reading when
138 * {@link #evaluate()} is called.
139 *
140 * @see #add(Object)
141 * @see #evaluate()
142 * @see #reset()
143 */
144 private ReadWriteLock dataLock;
145
146 /**
147 * Default constructor -- creates an instance of this aggregator with no
148 * <code>Timer</code>. Equivalent to
149 * <code>AbstractTimedAggregator(NO_TIMER)</code>.
150 *
151 * @see #AbstractTimedAggregator(long)
152 */
153 public AbstractTimedAggregator() {
154 this(NO_TIMER);
155 }
156
157 /**
158 * Creates an aggregator which has a timer at the specified interval
159 * (miliseconds) and uses its own timer rather than the shared
160 * {@link #MAIN_TIMER}. Equivalent to
161 * <code>AbstractTimedAggregator(interval,false)</code>.
162 *
163 * @param interval
164 * interval in miliseconds to set the timer for.
165 * @see #interval
166 * @see #timer
167 * @see #AbstractTimedAggregator(long, boolean)
168 */
169 public AbstractTimedAggregator(long interval) {
170 this(interval, false);
171 }
172
173 /**
174 * Creates an aggregator which has a timer at the specified interval and
175 * also allows control over using the {@link #MAIN_TIMER shared timer} or
176 * its own per-instance timer.
177 *
178 * @param interval
179 * interval in miliseconds to set the timer for.
180 * @param useSharedTimer
181 * if set to <code>true</code>, {@link #timer} will be set to
182 * {@link #TIMER_NAME}, otherwise a new instance of
183 * <code>Timer</code> will be created.
184 */
185 public AbstractTimedAggregator(long interval, boolean useSharedTimer) {
186 if (interval <= NO_TIMER) {
187 // not using timer
188 this.interval = NO_TIMER;
189 this.timer = null;
190 this.task = null;
191 this.timerListeners = null;
192 } else {
193 // we have been requested to use timers
194 this.interval = interval;
195 this.timerListeners = new CopyOnWriteArrayList<TimedAggregatorListener<T>>();
196 if (useSharedTimer) {
197 this.timer = MAIN_TIMER;
198 } else {
199 this.timer = new Timer(TIMER_NAME + hashCode(), true);
200 }
201 // having set up the timer, create the task
202 this.task = new TimerTask() {
203 @Override
204 public void run() {
205 timer();
206 }
207 };
208 this.timer.scheduleAtFixedRate(this.task, this.interval, this.interval);
209 }
210 this.dataLock = new ReentrantReadWriteLock();
211 }
212
213 /**
214 * Getter for {@link #interval}.
215 *
216 * @return Current value of {@link #interval}.
217 */
218 public final long getInterval() {
219 return interval;
220 }
221
222 /**
223 * Adds the data to this aggregator. This function first locks
224 * {@link #dataLock} for writing then calls {@link #doAdd(Object)}, which
225 * allows subclasses to perform the actual adding to the aggregator and then
226 * at the end it unlocks {@link #dataLock}.
227 *
228 * @param data
229 * Data to be added to the aggregator.
230 * @see #doAdd(Object)
231 * @see #dataLock
232 */
233 public final void add(T data) {
234 dataLock.writeLock().lock();
235 try {
236 doAdd(data);
237 } finally {
238 dataLock.writeLock().unlock();
239 }
240 }
241
242 /**
243 * Function provided to allow subclasses to perform the actual adding of the
244 * data to the aggregator. This function is wrapped by {@link #add(Object)}
245 * so that access to any internal data series (implemented by subclasses)
246 * via {@link #add(Object)} or {@link #evaluate()} or {@link #reset()} is
247 * prohibited during this call, as a <b>write</b> lock is acquired prior to
248 * this function call to ensure this function is the only one which has
249 * access to the data.
250 *
251 * @param data
252 * Data to be aggregated
253 * @see #add(Object)
254 */
255 protected abstract void doAdd(T data);
256
257 /**
258 * Aggregates all the data this object has been "fed" via calls to
259 * {@link #add(Object)}. Note that this object delegates the call to
260 * {@link #doEvaluate()} after it secured read-only access to
261 * {@link #dataLock} -- so any data series access can be safely read
262 * (however, subclasses should NOT try to modify any data series they might
263 * implement at this point!). The lock is released after
264 * {@link #doEvaluate()} returns.
265 *
266 * @return result of aggregating the data, as returned by
267 * {@link #doEvaluate()}
268 * @see #doEvaluate()
269 */
270 public final T evaluate() {
271 dataLock.readLock().lock();
272 try {
273 return doEvaluate();
274 } finally {
275 dataLock.readLock().unlock();
276 }
277 }
278
279 /**
280 * Allows subclasses to perform the actual evaluation of the aggregated
281 * result in a thread-safe manner. When this function is called,
282 * <b>write</b> access to data (via {@link #add(Object)} and
283 * {@link #reset()}) is prohibited until this function finishes. However,
284 * please note that other read access (via calls to the same
285 * {@link #evaluate()}) is possible.
286 *
287 * @return Result of evaluating the aggregated data
288 */
289 protected abstract T doEvaluate();
290
291 /**
292 * Resets this aggregator.This function first locks {@link #dataLock} for
293 * writing then calls {@link #doReset()}, which allows subclasses to perform
294 * the actual resetting of the aggregator and then at the end it unlocks
295 * {@link #dataLock}.
296 *
297 * @see #doReset()
298 */
299 public final void reset() {
300 dataLock.writeLock().lock();
301 try {
302 doReset();
303 } finally {
304 dataLock.writeLock().unlock();
305 }
306 }
307
308 /**
309 * Function provided to allow subclasses to perform the actual reset of the
310 * aggregator. This function is wrapped by {@link #reset()} so that access
311 * to data (via {@link #add(Object)} or {@link #evaluate()} or
312 * {@link #reset()}) is prohibited during this call, as a <b>write</b> lock
313 * is acquired prior to this function call to ensure this function is the
314 * only one which has access to the data.
315 */
316 protected abstract void doReset();
317
318 /**
319 * Retrieves the size of the currently-stored data series. This function
320 * first locks {@link #dataLock} for reading then calls
321 * {@link #retrieveDataSize()}, which allows subclasses to compute the data
322 * series size and then at the end it unlocks {@link #dataLock}.
323 *
324 * @return Size of the current data series, which will be aggregated at the
325 * next call to {@link #evaluate()}
326 */
327 public final int getDataSize() {
328 dataLock.readLock().lock();
329 try {
330 return retrieveDataSize();
331 } finally {
332 dataLock.readLock().unlock();
333 }
334 }
335
336 /**
337 * Function provided to allow subclasses to retrieve the actual size of the
338 * data series. This function is wrapped by {@link #getDataSize()} so that
339 * access to data (via {@link #add(Object)} or {@link #reset()}) is
340 * prohibited during this call, as a <b>read</b> lock is acquired prior to
341 * this function call. (However, calls to {@link #evaluate()} are allowed as
342 * that locks for reading too.)
343 *
344 * @return Size of the current data series. Zero means no data stored.
345 */
346 protected abstract int retrieveDataSize();
347
348 /**
349 * Retrieves <b>an unmodifiable copy</b> of the {@link #timerListeners timer
350 * listeners}. Used for testing.
351 *
352 * @return <code>Collections.unmodifiableList(timerListeners)</code> if
353 * {@link #timerListeners} is <b>not</b> <code>null</code>, or
354 * <code>null</code> otherwise.
355 */
356 final List<TimedAggregatorListener<T>> getTimerListeners() {
357 if (timerListeners == null) {
358 return null;
359 }
360 return Collections.unmodifiableList(timerListeners);
361 }
362
363 /**
364 * If this <code>Aggregator</code> has been started with timer support, it
365 * will add the given listener, so it receives
366 * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
367 * timer events}. If no timer support has been configured for this
368 * Aggregator, this call has no effect.
369 *
370 * @param listener
371 * Listener to be added to received timer events from this
372 * aggregator.
373 * @see #timerListeners
374 */
375 public final void addTimerListener(TimedAggregatorListener<T> listener) {
376 if (timerListeners == null) {
377 return;
378 }
379 timerListeners.add(listener);
380 }
381
382 /**
383 * Removes a listener from the timer listeners list if previously added. If
384 * this Aggregator has been configured with no timer support, this call will
385 * always return <code>false</code>.
386 *
387 * @param listener
388 * Listener to be removed from the list. NullPointerException
389 * thrown if this is null.
390 * @return <code>true</code> if this Aggregator has timer support and the
391 * listener passed in was previously added (via
392 * {@link #addTimerListener(TimedAggregatorListener)}) or false if
393 * either the Aggregator has no timer support or it has timer
394 * support but the listener was never registered with this
395 * Aggregator.
396 * @see #timerListeners
397 */
398 public final boolean removeTimerListener(TimedAggregatorListener<T> listener) {
399 if (timerListeners == null) {
400 return false;
401 }
402 return timerListeners.remove(listener);
403 }
404
405 /**
406 * Computes the current aggregated value (by calling {@link #evaluate()},
407 * resets this aggregator then notifies all listeners. Go through all the
408 * {@link #timerListeners} and sends
409 * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
410 * notification messages} to each of them. Does nothing if
411 * {@link #timerListeners} is <code>null</code>. Please note that
412 * {@link #evaluate()} is called only once at the beginning of this
413 * function, and only if there are listeners configured, then this value is
414 * passed to every notification. This is in order to ensure all listeners
415 * receive the same value -- the value of the evaluation prior to resetting
416 * it.
417 */
418 private void timer() {
419 if (timerListeners != null) {
420 // if we have listeners, notify them
421 T aggregated = evaluate(); // NOTE: shouldn't evaluate() and reset()
422 // be done atomically here?
423 reset();
424 for (TimedAggregatorListener<T> i : timerListeners) {
425 i.onTimer(this, aggregated);
426 }
427 } else {
428 reset();
429 }
430 }
431
432 /**
433 * Checks whether this instance has a timer associated with it or not. If
434 * there is a timer for this Aggregator, then the {@link #task} member
435 * should be set to a non-null value.
436 *
437 * @return <code>true</code> if {@link #task} is not null,
438 * <code>false</code> otherwise (in which case there is no timer).
439 */
440 public final boolean isTimerEnabled() {
441 return (task != null);
442 }
443
444 /**
445 * Checks whether this instance uses its own timer or {@link #MAIN_TIMER the
446 * shared timer} for scheduling {@link #task the timer task}.
447 *
448 * @return <code>true</code> if <code>timer == MAIN_TIMER</code> or
449 * <code>false</code> otherwise.
450 */
451 public final boolean isSharedTimer() {
452 return (timer == MAIN_TIMER);
453 }
454
455 /**
456 * Cancels the current timer task (if set) -- which means from there on the
457 * data will not be reset anymore. Also, if {@link #timer} is not set to
458 * {@link #MAIN_TIMER the shared timer} then it will be cancelled as well
459 * Also releases all the listeners from the {@link #timerListeners list}.
460 */
461 public final void stop() {
462 // cancel the task first
463 if (task != null) {
464 task.cancel();
465 task = null;
466 timer.purge(); // remove the reference to this task
467 }
468 // then the timer if needed
469 if (timer != null && timer != MAIN_TIMER) {
470 timer.cancel();
471 }
472 timer = null;
473 // finally remove the elements from the listeners list
474 if (timerListeners != null) {
475 timerListeners.clear();
476 }
477 }
478
479 @Override
480 protected final void finalize() throws Throwable {
481 // if we're going in the garbage, make sure we clean up
482 stop();
483 }
484
485 @Override
486 public String toString() {
487 return AbstractTimedAggregator.class.getName();
488 }
489 }