truckstop/main.go

687 lines
18 KiB
Go

package main
import (
"bytes"
"encoding/json"
"fmt"
"gogs.inhome.blapointe.com/local/storage"
"gogs.inhome.blapointe.com/local/truckstop/broker"
"gogs.inhome.blapointe.com/local/truckstop/config"
"gogs.inhome.blapointe.com/local/truckstop/logtr"
"gogs.inhome.blapointe.com/local/truckstop/message"
"net/http"
"net/url"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
)
var stateFinder = regexp.MustCompile(`[A-Za-z]+`)
var zipFinder = regexp.MustCompile(`[0-9]{5}[0-9]*`)
func main() {
if err := _main(); err != nil {
panic(err)
}
}
func _main() error {
if err := config.Refresh(message.NewSOSMatrix); err != nil {
return err
}
if config.Get().Message.Matrix.ReceiveEnabled {
if err := matrixrecv(); err != nil {
logtr.SOSf("failed to recv matrix on boot: %v", err)
return err
}
}
lock := &sync.Mutex{}
go func() {
for {
time.Sleep(config.Get().Interval.Input.Get())
if err := config.Refresh(message.NewSOSMatrix); err != nil {
logtr.Errorf("failed parsing config: %v", err)
} else {
if config.Get().Message.Matrix.ReceiveEnabled {
lock.Lock()
if err := matrixrecv(); err != nil {
logtr.Errorf("failed receiving and parsing matrix: %v", err)
}
lock.Unlock()
}
}
}
}()
if err := __main(); err != nil {
logtr.SOSf("failed __main: %v", err)
return err
}
lock.Lock()
return nil
}
func matrixrecv() error {
logtr.Verbosef("checking matrix...")
defer logtr.Verbosef("/checking matrix...")
sender := message.NewMatrix()
messages, err := sender.Receive()
if err != nil {
return err
}
func() {
logtr.Verbosef("looking for help")
printed := false
for _, msg := range messages {
if !strings.HasPrefix(msg.Content, "!help") {
continue
}
key := fmt.Sprintf("help_%d", msg.Timestamp.Unix())
db := config.Get().DB()
if !printed {
if _, err := db.Get(key); err == storage.ErrNotFound {
logtr.Debugf("sending help")
locationHelp := "`!state nc NC nC Nc` set states for self"
if config.Get().Brokers.UseZips {
locationHelp = "...`!zip 27006 84058` to set zip codes for self\n...`!radius 100` to set miles radius to 100, 200, or 300"
}
help := fmt.Sprintf("commands:\n...`!help` print this help\n%s\n\nrun a command for someone else: `!zip 2022-12-31 @caleb`", locationHelp)
if err := sender.Send(help); err != nil {
logtr.Errorf("failed to send help: %v", err)
} else {
printed = true
if err := db.Set(key, []byte{'k'}); err != nil {
logtr.Errorf("failed to mark help given @%s: %v", key, err)
}
}
}
} else {
if err := db.Set(key, []byte{'k'}); err != nil {
logtr.Errorf("failed to mark help given @%s: %v", key, err)
}
}
}
}()
func() {
logtr.Verbosef("looking for zips")
db := config.Get().DB()
zips := map[string]map[string]struct{}{}
for _, msg := range messages {
key := fmt.Sprintf("zips_%d", msg.Timestamp.Unix())
if !strings.HasPrefix(msg.Content, "!zip") {
continue
}
if _, ok := zips[msg.Sender]; ok {
continue
}
if _, err := db.Get(key); err == storage.ErrNotFound {
zips[msg.Sender] = map[string]struct{}{}
for _, zip := range parseOutZips([]byte(msg.Content)) {
zips[msg.Sender][zip] = struct{}{}
}
}
if err := db.Set(key, []byte{'k'}); err != nil {
logtr.Errorf("failed to mark state gathered @%s: %v", key, err)
}
}
if config.Get().Brokers.UseZips {
setNewZips(zips)
}
}()
func() {
logtr.Verbosef("looking for states")
db := config.Get().DB()
states := map[string]map[config.State]struct{}{}
for _, msg := range messages {
key := fmt.Sprintf("states_%d", msg.Timestamp.Unix())
if !strings.HasPrefix(msg.Content, "!state") {
continue
}
if _, ok := states[msg.Sender]; ok {
continue
}
if _, err := db.Get(key); err == storage.ErrNotFound {
states[msg.Sender] = map[config.State]struct{}{}
for _, state := range parseOutStates([]byte(msg.Content)) {
states[msg.Sender][state] = struct{}{}
}
}
if err := db.Set(key, []byte{'k'}); err != nil {
logtr.Errorf("failed to mark state gathered @%s: %v", key, err)
}
}
if !config.Get().Brokers.UseZips {
setNewStates(states)
}
}()
func() {
logtr.Verbosef("looking for radius")
db := config.Get().DB()
var radius *int
for _, msg := range messages {
key := fmt.Sprintf("radius_%d", msg.Timestamp.Unix())
if !strings.HasPrefix(msg.Content, "!radius ") {
continue
}
if _, err := db.Get(key); err == storage.ErrNotFound {
cmd := msg.Content
for strings.HasPrefix(cmd, "!radius ") {
cmd = strings.TrimPrefix(cmd, "!radius ")
}
n, err := strconv.Atoi(strings.TrimSpace(cmd))
if err == nil {
radius = &n
}
}
if err := db.Set(key, []byte{'k'}); err != nil {
logtr.Errorf("failed to mark radius gathered @%s: %v", key, err)
}
}
setNewRadius(radius)
}()
func() {
logtr.Verbosef("looking for pauses")
db := config.Get().DB()
pauses := map[string]time.Time{}
for _, msg := range messages {
key := fmt.Sprintf("pauses_%d", msg.Timestamp.Unix())
if !strings.HasPrefix(msg.Content, "!available ") {
continue
}
if _, ok := pauses[msg.Sender]; ok {
continue
}
if _, err := db.Get(key); err == storage.ErrNotFound {
t, err := time.ParseInLocation(
"2006-01-02",
strings.TrimSpace(strings.TrimPrefix(msg.Content, "!available ")),
time.Local,
)
if err == nil {
pauses[msg.Sender] = t
}
}
if err := db.Set(key, []byte{'k'}); err != nil {
logtr.Errorf("failed to mark pause gathered @%s: %v", key, err)
}
}
setNewPauses(pauses)
}()
message.SetMatrixContinuation(sender.Continuation())
return nil
}
func setNewRadius(radius *int) {
if radius == nil {
return
}
sender := message.NewMatrix()
logtr.Debugf("set new radius: %d", *radius)
acceptable := map[int]struct{}{
100: struct{}{},
200: struct{}{},
300: struct{}{},
}
if _, ok := acceptable[*radius]; !ok {
sender.Send(fmt.Sprintf("radius of %v is not among acceptable %+v", *radius, acceptable))
logtr.Debugf("bad radius, not setting: %d", *radius)
return
}
conf := *config.Get()
if conf.Brokers.RadiusMiles == *radius {
logtr.Debugf("noop radius, not setting: %d", *radius)
return
}
conf.Brokers.RadiusMiles = *radius
logtr.Infof("updating config new pauses: %+v", conf)
config.Set(conf)
sender.Send(fmt.Sprintf("now using radius of %d mi for zip code search", *radius))
}
func setNewPauses(pauses map[string]time.Time) {
if len(pauses) == 0 {
return
}
logtr.Debugf("set new pauses: %+v", pauses)
conf := *config.Get()
changed := map[string]time.Time{}
for client, pause := range pauses {
clientconf := conf.Clients[client]
if clientconf.Available.Get().Unix() == pause.Unix() {
continue
}
clientconf.Available = config.Time(pause)
conf.Clients[client] = clientconf
changed[client] = pause
}
if len(changed) == 0 {
return
}
logtr.Infof("updating config new pauses: %+v", conf)
config.Set(conf)
for client, pause := range changed {
if err := sendNewPause(client, pause); err != nil {
logtr.Errorf("failed to send new pause %s/%+v: %v", client, pause, err)
}
}
}
func setNewZips(zips map[string]map[string]struct{}) {
if len(zips) == 0 {
return
}
conf := *config.Get()
changed := map[string][]string{}
for client, clientZips := range zips {
newzips := []string{}
for k := range clientZips {
newzips = append(newzips, k)
}
sort.Slice(newzips, func(i, j int) bool {
return newzips[i] < newzips[j]
})
clientconf := conf.Clients[client]
if fmt.Sprint(newzips) == fmt.Sprint(clientconf.Zips) {
message.NewMatrix().Send(fmt.Sprintf("%s: still searching for %+v", client, newzips))
continue
}
clientconf.Zips = newzips
conf.Clients[client] = clientconf
changed[client] = newzips
}
if len(changed) == 0 {
return
}
logtr.Infof("updating config new zips: %+v", conf)
config.Set(conf)
for client, zips := range changed {
if err := sendNewZips(client, zips); err != nil {
logtr.Errorf("failed to send new zips %s/%+v: %v", client, zips, err)
}
}
}
func setNewStates(states map[string]map[config.State]struct{}) {
if len(states) == 0 {
return
}
conf := *config.Get()
changed := map[string][]config.State{}
for client, clientStates := range states {
newstates := []config.State{}
for k := range clientStates {
newstates = append(newstates, k)
}
sort.Slice(newstates, func(i, j int) bool {
return newstates[i] < newstates[j]
})
clientconf := conf.Clients[client]
if fmt.Sprint(newstates) == fmt.Sprint(clientconf.States) {
message.NewMatrix().Send(fmt.Sprintf("%s: still searching for %+v", client, newstates))
continue
}
clientconf.States = newstates
conf.Clients[client] = clientconf
changed[client] = newstates
}
if len(changed) == 0 {
return
}
logtr.Infof("updating config new states: %+v", conf)
config.Set(conf)
for client, states := range changed {
if err := sendNewStates(client, states); err != nil {
logtr.Errorf("failed to send new states %s/%+v: %v", client, states, err)
}
}
}
func parseOutZips(b []byte) []string {
var zips []string
candidates := zipFinder.FindAll(b, -1)
for _, candidate := range candidates {
if len(candidate) != 5 {
continue
}
zips = append(zips, string(candidate))
}
return zips
}
func parseOutStates(b []byte) []config.State {
var states []config.State
var state config.State
candidates := stateFinder.FindAll(b, -1)
for _, candidate := range candidates {
if len(candidate) != 2 {
continue
}
s := fmt.Sprintf(`"%s"`, bytes.ToUpper(candidate))
err := json.Unmarshal([]byte(s), &state)
if err != nil {
} else {
states = append(states, state)
}
}
return states
}
func __main() error {
for {
err := __mainOne()
if err != nil {
logtr.Errorf("failed _main: %v", err)
}
if config.Get().Once {
time.Sleep(10 * time.Second)
return err
}
if err != nil {
time.Sleep(config.Get().Interval.Error.Get())
} else {
time.Sleep(config.Get().Interval.OK.Get())
}
}
return nil
}
func __mainOne() error {
logtr.Debugf("config.refreshing...")
if err := config.Refresh(message.NewSOSMatrix); err != nil {
logtr.SOSf("bad config: %v", err)
return err
}
logtr.Debugf("once...")
if err := once(); err != nil {
logtr.SOSf("failed once(): %v", err)
return err
}
logtr.Debugf("/__mainOne")
return nil
}
func once() error {
alljobs, err := getJobs()
if err != nil {
return err
}
logtr.Debugf("once: update dead jobs: %+v", alljobs)
err = updateDeadJobs(alljobs)
if err != nil {
return err
}
logtr.Debugf("once: all jobs: %+v", alljobs)
newjobs, err := dropStaleJobs(alljobs)
if err != nil {
return err
}
logtr.Debugf("once: new jobs: %+v", newjobs)
jobs, err := dropBanlistJobs(newjobs)
if err != nil {
return err
}
logtr.Debugf("found jobs: %+v", jobs)
if len(jobs) == 0 {
return nil
}
logtr.Debugf("once: loading job secrets: %+v", jobs)
for i := range jobs {
jobs[i].Secrets()
}
logtr.Infof("once: sending jobs: %+v", jobs)
db := config.Get().DB()
for i := range jobs {
if ok, err := sendJob(jobs[i]); err != nil {
return err
} else if ok {
logtr.Debugf("sent job %+v", jobs[i])
if err := db.Set(jobs[i].UID(), []byte(`sent`)); err != nil {
return err
}
}
}
return nil
}
func getJobs() ([]broker.Job, error) {
if config.Get().Brokers.UseZips {
return getJobsZips()
}
return getJobsStates()
}
func getJobsZips() ([]broker.Job, error) {
zips := config.AllZips()
jobs := []broker.Job{}
for _, broker := range getBrokers() {
somejobs, err := broker.SearchZips(zips)
if err != nil {
return nil, err
}
jobs = append(jobs, somejobs...)
}
return jobs, nil
}
func getJobsStates() ([]broker.Job, error) {
states := config.AllStates()
jobs := []broker.Job{}
for _, broker := range getBrokers() {
somejobs, err := broker.SearchStates(states)
if err != nil {
return nil, err
}
jobs = append(jobs, somejobs...)
}
return jobs, nil
}
func getBrokers() []broker.Broker {
brokers := []broker.Broker{}
if config.Get().Brokers.NTG.Enabled {
logtr.Debugf("NTG enabled")
brokers = append(brokers, broker.NewNTGVision())
}
if config.Get().Brokers.FastExact.Enabled {
logtr.Debugf("FastExact enabled")
brokers = append(brokers, broker.NewFastExact())
}
return brokers
}
type recordedJob struct {
Job broker.Job
SentNS int64
MatrixID string
MatrixImageIDs []string
}
func updateDeadJobs(jobs []broker.Job) error {
db := config.Get().DB()
list, err := db.List([]string{}, "sent_job_", "sent_job_}}")
if err != nil {
return err
}
for _, listEntry := range list {
wouldBe := strings.TrimPrefix(listEntry, "sent_job_")
found := false
for i := range jobs {
found = found || jobs[i].UID() == wouldBe || jobs[i].ID == wouldBe
}
logtr.Debugf("found job %s to be still alive==%v", wouldBe, found)
if !found {
logtr.Debugf("updating dead job %+v", listEntry)
b, err := db.Get(listEntry)
if err != nil {
return err
}
var recorded recordedJob
if err := json.Unmarshal(b, &recorded); err != nil {
return err
}
/* // TODO this beeps on fluffychat
if err := message.NewMatrix().Update(recorded.MatrixID, recorded.Job.FormatMultilineTextDead()); err != nil {
return err
}
*/
if err := db.Set(listEntry, nil); err != nil {
logtr.Debugf("failed to remove db: %v: %v", listEntry, err)
}
for _, imageid := range recorded.MatrixImageIDs {
if err := message.NewMatrix().Update(imageid, "<job no longer available>"); err != nil {
logtr.Debugf("failed to update matrix image: %v: %v", imageid, err)
}
}
}
}
return nil
}
func dropStaleJobs(jobs []broker.Job) ([]broker.Job, error) {
db := config.Get().DB()
for i := len(jobs) - 1; i >= 0; i-- {
if _, err := db.Get(jobs[i].UID()); err == storage.ErrNotFound {
} else if err != nil {
return nil, err
} else {
jobs = append(jobs[:i], jobs[i+1:]...)
}
}
return jobs, nil
}
func dropBanlistJobs(jobs []broker.Job) ([]broker.Job, error) {
// TODO
return jobs, nil
}
func sendJob(job broker.Job) (bool, error) {
sender := message.NewMatrix()
payload := job.FormatMultilineText()
logtr.Debugf("once: send job %s if nonzero: %q", job.String(), payload)
if len(payload) == 0 {
return false, nil
}
id, err := sender.SendTracked(payload)
if err != nil {
return false, err
}
recordedJob := recordedJob{
Job: job,
SentNS: time.Now().UnixNano(),
MatrixID: id,
}
defer func() {
db := config.Get().DB()
b, err := json.Marshal(recordedJob)
if err != nil {
logtr.Errorf("failed to marshal recorded job: %v", err)
return
}
if err := db.Set("sent_job_"+job.UID(), b); err != nil {
logtr.Errorf("failed to set recorded job: %v", err)
return
}
}()
maps := config.Get().Maps
pickup := fmt.Sprintf("%s,%s", url.QueryEscape(job.Pickup.City), job.Pickup.State)
dropoff := fmt.Sprintf("%s,%s", url.QueryEscape(job.Dropoff.City), job.Dropoff.State)
if maps.Pathed.Enabled {
directionsURI := fmt.Sprintf(maps.Pathed.DirectionsURIFormat, pickup, dropoff)
resp, err := http.Get(directionsURI)
if err != nil {
return true, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return true, fmt.Errorf("bad status getting path: %d", resp.StatusCode)
}
var directionsResp struct {
Routes []struct {
Legs []struct {
Steps []struct {
StartLocation struct {
Lat float32 `json:"lat"`
Lng float32 `json:"lng"`
} `json:"start_location"`
} `json:"steps"`
} `json:"legs"`
} `json:"routes"`
}
if err := json.NewDecoder(resp.Body).Decode(&directionsResp); err != nil {
return true, err
}
if len(directionsResp.Routes) < 1 || len(directionsResp.Routes[0].Legs) < 1 || len(directionsResp.Routes[0].Legs[0].Steps) < 1 {
} else {
latLngPath := make([]string, 0)
minLat := directionsResp.Routes[0].Legs[0].Steps[0].StartLocation.Lat
maxLat := directionsResp.Routes[0].Legs[0].Steps[0].StartLocation.Lat
minLng := directionsResp.Routes[0].Legs[0].Steps[0].StartLocation.Lng
maxLng := directionsResp.Routes[0].Legs[0].Steps[0].StartLocation.Lng
for _, v := range directionsResp.Routes[0].Legs[0].Steps {
if v.StartLocation.Lng < minLng {
minLng = v.StartLocation.Lng
}
if v.StartLocation.Lng > maxLng {
maxLng = v.StartLocation.Lng
}
if v.StartLocation.Lat < minLat {
minLat = v.StartLocation.Lat
}
if v.StartLocation.Lat > maxLat {
maxLat = v.StartLocation.Lat
}
latLngPath = append(latLngPath, fmt.Sprintf("%.9f,%.9f", v.StartLocation.Lat, v.StartLocation.Lng))
}
pathQuery := strings.Join(latLngPath, "|")
uri := fmt.Sprintf(maps.Pathed.PathedURIFormat, pathQuery, pickup, dropoff)
// if the bigger delta is <acceptable, override zoom
if maxLat-minLat <= maps.Pathed.Zoom.AcceptableLatLngDelta && maxLng-minLng <= maps.Pathed.Zoom.AcceptableLatLngDelta {
uri = fmt.Sprintf("%s&zoom=%d", uri, maps.Pathed.Zoom.Override)
}
logtr.Debugf("sending pathed image: %s", uri)
pathedid, err := sender.SendImageTracked(uri)
if err != nil {
return true, err
}
recordedJob.MatrixImageIDs = append(recordedJob.MatrixImageIDs, pathedid)
}
}
if maps.Pickup {
uri := fmt.Sprintf(maps.URIFormat, pickup, pickup)
logtr.Debugf("sending pickup image: %s", uri)
pickupid, err := sender.SendImageTracked(uri)
if err != nil {
return true, err
}
recordedJob.MatrixImageIDs = append(recordedJob.MatrixImageIDs, pickupid)
}
if maps.Dropoff {
uri := fmt.Sprintf(maps.URIFormat, dropoff, dropoff)
logtr.Debugf("sending dropoff image: %s", uri)
dropid, err := sender.SendImageTracked(uri)
if err != nil {
return true, err
}
recordedJob.MatrixImageIDs = append(recordedJob.MatrixImageIDs, dropid)
}
return true, nil
}
func sendNewZips(client string, zips []string) error {
sender := message.NewMatrix()
return sender.Send(fmt.Sprintf("%s: now searching for loads from zip codes: %+v", client, zips))
}
func sendNewStates(client string, states []config.State) error {
sender := message.NewMatrix()
return sender.Send(fmt.Sprintf("%s: now searching for loads from states: %+v", client, states))
}
func sendNewPause(client string, pause time.Time) error {
sender := message.NewMatrix()
return sender.Send(fmt.Sprintf("%s: only searching for loads on and after %s", client, pause.Format("2006-01-02")))
}