using System.Diagnostics;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace InnovEnergy.Lib.Utils;

public static class ObservableUtils
{

    public static IObservable<R> TrySelect<T, R>(this IObservable<T> source, Func<T, R> map)
    {
        
        return Observable.Create<R>(o =>
        {
            void OnNext(T next)
            {
                var success = false;
                R result = default!;

                try
                {
                    result = map(next);
                    success = true;
                }
                catch
                {
                    // ignored
                }

                if (success)
                    o.OnNext(result!);
            }

            return source.Subscribe(OnNext, o.OnError, o.OnCompleted);
            
        });
    }

    public static IObservable<Exception> MergeErrors(params IObservable<Object>[] sources)
    {
        return Observable.Merge(sources.Select(s=>s.SelectError())).Take(1);
    }
    
    public static IObservable<Exception> MergeErrors<L,R>(this IObservable<L> fst, IObservable<R> snd)
    {
        return Observable.Merge(fst.SelectError(), snd.SelectError()).Take(1);
    }
    
    public static IObservable<Exception> SelectErrors(this IObservable<Exception> source)
    {
        return source
              .Materialize()
              .Where(n => n.Kind != NotificationKind.OnCompleted)
              .Select(n => n.Kind == NotificationKind.OnError ? n.Exception! : n.Value);
    }
    
    public static IObservable<Exception> SelectError<TSource>(this IObservable<TSource> source)
    {
        if (typeof(TSource).IsAssignableTo(typeof(Exception)))
            return source.Select(s =>
            {   
                Console.WriteLine(s);
                return (s as Exception)!;
            }).Take(1);

        return source
              .Materialize()
              .Where(n => n.Kind == NotificationKind.OnError)
              .Select(n => n.Exception!);
    }
    


    public static IObservable<TSource> OnDispose<TSource>(this IObservable<TSource> source, Action onDispose)
    {
        return Observable
            .Create<TSource>(observer =>
            {
                var subscription = source.Subscribe(observer);
                return () =>
                {
                    lock (onDispose)
                    {
                        subscription.Dispose();
                        onDispose();
                    }
                };
            });
    }

    public static IObservable<TSource> OnSubscribe<TSource>(this IObservable<TSource> source, Action onSubscribe)
    {
        return Observable
            .Create<TSource>(observer =>
            {
                var subscription = source.Subscribe(observer);
                onSubscribe();
                return () => subscription.Dispose();
            });
    }


    public static IObservable<T> Repeat<T>(this IScheduler scheduler, Func<T> func)
    {
        return Observable
            .Repeat(0, scheduler)
            .Select(_ => func());
    }


    public static IObservable<T> LazyAsync<T>(Func<IObservable<T>> observableFactory)
    {
        IObservable<T> FromObservable() => observableFactory().FirstAsync();

        return Observable
            .Defer(FromObservable)
            .Replay(1)
            .RefCount();
    }


    public static IObservable<T> LazyAsync<T>(Func<Task<T>> taskFactory)
    {
        IObservable<T> FromAsync() => Observable.FromAsync(taskFactory);

        return Observable
            .Defer(FromAsync)
            .Replay(1)
            .RefCount();
    }

    // public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
    //                                                             TimeSpan duration,
    //                                                           IScheduler scheduler)
    // {
    //     scheduler ??= Scheduler.Default;
    //
    //     return source
    //           .GroupByUntil(k => k, _ => Observable.Timer(duration, scheduler))
    //           .SelectMany(Observable.FirstAsync);
    // }
    //
    //
    // public static IObservable<T> ThrottleDistinct<T,K>(this IObservable<T> source,
    //                                                              Func<T,K> keySelector,
    //                                                               TimeSpan duration,
    //                                                             IScheduler scheduler)
    // {
    //
    //
    //     scheduler ??= Scheduler.Default;
    //
    //     return source
    //           .GroupByUntil(keySelector, _ => Observable.Timer(duration, scheduler))
    //           .SelectMany(Observable.FirstAsync);
    // }

    public static IObservable<T> ThrottleDistinct<T, K>(this IObservable<T> source,
        Func<T, K> keySelector,
        TimeSpan duration,
        IScheduler? scheduler = null)
    {
        scheduler ??= Scheduler.Default;

        var hs = new HashSet<K>();

        Boolean Add(K key)
        {
            lock (hs)
                return hs.Add(key);
        }

        void Remove(K key)
        {
            lock (hs)
            {
                var removed = hs.Remove(key);
                Debug.Assert(removed);
            }
        }

        Boolean Filter(K key)
        {
            var added = Add(key);

            if (added)
                scheduler.Schedule(duration, _ => Remove(key));

            return added;
        }

        return source.Where(t => Filter(keySelector(t)));
    }


    public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source,
        TimeSpan duration,
        IScheduler? scheduler = null)
    {
        scheduler ??= Scheduler.Default;

        var hs = new HashSet<T>();

        Boolean Add(T key)
        {
            lock (hs)
                return hs.Add(key);
        }

        void Remove(T key)
        {
            lock (hs)
            {
                var removed = hs.Remove(key);
                Debug.Assert(removed);
            }
        }

        Boolean Filter(T key)
        {
            var added = Add(key);

            if (added)
                scheduler.Schedule(duration, _ => Remove(key));

            return added;
        }

        return source.Where(Filter);
    }


    public static IObservable<IList<T>> BufferBy<T, X>(this IObservable<T> src, Func<T, X> keySelector)
    {
        return src.Publish(s =>
        {
            var boundaries = s.Select(keySelector).DistinctUntilChanged();
            return s.Buffer(boundaries);
        });
    }

    public static IObservable<Double> Integrate(this IObservable<Double> source, Double initConst = 0)
    {
        return source.Scan(initConst, (x, y) => x + y);
    }

    public static IObservable<Double> IntegrateNormalize(this IObservable<Double> source, Double initConst = 0)
    {
        return source.Scan(initConst, (x, y) => Math.Tanh(x + y));
    }


    // TODO
    // public static IObservable<Double> MovingAverage(this IObservable<Double> source, Int32 windowSize)
    // {
    //     if (windowSize < 1)
    //         throw new ArgumentException(nameof(windowSize) + " must be positive", nameof(windowSize));
    //
    //     using var e = source.GetEnumerator();
    //
    //     if (!e.MoveNext()) yield break;
    //
    //     var state = e.Current;
    //
    //     do
    //     {
    //         state = ((windowSize - 1) * state + e.Current) / windowSize;
    //         yield return state;
    //     }
    //     while (e.MoveNext());
    // }
}