You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

349 lines
9.4 KiB

  1. // Copyright 2017 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package models
  5. import (
  6. "fmt"
  7. "strconv"
  8. "strings"
  9. "code.gitea.io/git"
  10. "code.gitea.io/gitea/modules/base"
  11. "code.gitea.io/gitea/modules/indexer"
  12. "code.gitea.io/gitea/modules/log"
  13. "code.gitea.io/gitea/modules/setting"
  14. "github.com/ethantkoenig/rupture"
  15. )
  // RepoIndexerStatus records how far the repo indexer has progressed for a
  // single repository. Only one commit SHA is kept per row, which for now
  // implicitly refers to the repository's default branch.
  type RepoIndexerStatus struct {
  	// ID is the auto-incremented primary key of the status row.
  	ID int64 `xorm:"pk autoincr"`
  	// RepoID is the (indexed) ID of the repository this row belongs to.
  	RepoID int64 `xorm:"INDEX"`
  	// CommitSha is the commit the index was last synchronised with. An
  	// empty value is treated elsewhere in this file as "never indexed".
  	CommitSha string `xorm:"VARCHAR(40)"`
  }
  23. func (repo *Repository) getIndexerStatus() error {
  24. if repo.IndexerStatus != nil {
  25. return nil
  26. }
  27. status := &RepoIndexerStatus{RepoID: repo.ID}
  28. has, err := x.Get(status)
  29. if err != nil {
  30. return err
  31. } else if !has {
  32. status.CommitSha = ""
  33. }
  34. repo.IndexerStatus = status
  35. return nil
  36. }
  37. func (repo *Repository) updateIndexerStatus(sha string) error {
  38. if err := repo.getIndexerStatus(); err != nil {
  39. return err
  40. }
  41. if len(repo.IndexerStatus.CommitSha) == 0 {
  42. repo.IndexerStatus.CommitSha = sha
  43. _, err := x.Insert(repo.IndexerStatus)
  44. return err
  45. }
  46. repo.IndexerStatus.CommitSha = sha
  47. _, err := x.ID(repo.IndexerStatus.ID).Cols("commit_sha").
  48. Update(repo.IndexerStatus)
  49. return err
  50. }
  51. type repoIndexerOperation struct {
  52. repo *Repository
  53. deleted bool
  54. }
  55. var repoIndexerOperationQueue chan repoIndexerOperation
  56. // InitRepoIndexer initialize the repo indexer
  57. func InitRepoIndexer() {
  58. if !setting.Indexer.RepoIndexerEnabled {
  59. return
  60. }
  61. repoIndexerOperationQueue = make(chan repoIndexerOperation, setting.Indexer.UpdateQueueLength)
  62. indexer.InitRepoIndexer(populateRepoIndexerAsynchronously)
  63. go processRepoIndexerOperationQueue()
  64. }
  65. // populateRepoIndexerAsynchronously asynchronously populates the repo indexer
  66. // with pre-existing data. This should only be run when the indexer is created
  67. // for the first time.
  68. func populateRepoIndexerAsynchronously() error {
  69. exist, err := x.Table("repository").Exist()
  70. if err != nil {
  71. return err
  72. } else if !exist {
  73. return nil
  74. }
  75. // if there is any existing repo indexer metadata in the DB, delete it
  76. // since we are starting afresh. Also, xorm requires deletes to have a
  77. // condition, and we want to delete everything, thus 1=1.
  78. if _, err := x.Where("1=1").Delete(new(RepoIndexerStatus)); err != nil {
  79. return err
  80. }
  81. var maxRepoID int64
  82. if _, err = x.Select("MAX(id)").Table("repository").Get(&maxRepoID); err != nil {
  83. return err
  84. }
  85. go populateRepoIndexer(maxRepoID)
  86. return nil
  87. }
  88. // populateRepoIndexer populate the repo indexer with pre-existing data. This
  89. // should only be run when the indexer is created for the first time.
  90. func populateRepoIndexer(maxRepoID int64) {
  91. log.Info("Populating the repo indexer with existing repositories")
  92. // start with the maximum existing repo ID and work backwards, so that we
  93. // don't include repos that are created after gitea starts; such repos will
  94. // already be added to the indexer, and we don't need to add them again.
  95. for maxRepoID > 0 {
  96. repos := make([]*Repository, 0, RepositoryListDefaultPageSize)
  97. err := x.Where("id <= ?", maxRepoID).
  98. OrderBy("id DESC").
  99. Limit(RepositoryListDefaultPageSize).
  100. Find(&repos)
  101. if err != nil {
  102. log.Error(4, "populateRepoIndexer: %v", err)
  103. return
  104. } else if len(repos) == 0 {
  105. break
  106. }
  107. for _, repo := range repos {
  108. repoIndexerOperationQueue <- repoIndexerOperation{
  109. repo: repo,
  110. deleted: false,
  111. }
  112. maxRepoID = repo.ID - 1
  113. }
  114. }
  115. log.Info("Done populating the repo indexer with existing repositories")
  116. }
  117. func updateRepoIndexer(repo *Repository) error {
  118. sha, err := getDefaultBranchSha(repo)
  119. if err != nil {
  120. return err
  121. }
  122. changes, err := getRepoChanges(repo, sha)
  123. if err != nil {
  124. return err
  125. } else if changes == nil {
  126. return nil
  127. }
  128. batch := indexer.RepoIndexerBatch()
  129. for _, update := range changes.Updates {
  130. if err := addUpdate(update, repo, batch); err != nil {
  131. return err
  132. }
  133. }
  134. for _, filename := range changes.RemovedFilenames {
  135. if err := addDelete(filename, repo, batch); err != nil {
  136. return err
  137. }
  138. }
  139. if err = batch.Flush(); err != nil {
  140. return err
  141. }
  142. return repo.updateIndexerStatus(sha)
  143. }
  // repoChanges describes what has to be applied to the index for one
  // repository: files to (re)index and files to drop.
  type repoChanges struct {
  	// Updates lists files that were added or modified.
  	Updates []fileUpdate
  	// RemovedFilenames lists paths of files that no longer exist.
  	RemovedFilenames []string
  }

  // fileUpdate identifies one file version to index.
  type fileUpdate struct {
  	// Filename is the path of the file.
  	Filename string
  	// BlobSha is the git object ID of the file's contents.
  	BlobSha string
  }
  153. func getDefaultBranchSha(repo *Repository) (string, error) {
  154. stdout, err := git.NewCommand("show-ref", "-s", repo.DefaultBranch).RunInDir(repo.RepoPath())
  155. if err != nil {
  156. return "", err
  157. }
  158. return strings.TrimSpace(stdout), nil
  159. }
  160. // getRepoChanges returns changes to repo since last indexer update
  161. func getRepoChanges(repo *Repository, revision string) (*repoChanges, error) {
  162. if err := repo.getIndexerStatus(); err != nil {
  163. return nil, err
  164. }
  165. if len(repo.IndexerStatus.CommitSha) == 0 {
  166. return genesisChanges(repo, revision)
  167. }
  168. return nonGenesisChanges(repo, revision)
  169. }
  170. func addUpdate(update fileUpdate, repo *Repository, batch rupture.FlushingBatch) error {
  171. stdout, err := git.NewCommand("cat-file", "-s", update.BlobSha).
  172. RunInDir(repo.RepoPath())
  173. if err != nil {
  174. return err
  175. }
  176. if size, err := strconv.Atoi(strings.TrimSpace(stdout)); err != nil {
  177. return fmt.Errorf("Misformatted git cat-file output: %v", err)
  178. } else if int64(size) > setting.Indexer.MaxIndexerFileSize {
  179. return nil
  180. }
  181. fileContents, err := git.NewCommand("cat-file", "blob", update.BlobSha).
  182. RunInDirBytes(repo.RepoPath())
  183. if err != nil {
  184. return err
  185. } else if !base.IsTextFile(fileContents) {
  186. return nil
  187. }
  188. indexerUpdate := indexer.RepoIndexerUpdate{
  189. Filepath: update.Filename,
  190. Op: indexer.RepoIndexerOpUpdate,
  191. Data: &indexer.RepoIndexerData{
  192. RepoID: repo.ID,
  193. Content: string(fileContents),
  194. },
  195. }
  196. return indexerUpdate.AddToFlushingBatch(batch)
  197. }
  198. func addDelete(filename string, repo *Repository, batch rupture.FlushingBatch) error {
  199. indexerUpdate := indexer.RepoIndexerUpdate{
  200. Filepath: filename,
  201. Op: indexer.RepoIndexerOpDelete,
  202. Data: &indexer.RepoIndexerData{
  203. RepoID: repo.ID,
  204. },
  205. }
  206. return indexerUpdate.AddToFlushingBatch(batch)
  207. }
  208. // parseGitLsTreeOutput parses the output of a `git ls-tree -r --full-name` command
  209. func parseGitLsTreeOutput(stdout []byte) ([]fileUpdate, error) {
  210. entries, err := git.ParseTreeEntries(stdout)
  211. if err != nil {
  212. return nil, err
  213. }
  214. updates := make([]fileUpdate, len(entries))
  215. for i, entry := range entries {
  216. updates[i] = fileUpdate{
  217. Filename: entry.Name(),
  218. BlobSha: entry.ID.String(),
  219. }
  220. }
  221. return updates, nil
  222. }
  223. // genesisChanges get changes to add repo to the indexer for the first time
  224. func genesisChanges(repo *Repository, revision string) (*repoChanges, error) {
  225. var changes repoChanges
  226. stdout, err := git.NewCommand("ls-tree", "--full-tree", "-r", revision).
  227. RunInDirBytes(repo.RepoPath())
  228. if err != nil {
  229. return nil, err
  230. }
  231. changes.Updates, err = parseGitLsTreeOutput(stdout)
  232. return &changes, err
  233. }
  234. // nonGenesisChanges get changes since the previous indexer update
  235. func nonGenesisChanges(repo *Repository, revision string) (*repoChanges, error) {
  236. diffCmd := git.NewCommand("diff", "--name-status",
  237. repo.IndexerStatus.CommitSha, revision)
  238. stdout, err := diffCmd.RunInDir(repo.RepoPath())
  239. if err != nil {
  240. // previous commit sha may have been removed by a force push, so
  241. // try rebuilding from scratch
  242. log.Warn("git diff: %v", err)
  243. if err = indexer.DeleteRepoFromIndexer(repo.ID); err != nil {
  244. return nil, err
  245. }
  246. return genesisChanges(repo, revision)
  247. }
  248. var changes repoChanges
  249. updatedFilenames := make([]string, 0, 10)
  250. for _, line := range strings.Split(stdout, "\n") {
  251. line = strings.TrimSpace(line)
  252. if len(line) == 0 {
  253. continue
  254. }
  255. filename := strings.TrimSpace(line[1:])
  256. if len(filename) == 0 {
  257. continue
  258. } else if filename[0] == '"' {
  259. filename, err = strconv.Unquote(filename)
  260. if err != nil {
  261. return nil, err
  262. }
  263. }
  264. switch status := line[0]; status {
  265. case 'M', 'A':
  266. updatedFilenames = append(updatedFilenames, filename)
  267. case 'D':
  268. changes.RemovedFilenames = append(changes.RemovedFilenames, filename)
  269. default:
  270. log.Warn("Unrecognized status: %c (line=%s)", status, line)
  271. }
  272. }
  273. cmd := git.NewCommand("ls-tree", "--full-tree", revision, "--")
  274. cmd.AddArguments(updatedFilenames...)
  275. lsTreeStdout, err := cmd.RunInDirBytes(repo.RepoPath())
  276. if err != nil {
  277. return nil, err
  278. }
  279. changes.Updates, err = parseGitLsTreeOutput(lsTreeStdout)
  280. return &changes, err
  281. }
  282. func processRepoIndexerOperationQueue() {
  283. for {
  284. op := <-repoIndexerOperationQueue
  285. if op.deleted {
  286. if err := indexer.DeleteRepoFromIndexer(op.repo.ID); err != nil {
  287. log.Error(4, "DeleteRepoFromIndexer: %v", err)
  288. }
  289. } else {
  290. if err := updateRepoIndexer(op.repo); err != nil {
  291. log.Error(4, "updateRepoIndexer: %v", err)
  292. }
  293. }
  294. }
  295. }
  296. // DeleteRepoFromIndexer remove all of a repository's entries from the indexer
  297. func DeleteRepoFromIndexer(repo *Repository) {
  298. addOperationToQueue(repoIndexerOperation{repo: repo, deleted: true})
  299. }
  300. // UpdateRepoIndexer update a repository's entries in the indexer
  301. func UpdateRepoIndexer(repo *Repository) {
  302. addOperationToQueue(repoIndexerOperation{repo: repo, deleted: false})
  303. }
  304. func addOperationToQueue(op repoIndexerOperation) {
  305. if !setting.Indexer.RepoIndexerEnabled {
  306. return
  307. }
  308. select {
  309. case repoIndexerOperationQueue <- op:
  310. break
  311. default:
  312. go func() {
  313. repoIndexerOperationQueue <- op
  314. }()
  315. }
  316. }