stream

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2025 License: MIT Imports: 8 Imported by: 2

Documentation

Index

Examples

Constants

View Source
const (
	DESC = true
	ASC  = false
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggeregator added in v0.0.2

type Aggeregator interface {
	Apply(e []Event) []Event
	String() string
}

type And added in v0.0.2

type And struct {
	Lhs Where
	Rhs Where
}

func (And) Apply added in v0.0.2

func (w And) Apply(input any) bool

func (And) String added in v0.0.2

func (w And) String() string

type Average

type Average struct {
	Name string
}
Example
package main

import (
	"fmt"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Level int
	}

	e := make([]stream.Event, 0)
	for i := 0; i < 10; i++ {
		e = append(e, stream.NewEvent(LogEvent{
			Level: i,
		}))
	}

	avg := &stream.Average{Name: "Level"}
	out := avg.Apply(e)

	fmt.Println(out[len(out)-1].ResultSet)

}
Output:

[4.5]

func (Average) Apply

func (s Average) Apply(e []Event) []Event

func (Average) String

func (s Average) String() string

type Count

type Count struct {
	Name string
}
Example
package main

import (
	"fmt"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Level int
	}

	e := make([]stream.Event, 0)
	for i := 0; i < 10; i++ {
		e = append(e, stream.NewEvent(LogEvent{
			Level: i,
		}))
	}

	c := &stream.Count{Name: "Level"}
	out := c.Apply(e)

	fmt.Println(out[len(out)-1].ResultSet)

}
Output:

[10]

func (Count) Apply

func (s Count) Apply(e []Event) []Event

func (Count) String

func (s Count) String() string

type Distinct

type Distinct struct {
	Name string
}
Example
package main

import (
	"fmt"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Level int
	}

	e := make([]stream.Event, 0)
	e = append(e, stream.NewEvent(LogEvent{Level: 0}))
	e = append(e, stream.NewEvent(LogEvent{Level: 0}))
	e = append(e, stream.NewEvent(LogEvent{Level: 1}))
	e = append(e, stream.NewEvent(LogEvent{Level: 1}))
	e = append(e, stream.NewEvent(LogEvent{Level: 2}))
	e = append(e, stream.NewEvent(LogEvent{Level: 2}))
	e = append(e, stream.NewEvent(LogEvent{Level: 2}))
	e = append(e, stream.NewEvent(LogEvent{Level: 2}))

	d := &stream.Distinct{Name: "Level"}
	out := d.Apply(e)

	fmt.Println(len(out))
}
Output:

3

func (Distinct) Apply

func (s Distinct) Apply(e []Event) []Event

func (Distinct) String

func (s Distinct) String() string

type Equal added in v0.0.2

type Equal struct {
	Name  string
	Value any
}

func (Equal) Apply added in v0.0.2

func (w Equal) Apply(input any) bool

func (Equal) String added in v0.0.2

func (w Equal) String() string

type Event

type Event struct {
	Time       time.Time `json:"time"`
	Underlying any       `json:"underlying"`
	ResultSet  []any     `json:"result_set"`
}

func NewEvent

func NewEvent(input any) Event

type From

type From struct {
	Type any
}

func (From) Apply

func (w From) Apply(input any) bool

func (From) String

func (w From) String() string

type LargerThan

type LargerThan struct {
	Name  string
	Value any
}

func (LargerThan) Apply

func (w LargerThan) Apply(input any) bool

func (LargerThan) String

func (w LargerThan) String() string

type Length

type Length struct {
	Length int
}

func (*Length) Apply

func (w *Length) Apply(e []Event) []Event

func (*Length) String

func (w *Length) String() string

type LengthBatch

type LengthBatch struct {
	Length int
	Batch  []Event
}

func (*LengthBatch) Apply

func (w *LengthBatch) Apply(e []Event) []Event

func (*LengthBatch) String

func (w *LengthBatch) String() string

type LessThan

type LessThan struct {
	Name  string
	Value any
}

func (LessThan) Apply

func (w LessThan) Apply(input any) bool

