とことんDevOps | 日本仮想化技術のDevOps技術情報メディア

DevOpsに関連する技術情報を幅広く提供していきます。

日本仮想化技術がお届けする「とことんDevOps」では、DevOpsに関する技術情報や、日々のDevOps業務の中での検証結果、TipsなどDevOpsのお役立ち情報をお届けします。
主なテーマ: DevOps、CI/CD、コンテナ開発、IaCなど
読者登録と各種SNSのフォローもよろしくお願いいたします。

Go言語でAmazon SQS(ElasticMQ)を使ってみよう

SQSのローカル環境用にElasticMQを動かしてみたでローカル開発環境用にAmazon SQS(以降、SQS)互換のElasticMQを使って環境を構築しました。 今回は、Go言語からSQSを使ってキューの送信、受信、削除の流れを試してみたいと思います。

Amazon SQSとは

Amazon SQSは、分散型のメッセージキューサービスです。身近な例で言うと、待ち行列のようなものです。 正確には設定によって異なるのですが、並び順を厳密に守るFIFOキューと、順番を保証しない標準キューがあります。

aws.amazon.com

事前に済ませておくこと

  • Go言語の開発環境が整っていること
  • ElasticMQが起動していること
  • AWS CLIからAWSサービスを使うための認証情報が設定されていること

メッセージキュー送信

Go言語からSQSにメッセージを送信してみます。

はじめに、これから使うSDKのライブラリをインストールします。実際の開発を想定すると必要最小限のパッケージになるようにもう少し細かく指定してもいいかもしれませんが、今回は全てのパッケージをインストールしています。

go get -u github.com/aws/aws-sdk-go

main.goに以下のコードを書きます。

package main

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"

)

func main() {
    endpoint := "http://localhost:9324"
    region := "elasticmq"
    awsSession, err := session.NewSession(&aws.Config{
        Region:      aws.String(region),
        Endpoint:    aws.String(endpoint),
        Credentials: credentials.NewStaticCredentials("x", "x", ""), // ダミーの認証情報
    })
    if err != nil {
        panic(err)
    }
    
    svc := sqs.New(awsSession)
    
    messageBody := "Hello, Message Queue!"
    
    queueURL := "http://localhost:9324/query/test"
    sendMsgInput := &sqs.SendMessageInput{
        MessageBody: aws.String(messageBody),
        QueueUrl:    aws.String(queueURL),
    }
    
    result, err := svc.SendMessage(sendMsgInput)
    if err != nil {
        log.Fatalf("Failed to send message: %v", err)
    }
    
    fmt.Printf("Successfully sent message to SQS with ID: %s\n", *result.MessageId)
}
awsSession, err := session.NewSession(&aws.Config{
    Region:      aws.String(region),
    Endpoint:    aws.String(endpoint),
    Credentials: credentials.NewStaticCredentials("x", "x", ""), // ダミーの認証情報
})

session.NewSessionでセッションを作成します。aws.Configには、リージョン、エンドポイント、認証情報を設定します。 今回はローカル環境でElasticMQを使っているので、リージョンはelasticmq、エンドポイントはhttp://localhost:9324、認証情報はダミーの情報を設定しています。

svc := sqs.New(awsSession)

svc := sqs.New(awsSession)でSQSのクライアントを作成します。

sendMsgInput := &sqs.SendMessageInput{
    MessageBody: aws.String(string(messageBody)),
    QueueUrl:    aws.String(queueURL),
}

SendMessageInputには、メッセージの本文と送信先のキューのURLを設定します。

result, err := svc.SendMessage(sendMsgInput)
if err != nil {
    log.Fatalf("Failed to send message: %v", err)
}

svc.SendMessage(sendMsgInput)でメッセージを送信します。成功した場合は、result.MessageIdにメッセージIDが入ります。

実際にコードを実行してみましょう。

go run main.go

実行に成功すると、以下のようなメッセージが表示されます。

Successfully sent message to SQS with ID: 1b50d52f-f8a6-4f75-94cb-83309ad2dd43

ElasticMQのダッシュボードを開いて、キューにメッセージが追加されていることを確認してみましょう。

メッセージキュー受信

package main

