Create crond scheduler and dependencies and store to disk
parent
f022b26987
commit
65655080dd
|
|
@ -0,0 +1,21 @@
|
||||||
|
package logger
|
||||||
|
|
||||||
|
import (
|
||||||
|
"local/logb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Logger struct{}
|
||||||
|
|
||||||
|
func New() Logger {
|
||||||
|
return Logger{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l Logger) Info(m string, args ...interface{}) {
|
||||||
|
args = append([]interface{}{m}, args...)
|
||||||
|
logb.Info(args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l Logger) Error(e error, m string, args ...interface{}) {
|
||||||
|
args = append([]interface{}{e, m}, args...)
|
||||||
|
logb.Error(args...)
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
package logger
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"io/ioutil"
|
||||||
|
"local/logb"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
cron "github.com/robfig/cron/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInterface(t *testing.T) {
|
||||||
|
l := New()
|
||||||
|
var logger cron.Logger
|
||||||
|
logger = l
|
||||||
|
|
||||||
|
w := bytes.NewBuffer(nil)
|
||||||
|
logb.SetWriter(w)
|
||||||
|
f, err := ioutil.TempFile(os.TempDir(), "logger.testInterface")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.Remove(f.Name())
|
||||||
|
was := os.Stderr
|
||||||
|
os.Stderr = f
|
||||||
|
defer func() {
|
||||||
|
os.Stderr = was
|
||||||
|
}()
|
||||||
|
|
||||||
|
logger.Info("hello from %v", "me")
|
||||||
|
logger.Error(errors.New("bad"), "error from %v", "me")
|
||||||
|
|
||||||
|
if !bytes.Contains(w.Bytes(), []byte(`error from me`)) {
|
||||||
|
t.Errorf("%s", w.Bytes())
|
||||||
|
}
|
||||||
|
if !bytes.Contains(w.Bytes(), []byte(`hello from me`)) {
|
||||||
|
t.Errorf("%s", w.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,8 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var ErrBadCron = errors.New("bad cron input detected")
|
||||||
|
var ErrBadRunner = errors.New("bad cron runner")
|
||||||
|
var ErrJobNotFound = errors.New("bad job name given")
|
||||||
|
var ErrDuplicateJob = errors.New("duplicate job name given")
|
||||||
|
|
@ -0,0 +1,10 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestErr(t *testing.T) {
|
||||||
|
var e, f error
|
||||||
|
e = f
|
||||||
|
e = ErrBadCron
|
||||||
|
f = e
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,73 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/gob"
|
||||||
|
"local/firestormy/logger"
|
||||||
|
"os/exec"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Job struct {
|
||||||
|
Name string
|
||||||
|
Schedule string
|
||||||
|
Raw string
|
||||||
|
Runner Runner
|
||||||
|
foo func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewJob(runner Runner, schedule, raw string) (*Job, error) {
|
||||||
|
switch runner {
|
||||||
|
case Bash:
|
||||||
|
return newBashJob(schedule, raw)
|
||||||
|
default:
|
||||||
|
return nil, ErrBadRunner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBashJob(schedule, sh string) (*Job, error) {
|
||||||
|
if !validCron(schedule) {
|
||||||
|
return nil, ErrBadCron
|
||||||
|
}
|
||||||
|
return &Job{
|
||||||
|
Name: uuid.New().String(),
|
||||||
|
Schedule: schedule,
|
||||||
|
Raw: sh,
|
||||||
|
Runner: Bash,
|
||||||
|
foo: func() {
|
||||||
|
cmd := exec.Command("bash", "-c", sh)
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
logger.New().Info("executed %s: %s", sh, out)
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) Run() {
|
||||||
|
j.foo()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) Encode() ([]byte, error) {
|
||||||
|
buff := bytes.NewBuffer(nil)
|
||||||
|
encoder := gob.NewEncoder(buff)
|
||||||
|
err := encoder.Encode(*j)
|
||||||
|
return buff.Bytes(), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) Decode(b []byte) error {
|
||||||
|
buff := bytes.NewReader(b)
|
||||||
|
decoder := gob.NewDecoder(buff)
|
||||||
|
err := decoder.Decode(j)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
k, err := NewJob(j.Runner, j.Schedule, j.Raw)
|
||||||
|
if err == nil {
|
||||||
|
k.Name = j.Name
|
||||||
|
*j = *k
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,111 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"io/ioutil"
|
||||||
|
"local/logb"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewBashJobBadCron(t *testing.T) {
|
||||||
|
_, err := newBashJob("1 1 1 ", "hostname")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewBashJobAndRun(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
sched string
|
||||||
|
cmd string
|
||||||
|
out string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
sched: "1 1 1 1 1",
|
||||||
|
cmd: "hostname",
|
||||||
|
out: os.Getenv("HOSTNAME"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
sched: "@hourly",
|
||||||
|
cmd: "hostname",
|
||||||
|
out: os.Getenv("HOSTNAME"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
sched: "@hourly",
|
||||||
|
cmd: "exit 1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
b, clean := captureLog()
|
||||||
|
defer clean()
|
||||||
|
j, err := newBashJob(c.sched, c.cmd)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
func() {
|
||||||
|
defer func() {
|
||||||
|
recover()
|
||||||
|
}()
|
||||||
|
j.Run()
|
||||||
|
}()
|
||||||
|
if !bytes.Contains(b.Bytes(), []byte(c.out)) {
|
||||||
|
t.Errorf("(%s, %s) => %s", c.sched, c.cmd, b.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJobEncodeDecode(t *testing.T) {
|
||||||
|
buff, clean := captureLog()
|
||||||
|
defer clean()
|
||||||
|
|
||||||
|
j, err := newBashJob("1 1 1 1 1", "hostname")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
j.foo()
|
||||||
|
if !bytes.Contains(buff.Bytes(), []byte(os.Getenv("HOSTNAME"))) {
|
||||||
|
t.Errorf("%s", buff.Bytes())
|
||||||
|
}
|
||||||
|
buff.Reset()
|
||||||
|
|
||||||
|
b, err := j.Encode()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
k := &Job{}
|
||||||
|
if err := k.Decode(b); err != nil {
|
||||||
|
t.Logf("decoded %+v", k)
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if k.Schedule != j.Schedule {
|
||||||
|
t.Error(k.Schedule, "vs", j.Schedule)
|
||||||
|
}
|
||||||
|
if k.Name != j.Name {
|
||||||
|
t.Error(k.Name, "vs", j.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
k.foo()
|
||||||
|
if !bytes.Contains(buff.Bytes(), []byte(os.Getenv("HOSTNAME"))) {
|
||||||
|
t.Errorf("%s", buff.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func captureLog() (*bytes.Buffer, func()) {
|
||||||
|
was := logb.Writer()
|
||||||
|
wase := os.Stderr
|
||||||
|
f, _ := ioutil.TempFile(os.TempDir(), "test.newBashJobAndRun")
|
||||||
|
os.Stderr = f
|
||||||
|
b := bytes.NewBuffer(nil)
|
||||||
|
logb.SetWriter(b)
|
||||||
|
return b, func() {
|
||||||
|
os.Remove(f.Name())
|
||||||
|
logb.SetWriter(was)
|
||||||
|
os.Stderr = wase
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,19 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import cron "github.com/robfig/cron/v3"
|
||||||
|
|
||||||
|
func getParser() cron.Parser {
|
||||||
|
return cron.NewParser(
|
||||||
|
cron.Minute |
|
||||||
|
cron.Hour |
|
||||||
|
cron.Dom |
|
||||||
|
cron.Month |
|
||||||
|
cron.Dow |
|
||||||
|
cron.Descriptor,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func validCron(s string) bool {
|
||||||
|
_, err := getParser().Parse(s)
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestValidCronFail(t *testing.T) {
|
||||||
|
cases := []string{
|
||||||
|
"a 1 1 1 1",
|
||||||
|
"1 1 1 1",
|
||||||
|
"1 1 1 1 1 1",
|
||||||
|
"@minutely",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
if validCron(c) {
|
||||||
|
t.Error("should fail:", c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidCronPass(t *testing.T) {
|
||||||
|
cases := []string{
|
||||||
|
"1 1 1 1 1",
|
||||||
|
"* * 1 1 1",
|
||||||
|
"@hourly",
|
||||||
|
"@daily",
|
||||||
|
"@yearly",
|
||||||
|
"@weekly",
|
||||||
|
"* */5 1 1 1",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range cases {
|
||||||
|
if !validCron(c) {
|
||||||
|
t.Error("should pass:", c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,7 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
type Runner int
|
||||||
|
|
||||||
|
const (
|
||||||
|
Bash Runner = iota + 1
|
||||||
|
)
|
||||||
|
|
@ -0,0 +1,121 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"local/firestormy/config"
|
||||||
|
"local/firestormy/logger"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
cron "github.com/robfig/cron/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Scheduler struct {
|
||||||
|
cron *cron.Cron
|
||||||
|
running map[string]cron.EntryID
|
||||||
|
}
|
||||||
|
|
||||||
|
func New() *Scheduler {
|
||||||
|
l := logger.New()
|
||||||
|
c := cron.New(
|
||||||
|
cron.WithLocation(time.Local),
|
||||||
|
cron.WithLogger(l),
|
||||||
|
cron.WithChain(
|
||||||
|
cron.SkipIfStillRunning(l),
|
||||||
|
cron.Recover(l),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return &Scheduler{
|
||||||
|
cron: c,
|
||||||
|
running: make(map[string]cron.EntryID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) Start() error {
|
||||||
|
jobs, err := s.List()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for i := range jobs {
|
||||||
|
if err := s.Add(jobs[i]); err != nil && err != ErrDuplicateJob {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.cron.Start()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) Stop() error {
|
||||||
|
ctx := s.cron.Stop()
|
||||||
|
<-ctx.Done()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) List() ([]*Job, error) {
|
||||||
|
entries, err := config.Store.List(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
errors := []error{}
|
||||||
|
jobs := []*Job{}
|
||||||
|
for _, k := range entries {
|
||||||
|
j, err := s.loadJobFromStore(k)
|
||||||
|
if err != nil {
|
||||||
|
errors = append(errors, err)
|
||||||
|
} else {
|
||||||
|
jobs = append(jobs, j)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = nil
|
||||||
|
if len(errors) > 0 {
|
||||||
|
err = fmt.Errorf("errors listing jobs: %v", errors)
|
||||||
|
}
|
||||||
|
return jobs, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) loadJobFromStore(k string) (*Job, error) {
|
||||||
|
b, err := config.Store.Get(k)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
j := &Job{}
|
||||||
|
err = j.Decode(b)
|
||||||
|
return j, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) Add(j *Job) error {
|
||||||
|
if _, ok := s.getEntry(j); ok {
|
||||||
|
return ErrDuplicateJob
|
||||||
|
}
|
||||||
|
b, err := j.Encode()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := config.Store.Set(j.Name, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
entryID, err := s.cron.AddJob(j.Schedule, j)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.running[j.Name] = entryID
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) Remove(j *Job) error {
|
||||||
|
entryID, ok := s.getEntry(j)
|
||||||
|
if !ok {
|
||||||
|
return ErrJobNotFound
|
||||||
|
}
|
||||||
|
was := len(s.cron.Entries())
|
||||||
|
s.cron.Remove(entryID)
|
||||||
|
is := len(s.cron.Entries())
|
||||||
|
if was == is {
|
||||||
|
return ErrJobNotFound
|
||||||
|
}
|
||||||
|
return config.Store.Set(j.Name, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) getEntry(j *Job) (cron.EntryID, bool) {
|
||||||
|
entryID, ok := s.running[j.Name]
|
||||||
|
return entryID, ok
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,61 @@
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"local/firestormy/config"
|
||||||
|
"local/storage"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSchedulerAddRemove(t *testing.T) {
|
||||||
|
config.Store, _ = storage.New(storage.MAP)
|
||||||
|
s := New()
|
||||||
|
j, err := NewJob(Bash, "@hourly", "hostname")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := s.Add(j); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if list, err := s.List(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if len(list) != 1 {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := s.Remove(j); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if list, err := s.List(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
} else if len(list) != 0 {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSchedulerStartStop(t *testing.T) {
|
||||||
|
b, clean := captureLog()
|
||||||
|
defer clean()
|
||||||
|
config.Store, _ = storage.New(storage.MAP)
|
||||||
|
|
||||||
|
s := New()
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
j, err := NewJob(Bash, "* * * * *", "hostname")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := s.Add(j); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.Start(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := s.Stop(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if n := bytes.Count(b.Bytes(), []byte("schedule")); n != 5 {
|
||||||
|
t.Errorf("%v: %s", n, b.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
{
|
||||||
|
"comment": "",
|
||||||
|
"ignore": "test",
|
||||||
|
"package": [],
|
||||||
|
"rootPath": "local/firestormy"
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue