GCP BigQuery nos permite cargar datos de un fichero (con formato CSV, TXT, etc.) a tablas en nuestros datasets.
En ésta ocasión veremos como validar la existencia de un archivo CSV en un bucket (contenedor), cargarlo a una tabla temporal e insertarlo a una determinada tabla. Además veremos cómo crear un respaldo de ese archivo CSV a otro bucket.
BigQueryTransfer.java
import com.google.cloud.bigquery.*; import com.google.cloud.storage.*; import com.google.cloud.bigquerystorage.v1.*; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.util.JsonFormat; import java.io.*; import java.nio.channels.Channels; import java.nio.file.Files; import java.nio.file.Paths; import java.util.*; public class BigQueryTransfer { private static final String BUCKET_NAME = "your-bucket-name"; private static final String BACKUP_BUCKET_NAME = "your-backup-bucket-name"; private static final String DATASET_NAME = "mydataset"; private static final String TABLE_NAME = "transferencia"; private static final String PROJECT_ID = "myproject"; private static final String CSV_FILE_NAME = "datos.csv"; private static final String RESULT_CSV_NAME = "resultados.csv"; public static void main(String[] args) throws Exception { Storage storage = StorageOptions.getDefaultInstance().getService(); BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService(); // 1. Verificar la existencia del archivo CSV Blob blob = storage.get(BUCKET_NAME, CSV_FILE_NAME); if (blob == null) { System.out.println("El archivo " + CSV_FILE_NAME + " no existe en el bucket."); return; } // 2. Leer el contenido del archivo CSV List<String> listaContenido = new ArrayList<>(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(Channels.newInputStream(blob.reader())))) { String line; while ((line = reader.readLine()) != null) { listaContenido.add(line); } } // 3. Crear la lista de Transferencia List<Transferencia> listaTransferencia = new ArrayList<>(); for (String line : listaContenido) { String[] parts = line.split(","); if (parts.length >= 3) { Transferencia transferencia = new Transferencia(parts[0], parts[1], parts[2]); listaTransferencia.add(transferencia); } } // 4. Guardar en BigQuery usando BigQuery Storage Write API writeToBigQueryStorageApi(listaTransferencia); // 5. Crear "resultados.csv" con datos de la tabla String query = "SELECT * FROM `" + PROJECT_ID + "." + DATASET_NAME + "." + TABLE_NAME + "`"; TableResult result = bigQuery.query(QueryJobConfiguration.newBuilder(query).build()); File resultFile = new File(RESULT_CSV_NAME); try (BufferedWriter writer = new BufferedWriter(new FileWriter(resultFile))) { for (FieldValueList row : result.iterateAll()) { writer.write(String.join(",", row.get(0).getStringValue(), row.get(1).getStringValue(), row.get(2).getStringValue())); writer.newLine(); } } // Subir "resultados.csv" al bucket BlobId resultBlobId = BlobId.of(BUCKET_NAME, RESULT_CSV_NAME); BlobInfo resultBlobInfo = BlobInfo.newBuilder(resultBlobId).build(); storage.create(resultBlobInfo, Files.readAllBytes(resultFile.toPath())); // 6. Mover "datos.csv" al bucket de respaldo BlobId sourceBlobId = BlobId.of(BUCKET_NAME, CSV_FILE_NAME); BlobId backupBlobId = BlobId.of(BACKUP_BUCKET_NAME, CSV_FILE_NAME); storage.copy(Storage.CopyRequest.of(sourceBlobId, backupBlobId)); storage.delete(sourceBlobId); System.out.println("Proceso completado con éxito."); } private static void writeToBigQueryStorageApi(List<Transferencia> listaTransferencia) throws Exception { try (BigQueryWriteClient client = BigQueryWriteClient.create()) { String tablePath = String.format("projects/%s/datasets/%s/tables/%s", PROJECT_ID, DATASET_NAME, TABLE_NAME); WriteStream writeStream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build(); WriteStream createdStream = client.createWriteStream(CreateWriteStreamRequest.newBuilder().setParent(tablePath).setWriteStream(writeStream).build()); ProtoSchema protoSchema = client.getWriteStream(GetWriteStreamRequest.newBuilder().setName(createdStream.getName()).build()).getTableSchema(); Descriptors.Descriptor descriptor = TableSchemaToDescriptor.parse(protoSchema.getProtoDescriptor()); for (Transferencia transferencia : listaTransferencia) { Message.Builder messageBuilder = DynamicMessage.newBuilder(descriptor); messageBuilder.setField(descriptor.findFieldByName("fecha"), transferencia.fecha); messageBuilder.setField(descriptor.findFieldByName("clave"), transferencia.clave); messageBuilder.setField(descriptor.findFieldByName("cuenta"), transferencia.cuenta); client.appendRows(AppendRowsRequest.newBuilder() .setWriteStream(createdStream.getName()) .addRows(ByteString.copyFrom(messageBuilder.build().toByteArray())) .build()); } client.finalizeWriteStream(FinalizeWriteStreamRequest.newBuilder().setName(createdStream.getName()).build()); } } static class Transferencia { String fecha; String clave; String cuenta; Transferencia(String fecha, String clave, String cuenta) { this.fecha = fecha; this.clave = clave; this.cuenta = cuenta; } } }
Con este programa:
- Verificamos la existencia del archivo "datos.csv".
- Leemos su contenido y lo almacenamos en una lista tipo String.
- Cargamos el contenido del archivo "datos.csv" a una tabla temporal.
- Creamos una lista de tipo "Transferencia".
- Guardamos los datos en la tabla "transferencia" usando BigQuery Storage Write API.
- Creamos un archivo en el bucket llamado "resultados.csv" a partir de una consulta a la tabla "transferencia".
- Creamos un respaldo de "datos.csv" a otro bucket.
Enlaces:
https://cloud.google.com/storage/docs/bucketshttps://medium.com/@bravnic/bigquery-storage-write-api-at-scale-7affcc2d7a93
https://cloud.google.com/bigquery/docs/write-api
https://www.googlecloudcommunity.com/gc/Data-Analytics/Using-Google-Bigquery-Storage-Write-API-with-high-concurrency/m-p/662308
No hay comentarios:
Publicar un comentario