package main import ( "bytes" "encoding/json" "fmt" "local/storage" "local/truckstop/broker" "local/truckstop/config" "local/truckstop/logtr" "local/truckstop/message" "net/http" "net/url" "regexp" "sort" "strconv" "strings" "sync" "time" ) var stateFinder = regexp.MustCompile(`[A-Za-z]+`) var zipFinder = regexp.MustCompile(`[0-9]{5}[0-9]*`) func main() { if err := _main(); err != nil { panic(err) } } func _main() error { if err := config.Refresh(message.NewSOSMatrix); err != nil { return err } if config.Get().Message.Matrix.ReceiveEnabled { if err := matrixrecv(); err != nil { logtr.SOSf("failed to recv matrix on boot: %v", err) return err } } lock := &sync.Mutex{} go func() { for { time.Sleep(config.Get().Interval.Input.Get()) if err := config.Refresh(message.NewSOSMatrix); err != nil { logtr.Errorf("failed parsing config: %v", err) } else { if config.Get().Message.Matrix.ReceiveEnabled { lock.Lock() if err := matrixrecv(); err != nil { logtr.Errorf("failed receiving and parsing matrix: %v", err) } lock.Unlock() } } } }() if err := __main(); err != nil { logtr.SOSf("failed __main: %v", err) return err } lock.Lock() return nil } func matrixrecv() error { logtr.Verbosef("checking matrix...") defer logtr.Verbosef("/checking matrix...") sender := message.NewMatrix() messages, err := sender.Receive() if err != nil { return err } func() { logtr.Verbosef("looking for help") printed := false for _, msg := range messages { if !strings.HasPrefix(msg.Content, "!help") { continue } key := fmt.Sprintf("help_%d", msg.Timestamp.Unix()) db := config.Get().DB() if !printed { if _, err := db.Get(key); err == storage.ErrNotFound { logtr.Debugf("sending help") locationHelp := "`!state nc NC nC Nc` set states for self" if config.Get().Brokers.UseZips { locationHelp = "...`!zip 27006 84058` to set zip codes for self\n...`!radius 100` to set miles radius to 100, 200, or 300" } help := fmt.Sprintf("commands:\n...`!help` print this help\n%s\n\nrun a command for someone else: `!zip 2022-12-31 @caleb`", locationHelp) if err := sender.Send(help); err != nil { logtr.Errorf("failed to send help: %v", err) } else { printed = true if err := db.Set(key, []byte{'k'}); err != nil { logtr.Errorf("failed to mark help given @%s: %v", key, err) } } } } else { if err := db.Set(key, []byte{'k'}); err != nil { logtr.Errorf("failed to mark help given @%s: %v", key, err) } } } }() func() { logtr.Verbosef("looking for zips") db := config.Get().DB() zips := map[string]map[string]struct{}{} for _, msg := range messages { key := fmt.Sprintf("zips_%d", msg.Timestamp.Unix()) if !strings.HasPrefix(msg.Content, "!zip") { continue } if _, ok := zips[msg.Sender]; ok { continue } if _, err := db.Get(key); err == storage.ErrNotFound { zips[msg.Sender] = map[string]struct{}{} for _, zip := range parseOutZips([]byte(msg.Content)) { zips[msg.Sender][zip] = struct{}{} } } if err := db.Set(key, []byte{'k'}); err != nil { logtr.Errorf("failed to mark state gathered @%s: %v", key, err) } } if config.Get().Brokers.UseZips { setNewZips(zips) } }() func() { logtr.Verbosef("looking for states") db := config.Get().DB() states := map[string]map[config.State]struct{}{} for _, msg := range messages { key := fmt.Sprintf("states_%d", msg.Timestamp.Unix()) if !strings.HasPrefix(msg.Content, "!state") { continue } if _, ok := states[msg.Sender]; ok { continue } if _, err := db.Get(key); err == storage.ErrNotFound { states[msg.Sender] = map[config.State]struct{}{} for _, state := range parseOutStates([]byte(msg.Content)) { states[msg.Sender][state] = struct{}{} } } if err := db.Set(key, []byte{'k'}); err != nil { logtr.Errorf("failed to mark state gathered @%s: %v", key, err) } } if !config.Get().Brokers.UseZips { setNewStates(states) } }() func() { logtr.Verbosef("looking for radius") db := config.Get().DB() var radius *int for _, msg := range messages { key := fmt.Sprintf("radius_%d", msg.Timestamp.Unix()) if !strings.HasPrefix(msg.Content, "!radius ") { continue } if _, err := db.Get(key); err == storage.ErrNotFound { cmd := msg.Content for strings.HasPrefix(cmd, "!radius ") { cmd = strings.TrimPrefix(cmd, "!radius ") } n, err := strconv.Atoi(strings.TrimSpace(cmd)) if err == nil { radius = &n } } if err := db.Set(key, []byte{'k'}); err != nil { logtr.Errorf("failed to mark radius gathered @%s: %v", key, err) } } setNewRadius(radius) }() func() { logtr.Verbosef("looking for pauses") db := config.Get().DB() pauses := map[string]time.Time{} for _, msg := range messages { key := fmt.Sprintf("pauses_%d", msg.Timestamp.Unix()) if !strings.HasPrefix(msg.Content, "!available ") { continue } if _, ok := pauses[msg.Sender]; ok { continue } if _, err := db.Get(key); err == storage.ErrNotFound { t, err := time.ParseInLocation( "2006-01-02", strings.TrimSpace(strings.TrimPrefix(msg.Content, "!available ")), time.Local, ) if err == nil { pauses[msg.Sender] = t } } if err := db.Set(key, []byte{'k'}); err != nil { logtr.Errorf("failed to mark pause gathered @%s: %v", key, err) } } setNewPauses(pauses) }() message.SetMatrixContinuation(sender.Continuation()) return nil } func setNewRadius(radius *int) { if radius == nil { return } sender := message.NewMatrix() logtr.Debugf("set new radius: %d", *radius) acceptable := map[int]struct{}{ 100: struct{}{}, 200: struct{}{}, 300: struct{}{}, } if _, ok := acceptable[*radius]; !ok { sender.Send(fmt.Sprintf("radius of %v is not among acceptable %+v", *radius, acceptable)) logtr.Debugf("bad radius, not setting: %d", *radius) return } conf := *config.Get() if conf.Brokers.RadiusMiles == *radius { logtr.Debugf("noop radius, not setting: %d", *radius) return } conf.Brokers.RadiusMiles = *radius logtr.Infof("updating config new pauses: %+v", conf) config.Set(conf) sender.Send(fmt.Sprintf("now using radius of %d mi for zip code search", *radius)) } func setNewPauses(pauses map[string]time.Time) { if len(pauses) == 0 { return } logtr.Debugf("set new pauses: %+v", pauses) conf := *config.Get() changed := map[string]time.Time{} for client, pause := range pauses { clientconf := conf.Clients[client] if clientconf.Available.Get().Unix() == pause.Unix() { continue } clientconf.Available = config.Time(pause) conf.Clients[client] = clientconf changed[client] = pause } if len(changed) == 0 { return } logtr.Infof("updating config new pauses: %+v", conf) config.Set(conf) for client, pause := range changed { if err := sendNewPause(client, pause); err != nil { logtr.Errorf("failed to send new pause %s/%+v: %v", client, pause, err) } } } func setNewZips(zips map[string]map[string]struct{}) { if len(zips) == 0 { return } conf := *config.Get() changed := map[string][]string{} for client, clientZips := range zips { newzips := []string{} for k := range clientZips { newzips = append(newzips, k) } sort.Slice(newzips, func(i, j int) bool { return newzips[i] < newzips[j] }) clientconf := conf.Clients[client] if fmt.Sprint(newzips) == fmt.Sprint(clientconf.Zips) { message.NewMatrix().Send(fmt.Sprintf("%s: still searching for %+v", client, newzips)) continue } clientconf.Zips = newzips conf.Clients[client] = clientconf changed[client] = newzips } if len(changed) == 0 { return } logtr.Infof("updating config new zips: %+v", conf) config.Set(conf) for client, zips := range changed { if err := sendNewZips(client, zips); err != nil { logtr.Errorf("failed to send new zips %s/%+v: %v", client, zips, err) } } } func setNewStates(states map[string]map[config.State]struct{}) { if len(states) == 0 { return } conf := *config.Get() changed := map[string][]config.State{} for client, clientStates := range states { newstates := []config.State{} for k := range clientStates { newstates = append(newstates, k) } sort.Slice(newstates, func(i, j int) bool { return newstates[i] < newstates[j] }) clientconf := conf.Clients[client] if fmt.Sprint(newstates) == fmt.Sprint(clientconf.States) { message.NewMatrix().Send(fmt.Sprintf("%s: still searching for %+v", client, newstates)) continue } clientconf.States = newstates conf.Clients[client] = clientconf changed[client] = newstates } if len(changed) == 0 { return } logtr.Infof("updating config new states: %+v", conf) config.Set(conf) for client, states := range changed { if err := sendNewStates(client, states); err != nil { logtr.Errorf("failed to send new states %s/%+v: %v", client, states, err) } } } func parseOutZips(b []byte) []string { var zips []string candidates := zipFinder.FindAll(b, -1) for _, candidate := range candidates { if len(candidate) != 5 { continue } zips = append(zips, string(candidate)) } return zips } func parseOutStates(b []byte) []config.State { var states []config.State var state config.State candidates := stateFinder.FindAll(b, -1) for _, candidate := range candidates { if len(candidate) != 2 { continue } s := fmt.Sprintf(`"%s"`, bytes.ToUpper(candidate)) err := json.Unmarshal([]byte(s), &state) if err != nil { } else { states = append(states, state) } } return states } func __main() error { for { err := __mainOne() if err != nil { logtr.Errorf("failed _main: %v", err) } if config.Get().Once { time.Sleep(10 * time.Second) return err } if err != nil { time.Sleep(config.Get().Interval.Error.Get()) } else { time.Sleep(config.Get().Interval.OK.Get()) } } return nil } func __mainOne() error { logtr.Debugf("config.refreshing...") if err := config.Refresh(message.NewSOSMatrix); err != nil { logtr.SOSf("bad config: %v", err) return err } logtr.Debugf("once...") if err := once(); err != nil { logtr.SOSf("failed once(): %v", err) return err } logtr.Debugf("/__mainOne") return nil } func once() error { alljobs, err := getJobs() if err != nil { return err } logtr.Debugf("once: update dead jobs: %+v", alljobs) err = updateDeadJobs(alljobs) if err != nil { return err } logtr.Debugf("once: all jobs: %+v", alljobs) newjobs, err := dropStaleJobs(alljobs) if err != nil { return err } logtr.Debugf("once: new jobs: %+v", newjobs) jobs, err := dropBanlistJobs(newjobs) if err != nil { return err } logtr.Debugf("found jobs: %+v", jobs) if len(jobs) == 0 { return nil } logtr.Debugf("once: loading job secrets: %+v", jobs) for i := range jobs { jobs[i].Secrets() } logtr.Infof("once: sending jobs: %+v", jobs) db := config.Get().DB() for i := range jobs { if ok, err := sendJob(jobs[i]); err != nil { return err } else if ok { logtr.Debugf("sent job %+v", jobs[i]) if err := db.Set(jobs[i].UID(), []byte(`sent`)); err != nil { return err } } } return nil } func getJobs() ([]broker.Job, error) { if config.Get().Brokers.UseZips { return getJobsZips() } return getJobsStates() } func getJobsZips() ([]broker.Job, error) { zips := config.AllZips() jobs := []broker.Job{} for _, broker := range getBrokers() { somejobs, err := broker.SearchZips(zips) if err != nil { return nil, err } jobs = append(jobs, somejobs...) } return jobs, nil } func getJobsStates() ([]broker.Job, error) { states := config.AllStates() jobs := []broker.Job{} for _, broker := range getBrokers() { somejobs, err := broker.SearchStates(states) if err != nil { return nil, err } jobs = append(jobs, somejobs...) } return jobs, nil } func getBrokers() []broker.Broker { brokers := []broker.Broker{} if config.Get().Brokers.NTG.Enabled { logtr.Debugf("NTG enabled") brokers = append(brokers, broker.NewNTGVision()) } if config.Get().Brokers.FastExact.Enabled { logtr.Debugf("FastExact enabled") brokers = append(brokers, broker.NewFastExact()) } return brokers } type recordedJob struct { Job broker.Job SentNS int64 MatrixID string MatrixImageIDs []string } func updateDeadJobs(jobs []broker.Job) error { db := config.Get().DB() list, err := db.List([]string{}, "sent_job_", "sent_job_}}") if err != nil { return err } for _, listEntry := range list { wouldBe := strings.TrimPrefix(listEntry, "sent_job_") found := false for i := range jobs { found = found || jobs[i].UID() == wouldBe || jobs[i].ID == wouldBe } logtr.Debugf("found job %s to be still alive==%v", wouldBe, found) if !found { logtr.Debugf("updating dead job %+v", listEntry) b, err := db.Get(listEntry) if err != nil { return err } var recorded recordedJob if err := json.Unmarshal(b, &recorded); err != nil { return err } /* // TODO this beeps on fluffychat if err := message.NewMatrix().Update(recorded.MatrixID, recorded.Job.FormatMultilineTextDead()); err != nil { return err } */ if err := message.NewMatrix().Remove(recorded.MatrixID); err != nil { logtr.Debugf("failed to remove matrix: %v: %v", recorded.MatrixID, err) } if err := db.Set(listEntry, nil); err != nil { logtr.Debugf("failed to remove db: %v: %v", listEntry, err) } for _, imageid := range recorded.MatrixImageIDs { if err := message.NewMatrix().Remove(imageid); err != nil { logtr.Debugf("failed to remove matrix image: %v: %v", imageid, err) } } } } return nil } func dropStaleJobs(jobs []broker.Job) ([]broker.Job, error) { db := config.Get().DB() for i := len(jobs) - 1; i >= 0; i-- { if _, err := db.Get(jobs[i].UID()); err == storage.ErrNotFound { } else if err != nil { return nil, err } else { jobs = append(jobs[:i], jobs[i+1:]...) } } return jobs, nil } func dropBanlistJobs(jobs []broker.Job) ([]broker.Job, error) { // TODO return jobs, nil } func sendJob(job broker.Job) (bool, error) { sender := message.NewMatrix() payload := job.FormatMultilineText() logtr.Debugf("once: send job %s if nonzero: %q", job.String(), payload) if len(payload) == 0 { return false, nil } id, err := sender.SendTracked(payload) if err != nil { return false, err } recordedJob := recordedJob{ Job: job, SentNS: time.Now().UnixNano(), MatrixID: id, } defer func() { db := config.Get().DB() b, err := json.Marshal(recordedJob) if err != nil { logtr.Errorf("failed to marshal recorded job: %v", err) return } if err := db.Set("sent_job_"+job.UID(), b); err != nil { logtr.Errorf("failed to set recorded job: %v", err) return } }() maps := config.Get().Maps pickup := fmt.Sprintf("%s,%s", url.QueryEscape(job.Pickup.City), job.Pickup.State) dropoff := fmt.Sprintf("%s,%s", url.QueryEscape(job.Dropoff.City), job.Dropoff.State) if maps.Pathed.Enabled { directionsURI := fmt.Sprintf(maps.Pathed.DirectionsURIFormat, pickup, dropoff) resp, err := http.Get(directionsURI) if err != nil { return true, err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return true, fmt.Errorf("bad status getting path: %d", resp.StatusCode) } var directionsResp struct { Routes []struct { Legs []struct { Steps []struct { StartLocation struct { Lat float32 `json:"lat"` Lng float32 `json:"lng"` } `json:"start_location"` } `json:"steps"` } `json:"legs"` } `json:"routes"` } if err := json.NewDecoder(resp.Body).Decode(&directionsResp); err != nil { return true, err } if len(directionsResp.Routes) < 1 || len(directionsResp.Routes[0].Legs) < 1 || len(directionsResp.Routes[0].Legs[0].Steps) < 1 { } else { latLngPath := make([]string, 0) minLat := directionsResp.Routes[0].Legs[0].Steps[0].StartLocation.Lat maxLat := directionsResp.Routes[0].Legs[0].Steps[0].StartLocation.Lat minLng := directionsResp.Routes[0].Legs[0].Steps[0].StartLocation.Lng maxLng := directionsResp.Routes[0].Legs[0].Steps[0].StartLocation.Lng for _, v := range directionsResp.Routes[0].Legs[0].Steps { if v.StartLocation.Lng < minLng { minLng = v.StartLocation.Lng } if v.StartLocation.Lng > maxLng { maxLng = v.StartLocation.Lng } if v.StartLocation.Lat < minLat { minLat = v.StartLocation.Lat } if v.StartLocation.Lat > maxLat { maxLat = v.StartLocation.Lat } latLngPath = append(latLngPath, fmt.Sprintf("%.9f,%.9f", v.StartLocation.Lat, v.StartLocation.Lng)) } pathQuery := strings.Join(latLngPath, "|") uri := fmt.Sprintf(maps.Pathed.PathedURIFormat, pathQuery, pickup, dropoff) // if the bigger delta is