Programing Language/JS & TS

RxJS 설명과 Observable 예제

칼쵸쵸 2024. 3. 24. 20:11

 

    RxJS

    RxJS는 특히 비동기 이벤트를 처리할 때 강력하며, 복잡한 데이터 스트림을 다루어야 하는 상황에서 꼭 필요하다고 할 수 있습니다. 이러한 상황은 주로 실시간 데이터 처리, 사용자 입력 처리, HTTP 요청 처리 등에 해당됩니다. RxJS를 사용하면 이벤트 스트림을 쉽게 생성, 변환, 조합 및 구독할 수 있으며, 이는 복잡한 비동기 코드를 보다 선언적이고 관리하기 쉬운 형태로 만들어줍니다.

    예제: 실시간 검색어 기능 구현

    다음은 사용자가 입력 필드에 텍스트를 입력할 때마다 검색어를 실시간으로 서버에 쿼리하고 결과를 받아 표시하는 기능을 구현한 예제입니다. 이 예제는 사용자 입력 처리와 HTTP 요청을 동시에 다루는 방법을 보여줍니다. RxJS를 사용하면 이런 유형의 상호작용을 효과적으로 구현할 수 있습니다.

     

    import { fromEvent, switchMap, debounceTime, distinctUntilChanged, filter } from 'rxjs';
    import { ajax } from 'rxjs/ajax';
    
    // input 요소에 대한 참조
    const searchBox = document.getElementById('search-box');
    
    // input 이벤트를 Observable로 변환
    const typeahead = fromEvent(searchBox, 'input').pipe(
      debounceTime(500), // 마지막 이벤트 이후 500ms 동안 대기
      map((e: any) => e.target.value), // 입력된 텍스트 추출
      filter(text => text.length > 2), // 최소 3글자가 입력되어야 함
      distinctUntilChanged(), // 이전 입력과 다른 경우에만 처리
      switchMap(query => ajax(`/search?q=${query}`)) // 검색 쿼리 수행, 이전 요청은 취소
    );
    
    // 결과 구독 및 처리
    typeahead.subscribe({
      next: response => {
        // 검색 결과를 처리하는 로직
        console.log(response);
      },
      error: err => {
        // 오류 처리
        console.error(err);
      },
    });

     

    RxJS(Reactive Extensions for JavaScript)는 JavaScript 및 TypeScript를 위한 라이브러리로, 비동기 및 이벤트 기반 프로그램을 작성할 때 사용됩니다. RxJS는 옵저버 패턴, 반복자 패턴 및 함수형 프로그래밍의 개념을 결합하여 데이터 스트림을 다루는데 유용합니다.

     

    옵저버 패턴과 이터레이터 패턴은 소프트웨어 설계에서 자주 사용되는 디자인 패턴입니다. 이 두 패턴은 서로 다른 목적을 가지고 있으며, 그 사용 예를 통해 각각을 더 잘 이해할 수 있습니다.

     

    옵저버 패턴과 이터레이터 패턴

    옵저버 패턴

    옵저버 패턴은 한 객체의 상태 변화를 관찰하는 관찰자들에게 자동으로 알림을 보내는 방식입니다. 주로 이벤트 핸들링 시스템에서 많이 사용됩니다. 예를 들어, 뉴스 서비스와 이를 구독하는 여러 구독자가 있는 경우를 생각해 볼 수 있습니다.

    interface Observer {
        update: (data: any) => void;
    }
    
    interface Subject {
        subscribe: (observer: Observer) => void;
        unsubscribe: (observer: Observer) => void;
        notify: () => void;
    }
    
    class NewsService implements Subject {
        private observers: Observer[] = [];
        private news: string = "";
    
        subscribe(observer: Observer) {
            this.observers.push(observer);
        }
    
        unsubscribe(observer: Observer) {
            const index = this.observers.indexOf(observer);
            if (index > -1) {
                this.observers.splice(index, 1);
            }
        }
    
        notify() {
            for (const observer of this.observers) {
                observer.update(this.news);
            }
        }
    
        publishNews(news: string) {
            this.news = news;
            this.notify();
        }
    }
    
    class Subscriber implements Observer {
        constructor(private name: string) {}
    
        update(data: any) {
            console.log(`${this.name} received news: ${data}`);
        }
    }
    
    const newsService = new NewsService();
    const subscriber1 = new Subscriber("Subscriber 1");
    const subscriber2 = new Subscriber("Subscriber 2");
    
    newsService.subscribe(subscriber1);
    newsService.subscribe(subscriber2);
    
    newsService.publishNews("Breaking News: Observer Pattern Example!");

     

    이 예제에서, NewsServiceSubject 인터페이스를 구현하며 뉴스를 발행할 때마다 모든 구독자(Observer)에게 이를 알립니다. 각 SubscriberObserver 인터페이스를 구현하여 뉴스 업데이트를 받을 수 있습니다.

     

    이터레이터 패턴

    이터레이터 패턴은 컬렉션 내부의 표현 방식을 노출시키지 않고도 그 요소들을 순회할 수 있는 방법을 제공합니다. 이를 통해 컬렉션의 구현과는 독립적으로 요소에 접근할 수 있습니다.

     

    반복자 패턴의 주 아이디어는 컬렉션의 순회 동작을 라는 별도의 객체로 추출하는 것입니다.

    interface Iterator<T> {
        next: () => T | null;
        hasNext: () => boolean;
    }
    
    interface IterableCollection<T> {
        createIterator: () => Iterator<T>;
    }
    
    class ConcreteCollection implements IterableCollection<number> {
        private items: number[] = [];
    
        constructor(items: number[]) {
            this.items = items;
        }
    
        createIterator() {
            return new ConcreteIterator(this);
        }
    }
    
    class ConcreteIterator implements Iterator<number> {
        private collection: ConcreteCollection;
        private position: number = 0;
    
        constructor(collection: ConcreteCollection) {
            this.collection = collection;
        }
    
        next() {
            if (this.hasNext()) {
                return this.collection['items'][this.position++];
            }
            return null;
        }
    
        hasNext() {
            return this.position < this.collection['items'].length;
        }
    }
    
    const collection = new ConcreteCollection([1, 2, 3, 4, 5]);
    const iterator = collection.createIterator();
    
    while (iterator.hasNext()) {
        console.log(iterator.next());
    }

     

     

    이 예제에서는 ConcreteCollection이라는 숫자의 컬렉션과 이 컬렉션을 순회할 수 있는 ConcreteIterator를 정의했습니다. Iterator 인터페이스를 구현함으로써, 컬렉션의 내부 구현에 의존하지 않고도 요소들을 순회할 수 있게 됩니다.

    RxJS에서 두 패턴의 사용 예제

    RxJS에서 옵저버 패턴 사용 예제

    옵저버 패턴에서는 객체(주제)가 상태 변화를 관찰자(옵저버)에게 알리는 구조를 가집니다. RxJS에서는 Observable이 주제의 역할을 하고, Observer가 관찰자의 역할을 합니다.

     

    import { Observable } from 'rxjs';
    
    // Observable 생성
    const observable = new Observable(subscriber => {
      subscriber.next('Hello');
      subscriber.next('World');
      setTimeout(() => {
        subscriber.next('with delay');
        subscriber.complete();
      }, 1000);
    });
    
    // Observer 정의
    const observer = {
      next: (value: any) => console.log(value),
      error: (error: any) => console.error('Error:', error),
      complete: () => console.log('Completed'),
    };
    
    // Observable 구독
    observable.subscribe(observer);

     

     

    위 예제에서 Observable은 두 개의 즉시 메시지("Hello", "World")와 하나의 지연 메시지("with delay")를 방출합니다. Observer는 이 메시지들을 받아서 처리하며, 모든 메시지 처리가 완료되면 "Completed"를 로그로 출력합니다.

     

    이터레이터 패턴 사용 예제

    이터레이터 패턴에서는 컬렉션의 요소들을 순회할 수 있는 방법을 제공합니다. RxJS에서는 이 패턴이 Observable 스트림을 통해 구현되어 있습니다. Observable은 시간에 따라 데이터 항목의 시퀀스를 제공하고, 이 시퀀스는 구독하는 Observer에 의해 하나씩 순회됩니다.

    import { from } from 'rxjs';
    
    // 배열을 Observable로 변환
    const numbers = [1, 2, 3, 4, 5];
    const numbersObservable = from(numbers);
    
    // Observable 구독 및 각 숫자 출력
    numbersObservable.subscribe({
      next: (value) => console.log(value),
      complete: () => console.log('Completed')
    });

     

    이 예제에서 from 함수는 배열을 Observable로 변환합니다. 이 Observable은 배열의 각 요소를 순차적으로 방출하고, 구독하는 Observer는 이 요소들을 하나씩 받아서 처리합니다.

     

    RxJS에서 두 패턴의 조합

    RxJS는 이 두 패턴을 유연하게 조합하여 사용할 수 있습니다. Observable을 사용하여 데이터 스트림을 생성하고(옵저버 패턴), 이 스트림을 통해 데이터 항목을 순차적으로 처리하고 구독할 수 있습니다(이터레이터 패턴). 이를 통해 비동기 데이터 처리, 이벤트 처리, 시간 기반의 스트림 처리 등 다양한 작업을 선언적이고 효율적으로 구현할 수 있습니다.

     

     

    RxJS 기본 생성 예제

    간단한 옵저버블 생성 및 구독

    import { Observable } from 'rxjs';
    
    // 간단한 옵저버블 생성
    const observable = new Observable(subscriber => {
      subscriber.next('Hello');
      subscriber.next('World');
      subscriber.complete();
    });
    
    // 옵저버블 구독
    observable.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Done'),
    });

    옵저버블과 연산자 활용

    import { of } from 'rxjs';
    import { map } from 'rxjs/operators';
    
    // 옵저버블과 map 연산자를 사용하여 데이터 변환
    of(1, 2, 3)
      .pipe(
        map(value => value * 2)
      )
      .subscribe(val => console.log(val)); // 결과: 2, 4, 6

    필터링 및 구독 중단

    import { fromEvent } from 'rxjs';
    import { filter, take } from 'rxjs/operators';
    
    // 클릭 이벤트를 감지하고 필터링하여 처음 세 번의 클릭만 처리
    const subscription = fromEvent(document, 'click')
      .pipe(
        filter((_, index) => index < 3),
        take(3)
      )
      .subscribe(event => console.log('Clicked', event));
    
    // 3번의 클릭 후 자동으로 구독 중단
    setTimeout(() => {
      subscription.unsubscribe();
      console.log('Subscription stopped');
    }, 10000);

    에러 처리

    import { throwError } from 'rxjs';
    
    // 에러 발생
    const observable = throwError('Something went wrong');
    observable.subscribe({
      error: error => console.error(error)
    });

     

     

    RxJS Observable

    생성(Creation)

    Observable.create(), new Observable() 및 다양한 생성 함수를 사용하여 Observable을 생성할 수 있습니다.

    주로 비동기 작업, 이벤트 처리 또는 외부 리소스와의 상호 작용으로 Observable을 생성합니다.

     

    1. of를 사용한 Observable 생성

    of 함수는 인자로 받은 값을 순서대로 발행하는 Observable을 생성합니다. 모든 값이 발행된 후 자동으로 완료됩니다.

    import { of } from 'rxjs';
    
    const observable = of(1, 2, 3);
    
    observable.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Completed')
    });

     

    2. fromEvent를 사용한 이벤트 기반 Observable 생성

    DOM 이벤트 또는 기타 이벤트 소스로부터 Observable을 생성합니다. 예를 들어, 사용자의 클릭 이벤트를 기반으로 Observable을 생성할 수 있습니다

     

    import { fromEvent } from 'rxjs';
    
    const button = document.querySelector('button');
    const clicks$ = fromEvent(button, 'click');
    
    clicks$.subscribe(() => console.log('Button clicked'));

     

    3. interval을 사용한 시간 기반 Observable 생성

    일정 시간 간격으로 연속적인 숫자를 발행하는 Observable을 생성합니다.

    import { interval } from 'rxjs';
    
    const interval$ = interval(1000); // 매 1초마다 숫자를 발행
    
    interval$.subscribe(value => console.log(`Tick: ${value}`));

     

    4. 사용자 정의 Observable 생성

    new Observable을 통해 사용자 정의 Observable을 생성할 수 있습니다. 이 방식은 더 복잡한 비동기 로직을 구현할 때 유용합니다.

    import { Observable } from 'rxjs';
    
    const customObservable = new Observable(subscriber => {
      subscriber.next('Hello');
      subscriber.next('World');
    
      // 비동기 작업을 흉내
      setTimeout(() => {
        subscriber.next('Asynchronous message');
        subscriber.complete(); // 스트림을 완료
      }, 1000);
    
      // 구독이 취소되었을 때의 정리 로직
      return () => {
        console.log('Observable cleaned up');
      };
    });
    
    customObservable.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Completed')
    });

    구독(Subscribing):

    Observable은 subscribe() 메서드를 통해 구독됩니다. 이를 통해 Observer가 Observable에서 발생하는 이벤트를 수신할 수 있습니다.

    Observer에는 next, error, complete와 같은 콜백 함수를 포함할 수 있습니다.

    1. 단순 구독

    가장 간단한 형태는 데이터를 발행할 때마다 실행할 콜백 함수를 subscribe 메소드에 전달하는 것입니다.

    import { of } from 'rxjs';
    
    const observable = of(1, 2, 3);
    
    observable.subscribe(value => console.log(value)); // 각 값에 대해 로그 출력

     

    2. 다중 콜백 구독

    subscribe 메소드는 세 가지 유형의 콜백을 받을 수 있습니다: next, error, complete.

    const observable = of(1, 2, 3);
    
    observable.subscribe(
      value => console.log(value), // next: 각 값에 대해 호출
      error => console.error(error), // error: 에러 발생 시 호출
      () => console.log('Completed') // complete: 스트림이 완료될 때 호출
    );

     

    3. 구독 객체 사용

    subscribe 호출은 Subscription 객체를 반환합니다. 이 객체를 사용하여 구독을 프로그래밍 방식으로 관리할 수 있습니다.

    import { interval } from 'rxjs';
    
    const observable = interval(1000); // 매초마다 숫자 방출
    
    const subscription = observable.subscribe(value => console.log(value));
    
    // 5초 후 구독 취소
    setTimeout(() => {
      subscription.unsubscribe();
    }, 5000);

    이벤트 발행(Event Emission):

    Observable은 데이터 스트림에서 이벤트를 발생시킬 수 있습니다. 이러한 이벤트는 next(), error(), complete()를 통해 전달됩니다.

    next()는 값을 전달하고, error()는 오류를 발생시키고, complete()는 스트림의 종료를 알립니다.

     

    1. next를 통한 값 방출

    Subject를 사용하여 명시적으로 값들을 방출할 수 있습니다.

    import { Subject } from 'rxjs';
    
    const subject = new Subject();
    
    subject.subscribe(value => console.log(value));
    
    // 값 방출
    subject.next(1);
    subject.next(2);

     

    2. 이벤트 기반 데이터 발행

    DOM 이벤트 또는 Node.js 이벤트 에미터와 같은 이벤트 소스로부터 데이터 스트림을 생성할 수 있습니다.

    import { fromEvent } from 'rxjs';
    
    const button = document.querySelector('button');
    const clicks$ = fromEvent(button, 'click');
    
    clicks$.subscribe(() => console.log('Button was clicked'));

     

    3. 비동기 데이터 발행

    Observable 생성자 또는 생성 함수(of, from, interval 등)를 사용하여 비동기적으로 데이터를 발행할 수 있습니다.

    import { asyncScheduler, of } from 'rxjs';
    
    // asyncScheduler를 사용한 비동기 발행
    of(1, 2, 3, asyncScheduler).subscribe(value => console.log(value));

     

    비동기성(Asynchronicity):

    Observable은 비동기적으로 값을 생성하거나 처리할 수 있습니다.

    이것은 AJAX 호출, 타이머 이벤트, 사용자 입력 및 다른 비동기적 작업과 같은 경우에 특히 유용합니다.

    1. HTTP 요청 처리

    RxJS의 ajax 함수를 사용하여 HTTP 요청을 비동기적으로 처리할 수 있습니다. 예를 들어, 특정 URL에서 데이터를 가져오는 간단한 GET 요청은 다음과 같이 구현할 수 있습니다.

    import { ajax } from 'rxjs/ajax';
    import { map, catchError } from 'rxjs/operators';
    import { of } from 'rxjs';
    
    const fetchPosts = ajax.getJSON('https://jsonplaceholder.typicode.com/posts').pipe(
      map(data => {
        // 데이터 처리 로직
        console.log(data);
        return data;
      }),
      catchError(error => {
        // 오류 처리 로직
        console.error('Error:', error);
        return of(error);
      })
    );
    
    fetchPosts.subscribe(
      data => console.log('Data received:', data),
      error => console.log('Error:', error)
    );

     

    2. 사용자 입력 디바운싱

    사용자가 입력 필드에 타이핑을 할 때, 서버에 너무 많은 요청을 보내는 것을 방지하기 위해 디바운싱 기법을 사용할 수 있습니다. 이는 일정 시간 동안 추가 입력이 없을 때만 마지막 입력에 대한 처리를 수행하는 방식입니다.

    import { fromEvent } from 'rxjs';
    import { map, debounceTime, filter } from 'rxjs/operators';
    
    const searchBox = document.getElementById('search-box');
    
    const typeahead = fromEvent(searchBox, 'input').pipe(
      map((e: any) => e.target.value),
      debounceTime(500),
      filter(query => query.length >= 3) // 최소 3글자 이상 입력됐을 때만 처리
    );
    
    typeahead.subscribe(data => {
      // 입력 데이터 처리 로직, 예: 서버 검색 요청
      console.log(data);
    });

     

    3. 여러 비동기 요청 병렬 처리

    여러 비동기 요청을 동시에 실행하고 모든 요청이 완료됐을 때 결과를 처리하려면, forkJoin 함수를 사용할 수 있습니다.

    import { forkJoin } from 'rxjs';
    import { ajax } from 'rxjs/ajax';
    
    const postRequest = ajax.getJSON('https://jsonplaceholder.typicode.com/posts/1');
    const userRequest = ajax.getJSON('https://jsonplaceholder.typicode.com/users/1');
    
    forkJoin([postRequest, userRequest]).subscribe(results => {
      const [post, user] = results;
      console.log(`Post title: ${post.title}, User name: ${user.name}`);
    });

     

    4. 이벤트 스트림의 조합

    여러 이벤트 소스로부터 오는 데이터 스트림을 조합하여 새로운 스트림을 생성할 수 있습니다. 예를 들어, 두 개의 버튼 클릭을 감지하여 각각 다른 동작을 수행한 후 결과를 조합할 수 있습니다.

    import { fromEvent } from 'rxjs';
    import { map } from 'rxjs/operators';
    
    const button1 = document.getElementById('button1');
    const button2 = document.getElementById('button2');
    
    const clicksButton1 = fromEvent(button1, 'click').pipe(map(() => 'Button 1 clicked'));
    const clicksButton2 = fromEvent(button2, 'click').pipe(map(() => 'Button 2 clicked'));
    
    clicksButton1.subscribe(console.log);
    clicksButton2.subscribe(console.log);