트리 계층에서의 배치 반복 (개념 증명)

GitLab에서의 그룹 계층 구조는 트리로 표현되며, 루트 요소는 최상위 네임스페이스이고 자식 요소는 하위 그룹 또는 최근 도입된 Namespaces::ProjectNamespace 레코드입니다.

이 트리는 namespaces 테이블의 parent_id 열을 통해 구현됩니다. 이 열은 상위 네임스페이스 레코드를 가리킵니다. 최상위 네임스페이스는 parent_id가 없습니다.

gitlab-org의 부분 계층:

flowchart TD A("gitlab-org (9979)") --- B("quality (2750817)") B --- C("engineering-productivity (16947798)") B --- D("performance-testing (9453799)") A --- F("charts (5032027)") A --- E("ruby (14018648)")

그룹 계층을 효율적으로 반복하는 것은 여러 잠재적인 사용 사례가 있습니다. 이는 특히 안정적이고 안전한 실행이 빠른 런타임보다 더 중요한 배경 작업에서 그렇습니다. 배치 반복은 더 많은 네트워크 왕복이 필요하지만 각 배치는 유사한 성능 특성을 제공합니다.

몇 가지 예:

  • 각 하위 그룹에서 작업 수행.
  • 계층 내 각 프로젝트에서 작업 수행.
  • 계층 내 각 이슈에서 작업 수행.

문제 진술

그룹 계층이 너무 커져서 단일 쿼리로는 제 시간에 로드할 수 없을 수 있습니다. 쿼리는 구문 시간 초과 오류로 실패하게 됩니다.

매우 큰 그룹과 관련된 확장성 문제를 해결하려면 동일한 데이터를 다양한 형식으로 저장해야 합니다(비정규화). 그러나 그룹 계층을 로드할 수 없는 경우 비정규화를 구현할 수 없습니다.

하나의 비정규화 기법은 주어진 그룹에 대한 모든 하위 그룹 ID를 저장하는 것입니다. 이렇게 하면 그룹 및 하위 그룹을 로드해야 할 때 쿼리를 가속화할 수 있습니다. 예:

flowchart TD A(1) --- B(2) A --- C(3) C --- D(4)
GROUP_ID DESCENDANT_GROUP_IDS
1 [2,3,4]
2 []
3 [4]
4 []

이 구조에서는 모든 하위 그룹을 결정할 때 데이터베이스에서 단 한 행만 읽으면 됩니다. 1000 그룹과 같은 큰 계층에서는 이는 큰 차이를 만들 수 있습니다.

이 비정규화를 통해 계층 문제를 해결할 수 있습니다. 그러나 이 데이터를 테이블에 지속적으로 저장할 방법을 찾아야 합니다. 그룹 및 해당 계층이 매우 클 수 있으므로 이곳에서도 단일 쿼리로 작동할 것으로 기대할 수 없습니다.

SELECT id FROM namespaces WHERE traversal_ids && ARRAY[9970]

위 쿼리는 큰 그룹의 경우 시간이 초과될 수 있으므로 데이터를 배치로 처리해야 합니다.

트리에서 배치 논리를 구현하는 것은 우리가 이전에 검토한 적이 없으며 구현하기에 상당히 복잡합니다. EachBatch 또는 find_in_batches 기반 솔루션은 다음과 같은 이유로 작동하지 않습니다:

  • 데이터(그룹 ID)가 계층에서 정렬되어 있지 않습니다.
  • 하위 그룹의 그룹은 최상위 그룹 ID를 알지 못합니다.

알고리즘

배치 쿼리는 재귀 CTE SQL 쿼리로 구현되며, 한 배치는 최대 N 행을 읽습니다. 트리 구조 때문에 N 행을 읽는다고 해서 반드시 N 그룹 ID를 읽는 것은 아닙니다. 트리가 비최적의 방식으로 구성된 경우, 한 배치는 적은 수의 그룹 ID를 반환할 수 있습니다(하지만 더 많은 그룹 ID는 반환하지 않습니다).

