Synchronization

Scheduling mechanics

MeidaPipe 그래프에서의 데이터 처리는 CalculatorBase 하위 클래스들로 정의된 내부의 처리 노드들 내에서 발생합니다. 스케쥴링 시스템은 각 calculator를 실행해야 하는 시기를 결정합니다.

각 그래프는 적어도 하나의 스케쥴러 큐를 갖고 있습니다. 각 스케쥴러 큐는 정확히 하나의 실행기가 있습니다. 노드는 대기열(실행자)에 정적으로 할당됩니다. 기본적으로 하나의 큐가 있으며, 실행자는 시스템 기능에 따라 여러 스레드가 있는 스레드 풀(미리 스레드들을 생성해 놓고 작업 큐에 작업이 들어올 시 미리 생성한 스레드에 작업을 할당하는 디자인 패턴.)입니다.

각 노드에는 not ready, ready, running 스케쥴링 상태가 있습니다. 준비 기능은 노드를 실행할 준비가 되었는지 여부를 결정합니다. 이 함수는 그래프 초기화 시, 노드 실행이 완료될 때마다, 노드 입력 상태가 변경될 때마다 호출됩니다.

사용되는 준비 기능은 노드 유형에 따라 다릅니다. 스트림 입력이 없는 노드를 소스 노드라고 합니다. 소스 노드는 프레임워크에 더 이상 출력할 데이터가 없다고 말할 때까지 항상 실행할 준비가 되어 있으며 이 시점에서 닫힙니다.

소스가 아닌 노드는 처리할 입력이 있고 해당 입력이 노드 입력 정책에 의해 설정된 조건에 따라 유효한 입력 세트를 형성하는 경우 준비가 됩니다. 대부분 노드는 기본 입력 정책을 사용하나 일부 노드는 다른 정책을 지정합니다.

노드가 준비되면 해당 스케쥴러 큐에 태스크가 추가되는데 이것이 우선순위 큐입니다. 우선 순위 기능은 현재 고정되어 있으며 그래프 노드의 정적 속성과 토폴로지 정렬을 고려합니다. 예를 들어, 그래프 출력 쪽에 가까운 노드는 우선 순위가 더 높고 소스 노드는 우선 순위가 가장 낮습니다.

각 큐는 Calculator 코드를 호출해 작업을 실제로 실행하는 책임이 있는 실행자에 의해 제공됩니다. 다른 실행기를 제공하고 구성할 수 있습니다. 이는 실행 자원 사용을 사용자 정의하는 데 사용할 수 있습니다. 우선 순위가 낮은 스레드에서 특정 노드를 실행합니다.


Timestamp Synchronization

MediaPipe 그래프 실행은 분산(탈중앙화)되어 있습니다. 글로벌 시계가 없고, 다른 노드가 동시에 다른 타임스탬프 데이터를 처리할 수 있습니다. 이것은 파이프라이닝을 통해 더 높은 처리량을 허용하게 됩니다.

그러나 시간 정보는 많은 perception workflows에서 매우 중요합니다. 여러 입력 스트림을 수신하는 노드는 일반적으로 어떤 방식으로든 이를 조정해야 합니다. 예를 들어, object detector가 프레임으로부터 boundary 상자 리스트를 출력할 수 있으며, 이 정보는 원래 프레임과 함께 처리해야 하는 렌더링 노드에 제공될 수 있습니다.

그러므로 MediaPipe 프레임워크 주요 책임 중 하나는 노드에 대한 입력 동기화를 제공하는 것입니다. 프레임워크 mechanics 측면에서 타임스탬프의 주 역할은 동기화 키 역할을 하는 것입니다.

더 나아가 MediaPipe는 많은 시나리오(테스트, 시뮬레이션, batch processing 등)에서 중요한 결정론적 작업을 지원하도록 설계되었으며, 그래프 작성자는 실시간 제약 조건을 충족하는 데 필요한 결정론을 완화할 수 있습니다.

동기화 및 결정론의 두 가지 목표는 몇 가지 설계 선택 기초가 됩니다. 특히, 주어진 스트림으로 푸시된 패킷에는 단조롭게 증가하는 타임스탬프가 있어야 합니다.이는 많은 노드에 대해 유용한 가정일 뿐 아니라 동기화 논리에 의존하기도 합니다. 각 스트림에는 스트림 새 패킷에 허용되는 가장 낮은 타임스탬프인 타임스탬프 경계가 있습니다. 타임스탬프가 T인 패킷이 도착하면 경계는 자동으로 T+1로 진행해 단조로운 요구 사항을 반영합니다. 이를 통해 프레임워크는 타임스탬프가 T보다 낮은 패킷이 더 이상 도착하지 않는다는 것을 확실히 알 수 있습니다.


