Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ require (
gopkg.in/yaml.v2 v2.4.0
)

replace github.com/apache/pulsar-client-go v0.13.0-candidate-1.0.20240813105849-ab042ae714d1 => ../pulsar-client-go

require (
dario.cat/mergo v1.0.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
Expand Down
115 changes: 115 additions & 0 deletions pkg/ctl/schemas/get_all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package schemas

import (
"io"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func getAllSchemas(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get all schemas for a topic."
desc.CommandPermission = "This command requires namespace admin permissions."

var examples []cmdutils.Example
del := cmdutils.Example{
Desc: "Get all schemas for a topic",
Command: "pulsarctl schemas get-all (topic name)",
}

examples = append(examples, del)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "[\n" +
" {\n" +
" \"name\": \"test-schema\",\n" +
" \"schema\": {\n" +
" \"type\": \"record\",\n" +
" \"name\": \"Test\",\n" +
" \"fields\": [\n" +
" {\n" +
" \"name\": \"id\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" \"int\"\n" +
" ]\n" +
" },\n" +
" {\n" +
" \"name\": \"name\",\n" +
" \"type\": [\n" +
" \"null\",\n" +
" \"string\"\n" +
" ]\n" +
" }\n" +
" ]\n" +
" },\n" +
" \"type\": \"AVRO\",\n" +
" \"properties\": {}\n" +
" }\n" +
"]",
}

failOut := cmdutils.Output{
Desc: "HTTP 404 Not Found, please check if the topic name you entered is correct",
Out: "[✖] code: 404 reason: Not Found",
}

notTopicName := cmdutils.Output{
Desc: "you must specify a topic name, please check if the topic name is provided",
Out: "[✖] the topic name is not specified or the topic name is specified more than once",
}

out = append(out, successOut, failOut, notTopicName)
desc.CommandOutput = out

vc.SetDescription(
"get-all",
"Get the schema for a topic",
desc.ToString(),
desc.ExampleToString(),
"get-all",
)

vc.SetRunFuncWithNameArg(func() error {
return doGetAllSchemas(vc)
}, "the topic name is not specified or the topic name is specified more than once")

vc.EnableOutputFlagSet()
}

func doGetAllSchemas(vc *cmdutils.VerbCmd) error {
topic := vc.NameArg

admin := cmdutils.NewPulsarClient()
infos, err := admin.Schemas().GetAllSchemas(topic)
if err == nil {
oc := cmdutils.NewOutputContent().
WithObject(infos).
WithTextFunc(func(w io.Writer) error {
PrintSchemas(w, infos)
return nil
})
err = vc.OutputConfig.WriteOutput(vc.Command.OutOrStdout(), oc)
}
return err
}
8 changes: 8 additions & 0 deletions pkg/ctl/schemas/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ func TestSchema(t *testing.T) {
assert.True(t, strings.Contains(getOut.String(), "AVRO"))
assert.True(t, strings.Contains(getOut.String(), "test-schema"))

getAllArgs := []string{"get-all", "test-schema"}
getAllOut, _, err := TestSchemasCommands(getAllSchemas, getAllArgs)

fmt.Print(getAllOut.String())
assert.Nil(t, err)
assert.True(t, strings.Contains(getAllOut.String(), "AVRO"))
assert.True(t, strings.Contains(getAllOut.String(), "test-schema"))

delArgs := []string{"delete", "test-schema"}
delOut, _, err := TestSchemasCommands(deleteSchema, delArgs)
assert.Nil(t, err)
Expand Down
10 changes: 10 additions & 0 deletions pkg/ctl/schemas/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,22 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
)

cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getSchema)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, getAllSchemas)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, deleteSchema)
cmdutils.AddVerbCmd(flagGrouping, resourceCmd, uploadSchema)

return resourceCmd
}

func PrintSchemas(w io.Writer, schemas []*utils.SchemaInfoWithVersion) {
_, _ = fmt.Fprintln(w, "[")
for _, schema := range schemas {
PrintSchema(w, schema)
_, _ = fmt.Fprint(w, "\n")
}
_, _ = fmt.Fprintln(w, "]")
}

func PrintSchema(w io.Writer, schema *utils.SchemaInfoWithVersion) {
name, err := json.MarshalIndent(schema.SchemaInfo.Name, "", " ")
if err != nil {
Expand Down