Skip to content

Commit e3dac30

Browse files
committed
feat(PubSub): Add CreateTopicWithCloudStorageIngestion sample
1 parent ff3d936 commit e3dac30

File tree

2 files changed

+116
-0
lines changed

2 files changed

+116
-0
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?php
2+
3+
/**
4+
* Copyright 2025 Google LLC.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/**
20+
* For instructions on how to run the full sample:
21+
*
22+
* @see https://github.com/GoogleCloudPlatform/php-docs-samples/blob/main/pubsub/api/README.md
23+
*/
24+
25+
namespace Google\Cloud\Samples\PubSub;
26+
27+
# [START pubsub_create_topic_with_cloud_storage_ingestion]
28+
use Google\Cloud\PubSub\PubSubClient;
29+
use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\AvroFormat;
30+
use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\PubSubAvroFormat;
31+
use Google\Cloud\PubSub\V1\IngestionDataSourceSettings\CloudStorage\TextFormat;
32+
use Google\Protobuf\Timestamp;
33+
34+
/**
35+
* Creates a topic with Cloud Storage Ingestion.
36+
*
37+
* @param string $projectId The Google project ID.
38+
* @param string $topicName The Pub/Sub topic name.
39+
* @param string $bucket Cloud Storage bucket.
40+
* @param string $inputFormat Input format for the Cloud Storage data. Must be one of text, avro, or pubsub_avro.
41+
* @param string $textDelimiter Delimiter for text format input.
42+
* @param string $matchGlob Glob pattern used to match objects that will be ingested. If unset, all objects will be ingested.
43+
* @param string $minimumObjectCreatedTime Only objects with a larger or equal creation timestamp will be ingested.
44+
*/
45+
function create_topic_with_cloud_storage_ingestion(
46+
string $projectId,
47+
string $topicName,
48+
string $bucket,
49+
string $inputFormat,
50+
string $minimumObjectCreatedTime,
51+
string $textDelimiter='',
52+
string $matchGlob=''
53+
): void {
54+
$datetime = new \DateTimeImmutable($minimumObjectCreatedTime);
55+
$timestamp = (new Timestamp())
56+
->setSeconds($datetime->getTimestamp())
57+
->setNanos($datetime->format('u') * 1000);
58+
59+
$cloudStorageData = [
60+
'bucket' => $bucket,
61+
'minimum_object_create_time' => $timestamp
62+
];
63+
64+
$cloudStorageData[$inputFormat . '_format'] = match($inputFormat) {
65+
'text' => new TextFormat(['delimiter' => $textDelimiter]),
66+
'avro' => new AvroFormat(),
67+
'pubsub_avro' => new PubSubAvroFormat(),
68+
default => throw new \InvalidArgumentException(
69+
'inputFormat must be in (\'text\', \'avro\', \'pubsub_avro\'); got value: ' . $inputFormat
70+
)
71+
};
72+
73+
if (!empty($matchGlob)) {
74+
$cloudStorageData['match_glob'] = $matchGlob;
75+
}
76+
77+
$pubsub = new PubSubClient([
78+
'projectId' => $projectId,
79+
]);
80+
81+
$topic = $pubsub->createTopic($topicName, [
82+
'ingestionDataSourceSettings' => [
83+
'cloud_storage' => $cloudStorageData
84+
]
85+
]);
86+
87+
printf('Topic created: %s' . PHP_EOL, $topic->name());
88+
}
89+
# [END pubsub_create_topic_with_cloud_storage_ingestion]
90+
require_once __DIR__ . '/../../../testing/sample_helpers.php';
91+
\Google\Cloud\Samples\execute_sample(__FILE__, __NAMESPACE__, $argv);

pubsub/api/test/pubsubTest.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,4 +487,29 @@ public function testPublishAndSubscribeWithOrderingKeys()
487487
$this->assertMatchesRegularExpression('/Created subscription with ordering/', $output);
488488
$this->assertMatchesRegularExpression('/\"enableMessageOrdering\":true/', $output);
489489
}
490+
491+
public function testCreateTopicWithCloudStorageIngestion()
492+
{
493+
$this->requireEnv('PUBSUB_EMULATOR_HOST');
494+
495+
$topic = 'test-topic-' . rand();
496+
$output = $this->runFunctionSnippet('create_topic_with_cloud_storage_ingestion', [
497+
self::$projectId,
498+
$topic,
499+
$this->requireEnv('GOOGLE_PUBSUB_STORAGE_BUCKET'),
500+
'text',
501+
'1970-01-01T00:00:00Z',
502+
"\n",
503+
'**.txt'
504+
]);
505+
$this->assertMatchesRegularExpression('/Topic created:/', $output);
506+
$this->assertMatchesRegularExpression(sprintf('/%s/', $topic), $output);
507+
508+
$output = $this->runFunctionSnippet('delete_topic', [
509+
self::$projectId,
510+
$topic,
511+
]);
512+
$this->assertMatchesRegularExpression('/Topic deleted:/', $output);
513+
$this->assertMatchesRegularExpression(sprintf('/%s/', $topic), $output);
514+
}
490515
}

0 commit comments

Comments
 (0)