Innovenergy_trunk/csharp/Lib/S3/Drivers/Internal/Writer.cs

129 lines
3.9 KiB
C#

using System.Diagnostics;
using InnovEnergy.Lib.S3.Drivers.Internal.Delegates;
using InnovEnergy.Lib.S3.Drivers.Internal.Util;
using InnovEnergy.Lib.S3.Metadata;
using InnovEnergy.Lib.S3.Records;
using InnovEnergy.Lib.S3.Records.Specialized;
using InnovEnergy.Lib.Time.Unix;
using InnovEnergy.Lib.Utils;
namespace InnovEnergy.Lib.S3.Drivers.Internal;
using AggregationLevels = IReadOnlyList<AggregationLevel>;
using Aggregators = IReadOnlyList<Aggregator>;
/// <summary>
/// Writing side of the driver: incoming records are passed through a <c>Sampler</c> bound to the
/// first aggregation level, the sampled records are cascaded through one <c>Aggregator</c> per
/// adjacent pair of levels, and the resulting records are persisted through the injected
/// write delegate.
/// </summary>
/// <remarks>
/// This is one part of a partial class; members such as <c>SamplePeriod</c> and the
/// <c>WriteRecord(Record, UnixTime)</c> overload used during disposal are declared elsewhere.
/// </remarks>
public partial class Writer : Reader, IDisposable
{
    private Sampler     Sampler     { get; }
    private Aggregators Aggregators { get; }

    private readonly WriteRecord _WriteRecord;

    // Set by the first Dispose call so that every later call is ignored.
    private bool _Disposed;

    /// <summary>
    /// Creates a writer whose sampler and aggregators are initialized from records obtained
    /// through the base class' <c>ReadRecord</c>.
    /// </summary>
    /// <param name="startTime">Time to start at; it is aligned to the period start of the first aggregation level.</param>
    /// <param name="levels">Aggregation levels, forwarded to the base <c>Reader</c>.</param>
    /// <param name="readRecord">Read delegate, forwarded to the base <c>Reader</c>.</param>
    /// <param name="writeRecord">Delegate used to persist every record produced by this writer.</param>
    /// <remarks>
    /// The constructor blocks until all initial reads have completed (see the <c>.Result</c>
    /// calls in <see cref="InitializeSampler"/> and <see cref="InitializeAggregator"/>).
    /// </remarks>
    internal Writer(UnixTime          startTime,
                    AggregationLevels levels,
                    ReadRecord        readRecord,
                    WriteRecord       writeRecord) : base(levels, readRecord)
    {
        _WriteRecord = writeRecord;

        // Align the requested start to the period grid of the first (sampler) level.
        startTime = AggregationLevels.First().GetPeriodStartTime(startTime);

        Sampler     = InitializeSampler(startTime);
        Aggregators = InitializeAggregators(startTime);
    }

    /// <summary>
    /// Builds one aggregator for each adjacent (lower, higher) pair of aggregation levels,
    /// i.e. <c>AggregationLevels.Count - 1</c> aggregators.
    /// </summary>
    private IReadOnlyList<Aggregator> InitializeAggregators(UnixTime startTime)
    {
        return AggregationLevels
              .Pairwise()
              .SelectTuple((lo, hi) => InitializeAggregator(lo, hi, startTime))
              .ToReadOnlyList(AggregationLevels.Count - 1);
    }

    /// <summary>
    /// Creates the aggregator that folds records of level <paramref name="lo"/> into level
    /// <paramref name="hi"/>, pre-seeded with the <paramref name="lo"/> records of the
    /// <paramref name="hi"/> period that contains <paramref name="currentTime"/>.
    /// </summary>
    private Aggregator InitializeAggregator(AggregationLevel lo,
                                            AggregationLevel hi,
                                            UnixTime         currentTime)
    {
        // Subtle: the aggregator must resume in the middle of the running hi-period.
        // It is therefore seeded with the lo-level records from the start of that hi-period
        // up to the start of the current lo-period (presumably excluding the current
        // lo-period itself, which has not been written yet -- see RangeExclusive).
        var loStartTime = lo.GetPeriodStartTime(currentTime);
        var hiStartTime = hi.GetPeriodStartTime(currentTime);

        Debug.Assert(hiStartTime <= loStartTime);

        // Blocking on the reads is deliberate here: this runs from the constructor,
        // which cannot be asynchronous.
        var initialRecords = lo
                            .RangeExclusive(hiStartTime, loStartTime)
                            .Select(t => ReadRecord(lo, t))
                            .WhenAll()
                            .Result
                            .Select(r => r.Record);

        return new Aggregator(hi, lo, initialRecords);
    }

