Post

간단한 이벤트 드리븐 아키텍처 구현하기

개요

오늘날, 마이크로서비스 아키텍처 (MSA)는 분산 시스템 디자인에서 거의 표준이 되었습니다. MSA의 중요한 요소 중 하나는 서비스 간의 통신입니다. 이런 맥락에서 이벤트 드리븐 아키텍처 (EDA, Event-Driven Architecture)가 주목받고 있습니다.

EDA는 서비스들이 상호작용하는 방식을 바꾸는 아키텍처 패턴입니다. 각 서비스는 자신이 수행한 작업에 대한 “이벤트”를 발행하고, 이 이벤트는 다른 서비스들이 구독하고 반응하는 방식으로 동작합니다. 이렇게 구성하면 서비스들이 서로 독립적이면서도 유기적으로 협력할 수 있게 됩니다.

이번 포스트에서는 이벤트 드리븐 아키텍처를 구현하는 방법에 대해 상세히 알아보겠습니다. 특히, 우리는 Kafka, 특히 Confluent Kafka를 사용하여 간단한 EDA를 구현하는 방법에 초점을 맞추겠습니다. 그리고 예제로는 간단한 e-commerce 서비스를 EDA로 변환하는 과정을 단계별로 살펴보겠습니다.

다음 다이어그램은 주문 요청이 이루어질 때 시스템이 어떻게 반응하는지 보여줍니다.

20230520132952.png

유저 주문 Flow

이제 Kafka를 준비하고, 주제를 생성하고, 서비스를 구현하는 단계로 들어가 보겠습니다.

Prerequisite

이 튜토리얼을 진행하기 전에 아래의 요구사항들을 충족시켜야 합니다.

  1. Python: 이번 튜토리얼은 Python 기반입니다. Python 3.6 이상의 버전이 설치되어 있어야 합니다. 설치 방법은 Python 공식 웹사이트를 참조하시기 바랍니다. 이번 포스트에서는 **Python 3.11 버전을 사용합니다.
  2. Confluent Kafka: 이번 튜토리얼에서는 Confluent Kafka를 사용합니다. 이는 Apache Kafka의 확장 버전으로, 실시간 데이터 파이프라인 및 스트리밍 앱을 더욱 효율적으로 구축할 수 있게 돕는 도구입니다. Confluent Kafka를 클라우드에 설정하는 방법에 대해서구체적인 설명은 다루지 않습니다. Confluent Cloud에 접속하여, Free Tier 클라우드를 생성 후 kafka를 준비해야 합니다.
  3. confluent-kafka-python: Python에서 Confluent Kafka를 사용하기 위해 confluent-kafka-python 라이브러리가 필요합니다. 이 라이브러리는 Python에서 Kafka를 쉽게 사용할 수 있게 도와줍니다. 설치는 Python의 패키지 관리자인 pip를 통해 할 수 있습니다.
  4. FastAPI: FastAPI는 이번 튜토리얼에서 사용할 웹 프레임워크입니다. 그러나 이 튜토리얼에서는 FastAPI에 대한 구체적인 설명은 다루지 않습니다. FastAPI에 대한 기본적인 이해가 있다고 가정하고 진행합니다.

위의 요구사항들을 충족시킨 후, 다음 섹션에서는 Kafka 주제의 생성과 서비스의 구현을 시작하겠습니다.

Confluent kafka 셋팅

20230520132453.png

confluent cloud dashboard

confluent kafka 는 confluent cloud 에서 제공하는 서비스로 이번 포스트에서는 무료버전으로 클라우드 생성을 진행하여 사용합니다.

Topic 생성

이 프로젝트에서는 “order-request”와 “delivery-request”라는 두 개의 Kafka 토픽이 필요합니다. Confluent Cloud UI를 이용해 새로운 Kafka 토픽을 생성하는 방법은 매우 직관적입니다. 다음 단계를 따르면, 필요한 토픽을 손쉽게 생성할 수 있습니다.

  1. Confluent Cloud 대시보드에 로그인합니다.
  2. 왼쪽 메뉴에서 “Topics”를 클릭합니다.
  3. “Add a Topic” 버튼을 클릭합니다.
  4. 새 토픽의 이름을 입력합니다. 첫 번째 토픽의 이름은 “order-request”, 두 번째 토픽의 이름은 “delivery-request”로 지정합니다.
  5. 선택적으로 다른 설정을 조정할 수 있습니다. 이들 설정에는 키와 값의 데이터 형식, 파티션 수, 복제 수 등이 포함될 수 있습니다.
  6. “Create” 버튼을 클릭합니다.

이제 “order-request”와 “delivery-request”라는 두 개의 Confluent Kafka 토픽이 성공적으로 생성되었습니다. 이 토픽들은 이후의 작업에 사용될 것입니다.

20230520133147.png

토픽 생성

Implementation

