Limit number of concurrent goroutines

This commit is contained in:
Mikołaj Pęczkowski 2021-11-08 18:30:45 +01:00
parent 5fd9bc851b
commit 62dadc53bf
12 changed files with 206 additions and 103 deletions

View File

@ -26,12 +26,13 @@ By default, the config file is searched for in `[HOME_DIR]./config/grm/config.ya
### Global args
| argument | type | default | Description |
|---------------------------|----------|--------------------------------------|------------------------------------------------------------------------|
|------------------------------|----------|--------------------------------------|------------------------------------------------------------------------|
| **-c**, **--config-file** | *string* | `[HOME_DIR]./config/grm/config.yaml` | Path to configuration file, where the repositories must be specified |
| **-v**, **--version** | *bool* | `false` | Display current version |
| **--no-color** | *bool* | `false` | Turning off the display of output in color |
| **-n** **--name** | *string* | `empty` | Limit action to the specified repository name |
| **-t** **--tag** | *string* | `empty` | Limit action to the specified repository tag (may be more than one tag)|
| **max-concurrent-process** | *string* | `empty` | Determine how many tasks can run simultaneously |
### Commands

View File

@ -4,7 +4,7 @@ import (
"errors"
"fmt"
"io"
"os"
"sync"
"gitlab.com/revalus/grm/commands"
"gitlab.com/revalus/grm/config"
@ -24,28 +24,34 @@ type GitRepositoryManager struct {
configuration config.Configuration
}
func (g *GitRepositoryManager) Parse(args []string) {
checkCriticalError := func(err error) {
func (g *GitRepositoryManager) Parse(args []string) error {
arguments, err := config.ParseCliArguments(APP_NAME, APP_DESCRIPTION, args)
if err != nil {
fmt.Printf("Error: %v", err.Error())
os.Exit(2)
return err
}
}
arguments, err := config.ParseCliArguments(APP_NAME, APP_DESCRIPTION, args)
checkCriticalError(err)
configFileContent, err := getFileContent(arguments.ConfigurationFile)
checkCriticalError(err)
if err != nil {
fmt.Printf("Error: %v", err.Error())
return err
}
fileExcension, err := getFileExcension(arguments.ConfigurationFile)
checkCriticalError(err)
if err != nil {
fmt.Printf("Error: %v", err.Error())
return err
}
configuration, err := config.GetRepositoryConfig(configFileContent, fileExcension)
checkCriticalError(err)
if err != nil {
fmt.Printf("Error: %v", err.Error())
return err
}
g.cliArguments = arguments
g.configuration = configuration
return nil
}
func (g *GitRepositoryManager) Run(w io.Writer) int {
@ -55,7 +61,7 @@ func (g *GitRepositoryManager) Run(w io.Writer) int {
exitCode := 0
if len(g.cliArguments.LimitTags) != 0 {
if len(g.cliArguments.LimitToTags) != 0 {
err := g.limitTags()
if err != nil {
echo.ErrorfMsg(err.Error())
@ -63,7 +69,7 @@ func (g *GitRepositoryManager) Run(w io.Writer) int {
}
}
if g.cliArguments.LimitName != "" {
if g.cliArguments.LimitToName != "" {
err := g.limitName()
if err != nil {
echo.ErrorfMsg(err.Error())
@ -90,7 +96,7 @@ func (g *GitRepositoryManager) Run(w io.Writer) int {
return exitCode
}
func (g GitRepositoryManager) describeStatus(status commands.CommandStatus) {
func describeStatus(status commands.CommandStatus) {
if status.Error {
echo.RedMessageF("Repository \"%v\": an error occurred: %v", status.Name, status.Message)
return
@ -107,7 +113,7 @@ func (g *GitRepositoryManager) limitTags() error {
limitedTagsTmp := []config.RepositoryConfig{}
for _, item := range g.configuration.Repositories {
if checkAnyOfItemInSlice(item.Tags, g.cliArguments.LimitTags) {
if checkAnyOfItemInSlice(item.Tags, g.cliArguments.LimitToTags) {
limitedTagsTmp = append(limitedTagsTmp, item)
}
}
@ -120,7 +126,7 @@ func (g *GitRepositoryManager) limitTags() error {
func (g *GitRepositoryManager) limitName() error {
for _, item := range g.configuration.Repositories {
if g.cliArguments.LimitName == item.Name {
if g.cliArguments.LimitToName == item.Name {
g.configuration.Repositories = []config.RepositoryConfig{item}
return nil
}
@ -129,13 +135,19 @@ func (g *GitRepositoryManager) limitName() error {
}
func (g *GitRepositoryManager) runCommand(cmd commands.Command) {
statusChan := make(chan commands.CommandStatus)
routines := make(chan struct{}, g.cliArguments.Routines)
var wg sync.WaitGroup
for _, repo := range g.configuration.Repositories {
go cmd.Command(repo, statusChan)
}
wg.Add(1)
for range g.configuration.Repositories {
g.describeStatus(<-statusChan)
go func(r config.RepositoryConfig) {
defer wg.Done()
routines <- struct{}{}
describeStatus(cmd.Command(r))
<-routines
}(repo)
}
wg.Wait()
}

View File

@ -31,7 +31,7 @@ func (emt ExpectedMessageTester) Write(p []byte) (n int, err error) {
return 0, nil
}
func (fk FakeCommandToTest) Command(repoCfg config.RepositoryConfig, cmdStatus chan commands.CommandStatus) {
func (fk FakeCommandToTest) Command(repoCfg config.RepositoryConfig) commands.CommandStatus {
status := commands.CommandStatus{
Name: repoCfg.Name,
Changed: false,
@ -46,7 +46,7 @@ func (fk FakeCommandToTest) Command(repoCfg config.RepositoryConfig, cmdStatus c
status.Changed = true
}
cmdStatus <- status
return status
}
func prepareConfigContent() (string, string) {
@ -125,6 +125,7 @@ func TestOutputFromSync(t *testing.T) {
Sync: true,
Version: true,
Color: false,
Routines: 10,
},
}
emt := ExpectedMessageTester{
@ -140,7 +141,8 @@ func TestOutputFromSync(t *testing.T) {
func TestLimitTags(t *testing.T) {
grm := GitRepositoryManager{
cliArguments: config.CliArguments{
LimitTags: []string{"example"},
LimitToTags: []string{"example"},
Routines: 10,
},
configuration: config.Configuration{
Repositories: []config.RepositoryConfig{
@ -169,7 +171,8 @@ func TestLimitTags(t *testing.T) {
func TestLimitName(t *testing.T) {
grm := GitRepositoryManager{
cliArguments: config.CliArguments{
LimitName: "notExample",
LimitToName: "notExample",
Routines: 10,
},
configuration: config.Configuration{
Repositories: []config.RepositoryConfig{
@ -196,7 +199,8 @@ func TestLimitName(t *testing.T) {
func TestRunWithNotExistingNameInLimit(t *testing.T) {
grm := GitRepositoryManager{
cliArguments: config.CliArguments{
LimitName: "not-existing-name",
LimitToName: "not-existing-name",
Routines: 10,
},
configuration: config.Configuration{
Repositories: []config.RepositoryConfig{
@ -221,7 +225,8 @@ func TestRunWithNotExistingNameInLimit(t *testing.T) {
func TestRunWithNotExistingTagsInLimit(t *testing.T) {
grm := GitRepositoryManager{
cliArguments: config.CliArguments{
LimitTags: []string{"not-existing-tag"},
LimitToTags: []string{"not-existing-tag"},
Routines: 10,
},
configuration: config.Configuration{
Repositories: []config.RepositoryConfig{
@ -250,6 +255,7 @@ func TestGetStatusOutput(t *testing.T) {
},
cliArguments: config.CliArguments{
Status: true,
Routines: 10,
},
}
emt := ExpectedMessageTester{
@ -262,6 +268,106 @@ func TestGetStatusOutput(t *testing.T) {
if status != 0 {
t.Errorf("Expected to get status %v, instead o this got %v", 1, status)
}
// Output:
// Info: Current status of repositories
}
func TestDescribeStatusErrorNoColor(t *testing.T) {
emt := ExpectedMessageTester{
expectedMessages: []string{
"Repository \"Test\": an error occurred: test\n",
},
}
echo.Color(false)
echo.Output(emt)
status := commands.CommandStatus{
Name: "Test",
Message: "test",
Error: true,
}
describeStatus(status)
}
func TestDescribeStatusErrorColor(t *testing.T) {
emt := ExpectedMessageTester{
expectedMessages: []string{
fmt.Sprintf("%vRepository \"Test\": an error occurred: test%v\n", echo.ColorRed, echo.ColorReset),
},
}
echo.Color(true)
echo.Output(emt)
status := commands.CommandStatus{
Name: "Test",
Message: "test",
Error: true,
}
describeStatus(status)
}
func TestDescribeStatusChangedNoColor(t *testing.T) {
emt := ExpectedMessageTester{
expectedMessages: []string{
"Repository \"Test\": test\n",
},
}
echo.Color(false)
echo.Output(emt)
status := commands.CommandStatus{
Name: "Test",
Message: "test",
Changed: true,
}
describeStatus(status)
}
func TestDescribeStatusChangedColor(t *testing.T) {
emt := ExpectedMessageTester{
expectedMessages: []string{
fmt.Sprintf("%vRepository \"Test\": test%v\n", echo.ColorYellow, echo.ColorReset),
},
}
echo.Color(true)
echo.Output(emt)
status := commands.CommandStatus{
Name: "Test",
Message: "test",
Changed: true,
}
describeStatus(status)
}
func TestDescribeStatusNoChangeNoColor(t *testing.T) {
emt := ExpectedMessageTester{
expectedMessages: []string{
"Repository \"Test\": test\n",
},
}
echo.Color(false)
echo.Output(emt)
status := commands.CommandStatus{
Name: "Test",
Message: "test",
Changed: false,
}
describeStatus(status)
}
func TestDescribeStatusNoChangeColor(t *testing.T) {
emt := ExpectedMessageTester{
expectedMessages: []string{
fmt.Sprintf("%vRepository \"Test\": test%v\n", echo.ColorGreen, echo.ColorReset),
},
}
echo.Color(true)
echo.Output(emt)
status := commands.CommandStatus{
Name: "Test",
Message: "test",
Changed: false,
}
describeStatus(status)
}

View File

@ -3,7 +3,7 @@ package commands
import "gitlab.com/revalus/grm/config"
type Command interface {
Command(repoCfg config.RepositoryConfig, cmdStatus chan CommandStatus)
Command(repoCfg config.RepositoryConfig) CommandStatus
}
type CommandStatus struct {
Name string

View File

@ -71,7 +71,7 @@ func findNumberOfCommitDiffs(srcCommit *object.Commit, dstCommit *object.Commit)
}
}
func (sc StatusChecker) Command(repoCfg config.RepositoryConfig, status chan CommandStatus) {
func (sc StatusChecker) Command(repoCfg config.RepositoryConfig) CommandStatus {
cmdStatus := CommandStatus{
Name: repoCfg.Name,
@ -86,32 +86,28 @@ func (sc StatusChecker) Command(repoCfg config.RepositoryConfig, status chan Com
if err != nil {
cmdStatus.Error = true
cmdStatus.Message = err.Error()
status <- cmdStatus
return
return cmdStatus
}
headReference, err := repo.Head()
if err != nil {
cmdStatus.Error = true
cmdStatus.Message = err.Error()
status <- cmdStatus
return
return cmdStatus
}
remotes, err := repo.Remotes()
if err != nil || len(remotes) == 0 {
cmdStatus.Error = true
cmdStatus.Message = "cannot find remote branches"
status <- cmdStatus
return
return cmdStatus
}
currentBranchCommit, err := repo.CommitObject(headReference.Hash())
if err != nil {
cmdStatus.Error = true
cmdStatus.Message = err.Error()
status <- cmdStatus
return
return cmdStatus
}
type remoteStatus struct {
@ -160,5 +156,5 @@ func (sc StatusChecker) Command(repoCfg config.RepositoryConfig, status chan Com
cmdStatus.Message = fmt.Sprintf("%v - ( | %v | \u2191%v \u2193%v )", cmdStatus.Message, remoteName, status.ahead, status.behind)
}
status <- cmdStatus
return cmdStatus
}

View File

@ -109,9 +109,7 @@ func TestCommandRepositoryDoesNotExists(t *testing.T) {
Dest: tmpDirWithInitialRepository.rootFS.Root(),
}
ch := make(chan CommandStatus)
go sc.Command(repoCfg, ch)
repoStatus := <-ch
repoStatus := sc.Command(repoCfg)
expectedMessage := "repository does not exist"
if !repoStatus.Error {
@ -149,10 +147,7 @@ func TestCommandRepositoryNoRemoteBranch(t *testing.T) {
Dest: dirNameForLocalRepository,
}
ch := make(chan CommandStatus)
go sc.Command(repoCfg, ch)
repoStatus := <-ch
repoStatus := sc.Command(repoCfg)
expectedMessage := "cannot find remote branches"
if !repoStatus.Error {
@ -171,9 +166,7 @@ func TestCommandAllCorrectWithoutChanges(t *testing.T) {
sc, _, repoCfg, _ := getBaseForTestingSyncCommand()
ch := make(chan CommandStatus)
go sc.Command(repoCfg, ch)
repoStatus := <-ch
repoStatus := sc.Command(repoCfg)
expectedMessage := "branch master - ( | origin | \u21910 \u21930 )"
if repoStatus.Error {
@ -191,15 +184,12 @@ func TestCommandAllCorrectWithOneChange(t *testing.T) {
sc, fakeLocalRepository, repoCfg, _ := getBaseForTestingSyncCommand()
ch := make(chan CommandStatus)
fakeLocalWorkTree, err := fakeLocalRepository.Worktree()
checkErrorDuringPreparation(err)
makeCommit(fakeLocalWorkTree, "commit 1")
go sc.Command(repoCfg, ch)
repoStatus := <-ch
repoStatus := sc.Command(repoCfg)
expectedMessage := "branch master - ( | origin | \u21911 \u21930 )"
if repoStatus.Message != expectedMessage {
@ -220,9 +210,7 @@ func TestCommandAllCorrectWithOneChange(t *testing.T) {
func TestCommandMultiRemoteNoChanges(t *testing.T) {
sc, _, repoCfg := getBaseForTestingSyncMultipleRemote()
ch := make(chan CommandStatus)
go sc.Command(repoCfg, ch)
repoStatus := <-ch
repoStatus := sc.Command(repoCfg)
expectedMessage := "branch master - ( | origin | \u21910 \u21930 ) - ( | subremote | \u21910 \u21930 )"
if repoStatus.Error {
@ -246,9 +234,7 @@ func TestCommandMultiRemoteWithOneChange(t *testing.T) {
makeCommit(fakeLocalWorkTree, "commit 1")
checkErrorDuringPreparation(err)
ch := make(chan CommandStatus)
go sc.Command(repoCfg, ch)
repoStatus := <-ch
repoStatus := sc.Command(repoCfg)
expectedMessage := "branch master - ( | origin | \u21911 \u21930 ) - ( | subremote | \u21911 \u21930 )"
if repoStatus.Error {

View File

@ -49,7 +49,7 @@ func cloneRepository(destPath string, repoCfg *config.RepositoryConfig) (bool, e
return true, nil
}
func (s Synchronizer) Command(repoCfg config.RepositoryConfig, status chan CommandStatus) {
func (s Synchronizer) Command(repoCfg config.RepositoryConfig) CommandStatus {
var err error
cmdStatus := CommandStatus{
@ -75,7 +75,7 @@ func (s Synchronizer) Command(repoCfg config.RepositoryConfig, status chan Comma
} else {
cmdStatus.Error = true
cmdStatus.Message = err.Error()
status <- cmdStatus
return cmdStatus
}
if err != nil {
@ -83,6 +83,6 @@ func (s Synchronizer) Command(repoCfg config.RepositoryConfig, status chan Comma
cmdStatus.Message = err.Error()
}
status <- cmdStatus
return cmdStatus
}

View File

@ -28,12 +28,7 @@ func TestSyncCommand(t *testing.T) {
Dest: "awesome-go",
}
ch := make(chan CommandStatus)
// Pull part
go sync.Command(cfg, ch)
cloneStatus := <-ch
cloneStatus := sync.Command(cfg)
if cloneStatus.Error {
t.Errorf("Unexpected error: %v", cloneStatus.Message)
}
@ -54,11 +49,7 @@ func TestSyncCommand(t *testing.T) {
t.Errorf("Expected to get %v, instead of this got %v", syncCloned, cloneStatus.Message)
}
// Fetch part
go sync.Command(cfg, ch)
fetchStatus := <-ch
fetchStatus := sync.Command(cfg)
if fetchStatus.Error {
t.Errorf("Unexpected error: %v", err.Error())
}

View File

@ -48,6 +48,11 @@ func ParseCliArguments(name, description string, arguments []string) (CliArgumen
Help: "Limit actions to repositories that contain specific tags",
})
limitRoutines := parser.Int("", "max-concurrent-process", &argparse.Options{
Default: 10,
Help: "Determine how many tasks can run simultaneously",
})
if err := parser.Parse(arguments); err != nil {
return CliArguments{}, errors.New(parser.Usage("Please follow this help"))
}
@ -66,7 +71,8 @@ func ParseCliArguments(name, description string, arguments []string) (CliArgumen
Status: statusCMD.Happened(),
Version: *version,
Color: !(*color),
LimitName: *limitName,
LimitTags: *limitTags,
LimitToName: *limitName,
LimitToTags: *limitTags,
Routines: *limitRoutines,
}, nil
}

View File

@ -18,6 +18,7 @@ type CliArguments struct {
Status bool
Version bool
Color bool
LimitName string
LimitTags []string
LimitToName string
LimitToTags []string
Routines int
}

View File

@ -7,11 +7,11 @@ import (
)
const (
colorReset = "\033[0m"
colorRed = "\033[31m"
colorGreen = "\033[32m"
colorYellow = "\033[33m"
colorBlue = "\033[34m"
ColorReset = "\033[0m"
ColorRed = "\033[31m"
ColorGreen = "\033[32m"
ColorYellow = "\033[33m"
ColorBlue = "\033[34m"
)
var (
@ -30,7 +30,7 @@ func Output(writer io.Writer) {
func ErrorfMsg(format string, a ...interface{}) error {
msg := fmt.Sprintf(format, a...)
if useColor {
msg = fmt.Sprintf("%vError:%v %v", colorRed, colorReset, msg)
msg = fmt.Sprintf("%vError:%v %v", ColorRed, ColorReset, msg)
} else {
msg = fmt.Sprintf("Error: %v", msg)
}
@ -40,7 +40,7 @@ func ErrorfMsg(format string, a ...interface{}) error {
func InfoFMsg(format string, a ...interface{}) error {
msg := fmt.Sprintf(format, a...)
if useColor {
msg = fmt.Sprintf("%vInfo:%v %v", colorBlue, colorReset, msg)
msg = fmt.Sprintf("%vInfo:%v %v", ColorBlue, ColorReset, msg)
} else {
msg = fmt.Sprintf("Info: %v", msg)
}
@ -48,19 +48,19 @@ func InfoFMsg(format string, a ...interface{}) error {
}
func GreenMessageF(format string, a ...interface{}) error {
return writeWithColor(fmt.Sprintf(format, a...), colorGreen)
return writeWithColor(fmt.Sprintf(format, a...), ColorGreen)
}
func YellowMessageF(format string, a ...interface{}) error {
return writeWithColor(fmt.Sprintf(format, a...), colorYellow)
return writeWithColor(fmt.Sprintf(format, a...), ColorYellow)
}
func RedMessageF(format string, a ...interface{}) error {
return writeWithColor(fmt.Sprintf(format, a...), colorRed)
return writeWithColor(fmt.Sprintf(format, a...), ColorRed)
}
func writeWithColor(msg string, color string) error {
if useColor {
return write(fmt.Sprintf("%v%v%v", color, msg, colorReset))
return write(fmt.Sprintf("%v%v%v", color, msg, ColorReset))
}
return write(msg)

View File

@ -11,7 +11,11 @@ const ()
func main() {
app := app.GitRepositoryManager{}
app.Parse(os.Args)
err := app.Parse(os.Args)
if err != nil {
os.Exit(2)
}
exitCode := app.Run(os.Stdout)
os.Exit(exitCode)
}