firestormy/scheduler/scheduler.go

182 lines
3.5 KiB
Go

package scheduler
import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"local/firestormy/config"
"local/firestormy/logger"
"regexp"
"strings"
"time"
cron "github.com/robfig/cron/v3"
)
type Scheduler struct {
cron *cron.Cron
running map[string]cron.EntryID
}
func New() *Scheduler {
l := logger.New()
c := cron.New(
cron.WithLocation(time.Local),
cron.WithLogger(l),
cron.WithChain(
cron.SkipIfStillRunning(l),
cron.Recover(l),
),
)
return &Scheduler{
cron: c,
running: make(map[string]cron.EntryID),
}
}
func NewFromFile(config string) (*Scheduler, error) {
f, err := ioutil.ReadFile(config)
if err != nil {
return nil, err
}
s := New()
for _, line := range bytes.Split(f, []byte("\n")) {
line = cleanLine(line)
if len(line) == 0 {
continue
}
schedule, command := splitScheduleCommand(line)
if len(schedule) == 0 || len(command) == 0 {
continue
}
job, err := NewJob(Bash, schedule, command)
if err != nil {
logger.New().Error(err, "cannot fully parse file: new job error", config, ", sched", schedule, ", comm", command)
continue
}
if err := s.Add(job); err != nil {
logger.New().Error(err, "cannot fully parse file: add job error", config)
continue
}
}
jobs, _ := s.List()
if len(jobs) == 0 {
return nil, errors.New("no jobs parsed from file " + config)
}
return s, nil
}
func cleanLine(b []byte) []byte {
b = bytes.Trim(b, "\t \n")
if len(b) == 0 {
return nil
}
if b[0] == '#' {
return nil
}
return b
}
func splitScheduleCommand(b []byte) (string, string) {
re := regexp.MustCompile(`^((\d+|\*\/\d+|(\d,)*\d+|\*) [ ]*){5}`)
schedule := string(re.Find(b))
if len(schedule) == 0 {
return "", ""
}
command := strings.TrimPrefix(string(b), schedule)
schedule = strings.TrimSpace(schedule)
command = strings.TrimSpace(command)
return schedule, command
}
func (s *Scheduler) Start() error {
jobs, err := s.List()
if err != nil {
return err
}
for i := range jobs {
if err := s.Add(jobs[i]); err != nil && err != ErrDuplicateJob {
return err
}
}
s.cron.Start()
return nil
}
func (s *Scheduler) Stop() error {
ctx := s.cron.Stop()
<-ctx.Done()
return nil
}
func (s *Scheduler) List() ([]*Job, error) {
entries, err := config.Store.List(nil)
if err != nil {
return nil, err
}
errors := []error{}
jobs := []*Job{}
for _, k := range entries {
j, err := s.loadJobFromStore(k)
if err != nil {
errors = append(errors, err)
} else {
jobs = append(jobs, j)
}
}
err = nil
if len(errors) > 0 {
err = fmt.Errorf("errors listing jobs: %v", errors)
}
return jobs, err
}
func (s *Scheduler) loadJobFromStore(k string) (*Job, error) {
b, err := config.Store.Get(k)
if err != nil {
return nil, err
}
j := &Job{}
err = j.Decode(b)
return j, err
}
func (s *Scheduler) Add(j *Job) error {
if _, ok := s.getEntry(j); ok {
return ErrDuplicateJob
}
b, err := j.Encode()
if err != nil {
return err
}
if err := config.Store.Set(j.Name, b); err != nil {
return err
}
entryID, err := s.cron.AddJob(j.Schedule, j)
if err != nil {
return err
}
s.running[j.Name] = entryID
return nil
}
func (s *Scheduler) Remove(j *Job) error {
entryID, ok := s.getEntry(j)
if !ok {
return ErrJobNotFound
}
was := len(s.cron.Entries())
s.cron.Remove(entryID)
is := len(s.cron.Entries())
if was == is {
return ErrJobNotFound
}
return config.Store.Set(j.Name, nil)
}
func (s *Scheduler) getEntry(j *Job) (cron.EntryID, bool) {
entryID, ok := s.running[j.Name]
return entryID, ok
}