FastAPI와 confluent-kafka를 활용하여 Kafka를 구현하는 과정을 알아보겠습니다. 이번 섹션에서는 파이썬 3.11 버전과 FastAPI, confluent-kafka 라이브러리, PostgreSQL 데이터베이스를 사용합니다. FastAPI에 대한 기본적인 문법은 이미 알고 있다고 가정하고 설명을 진행하겠습니다.

Order Service

먼저 confluent-kafka를 사용하기 위해 필요한 패키지를 설치합니다. pip를 이용하여 쉽게 설치할 수 있습니다.

1
pip install confluent-kafka

그 다음, confluent-kafka에서 제공하는 API 키를 이용해 client.properties를 설정합니다. 여기서는 클라우드 주소, 보안 프로토콜, SASL 메커니즘, API 키와 비밀번호 등을 설정합니다.

1
2
3
4
5
6
7
8
9
# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers=클라우드 주소
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=API 키
sasl.password=API Secret 키

# Best practice for higher availability in librdkafka clients prior to 1.7
session.timeout.ms=45000

그런 다음, 이 설정을 불러오는 함수를 구현합니다. 이 함수는 client.properties 파일을 읽어들여 설정 정보를 딕셔너리 형태로 반환합니다.

1
2
3
4
5
6
7
8
9
10
11
12
# client.properties 를 읽고 설정을 불러오는 함수

def read_ccloud_config() -> dict:
    conf = {}
    with open("client.properties") as fh:
        for line in fh:
            line = line.strip()
            if len(line) != 0 and line[0] != "#":
                parameter, value = line.strip().split("=", 1)
                conf[parameter] = value.strip()
    return conf

이제 이벤트를 발행하고 처리하기 위해 Producer와 Consumer를 구현해봅시다. Order Service에서 동작하는 Producer의 구현과 이벤트 발행 함수는 아래와 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Producer 구현과 이벤트 publish 함수

import json

from confluent_kafka import Producer

from event_services.config import read_ccloud_config

# config를 불러와 Producer 객체 생성
producer: Producer = Producer(read_ccloud_config())


# message publish
def publish(topic: str, key: str, value: dict) -> None:
    key_bytes: bytes = key.encode("utf-8")
    value_bytes: bytes = json.dumps(value).encode("utf-8")
    producer.produce(topic, key=key_bytes, value=value_bytes)

또한, 특정 토픽을 구독하고 이벤트를 polling하는 코드도 필요합니다. 이를 위해 Consumer를 구현합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import json
from typing import Any

from confluent_kafka import Consumer, Message
from sqlalchemy.orm import Session

from database import engine
from event_services.config import read_ccloud_config
from event_services.tasks import create_order_and_publish_message

props: dict[str, Any] = read_ccloud_config()
props["group.id"] = "python-group-1"
props["auto.offset.reset"] = "earliest"

consumer: Consumer = Consumer(props)
consumer.subscribe(["order-requeset"])


def polling_start() -> None:
    try:
        while True:
            msg: Message | None = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            if msg.topic() == "order-requeset":
                print(
                    f"key: {msg.key().decode('utf-8')} | value: {msg.value().decode('utf-8')}"
                )
                product_value: dict[str, Any] = json.loads(msg.value().decode("utf-8"))
                product_id: str = product_value.get("product_id")
                with Session(engine) as db:
                    create_order_and_publish_message(db, product_id=product_id)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()

이 Consumer 코드는 메시지를 polling하고, 토픽이 order-requeset일 경우 create_order_and_publish_message 함수를 실행합니다. 이 함수는 주문 정보를 데이터베이스에 삽입하고 메시지를 발행하는 역할을 합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from datetime import datetime
from uuid import uuid4

from sqlalchemy.orm import Session

from domain.order.models import Order
from event_services.producer import publish


def create_order_and_publish_message(db: Session, product_id: str) -> None:
    # Order 데이터 생성 후 insert
    order: Order = Order(
        order_id=uuid4().__str__(),
        product_id=product_id,
        order_status=1,
        created_at=datetime.now(),
    )
    db.add(order)
    db.commit()

    # 메세지 publish
    publish(
        topic="delivery-request",
        key=order.order_id,
        value={
            "order_id": order.order_id,
            "product_id": order.product_id,
            "order_status": order.order_status,
        },
    )

이렇게 Producer와 Consumer를 구현하여 이벤트를 발행하고 처리하는 과정을 살펴보았습니다. 이를 통해 Kafka를 효과적으로 활용할 수 있습니다.

Test

우리의 API, Service, 그리고 DB를 테스트하기 위해, 우리는 docker-compose를 이용해 이들을 실행합니다. 이 과정은 아래 도커 파일들을 통해 이루어집니다.

첫 번째는 API 서버를 위한 도커 파일입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Dockerfile.api
FROM python:3.9

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /app

RUN pip install pipenv

COPY Pipfile Pipfile.lock /app/

RUN pipenv install --system --deploy

COPY . /app

CMD ["sh", "-c", "alembic upgrade head && uvicorn server:app --host 0.0.0.0 --port 8000"]

