gooddata-js v13.5.0
File: src/DataLayer/utils/async.ts
// (C) 2007-2020 GoodData Corporation
import identity from "lodash/identity";
import { Subject, Subscription } from "rxjs";
import { catchError, switchMap } from "rxjs/operators";
export { Subscription };
export type StreamSuccessHandler<T> = (result: T) => void;
export type StreamErrorHandler = (error: any) => void;
export interface ISubject<T> {
next: (promise: T) => void;
unsubscribe: () => void;
}
/**
* Creates infinite stream
* Usage:
* const subject = createSubject(
* (result) => console.log('Success:', result),
* (error) => console.error('Error:', error)
* );
* subject.next(promise1);
* subject.next(promise2);
*
* subject.unsubscribe();
*
* @method createSubject
* @param {StreamSuccessHandler<T>} successHandler
* @param {StreamErrorHandler} errorHandler
* @return {ISubject<Promise<T>>}
*/
export function createSubject<T>(
successHandler: StreamSuccessHandler<T>,
errorHandler: StreamErrorHandler,
): ISubject<Promise<T>> {
const subject = new Subject<Promise<T>>();
const subscription = subject
.pipe(
// This ensures we get last added promise
switchMap(identity),
// Streams are closed on error by default so we need this workaround
catchError((error, caught) => {
errorHandler(error); // handle error
return caught; // stream continue
}),
)
.subscribe(successHandler);
const wrapper: ISubject<Promise<T>> = {
next: (promise: Promise<T>) => {
subject.next(promise);
},
unsubscribe: () => {
subscription.unsubscribe();
subject.unsubscribe();
},
};
return wrapper;
}