Commit f98ebbe1 authored by Brad Davidson, committed by Brad Davidson

Fix syncing empty list of apiserver addresses during initial startup

Also add more debug logging to the sync process. Signed-off-by: Brad Davidson <brad.davidson@rancher.com> (cherry picked from commit 781640ec) Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
parent dfffdfa2
......@@ -38,7 +38,11 @@ import (
)
var (
endpointDebounceDelay = time.Second
// endpointDebounceDelay sets how long we wait before updating apiserver
// addresses when the kubernetes endpoint list changes. When the apiserver is
// starting up it adds then removes then re-adds itself a few times in quick
// succession, and we want to avoid closing connections unnecessarily.
endpointDebounceDelay = 3 * time.Second
defaultDialer = net.Dialer{}
)
......@@ -335,14 +339,9 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
<-done
}()
var cancelUpdate context.CancelFunc
for {
select {
case <-ctx.Done():
if cancelUpdate != nil {
cancelUpdate()
}
return
case ev, ok := <-watch.ResultChan():
endpoint, ok := ev.Object.(*v1.Endpoints)
......@@ -351,20 +350,15 @@ func (a *agentTunnel) watchEndpoints(ctx context.Context, apiServerReady <-chan
continue
}
if cancelUpdate != nil {
cancelUpdate()
}
var debounceCtx context.Context
debounceCtx, cancelUpdate = context.WithCancel(ctx)
// When joining the cluster, the apiserver adds, removes, and then re-adds itself to
// the endpoint list several times. This causes a bit of thrashing if we react to
// endpoint changes immediately. Instead, perform the endpoint update in a
// goroutine that sleeps for a short period before checking for changes and updating
// the proxy addresses. If another update occurs, the previous update operation
// will be cancelled and a new one queued.
go syncProxyAddresses(debounceCtx, util.GetAddresses(endpoint))
addresses := util.GetAddresses(endpoint)
logrus.Debugf("Syncing apiserver addresses from tunnel watch: %v", addresses)
syncProxyAddresses(addresses)
}
}
}
......@@ -478,15 +472,15 @@ func (a *agentTunnel) dialContext(ctx context.Context, network, address string)
}
// proxySyncer is a common signature for functions that sync the proxy address list with a context
type proxySyncer func(ctx context.Context, addresses []string)
type proxySyncer func(addresses []string)
// getProxySyncer returns a function that can be called to update the list of supervisors.
// This function is responsible for connecting to or disconnecting websocket tunnels,
// as well as updating the proxy loadbalancer server list.
func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tlsConfig *tls.Config, proxy proxy.Proxy) proxySyncer {
disconnect := map[string]context.CancelFunc{}
// Attempt to connect to supervisors, storing their cancellation function for later when we
// need to disconnect.
// Attempt to connect to the initial list of addresses, storing their cancellation
// function for later when we need to disconnect.
for _, address := range proxy.SupervisorAddresses() {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, wg, address, tlsConfig)
......@@ -495,43 +489,72 @@ func (a *agentTunnel) getProxySyncer(ctx context.Context, wg *sync.WaitGroup, tl
}
}
// return a function that can be called to update the address list.
// servers will be connected to or disconnected from as necessary,
// and the proxy addresses updated.
return func(debounceCtx context.Context, addresses []string) {
select {
case <-time.After(endpointDebounceDelay):
case <-debounceCtx.Done():
var cancelUpdate context.CancelFunc
// return a function that can be called to update the address list. servers will be
// connected to or disconnected from as necessary, and the proxy addresses updated.
// The update is done in a goroutine that waits a short period in order to reduce
// thrashing during apiserver startup. Each time the function is called, the context for
// the goroutine started by the previous call is cancelled to prevent it from updating
// if the delay has not yet expired.
return func(addresses []string) {
if len(addresses) == 0 {
logrus.Debugf("Skipping apiserver addresses sync: %v", addresses)
return
}
// Compare list of supervisor addresses before and after syncing apiserver
// endpoints into the proxy to figure out which supervisors we need to connect to
// or disconnect from. Note that the addresses we were passed will not match
// the supervisor addresses if the supervisor and apiserver are on different ports -
// they must be round-tripped through proxy.Update before comparing.
curAddresses := sets.New(proxy.SupervisorAddresses()...)
proxy.Update(addresses)
newAddresses := sets.New(proxy.SupervisorAddresses()...)
// add new servers
for address := range newAddresses.Difference(curAddresses) {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, nil, address, tlsConfig)
logrus.Infof("Started tunnel to %s", address)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
if cancelUpdate != nil {
cancelUpdate()
}
// remove old servers
for address := range curAddresses.Difference(newAddresses) {
if cancel, ok := disconnect[address]; ok {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
var debounceCtx context.Context
debounceCtx, cancelUpdate = context.WithCancel(ctx)
go func() {
select {
case <-time.After(endpointDebounceDelay):
logrus.Debugf("Settled apiserver addresses sync: %v", addresses)
case <-debounceCtx.Done():
logrus.Debugf("Cancelled apiserver addresses sync: %v", addresses)
return
}
}
// Compare list of supervisor addresses before and after syncing apiserver
// endpoints into the proxy to figure out which supervisors we need to connect to
// or disconnect from. Note that the addresses we were passed will not match
// the supervisor addresses if the supervisor and apiserver are on different ports -
// they must be round-tripped through proxy.Update before comparing.
curAddresses := sets.New(proxy.SupervisorAddresses()...)
proxy.Update(addresses)
newAddresses := sets.New(proxy.SupervisorAddresses()...)
addedAddresses := newAddresses.Difference(curAddresses)
removedAddresses := curAddresses.Difference(newAddresses)
if addedAddresses.Len() == 0 && removedAddresses.Len() == 0 {
return
}
logrus.Debugf("Sync apiserver addresses - connecting: %v, disconnecting: %v", addedAddresses.UnsortedList(), removedAddresses.UnsortedList())
// add new servers
for address := range addedAddresses {
if _, ok := disconnect[address]; !ok {
conn := a.connect(ctx, nil, address, tlsConfig)
logrus.Infof("Started tunnel to %s", address)
disconnect[address] = conn.cancel
proxy.SetHealthCheck(address, conn.healthCheck)
}
}
// remove old servers
for address := range removedAddresses {
if cancel, ok := disconnect[address]; ok {
cancel()
delete(disconnect, address)
logrus.Infof("Stopped tunnel to %s", address)
}
}
}()
}
}
......@@ -550,10 +573,11 @@ func getAPIServersRequester(node *daemonconfig.Node, proxy proxy.Proxy, syncProx
}
}
if addresses, err := agentconfig.GetAPIServers(ctx, info); err != nil {
if addresses, err := agentconfig.GetAPIServers(ctx, info); err != nil || len(addresses) == 0 {
logrus.Warnf("Failed to get apiserver addresses from supervisor: %v", err)
} else {
syncProxyAddresses(ctx, addresses)
logrus.Debugf("Syncing apiserver addresses from server: %v", addresses)
syncProxyAddresses(addresses)
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment