こんにちは。松本です。
これは TECHSCORE Advent Calendar 2015 の21日目の記事です。
前回から随分と時間が空いてしまいましたね。スズキ編集長に「まだか」と言われつつも、完全に仕事にかまけてさぼってしまいました。ごめんなさい。携わっていたプロジェクト(これとかこれ)が楽しくて仕方がなかったのです・・(本当に)。
さて、Presto のバージョンは既に 0.131 まで上がったようですが、本記事では引き続き 0.96 でのコネクター開発についてご紹介していきます。
今回はレコード操作まわりです。間が空き過ぎて前回までの内容を忘れたという方はこちらを参照下さい。
Presto コネクターを実装する 第一回
Presto コネクターを実装する 第二回
処理の分割
Presto は、ユーザーからの問い合わせをテーブル単位でコネクターに渡します。コネクターはこの問い合わせに対する処理を ConnectorSplitManager インタフェースを使い、任意に分割することができます。
8.1. SPI Overview - Presto 0.96 Documentation
The split manager partitions the data for a table into the individual chunks that Presto will distribute to workers for processing.
例えば、問い合わせ先となるテーブルのデータが、実際には複数のファイルに分割されて保存されている場合、ファイル毎に処理を分割し、それを並列処理させることで効率よく問い合わせを処理することが可能になります。Presto コネクターの実装例である Example HTTP Connector がこの例です。
次に紹介する 3 つのインタフェースがこれらの役割を担います。
- com.facebook.presto.spi.ConnectorPartition
- com.facebook.presto.spi.ConnectorSplit
- com.facebook.presto.spi.ConnectorSplitManager
ConnectorPartition はデータソースのパーティションを表すインタフェースです。
ConnectorSplit はパーティションに対するアクセス処理単位を表します。後述する RecordCursor はこの単位で生成されます。
ConnectorSplitManager は、問い合わせに対し ConnectorPartition への分割と、ConnectorSplit への分割を行います。
尚、今回のサンプルコードでは、サンプルとしてのシンプルさを優先し、処理の分割には対応していません。問い合わせに対し常に ConnectorSplit ひとつで処理を行よう実装しています。
com.techscore.example.presto.plugin.TechscorePartition
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | package com.techscore.example.presto.plugin; import lombok.NonNull; import lombok.Value; import com.facebook.presto.spi.ConnectorColumnHandle; import com.facebook.presto.spi.ConnectorPartition; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.TupleDomain; @Value public class TechscorePartition implements ConnectorPartition {     private final SchemaTableName schemaTableName;     private final String partitionId;     public TechscorePartition(@NonNull SchemaTableName schemaTableName) {         this.schemaTableName = schemaTableName;         this.partitionId = new StringBuilder()                 .append(schemaTableName.getSchemaName()).append(':')                 .append(schemaTableName.getTableName()).toString();     }     @Override     public TupleDomain<ConnectorColumnHandle> getTupleDomain() {         return TupleDomain.all();     } } | 
ここで実装する事になるのは partitionId プロパティと tupleDomain プロパティの Getter メソッドです。
partitionId プロパティは対象テーブルに対して一意になるよう割り当てたパーティションの ID です。サンプルコードはパーティションがひとつなのでパーティション毎に ID を割り当てる必要がなく、スキーマ名とテーブル名のみを結合した文字列を ID として使用しています。Getter メソッドの定義は lombok の @Value アノテーションによって自動生成されます。
tupleDomain はこのパーティションが受け持つデータの範囲を表します。サンプルコードではパーティションがひとつなので、常に「全て」を表す TupleDomain を返しています。
com.techscore.example.presto.plugin.TechscoreSplit
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | package com.techscore.example.presto.plugin; import java.util.List; import lombok.EqualsAndHashCode; import lombok.NonNull; import lombok.ToString; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.SchemaTableName; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; @EqualsAndHashCode @ToString public class TechscoreSplit implements ConnectorSplit {     private final String connectorId;     private final SchemaTableName schemaTableName;     @JsonCreator     public TechscoreSplit(             @JsonProperty("connectorId") @NonNull String connectorId,             @JsonProperty("schemaTableName") @NonNull SchemaTableName schemaTableName) {         this.connectorId = connectorId;         this.schemaTableName = schemaTableName;     }     @JsonProperty     public String getConnectorId() {         return connectorId;     }     @JsonProperty     public SchemaTableName getSchemaTableName() {         return schemaTableName;     }     @JsonIgnore     @Override     public List<HostAddress> getAddresses() {         return ImmutableList.of();     }     @JsonIgnore     @Override     public boolean isRemotelyAccessible() {         return true;     }     @JsonIgnore     @Override     public Object getInfo() {         return this;     } } | 
本クラスも前回の ConnectorTableHandle や ConnectorColumn 同様、@JsonCreator や @JsonProperty アノテーションを使って JSON でのシリアライズ/デシリアライズを可能とします。
address プロパティ、remotelyAccessible プロパティはデータソースへのアクセス方法のヒントになる情報ですが、サンプルコードであるため、それぞれ Getter メソッドが固定値を返すよう実装しています。
com.techscore.example.presto.plugin.TechscoreSplitManager
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | package com.techscore.example.presto.plugin; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import java.util.ArrayList; import java.util.Collections; import java.util.List; import lombok.Getter; import lombok.NonNull; import lombok.val; import com.facebook.presto.spi.ConnectorColumnHandle; import com.facebook.presto.spi.ConnectorPartition; import com.facebook.presto.spi.ConnectorPartitionResult; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.ConnectorSplitManager; import com.facebook.presto.spi.ConnectorSplitSource; import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.FixedSplitSource; import com.facebook.presto.spi.TupleDomain; import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import com.google.inject.name.Named; public class TechscoreSplitManager implements ConnectorSplitManager {     @Getter     private final String connectorId;     @Getter     private final TechscoreHandleResolver tHandleResolver;     @Getter     private final TechscoreConnectorConfig tConnectorConfig;     @Inject     public TechscoreSplitManager(             @Named("connectorId") @NonNull String connectorId,             @NonNull TechscoreHandleResolver tHandleResolver,             @NonNull TechscoreConnectorConfig tConnectorConfig) {         this.connectorId = connectorId;         this.tHandleResolver = tHandleResolver;         this.tConnectorConfig = tConnectorConfig;     }     @Override     public ConnectorPartitionResult getPartitions(             ConnectorTableHandle tableHandle,             TupleDomain<ConnectorColumnHandle> tupleDomain) {         val tTableHandle = getTHandleResolver().convertTableHandle(tableHandle);         val partitions = ImmutableList                 .<ConnectorPartition> of(new TechscorePartition(tTableHandle                         .getSchemaTableName()));         return new ConnectorPartitionResult(partitions, tupleDomain);     }     @Override     public ConnectorSplitSource getPartitionSplits(             ConnectorTableHandle tableHandle,             @NonNull List<ConnectorPartition> partitions) {         checkArgument(partitions.size() == 1);         val partition = partitions.get(0);         checkNotNull(partition);         checkArgument(TechscorePartition.class.isInstance(partition));         val tPartition = (TechscorePartition) partition;         val splits = new ArrayList<ConnectorSplit>();         splits.add(new TechscoreSplit(getConnectorId(), tPartition                 .getSchemaTableName()));         return new FixedSplitSource(getConnectorId(), splits);     } } | 
getPartitions() メソッドは問い合わせ内容をもとに ConnectorPartition を作成します。サンプルでは問い合わせの内容に関係なく、単一の TechscorePartition を生成しています。
ここで、引数 tupleDomain が問い合わせ条件を保持しています。tupleDomain がどのように条件を保持しているかは、次の実行結果を見るとイメージし易いでしょう。この実行結果は、Presto に対してクエリを実行した際の、引数 tupleDomain の内容を出力したものです。
select * from techscore.schema1.authors;
| 1 | TupleDomain:ALL | 
select * from techscore.schema1.authors where id = 1;
| 1 | TupleDomain:{TechscoreColumnHandle(connectorId=techscore, caseSensitiveName=id, name=id, type=bigint, ordinalPosition=0)=[[1]]} | 
select * from techscore.schema1.authors where id in (1, 2);
| 1 | TupleDomain:{TechscoreColumnHandle(connectorId=techscore, caseSensitiveName=id, name=id, type=bigint, ordinalPosition=0)=[[1], [2]]} | 
select * from techscore.schema1.authors where id >= 1;
| 1 | TupleDomain:{TechscoreColumnHandle(connectorId=techscore, caseSensitiveName=id, name=id, type=bigint, ordinalPosition=0)=[[1, <max>)]} | 
getPartitionSplits() メソッドは、getPartitions() で作成したパーティション情報をもとに ConnectorSplit を生成します。サンプルでは常に単一の TechscoreSplit を生成しています。
データアクセス
前述の通り Presto コネクターは ConnectorSplitManager で生成された ConnectorSplit 単位でデータソースにアクセスし、Presto にデータを返します。
この役割を担うのが、次のインタフェースです。
- com.facebook.presto.spi.RecordCursor
- com.facebook.presto.spi.RecordSet
- com.facebook.presto.spi.ConnectorRecordSetProvider
RecordCursor は JDBC の ResultSet のようなもので、問い合わせ結果に対し、行を進めながらフィールドデータにアクセスするインタフェースを提供します。
RecordSet は RecordCursor を生成する役割を担います。
ConnectorRecordSetProvider は ConnectorSplit 単位で RecordSet を生成する役割を担います。
尚、RecordSet、ConnectorRecordSetProvider をそれぞれ実装した TechscoreRecordSet と TechscoreRecordSetProvider クラスについては、それぞれ TechscoreRecordCursor のインスタンス、TechscoreRecordSet のインスタンスを生成しているだけなので説明を省き、ソースコードだけを掲載します。
com.techscore.example.presto.plugin.TechscoreRecordCursor
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 | package com.techscore.example.presto.plugin; import static com.facebook.presto.spi.type.BigintType.BIGINT; import static com.facebook.presto.spi.type.BooleanType.BOOLEAN; import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; import static com.facebook.presto.spi.type.VarcharType.VARCHAR; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.List; import lombok.Getter; import lombok.NonNull; import lombok.val; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.type.Type; public class TechscoreRecordCursor implements RecordCursor {     @Getter     private final List<TechscoreColumnHandle> tColumnHandles;     protected final List<List<Object>> data;     protected int index;     public TechscoreRecordCursor(             @NonNull List<TechscoreColumnHandle> tColumnHandles,             @NonNull List<List<Object>> data) {         this.tColumnHandles = tColumnHandles;         this.data = data;         this.index = -1;     }     @Override     public long getTotalBytes() {         return 0L;     }     @Override     public long getCompletedBytes() {         return 0L;     }     @Override     public long getReadTimeNanos() {         return 0L;     }     @Override     public Type getType(int field) {         return getTColumnHandles().get(field).getType();     }     protected Object getValue(int field) {         val ordinalPosition = getTColumnHandles().get(field)                 .getOrdinalPosition();         return data.get(index).get(ordinalPosition);     }     @Override     public boolean advanceNextPosition() {         int index = this.index + 1;         if (index >= data.size()) {             return false;         }         this.index = index;         return true;     }     @Override     public boolean getBoolean(int field) {         val type = getType(field);         if (BOOLEAN.equals(type)) {             return ((Boolean) getValue(field)).booleanValue();         }         throw new IllegalArgumentException();     }     @Override     public long getLong(int field) {         val type = getType(field);         if (BIGINT.equals(type)) {             return ((Long) getValue(field)).longValue();         }         if (TIMESTAMP.equals(type)) {             val dateTime = (LocalDateTime) getValue(field);             return dateTime.atZone(ZoneId.of("Asia/Tokyo")).toEpochSecond();         }         throw new IllegalArgumentException();     }     @Override     public double getDouble(int field) {         throw new UnsupportedOperationException();     }     @Override     public Slice getSlice(int field) {         val type = getType(field);         if (VARCHAR.equals(type)) {             return Slices.utf8Slice((String) getValue(field));         }         throw new IllegalArgumentException();     }     @Override     public boolean isNull(int field) {         val value = getValue(field);         if (value == null) {             return true;         }         if (value instanceof String && value.equals("")) {             return true;         }         return false;     }     @Override     public void close() {         ;     } } | 
本実装では、コンストラクタに問い合わせ結果となるデータが渡される実装です。
getTotalBytes(), getCompletedBytes(), getReadTimeNanos() メソッドは各種統計情報に利用する数値を返します。サンプルコードでは実装を省略し、常に 0 を返すようにしています。
getType() メソッドは指定フィールドのデータ型を返します。
advanceNextPosition() メソッドが行を進める役割を持ちます。
getBoolean(), getLong(), getDouble(), getSlice() メソッドがカレント行のフィールドデータにアクセスする役割を担います。
Presto コネクターでは、データを次の 4 つの Java データ型で表現します。第二回でもご紹介した基本的なデータ型とのマッピングは次のようになります。
- boolean - BooleanType.BOOLEAN
- long - BigintType.BIGINT / TimestampType.TIMESTAMP / DateType.DATE
- double - DoubleType.DOUBLE
- Slice - VarcharType.VARCHAR
ここで、TimestampType.TIMESTAMP は日時を 1970-01-01T00:00:00 UTC からの経過ミリ秒として扱い、DateType.DATE は日付を 1970-01-01 からの経過日数として扱います。
VarcharType.VARCHAR は Slice を使います。今回は Slices.utf8Slice(String) メソッドで String から Slice への変換を行っています。
RecordCursor には Presto のバージョン 0.113 から getObject(int) という Object を返すメソッドが追加されています。ARRAY 型や MAP 型等はこのメソッドで扱う仕様に変更されています。
isNull() はカレント行の指定フィールドデータが null であることを検証するメソッドです。
com.techscore.example.presto.plugin.TechscoreRecordSet
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | package com.techscore.example.presto.plugin; import java.util.List; import java.util.stream.Collectors; import lombok.Getter; import lombok.NonNull; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.RecordSet; import com.facebook.presto.spi.type.Type; import com.google.common.collect.ImmutableList; public class TechscoreRecordSet implements RecordSet {     @Getter     private final TechscoreClient tClient;     @Getter     private final TechscoreSplit tSplit;     @Getter     private final List<TechscoreColumnHandle> tColumnHandles;     @Getter     private final List<Type> columnTypes;     public TechscoreRecordSet(@NonNull TechscoreClient tClient,             @NonNull TechscoreSplit tSplit,             @NonNull List<TechscoreColumnHandle> tColumnHandles) {         this.tClient = tClient;         this.tSplit = tSplit;         this.tColumnHandles = ImmutableList.copyOf(tColumnHandles);         this.columnTypes = ImmutableList.copyOf(tColumnHandles.stream()                 .map(TechscoreColumnHandle::getType)                 .collect(Collectors.toList()));     }     @Override     public RecordCursor cursor() {         return getTClient().createTRecordCursor(getTSplit(),                 getTColumnHandles());     } } | 
com.techscore.example.presto.plugin.TechscoreRecordSetProvider
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | package com.techscore.example.presto.plugin; import static com.google.common.base.Preconditions.checkArgument; import java.util.List; import java.util.stream.Collectors; import lombok.Getter; import lombok.NonNull; import lombok.val; import com.facebook.presto.spi.ConnectorColumnHandle; import com.facebook.presto.spi.ConnectorRecordSetProvider; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.RecordSet; import com.google.inject.Inject; import com.google.inject.name.Named; public class TechscoreRecordSetProvider implements ConnectorRecordSetProvider {     @Getter     private final String connectorId;     @Getter     private final TechscoreClient tClient;     @Getter     private final TechscoreHandleResolver tHandleResolver;     @Getter     private final TechscoreConnectorConfig tConnectorConfig;     @Inject     public TechscoreRecordSetProvider(             @Named("connectorId") @NonNull String connectorId,             @NonNull TechscoreClient tClient,             @NonNull TechscoreHandleResolver tHandleResolver,             @NonNull TechscoreConnectorConfig tConnectorConfig) {         this.connectorId = connectorId;         this.tClient = tClient;         this.tHandleResolver = tHandleResolver;         this.tConnectorConfig = tConnectorConfig;     }     @Override     public RecordSet getRecordSet(ConnectorSplit split,             List<? extends ConnectorColumnHandle> columns) {         val tSplit = getTHandleResolver().convertSplit(split);         checkArgument(tSplit.getConnectorId().equals(getConnectorId()));         val tColumnHandles = columns.stream()                 .map(getTHandleResolver()::convertColumnHandle)                 .collect(Collectors.toList());         return new TechscoreRecordSet(getTClient(), tSplit, tColumnHandles);     } } | 
動作確認
デプロイ方法は、ビルドした JAR ファイル、および依存する JAR ファイルを所定のディレクトリに放り込むだけです。今回のサンプルでは Presto インストールディレクトリ直下の plugin ディレクトリに techscore というディレクトリを作成し、そこにビルド結果となる techscore-presto-plugin-1.0.jar を入れるのみです。デプロイ後は Presto を再起動する必要があります。
| 1 2 3 | $ ls plugin/techscore techscore-presto-plugin-1.0.jar $ ./bin/launcher restart | 
起動完了後、Presto のコマンドラインツールを使い、動作確認を行います。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | presto:default> show catalogs;   Catalog -----------  jmx  techscore (2 rows) Query 20151214_103249_00014_4wdq9, FINISHED, 1 node Splits: 2 total, 2 done (100.00%) 0:00 [2 rows, 24B] [9 rows/s, 109B/s] presto:default> show schemas from techscore;        Schema --------------------  information_schema  schema1  sys (3 rows) Query 20151214_103303_00015_4wdq9, FINISHED, 1 node Splits: 2 total, 2 done (100.00%) 0:00 [3 rows, 79B] [29 rows/s, 782B/s] presto:default> show tables from techscore.schema1;   Table ---------  authors  entries (2 rows) Query 20151214_103312_00016_4wdq9, FINISHED, 1 node Splits: 2 total, 2 done (100.00%) 0:00 [2 rows, 120B] [14 rows/s, 868B/s] presto:default> desc techscore.schema1.authors;  Column |  Type   | Null | Partition Key | Comment --------+---------+------+---------------+---------  id     | bigint  | true | false         |  name   | varchar | true | false         | (2 rows) Query 20151214_103325_00017_4wdq9, FINISHED, 1 node Splits: 2 total, 2 done (100.00%) 0:00 [2 rows, 1.4KB] [10 rows/s, 7.54KB/s] presto:default> select * from techscore.schema1.authors;  id |    name ----+------------   1 | suzuki-kei   2 | ter@ (2 rows) Query 20151214_103401_00018_4wdq9, FINISHED, 1 node Splits: 2 total, 2 done (100.00%) 0:00 [2 rows, 0B] [11 rows/s, 0B/s] | 
最後に
来年は心を入れ替え、仕事はほどほどにしてブログに力を入れます!編集長!

 
						