You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

154 lines
4.0 KiB

  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package code
  5. import (
  6. "os"
  7. "code.gitea.io/gitea/models"
  8. "code.gitea.io/gitea/modules/graceful"
  9. "code.gitea.io/gitea/modules/log"
  10. )
  11. type repoIndexerOperation struct {
  12. repoID int64
  13. deleted bool
  14. watchers []chan<- error
  15. }
  16. var repoIndexerOperationQueue chan repoIndexerOperation
  17. func initQueue(queueLength int) {
  18. repoIndexerOperationQueue = make(chan repoIndexerOperation, queueLength)
  19. }
  20. func index(indexer Indexer, repoID int64) error {
  21. repo, err := models.GetRepositoryByID(repoID)
  22. if err != nil {
  23. return err
  24. }
  25. sha, err := getDefaultBranchSha(repo)
  26. if err != nil {
  27. return err
  28. }
  29. changes, err := getRepoChanges(repo, sha)
  30. if err != nil {
  31. return err
  32. } else if changes == nil {
  33. return nil
  34. }
  35. if err := indexer.Index(repo, sha, changes); err != nil {
  36. return err
  37. }
  38. return repo.UpdateIndexerStatus(models.RepoIndexerTypeCode, sha)
  39. }
  40. func processRepoIndexerOperationQueue(indexer Indexer) {
  41. for {
  42. select {
  43. case op := <-repoIndexerOperationQueue:
  44. var err error
  45. if op.deleted {
  46. if err = indexer.Delete(op.repoID); err != nil {
  47. log.Error("indexer.Delete: %v", err)
  48. }
  49. } else {
  50. if err = index(indexer, op.repoID); err != nil {
  51. log.Error("indexer.Index: %v", err)
  52. }
  53. }
  54. for _, watcher := range op.watchers {
  55. watcher <- err
  56. }
  57. case <-graceful.GetManager().IsShutdown():
  58. log.Info("PID: %d Repository indexer queue processing stopped", os.Getpid())
  59. return
  60. }
  61. }
  62. }
  63. // DeleteRepoFromIndexer remove all of a repository's entries from the indexer
  64. func DeleteRepoFromIndexer(repo *models.Repository, watchers ...chan<- error) {
  65. addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: true, watchers: watchers})
  66. }
  67. // UpdateRepoIndexer update a repository's entries in the indexer
  68. func UpdateRepoIndexer(repo *models.Repository, watchers ...chan<- error) {
  69. addOperationToQueue(repoIndexerOperation{repoID: repo.ID, deleted: false, watchers: watchers})
  70. }
  71. func addOperationToQueue(op repoIndexerOperation) {
  72. select {
  73. case repoIndexerOperationQueue <- op:
  74. break
  75. default:
  76. go func() {
  77. repoIndexerOperationQueue <- op
  78. }()
  79. }
  80. }
  81. // populateRepoIndexer populate the repo indexer with pre-existing data. This
  82. // should only be run when the indexer is created for the first time.
  83. func populateRepoIndexer() {
  84. log.Info("Populating the repo indexer with existing repositories")
  85. isShutdown := graceful.GetManager().IsShutdown()
  86. exist, err := models.IsTableNotEmpty("repository")
  87. if err != nil {
  88. log.Fatal("System error: %v", err)
  89. } else if !exist {
  90. return
  91. }
  92. // if there is any existing repo indexer metadata in the DB, delete it
  93. // since we are starting afresh. Also, xorm requires deletes to have a
  94. // condition, and we want to delete everything, thus 1=1.
  95. if err := models.DeleteAllRecords("repo_indexer_status"); err != nil {
  96. log.Fatal("System error: %v", err)
  97. }
  98. var maxRepoID int64
  99. if maxRepoID, err = models.GetMaxID("repository"); err != nil {
  100. log.Fatal("System error: %v", err)
  101. }
  102. // start with the maximum existing repo ID and work backwards, so that we
  103. // don't include repos that are created after gitea starts; such repos will
  104. // already be added to the indexer, and we don't need to add them again.
  105. for maxRepoID > 0 {
  106. select {
  107. case <-isShutdown:
  108. log.Info("Repository Indexer population shutdown before completion")
  109. return
  110. default:
  111. }
  112. ids, err := models.GetUnindexedRepos(models.RepoIndexerTypeCode, maxRepoID, 0, 50)
  113. if err != nil {
  114. log.Error("populateRepoIndexer: %v", err)
  115. return
  116. } else if len(ids) == 0 {
  117. break
  118. }
  119. for _, id := range ids {
  120. select {
  121. case <-isShutdown:
  122. log.Info("Repository Indexer population shutdown before completion")
  123. return
  124. default:
  125. }
  126. repoIndexerOperationQueue <- repoIndexerOperation{
  127. repoID: id,
  128. deleted: false,
  129. }
  130. maxRepoID = id - 1
  131. }
  132. }
  133. log.Info("Done (re)populating the repo indexer with existing repositories")
  134. }