Go1.8实现watchdog功能,实现依赖启动服务程序

package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "os/signal"
    "os/user"
    "path"
    "strconv"
    "strings"
    "sync"
    "syscall"
    "time"

    "github.com/czxichen/command/watchdog"
    conf "github.com/dlintw/goconf"
)

//+build windows,linux

const logDir = "./watchdog"

func newProc(svc *Service,null,pw *os.File) *os.ProcAttr {
    return &os.ProcAttr{Dir: svc.path,Files: []*os.File{null,pw,pw}}
}

func setPriority(pid,priority uintptr) syscall.Errno {
    return 0
}

var (
    logpath    = flag.String("log_path","","Specify log path")
    configFile = flag.String("config","watchdog.ini","Watchdog configuration file")
)

func cfgOpt(cfg *conf.ConfigFile,section,option string) string {
    if !cfg.HasOption(section,option) {
        return ""
    }
    s,err := cfg.GetString(section,option)
    if err != nil {
        log.Fatalf("Failed to get %s for %s: %v",option,err)
    }
    return s
}

func svcOpt(cfg *conf.ConfigFile,service,option string,required bool) string {
    opt := cfgOpt(cfg,option)
    if opt == "" && required {
        log.Fatalf("Service %s has missing %s option",option)
    }
    return opt
}

var signalNames = map[syscall.Signal]string{
    syscall.SIGINT:  "SIGINT",syscall.SIGQUIT: "SIGQUIT",syscall.SIGTERM: "SIGTERM",}

func signalName(s syscall.Signal) string {
    if name,ok := signalNames[s]; ok {
        return name
    }
    return fmt.Sprintf("SIG %d",s)
}

type Shutdowner interface {
    Shutdown()
}

func shutdownHandler(server Shutdowner) {
    sigc := make(chan os.Signal, 3)
    signal.Notify(sigc,syscall.SIGINT,syscall.SIGQUIT,syscall.SIGTERM)
    go func() {
        for s := range sigc {
            name := s.String()
            if sig,ok := s.(syscall.Signal); ok {
                name = signalName(sig)
            }
            log.Printf("Received %v,initiating shutdown...",name)
            server.Shutdown()
        }
    }()
}

var (
    restartDelay      = 2 * time.Second
    restartBackoff    = 5 * time.Second
    restartBackoffMax = 60 * time.Second
)

type Watchdog struct {
    services map[string]*Service
    shutdown chan bool
}

func NewWatchdog() *Watchdog {
    return &Watchdog{
        services: make(map[string]*Service),shutdown: make(chan bool),}
}

//关闭服务
func (w *Watchdog) Shutdown() {
    select {
    case w.shutdown <- true:
    default:
    }
}

//添加服务,如果存在
func (w *Watchdog) AddService(name,binary string) (*Service,error) {
    if _,ok := w.services[name]; ok {
        return nil,fmt.Errorf("Service %q already exists",name)
    }

    svc := newService(name,binary)
    w.services[name] = svc

    return svc,nil
}

//启动服务
func (w *Watchdog) Walk() {
    log.Printf("Seesaw watchdog starting...")

    w.mapDependencies()

    for _,svc := range w.services {
        go svc.run()
    }
    <-w.shutdown
    for _,svc := range w.services {
        go svc.stop()
    }
    for _,svc := range w.services {
        stopped := <-svc.stopped
        svc.stopped <- stopped
    }
}

//设置依赖关系
func (w *Watchdog) mapDependencies() {
    for name := range w.services {
        svc := w.services[name]
        for depName := range svc.dependencies {
            dep,ok := w.services[depName]
            if !ok {
                log.Fatalf("Failed to find dependency %q for service %q",depName,name)
            }
            svc.dependencies[depName] = dep //依赖谁,依赖启动后才会启动自身
            dep.dependents[svc.name] = svc  //谁依赖它,依赖它的服务退出后,才退出本身
        }
    }
}

//默认的优先级为0
const prioProcess = 0

//定义服务的类型.
type Service struct {
    name   string
    binary string
    path   string
    args   []string

    uid      uint32
    gid      uint32
    priority int

    dependencies map[string]*Service
    dependents   map[string]*Service

    termTimeout time.Duration

    lock    sync.Mutex
    process *os.Process

    done     chan bool
    shutdown chan bool
    started  chan bool
    stopped  chan bool

    failures uint64
    restarts uint64

    lastFailure time.Time
    lastRestart time.Time
}

