A Stream is nothing but a sequence of events that occur over a period of time. A Stream in Dart or Flutter is similar to Observable concept of Reactive programming. Please note that these are not audio or video streams.

To understand why Streams are very important and how they work, let us see an example.

Suppose we have a mobile app / website which displays the score of some game in real time. When we load the application for the first time, the application gets the data from the server and displays the score at that moment on the screen. Few minutes later, the score might have changed in the real game and also on the server, but the application which was loaded on the device do not know about that. We need to get the updated data from the server to display the updated score.

Traditionally developers used to send an automatic request to the server after certain interval of time to get the updates. But this approach have a problem. When we send the automatic request the score / data might have changed or not. If it is not changed, then it is an unnecessary request. To avoid this problem, the modern approach is to use reactive programming, in which, the server itself pushes the data whenever there is a change (or whenever required). Then the client can become a subscriber and listens for the updates.

In Dart, the source of the data which triggers the events is known as the Generator. The client which listens to these events is the Subscriber. In the above example, the server can become a Generator to which we can subscribe to in the client part of the application.

Generators

We have two types of Generators in Dart.

1. Synchronous Generator:

A synchronous generator returns an Iterable.

2. Asynchronous Generator:

An asynchronous generator returns a Stream.

Creating Generators in Dart & Flutter :

Generators are nothing but functions whose bodies are prefixed by sync* or async* based on the type of generator that we want to create. Instead of return keyword, Generator functions uses yield keyword.

How to create Synchronous Generators in Dart / Flutter:

Synchronous generators are synchronous, means, they will be executed in the main thread and will not be sent to the event pool.

example of a synchronous generator in Dart:

Iterable<int> mySyncGenerator() sync* {
  for (var i = 0; i < 5; i++) {
    yield i;
  }
}

void main() {
  print('Start');
  print(mySyncGenerator().length);
  mySyncGenerator().forEach((element) {
    print(element);
  });
  print('End');
}

We can see in the above code that we have created a function mySyncGenerator whose body is prefixed by the keyword sync* which tells Dart that this function is a Generator and returns an iterable.

output:


Start
5
0
1
2
3
4
End

In real time applications we may not see many synchronous generators.

How to create Asynchronous generators in Dart / Flutter

Asynchronous generators are more common than synchronous generators. Flutter comes with several asynchronous generators. Though most of the client side developers may not create generators in the application, they may, mostly have to work with the built-in generators.

example:

import 'dart:math';

Stream<int> myAsyncGenerator() async* {
  for (var i = 0; i < 5; i++) {
    await Future.delayed(Duration(seconds: Random().nextInt(5)));
    yield i;
  }
}

void main() {
  print('Start');
  myAsyncGenerator().forEach((element) {
    print(element);
  });
  print('End');
}

Output: (after Start, End you will get the numbers after random intervals of time)

Start
End
0
1
2
3
4

Here you can see that the asynchronous generators are executed after synchronous code.

Though we used for..each in the above example, it is more common to use listen method on asynchronous generators.

example:

...
myAsyncGenerator().listen((event) {
    print(event);
  });
...

If you replace the for..each with the above code, you will get the same output as you got before.

Stream Controller

In real time applications, most of the streams are created by using the StreamController  which is a part of async library, as this stream controller gives more options to work with the streams.

example:

import 'dart:async';

void main() {
  var controller = StreamController<String>();

  var stream = controller.stream;

  stream.listen((event) {
    print(event);
  }, onError: (error) {
    print(error);
  }, onDone: () {
    print('Done/Closed');
  });

  controller.add('Data Event 1');
  controller.addError('Error Event');
  controller.add('Data Event 2');
  controller.close();
}

Output:

Data Event 1
Error Event
Data Event 2
Done/Closed

In this example, we created a controller using the StreamController. We saved the stream from the controller into a stream variable and started listening to it.

When we call the add method on the controller, we are adding a data event onto the stream which will be received by the subscriber. Similarly we can send an error by using addError and let the subscriber know when the stream is completed by using the close method.

The first argument to the listen method of a stream is a successful callback function. Additionally we can pass named callback functions to handle errors and completion of the stream.

Managing Stream Subscriptions

In most of the client side applications, the developer may have to listen to the streams created by the framework or any library. So managing a subscription can be a more important task for a client side developer than creating them.

Developers must pay close attention when they are listening / subscribing to a stream. If you do not cancel the subscription when the data is no longer required,  since the subscriptions are nothing but event listeners, they will be continuously running the callback functions.

Cancelling a Subscription in Dart / Flutter based on Data received from the Stream:

example:

import 'dart:async';

StreamController<String> controller = StreamController<String>();
Stream<String> stream = controller.stream;
late StreamSubscription subscription;
void main() {
  subscription = stream.listen((event) {
    if (event == 'Data2') {
      subscription.cancel();
    }
    print(event);
  }, onError: (error) {
    print(error);
  }, onDone: () {
    print('Done');
  });

  controller.add('Data1');
  controller.addError('Error Message');
  controller.add('Data2');
  controller.add('Data3');
}

Output:

Data1
Error Message
Data2

In this example, we cancelled the subscription when the data from the stream equals to Data2.

The reason for creating subscription first using the late keyword is, if you try to create subscription and assign a value which itself contains a reference to subscription, then we get the error from Dart “Local variable ‘subscription’ can’t be referenced before it is declared”.

There are couple of other very useful methods available on Subscriptions which can be used for any requirement

pause: Pauses the subscription – subscription.pause()

resume: Resumes a paused subscription – subscription.resume()

Leave A Comment

Your email address will not be published. Required fields are marked *