Innovenergy_trunk/csharp/Lib/Channels/V2/StreamChannel.cs

124 lines
3.0 KiB
C#

namespace InnovEnergy.Lib.Channels.V2;
public class StreamChannel : Stream, IStreamChannel
{
private readonly Func<Stream> _Open;
private readonly Boolean _Dispose;
private UInt32 _ReadBufferSize;
private Stream? _Stream;
public StreamChannel(Stream stream, UInt32 readBufferSize = 64)
{
_Open = () => stream;
_Dispose = false;
_ReadBufferSize = readBufferSize;
}
public StreamChannel(Func<Stream> open, UInt32 readBufferSize = 64)
{
_Open = open;
_Dispose = true;
_ReadBufferSize = readBufferSize;
}
private Stream Open()
{
return _Stream ??= _Open();
}
public async Task<IReadOnlyList<Byte>> Read()
{
try
{
var stream = Open();
var buffer = new Byte[_ReadBufferSize]; // not reused!
var nRead = await stream.ReadAsync(buffer, 0, buffer.Length);
if (nRead >= _ReadBufferSize)
_ReadBufferSize *= 2; // buffer was too small, double size
else if (nRead <= 0)
throw new IOException("End of stream reached!");
return new ArraySegment<Byte>(buffer, 0, nRead);
}
catch
{
if (_Dispose && _Stream is not null)
{
var stream = _Stream;
_Stream = null;
await stream.DisposeAsync();
}
throw;
}
}
public Task Write(IReadOnlyList<Byte> tx)
{
throw new NotImplementedException();
}
public override Int32 Read(Byte[] buffer, Int32 offset, Int32 count)
{
try
{
return Open().Read(buffer, offset, count);
}
catch
{
if (_Dispose && _Stream is not null)
{
_Stream.Dispose();
_Stream = null;
}
throw;
}
}
public override void Write(Byte[] buffer, Int32 offset, Int32 count)
{
try
{
Open().Write(buffer, offset, count);
}
catch
{
if (_Dispose && _Stream is not null)
{
_Stream.Dispose();
_Stream = null;
}
throw;
}
}
public override Int64 Seek(Int64 offset, SeekOrigin origin) => throw new NotImplementedException();
public override void SetLength(Int64 value) => throw new NotImplementedException();
public override Boolean CanRead => true;
public override Boolean CanWrite => true;
public override Boolean CanSeek => false;
public override Int64 Length => throw new NotImplementedException();
public override Int64 Position
{
get => throw new NotImplementedException();
set => throw new NotImplementedException();
}
public override void Flush(){}
}