1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *      http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.apache.commons.functor.aggregator;
18  
19  import java.util.Collections;
20  import java.util.List;
21  import java.util.Timer;
22  import java.util.TimerTask;
23  import java.util.concurrent.CopyOnWriteArrayList;
24  import java.util.concurrent.locks.ReadWriteLock;
25  import java.util.concurrent.locks.ReentrantReadWriteLock;
26  
27  /**
28   * An aggregator which automatically resets the aggregated data at regular
29   * intervals and sends a notification when it is about to do so, so listeners
30   * can decide to gather the information before it is being reset (and log it
31   * etc). This allows for smaller memory footprints for instance in the case of
32   * List-based aggregators, as regularly the list is emptied. Also it allows for
33   * the call to <code>evaluate</code> to represent an "aggregated" value over a
34   * certain period of time. Note that you can still have a regular aggregator
35   * extending this class by specifying an interval less than or equal to zero.
36   * The regular flush/reset will be triggered from a timer which will always be
37   * started as a daemon thread (so it will stop when there are no more non-daemon
38   * threads in the JVM); this class allows 2 types of timers:
39   * <ul>
40   * <li>(default) per instance <code>Timer</code> -- each instance of this class
41   * will create a new <code>Timer</code> and this <code>Timer</code> will have a
42   * single <code>TimerTask</code> scheduled, which is the one that resets this
43   * <code>Aggregator</code> regularly and sends notifications. This way, when the
44   * <code>Aggregator</code> instance is destroyed, the <code>Timer</code> goes as
45   * well.</li>
46   * <li>shared <code>Timer</code> instance -- this class will create a static
47   * instance of <code>Timer</code> which can be shared by other instances of this
48   * class. While this is a bit more effective from a memory and thread management
49   * point of view, it has the downside that if the <code>TimerTask</code>'s are
50   * not managed properly this can create memory leaks. So if you decide to take
51   * this route make sure when you are finished with this instance, to always stop
52   * the timer at the end.</li>
53   * </ul>
54   * <p>
55   * <b>Synchronization</b>: This class provides a thread safe framework so when
56   * {@link #doAdd(Object)}, {@link #reset()} and {@link #evaluate()} is called,
57   * access is synchronized via a read-write lock. {@link #evaluate()} is
58   * considered a read operation and {@link #doAdd(Object)} and {@link #reset()}
59   * are considered write operations.
60   * </p>
61   *
62   * @param <T>
63   *            type of data to aggregate
64   */
65  public abstract class AbstractTimedAggregator<T> implements Aggregator<T> {
66      /**
67       * Stores a list to all objects which are listening for time events
68       * generated by this object. If there is no timer programmed (e.g.
69       * {@link #interval} is set to 0) this list will be <code>null</code>. Under
70       * the cover, this will use a <code>CopyOnWriteArrayList</code> since there
71       * aren't too many updates expected to this list.
72       *
73       * @see #interval
74       * @see #timer
75       * @see TimedAggregatorListener
76       */
77      private List<TimedAggregatorListener<T>> timerListeners;
78  
79      /**
80       * As per {@link #timer} javadoc, if the interval specified is zero or less
81       * there will be no <code>Timer</code> created/assigned to this instance.
82       * This constant is defined to make it easier to read code which creates
83       * instances of this class and doesn't assign them a timer.
84       */
85      public static final long                 NO_TIMER   = 0L;
86  
87      /**
88       * Name of the shared timer which will run all the TimerTasks resulted from
89       * creating instances of this class which are set to used the shared timer.
90       * This is useful when looking at thread dumps. For instances which use
91       * their own timer task, the name will be
92       * <code>TIMER_NAME + hashCode()</code>.
93       */
94      public static final String               TIMER_NAME = "TimedSummarizerMainTimer";
95  
96      /**
97       * The main shared timer which will execute all the <code>TimerTasks</code>
98       * resulted from instances of this class which chose to use the shared
99       * timer. Note that this <code>Timer</code> is started as a daemon thread so
100      * it will stop when there are no more non-daemon threads.
101      *
102      * @see #timer
103      */
104     private static final Timer               MAIN_TIMER = new Timer(TIMER_NAME, true);
105 
106     /**
107      * The timer instance for this instance. Can point to {@link #MAIN_TIMER} if
108      * shared timer was chosen in constructor or a newly created instance of
109      * <code>Timer</code> which is private to this instance only.
110      *
111      * @see #MAIN_TIMER
112      */
113     private Timer                            timer;
114 
115     /**
116      * Interval in milliseconds we flush the result of the "summary". This will
117      * be used to set up our <code>TimerTask</code> and schedule it with the
118      * <code>Timer</code>. Every time the timer kicks in after this interval, it
119      * will call {@link #timer()}. If this is set to a value of zero or less, no
120      * timer will be created.
121      */
122     private long                             interval;
123 
124     /**
125      * This is the task that is created when a new instance of this class is
126      * created. Once created this task will be scheduled with the {@link #timer}
127      * . Calling {@link #stop()} cancels this task and also will set it to null
128      * (so it can be recycled by the garbage collection), otherwise, until that
129      * point this will store a reference to a valid <code>TimerTask</code>
130      * instance.
131      */
132     private TimerTask                        task;
133 
134     /**
135      * Lock used internally to synchronize access to {@link #add(Object)},
136      * {@link #reset()} and {@link #evaluate()}. Locks for writing when
137      * {@link #add(Object)} and {@link #reset()} is called and for reading when
138      * {@link #evaluate()} is called.
139      *
140      * @see #add(Object)
141      * @see #evaluate()
142      * @see #reset()
143      */
144     private ReadWriteLock                    dataLock;
145 
146     /**
147      * Default constructor -- creates an instance of this aggregator with no
148      * <code>Timer</code>. Equivalent to
149      * <code>AbstractTimedAggregator(NO_TIMER)</code>.
150      *
151      * @see #AbstractTimedAggregator(long)
152      */
153     public AbstractTimedAggregator() {
154         this(NO_TIMER);
155     }
156 
157     /**
158      * Creates an aggregator which has a timer at the specified interval
159      * (miliseconds) and uses its own timer rather than the shared
160      * {@link #MAIN_TIMER}. Equivalent to
161      * <code>AbstractTimedAggregator(interval,false)</code>.
162      *
163      * @param interval
164      *            interval in miliseconds to set the timer for.
165      * @see #interval
166      * @see #timer
167      * @see #AbstractTimedAggregator(long, boolean)
168      */
169     public AbstractTimedAggregator(long interval) {
170         this(interval, false);
171     }
172 
173     /**
174      * Creates an aggregator which has a timer at the specified interval and
175      * also allows control over using the {@link #MAIN_TIMER shared timer} or
176      * its own per-instance timer.
177      *
178      * @param interval
179      *            interval in miliseconds to set the timer for.
180      * @param useSharedTimer
181      *            if set to <code>true</code>, {@link #timer} will be set to
182      *            {@link #TIMER_NAME}, otherwise a new instance of
183      *            <code>Timer</code> will be created.
184      */
185     public AbstractTimedAggregator(long interval, boolean useSharedTimer) {
186         if (interval <= NO_TIMER) {
187             // not using timer
188             this.interval = NO_TIMER;
189             this.timer = null;
190             this.task = null;
191             this.timerListeners = null;
192         } else {
193             // we have been requested to use timers
194             this.interval = interval;
195             this.timerListeners = new CopyOnWriteArrayList<TimedAggregatorListener<T>>();
196             if (useSharedTimer) {
197                 this.timer = MAIN_TIMER;
198             } else {
199                 this.timer = new Timer(TIMER_NAME + hashCode(), true);
200             }
201             // having set up the timer, create the task
202             this.task = new TimerTask() {
203                 @Override
204                 public void run() {
205                     timer();
206                 }
207             };
208             this.timer.scheduleAtFixedRate(this.task, this.interval, this.interval);
209         }
210         this.dataLock = new ReentrantReadWriteLock();
211     }
212 
213     /**
214      * Getter for {@link #interval}.
215      *
216      * @return Current value of {@link #interval}.
217      */
218     public final long getInterval() {
219         return interval;
220     }
221 
222     /**
223      * Adds the data to this aggregator. This function first locks
224      * {@link #dataLock} for writing then calls {@link #doAdd(Object)}, which
225      * allows subclasses to perform the actual adding to the aggregator and then
226      * at the end it unlocks {@link #dataLock}.
227      *
228      * @param data
229      *            Data to be added to the aggregator.
230      * @see #doAdd(Object)
231      * @see #dataLock
232      */
233     public final void add(T data) {
234         dataLock.writeLock().lock();
235         try {
236             doAdd(data);
237         } finally {
238             dataLock.writeLock().unlock();
239         }
240     }
241 
242     /**
243      * Function provided to allow subclasses to perform the actual adding of the
244      * data to the aggregator. This function is wrapped by {@link #add(Object)}
245      * so that access to any internal data series (implemented by subclasses)
246      * via {@link #add(Object)} or {@link #evaluate()} or {@link #reset()} is
247      * prohibited during this call, as a <b>write</b> lock is acquired prior to
248      * this function call to ensure this function is the only one which has
249      * access to the data.
250      *
251      * @param data
252      *            Data to be aggregated
253      * @see #add(Object)
254      */
255     protected abstract void doAdd(T data);
256 
257     /**
258      * Aggregates all the data this object has been "fed" via calls to
259      * {@link #add(Object)}. Note that this object delegates the call to
260      * {@link #doEvaluate()} after it secured read-only access to
261      * {@link #dataLock} -- so any data series access can be safely read
262      * (however, subclasses should NOT try to modify any data series they might
263      * implement at this point!). The lock is released after
264      * {@link #doEvaluate()} returns.
265      *
266      * @return result of aggregating the data, as returned by
267      *         {@link #doEvaluate()}
268      * @see #doEvaluate()
269      */
270     public final T evaluate() {
271         dataLock.readLock().lock();
272         try {
273             return doEvaluate();
274         } finally {
275             dataLock.readLock().unlock();
276         }
277     }
278 
279     /**
280      * Allows subclasses to perform the actual evaluation of the aggregated
281      * result in a thread-safe manner. When this function is called,
282      * <b>write</b> access to data (via {@link #add(Object)} and
283      * {@link #reset()}) is prohibited until this function finishes. However,
284      * please note that other read access (via calls to the same
285      * {@link #evaluate()}) is possible.
286      *
287      * @return Result of evaluating the aggregated data
288      */
289     protected abstract T doEvaluate();
290 
291     /**
292      * Resets this aggregator.This function first locks {@link #dataLock} for
293      * writing then calls {@link #doReset()}, which allows subclasses to perform
294      * the actual resetting of the aggregator and then at the end it unlocks
295      * {@link #dataLock}.
296      *
297      * @see #doReset()
298      */
299     public final void reset() {
300         dataLock.writeLock().lock();
301         try {
302             doReset();
303         } finally {
304             dataLock.writeLock().unlock();
305         }
306     }
307 
308     /**
309      * Function provided to allow subclasses to perform the actual reset of the
310      * aggregator. This function is wrapped by {@link #reset()} so that access
311      * to data (via {@link #add(Object)} or {@link #evaluate()} or
312      * {@link #reset()}) is prohibited during this call, as a <b>write</b> lock
313      * is acquired prior to this function call to ensure this function is the
314      * only one which has access to the data.
315      */
316     protected abstract void doReset();
317 
318     /**
319      * Retrieves the size of the currently-stored data series. This function
320      * first locks {@link #dataLock} for reading then calls
321      * {@link #retrieveDataSize()}, which allows subclasses to compute the data
322      * series size and then at the end it unlocks {@link #dataLock}.
323      *
324      * @return Size of the current data series, which will be aggregated at the
325      *         next call to {@link #evaluate()}
326      */
327     public final int getDataSize() {
328         dataLock.readLock().lock();
329         try {
330             return retrieveDataSize();
331         } finally {
332             dataLock.readLock().unlock();
333         }
334     }
335 
336     /**
337      * Function provided to allow subclasses to retrieve the actual size of the
338      * data series. This function is wrapped by {@link #getDataSize()} so that
339      * access to data (via {@link #add(Object)} or {@link #reset()}) is
340      * prohibited during this call, as a <b>read</b> lock is acquired prior to
341      * this function call. (However, calls to {@link #evaluate()} are allowed as
342      * that locks for reading too.)
343      *
344      * @return Size of the current data series. Zero means no data stored.
345      */
346     protected abstract int retrieveDataSize();
347 
348     /**
349      * Retrieves <b>an unmodifiable copy</b> of the {@link #timerListeners timer
350      * listeners}. Used for testing.
351      *
352      * @return <code>Collections.unmodifiableList(timerListeners)</code> if
353      *         {@link #timerListeners} is <b>not</b> <code>null</code>, or
354      *         <code>null</code> otherwise.
355      */
356     final List<TimedAggregatorListener<T>> getTimerListeners() {
357         if (timerListeners == null) {
358             return null;
359         }
360         return Collections.unmodifiableList(timerListeners);
361     }
362 
363     /**
364      * If this <code>Aggregator</code> has been started with timer support, it
365      * will add the given listener, so it receives
366      * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
367      * timer events}. If no timer support has been configured for this
368      * Aggregator, this call has no effect.
369      *
370      * @param listener
371      *            Listener to be added to received timer events from this
372      *            aggregator.
373      * @see #timerListeners
374      */
375     public final void addTimerListener(TimedAggregatorListener<T> listener) {
376         if (timerListeners == null) {
377             return;
378         }
379         timerListeners.add(listener);
380     }
381 
382     /**
383      * Removes a listener from the timer listeners list if previously added. If
384      * this Aggregator has been configured with no timer support, this call will
385      * always return <code>false</code>.
386      *
387      * @param listener
388      *            Listener to be removed from the list. NullPointerException
389      *            thrown if this is null.
390      * @return <code>true</code> if this Aggregator has timer support and the
391      *         listener passed in was previously added (via
392      *         {@link #addTimerListener(TimedAggregatorListener)}) or false if
393      *         either the Aggregator has no timer support or it has timer
394      *         support but the listener was never registered with this
395      *         Aggregator.
396      * @see #timerListeners
397      */
398     public final boolean removeTimerListener(TimedAggregatorListener<T> listener) {
399         if (timerListeners == null) {
400             return false;
401         }
402         return timerListeners.remove(listener);
403     }
404 
405     /**
406      * Computes the current aggregated value (by calling {@link #evaluate()},
407      * resets this aggregator then notifies all listeners. Go through all the
408      * {@link #timerListeners} and sends
409      * {@link TimedAggregatorListener#onTimer(AbstractTimedAggregator,Object)
410      * notification messages} to each of them. Does nothing if
411      * {@link #timerListeners} is <code>null</code>. Please note that
412      * {@link #evaluate()} is called only once at the beginning of this
413      * function, and only if there are listeners configured, then this value is
414      * passed to every notification. This is in order to ensure all listeners
415      * receive the same value -- the value of the evaluation prior to resetting
416      * it.
417      */
418     private void timer() {
419         if (timerListeners != null) {
420             // if we have listeners, notify them
421             T aggregated = evaluate(); // NOTE: shouldn't evaluate() and reset()
422                                        // be done atomically here?
423             reset();
424             for (TimedAggregatorListener<T> i : timerListeners) {
425                 i.onTimer(this, aggregated);
426             }
427         } else {
428             reset();
429         }
430     }
431 
432     /**
433      * Checks whether this instance has a timer associated with it or not. If
434      * there is a timer for this Aggregator, then the {@link #task} member
435      * should be set to a non-null value.
436      *
437      * @return <code>true</code> if {@link #task} is not null,
438      *         <code>false</code> otherwise (in which case there is no timer).
439      */
440     public final boolean isTimerEnabled() {
441         return (task != null);
442     }
443 
444     /**
445      * Checks whether this instance uses its own timer or {@link #MAIN_TIMER the
446      * shared timer} for scheduling {@link #task the timer task}.
447      *
448      * @return <code>true</code> if <code>timer == MAIN_TIMER</code> or
449      *         <code>false</code> otherwise.
450      */
451     public final boolean isSharedTimer() {
452         return (timer == MAIN_TIMER);
453     }
454 
455     /**
456      * Cancels the current timer task (if set) -- which means from there on the
457      * data will not be reset anymore. Also, if {@link #timer} is not set to
458      * {@link #MAIN_TIMER the shared timer} then it will be cancelled as well
459      * Also releases all the listeners from the {@link #timerListeners list}.
460      */
461     public final void stop() {
462         // cancel the task first
463         if (task != null) {
464             task.cancel();
465             task = null;
466             timer.purge(); // remove the reference to this task
467         }
468         // then the timer if needed
469         if (timer != null && timer != MAIN_TIMER) {
470             timer.cancel();
471         }
472         timer = null;
473         // finally remove the elements from the listeners list
474         if (timerListeners != null) {
475             timerListeners.clear();
476         }
477     }
478 
479     @Override
480     protected final void finalize() throws Throwable {
481         // if we're going in the garbage, make sure we clean up
482         stop();
483     }
484 
485     @Override
486     public String toString() {
487         return AbstractTimedAggregator.class.getName();
488     }
489 }