Conversation
main.go
Outdated
|
|
||
| csvWriter.Write([]string{"IntegrationID", "PID"}) | ||
|
|
||
| var wg sync.WaitGroup |
There was a problem hiding this comment.
Not 100% on this, but I feel like it is unusual for this be be declared here and passed into processSQS() as a pointer. Can this just be moved inside the function so it doesn't have to be passed in?
There was a problem hiding this comment.
I was unsure about it as well, however I thought that since processSQS() is called continuously in a loop that it would lead to many waitgroups being made which at best is wasteful and at worst could lead to some threading locks / memory leaks /other unknowns
main.go
Outdated
|
|
||
| // Receive complete messages when nextflow task is done | ||
| // Then clean up input / output files | ||
| go func() { |
There was a problem hiding this comment.
I think because of a gotcha in Go loop variables prior to version 1.22, this function should have a msg types.Message arg like the function above, and msg passed in. https://go.dev/doc/faq#closures_and_goroutines
There was a problem hiding this comment.
Great catch, didn't know about that. Thanks
- Fleshed out stub functions - Created helper functions - fixed some log and error handling logic
| logger.Error(err.Error()) | ||
| os.Exit(1) | ||
| } | ||
| return err |
There was a problem hiding this comment.
I don't think it would ever get here since each conditional does an os.Exit(1)
| return baseDir | ||
| } | ||
|
|
||
| func readFile(baseDir, integrationID string) ([]byte, error) { |
There was a problem hiding this comment.
might want to rename this to something more specific to pid files like readPidFile
| return content, nil | ||
| } | ||
|
|
||
| time.Sleep(100 * time.Millisecond) |
There was a problem hiding this comment.
was this for debugging? or is this something to wait indefinitely for the file to have a value?
| # compute node / workflow manager specific | ||
| subnet_ids = os.environ['SUBNET_IDS'] | ||
| cluster_name = os.environ['CLUSTER_NAME'] | ||
| security_group = os.environ['SECURITY_GROUP_ID'] | ||
| base_dir = os.environ['BASE_DIR'] | ||
| env = os.environ['ENVIRONMENT'] | ||
|
|
||
| # App specific - params? - defaults on app creation, then overriden on run | ||
| # session token retrieval should be on the processor(s) | ||
| pennsieve_host = "" | ||
| pennsieve_host2 = "" | ||
| pennsieve_upload_bucket = "" # agent specific | ||
| pennsieve_agent_home = "/tmp" # agent specific | ||
|
|
||
| if env == "dev": | ||
| pennsieve_host = "https://api.pennsieve.net" | ||
| pennsieve_host2 = "https://api2.pennsieve.net" | ||
| pennsieve_upload_bucket = "pennsieve-dev-uploads-v2-use1" | ||
| else: | ||
| pennsieve_host = "https://api.pennsieve.io" | ||
| pennsieve_host2 = "https://api2.pennsieve.io" | ||
|
|
||
| container_name = "" | ||
| task_definition_name = "" | ||
|
|
||
|
|
||
|
|
There was a problem hiding this comment.
looks like this all got added back by accident
| } | ||
|
|
||
| func ProcessSQS(ctx context.Context, client SQSService, queueUrl string, logger *slog.Logger) (bool, error) { | ||
| var wg sync.WaitGroup |
There was a problem hiding this comment.
what did the wait group end up being needed for?
Looking for feedback on the approach to. cancelling a workflow using channels. Any threading traps I might run into on this environment?
Have a few blank functions that I need to fill in, and create endpoints in further PRs