RabbitMQ 利用開始

RabbitMQを使う時にだいたい忘れてしまうので次こそスムースに利用できるよう再びメモを残す。

資料

公式資料

基本的には公式docを読めばいい。

公式Doc以外では以下を読む。

外部資料

昔のメモと重複するけど、下を参考に読んだ。

作業履歴

とりあえずyumでインストールしてプラグイン(管理用コンパネ)を導入して起動する。 その後ユーザー(test) とvhost(test_vhost)を追加してtest にはtest_vhost の読み書き書き換え権限を全て与えてみた。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
$ uname -a
Linux pit 3.10.0-229.el7.x86_64 #1 SMP Fri Mar 6 11:36:42 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
$ cat /etc/redhat-release
CentOS Linux release 7.1.1503 (Core)

# add repository EPEL && install erlang && install rabbitmq
$ su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm'
$ sudo yum install erlang
$ yum install rabbitmq-server.noarch

# plugin を有効にする
$ sudo rabbitmq-plugins enable rabbitmq_management
$ sudo rabbitmq-plugins enable rabbitmq_management_visualiser

# rabbitmq server を動かす
$ sudo rabbitmq-server -detached
Warning: PID file not written; -detached was passed.
$ ps aux | grep rabbitmq
rabbitmq 15877  0.0  0.0  17884   360 ?        S    14:26   0:00 /usr/lib64/erlang/erts-5.10.4/bin/epmd -daemon
rabbitmq 16832  4.4  4.1 606236 42348 ?        Sl   14:42   0:00 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -K true -A30 -P 1048576 -- -root /usr/lib64/erlang -progname erl -- -home /var/lib/rabbitmq -- -pa /usr/lib/rabbitmq/lib/rabbitmq_server-3.3.5/sbin/../ebin -noshell -noinput -s rabbit boot -sname rabbit@pit -boot start_sasl -config /etc/rabbitmq/rabbitmq -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/var/log/rabbitmq/rabbit@pit.log"} -rabbit sasl_error_logger {file,"/var/log/rabbitmq/rabbit@pit-sasl.log"} -rabbit enabled_plugins_file "/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/lib/rabbitmq/lib/rabbitmq_server-3.3.5/sbin/../plugins" -rabbit plugins_expand_dir "/var/lib/rabbitmq/mnesia/rabbit@pit-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/var/lib/rabbitmq/mnesia/rabbit@pit" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 -noshell -noinput
rabbitmq 16869  0.0  0.0  17848   532 ?        Ss   14:42   0:00 inet_gethost 4
rabbitmq 16870  0.0  0.0  19944   744 ?        S    14:42   0:00 inet_gethost 4
vagrant  16910  0.0  0.0   9036   672 pts/1    R+   14:42   0:00 grep --color=auto rabbitmq

$ sudo rabbitmqctl add_user test pass
$ sudo rabbitmqctl add_vhost test_vhost
$ sudo rabbitmqctl add_vhost test_vhost
> Creating vhost "test_vhost" ...
> ...done.
$ sudo rabbitmqctl set_permissions -p test_vhost test '.*' '.*' '.*'
Setting permissions for user "test" in vhost "test_vhost" ...
...done.

rabbitmq_management プラグインのおかげでコンパネを利用できる。 Vagrant で立てた時に特別な設定しないとiptables か何かで外部からのアクセスは拒否されてしまう。 ファイアーウォールの設定を整えるか以下のようにトンネルを掘る。

1
2
3
4
$ ssh -L 8888:pit:15672 vagrant@192.168.33.10
vagrant@192.168.33.10's password:
Last login: Thu Aug 25 15:09:57 2016 from 192.168.33.1
Welcome to your Vagrant-built virtual machine.

コンパネ内容(guestに test\_vhostの権限を与える必要がある)

でチュートリアルを真似してプログラムから利用してみる。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package main

import (
        "fmt"
        "log"
        "math/rand"
        "os"
        "strings"
        "time"

        "github.com/streadway/amqp"
)

func main() {
        account := os.Getenv("pg_mq_user")
        password := os.Getenv("pg_mq_pass")
        hosts := strings.Split(os.Getenv("pg_mq_hosts"), ",")
        port := os.Getenv("pg_mq_port")
        vhost := os.Getenv("pg_mq_vhost")

        conn, err := amqp.Dial(fmt.Sprintf(
                "amqp://%s:%s@%s:%s/%s",
                account, password,
                hosts[rand.Intn(len(hosts))], port, vhost,
        ))
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "sample", // name
                false,    // durable
                false,    // delete when unused
                false,    // exclusive
                false,    // no-wait
                nil,      // arguments
        )
        cch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer cch.Close()
        failOnError(err, "Failed to declare a queue")
        go func() {
              num := 0
                for true {
                        body := fmt.Sprintf("hello:%d", num)
                        err = cch.Publish(
                                "",     // exchange
                                q.Name, // routing key
                                false,  // mandatory
                                false,  // immediate
                                amqp.Publishing{
                                        ContentType: "text/plain",
                                        Body:        []byte(body),
                                })
                        failOnError(err, "Failed to publish a message")
                        time.Sleep(1 * time.Second)
                        num++
                }
        }()

        cch2, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer cch2.Close()
        msgs1, err := cch2.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
        forever := make(chan bool)
        go func() {
                for d := range msgs1 {
                        log.Printf("Received a message(1): %s", d.Body)
                        go func(msg amqp.Delivery) {
                                time.Sleep(2 * time.Second)
                                fmt.Println("PRE d.Nack")
                                d.Nack(false, true)
                                fmt.Println("POST d.Nack")
                        }(d)
                        time.Sleep(1 * time.Second)
                }
                fmt.Println("end consume msg1")
        }()
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        msgs2, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
        go func() {
                for d := range msgs2 {
                        log.Printf("Received a message(2): %s", d.Body)
                        time.Sleep(1 * time.Second)
                        go func(msg amqp.Delivery) {
                                time.Sleep(1 * time.Second)
                                fmt.Println("PRE d.Ack")
                                msg.Ack(false)
                                fmt.Println("POST d.Ack")
                        }(d)
                }
        }()
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
}

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
                panic(fmt.Sprintf("%s: %s", msg, err))
        }
}

guestユーザーにtest_vhostの権限を与えると下のような画面をみる事ができる。 Ack, Nackを行うのでガタガタしてくれる。 画像は何度か動かしたあとなので最初からMessageが投入済み。

Queueの内容

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
$ pg_mq_user=test pg_mq_pass=pass pg_mq_host=pit pg_mq_vhost=test_vhost ./sample
2016/08/25 15:21:30  [*] Waiting for messages. To exit press CTRL+C
2016/08/25 15:21:30 Received a message: hello
2016/08/25 15:21:31 Received a message: hello
2016/08/25 15:21:32 Received a message: hello
2016/08/25 15:21:33 Received a message: hello

# 権限がないので当然 vhost="/"だとアクセス失敗する
$ pg_mq_user=test pg_mq_pass=pass pg_mq_host=pit ./sample
2016/08/25 15:20:56 Failed to connect to RabbitMQ: Exception (403) Reason: "no access to this vhost"

最後にサーバーを落とそう。

1
$ sudo rabbitmqctl stop
comments powered by Disqus