You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

447 lines
13 KiB

  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package mirror
  5. import (
  6. "context"
  7. "fmt"
  8. "net/url"
  9. "strings"
  10. "time"
  11. "code.gitea.io/gitea/models"
  12. "code.gitea.io/gitea/modules/cache"
  13. "code.gitea.io/gitea/modules/git"
  14. "code.gitea.io/gitea/modules/graceful"
  15. "code.gitea.io/gitea/modules/log"
  16. "code.gitea.io/gitea/modules/notification"
  17. repo_module "code.gitea.io/gitea/modules/repository"
  18. "code.gitea.io/gitea/modules/setting"
  19. "code.gitea.io/gitea/modules/sync"
  20. "code.gitea.io/gitea/modules/timeutil"
  21. "code.gitea.io/gitea/modules/util"
  22. "github.com/mcuadros/go-version"
  23. "github.com/unknwon/com"
  24. )
  25. // mirrorQueue holds an UniqueQueue object of the mirror
  26. var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength)
  27. func readAddress(m *models.Mirror) {
  28. if len(m.Address) > 0 {
  29. return
  30. }
  31. var err error
  32. m.Address, err = remoteAddress(m.Repo.RepoPath())
  33. if err != nil {
  34. log.Error("remoteAddress: %v", err)
  35. }
  36. }
  37. func remoteAddress(repoPath string) (string, error) {
  38. var cmd *git.Command
  39. binVersion, err := git.BinVersion()
  40. if err != nil {
  41. return "", err
  42. }
  43. if version.Compare(binVersion, "2.7", ">=") {
  44. cmd = git.NewCommand("remote", "get-url", "origin")
  45. } else {
  46. cmd = git.NewCommand("config", "--get", "remote.origin.url")
  47. }
  48. result, err := cmd.RunInDir(repoPath)
  49. if err != nil {
  50. if strings.HasPrefix(err.Error(), "exit status 128 - fatal: No such remote ") {
  51. return "", nil
  52. }
  53. return "", err
  54. }
  55. if len(result) > 0 {
  56. return result[:len(result)-1], nil
  57. }
  58. return "", nil
  59. }
  60. // sanitizeOutput sanitizes output of a command, replacing occurrences of the
  61. // repository's remote address with a sanitized version.
  62. func sanitizeOutput(output, repoPath string) (string, error) {
  63. remoteAddr, err := remoteAddress(repoPath)
  64. if err != nil {
  65. // if we're unable to load the remote address, then we're unable to
  66. // sanitize.
  67. return "", err
  68. }
  69. return util.SanitizeMessage(output, remoteAddr), nil
  70. }
  71. // AddressNoCredentials returns mirror address from Git repository config without credentials.
  72. func AddressNoCredentials(m *models.Mirror) string {
  73. readAddress(m)
  74. u, err := url.Parse(m.Address)
  75. if err != nil {
  76. // this shouldn't happen but just return it unsanitised
  77. return m.Address
  78. }
  79. u.User = nil
  80. return u.String()
  81. }
  82. // SaveAddress writes new address to Git repository config.
  83. func SaveAddress(m *models.Mirror, addr string) error {
  84. repoPath := m.Repo.RepoPath()
  85. // Remove old origin
  86. _, err := git.NewCommand("remote", "rm", "origin").RunInDir(repoPath)
  87. if err != nil && !strings.HasPrefix(err.Error(), "exit status 128 - fatal: No such remote ") {
  88. return err
  89. }
  90. _, err = git.NewCommand("remote", "add", "origin", "--mirror=fetch", addr).RunInDir(repoPath)
  91. return err
  92. }
  93. // gitShortEmptySha Git short empty SHA
  94. const gitShortEmptySha = "0000000"
  95. // mirrorSyncResult contains information of a updated reference.
  96. // If the oldCommitID is "0000000", it means a new reference, the value of newCommitID is empty.
  97. // If the newCommitID is "0000000", it means the reference is deleted, the value of oldCommitID is empty.
  98. type mirrorSyncResult struct {
  99. refName string
  100. oldCommitID string
  101. newCommitID string
  102. }
  103. // parseRemoteUpdateOutput detects create, update and delete operations of references from upstream.
  104. func parseRemoteUpdateOutput(output string) []*mirrorSyncResult {
  105. results := make([]*mirrorSyncResult, 0, 3)
  106. lines := strings.Split(output, "\n")
  107. for i := range lines {
  108. // Make sure reference name is presented before continue
  109. idx := strings.Index(lines[i], "-> ")
  110. if idx == -1 {
  111. continue
  112. }
  113. refName := lines[i][idx+3:]
  114. switch {
  115. case strings.HasPrefix(lines[i], " * "): // New reference
  116. results = append(results, &mirrorSyncResult{
  117. refName: refName,
  118. oldCommitID: gitShortEmptySha,
  119. })
  120. case strings.HasPrefix(lines[i], " - "): // Delete reference
  121. results = append(results, &mirrorSyncResult{
  122. refName: refName,
  123. newCommitID: gitShortEmptySha,
  124. })
  125. case strings.HasPrefix(lines[i], " "): // New commits of a reference
  126. delimIdx := strings.Index(lines[i][3:], " ")
  127. if delimIdx == -1 {
  128. log.Error("SHA delimiter not found: %q", lines[i])
  129. continue
  130. }
  131. shas := strings.Split(lines[i][3:delimIdx+3], "..")
  132. if len(shas) != 2 {
  133. log.Error("Expect two SHAs but not what found: %q", lines[i])
  134. continue
  135. }
  136. results = append(results, &mirrorSyncResult{
  137. refName: refName,
  138. oldCommitID: shas[0],
  139. newCommitID: shas[1],
  140. })
  141. default:
  142. log.Warn("parseRemoteUpdateOutput: unexpected update line %q", lines[i])
  143. }
  144. }
  145. return results
  146. }
  147. // runSync returns true if sync finished without error.
  148. func runSync(m *models.Mirror) ([]*mirrorSyncResult, bool) {
  149. repoPath := m.Repo.RepoPath()
  150. wikiPath := m.Repo.WikiPath()
  151. timeout := time.Duration(setting.Git.Timeout.Mirror) * time.Second
  152. gitArgs := []string{"remote", "update"}
  153. if m.EnablePrune {
  154. gitArgs = append(gitArgs, "--prune")
  155. }
  156. stdoutBuilder := strings.Builder{}
  157. stderrBuilder := strings.Builder{}
  158. if err := git.NewCommand(gitArgs...).
  159. SetDescription(fmt.Sprintf("Mirror.runSync: %s", m.Repo.FullName())).
  160. RunInDirTimeoutPipeline(timeout, repoPath, &stdoutBuilder, &stderrBuilder); err != nil {
  161. stdout := stdoutBuilder.String()
  162. stderr := stderrBuilder.String()
  163. // sanitize the output, since it may contain the remote address, which may
  164. // contain a password
  165. stderrMessage, sanitizeErr := sanitizeOutput(stderr, repoPath)
  166. if sanitizeErr != nil {
  167. log.Error("sanitizeOutput failed on stderr: %v", sanitizeErr)
  168. log.Error("Failed to update mirror repository %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdout, stderr, err)
  169. return nil, false
  170. }
  171. stdoutMessage, err := sanitizeOutput(stdout, repoPath)
  172. if err != nil {
  173. log.Error("sanitizeOutput failed: %v", sanitizeErr)
  174. log.Error("Failed to update mirror repository %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdout, stderrMessage, err)
  175. return nil, false
  176. }
  177. log.Error("Failed to update mirror repository %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdoutMessage, stderrMessage, err)
  178. desc := fmt.Sprintf("Failed to update mirror repository '%s': %s", repoPath, stderrMessage)
  179. if err = models.CreateRepositoryNotice(desc); err != nil {
  180. log.Error("CreateRepositoryNotice: %v", err)
  181. }
  182. return nil, false
  183. }
  184. output := stderrBuilder.String()
  185. gitRepo, err := git.OpenRepository(repoPath)
  186. if err != nil {
  187. log.Error("OpenRepository: %v", err)
  188. return nil, false
  189. }
  190. if err = repo_module.SyncReleasesWithTags(m.Repo, gitRepo); err != nil {
  191. gitRepo.Close()
  192. log.Error("Failed to synchronize tags to releases for repository: %v", err)
  193. }
  194. gitRepo.Close()
  195. if err := m.Repo.UpdateSize(models.DefaultDBContext()); err != nil {
  196. log.Error("Failed to update size for mirror repository: %v", err)
  197. }
  198. if m.Repo.HasWiki() {
  199. stderrBuilder.Reset()
  200. stdoutBuilder.Reset()
  201. if err := git.NewCommand("remote", "update", "--prune").
  202. SetDescription(fmt.Sprintf("Mirror.runSync Wiki: %s ", m.Repo.FullName())).
  203. RunInDirTimeoutPipeline(timeout, wikiPath, &stdoutBuilder, &stderrBuilder); err != nil {
  204. stdout := stdoutBuilder.String()
  205. stderr := stderrBuilder.String()
  206. // sanitize the output, since it may contain the remote address, which may
  207. // contain a password
  208. stderrMessage, sanitizeErr := sanitizeOutput(stderr, repoPath)
  209. if sanitizeErr != nil {
  210. log.Error("sanitizeOutput failed on stderr: %v", sanitizeErr)
  211. log.Error("Failed to update mirror repository wiki %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdout, stderr, err)
  212. return nil, false
  213. }
  214. stdoutMessage, err := sanitizeOutput(stdout, repoPath)
  215. if err != nil {
  216. log.Error("sanitizeOutput failed: %v", sanitizeErr)
  217. log.Error("Failed to update mirror repository wiki %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdout, stderrMessage, err)
  218. return nil, false
  219. }
  220. log.Error("Failed to update mirror repository wiki %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdoutMessage, stderrMessage, err)
  221. desc := fmt.Sprintf("Failed to update mirror repository wiki '%s': %s", wikiPath, stderrMessage)
  222. if err = models.CreateRepositoryNotice(desc); err != nil {
  223. log.Error("CreateRepositoryNotice: %v", err)
  224. }
  225. return nil, false
  226. }
  227. }
  228. branches, err := repo_module.GetBranches(m.Repo)
  229. if err != nil {
  230. log.Error("GetBranches: %v", err)
  231. return nil, false
  232. }
  233. for i := range branches {
  234. cache.Remove(m.Repo.GetCommitsCountCacheKey(branches[i].Name, true))
  235. }
  236. m.UpdatedUnix = timeutil.TimeStampNow()
  237. return parseRemoteUpdateOutput(output), true
  238. }
  239. // Address returns mirror address from Git repository config without credentials.
  240. func Address(m *models.Mirror) string {
  241. readAddress(m)
  242. return util.SanitizeURLCredentials(m.Address, false)
  243. }
  244. // Username returns the mirror address username
  245. func Username(m *models.Mirror) string {
  246. readAddress(m)
  247. u, err := url.Parse(m.Address)
  248. if err != nil {
  249. // this shouldn't happen but if it does return ""
  250. return ""
  251. }
  252. return u.User.Username()
  253. }
  254. // Password returns the mirror address password
  255. func Password(m *models.Mirror) string {
  256. readAddress(m)
  257. u, err := url.Parse(m.Address)
  258. if err != nil {
  259. // this shouldn't happen but if it does return ""
  260. return ""
  261. }
  262. password, _ := u.User.Password()
  263. return password
  264. }
  265. // Update checks and updates mirror repositories.
  266. func Update(ctx context.Context) error {
  267. log.Trace("Doing: Update")
  268. if err := models.MirrorsIterate(func(idx int, bean interface{}) error {
  269. m := bean.(*models.Mirror)
  270. if m.Repo == nil {
  271. log.Error("Disconnected mirror repository found: %d", m.ID)
  272. return nil
  273. }
  274. select {
  275. case <-ctx.Done():
  276. return fmt.Errorf("Aborted")
  277. default:
  278. mirrorQueue.Add(m.RepoID)
  279. return nil
  280. }
  281. }); err != nil {
  282. log.Trace("Update: %v", err)
  283. return err
  284. }
  285. log.Trace("Finished: Update")
  286. return nil
  287. }
  288. // SyncMirrors checks and syncs mirrors.
  289. // FIXME: graceful: this should be a persistable queue
  290. func SyncMirrors(ctx context.Context) {
  291. // Start listening on new sync requests.
  292. for {
  293. select {
  294. case <-ctx.Done():
  295. mirrorQueue.Close()
  296. return
  297. case repoID := <-mirrorQueue.Queue():
  298. syncMirror(repoID)
  299. }
  300. }
  301. }
  302. func syncMirror(repoID string) {
  303. log.Trace("SyncMirrors [repo_id: %v]", repoID)
  304. defer func() {
  305. err := recover()
  306. if err == nil {
  307. return
  308. }
  309. // There was a panic whilst syncMirrors...
  310. log.Error("PANIC whilst syncMirrors[%s] Panic: %v\nStacktrace: %s", repoID, err, log.Stack(2))
  311. }()
  312. mirrorQueue.Remove(repoID)
  313. m, err := models.GetMirrorByRepoID(com.StrTo(repoID).MustInt64())
  314. if err != nil {
  315. log.Error("GetMirrorByRepoID [%s]: %v", repoID, err)
  316. return
  317. }
  318. results, ok := runSync(m)
  319. if !ok {
  320. return
  321. }
  322. m.ScheduleNextUpdate()
  323. if err = models.UpdateMirror(m); err != nil {
  324. log.Error("UpdateMirror [%s]: %v", repoID, err)
  325. return
  326. }
  327. var gitRepo *git.Repository
  328. if len(results) == 0 {
  329. log.Trace("SyncMirrors [repo_id: %d]: no commits fetched", m.RepoID)
  330. } else {
  331. gitRepo, err = git.OpenRepository(m.Repo.RepoPath())
  332. if err != nil {
  333. log.Error("OpenRepository [%d]: %v", m.RepoID, err)
  334. return
  335. }
  336. defer gitRepo.Close()
  337. }
  338. for _, result := range results {
  339. // Discard GitHub pull requests, i.e. refs/pull/*
  340. if strings.HasPrefix(result.refName, "refs/pull/") {
  341. continue
  342. }
  343. tp, _ := git.SplitRefName(result.refName)
  344. // Create reference
  345. if result.oldCommitID == gitShortEmptySha {
  346. notification.NotifySyncCreateRef(m.Repo.MustOwner(), m.Repo, tp, result.refName)
  347. continue
  348. }
  349. // Delete reference
  350. if result.newCommitID == gitShortEmptySha {
  351. notification.NotifySyncDeleteRef(m.Repo.MustOwner(), m.Repo, tp, result.refName)
  352. continue
  353. }
  354. // Push commits
  355. oldCommitID, err := git.GetFullCommitID(gitRepo.Path, result.oldCommitID)
  356. if err != nil {
  357. log.Error("GetFullCommitID [%d]: %v", m.RepoID, err)
  358. continue
  359. }
  360. newCommitID, err := git.GetFullCommitID(gitRepo.Path, result.newCommitID)
  361. if err != nil {
  362. log.Error("GetFullCommitID [%d]: %v", m.RepoID, err)
  363. continue
  364. }
  365. commits, err := gitRepo.CommitsBetweenIDs(newCommitID, oldCommitID)
  366. if err != nil {
  367. log.Error("CommitsBetweenIDs [repo_id: %d, new_commit_id: %s, old_commit_id: %s]: %v", m.RepoID, newCommitID, oldCommitID, err)
  368. continue
  369. }
  370. theCommits := repo_module.ListToPushCommits(commits)
  371. if len(theCommits.Commits) > setting.UI.FeedMaxCommitNum {
  372. theCommits.Commits = theCommits.Commits[:setting.UI.FeedMaxCommitNum]
  373. }
  374. theCommits.CompareURL = m.Repo.ComposeCompareURL(oldCommitID, newCommitID)
  375. notification.NotifySyncPushCommits(m.Repo.MustOwner(), m.Repo, result.refName, oldCommitID, newCommitID, theCommits)
  376. }
  377. // Get latest commit date and update to current repository updated time
  378. commitDate, err := git.GetLatestCommitTime(m.Repo.RepoPath())
  379. if err != nil {
  380. log.Error("GetLatestCommitDate [%d]: %v", m.RepoID, err)
  381. return
  382. }
  383. if err = models.UpdateRepositoryUpdatedTime(m.RepoID, commitDate); err != nil {
  384. log.Error("Update repository 'updated_unix' [%d]: %v", m.RepoID, err)
  385. return
  386. }
  387. }
  388. // InitSyncMirrors initializes a go routine to sync the mirrors
  389. func InitSyncMirrors() {
  390. go graceful.GetManager().RunWithShutdownContext(SyncMirrors)
  391. }
  392. // StartToMirror adds repoID to mirror queue
  393. func StartToMirror(repoID int64) {
  394. go mirrorQueue.Add(repoID)
  395. }