diff --git a/cmd/main.go b/cmd/main.go index 54524ed..08889df 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -36,7 +36,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" schedulev1 "github.com/baschno/tdset-operator/api/v1" - "github.com/baschno/tdset-operator/internal/controller" // +kubebuilder:scaffold:imports ) diff --git a/internal/controller/deployment.go b/internal/controller/deployment.go new file mode 100644 index 0000000..44993ba --- /dev/null +++ b/internal/controller/deployment.go @@ -0,0 +1,218 @@ +package controllers + +import ( + "context" + "fmt" + + schedulev1 "github.com/baschno/tdset-operator/api/v1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +func (r *TDSetReconciler) Deployment( + ctx context.Context, req ctrl.Request, + tdSet *schedulev1.TDSet, +) (*appsv1.Deployment, error) { + log := log.FromContext(ctx) + + replicas, err := r.GetExpectedReplica(ctx, req, tdSet) + if err != nil { + log.Error(err, "failed to get expected replica") + + return nil, err + } + + labels := map[string]string{ + "app.kubernetes.io/name": "TDSet", + "app.kubernetes.io/instance": tdSet.Name, + "app.kubernetes.io/version": "v1", + "app.kubernetes.io/part-of": "tdset-operator", + "app.kubernetes.io/created-by": "controller-manager", + } + + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: tdSet.Name, + Namespace: tdSet.Namespace, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Image: tdSet.Spec.Container.Image, + Name: tdSet.Name, + ImagePullPolicy: corev1.PullIfNotPresent, + Ports: []corev1.ContainerPort{{ + ContainerPort: int32(tdSet.Spec.Container.Port), + Name: "tdset", + }}, + }}, + }, + }, + }, + } + + // Set the ownerRef for the Deployment + // More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/owners-dependents/ + if err := ctrl.SetControllerReference(tdSet, dep, r.Scheme); err != nil { + log.Error(err, "failed to set controller owner reference") + return nil, err + } + + return dep, nil +} + +func (r *TDSetReconciler) DeploymentIfNotExist( + ctx context.Context, req ctrl.Request, + tdSet *schedulev1.TDSet, +) (bool, error) { + log := log.FromContext(ctx) + + dep := &appsv1.Deployment{} + + err := r.Get(ctx, types.NamespacedName{Name: tdSet.Name, Namespace: tdSet.Namespace}, dep) + if err != nil && apierrors.IsNotFound(err) { + dep, err := r.Deployment(ctx, req, tdSet) + if err != nil { + log.Error(err, "Failed to define new Deployment resource for TDSet") + + err = r.SetCondition( + ctx, req, tdSet, TypeAvailable, + fmt.Sprintf("Failed to create Deployment for TDSet (%s): (%s)", tdSet.Name, err), + ) + if err != nil { + return false, err + } + } + + log.Info( + "Creating a new Deployment", + "Deployment.Namespace", dep.Namespace, + "Deployment.Name", dep.Name, + ) + + err = r.Create(ctx, dep) + if err != nil { + log.Error( + err, "Failed to create new Deployment", + "Deployment.Namespace", dep.Namespace, + "Deployment.Name", dep.Name, + ) + + return false, err + } + + err = r.GetTDSet(ctx, req, tdSet) + if err != nil { + log.Error(err, "Failed to re-fetch TDSet") + return false, err + } + + err = r.SetCondition( + ctx, req, tdSet, TypeProgressing, + fmt.Sprintf("Created Deployment for the TDSet: (%s)", tdSet.Name), + ) + if err != nil { + return false, err + } + + return true, nil + } + + if err != nil { + log.Error(err, "Failed to get Deployment") + + return false, err + } + + return false, nil +} + +func (r *TDSetReconciler) UpdateDeploymentReplica( + ctx context.Context, req ctrl.Request, + tdSet *schedulev1.TDSet, +) error { + log := log.FromContext(ctx) + + dep := &appsv1.Deployment{} + + err := r.Get(ctx, types.NamespacedName{Name: tdSet.Name, Namespace: tdSet.Namespace}, dep) + if err != nil { + log.Error(err, "Failed to get Deployment") + + return err + } + + replicas, err := r.GetExpectedReplica(ctx, req, tdSet) + if err != nil { + log.Error(err, "failed to get expected replica") + + return err + } + + if replicas == *dep.Spec.Replicas { + return nil + } + + log.Info( + "Updating a Deployment replica", + "Deployment.Namespace", dep.Namespace, + "Deployment.Name", dep.Name, + ) + + dep.Spec.Replicas = &replicas + + err = r.Update(ctx, dep) + if err != nil { + log.Error( + err, "Failed to update Deployment", + "Deployment.Namespace", dep.Namespace, + "Deployment.Name", dep.Name, + ) + + err = r.GetTDSet(ctx, req, tdSet) + if err != nil { + log.Error(err, "Failed to re-fetch TDSet") + return err + } + + err = r.SetCondition( + ctx, req, tdSet, TypeProgressing, + fmt.Sprintf("Failed to update replica for the TDSet (%s): (%s)", tdSet.Name, err), + ) + if err != nil { + return err + } + + return nil + } + + err = r.GetTDSet(ctx, req, tdSet) + if err != nil { + log.Error(err, "Failed to re-fetch TDSet") + return err + } + + err = r.SetCondition( + ctx, req, tdSet, TypeProgressing, + fmt.Sprintf("Updated replica for the TDSet (%s)", tdSet.Name), + ) + if err != nil { + return err + } + + return nil +} diff --git a/internal/controller/schedule.go b/internal/controller/schedule.go new file mode 100644 index 0000000..f4a7e23 --- /dev/null +++ b/internal/controller/schedule.go @@ -0,0 +1,30 @@ +package controllers + +import ( + "context" + "time" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log" + + schedulev1 "github.com/baschno/tdset-operator/api/v1" +) + +func (r *TDSetReconciler) GetExpectedReplica(ctx context.Context, req ctrl.Request, tdSet *schedulev1.TDSet) (int32, error) { + log := log.FromContext(ctx) + + if tdSet.Spec.SchedulingConfig != nil && len(tdSet.Spec.SchedulingConfig) != 0 { + now := time.Now() + hour := now.Hour() + + log.Info("current server", "hour", hour, "time", now) + + for _, config := range tdSet.Spec.SchedulingConfig { + if hour >= config.StartTime && hour < config.EndTime { + return int32(config.Replica), nil + } + } + } + + return tdSet.Spec.DefaultReplica, nil +} diff --git a/internal/controller/tdset.go b/internal/controller/tdset.go new file mode 100644 index 0000000..79e6153 --- /dev/null +++ b/internal/controller/tdset.go @@ -0,0 +1,76 @@ +package controllers + +import ( + "context" + + "sigs.k8s.io/controller-runtime/pkg/log" + + schedulev1 "github.com/baschno/tdset-operator/api/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" +) + +// ConditionStatus defines TDSet condition status. +type ConditionStatus string + +// Defines TDSet condition status. +const ( + TypeAvailable ConditionStatus = "Available" + TypeProgressing ConditionStatus = "Progressing" + TypeDegraded ConditionStatus = "Degraded" +) + +// GetTDSet gets the TDSet from api server. +func (r *TDSetReconciler) GetTDSet(ctx context.Context, req ctrl.Request, tdSet *schedulev1.TDSet) error { + err := r.Get(ctx, req.NamespacedName, tdSet) + if err != nil { + return err + } + + return nil +} + +// SetInitialCondition sets the status condition of the TDSet to available initially +// when no condition exists yet. +func (r *TDSetReconciler) SetInitialCondition(ctx context.Context, req ctrl.Request, tdSet *schedulev1.TDSet) error { + if tdSet.Status.Conditions != nil || len(tdSet.Status.Conditions) != 0 { + return nil + } + + err := r.SetCondition(ctx, req, tdSet, TypeAvailable, "Starting reconciliation") + + return err +} + +// SetCondition sets the status condition of the TDSet. +func (r *TDSetReconciler) SetCondition( + ctx context.Context, req ctrl.Request, + tdSet *schedulev1.TDSet, condition ConditionStatus, + message string, +) error { + log := log.FromContext(ctx) + + meta.SetStatusCondition( + &tdSet.Status.Conditions, + metav1.Condition{ + Type: string(condition), + Status: metav1.ConditionUnknown, Reason: "Reconciling", + Message: message, + }, + ) + + if err := r.Status().Update(ctx, tdSet); err != nil { + log.Error(err, "Failed to update TDSet status") + + return err + } + + if err := r.Get(ctx, req.NamespacedName, tdSet); err != nil { + log.Error(err, "Failed to re-fetch TDSet") + + return err + } + + return nil +}