Programing Language/JS & TS

RxJS 파이프라인 연산자

칼쵸쵸 2024. 3. 24. 20:34

pipe

RxJS의 pipe 함수는 여러 개의 연산자를 함께 사용하여 복잡한 데이터 흐름을 쉽게 관리할 수 있도록 도와줍니다. pipe 함수는 Observable에 메서드 체인으로 연산자를 적용하고, 최종적으로 새로운 Observable을 반환합니다. 이 방법을 통해, 입력 Observable로부터 출력 Observable까지 데이터가 어떻게 변형되고 처리되는지 명확하게 볼 수 있습니다.

import { interval } from 'rxjs';
import { filter, map, take } from 'rxjs/operators';

const observable = interval(1000).pipe(
  filter(value => value % 2 === 0), // 짝수만 통과
  map(value => value * value), // 제곱으로 변환
  take(5) // 처음 5개 값만 처리
);

observable.subscribe({
  next: value => console.log(value),
  complete: () => console.log('Completed!')
});

이 코드를 실행하면, 1초 간격으로 0부터 시작하는 숫자가 발생하고, 이 중 짝수만 필터링되어 그 제곱 값이 출력됩니다. 즉, 0(0의 제곱), 4(2의 제곱), 16(4의 제곱), 36(6의 제곱), 64(8의 제곱)가 차례로 출력되고, take 연산자에 의해 5개의 값이 처리된 후에는 'Completed!'가 출력되면서 Observable이 완료됩니다.

pipe 함수는 RxJS에서 매우 강력한 도구입니다. 다양한 연산자를 조합함으로써 복잡한 비동기 및 이벤트 기반 로직을 쉽고 깔끔하게 처리할 수 있습니다.

 

map 

RxJS의 map 연산자는 Observable이 방출하는 각 항목에 함수를 적용하고, 그 결과를 새로운 Observable로 방출하는 연산자입니다. 이는 JavaScript의 배열에서 사용되는 Array.prototype.map() 함수와 유사한 개념으로, 배열의 각 요소를 변환하는 대신 Observable이 방출하는 각 값에 대해 변환을 수행합니다.
 
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// 숫자 배열을 방출하는 Observable
const numbers$ = of(1, 2, 3, 4, 5);

// 각 숫자를 제곱하여 방출하는 Observable로 변환
const squaredNumbers$ = numbers$.pipe(
  map(value => value * value)
);

// 결과 출력
squaredNumbers$.subscribe({
  next: value => console.log(value),
  complete: () => console.log('Completed!')
});
 

이 코드를 실행하면, 원래의 숫자 1, 2, 3, 4, 5가 각각 1, 4, 9, 16, 25로 변환되어 출력됩니다. 마지막으로, 'Completed!'가 출력되며 Observable의 완료를 알립니다.

 

사용 시 주의 사항

  • map 연산자는 각 방출 값에 대해 동일한 변환 로직을 적용합니다. 따라서, 변환 로직이 복잡하거나 조건에 따라 다른 로직을 적용해야 할 경우, 다른 연산자(예: mergeMap, switchMap, concatMap)를 고려해야 할 수 있습니다.
  • map을 사용할 때는 함수가 예외 없이 안전하게 값을 반환하도록 주의해야 합니다. 가능한 오류를 catchError 연산자로 처리하는 것이 좋습니다.

map은 RxJS에서 데이터를 변환하고 조작할 때 가장 기본적이고 자주 사용되는 연산자 중 하나입니다.

 

tab

import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

const source = of(1, 2, 3, 4, 5);

const example = source.pipe(
  tap(value => console.log('Value before processing:', value)),
  map(value => value * 2), // 예제로 map 연산자 사용
  tap(value => console.log('Value after processing:', value))
);

example.subscribe(val => console.log('Final Value:', val));
 
 

위의 코드에서 tap 함수는 옵저버블이 값을 방출할 때마다 로그를 출력합니다. tap 함수는 옵저버블이 값을 수정하지 않고 오직 부수 효과를 수행합니다. 따라서 tap을 사용하여 디버깅, 로깅 또는 값의 변환 없이 옵저버블의 상태를 확인할 수 있습니다.

일반적으로 tap 함수는 로깅이나 디버깅 용도로 많이 사용됩니다. 하지만 반드시 부수 효과만을 수행하는 것이 아니라, 가독성을 위해 tap을 사용하여 옵저버블의 상태를 확인하는 것도 가능합니다. 단, tap 함수가 사용자 상호작용이나 동시성 문제를 발생시키지 않도록 주의해야 합니다.

 

filter

옵저버블이 방출하는 값 중에서 특정 조건을 만족하는 값만 허용합니다.

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

const source = of(1, 2, 3, 4, 5);

const example = source.pipe(
  filter(value => value % 2 === 0) // 짝수만 허용
);

example.subscribe(val => console.log(val)); // 결과: 2, 4

 

mergeMap / flatMap

옵저버블이 방출하는 각 값에 대해 내부 옵저버블을 생성하고 병합합니다.

import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const source = of('a', 'b', 'c');

const example = source.pipe(
  mergeMap(value => of(value + '1', value + '2'))
);

example.subscribe(val => console.log(val)); // 결과: a1, a2, b1, b2, c1, c2

 

concatMap

