feat(stacks): simplify WaitForStatus() BE-11505 (#241)
parent
35dcb5ca46
commit
13317ec43c
|
@ -59,7 +59,7 @@ services:
|
|||
|
||||
require.True(t, containerExists(composeContainerName))
|
||||
|
||||
waitResult := <-w.WaitForStatus(ctx, projectName, libstack.StatusCompleted)
|
||||
waitResult := w.WaitForStatus(ctx, projectName, libstack.StatusCompleted)
|
||||
|
||||
require.Empty(t, waitResult.ErrorMsg)
|
||||
require.Equal(t, libstack.StatusCompleted, waitResult.Status)
|
||||
|
|
|
@ -111,74 +111,66 @@ func aggregateStatuses(services []service) (libstack.Status, string) {
|
|||
|
||||
}
|
||||
|
||||
func (c *ComposeDeployer) WaitForStatus(ctx context.Context, name string, status libstack.Status) <-chan libstack.WaitResult {
|
||||
waitResultCh := make(chan libstack.WaitResult)
|
||||
func (c *ComposeDeployer) WaitForStatus(ctx context.Context, name string, status libstack.Status) libstack.WaitResult {
|
||||
waitResult := libstack.WaitResult{Status: status}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
waitResult.ErrorMsg = "failed to wait for status: " + ctx.Err().Error()
|
||||
waitResultCh <- waitResult
|
||||
default:
|
||||
}
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
waitResult.ErrorMsg = "failed to wait for status: " + ctx.Err().Error()
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
return waitResult
|
||||
}
|
||||
|
||||
var containerSummaries []api.ContainerSummary
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
if err := withComposeService(ctx, nil, libstack.Options{ProjectName: name}, func(composeService api.Service, project *types.Project) error {
|
||||
var err error
|
||||
var containerSummaries []api.ContainerSummary
|
||||
|
||||
psCtx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancelFunc()
|
||||
containerSummaries, err = composeService.Ps(psCtx, name, api.PsOptions{All: true})
|
||||
if err := withComposeService(ctx, nil, libstack.Options{ProjectName: name}, func(composeService api.Service, project *types.Project) error {
|
||||
var err error
|
||||
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Debug().
|
||||
Str("project_name", name).
|
||||
Err(err).
|
||||
Msg("error from docker compose ps")
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
services := serviceListFromContainerSummary(containerSummaries)
|
||||
|
||||
if len(services) == 0 && status == libstack.StatusRemoved {
|
||||
waitResultCh <- waitResult
|
||||
return
|
||||
}
|
||||
|
||||
aggregateStatus, errorMessage := aggregateStatuses(services)
|
||||
if aggregateStatus == status {
|
||||
waitResultCh <- waitResult
|
||||
return
|
||||
}
|
||||
|
||||
if status == libstack.StatusRunning && aggregateStatus == libstack.StatusCompleted {
|
||||
waitResult.Status = libstack.StatusCompleted
|
||||
waitResultCh <- waitResult
|
||||
return
|
||||
}
|
||||
|
||||
if errorMessage != "" {
|
||||
waitResult.ErrorMsg = errorMessage
|
||||
waitResultCh <- waitResult
|
||||
return
|
||||
}
|
||||
psCtx, cancelFunc := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancelFunc()
|
||||
containerSummaries, err = composeService.Ps(psCtx, name, api.PsOptions{All: true})
|
||||
|
||||
return err
|
||||
}); err != nil {
|
||||
log.Debug().
|
||||
Str("project_name", name).
|
||||
Str("required_status", string(status)).
|
||||
Str("status", string(aggregateStatus)).
|
||||
Msg("waiting for status")
|
||||
}
|
||||
}()
|
||||
Err(err).
|
||||
Msg("error from docker compose ps")
|
||||
|
||||
return waitResultCh
|
||||
continue
|
||||
}
|
||||
|
||||
services := serviceListFromContainerSummary(containerSummaries)
|
||||
|
||||
if len(services) == 0 && status == libstack.StatusRemoved {
|
||||
return waitResult
|
||||
}
|
||||
|
||||
aggregateStatus, errorMessage := aggregateStatuses(services)
|
||||
if aggregateStatus == status {
|
||||
return waitResult
|
||||
}
|
||||
|
||||
if status == libstack.StatusRunning && aggregateStatus == libstack.StatusCompleted {
|
||||
waitResult.Status = libstack.StatusCompleted
|
||||
|
||||
return waitResult
|
||||
}
|
||||
|
||||
if errorMessage != "" {
|
||||
waitResult.ErrorMsg = errorMessage
|
||||
|
||||
return waitResult
|
||||
}
|
||||
|
||||
log.Debug().
|
||||
Str("project_name", name).
|
||||
Str("required_status", string(status)).
|
||||
Str("status", string(aggregateStatus)).
|
||||
Msg("waiting for status")
|
||||
}
|
||||
}
|
||||
|
||||
func serviceListFromContainerSummary(containerSummaries []api.ContainerSummary) []service {
|
||||
|
|
|
@ -106,8 +106,7 @@ func waitForStatus(deployer libstack.Deployer, ctx context.Context, stackName st
|
|||
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
statusCh := deployer.WaitForStatus(ctx, stackName, requiredStatus)
|
||||
result := <-statusCh
|
||||
result := deployer.WaitForStatus(ctx, stackName, requiredStatus)
|
||||
if result.ErrorMsg == "" {
|
||||
return result.Status, "", nil
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ type Deployer interface {
|
|||
Pull(ctx context.Context, filePaths []string, options Options) error
|
||||
Run(ctx context.Context, filePaths []string, serviceName string, options RunOptions) error
|
||||
Validate(ctx context.Context, filePaths []string, options Options) error
|
||||
WaitForStatus(ctx context.Context, name string, status Status) <-chan WaitResult
|
||||
WaitForStatus(ctx context.Context, name string, status Status) WaitResult
|
||||
Config(ctx context.Context, filePaths []string, options Options) ([]byte, error)
|
||||
GetExistingEdgeStacks(ctx context.Context) ([]EdgeStack, error)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue