Dart – 流
Stream是一系列异步事件。它就像一个异步的 Iterable——流不是在你请求下一个事件时获取它,而是在它准备好时告诉你有一个事件。
换句话说,流是顺序传递的异步事件的来源。有数据事件,由于流与列表的相似性,它们有时被称为流的元素,还有错误事件,它们是失败的通知。一旦所有数据元素都被发出,一个特殊的事件信号流完成将通知任何侦听器没有更多。
流的优势:
使用流进行通信的主要优点是它使代码保持松散耦合。流的所有者可以在值可用时发出值,并且不需要知道有关谁在收听或为什么收听的任何信息。类似地,数据的消费者只需要遵守流接口,并且生成流数据的方式是完全隐藏的。
要记住的要点:
- 流就像一个管道,你在一端放置一个值,如果另一端有一个监听器,该监听器将接收到该值。
- 您可以使用 Stream API 中的await for或listen()处理流。
流是如何创建的?
可以通过多种方式创建流:异步await for迭代流的事件,就像 for 循环迭代 Iterable 一样。
示例 1:
Dart
Future sumStream(Stream stream) async {
var sum=0;
await for(var value in stream) {
sum += value;
}
return sum;
}
Future main() async {
final stream = Stream.fromIterable([1,2,3,4,5]);
final sum = await sumStream(stream);
print('Sum: $sum');
}
Dart
import 'dart:async';
import 'package:flutter/material.dart';
void main() {
runApp(MyApp());
}
class MyApp extends StatelessWidget {
// This widget is the root of your application.
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: CounterApp()
);
}
}
class CounterApp extends StatefulWidget {
const CounterApp({Key? key}) : super(key: key);
@override
_CounterAppState createState() => _CounterAppState();
}
class _CounterAppState extends State {
// create instance of streamcontroller class
StreamController _controller = StreamController();
int _counter = 10;
void StartTimer() async{
// Timer Method that runs every second
Timer.periodic(Duration(seconds: 1), (timer) {
_counter--;
// add event/data to stream controller using sink
_controller.sink.add(_counter);
// will stop Count Down Timer when _counter value is 0
if(_counter<=0){
timer.cancel();
_controller.close();
}
});
}
@override
void dispose() {
super.dispose();
// Destroy the Stream Controller when use exit the app
_controller.close();
}
@override
Widget build(BuildContext context) {
return Scaffold(
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
StreamBuilder(
initialData: _counter,
stream: _controller.stream,
builder: (context,snapshot){
return Text('${snapshot.data}');
}
),
SizedBox(
height: 20,
),
ElevatedButton(onPressed: (){
// start the timer
StartTimer();
}, child: Text('Start Count Down'))
],
),
),
);
}
}
Dart
import 'dart:convert';
import 'dart:async';
// Initializing a stream controller
StreamController controller = StreamController();
// Creating a new stream through the controller
Stream stream = controller.stream;
void main() {
// Setting up a subscriber to listen for any events sent on the stream
StreamSubscription subscriber = stream.listen((String data) {
print(data);
},
onError: (error) {
print(error);
},
onDone: () {
print('Stream closed!');
});
// Adding a data event to the stream with the controller
controller.sink.add('GeeksforGeeks!');
// Adding an error event to the stream with the controller
controller.addError('Error!');
// Closing the stream with the controller
controller.close();
}
Dart
import 'dart:convert';
import 'dart:async';
// Initializing a stream controller for a broadcast stream
StreamController controller = StreamController.broadcast();
// Creating a new broadcast stream through the controller
Stream stream = controller.stream;
void main() {
// Setting up a subscriber to listen for any events sent on the stream
StreamSubscription subscriber1 = stream.listen((String data) {
print('Subscriber1: ${data}');
},
onError: (error) {
print('Subscriber1: ${error}');
},
onDone: () {
print('Subscriber1: Stream closed!');
});
// Setting up another subscriber to listen for any events sent on the stream
StreamSubscription subscriber2 = stream.listen((String data) {
print('Subscriber2: ${data}');
},
onError: (error) {
print('Subscriber2: ${error}');
},
onDone: () {
print('Subscriber2: Stream closed!');
});
// Adding a data event to the stream with the controller
controller.sink.add('GeeksforGeeks!');
// Adding an error event to the stream with the controller
controller.addError('Error!');
// Closing the stream with the controller
controller.close();
}
输出:
说明:此代码仅接收整数事件流中的每个事件,将它们相加,然后返回总和(的未来)。当循环体结束时,函数将暂停,直到下一个事件到达或流完成。
在函数,我们使用了async 关键字,这是在循环中使用await时所必需的。
Flutter中 Streams 的重要概念:
流控制器: StreamController简化了流管理,自动创建流和接收器,并提供控制流行为的方法。 Dart中的 StreamController 对象正如其名称所暗示的那样,它控制Dart Streams。该对象用于创建流并在其上发送数据、错误和完成事件。控制器还有助于检查流的属性,例如它有多少订阅者或它是否已暂停。
Stream Builders: StreamBuilder 是一个使用流操作的小部件,基本上,当它获得通过它侦听的 Stream 传递的新值时,它会重建其 UI。
StreamBuilder 需要 2 个参数:
- 流:返回流对象的方法
- builder:在streambuilder 的不同状态期间将返回的小部件。
使用倒计时应用程序实际实现流控制器和流生成器:
我们将使用 Stream Controller 和 Stream builder创建一个Flutter倒计时应用程序。在这个Flutter StreamController 示例中,我们将简单地构建一个应用程序,该应用程序可以使用流控制器接收器将值从 n 倒计时到 0 并更新 UI。
示例1:主要。dart
Dart
import 'dart:async';
import 'package:flutter/material.dart';
void main() {
runApp(MyApp());
}
class MyApp extends StatelessWidget {
// This widget is the root of your application.
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Flutter Demo',
theme: ThemeData(
primarySwatch: Colors.blue,
),
home: CounterApp()
);
}
}
class CounterApp extends StatefulWidget {
const CounterApp({Key? key}) : super(key: key);
@override
_CounterAppState createState() => _CounterAppState();
}
class _CounterAppState extends State {
// create instance of streamcontroller class
StreamController _controller = StreamController();
int _counter = 10;
void StartTimer() async{
// Timer Method that runs every second
Timer.periodic(Duration(seconds: 1), (timer) {
_counter--;
// add event/data to stream controller using sink
_controller.sink.add(_counter);
// will stop Count Down Timer when _counter value is 0
if(_counter<=0){
timer.cancel();
_controller.close();
}
});
}
@override
void dispose() {
super.dispose();
// Destroy the Stream Controller when use exit the app
_controller.close();
}
@override
Widget build(BuildContext context) {
return Scaffold(
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
StreamBuilder(
initialData: _counter,
stream: _controller.stream,
builder: (context,snapshot){
return Text('${snapshot.data}');
}
),
SizedBox(
height: 20,
),
ElevatedButton(onPressed: (){
// start the timer
StartTimer();
}, child: Text('Start Count Down'))
],
),
),
);
}
}
输出:
解释:
有两个重要的点需要知道——
- Sink:在Flutter Streams 中,Sink 是我们可以将数据添加到流管道的点。
- Source :在Flutter Stream 中,Source 是我们可以继续监听流数据或获取添加到流中的数据的点。
在上面的代码中,我们创建了 StreamController 并使用sink.add() 向其添加数据。
我们可以在下面看到这是我们如何在上面的代码中使用流控制器:
// Create Stream
StreamController _controller = StreamController();
int _counter = 60;
// add event/data to stream controller using sink
_controller.sink.add(_counter);
在这里,我们将初始计数设为 10。因此,从 10 开始,它将以相反的顺序进行,直到值变为 0。
现在我们需要监听进入流的数据并在屏幕上打印数据,为此,我们使用了StreamBuilder 小部件来监听异步事件。
下面是一段伪代码,说明了我们如何在上述程序中使用流构建器来收集数据并打印在屏幕上——
StreamBuilder(
initialData: _counter,
stream: _controller.stream,
builder: (context,snapshot){
return Text('${snapshot.data}');
}
)
两种类型的流:
- 单一订阅流
- 广播流
1. 单订阅流:
单个订阅流是默认设置。当您仅在一个屏幕上使用特定流时,它们运行良好。
单个订阅流只能收听一次。它直到有一个侦听器才开始生成事件,并且在侦听器停止侦听时停止发送事件,即使事件源仍然可以提供更多数据。
单一订阅流可用于下载文件或任何一次性操作。例如,小部件可以订阅流以接收有关值的更新,例如下载进度,并相应地更新其 UI。
示例 1。
Dart
import 'dart:convert';
import 'dart:async';
// Initializing a stream controller
StreamController controller = StreamController();
// Creating a new stream through the controller
Stream stream = controller.stream;
void main() {
// Setting up a subscriber to listen for any events sent on the stream
StreamSubscription subscriber = stream.listen((String data) {
print(data);
},
onError: (error) {
print(error);
},
onDone: () {
print('Stream closed!');
});
// Adding a data event to the stream with the controller
controller.sink.add('GeeksforGeeks!');
// Adding an error event to the stream with the controller
controller.addError('Error!');
// Closing the stream with the controller
controller.close();
}
输出:
2. 广播流:
如果您需要应用程序的多个部分来访问同一个流,请改用广播流。广播流允许任意数量的侦听器。它在其事件准备就绪时触发,无论是否有侦听器。要创建广播流,您只需在现有的单个订阅流上调用asBroadcastStream() 。
Syntax: final broadcastStream = singleStream.asBroadcastStream();
示例 2。
Dart
import 'dart:convert';
import 'dart:async';
// Initializing a stream controller for a broadcast stream
StreamController controller = StreamController.broadcast();
// Creating a new broadcast stream through the controller
Stream stream = controller.stream;
void main() {
// Setting up a subscriber to listen for any events sent on the stream
StreamSubscription subscriber1 = stream.listen((String data) {
print('Subscriber1: ${data}');
},
onError: (error) {
print('Subscriber1: ${error}');
},
onDone: () {
print('Subscriber1: Stream closed!');
});
// Setting up another subscriber to listen for any events sent on the stream
StreamSubscription subscriber2 = stream.listen((String data) {
print('Subscriber2: ${data}');
},
onError: (error) {
print('Subscriber2: ${error}');
},
onDone: () {
print('Subscriber2: Stream closed!');
});
// Adding a data event to the stream with the controller
controller.sink.add('GeeksforGeeks!');
// Adding an error event to the stream with the controller
controller.addError('Error!');
// Closing the stream with the controller
controller.close();
}
输出 :
让我们来看看上面程序中使用的一些类的有用方法:
- add() 方法:处理将任何数据转发到接收器。
- addError() 方法:如果发生错误并且需要通知流的侦听器,则使用addError() 。
- listen() 方法:我们通过.listen()方法监听传入的数据流。