diff --git a/hook/hook.go b/hook/hook.go index 6fcda3e..8689f97 100644 --- a/hook/hook.go +++ b/hook/hook.go @@ -381,6 +381,7 @@ type Hook struct { PassEnvironmentToCommand []Argument `json:"pass-environment-to-command,omitempty"` PassArgumentsToCommand []Argument `json:"pass-arguments-to-command,omitempty"` JSONStringParameters []Argument `json:"parse-parameters-as-json,omitempty"` + MaxConcurrency int `json:"maximum-concurrent-executions,omitempty"` TriggerRule *Rules `json:"trigger-rule,omitempty"` TriggerRuleMismatchHttpResponseCode int `json:"trigger-rule-mismatch-http-response-code,omitempty"` } diff --git a/webhook.go b/webhook.go index d14ff6c..568a7b7 100644 --- a/webhook.go +++ b/webhook.go @@ -45,6 +45,8 @@ var ( watcher *fsnotify.Watcher signals chan os.Signal + + hookExecutions = make(map[string]chan struct{}) ) func matchLoadedHook(id string) *hook.Hook { @@ -110,7 +112,15 @@ func main() { if matchLoadedHook(hook.ID) != nil { log.Fatalf("error: hook with the id %s has already been loaded!\nplease check your hooks file for duplicate hooks ids!\n", hook.ID) } - log.Printf("\tloaded: %s\n", hook.ID) + + msg := fmt.Sprintf("\tloaded: %s", hook.ID) + + // initialize concurrency map + if hook.MaxConcurrency > 0 { + hookExecutions[hook.ID] = make(chan struct{}, hook.MaxConcurrency) + msg = fmt.Sprintf("%s (max: %d)", msg, hook.MaxConcurrency) + } + log.Println(msg) } loadedHooksFromFiles[hooksFilePath] = newHooks @@ -214,6 +224,18 @@ func hookHandler(w http.ResponseWriter, r *http.Request) { if matchedHook := matchLoadedHook(id); matchedHook != nil { log.Printf("[%s] %s got matched\n", rid, id) + // check if we have concurrency limits + if _, ok := hookExecutions[id]; ok { + if len(hookExecutions[id]) == cap(hookExecutions[id]) { + log.Printf("reached concurrency limit for: %s (max=%d)", id, len(hookExecutions[id])) + w.WriteHeader(http.StatusTooManyRequests) + fmt.Fprintf(w, "Hook reached maximum concurrent execution limit. Try again later.") + return + } + defer func() { <-hookExecutions[id] }() + hookExecutions[id] <- struct{}{} + } + body, err := ioutil.ReadAll(r.Body) if err != nil { log.Printf("[%s] error reading the request body. %+v\n", rid, err) @@ -375,36 +397,56 @@ func reloadHooks(hooksFilePath string) { log.Printf("attempting to reload hooks from %s\n", hooksFilePath) err := hooksInFile.LoadFromFile(hooksFilePath) - if err != nil { log.Printf("couldn't load hooks from file! %+v\n", err) - } else { - seenHooksIds := make(map[string]bool) + return + } - log.Printf("found %d hook(s) in file\n", len(hooksInFile)) + seenHooksIds := make(map[string]bool) - for _, hook := range hooksInFile { - wasHookIDAlreadyLoaded := false + log.Printf("found %d hook(s) in file\n", len(hooksInFile)) - for _, loadedHook := range loadedHooksFromFiles[hooksFilePath] { - if loadedHook.ID == hook.ID { - wasHookIDAlreadyLoaded = true - break - } + for _, hook := range hooksInFile { + wasHookIDAlreadyLoaded := false + + for _, loadedHook := range loadedHooksFromFiles[hooksFilePath] { + if loadedHook.ID == hook.ID { + wasHookIDAlreadyLoaded = true + break } - - if (matchLoadedHook(hook.ID) != nil && !wasHookIDAlreadyLoaded) || seenHooksIds[hook.ID] == true { - log.Printf("error: hook with the id %s has already been loaded!\nplease check your hooks file for duplicate hooks ids!", hook.ID) - log.Println("reverting hooks back to the previous configuration") - return - } - - seenHooksIds[hook.ID] = true - log.Printf("\tloaded: %s\n", hook.ID) } - loadedHooksFromFiles[hooksFilePath] = hooksInFile + if (matchLoadedHook(hook.ID) != nil && !wasHookIDAlreadyLoaded) || seenHooksIds[hook.ID] == true { + log.Printf("error: hook with the id %s has already been loaded!\nplease check your hooks file for duplicate hooks ids!", hook.ID) + log.Println("reverting hooks back to the previous configuration") + return + } + + seenHooksIds[hook.ID] = true + msg := fmt.Sprintf("\tloaded: %s", hook.ID) + + // initialize or update concurrency map + switch { + case hook.MaxConcurrency == 0: + if _, ok := hookExecutions[hook.ID]; ok { + delete(hookExecutions, hook.ID) + } + msg = fmt.Sprintf("%s", msg) + case hook.MaxConcurrency > 0: + hookExecutions[hook.ID] = make(chan struct{}, hook.MaxConcurrency) + msg = fmt.Sprintf("%s (max: %d)", msg, hook.MaxConcurrency) + } + log.Println(msg) } + + // clean up hookExecutions channels for removed hooks + for _, loadedHook := range loadedHooksFromFiles[hooksFilePath] { + if _, ok := seenHooksIds[loadedHook.ID]; !ok { + delete(hookExecutions, loadedHook.ID) + } + } + + loadedHooksFromFiles[hooksFilePath] = hooksInFile } func reloadAllHooks() { @@ -416,6 +458,9 @@ func reloadAllHooks() { func removeHooks(hooksFilePath string) { for _, hook := range loadedHooksFromFiles[hooksFilePath] { log.Printf("\tremoving: %s\n", hook.ID) + if _, ok := hookExecutions[hook.ID]; ok { + delete(hookExecutions, hook.ID) + } } newHooksFiles := hooksFiles[:0]