Real-time Streams

Real-time timestmaps

Mediapipe calculator 그래프는 interactive 어플리케이션을 위한 비디오 또는 오디오 프레임의 스트림을 처리하는 데 자주 사용됩니다. Mediapipe 프레임워크는 연속적 패킷에 단조롭게 증가하는 타임스탬프를 할당하기만 하면 됩니다. 관례에 따라 실시간 Calculator와 그래프는 기록 시간 또는 각 프레임의 표시 시간을 타임스탬프로 사용하며, 각 타임스탬프는 1월 1일 이후 마이크로초를 나타냅니다.


Real-time shceduling

일반적으로 각 Calculator는 주어진 타임스탬프에 대한 모든 입력 패킷을 사용할 수 있게 되는 즉시 실행됩니다. 일반적으로 이것은 Calculator가 이전 프레임 처리를 완료하고 입력을 생성하는 각 Calculator가 현재 프레임 처리를 완료했을 때 발생합니다. Mediapipe 스케쥴러는 이러한 조건이 충족되는 즉시 각 Calculator를 호출합니다.


Timestamp bounds

Calculator가 주어진 타임스탬프에 대해 출력 패킷을 생성하지 않으면 대신 해당 타임스탬프에 대해 패킷이 새성되지 않음을 나타내는 Timestamp bounds를 출력할 수 있습니다. 이 표시는 해당 타임스탬프의 특정 스트림에 대해 패킷이 도착하지 않은 경우에도 다운스트림 Calculator가 해당 타임스탬프에서 실행되도록 하는데 필요합니다. 이는 각 계산기가 가능한 한 빨리 처리를 시작하는 것이 중요한 interactive 어플리케이션의 실시간 그래프에 특히 중요합니다.

그래프가 다음을 따름을 생각하십시오.

node {
  calculator : "A"
  input_stream : "alpha_in"
  output_stream : "alpha"
}
node {
  calculator : "B"
  input_stream : "alpha"
  input_stream : "foo"
  output_stream : "beta"
}

가정 : 타임스탬프 T에서 노드 A는 출력 스틀미 알파로 패킷을 보내지 않습니다. 노드 B는 타임스탬프 T에서 foo 패킷을 받고 타임스탬프 T에서 알파 패킷을 기다리고 있습니다. AB에게 알파에 대한 타임스탬프 바운드 업데이트를 보내지 않으면 B는 패킷이 알파로 도착할 때까지 계속 기다립니다. 한편, foo 패킷 큐는 T, T+1 등에서 패킷을 누적합니다.

스트림에서 패킷을 출력하기 위해 Calcualtor는 API 함수 CalculatorContext::OutputsOutputStream::Add를 사용합니다. 대신 스트림에 바인딩된 타임스탬프를 출력하기 위해 Calculator는 API 함수 CalculatorContext::OutputsCalcualtorContext::SetNextTimestmapBound를 사용할 수 있습니다. 지정된 경계는 지정된 출력 스트림 다음 패킷에 대해 허용되는 가장 낮은 타임스탬프입니다. 패킷이 출력되지 않으면 Calcualtor는 일반적으로 다음과 같은 작업을 수행합니다.

cc->Outputs().Tag("output_frame").SetNextTimestampBound(
  cc->InputTimestamp().NextAllowedInStream());

Timestamp::NextAllowedInStream 함수는 연속적인 타임스탬프를 반환합니다. ex) Timestamp(1).NextAllowedInStream() == Timestamp(2)


Propagating timestamp bound

실시간 그래프에 사용되는 Calculator는 다운스트림 Calculator를 즉시 예약할 수 있도록 입력 타임스탬프 경계를 기반으로 출력 타임스탬프 경게를 정의해야 합니다. 일반적 패턴은 Calculator가 입력 패킷과 동일한 타임스탬프를 가진 패킷을 출력하는 것입니다. 이 경우 Calcualtor::Process를 호출할 때마다 패킷을 출력하는 것만으로도 출력 타임스탬프 경계를 정의하기에 충분합니다.

그러나 Calculator는 출력 타임스탬프에 대해 이 일반적 패턴을 따를 필요가 없으며 단조롭게 증가하는 출력 타임스탬프를 선택하기만 하면 됩니다. 결과적으로 특정 Calculator는 타임스탬프 경계를 명시적으로 계산해야 합니다. MediaPipe는 각 계산기에 대한 적절한 타임스탬프 경계를 계산하기 위한 여러 도구를 제공합니다.

  1. SetNextTimestampBound() 는 출력 스트림에 대해 타임스탬프 경계 t+1을 지정하는 데 사용할 수 있습니다.
cc->Outputs.Tag("OUT").SetNextTimestampBound(t.NextAllowedInStream());

또는 타임스탬프가 t인 빈 패킷을 생성해 타임스탬프 경계 t+1을 지정할 수 있습니다.

cc->Outputs.Tag("OUT").Add(Packet(), t);

입력 스트림의 타임스탬프 경계는 패킷 또는 입력 스트림 빈 패킷으로 표시됩니다.

Timestamp bound = cc->Inputs().Tag("IN").Value().Timestamp();
  1. TimestampOffset() 입력 스트림에서 출력 스트림으로 바인딩된 타임스탬프를 자동으로 복사하기 위해 지정할 수 있습니다.
cc->SetTimestampOffset(0);
  1. ProcessTimestampBounds() 는 각각 새로운 정착된 타임스탬프 (현재 타임스탬프 경계 아래 새로운 최고 타임스태믚)에 대해 Calculator::Process를 호출하기 위해 지정할 수 있습니다. ProcessTimestampBoudns() 가 없으면 Calculator::Process는 하나 이상의 도착 패킷으로만 호출됩니다.
cc->SetProcessTimestampBounds(true);

이 설정을 사용하면 입력 타임스탬프만 업데이트된 경우에도 Calculator가 자체 타임스탬프 경계 계산 및 전파를 수행할 수 있습니다. TimestampOffset()의 효과를 복제하는 데 사용할 수 있지만 추가 요소를 고려하는 타임스탬프 경계를 계산하는 데 사용할 수도 있습니다.

예를 들어, SetTimestampOffset(0)을 복제하기 위해 Calculator는 다음을 수행할 수 있습니다.

absl::Status Open(CalculatorContext* cc) {
  cc->SetProcessTimestampBounds(true);
}

absl::Status Process(CalculatorContext* cc) {
  cc->Outputs.Tag("OUT").SetNextTimestampBound(
      cc->InputTimestamp().NextAllowedInStream());
}

Scheduling of Calculator::Open and Calculator::Close

Calculator::Open은 필요한 모든 사이드 패킷이 생성되면 호출됩니다. 입력 사이드 패킷은 인클로징 어플리케이션 또는 그래프 내부의 사이드 패킷 Calculator 에서 제공할 수 있습니다.
API의 CalculatorGraph::InitializeCalculatorGraph::StartRun을 사용해 그래프 외부에서 사이드 패킷을 지정할 수 있습니다.
CalculatorGraphConfig::OutputSidePacketsOutputSidePacket::Set을 사용해 그래프 내 Calculator에서 사이드 패킷을 지정할 수 있습니다.

CalculatorGraph::Close는 모든 입력 스트림이 닫히거나 타임스탬프 바운드 Timestamp::Done에 도달하여 Done이 되면 호출됩니다.

참고 : 그래프가 보류 중인 모든 Calculator 실행을 완료하고 완료되면 일부 스트림이 완료되기 전에 MediaPipe가 Calculator::Close에 대한 나머지 호출을 호출해 모든 Calculator가 최종 출력을 생성할 수 있도록 합니다.

