package main import ( "bytes" "encoding/json" "fmt" "local/storage" "local/truckstop/broker" "local/truckstop/config" "local/truckstop/message" "log" "net/url" "regexp" "sort" "strings" "sync" "time" ) var stateFinder = regexp.MustCompile(`[A-Za-z]+`) func main() { if err := config.Refresh(); err != nil { panic(err) } if config.Get().Message.Matrix.ReceiveEnabled { if err := matrixrecv(); err != nil { panic(err) } } lock := &sync.Mutex{} go func() { for { time.Sleep(config.Get().Interval.Input.Get()) if err := config.Refresh(); err != nil { log.Println(err) } else { if config.Get().Message.Matrix.ReceiveEnabled { lock.Lock() if err := matrixrecv(); err != nil { log.Print(err) } lock.Unlock() } } } }() if err := _main(); err != nil { panic(err) } lock.Lock() } func matrixrecv() error { log.Printf("checking matrix...") defer log.Printf("/checking matrix...") sender := message.NewMatrix() messages, err := sender.Receive() if err != nil { return err } func() { log.Printf("looking for help") printed := false for _, msg := range messages { if !strings.HasPrefix(msg.Content, "!help") { continue } key := fmt.Sprintf("help_%d", msg.Timestamp.Unix()) db := config.Get().DB() if !printed { if _, err := db.Get(key); err == storage.ErrNotFound { log.Printf("sending help") help := fmt.Sprintf("commands:\n `!help` print this help\n `!state nc NC nC Nc` set states for self\n `!available 2022-12-31` set date self is available for work\n\nrun a command for someone else: `!state ga @caleb`") if err := sender.Send(help); err != nil { log.Printf("failed to send help: %v", err) } else { printed = true if err := db.Set(key, []byte{'k'}); err != nil { log.Printf("failed to mark help given @%s: %v", key, err) } } } } else { if err := db.Set(key, []byte{'k'}); err != nil { log.Printf("failed to mark help given @%s: %v", key, err) } } } }() func() { log.Printf("looking for states") db := config.Get().DB() states := map[string]map[config.State]struct{}{} for _, msg := range messages { key := fmt.Sprintf("states_%d", msg.Timestamp.Unix()) if !strings.HasPrefix(msg.Content, "!state") { continue } if _, ok := states[msg.Sender]; ok { continue } if _, err := db.Get(key); err == storage.ErrNotFound { states[msg.Sender] = map[config.State]struct{}{} for _, state := range parseOutStates([]byte(msg.Content)) { states[msg.Sender][state] = struct{}{} } } if err := db.Set(key, []byte{'k'}); err != nil { log.Printf("failed to mark state gathered @%s: %v", key, err) } } setNewStates(states) }() func() { log.Printf("looking for pauses") db := config.Get().DB() pauses := map[string]time.Time{} for _, msg := range messages { key := fmt.Sprintf("pauses_%d", msg.Timestamp.Unix()) if !strings.HasPrefix(msg.Content, "!available ") { continue } if _, ok := pauses[msg.Sender]; ok { continue } if _, err := db.Get(key); err == storage.ErrNotFound { t, err := time.ParseInLocation( "2006-01-02", strings.TrimSpace(strings.TrimPrefix(msg.Content, "!available ")), time.Local, ) if err == nil { pauses[msg.Sender] = t } } if err := db.Set(key, []byte{'k'}); err != nil { log.Printf("failed to mark state gathered @%s: %v", key, err) } } setNewPauses(pauses) }() conf := *config.Get() if conf.Message.Matrix.Continuation != sender.Continuation() { conf.Message.Matrix.Continuation = sender.Continuation() config.Set(conf) } return nil } func setNewPauses(pauses map[string]time.Time) { if len(pauses) == 0 { return } log.Printf("set new pauses: %+v", pauses) conf := *config.Get() changed := map[string]time.Time{} for client, pause := range pauses { clientconf := conf.Clients[client] if clientconf.Available.Get().Unix() == pause.Unix() { continue } clientconf.Available = config.Time(pause) conf.Clients[client] = clientconf changed[client] = pause } if len(changed) == 0 { return } log.Printf("updating config new pauses: %+v", conf) config.Set(conf) for client, pause := range changed { if err := sendNewPause(client, pause); err != nil { log.Printf("failed to send new pause %s/%+v: %v", client, pause, err) } } } func setNewStates(states map[string]map[config.State]struct{}) { if len(states) == 0 { return } conf := *config.Get() changed := map[string][]config.State{} for client, clientStates := range states { newstates := []config.State{} for k := range clientStates { newstates = append(newstates, k) } sort.Slice(newstates, func(i, j int) bool { return newstates[i] < newstates[j] }) clientconf := conf.Clients[client] if fmt.Sprint(newstates) == fmt.Sprint(clientconf.States) { message.NewMatrix().Send(fmt.Sprintf("%s: still searching for %+v", client, newstates)) continue } clientconf.States = newstates conf.Clients[client] = clientconf changed[client] = newstates } if len(changed) == 0 { return } log.Printf("updating config new states: %+v", conf) config.Set(conf) for client, states := range changed { if err := sendNewStates(client, states); err != nil { log.Printf("failed to send new states %s/%+v: %v", client, states, err) } } } func parseOutStates(b []byte) []config.State { var states []config.State var state config.State candidates := stateFinder.FindAll(b, -1) for _, candidate := range candidates { if len(candidate) != 2 { continue } s := fmt.Sprintf(`"%s"`, bytes.ToUpper(candidate)) err := json.Unmarshal([]byte(s), &state) if err != nil { } else { states = append(states, state) } } return states } func _main() error { for { err := _mainOne() if err != nil { log.Println(err) } if config.Get().Once { time.Sleep(time.Second) return err } if err != nil { time.Sleep(config.Get().Interval.Error.Get()) } else { time.Sleep(config.Get().Interval.OK.Get()) } } return nil } func _mainOne() error { log.Println("config.refreshing...") if err := config.Refresh(); err != nil { return err } log.Println("once...") if err := once(); err != nil { return err } log.Println("/_mainOne") return nil } func once() error { alljobs, err := getJobs() if err != nil { return err } log.Printf("once: all jobs: %+v", alljobs) newjobs, err := dropStaleJobs(alljobs) if err != nil { return err } log.Printf("once: new jobs: %+v", newjobs) jobs, err := dropBanlistJobs(newjobs) if err != nil { return err } log.Printf("once: sending jobs: %+v", jobs) for i := range jobs { if ok, err := sendJob(jobs[i]); err != nil { return err } else if ok { log.Println("sent job", jobs[i]) if err := config.Get().DB().Set(jobs[i].ID, []byte(`sent`)); err != nil { return err } } } return nil } func getJobs() ([]broker.Job, error) { states := config.AllStates() ntg := broker.NewNTGVision() if config.Get().Brokers.NTG.Mock { ntg = ntg.WithMock() } brokers := []broker.Broker{ntg} jobs := []broker.Job{} for _, broker := range brokers { somejobs, err := broker.Search(states) if err != nil { return nil, err } jobs = append(jobs, somejobs...) } return jobs, nil } func dropStaleJobs(jobs []broker.Job) ([]broker.Job, error) { db := config.Get().DB() for i := len(jobs) - 1; i >= 0; i-- { if _, err := db.Get(jobs[i].ID); err == storage.ErrNotFound { } else if err != nil { return nil, err } else { jobs = append(jobs[:i], jobs[i+1:]...) } } return jobs, nil } func dropBanlistJobs(jobs []broker.Job) ([]broker.Job, error) { // TODO return jobs, nil } func sendJob(job broker.Job) (bool, error) { sender := message.NewMatrix() payload := job.FormatMultilineText() log.Printf("once: send job %s if nonzero: %s", job.String(), payload) if len(payload) == 0 { return false, nil } if err := sender.Send(payload); err != nil { return false, err } maps := config.Get().Maps if maps.Pickup { pickup := fmt.Sprintf("%s,%s", url.QueryEscape(job.Pickup.City), job.Pickup.State) uri := fmt.Sprintf(maps.URIFormat, pickup, pickup) log.Printf("sending pickup image: %s", uri) if err := sender.SendImage(uri); err != nil { return true, err } } if maps.Dropoff { dropoff := fmt.Sprintf("%s,%s", url.QueryEscape(job.Dropoff.City), job.Dropoff.State) uri := fmt.Sprintf(maps.URIFormat, dropoff, dropoff) log.Printf("sending dropoff image: %s", uri) if err := sender.SendImage(uri); err != nil { return true, err } } return true, nil } func sendNewStates(client string, states []config.State) error { sender := message.NewMatrix() return sender.Send(fmt.Sprintf("%s: now searching for loads from: %+v", client, states)) } func sendNewPause(client string, pause time.Time) error { sender := message.NewMatrix() return sender.Send(fmt.Sprintf("%s: only searching for loads on and after %s", client, pause.Format("2006-01-02"))) }