쿼리는 깊이 우선 트리 탐색 논리를 구현하여 DB가 트리의 첫 가지를 리프 요소까지 스캔합니다. 우리는 배치가 완료되었을 때 쿼리가 다음 배치를 위한 충분한 정보를 반환해야 하므로 깊이 우선 알고리즘을 구현하고 있습니다. GitLab에서는 트리의 깊이를 20으로 제한하며, 이는 최악의 경우 쿼리가 19개의 요소를 포함하는 커서를 반환함을 의미합니다.

너비 우선 트리 탐색 알고리즘을 구현하는 것은 비현실적입니다. 왜냐하면 그룹은 상한선 없는 하위 자식을 가질 수 있으므로 결과적으로 거대한 커스를 얻을 수 있기 때문입니다.

  1. 현재 처리 중인 그룹 ID(최상위 그룹 ID)를 포함하는 초기화 행을 만듭니다.

    1. 두 개의 배열(트리 깊이와 수집된 ID)

    2. 쿼리에서 읽은 행의 수를 추적하기 위한 카운터

  2. 행을 재귀적으로 처리하고 조건이 일치할 때마다 다음 중 하나를 수행합니다:

    • Leaf 노드에 도달하지 않은 경우 첫 번째 하위 네임스페이스를 로드하고 현재 처리 중인 네임스페이스 ID를 업데이트합니다. (가지를 따라 아래로 이동)

    • 동일한 깊이에서 남아 있는 행이 있는 경우 현재 깊이에 있는 다음 네임스페이스 레코드를 로드합니다.

    • 현재 수준에서 작업을 마쳤을 때 노드를 한 단계 위로 이동하고 한 수준 높은 행을 처리합니다.

  3. 읽은 수가 우리의 LIMIT(배치 크기)에 도달할 때까지 처리를 계속합니다.

  4. 커스에 대한 데이터를 포함하는 마지막으로 처리된 행과 모든 수집된 레코드 ID를 찾습니다.

WITH RECURSIVE result AS (
  (
    SELECT
      9970 AS current_id, /* 현재 처리 중인 네임스페이스 id */
      ARRAY[9970]::int[] AS depth, /* 커서 */
      ARRAY[9970]::int[] AS ids,  /* 수집된 ids */
      1::bigint AS reads,
      'initialize' AS action
  ) UNION ALL
  (
    WITH cte AS ( /* 결과 cte를 여러 번 참조하는 트릭 */
      select * FROM result
    )
    SELECT * FROM (
      (
        SELECT /* 가지를 따라 아래로 이동 */
          namespaces.id,
          cte.depth || namespaces.id,
          cte.ids || namespaces.id,
          cte.reads + 1,
          'walkdown'
        FROM namespaces, cte
        WHERE
        namespaces.parent_id = cte.current_id
        ORDER BY namespaces.id ASC
        LIMIT 1
      ) UNION ALL
      (
        SELECT /* 동일한 수준에서 다음 요소 찾기 */
          namespaces.id,
          cte.depth[:array_length(cte.depth, 1) - 1] || namespaces.id,
          cte.ids || namespaces.id,
          cte.reads + 1,
          'next'
        FROM namespaces, cte
        WHERE
        namespaces.parent_id = cte.depth[array_length(cte.depth, 1) - 1] AND
        namespaces.id > cte.depth[array_length(cte.depth, 1)]
        ORDER BY namespaces.id ASC
        LIMIT 1
      ) UNION ALL
      (
        SELECT /* 현재 수준을 마쳤을 때 한 노드 위로 이동 */
          cte.current_id,
          cte.depth[:array_length(cte.depth, 1) - 1],
          cte.ids,
          cte.reads + 1,
          'jump'
        FROM cte
        WHERE cte.depth <> ARRAY[]::int[]
        LIMIT 1
      )
    ) next_row LIMIT 1
  )
)
SELECT current_id, depth, ids, action
FROM result
 current_id |    depth     |          ids           |   action
