// Package scheduler wraps a robfig/cron instance and keeps the scheduled
// jobs in sync with the definitions persisted in config.Store.
package scheduler

import (
	"bytes"
	"errors"
	"fmt"
	"io/ioutil"
	"regexp"
	"strings"
	"time"

	"gitea.inhome.blapointe.com/local/firestormy/config"
	"gitea.inhome.blapointe.com/local/firestormy/config/ns"
	"gitea.inhome.blapointe.com/local/firestormy/logger"
	"gitea.inhome.blapointe.com/local/logb"
	cron "github.com/robfig/cron/v3"
)

// cronSchedulePrefix matches the leading 5 or 6 space-separated cron fields of
// a crontab-style line. Each field is a number, a step (*/N), a comma-separated
// list of numbers, or "*". It is compiled once at package scope instead of on
// every call.
var cronSchedulePrefix = regexp.MustCompile(`^((\d+|\*\/\d+|(\d+,)*\d+|\*) [ ]*){5,6}`)

// Schedule is the package-level scheduler instance.
var Schedule *Scheduler = New()

// Scheduler owns a cron runner plus a map from job name to the cron entry
// currently scheduled for that job.
type Scheduler struct {
	cron    *cron.Cron
	running map[string]cron.EntryID
}

// semaphored is a cron.Job wrapper that holds a slot of the shared channel ch
// for the duration of the wrapped job's run.
type semaphored struct {
	job cron.Job
	ch  chan struct{}
	l   logger.Logger
}

// Run blocks until a slot in the semaphore channel is free, runs the wrapped
// job, and releases the slot again.
//
// The release happens in a defer: New installs cron.Recover outside of this
// wrapper, so a panicking job unwinds through Run. Without the defer every
// such panic would leak a slot and, once all slots are gone, block all jobs.
func (s semaphored) Run() {
	s.l.Info("scheduler.semaphored.Run() | acquiring...")
	s.ch <- struct{}{}
	s.l.Info("scheduler.semaphored.Run() | acquired")
	defer func() {
		s.l.Info("scheduler.semaphored.Run() | releasing...")
		<-s.ch
		s.l.Info("scheduler.semaphored.Run() | released")
	}()
	s.job.Run()
}

// New builds a Scheduler with an empty job map and a cron runner that uses
// local time, the project logger and parser, and a job chain of
// SkipIfStillRunning, Recover and a semaphore wrapper. The semaphore channel
// has capacity 2, so at most two wrapped jobs run at the same time.
func New() *Scheduler {
	l := logger.New()
	semaphore := make(chan struct{}, 2)
	c := cron.New(
		cron.WithLocation(time.Local),
		cron.WithLogger(l),
		cron.WithChain(
			cron.SkipIfStillRunning(l),
			cron.Recover(l),
			func(j cron.Job) cron.Job {
				return semaphored{
					l:   l,
					job: j,
					ch:  semaphore,
				}
			},
		),
		cron.WithParser(getParser()),
	)
	return &Scheduler{
		cron:    c,
		running: make(map[string]cron.EntryID),
	}
}

// NewFromFile reads a crontab-style file and returns a new Scheduler holding
// one bash job per parsable line. Lines that are blank, are comments, or fail
// to parse or register are logged and skipped. An error is returned when the
// file cannot be read or when no jobs are listed afterwards.
func NewFromFile(config string) (*Scheduler, error) {
	f, err := ioutil.ReadFile(config)
	if err != nil {
		return nil, err
	}
	s := New()
	for _, line := range bytes.Split(f, []byte("\n")) {
		line = cleanLine(line)
		if len(line) == 0 {
			continue
		}
		schedule, command, title := splitScheduleCommandTitle(line)
		if len(schedule) == 0 || len(command) == 0 {
			continue
		}
		job, err := newBashJob(schedule, command, title)
		if err != nil {
			logger.New().Error(err, "cannot fully parse file: new job error", config, ", sched", schedule, ", comm", command)
			continue
		}
		if err := s.Add(job); err != nil {
			logger.New().Error(err, "cannot fully parse file: add job error", config)
			continue
		}
	}
	// The List error is deliberately ignored: List still returns every job it
	// could load, and only the count matters here.
	jobs, _ := s.List()
	if len(jobs) == 0 {
		return nil, errors.New("no jobs parsed from file " + config)
	}
	return s, nil
}

// cleanLine trims surrounding whitespace from a raw file line and returns nil
// for blank lines and for lines starting with '#'. Carriage returns are
// trimmed too, so files with CRLF line endings do not leave a trailing "\r"
// on the command.
func cleanLine(b []byte) []byte {
	b = bytes.Trim(b, "\t \r\n")
	if len(b) == 0 || b[0] == '#' {
		return nil
	}
	return b
}

// splitScheduleCommandTitle splits a cleaned crontab-style line into its cron
// schedule, its command, and an optional title.
//
// The schedule is the leading run of 5 or 6 cron fields. The command is the
// rest of the line and still contains any trailing "# title" text. The title
// is whatever follows the last '#' in the command, or "" if there is none.
// All three results are empty when the line does not start with a schedule.
func splitScheduleCommandTitle(b []byte) (string, string, string) {
	schedule := string(cronSchedulePrefix.Find(b))
	if len(schedule) == 0 {
		return "", "", ""
	}
	commandTitle := strings.TrimSpace(strings.TrimPrefix(string(b), schedule))
	schedule = strings.TrimSpace(schedule)
	title := ""
	if i := strings.LastIndex(commandTitle, "#"); i >= 0 {
		title = strings.TrimSpace(commandTitle[i+1:])
	}
	return schedule, commandTitle, title
}

