getMessagesStream method

@override
Stream<List<ChatMessageModel>> getMessagesStream(
  String chatId
)

Retrieves a stream of messages for a chat.

chatId: The ID of the chat.

Implementation

/// Returns a stream whose subscription lifecycle drives a live Firestore
/// listener for the messages of the chat identified by [chatId].
///
/// While the returned stream has a listener, each incoming message document
/// whose `timestamp` is not before the current cut-off (`timestampToFilter`,
/// initialised to the moment this method is called) is converted to a
/// [ChatImageMessageModel] when it carries an image URL, or to a
/// [ChatTextMessageModel] otherwise. The converted messages are merged into
/// `_cumulativeMessages`, de-duplicated, sorted by ascending timestamp, and
/// announced through `notifyListeners()`.
///
/// This method itself never adds events to the controller; consumers observe
/// new messages through `_cumulativeMessages` and the change notification.
///
/// When the stream's listener cancels, the Firestore subscription is
/// cancelled, `_cumulativeMessages` is cleared, `lastChat` is set to
/// [chatId], and `lastMessage` is reset to `null`.
@override
Stream<List<ChatMessageModel>> getMessagesStream(String chatId) {
  timestampToFilter = DateTime.now();
  // Lives for the whole stream, so it keeps growing across snapshot events;
  // the set-based merge below removes the repeated instances.
  var messages = <ChatMessageModel>[];
  _controller = StreamController<List<ChatMessageModel>>(
    onListen: () {
      // Only documents newer than the cut-off captured above are queried.
      var messagesCollection = _db
          .collection(_options.chatsCollectionName)
          .doc(chatId)
          .collection(_options.messagesCollectionName)
          .where(
            'timestamp',
            isGreaterThan: timestampToFilter,
          )
          .withConverter<FirebaseMessageDocument>(
            fromFirestore: (snapshot, _) => FirebaseMessageDocument.fromJson(
              snapshot.data()!,
              snapshot.id,
            ),
            toFirestore: (user, _) => user.toJson(),
          )
          .snapshots();

      _subscription = messagesCollection.listen((event) async {
        var addedAny = false;
        for (var message in event.docChanges) {
          // NOTE(review): `data!` and `sender!` throw inside this async
          // callback if the document has no data or the sender cannot be
          // resolved — confirm that is the desired failure mode.
          var data = message.doc.data();
          var sender = await _userService.getUser(data!.sender);
          var timestamp = DateTime.fromMillisecondsSinceEpoch(
            data.timestamp.millisecondsSinceEpoch,
          );

          // Skip only this change. Returning here would drop the rest of the
          // batch and suppress the merge/notification below.
          if (timestamp.isBefore(timestampToFilter)) {
            continue;
          }
          messages.add(
            data.imageUrl != null
                ? ChatImageMessageModel(
                    sender: sender!,
                    imageUrl: data.imageUrl!,
                    timestamp: timestamp,
                  )
                : ChatTextMessageModel(
                    sender: sender!,
                    text: data.text!,
                    timestamp: timestamp,
                  ),
          );
          addedAny = true;
        }
        // Advance the cut-off once per batch. Doing it after every message
        // would make each remaining message in the same snapshot look stale,
        // so only the first one would ever be accepted.
        if (addedAny) {
          timestampToFilter = DateTime.now();
        }
        // Merge, de-duplicate (insertion order is preserved by the set), and
        // keep the list ordered oldest-first.
        _cumulativeMessages = <ChatMessageModel>{
          ..._cumulativeMessages,
          ...messages,
        }.toList()
          ..sort((a, b) => a.timestamp.compareTo(b.timestamp));
        notifyListeners();
      });
    },
    onCancel: () async {
      await _subscription?.cancel();
      _subscription = null;
      _cumulativeMessages = [];
      lastChat = chatId;
      lastMessage = null;
      debugPrint('Canceling messages stream');
    },
  );

  return _controller!.stream;
}