You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

100 lines
2.5 KiB

  1. package hbase
  2. import (
  3. "strings"
  4. pb "github.com/golang/protobuf/proto"
  5. "github.com/pingcap/go-hbase/proto"
  6. )
  7. type call struct {
  8. id uint32
  9. methodName string
  10. request pb.Message
  11. responseBuffer pb.Message
  12. responseCh chan pb.Message
  13. }
  14. type exception struct {
  15. msg string
  16. }
  17. func isNotInRegionError(err error) bool {
  18. return strings.Contains(err.Error(), "org.apache.hadoop.hbase.NotServingRegionException")
  19. }
  20. func isUnknownScannerError(err error) bool {
  21. return strings.Contains(err.Error(), "org.apache.hadoop.hbase.UnknownScannerException")
  22. }
  23. func (m *exception) Reset() { *m = exception{} }
  24. func (m *exception) String() string { return m.msg }
  25. func (m *exception) ProtoMessage() {}
  26. func newCall(request pb.Message) *call {
  27. var responseBuffer pb.Message
  28. var methodName string
  29. switch request.(type) {
  30. case *proto.GetRequest:
  31. responseBuffer = &proto.GetResponse{}
  32. methodName = "Get"
  33. case *proto.MutateRequest:
  34. responseBuffer = &proto.MutateResponse{}
  35. methodName = "Mutate"
  36. case *proto.ScanRequest:
  37. responseBuffer = &proto.ScanResponse{}
  38. methodName = "Scan"
  39. case *proto.GetTableDescriptorsRequest:
  40. responseBuffer = &proto.GetTableDescriptorsResponse{}
  41. methodName = "GetTableDescriptors"
  42. case *proto.CoprocessorServiceRequest:
  43. responseBuffer = &proto.CoprocessorServiceResponse{}
  44. methodName = "ExecService"
  45. case *proto.CreateTableRequest:
  46. responseBuffer = &proto.CreateTableResponse{}
  47. methodName = "CreateTable"
  48. case *proto.DisableTableRequest:
  49. responseBuffer = &proto.DisableTableResponse{}
  50. methodName = "DisableTable"
  51. case *proto.EnableTableRequest:
  52. responseBuffer = &proto.EnableTableResponse{}
  53. methodName = "EnableTable"
  54. case *proto.DeleteTableRequest:
  55. responseBuffer = &proto.DeleteTableResponse{}
  56. methodName = "DeleteTable"
  57. case *proto.MultiRequest:
  58. responseBuffer = &proto.MultiResponse{}
  59. methodName = "Multi"
  60. case *proto.SplitRegionRequest:
  61. responseBuffer = &proto.SplitRegionResponse{}
  62. methodName = "SplitRegion"
  63. }
  64. return &call{
  65. methodName: methodName,
  66. request: request,
  67. responseBuffer: responseBuffer,
  68. responseCh: make(chan pb.Message, 1),
  69. }
  70. }
  71. func (c *call) complete(err error, response []byte) {
  72. defer close(c.responseCh)
  73. if err != nil {
  74. c.responseCh <- &exception{
  75. msg: err.Error(),
  76. }
  77. return
  78. }
  79. err = pb.Unmarshal(response, c.responseBuffer)
  80. if err != nil {
  81. c.responseCh <- &exception{
  82. msg: err.Error(),
  83. }
  84. return
  85. }
  86. c.responseCh <- c.responseBuffer
  87. }