// Start registers every job returned by List that is not already scheduled,
// starts the cron runner, and logs the resulting entries. It fails if List
// fails or if adding a job fails for any reason other than ErrDuplicateJob.
func (s *Scheduler) Start() error {
	jobs, err := s.List()
	if err != nil {
		return err
	}
	for i := range jobs {
		if err := s.Add(jobs[i]); err != nil && err != ErrDuplicateJob {
			return err
		}
	}
	s.cron.Start()
	entries := s.cron.Entries()
	logb.Debugf("[sched] start: %+v", entries)
	logger.New().Info("started", len(entries), "jobs")
	for _, entry := range entries {
		logger.New().Info(fmt.Sprintf("%+v", entry))
	}
	return nil
}

// Stop stops the cron runner and waits until the context returned by
// cron.Stop is done. It always returns nil.
func (s *Scheduler) Stop() error {
	ctx := s.cron.Stop()
	<-ctx.Done()
	return nil
}

// List loads every job stored under ns.Jobs in config.Store. Jobs that fail to
// load are left out of the result, and their errors are combined into the
// returned error alongside the jobs that did load.
func (s *Scheduler) List() ([]*Job, error) {
	entries, err := config.Store.List(ns.Jobs)
	logb.Debugf("[sched] list: %+v", entries)
	if err != nil {
		return nil, err
	}
	// Named errs rather than errors so the errors package is not shadowed.
	errs := []error{}
	jobs := []*Job{}
	for _, k := range entries {
		j, err := s.loadJobFromStore(k)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		jobs = append(jobs, j)
	}
	if len(errs) > 0 {
		return jobs, fmt.Errorf("errors listing jobs: %v", errs)
	}
	return jobs, nil
}

// loadJobFromStore fetches the entry stored under key k in ns.Jobs and decodes
// it into a Job. It returns a nil job whenever it returns an error.
func (s *Scheduler) loadJobFromStore(k string) (*Job, error) {
	b, err := config.Store.Get(k, ns.Jobs...)
	if err != nil {
		return nil, err
	}
	j := &Job{}
	if err := j.Decode(b); err != nil {
		return nil, err
	}
	return j, nil
}

// Update replaces the stored definition and the cron entry of an already
// scheduled job. The Last* fields of j are overwritten with the values of the
// stored job before j is saved.
//
// The new cron entry is created before anything else is changed, so an invalid
// schedule or a failed store write leaves the old entry and stored definition
// in place.
func (s *Scheduler) Update(j *Job) error {
	logb.Debugf("[sched] update: %+v", j)
	oldID, ok := s.getEntry(j)
	if !ok {
		return errors.New("job not found in storage")
	}
	stored, err := s.loadJobFromStore(j.Name)
	if err != nil {
		return err
	}
	j.LastStatus = stored.LastStatus
	j.LastOutput = stored.LastOutput
	j.LastRuntime = stored.LastRuntime
	j.LastRun = stored.LastRun
	b, err := j.Encode()
	if err != nil {
		return err
	}
	newID, err := s.cron.AddJob(j.Schedule, j)
	if err != nil {
		return err
	}
	if err := config.Store.Set(j.Name, b, ns.Jobs...); err != nil {
		// Roll back the new entry; the old one is still scheduled.
		s.cron.Remove(newID)
		return err
	}
	s.cron.Remove(oldID)
	s.running[j.Name] = newID
	return nil
}

// Add schedules j and saves its encoded form under ns.Jobs. It returns
// ErrDuplicateJob if a job with the same name is already tracked. If saving
// fails, the cron entry that was just created is removed again so no untracked
// job stays scheduled.
func (s *Scheduler) Add(j *Job) error {
	logb.Debugf("[sched] add: %+v", j)
	if _, ok := s.getEntry(j); ok {
		return ErrDuplicateJob
	}
	b, err := j.Encode()
	if err != nil {
		return err
	}
	entryID, err := s.cron.AddJob(j.Schedule, j)
	if err != nil {
		return err
	}
	if err := config.Store.Set(j.Name, b, ns.Jobs...); err != nil {
		s.cron.Remove(entryID)
		return err
	}
	s.running[j.Name] = entryID
	return nil
}

// Remove unschedules j and sets its entries under ns.Jobs and ns.JobsRaw to
// nil (presumably how the store deletes a key; confirm against config.Store).
// It returns ErrJobNotFound if the job is not tracked or if removing its entry
// did not change the number of cron entries.
func (s *Scheduler) Remove(j *Job) error {
	logb.Debugf("[sched] rm: %+v", j)
	entryID, ok := s.getEntry(j)
	if !ok {
		return ErrJobNotFound
	}
	was := len(s.cron.Entries())
	s.cron.Remove(entryID)
	is := len(s.cron.Entries())
	// Forget the entry ID either way. Keeping it would make a later Add of the
	// same job fail with ErrDuplicateJob.
	delete(s.running, j.Name)
	if was == is {
		return ErrJobNotFound
	}
	if err := config.Store.Set(j.Name, nil, ns.Jobs...); err != nil {
		return err
	}
	return config.Store.Set(j.Name, nil, ns.JobsRaw...)
}

// getEntry looks up the cron entry ID tracked for j by name.
func (s *Scheduler) getEntry(j *Job) (cron.EntryID, bool) {
	entryID, ok := s.running[j.Name]
	return entryID, ok
}