Calculators

각 Calculators는 그래프의 노드입니다. 대부분 그래프는 calculators 안에서 실행됩니다. calculator는 0개 이상의 input streams 그리고 / 혹은 side packets들을 받고, 0개 이상의 output streams 그리고 / 혹은 side packets들을 생성합니다.


CalculatorBase

CalculatorBase 클래스의 하위 클래스를 정의하고 여러 메소드를 구현하고 새 하위 클래스를 Mediapipe에 등록해 Calculator를 만듭니다. 새 Calculator는 최소 하위 4개의 메소드는 구현되어야 합니다.

  • GetContract()
    • Calculator 제작자는 원하는 input과 output 타입을 GetContract()에 특정할 수 있습니다. 그래프가 초기화될 때 프레임워크는 연결된 입출력 패킷 유형이 GetContract()에서 지정한 정보와 일치하는지 확인하기 위해 정적 메소드를 호출합니다.
  • Open()
    • 그래프가 시작된 후 프레임워크는 Open()을 호출합니다. 이 시점에서 입력 측 패킷을 Calculator에서 사용 가능합니다.Open()은 node configuration operations을 해석하고 per-graph-run 상태를 준비합니다. 또한 이 함수는 Calculator 출력에 패킷을 쓸 수 있습니다. Open() 중 에러 발생 시 그래프 실행을 중지할 수 있습니다.
  • Process()
    • framework는 적어도 하나의 입력 스트림에 사용 가능한 패킷이 있을 때마다 Process()를 반복적으로 호출합니다. framework는 기본적으로 모든 입력이 동일한 타임스탬프를 갖도록 보장합니다. 병렬 실행이 활성화된 경우 여러 Process()를 동시에 호출할 수 있습니다. Process() 중 에러 발생 시 framework는 Close()를 호출하고 그래프 실행을 중단시킵니다.
  • Close()
    • Process()의 모든 호출이 중단되거나 모든 입력 스트림들이 닫히면 framework는 Close()를 호출합니다. 이 함수는 항상 Open()이 호출되어 성공한 경우와 오류로 인해 그래프 실행이 종료된 경우에 항상 호출됩니다. Close() 동안 입력 스트림을 통해 사용할 수 있는 입력은 없으나 여전히 입력 측 패킷에 접근할 수 있어 출력을 사용할 수 있습니다. Close()가 반환된 후 Calculator는 죽은 노드로 간주되어야 합니다. Calculator는 그래프 실행이 완료되는 즉시 소멸됩니다.
class CalculatorBase {
  public:
    static absl::Status GetContract(CalculatorContract* cc);

    virtual absl::Status Open(CalculatorContext* cc){
      return absl::OkStatus();
    }

    virtual absl::Status Process(CalculatorContext* cc) = 0;

    virtual absl::Status Close(CalculatorContext* cc) {
      return absl::OkStatus();
    }
};

Life of a calculator

Mediapipe 그래프가 초기화되는 동안 frameworks는 GetContract() static method를 호출하여 예상되는 패킷 종류를 결정합니다.

framework는 각 그래프가 실행되는 데에 대한 전체 Calculator를 구축하고 파괴합니다. 그래프 실행에서 일정하게 유지되는 비싸거나 큰 개체는 입력 측 패킷으로 제공되어야 후속 실행에서 계산이 반복되지 않습니다.

초기화 이후 각 그래프가 실행될 때 다음 시퀀스가 발생합니다. (GetContract()는 초기화되는 동안 바로 발생하므로 포함 X)

  • Open()
  • Process()
  • Close()

 

framework는 calculator 초기화를 위해 Open()을 호출합니다. Open()은 모든 옵션을 해석하고 Calculator의 그래프 per-graph-run 상태를 설정해야 합니다. Open()은 입력 측 패킷을 얻고 패킷을 Calculator 출력에 쓸 수 있습니다. 적절한 경우 SetOffset()을 호출해 입력 스트림의 잠재적 패킷 버퍼링을 줄여야 합니다.
Open()이나 Process() 중 에러가 발생한 경우 Calculaotr의 메소드를 더 이상 호출하지 않고 그래프 실행이 종료되고, Calculator가 파괴됩니다.


입력이 있는 Calculator의 경우 framework는 하나 이상의 입력에 사용 가능한 패킷이 있을 때마다 Process()를 호출합니다. framework는 입력이 모두 동일한 타임스탬프를 갖고 Process()를 호출할 때마다 타임스탬프가 증가하며 모든 패킷이 전달되도록 보장합니다. 결과적으로 일부 입력에는 Process()가 호출될 때 패킷이 없을 수 있습니다. 패킷이 누락된 입력은 타임스탬프가 없는 빈 패킷을 생성하는 것으로 나타납니다.


framework는 모든 Process()가 호출된 후 Close()를 호출합니다. 모든 입력이 소진되었으나 Close()는 입력 측 패킷에 엑세스할 수 있어 출력을 쓸 수 있습니다. Close()가 반환된 후 Calculator는 파괴됩니다.
입력이 없는 Calculator는 source라고 합니다. source Calculator는 Ok 상태를 반환하는 한 계속해서 Process()를 호출합니다. source Calculator는 중지 상태(ex - Mediapipe::tool::StatusStop)를 반환해 소진되었음을 나타냅니다.


Identifying inputs and outputs

Calculator에 대한 공용 인터페이스는 입력 스트림과 출력 스트림 세트로 구성됩니다. CalculatorGraphConfiguration에서 일부 Calculator 출력은 명명된 스트림을 사용해 다른 Calculator 입력에 연결됩니다. 스트림 이름은 일반적으로 소문자이고 입력 및 출력 태그는 일반적으로 대문자입니다.

 

ex)

node {
  calculator : "SomeAudioVideoCalculator"
  input_stream : "INPUT:combined_input"
  output_stream : "VIDEO:video_stream"
}

node {
  calculator : "SomeVideoCalculator"
  input_stream : "VIDEO_IN:video_stream"
  output_stream : "VIDEO_OUT:processed_video"
}

입력, 출력 스트림들은 인덱스 번호, 태그 이름, 태그 이름과 인덱스 번호의 결합으로 식별될 수 있습니다.

 

ex)

node {
  calculator : "SomeAudioVideoCalculator"
  input_stream : "combined_input"
  output_stream : "VIDEO:video_stream" # 태그 이름
  output_strema : "AUDIO:0:audio_left" # 태그 이름 + 인덱스 번호
  output_stream : "AUDIO:1:audio_right"
}

node  {
  calculator : "SomeAudioCalculator"
  input_stream : "audio_left"
  input_stream : "audio_right"
  output_stream : "audio_energy"
}

Calculator 구현에서 입출력은 태그 이름과 인덱스 번호로 식별됩니다.

 

ex)

  • 인덱스 번호 : 결합된 입력 스트림은 단순히 인덱스 0으로 식별됩니다.
  • 태그 이름 : 비디오 출력 스트림은 "VIDEO" 태그 이름으로 식별됩니다.
  • 태그 이름 + 인덱스 번호 : 출력 오디오 스트림은 "AUDIO" 태그 이름과 0, 1 인덱스 번호로 식별됩니다.
class SomeAudioVideoCalculator : public CalculatorBase {
  public :
    static absl::Status GetContract(CalcaultorContract* cc) {
      cc->Inputs().Index(0).SetAny();
      // SetAny()는 스트림 유형이 무엇이든 허용 가능함을 지정하는 데 사용됩니다.
      // 그렇다고 모든 패킷이 허용되는 건 아니고 여전히 특정 유형이 있습니다.
      // SetAny()는 명시적으로 유형을 스트림 유형으로 설정하는 것과 동일한 효과가 있습니다.
      cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
      cc->Outputs().Tag("AUDIO", 0).Set<Matrix>();
      cc->Outputs().Tag("AUDIO", 1).Set<Matrix>();
      return absl::OkStatus();
}

Processing

non-source node(소스 노드가 아닌 노드 = 입력이 있는 Calculator)에서 호출된 Process()는 모두 잘 진행되었음을 나타내기 위해 absl::OkStatus()를 반환하거나 오류를 알리기 위해 다른 상태코드를 반환해야 합니다.

만약 non-source node가 tool::StatusStop()를 반환하면 그래프가 일찍 취소된다는 신호입니다. 이러한 경우, 모든 source calculators와 그래프 입력 스트림들은 닫힙니다. 남은 패킷들은 그래프를 통해 전파됩니다.

 

그래프에 있는 source node는 absl::OkStatus()를 반환할 때까지 계속해서 Process()를 호출합니다. 더 이상 생성할 데이터가 없음을 나타내려면 tool::StatusStop()을 반환합니다. 다른 모든 상태는 오류가 발생했음을 나타냅니다.

 

Close()는 성공을 나타내는 absl::OkStatus를 반환합니다. 다른 상태들은 실패를 나타냅니다.

 

ex)

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Calculator options