func (LessThan) String

func (w LessThan) String() string

type Limit

type Limit struct {
	Offset int
	Limit  int
}

func (*Limit) Apply

func (l *Limit) Apply(e []Event) []Event

func (*Limit) String

func (l *Limit) String() string

type Limiter added in v0.0.2

type Limiter interface {
	Apply(e []Event) []Event
	String() string
}

type Max

type Max struct {
	Name string
}
Example
package main

import (
	"fmt"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Level int
	}

	e := make([]stream.Event, 0)
	for i := 0; i < 10; i++ {
		e = append(e, stream.NewEvent(LogEvent{
			Level: i,
		}))
	}

	m := &stream.Max{Name: "Level"}
	out := m.Apply(e)

	fmt.Println(out[len(out)-1].ResultSet)

}
Output:

[9]

func (Max) Apply

func (s Max) Apply(e []Event) []Event

func (Max) String

func (s Max) String() string

type Min

type Min struct {
	Name string
}
Example
package main

import (
	"fmt"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Level int
	}

	e := make([]stream.Event, 0)
	for i := 0; i < 10; i++ {
		e = append(e, stream.NewEvent(LogEvent{
			Level: i,
		}))
	}

	m := &stream.Min{Name: "Level"}
	out := m.Apply(e)

	fmt.Println(out[len(out)-1].ResultSet)

}
Output:

[0]

func (Min) Apply

func (s Min) Apply(e []Event) []Event

func (Min) String

func (s Min) String() string

type NoLimit

type NoLimit struct{}

func (*NoLimit) Apply

func (l *NoLimit) Apply(e []Event) []Event

func (*NoLimit) String

func (l *NoLimit) String() string

type NoOrder

type NoOrder struct{}

func (*NoOrder) Apply

func (o *NoOrder) Apply(e []Event) []Event

func (*NoOrder) String

func (o *NoOrder) String() string

type NotEqual added in v0.0.2

type NotEqual struct {
	Name  string
	Value any
}

func (NotEqual) Apply added in v0.0.2

func (w NotEqual) Apply(input any) bool

func (NotEqual) String added in v0.0.2

func (w NotEqual) String() string

type OrderBy

type OrderBy struct {
	Name  string
	Index int
	Desc  bool
}
Example
package main

import (
	"fmt"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Level int
	}

	e := make([]stream.Event, 0)
	for i := 0; i < 10; i++ {
		e = append(e, stream.NewEvent(LogEvent{
			Level: i,
		}))
	}

	o := &stream.OrderBy{
		Name:  "Level",
		Index: 0,
		Desc:  false,
	}

	out := o.Apply(e)
	for _, ev := range out {
		fmt.Print(ev.Underlying)
	}

}
Output:

{0}{1}{2}{3}{4}{5}{6}{7}{8}{9}
Example (Desc)
package main

import (
	"fmt"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Level int
	}

	o := &stream.OrderBy{
		Name:  "Level",
		Index: 0,
		Desc:  true,
	}

	e := make([]stream.Event, 0)
	for i := 0; i < 10; i++ {
		e = append(e, stream.NewEvent(LogEvent{
			Level: i,
		}))
	}

	out := o.Apply(e)
	for _, ev := range out {
		fmt.Print(ev.Underlying)
	}

}
Output:

{9}{8}{7}{6}{5}{4}{3}{2}{1}{0}

func (*OrderBy) Apply

func (o *OrderBy) Apply(e []Event) []Event

func (*OrderBy) String

func (o *OrderBy) String() string

type Select

type Select struct {
	Name string
}

func (Select) Apply

func (s Select) Apply(e []Event) []Event

func (Select) String

func (s Select) String() string

type SelectAll

type SelectAll struct{}

func (SelectAll) Apply

func (s SelectAll) Apply(e []Event) []Event

func (SelectAll) String

func (s SelectAll) String() string

type Selector added in v0.0.2

type Selector interface {
	Apply(e []Event) []Event
	String() string
}

type Sorter added in v0.0.2

type Sorter interface {
	Apply(e []Event) []Event
	String() string
}

type Stream

type Stream struct {
	// contains filtered or unexported fields
}
Example
package main

import (
	"fmt"
	"time"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Time    time.Time
		Level   int
		Message string
	}

	s := stream.New().
		SelectAll().
		From(LogEvent{}).
		Length(10).
		OrderBy("Level", true).
		Limit(10, 5)

	fmt.Println(s)

}
Output:

SELECT * FROM LogEvent.LENGTH(10) ORDER BY Level DESC LIMIT 10 OFFSET 5

func New

func New() *Stream

func (*Stream) Average

func (s *Stream) Average(name string) *Stream

func (*Stream) Close

func (s *Stream) Close() error

func (*Stream) Count

func (s *Stream) Count(name string) *Stream

func (*Stream) Distinct

func (s *Stream) Distinct(name string) *Stream

func (*Stream) Equals

func (s *Stream) Equals(name string, value any) *Stream

func (*Stream) From

func (s *Stream) From(typ any) *Stream

func (*Stream) Input

func (s *Stream) Input() chan any

func (*Stream) IsClosed

func (s *Stream) IsClosed() bool

func (*Stream) LargerThan

func (s *Stream) LargerThan(name string, value any) *Stream

func (*Stream) Length

func (s *Stream) Length(length int) *Stream

func (*Stream) LengthBatch

func (s *Stream) LengthBatch(length int) *Stream

func (*Stream) LessThan

func (s *Stream) LessThan(name string, value any) *Stream

func (*Stream) Limit

func (s *Stream) Limit(limit, offset int) *Stream

func (*Stream) Listen

func (s *Stream) Listen(input any)

func (*Stream) Max

func (s *Stream) Max(name string) *Stream

func (*Stream) Min

func (s *Stream) Min(name string) *Stream

func (*Stream) OrderBy

func (s *Stream) OrderBy(name string, desc bool) *Stream

func (*Stream) Output

func (s *Stream) Output() chan []Event

func (*Stream) Run

func (s *Stream) Run()

func (*Stream) Select

func (s *Stream) Select(name string) *Stream

func (*Stream) SelectAll

func (s *Stream) SelectAll() *Stream

func (*Stream) String

func (s *Stream) String() string

func (*Stream) Sum

func (s *Stream) Sum(name string) *Stream

func (*Stream) Time

func (s *Stream) Time(expire time.Duration, unit lexer.Token) *Stream

func (*Stream) TimeBatch

func (s *Stream) TimeBatch(expire time.Duration, unit lexer.Token) *Stream

func (*Stream) Update

func (s *Stream) Update(input any)

type Sum

type Sum struct {
	Name string
}
Example
package main

import (
	"fmt"

	"github.com/itsubaki/gostream/stream"
)

func main() {
	type LogEvent struct {
		Level int
	}

	e := make([]stream.Event, 0)
	for i := 0; i < 10; i++ {
		e = append(e, stream.NewEvent(LogEvent{
			Level: i,
		}))
	}

	s := &stream.Sum{Name: "Level"}
	out := s.Apply(e)

	fmt.Println(out[len(out)-1].ResultSet)

}
Output:

[45]

func (Sum) Apply

func (s Sum) Apply(e []Event) []Event

func (Sum) String

func (s Sum) String() string

type Time

type Time struct {
	Expire time.Duration
	Unit   lexer.Token
}

func (*Time) Apply

func (w *Time) Apply(e []Event) []Event

func (*Time) String

func (w *Time) String() string

type TimeBatch

type TimeBatch struct {
	Start  time.Time
	End    time.Time
	Expire time.Duration
	Unit   lexer.Token
}

func (*TimeBatch) Apply

func (w *TimeBatch) Apply(e []Event) []Event

func (*TimeBatch) String

func (w *TimeBatch) String() string

type Where

type Where interface {
	Apply(input any) bool
	String() string
}

type Window

type Window interface {
	Apply(e []Event) []Event
	String() string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL