You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

797 lines
23 KiB

  1. package pq
  2. // Package pq is a pure Go Postgres driver for the database/sql package.
  3. // This module contains support for Postgres LISTEN/NOTIFY.
  4. import (
  5. "errors"
  6. "fmt"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. // Notification represents a single notification from the database.
  12. type Notification struct {
  13. // Process ID (PID) of the notifying postgres backend.
  14. BePid int
  15. // Name of the channel the notification was sent on.
  16. Channel string
  17. // Payload, or the empty string if unspecified.
  18. Extra string
  19. }
  20. func recvNotification(r *readBuf) *Notification {
  21. bePid := r.int32()
  22. channel := r.string()
  23. extra := r.string()
  24. return &Notification{bePid, channel, extra}
  25. }
  26. const (
  27. connStateIdle int32 = iota
  28. connStateExpectResponse
  29. connStateExpectReadyForQuery
  30. )
  31. type message struct {
  32. typ byte
  33. err error
  34. }
  35. var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
  36. // ListenerConn is a low-level interface for waiting for notifications. You
  37. // should use Listener instead.
  38. type ListenerConn struct {
  39. // guards cn and err
  40. connectionLock sync.Mutex
  41. cn *conn
  42. err error
  43. connState int32
  44. // the sending goroutine will be holding this lock
  45. senderLock sync.Mutex
  46. notificationChan chan<- *Notification
  47. replyChan chan message
  48. }
  49. // NewListenerConn creates a new ListenerConn. Use NewListener instead.
  50. func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
  51. return newDialListenerConn(defaultDialer{}, name, notificationChan)
  52. }
  53. func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
  54. cn, err := DialOpen(d, name)
  55. if err != nil {
  56. return nil, err
  57. }
  58. l := &ListenerConn{
  59. cn: cn.(*conn),
  60. notificationChan: c,
  61. connState: connStateIdle,
  62. replyChan: make(chan message, 2),
  63. }
  64. go l.listenerConnMain()
  65. return l, nil
  66. }
  67. // We can only allow one goroutine at a time to be running a query on the
  68. // connection for various reasons, so the goroutine sending on the connection
  69. // must be holding senderLock.
  70. //
  71. // Returns an error if an unrecoverable error has occurred and the ListenerConn
  72. // should be abandoned.
  73. func (l *ListenerConn) acquireSenderLock() error {
  74. // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
  75. l.senderLock.Lock()
  76. l.connectionLock.Lock()
  77. err := l.err
  78. l.connectionLock.Unlock()
  79. if err != nil {
  80. l.senderLock.Unlock()
  81. return err
  82. }
  83. return nil
  84. }
  85. func (l *ListenerConn) releaseSenderLock() {
  86. l.senderLock.Unlock()
  87. }
  88. // setState advances the protocol state to newState. Returns false if moving
  89. // to that state from the current state is not allowed.
  90. func (l *ListenerConn) setState(newState int32) bool {
  91. var expectedState int32
  92. switch newState {
  93. case connStateIdle:
  94. expectedState = connStateExpectReadyForQuery
  95. case connStateExpectResponse:
  96. expectedState = connStateIdle
  97. case connStateExpectReadyForQuery:
  98. expectedState = connStateExpectResponse
  99. default:
  100. panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
  101. }
  102. return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
  103. }
  104. // Main logic is here: receive messages from the postgres backend, forward
  105. // notifications and query replies and keep the internal state in sync with the
  106. // protocol state. Returns when the connection has been lost, is about to go
  107. // away or should be discarded because we couldn't agree on the state with the
  108. // server backend.
  109. func (l *ListenerConn) listenerConnLoop() (err error) {
  110. defer errRecoverNoErrBadConn(&err)
  111. r := &readBuf{}
  112. for {
  113. t, err := l.cn.recvMessage(r)
  114. if err != nil {
  115. return err
  116. }
  117. switch t {
  118. case 'A':
  119. // recvNotification copies all the data so we don't need to worry
  120. // about the scratch buffer being overwritten.
  121. l.notificationChan <- recvNotification(r)
  122. case 'T', 'D':
  123. // only used by tests; ignore
  124. case 'E':
  125. // We might receive an ErrorResponse even when not in a query; it
  126. // is expected that the server will close the connection after
  127. // that, but we should make sure that the error we display is the
  128. // one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
  129. if !l.setState(connStateExpectReadyForQuery) {
  130. return parseError(r)
  131. }
  132. l.replyChan <- message{t, parseError(r)}
  133. case 'C', 'I':
  134. if !l.setState(connStateExpectReadyForQuery) {
  135. // protocol out of sync
  136. return fmt.Errorf("unexpected CommandComplete")
  137. }
  138. // ExecSimpleQuery doesn't need to know about this message
  139. case 'Z':
  140. if !l.setState(connStateIdle) {
  141. // protocol out of sync
  142. return fmt.Errorf("unexpected ReadyForQuery")
  143. }
  144. l.replyChan <- message{t, nil}
  145. case 'N', 'S':
  146. // ignore
  147. default:
  148. return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
  149. }
  150. }
  151. }
  152. // This is the main routine for the goroutine receiving on the database
  153. // connection. Most of the main logic is in listenerConnLoop.
  154. func (l *ListenerConn) listenerConnMain() {
  155. err := l.listenerConnLoop()
  156. // listenerConnLoop terminated; we're done, but we still have to clean up.
  157. // Make sure nobody tries to start any new queries by making sure the err
  158. // pointer is set. It is important that we do not overwrite its value; a
  159. // connection could be closed by either this goroutine or one sending on
  160. // the connection -- whoever closes the connection is assumed to have the
  161. // more meaningful error message (as the other one will probably get
  162. // net.errClosed), so that goroutine sets the error we expose while the
  163. // other error is discarded. If the connection is lost while two
  164. // goroutines are operating on the socket, it probably doesn't matter which
  165. // error we expose so we don't try to do anything more complex.
  166. l.connectionLock.Lock()
  167. if l.err == nil {
  168. l.err = err
  169. }
  170. l.cn.Close()
  171. l.connectionLock.Unlock()
  172. // There might be a query in-flight; make sure nobody's waiting for a
  173. // response to it, since there's not going to be one.
  174. close(l.replyChan)
  175. // let the listener know we're done
  176. close(l.notificationChan)
  177. // this ListenerConn is done
  178. }
  179. // Listen sends a LISTEN query to the server. See ExecSimpleQuery.
  180. func (l *ListenerConn) Listen(channel string) (bool, error) {
  181. return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
  182. }
  183. // Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
  184. func (l *ListenerConn) Unlisten(channel string) (bool, error) {
  185. return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
  186. }
  187. // UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
  188. func (l *ListenerConn) UnlistenAll() (bool, error) {
  189. return l.ExecSimpleQuery("UNLISTEN *")
  190. }
  191. // Ping the remote server to make sure it's alive. Non-nil error means the
  192. // connection has failed and should be abandoned.
  193. func (l *ListenerConn) Ping() error {
  194. sent, err := l.ExecSimpleQuery("")
  195. if !sent {
  196. return err
  197. }
  198. if err != nil {
  199. // shouldn't happen
  200. panic(err)
  201. }
  202. return nil
  203. }
  204. // Attempt to send a query on the connection. Returns an error if sending the
  205. // query failed, and the caller should initiate closure of this connection.
  206. // The caller must be holding senderLock (see acquireSenderLock and
  207. // releaseSenderLock).
  208. func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
  209. defer errRecoverNoErrBadConn(&err)
  210. // must set connection state before sending the query
  211. if !l.setState(connStateExpectResponse) {
  212. panic("two queries running at the same time")
  213. }
  214. // Can't use l.cn.writeBuf here because it uses the scratch buffer which
  215. // might get overwritten by listenerConnLoop.
  216. b := &writeBuf{
  217. buf: []byte("Q\x00\x00\x00\x00"),
  218. pos: 1,
  219. }
  220. b.string(q)
  221. l.cn.send(b)
  222. return nil
  223. }
  224. // ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
  225. // parameters) on the connection. The possible return values are:
  226. // 1) "executed" is true; the query was executed to completion on the
  227. // database server. If the query failed, err will be set to the error
  228. // returned by the database, otherwise err will be nil.
  229. // 2) If "executed" is false, the query could not be executed on the remote
  230. // server. err will be non-nil.
  231. //
  232. // After a call to ExecSimpleQuery has returned an executed=false value, the
  233. // connection has either been closed or will be closed shortly thereafter, and
  234. // all subsequently executed queries will return an error.
  235. func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
  236. if err = l.acquireSenderLock(); err != nil {
  237. return false, err
  238. }
  239. defer l.releaseSenderLock()
  240. err = l.sendSimpleQuery(q)
  241. if err != nil {
  242. // We can't know what state the protocol is in, so we need to abandon
  243. // this connection.
  244. l.connectionLock.Lock()
  245. // Set the error pointer if it hasn't been set already; see
  246. // listenerConnMain.
  247. if l.err == nil {
  248. l.err = err
  249. }
  250. l.connectionLock.Unlock()
  251. l.cn.c.Close()
  252. return false, err
  253. }
  254. // now we just wait for a reply..
  255. for {
  256. m, ok := <-l.replyChan
  257. if !ok {
  258. // We lost the connection to server, don't bother waiting for a
  259. // a response. err should have been set already.
  260. l.connectionLock.Lock()
  261. err := l.err
  262. l.connectionLock.Unlock()
  263. return false, err
  264. }
  265. switch m.typ {
  266. case 'Z':
  267. // sanity check
  268. if m.err != nil {
  269. panic("m.err != nil")
  270. }
  271. // done; err might or might not be set
  272. return true, err
  273. case 'E':
  274. // sanity check
  275. if m.err == nil {
  276. panic("m.err == nil")
  277. }
  278. // server responded with an error; ReadyForQuery to follow
  279. err = m.err
  280. default:
  281. return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
  282. }
  283. }
  284. }
  285. // Close closes the connection.
  286. func (l *ListenerConn) Close() error {
  287. l.connectionLock.Lock()
  288. if l.err != nil {
  289. l.connectionLock.Unlock()
  290. return errListenerConnClosed
  291. }
  292. l.err = errListenerConnClosed
  293. l.connectionLock.Unlock()
  294. // We can't send anything on the connection without holding senderLock.
  295. // Simply close the net.Conn to wake up everyone operating on it.
  296. return l.cn.c.Close()
  297. }
  298. // Err returns the reason the connection was closed. It is not safe to call
  299. // this function until l.Notify has been closed.
  300. func (l *ListenerConn) Err() error {
  301. return l.err
  302. }
  303. var errListenerClosed = errors.New("pq: Listener has been closed")
  304. // ErrChannelAlreadyOpen is returned from Listen when a channel is already
  305. // open.
  306. var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
  307. // ErrChannelNotOpen is returned from Unlisten when a channel is not open.
  308. var ErrChannelNotOpen = errors.New("pq: channel is not open")
  309. // ListenerEventType is an enumeration of listener event types.
  310. type ListenerEventType int
  311. const (
  312. // ListenerEventConnected is emitted only when the database connection
  313. // has been initially initialized. The err argument of the callback
  314. // will always be nil.
  315. ListenerEventConnected ListenerEventType = iota
  316. // ListenerEventDisconnected is emitted after a database connection has
  317. // been lost, either because of an error or because Close has been
  318. // called. The err argument will be set to the reason the database
  319. // connection was lost.
  320. ListenerEventDisconnected
  321. // ListenerEventReconnected is emitted after a database connection has
  322. // been re-established after connection loss. The err argument of the
  323. // callback will always be nil. After this event has been emitted, a
  324. // nil pq.Notification is sent on the Listener.Notify channel.
  325. ListenerEventReconnected
  326. // ListenerEventConnectionAttemptFailed is emitted after a connection
  327. // to the database was attempted, but failed. The err argument will be
  328. // set to an error describing why the connection attempt did not
  329. // succeed.
  330. ListenerEventConnectionAttemptFailed
  331. )
  332. // EventCallbackType is the event callback type. See also ListenerEventType
  333. // constants' documentation.
  334. type EventCallbackType func(event ListenerEventType, err error)
  335. // Listener provides an interface for listening to notifications from a
  336. // PostgreSQL database. For general usage information, see section
  337. // "Notifications".
  338. //
  339. // Listener can safely be used from concurrently running goroutines.
  340. type Listener struct {
  341. // Channel for receiving notifications from the database. In some cases a
  342. // nil value will be sent. See section "Notifications" above.
  343. Notify chan *Notification
  344. name string
  345. minReconnectInterval time.Duration
  346. maxReconnectInterval time.Duration
  347. dialer Dialer
  348. eventCallback EventCallbackType
  349. lock sync.Mutex
  350. isClosed bool
  351. reconnectCond *sync.Cond
  352. cn *ListenerConn
  353. connNotificationChan <-chan *Notification
  354. channels map[string]struct{}
  355. }
  356. // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
  357. //
  358. // name should be set to a connection string to be used to establish the
  359. // database connection (see section "Connection String Parameters" above).
  360. //
  361. // minReconnectInterval controls the duration to wait before trying to
  362. // re-establish the database connection after connection loss. After each
  363. // consecutive failure this interval is doubled, until maxReconnectInterval is
  364. // reached. Successfully completing the connection establishment procedure
  365. // resets the interval back to minReconnectInterval.
  366. //
  367. // The last parameter eventCallback can be set to a function which will be
  368. // called by the Listener when the state of the underlying database connection
  369. // changes. This callback will be called by the goroutine which dispatches the
  370. // notifications over the Notify channel, so you should try to avoid doing
  371. // potentially time-consuming operations from the callback.
  372. func NewListener(name string,
  373. minReconnectInterval time.Duration,
  374. maxReconnectInterval time.Duration,
  375. eventCallback EventCallbackType) *Listener {
  376. return NewDialListener(defaultDialer{}, name, minReconnectInterval, maxReconnectInterval, eventCallback)
  377. }
  378. // NewDialListener is like NewListener but it takes a Dialer.
  379. func NewDialListener(d Dialer,
  380. name string,
  381. minReconnectInterval time.Duration,
  382. maxReconnectInterval time.Duration,
  383. eventCallback EventCallbackType) *Listener {
  384. l := &Listener{
  385. name: name,
  386. minReconnectInterval: minReconnectInterval,
  387. maxReconnectInterval: maxReconnectInterval,
  388. dialer: d,
  389. eventCallback: eventCallback,
  390. channels: make(map[string]struct{}),
  391. Notify: make(chan *Notification, 32),
  392. }
  393. l.reconnectCond = sync.NewCond(&l.lock)
  394. go l.listenerMain()
  395. return l
  396. }
  397. // NotificationChannel returns the notification channel for this listener.
  398. // This is the same channel as Notify, and will not be recreated during the
  399. // life time of the Listener.
  400. func (l *Listener) NotificationChannel() <-chan *Notification {
  401. return l.Notify
  402. }
  403. // Listen starts listening for notifications on a channel. Calls to this
  404. // function will block until an acknowledgement has been received from the
  405. // server. Note that Listener automatically re-establishes the connection
  406. // after connection loss, so this function may block indefinitely if the
  407. // connection can not be re-established.
  408. //
  409. // Listen will only fail in three conditions:
  410. // 1) The channel is already open. The returned error will be
  411. // ErrChannelAlreadyOpen.
  412. // 2) The query was executed on the remote server, but PostgreSQL returned an
  413. // error message in response to the query. The returned error will be a
  414. // pq.Error containing the information the server supplied.
  415. // 3) Close is called on the Listener before the request could be completed.
  416. //
  417. // The channel name is case-sensitive.
  418. func (l *Listener) Listen(channel string) error {
  419. l.lock.Lock()
  420. defer l.lock.Unlock()
  421. if l.isClosed {
  422. return errListenerClosed
  423. }
  424. // The server allows you to issue a LISTEN on a channel which is already
  425. // open, but it seems useful to be able to detect this case to spot for
  426. // mistakes in application logic. If the application genuinely does't
  427. // care, it can check the exported error and ignore it.
  428. _, exists := l.channels[channel]
  429. if exists {
  430. return ErrChannelAlreadyOpen
  431. }
  432. if l.cn != nil {
  433. // If gotResponse is true but error is set, the query was executed on
  434. // the remote server, but resulted in an error. This should be
  435. // relatively rare, so it's fine if we just pass the error to our
  436. // caller. However, if gotResponse is false, we could not complete the
  437. // query on the remote server and our underlying connection is about
  438. // to go away, so we only add relname to l.channels, and wait for
  439. // resync() to take care of the rest.
  440. gotResponse, err := l.cn.Listen(channel)
  441. if gotResponse && err != nil {
  442. return err
  443. }
  444. }
  445. l.channels[channel] = struct{}{}
  446. for l.cn == nil {
  447. l.reconnectCond.Wait()
  448. // we let go of the mutex for a while
  449. if l.isClosed {
  450. return errListenerClosed
  451. }
  452. }
  453. return nil
  454. }
  455. // Unlisten removes a channel from the Listener's channel list. Returns
  456. // ErrChannelNotOpen if the Listener is not listening on the specified channel.
  457. // Returns immediately with no error if there is no connection. Note that you
  458. // might still get notifications for this channel even after Unlisten has
  459. // returned.
  460. //
  461. // The channel name is case-sensitive.
  462. func (l *Listener) Unlisten(channel string) error {
  463. l.lock.Lock()
  464. defer l.lock.Unlock()
  465. if l.isClosed {
  466. return errListenerClosed
  467. }
  468. // Similarly to LISTEN, this is not an error in Postgres, but it seems
  469. // useful to distinguish from the normal conditions.
  470. _, exists := l.channels[channel]
  471. if !exists {
  472. return ErrChannelNotOpen
  473. }
  474. if l.cn != nil {
  475. // Similarly to Listen (see comment in that function), the caller
  476. // should only be bothered with an error if it came from the backend as
  477. // a response to our query.
  478. gotResponse, err := l.cn.Unlisten(channel)
  479. if gotResponse && err != nil {
  480. return err
  481. }
  482. }
  483. // Don't bother waiting for resync if there's no connection.
  484. delete(l.channels, channel)
  485. return nil
  486. }
  487. // UnlistenAll removes all channels from the Listener's channel list. Returns
  488. // immediately with no error if there is no connection. Note that you might
  489. // still get notifications for any of the deleted channels even after
  490. // UnlistenAll has returned.
  491. func (l *Listener) UnlistenAll() error {
  492. l.lock.Lock()
  493. defer l.lock.Unlock()
  494. if l.isClosed {
  495. return errListenerClosed
  496. }
  497. if l.cn != nil {
  498. // Similarly to Listen (see comment in that function), the caller
  499. // should only be bothered with an error if it came from the backend as
  500. // a response to our query.
  501. gotResponse, err := l.cn.UnlistenAll()
  502. if gotResponse && err != nil {
  503. return err
  504. }
  505. }
  506. // Don't bother waiting for resync if there's no connection.
  507. l.channels = make(map[string]struct{})
  508. return nil
  509. }
  510. // Ping the remote server to make sure it's alive. Non-nil return value means
  511. // that there is no active connection.
  512. func (l *Listener) Ping() error {
  513. l.lock.Lock()
  514. defer l.lock.Unlock()
  515. if l.isClosed {
  516. return errListenerClosed
  517. }
  518. if l.cn == nil {
  519. return errors.New("no connection")
  520. }
  521. return l.cn.Ping()
  522. }
  523. // Clean up after losing the server connection. Returns l.cn.Err(), which
  524. // should have the reason the connection was lost.
  525. func (l *Listener) disconnectCleanup() error {
  526. l.lock.Lock()
  527. defer l.lock.Unlock()
  528. // sanity check; can't look at Err() until the channel has been closed
  529. select {
  530. case _, ok := <-l.connNotificationChan:
  531. if ok {
  532. panic("connNotificationChan not closed")
  533. }
  534. default:
  535. panic("connNotificationChan not closed")
  536. }
  537. err := l.cn.Err()
  538. l.cn.Close()
  539. l.cn = nil
  540. return err
  541. }
  542. // Synchronize the list of channels we want to be listening on with the server
  543. // after the connection has been established.
  544. func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
  545. doneChan := make(chan error)
  546. go func(notificationChan <-chan *Notification) {
  547. for channel := range l.channels {
  548. // If we got a response, return that error to our caller as it's
  549. // going to be more descriptive than cn.Err().
  550. gotResponse, err := cn.Listen(channel)
  551. if gotResponse && err != nil {
  552. doneChan <- err
  553. return
  554. }
  555. // If we couldn't reach the server, wait for notificationChan to
  556. // close and then return the error message from the connection, as
  557. // per ListenerConn's interface.
  558. if err != nil {
  559. for range notificationChan {
  560. }
  561. doneChan <- cn.Err()
  562. return
  563. }
  564. }
  565. doneChan <- nil
  566. }(notificationChan)
  567. // Ignore notifications while synchronization is going on to avoid
  568. // deadlocks. We have to send a nil notification over Notify anyway as
  569. // we can't possibly know which notifications (if any) were lost while
  570. // the connection was down, so there's no reason to try and process
  571. // these messages at all.
  572. for {
  573. select {
  574. case _, ok := <-notificationChan:
  575. if !ok {
  576. notificationChan = nil
  577. }
  578. case err := <-doneChan:
  579. return err
  580. }
  581. }
  582. }
  583. // caller should NOT be holding l.lock
  584. func (l *Listener) closed() bool {
  585. l.lock.Lock()
  586. defer l.lock.Unlock()
  587. return l.isClosed
  588. }
  589. func (l *Listener) connect() error {
  590. notificationChan := make(chan *Notification, 32)
  591. cn, err := newDialListenerConn(l.dialer, l.name, notificationChan)
  592. if err != nil {
  593. return err
  594. }
  595. l.lock.Lock()
  596. defer l.lock.Unlock()
  597. err = l.resync(cn, notificationChan)
  598. if err != nil {
  599. cn.Close()
  600. return err
  601. }
  602. l.cn = cn
  603. l.connNotificationChan = notificationChan
  604. l.reconnectCond.Broadcast()
  605. return nil
  606. }
  607. // Close disconnects the Listener from the database and shuts it down.
  608. // Subsequent calls to its methods will return an error. Close returns an
  609. // error if the connection has already been closed.
  610. func (l *Listener) Close() error {
  611. l.lock.Lock()
  612. defer l.lock.Unlock()
  613. if l.isClosed {
  614. return errListenerClosed
  615. }
  616. if l.cn != nil {
  617. l.cn.Close()
  618. }
  619. l.isClosed = true
  620. // Unblock calls to Listen()
  621. l.reconnectCond.Broadcast()
  622. return nil
  623. }
  624. func (l *Listener) emitEvent(event ListenerEventType, err error) {
  625. if l.eventCallback != nil {
  626. l.eventCallback(event, err)
  627. }
  628. }
  629. // Main logic here: maintain a connection to the server when possible, wait
  630. // for notifications and emit events.
  631. func (l *Listener) listenerConnLoop() {
  632. var nextReconnect time.Time
  633. reconnectInterval := l.minReconnectInterval
  634. for {
  635. for {
  636. err := l.connect()
  637. if err == nil {
  638. break
  639. }
  640. if l.closed() {
  641. return
  642. }
  643. l.emitEvent(ListenerEventConnectionAttemptFailed, err)
  644. time.Sleep(reconnectInterval)
  645. reconnectInterval *= 2
  646. if reconnectInterval > l.maxReconnectInterval {
  647. reconnectInterval = l.maxReconnectInterval
  648. }
  649. }
  650. if nextReconnect.IsZero() {
  651. l.emitEvent(ListenerEventConnected, nil)
  652. } else {
  653. l.emitEvent(ListenerEventReconnected, nil)
  654. l.Notify <- nil
  655. }
  656. reconnectInterval = l.minReconnectInterval
  657. nextReconnect = time.Now().Add(reconnectInterval)
  658. for {
  659. notification, ok := <-l.connNotificationChan
  660. if !ok {
  661. // lost connection, loop again
  662. break
  663. }
  664. l.Notify <- notification
  665. }
  666. err := l.disconnectCleanup()
  667. if l.closed() {
  668. return
  669. }
  670. l.emitEvent(ListenerEventDisconnected, err)
  671. time.Sleep(time.Until(nextReconnect))
  672. }
  673. }
  674. func (l *Listener) listenerMain() {
  675. l.listenerConnLoop()
  676. close(l.Notify)
  677. }