// NewQueue configures an EsQueue from the requested capaciity.
//
// The requested value is not stored directly: it is first passed through
// minQuantity, and the result becomes the queue's effective capaciity.
// NOTE(review): presumably minQuantity rounds up to a power of two, because
// capMod (capaciity-1) is used as a bit mask by the other methods — confirm
// against minQuantity.
// NOTE(review): the declarations of q and cache, and the return statement,
// are not visible in this excerpt.
func NewQueue(capaciity uint32) *EsQueue {
q.capaciity = minQuantity(capaciity)
// capMod is ANDed with positions elsewhere to select an index into q.cache.
q.capMod = q.capaciity - 1
// One esCache slot per unit of effective capacity.
q.cache = make([]esCache, q.capaciity)
// Both sequence numbers of this slot are set to capaciity.
// NOTE(review): which slot `cache` points at is decided on a line not shown
// here — presumably slot 0, the slot that position `capaciity` maps to after
// masking; confirm.
cache.getNo = q.capaciity
cache.putNo = q.capaciity
// String formats the queue's capaciity, capMod, putPos and getPos into a
// single human-readable line.
//
// putPos and getPos are read with atomic loads; they are two separate loads,
// so the pair is not a single consistent snapshot if the queue is being
// modified concurrently.
func (q *EsQueue) String() string {
getPos := atomic.LoadUint32(&q.getPos)
putPos := atomic.LoadUint32(&q.putPos)
return fmt.Sprintf("Queue{capaciity: %v, capMod: %v, putPos: %v, getPos: %v}",
q.capaciity, q.capMod, putPos, getPos)
// Capaciity is the exported accessor for the queue's capacity. The spelling
// is part of the public API and matches the capaciity field name used
// throughout this file.
// NOTE(review): the method body is not visible in this excerpt — presumably
// it returns q.capaciity; confirm.
func (q *EsQueue) Capaciity() uint32 {
// Quantity derives a count from the distance between putPos and getPos.
//
// Both positions are read with separate atomic loads, so the result is only
// an approximation when other goroutines are moving the positions at the
// same time.
// NOTE(review): the declaration of quantity, the conditional that chooses
// between the two assignments below, and the return are not visible in this
// excerpt.
func (q *EsQueue) Quantity() uint32 {
var putPos, getPos uint32
getPos = atomic.LoadUint32(&q.getPos)
putPos = atomic.LoadUint32(&q.putPos)
// Plain difference — presumably the branch taken when putPos >= getPos
// (TODO confirm against the elided condition).
quantity = putPos - getPos
// Alternative form: the uint32 difference (which wraps modulo 2^32) offset by
// capMod — presumably the branch taken once putPos has wrapped past getPos
// (TODO confirm).
quantity = q.capMod + (putPos - getPos)
// Put attempts to add a single value to the queue.
//
// Returns ok plus a quantity figure derived from posCnt.
// NOTE(review): the declarations of cache and capMod, the assignment of
// putPosNew, the full-queue check, the store of val, the return statements
// and any retry loop are not visible in this excerpt; the comments below
// describe only the lines that are.
func (q *EsQueue) Put(val interface{}) (ok bool, quantity uint32) {
var putPos, putPosNew, getPos, posCnt uint32
// Read both positions atomically (two independent loads).
getPos = atomic.LoadUint32(&q.getPos)
putPos = atomic.LoadUint32(&q.putPos)
// Occupancy estimate using the wrapping uint32 difference offset by capMod.
// NOTE(review): presumably one arm of an elided putPos >= getPos
// conditional — confirm.
posCnt = capMod + (putPos - getPos)
// Advance putPos from the value read above to putPosNew with a
// compare-and-swap; the guarded branch runs when another writer changed
// putPos first.
if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
// Select the slot for the new position by masking with capMod.
cache = &q.cache[putPosNew&capMod]
getNo := atomic.LoadUint32(&cache.getNo)
putNo := atomic.LoadUint32(&cache.putNo)
// The slot is taken to be ready for this write when its putNo equals the
// new position and its getNo equals its putNo.
if putPosNew == putNo && getNo == putNo {
// Bump the slot's putNo by capaciity; Get compares getNo against
// putNo-capaciity, so this is what makes the slot match Get's check.
atomic.AddUint32(&cache.putNo, q.capaciity)
// Puts attempts to add a batch of values to the queue.
//
// It returns putCnt (the number of slots handled) and posCnt+putCnt as the
// resulting quantity.
// NOTE(review): the declaration of capMod, the assignment of putCnt, the
// full-queue check, the store of each value and any retry loop are not
// visible in this excerpt; the comments below describe only the lines that
// are.
func (q *EsQueue) Puts(values []interface{}) (puts, quantity uint32) {
var putPos, putPosNew, getPos, posCnt, putCnt uint32
// Read both positions atomically (two independent loads).
getPos = atomic.LoadUint32(&q.getPos)
putPos = atomic.LoadUint32(&q.putPos)
// Occupancy estimate using the wrapping uint32 difference offset by capMod.
// NOTE(review): presumably one arm of an elided putPos >= getPos
// conditional — confirm.
posCnt = capMod + (putPos - getPos)
// capPuts is the room left (capaciity - posCnt); size is len(values). The
// branch is taken when the whole batch fits.
// NOTE(review): presumably putCnt is set to size here and to capPuts
// otherwise — confirm.
if capPuts, size := q.capaciity-posCnt, uint32(len(values)); capPuts >= size {
// Reserve putCnt consecutive positions with one compare-and-swap on putPos;
// the guarded branch runs when another writer changed putPos first.
putPosNew = putPos + putCnt
if !atomic.CompareAndSwapUint32(&q.putPos, putPos, putPosNew) {
// Walk the reserved positions putPos+1 .. putPos+putCnt; v counts the slots
// visited so far.
for posNew, v := putPos+1, uint32(0); v < putCnt; posNew, v = posNew+1, v+1 {
// Select the slot for this position by masking with capMod.
var cache *esCache = &q.cache[posNew&capMod]
getNo := atomic.LoadUint32(&cache.getNo)
putNo := atomic.LoadUint32(&cache.putNo)
// Same readiness test as the single-item Put: putNo equals the position and
// getNo equals putNo.
if posNew == putNo && getNo == putNo {
// Bump the slot's putNo by capaciity; Gets compares getNo against
// putNo-capaciity, so this is what makes the slot match that check.
atomic.AddUint32(&cache.putNo, q.capaciity)
return putCnt, posCnt + putCnt
// Get attempts to remove a single value from the queue.
//
// On the visible failure paths it returns (nil, false, posCnt); on success it
// returns (val, true, posCnt-1).
// NOTE(review): the declarations of cache and capMod, the assignment of
// getPosNew, the condition guarding the first return, the read of val from
// the slot and any retry loop are not visible in this excerpt; the comments
// below describe only the lines that are.
func (q *EsQueue) Get() (val interface{}, ok bool, quantity uint32) {
var putPos, getPos, getPosNew, posCnt uint32
// Read both positions atomically (two independent loads).
putPos = atomic.LoadUint32(&q.putPos)
getPos = atomic.LoadUint32(&q.getPos)
// Occupancy estimate using the wrapping uint32 difference offset by capMod.
// NOTE(review): presumably one arm of an elided putPos >= getPos
// conditional — confirm.
posCnt = capMod + (putPos - getPos)
// NOTE(review): presumably the "nothing to read" exit; its guarding
// condition is not shown — confirm.
return nil, false, posCnt
// Advance getPos from the value read above to getPosNew with a
// compare-and-swap; if another reader changed getPos first, report failure.
if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
return nil, false, posCnt
// Select the slot for the new position by masking with capMod.
cache = &q.cache[getPosNew&capMod]
getNo := atomic.LoadUint32(&cache.getNo)
putNo := atomic.LoadUint32(&cache.putNo)
// The slot is taken to hold this position's value when its getNo equals the
// new position and its putNo is exactly one capaciity ahead of getNo (the
// state Put leaves behind after its AddUint32).
if getPosNew == getNo && getNo == putNo-q.capaciity {
// Bump the slot's getNo by capaciity, making getNo == putNo again, which is
// the condition Put checks before writing.
atomic.AddUint32(&cache.getNo, q.capaciity)
return val, true, posCnt - 1
// Gets attempts to remove a batch of values from the queue, using the
// caller-supplied values slice to bound the batch size.
//
// It returns getCnt (the number of slots handled) and posCnt-getCnt as the
// resulting quantity.
// NOTE(review): the declaration of capMod, the assignment of getCnt, the
// empty-queue check, the copy of each slot's value into values and any retry
// loop are not visible in this excerpt; the comments below describe only the
// lines that are.
func (q *EsQueue) Gets(values []interface{}) (gets, quantity uint32) {
var putPos, getPos, getPosNew, posCnt, getCnt uint32
// Read both positions atomically (two independent loads).
putPos = atomic.LoadUint32(&q.putPos)
getPos = atomic.LoadUint32(&q.getPos)
// Occupancy estimate using the wrapping uint32 difference offset by capMod.
// NOTE(review): presumably one arm of an elided putPos >= getPos
// conditional — confirm.
posCnt = capMod + (putPos - getPos)
// size is len(values); the branch is taken when at least that many items are
// estimated to be available.
// NOTE(review): presumably getCnt is set to size here and to posCnt
// otherwise — confirm.
if size := uint32(len(values)); posCnt >= size {
// Reserve getCnt consecutive positions with one compare-and-swap on getPos;
// the guarded branch runs when another reader changed getPos first.
getPosNew = getPos + getCnt
if !atomic.CompareAndSwapUint32(&q.getPos, getPos, getPosNew) {
// Walk the reserved positions getPos+1 .. getPos+getCnt; v counts the slots
// visited so far.
for posNew, v := getPos+1, uint32(0); v < getCnt; posNew, v = posNew+1, v+1 {
// Select the slot for this position by masking with capMod.
var cache *esCache = &q.cache[posNew&capMod]
getNo := atomic.LoadUint32(&cache.getNo)
putNo := atomic.LoadUint32(&cache.putNo)
// Same readiness test as the single-item Get: getNo equals the position and
// putNo is exactly one capaciity ahead of getNo.
if posNew == getNo && getNo == putNo-q.capaciity {
// Bump the slot's getNo by capaciity (making getNo == putNo again); the
// local getNo is overwritten with the value AddUint32 returns.
getNo = atomic.AddUint32(&cache.getNo, q.capaciity)
return getCnt, posCnt - getCnt
func minQuantity(v uint32) uint32 {