You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

899 lines
24 KiB

  1. // zk helper functions
  2. // modified from Vitess project
  3. package zkhelper
  4. import (
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "math/rand"
  9. "os"
  10. "path"
  11. "sort"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/ngaut/go-zookeeper/zk"
  16. "github.com/ngaut/log"
  17. )
  18. var (
  19. // This error is returned by functions that wait for a result
  20. // when they are interrupted.
  21. ErrInterrupted = errors.New("zkutil: obtaining lock was interrupted")
  22. // This error is returned by functions that wait for a result
  23. // when the timeout value is reached.
  24. ErrTimeout = errors.New("zkutil: obtaining lock timed out")
  25. )
  26. const (
  27. // PERM_DIRECTORY are default permissions for a node.
  28. PERM_DIRECTORY = zk.PermAdmin | zk.PermCreate | zk.PermDelete | zk.PermRead | zk.PermWrite
  29. // PERM_FILE allows a zk node to emulate file behavior by disallowing child nodes.
  30. PERM_FILE = zk.PermAdmin | zk.PermRead | zk.PermWrite
  31. MagicPrefix = "zk"
  32. )
  33. func init() {
  34. rand.Seed(time.Now().UnixNano())
  35. }
  36. type MyZkConn struct {
  37. *zk.Conn
  38. }
  39. func (conn *MyZkConn) Seq2Str(seq int64) string {
  40. return fmt.Sprintf("%0.10d", seq)
  41. }
  42. func ConnectToZk(zkAddr string) (Conn, error) {
  43. zkConn, _, err := zk.Connect(strings.Split(zkAddr, ","), 3*time.Second)
  44. if err != nil {
  45. return nil, err
  46. }
  47. return &MyZkConn{Conn: zkConn}, nil
  48. }
  49. func ConnectToZkWithTimeout(zkAddr string, recvTime time.Duration) (Conn, error) {
  50. zkConn, _, err := zk.Connect(strings.Split(zkAddr, ","), recvTime)
  51. if err != nil {
  52. return nil, err
  53. }
  54. return &MyZkConn{Conn: zkConn}, nil
  55. }
  56. func DefaultACLs() []zk.ACL {
  57. return zk.WorldACL(zk.PermAll)
  58. }
  59. func DefaultDirACLs() []zk.ACL {
  60. return zk.WorldACL(PERM_DIRECTORY)
  61. }
  62. func DefaultFileACLs() []zk.ACL {
  63. return zk.WorldACL(PERM_FILE)
  64. }
  65. // IsDirectory returns if this node should be treated as a directory.
  66. func IsDirectory(aclv []zk.ACL) bool {
  67. for _, acl := range aclv {
  68. if acl.Perms != PERM_DIRECTORY {
  69. return false
  70. }
  71. }
  72. return true
  73. }
  74. func ZkErrorEqual(a, b error) bool {
  75. if a != nil && b != nil {
  76. return a.Error() == b.Error()
  77. }
  78. return a == b
  79. }
  80. // Create a path and any pieces required, think mkdir -p.
  81. // Intermediate znodes are always created empty.
  82. func CreateRecursive(zconn Conn, zkPath, value string, flags int, aclv []zk.ACL) (pathCreated string, err error) {
  83. parts := strings.Split(zkPath, "/")
  84. if parts[1] != MagicPrefix {
  85. return "", fmt.Errorf("zkutil: non /%v path: %v", MagicPrefix, zkPath)
  86. }
  87. pathCreated, err = zconn.Create(zkPath, []byte(value), int32(flags), aclv)
  88. if ZkErrorEqual(err, zk.ErrNoNode) {
  89. // Make sure that nodes are either "file" or "directory" to mirror file system
  90. // semantics.
  91. dirAclv := make([]zk.ACL, len(aclv))
  92. for i, acl := range aclv {
  93. dirAclv[i] = acl
  94. dirAclv[i].Perms = PERM_DIRECTORY
  95. }
  96. _, err = CreateRecursive(zconn, path.Dir(zkPath), "", flags, dirAclv)
  97. if err != nil && !ZkErrorEqual(err, zk.ErrNodeExists) {
  98. return "", err
  99. }
  100. pathCreated, err = zconn.Create(zkPath, []byte(value), int32(flags), aclv)
  101. }
  102. return
  103. }
  104. func CreateOrUpdate(zconn Conn, zkPath, value string, flags int, aclv []zk.ACL, recursive bool) (pathCreated string, err error) {
  105. if recursive {
  106. pathCreated, err = CreateRecursive(zconn, zkPath, value, 0, aclv)
  107. } else {
  108. pathCreated, err = zconn.Create(zkPath, []byte(value), 0, aclv)
  109. }
  110. if err != nil && ZkErrorEqual(err, zk.ErrNodeExists) {
  111. pathCreated = ""
  112. _, err = zconn.Set(zkPath, []byte(value), -1)
  113. }
  114. return
  115. }
  116. type pathItem struct {
  117. path string
  118. err error
  119. }
  120. func ChildrenRecursive(zconn Conn, zkPath string) ([]string, error) {
  121. var err error
  122. mutex := sync.Mutex{}
  123. wg := sync.WaitGroup{}
  124. pathList := make([]string, 0, 32)
  125. children, _, err := zconn.Children(zkPath)
  126. if err != nil {
  127. return nil, err
  128. }
  129. for _, child := range children {
  130. wg.Add(1)
  131. go func(child string) {
  132. childPath := path.Join(zkPath, child)
  133. rChildren, zkErr := ChildrenRecursive(zconn, childPath)
  134. if zkErr != nil {
  135. // If other processes are deleting nodes, we need to ignore
  136. // the missing nodes.
  137. if !ZkErrorEqual(zkErr, zk.ErrNoNode) {
  138. mutex.Lock()
  139. err = zkErr
  140. mutex.Unlock()
  141. }
  142. } else {
  143. mutex.Lock()
  144. pathList = append(pathList, child)
  145. for _, rChild := range rChildren {
  146. pathList = append(pathList, path.Join(child, rChild))
  147. }
  148. mutex.Unlock()
  149. }
  150. wg.Done()
  151. }(child)
  152. }
  153. wg.Wait()
  154. mutex.Lock()
  155. defer mutex.Unlock()
  156. if err != nil {
  157. return nil, err
  158. }
  159. return pathList, nil
  160. }
  161. func HasWildcard(path string) bool {
  162. for i := 0; i < len(path); i++ {
  163. switch path[i] {
  164. case '\\':
  165. if i+1 >= len(path) {
  166. return true
  167. } else {
  168. i++
  169. }
  170. case '*', '?', '[':
  171. return true
  172. }
  173. }
  174. return false
  175. }
  176. func resolveRecursive(zconn Conn, parts []string, toplevel bool) ([]string, error) {
  177. for i, part := range parts {
  178. if HasWildcard(part) {
  179. var children []string
  180. zkParentPath := strings.Join(parts[:i], "/")
  181. var err error
  182. children, _, err = zconn.Children(zkParentPath)
  183. if err != nil {
  184. // we asked for something like
  185. // /zk/cell/aaa/* and
  186. // /zk/cell/aaa doesn't exist
  187. // -> return empty list, no error
  188. // (note we check both a regular zk
  189. // error and the error the test
  190. // produces)
  191. if ZkErrorEqual(err, zk.ErrNoNode) {
  192. return nil, nil
  193. }
  194. // otherwise we return the error
  195. return nil, err
  196. }
  197. sort.Strings(children)
  198. results := make([][]string, len(children))
  199. wg := &sync.WaitGroup{}
  200. mu := &sync.Mutex{}
  201. var firstError error
  202. for j, child := range children {
  203. matched, err := path.Match(part, child)
  204. if err != nil {
  205. return nil, err
  206. }
  207. if matched {
  208. // we have a match!
  209. wg.Add(1)
  210. newParts := make([]string, len(parts))
  211. copy(newParts, parts)
  212. newParts[i] = child
  213. go func(j int) {
  214. defer wg.Done()
  215. subResult, err := resolveRecursive(zconn, newParts, false)
  216. if err != nil {
  217. mu.Lock()
  218. if firstError != nil {
  219. log.Infof("Multiple error: %v", err)
  220. } else {
  221. firstError = err
  222. }
  223. mu.Unlock()
  224. } else {
  225. results[j] = subResult
  226. }
  227. }(j)
  228. }
  229. }
  230. wg.Wait()
  231. if firstError != nil {
  232. return nil, firstError
  233. }
  234. result := make([]string, 0, 32)
  235. for j := 0; j < len(children); j++ {
  236. subResult := results[j]
  237. if subResult != nil {
  238. result = append(result, subResult...)
  239. }
  240. }
  241. // we found a part that is a wildcard, we
  242. // added the children already, we're done
  243. return result, nil
  244. }
  245. }
  246. // no part contains a wildcard, add the path if it exists, and done
  247. path := strings.Join(parts, "/")
  248. if toplevel {
  249. // for whatever the user typed at the toplevel, we don't
  250. // check it exists or not, we just return it
  251. return []string{path}, nil
  252. }
  253. // this is an expanded path, we need to check if it exists
  254. _, stat, err := zconn.Exists(path)
  255. if err != nil {
  256. return nil, err
  257. }
  258. if stat != nil {
  259. return []string{path}, nil
  260. }
  261. return nil, nil
  262. }
  263. // resolve paths like:
  264. // /zk/nyc/vt/tablets/*/action
  265. // /zk/global/vt/keyspaces/*/shards/*/action
  266. // /zk/*/vt/tablets/*/action
  267. // into real existing paths
  268. //
  269. // If you send paths that don't contain any wildcard and
  270. // don't exist, this function will return an empty array.
  271. func ResolveWildcards(zconn Conn, zkPaths []string) ([]string, error) {
  272. // check all the paths start with /zk/ before doing anything
  273. // time consuming
  274. // relax this in case we are not talking to a metaconn and
  275. // just want to talk to a specified instance.
  276. // for _, zkPath := range zkPaths {
  277. // if _, err := ZkCellFromZkPath(zkPath); err != nil {
  278. // return nil, err
  279. // }
  280. // }
  281. results := make([][]string, len(zkPaths))
  282. wg := &sync.WaitGroup{}
  283. mu := &sync.Mutex{}
  284. var firstError error
  285. for i, zkPath := range zkPaths {
  286. wg.Add(1)
  287. parts := strings.Split(zkPath, "/")
  288. go func(i int) {
  289. defer wg.Done()
  290. subResult, err := resolveRecursive(zconn, parts, true)
  291. if err != nil {
  292. mu.Lock()
  293. if firstError != nil {
  294. log.Infof("Multiple error: %v", err)
  295. } else {
  296. firstError = err
  297. }
  298. mu.Unlock()
  299. } else {
  300. results[i] = subResult
  301. }
  302. }(i)
  303. }
  304. wg.Wait()
  305. if firstError != nil {
  306. return nil, firstError
  307. }
  308. result := make([]string, 0, 32)
  309. for i := 0; i < len(zkPaths); i++ {
  310. subResult := results[i]
  311. if subResult != nil {
  312. result = append(result, subResult...)
  313. }
  314. }
  315. return result, nil
  316. }
  317. func DeleteRecursive(zconn Conn, zkPath string, version int) error {
  318. // version: -1 delete any version of the node at path - only applies to the top node
  319. err := zconn.Delete(zkPath, int32(version))
  320. if err == nil {
  321. return nil
  322. }
  323. if !ZkErrorEqual(err, zk.ErrNotEmpty) {
  324. return err
  325. }
  326. // Remove the ability for other nodes to get created while we are trying to delete.
  327. // Otherwise, you can enter a race condition, or get starved out from deleting.
  328. _, err = zconn.SetACL(zkPath, zk.WorldACL(zk.PermAdmin|zk.PermDelete|zk.PermRead), int32(version))
  329. if err != nil {
  330. return err
  331. }
  332. children, _, err := zconn.Children(zkPath)
  333. if err != nil {
  334. return err
  335. }
  336. for _, child := range children {
  337. err := DeleteRecursive(zconn, path.Join(zkPath, child), -1)
  338. if err != nil && !ZkErrorEqual(err, zk.ErrNoNode) {
  339. return fmt.Errorf("zkutil: recursive delete failed: %v", err)
  340. }
  341. }
  342. err = zconn.Delete(zkPath, int32(version))
  343. if err != nil && !ZkErrorEqual(err, zk.ErrNotEmpty) {
  344. err = fmt.Errorf("zkutil: nodes getting recreated underneath delete (app race condition): %v", zkPath)
  345. }
  346. return err
  347. }
  348. // The lexically lowest node is the lock holder - verify that this
  349. // path holds the lock. Call this queue-lock because the semantics are
  350. // a hybrid. Normal zk locks make assumptions about sequential
  351. // numbering that don't hold when the data in a lock is modified.
  352. // if the provided 'interrupted' chan is closed, we'll just stop waiting
  353. // and return an interruption error
  354. func ObtainQueueLock(zconn Conn, zkPath string, wait time.Duration, interrupted chan struct{}) error {
  355. queueNode := path.Dir(zkPath)
  356. lockNode := path.Base(zkPath)
  357. timer := time.NewTimer(wait)
  358. trylock:
  359. children, _, err := zconn.Children(queueNode)
  360. if err != nil {
  361. return fmt.Errorf("zkutil: trylock failed %v", err)
  362. }
  363. sort.Strings(children)
  364. if len(children) > 0 {
  365. if children[0] == lockNode {
  366. return nil
  367. }
  368. if wait > 0 {
  369. prevLock := ""
  370. for i := 1; i < len(children); i++ {
  371. if children[i] == lockNode {
  372. prevLock = children[i-1]
  373. break
  374. }
  375. }
  376. if prevLock == "" {
  377. return fmt.Errorf("zkutil: no previous queue node found: %v", zkPath)
  378. }
  379. zkPrevLock := path.Join(queueNode, prevLock)
  380. _, stat, watch, err := zconn.ExistsW(zkPrevLock)
  381. if err != nil {
  382. return fmt.Errorf("zkutil: unable to watch queued node %v %v", zkPrevLock, err)
  383. }
  384. if stat == nil {
  385. goto trylock
  386. }
  387. select {
  388. case <-timer.C:
  389. break
  390. case <-interrupted:
  391. return ErrInterrupted
  392. case <-watch:
  393. // The precise event doesn't matter - try to read again regardless.
  394. goto trylock
  395. }
  396. }
  397. return ErrTimeout
  398. }
  399. return fmt.Errorf("zkutil: empty queue node: %v", queueNode)
  400. }
  401. func ZkEventOk(e zk.Event) bool {
  402. return e.State == zk.StateConnected
  403. }
  404. func NodeExists(zconn Conn, zkPath string) (bool, error) {
  405. b, _, err := zconn.Exists(zkPath)
  406. return b, err
  407. }
  408. // Close the release channel when you want to clean up nicely.
  409. func CreatePidNode(zconn Conn, zkPath string, contents string, done chan struct{}) error {
  410. // On the first try, assume the cluster is up and running, that will
  411. // help hunt down any config issues present at startup
  412. if _, err := zconn.Create(zkPath, []byte(contents), zk.FlagEphemeral, zk.WorldACL(PERM_FILE)); err != nil {
  413. if ZkErrorEqual(err, zk.ErrNodeExists) {
  414. err = zconn.Delete(zkPath, -1)
  415. }
  416. if err != nil {
  417. return fmt.Errorf("zkutil: failed deleting pid node: %v: %v", zkPath, err)
  418. }
  419. _, err = zconn.Create(zkPath, []byte(contents), zk.FlagEphemeral, zk.WorldACL(PERM_FILE))
  420. if err != nil {
  421. return fmt.Errorf("zkutil: failed creating pid node: %v: %v", zkPath, err)
  422. }
  423. }
  424. go func() {
  425. for {
  426. _, _, watch, err := zconn.GetW(zkPath)
  427. if err != nil {
  428. if ZkErrorEqual(err, zk.ErrNoNode) {
  429. _, err = zconn.Create(zkPath, []byte(contents), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
  430. if err != nil {
  431. log.Warningf("failed recreating pid node: %v: %v", zkPath, err)
  432. } else {
  433. log.Infof("recreated pid node: %v", zkPath)
  434. continue
  435. }
  436. } else {
  437. log.Warningf("failed reading pid node: %v", err)
  438. }
  439. } else {
  440. select {
  441. case event := <-watch:
  442. if ZkEventOk(event) && event.Type == zk.EventNodeDeleted {
  443. // Most likely another process has started up. However,
  444. // there is a chance that an ephemeral node is deleted by
  445. // the session expiring, yet that same session gets a watch
  446. // notification. This seems like buggy behavior, but rather
  447. // than race too hard on the node, just wait a bit and see
  448. // if the situation resolves itself.
  449. log.Warningf("pid deleted: %v", zkPath)
  450. } else {
  451. log.Infof("pid node event: %v", event)
  452. }
  453. // break here and wait for a bit before attempting
  454. case <-done:
  455. log.Infof("pid watcher stopped on done: %v", zkPath)
  456. return
  457. }
  458. }
  459. select {
  460. // No one likes a thundering herd, least of all zk.
  461. case <-time.After(5*time.Second + time.Duration(rand.Int63n(55e9))):
  462. case <-done:
  463. log.Infof("pid watcher stopped on done: %v", zkPath)
  464. return
  465. }
  466. }
  467. }()
  468. return nil
  469. }
  470. // ZLocker is an interface for a lock that can fail.
  471. type ZLocker interface {
  472. Lock(desc string) error
  473. LockWithTimeout(wait time.Duration, desc string) error
  474. Unlock() error
  475. Interrupt()
  476. }
  477. // Experiment with a little bit of abstraction.
  478. // FIMXE(msolo) This object may need a mutex to ensure it can be shared
  479. // across goroutines.
  480. type zMutex struct {
  481. mu sync.Mutex
  482. zconn Conn
  483. path string // Path under which we try to create lock nodes.
  484. contents string
  485. interrupted chan struct{}
  486. name string // The name of the specific lock node we created.
  487. ephemeral bool
  488. }
  489. // CreateMutex initializes an unaquired mutex. A mutex is released only
  490. // by Unlock. You can clean up a mutex with delete, but you should be
  491. // careful doing so.
  492. func CreateMutex(zconn Conn, zkPath string) ZLocker {
  493. zm, err := CreateMutexWithContents(zconn, zkPath, map[string]interface{}{})
  494. if err != nil {
  495. panic(err) // should never happen
  496. }
  497. return zm
  498. }
  499. // CreateMutex initializes an unaquired mutex with special content for this mutex.
  500. // A mutex is released only by Unlock. You can clean up a mutex with delete, but you should be
  501. // careful doing so.
  502. func CreateMutexWithContents(zconn Conn, zkPath string, contents map[string]interface{}) (ZLocker, error) {
  503. hostname, err := os.Hostname()
  504. if err != nil {
  505. return nil, err
  506. }
  507. pid := os.Getpid()
  508. contents["hostname"] = hostname
  509. contents["pid"] = pid
  510. data, err := json.Marshal(contents)
  511. if err != nil {
  512. return nil, err
  513. }
  514. return &zMutex{zconn: zconn, path: zkPath, contents: string(data), interrupted: make(chan struct{})}, nil
  515. }
  516. // Interrupt releases a lock that's held.
  517. func (zm *zMutex) Interrupt() {
  518. select {
  519. case zm.interrupted <- struct{}{}:
  520. default:
  521. log.Warningf("zmutex interrupt blocked")
  522. }
  523. }
  524. // Lock returns nil when the lock is acquired.
  525. func (zm *zMutex) Lock(desc string) error {
  526. return zm.LockWithTimeout(365*24*time.Hour, desc)
  527. }
  528. // LockWithTimeout returns nil when the lock is acquired. A lock is
  529. // held if the file exists and you are the creator. Setting the wait
  530. // to zero makes this a nonblocking lock check.
  531. //
  532. // FIXME(msolo) Disallow non-super users from removing the lock?
  533. func (zm *zMutex) LockWithTimeout(wait time.Duration, desc string) (err error) {
  534. timer := time.NewTimer(wait)
  535. defer func() {
  536. if panicErr := recover(); panicErr != nil || err != nil {
  537. zm.deleteLock()
  538. }
  539. }()
  540. // Ensure the rendezvous node is here.
  541. // FIXME(msolo) Assuming locks are contended, it will be cheaper to assume this just
  542. // exists.
  543. _, err = CreateRecursive(zm.zconn, zm.path, "", 0, zk.WorldACL(PERM_DIRECTORY))
  544. if err != nil && !ZkErrorEqual(err, zk.ErrNodeExists) {
  545. return err
  546. }
  547. lockPrefix := path.Join(zm.path, "lock-")
  548. zflags := zk.FlagSequence
  549. if zm.ephemeral {
  550. zflags = zflags | zk.FlagEphemeral
  551. }
  552. // update node content
  553. var lockContent map[string]interface{}
  554. err = json.Unmarshal([]byte(zm.contents), &lockContent)
  555. if err != nil {
  556. return err
  557. }
  558. lockContent["desc"] = desc
  559. newContent, err := json.Marshal(lockContent)
  560. if err != nil {
  561. return err
  562. }
  563. createlock:
  564. lockCreated, err := zm.zconn.Create(lockPrefix, newContent, int32(zflags), zk.WorldACL(PERM_FILE))
  565. if err != nil {
  566. return err
  567. }
  568. name := path.Base(lockCreated)
  569. zm.mu.Lock()
  570. zm.name = name
  571. zm.mu.Unlock()
  572. trylock:
  573. children, _, err := zm.zconn.Children(zm.path)
  574. if err != nil {
  575. return fmt.Errorf("zkutil: trylock failed %v", err)
  576. }
  577. sort.Strings(children)
  578. if len(children) == 0 {
  579. return fmt.Errorf("zkutil: empty lock: %v", zm.path)
  580. }
  581. if children[0] == name {
  582. // We are the lock owner.
  583. return nil
  584. }
  585. // This is the degenerate case of a nonblocking lock check. It's not optimal, but
  586. // also probably not worth optimizing.
  587. if wait == 0 {
  588. return ErrTimeout
  589. }
  590. prevLock := ""
  591. for i := 1; i < len(children); i++ {
  592. if children[i] == name {
  593. prevLock = children[i-1]
  594. break
  595. }
  596. }
  597. if prevLock == "" {
  598. // This is an interesting case. The node disappeared
  599. // underneath us, probably due to a session loss. We can
  600. // recreate the lock node (with a new sequence number) and
  601. // keep trying.
  602. log.Warningf("zkutil: no lock node found: %v/%v", zm.path, zm.name)
  603. goto createlock
  604. }
  605. zkPrevLock := path.Join(zm.path, prevLock)
  606. exist, stat, watch, err := zm.zconn.ExistsW(zkPrevLock)
  607. if err != nil {
  608. // FIXME(msolo) Should this be a retry?
  609. return fmt.Errorf("zkutil: unable to watch previous lock node %v %v", zkPrevLock, err)
  610. }
  611. if stat == nil || !exist {
  612. goto trylock
  613. }
  614. select {
  615. case <-timer.C:
  616. return ErrTimeout
  617. case <-zm.interrupted:
  618. return ErrInterrupted
  619. case event := <-watch:
  620. log.Infof("zkutil: lock event: %v", event)
  621. // The precise event doesn't matter - try to read again regardless.
  622. goto trylock
  623. }
  624. panic("unexpected")
  625. }
  626. // Unlock returns nil if the lock was successfully
  627. // released. Otherwise, it is most likely a zk related error.
  628. func (zm *zMutex) Unlock() error {
  629. return zm.deleteLock()
  630. }
  631. func (zm *zMutex) deleteLock() error {
  632. zm.mu.Lock()
  633. zpath := path.Join(zm.path, zm.name)
  634. zm.mu.Unlock()
  635. err := zm.zconn.Delete(zpath, -1)
  636. if err != nil && !ZkErrorEqual(err, zk.ErrNoNode) {
  637. return err
  638. }
  639. return nil
  640. }
  641. // ZElector stores basic state for running an election.
  642. type ZElector struct {
  643. *zMutex
  644. path string
  645. leader string
  646. }
  647. func (ze *ZElector) isLeader() bool {
  648. return ze.leader == ze.name
  649. }
  650. type electionEvent struct {
  651. Event int
  652. Err error
  653. }
  654. type backoffDelay struct {
  655. min time.Duration
  656. max time.Duration
  657. delay time.Duration
  658. }
  659. func newBackoffDelay(min, max time.Duration) *backoffDelay {
  660. return &backoffDelay{min, max, min}
  661. }
  662. func (bd *backoffDelay) NextDelay() time.Duration {
  663. delay := bd.delay
  664. bd.delay = 2 * bd.delay
  665. if bd.delay > bd.max {
  666. bd.delay = bd.max
  667. }
  668. return delay
  669. }
  670. func (bd *backoffDelay) Reset() {
  671. bd.delay = bd.min
  672. }
  673. // ElectorTask is the interface for a task that runs essentially
  674. // forever or until something bad happens. If a task must be stopped,
  675. // it should be handled promptly - no second notification will be
  676. // sent.
  677. type ElectorTask interface {
  678. Run() error
  679. Stop()
  680. // Return true if interrupted, false if it died of natural causes.
  681. // An interrupted task indicates that the election should stop.
  682. Interrupted() bool
  683. }
  684. // CreateElection returns an initialized elector. An election is
  685. // really a cycle of events. You are flip-flopping between leader and
  686. // candidate. It's better to think of this as a stream of events that
  687. // one needs to react to.
  688. func CreateElection(zconn Conn, zkPath string) ZElector {
  689. zm, err := CreateElectionWithContents(zconn, zkPath, map[string]interface{}{})
  690. if err != nil {
  691. // should never happend
  692. panic(err)
  693. }
  694. return zm
  695. }
  696. // CreateElection returns an initialized elector with special contents. An election is
  697. // really a cycle of events. You are flip-flopping between leader and
  698. // candidate. It's better to think of this as a stream of events that
  699. // one needs to react to.
  700. func CreateElectionWithContents(zconn Conn, zkPath string, contents map[string]interface{}) (ZElector, error) {
  701. l, err := CreateMutexWithContents(zconn, path.Join(zkPath, "candidates"), contents)
  702. if err != nil {
  703. return ZElector{}, err
  704. }
  705. zm := l.(*zMutex)
  706. zm.ephemeral = true
  707. return ZElector{zMutex: zm, path: zkPath}, nil
  708. }
  709. // RunTask returns nil when the underlyingtask ends or the error it
  710. // generated.
  711. func (ze *ZElector) RunTask(task ElectorTask) error {
  712. delay := newBackoffDelay(100*time.Millisecond, 1*time.Minute)
  713. leaderPath := path.Join(ze.path, "leader")
  714. for {
  715. _, err := CreateRecursive(ze.zconn, leaderPath, "", 0, zk.WorldACL(PERM_FILE))
  716. if err == nil || ZkErrorEqual(err, zk.ErrNodeExists) {
  717. break
  718. }
  719. log.Warningf("election leader create failed: %v", err)
  720. time.Sleep(delay.NextDelay())
  721. }
  722. for {
  723. err := ze.Lock("RunTask")
  724. if err != nil {
  725. log.Warningf("election lock failed: %v", err)
  726. if err == ErrInterrupted {
  727. return ErrInterrupted
  728. }
  729. continue
  730. }
  731. // Confirm your win and deliver acceptance speech. This notifies
  732. // listeners who will have been watching the leader node for
  733. // changes.
  734. _, err = ze.zconn.Set(leaderPath, []byte(ze.contents), -1)
  735. if err != nil {
  736. log.Warningf("election promotion failed: %v", err)
  737. continue
  738. }
  739. log.Infof("election promote leader %v", leaderPath)
  740. taskErrChan := make(chan error)
  741. go func() {
  742. taskErrChan <- task.Run()
  743. }()
  744. watchLeader:
  745. // Watch the leader so we can get notified if something goes wrong.
  746. data, _, watch, err := ze.zconn.GetW(leaderPath)
  747. if err != nil {
  748. log.Warningf("election unable to watch leader node %v %v", leaderPath, err)
  749. // FIXME(msolo) Add delay
  750. goto watchLeader
  751. }
  752. if string(data) != ze.contents {
  753. log.Warningf("election unable to promote leader")
  754. task.Stop()
  755. // We won the election, but we didn't become the leader. How is that possible?
  756. // (see Bush v. Gore for some inspiration)
  757. // It means:
  758. // 1. Someone isn't playing by the election rules (a bad actor).
  759. // Hard to detect - let's assume we don't have this problem. :)
  760. // 2. We lost our connection somehow and the ephemeral lock was cleared,
  761. // allowing someone else to win the election.
  762. continue
  763. }
  764. // This is where we start our target process and watch for its failure.
  765. waitForEvent:
  766. select {
  767. case <-ze.interrupted:
  768. log.Warning("election interrupted - stop child process")
  769. task.Stop()
  770. // Once the process dies from the signal, this will all tear down.
  771. goto waitForEvent
  772. case taskErr := <-taskErrChan:
  773. // If our code fails, unlock to trigger an election.
  774. log.Infof("election child process ended: %v", taskErr)
  775. ze.Unlock()
  776. if task.Interrupted() {
  777. log.Warningf("election child process interrupted - stepping down")
  778. return ErrInterrupted
  779. }
  780. continue
  781. case zevent := <-watch:
  782. // We had a zk connection hiccup. We have a few choices,
  783. // but it depends on the constraints and the events.
  784. //
  785. // If we get SESSION_EXPIRED our connection loss triggered an
  786. // election that we won't have won and the thus the lock was
  787. // automatically freed. We have no choice but to start over.
  788. if zevent.State == zk.StateExpired {
  789. log.Warningf("election leader watch expired")
  790. task.Stop()
  791. continue
  792. }
  793. // Otherwise, we had an intermittent issue or something touched
  794. // the node. Either we lost our position or someone broke
  795. // protocol and touched the leader node. We just reconnect and
  796. // revalidate. In the meantime, assume we are still the leader
  797. // until we determine otherwise.
  798. //
  799. // On a reconnect we will be able to see the leader
  800. // information. If we still hold the position, great. If not, we
  801. // kill the associated process.
  802. //
  803. // On a leader node change, we need to perform the same
  804. // validation. It's possible an election completes without the
  805. // old leader realizing he is out of touch.
  806. log.Warningf("election leader watch event %v", zevent)
  807. goto watchLeader
  808. }
  809. }
  810. panic("unreachable")
  811. }