이 파일은 우리의 API 서버를 위한 설정을 담고 있습니다. Python 3.9 이미지를 기반으로 하며, 환경 변수를 설정해주고 필요한 패키지를 설치합니다. 마지막으로, API 서버를 실행하는 명령어를 정의합니다.

다음은 이벤트 처리를 담당하는 서비스를 위한 도커 파일입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Dockerfile.service
FROM python:3.9

ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /app

RUN pip install pipenv

COPY Pipfile Pipfile.lock /app/

RUN pipenv install --system --deploy

CMD ["python", "services.py"]

이 파일도 마찬가지로 Python 3.9 이미지를 기반으로 하며, 필요한 패키지를 설치하고 서비스를 실행하는 명령어를 정의합니다.

이 두 도커 파일을 이용하여, 우리는 docker-compose.yaml 파일로 컨테이너를 실행합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
version: "3.8"

services:
  api:
    build:
      context: .
      dockerfile: Dockerfile.api
    volumes:
      - ./:/app # 현재 디렉토리를 /app으로 마운트
    ports:
      - "8000:8000"
    depends_on: # db 서비스가 시작된 후에 api 서비스를 시작하도록 설정
      - db
  service:
    build:
      context: .
      dockerfile: Dockerfile.service
    volumes:
      - ./:/app # 현재 디렉토리를 /app으로 마운트
    depends_on: # db 서비스가 시작된 후에 api 서비스를 시작하도록 설정
      - db
  db:
    image: postgres:latest
    volumes:
      - postgres_data:/var/lib/postgresql/data/
    expose:
      - 5432
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=order_db

volumes:
  postgres_data:

이 파일은 서비스들의 관계를 정의하고, 각 서비스가 어떻게 빌드되고 실행되는지를 명시합니다. 특히, API와 서비스들은 DB가 준비된 후에 실행되도록 설정되어 있습니다. Postgres 이미지를 사용하는 DB 서비스는 필요한 환경 변수를 설정하고, 데이터를 저장할 볼륨을 마운트합니다.

20230520160041.png

docker compose가 구동된 모습

다음으로, 우리의 FastAPI가 정상적으로 작동하는지 확인할 수 있습니다.

20230520160759.png

FastAPI 동작 확인

이제 모든 서비스가 준비되었고, 우리의 시스템을 테스트하기 위해, 우리는 임시로 order-request 토픽을 발행합니다. 이로 인해 우리의 데이터베이스에 데이터가 생성됩니다. 이 과정은 아래에 시각적으로 나타나 있습니다.

20230520161425.png

메세지 발행

20230520161915.png

컨슈머 메세지 polling

20230520162024.png

생성된 주문데이터 확인

이후, delivery-request 메세지 발행을 확인합니다.

20230520162147.png

delivery-request 메세지 발행 확인

이렇게, 우리는 EDA(Event-Driven Architecture)를 실습하면서 주문 요청에 대한 이벤트를 발행하고 그 이벤트가 정상적으로 처리되는 과정을 테스트하였습니다.

우리의 API 서버는 order-request 이벤트를 받아 적절한 비즈니스 로직을 수행하고, 이를 바탕으로 새로운 delivery-request 이벤트를 발행합니다. 이 이벤트는 서비스에 의해 수신되어 처리됩니다. 이 과정을 통해, 우리는 시스템이 이벤트 기반 아키텍처를 따라 예상대로 동작하는지 확인할 수 있었습니다.

마치며

이번 포스트를 통해, 우리는 이벤트 기반 아키텍처(Event-Driven Architecture, EDA)의 기본적인 구조와 그를 실제로 구현하는 방법에 대해 알아보았습니다. EDA는 높은 확장성과 느슨한 결합도를 가지며, 실시간 데이터 처리와 비동기 워크플로우를 지원하므로 다양한 분야에서 활용됩니다.

Confluent Kafka를 이용하여 이벤트를 발행하고 구독하는 프로듀서와 컨슈머를 구현하였으며, FastAPI를 이용하여 이벤트를 처리하는 API를 개발하였습니다. 또한 Docker를 이용하여 각각의 서비스를 분리하고 관리하는 방법에 대해서도 살펴보았습니다.

이 과정을 통해 이벤트 기반 아키텍처의 강력함과 유연성을 체험하였고, 이를 실제 애플리케이션에 적용하는 방법을 배웠습니다. 하지만 이번 포스트에서 다룬 내용은 EDA의 일부에 불과하며, EDA를 더욱 효과적으로 활용하기 위해서는 이벤트 스트리밍, 메시지 브로커, 비동기 패턴 등에 대한 깊은 이해가 필요합니다.

이벤트 기반 아키텍처는 복잡한 시스템을 관리하고 확장하는 데 있어 중요한 도구가 될 수 있습니다. 이에 대한 이해를 높이고, 실제 프로젝트에 적용하는 능력을 키우는 것이 중요합니다. 이번 포스트가 그 과정에서 도움이 되었기를 바랍니다.

Reference

This post is licensed under CC BY 4.0 by the author.