------------+--------------+------------------------+------------
         24 | {24}         | {24}                   | initialize
         25 | {24,25}      | {24,25}                | walkdown
         26 | {24,26}      | {24,25,26}             | next
        112 | {24,112}     | {24,25,26,112}         | next
        113 | {24,113}     | {24,25,26,112,113}     | next
        114 | {24,113,114} | {24,25,26,112,113,114} | walkdown
        114 | {24,113}     | {24,25,26,112,113,114} | jump
        114 | {24}         | {24,25,26,112,113,114} | jump
        114 | {}           | {24,25,26,112,113,114} | jump

참고: 이 쿼리를 사용하여 그룹 계층 내의 모든 네임스페이스 ID를 찾는 것은 traversal_ids 열을 기반으로 하는 현재 self_and_descendants 구현보다 느릴 수 있습니다. 위 쿼리는 그룹 계층을 통한 배치 반복을 구현할 때만 사용해야 합니다.

루비에서의 기초적인 배치 구현:

class NamespaceEachBatch
  def initialize(namespace_id:, cursor: nil)
    @namespace_id = namespace_id
    @cursor = cursor || { current_id: namespace_id, depth: [namespace_id] }
  end

  def each_batch(of: 500)
    current_cursor = cursor.dup

    first_iteration = true
    loop do
      new_cursor, ids = load_batch(cursor: current_cursor, of: of, first_iteration: first_iteration)
      first_iteration = false
      current_cursor = new_cursor

      yield ids

      break if new_cursor[:depth].empty?
    end
  end

  private

  # 네임스페이스 id 배열을 반환합니다
  def load_batch(cursor:, of:, first_iteration: false)
    recursive_cte = Gitlab::SQL::RecursiveCTE.new(:result,
      union_args: { remove_order: false, remove_duplicates: false })

    ids = first_iteration ? namespace_id.to_s : ""

    recursive_cte << Namespace.select(
      Arel.sql(Integer(cursor.fetch(:current_id)).to_s).as('current_id'),
      Arel.sql("ARRAY[#{cursor.fetch(:depth).join(',')}]::int[]").as('depth'),
      Arel.sql("ARRAY[#{ids}]::int[]").as('ids'),
      Arel.sql("1::bigint AS count")
    ).from('(VALUES (1)) AS does_not_matter').limit(1)

    cte = Gitlab::SQL::CTE.new(:cte, Namespace.select('*').from('result'))

    union_query = Namespace.with(cte.to_arel).from_union(
      walk_down,
      next_elements,
      up_one_level,
      remove_duplicates: false,
      remove_order: false
    ).select('current_id', 'depth', 'ids', 'count').limit(1)

    recursive_cte << union_query

    scope = Namespace.with
      .recursive(recursive_cte.to_arel)
      .from(recursive_cte.alias_to(Namespace.arel_table))
      .limit(of)
    row = Namespace.from(scope.arel.as('namespaces')).order(count: :desc).limit(1).first

    [
      { current_id: row[:current_id], depth: row[:depth] },
      row[:ids]
    ]
  end

  attr_reader :namespace_id, :cursor

  def walk_down
    Namespace.select(
      Arel.sql('namespaces.id').as('current_id'),
      Arel.sql('cte.depth || namespaces.id').as('depth'),
      Arel.sql('cte.ids || namespaces.id').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte, LATERAL (SELECT id FROM namespaces WHERE parent_id = cte.current_id ORDER BY id LIMIT 1) namespaces')
  end

  def next_elements
    Namespace.select(
      Arel.sql('namespaces.id').as('current_id'),
      Arel.sql('cte.depth[:array_length(cte.depth, 1) - 1] || namespaces.id').as('depth'),
      Arel.sql('cte.ids || namespaces.id').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte, LATERAL (SELECT id FROM namespaces WHERE namespaces.parent_id = cte.depth[array_length(cte.depth, 1) - 1] AND namespaces.id > cte.depth[array_length(cte.depth, 1)] ORDER BY id LIMIT 1) namespaces')
  end

  def up_one_level
    Namespace.select(
      Arel.sql('cte.current_id').as('current_id'),
      Arel.sql('cte.depth[:array_length(cte.depth, 1) - 1]').as('depth'),
      Arel.sql('cte.ids').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte')
      .where('cte.depth <> ARRAY[]::int[]')
      .limit(1)
  end
end

iterator = NamespaceEachBatch.new(namespace_id: 9970)
all_ids = []
iterator.each_batch do |ids|
  all_ids.concat(ids)
end

# 테스트
puts all_ids.count
puts all_ids.sort == Namespace.where('traversal_ids && ARRAY[9970]').pluck(:id).sort

예제 배치 쿼리:

SELECT
    "namespaces".*
FROM ( WITH RECURSIVE "result" AS ((
            SELECT
                15847356 AS current_id,
                ARRAY[9970,
                12061481,
                12128714,
                12445111,
                15847356]::int[] AS depth,
                ARRAY[]::int[] AS ids,
                1::bigint AS count
            FROM (
                VALUES (1)) AS does_not_matter
            LIMIT 1)
    UNION ALL ( WITH "cte" AS MATERIALIZED (
            SELECT
                *
            FROM
                result
)
            SELECT
                current_id,
                depth,
                ids,
                count
            FROM ((
                    SELECT
                        namespaces.id AS current_id,
                        cte.depth || namespaces.id AS depth,
                        cte.ids || namespaces.id AS ids,
                        cte.count + 1 AS count
                    FROM
                        cte,
                        LATERAL (
                            SELECT
                                id
                            FROM
                                namespaces
                            WHERE
                                parent_id = cte.current_id
                            ORDER BY
                                id
                            LIMIT 1
) namespaces
)
                UNION ALL (
                    SELECT
                        namespaces.id AS current_id,
                        cte.depth[:array_length(
                            cte.depth, 1
) - 1] || namespaces.id AS depth,
                        cte.ids || namespaces.id AS ids,
                        cte.count + 1 AS count
                    FROM
                        cte,
                        LATERAL (
                            SELECT
                                id
                            FROM
                                namespaces
                            WHERE
                                namespaces.parent_id = cte.depth[array_length(
                                    cte.depth, 1
) - 1]
                                AND namespaces.id > cte.depth[array_length(
                                    cte.depth, 1
)]
                            ORDER BY
                                id
                            LIMIT 1
) namespaces
)
                UNION ALL (
                    SELECT
                        cte.current_id AS current_id,
                        cte.depth[:array_length(
                            cte.depth, 1
) - 1] AS depth,
                        cte.ids AS ids,
                        cte.count + 1 AS count
                    FROM
                        cte
                    WHERE (
                        cte.depth <> ARRAY[]::int[]
)
                LIMIT 1
)
) namespaces
    LIMIT 1
))
SELECT
    "namespaces".*
FROM
    "result" AS "namespaces"
LIMIT 500) namespaces
ORDER BY
    "count" DESC
LIMIT 1

실행 계획:

 Limit  (cost=16.36..16.36 rows=1 width=76) (actual time=436.963..436.970 rows=1 loops=1)
   Buffers: shared hit=3721 read=423 dirtied=8
   I/O Timings: read=412.590 write=0.000
   ->  Sort  (cost=16.36..16.39 rows=11 width=76) (actual time=436.961..436.968 rows=1 loops=1)
         Sort Key: namespaces.count DESC
         Sort Method: top-N heapsort  Memory: 27kB
         Buffers: shared hit=3721 read=423 dirtied=8
         I/O Timings: read=412.590 write=0.000
         ->  Limit  (cost=15.98..16.20 rows=11 width=76) (actual time=0.005..436.394 rows=500 loops=1)
               Buffers: shared hit=3718 read=423 dirtied=8
               I/O Timings: read=412.590 write=0.000
               CTE result
                 ->  Recursive Union  (cost=0.00..15.98 rows=11 width=76) (actual time=0.003..432.924 rows=500 loops=1)
                       Buffers: shared hit=3718 read=423 dirtied=8
                       I/O Timings: read=412.590 write=0.000
                       ->  Limit  (cost=0.00..0.01 rows=1 width=76) (actual time=0.002..0.003 rows=1 loops=1)
                             I/O Timings: read=0.000 write=0.000
                             ->  Result  (cost=0.00..0.01 rows=1 width=76) (actual time=0.001..0.002 rows=1 loops=1)
                                   I/O Timings: read=0.000 write=0.000
                       ->  Limit  (cost=0.76..1.57 rows=1 width=76) (actual time=0.862..0.862 rows=1 loops=499)
                             Buffers: shared hit=3718 read=423 dirtied=8
                             I/O Timings: read=412.590 write=0.000
                             CTE cte
                               ->  WorkTable Scan on result  (cost=0.00..0.20 rows=10 width=76) (actual time=0.000..0.000 rows=1 loops=499)
                                     I/O Timings: read=0.000 write=0.000
                             ->  Append  (cost=0.56..17.57 rows=21 width=76) (actual time=0.862..0.862 rows=1 loops=499)
                                   Buffers: shared hit=3718 read=423 dirtied=8
                                   I/O Timings: read=412.590 write=0.000
                                   ->  Nested Loop  (cost=0.56..7.77 rows=10 width=76) (actual time=0.675..0.675 rows=0 loops=499)
                                         Buffers: shared hit=1693 read=357 dirtied=1
                                         I/O Timings: read=327.812 write=0.000
                                         ->  CTE Scan on cte  (cost=0.00..0.20 rows=10 width=76) (actual time=0.001..0.001 rows=1 loops=499)
                                               I/O Timings: read=0.000 write=0.000
                                         ->  Limit  (cost=0.56..0.73 rows=1 width=4) (actual time=0.672..0.672 rows=0 loops=499)
                                               Buffers: shared hit=1693 read=357 dirtied=1
                                               I/O Timings: read=327.812 write=0.000
                                               ->  Index Only Scan using index_namespaces_on_parent_id_and_id on public.namespaces namespaces_1  (cost=0.56..5.33 rows=29 width=4) (actual time=0.671..0.671 rows=0 loops=499)
                                                     Index Cond: (namespaces_1.parent_id = cte.current_id)
                                                     Heap Fetches: 7
                                                     Buffers: shared hit=1693 read=357 dirtied=1
                                                     I/O Timings: read=327.812 write=0.000
                                   ->  Nested Loop  (cost=0.57..9.45 rows=10 width=76) (actual time=0.208..0.208 rows=1 loops=442)
                                         Buffers: shared hit=2025 read=66 dirtied=7
                                         I/O Timings: read=84.778 write=0.000
                                         ->  CTE Scan on cte cte_1  (cost=0.00..0.20 rows=10 width=76) (actual time=0.000..0.000 rows=1 loops=442)
                                               I/O Timings: read=0.000 write=0.000
                                         ->  Limit  (cost=0.57..0.89 rows=1 width=4) (actual time=0.203..0.203 rows=1 loops=442)
                                               Buffers: shared hit=2025 read=66 dirtied=7
                                               I/O Timings: read=84.778 write=0.000
                                               ->  Index Only Scan using index_namespaces_on_parent_id_and_id on public.namespaces namespaces_2  (cost=0.57..3.77 rows=10 width=4) (actual time=0.201..0.201 rows=1 loops=442)
                                                     Index Cond: ((namespaces_2.parent_id = (cte_1.depth)[(array_length(cte_1.depth, 1) - 1)]) AND (namespaces_2.id > (cte_1.depth)[array_length(cte_1.depth, 1)]))
                                                     Heap Fetches: 35
                                                     Buffers: shared hit=2025 read=66 dirtied=6
                                                     I/O Timings: read=84.778 write=0.000
                                   ->  Limit  (cost=0.00..0.03 rows=1 width=76) (actual time=0.003..0.003 rows=1 loops=59)
                                         I/O Timings: read=0.000 write=0.000
                                         ->  CTE Scan on cte cte_2  (cost=0.00..0.29 rows=9 width=76) (actual time=0.002..0.002 rows=1 loops=59)
                                               Filter: (cte_2.depth <> '{}'::integer[])
                                               Rows Removed by Filter: 0
                                               I/O Timings: read=0.000 write=0.000
               ->  CTE Scan on result namespaces  (cost=0.00..0.22 rows=11 width=76) (actual time=0.005..436.240 rows=500 loops=1)
                     Buffers: shared hit=3718 read=423 dirtied=8
                     I/O Timings: read=412.590 write=0.000