    /// <summary>
    /// Creates the sampler for the first aggregation level, seeded with the record read for
    /// <paramref name="startTime"/>. Blocks until that read has completed.
    /// </summary>
    private Sampler InitializeSampler(UnixTime startTime)
    {
        var samplerLevel  = AggregationLevels.First();
        var initialRecord = ReadRecord(samplerLevel, startTime).Result.Record;

        return new Sampler(samplerLevel, initialRecord, startTime);
    }

    /// <summary>
    /// Feeds <paramref name="record"/> to the sampler and cascades every record the sampler
    /// emits through the aggregators.
    /// </summary>
    /// <returns>All records (sampled and aggregated) that result from this input, lazily evaluated.</returns>
    private IEnumerable<AggregatedRecord> Aggregate(Record record, UnixTime timeStamp)
    {
        return Sampler
              .Sample(record, timeStamp)
              .SelectMany(Aggregate);
    }

    /// <summary>
    /// Yields <paramref name="r"/> itself, then pushes its record through the aggregators in
    /// order, yielding one record per aggregator that produces a result.
    /// </summary>
    private IEnumerable<AggregatedRecord> Aggregate(AggregatedRecord r)
    {
        yield return r;

        var record = r.Record;

        foreach (var a in Aggregators)
        {
            record = a.Aggregate(record);

            // A null result ends the cascade: there is nothing to hand to the next aggregator.
            if (record is null) break;

            var timeStamp = a.AggregationLevel.GetPeriodStartTime(r.TimeStamp);
            yield return new AggregatedRecord(record, a.AggregationLevel, timeStamp);
        }
    }

    /// <summary>
    /// Persists <paramref name="rec"/> through the write delegate, at the retention index its
    /// aggregation level computes for its time stamp.
    /// </summary>
    private Task WriteRecord(AggregatedRecord rec)
    {
        var tsr   = rec.ToTimeStamped();
        var level = rec.AggregationLevel;
        var index = level.GetRetentionIndex(rec.TimeStamp);

        return _WriteRecord(tsr, level, index);
    }

    /// <summary>
    /// Flushes the current sample and all incomplete aggregations, blocking until the writes
    /// have completed. Calls after the first one are ignored.
    /// </summary>
    void IDisposable.Dispose()
    {
        if (_Disposed)
            return;

        // Mark as disposed before flushing: repeating a (possibly partially completed) flush
        // would feed a second empty sample into the sampler.
        _Disposed = true;

        DisposeAsync().Wait();
    }

    /// <summary>
    /// Writes out the state that is still buffered in the sampler and in the aggregators.
    /// </summary>
    /// <remarks>
    /// The awaits do not capture the synchronization context, because
    /// <see cref="IDisposable.Dispose"/> blocks on this method; resuming on a context whose
    /// thread is blocked in <c>Wait()</c> would deadlock.
    /// </remarks>
    private async Task DisposeAsync()
    {
        // feed the sampler an empty "next" record, so it writes and aggregates the current one.
        await WriteRecord(Record.Empty, Sampler.CurrentTimeStamp + SamplePeriod).ConfigureAwait(false);

        foreach (var a in Aggregators)
        {
            // force and write incomplete aggregation for each level
            var agg = a.ForceAggregation();

            if (agg is null)
                continue;

            var lev = a.AggregationLevel;
            var ts  = lev.GetPeriodStartTime(Sampler.CurrentTimeStamp);
            var tsr = agg.TimeStamped(ts);
            var idx = lev.GetRetentionIndex(ts);

            await _WriteRecord(tsr, lev, idx).ConfigureAwait(false);
        }
    }
}