diff --git a/README.md b/README.md index 60866876..b87df031 100644 --- a/README.md +++ b/README.md @@ -63,8 +63,6 @@ Frames currently supports the following backend types: - `tsdb` — a time-series database (TSDB). -- `csv` — a comma-separated-value (CSV) file. - This backend type is used only for testing purposes. #### `Client` Methods @@ -185,7 +183,7 @@ All client methods receive the following common parameters; additional, method-s - **Type:** `str` - **Requirement:** Required - - **Valid Values:** `"nosql"` | `"stream"` | `"tsdb"` | `"csv"` (for testing) + - **Valid Values:** `"nosql"` | `"stream"` | `"tsdb"` - **table** — The relative path to a data collection of the specified backend type in the target data container (as configured for the client object). For example, `"mytable"` or `"/examples/tsdb/my_metrics"`. diff --git a/_scripts/py_benchmark.py b/_scripts/py_benchmark.py index dd048084..fc3bf584 100755 --- a/_scripts/py_benchmark.py +++ b/_scripts/py_benchmark.py @@ -40,7 +40,6 @@ data = json.load(fp) for bench in data['benchmarks']: - # test_read[http-csv] match = re.match(r'test_(\w+)\[([a-z]+)', bench['name']) if not match: raise SystemExit('error: bad test name - {}'.format(bench['name'])) diff --git a/api/api.go b/api/api.go index 600b397b..5af6d0c3 100644 --- a/api/api.go +++ b/api/api.go @@ -33,7 +33,6 @@ import ( "github.com/v3io/frames" "github.com/v3io/frames/backends" // Load backends (make sure they register) - _ "github.com/v3io/frames/backends/csv" _ "github.com/v3io/frames/backends/kv" _ "github.com/v3io/frames/backends/stream" _ "github.com/v3io/frames/backends/tsdb" diff --git a/backends/csv/backend.go b/backends/csv/backend.go deleted file mode 100644 index 5d66b73a..00000000 --- a/backends/csv/backend.go +++ /dev/null @@ -1,508 +0,0 @@ -/* -Copyright 2018 Iguazio Systems Ltd. - -Licensed under the Apache License, Version 2.0 (the "License") with -an addition restriction as set forth herein. You may not use this -file except in compliance with the License. You may obtain a copy of -the License at http://www.apache.org/licenses/LICENSE-2.0. - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -implied. See the License for the specific language governing -permissions and limitations under the License. - -In addition, you may not use the software for any purposes that are -illegal under applicable law, and the grant of the foregoing license -under the Apache 2.0 license is conditioned upon your compliance with -such restriction. -*/ - -package csv - -import ( - "encoding/csv" - "fmt" - "io" - "os" - "strconv" - "strings" - "time" - - "github.com/nuclio/logger" - "github.com/pkg/errors" - "github.com/v3io/frames" - "github.com/v3io/frames/backends" - "github.com/v3io/frames/backends/utils" - v3io "github.com/v3io/v3io-go/pkg/dataplane" -) - -// Backend is CSV backend -type Backend struct { - rootDir string - logger logger.Logger -} - -// NewBackend returns a new CSV backend -func NewBackend(logger logger.Logger, v3ioContext v3io.Context, config *frames.BackendConfig, framesConfig *frames.Config) (frames.DataBackend, error) { - backend := &Backend{ - rootDir: config.RootDir, - logger: logger.GetChild("csv"), - } - - return backend, nil -} - -// Create creates a CSV file -func (b *Backend) Create(request *frames.CreateRequest) error { - csvPath := b.csvPath(request.Proto.Table) - // TODO: Overwrite? - if fileExists(csvPath) { - return fmt.Errorf("file '%q' already exists", request.Proto.Table) - } - - file, err := os.Create(csvPath) - if err != nil { - return errors.Wrapf(err, "cannot create file") - } - - defer file.Close() - if request.Proto.Schema == nil || len(request.Proto.Schema.Fields) == 0 { - return nil - } - - numFields := len(request.Proto.Schema.Fields) - names := make([]string, numFields) - for i, field := range request.Proto.Schema.Fields { - if field.Name == "" { - return fmt.Errorf("field %d with no name", i) - } - - names[i] = field.Name - } - - csvWriter := csv.NewWriter(file) - if err := csvWriter.Write(names); err != nil { - return errors.Wrapf(err, "cannot create header") - } - - csvWriter.Flush() - if err := file.Sync(); err != nil { - return errors.Wrap(err, "cannot flush CSV file") - } - - return nil -} - -// Delete will delete a table -func (b *Backend) Delete(request *frames.DeleteRequest) error { - - err := backends.ValidateRequest("csv", request.Proto, nil) - if err != nil { - return err - } - - csvPath := b.csvPath(request.Proto.Table) - if request.Proto.IfMissing == frames.FailOnError && !fileExists(csvPath) { - return fmt.Errorf("path to file '%q' doesn't exist", request.Proto.Table) - } - - if err := os.Remove(csvPath); err != nil { - return errors.Wrapf(err, "cannot delete file '%q'", request.Proto.Table) - } - - return nil -} - -// Read handles reading -func (b *Backend) Read(request *frames.ReadRequest) (frames.FrameIterator, error) { - - err := backends.ValidateRequest("csv", request.Proto, nil) - if err != nil { - return nil, err - } - - file, err := os.Open(b.csvPath(request.Proto.Table)) - if err != nil { - return nil, err - } - - reader := csv.NewReader(file) - columns, err := reader.Read() - if err != nil { - return nil, errors.Wrap(err, "cannot read header (columns)") - } - - it := &FrameIterator{ - logger: b.logger, - path: request.Proto.Table, - reader: reader, - columnNames: columns, - limit: int(request.Proto.Limit), - frameLimit: int(request.Proto.MessageLimit), - } - - return it, nil -} - -// Write handles writing -func (b *Backend) Write(request *frames.WriteRequest) (frames.FrameAppender, error) { - // TODO: Append? - file, err := os.Create(b.csvPath(request.Table)) - if err != nil { - return nil, err - } - - ca := &csvAppender{ - writer: file, - csvWriter: csv.NewWriter(file), - logger: b.logger, - } - - if request.ImmidiateData != nil { - if err := ca.Add(request.ImmidiateData); err != nil { - return nil, errors.Wrap(err, "cannot add immediate data") - } - } - - return ca, nil - -} - -func getInt(r *frames.ExecRequest, name string, defval int) int { - ival, err := r.Proto.Arg(name) - if err != nil { - return defval - } - - val, ok := ival.(int64) - if !ok { - return defval - } - - return int(val) -} - -// Exec executes a command -func (b *Backend) Exec(request *frames.ExecRequest) (frames.Frame, error) { - if strings.ToLower(request.Proto.Command) == "ping" { - b.logger.Info("PONG") - nRows, nCols := getInt(request, "rows", 37), getInt(request, "cols", 4) - cols := make([]frames.Column, nCols) - for c := 0; c < nCols; c++ { - name := fmt.Sprintf("col-%d", c) - bld := frames.NewSliceColumnBuilder(name, frames.IntType, nRows) - for r := 0; r < nRows; r++ { - if err := bld.Set(r, r*c); err != nil { - b.logger.WarnWith("cannot set column value", "name", name, "row", r) - } - } - cols[c] = bld.Finish() - } - return frames.NewFrame(cols, nil, nil) - } - - return nil, fmt.Errorf("CSV backend doesn't support execute command '%q'", request.Proto.Command) -} - -func (b *Backend) csvPath(table string) string { - return fmt.Sprintf("%s/%s", b.rootDir, table) -} - -// FrameIterator iterates over CSV -type FrameIterator struct { - logger logger.Logger - path string - reader *csv.Reader - frame frames.Frame - err error - columnNames []string - nRows int - limit int - frameLimit int -} - -// Next reads the next frame, return true of succeeded -func (it *FrameIterator) Next() bool { - rows, err := it.readNextRows() - if err != nil { - it.logger.ErrorWith("cannot read rows", "error", err) - it.err = err - return false - } - - if len(rows) == 0 { - return false - } - - it.frame, err = it.buildFrame(rows) - if err != nil { - it.logger.ErrorWith("cannot build a DataFrames iterator", "error", err) - it.err = err - return false - } - - return true -} - -// At return the current Frame -func (it *FrameIterator) At() frames.Frame { - return it.frame -} - -// Err returns the last error -func (it *FrameIterator) Err() error { - return it.err -} - -func (it *FrameIterator) readNextRows() ([][]string, error) { - var rows [][]string - for r := 0; it.inLimits(r); r, it.nRows = r+1, it.nRows+1 { - row, err := it.reader.Read() - if err != nil { - if err == io.EOF { - it.logger.DebugWith("EOF", "numRows", it.nRows) - return rows, nil - } - - return nil, err - } - - if len(row) != len(it.columnNames) { - err := fmt.Errorf("%s (row %d) number of columns doesn't match headers (%d != %d)", it.path, it.nRows, len(row), len(it.columnNames)) - it.logger.ErrorWith("row size mismatch", "error", err, "row", it.nRows) - return nil, err - } - - rows = append(rows, row) - } - - return rows, nil -} - -func (it *FrameIterator) inLimits(frameRow int) bool { - if it.limit > 0 && it.nRows >= it.limit { - return false - } - - if it.frameLimit > 0 && frameRow >= it.frameLimit { - return false - } - - return true -} - -func (it *FrameIterator) buildFrame(rows [][]string) (frames.Frame, error) { - columns := make([]frames.Column, len(it.columnNames)) - for c, colName := range it.columnNames { - var ( - val0 = it.parseValue(rows[0][c]) - col frames.Column - data interface{} - err error - ) - - switch val0 := val0.(type) { - case int64: - typedData := make([]int64, len(rows)) - typedData[0] = val0 - for r, row := range rows[1:] { - val, ok := it.parseValue(row[c]).(int64) - if !ok { - err := fmt.Errorf("type mismatch in row %d, col %d", it.nRows, c) - it.logger.ErrorWith("type mismatch", "error", err) - return nil, err - } - - typedData[r+1] = val // +1 since we start in first row - } - data = typedData - case float64: - typedData := make([]float64, len(rows)) - typedData[0] = val0 - for r, row := range rows[1:] { - val, ok := it.parseValue(row[c]).(float64) - if !ok { - err := fmt.Errorf("type mismatch in row %d, col %d", it.nRows, c) - it.logger.ErrorWith("type mismatch", "error", err) - return nil, err - } - - typedData[r+1] = val // +1 since we start in first row - } - data = typedData - case string: - typedData := make([]string, len(rows)) - typedData[0] = val0 - for r, row := range rows[1:] { - typedData[r+1] = row[c] // +1 since we start in first row - } - data = typedData - case time.Time: - typedData := make([]time.Time, len(rows)) - typedData[0] = val0 - for r, row := range rows[1:] { - val, ok := it.parseValue(row[c]).(time.Time) - if !ok { - err := fmt.Errorf("type mismatch in row %d, col %d", it.nRows, c) - it.logger.ErrorWith("type mismatch", "error", err) - return nil, err - } - - typedData[r+1] = val // +1 since we start in first row - } - data = typedData - case bool: - typedData := make([]bool, len(rows)) - typedData[0] = val0 - for r, row := range rows[1:] { - val, ok := it.parseValue(row[c]).(bool) - if !ok { - err := fmt.Errorf("type mismatch in row %d, col %d", it.nRows, c) - it.logger.ErrorWith("type mismatch", "error", err) - return nil, err - } - - typedData[r+1] = val // +1 since we start in first row - } - data = typedData - default: - return nil, fmt.Errorf("%s - unknown type '%T'", colName, val0) - } - - col, err = frames.NewSliceColumn(colName, data) - if err != nil { - it.logger.ErrorWith("cannot build column", "error", err, "column", colName) - return nil, errors.Wrapf(err, "cannot build column '%s'", colName) - } - - columns[c] = col - } - - return frames.NewFrame(columns, nil, nil) -} - -func (it *FrameIterator) parseValue(value string) interface{} { - // time/date formats - timeFormats := []string{time.RFC3339, time.RFC3339Nano, "2006-01-02"} - for _, format := range timeFormats { - t, err := time.Parse(format, value) - if err == nil { - return t - } - } - - // bool - switch strings.ToLower(value) { - case "true": - return true - case "false": - return false - } - - // int - i, err := strconv.Atoi(value) - if err == nil { - return int64(i) - } - - // float - f, err := strconv.ParseFloat(value, 64) - if err == nil { - return f - } - - // Leave as string - return value -} - -type csvAppender struct { - logger logger.Logger - writer io.Writer - csvWriter *csv.Writer - headerWritten bool - closed bool -} - -func (ca *csvAppender) Add(frame frames.Frame) error { - if ca.closed { - err := errors.New("Adding on a closed csv appender") - ca.logger.Error(err) - return err - } - ca.logger.InfoWith("adding frame", "size", frame.Len()) - names := frame.Names() - if !ca.headerWritten { - if err := ca.csvWriter.Write(names); err != nil { - ca.logger.ErrorWith("cannot write header", "error", err) - return errors.Wrap(err, "cannot write header") - } - ca.headerWritten = true - } - - for r := 0; r < frame.Len(); r++ { - record := make([]string, len(names)) - for c, name := range names { - col, err := frame.Column(name) - if err != nil { - ca.logger.ErrorWith("cannot get column", "error", err) - return errors.Wrap(err, "cannot get column") - } - - val, err := utils.ColAt(col, r) - if err != nil { - ca.logger.ErrorWith("cannot get value", "error", err, "name", name, "row", r) - return errors.Wrapf(err, "%s:%d cannot get value", name, r) - } - - record[c] = fmt.Sprintf("%v", val) - } - - if err := ca.csvWriter.Write(record); err != nil { - ca.logger.ErrorWith("cannot write record", "error", err) - return errors.Wrap(err, "cannot write record") - } - } - - return nil -} - -// File Sync -type syncer interface { - Sync() error -} - -// WaitForComplete wait for write completion -func (ca *csvAppender) WaitForComplete(timeout time.Duration) error { - if ca.closed { - err := errors.New("Adding on a closed csv appender") - ca.logger.Error(err) - return err - } - ca.csvWriter.Flush() - if err := ca.csvWriter.Error(); err != nil { - ca.logger.ErrorWith("CSV flush", "error", err) - return err - } - - if s, ok := ca.writer.(syncer); ok { - return s.Sync() - } - - return nil -} - -func (ca *csvAppender) Close() { - ca.closed = true -} - -func fileExists(path string) bool { - _, err := os.Stat(path) - return err == nil -} - -func init() { - if err := backends.Register("csv", NewBackend); err != nil { - panic(err) - } -} diff --git a/backends/csv/backend_test.go b/backends/csv/backend_test.go deleted file mode 100644 index 4b414bdc..00000000 --- a/backends/csv/backend_test.go +++ /dev/null @@ -1,181 +0,0 @@ -/* -Copyright 2018 Iguazio Systems Ltd. - -Licensed under the Apache License, Version 2.0 (the "License") with -an addition restriction as set forth herein. You may not use this -file except in compliance with the License. You may obtain a copy of -the License at http://www.apache.org/licenses/LICENSE-2.0. - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -implied. See the License for the specific language governing -permissions and limitations under the License. - -In addition, you may not use the software for any purposes that are -illegal under applicable law, and the grant of the foregoing license -under the Apache 2.0 license is conditioned upon your compliance with -such restriction. -*/ - -package csv - -import ( - "os" - "path" - "testing" - - "github.com/v3io/frames" - "github.com/v3io/frames/pb" -) - -var ( - csvData = []byte(`STATION,DATE,PRCP,SNWD,SNOW,TMAX,TMIN,AWND,WDF2,WDF5,WSF2,WSF5,PGTM,FMTM -GHCND:USW00094728,2000-01-01,0,-9999,0,100,11,26,250,230,72,94,1337,1337.7 -GHCND:USW00094728,2000-01-02,0,-9999,0,156,61,21,260,260,72,112,2313,2314.7 -GHCND:USW00094728,2000-01-03,0,-9999,0,178,106,30,260,250,67,94,320,321.7 -GHCND:USW00094728,2000-01-04,178,-9999,0,156,78,35,320,350,67,107,1819,1840.7 -GHCND:USW00094728,2000-01-05,0,-9999,0,83,-17,51,330,340,107,143,843,844.7 -GHCND:USW00094728,2000-01-06,0,-9999,0,56,-22,30,220,250,67,98,1833,1834.7 -GHCND:USW00094728,2000-01-07,0,-9999,0,94,17,42,300,310,103,156,1521,1601.7 -GHCND:USW00094728,2000-01-09,5,-9999,0,106,28,26,270,270,63,89,22,601.7 -GHCND:USW00094728,2000-01-10,213,-9999,0,144,67,41,280,260,94,139,1736,1758.7 -GHCND:USW00094728,2000-01-11,0,-9999,0,111,44,49,300,310,112,174,1203,1203.7 -GHCND:USW00094728,2000-01-12,0,-9999,0,83,39,39,330,330,94,161,536,610.7 -GHCND:USW00094728,2000-01-13,13,-9999,0,39,-78,51,90,10,103,143,1539,843.7 -`) - numCSVRows = 12 - numCSVCols = 14 - colTypes = map[string]frames.DType{ - "STATION": frames.StringType, - "DATE": frames.TimeType, - "PRCP": frames.IntType, - "SNWD": frames.IntType, - "FMTM": frames.FloatType, - } -) - -func TestCSV(t *testing.T) { - req := &frames.ReadRequest{Proto: &pb.ReadRequest{}} - result := loadTempCSV(t, req) - - nRows := totalRows(result) - if nRows != numCSVRows { - t.Fatalf("# rows mismatch %d != %d", nRows, numCSVRows) - } - - for _, frame := range result { - if len(frame.Names()) != numCSVCols { - t.Fatalf("# columns mismatch %d != %d", len(frame.Names()), numCSVRows) - } - - for name, dtype := range colTypes { - col, err := frame.Column(name) - if err != nil { - t.Fatalf("can't find column %q", name) - } - - if col.DType() != dtype { - t.Fatalf("dype mismatch %d != %d", dtype, col.DType()) - } - } - } - -} - -func TestLimit(t *testing.T) { - limit := numCSVRows - 3 - - req := &frames.ReadRequest{Proto: &pb.ReadRequest{}} - req.Proto.Limit = int64(limit) - - result := loadTempCSV(t, req) - if nRows := totalRows(result); nRows != limit { - t.Fatalf("got %d rows, expected %d", nRows, limit) - } -} - -func TestMaxInMessage(t *testing.T) { - frameLimit := numCSVRows / 3 - - req := &frames.ReadRequest{Proto: &pb.ReadRequest{}} - req.Proto.MessageLimit = int64(frameLimit) - - result := loadTempCSV(t, req) - if nRows := totalRows(result); nRows != numCSVRows { - t.Fatalf("got %d rows, expected %d", nRows, numCSVRows) - } - - for _, frame := range result { - if frame.Len() > frameLimit { - t.Fatalf("frame too big (%d > %d)", frame.Len(), frameLimit) - } - } -} - -func totalRows(result []frames.Frame) int { - total := 0 - for _, frame := range result { - total += frame.Len() - } - - return total -} - -func loadTempCSV(t *testing.T, req *frames.ReadRequest) []frames.Frame { - logger, err := frames.NewLogger("debug") - if err != nil { - t.Fatalf("can't create logger - %s", err) - } - - csvPath, err := tmpCSV() - if err != nil { - t.Fatal(err) - } - - cfg := &frames.BackendConfig{ - Name: "testCsv", - Type: "csv", - RootDir: path.Dir(csvPath), - } - - backend, err := NewBackend(logger, nil, cfg, nil) - if err != nil { - t.Fatal(err) - } - - req.Proto.Table = path.Base(csvPath) - it, err := backend.Read(req) - if err != nil { - t.Fatal(err) - } - - var result []frames.Frame - for it.Next() { - result = append(result, it.At()) - } - - if err := it.Err(); err != nil { - t.Fatal(err) - } - - return result -} - -func tmpCSV() (string, error) { - tmp, err := os.CreateTemp("", "csv-test") - if err != nil { - return "", err - } - - _, err = tmp.Write(csvData) - if err != nil { - return "", nil - } - - if err := tmp.Sync(); err != nil { - return "", err - } - - return tmp.Name(), nil -} diff --git a/clients/py/tests/conftest.py b/clients/py/tests/conftest.py index 10cc649a..1ff573e5 100644 --- a/clients/py/tests/conftest.py +++ b/clients/py/tests/conftest.py @@ -45,7 +45,7 @@ {'type': 'tsdb', 'workers': 16}, ] -test_backends = ['csv'] +test_backends = [] if has_session: test_backends.extend(backend['type'] for backend in extra_backends) @@ -101,12 +101,7 @@ def framesd(): 'log': { 'level': 'debug', }, - 'backends': [ - { - 'type': 'csv', - 'rootDir': root_dir, - }, - ] + 'backends': [] } if has_session: diff --git a/clients/py/tests/pip_docker.py b/clients/py/tests/pip_docker.py deleted file mode 100644 index 2b477481..00000000 --- a/clients/py/tests/pip_docker.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2018 Iguazio -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Code used by test_pip_docker - -from argparse import ArgumentParser - -import v3io_frames as v3f - -parser = ArgumentParser() -parser.add_argument('--grpc-port', default='8081') -parser.add_argument('--http-port', default='8080') -args = parser.parse_args() - -client = v3f.Client('localhost:{}'.format(args.grpc_port)) -df = client.read('csv', table='weather.csv') -assert len(df) > 0, 'empty df' diff --git a/clients/py/tests/test_benchmark.py b/clients/py/tests/test_benchmark.py deleted file mode 100644 index 565c93d4..00000000 --- a/clients/py/tests/test_benchmark.py +++ /dev/null @@ -1,33 +0,0 @@ -import pytest - -import v3io_frames as v3f -from conftest import has_go - -from test_integration import integ_params, csv_df - -wdf = csv_df(1982) - - -def read_benchmark(client): - for df in client.read('csv', 'weather.csv'): - assert len(df), 'empty df' - - -def write_benchmark(client, df): - client.write('csv', 'write-bench.csv', df) - - -@pytest.mark.skipif(not has_go, reason='Go SDK not found') -@pytest.mark.parametrize('protocol,backend', integ_params) -def test_read(benchmark, framesd, protocol, backend): - addr = getattr(framesd, '{}_addr'.format(protocol)) - client = v3f.Client(addr) - benchmark(read_benchmark, client) - - -@pytest.mark.skipif(not has_go, reason='Go SDK not found') -@pytest.mark.parametrize('protocol,backend', integ_params) -def test_write(benchmark, framesd, protocol, backend): - addr = getattr(framesd, '{}_addr'.format(protocol)) - client = v3f.Client(addr) - benchmark(write_benchmark, client, wdf) diff --git a/clients/py/tests/test_cudf.py b/clients/py/tests/test_cudf.py index 1a79e689..f1b53318 100644 --- a/clients/py/tests/test_cudf.py +++ b/clients/py/tests/test_cudf.py @@ -12,12 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from time import sleep, time - import pandas as pd import pytest import v3io_frames as v3f -from conftest import has_go from conftest import test_backends try: @@ -28,27 +25,6 @@ has_cudf = False -@pytest.mark.skipif(not has_cudf, reason='cudf not found') -@pytest.mark.skipif(not has_go, reason='Go SDK not found') -def test_cudf(framesd, session): - df = cudf.DataFrame({ - 'a': [1, 2, 3], - 'b': [1.1, 2.2, 3.3], - }) - - c = v3f.Client(framesd.grpc_addr, frame_factory=cudf.DataFrame) - backend = 'csv' - table = 'cudf-{}'.format(int(time())) - print('table = {}'.format(table)) - - c.write(backend, table, [df]) - sleep(1) # Let db flush - rdf = c.read(backend, table=table) - assert isinstance(rdf, cudf.DataFrame), 'not a cudf.DataFrame' - assert len(rdf) == len(df), 'wrong frame size' - assert set(rdf.columns) == set(df.columns), 'columns mismatch' - - @pytest.mark.skipif(not has_cudf, reason='cudf not found') def test_concat_categorical(): df1 = cudf.DataFrame({'a': range(10, 13), 'b': range(50, 53)}) diff --git a/clients/py/tests/test_pip_docker.py b/clients/py/tests/test_pip_docker.py deleted file mode 100644 index cfec2d51..00000000 --- a/clients/py/tests/test_pip_docker.py +++ /dev/null @@ -1,93 +0,0 @@ -# Copyright 2018 Iguazio -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -from contextlib import contextmanager -from shutil import copy -from subprocess import PIPE, run -from sys import executable -from tempfile import mkdtemp - -import pytest - -from conftest import here, is_travis - -config = ''' -log: - level: debug - -backends: - - type: "csv" - rootDir: "/csv-root" -''' - -docker_image = 'quay.io/v3io/frames:unstable' - - -# docker run \ -# -v /path/to/config.yaml:/etc/framesd.yaml \ -# quay.io/v3io/frames - - -def docker_ports(cid): - out = run(['docker', 'inspect', cid], stdout=PIPE, check=True) - obj = json.loads(out.stdout.decode('utf-8'))[0] - grpc_port = obj['NetworkSettings']['Ports']['8081/tcp'][0]['HostPort'] - http_port = obj['NetworkSettings']['Ports']['8080/tcp'][0]['HostPort'] - - return grpc_port, http_port - - -@contextmanager -def docker(tmp, cfg_file): - cmd = [ - 'docker', 'run', - '-d', - '-v', '{}:/csv-root'.format(tmp), - '-v', '{}:/etc/framesd.yaml'.format(cfg_file), - '-p', '8080', - '-p', '8081', - docker_image, - ] - proc = run(cmd, stdout=PIPE, check=True) - cid = proc.stdout.decode('utf-8').strip() - grpc_port, http_port = docker_ports(cid) - try: - yield grpc_port, http_port - finally: - run(['docker', 'rm', '-f', cid]) - - -@pytest.mark.skipif(not is_travis, reason='integration test') -def test_pip_docker(): - tmp = mkdtemp() - run(['virtualenv', '-p', executable, tmp], check=True) - python = '{}/bin/python'.format(tmp) - # Run in different directoy so local v3io_frames won't be used - run([python, '-m', 'pip', 'install', 'v3io_frames'], check=True, cwd=tmp) - run(['docker', 'pull', docker_image], check=True) - - cfg_file = '{}/framesd.yaml'.format(tmp) - with open(cfg_file, 'w') as out: - out.write(config) - - copy('{}/weather.csv'.format(here), tmp) - with docker(tmp, cfg_file) as (grpc_port, http_port): - cmd = [ - python, '{}/pip_docker.py'.format(here), - '--grpc-port', grpc_port, - '--http-port', http_port, - ] - # Run in different directoy so local v3io_frames won't be used - run(cmd, check=True, cwd=tmp) diff --git a/clients/py/v3io_frames/client.py b/clients/py/v3io_frames/client.py index 351b9e5b..05159170 100644 --- a/clients/py/v3io_frames/client.py +++ b/clients/py/v3io_frames/client.py @@ -198,7 +198,7 @@ def read(self, backend, table='', query='', columns=None, filter='', Common Parameters ---------- backend (Required) : str - Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests) + Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' table : str Path to the collection to query; ignored when the path is set in the `query` parameter (currently supported for the 'tsdb' backend) @@ -261,7 +261,7 @@ def write(self, backend, table, dfs, expression='', condition='', Parameters ---------- backend (Required) : str - Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests) + Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' table (Required) : str Path to the collection to write dfs (Required) : a single DataFrame (DF), a DF list, or a DF iterator @@ -327,11 +327,11 @@ def create(self, backend, table, schema=None, if_exists=FAIL, **kw): Parameters ---------- backend (Required) : str - Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests) + Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' table (Required) : str Table to create - schema (Optional) : Backend-specific data schema or None - Table schema; used for testing purposes with the 'csv' backend + schema (defunct) : Backend-specific data schema or None + Table schema; formerly used for testing purposes with the 'csv' backend if_exists (Optional) : int (frames_pb2 pb.ErrorOptions) Determines the behavior when the specified collection already exists - `FAIL` (default) to raise an error or `IGNORE` to ignore @@ -353,7 +353,7 @@ def delete(self, backend, table, filter='', start='', end='', Parameters ---------- backend (Required) : str - Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests) + Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' table (Required) : str Path to the collection to delete or from which to delete items filter (Optional) : str @@ -396,7 +396,7 @@ def execute(self, backend, table, command='', args=None): Parameters ---------- backend (Required) : str - Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests) + Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' table (Required) : str Path to the collection on which to execute the specified command command (Required) : str @@ -425,7 +425,7 @@ def history(self, backend='', container='', table='', user='', action='', min_st Parameters ---------- backend (Optional) : str - Filter by Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests) + Filter by Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' container (Optional) : str Filter by associated v3io container table (Optional) : str diff --git a/config.go b/config.go index 4b0022d3..6eba3611 100644 --- a/config.go +++ b/config.go @@ -134,7 +134,7 @@ func (c *Config) Validate() error { // BackendConfig is default backend configuration type BackendConfig struct { - Type string `json:"type"` // v3io, csv, ... + Type string `json:"type"` // v3io, ... Name string `json:"name"` Workers int `json:"workers"` UpdateWorkersPerVN int `json:"updateWorkersPerVN"` @@ -223,7 +223,7 @@ func initBackendDefaults(cfg *BackendConfig, framesConfig *Config) { if cfg.V3ioGoWorkers == 0 { switch cfg.Name { - case "csv", "stream": + case "stream": cfg.V3ioGoWorkers = 256 default: cfg.V3ioGoWorkers = 1024 diff --git a/config_example.yaml b/config_example.yaml index c40e3124..fe8634ae 100644 --- a/config_example.yaml +++ b/config_example.yaml @@ -11,5 +11,3 @@ backends: - type: "stream" - type: "tsdb" workers: 16 - - type: "csv" - rootdir: "/mnt/csvroot" diff --git a/grpc/end2end_test.go b/grpc/end2end_test.go index 114b05df..6c580028 100644 --- a/grpc/end2end_test.go +++ b/grpc/end2end_test.go @@ -23,7 +23,6 @@ package grpc_test import ( "fmt" "net" - "os" "reflect" "testing" "time" @@ -34,11 +33,6 @@ import ( ) func TestEnd2End(t *testing.T) { - tmpDir, err := os.MkdirTemp("", "frames-grpc-e2e") - if err != nil { - t.Fatal(err) - } - backendName := "e2e-backend" cfg := &frames.Config{ Log: frames.LogConfig{ @@ -46,9 +40,8 @@ func TestEnd2End(t *testing.T) { }, Backends: []*frames.BackendConfig{ { - Name: backendName, - Type: "csv", - RootDir: tmpDir, + Name: backendName, + Type: "kv", }, }, } @@ -81,8 +74,9 @@ func TestEnd2End(t *testing.T) { tableName := "e2e" writeReq := &frames.WriteRequest{ - Backend: backendName, - Table: tableName, + Backend: backendName, + Table: tableName, + SaveMode: frames.OverwriteTable, } appender, err := client.Write(writeReq) @@ -126,17 +120,6 @@ func TestEnd2End(t *testing.T) { if nRows != frame.Len() { t.Fatalf("# of rows mismatch - %d != %d", nRows, frame.Len()) } - - // Exec - execReq := &pb.ExecRequest{ - Backend: backendName, - Table: tableName, - Command: "ping", - } - - if _, err := client.Exec(execReq); err != nil { - t.Fatalf("can't exec - %s", err) - } } func makeFrame() (frames.Frame, error) { diff --git a/http/end2end_test.go b/http/end2end_test.go index 084fa5f1..2380bff3 100644 --- a/http/end2end_test.go +++ b/http/end2end_test.go @@ -25,7 +25,6 @@ import ( "fmt" "net" nhttp "net/http" - "os" "reflect" "testing" "time" @@ -36,11 +35,6 @@ import ( ) func TestEnd2End(t *testing.T) { - tmpDir, err := os.MkdirTemp("", "frames-e2e") - if err != nil { - t.Fatal(err) - } - backendName := "e2e-backend" cfg := &frames.Config{ Log: frames.LogConfig{ @@ -48,9 +42,8 @@ func TestEnd2End(t *testing.T) { }, Backends: []*frames.BackendConfig{ { - Name: backendName, - Type: "csv", - RootDir: tmpDir, + Name: backendName, + Type: "kv", }, }, } @@ -84,8 +77,9 @@ func TestEnd2End(t *testing.T) { tableName := "e2e" writeReq := &frames.WriteRequest{ - Backend: backendName, - Table: tableName, + Backend: backendName, + Table: tableName, + SaveMode: frames.OverwriteTable, } appender, err := client.Write(writeReq) @@ -131,17 +125,6 @@ func TestEnd2End(t *testing.T) { } testGrafana(t, url, backendName, tableName) - - // Exec - execReq := &pb.ExecRequest{ - Backend: backendName, - Table: tableName, - Command: "ping", - } - - if _, err := client.Exec(execReq); err != nil { - t.Fatalf("can't exec - %s", err) - } } func testGrafana(t *testing.T, baseURL string, backend string, table string) { diff --git a/http/server_example_test.go b/http/server_example_test.go index 2f4665b1..37509063 100644 --- a/http/server_example_test.go +++ b/http/server_example_test.go @@ -34,8 +34,6 @@ log: backends: - type: "kv" - - type: "csv" - rootDir = "/tmp" `) func ExampleServer() { diff --git a/http/server_test.go b/http/server_test.go index ecb7fb14..464ddb5c 100644 --- a/http/server_test.go +++ b/http/server_test.go @@ -38,7 +38,7 @@ func createServer() (*Server, error) { Backends: []*frames.BackendConfig{ { Name: "weather", - Type: "csv", + Type: "kv", }, }, } diff --git a/integration_test.go b/integration_test.go index 73dbf8c4..bdec7df5 100644 --- a/integration_test.go +++ b/integration_test.go @@ -106,12 +106,7 @@ func sessionInfo(t testing.TB) *frames.Session { } func genConfig(root string, session *frames.Session) *frames.Config { - backends := []*frames.BackendConfig{ - { - Type: "csv", - RootDir: root, - }, - } + var backends []*frames.BackendConfig if session != nil { backends = append(backends, &frames.BackendConfig{ diff --git a/test/basic_suite_integration_test.go b/test/basic_suite_integration_test.go index 9ac973e2..684df6dd 100644 --- a/test/basic_suite_integration_test.go +++ b/test/basic_suite_integration_test.go @@ -32,7 +32,6 @@ var ( kvSuites = []SuiteCreateFunc{GetKvTestsConstructorFunc()} tsdbSuites = []SuiteCreateFunc{GetTsdbTestsConstructorFunc()} streamSuites = []SuiteCreateFunc{GetStreamTestsConstructorFunc()} - csvSuites = []SuiteCreateFunc{GetCsvTestsConstructorFunc()} ) type testInfo struct { @@ -96,13 +95,6 @@ func (mainSuite *mainTestSuite) TestStreamBackend() { mainSuite.runSubSuites(streamSuites) } -func (mainSuite *mainTestSuite) TestCSVBackend() { - if !strings.Contains(mainSuite.info.backendsToTest, "csv") { - mainSuite.T().Skip("skipping csv backend tests") - } - mainSuite.runSubSuites(csvSuites) -} - func (mainSuite *mainTestSuite) runSubSuites(suites []SuiteCreateFunc) { for _, currSuite := range suites { // Run both Grpc and Http tests @@ -193,12 +185,7 @@ func sessionInfo(t testing.TB) *frames.Session { } func generateConfig(root string, session *frames.Session) *frames.Config { - backends := []*frames.BackendConfig{ - { - Type: "csv", - RootDir: root, - }, - } + var backends []*frames.BackendConfig if session != nil { backends = append(backends, &frames.BackendConfig{ @@ -297,7 +284,7 @@ func setupTest(t testing.TB, internalLogger logger.Logger) *testInfo { info.debugMode = strings.ToLower(os.Getenv("DEBUG")) == "true" info.backendsToTest = os.Getenv("TEST_BACKENDS") if info.backendsToTest == "" { - info.backendsToTest = "kv,tsdb,stream,csv" + info.backendsToTest = "kv,tsdb,stream" } info.root = setupRoot(t) diff --git a/test/csv_integration_test.go b/test/csv_integration_test.go deleted file mode 100644 index d3627e0a..00000000 --- a/test/csv_integration_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package test - -import ( - "fmt" - "math/rand" - "testing" - "time" - - "github.com/nuclio/logger" - "github.com/stretchr/testify/suite" - "github.com/v3io/frames" - "github.com/v3io/frames/pb" - v3io "github.com/v3io/v3io-go/pkg/dataplane" -) - -var ( - random = rand.New(rand.NewSource(time.Now().Unix())) - size = 200 -) - -type CsvTestSuite struct { - suite.Suite - tablePath string - suiteTimestamp int64 - basicQueryTime int64 - client frames.Client - backendName string -} - -func GetCsvTestsConstructorFunc() SuiteCreateFunc { - return func(client frames.Client, _ v3io.Container, _ logger.Logger) suite.TestingSuite { - return &CsvTestSuite{client: client, backendName: "csv"} - } -} - -func (csvSuite *CsvTestSuite) SetupSuite() { - csvSuite.Require().NotNil(csvSuite.client, "client not set") -} - -func (csvSuite *CsvTestSuite) generateSampleFrame(t testing.TB) frames.Frame { - var ( - columns []frames.Column - col frames.Column - err error - ) - - bools := make([]bool, size) - for i := range bools { - if random.Float64() < 0.5 { - bools[i] = true - } - } - col, err = frames.NewSliceColumn("bools", bools) - csvSuite.Require().NoError(err) - columns = append(columns, col) - - col = FloatCol(t, "floats", size) - columns = append(columns, col) - - ints := make([]int64, size) - for i := range ints { - ints[i] = random.Int63() - } - col, err = frames.NewSliceColumn("ints", ints) - csvSuite.Require().NoError(err) - columns = append(columns, col) - - col = StringCol(t, "strings", size) - columns = append(columns, col) - - times := make([]time.Time, size) - for i := range times { - times[i] = time.Now().Add(time.Duration(i) * time.Second) - } - col, err = frames.NewSliceColumn("times", times) - csvSuite.Require().NoError(err) - columns = append(columns, col) - - frame, err := frames.NewFrame(columns, nil, nil) - csvSuite.Require().NoError(err) - - return frame -} - -func (csvSuite *CsvTestSuite) TestAll() { - table := fmt.Sprintf("csv_test_all%d", time.Now().UnixNano()) - - csvSuite.T().Log("write") - frame := csvSuite.generateSampleFrame(csvSuite.T()) - wreq := &frames.WriteRequest{ - Backend: csvSuite.backendName, - Table: table, - } - - appender, err := csvSuite.client.Write(wreq) - csvSuite.Require().NoError(err) - - err = appender.Add(frame) - csvSuite.Require().NoError(err) - - err = appender.WaitForComplete(10 * time.Second) - csvSuite.Require().NoError(err) - - time.Sleep(3 * time.Second) // Let DB sync - - csvSuite.T().Log("read") - rreq := &pb.ReadRequest{ - Backend: csvSuite.backendName, - Table: table, - } - - it, err := csvSuite.client.Read(rreq) - csvSuite.Require().NoError(err) - - for it.Next() { - // TODO: More checks - fr := it.At() - csvSuite.Require().Contains([]int{fr.Len(), fr.Len() - 1}, frame.Len(), "wrong length") - } - - csvSuite.Require().NoError(it.Err()) - - csvSuite.T().Log("delete") - dreq := &pb.DeleteRequest{ - Backend: csvSuite.backendName, - Table: table, - } - - err = csvSuite.client.Delete(dreq) - csvSuite.Require().NoError(err) -}