LCOV - code coverage report
Current view: top level - lib/src - timeline.dart (source / functions) Hit Total Coverage
Test: merged.info Lines: 197 298 66.1 %
Date: 2024-11-12 07:37:08 Functions: 0 0 -

          Line data    Source code
       1             : /*
       2             :  *   Famedly Matrix SDK
       3             :  *   Copyright (C) 2019, 2020, 2021 Famedly GmbH
       4             :  *
       5             :  *   This program is free software: you can redistribute it and/or modify
       6             :  *   it under the terms of the GNU Affero General Public License as
       7             :  *   published by the Free Software Foundation, either version 3 of the
       8             :  *   License, or (at your option) any later version.
       9             :  *
      10             :  *   This program is distributed in the hope that it will be useful,
      11             :  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
      13             :  *   GNU Affero General Public License for more details.
      14             :  *
      15             :  *   You should have received a copy of the GNU Affero General Public License
      16             :  *   along with this program.  If not, see <https://www.gnu.org/licenses/>.
      17             :  */
      18             : 
      19             : import 'dart:async';
      20             : import 'dart:convert';
      21             : 
      22             : import 'package:collection/collection.dart';
      23             : 
      24             : import 'package:matrix/matrix.dart';
      25             : import 'package:matrix/src/models/timeline_chunk.dart';
      26             : 
      27             : /// Represents the timeline of a room. The callback [onUpdate] will be triggered
      28             : /// automatically. The initial
      29             : /// event list will be retrieved when created by the `room.getTimeline()` method.
      30             : 
      31             : class Timeline {
      32             :   final Room room;
      33          27 :   List<Event> get events => chunk.events;
      34             : 
      35             :   /// Map of event ID to map of type to set of aggregated events
      36             :   final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
      37             : 
      38             :   final void Function()? onUpdate;
      39             :   final void Function(int index)? onChange;
      40             :   final void Function(int index)? onInsert;
      41             :   final void Function(int index)? onRemove;
      42             :   final void Function()? onNewEvent;
      43             : 
      44             :   StreamSubscription<EventUpdate>? sub;
      45             :   StreamSubscription<SyncUpdate>? roomSub;
      46             :   StreamSubscription<String>? sessionIdReceivedSub;
      47             :   StreamSubscription<String>? cancelSendEventSub;
      48             :   bool isRequestingHistory = false;
      49             :   bool isRequestingFuture = false;
      50             : 
      51             :   bool allowNewEvent = true;
      52             :   bool isFragmentedTimeline = false;
      53             : 
      54             :   final Map<String, Event> _eventCache = {};
      55             : 
      56             :   TimelineChunk chunk;
      57             : 
      58             :   /// Searches for the event in this timeline. If not
      59             :   /// found, requests from the server. Requested events
      60             :   /// are cached.
      61           2 :   Future<Event?> getEventById(String id) async {
      62           4 :     for (final event in events) {
      63           4 :       if (event.eventId == id) return event;
      64             :     }
      65           4 :     if (_eventCache.containsKey(id)) return _eventCache[id];
      66           4 :     final requestedEvent = await room.getEventById(id);
      67             :     if (requestedEvent == null) return null;
      68           4 :     _eventCache[id] = requestedEvent;
      69           4 :     return _eventCache[id];
      70             :   }
      71             : 
      72             :   // When fetching history, we will collect them into the `_historyUpdates` set
      73             :   // first, and then only process all events at once, once we have the full history.
      74             :   // This ensures that the entire history fetching triggers `onUpdate` only *once*,
      75             :   // even if /sync requests complete while history is being processed.
      76             :   bool _collectHistoryUpdates = false;
      77             : 
      78             :   // We confirmed that there are no more events to load from the database.
      79             :   bool _fetchedAllDatabaseEvents = false;
      80             : 
      81           1 :   bool get canRequestHistory {
      82           2 :     if (events.isEmpty) return true;
      83           0 :     return !_fetchedAllDatabaseEvents ||
      84           0 :         (room.prev_batch != null && events.last.type != EventTypes.RoomCreate);
      85             :   }
      86             : 
      87           2 :   Future<void> requestHistory({
      88             :     int historyCount = Room.defaultHistoryCount,
      89             :   }) async {
      90           2 :     if (isRequestingHistory) {
      91             :       return;
      92             :     }
      93             : 
      94           2 :     isRequestingHistory = true;
      95           2 :     await _requestEvents(direction: Direction.b, historyCount: historyCount);
      96           2 :     isRequestingHistory = false;
      97             :   }
      98             : 
      99           0 :   bool get canRequestFuture => !allowNewEvent;
     100             : 
     101           1 :   Future<void> requestFuture({
     102             :     int historyCount = Room.defaultHistoryCount,
     103             :   }) async {
     104           1 :     if (allowNewEvent) {
     105             :       return; // we shouldn't force to add new events if they will automatically be added
     106             :     }
     107             : 
     108           1 :     if (isRequestingFuture) return;
     109           1 :     isRequestingFuture = true;
     110           1 :     await _requestEvents(direction: Direction.f, historyCount: historyCount);
     111           1 :     isRequestingFuture = false;
     112             :   }
     113             : 
     114           3 :   Future<void> _requestEvents({
     115             :     int historyCount = Room.defaultHistoryCount,
     116             :     required Direction direction,
     117             :   }) async {
     118           4 :     onUpdate?.call();
     119             : 
     120             :     try {
     121             :       // Look up for events in the database first. With fragmented view, we should delete the database cache
     122           3 :       final eventsFromStore = isFragmentedTimeline
     123             :           ? null
     124           8 :           : await room.client.database?.getEventList(
     125           2 :               room,
     126           4 :               start: events.length,
     127             :               limit: historyCount,
     128             :             );
     129             : 
     130           2 :       if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
     131             :         // Fetch all users from database we have got here.
     132           0 :         for (final event in events) {
     133           0 :           if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
     134             :             continue;
     135             :           }
     136             :           final dbUser =
     137           0 :               await room.client.database?.getUser(event.senderId, room);
     138           0 :           if (dbUser != null) room.setState(dbUser);
     139             :         }
     140             : 
     141           0 :         if (direction == Direction.b) {
     142           0 :           events.addAll(eventsFromStore);
     143           0 :           final startIndex = events.length - eventsFromStore.length;
     144           0 :           final endIndex = events.length;
     145           0 :           for (var i = startIndex; i < endIndex; i++) {
     146           0 :             onInsert?.call(i);
     147             :           }
     148             :         } else {
     149           0 :           events.insertAll(0, eventsFromStore);
     150           0 :           final startIndex = eventsFromStore.length;
     151             :           final endIndex = 0;
     152           0 :           for (var i = startIndex; i > endIndex; i--) {
     153           0 :             onInsert?.call(i);
     154             :           }
     155             :         }
     156             :       } else {
     157           3 :         _fetchedAllDatabaseEvents = true;
     158           6 :         Logs().i('No more events found in the store. Request from server...');
     159             : 
     160           3 :         if (isFragmentedTimeline) {
     161           1 :           await getRoomEvents(
     162             :             historyCount: historyCount,
     163             :             direction: direction,
     164             :           );
     165             :         } else {
     166           4 :           if (room.prev_batch == null) {
     167           0 :             Logs().i('No more events to request from server...');
     168             :           } else {
     169           4 :             await room.requestHistory(
     170             :               historyCount: historyCount,
     171             :               direction: direction,
     172           2 :               onHistoryReceived: () {
     173           2 :                 _collectHistoryUpdates = true;
     174             :               },
     175             :             );
     176             :           }
     177             :         }
     178             :       }
     179             :     } finally {
     180           3 :       _collectHistoryUpdates = false;
     181           3 :       isRequestingHistory = false;
     182           4 :       onUpdate?.call();
     183             :     }
     184             :   }
     185             : 
     186             :   /// Request more previous events from the server. [historyCount] defines how many events should
     187             :   /// be received at most. When the request is answered, [onHistoryReceived] will be triggered **before**
     188             :   /// the historical events will be published in the onEvent stream.
     189             :   /// Returns the actual count of received timeline events.
     190           1 :   Future<int> getRoomEvents({
     191             :     int historyCount = Room.defaultHistoryCount,
     192             :     direction = Direction.b,
     193             :   }) async {
     194           3 :     final resp = await room.client.getRoomEvents(
     195           2 :       room.id,
     196             :       direction,
     197           3 :       from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
     198             :       limit: historyCount,
     199           3 :       filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
     200             :     );
     201             : 
     202           1 :     if (resp.end == null) {
     203           2 :       Logs().w('We reached the end of the timeline');
     204             :     }
     205             : 
     206           2 :     final newNextBatch = direction == Direction.b ? resp.start : resp.end;
     207           2 :     final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
     208             : 
     209           1 :     final type = direction == Direction.b
     210             :         ? EventUpdateType.history
     211             :         : EventUpdateType.timeline;
     212             : 
     213           3 :     if ((resp.state?.length ?? 0) == 0 &&
     214           3 :         resp.start != resp.end &&
     215             :         newPrevBatch != null &&
     216             :         newNextBatch != null) {
     217           1 :       if (type == EventUpdateType.history) {
     218           0 :         Logs().w(
     219           0 :           '[nav] we can still request history prevBatch: $type $newPrevBatch',
     220             :         );
     221             :       } else {
     222           2 :         Logs().w(
     223           1 :           '[nav] we can still request timeline nextBatch: $type $newNextBatch',
     224             :         );
     225             :       }
     226             :     }
     227             : 
     228             :     final newEvents =
     229           6 :         resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
     230             : 
     231           1 :     if (!allowNewEvent) {
     232           3 :       if (resp.start == resp.end ||
     233           3 :           (resp.end == null && direction == Direction.f)) allowNewEvent = true;
     234             : 
     235           1 :       if (allowNewEvent) {
     236           2 :         Logs().d('We now allow sync update into the timeline.');
     237           1 :         newEvents.addAll(
     238           5 :           await room.client.database?.getEventList(room, onlySending: true) ??
     239           0 :               [],
     240             :         );
     241             :       }
     242             :     }
     243             : 
     244             :     // Try to decrypt encrypted events but don't update the database.
     245           2 :     if (room.encrypted && room.client.encryptionEnabled) {
     246           0 :       for (var i = 0; i < newEvents.length; i++) {
     247           0 :         if (newEvents[i].type == EventTypes.Encrypted) {
     248           0 :           newEvents[i] = await room.client.encryption!.decryptRoomEvent(
     249           0 :             room.id,
     250           0 :             newEvents[i],
     251             :           );
     252             :         }
     253             :       }
     254             :     }
     255             : 
     256             :     // update chunk anchors
     257           1 :     if (type == EventUpdateType.history) {
     258           0 :       chunk.prevBatch = newPrevBatch ?? '';
     259             : 
     260           0 :       final offset = chunk.events.length;
     261             : 
     262           0 :       chunk.events.addAll(newEvents);
     263             : 
     264           0 :       for (var i = 0; i < newEvents.length; i++) {
     265           0 :         onInsert?.call(i + offset);
     266             :       }
     267             :     } else {
     268           2 :       chunk.nextBatch = newNextBatch ?? '';
     269           4 :       chunk.events.insertAll(0, newEvents.reversed);
     270             : 
     271           3 :       for (var i = 0; i < newEvents.length; i++) {
     272           2 :         onInsert?.call(i);
     273             :       }
     274             :     }
     275             : 
     276           1 :     if (onUpdate != null) {
     277           2 :       onUpdate!();
     278             :     }
     279           2 :     return resp.chunk.length;
     280             :   }
     281             : 
     282           9 :   Timeline({
     283             :     required this.room,
     284             :     this.onUpdate,
     285             :     this.onChange,
     286             :     this.onInsert,
     287             :     this.onRemove,
     288             :     this.onNewEvent,
     289             :     required this.chunk,
     290             :   }) {
     291          63 :     sub = room.client.onEvent.stream.listen(_handleEventUpdate);
     292             : 
     293             :     // If the timeline is limited we want to clear our events cache
     294          45 :     roomSub = room.client.onSync.stream
     295          53 :         .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
     296          18 :         .listen(_removeEventsNotInThisSync);
     297             : 
     298           9 :     sessionIdReceivedSub =
     299          45 :         room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
     300           9 :     cancelSendEventSub =
     301          54 :         room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
     302             : 
     303             :     // we want to populate our aggregated events
     304          16 :     for (final e in events) {
     305           7 :       addAggregatedEvent(e);
     306             :     }
     307             : 
     308             :     // we are using a fragmented timeline
     309          27 :     if (chunk.nextBatch != '') {
     310           1 :       allowNewEvent = false;
     311           1 :       isFragmentedTimeline = true;
     312             :       // fragmented timelines never read from the database.
     313           1 :       _fetchedAllDatabaseEvents = true;
     314             :     }
     315             :   }
     316             : 
     317           4 :   void _cleanUpCancelledEvent(String eventId) {
     318           4 :     final i = _findEvent(event_id: eventId);
     319          12 :     if (i < events.length) {
     320          12 :       removeAggregatedEvent(events[i]);
     321           8 :       events.removeAt(i);
     322           6 :       onRemove?.call(i);
     323           6 :       onUpdate?.call();
     324             :     }
     325             :   }
     326             : 
     327             :   /// Removes all entries from [events] which are not in this SyncUpdate.
     328           2 :   void _removeEventsNotInThisSync(SyncUpdate sync) {
     329          15 :     final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
     330           4 :     final keepEventIds = newSyncEvents.map((e) => e.eventId);
     331           7 :     events.removeWhere((e) => !keepEventIds.contains(e.eventId));
     332             :   }
     333             : 
     334             :   /// Don't forget to call this before you dismiss this object!
     335           0 :   void cancelSubscriptions() {
     336             :     // ignore: discarded_futures
     337           0 :     sub?.cancel();
     338             :     // ignore: discarded_futures
     339           0 :     roomSub?.cancel();
     340             :     // ignore: discarded_futures
     341           0 :     sessionIdReceivedSub?.cancel();
     342             :     // ignore: discarded_futures
     343           0 :     cancelSendEventSub?.cancel();
     344             :   }
     345             : 
     346           2 :   void _sessionKeyReceived(String sessionId) async {
     347             :     var decryptAtLeastOneEvent = false;
     348           2 :     Future<void> decryptFn() async {
     349           6 :       final encryption = room.client.encryption;
     350           6 :       if (!room.client.encryptionEnabled || encryption == null) {
     351             :         return;
     352             :       }
     353           7 :       for (var i = 0; i < events.length; i++) {
     354           4 :         if (events[i].type == EventTypes.Encrypted &&
     355           4 :             events[i].messageType == MessageTypes.BadEncrypted &&
     356           0 :             events[i].content['session_id'] == sessionId) {
     357           0 :           events[i] = await encryption.decryptRoomEvent(
     358           0 :             room.id,
     359           0 :             events[i],
     360             :             store: true,
     361             :             updateType: EventUpdateType.history,
     362             :           );
     363           0 :           addAggregatedEvent(events[i]);
     364           0 :           onChange?.call(i);
     365           0 :           if (events[i].type != EventTypes.Encrypted) {
     366             :             decryptAtLeastOneEvent = true;
     367             :           }
     368             :         }
     369             :       }
     370             :     }
     371             : 
     372           6 :     if (room.client.database != null) {
     373           8 :       await room.client.database?.transaction(decryptFn);
     374             :     } else {
     375           0 :       await decryptFn();
     376             :     }
     377           0 :     if (decryptAtLeastOneEvent) onUpdate?.call();
     378             :   }
     379             : 
     380             :   /// Request the keys for undecryptable events of this timeline
     381           0 :   void requestKeys({
     382             :     bool tryOnlineBackup = true,
     383             :     bool onlineKeyBackupOnly = true,
     384             :   }) {
     385           0 :     for (final event in events) {
     386           0 :       if (event.type == EventTypes.Encrypted &&
     387           0 :           event.messageType == MessageTypes.BadEncrypted &&
     388           0 :           event.content['can_request_session'] == true) {
     389           0 :         final sessionId = event.content.tryGet<String>('session_id');
     390           0 :         final senderKey = event.content.tryGet<String>('sender_key');
     391             :         if (sessionId != null && senderKey != null) {
     392           0 :           room.client.encryption?.keyManager.maybeAutoRequest(
     393           0 :             room.id,
     394             :             sessionId,
     395             :             senderKey,
     396             :             tryOnlineBackup: tryOnlineBackup,
     397             :             onlineKeyBackupOnly: onlineKeyBackupOnly,
     398             :           );
     399             :         }
     400             :       }
     401             :     }
     402             :   }
     403             : 
     404             :   /// Set the read marker to the last synced event in this timeline.
     405           2 :   Future<void> setReadMarker({String? eventId, bool? public}) async {
     406             :     eventId ??=
     407          12 :         events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
     408             :     if (eventId == null) return;
     409           4 :     return room.setReadMarker(eventId, mRead: eventId, public: public);
     410             :   }
     411             : 
     412           7 :   int _findEvent({String? event_id, String? unsigned_txid}) {
     413             :     // we want to find any existing event where either the passed event_id or the passed unsigned_txid
     414             :     // matches either the event_id or transaction_id of the existing event.
     415             :     // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match.
     416             :     // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair,
     417             :     // thus meaning we found our element.
     418             :     final searchNeedle = <String>{};
     419             :     if (event_id != null) {
     420           7 :       searchNeedle.add(event_id);
     421             :     }
     422             :     if (unsigned_txid != null) {
     423           4 :       searchNeedle.add(unsigned_txid);
     424             :     }
     425             :     int i;
     426          28 :     for (i = 0; i < events.length; i++) {
     427          21 :       final searchHaystack = <String>{events[i].eventId};
     428             : 
     429          28 :       final txnid = events[i].unsigned?.tryGet<String>('transaction_id');
     430             :       if (txnid != null) {
     431           4 :         searchHaystack.add(txnid);
     432             :       }
     433          14 :       if (searchNeedle.intersection(searchHaystack).isNotEmpty) {
     434             :         break;
     435             :       }
     436             :     }
     437             :     return i;
     438             :   }
     439             : 
     440           4 :   void _removeEventFromSet(Set<Event> eventSet, Event event) {
     441           4 :     eventSet.removeWhere(
     442           4 :       (e) =>
     443           8 :           e.matchesEventOrTransactionId(event.eventId) ||
     444           4 :           event.unsigned != null &&
     445           4 :               e.matchesEventOrTransactionId(
     446           8 :                 event.unsigned?.tryGet<String>('transaction_id'),
     447             :               ),
     448             :     );
     449             :   }
     450             : 
     451           9 :   void addAggregatedEvent(Event event) {
     452             :     // we want to add an event to the aggregation tree
     453           9 :     final relationshipType = event.relationshipType;
     454           9 :     final relationshipEventId = event.relationshipEventId;
     455             :     if (relationshipType == null || relationshipEventId == null) {
     456             :       return; // nothing to do
     457             :     }
     458           8 :     final events = (aggregatedEvents[relationshipEventId] ??=
     459           8 :         <String, Set<Event>>{})[relationshipType] ??= <Event>{};
     460             :     // remove a potential old event
     461           4 :     _removeEventFromSet(events, event);
     462             :     // add the new one
     463           4 :     events.add(event);
     464           4 :     if (onChange != null) {
     465           0 :       final index = _findEvent(event_id: relationshipEventId);
     466           0 :       onChange?.call(index);
     467             :     }
     468             :   }
     469             : 
     470           6 :   void removeAggregatedEvent(Event event) {
     471          18 :     aggregatedEvents.remove(event.eventId);
     472           6 :     if (event.unsigned != null) {
     473          24 :       aggregatedEvents.remove(event.unsigned?['transaction_id']);
     474             :     }
     475          16 :     for (final types in aggregatedEvents.values) {
     476           8 :       for (final events in types.values) {
     477           4 :         _removeEventFromSet(events, event);
     478             :       }
     479             :     }
     480             :   }
     481             : 
     482           7 :   void _handleEventUpdate(EventUpdate eventUpdate, {bool update = true}) {
     483             :     try {
     484          28 :       if (eventUpdate.roomID != room.id) return;
     485             : 
     486          14 :       if (eventUpdate.type != EventUpdateType.timeline &&
     487          12 :           eventUpdate.type != EventUpdateType.history) {
     488             :         return;
     489             :       }
     490             : 
     491          14 :       if (eventUpdate.type == EventUpdateType.timeline) {
     492           7 :         onNewEvent?.call();
     493             :       }
     494             : 
     495           7 :       if (!allowNewEvent) return;
     496             : 
     497           7 :       final status = eventStatusFromInt(
     498          14 :         eventUpdate.content['status'] ??
     499          15 :             (eventUpdate.content['unsigned'] is Map<String, dynamic>
     500          15 :                 ? eventUpdate.content['unsigned'][messageSendingStatusKey]
     501             :                 : null) ??
     502           4 :             EventStatus.synced.intValue,
     503             :       );
     504             : 
     505           7 :       final i = _findEvent(
     506          14 :         event_id: eventUpdate.content['event_id'],
     507          21 :         unsigned_txid: eventUpdate.content['unsigned'] is Map
     508          21 :             ? eventUpdate.content['unsigned']['transaction_id']
     509             :             : null,
     510             :       );
     511             : 
     512          21 :       if (i < events.length) {
     513             :         // if the old status is larger than the new one, we also want to preserve the old status
     514          21 :         final oldStatus = events[i].status;
     515          21 :         events[i] = Event.fromJson(
     516           7 :           eventUpdate.content,
     517           7 :           room,
     518             :         );
     519             :         // do we preserve the status? we should allow 0 -> -1 updates and status increases
     520          14 :         if ((latestEventStatus(status, oldStatus) == oldStatus) &&
     521          11 :             !(status.isError && oldStatus.isSending)) {
     522          21 :           events[i].status = oldStatus;
     523             :         }
     524          21 :         addAggregatedEvent(events[i]);
     525           9 :         onChange?.call(i);
     526             :       } else {
     527           6 :         final newEvent = Event.fromJson(
     528           6 :           eventUpdate.content,
     529           6 :           room,
     530             :         );
     531             : 
     532          12 :         if (eventUpdate.type == EventUpdateType.history &&
     533           6 :             events.indexWhere(
     534          15 :                   (e) => e.eventId == eventUpdate.content['event_id'],
     535           3 :                 ) !=
     536           3 :                 -1) return;
     537          12 :         var index = events.length;
     538          12 :         if (eventUpdate.type == EventUpdateType.history) {
     539           6 :           events.add(newEvent);
     540             :         } else {
     541           8 :           index = events.firstIndexWhereNotError;
     542           8 :           events.insert(index, newEvent);
     543             :         }
     544          10 :         onInsert?.call(index);
     545             : 
     546           6 :         addAggregatedEvent(newEvent);
     547             :       }
     548             : 
     549             :       // Handle redaction events
     550          21 :       if (eventUpdate.content['type'] == EventTypes.Redaction) {
     551             :         final index =
     552           9 :             _findEvent(event_id: eventUpdate.content.tryGet<String>('redacts'));
     553           9 :         if (index < events.length) {
     554           3 :           removeAggregatedEvent(events[index]);
     555             : 
     556             :           // Is the redacted event a reaction? Then update the event this
     557             :           // belongs to:
     558           1 :           if (onChange != null) {
     559           3 :             final relationshipEventId = events[index].relationshipEventId;
     560             :             if (relationshipEventId != null) {
     561           0 :               onChange?.call(_findEvent(event_id: relationshipEventId));
     562             :               return;
     563             :             }
     564             :           }
     565             : 
     566           3 :           events[index].setRedactionEvent(
     567           1 :             Event.fromJson(
     568           1 :               eventUpdate.content,
     569           1 :               room,
     570             :             ),
     571             :           );
     572           2 :           onChange?.call(index);
     573             :         }
     574             :       }
     575             : 
     576           7 :       if (update && !_collectHistoryUpdates) {
     577           9 :         onUpdate?.call();
     578             :       }
     579             :     } catch (e, s) {
     580           0 :       Logs().w('Handle event update failed', e, s);
     581             :     }
     582             :   }
     583             : 
     584           0 :   @Deprecated('Use [startSearch] instead.')
                         /// Deprecated search entry point that delegates to [startSearch].
                         ///
                         /// Forwards every argument unchanged to [startSearch] and maps each
                         /// emitted record to its first field, the list of events found so
                         /// far. The second field that [startSearch] emits alongside that
                         /// list is dropped.
     585             :   Stream<List<Event>> searchEvent({
     586             :     String? searchTerm,
     587             :     int requestHistoryCount = 100,
     588             :     int maxHistoryRequests = 10,
     589             :     String? sinceEventId,
     590             :     int? limit,
     591             :     bool Function(Event)? searchFunc,
     592             :   }) =>
     593           0 :       startSearch(
     594             :         searchTerm: searchTerm,
     595             :         requestHistoryCount: requestHistoryCount,
     596             :         maxHistoryRequests: maxHistoryRequests,
     597             :         // ignore: deprecated_member_use_from_same_package
     598             :         sinceEventId: sinceEventId,
     599             :         limit: limit,
     600             :         searchFunc: searchFunc,
     601           0 :       ).map((result) => result.$1);
     602             : 
     603             :   /// Searches [searchTerm] in this timeline. It first searches the events
     604             :   /// held in [events], then the database and then the server. The search
     605             :   /// can take a while, which is why this returns a stream, so that the
     606             :   /// events found so far can be displayed right away.
     607             :   /// Pass a [searchFunc] if you need a different match rule. This will then
     608             :   /// ignore [searchTerm].
     609             :   /// Each emission pairs the list of all events found so far with `null`
     610             :   /// (local and database matches) or the server response's `end` token.
                         ///
                         /// The same list instance is emitted every time and keeps growing.
                         ///
                         /// * [searchTerm]: matched case-insensitively as a substring of the
                         ///   event body by the default predicate.
                         /// * [requestHistoryCount]: page size of every database and server
                         ///   request.
                         /// * [maxHistoryRequests]: upper bound on the number of server requests.
                         /// * [prevBatch]: token the server pass starts from; defaults to the
                         ///   room's `prev_batch`.
                         /// * [sinceEventId]: deprecated. When set, the local and database passes
                         ///   are skipped and the server pass starts from the `end` token of that
                         ///   event's context, replacing any [prevBatch] passed in.
                         /// * [limit]: the server pass stops once this many events were found.
                         ///   It is not checked during the local and database passes.
                         ///
                         /// A [MatrixException] with `M_FORBIDDEN` ends the search without an
                         /// error; any other [MatrixException] is rethrown.
     611           0 :   Stream<(List<Event>, String?)> startSearch({
     612             :     String? searchTerm,
     613             :     int requestHistoryCount = 100,
     614             :     int maxHistoryRequests = 10,
     615             :     String? prevBatch,
     616             :     @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
     617             :     int? limit,
     618             :     bool Function(Event)? searchFunc,
     619             :   }) async* {
                           // Only enforced when assertions are enabled. Without them, and with
                           // both arguments null, the default predicate below tests
                           // `contains('')` and therefore matches every event.
     620           0 :     assert(searchTerm != null || searchFunc != null);
                           // Default predicate: case-insensitive substring match on the body.
     621           0 :     searchFunc ??= (event) =>
     622           0 :         event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
                           // Accumulates every match from all passes; this one list is what
                           // each `yield` hands out.
     623           0 :     final found = <Event>[];
     624             : 
                           // The local and database passes only run when no [sinceEventId] is
                           // given.
     625             :     if (sinceEventId == null) {
     626             :       // Search locally, in the events this timeline already holds
     627           0 :       for (final event in events) {
     628           0 :         if (searchFunc(event)) {
     629           0 :           yield (found..add(event), null);
     630             :         }
     631             :       }
     632             : 
     633             :       // Search in database, one page of [requestHistoryCount] at a time
                             // NOTE(review): `start` begins at events.length, presumably so the
                             // store is read from where the in-memory list ends; confirm
                             // against getEventList.
     634           0 :       var start = events.length;
     635             :       while (true) {
                               // A null database falls back to an empty page, which ends the
                               // loop right below.
     636           0 :         final eventsFromStore = await room.client.database?.getEventList(
     637           0 :               room,
     638             :               start: start,
     639             :               limit: requestHistoryCount,
     640             :             ) ??
     641           0 :             [];
     642           0 :         if (eventsFromStore.isEmpty) break;
     643           0 :         start += eventsFromStore.length;
     644           0 :         for (final event in eventsFromStore) {
     645           0 :           if (searchFunc(event)) {
     646           0 :             yield (found..add(event), null);
     647             :           }
     648             :         }
     649             :       }
     650             :     }
     651             : 
     652             :     // Search on the server
     653           0 :     prevBatch ??= room.prev_batch;
                           // A given [sinceEventId] wins over any [prevBatch]: the start token
                           // is taken from the `end` of that event's context.
     654             :     if (sinceEventId != null) {
     655             :       prevBatch =
     656           0 :           (await room.client.getEventContext(room.id, sinceEventId)).end;
     657             :     }
     658           0 :     final encryption = room.client.encryption;
     659           0 :     for (var i = 0; i < maxHistoryRequests; i++) {
                             // Stop when there is no token to continue from or enough events
                             // have been found.
     660             :       if (prevBatch == null) break;
     661           0 :       if (limit != null && found.length >= limit) break;
     662             :       try {
     663           0 :         final resp = await room.client.getRoomEvents(
     664           0 :           room.id,
     665             :           Direction.b,
     666             :           from: prevBatch,
     667             :           limit: requestHistoryCount,
     668           0 :           filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
     669             :         );
     670           0 :         for (final matrixEvent in resp.chunk) {
     671           0 :           var event = Event.fromMatrixEvent(matrixEvent, room);
                                 // Encrypted events are decrypted first (when encryption is
                                 // available) so the predicate sees the decrypted event.
     672           0 :           if (event.type == EventTypes.Encrypted && encryption != null) {
     673           0 :             event = await encryption.decryptRoomEvent(room.id, event);
     674           0 :             if (event.type == EventTypes.Encrypted &&
     675           0 :                 event.messageType == MessageTypes.BadEncrypted &&
     676           0 :                 event.content['can_request_session'] == true) {
     677             :               // Await requestKey() here to ensure decrypted message bodies
     678           0 :               await event.requestKey();
     679             :             }
     680             :           }
     681           0 :           if (searchFunc(event)) {
     682           0 :             yield (found..add(event), resp.end);
                                   // This only leaves the chunk loop; the limit check at the
                                   // top of the request loop then ends the search.
     683           0 :             if (limit != null && found.length >= limit) break;
     684             :           }
     685             :         }
     686           0 :         prevBatch = resp.end;
     687             :         // A short page: we are at the beginning of the room
     688           0 :         if (resp.chunk.length < requestHistoryCount) break;
     689           0 :       } on MatrixException catch (e) {
     690             :         // We have no permission anymore to request the history
     691           0 :         if (e.error == MatrixError.M_FORBIDDEN) {
     692             :           break;
     693             :         }
     694             :         rethrow;
     695             :       }
     696             :     }
     697             :     return;
     698             :   }
     699             : }
     700             : 
     701             : extension on List<Event> {
                         /// Index of the first event whose `status.isError` is false.
                         ///
                         /// Returns 0 for an empty list and [length] when no such event
                         /// exists, so the result is always usable as an insertion index.
     702           4 :   int get firstIndexWhereNotError {
     703           4 :     if (isEmpty) return 0;
     704          16 :     final index = indexWhere((event) => !event.status.isError);
                           // indexWhere reports "not found" as -1; map that to the end of the
                           // list.
     705           9 :     if (index == -1) return length;
     706             :     return index;
     707             :   }
     708             : }

Generated by: LCOV version 1.14