You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

796 lines
22 KiB

  1. package themis
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/juju/errors"
  7. "github.com/ngaut/log"
  8. "github.com/pingcap/go-hbase"
  9. "github.com/pingcap/go-themis/oracle"
  10. )
  // TxnConfig holds the tunables of a themis transaction.
  type TxnConfig struct {
  	// ConcurrentPrewriteAndCommit selects the per-region, goroutine-based
  	// prewrite/commit of secondary rows instead of the sequential path.
  	ConcurrentPrewriteAndCommit bool
  	// WaitSecondaryCommit makes the concurrent secondary commit wait for all
  	// of its goroutines before returning.
  	WaitSecondaryCommit bool
  	// TTLInMs is the transaction TTL in milliseconds.
  	TTLInMs uint64
  	// MaxRowsInOneTxn bounds the rows Commit accepts; more rows make Commit
  	// fail with ErrTooManyRows.
  	MaxRowsInOneTxn int

  	// Fault-injection switches, for debugging and testing only.
  	brokenPrewriteSecondaryTest            bool
  	brokenPrewriteSecondaryAndRollbackTest bool
  	brokenCommitPrimaryTest                bool
  	brokenCommitSecondaryTest              bool
  }

  // defaultTxnConf is the configuration used by NewTxn. Every field not listed
  // keeps its zero value, so all fault-injection switches are off.
  var defaultTxnConf = TxnConfig{
  	ConcurrentPrewriteAndCommit: true,
  	WaitSecondaryCommit:         false,
  	MaxRowsInOneTxn:             50000,
  	TTLInMs:                     5 * 1000, // default txn TTL: 5s
  }
  32. type themisTxn struct {
  33. client hbase.HBaseClient
  34. rpc *themisRPC
  35. lockCleaner LockManager
  36. oracle oracle.Oracle
  37. mutationCache *columnMutationCache
  38. startTs uint64
  39. commitTs uint64
  40. primaryRow *rowMutation
  41. primary *hbase.ColumnCoordinate
  42. secondaryRows []*rowMutation
  43. secondary []*hbase.ColumnCoordinate
  44. primaryRowOffset int
  45. singleRowTxn bool
  46. secondaryLockBytes []byte
  47. conf TxnConfig
  48. hooks *txnHook
  49. }
  50. var _ Txn = (*themisTxn)(nil)
  51. var (
  52. // ErrSimulated is used when maybe rollback occurs error too.
  53. ErrSimulated = errors.New("simulated error")
  54. maxCleanLockRetryCount = 30
  55. pauseTime = 300 * time.Millisecond
  56. )
  57. func NewTxn(c hbase.HBaseClient, oracle oracle.Oracle) (Txn, error) {
  58. return NewTxnWithConf(c, defaultTxnConf, oracle)
  59. }
  60. func NewTxnWithConf(c hbase.HBaseClient, conf TxnConfig, oracle oracle.Oracle) (Txn, error) {
  61. var err error
  62. txn := &themisTxn{
  63. client: c,
  64. mutationCache: newColumnMutationCache(),
  65. oracle: oracle,
  66. primaryRowOffset: -1,
  67. conf: conf,
  68. rpc: newThemisRPC(c, oracle, conf),
  69. hooks: newHook(),
  70. }
  71. txn.startTs, err = txn.oracle.GetTimestamp()
  72. if err != nil {
  73. return nil, errors.Trace(err)
  74. }
  75. txn.lockCleaner = newThemisLockManager(txn.rpc, c)
  76. return txn, nil
  77. }
  78. func (txn *themisTxn) setHook(hooks *txnHook) {
  79. txn.hooks = hooks
  80. }
  81. func (txn *themisTxn) Gets(tbl string, gets []*hbase.Get) ([]*hbase.ResultRow, error) {
  82. results, err := txn.rpc.themisBatchGet([]byte(tbl), gets, txn.startTs, false)
  83. if err != nil {
  84. return nil, errors.Trace(err)
  85. }
  86. var ret []*hbase.ResultRow
  87. hasLock := false
  88. for _, r := range results {
  89. // if this row is locked, try clean lock and get again
  90. if isLockResult(r) {
  91. hasLock = true
  92. err = txn.constructLockAndClean([]byte(tbl), r.SortedColumns)
  93. if err != nil {
  94. // TODO if it's a conflict error, it means this lock
  95. // isn't expired, maybe we can retry or return partial results.
  96. return nil, errors.Trace(err)
  97. }
  98. }
  99. // it's OK, because themisBatchGet doesn't return nil value.
  100. ret = append(ret, r)
  101. }
  102. if hasLock {
  103. // after we cleaned locks, try to get again.
  104. ret, err = txn.rpc.themisBatchGet([]byte(tbl), gets, txn.startTs, true)
  105. if err != nil {
  106. return nil, errors.Trace(err)
  107. }
  108. }
  109. return ret, nil
  110. }
  111. func (txn *themisTxn) Get(tbl string, g *hbase.Get) (*hbase.ResultRow, error) {
  112. r, err := txn.rpc.themisGet([]byte(tbl), g, txn.startTs, false)
  113. if err != nil {
  114. return nil, errors.Trace(err)
  115. }
  116. // contain locks, try to clean and get again
  117. if r != nil && isLockResult(r) {
  118. r, err = txn.tryToCleanLockAndGetAgain([]byte(tbl), g, r.SortedColumns)
  119. if err != nil {
  120. return nil, errors.Trace(err)
  121. }
  122. }
  123. return r, nil
  124. }
  125. func (txn *themisTxn) Put(tbl string, p *hbase.Put) {
  126. // add mutation to buffer
  127. for _, e := range getEntriesFromPut(p) {
  128. txn.mutationCache.addMutation([]byte(tbl), p.Row, e.Column, e.typ, e.value, false)
  129. }
  130. }
  131. func (txn *themisTxn) Delete(tbl string, p *hbase.Delete) error {
  132. entries, err := getEntriesFromDel(p)
  133. if err != nil {
  134. return errors.Trace(err)
  135. }
  136. for _, e := range entries {
  137. txn.mutationCache.addMutation([]byte(tbl), p.Row, e.Column, e.typ, e.value, false)
  138. }
  139. return nil
  140. }
  141. func (txn *themisTxn) Commit() error {
  142. if txn.mutationCache.getMutationCount() == 0 {
  143. return nil
  144. }
  145. if txn.mutationCache.getRowCount() > txn.conf.MaxRowsInOneTxn {
  146. return ErrTooManyRows
  147. }
  148. txn.selectPrimaryAndSecondaries()
  149. err := txn.prewritePrimary()
  150. if err != nil {
  151. // no need to check wrong region here, hbase client will retry when
  152. // occurs single row NotInRegion error.
  153. log.Error(errors.ErrorStack(err))
  154. // it's safe to retry, because this transaction is not committed.
  155. return ErrRetryable
  156. }
  157. err = txn.prewriteSecondary()
  158. if err != nil {
  159. if isWrongRegionErr(err) {
  160. log.Warn("region info outdated")
  161. // reset hbase client buffered region info
  162. txn.client.CleanAllRegionCache()
  163. }
  164. return ErrRetryable
  165. }
  166. txn.commitTs, err = txn.oracle.GetTimestamp()
  167. if err != nil {
  168. log.Error(errors.ErrorStack(err))
  169. return ErrRetryable
  170. }
  171. err = txn.commitPrimary()
  172. if err != nil {
  173. // commit primary error, rollback
  174. log.Error("commit primary row failed", txn.startTs, err)
  175. txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
  176. txn.rollbackSecondaryRow(len(txn.secondaryRows) - 1)
  177. return ErrRetryable
  178. }
  179. txn.commitSecondary()
  180. log.Debug("themis txn commit successfully", txn.startTs, txn.commitTs)
  181. return nil
  182. }
  183. func (txn *themisTxn) commitSecondary() {
  184. if bypass, _, _ := txn.hooks.beforeCommitSecondary(txn, nil); !bypass {
  185. return
  186. }
  187. if txn.conf.brokenCommitSecondaryTest {
  188. txn.brokenCommitSecondary()
  189. return
  190. }
  191. if txn.conf.ConcurrentPrewriteAndCommit {
  192. txn.batchCommitSecondary(txn.conf.WaitSecondaryCommit)
  193. } else {
  194. txn.commitSecondarySync()
  195. }
  196. }
  197. func (txn *themisTxn) commitSecondarySync() {
  198. for _, r := range txn.secondaryRows {
  199. err := txn.rpc.commitSecondaryRow(r.tbl, r.row, r.mutationList(false), txn.startTs, txn.commitTs)
  200. if err != nil {
  201. // fail of secondary commit will not stop the commits of next
  202. // secondaries
  203. log.Warning(err)
  204. }
  205. }
  206. }
  207. func (txn *themisTxn) batchCommitSecondary(wait bool) error {
  208. //will batch commit all rows in a region
  209. rsRowMap, err := txn.groupByRegion()
  210. if err != nil {
  211. return errors.Trace(err)
  212. }
  213. wg := sync.WaitGroup{}
  214. for _, regionRowMap := range rsRowMap {
  215. wg.Add(1)
  216. _, firstRowM := getFirstEntity(regionRowMap)
  217. go func(cli *themisRPC, tbl string, rMap map[string]*rowMutation, startTs, commitTs uint64) {
  218. defer wg.Done()
  219. err := cli.batchCommitSecondaryRows([]byte(tbl), rMap, startTs, commitTs)
  220. if err != nil {
  221. // fail of secondary commit will not stop the commits of next
  222. // secondaries
  223. if isWrongRegionErr(err) {
  224. txn.client.CleanAllRegionCache()
  225. log.Warn("region info outdated when committing secondary rows, don't panic")
  226. }
  227. }
  228. }(txn.rpc, string(firstRowM.tbl), regionRowMap, txn.startTs, txn.commitTs)
  229. }
  230. if wait {
  231. wg.Wait()
  232. }
  233. return nil
  234. }
  235. func (txn *themisTxn) groupByRegion() (map[string]map[string]*rowMutation, error) {
  236. rsRowMap := make(map[string]map[string]*rowMutation)
  237. for _, rm := range txn.secondaryRows {
  238. region, err := txn.client.LocateRegion(rm.tbl, rm.row, true)
  239. if err != nil {
  240. return nil, errors.Trace(err)
  241. }
  242. key := getBatchGroupKey(region, string(rm.tbl))
  243. if _, exists := rsRowMap[key]; !exists {
  244. rsRowMap[key] = map[string]*rowMutation{}
  245. }
  246. rsRowMap[key][string(rm.row)] = rm
  247. }
  248. return rsRowMap, nil
  249. }
  250. func (txn *themisTxn) commitPrimary() error {
  251. if txn.conf.brokenCommitPrimaryTest {
  252. return txn.brokenCommitPrimary()
  253. }
  254. return txn.rpc.commitRow(txn.primary.Table, txn.primary.Row,
  255. txn.primaryRow.mutationList(false),
  256. txn.startTs, txn.commitTs, txn.primaryRowOffset)
  257. }
  258. func (txn *themisTxn) selectPrimaryAndSecondaries() {
  259. txn.secondary = nil
  260. for tblName, rowMutations := range txn.mutationCache.mutations {
  261. for _, rowMutation := range rowMutations {
  262. row := rowMutation.row
  263. findPrimaryInRow := false
  264. for i, mutation := range rowMutation.mutationList(true) {
  265. colcord := hbase.NewColumnCoordinate([]byte(tblName), row, mutation.Family, mutation.Qual)
  266. // set the first column as primary if primary is not set by user
  267. if txn.primaryRowOffset == -1 &&
  268. (txn.primary == nil || txn.primary.Equal(colcord)) {
  269. txn.primary = colcord
  270. txn.primaryRowOffset = i
  271. txn.primaryRow = rowMutation
  272. findPrimaryInRow = true
  273. } else {
  274. txn.secondary = append(txn.secondary, colcord)
  275. }
  276. }
  277. if !findPrimaryInRow {
  278. txn.secondaryRows = append(txn.secondaryRows, rowMutation)
  279. }
  280. }
  281. }
  282. // hook for test
  283. if bypass, _, _ := txn.hooks.afterChoosePrimaryAndSecondary(txn, nil); !bypass {
  284. return
  285. }
  286. if len(txn.secondaryRows) == 0 {
  287. txn.singleRowTxn = true
  288. }
  289. // construct secondary lock
  290. secondaryLock := txn.constructSecondaryLock(hbase.TypePut)
  291. if secondaryLock != nil {
  292. txn.secondaryLockBytes = secondaryLock.Encode()
  293. } else {
  294. txn.secondaryLockBytes = nil
  295. }
  296. }
  297. func (txn *themisTxn) constructSecondaryLock(typ hbase.Type) *themisSecondaryLock {
  298. if txn.primaryRow.getSize() <= 1 && len(txn.secondaryRows) == 0 {
  299. return nil
  300. }
  301. l := newThemisSecondaryLock()
  302. l.primaryCoordinate = txn.primary
  303. l.ts = txn.startTs
  304. // TODO set client addr
  305. return l
  306. }
  307. func (txn *themisTxn) constructPrimaryLock() *themisPrimaryLock {
  308. l := newThemisPrimaryLock()
  309. l.typ = txn.primaryRow.getType(txn.primary.Column)
  310. l.ts = txn.startTs
  311. for _, c := range txn.secondary {
  312. l.addSecondary(c, txn.mutationCache.getMutation(c).typ)
  313. }
  314. return l
  315. }
  316. func (txn *themisTxn) constructLockAndClean(tbl []byte, lockKvs []*hbase.Kv) error {
  317. locks, err := getLocksFromResults([]byte(tbl), lockKvs, txn.rpc)
  318. if err != nil {
  319. return errors.Trace(err)
  320. }
  321. for _, lock := range locks {
  322. err := txn.cleanLockWithRetry(lock)
  323. if err != nil {
  324. return errors.Trace(err)
  325. }
  326. }
  327. return nil
  328. }
  329. func (txn *themisTxn) tryToCleanLockAndGetAgain(tbl []byte, g *hbase.Get, lockKvs []*hbase.Kv) (*hbase.ResultRow, error) {
  330. // try to clean locks
  331. err := txn.constructLockAndClean(tbl, lockKvs)
  332. if err != nil {
  333. return nil, errors.Trace(err)
  334. }
  335. // get again, ignore lock
  336. r, err := txn.rpc.themisGet([]byte(tbl), g, txn.startTs, true)
  337. if err != nil {
  338. return nil, errors.Trace(err)
  339. }
  340. return r, nil
  341. }
  342. func (txn *themisTxn) commitSecondaryAndCleanLock(lock *themisSecondaryLock, commitTs uint64) error {
  343. cc := lock.Coordinate()
  344. mutation := &columnMutation{
  345. Column: &cc.Column,
  346. mutationValuePair: &mutationValuePair{
  347. typ: lock.typ,
  348. },
  349. }
  350. err := txn.rpc.commitSecondaryRow(cc.Table, cc.Row,
  351. []*columnMutation{mutation}, lock.Timestamp(), commitTs)
  352. if err != nil {
  353. return errors.Trace(err)
  354. }
  355. return nil
  356. }
  357. func (txn *themisTxn) cleanLockWithRetry(lock Lock) error {
  358. for i := 0; i < maxCleanLockRetryCount; i++ {
  359. if exists, err := txn.lockCleaner.IsLockExists(lock.Coordinate(), 0, lock.Timestamp()); err != nil || !exists {
  360. return errors.Trace(err)
  361. }
  362. log.Warnf("lock exists txn: %v lock-txn: %v row: %q", txn.startTs, lock.Timestamp(), lock.Coordinate().Row)
  363. // try clean lock
  364. err := txn.tryToCleanLock(lock)
  365. if errorEqual(err, ErrLockNotExpired) {
  366. log.Warn("sleep a while, and retry clean lock", txn.startTs)
  367. // TODO(dongxu) use cleverer retry sleep time interval
  368. time.Sleep(pauseTime)
  369. continue
  370. } else if err != nil {
  371. return errors.Trace(err)
  372. }
  373. // lock cleaned successfully
  374. return nil
  375. }
  376. return ErrCleanLockFailed
  377. }
  378. func (txn *themisTxn) tryToCleanLock(lock Lock) error {
  379. // if it's secondary lock, first we'll check if its primary lock has been released.
  380. if lock.Role() == RoleSecondary {
  381. // get primary lock
  382. pl := lock.Primary()
  383. // check primary lock is exists
  384. exists, err := txn.lockCleaner.IsLockExists(pl.Coordinate(), 0, pl.Timestamp())
  385. if err != nil {
  386. return errors.Trace(err)
  387. }
  388. if !exists {
  389. // primary row is committed, commit this row
  390. cc := pl.Coordinate()
  391. commitTs, err := txn.lockCleaner.GetCommitTimestamp(cc, pl.Timestamp())
  392. if err != nil {
  393. return errors.Trace(err)
  394. }
  395. if commitTs > 0 {
  396. // if this transction has been committed
  397. log.Info("txn has been committed, ts:", commitTs, "prewriteTs:", pl.Timestamp())
  398. // commit secondary rows
  399. err := txn.commitSecondaryAndCleanLock(lock.(*themisSecondaryLock), commitTs)
  400. if err != nil {
  401. return errors.Trace(err)
  402. }
  403. return nil
  404. }
  405. }
  406. }
  407. expired, err := txn.rpc.checkAndSetLockIsExpired(lock)
  408. if err != nil {
  409. return errors.Trace(err)
  410. }
  411. // only clean expired lock
  412. if expired {
  413. // try to clean primary lock
  414. pl := lock.Primary()
  415. commitTs, cleanedLock, err := txn.lockCleaner.CleanLock(pl.Coordinate(), pl.Timestamp())
  416. if err != nil {
  417. return errors.Trace(err)
  418. }
  419. if cleanedLock != nil {
  420. pl = cleanedLock
  421. }
  422. log.Info("try clean secondary locks", pl.Timestamp())
  423. // clean secondary locks
  424. // erase lock and data if commitTs is 0; otherwise, commit it.
  425. for k, v := range pl.(*themisPrimaryLock).secondaries {
  426. cc := &hbase.ColumnCoordinate{}
  427. if err = cc.ParseFromString(k); err != nil {
  428. return errors.Trace(err)
  429. }
  430. if commitTs == 0 {
  431. // commitTs == 0, means clean primary lock successfully
  432. // expire trx havn't committed yet, we must delete lock and
  433. // dirty data
  434. err = txn.lockCleaner.EraseLockAndData(cc, pl.Timestamp())
  435. if err != nil {
  436. return errors.Trace(err)
  437. }
  438. } else {
  439. // primary row is committed, so we must commit other
  440. // secondary rows
  441. mutation := &columnMutation{
  442. Column: &cc.Column,
  443. mutationValuePair: &mutationValuePair{
  444. typ: v,
  445. },
  446. }
  447. err = txn.rpc.commitSecondaryRow(cc.Table, cc.Row,
  448. []*columnMutation{mutation}, pl.Timestamp(), commitTs)
  449. if err != nil {
  450. return errors.Trace(err)
  451. }
  452. }
  453. }
  454. } else {
  455. return ErrLockNotExpired
  456. }
  457. return nil
  458. }
  459. func (txn *themisTxn) batchPrewriteSecondaryRowsWithLockClean(tbl []byte, rowMs map[string]*rowMutation) error {
  460. locks, err := txn.batchPrewriteSecondaryRows(tbl, rowMs)
  461. if err != nil {
  462. return errors.Trace(err)
  463. }
  464. // lock clean
  465. if locks != nil && len(locks) > 0 {
  466. // hook for test
  467. if bypass, _, err := txn.hooks.onSecondaryOccursLock(txn, locks); !bypass {
  468. return errors.Trace(err)
  469. }
  470. // try one more time after clean lock successfully
  471. for _, lock := range locks {
  472. err = txn.cleanLockWithRetry(lock)
  473. if err != nil {
  474. return errors.Trace(err)
  475. }
  476. // prewrite all secondary rows
  477. locks, err = txn.batchPrewriteSecondaryRows(tbl, rowMs)
  478. if err != nil {
  479. return errors.Trace(err)
  480. }
  481. if len(locks) > 0 {
  482. for _, l := range locks {
  483. log.Errorf("can't clean lock, column:%q; conflict lock: %+v, lock ts: %d", l.Coordinate(), l, l.Timestamp())
  484. }
  485. return ErrRetryable
  486. }
  487. }
  488. }
  489. return nil
  490. }
  491. func (txn *themisTxn) prewriteRowWithLockClean(tbl []byte, mutation *rowMutation, containPrimary bool) error {
  492. lock, err := txn.prewriteRow(tbl, mutation, containPrimary)
  493. if err != nil {
  494. return errors.Trace(err)
  495. }
  496. // lock clean
  497. if lock != nil {
  498. // hook for test
  499. if bypass, _, err := txn.hooks.beforePrewriteLockClean(txn, lock); !bypass {
  500. return errors.Trace(err)
  501. }
  502. err = txn.cleanLockWithRetry(lock)
  503. if err != nil {
  504. return errors.Trace(err)
  505. }
  506. // try one more time after clean lock successfully
  507. lock, err = txn.prewriteRow(tbl, mutation, containPrimary)
  508. if err != nil {
  509. return errors.Trace(err)
  510. }
  511. if lock != nil {
  512. log.Errorf("can't clean lock, column:%q; conflict lock: %+v, lock ts: %d", lock.Coordinate(), lock, lock.Timestamp())
  513. return ErrRetryable
  514. }
  515. }
  516. return nil
  517. }
  518. func (txn *themisTxn) batchPrewriteSecondaryRows(tbl []byte, rowMs map[string]*rowMutation) (map[string]Lock, error) {
  519. return txn.rpc.batchPrewriteSecondaryRows(tbl, rowMs, txn.startTs, txn.secondaryLockBytes)
  520. }
  521. func (txn *themisTxn) prewriteRow(tbl []byte, mutation *rowMutation, containPrimary bool) (Lock, error) {
  522. // hook for test
  523. if bypass, ret, err := txn.hooks.onPrewriteRow(txn, []interface{}{mutation, containPrimary}); !bypass {
  524. return ret.(Lock), errors.Trace(err)
  525. }
  526. if containPrimary {
  527. // try to get lock
  528. return txn.rpc.prewriteRow(tbl, mutation.row,
  529. mutation.mutationList(true),
  530. txn.startTs,
  531. txn.constructPrimaryLock().Encode(),
  532. txn.secondaryLockBytes, txn.primaryRowOffset)
  533. }
  534. return txn.rpc.prewriteSecondaryRow(tbl, mutation.row,
  535. mutation.mutationList(true),
  536. txn.startTs,
  537. txn.secondaryLockBytes)
  538. }
  539. func (txn *themisTxn) prewritePrimary() error {
  540. // hook for test
  541. if bypass, _, err := txn.hooks.beforePrewritePrimary(txn, nil); !bypass {
  542. return err
  543. }
  544. err := txn.prewriteRowWithLockClean(txn.primary.Table, txn.primaryRow, true)
  545. if err != nil {
  546. log.Debugf("prewrite primary %v %q failed: %v", txn.startTs, txn.primaryRow.row, err.Error())
  547. return errors.Trace(err)
  548. }
  549. log.Debugf("prewrite primary %v %q successfully", txn.startTs, txn.primaryRow.row)
  550. return nil
  551. }
  552. func (txn *themisTxn) prewriteSecondary() error {
  553. // hook for test
  554. if bypass, _, err := txn.hooks.beforePrewriteSecondary(txn, nil); !bypass {
  555. return err
  556. }
  557. if txn.conf.brokenPrewriteSecondaryTest {
  558. return txn.brokenPrewriteSecondary()
  559. }
  560. if txn.conf.ConcurrentPrewriteAndCommit {
  561. return txn.batchPrewriteSecondaries()
  562. }
  563. return txn.prewriteSecondarySync()
  564. }
  565. func (txn *themisTxn) prewriteSecondarySync() error {
  566. for i, mu := range txn.secondaryRows {
  567. err := txn.prewriteRowWithLockClean(mu.tbl, mu, false)
  568. if err != nil {
  569. // rollback
  570. txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
  571. txn.rollbackSecondaryRow(i)
  572. return errors.Trace(err)
  573. }
  574. }
  575. return nil
  576. }
  577. // just for test
  578. func (txn *themisTxn) brokenCommitPrimary() error {
  579. // do nothing
  580. log.Warn("Simulating primary commit failed")
  581. return nil
  582. }
  583. // just for test
  584. func (txn *themisTxn) brokenCommitSecondary() {
  585. // do nothing
  586. log.Warn("Simulating secondary commit failed")
  587. }
  588. func (txn *themisTxn) brokenPrewriteSecondary() error {
  589. log.Warn("Simulating prewrite secondary failed")
  590. for i, rm := range txn.secondaryRows {
  591. if i == len(txn.secondary)-1 {
  592. if !txn.conf.brokenPrewriteSecondaryAndRollbackTest {
  593. // simulating prewrite failed, need rollback
  594. txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
  595. txn.rollbackSecondaryRow(i)
  596. }
  597. // maybe rollback occurs error too
  598. return ErrSimulated
  599. }
  600. txn.prewriteRowWithLockClean(rm.tbl, rm, false)
  601. }
  602. return nil
  603. }
  604. func (txn *themisTxn) batchPrewriteSecondaries() error {
  605. wg := sync.WaitGroup{}
  606. //will batch prewrite all rows in a region
  607. rsRowMap, err := txn.groupByRegion()
  608. if err != nil {
  609. return errors.Trace(err)
  610. }
  611. errChan := make(chan error, len(rsRowMap))
  612. defer close(errChan)
  613. successChan := make(chan map[string]*rowMutation, len(rsRowMap))
  614. defer close(successChan)
  615. for _, regionRowMap := range rsRowMap {
  616. wg.Add(1)
  617. _, firstRowM := getFirstEntity(regionRowMap)
  618. go func(tbl []byte, rMap map[string]*rowMutation) {
  619. defer wg.Done()
  620. err := txn.batchPrewriteSecondaryRowsWithLockClean(tbl, rMap)
  621. if err != nil {
  622. errChan <- err
  623. } else {
  624. successChan <- rMap
  625. }
  626. }(firstRowM.tbl, regionRowMap)
  627. }
  628. wg.Wait()
  629. if len(errChan) != 0 {
  630. // occur error, clean success prewrite mutations
  631. log.Warnf("batch prewrite secondary rows error, rolling back %d %d", len(successChan), txn.startTs)
  632. txn.rollbackRow(txn.primaryRow.tbl, txn.primaryRow)
  633. L:
  634. for {
  635. select {
  636. case succMutMap := <-successChan:
  637. {
  638. for _, rowMut := range succMutMap {
  639. txn.rollbackRow(rowMut.tbl, rowMut)
  640. }
  641. }
  642. default:
  643. break L
  644. }
  645. }
  646. err := <-errChan
  647. if err != nil {
  648. log.Error("batch prewrite secondary rows error, txn:", txn.startTs, err)
  649. }
  650. return errors.Trace(err)
  651. }
  652. return nil
  653. }
  654. func getFirstEntity(rowMap map[string]*rowMutation) (string, *rowMutation) {
  655. for row, rowM := range rowMap {
  656. return row, rowM
  657. }
  658. return "", nil
  659. }
  660. func getBatchGroupKey(rInfo *hbase.RegionInfo, tblName string) string {
  661. return rInfo.Server + "_" + rInfo.Name
  662. }
  663. func (txn *themisTxn) rollbackRow(tbl []byte, mutation *rowMutation) error {
  664. l := fmt.Sprintf("\nrolling back %q %d {\n", mutation.row, txn.startTs)
  665. for _, v := range mutation.getColumns() {
  666. l += fmt.Sprintf("\t%s:%s\n", string(v.Family), string(v.Qual))
  667. }
  668. l += "}\n"
  669. log.Warn(l)
  670. for _, col := range mutation.getColumns() {
  671. cc := &hbase.ColumnCoordinate{
  672. Table: tbl,
  673. Row: mutation.row,
  674. Column: col,
  675. }
  676. err := txn.lockCleaner.EraseLockAndData(cc, txn.startTs)
  677. if err != nil {
  678. return errors.Trace(err)
  679. }
  680. }
  681. return nil
  682. }
  683. func (txn *themisTxn) rollbackSecondaryRow(successIndex int) error {
  684. for i := successIndex; i >= 0; i-- {
  685. r := txn.secondaryRows[i]
  686. err := txn.rollbackRow(r.tbl, r)
  687. if err != nil {
  688. return errors.Trace(err)
  689. }
  690. }
  691. return nil
  692. }
  693. func (txn *themisTxn) GetScanner(tbl []byte, startKey, endKey []byte, batchSize int) *ThemisScanner {
  694. scanner := newThemisScanner(tbl, txn, batchSize, txn.client)
  695. if startKey != nil {
  696. scanner.setStartRow(startKey)
  697. }
  698. if endKey != nil {
  699. scanner.setStopRow(endKey)
  700. }
  701. return scanner
  702. }
  703. func (txn *themisTxn) Release() {
  704. txn.primary = nil
  705. txn.primaryRow = nil
  706. txn.secondary = nil
  707. txn.secondaryRows = nil
  708. txn.startTs = 0
  709. txn.commitTs = 0
  710. }
  711. func (txn *themisTxn) String() string {
  712. return fmt.Sprintf("%d", txn.startTs)
  713. }
  714. func (txn *themisTxn) GetCommitTS() uint64 {
  715. return txn.commitTs
  716. }
  717. func (txn *themisTxn) GetStartTS() uint64 {
  718. return txn.startTs
  719. }
  720. func (txn *themisTxn) LockRow(tbl string, rowkey []byte) error {
  721. g := hbase.NewGet(rowkey)
  722. r, err := txn.Get(tbl, g)
  723. if err != nil {
  724. log.Warnf("get row error, table:%s, row:%q, error:%v", tbl, rowkey, err)
  725. return errors.Trace(err)
  726. }
  727. if r == nil {
  728. log.Warnf("has not data to lock, table:%s, row:%q", tbl, rowkey)
  729. return nil
  730. }
  731. for _, v := range r.Columns {
  732. txn.mutationCache.addMutation([]byte(tbl), rowkey, &v.Column, hbase.TypeMinimum, nil, true)
  733. }
  734. return nil
  735. }