236 lines
4.8 KiB
Go
236 lines
4.8 KiB
Go
/*
|
|
Copyright 2014 Workiva, LLC
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
/*
|
|
The priority queue is almost a spitting image of the logic
|
|
used for a regular queue. In order to keep the logic fast,
|
|
this code is repeated instead of using casts to cast to interface{}
|
|
back and forth. If Go had inheritance and generics, this problem
|
|
would be easier to solve.
|
|
*/
|
|
package queue
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
)
|
|
|
|
// Item is an item that can be added to the priority queue.
|
|
type Item interface {
|
|
// Compare returns a bool that can be used to determine
|
|
// ordering in the priority queue. Assuming the queue
|
|
// is in ascending order, this should return > logic.
|
|
// Return 1 to indicate this object is greater than the
|
|
// the other logic, 0 to indicate equality, and -1 to indicate
|
|
// less than other.
|
|
Compare(other Item) int
|
|
}
|
|
|
|
type priorityItems []Item
|
|
|
|
func (items *priorityItems) get(number int) []Item {
|
|
returnItems := make([]Item, 0, number)
|
|
index := 0
|
|
for i := 0; i < number; i++ {
|
|
if i >= len(*items) {
|
|
break
|
|
}
|
|
|
|
returnItems = append(returnItems, (*items)[i])
|
|
(*items)[i] = nil
|
|
index++
|
|
}
|
|
|
|
*items = (*items)[index:]
|
|
return returnItems
|
|
}
|
|
|
|
func (items *priorityItems) insert(item Item) {
|
|
if len(*items) == 0 {
|
|
*items = append(*items, item)
|
|
return
|
|
}
|
|
|
|
equalFound := false
|
|
i := sort.Search(len(*items), func(i int) bool {
|
|
result := (*items)[i].Compare(item)
|
|
if result == 0 {
|
|
equalFound = true
|
|
}
|
|
return result >= 0
|
|
})
|
|
|
|
if equalFound {
|
|
return
|
|
}
|
|
|
|
if i == len(*items) {
|
|
*items = append(*items, item)
|
|
return
|
|
}
|
|
|
|
*items = append(*items, nil)
|
|
copy((*items)[i+1:], (*items)[i:])
|
|
(*items)[i] = item
|
|
}
|
|
|
|
// PriorityQueue is similar to queue except that it takes
|
|
// items that implement the Item interface and adds them
|
|
// to the queue in priority order.
|
|
type PriorityQueue struct {
|
|
waiters waiters
|
|
items priorityItems
|
|
lock sync.Mutex
|
|
disposeLock sync.Mutex
|
|
disposed bool
|
|
}
|
|
|
|
// Put adds items to the queue.
|
|
func (pq *PriorityQueue) Put(items ...Item) error {
|
|
if len(items) == 0 {
|
|
return nil
|
|
}
|
|
|
|
pq.lock.Lock()
|
|
if pq.disposed {
|
|
pq.lock.Unlock()
|
|
return disposedError
|
|
}
|
|
|
|
for _, item := range items {
|
|
pq.items.insert(item)
|
|
}
|
|
|
|
for {
|
|
sema := pq.waiters.get()
|
|
if sema == nil {
|
|
break
|
|
}
|
|
|
|
sema.response.Add(1)
|
|
sema.wg.Done()
|
|
sema.response.Wait()
|
|
if len(pq.items) == 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
pq.lock.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Get retrieves items from the queue. If the queue is empty,
|
|
// this call blocks until the next item is added to the queue. This
|
|
// will attempt to retrieve number of items.
|
|
func (pq *PriorityQueue) Get(number int) ([]Item, error) {
|
|
if number < 1 {
|
|
return nil, nil
|
|
}
|
|
|
|
pq.lock.Lock()
|
|
|
|
if pq.disposed {
|
|
pq.lock.Unlock()
|
|
return nil, disposedError
|
|
}
|
|
|
|
var items []Item
|
|
|
|
if len(pq.items) == 0 {
|
|
sema := newSema()
|
|
pq.waiters.put(sema)
|
|
sema.wg.Add(1)
|
|
pq.lock.Unlock()
|
|
|
|
sema.wg.Wait()
|
|
pq.disposeLock.Lock()
|
|
if pq.disposed {
|
|
pq.disposeLock.Unlock()
|
|
return nil, disposedError
|
|
}
|
|
pq.disposeLock.Unlock()
|
|
|
|
items = pq.items.get(number)
|
|
sema.response.Done()
|
|
return items, nil
|
|
}
|
|
|
|
items = pq.items.get(number)
|
|
pq.lock.Unlock()
|
|
return items, nil
|
|
}
|
|
|
|
// Peek will look at the next item without removing it from the queue.
|
|
func (pq *PriorityQueue) Peek() Item {
|
|
pq.lock.Lock()
|
|
defer pq.lock.Unlock()
|
|
if len(pq.items) > 0 {
|
|
return pq.items[0]
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Empty returns a bool indicating if there are any items left
|
|
// in the queue.
|
|
func (pq *PriorityQueue) Empty() bool {
|
|
pq.lock.Lock()
|
|
defer pq.lock.Unlock()
|
|
|
|
return len(pq.items) == 0
|
|
}
|
|
|
|
// Len returns a number indicating how many items are in the queue.
|
|
func (pq *PriorityQueue) Len() int {
|
|
pq.lock.Lock()
|
|
defer pq.lock.Unlock()
|
|
|
|
return len(pq.items)
|
|
}
|
|
|
|
// Disposed returns a bool indicating if this queue has been disposed.
|
|
func (pq *PriorityQueue) Disposed() bool {
|
|
pq.lock.Lock()
|
|
defer pq.lock.Unlock()
|
|
|
|
return pq.disposed
|
|
}
|
|
|
|
// Dispose will prevent any further reads/writes to this queue
|
|
// and frees available resources.
|
|
func (pq *PriorityQueue) Dispose() {
|
|
pq.lock.Lock()
|
|
defer pq.lock.Unlock()
|
|
|
|
pq.disposeLock.Lock()
|
|
defer pq.disposeLock.Unlock()
|
|
|
|
pq.disposed = true
|
|
for _, waiter := range pq.waiters {
|
|
waiter.response.Add(1)
|
|
waiter.wg.Done()
|
|
}
|
|
|
|
pq.items = nil
|
|
pq.waiters = nil
|
|
}
|
|
|
|
// NewPriorityQueue is the constructor for a priority queue.
|
|
func NewPriorityQueue(hint int) *PriorityQueue {
|
|
return &PriorityQueue{
|
|
items: make(priorityItems, 0, hint),
|
|
}
|
|
}
|