diff --git a/packages/chat_repository_interface/lib/src/interfaces/pending_message_repository_interface.dart b/packages/chat_repository_interface/lib/src/interfaces/pending_message_repository_interface.dart new file mode 100644 index 0000000..cddbd49 --- /dev/null +++ b/packages/chat_repository_interface/lib/src/interfaces/pending_message_repository_interface.dart @@ -0,0 +1,38 @@ +import "package:chat_repository_interface/src/models/message_model.dart"; + +/// The pending chat messages repository interface +/// Implement this interface to create a pending chat +/// messages repository with a given data source. +abstract class PendingMessageRepositoryInterface { + /// Get the messages for the given [chatId]. + /// Returns a list of [MessageModel] stream. + /// [userId] is the user id. + /// [chatId] is the chat id. + /// Returns a list of [MessageModel] stream. + Stream> getMessages({ + required String chatId, + required String userId, + }); + + /// Create a message in the pending messages and return the created message. + /// [chatId] is the chat id. + /// [senderId] is the sender id. + /// [text] is the message text. + /// [imageUrl] is the image url. + Future createMessage({ + required String chatId, + required String senderId, + required String messageId, + String? text, + String? imageUrl, + String? messageType, + DateTime? timestamp, + }); + + /// Mark a message as being succesfully sent to the server, + /// so that it can be removed from this data source. + Future markMessageSent({ + required String chatId, + required String messageId, + }); +} diff --git a/packages/chat_repository_interface/lib/src/local/local_memory_db.dart b/packages/chat_repository_interface/lib/src/local/local_memory_db.dart index 221794f..db6658c 100644 --- a/packages/chat_repository_interface/lib/src/local/local_memory_db.dart +++ b/packages/chat_repository_interface/lib/src/local/local_memory_db.dart @@ -11,6 +11,9 @@ final List chats = []; /// All the messages of the local memory database mapped by chat id final Map> chatMessages = {}; +/// All the pending messages of the local memory database mapped by chat id +final Map> pendingChatMessages = {}; + /// All the users of the local memory database final List users = [ const UserModel( diff --git a/packages/chat_repository_interface/lib/src/local/local_pending_message_repository.dart.dart b/packages/chat_repository_interface/lib/src/local/local_pending_message_repository.dart.dart new file mode 100644 index 0000000..5f843f4 --- /dev/null +++ b/packages/chat_repository_interface/lib/src/local/local_pending_message_repository.dart.dart @@ -0,0 +1,102 @@ +import "dart:async"; +import "dart:math" as math; + +import "package:chat_repository_interface/chat_repository_interface.dart"; +import "package:chat_repository_interface/src/interfaces/pending_message_repository_interface.dart"; +import "package:chat_repository_interface/src/local/local_memory_db.dart"; +import "package:collection/collection.dart"; +import "package:rxdart/rxdart.dart"; + +/// The local pending message repository +class LocalPendingMessageRepository + implements PendingMessageRepositoryInterface { + /// The local pending message repository constructor + LocalPendingMessageRepository(); + + final StreamController> _messageController = + BehaviorSubject>(); + + final Map _startIndexMap = {}; + final Map _endIndexMap = {}; + + @override + Stream> getMessages({ + required String chatId, + required String userId, + }) { + var foundChat = + chats.firstWhereOrNull((chatModel) => chatModel.id == chatId); + + if (foundChat == null) { + _messageController.add([]); + } else { + var allMessages = List.from( + pendingChatMessages[chatId] ?? [], + ); + allMessages.sort((a, b) => a.timestamp.compareTo(b.timestamp)); + + _startIndexMap[chatId] ??= math.max(0, allMessages.length - chunkSize); + _endIndexMap[chatId] ??= allMessages.length; + + var displayedMessages = allMessages.sublist( + _startIndexMap[chatId]!, + _endIndexMap[chatId], + ); + _messageController.add(displayedMessages); + } + + return _messageController.stream; + } + + Future _chatExists(String chatId) async { + var chat = chats.firstWhereOrNull((e) => e.id == chatId); + if (chat == null) throw Exception("Chat not found"); + } + + @override + Future createMessage({ + required String chatId, + required String senderId, + required String messageId, + String? text, + String? imageUrl, + String? messageType, + DateTime? timestamp, + }) async { + var message = MessageModel( + chatId: chatId, + id: messageId, + timestamp: timestamp ?? DateTime.now(), + text: text, + messageType: messageType, + senderId: senderId, + imageUrl: imageUrl, + status: MessageStatus.sending, + ); + + await _chatExists(chatId); + + var messages = List.from(pendingChatMessages[chatId] ?? []); + messages.add(message); + + pendingChatMessages[chatId] = messages; + + _messageController.add(pendingChatMessages[chatId] ?? []); + + return message; + } + + @override + Future markMessageSent({ + required String chatId, + required String messageId, + }) async { + await _chatExists(chatId); + var messages = List.from(pendingChatMessages[chatId] ?? []); + + MessageModel markSent(MessageModel message) => + (message.id == messageId) ? message.markSent() : message; + + pendingChatMessages[chatId] = messages.map(markSent).toList(); + } +} diff --git a/packages/chat_repository_interface/lib/src/models/message_model.dart b/packages/chat_repository_interface/lib/src/models/message_model.dart index 0bdb532..a48be76 100644 --- a/packages/chat_repository_interface/lib/src/models/message_model.dart +++ b/packages/chat_repository_interface/lib/src/models/message_model.dart @@ -1,3 +1,24 @@ +/// Message status enumeration +enum MessageStatus { + /// Status when a message has not yet been received by the server. + sending, + + /// Status used when a message has been received by the server. + sent; + + /// Attempt to parse [MessageStatus] from String + static MessageStatus? tryParse(String name) => + MessageStatus.values.where((status) => status.name == name).firstOrNull; + + /// Parse [MessageStatus] from String + /// or throw a [FormatException] + static MessageStatus parse(String name) => + tryParse(name) ?? + (throw const FormatException( + "MessageStatus with that name does not exist", + )); +} + /// Message model /// Represents a message in a chat /// [id] is the message id. @@ -15,6 +36,7 @@ class MessageModel { required this.imageUrl, required this.timestamp, required this.senderId, + this.status = MessageStatus.sent, }); /// Creates a message model instance given a map instance @@ -27,6 +49,7 @@ class MessageModel { imageUrl: map["imageUrl"], timestamp: DateTime.fromMillisecondsSinceEpoch(map["timestamp"]), senderId: map["senderId"], + status: MessageStatus.tryParse(map["status"]) ?? MessageStatus.sent, ); /// The chat id @@ -50,6 +73,9 @@ class MessageModel { /// The sender id final String senderId; + /// The message status + final MessageStatus status; + /// The message model copy with method MessageModel copyWith({ String? chatId, @@ -59,6 +85,7 @@ class MessageModel { String? imageUrl, DateTime? timestamp, String? senderId, + MessageStatus? status, }) => MessageModel( chatId: chatId ?? this.chatId, @@ -68,6 +95,7 @@ class MessageModel { imageUrl: imageUrl ?? this.imageUrl, timestamp: timestamp ?? this.timestamp, senderId: senderId ?? this.senderId, + status: status ?? this.status, ); /// Creates a map representation of this object @@ -78,7 +106,11 @@ class MessageModel { "imageUrl": imageUrl, "timestamp": timestamp.millisecondsSinceEpoch, "senderId": senderId, + "status": status.name, }; + + /// marks the message model as sent + MessageModel markSent() => copyWith(status: MessageStatus.sent); } /// Extension on [MessageModel] to check the message type diff --git a/packages/chat_repository_interface/lib/src/services/chat_service.dart b/packages/chat_repository_interface/lib/src/services/chat_service.dart index ebb5705..1112388 100644 --- a/packages/chat_repository_interface/lib/src/services/chat_service.dart +++ b/packages/chat_repository_interface/lib/src/services/chat_service.dart @@ -2,13 +2,16 @@ import "dart:async"; import "dart:typed_data"; import "package:chat_repository_interface/src/interfaces/chat_repostory_interface.dart"; +import "package:chat_repository_interface/src/interfaces/pending_message_repository_interface.dart"; import "package:chat_repository_interface/src/interfaces/user_repository_interface.dart"; import "package:chat_repository_interface/src/local/local_chat_repository.dart"; +import "package:chat_repository_interface/src/local/local_pending_message_repository.dart.dart"; import "package:chat_repository_interface/src/local/local_user_repository.dart"; import "package:chat_repository_interface/src/models/chat_model.dart"; import "package:chat_repository_interface/src/models/message_model.dart"; import "package:chat_repository_interface/src/models/user_model.dart"; import "package:collection/collection.dart"; +import "package:rxdart/rxdart.dart"; /// The chat service /// Use this service to interact with the chat repository. @@ -18,8 +21,11 @@ class ChatService { ChatService({ required this.userId, ChatRepositoryInterface? chatRepository, + PendingMessageRepositoryInterface? pendingMessageRepository, UserRepositoryInterface? userRepository, }) : chatRepository = chatRepository ?? LocalChatRepository(), + pendingMessageRepository = + pendingMessageRepository ?? LocalPendingMessageRepository(), userRepository = userRepository ?? LocalUserRepository(); /// The user ID of the person currently looking at the chat @@ -28,6 +34,9 @@ class ChatService { /// The chat repository final ChatRepositoryInterface chatRepository; + /// The pending messages repository + final PendingMessageRepositoryInterface pendingMessageRepository; + /// The user repository final UserRepositoryInterface userRepository; @@ -135,11 +144,32 @@ class ChatService { /// Returns a list of [MessageModel] stream. Stream?> getMessages({ required String chatId, - }) => - chatRepository.getMessages( - userId: userId, - chatId: chatId, - ); + }) { + List mergePendingMessages( + List messages, + List pendingMessages, + ) => + { + ...Map.fromEntries( + pendingMessages.map((message) => MapEntry(message.id, message)), + ), + ...Map.fromEntries( + messages.map((message) => MapEntry(message.id, message)), + ), + }.values.toList(); + + return Rx.combineLatest2( + chatRepository.getMessages(userId: userId, chatId: chatId), + pendingMessageRepository.getMessages(userId: userId, chatId: chatId), + (chatMessages, pendingChatMessages) { + // TODO(Quirille): This is because chatRepository.getMessages + // might return null, when really it should've just thrown + // an exception instead. + if (chatMessages == null) return null; + return mergePendingMessages(chatMessages, pendingChatMessages); + }, + ); + } /// Signals that new messages should be loaded after the given message. /// The stream should emit the new messages. @@ -173,15 +203,39 @@ class ChatService { String? text, String? messageType, String? imageUrl, - }) => - chatRepository.sendMessage( - chatId: chatId, - messageId: messageId, - text: text, - messageType: messageType, - senderId: senderId, - imageUrl: imageUrl, - ); + }) async { + await pendingMessageRepository.createMessage( + chatId: chatId, + senderId: senderId, + messageId: messageId, + text: text, + messageType: messageType, + imageUrl: imageUrl, + ); + + unawaited( + chatRepository + .sendMessage( + chatId: chatId, + messageId: messageId, + text: text, + messageType: messageType, + senderId: senderId, + imageUrl: imageUrl, + ) + .then( + (_) => pendingMessageRepository.markMessageSent( + chatId: chatId, + messageId: messageId, + ), + ) + .onError( + (e, s) { + // TODO(Quirille): handle exception when message sending has failed. + }, + ), + ); + } /// Delete the chat with the given parameters. /// [chatId] is the chat id.