ntaoo blog

主にDart, Flutter, AngularDartについて書いていきます。ときどき長文。日本語のみ。Twitter: @ntaoo

BLoCパターンにおける、AngularDartでのStreamの扱い方

BLoCパターンでModelを設計するとUIとの通信はStreamとSinkに限定される。StreamをAngularDartのComponentでlistenしてViewを更新するコードについて、迷ったりハマったりするかもしれないところを解説する。

Async Pipeを使用する際の不具合を避ける

BehaviorSubject、StreamTransformer、AsyncPipeの組み合わせで、無限ループ

問題ないケース

// Model
final aBehaviorSubject = BehaviorSubject<String>();
Stream<String> get aStream => aBehaviorSubject.stream;

// Angular Template
<div *ngFor="let element of aStream | async">{{element}}</div>

無限ループが起こるケース

BehaviorSubjectからのObservableまたはStreamStreamTransformertransformしたものを、getterAsyncPipeにパラメーターとして渡すと、無限ループが起こる。

// Model
final aBehaviorSubject = BehaviorSubject<String>();
Stream<String> get aStream => aBehaviorSubject.doOnData(print).stream;

// Angular Template
<div *ngFor="let element of model.aStream | async">{{element}}</div>

上記の例ではdoOnDataでstreamに流れる値をprintしており、これはデバッグ時によく使う手法だが、 (doOnDataに限らず、) Observableのmethodは内部でStreamTransformerで処理されている。

もちろん、package:stream_transformや、自作のStreamTransformerでも同様にこの問題が起こる。

// Model
final aBehaviorSubject = BehaviorSubject<String>();
// Stream<String> get aStream => aBehaviorSubject.stream.transform(tap(print));

// Angular Template
<div *ngFor="let element of model.aStream | async">{{element}}</div>

なぜ無限ループが起こるのか

ChangeDetectionで値を比較する際、StreamTransformerを起動することにより、毎回異なるidentityのstreamgetterで生成してしまうため。StreamTransformerは、同期的なCollection操作と同様に、副作用をおこさないために新しいStreamインスタンスを生成する。(より正確には、BehaviorSubjectからのStreamTransformerは、その内部に独自のBroadcastStreamControllerを持ち、bindによって入力Streamを元にそのBroadcastStreamController参照を保持した新たなBroadcastStreamが生成される。)

