266 lines
5.4 KiB
Go
Executable File
266 lines
5.4 KiB
Go
Executable File
package scheduler
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"gitea.inhome.blapointe.com/local/firestormy/config"
|
|
"gitea.inhome.blapointe.com/local/firestormy/config/ns"
|
|
"gitea.inhome.blapointe.com/local/firestormy/logger"
|
|
"gitea.inhome.blapointe.com/local/logb"
|
|
|
|
cron "github.com/robfig/cron/v3"
|
|
)
|
|
|
|
var Schedule *Scheduler = New()
|
|
|
|
type Scheduler struct {
|
|
cron *cron.Cron
|
|
running map[string]cron.EntryID
|
|
}
|
|
|
|
type semaphored struct {
|
|
job cron.Job
|
|
ch chan struct{}
|
|
l logger.Logger
|
|
}
|
|
|
|
func (s semaphored) Run() {
|
|
s.l.Info("scheduler.semaphored.Run() | acquiring...")
|
|
s.ch <- struct{}{}
|
|
s.l.Info("scheduler.semaphored.Run() | acquired")
|
|
|
|
s.job.Run()
|
|
|
|
s.l.Info("scheduler.semaphored.Run() | releasing...")
|
|
<-s.ch
|
|
s.l.Info("scheduler.semaphored.Run() | released")
|
|
}
|
|
|
|
func New() *Scheduler {
|
|
l := logger.New()
|
|
semaphore := make(chan struct{}, 2)
|
|
c := cron.New(
|
|
cron.WithLocation(time.Local),
|
|
cron.WithLogger(l),
|
|
cron.WithChain(
|
|
cron.SkipIfStillRunning(l),
|
|
cron.Recover(l),
|
|
func(j cron.Job) cron.Job {
|
|
return semaphored{
|
|
l: l,
|
|
job: j,
|
|
ch: semaphore,
|
|
}
|
|
},
|
|
),
|
|
cron.WithParser(getParser()),
|
|
)
|
|
return &Scheduler{
|
|
cron: c,
|
|
running: make(map[string]cron.EntryID),
|
|
}
|
|
}
|
|
|
|
func NewFromFile(config string) (*Scheduler, error) {
|
|
f, err := ioutil.ReadFile(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s := New()
|
|
for _, line := range bytes.Split(f, []byte("\n")) {
|
|
line = cleanLine(line)
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
schedule, command, title := splitScheduleCommandTitle(line)
|
|
if len(schedule) == 0 || len(command) == 0 {
|
|
continue
|
|
}
|
|
job, err := newBashJob(schedule, command, title)
|
|
if err != nil {
|
|
logger.New().Error(err, "cannot fully parse file: new job error", config, ", sched", schedule, ", comm", command)
|
|
continue
|
|
}
|
|
if err := s.Add(job); err != nil {
|
|
logger.New().Error(err, "cannot fully parse file: add job error", config)
|
|
continue
|
|
}
|
|
}
|
|
jobs, _ := s.List()
|
|
if len(jobs) == 0 {
|
|
return nil, errors.New("no jobs parsed from file " + config)
|
|
}
|
|
return s, nil
|
|
}
|
|
|
|
// cleanLine strips surrounding whitespace from one line of a schedule file
// and reports whether anything useful is left.
//
// It returns nil for blank lines and for comment lines (first non-blank byte
// is '#'); otherwise it returns the trimmed line. Carriage returns are
// trimmed along with spaces, tabs and newlines so files with CRLF line
// endings behave like LF files.
func cleanLine(b []byte) []byte {
	b = bytes.Trim(b, " \t\r\n")
	if len(b) == 0 || b[0] == '#' {
		return nil
	}
	return b
}
|
|
|
|
// scheduleFieldsPattern matches the leading 5 or 6 space-separated schedule
// fields of a line. Each field is a number, a "*/N" step, a comma-separated
// list of numbers, or "*".
var scheduleFieldsPattern = regexp.MustCompile(`^((\d+|\*\/\d+|(\d+,)*\d+|\*) [ ]*){5,6}`)

// splitScheduleCommandTitle splits a cleaned schedule-file line into its
// schedule, command and title.
//
// The schedule is the leading run of 5 or 6 schedule fields. The command is
// everything after it, whitespace-trimmed, and still includes any trailing
// "# title" text. The title is the trimmed text after the last '#' in the
// command, or "" when there is no '#'. If the line does not start with a
// schedule, all three results are empty.
func splitScheduleCommandTitle(b []byte) (string, string, string) {
	schedule := string(scheduleFieldsPattern.Find(b))
	if len(schedule) == 0 {
		return "", "", ""
	}

	// Cut the raw (untrimmed) match off the front before trimming either part.
	commandTitle := strings.TrimSpace(strings.TrimPrefix(string(b), schedule))
	schedule = strings.TrimSpace(schedule)

	title := ""
	if i := strings.LastIndex(commandTitle, "#"); i >= 0 {
		title = strings.TrimSpace(commandTitle[i+1:])
	}
	return schedule, commandTitle, title
}
|
|
|
|
func (s *Scheduler) Start() error {
|
|
jobs, err := s.List()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i := range jobs {
|
|
if err := s.Add(jobs[i]); err != nil && err != ErrDuplicateJob {
|
|
return err
|
|
}
|
|
}
|
|
s.cron.Start()
|
|
entries := s.cron.Entries()
|
|
logb.Debugf("[sched] start: %+v", entries)
|
|
logger.New().Info("started", len(entries), "jobs")
|
|
for _, entry := range entries {
|
|
logger.New().Info(fmt.Sprintf("%+v", entry))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Scheduler) Stop() error {
|
|
ctx := s.cron.Stop()
|
|
<-ctx.Done()
|
|
return nil
|
|
}
|
|
|
|
func (s *Scheduler) List() ([]*Job, error) {
|
|
entries, err := config.Store.List(ns.Jobs)
|
|
logb.Debugf("[sched] list: %+v", entries)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
errors := []error{}
|
|
jobs := []*Job{}
|
|
for _, k := range entries {
|
|
j, err := s.loadJobFromStore(k)
|
|
if err != nil {
|
|
errors = append(errors, err)
|
|
} else {
|
|
jobs = append(jobs, j)
|
|
}
|
|
}
|
|
err = nil
|
|
if len(errors) > 0 {
|
|
err = fmt.Errorf("errors listing jobs: %v", errors)
|
|
}
|
|
return jobs, err
|
|
}
|
|
|
|
func (s *Scheduler) loadJobFromStore(k string) (*Job, error) {
|
|
b, err := config.Store.Get(k, ns.Jobs...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
j := &Job{}
|
|
err = j.Decode(b)
|
|
return j, err
|
|
}
|
|
|
|
func (s *Scheduler) Update(j *Job) error {
|
|
logb.Debugf("[sched] update: %+v", j)
|
|
entryID, ok := s.getEntry(j)
|
|
if !ok {
|
|
return errors.New("job not found in storage")
|
|
}
|
|
|
|
i, err := s.loadJobFromStore(j.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
j.LastStatus = i.LastStatus
|
|
j.LastOutput = i.LastOutput
|
|
j.LastRuntime = i.LastRuntime
|
|
j.LastRun = i.LastRun
|
|
|
|
b, err := j.Encode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := config.Store.Set(j.Name, b, ns.Jobs...); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.cron.Remove(entryID)
|
|
|
|
entryID, err = s.cron.AddJob(j.Schedule, j)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.running[j.Name] = entryID
|
|
return nil
|
|
}
|
|
|
|
func (s *Scheduler) Add(j *Job) error {
|
|
logb.Debugf("[sched] add: %+v", j)
|
|
if _, ok := s.getEntry(j); ok {
|
|
return ErrDuplicateJob
|
|
}
|
|
b, err := j.Encode()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
entryID, err := s.cron.AddJob(j.Schedule, j)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := config.Store.Set(j.Name, b, ns.Jobs...); err != nil {
|
|
return err
|
|
}
|
|
s.running[j.Name] = entryID
|
|
return nil
|
|
}
|
|
|
|
func (s *Scheduler) Remove(j *Job) error {
|
|
logb.Debugf("[sched] rm: %+v", j)
|
|
entryID, ok := s.getEntry(j)
|
|
if !ok {
|
|
return ErrJobNotFound
|
|
}
|
|
was := len(s.cron.Entries())
|
|
s.cron.Remove(entryID)
|
|
is := len(s.cron.Entries())
|
|
if was == is {
|
|
return ErrJobNotFound
|
|
}
|
|
if err := config.Store.Set(j.Name, nil, ns.Jobs...); err != nil {
|
|
return err
|
|
}
|
|
return config.Store.Set(j.Name, nil, ns.JobsRaw...)
|
|
}
|
|
|
|
func (s *Scheduler) getEntry(j *Job) (cron.EntryID, bool) {
|
|
entryID, ok := s.running[j.Name]
|
|
return entryID, ok
|
|
}
|