Go的高并发处理实例

By | 2018年5月2日

使用场景

适用于不需要实时返回结果的处理,通常是异步操作或用户无感知的操作,例如:数据打点、文件上传等。

代码来源

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

https://www.cnblogs.com/woodzcl/p/7602530.html

优化代码

main.go

package main

import (
	"fmt"
	"jobtest/tasks"
	"net/http"
	"os"
)

// Test is a sample job whose Exec method prints its Name.
type Test struct {
	Name string
}

// Exec writes "Here is: <Name>" followed by a newline to standard output.
// It always returns nil.
func (job *Test) Exec() error {
	fmt.Printf("Here is: %s\n", job.Name)
	return nil
}

// Test2 is a second sample job whose Exec method prints its Name.
type Test2 struct {
	Name string
}

// Exec writes "Here is another: <Name>" followed by a newline to standard
// output. It always returns nil.
func (job *Test2) Exec() error {
	fmt.Printf("Here is another: %s\n", job.Name)
	return nil
}

// entry is the HTTP handler registered on "/". For each request it submits
// one Test job and one Test2 job (both named "Coeus") to tasks.JobQueue, in
// that order, and then writes a fixed greeting to the response. Each send
// blocks until the queue can accept the job.
func entry(res http.ResponseWriter, req *http.Request) {
	pending := []tasks.Job{
		&Test{Name: "Coeus"},
		&Test2{Name: "Coeus"},
	}
	for _, job := range pending {
		tasks.JobQueue <- job
	}
	fmt.Fprintf(res, "Hello World ...again")
}

// main starts the demo server.
//
// The first command-line argument, if present, overrides the default port
// "8086". The second argument, if present, selects the protocol: "true" means
// plain HTTP, anything else means TLS using server.crt and server.key from the
// working directory. Without a second argument the server speaks plain HTTP.
//
// The listen call only returns when the server has failed, so in that case the
// process exits with status 1 after printing the error.
func main() {
	port := "8086"
	useHTTP := true
	if len(os.Args) >= 2 {
		port = os.Args[1]
	}
	if len(os.Args) >= 3 {
		useHTTP = os.Args[2] == "true"
	}
	fmt.Printf("server is http %t\n", useHTTP)
	fmt.Println("server listens at ", port)

	http.HandleFunc("/", entry)

	addr := ":" + port
	var err error
	if useHTTP {
		err = http.ListenAndServe(addr, nil)
	} else {
		err = http.ListenAndServeTLS(addr, "server.crt", "server.key", nil)
	}
	if err != nil {
		fmt.Println("Server failure /// ", err)
	}

	fmt.Println("quit")
	if err != nil {
		// Report the failure to the caller (shell, supervisor) instead of
		// exiting with status 0.
		os.Exit(1)
	}
}

tasks/define.go

package tasks

import "runtime"

// MaxWorker is the worker count used by this package; it is initialised to
// the number of logical CPUs reported by the runtime.
var MaxWorker = runtime.NumCPU()

// MaxQueue is the buffer capacity used when JobQueue is created.
var MaxQueue = 512

// Job is a unit of work that can be submitted to JobQueue.
type Job interface {
	// Exec performs the work and reports a failure through the returned error.
	Exec() error
}

// JobQueue is the channel on which jobs are submitted. It is declared here
// without a value; the package's init function allocates it with a buffer of
// MaxQueue entries.
var JobQueue chan Job

tasks/worker.go

package tasks

import "fmt"

// Worker runs jobs one at a time in its own goroutine.
//
// WorkerPool is the shared pool on which the worker offers JobChannel whenever
// it is ready for work, JobChannel is the channel it receives its next job on,
// and Quit carries the signal that makes the worker goroutine exit.
type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	Quit       chan bool
}

// NewWorker returns a Worker attached to workPool. The worker gets its own
// unbuffered job channel and its own unbuffered quit channel; it does not run
// until Start is called.
func NewWorker(workPool chan chan Job) Worker {
	var w Worker
	w.WorkerPool = workPool
	w.JobChannel = make(chan Job)
	w.Quit = make(chan bool)
	return w
}

