sns-sqs-test/main.go

187 lines
5.1 KiB
Go

package main
// TODO move this into rems with an SQS+SNS setup
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"time"
"gitlab-app.eng.qops.net/data-store/rems/src/common/id"
"gitlab-app.eng.qops.net/data-store/rems/src/common/message/wrap"
"gitlab-app.eng.qops.net/golang/isolation/v3"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sqs"
)
type Config struct {
AWSA string
AWSU string
AWSP string
AWSR string
Topic string
Message string
REMSWrap bool
Ack string
Pub bool
}
type isoc struct{}
func (_ isoc) GetBrandInfo(context.Context, string) (isolation.BrandInfo, error) {
return isolation.BrandInfo{Type: isolation.BrandStatusShared}, nil
}
func (_ isoc) GenerateDataKey(context.Context, string, map[string]interface{}) (isolation.DataKey, error) {
panic(nil)
}
func (_ isoc) DecryptDataKey(context.Context, string, []byte, map[string]interface{}) (isolation.DataKey, isolation.DecryptKeyResponseMeta) {
panic(nil)
}
func (_ isoc) ReencryptDataKey(context.Context, string, []byte, map[string]interface{}) (isolation.DataKey, error) {
panic(nil)
}
type cter struct{}
func (_ cter) GetCiphertext(context.Context, id.Document) ([]byte, time.Time, bool, error) {
panic(nil)
}
func (_ cter) UpsertCiphertext(context.Context, id.Document, []byte, time.Time) error {
panic(nil)
}
func main() {
ctx := context.Background()
c, err := getConfig()
if err != nil {
panic(err)
}
if err := c.Produce(ctx); err != nil {
panic(err)
}
if err := c.Consume(ctx); err != nil {
panic(err)
}
}
func getConfig() (Config, error) {
var c Config
flag.StringVar(&c.AWSA, "awsa", "674592268301", "aws account")
flag.StringVar(&c.AWSU, "awsu", "AKIAZ2EGVEAG6HOS65CI", "aws access")
flag.StringVar(&c.AWSP, "awsp", "WI6NIgz1Q5ls1FLiUR3Z/Kqje0jIPFmpN8wzxYDa", "aws secret")
flag.StringVar(&c.AWSR, "awsr", "eu-central-1", "aws region")
flag.StringVar(&c.Topic, "topic", "rems-queue-prod-fra1-priority-low", "sns topic/sqs queue")
flag.StringVar(&c.Message, "message", `{"messageType": "NoOpMessage"}`, "published message")
flag.StringVar(&c.Ack, "ack", "", "sqs message id to ack")
flag.BoolVar(&c.REMSWrap, "rems-wrap", true, "rems wrap message")
flag.BoolVar(&c.Pub, "pub", false, "publish message")
flag.Parse()
if c.REMSWrap {
wrap.Ciphertext = cter{}
var v interface{}
if err := json.Unmarshal([]byte(c.Message), &v); err != nil {
return c, err
}
b, err := wrap.Wrap("", "", v, isoc{})
if err != nil {
return c, err
}
c.Message = string(b)
}
return c, nil
}
func (c Config) awsc() *aws.Config {
return &aws.Config{
Credentials: credentials.NewStaticCredentials(c.AWSU, c.AWSP, ""),
Region: aws.String(c.AWSR),
HTTPClient: &http.Client{Timeout: time.Minute},
}
}
func (c Config) sqsurl() string {
return fmt.Sprintf(`https://sqs.%s.amazonaws.com/%s/%s`, c.AWSR, c.AWSA, c.Topic)
}
func (c Config) Produce(ctx context.Context) error {
if !c.Pub {
return nil
}
client := sns.New(session.Must(session.NewSession()), c.awsc())
output, err := client.PublishWithContext(ctx, &sns.PublishInput{
Message: aws.String(c.Message),
TopicArn: aws.String(fmt.Sprintf(`arn:aws:sns:%s:%s:%s`, c.AWSR, c.AWSA, c.Topic)),
})
log.Printf("%+v", output)
return err
}
func (c Config) Consume(ctx context.Context) error {
client := sqs.New(session.Must(session.NewSession()), c.awsc())
input := sqs.ReceiveMessageInput{
QueueUrl: aws.String(c.sqsurl()),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(5),
WaitTimeSeconds: aws.Int64(1),
}
messages, err := client.ReceiveMessageWithContext(ctx, &input)
if err != nil {
return fmt.Errorf("failed to consume with %+v at %s: %w", c, c.sqsurl(), err)
}
for _, message := range messages.Messages {
var v interface{}
if c.REMSWrap {
wrap.Unwrap([]byte(*message.Body), &v, isoc{})
}
log.Printf("message in queue: %s: %s: %+v", *message.MessageId, *message.Body, v)
if *message.MessageId != c.Ack {
if err := c.unack(ctx, message); err != nil {
return err
}
} else {
if err := c.ack(ctx, message); err != nil {
return err
}
}
}
return nil
}
func (c Config) unack(ctx context.Context, message *sqs.Message) error {
client := sqs.New(session.Must(session.NewSession()), c.awsc())
if _, err := client.ChangeMessageVisibilityWithContext(ctx, &sqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(c.sqsurl()),
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: aws.Int64(1),
}); err != nil {
return err
}
log.Printf("unacked %s", *message.MessageId)
return nil
}
func (c Config) ack(ctx context.Context, message *sqs.Message) error {
client := sqs.New(session.Must(session.NewSession()), c.awsc())
if _, err := client.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(c.sqsurl()),
ReceiptHandle: message.ReceiptHandle,
}); err != nil {
return err
}
log.Printf("acked %s", *message.MessageId)
return nil
}