メモ: ZeroMQの入り口

通信の基本的な意味で*ZeroMQ(v4)* を勉強した。

資料

下に目を通せば大丈夫そうな気配がある。

通信状況を解析する

通信内容を解析したい

wiresharkで解析するDissectorを書いてる人がいたので利用する。

通信量を追いたい

トラフィック量だけでいいのであればやってみた的な記事がコミュニティ配下にあったので参考にする。

リーディング

ZeroMQ のトランスポートプロトコル (QUIC,PCC,SCTP, …)を追加したくなったと仮定してコードを読んだ。

TL;DR

手始めにzmq_bind を起点にTCPに重点を置いてコードを追った。 bindを成功させるには下をすれば良いと思われる所までは読めた(はず)。

  • プロトコルに対応付けるオブジェクトをown_t を継承して作成する
  • zmq::socket_base_t::check_protocol, zmq::socket_base_t::bind を変更しプロトコルを許可する
  • zmq::socket_base_t::bind の追加したコードでzmq::socket_base_t::add_endpoint を呼び出す

続けてconnect, recv, send系を読んで条件を確認すればZeroMQのトランスポートレイヤ追加も夢じゃない!(はず..) recvくらいは読むかもだけど本格的に追うかは迷い。 あとIPv6周辺のコードが未だに苦手だと感じたので勉強しておきたい。

リーディングメモ

追いやすいところからコードは入ると良い。 API Referenceで公開されているAPIを確認する。 ØMQ - The Guideで使い方の流れを押さえておく。

流れは下な感じになる。

  1. contextを作り
  2. socketを作り
  3. bindしたりconnectする
  4. recvしたりsendしたりする

zmq_bind関数を追う

Cではzmq_bind 関数によりソケットをトランスポートプロトコルと紐付ける。

include/zmq.h

1
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);

具体的には以下のようになっている。void * として受け取ったソケットを検査しbind メソッドを呼び出している。 バインドにはアドレスとして(char *addr_)が入る。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
int zmq_bind (void *s_, const char *addr_)
{
  if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
    errno = ENOTSOCK;
    return -1;
  }
  zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
  int result = s->bind (addr_);
  return result;
}

s->bindのコード箇所はsrc/socket_base.cpp にある。

socket_base_t::bind についてtcp に注目して大枠を確認する。 ZeroMQ のソケットは配下に多くのプロトコルやエンドポイントを保持して1つとして扱えるためadd_endpoint() によりエンドポイント毎に管理していると想像した。

  1. 上位レイヤから渡されたアドレス*(addr_*)をaddressに格納する
  2. listner->set_address() の中、下位のTCPソケットを準備(open_socket() からlisten() まで)して*# lisner->s* で保持する
  3. add_endpoint() でリスナーをエンドポイントをキーにsocket_base_t のソケットに接続する
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int zmq::socket_base_t::bind (const char *addr_)
{
  // ...
  std::string protocol;
  std::string address;
  // アドレスの事前処理: char * -> std::stringへの変換とパース
  // プロトコルがサポートされているか確認
  if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
    return -1;
  }
  // ...
  if (protocol == "tcp") {
    tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (
      io_thread, this, options);
    // ...assertionとか

    // 以下の listner->set_address アドレスを解決してリッスンまでしている
    rc = listener->set_address (address.c_str ());
    // rc を用いたエラー処理
    listener->get_address (last_endpoint);
    add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
    options.connected = true;
    return 0;
  }
  // ...
}

add_endpoint を確認するとlaunch_chid でソケットとリスナーを繋いでendpoints.insert で管理情報を登録していた。

1
2
3
4
5
6
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe)
{
  //  Activate the session. Make it a child of this socket.
  launch_child (endpoint_);
  endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe)));
}

ちなみにset_address 内で確保されるTCPソケットは以下のようなオプションが設定されていてbindしたいアドレスがTIME_WAIT で残っていても大丈夫。

1
2
3
4
5
6
7
8
9
//...
s = open_socket(address.family (), SOCK_STREAM, IPPROTO_TCP);
// ...
#ifdef ZMQ_HAVE_WINDOWS
// ...
# else
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
# endif
// ...
comments powered by Disqus