You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1385 lines
35 KiB

  1. /*
  2. Package couchbase provides a smart client for go.
  3. Usage:
  4. client, err := couchbase.Connect("http://myserver:8091/")
  5. handleError(err)
  6. pool, err := client.GetPool("default")
  7. handleError(err)
  8. bucket, err := pool.GetBucket("MyAwesomeBucket")
  9. handleError(err)
  10. ...
  11. or a shortcut for the bucket directly
  12. bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default")
  13. in any case, you can specify authentication credentials using
  14. standard URL userinfo syntax:
  15. b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/",
  16. "default", "bucket")
  17. */
  18. package couchbase
  19. import (
  20. "encoding/binary"
  21. "encoding/json"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "runtime"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "github.com/couchbase/gomemcached"
  32. "github.com/couchbase/gomemcached/client" // package name is 'memcached'
  33. "github.com/couchbase/goutils/logging"
  34. )
  35. // Mutation Token
  36. type MutationToken struct {
  37. VBid uint16 // vbucket id
  38. Guard uint64 // vbuuid
  39. Value uint64 // sequence number
  40. }
  41. // Maximum number of times to retry a chunk of a bulk get on error.
  42. var MaxBulkRetries = 5000
  43. var backOffDuration time.Duration = 100 * time.Millisecond
  44. var MaxBackOffRetries = 25 // exponentail backOff result in over 30sec (25*13*0.1s)
  45. // If this is set to a nonzero duration, Do() and ViewCustom() will log a warning if the call
  46. // takes longer than that.
  47. var SlowServerCallWarningThreshold time.Duration
  48. func slowLog(startTime time.Time, format string, args ...interface{}) {
  49. if elapsed := time.Now().Sub(startTime); elapsed > SlowServerCallWarningThreshold {
  50. pc, _, _, _ := runtime.Caller(2)
  51. caller := runtime.FuncForPC(pc).Name()
  52. logging.Infof("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...)
  53. }
  54. }
  55. // Return true if error is KEY_ENOENT. Required by cbq-engine
  56. func IsKeyEExistsError(err error) bool {
  57. res, ok := err.(*gomemcached.MCResponse)
  58. if ok && res.Status == gomemcached.KEY_EEXISTS {
  59. return true
  60. }
  61. return false
  62. }
  63. // Return true if error is KEY_ENOENT. Required by cbq-engine
  64. func IsKeyNoEntError(err error) bool {
  65. res, ok := err.(*gomemcached.MCResponse)
  66. if ok && res.Status == gomemcached.KEY_ENOENT {
  67. return true
  68. }
  69. return false
  70. }
  71. // Return true if error suggests a bucket refresh is required. Required by cbq-engine
  72. func IsRefreshRequired(err error) bool {
  73. res, ok := err.(*gomemcached.MCResponse)
  74. if ok && (res.Status == gomemcached.NO_BUCKET || res.Status == gomemcached.NOT_MY_VBUCKET) {
  75. return true
  76. }
  77. return false
  78. }
  79. // ClientOpCallback is called for each invocation of Do.
  80. var ClientOpCallback func(opname, k string, start time.Time, err error)
  81. // Do executes a function on a memcached connection to the node owning key "k"
  82. //
  83. // Note that this automatically handles transient errors by replaying
  84. // your function on a "not-my-vbucket" error, so don't assume
  85. // your command will only be executed only once.
  86. func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) {
  87. return b.Do2(k, f, true)
  88. }
  89. func (b *Bucket) Do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool) (err error) {
  90. if SlowServerCallWarningThreshold > 0 {
  91. defer slowLog(time.Now(), "call to Do(%q)", k)
  92. }
  93. vb := b.VBHash(k)
  94. maxTries := len(b.Nodes()) * 2
  95. for i := 0; i < maxTries; i++ {
  96. conn, pool, err := b.getConnectionToVBucket(vb)
  97. if err != nil {
  98. if isConnError(err) && backOff(i, maxTries, backOffDuration, true) {
  99. b.Refresh()
  100. continue
  101. }
  102. return err
  103. }
  104. if deadline && DefaultTimeout > 0 {
  105. conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
  106. err = f(conn, uint16(vb))
  107. conn.SetDeadline(noDeadline)
  108. } else {
  109. err = f(conn, uint16(vb))
  110. }
  111. var retry bool
  112. discard := isOutOfBoundsError(err)
  113. // MB-30967 / MB-31001 implement back off for transient errors
  114. if resp, ok := err.(*gomemcached.MCResponse); ok {
  115. switch resp.Status {
  116. case gomemcached.NOT_MY_VBUCKET:
  117. b.Refresh()
  118. // MB-28842: in case of NMVB, check if the node is still part of the map
  119. // and ditch the connection if it isn't.
  120. discard = b.checkVBmap(pool.Node())
  121. retry = true
  122. case gomemcached.NOT_SUPPORTED:
  123. discard = true
  124. retry = true
  125. case gomemcached.ENOMEM:
  126. fallthrough
  127. case gomemcached.TMPFAIL:
  128. retry = backOff(i, maxTries, backOffDuration, true)
  129. default:
  130. retry = false
  131. }
  132. } else if err != nil && isConnError(err) && backOff(i, maxTries, backOffDuration, true) {
  133. retry = true
  134. }
  135. if discard {
  136. pool.Discard(conn)
  137. } else {
  138. pool.Return(conn)
  139. }
  140. if !retry {
  141. return err
  142. }
  143. }
  144. return fmt.Errorf("unable to complete action after %v attemps", maxTries)
  145. }
  146. type GatheredStats struct {
  147. Server string
  148. Stats map[string]string
  149. Err error
  150. }
  151. func getStatsParallel(sn string, b *Bucket, offset int, which string,
  152. ch chan<- GatheredStats) {
  153. pool := b.getConnPool(offset)
  154. var gatheredStats GatheredStats
  155. conn, err := pool.Get()
  156. defer func() {
  157. pool.Return(conn)
  158. ch <- gatheredStats
  159. }()
  160. if err != nil {
  161. gatheredStats = GatheredStats{Server: sn, Err: err}
  162. } else {
  163. sm, err := conn.StatsMap(which)
  164. gatheredStats = GatheredStats{Server: sn, Stats: sm, Err: err}
  165. }
  166. }
  167. // GetStats gets a set of stats from all servers.
  168. //
  169. // Returns a map of server ID -> map of stat key to map value.
  170. func (b *Bucket) GetStats(which string) map[string]map[string]string {
  171. rv := map[string]map[string]string{}
  172. for server, gs := range b.GatherStats(which) {
  173. if len(gs.Stats) > 0 {
  174. rv[server] = gs.Stats
  175. }
  176. }
  177. return rv
  178. }
  179. // GatherStats returns a map of server ID -> GatheredStats from all servers.
  180. func (b *Bucket) GatherStats(which string) map[string]GatheredStats {
  181. vsm := b.VBServerMap()
  182. if vsm.ServerList == nil {
  183. return nil
  184. }
  185. // Go grab all the things at once.
  186. ch := make(chan GatheredStats, len(vsm.ServerList))
  187. for i, sn := range vsm.ServerList {
  188. go getStatsParallel(sn, b, i, which, ch)
  189. }
  190. // Gather the results
  191. rv := map[string]GatheredStats{}
  192. for range vsm.ServerList {
  193. gs := <-ch
  194. rv[gs.Server] = gs
  195. }
  196. return rv
  197. }
  198. // Get bucket count through the bucket stats
  199. func (b *Bucket) GetCount(refresh bool) (count int64, err error) {
  200. if refresh {
  201. b.Refresh()
  202. }
  203. var cnt int64
  204. for _, gs := range b.GatherStats("") {
  205. if len(gs.Stats) > 0 {
  206. cnt, err = strconv.ParseInt(gs.Stats["curr_items"], 10, 64)
  207. if err != nil {
  208. return 0, err
  209. }
  210. count += cnt
  211. }
  212. }
  213. return count, nil
  214. }
  215. func isAuthError(err error) bool {
  216. estr := err.Error()
  217. return strings.Contains(estr, "Auth failure")
  218. }
  219. func IsReadTimeOutError(err error) bool {
  220. estr := err.Error()
  221. return strings.Contains(estr, "read tcp") ||
  222. strings.Contains(estr, "i/o timeout")
  223. }
  224. func isTimeoutError(err error) bool {
  225. estr := err.Error()
  226. return strings.Contains(estr, "i/o timeout") ||
  227. strings.Contains(estr, "connection timed out") ||
  228. strings.Contains(estr, "no route to host")
  229. }
  230. // Errors that are not considered fatal for our fetch loop
  231. func isConnError(err error) bool {
  232. if err == io.EOF {
  233. return true
  234. }
  235. estr := err.Error()
  236. return strings.Contains(estr, "broken pipe") ||
  237. strings.Contains(estr, "connection reset") ||
  238. strings.Contains(estr, "connection refused") ||
  239. strings.Contains(estr, "connection pool is closed")
  240. }
  241. func isOutOfBoundsError(err error) bool {
  242. return err != nil && strings.Contains(err.Error(), "Out of Bounds error")
  243. }
  244. func getDeadline(reqDeadline time.Time, duration time.Duration) time.Time {
  245. if reqDeadline.IsZero() && duration > 0 {
  246. return time.Now().Add(duration)
  247. }
  248. return reqDeadline
  249. }
  250. func backOff(attempt, maxAttempts int, duration time.Duration, exponential bool) bool {
  251. if attempt < maxAttempts {
  252. // 0th attempt return immediately
  253. if attempt > 0 {
  254. if exponential {
  255. duration = time.Duration(attempt) * duration
  256. }
  257. time.Sleep(duration)
  258. }
  259. return true
  260. }
  261. return false
  262. }
  263. func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time,
  264. ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string,
  265. eStatus *errorStatus) {
  266. if SlowServerCallWarningThreshold > 0 {
  267. defer slowLog(time.Now(), "call to doBulkGet(%d, %d keys)", vb, len(keys))
  268. }
  269. rv := _STRING_MCRESPONSE_POOL.Get()
  270. attempts := 0
  271. backOffAttempts := 0
  272. done := false
  273. bname := b.Name
  274. for ; attempts < MaxBulkRetries && !done && !eStatus.errStatus; attempts++ {
  275. if len(b.VBServerMap().VBucketMap) < int(vb) {
  276. //fatal
  277. err := fmt.Errorf("vbmap smaller than requested for %v", bname)
  278. logging.Errorf("go-couchbase: %v vb %d vbmap len %d", err.Error(), vb, len(b.VBServerMap().VBucketMap))
  279. ech <- err
  280. return
  281. }
  282. masterID := b.VBServerMap().VBucketMap[vb][0]
  283. if masterID < 0 {
  284. // fatal
  285. err := fmt.Errorf("No master node available for %v vb %d", bname, vb)
  286. logging.Errorf("%v", err.Error())
  287. ech <- err
  288. return
  289. }
  290. // This stack frame exists to ensure we can clean up
  291. // connection at a reasonable time.
  292. err := func() error {
  293. pool := b.getConnPool(masterID)
  294. conn, err := pool.Get()
  295. if err != nil {
  296. if isAuthError(err) || isTimeoutError(err) {
  297. logging.Errorf("Fatal Error %v : %v", bname, err)
  298. ech <- err
  299. return err
  300. } else if isConnError(err) {
  301. if !backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
  302. logging.Errorf("Connection Error %v : %v", bname, err)
  303. ech <- err
  304. return err
  305. }
  306. b.Refresh()
  307. backOffAttempts++
  308. }
  309. logging.Infof("Pool Get returned %v: %v", bname, err)
  310. // retry
  311. return nil
  312. }
  313. conn.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
  314. err = conn.GetBulk(vb, keys, rv, subPaths)
  315. conn.SetDeadline(noDeadline)
  316. discard := false
  317. defer func() {
  318. if discard {
  319. pool.Discard(conn)
  320. } else {
  321. pool.Return(conn)
  322. }
  323. }()
  324. switch err.(type) {
  325. case *gomemcached.MCResponse:
  326. notSMaxTries := len(b.Nodes()) * 2
  327. st := err.(*gomemcached.MCResponse).Status
  328. if st == gomemcached.NOT_MY_VBUCKET || (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) {
  329. b.Refresh()
  330. discard = b.checkVBmap(pool.Node())
  331. return nil // retry
  332. } else if st == gomemcached.EBUSY || st == gomemcached.LOCKED {
  333. if (attempts % (MaxBulkRetries / 100)) == 0 {
  334. logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
  335. err.Error(), bname, vb, keys)
  336. }
  337. return nil // retry
  338. } else if (st == gomemcached.ENOMEM || st == gomemcached.TMPFAIL) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
  339. // MB-30967 / MB-31001 use backoff for TMPFAIL too
  340. backOffAttempts++
  341. logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
  342. err.Error(), bname, vb, keys)
  343. return nil // retry
  344. }
  345. ech <- err
  346. return err
  347. case error:
  348. if isOutOfBoundsError(err) {
  349. // We got an out of bound error, retry the operation
  350. discard = true
  351. return nil
  352. } else if isConnError(err) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
  353. backOffAttempts++
  354. logging.Errorf("Connection Error: %s. Refreshing bucket %v (vbid:%v,keys:<ud>%v</ud>)",
  355. err.Error(), bname, vb, keys)
  356. discard = true
  357. b.Refresh()
  358. return nil // retry
  359. }
  360. ech <- err
  361. ch <- rv
  362. return err
  363. }
  364. done = true
  365. return nil
  366. }()
  367. if err != nil {
  368. return
  369. }
  370. }
  371. if attempts >= MaxBulkRetries {
  372. err := fmt.Errorf("bulkget exceeded MaxBulkRetries for %v(vbid:%d,keys:<ud>%v</ud>)", bname, vb, keys)
  373. logging.Errorf("%v", err.Error())
  374. ech <- err
  375. }
  376. ch <- rv
  377. }
  378. type errorStatus struct {
  379. errStatus bool
  380. }
  381. type vbBulkGet struct {
  382. b *Bucket
  383. ch chan<- map[string]*gomemcached.MCResponse
  384. ech chan<- error
  385. k uint16
  386. keys []string
  387. reqDeadline time.Time
  388. wg *sync.WaitGroup
  389. subPaths []string
  390. groupError *errorStatus
  391. }
  392. const _NUM_CHANNELS = 5
  393. var _NUM_CHANNEL_WORKERS = (runtime.NumCPU() + 1) / 2
  394. var DefaultDialTimeout = time.Duration(0)
  395. var DefaultTimeout = time.Duration(0)
  396. var noDeadline = time.Time{}
  397. // Buffer 4k requests per worker
  398. var _VB_BULK_GET_CHANNELS []chan *vbBulkGet
  399. func InitBulkGet() {
  400. DefaultDialTimeout = 20 * time.Second
  401. DefaultTimeout = 120 * time.Second
  402. memcached.SetDefaultDialTimeout(DefaultDialTimeout)
  403. _VB_BULK_GET_CHANNELS = make([]chan *vbBulkGet, _NUM_CHANNELS)
  404. for i := 0; i < _NUM_CHANNELS; i++ {
  405. channel := make(chan *vbBulkGet, 16*1024*_NUM_CHANNEL_WORKERS)
  406. _VB_BULK_GET_CHANNELS[i] = channel
  407. for j := 0; j < _NUM_CHANNEL_WORKERS; j++ {
  408. go vbBulkGetWorker(channel)
  409. }
  410. }
  411. }
  412. func vbBulkGetWorker(ch chan *vbBulkGet) {
  413. defer func() {
  414. // Workers cannot panic and die
  415. recover()
  416. go vbBulkGetWorker(ch)
  417. }()
  418. for vbg := range ch {
  419. vbDoBulkGet(vbg)
  420. }
  421. }
  422. func vbDoBulkGet(vbg *vbBulkGet) {
  423. defer vbg.wg.Done()
  424. defer func() {
  425. // Workers cannot panic and die
  426. recover()
  427. }()
  428. vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.reqDeadline, vbg.ch, vbg.ech, vbg.subPaths, vbg.groupError)
  429. }
  430. var _ERR_CHAN_FULL = fmt.Errorf("Data request queue full, aborting query.")
  431. func (b *Bucket) processBulkGet(kdm map[uint16][]string, reqDeadline time.Time,
  432. ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string,
  433. eStatus *errorStatus) {
  434. defer close(ch)
  435. defer close(ech)
  436. wg := &sync.WaitGroup{}
  437. for k, keys := range kdm {
  438. // GetBulk() group has error donot Queue items for this group
  439. if eStatus.errStatus {
  440. break
  441. }
  442. vbg := &vbBulkGet{
  443. b: b,
  444. ch: ch,
  445. ech: ech,
  446. k: k,
  447. keys: keys,
  448. reqDeadline: reqDeadline,
  449. wg: wg,
  450. subPaths: subPaths,
  451. groupError: eStatus,
  452. }
  453. wg.Add(1)
  454. // Random int
  455. // Right shift to avoid 8-byte alignment, and take low bits
  456. c := (uintptr(unsafe.Pointer(vbg)) >> 4) % _NUM_CHANNELS
  457. select {
  458. case _VB_BULK_GET_CHANNELS[c] <- vbg:
  459. // No-op
  460. default:
  461. // Buffer full, abandon the bulk get
  462. ech <- _ERR_CHAN_FULL
  463. wg.Add(-1)
  464. }
  465. }
  466. // Wait for my vb bulk gets
  467. wg.Wait()
  468. }
  469. type multiError []error
  470. func (m multiError) Error() string {
  471. if len(m) == 0 {
  472. panic("Error of none")
  473. }
  474. return fmt.Sprintf("{%v errors, starting with %v}", len(m), m[0].Error())
  475. }
  476. // Convert a stream of errors from ech into a multiError (or nil) and
  477. // send down eout.
  478. //
  479. // At least one send is guaranteed on eout, but two is possible, so
  480. // buffer the out channel appropriately.
  481. func errorCollector(ech <-chan error, eout chan<- error, eStatus *errorStatus) {
  482. defer func() { eout <- nil }()
  483. var errs multiError
  484. for e := range ech {
  485. if !eStatus.errStatus && !IsKeyNoEntError(e) {
  486. eStatus.errStatus = true
  487. }
  488. errs = append(errs, e)
  489. }
  490. if len(errs) > 0 {
  491. eout <- errs
  492. }
  493. }
  494. // Fetches multiple keys concurrently, with []byte values
  495. //
  496. // This is a wrapper around GetBulk which converts all values returned
  497. // by GetBulk from raw memcached responses into []byte slices.
  498. // Returns one document for duplicate keys
  499. func (b *Bucket) GetBulkRaw(keys []string) (map[string][]byte, error) {
  500. resp, eout := b.getBulk(keys, noDeadline, nil)
  501. rv := make(map[string][]byte, len(keys))
  502. for k, av := range resp {
  503. rv[k] = av.Body
  504. }
  505. b.ReleaseGetBulkPools(resp)
  506. return rv, eout
  507. }
  508. // GetBulk fetches multiple keys concurrently.
  509. //
  510. // Unlike more convenient GETs, the entire response is returned in the
  511. // map array for each key. Keys that were not found will not be included in
  512. // the map.
  513. func (b *Bucket) GetBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) {
  514. return b.getBulk(keys, reqDeadline, subPaths)
  515. }
  516. func (b *Bucket) ReleaseGetBulkPools(rv map[string]*gomemcached.MCResponse) {
  517. _STRING_MCRESPONSE_POOL.Put(rv)
  518. }
  519. func (b *Bucket) getBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) {
  520. kdm := _VB_STRING_POOL.Get()
  521. defer _VB_STRING_POOL.Put(kdm)
  522. for _, k := range keys {
  523. if k != "" {
  524. vb := uint16(b.VBHash(k))
  525. a, ok1 := kdm[vb]
  526. if !ok1 {
  527. a = _STRING_POOL.Get()
  528. }
  529. kdm[vb] = append(a, k)
  530. }
  531. }
  532. eout := make(chan error, 2)
  533. groupErrorStatus := &errorStatus{}
  534. // processBulkGet will own both of these channels and
  535. // guarantee they're closed before it returns.
  536. ch := make(chan map[string]*gomemcached.MCResponse)
  537. ech := make(chan error)
  538. go errorCollector(ech, eout, groupErrorStatus)
  539. go b.processBulkGet(kdm, reqDeadline, ch, ech, subPaths, groupErrorStatus)
  540. var rv map[string]*gomemcached.MCResponse
  541. for m := range ch {
  542. if rv == nil {
  543. rv = m
  544. continue
  545. }
  546. for k, v := range m {
  547. rv[k] = v
  548. }
  549. _STRING_MCRESPONSE_POOL.Put(m)
  550. }
  551. return rv, <-eout
  552. }
  553. // WriteOptions is the set of option flags availble for the Write
  554. // method. They are ORed together to specify the desired request.
  555. type WriteOptions int
  556. const (
  557. // Raw specifies that the value is raw []byte or nil; don't
  558. // JSON-encode it.
  559. Raw = WriteOptions(1 << iota)
  560. // AddOnly indicates an item should only be written if it
  561. // doesn't exist, otherwise ErrKeyExists is returned.
  562. AddOnly
  563. // Persist causes the operation to block until the server
  564. // confirms the item is persisted.
  565. Persist
  566. // Indexable causes the operation to block until it's availble via the index.
  567. Indexable
  568. // Append indicates the given value should be appended to the
  569. // existing value for the given key.
  570. Append
  571. )
  572. var optNames = []struct {
  573. opt WriteOptions
  574. name string
  575. }{
  576. {Raw, "raw"},
  577. {AddOnly, "addonly"}, {Persist, "persist"},
  578. {Indexable, "indexable"}, {Append, "append"},
  579. }
  580. // String representation of WriteOptions
  581. func (w WriteOptions) String() string {
  582. f := []string{}
  583. for _, on := range optNames {
  584. if w&on.opt != 0 {
  585. f = append(f, on.name)
  586. w &= ^on.opt
  587. }
  588. }
  589. if len(f) == 0 || w != 0 {
  590. f = append(f, fmt.Sprintf("0x%x", int(w)))
  591. }
  592. return strings.Join(f, "|")
  593. }
  594. // Error returned from Write with AddOnly flag, when key already exists in the bucket.
  595. var ErrKeyExists = errors.New("key exists")
  596. // General-purpose value setter.
  597. //
  598. // The Set, Add and Delete methods are just wrappers around this. The
  599. // interpretation of `v` depends on whether the `Raw` option is
  600. // given. If it is, v must be a byte array or nil. (A nil value causes
  601. // a delete.) If `Raw` is not given, `v` will be marshaled as JSON
  602. // before being written. It must be JSON-marshalable and it must not
  603. // be nil.
  604. func (b *Bucket) Write(k string, flags, exp int, v interface{},
  605. opt WriteOptions) (err error) {
  606. if ClientOpCallback != nil {
  607. defer func(t time.Time) {
  608. ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
  609. }(time.Now())
  610. }
  611. var data []byte
  612. if opt&Raw == 0 {
  613. data, err = json.Marshal(v)
  614. if err != nil {
  615. return err
  616. }
  617. } else if v != nil {
  618. data = v.([]byte)
  619. }
  620. var res *gomemcached.MCResponse
  621. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  622. if opt&AddOnly != 0 {
  623. res, err = memcached.UnwrapMemcachedError(
  624. mc.Add(vb, k, flags, exp, data))
  625. if err == nil && res.Status != gomemcached.SUCCESS {
  626. if res.Status == gomemcached.KEY_EEXISTS {
  627. err = ErrKeyExists
  628. } else {
  629. err = res
  630. }
  631. }
  632. } else if opt&Append != 0 {
  633. res, err = mc.Append(vb, k, data)
  634. } else if data == nil {
  635. res, err = mc.Del(vb, k)
  636. } else {
  637. res, err = mc.Set(vb, k, flags, exp, data)
  638. }
  639. return err
  640. })
  641. if err == nil && (opt&(Persist|Indexable) != 0) {
  642. err = b.WaitForPersistence(k, res.Cas, data == nil)
  643. }
  644. return err
  645. }
  646. func (b *Bucket) WriteWithMT(k string, flags, exp int, v interface{},
  647. opt WriteOptions) (mt *MutationToken, err error) {
  648. if ClientOpCallback != nil {
  649. defer func(t time.Time) {
  650. ClientOpCallback(fmt.Sprintf("WriteWithMT(%v)", opt), k, t, err)
  651. }(time.Now())
  652. }
  653. var data []byte
  654. if opt&Raw == 0 {
  655. data, err = json.Marshal(v)
  656. if err != nil {
  657. return nil, err
  658. }
  659. } else if v != nil {
  660. data = v.([]byte)
  661. }
  662. var res *gomemcached.MCResponse
  663. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  664. if opt&AddOnly != 0 {
  665. res, err = memcached.UnwrapMemcachedError(
  666. mc.Add(vb, k, flags, exp, data))
  667. if err == nil && res.Status != gomemcached.SUCCESS {
  668. if res.Status == gomemcached.KEY_EEXISTS {
  669. err = ErrKeyExists
  670. } else {
  671. err = res
  672. }
  673. }
  674. } else if opt&Append != 0 {
  675. res, err = mc.Append(vb, k, data)
  676. } else if data == nil {
  677. res, err = mc.Del(vb, k)
  678. } else {
  679. res, err = mc.Set(vb, k, flags, exp, data)
  680. }
  681. if len(res.Extras) >= 16 {
  682. vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8]))
  683. seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16]))
  684. mt = &MutationToken{VBid: vb, Guard: vbuuid, Value: seqNo}
  685. }
  686. return err
  687. })
  688. if err == nil && (opt&(Persist|Indexable) != 0) {
  689. err = b.WaitForPersistence(k, res.Cas, data == nil)
  690. }
  691. return mt, err
  692. }
  693. // Set a value in this bucket with Cas and return the new Cas value
  694. func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}) (uint64, error) {
  695. return b.WriteCas(k, 0, exp, cas, v, 0)
  696. }
  697. // Set a value in this bucket with Cas without json encoding it
  698. func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}) (uint64, error) {
  699. return b.WriteCas(k, 0, exp, cas, v, Raw)
  700. }
  701. func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{},
  702. opt WriteOptions) (newCas uint64, err error) {
  703. if ClientOpCallback != nil {
  704. defer func(t time.Time) {
  705. ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
  706. }(time.Now())
  707. }
  708. var data []byte
  709. if opt&Raw == 0 {
  710. data, err = json.Marshal(v)
  711. if err != nil {
  712. return 0, err
  713. }
  714. } else if v != nil {
  715. data = v.([]byte)
  716. }
  717. var res *gomemcached.MCResponse
  718. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  719. res, err = mc.SetCas(vb, k, flags, exp, cas, data)
  720. return err
  721. })
  722. if err == nil && (opt&(Persist|Indexable) != 0) {
  723. err = b.WaitForPersistence(k, res.Cas, data == nil)
  724. }
  725. return res.Cas, err
  726. }
  727. // Extended CAS operation. These functions will return the mutation token, i.e vbuuid & guard
  728. func (b *Bucket) CasWithMeta(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) {
  729. return b.WriteCasWithMT(k, flags, exp, cas, v, 0)
  730. }
  731. func (b *Bucket) CasWithMetaRaw(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) {
  732. return b.WriteCasWithMT(k, flags, exp, cas, v, Raw)
  733. }
  734. func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interface{},
  735. opt WriteOptions) (newCas uint64, mt *MutationToken, err error) {
  736. if ClientOpCallback != nil {
  737. defer func(t time.Time) {
  738. ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
  739. }(time.Now())
  740. }
  741. var data []byte
  742. if opt&Raw == 0 {
  743. data, err = json.Marshal(v)
  744. if err != nil {
  745. return 0, nil, err
  746. }
  747. } else if v != nil {
  748. data = v.([]byte)
  749. }
  750. var res *gomemcached.MCResponse
  751. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  752. res, err = mc.SetCas(vb, k, flags, exp, cas, data)
  753. return err
  754. })
  755. if err != nil {
  756. return 0, nil, err
  757. }
  758. // check for extras
  759. if len(res.Extras) >= 16 {
  760. vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8]))
  761. seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16]))
  762. vb := b.VBHash(k)
  763. mt = &MutationToken{VBid: uint16(vb), Guard: vbuuid, Value: seqNo}
  764. }
  765. if err == nil && (opt&(Persist|Indexable) != 0) {
  766. err = b.WaitForPersistence(k, res.Cas, data == nil)
  767. }
  768. return res.Cas, mt, err
  769. }
  770. // Set a value in this bucket.
  771. // The value will be serialized into a JSON document.
  772. func (b *Bucket) Set(k string, exp int, v interface{}) error {
  773. return b.Write(k, 0, exp, v, 0)
  774. }
  775. // Set a value in this bucket with with flags
  776. func (b *Bucket) SetWithMeta(k string, flags int, exp int, v interface{}) (*MutationToken, error) {
  777. return b.WriteWithMT(k, flags, exp, v, 0)
  778. }
  779. // SetRaw sets a value in this bucket without JSON encoding it.
  780. func (b *Bucket) SetRaw(k string, exp int, v []byte) error {
  781. return b.Write(k, 0, exp, v, Raw)
  782. }
  783. // Add adds a value to this bucket; like Set except that nothing
  784. // happens if the key exists. The value will be serialized into a
  785. // JSON document.
  786. func (b *Bucket) Add(k string, exp int, v interface{}) (added bool, err error) {
  787. err = b.Write(k, 0, exp, v, AddOnly)
  788. if err == ErrKeyExists {
  789. return false, nil
  790. }
  791. return (err == nil), err
  792. }
  793. // AddRaw adds a value to this bucket; like SetRaw except that nothing
  794. // happens if the key exists. The value will be stored as raw bytes.
  795. func (b *Bucket) AddRaw(k string, exp int, v []byte) (added bool, err error) {
  796. err = b.Write(k, 0, exp, v, AddOnly|Raw)
  797. if err == ErrKeyExists {
  798. return false, nil
  799. }
  800. return (err == nil), err
  801. }
  802. // Add adds a value to this bucket; like Set except that nothing
  803. // happens if the key exists. The value will be serialized into a
  804. // JSON document.
  805. func (b *Bucket) AddWithMT(k string, exp int, v interface{}) (added bool, mt *MutationToken, err error) {
  806. mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly)
  807. if err == ErrKeyExists {
  808. return false, mt, nil
  809. }
  810. return (err == nil), mt, err
  811. }
  812. // AddRaw adds a value to this bucket; like SetRaw except that nothing
  813. // happens if the key exists. The value will be stored as raw bytes.
  814. func (b *Bucket) AddRawWithMT(k string, exp int, v []byte) (added bool, mt *MutationToken, err error) {
  815. mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly|Raw)
  816. if err == ErrKeyExists {
  817. return false, mt, nil
  818. }
  819. return (err == nil), mt, err
  820. }
  821. // Append appends raw data to an existing item.
  822. func (b *Bucket) Append(k string, data []byte) error {
  823. return b.Write(k, 0, 0, data, Append|Raw)
  824. }
  825. // Get a value straight from Memcached
  826. func (b *Bucket) GetsMC(key string, reqDeadline time.Time) (*gomemcached.MCResponse, error) {
  827. var err error
  828. var response *gomemcached.MCResponse
  829. if key == "" {
  830. return nil, nil
  831. }
  832. if ClientOpCallback != nil {
  833. defer func(t time.Time) { ClientOpCallback("GetsMC", key, t, err) }(time.Now())
  834. }
  835. err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
  836. var err1 error
  837. mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
  838. response, err1 = mc.Get(vb, key)
  839. mc.SetDeadline(noDeadline)
  840. if err1 != nil {
  841. return err1
  842. }
  843. return nil
  844. }, false)
  845. return response, err
  846. }
  847. // Get a value through the subdoc API
  848. func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, subPaths []string) (*gomemcached.MCResponse, error) {
  849. var err error
  850. var response *gomemcached.MCResponse
  851. if key == "" {
  852. return nil, nil
  853. }
  854. if ClientOpCallback != nil {
  855. defer func(t time.Time) { ClientOpCallback("GetsSubDoc", key, t, err) }(time.Now())
  856. }
  857. err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
  858. var err1 error
  859. mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
  860. response, err1 = mc.GetSubdoc(vb, key, subPaths)
  861. mc.SetDeadline(noDeadline)
  862. if err1 != nil {
  863. return err1
  864. }
  865. return nil
  866. }, false)
  867. return response, err
  868. }
  869. // GetsRaw gets a raw value from this bucket including its CAS
  870. // counter and flags.
  871. func (b *Bucket) GetsRaw(k string) (data []byte, flags int,
  872. cas uint64, err error) {
  873. if ClientOpCallback != nil {
  874. defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now())
  875. }
  876. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  877. res, err := mc.Get(vb, k)
  878. if err != nil {
  879. return err
  880. }
  881. cas = res.Cas
  882. if len(res.Extras) >= 4 {
  883. flags = int(binary.BigEndian.Uint32(res.Extras))
  884. }
  885. data = res.Body
  886. return nil
  887. })
  888. return
  889. }
  890. // Gets gets a value from this bucket, including its CAS counter. The
  891. // value is expected to be a JSON stream and will be deserialized into
  892. // rv.
  893. func (b *Bucket) Gets(k string, rv interface{}, caso *uint64) error {
  894. data, _, cas, err := b.GetsRaw(k)
  895. if err != nil {
  896. return err
  897. }
  898. if caso != nil {
  899. *caso = cas
  900. }
  901. return json.Unmarshal(data, rv)
  902. }
  903. // Get a value from this bucket.
  904. // The value is expected to be a JSON stream and will be deserialized
  905. // into rv.
  906. func (b *Bucket) Get(k string, rv interface{}) error {
  907. return b.Gets(k, rv, nil)
  908. }
  909. // GetRaw gets a raw value from this bucket. No marshaling is performed.
  910. func (b *Bucket) GetRaw(k string) ([]byte, error) {
  911. d, _, _, err := b.GetsRaw(k)
  912. return d, err
  913. }
  914. // GetAndTouchRaw gets a raw value from this bucket including its CAS
  915. // counter and flags, and updates the expiry on the doc.
  916. func (b *Bucket) GetAndTouchRaw(k string, exp int) (data []byte,
  917. cas uint64, err error) {
  918. if ClientOpCallback != nil {
  919. defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now())
  920. }
  921. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  922. res, err := mc.GetAndTouch(vb, k, exp)
  923. if err != nil {
  924. return err
  925. }
  926. cas = res.Cas
  927. data = res.Body
  928. return nil
  929. })
  930. return data, cas, err
  931. }
  932. // GetMeta returns the meta values for a key
  933. func (b *Bucket) GetMeta(k string, flags *int, expiry *int, cas *uint64, seqNo *uint64) (err error) {
  934. if ClientOpCallback != nil {
  935. defer func(t time.Time) { ClientOpCallback("GetsMeta", k, t, err) }(time.Now())
  936. }
  937. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  938. res, err := mc.GetMeta(vb, k)
  939. if err != nil {
  940. return err
  941. }
  942. *cas = res.Cas
  943. if len(res.Extras) >= 8 {
  944. *flags = int(binary.BigEndian.Uint32(res.Extras[4:]))
  945. }
  946. if len(res.Extras) >= 12 {
  947. *expiry = int(binary.BigEndian.Uint32(res.Extras[8:]))
  948. }
  949. if len(res.Extras) >= 20 {
  950. *seqNo = uint64(binary.BigEndian.Uint64(res.Extras[12:]))
  951. }
  952. return nil
  953. })
  954. return err
  955. }
  956. // Delete a key from this bucket.
  957. func (b *Bucket) Delete(k string) error {
  958. return b.Write(k, 0, 0, nil, Raw)
  959. }
  960. // Incr increments the value at a given key by amt and defaults to def if no value present.
  961. func (b *Bucket) Incr(k string, amt, def uint64, exp int) (val uint64, err error) {
  962. if ClientOpCallback != nil {
  963. defer func(t time.Time) { ClientOpCallback("Incr", k, t, err) }(time.Now())
  964. }
  965. var rv uint64
  966. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  967. res, err := mc.Incr(vb, k, amt, def, exp)
  968. if err != nil {
  969. return err
  970. }
  971. rv = res
  972. return nil
  973. })
  974. return rv, err
  975. }
  976. // Decr decrements the value at a given key by amt and defaults to def if no value present
  977. func (b *Bucket) Decr(k string, amt, def uint64, exp int) (val uint64, err error) {
  978. if ClientOpCallback != nil {
  979. defer func(t time.Time) { ClientOpCallback("Decr", k, t, err) }(time.Now())
  980. }
  981. var rv uint64
  982. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  983. res, err := mc.Decr(vb, k, amt, def, exp)
  984. if err != nil {
  985. return err
  986. }
  987. rv = res
  988. return nil
  989. })
  990. return rv, err
  991. }
  992. // Wrapper around memcached.CASNext()
  993. func (b *Bucket) casNext(k string, exp int, state *memcached.CASState) bool {
  994. if ClientOpCallback != nil {
  995. defer func(t time.Time) {
  996. ClientOpCallback("casNext", k, t, state.Err)
  997. }(time.Now())
  998. }
  999. keepGoing := false
  1000. state.Err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  1001. keepGoing = mc.CASNext(vb, k, exp, state)
  1002. return state.Err
  1003. })
  1004. return keepGoing && state.Err == nil
  1005. }
  1006. // An UpdateFunc is a callback function to update a document
  1007. type UpdateFunc func(current []byte) (updated []byte, err error)
  1008. // Return this as the error from an UpdateFunc to cancel the Update
  1009. // operation.
  1010. const UpdateCancel = memcached.CASQuit
  1011. // Update performs a Safe update of a document, avoiding conflicts by
  1012. // using CAS.
  1013. //
  1014. // The callback function will be invoked with the current raw document
  1015. // contents (or nil if the document doesn't exist); it should return
  1016. // the updated raw contents (or nil to delete.) If it decides not to
  1017. // change anything it can return UpdateCancel as the error.
  1018. //
  1019. // If another writer modifies the document between the get and the
  1020. // set, the callback will be invoked again with the newer value.
  1021. func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error {
  1022. _, err := b.update(k, exp, callback)
  1023. return err
  1024. }
  1025. // internal version of Update that returns a CAS value
  1026. func (b *Bucket) update(k string, exp int, callback UpdateFunc) (newCas uint64, err error) {
  1027. var state memcached.CASState
  1028. for b.casNext(k, exp, &state) {
  1029. var err error
  1030. if state.Value, err = callback(state.Value); err != nil {
  1031. return 0, err
  1032. }
  1033. }
  1034. return state.Cas, state.Err
  1035. }
  1036. // A WriteUpdateFunc is a callback function to update a document
  1037. type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error)
  1038. // WriteUpdate performs a Safe update of a document, avoiding
  1039. // conflicts by using CAS. WriteUpdate is like Update, except that
  1040. // the callback can return a set of WriteOptions, of which Persist and
  1041. // Indexable are recognized: these cause the call to wait until the
  1042. // document update has been persisted to disk and/or become available
  1043. // to index.
  1044. func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error {
  1045. var writeOpts WriteOptions
  1046. var deletion bool
  1047. // Wrap the callback in an UpdateFunc we can pass to Update:
  1048. updateCallback := func(current []byte) (updated []byte, err error) {
  1049. update, opt, err := callback(current)
  1050. writeOpts = opt
  1051. deletion = (update == nil)
  1052. return update, err
  1053. }
  1054. cas, err := b.update(k, exp, updateCallback)
  1055. if err != nil {
  1056. return err
  1057. }
  1058. // If callback asked, wait for persistence or indexability:
  1059. if writeOpts&(Persist|Indexable) != 0 {
  1060. err = b.WaitForPersistence(k, cas, deletion)
  1061. }
  1062. return err
  1063. }
  1064. // Observe observes the current state of a document.
  1065. func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error) {
  1066. if ClientOpCallback != nil {
  1067. defer func(t time.Time) { ClientOpCallback("Observe", k, t, err) }(time.Now())
  1068. }
  1069. err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
  1070. result, err = mc.Observe(vb, k)
  1071. return err
  1072. })
  1073. return
  1074. }
  1075. // Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used)
  1076. // if the value has been overwritten by another before being persisted.
  1077. var ErrOverwritten = errors.New("overwritten")
  1078. // Returned from WaitForPersistence (or Write, if the Persistent or Indexable flag is used)
  1079. // if the value hasn't been persisted by the timeout interval
  1080. var ErrTimeout = errors.New("timeout")
  1081. // WaitForPersistence waits for an item to be considered durable.
  1082. //
  1083. // Besides transport errors, ErrOverwritten may be returned if the
  1084. // item is overwritten before it reaches durability. ErrTimeout may
  1085. // occur if the item isn't found durable in a reasonable amount of
  1086. // time.
  1087. func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error {
  1088. timeout := 10 * time.Second
  1089. sleepDelay := 5 * time.Millisecond
  1090. start := time.Now()
  1091. for {
  1092. time.Sleep(sleepDelay)
  1093. sleepDelay += sleepDelay / 2 // multiply delay by 1.5 every time
  1094. result, err := b.Observe(k)
  1095. if err != nil {
  1096. return err
  1097. }
  1098. if persisted, overwritten := result.CheckPersistence(cas, deletion); overwritten {
  1099. return ErrOverwritten
  1100. } else if persisted {
  1101. return nil
  1102. }
  1103. if result.PersistenceTime > 0 {
  1104. timeout = 2 * result.PersistenceTime
  1105. }
  1106. if time.Since(start) >= timeout-sleepDelay {
  1107. return ErrTimeout
  1108. }
  1109. }
  1110. }
  1111. var _STRING_MCRESPONSE_POOL = gomemcached.NewStringMCResponsePool(16)
  1112. type stringPool struct {
  1113. pool *sync.Pool
  1114. size int
  1115. }
  1116. func newStringPool(size int) *stringPool {
  1117. rv := &stringPool{
  1118. pool: &sync.Pool{
  1119. New: func() interface{} {
  1120. return make([]string, 0, size)
  1121. },
  1122. },
  1123. size: size,
  1124. }
  1125. return rv
  1126. }
  1127. func (this *stringPool) Get() []string {
  1128. return this.pool.Get().([]string)
  1129. }
  1130. func (this *stringPool) Put(s []string) {
  1131. if s == nil || cap(s) < this.size || cap(s) > 2*this.size {
  1132. return
  1133. }
  1134. this.pool.Put(s[0:0])
  1135. }
  1136. var _STRING_POOL = newStringPool(16)
  1137. type vbStringPool struct {
  1138. pool *sync.Pool
  1139. strPool *stringPool
  1140. }
  1141. func newVBStringPool(size int, sp *stringPool) *vbStringPool {
  1142. rv := &vbStringPool{
  1143. pool: &sync.Pool{
  1144. New: func() interface{} {
  1145. return make(map[uint16][]string, size)
  1146. },
  1147. },
  1148. strPool: sp,
  1149. }
  1150. return rv
  1151. }
  1152. func (this *vbStringPool) Get() map[uint16][]string {
  1153. return this.pool.Get().(map[uint16][]string)
  1154. }
  1155. func (this *vbStringPool) Put(s map[uint16][]string) {
  1156. if s == nil {
  1157. return
  1158. }
  1159. for k, v := range s {
  1160. delete(s, k)
  1161. this.strPool.Put(v)
  1162. }
  1163. this.pool.Put(s)
  1164. }
  1165. var _VB_STRING_POOL = newVBStringPool(16, _STRING_POOL)