emahiro/b.log

Drastically Repeat Yourself !!!!

BigQuery のクライアントを Go で実装する

Overview

BigQuery のクライアントを Go で実装する手順とハマったことについてまとめます。

基本的には 公式ドキュメント に記載されている内容そのままです。

cloud.google.com

Go で BigQuery のクライアントを実装する

公式ドキュメントのとおりであれば以下

ctx := context.Background()
client, err := bigquery.NewClient(ctx, projectID)
if err != nil {
        return fmt.Errorf("bigquery.NewClient: %v", err)
}
defer client.Close()

q := client.Query(...)
job := q.Run(ctx)

itr, err := job.Read(ctx)
for {
        var row []bigquery.Value
         err := it.Next(&row)
        if err == iterator.Done {
                break
        }
        if err != nil {
                return err
        }
        fmt.Fprintln(w, row)
}

独自の HTTP Client を使いたいときはクライアントの生成のときに Option に差し込みます。

hcl := custom.NewHTTPClient()
ctx := context.Background()
client, err := bigquery.NewClient(ctx, projectID, option.WithHTTPClient(hcl))
if err != nil {
        return fmt.Errorf("bigquery.NewClient: %v", err)
}
defer client.Close()

ここまではドキュメントどおりに実装すれば普通に動く実装です。

以下実装時に気づいたことまとめます。

BigQuery のライブラリは 2種類ある

BigQuery に限りませんが、GCP をサービスを利用するときには利用する Google のライブラリの種類を見る必要があります。多くの場合(ドキュメントのサンプルコードでも)は cloud.google.com/go/... というライブラリを使うことになります。
ただ実は他にも google.golang.org/api/... というライブラリもあり、一見するとどっちを使うべきなのか迷いますが cloud.google.com/go/... のライブラリは google.golang.org/api/... をより使いやすくしたライブラリでもあるので cloud 接頭辞の方のライブラリを使えばよいです。

例えば今回の BigQuery のケースだと、 cloud.google.com/go/bigquery の Query は iterator 型を採用してるので、クエリの結果が終わるまで値を取得することができますが、 google.golang.org/api/bigquery にはありません。一回のクエリのレスポンスはライブラリの側で最大 10MB に制限されてしまい、正しくすべてのクエリ結果を返すことができないシグネチャになっています。

// MaxResults: [Optional] The maximum number of rows of data to return
// per page of results. Setting this flag to a small value such as 1000
// and then paging through results might improve reliability when the
// query result set is large. In addition to this limit, responses are
// also limited to 10 MB. By default, there is no maximum row count, and
// only the byte limit applies.

BigQuery の型はそれぞれ Go で対応する形が決まっている

Next メソッドの GoDoc を見るとわかるのですが、Next メソッドには 1レコードを Struct で表現することでクエリの結果を自動で Struct にマッピングしてくれる という振る舞いがあります。
詳細は上記のメソッドの Example 参照。

ただしこの Struct にマッピングするときは実際の BigQuery の Schema がどういう Schema であるかによって対応する型が異なります。
例えば BigQuery の Schema が Nullable な String のとき Struct に string を割り当てても上手く以下ないことがあります(文字列が入ってくるときは問題ありませんが、カラムが NULL のときにエラーになる)

このため、Nullable なカラムのスキーマの事も考え、BigQuery の対応する Type の方を Struct に割り当てておくと安全です。対応表は上記 Next メソッドの GoDoc に記載してあります。

STRING      string
BOOL        bool
INTEGER     int, int8, int16, int32, int64, uint8, uint16, uint32
FLOAT       float32, float64
BYTES       []byte
TIMESTAMP   time.Time
DATE        civil.Date
TIME        civil.Time
DATETIME    civil.DateTime
NUMERIC     *big.Rat
BIGNUMERIC  *big.Rat

ref: https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.59.0/bigquery/iterator.go#L124-L137

具体的には以下のような Struct を定義することになります。

type Record struct {
    UserID int64 // Null が許容されていない Number 型のカラム
    HogeID bigquery.NullInt64 // Nullable な Number 型のカラム 
}

q := client.Query("select userID, HogeID from $dataset ...")
job := q.Run(ctx)


itr, err := job.Read(ctx)
for {
        record := Record{}
        err := it.Next(&record)
        if err == iterator.Done {
                break
        }
        if err != nil {
                return err
        }
        fmt.Fprintln(w, row)
}

カラム名とフィールド名は一致させる必要がある

