master
Bel LaPointe 2023-02-06 08:02:06 -07:00
parent 30d492cf31
commit 9a916ada09
1 changed files with 74 additions and 10 deletions

84
main.go
View File

@ -5,7 +5,6 @@ package main
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"flag" "flag"
"fmt" "fmt"
"log" "log"
@ -20,16 +19,20 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sqs"
) )
type Config struct { type Config struct {
AWSA string AWSA string
AWSU string AWSU string
AWSP string AWSP string
AWSR string AWSR string
Topic string Topic string
Message string Message string
Wrap bool Wrap bool
MessageID string
Ack bool
Pub bool
} }
type isoc struct{} type isoc struct{}
@ -63,7 +66,6 @@ func main() {
if err != nil { if err != nil {
panic(err) panic(err)
} }
log.Printf("%+v", c)
if err := c.Produce(ctx); err != nil { if err := c.Produce(ctx); err != nil {
panic(err) panic(err)
@ -82,7 +84,10 @@ func getConfig() (Config, error) {
flag.StringVar(&c.AWSR, "awsr", "eu-central-1", "aws region") flag.StringVar(&c.AWSR, "awsr", "eu-central-1", "aws region")
flag.StringVar(&c.Topic, "topic", "rems-queue-prod-fra1-priority-low", "sns topic/sqs queue") flag.StringVar(&c.Topic, "topic", "rems-queue-prod-fra1-priority-low", "sns topic/sqs queue")
flag.StringVar(&c.Message, "message", `{"messageType": "NoOpMessage"}`, "published message") flag.StringVar(&c.Message, "message", `{"messageType": "NoOpMessage"}`, "published message")
flag.StringVar(&c.MessageID, "message-id", "", "sqs message id to ack")
flag.BoolVar(&c.Wrap, "wrap", true, "rems wrap message") flag.BoolVar(&c.Wrap, "wrap", true, "rems wrap message")
flag.BoolVar(&c.Ack, "ack", true, "ack message matching message-id")
flag.BoolVar(&c.Pub, "pub", false, "publish message")
flag.Parse() flag.Parse()
if c.Wrap { if c.Wrap {
@ -109,7 +114,14 @@ func (c Config) awsc() *aws.Config {
} }
} }
func (c Config) sqsurl() string {
return fmt.Sprintf(`https://sqs.%s.amazonaws.com/%s/%s`, c.AWSR, c.AWSA, c.Topic)
}
func (c Config) Produce(ctx context.Context) error { func (c Config) Produce(ctx context.Context) error {
if !c.Pub {
return nil
}
client := sns.New(session.Must(session.NewSession()), c.awsc()) client := sns.New(session.Must(session.NewSession()), c.awsc())
output, err := client.PublishWithContext(ctx, &sns.PublishInput{ output, err := client.PublishWithContext(ctx, &sns.PublishInput{
Message: aws.String(c.Message), Message: aws.String(c.Message),
@ -120,5 +132,57 @@ func (c Config) Produce(ctx context.Context) error {
} }
func (c Config) Consume(ctx context.Context) error { func (c Config) Consume(ctx context.Context) error {
return errors.New("not impl") client := sqs.New(session.Must(session.NewSession()), c.awsc())
messages, err := client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
QueueUrl: aws.String(c.sqsurl()),
MaxNumberOfMessages: aws.Int64(10),
VisibilityTimeout: aws.Int64(5),
WaitTimeSeconds: aws.Int64(1),
})
if err != nil {
return err
}
for _, message := range messages.Messages {
var v interface{}
wrap.Unwrap([]byte(*message.Body), &v, isoc{})
log.Printf("message in queue: %s: %s: %+v", *message.MessageId, *message.Body, v)
if *message.MessageId != c.MessageID {
if err := c.unack(ctx, message); err != nil {
return err
}
} else {
if err := c.ack(ctx, message); err != nil {
return err
}
}
}
return nil
}
func (c Config) unack(ctx context.Context, message *sqs.Message) error {
client := sqs.New(session.Must(session.NewSession()), c.awsc())
if _, err := client.ChangeMessageVisibilityWithContext(ctx, &sqs.ChangeMessageVisibilityInput{
QueueUrl: aws.String(c.sqsurl()),
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: aws.Int64(1),
}); err != nil {
return err
}
log.Printf("unacked %s", *message.MessageId)
return nil
}
func (c Config) ack(ctx context.Context, message *sqs.Message) error {
if !c.Ack {
return c.unack(ctx, message)
}
client := sqs.New(session.Must(session.NewSession()), c.awsc())
if _, err := client.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(c.sqsurl()),
ReceiptHandle: message.ReceiptHandle,
}); err != nil {
return err
}
log.Printf("acked %s", *message.MessageId)
return nil
} }