// Start launches the worker's goroutine and returns immediately.
//
// The goroutine loops forever: it offers its JobChannel on WorkerPool to
// signal that it is ready, then waits for either a job or a quit signal. A
// received job is executed; a failing job is reported on standard output and
// the loop continues. A value on Quit ends the goroutine.
func (w Worker) Start() {
	go func() {
		for {
			// Advertise this worker as ready for its next job.
			w.WorkerPool <- w.JobChannel
			select {
			case job := <-w.JobChannel:
				if err := job.Exec(); err != nil {
					// One message per line so consecutive failures do not
					// run together in the output.
					fmt.Printf("execute job failed with err: %v\n", err)
				}
			case <-w.Quit:
				return
			}
		}
	}()
}

// Stop asks the worker goroutine to exit. The quit signal is sent from a new
// goroutine, so Stop itself never blocks; the send completes once the worker's
// loop receives from Quit.
func (w Worker) Stop() {
	quit := w.Quit
	go func() { quit <- true }()
}

tasks/dispatcher.go

package tasks

import "runtime"

// Dispatcher takes jobs from JobQueue and passes them to workers.
//
// MaxWorkers is the number of workers Run starts, WorkerPool is the channel on
// which ready workers offer their job channels, and Quit carries the signal
// that makes the Dispatch loop return.
type Dispatcher struct {
	MaxWorkers int
	WorkerPool chan chan Job
	Quit       chan bool
}

// init prepares the package for use at import time: it sets GOMAXPROCS to
// MaxWorker, allocates JobQueue with a buffer of MaxQueue jobs, and starts a
// dispatcher with MaxWorker workers.
func init() {
	runtime.GOMAXPROCS(MaxWorker)
	JobQueue = make(chan Job, MaxQueue)
	NewDispatcher(MaxWorker).Run()
}

// NewDispatcher returns a Dispatcher configured for maxWorkers workers. Its
// worker pool is buffered with room for maxWorkers job channels and its quit
// channel is unbuffered. No goroutines are started until Run is called.
func NewDispatcher(maxWorkers int) *Dispatcher {
	d := new(Dispatcher)
	d.MaxWorkers = maxWorkers
	d.WorkerPool = make(chan chan Job, maxWorkers)
	d.Quit = make(chan bool)
	return d
}

// Run creates and starts MaxWorkers workers that share the dispatcher's
// worker pool, then launches the Dispatch loop in its own goroutine. Run does
// not block.
func (d *Dispatcher) Run() {
	for remaining := d.MaxWorkers; remaining > 0; remaining-- {
		NewWorker(d.WorkerPool).Start()
	}

	go d.Dispatch()
}

// Stop asks the Dispatch loop to return. The quit signal is sent from a new
// goroutine, so Stop itself never blocks; the send completes once the loop
// receives from Quit.
func (d *Dispatcher) Stop() {
	quit := d.Quit
	go func() { quit <- true }()
}

// Dispatch is the dispatcher loop: it takes jobs from JobQueue one at a time
// and hands each to a worker that has offered its channel on WorkerPool. It
// returns when a value is received on Quit.
//
// Jobs are handed over synchronously rather than from one new goroutine per
// job. While every worker is busy the dispatcher therefore stops draining
// JobQueue, the queue's buffer fills, and producers block, so the number of
// goroutines and pending jobs stays bounded.
func (d *Dispatcher) Dispatch() {
	for {
		select {
		case job := <-JobQueue:
			if !d.deliver(job) {
				return
			}
		case <-d.Quit:
			return
		}
	}
}

// deliver hands job to the next worker that offers its channel on WorkerPool
// and reports whether the hand-off happened. It returns false when a quit
// signal arrives first; in that case job is dropped.
func (d *Dispatcher) deliver(job Job) bool {
	select {
	case jobChannel := <-d.WorkerPool:
		// The worker normally receives at once; still watch Quit so the
		// dispatcher cannot hang on a worker that has already exited.
		select {
		case jobChannel <- job:
			return true
		case <-d.Quit:
			return false
		}
	case <-d.Quit:
		return false
	}
}

如 main.go 中所示,你只需实现 Job 接口的 Exec 方法,并把任务发送到 tasks.JobQueue,就可以异步执行任意你要做的事情了。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据