鱒身(Masu_mi)のブログ

知った事をメモする場所。

メモ: ZeroMQの入り口

通信の基本的な意味でZeroMQ(v4)を勉強した。

通信状況を解析する

通信内容を解析したい

wiresharkで解析するDissectorを書いてる人がいたので利用する。

通信量を追いたい

トラフィック量だけでいいのであればやってみた的な記事がコミュニティ配下にあったので参考にする。

リーディング

ZeroMQのトランスポートプロトコル(QUIC,PCC,SCTP,…)を追加したくなったと仮定してコードを読んだ。

TL;DR

手始めにzmq_bindを起点にTCPに重点を置いてコードを追った。 bindを成功させるには下をすれば良いと思われる所までは読めた(はず)。

  • プロトコルに対応付けるオブジェクトをown_tを継承して作成する
  • zmq::socket_base_t::check_protocol, zmq::socket_base_t::bindを書き換えてプロトコルを許可する
  • zmq::socket_base_t::bindの追加したコードでzmq::socket_base_t::add_endpointを呼び出す

続けてconnect, recv, send系を読んで条件を確認すればZeroMQのトランスポートレイヤ追加も夢じゃない!(はず..) recvくらいは読むかもだけど本格的に追うかは迷い。 あとIPv6周辺のコードが未だに苦手だと感じたので勉強しておきたい。

リーディングメモ

追いやすいところからコードは入ると良い。 API Referenceで公開されているAPIを確認する。 ØMQ - The Guideで使い方の流れを押さえておく。

流れは下な感じになる。

  1. contextを作り
  2. socketを作り
  3. bindしたりconnectする
  4. recvしたりsendしたりする

zmq_bind関数を追う

Cではzmq_bind関数によりソケットをトランスポートプロトコルと紐付ける。

include/zmq.h
423
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);

具体的には以下のようになっている。void *として受け取ったソケットを検査しbindメソッドを呼び出している。 バインドにはアドレスとして(char *addr_)が入る。

src/zmq.cpp
327
328
329
330
331
332
333
334
335
336
int zmq_bind (void *s_, const char *addr_)
{
  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
    errno = ENOTSOCK;
    return -1;
  }
  zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
  int result = s->bind (addr_);
  return result;
}

s->bindのコード箇所はsrc/socket_base.cppにある。

src/socket_base.cpp
496
497
498
499
int zmq::socket_base_t::bind (const char *addr_)
{
  // ...
}

socket_base_t::bindについてtcp に注目して大枠を確認する。 ZeroMQのソケットは配下に多くのプロトコルやエンドポイントを保持して1つとして扱えるためadd_endpoint()によりエンドポイント毎に管理していると想像した。

  1. 上位レイヤから渡されたアドレス(addr_)をaddressに格納する
  2. listner->set_address() の中、下位のTCPソケットを準備(open_socket()からlisten()まで)して# lisner->sで保持する
  3. add_endpoint()でリスナーをエンドポイントをキーにsocket_base_tのソケットに接続する
int zmq::socket_base_t::bind (const char *addr_)
{
  // ...
  std::string protocol;
  std::string address;
  // アドレスの事前処理: char * -> std::stringへの変換とパース
  // プロトコルがサポートされているか確認
  if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
    return -1;
  }
  // ...
  if (protocol == "tcp") {
    tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
      io_thread, this, options);
    // ...assertionとか

    // 以下の listner->set_address アドレスを解決してリッスンまでしている
    rc = listener->set_address (address.c_str ());
    // rc を用いたエラー処理
    listener->get_address (last_endpoint);
    add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
    options.connected = true;
    return 0;
  }
  // ...
}

add_endpointを確認するとlaunch_chidでソケットとリスナーを繋いでendpoints.insertで管理情報を登録していた。

986
987
988
989
990
991
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
{
  //  Activate the session. Make it a child of this socket.
  launch_child (endpoint_);
  endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
}

ちなみにset_address内で確保されるTCPソケットは以下のようなオプションが設定されていてbindしたいアドレスがTIME_WAITで残っていても大丈夫。

//...
s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
// ...
#ifdef ZMQ_HAVE_WINDOWS
// ...
# else
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
# endif
// ...