Input policies

동기화는 노드에서 지정한 입력 정책을 사용해 각 노드에서 로컬로 처리됩니다.

DefaultInputStreamHandler에 의해 정의된 기본 입력 정책은 다음을 보장해 입력의 결정적 동기화를 제공합니다.

  • 동일한 타임스탬프를 가진 패킷이 여러 입력 스트림에 제공되면 실시간으로 도착 순서에 관계 없이 항상 함께 처리됩니다.
  • 입력 세트는 타임스탬프 오름차순으로 처리됩니다.
  • 패킷이 삭제되지 않으며 처리가 완전히 결정적입니다.
  • 노드는 위가 보장됨에 따라 가능한 한 빨리 데이터를 처리할 준비가 됩니다.

참고 : 이의 중요한 결과는 Calculator가 패킷을 출력할 때 항상 현재 입력 타임스탬프를 사용하는 경우 출력이 본질적으로 단조롭게 증가하는 타임스탬프 요구 사항을 준수한다는 것입니다.

경고 : 반면에 모든 스트림에 대해 입력 패킷을 항상 사용할 수 있단 보장은 없습니다.

작동 방식을 설명하려면 확정된 타임스탬프 정의를 소개해야 합니다. 우리는 스트림 타임스탬프가 타임스탬프 경계보다 낮으면 확정된다고 말합니다. 즉, 해당 타임스탬프 입력 상태가 취소 불가능하게 알려지면 타임스탬프가 스트림에 대해 결정됩니다. 즉, 패킷이 있거나 해당 타임스탬프가 있는 패킷이 도착하지 않을 것이라는 확신이 있습니다.

참고 : 이러한 이유로 MediaPipe는 스트림 생산자가 명시적으로 마지막 패킷이 의미하는 것보다 더 멀리, 즉, 더 엄격한 경계를 제공하기 위해 더 멀리 타임스탬프 경계를 진행하도록 허용합니다. 이를 통해 다운스트림 노드가 입력을 더 빨리 해결할 수 있습니다.

타임스탬프는 각 스트림에 대해 정산되는 경우 여러 스트림에 걸쳐 정산됩니다. 또한 타임스탬프가 확정되면 이전 모든 타임스탬프도 확정된단 의미입니다. 따라서 확정된 타임스탬프는 오름차순으로 결정적으로 처리될 수 있습니다.

이 정의가 주어지면 기본 입력 정책이 있는 Calculator는 모든 입력 스트림에 걸쳐 정산되고, 적어도 하나의 입력 스트림에 패킷을 포함하는 타임스탬프가 있는 경우 준비가 된 것입니다. 입력 정책은 확정된 타임스탬프에 대해 사용 가능한 모든 패킷을 계산기에 대한 단일 입력 세트로 제공합니다.

이 결정적 동작 결과 중 하나는 여러 입력 스트림이 있는 노드의 경우 타임스탬프가 해결될 때까지 이론적으로 무제한 대기가 있을 수 있으며 그동안 무제한 수의 패킷이 버퍼링될 수 있다는 것입니다.

따라서 사용자 지정 입력 정책도 제공합니다. 예를 들어 SyncSetInputStreamHandler에 의해 정의된 다른 동기화 세트의 입력을 분할하거나, 동기화를 모두 피하고 ImmediateInputStreamHandler에 의해 정의된 입력이 도착하는 즉시 처리합니다.


Flow control

두 가지 main flow control mechanisms이 있습니다.

  1. backpressure mechanism

스트림에 버퍼링된 패킷이 CalculatorGraphConfig::max_queue_size에 의해 정의된 제한에 도달할 때 업스트림 노드 실행을 제한합니다. 이 메커니즘은 결정적 동작을 유지하고 필요할 때 구성된 제한을 완화하는 교착 상태 방지 시스템을 포함합니다.

  1. 실시간 제약 조건에 따라 패킷을 삭제할 수 있는 특수 노드 삽입

