You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

650 lines
16 KiB

  1. // Copyright 2015 PingCAP, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package meta
  14. import (
  15. "encoding/binary"
  16. "encoding/json"
  17. "fmt"
  18. "strconv"
  19. "strings"
  20. "sync"
  21. "time"
  22. "github.com/juju/errors"
  23. "github.com/pingcap/tidb/kv"
  24. "github.com/pingcap/tidb/model"
  25. "github.com/pingcap/tidb/structure"
  26. )
  27. var (
  28. globalIDMutex sync.Mutex
  29. )
  30. // Meta structure:
  31. // NextGlobalID -> int64
  32. // SchemaVersion -> int64
  33. // DBs -> {
  34. // DB:1 -> db meta data []byte
  35. // DB:2 -> db meta data []byte
  36. // }
  37. // DB:1 -> {
  38. // Table:1 -> table meta data []byte
  39. // Table:2 -> table meta data []byte
  40. // TID:1 -> int64
  41. // TID:2 -> int64
  42. // }
  43. //
  44. var (
  45. mNextGlobalIDKey = []byte("NextGlobalID")
  46. mSchemaVersionKey = []byte("SchemaVersionKey")
  47. mDBs = []byte("DBs")
  48. mDBPrefix = "DB"
  49. mTablePrefix = "Table"
  50. mTableIDPrefix = "TID"
  51. mBootstrapKey = []byte("BootstrapKey")
  52. )
  53. var (
  54. // ErrDBExists is the error for db exists.
  55. ErrDBExists = errors.New("database already exists")
  56. // ErrDBNotExists is the error for db not exists.
  57. ErrDBNotExists = errors.New("database doesn't exist")
  58. // ErrTableExists is the error for table exists.
  59. ErrTableExists = errors.New("table already exists")
  60. // ErrTableNotExists is the error for table not exists.
  61. ErrTableNotExists = errors.New("table doesn't exist")
  62. )
  63. // Meta is for handling meta information in a transaction.
  64. type Meta struct {
  65. txn *structure.TxStructure
  66. }
  67. // NewMeta creates a Meta in transaction txn.
  68. func NewMeta(txn kv.Transaction) *Meta {
  69. t := structure.NewStructure(txn, []byte{'m'})
  70. return &Meta{txn: t}
  71. }
  72. // GenGlobalID generates next id globally.
  73. func (m *Meta) GenGlobalID() (int64, error) {
  74. globalIDMutex.Lock()
  75. defer globalIDMutex.Unlock()
  76. return m.txn.Inc(mNextGlobalIDKey, 1)
  77. }
  78. // GetGlobalID gets current global id.
  79. func (m *Meta) GetGlobalID() (int64, error) {
  80. return m.txn.GetInt64(mNextGlobalIDKey)
  81. }
  82. func (m *Meta) dbKey(dbID int64) []byte {
  83. return []byte(fmt.Sprintf("%s:%d", mDBPrefix, dbID))
  84. }
  85. func (m *Meta) parseDatabaseID(key string) (int64, error) {
  86. seps := strings.Split(key, ":")
  87. if len(seps) != 2 {
  88. return 0, errors.Errorf("invalid db key %s", key)
  89. }
  90. n, err := strconv.ParseInt(seps[1], 10, 64)
  91. return n, errors.Trace(err)
  92. }
  93. func (m *Meta) autoTalbeIDKey(tableID int64) []byte {
  94. return []byte(fmt.Sprintf("%s:%d", mTableIDPrefix, tableID))
  95. }
  96. func (m *Meta) tableKey(tableID int64) []byte {
  97. return []byte(fmt.Sprintf("%s:%d", mTablePrefix, tableID))
  98. }
  99. func (m *Meta) parseTableID(key string) (int64, error) {
  100. seps := strings.Split(key, ":")
  101. if len(seps) != 2 {
  102. return 0, errors.Errorf("invalid table meta key %s", key)
  103. }
  104. n, err := strconv.ParseInt(seps[1], 10, 64)
  105. return n, errors.Trace(err)
  106. }
  107. // GenAutoTableID adds step to the auto id of the table and returns the sum.
  108. func (m *Meta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) {
  109. // Check if db exists.
  110. dbKey := m.dbKey(dbID)
  111. if err := m.checkDBExists(dbKey); err != nil {
  112. return 0, errors.Trace(err)
  113. }
  114. // Check if table exists.
  115. tableKey := m.tableKey(tableID)
  116. if err := m.checkTableExists(dbKey, tableKey); err != nil {
  117. return 0, errors.Trace(err)
  118. }
  119. return m.txn.HInc(dbKey, m.autoTalbeIDKey(tableID), step)
  120. }
  121. // GetAutoTableID gets current auto id with table id.
  122. func (m *Meta) GetAutoTableID(dbID int64, tableID int64) (int64, error) {
  123. return m.txn.HGetInt64(m.dbKey(dbID), m.autoTalbeIDKey(tableID))
  124. }
  125. // GetSchemaVersion gets current global schema version.
  126. func (m *Meta) GetSchemaVersion() (int64, error) {
  127. return m.txn.GetInt64(mSchemaVersionKey)
  128. }
  129. // GenSchemaVersion generates next schema version.
  130. func (m *Meta) GenSchemaVersion() (int64, error) {
  131. return m.txn.Inc(mSchemaVersionKey, 1)
  132. }
  133. func (m *Meta) checkDBExists(dbKey []byte) error {
  134. v, err := m.txn.HGet(mDBs, dbKey)
  135. if err != nil {
  136. return errors.Trace(err)
  137. } else if v == nil {
  138. return ErrDBNotExists
  139. }
  140. return nil
  141. }
  142. func (m *Meta) checkDBNotExists(dbKey []byte) error {
  143. v, err := m.txn.HGet(mDBs, dbKey)
  144. if err != nil {
  145. return errors.Trace(err)
  146. }
  147. if v != nil {
  148. return ErrDBExists
  149. }
  150. return nil
  151. }
  152. func (m *Meta) checkTableExists(dbKey []byte, tableKey []byte) error {
  153. v, err := m.txn.HGet(dbKey, tableKey)
  154. if err != nil {
  155. return errors.Trace(err)
  156. }
  157. if v == nil {
  158. return ErrTableNotExists
  159. }
  160. return nil
  161. }
  162. func (m *Meta) checkTableNotExists(dbKey []byte, tableKey []byte) error {
  163. v, err := m.txn.HGet(dbKey, tableKey)
  164. if err != nil {
  165. return errors.Trace(err)
  166. }
  167. if v != nil {
  168. return ErrTableExists
  169. }
  170. return nil
  171. }
  172. // CreateDatabase creates a database with db info.
  173. func (m *Meta) CreateDatabase(dbInfo *model.DBInfo) error {
  174. dbKey := m.dbKey(dbInfo.ID)
  175. if err := m.checkDBNotExists(dbKey); err != nil {
  176. return errors.Trace(err)
  177. }
  178. data, err := json.Marshal(dbInfo)
  179. if err != nil {
  180. return errors.Trace(err)
  181. }
  182. return m.txn.HSet(mDBs, dbKey, data)
  183. }
  184. // UpdateDatabase updates a database with db info.
  185. func (m *Meta) UpdateDatabase(dbInfo *model.DBInfo) error {
  186. dbKey := m.dbKey(dbInfo.ID)
  187. if err := m.checkDBExists(dbKey); err != nil {
  188. return errors.Trace(err)
  189. }
  190. data, err := json.Marshal(dbInfo)
  191. if err != nil {
  192. return errors.Trace(err)
  193. }
  194. return m.txn.HSet(mDBs, dbKey, data)
  195. }
  196. // CreateTable creates a table with tableInfo in database.
  197. func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {
  198. // Check if db exists.
  199. dbKey := m.dbKey(dbID)
  200. if err := m.checkDBExists(dbKey); err != nil {
  201. return errors.Trace(err)
  202. }
  203. // Check if table exists.
  204. tableKey := m.tableKey(tableInfo.ID)
  205. if err := m.checkTableNotExists(dbKey, tableKey); err != nil {
  206. return errors.Trace(err)
  207. }
  208. data, err := json.Marshal(tableInfo)
  209. if err != nil {
  210. return errors.Trace(err)
  211. }
  212. return m.txn.HSet(dbKey, tableKey, data)
  213. }
  214. // DropDatabase drops whole database.
  215. func (m *Meta) DropDatabase(dbID int64) error {
  216. // Check if db exists.
  217. dbKey := m.dbKey(dbID)
  218. if err := m.txn.HClear(dbKey); err != nil {
  219. return errors.Trace(err)
  220. }
  221. if err := m.txn.HDel(mDBs, dbKey); err != nil {
  222. return errors.Trace(err)
  223. }
  224. return nil
  225. }
  226. // DropTable drops table in database.
  227. func (m *Meta) DropTable(dbID int64, tableID int64) error {
  228. // Check if db exists.
  229. dbKey := m.dbKey(dbID)
  230. if err := m.checkDBExists(dbKey); err != nil {
  231. return errors.Trace(err)
  232. }
  233. // Check if table exists.
  234. tableKey := m.tableKey(tableID)
  235. if err := m.checkTableExists(dbKey, tableKey); err != nil {
  236. return errors.Trace(err)
  237. }
  238. if err := m.txn.HDel(dbKey, tableKey); err != nil {
  239. return errors.Trace(err)
  240. }
  241. if err := m.txn.HDel(dbKey, m.autoTalbeIDKey(tableID)); err != nil {
  242. return errors.Trace(err)
  243. }
  244. return nil
  245. }
  246. // UpdateTable updates the table with table info.
  247. func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
  248. // Check if db exists.
  249. dbKey := m.dbKey(dbID)
  250. if err := m.checkDBExists(dbKey); err != nil {
  251. return errors.Trace(err)
  252. }
  253. // Check if table exists.
  254. tableKey := m.tableKey(tableInfo.ID)
  255. if err := m.checkTableExists(dbKey, tableKey); err != nil {
  256. return errors.Trace(err)
  257. }
  258. data, err := json.Marshal(tableInfo)
  259. if err != nil {
  260. return errors.Trace(err)
  261. }
  262. err = m.txn.HSet(dbKey, tableKey, data)
  263. return errors.Trace(err)
  264. }
  265. // ListTables shows all tables in database.
  266. func (m *Meta) ListTables(dbID int64) ([]*model.TableInfo, error) {
  267. dbKey := m.dbKey(dbID)
  268. if err := m.checkDBExists(dbKey); err != nil {
  269. return nil, errors.Trace(err)
  270. }
  271. res, err := m.txn.HGetAll(dbKey)
  272. if err != nil {
  273. return nil, errors.Trace(err)
  274. }
  275. tables := make([]*model.TableInfo, 0, len(res)/2)
  276. for _, r := range res {
  277. // only handle table meta
  278. tableKey := string(r.Field)
  279. if !strings.HasPrefix(tableKey, mTablePrefix) {
  280. continue
  281. }
  282. tbInfo := &model.TableInfo{}
  283. err = json.Unmarshal(r.Value, tbInfo)
  284. if err != nil {
  285. return nil, errors.Trace(err)
  286. }
  287. tables = append(tables, tbInfo)
  288. }
  289. return tables, nil
  290. }
  291. // ListDatabases shows all databases.
  292. func (m *Meta) ListDatabases() ([]*model.DBInfo, error) {
  293. res, err := m.txn.HGetAll(mDBs)
  294. if err != nil {
  295. return nil, errors.Trace(err)
  296. }
  297. dbs := make([]*model.DBInfo, 0, len(res))
  298. for _, r := range res {
  299. dbInfo := &model.DBInfo{}
  300. err = json.Unmarshal(r.Value, dbInfo)
  301. if err != nil {
  302. return nil, errors.Trace(err)
  303. }
  304. dbs = append(dbs, dbInfo)
  305. }
  306. return dbs, nil
  307. }
  308. // GetDatabase gets the database value with ID.
  309. func (m *Meta) GetDatabase(dbID int64) (*model.DBInfo, error) {
  310. dbKey := m.dbKey(dbID)
  311. value, err := m.txn.HGet(mDBs, dbKey)
  312. if err != nil || value == nil {
  313. return nil, errors.Trace(err)
  314. }
  315. dbInfo := &model.DBInfo{}
  316. err = json.Unmarshal(value, dbInfo)
  317. return dbInfo, errors.Trace(err)
  318. }
  319. // GetTable gets the table value in database with tableID.
  320. func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
  321. // Check if db exists.
  322. dbKey := m.dbKey(dbID)
  323. if err := m.checkDBExists(dbKey); err != nil {
  324. return nil, errors.Trace(err)
  325. }
  326. tableKey := m.tableKey(tableID)
  327. value, err := m.txn.HGet(dbKey, tableKey)
  328. if err != nil || value == nil {
  329. return nil, errors.Trace(err)
  330. }
  331. tableInfo := &model.TableInfo{}
  332. err = json.Unmarshal(value, tableInfo)
  333. return tableInfo, errors.Trace(err)
  334. }
  335. // DDL job structure
  336. // DDLOnwer: []byte
  337. // DDLJobList: list jobs
  338. // DDLJobHistory: hash
  339. // DDLJobReorg: hash
  340. //
  341. // for multi DDL workers, only one can become the owner
  342. // to operate DDL jobs, and dispatch them to MR Jobs.
  343. var (
  344. mDDLJobOwnerKey = []byte("DDLJobOwner")
  345. mDDLJobListKey = []byte("DDLJobList")
  346. mDDLJobHistoryKey = []byte("DDLJobHistory")
  347. mDDLJobReorgKey = []byte("DDLJobReorg")
  348. )
  349. func (m *Meta) getJobOwner(key []byte) (*model.Owner, error) {
  350. value, err := m.txn.Get(key)
  351. if err != nil || value == nil {
  352. return nil, errors.Trace(err)
  353. }
  354. owner := &model.Owner{}
  355. err = json.Unmarshal(value, owner)
  356. return owner, errors.Trace(err)
  357. }
  358. // GetDDLJobOwner gets the current owner for DDL.
  359. func (m *Meta) GetDDLJobOwner() (*model.Owner, error) {
  360. return m.getJobOwner(mDDLJobOwnerKey)
  361. }
  362. func (m *Meta) setJobOwner(key []byte, o *model.Owner) error {
  363. b, err := json.Marshal(o)
  364. if err != nil {
  365. return errors.Trace(err)
  366. }
  367. return m.txn.Set(key, b)
  368. }
  369. // SetDDLJobOwner sets the current owner for DDL.
  370. func (m *Meta) SetDDLJobOwner(o *model.Owner) error {
  371. return m.setJobOwner(mDDLJobOwnerKey, o)
  372. }
  373. func (m *Meta) enQueueDDLJob(key []byte, job *model.Job) error {
  374. b, err := job.Encode()
  375. if err != nil {
  376. return errors.Trace(err)
  377. }
  378. return m.txn.RPush(key, b)
  379. }
  380. // EnQueueDDLJob adds a DDL job to the list.
  381. func (m *Meta) EnQueueDDLJob(job *model.Job) error {
  382. return m.enQueueDDLJob(mDDLJobListKey, job)
  383. }
  384. func (m *Meta) deQueueDDLJob(key []byte) (*model.Job, error) {
  385. value, err := m.txn.LPop(key)
  386. if err != nil || value == nil {
  387. return nil, errors.Trace(err)
  388. }
  389. job := &model.Job{}
  390. err = job.Decode(value)
  391. return job, errors.Trace(err)
  392. }
  393. // DeQueueDDLJob pops a DDL job from the list.
  394. func (m *Meta) DeQueueDDLJob() (*model.Job, error) {
  395. return m.deQueueDDLJob(mDDLJobListKey)
  396. }
  397. func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {
  398. value, err := m.txn.LIndex(key, index)
  399. if err != nil || value == nil {
  400. return nil, errors.Trace(err)
  401. }
  402. job := &model.Job{}
  403. err = job.Decode(value)
  404. return job, errors.Trace(err)
  405. }
  406. // GetDDLJob returns the DDL job with index.
  407. func (m *Meta) GetDDLJob(index int64) (*model.Job, error) {
  408. job, err := m.getDDLJob(mDDLJobListKey, index)
  409. return job, errors.Trace(err)
  410. }
  411. func (m *Meta) updateDDLJob(index int64, job *model.Job, key []byte) error {
  412. // TODO: use timestamp allocated by TSO
  413. job.LastUpdateTS = time.Now().UnixNano()
  414. b, err := job.Encode()
  415. if err != nil {
  416. return errors.Trace(err)
  417. }
  418. return m.txn.LSet(key, index, b)
  419. }
  420. // UpdateDDLJob updates the DDL job with index.
  421. func (m *Meta) UpdateDDLJob(index int64, job *model.Job) error {
  422. return m.updateDDLJob(index, job, mDDLJobListKey)
  423. }
  424. // DDLJobQueueLen returns the DDL job queue length.
  425. func (m *Meta) DDLJobQueueLen() (int64, error) {
  426. return m.txn.LLen(mDDLJobListKey)
  427. }
  428. func (m *Meta) jobIDKey(id int64) []byte {
  429. b := make([]byte, 8)
  430. binary.BigEndian.PutUint64(b, uint64(id))
  431. return b
  432. }
  433. func (m *Meta) addHistoryDDLJob(key []byte, job *model.Job) error {
  434. b, err := job.Encode()
  435. if err != nil {
  436. return errors.Trace(err)
  437. }
  438. return m.txn.HSet(key, m.jobIDKey(job.ID), b)
  439. }
  440. // AddHistoryDDLJob adds DDL job to history.
  441. func (m *Meta) AddHistoryDDLJob(job *model.Job) error {
  442. return m.addHistoryDDLJob(mDDLJobHistoryKey, job)
  443. }
  444. func (m *Meta) getHistoryDDLJob(key []byte, id int64) (*model.Job, error) {
  445. value, err := m.txn.HGet(key, m.jobIDKey(id))
  446. if err != nil || value == nil {
  447. return nil, errors.Trace(err)
  448. }
  449. job := &model.Job{}
  450. err = job.Decode(value)
  451. return job, errors.Trace(err)
  452. }
  453. // GetHistoryDDLJob gets a history DDL job.
  454. func (m *Meta) GetHistoryDDLJob(id int64) (*model.Job, error) {
  455. return m.getHistoryDDLJob(mDDLJobHistoryKey, id)
  456. }
  457. // IsBootstrapped returns whether we have already run bootstrap or not.
  458. // return true means we don't need doing any other bootstrap.
  459. func (m *Meta) IsBootstrapped() (bool, error) {
  460. value, err := m.txn.GetInt64(mBootstrapKey)
  461. if err != nil {
  462. return false, errors.Trace(err)
  463. }
  464. return value == 1, nil
  465. }
  466. // FinishBootstrap finishes bootstrap.
  467. func (m *Meta) FinishBootstrap() error {
  468. err := m.txn.Set(mBootstrapKey, []byte("1"))
  469. return errors.Trace(err)
  470. }
  471. // UpdateDDLReorgHandle saves the job reorganization latest processed handle for later resuming.
  472. func (m *Meta) UpdateDDLReorgHandle(job *model.Job, handle int64) error {
  473. err := m.txn.HSet(mDDLJobReorgKey, m.jobIDKey(job.ID), []byte(strconv.FormatInt(handle, 10)))
  474. return errors.Trace(err)
  475. }
  476. // RemoveDDLReorgHandle removes the job reorganization handle.
  477. func (m *Meta) RemoveDDLReorgHandle(job *model.Job) error {
  478. err := m.txn.HDel(mDDLJobReorgKey, m.jobIDKey(job.ID))
  479. return errors.Trace(err)
  480. }
  481. // GetDDLReorgHandle gets the latest processed handle.
  482. func (m *Meta) GetDDLReorgHandle(job *model.Job) (int64, error) {
  483. value, err := m.txn.HGetInt64(mDDLJobReorgKey, m.jobIDKey(job.ID))
  484. return value, errors.Trace(err)
  485. }
  486. // DDL background job structure
  487. // BgJobOnwer: []byte
  488. // BgJobList: list jobs
  489. // BgJobHistory: hash
  490. // BgJobReorg: hash
  491. //
  492. // for multi background worker, only one can become the owner
  493. // to operate background job, and dispatch them to MR background job.
  494. var (
  495. mBgJobOwnerKey = []byte("BgJobOwner")
  496. mBgJobListKey = []byte("BgJobList")
  497. mBgJobHistoryKey = []byte("BgJobHistory")
  498. )
  499. // UpdateBgJob updates the background job with index.
  500. func (m *Meta) UpdateBgJob(index int64, job *model.Job) error {
  501. return m.updateDDLJob(index, job, mBgJobListKey)
  502. }
  503. // GetBgJob returns the background job with index.
  504. func (m *Meta) GetBgJob(index int64) (*model.Job, error) {
  505. job, err := m.getDDLJob(mBgJobListKey, index)
  506. return job, errors.Trace(err)
  507. }
  508. // EnQueueBgJob adds a background job to the list.
  509. func (m *Meta) EnQueueBgJob(job *model.Job) error {
  510. return m.enQueueDDLJob(mBgJobListKey, job)
  511. }
  512. // BgJobQueueLen returns the background job queue length.
  513. func (m *Meta) BgJobQueueLen() (int64, error) {
  514. return m.txn.LLen(mBgJobListKey)
  515. }
  516. // AddHistoryBgJob adds background job to history.
  517. func (m *Meta) AddHistoryBgJob(job *model.Job) error {
  518. return m.addHistoryDDLJob(mBgJobHistoryKey, job)
  519. }
  520. // GetHistoryBgJob gets a history background job.
  521. func (m *Meta) GetHistoryBgJob(id int64) (*model.Job, error) {
  522. return m.getHistoryDDLJob(mBgJobHistoryKey, id)
  523. }
  524. // DeQueueBgJob pops a background job from the list.
  525. func (m *Meta) DeQueueBgJob() (*model.Job, error) {
  526. return m.deQueueDDLJob(mBgJobListKey)
  527. }
  528. // GetBgJobOwner gets the current background job owner.
  529. func (m *Meta) GetBgJobOwner() (*model.Owner, error) {
  530. return m.getJobOwner(mBgJobOwnerKey)
  531. }
  532. // SetBgJobOwner sets the current background job owner.
  533. func (m *Meta) SetBgJobOwner(o *model.Owner) error {
  534. return m.setJobOwner(mBgJobOwnerKey, o)
  535. }