From d24fbd02da9699ac8fb3c36c2a4229715ee4f6f8 Mon Sep 17 00:00:00 2001 From: Jason Kwon Date: Thu, 5 Dec 2024 11:57:50 +0100 Subject: [PATCH] Adds the ability to use SQS FIFO queues. All SQS FIFO queues have the `.fifo` suffix at the end of their URL - this is how we determine if we are using a FIFO queue or not. The `MessageGroupId` (A required field for FIFO) is set using the key & id of the event's message. Note: This commit does not add the the `MessageDeduplicationID` so the FIFO queue must have content-based deduplication enabled. --- internal/endpoint/sqs.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/endpoint/sqs.go b/internal/endpoint/sqs.go index e83a8711..c4ad42fc 100644 --- a/internal/endpoint/sqs.go +++ b/internal/endpoint/sqs.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/tidwall/gjson" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" @@ -108,11 +110,17 @@ func (conn *SQSConn) Send(msg string) error { } queueURL := conn.generateSQSURL() - // Send message + // Create message sendParams := &sqs.SendMessageInput{ MessageBody: aws.String(msg), QueueUrl: aws.String(queueURL), } + if isFifoQueue(queueURL) { + key := gjson.Get(msg, "key") + id := gjson.Get(msg, "id") + keyValue := fmt.Sprintf("%s#%s", key.String(), id.String()) + sendParams.MessageGroupId = aws.String(keyValue) + } _, err := conn.svc.SendMessage(sendParams) if err != nil { fmt.Println(err) @@ -145,3 +153,7 @@ func sqsRegionFromPlainURL(s string) string { } return "" } + +func isFifoQueue(s string) bool { + return strings.HasSuffix(s, ".fifo") +}