이건 따로 파는게 민망할 정도로 양이 없으나... 그래도 써야징!

 

Calculator는 패킷을 보내고 받는 방식으로 통신한다. 일반적으로 각 입력 타임스탬프에서 각 입력스트림을 따라 단일 패킷이 전송된다. 패킷에는 단일 비디오 프레임 또는 단일 integer 감지 횟수와 같은 모든 종류의 데이터가 포함될 수 있다.

 

Creating a packet

 

패킷들은 일반적으로 mediapipe::MakePacket<T>() 또는 mediapipe::Adopt()와 함께 생성된다.

 

// Create a packet containing some new data.
// 새 데이터를 포함한 패킷 생성
Packet p = MakePacket<MyDataClass>("constructor_argument");
// Make a new packet with the same data and a different timestamp.
// 다른 타임스탬프를 가진 똑같은 데이터의 패킷 생성
Packet p2 = p.At(Timestamp::PostStream());

또는

// Create some new data.
// 새 패킷 생성
auto data = absl::make_unique<MyDataClass>("constructor_argument");
// Create a packet to own the data.
// 데이터를 소유할 패킷 생성
Packet p = Adopt(data.release()).At(Timestamp::PostStream());

'그 외 공부 > 하루하루 깨달은 바' 카테고리의 다른 글

Mediapipe Graphs  (0) 2021.11.05
Mediapipe Calculators  (0) 2021.11.05

GraphConfig

 

GraphConfig는 MediaPipe 그래프 토폴로지(통신 네트워크에서 의미하는 맥락의 토폴로지같다. 노드들과 이에 연결된 회선들을 포함한 네트워크 배열이나 구성을 개념적 그림을 표현한 것을 의미한다고 한다.)와 기능을 설명하는 명세서이다. GraphConfig에서 그래프 노드는 특정 calculator의 인스턴스를 나타낸다. 노드의 유형, 입출력과 같은 필요한 모든 구성은 GraphConfig에 설명되어야 한다. 

GraphConfig에는 그래프 실행기 구성, 스레드 수 및 입력 스트림 최대 대기열 크기와 같은 전역 그래프 수준 설정을 구성하기 위한 몇 가지 다른 필드가 있다. 여러 그래프 수준 설정은 다양한 플랫폼(ex : 데스크탑이랑 모바일)에서 그래프 성능을 조정하는 데 유용하다. 예를 들어, 모바일에서 무거운 모델 추론 계산기를 별도 실행기에 연결하면 스레드 지역성이 가능해 실시간 어플 성능을 향상시킬 수 있다.

 

ex)

# This graph named main_pass_throughcals_nosubgraph.pbtxt contains 4
# passthrough calculators.
# 4개의 통과 계산기가 포함되어 있다.
input_stream: "in"
node {
    calculator: "PassThroughCalculator"
    input_stream: "in"
    output_stream: "out1"
}
node {
    calculator: "PassThroughCalculator"
    input_stream: "out1"
    output_stream: "out2"
}
node {
    calculator: "PassThroughCalculator"
    input_stream: "out2"
    output_stream: "out3"
}
node {
    calculator: "PassThroughCalculator"
    input_stream: "out3"
    output_stream: "out4"
}

Subgraph

 

CalculatorGraphConfig를 하위 모듈로 모듈화하고 perception solution 재사용을 지원하기 위해 MediaPipe 그래프를 하위 그래프로 정의할 수 있다. subgraph의 공개 인터페이스는 Calculator 공개 인터페이스와 유사한 일련의 입출력 스트림으로 구성된다. 그러면 subgraph가 Calculator인 것처럼 CalculatorGraphConfig에 포함될 수 있다. CalculatorGraphConfig에서 MediaPipe 그래프가 로드되면 각 subgraph 노드가 해당 Calculator 그래프로 대체된다. 결과적으로 subgraph 의미 및 성능은 해당 Calculator 그래프와 동일하다.

 

