Implement Chat Stream Service

Open chatservice/src/main/scala/chatroom/grpc/ChatStreamServiceImpl.scala. ChatStreamServiceImpl.chat(…) is the bidirectional stream stub we need to implement.

On the server side, we’ll need to listen to incoming streamed messages. To do this, the server needs to return a StreamObserver, to listen to incoming messages:

override def chat(responseObserver: StreamObserver[ChatMessageFromServer]): StreamObserver[ChatMessage] = {
  val username = Constant.USER_ID_CTX_KEY.get()
  logger.info(s"processing chat from $username")

  new StreamObserver[ChatMessage] {
    //handle a new chat message and either join / leave a room or broadcast message
    override def onNext(chatMessage: ChatMessage): Unit = {
    }

    override def onError(t: Throwable): Unit = {
    }

    override def onCompleted(): Unit = {
    }
  }
}

The responseObserver in the method parameter is what the server needs to use to stream data to the client.

The ChatMessage being streamed to the server may have different types, such as JOIN a room, LEAVE a room, or simply a TEXT message for a room:

  1. When joining a chat room, add the responseObserver to the room’s set of all currently connected observers.
  2. When leaving a chat room, remove the responseObserver from the room’s observers set.
  3. When sending a message, first make sure user is in the room, and then send to all the connected observers in the room:
new StreamObserver[ChatMessage] {
  //handle a new chat message and either join / leave a room or broadcast message
  override def onNext(chatMessage: ChatMessage): Unit = {
    val optObservers = getRoomObservers(chatMessage.roomName)
    logger.info(s"optObservers $optObservers")

    optObservers.foreach { observers =>
      chatMessage.`type` match {
        //add a new response observer to the current set of observers
        case MessageType.JOIN => {
          logger.info("adding observer to room")
          observers.add(responseObserver)
        }
        //remove an observer from the current set of observers
        case MessageType.LEAVE =>
          observers - (responseObserver)
        //add a new response observer to the current set of observers
        case MessageType.TEXT =>
          if (observers.contains(responseObserver)) {
            val now = Timestamp((new Date()).getTime)
            val messageFromServer = ChatMessageFromServer(Some(now),chatMessage.`type`,chatMessage.roomName, username, chatMessage.message)
            observers.foreach(nxtObserver => nxtObserver.onNext(messageFromServer))
          }
          else {
            logger.info("returning error because user is not in room")
            responseObserver.onError(Status.PERMISSION_DENIED.withDescription(s"You are not in the room ${chatMessage.roomName}").asRuntimeException)
          }
        case _ => logger.error("Unknown chat message type")
      }
    }
  }
}
  1. If there is an error, or when the client closes connection, remove the responseObserver from all rooms
override def onError(t: Throwable): Unit = {
  logger.error("gRPC error", t)
  removeObserverFromAllRooms(responseObserver)
}

override def onCompleted(): Unit = {
  removeObserverFromAllRooms(responseObserver)
}
  1. Run the authserver and chatserver in separate terminals:
$ sbt authservice/run

//In a separate terminal run
$ sbt chatservice/run