Merge pull request #759 from crankycookie/jkwon/add-sqs-fifo

Adds the ability to use SQS FIFO queues.
This commit is contained in:
Josh Baker 2024-12-06 13:33:58 -07:00 committed by GitHub
commit 459b3e6152
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 13 additions and 1 deletions

View File

@ -6,6 +6,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/tidwall/gjson"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
@ -108,11 +110,17 @@ func (conn *SQSConn) Send(msg string) error {
} }
queueURL := conn.generateSQSURL() queueURL := conn.generateSQSURL()
// Send message // Create message
sendParams := &sqs.SendMessageInput{ sendParams := &sqs.SendMessageInput{
MessageBody: aws.String(msg), MessageBody: aws.String(msg),
QueueUrl: aws.String(queueURL), QueueUrl: aws.String(queueURL),
} }
if isFifoQueue(queueURL) {
key := gjson.Get(msg, "key")
id := gjson.Get(msg, "id")
keyValue := fmt.Sprintf("%s#%s", key.String(), id.String())
sendParams.MessageGroupId = aws.String(keyValue)
}
_, err := conn.svc.SendMessage(sendParams) _, err := conn.svc.SendMessage(sendParams)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -145,3 +153,7 @@ func sqsRegionFromPlainURL(s string) string {
} }
return "" return ""
} }
func isFifoQueue(s string) bool {
return strings.HasSuffix(s, ".fifo")
}