Innovenergy_trunk/csharp/lib/S3/Drivers/S3Driver.cs

84 lines
2.3 KiB
C#

using Flurl.Http;
using InnovEnergy.S3.Drivers.Internal;
using InnovEnergy.S3.Drivers.Internal.Delegates;
using InnovEnergy.S3.Metadata;
using InnovEnergy.S3.Records.Serialization;
using InnovEnergy.S3.Records.Specialized;
using InnovEnergy.Time.Unix;
namespace InnovEnergy.S3.Drivers;
using Levels = IReadOnlyList<AggregationLevel>;
using Memory = Dictionary<String, TimeStampedRecord>;
/// <summary>
/// A <see cref="Writer"/> whose record read/write delegates are backed by HTTP
/// GET/PUT requests created from an <see cref="S3Config"/>.
/// </summary>
public class S3Driver : Writer
{
    /// <summary>
    /// Creates the driver and hands the base <see cref="Writer"/> a pair of
    /// delegates that read and write records through <paramref name="config"/>.
    /// </summary>
    /// <param name="levels">Aggregation levels, forwarded unchanged to the base class.</param>
    /// <param name="startTime">Start time, forwarded unchanged to the base class.</param>
    /// <param name="config">Configuration used to create the GET/PUT requests.</param>
    public S3Driver(Levels levels, UnixTime startTime, S3Config config) :
        base(
            startTime,
            levels,
            ReadRecord(config),
            WriteRecord(config)
        )
    {
    }

    /// <summary>
    /// Builds the object path for a record: <c>/{level}/{index}</c>.
    /// </summary>
    private static String GetS3Path(AggregationLevel level, UInt32 index)
    {
        return $"/{level}/{index}";
    }

    /// <summary>
    /// Creates the delegate that fetches a record for a given level and index.
    /// </summary>
    /// <param name="config">Configuration used to create the GET request.</param>
    /// <returns>
    /// A delegate that returns the parsed record when the response status is 200,
    /// and <c>TimeStampedRecord.Empty()</c> (after logging the response body) otherwise.
    /// </returns>
    private static ReadRecord ReadRecord(S3Config config)
    {
        async Task<TimeStampedRecord> Read(AggregationLevel level, UInt32 index)
        {
            var s3Path  = GetS3Path(level, index);
            var request = config.CreateGetRequest(s3Path);

            // The response owns the underlying HTTP response message; dispose it
            // once we are done reading so the connection is released promptly.
            using var response = await request.GetAsync();

            // NOTE(review): Flurl throws FlurlHttpException on non-2xx responses by
            // default. This branch only sees 4xx/5xx if the request created by
            // S3Config allows those statuses — confirm against S3Config.
            if (response.StatusCode != 200)
            {
                Console.WriteLine("ERROR: Get " + s3Path);
                var error = await response.GetStringAsync();
                Console.WriteLine(error);

                // Best effort: a failed read is reported as an empty record.
                return TimeStampedRecord.Empty();
            }

            var payload = await response.GetBytesAsync();
            Console.WriteLine("GET " + s3Path);

            return Parser.ParseTimeStampedRecord(payload);
        }

        return Read;
    }

    /// <summary>
    /// Creates the delegate that stores a record for a given level and index.
    /// </summary>
    /// <param name="config">Configuration used to create the PUT request.</param>
    /// <returns>
    /// A delegate that serializes the record and PUTs it; a non-200 response is
    /// logged to the console together with the response body, and not rethrown.
    /// </returns>
    private static WriteRecord WriteRecord(S3Config config)
    {
        async Task Write(TimeStampedRecord record, AggregationLevel level, UInt32 index)
        {
            var payload = record.Serialize().ToArray();
            var s3Path  = GetS3Path(level, index);
            var request = config.CreatePutRequest(s3Path);

            // Dispose the response so the underlying connection is released.
            using var response = await request.PutAsync(new ByteArrayContent(payload));

            // Successful writes are intentionally silent.
            if (response.StatusCode == 200)
            {
                return;
            }

            // Include the path so a failed PUT can be traced to its object,
            // matching the GET error message.
            Console.WriteLine("ERROR: PUT " + s3Path);
            var error = await response.GetStringAsync();
            Console.WriteLine(error);
        }

        return Write;
    }
}