TimestampOffset을 사용하면 Calculator::Close에 몇 가지 의미가 생깁니다. SetTimestampOffset(0)을 지정하는 Calculator는 모든 입력 스트림이 Timestamp::Done에 도달했을 때 모든 출력 스트림이 TimeStamp::Done에 도달했음을 신호로 표시하므로 더 이상의 출력이 불가능합니다. 이렇게 하면 이러한 Calculator가 Calculator::Close동안 패킷을 내보내는 것을 방지할 수 있습니다. Calculator가 Calculator::Close 중에 요약 패킷을 생성해야 하는 경우 Calculator::ProcessCalculator::Close 중에 최소한 하나의 타임스탬프를 사용할 수 있도록 타임스탬프 경계를 지정해야 합니다. 즉, 이러한 Calculator는 일반적으로 SetTimestampOffset(0)에 의존할 수 없으며 대신 SetNextTimestampBounds()를 사용해 명시적으로 타임스탬프 경계를 지정해야 합니다.

Synchronization

Scheduling mechanics

MeidaPipe 그래프에서의 데이터 처리는 CalculatorBase 하위 클래스들로 정의된 내부의 처리 노드들 내에서 발생합니다. 스케쥴링 시스템은 각 calculator를 실행해야 하는 시기를 결정합니다.

각 그래프는 적어도 하나의 스케쥴러 큐를 갖고 있습니다. 각 스케쥴러 큐는 정확히 하나의 실행기가 있습니다. 노드는 대기열(실행자)에 정적으로 할당됩니다. 기본적으로 하나의 큐가 있으며, 실행자는 시스템 기능에 따라 여러 스레드가 있는 스레드 풀(미리 스레드들을 생성해 놓고 작업 큐에 작업이 들어올 시 미리 생성한 스레드에 작업을 할당하는 디자인 패턴.)입니다.

각 노드에는 not ready, ready, running 스케쥴링 상태가 있습니다. 준비 기능은 노드를 실행할 준비가 되었는지 여부를 결정합니다. 이 함수는 그래프 초기화 시, 노드 실행이 완료될 때마다, 노드 입력 상태가 변경될 때마다 호출됩니다.

사용되는 준비 기능은 노드 유형에 따라 다릅니다. 스트림 입력이 없는 노드를 소스 노드라고 합니다. 소스 노드는 프레임워크에 더 이상 출력할 데이터가 없다고 말할 때까지 항상 실행할 준비가 되어 있으며 이 시점에서 닫힙니다.

소스가 아닌 노드는 처리할 입력이 있고 해당 입력이 노드 입력 정책에 의해 설정된 조건에 따라 유효한 입력 세트를 형성하는 경우 준비가 됩니다. 대부분 노드는 기본 입력 정책을 사용하나 일부 노드는 다른 정책을 지정합니다.

노드가 준비되면 해당 스케쥴러 큐에 태스크가 추가되는데 이것이 우선순위 큐입니다. 우선 순위 기능은 현재 고정되어 있으며 그래프 노드의 정적 속성과 토폴로지 정렬을 고려합니다. 예를 들어, 그래프 출력 쪽에 가까운 노드는 우선 순위가 더 높고 소스 노드는 우선 순위가 가장 낮습니다.

각 큐는 Calculator 코드를 호출해 작업을 실제로 실행하는 책임이 있는 실행자에 의해 제공됩니다. 다른 실행기를 제공하고 구성할 수 있습니다. 이는 실행 자원 사용을 사용자 정의하는 데 사용할 수 있습니다. 우선 순위가 낮은 스레드에서 특정 노드를 실행합니다.


Timestamp Synchronization

MediaPipe 그래프 실행은 분산(탈중앙화)되어 있습니다. 글로벌 시계가 없고, 다른 노드가 동시에 다른 타임스탬프 데이터를 처리할 수 있습니다. 이것은 파이프라이닝을 통해 더 높은 처리량을 허용하게 됩니다.

그러나 시간 정보는 많은 perception workflows에서 매우 중요합니다. 여러 입력 스트림을 수신하는 노드는 일반적으로 어떤 방식으로든 이를 조정해야 합니다. 예를 들어, object detector가 프레임으로부터 boundary 상자 리스트를 출력할 수 있으며, 이 정보는 원래 프레임과 함께 처리해야 하는 렌더링 노드에 제공될 수 있습니다.

그러므로 MediaPipe 프레임워크 주요 책임 중 하나는 노드에 대한 입력 동기화를 제공하는 것입니다. 프레임워크 mechanics 측면에서 타임스탬프의 주 역할은 동기화 키 역할을 하는 것입니다.

더 나아가 MediaPipe는 많은 시나리오(테스트, 시뮬레이션, batch processing 등)에서 중요한 결정론적 작업을 지원하도록 설계되었으며, 그래프 작성자는 실시간 제약 조건을 충족하는 데 필요한 결정론을 완화할 수 있습니다.

동기화 및 결정론의 두 가지 목표는 몇 가지 설계 선택 기초가 됩니다. 특히, 주어진 스트림으로 푸시된 패킷에는 단조롭게 증가하는 타임스탬프가 있어야 합니다.이는 많은 노드에 대해 유용한 가정일 뿐 아니라 동기화 논리에 의존하기도 합니다. 각 스트림에는 스트림 새 패킷에 허용되는 가장 낮은 타임스탬프인 타임스탬프 경계가 있습니다. 타임스탬프가 T인 패킷이 도착하면 경계는 자동으로 T+1로 진행해 단조로운 요구 사항을 반영합니다. 이를 통해 프레임워크는 타임스탬프가 T보다 낮은 패킷이 더 이상 도착하지 않는다는 것을 확실히 알 수 있습니다.


Input policies

동기화는 노드에서 지정한 입력 정책을 사용해 각 노드에서 로컬로 처리됩니다.

DefaultInputStreamHandler에 의해 정의된 기본 입력 정책은 다음을 보장해 입력의 결정적 동기화를 제공합니다.

  • 동일한 타임스탬프를 가진 패킷이 여러 입력 스트림에 제공되면 실시간으로 도착 순서에 관계 없이 항상 함께 처리됩니다.
  • 입력 세트는 타임스탬프 오름차순으로 처리됩니다.
  • 패킷이 삭제되지 않으며 처리가 완전히 결정적입니다.
  • 노드는 위가 보장됨에 따라 가능한 한 빨리 데이터를 처리할 준비가 됩니다.

참고 : 이의 중요한 결과는 Calculator가 패킷을 출력할 때 항상 현재 입력 타임스탬프를 사용하는 경우 출력이 본질적으로 단조롭게 증가하는 타임스탬프 요구 사항을 준수한다는 것입니다.

경고 : 반면에 모든 스트림에 대해 입력 패킷을 항상 사용할 수 있단 보장은 없습니다.

작동 방식을 설명하려면 확정된 타임스탬프 정의를 소개해야 합니다. 우리는 스트림 타임스탬프가 타임스탬프 경계보다 낮으면 확정된다고 말합니다. 즉, 해당 타임스탬프 입력 상태가 취소 불가능하게 알려지면 타임스탬프가 스트림에 대해 결정됩니다. 즉, 패킷이 있거나 해당 타임스탬프가 있는 패킷이 도착하지 않을 것이라는 확신이 있습니다.

참고 : 이러한 이유로 MediaPipe는 스트림 생산자가 명시적으로 마지막 패킷이 의미하는 것보다 더 멀리, 즉, 더 엄격한 경계를 제공하기 위해 더 멀리 타임스탬프 경계를 진행하도록 허용합니다. 이를 통해 다운스트림 노드가 입력을 더 빨리 해결할 수 있습니다.

타임스탬프는 각 스트림에 대해 정산되는 경우 여러 스트림에 걸쳐 정산됩니다. 또한 타임스탬프가 확정되면 이전 모든 타임스탬프도 확정된단 의미입니다. 따라서 확정된 타임스탬프는 오름차순으로 결정적으로 처리될 수 있습니다.

이 정의가 주어지면 기본 입력 정책이 있는 Calculator는 모든 입력 스트림에 걸쳐 정산되고, 적어도 하나의 입력 스트림에 패킷을 포함하는 타임스탬프가 있는 경우 준비가 된 것입니다. 입력 정책은 확정된 타임스탬프에 대해 사용 가능한 모든 패킷을 계산기에 대한 단일 입력 세트로 제공합니다.

