步骤介绍

步骤类型

GoBatch支持三种类型的步骤:

  1. SimpleStep(简单步骤)

    • 在单个线程中执行一个任务
    • 适用于简单的处理逻辑
    • 通过Handler或Task接口实现具体业务逻辑
  2. ChunkStep(分块步骤)

    • 以块为单位处理数据
    • 实现"读取-处理-写入"模式
    • 支持事务管理
    • 主要组件:
      • ItemReader:数据读取
      • ItemProcessor:数据处理
      • ItemWriter:数据写入
  3. PartitionStep(分区步骤)

    • 支持并行处理
    • 将大任务拆分为多个子任务
    • 可以聚合子任务的处理结果
    • 主要组件:
      • Partitioner:任务分区
      • Aggregator:结果聚合

编写简单步骤

有多种方法编写简单步骤的逻辑,如下:

// 1. write a function with one of the following signature
func(execution *StepExecution) BatchError
func(execution *StepExecution)
func() error
func()

// 2. implement the Handler interface
type Handler interface {
    Handle(execution *StepExecution) BatchError
}

当你使用以上函数定义或接口定义编写好了业务逻辑,则可以通过以下方式构造Step对象:

step1 := gobatch.NewStep("step1").Handler(myfunction).Build()
step2 := gobatch.NewStep("step2").Handler(myHandler).Build()
//or
step1 := gobatch.NewStep("step1", myfunction).Build()
step2 := gobatch.NewStep("step2", myHandler).Build()

编写分块步骤

分块步骤需要实现以下3个接口(其中,只有Reader是必须实现的):

type Reader interface {
    //Read each call of Read() will return a data item, if there is no more data, a nil item will be returned.
    Read(chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Processor interface {
    //Process process an item from reader and return a result item
    Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError)
}
type Writer interface {
    //Write write items generated by processor in a chunk
    Write(items []interface{}, chunkCtx *ChunkContext) BatchError
}

框架还包含一个ItemReader接口,在某些情况下,可以用于代替Reader,其定义如下:

type ItemReader interface {
    //ReadKeys read all keys of some kind of data
    ReadKeys() ([]interface{}, error)
    //ReadItem read value by one key from ReadKeys result
    ReadItem(key interface{}) (interface{}, error)
}

为了方便起见,可以通过实现以下接口,在Reader或Writer中执行一些初始化或清理的动作:

type OpenCloser interface {
    Open(execution *StepExecution) BatchError
    Close(execution *StepExecution) BatchError
}

示例代码可以参考 test/example2

编写分区步骤

分区步骤必须要实现Partitioner接口,该接口用于将整个步骤要处理的数据分成多个分区,每个分区对应一个子步骤,框架会启动多个线程来并行执行多个子步骤。如果需要对子步骤的执行结果进行合并,还需要实现Aggregator接口。这两个接口定义如下:

type Partitioner interface {
    //Partition generate sub step executions from specified step execution and partitions count
    Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
    //GetPartitionNames generate sub step names from specified step execution and partitions count
    GetPartitionNames(execution *StepExecution, partitions uint) []string
}

type Aggregator interface {
    //Aggregate aggregate result from all sub step executions
    Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}

对于分区步骤的子步骤来说,既可以是一个简单步骤(由Handler定义),也可以是一个分块步骤(通过Reader/Processor/Writer定义)。 如果已有了一个包含ItemReader的分块步骤,则可以通过指定分区数量就可以构造分区步骤,如下:

    step := gobatch.NewStep("partition_step").Handler(&ChunkHandler{db}).Partitions(10).Build()

这种方式是由GoBatch框架内部基于ItemReader实现了Partitioner。

文件处理

GoBatch提供了内置的文件处理组件,支持:

  • 多种文件格式(CSV、TSV等)
  • 文件编码设置
  • 自动字段映射
  • 文件校验

读取文件

我们假定有一个文件的内容如下(其中每行是一条记录,每个字段用'\t'分隔):

trade_1    account_1    cash    1000    normal    2022-02-27 12:12:12
trade_2    account_2    cash    1000    normal    2022-02-27 12:12:12
trade_3    account_3    cash    1000    normal    2022-02-27 12:12:12
……

如果想读取该文件的内容,并将文件中每条记录插入到数据库中的 t_trade 表中,则可以通过以下方式来实现:

type Trade struct {
    TradeNo   string    `order:"0"`
    AccountNo string    `order:"1"`
    Type      string    `order:"2"`
    Amount    float64   `order:"3"`
    TradeTime time.Time `order:"5"`
    Status    string    `order:"4"`
}

var tradeFile = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade.data",
    Type:          file.TSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}