Calculators는 입력 스트림 패킷, 입력 사이드 패킷, calculator 옵션을 통해 매개변수를 수락합니다. 지정된 경우 calculator 옵션은 CalculatorGraphConfiguration.Node 메시지의 node_options 필드에 리터럴 값(immutable한 데이터 값)으로 나타납니다.

 

ex)

node {
  calculator : "TFLiteInferenceCalculator"
  input_stream : "TENSORS : main_model_input"
  output_stream : "TENSORS : main_model_output"
  node_options: {
    [type.googleapis.com/mediapipe.TFLiteInferenceCalculatorOptions] {
      model_path : "mediapipe/models/detection_model.tflite"
    }
  }
}

모든 Calculator에서 calculator 옵션을 사용할 수 있는 것은 아닙니다.

 

ex)

옵션을 수락하기 위해 Calculator는 일반적으로 PacketClonerCalculatorOptions와 같은 옵션을 나타내는 새로운 protobuf 메시지 유형을 정의합니다. 그 다음 Calculator는 CalculatorBase::Open 메소드에서 해당 protobuf 메시지를 읽습니다. 또한 CalculatorBase::GetContract 함수 또는 CalculatorBase::Process 메소드에서도 가능합니다. 일반적으로 새로운 protobuf 메시지 유형은 ".proto" 파일과 mediapipe_proto_library() 빌드 규칙(bazel)을 사용해 protobuf 스키마로 정의됩니다.

mediapipe_proto_library(
  name = "packet_cloner_calculator_proto",
  srcs = ["packet_cloner_calculator.proto"],
  visibility = ["//visibility:public"],
  deps = [
    "//mediapipe/framework:calculator_options_proto",
    "//mediapipe/framework:calculator_proto",
  ],
)

Example calculator

이 섹션에서는 PacketClonerCalculator 구현에 대해 설명합니다. PacketClonerCalculator는 요청 시 가장 최근 입력 패킷 복사본을 생성합니다.

PacketClonerCalculator는 도착하는 데이터 패킷 타임스탬프가 완벽히 정렬되지 않은 경우 유용합니다. 마이크, 광센서 및 비디오 카메라가 있는 방이 있다고 가정합니다. 각 센서는 독립적으로 작동하며 간헐적으로 데이터를 수집합니다. 각 센서 출력은 다음과 같다 가정합니다.

  • microphone = 방 소리 크기(데시벨) Integer
  • light sensor = 방 밝기 Integer
  • video camera = 방의 RGB 이미지 프레임 ImageFrame

우리의 단순한 퍼셉트론 파이프라인은 마지막으로 수집된 마이크 음량 데이터 및 광센서 밝기 데이터와 동기화되는 카메라 이미지 프레임 데이터가 있을 때 언제든 이 3개 센서의 데이터를 처리하도록 디자인되었습니다. 이를 MediaPipe와 수행하기 위해 우리 퍼셉트론 파이프라인은 세 입력 스트림들을 가집니다.

  • room_mic_signal - 이 입력 스트림의 각 데이터 패킷은 타임스탬프가 있는 방에서 오디오가 얼마나 큰지 나타내는 정수 데이터입니다.
  • room_lightening_sensor - 이 입력 스트림의 각 데이터 패킷은 타임스탬프가 있는 방에서의 밝기를 나타내는 정수 데이터입니다.
  • room_video_tick_signal - 이 입력 스트림의 각 데이터 패킷은 타임스탬프가 있는 방의 카메라에서 수집된 비디오를 나타내는 비디오 데이터 이미지프레임입니다.

다음은 PacketClonerCalculator 구현입니다.
current_ = 가장 최근 입력 패킷을 보유하는 인스턴스 변수

#include <vector>

#include "absl/strings/str_cat.h"
#include "mediapipe/framework/calculator_framwork.h"

namespace mediapipe {

// 마지막 스트림에서 수신된 모든 패킷에 대해 다른 모든 스트림에서 얻은 최신 패킷을 출력합니다.
// 따라서 마지막 스트림이 다른 스트림보다 더 높은 속도로 출력되면 
// 다른 스트림 패킷을 마지막 스트림과 일치하도록 효과적으로 복제합니다.
// Example config :
// node {
//   calculator : "PacketClonerCalculator"
//   input_stream : "first_base_signal"
//   input_stream : "second_base_signal"
//   output_stream : "cloned_first_base_signal"
//   output_stream : "cloned_second_base_signal"
// }
//

class PacketClonerCalculator : public CalculatorBase {
  public:
    static absl::Status GetContract(CalculatorContract* cc) {
      const int tick_signal_index = cc->Inputs().NumEntries() - 1;

      for (int i=0; i < tick_signal_index; ++i) {
        cc->Inputs().Index(i).SetAny();
        cc->Ouputs().Index(i).SetSameAs(&cc->Inputs().Index(i));
      }
      cc->Inputs().Index(tick_signal_index).SetAny();
      return absl::OkStatus();
    }

    absl::Status Open(CalculatorContext* cc) final {
      tick_signal_index_ = cc->Inputs().NumEntries() - 1;
      current_.resize(tick_signal_index_);
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!cc->Inputs().Index(i).Header().IsEmpty()) {
          cc->Outputs().Index(i).SetHeader(cc->Inputs().Index(i).Header());
        }
      }
      return absl::OkStatus();
    }

    absl::Status Process(CalculatorContext* cc) final {
      for (int i = 0; i < tick_signal_index_; ++i) {
        if (!cc->Inputs().Index(i).Value().IsEmpty()) {
          current_[i] = cc->Inputs().Index(i).Value();
        }
      }

      if (!cc->Inputs().Index(tick_signal_index_).Value().IsEmpty()) {
        for (int i = 0; i < tick_signal_index_; ++i) {
          if (!current_[i].IsEmpty()) { 
            cc->Outputs().Index(i).AddPacket(
              current_[i].At(cc->InputTimestamp()));
          } else {
            cc->Outputs().Index(i).SetNextTimestampBound(
              cc->InputTimestamp().NextAllowedInStream());
          }
        }
      }
      return absl::OkStatus();
    }

  private:
    std::vector<Packet> current_;
    int tick_signal_index_;
};

REGISTER_CALCULATOR(PacketClonerCalculator);
}

일반적으로 calculator는 REGISTER_CALCULATOR(calculator_class_name)을 통해 등록하여 calculator를 알릴 수 있으므로 .cc 파일만 필요하고, .h는 필요하지 않습니다.
아래는 3개 입력 스트림, 1개의 노드(PacketClonerCalculator) 그리고 2개의 출력 스트림이 있는 Mediapipe 그래프입니다.

input_stream : "room_mic_signal"
input_stream : "room_lighting_sensor"
input_stream : "room_video_tick_signal"

node {
  calculator : "PacketClonerCalcualtor"
  input_stream : "room_mic_signal"
  input_stream : "room_lighting_sensor"
  input_stream : "room_video_tick_signal"
  output_stream : "cloned_room_mic_signal"
  output_stream : "cloned_lighting_sensor"
}

 

PacketClonerCalculator 작동 다이어그램

 

참고 : https://google.github.io/mediapipe/framework_concepts/calculators.html

+ Recent posts