이 결정적 동작 결과 중 하나는 여러 입력 스트림이 있는 노드의 경우 타임스탬프가 해결될 때까지 이론적으로 무제한 대기가 있을 수 있으며 그동안 무제한 수의 패킷이 버퍼링될 수 있다는 것입니다.

따라서 사용자 지정 입력 정책도 제공합니다. 예를 들어 SyncSetInputStreamHandler에 의해 정의된 다른 동기화 세트의 입력을 분할하거나, 동기화를 모두 피하고 ImmediateInputStreamHandler에 의해 정의된 입력이 도착하는 즉시 처리합니다.


Flow control

두 가지 main flow control mechanisms이 있습니다.

  1. backpressure mechanism

스트림에 버퍼링된 패킷이 CalculatorGraphConfig::max_queue_size에 의해 정의된 제한에 도달할 때 업스트림 노드 실행을 제한합니다. 이 메커니즘은 결정적 동작을 유지하고 필요할 때 구성된 제한을 완화하는 교착 상태 방지 시스템을 포함합니다.

  1. 실시간 제약 조건에 따라 패킷을 삭제할 수 있는 특수 노드 삽입

FlowLimiterCalculator에 정의되어 있으며, 일반적으로 사용자 지정 입력 정책에 사용됩니다. 예를 들어, 공통 패턴은 최종 출력에서 흐름 제어 노드로 루프백 연결과 함께 하위 그래프 입력에 흐름 제어 노드를 배치합니다. 따라서 흐름 제어 노드는 다운스트림 그래프에서 처리 중인 타임스탬프 수를 추적하고 이 수가 제한에 도달하면 패킷을 삭제할 수 있습니다. 패킷이 업스트림으로 삭제되기 때문에 타임스탬프를 부분적으로 처리한 다음 중간 단계 사이에서 패킷을 삭제함으로써 발생하는 작업 낭비를 피할 수 있습니다.

이 Calculator 기반 접근 방식을 통해 그래프 작성자는 패킷을 삭제할 수 있는 위치를 제어할 수 있으며 리소스 제약 조건에 따라 그래프 동작을 유연히 조정하고 사용자 지정을 할 수 있습니다.

Packets

Calculator는 패킷들을 주고받으며 통신합니다. 일반적으로 각 입력 타임스탬프에서 각 입력 스트림을 따라 단일 패킷은이 전송됩니다. 패킷은 비디오의 단일 프레임 혹은 단일 정수 감지 개수와 같은 모든 종류의 데이터가 포함될 수 있습니다.

Creating a packet

패킷은 일반적으로 packet.h에 있는 mediapipe::MakePacket<T>() 또는 mediapipe::Adopt()를 통해 생성합니다.

Packet p = MakePacket<MyDataClass>("constructor_argument");
Packet p2 = p.At(Timestamp::PostStream());

또는

auto data = absl::make_unique<MyDataCalss>("constructor_argument");
Pakcet p = Adopt(data.release()).At(Timestamp:;PostStream());

Packet::Get<T>()를 사용해 패킷 내 데이터에 액세스할 수 있습니다.

Graphs

GraphConfig

GraphConfig는 Meidapipe 그래프의 토폴로지(아마 네트워크 맥락에서의 토폴로지인 듯. 노드들과 이에 연결된 회선들을 포함한 네트워크의 배열이나 구성을 개념적인 그림으로 표현한 것을 토폴로지라고 한다고 한다.)와 기능을 설명하는 명세서입니다. 명세서에서 그래프의 노드는 특정 Calcualtor의 인스턴스를 나타냅니다. 노드 유형, 입출력과 같은 필요한 모든 구성은 명세서에 설명되어야 합니다. 노드에 대한 설명에는 동기화에서 논의된 노드별 옵션, 입력 정책 및 실행 프로그램과 같은 여러 선택적 필드가 포함될 수 있습니다.
GraphConfig에는 그래프 실행기 구성, 스레드 수 및 입력 스트림 최대 대기열 크기와 같은 전역 그래프 수준 설정을 구성하기 위한 몇 가지 다른 필드가 있습니다. 여러 그래프 수준 설정은 다양한 플랫폼에서 그래프의 성능을 조정하는 데 유용합니다. 예를 들어 모바일에서 무거운 모델 추론 Calculator를 별도 실행기에 연결하면 스레드 지역성이 가능하여 실시간 응용 프로그램 성능을 향상시킬 수 있습니다.

ex)

input_stream : "in"
node {
  calculator : "PassThroughCalculator"
  input_stream : "in"
  output_stream : "out1"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out1"
  output_stream : "out2"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out2"
  output_stream : "out3"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out3"
  output_stream : "out4"
}

Subgraph

CalculatorGraphConfig를 하위 모듈로 모듈화하고 perception 솔루션 재사용을 지원하기 위해 MediaPipe 그래프를 Subgraph로 정의할 수 있습니다. 하위 그래프 공개 인터페이스는 Calculator의 공개 인터페이스와 유사한 일련의 입력 및 출력 스트림으로 구성됩니다. 그러면 하위 그래프가 Calcualtor인 것처럼 CalculatorGraphConfig에 포함될 수 있습니다. CalculatorGraphConfig에서 MediaPipe 그래프가 로드되면 각 하위 그래프 노드가 해당 Calculator 그래프로 대체됩니다. 결과적으로 하위 그래프 의미 및 성능은 해당 Calculator 그래프와 동일합니다.

ex)

  1. 하위 그래프를 정의합니다.
type : "TwoPassThroughSubgraph"
input_stream : "out1"
output_stream : "out3"

node {
  calculator : "PassThroughCalculator"
  input_stream : "out1"
  output_stream : "out2"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out2"
  output_stream : "out3"
}

하위 그래프 공개 인터페이스는 다음으로 구성되어 있습니다.

  • Graph input streams
  • Graph output streams
  • Graph input side packets
  • Graph output side packets
  1. BUILD 규칙 mediapipe_simple_subgraph를 통해 하위 그래프를 등록합니다. 매개변수 register_as는 새 하위 그래프의 구성요소 이름을 정의합니다.
mediapipe_simple_subgraph(
  name = "twopassthrough_subgraph",
  graph = "twopassthrough_subgraph.pbtxt",
  register_as = "TwoPassThroughSubgraph",
  deps = [
      "//mediapipe/calculators/core:pass_through_calculator",
      "//mediapipe/framework:calculator_graph",
  ],
)
  1. 하위 그래프를 메인 그래프에 사용합니다.

input_stream "in"
node {
  calculator : "PassThroughCalculator"
  input_stream : "in"
  output_stream : "out1"
}
node {
  calculator : "TwoPassThroughSubgraph"
  input_stream : "out1"
  output_stream : "out3"
}
node {
  calculator : "PassThroughCalculator"
  input_stream : "out3"
  output_stream : "out4"
}

주기

기본적으로 MediaPipe는 Calcualtor 그래프가 비주기적이어야 하고 그래프 주기를 오류로 처리해야 합니다. 그래프에 주기가 있어야 하는 경우 그래프 구성에서 주기에 주석을 달아야 합니다. 이 섹션에서는 이를 어떻게 하는 지에 대해 설명합니다.

(참고 : 이는 변경될 수 있는 사항입니다)

샘플 코드로 mediapipe/framework/calculator_graph_test.ccCalculatorGraphTest.Cycle 단위 테스트를 사용하세요. 아래는 테스트의 순환 그래프입니다. adder의 sum 출력은 정수 source calculator에서 생성된 정수 합입니다.

calculator_graph_test.cc 예시 순환 그래프

Back Edge Annotation

각 주기 edge는 back edge로 주석 처리되어야 합니다. 이렇게 하면 back edges를 모두 제거한 후 MediaPipe의 토폴로지 정렬이 작동합니다.
일반적으로 back edges를 선택하는 방법엔 여러 가지가 있습니다. back edges로 표시된 edges는 up streams로 간주되는 노드와 down streams로 간주되는 노드에 영향을 미치며, 이는 MediaPipe가 노드에 할당하는 우선 순위에 영향을 줍니다.
예를 들어, CalculatorGraphTest.Cycle 테스트는 old_sum edge를 back edge로 표시하므로 Delay 노드는 Adder 노드의 다운스트림 노드로 간주되고 더 높은 우선 순위가 부여됩니다. 또는 delay node에 대한 sum 입력을 back edges로 표시할 수 있습니다. 이 경우 delay node는 Adder 노드의 업스트림 노드로 간주되고 더 낮은 우선 순위가 부여됩니다.

Initial Packet

정수 소스의 첫 번째 정수가 도착할 때 Adder Calculator를 실행할 수 있으려면 Adder에 대한 old_sum 입력 스트림에서 값이 0이고 타임스탬프가 동일한 초기 패킷이 필요합니다. 이 초기 패킷은 Open() 메소드의 delay Calculator에 의해 출력되어야 합니다.

Delay in a Loop

각 루프는 이전 sum 출력을 다음 정수 입력과 정렬하기 위해 지연을 발생시켜야 합니다. 이것 또한 delay node에 의해 수행됩니다. 따라서 delay node는 정수 source Calculator의 타임스탬프에 대해 다음을 알아야 합니다.

  • 첫 번째 출력 타임스탬프
  • 연속 출력 간 타임스탬프 delta

패킷 순서만 신경쓰고 패킷 타임스탬프는 무시하는 대체 스케줄링 정책을 추가해 불편함을 없앨 계획입니다. (추후 기능이 추가된다는 의미인 듯)

Early Termination of a Calculator When One Input Stream is Done

기본적으로 MeidaPipe는 모든 입력 스트림이 완료되면 source가 아닌 Calculator의 Close() 메소드를 호출합니다. 그러나 예제 그래프에서 정수 source가 완료되는 즉시 Adder 노드를 중지하려 합니다. 이는 대체 입력 스트림 핸들러인 EarlyCloseInputStreamHandler로 Adder 노드를 구성하여 수행됩니다.

Relavant Source Code

DELAY CALCULATOR

초기 패킷을 출력하는 Open() 코드와 입력 패킷에 (단위) 지연을 추가하는 Process() 코드에 유의하십시오. 위에서 언급했듯 이 delay node는 출력 스트림이 패킷 타임스탬프가 0, 1, 2, 3, ...인 입력 스트림과 함께 사용된다 가정합니다.

class UnitDelayCalculator : public Calculator {
  public : 
    static absl::Status FillExpectations(
      const CalcualtorOptions& extendable_options, PacketTypeSet* inputs,
      PacketTypeSet* outputs, PacketTypeSet* input_side_packets) {
    inputs->Index(0)->Set<int>("An Integer.");
    outputs->Index(0)->Set<int>("The inputdelayed by one time unit.");
    return absl::OkStatus();
  }

  absl::Status Open() final {
    Output()->ADD(new int(0), Timestamp(0));
    return absl::OkStatus();
  }

  absl::Status Process() final{
    const Packet& packet = Input()->Value();
    Output()->AddPacket(packet.At(pakcet.Timestamp().NextAllowedInStream()));
    return absl::OkStatus();
  }
};

GRAPH CONFIG

back_edge주석과 대체 input_stream_handler에 유의하십시오.

node {
  calculator : "GlobalCountSourceCalculator"
  input_side_packet : "global_counter"
  output_stream : "integers"
}
node {
  calculator : "IntAdderCalculator"
  input_stream : "integers"
  input_stream : "old_sum"
  input_stream_info: {
    tag_index : ":1"
    back_edge : true
  }
  output_stream : "sum"
  input_stream_handler {
    input_steram_handler : "EarlyCloseInputStreamHandler"
  }
}
node {
  calculator : "UnitDelayCalculator"
  input_stream : "sum"
  output_stream : "old_sum"
}

참고 : https://google.github.io/mediapipe/framework_concepts/graphs.html

+ Recent posts