项目作者: shomali11

项目描述 :
Simplifies the parallelization of function calls.
高级语言: Go
项目地址: git://github.com/shomali11/parallelizer.git
创建时间: 2017-06-24T23:51:23Z
项目社区:https://github.com/shomali11/parallelizer

开源协议:MIT License

下载


parallelizer Build Status Go Report Card GoDoc License: MIT

Simplifies creating a pool of workers that execute jobs in parallel

Features

  • Easy to use
  • Context Support
  • Fail fast with errors
  • Customizable Pool Size
    • Default number of workers is 10
  • Customizable Job Queue Size
    • Default size is 100

Examples

Example 1

Running multiple function calls in parallel without a timeout.

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/shomali11/parallelizer"
  5. )
  6. func main() {
  7. group := parallelizer.NewGroup()
  8. defer group.Close()
  9. group.Add(func() error {
  10. for char := 'a'; char < 'a'+3; char++ {
  11. fmt.Printf("%c ", char)
  12. }
  13. return nil
  14. })
  15. group.Add(func() error {
  16. for number := 1; number < 4; number++ {
  17. fmt.Printf("%d ", number)
  18. }
  19. return nil
  20. })
  21. err := group.Wait()
  22. fmt.Println()
  23. fmt.Println("Done")
  24. fmt.Printf("Error: %v", err)
  25. }

Output:

  1. a 1 b 2 c 3
  2. Done
  3. Error: <nil>

Example 2

Running multiple slow function calls in parallel with a context with a short timeout.
Note: The timeout will not kill the routines. It will just stop waiting for them to finish

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/shomali11/parallelizer"
  6. "time"
  7. )
  8. func main() {
  9. group := parallelizer.NewGroup()
  10. defer group.Close()
  11. group.Add(func() error {
  12. time.Sleep(2 * time.Second)
  13. fmt.Println("Finished work 1")
  14. return nil
  15. })
  16. group.Add(func() error {
  17. time.Sleep(2 * time.Second)
  18. fmt.Println("Finished work 2")
  19. return nil
  20. })
  21. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  22. defer cancel()
  23. err := group.Wait(parallelizer.WithContext(ctx))
  24. fmt.Println("Done")
  25. fmt.Printf("Error: %v", err)
  26. fmt.Println()
  27. time.Sleep(2 * time.Second)
  28. }

Output:

  1. Done
  2. Error: context deadline exceeded
  3. Finished work 2
  4. Finished work 1

Example 3

Running multiple function calls in parallel with a large enough worker pool.

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/shomali11/parallelizer"
  5. )
  6. func main() {
  7. group := parallelizer.NewGroup(parallelizer.WithPoolSize(10))
  8. defer group.Close()
  9. for i := 1; i <= 10; i++ {
  10. i := i
  11. group.Add(func() error {
  12. fmt.Print(i, " ")
  13. return nil
  14. })
  15. }
  16. err := group.Wait()
  17. fmt.Println()
  18. fmt.Println("Done")
  19. fmt.Printf("Error: %v", err)
  20. }

Output:

  1. 7 6 3 2 8 9 5 10 1 4
  2. Done
  3. Error: <nil>

Example 4

Running multiple function calls with 1 worker. Note: the functions are no longer executed in parallel but sequentially

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/shomali11/parallelizer"
  5. )
  6. func main() {
  7. group := parallelizer.NewGroup(parallelizer.WithPoolSize(1))
  8. defer group.Close()
  9. for i := 1; i <= 10; i++ {
  10. i := i
  11. group.Add(func() error {
  12. fmt.Print(i, " ")
  13. return nil
  14. })
  15. }
  16. err := group.Wait()
  17. fmt.Println()
  18. fmt.Println("Done")
  19. fmt.Printf("Error: %v", err)
  20. }

Output:

  1. 1 2 3 4 5 6 7 8 9 10
  2. Done
  3. Error: <nil>

Example 5

