Developing a chat application #2 (Server)

 

Generating the gRPC code


To generate the gRPC code, you first need to install gRPC and the gRPC Go compiler plugins. This guide will walk you through the installation.
After that, you need to generate the gRPC code from the proto file.

The command below generates the Go code under the ./gen directory. The complete command can be found here
protoc -I=./proto --go_out=./gen --go_opt=paths=source_relative ./proto/chat.proto --go-grpc_out=./gen --go-grpc_opt=paths=source_relative


Implementing the Stub

Once the code is generated, you need to implement the gRPC stub functions.
The first stub we will implement is:

AddUser(ctx context.Context, in *pb.AddUserRequest) (*pb.AddUserResponse, error)

This method is called when a client starts a new instance and registers its user with the application.



Figure.1



Figure.1 shows that when a user registers with the server, the server replies with the user information.

The implementation of this method is simple: you only need to add the newly created user to a map[string]*User, keyed by the UUID that the client generates.
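A minimal sketch of that idea, under stated assumptions: User here is a hypothetical stand-in for the generated pb.User type, UserStore and Add are illustrative names, and rejecting empty or duplicate IDs is my own choice rather than something the original server is known to do. The mutex is there because gRPC handlers can run concurrently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// User is a hypothetical stand-in for the generated pb.User message.
type User struct {
	ID   string
	Name string
}

// UserStore keeps users in a map keyed by the client-generated UUID.
type UserStore struct {
	mu    sync.RWMutex // handlers may run on different goroutines
	users map[string]*User
}

// NewUserStore returns an empty store with an initialized map.
func NewUserStore() *UserStore {
	return &UserStore{users: make(map[string]*User)}
}

// Add stores the user and returns it, mirroring how AddUser replies
// with the user information. Rejecting duplicates is an assumption.
func (s *UserStore) Add(u *User) (*User, error) {
	if u == nil || u.ID == "" {
		return nil, errors.New("user id must not be empty")
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.users[u.ID]; exists {
		return nil, fmt.Errorf("user %s already exists", u.ID)
	}
	s.users[u.ID] = u
	return u, nil
}

func main() {
	store := NewUserStore()
	u, err := store.Add(&User{ID: "hypothetical-uuid-1", Name: "alice"})
	fmt.Println(u.Name, err)
}
```

In the real handler, the body of AddUser would do the same map insert and wrap the stored user in the response message.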

Once the users are created, you need to create a conversation between two different users.

func (s *ChatServer) CreateConversation(ctx context.Context,
req *pb.CreateConversationRequest) (*pb.CreateConversationResponse, error)



CreateConversation() does exactly that. We model conversations as their own entity because this abstraction makes it possible to add a "Group Chat" feature later. Creating a conversation works the same way as creating a user, so I won't go into detail here. The other methods are also straightforward to implement; you can find them here
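To illustrate why a separate conversation entity helps with group chat, here is a rough sketch. Conversation, its Participants field, and NewConversation are hypothetical names, not the generated pb types, and the validation rules are assumptions. The point is only that a direct chat and a group chat differ in the number of participants, not in structure.

```go
package main

import (
	"errors"
	"fmt"
)

// Conversation is a hypothetical stand-in for pb.Conversation.
type Conversation struct {
	ID           string
	Participants []string // user IDs
}

// NewConversation checks the participants against the set of known
// users and builds a conversation. Two participants form a direct
// chat; more than two form a group chat.
func NewConversation(id string, known map[string]bool, participants ...string) (*Conversation, error) {
	if len(participants) < 2 {
		return nil, errors.New("a conversation needs at least two participants")
	}
	seen := make(map[string]bool, len(participants))
	for _, p := range participants {
		if !known[p] {
			return nil, fmt.Errorf("unknown user %q", p)
		}
		if seen[p] {
			return nil, fmt.Errorf("duplicate participant %q", p)
		}
		seen[p] = true
	}
	// Copy the slice so later changes by the caller do not leak in.
	return &Conversation{ID: id, Participants: append([]string(nil), participants...)}, nil
}

func main() {
	known := map[string]bool{"alice": true, "bob": true, "carol": true}
	direct, _ := NewConversation("c1", known, "alice", "bob")
	group, _ := NewConversation("c2", known, "alice", "bob", "carol")
	fmt.Println(len(direct.Participants), len(group.Participants))
}
```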

The most interesting stub to implement is GetMessages(), because it is a streaming stub that has to push new messages to the users.

Figure.2


Figure.2 shows the workflow of SendMessage() and GetMessages(). The basic idea is that when a client sends a message with SendMessage(), we publish an update to a topic for that particular conversation using Watermill's Publish method. In GetMessages(), we subscribe to the topic for that conversation. Whenever the conversation is updated, we notify the user through the stream.

Publishing the conversation update in SendMessage():

	// Serialize the updated conversation so it can travel as a Watermill payload.
	payload, err := proto.Marshal(updatedConv)
	if err != nil {
		logging.Logger.Sugar().Errorf("Error marshalling conversation for Watermill: %v", err)
	} else {
		watermillMsg := message.NewMessage(uuid.NewString(), payload)
		// One topic per conversation, so subscribers only see their own updates.
		topic := "updated_convo_" + updatedConv.Id
		if err := s.publisher.Publish(topic, watermillMsg); err != nil {
			// Errorf, not Info: the message contains format verbs and reports a failure.
			logging.Logger.Sugar().Errorf("Error publishing conversation update to Watermill topic %s: %v", topic, err)
		} else {
			logging.Logger.Sugar().Infof("Published update for conversation %s to topic %s", updatedConv.Id, topic)
		}
	}

Then the subscription to the conversation in GetMessages():

	topic := "updated_convo_" + convoID
	messages, err := s.publisher.Subscribe(ctx, topic)
	if err != nil {
		// Return the error to this caller; log.Fatal would stop the whole server.
		return err
	}

	// msgChannel hands updates from the subscriber goroutine to the handler loop.
	msgChannel := make(chan []*pb.Message)

	go func() {
		logging.Logger.Sugar().Infof("Subscriber started, listening on topic %q", topic)
		for msg := range messages {
			logging.Logger.Sugar().Infof("Subscriber received message: %s, payload: %s", msg.UUID, string(msg.Payload))
			receivedConv := &pb.Conversation{}
			if err := proto.Unmarshal(msg.Payload, receivedConv); err != nil {
				logging.Logger.Sugar().Error(err)
			} else {
				// Named convMessages to avoid shadowing the Watermill message package.
				convMessages := s.messages[receivedConv.Id]
				select {
				case msgChannel <- convMessages:
					logging.Logger.Sugar().Info(convMessages)
				case <-ctx.Done():
					// The handler has returned; do not block on a send nobody will receive.
				}
			}
			msg.Ack()
		}
		logging.Logger.Info("Subscriber goroutine finished")
	}()


As you can see, the subscription is consumed in a goroutine. This means we need another channel to hand each update over to the code that owns the stream. msgChannel serves this purpose: the main loop of the handler waits on it.

	for {
		select {
		case messages := <-msgChannel:
			start := req.Offset
			end := int64(len(messages))
			// Clamp the offset so an out-of-range value cannot panic the slice expression.
			if start < 0 {
				start = 0
			}
			if start > end {
				start = end
			}
			// TODO: After integrating DB and encryption, add paging
			// end := start + req.Limit
			// if int(end) > len(messages) {
			// 	end = int64(len(messages))
			// }
			resp := pb.GetMessagesResponse{Messages: messages[start:end]}
			if err := stream.Send(&resp); err != nil {
				return err
			}
		case <-ctx.Done():
			logging.Logger.Sugar().Infof("gRPC stream context cancelled for convo %s: %v", convoID, ctx.Err())
			return ctx.Err()
		}

		time.Sleep(1000 * time.Millisecond)
	}

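When a value arrives on msgChannel, we know that the conversation was updated. The select already blocks until one of its cases is ready, so the loop does not spin while it waits. The sleep is therefore not necessary; it only throttles how often updates are sent to the client.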
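The paging left as a TODO in the loop could be factored into a small helper. The sketch below is one possible shape, not the author's implementation: page is a hypothetical name, it uses generics (Go 1.18 or later), and treating a limit of zero or less as "no limit" is an assumption. It clamps both ends so the slice expression cannot panic.

```go
package main

import "fmt"

// page returns items[offset:offset+limit], clamped to the slice bounds.
// A limit <= 0 means "everything from offset onward".
func page[T any](items []T, offset, limit int64) []T {
	n := int64(len(items))
	if offset < 0 {
		offset = 0
	}
	if offset > n {
		offset = n
	}
	end := n
	// Compare against the remaining length to avoid overflow in offset+limit.
	if limit > 0 && limit < n-offset {
		end = offset + limit
	}
	return items[offset:end]
}

func main() {
	msgs := []string{"m0", "m1", "m2", "m3"}
	fmt.Println(page(msgs, 1, 2))  // [m1 m2]
	fmt.Println(page(msgs, 3, 10)) // [m3]
	fmt.Println(page(msgs, 9, 2))  // []
}
```

With such a helper, the handler would build its response from page(messages, req.Offset, req.Limit), assuming the request carries a Limit field as the commented-out code suggests.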
