207 lines
4.1 KiB
Go
207 lines
4.1 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"local/storage"
|
|
"local/truckstop/broker"
|
|
"local/truckstop/config"
|
|
"local/truckstop/message"
|
|
"log"
|
|
"regexp"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// stateFinder matches each maximal run of ASCII letters. parseOutStates uses
// it to pull word-like tokens out of a message before keeping only the
// two-letter ones.
var stateFinder = regexp.MustCompile(`[A-Za-z]+`)
|
|
|
|
func main() {
|
|
lock := &sync.Mutex{}
|
|
go func() {
|
|
for {
|
|
time.Sleep(config.Get().Interval.Input.Get())
|
|
if err := config.Refresh(); err != nil {
|
|
log.Println(err)
|
|
} else {
|
|
if config.Get().Message.Matrix.ReceiveEnabled {
|
|
lock.Lock()
|
|
if err := matrixrecv(); err != nil {
|
|
log.Print(err)
|
|
}
|
|
lock.Unlock()
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
if err := _main(); err != nil {
|
|
panic(err)
|
|
}
|
|
lock.Lock()
|
|
}
|
|
|
|
func matrixrecv() error {
|
|
log.Printf("checking matrix...")
|
|
defer log.Printf("/checking matrix...")
|
|
sender := message.NewMatrix()
|
|
messages, err := sender.Receive()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
states := map[config.State]struct{}{}
|
|
for _, msg := range messages {
|
|
if len(states) > 0 {
|
|
continue
|
|
}
|
|
for _, state := range parseOutStates([]byte(msg)) {
|
|
states[state] = struct{}{}
|
|
}
|
|
}
|
|
setNewStates(states)
|
|
return nil
|
|
}
|
|
|
|
func setNewStates(states map[config.State]struct{}) {
|
|
if len(states) == 0 {
|
|
return
|
|
}
|
|
newstates := []config.State{}
|
|
for k := range states {
|
|
newstates = append(newstates, k)
|
|
}
|
|
sort.Slice(newstates, func(i, j int) bool {
|
|
return newstates[i] < newstates[j]
|
|
})
|
|
conf := *config.Get()
|
|
if fmt.Sprint(newstates) == fmt.Sprint(conf.States) {
|
|
return
|
|
}
|
|
conf.States = newstates
|
|
log.Printf("updating config new states: %+v", conf)
|
|
config.Set(conf)
|
|
if err := sendNewStates(conf.States); err != nil {
|
|
log.Printf("failed to send new states %+v: %v", conf.States, err)
|
|
}
|
|
}
|
|
|
|
func parseOutStates(b []byte) []config.State {
|
|
var states []config.State
|
|
var state config.State
|
|
candidates := stateFinder.FindAll(b, -1)
|
|
for _, candidate := range candidates {
|
|
if len(candidate) != 2 {
|
|
continue
|
|
}
|
|
s := fmt.Sprintf(`"%s"`, bytes.ToUpper(candidate))
|
|
err := json.Unmarshal([]byte(s), &state)
|
|
if err != nil {
|
|
} else {
|
|
states = append(states, state)
|
|
}
|
|
}
|
|
return states
|
|
}
|
|
|
|
func _main() error {
|
|
for {
|
|
err := _mainOne()
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
if config.Get().Once {
|
|
return err
|
|
}
|
|
if err != nil {
|
|
time.Sleep(config.Get().Interval.Error.Get())
|
|
} else {
|
|
time.Sleep(config.Get().Interval.OK.Get())
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func _mainOne() error {
|
|
log.Println("config.refreshing...")
|
|
if err := config.Refresh(); err != nil {
|
|
return err
|
|
}
|
|
log.Println("once...")
|
|
if err := once(); err != nil {
|
|
return err
|
|
}
|
|
log.Println("/_mainOne")
|
|
return nil
|
|
}
|
|
|
|
func once() error {
|
|
alljobs, err := getJobs()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
newjobs, err := dropStaleJobs(alljobs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobs, err := dropBanlistJobs(newjobs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i := range jobs {
|
|
if err := sendJob(jobs[i]); err != nil {
|
|
return err
|
|
}
|
|
log.Println("sent job", jobs[i])
|
|
if err := config.Get().DB().Set(jobs[i].ID, []byte(`sent`)); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func getJobs() ([]broker.Job, error) {
|
|
states := config.Get().States
|
|
ntg := broker.NewNTGVision()
|
|
if config.Get().Brokers.NTG.Mock {
|
|
ntg = ntg.WithMock()
|
|
}
|
|
brokers := []broker.Broker{ntg}
|
|
jobs := []broker.Job{}
|
|
for _, broker := range brokers {
|
|
somejobs, err := broker.Search(states)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, somejobs...)
|
|
}
|
|
return jobs, nil
|
|
}
|
|
|
|
func dropStaleJobs(jobs []broker.Job) ([]broker.Job, error) {
|
|
db := config.Get().DB()
|
|
for i := len(jobs) - 1; i >= 0; i-- {
|
|
if _, err := db.Get(jobs[i].ID); err == storage.ErrNotFound {
|
|
} else if err != nil {
|
|
return nil, err
|
|
} else {
|
|
jobs = append(jobs[:i], jobs[i+1:]...)
|
|
}
|
|
}
|
|
return jobs, nil
|
|
}
|
|
|
|
func dropBanlistJobs(jobs []broker.Job) ([]broker.Job, error) {
|
|
// TODO
|
|
return jobs, nil
|
|
}
|
|
|
|
func sendJob(job broker.Job) error {
|
|
sender := message.NewMatrix()
|
|
return sender.Send(job.FormatMultilineText())
|
|
}
|
|
|
|
func sendNewStates(states []config.State) error {
|
|
sender := message.NewMatrix()
|
|
return sender.Send(fmt.Sprintf("now searching for loads from: %+v", states))
|
|
}
|