Line data Source code
1 : /*
2 : * Famedly Matrix SDK
3 : * Copyright (C) 2019, 2020, 2021 Famedly GmbH
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU Affero General Public License as
7 : * published by the Free Software Foundation, either version 3 of the
8 : * License, or (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU Affero General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU Affero General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : import 'dart:async';
20 : import 'dart:convert';
21 :
22 : import 'package:collection/collection.dart';
23 :
24 : import 'package:matrix/matrix.dart';
25 : import 'package:matrix/src/models/timeline_chunk.dart';
26 :
/// Represents the timeline of a room. The callback [onUpdate] will be triggered
/// automatically. The initial event list will be retrieved when the timeline
/// is created by the `room.getTimeline()` method.
30 :
31 : class Timeline {
  /// The room whose events this timeline holds.
  final Room room;

  /// The events currently held by this timeline; this is the event list of
  /// [chunk].
  List<Event> get events => chunk.events;

  /// Map of event ID to map of type to set of aggregated events
  final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};

  /// Called without arguments when the timeline changed, e.g. after events
  /// were handled or removed and when a request for more events starts or
  /// finishes.
  final void Function()? onUpdate;

  /// Called with the index in [events] of an event that was replaced or
  /// otherwise changed.
  final void Function(int index)? onChange;

  /// Called with the index in [events] at which an event was inserted.
  final void Function(int index)? onInsert;

  /// Called with the index in [events] from which an event was removed.
  final void Function(int index)? onRemove;

  /// Called for every incoming event update of type timeline for this room,
  /// even while [allowNewEvent] is false.
  final void Function()? onNewEvent;

  /// Subscription to the event updates of the client.
  StreamSubscription<EventUpdate>? sub;

  /// Subscription to the syncs in which the timeline of this room is limited.
  StreamSubscription<SyncUpdate>? roomSub;

  /// Subscription to the session keys received for this room.
  StreamSubscription<String>? sessionIdReceivedSub;

  /// Subscription to the IDs of events whose sending has been cancelled.
  StreamSubscription<String>? cancelSendEventSub;

  /// Whether a [requestHistory] call is currently in progress.
  bool isRequestingHistory = false;

  /// Whether a [requestFuture] call is currently in progress.
  bool isRequestingFuture = false;

  /// Whether incoming event updates are added to this timeline. While this is
  /// false, newer events have to be loaded with [requestFuture].
  bool allowNewEvent = true;

  /// Whether this timeline was created from a [chunk] with a non-empty
  /// `nextBatch`. Such a timeline does not read events from the database.
  bool isFragmentedTimeline = false;

  /// Events fetched by [getEventById] that are not part of [events], by ID.
  final Map<String, Event> _eventCache = {};

  /// The chunk providing the event list and the pagination tokens of this
  /// timeline.
  TimelineChunk chunk;
57 :
58 : /// Searches for the event in this timeline. If not
59 : /// found, requests from the server. Requested events
60 : /// are cached.
61 2 : Future<Event?> getEventById(String id) async {
62 4 : for (final event in events) {
63 4 : if (event.eventId == id) return event;
64 : }
65 4 : if (_eventCache.containsKey(id)) return _eventCache[id];
66 4 : final requestedEvent = await room.getEventById(id);
67 : if (requestedEvent == null) return null;
68 4 : _eventCache[id] = requestedEvent;
69 4 : return _eventCache[id];
70 : }
71 :
  // While this flag is set, `_handleEventUpdate` does not call `onUpdate` for
  // the events it handles. `_requestEvents` sets it once the room reports
  // that history has been received and clears it again when the request is
  // done. This ensures that fetching history triggers `onUpdate` only *once*,
  // even if a /sync completes while the history is being processed.
  bool _collectHistoryUpdates = false;

  // We confirmed that there are no more events to load from the database.
  bool _fetchedAllDatabaseEvents = false;
80 :
81 1 : bool get canRequestHistory {
82 2 : if (events.isEmpty) return true;
83 0 : return !_fetchedAllDatabaseEvents ||
84 0 : (room.prev_batch != null && events.last.type != EventTypes.RoomCreate);
85 : }
86 :
87 2 : Future<void> requestHistory({
88 : int historyCount = Room.defaultHistoryCount,
89 : }) async {
90 2 : if (isRequestingHistory) {
91 : return;
92 : }
93 :
94 2 : isRequestingHistory = true;
95 2 : await _requestEvents(direction: Direction.b, historyCount: historyCount);
96 2 : isRequestingHistory = false;
97 : }
98 :
99 0 : bool get canRequestFuture => !allowNewEvent;
100 :
101 1 : Future<void> requestFuture({
102 : int historyCount = Room.defaultHistoryCount,
103 : }) async {
104 1 : if (allowNewEvent) {
105 : return; // we shouldn't force to add new events if they will autatically be added
106 : }
107 :
108 1 : if (isRequestingFuture) return;
109 1 : isRequestingFuture = true;
110 1 : await _requestEvents(direction: Direction.f, historyCount: historyCount);
111 1 : isRequestingFuture = false;
112 : }
113 :
114 3 : Future<void> _requestEvents({
115 : int historyCount = Room.defaultHistoryCount,
116 : required Direction direction,
117 : }) async {
118 4 : onUpdate?.call();
119 :
120 : try {
121 : // Look up for events in the database first. With fragmented view, we should delete the database cache
122 3 : final eventsFromStore = isFragmentedTimeline
123 : ? null
124 8 : : await room.client.database?.getEventList(
125 2 : room,
126 4 : start: events.length,
127 : limit: historyCount,
128 : );
129 :
130 2 : if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
131 : // Fetch all users from database we have got here.
132 0 : for (final event in events) {
133 0 : if (room.getState(EventTypes.RoomMember, event.senderId) != null) {
134 : continue;
135 : }
136 : final dbUser =
137 0 : await room.client.database?.getUser(event.senderId, room);
138 0 : if (dbUser != null) room.setState(dbUser);
139 : }
140 :
141 0 : if (direction == Direction.b) {
142 0 : events.addAll(eventsFromStore);
143 0 : final startIndex = events.length - eventsFromStore.length;
144 0 : final endIndex = events.length;
145 0 : for (var i = startIndex; i < endIndex; i++) {
146 0 : onInsert?.call(i);
147 : }
148 : } else {
149 0 : events.insertAll(0, eventsFromStore);
150 0 : final startIndex = eventsFromStore.length;
151 : final endIndex = 0;
152 0 : for (var i = startIndex; i > endIndex; i--) {
153 0 : onInsert?.call(i);
154 : }
155 : }
156 : } else {
157 3 : _fetchedAllDatabaseEvents = true;
158 6 : Logs().i('No more events found in the store. Request from server...');
159 :
160 3 : if (isFragmentedTimeline) {
161 1 : await getRoomEvents(
162 : historyCount: historyCount,
163 : direction: direction,
164 : );
165 : } else {
166 4 : if (room.prev_batch == null) {
167 0 : Logs().i('No more events to request from server...');
168 : } else {
169 4 : await room.requestHistory(
170 : historyCount: historyCount,
171 : direction: direction,
172 2 : onHistoryReceived: () {
173 2 : _collectHistoryUpdates = true;
174 : },
175 : );
176 : }
177 : }
178 : }
179 : } finally {
180 3 : _collectHistoryUpdates = false;
181 3 : isRequestingHistory = false;
182 4 : onUpdate?.call();
183 : }
184 : }
185 :
186 : /// Request more previous events from the server. [historyCount] defines how much events should
187 : /// be received maximum. When the request is answered, [onHistoryReceived] will be triggered **before**
188 : /// the historical events will be published in the onEvent stream.
189 : /// Returns the actual count of received timeline events.
190 1 : Future<int> getRoomEvents({
191 : int historyCount = Room.defaultHistoryCount,
192 : direction = Direction.b,
193 : }) async {
194 3 : final resp = await room.client.getRoomEvents(
195 2 : room.id,
196 : direction,
197 3 : from: direction == Direction.b ? chunk.prevBatch : chunk.nextBatch,
198 : limit: historyCount,
199 3 : filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
200 : );
201 :
202 1 : if (resp.end == null) {
203 2 : Logs().w('We reached the end of the timeline');
204 : }
205 :
206 2 : final newNextBatch = direction == Direction.b ? resp.start : resp.end;
207 2 : final newPrevBatch = direction == Direction.b ? resp.end : resp.start;
208 :
209 1 : final type = direction == Direction.b
210 : ? EventUpdateType.history
211 : : EventUpdateType.timeline;
212 :
213 3 : if ((resp.state?.length ?? 0) == 0 &&
214 3 : resp.start != resp.end &&
215 : newPrevBatch != null &&
216 : newNextBatch != null) {
217 1 : if (type == EventUpdateType.history) {
218 0 : Logs().w(
219 0 : '[nav] we can still request history prevBatch: $type $newPrevBatch',
220 : );
221 : } else {
222 2 : Logs().w(
223 1 : '[nav] we can still request timeline nextBatch: $type $newNextBatch',
224 : );
225 : }
226 : }
227 :
228 : final newEvents =
229 6 : resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();
230 :
231 1 : if (!allowNewEvent) {
232 3 : if (resp.start == resp.end ||
233 3 : (resp.end == null && direction == Direction.f)) allowNewEvent = true;
234 :
235 1 : if (allowNewEvent) {
236 2 : Logs().d('We now allow sync update into the timeline.');
237 1 : newEvents.addAll(
238 5 : await room.client.database?.getEventList(room, onlySending: true) ??
239 0 : [],
240 : );
241 : }
242 : }
243 :
244 : // Try to decrypt encrypted events but don't update the database.
245 2 : if (room.encrypted && room.client.encryptionEnabled) {
246 0 : for (var i = 0; i < newEvents.length; i++) {
247 0 : if (newEvents[i].type == EventTypes.Encrypted) {
248 0 : newEvents[i] = await room.client.encryption!.decryptRoomEvent(
249 0 : room.id,
250 0 : newEvents[i],
251 : );
252 : }
253 : }
254 : }
255 :
256 : // update chunk anchors
257 1 : if (type == EventUpdateType.history) {
258 0 : chunk.prevBatch = newPrevBatch ?? '';
259 :
260 0 : final offset = chunk.events.length;
261 :
262 0 : chunk.events.addAll(newEvents);
263 :
264 0 : for (var i = 0; i < newEvents.length; i++) {
265 0 : onInsert?.call(i + offset);
266 : }
267 : } else {
268 2 : chunk.nextBatch = newNextBatch ?? '';
269 4 : chunk.events.insertAll(0, newEvents.reversed);
270 :
271 3 : for (var i = 0; i < newEvents.length; i++) {
272 2 : onInsert?.call(i);
273 : }
274 : }
275 :
276 1 : if (onUpdate != null) {
277 2 : onUpdate!();
278 : }
279 2 : return resp.chunk.length;
280 : }
281 :
282 9 : Timeline({
283 : required this.room,
284 : this.onUpdate,
285 : this.onChange,
286 : this.onInsert,
287 : this.onRemove,
288 : this.onNewEvent,
289 : required this.chunk,
290 : }) {
291 63 : sub = room.client.onEvent.stream.listen(_handleEventUpdate);
292 :
293 : // If the timeline is limited we want to clear our events cache
294 45 : roomSub = room.client.onSync.stream
295 53 : .where((sync) => sync.rooms?.join?[room.id]?.timeline?.limited == true)
296 18 : .listen(_removeEventsNotInThisSync);
297 :
298 9 : sessionIdReceivedSub =
299 45 : room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
300 9 : cancelSendEventSub =
301 54 : room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);
302 :
303 : // we want to populate our aggregated events
304 16 : for (final e in events) {
305 7 : addAggregatedEvent(e);
306 : }
307 :
308 : // we are using a fragmented timeline
309 27 : if (chunk.nextBatch != '') {
310 1 : allowNewEvent = false;
311 1 : isFragmentedTimeline = true;
312 : // fragmented timelines never read from the database.
313 1 : _fetchedAllDatabaseEvents = true;
314 : }
315 : }
316 :
317 4 : void _cleanUpCancelledEvent(String eventId) {
318 4 : final i = _findEvent(event_id: eventId);
319 12 : if (i < events.length) {
320 12 : removeAggregatedEvent(events[i]);
321 8 : events.removeAt(i);
322 6 : onRemove?.call(i);
323 6 : onUpdate?.call();
324 : }
325 : }
326 :
327 : /// Removes all entries from [events] which are not in this SyncUpdate.
328 2 : void _removeEventsNotInThisSync(SyncUpdate sync) {
329 15 : final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
330 4 : final keepEventIds = newSyncEvents.map((e) => e.eventId);
331 7 : events.removeWhere((e) => !keepEventIds.contains(e.eventId));
332 : }
333 :
334 : /// Don't forget to call this before you dismiss this object!
335 0 : void cancelSubscriptions() {
336 : // ignore: discarded_futures
337 0 : sub?.cancel();
338 : // ignore: discarded_futures
339 0 : roomSub?.cancel();
340 : // ignore: discarded_futures
341 0 : sessionIdReceivedSub?.cancel();
342 : // ignore: discarded_futures
343 0 : cancelSendEventSub?.cancel();
344 : }
345 :
346 2 : void _sessionKeyReceived(String sessionId) async {
347 : var decryptAtLeastOneEvent = false;
348 2 : Future<void> decryptFn() async {
349 6 : final encryption = room.client.encryption;
350 6 : if (!room.client.encryptionEnabled || encryption == null) {
351 : return;
352 : }
353 7 : for (var i = 0; i < events.length; i++) {
354 4 : if (events[i].type == EventTypes.Encrypted &&
355 4 : events[i].messageType == MessageTypes.BadEncrypted &&
356 0 : events[i].content['session_id'] == sessionId) {
357 0 : events[i] = await encryption.decryptRoomEvent(
358 0 : room.id,
359 0 : events[i],
360 : store: true,
361 : updateType: EventUpdateType.history,
362 : );
363 0 : addAggregatedEvent(events[i]);
364 0 : onChange?.call(i);
365 0 : if (events[i].type != EventTypes.Encrypted) {
366 : decryptAtLeastOneEvent = true;
367 : }
368 : }
369 : }
370 : }
371 :
372 6 : if (room.client.database != null) {
373 8 : await room.client.database?.transaction(decryptFn);
374 : } else {
375 0 : await decryptFn();
376 : }
377 0 : if (decryptAtLeastOneEvent) onUpdate?.call();
378 : }
379 :
380 : /// Request the keys for undecryptable events of this timeline
381 0 : void requestKeys({
382 : bool tryOnlineBackup = true,
383 : bool onlineKeyBackupOnly = true,
384 : }) {
385 0 : for (final event in events) {
386 0 : if (event.type == EventTypes.Encrypted &&
387 0 : event.messageType == MessageTypes.BadEncrypted &&
388 0 : event.content['can_request_session'] == true) {
389 0 : final sessionId = event.content.tryGet<String>('session_id');
390 0 : final senderKey = event.content.tryGet<String>('sender_key');
391 : if (sessionId != null && senderKey != null) {
392 0 : room.client.encryption?.keyManager.maybeAutoRequest(
393 0 : room.id,
394 : sessionId,
395 : senderKey,
396 : tryOnlineBackup: tryOnlineBackup,
397 : onlineKeyBackupOnly: onlineKeyBackupOnly,
398 : );
399 : }
400 : }
401 : }
402 : }
403 :
404 : /// Set the read marker to the last synced event in this timeline.
405 2 : Future<void> setReadMarker({String? eventId, bool? public}) async {
406 : eventId ??=
407 12 : events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
408 : if (eventId == null) return;
409 4 : return room.setReadMarker(eventId, mRead: eventId, public: public);
410 : }
411 :
412 7 : int _findEvent({String? event_id, String? unsigned_txid}) {
413 : // we want to find any existing event where either the passed event_id or the passed unsigned_txid
414 : // matches either the event_id or transaction_id of the existing event.
415 : // For that we create two sets, searchNeedle, what we search, and searchHaystack, where we check if there is a match.
416 : // Now, after having these two sets, if the intersect between them is non-empty, we know that we have at least one match in one pair,
417 : // thus meaning we found our element.
418 : final searchNeedle = <String>{};
419 : if (event_id != null) {
420 7 : searchNeedle.add(event_id);
421 : }
422 : if (unsigned_txid != null) {
423 4 : searchNeedle.add(unsigned_txid);
424 : }
425 : int i;
426 28 : for (i = 0; i < events.length; i++) {
427 21 : final searchHaystack = <String>{events[i].eventId};
428 :
429 28 : final txnid = events[i].unsigned?.tryGet<String>('transaction_id');
430 : if (txnid != null) {
431 4 : searchHaystack.add(txnid);
432 : }
433 14 : if (searchNeedle.intersection(searchHaystack).isNotEmpty) {
434 : break;
435 : }
436 : }
437 : return i;
438 : }
439 :
440 4 : void _removeEventFromSet(Set<Event> eventSet, Event event) {
441 4 : eventSet.removeWhere(
442 4 : (e) =>
443 8 : e.matchesEventOrTransactionId(event.eventId) ||
444 4 : event.unsigned != null &&
445 4 : e.matchesEventOrTransactionId(
446 8 : event.unsigned?.tryGet<String>('transaction_id'),
447 : ),
448 : );
449 : }
450 :
451 9 : void addAggregatedEvent(Event event) {
452 : // we want to add an event to the aggregation tree
453 9 : final relationshipType = event.relationshipType;
454 9 : final relationshipEventId = event.relationshipEventId;
455 : if (relationshipType == null || relationshipEventId == null) {
456 : return; // nothing to do
457 : }
458 8 : final events = (aggregatedEvents[relationshipEventId] ??=
459 8 : <String, Set<Event>>{})[relationshipType] ??= <Event>{};
460 : // remove a potential old event
461 4 : _removeEventFromSet(events, event);
462 : // add the new one
463 4 : events.add(event);
464 4 : if (onChange != null) {
465 0 : final index = _findEvent(event_id: relationshipEventId);
466 0 : onChange?.call(index);
467 : }
468 : }
469 :
470 6 : void removeAggregatedEvent(Event event) {
471 18 : aggregatedEvents.remove(event.eventId);
472 6 : if (event.unsigned != null) {
473 24 : aggregatedEvents.remove(event.unsigned?['transaction_id']);
474 : }
475 16 : for (final types in aggregatedEvents.values) {
476 8 : for (final events in types.values) {
477 4 : _removeEventFromSet(events, event);
478 : }
479 : }
480 : }
481 :
  /// Applies an incoming [eventUpdate] to this timeline.
  ///
  /// Updates for other rooms and updates which are neither of type timeline
  /// nor history are ignored. [onNewEvent] is called for every timeline
  /// update, but the update is only applied while [allowNewEvent] is set.
  ///
  /// An event which is already part of the timeline (matched by event ID or
  /// transaction ID) is replaced, otherwise the event is inserted. Redaction
  /// events additionally update the event they redact. [onUpdate] is called
  /// at the end if [update] is true and no history is being collected.
  ///
  /// Errors are logged and not rethrown.
  void _handleEventUpdate(EventUpdate eventUpdate, {bool update = true}) {
    try {
      if (eventUpdate.roomID != room.id) return;

      if (eventUpdate.type != EventUpdateType.timeline &&
          eventUpdate.type != EventUpdateType.history) {
        return;
      }

      if (eventUpdate.type == EventUpdateType.timeline) {
        onNewEvent?.call();
      }

      if (!allowNewEvent) return;

      // The status is taken from the content, then from the unsigned data and
      // finally defaults to synced.
      final status = eventStatusFromInt(
        eventUpdate.content['status'] ??
            (eventUpdate.content['unsigned'] is Map<String, dynamic>
                ? eventUpdate.content['unsigned'][messageSendingStatusKey]
                : null) ??
            EventStatus.synced.intValue,
      );

      // Position of the event in the timeline, or `events.length` if the
      // timeline does not contain it yet.
      final i = _findEvent(
        event_id: eventUpdate.content['event_id'],
        unsigned_txid: eventUpdate.content['unsigned'] is Map
            ? eventUpdate.content['unsigned']['transaction_id']
            : null,
      );

      if (i < events.length) {
        // if the old status is larger than the new one, we also want to preserve the old status
        final oldStatus = events[i].status;
        events[i] = Event.fromJson(
          eventUpdate.content,
          room,
        );
        // do we preserve the status? we should allow 0 -> -1 updates and status increases
        if ((latestEventStatus(status, oldStatus) == oldStatus) &&
            !(status.isError && oldStatus.isSending)) {
          events[i].status = oldStatus;
        }
        addAggregatedEvent(events[i]);
        onChange?.call(i);
      } else {
        final newEvent = Event.fromJson(
          eventUpdate.content,
          room,
        );

        // Do not add a history event which is already part of the timeline.
        if (eventUpdate.type == EventUpdateType.history &&
            events.indexWhere(
                  (e) => e.eventId == eventUpdate.content['event_id'],
                ) !=
                -1) return;
        // History events are appended to the end, timeline events are
        // inserted in front of the first event without an error status.
        var index = events.length;
        if (eventUpdate.type == EventUpdateType.history) {
          events.add(newEvent);
        } else {
          index = events.firstIndexWhereNotError;
          events.insert(index, newEvent);
        }
        onInsert?.call(index);

        addAggregatedEvent(newEvent);
      }

      // Handle redaction events
      if (eventUpdate.content['type'] == EventTypes.Redaction) {
        final index =
            _findEvent(event_id: eventUpdate.content.tryGet<String>('redacts'));
        if (index < events.length) {
          removeAggregatedEvent(events[index]);

          // Is the redacted event a reaction? Then update the event this
          // belongs to:
          // NOTE(review): this early return only happens if an [onChange]
          // callback is set. It skips both `setRedactionEvent` and the
          // [onUpdate] call below, and the index passed to [onChange] equals
          // `events.length` if the related event is not in the timeline -
          // confirm that this is intended.
          if (onChange != null) {
            final relationshipEventId = events[index].relationshipEventId;
            if (relationshipEventId != null) {
              onChange?.call(_findEvent(event_id: relationshipEventId));
              return;
            }
          }

          events[index].setRedactionEvent(
            Event.fromJson(
              eventUpdate.content,
              room,
            ),
          );
          onChange?.call(index);
        }
      }

      // While history is being collected, `_requestEvents` triggers
      // [onUpdate] once it is done instead.
      if (update && !_collectHistoryUpdates) {
        onUpdate?.call();
      }
    } catch (e, s) {
      Logs().w('Handle event update failed', e, s);
    }
  }
583 :
584 0 : @Deprecated('Use [startSearch] instead.')
585 : Stream<List<Event>> searchEvent({
586 : String? searchTerm,
587 : int requestHistoryCount = 100,
588 : int maxHistoryRequests = 10,
589 : String? sinceEventId,
590 : int? limit,
591 : bool Function(Event)? searchFunc,
592 : }) =>
593 0 : startSearch(
594 : searchTerm: searchTerm,
595 : requestHistoryCount: requestHistoryCount,
596 : maxHistoryRequests: maxHistoryRequests,
597 : // ignore: deprecated_member_use_from_same_package
598 : sinceEventId: sinceEventId,
599 : limit: limit,
600 : searchFunc: searchFunc,
601 0 : ).map((result) => result.$1);
602 :
603 : /// Searches [searchTerm] in this timeline. It first searches in the
604 : /// cache, then in the database and then on the server. The search can
605 : /// take a while, which is why this returns a stream so the already found
606 : /// events can already be displayed.
607 : /// Override the [searchFunc] if you need another search. This will then
608 : /// ignore [searchTerm].
609 : /// Returns the List of Events and the next prevBatch at the end of the
610 : /// search.
611 0 : Stream<(List<Event>, String?)> startSearch({
612 : String? searchTerm,
613 : int requestHistoryCount = 100,
614 : int maxHistoryRequests = 10,
615 : String? prevBatch,
616 : @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
617 : int? limit,
618 : bool Function(Event)? searchFunc,
619 : }) async* {
620 0 : assert(searchTerm != null || searchFunc != null);
621 0 : searchFunc ??= (event) =>
622 0 : event.body.toLowerCase().contains(searchTerm?.toLowerCase() ?? '');
623 0 : final found = <Event>[];
624 :
625 : if (sinceEventId == null) {
626 : // Search locally
627 0 : for (final event in events) {
628 0 : if (searchFunc(event)) {
629 0 : yield (found..add(event), null);
630 : }
631 : }
632 :
633 : // Search in database
634 0 : var start = events.length;
635 : while (true) {
636 0 : final eventsFromStore = await room.client.database?.getEventList(
637 0 : room,
638 : start: start,
639 : limit: requestHistoryCount,
640 : ) ??
641 0 : [];
642 0 : if (eventsFromStore.isEmpty) break;
643 0 : start += eventsFromStore.length;
644 0 : for (final event in eventsFromStore) {
645 0 : if (searchFunc(event)) {
646 0 : yield (found..add(event), null);
647 : }
648 : }
649 : }
650 : }
651 :
652 : // Search on the server
653 0 : prevBatch ??= room.prev_batch;
654 : if (sinceEventId != null) {
655 : prevBatch =
656 0 : (await room.client.getEventContext(room.id, sinceEventId)).end;
657 : }
658 0 : final encryption = room.client.encryption;
659 0 : for (var i = 0; i < maxHistoryRequests; i++) {
660 : if (prevBatch == null) break;
661 0 : if (limit != null && found.length >= limit) break;
662 : try {
663 0 : final resp = await room.client.getRoomEvents(
664 0 : room.id,
665 : Direction.b,
666 : from: prevBatch,
667 : limit: requestHistoryCount,
668 0 : filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
669 : );
670 0 : for (final matrixEvent in resp.chunk) {
671 0 : var event = Event.fromMatrixEvent(matrixEvent, room);
672 0 : if (event.type == EventTypes.Encrypted && encryption != null) {
673 0 : event = await encryption.decryptRoomEvent(room.id, event);
674 0 : if (event.type == EventTypes.Encrypted &&
675 0 : event.messageType == MessageTypes.BadEncrypted &&
676 0 : event.content['can_request_session'] == true) {
677 : // Await requestKey() here to ensure decrypted message bodies
678 0 : await event.requestKey();
679 : }
680 : }
681 0 : if (searchFunc(event)) {
682 0 : yield (found..add(event), resp.end);
683 0 : if (limit != null && found.length >= limit) break;
684 : }
685 : }
686 0 : prevBatch = resp.end;
687 : // We are at the beginning of the room
688 0 : if (resp.chunk.length < requestHistoryCount) break;
689 0 : } on MatrixException catch (e) {
690 : // We have no permission anymore to request the history
691 0 : if (e.error == MatrixError.M_FORBIDDEN) {
692 : break;
693 : }
694 : rethrow;
695 : }
696 : }
697 : return;
698 : }
699 : }
700 :
extension on List<Event> {
  /// The index of the first event whose status is not an error, or [length]
  /// if the list contains no such event (which includes the empty list).
  int get firstIndexWhereNotError {
    final index = indexWhere((event) => !event.status.isError);
    return index == -1 ? length : index;
  }
}
|