Stream5
允許多stream subscriptions¶
Code¶
asBroadCastStream()¶
final stream =
Stream<int>.periodic(const Duration(seconds: 1), (count) => count)
.take(10);
final broadcastStream = stream.asBroadcastStream(
onCancel: (controller) {
print('Stream paused');
controller.pause();
},
onListen: (controller) async {
if (controller.isPaused) {
print('Stream resumed');
controller.resume();
}
},
);
final oddNumberStream = broadcastStream.where((event) => event.isOdd);
final oddNumberListener = oddNumberStream.listen(
(event) {
print('Odd: $event');
},
onDone: () => print('Done'),
);
final evenNumberStream = broadcastStream.where((event) => event.isEven);
final evenNumberListener = evenNumberStream.listen((event) {
print('Even: $event');
}, onDone: () => print('Done'));
await Future.delayed(const Duration(milliseconds: 3500)); // 3.5 second
// Outputs:
// Even: 0
// Odd: 1
// Even: 2
oddNumberListener.cancel(); // Nothing printed.
evenNumberListener.cancel(); // "Stream paused"
await Future.delayed(const Duration(seconds: 2));
print(await broadcastStream.first); // "Stream resumed"
// Outputs:
// 3
這個流可以管理多個訂閱,並根據當前的監聽者數量處理事件。讓我們來分解代碼的主要部分和功能:
1. 創建周期性流:
使用Stream<int>.periodic創建了一個基礎流。這個流每隔一秒產生一個事件,事件內容是從0開始的計數,一共產生10個事件(由.take(10)限制)。
轉換為廣播流:
基礎流通過.asBroadcastStream()方法轉換為廣播流。這允許多個監聽者同時訂閱同一個流。
2. 處理訂閱和取消訂閱:
onCancel回調在廣播流沒有監聽者時調用。在這個例子中,它會暫停流的產生。
onListen回調在廣播流獲得新的監聽者時調用。如果流是暫停狀態,它會恢復流的產生。
創建基於條件的子流:
從廣播流中創建了兩個子流,一個只處理奇數事件(event.isOdd),另一個只處理偶數事件(event.isEven)。
3. 監聽事件並處理:
為這兩個子流添加了監聽者,分別打印出奇數和偶數事件。
當監聽者取消訂閱時,會檢查是否需要暫停或恢復流。
4. 流的控制和輸出:
使用Future.delayed來模擬異步等待。
當取消奇數流的監聽者時,沒有輸出。
當取消偶數流的監聽者時,流被暫停。
經過一段時間後,嘗試獲取流的第一個事件,這時會恢復流,並輸出3(下一個事件)。
展示了如何在Dart中使用廣播流來靈活地處理多個訂閱,並根據監聽者的存在來控制流的行為。
//延續Stream4.md內容
import 'package:flutter/material.dart';
import 'stream.dart';
import 'number.dart';
import 'dart:async';
import 'dart:math';
void main() {
runApp(MyApp());
}
class MyApp extends StatelessWidget {
// This widget is the root of your application.
@override
Widget build(BuildContext context) {
return MaterialApp(
title: 'Stream',
theme: ThemeData(
primarySwatch: Colors.deepPurple,
visualDensity: VisualDensity.adaptivePlatformDensity,
),
// home: StreamHomePage(),
home: StreamNumber());
}
}
class StreamHomePage extends StatefulWidget {
@override
_StreamHomePageState createState() => _StreamHomePageState();
}
class _StreamHomePageState extends State<StreamHomePage> {
Color? bgColor;
ColorStream? colorStream;
@override
void initState() {
colorStream = ColorStream();
changeColor();
super.initState();
}
changeColor() async {
// await for (var eventColor in colorStream!.getColors()) {
// setState(() {
// bgColor = eventColor;
// });
// }
colorStream?.getColors().listen((eventColor) {
setState(() {
bgColor = eventColor;
});
});
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: Text('Stream'),
),
body: Container(
decoration: BoxDecoration(color: bgColor),
));
}
}
class StreamNumber extends StatefulWidget {
const StreamNumber({super.key});
@override
State<StreamNumber> createState() => _StreamNumberState();
}
class _StreamNumberState extends State<StreamNumber> {
int lastNumber = 0;
StreamTransformer? transformer;
NumberStream? numberStream;
StreamController? numberStreamController;
StreamSubscription? subscription;
StreamSubscription? subscription2;
String values = '';
@override
void initState() {
numberStream = NumberStream();
numberStreamController = numberStream?.controller;
Stream? stream = numberStreamController?.stream.asBroadcastStream();
// stream?.listen((event) {
// setState(() {
// lastNumber = event;
// });
// });
// transformer = StreamTransformer<dynamic, dynamic>.fromHandlers(
// handleData: (value, sink) {
// sink.add(value * 10);
// },
// handleError: (error, trace, sink) {
// sink.add(-1);
// },
// handleDone: (sink) => sink.close());
// stream?.transform(transformer!).listen((event) {
// setState(() {
// lastNumber = event;
// });
// }).onError((error) {
// setState(() {
// lastNumber = -1;
// });
// });
subscription = stream?.listen((event) {
setState(() {
lastNumber = event;
});
subscription?.onDone(() {
print('OnDone was called');
});
});
subscription2 = stream?.listen((event) {
setState(() {
values += event.toString() + ' - ';
});
});
subscription2 = stream?.listen((event) {
setState(() {
values += event.toString() + ' - ';
});
});
super.initState();
}
@override
Widget build(BuildContext context) {
return Scaffold(
body: Container(
width: double.infinity,
child: Column(
mainAxisAlignment: MainAxisAlignment.spaceEvenly,
crossAxisAlignment: CrossAxisAlignment.center,
children: [
Text(lastNumber.toString(), style: TextStyle(fontSize: 30)),
ElevatedButton(
onPressed: () => addRandomNumber(),
child: Text('New Random Number'),
),
ElevatedButton(
onPressed: () => stopStream(),
child: Text('Stop Stream'),
),
Text(values, style: TextStyle(fontSize: 30))
],
),
),
);
}
void addRandomNumber() {
Random random = Random();
int myNum = random.nextInt(10);
if (!numberStreamController!.isClosed) {
numberStream?.addNumberToSink(myNum);
} else {
setState(() {
lastNumber = -1;
});
}
}
void stopStream() {
numberStreamController?.close();
}
}
Last update :
13 novembre 2024
Created : 13 novembre 2024
Created : 13 novembre 2024