트리 계층 구조에서 일괄 반복 (콘셉트 증명)

GitLab의 그룹 계층 구조는 루트 요소가 최상위 네임스페이스이고, 하위 요소가 서브그룹이나 최근에 도입된 Namespaces::ProjectNamespace 레코드인 트리로 표시됩니다.

이 트리는 parent_id 열을 통해 namespaces 테이블에 구현되어 있습니다. 이 열은 상위 네임스페이스 레코드를 가리킵니다. 최상위 네임스페이스에는 parent_id가 없습니다.

gitlab-org의 일부 계층 구조:

flowchart TD A("gitlab-org (9979)") --- B("quality (2750817)") B --- C("engineering-productivity (16947798)") B --- D("performance-testing (9453799)") A --- F("charts (5032027)") A --- E("ruby (14018648)")

그룹 계층 구조를 효율적으로 반복하는 것은 여러 잠재적인 사용 사례가 있습니다. 특히 안정적이고 안전한 실행이 빠른 실행보다 중요한 백그라운드 작업에서 필요합니다. 일괄 처리는 더 많은 네트워크 왕복이 필요하지만 각 일괄 처리는 유사한 성능 특성을 제공합니다.

일부 예시:

  • 각 하위 그룹에 대해 작업 수행
  • 계층 구조에 있는 각 프로젝트에 대해 작업 수행
  • 계층 구조에 있는 각 이슈에 대해 작업 수행

문제 설명

그룹 계층 구조가 커져서 단일 쿼리로 로드할 수 없을 수 있습니다. 쿼리는 문 제 시간 초과 오류가 발생합니다.

매우 큰 그룹과 관련된 확장성 문제를 해결하려면 동일한 데이터를 다른 형식(비정규화)으로 저장해야 합니다. 그러나 그룹 계층 구조를 로드할 수 없는 경우에는 비정규화를 구현할 수 없습니다.

비정규화 기술 중 하나는 주어진 그룹에 대한 모든 후손 그룹 ID를 저장하는 것입니다. 이는 그룹과 하위 그룹을 로드해야 하는 쿼리를 속도가 빨라집니다. 예시:

flowchart TD A(1) --- B(2) A --- C(3) C --- D(4)
그룹 ID 하위 그룹 ID
1 [2,3,4]
2 []
3 [4]
4 []

이 구조를 사용하면 모든 하위 그룹을 결정하는 데 데이터베이스에서 하나의 행만 읽어야 하며 1000개의 그룹과 같은 큰 계층에 대해서는 큰 차이를 만들 수 있습니다.

계층 구조를 읽는 문제는 이러한 비정규화로 해결됩니다. 그러나 이 데이터를 테이블에 유지해야 합니다. 그룹과 계층이 매우 커질 수 있으므로 여기서 단일 쿼리가 작동될 것으로 기대할 수 없습니다.

SELECT id FROM namespaces WHERE traversal_ids && ARRAY[9970]

위의 쿼리는 큰 그룹에 대해 시간 초과될 수 있으므로 데이터를 일괄 처리해야 합니다.

트리에 일괄 처리 로직을 구현하는 것은 우리가 이전에 살펴본 적이 없고, 구현도 상당히 복잡합니다. EachBatch 또는 find_in_batches 기반 솔루션은 다음과 같은 이유로 작동하지 않을 것입니다:

  • 데이터(그룹 ID)는 계층 구조에서 정렬되지 않습니다.
  • 하위 그룹의 그룹은 최상위 그룹 ID를 알지 못합니다.

알고리즘

일괄 쿼리는 재귀 CTE SQL 쿼리로 구현되며, 한 번의 일괄 처리는 최대 N개의 행을 읽습니다. 트리 구조 때문에 N개의 행을 읽는다는 것은 반드시 N개의 그룹 ID를 읽는다는 것이 아닐 수도 있습니다. 트리가 비최적적인 방식으로 구성되어 있다면 일괄은 더 적은(하지만 더 많지 않은) 그룹 ID를 반환할 수 있습니다.

쿼리는 깊이 우선 트리 운행 로직을 구현하여 DB가 트리의 첫 번째 가지를 잎 요소에 이르기까지 스캔합니다. 우리는 깊이 우선 알고리즘을 구현하는 이유는 일괄 처리를 마치면 쿼리가 다음 일괄 처리(커서)에 대한 정보를 충분히 반환해야 하기 때문입니다. GitLab에서는 트리의 깊이를 20으로 제한하는데, 이는 최악의 경우 쿼리가 19개의 요소를 포함하는 커서를 반환할 수 있음을 의미합니다.

