You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

437 lines
13 KiB

  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package mirror
  5. import (
  6. "context"
  7. "fmt"
  8. "net/url"
  9. "strings"
  10. "time"
  11. "code.gitea.io/gitea/modules/graceful"
  12. "code.gitea.io/gitea/models"
  13. "code.gitea.io/gitea/modules/cache"
  14. "code.gitea.io/gitea/modules/git"
  15. "code.gitea.io/gitea/modules/log"
  16. "code.gitea.io/gitea/modules/notification"
  17. "code.gitea.io/gitea/modules/repository"
  18. "code.gitea.io/gitea/modules/setting"
  19. "code.gitea.io/gitea/modules/sync"
  20. "code.gitea.io/gitea/modules/timeutil"
  21. "code.gitea.io/gitea/modules/util"
  22. "github.com/mcuadros/go-version"
  23. "github.com/unknwon/com"
  24. )
  25. // mirrorQueue holds an UniqueQueue object of the mirror
  26. var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength)
  27. func readAddress(m *models.Mirror) {
  28. if len(m.Address) > 0 {
  29. return
  30. }
  31. var err error
  32. m.Address, err = remoteAddress(m.Repo.RepoPath())
  33. if err != nil {
  34. log.Error("remoteAddress: %v", err)
  35. }
  36. }
  37. func remoteAddress(repoPath string) (string, error) {
  38. var cmd *git.Command
  39. binVersion, err := git.BinVersion()
  40. if err != nil {
  41. return "", err
  42. }
  43. if version.Compare(binVersion, "2.7", ">=") {
  44. cmd = git.NewCommand("remote", "get-url", "origin")
  45. } else {
  46. cmd = git.NewCommand("config", "--get", "remote.origin.url")
  47. }
  48. result, err := cmd.RunInDir(repoPath)
  49. if err != nil {
  50. if strings.HasPrefix(err.Error(), "exit status 128 - fatal: No such remote ") {
  51. return "", nil
  52. }
  53. return "", err
  54. }
  55. if len(result) > 0 {
  56. return result[:len(result)-1], nil
  57. }
  58. return "", nil
  59. }
  60. // sanitizeOutput sanitizes output of a command, replacing occurrences of the
  61. // repository's remote address with a sanitized version.
  62. func sanitizeOutput(output, repoPath string) (string, error) {
  63. remoteAddr, err := remoteAddress(repoPath)
  64. if err != nil {
  65. // if we're unable to load the remote address, then we're unable to
  66. // sanitize.
  67. return "", err
  68. }
  69. return util.SanitizeMessage(output, remoteAddr), nil
  70. }
  71. // AddressNoCredentials returns mirror address from Git repository config without credentials.
  72. func AddressNoCredentials(m *models.Mirror) string {
  73. readAddress(m)
  74. u, err := url.Parse(m.Address)
  75. if err != nil {
  76. // this shouldn't happen but just return it unsanitised
  77. return m.Address
  78. }
  79. u.User = nil
  80. return u.String()
  81. }
  82. // SaveAddress writes new address to Git repository config.
  83. func SaveAddress(m *models.Mirror, addr string) error {
  84. repoPath := m.Repo.RepoPath()
  85. // Remove old origin
  86. _, err := git.NewCommand("remote", "rm", "origin").RunInDir(repoPath)
  87. if err != nil && !strings.HasPrefix(err.Error(), "exit status 128 - fatal: No such remote ") {
  88. return err
  89. }
  90. _, err = git.NewCommand("remote", "add", "origin", "--mirror=fetch", addr).RunInDir(repoPath)
  91. return err
  92. }
  93. // gitShortEmptySha Git short empty SHA
  94. const gitShortEmptySha = "0000000"
  95. // mirrorSyncResult contains information of a updated reference.
  96. // If the oldCommitID is "0000000", it means a new reference, the value of newCommitID is empty.
  97. // If the newCommitID is "0000000", it means the reference is deleted, the value of oldCommitID is empty.
  98. type mirrorSyncResult struct {
  99. refName string
  100. oldCommitID string
  101. newCommitID string
  102. }
  103. // parseRemoteUpdateOutput detects create, update and delete operations of references from upstream.
  104. func parseRemoteUpdateOutput(output string) []*mirrorSyncResult {
  105. results := make([]*mirrorSyncResult, 0, 3)
  106. lines := strings.Split(output, "\n")
  107. for i := range lines {
  108. // Make sure reference name is presented before continue
  109. idx := strings.Index(lines[i], "-> ")
  110. if idx == -1 {
  111. continue
  112. }
  113. refName := lines[i][idx+3:]
  114. switch {
  115. case strings.HasPrefix(lines[i], " * "): // New reference
  116. results = append(results, &mirrorSyncResult{
  117. refName: refName,
  118. oldCommitID: gitShortEmptySha,
  119. })
  120. case strings.HasPrefix(lines[i], " - "): // Delete reference
  121. results = append(results, &mirrorSyncResult{
  122. refName: refName,
  123. newCommitID: gitShortEmptySha,
  124. })
  125. case strings.HasPrefix(lines[i], " "): // New commits of a reference
  126. delimIdx := strings.Index(lines[i][3:], " ")
  127. if delimIdx == -1 {
  128. log.Error("SHA delimiter not found: %q", lines[i])
  129. continue
  130. }
  131. shas := strings.Split(lines[i][3:delimIdx+3], "..")
  132. if len(shas) != 2 {
  133. log.Error("Expect two SHAs but not what found: %q", lines[i])
  134. continue
  135. }
  136. results = append(results, &mirrorSyncResult{
  137. refName: refName,
  138. oldCommitID: shas[0],
  139. newCommitID: shas[1],
  140. })
  141. default:
  142. log.Warn("parseRemoteUpdateOutput: unexpected update line %q", lines[i])
  143. }
  144. }
  145. return results
  146. }
  147. // runSync returns true if sync finished without error.
  148. func runSync(m *models.Mirror) ([]*mirrorSyncResult, bool) {
  149. repoPath := m.Repo.RepoPath()
  150. wikiPath := m.Repo.WikiPath()
  151. timeout := time.Duration(setting.Git.Timeout.Mirror) * time.Second
  152. gitArgs := []string{"remote", "update"}
  153. if m.EnablePrune {
  154. gitArgs = append(gitArgs, "--prune")
  155. }
  156. stdoutBuilder := strings.Builder{}
  157. stderrBuilder := strings.Builder{}
  158. if err := git.NewCommand(gitArgs...).
  159. SetDescription(fmt.Sprintf("Mirror.runSync: %s", m.Repo.FullName())).
  160. RunInDirTimeoutPipeline(timeout, repoPath, &stdoutBuilder, &stderrBuilder); err != nil {
  161. stdout := stdoutBuilder.String()
  162. stderr := stderrBuilder.String()
  163. // sanitize the output, since it may contain the remote address, which may
  164. // contain a password
  165. stderrMessage, sanitizeErr := sanitizeOutput(stderr, repoPath)
  166. if sanitizeErr != nil {
  167. log.Error("sanitizeOutput failed on stderr: %v", sanitizeErr)
  168. log.Error("Failed to update mirror repository %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdout, stderr, err)
  169. return nil, false
  170. }
  171. stdoutMessage, err := sanitizeOutput(stdout, repoPath)
  172. if err != nil {
  173. log.Error("sanitizeOutput failed: %v", sanitizeErr)
  174. log.Error("Failed to update mirror repository %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdout, stderrMessage, err)
  175. return nil, false
  176. }
  177. log.Error("Failed to update mirror repository %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdoutMessage, stderrMessage, err)
  178. desc := fmt.Sprintf("Failed to update mirror repository '%s': %s", repoPath, stderrMessage)
  179. if err = models.CreateRepositoryNotice(desc); err != nil {
  180. log.Error("CreateRepositoryNotice: %v", err)
  181. }
  182. return nil, false
  183. }
  184. output := stderrBuilder.String()
  185. gitRepo, err := git.OpenRepository(repoPath)
  186. if err != nil {
  187. log.Error("OpenRepository: %v", err)
  188. return nil, false
  189. }
  190. if err = repository.SyncReleasesWithTags(m.Repo, gitRepo); err != nil {
  191. gitRepo.Close()
  192. log.Error("Failed to synchronize tags to releases for repository: %v", err)
  193. }
  194. gitRepo.Close()
  195. if err := m.Repo.UpdateSize(); err != nil {
  196. log.Error("Failed to update size for mirror repository: %v", err)
  197. }
  198. if m.Repo.HasWiki() {
  199. stderrBuilder.Reset()
  200. stdoutBuilder.Reset()
  201. if err := git.NewCommand("remote", "update", "--prune").
  202. SetDescription(fmt.Sprintf("Mirror.runSync Wiki: %s ", m.Repo.FullName())).
  203. RunInDirTimeoutPipeline(timeout, wikiPath, &stdoutBuilder, &stderrBuilder); err != nil {
  204. stdout := stdoutBuilder.String()
  205. stderr := stderrBuilder.String()
  206. // sanitize the output, since it may contain the remote address, which may
  207. // contain a password
  208. stderrMessage, sanitizeErr := sanitizeOutput(stderr, repoPath)
  209. if sanitizeErr != nil {
  210. log.Error("sanitizeOutput failed on stderr: %v", sanitizeErr)
  211. log.Error("Failed to update mirror repository wiki %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdout, stderr, err)
  212. return nil, false
  213. }
  214. stdoutMessage, err := sanitizeOutput(stdout, repoPath)
  215. if err != nil {
  216. log.Error("sanitizeOutput failed: %v", sanitizeErr)
  217. log.Error("Failed to update mirror repository wiki %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdout, stderrMessage, err)
  218. return nil, false
  219. }
  220. log.Error("Failed to update mirror repository wiki %v:\nStdout: %s\nStderr: %s\nErr: %v", m.Repo, stdoutMessage, stderrMessage, err)
  221. desc := fmt.Sprintf("Failed to update mirror repository wiki '%s': %s", wikiPath, stderrMessage)
  222. if err = models.CreateRepositoryNotice(desc); err != nil {
  223. log.Error("CreateRepositoryNotice: %v", err)
  224. }
  225. return nil, false
  226. }
  227. }
  228. branches, err := m.Repo.GetBranches()
  229. if err != nil {
  230. log.Error("GetBranches: %v", err)
  231. return nil, false
  232. }
  233. for i := range branches {
  234. cache.Remove(m.Repo.GetCommitsCountCacheKey(branches[i].Name, true))
  235. }
  236. m.UpdatedUnix = timeutil.TimeStampNow()
  237. return parseRemoteUpdateOutput(output), true
  238. }
  239. // Address returns mirror address from Git repository config without credentials.
  240. func Address(m *models.Mirror) string {
  241. readAddress(m)
  242. return util.SanitizeURLCredentials(m.Address, false)
  243. }
  244. // Username returns the mirror address username
  245. func Username(m *models.Mirror) string {
  246. readAddress(m)
  247. u, err := url.Parse(m.Address)
  248. if err != nil {
  249. // this shouldn't happen but if it does return ""
  250. return ""
  251. }
  252. return u.User.Username()
  253. }
  254. // Password returns the mirror address password
  255. func Password(m *models.Mirror) string {
  256. readAddress(m)
  257. u, err := url.Parse(m.Address)
  258. if err != nil {
  259. // this shouldn't happen but if it does return ""
  260. return ""
  261. }
  262. password, _ := u.User.Password()
  263. return password
  264. }
  265. // Update checks and updates mirror repositories.
  266. func Update(ctx context.Context) {
  267. log.Trace("Doing: Update")
  268. if err := models.MirrorsIterate(func(idx int, bean interface{}) error {
  269. m := bean.(*models.Mirror)
  270. if m.Repo == nil {
  271. log.Error("Disconnected mirror repository found: %d", m.ID)
  272. return nil
  273. }
  274. select {
  275. case <-ctx.Done():
  276. return fmt.Errorf("Aborted due to shutdown")
  277. default:
  278. mirrorQueue.Add(m.RepoID)
  279. return nil
  280. }
  281. }); err != nil {
  282. log.Error("Update: %v", err)
  283. }
  284. }
  285. // SyncMirrors checks and syncs mirrors.
  286. // FIXME: graceful: this should be a persistable queue
  287. func SyncMirrors(ctx context.Context) {
  288. // Start listening on new sync requests.
  289. for {
  290. select {
  291. case <-ctx.Done():
  292. mirrorQueue.Close()
  293. return
  294. case repoID := <-mirrorQueue.Queue():
  295. syncMirror(repoID)
  296. }
  297. }
  298. }
  299. func syncMirror(repoID string) {
  300. log.Trace("SyncMirrors [repo_id: %v]", repoID)
  301. mirrorQueue.Remove(repoID)
  302. m, err := models.GetMirrorByRepoID(com.StrTo(repoID).MustInt64())
  303. if err != nil {
  304. log.Error("GetMirrorByRepoID [%s]: %v", repoID, err)
  305. return
  306. }
  307. results, ok := runSync(m)
  308. if !ok {
  309. return
  310. }
  311. m.ScheduleNextUpdate()
  312. if err = models.UpdateMirror(m); err != nil {
  313. log.Error("UpdateMirror [%s]: %v", repoID, err)
  314. return
  315. }
  316. var gitRepo *git.Repository
  317. if len(results) == 0 {
  318. log.Trace("SyncMirrors [repo_id: %d]: no commits fetched", m.RepoID)
  319. } else {
  320. gitRepo, err = git.OpenRepository(m.Repo.RepoPath())
  321. if err != nil {
  322. log.Error("OpenRepository [%d]: %v", m.RepoID, err)
  323. return
  324. }
  325. defer gitRepo.Close()
  326. }
  327. for _, result := range results {
  328. // Discard GitHub pull requests, i.e. refs/pull/*
  329. if strings.HasPrefix(result.refName, "refs/pull/") {
  330. continue
  331. }
  332. tp, _ := git.SplitRefName(result.refName)
  333. // Create reference
  334. if result.oldCommitID == gitShortEmptySha {
  335. notification.NotifySyncCreateRef(m.Repo.MustOwner(), m.Repo, tp, result.refName)
  336. continue
  337. }
  338. // Delete reference
  339. if result.newCommitID == gitShortEmptySha {
  340. notification.NotifySyncDeleteRef(m.Repo.MustOwner(), m.Repo, tp, result.refName)
  341. continue
  342. }
  343. // Push commits
  344. oldCommitID, err := git.GetFullCommitID(gitRepo.Path, result.oldCommitID)
  345. if err != nil {
  346. log.Error("GetFullCommitID [%d]: %v", m.RepoID, err)
  347. continue
  348. }
  349. newCommitID, err := git.GetFullCommitID(gitRepo.Path, result.newCommitID)
  350. if err != nil {
  351. log.Error("GetFullCommitID [%d]: %v", m.RepoID, err)
  352. continue
  353. }
  354. commits, err := gitRepo.CommitsBetweenIDs(newCommitID, oldCommitID)
  355. if err != nil {
  356. log.Error("CommitsBetweenIDs [repo_id: %d, new_commit_id: %s, old_commit_id: %s]: %v", m.RepoID, newCommitID, oldCommitID, err)
  357. continue
  358. }
  359. theCommits := repository.ListToPushCommits(commits)
  360. if len(theCommits.Commits) > setting.UI.FeedMaxCommitNum {
  361. theCommits.Commits = theCommits.Commits[:setting.UI.FeedMaxCommitNum]
  362. }
  363. theCommits.CompareURL = m.Repo.ComposeCompareURL(oldCommitID, newCommitID)
  364. notification.NotifySyncPushCommits(m.Repo.MustOwner(), m.Repo, result.refName, oldCommitID, newCommitID, theCommits)
  365. }
  366. // Get latest commit date and update to current repository updated time
  367. commitDate, err := git.GetLatestCommitTime(m.Repo.RepoPath())
  368. if err != nil {
  369. log.Error("GetLatestCommitDate [%d]: %v", m.RepoID, err)
  370. return
  371. }
  372. if err = models.UpdateRepositoryUpdatedTime(m.RepoID, commitDate); err != nil {
  373. log.Error("Update repository 'updated_unix' [%d]: %v", m.RepoID, err)
  374. return
  375. }
  376. }
  377. // InitSyncMirrors initializes a go routine to sync the mirrors
  378. func InitSyncMirrors() {
  379. go graceful.GetManager().RunWithShutdownContext(SyncMirrors)
  380. }
  381. // StartToMirror adds repoID to mirror queue
  382. func StartToMirror(repoID int64) {
  383. go mirrorQueue.Add(repoID)
  384. }