You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

605 lines
12 KiB

  1. // Copyright (c) 2014 Couchbase, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bleve
  15. import (
  16. "sort"
  17. "sync"
  18. "time"
  19. "golang.org/x/net/context"
  20. "github.com/blevesearch/bleve/document"
  21. "github.com/blevesearch/bleve/index"
  22. "github.com/blevesearch/bleve/index/store"
  23. "github.com/blevesearch/bleve/mapping"
  24. "github.com/blevesearch/bleve/search"
  25. )
  26. type indexAliasImpl struct {
  27. name string
  28. indexes []Index
  29. mutex sync.RWMutex
  30. open bool
  31. }
  32. // NewIndexAlias creates a new IndexAlias over the provided
  33. // Index objects.
  34. func NewIndexAlias(indexes ...Index) *indexAliasImpl {
  35. return &indexAliasImpl{
  36. name: "alias",
  37. indexes: indexes,
  38. open: true,
  39. }
  40. }
  41. func (i *indexAliasImpl) isAliasToSingleIndex() error {
  42. if len(i.indexes) < 1 {
  43. return ErrorAliasEmpty
  44. } else if len(i.indexes) > 1 {
  45. return ErrorAliasMulti
  46. }
  47. return nil
  48. }
  49. func (i *indexAliasImpl) Index(id string, data interface{}) error {
  50. i.mutex.RLock()
  51. defer i.mutex.RUnlock()
  52. if !i.open {
  53. return ErrorIndexClosed
  54. }
  55. err := i.isAliasToSingleIndex()
  56. if err != nil {
  57. return err
  58. }
  59. return i.indexes[0].Index(id, data)
  60. }
  61. func (i *indexAliasImpl) Delete(id string) error {
  62. i.mutex.RLock()
  63. defer i.mutex.RUnlock()
  64. if !i.open {
  65. return ErrorIndexClosed
  66. }
  67. err := i.isAliasToSingleIndex()
  68. if err != nil {
  69. return err
  70. }
  71. return i.indexes[0].Delete(id)
  72. }
  73. func (i *indexAliasImpl) Batch(b *Batch) error {
  74. i.mutex.RLock()
  75. defer i.mutex.RUnlock()
  76. if !i.open {
  77. return ErrorIndexClosed
  78. }
  79. err := i.isAliasToSingleIndex()
  80. if err != nil {
  81. return err
  82. }
  83. return i.indexes[0].Batch(b)
  84. }
  85. func (i *indexAliasImpl) Document(id string) (*document.Document, error) {
  86. i.mutex.RLock()
  87. defer i.mutex.RUnlock()
  88. if !i.open {
  89. return nil, ErrorIndexClosed
  90. }
  91. err := i.isAliasToSingleIndex()
  92. if err != nil {
  93. return nil, err
  94. }
  95. return i.indexes[0].Document(id)
  96. }
  97. func (i *indexAliasImpl) DocCount() (uint64, error) {
  98. i.mutex.RLock()
  99. defer i.mutex.RUnlock()
  100. rv := uint64(0)
  101. if !i.open {
  102. return 0, ErrorIndexClosed
  103. }
  104. for _, index := range i.indexes {
  105. otherCount, err := index.DocCount()
  106. if err == nil {
  107. rv += otherCount
  108. }
  109. // tolerate errors to produce partial counts
  110. }
  111. return rv, nil
  112. }
  113. func (i *indexAliasImpl) Search(req *SearchRequest) (*SearchResult, error) {
  114. return i.SearchInContext(context.Background(), req)
  115. }
  116. func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest) (*SearchResult, error) {
  117. i.mutex.RLock()
  118. defer i.mutex.RUnlock()
  119. if !i.open {
  120. return nil, ErrorIndexClosed
  121. }
  122. if len(i.indexes) < 1 {
  123. return nil, ErrorAliasEmpty
  124. }
  125. // short circuit the simple case
  126. if len(i.indexes) == 1 {
  127. return i.indexes[0].SearchInContext(ctx, req)
  128. }
  129. return MultiSearch(ctx, req, i.indexes...)
  130. }
  131. func (i *indexAliasImpl) Fields() ([]string, error) {
  132. i.mutex.RLock()
  133. defer i.mutex.RUnlock()
  134. if !i.open {
  135. return nil, ErrorIndexClosed
  136. }
  137. err := i.isAliasToSingleIndex()
  138. if err != nil {
  139. return nil, err
  140. }
  141. return i.indexes[0].Fields()
  142. }
  143. func (i *indexAliasImpl) FieldDict(field string) (index.FieldDict, error) {
  144. i.mutex.RLock()
  145. if !i.open {
  146. i.mutex.RUnlock()
  147. return nil, ErrorIndexClosed
  148. }
  149. err := i.isAliasToSingleIndex()
  150. if err != nil {
  151. i.mutex.RUnlock()
  152. return nil, err
  153. }
  154. fieldDict, err := i.indexes[0].FieldDict(field)
  155. if err != nil {
  156. i.mutex.RUnlock()
  157. return nil, err
  158. }
  159. return &indexAliasImplFieldDict{
  160. index: i,
  161. fieldDict: fieldDict,
  162. }, nil
  163. }
  164. func (i *indexAliasImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
  165. i.mutex.RLock()
  166. if !i.open {
  167. i.mutex.RUnlock()
  168. return nil, ErrorIndexClosed
  169. }
  170. err := i.isAliasToSingleIndex()
  171. if err != nil {
  172. i.mutex.RUnlock()
  173. return nil, err
  174. }
  175. fieldDict, err := i.indexes[0].FieldDictRange(field, startTerm, endTerm)
  176. if err != nil {
  177. i.mutex.RUnlock()
  178. return nil, err
  179. }
  180. return &indexAliasImplFieldDict{
  181. index: i,
  182. fieldDict: fieldDict,
  183. }, nil
  184. }
  185. func (i *indexAliasImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) {
  186. i.mutex.RLock()
  187. if !i.open {
  188. i.mutex.RUnlock()
  189. return nil, ErrorIndexClosed
  190. }
  191. err := i.isAliasToSingleIndex()
  192. if err != nil {
  193. i.mutex.RUnlock()
  194. return nil, err
  195. }
  196. fieldDict, err := i.indexes[0].FieldDictPrefix(field, termPrefix)
  197. if err != nil {
  198. i.mutex.RUnlock()
  199. return nil, err
  200. }
  201. return &indexAliasImplFieldDict{
  202. index: i,
  203. fieldDict: fieldDict,
  204. }, nil
  205. }
  206. func (i *indexAliasImpl) Close() error {
  207. i.mutex.Lock()
  208. defer i.mutex.Unlock()
  209. i.open = false
  210. return nil
  211. }
  212. func (i *indexAliasImpl) Mapping() mapping.IndexMapping {
  213. i.mutex.RLock()
  214. defer i.mutex.RUnlock()
  215. if !i.open {
  216. return nil
  217. }
  218. err := i.isAliasToSingleIndex()
  219. if err != nil {
  220. return nil
  221. }
  222. return i.indexes[0].Mapping()
  223. }
  224. func (i *indexAliasImpl) Stats() *IndexStat {
  225. i.mutex.RLock()
  226. defer i.mutex.RUnlock()
  227. if !i.open {
  228. return nil
  229. }
  230. err := i.isAliasToSingleIndex()
  231. if err != nil {
  232. return nil
  233. }
  234. return i.indexes[0].Stats()
  235. }
  236. func (i *indexAliasImpl) StatsMap() map[string]interface{} {
  237. i.mutex.RLock()
  238. defer i.mutex.RUnlock()
  239. if !i.open {
  240. return nil
  241. }
  242. err := i.isAliasToSingleIndex()
  243. if err != nil {
  244. return nil
  245. }
  246. return i.indexes[0].StatsMap()
  247. }
  248. func (i *indexAliasImpl) GetInternal(key []byte) ([]byte, error) {
  249. i.mutex.RLock()
  250. defer i.mutex.RUnlock()
  251. if !i.open {
  252. return nil, ErrorIndexClosed
  253. }
  254. err := i.isAliasToSingleIndex()
  255. if err != nil {
  256. return nil, err
  257. }
  258. return i.indexes[0].GetInternal(key)
  259. }
  260. func (i *indexAliasImpl) SetInternal(key, val []byte) error {
  261. i.mutex.RLock()
  262. defer i.mutex.RUnlock()
  263. if !i.open {
  264. return ErrorIndexClosed
  265. }
  266. err := i.isAliasToSingleIndex()
  267. if err != nil {
  268. return err
  269. }
  270. return i.indexes[0].SetInternal(key, val)
  271. }
  272. func (i *indexAliasImpl) DeleteInternal(key []byte) error {
  273. i.mutex.RLock()
  274. defer i.mutex.RUnlock()
  275. if !i.open {
  276. return ErrorIndexClosed
  277. }
  278. err := i.isAliasToSingleIndex()
  279. if err != nil {
  280. return err
  281. }
  282. return i.indexes[0].DeleteInternal(key)
  283. }
  284. func (i *indexAliasImpl) Advanced() (index.Index, store.KVStore, error) {
  285. i.mutex.RLock()
  286. defer i.mutex.RUnlock()
  287. if !i.open {
  288. return nil, nil, ErrorIndexClosed
  289. }
  290. err := i.isAliasToSingleIndex()
  291. if err != nil {
  292. return nil, nil, err
  293. }
  294. return i.indexes[0].Advanced()
  295. }
  296. func (i *indexAliasImpl) Add(indexes ...Index) {
  297. i.mutex.Lock()
  298. defer i.mutex.Unlock()
  299. i.indexes = append(i.indexes, indexes...)
  300. }
  301. func (i *indexAliasImpl) removeSingle(index Index) {
  302. for pos, in := range i.indexes {
  303. if in == index {
  304. i.indexes = append(i.indexes[:pos], i.indexes[pos+1:]...)
  305. break
  306. }
  307. }
  308. }
  309. func (i *indexAliasImpl) Remove(indexes ...Index) {
  310. i.mutex.Lock()
  311. defer i.mutex.Unlock()
  312. for _, in := range indexes {
  313. i.removeSingle(in)
  314. }
  315. }
  316. func (i *indexAliasImpl) Swap(in, out []Index) {
  317. i.mutex.Lock()
  318. defer i.mutex.Unlock()
  319. // add
  320. i.indexes = append(i.indexes, in...)
  321. // delete
  322. for _, ind := range out {
  323. i.removeSingle(ind)
  324. }
  325. }
  326. // createChildSearchRequest creates a separate
  327. // request from the original
  328. // For now, avoid data race on req structure.
  329. // TODO disable highlight/field load on child
  330. // requests, and add code to do this only on
  331. // the actual final results.
  332. // Perhaps that part needs to be optional,
  333. // could be slower in remote usages.
  334. func createChildSearchRequest(req *SearchRequest) *SearchRequest {
  335. rv := SearchRequest{
  336. Query: req.Query,
  337. Size: req.Size + req.From,
  338. From: 0,
  339. Highlight: req.Highlight,
  340. Fields: req.Fields,
  341. Facets: req.Facets,
  342. Explain: req.Explain,
  343. Sort: req.Sort,
  344. }
  345. return &rv
  346. }
  347. type asyncSearchResult struct {
  348. Name string
  349. Result *SearchResult
  350. Err error
  351. }
  352. // MultiSearch executes a SearchRequest across multiple Index objects,
  353. // then merges the results. The indexes must honor any ctx deadline.
  354. func MultiSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
  355. searchStart := time.Now()
  356. asyncResults := make(chan *asyncSearchResult, len(indexes))
  357. // run search on each index in separate go routine
  358. var waitGroup sync.WaitGroup
  359. var searchChildIndex = func(in Index, childReq *SearchRequest) {
  360. rv := asyncSearchResult{Name: in.Name()}
  361. rv.Result, rv.Err = in.SearchInContext(ctx, childReq)
  362. asyncResults <- &rv
  363. waitGroup.Done()
  364. }
  365. waitGroup.Add(len(indexes))
  366. for _, in := range indexes {
  367. go searchChildIndex(in, createChildSearchRequest(req))
  368. }
  369. // on another go routine, close after finished
  370. go func() {
  371. waitGroup.Wait()
  372. close(asyncResults)
  373. }()
  374. var sr *SearchResult
  375. indexErrors := make(map[string]error)
  376. for asr := range asyncResults {
  377. if asr.Err == nil {
  378. if sr == nil {
  379. // first result
  380. sr = asr.Result
  381. } else {
  382. // merge with previous
  383. sr.Merge(asr.Result)
  384. }
  385. } else {
  386. indexErrors[asr.Name] = asr.Err
  387. }
  388. }
  389. // merge just concatenated all the hits
  390. // now lets clean it up
  391. // handle case where no results were successful
  392. if sr == nil {
  393. sr = &SearchResult{
  394. Status: &SearchStatus{
  395. Errors: make(map[string]error),
  396. },
  397. }
  398. }
  399. // sort all hits with the requested order
  400. if len(req.Sort) > 0 {
  401. sorter := newMultiSearchHitSorter(req.Sort, sr.Hits)
  402. sort.Sort(sorter)
  403. }
  404. // now skip over the correct From
  405. if req.From > 0 && len(sr.Hits) > req.From {
  406. sr.Hits = sr.Hits[req.From:]
  407. } else if req.From > 0 {
  408. sr.Hits = search.DocumentMatchCollection{}
  409. }
  410. // now trim to the correct size
  411. if req.Size > 0 && len(sr.Hits) > req.Size {
  412. sr.Hits = sr.Hits[0:req.Size]
  413. }
  414. // fix up facets
  415. for name, fr := range req.Facets {
  416. sr.Facets.Fixup(name, fr.Size)
  417. }
  418. // fix up original request
  419. sr.Request = req
  420. searchDuration := time.Since(searchStart)
  421. sr.Took = searchDuration
  422. // fix up errors
  423. if len(indexErrors) > 0 {
  424. if sr.Status.Errors == nil {
  425. sr.Status.Errors = make(map[string]error)
  426. }
  427. for indexName, indexErr := range indexErrors {
  428. sr.Status.Errors[indexName] = indexErr
  429. sr.Status.Total++
  430. sr.Status.Failed++
  431. }
  432. }
  433. return sr, nil
  434. }
  435. func (i *indexAliasImpl) NewBatch() *Batch {
  436. i.mutex.RLock()
  437. defer i.mutex.RUnlock()
  438. if !i.open {
  439. return nil
  440. }
  441. err := i.isAliasToSingleIndex()
  442. if err != nil {
  443. return nil
  444. }
  445. return i.indexes[0].NewBatch()
  446. }
  447. func (i *indexAliasImpl) Name() string {
  448. return i.name
  449. }
  450. func (i *indexAliasImpl) SetName(name string) {
  451. i.name = name
  452. }
  453. type indexAliasImplFieldDict struct {
  454. index *indexAliasImpl
  455. fieldDict index.FieldDict
  456. }
  457. func (f *indexAliasImplFieldDict) Next() (*index.DictEntry, error) {
  458. return f.fieldDict.Next()
  459. }
  460. func (f *indexAliasImplFieldDict) Close() error {
  461. defer f.index.mutex.RUnlock()
  462. return f.fieldDict.Close()
  463. }
  464. type multiSearchHitSorter struct {
  465. hits search.DocumentMatchCollection
  466. sort search.SortOrder
  467. cachedScoring []bool
  468. cachedDesc []bool
  469. }
  470. func newMultiSearchHitSorter(sort search.SortOrder, hits search.DocumentMatchCollection) *multiSearchHitSorter {
  471. return &multiSearchHitSorter{
  472. sort: sort,
  473. hits: hits,
  474. cachedScoring: sort.CacheIsScore(),
  475. cachedDesc: sort.CacheDescending(),
  476. }
  477. }
  478. func (m *multiSearchHitSorter) Len() int { return len(m.hits) }
  479. func (m *multiSearchHitSorter) Swap(i, j int) { m.hits[i], m.hits[j] = m.hits[j], m.hits[i] }
  480. func (m *multiSearchHitSorter) Less(i, j int) bool {
  481. c := m.sort.Compare(m.cachedScoring, m.cachedDesc, m.hits[i], m.hits[j])
  482. return c < 0
  483. }