Skip to content
This repository has been archived by the owner on Nov 24, 2024. It is now read-only.

Use functional options to New to configure channelqueue #6

Merged
merged 5 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 100 additions & 27 deletions channelqueue.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,86 @@
package channelqueue

import "github.com/gammazero/deque"
import (
"sync"

"github.com/gammazero/deque"
)

// ChannelQueue uses a queue to buffer data between input and output channels.
type ChannelQueue[T any] struct {
input, output chan T
length chan int
capacity int
closeOnce sync.Once
}

// New creates a new ChannelQueue with the specified buffer capacity.
type Option[T any] func(*ChannelQueue[T])

// WithCapacity sets the limit on the number of unread items that channelqueue
// will hold. Unbuffered behavior is not supported (use a normal channel for
// that), and a value of zero or less configures the default of no limit.
//
// A capacity < 0 specifies unlimited capacity. Unbuffered behavior is not
// supported; use a normal channel for that. Use caution if specifying an
// unlimited capacity since storage is still limited by system resources.
func New[T any](capacity int) *ChannelQueue[T] {
if capacity == 0 {
panic("unbuffered behavior not supported")
// Example:
//
// cq := channelqueue.New(channelqueue.WithCapacity[int](64))
func WithCapacity[T any](n int) func(*ChannelQueue[T]) {
return func(c *ChannelQueue[T]) {
if n < 1 {
n = -1
}
c.capacity = n
}
if capacity < 0 {
capacity = -1
}

// WithInput uses an existing channel as the input channel, which is the
// channel used to write to the queue. This is used when buffering items that
// must be read from an existing channel. Be aware that calling Close or
// Shutdown will close this channel.
//
// Example:
//
// in := make(chan int)
// cq := channelqueue.New(channelqueue.WithInput[int](in))
func WithInput[T any](in chan T) func(*ChannelQueue[T]) {
return func(c *ChannelQueue[T]) {
if in != nil {
c.input = in
}
}
}

// WithOutput uses an existing channel as the output channel, which is the
// channel used to read from the queue. This is used when buffering items that
// must be written to an existing channel. Be aware that ChannelQueue will
// close this channel when no more items are available.
//
// Example:
//
// out := make(chan int)
// cq := channelqueue.New(channelqueue.WithOutput[int](out))
func WithOutput[T any](out chan T) func(*ChannelQueue[T]) {
return func(c *ChannelQueue[T]) {
if out != nil {
c.output = out
}
}
}

// New creates a new ChannelQueue that, by default, holds an unbounded number
// of items of the specified type.
func New[T any](options ...Option[T]) *ChannelQueue[T] {
cq := &ChannelQueue[T]{
input: make(chan T),
output: make(chan T),
length: make(chan int),
capacity: capacity,
capacity: -1,
}
for _, opt := range options {
opt(cq)
}
if cq.input == nil {
cq.input = make(chan T)
}
if cq.output == nil {
cq.output = make(chan T)
}
go cq.bufferData()
return cq
Expand All @@ -34,18 +89,25 @@ func New[T any](capacity int) *ChannelQueue[T] {
// NewRing creates a new ChannelQueue with the specified buffer capacity, and
// circular buffer behavior. When the buffer is full, writing an additional
// item discards the oldest buffered item.
func NewRing[T any](capacity int) *ChannelQueue[T] {
if capacity < 1 {
return New[T](capacity)
}

func NewRing[T any](options ...Option[T]) *ChannelQueue[T] {
cq := &ChannelQueue[T]{
input: make(chan T),
output: make(chan T),
length: make(chan int),
capacity: capacity,
capacity: -1,
}
if capacity == 1 {
for _, opt := range options {
opt(cq)
}
if cq.capacity < 1 {
// Unbounded ring is the same as an unbounded queue.
return New(WithInput[T](cq.input))
}
if cq.input == nil {
cq.input = make(chan T)
}
if cq.output == nil {
cq.output = make(chan T)
}
if cq.capacity == 1 {
go cq.oneBufferData()
} else {
go cq.ringBufferData()
Expand All @@ -68,16 +130,27 @@ func (cq *ChannelQueue[T]) Len() int {
return <-cq.length
}

// Cap returns the capacity of the channel.
// Cap returns the capacity of the channelqueue. Returns -1 if unbounded.
func (cq *ChannelQueue[T]) Cap() int {
return cq.capacity
}

// Close closes the input channel. Additional input will panic, output will
// continue to be readable until there is no more data, and then the output
// channel is closed.
// Close closes the input channel. This is the same as calling the builtin
// close on the input channel, except Close can be called multiple times..
// Additional input will panic, output will continue to be readable until there
// is no more data, and then the output channel is closed.
func (cq *ChannelQueue[T]) Close() {
close(cq.input)
cq.closeOnce.Do(func() {
close(cq.input)
})
}

// Shutdown calls Close then drains the channel to ensure that the internal
// goroutine finishes.
func (cq *ChannelQueue[T]) Shutdown() {
cq.Close()
for range cq.output {
}
}

// bufferData is the goroutine that transfers data from the In() chan to the
Expand Down
Loading
Loading