그래프 안의 그래프가 가능하다는 것 같다. 그래프든 Calculator든 모듈처럼 붙였다 뗐다가 가능한 것 같다.

 

ex)

 

1. subgraph

# This subgraph is defined in two_pass_through_subgraph.pbtxt
# and is registered as "TwoPassThroughSubgraph"

type: "TwoPassThroughSubgraph"
input_stream: "out1"
output_stream: "out3"

node {
    calculator: "PassThroughCalculator"
    input_stream: "out1"
    output_stream: "out2"
}
node {
    calculator: "PassThroughCalculator"
    input_stream: "out2"
    output_stream: "out3"
}

subgraph 공개 인터페이스는 다음과 같이 구성되어 있다.

  • Graph input streams
  • Graph output streams
  • Graph input side packets
  • Graph output side packets

2. BUILD 규칙 mediapipe_simple_subgraph를 사용해 subgraph를 등록한다. 매개변수 register_as는 새 subgraph의 구성 요소 이름을 정의한다.

# Small section of BUILD file for registering the "TwoPassThroughSubgraph"
# subgraph for use by main graph main_pass_throughcals.pbtxt

mediapipe_simple_subgraph(
    name = "twopassthrough_subgraph",
    graph = "twopassthrough_subgraph.pbtxt",
    register_as = "TwoPassThroughSubgraph",
    deps = [
            "//mediapipe/calculators/core:pass_through_calculator",
            "//mediapipe/framework:calculator_graph",
    ],
)
Use the subgraph in the main graph.

# This main graph is defined in main_pass_throughcals.pbtxt
# using subgraph called "TwoPassThroughSubgraph"

input_stream: "in"
node {
    calculator: "PassThroughCalculator"
    input_stream: "in"
    output_stream: "out1"
}
node {
    calculator: "TwoPassThroughSubgraph"
    input_stream: "out1"
    output_stream: "out3"
}
node {
    calculator: "PassThroughCalculator"
    input_stream: "out3"
    output_stream: "out4"
}

 

이걸 보니까 mediapipe 전부가 엄청 많은 subgraph 들로 이뤄져 있고 그걸 다 적당한 기능에 맞춰서 연결하는 거구나.. 싶다.

 

'그 외 공부 > 하루하루 깨달은 바' 카테고리의 다른 글

Mediapipe Packets  (0) 2021.11.05
Mediapipe Calculators  (0) 2021.11.05

Calculator

 

Calculator는 그래프의 노드이다. 그래프 실행 대부분은 Calculator 내에서 발생한다. (Calculator들이 모듈처럼 작동해서 그 모듈들을 붙여서 만드는 게 그래프라고 이해하면 될 듯하다.)

Calculator는 0개 0개 이상 입력 스트림 및/또는 사이드 패킷을 수신하고 0개 이상 출력 스트림 및/또는 사이드 패킷을 생성할 수 있다.

 

Calculator는 최소 다음 4가지 메서드를 구현해야 한다.

  • GetContract()
    • Calculator의 예상 입출력 유형을 지정할 수 있다. 초기화 시 프레임워크는 연결된 입출력 패킷 유형이 이 사양의 정보와 일치하는지 확인하기 위해 static 메서드를 호출한다.
  • Open()
    • 그래프가 시작될 때 framework는 open()함수를 호출한다. 이 시점에서 입력 side packet들을 사용할 수 있다. Open()은 노드 구성 작업을 해석하고 (그래프의 전체적 구성을 해석한다는 거인 듯) calculator의 per-graph-run 상태를 준비한다. 이 함수는 패킷들을 calculator의 출력들로 쓴(write)다. Open() 중 에러가 나면 그래프 실행이 중단된다.
  • Process()
    • 입력이 있는 Calculator의 경우 프레임워크는 하나 이상의 입력 stream에 사용 가능한 패킷이 있을 때마다 Process()를 반복적으로 호출한다. 프레임워크는 기본적으로 모든 입력이 동일한 타임스탬프를 갖도록 보장한다.(이건 잘 이해를 못하겠다.)
  • Close()
    • Process()에 대한 모든 호출이 완료된 후 또는 모든 입력 Stream이 닫히면 호출되는 함수이다. 이 함수는 Open()이 호출되어 성공한 경우와 오류로 인해 그래프 실행이 종료된 경우에도 호출된다. CLose()동안 입력 Stream을 통해 사용할 수 있는 입력은 없지만 여전히 입력 측 패킷에 접근 가능하여 출력을 쓸 수 있다. Close() 반환 후 Calculator는 죽은 노드로 간주되어야 한다. Calculator 객체는 그래프 실행이 완료되는 즉시 소멸된다.

 

