Developing a chat application #3 (Memory store)

Now that the basic server is set up, we need to decide where to store user data. I want to design a flexible and extensible storage layer that allows developers to choose different backends — whether in-memory, Postgres, or even block storage like S3 or Minio.

Designing an Extensible Storage Layer

To support pluggable storage backends, we'll define an interface that developers can implement based on their needs.

I'll create an interface called DataStoreHandler. This interface will define the methods that must be implemented by any storage backend:

type DataStoreHandler interface {
AddUser(in *pb.User) error
SetConversation(userId string, conversation *pb.Conversation) error
GetConversation(userId string) (*pb.Conversation, error)
GetMessages(convId string) ([]*pb.Message, error)
GetLastMessage(convId string) (*pb.Message, error)
SendMessage(message *pb.Message, convId string) error
SetPublicKey(userId string, publicKey string) error
GetPublicKey(userId string) (string, error)
GetAvailableUsers() []*pb.User
}

Implementing the In-Memory Datastore

Let’s implement this interface using an in-memory storage backend:


type MemoryStoreHandler struct {
mu sync.Mutex
messages map[string][]*pb.Message
users []*pb.User
publicKeys map[string]string
conversations map[string]*pb.Conversation
}

var _ DataStoreHandler = (*MemoryStoreHandler)(nil)

This in-memory datastore holds user, message, public key, and conversation data. The line:

var _ DataStoreHandler = (*MemoryStoreHandler)(nil)

is a compile-time assertion that ensures MemoryStoreHandler correctly implements the DataStoreHandler interface.

func NewMemoryStore() *MemoryStoreHandler {
return &MemoryStoreHandler{
messages: make(map[string][]*pb.Message),
users: make([]*pb.User, 0),
publicKeys: make(map[string]string),
conversations: make(map[string]*pb.Conversation),
}
}

We initialize the memory store without pre-allocating memory. Later, we might consider adding memory limits

func (m *MemoryStoreHandler) GetAvailableUsers() []*pb.User {
availableUsers := make([]*pb.User, 0)
for _, user := range m.users {
if user.Available {
availableUsers = append(availableUsers, user)
}
}
return availableUsers
}

GetAvailableUsers() returns a list of users who are marked as available.

func (m *MemoryStoreHandler) SetPublicKey(userid string, publicKey string)
error {
m.mu.Lock()
defer m.mu.Unlock()
m.publicKeys[userid] = publicKey
// TODO: check the memory limit
return nil
}

Since we will implement end-to-end encryption, each user will require a public key. These are stored in a map[string]string, where the key is the user's UUID.

func (m *MemoryStoreHandler) GetPublicKey(userid string) (string, error) {
m.mu.Lock()
defer m.mu.Unlock()
if k, ok := m.publicKeys[userid]; ok {
return k, nil
}
return "", fmt.Errorf("user: %s does not have public key exist", userid)
}

then getting of public key is simply checking if the key exists in the map, if it does, then retrieve it.

The rest of the Setter/Getter methods will follow the same pattern.

GetConversation/SetConversation

func (m *MemoryStoreHandler) SetConversation(convId string, conversation
*pb.Conversation) error {
m.mu.Lock()
defer m.mu.Unlock()
m.conversations[convId] = conversation
// TODO: check the memory limit
return nil
}

func (m *MemoryStoreHandler) GetConversation(convId string) (*pb.Conversation,
error) {
if m, ok := m.conversations[convId]; ok {
return m, nil
}
return nil, fmt.Errorf("convId: %s does not belong to any conversation",
convId)
}

GetMessage/SendMessage


func (m *MemoryStoreHandler) GetMessages(convId string) ([]*pb.Message, error) {
if m, ok := m.messages[convId]; ok {
return m, nil
}
return nil, fmt.Errorf("conv:%s does not have message", convId)
}

func (m *MemoryStoreHandler) SendMessage(message *pb.Message, convId string) error {
m.mu.Lock()
defer m.mu.Unlock()
m.messages[convId] = append(m.messages[convId], message)
// TODO: check the memory limit
return nil
}


Not Yet Implemented: GetLastMessage

We haven’t implemented GetLastMessage yet. This method is intended to help optimize streaming and will be added later.

Comments