FlowLimiterCalculator에 정의되어 있으며, 일반적으로 사용자 지정 입력 정책에 사용됩니다. 예를 들어, 공통 패턴은 최종 출력에서 흐름 제어 노드로 루프백 연결과 함께 하위 그래프 입력에 흐름 제어 노드를 배치합니다. 따라서 흐름 제어 노드는 다운스트림 그래프에서 처리 중인 타임스탬프 수를 추적하고 이 수가 제한에 도달하면 패킷을 삭제할 수 있습니다. 패킷이 업스트림으로 삭제되기 때문에 타임스탬프를 부분적으로 처리한 다음 중간 단계 사이에서 패킷을 삭제함으로써 발생하는 작업 낭비를 피할 수 있습니다.

이 Calculator 기반 접근 방식을 통해 그래프 작성자는 패킷을 삭제할 수 있는 위치를 제어할 수 있으며 리소스 제약 조건에 따라 그래프 동작을 유연히 조정하고 사용자 지정을 할 수 있습니다.

Packets

Calculator는 패킷들을 주고받으며 통신합니다. 일반적으로 각 입력 타임스탬프에서 각 입력 스트림을 따라 단일 패킷은이 전송됩니다. 패킷은 비디오의 단일 프레임 혹은 단일 정수 감지 개수와 같은 모든 종류의 데이터가 포함될 수 있습니다.

Creating a packet

패킷은 일반적으로 packet.h에 있는 mediapipe::MakePacket<T>() 또는 mediapipe::Adopt()를 통해 생성합니다.

Packet p = MakePacket<MyDataClass>("constructor_argument");
Packet p2 = p.At(Timestamp::PostStream());

또는

auto data = absl::make_unique<MyDataCalss>("constructor_argument");
Pakcet p = Adopt(data.release()).At(Timestamp:;PostStream());

Packet::Get<T>()를 사용해 패킷 내 데이터에 액세스할 수 있습니다.

Graphs

GraphConfig

GraphConfig는 Meidapipe 그래프의 토폴로지(아마 네트워크 맥락에서의 토폴로지인 듯. 노드들과 이에 연결된 회선들을 포함한 네트워크의 배열이나 구성을 개념적인 그림으로 표현한 것을 토폴로지라고 한다고 한다.)와 기능을 설명하는 명세서입니다. 명세서에서 그래프의 노드는 특정 Calcualtor의 인스턴스를 나타냅니다. 노드 유형, 입출력과 같은 필요한 모든 구성은 명세서에 설명되어야 합니다. 노드에 대한 설명에는 동기화에서 논의된 노드별 옵션, 입력 정책 및 실행 프로그램과 같은 여러 선택적 필드가 포함될 수 있습니다.
GraphConfig에는 그래프 실행기 구성, 스레드 수 및 입력 스트림 최대 대기열 크기와 같은 전역 그래프 수준 설정을 구성하기 위한 몇 가지 다른 필드가 있습니다. 여러 그래프 수준 설정은 다양한 플랫폼에서 그래프의 성능을 조정하는 데 유용합니다. 예를 들어 모바일에서 무거운 모델 추론 Calculator를 별도 실행기에 연결하면 스레드 지역성이 가능하여 실시간 응용 프로그램 성능을 향상시킬 수 있습니다.

ex)

input_stream : "in"
node {
  calculator : "PassThroughCalculator"
  input_stream : "in"
  output_stream : "out1"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out1"
  output_stream : "out2"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out2"
  output_stream : "out3"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out3"
  output_stream : "out4"
}

Subgraph

CalculatorGraphConfig를 하위 모듈로 모듈화하고 perception 솔루션 재사용을 지원하기 위해 MediaPipe 그래프를 Subgraph로 정의할 수 있습니다. 하위 그래프 공개 인터페이스는 Calculator의 공개 인터페이스와 유사한 일련의 입력 및 출력 스트림으로 구성됩니다. 그러면 하위 그래프가 Calcualtor인 것처럼 CalculatorGraphConfig에 포함될 수 있습니다. CalculatorGraphConfig에서 MediaPipe 그래프가 로드되면 각 하위 그래프 노드가 해당 Calculator 그래프로 대체됩니다. 결과적으로 하위 그래프 의미 및 성능은 해당 Calculator 그래프와 동일합니다.

ex)

  1. 하위 그래프를 정의합니다.
type : "TwoPassThroughSubgraph"
input_stream : "out1"
output_stream : "out3"

