//..LeapArray public WindowWrap<T> currentWindow() { // 获取当前时间所在的样本窗口 return currentWindow(TimeUtil.currentTimeMillis()); } //------------------------------------------------------------ public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { returnnull; } // 计算当前时间所在的样本窗口id,即在计算数组LeapArray中的索引 intidx= calculateTimeIdx(timeMillis); // Calculate current bucket start time. // 计算当前样本窗口的开始时间点 longwindowStart= calculateWindowStart(timeMillis); ..... }
在这里我们先分析calculateTimeIdx方法
1 2 3 4 5 6 7
privateintcalculateTimeIdx(/*@Valid*/long timeMillis) { // 计算当前时间在那个样本窗口(样本窗口下标),当前时间/样本窗口长度 longtimeId= timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. // 计算具体索引,这个array就是装样本窗口的数组 return (int)(timeId % array.length()); }
while (true) { // 获取到当前时间所在的样本窗口 WindowWrap<T> old = array.get(idx); // 如果获取不到,表示没有创建 if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ // 创建新的时间窗口 WindowWrap<T> window = newWindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); // 通过CAS方式将新建窗口放入Array if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 若当前样本窗口的起始时间点与计算出的样本窗口起始点相同,则说明两个是同一个样本窗口 } elseif (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; // 若当前样本窗口的起始时间点 大于 计算出的样本窗口起始时间点,说明计算出的样本窗口已经过时了, // 需要将原来的样本窗口替换 } elseif (windowStart > old.windowStart()) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. // 替换掉老的样本窗口 return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 当前样本窗口的起始时间点 小于 计算出的样本窗口起始时间点, // 这种情况一般不会出现,因为时间不会倒流。除非人为修改了系统时钟 } elseif (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. returnnewWindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } }
这里的原理如下:
具体是如何替换的拿,我们来看源码
1 2 3 4 5 6 7 8 9 10 11 12 13
// 替换掉老的样本窗口 return resetWindowTo(old, windowStart); //------------------------------------------------------------ // BucketLeapArray.resetWindowTo @Override protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) { // Update the start time and reset value. // 更新窗口起始时间 w.resetTo(startTime); // 将多维度统计数据清零 w.value().reset(); return w; }
更新数据分析
1 2 3 4 5 6 7 8
public MetricBucket reset() { // 将每个维度的统计数据清零 for (MetricEvent event : MetricEvent.values()) { counters[event.ordinal()].reset(); } initMinRt(); returnthis; }