//初始化一个Service.
func newService(name,binary string) *Service {
    return &Service{
        name:         name,binary:       binary,args:         make([]string, 0),dependencies: make(map[string]*Service),dependents:   make(map[string]*Service),done:     make(chan bool),shutdown: make(chan bool, 1),started:  make(chan bool,stopped:  make(chan bool,termTimeout: 5 * time.Second,}
}

//给这个服务添加依赖.
func (svc *Service) AddDependency(name string) {
    svc.dependencies[name] = nil
}

//为服务添加启动参数.
func (svc *Service) AddArgs(args string) {
    svc.args = strings.Fields(args)
}

//为进程设置优先级,Windows下面无效.
func (svc *Service) SetPriority(priority int) error {
    if priority < -20 || priority > 19 {
        return fmt.Errorf("Invalid priority %d - must be between -20 and 19",priority)
    }
    svc.priority = priority
    return nil
}

func (svc *Service) SetTermTimeout(tt time.Duration) {
    svc.termTimeout = tt
}

func (svc *Service) SetUser(username string) error {
    u,err := user.Lookup(username)
    if err != nil {
        return err
    }
    uid,err := strconv.Atoi(u.Uid)
    if err != nil {
        return err
    }
    gid,err := strconv.Atoi(u.Gid)
    if err != nil {
        return err
    }
    svc.uid = uint32(uid)
    svc.gid = uint32(gid)
    return nil
}

func (svc *Service) run() {
    //如果存在依赖,要等依赖全部启动完毕之后才会自动自身.
    for _,dep := range svc.dependencies {
        log.Printf("Service %s waiting for %s to start",svc.name,dep.name)
        select {
        case started := <-dep.started:
            dep.started <- started
        case <-svc.shutdown:
            goto done
        }
    }

    for {
        //如果启动失败,怎等待时间会延长,最大不超过restartBackoffMax时间
        //程序启动必须是阻塞的,不然会重复运行
        if svc.failures > 0 {
            delay := time.Duration(svc.failures) * restartBackoff
            if delay > restartBackoffMax {
                delay = restartBackoffMax
            }
            log.Printf("Service %s has failed %d times - delaying %s before restart",svc.failures,delay)

            select {
            case <-time.After(delay):
            case <-svc.shutdown:
                goto done
            }
        }

        svc.restarts++
        svc.lastRestart = time.Now()
        svc.runOnce()

        select {
        case <-time.After(restartDelay):
        case <-svc.shutdown:
            goto done
        }
    }
done:
    svc.done <- true
}

//为服务创建日志文件
func (svc *Service) logFile() (*os.File,error) {
    logName := svc.name + ".log"

    if err := os.MkdirAll(logDir, 0666); err != nil {
        if !os.IsExist(err) {
            return nil,err
        }
    }
    f,err := os.Create(path.Join(logDir,logName))
    if err != nil {
        return nil,err
    }
    fmt.Fprintf(f,"Log file for %s (stdout/stderr)\n",svc.name)
    fmt.Fprintf(f,"Created at: %s\n",time.Now().Format("2006/01/02 15:04:05"))
    return f,nil
}

//运行程序
func (svc *Service) runOnce() {
    args := make([]string,len(svc.args)+1)
    args[0] = svc.name
    copy(args[1:],svc.args)

    fmt.Println("Args:",args)
    null,err := os.Open(os.DevNull)
    if err != nil {
        log.Printf("Service %s - failed to open %s: %v",os.DevNull,err)
        return
    }

    lfile,err := svc.logFile()
    if err != nil {
        log.Printf("Service %s - failed to create log file: %v",err)
        null.Close()
        return
    }

    attr := newProc(svc,lfile)

    log.Printf("Starting service %s...",svc.name)
    proc,err := os.StartProcess(svc.binary,args,attr)
    if err != nil {
        log.Printf("Service %s failed to start: %v",err)
        svc.lastFailure = time.Now()
        svc.failures++
        null.Close()
        return
    }

    null.Close()
    lfile.Close()
    svc.lock.Lock()
    svc.process = proc
    svc.lock.Unlock()

    if err := setPriority(uintptr(proc.Pid),uintptr(svc.priority)); err != 0 {
        log.Printf("Failed to set priority to %d for service %s: %v",svc.priority,err)
    }
    select {
    case svc.started <- true:
    default:
    }

    state,err := svc.process.Wait()
    if err != nil {
        log.Printf("Service %s wait failed with %v",err)
        svc.lastFailure = time.Now()
        svc.failures++
        return
    }
    if !state.Success() {
        log.Printf("Service %s exited with %v",state)
        svc.lastFailure = time.Now()
        svc.failures++
        return
    }

    svc.failures = 0
    log.Printf("Service %s exited normally.",svc.name)
}

//给进程发送信号
func (svc *Service) signal(sig os.Signal) error {
    svc.lock.Lock()
    defer svc.lock.Unlock()
    if svc.process == nil {
        return nil
    }
    return svc.process.Signal(sig)
}

//停止服务
func (svc *Service) stop() {
    log.Printf("Stopping service %s...",svc.name)
    //等待依赖它的进程退出完毕之后再退出自己.
    for _,dep := range svc.dependents {
        log.Printf("Service %s waiting for %s to stop",dep.name)
        stopped := <-dep.stopped
        dep.stopped <- stopped
    }

    svc.shutdown <- true
    //首先给进程发送退出信号,如果超时没有退出,则直接发送Kill信号.
    svc.signal(syscall.SIGTERM)
    select {
    case <-svc.done:
    case <-time.After(svc.termTimeout):
        svc.signal(syscall.SIGKILL)
        <-svc.done
    }
    log.Printf("Service %s stopped",svc.name)
    svc.stopped <- true
}

func main() {
    flag.Parse()
    if *logpath == "" {
        *logpath = os.Args[0] + ".log"
    }
    logFile,err := os.Create(*logpath)
    if err != nil {
        log.Fatalf("Create log file error:%s\n",err.Error())
    }
    defer logFile.Close()
    log.SetOutput(logFile)

    cfg,err := conf.ReadConfigFile(*configFile)
    if err != nil {
        log.Fatalf("Failed to read config file %q: %v",*configFile,err)
    }

    fido := watchdog.NewWatchdog()

    shutdownHandler(fido)
    for _,name := range cfg.GetSections() {
        if name == "default" {
            continue
        }

        binary := svcOpt(cfg,name,"binary",true)
        args := svcOpt(cfg,"args",false)

        svc,err := fido.AddService(name,binary)
        if err != nil {
            log.Fatalf("Failed to add service %q: %v",err)
        }
        svc.AddArgs(args)
        if dep := svcOpt(cfg,"dependency",false); dep != "" {
            svc.AddDependency(dep)
        }
        if opt := svcOpt(cfg,"priority",false); opt != "" {
            prio,err := strconv.Atoi(opt)
            if err != nil {
                log.Fatalf("Service %s has invalid priority %q: %v",opt,err)
            }
            if err := svc.SetPriority(prio); err != nil {
                log.Fatalf("Failed to set priority for service %s: %v",err)
            }
        }
        if opt := svcOpt(cfg,"term_timeout",false); opt != "" {
            tt,err := time.ParseDuration(opt)
            if err != nil {
                log.Fatalf("Service %s has invalid term_timeout %q: %v",err)
            }
            svc.SetTermTimeout(tt)
        }

        if user := svcOpt(cfg,"user",false); user != "" {
            if err := svc.SetUser(user); err != nil {
                log.Fatalf("Failed to set user for service %s: %v",err)
            }
        }
    }

    fido.Walk()
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


什么是设计模式一套被反复使用、多数人知晓的、经过分类编目的、代码 设计经验 的总结;使用设计模式是为了 可重用 代码、让代码 更容易 被他人理解、保证代码 可靠性;设计模式使代码编制  真正工程化;设计模式使软件工程的 基石脉络, 如同大厦的结构一样;并不直接用来完成代码的编写,而是 描述 在各种不同情况下,要怎么解决问题的一种方案;能使不稳定依赖于相对稳定、具体依赖于相对抽象,避免引
单一职责原则定义(Single Responsibility Principle,SRP)一个对象应该只包含 单一的职责,并且该职责被完整地封装在一个类中。Every  Object should have  a single responsibility, and that responsibility should be entirely encapsulated by t
动态代理和CGLib代理分不清吗,看看这篇文章,写的非常好,强烈推荐。原文截图*************************************************************************************************************************原文文本************
适配器模式将一个类的接口转换成客户期望的另一个接口,使得原本接口不兼容的类可以相互合作。
策略模式定义了一系列算法族,并封装在类中,它们之间可以互相替换,此模式让算法的变化独立于使用算法的客户。
设计模式讲的是如何编写可扩展、可维护、可读的高质量代码,它是针对软件开发中经常遇到的一些设计问题,总结出来的一套通用的解决方案。
模板方法模式在一个方法中定义一个算法的骨架,而将一些步骤延迟到子类中,使得子类可以在不改变算法结构的情况下,重新定义算法中的某些步骤。
迭代器模式提供了一种方法,用于遍历集合对象中的元素,而又不暴露其内部的细节。
外观模式又叫门面模式,它提供了一个统一的(高层)接口,用来访问子系统中的一群接口,使得子系统更容易使用。
单例模式(Singleton Design Pattern)保证一个类只能有一个实例,并提供一个全局访问点。
组合模式可以将对象组合成树形结构来表示“整体-部分”的层次结构,使得客户可以用一致的方式处理个别对象和对象组合。
装饰者模式能够更灵活的,动态的给对象添加其它功能,而不需要修改任何现有的底层代码。
观察者模式(Observer Design Pattern)定义了对象之间的一对多依赖,当对象状态改变的时候,所有依赖者都会自动收到通知。
代理模式为对象提供一个代理,来控制对该对象的访问。代理模式在不改变原始类代码的情况下,通过引入代理类来给原始类附加功能。
工厂模式(Factory Design Pattern)可细分为三种,分别是简单工厂,工厂方法和抽象工厂,它们都是为了更好的创建对象。
状态模式允许对象在内部状态改变时,改变它的行为,对象看起来好像改变了它的类。
命令模式将请求封装为对象,能够支持请求的排队执行、记录日志、撤销等功能。
备忘录模式(Memento Pattern)保存一个对象的某个状态,以便在适当的时候恢复对象。备忘录模式属于行为型模式。 基本介绍 **意图:**在不破坏封装性的前提下,捕获一个对象的内部状态,并在该
顾名思义,责任链模式(Chain of Responsibility Pattern)为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。这种类型的设计模式属于行为
享元模式(Flyweight Pattern)(轻量级)(共享元素)主要用于减少创建对象的数量,以减少内存占用和提高性能。这种类型的设计模式属于结构型模式,它提供了减少对象数量从而改善应用所需的对象结