node {
  calculator : "PassThroughCalculator"
  input_stream : "out1"
  output_stream : "out2"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out2"
  output_stream : "out3"
}

하위 그래프 공개 인터페이스는 다음으로 구성되어 있습니다.

  • Graph input streams
  • Graph output streams
  • Graph input side packets
  • Graph output side packets
  1. BUILD 규칙 mediapipe_simple_subgraph를 통해 하위 그래프를 등록합니다. 매개변수 register_as는 새 하위 그래프의 구성요소 이름을 정의합니다.
mediapipe_simple_subgraph(
  name = "twopassthrough_subgraph",
  graph = "twopassthrough_subgraph.pbtxt",
  register_as = "TwoPassThroughSubgraph",
  deps = [
      "//mediapipe/calculators/core:pass_through_calculator",
      "//mediapipe/framework:calculator_graph",
  ],
)
  1. 하위 그래프를 메인 그래프에 사용합니다.

input_stream "in"
node {
  calculator : "PassThroughCalculator"
  input_stream : "in"
  output_stream : "out1"
}
node {
  calculator : "TwoPassThroughSubgraph"
  input_stream : "out1"
  output_stream : "out3"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out3"
  output_stream : "out4"
}

주기

기본적으로 MediaPipe는 Calcualtor 그래프가 비주기적이어야 하고 그래프 주기를 오류로 처리해야 합니다. 그래프에 주기가 있어야 하는 경우 그래프 구성에서 주기에 주석을 달아야 합니다. 이 섹션에서는 이를 어떻게 하는 지에 대해 설명합니다.

(참고 : 이는 변경될 수 있는 사항입니다)

샘플 코드로 mediapipe/framework/calculator_graph_test.ccCalculatorGraphTest.Cycle 단위 테스트를 사용하세요. 아래는 테스트의 순환 그래프입니다. adder의 sum 출력은 정수 source calculator에서 생성된 정수 합입니다.

calculator_graph_test.cc 예시 순환 그래프

Back Edge Annotation

각 주기 edge는 back edge로 주석 처리되어야 합니다. 이렇게 하면 back edges를 모두 제거한 후 MediaPipe의 토폴로지 정렬이 작동합니다.
일반적으로 back edges를 선택하는 방법엔 여러 가지가 있습니다. back edges로 표시된 edges는 up streams로 간주되는 노드와 down streams로 간주되는 노드에 영향을 미치며, 이는 MediaPipe가 노드에 할당하는 우선 순위에 영향을 줍니다.
예를 들어, CalculatorGraphTest.Cycle 테스트는 old_sum edge를 back edge로 표시하므로 Delay 노드는 Adder 노드의 다운스트림 노드로 간주되고 더 높은 우선 순위가 부여됩니다. 또는 delay node에 대한 sum 입력을 back edges로 표시할 수 있습니다. 이 경우 delay node는 Adder 노드의 업스트림 노드로 간주되고 더 낮은 우선 순위가 부여됩니다.

Initial Packet

정수 소스의 첫 번째 정수가 도착할 때 Adder Calculator를 실행할 수 있으려면 Adder에 대한 old_sum 입력 스트림에서 값이 0이고 타임스탬프가 동일한 초기 패킷이 필요합니다. 이 초기 패킷은 Open() 메소드의 delay Calculator에 의해 출력되어야 합니다.

Delay in a Loop

각 루프는 이전 sum 출력을 다음 정수 입력과 정렬하기 위해 지연을 발생시켜야 합니다. 이것 또한 delay node에 의해 수행됩니다. 따라서 delay node는 정수 source Calculator의 타임스탬프에 대해 다음을 알아야 합니다.

  • 첫 번째 출력 타임스탬프
  • 연속 출력 간 타임스탬프 delta

패킷 순서만 신경쓰고 패킷 타임스탬프는 무시하는 대체 스케줄링 정책을 추가해 불편함을 없앨 계획입니다. (추후 기능이 추가된다는 의미인 듯)

Early Termination of a Calculator When One Input Stream is Done

기본적으로 MeidaPipe는 모든 입력 스트림이 완료되면 source가 아닌 Calculator의 Close() 메소드를 호출합니다. 그러나 예제 그래프에서 정수 source가 완료되는 즉시 Adder 노드를 중지하려 합니다. 이는 대체 입력 스트림 핸들러인 EarlyCloseInputStreamHandler로 Adder 노드를 구성하여 수행됩니다.

Relavant Source Code

DELAY CALCULATOR

초기 패킷을 출력하는 Open() 코드와 입력 패킷에 (단위) 지연을 추가하는 Process() 코드에 유의하십시오. 위에서 언급했듯 이 delay node는 출력 스트림이 패킷 타임스탬프가 0, 1, 2, 3, ...인 입력 스트림과 함께 사용된다 가정합니다.

class UnitDelayCalculator : public Calculator {
  public : 
    static absl::Status FillExpectations(
      const CalcualtorOptions& extendable_options, PacketTypeSet* inputs,
      PacketTypeSet* outputs, PacketTypeSet* input_side_packets) {
    inputs->Index(0)->Set<int>("An Integer.");
    outputs->Index(0)->Set<int>("The inputdelayed by one time unit.");
    return absl::OkStatus();
  }

  absl::Status Open() final {
    Output()->ADD(new int(0), Timestamp(0));
    return absl::OkStatus();
  }

  absl::Status Process() final{
    const Packet& packet = Input()->Value();
    Output()->AddPacket(packet.At(pakcet.Timestamp().NextAllowedInStream()));
    return absl::OkStatus();
  }
};

GRAPH CONFIG

back_edge주석과 대체 input_stream_handler에 유의하십시오.

node {
  calculator : "GlobalCountSourceCalculator"
  input_side_packet : "global_counter"
  output_stream : "integers"
}
node {
  calculator : "IntAdderCalculator"
  input_stream : "integers"
  input_stream : "old_sum"
  input_stream_info: {
    tag_index : ":1"
    back_edge : true
  }
  output_stream : "sum"
  input_stream_handler {
    input_steram_handler : "EarlyCloseInputStreamHandler"
  }
}
node {
  calculator : "UnitDelayCalculator"
  input_stream : "sum"
  output_stream : "old_sum"
}

참고 : https://google.github.io/mediapipe/framework_concepts/graphs.html

Calculators

각 Calculators는 그래프의 노드입니다. 대부분 그래프는 calculators 안에서 실행됩니다. calculator는 0개 이상의 input streams 그리고 / 혹은 side packets들을 받고, 0개 이상의 output streams 그리고 / 혹은 side packets들을 생성합니다.


CalculatorBase

CalculatorBase 클래스의 하위 클래스를 정의하고 여러 메소드를 구현하고 새 하위 클래스를 Mediapipe에 등록해 Calculator를 만듭니다. 새 Calculator는 최소 하위 4개의 메소드는 구현되어야 합니다.

  • GetContract()
    • Calculator 제작자는 원하는 input과 output 타입을 GetContract()에 특정할 수 있습니다. 그래프가 초기화될 때 프레임워크는 연결된 입출력 패킷 유형이 GetContract()에서 지정한 정보와 일치하는지 확인하기 위해 정적 메소드를 호출합니다.
  • Open()
    • 그래프가 시작된 후 프레임워크는 Open()을 호출합니다. 이 시점에서 입력 측 패킷을 Calculator에서 사용 가능합니다.Open()은 node configuration operations을 해석하고 per-graph-run 상태를 준비합니다. 또한 이 함수는 Calculator 출력에 패킷을 쓸 수 있습니다. Open() 중 에러 발생 시 그래프 실행을 중지할 수 있습니다.
  • Process()
    • framework는 적어도 하나의 입력 스트림에 사용 가능한 패킷이 있을 때마다 Process()를 반복적으로 호출합니다. framework는 기본적으로 모든 입력이 동일한 타임스탬프를 갖도록 보장합니다. 병렬 실행이 활성화된 경우 여러 Process()를 동시에 호출할 수 있습니다. Process() 중 에러 발생 시 framework는 Close()를 호출하고 그래프 실행을 중단시킵니다.
  • Close()
    • Process()의 모든 호출이 중단되거나 모든 입력 스트림들이 닫히면 framework는 Close()를 호출합니다. 이 함수는 항상 Open()이 호출되어 성공한 경우와 오류로 인해 그래프 실행이 종료된 경우에 항상 호출됩니다. Close() 동안 입력 스트림을 통해 사용할 수 있는 입력은 없으나 여전히 입력 측 패킷에 접근할 수 있어 출력을 사용할 수 있습니다. Close()가 반환된 후 Calculator는 죽은 노드로 간주되어야 합니다. Calculator는 그래프 실행이 완료되는 즉시 소멸됩니다.
class CalculatorBase {
  public:
    static absl::Status GetContract(CalculatorContract* cc);

    virtual absl::Status Open(CalculatorContext* cc){
      return absl::OkStatus();
    }

    virtual absl::Status Process(CalculatorContext* cc) = 0;

    virtual absl::Status Close(CalculatorContext* cc) {
      return absl::OkStatus();
    }
};

Life of a calculator

Mediapipe 그래프가 초기화되는 동안 frameworks는 GetContract() static method를 호출하여 예상되는 패킷 종류를 결정합니다.

framework는 각 그래프가 실행되는 데에 대한 전체 Calculator를 구축하고 파괴합니다. 그래프 실행에서 일정하게 유지되는 비싸거나 큰 개체는 입력 측 패킷으로 제공되어야 후속 실행에서 계산이 반복되지 않습니다.

초기화 이후 각 그래프가 실행될 때 다음 시퀀스가 발생합니다. (GetContract()는 초기화되는 동안 바로 발생하므로 포함 X)

  • Open()
  • Process()
  • Close()

 

framework는 calculator 초기화를 위해 Open()을 호출합니다. Open()은 모든 옵션을 해석하고 Calculator의 그래프 per-graph-run 상태를 설정해야 합니다. Open()은 입력 측 패킷을 얻고 패킷을 Calculator 출력에 쓸 수 있습니다. 적절한 경우 SetOffset()을 호출해 입력 스트림의 잠재적 패킷 버퍼링을 줄여야 합니다.
Open()이나 Process() 중 에러가 발생한 경우 Calculaotr의 메소드를 더 이상 호출하지 않고 그래프 실행이 종료되고, Calculator가 파괴됩니다.


입력이 있는 Calculator의 경우 framework는 하나 이상의 입력에 사용 가능한 패킷이 있을 때마다 Process()를 호출합니다. framework는 입력이 모두 동일한 타임스탬프를 갖고 Process()를 호출할 때마다 타임스탬프가 증가하며 모든 패킷이 전달되도록 보장합니다. 결과적으로 일부 입력에는 Process()가 호출될 때 패킷이 없을 수 있습니다. 패킷이 누락된 입력은 타임스탬프가 없는 빈 패킷을 생성하는 것으로 나타납니다.


framework는 모든 Process()가 호출된 후 Close()를 호출합니다. 모든 입력이 소진되었으나 Close()는 입력 측 패킷에 엑세스할 수 있어 출력을 쓸 수 있습니다. Close()가 반환된 후 Calculator는 파괴됩니다.
입력이 없는 Calculator는 source라고 합니다. source Calculator는 Ok 상태를 반환하는 한 계속해서 Process()를 호출합니다. source Calculator는 중지 상태(ex - Mediapipe::tool::StatusStop)를 반환해 소진되었음을 나타냅니다.


Identifying inputs and outputs

Calculator에 대한 공용 인터페이스는 입력 스트림과 출력 스트림 세트로 구성됩니다. CalculatorGraphConfiguration에서 일부 Calculator 출력은 명명된 스트림을 사용해 다른 Calculator 입력에 연결됩니다. 스트림 이름은 일반적으로 소문자이고 입력 및 출력 태그는 일반적으로 대문자입니다.

 

ex)

node {
  calculator : "SomeAudioVideoCalculator"
  input_stream : "INPUT:combined_input"
  output_stream : "VIDEO:video_stream"
}

node {
  calculator : "SomeVideoCalculator"
  input_stream : "VIDEO_IN:video_stream"
  output_stream : "VIDEO_OUT:processed_video"
}

입력, 출력 스트림들은 인덱스 번호, 태그 이름, 태그 이름과 인덱스 번호의 결합으로 식별될 수 있습니다.

 

ex)

node {
  calculator : "SomeAudioVideoCalculator"
  input_stream : "combined_input"
  output_stream : "VIDEO:video_stream" # 태그 이름
  output_strema : "AUDIO:0:audio_left" # 태그 이름 + 인덱스 번호
  output_stream : "AUDIO:1:audio_right"
}

node  {
  calculator : "SomeAudioCalculator"
  input_stream : "audio_left"
  input_stream : "audio_right"
  output_stream : "audio_energy"
}

Calculator 구현에서 입출력은 태그 이름과 인덱스 번호로 식별됩니다.

 

