mirror of https://github.com/tidwall/tile38.git
935 lines
25 KiB
Go
935 lines
25 KiB
Go
package sarama
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
|
|
// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
|
|
// Methods with stricter requirements will specify the minimum broker version required.
|
|
// You MUST call Close() on a client to avoid leaks
|
|
type ClusterAdmin interface {
|
|
// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
|
|
// It may take several seconds after CreateTopic returns success for all the brokers
|
|
// to become aware that the topic has been created. During this time, listTopics
|
|
// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
|
|
CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
|
|
|
|
// List the topics available in the cluster with the default options.
|
|
ListTopics() (map[string]TopicDetail, error)
|
|
|
|
// Describe some topics in the cluster.
|
|
DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
|
|
|
|
// Delete a topic. It may take several seconds after the DeleteTopic to returns success
|
|
// and for all the brokers to become aware that the topics are gone.
|
|
// During this time, listTopics may continue to return information about the deleted topic.
|
|
// If delete.topic.enable is false on the brokers, deleteTopic will mark
|
|
// the topic for deletion, but not actually delete them.
|
|
// This operation is supported by brokers with version 0.10.1.0 or higher.
|
|
DeleteTopic(topic string) error
|
|
|
|
// Increase the number of partitions of the topics according to the corresponding values.
|
|
// If partitions are increased for a topic that has a key, the partition logic or ordering of
|
|
// the messages will be affected. It may take several seconds after this method returns
|
|
// success for all the brokers to become aware that the partitions have been created.
|
|
// During this time, ClusterAdmin#describeTopics may not return information about the
|
|
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
|
|
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
|
|
|
|
// Alter the replica assignment for partitions.
|
|
// This operation is supported by brokers with version 2.4.0.0 or higher.
|
|
AlterPartitionReassignments(topic string, assignment [][]int32) error
|
|
|
|
// Provides info on ongoing partitions replica reassignments.
|
|
// This operation is supported by brokers with version 2.4.0.0 or higher.
|
|
ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
|
|
|
|
// Delete records whose offset is smaller than the given offset of the corresponding partition.
|
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
|
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
|
|
|
|
// Get the configuration for the specified resources.
|
|
// The returned configuration includes default values and the Default is true
|
|
// can be used to distinguish them from user supplied values.
|
|
// Config entries where ReadOnly is true cannot be updated.
|
|
// The value of config entries where Sensitive is true is always nil so
|
|
// sensitive information is not disclosed.
|
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
|
DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
|
|
|
|
// Update the configuration for the specified resources with the default options.
|
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
|
// The resources with their configs (topic is the only resource type with configs
|
|
// that can be updated currently Updates are not transactional so they may succeed
|
|
// for some resources while fail for others. The configs for a particular resource are updated automatically.
|
|
AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
|
|
|
|
// Creates access control lists (ACLs) which are bound to specific resources.
|
|
// This operation is not transactional so it may succeed for some ACLs while fail for others.
|
|
// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
|
|
// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
|
|
CreateACL(resource Resource, acl Acl) error
|
|
|
|
// Lists access control lists (ACLs) according to the supplied filter.
|
|
// it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
|
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
|
ListAcls(filter AclFilter) ([]ResourceAcls, error)
|
|
|
|
// Deletes access control lists (ACLs) according to the supplied filters.
|
|
// This operation is not transactional so it may succeed for some ACLs while fail for others.
|
|
// This operation is supported by brokers with version 0.11.0.0 or higher.
|
|
DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
|
|
|
|
// List the consumer groups available in the cluster.
|
|
ListConsumerGroups() (map[string]string, error)
|
|
|
|
// Describe the given consumer groups.
|
|
DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
|
|
|
|
// List the consumer group offsets available in the cluster.
|
|
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
|
|
|
|
// Delete a consumer group.
|
|
DeleteConsumerGroup(group string) error
|
|
|
|
// Get information about the nodes in the cluster
|
|
DescribeCluster() (brokers []*Broker, controllerID int32, err error)
|
|
|
|
// Get information about all log directories on the given set of brokers
|
|
DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
|
|
|
|
// Close shuts down the admin and closes underlying client.
|
|
Close() error
|
|
}
|
|
|
|
// clusterAdmin is the ClusterAdmin implementation in this file; every
// operation is carried out through the wrapped Client.
type clusterAdmin struct {
	// client is used to look up the controller, brokers, partition leaders and
	// group coordinators, and is closed by Close.
	client Client
	// conf is the value of client.Config() captured when the admin was built;
	// it supplies the protocol version and the Admin timeout/retry settings.
	conf *Config
}
|
|
|
|
// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
|
|
func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
|
|
client, err := NewClient(addrs, conf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return NewClusterAdminFromClient(client)
|
|
}
|
|
|
|
// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
|
|
// Note that underlying client will also be closed on admin's Close() call.
|
|
func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
|
|
//make sure we can retrieve the controller
|
|
_, err := client.Controller()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ca := &clusterAdmin{
|
|
client: client,
|
|
conf: client.Config(),
|
|
}
|
|
return ca, nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) Close() error {
|
|
return ca.client.Close()
|
|
}
|
|
|
|
func (ca *clusterAdmin) Controller() (*Broker, error) {
|
|
return ca.client.Controller()
|
|
}
|
|
|
|
func (ca *clusterAdmin) refreshController() (*Broker, error) {
|
|
return ca.client.RefreshController()
|
|
}
|
|
|
|
// isErrNoController returns `true` if the given error type unwraps to an
|
|
// `ErrNotController` response from Kafka
|
|
func isErrNoController(err error) bool {
|
|
switch e := err.(type) {
|
|
case *TopicError:
|
|
return e.Err == ErrNotController
|
|
case *TopicPartitionError:
|
|
return e.Err == ErrNotController
|
|
case KError:
|
|
return e == ErrNotController
|
|
}
|
|
return false
|
|
}
|
|
|
|
// retryOnError will repeatedly call the given (error-returning) func in the
|
|
// case that its response is non-nil and retriable (as determined by the
|
|
// provided retriable func) up to the maximum number of tries permitted by
|
|
// the admin client configuration
|
|
func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
|
|
var err error
|
|
for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
|
|
err = fn()
|
|
if err == nil || !retriable(err) {
|
|
return err
|
|
}
|
|
Logger.Printf(
|
|
"admin/request retrying after %dms... (%d attempts remaining)\n",
|
|
ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
|
|
time.Sleep(ca.conf.Admin.Retry.Backoff)
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
|
|
if topic == "" {
|
|
return ErrInvalidTopic
|
|
}
|
|
|
|
if detail == nil {
|
|
return errors.New("you must specify topic details")
|
|
}
|
|
|
|
topicDetails := make(map[string]*TopicDetail)
|
|
topicDetails[topic] = detail
|
|
|
|
request := &CreateTopicsRequest{
|
|
TopicDetails: topicDetails,
|
|
ValidateOnly: validateOnly,
|
|
Timeout: ca.conf.Admin.Timeout,
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V0_11_0_0) {
|
|
request.Version = 1
|
|
}
|
|
if ca.conf.Version.IsAtLeast(V1_0_0_0) {
|
|
request.Version = 2
|
|
}
|
|
|
|
return ca.retryOnError(isErrNoController, func() error {
|
|
b, err := ca.Controller()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rsp, err := b.CreateTopics(request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
topicErr, ok := rsp.TopicErrors[topic]
|
|
if !ok {
|
|
return ErrIncompleteResponse
|
|
}
|
|
|
|
if topicErr.Err != ErrNoError {
|
|
if topicErr.Err == ErrNotController {
|
|
_, _ = ca.refreshController()
|
|
}
|
|
return topicErr
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
|
|
controller, err := ca.Controller()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
request := &MetadataRequest{
|
|
Topics: topics,
|
|
AllowAutoTopicCreation: false,
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V1_0_0_0) {
|
|
request.Version = 5
|
|
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
|
|
request.Version = 4
|
|
}
|
|
|
|
response, err := controller.GetMetadata(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return response.Topics, nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
|
|
controller, err := ca.Controller()
|
|
if err != nil {
|
|
return nil, int32(0), err
|
|
}
|
|
|
|
request := &MetadataRequest{
|
|
Topics: []string{},
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V0_10_0_0) {
|
|
request.Version = 1
|
|
}
|
|
|
|
response, err := controller.GetMetadata(request)
|
|
if err != nil {
|
|
return nil, int32(0), err
|
|
}
|
|
|
|
return response.Brokers, response.ControllerID, nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
|
|
brokers := ca.client.Brokers()
|
|
for _, b := range brokers {
|
|
if b.ID() == id {
|
|
return b, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("could not find broker id %d", id)
|
|
}
|
|
|
|
func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
|
|
brokers := ca.client.Brokers()
|
|
if len(brokers) > 0 {
|
|
index := rand.Intn(len(brokers))
|
|
return brokers[index], nil
|
|
}
|
|
return nil, errors.New("no available broker")
|
|
}
|
|
|
|
func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
|
|
// In order to build TopicDetails we need to first get the list of all
|
|
// topics using a MetadataRequest and then get their configs using a
|
|
// DescribeConfigsRequest request. To avoid sending many requests to the
|
|
// broker, we use a single DescribeConfigsRequest.
|
|
|
|
// Send the all-topic MetadataRequest
|
|
b, err := ca.findAnyBroker()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_ = b.Open(ca.client.Config())
|
|
|
|
metadataReq := &MetadataRequest{}
|
|
metadataResp, err := b.GetMetadata(metadataReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
topicsDetailsMap := make(map[string]TopicDetail)
|
|
|
|
var describeConfigsResources []*ConfigResource
|
|
|
|
for _, topic := range metadataResp.Topics {
|
|
topicDetails := TopicDetail{
|
|
NumPartitions: int32(len(topic.Partitions)),
|
|
}
|
|
if len(topic.Partitions) > 0 {
|
|
topicDetails.ReplicaAssignment = map[int32][]int32{}
|
|
for _, partition := range topic.Partitions {
|
|
topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
|
|
}
|
|
topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
|
|
}
|
|
topicsDetailsMap[topic.Name] = topicDetails
|
|
|
|
// we populate the resources we want to describe from the MetadataResponse
|
|
topicResource := ConfigResource{
|
|
Type: TopicResource,
|
|
Name: topic.Name,
|
|
}
|
|
describeConfigsResources = append(describeConfigsResources, &topicResource)
|
|
}
|
|
|
|
// Send the DescribeConfigsRequest
|
|
describeConfigsReq := &DescribeConfigsRequest{
|
|
Resources: describeConfigsResources,
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V1_1_0_0) {
|
|
describeConfigsReq.Version = 1
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
|
|
describeConfigsReq.Version = 2
|
|
}
|
|
|
|
describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, resource := range describeConfigsResp.Resources {
|
|
topicDetails := topicsDetailsMap[resource.Name]
|
|
topicDetails.ConfigEntries = make(map[string]*string)
|
|
|
|
for _, entry := range resource.Configs {
|
|
// only include non-default non-sensitive config
|
|
// (don't actually think topic config will ever be sensitive)
|
|
if entry.Default || entry.Sensitive {
|
|
continue
|
|
}
|
|
topicDetails.ConfigEntries[entry.Name] = &entry.Value
|
|
}
|
|
|
|
topicsDetailsMap[resource.Name] = topicDetails
|
|
}
|
|
|
|
return topicsDetailsMap, nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) DeleteTopic(topic string) error {
|
|
if topic == "" {
|
|
return ErrInvalidTopic
|
|
}
|
|
|
|
request := &DeleteTopicsRequest{
|
|
Topics: []string{topic},
|
|
Timeout: ca.conf.Admin.Timeout,
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V0_11_0_0) {
|
|
request.Version = 1
|
|
}
|
|
|
|
return ca.retryOnError(isErrNoController, func() error {
|
|
b, err := ca.Controller()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rsp, err := b.DeleteTopics(request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
topicErr, ok := rsp.TopicErrorCodes[topic]
|
|
if !ok {
|
|
return ErrIncompleteResponse
|
|
}
|
|
|
|
if topicErr != ErrNoError {
|
|
if topicErr == ErrNotController {
|
|
_, _ = ca.refreshController()
|
|
}
|
|
return topicErr
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
|
|
if topic == "" {
|
|
return ErrInvalidTopic
|
|
}
|
|
|
|
topicPartitions := make(map[string]*TopicPartition)
|
|
topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
|
|
|
|
request := &CreatePartitionsRequest{
|
|
TopicPartitions: topicPartitions,
|
|
Timeout: ca.conf.Admin.Timeout,
|
|
}
|
|
|
|
return ca.retryOnError(isErrNoController, func() error {
|
|
b, err := ca.Controller()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rsp, err := b.CreatePartitions(request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
topicErr, ok := rsp.TopicPartitionErrors[topic]
|
|
if !ok {
|
|
return ErrIncompleteResponse
|
|
}
|
|
|
|
if topicErr.Err != ErrNoError {
|
|
if topicErr.Err == ErrNotController {
|
|
_, _ = ca.refreshController()
|
|
}
|
|
return topicErr
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
|
|
if topic == "" {
|
|
return ErrInvalidTopic
|
|
}
|
|
|
|
request := &AlterPartitionReassignmentsRequest{
|
|
TimeoutMs: int32(60000),
|
|
Version: int16(0),
|
|
}
|
|
|
|
for i := 0; i < len(assignment); i++ {
|
|
request.AddBlock(topic, int32(i), assignment[i])
|
|
}
|
|
|
|
return ca.retryOnError(isErrNoController, func() error {
|
|
b, err := ca.Controller()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
errs := make([]error, 0)
|
|
|
|
rsp, err := b.AlterPartitionReassignments(request)
|
|
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
} else {
|
|
if rsp.ErrorCode > 0 {
|
|
errs = append(errs, errors.New(rsp.ErrorCode.Error()))
|
|
}
|
|
|
|
for topic, topicErrors := range rsp.Errors {
|
|
for partition, partitionError := range topicErrors {
|
|
if partitionError.errorCode != ErrNoError {
|
|
errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
|
|
errs = append(errs, errors.New(errStr))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(errs) > 0 {
|
|
return ErrReassignPartitions{MultiError{&errs}}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
|
|
if topic == "" {
|
|
return nil, ErrInvalidTopic
|
|
}
|
|
|
|
request := &ListPartitionReassignmentsRequest{
|
|
TimeoutMs: int32(60000),
|
|
Version: int16(0),
|
|
}
|
|
|
|
request.AddBlock(topic, partitions)
|
|
|
|
b, err := ca.Controller()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
_ = b.Open(ca.client.Config())
|
|
|
|
rsp, err := b.ListPartitionReassignments(request)
|
|
|
|
if err == nil && rsp != nil {
|
|
return rsp.TopicStatus, nil
|
|
} else {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
|
|
if topic == "" {
|
|
return ErrInvalidTopic
|
|
}
|
|
partitionPerBroker := make(map[*Broker][]int32)
|
|
for partition := range partitionOffsets {
|
|
broker, err := ca.client.Leader(topic, partition)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, ok := partitionPerBroker[broker]; ok {
|
|
partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
|
|
} else {
|
|
partitionPerBroker[broker] = []int32{partition}
|
|
}
|
|
}
|
|
errs := make([]error, 0)
|
|
for broker, partitions := range partitionPerBroker {
|
|
topics := make(map[string]*DeleteRecordsRequestTopic)
|
|
recordsToDelete := make(map[int32]int64)
|
|
for _, p := range partitions {
|
|
recordsToDelete[p] = partitionOffsets[p]
|
|
}
|
|
topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
|
|
request := &DeleteRecordsRequest{
|
|
Topics: topics,
|
|
Timeout: ca.conf.Admin.Timeout,
|
|
}
|
|
|
|
rsp, err := broker.DeleteRecords(request)
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
} else {
|
|
deleteRecordsResponseTopic, ok := rsp.Topics[topic]
|
|
if !ok {
|
|
errs = append(errs, ErrIncompleteResponse)
|
|
} else {
|
|
for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
|
|
if deleteRecordsResponsePartition.Err != ErrNoError {
|
|
errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if len(errs) > 0 {
|
|
return ErrDeleteRecords{MultiError{&errs}}
|
|
}
|
|
//todo since we are dealing with couple of partitions it would be good if we return slice of errors
|
|
//for each partition instead of one error
|
|
return nil
|
|
}
|
|
|
|
// Returns a bool indicating whether the resource request needs to go to a
|
|
// specific broker
|
|
func dependsOnSpecificNode(resource ConfigResource) bool {
|
|
return (resource.Type == BrokerResource && resource.Name != "") ||
|
|
resource.Type == BrokerLoggerResource
|
|
}
|
|
|
|
func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
|
|
var entries []ConfigEntry
|
|
var resources []*ConfigResource
|
|
resources = append(resources, &resource)
|
|
|
|
request := &DescribeConfigsRequest{
|
|
Resources: resources,
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V1_1_0_0) {
|
|
request.Version = 1
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
|
|
request.Version = 2
|
|
}
|
|
|
|
var (
|
|
b *Broker
|
|
err error
|
|
)
|
|
|
|
// DescribeConfig of broker/broker logger must be sent to the broker in question
|
|
if dependsOnSpecificNode(resource) {
|
|
id, _ := strconv.Atoi(resource.Name)
|
|
b, err = ca.findBroker(int32(id))
|
|
} else {
|
|
b, err = ca.findAnyBroker()
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_ = b.Open(ca.client.Config())
|
|
rsp, err := b.DescribeConfigs(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, rspResource := range rsp.Resources {
|
|
if rspResource.Name == resource.Name {
|
|
if rspResource.ErrorMsg != "" {
|
|
return nil, errors.New(rspResource.ErrorMsg)
|
|
}
|
|
if rspResource.ErrorCode != 0 {
|
|
return nil, KError(rspResource.ErrorCode)
|
|
}
|
|
for _, cfgEntry := range rspResource.Configs {
|
|
entries = append(entries, *cfgEntry)
|
|
}
|
|
}
|
|
}
|
|
return entries, nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
|
|
var resources []*AlterConfigsResource
|
|
resources = append(resources, &AlterConfigsResource{
|
|
Type: resourceType,
|
|
Name: name,
|
|
ConfigEntries: entries,
|
|
})
|
|
|
|
request := &AlterConfigsRequest{
|
|
Resources: resources,
|
|
ValidateOnly: validateOnly,
|
|
}
|
|
|
|
var (
|
|
b *Broker
|
|
err error
|
|
)
|
|
|
|
// AlterConfig of broker/broker logger must be sent to the broker in question
|
|
if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
|
|
id, _ := strconv.Atoi(name)
|
|
b, err = ca.findBroker(int32(id))
|
|
} else {
|
|
b, err = ca.findAnyBroker()
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_ = b.Open(ca.client.Config())
|
|
rsp, err := b.AlterConfigs(request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, rspResource := range rsp.Resources {
|
|
if rspResource.Name == name {
|
|
if rspResource.ErrorMsg != "" {
|
|
return errors.New(rspResource.ErrorMsg)
|
|
}
|
|
if rspResource.ErrorCode != 0 {
|
|
return KError(rspResource.ErrorCode)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
|
|
var acls []*AclCreation
|
|
acls = append(acls, &AclCreation{resource, acl})
|
|
request := &CreateAclsRequest{AclCreations: acls}
|
|
|
|
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
|
|
request.Version = 1
|
|
}
|
|
|
|
b, err := ca.Controller()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = b.CreateAcls(request)
|
|
return err
|
|
}
|
|
|
|
func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
|
|
request := &DescribeAclsRequest{AclFilter: filter}
|
|
|
|
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
|
|
request.Version = 1
|
|
}
|
|
|
|
b, err := ca.Controller()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rsp, err := b.DescribeAcls(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var lAcls []ResourceAcls
|
|
for _, rAcl := range rsp.ResourceAcls {
|
|
lAcls = append(lAcls, *rAcl)
|
|
}
|
|
return lAcls, nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
|
|
var filters []*AclFilter
|
|
filters = append(filters, &filter)
|
|
request := &DeleteAclsRequest{Filters: filters}
|
|
|
|
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
|
|
request.Version = 1
|
|
}
|
|
|
|
b, err := ca.Controller()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rsp, err := b.DeleteAcls(request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var mAcls []MatchingAcl
|
|
for _, fr := range rsp.FilterResponses {
|
|
for _, mACL := range fr.MatchingAcls {
|
|
mAcls = append(mAcls, *mACL)
|
|
}
|
|
}
|
|
return mAcls, nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
|
|
groupsPerBroker := make(map[*Broker][]string)
|
|
|
|
for _, group := range groups {
|
|
controller, err := ca.client.Coordinator(group)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
|
|
}
|
|
|
|
for broker, brokerGroups := range groupsPerBroker {
|
|
response, err := broker.DescribeGroups(&DescribeGroupsRequest{
|
|
Groups: brokerGroups,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result = append(result, response.Groups...)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
|
|
allGroups = make(map[string]string)
|
|
|
|
// Query brokers in parallel, since we have to query *all* brokers
|
|
brokers := ca.client.Brokers()
|
|
groupMaps := make(chan map[string]string, len(brokers))
|
|
errors := make(chan error, len(brokers))
|
|
wg := sync.WaitGroup{}
|
|
|
|
for _, b := range brokers {
|
|
wg.Add(1)
|
|
go func(b *Broker, conf *Config) {
|
|
defer wg.Done()
|
|
_ = b.Open(conf) // Ensure that broker is opened
|
|
|
|
response, err := b.ListGroups(&ListGroupsRequest{})
|
|
if err != nil {
|
|
errors <- err
|
|
return
|
|
}
|
|
|
|
groups := make(map[string]string)
|
|
for group, typ := range response.Groups {
|
|
groups[group] = typ
|
|
}
|
|
|
|
groupMaps <- groups
|
|
}(b, ca.conf)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(groupMaps)
|
|
close(errors)
|
|
|
|
for groupMap := range groupMaps {
|
|
for group, protocolType := range groupMap {
|
|
allGroups[group] = protocolType
|
|
}
|
|
}
|
|
|
|
// Intentionally return only the first error for simplicity
|
|
err = <-errors
|
|
return
|
|
}
|
|
|
|
func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
|
|
coordinator, err := ca.client.Coordinator(group)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
request := &OffsetFetchRequest{
|
|
ConsumerGroup: group,
|
|
partitions: topicPartitions,
|
|
}
|
|
|
|
if ca.conf.Version.IsAtLeast(V0_10_2_0) {
|
|
request.Version = 2
|
|
} else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
|
|
request.Version = 1
|
|
}
|
|
|
|
return coordinator.FetchOffset(request)
|
|
}
|
|
|
|
func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
|
|
coordinator, err := ca.client.Coordinator(group)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
request := &DeleteGroupsRequest{
|
|
Groups: []string{group},
|
|
}
|
|
|
|
resp, err := coordinator.DeleteGroups(request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
groupErr, ok := resp.GroupErrorCodes[group]
|
|
if !ok {
|
|
return ErrIncompleteResponse
|
|
}
|
|
|
|
if groupErr != ErrNoError {
|
|
return groupErr
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
|
|
allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
|
|
|
|
// Query brokers in parallel, since we may have to query multiple brokers
|
|
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
|
|
errors := make(chan error, len(brokerIds))
|
|
wg := sync.WaitGroup{}
|
|
|
|
for _, b := range brokerIds {
|
|
wg.Add(1)
|
|
broker, err := ca.findBroker(b)
|
|
if err != nil {
|
|
Logger.Printf("Unable to find broker with ID = %v\n", b)
|
|
continue
|
|
}
|
|
go func(b *Broker, conf *Config) {
|
|
defer wg.Done()
|
|
_ = b.Open(conf) // Ensure that broker is opened
|
|
|
|
response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
|
|
if err != nil {
|
|
errors <- err
|
|
return
|
|
}
|
|
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
|
|
logDirs[b.ID()] = response.LogDirs
|
|
logDirsMaps <- logDirs
|
|
}(broker, ca.conf)
|
|
}
|
|
|
|
wg.Wait()
|
|
close(logDirsMaps)
|
|
close(errors)
|
|
|
|
for logDirsMap := range logDirsMaps {
|
|
for id, logDirs := range logDirsMap {
|
|
allLogDirs[id] = logDirs
|
|
}
|
|
}
|
|
|
|
// Intentionally return only the first error for simplicity
|
|
err = <-errors
|
|
return
|
|
}
|