最後にこれにハマりました。 Next メソッドの Godoc を読むと書いてあるのですが、

  1. struct にマッピングさせるときに Struct のフィールド名は Query で Select するカラム名と一致してる必要がある(tag 等での縛りがないので)
  2. struct のフィールド名と Query で Select するカラム名が異なっているときは parse は失敗してフィールドは空の状態になる。

書いてあるのは以下の部分

If dst is pointer to a struct, each column in the schema will be matched with an exported field of the struct that has the same name, ignoring case. Unmatched schema columns and struct fields will be ignored.

ref: https://pkg.go.dev/cloud.google.com/go/bigquery#RowIterator.Next

つまり

  • mapping 対象(dst) の struct はテーブルのスキーマと同じ名前のフィールドを保つ必要がある。
  • フィールドは公開状態にしておく(先頭は大文字)
  • マッチしないケース(ex struct のフィールドが UserID で BQ のテーブルスキーマカラム名が user_id などのとき)にフィールドの値は ignore される。

ということになります。

もしスキーマがずれるケースはクエリを書くときにエイリアスを当てる必要があります。サンプルは以下。

clinet.Query("select user_id as UserID ... from $dataset"

余談: ValueLoader を実装すると独自型を BigQuery の型を parse することができる

bigquery.Value 型は ValueLoader という interface を実装しており、これで Next メソッドの中で自動で与えられた変数(or struct) にクエリの結果をマッピングできるようになります。

ref: https://pkg.go.dev/cloud.google.com/go/bigquery#ValueLoader

これの interface を実装した type は Next メソッドの中で bigquery.Value を parse できるようになり、以下のような書換えが可能になります。

before: もともとは struct をマッピングする実装だったが、返却されるのが []int64

type row struct {
    userID int64
}

var userIDs []int64
q := client.Query("select userID from $dataset ...")
job := q.Run(ctx)
itr, err := job.Read(ctx)
for {
        r := row{}
        err := it.Next(&r)
        if err == iterator.Done {
                break
        }
        if err != nil {
                return err
        }
        userIDs = append(userIDs, r.UserID)
}

可能であれば struct にマッピングせずに int64 の配列をそのままほしいところではあるが、そのままの int64 は ValueLoader を実装していない。

after: 新しい Slice 型を独自型として定義して Next にわたす。

type (
    Int64Slice []*int64
)

func (sl *Int64Slice) Load(v []bigquery.Value, _ bigquery.Schema) error {
    for _, vv := range v {
        switch t := vv.(type) {
        case int64:
            *sl = append(*sl, &t)
        default:
            // ignore if value type is not int64.
            *sl = append(*sl, nil)
        }
    }
    return nil
}


var userIDs []int64
q := client.Query("select userID from $dataset ...")
job := q.Run(ctx)
itr, err := job.Read(ctx)
for {
        var s int64Slice
        err := it.Next(&s)
        if err == iterator.Done {
                break
        }
        if err != nil {
                return err
        }
        userIDs = append(userIDs, s[0])
}

まぁ書いててどちらも大差ないですが、独自型を使えるということの説明をしたかった、ということです...笑

余談2: Generics を使って汎用的な wrapper メソッドを作る

Generics を使った Query メソッドの wrapper が作れるなと思ったので簡単にまとめてみました。

type queryOption func(b *bigquery.Query) *bigquery.Query

func QueryAll[T any](ctx context.Context, queryStr string, opts ...queryOption) ([]T, error) {
    q := bq.Query(queryStr)
    for _, opt := range opts {
        q = opt(q)
    }
    job, err := q.Run(ctx)
    if err != nil {
        return nil, err
    }
    itr, err := job.Read(ctx)
    if err != nil {
        return nil, err
    }

    var rows []T
    for {
        var row T
        err := itr.Next(&row)
        if err == iterator.Done {
            break
        }
        if err != nil {
            return nil, err
        }
        rows = append(rows, row)
    }
    return rows, nil
}

使うときは以下のように使います。

type row struct {
    userID bigquery.NullString
}

q := "select userID from $dataset ..."
rows, err := bigquery.QueryAll[row](ctx, q)

独自型を定義したときは以下です。

q := "select userID from $dataset ...."
rows, err := bigquery.QueryAll[bigquery.Int64Slice](ctx, q)

これもわざわざ Generics で抽象化する必要もないかなとも居ますが、冗長な実装を何度も書くことないのと Generics を使えそうというフィードバックをもらったので試してみました。
特に T には何型が入ってもよく、呼び出し元で record を表現した struct とフィールド名をエイリアスにしたカラム名を使う Select 文を考えるのみでよいでの若干コードはスッキリするかなと思います。