ex)

  • 인덱스 번호 : 결합된 입력 스트림은 단순히 인덱스 0으로 식별됩니다.
  • 태그 이름 : 비디오 출력 스트림은 "VIDEO" 태그 이름으로 식별됩니다.
  • 태그 이름 + 인덱스 번호 : 출력 오디오 스트림은 "AUDIO" 태그 이름과 0, 1 인덱스 번호로 식별됩니다.
class SomeAudioVideoCalculator : public CalculatorBase {
  public :
    static absl::Status GetContract(CalcaultorContract* cc) {
      cc->Inputs().Index(0).SetAny();
      // SetAny()는 스트림 유형이 무엇이든 허용 가능함을 지정하는 데 사용됩니다.
      // 그렇다고 모든 패킷이 허용되는 건 아니고 여전히 특정 유형이 있습니다.
      // SetAny()는 명시적으로 유형을 스트림 유형으로 설정하는 것과 동일한 효과가 있습니다.
      cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
      cc->Outputs().Tag("AUDIO", 0).Set<Matrix>();
      cc->Outputs().Tag("AUDIO", 1).Set<Matrix>();
      return absl::OkStatus();
}

Processing

non-source node(소스 노드가 아닌 노드 = 입력이 있는 Calculator)에서 호출된 Process()는 모두 잘 진행되었음을 나타내기 위해 absl::OkStatus()를 반환하거나 오류를 알리기 위해 다른 상태코드를 반환해야 합니다.

만약 non-source node가 tool::StatusStop()를 반환하면 그래프가 일찍 취소된다는 신호입니다. 이러한 경우, 모든 source calculators와 그래프 입력 스트림들은 닫힙니다. 남은 패킷들은 그래프를 통해 전파됩니다.

 

그래프에 있는 source node는 absl::OkStatus()를 반환할 때까지 계속해서 Process()를 호출합니다. 더 이상 생성할 데이터가 없음을 나타내려면 tool::StatusStop()을 반환합니다. 다른 모든 상태는 오류가 발생했음을 나타냅니다.

 

Close()는 성공을 나타내는 absl::OkStatus를 반환합니다. 다른 상태들은 실패를 나타냅니다.

 

ex)

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Calculator options

Calculators는 입력 스트림 패킷, 입력 사이드 패킷, calculator 옵션을 통해 매개변수를 수락합니다. 지정된 경우 calculator 옵션은 CalculatorGraphConfiguration.Node 메시지의 node_options 필드에 리터럴 값(immutable한 데이터 값)으로 나타납니다.

 

ex)

node {
  calculator : "TFLiteInferenceCalculator"
  input_stream : "TENSORS : main_model_input"
  output_stream : "TENSORS : main_model_output"
  node_options: {
    [type.googleapis.com/mediapipe.TFLiteInferenceCalculatorOptions] {
      model_path : "mediapipe/models/detection_model.tflite"
    }
  }
}

모든 Calculator에서 calculator 옵션을 사용할 수 있는 것은 아닙니다.

 

ex)

옵션을 수락하기 위해 Calculator는 일반적으로 PacketClonerCalculatorOptions와 같은 옵션을 나타내는 새로운 protobuf 메시지 유형을 정의합니다. 그 다음 Calculator는 CalculatorBase::Open 메소드에서 해당 protobuf 메시지를 읽습니다. 또한 CalculatorBase::GetContract 함수 또는 CalculatorBase::Process 메소드에서도 가능합니다. 일반적으로 새로운 protobuf 메시지 유형은 ".proto" 파일과 mediapipe_proto_library() 빌드 규칙(bazel)을 사용해 protobuf 스키마로 정의됩니다.

mediapipe_proto_library(
  name = "packet_cloner_calculator_proto",
  srcs = ["packet_cloner_calculator.proto"],
  visibility = ["//visibility:public"],
  deps = [
    "//mediapipe/framework:calculator_options_proto",
    "//mediapipe/framework:calculator_proto",
  ],
)

Example calculator

이 섹션에서는 PacketClonerCalculator 구현에 대해 설명합니다. PacketClonerCalculator는 요청 시 가장 최근 입력 패킷 복사본을 생성합니다.