import (
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
    endpoint := "http://localhost:9324"
    region := "elasticmq"
    awsSession, err := session.NewSession(&aws.Config{
        Region:      aws.String(region),
        Endpoint:    aws.String(endpoint),
        Credentials: credentials.NewStaticCredentials("x", "x", ""), // ダミーの認証情報
    })
    if err != nil {
        panic(err)
    }

    svc := sqs.New(awsSession)

    queueURL := "http://localhost:9324/query/test"
    receiveMsgInput := &sqs.ReceiveMessageInput{
        QueueUrl:            aws.String(queueURL),
        MaxNumberOfMessages: aws.Int64(1), // 1度に取得するメッセージの数
        WaitTimeSeconds:     aws.Int64(10),     // ロングポーリングの時間(秒)
    }

    fmt.Println("Waiting to receive message...")
    receiveResult, err := svc.ReceiveMessage(receiveMsgInput)
    if err != nil {
        log.Fatalf("Failed to receive message: %v", err)
    }

    if len(receiveResult.Messages) > 0 {
        fmt.Printf("Received message: %s\n", *receiveResult.Messages[0].Body)
    } else {
        fmt.Println("No messages received")
    }
}
receiveMsgInput := &sqs.ReceiveMessageInput{
    QueueUrl:            aws.String(queueURL),
    MaxNumberOfMessages: aws.Int64(1), // 1度に取得するメッセージの数
    WaitTimeSeconds:     aws.Int64(10),     // ロングポーリングの時間(秒)
}

ReceiveMessageInputには、キューのURL、1度に取得するメッセージの数、ロングポーリングの時間を設定します。

receiveResult, err := svc.ReceiveMessage(receiveMsgInput)
if err != nil {
    log.Fatalf("Failed to receive message: %v", err)
}

svc.ReceiveMessage(receiveMsgInput)でメッセージを受信します。成功した場合は、receiveResult.Messagesにメッセージが入ります。

実際にコードを実行してみましょう。

go run main.go

実行に成功すると、以下のようなメッセージが表示されます。

Waiting to receive message...
Received message: Hello, Message Queue!

メッセージキュー削除

キューに登録されているメッセージは、処理が完了したら削除することが一般的です。 削除しないと、同じメッセージが何度も処理されてしまう可能性があります。

package main

import (
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
    endpoint := "http://localhost:9324"
    region := "elasticmq"
    awsSession, err := session.NewSession(&aws.Config{
        Region:      aws.String(region),
        Endpoint:    aws.String(endpoint),
        Credentials: credentials.NewStaticCredentials("x", "x", ""), // ダミーの認証情報
    })
    if err != nil {
        panic(err)
    }

    svc := sqs.New(awsSession)

    queueURL := "http://localhost:9324/query/test"
    receiveMsgInput := &sqs.ReceiveMessageInput{
        QueueUrl:            aws.String(queueURL),
        MaxNumberOfMessages: aws.Int64(1), // 1度に取得するメッセージの数
        WaitTimeSeconds:     aws.Int64(10),     // ロングポーリングの時間(秒)
    }

    fmt.Println("Waiting to receive message...")
    receiveResult, err := svc.ReceiveMessage(receiveMsgInput)
    if err != nil {
        log.Fatalf("Failed to receive message: %v", err)
    }

    if len(receiveResult.Messages) > 0 {
        fmt.Printf("Received message: %s\n", *receiveResult.Messages[0].Body)
        deleteMsgInput := &sqs.DeleteMessageInput{
            QueueUrl:      aws.String(queueURL),
            ReceiptHandle: receiveResult.Messages[0].ReceiptHandle,
        }
        _, err := svc.DeleteMessage(deleteMsgInput)
        if err != nil {
            log.Fatalf("Failed to delete message: %v", err)
        }
        fmt.Println("Successfully deleted message")
    } else {
        fmt.Println("No messages received")
    }
}
deleteMsgInput := &sqs.DeleteMessageInput{
    QueueUrl:      aws.String(queueURL),
    ReceiptHandle: receiveResult.Messages[0].ReceiptHandle,
}
_, err := svc.DeleteMessage(deleteMsgInput)
if err != nil {
    log.Fatalf("Failed to delete message: %v", err)
}

DeleteMessageInputには、キューのURLとメッセージのReceiptHandleを設定します。 ReceiptHandleは、メッセージを受信した際に取得できるメッセージの識別子です。

実際にコードを実行してみましょう。

go run main.go

実行に成功すると、以下のようなメッセージが表示されます。

Waiting to receive message...
Received message: Hello, Message Queue!
Successfully deleted message

ElasticMQのダッシュボードを開いて、キューにメッセージが削除されていることを確認してみましょう。

まとめ

Go言語からSQSを使ってメッセージの送信、受信、削除の流れを試してみました。 右も左もわからないときは写経的にコードを書いて実行してみたくなるのですが、ちょうどいいサンプルが見つからないときもあるので、今回は、備忘録も兼ねて自分で書いてみました。