mirror of https://github.com/tidwall/tile38.git
Adds the ability to use SQS FIFO queues.
All SQS FIFO queues have the `.fifo` suffix at the end of their URL - this is how we determine if we are using a FIFO queue or not. The `MessageGroupId` (A required field for FIFO) is set using the key & id of the event's message. Note: This commit does not add the the `MessageDeduplicationID` so the FIFO queue must have content-based deduplication enabled.
This commit is contained in:
parent
d445566c06
commit
d24fbd02da
|
@ -6,6 +6,8 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
@ -108,11 +110,17 @@ func (conn *SQSConn) Send(msg string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
queueURL := conn.generateSQSURL()
|
queueURL := conn.generateSQSURL()
|
||||||
// Send message
|
// Create message
|
||||||
sendParams := &sqs.SendMessageInput{
|
sendParams := &sqs.SendMessageInput{
|
||||||
MessageBody: aws.String(msg),
|
MessageBody: aws.String(msg),
|
||||||
QueueUrl: aws.String(queueURL),
|
QueueUrl: aws.String(queueURL),
|
||||||
}
|
}
|
||||||
|
if isFifoQueue(queueURL) {
|
||||||
|
key := gjson.Get(msg, "key")
|
||||||
|
id := gjson.Get(msg, "id")
|
||||||
|
keyValue := fmt.Sprintf("%s#%s", key.String(), id.String())
|
||||||
|
sendParams.MessageGroupId = aws.String(keyValue)
|
||||||
|
}
|
||||||
_, err := conn.svc.SendMessage(sendParams)
|
_, err := conn.svc.SendMessage(sendParams)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
|
@ -145,3 +153,7 @@ func sqsRegionFromPlainURL(s string) string {
|
||||||
}
|
}
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isFifoQueue(s string) bool {
|
||||||
|
return strings.HasSuffix(s, ".fifo")
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue