diff --git a/README.md b/README.md
index 60866876..b87df031 100644
--- a/README.md
+++ b/README.md
@@ -63,8 +63,6 @@ Frames currently supports the following backend types:
- `tsdb` — a time-series database (TSDB).
-- `csv` — a comma-separated-value (CSV) file.
- This backend type is used only for testing purposes.
#### `Client` Methods
@@ -185,7 +183,7 @@ All client methods receive the following common parameters; additional, method-s
- **Type:** `str`
- **Requirement:** Required
- - **Valid Values:** `"nosql"` | `"stream"` | `"tsdb"` | `"csv"` (for testing)
+ - **Valid Values:** `"nosql"` | `"stream"` | `"tsdb"`
- **table** — The relative path to a data collection of the specified backend type in the target data container (as configured for the client object).
For example, `"mytable"` or `"/examples/tsdb/my_metrics"`.
diff --git a/_scripts/py_benchmark.py b/_scripts/py_benchmark.py
index dd048084..fc3bf584 100755
--- a/_scripts/py_benchmark.py
+++ b/_scripts/py_benchmark.py
@@ -40,7 +40,6 @@
data = json.load(fp)
for bench in data['benchmarks']:
- # test_read[http-csv]
match = re.match(r'test_(\w+)\[([a-z]+)', bench['name'])
if not match:
raise SystemExit('error: bad test name - {}'.format(bench['name']))
diff --git a/api/api.go b/api/api.go
index 600b397b..5af6d0c3 100644
--- a/api/api.go
+++ b/api/api.go
@@ -33,7 +33,6 @@ import (
"github.com/v3io/frames"
"github.com/v3io/frames/backends"
// Load backends (make sure they register)
- _ "github.com/v3io/frames/backends/csv"
_ "github.com/v3io/frames/backends/kv"
_ "github.com/v3io/frames/backends/stream"
_ "github.com/v3io/frames/backends/tsdb"
diff --git a/backends/csv/backend.go b/backends/csv/backend.go
deleted file mode 100644
index 5d66b73a..00000000
--- a/backends/csv/backend.go
+++ /dev/null
@@ -1,508 +0,0 @@
-/*
-Copyright 2018 Iguazio Systems Ltd.
-
-Licensed under the Apache License, Version 2.0 (the "License") with
-an addition restriction as set forth herein. You may not use this
-file except in compliance with the License. You may obtain a copy of
-the License at http://www.apache.org/licenses/LICENSE-2.0.
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-implied. See the License for the specific language governing
-permissions and limitations under the License.
-
-In addition, you may not use the software for any purposes that are
-illegal under applicable law, and the grant of the foregoing license
-under the Apache 2.0 license is conditioned upon your compliance with
-such restriction.
-*/
-
-package csv
-
-import (
- "encoding/csv"
- "fmt"
- "io"
- "os"
- "strconv"
- "strings"
- "time"
-
- "github.com/nuclio/logger"
- "github.com/pkg/errors"
- "github.com/v3io/frames"
- "github.com/v3io/frames/backends"
- "github.com/v3io/frames/backends/utils"
- v3io "github.com/v3io/v3io-go/pkg/dataplane"
-)
-
-// Backend is CSV backend
-type Backend struct {
- rootDir string
- logger logger.Logger
-}
-
-// NewBackend returns a new CSV backend
-func NewBackend(logger logger.Logger, v3ioContext v3io.Context, config *frames.BackendConfig, framesConfig *frames.Config) (frames.DataBackend, error) {
- backend := &Backend{
- rootDir: config.RootDir,
- logger: logger.GetChild("csv"),
- }
-
- return backend, nil
-}
-
-// Create creates a CSV file
-func (b *Backend) Create(request *frames.CreateRequest) error {
- csvPath := b.csvPath(request.Proto.Table)
- // TODO: Overwrite?
- if fileExists(csvPath) {
- return fmt.Errorf("file '%q' already exists", request.Proto.Table)
- }
-
- file, err := os.Create(csvPath)
- if err != nil {
- return errors.Wrapf(err, "cannot create file")
- }
-
- defer file.Close()
- if request.Proto.Schema == nil || len(request.Proto.Schema.Fields) == 0 {
- return nil
- }
-
- numFields := len(request.Proto.Schema.Fields)
- names := make([]string, numFields)
- for i, field := range request.Proto.Schema.Fields {
- if field.Name == "" {
- return fmt.Errorf("field %d with no name", i)
- }
-
- names[i] = field.Name
- }
-
- csvWriter := csv.NewWriter(file)
- if err := csvWriter.Write(names); err != nil {
- return errors.Wrapf(err, "cannot create header")
- }
-
- csvWriter.Flush()
- if err := file.Sync(); err != nil {
- return errors.Wrap(err, "cannot flush CSV file")
- }
-
- return nil
-}
-
-// Delete will delete a table
-func (b *Backend) Delete(request *frames.DeleteRequest) error {
-
- err := backends.ValidateRequest("csv", request.Proto, nil)
- if err != nil {
- return err
- }
-
- csvPath := b.csvPath(request.Proto.Table)
- if request.Proto.IfMissing == frames.FailOnError && !fileExists(csvPath) {
- return fmt.Errorf("path to file '%q' doesn't exist", request.Proto.Table)
- }
-
- if err := os.Remove(csvPath); err != nil {
- return errors.Wrapf(err, "cannot delete file '%q'", request.Proto.Table)
- }
-
- return nil
-}
-
-// Read handles reading
-func (b *Backend) Read(request *frames.ReadRequest) (frames.FrameIterator, error) {
-
- err := backends.ValidateRequest("csv", request.Proto, nil)
- if err != nil {
- return nil, err
- }
-
- file, err := os.Open(b.csvPath(request.Proto.Table))
- if err != nil {
- return nil, err
- }
-
- reader := csv.NewReader(file)
- columns, err := reader.Read()
- if err != nil {
- return nil, errors.Wrap(err, "cannot read header (columns)")
- }
-
- it := &FrameIterator{
- logger: b.logger,
- path: request.Proto.Table,
- reader: reader,
- columnNames: columns,
- limit: int(request.Proto.Limit),
- frameLimit: int(request.Proto.MessageLimit),
- }
-
- return it, nil
-}
-
-// Write handles writing
-func (b *Backend) Write(request *frames.WriteRequest) (frames.FrameAppender, error) {
- // TODO: Append?
- file, err := os.Create(b.csvPath(request.Table))
- if err != nil {
- return nil, err
- }
-
- ca := &csvAppender{
- writer: file,
- csvWriter: csv.NewWriter(file),
- logger: b.logger,
- }
-
- if request.ImmidiateData != nil {
- if err := ca.Add(request.ImmidiateData); err != nil {
- return nil, errors.Wrap(err, "cannot add immediate data")
- }
- }
-
- return ca, nil
-
-}
-
-func getInt(r *frames.ExecRequest, name string, defval int) int {
- ival, err := r.Proto.Arg(name)
- if err != nil {
- return defval
- }
-
- val, ok := ival.(int64)
- if !ok {
- return defval
- }
-
- return int(val)
-}
-
-// Exec executes a command
-func (b *Backend) Exec(request *frames.ExecRequest) (frames.Frame, error) {
- if strings.ToLower(request.Proto.Command) == "ping" {
- b.logger.Info("PONG")
- nRows, nCols := getInt(request, "rows", 37), getInt(request, "cols", 4)
- cols := make([]frames.Column, nCols)
- for c := 0; c < nCols; c++ {
- name := fmt.Sprintf("col-%d", c)
- bld := frames.NewSliceColumnBuilder(name, frames.IntType, nRows)
- for r := 0; r < nRows; r++ {
- if err := bld.Set(r, r*c); err != nil {
- b.logger.WarnWith("cannot set column value", "name", name, "row", r)
- }
- }
- cols[c] = bld.Finish()
- }
- return frames.NewFrame(cols, nil, nil)
- }
-
- return nil, fmt.Errorf("CSV backend doesn't support execute command '%q'", request.Proto.Command)
-}
-
-func (b *Backend) csvPath(table string) string {
- return fmt.Sprintf("%s/%s", b.rootDir, table)
-}
-
-// FrameIterator iterates over CSV
-type FrameIterator struct {
- logger logger.Logger
- path string
- reader *csv.Reader
- frame frames.Frame
- err error
- columnNames []string
- nRows int
- limit int
- frameLimit int
-}
-
-// Next reads the next frame, return true of succeeded
-func (it *FrameIterator) Next() bool {
- rows, err := it.readNextRows()
- if err != nil {
- it.logger.ErrorWith("cannot read rows", "error", err)
- it.err = err
- return false
- }
-
- if len(rows) == 0 {
- return false
- }
-
- it.frame, err = it.buildFrame(rows)
- if err != nil {
- it.logger.ErrorWith("cannot build a DataFrames iterator", "error", err)
- it.err = err
- return false
- }
-
- return true
-}
-
-// At return the current Frame
-func (it *FrameIterator) At() frames.Frame {
- return it.frame
-}
-
-// Err returns the last error
-func (it *FrameIterator) Err() error {
- return it.err
-}
-
-func (it *FrameIterator) readNextRows() ([][]string, error) {
- var rows [][]string
- for r := 0; it.inLimits(r); r, it.nRows = r+1, it.nRows+1 {
- row, err := it.reader.Read()
- if err != nil {
- if err == io.EOF {
- it.logger.DebugWith("EOF", "numRows", it.nRows)
- return rows, nil
- }
-
- return nil, err
- }
-
- if len(row) != len(it.columnNames) {
- err := fmt.Errorf("%s (row %d) number of columns doesn't match headers (%d != %d)", it.path, it.nRows, len(row), len(it.columnNames))
- it.logger.ErrorWith("row size mismatch", "error", err, "row", it.nRows)
- return nil, err
- }
-
- rows = append(rows, row)
- }
-
- return rows, nil
-}
-
-func (it *FrameIterator) inLimits(frameRow int) bool {
- if it.limit > 0 && it.nRows >= it.limit {
- return false
- }
-
- if it.frameLimit > 0 && frameRow >= it.frameLimit {
- return false
- }
-
- return true
-}
-
-func (it *FrameIterator) buildFrame(rows [][]string) (frames.Frame, error) {
- columns := make([]frames.Column, len(it.columnNames))
- for c, colName := range it.columnNames {
- var (
- val0 = it.parseValue(rows[0][c])
- col frames.Column
- data interface{}
- err error
- )
-
- switch val0 := val0.(type) {
- case int64:
- typedData := make([]int64, len(rows))
- typedData[0] = val0
- for r, row := range rows[1:] {
- val, ok := it.parseValue(row[c]).(int64)
- if !ok {
- err := fmt.Errorf("type mismatch in row %d, col %d", it.nRows, c)
- it.logger.ErrorWith("type mismatch", "error", err)
- return nil, err
- }
-
- typedData[r+1] = val // +1 since we start in first row
- }
- data = typedData
- case float64:
- typedData := make([]float64, len(rows))
- typedData[0] = val0
- for r, row := range rows[1:] {
- val, ok := it.parseValue(row[c]).(float64)
- if !ok {
- err := fmt.Errorf("type mismatch in row %d, col %d", it.nRows, c)
- it.logger.ErrorWith("type mismatch", "error", err)
- return nil, err
- }
-
- typedData[r+1] = val // +1 since we start in first row
- }
- data = typedData
- case string:
- typedData := make([]string, len(rows))
- typedData[0] = val0
- for r, row := range rows[1:] {
- typedData[r+1] = row[c] // +1 since we start in first row
- }
- data = typedData
- case time.Time:
- typedData := make([]time.Time, len(rows))
- typedData[0] = val0
- for r, row := range rows[1:] {
- val, ok := it.parseValue(row[c]).(time.Time)
- if !ok {
- err := fmt.Errorf("type mismatch in row %d, col %d", it.nRows, c)
- it.logger.ErrorWith("type mismatch", "error", err)
- return nil, err
- }
-
- typedData[r+1] = val // +1 since we start in first row
- }
- data = typedData
- case bool:
- typedData := make([]bool, len(rows))
- typedData[0] = val0
- for r, row := range rows[1:] {
- val, ok := it.parseValue(row[c]).(bool)
- if !ok {
- err := fmt.Errorf("type mismatch in row %d, col %d", it.nRows, c)
- it.logger.ErrorWith("type mismatch", "error", err)
- return nil, err
- }
-
- typedData[r+1] = val // +1 since we start in first row
- }
- data = typedData
- default:
- return nil, fmt.Errorf("%s - unknown type '%T'", colName, val0)
- }
-
- col, err = frames.NewSliceColumn(colName, data)
- if err != nil {
- it.logger.ErrorWith("cannot build column", "error", err, "column", colName)
- return nil, errors.Wrapf(err, "cannot build column '%s'", colName)
- }
-
- columns[c] = col
- }
-
- return frames.NewFrame(columns, nil, nil)
-}
-
-func (it *FrameIterator) parseValue(value string) interface{} {
- // time/date formats
- timeFormats := []string{time.RFC3339, time.RFC3339Nano, "2006-01-02"}
- for _, format := range timeFormats {
- t, err := time.Parse(format, value)
- if err == nil {
- return t
- }
- }
-
- // bool
- switch strings.ToLower(value) {
- case "true":
- return true
- case "false":
- return false
- }
-
- // int
- i, err := strconv.Atoi(value)
- if err == nil {
- return int64(i)
- }
-
- // float
- f, err := strconv.ParseFloat(value, 64)
- if err == nil {
- return f
- }
-
- // Leave as string
- return value
-}
-
-type csvAppender struct {
- logger logger.Logger
- writer io.Writer
- csvWriter *csv.Writer
- headerWritten bool
- closed bool
-}
-
-func (ca *csvAppender) Add(frame frames.Frame) error {
- if ca.closed {
- err := errors.New("Adding on a closed csv appender")
- ca.logger.Error(err)
- return err
- }
- ca.logger.InfoWith("adding frame", "size", frame.Len())
- names := frame.Names()
- if !ca.headerWritten {
- if err := ca.csvWriter.Write(names); err != nil {
- ca.logger.ErrorWith("cannot write header", "error", err)
- return errors.Wrap(err, "cannot write header")
- }
- ca.headerWritten = true
- }
-
- for r := 0; r < frame.Len(); r++ {
- record := make([]string, len(names))
- for c, name := range names {
- col, err := frame.Column(name)
- if err != nil {
- ca.logger.ErrorWith("cannot get column", "error", err)
- return errors.Wrap(err, "cannot get column")
- }
-
- val, err := utils.ColAt(col, r)
- if err != nil {
- ca.logger.ErrorWith("cannot get value", "error", err, "name", name, "row", r)
- return errors.Wrapf(err, "%s:%d cannot get value", name, r)
- }
-
- record[c] = fmt.Sprintf("%v", val)
- }
-
- if err := ca.csvWriter.Write(record); err != nil {
- ca.logger.ErrorWith("cannot write record", "error", err)
- return errors.Wrap(err, "cannot write record")
- }
- }
-
- return nil
-}
-
-// File Sync
-type syncer interface {
- Sync() error
-}
-
-// WaitForComplete wait for write completion
-func (ca *csvAppender) WaitForComplete(timeout time.Duration) error {
- if ca.closed {
- err := errors.New("Adding on a closed csv appender")
- ca.logger.Error(err)
- return err
- }
- ca.csvWriter.Flush()
- if err := ca.csvWriter.Error(); err != nil {
- ca.logger.ErrorWith("CSV flush", "error", err)
- return err
- }
-
- if s, ok := ca.writer.(syncer); ok {
- return s.Sync()
- }
-
- return nil
-}
-
-func (ca *csvAppender) Close() {
- ca.closed = true
-}
-
-func fileExists(path string) bool {
- _, err := os.Stat(path)
- return err == nil
-}
-
-func init() {
- if err := backends.Register("csv", NewBackend); err != nil {
- panic(err)
- }
-}
diff --git a/backends/csv/backend_test.go b/backends/csv/backend_test.go
deleted file mode 100644
index 4b414bdc..00000000
--- a/backends/csv/backend_test.go
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
-Copyright 2018 Iguazio Systems Ltd.
-
-Licensed under the Apache License, Version 2.0 (the "License") with
-an addition restriction as set forth herein. You may not use this
-file except in compliance with the License. You may obtain a copy of
-the License at http://www.apache.org/licenses/LICENSE-2.0.
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-implied. See the License for the specific language governing
-permissions and limitations under the License.
-
-In addition, you may not use the software for any purposes that are
-illegal under applicable law, and the grant of the foregoing license
-under the Apache 2.0 license is conditioned upon your compliance with
-such restriction.
-*/
-
-package csv
-
-import (
- "os"
- "path"
- "testing"
-
- "github.com/v3io/frames"
- "github.com/v3io/frames/pb"
-)
-
-var (
- csvData = []byte(`STATION,DATE,PRCP,SNWD,SNOW,TMAX,TMIN,AWND,WDF2,WDF5,WSF2,WSF5,PGTM,FMTM
-GHCND:USW00094728,2000-01-01,0,-9999,0,100,11,26,250,230,72,94,1337,1337.7
-GHCND:USW00094728,2000-01-02,0,-9999,0,156,61,21,260,260,72,112,2313,2314.7
-GHCND:USW00094728,2000-01-03,0,-9999,0,178,106,30,260,250,67,94,320,321.7
-GHCND:USW00094728,2000-01-04,178,-9999,0,156,78,35,320,350,67,107,1819,1840.7
-GHCND:USW00094728,2000-01-05,0,-9999,0,83,-17,51,330,340,107,143,843,844.7
-GHCND:USW00094728,2000-01-06,0,-9999,0,56,-22,30,220,250,67,98,1833,1834.7
-GHCND:USW00094728,2000-01-07,0,-9999,0,94,17,42,300,310,103,156,1521,1601.7
-GHCND:USW00094728,2000-01-09,5,-9999,0,106,28,26,270,270,63,89,22,601.7
-GHCND:USW00094728,2000-01-10,213,-9999,0,144,67,41,280,260,94,139,1736,1758.7
-GHCND:USW00094728,2000-01-11,0,-9999,0,111,44,49,300,310,112,174,1203,1203.7
-GHCND:USW00094728,2000-01-12,0,-9999,0,83,39,39,330,330,94,161,536,610.7
-GHCND:USW00094728,2000-01-13,13,-9999,0,39,-78,51,90,10,103,143,1539,843.7
-`)
- numCSVRows = 12
- numCSVCols = 14
- colTypes = map[string]frames.DType{
- "STATION": frames.StringType,
- "DATE": frames.TimeType,
- "PRCP": frames.IntType,
- "SNWD": frames.IntType,
- "FMTM": frames.FloatType,
- }
-)
-
-func TestCSV(t *testing.T) {
- req := &frames.ReadRequest{Proto: &pb.ReadRequest{}}
- result := loadTempCSV(t, req)
-
- nRows := totalRows(result)
- if nRows != numCSVRows {
- t.Fatalf("# rows mismatch %d != %d", nRows, numCSVRows)
- }
-
- for _, frame := range result {
- if len(frame.Names()) != numCSVCols {
- t.Fatalf("# columns mismatch %d != %d", len(frame.Names()), numCSVRows)
- }
-
- for name, dtype := range colTypes {
- col, err := frame.Column(name)
- if err != nil {
- t.Fatalf("can't find column %q", name)
- }
-
- if col.DType() != dtype {
- t.Fatalf("dype mismatch %d != %d", dtype, col.DType())
- }
- }
- }
-
-}
-
-func TestLimit(t *testing.T) {
- limit := numCSVRows - 3
-
- req := &frames.ReadRequest{Proto: &pb.ReadRequest{}}
- req.Proto.Limit = int64(limit)
-
- result := loadTempCSV(t, req)
- if nRows := totalRows(result); nRows != limit {
- t.Fatalf("got %d rows, expected %d", nRows, limit)
- }
-}
-
-func TestMaxInMessage(t *testing.T) {
- frameLimit := numCSVRows / 3
-
- req := &frames.ReadRequest{Proto: &pb.ReadRequest{}}
- req.Proto.MessageLimit = int64(frameLimit)
-
- result := loadTempCSV(t, req)
- if nRows := totalRows(result); nRows != numCSVRows {
- t.Fatalf("got %d rows, expected %d", nRows, numCSVRows)
- }
-
- for _, frame := range result {
- if frame.Len() > frameLimit {
- t.Fatalf("frame too big (%d > %d)", frame.Len(), frameLimit)
- }
- }
-}
-
-func totalRows(result []frames.Frame) int {
- total := 0
- for _, frame := range result {
- total += frame.Len()
- }
-
- return total
-}
-
-func loadTempCSV(t *testing.T, req *frames.ReadRequest) []frames.Frame {
- logger, err := frames.NewLogger("debug")
- if err != nil {
- t.Fatalf("can't create logger - %s", err)
- }
-
- csvPath, err := tmpCSV()
- if err != nil {
- t.Fatal(err)
- }
-
- cfg := &frames.BackendConfig{
- Name: "testCsv",
- Type: "csv",
- RootDir: path.Dir(csvPath),
- }
-
- backend, err := NewBackend(logger, nil, cfg, nil)
- if err != nil {
- t.Fatal(err)
- }
-
- req.Proto.Table = path.Base(csvPath)
- it, err := backend.Read(req)
- if err != nil {
- t.Fatal(err)
- }
-
- var result []frames.Frame
- for it.Next() {
- result = append(result, it.At())
- }
-
- if err := it.Err(); err != nil {
- t.Fatal(err)
- }
-
- return result
-}
-
-func tmpCSV() (string, error) {
- tmp, err := os.CreateTemp("", "csv-test")
- if err != nil {
- return "", err
- }
-
- _, err = tmp.Write(csvData)
- if err != nil {
- return "", nil
- }
-
- if err := tmp.Sync(); err != nil {
- return "", err
- }
-
- return tmp.Name(), nil
-}
diff --git a/clients/py/tests/conftest.py b/clients/py/tests/conftest.py
index 10cc649a..1ff573e5 100644
--- a/clients/py/tests/conftest.py
+++ b/clients/py/tests/conftest.py
@@ -45,7 +45,7 @@
{'type': 'tsdb', 'workers': 16},
]
-test_backends = ['csv']
+test_backends = []
if has_session:
test_backends.extend(backend['type'] for backend in extra_backends)
@@ -101,12 +101,7 @@ def framesd():
'log': {
'level': 'debug',
},
- 'backends': [
- {
- 'type': 'csv',
- 'rootDir': root_dir,
- },
- ]
+ 'backends': []
}
if has_session:
diff --git a/clients/py/tests/pip_docker.py b/clients/py/tests/pip_docker.py
deleted file mode 100644
index 2b477481..00000000
--- a/clients/py/tests/pip_docker.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Copyright 2018 Iguazio
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Code used by test_pip_docker
-
-from argparse import ArgumentParser
-
-import v3io_frames as v3f
-
-parser = ArgumentParser()
-parser.add_argument('--grpc-port', default='8081')
-parser.add_argument('--http-port', default='8080')
-args = parser.parse_args()
-
-client = v3f.Client('localhost:{}'.format(args.grpc_port))
-df = client.read('csv', table='weather.csv')
-assert len(df) > 0, 'empty df'
diff --git a/clients/py/tests/test_benchmark.py b/clients/py/tests/test_benchmark.py
deleted file mode 100644
index 565c93d4..00000000
--- a/clients/py/tests/test_benchmark.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import pytest
-
-import v3io_frames as v3f
-from conftest import has_go
-
-from test_integration import integ_params, csv_df
-
-wdf = csv_df(1982)
-
-
-def read_benchmark(client):
- for df in client.read('csv', 'weather.csv'):
- assert len(df), 'empty df'
-
-
-def write_benchmark(client, df):
- client.write('csv', 'write-bench.csv', df)
-
-
-@pytest.mark.skipif(not has_go, reason='Go SDK not found')
-@pytest.mark.parametrize('protocol,backend', integ_params)
-def test_read(benchmark, framesd, protocol, backend):
- addr = getattr(framesd, '{}_addr'.format(protocol))
- client = v3f.Client(addr)
- benchmark(read_benchmark, client)
-
-
-@pytest.mark.skipif(not has_go, reason='Go SDK not found')
-@pytest.mark.parametrize('protocol,backend', integ_params)
-def test_write(benchmark, framesd, protocol, backend):
- addr = getattr(framesd, '{}_addr'.format(protocol))
- client = v3f.Client(addr)
- benchmark(write_benchmark, client, wdf)
diff --git a/clients/py/tests/test_cudf.py b/clients/py/tests/test_cudf.py
index 1a79e689..f1b53318 100644
--- a/clients/py/tests/test_cudf.py
+++ b/clients/py/tests/test_cudf.py
@@ -12,12 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from time import sleep, time
-
import pandas as pd
import pytest
import v3io_frames as v3f
-from conftest import has_go
from conftest import test_backends
try:
@@ -28,27 +25,6 @@
has_cudf = False
-@pytest.mark.skipif(not has_cudf, reason='cudf not found')
-@pytest.mark.skipif(not has_go, reason='Go SDK not found')
-def test_cudf(framesd, session):
- df = cudf.DataFrame({
- 'a': [1, 2, 3],
- 'b': [1.1, 2.2, 3.3],
- })
-
- c = v3f.Client(framesd.grpc_addr, frame_factory=cudf.DataFrame)
- backend = 'csv'
- table = 'cudf-{}'.format(int(time()))
- print('table = {}'.format(table))
-
- c.write(backend, table, [df])
- sleep(1) # Let db flush
- rdf = c.read(backend, table=table)
- assert isinstance(rdf, cudf.DataFrame), 'not a cudf.DataFrame'
- assert len(rdf) == len(df), 'wrong frame size'
- assert set(rdf.columns) == set(df.columns), 'columns mismatch'
-
-
@pytest.mark.skipif(not has_cudf, reason='cudf not found')
def test_concat_categorical():
df1 = cudf.DataFrame({'a': range(10, 13), 'b': range(50, 53)})
diff --git a/clients/py/tests/test_pip_docker.py b/clients/py/tests/test_pip_docker.py
deleted file mode 100644
index cfec2d51..00000000
--- a/clients/py/tests/test_pip_docker.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Copyright 2018 Iguazio
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-from contextlib import contextmanager
-from shutil import copy
-from subprocess import PIPE, run
-from sys import executable
-from tempfile import mkdtemp
-
-import pytest
-
-from conftest import here, is_travis
-
-config = '''
-log:
- level: debug
-
-backends:
- - type: "csv"
- rootDir: "/csv-root"
-'''
-
-docker_image = 'quay.io/v3io/frames:unstable'
-
-
-# docker run \
-# -v /path/to/config.yaml:/etc/framesd.yaml \
-# quay.io/v3io/frames
-
-
-def docker_ports(cid):
- out = run(['docker', 'inspect', cid], stdout=PIPE, check=True)
- obj = json.loads(out.stdout.decode('utf-8'))[0]
- grpc_port = obj['NetworkSettings']['Ports']['8081/tcp'][0]['HostPort']
- http_port = obj['NetworkSettings']['Ports']['8080/tcp'][0]['HostPort']
-
- return grpc_port, http_port
-
-
-@contextmanager
-def docker(tmp, cfg_file):
- cmd = [
- 'docker', 'run',
- '-d',
- '-v', '{}:/csv-root'.format(tmp),
- '-v', '{}:/etc/framesd.yaml'.format(cfg_file),
- '-p', '8080',
- '-p', '8081',
- docker_image,
- ]
- proc = run(cmd, stdout=PIPE, check=True)
- cid = proc.stdout.decode('utf-8').strip()
- grpc_port, http_port = docker_ports(cid)
- try:
- yield grpc_port, http_port
- finally:
- run(['docker', 'rm', '-f', cid])
-
-
-@pytest.mark.skipif(not is_travis, reason='integration test')
-def test_pip_docker():
- tmp = mkdtemp()
- run(['virtualenv', '-p', executable, tmp], check=True)
- python = '{}/bin/python'.format(tmp)
- # Run in different directoy so local v3io_frames won't be used
- run([python, '-m', 'pip', 'install', 'v3io_frames'], check=True, cwd=tmp)
- run(['docker', 'pull', docker_image], check=True)
-
- cfg_file = '{}/framesd.yaml'.format(tmp)
- with open(cfg_file, 'w') as out:
- out.write(config)
-
- copy('{}/weather.csv'.format(here), tmp)
- with docker(tmp, cfg_file) as (grpc_port, http_port):
- cmd = [
- python, '{}/pip_docker.py'.format(here),
- '--grpc-port', grpc_port,
- '--http-port', http_port,
- ]
- # Run in different directoy so local v3io_frames won't be used
- run(cmd, check=True, cwd=tmp)
diff --git a/clients/py/v3io_frames/client.py b/clients/py/v3io_frames/client.py
index 351b9e5b..05159170 100644
--- a/clients/py/v3io_frames/client.py
+++ b/clients/py/v3io_frames/client.py
@@ -198,7 +198,7 @@ def read(self, backend, table='', query='', columns=None, filter='',
Common Parameters
----------
backend (Required) : str
- Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests)
+ Backend name - 'nosql'/'kv' | 'tsdb' | 'stream'
table : str
Path to the collection to query; ignored when the path is set in
the `query` parameter (currently supported for the 'tsdb' backend)
@@ -261,7 +261,7 @@ def write(self, backend, table, dfs, expression='', condition='',
Parameters
----------
backend (Required) : str
- Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests)
+ Backend name - 'nosql'/'kv' | 'tsdb' | 'stream'
table (Required) : str
Path to the collection to write
dfs (Required) : a single DataFrame (DF), a DF list, or a DF iterator
@@ -327,11 +327,11 @@ def create(self, backend, table, schema=None, if_exists=FAIL, **kw):
Parameters
----------
backend (Required) : str
- Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests)
+ Backend name - 'nosql'/'kv' | 'tsdb' | 'stream'
table (Required) : str
Table to create
- schema (Optional) : Backend-specific data schema or None
- Table schema; used for testing purposes with the 'csv' backend
+ schema (defunct) : Backend-specific data schema or None
+ Table schema; formerly used for testing purposes with the 'csv' backend
if_exists (Optional) : int (frames_pb2 pb.ErrorOptions)
Determines the behavior when the specified collection already
exists - `FAIL` (default) to raise an error or `IGNORE` to ignore
@@ -353,7 +353,7 @@ def delete(self, backend, table, filter='', start='', end='',
Parameters
----------
backend (Required) : str
- Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests)
+ Backend name - 'nosql'/'kv' | 'tsdb' | 'stream'
table (Required) : str
Path to the collection to delete or from which to delete items
filter (Optional) : str
@@ -396,7 +396,7 @@ def execute(self, backend, table, command='', args=None):
Parameters
----------
backend (Required) : str
- Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests)
+ Backend name - 'nosql'/'kv' | 'tsdb' | 'stream'
table (Required) : str
Path to the collection on which to execute the specified command
command (Required) : str
@@ -425,7 +425,7 @@ def history(self, backend='', container='', table='', user='', action='', min_st
Parameters
----------
backend (Optional) : str
- Filter by Backend name - 'nosql'/'kv' | 'tsdb' | 'stream' | 'csv' (for tests)
+ Filter by Backend name - 'nosql'/'kv' | 'tsdb' | 'stream'
container (Optional) : str
Filter by associated v3io container
table (Optional) : str
diff --git a/config.go b/config.go
index 4b0022d3..6eba3611 100644
--- a/config.go
+++ b/config.go
@@ -134,7 +134,7 @@ func (c *Config) Validate() error {
// BackendConfig is default backend configuration
type BackendConfig struct {
- Type string `json:"type"` // v3io, csv, ...
+ Type string `json:"type"` // v3io, ...
Name string `json:"name"`
Workers int `json:"workers"`
UpdateWorkersPerVN int `json:"updateWorkersPerVN"`
@@ -223,7 +223,7 @@ func initBackendDefaults(cfg *BackendConfig, framesConfig *Config) {
if cfg.V3ioGoWorkers == 0 {
switch cfg.Name {
- case "csv", "stream":
+ case "stream":
cfg.V3ioGoWorkers = 256
default:
cfg.V3ioGoWorkers = 1024
diff --git a/config_example.yaml b/config_example.yaml
index c40e3124..fe8634ae 100644
--- a/config_example.yaml
+++ b/config_example.yaml
@@ -11,5 +11,3 @@ backends:
- type: "stream"
- type: "tsdb"
workers: 16
- - type: "csv"
- rootdir: "/mnt/csvroot"
diff --git a/grpc/end2end_test.go b/grpc/end2end_test.go
index 114b05df..6c580028 100644
--- a/grpc/end2end_test.go
+++ b/grpc/end2end_test.go
@@ -23,7 +23,6 @@ package grpc_test
import (
"fmt"
"net"
- "os"
"reflect"
"testing"
"time"
@@ -34,11 +33,6 @@ import (
)
func TestEnd2End(t *testing.T) {
- tmpDir, err := os.MkdirTemp("", "frames-grpc-e2e")
- if err != nil {
- t.Fatal(err)
- }
-
backendName := "e2e-backend"
cfg := &frames.Config{
Log: frames.LogConfig{
@@ -46,9 +40,8 @@ func TestEnd2End(t *testing.T) {
},
Backends: []*frames.BackendConfig{
{
- Name: backendName,
- Type: "csv",
- RootDir: tmpDir,
+ Name: backendName,
+ Type: "kv",
},
},
}
@@ -81,8 +74,9 @@ func TestEnd2End(t *testing.T) {
tableName := "e2e"
writeReq := &frames.WriteRequest{
- Backend: backendName,
- Table: tableName,
+ Backend: backendName,
+ Table: tableName,
+ SaveMode: frames.OverwriteTable,
}
appender, err := client.Write(writeReq)
@@ -126,17 +120,6 @@ func TestEnd2End(t *testing.T) {
if nRows != frame.Len() {
t.Fatalf("# of rows mismatch - %d != %d", nRows, frame.Len())
}
-
- // Exec
- execReq := &pb.ExecRequest{
- Backend: backendName,
- Table: tableName,
- Command: "ping",
- }
-
- if _, err := client.Exec(execReq); err != nil {
- t.Fatalf("can't exec - %s", err)
- }
}
func makeFrame() (frames.Frame, error) {
diff --git a/http/end2end_test.go b/http/end2end_test.go
index 084fa5f1..2380bff3 100644
--- a/http/end2end_test.go
+++ b/http/end2end_test.go
@@ -25,7 +25,6 @@ import (
"fmt"
"net"
nhttp "net/http"
- "os"
"reflect"
"testing"
"time"
@@ -36,11 +35,6 @@ import (
)
func TestEnd2End(t *testing.T) {
- tmpDir, err := os.MkdirTemp("", "frames-e2e")
- if err != nil {
- t.Fatal(err)
- }
-
backendName := "e2e-backend"
cfg := &frames.Config{
Log: frames.LogConfig{
@@ -48,9 +42,8 @@ func TestEnd2End(t *testing.T) {
},
Backends: []*frames.BackendConfig{
{
- Name: backendName,
- Type: "csv",
- RootDir: tmpDir,
+ Name: backendName,
+ Type: "kv",
},
},
}
@@ -84,8 +77,9 @@ func TestEnd2End(t *testing.T) {
tableName := "e2e"
writeReq := &frames.WriteRequest{
- Backend: backendName,
- Table: tableName,
+ Backend: backendName,
+ Table: tableName,
+ SaveMode: frames.OverwriteTable,
}
appender, err := client.Write(writeReq)
@@ -131,17 +125,6 @@ func TestEnd2End(t *testing.T) {
}
testGrafana(t, url, backendName, tableName)
-
- // Exec
- execReq := &pb.ExecRequest{
- Backend: backendName,
- Table: tableName,
- Command: "ping",
- }
-
- if _, err := client.Exec(execReq); err != nil {
- t.Fatalf("can't exec - %s", err)
- }
}
func testGrafana(t *testing.T, baseURL string, backend string, table string) {
diff --git a/http/server_example_test.go b/http/server_example_test.go
index 2f4665b1..37509063 100644
--- a/http/server_example_test.go
+++ b/http/server_example_test.go
@@ -34,8 +34,6 @@ log:
backends:
- type: "kv"
- - type: "csv"
- rootDir = "/tmp"
`)
func ExampleServer() {
diff --git a/http/server_test.go b/http/server_test.go
index ecb7fb14..464ddb5c 100644
--- a/http/server_test.go
+++ b/http/server_test.go
@@ -38,7 +38,7 @@ func createServer() (*Server, error) {
Backends: []*frames.BackendConfig{
{
Name: "weather",
- Type: "csv",
+ Type: "kv",
},
},
}
diff --git a/integration_test.go b/integration_test.go
index 73dbf8c4..bdec7df5 100644
--- a/integration_test.go
+++ b/integration_test.go
@@ -106,12 +106,7 @@ func sessionInfo(t testing.TB) *frames.Session {
}
func genConfig(root string, session *frames.Session) *frames.Config {
- backends := []*frames.BackendConfig{
- {
- Type: "csv",
- RootDir: root,
- },
- }
+ var backends []*frames.BackendConfig
if session != nil {
backends = append(backends, &frames.BackendConfig{
diff --git a/test/basic_suite_integration_test.go b/test/basic_suite_integration_test.go
index 9ac973e2..684df6dd 100644
--- a/test/basic_suite_integration_test.go
+++ b/test/basic_suite_integration_test.go
@@ -32,7 +32,6 @@ var (
kvSuites = []SuiteCreateFunc{GetKvTestsConstructorFunc()}
tsdbSuites = []SuiteCreateFunc{GetTsdbTestsConstructorFunc()}
streamSuites = []SuiteCreateFunc{GetStreamTestsConstructorFunc()}
- csvSuites = []SuiteCreateFunc{GetCsvTestsConstructorFunc()}
)
type testInfo struct {
@@ -96,13 +95,6 @@ func (mainSuite *mainTestSuite) TestStreamBackend() {
mainSuite.runSubSuites(streamSuites)
}
-func (mainSuite *mainTestSuite) TestCSVBackend() {
- if !strings.Contains(mainSuite.info.backendsToTest, "csv") {
- mainSuite.T().Skip("skipping csv backend tests")
- }
- mainSuite.runSubSuites(csvSuites)
-}
-
func (mainSuite *mainTestSuite) runSubSuites(suites []SuiteCreateFunc) {
for _, currSuite := range suites {
// Run both Grpc and Http tests
@@ -193,12 +185,7 @@ func sessionInfo(t testing.TB) *frames.Session {
}
func generateConfig(root string, session *frames.Session) *frames.Config {
- backends := []*frames.BackendConfig{
- {
- Type: "csv",
- RootDir: root,
- },
- }
+ var backends []*frames.BackendConfig
if session != nil {
backends = append(backends, &frames.BackendConfig{
@@ -297,7 +284,7 @@ func setupTest(t testing.TB, internalLogger logger.Logger) *testInfo {
info.debugMode = strings.ToLower(os.Getenv("DEBUG")) == "true"
info.backendsToTest = os.Getenv("TEST_BACKENDS")
if info.backendsToTest == "" {
- info.backendsToTest = "kv,tsdb,stream,csv"
+ info.backendsToTest = "kv,tsdb,stream"
}
info.root = setupRoot(t)
diff --git a/test/csv_integration_test.go b/test/csv_integration_test.go
deleted file mode 100644
index d3627e0a..00000000
--- a/test/csv_integration_test.go
+++ /dev/null
@@ -1,131 +0,0 @@
-package test
-
-import (
- "fmt"
- "math/rand"
- "testing"
- "time"
-
- "github.com/nuclio/logger"
- "github.com/stretchr/testify/suite"
- "github.com/v3io/frames"
- "github.com/v3io/frames/pb"
- v3io "github.com/v3io/v3io-go/pkg/dataplane"
-)
-
-var (
- random = rand.New(rand.NewSource(time.Now().Unix()))
- size = 200
-)
-
-type CsvTestSuite struct {
- suite.Suite
- tablePath string
- suiteTimestamp int64
- basicQueryTime int64
- client frames.Client
- backendName string
-}
-
-func GetCsvTestsConstructorFunc() SuiteCreateFunc {
- return func(client frames.Client, _ v3io.Container, _ logger.Logger) suite.TestingSuite {
- return &CsvTestSuite{client: client, backendName: "csv"}
- }
-}
-
-func (csvSuite *CsvTestSuite) SetupSuite() {
- csvSuite.Require().NotNil(csvSuite.client, "client not set")
-}
-
-func (csvSuite *CsvTestSuite) generateSampleFrame(t testing.TB) frames.Frame {
- var (
- columns []frames.Column
- col frames.Column
- err error
- )
-
- bools := make([]bool, size)
- for i := range bools {
- if random.Float64() < 0.5 {
- bools[i] = true
- }
- }
- col, err = frames.NewSliceColumn("bools", bools)
- csvSuite.Require().NoError(err)
- columns = append(columns, col)
-
- col = FloatCol(t, "floats", size)
- columns = append(columns, col)
-
- ints := make([]int64, size)
- for i := range ints {
- ints[i] = random.Int63()
- }
- col, err = frames.NewSliceColumn("ints", ints)
- csvSuite.Require().NoError(err)
- columns = append(columns, col)
-
- col = StringCol(t, "strings", size)
- columns = append(columns, col)
-
- times := make([]time.Time, size)
- for i := range times {
- times[i] = time.Now().Add(time.Duration(i) * time.Second)
- }
- col, err = frames.NewSliceColumn("times", times)
- csvSuite.Require().NoError(err)
- columns = append(columns, col)
-
- frame, err := frames.NewFrame(columns, nil, nil)
- csvSuite.Require().NoError(err)
-
- return frame
-}
-
-func (csvSuite *CsvTestSuite) TestAll() {
- table := fmt.Sprintf("csv_test_all%d", time.Now().UnixNano())
-
- csvSuite.T().Log("write")
- frame := csvSuite.generateSampleFrame(csvSuite.T())
- wreq := &frames.WriteRequest{
- Backend: csvSuite.backendName,
- Table: table,
- }
-
- appender, err := csvSuite.client.Write(wreq)
- csvSuite.Require().NoError(err)
-
- err = appender.Add(frame)
- csvSuite.Require().NoError(err)
-
- err = appender.WaitForComplete(10 * time.Second)
- csvSuite.Require().NoError(err)
-
- time.Sleep(3 * time.Second) // Let DB sync
-
- csvSuite.T().Log("read")
- rreq := &pb.ReadRequest{
- Backend: csvSuite.backendName,
- Table: table,
- }
-
- it, err := csvSuite.client.Read(rreq)
- csvSuite.Require().NoError(err)
-
- for it.Next() {
- // TODO: More checks
- fr := it.At()
- csvSuite.Require().Contains([]int{fr.Len(), fr.Len() - 1}, frame.Len(), "wrong length")
- }
-
- csvSuite.Require().NoError(it.Err())
-
- csvSuite.T().Log("delete")
- dreq := &pb.DeleteRequest{
- Backend: csvSuite.backendName,
- Table: table,
- }
-
- err = csvSuite.client.Delete(dreq)
- csvSuite.Require().NoError(err)
-}