Calculator

 

Calculator는 그래프의 노드이다. 그래프 실행 대부분은 Calculator 내에서 발생한다. (Calculator들이 모듈처럼 작동해서 그 모듈들을 붙여서 만드는 게 그래프라고 이해하면 될 듯하다.)

Calculator는 0개 0개 이상 입력 스트림 및/또는 사이드 패킷을 수신하고 0개 이상 출력 스트림 및/또는 사이드 패킷을 생성할 수 있다.

 

Calculator는 최소 다음 4가지 메서드를 구현해야 한다.

  • GetContract()
    • Calculator의 예상 입출력 유형을 지정할 수 있다. 초기화 시 프레임워크는 연결된 입출력 패킷 유형이 이 사양의 정보와 일치하는지 확인하기 위해 static 메서드를 호출한다.
  • Open()
    • 그래프가 시작될 때 framework는 open()함수를 호출한다. 이 시점에서 입력 side packet들을 사용할 수 있다. Open()은 노드 구성 작업을 해석하고 (그래프의 전체적 구성을 해석한다는 거인 듯) calculator의 per-graph-run 상태를 준비한다. 이 함수는 패킷들을 calculator의 출력들로 쓴(write)다. Open() 중 에러가 나면 그래프 실행이 중단된다.
  • Process()
    • 입력이 있는 Calculator의 경우 프레임워크는 하나 이상의 입력 stream에 사용 가능한 패킷이 있을 때마다 Process()를 반복적으로 호출한다. 프레임워크는 기본적으로 모든 입력이 동일한 타임스탬프를 갖도록 보장한다.(이건 잘 이해를 못하겠다.)
  • Close()
    • Process()에 대한 모든 호출이 완료된 후 또는 모든 입력 Stream이 닫히면 호출되는 함수이다. 이 함수는 Open()이 호출되어 성공한 경우와 오류로 인해 그래프 실행이 종료된 경우에도 호출된다. CLose()동안 입력 Stream을 통해 사용할 수 있는 입력은 없지만 여전히 입력 측 패킷에 접근 가능하여 출력을 쓸 수 있다. Close() 반환 후 Calculator는 죽은 노드로 간주되어야 한다. Calculator 객체는 그래프 실행이 완료되는 즉시 소멸된다.

 

CalculatorBase.h

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // CalculatorBase의 하위 클래스는 반드시 GetContract를 구현해야 한다.
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // Open은 새로 생성된 Calculator에서 Process() 호출 전 호출되어야 한다.
  // 하위 클래스들은 필요한 설정을 수행하기 위해 이 메서드를 override할 수 있고
  // 가능하면 출력 패킷 및/또는 출력 스트림의 헤더를 설정한다.
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // 들어오는 입력을 처리한다. 
  //.cc 파일의 메서드를 호출해 입력에 액세스하고 출력을 생성할 수 있다.
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  // Open()이 호출되고 성공하면 호출된다. 
  // 처리가 완료된 직후 또는 그래프 실행이 종료된 후(그래프에서 오류가 발생한 경우) 호출된다.
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

Calculator의 생애

 

MediaPipe graph 초기화동안 framework는 어떤 종류의 패킷들이 예상될지 결정하기 위해서 GetContract() static 메서드를 부른다.

프레임워크는 각 그래프 실행에 대해 전체 Calculator를 구성하고 파괴한다. 그래프 실행에서 일정하게 유지되는 비싸거나 큰 객체는 입력 측 패킷으로 제공되어야 후속 실행에서 계산이 반복되지 않는다.

초기화 후 그래프의 각 실행에 대해 다음 시퀀스가 발생한다.

  • Open()
  • Process() (반복됨)
  • Close()

프레임워크는 calculator를 초기화하기 위해 Open()을 호출한다. Open()은 모든 옵션을 해석하고 calculator의 per-graph-run state를 설정해야 한다. Open()은 입력 측 패킷을 획득하고 패킷을 Calculator 출력에 기록할 수 있다. 적절한 경우 SetOffset()을 호출해 입력 Stream의 잠재적 패킷 버퍼링을 줄여야 한다.

Open()이나 Process() 중 에러가 발생하면 Calculator의 메서드를 더 이상 호출하지 않고 그래프 실행이 종료되고 Calculator가 파괴된다.

 

입력이 있는 Calculator의 경우 프레임워크 하나 이상의 입력에 사용 가능한 패킷이 있을 때마다 Process()를 호출한다. 프레임워크는 입력이 모두 동일한 타임스탬프를 가지며 Process()를 호출할 때마다 타임 스탬프가 증가하며 모든 패킷이 전달되도록 보장한다. 결과적으로 일부 입력에는 process()가 호출될 때 패킷이 없을 수 있다. 패킷이 누락된 입력은 타임스탬프가 없는 빈 패킷을 생성하는 것으로 나타난다.

 

프레임워크는 Process()에 대한 모든 호출 후 Close()를 호출한다. 모든 입력이 소진되었지만 Close()는 입력 측 패킷에 엑세스 할 수 있으며 출력을 쓸 수 있다. Close() 반환 후에는 Calculator는 파괴된다.

 

입력이 없는 Calculator를 Source라고 한다. Source Calculator는 Ok 상태를 반환하는 한 계속해서 Process()를 호출한다. Source Calculator는 중지 상태를 반환해 소진되었음을 나타낸다. (입력값이 있는 Calculator같은 경우 더이상 들어오는 입력값이 없으면 Close()를 불러오면 되지만 입력값이 없으면 계속 Process()만을 호출하게 되어 따로 중단점을 걸어줘야 한다는 의미인 듯. while(1)에 따로 break를 걸어주는 것 같은 느낌)


입력값과 출력값 식별

 

Calculator에 대한 공용 인터페이스는 입력 스트림과 출력 스트림 세트로 구성된다. CalculatorGraphConfiguration에서 일부 Calculator 출력은 명명된 스트림을 사용해 다른 Calculator 입력에 연결된다. 스트림 이름은 일반적으로 소문자고, 입력 및 출력 태그는 일반적으로 대문자이다.

 

ex)

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

태그 이름이 VIDEO인 출력인 video_stream이라는 스트림을 사용하여 태그 이름이 VIDEO_IN인 입력에 연결된다.

 

입출력 스트림들은 index 번호, 태그 이름 또는 태그 이름과 index 번호 조합으로 식별할 수 있다. 

 

ex)

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

 

SomeAudioVideoCalculator는 태그로 비디오 출력을 식별하고 태그와 인덱스 조합으로 오디오 출력을 식별한다. 

VIDEO 태그가 있는 입력은 video_stream이라는 스트림에 연결된다. 태그 AUDIO 및 인덱스 0 및 1이 있는 출력은 audio_left 및 audio_right라는 스트림에 연결된다. SomeAudioCalculator는 색인으로만 오디오 입력을 식별한다.

// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    // SetAny()는 스트림 유형이 무엇이든 허용 가능함을 지정하는 데 사용된다.
    // 그렇다고 모든 패킷이 허용되는 건 아니다.
    // 스트림의 패킷에는 여전히 특정 유형이 있다.
    // SetAny()는 명시적으로 유형을 스트림의 유형으로 설정하는 것과 동일한 효과가 있다.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Processing

 

non-source node에서 호출된 (입력값이 있는 Calculator) Process()는 모든 것이 잘 진행되었음을 나타내기 위해 absl::OkStatus()를 반환하거나 오류를 알리기 위해 다른 상태 코드를 반환해야 한다.

 

입력값이 있는 Calculator가 tool::StatusStop()을 반환하면 그래프가 일찍 취소되고 있음을 나타낸다. 이 경우 모든 Source Calculator와 그래프 입력 스트림이 닫힌다. (남은 패킷은 그래프를 통해 전파된다. 그냥 흐르던 건 계속 흘러간다는 의미일까?)

 

그래프 Source Calculator는 absl::OkStatus()를 반환하는 한 계속 Process()를 호출한다. 더 이상 생성할 데이터가 없음을 나타내려면 tool::StatusStop()을 반환한다. 다른 모든 상태는 오류가 발생했음을 나타낸다.

 

Close()는 성공을 나타내기 위해 absl::OkStatus()를 반환한다. 다른 모든 상태는 실패를 나타낸다.

 

다음은 기본적인 Process() 함수이다. Input() 메서드 (계산기에 단일 입력이 있는 경우에만 사용 가능)를 사용해 입력 데이터를 요청한다. 그런 다음 std::unique_ptr을 사용해 출력 패킷에 필요한 메모리를 할당하고 계산을 수행한다. 완료되면 출력 스트림에 추가할 때 포인터를 해제한다.

 

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here.... <- 이건 뭘까
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Calculator options

 

Calculator는 (1) 입력 스트림 패킷 (2) 입력 사이드 패킷 (3) Calculator 옵션을 통해 처리 매개변수를 받아들인다. 지정된 경우 Calculator 옵션은 CalculatorGraphConfiguration.Node 메시지의 node_options 필드에 리터럴 값(변수의 값이 변하지 않는 데이터)으로 나타난다.

 

ex)

node {
  calculator: "TfLiteInferenceCalculator"
  input_stream: "TENSORS:main_model_input"
  output_stream: "TENSORS:main_model_output"
  node_options: {
    [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
      model_path: "mediapipe/models/detection_model.tflite"
    }
  }
}

 

node_options 필드는 proto3 구문을 받아들인다. 또는 calculator 옵션들은 proto2 구문을 사용한 options 필드에 지정될 수 있다.

 

ex)

node {
  calculator: "TfLiteInferenceCalculator"
  input_stream: "TENSORS:main_model_input"
  output_stream: "TENSORS:main_model_output"
  node_options: {
    [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
      model_path: "mediapipe/models/detection_model.tflite"
    }
  }
}

 

모든 Calculator가 calculator 옵션들을 받아들이는 건 아니다. 옵션을 받아들이기 위해 calculator는 보통 PacketClonerCalculatorOptions와 같은 옵션을 나타내는 새로운 protobuf 메시지 타입을 정의한다. 그런 다음 calculator는 CalculatorBase:;Open 메서드에서 해당 protobuf 메시지를 읽는다. 또한 CalculatorBase::GetContract 함수 또는 CalculatorBase::Process 메서드에서도 가능하다. (Close()빼고 다 되자네..) 일반적으로 새로운 protobuf 메시지 유형은 ".proto" 파일과 mediapipe_proto_library() 빌드 규칙을 사용해 protobuf 스키마로 정의된다.

 

ex)

mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

 

'그 외 공부 > 하루하루 깨달은 바' 카테고리의 다른 글

Mediapipe Packets  (0) 2021.11.05
Mediapipe Graphs  (0) 2021.11.05

+ Recent posts