GitLab 이벤트 스토어

배경

거대한 GitLab 프로젝트가 커지면서 더 많은 도메인이 정의되고 있습니다. 그 결과, 이러한 도메인들은 시간적으로 결합되어 서로 얽히게 됩니다.

상징적인 예로는 PostReceive 워커가 있습니다. 이 워커에서 여러 도메인 간에 많은 일이 발생합니다. 새로운 커밋이 푸시되면, PostReceive나 그 하위 구성요소 (예: Git::ProcessRefChangesService) 어딘가에 코드를 추가합니다.

이 유형의 아키텍처는 다음과 같습니다.

  • 단일 책임 원칙을 위반합니다.
  • 익숙하지 않은 코드베이스에 코드를 추가하는 경우 버그 또는 성능 저하를 일으킬 수 있는 미묘한 점들이 있을 수 있습니다.
  • 도메인 경계를 위반합니다. 특정 네임스페이스(예: Git::) 내에서 갑자기 다른 도메인의 클래스들이 참여합니다(Ci:: 또는 MergeRequests::와 같은).

이벤트 스토어란?

Gitlab:EventStore는 기존 Sidekiq 워커 및 현재 가능한 관찰성을 기반으로 한 기본 pub-sub 시스템입니다. 우리는 이 시스템을 최소한의 결합으로 도메인을 모델링할 때 이벤트 주도 접근법을 적용하기 위해 사용합니다.

이를 통해 기존의 Sidekiq 워커는 비동기 작업을 수행하는 것으로 남겨두지만 의존성을 역전시킵니다.

이벤트 스토어 예제

CI 파이프라인이 생성되면 해당 파이프라인의 ref와 일치하는 모든 Merge Request의 헤드 파이프라인을 업데이트합니다. 그러면 Merge Request는 최신 파이프라인의 상태를 표시할 수 있습니다.

이벤트 스토어 없이

Ci::CreatePipelineService를 변경하고 (예: if 문처럼) 해당 파이프라인이 생성되었는지를 확인하는 로직을 추가합니다. 그런 다음, MergeRequests:: 도메인을 위한 부수 효과를 실행할 워커를 예약합니다.

이 스타일은 개방-폐쇄 원칙을 위반하며, 결합을 증가시키는 다른 도메인의 부수 효과 로직을 불필요하게 추가합니다.

graph LR subgraph ci[CI] cp[CreatePipelineService] end subgraph mr[MergeRequests] upw[UpdateHeadPipelineWorker] end subgraph no[Namespaces::Onboarding] pow[PipelinesOnboardedWorker] end cp-- perform_async -->upw cp-- perform_async -->pow

이벤트 스토어 사용시

Ci::CreatePipelineServiceCi::PipelineCreatedEvent 이벤트를 발행하고, 여기에서 그 책임이 멈춥니다.

MergeRequests:: 도메인은 이 이벤트를 구독할 수 있는 MergeRequests::UpdateHeadPipelineWorker를 가질 수 있으므로:

  • 부수 효과는 비동기적으로 예약되고 주요 비즈니스 트랜잭션에 영향을 끼치지 않습니다.
  • 주요 비즈니스 트랜잭션을 수정하지 않고 더 많은 부수 효과를 추가할 수 있습니다.
  • 관련된 도메인이 무엇인지, 그리고 그들의 소유권이 무엇인지 명확하게 알 수 있습니다.
  • 시스템 내에서 발생하는 이벤트를 명시적으로 선언하기 때문에 어떤 이벤트가 발생하는지 확인할 수 있습니다.

Gitlab::EventStore를 사용하면, 구독자(사이드킥 워커)와 도메인 이벤트 스키마 사이에는 여전히 결합이 존재합니다. 하지만 이 수준의 결합은 주요 트랜잭션인 (Ci::CreatePipelineService)이

  • 여러 구독자에 결합됨.
  • 구독자를 호출하는 여러 방법(조건부 호출 포함)이 존재함.
  • 매개변수를 전달하는 여러 방법이 존재함.

에 비해 훨씬 작습니다.