type TradeWriter struct {
    db *gorm.DB
}

func (p *TradeWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
    models := make([]*Trade, len(items))
    for i, item := range items {
        models[i] = item.(*Trade)
    }
    e := p.db.Table("t_trade").Create(models).Error
    if e != nil {
        return gobatch.NewBatchError(gobatch.ErrCodeDbFail, "save trade into db err", e)
    }
    return nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_import").ReadFile(tradeFile).Writer(&TradeWriter{db}).Partitions(10).Build()
    //...
    job := gobatch.NewJob("my_job").Step(...,step,...).Build()
    gobatch.Register(job)
    gobatch.Start(context.Background(), job.Name(), "{\"date\":\"20220202\"}")
}

写入文件

再假定我们需要将 t_trade 表中的数据导出为一个csv文件,可以按照以下方式来实现:

type Trade struct {
    TradeNo   string    `order:"0" header:"trade_no"`
    AccountNo string    `order:"1" header:"account_no"`
    Type      string    `order:"2" header:"type"`
    Amount    float64   `order:"3" header:"amount"`
    TradeTime time.Time `order:"5" header:"trade_time" format:"2006-01-02_15:04:05"`
    Status    string    `order:"4" header:"trade_no"`
}

var tradeFileCsv = file.FileObjectModel{
    FileStore:     &file.LocalFileSystem{},
    FileName:      "/data/{date,yyyy-MM-dd}/trade_export.csv",
    Type:          file.CSV,
    Encoding:      "utf-8",
    ItemPrototype: &Trade{},
}


type TradeReader struct {
    db *gorm.DB
}

func (h *TradeReader) ReadKeys() ([]interface{}, error) {
    var ids []int64
    h.db.Table("t_trade").Select("id").Find(&ids)
    var result []interface{}
    for _, id := range ids {
        result = append(result, id)
    }
    return result, nil
}

func (h *TradeReader) ReadItem(key interface{}) (interface{}, error) {
    id := int64(0)
    switch r := key.(type) {
    case int64:
        id = r
    case float64:
        id = int64(r)
    default:
        return nil, fmt.Errorf("key type error, type:%T, value:%v", key, key)
    }
    trade := &Trade{}
    result := h.db.Table("t_trade").Find(trade, "id = ?", id)
    if result.Error != nil {
        return nil, result.Error
    }
    return trade, nil
}

func buildAndRunJob() {
    //...
    step := gobatch.NewStep("trade_export").Reader(&TradeReader{db}).WriteFile(tradeFileCsv).Partitions(10).Build()
    //...
}

生命周期监听

步骤执行过程中的各个阶段都可以通过监听器进行干预:

// 步骤监听器
type StepListener interface {
    BeforeStep(execution *StepExecution) BatchError
    AfterStep(execution *StepExecution) BatchError
}

// 块处理监听器
type ChunkListener interface {
    BeforeChunk(context *ChunkContext) BatchError
    AfterChunk(context *ChunkContext) BatchError
    OnError(context *ChunkContext, err BatchError)
}

// 分区监听器
type PartitionListener interface {
    BeforePartition(execution *StepExecution) BatchError
    AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
    OnError(execution *StepExecution, err BatchError)
}

执行流程

简单步骤执行

简单步骤直接在当前Job线程中执行Handler中的业务逻辑。适合业务逻辑简单的场景。

分块处理步骤执行

在单线程中执行以下流程:

  1. 启动事务
  2. 读取指定大小的数据块
  3. 处理数据
  4. 写入结果
  5. 提交或回滚事务
  6. 重复以上步骤直到处理完成

分区步骤执行

  1. 使用Partitioner进行数据分区
  2. 为每个分区生成子步骤
  3. 并行执行子步骤
  4. 等待所有子步骤完成
  5. 使用Aggregator汇总结果

如何编写Step请参考示例1示例2

results matching ""

    No results matching ""