You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

519 lines
12 KiB

  1. // Copyright 2013, Google Inc. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package fakezk is a pretty complete mock implementation of a
  5. // ZooKeeper connection (see go/zk/zk.Conn). All operations
  6. // work as expected with the exception of zk.Conn.GetACL and
  7. // zk.Conn.SetACL. zk.Conn.SetACL will succeed, but it is a noop (and
  8. // the ACLs won't be respected). zk.Conn.GetACL returns nil values. It is OK to
  9. // access the connection from multiple goroutines, but the locking is
  10. // very naive (every operation locks the whole connection).
  11. package zkhelper
  12. import (
  13. "bytes"
  14. "encoding/json"
  15. "fmt"
  16. "io/ioutil"
  17. "strings"
  18. "sync"
  19. "time"
  20. "github.com/ngaut/go-zookeeper/zk"
  21. )
  22. type zconn struct {
  23. mu sync.Mutex
  24. root *stat
  25. zxid int64
  26. existWatches map[string][]chan zk.Event
  27. }
  28. func (conn *zconn) getZxid() int64 {
  29. conn.zxid++
  30. return conn.zxid
  31. }
  32. func (conn *zconn) Seq2Str(seq int64) string {
  33. return fmt.Sprintf("%0.10d", seq)
  34. }
  35. // NewConn returns a fake zk.Conn implementation. Data is stored in
  36. // memory, and there's a global connection lock for concurrent access.
  37. func NewConn() Conn {
  38. return &zconn{
  39. root: &stat{
  40. name: "/",
  41. children: make(map[string]*stat),
  42. },
  43. existWatches: make(map[string][]chan zk.Event)}
  44. }
  45. // NewConnFromFile returns a fake zk.Conn implementation, that is seeded
  46. // with the json data extracted from the input file.
  47. func NewConnFromFile(filename string) Conn {
  48. result := &zconn{
  49. root: &stat{
  50. name: "/",
  51. children: make(map[string]*stat),
  52. },
  53. existWatches: make(map[string][]chan zk.Event)}
  54. data, err := ioutil.ReadFile(filename)
  55. if err != nil {
  56. panic(fmt.Errorf("NewConnFromFile failed to read file %v: %v", filename, err))
  57. }
  58. values := make(map[string]interface{})
  59. if err := json.Unmarshal(data, &values); err != nil {
  60. panic(fmt.Errorf("NewConnFromFile failed to json.Unmarshal file %v: %v", filename, err))
  61. }
  62. for k, v := range values {
  63. jv, err := json.Marshal(v)
  64. if err != nil {
  65. panic(fmt.Errorf("NewConnFromFile failed to json.Marshal value %v: %v", k, err))
  66. }
  67. // CreateRecursive will work for a leaf node where the parent
  68. // doesn't exist, but not for a node in the middle of a tree
  69. // that already exists. So have to use 'Set' as a backup.
  70. if _, err := CreateRecursive(result, k, string(jv), 0, nil); err != nil {
  71. if ZkErrorEqual(err, zk.ErrNodeExists) {
  72. _, err = result.Set(k, jv, -1)
  73. }
  74. if err != nil {
  75. panic(fmt.Errorf("NewConnFromFile failed to zk.CreateRecursive value %v: %v", k, err))
  76. }
  77. }
  78. }
  79. return result
  80. }
  81. func (conn *zconn) GetACL(path string) ([]zk.ACL, zk.Stat, error) {
  82. return nil, nil, nil
  83. }
  84. func (conn *zconn) Get(zkPath string) (data []byte, stat zk.Stat, err error) {
  85. conn.mu.Lock()
  86. defer conn.mu.Unlock()
  87. node, _, rest, err := conn.getNode(zkPath, "get")
  88. if err != nil {
  89. return nil, nil, err
  90. }
  91. if len(rest) != 0 {
  92. return nil, nil, zkError(zk.ErrNoNode, "get", zkPath)
  93. }
  94. return []byte(node.content), node, nil
  95. }
  96. func (conn *zconn) GetW(zkPath string) (data []byte, stat zk.Stat, watch <-chan zk.Event, err error) {
  97. conn.mu.Lock()
  98. defer conn.mu.Unlock()
  99. node, _, rest, err := conn.getNode(zkPath, "getw")
  100. if err != nil {
  101. return nil, nil, nil, err
  102. }
  103. if len(rest) != 0 {
  104. return nil, nil, nil, zkError(zk.ErrNoNode, "getw", zkPath)
  105. }
  106. c := make(chan zk.Event, 1)
  107. node.changeWatches = append(node.changeWatches, c)
  108. return []byte(node.content), node, c, nil
  109. }
  110. func (conn *zconn) Children(zkPath string) (children []string, stat zk.Stat, err error) {
  111. conn.mu.Lock()
  112. defer conn.mu.Unlock()
  113. //println("Children:", conn.String())
  114. node, _, rest, err := conn.getNode(zkPath, "children")
  115. if err != nil {
  116. return nil, nil, err
  117. }
  118. if len(rest) != 0 {
  119. return nil, nil, zkError(zk.ErrNoNode, "children", zkPath)
  120. }
  121. for name := range node.children {
  122. children = append(children, name)
  123. }
  124. return children, node, nil
  125. }
  126. func (conn *zconn) ChildrenW(zkPath string) (children []string, stat zk.Stat, watch <-chan zk.Event, err error) {
  127. conn.mu.Lock()
  128. defer conn.mu.Unlock()
  129. //println("ChildrenW:", conn.String())
  130. node, _, rest, err := conn.getNode(zkPath, "childrenw")
  131. if err != nil {
  132. return nil, nil, nil, err
  133. }
  134. if len(rest) != 0 {
  135. return nil, nil, nil, zkError(zk.ErrNoNode, "childrenw", zkPath)
  136. }
  137. c := make(chan zk.Event, 1)
  138. node.childrenWatches = append(node.childrenWatches, c)
  139. for name := range node.children {
  140. children = append(children, name)
  141. }
  142. return children, node, c, nil
  143. }
  144. func (conn *zconn) Exists(zkPath string) (exist bool, stat zk.Stat, err error) {
  145. // FIXME(szopa): if the path is bad, Op will be "get."
  146. exist = false
  147. _, stat, err = conn.Get(zkPath)
  148. if err != nil {
  149. if ZkErrorEqual(err, zk.ErrNoNode) {
  150. err = nil
  151. }
  152. } else {
  153. exist = true
  154. }
  155. return exist, stat, err
  156. }
  157. func (conn *zconn) ExistsW(zkPath string) (exist bool, stat zk.Stat, watch <-chan zk.Event, err error) {
  158. conn.mu.Lock()
  159. defer conn.mu.Unlock()
  160. exist = false
  161. c := make(chan zk.Event, 1)
  162. node, _, rest, err := conn.getNode(zkPath, "existsw")
  163. if err != nil {
  164. return exist, nil, nil, err
  165. }
  166. if len(rest) != 0 {
  167. watches, ok := conn.existWatches[zkPath]
  168. if !ok {
  169. watches = make([]chan zk.Event, 0)
  170. conn.existWatches[zkPath] = watches
  171. }
  172. conn.existWatches[zkPath] = append(watches, c)
  173. return exist, nil, c, nil
  174. }
  175. exist = true
  176. node.existWatches = append(node.existWatches, c)
  177. return exist, node, c, nil
  178. }
  179. func (conn *zconn) Create(zkPath string, value []byte, flags int32, aclv []zk.ACL) (zkPathCreated string, err error) {
  180. conn.mu.Lock()
  181. defer conn.mu.Unlock()
  182. node, _, rest, err := conn.getNode(zkPath, "create")
  183. if err != nil {
  184. return "", err
  185. }
  186. if len(rest) == 0 {
  187. return "", zkError(zk.ErrNodeExists, "create", zkPath)
  188. }
  189. if len(rest) > 1 {
  190. return "", zkError(zk.ErrNoNode, "create", zkPath)
  191. }
  192. zxid := conn.getZxid()
  193. name := rest[0]
  194. if (flags & zk.FlagSequence) != 0 {
  195. sequence := node.nextSequence()
  196. name += sequence
  197. zkPath = zkPath + sequence
  198. }
  199. stat := &stat{
  200. name: name,
  201. content: string(value),
  202. children: make(map[string]*stat),
  203. acl: aclv,
  204. mtime: time.Now(),
  205. ctime: time.Now(),
  206. czxid: zxid,
  207. mzxid: zxid,
  208. existWatches: make([]chan zk.Event, 0),
  209. }
  210. node.children[name] = stat
  211. event := zk.Event{
  212. Type: zk.EventNodeCreated,
  213. Path: zkPath,
  214. State: zk.StateConnected,
  215. }
  216. if watches, ok := conn.existWatches[zkPath]; ok {
  217. delete(conn.existWatches, zkPath)
  218. for _, watch := range watches {
  219. watch <- event
  220. }
  221. }
  222. childrenEvent := zk.Event{
  223. Type: zk.EventNodeChildrenChanged,
  224. Path: zkPath,
  225. State: zk.StateConnected,
  226. }
  227. for _, watch := range node.childrenWatches {
  228. watch <- childrenEvent
  229. close(watch)
  230. }
  231. node.childrenWatches = nil
  232. node.cversion++
  233. return zkPath, nil
  234. }
  235. func (conn *zconn) Set(zkPath string, value []byte, version int32) (stat zk.Stat, err error) {
  236. conn.mu.Lock()
  237. defer conn.mu.Unlock()
  238. node, _, rest, err := conn.getNode(zkPath, "set")
  239. if err != nil {
  240. return nil, err
  241. }
  242. if len(rest) != 0 {
  243. return nil, zkError(zk.ErrNoNode, "set", zkPath)
  244. }
  245. if version != -1 && node.version != int(version) {
  246. return nil, zkError(zk.ErrBadVersion, "set", zkPath)
  247. }
  248. node.content = string(value)
  249. node.version++
  250. for _, watch := range node.changeWatches {
  251. watch <- zk.Event{
  252. Type: zk.EventNodeDataChanged,
  253. Path: zkPath,
  254. State: zk.StateConnected,
  255. }
  256. }
  257. node.changeWatches = nil
  258. return node, nil
  259. }
  260. func (conn *zconn) Delete(zkPath string, version int32) (err error) {
  261. conn.mu.Lock()
  262. defer conn.mu.Unlock()
  263. node, parent, rest, err := conn.getNode(zkPath, "delete")
  264. if err != nil {
  265. return err
  266. }
  267. if len(rest) > 0 {
  268. return zkError(zk.ErrNoNode, "delete", zkPath)
  269. }
  270. if len(node.children) > 0 {
  271. return zkError(zk.ErrNotEmpty, "delete", zkPath)
  272. }
  273. delete(parent.children, node.name)
  274. event := zk.Event{
  275. Type: zk.EventNodeDeleted,
  276. Path: zkPath,
  277. State: zk.StateConnected,
  278. }
  279. for _, watch := range node.existWatches {
  280. watch <- event
  281. }
  282. for _, watch := range node.changeWatches {
  283. watch <- event
  284. }
  285. node.existWatches = nil
  286. node.changeWatches = nil
  287. childrenEvent := zk.Event{
  288. Type: zk.EventNodeChildrenChanged,
  289. Path: zkPath,
  290. State: zk.StateConnected}
  291. for _, watch := range parent.childrenWatches {
  292. watch <- childrenEvent
  293. }
  294. return nil
  295. }
  296. func (conn *zconn) Close() {
  297. conn.mu.Lock()
  298. defer conn.mu.Unlock()
  299. for _, watches := range conn.existWatches {
  300. for _, c := range watches {
  301. close(c)
  302. }
  303. }
  304. conn.root.closeAllWatches()
  305. }
  306. /*
  307. func (conn *zconn) RetryChange(path string, flags int, acl []zk.ACL, changeFunc zk.ChangeFunc) error {
  308. for {
  309. oldValue, oldStat, err := conn.Get(path)
  310. if err != nil && !ZkErrorEqual(err, zk.ErrNoNode) {
  311. return err
  312. }
  313. newValue, err := changeFunc(oldValue, oldStat)
  314. if err != nil {
  315. return err
  316. }
  317. if oldStat == nil {
  318. _, err := conn.Create(path, newValue, flags, acl)
  319. if err == nil || !ZkErrorEqual(err, zk.ZNODEEXISTS) {
  320. return err
  321. }
  322. continue
  323. }
  324. if newValue == oldValue {
  325. return nil // Nothing to do.
  326. }
  327. _, err = conn.Set(path, newValue, oldStat.Version())
  328. if err == nil || !ZkErrorEqual(err, zk.ZBADVERSION) && !ZkErrorEqual(err, zk.ErrNoNode) {
  329. return err
  330. }
  331. }
  332. }
  333. */
  334. func (conn *zconn) SetACL(zkPath string, aclv []zk.ACL, version int32) (zk.Stat, error) {
  335. return nil, nil
  336. }
  337. func (conn *zconn) getNode(zkPath string, op string) (node *stat, parent *stat, rest []string, err error) {
  338. // FIXME(szopa): Make sure the path starts with /.
  339. parts := strings.Split(zkPath, "/")
  340. if parts[0] != "" {
  341. //todo: fix this, error bad arguments
  342. return nil, nil, nil, zkError(zk.ErrUnknown, op, zkPath)
  343. }
  344. elements := parts[1:]
  345. parent = nil
  346. current := conn.root
  347. for i, el := range elements {
  348. candidateParent := current
  349. candidate, ok := current.children[el]
  350. if !ok {
  351. return current, parent, elements[i:], nil
  352. }
  353. current, parent = candidate, candidateParent
  354. }
  355. return current, parent, []string{}, nil
  356. }
  // ZkError is the error type returned by the fake connection. It pairs an
  // underlying error value with the operation and path it occurred on.
  type ZkError struct {
      // Code is the underlying error; its message is what Error returns.
      Code error
      // Op is the name of the operation that failed.
      Op string
      // Path is the node path the operation was invoked with.
      Path string
  }

  // Error returns the message of the wrapped Code only; Op and Path are not
  // part of the text.
  func (ze *ZkError) Error() string {
      code := ze.Code
      return code.Error()
  }

  // zkError wraps code, together with the operation name and the path, in a
  // *ZkError and returns it as an error.
  func zkError(code error, op, path string) error {
      wrapped := new(ZkError)
      wrapped.Code, wrapped.Op, wrapped.Path = code, op, path
      return wrapped
  }
  374. type stat struct {
  375. name string
  376. content string
  377. children map[string]*stat
  378. acl []zk.ACL
  379. mtime time.Time
  380. ctime time.Time
  381. czxid int64
  382. mzxid int64
  383. pzxid int64
  384. version int
  385. cversion int
  386. aversion int
  387. sequence int
  388. existWatches []chan zk.Event
  389. changeWatches []chan zk.Event
  390. childrenWatches []chan zk.Event
  391. }
  392. func (st stat) closeAllWatches() {
  393. for _, c := range st.existWatches {
  394. close(c)
  395. }
  396. for _, c := range st.changeWatches {
  397. close(c)
  398. }
  399. for _, c := range st.childrenWatches {
  400. close(c)
  401. }
  402. for _, child := range st.children {
  403. child.closeAllWatches()
  404. }
  405. }
  406. func (st stat) Czxid() int64 {
  407. return st.czxid
  408. }
  409. func (st stat) Mzxid() int64 {
  410. return st.mzxid
  411. }
  412. func (st stat) CTime() time.Time {
  413. return st.ctime
  414. }
  415. func (st stat) MTime() time.Time {
  416. return st.mtime
  417. }
  418. func (st stat) Version() int {
  419. return st.version
  420. }
  421. func (st stat) CVersion() int {
  422. return st.cversion
  423. }
  424. func (st stat) AVersion() int {
  425. return st.aversion
  426. }
  427. func (st stat) EphemeralOwner() int64 {
  428. return 0
  429. }
  430. func (st stat) DataLength() int {
  431. return len(st.content)
  432. }
  433. func (st stat) NumChildren() int {
  434. return len(st.children)
  435. }
  436. func (st stat) Pzxid() int64 {
  437. return st.pzxid
  438. }
  439. func (st *stat) nextSequence() string {
  440. st.sequence++
  441. return fmt.Sprintf("%010d", st.sequence)
  442. }
  443. func (st stat) fprintRecursive(level int, buf *bytes.Buffer) {
  444. start := strings.Repeat(" ", level)
  445. fmt.Fprintf(buf, "%v-%v:\n", start, st.name)
  446. if st.content != "" {
  447. fmt.Fprintf(buf, "%v content: %q\n\n", start, st.content)
  448. }
  449. if len(st.children) > 0 {
  450. for _, child := range st.children {
  451. child.fprintRecursive(level+1, buf)
  452. }
  453. }
  454. }
  455. func (conn *zconn) String() string {
  456. b := new(bytes.Buffer)
  457. conn.root.fprintRecursive(0, b)
  458. return b.String()
  459. }