鱒身(Masu_mi)のブログ

知った事をメモする場所。

RabbitMQ 利用開始

RabbitMQを使う時にだいたい忘れてしまうので次こそはスムースに利用できるように再びメモを残す。

作業履歴

とりあえずyumでインストールしてプラグイン(管理用コンパネ)を導入して起動する。 その後ユーザー(test) とvhost(test_vhost)を追加してtestにはtest_vhost の読み書き書き換え権限を全て与えてみた。

$ uname -a
Linux pit 3.10.0-229.el7.x86_64 #1 SMP Fri Mar 6 11:36:42 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
$ cat /etc/redhat-release
CentOS Linux release 7.1.1503 (Core)

# add repository EPEL && install erlang && install rabbitmq
$ su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm'
$ sudo yum install erlang
$ yum install rabbitmq-server.noarch

# plugin を有効にする
$ sudo rabbitmq-plugins enable rabbitmq_management
$ sudo rabbitmq-plugins enable rabbitmq_management_visualiser

# rabbitmq server を動かす
$ sudo rabbitmq-server -detached
Warning: PID file not written; -detached was passed.
$ ps aux | grep rabbitmq
rabbitmq 15877  0.0  0.0  17884   360 ?        S    14:26   0:00 /usr/lib64/erlang/erts-5.10.4/bin/epmd -daemon
rabbitmq 16832  4.4  4.1 606236 42348 ?        Sl   14:42   0:00 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -K true -A30 -P 1048576 -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.3.5/sbin/../ebin -noshell -noinput -s rabbit boot -sname rabbit@pit -boot start_sasl -config /etc/rabbitmq/rabbitmq -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/var/log/rabbitmq/rabbit@pit.log"} -rabbit sasl_error_logger {file,"/var/log/rabbitmq/rabbit@pit-sasl.log"} -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/lib/rabbitmq_server-3.3.5/sbin/../plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@pit-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@pit" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 -noshell -noinput
rabbitmq 16869  0.0  0.0  17848   532 ?        Ss   14:42   0:00 inet_gethost 4
rabbitmq 16870  0.0  0.0  19944   744 ?        S    14:42   0:00 inet_gethost 4
vagrant  16910  0.0  0.0   9036   672 pts/1    R+   14:42   0:00 grep --color=auto rabbitmq

$ sudo rabbitmqctl add_user test pass
$ sudo rabbitmqctl add_vhost test_vhost
$ sudo rabbitmqctl add_vhost test_vhost
> Creating vhost "test_vhost" ...
> ...done.
$ sudo rabbitmqctl set_permissions -p test_vhost test '.*' '.*' '.*'
Setting permissions for user "test" in vhost "test_vhost" ...
...done.

rabbitmq_management プラグインのおかげでコンパネを利用できる。 Vagrantで立てた時に特別な設定しないとiptablesか何かで外部からのアクセスは拒否されてしまう。 ファイアーウォールの設定を整えるか以下のようにトンネルを掘る。

$ ssh -L 8888:pit:15672 vagrant@192.168.33.10
vagrant@192.168.33.10's password:
Last login: Thu Aug 25 15:09:57 2016 from 192.168.33.1
Welcome to your Vagrant-built virtual machine.
コンパネ内容(guestに test_vhostの権限を与える必要がある)

でチュートリアルを真似してプログラムから利用してみる。

package main

import (
        "fmt"
        "log"
        "math/rand"
        "os"
        "strings"
        "time"

        "github.com/streadway/amqp"
)

func main() {
        account := os.Getenv("pg_mq_user")
        password := os.Getenv("pg_mq_pass")
        hosts := strings.Split(os.Getenv("pg_mq_hosts"), ",")
        port := os.Getenv("pg_mq_port")
        vhost := os.Getenv("pg_mq_vhost")

        conn, err := amqp.Dial(fmt.Sprintf(
                "amqp://%s:%s@%s:%s/%s",
                account, password,
                hosts[rand.Intn(len(hosts))], port, vhost,
        ))
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "sample", // name
                false,    // durable
                false,    // delete when unused
                false,    // exclusive
                false,    // no-wait
                nil,      // arguments
        )
        cch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer cch.Close()
        failOnError(err, "Failed to declare a queue")
        go func() {
              num := 0
                for true {
                        body := fmt.Sprintf("hello:%d", num)
                        err = cch.Publish(
                                "",     // exchange
                                q.Name, // routing key
                                false,  // mandatory
                                false,  // immediate
                                amqp.Publishing{
                                        ContentType: "text/plain",
                                        Body:        []byte(body),
                                })
                        failOnError(err, "Failed to publish a message")
                        time.Sleep(1 * time.Second)
                        num++
                }
        }()

        cch2, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer cch2.Close()
        msgs1, err := cch2.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
        forever := make(chan bool)
        go func() {
                for d := range msgs1 {
                        log.Printf("Received a message(1): %s", d.Body)
                        go func(msg amqp.Delivery) {
                                time.Sleep(2 * time.Second)
                                fmt.Println("PRE d.Nack")
                                d.Nack(false, true)
                                fmt.Println("POST d.Nack")
                        }(d)
                        time.Sleep(1 * time.Second)
                }
                fmt.Println("end consume msg1")
        }()
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        msgs2, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
        go func() {
                for d := range msgs2 {
                        log.Printf("Received a message(2): %s", d.Body)
                        time.Sleep(1 * time.Second)
                        go func(msg amqp.Delivery) {
                                time.Sleep(1 * time.Second)
                                fmt.Println("PRE d.Ack")
                                msg.Ack(false)
                                fmt.Println("POST d.Ack")
                        }(d)
                }
        }()
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
}

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
                panic(fmt.Sprintf("%s: %s", msg, err))
        }
}

guestユーザーにtest_vhostの権限を与えると下のような画面をみる事ができる。 Ack, Nackを行うのでガタガタしてくれる。 画像は何度か動かしたあとなので最初からMessageが投入済み。

Queueの内容
$ pg_mq_user=test pg_mq_pass=pass pg_mq_host=pit pg_mq_vhost=test_vhost ./sample
2016/08/25 15:21:30  [*] Waiting for messages. To exit press CTRL+C
2016/08/25 15:21:30 Received a message: hello
2016/08/25 15:21:31 Received a message: hello
2016/08/25 15:21:32 Received a message: hello
2016/08/25 15:21:33 Received a message: hello

# 権限がないので当然 vhost="/"だとアクセス失敗する
$ pg_mq_user=test pg_mq_pass=pass pg_mq_host=pit ./sample
2016/08/25 15:20:56 Failed to connect to RabbitMQ: Exception (403) Reason: "no access to this vhost"

最後にサーバーを落とそう。

$ sudo rabbitmqctl stop