using System.Diagnostics;
using InnovEnergy.S3.Drivers.Internal.Delegates;
using InnovEnergy.S3.Drivers.Internal.Util;
using InnovEnergy.S3.Metadata;
using InnovEnergy.S3.Records;
using InnovEnergy.S3.Records.Specialized;
using InnovEnergy.Time.Unix;
using InnovEnergy.Lib.Utils;

namespace InnovEnergy.S3.Drivers.Internal;

using AggregationLevels = IReadOnlyList<AggregationLevel>;
using Aggregators       = IReadOnlyList<Aggregator>;

/// <summary>
/// Samples incoming records at the first aggregation level and cascades every
/// sampled record through a chain of aggregators, one per adjacent pair of
/// aggregation levels. Resulting records are persisted through the
/// <c>WriteRecord</c> delegate passed to the constructor.
/// </summary>
/// <remarks>
/// This is one part of a partial class: the two-argument <c>WriteRecord</c>
/// overload and <c>SamplePeriod</c> used by <see cref="DisposeAsync"/> are not
/// defined in this part.
/// </remarks>
public partial class Writer : Reader, IDisposable
{
    private Sampler     Sampler     { get; }
    private Aggregators Aggregators { get; }

    private readonly WriteRecord _WriteRecord;

    // Set once the final flush has been started, so that repeated disposal is a no-op.
    private bool _Disposed;

    /// <summary>
    /// Creates a writer whose sampler and aggregators are primed with records
    /// that are read back through the base reader.
    /// </summary>
    /// <param name="startTime">
    /// Time to start at; it is aligned to the period start of the first aggregation level.
    /// </param>
    /// <param name="levels">Aggregation levels, forwarded to the base <see cref="Reader"/>.</param>
    /// <param name="readRecord">Read delegate, forwarded to the base <see cref="Reader"/>.</param>
    /// <param name="writeRecord">Delegate used to persist every produced record.</param>
    internal Writer(UnixTime startTime, AggregationLevels levels, ReadRecord readRecord, WriteRecord writeRecord)
        : base(levels, readRecord)
    {
        _WriteRecord = writeRecord;

        // Align to the period grid of the first level before priming sampler and aggregators.
        startTime = AggregationLevels.First().GetPeriodStartTime(startTime);

        Sampler     = InitializeSampler(startTime);
        Aggregators = InitializeAggregators(startTime);
    }

    /// <summary>
    /// Builds one aggregator for every pair of adjacent aggregation levels,
    /// i.e. <c>Count - 1</c> aggregators in total.
    /// </summary>
    private IReadOnlyList<Aggregator> InitializeAggregators(UnixTime startTime)
    {
        return AggregationLevels
              .Pairwise()
              .SelectTuple((lo, hi) => InitializeAggregator(lo, hi, startTime))
              .ToReadOnlyList(AggregationLevels.Count - 1);
    }

    /// <summary>
    /// Creates the aggregator that turns records of level <paramref name="lo"/>
    /// into records of level <paramref name="hi"/>, pre-loaded with the
    /// <paramref name="lo"/> records between the start of the current
    /// <paramref name="hi"/> period and the start of the current
    /// <paramref name="lo"/> period.
    /// </summary>
    private Aggregator InitializeAggregator(AggregationLevel lo, AggregationLevel hi, UnixTime currentTime)
    {
        // NOTE: the choice of range bounds below is subtle and was hard to get right;
        // change with care.
        var loStartTime = lo.GetPeriodStartTime(currentTime);
        var hiStartTime = hi.GetPeriodStartTime(currentTime);

        Debug.Assert(hiStartTime <= loStartTime);

        // Blocks until all reads have completed: this runs from the (synchronous) constructor.
        var initialRecords = lo
                            .RangeExclusive(hiStartTime, loStartTime)
                            .Select(t => ReadRecord(lo, t))
                            .WhenAll()
                            .Result
                            .Select(r => r.Record);

        return new Aggregator(hi, lo, initialRecords);
    }

    /// <summary>
    /// Creates the sampler for the first aggregation level, seeded with the
    /// record read at <paramref name="startTime"/>.
    /// </summary>
    private Sampler InitializeSampler(UnixTime startTime)
    {
        var samplerLevel = AggregationLevels.First();

        // Blocks on the read: this runs from the (synchronous) constructor.
        var initialRecord = ReadRecord(samplerLevel, startTime).Result.Record;

        return new Sampler(samplerLevel, initialRecord, startTime);
    }

    /// <summary>
    /// Feeds <paramref name="record"/> to the sampler and cascades every
    /// record the sampler emits through the aggregator chain.
    /// </summary>
    private IEnumerable<AggregatedRecord> Aggregate(Record record, UnixTime timeStamp)
    {
        return Sampler
              .Sample(record, timeStamp)
              .SelectMany(Aggregate);
    }

    /// <summary>
    /// Yields <paramref name="r"/> itself, followed by one record for every
    /// aggregator in the chain that produces a result.
    /// </summary>
    private IEnumerable<AggregatedRecord> Aggregate(AggregatedRecord r)
    {
        yield return r;

        var record = r.Record;

        foreach (var a in Aggregators)
        {
            // Each aggregator consumes the output of the previous one.
            record = a.Aggregate(record);

            // A null result ends the cascade: the remaining aggregators get no input.
            if (record is null)
            {
                break;
            }

            var timeStamp = a.AggregationLevel.GetPeriodStartTime(r.TimeStamp);

            yield return new AggregatedRecord(record, a.AggregationLevel, timeStamp);
        }
    }

    /// <summary>
    /// Persists <paramref name="rec"/> through the write delegate, at the
    /// retention index of its aggregation level and timestamp.
    /// </summary>
    private Task WriteRecord(AggregatedRecord rec)
    {
        var tsr   = rec.ToTimeStamped();
        var level = rec.AggregationLevel;
        var index = level.GetRetentionIndex(rec.TimeStamp);

        return _WriteRecord(tsr, level, index);
    }

    /// <summary>
    /// Flushes the pending sample and all pending aggregations, blocking until
    /// the writes have completed. Calls after the first one do nothing.
    /// </summary>
    void IDisposable.Dispose()
    {
        DisposeAsync().Wait();
    }

    /// <summary>
    /// Writes the sampler's current record and forces every aggregator to emit
    /// and write its (possibly incomplete) aggregation. Only the first
    /// invocation performs the flush.
    /// </summary>
    private async Task DisposeAsync()
    {
        // Dispose must be safe to call repeatedly. Without this guard a second call would
        // feed the sampler another empty record and force/write every aggregation again.
        if (_Disposed)
        {
            return;
        }

        _Disposed = true;

        // feed the sampler an empty "next" record, so it writes and aggregates the current one.
        await WriteRecord(Record.Empty, Sampler.CurrentTimeStamp + SamplePeriod);

        foreach (var a in Aggregators)
        {
            // force and write incomplete aggregation for each level
            var agg = a.ForceAggregation();

            if (agg is null)
            {
                continue;
            }

            var lev = a.AggregationLevel;
            var ts  = lev.GetPeriodStartTime(Sampler.CurrentTimeStamp);
            var tsr = agg.TimeStamped(ts);
            var idx = lev.GetRetentionIndex(ts);

            await _WriteRecord(tsr, lev, idx);
        }
    }
}