You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

146 lines
3.7 KiB

  1. package rupture
  2. import (
  3. "fmt"
  4. "hash/fnv"
  5. "path/filepath"
  6. "strconv"
  7. "github.com/blevesearch/bleve"
  8. "github.com/blevesearch/bleve/document"
  9. "github.com/blevesearch/bleve/mapping"
  10. )
  11. // ShardedIndex an index that is built onto of multiple underlying bleve
  12. // indices (i.e. shards). Similar to bleve's index aliases, some methods may
  13. // not be supported.
  14. type ShardedIndex interface {
  15. bleve.Index
  16. shards() []bleve.Index
  17. }
  18. // a type alias for bleve.Index, so that the anonymous field of
  19. // shardedIndex does not conflict with the Index(..) method.
  20. type bleveIndex bleve.Index
  21. type shardedIndex struct {
  22. bleveIndex
  23. indices []bleve.Index
  24. }
  25. func hash(id string, n int) uint64 {
  26. fnvHash := fnv.New64()
  27. fnvHash.Write([]byte(id))
  28. return fnvHash.Sum64() % uint64(n)
  29. }
  30. func childIndexerPath(rootPath string, i int) string {
  31. return filepath.Join(rootPath, strconv.Itoa(i))
  32. }
  33. // NewShardedIndex creates a sharded index at the specified path, with the
  34. // specified mapping and number of shards.
  35. func NewShardedIndex(path string, mapping mapping.IndexMapping, numShards int) (ShardedIndex, error) {
  36. if numShards <= 0 {
  37. return nil, fmt.Errorf("Invalid number of shards: %d", numShards)
  38. }
  39. err := writeJSON(shardedIndexMetadataPath(path), &shardedIndexMetadata{NumShards: numShards})
  40. if err != nil {
  41. return nil, err
  42. }
  43. s := &shardedIndex{
  44. indices: make([]bleve.Index, numShards),
  45. }
  46. for i := 0; i < numShards; i++ {
  47. s.indices[i], err = bleve.New(childIndexerPath(path, i), mapping)
  48. if err != nil {
  49. return nil, err
  50. }
  51. }
  52. s.bleveIndex = bleve.NewIndexAlias(s.indices...)
  53. return s, nil
  54. }
  55. // OpenShardedIndex opens a sharded index at the specified path.
  56. func OpenShardedIndex(path string) (ShardedIndex, error) {
  57. var meta shardedIndexMetadata
  58. var err error
  59. if err = readJSON(shardedIndexMetadataPath(path), &meta); err != nil {
  60. return nil, err
  61. }
  62. s := &shardedIndex{
  63. indices: make([]bleve.Index, meta.NumShards),
  64. }
  65. for i := 0; i < meta.NumShards; i++ {
  66. s.indices[i], err = bleve.Open(childIndexerPath(path, i))
  67. if err != nil {
  68. return nil, err
  69. }
  70. }
  71. s.bleveIndex = bleve.NewIndexAlias(s.indices...)
  72. return s, nil
  73. }
  74. func (s *shardedIndex) Index(id string, data interface{}) error {
  75. return s.indices[hash(id, len(s.indices))].Index(id, data)
  76. }
  77. func (s *shardedIndex) Delete(id string) error {
  78. return s.indices[hash(id, len(s.indices))].Delete(id)
  79. }
  80. func (s *shardedIndex) Document(id string) (*document.Document, error) {
  81. return s.indices[hash(id, len(s.indices))].Document(id)
  82. }
  83. func (s *shardedIndex) Close() error {
  84. if err := s.bleveIndex.Close(); err != nil {
  85. return err
  86. }
  87. for _, index := range s.indices {
  88. if err := index.Close(); err != nil {
  89. return err
  90. }
  91. }
  92. return nil
  93. }
  94. func (s *shardedIndex) shards() []bleve.Index {
  95. return s.indices
  96. }
  97. type shardedIndexFlushingBatch struct {
  98. batches []*singleIndexFlushingBatch
  99. }
  100. // NewShardedFlushingBatch creates a flushing batch with the specified batch
  101. // size for the specified sharded index.
  102. func NewShardedFlushingBatch(index ShardedIndex, maxBatchSize int) FlushingBatch {
  103. indices := index.shards()
  104. b := &shardedIndexFlushingBatch{
  105. batches: make([]*singleIndexFlushingBatch, len(indices)),
  106. }
  107. for i, index := range indices {
  108. b.batches[i] = newFlushingBatch(index, maxBatchSize)
  109. }
  110. return b
  111. }
  112. func (b *shardedIndexFlushingBatch) Index(id string, data interface{}) error {
  113. return b.batches[hash(id, len(b.batches))].Index(id, data)
  114. }
  115. func (b *shardedIndexFlushingBatch) Delete(id string) error {
  116. return b.batches[hash(id, len(b.batches))].Delete(id)
  117. }
  118. func (b *shardedIndexFlushingBatch) Flush() error {
  119. for _, batch := range b.batches {
  120. if err := batch.Flush(); err != nil {
  121. return err
  122. }
  123. }
  124. return nil
  125. }