Simplify events stream in Flutter with built_stream

August 3, 2020

In Flutter, we often use a stream to handle API events and rebuild the UI accordingly. Normally, this is done manually. In this post, I what to introduce the library built_stream in order to simplify and reduce the time for coding and maintaining.

flutter-simplify-event-stream

1. Handle API call events manually

Let's start with a common case of calling an API. We will define the events that we want to handle and the API service class.

event.dart
class Event {}

class EventStart extends Event {
  
  String toString() {
    return "Start fetching";
  }
}

class EventSuccess extends Event {
  final dynamic _data;
  EventSuccess(this._data);
  
  String toString() {
    return "Success with data: $_data";
  }
}

class EventFailure extends Event {
  final dynamic _error;
  EventFailure(this._error);
  
  String toString() {
    return "Failure with error: $_error";
  }
}

api_service.dart
class ApiService {
  Future<String> fetchData(String input) async {
    await Future.delayed(Duration(seconds: 1));
    print(input + " World");
    return input + " World";
  }
}

Then we will build a BLoC to handle the API. Note that when we call the API, we can either handle it parallelly or only just accept the latest call. Because we use the generator function trigger event the default mode will be parallel.

In order to make the stream to only handle the latest call, we will add an order number for each function call, and keep the lastest order number of the current call. There will be an order number check after the asynchronous call, if the order number of that call is equal to the latest order number, it is allowed to continue. Otherwise, it has to be stopped.

This is our BLoC.

bloc_event.dart
import 'package:flutter_demo_built_stream/normal_stream/event.dart';
import 'package:flutter_demo_built_stream/normal_stream/api_service.dart';
import 'package:rxdart/subjects.dart';

class BlocEvent {
  BehaviorSubject<Event> _event;
  bool _onlyNewestCall;

  /// this variable help keep the order number of the call
  /// ie: 1st fetch, 2nd fetch
  int _callOrder;

  BlocEvent() {
    _event = BehaviorSubject();
    _callOrder = 0;
    _onlyNewestCall = false;
  }
  get eventStream => _event.stream;
  set onlyNewestCall(bool onlyNewestCall) => _onlyNewestCall = onlyNewestCall;

  fetchData(input) async {
    _callOrder++;
    int callOrder = _callOrder;
    try {
      ApiService apiService = ApiService();
      _event.value = EventStart();
      String result = await apiService.fetchData(input);

      /// if _onlyNewestCall value is true and the value of the current _callOrder
      ///  is different than the local callOrder variable in this function
      /// fetchData will be stop to prevent add new  event to stream
      if (_onlyNewestCall && callOrder != _callOrder) return;

      if (result != "Hello World") throw "Not Hello World";
      _event.value = EventSuccess(result);
    } catch (e) {
      _event.value = EventFailure(e);
    }
  }

  dispose() {
    _event.close();
  }
}

In the above BLoC, we will use a variable onlyNewestCall to switch between parallel mode and only newest call allowance mode.

Then we will implement the UI

main.dart
import 'package:flutter/material.dart';
import 'package:flutter_demo_built_stream/normal_stream/bloc_event.dart';
import 'package:flutter_demo_built_stream/normal_stream/event.dart';

class MyHomePage extends StatefulWidget {
  MyHomePage({Key key, this.title}) : super(key: key);

  final String title;

  
  _MyHomePageState createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
  BlocEvent _blocEvent;
  bool _onlyNewestCall;
  
  void initState() {
    super.initState();
    _blocEvent = BlocEvent();
    _onlyNewestCall = false;
  }

  void _toggleOnlyNewestCall() {
    setState(() {
      _onlyNewestCall = !_onlyNewestCall;
      _blocEvent.onlyNewestCall = _onlyNewestCall;
    });
  }

  void _fetchHello() {
    _blocEvent.fetchData("Hello");
  }

  void _fetchHenno() {
    _blocEvent.fetchData("Henno");
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Container(
              padding: const EdgeInsets.all(20),
              color: Colors.grey.withOpacity(0.4),
              child: Column(
                children: [
                  RaisedButton(
                    onPressed: _toggleOnlyNewestCall,
                    child: Text("Toggle only get newest call"),
                  ),
                  Text(_onlyNewestCall ? "On" : "Off"),
                ],
              ),
            ),
            SizedBox(
              height: 30,
            ),
            Row(
              mainAxisAlignment: MainAxisAlignment.center,
              children: [
                RaisedButton(
                  onPressed: _fetchHello,
                  child: Text("Fetch Hello"),
                ),
                SizedBox(
                  width: 10,
                ),
                RaisedButton(
                  onPressed: _fetchHenno,
                  child: Text("Fetch Henno"),
                ),
              ],
            ),
            Container(
              height: 120,
              padding: const EdgeInsets.only(top: 30, right: 10, left: 10),
              child: StreamBuilder<Event>(
                  stream: _blocEvent.eventStream,
                  builder: (context, snapshot) {
                    if (snapshot.hasData == false) return SizedBox();
                    return Text(
                      snapshot.data.toString(),
                      textAlign: TextAlign.center,
                      style: Theme.of(context).textTheme.headline4,
                    );
                  }),
            ),
          ],
        ),
      ),
    );
  }
}

When the _onlyNewestCall is false, multiple fetch calls will be executed and non stop. It will allow older calls to return result events continuosly.

built_stream_newest.gif

When the _onlyNewestCall is false, multiple fetch calls will be executed. However, only the newest call is allowed to return value.

built_stream_newest.gif

2. Handle API call events with built_stream

