package main // TODO move this into rems with an SQS+SNS setup import ( "context" "encoding/json" "flag" "fmt" "log" "net/http" "time" "gitlab-app.eng.qops.net/data-store/rems/src/common/id" "gitlab-app.eng.qops.net/data-store/rems/src/common/message/wrap" "gitlab-app.eng.qops.net/golang/isolation/v3" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sns" "github.com/aws/aws-sdk-go/service/sqs" ) type Config struct { AWSA string AWSU string AWSP string AWSR string Topic string Message string REMSWrap bool Ack string Pub bool } type isoc struct{} func (_ isoc) GetBrandInfo(context.Context, string) (isolation.BrandInfo, error) { return isolation.BrandInfo{Type: isolation.BrandStatusShared}, nil } func (_ isoc) GenerateDataKey(context.Context, string, map[string]interface{}) (isolation.DataKey, error) { panic(nil) } func (_ isoc) DecryptDataKey(context.Context, string, []byte, map[string]interface{}) (isolation.DataKey, isolation.DecryptKeyResponseMeta) { panic(nil) } func (_ isoc) ReencryptDataKey(context.Context, string, []byte, map[string]interface{}) (isolation.DataKey, error) { panic(nil) } type cter struct{} func (_ cter) GetCiphertext(context.Context, id.Document) ([]byte, time.Time, bool, error) { panic(nil) } func (_ cter) UpsertCiphertext(context.Context, id.Document, []byte, time.Time) error { panic(nil) } func main() { ctx := context.Background() c, err := getConfig() if err != nil { panic(err) } if err := c.Produce(ctx); err != nil { panic(err) } if err := c.Consume(ctx); err != nil { panic(err) } } func getConfig() (Config, error) { var c Config flag.StringVar(&c.AWSA, "awsa", "674592268301", "aws account") flag.StringVar(&c.AWSU, "awsu", "AKIAZ2EGVEAG6HOS65CI", "aws access") flag.StringVar(&c.AWSP, "awsp", "WI6NIgz1Q5ls1FLiUR3Z/Kqje0jIPFmpN8wzxYDa", "aws secret") flag.StringVar(&c.AWSR, "awsr", "eu-central-1", "aws region") flag.StringVar(&c.Topic, "topic", "rems-queue-prod-fra1-priority-low", "sns topic/sqs queue") flag.StringVar(&c.Message, "message", `{"messageType": "NoOpMessage"}`, "published message") flag.StringVar(&c.Ack, "ack", "", "sqs message id to ack") flag.BoolVar(&c.REMSWrap, "rems-wrap", true, "rems wrap message") flag.BoolVar(&c.Pub, "pub", false, "publish message") flag.Parse() if c.REMSWrap { wrap.Ciphertext = cter{} var v interface{} if err := json.Unmarshal([]byte(c.Message), &v); err != nil { return c, err } b, err := wrap.Wrap("", "", v, isoc{}) if err != nil { return c, err } c.Message = string(b) } return c, nil } func (c Config) awsc() *aws.Config { return &aws.Config{ Credentials: credentials.NewStaticCredentials(c.AWSU, c.AWSP, ""), Region: aws.String(c.AWSR), HTTPClient: &http.Client{Timeout: time.Minute}, } } func (c Config) sqsurl() string { return fmt.Sprintf(`https://sqs.%s.amazonaws.com/%s/%s`, c.AWSR, c.AWSA, c.Topic) } func (c Config) Produce(ctx context.Context) error { if !c.Pub { return nil } client := sns.New(session.Must(session.NewSession()), c.awsc()) output, err := client.PublishWithContext(ctx, &sns.PublishInput{ Message: aws.String(c.Message), TopicArn: aws.String(fmt.Sprintf(`arn:aws:sns:%s:%s:%s`, c.AWSR, c.AWSA, c.Topic)), }) log.Printf("%+v", output) return err } func (c Config) Consume(ctx context.Context) error { client := sqs.New(session.Must(session.NewSession()), c.awsc()) messages, err := client.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{ QueueUrl: aws.String(c.sqsurl()), MaxNumberOfMessages: aws.Int64(10), VisibilityTimeout: aws.Int64(5), WaitTimeSeconds: aws.Int64(1), }) if err != nil { return err } for _, message := range messages.Messages { var v interface{} if c.REMSWrap { wrap.Unwrap([]byte(*message.Body), &v, isoc{}) } log.Printf("message in queue: %s: %s: %+v", *message.MessageId, *message.Body, v) if *message.MessageId != c.Ack { if err := c.unack(ctx, message); err != nil { return err } } else { if err := c.ack(ctx, message); err != nil { return err } } } return nil } func (c Config) unack(ctx context.Context, message *sqs.Message) error { client := sqs.New(session.Must(session.NewSession()), c.awsc()) if _, err := client.ChangeMessageVisibilityWithContext(ctx, &sqs.ChangeMessageVisibilityInput{ QueueUrl: aws.String(c.sqsurl()), ReceiptHandle: message.ReceiptHandle, VisibilityTimeout: aws.Int64(1), }); err != nil { return err } log.Printf("unacked %s", *message.MessageId) return nil } func (c Config) ack(ctx context.Context, message *sqs.Message) error { client := sqs.New(session.Must(session.NewSession()), c.awsc()) if _, err := client.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{ QueueUrl: aws.String(c.sqsurl()), ReceiptHandle: message.ReceiptHandle, }); err != nil { return err } log.Printf("acked %s", *message.MessageId) return nil }