Running multiple function calls in parallel with a small worker pool and job queue size. Note: the Add call blocks until there is space to push into the Job Queue

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/shomali11/parallelizer"
  5. "time"
  6. )
  7. func main() {
  8. group := parallelizer.NewGroup(parallelizer.WithPoolSize(1), parallelizer.WithJobQueueSize(1))
  9. defer group.Close()
  10. for i := 1; i <= 10; i++ {
  11. group.Add(func() error {
  12. time.Sleep(time.Second)
  13. return nil
  14. })
  15. fmt.Println("Job added at", time.Now().Format("04:05"))
  16. }
  17. err := group.Wait()
  18. fmt.Println()
  19. fmt.Println("Done")
  20. fmt.Printf("Error: %v", err)
  21. }

Output:

  1. Job added at 00:12
  2. Job added at 00:13
  3. Job added at 00:14
  4. Job added at 00:15
  5. Job added at 00:16
  6. Job added at 00:17
  7. Job added at 00:18
  8. Job added at 00:19
  9. Job added at 00:20
  10. Job added at 00:21
  11. Done
  12. Error: <nil>

Example 6

Running multiple function calls in parallel with a large enough worker pool and job queue size. Note: In here the Add calls did not block because there was plenty of space in the Job Queue

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/shomali11/parallelizer"
  5. "time"
  6. )
  7. func main() {
  8. group := parallelizer.NewGroup(parallelizer.WithPoolSize(10), parallelizer.WithJobQueueSize(10))
  9. defer group.Close()
  10. for i := 1; i <= 10; i++ {
  11. group.Add(func() error {
  12. time.Sleep(time.Second)
  13. return nil
  14. })
  15. fmt.Println("Job added at", time.Now().Format("04:05"))
  16. }
  17. err := group.Wait()
  18. fmt.Println()
  19. fmt.Println("Done")
  20. fmt.Printf("Error: %v", err)
  21. }

Output:

  1. Job added at 00:30
  2. Job added at 00:30
  3. Job added at 00:30
  4. Job added at 00:30
  5. Job added at 00:30
  6. Job added at 00:30
  7. Job added at 00:30
  8. Job added at 00:30
  9. Job added at 00:30
  10. Job added at 00:30
  11. Done
  12. Error: <nil>

Example 7

Showing an example without calling Wait

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/shomali11/parallelizer"
  5. "time"
  6. )
  7. func main() {
  8. group := parallelizer.NewGroup()
  9. defer group.Close()
  10. group.Add(func() error {
  11. fmt.Println("Finished work")
  12. return nil
  13. })
  14. fmt.Println("We did not wait!")
  15. time.Sleep(time.Second)
  16. }

Output:

  1. We did not wait!
  2. Finished work

Example 8

Showing an example with a mixture of Add and Wait calls.

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/shomali11/parallelizer"
  5. )
  6. func main() {
  7. group := parallelizer.NewGroup()
  8. defer group.Close()
  9. group.Add(func() error {
  10. fmt.Println("Worker 1")
  11. return nil
  12. })
  13. group.Add(func() error {
  14. fmt.Println("Worker 2")
  15. return nil
  16. })
  17. fmt.Println("Waiting for workers 1 and 2 to finish")
  18. group.Wait()
  19. fmt.Println("Workers 1 and 2 have finished")
  20. group.Add(func() error {
  21. fmt.Println("Worker 3")
  22. return nil
  23. })
  24. fmt.Println("Waiting for worker 3 to finish")
  25. group.Wait()
  26. fmt.Println("Worker 3 has finished")
  27. }

Output:

  1. Waiting for workers 1 and 2 to finish
  2. Worker 1
  3. Worker 2
  4. Workers 1 and 2 have finished
  5. Waiting for worker 3 to finish
  6. Worker 3
  7. Worker 3 has finished

Example 9

Showing an example with a failed task.

  1. package main
  2. import (
  3. "errors"
  4. "fmt"
  5. "time"
  6. "github.com/shomali11/parallelizer"
  7. )
  8. func main() {
  9. group := parallelizer.NewGroup()
  10. defer group.Close()
  11. group.Add(func() error {
  12. return errors.New("something went wrong")
  13. })
  14. group.Add(func() error {
  15. time.Sleep(10 * time.Second)
  16. return nil
  17. })
  18. err := group.Wait()
  19. fmt.Println()
  20. fmt.Println("Done")
  21. fmt.Printf("Error: %v", err)
  22. }

Output:

  1. Done
  2. Error: something went wrong