너비 우선 트리 운행 알고리즘을 구현하는 것은 현실적이지 않을 수 있습니다. 왜냐하면 그룹에는 제한되지 않는 수의 후손이 있을 수 있으므로 매우 큰 커서를 얻을 수 있기 때문입니다.

  1. 현재 처리중인 그룹 ID(최상위 그룹 ID), 두 배열(트리 깊이 및 수집된 ID), 쿼리에서 읽은 행 수를 포함하는 초기화 행을 생성합니다.
  2. 행을 재귀적으로 처리하고 다음 중 하나를 수행합니다(조건이 일치하는 경우):
    • 첫 번째 하위 네임스페이스를 로드하고 우리가 잎 노드에 있지 않다면 현재 처리 중인 네임스페이스 ID를 업데이트합니다. (가지를 아래로 내려가기)
    • 현재 깊이에서 다음 네임스페이스 레코드를 로드하고 남은 행이 있는 경우. (현재 수준에서 다음 네임스페이스 레코드 로드)
    • 하나의 노드를 올라가서 더 높은 수준의 행을 처리합니다.
  3. 읽은 행 수가 LIMIT(일괄 크기)에 도달할 때까지 처리를 계속합니다.
  4. 커서 및 수집된 레코드 ID를 포함하는 마지막으로 처리된 행을 찾습니다.
WITH RECURSIVE result AS (
  (
    SELECT
      9970 AS current_id,                 /* 현재 처리 중인 네임스페이스 ID */
      ARRAY[9970]::int[] AS depth,         /* 커서 */
      ARRAY[9970]::int[] AS ids,           /* 수집된 ID들 */
      1::bigint AS reads,                  /* 쿼리에서 읽은 수 */
      'initialize' AS action
    ) UNION ALL
    (
      WITH cte AS ( /* result CTE를 여러 번 참조하는 트릭 */
        select * FROM result
      )
      SELECT * FROM (
        (
          SELECT /* 가지 아래로 내로 내려가기 */
            namespaces.id,
            cte.depth || namespaces.id,
            cte.ids || namespaces.id,
            cte.reads + 1,
            'walkdown'
          FROM namespaces, cte
          WHERE
          namespaces.parent_id = cte.current_id
          ORDER BY namespaces.id ASC
          LIMIT 1
        ) UNION ALL
        (
          SELECT /* 같은 레벨의 다음 요소 찾기 */
            namespaces.id,
            cte.depth[:array_length(cte.depth, 1) - 1] || namespaces.id,
            cte.ids || namespaces.id,
            cte.reads + 1,
            'next'
          FROM namespaces, cte
          WHERE
          namespaces.parent_id = cte.depth[array_length(cte.depth, 1) - 1] AND
          namespaces.id > cte.depth[array_length(cte.depth, 1)]
          ORDER BY namespaces.id ASC
          LIMIT 1
        ) UNION ALL
        (
          SELECT /* 현재 수준의 처리를 마치면 한 수준 위로 이동 */
            cte.current_id,
            cte.depth[:array_length(cte.depth, 1) - 1],
            cte.ids,
            cte.reads + 1,
            'jump'
          FROM cte
          WHERE cte.depth <> ARRAY[]::int[]
          LIMIT 1
        )
      ) next_row LIMIT 1
    )
)
SELECT current_id, depth, ids, action
FROM result
 current_id |    depth     |          ids           |   action
------------+--------------+------------------------+------------
         24 | {24}         | {24}                   | initialize
         25 | {24,25}      | {24,25}                | walkdown
         26 | {24,26}      | {24,25,26}             | next
        112 | {24,112}     | {24,25,26,112}         | next
        113 | {24,113}     | {24,25,26,112,113}     | next
        114 | {24,113,114} | {24,25,26,112,113,114} | walkdown
        114 | {24,113}     | {24,25,26,112,113,114} | jump
        114 | {24}         | {24,25,26,112,113,114} | jump
        114 | {}           | {24,25,26,112,113,114} | jump
note
그룹 계층 구조의 모든 네임스페이스 ID를 찾기 위해 이 쿼리를 사용하는 것은 traversal_ids 열을 기반으로 하는 현재 self_and_descendants 구현과 같은 다른 쿼리 방법보다 느릴 수 있습니다. 위의 쿼리는 그룹 계층 구조를 일괄 처리하는 경우에만 사용해야 합니다.

루비에서의 초기 일괄 처리 구현:

