diff --git a/cmd/ctrlc/root/sync/kubernetes/kubernetes.go b/cmd/ctrlc/root/sync/kubernetes/kubernetes.go index b967c1d..3578a2d 100644 --- a/cmd/ctrlc/root/sync/kubernetes/kubernetes.go +++ b/cmd/ctrlc/root/sync/kubernetes/kubernetes.go @@ -1,9 +1,7 @@ package kubernetes import ( - "context" "fmt" - "strings" "github.com/MakeNowJust/heredoc/v2" "github.com/charmbracelet/log" @@ -11,65 +9,13 @@ import ( ctrlp "github.com/ctrlplanedev/cli/internal/common" "github.com/spf13/cobra" "github.com/spf13/viper" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" ) -func processNamespace(_ context.Context, clusterName string, namespace corev1.Namespace) api.ResourceProviderResource { - metadata := map[string]string{} - for key, value := range namespace.Labels { - metadata[fmt.Sprintf("tags/%s", key)] = value - } - - metadata["kubernetes/namespace"] = namespace.Name - metadata["namespace/id"] = string(namespace.UID) - metadata["namespace/api-version"] = namespace.APIVersion - metadata["namespace/status"] = string(namespace.Status.Phase) - - return api.ResourceProviderResource{ - Version: "ctrlplane.dev/kubernetes/namespace/v1", - Kind: "KubernetesNamespace", - Name: fmt.Sprintf("%s/%s", clusterName, namespace.Name), - Identifier: string(namespace.UID), - Config: map[string]any{ - "id": string(namespace.UID), - "name": namespace.Name, - "status": namespace.Status.Phase, - }, - Metadata: metadata, - } -} - -func processDeployment(_ context.Context, clusterName string, deployment appsv1.Deployment) api.ResourceProviderResource { - metadata := map[string]string{} - for key, value := range deployment.Labels { - metadata[fmt.Sprintf("tags/%s", key)] = value - } - metadata["deployment/name"] = deployment.Name - metadata["deployment/id"] = string(deployment.UID) - metadata["deployment/api-version"] = deployment.APIVersion - metadata["deployment/namespace"] = deployment.Namespace - - return api.ResourceProviderResource{ - Version: "ctrlplane.dev/kubernetes/deployment/v1", - Kind: "KubernetesDeployment", - Name: fmt.Sprintf("%s/%s/%s", clusterName, deployment.Namespace, deployment.Name), - Identifier: string(deployment.UID), - Config: map[string]any{ - "id": string(deployment.UID), - "name": deployment.Name, - "namespace": deployment.Namespace, - }, - Metadata: metadata, - } -} - func NewSyncKubernetesCmd() *cobra.Command { var clusterIdentifier string var providerName string var clusterName string + var selectors ResourceTypes cmd := &cobra.Command{ Use: "kubernetes", @@ -78,7 +24,7 @@ func NewSyncKubernetesCmd() *cobra.Command { $ ctrlc sync kubernetes --cluster-identifier 1234567890 --cluster-name my-cluster `), RunE: func(cmd *cobra.Command, args []string) error { - ctx := context.Background() + ctx := cmd.Context() log.Info("Syncing Kubernetes resources on a cluster") if clusterIdentifier == "" { clusterIdentifier = viper.GetString("cluster-identifier") @@ -89,6 +35,10 @@ func NewSyncKubernetesCmd() *cobra.Command { return err } + if clusterName == "" { + clusterName = configClusterName + } + log.Info("Connected to cluster", "name", clusterName) apiURL := viper.GetString("url") @@ -99,70 +49,19 @@ func NewSyncKubernetesCmd() *cobra.Command { if err != nil { return fmt.Errorf("failed to create API client: %w", err) } - - clusterResource, err := ctrlplaneClient.GetResourceByIdentifierWithResponse(ctx, workspaceId, clusterIdentifier) - if err != nil { - log.Warn("Failed to get cluster resource") - } - if clusterResource.StatusCode() > 499 { - log.Warn("Failed to get cluster resource", "status", clusterResource.StatusCode(), "identifier", clusterIdentifier, "error", err) - return fmt.Errorf("error access ctrlplane api: %s", clusterResource.Status()) - } - if clusterResource != nil && clusterResource.JSON200 != nil { - log.Info("Found cluster resource", "name", clusterResource.JSON200.Name) - clusterName = clusterResource.JSON200.Name - } - - if clusterName == "" { - clusterName = configClusterName - } - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return err - } - - namespaces, err := clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + sync := newSync(clusterIdentifier, workspaceId, ctrlplaneClient, config, clusterName) + resources, err := sync.process(ctx, selectors) if err != nil { return err } - resources := []api.ResourceProviderResource{} - for _, namespace := range namespaces.Items { - resource := processNamespace(context.Background(), clusterName, namespace) - resources = append(resources, resource) - } - - deployments, err := clientset.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) - if err != nil { - return err - } - - for _, deployment := range deployments.Items { - resource := processDeployment(context.Background(), clusterName, deployment) - resources = append(resources, resource) - } - - if clusterResource != nil && clusterResource.JSON200 != nil { - for _, resource := range resources { - for key, value := range clusterResource.JSON200.Metadata { - if strings.HasPrefix(key, "tags/") { - continue - } - if _, exists := resource.Metadata[key]; !exists { - resource.Metadata[key] = value - } - } - resource.Metadata["kubernetes/name"] = clusterResource.JSON200.Name - } - } - return ctrlp.UpsertResources(ctx, resources, &providerName) }, } cmd.Flags().StringVarP(&providerName, "provider", "p", "", "Name of the resource provider") cmd.Flags().StringVarP(&clusterIdentifier, "cluster-identifier", "c", "", "The identifier of the parent cluster in ctrlplane (if not provided, will use the CLUSTER_IDENTIFIER environment variable)") cmd.Flags().StringVarP(&clusterName, "cluster-name", "n", "", "The name of the cluster") + cmd.Flags().VarP(&selectors, "selector", "s", "Select resources to sync [nodes|deployments|namespaces]. Repeat the flag to select multiple resources; omit it to sync all resources.") return cmd } diff --git a/cmd/ctrlc/root/sync/kubernetes/resource_types.go b/cmd/ctrlc/root/sync/kubernetes/resource_types.go new file mode 100644 index 0000000..04b1a31 --- /dev/null +++ b/cmd/ctrlc/root/sync/kubernetes/resource_types.go @@ -0,0 +1,60 @@ +package kubernetes + +import ( + "fmt" + "slices" + "strings" +) + +type ResourceType string + +const ( + ResourceNamespace ResourceType = "namespaces" + ResourceNode ResourceType = "nodes" + ResourceDeployment ResourceType = "deployments" +) + +func (r ResourceType) String() string { + return string(r) +} + +func ParseResourceType(s string) (ResourceType, error) { + switch ResourceType(s) { + case ResourceNamespace, ResourceNode, ResourceDeployment: + return ResourceType(s), nil + default: + return "", fmt.Errorf("invalid resource type %q", s) + } +} + +type ResourceTypes []ResourceType + +func (r ResourceTypes) ShouldFetch(target ResourceType) bool { + return slices.Contains(r, target) || len(r) == 0 +} + +func (r *ResourceTypes) String() string { + if r == nil { + return "" + } + out := make([]string, len(*r)) + for i, v := range *r { + out[i] = v.String() + } + return strings.Join(out, ",") +} + +func (s *ResourceTypes) Type() string { + return "resourceType" +} + +func (r *ResourceTypes) Set(value string) error { + // supports repeated flags like: + // --resource namespace --resource node + rt, err := ParseResourceType(value) + if err != nil { + return err + } + *r = append(*r, rt) + return nil +} diff --git a/cmd/ctrlc/root/sync/kubernetes/sync_processor.go b/cmd/ctrlc/root/sync/kubernetes/sync_processor.go new file mode 100644 index 0000000..25af3bd --- /dev/null +++ b/cmd/ctrlc/root/sync/kubernetes/sync_processor.go @@ -0,0 +1,227 @@ +package kubernetes + +import ( + "context" + "fmt" + "strings" + + "github.com/charmbracelet/log" + "github.com/ctrlplanedev/cli/internal/api" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type syncConfig struct { + clusterIdentifier string + workspaceID string + clusterName string + kubeConfig *rest.Config + client *api.ClientWithResponses + resources []api.ResourceProviderResource +} + +func (s *syncConfig) FetchNodes(ctx context.Context, clientset *kubernetes.Clientset) error { + nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + resources := make([]api.ResourceProviderResource, 0) + for _, node := range nodes.Items { + processedNode := processNode(ctx, s.clusterName, node) + resources = append(resources, processedNode) + } + s.resources = append(s.resources, resources...) + return nil +} + +func (s *syncConfig) FetchNamespaces(ctx context.Context, clientset *kubernetes.Clientset) error { + namespaces, err := clientset.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + + resources := make([]api.ResourceProviderResource, 0) + for _, namespace := range namespaces.Items { + resource := processNamespace(context.Background(), s.clusterName, namespace) + resources = append(resources, resource) + } + s.resources = append(s.resources, resources...) + return nil + +} + +func (s *syncConfig) FetchDeployments(ctx context.Context, clientset *kubernetes.Clientset) error { + deployments, err := clientset.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + resources := make([]api.ResourceProviderResource, 0) + for _, deployment := range deployments.Items { + resource := processDeployment(context.Background(), s.clusterName, deployment) + resources = append(resources, resource) + } + s.resources = append(s.resources, resources...) + return nil +} + +func newSync(clusterIdentifier string, workspaceID string, client *api.ClientWithResponses, kubeConfig *rest.Config, clusterName string) *syncConfig { + return &syncConfig{ + clusterIdentifier: clusterIdentifier, + workspaceID: workspaceID, + client: client, + clusterName: clusterName, + kubeConfig: kubeConfig, + resources: make([]api.ResourceProviderResource, 0), + } +} + +func processNamespace(_ context.Context, clusterName string, namespace corev1.Namespace) api.ResourceProviderResource { + metadata := map[string]string{} + for key, value := range namespace.Labels { + metadata[fmt.Sprintf("tags/%s", key)] = value + } + + metadata["kubernetes/namespace"] = namespace.Name + metadata["namespace/id"] = string(namespace.UID) + metadata["namespace/api-version"] = namespace.APIVersion + metadata["namespace/status"] = string(namespace.Status.Phase) + + return api.ResourceProviderResource{ + Version: "ctrlplane.dev/kubernetes/namespace/v1", + Kind: "KubernetesNamespace", + Name: fmt.Sprintf("%s/%s", clusterName, namespace.Name), + Identifier: string(namespace.UID), + Config: map[string]any{ + "id": string(namespace.UID), + "name": namespace.Name, + "status": namespace.Status.Phase, + }, + Metadata: metadata, + } +} + +func processNode(_ context.Context, clusterName string, node corev1.Node) api.ResourceProviderResource { + metadata := make(map[string]string) + for key, value := range node.Labels { + metadata[fmt.Sprintf("tags/%s", key)] = value + } + metadata["kubernetes/uid"] = string(node.UID) + metadata["kubernetes/created-at"] = node.CreationTimestamp.String() + // Get node ready status + nodeReady := "Unknown" + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady { + nodeReady = string(condition.Status) + break + } + } + metadata["kubernetes/node-ready"] = nodeReady + // Get node role from labels + nodeRole := "worker" + for label := range node.Labels { + if role, ok := strings.CutPrefix(label, "node-role.kubernetes.io/"); ok && role != "" { + nodeRole = role + break + } + } + metadata["kubernetes/node-role"] = nodeRole + metadata["kubernetes/kubelet-version"] = node.Status.NodeInfo.KubeletVersion + metadata["kubernetes/os-image"] = node.Status.NodeInfo.OSImage + metadata["kubernetes/architecture"] = node.Status.NodeInfo.Architecture + metadata["kubernetes/container-runtime"] = node.Status.NodeInfo.ContainerRuntimeVersion + + return api.ResourceProviderResource{ + Version: "ctrlplane.dev/kubernetes/node/v1", + Kind: "KubernetesNode", + Name: fmt.Sprintf("%s/%s", clusterName, node.Name), + Identifier: string(node.UID), + Config: map[string]any{ + "id": string(node.UID), + "name": node.Name, + "status": node.Status.Phase, + }, + Metadata: metadata, + } +} + +func processDeployment(_ context.Context, clusterName string, deployment appsv1.Deployment) api.ResourceProviderResource { + metadata := map[string]string{} + for key, value := range deployment.Labels { + metadata[fmt.Sprintf("tags/%s", key)] = value + } + metadata["deployment/name"] = deployment.Name + metadata["deployment/id"] = string(deployment.UID) + metadata["deployment/api-version"] = deployment.APIVersion + metadata["deployment/namespace"] = deployment.Namespace + + return api.ResourceProviderResource{ + Version: "ctrlplane.dev/kubernetes/deployment/v1", + Kind: "KubernetesDeployment", + Name: fmt.Sprintf("%s/%s/%s", clusterName, deployment.Namespace, deployment.Name), + Identifier: string(deployment.UID), + Config: map[string]any{ + "id": string(deployment.UID), + "name": deployment.Name, + "namespace": deployment.Namespace, + }, + Metadata: metadata, + } +} + +func (s *syncConfig) process(ctx context.Context, selectors ResourceTypes) ([]api.ResourceProviderResource, error) { + clusterResource, err := s.client.GetResourceByIdentifierWithResponse(ctx, s.workspaceID, s.clusterIdentifier) + if err != nil { + log.Warn("Failed to get cluster resource", "identifier", s.clusterIdentifier, "error", err) + } + if clusterResource != nil && clusterResource.StatusCode() > 499 { + log.Warn("Failed to get cluster resource", "status", clusterResource.StatusCode(), "identifier", s.clusterIdentifier, "error", err) + return nil, fmt.Errorf("error access ctrlplane api: %s", clusterResource.Status()) + } + + if clusterResource != nil && clusterResource.JSON200 != nil { + log.Info("Found cluster resource", "name", clusterResource.JSON200.Name) + s.clusterName = clusterResource.JSON200.Name + } + + clientset, err := kubernetes.NewForConfig(s.kubeConfig) + if err != nil { + return nil, err + } + + if selectors.ShouldFetch(ResourceNamespace) { + if err := s.FetchNamespaces(ctx, clientset); err != nil { + return s.resources, err + } + } + + if selectors.ShouldFetch(ResourceDeployment) { + if err := s.FetchDeployments(ctx, clientset); err != nil { + return s.resources, err + } + } + + if selectors.ShouldFetch(ResourceNode) { + if err := s.FetchNodes(ctx, clientset); err != nil { + return s.resources, err + } + } + + if clusterResource != nil && clusterResource.JSON200 != nil { + for _, resource := range s.resources { + for key, value := range clusterResource.JSON200.Metadata { + if strings.HasPrefix(key, "tags/") { + continue + } + if _, exists := resource.Metadata[key]; !exists { + resource.Metadata[key] = value + } + } + resource.Metadata["kubernetes/name"] = clusterResource.JSON200.Name + } + } + + return s.resources, nil +} diff --git a/cmd/ctrlc/root/sync/netbox/devices/types.go b/cmd/ctrlc/root/sync/netbox/devices/types.go index 7a62e11..e514851 100644 --- a/cmd/ctrlc/root/sync/netbox/devices/types.go +++ b/cmd/ctrlc/root/sync/netbox/devices/types.go @@ -48,17 +48,17 @@ type netboxPrimaryIP struct { } type netboxDeviceType struct { - Id int32 `json:"id"` - Url string `json:"url"` - Display string `json:"display"` - Manufacturer netboxNamedRef `json:"manufacturer"` - Model string `json:"model"` - Slug string `json:"slug"` - Description *string `json:"description"` - DeviceCount int32 `json:"device_count"` + Id int32 `json:"id"` + Url string `json:"url"` + Display string `json:"display"` + Manufacturer netboxNamedRef `json:"manufacturer"` + Model string `json:"model"` + Slug string `json:"slug"` + Description *string `json:"description"` + DeviceCount int32 `json:"device_count"` } -type netboxCustomASN struct { +type netboxCustomASN struct { //nolint:unused ASN int64 `json:"asn"` Description string `json:"description"` Display string `json:"display"`