Skip to content

Stream5

允許多stream subscriptions

Code

asBroadCastStream()
final stream =
    Stream<int>.periodic(const Duration(seconds: 1), (count) => count)
        .take(10);

final broadcastStream = stream.asBroadcastStream(
  onCancel: (controller) {
    print('Stream paused');
    controller.pause();
  },
  onListen: (controller) async {
    if (controller.isPaused) {
      print('Stream resumed');
      controller.resume();
    }
  },
);

final oddNumberStream = broadcastStream.where((event) => event.isOdd);
final oddNumberListener = oddNumberStream.listen(
      (event) {
    print('Odd: $event');
  },
  onDone: () => print('Done'),
);

final evenNumberStream = broadcastStream.where((event) => event.isEven);
final evenNumberListener = evenNumberStream.listen((event) {
  print('Even: $event');
}, onDone: () => print('Done'));

await Future.delayed(const Duration(milliseconds: 3500)); // 3.5 second
// Outputs:
// Even: 0
// Odd: 1
// Even: 2
oddNumberListener.cancel(); // Nothing printed.
evenNumberListener.cancel(); // "Stream paused"
await Future.delayed(const Duration(seconds: 2));
print(await broadcastStream.first); // "Stream resumed"
// Outputs:
// 3
這個流可以管理多個訂閱,並根據當前的監聽者數量處理事件。讓我們來分解代碼的主要部分和功能:


1. 創建周期性流:

    使用Stream<int>.periodic創建了一個基礎流。這個流每隔一秒產生一個事件,事件內容是從0開始的計數,一共產生10個事件(由.take(10)限制)。
    轉換為廣播流:

    基礎流通過.asBroadcastStream()方法轉換為廣播流。這允許多個監聽者同時訂閱同一個流。

2. 處理訂閱和取消訂閱:

    onCancel回調在廣播流沒有監聽者時調用。在這個例子中,它會暫停流的產生。
    onListen回調在廣播流獲得新的監聽者時調用。如果流是暫停狀態,它會恢復流的產生。
    創建基於條件的子流:

    從廣播流中創建了兩個子流,一個只處理奇數事件(event.isOdd),另一個只處理偶數事件(event.isEven)。

3. 監聽事件並處理:

    為這兩個子流添加了監聽者,分別打印出奇數和偶數事件。
    當監聽者取消訂閱時,會檢查是否需要暫停或恢復流。

4. 流的控制和輸出:

    使用Future.delayed來模擬異步等待。
    當取消奇數流的監聽者時,沒有輸出。
    當取消偶數流的監聽者時,流被暫停。
    經過一段時間後,嘗試獲取流的第一個事件,這時會恢復流,並輸出3(下一個事件)。
展示了如何在Dart中使用廣播流來靈活地處理多個訂閱,並根據監聽者的存在來控制流的行為。

//延續Stream4.md內容
import 'package:flutter/material.dart';
import 'stream.dart';
import 'number.dart';
import 'dart:async';
import 'dart:math';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
// This widget is the root of your application.
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
        title: 'Stream',
        theme: ThemeData(
          primarySwatch: Colors.deepPurple,
          visualDensity: VisualDensity.adaptivePlatformDensity,
        ),
        // home: StreamHomePage(),
        home: StreamNumber());
  }
}

class StreamHomePage extends StatefulWidget {
  @override
  _StreamHomePageState createState() => _StreamHomePageState();
}

class _StreamHomePageState extends State<StreamHomePage> {
  Color? bgColor;
  ColorStream? colorStream;

  @override
  void initState() {
    colorStream = ColorStream();
    changeColor();
    super.initState();
  }

  changeColor() async {
    // await for (var eventColor in colorStream!.getColors()) {
    //   setState(() {
    //     bgColor = eventColor;
    //   });
    // }
    colorStream?.getColors().listen((eventColor) {
      setState(() {
        bgColor = eventColor;
      });
    });
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
        appBar: AppBar(
          title: Text('Stream'),
        ),
        body: Container(
          decoration: BoxDecoration(color: bgColor),
        ));
  }
}

class StreamNumber extends StatefulWidget {
  const StreamNumber({super.key});

  @override
  State<StreamNumber> createState() => _StreamNumberState();
}

class _StreamNumberState extends State<StreamNumber> {
  int lastNumber = 0;
  StreamTransformer? transformer;
  NumberStream? numberStream;
  StreamController? numberStreamController;
  StreamSubscription? subscription;
  StreamSubscription? subscription2;
  String values = '';
  @override
  void initState() {
    numberStream = NumberStream();
    numberStreamController = numberStream?.controller;
    Stream? stream = numberStreamController?.stream.asBroadcastStream();

    // stream?.listen((event) {
    //   setState(() {
    //     lastNumber = event;
    //   });
    // });
    // transformer = StreamTransformer<dynamic, dynamic>.fromHandlers(
    //     handleData: (value, sink) {
    //       sink.add(value * 10);
    //     },
    //     handleError: (error, trace, sink) {
    //       sink.add(-1);
    //     },
    //     handleDone: (sink) => sink.close());
    // stream?.transform(transformer!).listen((event) {
    //   setState(() {
    //     lastNumber = event;
    //   });
    // }).onError((error) {
    //   setState(() {
    //     lastNumber = -1;
    //   });
    // });
    subscription = stream?.listen((event) {
      setState(() {
        lastNumber = event;
      });
      subscription?.onDone(() {
        print('OnDone was called');
      });
    });
    subscription2 = stream?.listen((event) {
      setState(() {
        values += event.toString() + ' - ';
      });
    });
    subscription2 = stream?.listen((event) {
      setState(() {
        values += event.toString() + ' - ';
      });
    });

    super.initState();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      body: Container(
        width: double.infinity,
        child: Column(
          mainAxisAlignment: MainAxisAlignment.spaceEvenly,
          crossAxisAlignment: CrossAxisAlignment.center,
          children: [
            Text(lastNumber.toString(), style: TextStyle(fontSize: 30)),
            ElevatedButton(
              onPressed: () => addRandomNumber(),
              child: Text('New Random Number'),
            ),
            ElevatedButton(
              onPressed: () => stopStream(),
              child: Text('Stop Stream'),
            ),
            Text(values, style: TextStyle(fontSize: 30))
          ],
        ),
      ),
    );
  }

  void addRandomNumber() {
    Random random = Random();
    int myNum = random.nextInt(10);
    if (!numberStreamController!.isClosed) {
      numberStream?.addNumberToSink(myNum);
    } else {
      setState(() {
        lastNumber = -1;
      });
    }
  }

  void stopStream() {
    numberStreamController?.close();
  }
}

Last update : 13 novembre 2024
Created : 13 novembre 2024

Comments

Comments