feat(chat-service): add pending messages repository

This commit is contained in:
Kiril Tijsma 2025-03-07 14:49:44 +01:00 committed by FlutterJoey
parent 02ae2aa884
commit 61b588cfd5
5 changed files with 243 additions and 14 deletions

View file

@ -0,0 +1,38 @@
import "package:chat_repository_interface/src/models/message_model.dart";
/// The pending chat messages repository interface
/// Implement this interface to create a pending chat
/// messages repository with a given data source.
abstract class PendingMessageRepositoryInterface {
/// Get the messages for the given [chatId].
/// Returns a list of [MessageModel] stream.
/// [userId] is the user id.
/// [chatId] is the chat id.
/// Returns a list of [MessageModel] stream.
Stream<List<MessageModel>> getMessages({
required String chatId,
required String userId,
});
/// Create a message in the pending messages and return the created message.
/// [chatId] is the chat id.
/// [senderId] is the sender id.
/// [text] is the message text.
/// [imageUrl] is the image url.
Future<MessageModel> createMessage({
required String chatId,
required String senderId,
required String messageId,
String? text,
String? imageUrl,
String? messageType,
DateTime? timestamp,
});
/// Mark a message as being succesfully sent to the server,
/// so that it can be removed from this data source.
Future<void> markMessageSent({
required String chatId,
required String messageId,
});
}

View file