CalculatorBase.h

class CalculatorBase {
 public:
  ...

  // The subclasses of CalculatorBase must implement GetContract.
  // CalculatorBase의 하위 클래스는 반드시 GetContract를 구현해야 한다.
  static absl::Status GetContract(CalculatorContract* cc);

  // Open is called before any Process() calls, on a freshly constructed
  // calculator.  Subclasses may override this method to perform necessary
  // setup, and possibly output Packets and/or set output streams' headers.
  // Open은 새로 생성된 Calculator에서 Process() 호출 전 호출되어야 한다.
  // 하위 클래스들은 필요한 설정을 수행하기 위해 이 메서드를 override할 수 있고
  // 가능하면 출력 패킷 및/또는 출력 스트림의 헤더를 설정한다.
  virtual absl::Status Open(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  // Processes the incoming inputs. May call the methods on cc to access
  // inputs and produce outputs.
  // 들어오는 입력을 처리한다. 
  //.cc 파일의 메서드를 호출해 입력에 액세스하고 출력을 생성할 수 있다.
  virtual absl::Status Process(CalculatorContext* cc) = 0;

  // Is called if Open() was called and succeeded.  Is called either
  // immediately after processing is complete or after a graph run has ended
  // (if an error occurred in the graph).  ...
  // Open()이 호출되고 성공하면 호출된다. 
  // 처리가 완료된 직후 또는 그래프 실행이 종료된 후(그래프에서 오류가 발생한 경우) 호출된다.
  virtual absl::Status Close(CalculatorContext* cc) {
    return absl::OkStatus();
  }

  ...
};

Calculator의 생애

 

MediaPipe graph 초기화동안 framework는 어떤 종류의 패킷들이 예상될지 결정하기 위해서 GetContract() static 메서드를 부른다.

프레임워크는 각 그래프 실행에 대해 전체 Calculator를 구성하고 파괴한다. 그래프 실행에서 일정하게 유지되는 비싸거나 큰 객체는 입력 측 패킷으로 제공되어야 후속 실행에서 계산이 반복되지 않는다.

초기화 후 그래프의 각 실행에 대해 다음 시퀀스가 발생한다.

