You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

340 lines
7.1 KiB

  1. package hbase
  2. import (
  3. "sort"
  4. "strconv"
  5. "strings"
  6. "time"
  7. "github.com/juju/errors"
  8. "github.com/ngaut/log"
  9. "github.com/pingcap/go-hbase/proto"
  10. )
  11. const defaultNS = "default"
  12. type TableName struct {
  13. namespace string
  14. name string
  15. }
  16. func newTableNameWithDefaultNS(tblName string) TableName {
  17. return TableName{
  18. namespace: defaultNS,
  19. name: tblName,
  20. }
  21. }
  22. type TableDescriptor struct {
  23. name TableName
  24. attrs map[string][]byte
  25. cfs []*ColumnFamilyDescriptor
  26. }
  27. func NewTableDesciptor(tblName string) *TableDescriptor {
  28. ret := &TableDescriptor{
  29. name: newTableNameWithDefaultNS(tblName),
  30. attrs: map[string][]byte{},
  31. }
  32. ret.AddAddr("IS_META", "false")
  33. return ret
  34. }
  35. func (c *TableDescriptor) AddAddr(attrName string, val string) {
  36. c.attrs[attrName] = []byte(val)
  37. }
  38. func (t *TableDescriptor) AddColumnDesc(cf *ColumnFamilyDescriptor) {
  39. for _, c := range t.cfs {
  40. if c.name == cf.name {
  41. return
  42. }
  43. }
  44. t.cfs = append(t.cfs, cf)
  45. }
  46. type ColumnFamilyDescriptor struct {
  47. name string
  48. attrs map[string][]byte
  49. }
  50. func (c *ColumnFamilyDescriptor) AddAttr(attrName string, val string) {
  51. c.attrs[attrName] = []byte(val)
  52. }
  53. // Themis will use VERSIONS=1 for some hook.
  54. func NewColumnFamilyDescriptor(name string) *ColumnFamilyDescriptor {
  55. return newColumnFamilyDescriptor(name, 1)
  56. }
  57. func newColumnFamilyDescriptor(name string, versionsNum int) *ColumnFamilyDescriptor {
  58. versions := strconv.Itoa(versionsNum)
  59. ret := &ColumnFamilyDescriptor{
  60. name: name,
  61. attrs: make(map[string][]byte),
  62. }
  63. // add default attrs
  64. ret.AddAttr("DATA_BLOCK_ENCODING", "NONE")
  65. ret.AddAttr("BLOOMFILTER", "ROW")
  66. ret.AddAttr("REPLICATION_SCOPE", "0")
  67. ret.AddAttr("COMPRESSION", "NONE")
  68. ret.AddAttr("VERSIONS", versions)
  69. ret.AddAttr("TTL", "2147483647") // 1 << 31
  70. ret.AddAttr("MIN_VERSIONS", "0")
  71. ret.AddAttr("KEEP_DELETED_CELLS", "false")
  72. ret.AddAttr("BLOCKSIZE", "65536")
  73. ret.AddAttr("IN_MEMORY", "false")
  74. ret.AddAttr("BLOCKCACHE", "true")
  75. return ret
  76. }
  77. func getPauseTime(retry int) int64 {
  78. if retry >= len(retryPauseTime) {
  79. retry = len(retryPauseTime) - 1
  80. }
  81. if retry < 0 {
  82. retry = 0
  83. }
  84. return retryPauseTime[retry] * defaultRetryWaitMs
  85. }
  86. func (c *client) CreateTable(t *TableDescriptor, splits [][]byte) error {
  87. req := &proto.CreateTableRequest{}
  88. schema := &proto.TableSchema{}
  89. sort.Sort(BytesSlice(splits))
  90. schema.TableName = &proto.TableName{
  91. Qualifier: []byte(t.name.name),
  92. Namespace: []byte(t.name.namespace),
  93. }
  94. for k, v := range t.attrs {
  95. schema.Attributes = append(schema.Attributes, &proto.BytesBytesPair{
  96. First: []byte(k),
  97. Second: []byte(v),
  98. })
  99. }
  100. for _, c := range t.cfs {
  101. cf := &proto.ColumnFamilySchema{
  102. Name: []byte(c.name),
  103. }
  104. for k, v := range c.attrs {
  105. cf.Attributes = append(cf.Attributes, &proto.BytesBytesPair{
  106. First: []byte(k),
  107. Second: []byte(v),
  108. })
  109. }
  110. schema.ColumnFamilies = append(schema.ColumnFamilies, cf)
  111. }
  112. req.TableSchema = schema
  113. req.SplitKeys = splits
  114. ch, err := c.adminAction(req)
  115. if err != nil {
  116. return errors.Trace(err)
  117. }
  118. resp := <-ch
  119. switch r := resp.(type) {
  120. case *exception:
  121. return errors.New(r.msg)
  122. }
  123. // wait and check
  124. for retry := 0; retry < defaultMaxActionRetries*retryLongerMultiplier; retry++ {
  125. regCnt := 0
  126. numRegs := len(splits) + 1
  127. err = c.metaScan(t.name.name, func(r *RegionInfo) (bool, error) {
  128. if !(r.Offline || r.Split) && len(r.Server) > 0 && r.TableName == t.name.name {
  129. regCnt++
  130. }
  131. return true, nil
  132. })
  133. if err != nil {
  134. return errors.Trace(err)
  135. }
  136. if regCnt == numRegs {
  137. return nil
  138. }
  139. log.Warnf("Retrying create table for the %d time(s)", retry+1)
  140. time.Sleep(time.Duration(getPauseTime(retry)) * time.Millisecond)
  141. }
  142. return errors.New("create table timeout")
  143. }
  144. func (c *client) DisableTable(tblName string) error {
  145. req := &proto.DisableTableRequest{
  146. TableName: &proto.TableName{
  147. Qualifier: []byte(tblName),
  148. Namespace: []byte(defaultNS),
  149. },
  150. }
  151. ch, err := c.adminAction(req)
  152. if err != nil {
  153. return errors.Trace(err)
  154. }
  155. resp := <-ch
  156. switch r := resp.(type) {
  157. case *exception:
  158. return errors.New(r.msg)
  159. }
  160. return nil
  161. }
  162. func (c *client) EnableTable(tblName string) error {
  163. req := &proto.EnableTableRequest{
  164. TableName: &proto.TableName{
  165. Qualifier: []byte(tblName),
  166. Namespace: []byte(defaultNS),
  167. },
  168. }
  169. ch, err := c.adminAction(req)
  170. if err != nil {
  171. return errors.Trace(err)
  172. }
  173. resp := <-ch
  174. switch r := resp.(type) {
  175. case *exception:
  176. return errors.New(r.msg)
  177. }
  178. return nil
  179. }
  180. func (c *client) DropTable(tblName string) error {
  181. req := &proto.DeleteTableRequest{
  182. TableName: &proto.TableName{
  183. Qualifier: []byte(tblName),
  184. Namespace: []byte(defaultNS),
  185. },
  186. }
  187. ch, err := c.adminAction(req)
  188. if err != nil {
  189. return errors.Trace(err)
  190. }
  191. resp := <-ch
  192. switch r := resp.(type) {
  193. case *exception:
  194. return errors.New(r.msg)
  195. }
  196. return nil
  197. }
  198. func (c *client) metaScan(tbl string, fn func(r *RegionInfo) (bool, error)) error {
  199. scan := NewScan(metaTableName, 0, c)
  200. defer scan.Close()
  201. scan.StartRow = []byte(tbl)
  202. scan.StopRow = nextKey([]byte(tbl))
  203. for {
  204. r := scan.Next()
  205. if r == nil || scan.Closed() {
  206. break
  207. }
  208. region, err := c.parseRegion(r)
  209. if err != nil {
  210. return errors.Trace(err)
  211. }
  212. if more, err := fn(region); !more || err != nil {
  213. return errors.Trace(err)
  214. }
  215. }
  216. return nil
  217. }
  218. func (c *client) TableExists(tbl string) (bool, error) {
  219. found := false
  220. err := c.metaScan(tbl, func(region *RegionInfo) (bool, error) {
  221. if region.TableName == tbl {
  222. found = true
  223. return false, nil
  224. }
  225. return true, nil
  226. })
  227. if err != nil {
  228. return false, errors.Trace(err)
  229. }
  230. return found, nil
  231. }
  232. // Split splits region.
  233. // tblOrRegion table name or region(<tbl>,<endKey>,<timestamp>.<md5>).
  234. // splitPoint which is a key, leave "" if want to split each region automatically.
  235. func (c *client) Split(tblOrRegion, splitPoint string) error {
  236. // Extract table name from supposing regionName.
  237. tbls := strings.SplitN(tblOrRegion, ",", 2)
  238. tbl := tbls[0]
  239. found := false
  240. var foundRegion *RegionInfo
  241. err := c.metaScan(tbl, func(region *RegionInfo) (bool, error) {
  242. if region != nil && region.Name == tblOrRegion {
  243. found = true
  244. foundRegion = region
  245. return false, nil
  246. }
  247. return true, nil
  248. })
  249. if err != nil {
  250. return errors.Trace(err)
  251. }
  252. // This is a region name, split it directly.
  253. if found {
  254. return c.split(foundRegion, []byte(splitPoint))
  255. }
  256. // This is a table name.
  257. tbl = tblOrRegion
  258. regions, err := c.GetRegions([]byte(tbl), false)
  259. if err != nil {
  260. return errors.Trace(err)
  261. }
  262. // Split each region.
  263. for _, region := range regions {
  264. err := c.split(region, []byte(splitPoint))
  265. if err != nil {
  266. return errors.Trace(err)
  267. }
  268. }
  269. return nil
  270. }
  271. func (c *client) split(region *RegionInfo, splitPoint []byte) error {
  272. // Not in this region, skip it.
  273. if len(splitPoint) > 0 && !findKey(region, splitPoint) {
  274. return nil
  275. }
  276. c.CleanRegionCache([]byte(region.TableName))
  277. rs := NewRegionSpecifier(region.Name)
  278. req := &proto.SplitRegionRequest{
  279. Region: rs,
  280. }
  281. if len(splitPoint) > 0 {
  282. req.SplitPoint = splitPoint
  283. }
  284. // Empty response.
  285. _, err := c.regionAction(region.Server, req)
  286. if err != nil {
  287. return errors.Trace(err)
  288. }
  289. return nil
  290. }