graph LR subgraph ci[CI] cp[CreatePipelineService] cp-- publish --> e[PipelineCreateEvent] end subgraph mr[MergeRequests] upw[UpdateHeadPipelineWorker] end subgraph no[Namespaces::Onboarding] pow[PipelinesOnboardedWorker] end upw -. subscribe .-> e pow -. subscribe .-> e

사이드킥 워커인 각 구독자는 그들이 책임지는 작업 유형에 관련된 모든 속성을 지정할 수 있습니다. 예를 들어, 한 구독자는 urgency: high를 정의할 수 있고 다른 구독자는 덜 중요한 작업을 위해 urgency: low를 설정할 수 있습니다.

이벤트 스토어는 사실 다른 도메인에서 실행되는 부수 효과와 비즈니스 트랜잭션을 분리하는 데 도움을 주는 추상화만입니다.

이벤트가 발행되면, 이벤트 스토어는 각 구독자의 큐에 Sidekiq 작업을 예약하기 위해 이벤트 정보를 인수로 전달하면서 구독된 워커 위에 perform_async를 호출합니다.

이것은 구독자들이 그저 사이드킥 워커이기 때문에 다른 모든 것들에 대한 변화는 없다는 것을 의미합니다. 예를 들어, 한 워커(구독자)가 작업을 실행에 실패하면, 해당 작업은 사이드킥에 다시 넣어져 재시도됩니다.

이벤트 스토어 이점

  • 구독자(사이드킥 워커)는 부수 효과가 중요할 경우 작업 가중치를 변경하여 더 빨리 실행될 수 있습니다.
  • 부수 효과가 비동기로 실행되는 것을 자동으로 집행합니다. 이로써 다른 도메인이 주요 비즈니스 트랜잭션의 성능에 영향을 주지 않고 이벤트를 구독할 수 있게 됩니다.

이벤트 정의

Event 객체는 바운디드 컨텍스트에서 발생한 도메인 이벤트를 나타냅니다. 이벤트를 발행하여 해당 사실에 대해 다른 바운디드 컨텍스트에 통보하고, 그에 따라 반응하도록 합니다.

과거에 발생한 사건을 나타내는 이름으로 app/events/<namespace>/ 아래에 새로운 이벤트 클래스를 정의하세요.

class Ci::PipelineCreatedEvent < Gitlab::EventStore::Event
  def schema
    {
      'type' => 'object',
      'required' => ['pipeline_id'],
      'properties' => {
        'pipeline_id' => { 'type' => 'integer' },
        'ref' => { 'type' => 'string' }
      }
    }
  end
end

