129 lines
3.8 KiB
C#
129 lines
3.8 KiB
C#
using System.Diagnostics;
|
|
using InnovEnergy.S3.Drivers.Internal.Delegates;
|
|
using InnovEnergy.S3.Drivers.Internal.Util;
|
|
using InnovEnergy.S3.Metadata;
|
|
using InnovEnergy.S3.Records;
|
|
using InnovEnergy.S3.Records.Specialized;
|
|
using InnovEnergy.Time.Unix;
|
|
using InnovEnergy.Lib.Utils;
|
|
|
|
namespace InnovEnergy.S3.Drivers.Internal;
|
|
|
|
using AggregationLevels = IReadOnlyList<AggregationLevel>;
|
|
using Aggregators = IReadOnlyList<Aggregator>;
|
|
|
|
/// <summary>
/// Writing part of the driver: incoming records go through the <see cref="Sampler"/>,
/// are then cascaded through the chain of <see cref="Aggregator"/>s, and every resulting
/// record is handed to the write delegate supplied at construction.
/// </summary>
/// <remarks>
/// Disposing flushes the record currently held by the sampler and forces every aggregator
/// to emit its (possibly incomplete) aggregation. Disposal is idempotent: only the first
/// call performs the flush.
/// </remarks>
public partial class Writer : Reader, IDisposable
{
    private Sampler     Sampler     { get; }
    private Aggregators Aggregators { get; }

    private readonly WriteRecord _WriteRecord;

    // Set when the final flush has been started, so that disposing again does not
    // feed a second empty record to the sampler or write forced aggregations twice.
    private bool _IsDisposed;

    /// <summary>
    /// Creates a writer whose sampler and aggregators are initialised from records
    /// read back through the base class' <c>ReadRecord</c>.
    /// </summary>
    /// <param name="startTime">Time to start at; it is aligned to the period start of the first aggregation level.</param>
    /// <param name="levels">Aggregation levels, forwarded to the base class.</param>
    /// <param name="readRecord">Read delegate, forwarded to the base class.</param>
    /// <param name="writeRecord">Delegate invoked for every record that has to be written.</param>
    internal Writer(UnixTime          startTime,
                    AggregationLevels levels,
                    ReadRecord        readRecord,
                    WriteRecord       writeRecord) : base(levels, readRecord)
    {
        _WriteRecord = writeRecord;

        // Align the start time to the period start of the first level before
        // initialising the sampler and the aggregators with it.
        startTime   = AggregationLevels.First().GetPeriodStartTime(startTime);
        Sampler     = InitializeSampler(startTime);
        Aggregators = InitializeAggregators(startTime);
    }

    /// <summary>
    /// Builds one aggregator per (lo, hi) tuple produced by <c>Pairwise()</c> over the
    /// aggregation levels.
    /// </summary>
    private IReadOnlyList<Aggregator> InitializeAggregators(UnixTime startTime)
    {
        // NOTE(review): Pairwise() presumably yields consecutive level pairs, which is
        // why Count - 1 is passed to ToReadOnlyList - confirm against Lib.Utils.
        return AggregationLevels
              .Pairwise()
              .SelectTuple((lo, hi) => InitializeAggregator(lo, hi, startTime))
              .ToReadOnlyList(AggregationLevels.Count - 1);
    }

    /// <summary>
    /// Creates the aggregator that aggregates records of level <paramref name="lo"/> into
    /// level <paramref name="hi"/>, seeded with the <paramref name="lo"/> records read back
    /// for the time range between the two period starts.
    /// </summary>
    private Aggregator InitializeAggregator(AggregationLevel lo,
                                            AggregationLevel hi,
                                            UnixTime         currentTime)
    {
        // Getting this initialisation right was hard; change with care.

        var loStartTime = lo.GetPeriodStartTime(currentTime);
        var hiStartTime = hi.GetPeriodStartTime(currentTime);

        // The hi period containing currentTime must not start after the lo period.
        Debug.Assert(hiStartTime <= loStartTime);

        // Read the lo-level records for the timestamps between the start of the hi period
        // and the start of the lo period. This blocks on the reads (.Result), because the
        // method is called from the constructor, which cannot await.
        var initialRecords = lo
                            .RangeExclusive(hiStartTime, loStartTime)
                            .Select(t => ReadRecord(lo, t))
                            .WhenAll()
                            .Result
                            .Select(r => r.Record);

        return new Aggregator(hi, lo, initialRecords);
    }

    /// <summary>
    /// Creates the sampler for the first aggregation level, seeded with the record read
    /// back for <paramref name="startTime"/>.
    /// </summary>
    private Sampler InitializeSampler(UnixTime startTime)
    {
        var samplerLevel  = AggregationLevels.First();

        // Blocking read: called from the constructor, which cannot await.
        var initialRecord = ReadRecord(samplerLevel, startTime).Result.Record;

        return new Sampler(samplerLevel, initialRecord, startTime);
    }

    /// <summary>
    /// Feeds <paramref name="record"/> to the sampler and cascades every record the
    /// sampler emits through the aggregators.
    /// </summary>
    private IEnumerable<AggregatedRecord> Aggregate(Record record, UnixTime timeStamp)
    {
        return Sampler
              .Sample(record, timeStamp)
              .SelectMany(Aggregate);
    }

    /// <summary>
    /// Yields <paramref name="r"/> itself, followed by one aggregated record for each
    /// aggregator in the chain that produces a result for it.
    /// </summary>
    private IEnumerable<AggregatedRecord> Aggregate(AggregatedRecord r)
    {
        yield return r;

        var record = r.Record;

        foreach (var a in Aggregators)
        {
            // Each aggregator consumes the output of the previous one.
            record = a.Aggregate(record);

            // No result from this aggregator: nothing to pass on to the following ones.
            if (record is null) break;

            var timeStamp = a.AggregationLevel.GetPeriodStartTime(r.TimeStamp);
            yield return new AggregatedRecord(record, a.AggregationLevel, timeStamp);
        }
    }

    /// <summary>
    /// Writes one aggregated record through the write delegate, at the retention index
    /// its level computes for the record's timestamp.
    /// </summary>
    private Task WriteRecord(AggregatedRecord rec)
    {
        var tsr   = rec.ToTimeStamped();
        var level = rec.AggregationLevel;
        var index = level.GetRetentionIndex(rec.TimeStamp);

        return _WriteRecord(tsr, level, index);
    }

    /// <summary>
    /// Synchronously flushes all pending data; blocks until the final writes have completed.
    /// </summary>
    void IDisposable.Dispose()
    {
        DisposeAsync().Wait();
    }

    /// <summary>
    /// Flushes the sampler and writes the forced aggregation of every aggregator.
    /// Only the first call does any work; later calls return immediately.
    /// </summary>
    private async Task DisposeAsync()
    {
        // Guard against repeated disposal: a second flush would write an additional
        // empty sample and write the forced aggregations again.
        if (_IsDisposed)
            return;

        _IsDisposed = true;

        // feed the sampler an empty "next" record, so it writes and aggregates the current one.
        await WriteRecord(Record.Empty, Sampler.CurrentTimeStamp + SamplePeriod);

        foreach (var a in Aggregators)
        {
            // force and write incomplete aggregation for each level

            var agg = a.ForceAggregation();
            if (agg is null)
                continue;

            var lev = a.AggregationLevel;
            var ts  = lev.GetPeriodStartTime(Sampler.CurrentTimeStamp);
            var tsr = agg.TimeStamped(ts);
            var idx = lev.GetRetentionIndex(ts);

            await _WriteRecord(tsr, lev, idx);
        }
    }
}