【JavaScript】asyncとgeneratorを使ってobserverを実装してみる

 rxjsのObservable、状態管理に非常に便利ですよね。しかし、その実装がどうなっているか意識する機会はあまりありません。

 今回は、asyncとgeneratorを組み合わせてobserverのようなclassを実装してみましょう。

インターフェースを定義する

使用例を考える

 こう書いてこう動いたら嬉しいな、と考えてみます。

const wait = async (ms)=>{
  return new Promise(resolve => setTimeout(resolve, ms));
};

Observable.create(async function*(){
  for(let i = 0; i < 10; i++) {
    yield i;
    await wait(1000);
  }
}).subscribe(value=>console.log(value), err=>console.log(err));

 このプログラムは、タイマーをイメージしたプログラムで、1秒毎にconsoleに経過秒数が出力されることが期待値です。rxjsのObserverと違うところは、next()を使わずに、yieldで表現しているところです。

クラス設計する

 きっとこんな感じになります。

class Observable{  // factoryみたいなやつ
  static create(generatorFn){}  // きっとObserverを作成して返してくれる
}

class Observer{

  constructor(generatorFn){
    this.generatorFn = generatorFn;
  }

  subscribe(stepCallback, errCallback){}  // subscribeを登録し、処理をスタートする。

  async start(){}  // 処理をスタートするため関数
}

 
 YAGNI原則に従って、最初のユースケースを満たすために必要無いものは定義しません。また、そこまで厳密に定義する必要はありません。実装を進めていく上で足りなければ足すし、冗長なら削除します。 

実装

 シンプルなのでざっと書いてみます。for-await構文は比較的新しい書き方ですね。

class Observable{
  static create(generatorFn){
    return new Observer(generatorFn);
  }
}

class Observer{
  constructor(generatorFn){
    this.generatorFn = generatorFn;
    this.subscribers = null;
  }

  subscribe(stepCallback, errCallback){
    this.subscribers = { stepCallback, errCallback };
    this.start();
  }

  async start(){
    try{
      for await(let value of this.generatorFn()){
        this.subscribers.stepCallback(value);
      }
    }catch(e){
      this.subscribers.errCallback(e);
    }
  }
}

 nullチェックや値のチェックは行っていません。実際に実装するときは、例外処理も含め、適切なところでErrorを投げたり、引数が妥当かどうかのチェックをしてあげると親切です。

テスト

 最初のユースケースを繋げて実行してみて、実際にちゃんと0〜9の値が1秒ごとに表示され、10以降の出力が行われないことを確かめます。Chromeをお使いなら、タブを開いて開発者ツールのコンソールにそのままコピペすると実際に動かすことができます。


// Implemented code. class Observable{ static create(generatorFn){ return new Observer(generatorFn); } } class Observer{ constructor(generatorFn){ this.generatorFn = generatorFn; this.subscribers = null; } subscribe(stepCallback, errCallback){ this.subscribers = { stepCallback, errCallback }; this.start(); } async start(){ try{ for await(let value of this.generatorFn()){ this.subscribers.stepCallback(value); } }catch(e){ this.subscribers.errCallback(e); } } } // Test code. const wait = async (ms)=>{ return new Promise(resolve => setTimeout(resolve, ms)); }; Observable.create(async function*(){ for(let i = 0; i < 10; i++) { yield i; await wait(1000); } }).subscribe(value=>console.log(value), err=>console.log(err));

 結果

0
1
2
3
4
5
6
7
8
9

 想定通りですね。

まとめ

 generatorとasync/awaitを組み合わせてobserverを実装してみました。シンプルな実装ですが、generatorと非同期処理の良い使用例になったかと思います。

 まだobservableほどの柔軟性はありませんが、そこそこ似たようなことは出来るはずです…!