Avoid the start of go-routines which are never get stopped

This commit is contained in:
Wolfgang Friedl 2017-03-15 14:43:09 +01:00 committed by Bjørn Erik Pedersen
parent 0b5690fd87
commit 84f94806c6
2 changed files with 60 additions and 3 deletions

View File

@ -33,14 +33,44 @@ func (rc remoteConfigProvider) Watch(rp viper.RemoteProvider) (io.Reader, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp := <-cm.Watch(rp.Path(), nil) resp,err := cm.Get(rp.Path())
err = resp.Error
if err != nil { if err != nil {
return nil, err return nil, err
} }
return bytes.NewReader(resp.Value), nil return bytes.NewReader(resp), nil
} }
func (rc remoteConfigProvider) WatchChannel(rp viper.RemoteProvider) (<-chan *viper.RemoteResponse, chan bool) {
cm, err := getConfigManager(rp)
if err != nil {
return nil, nil
}
quit := make(chan bool)
quitwc := make(chan bool)
viperResponsCh := make(chan *viper.RemoteResponse)
cryptoResponseCh := cm.Watch(rp.Path(), quit)
// need this function to convert the Channel response form crypt.Response to viper.Response
go func(cr <-chan *crypt.Response,vr chan<- *viper.RemoteResponse, quitwc <-chan bool, quit chan<- bool) {
for {
select {
case <- quitwc:
quit <- true
return
case resp := <-cr:
vr <- &viper.RemoteResponse{
Error: resp.Error,
Value: resp.Value,
}
}
}
}(cryptoResponseCh,viperResponsCh,quitwc,quit)
return viperResponsCh,quitwc
}
func getConfigManager(rp viper.RemoteProvider) (crypt.ConfigManager, error) { func getConfigManager(rp viper.RemoteProvider) (crypt.ConfigManager, error) {

View File

@ -40,6 +40,11 @@ import (
var v *Viper var v *Viper
type RemoteResponse struct {
Value []byte
Error error
}
func init() { func init() {
v = New() v = New()
} }
@ -47,6 +52,7 @@ func init() {
type remoteConfigFactory interface { type remoteConfigFactory interface {
Get(rp RemoteProvider) (io.Reader, error) Get(rp RemoteProvider) (io.Reader, error)
Watch(rp RemoteProvider) (io.Reader, error) Watch(rp RemoteProvider) (io.Reader, error)
WatchChannel(rp RemoteProvider)(<-chan *RemoteResponse, chan bool)
} }
// RemoteConfig is optional, see the remote package // RemoteConfig is optional, see the remote package
@ -1255,6 +1261,10 @@ func (v *Viper) WatchRemoteConfig() error {
return v.watchKeyValueConfig() return v.watchKeyValueConfig()
} }
func (v *Viper) WatchRemoteConfigOnChannel() error {
return v.watchKeyValueConfigOnChannel()
}
// Unmarshall a Reader into a map. // Unmarshall a Reader into a map.
// Should probably be an unexported function. // Should probably be an unexported function.
func unmarshalReader(in io.Reader, c map[string]interface{}) error { func unmarshalReader(in io.Reader, c map[string]interface{}) error {
@ -1298,6 +1308,23 @@ func (v *Viper) getRemoteConfig(provider RemoteProvider) (map[string]interface{}
return v.kvstore, err return v.kvstore, err
} }
// Retrieve the first found remote configuration.
func (v *Viper) watchKeyValueConfigOnChannel() error {
for _, rp := range v.remoteProviders {
respc, _ := RemoteConfig.WatchChannel(rp)
//Todo: Add quit channel
go func(rc <-chan *RemoteResponse) {
for {
b := <-rc
reader := bytes.NewReader(b.Value)
v.unmarshalReader(reader, v.kvstore)
}
}(respc)
return nil
}
return RemoteConfigError("No Files Found")
}
// Retrieve the first found remote configuration. // Retrieve the first found remote configuration.
func (v *Viper) watchKeyValueConfig() error { func (v *Viper) watchKeyValueConfig() error {
for _, rp := range v.remoteProviders { for _, rp := range v.remoteProviders {