유효한 JSON 스키마여야 하는 스키마는 (JSONSchemer)[https://github.com/davishmcclurg/json_schemer] 젬에 의해 검증됩니다. 이 유효성 검사는 구독자가 계약을 따를 것을 보장하기 위해 이벤트 객체를 초기화하는 즉시 수행됩니다.

가능한 한 선택적 속성을 사용해야 하며, 스키마 변경을 위한 롤아웃이 적을 것을 요구합니다. 그러나 required 속성은 이벤트 주체의 고유 식별자로 사용될 수 있습니다. 예를 들어:

  • Ci::PipelineCreatedEvent의 경우 pipeline_id는 필수 속성이 될 수 있습니다.
  • Projects::ProjectDeletedEvent의 경우 project_id는 필수 속성이 될 수 있습니다.

구독자가 추가 데이터를 데이터베이스에서 가져와야 할 수 있게 만들면 안되며, 구독자가 첫 번째로 원하는 데이터만 포함시키도록 합니다.

Ci::PipelineCreatedEvent.new(data: {
  pipeline_id: pipeline.id,
  # 모든 구독자가 Merge Request ID를 필요로 하는 것이 아니라면,
  # 이 데이터는 구독자들이 가져올 수 있는 데이터입니다.
  merge_request_ids: pipeline.all_merge_requests.pluck(:id)
})

소비자들에게 더 많은 속성을 제공함으로써, 구독자들이 처음부터 필요로 하는 데이터를 제공합니다. 그렇지 않으면 구독자들은 데이터베이스에서 추가 데이터를 가져오게 됩니다. 그러나 이것은 지속적인 스키마 변경 및 단일 진실의 소스를 나타내지 못하는 속성을 추가할 수 있으므로 성능 최적화로서 사용하는 것이 좋습니다.

스키마 업데이트

스키마 변경에는 여러 번의 롤아웃이 필요합니다. 새로운 버전이 배포되는 동안:

  • 기존 발행자는 이전 버전을 사용하여 이벤트를 발행할 수 있음.
  • 기존 구독자는 이전 버전을 사용하여 이벤트를 소비할 수 있음.
  • 이벤트는 작업 인수로 Sidekiq 큐에 유지되므로 배포 중에 2가지 버전의 스키마를 가질 수 있음.

스키마 변경이 최종적으로 Sidekiq 인수에 영향을 미치기 때문에, 여러 번의 롤아웃에 대한 자세한 내용은 Sidekiq style guide를 참조하세요.

속성 추가

  1. 롤아웃 1:
    • 새로운 속성을 선택 사항으로 추가합니다. (required가 아님)
    • 구독자를 업데이트하여 새로운 속성을 포함하거나 미포함한 이벤트를 사용할 수 있도록 합니다.
  2. 롤아웃 2:
    • 발행자를 변경하여 새로운 속성을 제공합니다.
  3. 롤아웃 3: (속성이 required여야 할 경우):
    • 스키마 및 구독자 코드를 항상 해당 속성을 예상하도록 변경합니다.

속성 제거

  1. 롤아웃 1:
    • 속성이 required이면 선택 사항으로 만듭니다.
    • 구독자를 업데이트하여 속성을 항상 예상하지 않도록 합니다.
  2. 롤아웃 2:
    • 이벤트 발행에서 속성을 제거합니다.
    • 속성을 처리하는 구독자 코드를 제거합니다.

기타 변경사항

기타 변경사항에 대해서는 속성 이름을 변경하는 것과 같은 단계를 사용하세요:

  1. 이전 속성 제거
  2. 새로운 속성 추가

이벤트 발행

이전 예제에서 이벤트를 발행하려면 다음을 참조하세요:

Gitlab::EventStore.publish(
  Ci::PipelineCreatedEvent.new(data: { pipeline_id: pipeline.id })
)

가능하다면 이벤트는 관련 있는 서비스 클래스에서 발송되어야 합니다. 상태 기계 전이와 같이 모델이 이벤트를 발행할 수 있는 경우도 있습니다. 예를 들어, Ci::BuildFinishedWorker를 예약하는 대신 Ci::BuildFinishedEvent를 발행하여 다른 도메인이 비동기적으로 반응하도록 할 수 있습니다.

ActiveRecord 콜백은 도메인 이벤트를 나타내기에는 너무 저수준입니다. 이들은 데이터베이스 레코드 변경을 더 많이 나타냅니다. 그것이 합리적인 경우도 있을 수 있지만, 그러한 예외 사항들을 고려해야 합니다.

구독자 생성

구독자는 Gitlab::EventStore::Subscriber 모듈을 포함하는 Sidekiq worker입니다. 이 모듈은 perform 메서드를 처리하고 handle_event 메서드를 통해 이벤트를 안전하게 처리하기 위한 더 나은 추상화를 제공합니다. 예를 들어:

module MergeRequests
  class UpdateHeadPipelineWorker
    include Gitlab::EventStore::Subscriber
    
    def handle_event(event)
      Ci::Pipeline.find_by_id(event.data[:pipeline_id]).try do |pipeline|
        # ...
      end
    end
  end
end

이벤트에 구독자 등록

lib/gitlab/event_store.rb에서 특정 이벤트에 대한 워커를 구독하려면 Gitlab::EventStore.configure! 메서드에 다음과 같이 한 줄을 추가하세요:

note
새로운 워커는 카나리 배포 호환성을 보장하기 위해 피처 플래그와 함께 소개되어야 합니다.
module Gitlab
  module EventStore
    def self.configure!(store)
      # ...
      
      store.subscribe ::Sbom::ProcessTransferEventsWorker, to: ::Projects::ProjectTransferedEvent,
        if: ->(event) do
          actor = ::Project.actor_from_id(event.data[:project_id])
          Feature.enabled?(:sync_project_archival_status_to_sbom_occurrences, actor)
        end
      
      # ...
    end
  end
end

EE 코드베이스에만 정의된 워커는 ee/lib/ee/gitlab/event_store.rb에서 구독을 선언함으로써 동일한 방식으로 이벤트를 구독할 수 있습니다.

구독은 Rails 앱이 로드될 때 메모리에 저장되고 즉시 고정됩니다. 런타임에서 구독을 수정할 수 없습니다.

이벤트 조건부 디스패치

구독은 이벤트를 수락할 조건을 지정할 수 있습니다.

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  if: -> (event) { event.data[:merge_request_id].present? }

이것은 조건이 충족되면 이벤트 스토어가 구독자로 Ci::PipelineCreatedEvent를 디스패치하도록 지시합니다.

이 기술은 구독자가 소수의 이벤트에 관심을 가지고 있는 경우에 Sidekiq 작업 예약을 피하는 데 유용합니다.

caution
조건부 디스패치를 사용할 때는 각 이벤트가 게시될 때마다 동기적으로 실행되기 때문에 싼 조건만 포함되어 있어야 합니다.

복잡한 조건의 경우 모든 이벤트를 구독한 다음 구독자 워커의 handle_event 메서드에서 로직을 처리하는 것이 좋습니다.

이벤트 지연 디스패치

구독은 이벤트를 받을 때 지연을 지정할 수 있습니다.

store.subscribe ::MergeRequests::UpdateHeadPipelineWorker,
  to: ::Ci::PipelineCreatedEvent,
  delay: 1.minute

delay 매개변수는 구독자 Sidekiq 워커의 perform_in 메서드를 사용하여 이벤트를 디스패치하는 것으로 전환합니다.

이 기술은 많은 이벤트를 발행하고 Sidekiq 중복을 활용하는 데 유용합니다.

이벤트 그룹 발행

어떤 시나리오에서는 단일 비즈니스 트랜잭션에서 동일한 유형의 여러 이벤트를 발행합니다. 이는 각 이벤트에 대해 작업을 호출하여 사이드킥에 부하를 추가합니다. 이와 같은 경우, 우리는 Gitlab::EventStore.publish_group를 호출하여 여러 이벤트 그룹을 발행할 수 있습니다. 이 방법은 유사한 유형의 이벤트 배열을 수락하는데, 기본적으로 구독자 워커는 최대 10개의 이벤트 그룹을 한 번에 수신하지만 group_size 매개변수를 정의하여 구성할 수 있습니다. 구독자에게 수행되는 게시된 이벤트 수는 group_size에 기반하여 일괄 처리 단위로 배치에 의해 설정됩니다. 집합 그룹 수가 100보다 많은 경우 각 그룹을 10초의 지연으로 예약하여 Sidekiq에 부하를 줄일 수 있습니다.

store.subscribe ::Security::RefreshProjectPoliciesWorker,
  to: ::ProjectAuthorizations::AuthorizationsChangedEvent,
  delay: 1.minute,
  group_size: 25

구독자 워커의 handle_event 메서드는 그룹의 각 이벤트에 대해 호출됩니다.

테스트

발행자 테스트

발행자의 책임은 이벤트가 올바르게 발행되는 것을 보증하는 것입니다.

이벤트가 올바르게 발행되었는지 테스트하려면 RSpec 매처 :publish_event를 사용할 수 있습니다.

it '프로젝트 ID와 네임스페이스 ID가 포함된 ProjectDeleted 이벤트를 발행합니다' do
  expected_data = { project_id: project.id, namespace_id: project.namespace_id }
  
  # 블록을 호출했을 때 올바른 이벤트 및 데이터가 발행되는지 확인합니다.
  expect { destroy_project(project, user, {}) }
    .to publish_event(Projects::ProjectDeletedEvent)
    .with(expected_data)
end

또한 :publish_event 매처 내에서 매처를 구성하는 것도 가능합니다. 새로운 레코드를 만든 후에 이벤트가 생성되는지 확인하고 싶을 때 유용합니다.

it '프로젝트 ID와 네임스페이스 ID가 포함된 ProjectCreatedEvent를 발행합니다' do
  # 프로젝트 ID는 `create_project`를 호출할 때 생성됩니다.
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }
  
  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
end

여러 이벤트를 게시하는 경우, 게시되지 않은 이벤트도 확인할 수 있습니다.

it '프로젝트 ID와 네임스페이스 ID가 포함된 ProjectCreatedEvent를 발행합니다' do
  # 프로젝트 ID는 `create_project`를 호출할 때 생성됩니다.
  expected_data = { project_id: kind_of(Numeric), namespace_id: group_id }
  
  expect { create_project(user, name: 'Project', path: 'project', namespace_id: group_id) }
    .to publish_event(Projects::ProjectCreatedEvent)
    .with(expected_data)
    .and not_publish_event(Projects::ProjectDeletedEvent)
end

구독자 테스트

구독자는 발행된 이벤트를 올바르게 소비할 수 있는지 보증해야 합니다. 이를 위해 구독자에 대한 테스트 방법을 표준화하기 위한 도우미 및 공유 예시를 추가했습니다.

RSpec.describe MergeRequests::UpdateHeadPipelineWorker do
  let(:pipeline_created_event) { Ci::PipelineCreatedEvent.new(data: ({ pipeline_id: pipeline.id })) }
  
  # 이 공유 예시는 발행된 이벤트가 올바르게 처리되고 현재 구독자(`described_class`)에 의해 처리되는 것을 보증합니다. 또한 이는 워커가 멱등시 되는지도 보증합니다.
  it_behaves_like 'subscribes to event' do
    let(:event) { pipeline_created_event }
  end
  
  # 이 공유 예시는 발행된 이벤트가 무시되는 것을 보증합니다. 이는 조건부 디스패치 테스트에 유용합니다.
  it_behaves_like 'ignores the published event' do
    let(:event) { pipeline_created_event }
  end
  
  it '실행하는 동작' do
    # 이 도우미는 `perform` 메서드가 올바르게 호출되었는지 확인하여 `handle_event`가 올바르게 호출되었음을 보증합니다.
    consume_event(subscriber: described_class, event: pipeline_created_event)
    
    # 기대 사항 실행
  end
