|
|
- package roaring
-
- import (
- "container/heap"
- "fmt"
- "runtime"
- "sync"
- )
-
- var defaultWorkerCount = runtime.NumCPU()
-
- type bitmapContainerKey struct {
- key uint16
- idx int
- bitmap *Bitmap
- }
-
- type multipleContainers struct {
- key uint16
- containers []container
- idx int
- }
-
- type keyedContainer struct {
- key uint16
- container container
- idx int
- }
-
- type bitmapContainerHeap []bitmapContainerKey
-
- func (h bitmapContainerHeap) Len() int { return len(h) }
- func (h bitmapContainerHeap) Less(i, j int) bool { return h[i].key < h[j].key }
- func (h bitmapContainerHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
-
- func (h *bitmapContainerHeap) Push(x interface{}) {
- // Push and Pop use pointer receivers because they modify the slice's length,
- // not just its contents.
- *h = append(*h, x.(bitmapContainerKey))
- }
-
- func (h *bitmapContainerHeap) Pop() interface{} {
- old := *h
- n := len(old)
- x := old[n-1]
- *h = old[0 : n-1]
- return x
- }
-
- func (h bitmapContainerHeap) Peek() bitmapContainerKey {
- return h[0]
- }
-
- func (h *bitmapContainerHeap) popIncrementing() (key uint16, container container) {
- k := h.Peek()
- key = k.key
- container = k.bitmap.highlowcontainer.containers[k.idx]
-
- newIdx := k.idx + 1
- if newIdx < k.bitmap.highlowcontainer.size() {
- k = bitmapContainerKey{
- k.bitmap.highlowcontainer.keys[newIdx],
- newIdx,
- k.bitmap,
- }
- (*h)[0] = k
- heap.Fix(h, 0)
- } else {
- heap.Pop(h)
- }
-
- return
- }
-
- func (h *bitmapContainerHeap) Next(containers []container) multipleContainers {
- if h.Len() == 0 {
- return multipleContainers{}
- }
-
- key, container := h.popIncrementing()
- containers = append(containers, container)
-
- for h.Len() > 0 && key == h.Peek().key {
- _, container = h.popIncrementing()
- containers = append(containers, container)
- }
-
- return multipleContainers{
- key,
- containers,
- -1,
- }
- }
-
- func newBitmapContainerHeap(bitmaps ...*Bitmap) bitmapContainerHeap {
- // Initialize heap
- var h bitmapContainerHeap = make([]bitmapContainerKey, 0, len(bitmaps))
- for _, bitmap := range bitmaps {
- if !bitmap.IsEmpty() {
- key := bitmapContainerKey{
- bitmap.highlowcontainer.keys[0],
- 0,
- bitmap,
- }
- h = append(h, key)
- }
- }
-
- heap.Init(&h)
-
- return h
- }
-
- func repairAfterLazy(c container) container {
- switch t := c.(type) {
- case *bitmapContainer:
- if t.cardinality == invalidCardinality {
- t.computeCardinality()
- }
-
- if t.getCardinality() <= arrayDefaultMaxSize {
- return t.toArrayContainer()
- } else if c.(*bitmapContainer).isFull() {
- return newRunContainer16Range(0, MaxUint16)
- }
- }
-
- return c
- }
-
- func toBitmapContainer(c container) container {
- switch t := c.(type) {
- case *arrayContainer:
- return t.toBitmapContainer()
- case *runContainer16:
- if !t.isFull() {
- return t.toBitmapContainer()
- }
- }
- return c
- }
-
- func appenderRoutine(bitmapChan chan<- *Bitmap, resultChan <-chan keyedContainer, expectedKeysChan <-chan int) {
- expectedKeys := -1
- appendedKeys := 0
- keys := make([]uint16, 0)
- containers := make([]container, 0)
- for appendedKeys != expectedKeys {
- select {
- case item := <-resultChan:
- if len(keys) <= item.idx {
- keys = append(keys, make([]uint16, item.idx-len(keys)+1)...)
- containers = append(containers, make([]container, item.idx-len(containers)+1)...)
- }
- keys[item.idx] = item.key
- containers[item.idx] = item.container
-
- appendedKeys++
- case msg := <-expectedKeysChan:
- expectedKeys = msg
- }
- }
- answer := &Bitmap{
- roaringArray{
- make([]uint16, 0, expectedKeys),
- make([]container, 0, expectedKeys),
- make([]bool, 0, expectedKeys),
- false,
- nil,
- },
- }
- for i := range keys {
- if containers[i] != nil { // in case a resulting container was empty, see ParAnd function
- answer.highlowcontainer.appendContainer(keys[i], containers[i], false)
- }
- }
-
- bitmapChan <- answer
- }
-
- // ParHeapOr computes the union (OR) of all provided bitmaps in parallel,
- // where the parameter "parallelism" determines how many workers are to be used
- // (if it is set to 0, a default number of workers is chosen)
- // ParHeapOr uses a heap to compute the union. For rare cases it might be faster than ParOr
- func ParHeapOr(parallelism int, bitmaps ...*Bitmap) *Bitmap {
-
- bitmapCount := len(bitmaps)
- if bitmapCount == 0 {
- return NewBitmap()
- } else if bitmapCount == 1 {
- return bitmaps[0].Clone()
- }
-
- if parallelism == 0 {
- parallelism = defaultWorkerCount
- }
-
- h := newBitmapContainerHeap(bitmaps...)
-
- bitmapChan := make(chan *Bitmap)
- inputChan := make(chan multipleContainers, 128)
- resultChan := make(chan keyedContainer, 32)
- expectedKeysChan := make(chan int)
-
- pool := sync.Pool{
- New: func() interface{} {
- return make([]container, 0, len(bitmaps))
- },
- }
-
- orFunc := func() {
- // Assumes only structs with >=2 containers are passed
- for input := range inputChan {
- c := toBitmapContainer(input.containers[0]).lazyOR(input.containers[1])
- for _, next := range input.containers[2:] {
- c = c.lazyIOR(next)
- }
- c = repairAfterLazy(c)
- kx := keyedContainer{
- input.key,
- c,
- input.idx,
- }
- resultChan <- kx
- pool.Put(input.containers[:0])
- }
- }
-
- go appenderRoutine(bitmapChan, resultChan, expectedKeysChan)
-
- for i := 0; i < parallelism; i++ {
- go orFunc()
- }
-
- idx := 0
- for h.Len() > 0 {
- ck := h.Next(pool.Get().([]container))
- if len(ck.containers) == 1 {
- resultChan <- keyedContainer{
- ck.key,
- ck.containers[0],
- idx,
- }
- pool.Put(ck.containers[:0])
- } else {
- ck.idx = idx
- inputChan <- ck
- }
- idx++
- }
- expectedKeysChan <- idx
-
- bitmap := <-bitmapChan
-
- close(inputChan)
- close(resultChan)
- close(expectedKeysChan)
-
- return bitmap
- }
-
- // ParAnd computes the intersection (AND) of all provided bitmaps in parallel,
- // where the parameter "parallelism" determines how many workers are to be used
- // (if it is set to 0, a default number of workers is chosen)
- func ParAnd(parallelism int, bitmaps ...*Bitmap) *Bitmap {
- bitmapCount := len(bitmaps)
- if bitmapCount == 0 {
- return NewBitmap()
- } else if bitmapCount == 1 {
- return bitmaps[0].Clone()
- }
-
- if parallelism == 0 {
- parallelism = defaultWorkerCount
- }
-
- h := newBitmapContainerHeap(bitmaps...)
-
- bitmapChan := make(chan *Bitmap)
- inputChan := make(chan multipleContainers, 128)
- resultChan := make(chan keyedContainer, 32)
- expectedKeysChan := make(chan int)
-
- andFunc := func() {
- // Assumes only structs with >=2 containers are passed
- for input := range inputChan {
- c := input.containers[0].and(input.containers[1])
- for _, next := range input.containers[2:] {
- if c.getCardinality() == 0 {
- break
- }
- c = c.iand(next)
- }
-
- // Send a nil explicitly if the result of the intersection is an empty container
- if c.getCardinality() == 0 {
- c = nil
- }
-
- kx := keyedContainer{
- input.key,
- c,
- input.idx,
- }
- resultChan <- kx
- }
- }
-
- go appenderRoutine(bitmapChan, resultChan, expectedKeysChan)
-
- for i := 0; i < parallelism; i++ {
- go andFunc()
- }
-
- idx := 0
- for h.Len() > 0 {
- ck := h.Next(make([]container, 0, 4))
- if len(ck.containers) == bitmapCount {
- ck.idx = idx
- inputChan <- ck
- idx++
- }
- }
- expectedKeysChan <- idx
-
- bitmap := <-bitmapChan
-
- close(inputChan)
- close(resultChan)
- close(expectedKeysChan)
-
- return bitmap
- }
-
- // ParOr computes the union (OR) of all provided bitmaps in parallel,
- // where the parameter "parallelism" determines how many workers are to be used
- // (if it is set to 0, a default number of workers is chosen)
- func ParOr(parallelism int, bitmaps ...*Bitmap) *Bitmap {
- var lKey uint16 = MaxUint16
- var hKey uint16 = 0
-
- bitmapsFiltered := bitmaps[:0]
- for _, b := range bitmaps {
- if !b.IsEmpty() {
- bitmapsFiltered = append(bitmapsFiltered, b)
- }
- }
- bitmaps = bitmapsFiltered
-
- for _, b := range bitmaps {
- lKey = minOfUint16(lKey, b.highlowcontainer.keys[0])
- hKey = maxOfUint16(hKey, b.highlowcontainer.keys[b.highlowcontainer.size()-1])
- }
-
- if lKey == MaxUint16 && hKey == 0 {
- return New()
- } else if len(bitmaps) == 1 {
- return bitmaps[0]
- }
-
- keyRange := hKey - lKey + 1
- if keyRange == 1 {
- // revert to FastOr. Since the key range is 0
- // no container-level aggregation parallelism is achievable
- return FastOr(bitmaps...)
- }
-
- if parallelism == 0 {
- parallelism = defaultWorkerCount
- }
-
- var chunkSize int
- var chunkCount int
- if parallelism*4 > int(keyRange) {
- chunkSize = 1
- chunkCount = int(keyRange)
- } else {
- chunkCount = parallelism * 4
- chunkSize = (int(keyRange) + chunkCount - 1) / chunkCount
- }
-
- if chunkCount*chunkSize < int(keyRange) {
- // it's fine to panic to indicate an implementation error
- panic(fmt.Sprintf("invariant check failed: chunkCount * chunkSize < keyRange, %d * %d < %d", chunkCount, chunkSize, keyRange))
- }
-
- chunks := make([]*roaringArray, chunkCount)
-
- chunkSpecChan := make(chan parChunkSpec, minOfInt(maxOfInt(64, 2*parallelism), int(chunkCount)))
- chunkChan := make(chan parChunk, minOfInt(32, int(chunkCount)))
-
- orFunc := func() {
- for spec := range chunkSpecChan {
- ra := lazyOrOnRange(&bitmaps[0].highlowcontainer, &bitmaps[1].highlowcontainer, spec.start, spec.end)
- for _, b := range bitmaps[2:] {
- ra = lazyIOrOnRange(ra, &b.highlowcontainer, spec.start, spec.end)
- }
-
- for i, c := range ra.containers {
- ra.containers[i] = repairAfterLazy(c)
- }
-
- chunkChan <- parChunk{ra, spec.idx}
- }
- }
-
- for i := 0; i < parallelism; i++ {
- go orFunc()
- }
-
- go func() {
- for i := 0; i < chunkCount; i++ {
- spec := parChunkSpec{
- start: uint16(int(lKey) + i*chunkSize),
- end: uint16(minOfInt(int(lKey)+(i+1)*chunkSize-1, int(hKey))),
- idx: int(i),
- }
- chunkSpecChan <- spec
- }
- }()
-
- chunksRemaining := chunkCount
- for chunk := range chunkChan {
- chunks[chunk.idx] = chunk.ra
- chunksRemaining--
- if chunksRemaining == 0 {
- break
- }
- }
- close(chunkChan)
- close(chunkSpecChan)
-
- containerCount := 0
- for _, chunk := range chunks {
- containerCount += chunk.size()
- }
-
- result := Bitmap{
- roaringArray{
- containers: make([]container, containerCount),
- keys: make([]uint16, containerCount),
- needCopyOnWrite: make([]bool, containerCount),
- },
- }
-
- resultOffset := 0
- for _, chunk := range chunks {
- copy(result.highlowcontainer.containers[resultOffset:], chunk.containers)
- copy(result.highlowcontainer.keys[resultOffset:], chunk.keys)
- copy(result.highlowcontainer.needCopyOnWrite[resultOffset:], chunk.needCopyOnWrite)
- resultOffset += chunk.size()
- }
-
- return &result
- }
-
- type parChunkSpec struct {
- start uint16
- end uint16
- idx int
- }
-
- type parChunk struct {
- ra *roaringArray
- idx int
- }
-
- func (c parChunk) size() int {
- return c.ra.size()
- }
-
- func parNaiveStartAt(ra *roaringArray, start uint16, last uint16) int {
- for idx, key := range ra.keys {
- if key >= start && key <= last {
- return idx
- } else if key > last {
- break
- }
- }
- return ra.size()
- }
-
- func lazyOrOnRange(ra1, ra2 *roaringArray, start, last uint16) *roaringArray {
- answer := newRoaringArray()
- length1 := ra1.size()
- length2 := ra2.size()
-
- idx1 := parNaiveStartAt(ra1, start, last)
- idx2 := parNaiveStartAt(ra2, start, last)
-
- var key1 uint16
- var key2 uint16
- if idx1 < length1 && idx2 < length2 {
- key1 = ra1.getKeyAtIndex(idx1)
- key2 = ra2.getKeyAtIndex(idx2)
-
- for key1 <= last && key2 <= last {
-
- if key1 < key2 {
- answer.appendCopy(*ra1, idx1)
- idx1++
- if idx1 == length1 {
- break
- }
- key1 = ra1.getKeyAtIndex(idx1)
- } else if key1 > key2 {
- answer.appendCopy(*ra2, idx2)
- idx2++
- if idx2 == length2 {
- break
- }
- key2 = ra2.getKeyAtIndex(idx2)
- } else {
- c1 := ra1.getFastContainerAtIndex(idx1, false)
-
- answer.appendContainer(key1, c1.lazyOR(ra2.getContainerAtIndex(idx2)), false)
- idx1++
- idx2++
- if idx1 == length1 || idx2 == length2 {
- break
- }
-
- key1 = ra1.getKeyAtIndex(idx1)
- key2 = ra2.getKeyAtIndex(idx2)
- }
- }
- }
-
- if idx2 < length2 {
- key2 = ra2.getKeyAtIndex(idx2)
- for key2 <= last {
- answer.appendCopy(*ra2, idx2)
- idx2++
- if idx2 == length2 {
- break
- }
- key2 = ra2.getKeyAtIndex(idx2)
- }
- }
-
- if idx1 < length1 {
- key1 = ra1.getKeyAtIndex(idx1)
- for key1 <= last {
- answer.appendCopy(*ra1, idx1)
- idx1++
- if idx1 == length1 {
- break
- }
- key1 = ra1.getKeyAtIndex(idx1)
- }
- }
- return answer
- }
-
- func lazyIOrOnRange(ra1, ra2 *roaringArray, start, last uint16) *roaringArray {
- length1 := ra1.size()
- length2 := ra2.size()
-
- idx1 := 0
- idx2 := parNaiveStartAt(ra2, start, last)
-
- var key1 uint16
- var key2 uint16
- if idx1 < length1 && idx2 < length2 {
- key1 = ra1.getKeyAtIndex(idx1)
- key2 = ra2.getKeyAtIndex(idx2)
-
- for key1 <= last && key2 <= last {
- if key1 < key2 {
- idx1++
- if idx1 >= length1 {
- break
- }
- key1 = ra1.getKeyAtIndex(idx1)
- } else if key1 > key2 {
- ra1.insertNewKeyValueAt(idx1, key2, ra2.getContainerAtIndex(idx2))
- ra1.needCopyOnWrite[idx1] = true
- idx2++
- idx1++
- length1++
- if idx2 >= length2 {
- break
- }
- key2 = ra2.getKeyAtIndex(idx2)
- } else {
- c1 := ra1.getFastContainerAtIndex(idx1, true)
-
- ra1.containers[idx1] = c1.lazyIOR(ra2.getContainerAtIndex(idx2))
- ra1.needCopyOnWrite[idx1] = false
- idx1++
- idx2++
- if idx1 >= length1 || idx2 >= length2 {
- break
- }
-
- key1 = ra1.getKeyAtIndex(idx1)
- key2 = ra2.getKeyAtIndex(idx2)
- }
- }
- }
- if idx2 < length2 {
- key2 = ra2.getKeyAtIndex(idx2)
- for key2 <= last {
- ra1.appendCopy(*ra2, idx2)
- idx2++
- if idx2 >= length2 {
- break
- }
- key2 = ra2.getKeyAtIndex(idx2)
- }
- }
- return ra1
- }
|