Chat Program with gRPC

먼저 IDE는 PyCharm을 택했다. Visual Code가 좀 더 기능도 좋고 그럴진 모르겠으나, 그냥 익숙한 것 쓰기로..

프로젝트 생성시 Interpreter를 이전에 만든 gRPC conda environment를 그대로 쓰기로 한다. PyCharm 설치는

이 곳을 참조하면 된다. 구글에 그냥 검색하면 Instruction은 수두룩하게 있으니 금방 설치할 수 있다.

튜토리얼을 옆에 띄워두고 이를 보면서 하나씩 익히면서 짜볼 것이다. 포스팅하진 않겠지만 궁금하다면 이 곳을 공부하면 도움이 될 듯 하다.

Making Project

간단한 프로그램이지만 세 개의 프로젝트를 생성할 것이다.

  • chat-proto
  • chat-server
  • chat-client

예제이므로 하나에 그냥 다 묶어버리면 그만이지만 결국엔 나중에 저런식으로 다 나누게 된다. 간단히 생각해봐도, 채팅 프로그램을 배포하려하는데 서버까지 배포하면 이상하지 않은가 ? 그리고 proto가 바뀌면 계속 컴파일 할텐데 아마 이렇게 따로 관리하면 좀 더 편하지 않을까 싶다.

making chat-proto, chat-server, chat-client
  1. File -> New Project 이후 아래와 같이 진행한다. 경로는 각자 다 다를 수 있으니 각자의 경로를 잘 찾아 입력하면 되고 위에서 말했듯 저번에 만들어둔 gRPC 환경을 그대로 쓸 것이다.

    create-project

    add-interpreter

  2. 위 와 같은 방법으로 chat-server와 chat-client를 생성하는데, Attach로 프로젝트를 열면 한 Explorer안에 리스트로 쭉 붙으니 그게 편한 사람은 Attach로 새 창에다 프로젝트 띄우고 싶으면 New window로 하면된다. This Window는 먼저 열어둔 프로젝트가 닫힌다.

chat-proto

채팅 메세지를 어떻게 정의할 수 있을까 ? 일단 복잡하게 생각하지 말고 그냥 단편적인 메세지로 생각하고 진행해보자.

protoc

이제 chat-proto 패키지에 chat_message_pb2.py chat_message_pb2_grpc.py가 생겼을 것이고, 이 것을 쓰려면 conda환경에 chat-proto를 설치해줘야 한다.

chat-proto-install

그럼 이제 chat-server나 chat-client 프로젝트가 같은 conda 환경을 쓰고 있으니, 저 chat-proto를 갖다 쓸 수 있다.

일단 내가 정의한 프로토는 이렇다.

syntax = "proto3";

package chat_proto;

//Empty
message Empty {

}

//Text
message Text {
    string owner = 1;
    string msg = 2;
}

//Message
message ChatMessage {
    Text text = 1;
}

//Request
message ChatRequest {
    ChatMessage cmsg = 1;
}

//Response
message ChatResponse {
    bool success = 1;
}

//Service
service Chat {
    rpc recv (Empty) returns (stream ChatMessage) {}
    rpc send (ChatRequest) returns (ChatResponse) {}
}

recv의 형태가 좀 눈에 거슬리겠으나 chat server에서 조금 더 설명하겠다. 어찌됐건 나름 구조를 갖춰보려고 Request, Response를 만들고 Request안에 Message를 넣고, Message는 Text를 가지도록 했다. 나중에 뭐 파일이라던가 다른 형태의 것도 넣어볼 생각이다. 어찌됐든 위에 첨부된 코드를 컴파일 후 develop 모드로 chat-proto 패키지를 셋업해준다.

참고로 필자의 패키지 디렉터리 구조다. 채팅 메세지를 주고 받을 거니까 chat_message.proto라고 해놓긴 했는데, 조금 어색하긴 하다.

chat-proto

​ ㄴchat_proto

__init__.py

chat_message_pb2.py (compile command 로 부터 생성됨.)

chat_message_pb2_grpc.py (compile command 로 부터 생성됨.)

​ ㄴprotos

chat_message.proto

setup.py

  • compile cmd ( /path/to/chat-proto)

    (grpc) $ python -m grpc_tools.protoc -I protos/ --python_out=chat_proto/ --grpc_python_out=chat_proto/ protos/chat_message.proto 
    
  • setup cmd

    (grpc) $ python setup.py develop
    

    아, setup.py는 아래처럼만 해도 된다.

    from setuptools import find_packages, setup
      
    setup(name='chat_proto',
          version='0.0.1',
          description='Basic proto for chat service',
          url='klize.github.io/beautiful-jekyll',
          author='dk',
          packages=find_packages(),
          classifiers=[
              "Programming Language :: Python :: 3.7"
          ],
          zip_safe=False)
    

chat-server