end

최선의 방법

  • CE & EE 분리 및 호환성 유지:
    • 이벤트 클래스를 정의하고 이벤트를 항상 발생시키는 코드(CE 또는 EE)에서 발행합니다.
      • 이벤트가 CE 기능의 결과로 발생하는 경우, 이벤트 클래스는 CE에서 정의되고 발행되어야 합니다. 마찬가지로 이벤트가 EE 기능의 결과로 발생하는 경우 이벤트 클래스는 EE에서 정의되고 발행되어야 합니다.
    • 이벤트에 의존하는 구독자를 해당 의존 기능이 존재하는 코드(CE 또는 EE)에서 정의합니다.
      • CE에서 발행된 이벤트(예: Projects::ProjectCreatedEvent)와 이 이벤트에 의존하는 구독자가 EE에서 정의될 수 있습니다 (예: Security::SyncSecurityPolicyWorker).
  • 이벤트 클래스를 정의하고 이벤트를 동일한 bounded context(최상위 Ruby 네임스페이스) 내에서 발행합니다.
    • 주어진 bounded context는 해당 컨텍스트와 관련된 이벤트만 발행해야 합니다.
  • 이벤트를 구독할 때 신호/소음 비율을 평가하세요. 구독하는 이벤트와 무시하는 이벤트의 비율은 어떻게 되나요? 조건부 디스패치를 활용하여 원하는 이벤트의 작은 하위 집합에만 관심이 있는 경우 고려하세요. 조건부 디스패치 또는 잠재적으로 중복되는 워커를 실행하는 동기적 체크 사이의 균형을 유지하세요.