diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..ab73030 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,21 @@ +package logger + +import ( + "local/logb" +) + +type Logger struct{} + +func New() Logger { + return Logger{} +} + +func (l Logger) Info(m string, args ...interface{}) { + args = append([]interface{}{m}, args...) + logb.Info(args...) +} + +func (l Logger) Error(e error, m string, args ...interface{}) { + args = append([]interface{}{e, m}, args...) + logb.Error(args...) +} diff --git a/logger/logger_test.go b/logger/logger_test.go new file mode 100644 index 0000000..31f4409 --- /dev/null +++ b/logger/logger_test.go @@ -0,0 +1,41 @@ +package logger + +import ( + "bytes" + "errors" + "io/ioutil" + "local/logb" + "os" + "testing" + + cron "github.com/robfig/cron/v3" +) + +func TestInterface(t *testing.T) { + l := New() + var logger cron.Logger + logger = l + + w := bytes.NewBuffer(nil) + logb.SetWriter(w) + f, err := ioutil.TempFile(os.TempDir(), "logger.testInterface") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + was := os.Stderr + os.Stderr = f + defer func() { + os.Stderr = was + }() + + logger.Info("hello from %v", "me") + logger.Error(errors.New("bad"), "error from %v", "me") + + if !bytes.Contains(w.Bytes(), []byte(`error from me`)) { + t.Errorf("%s", w.Bytes()) + } + if !bytes.Contains(w.Bytes(), []byte(`hello from me`)) { + t.Errorf("%s", w.Bytes()) + } +} diff --git a/scheduler/errors.go b/scheduler/errors.go new file mode 100644 index 0000000..1af6ff7 --- /dev/null +++ b/scheduler/errors.go @@ -0,0 +1,8 @@ +package scheduler + +import "errors" + +var ErrBadCron = errors.New("bad cron input detected") +var ErrBadRunner = errors.New("bad cron runner") +var ErrJobNotFound = errors.New("bad job name given") +var ErrDuplicateJob = errors.New("duplicate job name given") diff --git a/scheduler/errors_test.go b/scheduler/errors_test.go new file mode 100644 index 0000000..5d43e11 --- /dev/null +++ b/scheduler/errors_test.go @@ -0,0 +1,10 @@ +package scheduler + +import "testing" + +func TestErr(t *testing.T) { + var e, f error + e = f + e = ErrBadCron + f = e +} diff --git a/scheduler/job.go b/scheduler/job.go new file mode 100644 index 0000000..7224ee0 --- /dev/null +++ b/scheduler/job.go @@ -0,0 +1,73 @@ +package scheduler + +import ( + "bytes" + "encoding/gob" + "local/firestormy/logger" + "os/exec" + + "github.com/google/uuid" +) + +type Job struct { + Name string + Schedule string + Raw string + Runner Runner + foo func() +} + +func NewJob(runner Runner, schedule, raw string) (*Job, error) { + switch runner { + case Bash: + return newBashJob(schedule, raw) + default: + return nil, ErrBadRunner + } +} + +func newBashJob(schedule, sh string) (*Job, error) { + if !validCron(schedule) { + return nil, ErrBadCron + } + return &Job{ + Name: uuid.New().String(), + Schedule: schedule, + Raw: sh, + Runner: Bash, + foo: func() { + cmd := exec.Command("bash", "-c", sh) + out, err := cmd.CombinedOutput() + if err != nil { + panic(err) + } + logger.New().Info("executed %s: %s", sh, out) + }, + }, nil +} + +func (j *Job) Run() { + j.foo() +} + +func (j *Job) Encode() ([]byte, error) { + buff := bytes.NewBuffer(nil) + encoder := gob.NewEncoder(buff) + err := encoder.Encode(*j) + return buff.Bytes(), err +} + +func (j *Job) Decode(b []byte) error { + buff := bytes.NewReader(b) + decoder := gob.NewDecoder(buff) + err := decoder.Decode(j) + if err != nil { + return err + } + k, err := NewJob(j.Runner, j.Schedule, j.Raw) + if err == nil { + k.Name = j.Name + *j = *k + } + return err +} diff --git a/scheduler/job_test.go b/scheduler/job_test.go new file mode 100644 index 0000000..63ce564 --- /dev/null +++ b/scheduler/job_test.go @@ -0,0 +1,111 @@ +package scheduler + +import ( + "bytes" + "io/ioutil" + "local/logb" + "os" + "testing" +) + +func TestNewBashJobBadCron(t *testing.T) { + _, err := newBashJob("1 1 1 ", "hostname") + if err == nil { + t.Fatal(err) + } +} + +func TestNewBashJobAndRun(t *testing.T) { + cases := []struct { + sched string + cmd string + out string + }{ + { + sched: "1 1 1 1 1", + cmd: "hostname", + out: os.Getenv("HOSTNAME"), + }, + { + sched: "@hourly", + cmd: "hostname", + out: os.Getenv("HOSTNAME"), + }, + { + sched: "@hourly", + cmd: "exit 1", + }, + } + + for _, c := range cases { + b, clean := captureLog() + defer clean() + j, err := newBashJob(c.sched, c.cmd) + if err != nil { + t.Error(err) + continue + } + func() { + defer func() { + recover() + }() + j.Run() + }() + if !bytes.Contains(b.Bytes(), []byte(c.out)) { + t.Errorf("(%s, %s) => %s", c.sched, c.cmd, b.Bytes()) + } + } +} + +func TestJobEncodeDecode(t *testing.T) { + buff, clean := captureLog() + defer clean() + + j, err := newBashJob("1 1 1 1 1", "hostname") + if err != nil { + t.Fatal(err) + } + + j.foo() + if !bytes.Contains(buff.Bytes(), []byte(os.Getenv("HOSTNAME"))) { + t.Errorf("%s", buff.Bytes()) + } + buff.Reset() + + b, err := j.Encode() + if err != nil { + t.Fatal(err) + } + + k := &Job{} + if err := k.Decode(b); err != nil { + t.Logf("decoded %+v", k) + t.Fatal(err) + } + + if k.Schedule != j.Schedule { + t.Error(k.Schedule, "vs", j.Schedule) + } + if k.Name != j.Name { + t.Error(k.Name, "vs", j.Name) + } + + k.foo() + if !bytes.Contains(buff.Bytes(), []byte(os.Getenv("HOSTNAME"))) { + t.Errorf("%s", buff.Bytes()) + } +} + +func captureLog() (*bytes.Buffer, func()) { + was := logb.Writer() + wase := os.Stderr + f, _ := ioutil.TempFile(os.TempDir(), "test.newBashJobAndRun") + os.Stderr = f + b := bytes.NewBuffer(nil) + logb.SetWriter(b) + return b, func() { + os.Remove(f.Name()) + logb.SetWriter(was) + os.Stderr = wase + } +} diff --git a/scheduler/parser.go b/scheduler/parser.go new file mode 100644 index 0000000..5ceeef5 --- /dev/null +++ b/scheduler/parser.go @@ -0,0 +1,19 @@ +package scheduler + +import cron "github.com/robfig/cron/v3" + +func getParser() cron.Parser { + return cron.NewParser( + cron.Minute | + cron.Hour | + cron.Dom | + cron.Month | + cron.Dow | + cron.Descriptor, + ) +} + +func validCron(s string) bool { + _, err := getParser().Parse(s) + return err == nil +} diff --git a/scheduler/parser_test.go b/scheduler/parser_test.go new file mode 100644 index 0000000..4fcf659 --- /dev/null +++ b/scheduler/parser_test.go @@ -0,0 +1,36 @@ +package scheduler + +import "testing" + +func TestValidCronFail(t *testing.T) { + cases := []string{ + "a 1 1 1 1", + "1 1 1 1", + "1 1 1 1 1 1", + "@minutely", + } + + for _, c := range cases { + if validCron(c) { + t.Error("should fail:", c) + } + } +} + +func TestValidCronPass(t *testing.T) { + cases := []string{ + "1 1 1 1 1", + "* * 1 1 1", + "@hourly", + "@daily", + "@yearly", + "@weekly", + "* */5 1 1 1", + } + + for _, c := range cases { + if !validCron(c) { + t.Error("should pass:", c) + } + } +} diff --git a/scheduler/runner.go b/scheduler/runner.go new file mode 100644 index 0000000..cf5f083 --- /dev/null +++ b/scheduler/runner.go @@ -0,0 +1,7 @@ +package scheduler + +type Runner int + +const ( + Bash Runner = iota + 1 +) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go new file mode 100644 index 0000000..591b8b6 --- /dev/null +++ b/scheduler/scheduler.go @@ -0,0 +1,121 @@ +package scheduler + +import ( + "fmt" + "local/firestormy/config" + "local/firestormy/logger" + "time" + + cron "github.com/robfig/cron/v3" +) + +type Scheduler struct { + cron *cron.Cron + running map[string]cron.EntryID +} + +func New() *Scheduler { + l := logger.New() + c := cron.New( + cron.WithLocation(time.Local), + cron.WithLogger(l), + cron.WithChain( + cron.SkipIfStillRunning(l), + cron.Recover(l), + ), + ) + return &Scheduler{ + cron: c, + running: make(map[string]cron.EntryID), + } +} + +func (s *Scheduler) Start() error { + jobs, err := s.List() + if err != nil { + return err + } + for i := range jobs { + if err := s.Add(jobs[i]); err != nil && err != ErrDuplicateJob { + return err + } + } + s.cron.Start() + return nil +} + +func (s *Scheduler) Stop() error { + ctx := s.cron.Stop() + <-ctx.Done() + return nil +} + +func (s *Scheduler) List() ([]*Job, error) { + entries, err := config.Store.List(nil) + if err != nil { + return nil, err + } + errors := []error{} + jobs := []*Job{} + for _, k := range entries { + j, err := s.loadJobFromStore(k) + if err != nil { + errors = append(errors, err) + } else { + jobs = append(jobs, j) + } + } + err = nil + if len(errors) > 0 { + err = fmt.Errorf("errors listing jobs: %v", errors) + } + return jobs, err +} + +func (s *Scheduler) loadJobFromStore(k string) (*Job, error) { + b, err := config.Store.Get(k) + if err != nil { + return nil, err + } + j := &Job{} + err = j.Decode(b) + return j, err +} + +func (s *Scheduler) Add(j *Job) error { + if _, ok := s.getEntry(j); ok { + return ErrDuplicateJob + } + b, err := j.Encode() + if err != nil { + return err + } + if err := config.Store.Set(j.Name, b); err != nil { + return err + } + entryID, err := s.cron.AddJob(j.Schedule, j) + if err != nil { + return err + } + s.running[j.Name] = entryID + return nil +} + +func (s *Scheduler) Remove(j *Job) error { + entryID, ok := s.getEntry(j) + if !ok { + return ErrJobNotFound + } + was := len(s.cron.Entries()) + s.cron.Remove(entryID) + is := len(s.cron.Entries()) + if was == is { + return ErrJobNotFound + } + return config.Store.Set(j.Name, nil) +} + +func (s *Scheduler) getEntry(j *Job) (cron.EntryID, bool) { + entryID, ok := s.running[j.Name] + return entryID, ok +} diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go new file mode 100644 index 0000000..86eb2a2 --- /dev/null +++ b/scheduler/scheduler_test.go @@ -0,0 +1,61 @@ +package scheduler + +import ( + "bytes" + "local/firestormy/config" + "local/storage" + "testing" +) + +func TestSchedulerAddRemove(t *testing.T) { + config.Store, _ = storage.New(storage.MAP) + s := New() + j, err := NewJob(Bash, "@hourly", "hostname") + if err != nil { + t.Fatal(err) + } + if err := s.Add(j); err != nil { + t.Fatal(err) + } + if list, err := s.List(); err != nil { + t.Fatal(err) + } else if len(list) != 1 { + t.Fatal(err) + } + if err := s.Remove(j); err != nil { + t.Fatal(err) + } + if list, err := s.List(); err != nil { + t.Fatal(err) + } else if len(list) != 0 { + t.Fatal(err) + } +} + +func TestSchedulerStartStop(t *testing.T) { + b, clean := captureLog() + defer clean() + config.Store, _ = storage.New(storage.MAP) + + s := New() + for i := 0; i < 5; i++ { + j, err := NewJob(Bash, "* * * * *", "hostname") + if err != nil { + t.Fatal(err) + } + if err := s.Add(j); err != nil { + t.Fatal(err) + } + } + + if err := s.Start(); err != nil { + t.Fatal(err) + } + if err := s.Stop(); err != nil { + t.Fatal(err) + } + + if n := bytes.Count(b.Bytes(), []byte("schedule")); n != 5 { + t.Errorf("%v: %s", n, b.Bytes()) + } +} diff --git a/vendor/vendor.json b/vendor/vendor.json new file mode 100644 index 0000000..c5faa54 --- /dev/null +++ b/vendor/vendor.json @@ -0,0 +1,6 @@ +{ + "comment": "", + "ignore": "test", + "package": [], + "rootPath": "local/firestormy" +}