gRPC 에는 Broadcasting 개념이 없다. 말 그대로 Remote Procedure “Call” 이기 때문에 저런 기능이 없다해도 이해가 된다. 그래서 gRPC에서 이를 비슷하게 구현하기 위해 클라이언트 콜에 의해 어떤 스트림을 쏴주는 형식으로 구현했다. 물론 여러 디자인 패턴이 있을 것이고 특히 Observer Pattern이나 Pub/Sub Pattern등을 어떻게든 구현해볼 수는 있겠으나, 그건 나중에 진짜 필요하면 해보기로 한다.

  • Service

    먼저 service 를 정의하자. chat_message.proto 에 정의한 서비스를 구현하면 된다.

    import logging
      
    from typing import Tuple, List
    from concurrent import futures
      
    import grpc
    import chat_proto.chat_message_pb2
    import chat_proto.chat_message_pb2_grpc
      
      
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(message)s')
      
      
    class ChatService(chat_proto.chat_message_pb2_grpc.ChatServicer):
        """Chat Service
      
            :param config: tuple of (ip, port)
        """
        def __init__(self, config: Tuple[str, int]):
            self._config = config
      
            self._cmsgs: List[chat_proto.chat_message_pb2.ChatMessage] = []
            self._last_idx_cmsgs = 0
      
        @property
        def config(self):
            return self._config
      
        def send(self, request: chat_proto.chat_message_pb2.ChatRequest, context)\
                -> chat_proto.chat_message_pb2.ChatResponse:
            """Send a message
      
            Clients are supposed to call this method to say their words.
            They make a request including chat_proto.chat_message_pb2.ChatMessage and put their
            message into the request.
      
            :param request:
            :param context:
            :return:
            """
            _cmsg = request.cmsg
      
            _owner = _cmsg.text.owner
            _text = _cmsg.text.msg
      
            # server debugging
            logging.info(f"Message from {_owner} : {_text}")
      
            _success = True
            if _owner == "":
                # If the sender has no its own id, server respond with False, which means
                # the client sends a message but server rejects it.
                _success = False
      
            if _success:
                self._cmsgs.append(_cmsg)
      
            resp = chat_proto.chat_message_pb2.ChatResponse()
            resp.success = _success
      
            return resp
      
        def recv(self, request: chat_proto.chat_message_pb2.Empty, context):
            """recieve messages from other clients
      
            Since gRPC has no concepts of `broadcast`, I tried seems like a broadcasting method
            as possible as I can. In the gRPC manner, client calls this method and server yields
            the latest messages. Client will iterate over this latest messages and print them out
            onto somewhere called a window.
      
            :param request:
            :param context:
            :return:
            """
            while True:
                while len(self._cmsgs) > self._last_idx_cmsgs:
                    cmsg = self._cmsgs[self._last_idx_cmsgs]
      
                    self._last_idx_cmsgs += 1
                    yield cmsg
      
      
    def serve(config: tuple):
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        servicer = ChatService(config=config)
      
        chat_proto.chat_message_pb2_grpc.add_ChatServicer_to_server(servicer=servicer, server=server)
      
        server.add_insecure_port('[::]:'+str(config[1]))
        server.start()
        server.wait_for_termination()
      
    

    이게 서버코드만 보면 send와 recv를 그래서 어떻게 써먹는 건지 이해가 안간다. 그래도 밑에 클라이언트 코드와 같이 보면서 흐름을 한 번 정도 따라가보면 금방 이해될 것이니 우선 서버 실행시킬 코드도 만들자.

  • server

    import argparse
      
    import sys
      
    import chat_server
      
      
    def define_args(arg_vec):
        parser = argparse.ArgumentParser()
      
        parser.add_argument('--host', type=str, default="0.0.0.0")
        parser.add_argument('--port', type=int, default="33333")
      
        _args = parser.parse_args(arg_vec)
        return _args
      
      
    if __name__ == "__main__":
        args = define_args(arg_vec=sys.argv[1:])
      
        chat_server.service.serve((args.host, args.port))
      
    

역시 디렉터리 구조를 한번 더 그려보겠다. client 구조도 완전히 똑같으니 그대로 하면될 것 이다. 이걸 패키지로 설치하고 원래는 다른 디렉터리에서 받아쓰는 형태를 생각했으나 그냥 안에다가 example 디렉터리 하나 만들어서 서버를 실행시킨다.

chat-server

​ ㄴchat_server

__init__.py

service.py

​ ㄴexample

server.py

setup.py

  • setup.py

    보면 알겠지만 name이랑 description빼면 달라진게 없다. import 할때 name에 쓴 chat_server로 import한다.

    from setuptools import find_packages, setup
      
    setup(name='chat_server',
          version='0.0.1',
          description='Basic Chat Server',
          url='klize.github.io/beautiful-jekyll',
          author='dk',
          packages=find_packages(),
          classifiers=[
              "Programming Language :: Python :: 3.7"
          ],
          zip_safe=False)
    
  • __init__.py

    from . import service
    __all__ = [service]
      
    

    이건 왜 해놨냐면, import chat_server를 하면 우리가 쓰려고 만든 service를 바로 사용 못하고

    import chat_server.service 같이 써야된다. 이러나 저러나 as를 통해 해결할 수 있지만 그냥 예시로써 작성했다.

chat-client

클라이언트는 이제 send와 recv를 호출하여 채팅 서비스를 이용하게 된다. 사실 이렇게 구현하는게 맞나 싶긴 하다.

