You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
4.0 KiB

  1. package nodb
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "time"
  6. "github.com/lunny/nodb/store"
  7. )
  8. var (
  9. errExpMetaKey = errors.New("invalid expire meta key")
  10. errExpTimeKey = errors.New("invalid expire time key")
  11. )
  12. type retireCallback func(*batch, []byte) int64
  13. type elimination struct {
  14. db *DB
  15. exp2Tx []*batch
  16. exp2Retire []retireCallback
  17. }
  18. var errExpType = errors.New("invalid expire type")
  19. func (db *DB) expEncodeTimeKey(dataType byte, key []byte, when int64) []byte {
  20. buf := make([]byte, len(key)+11)
  21. buf[0] = db.index
  22. buf[1] = ExpTimeType
  23. buf[2] = dataType
  24. pos := 3
  25. binary.BigEndian.PutUint64(buf[pos:], uint64(when))
  26. pos += 8
  27. copy(buf[pos:], key)
  28. return buf
  29. }
  30. func (db *DB) expEncodeMetaKey(dataType byte, key []byte) []byte {
  31. buf := make([]byte, len(key)+3)
  32. buf[0] = db.index
  33. buf[1] = ExpMetaType
  34. buf[2] = dataType
  35. pos := 3
  36. copy(buf[pos:], key)
  37. return buf
  38. }
  39. func (db *DB) expDecodeMetaKey(mk []byte) (byte, []byte, error) {
  40. if len(mk) <= 3 || mk[0] != db.index || mk[1] != ExpMetaType {
  41. return 0, nil, errExpMetaKey
  42. }
  43. return mk[2], mk[3:], nil
  44. }
  45. func (db *DB) expDecodeTimeKey(tk []byte) (byte, []byte, int64, error) {
  46. if len(tk) < 11 || tk[0] != db.index || tk[1] != ExpTimeType {
  47. return 0, nil, 0, errExpTimeKey
  48. }
  49. return tk[2], tk[11:], int64(binary.BigEndian.Uint64(tk[3:])), nil
  50. }
  51. func (db *DB) expire(t *batch, dataType byte, key []byte, duration int64) {
  52. db.expireAt(t, dataType, key, time.Now().Unix()+duration)
  53. }
  54. func (db *DB) expireAt(t *batch, dataType byte, key []byte, when int64) {
  55. mk := db.expEncodeMetaKey(dataType, key)
  56. tk := db.expEncodeTimeKey(dataType, key, when)
  57. t.Put(tk, mk)
  58. t.Put(mk, PutInt64(when))
  59. }
  60. func (db *DB) ttl(dataType byte, key []byte) (t int64, err error) {
  61. mk := db.expEncodeMetaKey(dataType, key)
  62. if t, err = Int64(db.bucket.Get(mk)); err != nil || t == 0 {
  63. t = -1
  64. } else {
  65. t -= time.Now().Unix()
  66. if t <= 0 {
  67. t = -1
  68. }
  69. // if t == -1 : to remove ????
  70. }
  71. return t, err
  72. }
  73. func (db *DB) rmExpire(t *batch, dataType byte, key []byte) (int64, error) {
  74. mk := db.expEncodeMetaKey(dataType, key)
  75. if v, err := db.bucket.Get(mk); err != nil {
  76. return 0, err
  77. } else if v == nil {
  78. return 0, nil
  79. } else if when, err2 := Int64(v, nil); err2 != nil {
  80. return 0, err2
  81. } else {
  82. tk := db.expEncodeTimeKey(dataType, key, when)
  83. t.Delete(mk)
  84. t.Delete(tk)
  85. return 1, nil
  86. }
  87. }
  88. func (db *DB) expFlush(t *batch, dataType byte) (err error) {
  89. minKey := make([]byte, 3)
  90. minKey[0] = db.index
  91. minKey[1] = ExpTimeType
  92. minKey[2] = dataType
  93. maxKey := make([]byte, 3)
  94. maxKey[0] = db.index
  95. maxKey[1] = ExpMetaType
  96. maxKey[2] = dataType + 1
  97. _, err = db.flushRegion(t, minKey, maxKey)
  98. err = t.Commit()
  99. return
  100. }
  101. //////////////////////////////////////////////////////////
  102. //
  103. //////////////////////////////////////////////////////////
  104. func newEliminator(db *DB) *elimination {
  105. eli := new(elimination)
  106. eli.db = db
  107. eli.exp2Tx = make([]*batch, maxDataType)
  108. eli.exp2Retire = make([]retireCallback, maxDataType)
  109. return eli
  110. }
  111. func (eli *elimination) regRetireContext(dataType byte, t *batch, onRetire retireCallback) {
  112. // todo .. need to ensure exist - mapExpMetaType[expType]
  113. eli.exp2Tx[dataType] = t
  114. eli.exp2Retire[dataType] = onRetire
  115. }
  116. // call by outside ... (from *db to another *db)
  117. func (eli *elimination) active() {
  118. now := time.Now().Unix()
  119. db := eli.db
  120. dbGet := db.bucket.Get
  121. minKey := db.expEncodeTimeKey(NoneType, nil, 0)
  122. maxKey := db.expEncodeTimeKey(maxDataType, nil, now)
  123. it := db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeROpen, 0, -1)
  124. for ; it.Valid(); it.Next() {
  125. tk := it.RawKey()
  126. mk := it.RawValue()
  127. dt, k, _, err := db.expDecodeTimeKey(tk)
  128. if err != nil {
  129. continue
  130. }
  131. t := eli.exp2Tx[dt]
  132. onRetire := eli.exp2Retire[dt]
  133. if tk == nil || onRetire == nil {
  134. continue
  135. }
  136. t.Lock()
  137. if exp, err := Int64(dbGet(mk)); err == nil {
  138. // check expire again
  139. if exp <= now {
  140. onRetire(t, k)
  141. t.Delete(tk)
  142. t.Delete(mk)
  143. t.Commit()
  144. }
  145. }
  146. t.Unlock()
  147. }
  148. it.Close()
  149. return
  150. }