Create crond scheduler and dependencies and store to disk

master
bel 2020-03-12 15:14:25 -06:00
parent f022b26987
commit 122806f438
11 changed files with 508 additions and 0 deletions

21
logger/logger.go Normal file
View File

@ -0,0 +1,21 @@
package logger
import (
"local/logb"
)
type Logger struct{}
func New() Logger {
return Logger{}
}
func (l Logger) Info(m string, args ...interface{}) {
args = append([]interface{}{m}, args...)
logb.Info(args...)
}
func (l Logger) Error(e error, m string, args ...interface{}) {
args = append([]interface{}{e, m}, args...)
logb.Error(args...)
}

41
logger/logger_test.go Normal file
View File

@ -0,0 +1,41 @@
package logger
import (
"bytes"
"errors"
"io/ioutil"
"local/logb"
"os"
"testing"
cron "github.com/robfig/cron/v3"
)
func TestInterface(t *testing.T) {
l := New()
var logger cron.Logger
logger = l
w := bytes.NewBuffer(nil)
logb.SetWriter(w)
f, err := ioutil.TempFile(os.TempDir(), "logger.testInterface")
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
was := os.Stderr
os.Stderr = f
defer func() {
os.Stderr = was
}()
logger.Info("hello from %v", "me")
logger.Error(errors.New("bad"), "error from %v", "me")
if !bytes.Contains(w.Bytes(), []byte(`error from me`)) {
t.Errorf("%s", w.Bytes())
}
if !bytes.Contains(w.Bytes(), []byte(`hello from me`)) {
t.Errorf("%s", w.Bytes())
}
}

8
scheduler/errors.go Normal file
View File

@ -0,0 +1,8 @@
package scheduler
import "errors"
var ErrBadCron = errors.New("bad cron input detected")
var ErrBadRunner = errors.New("bad cron runner")
var ErrJobNotFound = errors.New("bad job name given")
var ErrDuplicateJob = errors.New("duplicate job name given")

10
scheduler/errors_test.go Normal file
View File

@ -0,0 +1,10 @@
package scheduler
import "testing"
func TestErr(t *testing.T) {
var e, f error
e = f
e = ErrBadCron
f = e
}

73
scheduler/job.go Normal file
View File

@ -0,0 +1,73 @@
package scheduler
import (
"bytes"
"encoding/gob"
"local/firestormy/logger"
"os/exec"
"github.com/google/uuid"
)
type Job struct {
Name string
Schedule string
Raw string
Runner Runner
foo func()
}
func NewJob(runner Runner, schedule, raw string) (*Job, error) {
switch runner {
case Bash:
return newBashJob(schedule, raw)
default:
return nil, ErrBadRunner
}
}
func newBashJob(schedule, sh string) (*Job, error) {
if !validCron(schedule) {
return nil, ErrBadCron
}
return &Job{
Name: uuid.New().String(),
Schedule: schedule,
Raw: sh,
Runner: Bash,
foo: func() {
cmd := exec.Command("bash", "-c", sh)
out, err := cmd.CombinedOutput()
if err != nil {
panic(err)
}
logger.New().Info("executed %s: %s", sh, out)
},
}, nil
}
func (j *Job) Run() {
j.foo()
}
func (j *Job) Encode() ([]byte, error) {
buff := bytes.NewBuffer(nil)
encoder := gob.NewEncoder(buff)
err := encoder.Encode(*j)
return buff.Bytes(), err
}
func (j *Job) Decode(b []byte) error {
buff := bytes.NewReader(b)
decoder := gob.NewDecoder(buff)
err := decoder.Decode(j)
if err != nil {
return err
}
k, err := NewJob(j.Runner, j.Schedule, j.Raw)
if err == nil {
k.Name = j.Name
*j = *k
}
return err
}

111
scheduler/job_test.go Normal file
View File

@ -0,0 +1,111 @@
package scheduler
import (
"bytes"
"io/ioutil"
"local/logb"
"os"
"testing"
)
func TestNewBashJobBadCron(t *testing.T) {
_, err := newBashJob("1 1 1 ", "hostname")
if err == nil {
t.Fatal(err)
}
}
func TestNewBashJobAndRun(t *testing.T) {
cases := []struct {
sched string
cmd string
out string
}{
{
sched: "1 1 1 1 1",
cmd: "hostname",
out: os.Getenv("HOSTNAME"),
},
{
sched: "@hourly",
cmd: "hostname",
out: os.Getenv("HOSTNAME"),
},
{
sched: "@hourly",
cmd: "exit 1",
},
}
for _, c := range cases {
b, clean := captureLog()
defer clean()
j, err := newBashJob(c.sched, c.cmd)
if err != nil {
t.Error(err)
continue
}
func() {
defer func() {
recover()
}()
j.Run()
}()
if !bytes.Contains(b.Bytes(), []byte(c.out)) {
t.Errorf("(%s, %s) => %s", c.sched, c.cmd, b.Bytes())
}
}
}
func TestJobEncodeDecode(t *testing.T) {
buff, clean := captureLog()
defer clean()
j, err := newBashJob("1 1 1 1 1", "hostname")
if err != nil {
t.Fatal(err)
}
j.foo()
if !bytes.Contains(buff.Bytes(), []byte(os.Getenv("HOSTNAME"))) {
t.Errorf("%s", buff.Bytes())
}
buff.Reset()
b, err := j.Encode()
if err != nil {
t.Fatal(err)
}
k := &Job{}
if err := k.Decode(b); err != nil {
t.Logf("decoded %+v", k)
t.Fatal(err)
}
if k.Schedule != j.Schedule {
t.Error(k.Schedule, "vs", j.Schedule)
}
if k.Name != j.Name {
t.Error(k.Name, "vs", j.Name)
}
k.foo()
if !bytes.Contains(buff.Bytes(), []byte(os.Getenv("HOSTNAME"))) {
t.Errorf("%s", buff.Bytes())
}
}
func captureLog() (*bytes.Buffer, func()) {
was := logb.Writer()
wase := os.Stderr
f, _ := ioutil.TempFile(os.TempDir(), "test.newBashJobAndRun")
os.Stderr = f
b := bytes.NewBuffer(nil)
logb.SetWriter(b)
return b, func() {
os.Remove(f.Name())
logb.SetWriter(was)
os.Stderr = wase
}
}

19
scheduler/parser.go Normal file
View File

@ -0,0 +1,19 @@
package scheduler
import cron "github.com/robfig/cron/v3"
func getParser() cron.Parser {
return cron.NewParser(
cron.Minute |
cron.Hour |
cron.Dom |
cron.Month |
cron.Dow |
cron.Descriptor,
)
}
func validCron(s string) bool {
_, err := getParser().Parse(s)
return err == nil
}

36
scheduler/parser_test.go Normal file
View File

@ -0,0 +1,36 @@
package scheduler
import "testing"
func TestValidCronFail(t *testing.T) {
cases := []string{
"a 1 1 1 1",
"1 1 1 1",
"1 1 1 1 1 1",
"@minutely",
}
for _, c := range cases {
if validCron(c) {
t.Error("should fail:", c)
}
}
}
func TestValidCronPass(t *testing.T) {
cases := []string{
"1 1 1 1 1",
"* * 1 1 1",
"@hourly",
"@daily",
"@yearly",
"@weekly",
"* */5 1 1 1",
}
for _, c := range cases {
if !validCron(c) {
t.Error("should pass:", c)
}
}
}

7
scheduler/runner.go Normal file
View File

@ -0,0 +1,7 @@
package scheduler
type Runner int
const (
Bash Runner = iota + 1
)

121
scheduler/scheduler.go Normal file
View File

@ -0,0 +1,121 @@
package scheduler
import (
"fmt"
"local/firestormy/config"
"local/firestormy/logger"
"time"
cron "github.com/robfig/cron/v3"
)
type Scheduler struct {
cron *cron.Cron
running map[string]cron.EntryID
}
func New() *Scheduler {
l := logger.New()
c := cron.New(
cron.WithLocation(time.Local),
cron.WithLogger(l),
cron.WithChain(
cron.SkipIfStillRunning(l),
cron.Recover(l),
),
)
return &Scheduler{
cron: c,
running: make(map[string]cron.EntryID),
}
}
func (s *Scheduler) Start() error {
jobs, err := s.List()
if err != nil {
return err
}
for i := range jobs {
if err := s.Add(jobs[i]); err != nil && err != ErrDuplicateJob {
return err
}
}
s.cron.Start()
return nil
}
func (s *Scheduler) Stop() error {
ctx := s.cron.Stop()
<-ctx.Done()
return nil
}
func (s *Scheduler) List() ([]*Job, error) {
entries, err := config.Store.List(nil)
if err != nil {
return nil, err
}
errors := []error{}
jobs := []*Job{}
for _, k := range entries {
j, err := s.loadJobFromStore(k)
if err != nil {
errors = append(errors, err)
} else {
jobs = append(jobs, j)
}
}
err = nil
if len(errors) > 0 {
err = fmt.Errorf("errors listing jobs: %v", errors)
}
return jobs, err
}
func (s *Scheduler) loadJobFromStore(k string) (*Job, error) {
b, err := config.Store.Get(k)
if err != nil {
return nil, err
}
j := &Job{}
err = j.Decode(b)
return j, err
}
func (s *Scheduler) Add(j *Job) error {
if _, ok := s.getEntry(j); ok {
return ErrDuplicateJob
}
b, err := j.Encode()
if err != nil {
return err
}
if err := config.Store.Set(j.Name, b); err != nil {
return err
}
entryID, err := s.cron.AddJob(j.Schedule, j)
if err != nil {
return err
}
s.running[j.Name] = entryID
return nil
}
func (s *Scheduler) Remove(j *Job) error {
entryID, ok := s.getEntry(j)
if !ok {
return ErrJobNotFound
}
was := len(s.cron.Entries())
s.cron.Remove(entryID)
is := len(s.cron.Entries())
if was == is {
return ErrJobNotFound
}
return config.Store.Set(j.Name, nil)
}
func (s *Scheduler) getEntry(j *Job) (cron.EntryID, bool) {
entryID, ok := s.running[j.Name]
return entryID, ok
}

View File

@ -0,0 +1,61 @@
package scheduler
import (
"bytes"
"local/firestormy/config"
"local/storage"
"testing"
)
func TestSchedulerAddRemove(t *testing.T) {
config.Store, _ = storage.New(storage.MAP)
s := New()
j, err := NewJob(Bash, "@hourly", "hostname")
if err != nil {
t.Fatal(err)
}
if err := s.Add(j); err != nil {
t.Fatal(err)
}
if list, err := s.List(); err != nil {
t.Fatal(err)
} else if len(list) != 1 {
t.Fatal(err)
}
if err := s.Remove(j); err != nil {
t.Fatal(err)
}
if list, err := s.List(); err != nil {
t.Fatal(err)
} else if len(list) != 0 {
t.Fatal(err)
}
}
func TestSchedulerStartStop(t *testing.T) {
b, clean := captureLog()
defer clean()
config.Store, _ = storage.New(storage.MAP)
s := New()
for i := 0; i < 5; i++ {
j, err := NewJob(Bash, "* * * * *", "hostname")
if err != nil {
t.Fatal(err)
}
if err := s.Add(j); err != nil {
t.Fatal(err)
}
}
if err := s.Start(); err != nil {
t.Fatal(err)
}
if err := s.Stop(); err != nil {
t.Fatal(err)
}
if n := bytes.Count(b.Bytes(), []byte("schedule")); n != 5 {
t.Errorf("%v: %s", n, b.Bytes())
}
}