PacketClonerCalculator는 도착하는 데이터 패킷 타임스탬프가 완벽히 정렬되지 않은 경우 유용합니다. 마이크, 광센서 및 비디오 카메라가 있는 방이 있다고 가정합니다. 각 센서는 독립적으로 작동하며 간헐적으로 데이터를 수집합니다. 각 센서 출력은 다음과 같다 가정합니다.

  • microphone = 방 소리 크기(데시벨) Integer
  • light sensor = 방 밝기 Integer
  • video camera = 방의 RGB 이미지 프레임 ImageFrame

우리의 단순한 퍼셉트론 파이프라인은 마지막으로 수집된 마이크 음량 데이터 및 광센서 밝기 데이터와 동기화되는 카메라 이미지 프레임 데이터가 있을 때 언제든 이 3개 센서의 데이터를 처리하도록 디자인되었습니다. 이를 MediaPipe와 수행하기 위해 우리 퍼셉트론 파이프라인은 세 입력 스트림들을 가집니다.

  • room_mic_signal - 이 입력 스트림의 각 데이터 패킷은 타임스탬프가 있는 방에서 오디오가 얼마나 큰지 나타내는 정수 데이터입니다.
  • room_lightening_sensor - 이 입력 스트림의 각 데이터 패킷은 타임스탬프가 있는 방에서의 밝기를 나타내는 정수 데이터입니다.
  • room_video_tick_signal - 이 입력 스트림의 각 데이터 패킷은 타임스탬프가 있는 방의 카메라에서 수집된 비디오를 나타내는 비디오 데이터 이미지프레임입니다.

다음은 PacketClonerCalculator 구현입니다.
current_ = 가장 최근 입력 패킷을 보유하는 인스턴스 변수

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framwork.h"

namespace mediapipe {

// 마지막 스트림에서 수신된 모든 패킷에 대해 다른 모든 스트림에서 얻은 최신 패킷을 출력합니다.
// 따라서 마지막 스트림이 다른 스트림보다 더 높은 속도로 출력되면 
// 다른 스트림 패킷을 마지막 스트림과 일치하도록 효과적으로 복제합니다.
// Example config :
// node {
//   calculator : "PacketClonerCalculator"
//   input_stream : "first_base_signal"
//   input_stream : "second_base_signal"
//   output_stream : "cloned_first_base_signal"
//   output_stream : "cloned_second_base_signal"
// }
//

class PacketClonerCalculator : public CalculatorBase {
  public:
    static absl::Status GetContract(CalculatorContract* cc) {
      const int tick_signal_index = cc->Inputs().NumEntries() - 1;

      for (int i=0; i < tick_signal_index; ++i) {
        cc->Inputs().Index(i).SetAny();
        cc->Ouputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
      }
      cc->Inputs().Index(tick_signal_index).SetAny();
      return absl::OkStatus();
    }

    absl::Status Open(CalculatorContext* cc) final {
      tick_signal_index_ = cc->Inputs().NumEntries() - 1;
      current_.resize(tick_signal_index_);
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!cc->Inputs().Index(i).Header().IsEmpty()) {
          cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        }
      }
      return absl::OkStatus();
    }

    absl::Status Process(CalculatorContext* cc) final {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!cc->Inputs().Index(i).Value().IsEmpty()) {
          current_[i] = cc->Inputs().Index(i).Value();
        }
      }

      if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
        for (int i = 0; i < tick_signal_index_; ++i) {
          if (!current_[i].IsEmpty()) { 
            cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          } else {
            cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          }
        }
      }
      return absl::OkStatus();
    }

  private:
    std::vector<Packet> current_;
    int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}

일반적으로 calculator는 REGISTER_CALCULATOR(calculator_class_name)을 통해 등록하여 calculator를 알릴 수 있으므로 .cc 파일만 필요하고, .h는 필요하지 않습니다.
아래는 3개 입력 스트림, 1개의 노드(PacketClonerCalculator) 그리고 2개의 출력 스트림이 있는 Mediapipe 그래프입니다.

input_stream : "room_mic_signal"
input_stream : "room_lighting_sensor"
input_stream : "room_video_tick_signal"

node {
  calculator : "PacketClonerCalcualtor"
  input_stream : "room_mic_signal"
  input_stream : "room_lighting_sensor"
  input_stream : "room_video_tick_signal"
  output_stream : "cloned_room_mic_signal"
  output_stream : "cloned_lighting_sensor"
}

 

PacketClonerCalculator 작동 다이어그램

 

참고 : https://google.github.io/mediapipe/framework_concepts/calculators.html

+ Recent posts