From 9a916ada098deca07d7941d112bdb94247759bdb Mon Sep 17 00:00:00 2001 From: Bel LaPointe Date: Mon, 6 Feb 2023 08:02:06 -0700 Subject: [PATCH] mvp --- main.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 74 insertions(+), 10 deletions(-) diff --git a/main.go b/main.go index 0ede702..512d030 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,6 @@ package main import ( "context" "encoding/json" - "errors" "flag" "fmt" "log" @@ -20,16 +19,20 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" + "github.com/aws/aws-sdk-go/service/sqs" ) type Config struct { - AWSA string - AWSU string - AWSP string - AWSR string - Topic string - Message string - Wrap bool + AWSA string + AWSU string + AWSP string + AWSR string + Topic string + Message string + Wrap bool + MessageID string + Ack bool + Pub bool } type isoc struct{} @@ -63,7 +66,6 @@ func main() { if err != nil { panic(err) } - log.Printf("%+v", c) if err := c.Produce(ctx); err != nil { panic(err) @@ -82,7 +84,10 @@ func getConfig() (Config, error) { flag.StringVar(&c.AWSR, "awsr", "eu-central-1", "aws region") flag.StringVar(&c.Topic, "topic", "rems-queue-prod-fra1-priority-low", "sns topic/sqs queue") flag.StringVar(&c.Message, "message", `{"messageType": "NoOpMessage"}`, "published message") + flag.StringVar(&c.MessageID, "message-id", "", "sqs message id to ack") flag.BoolVar(&c.Wrap, "wrap", true, "rems wrap message") + flag.BoolVar(&c.Ack, "ack", true, "ack message matching message-id") + flag.BoolVar(&c.Pub, "pub", false, "publish message") flag.Parse() if c.Wrap { @@ -109,7 +114,14 @@ func (c Config) awsc() *aws.Config { } } +func (c Config) sqsurl() string { + return fmt.Sprintf(`https://sqs.%s.amazonaws.com/%s/%s`, c.AWSR, c.AWSA, c.Topic) +} + func (c Config) Produce(ctx context.Context) error { + if !c.Pub { + return nil + } client := sns.New(session.Must(session.NewSession()), c.awsc()) output, err := client.PublishWithContext(ctx, &sns.PublishInput{ Message: aws.String(c.Message), @@ -120,5 +132,57 @@ func (c Config) Produce(ctx context.Context) error { } func (c Config) Consume(ctx context.Context) error { - return errors.New("not impl") + client := sqs.New(session.Must(session.NewSession()), c.awsc()) + messages, err := client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: aws.String(c.sqsurl()), + MaxNumberOfMessages: aws.Int64(10), + VisibilityTimeout: aws.Int64(5), + WaitTimeSeconds: aws.Int64(1), + }) + if err != nil { + return err + } + for _, message := range messages.Messages { + var v interface{} + wrap.Unwrap([]byte(*message.Body), &v, isoc{}) + log.Printf("message in queue: %s: %s: %+v", *message.MessageId, *message.Body, v) + if *message.MessageId != c.MessageID { + if err := c.unack(ctx, message); err != nil { + return err + } + } else { + if err := c.ack(ctx, message); err != nil { + return err + } + } + } + return nil +} + +func (c Config) unack(ctx context.Context, message *sqs.Message) error { + client := sqs.New(session.Must(session.NewSession()), c.awsc()) + if _, err := client.ChangeMessageVisibilityWithContext(ctx, &sqs.ChangeMessageVisibilityInput{ + QueueUrl: aws.String(c.sqsurl()), + ReceiptHandle: message.ReceiptHandle, + VisibilityTimeout: aws.Int64(1), + }); err != nil { + return err + } + log.Printf("unacked %s", *message.MessageId) + return nil +} + +func (c Config) ack(ctx context.Context, message *sqs.Message) error { + if !c.Ack { + return c.unack(ctx, message) + } + client := sqs.New(session.Must(session.NewSession()), c.awsc()) + if _, err := client.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{ + QueueUrl: aws.String(c.sqsurl()), + ReceiptHandle: message.ReceiptHandle, + }); err != nil { + return err + } + log.Printf("acked %s", *message.MessageId) + return nil }