다른 쓰레드를 하나 만들어서 recv로 계속 최신 메세지를 불러오고, 메인 쓰레드에서는 입력을 받아 서버의 send를 실행할 것이다. 이 코드에 있는 Exception은 그냥 ChatResponse에 대한 테스트용도로 만든 거고 사실 테스트 코드를 따로 짜기 귀찮아서 그냥 집어 넣었다.

  • chat_client/client.py

    import logging
    from queue import Queue
    from typing import Tuple
      
    import sys
    import threading
      
    import grpc
      
    import chat_proto.chat_message_pb2
    import chat_proto.chat_message_pb2_grpc
      
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(message)s')
      
      
    class ChatClient:
      
        def __init__(self, user_id: str, config: Tuple[str, int]):
            self._id = user_id
            self._config = config
      
            channel = grpc.insecure_channel(":".join(list(map(str, config))))
            self._connection = chat_proto.chat_message_pb2_grpc.ChatStub(channel)
      
            self._get_cmsg_stream_task = self._create_task("_get_cmsg_stream")
      
            self._cmsg_q = Queue()
      
            logging.info(f"Client<{self._id}> generated")
      
      
        @property
        def id(self):
            return self._id
      
        @id.setter
        def id(self, _id):
            self._id = _id
      
        @property
        def config(self):
            return self._config
      
        @property
        def connection(self):
            return self._connection
      
        def _make_req(self, msg):
            """
      
            Make a chat_proto.chat_message_pb2.ChatRequest to use as input to server's send method
      
            :param msg:
            :return:
            """
            chat_req = chat_proto.chat_message_pb2.ChatRequest()
            chat_req.cmsg.text.owner = self.id  # "" not works, Must feed None
            chat_req.cmsg.text.msg = msg
            return chat_req
      
        def send_msg(self, msg):
            """
      
            Make a request, then call server's send method with request.
      
            :param msg:
            :return:
            """
            req = self._make_req(msg)
            resp = self.connection.send(req)
            return resp
      
        def _get_cmsg_stream(self):
            """
      
            As you can see from chat_server.service, `recv` method is a blocking by outer while loop.
            def recv(...):
                while True:
                    while ...:
                        c = ...
      
                        yield  c
      
            But if you send any message to the server, then server's message list will be updated
            ,thus the length of current list gets different from cached latest length, _last_idx_cmsgs.
      
            As a result of updates, server yields the updated messages and client who called this method
            gets cmsg in the for-loop.
      
            Since the method is basically blocking, this will be started on other thread.
      
            :return:
            """
            for cmsg in self.connection.recv(chat_proto.chat_message_pb2.Empty()):
                logging.debug(f"{cmsg.text.owner} > {cmsg.text.msg}") # or print
                self._cmsg_q.put(cmsg)
      
        def _create_task(self, task: str):
            th = threading.Thread(target=getattr(self, task), daemon=True)
            th.start()
            return th
      
        @staticmethod
        def get_id_from_terminal():
            _id = input("id ? ")
            return _id
      
        def start(self):
            while True:
                try:
                    msg = input("message >> ")
                    if msg == "q":
                        sys.exit(0)
                    else:
                        resp = self.send_msg(msg)
                        if not resp:
                            raise Exception # @TODO exception based on resp
                except Exception as e:
                    logging.warning(e)
                    self.id = self.get_id_from_terminal()
      
    
  • example/client.py

    import argparse
      
    import sys
      
    from chat_client.client import ChatClient
      
      
    def define_args(arg_vec):
        parser = argparse.ArgumentParser()
      
        parser.add_argument('--id', type=str)
        parser.add_argument('--host', type=str, default="localhost")
        parser.add_argument('--port', type=int, default="33333")
      
        _args = parser.parse_args(arg_vec)
        return _args
      
      
    if __name__ == "__main__":
        args = define_args(arg_vec=sys.argv[1:])
      
        c = ChatClient(user_id=args.id, config=(args.host, args.port))
      
        c.start()
      
    

이 정도로 하고 서버를 돌리고,

#Terminal 1
$ python example/server.py

클라이언트를 다른 터미널에서 각각 실행하면 간단한 채팅이 되기는 한다. 그러나 터미널인지라 메세지 입력중에 다른사람 메세지가 오기도하고 아주 보기가 까다롭다. 조금 더 그럴싸 한 앱을 만들고 싶다면 GUI를 적용해보길 바란다.

#Terminal 2
$ python example/client.py --id a

#Terminal 3
$ python example/client.py --id b

이제 이 gRPC를 공부를 해야하는데, 사실 나는 글만 봐서는 잘 익혀지지 않는 스타일이다. 그래서 이 참에 생각난 예제가 하나 더 있어서 다음 포스팅 부터 이를 다루면서 필요할 때 document를 보는 방법으로 또 진행 하겠다. 이전 포스팅들 에서도 얘기했지만, gRPC 홈페이지를 가면 documentation으로 부터 충분히 배울 수 있으니 예제를 가지고 다루는데에 거부감이 있다면 그쪽에서 먼저 공부하면 된다.