Dart Stream firstWhere does not immediately resolve

Question

Asked by altair on November 07, 2021 (source).

I have, for example, this simple code:

//create stream with numbers from 1 to 100, delayed by 10sec duration
Stream<int> countStream() async* {
  for (int i = 1; i <= 100; i++) {
    yield i;      
    sleep(Duration(seconds: 10));
  }
}

void main() async {
  var x = await countStream().firstWhere((element) => element == 1); //here Im waiting for number 1
  print(x);
}

The problem is that firstWhere does not exit right after the yield of 1, but after the yeild of 2, and print is holded for 10 seconds.

Why? In my real life App, I have websocket stream that is transformed to message stream, and waiting for specific message. But because the websocket stream does not yield another message, firstWhere hangs.

Here is my original code:

  Stream<Message> lines() async* {
    var partial = '';

    await for (String chunk in ws!) { //ws is WebSocket
      var lines = chunk.split('\n');
      lines[0] = partial + lines[0];
      partial = lines.removeLast();
      for (final line in lines) {
        var msg = Message.parse(line); //Message.parse returns CodeMessage object
        if (msg != null) yield msg;
      }
    }
  }

//at some place in code this hangs because last arrived message is CodeMessage
var msg = await lines().firstWhere((obj) => obj is CodeMessage);
print(msg);

Is there another way to do this or where I am wrong?

Answer

Question answered by hacker1024 (source).

Two things are going wrong here.

  1. First of all, in your example:

    The use of sleep here is not appropriate. Consider the documentation for this function:

    Use this with care, as no asynchronous operations can be processed in an isolate while it is blocked in a sleep call.

    Your countStream function, despite being asynchronous, blocks the entire isolate when it sleeps.

    Try this instead:

    await Future<void>.delayed(const Duration(seconds: 10));
    
  2. Now, on to the real reason why firstWhere is not immediately resolving:

    Let's try a simple experiment:

    Stream<int> testStream() async* {
      for (var i = 0; i < 100; ++i) {
        print(i);
        yield i;
      }
    }
    
    void main() {
      late final StreamSubscription streamSubscription;
      streamSubscription = testStream().listen((value) {
        if (value == 1) streamSubscription.cancel();
      }); 
    }
    

    Output:

    0
    1
    2
    

    So what's going on here? The subscription is cancelled at i == 1 - why does the loop continue until 2?

    The answer is simple. The async generator function will not stop (and the stream will therefore not close) until another yield statement is reached. This is due to the way the event loop works:

    Once a Dart function starts executing, it continues executing until it exits. In other words, Dart functions can’t be interrupted by other Dart code.

    The function has no opportunity to stop until it yields again, because cancelling the stream subscription cannot stop it immediately.

    firstWhere uses the internal Dart _cancelAndValue function to complete with a value. It does not complete until the stream is closed, and the stream does not close until the next yield is reached - which, in your cases, may be delayed or never even happen.

    The only way to fix this behaviour while using an asynchronous generator function would be to add another yield or a return statement before the next delay.

ASYNC-AWAIT DART FLUTTER STREAM WEBSOCKET
SHARE: