This commit is contained in:
Ivan Pesin 2017-11-06 09:10:32 +00:00 committed by GitHub
commit 0461900734
2 changed files with 68 additions and 22 deletions

View File

@ -381,6 +381,7 @@ type Hook struct {
PassEnvironmentToCommand []Argument `json:"pass-environment-to-command,omitempty"` PassEnvironmentToCommand []Argument `json:"pass-environment-to-command,omitempty"`
PassArgumentsToCommand []Argument `json:"pass-arguments-to-command,omitempty"` PassArgumentsToCommand []Argument `json:"pass-arguments-to-command,omitempty"`
JSONStringParameters []Argument `json:"parse-parameters-as-json,omitempty"` JSONStringParameters []Argument `json:"parse-parameters-as-json,omitempty"`
MaxConcurrency int `json:"maximum-concurrent-executions,omitempty"`
TriggerRule *Rules `json:"trigger-rule,omitempty"` TriggerRule *Rules `json:"trigger-rule,omitempty"`
TriggerRuleMismatchHttpResponseCode int `json:"trigger-rule-mismatch-http-response-code,omitempty"` TriggerRuleMismatchHttpResponseCode int `json:"trigger-rule-mismatch-http-response-code,omitempty"`
} }

View File

@ -45,6 +45,8 @@ var (
watcher *fsnotify.Watcher watcher *fsnotify.Watcher
signals chan os.Signal signals chan os.Signal
hookExecutions = make(map[string]chan struct{})
) )
func matchLoadedHook(id string) *hook.Hook { func matchLoadedHook(id string) *hook.Hook {
@ -110,7 +112,15 @@ func main() {
if matchLoadedHook(hook.ID) != nil { if matchLoadedHook(hook.ID) != nil {
log.Fatalf("error: hook with the id %s has already been loaded!\nplease check your hooks file for duplicate hooks ids!\n", hook.ID) log.Fatalf("error: hook with the id %s has already been loaded!\nplease check your hooks file for duplicate hooks ids!\n", hook.ID)
} }
log.Printf("\tloaded: %s\n", hook.ID)
msg := fmt.Sprintf("\tloaded: %s", hook.ID)
// initialize concurrency map
if hook.MaxConcurrency > 0 {
hookExecutions[hook.ID] = make(chan struct{}, hook.MaxConcurrency)
msg = fmt.Sprintf("%s (max: %d)", msg, hook.MaxConcurrency)
}
log.Println(msg)
} }
loadedHooksFromFiles[hooksFilePath] = newHooks loadedHooksFromFiles[hooksFilePath] = newHooks
@ -214,6 +224,18 @@ func hookHandler(w http.ResponseWriter, r *http.Request) {
if matchedHook := matchLoadedHook(id); matchedHook != nil { if matchedHook := matchLoadedHook(id); matchedHook != nil {
log.Printf("[%s] %s got matched\n", rid, id) log.Printf("[%s] %s got matched\n", rid, id)
// check if we have concurrency limits
if _, ok := hookExecutions[id]; ok {
if len(hookExecutions[id]) == cap(hookExecutions[id]) {
log.Printf("reached concurrency limit for: %s (max=%d)", id, len(hookExecutions[id]))
w.WriteHeader(http.StatusTooManyRequests)
fmt.Fprintf(w, "Hook reached maximum concurrent execution limit. Try again later.")
return
}
defer func() { <-hookExecutions[id] }()
hookExecutions[id] <- struct{}{}
}
body, err := ioutil.ReadAll(r.Body) body, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
log.Printf("[%s] error reading the request body. %+v\n", rid, err) log.Printf("[%s] error reading the request body. %+v\n", rid, err)
@ -375,10 +397,11 @@ func reloadHooks(hooksFilePath string) {
log.Printf("attempting to reload hooks from %s\n", hooksFilePath) log.Printf("attempting to reload hooks from %s\n", hooksFilePath)
err := hooksInFile.LoadFromFile(hooksFilePath) err := hooksInFile.LoadFromFile(hooksFilePath)
if err != nil { if err != nil {
log.Printf("couldn't load hooks from file! %+v\n", err) log.Printf("couldn't load hooks from file! %+v\n", err)
} else { return
}
seenHooksIds := make(map[string]bool) seenHooksIds := make(map[string]bool)
log.Printf("found %d hook(s) in file\n", len(hooksInFile)) log.Printf("found %d hook(s) in file\n", len(hooksInFile))
@ -400,11 +423,30 @@ func reloadHooks(hooksFilePath string) {
} }
seenHooksIds[hook.ID] = true seenHooksIds[hook.ID] = true
log.Printf("\tloaded: %s\n", hook.ID) msg := fmt.Sprintf("\tloaded: %s", hook.ID)
// initialize or update concurrency map
switch {
case hook.MaxConcurrency == 0:
if _, ok := hookExecutions[hook.ID]; ok {
delete(hookExecutions, hook.ID)
}
msg = fmt.Sprintf("%s", msg)
case hook.MaxConcurrency > 0:
hookExecutions[hook.ID] = make(chan struct{}, hook.MaxConcurrency)
msg = fmt.Sprintf("%s (max: %d)", msg, hook.MaxConcurrency)
}
log.Println(msg)
}
// clean up hookExecutions channels for removed hooks
for _, loadedHook := range loadedHooksFromFiles[hooksFilePath] {
if _, ok := seenHooksIds[loadedHook.ID]; !ok {
delete(hookExecutions, loadedHook.ID)
}
} }
loadedHooksFromFiles[hooksFilePath] = hooksInFile loadedHooksFromFiles[hooksFilePath] = hooksInFile
}
} }
func reloadAllHooks() { func reloadAllHooks() {
@ -416,6 +458,9 @@ func reloadAllHooks() {
func removeHooks(hooksFilePath string) { func removeHooks(hooksFilePath string) {
for _, hook := range loadedHooksFromFiles[hooksFilePath] { for _, hook := range loadedHooksFromFiles[hooksFilePath] {
log.Printf("\tremoving: %s\n", hook.ID) log.Printf("\tremoving: %s\n", hook.ID)
if _, ok := hookExecutions[hook.ID]; ok {
delete(hookExecutions, hook.ID)
}
} }
newHooksFiles := hooksFiles[:0] newHooksFiles := hooksFiles[:0]