import { RpcError, ServerStreamingCall } from '@protobuf-ts/runtime-rpc';
import {
  QueryKey,
  QueryOptions,
  useQuery,
  useQueryClient,
  UseQueryOptions,
  UseQueryResult,
} from '@tanstack/react-query';
import { useCallback, useRef } from 'react';

/** Error message used when a stream ends without having delivered a single message. */
const ERROR_NO_DATA = 'NO_DATA';
/** `RpcError.code` value that is handled as a cancellation rather than as a failure. */
const ERROR_CANCELLED = 'CANCELLED';

/**
 * Baseline options applied to every streaming query before caller-supplied
 * options are merged on top: retrying is enabled, and the wait between
 * attempts doubles from 1 second up to a ceiling of 10 seconds.
 */
const defaultStreamingQueryOptions: QueryOptions = {
  retry: true,
  retryDelay: (attemptIndex) => {
    // 1000ms, 2000ms, 4000ms, 8000ms, then clamped to 10000ms.
    const exponentialDelayMs = 1000 * Math.pow(2, attemptIndex);
    return Math.min(exponentialDelayMs, 10000);
  },
};

/**
 * Exposes a server-streaming RPC as a React Query query.
 *
 * The query function starts the call returned by `func`, writes every received
 * message into the query cache under `queryKey` (so observers see each update
 * as it arrives), and resolves with the last message once the call finishes.
 *
 * Caller `options` are merged over the module defaults (`retry: true` with a
 * capped exponential `retryDelay`), so either can be overridden per call site.
 *
 * NOTE(review): the cancellation signal React Query hands to the query function
 * is not forwarded to `func`; only the returned `abort()` stops the stream.
 * Confirm whether unmount / `cancelQueries` should also end the call.
 *
 * @param queryKey - Cache key the streamed messages are stored under.
 * @param func - Factory that starts the streaming call. It receives the
 *   `AbortSignal` controlled by the returned `abort()`.
 * @param options - Additional `useQuery` options (everything except `queryKey`).
 * @returns The `useQuery` result plus `abort`, which triggers the signal that
 *   was passed to `func`.
 * @throws RpcError with message and code `NO_DATA` (surfaced through the query's
 *   `error`) when the call ends or is cancelled before any message arrived;
 *   any other error raised by the call is rethrown unchanged.
 */
export function useStreamingQuery<TRequest extends object, TData extends object, TQueryKey extends QueryKey = QueryKey>(
  queryKey: TQueryKey,
  func: (abort: AbortSignal | undefined) => ServerStreamingCall<TRequest, TData>,
  options?: Omit<UseQueryOptions<TData, RpcError, TData, TQueryKey>, 'queryKey'>
): UseQueryResult<TData, RpcError> & {
  abort: () => void;
} {
  const queryClient = useQueryClient();

  // Lazily create the first controller so a new one is not constructed on every render.
  const abortCtrlRef = useRef<AbortController | undefined>(undefined);
  if (abortCtrlRef.current === undefined) abortCtrlRef.current = new AbortController();

  const abortCallback = useCallback(() => {
    abortCtrlRef.current?.abort();
    // An aborted controller cannot be reused. Install the replacement right
    // away instead of waiting for the next render, so a retry that starts in
    // between never calls `func` without a signal (which would make that
    // stream impossible to abort).
    abortCtrlRef.current = new AbortController();
  }, []);

  const optionsWithDefaults: Omit<UseQueryOptions<TData, RpcError, TData, TQueryKey>, 'queryKey'> = {
    ...(defaultStreamingQueryOptions as QueryOptions<TData, RpcError, TData>),
    ...(options ?? {}),
  };

  const queryResult = useQuery<TData, RpcError, TData, TQueryKey>(
    queryKey,
    async (): Promise<TData> => {
      // A query must resolve with a value, so a stream that produced nothing is
      // reported as an error. It is raised as an RpcError so that it matches
      // the hook's declared error type (`error.code` is then always defined).
      const ensureHasData = (data: TData | undefined): TData => {
        if (!data) {
          throw new RpcError(ERROR_NO_DATA, ERROR_NO_DATA);
        }

        return data;
      };

      const callResponse = func(abortCtrlRef.current?.signal);
      let lastData: TData | undefined = undefined;

      try {
        callResponse.responses.onMessage((data) => {
          // Publish each message immediately; the promise below only settles
          // when the whole stream is over.
          queryClient.setQueryData<TData>(queryKey, data);
          lastData = data;
        });

        // Settles when the call has finished; rejects if the call fails.
        await callResponse;

        return ensureHasData(lastData);
      } catch (error: unknown) {
        if (error instanceof RpcError && error.code === ERROR_CANCELLED) {
          // Cancellation is an expected way to end the stream: keep whatever
          // was received last instead of failing the query.
          return ensureHasData(lastData);
        } else {
          throw error;
        }
      }
    },
    optionsWithDefaults
  );

  return { ...queryResult, abort: abortCallback };
}