注) BehaviorSubjectBroadcastStreamControllerである。

  1. ChangeDetection -> Async Pipe transform
    1. 初回なので、そのままlisten。BehaviorSubjectのstreamなので即座に値が流れてくる
    2. _updateLatestValue -> markForCheckでChangeDetectionを起動させる
  2. ChangeDetection -> Async Pipe transform
    1. getterで新たなstreamを取得するため、以前のstreamをdisposeし、その新たなstreamでAsync Pipe transformを再帰呼び出しする
  3. Async Pipe transform(再帰
    1. getterで新たなBehaviorSubjectのstreamをlistenする。即座に値が流れてくる
    2. _updateLatestValue -> markForCheckでChangeDetectionを起動させる。そして2.1.へ戻る

どうすれば良いか

ChangeDetectionのたびにgetterによりStreamTransformerによる変換を経て新たなStreamが生成されることが問題なので、生成されたstreamをcomponentのインスタンス変数に保持しておくことで防ぐ。

// 可能ならばconstructorで処理し、finalを付ける。
Stream aStream;
ngOnInit() {
  aStream = model.aStream;
}

このほうが無駄なStreamインスタンス再生成を省略できるので、処理効率も良い。しかし、ボイラープレートコードが増えるので退屈。

Viewの更新手法が異なるFlutterのStreamBuilderでは、私が理解している限りではこのような問題は起こらず、同時により効率的な更新手法になっている。

Single Subscription Stream、StreamTransformer、AsyncPipeの組み合わせで、EXCEPTION: Bad state: Stream has already been listened to.

Single Subscription Stream Controllerから生成されたstreamををlistenできるのは一度だけである。複数回listenすると、EXCEPTION: Bad state: Stream has already been listened to.というエラーが起こる。

// Model
final aStreamController = StreamController<String>();
// Stream<String> get aStream => aStreamController.stream.transform(tap(print));

// Angular Template
<div *ngFor="let element of model.aStream | async">{{element}}</div>

なぜAsync Pipeでこのエラーが起こるのか

こちらも、上記の無限ループの挙動のように、 StreamTransformerで新たなSingle Subscription Streamが生成され、その際に入力 StreamがStreamTransformer内部でlistenされているため、上記の無限ループの問題のようにAsync Pipe内部でdispose -> listenすると、新たなStreamTransformerが以前のStreamTransformerによりlisten済みの状態のstreamをlistenしてしまい、このエラーが起こる。

どうすれば良いか

上記無限ループ問題と同様に、StreamTransformerにより変換されたStreamをcomponentのインスタンス変数に保持しておくことで防ぐ。

Component, Templateのコードをどう書くべきか

Async Pipeを使う場合と使わない場合の両方を解説する。

Async Pipeを使う場合

Async Pipeはその内部で、ComponentのonDestroy時にlistenしているstreamのstreamSubscription.cancel()が実行されるので、Componentでstreamをcancelするコードを書く必要がない。

ただ、なんとAngularDartには、*ngIfasync as構文がない。 *ngIf="aStream | async as anObject"と書けない。Objectのfieldが複数ある場合は、残念なことに、以下のような何回もlistenする冗長なコードを書かなければならない。

<div>{{(aStream | async).field1}}</div>
<div>{{(aStream | async).field2}}</div>
<div>{{(aStream | async).field3}}</div>

*ngForではAsync PipeでStreamで流れてきたListのそれぞれの要素を同期的に扱うことができるのだが、*ngForを使わない場合に複数のfieldがあるobjectをハンドリングする際は、Async Pipeを使わないほうが良いかもしれない。

もし良いやり方があれば知りたい。

Async Pipeを使わない場合

Async Pipeを使わない場合は、ComponentのDart側のコードでの対応が必要。コンストラクタやngOnInitで、BLoCのoutput streamをlistenし、そのsubscriptionをListにまとめておく。

var anObject;
final List<StreamSubscription> _subscriptions = [];

ngOnInit() {
  var aSubscription = bloc.aStream.listen((e) {
    anObject = e;
  });
  _subscriptions.add(aSubscription);
}

void ngOnDestroy() {
  print('cancel all subscriptions.');
  for (final s in subscriptions) s.cancel();
}

モリーリークを防ぐために、componentのOnDestroyですべてのStreamSubscriptioncancel()を行う。


BroadcastStreamControllerはその内部で複数のlistenerを保持できる。Listenする側がstreamSubscription.cancel()を忘れると、BroadcastStreamControllerに不要なListenerがいつまでも残ったままになる。つまり、メモリーリークする。長い時間実行される類のアプリケーションならば気にしたほうが良い。 対照的に、SingleSubscriptionStreamControllerにはメモリーリークの心配はない。


StreamはChangeDetectionの対象なので、Streamの新しい値が来てanObjectが書き換わるたびにViewが更新される。

<div>{{(anObject.field1}}</div>
<div>{{(anObject.field2}}</div>
<div>{{(anObject.field3}}</div>

このngOnDestory()でのstreamSubscription.cancel()は頻出するパターンなので、以下のようなMixinを用意すると楽ができる。

mixin CancelSubscriptionsOnDestroy implements OnDestroy {
  @protected
  final List<StreamSubscription> subscriptions = [];
  
  void ngOnDestroy() {
    // cancel all subscriptions.
    for (final s in subscriptions) s.cancel();
  }  
}

使用方法は以下。

class AComponent with CancelSubscriptionsOnDestroy implements OnInit, OnDestroy {
  AComponent(this.bloc);
  
  final ABloc bloc;
  var anObject;
  
  ngOnInit() {
    subscriptions.add(bloc.aStream.listen((e) {
      anObject = e;
    }));
  }
  
  // Override and call super only if the rest process after canceling all stream subscriptions is necessary.
  @override
  void ngOnDestroy() {
    super.ngOnDestroy();
  }
}

Async Pipeのテンプレート構文サポートが不十分なため、このようなボイラープレートコードを書くことが多くなる。また、Async Pipeを使用しても、前述のStreamTransformer使用時の注意点のように、インスタンス変数にStreamを保持するコードが必要になる。


ここまでで説明してきた配慮と使い分けが面倒に感じるならば、いっそAsync Pipeをまったく使わないという判断もありだと思う。

ComponentState Mixin

FlutterのStreamBuilderでは、私が理解している限りは、この解説でAsync Pipeについて見てきたような問題は起こらない。AngularのChangeDetectionほどの黒魔術感もなく、より効率的なView更新メカニズムだと思う。

実は、AngularDartにはComponentStateというMixinが提案されており、これはFlutterのようにsetState()を明示的に呼ぶことでViewを更新する。 ComponentStateはまだExperimental扱いなので実戦投入するべきではないが、これの発展次第ではFlutterのようなより効率的で罠の少ないView更新メカニズムが手に入るのかもしれない。

Stream Pipeを作る

いちどlistenしたstreamが入れ替わることを考慮するユースケースはおそらくないはずなので、いちどlistenしたらcomponentがdestroyされるまでstreamを入れ替えないというpipeがあればいいかもしれない。また、Async PipeはStreamとFutureの両方に対応して内部で処理が分岐しているので、Streamだけに対応したものがあればすこしだけ性能があがる。StreamPipeという名前にして、aStream | listenという構文にする。そうすれば、StreamTransformerで変換したStreamをインスタンス変数に保持するボイラープレートコードが必要なくなる。

FlutterのStreamBuilder WidgetのようなComponentまたはStructual Directiveを作る

Angular TypeScript版のようなasync as構文が無いので、pipeする回数が増えるのは退屈。

FlutterのStreamBuilder WidgetのようなComponentまたはStructural Directiveを作り、Template変数と共に使用すれば、TemplateだけでStreamを扱うことができ、ComponentのDart側のボイラープレートコードをまったく書かなくてすむようになるかもしれない。


両者とも開発コストは高くなさそうだが、今から非公式にそういうものを作るのにかなり躊躇がある。おとなしくいまのままで、ComponentState Mixinの発展やFlutter Webの成功を祈るほうが無難かもしれない。

まとめ

以上のように、Async Pipe使用時の落とし穴と、Streamを取り扱うComponentのパターンについて見てきた。

問題の根本には、Angular TemplateやWeb ComponentのHTMLといったマークアップ用外部DSLプログラミング言語のコードを連携させていく辛みがある。 Angular TypeScript版はTemplate構文が複雑化する宿命。一方、FlutterはViewがXML, HTMLのような外部DSLでなくDartコードによる内部DSLなので問題は起こらないが、Angularとは対照的に、HTML, CSSに慣れたWebデザイナーに再学習を強いる。また、もともとUIを表現することを想定されているわけではないC系の構文でUIを表現するので、Dartのシンプルな構文といえどもXMLなどよりも可読性が低くなりがち。そこで、Dart 2になってnewの記述が任意になったり、2.3でその可読性問題の痛みを緩和するための機能追加が行われた。

Angularの現在のChangeDetectionも十分にこなれており性能も良いので、このままAngularDartを使用しつつ、ComponentState、もしくはFlutterのWeb版の成功を気長に待ちたい。

Streamについての基礎知識を解説するのは負担が重いので避けたが、そういった基礎が不安な人は、以下の記事を読み、dart:asyncソースコードを読むなどしたら理解が深まると思う。

https://www.dartlang.org/articles/libraries/broadcast-streams

RxDartの実装はdart:asyncのStream関連ライブラリのラッパーとなっているので、その内部構造の理解を通じてStreamへの理解を深めていくのも面白いのではないか。

あと、UnmodifiableListView*ngForに渡すとフリーズした件、単なる私の勘違いだったか、Angularのアップデートで直ったのかはわからないが、5.2.0では問題なかった。