Since the API call events in Flutter application are pretty common, repeating writing the code to handle those often takes time. There is a solution that we can use the built_stream library to generate the code. You can follow the guide in the link above to install and setup code generation.

First, we will define the origin class to generate code

fetch_data_stream.dart
import 'dart:async';
import 'package:built_stream/stream_annotations.dart';
import 'package:built_stream/stream_types.dart';
import 'package:customized_streams/customized_streams.dart';
import 'package:flutter_demo_built_stream/built_stream_approach/api_service.dart';


part 'fetch_data_stream.g.dart';

(ApiService, 'fetchData')
('String', 'input')
('String', 'output')
class FetchDataStream extends _FetchDataStreamOrigin {
  
  String get errorMessage => 'Cannot fetch data';
}

Then we run this command flutter pub pub run build_runner build --delete-conflicting-outputs. Let's take a lot at one generated class that handles the business, the logic is the same as the manual, we have parallel calls, and only the newest call setup, and an option to switch between two modes.

fetch_data_stream.g.dart
abstract class _FetchDataStreamOrigin {
  String get errorMessage;

  //TODO: make api service private
  ApiService _apiService = ApiService();
    //TODO: add call order
    
  int _callOrder;
  bool _onlyNewestCall;
  _FetchDataStreamOrigin() {
    _callOrder = 0;
    _onlyNewestCall = false;
  }
  set onlyNewestCall(onlyNewestCall) => _onlyNewestCall = onlyNewestCall;
  Stream<StreamState> process(FetchDataParams params) async* {
    _callOrder++;
    int callOrder = _callOrder;
    try {
      yield const FetchDataStart();
      FetchDataResults results = await _apiService.fetchData(params);
      if (_onlyNewestCall && callOrder != _callOrder) return;
      yield FetchDataSucceed(results);
    } catch (error) {
      ErrorLocation location = ErrorLocation(this.runtimeType, errorMessage);
      yield FetchDataError(location, error, params);
    }
  }
}

The API service class is now different than the above since it will accept the argument type and the returned type from the generated file.

api_service.dart
import 'package:flutter_demo_built_stream/built_stream_approach/built_stream/fetch_data_stream.dart';

class ApiService {
  Future<FetchDataResults> fetchData(FetchDataParams params) async {
    await Future.delayed(Duration(seconds: 1));
    return FetchDataResults(params.input + " World");
  }
}

Now, it's time for the UI

main.dart
import 'package:built_stream/stream_types.dart';
import 'package:flutter/material.dart';
import 'package:flutter_demo_built_stream/built_stream_approach/built_stream/fetch_data_stream.dart';

class MyHomePage extends StatefulWidget {
  MyHomePage({Key key, this.title}) : super(key: key);

  final String title;

  
  _MyHomePageState createState() => _MyHomePageState();
}

class _MyHomePageState extends State<MyHomePage> {
  FetchDataBloc _fetchDataBloc;
  bool _onlyNewestCall;
  
  void initState() {
    super.initState();
    _fetchDataBloc = FetchDataBloc();
    _onlyNewestCall = false;
  }

  void _toggleOnlyNewestCall() {
    setState(() {
      _onlyNewestCall = !_onlyNewestCall;
      _fetchDataBloc.onlyNewestCall = _onlyNewestCall;
    });
  }

  void _fetchHello() {
    _fetchDataBloc.fetchDataSubject.add(FetchDataParams("Hello"));
  }

  void _fetchHenno() {
    _fetchDataBloc.fetchDataSubject.add(FetchDataParams("Henno"));
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text(widget.title),
      ),
      body: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            Container(
              padding: const EdgeInsets.all(20),
              color: Colors.grey.withOpacity(0.4),
              child: Column(
                children: [
                  RaisedButton(
                    onPressed: _toggleOnlyNewestCall,
                    child: Text("Toggle only get newest call"),
                  ),
                  Text(_onlyNewestCall ? "On" : "Off"),
                ],
              ),
            ),
            SizedBox(
              height: 30,
            ),
            Row(
              mainAxisAlignment: MainAxisAlignment.center,
              children: [
                RaisedButton(
                  onPressed: _fetchHello,
                  child: Text("Fetch Hello"),
                ),
                SizedBox(
                  width: 10,
                ),
                RaisedButton(
                  onPressed: _fetchHenno,
                  child: Text("Fetch Henno"),
                ),
              ],
            ),
            Container(
              height: 120,
              padding: const EdgeInsets.only(top: 30, right: 10, left: 10),
              child: StreamBuilder<StreamState>(
                  stream: _fetchDataBloc.fetchDataSubject.outputStream,
                  builder: (context, snapshot) {
                    if (snapshot.hasData == false) return SizedBox();
                    String result;
                    if (snapshot.data is FetchDataStart) {
                      result = "Start";
                    } else if (snapshot.data is FetchDataSucceed) {
                      result =
                          (snapshot.data as FetchDataSucceed).results.output;
                    } else if (snapshot.data is FetchDataError) {
                      result = (snapshot.data as FetchDataError).error.output;
                    }
                    return Text(
                      result,
                      textAlign: TextAlign.center,
                      style: Theme.of(context).textTheme.headline4,
                    );
                  }),
            ),
          ],
        ),
      ),
    );
  }
}

This is the result of parallel mode

built_stream_newest.gif

And this is the result of the only newest call allowance mode

built_stream_newest.gif

3. Conclustion

By using built_stream to generate code to handle API call events, we have reduced a lot of code and also merge the classes declaration file and BLoC file to one. You can find all source code here.

Thank you for reading. I hope you can find something valuable from this approach!