truckstop/main.go

356 lines
8.4 KiB
Go

package main
import (
"bytes"
"encoding/json"
"fmt"
"local/storage"
"local/truckstop/broker"
"local/truckstop/config"
"local/truckstop/message"
"log"
"net/url"
"regexp"
"sort"
"strings"
"sync"
"time"
)
var stateFinder = regexp.MustCompile(`[A-Za-z]+`)
func main() {
if err := config.Refresh(); err != nil {
panic(err)
}
if err := matrixrecv(); err != nil {
panic(err)
}
lock := &sync.Mutex{}
go func() {
for {
time.Sleep(config.Get().Interval.Input.Get())
if err := config.Refresh(); err != nil {
log.Println(err)
} else {
if config.Get().Message.Matrix.ReceiveEnabled {
lock.Lock()
if err := matrixrecv(); err != nil {
log.Print(err)
}
lock.Unlock()
}
}
}
}()
if err := _main(); err != nil {
panic(err)
}
lock.Lock()
}
func matrixrecv() error {
log.Printf("checking matrix...")
defer log.Printf("/checking matrix...")
sender := message.NewMatrix()
messages, err := sender.Receive()
if err != nil {
return err
}
func() {
log.Printf("looking for help")
printed := false
for _, msg := range messages {
if !strings.HasPrefix(msg.Content, "!help") {
continue
}
key := fmt.Sprintf("help_%d", msg.Timestamp.Unix())
db := config.Get().DB()
if !printed {
if _, err := db.Get(key); err == storage.ErrNotFound {
log.Printf("sending help")
help := fmt.Sprintf("commands:\n...`!help`...print this help\n...`!state nc NC nC Nc`...set states for self\n...`!available 2022-12-31`...set date self is available for work\n\nrun a command for someone else: `!state ga @caleb`")
if err := sender.Send(help); err != nil {
log.Printf("failed to send help: %v", err)
} else {
printed = true
if err := db.Set(key, []byte{'k'}); err != nil {
log.Printf("failed to mark help given @%s: %v", key, err)
}
}
}
} else {
if err := db.Set(key, []byte{'k'}); err != nil {
log.Printf("failed to mark help given @%s: %v", key, err)
}
}
}
}()
func() {
log.Printf("looking for states")
db := config.Get().DB()
states := map[string]map[config.State]struct{}{}
for _, msg := range messages {
key := fmt.Sprintf("states_%d", msg.Timestamp.Unix())
if !strings.HasPrefix(msg.Content, "!state") {
continue
}
if _, ok := states[msg.Sender]; ok {
continue
}
if _, err := db.Get(key); err == storage.ErrNotFound {
states[msg.Sender] = map[config.State]struct{}{}
for _, state := range parseOutStates([]byte(msg.Content)) {
states[msg.Sender][state] = struct{}{}
}
}
if err := db.Set(key, []byte{'k'}); err != nil {
log.Printf("failed to mark state gathered @%s: %v", key, err)
}
}
setNewStates(states)
}()
func() {
log.Printf("looking for pauses")
db := config.Get().DB()
pauses := map[string]time.Time{}
for _, msg := range messages {
key := fmt.Sprintf("pauses_%d", msg.Timestamp.Unix())
if !strings.HasPrefix(msg.Content, "!available ") {
continue
}
if _, ok := pauses[msg.Sender]; ok {
continue
}
if _, err := db.Get(key); err == storage.ErrNotFound {
t, err := time.ParseInLocation(
"2006-01-02",
strings.TrimSpace(strings.TrimPrefix(msg.Content, "!available ")),
time.Local,
)
if err == nil {
pauses[msg.Sender] = t
}
}
if err := db.Set(key, []byte{'k'}); err != nil {
log.Printf("failed to mark state gathered @%s: %v", key, err)
}
}
setNewPauses(pauses)
}()
return nil
}
func setNewPauses(pauses map[string]time.Time) {
if len(pauses) == 0 {
return
}
log.Printf("set new pauses: %+v", pauses)
conf := *config.Get()
changed := map[string]time.Time{}
for client, pause := range pauses {
clientconf := conf.Clients[client]
if clientconf.Available.Get().Unix() == pause.Unix() {
continue
}
clientconf.Available = config.Time(pause)
conf.Clients[client] = clientconf
changed[client] = pause
}
if len(changed) == 0 {
return
}
log.Printf("updating config new pauses: %+v", conf)
config.Set(conf)
for client, pause := range changed {
if err := sendNewPause(client, pause); err != nil {
log.Printf("failed to send new pause %s/%+v: %v", client, pause, err)
}
}
}
func setNewStates(states map[string]map[config.State]struct{}) {
if len(states) == 0 {
return
}
conf := *config.Get()
changed := map[string][]config.State{}
for client, clientStates := range states {
newstates := []config.State{}
for k := range clientStates {
newstates = append(newstates, k)
}
sort.Slice(newstates, func(i, j int) bool {
return newstates[i] < newstates[j]
})
clientconf := conf.Clients[client]
if fmt.Sprint(newstates) == fmt.Sprint(clientconf.States) {
message.NewMatrix().Send(fmt.Sprintf("%s: still searching for %+v", client, newstates))
continue
}
clientconf.States = newstates
conf.Clients[client] = clientconf
changed[client] = newstates
}
if len(changed) == 0 {
return
}
log.Printf("updating config new states: %+v", conf)
config.Set(conf)
for client, states := range changed {
if err := sendNewStates(client, states); err != nil {
log.Printf("failed to send new states %s/%+v: %v", client, states, err)
}
}
}
func parseOutStates(b []byte) []config.State {
var states []config.State
var state config.State
candidates := stateFinder.FindAll(b, -1)
for _, candidate := range candidates {
if len(candidate) != 2 {
continue
}
s := fmt.Sprintf(`"%s"`, bytes.ToUpper(candidate))
err := json.Unmarshal([]byte(s), &state)
if err != nil {
} else {
states = append(states, state)
}
}
return states
}
func _main() error {
for {
err := _mainOne()
if err != nil {
log.Println(err)
}
if config.Get().Once {
time.Sleep(time.Second)
return err
}
if err != nil {
time.Sleep(config.Get().Interval.Error.Get())
} else {
time.Sleep(config.Get().Interval.OK.Get())
}
}
return nil
}
func _mainOne() error {
log.Println("config.refreshing...")
if err := config.Refresh(); err != nil {
return err
}
log.Println("once...")
if err := once(); err != nil {
return err
}
log.Println("/_mainOne")
return nil
}
func once() error {
alljobs, err := getJobs()
if err != nil {
return err
}
newjobs, err := dropStaleJobs(alljobs)
if err != nil {
return err
}
jobs, err := dropBanlistJobs(newjobs)
if err != nil {
return err
}
for i := range jobs {
if err := sendJob(jobs[i]); err != nil {
return err
}
log.Println("sent job", jobs[i])
if err := config.Get().DB().Set(jobs[i].ID, []byte(`sent`)); err != nil {
return err
}
}
return nil
}
func getJobs() ([]broker.Job, error) {
states := config.AllStates()
ntg := broker.NewNTGVision()
if config.Get().Brokers.NTG.Mock {
ntg = ntg.WithMock()
}
brokers := []broker.Broker{ntg}
jobs := []broker.Job{}
for _, broker := range brokers {
somejobs, err := broker.Search(states)
if err != nil {
return nil, err
}
jobs = append(jobs, somejobs...)
}
return jobs, nil
}
func dropStaleJobs(jobs []broker.Job) ([]broker.Job, error) {
db := config.Get().DB()
for i := len(jobs) - 1; i >= 0; i-- {
if _, err := db.Get(jobs[i].ID); err == storage.ErrNotFound {
} else if err != nil {
return nil, err
} else {
jobs = append(jobs[:i], jobs[i+1:]...)
}
}
return jobs, nil
}
func dropBanlistJobs(jobs []broker.Job) ([]broker.Job, error) {
// TODO
return jobs, nil
}
func sendJob(job broker.Job) error {
sender := message.NewMatrix()
payload := job.FormatMultilineText()
if len(payload) == 0 {
return nil
}
if err := sender.Send(payload); err != nil {
return err
}
maps := config.Get().Maps
if maps.Pickup {
pickup := fmt.Sprintf("%s,%s", url.QueryEscape(job.Pickup.City), job.Pickup.State)
uri := fmt.Sprintf(maps.URIFormat, pickup, pickup)
log.Printf("sending pickup image: %s", uri)
if err := sender.SendImage(uri); err != nil {
return err
}
}
if maps.Dropoff {
dropoff := fmt.Sprintf("%s,%s", url.QueryEscape(job.Dropoff.City), job.Dropoff.State)
uri := fmt.Sprintf(maps.URIFormat, dropoff, dropoff)
log.Printf("sending dropoff image: %s", uri)
if err := sender.SendImage(uri); err != nil {
return err
}
}
return nil
}
func sendNewStates(client string, states []config.State) error {
sender := message.NewMatrix()
return sender.Send(fmt.Sprintf("%s: now searching for loads from: %+v", client, states))
}
func sendNewPause(client string, pause time.Time) error {
sender := message.NewMatrix()
return sender.Send(fmt.Sprintf("%s: only searching for loads on and after %s", client, pause.Format("2006-01-02")))
}