@ -11,6 +11,9 @@ final List<ChatModel> chats = [];
/// All the messages of the local memory database mapped by chat id /// All the messages of the local memory database mapped by chat id
final Map<String, List<MessageModel>> chatMessages = {}; final Map<String, List<MessageModel>> chatMessages = {};
/// All the pending messages of the local memory database mapped by chat id
final Map<String, List<MessageModel>> pendingChatMessages = {};
/// All the users of the local memory database /// All the users of the local memory database
final List<UserModel> users = [ final List<UserModel> users = [
const UserModel( const UserModel(

View file

@ -0,0 +1,102 @@
import "dart:async";
import "dart:math" as math;
import "package:chat_repository_interface/chat_repository_interface.dart";
import "package:chat_repository_interface/src/interfaces/pending_message_repository_interface.dart";
import "package:chat_repository_interface/src/local/local_memory_db.dart";
import "package:collection/collection.dart";
import "package:rxdart/rxdart.dart";
/// The local pending message repository
class LocalPendingMessageRepository
implements PendingMessageRepositoryInterface {
/// The local pending message repository constructor
LocalPendingMessageRepository();
final StreamController<List<MessageModel>> _messageController =
BehaviorSubject<List<MessageModel>>();
final Map<String, int> _startIndexMap = {};
final Map<String, int> _endIndexMap = {};
@override
Stream<List<MessageModel>> getMessages({
required String chatId,
required String userId,
}) {
var foundChat =
chats.firstWhereOrNull((chatModel) => chatModel.id == chatId);
if (foundChat == null) {
_messageController.add([]);
} else {
var allMessages = List<MessageModel>.from(
pendingChatMessages[chatId] ?? [],
);
allMessages.sort((a, b) => a.timestamp.compareTo(b.timestamp));
_startIndexMap[chatId] ??= math.max(0, allMessages.length - chunkSize);
_endIndexMap[chatId] ??= allMessages.length;
var displayedMessages = allMessages.sublist(
_startIndexMap[chatId]!,
_endIndexMap[chatId],
);
_messageController.add(displayedMessages);
}
return _messageController.stream;
}
Future<void> _chatExists(String chatId) async {
var chat = chats.firstWhereOrNull((e) => e.id == chatId);
if (chat == null) throw Exception("Chat not found");
}
@override
Future<MessageModel> createMessage({
required String chatId,
required String senderId,
required String messageId,
String? text,
String? imageUrl,
String? messageType,
DateTime? timestamp,
}) async {
var message = MessageModel(
chatId: chatId,
id: messageId,
timestamp: timestamp ?? DateTime.now(),
text: text,
messageType: messageType,
senderId: senderId,
imageUrl: imageUrl,
status: MessageStatus.sending,
);
await _chatExists(chatId);
var messages = List<MessageModel>.from(pendingChatMessages[chatId] ?? []);
messages.add(message);
pendingChatMessages[chatId] = messages;
_messageController.add(pendingChatMessages[chatId] ?? []);
return message;
}
@override
Future<void> markMessageSent({
required String chatId,
required String messageId,
}) async {
await _chatExists(chatId);
var messages = List<MessageModel>.from(pendingChatMessages[chatId] ?? []);
MessageModel markSent(MessageModel message) =>
(message.id == messageId) ? message.markSent() : message;
pendingChatMessages[chatId] = messages.map(markSent).toList();
}
}

View file

@ -1,3 +1,24 @@
/// Message status enumeration
enum MessageStatus {
/// Status when a message has not yet been received by the server.
sending,
/// Status used when a message has been received by the server.
sent;
/// Attempt to parse [MessageStatus] from String
static MessageStatus? tryParse(String name) =>
MessageStatus.values.where((status) => status.name == name).firstOrNull;
/// Parse [MessageStatus] from String
/// or throw a [FormatException]
static MessageStatus parse(String name) =>
tryParse(name) ??
(throw const FormatException(
"MessageStatus with that name does not exist",
));
}
/// Message model /// Message model
/// Represents a message in a chat /// Represents a message in a chat
/// [id] is the message id. /// [id] is the message id.
@ -15,6 +36,7 @@ class MessageModel {
required this.imageUrl, required this.imageUrl,
required this.timestamp, required this.timestamp,
required this.senderId, required this.senderId,
this.status = MessageStatus.sent,
}); });
/// Creates a message model instance given a map instance /// Creates a message model instance given a map instance
@ -27,6 +49,7 @@ class MessageModel {
imageUrl: map["imageUrl"], imageUrl: map["imageUrl"],
timestamp: DateTime.fromMillisecondsSinceEpoch(map["timestamp"]), timestamp: DateTime.fromMillisecondsSinceEpoch(map["timestamp"]),
senderId: map["senderId"], senderId: map["senderId"],
status: MessageStatus.tryParse(map["status"]) ?? MessageStatus.sent,
); );
/// The chat id /// The chat id
@ -50,6 +73,9 @@ class MessageModel {
/// The sender id /// The sender id
final String senderId; final String senderId;
/// The message status
final MessageStatus status;
/// The message model copy with method /// The message model copy with method
MessageModel copyWith({ MessageModel copyWith({
String? chatId, String? chatId,
@ -59,6 +85,7 @@ class MessageModel {
String? imageUrl, String? imageUrl,
DateTime? timestamp, DateTime? timestamp,
String? senderId, String? senderId,
MessageStatus? status,
}) => }) =>
MessageModel( MessageModel(
chatId: chatId ?? this.chatId, chatId: chatId ?? this.chatId,
@ -68,6 +95,7 @@ class MessageModel {
imageUrl: imageUrl ?? this.imageUrl, imageUrl: imageUrl ?? this.imageUrl,
timestamp: timestamp ?? this.timestamp, timestamp: timestamp ?? this.timestamp,
senderId: senderId ?? this.senderId, senderId: senderId ?? this.senderId,
status: status ?? this.status,
); );
/// Creates a map representation of this object /// Creates a map representation of this object
@ -78,7 +106,11 @@ class MessageModel {
"imageUrl": imageUrl, "imageUrl": imageUrl,
"timestamp": timestamp.millisecondsSinceEpoch, "timestamp": timestamp.millisecondsSinceEpoch,
"senderId": senderId, "senderId": senderId,
"status": status.name,
}; };
/// marks the message model as sent
MessageModel markSent() => copyWith(status: MessageStatus.sent);
} }
/// Extension on [MessageModel] to check the message type /// Extension on [MessageModel] to check the message type

View file

@ -2,13 +2,16 @@ import "dart:async";
import "dart:typed_data"; import "dart:typed_data";
import "package:chat_repository_interface/src/interfaces/chat_repostory_interface.dart"; import "package:chat_repository_interface/src/interfaces/chat_repostory_interface.dart";
import "package:chat_repository_interface/src/interfaces/pending_message_repository_interface.dart";
import "package:chat_repository_interface/src/interfaces/user_repository_interface.dart"; import "package:chat_repository_interface/src/interfaces/user_repository_interface.dart";
import "package:chat_repository_interface/src/local/local_chat_repository.dart"; import "package:chat_repository_interface/src/local/local_chat_repository.dart";
import "package:chat_repository_interface/src/local/local_pending_message_repository.dart.dart";
import "package:chat_repository_interface/src/local/local_user_repository.dart"; import "package:chat_repository_interface/src/local/local_user_repository.dart";
import "package:chat_repository_interface/src/models/chat_model.dart"; import "package:chat_repository_interface/src/models/chat_model.dart";
import "package:chat_repository_interface/src/models/message_model.dart"; import "package:chat_repository_interface/src/models/message_model.dart";
import "package:chat_repository_interface/src/models/user_model.dart"; import "package:chat_repository_interface/src/models/user_model.dart";
import "package:collection/collection.dart"; import "package:collection/collection.dart";
import "package:rxdart/rxdart.dart";
/// The chat service /// The chat service
/// Use this service to interact with the chat repository. /// Use this service to interact with the chat repository.
@ -18,8 +21,11 @@ class ChatService {
ChatService({ ChatService({
required this.userId, required this.userId,
ChatRepositoryInterface? chatRepository, ChatRepositoryInterface? chatRepository,
PendingMessageRepositoryInterface? pendingMessageRepository,
UserRepositoryInterface? userRepository, UserRepositoryInterface? userRepository,
}) : chatRepository = chatRepository ?? LocalChatRepository(), }) : chatRepository = chatRepository ?? LocalChatRepository(),
pendingMessageRepository =
pendingMessageRepository ?? LocalPendingMessageRepository(),
userRepository = userRepository ?? LocalUserRepository(); userRepository = userRepository ?? LocalUserRepository();
/// The user ID of the person currently looking at the chat /// The user ID of the person currently looking at the chat
@ -28,6 +34,9 @@ class ChatService {
/// The chat repository /// The chat repository
final ChatRepositoryInterface chatRepository; final ChatRepositoryInterface chatRepository;
/// The pending messages repository
final PendingMessageRepositoryInterface pendingMessageRepository;
/// The user repository /// The user repository
final UserRepositoryInterface userRepository; final UserRepositoryInterface userRepository;
@ -135,11 +144,32 @@ class ChatService {
/// Returns a list of [MessageModel] stream. /// Returns a list of [MessageModel] stream.
Stream<List<MessageModel>?> getMessages({ Stream<List<MessageModel>?> getMessages({
required String chatId, required String chatId,
}) => }) {
chatRepository.getMessages( List<MessageModel> mergePendingMessages(
userId: userId, List<MessageModel> messages,
chatId: chatId, List<MessageModel> pendingMessages,
); ) =>
{
...Map.fromEntries(
pendingMessages.map((message) => MapEntry(message.id, message)),
),
...Map.fromEntries(
messages.map((message) => MapEntry(message.id, message)),
),
}.values.toList();
return Rx.combineLatest2(
chatRepository.getMessages(userId: userId, chatId: chatId),
pendingMessageRepository.getMessages(userId: userId, chatId: chatId),
(chatMessages, pendingChatMessages) {
// TODO(Quirille): This is because chatRepository.getMessages
// might return null, when really it should've just thrown
// an exception instead.
if (chatMessages == null) return null;
return mergePendingMessages(chatMessages, pendingChatMessages);
},
);
}
/// Signals that new messages should be loaded after the given message. /// Signals that new messages should be loaded after the given message.
/// The stream should emit the new messages. /// The stream should emit the new messages.
@ -173,15 +203,39 @@ class ChatService {
String? text, String? text,
String? messageType, String? messageType,
String? imageUrl, String? imageUrl,
}) => }) async {
chatRepository.sendMessage( await pendingMessageRepository.createMessage(
chatId: chatId, chatId: chatId,
messageId: messageId, senderId: senderId,
text: text, messageId: messageId,
messageType: messageType, text: text,
senderId: senderId, messageType: messageType,
imageUrl: imageUrl, imageUrl: imageUrl,
); );
unawaited(
chatRepository
.sendMessage(
chatId: chatId,
messageId: messageId,
text: text,
messageType: messageType,
senderId: senderId,
imageUrl: imageUrl,
)
.then(
(_) => pendingMessageRepository.markMessageSent(
chatId: chatId,
messageId: messageId,
),
)
.onError(
(e, s) {
// TODO(Quirille): handle exception when message sending has failed.
},
),
);
}
/// Delete the chat with the given parameters. /// Delete the chat with the given parameters.
/// [chatId] is the chat id. /// [chatId] is the chat id.