내부 옵저버블을 순차적으로 처리합니다.

import { of } from 'rxjs';
import { concatMap, delay } from 'rxjs/operators';

const source = of(2000, 1000, 3000);

const example = source.pipe(
  concatMap(value => of(`Delayed by: ${value}ms`).pipe(delay(value)))
);

example.subscribe(val => console.log(val)); // 결과: 1000ms 후 'Delayed by: 1000ms', 2000ms 후 'Delayed by: 2000ms', 3000ms 후 'Delayed by: 3000ms'

 

switchMap

새로운 내부 옵저버블을 생성할 때 이전 내부 옵저버블을 취소합니다.

import { of, interval } from 'rxjs';
import { switchMap, take } from 'rxjs/operators';

const source = of(1000, 2000);

const example = source.pipe(
  switchMap(value => interval(value).pipe(take(2)))
);

example.subscribe(val => console.log(val)); // 결과: 0, 0, 0, 1

 

scan

옵저버블이 방출하는 값을 일련의 누적된 값으로 집계합니다.

import { of } from 'rxjs';
import { scan } from 'rxjs/operators';

const source = of(1, 2, 3, 4, 5);

const example = source.pipe(
  scan((acc, curr) => acc + curr, 0) // 초기값 0으로 시작하고 각 값들을 누적하여 더함
);

example.subscribe(val => console.log(val)); // 결과: 1, 3, 6, 10, 15

 

combineLatest

여러 옵저버블에서 가장 최신 값을 결합합니다.

import { combineLatest, of } from 'rxjs';

const source1 = of('a', 'b', 'c');
const source2 = of(1, 2, 3);

const example = combineLatest(source1, source2);

example.subscribe(([value1, value2]) => console.log(value1, value2)); // 결과: ['c', 1], ['c', 2], ['c', 3]

 

zip

import { zip, of } from 'rxjs';

const source1 = of('a', 'b', 'c');
const source2 = of(1, 2, 3);

const example = zip(source1, source2);

example.subscribe(([value1, value2]) => console.log(value1, value2)); // 결과: ['a', 1], ['b', 2], ['c', 3]

 

debounceTime

import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

const searchBox = document.getElementById('search-box');

const example = fromEvent(searchBox, 'input').pipe(
  map(event => (event.target as HTMLInputElement).value),
  debounceTime(300) // 300ms 간격으로 대기
);

example.subscribe(value => console.log(value)); // 입력이 멈춘 후 300ms 후에 마지막 입력 값 방출

 

distinctUntilChanged

연속적으로 중복되는 값을 방출하지 않습니다.

import { from } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

const source = from([1, 1, 2, 2, 3, 3]);

const example = source.pipe(
  distinctUntilChanged()
);

example.subscribe(val => console.log(val)); // 결과: 1, 2, 3

 

retry

옵저버블이 오류를 발생할 경우 지정된 횟수만큼 재시도합니다.

import { throwError, of } from 'rxjs';
import { retry } from 'rxjs/operators';

let count = 0;
const source = of('Initial Value').pipe(
  map(() => {
    count++;
    if (count < 3) {
      throw new Error('Error Occurred');
    }
    return 'Success';
  }),
  retry(2) // 최대 2번 재시도
);

source.subscribe(
  value => console.log(value),
  error => console.error(error)
);

 

take

지정된 수의 값을 방출하고 옵저버블을 완료합니다.

import { of } from 'rxjs';
import { take } from 'rxjs/operators';

const source = of(1, 2, 3, 4, 5);

const example = source.pipe(
  take(3) // 처음 3개의 값만 방출
);

example.subscribe(val => console.log(val)); // 결과: 1, 2, 3

 

skip

지정된 수의 값을 건너뛴 후 나머지 값을 방출합니다.

import { of } from 'rxjs';
import { skip } from 'rxjs/operators';

const source = of(1, 2, 3, 4, 5);

const example = source.pipe(
  skip(3) // 처음 3개의 값을 건너뛴 후 나머지 값 방출
);

example.subscribe(val => console.log(val)); // 결과: 4, 5

 

pluck

객체에서 지정된 속성의 값을 추출합니다.

import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';

const source = of(
  { name: 'John', age: 30 },
  { name: 'Jane', age: 25 }
);

const example = source.pipe(
  pluck('name') // 객체에서 'name' 속성의 값 추출
);

example.subscribe(val => console.log(val)); // 결과: 'John', 'Jane'

 

catchError / catchError

에러가 발생한 경우 대체 옵저버블을 반환하거나 예외를 처리합니다.

import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

const source = throwError('This is an error');

const example = source.pipe(
  catchError(error => of('Fallback Value')) // 에러 발생 시 대체 값 방출
);

example.subscribe(
  value => console.log(value),
  error => console.error(error) // 결과: 'Fallback Value'
);

 

finalize

옵저버블이 완료되거나 에러가 발생할 때 실행됩니다. 주로 정리 작업에 사용됩니다.

import { of } from 'rxjs';
import { finalize } from 'rxjs/operators';

const source = of('Final Value');

const example = source.pipe(
  finalize(() => console.log('Finalized')) // 옵저버블이 완료되면 실행됨
);

example.subscribe(val => console.log(val)); // 결과: 'Final Value', 'Finalized'