class NamespaceEachBatch
  def initialize(namespace_id:, cursor: nil)
    @namespace_id = namespace_id
    @cursor = cursor || { current_id: namespace_id, depth: [namespace_id] }
  end
  
  def each_batch(of: 500)
    current_cursor = cursor.dup
    
    first_iteration = true
    loop do
      new_cursor, ids = load_batch(cursor: current_cursor, of: of, first_iteration: first_iteration)
      first_iteration = false
      current_cursor = new_cursor
      
      yield ids
      
      break if new_cursor[:depth].empty?
    end
  end
  
  private
  
  # yields array of namespace ids
  def load_batch(cursor:, of:, first_iteration: false)
    recursive_cte = Gitlab::SQL::RecursiveCTE.new(:result,
      union_args: { remove_order: false, remove_duplicates: false })
    
    ids = first_iteration ? namespace_id.to_s : ""
    
    recursive_cte << Namespace.select(
      Arel.sql(Integer(cursor.fetch(:current_id)).to_s).as('current_id'),
      Arel.sql("ARRAY[#{cursor.fetch(:depth).join(',')}]::int[]").as('depth'),
      Arel.sql("ARRAY[#{ids}]::int[]").as('ids'),
      Arel.sql("1::bigint AS count")
    ).from('(VALUES (1)) AS does_not_matter').limit(1)
    
    cte = Gitlab::SQL::CTE.new(:cte, Namespace.select('*').from('result'))
    
    union_query = Namespace.with(cte.to_arel).from_union(
      walk_down,
      next_elements,
      up_one_level,
      remove_duplicates: false,
      remove_order: false
    ).select('current_id', 'depth', 'ids', 'count').limit(1)
    
    recursive_cte << union_query
    
    scope = Namespace.with
      .recursive(recursive_cte.to_arel)
      .from(recursive_cte.alias_to(Namespace.arel_table))
      .limit(of)
    row = Namespace.from(scope.arel.as('namespaces')).order(count: :desc).limit(1).first
    
    [
      { current_id: row[:current_id], depth: row[:depth] },
      row[:ids]
    ]
  end
  
  attr_reader :namespace_id, :cursor
  
  def walk_down
    Namespace.select(
      Arel.sql('namespaces.id').as('current_id'),
      Arel.sql('cte.depth || namespaces.id').as('depth'),
      Arel.sql('cte.ids || namespaces.id').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte, LATERAL (SELECT id FROM namespaces WHERE parent_id = cte.current_id ORDER BY id LIMIT 1) namespaces')
  end
  
  def next_elements
    Namespace.select(
      Arel.sql('namespaces.id').as('current_id'),
      Arel.sql('cte.depth[:array_length(cte.depth, 1) - 1] || namespaces.id').as('depth'),
      Arel.sql('cte.ids || namespaces.id').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte, LATERAL (SELECT id FROM namespaces WHERE namespaces.parent_id = cte.depth[array_length(cte.depth, 1) - 1] AND namespaces.id > cte.depth[array_length(cte.depth, 1)] ORDER BY id LIMIT 1) namespaces')
  end
  
  def up_one_level
    Namespace.select(
      Arel.sql('cte.current_id').as('current_id'),
      Arel.sql('cte.depth[:array_length(cte.depth, 1) - 1]').as('depth'),
      Arel.sql('cte.ids').as('ids'),
      Arel.sql('cte.count + 1').as('count')
    ).from('cte')
      .where('cte.depth <> ARRAY[]::int[]')
      .limit(1)
  end
end

iterator = NamespaceEachBatch.new(namespace_id: 9970)
all_ids = []
iterator.each_batch do |ids|
  all_ids.concat(ids)
end

# Test
puts all_ids.count
puts all_ids.sort == Namespace.where('traversal_ids && ARRAY[9970]').pluck(:id).sort

예시 일괄 쿼리:

```sql SELECT “namespaces”.* FROM ( WITH RECURSIVE “result” AS (( SELECT 15847356 AS current_id, ARRAY[9970, 12061481, 12128714, 12445111, 15847356]::int[] AS depth, ARRAY[]::int[] AS ids, 1::bigint AS count FROM ( VALUES (1)) AS does_not_matter LIMIT 1) UNION ALL ( WITH “cte” AS MATERIALIZED ( SELECT * FROM result ) SELECT current_id, depth, ids, count FROM (( SELECT namespaces.id AS current_id, cte.depth || namespaces.id AS depth, cte.ids || namespaces.id AS ids, cte.count + 1 AS count FROM cte, LATERAL ( SELECT id FROM namespaces WHERE parent_id = cte.current_id ORDER BY id LIMIT 1 ) namespaces ) UNION ALL ( SELECT namespaces.id AS current_id, cte.depth[:array_length( cte.depth, 1 ) - 1] || namespaces.id AS depth, cte.ids || namespaces.id AS ids, cte.count + 1 AS count FROM cte, LATERAL ( SELECT id FROM