You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

243 lines
4.9 KiB

package client
import (
"container/list"
"time"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/ngaut/tso/proto"
"github.com/ngaut/tso/util"
"github.com/ngaut/zkhelper"
)
const (
maxPipelineRequest = 100000
)
// Client is a timestamp oracle client.
type Client struct {
requests chan *PipelineRequest
pending *list.List
conf *Conf
addr string
leaderCh chan string
}
// Conf is the configuration.
type Conf struct {
// tso server address, it will be deprecated later.
ServerAddr string
// ZKAddr is for zookeeper address, if set, client will ignore ServerAddr
// and find the leader tso server address in zookeeper.
// Later ServerAddr is just for simple test and backward compatibility.
ZKAddr string
// root path is the tso server saving in zookeeper, like /zk/tso.
RootPath string
}
// PipelineRequest let you get the timestamp with pipeline.
type PipelineRequest struct {
done chan error
reply *proto.Response
}
func newPipelineRequest() *PipelineRequest {
return &PipelineRequest{
done: make(chan error, 1),
}
}
// MarkDone sets the repsone for current request.
func (pr *PipelineRequest) MarkDone(reply *proto.Response, err error) {
if err != nil {
pr.reply = nil
}
pr.reply = reply
pr.done <- errors.Trace(err)
}
// GetTS gets the timestamp.
func (pr *PipelineRequest) GetTS() (*proto.Timestamp, error) {
err := <-pr.done
if err != nil {
return nil, errors.Trace(err)
}
return &pr.reply.Timestamp, nil
}
// NewClient creates a timestamp oracle client.
func NewClient(conf *Conf) *Client {
c := &Client{
requests: make(chan *PipelineRequest, maxPipelineRequest),
pending: list.New(),
conf: conf,
leaderCh: make(chan string, 1),
}
if len(conf.ZKAddr) == 0 {
c.leaderCh <- conf.ServerAddr
} else {
go c.watchLeader()
}
go c.workerLoop()
return c
}
func (c *Client) cleanupPending(err error) {
log.Warn(err)
length := c.pending.Len()
for i := 0; i < length; i++ {
e := c.pending.Front()
c.pending.Remove(e)
e.Value.(*PipelineRequest).MarkDone(nil, err)
}
// clear request in channel too
length = len(c.requests)
for i := 0; i < length; i++ {
req := <-c.requests
req.MarkDone(nil, err)
}
}
func (c *Client) notifyOne(reply *proto.Response) {
e := c.pending.Front()
c.pending.Remove(e)
req := e.Value.(*PipelineRequest)
req.MarkDone(reply, nil)
}
func (c *Client) writeRequests(session *Conn) error {
var protoHdr [1]byte
for i := 0; i < c.pending.Len(); i++ {
session.Write(protoHdr[:])
}
return session.Flush()
}
func (c *Client) handleResponse(session *Conn) error {
length := c.pending.Len()
for i := 0; i < length; i++ {
var resp proto.Response
err := resp.Decode(session)
if err != nil {
return errors.Trace(err)
}
c.notifyOne(&resp)
}
return nil
}
func (c *Client) do() error {
session, err := NewConnection(c.addr, time.Duration(1*time.Second))
if err != nil {
return errors.Trace(err)
}
log.Debugf("connect tso server %s ok", c.addr)
defer session.Close()
for {
select {
case req := <-c.requests:
c.pending.PushBack(req)
length := len(c.requests)
for i := 0; i < length; i++ {
req = <-c.requests
c.pending.PushBack(req)
}
err = c.writeRequests(session)
if err != nil {
return errors.Trace(err)
}
err = c.handleResponse(session)
if err != nil {
return errors.Trace(err)
}
case addr := <-c.leaderCh:
oldAddr := c.addr
c.addr = addr
return errors.Errorf("leader change %s -> %s", oldAddr, addr)
}
}
}
func (c *Client) workerLoop() {
// first get tso leader
c.addr = <-c.leaderCh
log.Debugf("try to connect tso server %s", c.addr)
for {
err := c.do()
if err != nil {
c.cleanupPending(err)
}
select {
case <-time.After(1 * time.Second):
case addr := <-c.leaderCh:
// If old tso server down, NewConnection will fail and return immediately in do function,
// so we must check leader change here.
log.Warnf("leader change %s -> %s", c.addr, addr)
c.addr = addr
// Wait some time to let tso server allow accepting connections.
time.Sleep(1 * time.Second)
}
}
}
func (c *Client) watchLeader() {
var (
conn zkhelper.Conn
err error
)
for {
conn, err = zkhelper.ConnectToZkWithTimeout(c.conf.ZKAddr, time.Second)
if err != nil {
log.Errorf("connect zk err %v, retry later", err)
time.Sleep(3 * time.Second)
continue
}
break
}
defer conn.Close()
var lastAddr string
for {
addr, watcher, err := util.GetWatchLeader(conn, c.conf.RootPath)
if err != nil {
log.Errorf("get tso leader err %v, retry later", err)
time.Sleep(3 * time.Second)
continue
}
if lastAddr != addr {
log.Warnf("leader change %s -> %s", lastAddr, addr)
lastAddr = addr
c.leaderCh <- addr
}
// watch the leader changes.
<-watcher
}
}
// GoGetTimestamp returns a PipelineRequest so you can get the timestamp later.
func (c *Client) GoGetTimestamp() *PipelineRequest {
pr := newPipelineRequest()
c.requests <- pr
return pr
}