You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

430 lines
12 KiB

  1. // Copyright 2015 PingCAP, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package ddl
  14. import (
  15. "github.com/juju/errors"
  16. "github.com/ngaut/log"
  17. "github.com/pingcap/tidb/ast"
  18. "github.com/pingcap/tidb/column"
  19. "github.com/pingcap/tidb/kv"
  20. "github.com/pingcap/tidb/meta"
  21. "github.com/pingcap/tidb/model"
  22. "github.com/pingcap/tidb/table"
  23. "github.com/pingcap/tidb/table/tables"
  24. "github.com/pingcap/tidb/terror"
  25. )
  26. func (d *ddl) adjustColumnOffset(columns []*model.ColumnInfo, indices []*model.IndexInfo, offset int, added bool) {
  27. offsetChanged := make(map[int]int)
  28. if added {
  29. for i := offset + 1; i < len(columns); i++ {
  30. offsetChanged[columns[i].Offset] = i
  31. columns[i].Offset = i
  32. }
  33. columns[offset].Offset = offset
  34. } else {
  35. for i := offset + 1; i < len(columns); i++ {
  36. offsetChanged[columns[i].Offset] = i - 1
  37. columns[i].Offset = i - 1
  38. }
  39. columns[offset].Offset = len(columns) - 1
  40. }
  41. // TODO: index can't cover the add/remove column with offset now, we may check this later.
  42. // Update index column offset info.
  43. for _, idx := range indices {
  44. for _, col := range idx.Columns {
  45. newOffset, ok := offsetChanged[col.Offset]
  46. if ok {
  47. col.Offset = newOffset
  48. }
  49. }
  50. }
  51. }
  52. func (d *ddl) addColumn(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, int, error) {
  53. // Check column name duplicate.
  54. cols := tblInfo.Columns
  55. position := len(cols)
  56. // Get column position.
  57. if pos.Tp == ast.ColumnPositionFirst {
  58. position = 0
  59. } else if pos.Tp == ast.ColumnPositionAfter {
  60. c := findCol(cols, pos.RelativeColumn.Name.L)
  61. if c == nil {
  62. return nil, 0, errors.Errorf("No such column: %v", pos.RelativeColumn)
  63. }
  64. // Insert position is after the mentioned column.
  65. position = c.Offset + 1
  66. }
  67. colInfo.State = model.StateNone
  68. // To support add column asynchronous, we should mark its offset as the last column.
  69. // So that we can use origin column offset to get value from row.
  70. colInfo.Offset = len(cols)
  71. // Insert col into the right place of the column list.
  72. newCols := make([]*model.ColumnInfo, 0, len(cols)+1)
  73. newCols = append(newCols, cols[:position]...)
  74. newCols = append(newCols, colInfo)
  75. newCols = append(newCols, cols[position:]...)
  76. tblInfo.Columns = newCols
  77. return colInfo, position, nil
  78. }
  79. func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
  80. schemaID := job.SchemaID
  81. tblInfo, err := d.getTableInfo(t, job)
  82. if err != nil {
  83. return errors.Trace(err)
  84. }
  85. col := &model.ColumnInfo{}
  86. pos := &ast.ColumnPosition{}
  87. offset := 0
  88. err = job.DecodeArgs(col, pos, &offset)
  89. if err != nil {
  90. job.State = model.JobCancelled
  91. return errors.Trace(err)
  92. }
  93. columnInfo := findCol(tblInfo.Columns, col.Name.L)
  94. if columnInfo != nil {
  95. if columnInfo.State == model.StatePublic {
  96. // we already have a column with same column name
  97. job.State = model.JobCancelled
  98. return errors.Errorf("ADD COLUMN: column already exist %s", col.Name.L)
  99. }
  100. } else {
  101. columnInfo, offset, err = d.addColumn(tblInfo, col, pos)
  102. if err != nil {
  103. job.State = model.JobCancelled
  104. return errors.Trace(err)
  105. }
  106. // Set offset arg to job.
  107. if offset != 0 {
  108. job.Args = []interface{}{columnInfo, pos, offset}
  109. }
  110. }
  111. _, err = t.GenSchemaVersion()
  112. if err != nil {
  113. return errors.Trace(err)
  114. }
  115. switch columnInfo.State {
  116. case model.StateNone:
  117. // none -> delete only
  118. job.SchemaState = model.StateDeleteOnly
  119. columnInfo.State = model.StateDeleteOnly
  120. err = t.UpdateTable(schemaID, tblInfo)
  121. return errors.Trace(err)
  122. case model.StateDeleteOnly:
  123. // delete only -> write only
  124. job.SchemaState = model.StateWriteOnly
  125. columnInfo.State = model.StateWriteOnly
  126. err = t.UpdateTable(schemaID, tblInfo)
  127. return errors.Trace(err)
  128. case model.StateWriteOnly:
  129. // write only -> reorganization
  130. job.SchemaState = model.StateWriteReorganization
  131. columnInfo.State = model.StateWriteReorganization
  132. // initialize SnapshotVer to 0 for later reorganization check.
  133. job.SnapshotVer = 0
  134. err = t.UpdateTable(schemaID, tblInfo)
  135. return errors.Trace(err)
  136. case model.StateWriteReorganization:
  137. // reorganization -> public
  138. // get the current version for reorganization if we don't have
  139. reorgInfo, err := d.getReorgInfo(t, job)
  140. if err != nil || reorgInfo.first {
  141. // if we run reorg firstly, we should update the job snapshot version
  142. // and then run the reorg next time.
  143. return errors.Trace(err)
  144. }
  145. tbl, err := d.getTable(schemaID, tblInfo)
  146. if err != nil {
  147. return errors.Trace(err)
  148. }
  149. err = d.runReorgJob(func() error {
  150. return d.backfillColumn(tbl, columnInfo, reorgInfo)
  151. })
  152. if terror.ErrorEqual(err, errWaitReorgTimeout) {
  153. // if timeout, we should return, check for the owner and re-wait job done.
  154. return nil
  155. }
  156. if err != nil {
  157. return errors.Trace(err)
  158. }
  159. // Adjust column offset.
  160. d.adjustColumnOffset(tblInfo.Columns, tblInfo.Indices, offset, true)
  161. columnInfo.State = model.StatePublic
  162. if err = t.UpdateTable(schemaID, tblInfo); err != nil {
  163. return errors.Trace(err)
  164. }
  165. // finish this job
  166. job.SchemaState = model.StatePublic
  167. job.State = model.JobDone
  168. return nil
  169. default:
  170. return errors.Errorf("invalid column state %v", columnInfo.State)
  171. }
  172. }
  173. func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
  174. schemaID := job.SchemaID
  175. tblInfo, err := d.getTableInfo(t, job)
  176. if err != nil {
  177. return errors.Trace(err)
  178. }
  179. var colName model.CIStr
  180. err = job.DecodeArgs(&colName)
  181. if err != nil {
  182. job.State = model.JobCancelled
  183. return errors.Trace(err)
  184. }
  185. colInfo := findCol(tblInfo.Columns, colName.L)
  186. if colInfo == nil {
  187. job.State = model.JobCancelled
  188. return errors.Errorf("column %s doesn't exist", colName)
  189. }
  190. if len(tblInfo.Columns) == 1 {
  191. job.State = model.JobCancelled
  192. return errors.Errorf("can't drop only column %s in table %s", colName, tblInfo.Name)
  193. }
  194. // we don't support drop column with index covered now.
  195. // we must drop the index first, then drop the column.
  196. for _, indexInfo := range tblInfo.Indices {
  197. for _, col := range indexInfo.Columns {
  198. if col.Name.L == colName.L {
  199. job.State = model.JobCancelled
  200. return errors.Errorf("can't drop column %s with index %s covered now", colName, indexInfo.Name)
  201. }
  202. }
  203. }
  204. _, err = t.GenSchemaVersion()
  205. if err != nil {
  206. return errors.Trace(err)
  207. }
  208. switch colInfo.State {
  209. case model.StatePublic:
  210. // public -> write only
  211. job.SchemaState = model.StateWriteOnly
  212. colInfo.State = model.StateWriteOnly
  213. // set this column's offset to the last and reset all following columns' offset
  214. d.adjustColumnOffset(tblInfo.Columns, tblInfo.Indices, colInfo.Offset, false)
  215. err = t.UpdateTable(schemaID, tblInfo)
  216. return errors.Trace(err)
  217. case model.StateWriteOnly:
  218. // write only -> delete only
  219. job.SchemaState = model.StateDeleteOnly
  220. colInfo.State = model.StateDeleteOnly
  221. err = t.UpdateTable(schemaID, tblInfo)
  222. return errors.Trace(err)
  223. case model.StateDeleteOnly:
  224. // delete only -> reorganization
  225. job.SchemaState = model.StateDeleteReorganization
  226. colInfo.State = model.StateDeleteReorganization
  227. // initialize SnapshotVer to 0 for later reorganization check.
  228. job.SnapshotVer = 0
  229. err = t.UpdateTable(schemaID, tblInfo)
  230. return errors.Trace(err)
  231. case model.StateDeleteReorganization:
  232. // reorganization -> absent
  233. reorgInfo, err := d.getReorgInfo(t, job)
  234. if err != nil || reorgInfo.first {
  235. // if we run reorg firstly, we should update the job snapshot version
  236. // and then run the reorg next time.
  237. return errors.Trace(err)
  238. }
  239. tbl, err := d.getTable(schemaID, tblInfo)
  240. if err != nil {
  241. return errors.Trace(err)
  242. }
  243. err = d.runReorgJob(func() error {
  244. return d.dropTableColumn(tbl, colInfo, reorgInfo)
  245. })
  246. if terror.ErrorEqual(err, errWaitReorgTimeout) {
  247. // if timeout, we should return, check for the owner and re-wait job done.
  248. return nil
  249. }
  250. if err != nil {
  251. return errors.Trace(err)
  252. }
  253. // all reorganization jobs done, drop this column
  254. newColumns := make([]*model.ColumnInfo, 0, len(tblInfo.Columns))
  255. for _, col := range tblInfo.Columns {
  256. if col.Name.L != colName.L {
  257. newColumns = append(newColumns, col)
  258. }
  259. }
  260. tblInfo.Columns = newColumns
  261. if err = t.UpdateTable(schemaID, tblInfo); err != nil {
  262. return errors.Trace(err)
  263. }
  264. // finish this job
  265. job.SchemaState = model.StateNone
  266. job.State = model.JobDone
  267. return nil
  268. default:
  269. return errors.Errorf("invalid table state %v", tblInfo.State)
  270. }
  271. }
  272. // How to backfill column data in reorganization state?
  273. // 1. Generate a snapshot with special version.
  274. // 2. Traverse the snapshot, get every row in the table.
  275. // 3. For one row, if the row has been already deleted, skip to next row.
  276. // 4. If not deleted, check whether column data has existed, if existed, skip to next row.
  277. // 5. If column data doesn't exist, backfill the column with default value and then continue to handle next row.
  278. func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
  279. seekHandle := reorgInfo.Handle
  280. version := reorgInfo.SnapshotVer
  281. for {
  282. handles, err := d.getSnapshotRows(t, version, seekHandle)
  283. if err != nil {
  284. return errors.Trace(err)
  285. } else if len(handles) == 0 {
  286. return nil
  287. }
  288. seekHandle = handles[len(handles)-1] + 1
  289. err = d.backfillColumnData(t, columnInfo, handles, reorgInfo)
  290. if err != nil {
  291. return errors.Trace(err)
  292. }
  293. }
  294. }
  295. func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error {
  296. for _, handle := range handles {
  297. log.Info("[ddl] backfill column...", handle)
  298. err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
  299. if err := d.isReorgRunnable(txn); err != nil {
  300. return errors.Trace(err)
  301. }
  302. // First check if row exists.
  303. exist, err := checkRowExist(txn, t, handle)
  304. if err != nil {
  305. return errors.Trace(err)
  306. } else if !exist {
  307. // If row doesn't exist, skip it.
  308. return nil
  309. }
  310. backfillKey := t.RecordKey(handle, &column.Col{ColumnInfo: *columnInfo})
  311. backfillValue, err := txn.Get(backfillKey)
  312. if err != nil && !kv.IsErrNotFound(err) {
  313. return errors.Trace(err)
  314. }
  315. if backfillValue != nil {
  316. return nil
  317. }
  318. value, _, err := table.GetColDefaultValue(nil, columnInfo)
  319. if err != nil {
  320. return errors.Trace(err)
  321. }
  322. // must convert to the column field type.
  323. v, err := value.ConvertTo(&columnInfo.FieldType)
  324. if err != nil {
  325. return errors.Trace(err)
  326. }
  327. err = lockRow(txn, t, handle)
  328. if err != nil {
  329. return errors.Trace(err)
  330. }
  331. err = tables.SetColValue(txn, backfillKey, v)
  332. if err != nil {
  333. return errors.Trace(err)
  334. }
  335. return errors.Trace(reorgInfo.UpdateHandle(txn, handle))
  336. })
  337. if err != nil {
  338. return errors.Trace(err)
  339. }
  340. }
  341. return nil
  342. }
  343. func (d *ddl) dropTableColumn(t table.Table, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
  344. version := reorgInfo.SnapshotVer
  345. seekHandle := reorgInfo.Handle
  346. col := &column.Col{ColumnInfo: *colInfo}
  347. for {
  348. handles, err := d.getSnapshotRows(t, version, seekHandle)
  349. if err != nil {
  350. return errors.Trace(err)
  351. } else if len(handles) == 0 {
  352. return nil
  353. }
  354. seekHandle = handles[len(handles)-1] + 1
  355. err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
  356. if err1 := d.isReorgRunnable(txn); err1 != nil {
  357. return errors.Trace(err1)
  358. }
  359. var h int64
  360. for _, h = range handles {
  361. key := t.RecordKey(h, col)
  362. err1 := txn.Delete(key)
  363. if err1 != nil && !terror.ErrorEqual(err1, kv.ErrNotExist) {
  364. return errors.Trace(err1)
  365. }
  366. }
  367. return errors.Trace(reorgInfo.UpdateHandle(txn, h))
  368. })
  369. if err != nil {
  370. return errors.Trace(err)
  371. }
  372. }
  373. }