  • Open()
  • Process() (반복됨)
  • Close()

프레임워크는 calculator를 초기화하기 위해 Open()을 호출한다. Open()은 모든 옵션을 해석하고 calculator의 per-graph-run state를 설정해야 한다. Open()은 입력 측 패킷을 획득하고 패킷을 Calculator 출력에 기록할 수 있다. 적절한 경우 SetOffset()을 호출해 입력 Stream의 잠재적 패킷 버퍼링을 줄여야 한다.

Open()이나 Process() 중 에러가 발생하면 Calculator의 메서드를 더 이상 호출하지 않고 그래프 실행이 종료되고 Calculator가 파괴된다.

 

입력이 있는 Calculator의 경우 프레임워크 하나 이상의 입력에 사용 가능한 패킷이 있을 때마다 Process()를 호출한다. 프레임워크는 입력이 모두 동일한 타임스탬프를 가지며 Process()를 호출할 때마다 타임 스탬프가 증가하며 모든 패킷이 전달되도록 보장한다. 결과적으로 일부 입력에는 process()가 호출될 때 패킷이 없을 수 있다. 패킷이 누락된 입력은 타임스탬프가 없는 빈 패킷을 생성하는 것으로 나타난다.

 

프레임워크는 Process()에 대한 모든 호출 후 Close()를 호출한다. 모든 입력이 소진되었지만 Close()는 입력 측 패킷에 엑세스 할 수 있으며 출력을 쓸 수 있다. Close() 반환 후에는 Calculator는 파괴된다.

 

입력이 없는 Calculator를 Source라고 한다. Source Calculator는 Ok 상태를 반환하는 한 계속해서 Process()를 호출한다. Source Calculator는 중지 상태를 반환해 소진되었음을 나타낸다. (입력값이 있는 Calculator같은 경우 더이상 들어오는 입력값이 없으면 Close()를 불러오면 되지만 입력값이 없으면 계속 Process()만을 호출하게 되어 따로 중단점을 걸어줘야 한다는 의미인 듯. while(1)에 따로 break를 걸어주는 것 같은 느낌)


입력값과 출력값 식별

 

Calculator에 대한 공용 인터페이스는 입력 스트림과 출력 스트림 세트로 구성된다. CalculatorGraphConfiguration에서 일부 Calculator 출력은 명명된 스트림을 사용해 다른 Calculator 입력에 연결된다. 스트림 이름은 일반적으로 소문자고, 입력 및 출력 태그는 일반적으로 대문자이다.

 

ex)

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "INPUT:combined_input"
  output_stream: "VIDEO:video_stream"
}
node {
  calculator: "SomeVideoCalculator"
  input_stream: "VIDEO_IN:video_stream"
  output_stream: "VIDEO_OUT:processed_video"
}

태그 이름이 VIDEO인 출력인 video_stream이라는 스트림을 사용하여 태그 이름이 VIDEO_IN인 입력에 연결된다.

 

입출력 스트림들은 index 번호, 태그 이름 또는 태그 이름과 index 번호 조합으로 식별할 수 있다. 

 

ex)

# Graph describing calculator SomeAudioVideoCalculator
node {
  calculator: "SomeAudioVideoCalculator"
  input_stream: "combined_input"
  output_stream: "VIDEO:video_stream"
  output_stream: "AUDIO:0:audio_left"
  output_stream: "AUDIO:1:audio_right"
}

node {
  calculator: "SomeAudioCalculator"
  input_stream: "audio_left"
  input_stream: "audio_right"
  output_stream: "audio_energy"
}

 

SomeAudioVideoCalculator는 태그로 비디오 출력을 식별하고 태그와 인덱스 조합으로 오디오 출력을 식별한다. 

VIDEO 태그가 있는 입력은 video_stream이라는 스트림에 연결된다. 태그 AUDIO 및 인덱스 0 및 1이 있는 출력은 audio_left 및 audio_right라는 스트림에 연결된다. SomeAudioCalculator는 색인으로만 오디오 입력을 식별한다.

// c++ Code snippet describing the SomeAudioVideoCalculator GetContract() method
class SomeAudioVideoCalculator : public CalculatorBase {
 public:
  static absl::Status GetContract(CalculatorContract* cc) {
    cc->Inputs().Index(0).SetAny();
    // SetAny() is used to specify that whatever the type of the
    // stream is, it's acceptable.  This does not mean that any
    // packet is acceptable.  Packets in the stream still have a
    // particular type.  SetAny() has the same effect as explicitly
    // setting the type to be the stream's type.
    // SetAny()는 스트림 유형이 무엇이든 허용 가능함을 지정하는 데 사용된다.
    // 그렇다고 모든 패킷이 허용되는 건 아니다.
    // 스트림의 패킷에는 여전히 특정 유형이 있다.
    // SetAny()는 명시적으로 유형을 스트림의 유형으로 설정하는 것과 동일한 효과가 있다.
    cc->Outputs().Tag("VIDEO").Set<ImageFrame>();
    cc->Outputs().Get("AUDIO", 0).Set<Matrix>();
    cc->Outputs().Get("AUDIO", 1).Set<Matrix>();
    return absl::OkStatus();
  }

Processing

 

non-source node에서 호출된 (입력값이 있는 Calculator) Process()는 모든 것이 잘 진행되었음을 나타내기 위해 absl::OkStatus()를 반환하거나 오류를 알리기 위해 다른 상태 코드를 반환해야 한다.

 

입력값이 있는 Calculator가 tool::StatusStop()을 반환하면 그래프가 일찍 취소되고 있음을 나타낸다. 이 경우 모든 Source Calculator와 그래프 입력 스트림이 닫힌다. (남은 패킷은 그래프를 통해 전파된다. 그냥 흐르던 건 계속 흘러간다는 의미일까?)

 

그래프 Source Calculator는 absl::OkStatus()를 반환하는 한 계속 Process()를 호출한다. 더 이상 생성할 데이터가 없음을 나타내려면 tool::StatusStop()을 반환한다. 다른 모든 상태는 오류가 발생했음을 나타낸다.

 

Close()는 성공을 나타내기 위해 absl::OkStatus()를 반환한다. 다른 모든 상태는 실패를 나타낸다.

 

다음은 기본적인 Process() 함수이다. Input() 메서드 (계산기에 단일 입력이 있는 경우에만 사용 가능)를 사용해 입력 데이터를 요청한다. 그런 다음 std::unique_ptr을 사용해 출력 패킷에 필요한 메모리를 할당하고 계산을 수행한다. 완료되면 출력 스트림에 추가할 때 포인터를 해제한다.

 

absl::Status MyCalculator::Process() {
  const Matrix& input = Input()->Get<Matrix>();
  std::unique_ptr<Matrix> output(new Matrix(input.rows(), input.cols()));
  // do your magic here.... <- 이건 뭘까
  //    output->row(n) =  ...
  Output()->Add(output.release(), InputTimestamp());
  return absl::OkStatus();
}

Calculator options

 

Calculator는 (1) 입력 스트림 패킷 (2) 입력 사이드 패킷 (3) Calculator 옵션을 통해 처리 매개변수를 받아들인다. 지정된 경우 Calculator 옵션은 CalculatorGraphConfiguration.Node 메시지의 node_options 필드에 리터럴 값(변수의 값이 변하지 않는 데이터)으로 나타난다.

 

ex)

node {
  calculator: "TfLiteInferenceCalculator"
  input_stream: "TENSORS:main_model_input"
  output_stream: "TENSORS:main_model_output"
  node_options: {
    [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
      model_path: "mediapipe/models/detection_model.tflite"
    }
  }
}

 

node_options 필드는 proto3 구문을 받아들인다. 또는 calculator 옵션들은 proto2 구문을 사용한 options 필드에 지정될 수 있다.

 

ex)

node {
  calculator: "TfLiteInferenceCalculator"
  input_stream: "TENSORS:main_model_input"
  output_stream: "TENSORS:main_model_output"
  node_options: {
    [type.googleapis.com/mediapipe.TfLiteInferenceCalculatorOptions] {
      model_path: "mediapipe/models/detection_model.tflite"
    }
  }
}

 

모든 Calculator가 calculator 옵션들을 받아들이는 건 아니다. 옵션을 받아들이기 위해 calculator는 보통 PacketClonerCalculatorOptions와 같은 옵션을 나타내는 새로운 protobuf 메시지 타입을 정의한다. 그런 다음 calculator는 CalculatorBase:;Open 메서드에서 해당 protobuf 메시지를 읽는다. 또한 CalculatorBase::GetContract 함수 또는 CalculatorBase::Process 메서드에서도 가능하다. (Close()빼고 다 되자네..) 일반적으로 새로운 protobuf 메시지 유형은 ".proto" 파일과 mediapipe_proto_library() 빌드 규칙을 사용해 protobuf 스키마로 정의된다.

 

ex)

mediapipe_proto_library(
      name = "packet_cloner_calculator_proto",
      srcs = ["packet_cloner_calculator.proto"],
      visibility = ["//visibility:public"],
      deps = [
          "//mediapipe/framework:calculator_options_proto",
          "//mediapipe/framework:calculator_proto",
      ],
  )

 

'그 외 공부 > 하루하루 깨달은 바' 카테고리의 다른 글

Mediapipe Packets  (0) 2021.11.05
Mediapipe Graphs  (0) 2021.11.05

Process - 별도의 프로세스에서 실행되는 작업을 의미.

 

  • parameters
    • group : 항상 None이어야 함. threading.Thread와의 호환성을 위해 존재
    • target : run() 메소드에 의해 호출될 callable 객체. defalut = None
    • name : 프로세스 이름
    • args : target 호출을 위한 인자 튜플
    • kwargs : target 호출을 위한 키워드 인자 딕셔너리
  • method
    • run() : 프로세스의 활동을 나타내는 메소드. 서브 클래스에서 재정의 가능. 표준 run() 메소드는 객체 생성자에 target 인자로 전달된 callable 객체를 호출하는데 args와 kwargs 인자를 각각 위치 인자와 키워드 인자로 사용
    • start() : 프로세스 활동 시작. 프로세스 객체 당 최대 한 번 호출되어야 함. 객체의 run() 메소드가 별도 프로세스에서 호출되도록 함
    • join([timeout]) : 선택적 인자 timeout이 None(default)인 경우, 메소드는 join() 메소드가 호출된 프로세스가 종료될 때까지 블록된다. timeout이 양수면 최대 timeout초동안 블록. 프로세스가 종료되거나 메소드가 시간 초과되면 None을 돌려주게 되므로 주의. 프로세스의 exitcode를 검사해 종료되었는지 확인.
      프로세스는 여러 번 조인할 수 있다.
      교착 상태를 유발할 수 있으므로 프로세스는 자신을 조인할 수 없다. 프로세스가 시작되기 전 프로세스에 조인하려고 하면 에러가 발생한다.
    • name : 프로세스 이름. 식별 목적으로만 사용.
    • is_alive() : 프로세스가 살았는지 죽었는지 반환
    • daemon : start() 호출 전 설정되어야 함. 초깃값은 생성 프로세스에서 상속. 프로세스가 종료될 때 모든 데몬 자식 프로세스를 강제 종료시키려 시도.
    • pid : 프로세스 ID  반환
    • exitcode : 자식 종료 코드
    • authkey : 프로세스 인증 키
    • terminate(), kill() : 프로세스 강제 종료
    • close() : 프로세스와 관련된 모든 자원 해제

 

Pool - 여러 입력 값에 걸쳐 함수 실행을 병렬 처리하고 입력 데이터를 프로세스에 분산시키는 방법 제공 (데이터 병렬 처리)

 

  • Parameters
    • proccesses : 사용할 작업자 프로세스 수. None이면 os.cpu_count()에 의해 반환되는 수가 사용됨
    • initializer : None이 아니면, 각 작업자 프로세스는 시작할 때 initializer(*initargs)를 호출
    • maxtasksperchild : 사용되지 않는 자원을 해제할 수 있도록, 작업 프로세스가 종료되고 새 작업 프로세스로 교체되기 전에 완료할 수 있는 작업 수.
    • context : 작업자 프로세스를 시작하는 데 사용되는 컨텍스트를 지정하는 데 사용할 수 있음.
  • method
    • apply(func[, args[, kwds]]) : 인자 args 및 키워드 인자 kwds를 사용해 func를 호출한다. 결과가 준비될 때까지 블록된다.
    • apply_async(func[, args[, kwds[. callback[, error_callback]]]]) : AsyncResult 객체를 반환하는 apply() 메소드 변형. callback이 지정되면 단일 인자를 받아들이는 callable이어야 한다. 결과가 준비되면 callback을 결과를 인자로 호출한다. 실패한 결과면 error_callback이 대신 적용된다. 콜백은 즉시 완료되어야 하며, 그렇지 않으면 결과를 처리하는 스레드가 블록된다.
    • map(func, iterable[, chunksize]) : map() 내장 함수의 병렬 버전. 하나의 iterable 인자만 지원. 여러 개의 인자를 지원하는 건 starmap(). 결과가 준비될 때까지 블록된다. 이 메소드는 iterable을 여러 묶음으로 잘라 별도 작업으로 프로세스 풀에 제출한다. 이 묶음의 크기는 chunksize를 양의 정수로 설정해 지정가능. 너무 긴 iterable은 높은 메모리 사용을 유발하므로 명시적 chunksize 옵션으로 imap()이나 imap_unordered()를 사용하는 걸 고려해야 한다.
    • map_async(func, iterable[, chunksize[, callback[, error_callback]]]) : AsyncResult 객체를 반환하는 map() 메소드 변형.매커니즘은 apply_async()와 같다.
    • close() : 더는 작업이 풀에 제출되지 않도록 한다. 모든 작업이 완료되면 작업자 프로세스가 종료됨
    • terminate() : 계류 중인 작업을 완료하지 않고 즉시 작업자 프로세스 중지. Pool 객체가 garbage 수집될 때 terminate()가 죽시 호출됨
    • join() : 작업자 프로세스가 종료될 때까지 기다린다. 호출 전 반드시 close()나 terminate()를 호출해야 한다.
  • 그 외
    • imap, imap_unordered, starmap, starmap_async, AsyncResult, etc...

 

Pipe & Queue - 여러 프로세스 사용 시, 프로세스 간 통신을 위해 메시지를 전달하기 위해 Pipe()를 사용할 수 있다.

 

  • multiprocessing.Pipe([duplex])
    • 파이프의 끝을 나타내는 Connection 객체 쌍 (conn1, conn2)를 반환.
    • duplex가 True(default)면 파이프는 양방향.
      False일 시, 단방향. conn1은 메시지 받는 데, conn2는 메시지 보내는 데에만 사용 가능
  • multiprocessing.Queue([maxsize])
    • 파이프와 몇 개의 록/세마포어를 사용해 구현된 프로세스 공유 큐 반환. 프로세스가 처음으로 항목을 큐에 넣으면 버퍼에서 파이프로 객체 전송 피더 스레드가 시작됨
    • method
      • qsize() - 큐 크기 리턴
      • empty() - 큐가 비어 있으면 True, 아니면 False
      • full() - 큐가 가득 차면 True, 아니면 False
        • 위의 세 메소드는 다중 스레딩 / 다중 프로세싱 특성을 타 신뢰할 수 없다.
      • put(obj[, block[, timeout]]) : obj를 큐에 넣는다. block이 True이고 timeout이 None(default)이면 빈 슬롯이 생길 때까지 필요한 경우 블록함. timeout이 양수인 경우, 최대 timeout초만큼 블록하고 그 시간 내 사용 가능 슬롯이 생기지 않으면 queue.Full 예외 발생. block이 False이고 빈슬롯을 즉시 사용 가능하면 항목을 넣으나 그렇지 않으면 queue.Full 예외 발생.(이 경우 timeout은 무시)
      • get([block[, timeout]]) : obj를 큐에서 반환. 매커니즘은 put과 비슷.
      • close() : 현재 프로세스가 이 큐에 더는 데이터를 넣지 않을 것을 나타냄.
      • join_thread() : 배경 스레드에 조인. close()가 호출된 후 사용 가능.
      • cancel_join_thread() : join_thread()의 블록 방지. 프로세스 종료 시 배경 스레드를 자동 조인하는 것을 막음.
    • etc
      • SimpleQueue(록이 걸린 Pipe에 가까움), JoinableQueue

 

 

 

+ Recent posts